diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 60 |
1 files changed, 50 insertions, 10 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index e19b70dfd..0298dfa0b 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -59,8 +59,8 @@ var ( maxQueuedStates = 256 * 1024 // [eth/63] Maximum number of state requests to queue (DOS protection) maxResultsProcess = 256 // Number of download results to import at once into the chain - headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync - minCheckedHeaders = 1024 // Number of headers to verify fully when approaching the chain head + headerCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync + minCheckedHeaders = 2048 // Number of headers to verify fully when approaching the chain head minFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync ) @@ -117,6 +117,7 @@ type Downloader struct { insertHeaders headerChainInsertFn // Injects a batch of headers into the chain insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain + rollback chainRollbackFn // Removes a batch of recently added chain links dropPeer peerDropFn // Drops a peer for misbehaving // Status @@ -152,7 +153,7 @@ type Downloader struct { func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, - insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { + insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader { return &Downloader{ mode: mode, @@ -171,6 +172,7 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he insertHeaders: insertHeaders, insertBlocks: insertBlocks, insertReceipts: insertReceipts, + rollback: rollback, dropPeer: dropPeer, newPeerCh: make(chan *peer, 1), hashCh: make(chan dataPack, 1), @@ -383,7 +385,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() - // Initiate the sync using a concurrent header and content retrieval algorithm + // Initiate the sync using a concurrent header and content retrieval algorithm pivot := uint64(0) if latest > uint64(minFullBlocks) { pivot = latest - uint64(minFullBlocks) @@ -394,10 +396,10 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e d.syncInitHook(origin, latest) } errc := make(chan error, 4) - go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved - go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync - go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync - go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync + go func() { errc <- d.fetchHeaders(p, td, origin+1, latest) }() // Headers are always retrieved + go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync + go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync + go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync // If any fetcher fails, cancel the others var fail error @@ -1049,10 +1051,28 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { // // The queue parameter can be used to switch between queuing headers for block // body download too, or directly import as pure header chains. -func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { +func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from, latest uint64) error { glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from) defer glog.V(logger.Debug).Infof("%v: header download terminated", p) + // Keep a count of uncertain headers to roll back + rollback := []*types.Header{} + defer func() { + if len(rollback) > 0 { + hashes := make([]common.Hash, len(rollback)) + for i, header := range rollback { + hashes[i] = header.Hash() + } + d.rollback(hashes) + } + }() + // Calculate the pivoting point for switching from fast to slow sync + pivot := uint64(0) + if d.mode == FastSync && latest > uint64(minFullBlocks) { + pivot = latest - uint64(minFullBlocks) + } else if d.mode == LightSync { + pivot = latest + } // Create a timeout timer, and the associated hash fetcher request := time.Now() // time of the last fetch request timeout := time.NewTimer(0) // timer to dump a non-responsive active peer @@ -1124,10 +1144,30 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) if d.mode == FastSync || d.mode == LightSync { - if n, err := d.insertHeaders(headers, headerCheckFrequency); err != nil { + // Collect the yet unknown headers to mark them as uncertain + unknown := make([]*types.Header, 0, len(headers)) + for _, header := range headers { + if !d.hasHeader(header.Hash()) { + unknown = append(unknown, header) + } + } + // If we're importing pure headers, verify based on their recentness + frequency := headerCheckFrequency + if headers[len(headers)-1].Number.Uint64()+uint64(minCheckedHeaders) > pivot { + frequency = 1 + } + if n, err := d.insertHeaders(headers, frequency); err != nil { glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err) return errInvalidChain } + // All verifications passed, store newly found uncertain headers + rollback = append(rollback, unknown...) + if len(rollback) > minCheckedHeaders { + rollback = append(rollback[:0], rollback[len(rollback)-minCheckedHeaders:]...) + } + if headers[len(headers)-1].Number.Uint64() >= pivot { + rollback = rollback[:0] + } } if d.mode == FullSync || d.mode == FastSync { inserts := d.queue.Schedule(headers, from) |