aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2016-05-02 23:01:13 +0800
committerFelix Lange <fjl@twurst.com>2016-05-03 00:50:15 +0800
commit32bb280179a44b9ad1058766bf61cdbacea30a59 (patch)
tree7bf50a724703d5baf9423d891c229ea2936b5adf
parent1c20313a6a1a35d5f540f878e7c263327c2ccfc1 (diff)
downloadgo-tangerine-32bb280179a44b9ad1058766bf61cdbacea30a59.tar
go-tangerine-32bb280179a44b9ad1058766bf61cdbacea30a59.tar.gz
go-tangerine-32bb280179a44b9ad1058766bf61cdbacea30a59.tar.bz2
go-tangerine-32bb280179a44b9ad1058766bf61cdbacea30a59.tar.lz
go-tangerine-32bb280179a44b9ad1058766bf61cdbacea30a59.tar.xz
go-tangerine-32bb280179a44b9ad1058766bf61cdbacea30a59.tar.zst
go-tangerine-32bb280179a44b9ad1058766bf61cdbacea30a59.zip
p2p: improve readability of dial task scheduling code
-rw-r--r--p2p/server.go57
-rw-r--r--p2p/server_test.go50
2 files changed, 78 insertions, 29 deletions
diff --git a/p2p/server.go b/p2p/server.go
index 52d1be677..3b2f2b078 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -398,12 +398,11 @@ type dialer interface {
func (srv *Server) run(dialstate dialer) {
defer srv.loopWG.Done()
var (
- peers = make(map[discover.NodeID]*Peer)
- trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
-
- tasks []task
- pendingTasks []task
+ peers = make(map[discover.NodeID]*Peer)
+ trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
taskdone = make(chan task, maxActiveDialTasks)
+ runningTasks []task
+ queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup and cannot be
@@ -412,39 +411,39 @@ func (srv *Server) run(dialstate dialer) {
trusted[n.ID] = true
}
- // Some task list helpers.
+ // removes t from runningTasks
delTask := func(t task) {
- for i := range tasks {
- if tasks[i] == t {
- tasks = append(tasks[:i], tasks[i+1:]...)
+ for i := range runningTasks {
+ if runningTasks[i] == t {
+ runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
break
}
}
}
- scheduleTasks := func(new []task) {
- pt := append(pendingTasks, new...)
- start := maxActiveDialTasks - len(tasks)
- if len(pt) < start {
- start = len(pt)
+ // starts until max number of active tasks is satisfied
+ startTasks := func(ts []task) (rest []task) {
+ i := 0
+ for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
+ t := ts[i]
+ glog.V(logger.Detail).Infoln("new task:", t)
+ go func() { t.Do(srv); taskdone <- t }()
+ runningTasks = append(runningTasks, t)
}
- if start > 0 {
- tasks = append(tasks, pt[:start]...)
- for _, t := range pt[:start] {
- t := t
- glog.V(logger.Detail).Infoln("new task:", t)
- go func() { t.Do(srv); taskdone <- t }()
- }
- copy(pt, pt[start:])
- pendingTasks = pt[:len(pt)-start]
+ return ts[i:]
+ }
+ scheduleTasks := func() {
+ // Start from queue first.
+ queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
+ // Query dialer for new tasks and start as many as possible now.
+ if len(runningTasks) < maxActiveDialTasks {
+ nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
+ queuedTasks = append(queuedTasks, startTasks(nt)...)
}
}
running:
for {
- // Query the dialer for new tasks and launch them.
- now := time.Now()
- nt := dialstate.newTasks(len(pendingTasks)+len(tasks), peers, now)
- scheduleTasks(nt)
+ scheduleTasks()
select {
case <-srv.quit:
@@ -466,7 +465,7 @@ running:
// can update its state and remove it from the active
// tasks list.
glog.V(logger.Detail).Infoln("<-taskdone:", t)
- dialstate.taskDone(t, now)
+ dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
@@ -513,7 +512,7 @@ running:
// Wait for peers to shut down. Pending connections and tasks are
// not handled here and will terminate soon-ish because srv.quit
// is closed.
- glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(tasks))
+ glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(runningTasks))
for len(peers) > 0 {
p := <-srv.delpeer
glog.V(logger.Detail).Infoln("<-delpeer (spindown):", p)
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 02d1c8e01..b437ac367 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -235,6 +235,56 @@ func TestServerTaskScheduling(t *testing.T) {
}
}
+// This test checks that Server doesn't drop tasks,
+// even if newTasks returns more than the maximum number of tasks.
+func TestServerManyTasks(t *testing.T) {
+ alltasks := make([]task, 300)
+ for i := range alltasks {
+ alltasks[i] = &testTask{index: i}
+ }
+
+ var (
+ srv = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true}
+ done = make(chan *testTask)
+ start, end = 0, 0
+ )
+ defer srv.Stop()
+ srv.loopWG.Add(1)
+ go srv.run(taskgen{
+ newFunc: func(running int, peers map[discover.NodeID]*Peer) []task {
+ start, end = end, end+maxActiveDialTasks+10
+ if end > len(alltasks) {
+ end = len(alltasks)
+ }
+ return alltasks[start:end]
+ },
+ doneFunc: func(tt task) {
+ done <- tt.(*testTask)
+ },
+ })
+
+ doneset := make(map[int]bool)
+ timeout := time.After(2 * time.Second)
+ for len(doneset) < len(alltasks) {
+ select {
+ case tt := <-done:
+ if doneset[tt.index] {
+ t.Errorf("task %d got done more than once", tt.index)
+ } else {
+ doneset[tt.index] = true
+ }
+ case <-timeout:
+ t.Errorf("%d of %d tasks got done within 2s", len(doneset), len(alltasks))
+ for i := 0; i < len(alltasks); i++ {
+ if !doneset[i] {
+ t.Logf("task %d not done", i)
+ }
+ }
+ return
+ }
+ }
+}
+
type taskgen struct {
newFunc func(running int, peers map[discover.NodeID]*Peer) []task
doneFunc func(task)