aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go334
1 files changed, 178 insertions, 156 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 839969f03..e4d1392d0 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
- "github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
)
@@ -99,8 +98,9 @@ type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mux *event.TypeMux // Event multiplexer to announce sync operation events
- queue *queue // Scheduler for selecting the hashes to download
- peers *peerSet // Set of active peers from which download can proceed
+ queue *queue // Scheduler for selecting the hashes to download
+ peers *peerSet // Set of active peers from which download can proceed
+ stateDB ethdb.Database
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
@@ -109,9 +109,9 @@ type Downloader struct {
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
// Statistics
- syncStatsChainOrigin uint64 // Origin block number where syncing started at
- syncStatsChainHeight uint64 // Highest block number known when syncing started
- syncStatsStateDone uint64 // Number of state trie entries already pulled
+ syncStatsChainOrigin uint64 // Origin block number where syncing started at
+ syncStatsChainHeight uint64 // Highest block number known when syncing started
+ syncStatsState stateSyncStats
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
// Callbacks
@@ -136,16 +136,18 @@ type Downloader struct {
notified int32
// Channels
- newPeerCh chan *peer
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
- stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+ // for stateFetcher
+ stateSyncStart chan *stateSync
+ trackStateReq chan *stateReq
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+
// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
@@ -170,8 +172,9 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
dl := &Downloader{
mode: mode,
mux: mux,
- queue: newQueue(stateDb),
+ queue: newQueue(),
peers: newPeerSet(),
+ stateDB: stateDb,
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
hasHeader: hasHeader,
@@ -188,18 +191,20 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
insertReceipts: insertReceipts,
rollback: rollback,
dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
- stateCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
- stateWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
+ // for stateFetcher
+ stateSyncStart: make(chan *stateSync),
+ trackStateReq: make(chan *stateReq),
+ stateCh: make(chan dataPack),
}
go dl.qosTuner()
+ go dl.stateFetcher()
return dl
}
@@ -211,9 +216,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
func (d *Downloader) Progress() ethereum.SyncProgress {
- // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
- pendingStates := uint64(d.queue.PendingNodeData())
-
// Lock the current stats and return the progress
d.syncStatsLock.RLock()
defer d.syncStatsLock.RUnlock()
@@ -231,8 +233,8 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
StartingBlock: d.syncStatsChainOrigin,
CurrentBlock: current,
HighestBlock: d.syncStatsChainHeight,
- PulledStates: d.syncStatsStateDone,
- KnownStates: d.syncStatsStateDone + pendingStates,
+ PulledStates: d.syncStatsState.processed,
+ KnownStates: d.syncStatsState.processed + d.syncStatsState.pending,
}
}
@@ -324,13 +326,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
d.queue.Reset()
d.peers.Reset()
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case <-ch:
default:
}
}
- for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
+ for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
for empty := false; !empty; {
select {
case <-ch:
@@ -439,30 +441,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
- return d.spawnSync(origin+1,
- func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
- func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
- func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
- func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
- func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
- )
+
+ fetchers := []func() error{
+ func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.processHeaders(origin+1, td) },
+ }
+ if d.mode == FastSync {
+ fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+ } else if d.mode == FullSync {
+ fetchers = append(fetchers, d.processFullSyncContent)
+ }
+ err = d.spawnSync(fetchers)
+ if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
+ // If sync failed in the critical section, bump the fail counter.
+ atomic.AddUint32(&d.fsPivotFails, 1)
+ }
+ return err
}
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
+func (d *Downloader) spawnSync(fetchers []func() error) error {
var wg sync.WaitGroup
- errc := make(chan error, len(fetchers)+1)
- wg.Add(len(fetchers) + 1)
- go func() { defer wg.Done(); errc <- d.processContent() }()
+ errc := make(chan error, len(fetchers))
+ wg.Add(len(fetchers))
for _, fn := range fetchers {
fn := fn
go func() { defer wg.Done(); errc <- fn() }()
}
// Wait for the first error, then terminate the others.
var err error
- for i := 0; i < len(fetchers)+1; i++ {
- if i == len(fetchers) {
+ for i := 0; i < len(fetchers); i++ {
+ if i == len(fetchers)-1 {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
@@ -475,11 +487,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
d.queue.Close()
d.Cancel()
wg.Wait()
-
- // If sync failed in the critical section, bump the fail counter
- if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
- atomic.AddUint32(&d.fsPivotFails, 1)
- }
return err
}
@@ -552,7 +559,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
return nil, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -649,7 +655,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -714,7 +719,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -827,7 +831,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -927,68 +931,6 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
-// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
-// available peers, reserving a chunk of nodes for each, waiting for delivery and
-// also periodically checking for timeouts.
-func (d *Downloader) fetchNodeData() error {
- log.Debug("Downloading node state data")
-
- var (
- deliver = func(packet dataPack) (int, error) {
- start := time.Now()
- return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
- // If the peer returned old-requested data, forgive
- if err == trie.ErrNotRequested {
- log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId())
- return
- }
- if err != nil {
- // If the node data processing failed, the root hash is very wrong, abort
- log.Error("State processing failed", "peer", packet.PeerId(), "err", err)
- d.Cancel()
- return
- }
- // Processing succeeded, notify state fetcher of continuation
- pending := d.queue.PendingNodeData()
- if pending > 0 {
- select {
- case d.stateWakeCh <- true:
- default:
- }
- }
- d.syncStatsLock.Lock()
- d.syncStatsStateDone += uint64(delivered)
- syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
- d.syncStatsLock.Unlock()
-
- // If real database progress was made, reset any fast-sync pivot failure
- if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
- log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails))
- atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
- }
- // Log a message to the user and return
- if delivered > 0 {
- log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending)
- }
- })
- }
- expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
- throttle = func() bool { return false }
- reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
- return d.queue.ReserveNodeData(p, count), false, nil
- }
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
- capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
- )
- err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
- d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
- d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states")
-
- log.Debug("Node state data download terminated", "err", err)
- return err
-}
-
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
@@ -1229,7 +1171,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1341,7 +1283,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
origin += uint64(limit)
}
// Signal the content downloaders of the availablility of new tasks
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- true:
default:
@@ -1351,71 +1293,151 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
-// processContent takes fetch results from the queue and tries to import them
-// into the chain. The type of import operation will depend on the result contents.
-func (d *Downloader) processContent() error {
- pivot := d.queue.FastSyncPivot()
+// processFullSyncContent takes fetch results from the queue and imports them into the chain.
+func (d *Downloader) processFullSyncContent() error {
for {
results := d.queue.WaitResults()
if len(results) == 0 {
- return nil // queue empty
+ return nil
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
- // Actually import the blocks
- first, last := results[0].Header, results[len(results)-1].Header
+ if err := d.importBlockResults(results); err != nil {
+ return err
+ }
+ }
+}
+
+func (d *Downloader) importBlockResults(results []*fetchResult) error {
+ for len(results) != 0 {
+ // Check for any termination requests. This makes clean shutdown faster.
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ default:
+ }
+ // Retrieve the a batch of results to import
+ items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+ first, last := results[0].Header, results[items-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
"lastnum", last.Number, "lasthash", last.Hash(),
)
- for len(results) != 0 {
- // Check for any termination requests
- select {
- case <-d.quitCh:
- return errCancelContentProcessing
- default:
- }
- // Retrieve the a batch of results to import
- var (
- blocks = make([]*types.Block, 0, maxResultsProcess)
- receipts = make([]types.Receipts, 0, maxResultsProcess)
- )
- items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
- for _, result := range results[:items] {
- switch {
- case d.mode == FullSync:
- blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- case d.mode == FastSync:
- blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- if result.Header.Number.Uint64() <= pivot {
- receipts = append(receipts, result.Receipts)
- }
- }
- }
- // Try to process the results, aborting if there's an error
- var (
- err error
- index int
- )
- switch {
- case len(receipts) > 0:
- index, err = d.insertReceipts(blocks, receipts)
- if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
- log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash())
- index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
- }
- default:
- index, err = d.insertBlocks(blocks)
+ blocks := make([]*types.Block, items)
+ for i, result := range results[:items] {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ }
+ if index, err := d.insertBlocks(blocks); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
+ }
+ // Shift the results to the next batch
+ results = results[items:]
+ }
+ return nil
+}
+
+// processFastSyncContent takes fetch results from the queue and writes them to the
+// database. It also controls the synchronisation of state nodes of the pivot block.
+func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+ // Start syncing state of the reported head block.
+ // This should get us most of the state of the pivot block.
+ stateSync := d.syncState(latest.Root)
+ defer stateSync.Cancel()
+ go func() {
+ if err := stateSync.Wait(); err != nil {
+ d.queue.Close() // wake up WaitResults
+ }
+ }()
+
+ pivot := d.queue.FastSyncPivot()
+ for {
+ results := d.queue.WaitResults()
+ if len(results) == 0 {
+ return stateSync.Cancel()
+ }
+ if d.chainInsertHook != nil {
+ d.chainInsertHook(results)
+ }
+ P, beforeP, afterP := splitAroundPivot(pivot, results)
+ if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
+ return err
+ }
+ if P != nil {
+ stateSync.Cancel()
+ if err := d.commitPivotBlock(P); err != nil {
+ return err
}
- if err != nil {
- log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
- return errInvalidChain
+ }
+ if err := d.importBlockResults(afterP); err != nil {
+ return err
+ }
+ }
+}
+
+func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+ for _, result := range results {
+ num := result.Header.Number.Uint64()
+ switch {
+ case num < pivot:
+ before = append(before, result)
+ case num == pivot:
+ p = result
+ default:
+ after = append(after, result)
+ }
+ }
+ return p, before, after
+}
+
+func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
+ for len(results) != 0 {
+ // Check for any termination requests.
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ case <-stateSync.done:
+ if err := stateSync.Wait(); err != nil {
+ return err
}
- // Shift the results to the next batch
- results = results[items:]
+ default:
+ }
+ // Retrieve the a batch of results to import
+ items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+ first, last := results[0].Header, results[items-1].Header
+ log.Debug("Inserting fast-sync blocks", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnumn", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, items)
+ receipts := make([]types.Receipts, items)
+ for i, result := range results[:items] {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ receipts[i] = result.Receipts
+ }
+ if index, err := d.insertReceipts(blocks, receipts); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
}
+ // Shift the results to the next batch
+ results = results[items:]
+ }
+ return nil
+}
+
+func (d *Downloader) commitPivotBlock(result *fetchResult) error {
+ b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ // Sync the pivot block state. This should complete reasonably quickly because
+ // we've already synced up to the reported head block state earlier.
+ if err := d.syncState(b.Root()).Wait(); err != nil {
+ return err
+ }
+ log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
+ if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
+ return err
}
+ return d.commitHeadBlock(b.Hash())
}
// DeliverHeaders injects a new batch of block headers received from a remote