diff options
-rw-r--r-- | p2p/dial.go | 143 | ||||
-rw-r--r-- | p2p/dial_test.go | 156 | ||||
-rw-r--r-- | p2p/discover/database_test.go | 16 | ||||
-rw-r--r-- | p2p/discover/node.go | 92 | ||||
-rw-r--r-- | p2p/discover/node_test.go | 88 | ||||
-rw-r--r-- | p2p/discover/table.go | 131 | ||||
-rw-r--r-- | p2p/discover/table_test.go | 6 | ||||
-rw-r--r-- | p2p/discover/udp.go | 12 | ||||
-rw-r--r-- | p2p/discover/udp_test.go | 5 | ||||
-rw-r--r-- | p2p/server.go | 3 |
10 files changed, 444 insertions, 208 deletions
diff --git a/p2p/dial.go b/p2p/dial.go index 0fd3a4cf5..c0e703d7d 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -36,6 +36,10 @@ const ( // Discovery lookups are throttled and can only run // once every few seconds. lookupInterval = 4 * time.Second + + // Endpoint resolution is throttled with bounded backoff. + initialResolveDelay = 60 * time.Second + maxResolveDelay = time.Hour ) // dialstate schedules dials and discovery lookups. @@ -46,19 +50,17 @@ type dialstate struct { ntab discoverTable lookupRunning bool - bootstrapped bool - - dialing map[discover.NodeID]connFlag - lookupBuf []*discover.Node // current discovery lookup results - randomNodes []*discover.Node // filled from Table - static map[discover.NodeID]*discover.Node - hist *dialHistory + dialing map[discover.NodeID]connFlag + lookupBuf []*discover.Node // current discovery lookup results + randomNodes []*discover.Node // filled from Table + static map[discover.NodeID]*dialTask + hist *dialHistory } type discoverTable interface { Self() *discover.Node Close() - Bootstrap([]*discover.Node) + Resolve(target discover.NodeID) *discover.Node Lookup(target discover.NodeID) []*discover.Node ReadRandomNodes([]*discover.Node) int } @@ -76,21 +78,20 @@ type task interface { Do(*Server) } -// A dialTask is generated for each node that is dialed. +// A dialTask is generated for each node that is dialed. Its +// fields cannot be accessed while the task is running. type dialTask struct { - flags connFlag - dest *discover.Node + flags connFlag + dest *discover.Node + lastResolved time.Time + resolveDelay time.Duration } // discoverTask runs discovery table operations. // Only one discoverTask is active at any time. -// -// If bootstrap is true, the task runs Table.Bootstrap, -// otherwise it performs a random lookup and leaves the -// results in the task. +// discoverTask.Do performs a random lookup. type discoverTask struct { - bootstrap bool - results []*discover.Node + results []*discover.Node } // A waitExpireTask is generated if there are no other tasks @@ -103,26 +104,31 @@ func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dial s := &dialstate{ maxDynDials: maxdyn, ntab: ntab, - static: make(map[discover.NodeID]*discover.Node), + static: make(map[discover.NodeID]*dialTask), dialing: make(map[discover.NodeID]connFlag), randomNodes: make([]*discover.Node, maxdyn/2), hist: new(dialHistory), } for _, n := range static { - s.static[n.ID] = n + s.addStatic(n) } return s } func (s *dialstate) addStatic(n *discover.Node) { - s.static[n.ID] = n + // This overwites the task instead of updating an existing + // entry, giving users the opportunity to force a resolve operation. + s.static[n.ID] = &dialTask{flags: staticDialedConn, dest: n} } func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { var newtasks []task + isDialing := func(id discover.NodeID) bool { + _, found := s.dialing[id] + return found || peers[id] != nil || s.hist.contains(id) + } addDial := func(flag connFlag, n *discover.Node) bool { - _, dialing := s.dialing[n.ID] - if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) { + if isDialing(n.ID) { return false } s.dialing[n.ID] = flag @@ -147,14 +153,17 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now s.hist.expire(now) // Create dials for static nodes if they are not connected. - for _, n := range s.static { - addDial(staticDialedConn, n) + for id, t := range s.static { + if !isDialing(id) { + s.dialing[id] = t.flags + newtasks = append(newtasks, t) + } } // Use random nodes from the table for half of the necessary // dynamic dials. randomCandidates := needDynDials / 2 - if randomCandidates > 0 && s.bootstrapped { + if randomCandidates > 0 { n := s.ntab.ReadRandomNodes(s.randomNodes) for i := 0; i < randomCandidates && i < n; i++ { if addDial(dynDialedConn, s.randomNodes[i]) { @@ -171,12 +180,10 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now } } s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])] - // Launch a discovery lookup if more candidates are needed. The - // first discoverTask bootstraps the table and won't return any - // results. + // Launch a discovery lookup if more candidates are needed. if len(s.lookupBuf) < needDynDials && !s.lookupRunning { s.lookupRunning = true - newtasks = append(newtasks, &discoverTask{bootstrap: !s.bootstrapped}) + newtasks = append(newtasks, &discoverTask{}) } // Launch a timer to wait for the next node to expire if all @@ -196,35 +203,79 @@ func (s *dialstate) taskDone(t task, now time.Time) { s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration)) delete(s.dialing, t.dest.ID) case *discoverTask: - if t.bootstrap { - s.bootstrapped = true - } s.lookupRunning = false s.lookupBuf = append(s.lookupBuf, t.results...) } } func (t *dialTask) Do(srv *Server) { - addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)} - glog.V(logger.Debug).Infof("dialing %v\n", t.dest) + if t.dest.Incomplete() { + if !t.resolve(srv) { + return + } + } + success := t.dial(srv, t.dest) + // Try resolving the ID of static nodes if dialing failed. + if !success && t.flags&staticDialedConn != 0 { + if t.resolve(srv) { + t.dial(srv, t.dest) + } + } +} + +// resolve attempts to find the current endpoint for the destination +// using discovery. +// +// Resolve operations are throttled with backoff to avoid flooding the +// discovery network with useless queries for nodes that don't exist. +// The backoff delay resets when the node is found. +func (t *dialTask) resolve(srv *Server) bool { + if srv.ntab == nil { + glog.V(logger.Debug).Infof("can't resolve node %x: discovery is disabled", t.dest.ID[:6]) + return false + } + if t.resolveDelay == 0 { + t.resolveDelay = initialResolveDelay + } + if time.Since(t.lastResolved) < t.resolveDelay { + return false + } + resolved := srv.ntab.Resolve(t.dest.ID) + t.lastResolved = time.Now() + if resolved == nil { + t.resolveDelay *= 2 + if t.resolveDelay > maxResolveDelay { + t.resolveDelay = maxResolveDelay + } + glog.V(logger.Debug).Infof("resolving node %x failed (new delay: %v)", t.dest.ID[:6], t.resolveDelay) + return false + } + // The node was found. + t.resolveDelay = initialResolveDelay + t.dest = resolved + glog.V(logger.Debug).Infof("resolved node %x: %v:%d", t.dest.ID[:6], t.dest.IP, t.dest.TCP) + return true +} + +// dial performs the actual connection attempt. +func (t *dialTask) dial(srv *Server, dest *discover.Node) bool { + addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)} + glog.V(logger.Debug).Infof("dial tcp %v (%x)\n", addr, dest.ID[:6]) fd, err := srv.Dialer.Dial("tcp", addr.String()) if err != nil { - glog.V(logger.Detail).Infof("dial error: %v", err) - return + glog.V(logger.Detail).Infof("%v", err) + return false } mfd := newMeteredConn(fd, false) - - srv.setupConn(mfd, t.flags, t.dest) + srv.setupConn(mfd, t.flags, dest) + return true } + func (t *dialTask) String() string { return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP) } func (t *discoverTask) Do(srv *Server) { - if t.bootstrap { - srv.ntab.Bootstrap(srv.BootstrapNodes) - return - } // newTasks generates a lookup task whenever dynamic dials are // necessary. Lookups need to take some time, otherwise the // event loop spins too fast. @@ -238,12 +289,8 @@ func (t *discoverTask) Do(srv *Server) { t.results = srv.ntab.Lookup(target) } -func (t *discoverTask) String() (s string) { - if t.bootstrap { - s = "discovery bootstrap" - } else { - s = "discovery lookup" - } +func (t *discoverTask) String() string { + s := "discovery lookup" if len(t.results) > 0 { s += fmt.Sprintf(" (%d results)", len(t.results)) } diff --git a/p2p/dial_test.go b/p2p/dial_test.go index d24e03e29..3447660a3 100644 --- a/p2p/dial_test.go +++ b/p2p/dial_test.go @@ -18,6 +18,7 @@ package p2p import ( "encoding/binary" + "net" "reflect" "testing" "time" @@ -76,15 +77,11 @@ func runDialTest(t *testing.T, test dialtest) { type fakeTable []*discover.Node -func (t fakeTable) Self() *discover.Node { return new(discover.Node) } -func (t fakeTable) Close() {} -func (t fakeTable) Bootstrap([]*discover.Node) {} -func (t fakeTable) Lookup(target discover.NodeID) []*discover.Node { - return nil -} -func (t fakeTable) ReadRandomNodes(buf []*discover.Node) int { - return copy(buf, t) -} +func (t fakeTable) Self() *discover.Node { return new(discover.Node) } +func (t fakeTable) Close() {} +func (t fakeTable) Lookup(discover.NodeID) []*discover.Node { return nil } +func (t fakeTable) Resolve(discover.NodeID) *discover.Node { return nil } +func (t fakeTable) ReadRandomNodes(buf []*discover.Node) int { return copy(buf, t) } // This test checks that dynamic dials are launched from discovery results. func TestDialStateDynDial(t *testing.T) { @@ -98,7 +95,7 @@ func TestDialStateDynDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(1)}}, {rw: &conn{flags: dynDialedConn, id: uintID(2)}}, }, - new: []task{&discoverTask{bootstrap: true}}, + new: []task{&discoverTask{}}, }, // Dynamic dials are launched when it completes. { @@ -108,7 +105,7 @@ func TestDialStateDynDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(2)}}, }, done: []task{ - &discoverTask{bootstrap: true, results: []*discover.Node{ + &discoverTask{results: []*discover.Node{ {ID: uintID(2)}, // this one is already connected and not dialed. {ID: uintID(3)}, {ID: uintID(4)}, @@ -118,9 +115,9 @@ func TestDialStateDynDial(t *testing.T) { }}, }, new: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(3)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(4)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(5)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}}, }, }, // Some of the dials complete but no new ones are launched yet because @@ -134,8 +131,8 @@ func TestDialStateDynDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(4)}}, }, done: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(3)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(4)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}}, }, }, // No new dial tasks are launched in the this round because @@ -150,7 +147,7 @@ func TestDialStateDynDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(5)}}, }, done: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(5)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}}, }, new: []task{ &waitExpireTask{Duration: 14 * time.Second}, @@ -167,7 +164,7 @@ func TestDialStateDynDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(5)}}, }, new: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(6)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(6)}}, }, }, // More peers (3,4) drop off and dial for ID 6 completes. @@ -180,10 +177,10 @@ func TestDialStateDynDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(5)}}, }, done: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(6)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(6)}}, }, new: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(7)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(7)}}, &discoverTask{}, }, }, @@ -198,7 +195,7 @@ func TestDialStateDynDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(7)}}, }, done: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(7)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(7)}}, }, }, // Finish the running node discovery with an empty set. A new lookup @@ -238,22 +235,15 @@ func TestDialStateDynDialFromTable(t *testing.T) { runDialTest(t, dialtest{ init: newDialState(nil, table, 10), rounds: []round{ - // Discovery bootstrap is launched. - { - new: []task{&discoverTask{bootstrap: true}}, - }, // 5 out of 8 of the nodes returned by ReadRandomNodes are dialed. { - done: []task{ - &discoverTask{bootstrap: true}, - }, new: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(1)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(2)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(3)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(4)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(5)}}, - &discoverTask{bootstrap: false}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(1)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(2)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}}, + &discoverTask{}, }, }, // Dialing nodes 1,2 succeeds. Dials from the lookup are launched. @@ -263,8 +253,8 @@ func TestDialStateDynDialFromTable(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(2)}}, }, done: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(1)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(2)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(1)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(2)}}, &discoverTask{results: []*discover.Node{ {ID: uintID(10)}, {ID: uintID(11)}, @@ -272,10 +262,10 @@ func TestDialStateDynDialFromTable(t *testing.T) { }}, }, new: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(10)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(11)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(12)}}, - &discoverTask{bootstrap: false}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(10)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(11)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(12)}}, + &discoverTask{}, }, }, // Dialing nodes 3,4,5 fails. The dials from the lookup succeed. @@ -288,12 +278,12 @@ func TestDialStateDynDialFromTable(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(12)}}, }, done: []task{ - &dialTask{dynDialedConn, &discover.Node{ID: uintID(3)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(4)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(5)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(10)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(11)}}, - &dialTask{dynDialedConn, &discover.Node{ID: uintID(12)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(10)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(11)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(12)}}, }, }, // Waiting for expiry. No waitExpireTask is launched because the @@ -344,9 +334,9 @@ func TestDialStateStaticDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(2)}}, }, new: []task{ - &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}}, - &dialTask{staticDialedConn, &discover.Node{ID: uintID(4)}}, - &dialTask{staticDialedConn, &discover.Node{ID: uintID(5)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}}, }, }, // No new tasks are launched in this round because all static @@ -358,7 +348,7 @@ func TestDialStateStaticDial(t *testing.T) { {rw: &conn{flags: staticDialedConn, id: uintID(3)}}, }, done: []task{ - &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}}, }, }, // No new dial tasks are launched because all static @@ -372,8 +362,8 @@ func TestDialStateStaticDial(t *testing.T) { {rw: &conn{flags: staticDialedConn, id: uintID(5)}}, }, done: []task{ - &dialTask{staticDialedConn, &discover.Node{ID: uintID(4)}}, - &dialTask{staticDialedConn, &discover.Node{ID: uintID(5)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}}, }, new: []task{ &waitExpireTask{Duration: 14 * time.Second}, @@ -398,8 +388,8 @@ func TestDialStateStaticDial(t *testing.T) { {rw: &conn{flags: staticDialedConn, id: uintID(5)}}, }, new: []task{ - &dialTask{staticDialedConn, &discover.Node{ID: uintID(2)}}, - &dialTask{staticDialedConn, &discover.Node{ID: uintID(4)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}}, }, }, }, @@ -422,9 +412,9 @@ func TestDialStateCache(t *testing.T) { { peers: nil, new: []task{ - &dialTask{staticDialedConn, &discover.Node{ID: uintID(1)}}, - &dialTask{staticDialedConn, &discover.Node{ID: uintID(2)}}, - &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}}, }, }, // No new tasks are launched in this round because all static @@ -435,8 +425,8 @@ func TestDialStateCache(t *testing.T) { {rw: &conn{flags: staticDialedConn, id: uintID(2)}}, }, done: []task{ - &dialTask{staticDialedConn, &discover.Node{ID: uintID(1)}}, - &dialTask{staticDialedConn, &discover.Node{ID: uintID(2)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}}, }, }, // A salvage task is launched to wait for node 3's history @@ -447,7 +437,7 @@ func TestDialStateCache(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(2)}}, }, done: []task{ - &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}}, }, new: []task{ &waitExpireTask{Duration: 14 * time.Second}, @@ -467,13 +457,40 @@ func TestDialStateCache(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(2)}}, }, new: []task{ - &dialTask{staticDialedConn, &discover.Node{ID: uintID(3)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}}, }, }, }, }) } +func TestDialResolve(t *testing.T) { + resolved := discover.NewNode(uintID(1), net.IP{127, 0, 55, 234}, 3333, 4444) + table := &resolveMock{answer: resolved} + state := newDialState(nil, table, 0) + + // Check that the task is generated with an incomplete ID. + dest := discover.NewNode(uintID(1), nil, 0, 0) + state.addStatic(dest) + tasks := state.newTasks(0, nil, time.Time{}) + if !reflect.DeepEqual(tasks, []task{&dialTask{flags: staticDialedConn, dest: dest}}) { + t.Fatalf("expected dial task, got %#v", tasks) + } + + // Now run the task, it should resolve the ID once. + srv := &Server{ntab: table, Dialer: &net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}} + tasks[0].Do(srv) + if !reflect.DeepEqual(table.resolveCalls, []discover.NodeID{dest.ID}) { + t.Fatalf("wrong resolve calls, got %v", table.resolveCalls) + } + + // Report it as done to the dialer, which should update the static node record. + state.taskDone(tasks[0], time.Now()) + if state.static[uintID(1)].dest != resolved { + t.Fatalf("state.dest not updated") + } +} + // compares task lists but doesn't care about the order. func sametasks(a, b []task) bool { if len(a) != len(b) { @@ -496,3 +513,20 @@ func uintID(i uint32) discover.NodeID { binary.BigEndian.PutUint32(id[:], i) return id } + +// implements discoverTable for TestDialResolve +type resolveMock struct { + resolveCalls []discover.NodeID + answer *discover.Node +} + +func (t *resolveMock) Resolve(id discover.NodeID) *discover.Node { + t.resolveCalls = append(t.resolveCalls, id) + return t.answer +} + +func (t *resolveMock) Self() *discover.Node { return new(discover.Node) } +func (t *resolveMock) Close() {} +func (t *resolveMock) Bootstrap([]*discover.Node) {} +func (t *resolveMock) Lookup(discover.NodeID) []*discover.Node { return nil } +func (t *resolveMock) ReadRandomNodes(buf []*discover.Node) int { return 0 } diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go index 80c1a6ff2..5a729f02b 100644 --- a/p2p/discover/database_test.go +++ b/p2p/discover/database_test.go @@ -102,7 +102,7 @@ func TestNodeDBInt64(t *testing.T) { } func TestNodeDBFetchStore(t *testing.T) { - node := newNode( + node := NewNode( MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{192, 168, 0, 1}, 30303, @@ -165,7 +165,7 @@ var nodeDBSeedQueryNodes = []struct { // This one should not be in the result set because its last // pong time is too far in the past. { - node: newNode( + node: NewNode( MustHexID("0x84d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 3}, 30303, @@ -176,7 +176,7 @@ var nodeDBSeedQueryNodes = []struct { // This one shouldn't be in in the result set because its // nodeID is the local node's ID. { - node: newNode( + node: NewNode( MustHexID("0x57d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 3}, 30303, @@ -187,7 +187,7 @@ var nodeDBSeedQueryNodes = []struct { // These should be in the result set. { - node: newNode( + node: NewNode( MustHexID("0x22d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 1}, 30303, @@ -196,7 +196,7 @@ var nodeDBSeedQueryNodes = []struct { pong: time.Now().Add(-2 * time.Second), }, { - node: newNode( + node: NewNode( MustHexID("0x44d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 2}, 30303, @@ -205,7 +205,7 @@ var nodeDBSeedQueryNodes = []struct { pong: time.Now().Add(-3 * time.Second), }, { - node: newNode( + node: NewNode( MustHexID("0xe2d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 3}, 30303, @@ -303,7 +303,7 @@ var nodeDBExpirationNodes = []struct { exp bool }{ { - node: newNode( + node: NewNode( MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 1}, 30303, @@ -312,7 +312,7 @@ var nodeDBExpirationNodes = []struct { pong: time.Now().Add(-nodeDBNodeExpiration + time.Minute), exp: false, }, { - node: newNode( + node: NewNode( MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 2}, 30303, diff --git a/p2p/discover/node.go b/p2p/discover/node.go index dd19df3a2..c4a3b5011 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -26,6 +26,7 @@ import ( "math/rand" "net" "net/url" + "regexp" "strconv" "strings" @@ -37,6 +38,7 @@ import ( const nodeIDBits = 512 // Node represents a host on the network. +// The fields of Node may not be modified. type Node struct { IP net.IP // len 4 for IPv4 or 16 for IPv6 UDP, TCP uint16 // port numbers @@ -54,7 +56,9 @@ type Node struct { contested bool } -func newNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node { +// NewNode creates a new node. It is mostly meant to be used for +// testing purposes. +func NewNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node { if ipv4 := ip.To4(); ipv4 != nil { ip = ipv4 } @@ -71,31 +75,65 @@ func (n *Node) addr() *net.UDPAddr { return &net.UDPAddr{IP: n.IP, Port: int(n.UDP)} } +// Incomplete returns true for nodes with no IP address. +func (n *Node) Incomplete() bool { + return n.IP == nil +} + +// checks whether n is a valid complete node. +func (n *Node) validateComplete() error { + if n.Incomplete() { + return errors.New("incomplete node") + } + if n.UDP == 0 { + return errors.New("missing UDP port") + } + if n.TCP == 0 { + return errors.New("missing TCP port") + } + if n.IP.IsMulticast() || n.IP.IsUnspecified() { + return errors.New("invalid IP (multicast/unspecified)") + } + _, err := n.ID.Pubkey() // validate the key (on curve, etc.) + return err +} + // The string representation of a Node is a URL. // Please see ParseNode for a description of the format. func (n *Node) String() string { - addr := net.TCPAddr{IP: n.IP, Port: int(n.TCP)} - u := url.URL{ - Scheme: "enode", - User: url.User(fmt.Sprintf("%x", n.ID[:])), - Host: addr.String(), - } - if n.UDP != n.TCP { - u.RawQuery = "discport=" + strconv.Itoa(int(n.UDP)) + u := url.URL{Scheme: "enode"} + if n.Incomplete() { + u.Host = fmt.Sprintf("%x", n.ID[:]) + } else { + addr := net.TCPAddr{IP: n.IP, Port: int(n.TCP)} + u.User = url.User(fmt.Sprintf("%x", n.ID[:])) + u.Host = addr.String() + if n.UDP != n.TCP { + u.RawQuery = "discport=" + strconv.Itoa(int(n.UDP)) + } } return u.String() } -// ParseNode parses a node URL. +var incompleteNodeURL = regexp.MustCompile("(?i)^(?:enode://)?([0-9a-f]+)$") + +// ParseNode parses a node designator. +// +// There are two basic forms of node designators +// - incomplete nodes, which only have the public key (node ID) +// - complete nodes, which contain the public key and IP/Port information +// +// For incomplete nodes, the designator must look like one of these // -// A node URL has scheme "enode". +// enode://<hex node id> +// <hex node id> // -// The hexadecimal node ID is encoded in the username portion of the -// URL, separated from the host by an @ sign. The hostname can only be -// given as an IP address, DNS domain names are not allowed. The port -// in the host name section is the TCP listening port. If the TCP and -// UDP (discovery) ports differ, the UDP port is specified as query -// parameter "discport". +// For complete nodes, the node ID is encoded in the username portion +// of the URL, separated from the host by an @ sign. The hostname can +// only be given as an IP address, DNS domain names are not allowed. +// The port in the host name section is the TCP listening port. If the +// TCP and UDP (discovery) ports differ, the UDP port is specified as +// query parameter "discport". // // In the following example, the node URL describes // a node with IP address 10.3.58.6, TCP listening port 30303 @@ -103,12 +141,26 @@ func (n *Node) String() string { // // enode://<hex node id>@10.3.58.6:30303?discport=30301 func ParseNode(rawurl string) (*Node, error) { + if m := incompleteNodeURL.FindStringSubmatch(rawurl); m != nil { + id, err := HexID(m[1]) + if err != nil { + return nil, fmt.Errorf("invalid node ID (%v)", err) + } + return NewNode(id, nil, 0, 0), nil + } + return parseComplete(rawurl) +} + +func parseComplete(rawurl string) (*Node, error) { var ( id NodeID ip net.IP tcpPort, udpPort uint64 ) u, err := url.Parse(rawurl) + if err != nil { + return nil, err + } if u.Scheme != "enode" { return nil, errors.New("invalid URL scheme, want \"enode\"") } @@ -143,7 +195,7 @@ func ParseNode(rawurl string) (*Node, error) { return nil, errors.New("invalid discport in query") } } - return newNode(id, ip, uint16(udpPort), uint16(tcpPort)), nil + return NewNode(id, ip, uint16(udpPort), uint16(tcpPort)), nil } // MustParseNode parses a node URL. It panics if the URL is not valid. @@ -180,7 +232,7 @@ func HexID(in string) (NodeID, error) { if err != nil { return id, err } else if len(b) != len(id) { - return id, fmt.Errorf("wrong length, need %d hex bytes", len(id)) + return id, fmt.Errorf("wrong length, want %d hex chars", len(id)*2) } copy(id[:], b) return id, nil @@ -215,7 +267,7 @@ func (id NodeID) Pubkey() (*ecdsa.PublicKey, error) { p.X.SetBytes(id[:half]) p.Y.SetBytes(id[half:]) if !p.Curve.IsOnCurve(p.X, p.Y) { - return nil, errors.New("not a point on the S256 curve") + return nil, errors.New("id is invalid secp256k1 curve point") } return p, nil } diff --git a/p2p/discover/node_test.go b/p2p/discover/node_test.go index e523e12d2..3d1662d0b 100644 --- a/p2p/discover/node_test.go +++ b/p2p/discover/node_test.go @@ -17,10 +17,12 @@ package discover import ( + "fmt" "math/big" "math/rand" "net" "reflect" + "strings" "testing" "testing/quick" "time" @@ -29,6 +31,27 @@ import ( "github.com/ethereum/go-ethereum/crypto" ) +func ExampleNewNode() { + id := MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + + // Complete nodes contain UDP and TCP endpoints: + n1 := NewNode(id, net.ParseIP("2001:db8:3c4d:15::abcd:ef12"), 52150, 30303) + fmt.Println("n1:", n1) + fmt.Println("n1.Incomplete() ->", n1.Incomplete()) + + // An incomplete node can be created by passing zero values + // for all parameters except id. + n2 := NewNode(id, nil, 0, 0) + fmt.Println("n2:", n2) + fmt.Println("n2.Incomplete() ->", n2.Incomplete()) + + // Output: + // n1: enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@[2001:db8:3c4d:15::abcd:ef12]:30303?discport=52150 + // n1.Incomplete() -> false + // n2: enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439 + // n2.Incomplete() -> true +} + var parseNodeTests = []struct { rawurl string wantError string @@ -39,13 +62,10 @@ var parseNodeTests = []struct { wantError: `invalid URL scheme, want "enode"`, }, { - rawurl: "enode://foobar", - wantError: `does not contain node ID`, - }, - { rawurl: "enode://01010101@123.124.125.126:3", - wantError: `invalid node ID (wrong length, need 64 hex bytes)`, + wantError: `invalid node ID (wrong length, want 128 hex chars)`, }, + // Complete nodes with IP address. { rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@hostname:3", wantError: `invalid IP address`, @@ -60,7 +80,7 @@ var parseNodeTests = []struct { }, { rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:52150", - wantResult: newNode( + wantResult: NewNode( MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{0x7f, 0x0, 0x0, 0x1}, 52150, @@ -69,7 +89,7 @@ var parseNodeTests = []struct { }, { rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@[::]:52150", - wantResult: newNode( + wantResult: NewNode( MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.ParseIP("::"), 52150, @@ -78,7 +98,7 @@ var parseNodeTests = []struct { }, { rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@[2001:db8:3c4d:15::abcd:ef12]:52150", - wantResult: newNode( + wantResult: NewNode( MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.ParseIP("2001:db8:3c4d:15::abcd:ef12"), 52150, @@ -87,33 +107,62 @@ var parseNodeTests = []struct { }, { rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:52150?discport=22334", - wantResult: newNode( + wantResult: NewNode( MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{0x7f, 0x0, 0x0, 0x1}, 22334, 52150, ), }, + // Incomplete nodes with no address. + { + rawurl: "1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439", + wantResult: NewNode( + MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + nil, 0, 0, + ), + }, + { + rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439", + wantResult: NewNode( + MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + nil, 0, 0, + ), + }, + // Invalid URLs + { + rawurl: "01010101", + wantError: `invalid node ID (wrong length, want 128 hex chars)`, + }, + { + rawurl: "enode://01010101", + wantError: `invalid node ID (wrong length, want 128 hex chars)`, + }, + { + // This test checks that errors from url.Parse are handled. + rawurl: "://foo", + wantError: `parse ://foo: missing protocol scheme`, + }, } func TestParseNode(t *testing.T) { - for i, test := range parseNodeTests { + for _, test := range parseNodeTests { n, err := ParseNode(test.rawurl) if test.wantError != "" { if err == nil { - t.Errorf("test %d: got nil error, expected %#q", i, test.wantError) + t.Errorf("test %q:\n got nil error, expected %#q", test.rawurl, test.wantError) continue } else if err.Error() != test.wantError { - t.Errorf("test %d: got error %#q, expected %#q", i, err.Error(), test.wantError) + t.Errorf("test %q:\n got error %#q, expected %#q", test.rawurl, err.Error(), test.wantError) continue } } else { if err != nil { - t.Errorf("test %d: unexpected error: %v", i, err) + t.Errorf("test %q:\n unexpected error: %v", test.rawurl, err) continue } if !reflect.DeepEqual(n, test.wantResult) { - t.Errorf("test %d: result mismatch:\ngot: %#v, want: %#v", i, n, test.wantResult) + t.Errorf("test %q:\n result mismatch:\ngot: %#v, want: %#v", test.rawurl, n, test.wantResult) } } } @@ -121,12 +170,11 @@ func TestParseNode(t *testing.T) { func TestNodeString(t *testing.T) { for i, test := range parseNodeTests { - if test.wantError != "" { - continue - } - str := test.wantResult.String() - if str != test.rawurl { - t.Errorf("test %d: Node.String() mismatch:\ngot: %s\nwant: %s", i, str, test.rawurl) + if test.wantError == "" && strings.HasPrefix(test.rawurl, "enode://") { + str := test.wantResult.String() + if str != test.rawurl { + t.Errorf("test %d: Node.String() mismatch:\ngot: %s\nwant: %s", i, str, test.rawurl) + } } } } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 298ba3fa6..abb7980f8 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -25,6 +25,7 @@ package discover import ( "crypto/rand" "encoding/binary" + "fmt" "net" "sort" "sync" @@ -56,7 +57,7 @@ type Table struct { nursery []*Node // bootstrap nodes db *nodeDB // database of known nodes - refreshReq chan struct{} + refreshReq chan chan struct{} closeReq chan struct{} closed chan struct{} @@ -99,10 +100,10 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string tab := &Table{ net: t, db: db, - self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), + self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), bonding: make(map[NodeID]*bondproc), bondslots: make(chan struct{}, maxBondingPingPongs), - refreshReq: make(chan struct{}), + refreshReq: make(chan chan struct{}), closeReq: make(chan struct{}), closed: make(chan struct{}), } @@ -179,21 +180,49 @@ func (tab *Table) Close() { } } -// Bootstrap sets the bootstrap nodes. These nodes are used to connect -// to the network if the table is empty. Bootstrap will also attempt to -// fill the table by performing random lookup operations on the -// network. -func (tab *Table) Bootstrap(nodes []*Node) { +// SetFallbackNodes sets the initial points of contact. These nodes +// are used to connect to the network if the table is empty and there +// are no known nodes in the database. +func (tab *Table) SetFallbackNodes(nodes []*Node) error { + for _, n := range nodes { + if err := n.validateComplete(); err != nil { + return fmt.Errorf("bad bootstrap/fallback node %q (%v)", n, err) + } + } tab.mutex.Lock() - // TODO: maybe filter nodes with bad fields (nil, etc.) to avoid strange crashes tab.nursery = make([]*Node, 0, len(nodes)) for _, n := range nodes { cpy := *n + // Recompute cpy.sha because the node might not have been + // created by NewNode or ParseNode. cpy.sha = crypto.Sha3Hash(n.ID[:]) tab.nursery = append(tab.nursery, &cpy) } tab.mutex.Unlock() - tab.requestRefresh() + tab.refresh() + return nil +} + +// Resolve searches for a specific node with the given ID. +// It returns nil if the node could not be found. +func (tab *Table) Resolve(targetID NodeID) *Node { + // If the node is present in the local table, no + // network interaction is required. + hash := crypto.Sha3Hash(targetID[:]) + tab.mutex.Lock() + cl := tab.closest(hash, 1) + tab.mutex.Unlock() + if len(cl.entries) > 0 && cl.entries[0].ID == targetID { + return cl.entries[0] + } + // Otherwise, do a network lookup. + result := tab.Lookup(targetID) + for _, n := range result { + if n.ID == targetID { + return n + } + } + return nil } // Lookup performs a network search for nodes close @@ -202,26 +231,36 @@ func (tab *Table) Bootstrap(nodes []*Node) { // The given target does not need to be an actual node // identifier. func (tab *Table) Lookup(targetID NodeID) []*Node { + return tab.lookup(targetID, true) +} + +func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node { var ( target = crypto.Sha3Hash(targetID[:]) asked = make(map[NodeID]bool) seen = make(map[NodeID]bool) reply = make(chan []*Node, alpha) pendingQueries = 0 + result *nodesByDistance ) // don't query further if we hit ourself. // unlikely to happen often in practice. asked[tab.self.ID] = true - tab.mutex.Lock() - // generate initial result set - result := tab.closest(target, bucketSize) - tab.mutex.Unlock() - - // If the result set is empty, all nodes were dropped, refresh. - if len(result.entries) == 0 { - tab.requestRefresh() - return nil + for { + tab.mutex.Lock() + // generate initial result set + result = tab.closest(target, bucketSize) + tab.mutex.Unlock() + if len(result.entries) > 0 || !refreshIfEmpty { + break + } + // The result set is empty, all nodes were dropped, refresh. + // We actually wait for the refresh to complete here. The very + // first query will hit this case and run the bootstrapping + // logic. + <-tab.refresh() + refreshIfEmpty = false } for { @@ -265,24 +304,24 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { return result.entries } -func (tab *Table) requestRefresh() { +func (tab *Table) refresh() <-chan struct{} { + done := make(chan struct{}) select { - case tab.refreshReq <- struct{}{}: + case tab.refreshReq <- done: case <-tab.closed: + close(done) } + return done } +// refreshLoop schedules doRefresh runs and coordinates shutdown. func (tab *Table) refreshLoop() { - defer func() { - tab.db.close() - if tab.net != nil { - tab.net.close() - } - close(tab.closed) - }() - - timer := time.NewTicker(autoRefreshInterval) - var done chan struct{} + var ( + timer = time.NewTicker(autoRefreshInterval) + waiting []chan struct{} // accumulates waiting callers while doRefresh runs + done chan struct{} // where doRefresh reports completion + ) +loop: for { select { case <-timer.C: @@ -290,20 +329,34 @@ func (tab *Table) refreshLoop() { done = make(chan struct{}) go tab.doRefresh(done) } - case <-tab.refreshReq: + case req := <-tab.refreshReq: + waiting = append(waiting, req) if done == nil { done = make(chan struct{}) go tab.doRefresh(done) } case <-done: + for _, ch := range waiting { + close(ch) + } + waiting = nil done = nil case <-tab.closeReq: - if done != nil { - <-done - } - return + break loop } } + + if tab.net != nil { + tab.net.close() + } + if done != nil { + <-done + } + for _, ch := range waiting { + close(ch) + } + tab.db.close() + close(tab.closed) } // doRefresh performs a lookup for a random target to keep buckets @@ -320,7 +373,7 @@ func (tab *Table) doRefresh(done chan struct{}) { // We perform a lookup with a random target instead. var target NodeID rand.Read(target[:]) - result := tab.Lookup(target) + result := tab.lookup(target, false) if len(result) > 0 { return } @@ -344,7 +397,7 @@ func (tab *Table) doRefresh(done chan struct{}) { tab.mutex.Unlock() // Finally, do a self lookup to fill up the buckets. - tab.Lookup(tab.self.ID) + tab.lookup(tab.self.ID, false) } // closest returns the n nodes in the table that are closest to the @@ -466,7 +519,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd tab.net.waitping(id) } // Bonding succeeded, update the node database. - w.n = newNode(id, addr.IP, uint16(addr.Port), tcpPort) + w.n = NewNode(id, addr.IP, uint16(addr.Port), tcpPort) tab.db.updateNode(w.n) close(w.done) } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 13effaed6..30a418f44 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -36,7 +36,7 @@ func TestTable_pingReplace(t *testing.T) { transport := newPingRecorder() tab, _ := newTable(transport, NodeID{}, &net.UDPAddr{}, "") defer tab.Close() - pingSender := newNode(MustHexID("a502af0f59b2aab7746995408c79e9ca312d2793cc997e44fc55eda62f0150bbb8c59a6f9269ba3a081518b62699ee807c7c19c20125ddfccca872608af9e370"), net.IP{}, 99, 99) + pingSender := NewNode(MustHexID("a502af0f59b2aab7746995408c79e9ca312d2793cc997e44fc55eda62f0150bbb8c59a6f9269ba3a081518b62699ee807c7c19c20125ddfccca872608af9e370"), net.IP{}, 99, 99) // fill up the sender's bucket. last := fillBucket(tab, 253) @@ -287,7 +287,7 @@ func TestTable_Lookup(t *testing.T) { t.Fatalf("lookup on empty table returned %d results: %#v", len(results), results) } // seed table with initial node (otherwise lookup will terminate immediately) - seed := newNode(lookupTestnet.dists[256][0], net.IP{}, 256, 0) + seed := NewNode(lookupTestnet.dists[256][0], net.IP{}, 256, 0) tab.stuff([]*Node{seed}) results := tab.Lookup(lookupTestnet.target) @@ -517,7 +517,7 @@ func (tn *preminedTestnet) findnode(toid NodeID, toaddr *net.UDPAddr, target Nod next := uint16(toaddr.Port) - 1 var result []*Node for i, id := range tn.dists[toaddr.Port] { - result = append(result, newNode(id, net.ParseIP("127.0.0.1"), next, uint16(i))) + result = append(result, NewNode(id, net.ParseIP("127.0.0.1"), next, uint16(i))) } return result, nil } diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index fc7fa737c..72b2a45e5 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -114,13 +114,11 @@ func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint { return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort} } -func nodeFromRPC(rn rpcNode) (n *Node, valid bool) { +func nodeFromRPC(rn rpcNode) (*Node, error) { // TODO: don't accept localhost, LAN addresses from internet hosts - // TODO: check public key is on secp256k1 curve - if rn.IP.IsMulticast() || rn.IP.IsUnspecified() || rn.UDP == 0 { - return nil, false - } - return newNode(rn.ID, rn.IP, rn.UDP, rn.TCP), true + n := NewNode(rn.ID, rn.IP, rn.UDP, rn.TCP) + err := n.validateComplete() + return n, err } func nodeToRPC(n *Node) rpcNode { @@ -271,7 +269,7 @@ func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node reply := r.(*neighbors) for _, rn := range reply.Nodes { nreceived++ - if n, valid := nodeFromRPC(rn); valid { + if n, err := nodeFromRPC(rn); err == nil { nodes = append(nodes, n) } } diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go index 944e73d6e..8ed12b8ec 100644 --- a/p2p/discover/udp_test.go +++ b/p2p/discover/udp_test.go @@ -167,16 +167,17 @@ func TestUDP_responseTimeouts(t *testing.T) { binary.BigEndian.PutUint64(p.from[:], uint64(i)) if p.ptype <= 128 { p.errc = timeoutErr + test.udp.addpending <- p nTimeouts++ } else { p.errc = nilErr + test.udp.addpending <- p time.AfterFunc(randomDuration(60*time.Millisecond), func() { if !test.udp.handleReply(p.from, p.ptype, nil) { t.Logf("not matched: %v", p) } }) } - test.udp.addpending <- p time.Sleep(randomDuration(30 * time.Millisecond)) } @@ -243,7 +244,7 @@ func TestUDP_findnode(t *testing.T) { // ensure there's a bond with the test node, // findnode won't be accepted otherwise. - test.table.db.updateNode(newNode( + test.table.db.updateNode(NewNode( PubkeyID(&test.remotekey.PublicKey), test.remoteaddr.IP, uint16(test.remoteaddr.Port), diff --git a/p2p/server.go b/p2p/server.go index 7991585f1..52d1be677 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -334,6 +334,9 @@ func (srv *Server) Start() (err error) { if err != nil { return err } + if err := ntab.SetFallbackNodes(srv.BootstrapNodes); err != nil { + return err + } srv.ntab = ntab } |