aboutsummaryrefslogtreecommitdiffstats
path: root/les/odr.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/odr.go')
-rw-r--r--les/odr.go55
1 files changed, 26 insertions, 29 deletions
diff --git a/les/odr.go b/les/odr.go
index 444b1da2a..10ef928df 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -37,6 +37,11 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
+type odrPeerSelector interface {
+ selectPeer(func(*peer) (bool, uint64)) *peer
+ adjustResponseTime(*poolEntry, time.Duration, bool)
+}
+
type LesOdr struct {
light.OdrBackend
db ethdb.Database
@@ -44,7 +49,7 @@ type LesOdr struct {
removePeer peerDropFn
mlock, clock sync.Mutex
sentReqs map[uint64]*sentReq
- peers *odrPeerSet
+ serverPool odrPeerSelector
lastReqID uint64
}
@@ -52,7 +57,6 @@ func NewLesOdr(db ethdb.Database) *LesOdr {
return &LesOdr{
db: db,
stop: make(chan struct{}),
- peers: newOdrPeerSet(),
sentReqs: make(map[uint64]*sentReq),
}
}
@@ -77,16 +81,6 @@ type sentReq struct {
answered chan struct{} // closed and set to nil when any peer answers it
}
-// RegisterPeer registers a new LES peer to the ODR capable peer set
-func (self *LesOdr) RegisterPeer(p *peer) error {
- return self.peers.register(p)
-}
-
-// UnregisterPeer removes a peer from the ODR capable peer set
-func (self *LesOdr) UnregisterPeer(p *peer) {
- self.peers.unregister(p)
-}
-
const (
MsgBlockBodies = iota
MsgCode
@@ -142,29 +136,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
select {
case <-delivered:
- servTime := uint64(mclock.Now() - stime)
- self.peers.updateTimeout(peer, false)
- self.peers.updateServTime(peer, servTime)
+ if self.serverPool != nil {
+ self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
+ }
return
case <-time.After(softRequestTimeout):
close(timeout)
- if self.peers.updateTimeout(peer, true) {
- self.removePeer(peer.id)
- }
case <-self.stop:
return
}
select {
case <-delivered:
- servTime := uint64(mclock.Now() - stime)
- self.peers.updateServTime(peer, servTime)
- return
case <-time.After(hardRequestTimeout):
- self.removePeer(peer.id)
+ go self.removePeer(peer.id)
case <-self.stop:
return
}
+ if self.serverPool != nil {
+ self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
+ }
}
// networkRequest sends a request to known peers until an answer is received
@@ -193,7 +184,13 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
exclude := make(map[*peer]struct{})
for {
- if peer := self.peers.bestPeer(lreq, exclude); peer == nil {
+ var p *peer
+ if self.serverPool != nil {
+ p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
+ return true, p.fcServer.CanSend(lreq.GetCost(p))
+ })
+ }
+ if p == nil {
select {
case <-ctx.Done():
return ctx.Err()
@@ -202,17 +199,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
case <-time.After(retryPeers):
}
} else {
- exclude[peer] = struct{}{}
+ exclude[p] = struct{}{}
delivered := make(chan struct{})
timeout := make(chan struct{})
req.lock.Lock()
- req.sentTo[peer] = delivered
+ req.sentTo[p] = delivered
req.lock.Unlock()
reqWg.Add(1)
- cost := lreq.GetCost(peer)
- peer.fcServer.SendRequest(reqID, cost)
- go self.requestPeer(req, peer, delivered, timeout, reqWg)
- lreq.Request(reqID, peer)
+ cost := lreq.GetCost(p)
+ p.fcServer.SendRequest(reqID, cost)
+ go self.requestPeer(req, p, delivered, timeout, reqWg)
+ lreq.Request(reqID, p)
select {
case <-ctx.Done():