aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--p2p/dial.go110
-rw-r--r--p2p/dial_test.go128
2 files changed, 175 insertions, 63 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
index bdc9f852c..c0e703d7d 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -36,6 +36,10 @@ const (
// Discovery lookups are throttled and can only run
// once every few seconds.
lookupInterval = 4 * time.Second
+
+ // Endpoint resolution is throttled with bounded backoff.
+ initialResolveDelay = 60 * time.Second
+ maxResolveDelay = time.Hour
)
// dialstate schedules dials and discovery lookups.
@@ -46,17 +50,17 @@ type dialstate struct {
ntab discoverTable
lookupRunning bool
-
- dialing map[discover.NodeID]connFlag
- lookupBuf []*discover.Node // current discovery lookup results
- randomNodes []*discover.Node // filled from Table
- static map[discover.NodeID]*discover.Node
- hist *dialHistory
+ dialing map[discover.NodeID]connFlag
+ lookupBuf []*discover.Node // current discovery lookup results
+ randomNodes []*discover.Node // filled from Table
+ static map[discover.NodeID]*dialTask
+ hist *dialHistory
}
type discoverTable interface {
Self() *discover.Node
Close()
+ Resolve(target discover.NodeID) *discover.Node
Lookup(target discover.NodeID) []*discover.Node
ReadRandomNodes([]*discover.Node) int
}
@@ -74,10 +78,13 @@ type task interface {
Do(*Server)
}
-// A dialTask is generated for each node that is dialed.
+// A dialTask is generated for each node that is dialed. Its
+// fields cannot be accessed while the task is running.
type dialTask struct {
- flags connFlag
- dest *discover.Node
+ flags connFlag
+ dest *discover.Node
+ lastResolved time.Time
+ resolveDelay time.Duration
}
// discoverTask runs discovery table operations.
@@ -97,26 +104,31 @@ func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dial
s := &dialstate{
maxDynDials: maxdyn,
ntab: ntab,
- static: make(map[discover.NodeID]*discover.Node),
+ static: make(map[discover.NodeID]*dialTask),
dialing: make(map[discover.NodeID]connFlag),
randomNodes: make([]*discover.Node, maxdyn/2),
hist: new(dialHistory),
}
for _, n := range static {
- s.static[n.ID] = n
+ s.addStatic(n)
}
return s
}
func (s *dialstate) addStatic(n *discover.Node) {
- s.static[n.ID] = n
+ // This overwites the task instead of updating an existing
+ // entry, giving users the opportunity to force a resolve operation.
+ s.static[n.ID] = &dialTask{flags: staticDialedConn, dest: n}
}
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
var newtasks []task
+ isDialing := func(id discover.NodeID) bool {
+ _, found := s.dialing[id]
+ return found || peers[id] != nil || s.hist.contains(id)
+ }
addDial := func(flag connFlag, n *discover.Node) bool {
- _, dialing := s.dialing[n.ID]
- if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) {
+ if isDialing(n.ID) {
return false
}
s.dialing[n.ID] = flag
@@ -141,8 +153,11 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now
s.hist.expire(now)
// Create dials for static nodes if they are not connected.
- for _, n := range s.static {
- addDial(staticDialedConn, n)
+ for id, t := range s.static {
+ if !isDialing(id) {
+ s.dialing[id] = t.flags
+ newtasks = append(newtasks, t)
+ }
}
// Use random nodes from the table for half of the necessary
@@ -194,17 +209,68 @@ func (s *dialstate) taskDone(t task, now time.Time) {
}
func (t *dialTask) Do(srv *Server) {
- addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}
- glog.V(logger.Debug).Infof("dialing %v\n", t.dest)
+ if t.dest.Incomplete() {
+ if !t.resolve(srv) {
+ return
+ }
+ }
+ success := t.dial(srv, t.dest)
+ // Try resolving the ID of static nodes if dialing failed.
+ if !success && t.flags&staticDialedConn != 0 {
+ if t.resolve(srv) {
+ t.dial(srv, t.dest)
+ }
+ }
+}
+
+// resolve attempts to find the current endpoint for the destination
+// using discovery.
+//
+// Resolve operations are throttled with backoff to avoid flooding the
+// discovery network with useless queries for nodes that don't exist.
+// The backoff delay resets when the node is found.
+func (t *dialTask) resolve(srv *Server) bool {
+ if srv.ntab == nil {
+ glog.V(logger.Debug).Infof("can't resolve node %x: discovery is disabled", t.dest.ID[:6])
+ return false
+ }
+ if t.resolveDelay == 0 {
+ t.resolveDelay = initialResolveDelay
+ }
+ if time.Since(t.lastResolved) < t.resolveDelay {
+ return false
+ }
+ resolved := srv.ntab.Resolve(t.dest.ID)
+ t.lastResolved = time.Now()
+ if resolved == nil {
+ t.resolveDelay *= 2
+ if t.resolveDelay > maxResolveDelay {
+ t.resolveDelay = maxResolveDelay
+ }
+ glog.V(logger.Debug).Infof("resolving node %x failed (new delay: %v)", t.dest.ID[:6], t.resolveDelay)
+ return false
+ }
+ // The node was found.
+ t.resolveDelay = initialResolveDelay
+ t.dest = resolved
+ glog.V(logger.Debug).Infof("resolved node %x: %v:%d", t.dest.ID[:6], t.dest.IP, t.dest.TCP)
+ return true
+}
+
+// dial performs the actual connection attempt.
+func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
+ addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
+ glog.V(logger.Debug).Infof("dial tcp %v (%x)\n", addr, dest.ID[:6])
fd, err := srv.Dialer.Dial("tcp", addr.String())
if err != nil {
- glog.V(logger.Detail).Infof("dial error: %v", err)
- return
+ glog.V(logger.Detail).Infof("%v", err)
+ return false
}
mfd := newMeteredConn(fd, false)
-
- srv.setupConn(mfd, t.flags, t.dest)
+ srv.setupConn(mfd, t.flags, dest)
+ return true
}
+
func (t *dialTask) String() string {
return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
}
diff --git a/p2p/dial_test.go b/p2p/dial_test.go
index 0127b2d87..3447660a3 100644
--- a/p2p/dial_test.go
+++ b/p2p/dial_test.go
@@ -18,6 +18,7 @@ package p2p
import (
"encoding/binary"
+ "net"
"reflect"
"testing"
"time"
@@ -79,6 +80,7 @@ type fakeTable []*discover.Node
func (t fakeTable) Self() *discover.Node { return new(discover.Node) }
func (t fakeTable) Close() {}
func (t fakeTable) Lookup(discover.NodeID) []*discover.Node { return nil }
+func (t fakeTable) Resolve(discover.NodeID) *discover.Node { return nil }
func (t fakeTable) ReadRandomNodes(buf []*discover.Node) int { return copy(buf, t) }
// This test checks that dynamic dials are launched from discovery results.
@@ -113,9 +115,9 @@ func TestDialStateDynDial(t *testing.T) {
}},
},
new: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(3)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(4)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(5)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}},
},
},
// Some of the dials complete but no new ones are launched yet because
@@ -129,8 +131,8 @@ func TestDialStateDynDial(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(4)}},
},
done: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(3)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(4)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}},
},
},
// No new dial tasks are launched in the this round because
@@ -145,7 +147,7 @@ func TestDialStateDynDial(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(5)}},
},
done: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(5)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}},
},
new: []task{
&waitExpireTask{Duration: 14 * time.Second},
@@ -162,7 +164,7 @@ func TestDialStateDynDial(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(5)}},
},
new: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(6)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(6)}},
},
},
// More peers (3,4) drop off and dial for ID 6 completes.
@@ -175,10 +177,10 @@ func TestDialStateDynDial(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(5)}},
},
done: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(6)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(6)}},
},
new: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(7)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(7)}},
&discoverTask{},
},
},
@@ -193,7 +195,7 @@ func TestDialStateDynDial(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(7)}},
},
done: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(7)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(7)}},
},
},
// Finish the running node discovery with an empty set. A new lookup
@@ -236,11 +238,11 @@ func TestDialStateDynDialFromTable(t *testing.T) {
// 5 out of 8 of the nodes returned by ReadRandomNodes are dialed.
{
new: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(1)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(2)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(3)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(4)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(5)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(1)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(2)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}},
&discoverTask{},
},
},
@@ -251,8 +253,8 @@ func TestDialStateDynDialFromTable(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(2)}},
},
done: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(1)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(2)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(1)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(2)}},
&discoverTask{results: []*discover.Node{
{ID: uintID(10)},
{ID: uintID(11)},
@@ -260,9 +262,9 @@ func TestDialStateDynDialFromTable(t *testing.T) {
}},
},
new: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(10)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(11)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(12)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(10)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(11)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(12)}},
&discoverTask{},
},
},
@@ -276,12 +278,12 @@ func TestDialStateDynDialFromTable(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(12)}},
},
done: []task{
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(3)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(4)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(5)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(10)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(11)}},
- &dialTask{dynDialedConn, &discover.Node{ID: uintID(12)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(10)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(11)}},
+ &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(12)}},
},
},
// Waiting for expiry. No waitExpireTask is launched because the
@@ -332,9 +334,9 @@ func TestDialStateStaticDial(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(2)}},
},
new: []task{
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}},
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(4)}},
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(5)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}},
},
},
// No new tasks are launched in this round because all static
@@ -346,7 +348,7 @@ func TestDialStateStaticDial(t *testing.T) {
{rw: &conn{flags: staticDialedConn, id: uintID(3)}},
},
done: []task{
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}},
},
},
// No new dial tasks are launched because all static
@@ -360,8 +362,8 @@ func TestDialStateStaticDial(t *testing.T) {
{rw: &conn{flags: staticDialedConn, id: uintID(5)}},
},
done: []task{
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(4)}},
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(5)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}},
},
new: []task{
&waitExpireTask{Duration: 14 * time.Second},
@@ -386,8 +388,8 @@ func TestDialStateStaticDial(t *testing.T) {
{rw: &conn{flags: staticDialedConn, id: uintID(5)}},
},
new: []task{
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(2)}},
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(4)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}},
},
},
},
@@ -410,9 +412,9 @@ func TestDialStateCache(t *testing.T) {
{
peers: nil,
new: []task{
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(1)}},
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(2)}},
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}},
},
},
// No new tasks are launched in this round because all static
@@ -423,8 +425,8 @@ func TestDialStateCache(t *testing.T) {
{rw: &conn{flags: staticDialedConn, id: uintID(2)}},
},
done: []task{
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(1)}},
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(2)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}},
},
},
// A salvage task is launched to wait for node 3's history
@@ -435,7 +437,7 @@ func TestDialStateCache(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(2)}},
},
done: []task{
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}},
},
new: []task{
&waitExpireTask{Duration: 14 * time.Second},
@@ -455,13 +457,40 @@ func TestDialStateCache(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(2)}},
},
new: []task{
- &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}},
+ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}},
},
},
},
})
}
+func TestDialResolve(t *testing.T) {
+ resolved := discover.NewNode(uintID(1), net.IP{127, 0, 55, 234}, 3333, 4444)
+ table := &resolveMock{answer: resolved}
+ state := newDialState(nil, table, 0)
+
+ // Check that the task is generated with an incomplete ID.
+ dest := discover.NewNode(uintID(1), nil, 0, 0)
+ state.addStatic(dest)
+ tasks := state.newTasks(0, nil, time.Time{})
+ if !reflect.DeepEqual(tasks, []task{&dialTask{flags: staticDialedConn, dest: dest}}) {
+ t.Fatalf("expected dial task, got %#v", tasks)
+ }
+
+ // Now run the task, it should resolve the ID once.
+ srv := &Server{ntab: table, Dialer: &net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}}
+ tasks[0].Do(srv)
+ if !reflect.DeepEqual(table.resolveCalls, []discover.NodeID{dest.ID}) {
+ t.Fatalf("wrong resolve calls, got %v", table.resolveCalls)
+ }
+
+ // Report it as done to the dialer, which should update the static node record.
+ state.taskDone(tasks[0], time.Now())
+ if state.static[uintID(1)].dest != resolved {
+ t.Fatalf("state.dest not updated")
+ }
+}
+
// compares task lists but doesn't care about the order.
func sametasks(a, b []task) bool {
if len(a) != len(b) {
@@ -484,3 +513,20 @@ func uintID(i uint32) discover.NodeID {
binary.BigEndian.PutUint32(id[:], i)
return id
}
+
+// implements discoverTable for TestDialResolve
+type resolveMock struct {
+ resolveCalls []discover.NodeID
+ answer *discover.Node
+}
+
+func (t *resolveMock) Resolve(id discover.NodeID) *discover.Node {
+ t.resolveCalls = append(t.resolveCalls, id)
+ return t.answer
+}
+
+func (t *resolveMock) Self() *discover.Node { return new(discover.Node) }
+func (t *resolveMock) Close() {}
+func (t *resolveMock) Bootstrap([]*discover.Node) {}
+func (t *resolveMock) Lookup(discover.NodeID) []*discover.Node { return nil }
+func (t *resolveMock) ReadRandomNodes(buf []*discover.Node) int { return 0 }