aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/events.go7
-rw-r--r--dex/handler.go216
-rw-r--r--dex/handler_test.go445
-rw-r--r--dex/helper_test.go78
-rw-r--r--dex/network.go17
-rw-r--r--dex/nodetable.go79
-rw-r--r--dex/nodetable_test.go93
-rw-r--r--dex/notaryset.go203
-rw-r--r--dex/peer.go296
-rw-r--r--dex/peer_test.go628
-rw-r--r--dex/protocol.go42
-rw-r--r--dex/protocol_test.go82
-rw-r--r--dex/sync.go95
-rw-r--r--p2p/dial.go86
-rw-r--r--p2p/dial_test.go217
-rw-r--r--p2p/discover/table.go12
-rw-r--r--p2p/discover/table_test.go6
-rw-r--r--p2p/discover/table_util_test.go4
-rw-r--r--p2p/peer.go2
-rw-r--r--p2p/server.go271
-rw-r--r--p2p/server_test.go192
21 files changed, 2419 insertions, 652 deletions
diff --git a/core/events.go b/core/events.go
index 02a083002..e76aa4784 100644
--- a/core/events.go
+++ b/core/events.go
@@ -46,3 +46,10 @@ type ChainSideEvent struct {
}
type ChainHeadEvent struct{ Block *types.Block }
+
+type NewNotarySetEvent struct {
+ Round uint64
+ Pubkeys map[string]struct{} // pubkeys in hex format
+}
+
+type NewCRSEvent struct{ Round uint64 }
diff --git a/dex/handler.go b/dex/handler.go
index bc932fb28..5348b06f2 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -49,6 +49,8 @@ const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
+
+ metaChanSize = 10240
)
var (
@@ -59,11 +61,6 @@ var (
// not compatible (low protocol version restrictions and high requirements).
var errIncompatibleConfig = errors.New("incompatible configuration")
-type newNotarySetEvent struct {
- Round uint64
- Set map[string]struct{}
-}
-
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
@@ -75,32 +72,35 @@ type ProtocolManager struct {
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
+ nodeTable *nodeTable
+ gov governance
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
- downloader *downloader.Downloader
- fetcher *fetcher.Fetcher
- peers *peerSet
- notarySetManager *notarySetManager
+ downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
+ peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
+ metasCh chan newMetasEvent
+ metasSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync
+ metasyncCh chan *metasync
quitSync chan struct{}
noMorePeers chan struct{}
- // channels for notarySetLoop
- newNotarySetCh chan newNotarySetEvent
- newRoundCh chan uint64
- newNotaryNodeInfoCh chan *notaryNodeInfo
+ // channels for peerSetLoop
+ crsCh chan core.NewCRSEvent
+ crsSub event.Subscription
srvr p2pServer
@@ -111,24 +111,26 @@ type ProtocolManager struct {
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
-func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
+func NewProtocolManager(
+ config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
+ mux *event.TypeMux, txpool txPool, engine consensus.Engine,
+ blockchain *core.BlockChain, chaindb ethdb.Database,
+ gov governance) (*ProtocolManager, error) {
+ tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkID: networkID,
- eventMux: mux,
- txpool: txpool,
- blockchain: blockchain,
- chainconfig: config,
- peers: newPeerSet(),
- newPeerCh: make(chan *peer),
- noMorePeers: make(chan struct{}),
- txsyncCh: make(chan *txsync),
- quitSync: make(chan struct{}),
- newNotarySetCh: make(chan newNotarySetEvent),
- newRoundCh: make(chan uint64),
- newNotaryNodeInfoCh: make(chan *notaryNodeInfo),
- }
- manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh)
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ nodeTable: tab,
+ gov: gov,
+ blockchain: blockchain,
+ chainconfig: config,
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
+ }
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
@@ -216,22 +218,33 @@ func (pm *ProtocolManager) removePeer(id string) {
func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.maxPeers = maxPeers
pm.srvr = srvr
+ pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable)
+
+ // if our self in node set build the node info
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
+ // broadcast node metas
+ pm.metasCh = make(chan newMetasEvent, metaChanSize)
+ pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
+ go pm.metaBroadcastLoop()
+
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
- // run the notary set loop
- go pm.notarySetLoop()
+ // run the peer set loop
+ pm.crsCh = make(chan core.NewCRSEvent)
+ pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh)
+ go pm.peerSetLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
+ go pm.metasyncLoop()
}
func (pm *ProtocolManager) Stop() {
@@ -301,6 +314,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
+ pm.syncNodeMetas(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
@@ -699,18 +713,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.txpool.AddRemotes(txs)
- case msg.Code == NotaryNodeInfoMsg:
- var info notaryNodeInfo
- if err := msg.Decode(&info); err != nil {
+ case msg.Code == MetaMsg:
+ var metas []*NodeMeta
+ if err := msg.Decode(&metas); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
-
- // TODO(sonic): validate signature
- p.MarkNotaryNodeInfo(info.Hash())
- // Broadcast to peers
- pm.BroadcastNotaryNodeInfo(&info)
-
- pm.notarySetManager.TryAddInfo(&info)
+ for i, meta := range metas {
+ if meta == nil {
+ return errResp(ErrDecode, "node meta %d is nil", i)
+ }
+ p.MarkNodeMeta(meta.Hash())
+ }
+ pm.nodeTable.Add(metas)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -769,11 +783,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
}
-// BroadcastNotaryNodeInfo will propagate notary node info to its peers.
-func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) {
- peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash())
- for _, peer := range peers {
- peer.AsyncSendNotaryNodeInfo(info)
+// BroadcastMetas will propagate node metas to its peers.
+func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
+ var metaset = make(map[*peer][]*NodeMeta)
+
+ for _, meta := range metas {
+ peers := pm.peers.PeersWithoutNodeMeta(meta.Hash())
+ for _, peer := range peers {
+ metaset[peer] = append(metaset[peer], meta)
+ }
+ log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers))
+ }
+
+ for peer, metas := range metaset {
+ peer.AsyncSendNodeMetas(metas)
}
}
@@ -801,97 +824,32 @@ func (pm *ProtocolManager) txBroadcastLoop() {
}
}
-// a loop keep building and maintaining peers in notary set.
-func (pm *ProtocolManager) notarySetLoop() {
- // TODO:
- // * pm need to know current round, and whether we are in notary set.
- // * (done) able to handle node info in the near future round.
- // * check peer limit.
- // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?)
-
- // If we are in notary set and we are synced with network.
- // events:
- // * new notary set is determined, and we are in notary set.
- // (TBD: subscribe the event or polling govornance)
- // - advance round
- // - build new notary set
- // - remove old notary set, remove old notary set from static
-
- // * current round node info changed.
-
- self := pm.srvr.Self()
- var round uint64
-
+func (pm *ProtocolManager) metaBroadcastLoop() {
for {
select {
- case event := <-pm.newNotarySetCh:
- // new notary set is determined.
- if _, ok := event.Set[self.ID.String()]; ok {
- // initialize the new notary set and add it to pm.notarySetManager
- s := newNotarySet(event.Round, event.Set)
- pm.notarySetManager.Register(event.Round, s)
-
- // TODO: handle signature
- pm.BroadcastNotaryNodeInfo(&notaryNodeInfo{
- ID: self.ID,
- IP: self.IP,
- UDP: self.UDP,
- TCP: self.TCP,
- Round: event.Round,
- Timestamp: time.Now().Unix(),
- })
- }
-
- case r := <-pm.newRoundCh:
- // move to new round.
- round = r
+ case event := <-pm.metasCh:
+ pm.BroadcastMetas(event.Metas)
- // TODO: revisit this to make sure how many previous round's
- // notary set we need to keep.
-
- // rmove notary set before current round, they are useless
- for _, s := range pm.notarySetManager.Before(round) {
- pm.removeNotarySetFromStatic(s)
- }
-
- // prepare notary set connections for new round.
- if pm.isInNotarySet(round + 1) {
- // we assume the notary set for round + 1 is added to
- // pm.notarySetManager before this step.
- if n, ok := pm.notarySetManager.Round(round + 1); ok {
- pm.addNotarySetToStatic(n)
- }
- }
-
- case <-pm.newNotaryNodeInfoCh:
- // some node info in "current round" is updated.
- // try to connect them by the new info.
- if pm.isInNotarySet(round) {
- if n, ok := pm.notarySetManager.Round(round); ok {
- pm.addNotarySetToStatic(n)
- }
- }
-
- case <-pm.quitSync:
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.metasSub.Err():
return
}
}
}
-func (pm *ProtocolManager) isInNotarySet(r uint64) bool {
- return false
-}
-
-func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) {
- for _, node := range n.NodesToAdd() {
- pm.srvr.AddNotaryPeer(node)
- n.MarkAdded(node.ID.String())
- }
-}
-
-func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) {
- for _, node := range n.Nodes() {
- pm.srvr.RemoveNotaryPeer(node)
+// a loop keep building and maintaining peers in notary set.
+// TODO: finish this
+func (pm *ProtocolManager) peerSetLoop() {
+ for {
+ select {
+ case event := <-pm.crsCh:
+ pm.peers.BuildNotaryConn(event.Round)
+ pm.peers.BuildDKGConn(event.Round)
+ pm.peers.ForgetNotaryConn(event.Round - 1)
+ pm.peers.ForgetDKGConn(event.Round - 1)
+ case <-pm.quitSync:
+ return
+ }
}
}
diff --git a/dex/handler_test.go b/dex/handler_test.go
new file mode 100644
index 000000000..981362bb5
--- /dev/null
+++ b/dex/handler_test.go
@@ -0,0 +1,445 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package dex
+
+import (
+ "math"
+ "math/big"
+ "math/rand"
+ "testing"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core"
+ "github.com/dexon-foundation/dexon/core/state"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/crypto"
+ "github.com/dexon-foundation/dexon/eth/downloader"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/params"
+)
+
+// Tests that protocol versions and modes of operations are matched up properly.
+func TestProtocolCompatibility(t *testing.T) {
+ // Define the compatibility chart
+ tests := []struct {
+ version uint
+ mode downloader.SyncMode
+ compatible bool
+ }{
+ {61, downloader.FullSync, true}, {62, downloader.FullSync, true}, {63, downloader.FullSync, true},
+ {61, downloader.FastSync, true}, {62, downloader.FastSync, true}, {63, downloader.FastSync, true},
+ }
+ // Make sure anything we screw up is restored
+ backup := ProtocolVersions
+ defer func() { ProtocolVersions = backup }()
+
+ // Try all available compatibility configs and check for errors
+ for i, tt := range tests {
+ ProtocolVersions = []uint{tt.version}
+
+ pm, _, err := newTestProtocolManager(tt.mode, 0, nil, nil)
+ if pm != nil {
+ defer pm.Stop()
+ }
+ if (err == nil && !tt.compatible) || (err != nil && tt.compatible) {
+ t.Errorf("test %d: compatibility mismatch: have error %v, want compatibility %v", i, err, tt.compatible)
+ }
+ }
+}
+
+// Tests that block headers can be retrieved from a remote chain based on user queries.
+func TestGetBlockHeaders62(t *testing.T) { testGetBlockHeaders(t, 62) }
+func TestGetBlockHeaders63(t *testing.T) { testGetBlockHeaders(t, 63) }
+
+func testGetBlockHeaders(t *testing.T, protocol int) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxHashFetch+15, nil, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Create a "random" unknown hash for testing
+ var unknown common.Hash
+ for i := range unknown {
+ unknown[i] = byte(i)
+ }
+ // Create a batch of tests for various scenarios
+ limit := uint64(downloader.MaxHeaderFetch)
+ tests := []struct {
+ query *getBlockHeadersData // The query to execute for header retrieval
+ expect []common.Hash // The hashes of the block whose headers are expected
+ }{
+ // A single random block should be retrievable by hash and number too
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(limit / 2).Hash()}, Amount: 1},
+ []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()},
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 1},
+ []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()},
+ },
+ // Multiple headers should be retrievable in both directions
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 1).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 2).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 1).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 2).Hash(),
+ },
+ },
+ // Multiple headers with skip lists should be retrievable
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 4).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 8).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 4).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 8).Hash(),
+ },
+ },
+ // The chain endpoints should be retrievable
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 0}, Amount: 1},
+ []common.Hash{pm.blockchain.GetBlockByNumber(0).Hash()},
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64()}, Amount: 1},
+ []common.Hash{pm.blockchain.CurrentBlock().Hash()},
+ },
+ // Ensure protocol limits are honored
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true},
+ pm.blockchain.GetBlockHashesFromHash(pm.blockchain.CurrentBlock().Hash(), limit),
+ },
+ // Check that requesting more than available is handled gracefully
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 3, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(),
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64()).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 3, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(4).Hash(),
+ pm.blockchain.GetBlockByNumber(0).Hash(),
+ },
+ },
+ // Check that requesting more than available is handled gracefully, even if mid skip
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 2, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(),
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 1).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 2, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(4).Hash(),
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ },
+ },
+ // Check a corner case where requesting more can iterate past the endpoints
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 2}, Amount: 5, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(2).Hash(),
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ pm.blockchain.GetBlockByNumber(0).Hash(),
+ },
+ },
+ // Check a corner case where skipping overflow loops back into the chain start
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(3).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64 - 1},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(3).Hash(),
+ },
+ },
+ // Check a corner case where skipping overflow loops back to the same header
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(1).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ },
+ },
+ // Check that non existing headers aren't returned
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: unknown}, Amount: 1},
+ []common.Hash{},
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() + 1}, Amount: 1},
+ []common.Hash{},
+ },
+ }
+ // Run each of the tests and verify the results against the chain
+ for i, tt := range tests {
+ // Collect the headers to expect in the response
+ headers := []*types.Header{}
+ for _, hash := range tt.expect {
+ headers = append(headers, pm.blockchain.GetBlockByHash(hash).Header())
+ }
+ // Send the hash request and verify the response
+ p2p.Send(peer.app, 0x03, tt.query)
+ if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil {
+ t.Errorf("test %d: headers mismatch: %v", i, err)
+ }
+ // If the test used number origins, repeat with hashes as the too
+ if tt.query.Origin.Hash == (common.Hash{}) {
+ if origin := pm.blockchain.GetBlockByNumber(tt.query.Origin.Number); origin != nil {
+ tt.query.Origin.Hash, tt.query.Origin.Number = origin.Hash(), 0
+
+ p2p.Send(peer.app, 0x03, tt.query)
+ if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil {
+ t.Errorf("test %d: headers mismatch: %v", i, err)
+ }
+ }
+ }
+ }
+}
+
+// Tests that block contents can be retrieved from a remote chain based on their hashes.
+func TestGetBlockBodies62(t *testing.T) { testGetBlockBodies(t, 62) }
+func TestGetBlockBodies63(t *testing.T) { testGetBlockBodies(t, 63) }
+
+func testGetBlockBodies(t *testing.T, protocol int) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxBlockFetch+15, nil, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Create a batch of tests for various scenarios
+ limit := downloader.MaxBlockFetch
+ tests := []struct {
+ random int // Number of blocks to fetch randomly from the chain
+ explicit []common.Hash // Explicitly requested blocks
+ available []bool // Availability of explicitly requested blocks
+ expected int // Total number of existing blocks to expect
+ }{
+ {1, nil, nil, 1}, // A single random block should be retrievable
+ {10, nil, nil, 10}, // Multiple random blocks should be retrievable
+ {limit, nil, nil, limit}, // The maximum possible blocks should be retrievable
+ {limit + 1, nil, nil, limit}, // No more than the possible block count should be returned
+ {0, []common.Hash{pm.blockchain.Genesis().Hash()}, []bool{true}, 1}, // The genesis block should be retrievable
+ {0, []common.Hash{pm.blockchain.CurrentBlock().Hash()}, []bool{true}, 1}, // The chains head block should be retrievable
+ {0, []common.Hash{{}}, []bool{false}, 0}, // A non existent block should not be returned
+
+ // Existing and non-existing blocks interleaved should not cause problems
+ {0, []common.Hash{
+ {},
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ {},
+ pm.blockchain.GetBlockByNumber(10).Hash(),
+ {},
+ pm.blockchain.GetBlockByNumber(100).Hash(),
+ {},
+ }, []bool{false, true, false, true, false, true, false}, 3},
+ }
+ // Run each of the tests and verify the results against the chain
+ for i, tt := range tests {
+ // Collect the hashes to request, and the response to expect
+ hashes, seen := []common.Hash{}, make(map[int64]bool)
+ bodies := []*blockBody{}
+
+ for j := 0; j < tt.random; j++ {
+ for {
+ num := rand.Int63n(int64(pm.blockchain.CurrentBlock().NumberU64()))
+ if !seen[num] {
+ seen[num] = true
+
+ block := pm.blockchain.GetBlockByNumber(uint64(num))
+ hashes = append(hashes, block.Hash())
+ if len(bodies) < tt.expected {
+ bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
+ }
+ break
+ }
+ }
+ }
+ for j, hash := range tt.explicit {
+ hashes = append(hashes, hash)
+ if tt.available[j] && len(bodies) < tt.expected {
+ block := pm.blockchain.GetBlockByHash(hash)
+ bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
+ }
+ }
+ // Send the hash request and verify the response
+ p2p.Send(peer.app, 0x05, hashes)
+ if err := p2p.ExpectMsg(peer.app, 0x06, bodies); err != nil {
+ t.Errorf("test %d: bodies mismatch: %v", i, err)
+ }
+ }
+}
+
+// Tests that the node state database can be retrieved based on hashes.
+func TestGetNodeData63(t *testing.T) { testGetNodeData(t, 63) }
+
+func testGetNodeData(t *testing.T, protocol int) {
+ // Define three accounts to simulate transactions with
+ acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
+ acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
+ acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey)
+ acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey)
+
+ signer := types.HomesteadSigner{}
+ // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test)
+ generator := func(i int, block *core.BlockGen) {
+ switch i {
+ case 0:
+ // In block 1, the test bank sends account #1 some ether.
+ tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
+ block.AddTx(tx)
+ case 1:
+ // In block 2, the test bank sends some more ether to account #1.
+ // acc1Addr passes it on to account #2.
+ tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey)
+ tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key)
+ block.AddTx(tx1)
+ block.AddTx(tx2)
+ case 2:
+ // Block 3 is empty but was mined by account #2.
+ block.SetCoinbase(acc2Addr)
+ block.SetExtra([]byte("yeehaw"))
+ case 3:
+ // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data).
+ b2 := block.PrevBlock(1).Header()
+ b2.Extra = []byte("foo")
+ block.AddUncle(b2)
+ b3 := block.PrevBlock(2).Header()
+ b3.Extra = []byte("foo")
+ block.AddUncle(b3)
+ }
+ }
+ // Assemble the test environment
+ pm, db := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Fetch for now the entire chain db
+ hashes := []common.Hash{}
+ for _, key := range db.Keys() {
+ if len(key) == len(common.Hash{}) {
+ hashes = append(hashes, common.BytesToHash(key))
+ }
+ }
+ p2p.Send(peer.app, 0x0d, hashes)
+ msg, err := peer.app.ReadMsg()
+ if err != nil {
+ t.Fatalf("failed to read node data response: %v", err)
+ }
+ if msg.Code != 0x0e {
+ t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, 0x0c)
+ }
+ var data [][]byte
+ if err := msg.Decode(&data); err != nil {
+ t.Fatalf("failed to decode response node data: %v", err)
+ }
+ // Verify that all hashes correspond to the requested data, and reconstruct a state tree
+ for i, want := range hashes {
+ if hash := crypto.Keccak256Hash(data[i]); hash != want {
+ t.Errorf("data hash mismatch: have %x, want %x", hash, want)
+ }
+ }
+ statedb := ethdb.NewMemDatabase()
+ for i := 0; i < len(data); i++ {
+ statedb.Put(hashes[i].Bytes(), data[i])
+ }
+ accounts := []common.Address{testBank, acc1Addr, acc2Addr}
+ for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ {
+ trie, _ := state.New(pm.blockchain.GetBlockByNumber(i).Root(), state.NewDatabase(statedb))
+
+ for j, acc := range accounts {
+ state, _ := pm.blockchain.State()
+ bw := state.GetBalance(acc)
+ bh := trie.GetBalance(acc)
+
+ if (bw != nil && bh == nil) || (bw == nil && bh != nil) {
+ t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw)
+ }
+ if bw != nil && bh != nil && bw.Cmp(bw) != 0 {
+ t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw)
+ }
+ }
+ }
+}
+
+// Tests that the transaction receipts can be retrieved based on hashes.
+func TestGetReceipt63(t *testing.T) { testGetReceipt(t, 63) }
+
+func testGetReceipt(t *testing.T, protocol int) {
+ // Define three accounts to simulate transactions with
+ acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
+ acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
+ acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey)
+ acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey)
+
+ signer := types.HomesteadSigner{}
+ // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test)
+ generator := func(i int, block *core.BlockGen) {
+ switch i {
+ case 0:
+ // In block 1, the test bank sends account #1 some ether.
+ tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
+ block.AddTx(tx)
+ case 1:
+ // In block 2, the test bank sends some more ether to account #1.
+ // acc1Addr passes it on to account #2.
+ tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey)
+ tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key)
+ block.AddTx(tx1)
+ block.AddTx(tx2)
+ case 2:
+ // Block 3 is empty but was mined by account #2.
+ block.SetCoinbase(acc2Addr)
+ block.SetExtra([]byte("yeehaw"))
+ case 3:
+ // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data).
+ b2 := block.PrevBlock(1).Header()
+ b2.Extra = []byte("foo")
+ block.AddUncle(b2)
+ b3 := block.PrevBlock(2).Header()
+ b3.Extra = []byte("foo")
+ block.AddUncle(b3)
+ }
+ }
+ // Assemble the test environment
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Collect the hashes to request, and the response to expect
+ hashes, receipts := []common.Hash{}, []types.Receipts{}
+ for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ {
+ block := pm.blockchain.GetBlockByNumber(i)
+
+ hashes = append(hashes, block.Hash())
+ receipts = append(receipts, pm.blockchain.GetReceiptsByHash(block.Hash()))
+ }
+ // Send the hash request and verify the response
+ p2p.Send(peer.app, 0x0f, hashes)
+ if err := p2p.ExpectMsg(peer.app, 0x10, receipts); err != nil {
+ t.Errorf("receipts mismatch: %v", err)
+ }
+}
diff --git a/dex/helper_test.go b/dex/helper_test.go
index 7e3479958..dcda6f4d2 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -37,7 +37,7 @@ import (
"github.com/dexon-foundation/dexon/ethdb"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/p2p"
- "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/params"
)
@@ -48,24 +48,47 @@ var (
// testP2PServer is a fake, helper p2p server for testing purposes.
type testP2PServer struct {
- added chan *discover.Node
- removed chan *discover.Node
+ mu sync.Mutex
+ self *enode.Node
+ direct map[enode.NodeID]*enode.Node
+ group map[string][]*enode.Node
}
-func (s *testP2PServer) Self() *discover.Node {
- return &discover.Node{}
+func newTestP2PServer(self *enode.Node) *testP2PServer {
+ return &testP2PServer{
+ self: self,
+ direct: make(map[enode.NodeID]*enode.Node),
+ group: make(map[string][]*enode.Node),
+ }
}
-func (s *testP2PServer) AddNotaryPeer(node *discover.Node) {
- if s.added != nil {
- s.added <- node
- }
+func (s *testP2PServer) Self() *enode.Node {
+ return s.self
}
-func (s *testP2PServer) RemoveNotaryPeer(node *discover.Node) {
- if s.removed != nil {
- s.removed <- node
- }
+func (s *testP2PServer) AddDirectPeer(node *enode.Node) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.direct[node.ID] = node
+}
+
+func (s *testP2PServer) RemoveDirectPeer(node *enode.Node) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.direct, node.ID)
+}
+
+func (s *testP2PServer) AddGroup(
+ name string, nodes []*enode.Node, num uint64) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.group[name] = nodes
+}
+
+func (s *testP2PServer) RemoveGroup(name string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.group, name)
}
// newTestProtocolManager creates a new protocol manager for testing purposes,
@@ -88,11 +111,11 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
panic(err)
}
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, &testGovernance{})
if err != nil {
return nil, nil, err
}
- pm.Start(&testP2PServer{}, 1000)
+ pm.Start(newTestP2PServer(&enode.Node{}), 1000)
return pm, db, nil
}
@@ -157,6 +180,29 @@ func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *typ
return tx
}
+// testGovernance is a fake, helper governance for testing purposes
+type testGovernance struct {
+ getChainNumFunc func(uint64) uint32
+ getNotarySetFunc func(uint32, uint64) map[string]struct{}
+ getDKGSetFunc func(uint64) map[string]struct{}
+}
+
+func (g *testGovernance) GetChainNum(round uint64) uint32 {
+ return g.getChainNumFunc(round)
+}
+
+func (g *testGovernance) GetNotarySet(chainID uint32, round uint64) map[string]struct{} {
+ return g.getNotarySetFunc(chainID, round)
+}
+
+func (g *testGovernance) GetDKGSet(round uint64) map[string]struct{} {
+ return g.getDKGSetFunc(round)
+}
+
+func (g *testGovernance) SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription {
+ return nil
+}
+
// testPeer is a simulated peer to allow testing direct network calls.
type testPeer struct {
net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging
@@ -170,7 +216,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
app, net := p2p.MsgPipe()
// Generate a random id and create the peer
- var id discover.NodeID
+ var id enode.NodeID
rand.Read(id[:])
peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net)
diff --git a/dex/network.go b/dex/network.go
index c3a0ef792..b19b46147 100644
--- a/dex/network.go
+++ b/dex/network.go
@@ -23,14 +23,6 @@ func (n *DexconNetwork) BroadcastVote(vote *types.Vote) {
func (n *DexconNetwork) BroadcastBlock(block *types.Block) {
}
-// BroadcastRandomnessRequest broadcasts rand request to DKG set.
-func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) {
-}
-
-// BroadcastRandomnessResult broadcasts rand request to Notary set.
-func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) {
-}
-
// SendDKGPrivateShare sends PrivateShare to a DKG participant.
func (n *DexconNetwork) SendDKGPrivateShare(
pub crypto.PublicKey, prvShare *types.DKGPrivateShare) {
@@ -47,6 +39,15 @@ func (n *DexconNetwork) BroadcastDKGPartialSignature(
psig *types.DKGPartialSignature) {
}
+// BroadcastRandomnessRequest broadcasts rand request to DKG set.
+func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) {
+
+}
+
+// BroadcastRandomnessResult broadcasts rand request to Notary set.
+func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) {
+}
+
// ReceiveChan returns a channel to receive messages from DEXON network.
func (n *DexconNetwork) ReceiveChan() <-chan interface{} {
return n.receiveChan
diff --git a/dex/nodetable.go b/dex/nodetable.go
new file mode 100644
index 000000000..929b168a8
--- /dev/null
+++ b/dex/nodetable.go
@@ -0,0 +1,79 @@
+package dex
+
+import (
+ "net"
+ "sync"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/crypto/sha3"
+ "github.com/dexon-foundation/dexon/event"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/dexon-foundation/dexon/rlp"
+)
+
+type NodeMeta struct {
+ ID enode.ID
+ IP net.IP
+ UDP int
+ TCP int
+ Timestamp uint64
+ Sig []byte
+}
+
+func (n *NodeMeta) Hash() (h common.Hash) {
+ hw := sha3.NewKeccak256()
+ rlp.Encode(hw, n)
+ hw.Sum(h[:0])
+ return h
+}
+
+type newMetasEvent struct{ Metas []*NodeMeta }
+
+type nodeTable struct {
+ mu sync.RWMutex
+ entry map[enode.ID]*NodeMeta
+ feed event.Feed
+}
+
+func newNodeTable() *nodeTable {
+ return &nodeTable{
+ entry: make(map[enode.ID]*NodeMeta),
+ }
+}
+
+func (t *nodeTable) Get(id enode.ID) *NodeMeta {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ return t.entry[id]
+}
+
+func (t *nodeTable) Add(metas []*NodeMeta) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+
+ var newMetas []*NodeMeta
+ for _, meta := range metas {
+ // TODO: validate the meta
+ if e, ok := t.entry[meta.ID]; ok && e.Timestamp > meta.Timestamp {
+ continue
+ }
+ t.entry[meta.ID] = meta
+ newMetas = append(newMetas, meta)
+ }
+ t.feed.Send(newMetasEvent{newMetas})
+}
+
+func (t *nodeTable) Metas() []*NodeMeta {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ metas := make([]*NodeMeta, 0, len(t.entry))
+ for _, meta := range t.entry {
+ metas = append(metas, meta)
+ }
+ return metas
+}
+
+func (t *nodeTable) SubscribeNewMetasEvent(
+ ch chan<- newMetasEvent) event.Subscription {
+ return t.feed.Subscribe(ch)
+}
diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go
new file mode 100644
index 000000000..5b3f7de57
--- /dev/null
+++ b/dex/nodetable_test.go
@@ -0,0 +1,93 @@
+package dex
+
+import (
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+)
+
+func TestNodeTable(t *testing.T) {
+ table := newNodeTable()
+ ch := make(chan newMetasEvent)
+ table.SubscribeNewMetasEvent(ch)
+
+ metas1 := []*NodeMeta{
+ &NodeMeta{ID: randomID()},
+ &NodeMeta{ID: randomID()},
+ }
+
+ metas2 := []*NodeMeta{
+ &NodeMeta{ID: randomID()},
+ &NodeMeta{ID: randomID()},
+ }
+
+ go table.Add(metas1)
+
+ select {
+ case newMetas := <-ch:
+ m := map[common.Hash]struct{}{}
+ for _, meta := range newMetas.Metas {
+ m[meta.Hash()] = struct{}{}
+ }
+
+ if len(m) != len(metas1) {
+ t.Errorf("len mismatch: got %d, want: %d",
+ len(m), len(metas1))
+ }
+
+ for _, meta := range metas1 {
+ if _, ok := m[meta.Hash()]; !ok {
+ t.Errorf("expected meta (%s) not exists", meta.Hash())
+ }
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("did not receive new metas event within one second")
+ }
+
+ go table.Add(metas2)
+ select {
+ case newMetas := <-ch:
+ m := map[common.Hash]struct{}{}
+ for _, meta := range newMetas.Metas {
+ m[meta.Hash()] = struct{}{}
+ }
+
+ if len(m) != len(metas1) {
+ t.Errorf("len mismatch: got %d, want: %d",
+ len(m), len(metas2))
+ }
+
+ for _, meta := range metas2 {
+ if _, ok := m[meta.Hash()]; !ok {
+ t.Errorf("expected meta (%s) not exists", meta.Hash())
+ }
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("did not receive new metas event within one second")
+ }
+
+ var metas []*NodeMeta
+ metas = append(metas, metas1...)
+ metas = append(metas, metas2...)
+ allMetas := table.Metas()
+ if len(allMetas) != len(metas) {
+ t.Errorf("all metas num mismatch: got %d, want %d",
+ len(metas), len(allMetas))
+ }
+
+ for _, m := range metas {
+ if m.Hash() != table.Get(m.ID).Hash() {
+ t.Errorf("meta (%s) mismatch", m.ID.String())
+ }
+ }
+}
+
+func randomID() (id enode.ID) {
+ for i := range id {
+ id[i] = byte(rand.Intn(255))
+ }
+ return id
+}
diff --git a/dex/notaryset.go b/dex/notaryset.go
deleted file mode 100644
index 74d259314..000000000
--- a/dex/notaryset.go
+++ /dev/null
@@ -1,203 +0,0 @@
-package dex
-
-import (
- "fmt"
- "sync"
-
- "github.com/dexon-foundation/dexon/p2p/discover"
-)
-
-type nodeInfo struct {
- info *notaryNodeInfo
- added bool
-}
-
-func (n *nodeInfo) NewNode() *discover.Node {
- return discover.NewNode(n.info.ID, n.info.IP, n.info.UDP, n.info.TCP)
-}
-
-type notarySet struct {
- round uint64
- m map[string]*nodeInfo
- lock sync.RWMutex
-}
-
-func newNotarySet(round uint64, s map[string]struct{}) *notarySet {
- m := make(map[string]*nodeInfo)
- for nodeID := range s {
- m[nodeID] = &nodeInfo{}
- }
-
- return &notarySet{
- round: round,
- m: m,
- }
-}
-
-// Call this function when the notaryNodeInfoMsg is received.
-func (n *notarySet) AddInfo(info *notaryNodeInfo) error {
- n.lock.Lock()
- defer n.lock.Unlock()
-
- // check round
- if info.Round != n.round {
- return fmt.Errorf("invalid round")
- }
-
- nInfo, ok := n.m[info.ID.String()]
- if !ok {
- return fmt.Errorf("not in notary set")
- }
-
- // if the info exists check timstamp
- if nInfo.info != nil {
- if nInfo.info.Timestamp > info.Timestamp {
- return fmt.Errorf("old msg")
- }
- }
-
- nInfo.info = info
- return nil
-}
-
-// MarkAdded mark the notary node as added
-// to prevent duplcate addition in the future.
-func (n *notarySet) MarkAdded(nodeID string) {
- if info, ok := n.m[nodeID]; ok {
- info.added = true
- }
-}
-
-// Return all nodes
-func (n *notarySet) Nodes() []*discover.Node {
- n.lock.RLock()
- defer n.lock.RUnlock()
-
- list := make([]*discover.Node, 0, len(n.m))
- for _, info := range n.m {
- list = append(list, info.NewNode())
- }
- return list
-}
-
-// Return nodes that need to be added to p2p server as notary node.
-func (n *notarySet) NodesToAdd() []*discover.Node {
- n.lock.RLock()
- defer n.lock.RUnlock()
-
- var list []*discover.Node
- for _, info := range n.m {
- // craete a new discover.Node
- if !info.added {
- list = append(list, info.NewNode())
- }
- }
- return list
-}
-
-type notarySetManager struct {
- m map[uint64]*notarySet
- lock sync.RWMutex
- queued map[uint64]map[string]*notaryNodeInfo
- round uint64 // biggest round of managed notary sets
- newNotaryNodeInfoCh chan *notaryNodeInfo
-}
-
-func newNotarySetManager(
- newNotaryNodeInfoCh chan *notaryNodeInfo) *notarySetManager {
- return &notarySetManager{
- m: make(map[uint64]*notarySet),
- queued: make(map[uint64]map[string]*notaryNodeInfo),
- newNotaryNodeInfoCh: newNotaryNodeInfoCh,
- }
-}
-
-// Register injects a new notary set into the manager and
-// processes the queued info.
-func (n *notarySetManager) Register(r uint64, s *notarySet) {
- n.lock.Lock()
- defer n.lock.Unlock()
- if r > n.round {
- n.round = r
- }
- n.m[r] = s
- n.processQueuedInfo()
-}
-
-// Unregister removes the notary set of the given round.
-func (n *notarySetManager) Unregister(r uint64) {
- n.lock.Lock()
- defer n.lock.Unlock()
- delete(n.m, r)
-}
-
-// Round returns the notary set of the given round.
-func (n *notarySetManager) Round(r uint64) (*notarySet, bool) {
- n.lock.RLock()
- defer n.lock.RUnlock()
- s, ok := n.m[r]
- return s, ok
-}
-
-// Before returns all the notary sets that before the given round.
-func (n *notarySetManager) Before(r uint64) []*notarySet {
- n.lock.RLock()
- defer n.lock.RUnlock()
- var list []*notarySet
- for round, s := range n.m {
- if round < r {
- list = append(list, s)
- }
- }
- return list
-}
-
-// TryAddInfo associates the given info to the notary set if the notary set is
-// managed by the manager.
-// If the notary node info is belong to future notary set, queue the info.
-func (n *notarySetManager) TryAddInfo(info *notaryNodeInfo) {
- n.lock.Lock()
- defer n.lock.Unlock()
- n.tryAddInfo(info)
-}
-
-// This function is extract for calling without lock.
-// Make sure the caller already accquired the lock.
-func (n *notarySetManager) tryAddInfo(info *notaryNodeInfo) {
- if info.Round > n.round {
- if q, ok := n.queued[info.Round]; ok {
- q[info.Hash().String()] = info
- return
- }
- n.queued[info.Round] = map[string]*notaryNodeInfo{
- info.Hash().String(): info,
- }
- return
- }
-
- s, ok := n.Round(info.Round)
- if !ok {
- return
- }
- s.AddInfo(info)
-
- // TODO(sonic): handle timeout
- n.newNotaryNodeInfoCh <- info
-}
-
-func (n *notarySetManager) processQueuedInfo() {
- n.lock.Lock()
- defer n.lock.Unlock()
- if q, ok := n.queued[n.round]; ok {
- for _, info := range q {
- n.tryAddInfo(info)
- }
- }
-
- // Clear queue infos before current round.
- for round := range n.queued {
- if round <= n.round {
- delete(n.queued, round)
- }
- }
-}
diff --git a/dex/peer.go b/dex/peer.go
index f1a4335d1..8c218f1d9 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -27,6 +27,7 @@ import (
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -34,19 +35,20 @@ var (
errClosed = errors.New("peer set is closed")
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
- errInvalidRound = errors.New("invalid round")
)
const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- maxKnownInfos = 1024
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
// contain a single transaction, or thousands.
maxQueuedTxs = 128
+ maxQueuedMetas = 512
+
// maxQueuedProps is the maximum number of block propagations to queue up before
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
// that might cover uncles should be enough.
@@ -57,9 +59,9 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4
- maxQueuedInfos = 1024
-
handshakeTimeout = 5 * time.Second
+
+ groupNodeNum = 3
)
// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
@@ -76,13 +78,27 @@ type propEvent struct {
td *big.Int
}
+type setType uint32
+
+const (
+ dkgset = iota
+ notaryset
+)
+
+type peerLabel struct {
+ set setType
+ chainID uint32
+ round uint64
+}
+
type peer struct {
id string
*p2p.Peer
rw p2p.MsgReadWriter
- version int // Protocol version negotiated
+ version int // Protocol version negotiated
+ labels mapset.Set
forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
head common.Hash
@@ -90,12 +106,12 @@ type peer struct {
lock sync.RWMutex
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownMetas mapset.Set // Set of node metas known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownInfos mapset.Set // Set of infos known to be known by this peer
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -104,20 +120,21 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
Peer: p,
rw: rw,
version: version,
- id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
+ labels: mapset.NewSet(),
+ id: p.ID().String(),
knownTxs: mapset.NewSet(),
+ knownMetas: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
- knownInfos: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
- queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos),
term: make(chan struct{}),
}
}
// broadcast is a write loop that multiplexes block propagations, announcements,
-// transaction and notary node infos broadcasts into the remote peer.
+// transaction and notary node metas broadcasts into the remote peer.
// The goal is to have an async writer that does not lock up node internals.
func (p *peer) broadcast() {
for {
@@ -128,6 +145,12 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Broadcast transactions", "count", len(txs))
+ case metas := <-p.queuedMetas:
+ if err := p.SendNodeMetas(metas); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast node metas", "count", len(metas))
+
case prop := <-p.queuedProps:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
@@ -140,12 +163,6 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
- case info := <-p.queuedInfos:
- if err := p.SendNotaryNodeInfo(info); err != nil {
- return
- }
- p.Log().Trace("Broadcast notary node info")
-
case <-p.term:
return
}
@@ -157,6 +174,14 @@ func (p *peer) close() {
close(p.term)
}
+func (p *peer) addLabel(label peerLabel) {
+ p.labels.Add(label)
+}
+
+func (p *peer) removeLabel(label peerLabel) {
+ p.labels.Remove(label)
+}
+
// Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *PeerInfo {
hash, td := p.Head()
@@ -207,11 +232,11 @@ func (p *peer) MarkTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}
-func (p *peer) MarkNotaryNodeInfo(hash common.Hash) {
- for p.knownInfos.Cardinality() >= maxKnownInfos {
- p.knownInfos.Pop()
+func (p *peer) MarkNodeMeta(hash common.Hash) {
+ for p.knownMetas.Cardinality() >= maxKnownMetas {
+ p.knownMetas.Pop()
}
- p.knownInfos.Add(hash)
+ p.knownMetas.Add(hash)
}
// SendTransactions sends transactions to the peer and includes the hashes
@@ -236,21 +261,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
}
}
-// SendNotaryNodeInfo sends the info to the peer and includes the hashes
-// in its info hash set for future reference.
-func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error {
- return p2p.Send(p.rw, NotaryNodeInfoMsg, info)
+// SendNodeMetas sends the metas to the peer and includes the hashes
+// in its metas hash set for future reference.
+func (p *peer) SendNodeMetas(metas []*NodeMeta) error {
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
+ return p2p.Send(p.rw, MetaMsg, metas)
}
-// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a
+// AsyncSendNodeMeta queues list of notary node meta propagation to a
// remote peer. If the peer's broadcast queue is full, the event is silently
// dropped.
-func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) {
+func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) {
select {
- case p.queuedInfos <- info:
- p.knownInfos.Add(info.Hash())
+ case p.queuedMetas <- metas:
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
default:
- p.Log().Debug("Dropping notary node info propagation")
+ p.Log().Debug("Dropping node meta propagation", "count", len(metas))
}
}
@@ -431,7 +461,7 @@ func (p *peer) readStatus(network uint64, status *statusData, genesis common.Has
// String implements fmt.Stringer.
func (p *peer) String() string {
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("eth/%2d", p.version),
+ fmt.Sprintf("dex/%2d", p.version),
)
}
@@ -441,18 +471,25 @@ type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
closed bool
+ tab *nodeTable
- // TODO(sonic): revist this map after dexon core SDK is finalized.
- // use types.ValidatorID? or implement Stringer for types.ValidatorID
- notaryPeers map[uint64]map[string]*peer
- round uint64
+ srvr p2pServer
+ gov governance
+ peerLabels map[string]map[peerLabel]struct{}
+ notaryHistory map[uint64]struct{}
+ dkgHistory map[uint64]struct{}
}
// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
+func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet {
return &peerSet{
- peers: make(map[string]*peer),
- notaryPeers: make(map[uint64]map[string]*peer),
+ peers: make(map[string]*peer),
+ gov: gov,
+ srvr: srvr,
+ tab: tab,
+ peerLabels: make(map[string]map[peerLabel]struct{}),
+ notaryHistory: make(map[uint64]struct{}),
+ dkgHistory: make(map[uint64]struct{}),
}
}
@@ -537,14 +574,14 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
return list
}
-// PeersWithoutNotaryNodeInfo retrieves a list of peers that do not have a
-// given info in their set of known hashes.
-func (ps *peerSet) PeersWithoutNotaryNodeInfo(hash common.Hash) []*peer {
+// PeersWithoutNodeMeta retrieves a list of peers that do not have a
+// given meta in their set of known hashes.
+func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if p.knownInfos.Contains(hash) {
+ if !p.knownMetas.Contains(hash) {
list = append(list, p)
}
}
@@ -580,44 +617,171 @@ func (ps *peerSet) Close() {
ps.closed = true
}
-// AddNotaryPeer adds a peer into notary peer of the given round.
-// Caller of this function need to make sure that the peer is acutally in
-// notary set.
-func (ps *peerSet) AddNotaryPeer(round uint64, p *peer) error {
+func (ps *peerSet) BuildNotaryConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- // TODO(sonic): revisit this round check after dexon core SDK is finalized.
- if round < ps.round || round > ps.round+2 {
- return errInvalidRound
+ if _, ok := ps.notaryHistory[round]; ok {
+ return
}
- if _, ok := ps.peers[p.id]; !ok {
- return errNotRegistered
+ ps.notaryHistory[round] = struct{}{}
+
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+
+ // not in notary set, add group
+ if _, ok := s[selfID]; !ok {
+ var nodes []*discover.Node
+ for id := range s {
+ nodes = append(nodes, ps.newNode(id))
+ }
+ ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum)
+ continue
+ }
+
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, label)
+ }
}
+}
- ps.notaryPeers[round][p.id] = p
- return nil
+func (ps *peerSet) ForgetNotaryConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.notaryHistory {
+ if r <= round {
+ ps.forgetNotaryConn(r)
+ delete(ps.notaryHistory, r)
+ }
+ }
}
-// NotaryPeers return peers in notary set of the given round.
-func (ps *peerSet) NotaryPeers(round uint64) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (ps *peerSet) forgetNotaryConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+ if _, ok := s[selfID]; !ok {
+ ps.srvr.RemoveGroup(notarySetName(chainID, round))
+ continue
+ }
- list := make([]*peer, 0, len(ps.notaryPeers[round]))
- for _, p := range ps.notaryPeers[round] {
- if _, ok := ps.peers[p.id]; ok {
- list = append(list, p)
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.removeDirectPeer(id, label)
}
}
- return list
}
-// NextRound moves notary peer set to next round.
-func (ps *peerSet) NextRound() {
+func notarySetName(chainID uint32, round uint64) string {
+ return fmt.Sprintf("%d-%d-notaryset", chainID, round)
+}
+
+func (ps *peerSet) BuildDKGConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- delete(ps.notaryPeers, ps.round)
- ps.round = ps.round + 1
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+ ps.dkgHistory[round] = struct{}{}
+
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, peerLabel{
+ set: dkgset,
+ round: round,
+ })
+ }
+}
+
+func (ps *peerSet) ForgetDKGConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.dkgHistory {
+ if r <= round {
+ ps.forgetDKGConn(r)
+ delete(ps.dkgHistory, r)
+ }
+ }
+}
+
+func (ps *peerSet) forgetDKGConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+
+ delete(s, selfID)
+ label := peerLabel{
+ set: dkgset,
+ round: round,
+ }
+ for id := range s {
+ ps.removeDirectPeer(id, label)
+ }
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
+ // if the peer exists add the label
+ if p, ok := ps.peers[id]; ok {
+ p.addLabel(label)
+ }
+
+ if _, ok := ps.peerLabels[id]; !ok {
+ ps.peerLabels[id] = make(map[peerLabel]struct{})
+ }
+
+ ps.peerLabels[id][label] = struct{}{}
+ ps.srvr.AddDirectPeer(ps.newNode(id))
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) removeDirectPeer(id string, label peerLabel) {
+ if p, ok := ps.peers[id]; ok {
+ p.removeLabel(label)
+ }
+
+ delete(ps.peerLabels[id], label)
+
+ if len(ps.peerLabels[id]) == 0 {
+ ps.srvr.RemoveDirectPeer(ps.newNode(id))
+ delete(ps.peerLabels, id)
+ }
+}
+
+func (ps *peerSet) newNode(id string) *enode.Node {
+ nodeID := enode.HexID(id)
+ meta := ps.tab.Get(enode.HexID(id))
+
+ var r enr.Record
+ r.Set(enr.ID(nodeID.String()))
+ r.Set(enr.IP(meta.IP))
+ r.Set(enr.TCP(meta.TCP))
+ r.Set(enr.UDP(meta.UDP))
+
+ n, err := enode.New(enode.ValidSchemes, &r)
+ if err != nil {
+ panic(err)
+ }
+ return n
}
diff --git a/dex/peer_test.go b/dex/peer_test.go
new file mode 100644
index 000000000..bac6ed5ec
--- /dev/null
+++ b/dex/peer_test.go
@@ -0,0 +1,628 @@
+package dex
+
+import (
+ "fmt"
+ "math/big"
+ "testing"
+
+ mapset "github.com/deckarep/golang-set"
+ "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+)
+
+func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) {
+ self := discover.Node{ID: nodeID(0)}
+ server := newTestP2PServer(&self)
+ table := newNodeTable()
+
+ gov := &testGovernance{
+ getChainNumFunc: func(uint64) uint32 {
+ return 3
+ },
+ }
+
+ round10 := [][]enode.ID{
+ []enode.ID{nodeID(0), nodeID(1), nodeID(2)},
+ []enode.ID{nodeID(1), nodeID(3)},
+ []enode.ID{nodeID(2), nodeID(4)},
+ }
+ round11 := [][]enode.ID{
+ []enode.ID{nodeID(0), nodeID(1), nodeID(5)},
+ []enode.ID{nodeID(5), nodeID(6)},
+ []enode.ID{nodeID(0), nodeID(2), nodeID(4)},
+ }
+ round12 := [][]enode.ID{
+ []enode.ID{nodeID(0), nodeID(3), nodeID(5)},
+ []enode.ID{nodeID(0), nodeID(7), nodeID(8)},
+ []enode.ID{nodeID(0), nodeID(2), nodeID(6)},
+ }
+
+ gov.getNotarySetFunc = func(cid uint32, round uint64) map[string]struct{} {
+ m := map[uint64][][]enode.ID{
+ 10: round10,
+ 11: round11,
+ 12: round12,
+ }
+ return newTestNodeSet(m[round][cid])
+ }
+
+ ps := newPeerSet(gov, server, table)
+ peer1 := newDummyPeer(nodeID(1))
+ peer2 := newDummyPeer(nodeID(2))
+ var err error
+ err = ps.Register(peer1)
+ if err != nil {
+ t.Error(err)
+ }
+ err = ps.Register(peer2)
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 10
+ ps.BuildNotaryConn(10)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{
+ notarySetName(1, 10),
+ notarySetName(2, 10),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 11
+ ps.BuildNotaryConn(11)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ },
+ nodeID(4).String(): []peerLabel{
+ peerLabel{notaryset, 2, 11},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{notaryset, 0, 11},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10, 11}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2), nodeID(4), nodeID(5),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{
+ notarySetName(1, 10),
+ notarySetName(2, 10),
+ notarySetName(1, 11),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 12
+ ps.BuildNotaryConn(12)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ peerLabel{notaryset, 2, 12},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(3).String(): []peerLabel{
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(4).String(): []peerLabel{
+ peerLabel{notaryset, 2, 11},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{notaryset, 0, 11},
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(6).String(): []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(7).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ nodeID(8).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10, 11, 12}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2), nodeID(3), nodeID(4),
+ nodeID(5), nodeID(6), nodeID(7), nodeID(8),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{
+ notarySetName(1, 10),
+ notarySetName(2, 10),
+ notarySetName(1, 11),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 11
+ ps.ForgetNotaryConn(11)
+
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(3).String(): []peerLabel{
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(6).String(): []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(7).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ nodeID(8).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{12}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(2), nodeID(3),
+ nodeID(5), nodeID(6), nodeID(7), nodeID(8),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{})
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 12
+ ps.ForgetNotaryConn(12)
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{})
+ if err != nil {
+ t.Error(err)
+ }
+
+}
+
+func TestPeerSetBuildDKGConn(t *testing.T) {
+ self := discover.Node{ID: nodeID(0)}
+ server := newTestP2PServer(&self)
+ table := newNodeTable()
+
+ gov := &testGovernance{}
+
+ gov.getDKGSetFunc = func(round uint64) map[string]struct{} {
+ m := map[uint64][]enode.ID{
+ 10: []enode.ID{nodeID(0), nodeID(1), nodeID(2)},
+ 11: []enode.ID{nodeID(1), nodeID(2), nodeID(5)},
+ 12: []enode.ID{nodeID(0), nodeID(3), nodeID(5)},
+ }
+ return newTestNodeSet(m[round])
+ }
+
+ ps := newPeerSet(gov, server, table)
+ peer1 := newDummyPeer(nodeID(1))
+ peer2 := newDummyPeer(nodeID(2))
+ var err error
+ err = ps.Register(peer1)
+ if err != nil {
+ t.Error(err)
+ }
+ err = ps.Register(peer2)
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 10
+ ps.BuildDKGConn(10)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 11
+ ps.BuildDKGConn(11)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 12
+ ps.BuildDKGConn(12)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(3).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10, 12}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2), nodeID(3), nodeID(5),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 11
+ ps.ForgetDKGConn(11)
+
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(3).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{12}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(3), nodeID(5),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 12
+ ps.ForgetDKGConn(12)
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{})
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func checkLabels(p *peer, want []peerLabel) error {
+ if p.labels.Cardinality() != len(want) {
+ return fmt.Errorf("num of labels mismatch: got %d, want %d",
+ p.labels.Cardinality(), len(want))
+ }
+
+ for _, label := range want {
+ if !p.labels.Contains(label) {
+ return fmt.Errorf("label %+v not exist", label)
+ }
+ }
+ return nil
+}
+
+func checkPeerLabels(ps *peerSet, want map[string][]peerLabel) error {
+ if len(ps.peerLabels) != len(want) {
+ return fmt.Errorf("peer num mismatch: got %d, want %d",
+ len(ps.peerLabels), len(want))
+ }
+
+ for peerID, gotLabels := range ps.peerLabels {
+ wantLabels, ok := want[peerID]
+ if !ok {
+ return fmt.Errorf("peer id %s not exists", peerID)
+ }
+
+ if len(gotLabels) != len(wantLabels) {
+ return fmt.Errorf(
+ "num of labels of peer id %s mismatch: got %d, want %d",
+ peerID, len(gotLabels), len(wantLabels))
+ }
+
+ for _, label := range wantLabels {
+ if _, ok := gotLabels[label]; !ok {
+ fmt.Errorf("label: %+v not exists", label)
+ }
+ }
+ }
+ return nil
+}
+
+func checkPeerSetHistory(ps *peerSet, want []uint64, set setType) error {
+ var history map[uint64]struct{}
+ switch set {
+ case notaryset:
+ history = ps.notaryHistory
+ case dkgset:
+ history = ps.dkgHistory
+ default:
+ return fmt.Errorf("invalid set: %d", set)
+ }
+
+ if len(history) != len(want) {
+ return fmt.Errorf("num of history mismatch: got %d, want %d",
+ len(history), len(want))
+ }
+
+ for _, r := range want {
+ if _, ok := history[r]; !ok {
+ return fmt.Errorf("round %d not exists", r)
+ }
+ }
+ return nil
+}
+
+func checkDirectPeer(srvr *testP2PServer, want []enode.ID) error {
+ if len(srvr.direct) != len(want) {
+ return fmt.Errorf("num of direct peer mismatch: got %d, want %d",
+ len(srvr.direct), len(want))
+ }
+
+ for _, id := range want {
+ if _, ok := srvr.direct[id]; !ok {
+ return fmt.Errorf("direct peer %s not exists", id.String())
+ }
+ }
+ return nil
+}
+func checkGroup(srvr *testP2PServer, want []string) error {
+ if len(srvr.group) != len(want) {
+ return fmt.Errorf("num of group mismatch: got %d, want %d",
+ len(srvr.group), len(want))
+ }
+
+ for _, name := range want {
+ if _, ok := srvr.group[name]; !ok {
+ return fmt.Errorf("group %s not exists", name)
+ }
+ }
+ return nil
+}
+
+func nodeID(n int64) enode.ID {
+ b := big.NewInt(n).Bytes()
+ var id enode.ID
+ copy(id[len(id)-len(b):], b)
+ return id
+}
+
+func newTestNodeSet(nodes []enode.ID) map[string]struct{} {
+ m := make(map[string]struct{})
+ for _, node := range nodes {
+ m[node.String()] = struct{}{}
+ }
+ return m
+}
+
+func newDummyPeer(id enode.ID) *peer {
+ return &peer{
+ labels: mapset.NewSet(),
+ id: id.String(),
+ }
+}
diff --git a/dex/protocol.go b/dex/protocol.go
index 0111edf18..7b01217ff 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -20,14 +20,11 @@ import (
"fmt"
"io"
"math/big"
- "net"
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/core/types"
- "github.com/dexon-foundation/dexon/crypto/sha3"
"github.com/dexon-foundation/dexon/event"
- "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -67,7 +64,7 @@ const (
ReceiptsMsg = 0x10
// Protocol messages belonging to dex/64
- NotaryNodeInfoMsg = 0x11
+ MetaMsg = 0x11
)
type errCode int
@@ -114,12 +111,26 @@ type txPool interface {
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
}
+type governance interface {
+ GetChainNum(uint64) uint32
+
+ GetNotarySet(uint32, uint64) map[string]struct{}
+
+ GetDKGSet(uint64) map[string]struct{}
+
+ SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription
+}
+
type p2pServer interface {
Self() *enode.Node
- AddNotaryPeer(*discover.Node)
+ AddDirectPeer(*enode.Node)
+
+ RemoveDirectPeer(*enode.Node)
- RemoveNotaryPeer(*discover.Node)
+ AddGroup(string, []*enode.Node, uint64)
+
+ RemoveGroup(string)
}
// statusData is the network packet for the status message.
@@ -195,22 +206,3 @@ type blockBody struct {
// blockBodiesData is the network packet for block content distribution.
type blockBodiesData []*blockBody
-
-// TODO(sonic): revisit this msg when dexon core SDK is finalized.
-// notartyNodeInfo is the network packet for notary node ip info.
-type notaryNodeInfo struct {
- ID discover.NodeID
- IP net.IP
- UDP uint16
- TCP uint16
- Round uint64
- Sig []byte
- Timestamp int64
-}
-
-func (n *notaryNodeInfo) Hash() (h common.Hash) {
- hw := sha3.NewKeccak256()
- rlp.Encode(hw, n)
- hw.Sum(h[:0])
- return h
-}
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index c1b6efcfc..8c7638b2b 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -221,3 +221,85 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) {
}
}
}
+
+func TestRecvNodeMetas(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ meta := NodeMeta{
+ ID: nodeID(1),
+ }
+
+ ch := make(chan newMetasEvent)
+ pm.nodeTable.SubscribeNewMetasEvent(ch)
+
+ if err := p2p.Send(p.app, MetaMsg, []interface{}{meta}); err != nil {
+ t.Fatalf("send error: %v", err)
+ }
+
+ select {
+ case event := <-ch:
+ metas := event.Metas
+ if len(metas) != 1 {
+ t.Errorf("wrong number of new metas: got %d, want 1", len(metas))
+ } else if metas[0].Hash() != meta.Hash() {
+ t.Errorf("added wrong meta hash: got %v, want %v", metas[0].Hash(), meta.Hash())
+ }
+ case <-time.After(3 * time.Second):
+ t.Errorf("no newMetasEvent received within 3 seconds")
+ }
+}
+
+func TestSendNodeMetas(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ defer pm.Stop()
+
+ allmetas := make([]*NodeMeta, 100)
+ for nonce := range allmetas {
+ allmetas[nonce] = &NodeMeta{ID: nodeID(int64(nonce))}
+ }
+
+ // Connect several peers. They should all receive the pending transactions.
+ var wg sync.WaitGroup
+ checkmetas := func(p *testPeer) {
+ defer wg.Done()
+ defer p.close()
+ seen := make(map[common.Hash]bool)
+ for _, meta := range allmetas {
+ seen[meta.Hash()] = false
+ }
+ for n := 0; n < len(allmetas) && !t.Failed(); {
+ var metas []*NodeMeta
+ msg, err := p.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != MetaMsg {
+ t.Errorf("%v: got code %d, want MetaMsg", p.Peer, msg.Code)
+ }
+ if err := msg.Decode(&metas); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+ for _, meta := range metas {
+ hash := meta.Hash()
+ seenmeta, want := seen[hash]
+ if seenmeta {
+ t.Errorf("%v: got meta more than once: %x", p.Peer, hash)
+ }
+ if !want {
+ t.Errorf("%v: got unexpected meta: %x", p.Peer, hash)
+ }
+ seen[hash] = true
+ n++
+ }
+ }
+ }
+ for i := 0; i < 3; i++ {
+ p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true)
+ wg.Add(1)
+ go checkmetas(p)
+ }
+ pm.nodeTable.Add(allmetas)
+ wg.Wait()
+}
diff --git a/dex/sync.go b/dex/sync.go
index d7fe748bc..5af6076bc 100644
--- a/dex/sync.go
+++ b/dex/sync.go
@@ -25,7 +25,7 @@ import (
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/eth/downloader"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
)
const (
@@ -35,6 +35,9 @@ const (
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
+
+ // This is the target number for the packs of metas sent by metasyncLoop.
+ metasyncPackNum = 1024
)
type txsync struct {
@@ -64,7 +67,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) {
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
var (
- pending = make(map[discover.NodeID]*txsync)
+ pending = make(map[enode.ID]*txsync)
sending = false // whether a send is active
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
@@ -129,6 +132,94 @@ func (pm *ProtocolManager) txsyncLoop() {
}
}
+type metasync struct {
+ p *peer
+ metas []*NodeMeta
+}
+
+// syncNodeMetas starts sending all node metas to the given peer.
+func (pm *ProtocolManager) syncNodeMetas(p *peer) {
+ metas := pm.nodeTable.Metas()
+ if len(metas) == 0 {
+ return
+ }
+ select {
+ case pm.metasyncCh <- &metasync{p, metas}:
+ case <-pm.quitSync:
+ }
+}
+
+// metasyncLoop takes care of the initial node meta sync for each new
+// connection. When a new peer appears, we relay all currently node metas.
+// In order to minimise egress bandwidth usage, we send
+// the metas in small packs to one peer at a time.
+func (pm *ProtocolManager) metasyncLoop() {
+ var (
+ pending = make(map[enode.ID]*metasync)
+ sending = false // whether a send is active
+ pack = new(metasync) // the pack that is being sent
+ done = make(chan error, 1) // result of the send
+ )
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(s *metasync) {
+ // Fill pack with node metas up to the target num.
+ var num int
+ pack.p = s.p
+ pack.metas = pack.metas[:0]
+ for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ {
+ pack.metas = append(pack.metas, s.metas[i])
+ num += 1
+ }
+ // Remove the metas that will be sent.
+ s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])]
+ if len(s.metas) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num)
+ sending = true
+ go func() { done <- pack.p.SendNodeMetas(pack.metas) }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *metasync {
+ if len(pending) == 0 {
+ return nil
+ }
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case s := <-pm.metasyncCh:
+ pending[s.p.ID()] = s
+ if !sending {
+ send(s)
+ }
+ case err := <-done:
+ sending = false
+ // Stop tracking peers that cause send failures.
+ if err != nil {
+ pack.p.Log().Debug("NodeMeta send failed", "err", err)
+ delete(pending, pack.p.ID())
+ }
+ // Schedule the next send.
+ if s := pick(); s != nil {
+ send(s)
+ }
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and blocks as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {
diff --git a/p2p/dial.go b/p2p/dial.go
index 9b24ed96a..909bed863 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -64,6 +64,12 @@ func (t TCPDialer) Dial(dest *enode.Node) (net.Conn, error) {
return t.Dialer.Dial("tcp", addr.String())
}
+type dialGroup struct {
+ name string
+ nodes map[enode.ID]*enode.Node
+ num uint64
+}
+
// dialstate schedules dials and discovery lookups.
// it get's a chance to compute new tasks on every iteration
// of the main loop in Server.run.
@@ -78,6 +84,8 @@ type dialstate struct {
lookupBuf []*enode.Node // current discovery lookup results
randomNodes []*enode.Node // filled from Table
static map[enode.ID]*dialTask
+ direct map[enode.ID]*dialTask
+ group map[string]*dialGroup
hist *dialHistory
start time.Time // time when the dialer was first used
@@ -85,6 +93,7 @@ type dialstate struct {
}
type discoverTable interface {
+ Self() *enode.Node
Close()
Resolve(*enode.Node) *enode.Node
LookupRandom() []*enode.Node
@@ -133,6 +142,8 @@ func newDialState(self enode.ID, static []*enode.Node, bootnodes []*enode.Node,
self: self,
netrestrict: netrestrict,
static: make(map[enode.ID]*dialTask),
+ direct: make(map[enode.ID]*dialTask),
+ group: make(map[string]*dialGroup),
dialing: make(map[enode.ID]connFlag),
bootnodes: make([]*enode.Node, len(bootnodes)),
randomNodes: make([]*enode.Node, maxdyn/2),
@@ -159,6 +170,23 @@ func (s *dialstate) removeStatic(n *enode.Node) {
s.hist.remove(n.ID())
}
+func (s *dialstate) addDirect(n *enode.Node) {
+ s.direct[n.ID()] = &dialTask{flags: directDialedConn, dest: n}
+}
+
+func (s *dialstate) removeDirect(n *enode.Node) {
+ delete(s.direct, n.ID())
+ s.hist.remove(n.ID())
+}
+
+func (s *dialstate) addGroup(g *dialGroup) {
+ s.group[g.name] = g
+}
+
+func (s *dialstate) removeGroup(g *dialGroup) {
+ delete(s.group, g.name)
+}
+
func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Time) []task {
if s.start.IsZero() {
s.start = now
@@ -196,13 +224,69 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti
err := s.checkDial(t.dest, peers)
switch err {
case errNotWhitelisted, errSelf:
- log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err)
+ log.Warn("Removing static dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err)
delete(s.static, t.dest.ID())
case nil:
s.dialing[id] = t.flags
newtasks = append(newtasks, t)
}
}
+
+ for id, t := range s.direct {
+ err := s.checkDial(t.dest, peers)
+ switch err {
+ case errNotWhitelisted, errSelf:
+ log.Warn("Removing direct dial candidate", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()}, "err", err)
+ delete(s.direct, t.dest.ID())
+ case nil:
+ s.dialing[id] = t.flags
+ newtasks = append(newtasks, t)
+ }
+ }
+
+ // compute connected
+ connected := map[string]map[enode.ID]struct{}{}
+ for _, g := range s.group {
+ connected[g.name] = map[enode.ID]struct{}{}
+ }
+
+ for id := range peers {
+ for _, g := range s.group {
+ if _, ok := g.nodes[id]; ok {
+ connected[g.name][id] = struct{}{}
+ }
+ }
+ }
+
+ for id := range s.dialing {
+ for _, g := range s.group {
+ if _, ok := g.nodes[id]; ok {
+ connected[g.name][id] = struct{}{}
+ }
+ }
+ }
+
+ groupNodes := map[enode.ID]*enode.Node{}
+ for _, g := range s.group {
+ for _, n := range g.nodes {
+ if uint64(len(connected[g.name])) >= g.num {
+ break
+ }
+ err := s.checkDial(n, peers)
+ switch err {
+ case errNotWhitelisted, errSelf:
+ log.Warn("Removing group dial candidate", "id", n.ID(), "addr", &net.TCPAddr{IP: n.IP(), Port: n.TCP()}, "err", err)
+ delete(g.nodes, n.ID())
+ case nil:
+ groupNodes[n.ID()] = n
+ connected[g.name][n.ID()] = struct{}{}
+ }
+ }
+ }
+ for _, n := range groupNodes {
+ addDial(groupDialedConn, n)
+ }
+
// If we don't have any peers whatsoever, try to dial a random bootnode. This
// scenario is useful for the testnet (and private networks) where the discovery
// table might be full of mostly bad peers, making it hard to find good ones.
diff --git a/p2p/dial_test.go b/p2p/dial_test.go
index 3805ff690..f9122de6f 100644
--- a/p2p/dial_test.go
+++ b/p2p/dial_test.go
@@ -525,6 +525,223 @@ func TestDialStateStaticDial(t *testing.T) {
})
}
+func TestDialStateDirectDial(t *testing.T) {
+ wantDirect := []*enode.Node{
+ newNode(uintID(1), nil),
+ newNode(uintID(2), nil),
+ newNode(uintID(3), nil),
+ newNode(uintID(4), nil),
+ newNode(uintID(5), nil),
+ }
+ init := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil)
+ for _, node := range wantDirect {
+ init.addDirect(node)
+ }
+
+ runDialTest(t, dialtest{
+ init: init,
+ rounds: []round{
+ // Direct dials are launched for the nodes that
+ // aren't yet connected.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}},
+ },
+ new: []task{
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(3), nil)},
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)},
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)},
+ },
+ },
+ // No new tasks are launched in this round because all direct
+ // nodes are either connected or still being dialed.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}},
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}},
+ },
+ done: []task{
+ &dialTask{flags: staticDialedConn, dest: newNode(uintID(3), nil)},
+ },
+ },
+ // No new dial tasks are launched because all direct
+ // nodes are now connected.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}},
+ },
+ done: []task{
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)},
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(5), nil)},
+ },
+ new: []task{
+ &waitExpireTask{Duration: 14 * time.Second},
+ },
+ },
+ // Wait a round for dial history to expire, no new tasks should spawn.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(2), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(4), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}},
+ },
+ },
+ // If a direct node is dropped, it should be immediately redialed,
+ // irrespective whether it was originally static or dynamic.
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: dynDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(3), nil)}},
+ {rw: &conn{flags: directDialedConn, node: newNode(uintID(5), nil)}},
+ },
+ new: []task{
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(2), nil)},
+ &dialTask{flags: directDialedConn, dest: newNode(uintID(4), nil)},
+ },
+ },
+ },
+ })
+}
+
+func TestDialStateGroupDial(t *testing.T) {
+ groups := []*dialGroup{
+ &dialGroup{
+ name: "g1",
+ nodes: map[enode.ID]*enode.Node{
+ uintID(1): newNode(uintID(1), nil),
+ uintID(2): newNode(uintID(2), nil),
+ },
+ num: 2,
+ },
+ &dialGroup{
+ name: "g2",
+ nodes: map[enode.ID]*enode.Node{
+ uintID(2): newNode(uintID(2), nil),
+ uintID(3): newNode(uintID(3), nil),
+ uintID(4): newNode(uintID(4), nil),
+ uintID(5): newNode(uintID(5), nil),
+ uintID(6): newNode(uintID(6), nil),
+ },
+ num: 2,
+ },
+ }
+
+ type groupTest struct {
+ peers []*Peer
+ dialing map[enode.ID]connFlag
+ ceiling map[string]uint64
+ }
+
+ tests := []groupTest{
+ {
+ peers: nil,
+ dialing: map[enode.ID]connFlag{},
+ ceiling: map[string]uint64{"g1": 2, "g2": 4},
+ },
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(2), nil)}},
+ },
+ dialing: map[enode.ID]connFlag{
+ uintID(1): staticDialedConn,
+ },
+ ceiling: map[string]uint64{"g1": 2, "g2": 2},
+ },
+ {
+ peers: []*Peer{
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(1), nil)}},
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}},
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(4), nil)}},
+ {rw: &conn{flags: staticDialedConn, node: newNode(uintID(5), nil)}},
+ },
+ dialing: map[enode.ID]connFlag{
+ uintID(2): staticDialedConn,
+ },
+ ceiling: map[string]uint64{"g1": 2, "g2": 4},
+ },
+ {
+ peers: nil,
+ dialing: map[enode.ID]connFlag{
+ uintID(1): staticDialedConn,
+ uintID(2): staticDialedConn,
+ uintID(3): staticDialedConn,
+ },
+ ceiling: map[string]uint64{"g1": 2, "g2": 4},
+ },
+ }
+
+ pm := func(ps []*Peer) map[enode.ID]*Peer {
+ m := make(map[enode.ID]*Peer)
+ for _, p := range ps {
+ m[p.rw.node.ID()] = p
+ }
+ return m
+ }
+
+ run := func(i int, tt groupTest) {
+ d := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil)
+ d.dialing = make(map[enode.ID]connFlag)
+ for k, v := range tt.dialing {
+ d.dialing[k] = v
+ }
+
+ for _, g := range groups {
+ d.addGroup(g)
+ }
+ peermap := pm(tt.peers)
+ new := d.newTasks(len(tt.dialing), peermap, time.Now())
+
+ cnt := map[string]uint64{}
+ for id := range peermap {
+ for _, g := range groups {
+ if _, ok := g.nodes[id]; ok {
+ cnt[g.name]++
+ }
+ }
+ }
+
+ for id := range tt.dialing {
+ for _, g := range groups {
+ if _, ok := g.nodes[id]; ok {
+ cnt[g.name]++
+ }
+ }
+ }
+
+ for _, task := range new {
+ id := task.(*dialTask).dest.ID()
+ for _, g := range groups {
+ if _, ok := g.nodes[id]; ok {
+ cnt[g.name]++
+ }
+ }
+ }
+
+ for _, g := range groups {
+ if cnt[g.name] < g.num {
+ t.Errorf("test %d) group %s peers + dialing + new < num (%d < %d)",
+ i, g.name, cnt[g.name], g.num)
+ }
+ if cnt[g.name] > tt.ceiling[g.name] {
+ t.Errorf("test %d) group %s peers + dialing + new > ceiling (%d > %d)",
+ i, g.name, cnt[g.name], tt.ceiling[g.name])
+ }
+ }
+ }
+
+ for i, tt := range tests {
+ run(i, tt)
+ }
+}
+
// This test checks that static peers will be redialed immediately if they were re-added to a static list.
func TestDialStaticAfterReset(t *testing.T) {
wantStatic := []*enode.Node{
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 9fb397d36..2b4640211 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -125,7 +125,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node) (*Table, error
return tab, nil
}
-func (tab *Table) self() *enode.Node {
+func (tab *Table) Self() *enode.Node {
return tab.net.self()
}
@@ -258,7 +258,7 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
- asked[tab.self().ID()] = true
+ asked[tab.Self().ID()] = true
for {
tab.mutex.Lock()
@@ -411,7 +411,7 @@ func (tab *Table) doRefresh(done chan struct{}) {
// Run self lookup to discover new neighbor nodes.
// We can only do this if we have a secp256k1 identity.
var key ecdsa.PublicKey
- if err := tab.self().Load((*enode.Secp256k1)(&key)); err == nil {
+ if err := tab.Self().Load((*enode.Secp256k1)(&key)); err == nil {
tab.lookup(encodePubkey(&key), false)
}
@@ -533,7 +533,7 @@ func (tab *Table) len() (n int) {
// bucket returns the bucket for the given node ID hash.
func (tab *Table) bucket(id enode.ID) *bucket {
- d := enode.LogDist(tab.self().ID(), id)
+ d := enode.LogDist(tab.Self().ID(), id)
if d <= bucketMinDistance {
return tab.buckets[0]
}
@@ -546,7 +546,7 @@ func (tab *Table) bucket(id enode.ID) *bucket {
//
// The caller must not hold tab.mutex.
func (tab *Table) add(n *node) {
- if n.ID() == tab.self().ID() {
+ if n.ID() == tab.Self().ID() {
return
}
@@ -579,7 +579,7 @@ func (tab *Table) stuff(nodes []*node) {
defer tab.mutex.Unlock()
for _, n := range nodes {
- if n.ID() == tab.self().ID() {
+ if n.ID() == tab.Self().ID() {
continue // don't add self
}
b := tab.bucket(n.ID())
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index 50ee7816b..74eb0605b 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -141,7 +141,7 @@ func TestTable_IPLimit(t *testing.T) {
defer db.Close()
for i := 0; i < tableIPLimit+1; i++ {
- n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)})
+ n := nodeAtDistance(tab.Self().ID(), i, net.IP{172, 0, 1, byte(i)})
tab.add(n)
}
if tab.len() > tableIPLimit {
@@ -158,7 +158,7 @@ func TestTable_BucketIPLimit(t *testing.T) {
d := 3
for i := 0; i < bucketIPLimit+1; i++ {
- n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)})
+ n := nodeAtDistance(tab.Self().ID(), d, net.IP{172, 0, 1, byte(i)})
tab.add(n)
}
if tab.len() > bucketIPLimit {
@@ -240,7 +240,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) {
for i := 0; i < len(buf); i++ {
ld := cfg.Rand.Intn(len(tab.buckets))
- tab.stuff([]*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
+ tab.stuff([]*node{nodeAtDistance(tab.Self().ID(), ld, intIP(ld))})
}
gotN := tab.ReadRandomNodes(buf)
if gotN != tab.len() {
diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go
index 96cf7196a..df74539c8 100644
--- a/p2p/discover/table_util_test.go
+++ b/p2p/discover/table_util_test.go
@@ -75,10 +75,10 @@ func intIP(i int) net.IP {
// fillBucket inserts nodes into the given bucket until it is full.
func fillBucket(tab *Table, n *node) (last *node) {
- ld := enode.LogDist(tab.self().ID(), n.ID())
+ ld := enode.LogDist(tab.Self().ID(), n.ID())
b := tab.bucket(n.ID())
for len(b.entries) < bucketSize {
- b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld)))
+ b.entries = append(b.entries, nodeAtDistance(tab.Self().ID(), ld, intIP(ld)))
}
return b.entries[bucketSize-1]
}
diff --git a/p2p/peer.go b/p2p/peer.go
index 3c75d7dd5..2c357fdc9 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -434,7 +434,6 @@ type PeerInfo struct {
RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection
Inbound bool `json:"inbound"`
Trusted bool `json:"trusted"`
- Notary bool `json:"notary"`
Static bool `json:"static"`
} `json:"network"`
Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields
@@ -459,7 +458,6 @@ func (p *Peer) Info() *PeerInfo {
info.Network.RemoteAddress = p.RemoteAddr().String()
info.Network.Inbound = p.rw.is(inboundConn)
info.Network.Trusted = p.rw.is(trustedConn)
- info.Network.Notary = p.rw.is(notaryConn)
info.Network.Static = p.rw.is(staticDialedConn)
// Gather all the running protocol infos
diff --git a/p2p/server.go b/p2p/server.go
index c3b3a00d4..a58673342 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -33,13 +33,13 @@ import (
"github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/discv5"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/p2p/nat"
"github.com/dexon-foundation/dexon/p2p/netutil"
"github.com/dexon-foundation/dexon/rlp"
+ "github.com/ethereum/go-ethereum/p2p/discover"
)
const (
@@ -178,10 +178,12 @@ type Server struct {
quit chan struct{}
addstatic chan *enode.Node
removestatic chan *enode.Node
+ adddirect chan *enode.Node
+ removedirect chan *enode.Node
addtrusted chan *enode.Node
removetrusted chan *enode.Node
- addnotary chan *enode.Node
- removenotary chan *enode.Node
+ addgroup chan *dialGroup
+ removegroup chan *dialGroup
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
@@ -203,9 +205,10 @@ type connFlag int32
const (
dynDialedConn connFlag = 1 << iota
staticDialedConn
+ directDialedConn
+ groupDialedConn
inboundConn
trustedConn
- notaryConn
)
// conn wraps a network connection with information gathered
@@ -254,12 +257,15 @@ func (f connFlag) String() string {
if f&staticDialedConn != 0 {
s += "-staticdial"
}
+ if f&directDialedConn != 0 {
+ s += "-directdial"
+ }
+ if f&groupDialedConn != 0 {
+ s += "-groupdial"
+ }
if f&inboundConn != 0 {
s += "-inbound"
}
- if f&notaryConn != 0 {
- s += "-notary"
- }
if s != "" {
s = s[1:]
}
@@ -333,40 +339,57 @@ func (srv *Server) RemovePeer(node *enode.Node) {
}
}
-// AddTrustedPeer adds the given node to a reserved whitelist which allows the
-// node to always connect, even if the slot are full.
-func (srv *Server) AddTrustedPeer(node *enode.Node) {
+// AddDirectPeer connects to the given node and maintains the connection until the
+// server is shut down. If the connection fails for any reason, the server will
+// attempt to reconnect the peer.
+func (srv *Server) AddDirectPeer(node *enode.Node) {
select {
- case srv.addtrusted <- node:
+ case srv.adddirect <- node:
case <-srv.quit:
}
}
-// RemoveTrustedPeer removes the given node from the trusted peer set.
-func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
+// RemoveDirectPeer disconnects from the given node
+func (srv *Server) RemoveDirectPeer(node *enode.Node) {
select {
- case srv.removetrusted <- node:
+ case srv.removedirect <- node:
case <-srv.quit:
}
}
-// AddNotaryPeer connects to the given node and maintains the connection until the
-// server is shut down. If the connection fails for any reason, the server will
-// attempt to reconnect the peer.
-// AddNotaryPeer also adds the given node to a reserved whitelist which allows the
+func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) {
+ m := map[enode.ID]*enode.Node{}
+ for _, node := range nodes {
+ m[node.ID()] = node
+ }
+ g := &dialGroup{name: name, nodes: m, num: num}
+ select {
+ case srv.addgroup <- g:
+ case <-srv.quit:
+ }
+}
+
+func (srv *Server) RemoveGroup(name string) {
+ g := &dialGroup{name: name}
+ select {
+ case srv.removegroup <- g:
+ case <-srv.quit:
+ }
+}
+
+// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
-func (srv *Server) AddNotaryPeer(node *discover.Node) {
+func (srv *Server) AddTrustedPeer(node *enode.Node) {
select {
- case srv.addnotary <- node:
+ case srv.addtrusted <- node:
case <-srv.quit:
}
}
-// RemoveNotaryPeer disconnects from the given node.
-// RemoveNotaryPeer also removes the given node from the notary peer set.
-func (srv *Server) RemoveNotaryPeer(node *discover.Node) {
+// RemoveTrustedPeer removes the given node from the trusted peer set.
+func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
select {
- case srv.removenotary <- node:
+ case srv.removetrusted <- node:
case <-srv.quit:
}
}
@@ -388,6 +411,36 @@ func (srv *Server) Self() *enode.Node {
return ln.Node()
}
+func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *enode.Node {
+ // If the node is running but discovery is off, manually assemble the node infos.
+ if ntab == nil {
+ addr := srv.tcpAddr(listener)
+ return enode.NewV4(&srv.PrivateKey.PublicKey, addr.IP, addr.Port, 0)
+ }
+ // Otherwise return the discovery node.
+ return ntab.Self()
+}
+
+func (srv *Server) tcpAddr(listener net.Listener) net.TCPAddr {
+ addr := net.TCPAddr{IP: net.IP{0, 0, 0, 0}}
+ if listener == nil {
+ return addr // Inbound connections disabled, use zero address.
+ }
+ // Otherwise inject the listener address too.
+ if a, ok := listener.Addr().(*net.TCPAddr); ok {
+ addr = *a
+ }
+ if srv.NAT != nil {
+ if ip, err := srv.NAT.ExternalIP(); err == nil {
+ addr.IP = ip
+ }
+ }
+ if addr.IP.IsUnspecified() {
+ addr.IP = net.IP{127, 0, 0, 1}
+ }
+ return addr
+}
+
// Stop terminates the server and all active peer connections.
// It blocks until all active connections have been closed.
func (srv *Server) Stop() {
@@ -465,10 +518,12 @@ func (srv *Server) Start() (err error) {
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)
srv.removestatic = make(chan *enode.Node)
+ srv.adddirect = make(chan *enode.Node)
+ srv.removedirect = make(chan *enode.Node)
+ srv.addgroup = make(chan *dialGroup)
+ srv.removegroup = make(chan *dialGroup)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
- srv.addnotary = make(chan *enode.Node)
- srv.removenotary = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
@@ -628,6 +683,10 @@ type dialer interface {
taskDone(task, time.Time)
addStatic(*enode.Node)
removeStatic(*enode.Node)
+ addDirect(*enode.Node)
+ removeDirect(*enode.Node)
+ addGroup(*dialGroup)
+ removeGroup(*dialGroup)
}
func (srv *Server) run(dialstate dialer) {
@@ -636,13 +695,15 @@ func (srv *Server) run(dialstate dialer) {
defer srv.nodedb.Close()
var (
- peers = make(map[enode.ID]*Peer)
- inboundCount = 0
- trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
- notary = make(map[enode.ID]bool)
- taskdone = make(chan task, maxActiveDialTasks)
- runningTasks []task
- queuedTasks []task // tasks that can't run yet
+ peers = make(map[enode.ID]*Peer)
+ inboundCount = 0
+ trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
+ peerflags = make(map[enode.ID]connFlag)
+ groupRefCount = make(map[enode.ID]int32)
+ groups = make(map[string]*dialGroup)
+ taskdone = make(chan task, maxActiveDialTasks)
+ runningTasks []task
+ queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
@@ -680,6 +741,60 @@ func (srv *Server) run(dialstate dialer) {
}
}
+ // remember and maintain the connection flags locally
+ setConnFlags := func(id enode.ID, f connFlag, val bool) {
+ if p, ok := peers[id]; ok {
+ p.rw.set(f, val)
+ }
+ if val {
+ peerflags[id] |= f
+ } else {
+ peerflags[id] &= ^f
+ }
+ if peerflags[id] == 0 {
+ delete(peerflags, id)
+ }
+ }
+
+ // Put trusted nodes into a map to speed up checks.
+ // Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
+ for _, n := range srv.TrustedNodes {
+ setConnFlags(n.ID(), trustedConn, true)
+ }
+
+ canDisconnect := func(p *Peer) bool {
+ f, ok := peerflags[p.ID()]
+ if ok && f != 0 {
+ return false
+ }
+ return !p.rw.is(dynDialedConn | inboundConn)
+ }
+
+ removeGroup := func(g *dialGroup) {
+ if gg, ok := groups[g.name]; ok {
+ for id := range gg.nodes {
+ groupRefCount[id]--
+ if groupRefCount[id] == 0 {
+ setConnFlags(id, groupDialedConn, false)
+ delete(groupRefCount, id)
+ }
+ }
+ }
+ }
+
+ addGroup := func(g *dialGroup) {
+ if _, ok := groups[g.name]; ok {
+ removeGroup(groups[g.name])
+ }
+ for id := range g.nodes {
+ groupRefCount[id]++
+ if groupRefCount[id] > 0 {
+ setConnFlags(id, groupDialedConn, true)
+ }
+ }
+ groups[g.name] = g
+ }
+
running:
for {
scheduleTasks()
@@ -693,63 +808,59 @@ running:
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding static node", "node", n)
+ setConnFlags(n.ID(), staticDialedConn, true)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n)
+ setConnFlags(n.ID(), staticDialedConn, false)
dialstate.removeStatic(n)
- if p, ok := peers[n.ID()]; ok {
+ if p, ok := peers[n.ID()]; ok && canDisconnect(p) {
p.Disconnect(DiscRequested)
}
+ case n := <-srv.adddirect:
+ // This channel is used by AddDirectPeer to add to the
+ // ephemeral direct peer list. Add it to the dialer,
+ // it will keep the node connected.
+ srv.log.Trace("Adding direct node", "node", n)
+ setConnFlags(n.ID(), directDialedConn, true)
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(directDialedConn, true)
+ }
+ dialstate.addDirect(n)
+ case n := <-srv.removedirect:
+ // This channel is used by RemoveDirectPeer to send a
+ // disconnect request to a peer and begin the
+ // stop keeping the node connected.
+ srv.log.Trace("Removing direct node", "node", n)
+ setConnFlags(n.ID(), directDialedConn, false)
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(directDialedConn, false)
+ if !p.rw.is(trustedConn | groupDialedConn) {
+ p.Disconnect(DiscRequested)
+ }
+ }
+ dialstate.removeDirect(n)
+ case g := <-srv.addgroup:
+ srv.log.Trace("Adding group", "group", g)
+ addGroup(g)
+ dialstate.addGroup(g)
+ case g := <-srv.removegroup:
+ srv.log.Trace("Removing group", "group", g)
+ removeGroup(g)
+ dialstate.removeGroup(g)
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
- trusted[n.ID()] = true
- // Mark any already-connected peer as trusted
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(trustedConn, true)
- }
+ setConnFlags(n.ID(), trustedConn, true)
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
- if _, ok := trusted[n.ID()]; ok {
- delete(trusted, n.ID())
- }
- // Unmark any already-connected peer as trusted
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(trustedConn, false)
- }
- case n := <-srv.addnotary:
- // This channel is used by AddNotaryPeer to add to the
- // ephemeral notary peer list. Add it to the dialer,
- // it will keep the node connected.
- srv.log.Trace("Adding notary node", "node", n)
- notary[n.ID] = true
- if p, ok := peers[n.ID]; ok {
- p.rw.set(notaryConn, true)
- }
- dialstate.addStatic(n)
- case n := <-srv.removenotary:
- // This channel is used by RemoveNotaryPeer to send a
- // disconnect request to a peer and begin the
- // stop keeping the node connected.
- srv.log.Trace("Removing notary node", "node", n)
- if _, ok := notary[n.ID]; ok {
- delete(notary, n.ID)
- }
-
- if p, ok := peers[n.ID]; ok {
- p.rw.set(notaryConn, false)
- }
-
- dialstate.removeStatic(n)
- if p, ok := peers[n.ID]; ok {
- p.Disconnect(DiscRequested)
- }
+ setConnFlags(n.ID(), trustedConn, false)
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
@@ -764,15 +875,9 @@ running:
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
// the remote identity is known (but hasn't been verified yet).
- if trusted[c.node.ID()] {
- // Ensure that the trusted flag is set before checking against MaxPeers.
- c.flags |= trustedConn
+ if f, ok := peerflags[c.node.ID()]; ok {
+ c.flags |= f
}
-
- if notary[c.id] {
- c.flags |= notaryConn
- }
-
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
@@ -853,9 +958,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i
func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
switch {
- case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
+ case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
- case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
+ case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 58bb75bc6..8e18ef156 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -28,7 +28,6 @@ import (
"github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/crypto/sha3"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/p2p/enr"
)
@@ -173,14 +172,10 @@ func TestServerDial(t *testing.T) {
}
// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
- // Test AddNotaryPeer/RemoveTrustedPeer and changing Notary flags.
// Particularly for race conditions on changing the flag state.
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted prematurely: %v", peer)
}
- if peer := srv.Peers()[0]; peer.Info().Network.Notary {
- t.Errorf("peer is notary prematurely: %v", peer)
- }
done := make(chan bool)
go func() {
srv.AddTrustedPeer(node)
@@ -192,14 +187,6 @@ func TestServerDial(t *testing.T) {
t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
}
- srv.AddNotaryPeer(node)
- if peer := srv.Peers()[0]; !peer.Info().Network.Notary {
- t.Errorf("peer is not notary after AddNotaryPeer: %v", peer)
- }
- srv.RemoveNotaryPeer(node)
- if peer := srv.Peers()[0]; peer.Info().Network.Notary {
- t.Errorf("peer is notary after RemoveNotaryPeer: %v", peer)
- }
done <- true
}()
// Trigger potential race conditions
@@ -216,70 +203,89 @@ func TestServerDial(t *testing.T) {
}
}
-// TestNotaryPeer checks that the node is added to and remove from static when
-// AddNotaryPeer and RemoveNotaryPeer is called.
-func TestNotaryPeer(t *testing.T) {
- var (
- returned = make(chan struct{})
- add, remove = make(chan *discover.Node), make(chan *discover.Node)
- tg = taskgen{
- newFunc: func(running int, peers map[discover.NodeID]*Peer) []task {
- return []task{}
- },
- doneFunc: func(t task) {},
- addFunc: func(n *discover.Node) {
- add <- n
- },
- removeFunc: func(n *discover.Node) {
- remove <- n
- },
- }
- )
-
+func TestServerPeerConnFlag(t *testing.T) {
srv := &Server{
- Config: Config{MaxPeers: 10},
- quit: make(chan struct{}),
- ntab: fakeTable{},
- addnotary: make(chan *discover.Node),
- removenotary: make(chan *discover.Node),
- running: true,
- log: log.New(),
+ Config: Config{
+ PrivateKey: newkey(),
+ MaxPeers: 10,
+ NoDial: true,
+ },
}
- srv.loopWG.Add(1)
- go func() {
- srv.run(tg)
- close(returned)
- }()
+ if err := srv.Start(); err != nil {
+ t.Fatalf("could not start: %v", err)
+ }
+ defer srv.Stop()
- notaryID := randomID()
- go srv.AddNotaryPeer(&discover.Node{ID: notaryID})
+ // inject a peer
+ key := newkey()
+ id := enode.PubkeyToIDV4(&key.PublicKey)
+ node := newNode(id, nil)
+ fd, _ := net.Pipe()
+ c := &conn{
+ node: node,
+ fd: fd,
+ transport: newTestTransport(&key.PublicKey, fd),
+ flags: inboundConn,
+ cont: make(chan error),
+ }
+ if err := srv.checkpoint(c, srv.addpeer); err != nil {
+ t.Fatalf("could not add conn: %v", err)
+ }
- select {
- case n := <-add:
- if n.ID != notaryID {
- t.Errorf("node ID mismatched: got %s, want %s",
- n.ID.String(), notaryID.String())
- }
- case <-time.After(1 * time.Second):
- t.Error("add static is not called within one second")
+ srv.AddTrustedPeer(node)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | trustedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | trustedConn))
}
- go srv.RemoveNotaryPeer(&discover.Node{ID: notaryID})
- select {
- case n := <-remove:
- if n.ID != notaryID {
- t.Errorf("node ID mismatched: got %s, want %s",
- n.ID.String(), notaryID.String())
- }
- case <-time.After(1 * time.Second):
- t.Error("remove static is not called within one second")
+ srv.AddDirectPeer(node)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | trustedConn | directDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | trustedConn | directDialedConn))
}
- srv.Stop()
- select {
- case <-returned:
- case <-time.After(500 * time.Millisecond):
- t.Error("Server.run did not return within 500ms")
+ srv.AddGroup("g1", []*enode.Node{node}, 1)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn))
+ }
+
+ srv.AddGroup("g2", []*enode.Node{node}, 1)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn))
+ }
+
+ srv.RemoveTrustedPeer(node)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | directDialedConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | directDialedConn | directDialedConn))
+ }
+
+ srv.RemoveDirectPeer(node)
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | directDialedConn))
+ }
+
+ srv.RemoveGroup("g1")
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != (inboundConn | groupDialedConn) {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, (inboundConn | directDialedConn))
+ }
+
+ srv.RemoveGroup("g2")
+ srv.Peers() // leverage this function to ensure trusted peer is added
+ if c.flags != inboundConn {
+ t.Errorf("flags mismatch: got %d, want %d",
+ c.flags, inboundConn)
}
}
@@ -407,9 +413,6 @@ func TestServerManyTasks(t *testing.T) {
type taskgen struct {
newFunc func(running int, peers map[enode.ID]*Peer) []task
doneFunc func(task)
-
- addFunc func(*discover.Node)
- removeFunc func(*discover.Node)
}
func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task {
@@ -418,11 +421,17 @@ func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time)
func (tg taskgen) taskDone(t task, now time.Time) {
tg.doneFunc(t)
}
-func (tg taskgen) addStatic(n *enode.Node) {
- tg.addFunc(n)
+func (tg taskgen) addStatic(*enode.Node) {
+}
+func (tg taskgen) removeStatic(*enode.Node) {
}
-func (tg taskgen) removeStatic(n *enode.Node) {
- tg.removeFunc(n)
+func (tg taskgen) addDirect(*enode.Node) {
+}
+func (tg taskgen) removeDirect(*enode.Node) {
+}
+func (tg taskgen) addGroup(*dialGroup) {
+}
+func (tg taskgen) removeGroup(*dialGroup) {
}
type testTask struct {
@@ -436,11 +445,10 @@ func (t *testTask) Do(srv *Server) {
// This test checks that connections are disconnected
// just after the encryption handshake when the server is
-// at capacity. Trusted and Notary connections should still be accepted.
+// at capacity. Trusted connections should still be accepted.
func TestServerAtCap(t *testing.T) {
trustedNode := newkey()
trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
- notaryID := randomID()
srv := &Server{
Config: Config{
PrivateKey: newkey(),
@@ -453,7 +461,6 @@ func TestServerAtCap(t *testing.T) {
t.Fatalf("could not start: %v", err)
}
defer srv.Stop()
- srv.AddNotaryPeer(&discover.Node{ID: notaryID})
newconn := func(id enode.ID) *conn {
fd, _ := net.Pipe()
@@ -484,15 +491,6 @@ func TestServerAtCap(t *testing.T) {
t.Error("Server did not set trusted flag")
}
- // Try inserting a notary connection.
- c = newconn(notaryID)
- if err := srv.checkpoint(c, srv.posthandshake); err != nil {
- t.Error("unexpected error for notary conn @posthandshake:", err)
- }
- if !c.is(notaryConn) {
- t.Error("Server did not set notary flag")
- }
-
// Remove from trusted set and try again
srv.RemoveTrustedPeer(newNode(trustedID, nil))
c = newconn(trustedID)
@@ -509,24 +507,6 @@ func TestServerAtCap(t *testing.T) {
if !c.is(trustedConn) {
t.Error("Server did not set trusted flag")
}
-
- // Remove from notary set and try again
- srv.RemoveNotaryPeer(&discover.Node{ID: notaryID})
- c = newconn(notaryID)
- if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
- t.Error("wrong error for insert:", err)
- }
-
- // Add anotherID to notary set and try again
- anotherNotaryID := randomID()
- srv.AddNotaryPeer(&discover.Node{ID: anotherNotaryID})
- c = newconn(anotherNotaryID)
- if err := srv.checkpoint(c, srv.posthandshake); err != nil {
- t.Error("unexpected error for notary conn @posthandshake:", err)
- }
- if !c.is(notaryConn) {
- t.Error("Server did not set notary flag")
- }
}
func TestServerPeerLimits(t *testing.T) {