aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover/table.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/discover/table.go')
-rw-r--r--p2p/discover/table.go84
1 files changed, 39 insertions, 45 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 9f7f1d41b..ba4c06327 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -75,8 +75,10 @@ type Table struct {
net transport
refreshReq chan chan struct{}
initDone chan struct{}
- closeReq chan struct{}
- closed chan struct{}
+
+ closeOnce sync.Once
+ closeReq chan struct{}
+ closed chan struct{}
nodeAddedHook func(*node) // for testing
}
@@ -180,16 +182,14 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {
// Close terminates the network listener and flushes the node database.
func (tab *Table) Close() {
- if tab.net != nil {
- tab.net.close()
- }
-
- select {
- case <-tab.closed:
- // already closed.
- case tab.closeReq <- struct{}{}:
- <-tab.closed // wait for refreshLoop to end.
- }
+ tab.closeOnce.Do(func() {
+ if tab.net != nil {
+ tab.net.close()
+ }
+ // Wait for loop to end.
+ close(tab.closeReq)
+ <-tab.closed
+ })
}
// setFallbackNodes sets the initial points of contact. These nodes
@@ -290,12 +290,16 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
// we have asked all closest nodes, stop the search
break
}
- // wait for the next reply
- for _, n := range <-reply {
- if n != nil && !seen[n.ID()] {
- seen[n.ID()] = true
- result.push(n, bucketSize)
+ select {
+ case nodes := <-reply:
+ for _, n := range nodes {
+ if n != nil && !seen[n.ID()] {
+ seen[n.ID()] = true
+ result.push(n, bucketSize)
+ }
}
+ case <-tab.closeReq:
+ return nil // shutdown, no need to continue.
}
pendingQueries--
}
@@ -303,18 +307,22 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
}
func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
- fails := tab.db.FindFails(n.ID())
+ fails := tab.db.FindFails(n.ID(), n.IP())
r, err := tab.net.findnode(n.ID(), n.addr(), targetKey)
- if err != nil || len(r) == 0 {
+ if err == errClosed {
+ // Avoid recording failures on shutdown.
+ reply <- nil
+ return
+ } else if err != nil || len(r) == 0 {
fails++
- tab.db.UpdateFindFails(n.ID(), fails)
+ tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
tab.delete(n)
}
} else if fails > 0 {
- tab.db.UpdateFindFails(n.ID(), fails-1)
+ tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1)
}
// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
@@ -329,7 +337,7 @@ func (tab *Table) refresh() <-chan struct{} {
done := make(chan struct{})
select {
case tab.refreshReq <- done:
- case <-tab.closed:
+ case <-tab.closeReq:
close(done)
}
return done
@@ -433,7 +441,7 @@ func (tab *Table) loadSeedNodes() {
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
- age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }}
+ age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.add(seed)
}
@@ -458,16 +466,17 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
b := tab.buckets[bi]
if err == nil {
// The node responded, move it to the front.
- log.Debug("Revalidated node", "b", bi, "id", last.ID())
+ last.livenessChecks++
+ log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks)
b.bump(last)
return
}
// No reply received, pick a replacement or delete the node if there aren't
// any replacements.
if r := tab.replace(b, last); r != nil {
- log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "r", r.ID(), "rip", r.IP())
+ log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP())
} else {
- log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP())
+ log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks)
}
}
@@ -502,7 +511,7 @@ func (tab *Table) copyLiveNodes() {
now := time.Now()
for _, b := range &tab.buckets {
for _, n := range b.entries {
- if now.Sub(n.addedAt) >= seedMinTableTime {
+ if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime {
tab.db.UpdateNode(unwrapNode(n))
}
}
@@ -518,7 +527,9 @@ func (tab *Table) closest(target enode.ID, nresults int) *nodesByDistance {
close := &nodesByDistance{target: target}
for _, b := range &tab.buckets {
for _, n := range b.entries {
- close.push(n, nresults)
+ if n.livenessChecks > 0 {
+ close.push(n, nresults)
+ }
}
}
return close
@@ -572,23 +583,6 @@ func (tab *Table) addThroughPing(n *node) {
tab.add(n)
}
-// stuff adds nodes the table to the end of their corresponding bucket
-// if the bucket is not full. The caller must not hold tab.mutex.
-func (tab *Table) stuff(nodes []*node) {
- tab.mutex.Lock()
- defer tab.mutex.Unlock()
-
- for _, n := range nodes {
- if n.ID() == tab.self().ID() {
- continue // don't add self
- }
- b := tab.bucket(n.ID())
- if len(b.entries) < bucketSize {
- tab.bumpOrAdd(b, n)
- }
- }
-}
-
// delete removes an entry from the node table. It is used to evacuate dead nodes.
func (tab *Table) delete(node *node) {
tab.mutex.Lock()