From 5fc0efa940c7663a33d0fc501807a2627d2cb573 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Fri, 28 Sep 2018 12:32:50 +0800 Subject: core: hide types.NodeID from full node. (#147) * Refine core.Governance interface - Remove types.NodeID from interface declaration. - All parameter should be round based. * Add core.NodeSetCache * Agreement accepts map of nodeID directly. * test.Transport.Peers method return public keys. --- core/agreement-state_test.go | 19 +++--- core/agreement.go | 11 ++- core/agreement_test.go | 11 +-- core/consensus.go | 139 ++++++++++++++----------------------- core/consensus_test.go | 44 +++++------- core/crypto/ecdsa/ecdsa.go | 6 ++ core/interfaces.go | 4 +- core/nodeset-cache.go | 158 +++++++++++++++++++++++++++++++++++++++++++ core/nodeset-cache_test.go | 114 +++++++++++++++++++++++++++++++ core/test/fake-transport.go | 34 ++++++---- core/test/governance.go | 29 +++----- core/test/interface.go | 5 +- core/test/tcp-transport.go | 101 +++++++++++++++++++-------- core/test/transport_test.go | 22 +++--- core/test/utils.go | 14 ++++ integration_test/node.go | 6 +- integration_test/utils.go | 24 ++----- simulation/governance.go | 25 ++++--- simulation/network.go | 15 ++-- simulation/node.go | 10 +-- simulation/peer-server.go | 4 +- 21 files changed, 544 insertions(+), 251 deletions(-) create mode 100644 core/nodeset-cache.go create mode 100644 core/nodeset-cache_test.go diff --git a/core/agreement-state_test.go b/core/agreement-state_test.go index 74ed7a0..c09dcfd 100644 --- a/core/agreement-state_test.go +++ b/core/agreement-state_test.go @@ -32,7 +32,7 @@ import ( type AgreementStateTestSuite struct { suite.Suite ID types.NodeID - prvKey map[types.NodeID]crypto.PrivateKey + prvKeys map[types.NodeID]crypto.PrivateKey voteChan chan *types.Vote blockChan chan common.Hash confirmChan chan common.Hash @@ -62,7 +62,7 @@ func (s *AgreementStateTestSuite) proposeBlock( Hash: common.NewRandomHash(), } s.block[block.Hash] = block - s.Require().Nil(leader.prepareBlock(block, s.prvKey[s.ID])) + s.Require().Nil(leader.prepareBlock(block, s.prvKeys[s.ID])) return block } @@ -70,7 +70,7 @@ func (s *AgreementStateTestSuite) prepareVote( nID types.NodeID, voteType types.VoteType, blockHash common.Hash, period uint64) ( vote *types.Vote) { - prvKey, exist := s.prvKey[nID] + prvKey, exist := s.prvKeys[nID] s.Require().True(exist) vote = &types.Vote{ ProposerID: nID, @@ -88,7 +88,7 @@ func (s *AgreementStateTestSuite) SetupTest() { prvKey, err := ecdsa.NewPrivateKey() s.Require().Nil(err) s.ID = types.NewNodeID(prvKey.PublicKey()) - s.prvKey = map[types.NodeID]crypto.PrivateKey{ + s.prvKeys = map[types.NodeID]crypto.PrivateKey{ s.ID: prvKey, } s.voteChan = make(chan *types.Vote, 100) @@ -103,14 +103,15 @@ func (s *AgreementStateTestSuite) newAgreement(numNode int) *agreement { return s.proposeBlock(leader) } - notarySet := make(types.NodeIDs, numNode-1) - for i := range notarySet { + notarySet := make(map[types.NodeID]struct{}) + for i := 0; i < numNode-1; i++ { prvKey, err := ecdsa.NewPrivateKey() s.Require().Nil(err) - notarySet[i] = types.NewNodeID(prvKey.PublicKey()) - s.prvKey[notarySet[i]] = prvKey + nID := types.NewNodeID(prvKey.PublicKey()) + notarySet[nID] = struct{}{} + s.prvKeys[nID] = prvKey } - notarySet = append(notarySet, s.ID) + notarySet[s.ID] = struct{}{} agreement := newAgreement( s.ID, &agreementStateTestReceiver{s}, diff --git a/core/agreement.go b/core/agreement.go index 65711d5..d9e7432 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -116,7 +116,7 @@ type agreement struct { func newAgreement( ID types.NodeID, recv agreementReceiver, - notarySet types.NodeIDs, + notarySet map[types.NodeID]struct{}, leader *leaderSelector, blockProposer blockProposerFn) *agreement { agreement := &agreement{ @@ -141,7 +141,9 @@ func (a *agreement) terminate() { } // restart the agreement -func (a *agreement) restart(notarySet types.NodeIDs, aID types.Position) { +func (a *agreement) restart( + notarySet map[types.NodeID]struct{}, aID types.Position) { + func() { a.lock.Lock() defer a.lock.Unlock() @@ -158,10 +160,7 @@ func (a *agreement) restart(notarySet types.NodeIDs, aID types.Position) { a.data.defaultBlock = common.Hash{} a.hasOutput = false a.state = newPrepareState(a.data) - a.notarySet = make(map[types.NodeID]struct{}) - for _, v := range notarySet { - a.notarySet[v] = struct{}{} - } + a.notarySet = notarySet a.candidateBlock = make(map[common.Hash]*types.Block) a.aID.Store(aID) }() diff --git a/core/agreement_test.go b/core/agreement_test.go index 2c98181..d384dc6 100644 --- a/core/agreement_test.go +++ b/core/agreement_test.go @@ -86,14 +86,15 @@ func (s *AgreementTestSuite) newAgreement(numNotarySet int) *agreement { return s.proposeBlock(agreementIdx) } - notarySet := make(types.NodeIDs, numNotarySet-1) - for i := range notarySet { + notarySet := make(map[types.NodeID]struct{}) + for i := 0; i < numNotarySet-1; i++ { prvKey, err := ecdsa.NewPrivateKey() s.Require().Nil(err) - notarySet[i] = types.NewNodeID(prvKey.PublicKey()) - s.prvKey[notarySet[i]] = prvKey + nID := types.NewNodeID(prvKey.PublicKey()) + notarySet[nID] = struct{}{} + s.prvKey[nID] = prvKey } - notarySet = append(notarySet, s.ID) + notarySet[s.ID] = struct{}{} agreement := newAgreement( s.ID, &agreementTestReceiver{s}, diff --git a/core/consensus.go b/core/consensus.go index d7dc3a4..f1f4a2e 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "log" - "sort" "sync" "time" @@ -43,8 +42,8 @@ func (e *ErrMissingBlockInfo) Error() string { // Errors for consensus core. var ( - ErrProposerNotInNotarySet = fmt.Errorf( - "proposer is not in notary set") + ErrProposerNotInNodeSet = fmt.Errorf( + "proposer is not in node set") ErrIncorrectHash = fmt.Errorf( "hash of block is incorrect") ErrIncorrectSignature = fmt.Errorf( @@ -109,10 +108,11 @@ func (recv *consensusReceiver) ConfirmBlock(hash common.Hash) { // consensusDKGReceiver implements dkgReceiver. type consensusDKGReceiver struct { - ID types.NodeID - gov Governance - prvKey crypto.PrivateKey - network Network + ID types.NodeID + gov Governance + prvKey crypto.PrivateKey + nodeSetCache *NodeSetCache + network Network } // ProposeDKGComplaint proposes a DKGComplaint. @@ -148,7 +148,12 @@ func (recv *consensusDKGReceiver) ProposeDKGPrivateShare( log.Println(err) return } - recv.network.SendDKGPrivateShare(prv.ReceiverID, prv) + receiverPubKey, exists := recv.nodeSetCache.GetPublicKey(prv.ReceiverID) + if !exists { + log.Println("public key for receiver not found") + return + } + recv.network.SendDKGPrivateShare(receiverPubKey, prv) } // ProposeDKGAntiNackComplaint propose a DKGPrivateShare as an anti complaint. @@ -197,10 +202,10 @@ type Consensus struct { tickerObj Ticker // Misc. - notarySet map[types.NodeID]struct{} - lock sync.RWMutex - ctx context.Context - ctxCancel context.CancelFunc + nodeSetCache *NodeSetCache + lock sync.RWMutex + ctx context.Context + ctxCancel context.CancelFunc } // NewConsensus construct an Consensus instance. @@ -211,49 +216,47 @@ func NewConsensus( network Network, prv crypto.PrivateKey) *Consensus { - // TODO(w): load latest round from DB. + // TODO(w): load latest blockHeight from DB, and use config at that height. var round uint64 config := gov.GetConfiguration(round) - // TODO(w): notarySet is different for each chain, need to write a // GetNotarySetForChain(nodeSet, shardID, chainID, crs) function to get the // correct notary set for a given chain. - notarySet := gov.GetNodeSet(round) + nodeSetCache := NewNodeSetCache(gov) crs := gov.GetCRS(round) - - ID := types.NewNodeID(prv.PublicKey()) - // Setup acking by information returned from Governace. + nodes, err := nodeSetCache.GetNodeIDs(0) + if err != nil { + panic(err) + } rb := newReliableBroadcast() rb.setChainNum(config.NumChains) - for nID := range notarySet { + for nID := range nodes { rb.addNode(nID) } // Setup context. ctx, ctxCancel := context.WithCancel(context.Background()) // Setup sequencer by information returned from Governace. - var nodes types.NodeIDs - for nID := range notarySet { - nodes = append(nodes, nID) - } to := newTotalOrdering( uint64(config.K), - uint64(float32(len(notarySet)-1)*config.PhiRatio+1), + uint64(float32(len(nodes)-1)*config.PhiRatio+1), config.NumChains) + ID := types.NewNodeID(prv.PublicKey()) cfgModule := newConfigurationChain( ID, &consensusDKGReceiver{ - ID: ID, - gov: gov, - prvKey: prv, - network: network, + ID: ID, + gov: gov, + prvKey: prv, + nodeSetCache: nodeSetCache, + network: network, }, gov) // Register DKG for the initial round. This is a temporary function call for // simulation. - cfgModule.registerDKG(0, len(notarySet)/3) + cfgModule.registerDKG(0, len(nodes)/3) // Check if the application implement Debug interface. debug, _ := app.(Debug) @@ -272,7 +275,7 @@ func NewConsensus( prvKey: prv, dkgReady: sync.NewCond(&sync.Mutex{}), cfgModule: cfgModule, - notarySet: notarySet, + nodeSetCache: nodeSetCache, ctx: ctx, ctxCancel: ctxCancel, } @@ -332,10 +335,9 @@ func (con *Consensus) Run() { func (con *Consensus) runBA(chainID uint32, tick <-chan struct{}) { // TODO(jimmy-dexon): move this function inside agreement. - - nodes := make(types.NodeIDs, 0, len(con.notarySet)) - for nID := range con.notarySet { - nodes = append(nodes, nID) + nodes, err := con.nodeSetCache.GetNodeIDs(0) + if err != nil { + panic(err) } agreement := con.baModules[chainID] recv := con.receivers[chainID] @@ -390,8 +392,13 @@ func (con *Consensus) runDKGTSIG() { if err := con.cfgModule.runDKG(round); err != nil { panic(err) } + nodes, err := con.nodeSetCache.GetNodeIDs(0) + if err != nil { + // TODO(mission): should be done in some bootstrap routine. + panic(err) + } hash := HashConfigurationBlock( - con.gov.GetNodeSet(0), + nodes, con.gov.GetConfiguration(0), common.Hash{}, con.cfgModule.prevHash) @@ -415,58 +422,6 @@ func (con *Consensus) runDKGTSIG() { // RunLegacy starts running Legacy DEXON Consensus. func (con *Consensus) RunLegacy() { - go con.processMsg(con.network.ReceiveChan(), con.processBlock) - go con.processWitnessData() - - chainID := uint32(0) - hashes := make(common.Hashes, 0, len(con.notarySet)) - for nID := range con.notarySet { - hashes = append(hashes, nID.Hash) - } - sort.Sort(hashes) - for i, hash := range hashes { - if hash == con.ID.Hash { - chainID = uint32(i) - break - } - } - con.rbModule.setChainNum(uint32(len(hashes))) - - genesisBlock := &types.Block{ - ProposerID: con.ID, - Position: types.Position{ - ChainID: chainID, - }, - } - if err := con.PrepareGenesisBlock(genesisBlock, time.Now().UTC()); err != nil { - log.Println(err) - } - if err := con.processBlock(genesisBlock); err != nil { - log.Println(err) - } - con.network.BroadcastBlock(genesisBlock) - -ProposingBlockLoop: - for { - select { - case <-con.tickerObj.Tick(): - case <-con.ctx.Done(): - break ProposingBlockLoop - } - block := &types.Block{ - ProposerID: con.ID, - Position: types.Position{ - ChainID: chainID, - }, - } - if err := con.prepareBlock(block, time.Now().UTC()); err != nil { - log.Println(err) - } - if err := con.processBlock(block); err != nil { - log.Println(err) - } - con.network.BroadcastBlock(block) - } } // Stop the Consensus core. @@ -734,8 +689,14 @@ func (con *Consensus) PrepareGenesisBlock(b *types.Block, // ProcessWitnessAck is the entry point to submit one witness ack. func (con *Consensus) ProcessWitnessAck(witnessAck *types.WitnessAck) (err error) { witnessAck = witnessAck.Clone() - if _, exists := con.notarySet[witnessAck.ProposerID]; !exists { - err = ErrProposerNotInNotarySet + // TODO(mission): check witness set for that round. + var round uint64 + exists, err := con.nodeSetCache.Exists(round, witnessAck.ProposerID) + if err != nil { + return + } + if !exists { + err = ErrProposerNotInNodeSet return } err = con.ccModule.processWitnessAck(witnessAck) diff --git a/core/consensus_test.go b/core/consensus_test.go index 0bb7e8f..3043511 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -24,6 +24,7 @@ import ( "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" "github.com/dexon-foundation/dexon-consensus-core/core/test" "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/stretchr/testify/suite" @@ -46,7 +47,7 @@ func (n *network) BroadcastWitnessAck(witnessAck *types.WitnessAck) { // SendDKGPrivateShare sends PrivateShare to a DKG participant. func (n *network) SendDKGPrivateShare( - recv types.NodeID, prvShare *types.DKGPrivateShare) { + recv crypto.PublicKey, prvShare *types.DKGPrivateShare) { } // BroadcastDKGPrivateShare broadcasts PrivateShare to all DKG participants. @@ -86,14 +87,12 @@ func (s *ConsensusTestSuite) prepareGenesisBlock( } func (s *ConsensusTestSuite) prepareConsensus( - gov *test.Governance, nID types.NodeID) (*test.App, *Consensus) { + gov *test.Governance, prvKey crypto.PrivateKey) (*test.App, *Consensus) { app := test.NewApp() db, err := blockdb.NewMemBackedBlockDB() s.Require().Nil(err) - prv, exist := gov.GetPrivateKey(nID) - s.Require().Nil(exist) - con := NewConsensus(app, gov, db, &network{}, prv) + con := NewConsensus(app, gov, db, &network{}, prvKey) return app, con } @@ -113,25 +112,23 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { minInterval = 50 * time.Millisecond gov, err = test.NewGovernance(4, time.Second) req = s.Require() + prvKeys = gov.GetPrivateKeys() nodes []types.NodeID ) s.Require().Nil(err) - - for nID := range gov.GetNodeSet(0) { - nodes = append(nodes, nID) - } - // Setup core.Consensus and test.App. objs := map[types.NodeID]*struct { app *test.App con *Consensus }{} - for _, nID := range nodes { - app, con := s.prepareConsensus(gov, nID) + for _, key := range prvKeys { + nID := types.NewNodeID(key.PublicKey()) + app, con := s.prepareConsensus(gov, key) objs[nID] = &struct { app *test.App con *Consensus }{app, con} + nodes = append(nodes, nID) } // It's a helper function to emit one block // to all core.Consensus objects. @@ -331,16 +328,16 @@ func (s *ConsensusTestSuite) TestPrepareBlock() { gov, err = test.NewGovernance(4, time.Second) req = s.Require() nodes []types.NodeID + prvKeys = gov.GetPrivateKeys() ) s.Require().Nil(err) - for nID := range gov.GetNodeSet(0) { - nodes = append(nodes, nID) - } // Setup core.Consensus and test.App. cons := map[types.NodeID]*Consensus{} - for _, nID := range nodes { - _, con := s.prepareConsensus(gov, nID) + for _, key := range prvKeys { + _, con := s.prepareConsensus(gov, key) + nID := types.NewNodeID(key.PublicKey()) cons[nID] = con + nodes = append(nodes, nID) } b00 := s.prepareGenesisBlock(nodes[0], 0, cons[nodes[0]]) b10 := s.prepareGenesisBlock(nodes[1], 1, cons[nodes[1]]) @@ -375,17 +372,12 @@ func (s *ConsensusTestSuite) TestPrepareBlock() { } func (s *ConsensusTestSuite) TestPrepareGenesisBlock() { - var ( - gov, err = test.NewGovernance(4, time.Second) - nodes []types.NodeID - ) + gov, err := test.NewGovernance(4, time.Second) s.Require().Nil(err) - for nID := range gov.GetNodeSet(0) { - nodes = append(nodes, nID) - } - _, con := s.prepareConsensus(gov, nodes[0]) + prvKey := gov.GetPrivateKeys()[0] + _, con := s.prepareConsensus(gov, prvKey) block := &types.Block{ - ProposerID: nodes[0], + ProposerID: types.NewNodeID(prvKey.PublicKey()), } con.PrepareGenesisBlock(block, time.Now().UTC()) s.True(block.IsGenesis()) diff --git a/core/crypto/ecdsa/ecdsa.go b/core/crypto/ecdsa/ecdsa.go index 5f98395..0196c47 100644 --- a/core/crypto/ecdsa/ecdsa.go +++ b/core/crypto/ecdsa/ecdsa.go @@ -64,6 +64,12 @@ func newPublicKey(prvKey *ecdsa.PrivateKey) *publicKey { } } +// NewPublicKeyFromByteSlice constructs an eth.publicKey instance from +// a byte slice. +func NewPublicKeyFromByteSlice(b []byte) crypto.PublicKey { + return publicKey{publicKey: b} +} + // decompressPubkey parses a public key in the 33-byte compressed format. func decompressPubkey(pubkey []byte) (publicKey, error) { _, err := ethcrypto.DecompressPubkey(pubkey) diff --git a/core/interfaces.go b/core/interfaces.go index b0a85ae..0319e8d 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -72,7 +72,7 @@ type Network interface { BroadcastWitnessAck(witnessAck *types.WitnessAck) // SendDKGPrivateShare sends PrivateShare to a DKG participant. - SendDKGPrivateShare(recv types.NodeID, prvShare *types.DKGPrivateShare) + SendDKGPrivateShare(pub crypto.PublicKey, prvShare *types.DKGPrivateShare) // BroadcastDKGPrivateShare broadcasts PrivateShare to all DKG participants. BroadcastDKGPrivateShare(prvShare *types.DKGPrivateShare) @@ -99,7 +99,7 @@ type Governance interface { // GetNodeSet returns the node set at a given round. // Return the genesis node set if round == 0. - GetNodeSet(round uint64) map[types.NodeID]struct{} + GetNodeSet(round uint64) []crypto.PublicKey // Porpose a ThresholdSignature of round. ProposeThresholdSignature(round uint64, signature crypto.Signature) diff --git a/core/nodeset-cache.go b/core/nodeset-cache.go new file mode 100644 index 0000000..d574817 --- /dev/null +++ b/core/nodeset-cache.go @@ -0,0 +1,158 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "errors" + "sync" + + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +var ( + // ErrRoundNotReady means we got nil config from governance contract. + ErrRoundNotReady = errors.New("round is not ready") +) + +// NodeSetCache caches node set information from governance contract. +type NodeSetCache struct { + lock sync.RWMutex + gov Governance + rounds map[uint64]map[types.NodeID]struct{} + keyPool map[types.NodeID]*struct { + pubKey crypto.PublicKey + refCnt int + } +} + +// NewNodeSetCache constructs an NodeSetCache instance. +func NewNodeSetCache(gov Governance) *NodeSetCache { + return &NodeSetCache{ + gov: gov, + rounds: make(map[uint64]map[types.NodeID]struct{}), + keyPool: make(map[types.NodeID]*struct { + pubKey crypto.PublicKey + refCnt int + }), + } +} + +// Exists checks if a node is in node set of that round. +func (cache *NodeSetCache) Exists( + round uint64, nodeID types.NodeID) (exists bool, err error) { + + nIDs, exists := cache.get(round) + if !exists { + if nIDs, err = cache.update(round); err != nil { + return + } + } + _, exists = nIDs[nodeID] + return +} + +// GetPublicKey return public key for that node: +func (cache *NodeSetCache) GetPublicKey( + nodeID types.NodeID) (key crypto.PublicKey, exists bool) { + + cache.lock.RLock() + defer cache.lock.RUnlock() + + rec, exists := cache.keyPool[nodeID] + if exists { + key = rec.pubKey + } + return +} + +// GetNodeIDs returns IDs of nodes set of this round as map. +func (cache *NodeSetCache) GetNodeIDs( + round uint64) (nIDs map[types.NodeID]struct{}, err error) { + + IDs, exists := cache.get(round) + if !exists { + if IDs, err = cache.update(round); err != nil { + return + } + } + // Clone the map. + nIDs = make(map[types.NodeID]struct{}) + for ID := range IDs { + nIDs[ID] = struct{}{} + } + return +} + +// update node set for that round. +// +// This cache would maintain 10 rounds before the updated round and purge +// rounds not in this range. +func (cache *NodeSetCache) update( + round uint64) (nIDs map[types.NodeID]struct{}, err error) { + + cache.lock.Lock() + defer cache.lock.Unlock() + + // Get the requested round from governance contract. + keySet := cache.gov.GetNodeSet(round) + if keySet == nil { + // That round is not ready yet. + err = ErrRoundNotReady + return + } + // Cache new round. + nIDs = make(map[types.NodeID]struct{}) + for _, key := range keySet { + nID := types.NewNodeID(key) + nIDs[nID] = struct{}{} + if rec, exists := cache.keyPool[nID]; exists { + rec.refCnt++ + } else { + cache.keyPool[nID] = &struct { + pubKey crypto.PublicKey + refCnt int + }{key, 1} + } + } + cache.rounds[round] = nIDs + // Purge older rounds. + for rID, nIDs := range cache.rounds { + if round-rID <= 5 { + continue + } + for nID := range nIDs { + rec := cache.keyPool[nID] + if rec.refCnt--; rec.refCnt == 0 { + delete(cache.keyPool, nID) + } + } + delete(cache.rounds, rID) + } + return +} + +func (cache *NodeSetCache) get( + round uint64) (nIDs map[types.NodeID]struct{}, exists bool) { + + cache.lock.RLock() + defer cache.lock.RUnlock() + + nIDs, exists = cache.rounds[round] + return +} diff --git a/core/nodeset-cache_test.go b/core/nodeset-cache_test.go new file mode 100644 index 0000000..e9d8867 --- /dev/null +++ b/core/nodeset-cache_test.go @@ -0,0 +1,114 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "testing" + + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto/ecdsa" + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/stretchr/testify/suite" +) + +type testGov struct { + s *NodeSetCacheTestSuite + curKeys []crypto.PublicKey +} + +func (g *testGov) GetConfiguration(round uint64) (cfg *types.Config) { return } +func (g *testGov) GetCRS(round uint64) (b []byte) { return } +func (g *testGov) GetNodeSet(round uint64) []crypto.PublicKey { + // Randomly generating keys, and check them for verification. + g.curKeys = []crypto.PublicKey{} + for i := 0; i < 10; i++ { + prvKey, err := ecdsa.NewPrivateKey() + g.s.Require().NoError(err) + g.curKeys = append(g.curKeys, prvKey.PublicKey()) + } + return g.curKeys +} +func (g *testGov) ProposeThresholdSignature( + round uint64, signature crypto.Signature) { +} +func (g *testGov) GetThresholdSignature( + round uint64) (sig crypto.Signature, exists bool) { + return +} +func (g *testGov) AddDKGComplaint(complaint *types.DKGComplaint) {} +func (g *testGov) DKGComplaints( + round uint64) (cs []*types.DKGComplaint) { + return +} +func (g *testGov) AddDKGMasterPublicKey( + masterPublicKey *types.DKGMasterPublicKey) { +} +func (g *testGov) DKGMasterPublicKeys( + round uint64) (keys []*types.DKGMasterPublicKey) { + return +} + +type NodeSetCacheTestSuite struct { + suite.Suite +} + +func (s *NodeSetCacheTestSuite) TestBasicUsage() { + var ( + gov = &testGov{s: s} + cache = NewNodeSetCache(gov) + req = s.Require() + ) + + chk := func( + cache *NodeSetCache, round uint64, nodeSet map[types.NodeID]struct{}) { + + for nID := range nodeSet { + // It should exists. + exists, err := cache.Exists(round, nID) + req.NoError(err) + req.True(exists) + // We could get keys. + key, exists := cache.GetPublicKey(nID) + req.NotNil(key) + req.True(exists) + } + } + + // Try to get round 0. + nodeSet0, err := cache.GetNodeIDs(0) + req.NoError(err) + chk(cache, 0, nodeSet0) + // Try to get round 1. + nodeSet1, err := cache.GetNodeIDs(1) + req.NoError(err) + chk(cache, 0, nodeSet0) + chk(cache, 1, nodeSet1) + // Try to get round 6, round 0 should be purged. + nodeSet6, err := cache.GetNodeIDs(6) + req.NoError(err) + chk(cache, 1, nodeSet1) + chk(cache, 6, nodeSet6) + for nID := range nodeSet0 { + _, exists := cache.GetPublicKey(nID) + req.False(exists) + } +} + +func TestNodeSetCache(t *testing.T) { + suite.Run(t, new(NodeSetCacheTestSuite)) +} diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go index 2f1686e..e4bc252 100644 --- a/core/test/fake-transport.go +++ b/core/test/fake-transport.go @@ -21,17 +21,24 @@ import ( "fmt" "time" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) +type fakePeerRecord struct { + sendChannel chan<- *TransportEnvelope + pubKey crypto.PublicKey +} + // FakeTransport implement TransportServer and TransportClient interface // by using golang channel. type FakeTransport struct { peerType TransportPeerType nID types.NodeID + pubKey crypto.PublicKey recvChannel chan *TransportEnvelope serverChannel chan<- *TransportEnvelope - peers map[types.NodeID]chan<- *TransportEnvelope + peers map[types.NodeID]fakePeerRecord latency LatencyModel } @@ -45,12 +52,13 @@ func NewFakeTransportServer() TransportServer { // NewFakeTransportClient constructs FakeTransport instance for peer. func NewFakeTransportClient( - nID types.NodeID, latency LatencyModel) TransportClient { + pubKey crypto.PublicKey, latency LatencyModel) TransportClient { return &FakeTransport{ peerType: TransportPeer, recvChannel: make(chan *TransportEnvelope, 1000), - nID: nID, + nID: types.NewNodeID(pubKey), + pubKey: pubKey, latency: latency, } } @@ -59,7 +67,7 @@ func NewFakeTransportClient( func (t *FakeTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { - ch, exists := t.peers[endpoint] + rec, exists := t.peers[endpoint] if !exists { err = fmt.Errorf("the endpoint does not exists: %v", endpoint) return @@ -73,7 +81,7 @@ func (t *FakeTransport) Send( From: t.nID, Msg: msg, } - }(ch) + }(rec.sendChannel) return } @@ -107,10 +115,9 @@ func (t *FakeTransport) Close() (err error) { } // Peers implements Transport.Peers method. -func (t *FakeTransport) Peers() (peers map[types.NodeID]struct{}) { - peers = make(map[types.NodeID]struct{}) - for nID := range t.peers { - peers[nID] = struct{}{} +func (t *FakeTransport) Peers() (peers []crypto.PublicKey) { + for _, rec := range t.peers { + peers = append(peers, rec.pubKey) } return } @@ -135,7 +142,7 @@ func (t *FakeTransport) Join( continue } if t.peers, ok = - envelope.Msg.(map[types.NodeID]chan<- *TransportEnvelope); !ok { + envelope.Msg.(map[types.NodeID]fakePeerRecord); !ok { envelopes = append(envelopes, envelope) continue @@ -155,13 +162,16 @@ func (t *FakeTransport) Host() (chan *TransportEnvelope, error) { // WaitForPeers implements TransportServer.WaitForPeers method. func (t *FakeTransport) WaitForPeers(numPeers int) (err error) { - t.peers = make(map[types.NodeID]chan<- *TransportEnvelope) + t.peers = make(map[types.NodeID]fakePeerRecord) for { envelope := <-t.recvChannel // Panic here if some peer send other stuffs before // receiving peer lists. newPeer := envelope.Msg.(*FakeTransport) - t.peers[envelope.From] = newPeer.recvChannel + t.peers[envelope.From] = fakePeerRecord{ + sendChannel: newPeer.recvChannel, + pubKey: newPeer.pubKey, + } if len(t.peers) == numPeers { break } diff --git a/core/test/governance.go b/core/test/governance.go index bc5f58a..c666ffc 100644 --- a/core/test/governance.go +++ b/core/test/governance.go @@ -37,7 +37,6 @@ var ( type Governance struct { lambdaBA time.Duration lambdaDKG time.Duration - nodeSet map[types.NodeID]struct{} privateKeys map[types.NodeID]crypto.PrivateKey tsig map[uint64]crypto.Signature DKGComplaint map[uint64][]*types.DKGComplaint @@ -51,7 +50,6 @@ func NewGovernance(nodeCount int, lambda time.Duration) ( g = &Governance{ lambdaBA: lambda, lambdaDKG: lambda * 10, - nodeSet: make(map[types.NodeID]struct{}), privateKeys: make(map[types.NodeID]crypto.PrivateKey), tsig: make(map[uint64]crypto.Signature), DKGComplaint: make(map[uint64][]*types.DKGComplaint), @@ -63,7 +61,6 @@ func NewGovernance(nodeCount int, lambda time.Duration) ( return nil, err } nID := types.NewNodeID(prv.PublicKey()) - g.nodeSet[nID] = struct{}{} g.privateKeys[nID] = prv } return @@ -71,21 +68,19 @@ func NewGovernance(nodeCount int, lambda time.Duration) ( // GetNodeSet implements Governance interface to return current // notary set. -func (g *Governance) GetNodeSet(round uint64) ( - ret map[types.NodeID]struct{}) { - // Return a cloned map. - ret = make(map[types.NodeID]struct{}) - for k := range g.nodeSet { - ret[k] = struct{}{} +func (g *Governance) GetNodeSet(_ uint64) ( + ret []crypto.PublicKey) { + for _, key := range g.privateKeys { + ret = append(ret, key.PublicKey()) } return } // GetConfiguration returns the configuration at a given block height. -func (g *Governance) GetConfiguration(round uint64) *types.Config { +func (g *Governance) GetConfiguration(_ uint64) *types.Config { return &types.Config{ NumShards: 1, - NumChains: uint32(len(g.nodeSet)), + NumChains: uint32(len(g.privateKeys)), LambdaBA: g.lambdaBA, LambdaDKG: g.lambdaDKG, K: 0, @@ -98,15 +93,11 @@ func (g *Governance) GetCRS(round uint64) []byte { return []byte("__ DEXON") } -// GetPrivateKey return the private key for that node, this function +// GetPrivateKeys return the private key for that node, this function // is a test utility and not a general Governance interface. -func (g *Governance) GetPrivateKey( - nID types.NodeID) (key crypto.PrivateKey, err error) { - - key, exists := g.privateKeys[nID] - if !exists { - err = ErrPrivateKeyNotExists - return +func (g *Governance) GetPrivateKeys() (keys []crypto.PrivateKey) { + for _, k := range g.privateKeys { + keys = append(keys, k) } return } diff --git a/core/test/interface.go b/core/test/interface.go index a422ee7..ad8304e 100644 --- a/core/test/interface.go +++ b/core/test/interface.go @@ -19,6 +19,7 @@ package test import ( "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) @@ -96,10 +97,10 @@ type Transport interface { // Close would cleanup allocated resources. Close() error - // Peers return IDs of all connected nodes in p2p favor. + // Peers return public keys of all connected nodes in p2p favor. // This method should be accessed after ether 'Join' or 'WaitForPeers' // returned. - Peers() map[types.NodeID]struct{} + Peers() []crypto.PublicKey } // Marshaller defines an interface to convert between interface{} and []byte. diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index 0f9bd73..6e3ddfb 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -19,6 +19,7 @@ package test import ( "context" + "encoding/base64" "encoding/binary" "encoding/json" "fmt" @@ -28,13 +29,22 @@ import ( "net" "os" "strconv" + "strings" "sync" "syscall" "time" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto/ecdsa" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) +type tcpPeerRecord struct { + conn string + sendChannel chan<- []byte + pubKey crypto.PublicKey +} + // tcpMessage is the general message between peers and server. type tcpMessage struct { NodeID types.NodeID `json:"nid"` @@ -42,13 +52,33 @@ type tcpMessage struct { Info string `json:"conn"` } +// buildPeerInfo is a tricky way to combine connection string and +// base64 encoded byte slice for public key into a single string, +// separated by ';'. +func buildPeerInfo(pubKey crypto.PublicKey, conn string) string { + return conn + ";" + base64.StdEncoding.EncodeToString(pubKey.Bytes()) +} + +// parsePeerInfo parse connection string and base64 encoded public key built +// via buildPeerInfo. +func parsePeerInfo(info string) (key crypto.PublicKey, conn string) { + tokens := strings.Split(info, ";") + conn = tokens[0] + data, err := base64.StdEncoding.DecodeString(tokens[1]) + if err != nil { + panic(err) + } + key = ecdsa.NewPublicKeyFromByteSlice(data) + return +} + // TCPTransport implements Transport interface via TCP connection. type TCPTransport struct { peerType TransportPeerType nID types.NodeID + pubKey crypto.PublicKey localPort int - peersInfo map[types.NodeID]string - peers map[types.NodeID]chan<- []byte + peers map[types.NodeID]*tcpPeerRecord peersLock sync.RWMutex recvChannel chan *TransportEnvelope ctx context.Context @@ -60,7 +90,7 @@ type TCPTransport struct { // NewTCPTransport constructs an TCPTransport instance. func NewTCPTransport( peerType TransportPeerType, - nID types.NodeID, + pubKey crypto.PublicKey, latency LatencyModel, marshaller Marshaller, localPort int) *TCPTransport { @@ -68,9 +98,9 @@ func NewTCPTransport( ctx, cancel := context.WithCancel(context.Background()) return &TCPTransport{ peerType: peerType, - nID: nID, - peersInfo: make(map[types.NodeID]string), - peers: make(map[types.NodeID]chan<- []byte), + nID: types.NewNodeID(pubKey), + pubKey: pubKey, + peers: make(map[types.NodeID]*tcpPeerRecord), recvChannel: make(chan *TransportEnvelope, 1000), ctx: ctx, cancel: cancel, @@ -96,7 +126,7 @@ func (t *TCPTransport) Send( t.peersLock.RLock() defer t.peersLock.RUnlock() - t.peers[endpoint] <- payload + t.peers[endpoint].sendChannel <- payload }() return } @@ -110,7 +140,7 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) { t.peersLock.RLock() defer t.peersLock.RUnlock() - for nID, ch := range t.peers { + for nID, rec := range t.peers { if nID == t.nID { continue } @@ -119,7 +149,7 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) { time.Sleep(t.latency.Delay()) } ch <- payload - }(ch) + }(rec.sendChannel) } return } @@ -131,7 +161,7 @@ func (t *TCPTransport) Close() (err error) { // Reset peers. t.peersLock.Lock() defer t.peersLock.Unlock() - t.peers = make(map[types.NodeID]chan<- []byte) + t.peers = make(map[types.NodeID]*tcpPeerRecord) // Tell our user that this channel is closed. close(t.recvChannel) t.recvChannel = nil @@ -139,10 +169,9 @@ func (t *TCPTransport) Close() (err error) { } // Peers implements Transport.Peers method. -func (t *TCPTransport) Peers() (peers map[types.NodeID]struct{}) { - peers = make(map[types.NodeID]struct{}) - for nID := range t.peersInfo { - peers[nID] = struct{}{} +func (t *TCPTransport) Peers() (peers []crypto.PublicKey) { + for _, rec := range t.peers { + peers = append(peers, rec.pubKey) } return } @@ -376,7 +405,7 @@ func (t *TCPTransport) listenerRoutine(listener *net.TCPListener) { // we only utilize the write part for simplicity. func (t *TCPTransport) buildConnectionsToPeers() (err error) { var wg sync.WaitGroup - for nID, addr := range t.peersInfo { + for nID, rec := range t.peers { if nID == t.nID { continue } @@ -394,8 +423,8 @@ func (t *TCPTransport) buildConnectionsToPeers() (err error) { t.peersLock.Lock() defer t.peersLock.Unlock() - t.peers[nID] = t.connWriter(conn) - }(nID, addr) + t.peers[nID].sendChannel = t.connWriter(conn) + }(nID, rec.conn) } wg.Wait() return @@ -410,14 +439,15 @@ type TCPTransportClient struct { // NewTCPTransportClient constructs a TCPTransportClient instance. func NewTCPTransportClient( - nID types.NodeID, + pubKey crypto.PublicKey, latency LatencyModel, marshaller Marshaller, local bool) *TCPTransportClient { return &TCPTransportClient{ - TCPTransport: *NewTCPTransport(TransportPeer, nID, latency, marshaller, 8080), - local: local, + TCPTransport: *NewTCPTransport( + TransportPeer, pubKey, latency, marshaller, 8080), + local: local, } } @@ -436,7 +466,6 @@ func (t *TCPTransportClient) Report(msg interface{}) (err error) { // Join implements TransportClient.Join method. func (t *TCPTransportClient) Join( serverEndpoint interface{}) (ch <-chan *TransportEnvelope, err error) { - // Initiate a TCP server. // TODO(mission): config initial listening port. var ( @@ -475,7 +504,6 @@ func (t *TCPTransportClient) Join( t.localPort = 1024 + rand.Int()%1024 } go t.listenerRoutine(ln.(*net.TCPListener)) - serverConn, err := net.Dial("tcp", serverEndpoint.(string)) if err != nil { return @@ -492,17 +520,26 @@ func (t *TCPTransportClient) Join( conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort)) } if err = t.Report(&tcpMessage{ - Type: "conn", NodeID: t.nID, - Info: conn, + Type: "conn", + Info: buildPeerInfo(t.pubKey, conn), }); err != nil { return } // Wait for peers list sent by server. e := <-t.recvChannel - if t.peersInfo, ok = e.Msg.(map[types.NodeID]string); !ok { + peersInfo, ok := e.Msg.(map[types.NodeID]string) + if !ok { panic(fmt.Errorf("expect peer list, not %v", e)) } + // Setup peers information. + for nID, info := range peersInfo { + pubKey, conn := parsePeerInfo(info) + t.peers[nID] = &tcpPeerRecord{ + conn: conn, + pubKey: pubKey, + } + } // Setup connections to other peers. if err = t.buildConnectionsToPeers(); err != nil { return @@ -551,7 +588,7 @@ func NewTCPTransportServer( // won't be zero. TCPTransport: *NewTCPTransport( TransportPeerServer, - types.NodeID{}, + ecdsa.NewPublicKeyFromByteSlice(nil), nil, marshaller, serverPort), @@ -576,6 +613,7 @@ func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) { func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { // Collect peers info. Packets other than peer info is // unexpected. + peersInfo := make(map[types.NodeID]string) for { // Wait for connection info reported by peers. e := <-t.recvChannel @@ -586,9 +624,14 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { if msg.Type != "conn" { panic(fmt.Errorf("expect connection report, not %v", e)) } - t.peersInfo[msg.NodeID] = msg.Info + pubKey, conn := parsePeerInfo(msg.Info) + t.peers[msg.NodeID] = &tcpPeerRecord{ + conn: conn, + pubKey: pubKey, + } + peersInfo[msg.NodeID] = msg.Info // Check if we already collect enought peers. - if len(t.peersInfo) == numPeers { + if len(peersInfo) == numPeers { break } } @@ -596,7 +639,7 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { if err = t.buildConnectionsToPeers(); err != nil { return } - if err = t.Broadcast(t.peersInfo); err != nil { + if err = t.Broadcast(peersInfo); err != nil { return } // Wait for peers to send 'ready' report. diff --git a/core/test/transport_test.go b/core/test/transport_test.go index 4d71fc8..056cd39 100644 --- a/core/test/transport_test.go +++ b/core/test/transport_test.go @@ -197,7 +197,7 @@ func (s *TransportTestSuite) TestFake() { peerCount = 13 req = s.Require() peers = make(map[types.NodeID]*testPeer) - nIDs = GenerateRandomNodeIDs(peerCount) + prvKeys = GenerateRandomPrivateKeys(peerCount) err error wg sync.WaitGroup latency = &FixedLatencyModel{Latency: 300} @@ -207,11 +207,12 @@ func (s *TransportTestSuite) TestFake() { server.recv, err = server.trans.Host() req.Nil(err) // Setup Peers - wg.Add(len(nIDs)) - for _, nID := range nIDs { + wg.Add(len(prvKeys)) + for _, key := range prvKeys { + nID := types.NewNodeID(key.PublicKey()) peer := &testPeer{ nID: nID, - trans: NewFakeTransportClient(nID, latency), + trans: NewFakeTransportClient(key.PublicKey(), latency), } peers[nID] = peer go func() { @@ -233,11 +234,12 @@ func (s *TransportTestSuite) TestFake() { } func (s *TransportTestSuite) TestTCPLocal() { + var ( peerCount = 25 req = s.Require() peers = make(map[types.NodeID]*testPeer) - nIDs = GenerateRandomNodeIDs(peerCount) + prvKeys = GenerateRandomPrivateKeys(peerCount) err error wg sync.WaitGroup latency = &FixedLatencyModel{Latency: 300} @@ -250,11 +252,13 @@ func (s *TransportTestSuite) TestTCPLocal() { server.recv, err = server.trans.Host() req.Nil(err) // Setup Peers - wg.Add(len(nIDs)) - for _, nID := range nIDs { + wg.Add(len(prvKeys)) + for _, prvKey := range prvKeys { + nID := types.NewNodeID(prvKey.PublicKey()) peer := &testPeer{ - nID: nID, - trans: NewTCPTransportClient(nID, latency, &testMarshaller{}, true), + nID: nID, + trans: NewTCPTransportClient( + prvKey.PublicKey(), latency, &testMarshaller{}, true), } peers[nID] = peer go func() { diff --git a/core/test/utils.go b/core/test/utils.go index 887ef14..2fc21ce 100644 --- a/core/test/utils.go +++ b/core/test/utils.go @@ -24,6 +24,8 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto/ecdsa" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) @@ -43,6 +45,18 @@ func GenerateRandomNodeIDs(nodeCount int) (nIDs types.NodeIDs) { return } +// GenerateRandomPrivateKeys generate a set of private keys. +func GenerateRandomPrivateKeys(nodeCount int) (prvKeys []crypto.PrivateKey) { + for i := 0; i < nodeCount; i++ { + prvKey, err := ecdsa.NewPrivateKey() + if err != nil { + panic(err) + } + prvKeys = append(prvKeys, prvKey) + } + return +} + // CalcLatencyStatistics calculates average and deviation from a slice // of latencies. func CalcLatencyStatistics(latencies []time.Duration) (avg, dev time.Duration) { diff --git a/integration_test/node.go b/integration_test/node.go index d66a86a..ee25b11 100644 --- a/integration_test/node.go +++ b/integration_test/node.go @@ -88,9 +88,13 @@ func NewNode( shardID = uint32(0) chainID = uint32(math.MaxUint32) governanceConfig = gov.GetConfiguration(0) - broadcastTargets = gov.GetNodeSet(0) + nodeSetKeys = gov.GetNodeSet(0) nodeID = types.NewNodeID(privateKey.PublicKey()) ) + broadcastTargets := make(map[types.NodeID]struct{}) + for _, k := range nodeSetKeys { + broadcastTargets[types.NewNodeID(k)] = struct{}{} + } hashes := common.Hashes{} for nID := range broadcastTargets { hashes = append(hashes, nID.Hash) diff --git a/integration_test/utils.go b/integration_test/utils.go index 07d41b6..3e33362 100644 --- a/integration_test/utils.go +++ b/integration_test/utils.go @@ -4,7 +4,6 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" - "github.com/dexon-foundation/dexon-consensus-core/core/crypto" "github.com/dexon-foundation/dexon-consensus-core/core/test" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) @@ -18,11 +17,6 @@ func PrepareNodes( nodes map[types.NodeID]*Node, err error) { - var ( - db blockdb.BlockDatabase - key crypto.PrivateKey - ) - apps = make(map[types.NodeID]*test.App) dbs = make(map[types.NodeID]blockdb.BlockDatabase) nodes = make(map[types.NodeID]*Node) @@ -31,25 +25,21 @@ func PrepareNodes( if err != nil { return } - for nID := range gov.GetNodeSet(0) { + for _, prvKey := range gov.GetPrivateKeys() { + nID := types.NewNodeID(prvKey.PublicKey()) apps[nID] = test.NewApp() - - if db, err = blockdb.NewMemBackedBlockDB(); err != nil { - return - } - dbs[nID] = db - } - for nID := range gov.GetNodeSet(0) { - if key, err = gov.GetPrivateKey(nID); err != nil { + dbs[nID], err = blockdb.NewMemBackedBlockDB() + if err != nil { return } nodes[nID] = NewNode( apps[nID], gov, dbs[nID], - key, + prvKey, networkLatency, - proposingLatency) + proposingLatency, + ) } return } diff --git a/simulation/governance.go b/simulation/governance.go index eb0f2a2..3290075 100644 --- a/simulation/governance.go +++ b/simulation/governance.go @@ -32,7 +32,7 @@ import ( type simGovernance struct { id types.NodeID lock sync.RWMutex - notarySet map[types.NodeID]struct{} + nodeSet map[types.NodeID]crypto.PublicKey expectedNumNodes int k int phiRatio float32 @@ -52,7 +52,7 @@ func newSimGovernance( numNodes int, consensusConfig config.Consensus) *simGovernance { return &simGovernance{ id: id, - notarySet: make(map[types.NodeID]struct{}), + nodeSet: make(map[types.NodeID]crypto.PublicKey), expectedNumNodes: numNodes, k: consensusConfig.K, phiRatio: consensusConfig.PhiRatio, @@ -71,17 +71,14 @@ func (g *simGovernance) setNetwork(network *network) { } // GetNodeSet returns the current notary set. -func (g *simGovernance) GetNodeSet( - blockHeight uint64) map[types.NodeID]struct{} { +func (g *simGovernance) GetNodeSet(round uint64) (ret []crypto.PublicKey) { g.lock.RLock() defer g.lock.RUnlock() - // Return the cloned notarySet. - ret := make(map[types.NodeID]struct{}) - for k := range g.notarySet { - ret[k] = struct{}{} + for _, pubKey := range g.nodeSet { + ret = append(ret, pubKey) } - return ret + return } // GetConfiguration returns the configuration at a given round. @@ -102,17 +99,19 @@ func (g *simGovernance) GetCRS(round uint64) []byte { } // addNode add a new node into the simulated governance contract. -func (g *simGovernance) addNode(nID types.NodeID) { +func (g *simGovernance) addNode(pubKey crypto.PublicKey) { + nID := types.NewNodeID(pubKey) + g.lock.Lock() defer g.lock.Unlock() - if _, exists := g.notarySet[nID]; exists { + if _, exists := g.nodeSet[nID]; exists { return } - if len(g.notarySet) == g.expectedNumNodes { + if len(g.nodeSet) == g.expectedNumNodes { panic(fmt.Errorf("attempt to add node when ready")) } - g.notarySet[nID] = struct{}{} + g.nodeSet[nID] = pubKey } // ProposeThresholdSignature porposes a ThresholdSignature of round. diff --git a/simulation/network.go b/simulation/network.go index 3d3acaa..f1f586a 100644 --- a/simulation/network.go +++ b/simulation/network.go @@ -26,6 +26,7 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" "github.com/dexon-foundation/dexon-consensus-core/core/test" "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/dexon-foundation/dexon-consensus-core/simulation/config" @@ -88,7 +89,7 @@ type network struct { // newNetwork setup network stuffs for nodes, which provides an // implementation of core.Network based on test.TransportClient. -func newNetwork(nID types.NodeID, cfg config.Networking) (n *network) { +func newNetwork(pubKey crypto.PublicKey, cfg config.Networking) (n *network) { // Construct latency model. latency := &test.NormalLatencyModel{ Mean: cfg.Mean, @@ -105,12 +106,12 @@ func newNetwork(nID types.NodeID, cfg config.Networking) (n *network) { switch cfg.Type { case config.NetworkTypeTCPLocal: n.trans = test.NewTCPTransportClient( - nID, latency, &jsonMarshaller{}, true) + pubKey, latency, &jsonMarshaller{}, true) case config.NetworkTypeTCP: n.trans = test.NewTCPTransportClient( - nID, latency, &jsonMarshaller{}, false) + pubKey, latency, &jsonMarshaller{}, false) case config.NetworkTypeFake: - n.trans = test.NewFakeTransportClient(nID, latency) + n.trans = test.NewFakeTransportClient(pubKey, latency) default: panic(fmt.Errorf("unknown network type: %v", cfg.Type)) } @@ -147,8 +148,8 @@ func (n *network) broadcast(message interface{}) { // SendDKGPrivateShare implements core.Network interface. func (n *network) SendDKGPrivateShare( - recv types.NodeID, prvShare *types.DKGPrivateShare) { - if err := n.trans.Send(recv, prvShare); err != nil { + recv crypto.PublicKey, prvShare *types.DKGPrivateShare) { + if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil { panic(err) } } @@ -249,6 +250,6 @@ func (n *network) report(msg interface{}) error { } // peers exports 'Peers' method of test.Transport. -func (n *network) peers() map[types.NodeID]struct{} { +func (n *network) peers() []crypto.PublicKey { return n.trans.Peers() } diff --git a/simulation/node.go b/simulation/node.go index 710b5e9..c9163b3 100644 --- a/simulation/node.go +++ b/simulation/node.go @@ -49,8 +49,9 @@ func newNode( prvKey crypto.PrivateKey, config config.Config) *node { - id := types.NewNodeID(prvKey.PublicKey()) - netModule := newNetwork(id, config.Networking) + pubKey := prvKey.PublicKey() + netModule := newNetwork(pubKey, config.Networking) + id := types.NewNodeID(pubKey) db, err := blockdb.NewMemBackedBlockDB( id.String() + ".blockdb") if err != nil { @@ -85,8 +86,9 @@ func (n *node) run(serverEndpoint interface{}, legacy bool) { n.gov.setNetwork(n.netModule) // Run consensus. hashes := make(common.Hashes, 0, len(peers)) - for nID := range peers { - n.gov.addNode(nID) + for _, pubKey := range peers { + nID := types.NewNodeID(pubKey) + n.gov.addNode(pubKey) hashes = append(hashes, nID.Hash) } sort.Sort(hashes) diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 30cf896..c8a3078 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -168,7 +168,9 @@ func (p *PeerServer) Run() { panic(err) } // Cache peers' info. - p.peers = p.trans.Peers() + for _, pubKey := range p.trans.Peers() { + p.peers[types.NewNodeID(pubKey)] = struct{}{} + } // Initialize total order result cache. for id := range p.peers { p.peerTotalOrder[id] = NewTotalOrderResult(id) -- cgit v1.2.3