aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/dial.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2015-05-16 06:38:28 +0800
committerFelix Lange <fjl@twurst.com>2015-05-25 07:17:14 +0800
commit1440f9a37a8baf67b989ddf0b8cc30c9a1970e14 (patch)
treef8db89ae4aeea16c4cb87877df8fea9688c6ac99 /p2p/dial.go
parent9f38ef5d970d1ccb50d2a7697562ea547ff625c8 (diff)
downloaddexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.tar
dexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.tar.gz
dexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.tar.bz2
dexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.tar.lz
dexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.tar.xz
dexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.tar.zst
dexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.zip
p2p: new dialer, peer management without locks
The most visible change is event-based dialing, which should be an improvement over the timer-based system that we have at the moment. The dialer gets a chance to compute new tasks whenever peers change or dials complete. This is better than checking peers on a timer because dials happen faster. The dialer can now make more precise decisions about whom to dial based on the peer set and we can test those decisions without actually opening any sockets. Peer management is easier to test because the tests can inject connections at checkpoints (after enc handshake, after protocol handshake). Most of the handshake stuff is now part of the RLPx code. It could be exported or moved to its own package because it is no longer entangled with Server logic.
Diffstat (limited to 'p2p/dial.go')
-rw-r--r--p2p/dial.go276
1 files changed, 276 insertions, 0 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
new file mode 100644
index 000000000..71065c5ee
--- /dev/null
+++ b/p2p/dial.go
@@ -0,0 +1,276 @@
+package p2p
+
+import (
+ "container/heap"
+ "crypto/rand"
+ "fmt"
+ "net"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+const (
+ // dialHistoryExpiration is how long a node stays in the dial history
+ // after its dial task completes; it is not redialed while listed there.
+ dialHistoryExpiration = 30 * time.Second
+
+ // emptyLookupDelay is how long a discovery lookup task sleeps
+ // when the lookup returns no results. This can happen if the
+ // table becomes empty (i.e. not often).
+ emptyLookupDelay = 10 * time.Second
+)
+
+// dialstate schedules dials and discovery lookups.
+// It gets a chance to compute new tasks on every iteration
+// of the main loop in Server.run.
+type dialstate struct {
+ maxDynDials int // limit from which newTasks subtracts dynamic peers and in-flight dynamic dials
+ ntab discoverTable // read for random nodes in newTasks
+
+ lookupRunning bool // true while a discoverTask is outstanding
+ bootstrapped bool // set by taskDone once a bootstrap discoverTask has finished
+
+ dialing map[discover.NodeID]connFlag // nodes with a dial task in flight, with the flag they were dialed under
+ lookupBuf []*discover.Node // current discovery lookup results
+ randomNodes []*discover.Node // filled from Table
+ static map[discover.NodeID]*discover.Node // nodes that newTasks dials whenever they are not connected
+ hist *dialHistory // recently dialed nodes; skipped by newTasks until they expire
+}
+
+type discoverTable interface { // discovery table operations, abstracted behind an interface
+ Self() *discover.Node
+ Close()
+ Bootstrap([]*discover.Node) // discoverTask.Do calls this on srv.ntab with srv.BootstrapNodes
+ Lookup(target discover.NodeID) []*discover.Node // discoverTask.Do calls this on srv.ntab with a random target
+ ReadRandomNodes([]*discover.Node) int // newTasks passes s.randomNodes and treats the result as the count of usable entries
+}
+
+// dialHistory remembers recent dials; it is maintained as a heap ordered by expiration time.
+type dialHistory []pastDial
+
+// pastDial is an entry in the dial history.
+type pastDial struct {
+ id discover.NodeID // node that was dialed
+ exp time.Time // time after which dialHistory.expire removes the entry
+}
+
+type task interface { // a unit of work returned by dialstate.newTasks
+ Do(*Server) // runs the task; the implementations in this file dial, query the table or sleep
+}
+
+// A dialTask is generated for each node that is dialed.
+type dialTask struct {
+ flags connFlag // passed on to Server.setupConn once the dial succeeds
+ dest *discover.Node // node to dial; its IP and TCP port form the dial address
+}
+
+// discoverTask runs discovery table operations.
+// Only one discoverTask is active at any time (see dialstate.lookupRunning).
+//
+// If bootstrap is true, the task runs Table.Bootstrap,
+// otherwise it performs a random lookup and leaves the
+// results in the task.
+type discoverTask struct {
+ bootstrap bool // selects bootstrapping instead of a lookup
+ results []*discover.Node // lookup results; taskDone appends them to dialstate.lookupBuf
+}
+
+// A waitExpireTask is generated if there are no other tasks
+// to keep the loop in Server.run ticking.
+type waitExpireTask struct {
+ time.Duration // how long Do sleeps
+}
+
+// newDialState returns a dialstate seeded with the given static nodes.
+func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dialstate {
+	state := new(dialstate)
+	state.maxDynDials = maxdyn
+	state.ntab = ntab
+	state.hist = new(dialHistory)
+	state.dialing = make(map[discover.NodeID]connFlag)
+	state.randomNodes = make([]*discover.Node, maxdyn/2) // buffer handed to ReadRandomNodes
+	state.static = make(map[discover.NodeID]*discover.Node)
+	for _, node := range static {
+		state.static[node.ID] = node
+	}
+	return state
+}
+
+// addStatic registers n as a static node so that newTasks dials it whenever
+// it is not connected. An existing entry with the same ID is replaced.
+func (s *dialstate) addStatic(n *discover.Node) { s.static[n.ID] = n }
+
+func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { // computes the tasks to start next
+ var newtasks []task
+ addDial := func(flag connFlag, n *discover.Node) bool { // queues a dial task for n; returns false if n is skipped
+ _, dialing := s.dialing[n.ID]
+ if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) { // dial in flight, already a peer, or dialed recently
+ return false
+ }
+ s.dialing[n.ID] = flag // removed again by taskDone
+ newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
+ return true
+ }
+
+ // Compute the number of dynamic dials still needed at this point.
+ needDynDials := s.maxDynDials
+ for _, p := range peers {
+ if p.rw.is(dynDialedConn) {
+ needDynDials--
+ }
+ }
+ for _, flag := range s.dialing {
+ if flag&dynDialedConn != 0 {
+ needDynDials--
+ }
+ }
+
+ // Expire the dial history on every invocation.
+ s.hist.expire(now)
+
+ // Create dials for static nodes if they are not connected.
+ for _, n := range s.static {
+ addDial(staticDialedConn, n)
+ }
+
+ // Use random nodes from the table for half of the necessary
+ // dynamic dials. This is skipped until the table is bootstrapped.
+ randomCandidates := needDynDials / 2
+ if randomCandidates > 0 && s.bootstrapped {
+ n := s.ntab.ReadRandomNodes(s.randomNodes) // n bounds how many buffer entries are read below
+ for i := 0; i < randomCandidates && i < n; i++ {
+ if addDial(dynDialedConn, s.randomNodes[i]) {
+ needDynDials--
+ }
+ }
+ }
+ // Create dynamic dials from random lookup results, removing tried
+ // items from the result buffer.
+ i := 0
+ for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
+ if addDial(dynDialedConn, s.lookupBuf[i]) {
+ needDynDials--
+ }
+ }
+ s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])] // move untried entries to the front and truncate
+ // Launch a discovery lookup if more candidates are needed. The
+ // first discoverTask bootstraps the table and won't return any
+ // results.
+ if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
+ s.lookupRunning = true
+ newtasks = append(newtasks, &discoverTask{bootstrap: !s.bootstrapped})
+ }
+
+ // Launch a timer to wait for the next node to expire if all
+ // candidates have been tried and no task is currently active.
+ // This should prevent cases where the dialer logic is not ticked
+ // because there are no pending events.
+ if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
+ t := &waitExpireTask{s.hist.min().exp.Sub(now)} // wait until the earliest history entry expires
+ newtasks = append(newtasks, t)
+ }
+ return newtasks
+}
+
+func (s *dialstate) taskDone(t task, now time.Time) {
+ switch t := t.(type) {
+ case *dialTask:
+ s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration))
+ delete(s.dialing, t.dest.ID)
+ case *discoverTask:
+ if t.bootstrap {
+ s.bootstrapped = true
+ }
+ s.lookupRunning = false
+ s.lookupBuf = append(s.lookupBuf, t.results...)
+ }
+}
+
+// Do dials t.dest over TCP and passes the connection to srv.setupConn.
+func (t *dialTask) Do(srv *Server) {
+	target := net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}
+	glog.V(logger.Debug).Infof("dialing %v\n", t.dest)
+	if conn, err := srv.Dialer.Dial("tcp", target.String()); err != nil {
+		glog.V(logger.Detail).Infof("dial error: %v", err)
+	} else {
+		srv.setupConn(conn, t.flags, t.dest)
+	}
+}
+func (t *dialTask) String() string { // flags, first 8 bytes of the node ID in hex, then IP:port
+ return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
+}
+
+// Do bootstraps the discovery table or runs a lookup for a random target.
+func (t *discoverTask) Do(srv *Server) {
+	if t.bootstrap {
+		srv.ntab.Bootstrap(srv.BootstrapNodes)
+		return
+	}
+	var randomID discover.NodeID
+	rand.Read(randomID[:])
+	// newTasks generates a lookup task whenever dynamic dials are
+	// necessary. Lookups need to take some time, otherwise the
+	// event loop spins too fast. An empty result can only be
+	// returned if the table is empty, so back off in that case.
+	if t.results = srv.ntab.Lookup(randomID); len(t.results) == 0 {
+		time.Sleep(emptyLookupDelay)
+	}
+}
+
+// String names the kind of discovery task and, if any, its result count.
+func (t *discoverTask) String() (s string) {
+	s = "discovery lookup"
+	if t.bootstrap {
+		s = "discovery bootstrap"
+	}
+	if count := len(t.results); count > 0 {
+		s += fmt.Sprintf(" (%d results)", count)
+	}
+	return s
+}
+
+// Do sleeps for the task's duration and does nothing else.
+// The *Server argument is unused.
+func (t waitExpireTask) Do(*Server) { time.Sleep(t.Duration) }
+func (t waitExpireTask) String() string { // describes the task together with its wait duration
+ return fmt.Sprintf("wait for dial hist expire (%v)", t.Duration)
+}
+
+// Use only these methods to access or modify dialHistory.
+//
+// min returns the entry at the top of the heap. It panics if h is empty.
+func (h dialHistory) min() pastDial { return h[0] }
+func (h *dialHistory) add(id discover.NodeID, exp time.Time) { // remember a dial of id until exp
+	heap.Push(h, pastDial{id: id, exp: exp})
+}
+// contains reports whether the history holds an entry for id.
+func (h dialHistory) contains(id discover.NodeID) bool {
+	i := 0
+	for i < len(h) && h[i].id != id {
+		i++
+	}
+	return i < len(h)
+}
+func (h *dialHistory) expire(now time.Time) { // drop entries whose expiration lies before now
+	for len(*h) > 0 && (*h)[0].exp.Before(now) {
+		heap.Pop(h)
+	}
+}
+
+// heap.Interface boilerplate; entries are ordered by expiration time.
+func (h dialHistory) Len() int { return len(h) }
+func (h dialHistory) Less(i, j int) bool { return h[i].exp.Before(h[j].exp) } // earlier expiration sorts first
+func (h dialHistory) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+func (h *dialHistory) Push(x interface{}) { // x must be a pastDial, otherwise the type assertion panics
+ *h = append(*h, x.(pastDial))
+}
+// Pop removes and returns the last element, as container/heap requires.
+func (h *dialHistory) Pop() interface{} {
+	last := len(*h) - 1
+	entry := (*h)[last]
+	*h = (*h)[:last]
+	return entry
+}