aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/dial.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2015-10-23 05:46:28 +0800
committerFelix Lange <fjl@twurst.com>2015-12-18 06:39:49 +0800
commit6c41e675ec6fc720a1e8429fa4ef035a476e26d8 (patch)
tree53345a95058ce6c6c2e578809ffabf425b329eb5 /p2p/dial.go
parent04c6369a09baa5267a01713663f7c1cbb08896c9 (diff)
downloadgo-tangerine-6c41e675ec6fc720a1e8429fa4ef035a476e26d8.tar
go-tangerine-6c41e675ec6fc720a1e8429fa4ef035a476e26d8.tar.gz
go-tangerine-6c41e675ec6fc720a1e8429fa4ef035a476e26d8.tar.bz2
go-tangerine-6c41e675ec6fc720a1e8429fa4ef035a476e26d8.tar.lz
go-tangerine-6c41e675ec6fc720a1e8429fa4ef035a476e26d8.tar.xz
go-tangerine-6c41e675ec6fc720a1e8429fa4ef035a476e26d8.tar.zst
go-tangerine-6c41e675ec6fc720a1e8429fa4ef035a476e26d8.zip
p2p: resolve incomplete dial targets
This change makes it possible to add peers without providing their IP address. The endpoint of the target node is resolved using the discovery protocol.
Diffstat (limited to 'p2p/dial.go')
-rw-r--r--p2p/dial.go110
1 files changed, 88 insertions, 22 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
index bdc9f852c..c0e703d7d 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -36,6 +36,10 @@ const (
// Discovery lookups are throttled and can only run
// once every few seconds.
lookupInterval = 4 * time.Second
+
+ // Endpoint resolution is throttled with bounded backoff.
+ initialResolveDelay = 60 * time.Second
+ maxResolveDelay = time.Hour
)
// dialstate schedules dials and discovery lookups.
@@ -46,17 +50,17 @@ type dialstate struct {
ntab discoverTable
lookupRunning bool
-
- dialing map[discover.NodeID]connFlag
- lookupBuf []*discover.Node // current discovery lookup results
- randomNodes []*discover.Node // filled from Table
- static map[discover.NodeID]*discover.Node
- hist *dialHistory
+ dialing map[discover.NodeID]connFlag
+ lookupBuf []*discover.Node // current discovery lookup results
+ randomNodes []*discover.Node // filled from Table
+ static map[discover.NodeID]*dialTask
+ hist *dialHistory
}
type discoverTable interface {
Self() *discover.Node
Close()
+ Resolve(target discover.NodeID) *discover.Node
Lookup(target discover.NodeID) []*discover.Node
ReadRandomNodes([]*discover.Node) int
}
@@ -74,10 +78,13 @@ type task interface {
Do(*Server)
}
-// A dialTask is generated for each node that is dialed.
+// A dialTask is generated for each node that is dialed. Its
+// fields cannot be accessed while the task is running.
type dialTask struct {
- flags connFlag
- dest *discover.Node
+ flags connFlag
+ dest *discover.Node
+ lastResolved time.Time
+ resolveDelay time.Duration
}
// discoverTask runs discovery table operations.
@@ -97,26 +104,31 @@ func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dial
s := &dialstate{
maxDynDials: maxdyn,
ntab: ntab,
- static: make(map[discover.NodeID]*discover.Node),
+ static: make(map[discover.NodeID]*dialTask),
dialing: make(map[discover.NodeID]connFlag),
randomNodes: make([]*discover.Node, maxdyn/2),
hist: new(dialHistory),
}
for _, n := range static {
- s.static[n.ID] = n
+ s.addStatic(n)
}
return s
}
func (s *dialstate) addStatic(n *discover.Node) {
- s.static[n.ID] = n
+ // This overwites the task instead of updating an existing
+ // entry, giving users the opportunity to force a resolve operation.
+ s.static[n.ID] = &dialTask{flags: staticDialedConn, dest: n}
}
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
var newtasks []task
+ isDialing := func(id discover.NodeID) bool {
+ _, found := s.dialing[id]
+ return found || peers[id] != nil || s.hist.contains(id)
+ }
addDial := func(flag connFlag, n *discover.Node) bool {
- _, dialing := s.dialing[n.ID]
- if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) {
+ if isDialing(n.ID) {
return false
}
s.dialing[n.ID] = flag
@@ -141,8 +153,11 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now
s.hist.expire(now)
// Create dials for static nodes if they are not connected.
- for _, n := range s.static {
- addDial(staticDialedConn, n)
+ for id, t := range s.static {
+ if !isDialing(id) {
+ s.dialing[id] = t.flags
+ newtasks = append(newtasks, t)
+ }
}
// Use random nodes from the table for half of the necessary
@@ -194,17 +209,68 @@ func (s *dialstate) taskDone(t task, now time.Time) {
}
func (t *dialTask) Do(srv *Server) {
- addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}
- glog.V(logger.Debug).Infof("dialing %v\n", t.dest)
+ if t.dest.Incomplete() {
+ if !t.resolve(srv) {
+ return
+ }
+ }
+ success := t.dial(srv, t.dest)
+ // Try resolving the ID of static nodes if dialing failed.
+ if !success && t.flags&staticDialedConn != 0 {
+ if t.resolve(srv) {
+ t.dial(srv, t.dest)
+ }
+ }
+}
+
+// resolve attempts to find the current endpoint for the destination
+// using discovery.
+//
+// Resolve operations are throttled with backoff to avoid flooding the
+// discovery network with useless queries for nodes that don't exist.
+// The backoff delay resets when the node is found.
+func (t *dialTask) resolve(srv *Server) bool {
+ if srv.ntab == nil {
+ glog.V(logger.Debug).Infof("can't resolve node %x: discovery is disabled", t.dest.ID[:6])
+ return false
+ }
+ if t.resolveDelay == 0 {
+ t.resolveDelay = initialResolveDelay
+ }
+ if time.Since(t.lastResolved) < t.resolveDelay {
+ return false
+ }
+ resolved := srv.ntab.Resolve(t.dest.ID)
+ t.lastResolved = time.Now()
+ if resolved == nil {
+ t.resolveDelay *= 2
+ if t.resolveDelay > maxResolveDelay {
+ t.resolveDelay = maxResolveDelay
+ }
+ glog.V(logger.Debug).Infof("resolving node %x failed (new delay: %v)", t.dest.ID[:6], t.resolveDelay)
+ return false
+ }
+ // The node was found.
+ t.resolveDelay = initialResolveDelay
+ t.dest = resolved
+ glog.V(logger.Debug).Infof("resolved node %x: %v:%d", t.dest.ID[:6], t.dest.IP, t.dest.TCP)
+ return true
+}
+
+// dial performs the actual connection attempt.
+func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
+ addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
+ glog.V(logger.Debug).Infof("dial tcp %v (%x)\n", addr, dest.ID[:6])
fd, err := srv.Dialer.Dial("tcp", addr.String())
if err != nil {
- glog.V(logger.Detail).Infof("dial error: %v", err)
- return
+ glog.V(logger.Detail).Infof("%v", err)
+ return false
}
mfd := newMeteredConn(fd, false)
-
- srv.setupConn(mfd, t.flags, t.dest)
+ srv.setupConn(mfd, t.flags, dest)
+ return true
}
+
func (t *dialTask) String() string {
return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
}