aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/dial.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/dial.go')
-rw-r--r--p2p/dial.go276
1 files changed, 276 insertions, 0 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
new file mode 100644
index 000000000..71065c5ee
--- /dev/null
+++ b/p2p/dial.go
@@ -0,0 +1,276 @@
+package p2p
+
+import (
+ "container/heap"
+ "crypto/rand"
+ "fmt"
+ "net"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+const (
+ // This is the amount of time spent waiting in between
+ // redialing a certain node.
+ dialHistoryExpiration = 30 * time.Second
+
+ // Discovery lookup tasks will wait for this long when
+ // no results are returned. This can happen if the table
+ // becomes empty (i.e. not often).
+ emptyLookupDelay = 10 * time.Second
+)
+
+// dialstate schedules dials and discovery lookups.
+// it get's a chance to compute new tasks on every iteration
+// of the main loop in Server.run.
+type dialstate struct {
+ maxDynDials int
+ ntab discoverTable
+
+ lookupRunning bool
+ bootstrapped bool
+
+ dialing map[discover.NodeID]connFlag
+ lookupBuf []*discover.Node // current discovery lookup results
+ randomNodes []*discover.Node // filled from Table
+ static map[discover.NodeID]*discover.Node
+ hist *dialHistory
+}
+
+type discoverTable interface {
+ Self() *discover.Node
+ Close()
+ Bootstrap([]*discover.Node)
+ Lookup(target discover.NodeID) []*discover.Node
+ ReadRandomNodes([]*discover.Node) int
+}
+
+// the dial history remembers recent dials.
+type dialHistory []pastDial
+
+// pastDial is an entry in the dial history.
+type pastDial struct {
+ id discover.NodeID
+ exp time.Time
+}
+
+type task interface {
+ Do(*Server)
+}
+
+// A dialTask is generated for each node that is dialed.
+type dialTask struct {
+ flags connFlag
+ dest *discover.Node
+}
+
+// discoverTask runs discovery table operations.
+// Only one discoverTask is active at any time.
+//
+// If bootstrap is true, the task runs Table.Bootstrap,
+// otherwise it performs a random lookup and leaves the
+// results in the task.
+type discoverTask struct {
+ bootstrap bool
+ results []*discover.Node
+}
+
+// A waitExpireTask is generated if there are no other tasks
+// to keep the loop in Server.run ticking.
+type waitExpireTask struct {
+ time.Duration
+}
+
+func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dialstate {
+ s := &dialstate{
+ maxDynDials: maxdyn,
+ ntab: ntab,
+ static: make(map[discover.NodeID]*discover.Node),
+ dialing: make(map[discover.NodeID]connFlag),
+ randomNodes: make([]*discover.Node, maxdyn/2),
+ hist: new(dialHistory),
+ }
+ for _, n := range static {
+ s.static[n.ID] = n
+ }
+ return s
+}
+
+func (s *dialstate) addStatic(n *discover.Node) {
+ s.static[n.ID] = n
+}
+
+func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
+ var newtasks []task
+ addDial := func(flag connFlag, n *discover.Node) bool {
+ _, dialing := s.dialing[n.ID]
+ if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) {
+ return false
+ }
+ s.dialing[n.ID] = flag
+ newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
+ return true
+ }
+
+ // Compute number of dynamic dials necessary at this point.
+ needDynDials := s.maxDynDials
+ for _, p := range peers {
+ if p.rw.is(dynDialedConn) {
+ needDynDials--
+ }
+ }
+ for _, flag := range s.dialing {
+ if flag&dynDialedConn != 0 {
+ needDynDials--
+ }
+ }
+
+ // Expire the dial history on every invocation.
+ s.hist.expire(now)
+
+ // Create dials for static nodes if they are not connected.
+ for _, n := range s.static {
+ addDial(staticDialedConn, n)
+ }
+
+ // Use random nodes from the table for half of the necessary
+ // dynamic dials.
+ randomCandidates := needDynDials / 2
+ if randomCandidates > 0 && s.bootstrapped {
+ n := s.ntab.ReadRandomNodes(s.randomNodes)
+ for i := 0; i < randomCandidates && i < n; i++ {
+ if addDial(dynDialedConn, s.randomNodes[i]) {
+ needDynDials--
+ }
+ }
+ }
+ // Create dynamic dials from random lookup results, removing tried
+ // items from the result buffer.
+ i := 0
+ for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
+ if addDial(dynDialedConn, s.lookupBuf[i]) {
+ needDynDials--
+ }
+ }
+ s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
+ // Launch a discovery lookup if more candidates are needed. The
+ // first discoverTask bootstraps the table and won't return any
+ // results.
+ if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
+ s.lookupRunning = true
+ newtasks = append(newtasks, &discoverTask{bootstrap: !s.bootstrapped})
+ }
+
+ // Launch a timer to wait for the next node to expire if all
+ // candidates have been tried and no task is currently active.
+ // This should prevent cases where the dialer logic is not ticked
+ // because there are no pending events.
+ if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
+ t := &waitExpireTask{s.hist.min().exp.Sub(now)}
+ newtasks = append(newtasks, t)
+ }
+ return newtasks
+}
+
+func (s *dialstate) taskDone(t task, now time.Time) {
+ switch t := t.(type) {
+ case *dialTask:
+ s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration))
+ delete(s.dialing, t.dest.ID)
+ case *discoverTask:
+ if t.bootstrap {
+ s.bootstrapped = true
+ }
+ s.lookupRunning = false
+ s.lookupBuf = append(s.lookupBuf, t.results...)
+ }
+}
+
+func (t *dialTask) Do(srv *Server) {
+ addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}
+ glog.V(logger.Debug).Infof("dialing %v\n", t.dest)
+ fd, err := srv.Dialer.Dial("tcp", addr.String())
+ if err != nil {
+ glog.V(logger.Detail).Infof("dial error: %v", err)
+ return
+ }
+ srv.setupConn(fd, t.flags, t.dest)
+}
+func (t *dialTask) String() string {
+ return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
+}
+
+func (t *discoverTask) Do(srv *Server) {
+ if t.bootstrap {
+ srv.ntab.Bootstrap(srv.BootstrapNodes)
+ } else {
+ var target discover.NodeID
+ rand.Read(target[:])
+ t.results = srv.ntab.Lookup(target)
+ // newTasks generates a lookup task whenever dynamic dials are
+ // necessary. Lookups need to take some time, otherwise the
+ // event loop spins too fast. An empty result can only be
+ // returned if the table is empty.
+ if len(t.results) == 0 {
+ time.Sleep(emptyLookupDelay)
+ }
+ }
+}
+
+func (t *discoverTask) String() (s string) {
+ if t.bootstrap {
+ s = "discovery bootstrap"
+ } else {
+ s = "discovery lookup"
+ }
+ if len(t.results) > 0 {
+ s += fmt.Sprintf(" (%d results)", len(t.results))
+ }
+ return s
+}
+
+func (t waitExpireTask) Do(*Server) {
+ time.Sleep(t.Duration)
+}
+func (t waitExpireTask) String() string {
+ return fmt.Sprintf("wait for dial hist expire (%v)", t.Duration)
+}
+
+// Use only these methods to access or modify dialHistory.
+func (h dialHistory) min() pastDial {
+ return h[0]
+}
+func (h *dialHistory) add(id discover.NodeID, exp time.Time) {
+ heap.Push(h, pastDial{id, exp})
+}
+func (h dialHistory) contains(id discover.NodeID) bool {
+ for _, v := range h {
+ if v.id == id {
+ return true
+ }
+ }
+ return false
+}
+func (h *dialHistory) expire(now time.Time) {
+ for h.Len() > 0 && h.min().exp.Before(now) {
+ heap.Pop(h)
+ }
+}
+
+// heap.Interface boilerplate
+func (h dialHistory) Len() int { return len(h) }
+func (h dialHistory) Less(i, j int) bool { return h[i].exp.Before(h[j].exp) }
+func (h dialHistory) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+func (h *dialHistory) Push(x interface{}) {
+ *h = append(*h, x.(pastDial))
+}
+func (h *dialHistory) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ *h = old[0 : n-1]
+ return x
+}