diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2019-05-31 02:51:13 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-31 02:51:13 +0800 |
commit | 58497f46bd0bdd105828c30500e863e826e598cd (patch) | |
tree | 7c658530edfebb6e47ed5f993753f4b48fd1747e /les/servingqueue.go | |
parent | 3d58268bba92c6c8f7f035bafcd1608bc22cee51 (diff) | |
download | go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.gz go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.bz2 go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.lz go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.xz go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.tar.zst go-tangerine-58497f46bd0bdd105828c30500e863e826e598cd.zip |
les, les/flowcontrol: implement LES/3 (#19329)
les, les/flowcontrol: implement LES/3
Diffstat (limited to 'les/servingqueue.go')
-rw-r--r-- | les/servingqueue.go | 219 |
1 files changed, 176 insertions, 43 deletions
diff --git a/les/servingqueue.go b/les/servingqueue.go index 2438fdfe3..26656ec01 100644 --- a/les/servingqueue.go +++ b/les/servingqueue.go @@ -17,16 +17,24 @@ package les import ( + "fmt" + "sort" "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/les/csvlogger" ) // servingQueue allows running tasks in a limited number of threads and puts the // waiting tasks in a priority queue type servingQueue struct { - tokenCh chan runToken + recentTime, queuedTime, servingTimeDiff uint64 + burstLimit, burstDropLimit uint64 + burstDecRate float64 + lastUpdate mclock.AbsTime + queueAddCh, queueBestCh chan *servingTask stopThreadCh, quit chan struct{} setThreadsCh chan int @@ -36,6 +44,10 @@ type servingQueue struct { queue *prque.Prque // priority queue for waiting or suspended tasks best *servingTask // the highest priority task (not included in the queue) suspendBias int64 // priority bias against suspending an already running task + + logger *csvlogger.Logger + logRecentTime *csvlogger.Channel + logQueuedTime *csvlogger.Channel } // servingTask represents a request serving task. Tasks can be implemented to @@ -47,12 +59,13 @@ type servingQueue struct { // - run: execute a single step; return true if finished // - after: executed after run finishes or returns an error, receives the total serving time type servingTask struct { - sq *servingQueue - servingTime uint64 - priority int64 - biasAdded bool - token runToken - tokenCh chan runToken + sq *servingQueue + servingTime, timeAdded, maxTime, expTime uint64 + peer *peer + priority int64 + biasAdded bool + token runToken + tokenCh chan runToken } // runToken received by servingTask.start allows the task to run. Closing the @@ -63,20 +76,19 @@ type runToken chan struct{} // start blocks until the task can start and returns true if it is allowed to run. // Returning false means that the task should be cancelled. func (t *servingTask) start() bool { + if t.peer.isFrozen() { + return false + } + t.tokenCh = make(chan runToken, 1) select { - case t.token = <-t.sq.tokenCh: - default: - t.tokenCh = make(chan runToken, 1) - select { - case t.sq.queueAddCh <- t: - case <-t.sq.quit: - return false - } - select { - case t.token = <-t.tokenCh: - case <-t.sq.quit: - return false - } + case t.sq.queueAddCh <- t: + case <-t.sq.quit: + return false + } + select { + case t.token = <-t.tokenCh: + case <-t.sq.quit: + return false } if t.token == nil { return false @@ -90,6 +102,14 @@ func (t *servingTask) start() bool { func (t *servingTask) done() uint64 { t.servingTime += uint64(mclock.Now()) close(t.token) + diff := t.servingTime - t.timeAdded + t.timeAdded = t.servingTime + if t.expTime > diff { + t.expTime -= diff + atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime) + } else { + t.expTime = 0 + } return t.servingTime } @@ -107,16 +127,22 @@ func (t *servingTask) waitOrStop() bool { } // newServingQueue returns a new servingQueue -func newServingQueue(suspendBias int64) *servingQueue { +func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Logger) *servingQueue { sq := &servingQueue{ - queue: prque.New(nil), - suspendBias: suspendBias, - tokenCh: make(chan runToken), - queueAddCh: make(chan *servingTask, 100), - queueBestCh: make(chan *servingTask), - stopThreadCh: make(chan struct{}), - quit: make(chan struct{}), - setThreadsCh: make(chan int, 10), + queue: prque.New(nil), + suspendBias: suspendBias, + queueAddCh: make(chan *servingTask, 100), + queueBestCh: make(chan *servingTask), + stopThreadCh: make(chan struct{}), + quit: make(chan struct{}), + setThreadsCh: make(chan int, 10), + burstLimit: uint64(utilTarget * bufLimitRatio * 1200000), + burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000), + burstDecRate: utilTarget, + lastUpdate: mclock.Now(), + logger: logger, + logRecentTime: logger.NewMinMaxChannel("recentTime", false), + logQueuedTime: logger.NewMinMaxChannel("queuedTime", false), } sq.wg.Add(2) go sq.queueLoop() @@ -125,9 +151,12 @@ func newServingQueue(suspendBias int64) *servingQueue { } // newTask creates a new task with the given priority -func (sq *servingQueue) newTask(priority int64) *servingTask { +func (sq *servingQueue) newTask(peer *peer, maxTime uint64, priority int64) *servingTask { return &servingTask{ sq: sq, + peer: peer, + maxTime: maxTime, + expTime: maxTime, priority: priority, } } @@ -144,18 +173,12 @@ func (sq *servingQueue) threadController() { select { case best := <-sq.queueBestCh: best.tokenCh <- token - default: - select { - case best := <-sq.queueBestCh: - best.tokenCh <- token - case sq.tokenCh <- token: - case <-sq.stopThreadCh: - sq.wg.Done() - return - case <-sq.quit: - sq.wg.Done() - return - } + case <-sq.stopThreadCh: + sq.wg.Done() + return + case <-sq.quit: + sq.wg.Done() + return } <-token select { @@ -170,6 +193,100 @@ func (sq *servingQueue) threadController() { } } +type ( + // peerTasks lists the tasks received from a given peer when selecting peers to freeze + peerTasks struct { + peer *peer + list []*servingTask + sumTime uint64 + priority float64 + } + // peerList is a sortable list of peerTasks + peerList []*peerTasks +) + +func (l peerList) Len() int { + return len(l) +} + +func (l peerList) Less(i, j int) bool { + return l[i].priority < l[j].priority +} + +func (l peerList) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} + +// freezePeers selects the peers with the worst priority queued tasks and freezes +// them until burstTime goes under burstDropLimit or all peers are frozen +func (sq *servingQueue) freezePeers() { + peerMap := make(map[*peer]*peerTasks) + var peerList peerList + if sq.best != nil { + sq.queue.Push(sq.best, sq.best.priority) + } + sq.best = nil + for sq.queue.Size() > 0 { + task := sq.queue.PopItem().(*servingTask) + tasks := peerMap[task.peer] + if tasks == nil { + bufValue, bufLimit := task.peer.fcClient.BufferStatus() + if bufLimit < 1 { + bufLimit = 1 + } + tasks = &peerTasks{ + peer: task.peer, + priority: float64(bufValue) / float64(bufLimit), // lower value comes first + } + peerMap[task.peer] = tasks + peerList = append(peerList, tasks) + } + tasks.list = append(tasks.list, task) + tasks.sumTime += task.expTime + } + sort.Sort(peerList) + drop := true + sq.logger.Event("freezing peers") + for _, tasks := range peerList { + if drop { + tasks.peer.freezeClient() + tasks.peer.fcClient.Freeze() + sq.queuedTime -= tasks.sumTime + if sq.logQueuedTime != nil { + sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000) + } + sq.logger.Event(fmt.Sprintf("frozen peer sumTime=%d, %v", tasks.sumTime, tasks.peer.id)) + drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit + for _, task := range tasks.list { + task.tokenCh <- nil + } + } else { + for _, task := range tasks.list { + sq.queue.Push(task, task.priority) + } + } + } + if sq.queue.Size() > 0 { + sq.best = sq.queue.PopItem().(*servingTask) + } +} + +// updateRecentTime recalculates the recent serving time value +func (sq *servingQueue) updateRecentTime() { + subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0) + now := mclock.Now() + dt := now - sq.lastUpdate + sq.lastUpdate = now + if dt > 0 { + subTime += uint64(float64(dt) * sq.burstDecRate) + } + if sq.recentTime > subTime { + sq.recentTime -= subTime + } else { + sq.recentTime = 0 + } +} + // addTask inserts a task into the priority queue func (sq *servingQueue) addTask(task *servingTask) { if sq.best == nil { @@ -177,10 +294,18 @@ func (sq *servingQueue) addTask(task *servingTask) { } else if task.priority > sq.best.priority { sq.queue.Push(sq.best, sq.best.priority) sq.best = task - return } else { sq.queue.Push(task, task.priority) } + sq.updateRecentTime() + sq.queuedTime += task.expTime + if sq.logQueuedTime != nil { + sq.logRecentTime.Update(float64(sq.recentTime) / 1000) + sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000) + } + if sq.recentTime+sq.queuedTime > sq.burstLimit { + sq.freezePeers() + } } // queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh @@ -189,10 +314,18 @@ func (sq *servingQueue) addTask(task *servingTask) { func (sq *servingQueue) queueLoop() { for { if sq.best != nil { + expTime := sq.best.expTime select { case task := <-sq.queueAddCh: sq.addTask(task) case sq.queueBestCh <- sq.best: + sq.updateRecentTime() + sq.queuedTime -= expTime + sq.recentTime += expTime + if sq.logQueuedTime != nil { + sq.logRecentTime.Update(float64(sq.recentTime) / 1000) + sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000) + } if sq.queue.Size() == 0 { sq.best = nil } else { |