diff options
Diffstat (limited to 'les/distributor.go')
-rw-r--r-- | les/distributor.go | 33 |
1 files changed, 22 insertions, 11 deletions
diff --git a/les/distributor.go b/les/distributor.go index 1de267f27..9235adc03 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -62,9 +62,10 @@ type distReq struct { canSend func(distPeer) bool request func(distPeer) func() - reqOrder uint64 - sentChn chan distPeer - element *list.Element + reqOrder uint64 + sentChn chan distPeer + element *list.Element + waitForPeers mclock.AbsTime } // newRequestDistributor creates a new request distributor @@ -106,7 +107,11 @@ func (d *requestDistributor) registerTestPeer(p distPeer) { // distMaxWait is the maximum waiting time after which further necessary waiting // times are recalculated based on new feedback from the servers -const distMaxWait = time.Millisecond * 10 +const distMaxWait = time.Millisecond * 50 + +// waitForPeers is the time window in which a request does not fail even if it +// has no suitable peers to send to at the moment +const waitForPeers = time.Second * 3 // main event loop func (d *requestDistributor) loop() { @@ -179,8 +184,6 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { checkedPeers := make(map[distPeer]struct{}) elem := d.reqQueue.Front() var ( - bestPeer distPeer - bestReq *distReq bestWait time.Duration sel *weightedRandomSelect ) @@ -188,9 +191,18 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { d.peerLock.RLock() defer d.peerLock.RUnlock() - for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { + peerCount := len(d.peers) + for (len(checkedPeers) < peerCount || elem == d.reqQueue.Front()) && elem != nil { req := elem.Value.(*distReq) canSend := false + now := d.clock.Now() + if req.waitForPeers > now { + canSend = true + wait := time.Duration(req.waitForPeers - now) + if bestWait == 0 || wait < bestWait { + bestWait = wait + } + } for peer := range d.peers { if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) { canSend = true @@ -202,9 +214,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { } sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) } else { - if bestReq == nil || wait < bestWait { - bestPeer = peer - bestReq = req + if bestWait == 0 || wait < bestWait { bestWait = wait } } @@ -223,7 +233,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { c := sel.choose().(selectPeerItem) return c.peer, c.req, 0 } - return bestPeer, bestReq, bestWait + return nil, nil, bestWait } // queue adds a request to the distribution queue, returns a channel where the @@ -237,6 +247,7 @@ func (d *requestDistributor) queue(r *distReq) chan distPeer { if r.reqOrder == 0 { d.lastReqOrder++ r.reqOrder = d.lastReqOrder + r.waitForPeers = d.clock.Now() + mclock.AbsTime(waitForPeers) } back := d.reqQueue.Back() |