From f46adfac285741e943b97779d2053b22e66ce18d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 12 Sep 2017 14:39:34 +0300 Subject: eth/downloader: track peer drops and deassign state sync tasks --- eth/downloader/peer.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) (limited to 'eth/downloader/peer.go') diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index d0dc9a8aa..e638744ea 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -349,9 +349,10 @@ func (p *peerConnection) Lacks(hash common.Hash) bool { // peerSet represents the collection of active peer participating in the chain // download procedure. type peerSet struct { - peers map[string]*peerConnection - newPeerFeed event.Feed - lock sync.RWMutex + peers map[string]*peerConnection + newPeerFeed event.Feed + peerDropFeed event.Feed + lock sync.RWMutex } // newPeerSet creates a new peer set top track the active download sources. @@ -361,10 +362,16 @@ func newPeerSet() *peerSet { } } +// SubscribeNewPeers subscribes to peer arrival events. func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription { return ps.newPeerFeed.Subscribe(ch) } +// SubscribePeerDrops subscribes to peer departure events. +func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription { + return ps.peerDropFeed.Subscribe(ch) +} + // Reset iterates over the current peer set, and resets each of the known peers // to prepare for a next batch of block retrieval. func (ps *peerSet) Reset() { @@ -419,12 +426,15 @@ func (ps *peerSet) Register(p *peerConnection) error { // actions to/from that particular entity. func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() - defer ps.lock.Unlock() - - if _, ok := ps.peers[id]; !ok { + p, ok := ps.peers[id] + if !ok { + defer ps.lock.Unlock() return errNotRegistered } delete(ps.peers, id) + ps.lock.Unlock() + + ps.peerDropFeed.Send(p) return nil } -- cgit v1.2.3