aboutsummaryrefslogtreecommitdiffstats
path: root/eth/sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/sync.go')
-rw-r--r--eth/sync.go94
1 files changed, 94 insertions, 0 deletions
diff --git a/eth/sync.go b/eth/sync.go
index 8e4e3cfbe..a25d4d4fd 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -2,6 +2,7 @@ package eth
import (
"math"
+ "math/rand"
"sync/atomic"
"time"
@@ -10,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p/discover"
)
const (
@@ -20,6 +22,10 @@ const (
notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
blockProcAmount = 256
+
+ // This is the target size for the packs of transactions sent by txsyncLoop.
+ // A pack can get larger than this if a single transactions exceeds this size.
+ txsyncPackSize = 100 * 1024
)
// blockAnnounce is the hash notification of the availability of a new block in
@@ -30,6 +36,94 @@ type blockAnnounce struct {
time time.Time
}
+type txsync struct {
+ p *peer
+ txs []*types.Transaction
+}
+
+// syncTransactions starts sending all currently pending transactions to the given peer.
+func (pm *ProtocolManager) syncTransactions(p *peer) {
+ txs := pm.txpool.GetTransactions()
+ if len(txs) == 0 {
+ return
+ }
+ select {
+ case pm.txsyncCh <- &txsync{p, txs}:
+ case <-pm.quitSync:
+ }
+}
+
+// txsyncLoop takes care of the initial transaction sync for each new
+// connection. When a new peer appears, we relay all currently pending
+// transactions. In order to minimise egress bandwidth usage, we send
+// the transactions in small packs to one peer at a time.
+func (pm *ProtocolManager) txsyncLoop() {
+ var (
+ pending = make(map[discover.NodeID]*txsync)
+ sending = false // whether a send is active
+ pack = new(txsync) // the pack that is being sent
+ done = make(chan error, 1) // result of the send
+ )
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(s *txsync) {
+ // Fill pack with transactions up to the target size.
+ size := common.StorageSize(0)
+ pack.p = s.p
+ pack.txs = pack.txs[:0]
+ for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
+ pack.txs = append(pack.txs, s.txs[i])
+ size += s.txs[i].Size()
+ }
+ // Remove the transactions that will be sent.
+ s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
+ if len(s.txs) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size)
+ sending = true
+ go func() { done <- pack.p.sendTransactions(pack.txs) }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *txsync {
+ if len(pending) == 0 {
+ return nil
+ }
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case s := <-pm.txsyncCh:
+ pending[s.p.ID()] = s
+ if !sending {
+ send(s)
+ }
+ case err := <-done:
+ sending = false
+ // Stop tracking peers that cause send failures.
+ if err != nil {
+ glog.V(logger.Debug).Infof("%v: tx send failed: %v", pack.p.Peer, err)
+ delete(pending, pack.p.ID())
+ }
+ // Schedule the next send.
+ if s := pick(); s != nil {
+ send(s)
+ }
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
// fetcher is responsible for collecting hash notifications, and periodically
// checking all unknown ones and individually fetching them.
func (pm *ProtocolManager) fetcher() {