aboutsummaryrefslogtreecommitdiffstats
path: root/les/distributor.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/distributor.go')
-rw-r--r--les/distributor.go33
1 files changed, 22 insertions, 11 deletions
diff --git a/les/distributor.go b/les/distributor.go
index 1de267f27..9235adc03 100644
--- a/les/distributor.go
+++ b/les/distributor.go
@@ -62,9 +62,10 @@ type distReq struct {
canSend func(distPeer) bool
request func(distPeer) func()
- reqOrder uint64
- sentChn chan distPeer
- element *list.Element
+ reqOrder uint64
+ sentChn chan distPeer
+ element *list.Element
+ waitForPeers mclock.AbsTime
}
// newRequestDistributor creates a new request distributor
@@ -106,7 +107,11 @@ func (d *requestDistributor) registerTestPeer(p distPeer) {
// distMaxWait is the maximum waiting time after which further necessary waiting
// times are recalculated based on new feedback from the servers
-const distMaxWait = time.Millisecond * 10
+const distMaxWait = time.Millisecond * 50
+
+// waitForPeers is the time window in which a request does not fail even if it
+// has no suitable peers to send to at the moment
+const waitForPeers = time.Second * 3
// main event loop
func (d *requestDistributor) loop() {
@@ -179,8 +184,6 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
checkedPeers := make(map[distPeer]struct{})
elem := d.reqQueue.Front()
var (
- bestPeer distPeer
- bestReq *distReq
bestWait time.Duration
sel *weightedRandomSelect
)
@@ -188,9 +191,18 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
d.peerLock.RLock()
defer d.peerLock.RUnlock()
- for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
+ peerCount := len(d.peers)
+ for (len(checkedPeers) < peerCount || elem == d.reqQueue.Front()) && elem != nil {
req := elem.Value.(*distReq)
canSend := false
+ now := d.clock.Now()
+ if req.waitForPeers > now {
+ canSend = true
+ wait := time.Duration(req.waitForPeers - now)
+ if bestWait == 0 || wait < bestWait {
+ bestWait = wait
+ }
+ }
for peer := range d.peers {
if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
canSend = true
@@ -202,9 +214,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
}
sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
} else {
- if bestReq == nil || wait < bestWait {
- bestPeer = peer
- bestReq = req
+ if bestWait == 0 || wait < bestWait {
bestWait = wait
}
}
@@ -223,7 +233,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
c := sel.choose().(selectPeerItem)
return c.peer, c.req, 0
}
- return bestPeer, bestReq, bestWait
+ return nil, nil, bestWait
}
// queue adds a request to the distribution queue, returns a channel where the
@@ -237,6 +247,7 @@ func (d *requestDistributor) queue(r *distReq) chan distPeer {
if r.reqOrder == 0 {
d.lastReqOrder++
r.reqOrder = d.lastReqOrder
+ r.waitForPeers = d.clock.Now() + mclock.AbsTime(waitForPeers)
}
back := d.reqQueue.Back()