diff options
author | Sonic <sonic@cobinhood.com> | 2018-09-25 20:37:11 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:23:38 +0800 |
commit | c08afdbc7741d4559683ff304157303af8766c89 (patch) | |
tree | 5125f6984343b51e3b95230f6f2816fa11a11386 | |
parent | 8b977d488bce2d9d9d29a4ddc460b1ffac44aed2 (diff) | |
download | go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.gz go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.bz2 go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.lz go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.xz go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.zst go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.zip |
dex: redesign p2p network topology
- Let p2p server support direct connection and group connection.
- Introduce node meta table to maintain IP of all nodes in node set,
in memory and let nodes in the network can sync this table.
- Let peerSet able to manage direct connections to notary set and dkg set.
The mechanism to refresh the network topology when configuration round
change is not done yet.
-rw-r--r-- | core/events.go | 7 | ||||
-rw-r--r-- | dex/handler.go | 216 | ||||
-rw-r--r-- | dex/handler_test.go | 445 | ||||
-rw-r--r-- | dex/helper_test.go | 78 | ||||
-rw-r--r-- | dex/network.go | 17 | ||||
-rw-r--r-- | dex/nodetable.go | 79 | ||||
-rw-r--r-- | dex/nodetable_test.go | 93 | ||||
-rw-r--r-- | dex/notaryset.go | 203 | ||||
-rw-r--r-- | dex/peer.go | 296 | ||||
-rw-r--r-- | dex/peer_test.go | 628 | ||||
-rw-r--r-- | dex/protocol.go | 42 | ||||
-rw-r--r-- | dex/protocol_test.go | 82 | ||||
-rw-r--r-- | dex/sync.go | 95 | ||||
-rw-r--r-- | p2p/dial.go | 86 | ||||
-rw-r--r-- | p2p/dial_test.go | 217 | ||||
-rw-r--r-- | p2p/discover/table.go | 12 | ||||
-rw-r--r-- | p2p/discover/table_test.go | 14 | ||||
-rw-r--r-- | p2p/discover/table_util_test.go | 4 | ||||
-rw-r--r-- | p2p/peer.go | 2 | ||||
-rw-r--r-- | p2p/server.go | 271 | ||||
-rw-r--r-- | p2p/server_test.go | 192 |
21 files changed, 2423 insertions, 656 deletions
diff --git a/core/events.go b/core/events.go index 02a083002..e76aa4784 100644 --- a/core/events.go +++ b/core/events.go @@ -46,3 +46,10 @@ type ChainSideEvent struct { } type ChainHeadEvent struct{ Block *types.Block } + +type NewNotarySetEvent struct { + Round uint64 + Pubkeys map[string]struct{} // pubkeys in hex format +} + +type NewCRSEvent struct{ Round uint64 } diff --git a/dex/handler.go b/dex/handler.go index bc932fb28..5348b06f2 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -49,6 +49,8 @@ const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 + + metaChanSize = 10240 ) var ( @@ -59,11 +61,6 @@ var ( // not compatible (low protocol version restrictions and high requirements). var errIncompatibleConfig = errors.New("incompatible configuration") -type newNotarySetEvent struct { - Round uint64 - Set map[string]struct{} -} - func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -75,32 +72,35 @@ type ProtocolManager struct { acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) txpool txPool + nodeTable *nodeTable + gov governance blockchain *core.BlockChain chainconfig *params.ChainConfig maxPeers int - downloader *downloader.Downloader - fetcher *fetcher.Fetcher - peers *peerSet - notarySetManager *notarySetManager + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet SubProtocols []p2p.Protocol eventMux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription + metasCh chan newMetasEvent + metasSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer txsyncCh chan *txsync + metasyncCh chan *metasync quitSync chan struct{} noMorePeers chan struct{} - // channels for notarySetLoop - newNotarySetCh chan newNotarySetEvent - newRoundCh chan uint64 - newNotaryNodeInfoCh chan *notaryNodeInfo + // channels for peerSetLoop + crsCh chan core.NewCRSEvent + crsSub event.Subscription srvr p2pServer @@ -111,24 +111,26 @@ type ProtocolManager struct { // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the Ethereum network. -func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { +func NewProtocolManager( + config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, + mux *event.TypeMux, txpool txPool, engine consensus.Engine, + blockchain *core.BlockChain, chaindb ethdb.Database, + gov governance) (*ProtocolManager, error) { + tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - chainconfig: config, - peers: newPeerSet(), - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), - newNotarySetCh: make(chan newNotarySetEvent), - newRoundCh: make(chan uint64), - newNotaryNodeInfoCh: make(chan *notaryNodeInfo), - } - manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh) + networkID: networkID, + eventMux: mux, + txpool: txpool, + nodeTable: tab, + gov: gov, + blockchain: blockchain, + chainconfig: config, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + } // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { @@ -216,22 +218,33 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.maxPeers = maxPeers pm.srvr = srvr + pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable) + + // if our self in node set build the node info // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() + // broadcast node metas + pm.metasCh = make(chan newMetasEvent, metaChanSize) + pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) + go pm.metaBroadcastLoop() + // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() - // run the notary set loop - go pm.notarySetLoop() + // run the peer set loop + pm.crsCh = make(chan core.NewCRSEvent) + pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh) + go pm.peerSetLoop() // start sync handlers go pm.syncer() go pm.txsyncLoop() + go pm.metasyncLoop() } func (pm *ProtocolManager) Stop() { @@ -301,6 +314,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) + pm.syncNodeMetas(p) // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil { @@ -699,18 +713,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.txpool.AddRemotes(txs) - case msg.Code == NotaryNodeInfoMsg: - var info notaryNodeInfo - if err := msg.Decode(&info); err != nil { + case msg.Code == MetaMsg: + var metas []*NodeMeta + if err := msg.Decode(&metas); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - - // TODO(sonic): validate signature - p.MarkNotaryNodeInfo(info.Hash()) - // Broadcast to peers - pm.BroadcastNotaryNodeInfo(&info) - - pm.notarySetManager.TryAddInfo(&info) + for i, meta := range metas { + if meta == nil { + return errResp(ErrDecode, "node meta %d is nil", i) + } + p.MarkNodeMeta(meta.Hash()) + } + pm.nodeTable.Add(metas) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -769,11 +783,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } -// BroadcastNotaryNodeInfo will propagate notary node info to its peers. -func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) { - peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash()) - for _, peer := range peers { - peer.AsyncSendNotaryNodeInfo(info) +// BroadcastMetas will propagate node metas to its peers. +func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { + var metaset = make(map[*peer][]*NodeMeta) + + for _, meta := range metas { + peers := pm.peers.PeersWithoutNodeMeta(meta.Hash()) + for _, peer := range peers { + metaset[peer] = append(metaset[peer], meta) + } + log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers)) + } + + for peer, metas := range metaset { + peer.AsyncSendNodeMetas(metas) } } @@ -801,97 +824,32 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } -// a loop keep building and maintaining peers in notary set. -func (pm *ProtocolManager) notarySetLoop() { - // TODO: - // * pm need to know current round, and whether we are in notary set. - // * (done) able to handle node info in the near future round. - // * check peer limit. - // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?) - - // If we are in notary set and we are synced with network. - // events: - // * new notary set is determined, and we are in notary set. - // (TBD: subscribe the event or polling govornance) - // - advance round - // - build new notary set - // - remove old notary set, remove old notary set from static - - // * current round node info changed. - - self := pm.srvr.Self() - var round uint64 - +func (pm *ProtocolManager) metaBroadcastLoop() { for { select { - case event := <-pm.newNotarySetCh: - // new notary set is determined. - if _, ok := event.Set[self.ID.String()]; ok { - // initialize the new notary set and add it to pm.notarySetManager - s := newNotarySet(event.Round, event.Set) - pm.notarySetManager.Register(event.Round, s) - - // TODO: handle signature - pm.BroadcastNotaryNodeInfo(¬aryNodeInfo{ - ID: self.ID, - IP: self.IP, - UDP: self.UDP, - TCP: self.TCP, - Round: event.Round, - Timestamp: time.Now().Unix(), - }) - } - - case r := <-pm.newRoundCh: - // move to new round. - round = r + case event := <-pm.metasCh: + pm.BroadcastMetas(event.Metas) - // TODO: revisit this to make sure how many previous round's - // notary set we need to keep. - - // rmove notary set before current round, they are useless - for _, s := range pm.notarySetManager.Before(round) { - pm.removeNotarySetFromStatic(s) - } - - // prepare notary set connections for new round. - if pm.isInNotarySet(round + 1) { - // we assume the notary set for round + 1 is added to - // pm.notarySetManager before this step. - if n, ok := pm.notarySetManager.Round(round + 1); ok { - pm.addNotarySetToStatic(n) - } - } - - case <-pm.newNotaryNodeInfoCh: - // some node info in "current round" is updated. - // try to connect them by the new info. - if pm.isInNotarySet(round) { - if n, ok := pm.notarySetManager.Round(round); ok { - pm.addNotarySetToStatic(n) - } - } - - case <-pm.quitSync: + // Err() channel will be closed when unsubscribing. + case <-pm.metasSub.Err(): return } } } -func (pm *ProtocolManager) isInNotarySet(r uint64) bool { - return false -} - -func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) { - for _, node := range n.NodesToAdd() { - pm.srvr.AddNotaryPeer(node) - n.MarkAdded(node.ID.String()) - } -} - -func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) { - for _, node := range n.Nodes() { - pm.srvr.RemoveNotaryPeer(node) +// a loop keep building and maintaining peers in notary set. +// TODO: finish this +func (pm *ProtocolManager) peerSetLoop() { + for { + select { + case event := <-pm.crsCh: + pm.peers.BuildNotaryConn(event.Round) + pm.peers.BuildDKGConn(event.Round) + pm.peers.ForgetNotaryConn(event.Round - 1) + pm.peers.ForgetDKGConn(event.Round - 1) + case <-pm.quitSync: + return + } } } diff --git a/dex/handler_test.go b/dex/handler_test.go new file mode 100644 index 000000000..981362bb5 --- /dev/null +++ b/dex/handler_test.go @@ -0,0 +1,445 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package dex + +import ( + "math" + "math/big" + "math/rand" + "testing" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core" + "github.com/dexon-foundation/dexon/core/state" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/crypto" + "github.com/dexon-foundation/dexon/eth/downloader" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/p2p" + "github.com/dexon-foundation/dexon/params" +) + +// Tests that protocol versions and modes of operations are matched up properly. +func TestProtocolCompatibility(t *testing.T) { + // Define the compatibility chart + tests := []struct { + version uint + mode downloader.SyncMode + compatible bool + }{ + {61, downloader.FullSync, true}, {62, downloader.FullSync, true}, {63, downloader.FullSync, true}, + {61, downloader.FastSync, true}, {62, downloader.FastSync, true}, {63, downloader.FastSync, true}, + } + // Make sure anything we screw up is restored + backup := ProtocolVersions + defer func() { ProtocolVersions = backup }() + + // Try all available compatibility configs and check for errors + for i, tt := range tests { + ProtocolVersions = []uint{tt.version} + + pm, _, err := newTestProtocolManager(tt.mode, 0, nil, nil) + if pm != nil { + defer pm.Stop() + } + if (err == nil && !tt.compatible) || (err != nil && tt.compatible) { + t.Errorf("test %d: compatibility mismatch: have error %v, want compatibility %v", i, err, tt.compatible) + } + } +} + +// Tests that block headers can be retrieved from a remote chain based on user queries. +func TestGetBlockHeaders62(t *testing.T) { testGetBlockHeaders(t, 62) } +func TestGetBlockHeaders63(t *testing.T) { testGetBlockHeaders(t, 63) } + +func testGetBlockHeaders(t *testing.T, protocol int) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxHashFetch+15, nil, nil) + peer, _ := newTestPeer("peer", protocol, pm, true) + defer peer.close() + + // Create a "random" unknown hash for testing + var unknown common.Hash + for i := range unknown { + unknown[i] = byte(i) + } + // Create a batch of tests for various scenarios + limit := uint64(downloader.MaxHeaderFetch) + tests := []struct { + query *getBlockHeadersData // The query to execute for header retrieval + expect []common.Hash // The hashes of the block whose headers are expected + }{ + // A single random block should be retrievable by hash and number too + { + &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(limit / 2).Hash()}, Amount: 1}, + []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()}, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 1}, + []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()}, + }, + // Multiple headers should be retrievable in both directions + { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(limit / 2).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 + 1).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 + 2).Hash(), + }, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(limit / 2).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 - 1).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 - 2).Hash(), + }, + }, + // Multiple headers with skip lists should be retrievable + { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(limit / 2).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 + 4).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 + 8).Hash(), + }, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(limit / 2).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 - 4).Hash(), + pm.blockchain.GetBlockByNumber(limit/2 - 8).Hash(), + }, + }, + // The chain endpoints should be retrievable + { + &getBlockHeadersData{Origin: hashOrNumber{Number: 0}, Amount: 1}, + []common.Hash{pm.blockchain.GetBlockByNumber(0).Hash()}, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64()}, Amount: 1}, + []common.Hash{pm.blockchain.CurrentBlock().Hash()}, + }, + // Ensure protocol limits are honored + { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true}, + pm.blockchain.GetBlockHashesFromHash(pm.blockchain.CurrentBlock().Hash(), limit), + }, + // Check that requesting more than available is handled gracefully + { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 3, Amount: 3}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(), + pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64()).Hash(), + }, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 3, Amount: 3, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(4).Hash(), + pm.blockchain.GetBlockByNumber(0).Hash(), + }, + }, + // Check that requesting more than available is handled gracefully, even if mid skip + { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 2, Amount: 3}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(), + pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 1).Hash(), + }, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 2, Amount: 3, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(4).Hash(), + pm.blockchain.GetBlockByNumber(1).Hash(), + }, + }, + // Check a corner case where requesting more can iterate past the endpoints + { + &getBlockHeadersData{Origin: hashOrNumber{Number: 2}, Amount: 5, Reverse: true}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(2).Hash(), + pm.blockchain.GetBlockByNumber(1).Hash(), + pm.blockchain.GetBlockByNumber(0).Hash(), + }, + }, + // Check a corner case where skipping overflow loops back into the chain start + { + &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(3).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64 - 1}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(3).Hash(), + }, + }, + // Check a corner case where skipping overflow loops back to the same header + { + &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(1).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64}, + []common.Hash{ + pm.blockchain.GetBlockByNumber(1).Hash(), + }, + }, + // Check that non existing headers aren't returned + { + &getBlockHeadersData{Origin: hashOrNumber{Hash: unknown}, Amount: 1}, + []common.Hash{}, + }, { + &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() + 1}, Amount: 1}, + []common.Hash{}, + }, + } + // Run each of the tests and verify the results against the chain + for i, tt := range tests { + // Collect the headers to expect in the response + headers := []*types.Header{} + for _, hash := range tt.expect { + headers = append(headers, pm.blockchain.GetBlockByHash(hash).Header()) + } + // Send the hash request and verify the response + p2p.Send(peer.app, 0x03, tt.query) + if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil { + t.Errorf("test %d: headers mismatch: %v", i, err) + } + // If the test used number origins, repeat with hashes as the too + if tt.query.Origin.Hash == (common.Hash{}) { + if origin := pm.blockchain.GetBlockByNumber(tt.query.Origin.Number); origin != nil { + tt.query.Origin.Hash, tt.query.Origin.Number = origin.Hash(), 0 + + p2p.Send(peer.app, 0x03, tt.query) + if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil { + t.Errorf("test %d: headers mismatch: %v", i, err) + } + } + } + } +} + +// Tests that block contents can be retrieved from a remote chain based on their hashes. +func TestGetBlockBodies62(t *testing.T) { testGetBlockBodies(t, 62) } +func TestGetBlockBodies63(t *testing.T) { testGetBlockBodies(t, 63) } + +func testGetBlockBodies(t *testing.T, protocol int) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxBlockFetch+15, nil, nil) + peer, _ := newTestPeer("peer", protocol, pm, true) + defer peer.close() + + // Create a batch of tests for various scenarios + limit := downloader.MaxBlockFetch + tests := []struct { + random int // Number of blocks to fetch randomly from the chain + explicit []common.Hash // Explicitly requested blocks + available []bool // Availability of explicitly requested blocks + expected int // Total number of existing blocks to expect + }{ + {1, nil, nil, 1}, // A single random block should be retrievable + {10, nil, nil, 10}, // Multiple random blocks should be retrievable + {limit, nil, nil, limit}, // The maximum possible blocks should be retrievable + {limit + 1, nil, nil, limit}, // No more than the possible block count should be returned + {0, []common.Hash{pm.blockchain.Genesis().Hash()}, []bool{true}, 1}, // The genesis block should be retrievable + {0, []common.Hash{pm.blockchain.CurrentBlock().Hash()}, []bool{true}, 1}, // The chains head block should be retrievable + {0, []common.Hash{{}}, []bool{false}, 0}, // A non existent block should not be returned + + // Existing and non-existing blocks interleaved should not cause problems + {0, []common.Hash{ + {}, + pm.blockchain.GetBlockByNumber(1).Hash(), + {}, + pm.blockchain.GetBlockByNumber(10).Hash(), + {}, + pm.blockchain.GetBlockByNumber(100).Hash(), + {}, + }, []bool{false, true, false, true, false, true, false}, 3}, + } + // Run each of the tests and verify the results against the chain + for i, tt := range tests { + // Collect the hashes to request, and the response to expect + hashes, seen := []common.Hash{}, make(map[int64]bool) + bodies := []*blockBody{} + + for j := 0; j < tt.random; j++ { + for { + num := rand.Int63n(int64(pm.blockchain.CurrentBlock().NumberU64())) + if !seen[num] { + seen[num] = true + + block := pm.blockchain.GetBlockByNumber(uint64(num)) + hashes = append(hashes, block.Hash()) + if len(bodies) < tt.expected { + bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()}) + } + break + } + } + } + for j, hash := range tt.explicit { + hashes = append(hashes, hash) + if tt.available[j] && len(bodies) < tt.expected { + block := pm.blockchain.GetBlockByHash(hash) + bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()}) + } + } + // Send the hash request and verify the response + p2p.Send(peer.app, 0x05, hashes) + if err := p2p.ExpectMsg(peer.app, 0x06, bodies); err != nil { + t.Errorf("test %d: bodies mismatch: %v", i, err) + } + } +} + +// Tests that the node state database can be retrieved based on hashes. +func TestGetNodeData63(t *testing.T) { testGetNodeData(t, 63) } + +func testGetNodeData(t *testing.T, protocol int) { + // Define three accounts to simulate transactions with + acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") + acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee") + acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey) + acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey) + + signer := types.HomesteadSigner{} + // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test) + generator := func(i int, block *core.BlockGen) { + switch i { + case 0: + // In block 1, the test bank sends account #1 some ether. + tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey) + block.AddTx(tx) + case 1: + // In block 2, the test bank sends some more ether to account #1. + // acc1Addr passes it on to account #2. + tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey) + tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key) + block.AddTx(tx1) + block.AddTx(tx2) + case 2: + // Block 3 is empty but was mined by account #2. + block.SetCoinbase(acc2Addr) + block.SetExtra([]byte("yeehaw")) + case 3: + // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data). + b2 := block.PrevBlock(1).Header() + b2.Extra = []byte("foo") + block.AddUncle(b2) + b3 := block.PrevBlock(2).Header() + b3.Extra = []byte("foo") + block.AddUncle(b3) + } + } + // Assemble the test environment + pm, db := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil) + peer, _ := newTestPeer("peer", protocol, pm, true) + defer peer.close() + + // Fetch for now the entire chain db + hashes := []common.Hash{} + for _, key := range db.Keys() { + if len(key) == len(common.Hash{}) { + hashes = append(hashes, common.BytesToHash(key)) + } + } + p2p.Send(peer.app, 0x0d, hashes) + msg, err := peer.app.ReadMsg() + if err != nil { + t.Fatalf("failed to read node data response: %v", err) + } + if msg.Code != 0x0e { + t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, 0x0c) + } + var data [][]byte + if err := msg.Decode(&data); err != nil { + t.Fatalf("failed to decode response node data: %v", err) + } + // Verify that all hashes correspond to the requested data, and reconstruct a state tree + for i, want := range hashes { + if hash := crypto.Keccak256Hash(data[i]); hash != want { + t.Errorf("data hash mismatch: have %x, want %x", hash, want) + } + } + statedb := ethdb.NewMemDatabase() + for i := 0; i < len(data); i++ { + statedb.Put(hashes[i].Bytes(), data[i]) + } + accounts := []common.Address{testBank, acc1Addr, acc2Addr} + for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ { + trie, _ := state.New(pm.blockchain.GetBlockByNumber(i).Root(), state.NewDatabase(statedb)) + + for j, acc := range accounts { + state, _ := pm.blockchain.State() + bw := state.GetBalance(acc) + bh := trie.GetBalance(acc) + + if (bw != nil && bh == nil) || (bw == nil && bh != nil) { + t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw) + } + if bw != nil && bh != nil && bw.Cmp(bw) != 0 { + t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw) + } + } + } +} + +// Tests that the transaction receipts can be retrieved based on hashes. +func TestGetReceipt63(t *testing.T) { testGetReceipt(t, 63) } + +func testGetReceipt(t *testing.T, protocol int) { + // Define three accounts to simulate transactions with + acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") + acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee") + acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey) + acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey) + + signer := types.HomesteadSigner{} + // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test) + generator := func(i int, block *core.BlockGen) { + switch i { + case 0: + // In block 1, the test bank sends account #1 some ether. + tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey) + block.AddTx(tx) + case 1: + // In block 2, the test bank sends some more ether to account #1. + // acc1Addr passes it on to account #2. + tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey) + tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key) + block.AddTx(tx1) + block.AddTx(tx2) + case 2: + // Block 3 is empty but was mined by account #2. + block.SetCoinbase(acc2Addr) + block.SetExtra([]byte("yeehaw")) + case 3: + // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data). + b2 := block.PrevBlock(1).Header() + b2.Extra = []byte("foo") + block.AddUncle(b2) + b3 := block.PrevBlock(2).Header() + b3.Extra = []byte("foo") + block.AddUncle(b3) + } + } + // Assemble the test environment + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil) + peer, _ := newTestPeer("peer", protocol, pm, true) + defer peer.close() + + // Collect the hashes to request, and the response to expect + hashes, receipts := []common.Hash{}, []types.Receipts{} + for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ { + block := pm.blockchain.GetBlockByNumber(i) + + hashes = append(hashes, block.Hash()) + receipts = append(receipts, pm.blockchain.GetReceiptsByHash(block.Hash())) + } + // Send the hash request and verify the response + p2p.Send(peer.app, 0x0f, hashes) + if err := p2p.ExpectMsg(peer.app, 0x10, receipts); err != nil { + t.Errorf("receipts mismatch: %v", err) + } +} diff --git a/dex/helper_test.go b/dex/helper_test.go index 7e3479958..dcda6f4d2 100644 --- a/dex/helper_test.go +++ b/dex/helper_test.go @@ -37,7 +37,7 @@ import ( "github.com/dexon-foundation/dexon/ethdb" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/p2p/discover" + "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/params" ) @@ -48,24 +48,47 @@ var ( // testP2PServer is a fake, helper p2p server for testing purposes. type testP2PServer struct { - added chan *discover.Node - removed chan *discover.Node + mu sync.Mutex + self *enode.Node + direct map[enode.NodeID]*enode.Node + group map[string][]*enode.Node } -func (s *testP2PServer) Self() *discover.Node { - return &discover.Node{} +func newTestP2PServer(self *enode.Node) *testP2PServer { + return &testP2PServer{ + self: self, + direct: make(map[enode.NodeID]*enode.Node), + group: make(map[string][]*enode.Node), + } } -func (s *testP2PServer) AddNotaryPeer(node *discover.Node) { - if s.added != nil { - s.added <- node - } +func (s *testP2PServer) Self() *enode.Node { + return s.self } -func (s *testP2PServer) RemoveNotaryPeer(node *discover.Node) { - if s.removed != nil { - s.removed <- node - } +func (s *testP2PServer) AddDirectPeer(node *enode.Node) { + s.mu.Lock() + defer s.mu.Unlock() + s.direct[node.ID] = node +} + +func (s *testP2PServer) RemoveDirectPeer(node *enode.Node) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.direct, node.ID) +} + +func (s *testP2PServer) AddGroup( + name string, nodes []*enode.Node, num uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.group[name] = nodes +} + +func (s *testP2PServer) RemoveGroup(name string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.group, name) } // newTestProtocolManager creates a new protocol manager for testing purposes, @@ -88,11 +111,11 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func panic(err) } - pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db) + pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, &testGovernance{}) if err != nil { return nil, nil, err } - pm.Start(&testP2PServer{}, 1000) + pm.Start(newTestP2PServer(&enode.Node{}), 1000) return pm, db, nil } @@ -157,6 +180,29 @@ func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *typ return tx } +// testGovernance is a fake, helper governance for testing purposes +type testGovernance struct { + getChainNumFunc func(uint64) uint32 + getNotarySetFunc func(uint32, uint64) map[string]struct{} + getDKGSetFunc func(uint64) map[string]struct{} +} + +func (g *testGovernance) GetChainNum(round uint64) uint32 { + return g.getChainNumFunc(round) +} + +func (g *testGovernance) GetNotarySet(chainID uint32, round uint64) map[string]struct{} { + return g.getNotarySetFunc(chainID, round) +} + +func (g *testGovernance) GetDKGSet(round uint64) map[string]struct{} { + return g.getDKGSetFunc(round) +} + +func (g *testGovernance) SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription { + return nil +} + // testPeer is a simulated peer to allow testing direct network calls. type testPeer struct { net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging @@ -170,7 +216,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te app, net := p2p.MsgPipe() // Generate a random id and create the peer - var id discover.NodeID + var id enode.NodeID rand.Read(id[:]) peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net) diff --git a/dex/network.go b/dex/network.go index c3a0ef792..b19b46147 100644 --- a/dex/network.go +++ b/dex/network.go @@ -23,14 +23,6 @@ func (n *DexconNetwork) BroadcastVote(vote *types.Vote) { func (n *DexconNetwork) BroadcastBlock(block *types.Block) { } -// BroadcastRandomnessRequest broadcasts rand request to DKG set. -func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) { -} - -// BroadcastRandomnessResult broadcasts rand request to Notary set. -func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) { -} - // SendDKGPrivateShare sends PrivateShare to a DKG participant. func (n *DexconNetwork) SendDKGPrivateShare( pub crypto.PublicKey, prvShare *types.DKGPrivateShare) { @@ -47,6 +39,15 @@ func (n *DexconNetwork) BroadcastDKGPartialSignature( psig *types.DKGPartialSignature) { } +// BroadcastRandomnessRequest broadcasts rand request to DKG set. +func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) { + +} + +// BroadcastRandomnessResult broadcasts rand request to Notary set. +func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) { +} + // ReceiveChan returns a channel to receive messages from DEXON network. func (n *DexconNetwork) ReceiveChan() <-chan interface{} { return n.receiveChan diff --git a/dex/nodetable.go b/dex/nodetable.go new file mode 100644 index 000000000..929b168a8 --- /dev/null +++ b/dex/nodetable.go @@ -0,0 +1,79 @@ +package dex + +import ( + "net" + "sync" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/crypto/sha3" + "github.com/dexon-foundation/dexon/event" + "github.com/dexon-foundation/dexon/p2p/enode" + "github.com/dexon-foundation/dexon/rlp" +) + +type NodeMeta struct { + ID enode.ID + IP net.IP + UDP int + TCP int + Timestamp uint64 + Sig []byte +} + +func (n *NodeMeta) Hash() (h common.Hash) { + hw := sha3.NewKeccak256() + rlp.Encode(hw, n) + hw.Sum(h[:0]) + return h +} + +type newMetasEvent struct{ Metas []*NodeMeta } + +type nodeTable struct { + mu sync.RWMutex + entry map[enode.ID]*NodeMeta + feed event.Feed +} + +func newNodeTable() *nodeTable { + return &nodeTable{ + entry: make(map[enode.ID]*NodeMeta), + } +} + +func (t *nodeTable) Get(id enode.ID) *NodeMeta { + t.mu.RLock() + defer t.mu.RUnlock() + return t.entry[id] +} + +func (t *nodeTable) Add(metas []*NodeMeta) { + t.mu.Lock() + defer t.mu.Unlock() + + var newMetas []*NodeMeta + for _, meta := range metas { + // TODO: validate the meta + if e, ok := t.entry[meta.ID]; ok && e.Timestamp > meta.Timestamp { + continue + } + t.entry[meta.ID] = meta + newMetas = append(newMetas, meta) + } + t.feed.Send(newMetasEvent{newMetas}) +} + +func (t *nodeTable) Metas() []*NodeMeta { + t.mu.RLock() + defer t.mu.RUnlock() + metas := make([]*NodeMeta, 0, len(t.entry)) + for _, meta := range t.entry { + metas = append(metas, meta) + } + return metas +} + +func (t *nodeTable) SubscribeNewMetasEvent( + ch chan<- newMetasEvent) event.Subscription { + return t.feed.Subscribe(ch) +} diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go new file mode 100644 index 000000000..5b3f7de57 --- /dev/null +++ b/dex/nodetable_test.go @@ -0,0 +1,93 @@ +package dex + +import ( + "math/rand" + "testing" + "time" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/p2p/enode" +) + +func TestNodeTable(t *testing.T) { + table := newNodeTable() + ch := make(chan newMetasEvent) + table.SubscribeNewMetasEvent(ch) + + metas1 := []*NodeMeta{ + &NodeMeta{ID: randomID()}, + &NodeMeta{ID: randomID()}, + } + + metas2 := []*NodeMeta{ + &NodeMeta{ID: randomID()}, + &NodeMeta{ID: randomID()}, + } + + go table.Add(metas1) + + select { + case newMetas := <-ch: + m := map[common.Hash]struct{}{} + for _, meta := range newMetas.Metas { + m[meta.Hash()] = struct{}{} + } + + if len(m) != len(metas1) { + t.Errorf("len mismatch: got %d, want: %d", + len(m), len(metas1)) + } + + for _, meta := range metas1 { + if _, ok := m[meta.Hash()]; !ok { + t.Errorf("expected meta (%s) not exists", meta.Hash()) + } + } + case <-time.After(1 * time.Second): + t.Error("did not receive new metas event within one second") + } + + go table.Add(metas2) + select { + case newMetas := <-ch: + m := map[common.Hash]struct{}{} + for _, meta := range newMetas.Metas { + m[meta.Hash()] = struct{}{} + } + + if len(m) != len(metas1) { + t.Errorf("len mismatch: got %d, want: %d", + len(m), len(metas2)) + } + + for _, meta := range metas2 { + if _, ok := m[meta.Hash()]; !ok { + t.Errorf("expected meta (%s) not exists", meta.Hash()) + } + } + case <-time.After(1 * time.Second): + t.Error("did not receive new metas event within one second") + } + + var metas []*NodeMeta + metas = append(metas, metas1...) + metas = append(metas, metas2...) + allMetas := table.Metas() + if len(allMetas) != len(metas) { + t.Errorf("all metas num mismatch: got %d, want %d", + len(metas), len(allMetas)) + } + + for _, m := range metas { + if m.Hash() != table.Get(m.ID).Hash() { + t.Errorf("meta (%s) mismatch", m.ID.String()) + } + } +} + +func randomID() (id enode.ID) { + for i := range id { + id[i] = byte(rand.Intn(255)) + } + return id +} diff --git a/dex/notaryset.go b/dex/notaryset.go deleted file mode 100644 index 74d259314..000000000 --- a/dex/notaryset.go +++ /dev/null @@ -1,203 +0,0 @@ -package dex - -import ( - "fmt" - "sync" - - "github.com/dexon-foundation/dexon/p2p/discover" -) - -type nodeInfo struct { - info *notaryNodeInfo - added bool -} - -func (n *nodeInfo) NewNode() *discover.Node { - return discover.NewNode(n.info.ID, n.info.IP, n.info.UDP, n.info.TCP) -} - -type notarySet struct { - round uint64 - m map[string]*nodeInfo - lock sync.RWMutex -} - -func newNotarySet(round uint64, s map[string]struct{}) *notarySet { - m := make(map[string]*nodeInfo) - for nodeID := range s { - m[nodeID] = &nodeInfo{} - } - - return ¬arySet{ - round: round, - m: m, - } -} - -// Call this function when the notaryNodeInfoMsg is received. -func (n *notarySet) AddInfo(info *notaryNodeInfo) error { - n.lock.Lock() - defer n.lock.Unlock() - - // check round - if info.Round != n.round { - return fmt.Errorf("invalid round") - } - - nInfo, ok := n.m[info.ID.String()] - if !ok { - return fmt.Errorf("not in notary set") - } - - // if the info exists check timstamp - if nInfo.info != nil { - if nInfo.info.Timestamp > info.Timestamp { - return fmt.Errorf("old msg") - } - } - - nInfo.info = info - return nil -} - -// MarkAdded mark the notary node as added -// to prevent duplcate addition in the future. -func (n *notarySet) MarkAdded(nodeID string) { - if info, ok := n.m[nodeID]; ok { - info.added = true - } -} - -// Return all nodes -func (n *notarySet) Nodes() []*discover.Node { - n.lock.RLock() - defer n.lock.RUnlock() - - list := make([]*discover.Node, 0, len(n.m)) - for _, info := range n.m { - list = append(list, info.NewNode()) - } - return list -} - -// Return nodes that need to be added to p2p server as notary node. -func (n *notarySet) NodesToAdd() []*discover.Node { - n.lock.RLock() - defer n.lock.RUnlock() - - var list []*discover.Node - for _, info := range n.m { - // craete a new discover.Node - if !info.added { - list = append(list, info.NewNode()) - } - } - return list -} - -type notarySetManager struct { - m map[uint64]*notarySet - lock sync.RWMutex - queued map[uint64]map[string]*notaryNodeInfo - round uint64 // biggest round of managed notary sets - newNotaryNodeInfoCh chan *notaryNodeInfo -} - -func newNotarySetManager( - newNotaryNodeInfoCh chan *notaryNodeInfo) *notarySetManager { - return ¬arySetManager{ - m: make(map[uint64]*notarySet), - queued: make(map[uint64]map[string]*notaryNodeInfo), - newNotaryNodeInfoCh: newNotaryNodeInfoCh, - } -} - -// Register injects a new notary set into the manager and -// processes the queued info. -func (n *notarySetManager) Register(r uint64, s *notarySet) { - n.lock.Lock() - defer n.lock.Unlock() - if r > n.round { - n.round = r - } - n.m[r] = s - n.processQueuedInfo() -} - -// Unregister removes the notary set of the given round. -func (n *notarySetManager) Unregister(r uint64) { - n.lock.Lock() - defer n.lock.Unlock() - delete(n.m, r) -} - -// Round returns the notary set of the given round. -func (n *notarySetManager) Round(r uint64) (*notarySet, bool) { - n.lock.RLock() - defer n.lock.RUnlock() - s, ok := n.m[r] - return s, ok -} - -// Before returns all the notary sets that before the given round. -func (n *notarySetManager) Before(r uint64) []*notarySet { - n.lock.RLock() - defer n.lock.RUnlock() - var list []*notarySet - for round, s := range n.m { - if round < r { - list = append(list, s) - } - } - return list -} - -// TryAddInfo associates the given info to the notary set if the notary set is -// managed by the manager. -// If the notary node info is belong to future notary set, queue the info. -func (n *notarySetManager) TryAddInfo(info *notaryNodeInfo) { - n.lock.Lock() - defer n.lock.Unlock() - n.tryAddInfo(info) -} - -// This function is extract for calling without lock. -// Make sure the caller already accquired the lock. -func (n *notarySetManager) tryAddInfo(info *notaryNodeInfo) { - if info.Round > n.round { - if q, ok := n.queued[info.Round]; ok { - q[info.Hash().String()] = info - return - } - n.queued[info.Round] = map[string]*notaryNodeInfo{ - info.Hash().String(): info, - } - return - } - - s, ok := n.Round(info.Round) - if !ok { - return - } - s.AddInfo(info) - - // TODO(sonic): handle timeout - n.newNotaryNodeInfoCh <- info -} - -func (n *notarySetManager) processQueuedInfo() { - n.lock.Lock() - defer n.lock.Unlock() - if q, ok := n.queued[n.round]; ok { - for _, info := range q { - n.tryAddInfo(info) - } - } - - // Clear queue infos before current round. - for round := range n.queued { - if round <= n.round { - delete(n.queued, round) - } - } -} diff --git a/dex/peer.go b/dex/peer.go index f1a4335d1..8c218f1d9 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -27,6 +27,7 @@ import ( "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/p2p" + "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/rlp" ) @@ -34,19 +35,20 @@ var ( errClosed = errors.New("peer set is closed") errAlreadyRegistered = errors.New("peer is already registered") errNotRegistered = errors.New("peer is not registered") - errInvalidRound = errors.New("invalid round") ) const ( maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) - maxKnownInfos = 1024 // maxQueuedTxs is the maximum number of transaction lists to queue up before // dropping broadcasts. This is a sensitive number as a transaction list might // contain a single transaction, or thousands. maxQueuedTxs = 128 + maxQueuedMetas = 512 + // maxQueuedProps is the maximum number of block propagations to queue up before // dropping broadcasts. There's not much point in queueing stale blocks, so a few // that might cover uncles should be enough. @@ -57,9 +59,9 @@ const ( // above some healthy uncle limit, so use that. maxQueuedAnns = 4 - maxQueuedInfos = 1024 - handshakeTimeout = 5 * time.Second + + groupNodeNum = 3 ) // PeerInfo represents a short summary of the Ethereum sub-protocol metadata known @@ -76,13 +78,27 @@ type propEvent struct { td *big.Int } +type setType uint32 + +const ( + dkgset = iota + notaryset +) + +type peerLabel struct { + set setType + chainID uint32 + round uint64 +} + type peer struct { id string *p2p.Peer rw p2p.MsgReadWriter - version int // Protocol version negotiated + version int // Protocol version negotiated + labels mapset.Set forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time head common.Hash @@ -90,12 +106,12 @@ type peer struct { lock sync.RWMutex knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownMetas mapset.Set // Set of node metas known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer - knownInfos mapset.Set // Set of infos known to be known by this peer queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer queuedProps chan *propEvent // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer - queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer term chan struct{} // Termination channel to stop the broadcaster } @@ -104,20 +120,21 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { Peer: p, rw: rw, version: version, - id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), + labels: mapset.NewSet(), + id: p.ID().String(), knownTxs: mapset.NewSet(), + knownMetas: mapset.NewSet(), knownBlocks: mapset.NewSet(), - knownInfos: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), queuedProps: make(chan *propEvent, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), - queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos), term: make(chan struct{}), } } // broadcast is a write loop that multiplexes block propagations, announcements, -// transaction and notary node infos broadcasts into the remote peer. +// transaction and notary node metas broadcasts into the remote peer. // The goal is to have an async writer that does not lock up node internals. func (p *peer) broadcast() { for { @@ -128,6 +145,12 @@ func (p *peer) broadcast() { } p.Log().Trace("Broadcast transactions", "count", len(txs)) + case metas := <-p.queuedMetas: + if err := p.SendNodeMetas(metas); err != nil { + return + } + p.Log().Trace("Broadcast node metas", "count", len(metas)) + case prop := <-p.queuedProps: if err := p.SendNewBlock(prop.block, prop.td); err != nil { return @@ -140,12 +163,6 @@ func (p *peer) broadcast() { } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) - case info := <-p.queuedInfos: - if err := p.SendNotaryNodeInfo(info); err != nil { - return - } - p.Log().Trace("Broadcast notary node info") - case <-p.term: return } @@ -157,6 +174,14 @@ func (p *peer) close() { close(p.term) } +func (p *peer) addLabel(label peerLabel) { + p.labels.Add(label) +} + +func (p *peer) removeLabel(label peerLabel) { + p.labels.Remove(label) +} + // Info gathers and returns a collection of metadata known about a peer. func (p *peer) Info() *PeerInfo { hash, td := p.Head() @@ -207,11 +232,11 @@ func (p *peer) MarkTransaction(hash common.Hash) { p.knownTxs.Add(hash) } -func (p *peer) MarkNotaryNodeInfo(hash common.Hash) { - for p.knownInfos.Cardinality() >= maxKnownInfos { - p.knownInfos.Pop() +func (p *peer) MarkNodeMeta(hash common.Hash) { + for p.knownMetas.Cardinality() >= maxKnownMetas { + p.knownMetas.Pop() } - p.knownInfos.Add(hash) + p.knownMetas.Add(hash) } // SendTransactions sends transactions to the peer and includes the hashes @@ -236,21 +261,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { } } -// SendNotaryNodeInfo sends the info to the peer and includes the hashes -// in its info hash set for future reference. -func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error { - return p2p.Send(p.rw, NotaryNodeInfoMsg, info) +// SendNodeMetas sends the metas to the peer and includes the hashes +// in its metas hash set for future reference. +func (p *peer) SendNodeMetas(metas []*NodeMeta) error { + for _, meta := range metas { + p.knownMetas.Add(meta.Hash()) + } + return p2p.Send(p.rw, MetaMsg, metas) } -// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a +// AsyncSendNodeMeta queues list of notary node meta propagation to a // remote peer. If the peer's broadcast queue is full, the event is silently // dropped. -func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) { +func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) { select { - case p.queuedInfos <- info: - p.knownInfos.Add(info.Hash()) + case p.queuedMetas <- metas: + for _, meta := range metas { + p.knownMetas.Add(meta.Hash()) + } default: - p.Log().Debug("Dropping notary node info propagation") + p.Log().Debug("Dropping node meta propagation", "count", len(metas)) } } @@ -431,7 +461,7 @@ func (p *peer) readStatus(network uint64, status *statusData, genesis common.Has // String implements fmt.Stringer. func (p *peer) String() string { return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("eth/%2d", p.version), + fmt.Sprintf("dex/%2d", p.version), ) } @@ -441,18 +471,25 @@ type peerSet struct { peers map[string]*peer lock sync.RWMutex closed bool + tab *nodeTable - // TODO(sonic): revist this map after dexon core SDK is finalized. - // use types.ValidatorID? or implement Stringer for types.ValidatorID - notaryPeers map[uint64]map[string]*peer - round uint64 + srvr p2pServer + gov governance + peerLabels map[string]map[peerLabel]struct{} + notaryHistory map[uint64]struct{} + dkgHistory map[uint64]struct{} } // newPeerSet creates a new peer set to track the active participants. -func newPeerSet() *peerSet { +func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet { return &peerSet{ - peers: make(map[string]*peer), - notaryPeers: make(map[uint64]map[string]*peer), + peers: make(map[string]*peer), + gov: gov, + srvr: srvr, + tab: tab, + peerLabels: make(map[string]map[peerLabel]struct{}), + notaryHistory: make(map[uint64]struct{}), + dkgHistory: make(map[uint64]struct{}), } } @@ -537,14 +574,14 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { return list } -// PeersWithoutNotaryNodeInfo retrieves a list of peers that do not have a -// given info in their set of known hashes. -func (ps *peerSet) PeersWithoutNotaryNodeInfo(hash common.Hash) []*peer { +// PeersWithoutNodeMeta retrieves a list of peers that do not have a +// given meta in their set of known hashes. +func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if p.knownInfos.Contains(hash) { + if !p.knownMetas.Contains(hash) { list = append(list, p) } } @@ -580,44 +617,171 @@ func (ps *peerSet) Close() { ps.closed = true } -// AddNotaryPeer adds a peer into notary peer of the given round. -// Caller of this function need to make sure that the peer is acutally in -// notary set. -func (ps *peerSet) AddNotaryPeer(round uint64, p *peer) error { +func (ps *peerSet) BuildNotaryConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() - // TODO(sonic): revisit this round check after dexon core SDK is finalized. - if round < ps.round || round > ps.round+2 { - return errInvalidRound + if _, ok := ps.notaryHistory[round]; ok { + return } - if _, ok := ps.peers[p.id]; !ok { - return errNotRegistered + ps.notaryHistory[round] = struct{}{} + + selfID := ps.srvr.Self().ID.String() + for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ { + s := ps.gov.GetNotarySet(chainID, round) + + // not in notary set, add group + if _, ok := s[selfID]; !ok { + var nodes []*discover.Node + for id := range s { + nodes = append(nodes, ps.newNode(id)) + } + ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum) + continue + } + + label := peerLabel{ + set: notaryset, + chainID: chainID, + round: round, + } + delete(s, selfID) + for id := range s { + ps.addDirectPeer(id, label) + } } +} - ps.notaryPeers[round][p.id] = p - return nil +func (ps *peerSet) ForgetNotaryConn(round uint64) { + ps.lock.Lock() + defer ps.lock.Unlock() + + // forget all the rounds before the given round + for r := range ps.notaryHistory { + if r <= round { + ps.forgetNotaryConn(r) + delete(ps.notaryHistory, r) + } + } } -// NotaryPeers return peers in notary set of the given round. -func (ps *peerSet) NotaryPeers(round uint64) []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() +func (ps *peerSet) forgetNotaryConn(round uint64) { + selfID := ps.srvr.Self().ID.String() + for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ { + s := ps.gov.GetNotarySet(chainID, round) + if _, ok := s[selfID]; !ok { + ps.srvr.RemoveGroup(notarySetName(chainID, round)) + continue + } - list := make([]*peer, 0, len(ps.notaryPeers[round])) - for _, p := range ps.notaryPeers[round] { - if _, ok := ps.peers[p.id]; ok { - list = append(list, p) + label := peerLabel{ + set: notaryset, + chainID: chainID, + round: round, + } + delete(s, selfID) + for id := range s { + ps.removeDirectPeer(id, label) } } - return list } -// NextRound moves notary peer set to next round. -func (ps *peerSet) NextRound() { +func notarySetName(chainID uint32, round uint64) string { + return fmt.Sprintf("%d-%d-notaryset", chainID, round) +} + +func (ps *peerSet) BuildDKGConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() - delete(ps.notaryPeers, ps.round) - ps.round = ps.round + 1 + selfID := ps.srvr.Self().ID.String() + s := ps.gov.GetDKGSet(round) + if _, ok := s[selfID]; !ok { + return + } + ps.dkgHistory[round] = struct{}{} + + delete(s, selfID) + for id := range s { + ps.addDirectPeer(id, peerLabel{ + set: dkgset, + round: round, + }) + } +} + +func (ps *peerSet) ForgetDKGConn(round uint64) { + ps.lock.Lock() + defer ps.lock.Unlock() + + // forget all the rounds before the given round + for r := range ps.dkgHistory { + if r <= round { + ps.forgetDKGConn(r) + delete(ps.dkgHistory, r) + } + } +} + +func (ps *peerSet) forgetDKGConn(round uint64) { + selfID := ps.srvr.Self().ID.String() + s := ps.gov.GetDKGSet(round) + if _, ok := s[selfID]; !ok { + return + } + + delete(s, selfID) + label := peerLabel{ + set: dkgset, + round: round, + } + for id := range s { + ps.removeDirectPeer(id, label) + } +} + +// make sure the ps.lock is hold +func (ps *peerSet) addDirectPeer(id string, label peerLabel) { + // if the peer exists add the label + if p, ok := ps.peers[id]; ok { + p.addLabel(label) + } + + if _, ok := ps.peerLabels[id]; !ok { + ps.peerLabels[id] = make(map[peerLabel]struct{}) + } + + ps.peerLabels[id][label] = struct{}{} + ps.srvr.AddDirectPeer(ps.newNode(id)) +} + +// make sure the ps.lock is hold +func (ps *peerSet) removeDirectPeer(id string, label peerLabel) { + if p, ok := ps.peers[id]; ok { + p.removeLabel(label) + } + + delete(ps.peerLabels[id], label) + + if len(ps.peerLabels[id]) == 0 { + ps.srvr.RemoveDirectPeer(ps.newNode(id)) + delete(ps.peerLabels, id) + } +} + +func (ps *peerSet) newNode(id string) *enode.Node { + nodeID := enode.HexID(id) + meta := ps.tab.Get(enode.HexID(id)) + + var r enr.Record + r.Set(enr.ID(nodeID.String())) + r.Set(enr.IP(meta.IP)) + r.Set(enr.TCP(meta.TCP)) + r.Set(enr.UDP(meta.UDP)) + + n, err := enode.New(enode.ValidSchemes, &r) + if err != nil { + panic(err) + } + return n } diff --git a/dex/peer_test.go b/dex/peer_test.go new file mode 100644 index 000000000..bac6ed5ec --- /dev/null +++ b/dex/peer_test.go @@ -0,0 +1,628 @@ +package dex + +import ( + "fmt" + "math/big" + "testing" + + mapset "github.com/deckarep/golang-set" + "github.com/dexon-foundation/dexon/p2p/discover" + "github.com/dexon-foundation/dexon/p2p/enode" +) + +func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) { + self := discover.Node{ID: nodeID(0)} + server := newTestP2PServer(&self) + table := newNodeTable() + + gov := &testGovernance{ + getChainNumFunc: func(uint64) uint32 { + return 3 + }, + } + + round10 := [][]enode.ID{ + []enode.ID{nodeID(0), nodeID(1), nodeID(2)}, + []enode.ID{nodeID(1), nodeID(3)}, + []enode.ID{nodeID(2), nodeID(4)}, + } + round11 := [][]enode.ID{ + []enode.ID{nodeID(0), nodeID(1), nodeID(5)}, + []enode.ID{nodeID(5), nodeID(6)}, + []enode.ID{nodeID(0), nodeID(2), nodeID(4)}, + } + round12 := [][]enode.ID{ + []enode.ID{nodeID(0), nodeID(3), nodeID(5)}, + []enode.ID{nodeID(0), nodeID(7), nodeID(8)}, + []enode.ID{nodeID(0), nodeID(2), nodeID(6)}, + } + + gov.getNotarySetFunc = func(cid uint32, round uint64) map[string]struct{} { + m := map[uint64][][]enode.ID{ + 10: round10, + 11: round11, + 12: round12, + } + return newTestNodeSet(m[round][cid]) + } + + ps := newPeerSet(gov, server, table) + peer1 := newDummyPeer(nodeID(1)) + peer2 := newDummyPeer(nodeID(2)) + var err error + err = ps.Register(peer1) + if err != nil { + t.Error(err) + } + err = ps.Register(peer2) + if err != nil { + t.Error(err) + } + + // build round 10 + ps.BuildNotaryConn(10) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{notaryset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{notaryset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), + }) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{ + notarySetName(1, 10), + notarySetName(2, 10), + }) + if err != nil { + t.Error(err) + } + + // build round 11 + ps.BuildNotaryConn(11) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 0, 11}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 2, 11}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 0, 11}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 2, 11}, + }, + nodeID(4).String(): []peerLabel{ + peerLabel{notaryset, 2, 11}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{notaryset, 0, 11}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10, 11}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), nodeID(4), nodeID(5), + }) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{ + notarySetName(1, 10), + notarySetName(2, 10), + notarySetName(1, 11), + }) + if err != nil { + t.Error(err) + } + + // build round 12 + ps.BuildNotaryConn(12) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 0, 11}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 2, 11}, + peerLabel{notaryset, 2, 12}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 0, 11}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{notaryset, 0, 10}, + peerLabel{notaryset, 2, 11}, + peerLabel{notaryset, 2, 12}, + }, + nodeID(3).String(): []peerLabel{ + peerLabel{notaryset, 0, 12}, + }, + nodeID(4).String(): []peerLabel{ + peerLabel{notaryset, 2, 11}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{notaryset, 0, 11}, + peerLabel{notaryset, 0, 12}, + }, + nodeID(6).String(): []peerLabel{ + peerLabel{notaryset, 2, 12}, + }, + nodeID(7).String(): []peerLabel{ + peerLabel{notaryset, 1, 12}, + }, + nodeID(8).String(): []peerLabel{ + peerLabel{notaryset, 1, 12}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10, 11, 12}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), nodeID(3), nodeID(4), + nodeID(5), nodeID(6), nodeID(7), nodeID(8), + }) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{ + notarySetName(1, 10), + notarySetName(2, 10), + notarySetName(1, 11), + }) + if err != nil { + t.Error(err) + } + + // forget round 11 + ps.ForgetNotaryConn(11) + + err = checkLabels(peer1, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{notaryset, 2, 12}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(2).String(): []peerLabel{ + peerLabel{notaryset, 2, 12}, + }, + nodeID(3).String(): []peerLabel{ + peerLabel{notaryset, 0, 12}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{notaryset, 0, 12}, + }, + nodeID(6).String(): []peerLabel{ + peerLabel{notaryset, 2, 12}, + }, + nodeID(7).String(): []peerLabel{ + peerLabel{notaryset, 1, 12}, + }, + nodeID(8).String(): []peerLabel{ + peerLabel{notaryset, 1, 12}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{12}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(2), nodeID(3), + nodeID(5), nodeID(6), nodeID(7), nodeID(8), + }) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{}) + if err != nil { + t.Error(err) + } + + // forget round 12 + ps.ForgetNotaryConn(12) + err = checkLabels(peer1, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{}, notaryset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{}) + if err != nil { + t.Error(err) + } + err = checkGroup(server, []string{}) + if err != nil { + t.Error(err) + } + +} + +func TestPeerSetBuildDKGConn(t *testing.T) { + self := discover.Node{ID: nodeID(0)} + server := newTestP2PServer(&self) + table := newNodeTable() + + gov := &testGovernance{} + + gov.getDKGSetFunc = func(round uint64) map[string]struct{} { + m := map[uint64][]enode.ID{ + 10: []enode.ID{nodeID(0), nodeID(1), nodeID(2)}, + 11: []enode.ID{nodeID(1), nodeID(2), nodeID(5)}, + 12: []enode.ID{nodeID(0), nodeID(3), nodeID(5)}, + } + return newTestNodeSet(m[round]) + } + + ps := newPeerSet(gov, server, table) + peer1 := newDummyPeer(nodeID(1)) + peer2 := newDummyPeer(nodeID(2)) + var err error + err = ps.Register(peer1) + if err != nil { + t.Error(err) + } + err = ps.Register(peer2) + if err != nil { + t.Error(err) + } + + // build round 10 + ps.BuildDKGConn(10) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), + }) + if err != nil { + t.Error(err) + } + + // build round 11 + ps.BuildDKGConn(11) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), + }) + if err != nil { + t.Error(err) + } + + // build round 12 + ps.BuildDKGConn(12) + + err = checkLabels(peer1, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{ + peerLabel{dkgset, 0, 10}, + }) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(1).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + nodeID(2).String(): []peerLabel{ + peerLabel{dkgset, 0, 10}, + }, + nodeID(3).String(): []peerLabel{ + peerLabel{dkgset, 0, 12}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{dkgset, 0, 12}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{10, 12}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(1), nodeID(2), nodeID(3), nodeID(5), + }) + if err != nil { + t.Error(err) + } + + // forget round 11 + ps.ForgetDKGConn(11) + + err = checkLabels(peer1, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{ + nodeID(3).String(): []peerLabel{ + peerLabel{dkgset, 0, 12}, + }, + nodeID(5).String(): []peerLabel{ + peerLabel{dkgset, 0, 12}, + }, + }) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{12}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{ + nodeID(3), nodeID(5), + }) + if err != nil { + t.Error(err) + } + + // forget round 12 + ps.ForgetDKGConn(12) + err = checkLabels(peer1, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkLabels(peer2, []peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerLabels(ps, map[string][]peerLabel{}) + if err != nil { + t.Error(err) + } + err = checkPeerSetHistory(ps, []uint64{}, dkgset) + if err != nil { + t.Error(err) + } + err = checkDirectPeer(server, []enode.ID{}) + if err != nil { + t.Error(err) + } +} + +func checkLabels(p *peer, want []peerLabel) error { + if p.labels.Cardinality() != len(want) { + return fmt.Errorf("num of labels mismatch: got %d, want %d", + p.labels.Cardinality(), len(want)) + } + + for _, label := range want { + if !p.labels.Contains(label) { + return fmt.Errorf("label %+v not exist", label) + } + } + return nil +} + +func checkPeerLabels(ps *peerSet, want map[string][]peerLabel) error { + if len(ps.peerLabels) != len(want) { + return fmt.Errorf("peer num mismatch: got %d, want %d", + len(ps.peerLabels), len(want)) + } + + for peerID, gotLabels := range ps.peerLabels { + wantLabels, ok := want[peerID] + if !ok { + return fmt.Errorf("peer id %s not exists", peerID) + } + + if len(gotLabels) != len(wantLabels) { + return fmt.Errorf( + "num of labels of peer id %s mismatch: got %d, want %d", + peerID, len(gotLabels), len(wantLabels)) + } + + for _, label := range wantLabels { + if _, ok := gotLabels[label]; !ok { + fmt.Errorf("label: %+v not exists", label) + } + } + } + return nil +} + +func checkPeerSetHistory(ps *peerSet, want []uint64, set setType) error { + var history map[uint64]struct{} + switch set { + case notaryset: + history = ps.notaryHistory + case dkgset: + history = ps.dkgHistory + default: + return fmt.Errorf("invalid set: %d", set) + } + + if len(history) != len(want) { + return fmt.Errorf("num of history mismatch: got %d, want %d", + len(history), len(want)) + } + + for _, r := range want { + if _, ok := history[r]; !ok { + return fmt.Errorf("round %d not exists", r) + } + } + return nil +} + +func checkDirectPeer(srvr *testP2PServer, want []enode.ID) error { + if len(srvr.direct) != len(want) { + return fmt.Errorf("num of direct peer mismatch: got %d, want %d", + len(srvr.direct), len(want)) + } + + for _, id := range want { + if _, ok := srvr.direct[id]; !ok { + return fmt.Errorf("direct peer %s not exists", id.String()) + } + } + return nil +} +func checkGroup(srvr *testP2PServer, want []string) error { + if len(srvr.group) != len(want) { + return fmt.Errorf("num of group mismatch: got %d, want %d", + len(srvr.group), len(want)) + } + + for _, name := range want { + if _, ok := srvr.group[name]; !ok { + return fmt.Errorf("group %s not exists", name) + } + } + return nil +} + +func nodeID(n int64) enode.ID { + b := big.NewInt(n).Bytes() + var id enode.ID + copy(id[len(id)-len(b):], b) + return id +} + +func newTestNodeSet(nodes []enode.ID) map[string]struct{} { + m := make(map[string]struct{}) + for _, node := range nodes { + m[node.String()] = struct{}{} + } + return m +} + +func newDummyPeer(id enode.ID) *peer { + return &peer{ + labels: mapset.NewSet(), + id: id.String(), + } +} diff --git a/dex/protocol.go b/dex/protocol.go index 0111edf18..7b01217ff 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -20,14 +20,11 @@ import ( "fmt" "io" "math/big" - "net" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core" "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/crypto/sha3" "github.com/dexon-foundation/dexon/event" - "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/rlp" ) @@ -67,7 +64,7 @@ const ( ReceiptsMsg = 0x10 // Protocol messages belonging to dex/64 - NotaryNodeInfoMsg = 0x11 + MetaMsg = 0x11 ) type errCode int @@ -114,12 +111,26 @@ type txPool interface { SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription } +type governance interface { + GetChainNum(uint64) uint32 + + GetNotarySet(uint32, uint64) map[string]struct{} + + GetDKGSet(uint64) map[string]struct{} + + SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription +} + type p2pServer interface { Self() *enode.Node - AddNotaryPeer(*discover.Node) + AddDirectPeer(*enode.Node) + + RemoveDirectPeer(*enode.Node) - RemoveNotaryPeer(*discover.Node) + AddGroup(string, []*enode.Node, uint64) + + RemoveGroup(string) } // statusData is the network packet for the status message. @@ -195,22 +206,3 @@ type blockBody struct { // blockBodiesData is the network packet for block content distribution. type blockBodiesData []*blockBody - -// TODO(sonic): revisit this msg when dexon core SDK is finalized. -// notartyNodeInfo is the network packet for notary node ip info. -type notaryNodeInfo struct { - ID discover.NodeID - IP net.IP - UDP uint16 - TCP uint16 - Round uint64 - Sig []byte - Timestamp int64 -} - -func (n *notaryNodeInfo) Hash() (h common.Hash) { - hw := sha3.NewKeccak256() - rlp.Encode(hw, n) - hw.Sum(h[:0]) - return h -} diff --git a/dex/protocol_test.go b/dex/protocol_test.go index c1b6efcfc..8c7638b2b 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -221,3 +221,85 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) { } } } + +func TestRecvNodeMetas(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer", dex64, pm, true) + defer pm.Stop() + defer p.close() + + meta := NodeMeta{ + ID: nodeID(1), + } + + ch := make(chan newMetasEvent) + pm.nodeTable.SubscribeNewMetasEvent(ch) + + if err := p2p.Send(p.app, MetaMsg, []interface{}{meta}); err != nil { + t.Fatalf("send error: %v", err) + } + + select { + case event := <-ch: + metas := event.Metas + if len(metas) != 1 { + t.Errorf("wrong number of new metas: got %d, want 1", len(metas)) + } else if metas[0].Hash() != meta.Hash() { + t.Errorf("added wrong meta hash: got %v, want %v", metas[0].Hash(), meta.Hash()) + } + case <-time.After(3 * time.Second): + t.Errorf("no newMetasEvent received within 3 seconds") + } +} + +func TestSendNodeMetas(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + defer pm.Stop() + + allmetas := make([]*NodeMeta, 100) + for nonce := range allmetas { + allmetas[nonce] = &NodeMeta{ID: nodeID(int64(nonce))} + } + + // Connect several peers. They should all receive the pending transactions. + var wg sync.WaitGroup + checkmetas := func(p *testPeer) { + defer wg.Done() + defer p.close() + seen := make(map[common.Hash]bool) + for _, meta := range allmetas { + seen[meta.Hash()] = false + } + for n := 0; n < len(allmetas) && !t.Failed(); { + var metas []*NodeMeta + msg, err := p.app.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != MetaMsg { + t.Errorf("%v: got code %d, want MetaMsg", p.Peer, msg.Code) + } + if err := msg.Decode(&metas); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + for _, meta := range metas { + hash := meta.Hash() + seenmeta, want := seen[hash] + if seenmeta { + t.Errorf("%v: got meta more than once: %x", p.Peer, hash) + } + if !want { + t.Errorf("%v: got unexpected meta: %x", p.Peer, hash) + } + seen[hash] = true + n++ + } + } + } + for i := 0; i < 3; i++ { + p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true) + wg.Add(1) + go checkmetas(p) + } + pm.nodeTable.Add(allmetas) + wg.Wait() +} diff --git a/dex/sync.go b/dex/sync.go index d7fe748bc..5af6076bc 100644 --- a/dex/sync.go +++ b/dex/sync.go @@ -25,7 +25,7 @@ import ( "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/eth/downloader" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" + "github.com/dexon-foundation/dexon/p2p/enode" ) const ( @@ -35,6 +35,9 @@ const ( // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 + + // This is the target number for the packs of metas sent by metasyncLoop. + metasyncPackNum = 1024 ) type txsync struct { @@ -64,7 +67,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) { // the transactions in small packs to one peer at a time. func (pm *ProtocolManager) txsyncLoop() { var ( - pending = make(map[discover.NodeID]*txsync) + pending = make(map[enode.ID]*txsync) sending = false // whether a send is active pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send @@ -129,6 +132,94 @@ func (pm *ProtocolManager) txsyncLoop() { } } +type metasync struct { + p *peer + metas []*NodeMeta +} + +// syncNodeMetas starts sending all node metas to the given peer. +func (pm *ProtocolManager) syncNodeMetas(p *peer) { + metas := pm.nodeTable.Metas() + if len(metas) == 0 { + return + } + select { + case pm.metasyncCh <- &metasync{p, metas}: + case <-pm.quitSync: + } +} + +// metasyncLoop takes care of the initial node meta sync for each new +// connection. When a new peer appears, we relay all currently node metas. +// In order to minimise egress bandwidth usage, we send +// the metas in small packs to one peer at a time. +func (pm *ProtocolManager) metasyncLoop() { + var ( + pending = make(map[enode.ID]*metasync) + sending = false // whether a send is active + pack = new(metasync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *metasync) { + // Fill pack with node metas up to the target num. + var num int + pack.p = s.p + pack.metas = pack.metas[:0] + for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ { + pack.metas = append(pack.metas, s.metas[i]) + num += 1 + } + // Remove the metas that will be sent. + s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])] + if len(s.metas) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num) + sending = true + go func() { done <- pack.p.SendNodeMetas(pack.metas) }() + } + + // pick chooses the next pending sync. + pick := func() *metasync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-pm.metasyncCh: + pending[s.p.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + pack.p.Log().Debug("NodeMeta send failed", "err", err) + delete(pending, pack.p.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-pm.quitSync: + return + } + } +} + // syncer is responsible for periodically synchronising with the network, both // downloading hashes and blocks as well as handling the announcement handler. func (pm *ProtocolManager) syncer() { diff --git a/p2p/dial.go b/p2p/dial.go index 9b24ed96a..909bed863 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -64,6 +64,12 @@ func (t TCPDialer) Dial(dest *enode.Node) (net.Conn, error) { return t.Dialer.Dial("tcp", addr.String()) } +type dialGroup struct { + name string + nodes map[enode.ID]*enode.Node + num uint64 +} + // dialstate schedules dials and discovery lookups. // it get's a chance to compute new tasks on every iteration // of the main loop in Server.run. @@ -78,6 +84,8 @@ type dialstate struct { lookupBuf []*enode.Node // current discovery lookup results randomNodes []*enode.Node // filled from Table static map[enode.ID]*dialTask + direct map[enode.ID]*dialTask + group map[string]*dialGroup hist *dialHistory start time.Time // time when the dialer was first used @@ -85,6 +93,7 @@ type dialstate struct { } type discoverTable interface { + Self() *enode.Node Close() Resolve(*enode.Node) *enode.Node LookupRandom() []*enode.Node @@ -133,6 +142,8 @@ func newDialState(self enode.ID, static []*enode.Node, bootnodes []*enode.Node, self: self, netrestrict: netrestrict, static: make(map[enode.ID]*dialTask), + direct: make(map[enode.ID]*dialTask), + group: make(map[string]*dialGroup), dialing: make(map[enode.ID]connFlag), bootnodes: make([]*enode.Node, len(bootnodes)), randomNodes: make([]*enode.Node, maxdyn/2), @@ -159,6 +170,23 @@ func (s *dialstate) removeStatic(n *enode.Node) { s.hist.remove(n.ID()) } +func (s *dialstate) addDirect(n *enode.Node) { + s.direct[n.ID()] = &dialTask{flags: directDialedConn, dest: n} +} + +func (s *dialstate) removeDirect(n *enode.Node) { + delete(s.direct, n.ID()) + s.hist.remove(n.ID()) +} + +func (s *dialstate) addGroup(g *dialGroup) { + s.group[g.name] = g +} + +func (s *dialstate) removeGroup(g *dialGroup) { + delete(s.group, g.name) +} + func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Time) []task { if s.start.IsZero() { s.start = now @@ -196,13 +224,69 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti err := s.checkDial(t.dest, peers) switch err { case errNotWhitelisted, errSelf: - log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) + log.Warn("Removing static dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) delete(s.static, t.dest.ID()) case nil: s.dialing[id] = t.flags newtasks = append(newtasks, t) } } + + for id, t := range s.direct { + err := s.checkDial(t.dest, peers) + switch err { + case errNotWhitelisted, errSelf: + log.Warn("Removing direct dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err) + delete(s.direct, t.dest.ID()) + case nil: + s.dialing[id] = t.flags + newtasks = append(newtasks, t) + } + } + + // compute connected + connected := map[string]map[enode.ID]struct{}{} + for _, g := range s.group { + connected[g.name] = map[enode.ID]struct{}{} + } + + for id := range peers { + for _, g := range s.group { + if _, ok := g.nodes[id]; ok { + connected[g.name][id] = struct{}{} + } + } + } + + for id := range s.dialing { + for _, g := range s.group { + if _, ok := g.nodes[id]; ok { + connected[g.name][id] = struct{}{} + } + } + } + + groupNodes := map[enode.ID]*enode.Node{} + for _, g := range s.group { + for _, n := range g.nodes { + if uint64(len(connected[g.name])) >= g.num { + break + } + err := s.checkDial(n, peers) + switch err { + case errNotWhitelisted, errSelf: + log.Warn("Removing group dial candidate", "id", n.ID(), "addr", &net.TCPAddr{IP: n.IP(), Port: n.TCP()}, "err", err) + delete(g.nodes, n.ID()) + case nil: + groupNodes[n.ID()] = n + connected[g.name][n.ID()] = struct{}{} + } + } + } + for _, n := range groupNodes { + addDial(groupDialedConn, n) + } + // If we don't have any peers whatsoever, try to dial a random bootnode. This // scenario is useful for the testnet (and private networks) where the discovery // table might be full of mostly bad peers, making it hard to find good ones. diff --git a/p2p/dial_test.go b/p2p/dial_test.go index 3805ff690..f9122de6f 100644 --- a/p2p/dial_test.go +++ b/p2p/dial_test.go @@ -525,6 +525,223 @@ func TestDialStateStaticDial(t *testing.T) { }) } +func TestDialStateDirectDial(t *testing.T) { + wantDirect := []*enode.Node{ + newNode(uintID(1), nil), + newNode(uintID(2), nil), + newNode(uintID(3), nil), + newNode(uintID(4), nil), + newNode(uintID(5), nil), + } + init := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil) + for _, node := range wantDirect { + init.addDirect(node) + } + + runDialTest(t, dialtest{ + init: init, + rounds: []round{ + // Direct dials are launched for the nodes that + // aren't yet connected. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + }, + new: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(3), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)}, + }, + }, + // No new tasks are launched in this round because all direct + // nodes are either connected or still being dialed. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}}, + }, + done: []task{ + &dialTask{flags: staticDialedConn, dest: newNode(uintID(3), nil)}, + }, + }, + // No new dial tasks are launched because all direct + // nodes are now connected. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + done: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)}, + }, + new: []task{ + &waitExpireTask{Duration: 14 * time.Second}, + }, + }, + // Wait a round for dial history to expire, no new tasks should spawn. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + }, + // If a direct node is dropped, it should be immediately redialed, + // irrespective whether it was originally static or dynamic. + { + peers: []*Peer{ + {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}}, + }, + new: []task{ + &dialTask{flags: directDialedConn, dest: newNode(uintID(2), nil)}, + &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)}, + }, + }, + }, + }) +} + +func TestDialStateGroupDial(t *testing.T) { + groups := []*dialGroup{ + &dialGroup{ + name: "g1", + nodes: map[enode.ID]*enode.Node{ + uintID(1): newNode(uintID(1), nil), + uintID(2): newNode(uintID(2), nil), + }, + num: 2, + }, + &dialGroup{ + name: "g2", + nodes: map[enode.ID]*enode.Node{ + uintID(2): newNode(uintID(2), nil), + uintID(3): newNode(uintID(3), nil), + uintID(4): newNode(uintID(4), nil), + uintID(5): newNode(uintID(5), nil), + uintID(6): newNode(uintID(6), nil), + }, + num: 2, + }, + } + + type groupTest struct { + peers []*Peer + dialing map[enode.ID]connFlag + ceiling map[string]uint64 + } + + tests := []groupTest{ + { + peers: nil, + dialing: map[enode.ID]connFlag{}, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + { + peers: []*Peer{ + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(2), nil)}}, + }, + dialing: map[enode.ID]connFlag{ + uintID(1): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 2}, + }, + { + peers: []*Peer{ + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(1), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(4), nil)}}, + {rw: &conn{flags: staticDialedConn, node: newNode(uintID(5), nil)}}, + }, + dialing: map[enode.ID]connFlag{ + uintID(2): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + { + peers: nil, + dialing: map[enode.ID]connFlag{ + uintID(1): staticDialedConn, + uintID(2): staticDialedConn, + uintID(3): staticDialedConn, + }, + ceiling: map[string]uint64{"g1": 2, "g2": 4}, + }, + } + + pm := func(ps []*Peer) map[enode.ID]*Peer { + m := make(map[enode.ID]*Peer) + for _, p := range ps { + m[p.rw.node.ID()] = p + } + return m + } + + run := func(i int, tt groupTest) { + d := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil) + d.dialing = make(map[enode.ID]connFlag) + for k, v := range tt.dialing { + d.dialing[k] = v + } + + for _, g := range groups { + d.addGroup(g) + } + peermap := pm(tt.peers) + new := d.newTasks(len(tt.dialing), peermap, time.Now()) + + cnt := map[string]uint64{} + for id := range peermap { + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for id := range tt.dialing { + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for _, task := range new { + id := task.(*dialTask).dest.ID() + for _, g := range groups { + if _, ok := g.nodes[id]; ok { + cnt[g.name]++ + } + } + } + + for _, g := range groups { + if cnt[g.name] < g.num { + t.Errorf("test %d) group %s peers + dialing + new < num (%d < %d)", + i, g.name, cnt[g.name], g.num) + } + if cnt[g.name] > tt.ceiling[g.name] { + t.Errorf("test %d) group %s peers + dialing + new > ceiling (%d > %d)", + i, g.name, cnt[g.name], tt.ceiling[g.name]) + } + } + } + + for i, tt := range tests { + run(i, tt) + } +} + // This test checks that static peers will be redialed immediately if they were re-added to a static list. func TestDialStaticAfterReset(t *testing.T) { wantStatic := []*enode.Node{ diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 25ea7b0b2..e8a219871 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -127,7 +127,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node) (*Table, error return tab, nil } -func (tab *Table) self() *enode.Node { +func (tab *Table) Self() *enode.Node { return tab.net.self() } @@ -258,7 +258,7 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node { ) // don't query further if we hit ourself. // unlikely to happen often in practice. - asked[tab.self().ID()] = true + asked[tab.Self().ID()] = true for { tab.mutex.Lock() @@ -419,7 +419,7 @@ func (tab *Table) doRefresh(done chan struct{}) { // Run self lookup to discover new neighbor nodes. // We can only do this if we have a secp256k1 identity. var key ecdsa.PublicKey - if err := tab.self().Load((*enode.Secp256k1)(&key)); err == nil { + if err := tab.Self().Load((*enode.Secp256k1)(&key)); err == nil { tab.lookup(encodePubkey(&key), false) } @@ -544,7 +544,7 @@ func (tab *Table) len() (n int) { // bucket returns the bucket for the given node ID hash. func (tab *Table) bucket(id enode.ID) *bucket { - d := enode.LogDist(tab.self().ID(), id) + d := enode.LogDist(tab.Self().ID(), id) if d <= bucketMinDistance { return tab.buckets[0] } @@ -557,7 +557,7 @@ func (tab *Table) bucket(id enode.ID) *bucket { // // The caller must not hold tab.mutex. func (tab *Table) addSeenNode(n *node) { - if n.ID() == tab.self().ID() { + if n.ID() == tab.Self().ID() { return } @@ -599,7 +599,7 @@ func (tab *Table) addVerifiedNode(n *node) { if !tab.isInitDone() { return } - if n.ID() == tab.self().ID() { + if n.ID() == tab.Self().ID() { return } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 2321e743f..8947a074e 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -147,7 +147,7 @@ func TestTable_IPLimit(t *testing.T) { defer tab.Close() for i := 0; i < tableIPLimit+1; i++ { - n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)}) + n := nodeAtDistance(tab.Self().ID(), i, net.IP{172, 0, 1, byte(i)}) tab.addSeenNode(n) } if tab.len() > tableIPLimit { @@ -165,7 +165,7 @@ func TestTable_BucketIPLimit(t *testing.T) { d := 3 for i := 0; i < bucketIPLimit+1; i++ { - n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)}) + n := nodeAtDistance(tab.Self().ID(), d, net.IP{172, 0, 1, byte(i)}) tab.addSeenNode(n) } if tab.len() > bucketIPLimit { @@ -264,7 +264,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) { for i := 0; i < len(buf); i++ { ld := cfg.Rand.Intn(len(tab.buckets)) - fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))}) + fillTable(tab, []*node{nodeAtDistance(tab.Self().ID(), ld, intIP(ld))}) } gotN := tab.ReadRandomNodes(buf) if gotN != tab.len() { @@ -312,8 +312,8 @@ func TestTable_addVerifiedNode(t *testing.T) { defer tab.Close() // Insert two nodes. - n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) - n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) + n1 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 1}) + n2 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 2}) tab.addSeenNode(n1) tab.addSeenNode(n2) @@ -344,8 +344,8 @@ func TestTable_addSeenNode(t *testing.T) { defer tab.Close() // Insert two nodes. - n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) - n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) + n1 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 1}) + n2 := nodeAtDistance(tab.Self().ID(), 256, net.IP{88, 77, 66, 2}) tab.addSeenNode(n1) tab.addSeenNode(n2) diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 20238aabc..7e149b22c 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -75,10 +75,10 @@ func intIP(i int) net.IP { // fillBucket inserts nodes into the given bucket until it is full. func fillBucket(tab *Table, n *node) (last *node) { - ld := enode.LogDist(tab.self().ID(), n.ID()) + ld := enode.LogDist(tab.Self().ID(), n.ID()) b := tab.bucket(n.ID()) for len(b.entries) < bucketSize { - b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld))) + b.entries = append(b.entries, nodeAtDistance(tab.Self().ID(), ld, intIP(ld))) } return b.entries[bucketSize-1] } diff --git a/p2p/peer.go b/p2p/peer.go index 3c75d7dd5..2c357fdc9 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -434,7 +434,6 @@ type PeerInfo struct { RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection Inbound bool `json:"inbound"` Trusted bool `json:"trusted"` - Notary bool `json:"notary"` Static bool `json:"static"` } `json:"network"` Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields @@ -459,7 +458,6 @@ func (p *Peer) Info() *PeerInfo { info.Network.RemoteAddress = p.RemoteAddr().String() info.Network.Inbound = p.rw.is(inboundConn) info.Network.Trusted = p.rw.is(trustedConn) - info.Network.Notary = p.rw.is(notaryConn) info.Network.Static = p.rw.is(staticDialedConn) // Gather all the running protocol infos diff --git a/p2p/server.go b/p2p/server.go index c3b3a00d4..a58673342 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -33,13 +33,13 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/discv5" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/p2p/nat" "github.com/dexon-foundation/dexon/p2p/netutil" "github.com/dexon-foundation/dexon/rlp" + "github.com/ethereum/go-ethereum/p2p/discover" ) const ( @@ -178,10 +178,12 @@ type Server struct { quit chan struct{} addstatic chan *enode.Node removestatic chan *enode.Node + adddirect chan *enode.Node + removedirect chan *enode.Node addtrusted chan *enode.Node removetrusted chan *enode.Node - addnotary chan *enode.Node - removenotary chan *enode.Node + addgroup chan *dialGroup + removegroup chan *dialGroup posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop @@ -203,9 +205,10 @@ type connFlag int32 const ( dynDialedConn connFlag = 1 << iota staticDialedConn + directDialedConn + groupDialedConn inboundConn trustedConn - notaryConn ) // conn wraps a network connection with information gathered @@ -254,12 +257,15 @@ func (f connFlag) String() string { if f&staticDialedConn != 0 { s += "-staticdial" } + if f&directDialedConn != 0 { + s += "-directdial" + } + if f&groupDialedConn != 0 { + s += "-groupdial" + } if f&inboundConn != 0 { s += "-inbound" } - if f¬aryConn != 0 { - s += "-notary" - } if s != "" { s = s[1:] } @@ -333,40 +339,57 @@ func (srv *Server) RemovePeer(node *enode.Node) { } } -// AddTrustedPeer adds the given node to a reserved whitelist which allows the -// node to always connect, even if the slot are full. -func (srv *Server) AddTrustedPeer(node *enode.Node) { +// AddDirectPeer connects to the given node and maintains the connection until the +// server is shut down. If the connection fails for any reason, the server will +// attempt to reconnect the peer. +func (srv *Server) AddDirectPeer(node *enode.Node) { select { - case srv.addtrusted <- node: + case srv.adddirect <- node: case <-srv.quit: } } -// RemoveTrustedPeer removes the given node from the trusted peer set. -func (srv *Server) RemoveTrustedPeer(node *enode.Node) { +// RemoveDirectPeer disconnects from the given node +func (srv *Server) RemoveDirectPeer(node *enode.Node) { select { - case srv.removetrusted <- node: + case srv.removedirect <- node: case <-srv.quit: } } -// AddNotaryPeer connects to the given node and maintains the connection until the -// server is shut down. If the connection fails for any reason, the server will -// attempt to reconnect the peer. -// AddNotaryPeer also adds the given node to a reserved whitelist which allows the +func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) { + m := map[enode.ID]*enode.Node{} + for _, node := range nodes { + m[node.ID()] = node + } + g := &dialGroup{name: name, nodes: m, num: num} + select { + case srv.addgroup <- g: + case <-srv.quit: + } +} + +func (srv *Server) RemoveGroup(name string) { + g := &dialGroup{name: name} + select { + case srv.removegroup <- g: + case <-srv.quit: + } +} + +// AddTrustedPeer adds the given node to a reserved whitelist which allows the // node to always connect, even if the slot are full. -func (srv *Server) AddNotaryPeer(node *discover.Node) { +func (srv *Server) AddTrustedPeer(node *enode.Node) { select { - case srv.addnotary <- node: + case srv.addtrusted <- node: case <-srv.quit: } } -// RemoveNotaryPeer disconnects from the given node. -// RemoveNotaryPeer also removes the given node from the notary peer set. -func (srv *Server) RemoveNotaryPeer(node *discover.Node) { +// RemoveTrustedPeer removes the given node from the trusted peer set. +func (srv *Server) RemoveTrustedPeer(node *enode.Node) { select { - case srv.removenotary <- node: + case srv.removetrusted <- node: case <-srv.quit: } } @@ -388,6 +411,36 @@ func (srv *Server) Self() *enode.Node { return ln.Node() } +func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *enode.Node { + // If the node is running but discovery is off, manually assemble the node infos. + if ntab == nil { + addr := srv.tcpAddr(listener) + return enode.NewV4(&srv.PrivateKey.PublicKey, addr.IP, addr.Port, 0) + } + // Otherwise return the discovery node. + return ntab.Self() +} + +func (srv *Server) tcpAddr(listener net.Listener) net.TCPAddr { + addr := net.TCPAddr{IP: net.IP{0, 0, 0, 0}} + if listener == nil { + return addr // Inbound connections disabled, use zero address. + } + // Otherwise inject the listener address too. + if a, ok := listener.Addr().(*net.TCPAddr); ok { + addr = *a + } + if srv.NAT != nil { + if ip, err := srv.NAT.ExternalIP(); err == nil { + addr.IP = ip + } + } + if addr.IP.IsUnspecified() { + addr.IP = net.IP{127, 0, 0, 1} + } + return addr +} + // Stop terminates the server and all active peer connections. // It blocks until all active connections have been closed. func (srv *Server) Stop() { @@ -465,10 +518,12 @@ func (srv *Server) Start() (err error) { srv.posthandshake = make(chan *conn) srv.addstatic = make(chan *enode.Node) srv.removestatic = make(chan *enode.Node) + srv.adddirect = make(chan *enode.Node) + srv.removedirect = make(chan *enode.Node) + srv.addgroup = make(chan *dialGroup) + srv.removegroup = make(chan *dialGroup) srv.addtrusted = make(chan *enode.Node) srv.removetrusted = make(chan *enode.Node) - srv.addnotary = make(chan *enode.Node) - srv.removenotary = make(chan *enode.Node) srv.peerOp = make(chan peerOpFunc) srv.peerOpDone = make(chan struct{}) @@ -628,6 +683,10 @@ type dialer interface { taskDone(task, time.Time) addStatic(*enode.Node) removeStatic(*enode.Node) + addDirect(*enode.Node) + removeDirect(*enode.Node) + addGroup(*dialGroup) + removeGroup(*dialGroup) } func (srv *Server) run(dialstate dialer) { @@ -636,13 +695,15 @@ func (srv *Server) run(dialstate dialer) { defer srv.nodedb.Close() var ( - peers = make(map[enode.ID]*Peer) - inboundCount = 0 - trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) - notary = make(map[enode.ID]bool) - taskdone = make(chan task, maxActiveDialTasks) - runningTasks []task - queuedTasks []task // tasks that can't run yet + peers = make(map[enode.ID]*Peer) + inboundCount = 0 + trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) + peerflags = make(map[enode.ID]connFlag) + groupRefCount = make(map[enode.ID]int32) + groups = make(map[string]*dialGroup) + taskdone = make(chan task, maxActiveDialTasks) + runningTasks []task + queuedTasks []task // tasks that can't run yet ) // Put trusted nodes into a map to speed up checks. // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. @@ -680,6 +741,60 @@ func (srv *Server) run(dialstate dialer) { } } + // remember and maintain the connection flags locally + setConnFlags := func(id enode.ID, f connFlag, val bool) { + if p, ok := peers[id]; ok { + p.rw.set(f, val) + } + if val { + peerflags[id] |= f + } else { + peerflags[id] &= ^f + } + if peerflags[id] == 0 { + delete(peerflags, id) + } + } + + // Put trusted nodes into a map to speed up checks. + // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. + for _, n := range srv.TrustedNodes { + setConnFlags(n.ID(), trustedConn, true) + } + + canDisconnect := func(p *Peer) bool { + f, ok := peerflags[p.ID()] + if ok && f != 0 { + return false + } + return !p.rw.is(dynDialedConn | inboundConn) + } + + removeGroup := func(g *dialGroup) { + if gg, ok := groups[g.name]; ok { + for id := range gg.nodes { + groupRefCount[id]-- + if groupRefCount[id] == 0 { + setConnFlags(id, groupDialedConn, false) + delete(groupRefCount, id) + } + } + } + } + + addGroup := func(g *dialGroup) { + if _, ok := groups[g.name]; ok { + removeGroup(groups[g.name]) + } + for id := range g.nodes { + groupRefCount[id]++ + if groupRefCount[id] > 0 { + setConnFlags(id, groupDialedConn, true) + } + } + groups[g.name] = g + } + running: for { scheduleTasks() @@ -693,63 +808,59 @@ running: // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. srv.log.Trace("Adding static node", "node", n) + setConnFlags(n.ID(), staticDialedConn, true) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected. srv.log.Trace("Removing static node", "node", n) + setConnFlags(n.ID(), staticDialedConn, false) dialstate.removeStatic(n) - if p, ok := peers[n.ID()]; ok { + if p, ok := peers[n.ID()]; ok && canDisconnect(p) { p.Disconnect(DiscRequested) } + case n := <-srv.adddirect: + // This channel is used by AddDirectPeer to add to the + // ephemeral direct peer list. Add it to the dialer, + // it will keep the node connected. + srv.log.Trace("Adding direct node", "node", n) + setConnFlags(n.ID(), directDialedConn, true) + if p, ok := peers[n.ID()]; ok { + p.rw.set(directDialedConn, true) + } + dialstate.addDirect(n) + case n := <-srv.removedirect: + // This channel is used by RemoveDirectPeer to send a + // disconnect request to a peer and begin the + // stop keeping the node connected. + srv.log.Trace("Removing direct node", "node", n) + setConnFlags(n.ID(), directDialedConn, false) + if p, ok := peers[n.ID()]; ok { + p.rw.set(directDialedConn, false) + if !p.rw.is(trustedConn | groupDialedConn) { + p.Disconnect(DiscRequested) + } + } + dialstate.removeDirect(n) + case g := <-srv.addgroup: + srv.log.Trace("Adding group", "group", g) + addGroup(g) + dialstate.addGroup(g) + case g := <-srv.removegroup: + srv.log.Trace("Removing group", "group", g) + removeGroup(g) + dialstate.removeGroup(g) case n := <-srv.addtrusted: // This channel is used by AddTrustedPeer to add an enode // to the trusted node set. srv.log.Trace("Adding trusted node", "node", n) - trusted[n.ID()] = true - // Mark any already-connected peer as trusted - if p, ok := peers[n.ID()]; ok { - p.rw.set(trustedConn, true) - } + setConnFlags(n.ID(), trustedConn, true) case n := <-srv.removetrusted: // This channel is used by RemoveTrustedPeer to remove an enode // from the trusted node set. srv.log.Trace("Removing trusted node", "node", n) - if _, ok := trusted[n.ID()]; ok { - delete(trusted, n.ID()) - } - // Unmark any already-connected peer as trusted - if p, ok := peers[n.ID()]; ok { - p.rw.set(trustedConn, false) - } - case n := <-srv.addnotary: - // This channel is used by AddNotaryPeer to add to the - // ephemeral notary peer list. Add it to the dialer, - // it will keep the node connected. - srv.log.Trace("Adding notary node", "node", n) - notary[n.ID] = true - if p, ok := peers[n.ID]; ok { - p.rw.set(notaryConn, true) - } - dialstate.addStatic(n) - case n := <-srv.removenotary: - // This channel is used by RemoveNotaryPeer to send a - // disconnect request to a peer and begin the - // stop keeping the node connected. - srv.log.Trace("Removing notary node", "node", n) - if _, ok := notary[n.ID]; ok { - delete(notary, n.ID) - } - - if p, ok := peers[n.ID]; ok { - p.rw.set(notaryConn, false) - } - - dialstate.removeStatic(n) - if p, ok := peers[n.ID]; ok { - p.Disconnect(DiscRequested) - } + setConnFlags(n.ID(), trustedConn, false) case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. op(peers) @@ -764,15 +875,9 @@ running: case c := <-srv.posthandshake: // A connection has passed the encryption handshake so // the remote identity is known (but hasn't been verified yet). - if trusted[c.node.ID()] { - // Ensure that the trusted flag is set before checking against MaxPeers. - c.flags |= trustedConn + if f, ok := peerflags[c.node.ID()]; ok { + c.flags |= f } - - if notary[c.id] { - c.flags |= notaryConn - } - // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. select { case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c): @@ -853,9 +958,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error { switch { - case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers: + case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers - case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected diff --git a/p2p/server_test.go b/p2p/server_test.go index b46240722..8bd113791 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -27,7 +27,6 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/p2p/enr" "golang.org/x/crypto/sha3" @@ -173,14 +172,10 @@ func TestServerDial(t *testing.T) { } // Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags - // Test AddNotaryPeer/RemoveTrustedPeer and changing Notary flags. // Particularly for race conditions on changing the flag state. if peer := srv.Peers()[0]; peer.Info().Network.Trusted { t.Errorf("peer is trusted prematurely: %v", peer) } - if peer := srv.Peers()[0]; peer.Info().Network.Notary { - t.Errorf("peer is notary prematurely: %v", peer) - } done := make(chan bool) go func() { srv.AddTrustedPeer(node) @@ -192,14 +187,6 @@ func TestServerDial(t *testing.T) { t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer) } - srv.AddNotaryPeer(node) - if peer := srv.Peers()[0]; !peer.Info().Network.Notary { - t.Errorf("peer is not notary after AddNotaryPeer: %v", peer) - } - srv.RemoveNotaryPeer(node) - if peer := srv.Peers()[0]; peer.Info().Network.Notary { - t.Errorf("peer is notary after RemoveNotaryPeer: %v", peer) - } done <- true }() // Trigger potential race conditions @@ -216,70 +203,89 @@ func TestServerDial(t *testing.T) { } } -// TestNotaryPeer checks that the node is added to and remove from static when -// AddNotaryPeer and RemoveNotaryPeer is called. -func TestNotaryPeer(t *testing.T) { - var ( - returned = make(chan struct{}) - add, remove = make(chan *discover.Node), make(chan *discover.Node) - tg = taskgen{ - newFunc: func(running int, peers map[discover.NodeID]*Peer) []task { - return []task{} - }, - doneFunc: func(t task) {}, - addFunc: func(n *discover.Node) { - add <- n - }, - removeFunc: func(n *discover.Node) { - remove <- n - }, - } - ) - +func TestServerPeerConnFlag(t *testing.T) { srv := &Server{ - Config: Config{MaxPeers: 10}, - quit: make(chan struct{}), - ntab: fakeTable{}, - addnotary: make(chan *discover.Node), - removenotary: make(chan *discover.Node), - running: true, - log: log.New(), + Config: Config{ + PrivateKey: newkey(), + MaxPeers: 10, + NoDial: true, + }, } - srv.loopWG.Add(1) - go func() { - srv.run(tg) - close(returned) - }() + if err := srv.Start(); err != nil { + t.Fatalf("could not start: %v", err) + } + defer srv.Stop() - notaryID := randomID() - go srv.AddNotaryPeer(&discover.Node{ID: notaryID}) + // inject a peer + key := newkey() + id := enode.PubkeyToIDV4(&key.PublicKey) + node := newNode(id, nil) + fd, _ := net.Pipe() + c := &conn{ + node: node, + fd: fd, + transport: newTestTransport(&key.PublicKey, fd), + flags: inboundConn, + cont: make(chan error), + } + if err := srv.checkpoint(c, srv.addpeer); err != nil { + t.Fatalf("could not add conn: %v", err) + } - select { - case n := <-add: - if n.ID != notaryID { - t.Errorf("node ID mismatched: got %s, want %s", - n.ID.String(), notaryID.String()) - } - case <-time.After(1 * time.Second): - t.Error("add static is not called within one second") + srv.AddTrustedPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn)) } - go srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) - select { - case n := <-remove: - if n.ID != notaryID { - t.Errorf("node ID mismatched: got %s, want %s", - n.ID.String(), notaryID.String()) - } - case <-time.After(1 * time.Second): - t.Error("remove static is not called within one second") + srv.AddDirectPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn | directDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn | directDialedConn)) } - srv.Stop() - select { - case <-returned: - case <-time.After(500 * time.Millisecond): - t.Error("Server.run did not return within 500ms") + srv.AddGroup("g1", []*enode.Node{node}, 1) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn)) + } + + srv.AddGroup("g2", []*enode.Node{node}, 1) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn)) + } + + srv.RemoveTrustedPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | directDialedConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | directDialedConn | directDialedConn)) + } + + srv.RemoveDirectPeer(node) + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | directDialedConn)) + } + + srv.RemoveGroup("g1") + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != (inboundConn | groupDialedConn) { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, (inboundConn | directDialedConn)) + } + + srv.RemoveGroup("g2") + srv.Peers() // leverage this function to ensure trusted peer is added + if c.flags != inboundConn { + t.Errorf("flags mismatch: got %d, want %d", + c.flags, inboundConn) } } @@ -407,9 +413,6 @@ func TestServerManyTasks(t *testing.T) { type taskgen struct { newFunc func(running int, peers map[enode.ID]*Peer) []task doneFunc func(task) - - addFunc func(*discover.Node) - removeFunc func(*discover.Node) } func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task { @@ -418,11 +421,17 @@ func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) func (tg taskgen) taskDone(t task, now time.Time) { tg.doneFunc(t) } -func (tg taskgen) addStatic(n *enode.Node) { - tg.addFunc(n) +func (tg taskgen) addStatic(*enode.Node) { +} +func (tg taskgen) removeStatic(*enode.Node) { } -func (tg taskgen) removeStatic(n *enode.Node) { - tg.removeFunc(n) +func (tg taskgen) addDirect(*enode.Node) { +} +func (tg taskgen) removeDirect(*enode.Node) { +} +func (tg taskgen) addGroup(*dialGroup) { +} +func (tg taskgen) removeGroup(*dialGroup) { } type testTask struct { @@ -436,11 +445,10 @@ func (t *testTask) Do(srv *Server) { // This test checks that connections are disconnected // just after the encryption handshake when the server is -// at capacity. Trusted and Notary connections should still be accepted. +// at capacity. Trusted connections should still be accepted. func TestServerAtCap(t *testing.T) { trustedNode := newkey() trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey) - notaryID := randomID() srv := &Server{ Config: Config{ PrivateKey: newkey(), @@ -453,7 +461,6 @@ func TestServerAtCap(t *testing.T) { t.Fatalf("could not start: %v", err) } defer srv.Stop() - srv.AddNotaryPeer(&discover.Node{ID: notaryID}) newconn := func(id enode.ID) *conn { fd, _ := net.Pipe() @@ -484,15 +491,6 @@ func TestServerAtCap(t *testing.T) { t.Error("Server did not set trusted flag") } - // Try inserting a notary connection. - c = newconn(notaryID) - if err := srv.checkpoint(c, srv.posthandshake); err != nil { - t.Error("unexpected error for notary conn @posthandshake:", err) - } - if !c.is(notaryConn) { - t.Error("Server did not set notary flag") - } - // Remove from trusted set and try again srv.RemoveTrustedPeer(newNode(trustedID, nil)) c = newconn(trustedID) @@ -509,24 +507,6 @@ func TestServerAtCap(t *testing.T) { if !c.is(trustedConn) { t.Error("Server did not set trusted flag") } - - // Remove from notary set and try again - srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) - c = newconn(notaryID) - if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers { - t.Error("wrong error for insert:", err) - } - - // Add anotherID to notary set and try again - anotherNotaryID := randomID() - srv.AddNotaryPeer(&discover.Node{ID: anotherNotaryID}) - c = newconn(anotherNotaryID) - if err := srv.checkpoint(c, srv.posthandshake); err != nil { - t.Error("unexpected error for notary conn @posthandshake:", err) - } - if !c.is(notaryConn) { - t.Error("Server did not set notary flag") - } } func TestServerPeerLimits(t *testing.T) { |