diff options
Diffstat (limited to 'les/flowcontrol')
-rw-r--r-- | les/flowcontrol/control.go | 121 | ||||
-rw-r--r-- | les/flowcontrol/manager.go | 41 |
2 files changed, 114 insertions, 48 deletions
diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go index acb131ea4..e45537cf5 100644 --- a/les/flowcontrol/control.go +++ b/les/flowcontrol/control.go @@ -24,7 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" ) -const fcTimeConst = 1000000 +const fcTimeConst = time.Millisecond type ServerParams struct { BufLimit, MinRecharge uint64 @@ -33,7 +33,7 @@ type ServerParams struct { type ClientNode struct { params *ServerParams bufValue uint64 - lastTime int64 + lastTime mclock.AbsTime lock sync.Mutex cm *ClientManager cmNode *cmNode @@ -44,7 +44,7 @@ func NewClientNode(cm *ClientManager, params *ServerParams) *ClientNode { cm: cm, params: params, bufValue: params.BufLimit, - lastTime: getTime(), + lastTime: mclock.Now(), } node.cmNode = cm.addNode(node) return node @@ -54,12 +54,12 @@ func (peer *ClientNode) Remove(cm *ClientManager) { cm.removeNode(peer.cmNode) } -func (peer *ClientNode) recalcBV(time int64) { +func (peer *ClientNode) recalcBV(time mclock.AbsTime) { dt := uint64(time - peer.lastTime) if time < peer.lastTime { dt = 0 } - peer.bufValue += peer.params.MinRecharge * dt / fcTimeConst + peer.bufValue += peer.params.MinRecharge * dt / uint64(fcTimeConst) if peer.bufValue > peer.params.BufLimit { peer.bufValue = peer.params.BufLimit } @@ -70,7 +70,7 @@ func (peer *ClientNode) AcceptRequest() (uint64, bool) { peer.lock.Lock() defer peer.lock.Unlock() - time := getTime() + time := mclock.Now() peer.recalcBV(time) return peer.bufValue, peer.cm.accept(peer.cmNode, time) } @@ -79,7 +79,7 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) { peer.lock.Lock() defer peer.lock.Unlock() - time := getTime() + time := mclock.Now() peer.recalcBV(time) peer.bufValue -= cost peer.recalcBV(time) @@ -94,66 +94,127 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) { } type ServerNode struct { - bufEstimate uint64 - lastTime int64 - params *ServerParams - sumCost uint64 // sum of req costs sent to this server - pending map[uint64]uint64 // value = sumCost after sending the given req - lock sync.RWMutex + bufEstimate uint64 + lastTime mclock.AbsTime + params *ServerParams + sumCost uint64 // sum of req costs sent to this server + pending map[uint64]uint64 // value = sumCost after sending the given req + assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer + assignToken chan struct{} // send to this channel before assigning, read from it after deassigning + lock sync.RWMutex } func NewServerNode(params *ServerParams) *ServerNode { return &ServerNode{ bufEstimate: params.BufLimit, - lastTime: getTime(), + lastTime: mclock.Now(), params: params, pending: make(map[uint64]uint64), + assignToken: make(chan struct{}, 1), } } -func getTime() int64 { - return int64(mclock.Now()) -} - -func (peer *ServerNode) recalcBLE(time int64) { +func (peer *ServerNode) recalcBLE(time mclock.AbsTime) { dt := uint64(time - peer.lastTime) if time < peer.lastTime { dt = 0 } - peer.bufEstimate += peer.params.MinRecharge * dt / fcTimeConst + peer.bufEstimate += peer.params.MinRecharge * dt / uint64(fcTimeConst) if peer.bufEstimate > peer.params.BufLimit { peer.bufEstimate = peer.params.BufLimit } peer.lastTime = time } -func (peer *ServerNode) canSend(maxCost uint64) uint64 { +// safetyMargin is added to the flow control waiting time when estimated buffer value is low +const safetyMargin = time.Millisecond * 200 + +func (peer *ServerNode) canSend(maxCost uint64) time.Duration { + maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst) + if maxCost > peer.params.BufLimit { + maxCost = peer.params.BufLimit + } if peer.bufEstimate >= maxCost { return 0 } - return (maxCost - peer.bufEstimate) * fcTimeConst / peer.params.MinRecharge + return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge) } -func (peer *ServerNode) CanSend(maxCost uint64) uint64 { +// CanSend returns the minimum waiting time required before sending a request +// with the given maximum estimated cost +func (peer *ServerNode) CanSend(maxCost uint64) time.Duration { peer.lock.RLock() defer peer.lock.RUnlock() return peer.canSend(maxCost) } +// AssignRequest tries to assign the server node to the given request, guaranteeing +// that once it returns true, no request will be sent to the node before this one +func (peer *ServerNode) AssignRequest(reqID uint64) bool { + select { + case peer.assignToken <- struct{}{}: + default: + return false + } + peer.lock.Lock() + peer.assignedRequest = reqID + peer.lock.Unlock() + return true +} + +// MustAssignRequest waits until the node can be assigned to the given request. +// It is always guaranteed that assignments are released in a short amount of time. +func (peer *ServerNode) MustAssignRequest(reqID uint64) { + peer.assignToken <- struct{}{} + peer.lock.Lock() + peer.assignedRequest = reqID + peer.lock.Unlock() +} + +// DeassignRequest releases a request assignment in case the planned request +// is not being sent. +func (peer *ServerNode) DeassignRequest(reqID uint64) { + peer.lock.Lock() + if peer.assignedRequest == reqID { + peer.assignedRequest = 0 + <-peer.assignToken + } + peer.lock.Unlock() +} + +// IsAssigned returns true if the server node has already been assigned to a request +// (note that this function returning false does not guarantee that you can assign a request +// immediately afterwards, its only purpose is to help peer selection) +func (peer *ServerNode) IsAssigned() bool { + peer.lock.RLock() + locked := peer.assignedRequest != 0 + peer.lock.RUnlock() + return locked +} + // blocks until request can be sent func (peer *ServerNode) SendRequest(reqID, maxCost uint64) { peer.lock.Lock() defer peer.lock.Unlock() - peer.recalcBLE(getTime()) - for peer.bufEstimate < maxCost { - wait := time.Duration(peer.canSend(maxCost)) + if peer.assignedRequest != reqID { + peer.lock.Unlock() + peer.MustAssignRequest(reqID) + peer.lock.Lock() + } + + peer.recalcBLE(mclock.Now()) + wait := peer.canSend(maxCost) + for wait > 0 { peer.lock.Unlock() time.Sleep(wait) peer.lock.Lock() - peer.recalcBLE(getTime()) + peer.recalcBLE(mclock.Now()) + wait = peer.canSend(maxCost) } + peer.assignedRequest = 0 + <-peer.assignToken peer.bufEstimate -= maxCost peer.sumCost += maxCost if reqID >= 0 { @@ -162,14 +223,18 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) { } func (peer *ServerNode) GotReply(reqID, bv uint64) { + peer.lock.Lock() defer peer.lock.Unlock() + if bv > peer.params.BufLimit { + bv = peer.params.BufLimit + } sc, ok := peer.pending[reqID] if !ok { return } delete(peer.pending, reqID) peer.bufEstimate = bv - (peer.sumCost - sc) - peer.lastTime = getTime() + peer.lastTime = mclock.Now() } diff --git a/les/flowcontrol/manager.go b/les/flowcontrol/manager.go index 786884437..d3cc57aa6 100644 --- a/les/flowcontrol/manager.go +++ b/les/flowcontrol/manager.go @@ -20,22 +20,23 @@ package flowcontrol import ( "sync" "time" + + "github.com/ethereum/go-ethereum/common/mclock" ) const rcConst = 1000000 type cmNode struct { - node *ClientNode - lastUpdate int64 - reqAccepted int64 - serving, recharging bool - rcWeight uint64 - rcValue, rcDelta int64 - finishRecharge, startValue int64 + node *ClientNode + lastUpdate mclock.AbsTime + serving, recharging bool + rcWeight uint64 + rcValue, rcDelta, startValue int64 + finishRecharge mclock.AbsTime } -func (node *cmNode) update(time int64) { - dt := time - node.lastUpdate +func (node *cmNode) update(time mclock.AbsTime) { + dt := int64(time - node.lastUpdate) node.rcValue += node.rcDelta * dt / rcConst node.lastUpdate = time if node.recharging && time >= node.finishRecharge { @@ -62,7 +63,7 @@ func (node *cmNode) set(serving bool, simReqCnt, sumWeight uint64) { } if node.recharging { node.rcDelta = -int64(node.node.cm.rcRecharge * node.rcWeight / sumWeight) - node.finishRecharge = node.lastUpdate + node.rcValue*rcConst/(-node.rcDelta) + node.finishRecharge = node.lastUpdate + mclock.AbsTime(node.rcValue*rcConst/(-node.rcDelta)) } } @@ -73,7 +74,7 @@ type ClientManager struct { maxSimReq, maxRcSum uint64 rcRecharge uint64 resumeQueue chan chan bool - time int64 + time mclock.AbsTime } func NewClientManager(rcTarget, maxSimReq, maxRcSum uint64) *ClientManager { @@ -98,7 +99,7 @@ func (self *ClientManager) Stop() { } func (self *ClientManager) addNode(cnode *ClientNode) *cmNode { - time := getTime() + time := mclock.Now() node := &cmNode{ node: cnode, lastUpdate: time, @@ -109,7 +110,7 @@ func (self *ClientManager) addNode(cnode *ClientNode) *cmNode { defer self.lock.Unlock() self.nodes[node] = struct{}{} - self.update(getTime()) + self.update(mclock.Now()) return node } @@ -117,14 +118,14 @@ func (self *ClientManager) removeNode(node *cmNode) { self.lock.Lock() defer self.lock.Unlock() - time := getTime() + time := mclock.Now() self.stop(node, time) delete(self.nodes, node) self.update(time) } // recalc sumWeight -func (self *ClientManager) updateNodes(time int64) (rce bool) { +func (self *ClientManager) updateNodes(time mclock.AbsTime) (rce bool) { var sumWeight, rcSum uint64 for node, _ := range self.nodes { rc := node.recharging @@ -142,7 +143,7 @@ func (self *ClientManager) updateNodes(time int64) (rce bool) { return } -func (self *ClientManager) update(time int64) { +func (self *ClientManager) update(time mclock.AbsTime) { for { firstTime := time for node, _ := range self.nodes { @@ -172,7 +173,7 @@ func (self *ClientManager) queueProc() { for { time.Sleep(time.Millisecond * 10) self.lock.Lock() - self.update(getTime()) + self.update(mclock.Now()) cs := self.canStartReq() self.lock.Unlock() if cs { @@ -183,7 +184,7 @@ func (self *ClientManager) queueProc() { } } -func (self *ClientManager) accept(node *cmNode, time int64) bool { +func (self *ClientManager) accept(node *cmNode, time mclock.AbsTime) bool { self.lock.Lock() defer self.lock.Unlock() @@ -205,7 +206,7 @@ func (self *ClientManager) accept(node *cmNode, time int64) bool { return true } -func (self *ClientManager) stop(node *cmNode, time int64) { +func (self *ClientManager) stop(node *cmNode, time mclock.AbsTime) { if node.serving { self.update(time) self.simReqCnt-- @@ -214,7 +215,7 @@ func (self *ClientManager) stop(node *cmNode, time int64) { } } -func (self *ClientManager) processed(node *cmNode, time int64) (rcValue, rcCost uint64) { +func (self *ClientManager) processed(node *cmNode, time mclock.AbsTime) (rcValue, rcCost uint64) { self.lock.Lock() defer self.lock.Unlock() |