aboutsummaryrefslogtreecommitdiffstats
path: root/les/retrieve.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/retrieve.go')
-rw-r--r--les/retrieve.go395
1 files changed, 395 insertions, 0 deletions
diff --git a/les/retrieve.go b/les/retrieve.go
new file mode 100644
index 000000000..b060e0b0d
--- /dev/null
+++ b/les/retrieve.go
@@ -0,0 +1,395 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package light implements on-demand retrieval capable state and chain objects
+// for the Ethereum Light Client.
+package les
+
+import (
+ "context"
+ "crypto/rand"
+ "encoding/binary"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+)
+
+var (
+ retryQueue = time.Millisecond * 100
+ softRequestTimeout = time.Millisecond * 500
+ hardRequestTimeout = time.Second * 10
+)
+
+// retrieveManager is a layer on top of requestDistributor which takes care of
+// matching replies by request ID and handles timeouts and resends if necessary.
+type retrieveManager struct {
+ dist *requestDistributor
+ peers *peerSet
+ serverPool peerSelector
+
+ lock sync.RWMutex
+ sentReqs map[uint64]*sentReq
+}
+
+// validatorFunc is a function that processes a reply message
+type validatorFunc func(distPeer, *Msg) error
+
+// peerSelector receives feedback info about response times and timeouts
+type peerSelector interface {
+ adjustResponseTime(*poolEntry, time.Duration, bool)
+}
+
+// sentReq represents a request sent and tracked by retrieveManager
+type sentReq struct {
+ rm *retrieveManager
+ req *distReq
+ id uint64
+ validate validatorFunc
+
+ eventsCh chan reqPeerEvent
+ stopCh chan struct{}
+ stopped bool
+ err error
+
+ lock sync.RWMutex // protect access to sentTo map
+ sentTo map[distPeer]sentReqToPeer
+
+ reqQueued bool // a request has been queued but not sent
+ reqSent bool // a request has been sent but not timed out
+ reqSrtoCount int // number of requests that reached soft (but not hard) timeout
+}
+
+// sentReqToPeer notifies the request-from-peer goroutine (tryRequest) about a response
+// delivered by the given peer. Only one delivery is allowed per request per peer,
+// after which delivered is set to true, the validity of the response is sent on the
+// valid channel and no more responses are accepted.
+type sentReqToPeer struct {
+ delivered bool
+ valid chan bool
+}
+
+// reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the
+// request state machine (retrieveLoop) through the eventsCh channel.
+type reqPeerEvent struct {
+ event int
+ peer distPeer
+}
+
+const (
+ rpSent = iota // if peer == nil, not sent (no suitable peers)
+ rpSoftTimeout
+ rpHardTimeout
+ rpDeliveredValid
+ rpDeliveredInvalid
+)
+
+// newRetrieveManager creates the retrieve manager
+func newRetrieveManager(peers *peerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager {
+ return &retrieveManager{
+ peers: peers,
+ dist: dist,
+ serverPool: serverPool,
+ sentReqs: make(map[uint64]*sentReq),
+ }
+}
+
+// retrieve sends a request (to multiple peers if necessary) and waits for an answer
+// that is delivered through the deliver function and successfully validated by the
+// validator callback. It returns when a valid answer is delivered or the context is
+// cancelled.
+func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc) error {
+ sentReq := rm.sendReq(reqID, req, val)
+ select {
+ case <-sentReq.stopCh:
+ case <-ctx.Done():
+ sentReq.stop(ctx.Err())
+ }
+ return sentReq.getError()
+}
+
+// sendReq starts a process that keeps trying to retrieve a valid answer for a
+// request from any suitable peers until stopped or succeeded.
+func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq {
+ r := &sentReq{
+ rm: rm,
+ req: req,
+ id: reqID,
+ sentTo: make(map[distPeer]sentReqToPeer),
+ stopCh: make(chan struct{}),
+ eventsCh: make(chan reqPeerEvent, 10),
+ validate: val,
+ }
+
+ canSend := req.canSend
+ req.canSend = func(p distPeer) bool {
+ // add an extra check to canSend: the request has not been sent to the same peer before
+ r.lock.RLock()
+ _, sent := r.sentTo[p]
+ r.lock.RUnlock()
+ return !sent && canSend(p)
+ }
+
+ request := req.request
+ req.request = func(p distPeer) func() {
+ // before actually sending the request, put an entry into the sentTo map
+ r.lock.Lock()
+ r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)}
+ r.lock.Unlock()
+ return request(p)
+ }
+ rm.lock.Lock()
+ rm.sentReqs[reqID] = r
+ rm.lock.Unlock()
+
+ go r.retrieveLoop()
+ return r
+}
+
+// deliver is called by the LES protocol manager to deliver reply messages to waiting requests
+func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
+ rm.lock.RLock()
+ req, ok := rm.sentReqs[msg.ReqID]
+ rm.lock.RUnlock()
+
+ if ok {
+ return req.deliver(peer, msg)
+ }
+ return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
+}
+
+// reqStateFn represents a state of the retrieve loop state machine
+type reqStateFn func() reqStateFn
+
+// retrieveLoop is the retrieval state machine event loop
+func (r *sentReq) retrieveLoop() {
+ go r.tryRequest()
+ r.reqQueued = true
+ state := r.stateRequesting
+
+ for state != nil {
+ state = state()
+ }
+
+ r.rm.lock.Lock()
+ delete(r.rm.sentReqs, r.id)
+ r.rm.lock.Unlock()
+}
+
+// stateRequesting: a request has been queued or sent recently; when it reaches soft timeout,
+// a new request is sent to a new peer
+func (r *sentReq) stateRequesting() reqStateFn {
+ select {
+ case ev := <-r.eventsCh:
+ r.update(ev)
+ switch ev.event {
+ case rpSent:
+ if ev.peer == nil {
+ // request send failed, no more suitable peers
+ if r.waiting() {
+ // we are already waiting for sent requests which may succeed so keep waiting
+ return r.stateNoMorePeers
+ }
+ // nothing to wait for, no more peers to ask, return with error
+ r.stop(ErrNoPeers)
+ // no need to go to stopped state because waiting() already returned false
+ return nil
+ }
+ case rpSoftTimeout:
+ // last request timed out, try asking a new peer
+ go r.tryRequest()
+ r.reqQueued = true
+ return r.stateRequesting
+ case rpDeliveredValid:
+ r.stop(nil)
+ return r.stateStopped
+ }
+ return r.stateRequesting
+ case <-r.stopCh:
+ return r.stateStopped
+ }
+}
+
+// stateNoMorePeers: could not send more requests because no suitable peers are available.
+// Peers may become suitable for a certain request later or new peers may appear so we
+// keep trying.
+func (r *sentReq) stateNoMorePeers() reqStateFn {
+ select {
+ case <-time.After(retryQueue):
+ go r.tryRequest()
+ r.reqQueued = true
+ return r.stateRequesting
+ case ev := <-r.eventsCh:
+ r.update(ev)
+ if ev.event == rpDeliveredValid {
+ r.stop(nil)
+ return r.stateStopped
+ }
+ return r.stateNoMorePeers
+ case <-r.stopCh:
+ return r.stateStopped
+ }
+}
+
+// stateStopped: request succeeded or cancelled, just waiting for some peers
+// to either answer or time out hard
+func (r *sentReq) stateStopped() reqStateFn {
+ for r.waiting() {
+ r.update(<-r.eventsCh)
+ }
+ return nil
+}
+
+// update updates the queued/sent flags and timed out peers counter according to the event
+func (r *sentReq) update(ev reqPeerEvent) {
+ switch ev.event {
+ case rpSent:
+ r.reqQueued = false
+ if ev.peer != nil {
+ r.reqSent = true
+ }
+ case rpSoftTimeout:
+ r.reqSent = false
+ r.reqSrtoCount++
+ case rpHardTimeout, rpDeliveredValid, rpDeliveredInvalid:
+ r.reqSrtoCount--
+ }
+}
+
+// waiting returns true if the retrieval mechanism is waiting for an answer from
+// any peer
+func (r *sentReq) waiting() bool {
+ return r.reqQueued || r.reqSent || r.reqSrtoCount > 0
+}
+
+// tryRequest tries to send the request to a new peer and waits for it to either
+// succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent
+// messages to the request's event channel.
+func (r *sentReq) tryRequest() {
+ sent := r.rm.dist.queue(r.req)
+ var p distPeer
+ select {
+ case p = <-sent:
+ case <-r.stopCh:
+ if r.rm.dist.cancel(r.req) {
+ p = nil
+ } else {
+ p = <-sent
+ }
+ }
+
+ r.eventsCh <- reqPeerEvent{rpSent, p}
+ if p == nil {
+ return
+ }
+
+ reqSent := mclock.Now()
+ srto, hrto := false, false
+
+ r.lock.RLock()
+ s, ok := r.sentTo[p]
+ r.lock.RUnlock()
+ if !ok {
+ panic(nil)
+ }
+
+ defer func() {
+ // send feedback to server pool and remove peer if hard timeout happened
+ pp, ok := p.(*peer)
+ if ok && r.rm.serverPool != nil {
+ respTime := time.Duration(mclock.Now() - reqSent)
+ r.rm.serverPool.adjustResponseTime(pp.poolEntry, respTime, srto)
+ }
+ if hrto {
+ pp.Log().Debug("Request timed out hard")
+ if r.rm.peers != nil {
+ r.rm.peers.Unregister(pp.id)
+ }
+ }
+
+ r.lock.Lock()
+ delete(r.sentTo, p)
+ r.lock.Unlock()
+ }()
+
+ select {
+ case ok := <-s.valid:
+ if ok {
+ r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
+ } else {
+ r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
+ }
+ return
+ case <-time.After(softRequestTimeout):
+ srto = true
+ r.eventsCh <- reqPeerEvent{rpSoftTimeout, p}
+ }
+
+ select {
+ case ok := <-s.valid:
+ if ok {
+ r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
+ } else {
+ r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
+ }
+ case <-time.After(hardRequestTimeout):
+ hrto = true
+ r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
+ }
+}
+
+// deliver a reply belonging to this request
+func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+
+ s, ok := r.sentTo[peer]
+ if !ok || s.delivered {
+ return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
+ }
+ valid := r.validate(peer, msg) == nil
+ r.sentTo[peer] = sentReqToPeer{true, s.valid}
+ s.valid <- valid
+ if !valid {
+ return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
+ }
+ return nil
+}
+
+// stop stops the retrieval process and sets an error code that will be returned
+// by getError
+func (r *sentReq) stop(err error) {
+ r.lock.Lock()
+ if !r.stopped {
+ r.stopped = true
+ r.err = err
+ close(r.stopCh)
+ }
+ r.lock.Unlock()
+}
+
+// getError returns any retrieval error (either internally generated or set by the
+// stop function) after stopCh has been closed
+func (r *sentReq) getError() error {
+ return r.err
+}
+
+// genReqID generates a new random request ID
+func genReqID() uint64 {
+ var rnd [8]byte
+ rand.Read(rnd[:])
+ return binary.BigEndian.Uint64(rnd[:])
+}