author    | Mission Liao <mission.liao@dexon.org> | 2018-11-15 13:05:50 +0800
committer | GitHub <noreply@github.com>            | 2018-11-15 13:05:50 +0800
commit    | ce9da6912a16f064160781bbff8a9762e305bae9
tree      | f4de88f0488687e4480cc8461b8a3cbf03a1c85b
parent    | 7b68cc8fa60d91a7c6ed2f78dc851da48d1fc258
test: fix network (#328)
* Broadcast votes, blocks and DKG messages to the proper set of nodes, instead of to all peers, once a node set cache is attached (see the wiring sketch below).
* Fix block pulling: pulling the same block more than once no longer panics, and pending entries are cleaned up once the block arrives.
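The switch is driven entirely by whether a utils.NodeSetCache has been attached to the test Network; without one, getNotarySet/getDKGSet fall back to all peers, so existing tests keep their old behavior. Below is a minimal sketch (not part of this commit) of how a test would wire the cache, using only helpers that appear in this diff (NewKeys, NewGovernance, utils.NewNodeSetCache, AddNodeSetCache); the attachCache helper and its package are illustrative only.

```go
package example

import (
	"time"

	"github.com/dexon-foundation/dexon-consensus/core/crypto"
	"github.com/dexon-foundation/dexon-consensus/core/test"
	"github.com/dexon-foundation/dexon-consensus/core/utils"
)

// attachCache is a hypothetical helper: given an already-constructed (and not
// yet running) *test.Network and the public keys of all peers, it attaches a
// node set cache so that BroadcastVote/BroadcastBlock reach only the notary
// set and the DKG broadcasts reach only the DKG set.
func attachCache(n *test.Network, pubKeys []crypto.PublicKey) error {
	// Arguments mirror those used by TestBroadcastToSet in this patch.
	gov, err := test.NewGovernance(pubKeys, time.Second, 2)
	if err != nil {
		return err
	}
	// Attach before Run(); the cache field is not protected by a lock.
	n.AddNodeSetCache(utils.NewNodeSetCache(gov))
	return nil
}
```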
-rw-r--r-- | core/test/governance.go            |  10
-rw-r--r-- | core/test/network.go               | 151
-rw-r--r-- | core/test/network_test.go          |  96
-rw-r--r-- | core/utils.go                      |   5
-rw-r--r-- | integration_test/consensus_test.go |   4
5 files changed, 184 insertions, 82 deletions
diff --git a/core/test/governance.go b/core/test/governance.go
index 58b773a..f96e9e7 100644
--- a/core/test/governance.go
+++ b/core/test/governance.go
@@ -237,16 +237,6 @@ func (g *Governance) CatchUpWithRound(round uint64) {
 		config, nodeSet := g.stateModule.Snapshot()
 		g.configs = append(g.configs, config)
 		g.nodeSets = append(g.nodeSets, nodeSet)
-		if g.networkModule == nil {
-			continue
-		}
-		// Notify network module for new notary set.
-		round := uint64(len(g.configs)) - 1
-		notarySet := make(map[types.NodeID]struct{})
-		for _, k := range g.nodeSets[round] {
-			notarySet[types.NewNodeID(k)] = struct{}{}
-		}
-		g.networkModule.appendRoundSetting(round, notarySet)
 	}
 }
diff --git a/core/test/network.go b/core/test/network.go
index 6aff3d6..e603a45 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -32,11 +32,10 @@ import (
 	"github.com/dexon-foundation/dexon-consensus/core/crypto"
 	"github.com/dexon-foundation/dexon-consensus/core/types"
 	typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
+	"github.com/dexon-foundation/dexon-consensus/core/utils"
 )
 
 const (
-	// Count of rounds of notary set cached in network module.
-	cachedNotarySetSize = 10
 	// Count of maximum count of peers to pull votes from.
 	maxPullingPeerCount = 3
 )
@@ -145,13 +144,15 @@ type Network struct {
 	voteCacheSize        int
 	votePositions        []types.Position
 	stateModule          *State
-	notarySetLock        sync.RWMutex
-	notarySets           []map[types.NodeID]struct{}
-	notarySetMinRound    uint64
 	peers                map[types.NodeID]struct{}
 	unreceivedBlocksLock sync.RWMutex
 	unreceivedBlocks     map[common.Hash]chan<- common.Hash
 	latencyModel         LatencyModel
+	cache                *utils.NodeSetCache
+	notarySetCachesLock  sync.Mutex
+	notarySetCaches      map[uint64]map[uint32]map[types.NodeID]struct{}
+	dkgSetCachesLock     sync.Mutex
+	dkgSetCaches         map[uint64]map[types.NodeID]struct{}
 }
 
 // NewNetwork setup network stuffs for nodes, which provides an
@@ -169,6 +170,8 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
 		blockCache:       make(map[common.Hash]*types.Block),
 		unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
 		latencyModel:     latency,
+		notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
+		dkgSetCaches:    make(map[uint64]map[types.NodeID]struct{}),
 		voteCache: make(
 			map[types.Position]map[types.VoteHeader]*types.Vote),
 	}
@@ -199,9 +202,8 @@ func (n *Network) PullVotes(pos types.Position) {
 
 // BroadcastVote implements core.Network interface.
 func (n *Network) BroadcastVote(vote *types.Vote) {
-	if err := n.trans.Broadcast(vote); err != nil {
-		panic(err)
-	}
+	n.broadcastToSet(
+		n.getNotarySet(vote.Position.Round, vote.Position.ChainID), vote)
 	n.addVoteToCache(vote)
 }
@@ -209,9 +211,8 @@ func (n *Network) BroadcastVote(vote *types.Vote) {
 
 func (n *Network) BroadcastBlock(block *types.Block) {
 	// Avoid data race in fake transport.
 	block = n.cloneForFake(block).(*types.Block)
-	if err := n.trans.Broadcast(block); err != nil {
-		panic(err)
-	}
+	n.broadcastToSet(
+		n.getNotarySet(block.Position.Round, block.Position.ChainID), block)
 	n.addBlockToCache(block)
 }
@@ -257,13 +258,6 @@ func (n *Network) BroadcastRandomnessResult(
 	}
 }
 
-// broadcast message to all other nodes in the network.
-func (n *Network) broadcast(message interface{}) {
-	if err := n.trans.Broadcast(message); err != nil {
-		panic(err)
-	}
-}
-
 // SendDKGPrivateShare implements core.Network interface.
 func (n *Network) SendDKGPrivateShare(
 	recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
@@ -275,17 +269,13 @@ func (n *Network) SendDKGPrivateShare(
 
 // BroadcastDKGPrivateShare implements core.Network interface.
 func (n *Network) BroadcastDKGPrivateShare(
 	prvShare *typesDKG.PrivateShare) {
-	if err := n.trans.Broadcast(prvShare); err != nil {
-		panic(err)
-	}
+	n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare)
 }
 
 // BroadcastDKGPartialSignature implements core.Network interface.
 func (n *Network) BroadcastDKGPartialSignature(
 	psig *typesDKG.PartialSignature) {
-	if err := n.trans.Broadcast(psig); err != nil {
-		panic(err)
-	}
+	n.broadcastToSet(n.getDKGSet(psig.Round), psig)
 }
 
 // ReceiveChan implements core.Network interface.
@@ -329,6 +319,7 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) {
 			if ch, exists := n.unreceivedBlocks[v.Hash]; exists {
 				ch <- v.Hash
 			}
+			delete(n.unreceivedBlocks, v.Hash)
 		}()
 		n.toConsensus <- v
 	case *types.Vote:
@@ -410,7 +401,7 @@ Loop:
 			if !ok {
 				break Loop
 			}
-			n.dispatchMsg(e)
+			go n.dispatchMsg(e)
 		}
 	}
 }
@@ -454,31 +445,16 @@ func (n *Network) ReceiveChanForNode() <-chan interface{} {
 
 // addStateModule attaches a State instance to this network.
 func (n *Network) addStateModule(s *State) {
+	// This variable should be attached before run, no lock to protect it.
 	n.stateModule = s
 }
 
-// appendRoundSetting updates essential info to network module for each round.
-func (n *Network) appendRoundSetting(
-	round uint64, notarySet map[types.NodeID]struct{}) {
-	n.notarySetLock.Lock()
-	defer n.notarySetLock.Unlock()
-	if len(n.notarySets) != 0 {
-		// This network module is already initialized, do some check against
-		// the inputs.
-		if round != n.notarySetMinRound+uint64(len(n.notarySets)) {
-			panic(fmt.Errorf(
-				"round not increasing when appending round setting: %v", round))
-		}
-	} else {
-		n.notarySetMinRound = round
-	}
-	n.notarySets = append(n.notarySets, notarySet)
-	// Purge cached notary sets.
-	if len(n.notarySets) > cachedNotarySetSize {
-		n.notarySets = n.notarySets[1:]
-		n.notarySetMinRound++
-	}
-	return
+// AddNodeSetCache attaches an utils.NodeSetCache to this module. Once attached
+// The behavior of Broadcast-X methods would be switched to broadcast to correct
+// set of peers, instead of all peers.
+func (n *Network) AddNodeSetCache(cache *utils.NodeSetCache) {
+	// This variable should be attached before run, no lock to protect it.
+	n.cache = cache
 }
 
 func (n *Network) pullBlocksAsync(hashes common.Hashes) {
@@ -490,17 +466,10 @@ func (n *Network) pullBlocksAsync(hashes common.Hashes) {
 		defer n.unreceivedBlocksLock.Unlock()
 		for _, h := range hashes {
 			if _, exists := n.unreceivedBlocks[h]; exists {
-				panic(fmt.Errorf("attempting to pull one block multiple times"))
+				continue
 			}
 			n.unreceivedBlocks[h] = ch
-		}
-	}()
-	// Clean all unreceived block entry in unrecelivedBlocks field when leaving.
-	defer func() {
-		n.unreceivedBlocksLock.Lock()
-		defer n.unreceivedBlocksLock.Unlock()
-		for _, h := range hashes {
-			delete(n.unreceivedBlocks, h)
+			notYetReceived[h] = struct{}{}
 		}
 	}()
 	req := &PullRequest{
@@ -511,6 +480,9 @@ func (n *Network) pullBlocksAsync(hashes common.Hashes) {
 	// Randomly pick peers to send pull requests.
Loop:
 	for nID := range n.peers {
+		if nID == n.ID {
+			continue
+		}
 		if err := n.trans.Send(nID, req); err != nil {
 			// Try next peer.
 			continue
 		}
@@ -547,11 +519,7 @@ func (n *Network) pullVotesAsync(pos types.Position) {
 		Identity: pos,
 	}
 	// Get corresponding notary set.
-	notarySet := func() map[types.NodeID]struct{} {
-		n.notarySetLock.Lock()
-		defer n.notarySetLock.Unlock()
-		return n.notarySets[pos.Round-n.notarySetMinRound]
-	}()
+	notarySet := n.getNotarySet(pos.Round, pos.ChainID)
 	// Randomly select one peer from notary set and send a pull request.
 	sentCount := 0
 	for nID := range notarySet {
@@ -613,3 +581,64 @@ func (n *Network) cloneForFake(v interface{}) interface{} {
 	}
 	return v
 }
+
+// getNotarySet gets notary set for that (round, chain) from cache.
+func (n *Network) getNotarySet(
+	round uint64, chain uint32) map[types.NodeID]struct{} {
+	if n.cache == nil {
+		// Default behavior is to broadcast to all peers, which makes it easier
+		// to be used in simple test cases.
+		return n.peers
+	}
+	n.notarySetCachesLock.Lock()
+	defer n.notarySetCachesLock.Unlock()
+	roundSets, exists := n.notarySetCaches[round]
+	if !exists {
+		roundSets = make(map[uint32]map[types.NodeID]struct{})
+		n.notarySetCaches[round] = roundSets
+	}
+	set, exists := roundSets[chain]
+	if !exists {
+		var err error
+		set, err = n.cache.GetNotarySet(round, chain)
+		if err != nil {
+			panic(err)
+		}
+		roundSets[chain] = set
+	}
+	return set
+}
+
+// getDKGSet gets DKG set for that round from cache.
+func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} {
+	if n.cache == nil {
+		// Default behavior is to broadcast to all peers, which makes it easier
+		// to be used in simple test cases.
+		return n.peers
+	}
+	n.dkgSetCachesLock.Lock()
+	defer n.dkgSetCachesLock.Unlock()
+	set, exists := n.dkgSetCaches[round]
+	if !exists {
+		var err error
+		set, err = n.cache.GetDKGSet(round)
+		if err != nil {
+			panic(err)
+		}
+		n.dkgSetCaches[round] = set
+	}
+	return set
+}
+
+// broadcastToSet broadcast a message to a set of nodes.
+func (n *Network) broadcastToSet(
+	set map[types.NodeID]struct{}, msg interface{}) {
+	for nID := range set {
+		if nID == n.ID {
+			continue
+		}
+		if err := n.trans.Send(nID, msg); err != nil {
+			panic(err)
+		}
+	}
+}
diff --git a/core/test/network_test.go b/core/test/network_test.go
index fe11ed1..1f9ec6f 100644
--- a/core/test/network_test.go
+++ b/core/test/network_test.go
@@ -26,7 +26,10 @@ import (
 	"time"
 
 	"github.com/dexon-foundation/dexon-consensus/common"
+	"github.com/dexon-foundation/dexon-consensus/core/crypto"
 	"github.com/dexon-foundation/dexon-consensus/core/types"
+	typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
+	"github.com/dexon-foundation/dexon-consensus/core/utils"
 	"github.com/stretchr/testify/suite"
 )
 
@@ -35,7 +38,7 @@ type NetworkTestSuite struct {
 }
 
 func (s *NetworkTestSuite) setupNetworks(
-	peerCount int) map[types.NodeID]*Network {
+	pubKeys []crypto.PublicKey) map[types.NodeID]*Network {
 	var (
 		server = NewFakeTransportServer()
 		wg     sync.WaitGroup
 	)
 	serverChannel, err := server.Host()
 	s.Require().NoError(err)
 	// Setup several network modules.
-	_, pubKeys, err := NewKeys(peerCount)
-	s.Require().NoError(err)
 	networks := make(map[types.NodeID]*Network)
 	for _, key := range pubKeys {
 		n := NewNetwork(
@@ -60,7 +61,7 @@ func (s *NetworkTestSuite) setupNetworks(
 			go n.Run()
 		}()
 	}
-	s.Require().NoError(server.WaitForPeers(uint32(peerCount)))
+	s.Require().NoError(server.WaitForPeers(uint32(len(pubKeys))))
 	wg.Wait()
 	return networks
 }
@@ -112,7 +113,9 @@ func (s *NetworkTestSuite) TestPullBlocks() {
 		peerCount = 10
 		req       = s.Require()
 	)
-	networks := s.setupNetworks(peerCount)
+	_, pubKeys, err := NewKeys(peerCount)
+	req.NoError(err)
+	networks := s.setupNetworks(pubKeys)
 	// Generate several random hashes.
 	hashes := common.Hashes{}
 	for range networks {
@@ -132,6 +135,8 @@ func (s *NetworkTestSuite) TestPullBlocks() {
 			req.NoError(master.trans.Send(n.ID, &types.Block{Hash: h}))
 		}
 	}
+	// Make sure each node receive their blocks.
+	time.Sleep(1 * time.Second)
 	// Initiate a pull request from network 0 by removing corresponding hash in
 	// hashes.
 	master.PullBlocks(hashes)
@@ -170,7 +175,9 @@ func (s *NetworkTestSuite) TestPullVotes() {
 		voteTestCount = 15
 		req           = s.Require()
 	)
-	networks := s.setupNetworks(peerCount)
+	_, pubKeys, err := NewKeys(peerCount)
+	req.NoError(err)
+	networks := s.setupNetworks(pubKeys)
 	// Randomly pick one network instance as master.
 	var master *Network
 	for _, master = range networks {
@@ -202,10 +209,6 @@ func (s *NetworkTestSuite) TestPullVotes() {
 			notarySets[v.Position.Round][n.ID] = struct{}{}
 		}
 	}
-	// Let master knows all notary sets.
-	for i, notarySet := range notarySets {
-		master.appendRoundSetting(uint64(i), notarySet)
-	}
 	// Randomly generate votes set to test.
 	votesToTest := make(map[types.VoteHeader]struct{})
 	for len(votesToTest) < voteTestCount {
@@ -237,6 +240,79 @@ func (s *NetworkTestSuite) TestPullVotes() {
 	}
 }
 
+func (s *NetworkTestSuite) TestBroadcastToSet() {
+	// Make sure when a network module attached to a utils.NodeSetCache,
+	// These function would broadcast to correct nodes, not all peers.
+	//  - BroadcastVote, notary set.
+	//  - BroadcastBlock, notary set.
+	//  - BroadcastDKGPrivateShare, DKG set.
+	//  - BroadcastDKGPartialSignature, DKG set.
+	var (
+		req       = s.Require()
+		peerCount = 5
+	)
+	_, pubKeys, err := NewKeys(peerCount)
+	req.NoError(err)
+	gov, err := NewGovernance(pubKeys, time.Second, 2)
+	req.NoError(err)
+	req.NoError(gov.State().RequestChange(StateChangeDKGSetSize, uint32(1)))
+	req.NoError(gov.State().RequestChange(StateChangeNotarySetSize, uint32(1)))
+	networks := s.setupNetworks(pubKeys)
+	cache := utils.NewNodeSetCache(gov)
+	// Cache required set of nodeIDs.
+	dkgSet, err := cache.GetDKGSet(0)
+	req.NoError(err)
+	req.Len(dkgSet, 1)
+	notarySet, err := cache.GetNotarySet(0, 0)
+	req.NoError(err)
+	req.Len(notarySet, 1)
+	var (
+		// Some node don't belong to any set.
+		nerd                *Network
+		dkgNode, notaryNode *Network
+	)
+	for nID, n := range networks {
+		if _, exists := dkgSet[nID]; exists {
+			continue
+		}
+		if _, exists := notarySet[nID]; exists {
+			continue
+		}
+		nerd = n
+		break
+	}
+	for nID := range dkgSet {
+		dkgNode = networks[nID]
+		break
+	}
+	for nID := range notarySet {
+		notaryNode = networks[nID]
+		break
+	}
+	req.NotNil(nerd)
+	req.NotNil(dkgNode)
+	req.NotNil(notaryNode)
+	nerd.AddNodeSetCache(cache)
+	// Try broadcasting with datum from round 0, and make sure only node belongs
+	// to that set receiving the message.
+	nerd.BroadcastVote(&types.Vote{})
+	req.IsType(<-notaryNode.ReceiveChan(), &types.Vote{})
+	nerd.BroadcastBlock(&types.Block{})
+	req.IsType(<-notaryNode.ReceiveChan(), &types.Block{})
+	nerd.BroadcastDKGPrivateShare(&typesDKG.PrivateShare{})
+	req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PrivateShare{})
+	nerd.BroadcastDKGPartialSignature(&typesDKG.PartialSignature{})
+	req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PartialSignature{})
+	// There should be no remaining message in each node.
+	for _, n := range networks {
+		select {
+		case <-n.ReceiveChan():
+			req.False(true)
+		default:
+		}
+	}
+}
+
 func TestNetwork(t *testing.T) {
 	suite.Run(t, new(NetworkTestSuite))
 }
diff --git a/core/utils.go b/core/utils.go
index 6b9ce63..a3da340 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -27,8 +27,13 @@ import (
 	"github.com/dexon-foundation/dexon-consensus/common"
 	"github.com/dexon-foundation/dexon-consensus/core/crypto"
 	"github.com/dexon-foundation/dexon-consensus/core/types"
+	"github.com/dexon-foundation/dexon-consensus/core/utils"
 )
 
+// NodeSetCache is type alias to avoid fullnode compile error when moving
+// it to core/utils package.
+type NodeSetCache = utils.NodeSetCache
+
 var (
 	debug = false
 	// ErrEmptyTimestamps would be reported if Block.timestamps is empty.
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 356efb1..6bc6c4b 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/dexon-foundation/dexon-consensus/core/crypto"
 	"github.com/dexon-foundation/dexon-consensus/core/test"
 	"github.com/dexon-foundation/dexon-consensus/core/types"
+	"github.com/dexon-foundation/dexon-consensus/core/utils"
 	"github.com/stretchr/testify/suite"
 )
 
@@ -69,6 +70,8 @@ func (s *ConsensusTestSuite) setupNodes(
 			test.NetworkConfig{Type: test.NetworkTypeFake})
 		gov := seedGov.Clone()
 		gov.SwitchToRemoteMode(networkModule)
+		gov.NotifyRoundHeight(0, 0)
+		networkModule.AddNodeSetCache(utils.NewNodeSetCache(gov))
 		app := test.NewApp(gov.State())
 		// Now is the consensus module.
 		con := core.NewConsensus(
@@ -114,7 +117,6 @@ func (s *ConsensusTestSuite) TestSimple() {
 	req.NoError(err)
 	req.NoError(seedGov.State().RequestChange(
 		test.StateChangeRoundInterval, 25*time.Second))
-	seedGov.NotifyRoundHeight(0, 0)
 	// A short round interval.
 	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
 	for _, n := range nodes {