diff options
author | Mission Liao <mission.liao@dexon.org> | 2019-01-03 16:00:45 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-03 16:00:45 +0800 |
commit | 09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8 (patch) | |
tree | 12362787be4d3b6bcd0051591a7bc0c60d859878 | |
parent | 5739e74781092ac09d8b3a575cddc71b50beedf4 (diff) | |
download | dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.gz dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.bz2 dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.lz dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.xz dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.tar.zst dexon-consensus-09a00a0580c2fd6e11b17f1793edca5f7bb2f5f8.zip |
simulation: add latency for gossip (#389)
-rw-r--r-- | cmd/dexcon-simulation-with-scheduler/main.go | 6 | ||||
-rw-r--r-- | core/test/fake-transport.go | 28 | ||||
-rw-r--r-- | core/test/interface.go | 3 | ||||
-rw-r--r-- | core/test/network.go | 201 | ||||
-rw-r--r-- | core/test/network_test.go | 28 | ||||
-rw-r--r-- | core/test/tcp-transport.go | 69 | ||||
-rw-r--r-- | core/test/transport_test.go | 29 | ||||
-rw-r--r-- | core/test/utils.go | 12 | ||||
-rw-r--r-- | integration_test/consensus_test.go | 11 | ||||
-rw-r--r-- | simulation/config/config.go | 27 | ||||
-rw-r--r-- | simulation/node.go | 23 | ||||
-rw-r--r-- | simulation/peer-server.go | 9 | ||||
-rw-r--r-- | test_config/test-config-change.toml | 5 | ||||
-rw-r--r-- | test_config/test.toml | 5 |
14 files changed, 244 insertions, 212 deletions
diff --git a/cmd/dexcon-simulation-with-scheduler/main.go b/cmd/dexcon-simulation-with-scheduler/main.go index 1ce9c99..8d59825 100644 --- a/cmd/dexcon-simulation-with-scheduler/main.go +++ b/cmd/dexcon-simulation-with-scheduler/main.go @@ -69,8 +69,8 @@ func main() { } // Setup latencies, nodes. networkLatency := &test.NormalLatencyModel{ - Sigma: cfg.Networking.Sigma, - Mean: cfg.Networking.Mean, + Sigma: cfg.Networking.Direct.Sigma, + Mean: cfg.Networking.Direct.Mean, } proposingLatency := &test.NormalLatencyModel{ Sigma: cfg.Node.Legacy.ProposeIntervalSigma, @@ -85,7 +85,7 @@ func main() { gov, err := test.NewGovernance( test.NewState( pubKeys, - time.Duration(cfg.Networking.Mean)*time.Millisecond, + time.Duration(cfg.Networking.Direct.Mean)*time.Millisecond, &common.NullLogger{}, true, ), core.ConfigRoundShift) diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go index 388d0aa..056c3d5 100644 --- a/core/test/fake-transport.go +++ b/core/test/fake-transport.go @@ -39,7 +39,6 @@ type FakeTransport struct { recvChannel chan *TransportEnvelope serverChannel chan<- *TransportEnvelope peers map[types.NodeID]fakePeerRecord - latency LatencyModel } // NewFakeTransportServer constructs FakeTransport instance for peer server. @@ -51,31 +50,24 @@ func NewFakeTransportServer() TransportServer { } // NewFakeTransportClient constructs FakeTransport instance for peer. -func NewFakeTransportClient( - pubKey crypto.PublicKey, latency LatencyModel) TransportClient { - +func NewFakeTransportClient(pubKey crypto.PublicKey) TransportClient { return &FakeTransport{ peerType: TransportPeer, recvChannel: make(chan *TransportEnvelope, 1000), nID: types.NewNodeID(pubKey), pubKey: pubKey, - latency: latency, } } // Send implements Transport.Send method. func (t *FakeTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { - rec, exists := t.peers[endpoint] if !exists { err = fmt.Errorf("the endpoint does not exists: %v", endpoint) return } go func(ch chan<- *TransportEnvelope) { - if t.latency != nil { - time.Sleep(t.latency.Delay()) - } ch <- &TransportEnvelope{ PeerType: t.peerType, From: t.nID, @@ -98,12 +90,16 @@ func (t *FakeTransport) Report(msg interface{}) (err error) { } // Broadcast implements Transport.Broadcast method. -func (t *FakeTransport) Broadcast(msg interface{}) (err error) { - for k := range t.peers { - if k == t.nID { +func (t *FakeTransport) Broadcast(endpoints map[types.NodeID]struct{}, + latency LatencyModel, msg interface{}) (err error) { + for ID := range endpoints { + if ID == t.nID { continue } - t.Send(k, msg) + go func(nID types.NodeID) { + time.Sleep(latency.Delay()) + t.Send(nID, msg) + }(ID) } return } @@ -177,7 +173,11 @@ func (t *FakeTransport) WaitForPeers(numPeers uint32) (err error) { } } // The collected peer channels are shared for all peers. - if err = t.Broadcast(t.peers); err != nil { + peers := make(map[types.NodeID]struct{}) + for ID := range t.peers { + peers[ID] = struct{}{} + } + if err = t.Broadcast(peers, &FixedLatencyModel{}, t.peers); err != nil { return } return diff --git a/core/test/interface.go b/core/test/interface.go index 1388dc1..d9578de 100644 --- a/core/test/interface.go +++ b/core/test/interface.go @@ -91,7 +91,8 @@ type TransportClient interface { // Transport defines the interface for basic transportation capabilities. type Transport interface { // Broadcast a message to all peers in network. - Broadcast(msg interface{}) error + Broadcast(endpoints map[types.NodeID]struct{}, latency LatencyModel, + msg interface{}) error // Send one message to a peer. Send(endpoint types.NodeID, msg interface{}) error // Close would cleanup allocated resources. diff --git a/core/test/network.go b/core/test/network.go index a79898e..066d36c 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -22,7 +22,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "net" "strconv" "sync" @@ -52,9 +51,12 @@ const ( // NetworkConfig is the configuration for Network module. type NetworkConfig struct { - Type NetworkType - PeerServer string - PeerPort int + Type NetworkType + PeerServer string + PeerPort int + DirectLatency LatencyModel + GossipLatency LatencyModel + Marshaller Marshaller } // PullRequest is a generic request to pull everything (ex. vote, block...). @@ -151,7 +153,6 @@ type Network struct { unreceivedBlocks map[common.Hash]chan<- common.Hash unreceivedRandomnessLock sync.RWMutex unreceivedRandomness map[common.Hash]chan<- common.Hash - latencyModel LatencyModel cache *utils.NodeSetCache notarySetCachesLock sync.Mutex notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{} @@ -161,8 +162,8 @@ type Network struct { // NewNetwork setup network stuffs for nodes, which provides an // implementation of core.Network based on TransportClient. -func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, - marshaller Marshaller, config NetworkConfig) (n *Network) { +func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) ( + n *Network) { // Construct basic network instance. n = &Network{ ID: types.NewNodeID(pubKey), @@ -175,9 +176,10 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, randomnessCache: make(map[common.Hash]*types.BlockRandomnessResult), unreceivedBlocks: make(map[common.Hash]chan<- common.Hash), unreceivedRandomness: make(map[common.Hash]chan<- common.Hash), - latencyModel: latency, - notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}), - dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}), + peers: make(map[types.NodeID]struct{}), + notarySetCaches: make( + map[uint64]map[uint32]map[types.NodeID]struct{}), + dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}), voteCache: make( map[types.Position]map[types.VoteHeader]*types.Vote), } @@ -185,11 +187,11 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, // Construct transport layer. switch config.Type { case NetworkTypeTCPLocal: - n.trans = NewTCPTransportClient(pubKey, latency, marshaller, true) + n.trans = NewTCPTransportClient(pubKey, config.Marshaller, true) case NetworkTypeTCP: - n.trans = NewTCPTransportClient(pubKey, latency, marshaller, false) + n.trans = NewTCPTransportClient(pubKey, config.Marshaller, false) case NetworkTypeFake: - n.trans = NewFakeTransportClient(pubKey, latency) + n.trans = NewFakeTransportClient(pubKey) default: panic(fmt.Errorf("unknown network type: %v", config.Type)) } @@ -213,8 +215,11 @@ func (n *Network) PullRandomness(hashes common.Hashes) { // BroadcastVote implements core.Network interface. func (n *Network) BroadcastVote(vote *types.Vote) { - n.broadcastToSet( - n.getNotarySet(vote.Position.Round, vote.Position.ChainID), vote) + if err := n.trans.Broadcast( + n.getNotarySet(vote.Position.Round, vote.Position.ChainID), + n.config.DirectLatency, vote); err != nil { + panic(err) + } n.addVoteToCache(vote) } @@ -222,28 +227,33 @@ func (n *Network) BroadcastVote(vote *types.Vote) { func (n *Network) BroadcastBlock(block *types.Block) { // Avoid data race in fake transport. block = n.cloneForFake(block).(*types.Block) - n.broadcastToSet( - n.getNotarySet(block.Position.Round, block.Position.ChainID), block) + notarySet := n.getNotarySet(block.Position.Round, block.Position.ChainID) + if err := n.trans.Broadcast( + notarySet, n.config.DirectLatency, block); err != nil { + panic(err) + } + if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet), + n.config.GossipLatency, block); err != nil { + panic(err) + } n.addBlockToCache(block) } // BroadcastAgreementResult implements core.Network interface. func (n *Network) BroadcastAgreementResult( - randRequest *types.AgreementResult) { - n.sentAgreementLock.Lock() - defer n.sentAgreementLock.Unlock() - if _, exist := n.sentAgreement[randRequest.BlockHash]; exist { + result *types.AgreementResult) { + if !n.markAgreementResultAsSent(result.BlockHash) { return } - if len(n.sentAgreement) > 1000 { - // Randomly drop one entry. - for k := range n.sentAgreement { - delete(n.sentAgreement, k) - break - } + // Send to DKG set first. + dkgSet := n.getDKGSet(result.Position.Round) + if err := n.trans.Broadcast( + dkgSet, n.config.DirectLatency, result); err != nil { + panic(err) } - n.sentAgreement[randRequest.BlockHash] = struct{}{} - if err := n.trans.Broadcast(randRequest); err != nil { + // Gossip to other nodes. + if err := n.trans.Broadcast(getComplementSet(n.peers, dkgSet), + n.config.GossipLatency, result); err != nil { panic(err) } } @@ -251,20 +261,19 @@ func (n *Network) BroadcastAgreementResult( // BroadcastRandomnessResult implements core.Network interface. func (n *Network) BroadcastRandomnessResult( randResult *types.BlockRandomnessResult) { - n.sentRandomnessLock.Lock() - defer n.sentRandomnessLock.Unlock() - if _, exist := n.sentRandomness[randResult.BlockHash]; exist { + if !n.markRandomnessResultAsSent(randResult.BlockHash) { return } - if len(n.sentRandomness) > 1000 { - // Randomly drop one entry. - for k := range n.sentRandomness { - delete(n.sentRandomness, k) - break - } + // Send to notary set first. + notarySet := n.getNotarySet( + randResult.Position.Round, randResult.Position.ChainID) + if err := n.trans.Broadcast( + notarySet, n.config.DirectLatency, randResult); err != nil { + panic(err) } - n.sentRandomness[randResult.BlockHash] = struct{}{} - if err := n.trans.Broadcast(randResult); err != nil { + // Gossip to other nodes. + if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet), + n.config.GossipLatency, randResult); err != nil { panic(err) } n.addRandomnessToCache(randResult) @@ -273,21 +282,25 @@ func (n *Network) BroadcastRandomnessResult( // SendDKGPrivateShare implements core.Network interface. func (n *Network) SendDKGPrivateShare( recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) { - if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil { - panic(err) - } + n.send(types.NewNodeID(recv), prvShare) } // BroadcastDKGPrivateShare implements core.Network interface. func (n *Network) BroadcastDKGPrivateShare( prvShare *typesDKG.PrivateShare) { - n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare) + if err := n.trans.Broadcast(n.getDKGSet(prvShare.Round), + n.config.DirectLatency, prvShare); err != nil { + panic(err) + } } // BroadcastDKGPartialSignature implements core.Network interface. func (n *Network) BroadcastDKGPartialSignature( psig *typesDKG.PartialSignature) { - n.broadcastToSet(n.getDKGSet(psig.Round), psig) + if err := n.trans.Broadcast( + n.getDKGSet(psig.Round), n.config.DirectLatency, psig); err != nil { + panic(err) + } } // ReceiveChan implements core.Network interface. @@ -312,7 +325,6 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) { return } peerKeys := n.trans.Peers() - n.peers = make(map[types.NodeID]struct{}) for _, k := range peerKeys { n.peers[types.NewNodeID(k)] = struct{}{} } @@ -374,9 +386,7 @@ func (n *Network) handlePullRequest(req *PullRequest) { break All default: } - if err := n.trans.Send(req.Requester, b); err != nil { - log.Println("unable to send block", req.Requester, err) - } + n.send(req.Requester, b) } }() case "vote": @@ -386,9 +396,7 @@ func (n *Network) handlePullRequest(req *PullRequest) { defer n.voteCacheLock.Unlock() if votes, exists := n.voteCache[pos]; exists { for _, v := range votes { - if err := n.trans.Send(req.Requester, v); err != nil { - log.Println("unable to send vote", req.Requester, err) - } + n.send(req.Requester, v) } } }() @@ -408,9 +416,7 @@ func (n *Network) handlePullRequest(req *PullRequest) { break All default: } - if err := n.trans.Send(req.Requester, r); err != nil { - log.Println("unable to send randomness", req.Requester, err) - } + n.send(req.Requester, r) } }() default: @@ -457,24 +463,16 @@ func (n *Network) Report(msg interface{}) error { return n.trans.Report(msg) } +// Broadcast a message to all peers. +func (n *Network) Broadcast(msg interface{}) error { + return n.trans.Broadcast(n.peers, &FixedLatencyModel{}, msg) +} + // Peers exports 'Peers' method of Transport. func (n *Network) Peers() []crypto.PublicKey { return n.trans.Peers() } -// Broadcast exports 'Broadcast' method of Transport, and would panic when -// error. -func (n *Network) Broadcast(msg interface{}) { - if err := n.trans.Broadcast(msg); err != nil { - panic(err) - } -} - -// Send exports 'Send' method of Transport. -func (n *Network) Send(nodeID types.NodeID, msg interface{}) error { - return n.trans.Send(nodeID, msg) -} - // ReceiveChanForNode returns a channel for messages not handled by // core.Consensus. func (n *Network) ReceiveChanForNode() <-chan interface{} { @@ -521,14 +519,11 @@ Loop: if nID == n.ID { continue } - if err := n.trans.Send(nID, req); err != nil { - // Try next peer. - continue - } + n.send(nID, req) select { case <-n.ctx.Done(): break Loop - case <-time.After(2 * n.latencyModel.Delay()): + case <-time.After(2 * n.config.DirectLatency.Delay()): // Consume everything in the notification channel. for { select { @@ -561,10 +556,7 @@ func (n *Network) pullVotesAsync(pos types.Position) { // Randomly select one peer from notary set and send a pull request. sentCount := 0 for nID := range notarySet { - if err := n.trans.Send(nID, req); err != nil { - // Try next peer. - continue - } + n.send(nID, req) sentCount++ if sentCount >= maxPullingPeerCount { break @@ -598,14 +590,11 @@ Loop: if nID == n.ID { continue } - if err := n.trans.Send(nID, req); err != nil { - // Try next peer. - continue - } + n.send(nID, req) select { case <-n.ctx.Done(): break Loop - case <-time.After(2 * n.latencyModel.Delay()): + case <-time.After(2 * n.config.DirectLatency.Delay()): // Consume everything in the notification channel. for { select { @@ -673,6 +662,40 @@ func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) { n.randomnessCache[rand.BlockHash] = rand } +func (n *Network) markAgreementResultAsSent(blockHash common.Hash) bool { + n.sentAgreementLock.Lock() + defer n.sentAgreementLock.Unlock() + if _, exist := n.sentAgreement[blockHash]; exist { + return false + } + if len(n.sentAgreement) > 1000 { + // Randomly drop one entry. + for k := range n.sentAgreement { + delete(n.sentAgreement, k) + break + } + } + n.sentAgreement[blockHash] = struct{}{} + return true +} + +func (n *Network) markRandomnessResultAsSent(blockHash common.Hash) bool { + n.sentRandomnessLock.Lock() + defer n.sentRandomnessLock.Unlock() + if _, exist := n.sentRandomness[blockHash]; exist { + return false + } + if len(n.sentRandomness) > 1000 { + // Randomly drop one entry. + for k := range n.sentRandomness { + delete(n.sentRandomness, k) + break + } + } + n.sentRandomness[blockHash] = struct{}{} + return true +} + func (n *Network) cloneForFake(v interface{}) interface{} { if n.config.Type != NetworkTypeFake { return v @@ -735,15 +758,11 @@ func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} { return set } -// broadcastToSet broadcast a message to a set of nodes. -func (n *Network) broadcastToSet( - set map[types.NodeID]struct{}, msg interface{}) { - for nID := range set { - if nID == n.ID { - continue - } - if err := n.trans.Send(nID, msg); err != nil { +func (n *Network) send(endpoint types.NodeID, msg interface{}) { + go func() { + time.Sleep(n.config.DirectLatency.Delay()) + if err := n.trans.Send(endpoint, msg); err != nil { panic(err) } - } + }() } diff --git a/core/test/network_test.go b/core/test/network_test.go index e05dec5..d040a16 100644 --- a/core/test/network_test.go +++ b/core/test/network_test.go @@ -48,11 +48,11 @@ func (s *NetworkTestSuite) setupNetworks( // Setup several network modules. networks := make(map[types.NodeID]*Network) for _, key := range pubKeys { - n := NewNetwork( - key, - &FixedLatencyModel{}, - NewDefaultMarshaller(nil), - NetworkConfig{Type: NetworkTypeFake}) + n := NewNetwork(key, NetworkConfig{ + Type: NetworkTypeFake, + DirectLatency: &FixedLatencyModel{}, + GossipLatency: &FixedLatencyModel{}, + Marshaller: NewDefaultMarshaller(nil)}) networks[n.ID] = n wg.Add(1) go func() { @@ -297,21 +297,13 @@ func (s *NetworkTestSuite) TestBroadcastToSet() { // Try broadcasting with datum from round 0, and make sure only node belongs // to that set receiving the message. nerd.BroadcastVote(&types.Vote{}) - req.IsType(<-notaryNode.ReceiveChan(), &types.Vote{}) - nerd.BroadcastBlock(&types.Block{}) - req.IsType(<-notaryNode.ReceiveChan(), &types.Block{}) + req.IsType(&types.Vote{}, <-notaryNode.ReceiveChan()) nerd.BroadcastDKGPrivateShare(&typesDKG.PrivateShare{}) - req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PrivateShare{}) + req.IsType(&typesDKG.PrivateShare{}, <-dkgNode.ReceiveChan()) nerd.BroadcastDKGPartialSignature(&typesDKG.PartialSignature{}) - req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PartialSignature{}) - // There should be no remaining message in each node. - for _, n := range networks { - select { - case <-n.ReceiveChan(): - req.False(true) - default: - } - } + req.IsType(&typesDKG.PartialSignature{}, <-dkgNode.ReceiveChan()) + nerd.BroadcastBlock(&types.Block{}) + req.IsType(&types.Block{}, <-notaryNode.ReceiveChan()) } func TestNetwork(t *testing.T) { diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index b16bbcb..e04ba53 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -109,20 +109,14 @@ type TCPTransport struct { recvChannel chan *TransportEnvelope ctx context.Context cancel context.CancelFunc - latency LatencyModel marshaller Marshaller throughputRecords []ThroughputRecord throughputLock sync.Mutex } // NewTCPTransport constructs an TCPTransport instance. -func NewTCPTransport( - peerType TransportPeerType, - pubKey crypto.PublicKey, - latency LatencyModel, - marshaller Marshaller, - localPort int) *TCPTransport { - +func NewTCPTransport(peerType TransportPeerType, pubKey crypto.PublicKey, + marshaller Marshaller, localPort int) *TCPTransport { ctx, cancel := context.WithCancel(context.Background()) return &TCPTransport{ peerType: peerType, @@ -133,7 +127,6 @@ func NewTCPTransport( ctx: ctx, cancel: cancel, localPort: localPort, - latency: latency, marshaller: marshaller, throughputRecords: []ThroughputRecord{}, } @@ -202,6 +195,14 @@ func (t *TCPTransport) clientHandshake(conn net.Conn) ( return } +func (t *TCPTransport) send( + endpoint types.NodeID, msg interface{}, payload []byte) { + t.peersLock.RLock() + defer t.peersLock.RUnlock() + t.handleThroughputData(msg, payload) + t.peers[endpoint].sendChannel <- payload +} + // Send implements Transport.Send method. func (t *TCPTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { @@ -210,38 +211,25 @@ func (t *TCPTransport) Send( if err != nil { return } - go func() { - if t.latency != nil { - time.Sleep(t.latency.Delay()) - } - t.peersLock.RLock() - defer t.peersLock.RUnlock() - t.handleThroughputData(msg, payload) - t.peers[endpoint].sendChannel <- payload - }() + go t.send(endpoint, msg, payload) return } // Broadcast implements Transport.Broadcast method. -func (t *TCPTransport) Broadcast(msg interface{}) (err error) { +func (t *TCPTransport) Broadcast(endpoints map[types.NodeID]struct{}, + latency LatencyModel, msg interface{}) (err error) { payload, err := t.marshalMessage(msg) if err != nil { return } - t.peersLock.RLock() - defer t.peersLock.RUnlock() - - for nID, rec := range t.peers { + for nID := range endpoints { if nID == t.nID { continue } - go func(ch chan<- []byte) { - if t.latency != nil { - time.Sleep(t.latency.Delay()) - } - t.handleThroughputData(msg, payload) - ch <- payload - }(rec.sendChannel) + go func(ID types.NodeID) { + time.Sleep(latency.Delay()) + t.send(ID, msg, payload) + }(nID) } return } @@ -585,14 +573,12 @@ type TCPTransportClient struct { // NewTCPTransportClient constructs a TCPTransportClient instance. func NewTCPTransportClient( pubKey crypto.PublicKey, - latency LatencyModel, marshaller Marshaller, local bool) *TCPTransportClient { return &TCPTransportClient{ - TCPTransport: *NewTCPTransport( - TransportPeer, pubKey, latency, marshaller, 8080), - local: local, + TCPTransport: *NewTCPTransport(TransportPeer, pubKey, marshaller, 8080), + local: local, } } @@ -776,11 +762,7 @@ func NewTCPTransportServer( // NOTE: the assumption here is the node ID of peers // won't be zero. TCPTransport: *NewTCPTransport( - TransportPeerServer, - prvKey.PublicKey(), - nil, - marshaller, - serverPort), + TransportPeerServer, prvKey.PublicKey(), marshaller, serverPort), } } @@ -828,7 +810,11 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { if err = t.buildConnectionsToPeers(); err != nil { return } - if err = t.Broadcast(peersInfo); err != nil { + peers := make(map[types.NodeID]struct{}) + for ID := range t.peers { + peers[ID] = struct{}{} + } + if err = t.Broadcast(peers, &FixedLatencyModel{}, peersInfo); err != nil { return } // Wait for peers to send 'ready' report. @@ -851,7 +837,8 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { } } // Ack all peers ready to go. - if err = t.Broadcast(&tcpMessage{Type: "all-ready"}); err != nil { + if err = t.Broadcast(peers, &FixedLatencyModel{}, + &tcpMessage{Type: "all-ready"}); err != nil { return } return diff --git a/core/test/transport_test.go b/core/test/transport_test.go index d5c4260..8305ee2 100644 --- a/core/test/transport_test.go +++ b/core/test/transport_test.go @@ -87,13 +87,11 @@ type TransportTestSuite struct { } func (s *TransportTestSuite) baseTest( - server *testPeerServer, - peers map[types.NodeID]*testPeer, - delay time.Duration) { - + server *testPeerServer, peers map[types.NodeID]*testPeer, delay float64) { var ( - req = s.Require() - wg sync.WaitGroup + req = s.Require() + delayDuration = time.Duration(delay) * time.Millisecond + wg sync.WaitGroup ) // For each peers, do following stuffs: @@ -150,6 +148,10 @@ func (s *TransportTestSuite) baseTest( } wg.Add(len(peers) + 1) go handleServer(server) + peersAsMap := make(map[types.NodeID]struct{}) + for nID := range peers { + peersAsMap[nID] = struct{}{} + } for nID, peer := range peers { go handlePeer(peer) // Broadcast a block. @@ -158,7 +160,8 @@ func (s *TransportTestSuite) baseTest( Hash: common.NewRandomHash(), } peer.myBlockSentTime = time.Now() - peer.trans.Broadcast(peer.myBlock) + peer.trans.Broadcast( + peersAsMap, &FixedLatencyModel{Latency: delay}, peer.myBlock) // Report a block to server. peer.expectedEchoHash = common.NewRandomHash() peer.trans.Report(&types.Block{ @@ -187,7 +190,7 @@ func (s *TransportTestSuite) baseTest( continue } req.True(otherPeer.blocksReceiveTime[peer.myBlock.Hash].Sub( - peer.myBlockSentTime) >= delay) + peer.myBlockSentTime) >= delayDuration) } } } @@ -200,7 +203,6 @@ func (s *TransportTestSuite) TestFake() { prvKeys = GenerateRandomPrivateKeys(peerCount) err error wg sync.WaitGroup - latency = &FixedLatencyModel{Latency: 300} server = &testPeerServer{trans: NewFakeTransportServer()} ) // Setup PeerServer @@ -212,7 +214,7 @@ func (s *TransportTestSuite) TestFake() { nID := types.NewNodeID(key.PublicKey()) peer := &testPeer{ nID: nID, - trans: NewFakeTransportClient(key.PublicKey(), latency), + trans: NewFakeTransportClient(key.PublicKey()), } peers[nID] = peer go func() { @@ -226,7 +228,7 @@ func (s *TransportTestSuite) TestFake() { server.trans.WaitForPeers(uint32(peerCount)) // Make sure all clients are ready. wg.Wait() - s.baseTest(server, peers, 300*time.Millisecond) + s.baseTest(server, peers, 300) req.Nil(server.trans.Close()) for _, peer := range peers { req.Nil(peer.trans.Close()) @@ -242,7 +244,6 @@ func (s *TransportTestSuite) TestTCPLocal() { prvKeys = GenerateRandomPrivateKeys(peerCount) err error wg sync.WaitGroup - latency = &FixedLatencyModel{Latency: 300} serverPort = 8080 serverAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(serverPort)) server = &testPeerServer{ @@ -258,7 +259,7 @@ func (s *TransportTestSuite) TestTCPLocal() { peer := &testPeer{ nID: nID, trans: NewTCPTransportClient( - prvKey.PublicKey(), latency, &testMarshaller{}, true), + prvKey.PublicKey(), &testMarshaller{}, true), } peers[nID] = peer go func() { @@ -274,7 +275,7 @@ func (s *TransportTestSuite) TestTCPLocal() { // Make sure all clients are ready. wg.Wait() - s.baseTest(server, peers, 300*time.Millisecond) + s.baseTest(server, peers, 300) req.Nil(server.trans.Close()) for _, peer := range peers { req.Nil(peer.trans.Close()) diff --git a/core/test/utils.go b/core/test/utils.go index 8a14ebf..d85395b 100644 --- a/core/test/utils.go +++ b/core/test/utils.go @@ -244,3 +244,15 @@ func LaunchDummyReceiver( }() return dummyCancel } + +func getComplementSet( + all, set map[types.NodeID]struct{}) map[types.NodeID]struct{} { + complement := make(map[types.NodeID]struct{}) + for nID := range all { + if _, exists := set[nID]; exists { + continue + } + complement[nID] = struct{}{} + } + return complement +} diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go index 5c3f181..fa62bda 100644 --- a/integration_test/consensus_test.go +++ b/integration_test/consensus_test.go @@ -68,11 +68,12 @@ func (s *ConsensusTestSuite) setupNodes( dbInst, err := db.NewMemBackedDB() s.Require().NoError(err) // Prepare essential modules: app, gov, db. - networkModule := test.NewNetwork( - k.PublicKey(), - &test.FixedLatencyModel{}, - test.NewDefaultMarshaller(nil), - test.NetworkConfig{Type: test.NetworkTypeFake}) + networkModule := test.NewNetwork(k.PublicKey(), test.NetworkConfig{ + Type: test.NetworkTypeFake, + DirectLatency: &test.FixedLatencyModel{}, + GossipLatency: &test.FixedLatencyModel{}, + Marshaller: test.NewDefaultMarshaller(nil)}, + ) gov := seedGov.Clone() gov.SwitchToRemoteMode(networkModule) gov.NotifyRoundHeight(0, 0) diff --git a/simulation/config/config.go b/simulation/config/config.go index 797145c..d22f4f1 100644 --- a/simulation/config/config.go +++ b/simulation/config/config.go @@ -56,14 +56,18 @@ type Node struct { Changes []Change } +// LatencyModel for ths simulation. +type LatencyModel struct { + Mean float64 + Sigma float64 +} + // Networking config. type Networking struct { Type test.NetworkType PeerServer string - - Mean float64 - Sigma float64 - LossRateValue float64 + Direct LatencyModel + Gossip LatencyModel } // Scheduler Settings. @@ -129,11 +133,16 @@ func GenerateDefault(path string) error { MaxBlock: math.MaxUint64, }, Networking: Networking{ - Type: test.NetworkTypeTCPLocal, - PeerServer: "127.0.0.1", - Mean: 100, - Sigma: 10, - LossRateValue: 0, + Type: test.NetworkTypeTCPLocal, + PeerServer: "127.0.0.1", + Direct: LatencyModel{ + Mean: 100, + Sigma: 10, + }, + Gossip: LatencyModel{ + Mean: 300, + Sigma: 25, + }, }, Scheduler: Scheduler{ WorkerNum: 2, diff --git a/simulation/node.go b/simulation/node.go index dc4a725..026db66 100644 --- a/simulation/node.go +++ b/simulation/node.go @@ -70,18 +70,19 @@ type node struct { func newNode(prvKey crypto.PrivateKey, logger common.Logger, cfg config.Config) *node { pubKey := prvKey.PublicKey() - netModule := test.NewNetwork( - pubKey, - &test.NormalLatencyModel{ - Mean: cfg.Networking.Mean, - Sigma: cfg.Networking.Sigma, + netModule := test.NewNetwork(pubKey, test.NetworkConfig{ + Type: cfg.Networking.Type, + PeerServer: cfg.Networking.PeerServer, + PeerPort: peerPort, + DirectLatency: &test.NormalLatencyModel{ + Mean: cfg.Networking.Direct.Mean, + Sigma: cfg.Networking.Direct.Sigma, }, - test.NewDefaultMarshaller(&jsonMarshaller{}), - test.NetworkConfig{ - Type: cfg.Networking.Type, - PeerServer: cfg.Networking.PeerServer, - PeerPort: peerPort, - }) + GossipLatency: &test.NormalLatencyModel{ + Mean: cfg.Networking.Gossip.Mean, + Sigma: cfg.Networking.Gossip.Sigma, + }, + Marshaller: test.NewDefaultMarshaller(&jsonMarshaller{})}) id := types.NewNodeID(pubKey) dbInst, err := db.NewMemBackedDB(id.String() + ".db") if err != nil { diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 14a825a..69ed029 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -90,7 +90,8 @@ func (p *PeerServer) handleBlockList(id types.NodeID, blocks *BlockList) { } p.verifiedLen += uint64(length) if p.verifiedLen >= p.cfg.Node.MaxBlock { - if err := p.trans.Broadcast(ntfShutdown); err != nil { + if err := p.trans.Broadcast( + p.peers, &test.FixedLatencyModel{}, ntfShutdown); err != nil { panic(err) } } @@ -199,7 +200,8 @@ func (p *PeerServer) Run() { } // Cache peers' info. for _, pubKey := range p.trans.Peers() { - p.peers[types.NewNodeID(pubKey)] = struct{}{} + nID := types.NewNodeID(pubKey) + p.peers[nID] = struct{}{} } // Pick a mater node to execute pending config changes. for nID := range p.peers { @@ -225,7 +227,8 @@ func (p *PeerServer) Run() { break } } - if err := p.trans.Broadcast(ntfReady); err != nil { + if err := p.trans.Broadcast( + p.peers, &test.FixedLatencyModel{}, ntfReady); err != nil { panic(err) } log.Println("Simulation is ready to go with", len(p.peers), "nodes") diff --git a/test_config/test-config-change.toml b/test_config/test-config-change.toml index 5950b30..396e627 100644 --- a/test_config/test-config-change.toml +++ b/test_config/test-config-change.toml @@ -38,9 +38,12 @@ propose_interval_sigma = 5e+01 [networking] type = "fake" peer_server = "127.0.0.1" +[networking.direct] mean = 1e+01 sigma = 1e+01 -loss_rate_value = 0e+00 +[networking.gossip] +mean = 3e+01 +sigma = 3e+01 [scheduler] worker_num = 2 diff --git a/test_config/test.toml b/test_config/test.toml index 0261d8d..8c6e342 100644 --- a/test_config/test.toml +++ b/test_config/test.toml @@ -23,9 +23,12 @@ propose_interval_sigma = 5e+01 [networking] type = "fake" peer_server = "127.0.0.1" +[networking.direct] mean = 1e+01 sigma = 1e+01 -loss_rate_value = 0e+00 +[networking.gossip] +mean = 3e+01 +sigma = 3e+01 [scheduler] worker_num = 2 |