aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-11-09 22:15:28 +0800
committerGitHub <noreply@github.com>2018-11-09 22:15:28 +0800
commit81372e5746fedf0ad691ab628096b7caefbe3008 (patch)
tree1b705364be7ecda7c322230deb441820b8858613 /core
parenta7e3c046693d9df860422e71a14c8659b36afe6a (diff)
downloaddexon-consensus-81372e5746fedf0ad691ab628096b7caefbe3008.tar
dexon-consensus-81372e5746fedf0ad691ab628096b7caefbe3008.tar.gz
dexon-consensus-81372e5746fedf0ad691ab628096b7caefbe3008.tar.bz2
dexon-consensus-81372e5746fedf0ad691ab628096b7caefbe3008.tar.lz
dexon-consensus-81372e5746fedf0ad691ab628096b7caefbe3008.tar.xz
dexon-consensus-81372e5746fedf0ad691ab628096b7caefbe3008.tar.zst
dexon-consensus-81372e5746fedf0ad691ab628096b7caefbe3008.zip
test: implement pulling in network layer (#314)
* Add definition for test.PullRequest * Cache notary sets for each round in network module * Cache peers as nodeID in network module. * Implement pull blocks * Implement pull vote
Diffstat (limited to 'core')
-rw-r--r--core/test/governance.go10
-rw-r--r--core/test/marshaller.go9
-rw-r--r--core/test/network.go362
-rw-r--r--core/test/network_test.go242
4 files changed, 580 insertions, 43 deletions
diff --git a/core/test/governance.go b/core/test/governance.go
index e6b6232..f27139f 100644
--- a/core/test/governance.go
+++ b/core/test/governance.go
@@ -232,6 +232,16 @@ func (g *Governance) CatchUpWithRound(round uint64) {
config, nodeSet := g.stateModule.Snapshot()
g.configs = append(g.configs, config)
g.nodeSets = append(g.nodeSets, nodeSet)
+ if g.networkModule == nil {
+ continue
+ }
+ // Notify network module for new notary set.
+ round := uint64(len(g.configs)) - 1
+ notarySet := make(map[types.NodeID]struct{})
+ for _, k := range g.nodeSets[round] {
+ notarySet[types.NewNodeID(k)] = struct{}{}
+ }
+ g.networkModule.appendRoundSetting(round, notarySet)
}
}
diff --git a/core/test/marshaller.go b/core/test/marshaller.go
index a1b15b6..5f15e11 100644
--- a/core/test/marshaller.go
+++ b/core/test/marshaller.go
@@ -101,6 +101,12 @@ func (m *DefaultMarshaller) Unmarshal(
break
}
msg = *packed
+ case "pull-request":
+ req := &PullRequest{}
+ if err = json.Unmarshal(payload, req); err != nil {
+ break
+ }
+ msg = req
default:
if m.fallback == nil {
err = fmt.Errorf("unknown msg type: %v", msgType)
@@ -145,6 +151,9 @@ func (m *DefaultMarshaller) Marshal(
case packedStateChanges:
msgType = "packed-state-changes"
payload, err = json.Marshal(msg)
+ case *PullRequest:
+ msgType = "pull-request"
+ payload, err = json.Marshal(msg)
default:
if m.fallback == nil {
err = fmt.Errorf("unknwon message type: %v", msg)
diff --git a/core/test/network.go b/core/test/network.go
index 0d92af0..949b158 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -19,11 +19,14 @@ package test
import (
"context"
+ "encoding/json"
"errors"
"fmt"
+ "log"
"net"
"strconv"
"sync"
+ "time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
@@ -31,6 +34,13 @@ import (
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
)
+const (
+ // Count of rounds of notary set cached in network module.
+ cachedNotarySetSize = 10
+ // Count of maximum count of peers to pull votes from.
+ maxPullingPeerCount = 3
+)
+
// NetworkType is the simulation network type.
type NetworkType string
@@ -48,22 +58,100 @@ type NetworkConfig struct {
PeerPort int
}
+// PullRequest is a generic request to pull everything (ex. vote, block...).
+type PullRequest struct {
+ Requester types.NodeID
+ Type string
+ Identity interface{}
+}
+
+// MarshalJSON implements json.Marshaller.
+func (req *PullRequest) MarshalJSON() (b []byte, err error) {
+ var idAsBytes []byte
+ // Make sure caller prepare correct identity for pull requests.
+ switch req.Type {
+ case "block":
+ idAsBytes, err = json.Marshal(req.Identity.(common.Hashes))
+ case "vote":
+ idAsBytes, err = json.Marshal(req.Identity.(types.Position))
+ default:
+ err = fmt.Errorf("unknown ID type for pull request: %v", req.Type)
+ }
+ if err != nil {
+ return
+ }
+ b, err = json.Marshal(&struct {
+ Requester types.NodeID `json:"req"`
+ Type string `json:"type"`
+ Identity []byte `json:"id"`
+ }{req.Requester, req.Type, idAsBytes})
+ return
+}
+
+// UnmarshalJSON iumplements json.Unmarshaller.
+func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
+ rawReq := &struct {
+ Requester types.NodeID `json:"req"`
+ Type string `json:"type"`
+ Identity []byte `json:"id"`
+ }{}
+ if err = json.Unmarshal(data, rawReq); err != nil {
+ return
+ }
+ var ID interface{}
+ switch rawReq.Type {
+ case "block":
+ hashes := common.Hashes{}
+ if err = json.Unmarshal(rawReq.Identity, &hashes); err != nil {
+ break
+ }
+ ID = hashes
+ case "vote":
+ pos := types.Position{}
+ if err = json.Unmarshal(rawReq.Identity, &pos); err != nil {
+ break
+ }
+ ID = pos
+ default:
+ err = fmt.Errorf("unknown pull request type: %v", rawReq.Type)
+ }
+ if err != nil {
+ return
+ }
+ req.Requester = rawReq.Requester
+ req.Type = rawReq.Type
+ req.Identity = ID
+ return
+}
+
// Network implements core.Network interface based on TransportClient.
type Network struct {
- config NetworkConfig
- ctx context.Context
- ctxCancel context.CancelFunc
- trans TransportClient
- fromTransport <-chan *TransportEnvelope
- toConsensus chan interface{}
- toNode chan interface{}
- sentRandomnessLock sync.Mutex
- sentRandomness map[common.Hash]struct{}
- sentAgreementLock sync.Mutex
- sentAgreement map[common.Hash]struct{}
- blockCacheLock sync.RWMutex
- blockCache map[common.Hash]*types.Block
- stateModule *State
+ ID types.NodeID
+ config NetworkConfig
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ trans TransportClient
+ fromTransport <-chan *TransportEnvelope
+ toConsensus chan interface{}
+ toNode chan interface{}
+ sentRandomnessLock sync.Mutex
+ sentRandomness map[common.Hash]struct{}
+ sentAgreementLock sync.Mutex
+ sentAgreement map[common.Hash]struct{}
+ blockCacheLock sync.RWMutex
+ blockCache map[common.Hash]*types.Block
+ voteCacheLock sync.RWMutex
+ voteCache map[types.Position]map[types.VoteHeader]*types.Vote
+ voteCacheSize int
+ votePositions []types.Position
+ stateModule *State
+ notarySetLock sync.RWMutex
+ notarySets []map[types.NodeID]struct{}
+ notarySetMinRound uint64
+ peers map[types.NodeID]struct{}
+ unreceivedBlocksLock sync.RWMutex
+ unreceivedBlocks map[common.Hash]chan<- common.Hash
+ latencyModel LatencyModel
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -72,12 +160,17 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
marshaller Marshaller, config NetworkConfig) (n *Network) {
// Construct basic network instance.
n = &Network{
- config: config,
- toConsensus: make(chan interface{}, 1000),
- toNode: make(chan interface{}, 1000),
- sentRandomness: make(map[common.Hash]struct{}),
- sentAgreement: make(map[common.Hash]struct{}),
- blockCache: make(map[common.Hash]*types.Block),
+ ID: types.NewNodeID(pubKey),
+ config: config,
+ toConsensus: make(chan interface{}, 1000),
+ toNode: make(chan interface{}, 1000),
+ sentRandomness: make(map[common.Hash]struct{}),
+ sentAgreement: make(map[common.Hash]struct{}),
+ blockCache: make(map[common.Hash]*types.Block),
+ unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
+ latencyModel: latency,
+ voteCache: make(
+ map[types.Position]map[types.VoteHeader]*types.Vote),
}
n.ctx, n.ctxCancel = context.WithCancel(context.Background())
// Construct transport layer.
@@ -96,23 +189,12 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
// PullBlocks implements core.Network interface.
func (n *Network) PullBlocks(hashes common.Hashes) {
- go func() {
- n.blockCacheLock.RLock()
- defer n.blockCacheLock.RUnlock()
- for _, hash := range hashes {
- // TODO(jimmy-dexon): request block from network instead of cache.
- if block, exist := n.blockCache[hash]; exist {
- n.toConsensus <- block
- continue
- }
- panic(fmt.Errorf("unknown block %s requested", hash))
- }
- }()
+ go n.pullBlocksAsync(hashes)
}
// PullVotes implements core.Network interface.
func (n *Network) PullVotes(pos types.Position) {
- // TODO(jimmy-dexon): implement this.
+ go n.pullVotesAsync(pos)
}
// BroadcastVote implements core.Network interface.
@@ -120,6 +202,7 @@ func (n *Network) BroadcastVote(vote *types.Vote) {
if err := n.trans.Broadcast(vote); err != nil {
panic(err)
}
+ n.addVoteToCache(vote)
}
// BroadcastBlock implements core.Network interface.
@@ -127,6 +210,7 @@ func (n *Network) BroadcastBlock(block *types.Block) {
if err := n.trans.Broadcast(block); err != nil {
panic(err)
}
+ n.addBlockToCache(block)
}
// BroadcastAgreementResult implements core.Network interface.
@@ -223,26 +307,32 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) {
if err != nil {
return
}
+ peerKeys := n.trans.Peers()
+ n.peers = make(map[types.NodeID]struct{})
+ for _, k := range peerKeys {
+ n.peers[types.NewNodeID(k)] = struct{}{}
+ }
return
}
func (n *Network) dispatchMsg(e *TransportEnvelope) {
switch v := e.Msg.(type) {
case *types.Block:
+ n.addBlockToCache(v)
+ // Notify pulling routine about the newly arrived block.
func() {
- n.blockCacheLock.Lock()
- defer n.blockCacheLock.Unlock()
- if len(n.blockCache) > 500 {
- // Randomly purge one block from cache.
- for k := range n.blockCache {
- delete(n.blockCache, k)
- break
- }
+ n.unreceivedBlocksLock.Lock()
+ defer n.unreceivedBlocksLock.Unlock()
+ if ch, exists := n.unreceivedBlocks[v.Hash]; exists {
+ ch <- v.Hash
}
- n.blockCache[v.Hash] = v
}()
n.toConsensus <- e.Msg
- case *types.Vote, *types.AgreementResult, *types.BlockRandomnessResult,
+ case *types.Vote:
+ // Add this vote to cache.
+ n.addVoteToCache(v)
+ n.toConsensus <- e.Msg
+ case *types.AgreementResult, *types.BlockRandomnessResult,
*typesDKG.PrivateShare, *typesDKG.PartialSignature:
n.toConsensus <- e.Msg
case packedStateChanges:
@@ -253,11 +343,54 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) {
if err := n.stateModule.AddRequestsFromOthers([]byte(v)); err != nil {
panic(err)
}
+ case *PullRequest:
+ go n.handlePullRequest(v)
default:
n.toNode <- e.Msg
}
}
+func (n *Network) handlePullRequest(req *PullRequest) {
+ switch req.Type {
+ case "block":
+ hashes := req.Identity.(common.Hashes)
+ func() {
+ n.blockCacheLock.Lock()
+ defer n.blockCacheLock.Unlock()
+ All:
+ for _, h := range hashes {
+ b, exists := n.blockCache[h]
+ if !exists {
+ continue
+ }
+ select {
+ case <-n.ctx.Done():
+ break All
+ default:
+ }
+ if err := n.trans.Send(req.Requester, b); err != nil {
+ log.Println("unable to send block", req.Requester, err)
+ }
+ }
+ }()
+ case "vote":
+ pos := req.Identity.(types.Position)
+ func() {
+ n.voteCacheLock.Lock()
+ defer n.voteCacheLock.Unlock()
+ if votes, exists := n.voteCache[pos]; exists {
+ for _, v := range votes {
+ if err := n.trans.Send(req.Requester, v); err != nil {
+ log.Println("unable to send vote", req.Requester, err)
+ }
+ }
+ }
+ }()
+ default:
+ panic(fmt.Errorf("unknown type of pull request: %v", req.Type))
+ }
+}
+
// Run the main loop.
func (n *Network) Run() {
Loop:
@@ -320,3 +453,146 @@ func (n *Network) ReceiveChanForNode() <-chan interface{} {
func (n *Network) addStateModule(s *State) {
n.stateModule = s
}
+
+// appendRoundSetting updates essential info to network module for each round.
+func (n *Network) appendRoundSetting(
+ round uint64, notarySet map[types.NodeID]struct{}) {
+ n.notarySetLock.Lock()
+ defer n.notarySetLock.Unlock()
+ if len(n.notarySets) != 0 {
+ // This network module is already initialized, do some check against
+ // the inputs.
+ if round != n.notarySetMinRound+uint64(len(n.notarySets)) {
+ panic(fmt.Errorf(
+ "round not increasing when appending round setting: %v", round))
+ }
+ } else {
+ n.notarySetMinRound = round
+ }
+ n.notarySets = append(n.notarySets, notarySet)
+ // Purge cached notary sets.
+ if len(n.notarySets) > cachedNotarySetSize {
+ n.notarySets = n.notarySets[1:]
+ n.notarySetMinRound++
+ }
+ return
+}
+
+func (n *Network) pullBlocksAsync(hashes common.Hashes) {
+ // Setup notification channels for each block hash.
+ notYetReceived := make(map[common.Hash]struct{})
+ ch := make(chan common.Hash, len(hashes))
+ func() {
+ n.unreceivedBlocksLock.Lock()
+ defer n.unreceivedBlocksLock.Unlock()
+ for _, h := range hashes {
+ if _, exists := n.unreceivedBlocks[h]; exists {
+ panic(fmt.Errorf("attempting to pull one block multiple times"))
+ }
+ n.unreceivedBlocks[h] = ch
+ }
+ }()
+ // Clean all unreceived block entry in unrecelivedBlocks field when leaving.
+ defer func() {
+ n.unreceivedBlocksLock.Lock()
+ defer n.unreceivedBlocksLock.Unlock()
+ for _, h := range hashes {
+ delete(n.unreceivedBlocks, h)
+ }
+ }()
+ req := &PullRequest{
+ Requester: n.ID,
+ Type: "block",
+ Identity: hashes,
+ }
+ // Randomly pick peers to send pull requests.
+Loop:
+ for nID := range n.peers {
+ if err := n.trans.Send(nID, req); err != nil {
+ // Try next peer.
+ continue
+ }
+ select {
+ case <-n.ctx.Done():
+ break Loop
+ case <-time.After(2 * n.latencyModel.Delay()):
+ // Consume everything in the notification channel.
+ for {
+ select {
+ case h, ok := <-ch:
+ if !ok {
+ // This network module is closed.
+ break Loop
+ }
+ delete(notYetReceived, h)
+ if len(notYetReceived) == 0 {
+ break Loop
+ }
+ default:
+ continue Loop
+ }
+ }
+ }
+ }
+}
+
+func (n *Network) pullVotesAsync(pos types.Position) {
+ // Randomly pick several peers to pull votes from.
+ req := &PullRequest{
+ Requester: n.ID,
+ Type: "vote",
+ Identity: pos,
+ }
+ // Get corresponding notary set.
+ notarySet := func() map[types.NodeID]struct{} {
+ n.notarySetLock.Lock()
+ defer n.notarySetLock.Unlock()
+ return n.notarySets[pos.Round-n.notarySetMinRound]
+ }()
+ // Randomly select one peer from notary set and send a pull request.
+ sentCount := 0
+ for nID := range notarySet {
+ if err := n.trans.Send(nID, req); err != nil {
+ // Try next peer.
+ continue
+ }
+ sentCount++
+ if sentCount >= maxPullingPeerCount {
+ break
+ }
+ }
+}
+
+func (n *Network) addBlockToCache(b *types.Block) {
+ n.blockCacheLock.Lock()
+ defer n.blockCacheLock.Unlock()
+ if len(n.blockCache) > 1000 {
+ // Randomly purge one block from cache.
+ for k := range n.blockCache {
+ delete(n.blockCache, k)
+ break
+ }
+ }
+ n.blockCache[b.Hash] = b
+}
+
+func (n *Network) addVoteToCache(v *types.Vote) {
+ n.voteCacheLock.Lock()
+ defer n.voteCacheLock.Unlock()
+ if n.voteCacheSize >= 128 {
+ pos := n.votePositions[0]
+ n.voteCacheSize -= len(n.voteCache[pos])
+ delete(n.voteCache, pos)
+ n.votePositions = n.votePositions[1:]
+ }
+ if _, exists := n.voteCache[v.Position]; !exists {
+ n.votePositions = append(n.votePositions, v.Position)
+ n.voteCache[v.Position] =
+ make(map[types.VoteHeader]*types.Vote)
+ }
+ if _, exists := n.voteCache[v.Position][v.VoteHeader]; exists {
+ return
+ }
+ n.voteCache[v.Position][v.VoteHeader] = v
+ n.voteCacheSize++
+}
diff --git a/core/test/network_test.go b/core/test/network_test.go
new file mode 100644
index 0000000..fe11ed1
--- /dev/null
+++ b/core/test/network_test.go
@@ -0,0 +1,242 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package test
+
+import (
+ "context"
+ "encoding/json"
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/stretchr/testify/suite"
+)
+
+type NetworkTestSuite struct {
+ suite.Suite
+}
+
+func (s *NetworkTestSuite) setupNetworks(
+ peerCount int) map[types.NodeID]*Network {
+ var (
+ server = NewFakeTransportServer()
+ wg sync.WaitGroup
+ )
+ serverChannel, err := server.Host()
+ s.Require().NoError(err)
+ // Setup several network modules.
+ _, pubKeys, err := NewKeys(peerCount)
+ s.Require().NoError(err)
+ networks := make(map[types.NodeID]*Network)
+ for _, key := range pubKeys {
+ n := NewNetwork(
+ key,
+ &FixedLatencyModel{},
+ NewDefaultMarshaller(nil),
+ NetworkConfig{Type: NetworkTypeFake})
+ networks[n.ID] = n
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ s.Require().NoError(n.Setup(serverChannel))
+ go n.Run()
+ }()
+ }
+ s.Require().NoError(server.WaitForPeers(uint32(peerCount)))
+ wg.Wait()
+ return networks
+}
+
+func (s *NetworkTestSuite) TestPullRequestMarshaling() {
+ // Verify pull request for blocks is able to be marshalled.
+ blockHashes := common.Hashes{
+ common.NewRandomHash(),
+ common.NewRandomHash(),
+ common.NewRandomHash(),
+ }
+ req := &PullRequest{
+ Requester: GenerateRandomNodeIDs(1)[0],
+ Type: "block",
+ Identity: blockHashes,
+ }
+ b, err := json.Marshal(req)
+ s.Require().NoError(err)
+ req2 := &PullRequest{}
+ s.Require().NoError(json.Unmarshal(b, req2))
+ s.Require().Equal(req.Requester, req2.Requester)
+ s.Require().Equal(req.Type, req2.Type)
+ s.Require().Equal(blockHashes, req2.Identity)
+ // Verify pull request for votes is able to be marshalled.
+ req = &PullRequest{
+ Requester: GenerateRandomNodeIDs(1)[0],
+ Type: "vote",
+ Identity: types.Position{
+ Round: 1,
+ ChainID: 2,
+ Height: 3,
+ }}
+ b, err = json.Marshal(req)
+ s.Require().NoError(err)
+ req2 = &PullRequest{}
+ s.Require().NoError(json.Unmarshal(b, req2))
+ s.Require().Equal(req.Requester, req2.Requester)
+ s.Require().Equal(req.Type, req2.Type)
+ s.Require().Equal(req.Identity.(types.Position).Round,
+ req.Identity.(types.Position).Round)
+ s.Require().Equal(req.Identity.(types.Position).ChainID,
+ req.Identity.(types.Position).ChainID)
+ s.Require().Equal(req.Identity.(types.Position).Height,
+ req.Identity.(types.Position).Height)
+}
+
+func (s *NetworkTestSuite) TestPullBlocks() {
+ var (
+ peerCount = 10
+ req = s.Require()
+ )
+ networks := s.setupNetworks(peerCount)
+ // Generate several random hashes.
+ hashes := common.Hashes{}
+ for range networks {
+ hashes = append(hashes, common.NewRandomHash())
+ }
+ // Randomly pick one network instance as master.
+ var master *Network
+ for _, master = range networks {
+ break
+ }
+ // Send a fake block to a random network (except master) by those hashes.
+ for _, h := range hashes {
+ for _, n := range networks {
+ if n.ID == master.ID {
+ continue
+ }
+ req.NoError(master.trans.Send(n.ID, &types.Block{Hash: h}))
+ }
+ }
+ // Initiate a pull request from network 0 by removing corresponding hash in
+ // hashes.
+ master.PullBlocks(hashes)
+ awaitMap := make(map[common.Hash]struct{})
+ for _, h := range hashes {
+ awaitMap[h] = struct{}{}
+ }
+ // We should be able to receive all hashes.
+ ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
+ defer func() { cancelFunc() }()
+ for {
+ select {
+ case v := <-master.ReceiveChan():
+ b, ok := v.(*types.Block)
+ if !ok {
+ break
+ }
+ delete(awaitMap, b.Hash)
+ case <-ctx.Done():
+ // This test case fails, we didn't receive pulled blocks.
+ req.False(true)
+ }
+ if len(awaitMap) == 0 {
+ break
+ }
+ }
+}
+
+func (s *NetworkTestSuite) TestPullVotes() {
+ // The functionality of pulling votes is not deterministic, so the test here
+ // only tries to "retry pulling votes until we can get some votes back".
+ var (
+ peerCount = 10
+ maxRound = uint64(5)
+ voteCount = 200
+ voteTestCount = 15
+ req = s.Require()
+ )
+ networks := s.setupNetworks(peerCount)
+ // Randomly pick one network instance as master.
+ var master *Network
+ for _, master = range networks {
+ break
+ }
+ // Prepare notary sets.
+ notarySets := []map[types.NodeID]struct{}{}
+ for i := uint64(0); i <= maxRound; i++ {
+ notarySets = append(notarySets, make(map[types.NodeID]struct{}))
+ }
+ // Randomly generate votes to random peers, except master.
+ votes := make(map[types.VoteHeader]*types.Vote)
+ randObj := rand.New(rand.NewSource(time.Now().UnixNano()))
+ for len(votes) < voteCount {
+ for _, n := range networks {
+ if n.ID == master.ID {
+ continue
+ }
+ v := types.NewVote(
+ types.VoteInit, common.NewRandomHash(), randObj.Uint64())
+ v.Position = types.Position{
+ ChainID: randObj.Uint32(),
+ Height: randObj.Uint64(),
+ Round: uint64(randObj.Intn(int(maxRound + 1))),
+ }
+ req.NoError(master.trans.Send(n.ID, v))
+ votes[v.VoteHeader] = v
+ // Add this node to corresponding notary set for this vote.
+ notarySets[v.Position.Round][n.ID] = struct{}{}
+ }
+ }
+ // Let master knows all notary sets.
+ for i, notarySet := range notarySets {
+ master.appendRoundSetting(uint64(i), notarySet)
+ }
+ // Randomly generate votes set to test.
+ votesToTest := make(map[types.VoteHeader]struct{})
+ for len(votesToTest) < voteTestCount {
+ // Randomly pick a vote
+ for _, v := range votes {
+ votesToTest[v.VoteHeader] = struct{}{}
+ break
+ }
+ }
+ // Try to pull all votes with timeout.
+ ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
+ defer func() { cancelFunc() }()
+ for len(votesToTest) > 0 {
+ for vHeader := range votesToTest {
+ master.PullVotes(vHeader.Position)
+ break
+ }
+ select {
+ case v := <-master.ReceiveChan():
+ vv, ok := v.(*types.Vote)
+ if !ok {
+ break
+ }
+ delete(votesToTest, vv.VoteHeader)
+ case <-ctx.Done():
+ req.True(false)
+ default:
+ }
+ }
+}
+
+func TestNetwork(t *testing.T) {
+ suite.Run(t, new(NetworkTestSuite))
+}