From a5d08c893d61f66d60d8a91216aee5347b78f93e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?=
Date: Wed, 21 Jun 2017 03:27:38 -0700
Subject: les: code refactoring (#14416)

This commit does various code refactorings:

- generalizes and moves the request retrieval/timeout/resend logic out of LesOdr (will be used by a subsequent PR)
- reworks the peer management logic so that all services can register with peerSet to get notified about added/dropped peers (also gets rid of the ugly getAllPeers callback in requestDistributor)
- moves peerSet, LesOdr, requestDistributor and retrieveManager initialization out of ProtocolManager because I believe they do not really belong there and the whole init process was ugly and ad-hoc
---
 les/distributor.go | 58 +++++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 42 insertions(+), 16 deletions(-)

(limited to 'les/distributor.go')

diff --git a/les/distributor.go b/les/distributor.go
index 71afe2b73..e8ef5b02e 100644
--- a/les/distributor.go
+++ b/les/distributor.go
@@ -34,11 +34,11 @@ var ErrNoPeers = errors.New("no suitable peers available")
 type requestDistributor struct {
 	reqQueue         *list.List
 	lastReqOrder     uint64
+	peers            map[distPeer]struct{}
+	peerLock         sync.RWMutex
 	stopChn, loopChn chan struct{}
 	loopNextSent     bool
 	lock             sync.Mutex
-
-	getAllPeers func() map[distPeer]struct{}
 }
 
 // distPeer is an LES server peer interface for the request distributor.
@@ -71,15 +71,39 @@ type distReq struct {
 }
 
 // newRequestDistributor creates a new request distributor
-func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor {
-	r := &requestDistributor{
-		reqQueue:    list.New(),
-		loopChn:     make(chan struct{}, 2),
-		stopChn:     stopChn,
-		getAllPeers: getAllPeers,
+func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor {
+	d := &requestDistributor{
+		reqQueue: list.New(),
+		loopChn:  make(chan struct{}, 2),
+		stopChn:  stopChn,
+		peers:    make(map[distPeer]struct{}),
+	}
+	if peers != nil {
+		peers.notify(d)
 	}
-	go r.loop()
-	return r
+	go d.loop()
+	return d
+}
+
+// registerPeer implements peerSetNotify
+func (d *requestDistributor) registerPeer(p *peer) {
+	d.peerLock.Lock()
+	d.peers[p] = struct{}{}
+	d.peerLock.Unlock()
+}
+
+// unregisterPeer implements peerSetNotify
+func (d *requestDistributor) unregisterPeer(p *peer) {
+	d.peerLock.Lock()
+	delete(d.peers, p)
+	d.peerLock.Unlock()
+}
+
+// registerTestPeer adds a new test peer
+func (d *requestDistributor) registerTestPeer(p distPeer) {
+	d.peerLock.Lock()
+	d.peers[p] = struct{}{}
+	d.peerLock.Unlock()
 }
 
 // distMaxWait is the maximum waiting time after which further necessary waiting
@@ -152,8 +176,7 @@ func (sp selectPeerItem) Weight() int64 {
 // nextRequest returns the next possible request from any peer, along with the
 // associated peer and necessary waiting time
 func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
-	peers := d.getAllPeers()
-
+	checkedPeers := make(map[distPeer]struct{})
 	elem := d.reqQueue.Front()
 	var (
 		bestPeer distPeer
@@ -162,11 +185,14 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
 		sel      *weightedRandomSelect
 	)
 
-	for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
+	d.peerLock.RLock()
+	defer d.peerLock.RUnlock()
+
+	for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
 		req := elem.Value.(*distReq)
 		canSend := false
-		for peer, _ := range peers {
-			if peer.canQueue() && req.canSend(peer) {
+		for peer, _ := range d.peers {
+			if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
 				canSend = true
 				cost := req.getCost(peer)
 				wait, bufRemain := peer.waitBefore(cost)
@@ -182,7 +208,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
 					bestWait = wait
 				}
 			}
-			delete(peers, peer)
+			checkedPeers[peer] = struct{}{}
 		}
 	}
 	next := elem.Next()
-- 
cgit v1.2.3
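
For readers unfamiliar with the notification pattern this patch switches to, below is a minimal, self-contained sketch of the idea: instead of the distributor pulling the current peers through a getAllPeers callback, a service implements a small peerSetNotify-style interface and subscribes once via notify(), after which the peer set pushes registerPeer/unregisterPeer events to it. Only the interface shape mirrors the diff above; peer, peerSet, and peerTracker here are simplified, hypothetical stand-ins, not the actual les types.

package main

import (
	"fmt"
	"sync"
)

// peer is a toy stand-in for the LES peer type; only an id is needed here.
type peer struct{ id string }

// peerSetNotify mirrors the callback interface the distributor implements in
// this patch: subscribers are told about every added and dropped peer.
type peerSetNotify interface {
	registerPeer(p *peer)
	unregisterPeer(p *peer)
}

// peerSet is a reduced stand-in for les.peerSet: it tracks the live peers and
// fans register/unregister events out to every subscriber. (Handling of peers
// that are already connected when a subscriber arrives is omitted here.)
type peerSet struct {
	lock       sync.Mutex
	peers      map[string]*peer
	notifyList []peerSetNotify
}

func newPeerSet() *peerSet {
	return &peerSet{peers: make(map[string]*peer)}
}

// notify subscribes a service to peer add/drop events.
func (ps *peerSet) notify(n peerSetNotify) {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	ps.notifyList = append(ps.notifyList, n)
}

// register adds a peer and informs all subscribers.
func (ps *peerSet) register(p *peer) {
	ps.lock.Lock()
	ps.peers[p.id] = p
	subs := append([]peerSetNotify(nil), ps.notifyList...)
	ps.lock.Unlock()
	for _, n := range subs {
		n.registerPeer(p)
	}
}

// unregister drops a peer and informs all subscribers.
func (ps *peerSet) unregister(p *peer) {
	ps.lock.Lock()
	delete(ps.peers, p.id)
	subs := append([]peerSetNotify(nil), ps.notifyList...)
	ps.lock.Unlock()
	for _, n := range subs {
		n.unregisterPeer(p)
	}
}

// peerTracker plays the role of requestDistributor above: it keeps its own
// peer map current purely from the callbacks, guarded by its own lock.
type peerTracker struct {
	mu    sync.RWMutex
	peers map[*peer]struct{}
}

func (t *peerTracker) registerPeer(p *peer) {
	t.mu.Lock()
	t.peers[p] = struct{}{}
	t.mu.Unlock()
	fmt.Println("peer added:", p.id)
}

func (t *peerTracker) unregisterPeer(p *peer) {
	t.mu.Lock()
	delete(t.peers, p)
	t.mu.Unlock()
	fmt.Println("peer dropped:", p.id)
}

func main() {
	ps := newPeerSet()
	tracker := &peerTracker{peers: make(map[*peer]struct{})}
	ps.notify(tracker) // same call shape as peers.notify(d) in newRequestDistributor

	p1 := &peer{id: "peer-1"}
	ps.register(p1)
	ps.register(&peer{id: "peer-2"})
	ps.unregister(p1)
}

The diff above follows the same shape: newRequestDistributor calls peers.notify(d), the registerPeer/unregisterPeer callbacks maintain d.peers under peerLock, and nextRequest then iterates that map under a read lock, using the checkedPeers set to skip peers it has already considered instead of deleting from a private copy of the map as the old code did.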