diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 317 |
1 files changed, 166 insertions, 151 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 746c6a402..7f490d9e9 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -18,10 +18,8 @@ package downloader import ( - "crypto/rand" "errors" "fmt" - "math" "math/big" "sync" "sync/atomic" @@ -61,12 +59,11 @@ var ( maxHeadersProcess = 2048 // Number of header download results to import at once into the chain maxResultsProcess = 2048 // Number of content download results to import at once into the chain - fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync - fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected - fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it - fsPivotInterval = 256 // Number of headers out of which to randomize the pivot point - fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync - fsCriticalTrials = uint32(32) // Number of times to retry in the cricical section before bailing + fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync + fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected + fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it + fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download + fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync ) var ( @@ -102,9 +99,6 @@ type Downloader struct { peers *peerSet // Set of active peers from which download can proceed stateDB ethdb.Database - fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) - fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section - rttEstimate uint64 // Round trip time to target for download requests rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) @@ -124,6 +118,7 @@ type Downloader struct { synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchronising int32 notified int32 + committed int32 // Channels headerCh chan dataPack // [eth/62] Channel receiving inbound block headers @@ -156,7 +151,7 @@ type Downloader struct { // LightChain encapsulates functions required to synchronise a light chain. type LightChain interface { // HasHeader verifies a header's presence in the local chain. - HasHeader(h common.Hash, number uint64) bool + HasHeader(common.Hash, uint64) bool // GetHeaderByHash retrieves a header from the local chain. GetHeaderByHash(common.Hash) *types.Header @@ -179,7 +174,7 @@ type BlockChain interface { LightChain // HasBlockAndState verifies block and associated states' presence in the local chain. - HasBlockAndState(common.Hash) bool + HasBlockAndState(common.Hash, uint64) bool // GetBlockByHash retrieves a block from the local chain. GetBlockByHash(common.Hash) *types.Block @@ -391,9 +386,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // Set the requested sync mode, unless it's forbidden d.mode = mode - if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials { - d.mode = FullSync - } + // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) if p == nil { @@ -441,57 +434,40 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I d.syncStatsChainHeight = height d.syncStatsLock.Unlock() - // Initiate the sync using a concurrent header and content retrieval algorithm + // Ensure our origin point is below any fast sync pivot point pivot := uint64(0) - switch d.mode { - case LightSync: - pivot = height - case FastSync: - // Calculate the new fast/slow sync pivot point - if d.fsPivotLock == nil { - pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) - if err != nil { - panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) - } - if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { - pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64() - } + if d.mode == FastSync { + if height <= uint64(fsMinFullBlocks) { + origin = 0 } else { - // Pivot point locked in, use this and do not pick a new one! - pivot = d.fsPivotLock.Number.Uint64() - } - // If the point is below the origin, move origin back to ensure state download - if pivot < origin { - if pivot > 0 { + pivot = height - uint64(fsMinFullBlocks) + if pivot <= origin { origin = pivot - 1 - } else { - origin = 0 } } - log.Debug("Fast syncing until pivot block", "pivot", pivot) } - d.queue.Prepare(origin+1, d.mode, pivot, latest) + d.committed = 1 + if d.mode == FastSync && pivot != 0 { + d.committed = 0 + } + // Initiate the sync using a concurrent header and content retrieval algorithm + d.queue.Prepare(origin+1, d.mode) if d.syncInitHook != nil { d.syncInitHook(origin, height) } fetchers := []func() error{ - func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.processHeaders(origin+1, td) }, + func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.processHeaders(origin+1, pivot, td) }, } if d.mode == FastSync { fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) } else if d.mode == FullSync { fetchers = append(fetchers, d.processFullSyncContent) } - err = d.spawnSync(fetchers) - if err != nil && d.mode == FastSync && d.fsPivotLock != nil { - // If sync failed in the critical section, bump the fail counter. - atomic.AddUint32(&d.fsPivotFails, 1) - } - return err + return d.spawnSync(fetchers) } // spawnSync runs d.process and all given fetcher functions to completion in @@ -671,7 +647,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err continue } // Otherwise check if we already know the header or not - if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) { + if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash(), headers[i].Number.Uint64())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() // If every header is known, even future ones, the peer straight out lied about its head @@ -736,7 +712,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err arrived = true // Modify the search interval based on the response - if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) { + if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash(), headers[0].Number.Uint64())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) { end = check break } @@ -774,7 +750,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err // other peers are only accepted if they map cleanly to the skeleton. If no one // can fill in the skeleton - not even the origin peer - it's assumed invalid and // the origin is dropped. -func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { +func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error { p.log.Debug("Directing header downloads", "origin", from) defer p.log.Debug("Header download terminated") @@ -825,6 +801,18 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { } // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { + // Don't abort header fetches while the pivot is downloading + if atomic.LoadInt32(&d.committed) == 0 && pivot <= from { + p.log.Debug("No headers, waiting for pivot commit") + select { + case <-time.After(fsHeaderContCheck): + getHeaders(from) + continue + case <-d.cancelCh: + return errCancelHeaderFetch + } + } + // Pivot done (or not in fast sync) and no more headers, terminate the process p.log.Debug("No more headers available") select { case d.headerProcCh <- nil: @@ -1129,10 +1117,8 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } if request.From > 0 { peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From) - } else if len(request.Headers) > 0 { - peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number) } else { - peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes)) + peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number) } // Fetch the chunk and make sure any errors return the hashes to the queue if fetchHook != nil { @@ -1160,10 +1146,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. -func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { - // Calculate the pivoting point for switching from fast to slow sync - pivot := d.queue.FastSyncPivot() - +func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { // Keep a count of uncertain headers to roll back rollback := []*types.Header{} defer func() { @@ -1188,19 +1171,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number), "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock), "block", fmt.Sprintf("%d->%d", lastBlock, curBlock)) - - // If we're already past the pivot point, this could be an attack, thread carefully - if rollback[len(rollback)-1].Number.Uint64() > pivot { - // If we didn't ever fail, lock in the pivot header (must! not! change!) - if atomic.LoadUint32(&d.fsPivotFails) == 0 { - for _, header := range rollback { - if header.Number.Uint64() == pivot { - log.Warn("Fast-sync pivot locked in", "number", pivot, "hash", header.Hash()) - d.fsPivotLock = header - } - } - } - } } }() @@ -1302,13 +1272,6 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } } - // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in - if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot { - if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() { - log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash(), "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash()) - return errInvalidChain - } - } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { // If we've reached the allowed number of pending headers, stall a bit @@ -1343,7 +1306,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // processFullSyncContent takes fetch results from the queue and imports them into the chain. func (d *Downloader) processFullSyncContent() error { for { - results := d.queue.WaitResults() + results := d.queue.Results(true) if len(results) == 0 { return nil } @@ -1357,30 +1320,28 @@ func (d *Downloader) processFullSyncContent() error { } func (d *Downloader) importBlockResults(results []*fetchResult) error { - for len(results) != 0 { - // Check for any termination requests. This makes clean shutdown faster. - select { - case <-d.quitCh: - return errCancelContentProcessing - default: - } - // Retrieve the a batch of results to import - items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) - first, last := results[0].Header, results[items-1].Header - log.Debug("Inserting downloaded chain", "items", len(results), - "firstnum", first.Number, "firsthash", first.Hash(), - "lastnum", last.Number, "lasthash", last.Hash(), - ) - blocks := make([]*types.Block, items) - for i, result := range results[:items] { - blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) - } - if index, err := d.blockchain.InsertChain(blocks); err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain - } - // Shift the results to the next batch - results = results[items:] + // Check for any early termination requests + if len(results) == 0 { + return nil + } + select { + case <-d.quitCh: + return errCancelContentProcessing + default: + } + // Retrieve the a batch of results to import + first, last := results[0].Header, results[len(results)-1].Header + log.Debug("Inserting downloaded chain", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnum", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, len(results)) + for i, result := range results { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + } + if index, err := d.blockchain.InsertChain(blocks); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain } return nil } @@ -1388,35 +1349,92 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { // processFastSyncContent takes fetch results from the queue and writes them to the // database. It also controls the synchronisation of state nodes of the pivot block. func (d *Downloader) processFastSyncContent(latest *types.Header) error { - // Start syncing state of the reported head block. - // This should get us most of the state of the pivot block. + // Start syncing state of the reported head block. This should get us most of + // the state of the pivot block. stateSync := d.syncState(latest.Root) defer stateSync.Cancel() go func() { - if err := stateSync.Wait(); err != nil { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { d.queue.Close() // wake up WaitResults } }() - - pivot := d.queue.FastSyncPivot() + // Figure out the ideal pivot block. Note, that this goalpost may move if the + // sync takes long enough for the chain head to move significantly. + pivot := uint64(0) + if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) { + pivot = height - uint64(fsMinFullBlocks) + } + // To cater for moving pivot points, track the pivot block and subsequently + // accumulated download results separatey. + var ( + oldPivot *fetchResult // Locked in pivot block, might change eventually + oldTail []*fetchResult // Downloaded content after the pivot + ) for { - results := d.queue.WaitResults() + // Wait for the next batch of downloaded data to be available, and if the pivot + // block became stale, move the goalpost + results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness if len(results) == 0 { - return stateSync.Cancel() + // If pivot sync is done, stop + if oldPivot == nil { + return stateSync.Cancel() + } + // If sync failed, stop + select { + case <-d.cancelCh: + return stateSync.Cancel() + default: + } } if d.chainInsertHook != nil { d.chainInsertHook(results) } + if oldPivot != nil { + results = append(append([]*fetchResult{oldPivot}, oldTail...), results...) + } + // Split around the pivot block and process the two sides via fast/full sync + if atomic.LoadInt32(&d.committed) == 0 { + latest = results[len(results)-1].Header + if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) { + log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks)) + pivot = height - uint64(fsMinFullBlocks) + } + } P, beforeP, afterP := splitAroundPivot(pivot, results) if err := d.commitFastSyncData(beforeP, stateSync); err != nil { return err } if P != nil { - stateSync.Cancel() - if err := d.commitPivotBlock(P); err != nil { - return err + // If new pivot block found, cancel old state retrieval and restart + if oldPivot != P { + stateSync.Cancel() + + stateSync = d.syncState(P.Header.Root) + defer stateSync.Cancel() + go func() { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { + d.queue.Close() // wake up WaitResults + } + }() + oldPivot = P + } + // Wait for completion, occasionally checking for pivot staleness + select { + case <-stateSync.done: + if stateSync.err != nil { + return stateSync.err + } + if err := d.commitPivotBlock(P); err != nil { + return err + } + oldPivot = nil + + case <-time.After(time.Second): + oldTail = afterP + continue } } + // Fast sync done, pivot commit done, full import if err := d.importBlockResults(afterP); err != nil { return err } @@ -1439,52 +1457,49 @@ func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, bef } func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error { - for len(results) != 0 { - // Check for any termination requests. - select { - case <-d.quitCh: - return errCancelContentProcessing - case <-stateSync.done: - if err := stateSync.Wait(); err != nil { - return err - } - default: - } - // Retrieve the a batch of results to import - items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) - first, last := results[0].Header, results[items-1].Header - log.Debug("Inserting fast-sync blocks", "items", len(results), - "firstnum", first.Number, "firsthash", first.Hash(), - "lastnumn", last.Number, "lasthash", last.Hash(), - ) - blocks := make([]*types.Block, items) - receipts := make([]types.Receipts, items) - for i, result := range results[:items] { - blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) - receipts[i] = result.Receipts - } - if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain + // Check for any early termination requests + if len(results) == 0 { + return nil + } + select { + case <-d.quitCh: + return errCancelContentProcessing + case <-stateSync.done: + if err := stateSync.Wait(); err != nil { + return err } - // Shift the results to the next batch - results = results[items:] + default: + } + // Retrieve the a batch of results to import + first, last := results[0].Header, results[len(results)-1].Header + log.Debug("Inserting fast-sync blocks", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnumn", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, len(results)) + receipts := make([]types.Receipts, len(results)) + for i, result := range results { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + receipts[i] = result.Receipts + } + if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain } return nil } func (d *Downloader) commitPivotBlock(result *fetchResult) error { - b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) - // Sync the pivot block state. This should complete reasonably quickly because - // we've already synced up to the reported head block state earlier. - if err := d.syncState(b.Root()).Wait(); err != nil { + block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash()) + if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil { return err } - log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash()) - if _, err := d.blockchain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil { + if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil { return err } - return d.blockchain.FastSyncCommitHead(b.Hash()) + atomic.StoreInt32(&d.committed, 1) + return nil } // DeliverHeaders injects a new batch of block headers received from a remote |