diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-03-23 03:44:22 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-03-23 03:44:22 +0800 |
commit | 525116dbff916825463931361f75e75e955c12e2 (patch) | |
tree | b272801c420ad9a591f227919567c7952b0bd512 /les/flowcontrol/control.go | |
parent | 1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff) | |
download | go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.gz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.bz2 go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.lz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.xz go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.zst go-tangerine-525116dbff916825463931361f75e75e955c12e2.zip |
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues
* core: moved header validation before chain mutex lock
Diffstat (limited to 'les/flowcontrol/control.go')
-rw-r--r-- | les/flowcontrol/control.go | 104 |
1 files changed, 25 insertions, 79 deletions
diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go index e45537cf5..e40e69346 100644 --- a/les/flowcontrol/control.go +++ b/les/flowcontrol/control.go @@ -94,14 +94,12 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) { } type ServerNode struct { - bufEstimate uint64 - lastTime mclock.AbsTime - params *ServerParams - sumCost uint64 // sum of req costs sent to this server - pending map[uint64]uint64 // value = sumCost after sending the given req - assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer - assignToken chan struct{} // send to this channel before assigning, read from it after deassigning - lock sync.RWMutex + bufEstimate uint64 + lastTime mclock.AbsTime + params *ServerParams + sumCost uint64 // sum of req costs sent to this server + pending map[uint64]uint64 // value = sumCost after sending the given req + lock sync.RWMutex } func NewServerNode(params *ServerParams) *ServerNode { @@ -110,7 +108,6 @@ func NewServerNode(params *ServerParams) *ServerNode { lastTime: mclock.Now(), params: params, pending: make(map[uint64]uint64), - assignToken: make(chan struct{}, 1), } } @@ -127,94 +124,37 @@ func (peer *ServerNode) recalcBLE(time mclock.AbsTime) { } // safetyMargin is added to the flow control waiting time when estimated buffer value is low -const safetyMargin = time.Millisecond * 200 +const safetyMargin = time.Millisecond -func (peer *ServerNode) canSend(maxCost uint64) time.Duration { +func (peer *ServerNode) canSend(maxCost uint64) (time.Duration, float64) { + peer.recalcBLE(mclock.Now()) maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst) if maxCost > peer.params.BufLimit { maxCost = peer.params.BufLimit } if peer.bufEstimate >= maxCost { - return 0 + return 0, float64(peer.bufEstimate-maxCost) / float64(peer.params.BufLimit) } - return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge) + return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge), 0 } // CanSend returns the minimum waiting time required before sending a request -// with the given maximum estimated cost -func (peer *ServerNode) CanSend(maxCost uint64) time.Duration { +// with the given maximum estimated cost. Second return value is the relative +// estimated buffer level after sending the request (divided by BufLimit). +func (peer *ServerNode) CanSend(maxCost uint64) (time.Duration, float64) { peer.lock.RLock() defer peer.lock.RUnlock() return peer.canSend(maxCost) } -// AssignRequest tries to assign the server node to the given request, guaranteeing -// that once it returns true, no request will be sent to the node before this one -func (peer *ServerNode) AssignRequest(reqID uint64) bool { - select { - case peer.assignToken <- struct{}{}: - default: - return false - } - peer.lock.Lock() - peer.assignedRequest = reqID - peer.lock.Unlock() - return true -} - -// MustAssignRequest waits until the node can be assigned to the given request. -// It is always guaranteed that assignments are released in a short amount of time. -func (peer *ServerNode) MustAssignRequest(reqID uint64) { - peer.assignToken <- struct{}{} - peer.lock.Lock() - peer.assignedRequest = reqID - peer.lock.Unlock() -} - -// DeassignRequest releases a request assignment in case the planned request -// is not being sent. -func (peer *ServerNode) DeassignRequest(reqID uint64) { - peer.lock.Lock() - if peer.assignedRequest == reqID { - peer.assignedRequest = 0 - <-peer.assignToken - } - peer.lock.Unlock() -} - -// IsAssigned returns true if the server node has already been assigned to a request -// (note that this function returning false does not guarantee that you can assign a request -// immediately afterwards, its only purpose is to help peer selection) -func (peer *ServerNode) IsAssigned() bool { - peer.lock.RLock() - locked := peer.assignedRequest != 0 - peer.lock.RUnlock() - return locked -} - -// blocks until request can be sent -func (peer *ServerNode) SendRequest(reqID, maxCost uint64) { +// QueueRequest should be called when the request has been assigned to the given +// server node, before putting it in the send queue. It is mandatory that requests +// are sent in the same order as the QueueRequest calls are made. +func (peer *ServerNode) QueueRequest(reqID, maxCost uint64) { peer.lock.Lock() defer peer.lock.Unlock() - if peer.assignedRequest != reqID { - peer.lock.Unlock() - peer.MustAssignRequest(reqID) - peer.lock.Lock() - } - - peer.recalcBLE(mclock.Now()) - wait := peer.canSend(maxCost) - for wait > 0 { - peer.lock.Unlock() - time.Sleep(wait) - peer.lock.Lock() - peer.recalcBLE(mclock.Now()) - wait = peer.canSend(maxCost) - } - peer.assignedRequest = 0 - <-peer.assignToken peer.bufEstimate -= maxCost peer.sumCost += maxCost if reqID >= 0 { @@ -222,6 +162,8 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) { } } +// GotReply adjusts estimated buffer value according to the value included in +// the latest request reply. func (peer *ServerNode) GotReply(reqID, bv uint64) { peer.lock.Lock() @@ -235,6 +177,10 @@ func (peer *ServerNode) GotReply(reqID, bv uint64) { return } delete(peer.pending, reqID) - peer.bufEstimate = bv - (peer.sumCost - sc) + cc := peer.sumCost - sc + peer.bufEstimate = 0 + if bv > cc { + peer.bufEstimate = bv - cc + } peer.lastTime = mclock.Now() } |