diff options
Diffstat (limited to 'les/odr.go')
-rw-r--r-- | les/odr.go | 55 |
1 files changed, 26 insertions, 29 deletions
diff --git a/les/odr.go b/les/odr.go index 444b1da2a..10ef928df 100644 --- a/les/odr.go +++ b/les/odr.go @@ -37,6 +37,11 @@ var ( // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) +type odrPeerSelector interface { + selectPeer(func(*peer) (bool, uint64)) *peer + adjustResponseTime(*poolEntry, time.Duration, bool) +} + type LesOdr struct { light.OdrBackend db ethdb.Database @@ -44,7 +49,7 @@ type LesOdr struct { removePeer peerDropFn mlock, clock sync.Mutex sentReqs map[uint64]*sentReq - peers *odrPeerSet + serverPool odrPeerSelector lastReqID uint64 } @@ -52,7 +57,6 @@ func NewLesOdr(db ethdb.Database) *LesOdr { return &LesOdr{ db: db, stop: make(chan struct{}), - peers: newOdrPeerSet(), sentReqs: make(map[uint64]*sentReq), } } @@ -77,16 +81,6 @@ type sentReq struct { answered chan struct{} // closed and set to nil when any peer answers it } -// RegisterPeer registers a new LES peer to the ODR capable peer set -func (self *LesOdr) RegisterPeer(p *peer) error { - return self.peers.register(p) -} - -// UnregisterPeer removes a peer from the ODR capable peer set -func (self *LesOdr) UnregisterPeer(p *peer) { - self.peers.unregister(p) -} - const ( MsgBlockBodies = iota MsgCode @@ -142,29 +136,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha select { case <-delivered: - servTime := uint64(mclock.Now() - stime) - self.peers.updateTimeout(peer, false) - self.peers.updateServTime(peer, servTime) + if self.serverPool != nil { + self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false) + } return case <-time.After(softRequestTimeout): close(timeout) - if self.peers.updateTimeout(peer, true) { - self.removePeer(peer.id) - } case <-self.stop: return } select { case <-delivered: - servTime := uint64(mclock.Now() - stime) - self.peers.updateServTime(peer, servTime) - return case <-time.After(hardRequestTimeout): - self.removePeer(peer.id) + go self.removePeer(peer.id) case <-self.stop: return } + if self.serverPool != nil { + self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true) + } } // networkRequest sends a request to known peers until an answer is received @@ -193,7 +184,13 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro exclude := make(map[*peer]struct{}) for { - if peer := self.peers.bestPeer(lreq, exclude); peer == nil { + var p *peer + if self.serverPool != nil { + p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) { + return true, p.fcServer.CanSend(lreq.GetCost(p)) + }) + } + if p == nil { select { case <-ctx.Done(): return ctx.Err() @@ -202,17 +199,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro case <-time.After(retryPeers): } } else { - exclude[peer] = struct{}{} + exclude[p] = struct{}{} delivered := make(chan struct{}) timeout := make(chan struct{}) req.lock.Lock() - req.sentTo[peer] = delivered + req.sentTo[p] = delivered req.lock.Unlock() reqWg.Add(1) - cost := lreq.GetCost(peer) - peer.fcServer.SendRequest(reqID, cost) - go self.requestPeer(req, peer, delivered, timeout, reqWg) - lreq.Request(reqID, peer) + cost := lreq.GetCost(p) + p.fcServer.SendRequest(reqID, cost) + go self.requestPeer(req, p, delivered, timeout, reqWg) + lreq.Request(reqID, p) select { case <-ctx.Done(): |