Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 334
1 file changed, 178 insertions, 156 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 839969f03..e4d1392d0 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -34,7 +34,6 @@ import (
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/params"
-	"github.com/ethereum/go-ethereum/trie"
 	"github.com/rcrowley/go-metrics"
 )
 
@@ -99,8 +98,9 @@ type Downloader struct {
 	mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
 	mux  *event.TypeMux // Event multiplexer to announce sync operation events
 
-	queue *queue   // Scheduler for selecting the hashes to download
-	peers *peerSet // Set of active peers from which download can proceed
+	queue   *queue   // Scheduler for selecting the hashes to download
+	peers   *peerSet // Set of active peers from which download can proceed
+	stateDB ethdb.Database
 
 	fsPivotLock  *types.Header // Pivot header on critical section entry (cannot change between retries)
 	fsPivotFails uint32        // Number of subsequent fast sync failures in the critical section
@@ -109,9 +109,9 @@ type Downloader struct {
 	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
 
 	// Statistics
-	syncStatsChainOrigin uint64 // Origin block number where syncing started at
-	syncStatsChainHeight uint64 // Highest block number known when syncing started
-	syncStatsStateDone   uint64 // Number of state trie entries already pulled
+	syncStatsChainOrigin uint64 // Origin block number where syncing started at
+	syncStatsChainHeight uint64 // Highest block number known when syncing started
+	syncStatsState       stateSyncStats
 	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
 
 	// Callbacks
@@ -136,16 +136,18 @@ type Downloader struct {
 	notified int32
 
 	// Channels
-	newPeerCh     chan *peer
 	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
 	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
 	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
-	stateCh       chan dataPack        // [eth/63] Channel receiving inbound node state data
 	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
 	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
-	stateWakeCh   chan bool            // [eth/63] Channel to signal the state fetcher of new tasks
 	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
 
+	// for stateFetcher
+	stateSyncStart chan *stateSync
+	trackStateReq  chan *stateReq
+	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data
+
 	// Cancellation and termination
 	cancelPeer string        // Identifier of the peer currently being used as the master (cancel on drop)
 	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
@@ -170,8 +172,9 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
 	dl := &Downloader{
 		mode:           mode,
 		mux:            mux,
-		queue:          newQueue(stateDb),
+		queue:          newQueue(),
 		peers:          newPeerSet(),
+		stateDB:        stateDb,
 		rttEstimate:    uint64(rttMaxEstimate),
 		rttConfidence:  uint64(1000000),
 		hasHeader:      hasHeader,
@@ -188,18 +191,20 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
 		insertReceipts: insertReceipts,
 		rollback:       rollback,
 		dropPeer:       dropPeer,
-		newPeerCh:      make(chan *peer, 1),
 		headerCh:       make(chan dataPack, 1),
 		bodyCh:         make(chan dataPack, 1),
 		receiptCh:      make(chan dataPack, 1),
-		stateCh:        make(chan dataPack, 1),
 		bodyWakeCh:     make(chan bool, 1),
 		receiptWakeCh:  make(chan bool, 1),
-		stateWakeCh:    make(chan bool, 1),
 		headerProcCh:   make(chan []*types.Header, 1),
 		quitCh:         make(chan struct{}),
+		// for stateFetcher
+		stateSyncStart: make(chan *stateSync),
+		trackStateReq:  make(chan *stateReq),
+		stateCh:        make(chan dataPack),
 	}
 	go dl.qosTuner()
+	go dl.stateFetcher()
 	return dl
 }
 
@@ -211,9 +216,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
 // of processed and the total number of known states are also returned. Otherwise
 // these are zero.
 func (d *Downloader) Progress() ethereum.SyncProgress {
-	// Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
-	pendingStates := uint64(d.queue.PendingNodeData())
-
 	// Lock the current stats and return the progress
 	d.syncStatsLock.RLock()
 	defer d.syncStatsLock.RUnlock()
@@ -231,8 +233,8 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
 		StartingBlock: d.syncStatsChainOrigin,
 		CurrentBlock:  current,
 		HighestBlock:  d.syncStatsChainHeight,
-		PulledStates:  d.syncStatsStateDone,
-		KnownStates:   d.syncStatsStateDone + pendingStates,
+		PulledStates:  d.syncStatsState.processed,
+		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
 	}
 }
 
@@ -324,13 +326,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 	d.queue.Reset()
 	d.peers.Reset()
 
-	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
 		select {
 		case <-ch:
 		default:
 		}
 	}
-	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
+	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
 		for empty := false; !empty; {
 			select {
 			case <-ch:
@@ -439,30 +441,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 	if d.syncInitHook != nil {
 		d.syncInitHook(origin, height)
 	}
-	return d.spawnSync(origin+1,
-		func() error { return d.fetchHeaders(p, origin+1) },    // Headers are always retrieved
-		func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
-		func() error { return d.fetchBodies(origin + 1) },      // Bodies are retrieved during normal and fast sync
-		func() error { return d.fetchReceipts(origin + 1) },    // Receipts are retrieved during fast sync
-		func() error { return d.fetchNodeData() },              // Node state data is retrieved during fast sync
-	)
+
+	fetchers := []func() error{
+		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
+		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
+		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+		func() error { return d.processHeaders(origin+1, td) },
+	}
+	if d.mode == FastSync {
+		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+	} else if d.mode == FullSync {
+		fetchers = append(fetchers, d.processFullSyncContent)
+	}
+	err = d.spawnSync(fetchers)
+	if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
+		// If sync failed in the critical section, bump the fail counter.
+		atomic.AddUint32(&d.fsPivotFails, 1)
+	}
+	return err
 }
 
 // spawnSync runs d.process and all given fetcher functions to completion in
 // separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
+func (d *Downloader) spawnSync(fetchers []func() error) error {
 	var wg sync.WaitGroup
-	errc := make(chan error, len(fetchers)+1)
-	wg.Add(len(fetchers) + 1)
-	go func() { defer wg.Done(); errc <- d.processContent() }()
+	errc := make(chan error, len(fetchers))
+	wg.Add(len(fetchers))
 	for _, fn := range fetchers {
 		fn := fn
 		go func() { defer wg.Done(); errc <- fn() }()
 	}
 	// Wait for the first error, then terminate the others.
 	var err error
-	for i := 0; i < len(fetchers)+1; i++ {
-		if i == len(fetchers) {
+	for i := 0; i < len(fetchers); i++ {
+		if i == len(fetchers)-1 {
 			// Close the queue when all fetchers have exited.
 			// This will cause the block processor to end when
 			// it has processed the queue.
@@ -475,11 +487,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
 	d.queue.Close()
 	d.Cancel()
 	wg.Wait()
-
-	// If sync failed in the critical section, bump the fail counter
-	if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
-		atomic.AddUint32(&d.fsPivotFails, 1)
-	}
 	return err
 }
 
@@ -552,7 +559,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
 			return nil, errTimeout
 
 		case <-d.bodyCh:
-		case <-d.stateCh:
 		case <-d.receiptCh:
 			// Out of bounds delivery, ignore
 		}
@@ -649,7 +655,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 			return 0, errTimeout
 
 		case <-d.bodyCh:
-		case <-d.stateCh:
 		case <-d.receiptCh:
 			// Out of bounds delivery, ignore
 		}
@@ -714,7 +719,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 				return 0, errTimeout
 
 			case <-d.bodyCh:
-			case <-d.stateCh:
 			case <-d.receiptCh:
 				// Out of bounds delivery, ignore
 			}
@@ -827,7 +831,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
 				d.dropPeer(p.id)
 
 				// Finish the sync gracefully instead of dumping the gathered data though
-				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
 					select {
 					case ch <- false:
 					case <-d.cancelCh:
@@ -927,68 +931,6 @@ func (d *Downloader) fetchReceipts(from uint64) error {
 	return err
 }
 
-// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
-// available peers, reserving a chunk of nodes for each, waiting for delivery and
-// also periodically checking for timeouts.
-func (d *Downloader) fetchNodeData() error {
-	log.Debug("Downloading node state data")
-
-	var (
-		deliver = func(packet dataPack) (int, error) {
-			start := time.Now()
-			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
-				// If the peer returned old-requested data, forgive
-				if err == trie.ErrNotRequested {
-					log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId())
-					return
-				}
-				if err != nil {
-					// If the node data processing failed, the root hash is very wrong, abort
-					log.Error("State processing failed", "peer", packet.PeerId(), "err", err)
-					d.Cancel()
-					return
-				}
-				// Processing succeeded, notify state fetcher of continuation
-				pending := d.queue.PendingNodeData()
-				if pending > 0 {
-					select {
-					case d.stateWakeCh <- true:
-					default:
-					}
-				}
-				d.syncStatsLock.Lock()
-				d.syncStatsStateDone += uint64(delivered)
-				syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
-				d.syncStatsLock.Unlock()
-
-				// If real database progress was made, reset any fast-sync pivot failure
-				if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
-					log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails))
-					atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
-				}
-				// Log a message to the user and return
-				if delivered > 0 {
-					log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending)
-				}
-			})
-		}
-		expire   = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
-		throttle = func() bool { return false }
-		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
-			return d.queue.ReserveNodeData(p, count), false, nil
-		}
-		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
-		capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
-		setIdle  = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
-	)
-	err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
-		d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
-		d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states")
-
-	log.Debug("Node state data download terminated", "err", err)
-	return err
-}
-
 // fetchParts iteratively downloads scheduled block parts, taking any available
 // peers, reserving a chunk of fetch requests for each, waiting for delivery and
 // also periodically checking for timeouts.
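For reference, the first-error-wins orchestration that the reworked spawnSync (above) relies on boils down to the following self-contained sketch; runFetchers, its cancel channel and the example fetchers are illustrative stand-ins rather than downloader APIs:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// runFetchers mirrors the shape of the reworked spawnSync: every fetcher runs
// in its own goroutine, all results funnel into one buffered error channel,
// and the first non-nil error wins while the rest are told to stop.
// Illustrative sketch only, not the downloader's actual code.
func runFetchers(cancel chan struct{}, fetchers []func() error) error {
	var wg sync.WaitGroup
	errc := make(chan error, len(fetchers)) // buffered so late finishers never block
	wg.Add(len(fetchers))
	for _, fn := range fetchers {
		fn := fn // capture the loop variable for the goroutine
		go func() { defer wg.Done(); errc <- fn() }()
	}
	// Wait for the first error, then terminate the others.
	var err error
	for i := 0; i < len(fetchers); i++ {
		if e := <-errc; e != nil && err == nil {
			err = e
			close(cancel) // signal the remaining fetchers to wind down
		}
	}
	wg.Wait()
	return err
}

func main() {
	cancel := make(chan struct{})
	err := runFetchers(cancel, []func() error{
		func() error { return nil },                        // e.g. a header fetcher that finishes cleanly
		func() error { return errors.New("peer dropped") }, // e.g. a body fetcher that fails
		func() error { <-cancel; return nil },              // e.g. a processor waiting for cancellation
	})
	fmt.Println("sync finished with:", err)
}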
@@ -1229,7 +1171,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 			// Terminate header processing if we synced up
 			if len(headers) == 0 {
 				// Notify everyone that headers are fully processed
-				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
 					select {
 					case ch <- false:
 					case <-d.cancelCh:
@@ -1341,7 +1283,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 				origin += uint64(limit)
 			}
 			// Signal the content downloaders of the availablility of new tasks
-			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
 				select {
 				case ch <- true:
 				default:
@@ -1351,71 +1293,151 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 	}
 }
 
-// processContent takes fetch results from the queue and tries to import them
-// into the chain. The type of import operation will depend on the result contents.
-func (d *Downloader) processContent() error {
-	pivot := d.queue.FastSyncPivot()
+// processFullSyncContent takes fetch results from the queue and imports them into the chain.
+func (d *Downloader) processFullSyncContent() error {
 	for {
 		results := d.queue.WaitResults()
 		if len(results) == 0 {
-			return nil // queue empty
+			return nil
 		}
 		if d.chainInsertHook != nil {
 			d.chainInsertHook(results)
 		}
-		// Actually import the blocks
-		first, last := results[0].Header, results[len(results)-1].Header
+		if err := d.importBlockResults(results); err != nil {
+			return err
+		}
+	}
+}
+
+func (d *Downloader) importBlockResults(results []*fetchResult) error {
+	for len(results) != 0 {
+		// Check for any termination requests. This makes clean shutdown faster.
+		select {
+		case <-d.quitCh:
+			return errCancelContentProcessing
+		default:
+		}
+		// Retrieve the a batch of results to import
+		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+		first, last := results[0].Header, results[items-1].Header
 		log.Debug("Inserting downloaded chain", "items", len(results),
 			"firstnum", first.Number, "firsthash", first.Hash(),
 			"lastnum", last.Number, "lasthash", last.Hash(),
 		)
-		for len(results) != 0 {
-			// Check for any termination requests
-			select {
-			case <-d.quitCh:
-				return errCancelContentProcessing
-			default:
-			}
-			// Retrieve the a batch of results to import
-			var (
-				blocks   = make([]*types.Block, 0, maxResultsProcess)
-				receipts = make([]types.Receipts, 0, maxResultsProcess)
-			)
-			items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
-			for _, result := range results[:items] {
-				switch {
-				case d.mode == FullSync:
-					blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
-				case d.mode == FastSync:
-					blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
-					if result.Header.Number.Uint64() <= pivot {
-						receipts = append(receipts, result.Receipts)
-					}
-				}
-			}
-			// Try to process the results, aborting if there's an error
-			var (
-				err   error
-				index int
-			)
-			switch {
-			case len(receipts) > 0:
-				index, err = d.insertReceipts(blocks, receipts)
-				if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
-					log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash())
-					index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
-				}
-			default:
-				index, err = d.insertBlocks(blocks)
+		blocks := make([]*types.Block, items)
+		for i, result := range results[:items] {
+			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+		}
+		if index, err := d.insertBlocks(blocks); err != nil {
+			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+			return errInvalidChain
+		}
+		// Shift the results to the next batch
+		results = results[items:]
+	}
+	return nil
+}
+
+// processFastSyncContent takes fetch results from the queue and writes them to the
+// database. It also controls the synchronisation of state nodes of the pivot block.
+func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+	// Start syncing state of the reported head block.
+	// This should get us most of the state of the pivot block.
+	stateSync := d.syncState(latest.Root)
+	defer stateSync.Cancel()
+	go func() {
+		if err := stateSync.Wait(); err != nil {
+			d.queue.Close() // wake up WaitResults
+		}
+	}()
+
+	pivot := d.queue.FastSyncPivot()
+	for {
+		results := d.queue.WaitResults()
+		if len(results) == 0 {
+			return stateSync.Cancel()
+		}
+		if d.chainInsertHook != nil {
+			d.chainInsertHook(results)
+		}
+		P, beforeP, afterP := splitAroundPivot(pivot, results)
+		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
+			return err
+		}
+		if P != nil {
+			stateSync.Cancel()
+			if err := d.commitPivotBlock(P); err != nil {
+				return err
 			}
-			if err != nil {
-				log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
-				return errInvalidChain
+		}
+		if err := d.importBlockResults(afterP); err != nil {
+			return err
+		}
+	}
+}
+
+func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+	for _, result := range results {
+		num := result.Header.Number.Uint64()
+		switch {
+		case num < pivot:
+			before = append(before, result)
+		case num == pivot:
+			p = result
+		default:
+			after = append(after, result)
+		}
+	}
+	return p, before, after
+}
+
+func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
+	for len(results) != 0 {
+		// Check for any termination requests.
+		select {
+		case <-d.quitCh:
+			return errCancelContentProcessing
+		case <-stateSync.done:
+			if err := stateSync.Wait(); err != nil {
+				return err
 			}
-			// Shift the results to the next batch
-			results = results[items:]
+		default:
+		}
+		// Retrieve the a batch of results to import
+		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+		first, last := results[0].Header, results[items-1].Header
+		log.Debug("Inserting fast-sync blocks", "items", len(results),
+			"firstnum", first.Number, "firsthash", first.Hash(),
+			"lastnumn", last.Number, "lasthash", last.Hash(),
+		)
+		blocks := make([]*types.Block, items)
+		receipts := make([]types.Receipts, items)
+		for i, result := range results[:items] {
+			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+			receipts[i] = result.Receipts
+		}
+		if index, err := d.insertReceipts(blocks, receipts); err != nil {
+			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+			return errInvalidChain
 		}
+		// Shift the results to the next batch
+		results = results[items:]
+	}
+	return nil
+}
+
+func (d *Downloader) commitPivotBlock(result *fetchResult) error {
+	b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+	// Sync the pivot block state. This should complete reasonably quickly because
+	// we've already synced up to the reported head block state earlier.
+	if err := d.syncState(b.Root()).Wait(); err != nil {
+		return err
+	}
+	log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
+	if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
+		return err
 	}
+	return d.commitHeadBlock(b.Hash())
 }
 
 // DeliverHeaders injects a new batch of block headers received from a remote
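The fast-sync path above hinges on splitAroundPivot: completed results are partitioned into the blocks before the pivot (committed with their receipts via commitFastSyncData), the pivot block itself (state-synced and committed as the new head), and the blocks after it (processed like a full sync). A runnable sketch of the partitioning, using a simplified stand-in type instead of the downloader's fetchResult:

package main

import "fmt"

// result is a trimmed-down stand-in for the downloader's fetchResult; only the
// block number matters for the partitioning shown here.
type result struct{ number uint64 }

// splitAroundPivot partitions completed results into the block at the pivot
// height, everything before it, and everything after it, mirroring the helper
// added in this change (with simplified, hypothetical types).
func splitAroundPivot(pivot uint64, results []result) (p *result, before, after []result) {
	for i, r := range results {
		switch {
		case r.number < pivot:
			before = append(before, r)
		case r.number == pivot:
			p = &results[i]
		default:
			after = append(after, r)
		}
	}
	return p, before, after
}

func main() {
	// With a pivot of 100: blocks 98 and 99 are committed as fast-sync data
	// (bodies plus receipts), block 100 becomes the pivot whose state is synced
	// and which is committed as the new head, and 101 onwards is imported fully.
	results := []result{{98}, {99}, {100}, {101}, {102}}
	p, before, after := splitAroundPivot(100, results)
	fmt.Println(p.number, len(before), len(after)) // 100 2 2
}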
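The new stateSyncStart, trackStateReq and stateCh fields, together with the stateFetcher goroutine launched in New, imply a run-loop handshake: callers hand a sync job to a long-lived goroutine over a channel and wait on a done channel, which is how processFastSyncContent uses d.syncState(latest.Root), stateSync.Wait() and stateSync.Cancel(). The actual implementation lives in the downloader's statesync.go and is not part of this diff; the following is a deliberately simplified, hypothetical sketch of that pattern with made-up names:

package main

import "fmt"

// stateSyncJob is a hypothetical, much-reduced version of the *stateSync
// handle returned by d.syncState; done is closed when the sync finishes.
type stateSyncJob struct {
	root string        // state root to sync (a hash in the real code)
	done chan struct{} // closed when the sync completes
	err  error         // error observed while syncing, if any
}

func (s *stateSyncJob) Wait() error { <-s.done; return s.err }

// fetcher plays the role of the stateFetcher goroutine started in New.
type fetcher struct {
	start chan *stateSyncJob // analogous to stateSyncStart
	quit  chan struct{}      // analogous to quitCh
}

// run is the long-lived loop: it accepts new sync jobs over the start channel
// and completes them one at a time, exiting when quit is closed.
func (f *fetcher) run() {
	for {
		select {
		case job := <-f.start:
			// ... download and persist trie nodes for job.root here ...
			close(job.done)
		case <-f.quit:
			return
		}
	}
}

// syncState hands a new job to the run loop and returns a handle the caller
// can Wait on, mirroring how processFastSyncContent drives the pivot state.
func (f *fetcher) syncState(root string) *stateSyncJob {
	job := &stateSyncJob{root: root, done: make(chan struct{})}
	f.start <- job
	return job
}

func main() {
	f := &fetcher{start: make(chan *stateSyncJob), quit: make(chan struct{})}
	go f.run()
	job := f.syncState("0xroot")
	fmt.Println("state sync finished:", job.Wait())
	close(f.quit)
}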