aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-11-15 13:05:50 +0800
committerGitHub <noreply@github.com>2018-11-15 13:05:50 +0800
commitce9da6912a16f064160781bbff8a9762e305bae9 (patch)
treef4de88f0488687e4480cc8461b8a3cbf03a1c85b
parent7b68cc8fa60d91a7c6ed2f78dc851da48d1fc258 (diff)
downloaddexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.gz
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.bz2
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.lz
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.xz
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.tar.zst
dexon-consensus-ce9da6912a16f064160781bbff8a9762e305bae9.zip
test: fix network (#328)
* Broadcast to set of node instead of broadcasting when attaching cache. * Fix pull blocks
-rw-r--r--core/test/governance.go10
-rw-r--r--core/test/network.go151
-rw-r--r--core/test/network_test.go96
-rw-r--r--core/utils.go5
-rw-r--r--integration_test/consensus_test.go4
5 files changed, 184 insertions, 82 deletions
diff --git a/core/test/governance.go b/core/test/governance.go
index 58b773a..f96e9e7 100644
--- a/core/test/governance.go
+++ b/core/test/governance.go
@@ -237,16 +237,6 @@ func (g *Governance) CatchUpWithRound(round uint64) {
config, nodeSet := g.stateModule.Snapshot()
g.configs = append(g.configs, config)
g.nodeSets = append(g.nodeSets, nodeSet)
- if g.networkModule == nil {
- continue
- }
- // Notify network module for new notary set.
- round := uint64(len(g.configs)) - 1
- notarySet := make(map[types.NodeID]struct{})
- for _, k := range g.nodeSets[round] {
- notarySet[types.NewNodeID(k)] = struct{}{}
- }
- g.networkModule.appendRoundSetting(round, notarySet)
}
}
diff --git a/core/test/network.go b/core/test/network.go
index 6aff3d6..e603a45 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -32,11 +32,10 @@ import (
"github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/types"
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
)
const (
- // Count of rounds of notary set cached in network module.
- cachedNotarySetSize = 10
// Count of maximum count of peers to pull votes from.
maxPullingPeerCount = 3
)
@@ -145,13 +144,15 @@ type Network struct {
voteCacheSize int
votePositions []types.Position
stateModule *State
- notarySetLock sync.RWMutex
- notarySets []map[types.NodeID]struct{}
- notarySetMinRound uint64
peers map[types.NodeID]struct{}
unreceivedBlocksLock sync.RWMutex
unreceivedBlocks map[common.Hash]chan<- common.Hash
latencyModel LatencyModel
+ cache *utils.NodeSetCache
+ notarySetCachesLock sync.Mutex
+ notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{}
+ dkgSetCachesLock sync.Mutex
+ dkgSetCaches map[uint64]map[types.NodeID]struct{}
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -169,6 +170,8 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
blockCache: make(map[common.Hash]*types.Block),
unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
latencyModel: latency,
+ notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
+ dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
}
@@ -199,9 +202,8 @@ func (n *Network) PullVotes(pos types.Position) {
// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
- if err := n.trans.Broadcast(vote); err != nil {
- panic(err)
- }
+ n.broadcastToSet(
+ n.getNotarySet(vote.Position.Round, vote.Position.ChainID), vote)
n.addVoteToCache(vote)
}
@@ -209,9 +211,8 @@ func (n *Network) BroadcastVote(vote *types.Vote) {
func (n *Network) BroadcastBlock(block *types.Block) {
// Avoid data race in fake transport.
block = n.cloneForFake(block).(*types.Block)
- if err := n.trans.Broadcast(block); err != nil {
- panic(err)
- }
+ n.broadcastToSet(
+ n.getNotarySet(block.Position.Round, block.Position.ChainID), block)
n.addBlockToCache(block)
}
@@ -257,13 +258,6 @@ func (n *Network) BroadcastRandomnessResult(
}
}
-// broadcast message to all other nodes in the network.
-func (n *Network) broadcast(message interface{}) {
- if err := n.trans.Broadcast(message); err != nil {
- panic(err)
- }
-}
-
// SendDKGPrivateShare implements core.Network interface.
func (n *Network) SendDKGPrivateShare(
recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
@@ -275,17 +269,13 @@ func (n *Network) SendDKGPrivateShare(
// BroadcastDKGPrivateShare implements core.Network interface.
func (n *Network) BroadcastDKGPrivateShare(
prvShare *typesDKG.PrivateShare) {
- if err := n.trans.Broadcast(prvShare); err != nil {
- panic(err)
- }
+ n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare)
}
// BroadcastDKGPartialSignature implements core.Network interface.
func (n *Network) BroadcastDKGPartialSignature(
psig *typesDKG.PartialSignature) {
- if err := n.trans.Broadcast(psig); err != nil {
- panic(err)
- }
+ n.broadcastToSet(n.getDKGSet(psig.Round), psig)
}
// ReceiveChan implements core.Network interface.
@@ -329,6 +319,7 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) {
if ch, exists := n.unreceivedBlocks[v.Hash]; exists {
ch <- v.Hash
}
+ delete(n.unreceivedBlocks, v.Hash)
}()
n.toConsensus <- v
case *types.Vote:
@@ -410,7 +401,7 @@ Loop:
if !ok {
break Loop
}
- n.dispatchMsg(e)
+ go n.dispatchMsg(e)
}
}
}
@@ -454,31 +445,16 @@ func (n *Network) ReceiveChanForNode() <-chan interface{} {
// addStateModule attaches a State instance to this network.
func (n *Network) addStateModule(s *State) {
+ // This variable should be attached before run, no lock to protect it.
n.stateModule = s
}
-// appendRoundSetting updates essential info to network module for each round.
-func (n *Network) appendRoundSetting(
- round uint64, notarySet map[types.NodeID]struct{}) {
- n.notarySetLock.Lock()
- defer n.notarySetLock.Unlock()
- if len(n.notarySets) != 0 {
- // This network module is already initialized, do some check against
- // the inputs.
- if round != n.notarySetMinRound+uint64(len(n.notarySets)) {
- panic(fmt.Errorf(
- "round not increasing when appending round setting: %v", round))
- }
- } else {
- n.notarySetMinRound = round
- }
- n.notarySets = append(n.notarySets, notarySet)
- // Purge cached notary sets.
- if len(n.notarySets) > cachedNotarySetSize {
- n.notarySets = n.notarySets[1:]
- n.notarySetMinRound++
- }
- return
+// AddNodeSetCache attaches an utils.NodeSetCache to this module. Once attached
+// The behavior of Broadcast-X methods would be switched to broadcast to correct
+// set of peers, instead of all peers.
+func (n *Network) AddNodeSetCache(cache *utils.NodeSetCache) {
+ // This variable should be attached before run, no lock to protect it.
+ n.cache = cache
}
func (n *Network) pullBlocksAsync(hashes common.Hashes) {
@@ -490,17 +466,10 @@ func (n *Network) pullBlocksAsync(hashes common.Hashes) {
defer n.unreceivedBlocksLock.Unlock()
for _, h := range hashes {
if _, exists := n.unreceivedBlocks[h]; exists {
- panic(fmt.Errorf("attempting to pull one block multiple times"))
+ continue
}
n.unreceivedBlocks[h] = ch
- }
- }()
- // Clean all unreceived block entry in unrecelivedBlocks field when leaving.
- defer func() {
- n.unreceivedBlocksLock.Lock()
- defer n.unreceivedBlocksLock.Unlock()
- for _, h := range hashes {
- delete(n.unreceivedBlocks, h)
+ notYetReceived[h] = struct{}{}
}
}()
req := &PullRequest{
@@ -511,6 +480,9 @@ func (n *Network) pullBlocksAsync(hashes common.Hashes) {
// Randomly pick peers to send pull requests.
Loop:
for nID := range n.peers {
+ if nID == n.ID {
+ continue
+ }
if err := n.trans.Send(nID, req); err != nil {
// Try next peer.
continue
@@ -547,11 +519,7 @@ func (n *Network) pullVotesAsync(pos types.Position) {
Identity: pos,
}
// Get corresponding notary set.
- notarySet := func() map[types.NodeID]struct{} {
- n.notarySetLock.Lock()
- defer n.notarySetLock.Unlock()
- return n.notarySets[pos.Round-n.notarySetMinRound]
- }()
+ notarySet := n.getNotarySet(pos.Round, pos.ChainID)
// Randomly select one peer from notary set and send a pull request.
sentCount := 0
for nID := range notarySet {
@@ -613,3 +581,64 @@ func (n *Network) cloneForFake(v interface{}) interface{} {
}
return v
}
+
+// getNotarySet gets notary set for that (round, chain) from cache.
+func (n *Network) getNotarySet(
+ round uint64, chain uint32) map[types.NodeID]struct{} {
+ if n.cache == nil {
+ // Default behavior is to broadcast to all peers, which makes it easier
+ // to be used in simple test cases.
+ return n.peers
+ }
+ n.notarySetCachesLock.Lock()
+ defer n.notarySetCachesLock.Unlock()
+ roundSets, exists := n.notarySetCaches[round]
+ if !exists {
+ roundSets = make(map[uint32]map[types.NodeID]struct{})
+ n.notarySetCaches[round] = roundSets
+ }
+ set, exists := roundSets[chain]
+ if !exists {
+ var err error
+ set, err = n.cache.GetNotarySet(round, chain)
+ if err != nil {
+ panic(err)
+ }
+ roundSets[chain] = set
+ }
+ return set
+}
+
+// getDKGSet gets DKG set for that round from cache.
+func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} {
+ if n.cache == nil {
+ // Default behavior is to broadcast to all peers, which makes it easier
+ // to be used in simple test cases.
+ return n.peers
+ }
+ n.dkgSetCachesLock.Lock()
+ defer n.dkgSetCachesLock.Unlock()
+ set, exists := n.dkgSetCaches[round]
+ if !exists {
+ var err error
+ set, err = n.cache.GetDKGSet(round)
+ if err != nil {
+ panic(err)
+ }
+ n.dkgSetCaches[round] = set
+ }
+ return set
+}
+
+// broadcastToSet broadcast a message to a set of nodes.
+func (n *Network) broadcastToSet(
+ set map[types.NodeID]struct{}, msg interface{}) {
+ for nID := range set {
+ if nID == n.ID {
+ continue
+ }
+ if err := n.trans.Send(nID, msg); err != nil {
+ panic(err)
+ }
+ }
+}
diff --git a/core/test/network_test.go b/core/test/network_test.go
index fe11ed1..1f9ec6f 100644
--- a/core/test/network_test.go
+++ b/core/test/network_test.go
@@ -26,7 +26,10 @@ import (
"time"
"github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/types"
+ typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
"github.com/stretchr/testify/suite"
)
@@ -35,7 +38,7 @@ type NetworkTestSuite struct {
}
func (s *NetworkTestSuite) setupNetworks(
- peerCount int) map[types.NodeID]*Network {
+ pubKeys []crypto.PublicKey) map[types.NodeID]*Network {
var (
server = NewFakeTransportServer()
wg sync.WaitGroup
@@ -43,8 +46,6 @@ func (s *NetworkTestSuite) setupNetworks(
serverChannel, err := server.Host()
s.Require().NoError(err)
// Setup several network modules.
- _, pubKeys, err := NewKeys(peerCount)
- s.Require().NoError(err)
networks := make(map[types.NodeID]*Network)
for _, key := range pubKeys {
n := NewNetwork(
@@ -60,7 +61,7 @@ func (s *NetworkTestSuite) setupNetworks(
go n.Run()
}()
}
- s.Require().NoError(server.WaitForPeers(uint32(peerCount)))
+ s.Require().NoError(server.WaitForPeers(uint32(len(pubKeys))))
wg.Wait()
return networks
}
@@ -112,7 +113,9 @@ func (s *NetworkTestSuite) TestPullBlocks() {
peerCount = 10
req = s.Require()
)
- networks := s.setupNetworks(peerCount)
+ _, pubKeys, err := NewKeys(peerCount)
+ req.NoError(err)
+ networks := s.setupNetworks(pubKeys)
// Generate several random hashes.
hashes := common.Hashes{}
for range networks {
@@ -132,6 +135,8 @@ func (s *NetworkTestSuite) TestPullBlocks() {
req.NoError(master.trans.Send(n.ID, &types.Block{Hash: h}))
}
}
+ // Make sure each node receive their blocks.
+ time.Sleep(1 * time.Second)
// Initiate a pull request from network 0 by removing corresponding hash in
// hashes.
master.PullBlocks(hashes)
@@ -170,7 +175,9 @@ func (s *NetworkTestSuite) TestPullVotes() {
voteTestCount = 15
req = s.Require()
)
- networks := s.setupNetworks(peerCount)
+ _, pubKeys, err := NewKeys(peerCount)
+ req.NoError(err)
+ networks := s.setupNetworks(pubKeys)
// Randomly pick one network instance as master.
var master *Network
for _, master = range networks {
@@ -202,10 +209,6 @@ func (s *NetworkTestSuite) TestPullVotes() {
notarySets[v.Position.Round][n.ID] = struct{}{}
}
}
- // Let master knows all notary sets.
- for i, notarySet := range notarySets {
- master.appendRoundSetting(uint64(i), notarySet)
- }
// Randomly generate votes set to test.
votesToTest := make(map[types.VoteHeader]struct{})
for len(votesToTest) < voteTestCount {
@@ -237,6 +240,79 @@ func (s *NetworkTestSuite) TestPullVotes() {
}
}
+func (s *NetworkTestSuite) TestBroadcastToSet() {
+ // Make sure when a network module attached to a utils.NodeSetCache,
+ // These function would broadcast to correct nodes, not all peers.
+ // - BroadcastVote, notary set.
+ // - BroadcastBlock, notary set.
+ // - BroadcastDKGPrivateShare, DKG set.
+ // - BroadcastDKGPartialSignature, DKG set.
+ var (
+ req = s.Require()
+ peerCount = 5
+ )
+ _, pubKeys, err := NewKeys(peerCount)
+ req.NoError(err)
+ gov, err := NewGovernance(pubKeys, time.Second, 2)
+ req.NoError(err)
+ req.NoError(gov.State().RequestChange(StateChangeDKGSetSize, uint32(1)))
+ req.NoError(gov.State().RequestChange(StateChangeNotarySetSize, uint32(1)))
+ networks := s.setupNetworks(pubKeys)
+ cache := utils.NewNodeSetCache(gov)
+ // Cache required set of nodeIDs.
+ dkgSet, err := cache.GetDKGSet(0)
+ req.NoError(err)
+ req.Len(dkgSet, 1)
+ notarySet, err := cache.GetNotarySet(0, 0)
+ req.NoError(err)
+ req.Len(notarySet, 1)
+ var (
+ // Some node don't belong to any set.
+ nerd *Network
+ dkgNode, notaryNode *Network
+ )
+ for nID, n := range networks {
+ if _, exists := dkgSet[nID]; exists {
+ continue
+ }
+ if _, exists := notarySet[nID]; exists {
+ continue
+ }
+ nerd = n
+ break
+ }
+ for nID := range dkgSet {
+ dkgNode = networks[nID]
+ break
+ }
+ for nID := range notarySet {
+ notaryNode = networks[nID]
+ break
+ }
+ req.NotNil(nerd)
+ req.NotNil(dkgNode)
+ req.NotNil(notaryNode)
+ nerd.AddNodeSetCache(cache)
+ // Try broadcasting with datum from round 0, and make sure only node belongs
+ // to that set receiving the message.
+ nerd.BroadcastVote(&types.Vote{})
+ req.IsType(<-notaryNode.ReceiveChan(), &types.Vote{})
+ nerd.BroadcastBlock(&types.Block{})
+ req.IsType(<-notaryNode.ReceiveChan(), &types.Block{})
+ nerd.BroadcastDKGPrivateShare(&typesDKG.PrivateShare{})
+ req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PrivateShare{})
+ nerd.BroadcastDKGPartialSignature(&typesDKG.PartialSignature{})
+ req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PartialSignature{})
+ // There should be no remaining message in each node.
+ for _, n := range networks {
+ select {
+ case <-n.ReceiveChan():
+ req.False(true)
+ default:
+ }
+ }
+}
+
func TestNetwork(t *testing.T) {
suite.Run(t, new(NetworkTestSuite))
}
diff --git a/core/utils.go b/core/utils.go
index 6b9ce63..a3da340 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -27,8 +27,13 @@ import (
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
)
+// NodeSetCache is type alias to avoid fullnode compile error when moving
+// it to core/utils package.
+type NodeSetCache = utils.NodeSetCache
+
var (
debug = false
// ErrEmptyTimestamps would be reported if Block.timestamps is empty.
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 356efb1..6bc6c4b 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -28,6 +28,7 @@ import (
"github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/test"
"github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
"github.com/stretchr/testify/suite"
)
@@ -69,6 +70,8 @@ func (s *ConsensusTestSuite) setupNodes(
test.NetworkConfig{Type: test.NetworkTypeFake})
gov := seedGov.Clone()
gov.SwitchToRemoteMode(networkModule)
+ gov.NotifyRoundHeight(0, 0)
+ networkModule.AddNodeSetCache(utils.NewNodeSetCache(gov))
app := test.NewApp(gov.State())
// Now is the consensus module.
con := core.NewConsensus(
@@ -114,7 +117,6 @@ func (s *ConsensusTestSuite) TestSimple() {
req.NoError(err)
req.NoError(seedGov.State().RequestChange(
test.StateChangeRoundInterval, 25*time.Second))
- seedGov.NotifyRoundHeight(0, 0)
// A short round interval.
nodes := s.setupNodes(dMoment, prvKeys, seedGov)
for _, n := range nodes {