diff options
Diffstat (limited to 'p2p/dial.go')
-rw-r--r-- | p2p/dial.go | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/p2p/dial.go b/p2p/dial.go new file mode 100644 index 000000000..71065c5ee --- /dev/null +++ b/p2p/dial.go @@ -0,0 +1,276 @@ +package p2p + +import ( + "container/heap" + "crypto/rand" + "fmt" + "net" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +const ( + // This is the amount of time spent waiting in between + // redialing a certain node. + dialHistoryExpiration = 30 * time.Second + + // Discovery lookup tasks will wait for this long when + // no results are returned. This can happen if the table + // becomes empty (i.e. not often). + emptyLookupDelay = 10 * time.Second +) + +// dialstate schedules dials and discovery lookups. +// it get's a chance to compute new tasks on every iteration +// of the main loop in Server.run. +type dialstate struct { + maxDynDials int + ntab discoverTable + + lookupRunning bool + bootstrapped bool + + dialing map[discover.NodeID]connFlag + lookupBuf []*discover.Node // current discovery lookup results + randomNodes []*discover.Node // filled from Table + static map[discover.NodeID]*discover.Node + hist *dialHistory +} + +type discoverTable interface { + Self() *discover.Node + Close() + Bootstrap([]*discover.Node) + Lookup(target discover.NodeID) []*discover.Node + ReadRandomNodes([]*discover.Node) int +} + +// the dial history remembers recent dials. +type dialHistory []pastDial + +// pastDial is an entry in the dial history. +type pastDial struct { + id discover.NodeID + exp time.Time +} + +type task interface { + Do(*Server) +} + +// A dialTask is generated for each node that is dialed. +type dialTask struct { + flags connFlag + dest *discover.Node +} + +// discoverTask runs discovery table operations. +// Only one discoverTask is active at any time. +// +// If bootstrap is true, the task runs Table.Bootstrap, +// otherwise it performs a random lookup and leaves the +// results in the task. +type discoverTask struct { + bootstrap bool + results []*discover.Node +} + +// A waitExpireTask is generated if there are no other tasks +// to keep the loop in Server.run ticking. +type waitExpireTask struct { + time.Duration +} + +func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dialstate { + s := &dialstate{ + maxDynDials: maxdyn, + ntab: ntab, + static: make(map[discover.NodeID]*discover.Node), + dialing: make(map[discover.NodeID]connFlag), + randomNodes: make([]*discover.Node, maxdyn/2), + hist: new(dialHistory), + } + for _, n := range static { + s.static[n.ID] = n + } + return s +} + +func (s *dialstate) addStatic(n *discover.Node) { + s.static[n.ID] = n +} + +func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { + var newtasks []task + addDial := func(flag connFlag, n *discover.Node) bool { + _, dialing := s.dialing[n.ID] + if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) { + return false + } + s.dialing[n.ID] = flag + newtasks = append(newtasks, &dialTask{flags: flag, dest: n}) + return true + } + + // Compute number of dynamic dials necessary at this point. + needDynDials := s.maxDynDials + for _, p := range peers { + if p.rw.is(dynDialedConn) { + needDynDials-- + } + } + for _, flag := range s.dialing { + if flag&dynDialedConn != 0 { + needDynDials-- + } + } + + // Expire the dial history on every invocation. + s.hist.expire(now) + + // Create dials for static nodes if they are not connected. + for _, n := range s.static { + addDial(staticDialedConn, n) + } + + // Use random nodes from the table for half of the necessary + // dynamic dials. + randomCandidates := needDynDials / 2 + if randomCandidates > 0 && s.bootstrapped { + n := s.ntab.ReadRandomNodes(s.randomNodes) + for i := 0; i < randomCandidates && i < n; i++ { + if addDial(dynDialedConn, s.randomNodes[i]) { + needDynDials-- + } + } + } + // Create dynamic dials from random lookup results, removing tried + // items from the result buffer. + i := 0 + for ; i < len(s.lookupBuf) && needDynDials > 0; i++ { + if addDial(dynDialedConn, s.lookupBuf[i]) { + needDynDials-- + } + } + s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])] + // Launch a discovery lookup if more candidates are needed. The + // first discoverTask bootstraps the table and won't return any + // results. + if len(s.lookupBuf) < needDynDials && !s.lookupRunning { + s.lookupRunning = true + newtasks = append(newtasks, &discoverTask{bootstrap: !s.bootstrapped}) + } + + // Launch a timer to wait for the next node to expire if all + // candidates have been tried and no task is currently active. + // This should prevent cases where the dialer logic is not ticked + // because there are no pending events. + if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 { + t := &waitExpireTask{s.hist.min().exp.Sub(now)} + newtasks = append(newtasks, t) + } + return newtasks +} + +func (s *dialstate) taskDone(t task, now time.Time) { + switch t := t.(type) { + case *dialTask: + s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration)) + delete(s.dialing, t.dest.ID) + case *discoverTask: + if t.bootstrap { + s.bootstrapped = true + } + s.lookupRunning = false + s.lookupBuf = append(s.lookupBuf, t.results...) + } +} + +func (t *dialTask) Do(srv *Server) { + addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)} + glog.V(logger.Debug).Infof("dialing %v\n", t.dest) + fd, err := srv.Dialer.Dial("tcp", addr.String()) + if err != nil { + glog.V(logger.Detail).Infof("dial error: %v", err) + return + } + srv.setupConn(fd, t.flags, t.dest) +} +func (t *dialTask) String() string { + return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP) +} + +func (t *discoverTask) Do(srv *Server) { + if t.bootstrap { + srv.ntab.Bootstrap(srv.BootstrapNodes) + } else { + var target discover.NodeID + rand.Read(target[:]) + t.results = srv.ntab.Lookup(target) + // newTasks generates a lookup task whenever dynamic dials are + // necessary. Lookups need to take some time, otherwise the + // event loop spins too fast. An empty result can only be + // returned if the table is empty. + if len(t.results) == 0 { + time.Sleep(emptyLookupDelay) + } + } +} + +func (t *discoverTask) String() (s string) { + if t.bootstrap { + s = "discovery bootstrap" + } else { + s = "discovery lookup" + } + if len(t.results) > 0 { + s += fmt.Sprintf(" (%d results)", len(t.results)) + } + return s +} + +func (t waitExpireTask) Do(*Server) { + time.Sleep(t.Duration) +} +func (t waitExpireTask) String() string { + return fmt.Sprintf("wait for dial hist expire (%v)", t.Duration) +} + +// Use only these methods to access or modify dialHistory. +func (h dialHistory) min() pastDial { + return h[0] +} +func (h *dialHistory) add(id discover.NodeID, exp time.Time) { + heap.Push(h, pastDial{id, exp}) +} +func (h dialHistory) contains(id discover.NodeID) bool { + for _, v := range h { + if v.id == id { + return true + } + } + return false +} +func (h *dialHistory) expire(now time.Time) { + for h.Len() > 0 && h.min().exp.Before(now) { + heap.Pop(h) + } +} + +// heap.Interface boilerplate +func (h dialHistory) Len() int { return len(h) } +func (h dialHistory) Less(i, j int) bool { return h[i].exp.Before(h[j].exp) } +func (h dialHistory) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *dialHistory) Push(x interface{}) { + *h = append(*h, x.(pastDial)) +} +func (h *dialHistory) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} |