From 81372e5746fedf0ad691ab628096b7caefbe3008 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Fri, 9 Nov 2018 22:15:28 +0800 Subject: test: implement pulling in network layer (#314) * Add definition for test.PullRequest * Cache notary sets for each round in network module * Cache peers as nodeID in network module. * Implement pull blocks * Implement pull vote --- core/test/governance.go | 10 ++ core/test/marshaller.go | 9 ++ core/test/network.go | 362 ++++++++++++++++++++++++++++++++++++++++------ core/test/network_test.go | 242 +++++++++++++++++++++++++++++++ 4 files changed, 580 insertions(+), 43 deletions(-) create mode 100644 core/test/network_test.go (limited to 'core') diff --git a/core/test/governance.go b/core/test/governance.go index e6b6232..f27139f 100644 --- a/core/test/governance.go +++ b/core/test/governance.go @@ -232,6 +232,16 @@ func (g *Governance) CatchUpWithRound(round uint64) { config, nodeSet := g.stateModule.Snapshot() g.configs = append(g.configs, config) g.nodeSets = append(g.nodeSets, nodeSet) + if g.networkModule == nil { + continue + } + // Notify network module for new notary set. + round := uint64(len(g.configs)) - 1 + notarySet := make(map[types.NodeID]struct{}) + for _, k := range g.nodeSets[round] { + notarySet[types.NewNodeID(k)] = struct{}{} + } + g.networkModule.appendRoundSetting(round, notarySet) } } diff --git a/core/test/marshaller.go b/core/test/marshaller.go index a1b15b6..5f15e11 100644 --- a/core/test/marshaller.go +++ b/core/test/marshaller.go @@ -101,6 +101,12 @@ func (m *DefaultMarshaller) Unmarshal( break } msg = *packed + case "pull-request": + req := &PullRequest{} + if err = json.Unmarshal(payload, req); err != nil { + break + } + msg = req default: if m.fallback == nil { err = fmt.Errorf("unknown msg type: %v", msgType) @@ -145,6 +151,9 @@ func (m *DefaultMarshaller) Marshal( case packedStateChanges: msgType = "packed-state-changes" payload, err = json.Marshal(msg) + case *PullRequest: + msgType = "pull-request" + payload, err = json.Marshal(msg) default: if m.fallback == nil { err = fmt.Errorf("unknwon message type: %v", msg) diff --git a/core/test/network.go b/core/test/network.go index 0d92af0..949b158 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -19,11 +19,14 @@ package test import ( "context" + "encoding/json" "errors" "fmt" + "log" "net" "strconv" "sync" + "time" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" @@ -31,6 +34,13 @@ import ( typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" ) +const ( + // Count of rounds of notary set cached in network module. + cachedNotarySetSize = 10 + // Count of maximum count of peers to pull votes from. + maxPullingPeerCount = 3 +) + // NetworkType is the simulation network type. type NetworkType string @@ -48,22 +58,100 @@ type NetworkConfig struct { PeerPort int } +// PullRequest is a generic request to pull everything (ex. vote, block...). +type PullRequest struct { + Requester types.NodeID + Type string + Identity interface{} +} + +// MarshalJSON implements json.Marshaller. +func (req *PullRequest) MarshalJSON() (b []byte, err error) { + var idAsBytes []byte + // Make sure caller prepare correct identity for pull requests. + switch req.Type { + case "block": + idAsBytes, err = json.Marshal(req.Identity.(common.Hashes)) + case "vote": + idAsBytes, err = json.Marshal(req.Identity.(types.Position)) + default: + err = fmt.Errorf("unknown ID type for pull request: %v", req.Type) + } + if err != nil { + return + } + b, err = json.Marshal(&struct { + Requester types.NodeID `json:"req"` + Type string `json:"type"` + Identity []byte `json:"id"` + }{req.Requester, req.Type, idAsBytes}) + return +} + +// UnmarshalJSON iumplements json.Unmarshaller. +func (req *PullRequest) UnmarshalJSON(data []byte) (err error) { + rawReq := &struct { + Requester types.NodeID `json:"req"` + Type string `json:"type"` + Identity []byte `json:"id"` + }{} + if err = json.Unmarshal(data, rawReq); err != nil { + return + } + var ID interface{} + switch rawReq.Type { + case "block": + hashes := common.Hashes{} + if err = json.Unmarshal(rawReq.Identity, &hashes); err != nil { + break + } + ID = hashes + case "vote": + pos := types.Position{} + if err = json.Unmarshal(rawReq.Identity, &pos); err != nil { + break + } + ID = pos + default: + err = fmt.Errorf("unknown pull request type: %v", rawReq.Type) + } + if err != nil { + return + } + req.Requester = rawReq.Requester + req.Type = rawReq.Type + req.Identity = ID + return +} + // Network implements core.Network interface based on TransportClient. type Network struct { - config NetworkConfig - ctx context.Context - ctxCancel context.CancelFunc - trans TransportClient - fromTransport <-chan *TransportEnvelope - toConsensus chan interface{} - toNode chan interface{} - sentRandomnessLock sync.Mutex - sentRandomness map[common.Hash]struct{} - sentAgreementLock sync.Mutex - sentAgreement map[common.Hash]struct{} - blockCacheLock sync.RWMutex - blockCache map[common.Hash]*types.Block - stateModule *State + ID types.NodeID + config NetworkConfig + ctx context.Context + ctxCancel context.CancelFunc + trans TransportClient + fromTransport <-chan *TransportEnvelope + toConsensus chan interface{} + toNode chan interface{} + sentRandomnessLock sync.Mutex + sentRandomness map[common.Hash]struct{} + sentAgreementLock sync.Mutex + sentAgreement map[common.Hash]struct{} + blockCacheLock sync.RWMutex + blockCache map[common.Hash]*types.Block + voteCacheLock sync.RWMutex + voteCache map[types.Position]map[types.VoteHeader]*types.Vote + voteCacheSize int + votePositions []types.Position + stateModule *State + notarySetLock sync.RWMutex + notarySets []map[types.NodeID]struct{} + notarySetMinRound uint64 + peers map[types.NodeID]struct{} + unreceivedBlocksLock sync.RWMutex + unreceivedBlocks map[common.Hash]chan<- common.Hash + latencyModel LatencyModel } // NewNetwork setup network stuffs for nodes, which provides an @@ -72,12 +160,17 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, marshaller Marshaller, config NetworkConfig) (n *Network) { // Construct basic network instance. n = &Network{ - config: config, - toConsensus: make(chan interface{}, 1000), - toNode: make(chan interface{}, 1000), - sentRandomness: make(map[common.Hash]struct{}), - sentAgreement: make(map[common.Hash]struct{}), - blockCache: make(map[common.Hash]*types.Block), + ID: types.NewNodeID(pubKey), + config: config, + toConsensus: make(chan interface{}, 1000), + toNode: make(chan interface{}, 1000), + sentRandomness: make(map[common.Hash]struct{}), + sentAgreement: make(map[common.Hash]struct{}), + blockCache: make(map[common.Hash]*types.Block), + unreceivedBlocks: make(map[common.Hash]chan<- common.Hash), + latencyModel: latency, + voteCache: make( + map[types.Position]map[types.VoteHeader]*types.Vote), } n.ctx, n.ctxCancel = context.WithCancel(context.Background()) // Construct transport layer. @@ -96,23 +189,12 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, // PullBlocks implements core.Network interface. func (n *Network) PullBlocks(hashes common.Hashes) { - go func() { - n.blockCacheLock.RLock() - defer n.blockCacheLock.RUnlock() - for _, hash := range hashes { - // TODO(jimmy-dexon): request block from network instead of cache. - if block, exist := n.blockCache[hash]; exist { - n.toConsensus <- block - continue - } - panic(fmt.Errorf("unknown block %s requested", hash)) - } - }() + go n.pullBlocksAsync(hashes) } // PullVotes implements core.Network interface. func (n *Network) PullVotes(pos types.Position) { - // TODO(jimmy-dexon): implement this. + go n.pullVotesAsync(pos) } // BroadcastVote implements core.Network interface. @@ -120,6 +202,7 @@ func (n *Network) BroadcastVote(vote *types.Vote) { if err := n.trans.Broadcast(vote); err != nil { panic(err) } + n.addVoteToCache(vote) } // BroadcastBlock implements core.Network interface. @@ -127,6 +210,7 @@ func (n *Network) BroadcastBlock(block *types.Block) { if err := n.trans.Broadcast(block); err != nil { panic(err) } + n.addBlockToCache(block) } // BroadcastAgreementResult implements core.Network interface. @@ -223,26 +307,32 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) { if err != nil { return } + peerKeys := n.trans.Peers() + n.peers = make(map[types.NodeID]struct{}) + for _, k := range peerKeys { + n.peers[types.NewNodeID(k)] = struct{}{} + } return } func (n *Network) dispatchMsg(e *TransportEnvelope) { switch v := e.Msg.(type) { case *types.Block: + n.addBlockToCache(v) + // Notify pulling routine about the newly arrived block. func() { - n.blockCacheLock.Lock() - defer n.blockCacheLock.Unlock() - if len(n.blockCache) > 500 { - // Randomly purge one block from cache. - for k := range n.blockCache { - delete(n.blockCache, k) - break - } + n.unreceivedBlocksLock.Lock() + defer n.unreceivedBlocksLock.Unlock() + if ch, exists := n.unreceivedBlocks[v.Hash]; exists { + ch <- v.Hash } - n.blockCache[v.Hash] = v }() n.toConsensus <- e.Msg - case *types.Vote, *types.AgreementResult, *types.BlockRandomnessResult, + case *types.Vote: + // Add this vote to cache. + n.addVoteToCache(v) + n.toConsensus <- e.Msg + case *types.AgreementResult, *types.BlockRandomnessResult, *typesDKG.PrivateShare, *typesDKG.PartialSignature: n.toConsensus <- e.Msg case packedStateChanges: @@ -253,11 +343,54 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) { if err := n.stateModule.AddRequestsFromOthers([]byte(v)); err != nil { panic(err) } + case *PullRequest: + go n.handlePullRequest(v) default: n.toNode <- e.Msg } } +func (n *Network) handlePullRequest(req *PullRequest) { + switch req.Type { + case "block": + hashes := req.Identity.(common.Hashes) + func() { + n.blockCacheLock.Lock() + defer n.blockCacheLock.Unlock() + All: + for _, h := range hashes { + b, exists := n.blockCache[h] + if !exists { + continue + } + select { + case <-n.ctx.Done(): + break All + default: + } + if err := n.trans.Send(req.Requester, b); err != nil { + log.Println("unable to send block", req.Requester, err) + } + } + }() + case "vote": + pos := req.Identity.(types.Position) + func() { + n.voteCacheLock.Lock() + defer n.voteCacheLock.Unlock() + if votes, exists := n.voteCache[pos]; exists { + for _, v := range votes { + if err := n.trans.Send(req.Requester, v); err != nil { + log.Println("unable to send vote", req.Requester, err) + } + } + } + }() + default: + panic(fmt.Errorf("unknown type of pull request: %v", req.Type)) + } +} + // Run the main loop. func (n *Network) Run() { Loop: @@ -320,3 +453,146 @@ func (n *Network) ReceiveChanForNode() <-chan interface{} { func (n *Network) addStateModule(s *State) { n.stateModule = s } + +// appendRoundSetting updates essential info to network module for each round. +func (n *Network) appendRoundSetting( + round uint64, notarySet map[types.NodeID]struct{}) { + n.notarySetLock.Lock() + defer n.notarySetLock.Unlock() + if len(n.notarySets) != 0 { + // This network module is already initialized, do some check against + // the inputs. + if round != n.notarySetMinRound+uint64(len(n.notarySets)) { + panic(fmt.Errorf( + "round not increasing when appending round setting: %v", round)) + } + } else { + n.notarySetMinRound = round + } + n.notarySets = append(n.notarySets, notarySet) + // Purge cached notary sets. + if len(n.notarySets) > cachedNotarySetSize { + n.notarySets = n.notarySets[1:] + n.notarySetMinRound++ + } + return +} + +func (n *Network) pullBlocksAsync(hashes common.Hashes) { + // Setup notification channels for each block hash. + notYetReceived := make(map[common.Hash]struct{}) + ch := make(chan common.Hash, len(hashes)) + func() { + n.unreceivedBlocksLock.Lock() + defer n.unreceivedBlocksLock.Unlock() + for _, h := range hashes { + if _, exists := n.unreceivedBlocks[h]; exists { + panic(fmt.Errorf("attempting to pull one block multiple times")) + } + n.unreceivedBlocks[h] = ch + } + }() + // Clean all unreceived block entry in unrecelivedBlocks field when leaving. + defer func() { + n.unreceivedBlocksLock.Lock() + defer n.unreceivedBlocksLock.Unlock() + for _, h := range hashes { + delete(n.unreceivedBlocks, h) + } + }() + req := &PullRequest{ + Requester: n.ID, + Type: "block", + Identity: hashes, + } + // Randomly pick peers to send pull requests. +Loop: + for nID := range n.peers { + if err := n.trans.Send(nID, req); err != nil { + // Try next peer. + continue + } + select { + case <-n.ctx.Done(): + break Loop + case <-time.After(2 * n.latencyModel.Delay()): + // Consume everything in the notification channel. + for { + select { + case h, ok := <-ch: + if !ok { + // This network module is closed. + break Loop + } + delete(notYetReceived, h) + if len(notYetReceived) == 0 { + break Loop + } + default: + continue Loop + } + } + } + } +} + +func (n *Network) pullVotesAsync(pos types.Position) { + // Randomly pick several peers to pull votes from. + req := &PullRequest{ + Requester: n.ID, + Type: "vote", + Identity: pos, + } + // Get corresponding notary set. + notarySet := func() map[types.NodeID]struct{} { + n.notarySetLock.Lock() + defer n.notarySetLock.Unlock() + return n.notarySets[pos.Round-n.notarySetMinRound] + }() + // Randomly select one peer from notary set and send a pull request. + sentCount := 0 + for nID := range notarySet { + if err := n.trans.Send(nID, req); err != nil { + // Try next peer. + continue + } + sentCount++ + if sentCount >= maxPullingPeerCount { + break + } + } +} + +func (n *Network) addBlockToCache(b *types.Block) { + n.blockCacheLock.Lock() + defer n.blockCacheLock.Unlock() + if len(n.blockCache) > 1000 { + // Randomly purge one block from cache. + for k := range n.blockCache { + delete(n.blockCache, k) + break + } + } + n.blockCache[b.Hash] = b +} + +func (n *Network) addVoteToCache(v *types.Vote) { + n.voteCacheLock.Lock() + defer n.voteCacheLock.Unlock() + if n.voteCacheSize >= 128 { + pos := n.votePositions[0] + n.voteCacheSize -= len(n.voteCache[pos]) + delete(n.voteCache, pos) + n.votePositions = n.votePositions[1:] + } + if _, exists := n.voteCache[v.Position]; !exists { + n.votePositions = append(n.votePositions, v.Position) + n.voteCache[v.Position] = + make(map[types.VoteHeader]*types.Vote) + } + if _, exists := n.voteCache[v.Position][v.VoteHeader]; exists { + return + } + n.voteCache[v.Position][v.VoteHeader] = v + n.voteCacheSize++ +} diff --git a/core/test/network_test.go b/core/test/network_test.go new file mode 100644 index 0000000..fe11ed1 --- /dev/null +++ b/core/test/network_test.go @@ -0,0 +1,242 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// . + +package test + +import ( + "context" + "encoding/json" + "math/rand" + "sync" + "testing" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/stretchr/testify/suite" +) + +type NetworkTestSuite struct { + suite.Suite +} + +func (s *NetworkTestSuite) setupNetworks( + peerCount int) map[types.NodeID]*Network { + var ( + server = NewFakeTransportServer() + wg sync.WaitGroup + ) + serverChannel, err := server.Host() + s.Require().NoError(err) + // Setup several network modules. + _, pubKeys, err := NewKeys(peerCount) + s.Require().NoError(err) + networks := make(map[types.NodeID]*Network) + for _, key := range pubKeys { + n := NewNetwork( + key, + &FixedLatencyModel{}, + NewDefaultMarshaller(nil), + NetworkConfig{Type: NetworkTypeFake}) + networks[n.ID] = n + wg.Add(1) + go func() { + defer wg.Done() + s.Require().NoError(n.Setup(serverChannel)) + go n.Run() + }() + } + s.Require().NoError(server.WaitForPeers(uint32(peerCount))) + wg.Wait() + return networks +} + +func (s *NetworkTestSuite) TestPullRequestMarshaling() { + // Verify pull request for blocks is able to be marshalled. + blockHashes := common.Hashes{ + common.NewRandomHash(), + common.NewRandomHash(), + common.NewRandomHash(), + } + req := &PullRequest{ + Requester: GenerateRandomNodeIDs(1)[0], + Type: "block", + Identity: blockHashes, + } + b, err := json.Marshal(req) + s.Require().NoError(err) + req2 := &PullRequest{} + s.Require().NoError(json.Unmarshal(b, req2)) + s.Require().Equal(req.Requester, req2.Requester) + s.Require().Equal(req.Type, req2.Type) + s.Require().Equal(blockHashes, req2.Identity) + // Verify pull request for votes is able to be marshalled. + req = &PullRequest{ + Requester: GenerateRandomNodeIDs(1)[0], + Type: "vote", + Identity: types.Position{ + Round: 1, + ChainID: 2, + Height: 3, + }} + b, err = json.Marshal(req) + s.Require().NoError(err) + req2 = &PullRequest{} + s.Require().NoError(json.Unmarshal(b, req2)) + s.Require().Equal(req.Requester, req2.Requester) + s.Require().Equal(req.Type, req2.Type) + s.Require().Equal(req.Identity.(types.Position).Round, + req.Identity.(types.Position).Round) + s.Require().Equal(req.Identity.(types.Position).ChainID, + req.Identity.(types.Position).ChainID) + s.Require().Equal(req.Identity.(types.Position).Height, + req.Identity.(types.Position).Height) +} + +func (s *NetworkTestSuite) TestPullBlocks() { + var ( + peerCount = 10 + req = s.Require() + ) + networks := s.setupNetworks(peerCount) + // Generate several random hashes. + hashes := common.Hashes{} + for range networks { + hashes = append(hashes, common.NewRandomHash()) + } + // Randomly pick one network instance as master. + var master *Network + for _, master = range networks { + break + } + // Send a fake block to a random network (except master) by those hashes. + for _, h := range hashes { + for _, n := range networks { + if n.ID == master.ID { + continue + } + req.NoError(master.trans.Send(n.ID, &types.Block{Hash: h})) + } + } + // Initiate a pull request from network 0 by removing corresponding hash in + // hashes. + master.PullBlocks(hashes) + awaitMap := make(map[common.Hash]struct{}) + for _, h := range hashes { + awaitMap[h] = struct{}{} + } + // We should be able to receive all hashes. + ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) + defer func() { cancelFunc() }() + for { + select { + case v := <-master.ReceiveChan(): + b, ok := v.(*types.Block) + if !ok { + break + } + delete(awaitMap, b.Hash) + case <-ctx.Done(): + // This test case fails, we didn't receive pulled blocks. + req.False(true) + } + if len(awaitMap) == 0 { + break + } + } +} + +func (s *NetworkTestSuite) TestPullVotes() { + // The functionality of pulling votes is not deterministic, so the test here + // only tries to "retry pulling votes until we can get some votes back". + var ( + peerCount = 10 + maxRound = uint64(5) + voteCount = 200 + voteTestCount = 15 + req = s.Require() + ) + networks := s.setupNetworks(peerCount) + // Randomly pick one network instance as master. + var master *Network + for _, master = range networks { + break + } + // Prepare notary sets. + notarySets := []map[types.NodeID]struct{}{} + for i := uint64(0); i <= maxRound; i++ { + notarySets = append(notarySets, make(map[types.NodeID]struct{})) + } + // Randomly generate votes to random peers, except master. + votes := make(map[types.VoteHeader]*types.Vote) + randObj := rand.New(rand.NewSource(time.Now().UnixNano())) + for len(votes) < voteCount { + for _, n := range networks { + if n.ID == master.ID { + continue + } + v := types.NewVote( + types.VoteInit, common.NewRandomHash(), randObj.Uint64()) + v.Position = types.Position{ + ChainID: randObj.Uint32(), + Height: randObj.Uint64(), + Round: uint64(randObj.Intn(int(maxRound + 1))), + } + req.NoError(master.trans.Send(n.ID, v)) + votes[v.VoteHeader] = v + // Add this node to corresponding notary set for this vote. + notarySets[v.Position.Round][n.ID] = struct{}{} + } + } + // Let master knows all notary sets. + for i, notarySet := range notarySets { + master.appendRoundSetting(uint64(i), notarySet) + } + // Randomly generate votes set to test. + votesToTest := make(map[types.VoteHeader]struct{}) + for len(votesToTest) < voteTestCount { + // Randomly pick a vote + for _, v := range votes { + votesToTest[v.VoteHeader] = struct{}{} + break + } + } + // Try to pull all votes with timeout. + ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) + defer func() { cancelFunc() }() + for len(votesToTest) > 0 { + for vHeader := range votesToTest { + master.PullVotes(vHeader.Position) + break + } + select { + case v := <-master.ReceiveChan(): + vv, ok := v.(*types.Vote) + if !ok { + break + } + delete(votesToTest, vv.VoteHeader) + case <-ctx.Done(): + req.True(false) + default: + } + } +} + +func TestNetwork(t *testing.T) { + suite.Run(t, new(NetworkTestSuite)) +} -- cgit v1.2.3