Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--  eth/downloader/queue.go | 45
1 file changed, 28 insertions(+), 17 deletions(-)
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index fd239f7e4..b7ad92099 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -362,20 +362,20 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
// Make sure chain order is honoured and preserved throughout
hash := header.Hash()
if header.Number == nil || header.Number.Uint64() != from {
- glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from)
+ glog.V(logger.Warn).Infof("Header #%v [%x…] broke chain ordering, expected %d", header.Number, hash[:4], from)
break
}
if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
- glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4])
+ glog.V(logger.Warn).Infof("Header #%v [%x…] broke chain ancestry", header.Number, hash[:4])
break
}
// Make sure no duplicate requests are executed
if _, ok := q.blockTaskPool[hash]; ok {
- glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for block fetch", header.Number.Uint64(), hash[:4])
+ glog.V(logger.Warn).Infof("Header #%d [%x…] already scheduled for block fetch", header.Number.Uint64(), hash[:4])
continue
}
if _, ok := q.receiptTaskPool[hash]; ok {
- glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4])
+ glog.V(logger.Warn).Infof("Header #%d [%x…] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4])
continue
}
// Queue the header for content retrieval
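Annotation: besides the log-format tweaks, this hunk shows the three guards Schedule applies before queueing a header: consecutive numbering, parent-hash linkage, and no duplicate entry in a task pool. A minimal sketch of that validation pattern follows; validateSchedule and the scheduled map are hypothetical stand-ins for the queue's internals, only types.Header and common.Hash come from go-ethereum.

package sketch

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// validateSchedule mirrors the checks above: headers must be numbered
// consecutively from 'from', each must link to the previous hash, and
// none may already sit in a task pool ('scheduled' stands in for the
// queue's block/receipt task pools).
func validateSchedule(headers []*types.Header, from uint64, head common.Hash, scheduled map[common.Hash]bool) []*types.Header {
	inserts := make([]*types.Header, 0, len(headers))
	for _, header := range headers {
		hash := header.Hash()
		if header.Number == nil || header.Number.Uint64() != from {
			break // chain ordering broken: abort the whole batch
		}
		if head != (common.Hash{}) && head != header.ParentHash {
			break // chain ancestry broken: abort the whole batch
		}
		if scheduled[hash] {
			head, from = hash, from+1
			continue // already scheduled: skip, but keep continuity
		}
		scheduled[hash] = true
		inserts = append(inserts, header)
		head, from = hash, from+1
	}
	return inserts
}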
@@ -388,7 +388,16 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
- // Pivoting point of the fast sync, retrieve the state tries
+ // Pivot point of the fast sync reached, switch state retrieval to this block
+ glog.V(logger.Debug).Infof("Switching state downloads to %d [%x…]", header.Number.Uint64(), header.Hash().Bytes()[:4])
+
+ q.stateTaskIndex = 0
+ q.stateTaskPool = make(map[common.Hash]int)
+ q.stateTaskQueue.Reset()
+ for _, req := range q.statePendPool {
+ req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear
+ }
+
q.stateSchedLock.Lock()
q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
q.stateSchedLock.Unlock()
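Annotation: the interesting detail in this hunk is that in-flight state requests are not deleted from the pend pool; only their hash sets are emptied, so a late delivery matches nothing and fails cleanly while peer accounting and expiry still see the request. A hedged sketch of that reset pattern, with a hypothetical stateReq type standing in for the queue's fetch request:

package sketch

import "github.com/ethereum/go-ethereum/common"

// stateReq is a hypothetical stand-in for the queue's in-flight
// state fetch request type.
type stateReq struct {
	Hashes map[common.Hash]int // node hashes this request covers
}

// resetStateTasks drops every queued state task and empties the hash
// sets of in-flight requests: executing requests fail, but don't
// disappear from the pend pool.
func resetStateTasks(pendPool map[string]*stateReq) (taskPool map[common.Hash]int) {
	for _, req := range pendPool {
		req.Hashes = make(map[common.Hash]int) // fail, but don't disappear
	}
	return make(map[common.Hash]int) // fresh task pool for the new pivot root
}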
@@ -866,10 +875,10 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
accepted := len(headers) == MaxHeaderFetch
if accepted {
if headers[0].Number.Uint64() != request.From {
- glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)
+ glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x…] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)
accepted = false
} else if headers[len(headers)-1].Hash() != target {
- glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])
+ glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x…] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])
accepted = false
}
}
@@ -877,12 +886,12 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
for i, header := range headers[1:] {
hash := header.Hash()
if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
- glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want)
+ glog.V(logger.Warn).Infof("Peer %s: header #%v [%x…] broke chain ordering, expected %d", id, header.Number, hash[:4], want)
accepted = false
break
}
if headers[i].Hash() != header.ParentHash {
- glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4])
+ glog.V(logger.Warn).Infof("Peer %s: header #%v [%x…] broke chain ancestry", id, header.Number, hash[:4])
accepted = false
break
}
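Annotation: DeliverHeaders accepts a batch only if it plugs exactly into the skeleton: full length, correct starting number, last hash equal to the skeleton target, and hash-linked throughout. A compact sketch of that acceptance test; the function name is illustrative, while the checks mirror the two hunks above.

package sketch

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// acceptSkeletonFill reports whether a delivered header batch exactly
// fills one skeleton interval starting at 'from' and ending at the
// skeleton header 'target'.
func acceptSkeletonFill(headers []*types.Header, from uint64, target common.Hash, maxFetch int) bool {
	if len(headers) != maxFetch {
		return false // partial batches are never accepted
	}
	if headers[0].Number.Uint64() != from {
		return false // first header broke chain ordering
	}
	if headers[len(headers)-1].Hash() != target {
		return false // last header broke the skeleton structure
	}
	for i, header := range headers[1:] {
		if header.Number.Uint64() != from+1+uint64(i) {
			return false // interior number gap
		}
		if headers[i].Hash() != header.ParentHash {
			return false // interior ancestry break
		}
	}
	return true
}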
@@ -1039,9 +1048,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
// DeliverNodeData injects a node state data retrieval response into the queue.
-// The method returns the number of node state entries originally requested, and
-// the number of them actually accepted from the delivery.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
+// The method returns the number of node state entries accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -1099,31 +1107,34 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler.
-func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
+func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bool, error)) {
// Wake up WaitResults after the state has been written because it
// might be waiting for the pivot block state to get completed.
defer q.active.Signal()
// Process results one by one to permit task fetches in between
+ progressed := false
for i, result := range results {
q.stateSchedLock.Lock()
if q.stateScheduler == nil {
// Syncing aborted since this async delivery started, bail out
q.stateSchedLock.Unlock()
- callback(errNoFetchesPending, i)
+ callback(i, progressed, errNoFetchesPending)
return
}
- if _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
+ if prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
// Processing a state result failed, bail out
q.stateSchedLock.Unlock()
- callback(err, i)
+ callback(i, progressed, err)
return
+ } else if prog {
+ progressed = true
}
// Item processing succeeded, release the lock (temporarily)
q.stateSchedLock.Unlock()
}
- callback(nil, len(results))
+ callback(len(results), progressed, nil)
}
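Annotation: the callback's new shape, func(int, bool, error), reports the processed count, whether any result actually advanced the trie sync (the prog flag from Process), and the first error. A caller can use the progress flag to tell a peer feeding stale or duplicate data apart from one making real progress. A hedged sketch of such a consumer; onNodeData and its logging are hypothetical, not the downloader's actual handler:

package sketch

import "log"

// onNodeData consumes the new callback shape: delivered is how many
// entries were processed before stopping, progressed is true if at
// least one entry advanced the state trie, err is the first failure.
func onNodeData(peer string, delivered int, progressed bool, err error) {
	switch {
	case err != nil:
		log.Printf("peer %s: state delivery failed after %d entries: %v", peer, delivered, err)
	case delivered > 0 && !progressed:
		// Everything was stale or duplicate: the peer contributed
		// nothing to the sync, even though no entry errored.
		log.Printf("peer %s: %d entries, no sync progress", peer, delivered)
	default:
		log.Printf("peer %s: %d entries accepted", peer, delivered)
	}
}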
// Prepare configures the result cache to allow accepting and caching inbound