aboutsummaryrefslogtreecommitdiffstats
path: root/les/distributor.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/distributor.go')
-rw-r--r--les/distributor.go58
1 files changed, 42 insertions, 16 deletions
diff --git a/les/distributor.go b/les/distributor.go
index 71afe2b73..e8ef5b02e 100644
--- a/les/distributor.go
+++ b/les/distributor.go
@@ -34,11 +34,11 @@ var ErrNoPeers = errors.New("no suitable peers available")
type requestDistributor struct {
reqQueue *list.List
lastReqOrder uint64
+ peers map[distPeer]struct{}
+ peerLock sync.RWMutex
stopChn, loopChn chan struct{}
loopNextSent bool
lock sync.Mutex
-
- getAllPeers func() map[distPeer]struct{}
}
// distPeer is an LES server peer interface for the request distributor.
@@ -71,15 +71,39 @@ type distReq struct {
}
// newRequestDistributor creates a new request distributor
-func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor {
- r := &requestDistributor{
- reqQueue: list.New(),
- loopChn: make(chan struct{}, 2),
- stopChn: stopChn,
- getAllPeers: getAllPeers,
+func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor {
+ d := &requestDistributor{
+ reqQueue: list.New(),
+ loopChn: make(chan struct{}, 2),
+ stopChn: stopChn,
+ peers: make(map[distPeer]struct{}),
+ }
+ if peers != nil {
+ peers.notify(d)
}
- go r.loop()
- return r
+ go d.loop()
+ return d
+}
+
+// registerPeer implements peerSetNotify
+func (d *requestDistributor) registerPeer(p *peer) {
+ d.peerLock.Lock()
+ d.peers[p] = struct{}{}
+ d.peerLock.Unlock()
+}
+
+// unregisterPeer implements peerSetNotify
+func (d *requestDistributor) unregisterPeer(p *peer) {
+ d.peerLock.Lock()
+ delete(d.peers, p)
+ d.peerLock.Unlock()
+}
+
+// registerTestPeer adds a new test peer
+func (d *requestDistributor) registerTestPeer(p distPeer) {
+ d.peerLock.Lock()
+ d.peers[p] = struct{}{}
+ d.peerLock.Unlock()
}
// distMaxWait is the maximum waiting time after which further necessary waiting
@@ -152,8 +176,7 @@ func (sp selectPeerItem) Weight() int64 {
// nextRequest returns the next possible request from any peer, along with the
// associated peer and necessary waiting time
func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
- peers := d.getAllPeers()
-
+ checkedPeers := make(map[distPeer]struct{})
elem := d.reqQueue.Front()
var (
bestPeer distPeer
@@ -162,11 +185,14 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
sel *weightedRandomSelect
)
- for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
+ d.peerLock.RLock()
+ defer d.peerLock.RUnlock()
+
+ for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
req := elem.Value.(*distReq)
canSend := false
- for peer, _ := range peers {
- if peer.canQueue() && req.canSend(peer) {
+ for peer, _ := range d.peers {
+ if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
canSend = true
cost := req.getCost(peer)
wait, bufRemain := peer.waitBefore(cost)
@@ -182,7 +208,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
bestWait = wait
}
}
- delete(peers, peer)
+ checkedPeers[peer] = struct{}{}
}
}
next := elem.Next()