aboutsummaryrefslogtreecommitdiffstats
path: root/les/odr.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/odr.go')
-rw-r--r--les/odr.go99
1 files changed, 55 insertions, 44 deletions
diff --git a/les/odr.go b/les/odr.go
index afc894ab5..06b44d318 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -32,14 +32,12 @@ import (
var (
softRequestTimeout = time.Millisecond * 500
hardRequestTimeout = time.Second * 10
- retryPeers = time.Second * 1
)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
type odrPeerSelector interface {
- selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer
adjustResponseTime(*poolEntry, time.Duration, bool)
}
@@ -51,6 +49,7 @@ type LesOdr struct {
mlock, clock sync.Mutex
sentReqs map[uint64]*sentReq
serverPool odrPeerSelector
+ reqDist *requestDistributor
}
func NewLesOdr(db ethdb.Database) *LesOdr {
@@ -165,69 +164,81 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error {
answered := make(chan struct{})
req := &sentReq{
- valFunc: lreq.Valid,
+ valFunc: lreq.Validate,
sentTo: make(map[*peer]chan struct{}),
answered: answered, // reply delivered by any peer
}
- reqID := getNextReqID()
- self.mlock.Lock()
- self.sentReqs[reqID] = req
- self.mlock.Unlock()
+
+ exclude := make(map[*peer]struct{})
reqWg := new(sync.WaitGroup)
reqWg.Add(1)
defer reqWg.Done()
- go func() {
- reqWg.Wait()
- self.mlock.Lock()
- delete(self.sentReqs, reqID)
- self.mlock.Unlock()
- }()
- exclude := make(map[*peer]struct{})
- for {
- var p *peer
- if self.serverPool != nil {
- p = self.serverPool.selectPeerWait(reqID, func(p *peer) (bool, time.Duration) {
- if _, ok := exclude[p]; ok || !lreq.CanSend(p) {
- return false, 0
- }
- return true, p.fcServer.CanSend(lreq.GetCost(p))
- }, ctx.Done())
- }
- if p == nil {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-req.answered:
- return nil
- case <-time.After(retryPeers):
- }
- } else {
+ var timeout chan struct{}
+ reqID := getNextReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ return lreq.GetCost(dp.(*peer))
+ },
+ canSend: func(dp distPeer) bool {
+ p := dp.(*peer)
+ _, ok := exclude[p]
+ return !ok && lreq.CanSend(p)
+ },
+ request: func(dp distPeer) func() {
+ p := dp.(*peer)
exclude[p] = struct{}{}
delivered := make(chan struct{})
- timeout := make(chan struct{})
+ timeout = make(chan struct{})
req.lock.Lock()
req.sentTo[p] = delivered
req.lock.Unlock()
reqWg.Add(1)
cost := lreq.GetCost(p)
- p.fcServer.SendRequest(reqID, cost)
+ p.fcServer.QueueRequest(reqID, cost)
go self.requestPeer(req, p, delivered, timeout, reqWg)
- lreq.Request(reqID, p)
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-answered:
- return nil
- case <-timeout:
+ return func() { lreq.Request(reqID, p) }
+ },
+ }
+
+ self.mlock.Lock()
+ self.sentReqs[reqID] = req
+ self.mlock.Unlock()
+
+ go func() {
+ reqWg.Wait()
+ self.mlock.Lock()
+ delete(self.sentReqs, reqID)
+ self.mlock.Unlock()
+ }()
+
+ for {
+ peerChn := self.reqDist.queue(rq)
+ select {
+ case <-ctx.Done():
+ self.reqDist.cancel(rq)
+ return ctx.Err()
+ case <-answered:
+ self.reqDist.cancel(rq)
+ return nil
+ case _, ok := <-peerChn:
+ if !ok {
+ return ErrNoPeers
}
}
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-answered:
+ return nil
+ case <-timeout:
+ }
}
}
-// Retrieve tries to fetch an object from the local db, then from the LES network.
+// Retrieve tries to fetch an object from the LES network.
// If the network retrieval was successful, it stores the object in local db.
func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
lreq := LesRequest(req)