diff options
Diffstat (limited to 'p2p/discover/table.go')
-rw-r--r-- | p2p/discover/table.go | 84 |
1 files changed, 39 insertions, 45 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 9f7f1d41b..ba4c06327 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -75,8 +75,10 @@ type Table struct { net transport refreshReq chan chan struct{} initDone chan struct{} - closeReq chan struct{} - closed chan struct{} + + closeOnce sync.Once + closeReq chan struct{} + closed chan struct{} nodeAddedHook func(*node) // for testing } @@ -180,16 +182,14 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) { // Close terminates the network listener and flushes the node database. func (tab *Table) Close() { - if tab.net != nil { - tab.net.close() - } - - select { - case <-tab.closed: - // already closed. - case tab.closeReq <- struct{}{}: - <-tab.closed // wait for refreshLoop to end. - } + tab.closeOnce.Do(func() { + if tab.net != nil { + tab.net.close() + } + // Wait for loop to end. + close(tab.closeReq) + <-tab.closed + }) } // setFallbackNodes sets the initial points of contact. These nodes @@ -290,12 +290,16 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node { // we have asked all closest nodes, stop the search break } - // wait for the next reply - for _, n := range <-reply { - if n != nil && !seen[n.ID()] { - seen[n.ID()] = true - result.push(n, bucketSize) + select { + case nodes := <-reply: + for _, n := range nodes { + if n != nil && !seen[n.ID()] { + seen[n.ID()] = true + result.push(n, bucketSize) + } } + case <-tab.closeReq: + return nil // shutdown, no need to continue. } pendingQueries-- } @@ -303,18 +307,22 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node { } func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) { - fails := tab.db.FindFails(n.ID()) + fails := tab.db.FindFails(n.ID(), n.IP()) r, err := tab.net.findnode(n.ID(), n.addr(), targetKey) - if err != nil || len(r) == 0 { + if err == errClosed { + // Avoid recording failures on shutdown. + reply <- nil + return + } else if err != nil || len(r) == 0 { fails++ - tab.db.UpdateFindFails(n.ID(), fails) + tab.db.UpdateFindFails(n.ID(), n.IP(), fails) log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err) if fails >= maxFindnodeFailures { log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails) tab.delete(n) } } else if fails > 0 { - tab.db.UpdateFindFails(n.ID(), fails-1) + tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1) } // Grab as many nodes as possible. Some of them might not be alive anymore, but we'll @@ -329,7 +337,7 @@ func (tab *Table) refresh() <-chan struct{} { done := make(chan struct{}) select { case tab.refreshReq <- done: - case <-tab.closed: + case <-tab.closeReq: close(done) } return done @@ -433,7 +441,7 @@ func (tab *Table) loadSeedNodes() { seeds = append(seeds, tab.nursery...) for i := range seeds { seed := seeds[i] - age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }} + age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }} log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) tab.add(seed) } @@ -458,16 +466,17 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { b := tab.buckets[bi] if err == nil { // The node responded, move it to the front. - log.Debug("Revalidated node", "b", bi, "id", last.ID()) + last.livenessChecks++ + log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks) b.bump(last) return } // No reply received, pick a replacement or delete the node if there aren't // any replacements. if r := tab.replace(b, last); r != nil { - log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "r", r.ID(), "rip", r.IP()) + log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP()) } else { - log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP()) + log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks) } } @@ -502,7 +511,7 @@ func (tab *Table) copyLiveNodes() { now := time.Now() for _, b := range &tab.buckets { for _, n := range b.entries { - if now.Sub(n.addedAt) >= seedMinTableTime { + if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime { tab.db.UpdateNode(unwrapNode(n)) } } @@ -518,7 +527,9 @@ func (tab *Table) closest(target enode.ID, nresults int) *nodesByDistance { close := &nodesByDistance{target: target} for _, b := range &tab.buckets { for _, n := range b.entries { - close.push(n, nresults) + if n.livenessChecks > 0 { + close.push(n, nresults) + } } } return close @@ -572,23 +583,6 @@ func (tab *Table) addThroughPing(n *node) { tab.add(n) } -// stuff adds nodes the table to the end of their corresponding bucket -// if the bucket is not full. The caller must not hold tab.mutex. -func (tab *Table) stuff(nodes []*node) { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - for _, n := range nodes { - if n.ID() == tab.self().ID() { - continue // don't add self - } - b := tab.bucket(n.ID()) - if len(b.entries) < bucketSize { - tab.bumpOrAdd(b, n) - } - } -} - // delete removes an entry from the node table. It is used to evacuate dead nodes. func (tab *Table) delete(node *node) { tab.mutex.Lock() |