aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go317
1 files changed, 166 insertions, 151 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 746c6a402..7f490d9e9 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -18,10 +18,8 @@
package downloader
import (
- "crypto/rand"
"errors"
"fmt"
- "math"
"math/big"
"sync"
"sync/atomic"
@@ -61,12 +59,11 @@ var (
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
- fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
- fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
- fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
- fsPivotInterval = 256 // Number of headers out of which to randomize the pivot point
- fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
- fsCriticalTrials = uint32(32) // Number of times to retry in the cricical section before bailing
+ fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
+ fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
+ fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
+ fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
+ fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
)
var (
@@ -102,9 +99,6 @@ type Downloader struct {
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database
- fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
- fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
-
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
@@ -124,6 +118,7 @@ type Downloader struct {
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
notified int32
+ committed int32
// Channels
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
@@ -156,7 +151,7 @@ type Downloader struct {
// LightChain encapsulates functions required to synchronise a light chain.
type LightChain interface {
// HasHeader verifies a header's presence in the local chain.
- HasHeader(h common.Hash, number uint64) bool
+ HasHeader(common.Hash, uint64) bool
// GetHeaderByHash retrieves a header from the local chain.
GetHeaderByHash(common.Hash) *types.Header
@@ -179,7 +174,7 @@ type BlockChain interface {
LightChain
// HasBlockAndState verifies block and associated states' presence in the local chain.
- HasBlockAndState(common.Hash) bool
+ HasBlockAndState(common.Hash, uint64) bool
// GetBlockByHash retrieves a block from the local chain.
GetBlockByHash(common.Hash) *types.Block
@@ -391,9 +386,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// Set the requested sync mode, unless it's forbidden
d.mode = mode
- if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
- d.mode = FullSync
- }
+
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
if p == nil {
@@ -441,57 +434,40 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.syncStatsChainHeight = height
d.syncStatsLock.Unlock()
- // Initiate the sync using a concurrent header and content retrieval algorithm
+ // Ensure our origin point is below any fast sync pivot point
pivot := uint64(0)
- switch d.mode {
- case LightSync:
- pivot = height
- case FastSync:
- // Calculate the new fast/slow sync pivot point
- if d.fsPivotLock == nil {
- pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
- if err != nil {
- panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
- }
- if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
- pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
- }
+ if d.mode == FastSync {
+ if height <= uint64(fsMinFullBlocks) {
+ origin = 0
} else {
- // Pivot point locked in, use this and do not pick a new one!
- pivot = d.fsPivotLock.Number.Uint64()
- }
- // If the point is below the origin, move origin back to ensure state download
- if pivot < origin {
- if pivot > 0 {
+ pivot = height - uint64(fsMinFullBlocks)
+ if pivot <= origin {
origin = pivot - 1
- } else {
- origin = 0
}
}
- log.Debug("Fast syncing until pivot block", "pivot", pivot)
}
- d.queue.Prepare(origin+1, d.mode, pivot, latest)
+ d.committed = 1
+ if d.mode == FastSync && pivot != 0 {
+ d.committed = 0
+ }
+ // Initiate the sync using a concurrent header and content retrieval algorithm
+ d.queue.Prepare(origin+1, d.mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
fetchers := []func() error{
- func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
- func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
- func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
- func() error { return d.processHeaders(origin+1, td) },
+ func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.processHeaders(origin+1, pivot, td) },
}
if d.mode == FastSync {
fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
} else if d.mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
- err = d.spawnSync(fetchers)
- if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
- // If sync failed in the critical section, bump the fail counter.
- atomic.AddUint32(&d.fsPivotFails, 1)
- }
- return err
+ return d.spawnSync(fetchers)
}
// spawnSync runs d.process and all given fetcher functions to completion in
@@ -671,7 +647,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
continue
}
// Otherwise check if we already know the header or not
- if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
+ if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash(), headers[i].Number.Uint64())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
// If every header is known, even future ones, the peer straight out lied about its head
@@ -736,7 +712,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
arrived = true
// Modify the search interval based on the response
- if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
+ if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash(), headers[0].Number.Uint64())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
end = check
break
}
@@ -774,7 +750,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
// other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
-func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
+func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
p.log.Debug("Directing header downloads", "origin", from)
defer p.log.Debug("Header download terminated")
@@ -825,6 +801,18 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
}
// If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 {
+ // Don't abort header fetches while the pivot is downloading
+ if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
+ p.log.Debug("No headers, waiting for pivot commit")
+ select {
+ case <-time.After(fsHeaderContCheck):
+ getHeaders(from)
+ continue
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+ }
+ }
+ // Pivot done (or not in fast sync) and no more headers, terminate the process
p.log.Debug("No more headers available")
select {
case d.headerProcCh <- nil:
@@ -1129,10 +1117,8 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
if request.From > 0 {
peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
- } else if len(request.Headers) > 0 {
- peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
} else {
- peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes))
+ peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
@@ -1160,10 +1146,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
-func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
- // Calculate the pivoting point for switching from fast to slow sync
- pivot := d.queue.FastSyncPivot()
-
+func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
rollback := []*types.Header{}
defer func() {
@@ -1188,19 +1171,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
-
- // If we're already past the pivot point, this could be an attack, thread carefully
- if rollback[len(rollback)-1].Number.Uint64() > pivot {
- // If we didn't ever fail, lock in the pivot header (must! not! change!)
- if atomic.LoadUint32(&d.fsPivotFails) == 0 {
- for _, header := range rollback {
- if header.Number.Uint64() == pivot {
- log.Warn("Fast-sync pivot locked in", "number", pivot, "hash", header.Hash())
- d.fsPivotLock = header
- }
- }
- }
- }
}
}()
@@ -1302,13 +1272,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
}
- // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
- if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
- if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
- log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash(), "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash())
- return errInvalidChain
- }
- }
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
@@ -1343,7 +1306,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
for {
- results := d.queue.WaitResults()
+ results := d.queue.Results(true)
if len(results) == 0 {
return nil
}
@@ -1357,30 +1320,28 @@ func (d *Downloader) processFullSyncContent() error {
}
func (d *Downloader) importBlockResults(results []*fetchResult) error {
- for len(results) != 0 {
- // Check for any termination requests. This makes clean shutdown faster.
- select {
- case <-d.quitCh:
- return errCancelContentProcessing
- default:
- }
- // Retrieve the a batch of results to import
- items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
- first, last := results[0].Header, results[items-1].Header
- log.Debug("Inserting downloaded chain", "items", len(results),
- "firstnum", first.Number, "firsthash", first.Hash(),
- "lastnum", last.Number, "lasthash", last.Hash(),
- )
- blocks := make([]*types.Block, items)
- for i, result := range results[:items] {
- blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
- }
- if index, err := d.blockchain.InsertChain(blocks); err != nil {
- log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
- return errInvalidChain
- }
- // Shift the results to the next batch
- results = results[items:]
+ // Check for any early termination requests
+ if len(results) == 0 {
+ return nil
+ }
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ default:
+ }
+ // Retrieve the a batch of results to import
+ first, last := results[0].Header, results[len(results)-1].Header
+ log.Debug("Inserting downloaded chain", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnum", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, len(results))
+ for i, result := range results {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ }
+ if index, err := d.blockchain.InsertChain(blocks); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
}
return nil
}
@@ -1388,35 +1349,92 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
- // Start syncing state of the reported head block.
- // This should get us most of the state of the pivot block.
+ // Start syncing state of the reported head block. This should get us most of
+ // the state of the pivot block.
stateSync := d.syncState(latest.Root)
defer stateSync.Cancel()
go func() {
- if err := stateSync.Wait(); err != nil {
+ if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
d.queue.Close() // wake up WaitResults
}
}()
-
- pivot := d.queue.FastSyncPivot()
+ // Figure out the ideal pivot block. Note, that this goalpost may move if the
+ // sync takes long enough for the chain head to move significantly.
+ pivot := uint64(0)
+ if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
+ pivot = height - uint64(fsMinFullBlocks)
+ }
+ // To cater for moving pivot points, track the pivot block and subsequently
+ // accumulated download results separatey.
+ var (
+ oldPivot *fetchResult // Locked in pivot block, might change eventually
+ oldTail []*fetchResult // Downloaded content after the pivot
+ )
for {
- results := d.queue.WaitResults()
+ // Wait for the next batch of downloaded data to be available, and if the pivot
+ // block became stale, move the goalpost
+ results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
if len(results) == 0 {
- return stateSync.Cancel()
+ // If pivot sync is done, stop
+ if oldPivot == nil {
+ return stateSync.Cancel()
+ }
+ // If sync failed, stop
+ select {
+ case <-d.cancelCh:
+ return stateSync.Cancel()
+ default:
+ }
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
+ if oldPivot != nil {
+ results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
+ }
+ // Split around the pivot block and process the two sides via fast/full sync
+ if atomic.LoadInt32(&d.committed) == 0 {
+ latest = results[len(results)-1].Header
+ if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
+ log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
+ pivot = height - uint64(fsMinFullBlocks)
+ }
+ }
P, beforeP, afterP := splitAroundPivot(pivot, results)
if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
return err
}
if P != nil {
- stateSync.Cancel()
- if err := d.commitPivotBlock(P); err != nil {
- return err
+ // If new pivot block found, cancel old state retrieval and restart
+ if oldPivot != P {
+ stateSync.Cancel()
+
+ stateSync = d.syncState(P.Header.Root)
+ defer stateSync.Cancel()
+ go func() {
+ if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
+ d.queue.Close() // wake up WaitResults
+ }
+ }()
+ oldPivot = P
+ }
+ // Wait for completion, occasionally checking for pivot staleness
+ select {
+ case <-stateSync.done:
+ if stateSync.err != nil {
+ return stateSync.err
+ }
+ if err := d.commitPivotBlock(P); err != nil {
+ return err
+ }
+ oldPivot = nil
+
+ case <-time.After(time.Second):
+ oldTail = afterP
+ continue
}
}
+ // Fast sync done, pivot commit done, full import
if err := d.importBlockResults(afterP); err != nil {
return err
}
@@ -1439,52 +1457,49 @@ func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, bef
}
func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
- for len(results) != 0 {
- // Check for any termination requests.
- select {
- case <-d.quitCh:
- return errCancelContentProcessing
- case <-stateSync.done:
- if err := stateSync.Wait(); err != nil {
- return err
- }
- default:
- }
- // Retrieve the a batch of results to import
- items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
- first, last := results[0].Header, results[items-1].Header
- log.Debug("Inserting fast-sync blocks", "items", len(results),
- "firstnum", first.Number, "firsthash", first.Hash(),
- "lastnumn", last.Number, "lasthash", last.Hash(),
- )
- blocks := make([]*types.Block, items)
- receipts := make([]types.Receipts, items)
- for i, result := range results[:items] {
- blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
- receipts[i] = result.Receipts
- }
- if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
- log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
- return errInvalidChain
+ // Check for any early termination requests
+ if len(results) == 0 {
+ return nil
+ }
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ case <-stateSync.done:
+ if err := stateSync.Wait(); err != nil {
+ return err
}
- // Shift the results to the next batch
- results = results[items:]
+ default:
+ }
+ // Retrieve the a batch of results to import
+ first, last := results[0].Header, results[len(results)-1].Header
+ log.Debug("Inserting fast-sync blocks", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnumn", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, len(results))
+ receipts := make([]types.Receipts, len(results))
+ for i, result := range results {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ receipts[i] = result.Receipts
+ }
+ if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
}
return nil
}
func (d *Downloader) commitPivotBlock(result *fetchResult) error {
- b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
- // Sync the pivot block state. This should complete reasonably quickly because
- // we've already synced up to the reported head block state earlier.
- if err := d.syncState(b.Root()).Wait(); err != nil {
+ block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
+ if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil {
return err
}
- log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
- if _, err := d.blockchain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
+ if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
return err
}
- return d.blockchain.FastSyncCommitHead(b.Hash())
+ atomic.StoreInt32(&d.committed, 1)
+ return nil
}
// DeliverHeaders injects a new batch of block headers received from a remote