Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 293 |
1 file changed, 12 insertions, 281 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 855097c45..8a7735d67 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -26,20 +26,13 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/crypto"
-	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/log"
-	"github.com/ethereum/go-ethereum/trie"
 	"github.com/rcrowley/go-metrics"
 	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
 )
 
-var (
-	blockCacheLimit   = 8192 // Maximum number of blocks to cache before throttling the download
-	maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently
-)
+var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
 
 var (
 	errNoFetchesPending = errors.New("no fetches pending")
@@ -94,15 +87,6 @@ type queue struct {
 	receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
 	receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
 
-	stateTaskIndex int                      // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order
-	stateTaskPool  map[common.Hash]int      // [eth/63] Pending node data retrieval tasks, mapping to their priority
-	stateTaskQueue *prque.Prque             // [eth/63] Priority queue of the hashes to fetch the node data for
-	statePendPool  map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations
-
-	stateDatabase  ethdb.Database   // [eth/63] Trie database to populate during state reassembly
-	stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
-	stateWriters   int              // [eth/63] Number of running state DB writer goroutines
-
 	resultCache  []*fetchResult // Downloaded but not yet delivered fetch results
 	resultOffset uint64         // Offset of the first cached fetch result in the block chain
 
@@ -112,7 +96,7 @@ type queue struct {
 }
 
 // newQueue creates a new download queue for scheduling block retrieval.
-func newQueue(stateDb ethdb.Database) *queue {
+func newQueue() *queue {
 	lock := new(sync.Mutex)
 	return &queue{
 		headerPendPool: make(map[string]*fetchRequest),
@@ -125,10 +109,6 @@ func newQueue(stateDb ethdb.Database) *queue {
 		receiptTaskQueue: prque.New(),
 		receiptPendPool:  make(map[string]*fetchRequest),
 		receiptDonePool:  make(map[common.Hash]struct{}),
-		stateTaskPool:    make(map[common.Hash]int),
-		stateTaskQueue:   prque.New(),
-		statePendPool:    make(map[string]*fetchRequest),
-		stateDatabase:    stateDb,
 		resultCache:      make([]*fetchResult, blockCacheLimit),
 		active:           sync.NewCond(lock),
 		lock:             lock,
@@ -158,12 +138,6 @@ func (q *queue) Reset() {
 	q.receiptPendPool = make(map[string]*fetchRequest)
 	q.receiptDonePool = make(map[common.Hash]struct{})
 
-	q.stateTaskIndex = 0
-	q.stateTaskPool = make(map[common.Hash]int)
-	q.stateTaskQueue.Reset()
-	q.statePendPool = make(map[string]*fetchRequest)
-	q.stateScheduler = nil
-
	q.resultCache = make([]*fetchResult, blockCacheLimit)
 	q.resultOffset = 0
 }
@@ -201,28 +175,6 @@ func (q *queue) PendingReceipts() int {
 	return q.receiptTaskQueue.Size()
 }
 
-// PendingNodeData retrieves the number of node data entries pending for retrieval.
-func (q *queue) PendingNodeData() int {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	return q.pendingNodeDataLocked()
-}
-
-// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval.
-// The caller must hold q.lock.
-func (q *queue) pendingNodeDataLocked() int {
-	var n int
-	if q.stateScheduler != nil {
-		n = q.stateScheduler.Pending()
-	}
-	// Ensure that PendingNodeData doesn't return 0 until all state is written.
-	if q.stateWriters > 0 {
-		n++
-	}
-	return n
-}
-
 // InFlightHeaders retrieves whether there are header fetch requests currently
 // in flight.
 func (q *queue) InFlightHeaders() bool {
@@ -250,28 +202,15 @@ func (q *queue) InFlightReceipts() bool {
 	return len(q.receiptPendPool) > 0
 }
 
-// InFlightNodeData retrieves whether there are node data entry fetch requests
-// currently in flight.
-func (q *queue) InFlightNodeData() bool {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	return len(q.statePendPool)+q.stateWriters > 0
-}
-
-// Idle returns if the queue is fully idle or has some data still inside. This
-// method is used by the tester to detect termination events.
+// Idle returns if the queue is fully idle or has some data still inside.
 func (q *queue) Idle() bool {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
-	pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
+	queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
+	pending := len(q.blockPendPool) + len(q.receiptPendPool)
 	cached := len(q.blockDonePool) + len(q.receiptDonePool)
 
-	if q.stateScheduler != nil {
-		queued += q.stateScheduler.Pending()
-	}
 	return (queued + pending + cached) == 0
 }
 
@@ -389,19 +328,6 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
 			q.receiptTaskPool[hash] = header
 			q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
 		}
-		if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
-			// Pivoting point of the fast sync, switch the state retrieval to this
-			log.Debug("Switching state downloads to new block", "number", header.Number, "hash", hash)
-
-			q.stateTaskIndex = 0
-			q.stateTaskPool = make(map[common.Hash]int)
-			q.stateTaskQueue.Reset()
-			for _, req := range q.statePendPool {
-				req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear
-			}
-
-			q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
-		}
 		inserts = append(inserts, header)
 		q.headerHead = hash
 		from++
@@ -448,31 +374,15 @@ func (q *queue) countProcessableItems() int {
 		if result == nil || result.Pending > 0 {
 			return i
 		}
-		// Special handling for the fast-sync pivot block:
-		if q.mode == FastSync {
-			bnum := result.Header.Number.Uint64()
-			if bnum == q.fastSyncPivot {
-				// If the state of the pivot block is not
-				// available yet, we cannot proceed and return 0.
-				//
-				// Stop before processing the pivot block to ensure that
-				// resultCache has space for fsHeaderForceVerify items. Not
-				// doing this could leave us unable to download the required
-				// amount of headers.
-				if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 {
+		// Stop before processing the pivot block to ensure that
+		// resultCache has space for fsHeaderForceVerify items. Not
+		// doing this could leave us unable to download the required
+		// amount of headers.
+		if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
+			for j := 0; j < fsHeaderForceVerify; j++ {
+				if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
 					return i
 				}
-				for j := 0; j < fsHeaderForceVerify; j++ {
-					if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
-						return i
-					}
-				}
-			}
-			// If we're just the fast sync pivot, stop as well
-			// because the following batch needs different insertion.
-			// This simplifies handling the switchover in d.process.
-			if bnum == q.fastSyncPivot+1 && i > 0 {
-				return i
 			}
 		}
 	}
@@ -519,81 +429,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
 	return request
 }
 
-// ReserveNodeData reserves a set of node data hashes for the given peer, skipping
-// any previously failed download.
-func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
-	// Create a task generator to fetch status-fetch tasks if all schedules ones are done
-	generator := func(max int) {
-		if q.stateScheduler != nil {
-			for _, hash := range q.stateScheduler.Missing(max) {
-				q.stateTaskPool[hash] = q.stateTaskIndex
-				q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
-				q.stateTaskIndex++
-			}
-		}
-	}
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, maxInFlightStates)
-}
-
-// reserveHashes reserves a set of hashes for the given peer, skipping previously
-// failed ones.
-//
-// Note, this method expects the queue lock to be already held for writing. The
-// reason the lock is not obtained in here is because the parameters already need
-// to access the queue, so they already need a lock anyway.
-func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest {
-	// Short circuit if the peer's already downloading something (sanity check to
-	// not corrupt state)
-	if _, ok := pendPool[p.id]; ok {
-		return nil
-	}
-	// Calculate an upper limit on the hashes we might fetch (i.e. throttling)
-	allowance := maxPending
-	if allowance > 0 {
-		for _, request := range pendPool {
-			allowance -= len(request.Hashes)
-		}
-	}
-	// If there's a task generator, ask it to fill our task queue
-	if taskGen != nil && taskQueue.Size() < allowance {
-		taskGen(allowance - taskQueue.Size())
-	}
-	if taskQueue.Empty() {
-		return nil
-	}
-	// Retrieve a batch of hashes, skipping previously failed ones
-	send := make(map[common.Hash]int)
-	skip := make(map[common.Hash]int)
-
-	for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
-		hash, priority := taskQueue.Pop()
-		if p.Lacks(hash.(common.Hash)) {
-			skip[hash.(common.Hash)] = int(priority)
-		} else {
-			send[hash.(common.Hash)] = int(priority)
-		}
-	}
-	// Merge all the skipped hashes back
-	for hash, index := range skip {
-		taskQueue.Push(hash, float32(index))
-	}
-	// Assemble and return the block download request
-	if len(send) == 0 {
-		return nil
-	}
-	request := &fetchRequest{
-		Peer:   p,
-		Hashes: send,
-		Time:   time.Now(),
-	}
-	pendPool[p.id] = request
-
-	return request
-}
-
 // ReserveBodies reserves a set of body fetches for the given peer, skipping any
 // previously failed downloads. Beside the next batch of needed fetches, it also
 // returns a flag whether empty blocks were queued requiring processing.
@@ -722,12 +557,6 @@ func (q *queue) CancelReceipts(request *fetchRequest) {
 	q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
 }
 
-// CancelNodeData aborts a node state data fetch request, returning all pending
-// hashes to the task queue.
-func (q *queue) CancelNodeData(request *fetchRequest) {
-	q.cancel(request, q.stateTaskQueue, q.statePendPool)
-}
-
 // Cancel aborts a fetch request, returning all pending hashes to the task queue.
 func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
 	q.lock.Lock()
@@ -764,12 +593,6 @@ func (q *queue) Revoke(peerId string) {
 		}
 		delete(q.receiptPendPool, peerId)
 	}
-	if request, ok := q.statePendPool[peerId]; ok {
-		for hash, index := range request.Hashes {
-			q.stateTaskQueue.Push(hash, float32(index))
-		}
-		delete(q.statePendPool, peerId)
-	}
 }
 
 // ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
@@ -799,15 +622,6 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
 	return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
 }
 
-// ExpireNodeData checks for in flight node data requests that exceeded a timeout
-// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter)
-}
-
 // expire is the generic check that move expired tasks from a pending pool back
 // into a task pool, returning all entities caught with expired tasks.
 //
@@ -1044,84 +858,6 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 	}
 }
 
-// DeliverNodeData injects a node state data retrieval response into the queue.
-// The method returns the number of node state accepted from the delivery.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	// Short circuit if the data was never requested
-	request := q.statePendPool[id]
-	if request == nil {
-		return 0, errNoFetchesPending
-	}
-	stateReqTimer.UpdateSince(request.Time)
-	delete(q.statePendPool, id)
-
-	// If no data was retrieved, mark their hashes as unavailable for the origin peer
-	if len(data) == 0 {
-		for hash := range request.Hashes {
-			request.Peer.MarkLacking(hash)
-		}
-	}
-	// Iterate over the downloaded data and verify each of them
-	errs := make([]error, 0)
-	process := []trie.SyncResult{}
-	for _, blob := range data {
-		// Skip any state trie entries that were not requested
-		hash := common.BytesToHash(crypto.Keccak256(blob))
-		if _, ok := request.Hashes[hash]; !ok {
-			errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
-			continue
-		}
-		// Inject the next state trie item into the processing queue
-		process = append(process, trie.SyncResult{Hash: hash, Data: blob})
-		delete(request.Hashes, hash)
-		delete(q.stateTaskPool, hash)
-	}
-	// Return all failed or missing fetches to the queue
-	for hash, index := range request.Hashes {
-		q.stateTaskQueue.Push(hash, float32(index))
-	}
-	if q.stateScheduler == nil {
-		return 0, errNoFetchesPending
-	}
-
-	// Run valid nodes through the trie download scheduler. It writes completed nodes to a
-	// batch, which is committed asynchronously. This may lead to over-fetches because the
-	// scheduler treats everything as written after Process has returned, but it's
-	// unlikely to be an issue in practice.
-	batch := q.stateDatabase.NewBatch()
-	progressed, nproc, procerr := q.stateScheduler.Process(process, batch)
-	q.stateWriters += 1
-	go func() {
-		if procerr == nil {
-			nproc = len(process)
-			procerr = batch.Write()
-		}
-		// Return processing errors through the callback so the sync gets canceled. The
-		// number of writers is decremented prior to the call so PendingNodeData will
-		// return zero when the callback runs.
-		q.lock.Lock()
-		q.stateWriters -= 1
-		q.lock.Unlock()
-		callback(nproc, progressed, procerr)
-		// Wake up WaitResults after the state has been written because it might be
-		// waiting for completion of the pivot block's state download.
-		q.active.Signal()
-	}()
-
-	// If none of the data items were good, it's a stale delivery
-	switch {
-	case len(errs) == 0:
-		return len(process), nil
-	case len(errs) == len(request.Hashes):
-		return len(process), errStaleDelivery
-	default:
-		return len(process), fmt.Errorf("multiple failures: %v", errs)
-	}
-}
-
 // Prepare configures the result cache to allow accepting and caching inbound
 // fetch results.
 func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
@@ -1134,9 +870,4 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.
 	}
 	q.fastSyncPivot = pivot
 	q.mode = mode
-
-	// If long running fast sync, also start up a head stateretrieval immediately
-	if mode == FastSync && pivot > 0 {
-		q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase)
-	}
 }