diff options
author | Sonic <sonic@cobinhood.com> | 2018-09-25 20:37:11 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:23:38 +0800 |
commit | c08afdbc7741d4559683ff304157303af8766c89 (patch) | |
tree | 5125f6984343b51e3b95230f6f2816fa11a11386 /dex/sync.go | |
parent | 8b977d488bce2d9d9d29a4ddc460b1ffac44aed2 (diff) | |
download | go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.gz go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.bz2 go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.lz go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.xz go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.tar.zst go-tangerine-c08afdbc7741d4559683ff304157303af8766c89.zip |
dex: redesign p2p network topology
- Let p2p server support direct connection and group connection.
- Introduce node meta table to maintain IP of all nodes in node set,
in memory and let nodes in the network can sync this table.
- Let peerSet able to manage direct connections to notary set and dkg set.
The mechanism to refresh the network topology when configuration round
change is not done yet.
Diffstat (limited to 'dex/sync.go')
-rw-r--r-- | dex/sync.go | 95 |
1 files changed, 93 insertions, 2 deletions
diff --git a/dex/sync.go b/dex/sync.go index d7fe748bc..5af6076bc 100644 --- a/dex/sync.go +++ b/dex/sync.go @@ -25,7 +25,7 @@ import ( "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/eth/downloader" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" + "github.com/dexon-foundation/dexon/p2p/enode" ) const ( @@ -35,6 +35,9 @@ const ( // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 + + // This is the target number for the packs of metas sent by metasyncLoop. + metasyncPackNum = 1024 ) type txsync struct { @@ -64,7 +67,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) { // the transactions in small packs to one peer at a time. func (pm *ProtocolManager) txsyncLoop() { var ( - pending = make(map[discover.NodeID]*txsync) + pending = make(map[enode.ID]*txsync) sending = false // whether a send is active pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send @@ -129,6 +132,94 @@ func (pm *ProtocolManager) txsyncLoop() { } } +type metasync struct { + p *peer + metas []*NodeMeta +} + +// syncNodeMetas starts sending all node metas to the given peer. +func (pm *ProtocolManager) syncNodeMetas(p *peer) { + metas := pm.nodeTable.Metas() + if len(metas) == 0 { + return + } + select { + case pm.metasyncCh <- &metasync{p, metas}: + case <-pm.quitSync: + } +} + +// metasyncLoop takes care of the initial node meta sync for each new +// connection. When a new peer appears, we relay all currently node metas. +// In order to minimise egress bandwidth usage, we send +// the metas in small packs to one peer at a time. +func (pm *ProtocolManager) metasyncLoop() { + var ( + pending = make(map[enode.ID]*metasync) + sending = false // whether a send is active + pack = new(metasync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *metasync) { + // Fill pack with node metas up to the target num. + var num int + pack.p = s.p + pack.metas = pack.metas[:0] + for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ { + pack.metas = append(pack.metas, s.metas[i]) + num += 1 + } + // Remove the metas that will be sent. + s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])] + if len(s.metas) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num) + sending = true + go func() { done <- pack.p.SendNodeMetas(pack.metas) }() + } + + // pick chooses the next pending sync. + pick := func() *metasync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-pm.metasyncCh: + pending[s.p.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + pack.p.Log().Debug("NodeMeta send failed", "err", err) + delete(pending, pack.p.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-pm.quitSync: + return + } + } +} + // syncer is responsible for periodically synchronising with the network, both // downloading hashes and blocks as well as handling the announcement handler. func (pm *ProtocolManager) syncer() { |