aboutsummaryrefslogtreecommitdiffstats
path: root/les/odr.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/odr.go')
-rw-r--r--les/odr.go195
1 files changed, 15 insertions, 180 deletions
diff --git a/les/odr.go b/les/odr.go
index 684f36c76..3f7584b48 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -18,45 +18,24 @@ package les
import (
"context"
- "crypto/rand"
- "encoding/binary"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
)
-var (
- softRequestTimeout = time.Millisecond * 500
- hardRequestTimeout = time.Second * 10
-)
-
-// peerDropFn is a callback type for dropping a peer detected as malicious.
-type peerDropFn func(id string)
-
-type odrPeerSelector interface {
- adjustResponseTime(*poolEntry, time.Duration, bool)
-}
-
+// LesOdr implements light.OdrBackend
type LesOdr struct {
- light.OdrBackend
- db ethdb.Database
- stop chan struct{}
- removePeer peerDropFn
- mlock, clock sync.Mutex
- sentReqs map[uint64]*sentReq
- serverPool odrPeerSelector
- reqDist *requestDistributor
+ db ethdb.Database
+ stop chan struct{}
+ retriever *retrieveManager
}
-func NewLesOdr(db ethdb.Database) *LesOdr {
+func NewLesOdr(db ethdb.Database, retriever *retrieveManager) *LesOdr {
return &LesOdr{
- db: db,
- stop: make(chan struct{}),
- sentReqs: make(map[uint64]*sentReq),
+ db: db,
+ retriever: retriever,
+ stop: make(chan struct{}),
}
}
@@ -68,17 +47,6 @@ func (odr *LesOdr) Database() ethdb.Database {
return odr.db
}
-// validatorFunc is a function that processes a message.
-type validatorFunc func(ethdb.Database, *Msg) error
-
-// sentReq is a request waiting for an answer that satisfies its valFunc
-type sentReq struct {
- valFunc validatorFunc
- sentTo map[*peer]chan struct{}
- lock sync.RWMutex // protects acces to sentTo
- answered chan struct{} // closed and set to nil when any peer answers it
-}
-
const (
MsgBlockBodies = iota
MsgCode
@@ -94,156 +62,29 @@ type Msg struct {
Obj interface{}
}
-// Deliver is called by the LES protocol manager to deliver ODR reply messages to waiting requests
-func (self *LesOdr) Deliver(peer *peer, msg *Msg) error {
- var delivered chan struct{}
- self.mlock.Lock()
- req, ok := self.sentReqs[msg.ReqID]
- self.mlock.Unlock()
- if ok {
- req.lock.Lock()
- delivered, ok = req.sentTo[peer]
- req.lock.Unlock()
- }
-
- if !ok {
- return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
- }
-
- if err := req.valFunc(self.db, msg); err != nil {
- peer.Log().Warn("Invalid odr response", "err", err)
- return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
- }
- close(delivered)
- req.lock.Lock()
- delete(req.sentTo, peer)
- if req.answered != nil {
- close(req.answered)
- req.answered = nil
- }
- req.lock.Unlock()
- return nil
-}
-
-func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout chan struct{}, reqWg *sync.WaitGroup) {
- stime := mclock.Now()
- defer func() {
- req.lock.Lock()
- delete(req.sentTo, peer)
- req.lock.Unlock()
- reqWg.Done()
- }()
-
- select {
- case <-delivered:
- if self.serverPool != nil {
- self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
- }
- return
- case <-time.After(softRequestTimeout):
- close(timeout)
- case <-self.stop:
- return
- }
-
- select {
- case <-delivered:
- case <-time.After(hardRequestTimeout):
- peer.Log().Debug("Request timed out hard")
- go self.removePeer(peer.id)
- case <-self.stop:
- return
- }
- if self.serverPool != nil {
- self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
- }
-}
-
-// networkRequest sends a request to known peers until an answer is received
-// or the context is cancelled
-func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error {
- answered := make(chan struct{})
- req := &sentReq{
- valFunc: lreq.Validate,
- sentTo: make(map[*peer]chan struct{}),
- answered: answered, // reply delivered by any peer
- }
-
- exclude := make(map[*peer]struct{})
-
- reqWg := new(sync.WaitGroup)
- reqWg.Add(1)
- defer reqWg.Done()
+// Retrieve tries to fetch an object from the LES network.
+// If the network retrieval was successful, it stores the object in local db.
+func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
+ lreq := LesRequest(req)
- var timeout chan struct{}
- reqID := getNextReqID()
+ reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
return lreq.GetCost(dp.(*peer))
},
canSend: func(dp distPeer) bool {
p := dp.(*peer)
- _, ok := exclude[p]
- return !ok && lreq.CanSend(p)
+ return lreq.CanSend(p)
},
request: func(dp distPeer) func() {
p := dp.(*peer)
- exclude[p] = struct{}{}
- delivered := make(chan struct{})
- timeout = make(chan struct{})
- req.lock.Lock()
- req.sentTo[p] = delivered
- req.lock.Unlock()
- reqWg.Add(1)
cost := lreq.GetCost(p)
p.fcServer.QueueRequest(reqID, cost)
- go self.requestPeer(req, p, delivered, timeout, reqWg)
return func() { lreq.Request(reqID, p) }
},
}
- self.mlock.Lock()
- self.sentReqs[reqID] = req
- self.mlock.Unlock()
-
- go func() {
- reqWg.Wait()
- self.mlock.Lock()
- delete(self.sentReqs, reqID)
- self.mlock.Unlock()
- }()
-
- for {
- peerChn := self.reqDist.queue(rq)
- select {
- case <-ctx.Done():
- self.reqDist.cancel(rq)
- return ctx.Err()
- case <-answered:
- self.reqDist.cancel(rq)
- return nil
- case _, ok := <-peerChn:
- if !ok {
- return ErrNoPeers
- }
- }
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-answered:
- return nil
- case <-timeout:
- }
- }
-}
-
-// Retrieve tries to fetch an object from the LES network.
-// If the network retrieval was successful, it stores the object in local db.
-func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
- lreq := LesRequest(req)
- err = self.networkRequest(ctx, lreq)
- if err == nil {
+ if err = self.retriever.retrieve(ctx, reqID, rq, func(p distPeer, msg *Msg) error { return lreq.Validate(self.db, msg) }); err == nil {
// retrieved from network, store in db
req.StoreResult(self.db)
} else {
@@ -251,9 +92,3 @@ func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err err
}
return
}
-
-func getNextReqID() uint64 {
- var rnd [8]byte
- rand.Read(rnd[:])
- return binary.BigEndian.Uint64(rnd[:])
-}