aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go317
-rw-r--r--eth/downloader/downloader_test.go192
-rw-r--r--eth/downloader/queue.go169
-rw-r--r--eth/downloader/statesync.go31
4 files changed, 314 insertions, 395 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 746c6a402..7f490d9e9 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -18,10 +18,8 @@
package downloader
import (
- "crypto/rand"
"errors"
"fmt"
- "math"
"math/big"
"sync"
"sync/atomic"
@@ -61,12 +59,11 @@ var (
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
- fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
- fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
- fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
- fsPivotInterval = 256 // Number of headers out of which to randomize the pivot point
- fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
- fsCriticalTrials = uint32(32) // Number of times to retry in the cricical section before bailing
+ fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
+ fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
+ fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
+ fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
+ fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
)
var (
@@ -102,9 +99,6 @@ type Downloader struct {
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database
- fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
- fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
-
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
@@ -124,6 +118,7 @@ type Downloader struct {
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
notified int32
+ committed int32
// Channels
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
@@ -156,7 +151,7 @@ type Downloader struct {
// LightChain encapsulates functions required to synchronise a light chain.
type LightChain interface {
// HasHeader verifies a header's presence in the local chain.
- HasHeader(h common.Hash, number uint64) bool
+ HasHeader(common.Hash, uint64) bool
// GetHeaderByHash retrieves a header from the local chain.
GetHeaderByHash(common.Hash) *types.Header
@@ -179,7 +174,7 @@ type BlockChain interface {
LightChain
// HasBlockAndState verifies block and associated states' presence in the local chain.
- HasBlockAndState(common.Hash) bool
+ HasBlockAndState(common.Hash, uint64) bool
// GetBlockByHash retrieves a block from the local chain.
GetBlockByHash(common.Hash) *types.Block
@@ -391,9 +386,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// Set the requested sync mode, unless it's forbidden
d.mode = mode
- if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
- d.mode = FullSync
- }
+
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
if p == nil {
@@ -441,57 +434,40 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.syncStatsChainHeight = height
d.syncStatsLock.Unlock()
- // Initiate the sync using a concurrent header and content retrieval algorithm
+ // Ensure our origin point is below any fast sync pivot point
pivot := uint64(0)
- switch d.mode {
- case LightSync:
- pivot = height
- case FastSync:
- // Calculate the new fast/slow sync pivot point
- if d.fsPivotLock == nil {
- pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
- if err != nil {
- panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
- }
- if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
- pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
- }
+ if d.mode == FastSync {
+ if height <= uint64(fsMinFullBlocks) {
+ origin = 0
} else {
- // Pivot point locked in, use this and do not pick a new one!
- pivot = d.fsPivotLock.Number.Uint64()
- }
- // If the point is below the origin, move origin back to ensure state download
- if pivot < origin {
- if pivot > 0 {
+ pivot = height - uint64(fsMinFullBlocks)
+ if pivot <= origin {
origin = pivot - 1
- } else {
- origin = 0
}
}
- log.Debug("Fast syncing until pivot block", "pivot", pivot)
}
- d.queue.Prepare(origin+1, d.mode, pivot, latest)
+ d.committed = 1
+ if d.mode == FastSync && pivot != 0 {
+ d.committed = 0
+ }
+ // Initiate the sync using a concurrent header and content retrieval algorithm
+ d.queue.Prepare(origin+1, d.mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
fetchers := []func() error{
- func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
- func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
- func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
- func() error { return d.processHeaders(origin+1, td) },
+ func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.processHeaders(origin+1, pivot, td) },
}
if d.mode == FastSync {
fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
} else if d.mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
- err = d.spawnSync(fetchers)
- if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
- // If sync failed in the critical section, bump the fail counter.
- atomic.AddUint32(&d.fsPivotFails, 1)
- }
- return err
+ return d.spawnSync(fetchers)
}
// spawnSync runs d.process and all given fetcher functions to completion in
@@ -671,7 +647,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
continue
}
// Otherwise check if we already know the header or not
- if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
+ if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash(), headers[i].Number.Uint64())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
// If every header is known, even future ones, the peer straight out lied about its head
@@ -736,7 +712,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
arrived = true
// Modify the search interval based on the response
- if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
+ if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash(), headers[0].Number.Uint64())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
end = check
break
}
@@ -774,7 +750,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
// other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
-func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
+func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
p.log.Debug("Directing header downloads", "origin", from)
defer p.log.Debug("Header download terminated")
@@ -825,6 +801,18 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
}
// If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 {
+ // Don't abort header fetches while the pivot is downloading
+ if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
+ p.log.Debug("No headers, waiting for pivot commit")
+ select {
+ case <-time.After(fsHeaderContCheck):
+ getHeaders(from)
+ continue
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+ }
+ }
+ // Pivot done (or not in fast sync) and no more headers, terminate the process
p.log.Debug("No more headers available")
select {
case d.headerProcCh <- nil:
@@ -1129,10 +1117,8 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
if request.From > 0 {
peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
- } else if len(request.Headers) > 0 {
- peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
} else {
- peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes))
+ peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
@@ -1160,10 +1146,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
-func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
- // Calculate the pivoting point for switching from fast to slow sync
- pivot := d.queue.FastSyncPivot()
-
+func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
rollback := []*types.Header{}
defer func() {
@@ -1188,19 +1171,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
-
- // If we're already past the pivot point, this could be an attack, thread carefully
- if rollback[len(rollback)-1].Number.Uint64() > pivot {
- // If we didn't ever fail, lock in the pivot header (must! not! change!)
- if atomic.LoadUint32(&d.fsPivotFails) == 0 {
- for _, header := range rollback {
- if header.Number.Uint64() == pivot {
- log.Warn("Fast-sync pivot locked in", "number", pivot, "hash", header.Hash())
- d.fsPivotLock = header
- }
- }
- }
- }
}
}()
@@ -1302,13 +1272,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
}
- // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
- if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
- if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
- log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash(), "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash())
- return errInvalidChain
- }
- }
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
@@ -1343,7 +1306,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
for {
- results := d.queue.WaitResults()
+ results := d.queue.Results(true)
if len(results) == 0 {
return nil
}
@@ -1357,30 +1320,28 @@ func (d *Downloader) processFullSyncContent() error {
}
func (d *Downloader) importBlockResults(results []*fetchResult) error {
- for len(results) != 0 {
- // Check for any termination requests. This makes clean shutdown faster.
- select {
- case <-d.quitCh:
- return errCancelContentProcessing
- default:
- }
- // Retrieve the a batch of results to import
- items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
- first, last := results[0].Header, results[items-1].Header
- log.Debug("Inserting downloaded chain", "items", len(results),
- "firstnum", first.Number, "firsthash", first.Hash(),
- "lastnum", last.Number, "lasthash", last.Hash(),
- )
- blocks := make([]*types.Block, items)
- for i, result := range results[:items] {
- blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
- }
- if index, err := d.blockchain.InsertChain(blocks); err != nil {
- log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
- return errInvalidChain
- }
- // Shift the results to the next batch
- results = results[items:]
+ // Check for any early termination requests
+ if len(results) == 0 {
+ return nil
+ }
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ default:
+ }
+ // Retrieve the a batch of results to import
+ first, last := results[0].Header, results[len(results)-1].Header
+ log.Debug("Inserting downloaded chain", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnum", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, len(results))
+ for i, result := range results {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ }
+ if index, err := d.blockchain.InsertChain(blocks); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
}
return nil
}
@@ -1388,35 +1349,92 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
- // Start syncing state of the reported head block.
- // This should get us most of the state of the pivot block.
+ // Start syncing state of the reported head block. This should get us most of
+ // the state of the pivot block.
stateSync := d.syncState(latest.Root)
defer stateSync.Cancel()
go func() {
- if err := stateSync.Wait(); err != nil {
+ if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
d.queue.Close() // wake up WaitResults
}
}()
-
- pivot := d.queue.FastSyncPivot()
+ // Figure out the ideal pivot block. Note, that this goalpost may move if the
+ // sync takes long enough for the chain head to move significantly.
+ pivot := uint64(0)
+ if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
+ pivot = height - uint64(fsMinFullBlocks)
+ }
+ // To cater for moving pivot points, track the pivot block and subsequently
+ // accumulated download results separatey.
+ var (
+ oldPivot *fetchResult // Locked in pivot block, might change eventually
+ oldTail []*fetchResult // Downloaded content after the pivot
+ )
for {
- results := d.queue.WaitResults()
+ // Wait for the next batch of downloaded data to be available, and if the pivot
+ // block became stale, move the goalpost
+ results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
if len(results) == 0 {
- return stateSync.Cancel()
+ // If pivot sync is done, stop
+ if oldPivot == nil {
+ return stateSync.Cancel()
+ }
+ // If sync failed, stop
+ select {
+ case <-d.cancelCh:
+ return stateSync.Cancel()
+ default:
+ }
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
+ if oldPivot != nil {
+ results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
+ }
+ // Split around the pivot block and process the two sides via fast/full sync
+ if atomic.LoadInt32(&d.committed) == 0 {
+ latest = results[len(results)-1].Header
+ if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
+ log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
+ pivot = height - uint64(fsMinFullBlocks)
+ }
+ }
P, beforeP, afterP := splitAroundPivot(pivot, results)
if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
return err
}
if P != nil {
- stateSync.Cancel()
- if err := d.commitPivotBlock(P); err != nil {
- return err
+ // If new pivot block found, cancel old state retrieval and restart
+ if oldPivot != P {
+ stateSync.Cancel()
+
+ stateSync = d.syncState(P.Header.Root)
+ defer stateSync.Cancel()
+ go func() {
+ if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
+ d.queue.Close() // wake up WaitResults
+ }
+ }()
+ oldPivot = P
+ }
+ // Wait for completion, occasionally checking for pivot staleness
+ select {
+ case <-stateSync.done:
+ if stateSync.err != nil {
+ return stateSync.err
+ }
+ if err := d.commitPivotBlock(P); err != nil {
+ return err
+ }
+ oldPivot = nil
+
+ case <-time.After(time.Second):
+ oldTail = afterP
+ continue
}
}
+ // Fast sync done, pivot commit done, full import
if err := d.importBlockResults(afterP); err != nil {
return err
}
@@ -1439,52 +1457,49 @@ func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, bef
}
func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
- for len(results) != 0 {
- // Check for any termination requests.
- select {
- case <-d.quitCh:
- return errCancelContentProcessing
- case <-stateSync.done:
- if err := stateSync.Wait(); err != nil {
- return err
- }
- default:
- }
- // Retrieve the a batch of results to import
- items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
- first, last := results[0].Header, results[items-1].Header
- log.Debug("Inserting fast-sync blocks", "items", len(results),
- "firstnum", first.Number, "firsthash", first.Hash(),
- "lastnumn", last.Number, "lasthash", last.Hash(),
- )
- blocks := make([]*types.Block, items)
- receipts := make([]types.Receipts, items)
- for i, result := range results[:items] {
- blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
- receipts[i] = result.Receipts
- }
- if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
- log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
- return errInvalidChain
+ // Check for any early termination requests
+ if len(results) == 0 {
+ return nil
+ }
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ case <-stateSync.done:
+ if err := stateSync.Wait(); err != nil {
+ return err
}
- // Shift the results to the next batch
- results = results[items:]
+ default:
+ }
+ // Retrieve the a batch of results to import
+ first, last := results[0].Header, results[len(results)-1].Header
+ log.Debug("Inserting fast-sync blocks", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnumn", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, len(results))
+ receipts := make([]types.Receipts, len(results))
+ for i, result := range results {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ receipts[i] = result.Receipts
+ }
+ if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
}
return nil
}
func (d *Downloader) commitPivotBlock(result *fetchResult) error {
- b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
- // Sync the pivot block state. This should complete reasonably quickly because
- // we've already synced up to the reported head block state earlier.
- if err := d.syncState(b.Root()).Wait(); err != nil {
+ block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
+ if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil {
return err
}
- log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
- if _, err := d.blockchain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
+ if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
return err
}
- return d.blockchain.FastSyncCommitHead(b.Hash())
+ atomic.StoreInt32(&d.committed, 1)
+ return nil
}
// DeliverHeaders injects a new batch of block headers received from a remote
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index e9c7b6170..d94d55f11 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
@@ -45,8 +44,8 @@ var (
// Reduce some of the parameters to make the tester faster.
func init() {
MaxForkAncestry = uint64(10000)
- blockCacheLimit = 1024
- fsCriticalTrials = 10
+ blockCacheItems = 1024
+ fsHeaderContCheck = 500 * time.Millisecond
}
// downloadTester is a test simulator for mocking out local block chain.
@@ -223,7 +222,7 @@ func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool {
}
// HasBlockAndState checks if a block and associated state is present in the testers canonical chain.
-func (dl *downloadTester) HasBlockAndState(hash common.Hash) bool {
+func (dl *downloadTester) HasBlockAndState(hash common.Hash, number uint64) bool {
block := dl.GetBlockByHash(hash)
if block == nil {
return false
@@ -293,7 +292,7 @@ func (dl *downloadTester) CurrentFastBlock() *types.Block {
func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error {
// For now only check that the state trie is correct
if block := dl.GetBlockByHash(hash); block != nil {
- _, err := trie.NewSecure(block.Root(), dl.stateDb, 0)
+ _, err := trie.NewSecure(block.Root(), trie.NewDatabase(dl.stateDb), 0)
return err
}
return fmt.Errorf("non existent block: %x", hash[:4])
@@ -619,28 +618,22 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
// number of items of the various chain components.
func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
// Initialize the counters for the first fork
- headers, blocks := lengths[0], lengths[0]
+ headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks
- minReceipts, maxReceipts := lengths[0]-fsMinFullBlocks-fsPivotInterval, lengths[0]-fsMinFullBlocks
- if minReceipts < 0 {
- minReceipts = 1
- }
- if maxReceipts < 0 {
- maxReceipts = 1
+ if receipts < 0 {
+ receipts = 1
}
// Update the counters for each subsequent fork
for _, length := range lengths[1:] {
headers += length - common
blocks += length - common
-
- minReceipts += length - common - fsMinFullBlocks - fsPivotInterval
- maxReceipts += length - common - fsMinFullBlocks
+ receipts += length - common - fsMinFullBlocks
}
switch tester.downloader.mode {
case FullSync:
- minReceipts, maxReceipts = 1, 1
+ receipts = 1
case LightSync:
- blocks, minReceipts, maxReceipts = 1, 1, 1
+ blocks, receipts = 1, 1
}
if hs := len(tester.ownHeaders); hs != headers {
t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
@@ -648,11 +641,12 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
if bs := len(tester.ownBlocks); bs != blocks {
t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
}
- if rs := len(tester.ownReceipts); rs < minReceipts || rs > maxReceipts {
- t.Fatalf("synchronised receipts mismatch: have %v, want between [%v, %v]", rs, minReceipts, maxReceipts)
+ if rs := len(tester.ownReceipts); rs != receipts {
+ t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
}
// Verify the state trie too for fast syncs
- if tester.downloader.mode == FastSync {
+ /*if tester.downloader.mode == FastSync {
+ pivot := uint64(0)
var index int
if pivot := int(tester.downloader.queue.fastSyncPivot); pivot < common {
index = pivot
@@ -660,11 +654,11 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
index = len(tester.ownHashes) - lengths[len(lengths)-1] + int(tester.downloader.queue.fastSyncPivot)
}
if index > 0 {
- if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, state.NewDatabase(tester.stateDb)); statedb == nil || err != nil {
+ if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, state.NewDatabase(trie.NewDatabase(tester.stateDb))); statedb == nil || err != nil {
t.Fatalf("state reconstruction failed: %v", err)
}
}
- }
+ }*/
}
// Tests that simple synchronization against a canonical chain works correctly.
@@ -684,7 +678,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
+ targetBlocks := blockCacheItems - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
@@ -710,7 +704,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a long block chain to download and the tester
- targetBlocks := 8 * blockCacheLimit
+ targetBlocks := 8 * blockCacheItems
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
@@ -745,9 +739,9 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
- if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot {
- cached = receipts
- }
+ //if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot {
+ cached = receipts
+ //}
}
}
frozen = int(atomic.LoadUint32(&blocked))
@@ -755,7 +749,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
tester.downloader.queue.lock.Unlock()
tester.lock.Unlock()
- if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 {
+ if cached == blockCacheItems || retrieved+cached+frozen == targetBlocks+1 {
break
}
}
@@ -765,8 +759,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
tester.lock.RLock()
retrieved = len(tester.ownBlocks)
tester.lock.RUnlock()
- if cached != blockCacheLimit && retrieved+cached+frozen != targetBlocks+1 {
- t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheLimit, retrieved, frozen, targetBlocks+1)
+ if cached != blockCacheItems && retrieved+cached+frozen != targetBlocks+1 {
+ t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
}
// Permit the blocked blocks to import
if atomic.LoadUint32(&blocked) > 0 {
@@ -974,7 +968,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a small enough block chain to download and the tester
- targetBlocks := blockCacheLimit - 15
+ targetBlocks := blockCacheItems - 15
if targetBlocks >= MaxHashFetch {
targetBlocks = MaxHashFetch - 15
}
@@ -1016,12 +1010,12 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Create various peers with various parts of the chain
targetPeers := 8
- targetBlocks := targetPeers*blockCacheLimit - 15
+ targetBlocks := targetPeers*blockCacheItems - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
for i := 0; i < targetPeers; i++ {
id := fmt.Sprintf("peer #%d", i)
- tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts)
+ tester.newPeer(id, protocol, hashes[i*blockCacheItems:], headers, blocks, receipts)
}
if err := tester.sync("peer #0", nil, mode); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
@@ -1045,7 +1039,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
+ targetBlocks := blockCacheItems - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Create peers of every type
@@ -1084,7 +1078,7 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a block chain to download
- targetBlocks := 2*blockCacheLimit - 15
+ targetBlocks := 2*blockCacheItems - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
@@ -1110,8 +1104,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
bodiesNeeded++
}
}
- for hash, receipt := range receipts {
- if mode == FastSync && len(receipt) > 0 && headers[hash].Number.Uint64() <= tester.downloader.queue.fastSyncPivot {
+ for _, receipt := range receipts {
+ if mode == FastSync && len(receipt) > 0 {
receiptsNeeded++
}
}
@@ -1139,7 +1133,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
+ targetBlocks := blockCacheItems - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Attempt a full sync with an attacker feeding gapped headers
@@ -1174,7 +1168,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
+ targetBlocks := blockCacheItems - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Attempt a full sync with an attacker feeding shifted headers
@@ -1208,7 +1202,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a small enough block chain to download
- targetBlocks := 3*fsHeaderSafetyNet + fsPivotInterval + fsMinFullBlocks
+ targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Attempt to sync with an attacker that feeds junk during the fast sync phase.
@@ -1248,7 +1242,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts)
missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
- tester.downloader.fsPivotFails = 0
tester.downloader.syncInitHook = func(uint64, uint64) {
for i := missing; i <= len(hashes); i++ {
delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i])
@@ -1267,8 +1260,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
t.Errorf("fast sync pivot block #%d not rolled back", head)
}
}
- tester.downloader.fsPivotFails = fsCriticalTrials
-
// Synchronise with the valid peer and make sure sync succeeds. Since the last
// rollback should also disable fast syncing for this process, verify that we
// did a fresh full sync. Note, we can't assert anything about the receipts
@@ -1383,7 +1374,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
+ targetBlocks := blockCacheItems - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Set a sync init hook to catch progress changes
@@ -1532,7 +1523,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
+ targetBlocks := blockCacheItems - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
// Set a sync init hook to catch progress changes
@@ -1609,7 +1600,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()
// Create a small block chain
- targetBlocks := blockCacheLimit - 15
+ targetBlocks := blockCacheItems - 15
hashes, headers, blocks, receipts := tester.makeChain(targetBlocks+3, 0, tester.genesis, nil, false)
// Set a sync init hook to catch progress changes
@@ -1697,6 +1688,7 @@ func TestDeliverHeadersHang(t *testing.T) {
type floodingTestPeer struct {
peer Peer
tester *downloadTester
+ pend sync.WaitGroup
}
func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() }
@@ -1717,9 +1709,12 @@ func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int
deliveriesDone := make(chan struct{}, 500)
for i := 0; i < cap(deliveriesDone); i++ {
peer := fmt.Sprintf("fake-peer%d", i)
+ ftp.pend.Add(1)
+
go func() {
ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}})
deliveriesDone <- struct{}{}
+ ftp.pend.Done()
}()
}
// Deliver the actual requested headers.
@@ -1751,110 +1746,15 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
// Whenever the downloader requests headers, flood it with
// a lot of unrequested header deliveries.
tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{
- tester.downloader.peers.peers["peer"].peer,
- tester,
+ peer: tester.downloader.peers.peers["peer"].peer,
+ tester: tester,
}
if err := tester.sync("peer", nil, mode); err != nil {
- t.Errorf("sync failed: %v", err)
+ t.Errorf("test %d: sync failed: %v", i, err)
}
tester.terminate()
- }
-}
-
-// Tests that if fast sync aborts in the critical section, it can restart a few
-// times before giving up.
-// We use data driven subtests to manage this so that it will be parallel on its own
-// and not with the other tests, avoiding intermittent failures.
-func TestFastCriticalRestarts(t *testing.T) {
- testCases := []struct {
- protocol int
- progress bool
- }{
- {63, false},
- {64, false},
- {63, true},
- {64, true},
- }
- for _, tc := range testCases {
- t.Run(fmt.Sprintf("protocol %d progress %v", tc.protocol, tc.progress), func(t *testing.T) {
- testFastCriticalRestarts(t, tc.protocol, tc.progress)
- })
- }
-}
-
-func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) {
- t.Parallel()
-
- tester := newTester()
- defer tester.terminate()
-
- // Create a large enough blockchin to actually fast sync on
- targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15
- hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false)
-
- // Create a tester peer with a critical section header missing (force failures)
- tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
- delete(tester.peerHeaders["peer"], hashes[fsMinFullBlocks-1])
- tester.downloader.dropPeer = func(id string) {} // We reuse the same "faulty" peer throughout the test
-
- // Remove all possible pivot state roots and slow down replies (test failure resets later)
- for i := 0; i < fsPivotInterval; i++ {
- tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
- }
- (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(500 * time.Millisecond) // Enough to reach the critical section
-
- // Synchronise with the peer a few times and make sure they fail until the retry limit
- for i := 0; i < int(fsCriticalTrials)-1; i++ {
- // Attempt a sync and ensure it fails properly
- if err := tester.sync("peer", nil, FastSync); err == nil {
- t.Fatalf("failing fast sync succeeded: %v", err)
- }
- time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
-
- // If it's the first failure, pivot should be locked => reenable all others to detect pivot changes
- if i == 0 {
- time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
- if tester.downloader.fsPivotLock == nil {
- time.Sleep(400 * time.Millisecond) // Make sure the first huge timeout expires too
- t.Fatalf("pivot block not locked in after critical section failure")
- }
- tester.lock.Lock()
- tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]]
- tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
- (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(0)
- tester.lock.Unlock()
- }
- }
- // Return all nodes if we're testing fast sync progression
- if progress {
- tester.lock.Lock()
- tester.peerMissingStates["peer"] = map[common.Hash]bool{}
- tester.lock.Unlock()
-
- if err := tester.sync("peer", nil, FastSync); err != nil {
- t.Fatalf("failed to synchronise blocks in progressed fast sync: %v", err)
- }
- time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
- if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != 1 {
- t.Fatalf("progressed pivot trial count mismatch: have %v, want %v", fails, 1)
- }
- assertOwnChain(t, tester, targetBlocks+1)
- } else {
- if err := tester.sync("peer", nil, FastSync); err == nil {
- t.Fatalf("succeeded to synchronise blocks in failed fast sync")
- }
- time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain
-
- if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != fsCriticalTrials {
- t.Fatalf("failed pivot trial count mismatch: have %v, want %v", fails, fsCriticalTrials)
- }
- }
- // Retry limit exhausted, downloader will switch to full sync, should succeed
- if err := tester.sync("peer", nil, FastSync); err != nil {
- t.Fatalf("failed to synchronise blocks in slow sync: %v", err)
+ // Flush all goroutines to prevent messing with subsequent tests
+ tester.downloader.peers.peers["peer"].peer.(*floodingTestPeer).pend.Wait()
}
- // Note, we can't assert the chain here because the test asserter assumes sync
- // completed using a single mode of operation, whereas fast-then-slow can result
- // in arbitrary intermediate state that's not cleanly verifiable.
}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 6926f1d8c..a1a70e46e 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -32,7 +32,11 @@ import (
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
-var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
+var (
+ blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download
+ blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
+ blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
+)
var (
errNoFetchesPending = errors.New("no fetches pending")
@@ -41,17 +45,17 @@ var (
// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
- Peer *peerConnection // Peer to which the request was sent
- From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
- Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
- Headers []*types.Header // [eth/62] Requested headers, sorted by request order
- Time time.Time // Time when the request was made
+ Peer *peerConnection // Peer to which the request was sent
+ From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
+ Headers []*types.Header // [eth/62] Requested headers, sorted by request order
+ Time time.Time // Time when the request was made
}
// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
- Pending int // Number of data fetches still pending
+ Pending int // Number of data fetches still pending
+ Hash common.Hash // Hash of the header to prevent recalculating
Header *types.Header
Uncles []*types.Header
@@ -61,12 +65,10 @@ type fetchResult struct {
// queue represents hashes that are either need fetching or are being fetched
type queue struct {
- mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
- fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode
-
- headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
+ mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
// Headers are "special", they download in batches, supported by a skeleton chain
+ headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
@@ -87,8 +89,9 @@ type queue struct {
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
- resultCache []*fetchResult // Downloaded but not yet delivered fetch results
- resultOffset uint64 // Offset of the first cached fetch result in the block chain
+ resultCache []*fetchResult // Downloaded but not yet delivered fetch results
+ resultOffset uint64 // Offset of the first cached fetch result in the block chain
+ resultSize common.StorageSize // Approximate size of a block (exponential moving average)
lock *sync.Mutex
active *sync.Cond
@@ -109,7 +112,7 @@ func newQueue() *queue {
receiptTaskQueue: prque.New(),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
- resultCache: make([]*fetchResult, blockCacheLimit),
+ resultCache: make([]*fetchResult, blockCacheItems),
active: sync.NewCond(lock),
lock: lock,
}
@@ -122,10 +125,8 @@ func (q *queue) Reset() {
q.closed = false
q.mode = FullSync
- q.fastSyncPivot = 0
q.headerHead = common.Hash{}
-
q.headerPendPool = make(map[string]*fetchRequest)
q.blockTaskPool = make(map[common.Hash]*types.Header)
@@ -138,7 +139,7 @@ func (q *queue) Reset() {
q.receiptPendPool = make(map[string]*fetchRequest)
q.receiptDonePool = make(map[common.Hash]struct{})
- q.resultCache = make([]*fetchResult, blockCacheLimit)
+ q.resultCache = make([]*fetchResult, blockCacheItems)
q.resultOffset = 0
}
@@ -214,27 +215,13 @@ func (q *queue) Idle() bool {
return (queued + pending + cached) == 0
}
-// FastSyncPivot retrieves the currently used fast sync pivot point.
-func (q *queue) FastSyncPivot() uint64 {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.fastSyncPivot
-}
-
// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
q.lock.Lock()
defer q.lock.Unlock()
- // Calculate the currently in-flight block (body) requests
- pending := 0
- for _, request := range q.blockPendPool {
- pending += len(request.Hashes) + len(request.Headers)
- }
- // Throttle if more blocks (bodies) are in-flight than free space in the cache
- return pending >= len(q.resultCache)-len(q.blockDonePool)
+ return q.resultSlots(q.blockPendPool, q.blockDonePool) <= 0
}
// ShouldThrottleReceipts checks if the download should be throttled (active receipt
@@ -243,13 +230,39 @@ func (q *queue) ShouldThrottleReceipts() bool {
q.lock.Lock()
defer q.lock.Unlock()
- // Calculate the currently in-flight receipt requests
+ return q.resultSlots(q.receiptPendPool, q.receiptDonePool) <= 0
+}
+
+// resultSlots calculates the number of results slots available for requests
+// whilst adhering to both the item and the memory limit too of the results
+// cache.
+func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int {
+ // Calculate the maximum length capped by the memory limit
+ limit := len(q.resultCache)
+ if common.StorageSize(len(q.resultCache))*q.resultSize > common.StorageSize(blockCacheMemory) {
+ limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
+ }
+ // Calculate the number of slots already finished
+ finished := 0
+ for _, result := range q.resultCache[:limit] {
+ if result == nil {
+ break
+ }
+ if _, ok := donePool[result.Hash]; ok {
+ finished++
+ }
+ }
+ // Calculate the number of slots currently downloading
pending := 0
- for _, request := range q.receiptPendPool {
- pending += len(request.Headers)
+ for _, request := range pendPool {
+ for _, header := range request.Headers {
+ if header.Number.Uint64() < q.resultOffset+uint64(limit) {
+ pending++
+ }
+ }
}
- // Throttle if more receipts are in-flight than free space in the cache
- return pending >= len(q.resultCache)-len(q.receiptDonePool)
+ // Return the free slots to distribute
+ return limit - finished - pending
}
// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
@@ -323,8 +336,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
- if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
- // Fast phase of the fast sync, retrieve receipts too
+ if q.mode == FastSync {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
@@ -335,18 +347,25 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
return inserts
}
-// WaitResults retrieves and permanently removes a batch of fetch
-// results from the cache. the result slice will be empty if the queue
-// has been closed.
-func (q *queue) WaitResults() []*fetchResult {
+// Results retrieves and permanently removes a batch of fetch results from
+// the cache. the result slice will be empty if the queue has been closed.
+func (q *queue) Results(block bool) []*fetchResult {
q.lock.Lock()
defer q.lock.Unlock()
+ // Count the number of items available for processing
nproc := q.countProcessableItems()
for nproc == 0 && !q.closed {
+ if !block {
+ return nil
+ }
q.active.Wait()
nproc = q.countProcessableItems()
}
+ // Since we have a batch limit, don't pull more into "dangling" memory
+ if nproc > maxResultsProcess {
+ nproc = maxResultsProcess
+ }
results := make([]*fetchResult, nproc)
copy(results, q.resultCache[:nproc])
if len(results) > 0 {
@@ -363,6 +382,21 @@ func (q *queue) WaitResults() []*fetchResult {
}
// Advance the expected block number of the first cache entry.
q.resultOffset += uint64(nproc)
+
+ // Recalculate the result item weights to prevent memory exhaustion
+ for _, result := range results {
+ size := result.Header.Size()
+ for _, uncle := range result.Uncles {
+ size += uncle.Size()
+ }
+ for _, receipt := range result.Receipts {
+ size += receipt.Size()
+ }
+ for _, tx := range result.Transactions {
+ size += tx.Size()
+ }
+ q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
+ }
}
return results
}
@@ -370,21 +404,9 @@ func (q *queue) WaitResults() []*fetchResult {
// countProcessableItems counts the processable items.
func (q *queue) countProcessableItems() int {
for i, result := range q.resultCache {
- // Don't process incomplete or unavailable items.
if result == nil || result.Pending > 0 {
return i
}
- // Stop before processing the pivot block to ensure that
- // resultCache has space for fsHeaderForceVerify items. Not
- // doing this could leave us unable to download the required
- // amount of headers.
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
- for j := 0; j < fsHeaderForceVerify; j++ {
- if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
- return i
- }
- }
- }
}
return len(q.resultCache)
}
@@ -473,10 +495,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
return nil, false, nil
}
// Calculate an upper limit on the items we might fetch (i.e. throttling)
- space := len(q.resultCache) - len(donePool)
- for _, request := range pendPool {
- space -= len(request.Headers)
- }
+ space := q.resultSlots(pendPool, donePool)
+
// Retrieve a batch of tasks, skipping previously failed ones
send := make([]*types.Header, 0, count)
skip := make([]*types.Header, 0)
@@ -484,6 +504,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
progress := false
for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
header := taskQueue.PopItem().(*types.Header)
+ hash := header.Hash()
// If we're the first to request this task, initialise the result container
index := int(header.Number.Int64() - int64(q.resultOffset))
@@ -493,18 +514,19 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
}
if q.resultCache[index] == nil {
components := 1
- if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
+ if q.mode == FastSync {
components = 2
}
q.resultCache[index] = &fetchResult{
Pending: components,
+ Hash: hash,
Header: header,
}
}
// If this fetch task is a noop, skip this fetch operation
if isNoop(header) {
- donePool[header.Hash()] = struct{}{}
- delete(taskPool, header.Hash())
+ donePool[hash] = struct{}{}
+ delete(taskPool, hash)
space, proc = space-1, proc-1
q.resultCache[index].Pending--
@@ -512,7 +534,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
continue
}
// Otherwise unless the peer is known not to have the data, add to the retrieve list
- if p.Lacks(header.Hash()) {
+ if p.Lacks(hash) {
skip = append(skip, header)
} else {
send = append(send, header)
@@ -565,9 +587,6 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
}
- for hash, index := range request.Hashes {
- taskQueue.Push(hash, float32(index))
- }
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
@@ -640,18 +659,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
}
- for hash, index := range request.Hashes {
- taskQueue.Push(hash, float32(index))
- }
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
// Add the peer to the expiry report along the the number of failed requests
- expirations := len(request.Hashes)
- if expirations < len(request.Headers) {
- expirations = len(request.Headers)
- }
- expiries[id] = expirations
+ expiries[id] = len(request.Headers)
}
}
// Remove the expired requests from the pending pool
@@ -828,14 +840,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
failure = err
break
}
- donePool[header.Hash()] = struct{}{}
+ hash := header.Hash()
+
+ donePool[hash] = struct{}{}
q.resultCache[index].Pending--
useful = true
accepted++
// Clean up a successful fetch
request.Headers[i] = nil
- delete(taskPool, header.Hash())
+ delete(taskPool, hash)
}
// Return all failed or missing fetches to the queue
for _, header := range request.Headers {
@@ -860,7 +874,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
-func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
+func (q *queue) Prepare(offset uint64, mode SyncMode) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -868,6 +882,5 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.
if q.resultOffset < offset {
q.resultOffset = offset
}
- q.fastSyncPivot = pivot
q.mode = mode
}
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
index 937828b94..9cc65a208 100644
--- a/eth/downloader/statesync.go
+++ b/eth/downloader/statesync.go
@@ -20,7 +20,6 @@ import (
"fmt"
"hash"
"sync"
- "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -294,6 +293,9 @@ func (s *stateSync) loop() error {
case <-s.cancel:
return errCancelStateFetch
+ case <-s.d.cancelCh:
+ return errCancelStateFetch
+
case req := <-s.deliver:
// Response, disconnect or timeout triggered, drop the peer if stalling
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
@@ -304,15 +306,11 @@ func (s *stateSync) loop() error {
s.d.dropPeer(req.peer.id)
}
// Process all the received blobs and check for stale delivery
- stale, err := s.process(req)
- if err != nil {
+ if err := s.process(req); err != nil {
log.Warn("Node data write error", "err", err)
return err
}
- // The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery)
- if !stale {
- req.peer.SetNodeDataIdle(len(req.response))
- }
+ req.peer.SetNodeDataIdle(len(req.response))
}
}
return s.commit(true)
@@ -352,6 +350,7 @@ func (s *stateSync) assignTasks() {
case s.d.trackStateReq <- req:
req.peer.FetchNodeData(req.items)
case <-s.cancel:
+ case <-s.d.cancelCh:
}
}
}
@@ -390,7 +389,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
// delivered.
-func (s *stateSync) process(req *stateReq) (bool, error) {
+func (s *stateSync) process(req *stateReq) error {
// Collect processing stats and update progress if valid data was received
duplicate, unexpected := 0, 0
@@ -401,7 +400,7 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
}(time.Now())
// Iterate over all the delivered data and inject one-by-one into the trie
- progress, stale := false, len(req.response) > 0
+ progress := false
for _, blob := range req.response {
prog, hash, err := s.processNodeData(blob)
@@ -415,20 +414,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
case trie.ErrAlreadyProcessed:
duplicate++
default:
- return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
+ return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
- // If the node delivered a requested item, mark the delivery non-stale
if _, ok := req.tasks[hash]; ok {
delete(req.tasks, hash)
- stale = false
}
}
- // If we're inside the critical section, reset fail counter since we progressed.
- if progress && atomic.LoadUint32(&s.d.fsPivotFails) > 1 {
- log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails))
- atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
- }
-
// Put unfulfilled tasks back into the retry queue
npeers := s.d.peers.Len()
for hash, task := range req.tasks {
@@ -441,12 +432,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
- return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
+ return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.tasks[hash] = task
}
- return stale, nil
+ return nil
}
// processNodeData tries to inject a trie node data blob delivered from a remote