aboutsummaryrefslogtreecommitdiffstats
path: root/les/odr.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/odr.go')
-rw-r--r--les/odr.go73
1 files changed, 36 insertions, 37 deletions
diff --git a/les/odr.go b/les/odr.go
index 444b1da2a..8878508c4 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -17,6 +17,8 @@
package les
import (
+ "crypto/rand"
+ "encoding/binary"
"sync"
"time"
@@ -37,6 +39,11 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
+type odrPeerSelector interface {
+ selectPeer(func(*peer) (bool, uint64)) *peer
+ adjustResponseTime(*poolEntry, time.Duration, bool)
+}
+
type LesOdr struct {
light.OdrBackend
db ethdb.Database
@@ -44,15 +51,13 @@ type LesOdr struct {
removePeer peerDropFn
mlock, clock sync.Mutex
sentReqs map[uint64]*sentReq
- peers *odrPeerSet
- lastReqID uint64
+ serverPool odrPeerSelector
}
func NewLesOdr(db ethdb.Database) *LesOdr {
return &LesOdr{
db: db,
stop: make(chan struct{}),
- peers: newOdrPeerSet(),
sentReqs: make(map[uint64]*sentReq),
}
}
@@ -77,16 +82,6 @@ type sentReq struct {
answered chan struct{} // closed and set to nil when any peer answers it
}
-// RegisterPeer registers a new LES peer to the ODR capable peer set
-func (self *LesOdr) RegisterPeer(p *peer) error {
- return self.peers.register(p)
-}
-
-// UnregisterPeer removes a peer from the ODR capable peer set
-func (self *LesOdr) UnregisterPeer(p *peer) {
- self.peers.unregister(p)
-}
-
const (
MsgBlockBodies = iota
MsgCode
@@ -142,29 +137,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
select {
case <-delivered:
- servTime := uint64(mclock.Now() - stime)
- self.peers.updateTimeout(peer, false)
- self.peers.updateServTime(peer, servTime)
+ if self.serverPool != nil {
+ self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
+ }
return
case <-time.After(softRequestTimeout):
close(timeout)
- if self.peers.updateTimeout(peer, true) {
- self.removePeer(peer.id)
- }
case <-self.stop:
return
}
select {
case <-delivered:
- servTime := uint64(mclock.Now() - stime)
- self.peers.updateServTime(peer, servTime)
- return
case <-time.After(hardRequestTimeout):
- self.removePeer(peer.id)
+ go self.removePeer(peer.id)
case <-self.stop:
return
}
+ if self.serverPool != nil {
+ self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
+ }
}
// networkRequest sends a request to known peers until an answer is received
@@ -176,7 +168,7 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
sentTo: make(map[*peer]chan struct{}),
answered: answered, // reply delivered by any peer
}
- reqID := self.getNextReqID()
+ reqID := getNextReqID()
self.mlock.Lock()
self.sentReqs[reqID] = req
self.mlock.Unlock()
@@ -193,7 +185,16 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
exclude := make(map[*peer]struct{})
for {
- if peer := self.peers.bestPeer(lreq, exclude); peer == nil {
+ var p *peer
+ if self.serverPool != nil {
+ p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
+ if !lreq.CanSend(p) {
+ return false, 0
+ }
+ return true, p.fcServer.CanSend(lreq.GetCost(p))
+ })
+ }
+ if p == nil {
select {
case <-ctx.Done():
return ctx.Err()
@@ -202,17 +203,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
case <-time.After(retryPeers):
}
} else {
- exclude[peer] = struct{}{}
+ exclude[p] = struct{}{}
delivered := make(chan struct{})
timeout := make(chan struct{})
req.lock.Lock()
- req.sentTo[peer] = delivered
+ req.sentTo[p] = delivered
req.lock.Unlock()
reqWg.Add(1)
- cost := lreq.GetCost(peer)
- peer.fcServer.SendRequest(reqID, cost)
- go self.requestPeer(req, peer, delivered, timeout, reqWg)
- lreq.Request(reqID, peer)
+ cost := lreq.GetCost(p)
+ p.fcServer.SendRequest(reqID, cost)
+ go self.requestPeer(req, p, delivered, timeout, reqWg)
+ lreq.Request(reqID, p)
select {
case <-ctx.Done():
@@ -239,10 +240,8 @@ func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err err
return
}
-func (self *LesOdr) getNextReqID() uint64 {
- self.clock.Lock()
- defer self.clock.Unlock()
-
- self.lastReqID++
- return self.lastReqID
+func getNextReqID() uint64 {
+ var rnd [8]byte
+ rand.Read(rnd[:])
+ return binary.BigEndian.Uint64(rnd[:])
}