// Copyright 2017 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package les import ( "container/list" "sync" "time" "github.com/ethereum/go-ethereum/common/mclock" ) // requestDistributor implements a mechanism that distributes requests to // suitable peers, obeying flow control rules and prioritizing them in creation // order (even when a resend is necessary). type requestDistributor struct { clock mclock.Clock reqQueue *list.List lastReqOrder uint64 peers map[distPeer]struct{} peerLock sync.RWMutex stopChn, loopChn chan struct{} loopNextSent bool lock sync.Mutex } // distPeer is an LES server peer interface for the request distributor. // waitBefore returns either the necessary waiting time before sending a request // with the given upper estimated cost or the estimated remaining relative buffer // value after sending such a request (in which case the request can be sent // immediately). At least one of these values is always zero. type distPeer interface { waitBefore(uint64) (time.Duration, float64) canQueue() bool queueSend(f func()) } // distReq is the request abstraction used by the distributor. It is based on // three callback functions: // - getCost returns the upper estimate of the cost of sending the request to a given peer // - canSend tells if the server peer is suitable to serve the request // - request prepares sending the request to the given peer and returns a function that // does the actual sending. Request order should be preserved but the callback itself should not // block until it is sent because other peers might still be able to receive requests while // one of them is blocking. Instead, the returned function is put in the peer's send queue. type distReq struct { getCost func(distPeer) uint64 canSend func(distPeer) bool request func(distPeer) func() reqOrder uint64 sentChn chan distPeer element *list.Element waitForPeers mclock.AbsTime } // newRequestDistributor creates a new request distributor func newRequestDistributor(peers *peerSet, stopChn chan struct{}, clock mclock.Clock) *requestDistributor { d := &requestDistributor{ clock: clock, reqQueue: list.New(), loopChn: make(chan struct{}, 2), stopChn: stopChn, peers: make(map[distPeer]struct{}), } if peers != nil { peers.notify(d) } go d.loop() return d } // registerPeer implements peerSetNotify func (d *requestDistributor) registerPeer(p *peer) { d.peerLock.Lock() d.peers[p] = struct{}{} d.peerLock.Unlock() } // unregisterPeer implements peerSetNotify func (d *requestDistributor) unregisterPeer(p *peer) { d.peerLock.Lock() delete(d.peers, p) d.peerLock.Unlock() } // registerTestPeer adds a new test peer func (d *requestDistributor) registerTestPeer(p distPeer) { d.peerLock.Lock() d.peers[p] = struct{}{} d.peerLock.Unlock() } // distMaxWait is the maximum waiting time after which further necessary waiting // times are recalculated based on new feedback from the servers const distMaxWait = time.Millisecond * 50 // waitForPeers is the time window in which a request does not fail even if it // has no suitable peers to send to at the moment const waitForPeers = time.Second * 3 // main event loop func (d *requestDistributor) loop() { for { select { case <-d.stopChn: d.lock.Lock() elem := d.reqQueue.Front() for elem != nil { req := elem.Value.(*distReq) close(req.sentChn) req.sentChn = nil elem = elem.Next() } d.lock.Unlock() return case <-d.loopChn: d.lock.Lock() d.loopNextSent = false loop: for { peer, req, wait := d.nextRequest() if req != nil && wait == 0 { chn := req.sentChn // save sentChn because remove sets it to nil d.remove(req) send := req.request(peer) if send != nil { peer.queueSend(send) } chn <- peer close(chn) } else { if wait == 0 { // no request to send and nothing to wait for; the next // queued request will wake up the loop break loop } d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received if wait > distMaxWait { // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically wait = distMaxWait } go func() { d.clock.Sleep(wait) d.loopChn <- struct{}{} }() break loop } } d.lock.Unlock() } } } // selectPeerItem represents a peer to be selected for a request by weightedRandomSelect type selectPeerItem struct { peer distPeer req *distReq weight int64 } // Weight implements wrsItem interface func (sp selectPeerItem) Weight() int64 { return sp.weight } // nextRequest returns the next possible request from any peer, along with the // associated peer and necessary waiting time func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { checkedPeers := make(map[distPeer]struct{}) elem := d.reqQueue.Front() var ( bestWait time.Duration sel *weightedRandomSelect ) d.peerLock.RLock() defer d.peerLock.RUnlock() peerCount := len(d.peers) for (len(checkedPeers) < peerCount || elem == d.reqQueue.Front()) && elem != nil { req := elem.Value.(*distReq) canSend := false now := d.clock.Now() if req.waitForPeers > now { canSend = true wait := time.Duration(req.waitForPeers - now) if bestWait == 0 || wait < bestWait { bestWait = wait } } for peer := range d.peers { if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) { canSend = true cost := req.getCost(peer) wait, bufRemain := peer.waitBefore(cost) if wait == 0 { if sel == nil { sel = newWeightedRandomSelect() } sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) } else { if bestWait == 0 || wait < bestWait { bestWait = wait } } checkedPeers[peer] = struct{}{} } } next := elem.Next() if !canSend && elem == d.reqQueue.Front() { close(req.sentChn) d.remove(req) } elem = next } if sel != nil { c := sel.choose().(selectPeerItem) return c.peer, c.req, 0 } return nil, nil, bestWait } // queue adds a request to the distribution queue, returns a channel where the // receiving peer is sent once the request has been sent (request callback returned). // If the request is cancelled or timed out without suitable peers, the channel is // closed without sending any peer references to it. func (d *requestDistributor) queue(r *distReq) chan distPeer { d.lock.Lock() defer d.lock.Unlock() if r.reqOrder == 0 { d.lastReqOrder++ r.reqOrder = d.lastReqOrder r.waitForPeers = d.clock.Now() + mclock.AbsTime(waitForPeers) } back := d.reqQueue.Back() if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder { r.element = d.reqQueue.PushBack(r) } else { before := d.reqQueue.Front() for before.Value.(*distReq).reqOrder < r.reqOrder { before = before.Next() } r.element = d.reqQueue.InsertBefore(r, before) } if !d.loopNextSent { d.loopNextSent = true d.loopChn <- struct{}{} } r.sentChn = make(chan distPeer, 1) return r.sentChn } // cancel removes a request from the queue if it has not been sent yet (returns // false if it has been sent already). It is guaranteed that the callback functions // will not be called after cancel returns. func (d *requestDistributor) cancel(r *distReq) bool { d.lock.Lock() defer d.lock.Unlock() if r.sentChn == nil { return false } close(r.sentChn) d.remove(r) return true } // remove removes a request from the queue func (d *requestDistributor) remove(r *distReq) { r.sentChn = nil if r.element != nil { d.reqQueue.Remove(r.element) r.element = nil } }