aboutsummaryrefslogtreecommitdiffstats
path: root/core/test
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2019-01-03 16:00:45 +0800
committerGitHub <noreply@github.com>2019-01-03 16:00:45 +0800
commit09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8 (patch)
tree12362787be4d3b6bcd0051591a7bc0c60d859878 /core/test
parent5739e74781092ac09d8b3a575cddc71b50beedf4 (diff)
downloadtangerine-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar
tangerine-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.gz
tangerine-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.bz2
tangerine-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.lz
tangerine-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.xz
tangerine-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.zst
tangerine-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.zip
simulation: add latency for gossip (#389)
Diffstat (limited to 'core/test')
-rw-r--r--core/test/fake-transport.go28
-rw-r--r--core/test/interface.go3
-rw-r--r--core/test/network.go201
-rw-r--r--core/test/network_test.go28
-rw-r--r--core/test/tcp-transport.go69
-rw-r--r--core/test/transport_test.go29
-rw-r--r--core/test/utils.go12
7 files changed, 191 insertions, 179 deletions
diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go
index 388d0aa..056c3d5 100644
--- a/core/test/fake-transport.go
+++ b/core/test/fake-transport.go
@@ -39,7 +39,6 @@ type FakeTransport struct {
recvChannel chan *TransportEnvelope
serverChannel chan<- *TransportEnvelope
peers map[types.NodeID]fakePeerRecord
- latency LatencyModel
}
// NewFakeTransportServer constructs FakeTransport instance for peer server.
@@ -51,31 +50,24 @@ func NewFakeTransportServer() TransportServer {
}
// NewFakeTransportClient constructs FakeTransport instance for peer.
-func NewFakeTransportClient(
- pubKey crypto.PublicKey, latency LatencyModel) TransportClient {
-
+func NewFakeTransportClient(pubKey crypto.PublicKey) TransportClient {
return &FakeTransport{
peerType: TransportPeer,
recvChannel: make(chan *TransportEnvelope, 1000),
nID: types.NewNodeID(pubKey),
pubKey: pubKey,
- latency: latency,
}
}
// Send implements Transport.Send method.
func (t *FakeTransport) Send(
endpoint types.NodeID, msg interface{}) (err error) {
-
rec, exists := t.peers[endpoint]
if !exists {
err = fmt.Errorf("the endpoint does not exists: %v", endpoint)
return
}
go func(ch chan<- *TransportEnvelope) {
- if t.latency != nil {
- time.Sleep(t.latency.Delay())
- }
ch <- &TransportEnvelope{
PeerType: t.peerType,
From: t.nID,
@@ -98,12 +90,16 @@ func (t *FakeTransport) Report(msg interface{}) (err error) {
}
// Broadcast implements Transport.Broadcast method.
-func (t *FakeTransport) Broadcast(msg interface{}) (err error) {
- for k := range t.peers {
- if k == t.nID {
+func (t *FakeTransport) Broadcast(endpoints map[types.NodeID]struct{},
+ latency LatencyModel, msg interface{}) (err error) {
+ for ID := range endpoints {
+ if ID == t.nID {
continue
}
- t.Send(k, msg)
+ go func(nID types.NodeID) {
+ time.Sleep(latency.Delay())
+ t.Send(nID, msg)
+ }(ID)
}
return
}
@@ -177,7 +173,11 @@ func (t *FakeTransport) WaitForPeers(numPeers uint32) (err error) {
}
}
// The collected peer channels are shared for all peers.
- if err = t.Broadcast(t.peers); err != nil {
+ peers := make(map[types.NodeID]struct{})
+ for ID := range t.peers {
+ peers[ID] = struct{}{}
+ }
+ if err = t.Broadcast(peers, &FixedLatencyModel{}, t.peers); err != nil {
return
}
return
diff --git a/core/test/interface.go b/core/test/interface.go
index 1388dc1..d9578de 100644
--- a/core/test/interface.go
+++ b/core/test/interface.go
@@ -91,7 +91,8 @@ type TransportClient interface {
// Transport defines the interface for basic transportation capabilities.
type Transport interface {
// Broadcast a message to all peers in network.
- Broadcast(msg interface{}) error
+ Broadcast(endpoints map[types.NodeID]struct{}, latency LatencyModel,
+ msg interface{}) error
// Send one message to a peer.
Send(endpoint types.NodeID, msg interface{}) error
// Close would cleanup allocated resources.
diff --git a/core/test/network.go b/core/test/network.go
index a79898e..066d36c 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -22,7 +22,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "log"
"net"
"strconv"
"sync"
@@ -52,9 +51,12 @@ const (
// NetworkConfig is the configuration for Network module.
type NetworkConfig struct {
- Type NetworkType
- PeerServer string
- PeerPort int
+ Type NetworkType
+ PeerServer string
+ PeerPort int
+ DirectLatency LatencyModel
+ GossipLatency LatencyModel
+ Marshaller Marshaller
}
// PullRequest is a generic request to pull everything (ex. vote, block...).
@@ -151,7 +153,6 @@ type Network struct {
unreceivedBlocks map[common.Hash]chan<- common.Hash
unreceivedRandomnessLock sync.RWMutex
unreceivedRandomness map[common.Hash]chan<- common.Hash
- latencyModel LatencyModel
cache *utils.NodeSetCache
notarySetCachesLock sync.Mutex
notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{}
@@ -161,8 +162,8 @@ type Network struct {
// NewNetwork setup network stuffs for nodes, which provides an
// implementation of core.Network based on TransportClient.
-func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
- marshaller Marshaller, config NetworkConfig) (n *Network) {
+func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
+ n *Network) {
// Construct basic network instance.
n = &Network{
ID: types.NewNodeID(pubKey),
@@ -175,9 +176,10 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
randomnessCache: make(map[common.Hash]*types.BlockRandomnessResult),
unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
unreceivedRandomness: make(map[common.Hash]chan<- common.Hash),
- latencyModel: latency,
- notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
- dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
+ peers: make(map[types.NodeID]struct{}),
+ notarySetCaches: make(
+ map[uint64]map[uint32]map[types.NodeID]struct{}),
+ dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
}
@@ -185,11 +187,11 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
// Construct transport layer.
switch config.Type {
case NetworkTypeTCPLocal:
- n.trans = NewTCPTransportClient(pubKey, latency, marshaller, true)
+ n.trans = NewTCPTransportClient(pubKey, config.Marshaller, true)
case NetworkTypeTCP:
- n.trans = NewTCPTransportClient(pubKey, latency, marshaller, false)
+ n.trans = NewTCPTransportClient(pubKey, config.Marshaller, false)
case NetworkTypeFake:
- n.trans = NewFakeTransportClient(pubKey, latency)
+ n.trans = NewFakeTransportClient(pubKey)
default:
panic(fmt.Errorf("unknown network type: %v", config.Type))
}
@@ -213,8 +215,11 @@ func (n *Network) PullRandomness(hashes common.Hashes) {
// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
- n.broadcastToSet(
- n.getNotarySet(vote.Position.Round, vote.Position.ChainID), vote)
+ if err := n.trans.Broadcast(
+ n.getNotarySet(vote.Position.Round, vote.Position.ChainID),
+ n.config.DirectLatency, vote); err != nil {
+ panic(err)
+ }
n.addVoteToCache(vote)
}
@@ -222,28 +227,33 @@ func (n *Network) BroadcastVote(vote *types.Vote) {
func (n *Network) BroadcastBlock(block *types.Block) {
// Avoid data race in fake transport.
block = n.cloneForFake(block).(*types.Block)
- n.broadcastToSet(
- n.getNotarySet(block.Position.Round, block.Position.ChainID), block)
+ notarySet := n.getNotarySet(block.Position.Round, block.Position.ChainID)
+ if err := n.trans.Broadcast(
+ notarySet, n.config.DirectLatency, block); err != nil {
+ panic(err)
+ }
+ if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
+ n.config.GossipLatency, block); err != nil {
+ panic(err)
+ }
n.addBlockToCache(block)
}
// BroadcastAgreementResult implements core.Network interface.
func (n *Network) BroadcastAgreementResult(
- randRequest *types.AgreementResult) {
- n.sentAgreementLock.Lock()
- defer n.sentAgreementLock.Unlock()
- if _, exist := n.sentAgreement[randRequest.BlockHash]; exist {
+ result *types.AgreementResult) {
+ if !n.markAgreementResultAsSent(result.BlockHash) {
return
}
- if len(n.sentAgreement) > 1000 {
- // Randomly drop one entry.
- for k := range n.sentAgreement {
- delete(n.sentAgreement, k)
- break
- }
+ // Send to DKG set first.
+ dkgSet := n.getDKGSet(result.Position.Round)
+ if err := n.trans.Broadcast(
+ dkgSet, n.config.DirectLatency, result); err != nil {
+ panic(err)
}
- n.sentAgreement[randRequest.BlockHash] = struct{}{}
- if err := n.trans.Broadcast(randRequest); err != nil {
+ // Gossip to other nodes.
+ if err := n.trans.Broadcast(getComplementSet(n.peers, dkgSet),
+ n.config.GossipLatency, result); err != nil {
panic(err)
}
}
@@ -251,20 +261,19 @@ func (n *Network) BroadcastAgreementResult(
// BroadcastRandomnessResult implements core.Network interface.
func (n *Network) BroadcastRandomnessResult(
randResult *types.BlockRandomnessResult) {
- n.sentRandomnessLock.Lock()
- defer n.sentRandomnessLock.Unlock()
- if _, exist := n.sentRandomness[randResult.BlockHash]; exist {
+ if !n.markRandomnessResultAsSent(randResult.BlockHash) {
return
}
- if len(n.sentRandomness) > 1000 {
- // Randomly drop one entry.
- for k := range n.sentRandomness {
- delete(n.sentRandomness, k)
- break
- }
+ // Send to notary set first.
+ notarySet := n.getNotarySet(
+ randResult.Position.Round, randResult.Position.ChainID)
+ if err := n.trans.Broadcast(
+ notarySet, n.config.DirectLatency, randResult); err != nil {
+ panic(err)
}
- n.sentRandomness[randResult.BlockHash] = struct{}{}
- if err := n.trans.Broadcast(randResult); err != nil {
+ // Gossip to other nodes.
+ if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
+ n.config.GossipLatency, randResult); err != nil {
panic(err)
}
n.addRandomnessToCache(randResult)
@@ -273,21 +282,25 @@ func (n *Network) BroadcastRandomnessResult(
// SendDKGPrivateShare implements core.Network interface.
func (n *Network) SendDKGPrivateShare(
recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
- if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil {
- panic(err)
- }
+ n.send(types.NewNodeID(recv), prvShare)
}
// BroadcastDKGPrivateShare implements core.Network interface.
func (n *Network) BroadcastDKGPrivateShare(
prvShare *typesDKG.PrivateShare) {
- n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare)
+ if err := n.trans.Broadcast(n.getDKGSet(prvShare.Round),
+ n.config.DirectLatency, prvShare); err != nil {
+ panic(err)
+ }
}
// BroadcastDKGPartialSignature implements core.Network interface.
func (n *Network) BroadcastDKGPartialSignature(
psig *typesDKG.PartialSignature) {
- n.broadcastToSet(n.getDKGSet(psig.Round), psig)
+ if err := n.trans.Broadcast(
+ n.getDKGSet(psig.Round), n.config.DirectLatency, psig); err != nil {
+ panic(err)
+ }
}
// ReceiveChan implements core.Network interface.
@@ -312,7 +325,6 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) {
return
}
peerKeys := n.trans.Peers()
- n.peers = make(map[types.NodeID]struct{})
for _, k := range peerKeys {
n.peers[types.NewNodeID(k)] = struct{}{}
}
@@ -374,9 +386,7 @@ func (n *Network) handlePullRequest(req *PullRequest) {
break All
default:
}
- if err := n.trans.Send(req.Requester, b); err != nil {
- log.Println("unable to send block", req.Requester, err)
- }
+ n.send(req.Requester, b)
}
}()
case "vote":
@@ -386,9 +396,7 @@ func (n *Network) handlePullRequest(req *PullRequest) {
defer n.voteCacheLock.Unlock()
if votes, exists := n.voteCache[pos]; exists {
for _, v := range votes {
- if err := n.trans.Send(req.Requester, v); err != nil {
- log.Println("unable to send vote", req.Requester, err)
- }
+ n.send(req.Requester, v)
}
}
}()
@@ -408,9 +416,7 @@ func (n *Network) handlePullRequest(req *PullRequest) {
break All
default:
}
- if err := n.trans.Send(req.Requester, r); err != nil {
- log.Println("unable to send randomness", req.Requester, err)
- }
+ n.send(req.Requester, r)
}
}()
default:
@@ -457,24 +463,16 @@ func (n *Network) Report(msg interface{}) error {
return n.trans.Report(msg)
}
+// Broadcast a message to all peers.
+func (n *Network) Broadcast(msg interface{}) error {
+ return n.trans.Broadcast(n.peers, &FixedLatencyModel{}, msg)
+}
+
// Peers exports 'Peers' method of Transport.
func (n *Network) Peers() []crypto.PublicKey {
return n.trans.Peers()
}
-// Broadcast exports 'Broadcast' method of Transport, and would panic when
-// error.
-func (n *Network) Broadcast(msg interface{}) {
- if err := n.trans.Broadcast(msg); err != nil {
- panic(err)
- }
-}
-
-// Send exports 'Send' method of Transport.
-func (n *Network) Send(nodeID types.NodeID, msg interface{}) error {
- return n.trans.Send(nodeID, msg)
-}
-
// ReceiveChanForNode returns a channel for messages not handled by
// core.Consensus.
func (n *Network) ReceiveChanForNode() <-chan interface{} {
@@ -521,14 +519,11 @@ Loop:
if nID == n.ID {
continue
}
- if err := n.trans.Send(nID, req); err != nil {
- // Try next peer.
- continue
- }
+ n.send(nID, req)
select {
case <-n.ctx.Done():
break Loop
- case <-time.After(2 * n.latencyModel.Delay()):
+ case <-time.After(2 * n.config.DirectLatency.Delay()):
// Consume everything in the notification channel.
for {
select {
@@ -561,10 +556,7 @@ func (n *Network) pullVotesAsync(pos types.Position) {
// Randomly select one peer from notary set and send a pull request.
sentCount := 0
for nID := range notarySet {
- if err := n.trans.Send(nID, req); err != nil {
- // Try next peer.
- continue
- }
+ n.send(nID, req)
sentCount++
if sentCount >= maxPullingPeerCount {
break
@@ -598,14 +590,11 @@ Loop:
if nID == n.ID {
continue
}
- if err := n.trans.Send(nID, req); err != nil {
- // Try next peer.
- continue
- }
+ n.send(nID, req)
select {
case <-n.ctx.Done():
break Loop
- case <-time.After(2 * n.latencyModel.Delay()):
+ case <-time.After(2 * n.config.DirectLatency.Delay()):
// Consume everything in the notification channel.
for {
select {
@@ -673,6 +662,40 @@ func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) {
n.randomnessCache[rand.BlockHash] = rand
}
+func (n *Network) markAgreementResultAsSent(blockHash common.Hash) bool {
+ n.sentAgreementLock.Lock()
+ defer n.sentAgreementLock.Unlock()
+ if _, exist := n.sentAgreement[blockHash]; exist {
+ return false
+ }
+ if len(n.sentAgreement) > 1000 {
+ // Randomly drop one entry.
+ for k := range n.sentAgreement {
+ delete(n.sentAgreement, k)
+ break
+ }
+ }
+ n.sentAgreement[blockHash] = struct{}{}
+ return true
+}
+
+func (n *Network) markRandomnessResultAsSent(blockHash common.Hash) bool {
+ n.sentRandomnessLock.Lock()
+ defer n.sentRandomnessLock.Unlock()
+ if _, exist := n.sentRandomness[blockHash]; exist {
+ return false
+ }
+ if len(n.sentRandomness) > 1000 {
+ // Randomly drop one entry.
+ for k := range n.sentRandomness {
+ delete(n.sentRandomness, k)
+ break
+ }
+ }
+ n.sentRandomness[blockHash] = struct{}{}
+ return true
+}
+
func (n *Network) cloneForFake(v interface{}) interface{} {
if n.config.Type != NetworkTypeFake {
return v
@@ -735,15 +758,11 @@ func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} {
return set
}
-// broadcastToSet broadcast a message to a set of nodes.
-func (n *Network) broadcastToSet(
- set map[types.NodeID]struct{}, msg interface{}) {
- for nID := range set {
- if nID == n.ID {
- continue
- }
- if err := n.trans.Send(nID, msg); err != nil {
+func (n *Network) send(endpoint types.NodeID, msg interface{}) {
+ go func() {
+ time.Sleep(n.config.DirectLatency.Delay())
+ if err := n.trans.Send(endpoint, msg); err != nil {
panic(err)
}
- }
+ }()
}
diff --git a/core/test/network_test.go b/core/test/network_test.go
index e05dec5..d040a16 100644
--- a/core/test/network_test.go
+++ b/core/test/network_test.go
@@ -48,11 +48,11 @@ func (s *NetworkTestSuite) setupNetworks(
// Setup several network modules.
networks := make(map[types.NodeID]*Network)
for _, key := range pubKeys {
- n := NewNetwork(
- key,
- &FixedLatencyModel{},
- NewDefaultMarshaller(nil),
- NetworkConfig{Type: NetworkTypeFake})
+ n := NewNetwork(key, NetworkConfig{
+ Type: NetworkTypeFake,
+ DirectLatency: &FixedLatencyModel{},
+ GossipLatency: &FixedLatencyModel{},
+ Marshaller: NewDefaultMarshaller(nil)})
networks[n.ID] = n
wg.Add(1)
go func() {
@@ -297,21 +297,13 @@ func (s *NetworkTestSuite) TestBroadcastToSet() {
// Try broadcasting with datum from round 0, and make sure only node belongs
// to that set receiving the message.
nerd.BroadcastVote(&types.Vote{})
- req.IsType(<-notaryNode.ReceiveChan(), &types.Vote{})
- nerd.BroadcastBlock(&types.Block{})
- req.IsType(<-notaryNode.ReceiveChan(), &types.Block{})
+ req.IsType(&types.Vote{}, <-notaryNode.ReceiveChan())
nerd.BroadcastDKGPrivateShare(&typesDKG.PrivateShare{})
- req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PrivateShare{})
+ req.IsType(&typesDKG.PrivateShare{}, <-dkgNode.ReceiveChan())
nerd.BroadcastDKGPartialSignature(&typesDKG.PartialSignature{})
- req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PartialSignature{})
- // There should be no remaining message in each node.
- for _, n := range networks {
- select {
- case <-n.ReceiveChan():
- req.False(true)
- default:
- }
- }
+ req.IsType(&typesDKG.PartialSignature{}, <-dkgNode.ReceiveChan())
+ nerd.BroadcastBlock(&types.Block{})
+ req.IsType(&types.Block{}, <-notaryNode.ReceiveChan())
}
func TestNetwork(t *testing.T) {
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index b16bbcb..e04ba53 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -109,20 +109,14 @@ type TCPTransport struct {
recvChannel chan *TransportEnvelope
ctx context.Context
cancel context.CancelFunc
- latency LatencyModel
marshaller Marshaller
throughputRecords []ThroughputRecord
throughputLock sync.Mutex
}
// NewTCPTransport constructs an TCPTransport instance.
-func NewTCPTransport(
- peerType TransportPeerType,
- pubKey crypto.PublicKey,
- latency LatencyModel,
- marshaller Marshaller,
- localPort int) *TCPTransport {
-
+func NewTCPTransport(peerType TransportPeerType, pubKey crypto.PublicKey,
+ marshaller Marshaller, localPort int) *TCPTransport {
ctx, cancel := context.WithCancel(context.Background())
return &TCPTransport{
peerType: peerType,
@@ -133,7 +127,6 @@ func NewTCPTransport(
ctx: ctx,
cancel: cancel,
localPort: localPort,
- latency: latency,
marshaller: marshaller,
throughputRecords: []ThroughputRecord{},
}
@@ -202,6 +195,14 @@ func (t *TCPTransport) clientHandshake(conn net.Conn) (
return
}
+func (t *TCPTransport) send(
+ endpoint types.NodeID, msg interface{}, payload []byte) {
+ t.peersLock.RLock()
+ defer t.peersLock.RUnlock()
+ t.handleThroughputData(msg, payload)
+ t.peers[endpoint].sendChannel <- payload
+}
+
// Send implements Transport.Send method.
func (t *TCPTransport) Send(
endpoint types.NodeID, msg interface{}) (err error) {
@@ -210,38 +211,25 @@ func (t *TCPTransport) Send(
if err != nil {
return
}
- go func() {
- if t.latency != nil {
- time.Sleep(t.latency.Delay())
- }
- t.peersLock.RLock()
- defer t.peersLock.RUnlock()
- t.handleThroughputData(msg, payload)
- t.peers[endpoint].sendChannel <- payload
- }()
+ go t.send(endpoint, msg, payload)
return
}
// Broadcast implements Transport.Broadcast method.
-func (t *TCPTransport) Broadcast(msg interface{}) (err error) {
+func (t *TCPTransport) Broadcast(endpoints map[types.NodeID]struct{},
+ latency LatencyModel, msg interface{}) (err error) {
payload, err := t.marshalMessage(msg)
if err != nil {
return
}
- t.peersLock.RLock()
- defer t.peersLock.RUnlock()
-
- for nID, rec := range t.peers {
+ for nID := range endpoints {
if nID == t.nID {
continue
}
- go func(ch chan<- []byte) {
- if t.latency != nil {
- time.Sleep(t.latency.Delay())
- }
- t.handleThroughputData(msg, payload)
- ch <- payload
- }(rec.sendChannel)
+ go func(ID types.NodeID) {
+ time.Sleep(latency.Delay())
+ t.send(ID, msg, payload)
+ }(nID)
}
return
}
@@ -585,14 +573,12 @@ type TCPTransportClient struct {
// NewTCPTransportClient constructs a TCPTransportClient instance.
func NewTCPTransportClient(
pubKey crypto.PublicKey,
- latency LatencyModel,
marshaller Marshaller,
local bool) *TCPTransportClient {
return &TCPTransportClient{
- TCPTransport: *NewTCPTransport(
- TransportPeer, pubKey, latency, marshaller, 8080),
- local: local,
+ TCPTransport: *NewTCPTransport(TransportPeer, pubKey, marshaller, 8080),
+ local: local,
}
}
@@ -776,11 +762,7 @@ func NewTCPTransportServer(
// NOTE: the assumption here is the node ID of peers
// won't be zero.
TCPTransport: *NewTCPTransport(
- TransportPeerServer,
- prvKey.PublicKey(),
- nil,
- marshaller,
- serverPort),
+ TransportPeerServer, prvKey.PublicKey(), marshaller, serverPort),
}
}
@@ -828,7 +810,11 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
if err = t.buildConnectionsToPeers(); err != nil {
return
}
- if err = t.Broadcast(peersInfo); err != nil {
+ peers := make(map[types.NodeID]struct{})
+ for ID := range t.peers {
+ peers[ID] = struct{}{}
+ }
+ if err = t.Broadcast(peers, &FixedLatencyModel{}, peersInfo); err != nil {
return
}
// Wait for peers to send 'ready' report.
@@ -851,7 +837,8 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
}
}
// Ack all peers ready to go.
- if err = t.Broadcast(&tcpMessage{Type: "all-ready"}); err != nil {
+ if err = t.Broadcast(peers, &FixedLatencyModel{},
+ &tcpMessage{Type: "all-ready"}); err != nil {
return
}
return
diff --git a/core/test/transport_test.go b/core/test/transport_test.go
index d5c4260..8305ee2 100644
--- a/core/test/transport_test.go
+++ b/core/test/transport_test.go
@@ -87,13 +87,11 @@ type TransportTestSuite struct {
}
func (s *TransportTestSuite) baseTest(
- server *testPeerServer,
- peers map[types.NodeID]*testPeer,
- delay time.Duration) {
-
+ server *testPeerServer, peers map[types.NodeID]*testPeer, delay float64) {
var (
- req = s.Require()
- wg sync.WaitGroup
+ req = s.Require()
+ delayDuration = time.Duration(delay) * time.Millisecond
+ wg sync.WaitGroup
)
// For each peers, do following stuffs:
@@ -150,6 +148,10 @@ func (s *TransportTestSuite) baseTest(
}
wg.Add(len(peers) + 1)
go handleServer(server)
+ peersAsMap := make(map[types.NodeID]struct{})
+ for nID := range peers {
+ peersAsMap[nID] = struct{}{}
+ }
for nID, peer := range peers {
go handlePeer(peer)
// Broadcast a block.
@@ -158,7 +160,8 @@ func (s *TransportTestSuite) baseTest(
Hash: common.NewRandomHash(),
}
peer.myBlockSentTime = time.Now()
- peer.trans.Broadcast(peer.myBlock)
+ peer.trans.Broadcast(
+ peersAsMap, &FixedLatencyModel{Latency: delay}, peer.myBlock)
// Report a block to server.
peer.expectedEchoHash = common.NewRandomHash()
peer.trans.Report(&types.Block{
@@ -187,7 +190,7 @@ func (s *TransportTestSuite) baseTest(
continue
}
req.True(otherPeer.blocksReceiveTime[peer.myBlock.Hash].Sub(
- peer.myBlockSentTime) >= delay)
+ peer.myBlockSentTime) >= delayDuration)
}
}
}
@@ -200,7 +203,6 @@ func (s *TransportTestSuite) TestFake() {
prvKeys = GenerateRandomPrivateKeys(peerCount)
err error
wg sync.WaitGroup
- latency = &FixedLatencyModel{Latency: 300}
server = &testPeerServer{trans: NewFakeTransportServer()}
)
// Setup PeerServer
@@ -212,7 +214,7 @@ func (s *TransportTestSuite) TestFake() {
nID := types.NewNodeID(key.PublicKey())
peer := &testPeer{
nID: nID,
- trans: NewFakeTransportClient(key.PublicKey(), latency),
+ trans: NewFakeTransportClient(key.PublicKey()),
}
peers[nID] = peer
go func() {
@@ -226,7 +228,7 @@ func (s *TransportTestSuite) TestFake() {
server.trans.WaitForPeers(uint32(peerCount))
// Make sure all clients are ready.
wg.Wait()
- s.baseTest(server, peers, 300*time.Millisecond)
+ s.baseTest(server, peers, 300)
req.Nil(server.trans.Close())
for _, peer := range peers {
req.Nil(peer.trans.Close())
@@ -242,7 +244,6 @@ func (s *TransportTestSuite) TestTCPLocal() {
prvKeys = GenerateRandomPrivateKeys(peerCount)
err error
wg sync.WaitGroup
- latency = &FixedLatencyModel{Latency: 300}
serverPort = 8080
serverAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(serverPort))
server = &testPeerServer{
@@ -258,7 +259,7 @@ func (s *TransportTestSuite) TestTCPLocal() {
peer := &testPeer{
nID: nID,
trans: NewTCPTransportClient(
- prvKey.PublicKey(), latency, &testMarshaller{}, true),
+ prvKey.PublicKey(), &testMarshaller{}, true),
}
peers[nID] = peer
go func() {
@@ -274,7 +275,7 @@ func (s *TransportTestSuite) TestTCPLocal() {
// Make sure all clients are ready.
wg.Wait()
- s.baseTest(server, peers, 300*time.Millisecond)
+ s.baseTest(server, peers, 300)
req.Nil(server.trans.Close())
for _, peer := range peers {
req.Nil(peer.trans.Close())
diff --git a/core/test/utils.go b/core/test/utils.go
index 8a14ebf..d85395b 100644
--- a/core/test/utils.go
+++ b/core/test/utils.go
@@ -244,3 +244,15 @@ func LaunchDummyReceiver(
}()
return dummyCancel
}
+
+func getComplementSet(
+ all, set map[types.NodeID]struct{}) map[types.NodeID]struct{} {
+ complement := make(map[types.NodeID]struct{})
+ for nID := range all {
+ if _, exists := set[nID]; exists {
+ continue
+ }
+ complement[nID] = struct{}{}
+ }
+ return complement
+}