aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/network.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-11-15 13:05:50 +0800
committerGitHub <noreply@github.com>2018-11-15 13:05:50 +0800
commitce9da6912a16f064160781bbff8a9762e305bae9 (patch)
treef4de88f0488687e4480cc8461b8a3cbf03a1c85b /core/test/network.go
parent7b68cc8fa60d91a7c6ed2f78dc851da48d1fc258 (diff)
downloaddexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.gz
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.bz2
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.lz
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.xz
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.zst
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.zip
test: fix network (#328)
* Broadcast to set of node instead of broadcasting when attaching cache. * Fix pull blocks
Diffstat (limited to 'core/test/network.go')
-rw-r--r--core/test/network.go151
1 files changed, 90 insertions, 61 deletions
diff --git a/core/test/network.go b/core/test/network.go
index 6aff3d6..e603a45 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -32,11 +32,10 @@ import (
"github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/types"
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
)
const (
- // Count of rounds of notary set cached in network module.
- cachedNotarySetSize = 10
// Count of maximum count of peers to pull votes from.
maxPullingPeerCount = 3
)
@@ -145,13 +144,15 @@ type Network struct {
voteCacheSize int
votePositions []types.Position
stateModule *State
- notarySetLock sync.RWMutex
- notarySets []map[types.NodeID]struct{}
- notarySetMinRound uint64
peers map[types.NodeID]struct{}
unreceivedBlocksLock sync.RWMutex
unreceivedBlocks map[common.Hash]chan<- common.Hash
latencyModel LatencyModel
+ cache *utils.NodeSetCache
+ notarySetCachesLock sync.Mutex
+ notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{}
+ dkgSetCachesLock sync.Mutex
+ dkgSetCaches map[uint64]map[types.NodeID]struct{}
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -169,6 +170,8 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
blockCache: make(map[common.Hash]*types.Block),
unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
latencyModel: latency,
+ notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
+ dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
}
@@ -199,9 +202,8 @@ func (n *Network) PullVotes(pos types.Position) {
// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
- if err := n.trans.Broadcast(vote); err != nil {
- panic(err)
- }
+ n.broadcastToSet(
+ n.getNotarySet(vote.Position.Round, vote.Position.ChainID), vote)
n.addVoteToCache(vote)
}
@@ -209,9 +211,8 @@ func (n *Network) BroadcastVote(vote *types.Vote) {
func (n *Network) BroadcastBlock(block *types.Block) {
// Avoid data race in fake transport.
block = n.cloneForFake(block).(*types.Block)
- if err := n.trans.Broadcast(block); err != nil {
- panic(err)
- }
+ n.broadcastToSet(
+ n.getNotarySet(block.Position.Round, block.Position.ChainID), block)
n.addBlockToCache(block)
}
@@ -257,13 +258,6 @@ func (n *Network) BroadcastRandomnessResult(
}
}
-// broadcast message to all other nodes in the network.
-func (n *Network) broadcast(message interface{}) {
- if err := n.trans.Broadcast(message); err != nil {
- panic(err)
- }
-}
-
// SendDKGPrivateShare implements core.Network interface.
func (n *Network) SendDKGPrivateShare(
recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
@@ -275,17 +269,13 @@ func (n *Network) SendDKGPrivateShare(
// BroadcastDKGPrivateShare implements core.Network interface.
func (n *Network) BroadcastDKGPrivateShare(
prvShare *typesDKG.PrivateShare) {
- if err := n.trans.Broadcast(prvShare); err != nil {
- panic(err)
- }
+ n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare)
}
// BroadcastDKGPartialSignature implements core.Network interface.
func (n *Network) BroadcastDKGPartialSignature(
psig *typesDKG.PartialSignature) {
- if err := n.trans.Broadcast(psig); err != nil {
- panic(err)
- }
+ n.broadcastToSet(n.getDKGSet(psig.Round), psig)
}
// ReceiveChan implements core.Network interface.
@@ -329,6 +319,7 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) {
if ch, exists := n.unreceivedBlocks[v.Hash]; exists {
ch <- v.Hash
}
+ delete(n.unreceivedBlocks, v.Hash)
}()
n.toConsensus <- v
case *types.Vote:
@@ -410,7 +401,7 @@ Loop:
if !ok {
break Loop
}
- n.dispatchMsg(e)
+ go n.dispatchMsg(e)
}
}
}
@@ -454,31 +445,16 @@ func (n *Network) ReceiveChanForNode() <-chan interface{} {
// addStateModule attaches a State instance to this network.
func (n *Network) addStateModule(s *State) {
+ // This variable should be attached before run, no lock to protect it.
n.stateModule = s
}
-// appendRoundSetting updates essential info to network module for each round.
-func (n *Network) appendRoundSetting(
- round uint64, notarySet map[types.NodeID]struct{}) {
- n.notarySetLock.Lock()
- defer n.notarySetLock.Unlock()
- if len(n.notarySets) != 0 {
- // This network module is already initialized, do some check against
- // the inputs.
- if round != n.notarySetMinRound+uint64(len(n.notarySets)) {
- panic(fmt.Errorf(
- "round not increasing when appending round setting: %v", round))
- }
- } else {
- n.notarySetMinRound = round
- }
- n.notarySets = append(n.notarySets, notarySet)
- // Purge cached notary sets.
- if len(n.notarySets) > cachedNotarySetSize {
- n.notarySets = n.notarySets[1:]
- n.notarySetMinRound++
- }
- return
+// AddNodeSetCache attaches an utils.NodeSetCache to this module. Once attached
+// The behavior of Broadcast-X methods would be switched to broadcast to correct
+// set of peers, instead of all peers.
+func (n *Network) AddNodeSetCache(cache *utils.NodeSetCache) {
+ // This variable should be attached before run, no lock to protect it.
+ n.cache = cache
}
func (n *Network) pullBlocksAsync(hashes common.Hashes) {
@@ -490,17 +466,10 @@ func (n *Network) pullBlocksAsync(hashes common.Hashes) {
defer n.unreceivedBlocksLock.Unlock()
for _, h := range hashes {
if _, exists := n.unreceivedBlocks[h]; exists {
- panic(fmt.Errorf("attempting to pull one block multiple times"))
+ continue
}
n.unreceivedBlocks[h] = ch
- }
- }()
- // Clean all unreceived block entry in unrecelivedBlocks field when leaving.
- defer func() {
- n.unreceivedBlocksLock.Lock()
- defer n.unreceivedBlocksLock.Unlock()
- for _, h := range hashes {
- delete(n.unreceivedBlocks, h)
+ notYetReceived[h] = struct{}{}
}
}()
req := &PullRequest{
@@ -511,6 +480,9 @@ func (n *Network) pullBlocksAsync(hashes common.Hashes) {
// Randomly pick peers to send pull requests.
Loop:
for nID := range n.peers {
+ if nID == n.ID {
+ continue
+ }
if err := n.trans.Send(nID, req); err != nil {
// Try next peer.
continue
@@ -547,11 +519,7 @@ func (n *Network) pullVotesAsync(pos types.Position) {
Identity: pos,
}
// Get corresponding notary set.
- notarySet := func() map[types.NodeID]struct{} {
- n.notarySetLock.Lock()
- defer n.notarySetLock.Unlock()
- return n.notarySets[pos.Round-n.notarySetMinRound]
- }()
+ notarySet := n.getNotarySet(pos.Round, pos.ChainID)
// Randomly select one peer from notary set and send a pull request.
sentCount := 0
for nID := range notarySet {
@@ -613,3 +581,64 @@ func (n *Network) cloneForFake(v interface{}) interface{} {
}
return v
}
+
+// getNotarySet gets notary set for that (round, chain) from cache.
+func (n *Network) getNotarySet(
+ round uint64, chain uint32) map[types.NodeID]struct{} {
+ if n.cache == nil {
+ // Default behavior is to broadcast to all peers, which makes it easier
+ // to be used in simple test cases.
+ return n.peers
+ }
+ n.notarySetCachesLock.Lock()
+ defer n.notarySetCachesLock.Unlock()
+ roundSets, exists := n.notarySetCaches[round]
+ if !exists {
+ roundSets = make(map[uint32]map[types.NodeID]struct{})
+ n.notarySetCaches[round] = roundSets
+ }
+ set, exists := roundSets[chain]
+ if !exists {
+ var err error
+ set, err = n.cache.GetNotarySet(round, chain)
+ if err != nil {
+ panic(err)
+ }
+ roundSets[chain] = set
+ }
+ return set
+}
+
+// getDKGSet gets DKG set for that round from cache.
+func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} {
+ if n.cache == nil {
+ // Default behavior is to broadcast to all peers, which makes it easier
+ // to be used in simple test cases.
+ return n.peers
+ }
+ n.dkgSetCachesLock.Lock()
+ defer n.dkgSetCachesLock.Unlock()
+ set, exists := n.dkgSetCaches[round]
+ if !exists {
+ var err error
+ set, err = n.cache.GetDKGSet(round)
+ if err != nil {
+ panic(err)
+ }
+ n.dkgSetCaches[round] = set
+ }
+ return set
+}
+
+// broadcastToSet broadcast a message to a set of nodes.
+func (n *Network) broadcastToSet(
+ set map[types.NodeID]struct{}, msg interface{}) {
+ for nID := range set {
+ if nID == n.ID {
+ continue
+ }
+ if err := n.trans.Send(nID, msg); err != nil {
+ panic(err)
+ }
+ }
+}