diff options
author | Mission Liao <mission.liao@dexon.org> | 2018-11-15 13:05:50 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-15 13:05:50 +0800 |
commit | ce9da6912a16f064160781bbff8a9762e305bae9 (patch) | |
tree | f4de88f0488687e4480cc8461b8a3cbf03a1c85b /core/test/network.go | |
parent | 7b68cc8fa60d91a7c6ed2f78dc851da48d1fc258 (diff) | |
download | dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.gz dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.bz2 dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.lz dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.xz dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.zst dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.zip |
test: fix network (#328)
* Broadcast to set of node instead of broadcasting when attaching cache.
* Fix pull blocks
Diffstat (limited to 'core/test/network.go')
-rw-r--r-- | core/test/network.go | 151 |
1 files changed, 90 insertions, 61 deletions
diff --git a/core/test/network.go b/core/test/network.go index 6aff3d6..e603a45 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -32,11 +32,10 @@ import ( "github.com/dexon-foundation/dexon-consensus/core/crypto" "github.com/dexon-foundation/dexon-consensus/core/types" typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" + "github.com/dexon-foundation/dexon-consensus/core/utils" ) const ( - // Count of rounds of notary set cached in network module. - cachedNotarySetSize = 10 // Count of maximum count of peers to pull votes from. maxPullingPeerCount = 3 ) @@ -145,13 +144,15 @@ type Network struct { voteCacheSize int votePositions []types.Position stateModule *State - notarySetLock sync.RWMutex - notarySets []map[types.NodeID]struct{} - notarySetMinRound uint64 peers map[types.NodeID]struct{} unreceivedBlocksLock sync.RWMutex unreceivedBlocks map[common.Hash]chan<- common.Hash latencyModel LatencyModel + cache *utils.NodeSetCache + notarySetCachesLock sync.Mutex + notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{} + dkgSetCachesLock sync.Mutex + dkgSetCaches map[uint64]map[types.NodeID]struct{} } // NewNetwork setup network stuffs for nodes, which provides an @@ -169,6 +170,8 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, blockCache: make(map[common.Hash]*types.Block), unreceivedBlocks: make(map[common.Hash]chan<- common.Hash), latencyModel: latency, + notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}), + dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}), voteCache: make( map[types.Position]map[types.VoteHeader]*types.Vote), } @@ -199,9 +202,8 @@ func (n *Network) PullVotes(pos types.Position) { // BroadcastVote implements core.Network interface. func (n *Network) BroadcastVote(vote *types.Vote) { - if err := n.trans.Broadcast(vote); err != nil { - panic(err) - } + n.broadcastToSet( + n.getNotarySet(vote.Position.Round, vote.Position.ChainID), vote) n.addVoteToCache(vote) } @@ -209,9 +211,8 @@ func (n *Network) BroadcastVote(vote *types.Vote) { func (n *Network) BroadcastBlock(block *types.Block) { // Avoid data race in fake transport. block = n.cloneForFake(block).(*types.Block) - if err := n.trans.Broadcast(block); err != nil { - panic(err) - } + n.broadcastToSet( + n.getNotarySet(block.Position.Round, block.Position.ChainID), block) n.addBlockToCache(block) } @@ -257,13 +258,6 @@ func (n *Network) BroadcastRandomnessResult( } } -// broadcast message to all other nodes in the network. -func (n *Network) broadcast(message interface{}) { - if err := n.trans.Broadcast(message); err != nil { - panic(err) - } -} - // SendDKGPrivateShare implements core.Network interface. func (n *Network) SendDKGPrivateShare( recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) { @@ -275,17 +269,13 @@ func (n *Network) SendDKGPrivateShare( // BroadcastDKGPrivateShare implements core.Network interface. func (n *Network) BroadcastDKGPrivateShare( prvShare *typesDKG.PrivateShare) { - if err := n.trans.Broadcast(prvShare); err != nil { - panic(err) - } + n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare) } // BroadcastDKGPartialSignature implements core.Network interface. func (n *Network) BroadcastDKGPartialSignature( psig *typesDKG.PartialSignature) { - if err := n.trans.Broadcast(psig); err != nil { - panic(err) - } + n.broadcastToSet(n.getDKGSet(psig.Round), psig) } // ReceiveChan implements core.Network interface. @@ -329,6 +319,7 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) { if ch, exists := n.unreceivedBlocks[v.Hash]; exists { ch <- v.Hash } + delete(n.unreceivedBlocks, v.Hash) }() n.toConsensus <- v case *types.Vote: @@ -410,7 +401,7 @@ Loop: if !ok { break Loop } - n.dispatchMsg(e) + go n.dispatchMsg(e) } } } @@ -454,31 +445,16 @@ func (n *Network) ReceiveChanForNode() <-chan interface{} { // addStateModule attaches a State instance to this network. func (n *Network) addStateModule(s *State) { + // This variable should be attached before run, no lock to protect it. n.stateModule = s } -// appendRoundSetting updates essential info to network module for each round. -func (n *Network) appendRoundSetting( - round uint64, notarySet map[types.NodeID]struct{}) { - n.notarySetLock.Lock() - defer n.notarySetLock.Unlock() - if len(n.notarySets) != 0 { - // This network module is already initialized, do some check against - // the inputs. - if round != n.notarySetMinRound+uint64(len(n.notarySets)) { - panic(fmt.Errorf( - "round not increasing when appending round setting: %v", round)) - } - } else { - n.notarySetMinRound = round - } - n.notarySets = append(n.notarySets, notarySet) - // Purge cached notary sets. - if len(n.notarySets) > cachedNotarySetSize { - n.notarySets = n.notarySets[1:] - n.notarySetMinRound++ - } - return +// AddNodeSetCache attaches an utils.NodeSetCache to this module. Once attached +// The behavior of Broadcast-X methods would be switched to broadcast to correct +// set of peers, instead of all peers. +func (n *Network) AddNodeSetCache(cache *utils.NodeSetCache) { + // This variable should be attached before run, no lock to protect it. + n.cache = cache } func (n *Network) pullBlocksAsync(hashes common.Hashes) { @@ -490,17 +466,10 @@ func (n *Network) pullBlocksAsync(hashes common.Hashes) { defer n.unreceivedBlocksLock.Unlock() for _, h := range hashes { if _, exists := n.unreceivedBlocks[h]; exists { - panic(fmt.Errorf("attempting to pull one block multiple times")) + continue } n.unreceivedBlocks[h] = ch - } - }() - // Clean all unreceived block entry in unrecelivedBlocks field when leaving. - defer func() { - n.unreceivedBlocksLock.Lock() - defer n.unreceivedBlocksLock.Unlock() - for _, h := range hashes { - delete(n.unreceivedBlocks, h) + notYetReceived[h] = struct{}{} } }() req := &PullRequest{ @@ -511,6 +480,9 @@ func (n *Network) pullBlocksAsync(hashes common.Hashes) { // Randomly pick peers to send pull requests. Loop: for nID := range n.peers { + if nID == n.ID { + continue + } if err := n.trans.Send(nID, req); err != nil { // Try next peer. continue @@ -547,11 +519,7 @@ func (n *Network) pullVotesAsync(pos types.Position) { Identity: pos, } // Get corresponding notary set. - notarySet := func() map[types.NodeID]struct{} { - n.notarySetLock.Lock() - defer n.notarySetLock.Unlock() - return n.notarySets[pos.Round-n.notarySetMinRound] - }() + notarySet := n.getNotarySet(pos.Round, pos.ChainID) // Randomly select one peer from notary set and send a pull request. sentCount := 0 for nID := range notarySet { @@ -613,3 +581,64 @@ func (n *Network) cloneForFake(v interface{}) interface{} { } return v } + +// getNotarySet gets notary set for that (round, chain) from cache. +func (n *Network) getNotarySet( + round uint64, chain uint32) map[types.NodeID]struct{} { + if n.cache == nil { + // Default behavior is to broadcast to all peers, which makes it easier + // to be used in simple test cases. + return n.peers + } + n.notarySetCachesLock.Lock() + defer n.notarySetCachesLock.Unlock() + roundSets, exists := n.notarySetCaches[round] + if !exists { + roundSets = make(map[uint32]map[types.NodeID]struct{}) + n.notarySetCaches[round] = roundSets + } + set, exists := roundSets[chain] + if !exists { + var err error + set, err = n.cache.GetNotarySet(round, chain) + if err != nil { + panic(err) + } + roundSets[chain] = set + } + return set +} + +// getDKGSet gets DKG set for that round from cache. +func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} { + if n.cache == nil { + // Default behavior is to broadcast to all peers, which makes it easier + // to be used in simple test cases. + return n.peers + } + n.dkgSetCachesLock.Lock() + defer n.dkgSetCachesLock.Unlock() + set, exists := n.dkgSetCaches[round] + if !exists { + var err error + set, err = n.cache.GetDKGSet(round) + if err != nil { + panic(err) + } + n.dkgSetCaches[round] = set + } + return set +} + +// broadcastToSet broadcast a message to a set of nodes. +func (n *Network) broadcastToSet( + set map[types.NodeID]struct{}, msg interface{}) { + for nID := range set { + if nID == n.ID { + continue + } + if err := n.trans.Send(nID, msg); err != nil { + panic(err) + } + } +} |