diff options
Diffstat (limited to 'les/flowcontrol')
-rw-r--r-- | les/flowcontrol/control.go | 172 | ||||
-rw-r--r-- | les/flowcontrol/manager.go | 223 |
2 files changed, 395 insertions, 0 deletions
diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go new file mode 100644 index 000000000..1b569db0b --- /dev/null +++ b/les/flowcontrol/control.go @@ -0,0 +1,172 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package flowcontrol implements a client side flow control mechanism +package flowcontrol + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +const fcTimeConst = 1000000 + +type ServerParams struct { + BufLimit, MinRecharge uint64 +} + +type ClientNode struct { + params *ServerParams + bufValue uint64 + lastTime int64 + lock sync.Mutex + cm *ClientManager + cmNode *cmNode +} + +func NewClientNode(cm *ClientManager, params *ServerParams) *ClientNode { + node := &ClientNode{ + cm: cm, + params: params, + bufValue: params.BufLimit, + lastTime: getTime(), + } + node.cmNode = cm.addNode(node) + return node +} + +func (peer *ClientNode) Remove(cm *ClientManager) { + cm.removeNode(peer.cmNode) +} + +func (peer *ClientNode) recalcBV(time int64) { + dt := uint64(time - peer.lastTime) + if time < peer.lastTime { + dt = 0 + } + peer.bufValue += peer.params.MinRecharge * dt / fcTimeConst + if peer.bufValue > peer.params.BufLimit { + peer.bufValue = peer.params.BufLimit + } + peer.lastTime = time +} + +func (peer *ClientNode) AcceptRequest() (uint64, bool) { + peer.lock.Lock() + defer peer.lock.Unlock() + + time := getTime() + peer.recalcBV(time) + return peer.bufValue, peer.cm.accept(peer.cmNode, time) +} + +func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) { + peer.lock.Lock() + defer peer.lock.Unlock() + + time := getTime() + peer.recalcBV(time) + peer.bufValue -= cost + peer.recalcBV(time) + rcValue, rcost := peer.cm.processed(peer.cmNode, time) + if rcValue < peer.params.BufLimit { + bv := peer.params.BufLimit - rcValue + if bv > peer.bufValue { + peer.bufValue = bv + } + } + return peer.bufValue, rcost +} + +type ServerNode struct { + bufEstimate uint64 + lastTime int64 + params *ServerParams + sumCost uint64 // sum of req costs sent to this server + pending map[uint64]uint64 // value = sumCost after sending the given req + lock sync.Mutex +} + +func NewServerNode(params *ServerParams) *ServerNode { + return &ServerNode{ + bufEstimate: params.BufLimit, + lastTime: getTime(), + params: params, + pending: make(map[uint64]uint64), + } +} + +func getTime() int64 { + return int64(mclock.Now()) +} + +func (peer *ServerNode) recalcBLE(time int64) { + dt := uint64(time - peer.lastTime) + if time < peer.lastTime { + dt = 0 + } + peer.bufEstimate += peer.params.MinRecharge * dt / fcTimeConst + if peer.bufEstimate > peer.params.BufLimit { + peer.bufEstimate = peer.params.BufLimit + } + peer.lastTime = time +} + +func (peer *ServerNode) canSend(maxCost uint64) uint64 { + if peer.bufEstimate >= maxCost { + return 0 + } + return (maxCost - peer.bufEstimate) * fcTimeConst / peer.params.MinRecharge +} + +func (peer *ServerNode) CanSend(maxCost uint64) uint64 { + peer.lock.Lock() + defer peer.lock.Unlock() + + return peer.canSend(maxCost) +} + +// blocks until request can be sent +func (peer *ServerNode) SendRequest(reqID, maxCost uint64) { + peer.lock.Lock() + defer peer.lock.Unlock() + + peer.recalcBLE(getTime()) + for peer.bufEstimate < maxCost { + time.Sleep(time.Duration(peer.canSend(maxCost))) + peer.recalcBLE(getTime()) + } + peer.bufEstimate -= maxCost + peer.sumCost += maxCost + if reqID >= 0 { + peer.pending[reqID] = peer.sumCost + } +} + +func (peer *ServerNode) GotReply(reqID, bv uint64) { + peer.lock.Lock() + defer peer.lock.Unlock() + + sc, ok := peer.pending[reqID] + if !ok { + return + } + delete(peer.pending, reqID) + peer.bufEstimate = bv - (peer.sumCost - sc) + peer.lastTime = getTime() +} diff --git a/les/flowcontrol/manager.go b/les/flowcontrol/manager.go new file mode 100644 index 000000000..c0469e7b6 --- /dev/null +++ b/les/flowcontrol/manager.go @@ -0,0 +1,223 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package flowcontrol implements a client side flow control mechanism +package flowcontrol + +import ( + "sync" + "time" +) + +const rcConst = 1000000 + +type cmNode struct { + node *ClientNode + lastUpdate int64 + reqAccepted int64 + serving, recharging bool + rcWeight uint64 + rcValue, rcDelta int64 + finishRecharge, startValue int64 +} + +func (node *cmNode) update(time int64) { + dt := time - node.lastUpdate + node.rcValue += node.rcDelta * dt / rcConst + node.lastUpdate = time + if node.recharging && time >= node.finishRecharge { + node.recharging = false + node.rcDelta = 0 + node.rcValue = 0 + } +} + +func (node *cmNode) set(serving bool, simReqCnt, sumWeight uint64) { + if node.serving && !serving { + node.recharging = true + sumWeight += node.rcWeight + } + node.serving = serving + if node.recharging && serving { + node.recharging = false + sumWeight -= node.rcWeight + } + + node.rcDelta = 0 + if serving { + node.rcDelta = int64(rcConst / simReqCnt) + } + if node.recharging { + node.rcDelta = -int64(node.node.cm.rcRecharge * node.rcWeight / sumWeight) + node.finishRecharge = node.lastUpdate + node.rcValue*rcConst/(-node.rcDelta) + } +} + +type ClientManager struct { + lock sync.Mutex + nodes map[*cmNode]struct{} + simReqCnt, sumWeight, rcSumValue uint64 + maxSimReq, maxRcSum uint64 + rcRecharge uint64 + resumeQueue chan chan bool + time int64 +} + +func NewClientManager(rcTarget, maxSimReq, maxRcSum uint64) *ClientManager { + cm := &ClientManager{ + nodes: make(map[*cmNode]struct{}), + resumeQueue: make(chan chan bool), + rcRecharge: rcConst * rcConst / (100*rcConst/rcTarget - rcConst), + maxSimReq: maxSimReq, + maxRcSum: maxRcSum, + } + go cm.queueProc() + return cm +} + +func (self *ClientManager) Stop() { + self.lock.Lock() + defer self.lock.Unlock() + + // signal any waiting accept routines to return false + self.nodes = make(map[*cmNode]struct{}) + close(self.resumeQueue) +} + +func (self *ClientManager) addNode(cnode *ClientNode) *cmNode { + time := getTime() + node := &cmNode{ + node: cnode, + lastUpdate: time, + finishRecharge: time, + rcWeight: 1, + } + self.lock.Lock() + defer self.lock.Unlock() + + self.nodes[node] = struct{}{} + self.update(getTime()) + return node +} + +func (self *ClientManager) removeNode(node *cmNode) { + self.lock.Lock() + defer self.lock.Unlock() + + time := getTime() + self.stop(node, time) + delete(self.nodes, node) + self.update(time) +} + +// recalc sumWeight +func (self *ClientManager) updateNodes(time int64) (rce bool) { + var sumWeight, rcSum uint64 + for node, _ := range self.nodes { + rc := node.recharging + node.update(time) + if rc && !node.recharging { + rce = true + } + if node.recharging { + sumWeight += node.rcWeight + } + rcSum += uint64(node.rcValue) + } + self.sumWeight = sumWeight + self.rcSumValue = rcSum + return +} + +func (self *ClientManager) update(time int64) { + for { + firstTime := time + for node, _ := range self.nodes { + if node.recharging && node.finishRecharge < firstTime { + firstTime = node.finishRecharge + } + } + if self.updateNodes(firstTime) { + for node, _ := range self.nodes { + if node.recharging { + node.set(node.serving, self.simReqCnt, self.sumWeight) + } + } + } else { + self.time = time + return + } + } +} + +func (self *ClientManager) canStartReq() bool { + return self.simReqCnt < self.maxSimReq && self.rcSumValue < self.maxRcSum +} + +func (self *ClientManager) queueProc() { + for rc := range self.resumeQueue { + for { + time.Sleep(time.Millisecond * 10) + self.lock.Lock() + self.update(getTime()) + cs := self.canStartReq() + self.lock.Unlock() + if cs { + break + } + } + close(rc) + } +} + +func (self *ClientManager) accept(node *cmNode, time int64) bool { + self.lock.Lock() + defer self.lock.Unlock() + + self.update(time) + if !self.canStartReq() { + resume := make(chan bool) + self.lock.Unlock() + self.resumeQueue <- resume + <-resume + self.lock.Lock() + if _, ok := self.nodes[node]; !ok { + return false // reject if node has been removed or manager has been stopped + } + } + self.simReqCnt++ + node.set(true, self.simReqCnt, self.sumWeight) + node.startValue = node.rcValue + self.update(self.time) + return true +} + +func (self *ClientManager) stop(node *cmNode, time int64) { + if node.serving { + self.update(time) + self.simReqCnt-- + node.set(false, self.simReqCnt, self.sumWeight) + self.update(time) + } +} + +func (self *ClientManager) processed(node *cmNode, time int64) (rcValue, rcCost uint64) { + self.lock.Lock() + defer self.lock.Unlock() + + self.stop(node, time) + return uint64(node.rcValue), uint64(node.rcValue - node.startValue) +} |