path: root/les/servingqueue.go
author    Felföldi Zsolt <zsfelfoldi@gmail.com>    2019-05-31 02:51:13 +0800
committer GitHub <noreply@github.com>    2019-05-31 02:51:13 +0800
commit    58497f46bd0bdd105828c30500e863e826e598cd (patch)
tree      7c658530edfebb6e47ed5f993753f4b48fd1747e /les/servingqueue.go
parent    3d58268bba92c6c8f7f035bafcd1608bc22cee51 (diff)
les, les/flowcontrol: implement LES/3 (#19329)
Diffstat (limited to 'les/servingqueue.go')
-rw-r--r--    les/servingqueue.go    219
1 file changed, 176 insertions(+), 43 deletions(-)
diff --git a/les/servingqueue.go b/les/servingqueue.go
index 2438fdfe3..26656ec01 100644
--- a/les/servingqueue.go
+++ b/les/servingqueue.go
@@ -17,16 +17,24 @@
package les
import (
+ "fmt"
+ "sort"
"sync"
+ "sync/atomic"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
+ "github.com/ethereum/go-ethereum/les/csvlogger"
)
// servingQueue allows running tasks in a limited number of threads and puts the
// waiting tasks in a priority queue
type servingQueue struct {
- tokenCh chan runToken
+ recentTime, queuedTime, servingTimeDiff uint64
+ burstLimit, burstDropLimit uint64
+ burstDecRate float64
+ lastUpdate mclock.AbsTime
+
queueAddCh, queueBestCh chan *servingTask
stopThreadCh, quit chan struct{}
setThreadsCh chan int
@@ -36,6 +44,10 @@ type servingQueue struct {
queue *prque.Prque // priority queue for waiting or suspended tasks
best *servingTask // the highest priority task (not included in the queue)
suspendBias int64 // priority bias against suspending an already running task
+
+ logger *csvlogger.Logger
+ logRecentTime *csvlogger.Channel
+ logQueuedTime *csvlogger.Channel
}
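
The new uint64 fields implement burst accounting: recentTime tracks recently spent serving time (decaying at burstDecRate), queuedTime sums the expected cost of queued tasks, and servingTimeDiff collects corrections from finished tasks. The last one is the only field touched from task goroutines, hence the atomic access seen later in the diff. A minimal standalone sketch of that handoff, with made-up values (not part of this commit):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var servingTimeDiff uint64
	// task goroutines credit back unused time estimates (ns)
	atomic.AddUint64(&servingTimeDiff, 300_000)
	atomic.AddUint64(&servingTimeDiff, 200_000)
	// the queue loop drains the counter with a single atomic swap
	credited := atomic.SwapUint64(&servingTimeDiff, 0)
	fmt.Println(credited) // 500000
}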
// servingTask represents a request serving task. Tasks can be implemented to
@@ -47,12 +59,13 @@ type servingQueue struct {
// - run: execute a single step; return true if finished
// - after: executed after run finishes or returns an error, receives the total serving time
type servingTask struct {
- sq *servingQueue
- servingTime uint64
- priority int64
- biasAdded bool
- token runToken
- tokenCh chan runToken
+ sq *servingQueue
+ servingTime, timeAdded, maxTime, expTime uint64
+ peer *peer
+ priority int64
+ biasAdded bool
+ token runToken
+ tokenCh chan runToken
}
// runToken received by servingTask.start allows the task to run. Closing the
@@ -63,20 +76,19 @@ type runToken chan struct{}
// start blocks until the task can start and returns true if it is allowed to run.
// Returning false means that the task should be cancelled.
func (t *servingTask) start() bool {
+ if t.peer.isFrozen() {
+ return false
+ }
+ t.tokenCh = make(chan runToken, 1)
select {
- case t.token = <-t.sq.tokenCh:
- default:
- t.tokenCh = make(chan runToken, 1)
- select {
- case t.sq.queueAddCh <- t:
- case <-t.sq.quit:
- return false
- }
- select {
- case t.token = <-t.tokenCh:
- case <-t.sq.quit:
- return false
- }
+ case t.sq.queueAddCh <- t:
+ case <-t.sq.quit:
+ return false
+ }
+ select {
+ case t.token = <-t.tokenCh:
+ case <-t.sq.quit:
+ return false
}
if t.token == nil {
return false
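
With LES/3 the fast path through tokenCh is gone: every task now registers itself on queueAddCh and then blocks on its private tokenCh until the queue loop pairs it with a thread token. A self-contained sketch of that two-step handshake, with the logic simplified and the channel names reused for clarity (this is an illustration, not the commit's code):

package main

import "fmt"

type runToken chan struct{}

func main() {
	queueAdd := make(chan chan runToken, 1)
	// "queue loop": receives a task's private channel and hands it a token
	go func() {
		taskCh := <-queueAdd
		taskCh <- make(runToken)
	}()
	// "task": enqueue first, then wait for permission to run
	tokenCh := make(chan runToken, 1)
	queueAdd <- tokenCh
	token := <-tokenCh
	fmt.Println("task may run; closing the token signals completion")
	close(token)
}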
@@ -90,6 +102,14 @@ func (t *servingTask) start() bool {
func (t *servingTask) done() uint64 {
t.servingTime += uint64(mclock.Now())
close(t.token)
+ diff := t.servingTime - t.timeAdded
+ t.timeAdded = t.servingTime
+ if t.expTime > diff {
+ t.expTime -= diff
+ atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime)
+ } else {
+ t.expTime = 0
+ }
return t.servingTime
}
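
done now reconciles the estimate against reality: diff is the serving time consumed since the task was last charged, and whatever remains of expTime is credited back to servingTimeDiff so updateRecentTime can deduct the over-estimate later. A worked example with hypothetical numbers:

package main

import "fmt"

func main() {
	expTime := uint64(500_000) // estimate charged when the task was queued (ns)
	diff := uint64(200_000)    // serving time actually consumed in this run
	var credit uint64
	if expTime > diff {
		credit = expTime - diff // returned to servingTimeDiff
	}
	fmt.Println(credit) // 300000
}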
@@ -107,16 +127,22 @@ func (t *servingTask) waitOrStop() bool {
}
// newServingQueue returns a new servingQueue
-func newServingQueue(suspendBias int64) *servingQueue {
+func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Logger) *servingQueue {
sq := &servingQueue{
- queue: prque.New(nil),
- suspendBias: suspendBias,
- tokenCh: make(chan runToken),
- queueAddCh: make(chan *servingTask, 100),
- queueBestCh: make(chan *servingTask),
- stopThreadCh: make(chan struct{}),
- quit: make(chan struct{}),
- setThreadsCh: make(chan int, 10),
+ queue: prque.New(nil),
+ suspendBias: suspendBias,
+ queueAddCh: make(chan *servingTask, 100),
+ queueBestCh: make(chan *servingTask),
+ stopThreadCh: make(chan struct{}),
+ quit: make(chan struct{}),
+ setThreadsCh: make(chan int, 10),
+ burstLimit: uint64(utilTarget * bufLimitRatio * 1200000),
+ burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000),
+ burstDecRate: utilTarget,
+ lastUpdate: mclock.Now(),
+ logger: logger,
+ logRecentTime: logger.NewMinMaxChannel("recentTime", false),
+ logQueuedTime: logger.NewMinMaxChannel("queuedTime", false),
}
sq.wg.Add(2)
go sq.queueLoop()
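
burstLimit and burstDropLimit are derived from the utilization target; serving times are measured in nanoseconds, so the 1200000 and 1000000 factors appear to correspond to 1.2 ms and 1.0 ms per unit of utilTarget*bufLimitRatio. A sketch of the arithmetic; the bufLimitRatio value below is an assumption for illustration, the real constant is defined elsewhere in package les:

package main

import "fmt"

func main() {
	const bufLimitRatio = 6000 // assumed value of the les package constant
	utilTarget := 3.0          // e.g. three parallel serving threads
	burstLimit := uint64(utilTarget * bufLimitRatio * 1200000)
	burstDropLimit := uint64(utilTarget * bufLimitRatio * 1000000)
	fmt.Println(burstLimit, burstDropLimit) // 21600000000 18000000000
}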
@@ -125,9 +151,12 @@ func newServingQueue(suspendBias int64) *servingQueue {
}
// newTask creates a new task with the given priority
-func (sq *servingQueue) newTask(priority int64) *servingTask {
+func (sq *servingQueue) newTask(peer *peer, maxTime uint64, priority int64) *servingTask {
return &servingTask{
sq: sq,
+ peer: peer,
+ maxTime: maxTime,
+ expTime: maxTime,
priority: priority,
}
}
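
To see how these pieces fit together, here is a hypothetical caller sketch showing the intended task lifecycle inside package les; handleRequest and its parameters are invented for illustration and do not appear in this commit:

// hypothetical usage sketch, relying on the types from this file
func handleRequest(sq *servingQueue, p *peer, maxTime uint64, priority int64) {
	task := sq.newTask(p, maxTime, priority)
	if !task.start() {
		return // queue is shutting down or the peer is frozen
	}
	// ... execute the request, optionally calling task.waitOrStop()
	// between steps to yield if a higher priority task is waiting ...
	task.done() // record serving time and release the thread token
}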
@@ -144,18 +173,12 @@ func (sq *servingQueue) threadController() {
select {
case best := <-sq.queueBestCh:
best.tokenCh <- token
- default:
- select {
- case best := <-sq.queueBestCh:
- best.tokenCh <- token
- case sq.tokenCh <- token:
- case <-sq.stopThreadCh:
- sq.wg.Done()
- return
- case <-sq.quit:
- sq.wg.Done()
- return
- }
+ case <-sq.stopThreadCh:
+ sq.wg.Done()
+ return
+ case <-sq.quit:
+ sq.wg.Done()
+ return
}
<-token
select {
@@ -170,6 +193,100 @@ func (sq *servingQueue) threadController() {
}
}
+type (
+ // peerTasks lists the tasks received from a given peer when selecting peers to freeze
+ peerTasks struct {
+ peer *peer
+ list []*servingTask
+ sumTime uint64
+ priority float64
+ }
+ // peerList is a sortable list of peerTasks
+ peerList []*peerTasks
+)
+
+func (l peerList) Len() int {
+ return len(l)
+}
+
+func (l peerList) Less(i, j int) bool {
+ return l[i].priority < l[j].priority
+}
+
+func (l peerList) Swap(i, j int) {
+ l[i], l[j] = l[j], l[i]
+}
+
+// freezePeers selects the peers with the worst priority queued tasks and freezes
+// them until recentTime+queuedTime goes under burstDropLimit or all peers are frozen
+func (sq *servingQueue) freezePeers() {
+ peerMap := make(map[*peer]*peerTasks)
+ var peerList peerList
+ if sq.best != nil {
+ sq.queue.Push(sq.best, sq.best.priority)
+ }
+ sq.best = nil
+ for sq.queue.Size() > 0 {
+ task := sq.queue.PopItem().(*servingTask)
+ tasks := peerMap[task.peer]
+ if tasks == nil {
+ bufValue, bufLimit := task.peer.fcClient.BufferStatus()
+ if bufLimit < 1 {
+ bufLimit = 1
+ }
+ tasks = &peerTasks{
+ peer: task.peer,
+ priority: float64(bufValue) / float64(bufLimit), // lower value comes first
+ }
+ peerMap[task.peer] = tasks
+ peerList = append(peerList, tasks)
+ }
+ tasks.list = append(tasks.list, task)
+ tasks.sumTime += task.expTime
+ }
+ sort.Sort(peerList)
+ drop := true
+ sq.logger.Event("freezing peers")
+ for _, tasks := range peerList {
+ if drop {
+ tasks.peer.freezeClient()
+ tasks.peer.fcClient.Freeze()
+ sq.queuedTime -= tasks.sumTime
+ if sq.logQueuedTime != nil {
+ sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
+ }
+ sq.logger.Event(fmt.Sprintf("frozen peer sumTime=%d, %v", tasks.sumTime, tasks.peer.id))
+ drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit
+ for _, task := range tasks.list {
+ task.tokenCh <- nil
+ }
+ } else {
+ for _, task := range tasks.list {
+ sq.queue.Push(task, task.priority)
+ }
+ }
+ }
+ if sq.queue.Size() > 0 {
+ sq.best = sq.queue.PopItem().(*servingTask)
+ }
+}
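
The peerList above implements sort.Interface so that peers with the lowest buffer ratio (bufValue/bufLimit) are frozen first. A self-contained sketch of that ordering with stand-in types; the ids and values are made up:

package main

import (
	"fmt"
	"sort"
)

// minimal stand-ins for the peerTasks/peerList types in the diff
type peerTasks struct {
	id       string
	priority float64 // bufValue / bufLimit; lower is frozen first
}
type peerList []*peerTasks

func (l peerList) Len() int           { return len(l) }
func (l peerList) Less(i, j int) bool { return l[i].priority < l[j].priority }
func (l peerList) Swap(i, j int)      { l[i], l[j] = l[j], l[i] }

func main() {
	l := peerList{{"a", 0.9}, {"b", 0.1}, {"c", 0.5}}
	sort.Sort(l)
	for _, p := range l {
		fmt.Println(p.id, p.priority) // b 0.1, then c 0.5, then a 0.9
	}
}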
+
+// updateRecentTime recalculates the recent serving time value
+func (sq *servingQueue) updateRecentTime() {
+ subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0)
+ now := mclock.Now()
+ dt := now - sq.lastUpdate
+ sq.lastUpdate = now
+ if dt > 0 {
+ subTime += uint64(float64(dt) * sq.burstDecRate)
+ }
+ if sq.recentTime > subTime {
+ sq.recentTime -= subTime
+ } else {
+ sq.recentTime = 0
+ }
+}
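
A worked numeric sketch (all values hypothetical) of the decay performed by updateRecentTime: recentTime shrinks by burstDecRate per nanosecond of wall time, plus any credits drained from servingTimeDiff:

package main

import "fmt"

func main() {
	recentTime := uint64(2_000_000)
	subTime := uint64(300_000)           // atomically swapped-out servingTimeDiff
	dt := float64(400_000)               // ns elapsed since lastUpdate
	burstDecRate := 3.0                  // equals utilTarget
	subTime += uint64(dt * burstDecRate) // 300000 + 1200000
	if recentTime > subTime {
		recentTime -= subTime
	} else {
		recentTime = 0
	}
	fmt.Println(recentTime) // 500000
}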
+
// addTask inserts a task into the priority queue
func (sq *servingQueue) addTask(task *servingTask) {
if sq.best == nil {
@@ -177,10 +294,18 @@ func (sq *servingQueue) addTask(task *servingTask) {
} else if task.priority > sq.best.priority {
sq.queue.Push(sq.best, sq.best.priority)
sq.best = task
- return
} else {
sq.queue.Push(task, task.priority)
}
+ sq.updateRecentTime()
+ sq.queuedTime += task.expTime
+ if sq.logQueuedTime != nil {
+ sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
+ sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
+ }
+ if sq.recentTime+sq.queuedTime > sq.burstLimit {
+ sq.freezePeers()
+ }
}
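
addTask is where the burst check fires: each queued task's expTime is added to queuedTime, and once recentTime+queuedTime exceeds burstLimit, freezePeers runs. A standalone trace with made-up numbers:

package main

import "fmt"

func main() {
	recentTime, queuedTime := uint64(2_000_000), uint64(0)
	burstLimit := uint64(3_000_000)
	for i, expTime := range []uint64{400_000, 400_000, 400_000} {
		queuedTime += expTime
		if recentTime+queuedTime > burstLimit {
			fmt.Println("freezePeers triggered after task", i+1) // after task 3
		}
	}
}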
// queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh
@@ -189,10 +314,18 @@ func (sq *servingQueue) addTask(task *servingTask) {
func (sq *servingQueue) queueLoop() {
for {
if sq.best != nil {
+ expTime := sq.best.expTime
select {
case task := <-sq.queueAddCh:
sq.addTask(task)
case sq.queueBestCh <- sq.best:
+ sq.updateRecentTime()
+ sq.queuedTime -= expTime
+ sq.recentTime += expTime
+ if sq.logQueuedTime != nil {
+ sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
+ sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
+ }
if sq.queue.Size() == 0 {
sq.best = nil
} else {