aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go301
1 files changed, 164 insertions, 137 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 56b46e285..1e55560db 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -101,11 +101,14 @@ type queue struct {
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
- lock sync.RWMutex
+ lock *sync.Mutex
+ active *sync.Cond
+ closed bool
}
// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(stateDb ethdb.Database) *queue {
+ lock := new(sync.Mutex)
return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
@@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue {
statePendPool: make(map[string]*fetchRequest),
stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
+ active: sync.NewCond(lock),
+ lock: lock,
}
}
@@ -133,6 +138,7 @@ func (q *queue) Reset() {
q.stateSchedLock.Lock()
defer q.stateSchedLock.Unlock()
+ q.closed = false
q.mode = FullSync
q.fastSyncPivot = 0
@@ -162,18 +168,27 @@ func (q *queue) Reset() {
q.resultOffset = 0
}
+// Close marks the end of the sync, unblocking WaitResults.
+// It may be called even if the queue is already closed.
+func (q *queue) Close() {
+ q.lock.Lock()
+ q.closed = true
+ q.lock.Unlock()
+ q.active.Broadcast()
+}
+
// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.hashQueue.Size() + q.blockTaskQueue.Size()
}
// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.receiptTaskQueue.Size()
}
@@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int {
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.blockPendPool) > 0
}
@@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool {
// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.receiptPendPool) > 0
}
@@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool {
// InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight.
func (q *queue) InFlightNodeData() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
}
@@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool {
// Idle returns if the queue is fully idle or has some data still inside. This
// method is used by the tester to detect termination events.
func (q *queue) Idle() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
@@ -237,8 +252,8 @@ func (q *queue) Idle() bool {
// FastSyncPivot retrieves the currently used fast sync pivot point.
func (q *queue) FastSyncPivot() uint64 {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.fastSyncPivot
}
@@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 {
// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight block (body) requests
pending := 0
@@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool {
// ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight receipt requests
pending := 0
@@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
return inserts
}
-// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't
-// been downloaded yet (or simply non existent).
-func (q *queue) GetHeadResult() *fetchResult {
- q.lock.RLock()
- defer q.lock.RUnlock()
+// WaitResults retrieves and permanently removes a batch of fetch
+// results from the cache. the result slice will be empty if the queue
+// has been closed.
+func (q *queue) WaitResults() []*fetchResult {
+ q.lock.Lock()
+ defer q.lock.Unlock()
- // If there are no results pending, return nil
- if len(q.resultCache) == 0 || q.resultCache[0] == nil {
- return nil
- }
- // If the next result is still incomplete, return nil
- if q.resultCache[0].Pending > 0 {
- return nil
+ nproc := q.countProcessableItems()
+ for nproc == 0 && !q.closed {
+ q.active.Wait()
+ nproc = q.countProcessableItems()
}
- // If the next result is the fast sync pivot...
- if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot {
- // If the pivot state trie is still being pulled, return nil
- if len(q.stateTaskPool) > 0 {
- return nil
+ results := make([]*fetchResult, nproc)
+ copy(results, q.resultCache[:nproc])
+ if len(results) > 0 {
+ // Mark results as done before dropping them from the cache.
+ for _, result := range results {
+ hash := result.Header.Hash()
+ delete(q.blockDonePool, hash)
+ delete(q.receiptDonePool, hash)
}
- if q.PendingNodeData() > 0 {
- return nil
- }
- // If the state is done, but not enough post-pivot headers were verified, stall...
- for i := 0; i < fsHeaderForceVerify; i++ {
- if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil {
- return nil
- }
+ // Delete the results from the cache and clear the tail.
+ copy(q.resultCache, q.resultCache[nproc:])
+ for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
+ q.resultCache[i] = nil
}
+ // Advance the expected block number of the first cache entry.
+ q.resultOffset += uint64(nproc)
}
- return q.resultCache[0]
+ return results
}
-// TakeResults retrieves and permanently removes a batch of fetch results from
-// the cache.
-func (q *queue) TakeResults() []*fetchResult {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- // Accumulate all available results
- results := []*fetchResult{}
+// countProcessableItems counts the processable items.
+func (q *queue) countProcessableItems() int {
for i, result := range q.resultCache {
- // Stop if no more results are ready
+ // Don't process incomplete or unavailable items.
if result == nil || result.Pending > 0 {
- break
+ return i
}
- // The fast sync pivot block may only be processed after state fetch completes
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
- if len(q.stateTaskPool) > 0 {
- break
- }
- if q.PendingNodeData() > 0 {
- break
- }
- // Even is state fetch is done, ensure post-pivot headers passed verifications
- safe := true
- for j := 0; j < fsHeaderForceVerify; j++ {
- if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
- safe = false
+ // Special handling for the fast-sync pivot block:
+ if q.mode == FastSync {
+ bnum := result.Header.Number.Uint64()
+ if bnum == q.fastSyncPivot {
+ // If the state of the pivot block is not
+ // available yet, we cannot proceed and return 0.
+ //
+ // Stop before processing the pivot block to ensure that
+ // resultCache has space for fsHeaderForceVerify items. Not
+ // doing this could leave us unable to download the required
+ // amount of headers.
+ if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
+ return i
+ }
+ for j := 0; j < fsHeaderForceVerify; j++ {
+ if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
+ return i
+ }
}
}
- if !safe {
- break
+ // If we're just the fast sync pivot, stop as well
+ // because the following batch needs different insertion.
+ // This simplifies handling the switchover in d.process.
+ if bnum == q.fastSyncPivot+1 && i > 0 {
+ return i
}
}
- // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
- break
- }
- results = append(results, result)
-
- hash := result.Header.Hash()
- delete(q.blockDonePool, hash)
- delete(q.receiptDonePool, hash)
- }
- // Delete the results from the slice and let them be garbage collected
- // without this slice trick the results would stay in memory until nil
- // would be assigned to them.
- copy(q.resultCache, q.resultCache[len(results):])
- for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ {
- q.resultCache[k] = nil
}
- q.resultOffset += uint64(len(results))
-
- return results
+ return len(q.resultCache)
}
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
@@ -501,7 +499,7 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGe
for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
hash, priority := taskQueue.Pop()
- if p.ignored.Has(hash) {
+ if p.Lacks(hash.(common.Hash)) {
skip[hash.(common.Hash)] = int(priority)
} else {
send[hash.(common.Hash)] = int(priority)
@@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
// If we're the first to request this task, initialise the result container
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
+ common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain
}
if q.resultCache[index] == nil {
@@ -607,7 +606,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
continue
}
// Otherwise unless the peer is known not to have the data, add to the retrieve list
- if p.ignored.Has(header.Hash()) {
+ if p.Lacks(header.Hash()) {
skip = append(skip, header)
} else {
send = append(send, header)
@@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
+ if progress {
+ // Wake WaitResults, resultCache was modified
+ q.active.Signal()
+ }
// Assemble and return the block download request
if len(send) == 0 {
return nil, progress, nil
@@ -700,7 +703,7 @@ func (q *queue) Revoke(peerId string) {
// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -709,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string {
// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBodies(timeout time.Duration) []string {
+func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -718,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string {
// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireReceipts(timeout time.Duration) []string {
+func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -727,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string {
// ExpireNodeData checks for in flight node data requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) []string {
+func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -737,12 +740,12 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
-// Note, this method expects the queue lock to be already held for writing. The
+// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
// Iterate over the expired requests and return each to the queue
- peers := []string{}
+ expiries := make(map[string]int)
for id, request := range pendPool {
if time.Since(request.Time) > timeout {
// Update the metrics with the timeout
@@ -755,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
- peers = append(peers, id)
+ // Add the peer to the expiry report along the the number of failed requests
+ expirations := len(request.Hashes)
+ if expirations < len(request.Headers) {
+ expirations = len(request.Headers)
+ }
+ expiries[id] = expirations
}
}
// Remove the expired requests from the pending pool
- for _, id := range peers {
+ for id, _ := range expiries {
delete(pendPool, id)
}
- return peers
+ return expiries
}
-// DeliverBlocks injects a block retrieval response into the download queue.
-func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
+// DeliverBlocks injects a block retrieval response into the download queue. The
+// method returns the number of blocks accepted from the delivery and also wakes
+// any threads waiting for data delivery.
+func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the blocks were never requested
request := q.blockPendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
blockReqTimer.UpdateSince(request.Time)
delete(q.blockPendPool, id)
@@ -781,11 +791,11 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
// If no blocks were retrieved, mark them as unavailable for the origin peer
if len(blocks) == 0 {
for hash, _ := range request.Hashes {
- request.Peer.ignored.Add(hash)
+ request.Peer.MarkLacking(hash)
}
}
// Iterate over the downloaded blocks and add each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
for _, block := range blocks {
// Skip any blocks that were not requested
hash := block.Hash()
@@ -808,29 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
delete(request.Hashes, hash)
delete(q.hashPool, hash)
+ accepted++
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
+ // Wake up WaitResults
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the blocks were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
-
+ return accepted, nil
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
- return errs[0]
-
+ return accepted, errs[0]
case len(errs) == len(blocks):
- return errStaleDelivery
-
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}
// DeliverBodies injects a block body retrieval response into the results queue.
-func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// The method returns the number of blocks bodies accepted from the delivery and
+// also wakes any threads waiting for data delivery.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -846,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
}
// DeliverReceipts injects a receipt retrieval response into the results queue.
-func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error {
+// The method returns the number of transaction receipts accepted from the delivery
+// and also wakes any threads waiting for data delivery.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -865,26 +881,29 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
- donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error {
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+ pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
+ results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+
// Short circuit if the data was never requested
request := pendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
reqTimer.UpdateSince(request.Time)
delete(pendPool, id)
// If no data items were retrieved, mark them as unavailable for the origin peer
if results == 0 {
- for hash, _ := range request.Headers {
- request.Peer.ignored.Add(hash)
+ for _, header := range request.Headers {
+ request.Peer.MarkLacking(header.Hash())
}
}
// Assemble each of the results with their headers and retrieved data parts
var (
- failure error
- useful bool
+ accepted int
+ failure error
+ useful bool
)
for i, header := range request.Headers {
// Short circuit assembly if no more fetch results are found
@@ -904,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
donePool[header.Hash()] = struct{}{}
q.resultCache[index].Pending--
useful = true
+ accepted++
// Clean up a successful fetch
request.Headers[i] = nil
@@ -915,28 +935,32 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
}
+ // Wake up WaitResults
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
- return failure
-
+ return accepted, failure
case useful:
- return fmt.Errorf("partial failure: %v", failure)
-
+ return accepted, fmt.Errorf("partial failure: %v", failure)
default:
- return errStaleDelivery
+ return accepted, errStaleDelivery
}
}
// DeliverNodeData injects a node state data retrieval response into the queue.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error {
+// The method returns the number of node state entries originally requested, and
+// the number of them actually accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the data was never requested
request := q.statePendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
stateReqTimer.UpdateSince(request.Time)
delete(q.statePendPool, id)
@@ -944,14 +968,14 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// If no data was retrieved, mark their hashes as unavailable for the origin peer
if len(data) == 0 {
for hash, _ := range request.Hashes {
- request.Peer.ignored.Add(hash)
+ request.Peer.MarkLacking(hash)
}
}
// Iterate over the downloaded data and verify each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
process := []trie.SyncResult{}
for _, blob := range data {
- // Skip any blocks that were not requested
+ // Skip any state trie entires that were not requested
hash := common.BytesToHash(crypto.Sha3(blob))
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
@@ -959,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
}
// Inject the next state trie item into the processing queue
process = append(process, trie.SyncResult{hash, blob})
+ accepted++
delete(request.Hashes, hash)
delete(q.stateTaskPool, hash)
@@ -976,19 +1001,21 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// If none of the data items were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
-
+ return accepted, nil
case len(errs) == len(request.Hashes):
- return errStaleDelivery
-
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}
// deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
+ // Wake up WaitResults after the state has been written because it
+ // might be waiting for the pivot block state to get completed.
+ defer q.active.Signal()
+
// Process results one by one to permit task fetches in between
for i, result := range results {
q.stateSchedLock.Lock()