aboutsummaryrefslogtreecommitdiffstats
path: root/dex/sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/sync.go')
-rw-r--r--dex/sync.go95
1 files changed, 93 insertions, 2 deletions
diff --git a/dex/sync.go b/dex/sync.go
index d7fe748bc..5af6076bc 100644
--- a/dex/sync.go
+++ b/dex/sync.go
@@ -25,7 +25,7 @@ import (
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/eth/downloader"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
)
const (
@@ -35,6 +35,9 @@ const (
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
+
+ // This is the target number for the packs of metas sent by metasyncLoop.
+ metasyncPackNum = 1024
)
type txsync struct {
@@ -64,7 +67,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) {
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
var (
- pending = make(map[discover.NodeID]*txsync)
+ pending = make(map[enode.ID]*txsync)
sending = false // whether a send is active
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
@@ -129,6 +132,94 @@ func (pm *ProtocolManager) txsyncLoop() {
}
}
+type metasync struct {
+ p *peer
+ metas []*NodeMeta
+}
+
+// syncNodeMetas starts sending all node metas to the given peer.
+func (pm *ProtocolManager) syncNodeMetas(p *peer) {
+ metas := pm.nodeTable.Metas()
+ if len(metas) == 0 {
+ return
+ }
+ select {
+ case pm.metasyncCh <- &metasync{p, metas}:
+ case <-pm.quitSync:
+ }
+}
+
+// metasyncLoop takes care of the initial node meta sync for each new
+// connection. When a new peer appears, we relay all currently node metas.
+// In order to minimise egress bandwidth usage, we send
+// the metas in small packs to one peer at a time.
+func (pm *ProtocolManager) metasyncLoop() {
+ var (
+ pending = make(map[enode.ID]*metasync)
+ sending = false // whether a send is active
+ pack = new(metasync) // the pack that is being sent
+ done = make(chan error, 1) // result of the send
+ )
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(s *metasync) {
+ // Fill pack with node metas up to the target num.
+ var num int
+ pack.p = s.p
+ pack.metas = pack.metas[:0]
+ for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ {
+ pack.metas = append(pack.metas, s.metas[i])
+ num += 1
+ }
+ // Remove the metas that will be sent.
+ s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])]
+ if len(s.metas) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num)
+ sending = true
+ go func() { done <- pack.p.SendNodeMetas(pack.metas) }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *metasync {
+ if len(pending) == 0 {
+ return nil
+ }
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case s := <-pm.metasyncCh:
+ pending[s.p.ID()] = s
+ if !sending {
+ send(s)
+ }
+ case err := <-done:
+ sending = false
+ // Stop tracking peers that cause send failures.
+ if err != nil {
+ pack.p.Log().Debug("NodeMeta send failed", "err", err)
+ delete(pending, pack.p.ID())
+ }
+ // Schedule the next send.
+ if s := pick(); s != nil {
+ send(s)
+ }
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and blocks as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {