From f46adfac285741e943b97779d2053b22e66ce18d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 12 Sep 2017 14:39:34 +0300 Subject: eth/downloader: track peer drops and deassign state sync tasks --- eth/downloader/peer.go | 22 ++++++++++++++++------ eth/downloader/statesync.go | 29 ++++++++++++++++++++++++++--- 2 files changed, 42 insertions(+), 9 deletions(-) (limited to 'eth/downloader') diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index d0dc9a8aa..e638744ea 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -349,9 +349,10 @@ func (p *peerConnection) Lacks(hash common.Hash) bool { // peerSet represents the collection of active peer participating in the chain // download procedure. type peerSet struct { - peers map[string]*peerConnection - newPeerFeed event.Feed - lock sync.RWMutex + peers map[string]*peerConnection + newPeerFeed event.Feed + peerDropFeed event.Feed + lock sync.RWMutex } // newPeerSet creates a new peer set top track the active download sources. @@ -361,10 +362,16 @@ func newPeerSet() *peerSet { } } +// SubscribeNewPeers subscribes to peer arrival events. func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription { return ps.newPeerFeed.Subscribe(ch) } +// SubscribePeerDrops subscribes to peer departure events. +func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription { + return ps.peerDropFeed.Subscribe(ch) +} + // Reset iterates over the current peer set, and resets each of the known peers // to prepare for a next batch of block retrieval. func (ps *peerSet) Reset() { @@ -419,12 +426,15 @@ func (ps *peerSet) Register(p *peerConnection) error { // actions to/from that particular entity. func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() - defer ps.lock.Unlock() - - if _, ok := ps.peers[id]; !ok { + p, ok := ps.peers[id] + if !ok { + defer ps.lock.Unlock() return errNotRegistered } delete(ps.peers, id) + ps.lock.Unlock() + + ps.peerDropFeed.Send(p) return nil } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index eb5416f63..a0b05c9be 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -40,6 +40,7 @@ type stateReq struct { timer *time.Timer // Timer to fire when the RTT timeout expires peer *peerConnection // Peer that we're requesting from response [][]byte // Response data of the peer (nil for timeouts) + dropped bool // Flag whether the peer dropped off early } // timedOut returns if this request timed out. @@ -105,6 +106,11 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { go s.run() defer s.Cancel() + // Listen for peer departure events to cancel assigned tasks + peerDrop := make(chan *peerConnection, 1024) + peerSub := s.d.peers.SubscribePeerDrops(peerDrop) + defer peerSub.Unsubscribe() + for { // Enable sending of the first buffered element if there is one. var ( @@ -143,6 +149,20 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { finished = append(finished, req) delete(active, pack.PeerId()) + // Handle dropped peer connections: + case p := <-peerDrop: + // Skip if no request is currently pending + req := active[p.id] + if req == nil { + continue + } + // Finalize the request and queue up for processing + req.timer.Stop() + req.dropped = true + + finished = append(finished, req) + delete(active, p.id) + // Handle timed-out requests: case req := <-timeout: // If the peer is already requesting something else, ignore the stale timeout. @@ -167,6 +187,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id) // Make sure the previous one doesn't get siletly lost + old.timer.Stop() + old.dropped = true + finished = append(finished, old) } // Start a timer to notify the sync loop if the peer stalled. @@ -269,9 +292,9 @@ func (s *stateSync) loop() error { return errCancelStateFetch case req := <-s.deliver: - // Response or timeout triggered, drop the peer if stalling - log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "timeout", req.timedOut()) - if len(req.items) <= 2 && req.timedOut() { + // Response, disconnect or timeout triggered, drop the peer if stalling + log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut()) + if len(req.items) <= 2 && !req.dropped && req.timedOut() { // 2 items are the minimum requested, if even that times out, we've no use of // this peer at the moment. log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) -- cgit v1.2.3