diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 65 |
1 files changed, 51 insertions, 14 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f6dbb4610..0bc8b4acf 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -73,6 +73,7 @@ var ( fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync + fsCriticalTrials = 10 // Number of times to retry in the cricical section before bailing ) var ( @@ -103,13 +104,15 @@ var ( ) type Downloader struct { - mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) - noFast bool // Flag to disable fast syncing in case of a security error - mux *event.TypeMux // Event multiplexer to announce sync operation events + mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) + mux *event.TypeMux // Event multiplexer to announce sync operation events queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed + fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) + fsPivotFails int // Number of fast sync failures in the critical section + interrupt int32 // Atomic boolean to signal termination // Statistics @@ -314,6 +317,15 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode default: } } + for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} { + for empty := false; !empty; { + select { + case <-ch: + default: + empty = true + } + } + } for empty := false; !empty; { select { case <-d.headerProcCh: @@ -330,7 +342,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // Set the requested sync mode, unless it's forbidden d.mode = mode - if d.mode == FastSync && d.noFast { + if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials { d.mode = FullSync } // Retrieve the origin peer and initiate the downloading process @@ -413,12 +425,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e pivot = height case FastSync: // Calculate the new fast/slow sync pivot point - pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) - if err != nil { - panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) - } - if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { - pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64() + if d.fsPivotLock == nil { + pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) + if err != nil { + panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) + } + if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { + pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64() + } + } else { + // Pivot point locked in, use this and do not pick a new one! + pivot = d.fsPivotLock.Number.Uint64() } // If the point is below the origin, move origin back to ensure state download if pivot < origin { @@ -1218,8 +1235,12 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - d.headerProcCh <- nil - return nil + select { + case d.headerProcCh <- nil: + return nil + case <-d.cancelCh: + return errCancelHeaderFetch + } } headers := packet.(*headerPack).headers @@ -1611,9 +1632,18 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number()) - // If we're already past the pivot point, this could be an attack, disable fast sync + // If we're already past the pivot point, this could be an attack, thread carefully if rollback[len(rollback)-1].Number.Uint64() > pivot { - d.noFast = true + // If we didn't ever fail, lock in te pivot header (must! not! change!) + if d.fsPivotFails == 0 { + for _, header := range rollback { + if header.Number.Uint64() == pivot { + glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4]) + d.fsPivotLock = header + } + } + } + d.fsPivotFails++ } } }() @@ -1712,6 +1742,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } } + // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in + if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot { + if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() { + glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4]) + return errInvalidChain + } + } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { // If we've reached the allowed number of pending headers, stall a bit |