aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/tcp-transport.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2019-01-03 16:00:45 +0800
committerGitHub <noreply@github.com>2019-01-03 16:00:45 +0800
commit09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8 (patch)
tree12362787be4d3b6bcd0051591a7bc0c60d859878 /core/test/tcp-transport.go
parent5739e74781092ac09d8b3a575cddc71b50beedf4 (diff)
downloaddexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar
dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.gz
dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.bz2
dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.lz
dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.xz
dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.zst
dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.zip
simulation: add latency for gossip (#389)
Diffstat (limited to 'core/test/tcp-transport.go')
-rw-r--r--core/test/tcp-transport.go69
1 files changed, 28 insertions, 41 deletions
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index b16bbcb..e04ba53 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -109,20 +109,14 @@ type TCPTransport struct {
recvChannel chan *TransportEnvelope
ctx context.Context
cancel context.CancelFunc
- latency LatencyModel
marshaller Marshaller
throughputRecords []ThroughputRecord
throughputLock sync.Mutex
}
// NewTCPTransport constructs an TCPTransport instance.
-func NewTCPTransport(
- peerType TransportPeerType,
- pubKey crypto.PublicKey,
- latency LatencyModel,
- marshaller Marshaller,
- localPort int) *TCPTransport {
-
+func NewTCPTransport(peerType TransportPeerType, pubKey crypto.PublicKey,
+ marshaller Marshaller, localPort int) *TCPTransport {
ctx, cancel := context.WithCancel(context.Background())
return &TCPTransport{
peerType: peerType,
@@ -133,7 +127,6 @@ func NewTCPTransport(
ctx: ctx,
cancel: cancel,
localPort: localPort,
- latency: latency,
marshaller: marshaller,
throughputRecords: []ThroughputRecord{},
}
@@ -202,6 +195,14 @@ func (t *TCPTransport) clientHandshake(conn net.Conn) (
return
}
+func (t *TCPTransport) send(
+ endpoint types.NodeID, msg interface{}, payload []byte) {
+ t.peersLock.RLock()
+ defer t.peersLock.RUnlock()
+ t.handleThroughputData(msg, payload)
+ t.peers[endpoint].sendChannel <- payload
+}
+
// Send implements Transport.Send method.
func (t *TCPTransport) Send(
endpoint types.NodeID, msg interface{}) (err error) {
@@ -210,38 +211,25 @@ func (t *TCPTransport) Send(
if err != nil {
return
}
- go func() {
- if t.latency != nil {
- time.Sleep(t.latency.Delay())
- }
- t.peersLock.RLock()
- defer t.peersLock.RUnlock()
- t.handleThroughputData(msg, payload)
- t.peers[endpoint].sendChannel <- payload
- }()
+ go t.send(endpoint, msg, payload)
return
}
// Broadcast implements Transport.Broadcast method.
-func (t *TCPTransport) Broadcast(msg interface{}) (err error) {
+func (t *TCPTransport) Broadcast(endpoints map[types.NodeID]struct{},
+ latency LatencyModel, msg interface{}) (err error) {
payload, err := t.marshalMessage(msg)
if err != nil {
return
}
- t.peersLock.RLock()
- defer t.peersLock.RUnlock()
-
- for nID, rec := range t.peers {
+ for nID := range endpoints {
if nID == t.nID {
continue
}
- go func(ch chan<- []byte) {
- if t.latency != nil {
- time.Sleep(t.latency.Delay())
- }
- t.handleThroughputData(msg, payload)
- ch <- payload
- }(rec.sendChannel)
+ go func(ID types.NodeID) {
+ time.Sleep(latency.Delay())
+ t.send(ID, msg, payload)
+ }(nID)
}
return
}
@@ -585,14 +573,12 @@ type TCPTransportClient struct {
// NewTCPTransportClient constructs a TCPTransportClient instance.
func NewTCPTransportClient(
pubKey crypto.PublicKey,
- latency LatencyModel,
marshaller Marshaller,
local bool) *TCPTransportClient {
return &TCPTransportClient{
- TCPTransport: *NewTCPTransport(
- TransportPeer, pubKey, latency, marshaller, 8080),
- local: local,
+ TCPTransport: *NewTCPTransport(TransportPeer, pubKey, marshaller, 8080),
+ local: local,
}
}
@@ -776,11 +762,7 @@ func NewTCPTransportServer(
// NOTE: the assumption here is the node ID of peers
// won't be zero.
TCPTransport: *NewTCPTransport(
- TransportPeerServer,
- prvKey.PublicKey(),
- nil,
- marshaller,
- serverPort),
+ TransportPeerServer, prvKey.PublicKey(), marshaller, serverPort),
}
}
@@ -828,7 +810,11 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
if err = t.buildConnectionsToPeers(); err != nil {
return
}
- if err = t.Broadcast(peersInfo); err != nil {
+ peers := make(map[types.NodeID]struct{})
+ for ID := range t.peers {
+ peers[ID] = struct{}{}
+ }
+ if err = t.Broadcast(peers, &FixedLatencyModel{}, peersInfo); err != nil {
return
}
// Wait for peers to send 'ready' report.
@@ -851,7 +837,8 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
}
}
// Ack all peers ready to go.
- if err = t.Broadcast(&tcpMessage{Type: "all-ready"}); err != nil {
+ if err = t.Broadcast(peers, &FixedLatencyModel{},
+ &tcpMessage{Type: "all-ready"}); err != nil {
return
}
return