diff options
Diffstat (limited to 'dex/sync.go')
-rw-r--r-- | dex/sync.go | 95 |
1 files changed, 93 insertions, 2 deletions
diff --git a/dex/sync.go b/dex/sync.go index d7fe748bc..5af6076bc 100644 --- a/dex/sync.go +++ b/dex/sync.go @@ -25,7 +25,7 @@ import ( "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/eth/downloader" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" + "github.com/dexon-foundation/dexon/p2p/enode" ) const ( @@ -35,6 +35,9 @@ const ( // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 + + // This is the target number for the packs of metas sent by metasyncLoop. + metasyncPackNum = 1024 ) type txsync struct { @@ -64,7 +67,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) { // the transactions in small packs to one peer at a time. func (pm *ProtocolManager) txsyncLoop() { var ( - pending = make(map[discover.NodeID]*txsync) + pending = make(map[enode.ID]*txsync) sending = false // whether a send is active pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send @@ -129,6 +132,94 @@ func (pm *ProtocolManager) txsyncLoop() { } } +type metasync struct { + p *peer + metas []*NodeMeta +} + +// syncNodeMetas starts sending all node metas to the given peer. +func (pm *ProtocolManager) syncNodeMetas(p *peer) { + metas := pm.nodeTable.Metas() + if len(metas) == 0 { + return + } + select { + case pm.metasyncCh <- &metasync{p, metas}: + case <-pm.quitSync: + } +} + +// metasyncLoop takes care of the initial node meta sync for each new +// connection. When a new peer appears, we relay all currently node metas. +// In order to minimise egress bandwidth usage, we send +// the metas in small packs to one peer at a time. +func (pm *ProtocolManager) metasyncLoop() { + var ( + pending = make(map[enode.ID]*metasync) + sending = false // whether a send is active + pack = new(metasync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *metasync) { + // Fill pack with node metas up to the target num. + var num int + pack.p = s.p + pack.metas = pack.metas[:0] + for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ { + pack.metas = append(pack.metas, s.metas[i]) + num += 1 + } + // Remove the metas that will be sent. + s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])] + if len(s.metas) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num) + sending = true + go func() { done <- pack.p.SendNodeMetas(pack.metas) }() + } + + // pick chooses the next pending sync. + pick := func() *metasync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-pm.metasyncCh: + pending[s.p.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + pack.p.Log().Debug("NodeMeta send failed", "err", err) + delete(pending, pack.p.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-pm.quitSync: + return + } + } +} + // syncer is responsible for periodically synchronising with the network, both // downloading hashes and blocks as well as handling the announcement handler. func (pm *ProtocolManager) syncer() { |