aboutsummaryrefslogtreecommitdiffstats
path: root/les/retrieve.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2019-05-31 02:51:13 +0800
committerGitHub <noreply@github.com>2019-05-31 02:51:13 +0800
commit58497f46bd0bdd105828c30500e863e826e598cd (patch)
tree7c658530edfebb6e47ed5f993753f4b48fd1747e /les/retrieve.go
parent3d58268bba92c6c8f7f035bafcd1608bc22cee51 (diff)
downloadgo-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.gz
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.bz2
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.lz
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.xz
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.zst
go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.zip
les, les/flowcontrol: implement LES/3 (#19329)
les, les/flowcontrol: implement LES/3
Diffstat (limited to 'les/retrieve.go')
-rw-r--r--les/retrieve.go70
1 files changed, 53 insertions, 17 deletions
diff --git a/les/retrieve.go b/les/retrieve.go
index dd9d14598..d17a02e1a 100644
--- a/les/retrieve.go
+++ b/les/retrieve.go
@@ -78,8 +78,8 @@ type sentReq struct {
// after which delivered is set to true, the validity of the response is sent on the
// valid channel and no more responses are accepted.
type sentReqToPeer struct {
- delivered bool
- valid chan bool
+ delivered, frozen bool
+ event chan int
}
// reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the
@@ -95,6 +95,7 @@ const (
rpHardTimeout
rpDeliveredValid
rpDeliveredInvalid
+ rpNotDelivered
)
// newRetrieveManager creates the retrieve manager
@@ -149,7 +150,7 @@ func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc
req.request = func(p distPeer) func() {
// before actually sending the request, put an entry into the sentTo map
r.lock.Lock()
- r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)}
+ r.sentTo[p] = sentReqToPeer{delivered: false, frozen: false, event: make(chan int, 1)}
r.lock.Unlock()
return request(p)
}
@@ -173,6 +174,17 @@ func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
}
+// frozen is called by the LES protocol manager when a server has suspended its service and we
+// should not expect an answer for the requests already sent there
+func (rm *retrieveManager) frozen(peer distPeer) {
+ rm.lock.RLock()
+ defer rm.lock.RUnlock()
+
+ for _, req := range rm.sentReqs {
+ req.frozen(peer)
+ }
+}
+
// reqStateFn represents a state of the retrieve loop state machine
type reqStateFn func() reqStateFn
@@ -215,7 +227,7 @@ func (r *sentReq) stateRequesting() reqStateFn {
go r.tryRequest()
r.lastReqQueued = true
return r.stateRequesting
- case rpDeliveredInvalid:
+ case rpDeliveredInvalid, rpNotDelivered:
// if it was the last sent request (set to nil by update) then start a new one
if !r.lastReqQueued && r.lastReqSentTo == nil {
go r.tryRequest()
@@ -277,7 +289,7 @@ func (r *sentReq) update(ev reqPeerEvent) {
r.reqSrtoCount++
case rpHardTimeout:
r.reqSrtoCount--
- case rpDeliveredValid, rpDeliveredInvalid:
+ case rpDeliveredValid, rpDeliveredInvalid, rpNotDelivered:
if ev.peer == r.lastReqSentTo {
r.lastReqSentTo = nil
} else {
@@ -343,12 +355,13 @@ func (r *sentReq) tryRequest() {
}()
select {
- case ok := <-s.valid:
- if ok {
- r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
- } else {
- r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
+ case event := <-s.event:
+ if event == rpNotDelivered {
+ r.lock.Lock()
+ delete(r.sentTo, p)
+ r.lock.Unlock()
}
+ r.eventsCh <- reqPeerEvent{event, p}
return
case <-time.After(softRequestTimeout):
srto = true
@@ -356,12 +369,13 @@ func (r *sentReq) tryRequest() {
}
select {
- case ok := <-s.valid:
- if ok {
- r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
- } else {
- r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
+ case event := <-s.event:
+ if event == rpNotDelivered {
+ r.lock.Lock()
+ delete(r.sentTo, p)
+ r.lock.Unlock()
}
+ r.eventsCh <- reqPeerEvent{event, p}
case <-time.After(hardRequestTimeout):
hrto = true
r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
@@ -377,15 +391,37 @@ func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
if !ok || s.delivered {
return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
}
+ if s.frozen {
+ return nil
+ }
valid := r.validate(peer, msg) == nil
- r.sentTo[peer] = sentReqToPeer{true, s.valid}
- s.valid <- valid
+ r.sentTo[peer] = sentReqToPeer{delivered: true, frozen: false, event: s.event}
+ if valid {
+ s.event <- rpDeliveredValid
+ } else {
+ s.event <- rpDeliveredInvalid
+ }
if !valid {
return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
}
return nil
}
+// frozen sends a "not delivered" event to the peer event channel belonging to the
+// given peer if the request has been sent there, causing the state machine to not
+// expect an answer and potentially even send the request to the same peer again
+// when canSend allows it.
+func (r *sentReq) frozen(peer distPeer) {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+
+ s, ok := r.sentTo[peer]
+ if ok && !s.delivered && !s.frozen {
+ r.sentTo[peer] = sentReqToPeer{delivered: false, frozen: true, event: s.event}
+ s.event <- rpNotDelivered
+ }
+}
+
// stop stops the retrieval process and sets an error code that will be returned
// by getError
func (r *sentReq) stop(err error) {