author    Felix Lange <fjl@users.noreply.github.com>    2017-01-24 20:20:37 +0800
committer GitHub <noreply@github.com>                   2017-01-24 20:20:37 +0800
commit    f1069a30b9ca13769ace154ff25dfabc83f25485 (patch)
tree      d7d9cac279b46ab301cd3a05016711ddd45ada7c /eth/downloader/queue.go
parent    2718b4282840347677f55d5ea7865fbae35838d6 (diff)
eth/downloader: improve deliverNodeData (#3588)
Commit d3b751e accidentally deleted a crucial 'return' statement, leading to a crash in case of an issue with node data. This change improves the fix in PR #3591 by removing the lock entirely.
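
As an aside from the patch itself, the following is a minimal, hypothetical Go sketch of the concurrency pattern the diff below moves to (the names miniQueue and Deliver are invented for illustration and are not the downloader's API): the count of in-flight asynchronous writer goroutines becomes a plain int guarded by the main queue mutex, replacing the sync/atomic counter and the separate stateSchedLock, and the pending count stays non-zero while a writer goroutine is still flushing.

package main

import "sync"

// miniQueue is a stand-in for the downloader queue: one mutex protects both
// the scheduler's pending count and the number of running writer goroutines.
type miniQueue struct {
	lock    sync.Mutex
	pending int // entries still wanted (stand-in for stateScheduler.Pending())
	writers int // running asynchronous state DB writer goroutines
}

// PendingNodeData mirrors pendingNodeDataLocked in the patch: it does not
// report zero while a writer goroutine is still committing a batch.
func (q *miniQueue) PendingNodeData() int {
	q.lock.Lock()
	defer q.lock.Unlock()
	n := q.pending
	if q.writers > 0 {
		n++
	}
	return n
}

// Deliver commits a processed batch asynchronously, decrementing the writer
// count under the same lock before invoking the completion callback.
func (q *miniQueue) Deliver(items int, callback func(int, error)) {
	q.lock.Lock()
	q.writers++
	q.lock.Unlock()
	go func() {
		var err error // the real code would call batch.Write() here
		q.lock.Lock()
		q.writers--
		q.lock.Unlock()
		callback(items, err)
	}()
}

func main() {
	q := &miniQueue{pending: 3}
	done := make(chan struct{})
	q.Deliver(2, func(n int, err error) {
		// By the time the callback runs, writers is already back to zero,
		// so PendingNodeData reflects only what the scheduler still wants.
		close(done)
	})
	<-done
	_ = q.PendingNodeData()
}
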
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--  eth/downloader/queue.go  124
1 file changed, 52 insertions, 72 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 3318879e2..5be09f37d 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"sync"
- "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -101,10 +100,9 @@ type queue struct {
stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for
statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations
- stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
- stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
- stateProcessors int32 // [eth/63] Number of currently running state processors
- stateSchedLock sync.RWMutex // [eth/63] Lock serialising access to the state scheduler
+ stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
+ stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
+ stateWriters int // [eth/63] Number of running state DB writer goroutines
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
@@ -143,9 +141,6 @@ func (q *queue) Reset() {
q.lock.Lock()
defer q.lock.Unlock()
- q.stateSchedLock.Lock()
- defer q.stateSchedLock.Unlock()
-
q.closed = false
q.mode = FullSync
q.fastSyncPivot = 0
@@ -209,13 +204,24 @@ func (q *queue) PendingReceipts() int {
// PendingNodeData retrieves the number of node data entries pending for retrieval.
func (q *queue) PendingNodeData() int {
- q.stateSchedLock.RLock()
- defer q.stateSchedLock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ return q.pendingNodeDataLocked()
+}
+
+// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval.
+// The caller must hold q.lock.
+func (q *queue) pendingNodeDataLocked() int {
+ var n int
if q.stateScheduler != nil {
- return q.stateScheduler.Pending()
+ n = q.stateScheduler.Pending()
+ }
+ // Ensure that PendingNodeData doesn't return 0 until all state is written.
+ if q.stateWriters > 0 {
+ n++
}
- return 0
+ return n
}
// InFlightHeaders retrieves whether there are header fetch requests currently
@@ -251,7 +257,7 @@ func (q *queue) InFlightNodeData() bool {
q.lock.Lock()
defer q.lock.Unlock()
- return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
+ return len(q.statePendPool)+q.stateWriters > 0
}
// Idle returns if the queue is fully idle or has some data still inside. This
@@ -264,12 +270,9 @@ func (q *queue) Idle() bool {
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
cached := len(q.blockDonePool) + len(q.receiptDonePool)
- q.stateSchedLock.RLock()
if q.stateScheduler != nil {
queued += q.stateScheduler.Pending()
}
- q.stateSchedLock.RUnlock()
-
return (queued + pending + cached) == 0
}
@@ -398,9 +401,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear
}
- q.stateSchedLock.Lock()
q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
- q.stateSchedLock.Unlock()
}
inserts = append(inserts, header)
q.headerHead = hash
@@ -459,7 +460,7 @@ func (q *queue) countProcessableItems() int {
// resultCache has space for fsHeaderForceVerify items. Not
// doing this could leave us unable to download the required
// amount of headers.
- if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
+ if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 {
return i
}
for j := 0; j < fsHeaderForceVerify; j++ {
@@ -524,9 +525,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
// Create a task generator to fetch status-fetch tasks if all schedules ones are done
generator := func(max int) {
- q.stateSchedLock.Lock()
- defer q.stateSchedLock.Unlock()
-
if q.stateScheduler != nil {
for _, hash := range q.stateScheduler.Missing(max) {
q.stateTaskPool[hash] = q.stateTaskIndex
@@ -1068,7 +1066,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, boo
}
}
// Iterate over the downloaded data and verify each of them
- accepted, errs := 0, make([]error, 0)
+ errs := make([]error, 0)
process := []trie.SyncResult{}
for _, blob := range data {
// Skip any state trie entries that were not requested
@@ -1079,68 +1077,50 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, boo
}
// Inject the next state trie item into the processing queue
process = append(process, trie.SyncResult{Hash: hash, Data: blob})
- accepted++
-
delete(request.Hashes, hash)
delete(q.stateTaskPool, hash)
}
- // Start the asynchronous node state data injection
- atomic.AddInt32(&q.stateProcessors, 1)
- go func() {
- defer atomic.AddInt32(&q.stateProcessors, -1)
- q.deliverNodeData(process, callback)
- }()
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
q.stateTaskQueue.Push(hash, float32(index))
}
+ if q.stateScheduler == nil {
+ return 0, errNoFetchesPending
+ }
+
+ // Run valid nodes through the trie download scheduler. It writes completed nodes to a
+ // batch, which is committed asynchronously. This may lead to over-fetches because the
+ // scheduler treats everything as written after Process has returned, but it's
+ // unlikely to be an issue in practice.
+ batch := q.stateDatabase.NewBatch()
+ progressed, nproc, procerr := q.stateScheduler.Process(process, batch)
+ q.stateWriters += 1
+ go func() {
+ if procerr == nil {
+ nproc = len(process)
+ procerr = batch.Write()
+ }
+ // Return processing errors through the callback so the sync gets canceled. The
+ // number of writers is decremented prior to the call so PendingNodeData will
+ // return zero when the callback runs.
+ q.lock.Lock()
+ q.stateWriters -= 1
+ q.lock.Unlock()
+ callback(nproc, progressed, procerr)
+ // Wake up WaitResults after the state has been written because it might be
+ // waiting for completion of the pivot block's state download.
+ q.active.Signal()
+ }()
+
// If none of the data items were good, it's a stale delivery
switch {
case len(errs) == 0:
- return accepted, nil
+ return len(process), nil
case len(errs) == len(request.Hashes):
- return accepted, errStaleDelivery
+ return len(process), errStaleDelivery
default:
- return accepted, fmt.Errorf("multiple failures: %v", errs)
- }
-}
-
-// deliverNodeData is the asynchronous node data processor that injects a batch
-// of sync results into the state scheduler.
-func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bool, error)) {
- // Wake up WaitResults after the state has been written because it
- // might be waiting for the pivot block state to get completed.
- defer q.active.Signal()
-
- // Process results one by one to permit task fetches in between
- progressed := false
- for i, result := range results {
- q.stateSchedLock.Lock()
-
- if q.stateScheduler == nil {
- // Syncing aborted since this async delivery started, bail out
- q.stateSchedLock.Unlock()
- callback(i, progressed, errNoFetchesPending)
- return
- }
-
- batch := q.stateDatabase.NewBatch()
- prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}, batch)
- if err != nil {
- q.stateSchedLock.Unlock()
- callback(i, progressed, err)
- return
- }
- if err = batch.Write(); err != nil {
- q.stateSchedLock.Unlock()
- callback(i, progressed, err)
- return // TODO(karalabe): If a DB write fails (disk full), we ought to cancel the sync
- }
- // Item processing succeeded, release the lock (temporarily)
- progressed = progressed || prog
- q.stateSchedLock.Unlock()
+ return len(process), fmt.Errorf("multiple failures: %v", errs)
}
- callback(len(results), progressed, nil)
}
// Prepare configures the result cache to allow accepting and caching inbound
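
For context on the q.active.Signal() call added in the new writer goroutine, here is a minimal, hypothetical sketch (miniWaiter is invented for illustration, not the downloader's type) of the sync.Cond pattern that WaitResults relies on: the waiter loops on its condition under the shared mutex, and the writer side signals the condition once the state has been written.

package main

import "sync"

// miniWaiter pairs a mutex with a sync.Cond, the same primitive the queue
// uses for q.active.
type miniWaiter struct {
	lock   sync.Mutex
	active *sync.Cond
	done   bool
}

func newMiniWaiter() *miniWaiter {
	w := &miniWaiter{}
	w.active = sync.NewCond(&w.lock)
	return w
}

// Wait blocks until another goroutine marks completion and signals the cond,
// in the style of WaitResults waiting for the pivot block's state download.
func (w *miniWaiter) Wait() {
	w.lock.Lock()
	defer w.lock.Unlock()
	for !w.done {
		w.active.Wait()
	}
}

// Finish marks completion and wakes a waiter, analogous to calling
// q.active.Signal() after batch.Write() succeeds in the patch.
func (w *miniWaiter) Finish() {
	w.lock.Lock()
	w.done = true
	w.lock.Unlock()
	w.active.Signal()
}

func main() {
	w := newMiniWaiter()
	go w.Finish()
	w.Wait()
}
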