aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-09-25 20:37:11 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:23:38 +0800
commitc08afdbc7741d4559683ff304157303af8766c89 (patch)
tree5125f6984343b51e3b95230f6f2816fa11a11386 /dex
parent8b977d488bce2d9d9d29a4ddc460b1ffac44aed2 (diff)
downloadgo-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar
go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.gz
go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.bz2
go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.lz
go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.xz
go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.zst
go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.zip
dex: redesign p2p network topology
- Let p2p server support direct connection and group connection. - Introduce node meta table to maintain IP of all nodes in node set, in memory and let nodes in the network can sync this table. - Let peerSet able to manage direct connections to notary set and dkg set. The mechanism to refresh the network topology when configuration round change is not done yet.
Diffstat (limited to 'dex')
-rw-r--r--dex/handler.go216
-rw-r--r--dex/handler_test.go445
-rw-r--r--dex/helper_test.go78
-rw-r--r--dex/network.go17
-rw-r--r--dex/nodetable.go79
-rw-r--r--dex/nodetable_test.go93
-rw-r--r--dex/notaryset.go203
-rw-r--r--dex/peer.go296
-rw-r--r--dex/peer_test.go628
-rw-r--r--dex/protocol.go42
-rw-r--r--dex/protocol_test.go82
-rw-r--r--dex/sync.go95
12 files changed, 1825 insertions, 449 deletions
diff --git a/dex/handler.go b/dex/handler.go
index bc932fb28..5348b06f2 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -49,6 +49,8 @@ const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
+
+ metaChanSize = 10240
)
var (
@@ -59,11 +61,6 @@ var (
// not compatible (low protocol version restrictions and high requirements).
var errIncompatibleConfig = errors.New("incompatible configuration")
-type newNotarySetEvent struct {
- Round uint64
- Set map[string]struct{}
-}
-
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
@@ -75,32 +72,35 @@ type ProtocolManager struct {
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
+ nodeTable *nodeTable
+ gov governance
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
- downloader *downloader.Downloader
- fetcher *fetcher.Fetcher
- peers *peerSet
- notarySetManager *notarySetManager
+ downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
+ peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
+ metasCh chan newMetasEvent
+ metasSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync
+ metasyncCh chan *metasync
quitSync chan struct{}
noMorePeers chan struct{}
- // channels for notarySetLoop
- newNotarySetCh chan newNotarySetEvent
- newRoundCh chan uint64
- newNotaryNodeInfoCh chan *notaryNodeInfo
+ // channels for peerSetLoop
+ crsCh chan core.NewCRSEvent
+ crsSub event.Subscription
srvr p2pServer
@@ -111,24 +111,26 @@ type ProtocolManager struct {
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
-func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
+func NewProtocolManager(
+ config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
+ mux *event.TypeMux, txpool txPool, engine consensus.Engine,
+ blockchain *core.BlockChain, chaindb ethdb.Database,
+ gov governance) (*ProtocolManager, error) {
+ tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkID: networkID,
- eventMux: mux,
- txpool: txpool,
- blockchain: blockchain,
- chainconfig: config,
- peers: newPeerSet(),
- newPeerCh: make(chan *peer),
- noMorePeers: make(chan struct{}),
- txsyncCh: make(chan *txsync),
- quitSync: make(chan struct{}),
- newNotarySetCh: make(chan newNotarySetEvent),
- newRoundCh: make(chan uint64),
- newNotaryNodeInfoCh: make(chan *notaryNodeInfo),
- }
- manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh)
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ nodeTable: tab,
+ gov: gov,
+ blockchain: blockchain,
+ chainconfig: config,
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
+ }
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
@@ -216,22 +218,33 @@ func (pm *ProtocolManager) removePeer(id string) {
func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.maxPeers = maxPeers
pm.srvr = srvr
+ pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable)
+
+ // if our self in node set build the node info
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
+ // broadcast node metas
+ pm.metasCh = make(chan newMetasEvent, metaChanSize)
+ pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
+ go pm.metaBroadcastLoop()
+
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
- // run the notary set loop
- go pm.notarySetLoop()
+ // run the peer set loop
+ pm.crsCh = make(chan core.NewCRSEvent)
+ pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh)
+ go pm.peerSetLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
+ go pm.metasyncLoop()
}
func (pm *ProtocolManager) Stop() {
@@ -301,6 +314,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
+ pm.syncNodeMetas(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
@@ -699,18 +713,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.txpool.AddRemotes(txs)
- case msg.Code == NotaryNodeInfoMsg:
- var info notaryNodeInfo
- if err := msg.Decode(&info); err != nil {
+ case msg.Code == MetaMsg:
+ var metas []*NodeMeta
+ if err := msg.Decode(&metas); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
-
- // TODO(sonic): validate signature
- p.MarkNotaryNodeInfo(info.Hash())
- // Broadcast to peers
- pm.BroadcastNotaryNodeInfo(&info)
-
- pm.notarySetManager.TryAddInfo(&info)
+ for i, meta := range metas {
+ if meta == nil {
+ return errResp(ErrDecode, "node meta %d is nil", i)
+ }
+ p.MarkNodeMeta(meta.Hash())
+ }
+ pm.nodeTable.Add(metas)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -769,11 +783,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
}
-// BroadcastNotaryNodeInfo will propagate notary node info to its peers.
-func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) {
- peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash())
- for _, peer := range peers {
- peer.AsyncSendNotaryNodeInfo(info)
+// BroadcastMetas will propagate node metas to its peers.
+func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
+ var metaset = make(map[*peer][]*NodeMeta)
+
+ for _, meta := range metas {
+ peers := pm.peers.PeersWithoutNodeMeta(meta.Hash())
+ for _, peer := range peers {
+ metaset[peer] = append(metaset[peer], meta)
+ }
+ log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers))
+ }
+
+ for peer, metas := range metaset {
+ peer.AsyncSendNodeMetas(metas)
}
}
@@ -801,97 +824,32 @@ func (pm *ProtocolManager) txBroadcastLoop() {
}
}
-// a loop keep building and maintaining peers in notary set.
-func (pm *ProtocolManager) notarySetLoop() {
- // TODO:
- // * pm need to know current round, and whether we are in notary set.
- // * (done) able to handle node info in the near future round.
- // * check peer limit.
- // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?)
-
- // If we are in notary set and we are synced with network.
- // events:
- // * new notary set is determined, and we are in notary set.
- // (TBD: subscribe the event or polling govornance)
- // - advance round
- // - build new notary set
- // - remove old notary set, remove old notary set from static
-
- // * current round node info changed.
-
- self := pm.srvr.Self()
- var round uint64
-
+func (pm *ProtocolManager) metaBroadcastLoop() {
for {
select {
- case event := <-pm.newNotarySetCh:
- // new notary set is determined.
- if _, ok := event.Set[self.ID.String()]; ok {
- // initialize the new notary set and add it to pm.notarySetManager
- s := newNotarySet(event.Round, event.Set)
- pm.notarySetManager.Register(event.Round, s)
-
- // TODO: handle signature
- pm.BroadcastNotaryNodeInfo(&notaryNodeInfo{
- ID: self.ID,
- IP: self.IP,
- UDP: self.UDP,
- TCP: self.TCP,
- Round: event.Round,
- Timestamp: time.Now().Unix(),
- })
- }
-
- case r := <-pm.newRoundCh:
- // move to new round.
- round = r
+ case event := <-pm.metasCh:
+ pm.BroadcastMetas(event.Metas)
- // TODO: revisit this to make sure how many previous round's
- // notary set we need to keep.
-
- // rmove notary set before current round, they are useless
- for _, s := range pm.notarySetManager.Before(round) {
- pm.removeNotarySetFromStatic(s)
- }
-
- // prepare notary set connections for new round.
- if pm.isInNotarySet(round + 1) {
- // we assume the notary set for round + 1 is added to
- // pm.notarySetManager before this step.
- if n, ok := pm.notarySetManager.Round(round + 1); ok {
- pm.addNotarySetToStatic(n)
- }
- }
-
- case <-pm.newNotaryNodeInfoCh:
- // some node info in "current round" is updated.
- // try to connect them by the new info.
- if pm.isInNotarySet(round) {
- if n, ok := pm.notarySetManager.Round(round); ok {
- pm.addNotarySetToStatic(n)
- }
- }
-
- case <-pm.quitSync:
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.metasSub.Err():
return
}
}
}
-func (pm *ProtocolManager) isInNotarySet(r uint64) bool {
- return false
-}
-
-func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) {
- for _, node := range n.NodesToAdd() {
- pm.srvr.AddNotaryPeer(node)
- n.MarkAdded(node.ID.String())
- }
-}
-
-func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) {
- for _, node := range n.Nodes() {
- pm.srvr.RemoveNotaryPeer(node)
+// a loop keep building and maintaining peers in notary set.
+// TODO: finish this
+func (pm *ProtocolManager) peerSetLoop() {
+ for {
+ select {
+ case event := <-pm.crsCh:
+ pm.peers.BuildNotaryConn(event.Round)
+ pm.peers.BuildDKGConn(event.Round)
+ pm.peers.ForgetNotaryConn(event.Round - 1)
+ pm.peers.ForgetDKGConn(event.Round - 1)
+ case <-pm.quitSync:
+ return
+ }
}
}
diff --git a/dex/handler_test.go b/dex/handler_test.go
new file mode 100644
index 000000000..981362bb5
--- /dev/null
+++ b/dex/handler_test.go
@@ -0,0 +1,445 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package dex
+
+import (
+ "math"
+ "math/big"
+ "math/rand"
+ "testing"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core"
+ "github.com/dexon-foundation/dexon/core/state"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/crypto"
+ "github.com/dexon-foundation/dexon/eth/downloader"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/params"
+)
+
+// Tests that protocol versions and modes of operations are matched up properly.
+func TestProtocolCompatibility(t *testing.T) {
+ // Define the compatibility chart
+ tests := []struct {
+ version uint
+ mode downloader.SyncMode
+ compatible bool
+ }{
+ {61, downloader.FullSync, true}, {62, downloader.FullSync, true}, {63, downloader.FullSync, true},
+ {61, downloader.FastSync, true}, {62, downloader.FastSync, true}, {63, downloader.FastSync, true},
+ }
+ // Make sure anything we screw up is restored
+ backup := ProtocolVersions
+ defer func() { ProtocolVersions = backup }()
+
+ // Try all available compatibility configs and check for errors
+ for i, tt := range tests {
+ ProtocolVersions = []uint{tt.version}
+
+ pm, _, err := newTestProtocolManager(tt.mode, 0, nil, nil)
+ if pm != nil {
+ defer pm.Stop()
+ }
+ if (err == nil && !tt.compatible) || (err != nil && tt.compatible) {
+ t.Errorf("test %d: compatibility mismatch: have error %v, want compatibility %v", i, err, tt.compatible)
+ }
+ }
+}
+
+// Tests that block headers can be retrieved from a remote chain based on user queries.
+func TestGetBlockHeaders62(t *testing.T) { testGetBlockHeaders(t, 62) }
+func TestGetBlockHeaders63(t *testing.T) { testGetBlockHeaders(t, 63) }
+
+func testGetBlockHeaders(t *testing.T, protocol int) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxHashFetch+15, nil, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Create a "random" unknown hash for testing
+ var unknown common.Hash
+ for i := range unknown {
+ unknown[i] = byte(i)
+ }
+ // Create a batch of tests for various scenarios
+ limit := uint64(downloader.MaxHeaderFetch)
+ tests := []struct {
+ query *getBlockHeadersData // The query to execute for header retrieval
+ expect []common.Hash // The hashes of the block whose headers are expected
+ }{
+ // A single random block should be retrievable by hash and number too
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(limit / 2).Hash()}, Amount: 1},
+ []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()},
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 1},
+ []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()},
+ },
+ // Multiple headers should be retrievable in both directions
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 1).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 2).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 1).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 2).Hash(),
+ },
+ },
+ // Multiple headers with skip lists should be retrievable
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 4).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 8).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 4).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 8).Hash(),
+ },
+ },
+ // The chain endpoints should be retrievable
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 0}, Amount: 1},
+ []common.Hash{pm.blockchain.GetBlockByNumber(0).Hash()},
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64()}, Amount: 1},
+ []common.Hash{pm.blockchain.CurrentBlock().Hash()},
+ },
+ // Ensure protocol limits are honored
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true},
+ pm.blockchain.GetBlockHashesFromHash(pm.blockchain.CurrentBlock().Hash(), limit),
+ },
+ // Check that requesting more than available is handled gracefully
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 3, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(),
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64()).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 3, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(4).Hash(),
+ pm.blockchain.GetBlockByNumber(0).Hash(),
+ },
+ },
+ // Check that requesting more than available is handled gracefully, even if mid skip
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 2, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(),
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 1).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 2, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(4).Hash(),
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ },
+ },
+ // Check a corner case where requesting more can iterate past the endpoints
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 2}, Amount: 5, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(2).Hash(),
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ pm.blockchain.GetBlockByNumber(0).Hash(),
+ },
+ },
+ // Check a corner case where skipping overflow loops back into the chain start
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(3).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64 - 1},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(3).Hash(),
+ },
+ },
+ // Check a corner case where skipping overflow loops back to the same header
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(1).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ },
+ },
+ // Check that non existing headers aren't returned
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: unknown}, Amount: 1},
+ []common.Hash{},
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() + 1}, Amount: 1},
+ []common.Hash{},
+ },
+ }
+ // Run each of the tests and verify the results against the chain
+ for i, tt := range tests {
+ // Collect the headers to expect in the response
+ headers := []*types.Header{}
+ for _, hash := range tt.expect {
+ headers = append(headers, pm.blockchain.GetBlockByHash(hash).Header())
+ }
+ // Send the hash request and verify the response
+ p2p.Send(peer.app, 0x03, tt.query)
+ if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil {
+ t.Errorf("test %d: headers mismatch: %v", i, err)
+ }
+ // If the test used number origins, repeat with hashes as the too
+ if tt.query.Origin.Hash == (common.Hash{}) {
+ if origin := pm.blockchain.GetBlockByNumber(tt.query.Origin.Number); origin != nil {
+ tt.query.Origin.Hash, tt.query.Origin.Number = origin.Hash(), 0
+
+ p2p.Send(peer.app, 0x03, tt.query)
+ if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil {
+ t.Errorf("test %d: headers mismatch: %v", i, err)
+ }
+ }
+ }
+ }
+}
+
+// Tests that block contents can be retrieved from a remote chain based on their hashes.
+func TestGetBlockBodies62(t *testing.T) { testGetBlockBodies(t, 62) }
+func TestGetBlockBodies63(t *testing.T) { testGetBlockBodies(t, 63) }
+
+func testGetBlockBodies(t *testing.T, protocol int) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxBlockFetch+15, nil, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Create a batch of tests for various scenarios
+ limit := downloader.MaxBlockFetch
+ tests := []struct {
+ random int // Number of blocks to fetch randomly from the chain
+ explicit []common.Hash // Explicitly requested blocks
+ available []bool // Availability of explicitly requested blocks
+ expected int // Total number of existing blocks to expect
+ }{
+ {1, nil, nil, 1}, // A single random block should be retrievable
+ {10, nil, nil, 10}, // Multiple random blocks should be retrievable
+ {limit, nil, nil, limit}, // The maximum possible blocks should be retrievable
+ {limit + 1, nil, nil, limit}, // No more than the possible block count should be returned
+ {0, []common.Hash{pm.blockchain.Genesis().Hash()}, []bool{true}, 1}, // The genesis block should be retrievable
+ {0, []common.Hash{pm.blockchain.CurrentBlock().Hash()}, []bool{true}, 1}, // The chains head block should be retrievable
+ {0, []common.Hash{{}}, []bool{false}, 0}, // A non existent block should not be returned
+
+ // Existing and non-existing blocks interleaved should not cause problems
+ {0, []common.Hash{
+ {},
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ {},
+ pm.blockchain.GetBlockByNumber(10).Hash(),
+ {},
+ pm.blockchain.GetBlockByNumber(100).Hash(),
+ {},
+ }, []bool{false, true, false, true, false, true, false}, 3},
+ }
+ // Run each of the tests and verify the results against the chain
+ for i, tt := range tests {
+ // Collect the hashes to request, and the response to expect
+ hashes, seen := []common.Hash{}, make(map[int64]bool)
+ bodies := []*blockBody{}
+
+ for j := 0; j < tt.random; j++ {
+ for {
+ num := rand.Int63n(int64(pm.blockchain.CurrentBlock().NumberU64()))
+ if !seen[num] {
+ seen[num] = true
+
+ block := pm.blockchain.GetBlockByNumber(uint64(num))
+ hashes = append(hashes, block.Hash())
+ if len(bodies) < tt.expected {
+ bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
+ }
+ break
+ }
+ }
+ }
+ for j, hash := range tt.explicit {
+ hashes = append(hashes, hash)
+ if tt.available[j] && len(bodies) < tt.expected {
+ block := pm.blockchain.GetBlockByHash(hash)
+ bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
+ }
+ }
+ // Send the hash request and verify the response
+ p2p.Send(peer.app, 0x05, hashes)
+ if err := p2p.ExpectMsg(peer.app, 0x06, bodies); err != nil {
+ t.Errorf("test %d: bodies mismatch: %v", i, err)
+ }
+ }
+}
+
+// Tests that the node state database can be retrieved based on hashes.
+func TestGetNodeData63(t *testing.T) { testGetNodeData(t, 63) }
+
+func testGetNodeData(t *testing.T, protocol int) {
+ // Define three accounts to simulate transactions with
+ acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
+ acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
+ acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey)
+ acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey)
+
+ signer := types.HomesteadSigner{}
+ // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test)
+ generator := func(i int, block *core.BlockGen) {
+ switch i {
+ case 0:
+ // In block 1, the test bank sends account #1 some ether.
+ tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
+ block.AddTx(tx)
+ case 1:
+ // In block 2, the test bank sends some more ether to account #1.
+ // acc1Addr passes it on to account #2.
+ tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey)
+ tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key)
+ block.AddTx(tx1)
+ block.AddTx(tx2)
+ case 2:
+ // Block 3 is empty but was mined by account #2.
+ block.SetCoinbase(acc2Addr)
+ block.SetExtra([]byte("yeehaw"))
+ case 3:
+ // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data).
+ b2 := block.PrevBlock(1).Header()
+ b2.Extra = []byte("foo")
+ block.AddUncle(b2)
+ b3 := block.PrevBlock(2).Header()
+ b3.Extra = []byte("foo")
+ block.AddUncle(b3)
+ }
+ }
+ // Assemble the test environment
+ pm, db := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Fetch for now the entire chain db
+ hashes := []common.Hash{}
+ for _, key := range db.Keys() {
+ if len(key) == len(common.Hash{}) {
+ hashes = append(hashes, common.BytesToHash(key))
+ }
+ }
+ p2p.Send(peer.app, 0x0d, hashes)
+ msg, err := peer.app.ReadMsg()
+ if err != nil {
+ t.Fatalf("failed to read node data response: %v", err)
+ }
+ if msg.Code != 0x0e {
+ t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, 0x0c)
+ }
+ var data [][]byte
+ if err := msg.Decode(&data); err != nil {
+ t.Fatalf("failed to decode response node data: %v", err)
+ }
+ // Verify that all hashes correspond to the requested data, and reconstruct a state tree
+ for i, want := range hashes {
+ if hash := crypto.Keccak256Hash(data[i]); hash != want {
+ t.Errorf("data hash mismatch: have %x, want %x", hash, want)
+ }
+ }
+ statedb := ethdb.NewMemDatabase()
+ for i := 0; i < len(data); i++ {
+ statedb.Put(hashes[i].Bytes(), data[i])
+ }
+ accounts := []common.Address{testBank, acc1Addr, acc2Addr}
+ for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ {
+ trie, _ := state.New(pm.blockchain.GetBlockByNumber(i).Root(), state.NewDatabase(statedb))
+
+ for j, acc := range accounts {
+ state, _ := pm.blockchain.State()
+ bw := state.GetBalance(acc)
+ bh := trie.GetBalance(acc)
+
+ if (bw != nil && bh == nil) || (bw == nil && bh != nil) {
+ t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw)
+ }
+ if bw != nil && bh != nil && bw.Cmp(bw) != 0 {
+ t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw)
+ }
+ }
+ }
+}
+
+// Tests that the transaction receipts can be retrieved based on hashes.
+func TestGetReceipt63(t *testing.T) { testGetReceipt(t, 63) }
+
+func testGetReceipt(t *testing.T, protocol int) {
+ // Define three accounts to simulate transactions with
+ acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
+ acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
+ acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey)
+ acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey)
+
+ signer := types.HomesteadSigner{}
+ // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test)
+ generator := func(i int, block *core.BlockGen) {
+ switch i {
+ case 0:
+ // In block 1, the test bank sends account #1 some ether.
+ tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
+ block.AddTx(tx)
+ case 1:
+ // In block 2, the test bank sends some more ether to account #1.
+ // acc1Addr passes it on to account #2.
+ tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey)
+ tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key)
+ block.AddTx(tx1)
+ block.AddTx(tx2)
+ case 2:
+ // Block 3 is empty but was mined by account #2.
+ block.SetCoinbase(acc2Addr)
+ block.SetExtra([]byte("yeehaw"))
+ case 3:
+ // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data).
+ b2 := block.PrevBlock(1).Header()
+ b2.Extra = []byte("foo")
+ block.AddUncle(b2)
+ b3 := block.PrevBlock(2).Header()
+ b3.Extra = []byte("foo")
+ block.AddUncle(b3)
+ }
+ }
+ // Assemble the test environment
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Collect the hashes to request, and the response to expect
+ hashes, receipts := []common.Hash{}, []types.Receipts{}
+ for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ {
+ block := pm.blockchain.GetBlockByNumber(i)
+
+ hashes = append(hashes, block.Hash())
+ receipts = append(receipts, pm.blockchain.GetReceiptsByHash(block.Hash()))
+ }
+ // Send the hash request and verify the response
+ p2p.Send(peer.app, 0x0f, hashes)
+ if err := p2p.ExpectMsg(peer.app, 0x10, receipts); err != nil {
+ t.Errorf("receipts mismatch: %v", err)
+ }
+}
diff --git a/dex/helper_test.go b/dex/helper_test.go
index 7e3479958..dcda6f4d2 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -37,7 +37,7 @@ import (
"github.com/dexon-foundation/dexon/ethdb"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/p2p"
- "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/params"
)
@@ -48,24 +48,47 @@ var (
// testP2PServer is a fake, helper p2p server for testing purposes.
type testP2PServer struct {
- added chan *discover.Node
- removed chan *discover.Node
+ mu sync.Mutex
+ self *enode.Node
+ direct map[enode.NodeID]*enode.Node
+ group map[string][]*enode.Node
}
-func (s *testP2PServer) Self() *discover.Node {
- return &discover.Node{}
+func newTestP2PServer(self *enode.Node) *testP2PServer {
+ return &testP2PServer{
+ self: self,
+ direct: make(map[enode.NodeID]*enode.Node),
+ group: make(map[string][]*enode.Node),
+ }
}
-func (s *testP2PServer) AddNotaryPeer(node *discover.Node) {
- if s.added != nil {
- s.added <- node
- }
+func (s *testP2PServer) Self() *enode.Node {
+ return s.self
}
-func (s *testP2PServer) RemoveNotaryPeer(node *discover.Node) {
- if s.removed != nil {
- s.removed <- node
- }
+func (s *testP2PServer) AddDirectPeer(node *enode.Node) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.direct[node.ID] = node
+}
+
+func (s *testP2PServer) RemoveDirectPeer(node *enode.Node) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.direct, node.ID)
+}
+
+func (s *testP2PServer) AddGroup(
+ name string, nodes []*enode.Node, num uint64) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.group[name] = nodes
+}
+
+func (s *testP2PServer) RemoveGroup(name string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.group, name)
}
// newTestProtocolManager creates a new protocol manager for testing purposes,
@@ -88,11 +111,11 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
panic(err)
}
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, &testGovernance{})
if err != nil {
return nil, nil, err
}
- pm.Start(&testP2PServer{}, 1000)
+ pm.Start(newTestP2PServer(&enode.Node{}), 1000)
return pm, db, nil
}
@@ -157,6 +180,29 @@ func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *typ
return tx
}
+// testGovernance is a fake, helper governance for testing purposes
+type testGovernance struct {
+ getChainNumFunc func(uint64) uint32
+ getNotarySetFunc func(uint32, uint64) map[string]struct{}
+ getDKGSetFunc func(uint64) map[string]struct{}
+}
+
+func (g *testGovernance) GetChainNum(round uint64) uint32 {
+ return g.getChainNumFunc(round)
+}
+
+func (g *testGovernance) GetNotarySet(chainID uint32, round uint64) map[string]struct{} {
+ return g.getNotarySetFunc(chainID, round)
+}
+
+func (g *testGovernance) GetDKGSet(round uint64) map[string]struct{} {
+ return g.getDKGSetFunc(round)
+}
+
+func (g *testGovernance) SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription {
+ return nil
+}
+
// testPeer is a simulated peer to allow testing direct network calls.
type testPeer struct {
net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging
@@ -170,7 +216,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
app, net := p2p.MsgPipe()
// Generate a random id and create the peer
- var id discover.NodeID
+ var id enode.NodeID
rand.Read(id[:])
peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net)
diff --git a/dex/network.go b/dex/network.go
index c3a0ef792..b19b46147 100644
--- a/dex/network.go
+++ b/dex/network.go
@@ -23,14 +23,6 @@ func (n *DexconNetwork) BroadcastVote(vote *types.Vote) {
func (n *DexconNetwork) BroadcastBlock(block *types.Block) {
}
-// BroadcastRandomnessRequest broadcasts rand request to DKG set.
-func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) {
-}
-
-// BroadcastRandomnessResult broadcasts rand request to Notary set.
-func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) {
-}
-
// SendDKGPrivateShare sends PrivateShare to a DKG participant.
func (n *DexconNetwork) SendDKGPrivateShare(
pub crypto.PublicKey, prvShare *types.DKGPrivateShare) {
@@ -47,6 +39,15 @@ func (n *DexconNetwork) BroadcastDKGPartialSignature(
psig *types.DKGPartialSignature) {
}
+// BroadcastRandomnessRequest broadcasts rand request to DKG set.
+func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) {
+
+}
+
+// BroadcastRandomnessResult broadcasts rand request to Notary set.
+func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) {
+}
+
// ReceiveChan returns a channel to receive messages from DEXON network.
func (n *DexconNetwork) ReceiveChan() <-chan interface{} {
return n.receiveChan
diff --git a/dex/nodetable.go b/dex/nodetable.go
new file mode 100644
index 000000000..929b168a8
--- /dev/null
+++ b/dex/nodetable.go
@@ -0,0 +1,79 @@
+package dex
+
+import (
+ "net"
+ "sync"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/crypto/sha3"
+ "github.com/dexon-foundation/dexon/event"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/dexon-foundation/dexon/rlp"
+)
+
+type NodeMeta struct {
+ ID enode.ID
+ IP net.IP
+ UDP int
+ TCP int
+ Timestamp uint64
+ Sig []byte
+}
+
+func (n *NodeMeta) Hash() (h common.Hash) {
+ hw := sha3.NewKeccak256()
+ rlp.Encode(hw, n)
+ hw.Sum(h[:0])
+ return h
+}
+
+type newMetasEvent struct{ Metas []*NodeMeta }
+
+type nodeTable struct {
+ mu sync.RWMutex
+ entry map[enode.ID]*NodeMeta
+ feed event.Feed
+}
+
+func newNodeTable() *nodeTable {
+ return &nodeTable{
+ entry: make(map[enode.ID]*NodeMeta),
+ }
+}
+
+func (t *nodeTable) Get(id enode.ID) *NodeMeta {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ return t.entry[id]
+}
+
+func (t *nodeTable) Add(metas []*NodeMeta) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+
+ var newMetas []*NodeMeta
+ for _, meta := range metas {
+ // TODO: validate the meta
+ if e, ok := t.entry[meta.ID]; ok && e.Timestamp > meta.Timestamp {
+ continue
+ }
+ t.entry[meta.ID] = meta
+ newMetas = append(newMetas, meta)
+ }
+ t.feed.Send(newMetasEvent{newMetas})
+}
+
+func (t *nodeTable) Metas() []*NodeMeta {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ metas := make([]*NodeMeta, 0, len(t.entry))
+ for _, meta := range t.entry {
+ metas = append(metas, meta)
+ }
+ return metas
+}
+
+func (t *nodeTable) SubscribeNewMetasEvent(
+ ch chan<- newMetasEvent) event.Subscription {
+ return t.feed.Subscribe(ch)
+}
diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go
new file mode 100644
index 000000000..5b3f7de57
--- /dev/null
+++ b/dex/nodetable_test.go
@@ -0,0 +1,93 @@
+package dex
+
+import (
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+)
+
+func TestNodeTable(t *testing.T) {
+ table := newNodeTable()
+ ch := make(chan newMetasEvent)
+ table.SubscribeNewMetasEvent(ch)
+
+ metas1 := []*NodeMeta{
+ &NodeMeta{ID: randomID()},
+ &NodeMeta{ID: randomID()},
+ }
+
+ metas2 := []*NodeMeta{
+ &NodeMeta{ID: randomID()},
+ &NodeMeta{ID: randomID()},
+ }
+
+ go table.Add(metas1)
+
+ select {
+ case newMetas := <-ch:
+ m := map[common.Hash]struct{}{}
+ for _, meta := range newMetas.Metas {
+ m[meta.Hash()] = struct{}{}
+ }
+
+ if len(m) != len(metas1) {
+ t.Errorf("len mismatch: got %d, want: %d",
+ len(m), len(metas1))
+ }
+
+ for _, meta := range metas1 {
+ if _, ok := m[meta.Hash()]; !ok {
+ t.Errorf("expected meta (%s) not exists", meta.Hash())
+ }
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("did not receive new metas event within one second")
+ }
+
+ go table.Add(metas2)
+ select {
+ case newMetas := <-ch:
+ m := map[common.Hash]struct{}{}
+ for _, meta := range newMetas.Metas {
+ m[meta.Hash()] = struct{}{}
+ }
+
+ if len(m) != len(metas1) {
+ t.Errorf("len mismatch: got %d, want: %d",
+ len(m), len(metas2))
+ }
+
+ for _, meta := range metas2 {
+ if _, ok := m[meta.Hash()]; !ok {
+ t.Errorf("expected meta (%s) not exists", meta.Hash())
+ }
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("did not receive new metas event within one second")
+ }
+
+ var metas []*NodeMeta
+ metas = append(metas, metas1...)
+ metas = append(metas, metas2...)
+ allMetas := table.Metas()
+ if len(allMetas) != len(metas) {
+ t.Errorf("all metas num mismatch: got %d, want %d",
+ len(metas), len(allMetas))
+ }
+
+ for _, m := range metas {
+ if m.Hash() != table.Get(m.ID).Hash() {
+ t.Errorf("meta (%s) mismatch", m.ID.String())
+ }
+ }
+}
+
+func randomID() (id enode.ID) {
+ for i := range id {
+ id[i] = byte(rand.Intn(255))
+ }
+ return id
+}
diff --git a/dex/notaryset.go b/dex/notaryset.go
deleted file mode 100644
index 74d259314..000000000
--- a/dex/notaryset.go
+++ /dev/null
@@ -1,203 +0,0 @@
-package dex
-
-import (
- "fmt"
- "sync"
-
- "github.com/dexon-foundation/dexon/p2p/discover"
-)
-
-type nodeInfo struct {
- info *notaryNodeInfo
- added bool
-}
-
-func (n *nodeInfo) NewNode() *discover.Node {
- return discover.NewNode(n.info.ID, n.info.IP, n.info.UDP, n.info.TCP)
-}
-
-type notarySet struct {
- round uint64
- m map[string]*nodeInfo
- lock sync.RWMutex
-}
-
-func newNotarySet(round uint64, s map[string]struct{}) *notarySet {
- m := make(map[string]*nodeInfo)
- for nodeID := range s {
- m[nodeID] = &nodeInfo{}
- }
-
- return &notarySet{
- round: round,
- m: m,
- }
-}
-
-// Call this function when the notaryNodeInfoMsg is received.
-func (n *notarySet) AddInfo(info *notaryNodeInfo) error {
- n.lock.Lock()
- defer n.lock.Unlock()
-
- // check round
- if info.Round != n.round {
- return fmt.Errorf("invalid round")
- }
-
- nInfo, ok := n.m[info.ID.String()]
- if !ok {
- return fmt.Errorf("not in notary set")
- }
-
- // if the info exists check timstamp
- if nInfo.info != nil {
- if nInfo.info.Timestamp > info.Timestamp {
- return fmt.Errorf("old msg")
- }
- }
-
- nInfo.info = info
- return nil
-}
-
-// MarkAdded mark the notary node as added
-// to prevent duplcate addition in the future.
-func (n *notarySet) MarkAdded(nodeID string) {
- if info, ok := n.m[nodeID]; ok {
- info.added = true
- }
-}
-
-// Return all nodes
-func (n *notarySet) Nodes() []*discover.Node {
- n.lock.RLock()
- defer n.lock.RUnlock()
-
- list := make([]*discover.Node, 0, len(n.m))
- for _, info := range n.m {
- list = append(list, info.NewNode())
- }
- return list
-}
-
-// Return nodes that need to be added to p2p server as notary node.
-func (n *notarySet) NodesToAdd() []*discover.Node {
- n.lock.RLock()
- defer n.lock.RUnlock()
-
- var list []*discover.Node
- for _, info := range n.m {
- // craete a new discover.Node
- if !info.added {
- list = append(list, info.NewNode())
- }
- }
- return list
-}
-
-type notarySetManager struct {
- m map[uint64]*notarySet
- lock sync.RWMutex
- queued map[uint64]map[string]*notaryNodeInfo
- round uint64 // biggest round of managed notary sets
- newNotaryNodeInfoCh chan *notaryNodeInfo
-}
-
-func newNotarySetManager(
- newNotaryNodeInfoCh chan *notaryNodeInfo) *notarySetManager {
- return &notarySetManager{
- m: make(map[uint64]*notarySet),
- queued: make(map[uint64]map[string]*notaryNodeInfo),
- newNotaryNodeInfoCh: newNotaryNodeInfoCh,
- }
-}
-
-// Register injects a new notary set into the manager and
-// processes the queued info.
-func (n *notarySetManager) Register(r uint64, s *notarySet) {
- n.lock.Lock()
- defer n.lock.Unlock()
- if r > n.round {
- n.round = r
- }
- n.m[r] = s
- n.processQueuedInfo()
-}
-
-// Unregister removes the notary set of the given round.
-func (n *notarySetManager) Unregister(r uint64) {
- n.lock.Lock()
- defer n.lock.Unlock()
- delete(n.m, r)
-}
-
-// Round returns the notary set of the given round.
-func (n *notarySetManager) Round(r uint64) (*notarySet, bool) {
- n.lock.RLock()
- defer n.lock.RUnlock()
- s, ok := n.m[r]
- return s, ok
-}
-
-// Before returns all the notary sets that before the given round.
-func (n *notarySetManager) Before(r uint64) []*notarySet {
- n.lock.RLock()
- defer n.lock.RUnlock()
- var list []*notarySet
- for round, s := range n.m {
- if round < r {
- list = append(list, s)
- }
- }
- return list
-}
-
-// TryAddInfo associates the given info to the notary set if the notary set is
-// managed by the manager.
-// If the notary node info is belong to future notary set, queue the info.
-func (n *notarySetManager) TryAddInfo(info *notaryNodeInfo) {
- n.lock.Lock()
- defer n.lock.Unlock()
- n.tryAddInfo(info)
-}
-
-// This function is extract for calling without lock.
-// Make sure the caller already accquired the lock.
-func (n *notarySetManager) tryAddInfo(info *notaryNodeInfo) {
- if info.Round > n.round {
- if q, ok := n.queued[info.Round]; ok {
- q[info.Hash().String()] = info
- return
- }
- n.queued[info.Round] = map[string]*notaryNodeInfo{
- info.Hash().String(): info,
- }
- return
- }
-
- s, ok := n.Round(info.Round)
- if !ok {
- return
- }
- s.AddInfo(info)
-
- // TODO(sonic): handle timeout
- n.newNotaryNodeInfoCh <- info
-}
-
-func (n *notarySetManager) processQueuedInfo() {
- n.lock.Lock()
- defer n.lock.Unlock()
- if q, ok := n.queued[n.round]; ok {
- for _, info := range q {
- n.tryAddInfo(info)
- }
- }
-
- // Clear queue infos before current round.
- for round := range n.queued {
- if round <= n.round {
- delete(n.queued, round)
- }
- }
-}
diff --git a/dex/peer.go b/dex/peer.go
index f1a4335d1..8c218f1d9 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -27,6 +27,7 @@ import (
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -34,19 +35,20 @@ var (
errClosed = errors.New("peer set is closed")
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
- errInvalidRound = errors.New("invalid round")
)
const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- maxKnownInfos = 1024
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
// contain a single transaction, or thousands.
maxQueuedTxs = 128
+ maxQueuedMetas = 512
+
// maxQueuedProps is the maximum number of block propagations to queue up before
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
// that might cover uncles should be enough.
@@ -57,9 +59,9 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4
- maxQueuedInfos = 1024
-
handshakeTimeout = 5 * time.Second
+
+ groupNodeNum = 3
)
// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
@@ -76,13 +78,27 @@ type propEvent struct {
td *big.Int
}
+type setType uint32
+
+const (
+ dkgset = iota
+ notaryset
+)
+
+type peerLabel struct {
+ set setType
+ chainID uint32
+ round uint64
+}
+
type peer struct {
id string
*p2p.Peer
rw p2p.MsgReadWriter
- version int // Protocol version negotiated
+ version int // Protocol version negotiated
+ labels mapset.Set
forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
head common.Hash
@@ -90,12 +106,12 @@ type peer struct {
lock sync.RWMutex
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownMetas mapset.Set // Set of node metas known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownInfos mapset.Set // Set of infos known to be known by this peer
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -104,20 +120,21 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
Peer: p,
rw: rw,
version: version,
- id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
+ labels: mapset.NewSet(),
+ id: p.ID().String(),
knownTxs: mapset.NewSet(),
+ knownMetas: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
- knownInfos: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
- queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos),
term: make(chan struct{}),
}
}
// broadcast is a write loop that multiplexes block propagations, announcements,
-// transaction and notary node infos broadcasts into the remote peer.
+// transaction and notary node metas broadcasts into the remote peer.
// The goal is to have an async writer that does not lock up node internals.
func (p *peer) broadcast() {
for {
@@ -128,6 +145,12 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Broadcast transactions", "count", len(txs))
+ case metas := <-p.queuedMetas:
+ if err := p.SendNodeMetas(metas); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast node metas", "count", len(metas))
+
case prop := <-p.queuedProps:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
@@ -140,12 +163,6 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
- case info := <-p.queuedInfos:
- if err := p.SendNotaryNodeInfo(info); err != nil {
- return
- }
- p.Log().Trace("Broadcast notary node info")
-
case <-p.term:
return
}
@@ -157,6 +174,14 @@ func (p *peer) close() {
close(p.term)
}
+func (p *peer) addLabel(label peerLabel) {
+ p.labels.Add(label)
+}
+
+func (p *peer) removeLabel(label peerLabel) {
+ p.labels.Remove(label)
+}
+
// Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *PeerInfo {
hash, td := p.Head()
@@ -207,11 +232,11 @@ func (p *peer) MarkTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}
-func (p *peer) MarkNotaryNodeInfo(hash common.Hash) {
- for p.knownInfos.Cardinality() >= maxKnownInfos {
- p.knownInfos.Pop()
+func (p *peer) MarkNodeMeta(hash common.Hash) {
+ for p.knownMetas.Cardinality() >= maxKnownMetas {
+ p.knownMetas.Pop()
}
- p.knownInfos.Add(hash)
+ p.knownMetas.Add(hash)
}
// SendTransactions sends transactions to the peer and includes the hashes
@@ -236,21 +261,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
}
}
-// SendNotaryNodeInfo sends the info to the peer and includes the hashes
-// in its info hash set for future reference.
-func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error {
- return p2p.Send(p.rw, NotaryNodeInfoMsg, info)
+// SendNodeMetas sends the metas to the peer and includes the hashes
+// in its metas hash set for future reference.
+func (p *peer) SendNodeMetas(metas []*NodeMeta) error {
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
+ return p2p.Send(p.rw, MetaMsg, metas)
}
-// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a
+// AsyncSendNodeMeta queues list of notary node meta propagation to a
// remote peer. If the peer's broadcast queue is full, the event is silently
// dropped.
-func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) {
+func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) {
select {
- case p.queuedInfos <- info:
- p.knownInfos.Add(info.Hash())
+ case p.queuedMetas <- metas:
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
default:
- p.Log().Debug("Dropping notary node info propagation")
+ p.Log().Debug("Dropping node meta propagation", "count", len(metas))
}
}
@@ -431,7 +461,7 @@ func (p *peer) readStatus(network uint64, status *statusData, genesis common.Has
// String implements fmt.Stringer.
func (p *peer) String() string {
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("eth/%2d", p.version),
+ fmt.Sprintf("dex/%2d", p.version),
)
}
@@ -441,18 +471,25 @@ type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
closed bool
+ tab *nodeTable
- // TODO(sonic): revist this map after dexon core SDK is finalized.
- // use types.ValidatorID? or implement Stringer for types.ValidatorID
- notaryPeers map[uint64]map[string]*peer
- round uint64
+ srvr p2pServer
+ gov governance
+ peerLabels map[string]map[peerLabel]struct{}
+ notaryHistory map[uint64]struct{}
+ dkgHistory map[uint64]struct{}
}
// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
+func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet {
return &peerSet{
- peers: make(map[string]*peer),
- notaryPeers: make(map[uint64]map[string]*peer),
+ peers: make(map[string]*peer),
+ gov: gov,
+ srvr: srvr,
+ tab: tab,
+ peerLabels: make(map[string]map[peerLabel]struct{}),
+ notaryHistory: make(map[uint64]struct{}),
+ dkgHistory: make(map[uint64]struct{}),
}
}
@@ -537,14 +574,14 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
return list
}
-// PeersWithoutNotaryNodeInfo retrieves a list of peers that do not have a
-// given info in their set of known hashes.
-func (ps *peerSet) PeersWithoutNotaryNodeInfo(hash common.Hash) []*peer {
+// PeersWithoutNodeMeta retrieves a list of peers that do not have a
+// given meta in their set of known hashes.
+func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if p.knownInfos.Contains(hash) {
+ if !p.knownMetas.Contains(hash) {
list = append(list, p)
}
}
@@ -580,44 +617,171 @@ func (ps *peerSet) Close() {
ps.closed = true
}
-// AddNotaryPeer adds a peer into notary peer of the given round.
-// Caller of this function need to make sure that the peer is acutally in
-// notary set.
-func (ps *peerSet) AddNotaryPeer(round uint64, p *peer) error {
+func (ps *peerSet) BuildNotaryConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- // TODO(sonic): revisit this round check after dexon core SDK is finalized.
- if round < ps.round || round > ps.round+2 {
- return errInvalidRound
+ if _, ok := ps.notaryHistory[round]; ok {
+ return
}
- if _, ok := ps.peers[p.id]; !ok {
- return errNotRegistered
+ ps.notaryHistory[round] = struct{}{}
+
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+
+ // not in notary set, add group
+ if _, ok := s[selfID]; !ok {
+ var nodes []*discover.Node
+ for id := range s {
+ nodes = append(nodes, ps.newNode(id))
+ }
+ ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum)
+ continue
+ }
+
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, label)
+ }
}
+}
- ps.notaryPeers[round][p.id] = p
- return nil
+func (ps *peerSet) ForgetNotaryConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.notaryHistory {
+ if r <= round {
+ ps.forgetNotaryConn(r)
+ delete(ps.notaryHistory, r)
+ }
+ }
}
-// NotaryPeers return peers in notary set of the given round.
-func (ps *peerSet) NotaryPeers(round uint64) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (ps *peerSet) forgetNotaryConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+ if _, ok := s[selfID]; !ok {
+ ps.srvr.RemoveGroup(notarySetName(chainID, round))
+ continue
+ }
- list := make([]*peer, 0, len(ps.notaryPeers[round]))
- for _, p := range ps.notaryPeers[round] {
- if _, ok := ps.peers[p.id]; ok {
- list = append(list, p)
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.removeDirectPeer(id, label)
}
}
- return list
}
-// NextRound moves notary peer set to next round.
-func (ps *peerSet) NextRound() {
+func notarySetName(chainID uint32, round uint64) string {
+ return fmt.Sprintf("%d-%d-notaryset", chainID, round)
+}
+
+func (ps *peerSet) BuildDKGConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- delete(ps.notaryPeers, ps.round)
- ps.round = ps.round + 1
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+ ps.dkgHistory[round] = struct{}{}
+
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, peerLabel{
+ set: dkgset,
+ round: round,
+ })
+ }
+}
+
+func (ps *peerSet) ForgetDKGConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.dkgHistory {
+ if r <= round {
+ ps.forgetDKGConn(r)
+ delete(ps.dkgHistory, r)
+ }
+ }
+}
+
+func (ps *peerSet) forgetDKGConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+
+ delete(s, selfID)
+ label := peerLabel{
+ set: dkgset,
+ round: round,
+ }
+ for id := range s {
+ ps.removeDirectPeer(id, label)
+ }
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
+ // if the peer exists add the label
+ if p, ok := ps.peers[id]; ok {
+ p.addLabel(label)
+ }
+
+ if _, ok := ps.peerLabels[id]; !ok {
+ ps.peerLabels[id] = make(map[peerLabel]struct{})
+ }
+
+ ps.peerLabels[id][label] = struct{}{}
+ ps.srvr.AddDirectPeer(ps.newNode(id))
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) removeDirectPeer(id string, label peerLabel) {
+ if p, ok := ps.peers[id]; ok {
+ p.removeLabel(label)
+ }
+
+ delete(ps.peerLabels[id], label)
+
+ if len(ps.peerLabels[id]) == 0 {
+ ps.srvr.RemoveDirectPeer(ps.newNode(id))
+ delete(ps.peerLabels, id)
+ }
+}
+
+func (ps *peerSet) newNode(id string) *enode.Node {
+ nodeID := enode.HexID(id)
+ meta := ps.tab.Get(enode.HexID(id))
+
+ var r enr.Record
+ r.Set(enr.ID(nodeID.String()))
+ r.Set(enr.IP(meta.IP))
+ r.Set(enr.TCP(meta.TCP))
+ r.Set(enr.UDP(meta.UDP))
+
+ n, err := enode.New(enode.ValidSchemes, &r)
+ if err != nil {
+ panic(err)
+ }
+ return n
}
diff --git a/dex/peer_test.go b/dex/peer_test.go
new file mode 100644
index 000000000..bac6ed5ec
--- /dev/null
+++ b/dex/peer_test.go
@@ -0,0 +1,628 @@
+package dex
+
+import (
+ "fmt"
+ "math/big"
+ "testing"
+
+ mapset "github.com/deckarep/golang-set"
+ "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+)
+
+func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) {
+ self := discover.Node{ID: nodeID(0)}
+ server := newTestP2PServer(&self)
+ table := newNodeTable()
+
+ gov := &testGovernance{
+ getChainNumFunc: func(uint64) uint32 {
+ return 3
+ },
+ }
+
+ round10 := [][]enode.ID{
+ []enode.ID{nodeID(0), nodeID(1), nodeID(2)},
+ []enode.ID{nodeID(1), nodeID(3)},
+ []enode.ID{nodeID(2), nodeID(4)},
+ }
+ round11 := [][]enode.ID{
+ []enode.ID{nodeID(0), nodeID(1), nodeID(5)},
+ []enode.ID{nodeID(5), nodeID(6)},
+ []enode.ID{nodeID(0), nodeID(2), nodeID(4)},
+ }
+ round12 := [][]enode.ID{
+ []enode.ID{nodeID(0), nodeID(3), nodeID(5)},
+ []enode.ID{nodeID(0), nodeID(7), nodeID(8)},
+ []enode.ID{nodeID(0), nodeID(2), nodeID(6)},
+ }
+
+ gov.getNotarySetFunc = func(cid uint32, round uint64) map[string]struct{} {
+ m := map[uint64][][]enode.ID{
+ 10: round10,
+ 11: round11,
+ 12: round12,
+ }
+ return newTestNodeSet(m[round][cid])
+ }
+
+ ps := newPeerSet(gov, server, table)
+ peer1 := newDummyPeer(nodeID(1))
+ peer2 := newDummyPeer(nodeID(2))
+ var err error
+ err = ps.Register(peer1)
+ if err != nil {
+ t.Error(err)
+ }
+ err = ps.Register(peer2)
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 10
+ ps.BuildNotaryConn(10)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{
+ notarySetName(1, 10),
+ notarySetName(2, 10),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 11
+ ps.BuildNotaryConn(11)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ },
+ nodeID(4).String(): []peerLabel{
+ peerLabel{notaryset, 2, 11},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{notaryset, 0, 11},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10, 11}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2), nodeID(4), nodeID(5),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{
+ notarySetName(1, 10),
+ notarySetName(2, 10),
+ notarySetName(1, 11),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 12
+ ps.BuildNotaryConn(12)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ peerLabel{notaryset, 2, 12},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(3).String(): []peerLabel{
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(4).String(): []peerLabel{
+ peerLabel{notaryset, 2, 11},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{notaryset, 0, 11},
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(6).String(): []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(7).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ nodeID(8).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10, 11, 12}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2), nodeID(3), nodeID(4),
+ nodeID(5), nodeID(6), nodeID(7), nodeID(8),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{
+ notarySetName(1, 10),
+ notarySetName(2, 10),
+ notarySetName(1, 11),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 11
+ ps.ForgetNotaryConn(11)
+
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(3).String(): []peerLabel{
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(6).String(): []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(7).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ nodeID(8).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{12}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(2), nodeID(3),
+ nodeID(5), nodeID(6), nodeID(7), nodeID(8),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{})
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 12
+ ps.ForgetNotaryConn(12)
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{})
+ if err != nil {
+ t.Error(err)
+ }
+
+}
+
+func TestPeerSetBuildDKGConn(t *testing.T) {
+ self := discover.Node{ID: nodeID(0)}
+ server := newTestP2PServer(&self)
+ table := newNodeTable()
+
+ gov := &testGovernance{}
+
+ gov.getDKGSetFunc = func(round uint64) map[string]struct{} {
+ m := map[uint64][]enode.ID{
+ 10: []enode.ID{nodeID(0), nodeID(1), nodeID(2)},
+ 11: []enode.ID{nodeID(1), nodeID(2), nodeID(5)},
+ 12: []enode.ID{nodeID(0), nodeID(3), nodeID(5)},
+ }
+ return newTestNodeSet(m[round])
+ }
+
+ ps := newPeerSet(gov, server, table)
+ peer1 := newDummyPeer(nodeID(1))
+ peer2 := newDummyPeer(nodeID(2))
+ var err error
+ err = ps.Register(peer1)
+ if err != nil {
+ t.Error(err)
+ }
+ err = ps.Register(peer2)
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 10
+ ps.BuildDKGConn(10)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 11
+ ps.BuildDKGConn(11)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 12
+ ps.BuildDKGConn(12)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(3).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10, 12}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2), nodeID(3), nodeID(5),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 11
+ ps.ForgetDKGConn(11)
+
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(3).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{12}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(3), nodeID(5),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 12
+ ps.ForgetDKGConn(12)
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{})
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func checkLabels(p *peer, want []peerLabel) error {
+ if p.labels.Cardinality() != len(want) {
+ return fmt.Errorf("num of labels mismatch: got %d, want %d",
+ p.labels.Cardinality(), len(want))
+ }
+
+ for _, label := range want {
+ if !p.labels.Contains(label) {
+ return fmt.Errorf("label %+v not exist", label)
+ }
+ }
+ return nil
+}
+
+func checkPeerLabels(ps *peerSet, want map[string][]peerLabel) error {
+ if len(ps.peerLabels) != len(want) {
+ return fmt.Errorf("peer num mismatch: got %d, want %d",
+ len(ps.peerLabels), len(want))
+ }
+
+ for peerID, gotLabels := range ps.peerLabels {
+ wantLabels, ok := want[peerID]
+ if !ok {
+ return fmt.Errorf("peer id %s not exists", peerID)
+ }
+
+ if len(gotLabels) != len(wantLabels) {
+ return fmt.Errorf(
+ "num of labels of peer id %s mismatch: got %d, want %d",
+ peerID, len(gotLabels), len(wantLabels))
+ }
+
+ for _, label := range wantLabels {
+ if _, ok := gotLabels[label]; !ok {
+ fmt.Errorf("label: %+v not exists", label)
+ }
+ }
+ }
+ return nil
+}
+
+func checkPeerSetHistory(ps *peerSet, want []uint64, set setType) error {
+ var history map[uint64]struct{}
+ switch set {
+ case notaryset:
+ history = ps.notaryHistory
+ case dkgset:
+ history = ps.dkgHistory
+ default:
+ return fmt.Errorf("invalid set: %d", set)
+ }
+
+ if len(history) != len(want) {
+ return fmt.Errorf("num of history mismatch: got %d, want %d",
+ len(history), len(want))
+ }
+
+ for _, r := range want {
+ if _, ok := history[r]; !ok {
+ return fmt.Errorf("round %d not exists", r)
+ }
+ }
+ return nil
+}
+
+func checkDirectPeer(srvr *testP2PServer, want []enode.ID) error {
+ if len(srvr.direct) != len(want) {
+ return fmt.Errorf("num of direct peer mismatch: got %d, want %d",
+ len(srvr.direct), len(want))
+ }
+
+ for _, id := range want {
+ if _, ok := srvr.direct[id]; !ok {
+ return fmt.Errorf("direct peer %s not exists", id.String())
+ }
+ }
+ return nil
+}
+func checkGroup(srvr *testP2PServer, want []string) error {
+ if len(srvr.group) != len(want) {
+ return fmt.Errorf("num of group mismatch: got %d, want %d",
+ len(srvr.group), len(want))
+ }
+
+ for _, name := range want {
+ if _, ok := srvr.group[name]; !ok {
+ return fmt.Errorf("group %s not exists", name)
+ }
+ }
+ return nil
+}
+
+func nodeID(n int64) enode.ID {
+ b := big.NewInt(n).Bytes()
+ var id enode.ID
+ copy(id[len(id)-len(b):], b)
+ return id
+}
+
+func newTestNodeSet(nodes []enode.ID) map[string]struct{} {
+ m := make(map[string]struct{})
+ for _, node := range nodes {
+ m[node.String()] = struct{}{}
+ }
+ return m
+}
+
+func newDummyPeer(id enode.ID) *peer {
+ return &peer{
+ labels: mapset.NewSet(),
+ id: id.String(),
+ }
+}
diff --git a/dex/protocol.go b/dex/protocol.go
index 0111edf18..7b01217ff 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -20,14 +20,11 @@ import (
"fmt"
"io"
"math/big"
- "net"
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/core/types"
- "github.com/dexon-foundation/dexon/crypto/sha3"
"github.com/dexon-foundation/dexon/event"
- "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -67,7 +64,7 @@ const (
ReceiptsMsg = 0x10
// Protocol messages belonging to dex/64
- NotaryNodeInfoMsg = 0x11
+ MetaMsg = 0x11
)
type errCode int
@@ -114,12 +111,26 @@ type txPool interface {
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
}
+type governance interface {
+ GetChainNum(uint64) uint32
+
+ GetNotarySet(uint32, uint64) map[string]struct{}
+
+ GetDKGSet(uint64) map[string]struct{}
+
+ SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription
+}
+
type p2pServer interface {
Self() *enode.Node
- AddNotaryPeer(*discover.Node)
+ AddDirectPeer(*enode.Node)
+
+ RemoveDirectPeer(*enode.Node)
- RemoveNotaryPeer(*discover.Node)
+ AddGroup(string, []*enode.Node, uint64)
+
+ RemoveGroup(string)
}
// statusData is the network packet for the status message.
@@ -195,22 +206,3 @@ type blockBody struct {
// blockBodiesData is the network packet for block content distribution.
type blockBodiesData []*blockBody
-
-// TODO(sonic): revisit this msg when dexon core SDK is finalized.
-// notartyNodeInfo is the network packet for notary node ip info.
-type notaryNodeInfo struct {
- ID discover.NodeID
- IP net.IP
- UDP uint16
- TCP uint16
- Round uint64
- Sig []byte
- Timestamp int64
-}
-
-func (n *notaryNodeInfo) Hash() (h common.Hash) {
- hw := sha3.NewKeccak256()
- rlp.Encode(hw, n)
- hw.Sum(h[:0])
- return h
-}
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index c1b6efcfc..8c7638b2b 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -221,3 +221,85 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) {
}
}
}
+
+func TestRecvNodeMetas(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ meta := NodeMeta{
+ ID: nodeID(1),
+ }
+
+ ch := make(chan newMetasEvent)
+ pm.nodeTable.SubscribeNewMetasEvent(ch)
+
+ if err := p2p.Send(p.app, MetaMsg, []interface{}{meta}); err != nil {
+ t.Fatalf("send error: %v", err)
+ }
+
+ select {
+ case event := <-ch:
+ metas := event.Metas
+ if len(metas) != 1 {
+ t.Errorf("wrong number of new metas: got %d, want 1", len(metas))
+ } else if metas[0].Hash() != meta.Hash() {
+ t.Errorf("added wrong meta hash: got %v, want %v", metas[0].Hash(), meta.Hash())
+ }
+ case <-time.After(3 * time.Second):
+ t.Errorf("no newMetasEvent received within 3 seconds")
+ }
+}
+
+func TestSendNodeMetas(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ defer pm.Stop()
+
+ allmetas := make([]*NodeMeta, 100)
+ for nonce := range allmetas {
+ allmetas[nonce] = &NodeMeta{ID: nodeID(int64(nonce))}
+ }
+
+ // Connect several peers. They should all receive the pending transactions.
+ var wg sync.WaitGroup
+ checkmetas := func(p *testPeer) {
+ defer wg.Done()
+ defer p.close()
+ seen := make(map[common.Hash]bool)
+ for _, meta := range allmetas {
+ seen[meta.Hash()] = false
+ }
+ for n := 0; n < len(allmetas) && !t.Failed(); {
+ var metas []*NodeMeta
+ msg, err := p.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != MetaMsg {
+ t.Errorf("%v: got code %d, want MetaMsg", p.Peer, msg.Code)
+ }
+ if err := msg.Decode(&metas); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+ for _, meta := range metas {
+ hash := meta.Hash()
+ seenmeta, want := seen[hash]
+ if seenmeta {
+ t.Errorf("%v: got meta more than once: %x", p.Peer, hash)
+ }
+ if !want {
+ t.Errorf("%v: got unexpected meta: %x", p.Peer, hash)
+ }
+ seen[hash] = true
+ n++
+ }
+ }
+ }
+ for i := 0; i < 3; i++ {
+ p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true)
+ wg.Add(1)
+ go checkmetas(p)
+ }
+ pm.nodeTable.Add(allmetas)
+ wg.Wait()
+}
diff --git a/dex/sync.go b/dex/sync.go
index d7fe748bc..5af6076bc 100644
--- a/dex/sync.go
+++ b/dex/sync.go
@@ -25,7 +25,7 @@ import (
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/eth/downloader"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
)
const (
@@ -35,6 +35,9 @@ const (
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
+
+ // This is the target number for the packs of metas sent by metasyncLoop.
+ metasyncPackNum = 1024
)
type txsync struct {
@@ -64,7 +67,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) {
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
var (
- pending = make(map[discover.NodeID]*txsync)
+ pending = make(map[enode.ID]*txsync)
sending = false // whether a send is active
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
@@ -129,6 +132,94 @@ func (pm *ProtocolManager) txsyncLoop() {
}
}
+type metasync struct {
+ p *peer
+ metas []*NodeMeta
+}
+
+// syncNodeMetas starts sending all node metas to the given peer.
+func (pm *ProtocolManager) syncNodeMetas(p *peer) {
+ metas := pm.nodeTable.Metas()
+ if len(metas) == 0 {
+ return
+ }
+ select {
+ case pm.metasyncCh <- &metasync{p, metas}:
+ case <-pm.quitSync:
+ }
+}
+
+// metasyncLoop takes care of the initial node meta sync for each new
+// connection. When a new peer appears, we relay all currently node metas.
+// In order to minimise egress bandwidth usage, we send
+// the metas in small packs to one peer at a time.
+func (pm *ProtocolManager) metasyncLoop() {
+ var (
+ pending = make(map[enode.ID]*metasync)
+ sending = false // whether a send is active
+ pack = new(metasync) // the pack that is being sent
+ done = make(chan error, 1) // result of the send
+ )
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(s *metasync) {
+ // Fill pack with node metas up to the target num.
+ var num int
+ pack.p = s.p
+ pack.metas = pack.metas[:0]
+ for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ {
+ pack.metas = append(pack.metas, s.metas[i])
+ num += 1
+ }
+ // Remove the metas that will be sent.
+ s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])]
+ if len(s.metas) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num)
+ sending = true
+ go func() { done <- pack.p.SendNodeMetas(pack.metas) }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *metasync {
+ if len(pending) == 0 {
+ return nil
+ }
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case s := <-pm.metasyncCh:
+ pending[s.p.ID()] = s
+ if !sending {
+ send(s)
+ }
+ case err := <-done:
+ sending = false
+ // Stop tracking peers that cause send failures.
+ if err != nil {
+ pack.p.Log().Debug("NodeMeta send failed", "err", err)
+ delete(pending, pack.p.ID())
+ }
+ // Schedule the next send.
+ if s := pick(); s != nil {
+ send(s)
+ }
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and blocks as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {