aboutsummaryrefslogtreecommitdiffstats
path: root/eth/sync.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-06-09 01:38:39 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-06-09 01:38:39 +0800
commit9ed166c196b07047299579e5ea2b6ece26aec5c6 (patch)
treed9c3f6ca949382f09f2a2cbc239096b441b853bb /eth/sync.go
parentfdccce781e94819ec9dc13ef6540a33efd3b26c6 (diff)
downloaddexon-9ed166c196b07047299579e5ea2b6ece26aec5c6.tar
dexon-9ed166c196b07047299579e5ea2b6ece26aec5c6.tar.gz
dexon-9ed166c196b07047299579e5ea2b6ece26aec5c6.tar.bz2
dexon-9ed166c196b07047299579e5ea2b6ece26aec5c6.tar.lz
dexon-9ed166c196b07047299579e5ea2b6ece26aec5c6.tar.xz
dexon-9ed166c196b07047299579e5ea2b6ece26aec5c6.tar.zst
dexon-9ed166c196b07047299579e5ea2b6ece26aec5c6.zip
eth: split and handle explicitly vs. download requested blocks
Diffstat (limited to 'eth/sync.go')
-rw-r--r--eth/sync.go56
1 files changed, 56 insertions, 0 deletions
diff --git a/eth/sync.go b/eth/sync.go
index 1a1cbdb47..f761f3cd1 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -12,6 +12,16 @@ import (
"github.com/ethereum/go-ethereum/logger/glog"
)
+const (
+ forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
+ blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
+ notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
+ notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
+ notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ blockProcAmount = 256
+)
+
// blockAnnounce is the hash notification of the availability of a new block in
// the network.
type blockAnnounce struct {
@@ -25,6 +35,7 @@ type blockAnnounce struct {
func (pm *ProtocolManager) fetcher() {
announces := make(map[common.Hash]*blockAnnounce)
request := make(map[*peer][]common.Hash)
+ pending := make(map[common.Hash]*blockAnnounce)
cycle := time.Tick(notifyCheckCycle)
// Iterate the block fetching until a quit is requested
@@ -38,11 +49,18 @@ func (pm *ProtocolManager) fetcher() {
}
case <-cycle:
+ // Clean up any expired block fetches
+ for hash, announce := range pending {
+ if time.Since(announce.time) > notifyFetchTimeout {
+ delete(pending, hash)
+ }
+ }
// Check if any notified blocks failed to arrive
for hash, announce := range announces {
if time.Since(announce.time) > notifyArriveTimeout {
if !pm.chainman.HasBlock(hash) {
request[announce.peer] = append(request[announce.peer], hash)
+ pending[hash] = announce
}
delete(announces, hash)
}
@@ -57,6 +75,44 @@ func (pm *ProtocolManager) fetcher() {
}
request = make(map[*peer][]common.Hash)
+ case filter := <-pm.newBlockCh:
+ // Blocks arrived, extract any explicit requests, return all else
+ var blocks types.Blocks
+ select {
+ case blocks = <-filter:
+ case <-pm.quitSync:
+ return
+ }
+
+ fetch, sync := []*types.Block{}, []*types.Block{}
+ for _, block := range blocks {
+ hash := block.Hash()
+ if _, ok := pending[hash]; ok {
+ fetch = append(fetch, block)
+ } else {
+ sync = append(sync, block)
+ }
+ }
+
+ select {
+ case filter <- sync:
+ case <-pm.quitSync:
+ return
+ }
+ // If any explicit fetches were replied to, import them
+ if len(fetch) > 0 {
+ go func() {
+ for _, block := range fetch {
+ if announce := pending[block.Hash()]; announce != nil {
+ if err := pm.importBlock(announce.peer, block, nil); err != nil {
+ glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
+ return
+ }
+ }
+ }
+ }()
+ }
+
case <-pm.quitSync:
return
}