From 8335eac4a488716c396f114cbe7522919b97e224 Mon Sep 17 00:00:00 2001 From: Sonic Date: Tue, 25 Sep 2018 20:37:11 +0800 Subject: dex: redesign p2p network topology - Let p2p server support direct connection and group connection. - Introduce node meta table to maintain IP of all nodes in node set, in memory and let nodes in the network can sync this table. - Let peerSet able to manage direct connections to notary set and dkg set. The mechanism to refresh the network topology when configuration round change is not done yet. --- dex/handler.go | 216 +++++++---------- dex/handler_test.go | 445 +++++++++++++++++++++++++++++++++++ dex/helper_test.go | 78 +++++-- dex/network.go | 17 +- dex/nodetable.go | 79 +++++++ dex/nodetable_test.go | 93 ++++++++ dex/notaryset.go | 203 ---------------- dex/peer.go | 296 ++++++++++++++++++------ dex/peer_test.go | 628 ++++++++++++++++++++++++++++++++++++++++++++++++++ dex/protocol.go | 42 ++-- dex/protocol_test.go | 82 +++++++ dex/sync.go | 95 +++++++- 12 files changed, 1825 insertions(+), 449 deletions(-) create mode 100644 dex/handler_test.go create mode 100644 dex/nodetable.go create mode 100644 dex/nodetable_test.go delete mode 100644 dex/notaryset.go create mode 100644 dex/peer_test.go (limited to 'dex') diff --git a/dex/handler.go b/dex/handler.go index bc932fb28..5348b06f2 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -49,6 +49,8 @@ const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 + + metaChanSize = 10240 ) var ( @@ -59,11 +61,6 @@ var ( // not compatible (low protocol version restrictions and high requirements). var errIncompatibleConfig = errors.New("incompatible configuration") -type newNotarySetEvent struct { - Round uint64 - Set map[string]struct{} -} - func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -75,32 +72,35 @@ type ProtocolManager struct { acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) txpool txPool + nodeTable *nodeTable + gov governance blockchain *core.BlockChain chainconfig *params.ChainConfig maxPeers int - downloader *downloader.Downloader - fetcher *fetcher.Fetcher - peers *peerSet - notarySetManager *notarySetManager + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet SubProtocols []p2p.Protocol eventMux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription + metasCh chan newMetasEvent + metasSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer txsyncCh chan *txsync + metasyncCh chan *metasync quitSync chan struct{} noMorePeers chan struct{} - // channels for notarySetLoop - newNotarySetCh chan newNotarySetEvent - newRoundCh chan uint64 - newNotaryNodeInfoCh chan *notaryNodeInfo + // channels for peerSetLoop + crsCh chan core.NewCRSEvent + crsSub event.Subscription srvr p2pServer @@ -111,24 +111,26 @@ type ProtocolManager struct { // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the Ethereum network. -func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { +func NewProtocolManager( + config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, + mux *event.TypeMux, txpool txPool, engine consensus.Engine, + blockchain *core.BlockChain, chaindb ethdb.Database, + gov governance) (*ProtocolManager, error) { + tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - chainconfig: config, - peers: newPeerSet(), - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), - newNotarySetCh: make(chan newNotarySetEvent), - newRoundCh: make(chan uint64), - newNotaryNodeInfoCh: make(chan *notaryNodeInfo), - } - manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh) + networkID: networkID, + eventMux: mux, + txpool: txpool, + nodeTable: tab, + gov: gov, + blockchain: blockchain, + chainconfig: config, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + } // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { @@ -216,22 +218,33 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.maxPeers = maxPeers pm.srvr = srvr + pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable) + + // if our self in node set build the node info // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() + // broadcast node metas + pm.metasCh = make(chan newMetasEvent, metaChanSize) + pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) + go pm.metaBroadcastLoop() + // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() - // run the notary set loop - go pm.notarySetLoop() + // run the peer set loop + pm.crsCh = make(chan core.NewCRSEvent) + pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh) + go pm.peerSetLoop() // start sync handlers go pm.syncer() go pm.txsyncLoop() + go pm.metasyncLoop() } func (pm *ProtocolManager) Stop() { @@ -301,6 +314,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) + pm.syncNodeMetas(p) // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil { @@ -699,18 +713,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.txpool.AddRemotes(txs) - case msg.Code == NotaryNodeInfoMsg: - var info notaryNodeInfo - if err := msg.Decode(&info); err != nil { + case msg.Code == MetaMsg: + var metas []*NodeMeta + if err := msg.Decode(&metas); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - - // TODO(sonic): validate signature - p.MarkNotaryNodeInfo(info.Hash()) - // Broadcast to peers - pm.BroadcastNotaryNodeInfo(&info) - - pm.notarySetManager.TryAddInfo(&info) + for i, meta := range metas { + if meta == nil { + return errResp(ErrDecode, "node meta %d is nil", i) + } + p.MarkNodeMeta(meta.Hash()) + } + pm.nodeTable.Add(metas) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -769,11 +783,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } -// BroadcastNotaryNodeInfo will propagate notary node info to its peers. -func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) { - peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash()) - for _, peer := range peers { - peer.AsyncSendNotaryNodeInfo(info) +// BroadcastMetas will propagate node metas to its peers. +func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { + var metaset = make(map[*peer][]*NodeMeta) + + for _, meta := range metas { + peers := pm.peers.PeersWithoutNodeMeta(meta.Hash()) + for _, peer := range peers { + metaset[peer] = append(metaset[peer], meta) + } + log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers)) + } + + for peer, metas := range metaset { + peer.AsyncSendNodeMetas(metas) } } @@ -801,97 +824,32 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } -// a loop keep building and maintaining peers in notary set. -func (pm *ProtocolManager) notarySetLoop() { - // TODO: - // * pm need to know current round, and whether we are in notary set. - // * (done) able to handle node info in the near future round. - // * check peer limit. - // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?) - - // If we are in notary set and we are synced with network. - // events: - // * new notary set is determined, and we are in notary set. - // (TBD: subscribe the event or polling govornance) - // - advance round - // - build new notary set - // - remove old notary set, remove old notary set from static - - // * current round node info changed. - - self := pm.srvr.Self() - var round uint64 - +func (pm *ProtocolManager) metaBroadcastLoop() { for { select { - case event := <-pm.newNotarySetCh: - // new notary set is determined. - if _, ok := event.Set[self.ID.String()]; ok { - // initialize the new notary set and add it to pm.notarySetManager - s := newNotarySet(event.Round, event.Set) - pm.notarySetManager.Register(event.Round, s) - - // TODO: handle signature - pm.BroadcastNotaryNodeInfo(¬aryNodeInfo{ - ID: self.ID, - IP: self.IP, - UDP: self.UDP, - TCP: self.TCP, - Round: event.Round, - Timestamp: time.Now().Unix(), - }) - } - - case r := <-pm.newRoundCh: - // move to new round. - round = r + case event := <-pm.metasCh: + pm.BroadcastMetas(event.Metas) - // TODO: revisit this to make sure how many previous round's - // notary set we need to keep. - - // rmove notary set before current round, they are useless - for _, s := range pm.notarySetManager.Before(round) { - pm.removeNotarySetFromStatic(s) - } - - // prepare notary set connections for new round. - if pm.isInNotarySet(round + 1) { - // we assume the notary set for round + 1 is added to - // pm.notarySetManager before this step. - if n, ok := pm.notarySetManager.Round(round + 1); ok { - pm.addNotarySetToStatic(n) - } - } - - case <-pm.newNotaryNodeInfoCh: - // some node info in "current round" is updated. - // try to connect them by the new info. - if pm.isInNotarySet(round) { - if n, ok := pm.notarySetManager.Round(round); ok { - pm.addNotarySetToStatic(n) - } - } - - case <-pm.quitSync: + // Err() channel will be closed when unsubscribing. + case <-pm.metasSub.Err(): return } } } -func (pm *ProtocolManager) isInNotarySet(r uint64) bool { - return false -} - -func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) { - for _, node := range n.NodesToAdd() { - pm.srvr.AddNotaryPeer(node) - n.MarkAdded(node.ID.String()) - } -} - -func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) { - for _, node := range n.Nodes() { - pm.srvr.RemoveNotaryPeer(node) +// a loop keep building and maintaining peers in notary set. +// TODO: finish this +func (pm *ProtocolManager) peerSetLoop() { + for { + select { + case event := <-pm.crsCh: + pm.peers.BuildNotaryConn(event.Round) + pm.peers.BuildDKGConn(event.Round) + pm.peers.ForgetNotaryConn(event.Round - 1) + pm.peers.ForgetDKGConn(event.Round - 1) + case <-pm.quitSync: + return + } } } diff --git a/dex/handler_test.go b/dex/handler_test.go new file mode 100644 index 000000000..981362bb5 --- /dev/null +++ b/dex/handler_test.go @@ -0,0 +1,445 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package dex + +import ( + "math" + "math/big" + "math/rand" + "testing" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core" + "github.com/dexon-foundation/dexon/core/state" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/crypto" + "github.com/dexon-foundation/dexon/eth/downloader" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/p2p" + "github.com/dexon-foundation/dexon/params" +) + +// Tests that protocol versions and modes of operations are matched up properly. +func TestProtocolCompatibility(t *testing.T) { + // Define the compatibility chart + tests := []struct { + version uint + mode downloader.SyncMode + compatible bool + }{ + {61, downloader.FullSync, true}, {62, downloader.FullSync, true}, {63, downloader.FullSync, true}, + {61, downloader.FastSync, true}, {62, downloader.FastSync, true}, {63, downloader.FastSync, true}, + } + // Make sure anything we screw up is restored + backup := ProtocolVersions + defer func() { ProtocolVersions = backup }() + + // Try all available compatibility configs and check for errors + for i, tt := range tests { + ProtocolVersions = []uint{tt.version} + + pm, _, err := newTestProtocolManager(tt.mode, 0, nil, nil) + if pm != nil { + defer pm.Stop() + } + if (err == nil && !tt.compatible) || (err != nil && tt.compatible) { + t.Errorf("test %d: compatibility mismatch: have error %v, want compatibility %v", i, err, tt.compatible) + } + } +} + +// Tests that block headers can be retrieved from a remote chain based on user queries. +func TestGetBlockHeaders62(t *testing.T) { testGetBlockHeaders(t, 62) } +func TestGetBlockHeaders63(t *testing.T) { testGetBlockHeaders(t, 63) } + +func testGetBlockHeaders(t *testing.T, protocol int) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxHashFetch+15, nil, nil) + peer, _ := newTestPeer("peer", protocol, pm, true) + defer peer.close() + + // Create a "random" unknown hash for testing + var unknown common.Hash + for i := range unknown { + unknown[i] = byte(i) + } + // Create a batch of tests for various scenarios + limit := uint64(downloader.MaxHeaderFetch) + tests := []struct { + query *getBlockHeadersData // The query to execute for header retrieval + expect []common.Hash // The hashes of the block whose headers are expected + }{ + // A single random block should be retrievable by hash and number too + { + &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(limit / 2).Hash()}, Amount: 1}, + []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()}, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 1}, + []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()}, + }, + // Multiple headers should be retrievable in both directions + { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(limit / 2).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 + 1).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 + 2).Hash(), + }, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(limit / 2).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 - 1).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 - 2).Hash(), + }, + }, + // Multiple headers with skip lists should be retrievable + { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(limit / 2).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 + 4).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 + 8).Hash(), + }, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(limit / 2).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 - 4).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 - 8).Hash(), + }, + }, + // The chain endpoints should be retrievable + { + &getBlockHeadersData{Origin: hashOrNumber{Number: 0}, Amount: 1}, + []common.Hash{pm.blockchain.GetBlockByNumber(0).Hash()}, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64()}, Amount: 1}, + []common.Hash{pm.blockchain.CurrentBlock().Hash()}, + }, + // Ensure protocol limits are honored + { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true}, + pm.blockchain.GetBlockHashesFromHash(pm.blockchain.CurrentBlock().Hash(), limit), + }, + // Check that requesting more than available is handled gracefully + { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 3, Amount: 3}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(), + pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64()).Hash(), + }, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 3, Amount: 3, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(4).Hash(), + pm.blockchain.GetBlockByNumber(0).Hash(), + }, + }, + // Check that requesting more than available is handled gracefully, even if mid skip + { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 2, Amount: 3}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(), + pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 1).Hash(), + }, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 2, Amount: 3, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(4).Hash(), + pm.blockchain.GetBlockByNumber(1).Hash(), + }, + }, + // Check a corner case where requesting more can iterate past the endpoints + { + &getBlockHeadersData{Origin: hashOrNumber{Number: 2}, Amount: 5, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(2).Hash(), + pm.blockchain.GetBlockByNumber(1).Hash(), + pm.blockchain.GetBlockByNumber(0).Hash(), + }, + }, + // Check a corner case where skipping overflow loops back into the chain start + { + &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(3).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64 - 1}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(3).Hash(), + }, + }, + // Check a corner case where skipping overflow loops back to the same header + { + &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(1).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(1).Hash(), + }, + }, + // Check that non existing headers aren't returned + { + &getBlockHeadersData{Origin: hashOrNumber{Hash: unknown}, Amount: 1}, + []common.Hash{}, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() + 1}, Amount: 1}, + []common.Hash{}, + }, + } + // Run each of the tests and verify the results against the chain + for i, tt := range tests { + // Collect the headers to expect in the response + headers := []*types.Header{} + for _, hash := range tt.expect { + headers = append(headers, pm.blockchain.GetBlockByHash(hash).Header()) + } + // Send the hash request and verify the response + p2p.Send(peer.app, 0x03, tt.query) + if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil { + t.Errorf("test %d: headers mismatch: %v", i, err) + } + // If the test used number origins, repeat with hashes as the too + if tt.query.Origin.Hash == (common.Hash{}) { + if origin := pm.blockchain.GetBlockByNumber(tt.query.Origin.Number); origin != nil { + tt.query.Origin.Hash, tt.query.Origin.Number = origin.Hash(), 0 + + p2p.Send(peer.app, 0x03, tt.query) + if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil { + t.Errorf("test %d: headers mismatch: %v", i, err) + } + } + } + } +} + +// Tests that block contents can be retrieved from a remote chain based on their hashes. +func TestGetBlockBodies62(t *testing.T) { testGetBlockBodies(t, 62) } +func TestGetBlockBodies63(t *testing.T) { testGetBlockBodies(t, 63) } + +func testGetBlockBodies(t *testing.T, protocol int) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxBlockFetch+15, nil, nil) + peer, _ := newTestPeer("peer", protocol, pm, true) + defer peer.close() + + // Create a batch of tests for various scenarios + limit := downloader.MaxBlockFetch + tests := []struct { + random int // Number of blocks to fetch randomly from the chain + explicit []common.Hash // Explicitly requested blocks + available []bool // Availability of explicitly requested blocks + expected int // Total number of existing blocks to expect + }{ + {1, nil, nil, 1}, // A single random block should be retrievable + {10, nil, nil, 10}, // Multiple random blocks should be retrievable + {limit, nil, nil, limit}, // The maximum possible blocks should be retrievable + {limit + 1, nil, nil, limit}, // No more than the possible block count should be returned + {0, []common.Hash{pm.blockchain.Genesis().Hash()}, []bool{true}, 1}, // The genesis block should be retrievable + {0, []common.Hash{pm.blockchain.CurrentBlock().Hash()}, []bool{true}, 1}, // The chains head block should be retrievable + {0, []common.Hash{{}}, []bool{false}, 0}, // A non existent block should not be returned + + // Existing and non-existing blocks interleaved should not cause problems + {0, []common.Hash{ + {}, + pm.blockchain.GetBlockByNumber(1).Hash(), + {}, + pm.blockchain.GetBlockByNumber(10).Hash(), + {}, + pm.blockchain.GetBlockByNumber(100).Hash(), + {}, + }, []bool{false, true, false, true, false, true, false}, 3}, + } + // Run each of the tests and verify the results against the chain + for i, tt := range tests { + // Collect the hashes to request, and the response to expect + hashes, seen := []common.Hash{}, make(map[int64]bool) + bodies := []*blockBody{} + + for j := 0; j < tt.random; j++ { + for { + num := rand.Int63n(int64(pm.blockchain.CurrentBlock().NumberU64())) + if !seen[num] { + seen[num] = true + + block := pm.blockchain.GetBlockByNumber(uint64(num)) + hashes = append(hashes, block.Hash()) + if len(bodies) < tt.expected { + bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()}) + } + break + } + } + } + for j, hash := range tt.explicit { + hashes = append(hashes, hash) + if tt.available[j] && len(bodies) < tt.expected { + block := pm.blockchain.GetBlockByHash(hash) + bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()}) + } + } + // Send the hash request and verify the response + p2p.Send(peer.app, 0x05, hashes) + if err := p2p.ExpectMsg(peer.app, 0x06, bodies); err != nil { + t.Errorf("test %d: bodies mismatch: %v", i, err) + } + } +} + +// Tests that the node state database can be retrieved based on hashes. +func TestGetNodeData63(t *testing.T) { testGetNodeData(t, 63) } + +func testGetNodeData(t *testing.T, protocol int) { + // Define three accounts to simulate transactions with + acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") + acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee") + acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey) + acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey) + + signer := types.HomesteadSigner{} + // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test) + generator := func(i int, block *core.BlockGen) { + switch i { + case 0: + // In block 1, the test bank sends account #1 some ether. + tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey) + block.AddTx(tx) + case 1: + // In block 2, the test bank sends some more ether to account #1. + // acc1Addr passes it on to account #2. + tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey) + tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key) + block.AddTx(tx1) + block.AddTx(tx2) + case 2: + // Block 3 is empty but was mined by account #2. + block.SetCoinbase(acc2Addr) + block.SetExtra([]byte("yeehaw")) + case 3: + // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data). + b2 := block.PrevBlock(1).Header() + b2.Extra = []byte("foo") + block.AddUncle(b2) + b3 := block.PrevBlock(2).Header() + b3.Extra = []byte("foo") + block.AddUncle(b3) + } + } + // Assemble the test environment + pm, db := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil) + peer, _ := newTestPeer("peer", protocol, pm, true) + defer peer.close() + + // Fetch for now the entire chain db + hashes := []common.Hash{} + for _, key := range db.Keys() { + if len(key) == len(common.Hash{}) { + hashes = append(hashes, common.BytesToHash(key)) + } + } + p2p.Send(peer.app, 0x0d, hashes) + msg, err := peer.app.ReadMsg() + if err != nil { + t.Fatalf("failed to read node data response: %v", err) + } + if msg.Code != 0x0e { + t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, 0x0c) + } + var data [][]byte + if err := msg.Decode(&data); err != nil { + t.Fatalf("failed to decode response node data: %v", err) + } + // Verify that all hashes correspond to the requested data, and reconstruct a state tree + for i, want := range hashes { + if hash := crypto.Keccak256Hash(data[i]); hash != want { + t.Errorf("data hash mismatch: have %x, want %x", hash, want) + } + } + statedb := ethdb.NewMemDatabase() + for i := 0; i < len(data); i++ { + statedb.Put(hashes[i].Bytes(), data[i]) + } + accounts := []common.Address{testBank, acc1Addr, acc2Addr} + for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ { + trie, _ := state.New(pm.blockchain.GetBlockByNumber(i).Root(), state.NewDatabase(statedb)) + + for j, acc := range accounts { + state, _ := pm.blockchain.State() + bw := state.GetBalance(acc) + bh := trie.GetBalance(acc) + + if (bw != nil && bh == nil) || (bw == nil && bh != nil) { + t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw) + } + if bw != nil && bh != nil && bw.Cmp(bw) != 0 { + t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw) + } + } + } +} + +// Tests that the transaction receipts can be retrieved based on hashes. +func TestGetReceipt63(t *testing.T) { testGetReceipt(t, 63) } + +func testGetReceipt(t *testing.T, protocol int) { + // Define three accounts to simulate transactions with + acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") + acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee") + acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey) + acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey) + + signer := types.HomesteadSigner{} + // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test) + generator := func(i int, block *core.BlockGen) { + switch i { + case 0: + // In block 1, the test bank sends account #1 some ether. + tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey) + block.AddTx(tx) + case 1: + // In block 2, the test bank sends some more ether to account #1. + // acc1Addr passes it on to account #2. + tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey) + tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key) + block.AddTx(tx1) + block.AddTx(tx2) + case 2: + // Block 3 is empty but was mined by account #2. + block.SetCoinbase(acc2Addr) + block.SetExtra([]byte("yeehaw")) + case 3: + // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data). + b2 := block.PrevBlock(1).Header() + b2.Extra = []byte("foo") + block.AddUncle(b2) + b3 := block.PrevBlock(2).Header() + b3.Extra = []byte("foo") + block.AddUncle(b3) + } + } + // Assemble the test environment + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil) + peer, _ := newTestPeer("peer", protocol, pm, true) + defer peer.close() + + // Collect the hashes to request, and the response to expect + hashes, receipts := []common.Hash{}, []types.Receipts{} + for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ { + block := pm.blockchain.GetBlockByNumber(i) + + hashes = append(hashes, block.Hash()) + receipts = append(receipts, pm.blockchain.GetReceiptsByHash(block.Hash())) + } + // Send the hash request and verify the response + p2p.Send(peer.app, 0x0f, hashes) + if err := p2p.ExpectMsg(peer.app, 0x10, receipts); err != nil { + t.Errorf("receipts mismatch: %v", err) + } +} diff --git a/dex/helper_test.go b/dex/helper_test.go index 7e3479958..dcda6f4d2 100644 --- a/dex/helper_test.go +++ b/dex/helper_test.go @@ -37,7 +37,7 @@ import ( "github.com/dexon-foundation/dexon/ethdb" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/p2p/discover" + "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/params" ) @@ -48,24 +48,47 @@ var ( // testP2PServer is a fake, helper p2p server for testing purposes. type testP2PServer struct { - added chan *discover.Node - removed chan *discover.Node + mu sync.Mutex + self *enode.Node + direct map[enode.NodeID]*enode.Node + group map[string][]*enode.Node } -func (s *testP2PServer) Self() *discover.Node { - return &discover.Node{} +func newTestP2PServer(self *enode.Node) *testP2PServer { + return &testP2PServer{ + self: self, + direct: make(map[enode.NodeID]*enode.Node), + group: make(map[string][]*enode.Node), + } } -func (s *testP2PServer) AddNotaryPeer(node *discover.Node) { - if s.added != nil { - s.added <- node - } +func (s *testP2PServer) Self() *enode.Node { + return s.self } -func (s *testP2PServer) RemoveNotaryPeer(node *discover.Node) { - if s.removed != nil { - s.removed <- node - } +func (s *testP2PServer) AddDirectPeer(node *enode.Node) { + s.mu.Lock() + defer s.mu.Unlock() + s.direct[node.ID] = node +} + +func (s *testP2PServer) RemoveDirectPeer(node *enode.Node) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.direct, node.ID) +} + +func (s *testP2PServer) AddGroup( + name string, nodes []*enode.Node, num uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.group[name] = nodes +} + +func (s *testP2PServer) RemoveGroup(name string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.group, name) } // newTestProtocolManager creates a new protocol manager for testing purposes, @@ -88,11 +111,11 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func panic(err) } - pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db) + pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, &testGovernance{}) if err != nil { return nil, nil, err } - pm.Start(&testP2PServer{}, 1000) + pm.Start(newTestP2PServer(&enode.Node{}), 1000) return pm, db, nil } @@ -157,6 +180,29 @@ func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *typ return tx } +// testGovernance is a fake, helper governance for testing purposes +type testGovernance struct { + getChainNumFunc func(uint64) uint32 + getNotarySetFunc func(uint32, uint64) map[string]struct{} + getDKGSetFunc func(uint64) map[string]struct{} +} + +func (g *testGovernance) GetChainNum(round uint64) uint32 { + return g.getChainNumFunc(round) +} + +func (g *testGovernance) GetNotarySet(chainID uint32, round uint64) map[string]struct{} { + return g.getNotarySetFunc(chainID, round) +} + +func (g *testGovernance) GetDKGSet(round uint64) map[string]struct{} { + return g.getDKGSetFunc(round) +} + +func (g *testGovernance) SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription { + return nil +} + // testPeer is a simulated peer to allow testing direct network calls. type testPeer struct { net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging @@ -170,7 +216,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te app, net := p2p.MsgPipe() // Generate a random id and create the peer - var id discover.NodeID + var id enode.NodeID rand.Read(id[:]) peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net) diff --git a/dex/network.go b/dex/network.go index c3a0ef792..b19b46147 100644 --- a/dex/network.go +++ b/dex/network.go @@ -23,14 +23,6 @@ func (n *DexconNetwork) BroadcastVote(vote *types.Vote) { func (n *DexconNetwork) BroadcastBlock(block *types.Block) { } -// BroadcastRandomnessRequest broadcasts rand request to DKG set. -func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) { -} - -// BroadcastRandomnessResult broadcasts rand request to Notary set. -func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) { -} - // SendDKGPrivateShare sends PrivateShare to a DKG participant. func (n *DexconNetwork) SendDKGPrivateShare( pub crypto.PublicKey, prvShare *types.DKGPrivateShare) { @@ -47,6 +39,15 @@ func (n *DexconNetwork) BroadcastDKGPartialSignature( psig *types.DKGPartialSignature) { } +// BroadcastRandomnessRequest broadcasts rand request to DKG set. +func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) { + +} + +// BroadcastRandomnessResult broadcasts rand request to Notary set. +func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) { +} + // ReceiveChan returns a channel to receive messages from DEXON network. func (n *DexconNetwork) ReceiveChan() <-chan interface{} { return n.receiveChan diff --git a/dex/nodetable.go b/dex/nodetable.go new file mode 100644 index 000000000..929b168a8 --- /dev/null +++ b/dex/nodetable.go @@ -0,0 +1,79 @@ +package dex + +import ( + "net" + "sync" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/crypto/sha3" + "github.com/dexon-foundation/dexon/event" + "github.com/dexon-foundation/dexon/p2p/enode" + "github.com/dexon-foundation/dexon/rlp" +) + +type NodeMeta struct { + ID enode.ID + IP net.IP + UDP int + TCP int + Timestamp uint64 + Sig []byte +} + +func (n *NodeMeta) Hash() (h common.Hash) { + hw := sha3.NewKeccak256() + rlp.Encode(hw, n) + hw.Sum(h[:0]) + return h +} + +type newMetasEvent struct{ Metas []*NodeMeta } + +type nodeTable struct { + mu sync.RWMutex + entry map[enode.ID]*NodeMeta + feed event.Feed +} + +func newNodeTable() *nodeTable { + return &nodeTable{ + entry: make(map[enode.ID]*NodeMeta), + } +} + +func (t *nodeTable) Get(id enode.ID) *NodeMeta { + t.mu.RLock() + defer t.mu.RUnlock() + return t.entry[id] +} + +func (t *nodeTable) Add(metas []*NodeMeta) { + t.mu.Lock() + defer t.mu.Unlock() + + var newMetas []*NodeMeta + for _, meta := range metas { + // TODO: validate the meta + if e, ok := t.entry[meta.ID]; ok && e.Timestamp > meta.Timestamp { + continue + } + t.entry[meta.ID] = meta + newMetas = append(newMetas, meta) + } + t.feed.Send(newMetasEvent{newMetas}) +} + +func (t *nodeTable) Metas() []*NodeMeta { + t.mu.RLock() + defer t.mu.RUnlock() + metas := make([]*NodeMeta, 0, len(t.entry)) + for _, meta := range t.entry { + metas = append(metas, meta) + } + return metas +} + +func (t *nodeTable) SubscribeNewMetasEvent( + ch chan<- newMetasEvent) event.Subscription { + return t.feed.Subscribe(ch) +} diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go new file mode 100644 index 000000000..5b3f7de57 --- /dev/null +++ b/dex/nodetable_test.go @@ -0,0 +1,93 @@ +package dex + +import ( + "math/rand" + "testing" + "time" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/p2p/enode" +) + +func TestNodeTable(t *testing.T) { + table := newNodeTable() + ch := make(chan newMetasEvent) + table.SubscribeNewMetasEvent(ch) + + metas1 := []*NodeMeta{ + &NodeMeta{ID: randomID()}, + &NodeMeta{ID: randomID()}, + } + + metas2 := []*NodeMeta{ + &NodeMeta{ID: randomID()}, + &NodeMeta{ID: randomID()}, + } + + go table.Add(metas1) + + select { + case newMetas := <-ch: + m := map[common.Hash]struct{}{} + for _, meta := range newMetas.Metas { + m[meta.Hash()] = struct{}{} + } + + if len(m) != len(metas1) { + t.Errorf("len mismatch: got %d, want: %d", + len(m), len(metas1)) + } + + for _, meta := range metas1 { + if _, ok := m[meta.Hash()]; !ok { + t.Errorf("expected meta (%s) not exists", meta.Hash()) + } + } + case <-time.After(1 * time.Second): + t.Error("did not receive new metas event within one second") + } + + go table.Add(metas2) + select { + case newMetas := <-ch: + m := map[common.Hash]struct{}{} + for _, meta := range newMetas.Metas { + m[meta.Hash()] = struct{}{} + } + + if len(m) != len(metas1) { + t.Errorf("len mismatch: got %d, want: %d", + len(m), len(metas2)) + } + + for _, meta := range metas2 { + if _, ok := m[meta.Hash()]; !ok { + t.Errorf("expected meta (%s) not exists", meta.Hash()) + } + } + case <-time.After(1 * time.Second): + t.Error("did not receive new metas event within one second") + } + + var metas []*NodeMeta + metas = append(metas, metas1...) + metas = append(metas, metas2...) + allMetas := table.Metas() + if len(allMetas) != len(metas) { + t.Errorf("all metas num mismatch: got %d, want %d", + len(metas), len(allMetas)) + } + + for _, m := range metas { + if m.Hash() != table.Get(m.ID).Hash() { + t.Errorf("meta (%s) mismatch", m.ID.String()) + } + } +} + +func randomID() (id enode.ID) { + for i := range id { + id[i] = byte(rand.Intn(255)) + } + return id +} diff --git a/dex/notaryset.go b/dex/notaryset.go deleted file mode 100644 index 74d259314..000000000 --- a/dex/notaryset.go +++ /dev/null @@ -1,203 +0,0 @@ -package dex - -import ( - "fmt" - "sync" - - "github.com/dexon-foundation/dexon/p2p/discover" -) - -type nodeInfo struct { - info *notaryNodeInfo - added bool -} - -func (n *nodeInfo) NewNode() *discover.Node { - return discover.NewNode(n.info.ID, n.info.IP, n.info.UDP, n.info.TCP) -} - -type notarySet struct { - round uint64 - m map[string]*nodeInfo - lock sync.RWMutex -} - -func newNotarySet(round uint64, s map[string]struct{}) *notarySet { - m := make(map[string]*nodeInfo) - for nodeID := range s { - m[nodeID] = &nodeInfo{} - } - - return ¬arySet{ - round: round, - m: m, - } -} - -// Call this function when the notaryNodeInfoMsg is received. -func (n *notarySet) AddInfo(info *notaryNodeInfo) error { - n.lock.Lock() - defer n.lock.Unlock() - - // check round - if info.Round != n.round { - return fmt.Errorf("invalid round") - } - - nInfo, ok := n.m[info.ID.String()] - if !ok { - return fmt.Errorf("not in notary set") - } - - // if the info exists check timstamp - if nInfo.info != nil { - if nInfo.info.Timestamp > info.Timestamp { - return fmt.Errorf("old msg") - } - } - - nInfo.info = info - return nil -} - -// MarkAdded mark the notary node as added -// to prevent duplcate addition in the future. -func (n *notarySet) MarkAdded(nodeID string) { - if info, ok := n.m[nodeID]; ok { - info.added = true - } -} - -// Return all nodes -func (n *notarySet) Nodes() []*discover.Node { - n.lock.RLock() - defer n.lock.RUnlock() - - list := make([]*discover.Node, 0, len(n.m)) - for _, info := range n.m { - list = append(list, info.NewNode()) - } - return list -} - -// Return nodes that need to be added to p2p server as notary node. -func (n *notarySet) NodesToAdd() []*discover.Node { - n.lock.RLock() - defer n.lock.RUnlock() - - var list []*discover.Node - for _, info := range n.m { - // craete a new discover.Node - if !info.added { - list = append(list, info.NewNode()) - } - } - return list -} - -type notarySetManager struct { - m map[uint64]*notarySet - lock sync.RWMutex - queued map[uint64]map[string]*notaryNodeInfo - round uint64 // biggest round of managed notary sets - newNotaryNodeInfoCh chan *notaryNodeInfo -} - -func newNotarySetManager( - newNotaryNodeInfoCh chan *notaryNodeInfo) *notarySetManager { - return ¬arySetManager{ - m: make(map[uint64]*notarySet), - queued: make(map[uint64]map[string]*notaryNodeInfo), - newNotaryNodeInfoCh: newNotaryNodeInfoCh, - } -} - -// Register injects a new notary set into the manager and -// processes the queued info. -func (n *notarySetManager) Register(r uint64, s *notarySet) { - n.lock.Lock() - defer n.lock.Unlock() - if r > n.round { - n.round = r - } - n.m[r] = s - n.processQueuedInfo() -} - -// Unregister removes the notary set of the given round. -func (n *notarySetManager) Unregister(r uint64) { - n.lock.Lock() - defer n.lock.Unlock() - delete(n.m, r) -} - -// Round returns the notary set of the given round. -func (n *notarySetManager) Round(r uint64) (*notarySet, bool) { - n.lock.RLock() - defer n.lock.RUnlock() - s, ok := n.m[r] - return s, ok -} - -// Before returns all the notary sets that before the given round. -func (n *notarySetManager) Before(r uint64) []*notarySet { - n.lock.RLock() - defer n.lock.RUnlock() - var list []*notarySet - for round, s := range n.m { - if round < r { - list = append(list, s) - } - } - return list -} - -// TryAddInfo associates the given info to the notary set if the notary set is -// managed by the manager. -// If the notary node info is belong to future notary set, queue the info. -func (n *notarySetManager) TryAddInfo(info *notaryNodeInfo) { - n.lock.Lock() - defer n.lock.Unlock() - n.tryAddInfo(info) -} - -// This function is extract for calling without lock. -// Make sure the caller already accquired the lock. -func (n *notarySetManager) tryAddInfo(info *notaryNodeInfo) { - if info.Round > n.round { - if q, ok := n.queued[info.Round]; ok { - q[info.Hash().String()] = info - return - } - n.queued[info.Round] = map[string]*notaryNodeInfo{ - info.Hash().String(): info, - } - return - } - - s, ok := n.Round(info.Round) - if !ok { - return - } - s.AddInfo(info) - - // TODO(sonic): handle timeout - n.newNotaryNodeInfoCh <- info -} - -func (n *notarySetManager) processQueuedInfo() { - n.lock.Lock() - defer n.lock.Unlock() - if q, ok := n.queued[n.round]; ok { - for _, info := range q { - n.tryAddInfo(info) - } - } - - // Clear queue infos before current round. - for round := range n.queued { - if round <= n.round { - delete(n.queued, round) - } - } -} diff --git a/dex/peer.go b/dex/peer.go index f1a4335d1..8c218f1d9 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -27,6 +27,7 @@ import ( "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/p2p" + "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/rlp" ) @@ -34,19 +35,20 @@ var ( errClosed = errors.New("peer set is closed") errAlreadyRegistered = errors.New("peer is already registered") errNotRegistered = errors.New("peer is not registered") - errInvalidRound = errors.New("invalid round") ) const ( maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) - maxKnownInfos = 1024 // maxQueuedTxs is the maximum number of transaction lists to queue up before // dropping broadcasts. This is a sensitive number as a transaction list might // contain a single transaction, or thousands. maxQueuedTxs = 128 + maxQueuedMetas = 512 + // maxQueuedProps is the maximum number of block propagations to queue up before // dropping broadcasts. There's not much point in queueing stale blocks, so a few // that might cover uncles should be enough. @@ -57,9 +59,9 @@ const ( // above some healthy uncle limit, so use that. maxQueuedAnns = 4 - maxQueuedInfos = 1024 - handshakeTimeout = 5 * time.Second + + groupNodeNum = 3 ) // PeerInfo represents a short summary of the Ethereum sub-protocol metadata known @@ -76,13 +78,27 @@ type propEvent struct { td *big.Int } +type setType uint32 + +const ( + dkgset = iota + notaryset +) + +type peerLabel struct { + set setType + chainID uint32 + round uint64 +} + type peer struct { id string *p2p.Peer rw p2p.MsgReadWriter - version int // Protocol version negotiated + version int // Protocol version negotiated + labels mapset.Set forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time head common.Hash @@ -90,12 +106,12 @@ type peer struct { lock sync.RWMutex knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownMetas mapset.Set // Set of node metas known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer - knownInfos mapset.Set // Set of infos known to be known by this peer queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer queuedProps chan *propEvent // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer - queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer term chan struct{} // Termination channel to stop the broadcaster } @@ -104,20 +120,21 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { Peer: p, rw: rw, version: version, - id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), + labels: mapset.NewSet(), + id: p.ID().String(), knownTxs: mapset.NewSet(), + knownMetas: mapset.NewSet(), knownBlocks: mapset.NewSet(), - knownInfos: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), queuedProps: make(chan *propEvent, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), - queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos), term: make(chan struct{}), } } // broadcast is a write loop that multiplexes block propagations, announcements, -// transaction and notary node infos broadcasts into the remote peer. +// transaction and notary node metas broadcasts into the remote peer. // The goal is to have an async writer that does not lock up node internals. func (p *peer) broadcast() { for { @@ -128,6 +145,12 @@ func (p *peer) broadcast() { } p.Log().Trace("Broadcast transactions", "count", len(txs)) + case metas := <-p.queuedMetas: + if err := p.SendNodeMetas(metas); err != nil { + return + } + p.Log().Trace("Broadcast node metas", "count", len(metas)) + case prop := <-p.queuedProps: if err := p.SendNewBlock(prop.block, prop.td); err != nil { return @@ -140,12 +163,6 @@ func (p *peer) broadcast() { } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) - case info := <-p.queuedInfos: - if err := p.SendNotaryNodeInfo(info); err != nil { - return - } - p.Log().Trace("Broadcast notary node info") - case <-p.term: return } @@ -157,6 +174,14 @@ func (p *peer) close() { close(p.term) } +func (p *peer) addLabel(label peerLabel) { + p.labels.Add(label) +} + +func (p *peer) removeLabel(label peerLabel) { + p.labels.Remove(label) +} + // Info gathers and returns a collection of metadata known about a peer. func (p *peer) Info() *PeerInfo { hash, td := p.Head() @@ -207,11 +232,11 @@ func (p *peer) MarkTransaction(hash common.Hash) { p.knownTxs.Add(hash) } -func (p *peer) MarkNotaryNodeInfo(hash common.Hash) { - for p.knownInfos.Cardinality() >= maxKnownInfos { - p.knownInfos.Pop() +func (p *peer) MarkNodeMeta(hash common.Hash) { + for p.knownMetas.Cardinality() >= maxKnownMetas { + p.knownMetas.Pop() } - p.knownInfos.Add(hash) + p.knownMetas.Add(hash) } // SendTransactions sends transactions to the peer and includes the hashes @@ -236,21 +261,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { } } -// SendNotaryNodeInfo sends the info to the peer and includes the hashes -// in its info hash set for future reference. -func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error { - return p2p.Send(p.rw, NotaryNodeInfoMsg, info) +// SendNodeMetas sends the metas to the peer and includes the hashes +// in its metas hash set for future reference. +func (p *peer) SendNodeMetas(metas []*NodeMeta) error { + for _, meta := range metas { + p.knownMetas.Add(meta.Hash()) + } + return p2p.Send(p.rw, MetaMsg, metas) } -// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a +// AsyncSendNodeMeta queues list of notary node meta propagation to a // remote peer. If the peer's broadcast queue is full, the event is silently // dropped. -func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) { +func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) { select { - case p.queuedInfos <- info: - p.knownInfos.Add(info.Hash()) + case p.queuedMetas <- metas: + for _, meta := range metas { + p.knownMetas.Add(meta.Hash()) + } default: - p.Log().Debug("Dropping notary node info propagation") + p.Log().Debug("Dropping node meta propagation", "count", len(metas)) } } @@ -431,7 +461,7 @@ func (p *peer) readStatus(network uint64, status *statusData, genesis common.Has // String implements fmt.Stringer. func (p *peer) String() string { return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("eth/%2d", p.version), + fmt.Sprintf("dex/%2d", p.version), ) } @@ -441,18 +471,25 @@ type peerSet struct { peers map[string]*peer lock sync.RWMutex closed bool + tab *nodeTable - // TODO(sonic): revist this map after dexon core SDK is finalized. - // use types.ValidatorID? or implement Stringer for types.ValidatorID - notaryPeers map[uint64]map[string]*peer - round uint64 + srvr p2pServer + gov governance + peerLabels map[string]map[peerLabel]struct{} + notaryHistory map[uint64]struct{} + dkgHistory map[uint64]struct{} } // newPeerSet creates a new peer set to track the active participants. -func newPeerSet() *peerSet { +func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet { return &peerSet{ - peers: make(map[string]*peer), - notaryPeers: make(map[uint64]map[string]*peer), + peers: make(map[string]*peer), + gov: gov, + srvr: srvr, + tab: tab, + peerLabels: make(map[string]map[peerLabel]struct{}), + notaryHistory: make(map[uint64]struct{}), + dkgHistory: make(map[uint64]struct{}), } } @@ -537,14 +574,14 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { return list } -// PeersWithoutNotaryNodeInfo retrieves a list of peers that do not have a -// given info in their set of known hashes. -func (ps *peerSet) PeersWithoutNotaryNodeInfo(hash common.Hash) []*peer { +// PeersWithoutNodeMeta retrieves a list of peers that do not have a +// given meta in their set of known hashes. +func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if p.knownInfos.Contains(hash) { + if !p.knownMetas.Contains(hash) { list = append(list, p) } } @@ -580,44 +617,171 @@ func (ps *peerSet) Close() { ps.closed = true } -// AddNotaryPeer adds a peer into notary peer of the given round. -// Caller of this function need to make sure that the peer is acutally in -// notary set. -func (ps *peerSet) AddNotaryPeer(round uint64, p *peer) error { +func (ps *peerSet) BuildNotaryConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() - // TODO(sonic): revisit this round check after dexon core SDK is finalized. - if round < ps.round || round > ps.round+2 { - return errInvalidRound + if _, ok := ps.notaryHistory[round]; ok { + return } - if _, ok := ps.peers[p.id]; !ok { - return errNotRegistered + ps.notaryHistory[round] = struct{}{} + + selfID := ps.srvr.Self().ID.String() + for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ { + s := ps.gov.GetNotarySet(chainID, round) + + // not in notary set, add group + if _, ok := s[selfID]; !ok { + var nodes []*discover.Node + for id := range s { + nodes = append(nodes, ps.newNode(id)) + } + ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum) + continue + } + + label := peerLabel{ + set: notaryset, + chainID: chainID, + round: round, + } + delete(s, selfID) + for id := range s { + ps.addDirectPeer(id, label) + } } +} - ps.notaryPeers[round][p.id] = p - return nil +func (ps *peerSet) ForgetNotaryConn(round uint64) { + ps.lock.Lock() + defer ps.lock.Unlock() + + // forget all the rounds before the given round + for r := range ps.notaryHistory { + if r <= round { + ps.forgetNotaryConn(r) + delete(ps.notaryHistory, r) + } + } } -// NotaryPeers return peers in notary set of the given round. -func (ps *peerSet) NotaryPeers(round uint64) []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() +func (ps *peerSet) forgetNotaryConn(round uint64) { + selfID := ps.srvr.Self().ID.String() + for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ { + s := ps.gov.GetNotarySet(chainID, round) + if _, ok := s[selfID]; !ok { + ps.srvr.RemoveGroup(notarySetName(chainID, round)) + continue + } - list := make([]*peer, 0, len(ps.notaryPeers[round])) - for _, p := range ps.notaryPeers[round] { - if _, ok := ps.peers[p.id]; ok { - list = append(list, p) + label := peerLabel{ + set: notaryset, + chainID: chainID, + round: round, + } + delete(s, selfID) + for id := range s { + ps.removeDirectPeer(id, label) } } - return list } -// NextRound moves notary peer set to next round. -func (ps *peerSet) NextRound() { +func notarySetName(chainID uint32, round uint64) string { + return fmt.Sprintf("%d-%d-notaryset", chainID, round) +} + +func (ps *peerSet) BuildDKGConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() - delete(ps.notaryPeers, ps.round) - ps.round = ps.round + 1 + selfID := ps.srvr.Self().ID.String() + s := ps.gov.GetDKGSet(round) + if _, ok := s[selfID]; !ok { + return + } + ps.dkgHistory[round] = struct{}{} + + delete(s, selfID) + for id := range s { + ps.addDirectPeer(id, peerLabel{ + set: dkgset, + round: round, + }) + } +} + +func (ps *peerSet) ForgetDKGConn(round uint64) { + ps.lock.Lock() + defer ps.lock.Unlock() + + // forget all the rounds before the given round + for r := range ps.dkgHistory { + if r <= round { + ps.forgetDKGConn(r) + delete(ps.dkgHistory, r) + } + } +} + +func (ps *peerSet) forgetDKGConn(round uint64) { + selfID := ps.srvr.Self().ID.String() + s := ps.gov.GetDKGSet(round) + if _, ok := s[selfID]; !ok { + return + } + + delete(s, selfID) + label := peerLabel{ + set: dkgset, + round: round, + } + for id := range s { + ps.removeDirectPeer(id, label) + } +} + +// make sure the ps.lock is hold +func (ps *peerSet) addDirectPeer(id string, label peerLabel) { + // if the peer exists add the label + if p, ok := ps.peers[id]; ok { + p.addLabel(label) + } + + if _, ok := ps.peerLabels[id]; !ok { + ps.peerLabels[id] = make(map[peerLabel]struct{}) + } + + ps.peerLabels[id][label] = struct{}{} + ps.srvr.AddDirectPeer(ps.newNode(id)) +} + +// make sure the ps.lock is hold +func (ps *peerSet) removeDirectPeer(id string, label peerLabel) { + if p, ok := ps.peers[id]; ok { + p.removeLabel(label) + } + + delete(ps.peerLabels[id], label) + + if len(ps.peerLabels[id]) == 0 { + ps.srvr.RemoveDirectPeer(ps.newNode(id)) + delete(ps.peerLabels, id) + } +} + +func (ps *peerSet) newNode(id string) *enode.Node { + nodeID := enode.HexID(id) + meta := ps.tab.Get(enode.HexID(id)) + + var r enr.Record + r.Set(enr.ID(nodeID.String())) + r.Set(enr.IP(meta.IP)) + r.Set(enr.TCP(meta.TCP)) + r.Set(enr.UDP(meta.UDP)) + + n, err := enode.New(enode.ValidSchemes, &r) + if err != nil { + panic(err) + } + return n } diff --git a/dex/peer_test.go b/dex/peer_test.go new file mode 100644 index 000000000..bac6ed5ec --- /dev/null +++ b/dex/peer_test.go @@ -0,0 +1,628 @@ +package dex + +import ( + "fmt" + "math/big" + "testing" + + mapset "github.com/deckarep/golang-set" + "github.com/dexon-foundation/dexon/p2p/discover" + "github.com/dexon-foundation/dexon/p2p/enode" +) + +func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) { + self := discover.Node{ID: nodeID(0)} + server := newTestP2PServer(&self) + table := newNodeTable() + + gov := &testGovernance{ + getChainNumFunc: func(uint64) uint32 { + return 3 + }, + } + + round10 := [][]enode.ID{ + []enode.ID{nodeID(0), nodeID(1), nodeID(2)}, + []enode.ID{nodeID(1), nodeID(3)}, + []enode.ID{nodeID(2), nodeID(4)}, + } + round11 := [][]enode.ID{ + []enode.ID{nodeID(0), nodeID(1), nodeID(5)}, + []enode.ID{nodeID(5), nodeID(6)}, + []enode.ID{nodeID(0), nodeID(2), nodeID(4)}, + } + round12 := [][]enode.ID{ + []enode.ID{nodeID(0), nodeID(3), nodeID(5)}, + []enode.ID{nodeID(0), nodeID(7), nodeID(8)}, + []enode.ID{nodeID(0), nodeID(2), nodeID(6)}, + } + + gov.getNotarySetFunc = func(cid uint32, round uint64) map[string]struct{} { + m := map[uint64][][]enode.ID{ + 10: round10, + 11: round11, + 12: round12, + } + return newTestNodeSet(m[round][cid]) + } + + ps := newPeerSet(gov, server, table) + peer1 := newDummyPeer(nodeID(1)) + peer2 := newDummyPeer(nodeID(2)) + var err error + err = ps.Register(peer1) + if err != nil { + t.Error(err) + } + err = ps.Register(peer2) + if err != nil { + t.Error(err) + } + + // build round 10 + ps.BuildNotaryConn(10) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{notaryset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{notaryset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), + }) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{ + notarySetName(1, 10), + notarySetName(2, 10), + }) + if err != nil { + t.Error(err) + } + + // build round 11 + ps.BuildNotaryConn(11) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 0, 11}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 2, 11}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 0, 11}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 2, 11}, + }, + nodeID(4).String(): []peerLabel{ + peerLabel{notaryset, 2, 11}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{notaryset, 0, 11}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10, 11}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), nodeID(4), nodeID(5), + }) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{ + notarySetName(1, 10), + notarySetName(2, 10), + notarySetName(1, 11), + }) + if err != nil { + t.Error(err) + } + + // build round 12 + ps.BuildNotaryConn(12) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 0, 11}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 2, 11}, + peerLabel{notaryset, 2, 12}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 0, 11}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 2, 11}, + peerLabel{notaryset, 2, 12}, + }, + nodeID(3).String(): []peerLabel{ + peerLabel{notaryset, 0, 12}, + }, + nodeID(4).String(): []peerLabel{ + peerLabel{notaryset, 2, 11}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{notaryset, 0, 11}, + peerLabel{notaryset, 0, 12}, + }, + nodeID(6).String(): []peerLabel{ + peerLabel{notaryset, 2, 12}, + }, + nodeID(7).String(): []peerLabel{ + peerLabel{notaryset, 1, 12}, + }, + nodeID(8).String(): []peerLabel{ + peerLabel{notaryset, 1, 12}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10, 11, 12}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), nodeID(3), nodeID(4), + nodeID(5), nodeID(6), nodeID(7), nodeID(8), + }) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{ + notarySetName(1, 10), + notarySetName(2, 10), + notarySetName(1, 11), + }) + if err != nil { + t.Error(err) + } + + // forget round 11 + ps.ForgetNotaryConn(11) + + err = checkLabels(peer1, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{notaryset, 2, 12}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(2).String(): []peerLabel{ + peerLabel{notaryset, 2, 12}, + }, + nodeID(3).String(): []peerLabel{ + peerLabel{notaryset, 0, 12}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{notaryset, 0, 12}, + }, + nodeID(6).String(): []peerLabel{ + peerLabel{notaryset, 2, 12}, + }, + nodeID(7).String(): []peerLabel{ + peerLabel{notaryset, 1, 12}, + }, + nodeID(8).String(): []peerLabel{ + peerLabel{notaryset, 1, 12}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{12}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(2), nodeID(3), + nodeID(5), nodeID(6), nodeID(7), nodeID(8), + }) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{}) + if err != nil { + t.Error(err) + } + + // forget round 12 + ps.ForgetNotaryConn(12) + err = checkLabels(peer1, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{}) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{}) + if err != nil { + t.Error(err) + } + +} + +func TestPeerSetBuildDKGConn(t *testing.T) { + self := discover.Node{ID: nodeID(0)} + server := newTestP2PServer(&self) + table := newNodeTable() + + gov := &testGovernance{} + + gov.getDKGSetFunc = func(round uint64) map[string]struct{} { + m := map[uint64][]enode.ID{ + 10: []enode.ID{nodeID(0), nodeID(1), nodeID(2)}, + 11: []enode.ID{nodeID(1), nodeID(2), nodeID(5)}, + 12: []enode.ID{nodeID(0), nodeID(3), nodeID(5)}, + } + return newTestNodeSet(m[round]) + } + + ps := newPeerSet(gov, server, table) + peer1 := newDummyPeer(nodeID(1)) + peer2 := newDummyPeer(nodeID(2)) + var err error + err = ps.Register(peer1) + if err != nil { + t.Error(err) + } + err = ps.Register(peer2) + if err != nil { + t.Error(err) + } + + // build round 10 + ps.BuildDKGConn(10) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), + }) + if err != nil { + t.Error(err) + } + + // build round 11 + ps.BuildDKGConn(11) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), + }) + if err != nil { + t.Error(err) + } + + // build round 12 + ps.BuildDKGConn(12) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + nodeID(3).String(): []peerLabel{ + peerLabel{dkgset, 0, 12}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{dkgset, 0, 12}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10, 12}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), nodeID(3), nodeID(5), + }) + if err != nil { + t.Error(err) + } + + // forget round 11 + ps.ForgetDKGConn(11) + + err = checkLabels(peer1, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(3).String(): []peerLabel{ + peerLabel{dkgset, 0, 12}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{dkgset, 0, 12}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{12}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(3), nodeID(5), + }) + if err != nil { + t.Error(err) + } + + // forget round 12 + ps.ForgetDKGConn(12) + err = checkLabels(peer1, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{}) + if err != nil { + t.Error(err) + } +} + +func checkLabels(p *peer, want []peerLabel) error { + if p.labels.Cardinality() != len(want) { + return fmt.Errorf("num of labels mismatch: got %d, want %d", + p.labels.Cardinality(), len(want)) + } + + for _, label := range want { + if !p.labels.Contains(label) { + return fmt.Errorf("label %+v not exist", label) + } + } + return nil +} + +func checkPeerLabels(ps *peerSet, want map[string][]peerLabel) error { + if len(ps.peerLabels) != len(want) { + return fmt.Errorf("peer num mismatch: got %d, want %d", + len(ps.peerLabels), len(want)) + } + + for peerID, gotLabels := range ps.peerLabels { + wantLabels, ok := want[peerID] + if !ok { + return fmt.Errorf("peer id %s not exists", peerID) + } + + if len(gotLabels) != len(wantLabels) { + return fmt.Errorf( + "num of labels of peer id %s mismatch: got %d, want %d", + peerID, len(gotLabels), len(wantLabels)) + } + + for _, label := range wantLabels { + if _, ok := gotLabels[label]; !ok { + fmt.Errorf("label: %+v not exists", label) + } + } + } + return nil +} + +func checkPeerSetHistory(ps *peerSet, want []uint64, set setType) error { + var history map[uint64]struct{} + switch set { + case notaryset: + history = ps.notaryHistory + case dkgset: + history = ps.dkgHistory + default: + return fmt.Errorf("invalid set: %d", set) + } + + if len(history) != len(want) { + return fmt.Errorf("num of history mismatch: got %d, want %d", + len(history), len(want)) + } + + for _, r := range want { + if _, ok := history[r]; !ok { + return fmt.Errorf("round %d not exists", r) + } + } + return nil +} + +func checkDirectPeer(srvr *testP2PServer, want []enode.ID) error { + if len(srvr.direct) != len(want) { + return fmt.Errorf("num of direct peer mismatch: got %d, want %d", + len(srvr.direct), len(want)) + } + + for _, id := range want { + if _, ok := srvr.direct[id]; !ok { + return fmt.Errorf("direct peer %s not exists", id.String()) + } + } + return nil +} +func checkGroup(srvr *testP2PServer, want []string) error { + if len(srvr.group) != len(want) { + return fmt.Errorf("num of group mismatch: got %d, want %d", + len(srvr.group), len(want)) + } + + for _, name := range want { + if _, ok := srvr.group[name]; !ok { + return fmt.Errorf("group %s not exists", name) + } + } + return nil +} + +func nodeID(n int64) enode.ID { + b := big.NewInt(n).Bytes() + var id enode.ID + copy(id[len(id)-len(b):], b) + return id +} + +func newTestNodeSet(nodes []enode.ID) map[string]struct{} { + m := make(map[string]struct{}) + for _, node := range nodes { + m[node.String()] = struct{}{} + } + return m +} + +func newDummyPeer(id enode.ID) *peer { + return &peer{ + labels: mapset.NewSet(), + id: id.String(), + } +} diff --git a/dex/protocol.go b/dex/protocol.go index 0111edf18..7b01217ff 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -20,14 +20,11 @@ import ( "fmt" "io" "math/big" - "net" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core" "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/crypto/sha3" "github.com/dexon-foundation/dexon/event" - "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/rlp" ) @@ -67,7 +64,7 @@ const ( ReceiptsMsg = 0x10 // Protocol messages belonging to dex/64 - NotaryNodeInfoMsg = 0x11 + MetaMsg = 0x11 ) type errCode int @@ -114,12 +111,26 @@ type txPool interface { SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription } +type governance interface { + GetChainNum(uint64) uint32 + + GetNotarySet(uint32, uint64) map[string]struct{} + + GetDKGSet(uint64) map[string]struct{} + + SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription +} + type p2pServer interface { Self() *enode.Node - AddNotaryPeer(*discover.Node) + AddDirectPeer(*enode.Node) + + RemoveDirectPeer(*enode.Node) - RemoveNotaryPeer(*discover.Node) + AddGroup(string, []*enode.Node, uint64) + + RemoveGroup(string) } // statusData is the network packet for the status message. @@ -195,22 +206,3 @@ type blockBody struct { // blockBodiesData is the network packet for block content distribution. type blockBodiesData []*blockBody - -// TODO(sonic): revisit this msg when dexon core SDK is finalized. -// notartyNodeInfo is the network packet for notary node ip info. -type notaryNodeInfo struct { - ID discover.NodeID - IP net.IP - UDP uint16 - TCP uint16 - Round uint64 - Sig []byte - Timestamp int64 -} - -func (n *notaryNodeInfo) Hash() (h common.Hash) { - hw := sha3.NewKeccak256() - rlp.Encode(hw, n) - hw.Sum(h[:0]) - return h -} diff --git a/dex/protocol_test.go b/dex/protocol_test.go index c1b6efcfc..8c7638b2b 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -221,3 +221,85 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) { } } } + +func TestRecvNodeMetas(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer", dex64, pm, true) + defer pm.Stop() + defer p.close() + + meta := NodeMeta{ + ID: nodeID(1), + } + + ch := make(chan newMetasEvent) + pm.nodeTable.SubscribeNewMetasEvent(ch) + + if err := p2p.Send(p.app, MetaMsg, []interface{}{meta}); err != nil { + t.Fatalf("send error: %v", err) + } + + select { + case event := <-ch: + metas := event.Metas + if len(metas) != 1 { + t.Errorf("wrong number of new metas: got %d, want 1", len(metas)) + } else if metas[0].Hash() != meta.Hash() { + t.Errorf("added wrong meta hash: got %v, want %v", metas[0].Hash(), meta.Hash()) + } + case <-time.After(3 * time.Second): + t.Errorf("no newMetasEvent received within 3 seconds") + } +} + +func TestSendNodeMetas(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + defer pm.Stop() + + allmetas := make([]*NodeMeta, 100) + for nonce := range allmetas { + allmetas[nonce] = &NodeMeta{ID: nodeID(int64(nonce))} + } + + // Connect several peers. They should all receive the pending transactions. + var wg sync.WaitGroup + checkmetas := func(p *testPeer) { + defer wg.Done() + defer p.close() + seen := make(map[common.Hash]bool) + for _, meta := range allmetas { + seen[meta.Hash()] = false + } + for n := 0; n < len(allmetas) && !t.Failed(); { + var metas []*NodeMeta + msg, err := p.app.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != MetaMsg { + t.Errorf("%v: got code %d, want MetaMsg", p.Peer, msg.Code) + } + if err := msg.Decode(&metas); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + for _, meta := range metas { + hash := meta.Hash() + seenmeta, want := seen[hash] + if seenmeta { + t.Errorf("%v: got meta more than once: %x", p.Peer, hash) + } + if !want { + t.Errorf("%v: got unexpected meta: %x", p.Peer, hash) + } + seen[hash] = true + n++ + } + } + } + for i := 0; i < 3; i++ { + p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true) + wg.Add(1) + go checkmetas(p) + } + pm.nodeTable.Add(allmetas) + wg.Wait() +} diff --git a/dex/sync.go b/dex/sync.go index d7fe748bc..5af6076bc 100644 --- a/dex/sync.go +++ b/dex/sync.go @@ -25,7 +25,7 @@ import ( "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/eth/downloader" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" + "github.com/dexon-foundation/dexon/p2p/enode" ) const ( @@ -35,6 +35,9 @@ const ( // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 + + // This is the target number for the packs of metas sent by metasyncLoop. + metasyncPackNum = 1024 ) type txsync struct { @@ -64,7 +67,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) { // the transactions in small packs to one peer at a time. func (pm *ProtocolManager) txsyncLoop() { var ( - pending = make(map[discover.NodeID]*txsync) + pending = make(map[enode.ID]*txsync) sending = false // whether a send is active pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send @@ -129,6 +132,94 @@ func (pm *ProtocolManager) txsyncLoop() { } } +type metasync struct { + p *peer + metas []*NodeMeta +} + +// syncNodeMetas starts sending all node metas to the given peer. +func (pm *ProtocolManager) syncNodeMetas(p *peer) { + metas := pm.nodeTable.Metas() + if len(metas) == 0 { + return + } + select { + case pm.metasyncCh <- &metasync{p, metas}: + case <-pm.quitSync: + } +} + +// metasyncLoop takes care of the initial node meta sync for each new +// connection. When a new peer appears, we relay all currently node metas. +// In order to minimise egress bandwidth usage, we send +// the metas in small packs to one peer at a time. +func (pm *ProtocolManager) metasyncLoop() { + var ( + pending = make(map[enode.ID]*metasync) + sending = false // whether a send is active + pack = new(metasync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *metasync) { + // Fill pack with node metas up to the target num. + var num int + pack.p = s.p + pack.metas = pack.metas[:0] + for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ { + pack.metas = append(pack.metas, s.metas[i]) + num += 1 + } + // Remove the metas that will be sent. + s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])] + if len(s.metas) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num) + sending = true + go func() { done <- pack.p.SendNodeMetas(pack.metas) }() + } + + // pick chooses the next pending sync. + pick := func() *metasync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-pm.metasyncCh: + pending[s.p.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + pack.p.Log().Debug("NodeMeta send failed", "err", err) + delete(pending, pack.p.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-pm.quitSync: + return + } + } +} + // syncer is responsible for periodically synchronising with the network, both // downloading hashes and blocks as well as handling the announcement handler. func (pm *ProtocolManager) syncer() { -- cgit v1.2.3