Diffstat (limited to 'eth/downloader/statesync.go')
 eth/downloader/statesync.go | 36 +++++++++++++++---------------------
 1 file changed, 15 insertions(+), 21 deletions(-)
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
index a0b05c9be..9cc65a208 100644
--- a/eth/downloader/statesync.go
+++ b/eth/downloader/statesync.go
@@ -20,7 +20,6 @@ import (
"fmt"
"hash"
"sync"
- "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -132,7 +131,10 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
// Send the next finished request to the current sync:
case deliverReqCh <- deliverReq:
- finished = append(finished[:0], finished[1:]...)
+ // Shift out the first request, but also set the emptied slot to nil for GC
+ copy(finished, finished[1:])
+ finished[len(finished)-1] = nil
+ finished = finished[:len(finished)-1]
// Handle incoming state packs:
case pack := <-d.stateCh:
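
The replacement trades one allocation-free idiom for another: append(finished[:0], finished[1:]...) also shifts the queue left, but leaves a duplicate pointer to the popped *stateReq in the last slot of the backing array, keeping it reachable until that slot is overwritten. Nilling the vacated slot before truncating lets the garbage collector reclaim the request as soon as the receiver is done with it. A minimal standalone sketch of the idiom (stateReq here is a stand-in type, not the downloader's):

    package main

    import "fmt"

    type stateReq struct{ id int } // stand-in for the real request type

    // popFront removes and returns the first element, nilling the vacated
    // slot so the popped value does not stay reachable through the
    // backing array.
    func popFront(queue []*stateReq) (*stateReq, []*stateReq) {
        head := queue[0]
        copy(queue, queue[1:])    // shift the remaining requests left
        queue[len(queue)-1] = nil // drop the duplicate tail pointer for GC
        return head, queue[:len(queue)-1]
    }

    func main() {
        q := []*stateReq{{1}, {2}, {3}}
        head, q := popFront(q)
        fmt.Println(head.id, len(q)) // 1 2
    }
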
@@ -291,6 +293,9 @@ func (s *stateSync) loop() error {
case <-s.cancel:
return errCancelStateFetch
+ case <-s.d.cancelCh:
+ return errCancelStateFetch
+
case req := <-s.deliver:
// Response, disconnect or timeout triggered, drop the peer if stalling
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
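
The added case mirrors s.cancel with the downloader's own cancel channel, so tearing down the whole download also unblocks the state-sync loop instead of leaving it parked in the select. A hedged sketch of the pattern, with illustrative names rather than the downloader's actual fields:

    package main

    import (
        "errors"
        "fmt"
    )

    var errCanceled = errors.New("canceled")

    // run processes jobs until either its own cancel channel or the
    // parent's closes; both cases must unblock the select, or a parent
    // teardown would leak the goroutine.
    func run(selfCancel, parentCancel <-chan struct{}, jobs <-chan func()) error {
        for {
            select {
            case <-selfCancel:
                return errCanceled
            case <-parentCancel:
                return errCanceled
            case job := <-jobs:
                job()
            }
        }
    }

    func main() {
        parent := make(chan struct{})
        close(parent) // simulate the parent shutting down first
        fmt.Println(run(make(chan struct{}), parent, nil))
    }
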
@@ -301,15 +306,11 @@ func (s *stateSync) loop() error {
s.d.dropPeer(req.peer.id)
}
// Process all the received blobs and check for stale delivery
- stale, err := s.process(req)
- if err != nil {
+ if err := s.process(req); err != nil {
log.Warn("Node data write error", "err", err)
return err
}
- // If the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery)
- if !stale {
- req.peer.SetNodeDataIdle(len(req.response))
- }
+ req.peer.SetNodeDataIdle(len(req.response))
}
}
return s.commit(true)
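
With stale tracking gone, process reports only an error, and the peer is returned to the idle set on every non-failing delivery, timed out or not. The call also collapses into Go's scoped-if form, which confines err to the check. A small sketch of the resulting shape (request, process, and markIdle are illustrative stand-ins):

    package main

    import "fmt"

    type request struct{ peer string }

    func process(r *request) error { return nil } // placeholder: write blobs, report failure

    func markIdle(r *request) { fmt.Println("idle:", r.peer) }

    func handle(r *request) error {
        // Scoped-if form replaces the old two-value `stale, err := process(r)`;
        // err is visible only inside the statement.
        if err := process(r); err != nil {
            return err
        }
        markIdle(r) // unconditional now: timed-out deliveries no longer skip it
        return nil
    }

    func main() {
        _ = handle(&request{peer: "p1"})
    }
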
@@ -349,6 +350,7 @@ func (s *stateSync) assignTasks() {
case s.d.trackStateReq <- req:
req.peer.FetchNodeData(req.items)
case <-s.cancel:
+ case <-s.d.cancelCh:
}
}
}
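
assignTasks gets the same treatment on the sending side: without the extra case, the send into trackStateReq could block forever if the downloader is torn down while nothing is draining the channel. A sketch of a send guarded by two cancellation channels (names are illustrative):

    package main

    import "fmt"

    type req struct{ items int }

    // trySend delivers r unless either cancel channel closes first, so a
    // shutdown never strands the sender on an undrained channel.
    func trySend(ch chan<- *req, r *req, selfCancel, parentCancel <-chan struct{}) bool {
        select {
        case ch <- r:
            return true
        case <-selfCancel:
            return false
        case <-parentCancel:
            return false
        }
    }

    func main() {
        parent := make(chan struct{})
        close(parent) // simulate the downloader tearing down with no tracker listening
        ok := trySend(make(chan *req), &req{items: 16}, make(chan struct{}), parent)
        fmt.Println("sent:", ok) // sent: false
    }
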
@@ -387,7 +389,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
// delivered.
-func (s *stateSync) process(req *stateReq) (bool, error) {
+func (s *stateSync) process(req *stateReq) error {
// Collect processing stats and update progress if valid data was received
duplicate, unexpected := 0, 0
@@ -398,7 +400,7 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
}(time.Now())
// Iterate over all the delivered data and inject one-by-one into the trie
- progress, stale := false, len(req.response) > 0
+ progress := false
for _, blob := range req.response {
prog, hash, err := s.processNodeData(blob)
@@ -412,20 +414,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
case trie.ErrAlreadyProcessed:
duplicate++
default:
- return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
+ return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
- // If the node delivered a requested item, mark the delivery non-stale
if _, ok := req.tasks[hash]; ok {
delete(req.tasks, hash)
- stale = false
}
}
- // If we're inside the critical section, reset fail counter since we progressed.
- if progress && atomic.LoadUint32(&s.d.fsPivotFails) > 1 {
- log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails))
- atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
- }
-
// Put unfulfilled tasks back into the retry queue
npeers := s.d.peers.Len()
for hash, task := range req.tasks {
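
Each delivered blob that matches an outstanding request is checked off against req.tasks; with the stale flag removed, the map delete is the only bookkeeping left, and whatever survives the loop is exactly what gets re-queued below. A compact sketch of the check-off, with string hashes standing in for trie node keys:

    package main

    import "fmt"

    func main() {
        // Outstanding requests keyed by node hash (strings for brevity).
        tasks := map[string]bool{"a": true, "b": true, "c": true}

        // Check off each delivered hash; unknown ones are simply ignored
        // here (the real loop counts those as unexpected or duplicate).
        for _, h := range []string{"a", "c", "x"} {
            if _, ok := tasks[h]; ok {
                delete(tasks, h)
            }
        }
        fmt.Println(len(tasks), "unfulfilled") // 1 unfulfilled ("b")
    }
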
@@ -438,12 +432,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
- return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
+ return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.tasks[hash] = task
}
- return stale, nil
+ return nil
}
// processNodeData tries to inject a trie node data blob delivered from a remote
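
The re-queue path keeps its per-task attempt set: every peer that has already been asked for a hash is recorded, and once that set covers every connected peer the sync aborts rather than loop forever on data nobody can serve. A sketch of the budget check under assumed types (task and requeue are illustrative, not the downloader's):

    package main

    import "fmt"

    type task struct {
        attempts map[string]struct{} // peers that have already tried this hash
    }

    // requeue puts t back for hash unless every known peer has already
    // failed to deliver it, in which case the sync should abort.
    func requeue(pending map[string]*task, hash string, t *task, npeers int) error {
        if len(t.attempts) >= npeers {
            return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)",
                hash, len(t.attempts), npeers)
        }
        pending[hash] = t
        return nil
    }

    func main() {
        t := &task{attempts: map[string]struct{}{"p1": {}, "p2": {}}}
        fmt.Println(requeue(map[string]*task{}, "abcd", t, 2))
    }
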