aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-09-12 19:39:34 +0800
committerPéter Szilágyi <peterke@gmail.com>2017-09-12 20:13:14 +0800
commitf46adfac285741e943b97779d2053b22e66ce18d (patch)
tree626096a5edf82044e28eaddf55dfd1475cdeec5b /eth/downloader
parent514b1587db0c7231bf7b49782b3257fe1125a54a (diff)
downloaddexon-f46adfac285741e943b97779d2053b22e66ce18d.tar
dexon-f46adfac285741e943b97779d2053b22e66ce18d.tar.gz
dexon-f46adfac285741e943b97779d2053b22e66ce18d.tar.bz2
dexon-f46adfac285741e943b97779d2053b22e66ce18d.tar.lz
dexon-f46adfac285741e943b97779d2053b22e66ce18d.tar.xz
dexon-f46adfac285741e943b97779d2053b22e66ce18d.tar.zst
dexon-f46adfac285741e943b97779d2053b22e66ce18d.zip
eth/downloader: track peer drops and deassign state sync tasks
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/peer.go22
-rw-r--r--eth/downloader/statesync.go29
2 files changed, 42 insertions, 9 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index d0dc9a8aa..e638744ea 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -349,9 +349,10 @@ func (p *peerConnection) Lacks(hash common.Hash) bool {
// peerSet represents the collection of active peer participating in the chain
// download procedure.
type peerSet struct {
- peers map[string]*peerConnection
- newPeerFeed event.Feed
- lock sync.RWMutex
+ peers map[string]*peerConnection
+ newPeerFeed event.Feed
+ peerDropFeed event.Feed
+ lock sync.RWMutex
}
// newPeerSet creates a new peer set top track the active download sources.
@@ -361,10 +362,16 @@ func newPeerSet() *peerSet {
}
}
+// SubscribeNewPeers subscribes to peer arrival events.
func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription {
return ps.newPeerFeed.Subscribe(ch)
}
+// SubscribePeerDrops subscribes to peer departure events.
+func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription {
+ return ps.peerDropFeed.Subscribe(ch)
+}
+
// Reset iterates over the current peer set, and resets each of the known peers
// to prepare for a next batch of block retrieval.
func (ps *peerSet) Reset() {
@@ -419,12 +426,15 @@ func (ps *peerSet) Register(p *peerConnection) error {
// actions to/from that particular entity.
func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if _, ok := ps.peers[id]; !ok {
+ p, ok := ps.peers[id]
+ if !ok {
+ defer ps.lock.Unlock()
return errNotRegistered
}
delete(ps.peers, id)
+ ps.lock.Unlock()
+
+ ps.peerDropFeed.Send(p)
return nil
}
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
index eb5416f63..a0b05c9be 100644
--- a/eth/downloader/statesync.go
+++ b/eth/downloader/statesync.go
@@ -40,6 +40,7 @@ type stateReq struct {
timer *time.Timer // Timer to fire when the RTT timeout expires
peer *peerConnection // Peer that we're requesting from
response [][]byte // Response data of the peer (nil for timeouts)
+ dropped bool // Flag whether the peer dropped off early
}
// timedOut returns if this request timed out.
@@ -105,6 +106,11 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
go s.run()
defer s.Cancel()
+ // Listen for peer departure events to cancel assigned tasks
+ peerDrop := make(chan *peerConnection, 1024)
+ peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
+ defer peerSub.Unsubscribe()
+
for {
// Enable sending of the first buffered element if there is one.
var (
@@ -143,6 +149,20 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
finished = append(finished, req)
delete(active, pack.PeerId())
+ // Handle dropped peer connections:
+ case p := <-peerDrop:
+ // Skip if no request is currently pending
+ req := active[p.id]
+ if req == nil {
+ continue
+ }
+ // Finalize the request and queue up for processing
+ req.timer.Stop()
+ req.dropped = true
+
+ finished = append(finished, req)
+ delete(active, p.id)
+
// Handle timed-out requests:
case req := <-timeout:
// If the peer is already requesting something else, ignore the stale timeout.
@@ -167,6 +187,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
// Make sure the previous one doesn't get siletly lost
+ old.timer.Stop()
+ old.dropped = true
+
finished = append(finished, old)
}
// Start a timer to notify the sync loop if the peer stalled.
@@ -269,9 +292,9 @@ func (s *stateSync) loop() error {
return errCancelStateFetch
case req := <-s.deliver:
- // Response or timeout triggered, drop the peer if stalling
- log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "timeout", req.timedOut())
- if len(req.items) <= 2 && req.timedOut() {
+ // Response, disconnect or timeout triggered, drop the peer if stalling
+ log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
+ if len(req.items) <= 2 && !req.dropped && req.timedOut() {
// 2 items are the minimum requested, if even that times out, we've no use of
// this peer at the moment.
log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)