aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2015-11-14 00:08:15 +0800
committerFelix Lange <fjl@twurst.com>2015-11-19 21:18:34 +0800
commit900da3d800ad299c22ecb5d14f477600931d70b6 (patch)
treeba63e738bd44c77a1c161cf906ddab374beb8f43 /eth/downloader/queue.go
parent9422eec55460aaca300cabd52124ed0cbd8dedd3 (diff)
downloaddexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.gz
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.bz2
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.lz
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.xz
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.zst
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.zip
eth/downloader: don't hang for spurious deliveries
Unexpected deliveries could block indefinitely if they arrived at the right time. The fix is to ensure that the cancellation channel is always closed when the sync ends, unblocking any deliveries. Also remove the atomic check for whether a sync is currently running because it doesn't help and can be misleading. Cancelling always seems to break the tests though. The downloader spawned d.process whenever new data arrived, making it somewhat hard to track when block processing was actually done. Fix this by running d.process in a dedicated goroutine that is tied to the lifecycle of the sync. d.process gets notified of new work by the queue instead of being invoked all the time. This removes a ton of weird workaround code, including a hairy use of atomic CAS.
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go198
1 files changed, 101 insertions, 97 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 1fb5b6e12..584797d7b 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -101,11 +101,14 @@ type queue struct {
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
- lock sync.RWMutex
+ lock *sync.Mutex
+ active *sync.Cond
+ closed bool
}
// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(stateDb ethdb.Database) *queue {
+ lock := new(sync.Mutex)
return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
@@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue {
statePendPool: make(map[string]*fetchRequest),
stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
+ active: sync.NewCond(lock),
+ lock: lock,
}
}
@@ -133,6 +138,7 @@ func (q *queue) Reset() {
q.stateSchedLock.Lock()
defer q.stateSchedLock.Unlock()
+ q.closed = false
q.mode = FullSync
q.fastSyncPivot = 0
@@ -162,18 +168,27 @@ func (q *queue) Reset() {
q.resultOffset = 0
}
+// Close marks the end of the sync, unblocking WaitResults.
+// It may be called even if the queue is already closed.
+func (q *queue) Close() {
+ q.lock.Lock()
+ q.closed = true
+ q.lock.Unlock()
+ q.active.Broadcast()
+}
+
// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.hashQueue.Size() + q.blockTaskQueue.Size()
}
// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.receiptTaskQueue.Size()
}
@@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int {
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.blockPendPool) > 0
}
@@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool {
// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.receiptPendPool) > 0
}
@@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool {
// InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight.
func (q *queue) InFlightNodeData() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
}
@@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool {
// Idle returns if the queue is fully idle or has some data still inside. This
// method is used by the tester to detect termination events.
func (q *queue) Idle() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
@@ -237,8 +252,8 @@ func (q *queue) Idle() bool {
// FastSyncPivot retrieves the currently used fast sync pivot point.
func (q *queue) FastSyncPivot() uint64 {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.fastSyncPivot
}
@@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 {
// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight block (body) requests
pending := 0
@@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool {
// ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight receipt requests
pending := 0
@@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
return inserts
}
-// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't
-// been downloaded yet (or simply non existent).
-func (q *queue) GetHeadResult() *fetchResult {
- q.lock.RLock()
- defer q.lock.RUnlock()
+// WaitResults retrieves and permanently removes a batch of fetch
+// results from the cache. the result slice will be empty if the queue
+// has been closed.
+func (q *queue) WaitResults() []*fetchResult {
+ q.lock.Lock()
+ defer q.lock.Unlock()
- // If there are no results pending, return nil
- if len(q.resultCache) == 0 || q.resultCache[0] == nil {
- return nil
- }
- // If the next result is still incomplete, return nil
- if q.resultCache[0].Pending > 0 {
- return nil
+ nproc := q.countProcessableItems()
+ for nproc == 0 && !q.closed {
+ q.active.Wait()
+ nproc = q.countProcessableItems()
}
- // If the next result is the fast sync pivot...
- if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot {
- // If the pivot state trie is still being pulled, return nil
- if len(q.stateTaskPool) > 0 {
- return nil
+ results := make([]*fetchResult, nproc)
+ copy(results, q.resultCache[:nproc])
+ if len(results) > 0 {
+ // Mark results as done before dropping them from the cache.
+ for _, result := range results {
+ hash := result.Header.Hash()
+ delete(q.blockDonePool, hash)
+ delete(q.receiptDonePool, hash)
}
- if q.PendingNodeData() > 0 {
- return nil
- }
- // If the state is done, but not enough post-pivot headers were verified, stall...
- for i := 0; i < fsHeaderForceVerify; i++ {
- if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil {
- return nil
- }
+ // Delete the results from the cache and clear the tail.
+ copy(q.resultCache, q.resultCache[nproc:])
+ for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
+ q.resultCache[i] = nil
}
+ // Advance the expected block number of the first cache entry.
+ q.resultOffset += uint64(nproc)
}
- return q.resultCache[0]
+ return results
}
-// TakeResults retrieves and permanently removes a batch of fetch results from
-// the cache.
-func (q *queue) TakeResults() []*fetchResult {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- // Accumulate all available results
- results := []*fetchResult{}
+// countProcessableItems counts the processable items.
+func (q *queue) countProcessableItems() int {
for i, result := range q.resultCache {
- // Stop if no more results are ready
+ // Don't process incomplete or unavailable items.
if result == nil || result.Pending > 0 {
- break
+ return i
}
- // The fast sync pivot block may only be processed after state fetch completes
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
- if len(q.stateTaskPool) > 0 {
- break
- }
- if q.PendingNodeData() > 0 {
- break
- }
- // Even is state fetch is done, ensure post-pivot headers passed verifications
- safe := true
- for j := 0; j < fsHeaderForceVerify; j++ {
- if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
- safe = false
+ // Special handling for the fast-sync pivot block:
+ if q.mode == FastSync {
+ bnum := result.Header.Number.Uint64()
+ if bnum == q.fastSyncPivot {
+ // If the state of the pivot block is not
+ // available yet, we cannot proceed and return 0.
+ //
+ // Stop before processing the pivot block to ensure that
+ // resultCache has space for fsHeaderForceVerify items. Not
+ // doing this could leave us unable to download the required
+ // amount of headers.
+ if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
+ return i
+ }
+ for j := 0; j < fsHeaderForceVerify; j++ {
+ if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
+ return i
+ }
}
}
- if !safe {
- break
+ // If we're just the fast sync pivot, stop as well
+ // because the following batch needs different insertion.
+ // This simplifies handling the switchover in d.process.
+ if bnum == q.fastSyncPivot+1 && i > 0 {
+ return i
}
}
- // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
- break
- }
- results = append(results, result)
-
- hash := result.Header.Hash()
- delete(q.blockDonePool, hash)
- delete(q.receiptDonePool, hash)
}
- // Delete the results from the slice and let them be garbage collected
- // without this slice trick the results would stay in memory until nil
- // would be assigned to them.
- copy(q.resultCache, q.resultCache[len(results):])
- for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ {
- q.resultCache[k] = nil
- }
- q.resultOffset += uint64(len(results))
-
- return results
+ return len(q.resultCache)
}
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
@@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
// If we're the first to request this task, initialise the result container
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
+ common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain
}
if q.resultCache[index] == nil {
@@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
+ if progress {
+ // Wake WaitResults, resultCache was modified
+ q.active.Signal()
+ }
// Assemble and return the block download request
if len(send) == 0 {
return nil, progress, nil
@@ -737,7 +740,7 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
-// Note, this method expects the queue lock to be already held for writing. The
+// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
@@ -813,17 +816,16 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
+ // Wake up WaitResults
+ q.active.Signal()
// If none of the blocks were good, it's a stale delivery
switch {
case len(errs) == 0:
return nil
-
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
return errs[0]
-
case len(errs) == len(blocks):
return errStaleDelivery
-
default:
return fmt.Errorf("multiple failures: %v", errs)
}
@@ -915,14 +917,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
}
+ // Wake up WaitResults
+ q.active.Signal()
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
return failure
-
case useful:
return fmt.Errorf("partial failure: %v", failure)
-
default:
return errStaleDelivery
}
@@ -977,10 +979,8 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
switch {
case len(errs) == 0:
return nil
-
case len(errs) == len(request.Hashes):
return errStaleDelivery
-
default:
return fmt.Errorf("multiple failures: %v", errs)
}
@@ -989,6 +989,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
+ // Wake up WaitResults after the state has been written because it
+ // might be waiting for the pivot block state to get completed.
+ defer q.active.Signal()
+
// Process results one by one to permit task fetches in between
for i, result := range results {
q.stateSchedLock.Lock()