aboutsummaryrefslogtreecommitdiffstats
path: root/les/distributor.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2017-06-21 18:27:38 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-06-21 18:27:38 +0800
commita5d08c893d61f66d60d8a91216aee5347b78f93e (patch)
tree500f3a788ecd4f299692ce1d1069f2efdc79d73d /les/distributor.go
parent60e27b51bc5643bc6a76151020a9e1a245340b70 (diff)
downloadgo-tangerine-a5d08c893d61f66d60d8a91216aee5347b78f93e.tar
go-tangerine-a5d08c893d61f66d60d8a91216aee5347b78f93e.tar.gz
go-tangerine-a5d08c893d61f66d60d8a91216aee5347b78f93e.tar.bz2
go-tangerine-a5d08c893d61f66d60d8a91216aee5347b78f93e.tar.lz
go-tangerine-a5d08c893d61f66d60d8a91216aee5347b78f93e.tar.xz
go-tangerine-a5d08c893d61f66d60d8a91216aee5347b78f93e.tar.zst
go-tangerine-a5d08c893d61f66d60d8a91216aee5347b78f93e.zip
les: code refactoring (#14416)
This commit does various code refactorings: - generalizes and moves the request retrieval/timeout/resend logic out of LesOdr (will be used by a subsequent PR) - reworks the peer management logic so that all services can register with peerSet to get notified about added/dropped peers (also gets rid of the ugly getAllPeers callback in requestDistributor) - moves peerSet, LesOdr, requestDistributor and retrieveManager initialization out of ProtocolManager because I believe they do not really belong there and the whole init process was ugly and ad-hoc
Diffstat (limited to 'les/distributor.go')
-rw-r--r--les/distributor.go58
1 files changed, 42 insertions, 16 deletions
diff --git a/les/distributor.go b/les/distributor.go
index 71afe2b73..e8ef5b02e 100644
--- a/les/distributor.go
+++ b/les/distributor.go
@@ -34,11 +34,11 @@ var ErrNoPeers = errors.New("no suitable peers available")
type requestDistributor struct {
reqQueue *list.List
lastReqOrder uint64
+ peers map[distPeer]struct{}
+ peerLock sync.RWMutex
stopChn, loopChn chan struct{}
loopNextSent bool
lock sync.Mutex
-
- getAllPeers func() map[distPeer]struct{}
}
// distPeer is an LES server peer interface for the request distributor.
@@ -71,15 +71,39 @@ type distReq struct {
}
// newRequestDistributor creates a new request distributor
-func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor {
- r := &requestDistributor{
- reqQueue: list.New(),
- loopChn: make(chan struct{}, 2),
- stopChn: stopChn,
- getAllPeers: getAllPeers,
+func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor {
+ d := &requestDistributor{
+ reqQueue: list.New(),
+ loopChn: make(chan struct{}, 2),
+ stopChn: stopChn,
+ peers: make(map[distPeer]struct{}),
+ }
+ if peers != nil {
+ peers.notify(d)
}
- go r.loop()
- return r
+ go d.loop()
+ return d
+}
+
+// registerPeer implements peerSetNotify
+func (d *requestDistributor) registerPeer(p *peer) {
+ d.peerLock.Lock()
+ d.peers[p] = struct{}{}
+ d.peerLock.Unlock()
+}
+
+// unregisterPeer implements peerSetNotify
+func (d *requestDistributor) unregisterPeer(p *peer) {
+ d.peerLock.Lock()
+ delete(d.peers, p)
+ d.peerLock.Unlock()
+}
+
+// registerTestPeer adds a new test peer
+func (d *requestDistributor) registerTestPeer(p distPeer) {
+ d.peerLock.Lock()
+ d.peers[p] = struct{}{}
+ d.peerLock.Unlock()
}
// distMaxWait is the maximum waiting time after which further necessary waiting
@@ -152,8 +176,7 @@ func (sp selectPeerItem) Weight() int64 {
// nextRequest returns the next possible request from any peer, along with the
// associated peer and necessary waiting time
func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
- peers := d.getAllPeers()
-
+ checkedPeers := make(map[distPeer]struct{})
elem := d.reqQueue.Front()
var (
bestPeer distPeer
@@ -162,11 +185,14 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
sel *weightedRandomSelect
)
- for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
+ d.peerLock.RLock()
+ defer d.peerLock.RUnlock()
+
+ for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
req := elem.Value.(*distReq)
canSend := false
- for peer, _ := range peers {
- if peer.canQueue() && req.canSend(peer) {
+ for peer, _ := range d.peers {
+ if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
canSend = true
cost := req.getCost(peer)
wait, bufRemain := peer.waitBefore(cost)
@@ -182,7 +208,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
bestWait = wait
}
}
- delete(peers, peer)
+ checkedPeers[peer] = struct{}{}
}
}
next := elem.Next()