aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover/table.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/discover/table.go')
-rw-r--r--p2p/discover/table.go135
1 files changed, 81 insertions, 54 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 70adfaddc..b077f010c 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -1,18 +1,18 @@
// Copyright 2015 The go-ethereum Authors
-// This file is part of go-ethereum.
+// This file is part of the go-ethereum library.
//
-// go-ethereum is free software: you can redistribute it and/or modify
+// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
-// go-ethereum is distributed in the hope that it will be useful,
+// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
-// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package discover implements the Node Discovery Protocol.
//
@@ -78,9 +78,8 @@ type transport interface {
close()
}
-// bucket contains nodes, ordered by their last activity.
-// the entry that was most recently active is the last element
-// in entries.
+// bucket contains nodes, ordered by their last activity. the entry
+// that was most recently active is the first element in entries.
type bucket struct {
lastLookup time.Time
entries []*Node
@@ -164,7 +163,9 @@ func randUint(max uint32) uint32 {
// Close terminates the network listener and flushes the node database.
func (tab *Table) Close() {
- tab.net.close()
+ if tab.net != nil {
+ tab.net.close()
+ }
tab.db.close()
}
@@ -233,7 +234,7 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
if fails >= maxFindnodeFailures {
glog.V(logger.Detail).Infof("Evacuating node %x: %d findnode failures", n.ID[:8], fails)
- tab.del(n)
+ tab.delete(n)
}
}
reply <- tab.bondall(r)
@@ -399,15 +400,11 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
node = w.n
}
}
- // Even if bonding temporarily failed, give the node a chance
if node != nil {
- tab.mutex.Lock()
- defer tab.mutex.Unlock()
-
- b := tab.buckets[logdist(tab.self.sha, node.sha)]
- if !b.bump(node) {
- tab.pingreplace(node, b)
- }
+ // Add the node to the table even if the bonding ping/pong
+ // fails. It will be relaced quickly if it continues to be
+ // unresponsive.
+ tab.add(node)
tab.db.updateFindFails(id, 0)
}
return node, result
@@ -418,7 +415,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
<-tab.bondslots
defer func() { tab.bondslots <- struct{}{} }()
- // Ping the remote side and wait for a pong
+ // Ping the remote side and wait for a pong.
if w.err = tab.ping(id, addr); w.err != nil {
close(w.done)
return
@@ -429,33 +426,14 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
// waitping will simply time out.
tab.net.waitping(id)
}
- // Bonding succeeded, update the node database
+ // Bonding succeeded, update the node database.
w.n = newNode(id, addr.IP, uint16(addr.Port), tcpPort)
tab.db.updateNode(w.n)
close(w.done)
}
-func (tab *Table) pingreplace(new *Node, b *bucket) {
- if len(b.entries) == bucketSize {
- oldest := b.entries[bucketSize-1]
- if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
- // The node responded, we don't need to replace it.
- return
- }
- } else {
- // Add a slot at the end so the last entry doesn't
- // fall off when adding the new node.
- b.entries = append(b.entries, nil)
- }
- copy(b.entries[1:], b.entries)
- b.entries[0] = new
- if tab.nodeAddedHook != nil {
- tab.nodeAddedHook(new)
- }
-}
-
-// ping a remote endpoint and wait for a reply, also updating the node database
-// accordingly.
+// ping a remote endpoint and wait for a reply, also updating the node
+// database accordingly.
func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
// Update the last ping and send the message
tab.db.updateLastPing(id, time.Now())
@@ -465,24 +443,53 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
// Pong received, update the database and return
tab.db.updateLastPong(id, time.Now())
tab.db.ensureExpirer()
-
return nil
}
-// add puts the entries into the table if their corresponding
-// bucket is not full. The caller must hold tab.mutex.
-func (tab *Table) add(entries []*Node) {
+// add attempts to add the given node its corresponding bucket. If the
+// bucket has space available, adding the node succeeds immediately.
+// Otherwise, the node is added if the least recently active node in
+// the bucket does not respond to a ping packet.
+//
+// The caller must not hold tab.mutex.
+func (tab *Table) add(new *Node) {
+ b := tab.buckets[logdist(tab.self.sha, new.sha)]
+ tab.mutex.Lock()
+ if b.bump(new) {
+ tab.mutex.Unlock()
+ return
+ }
+ var oldest *Node
+ if len(b.entries) == bucketSize {
+ oldest = b.entries[bucketSize-1]
+ // Let go of the mutex so other goroutines can access
+ // the table while we ping the least recently active node.
+ tab.mutex.Unlock()
+ if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
+ // The node responded, don't replace it.
+ return
+ }
+ tab.mutex.Lock()
+ }
+ added := b.replace(new, oldest)
+ tab.mutex.Unlock()
+ if added && tab.nodeAddedHook != nil {
+ tab.nodeAddedHook(new)
+ }
+}
+
+// stuff adds nodes the table to the end of their corresponding bucket
+// if the bucket is not full. The caller must hold tab.mutex.
+func (tab *Table) stuff(nodes []*Node) {
outer:
- for _, n := range entries {
+ for _, n := range nodes {
if n.ID == tab.self.ID {
- // don't add self.
- continue
+ continue // don't add self
}
bucket := tab.buckets[logdist(tab.self.sha, n.sha)]
for i := range bucket.entries {
if bucket.entries[i].ID == n.ID {
- // already in bucket
- continue outer
+ continue outer // already in bucket
}
}
if len(bucket.entries) < bucketSize {
@@ -494,12 +501,11 @@ outer:
}
}
-// del removes an entry from the node table (used to evacuate failed/non-bonded
-// discovery peers).
-func (tab *Table) del(node *Node) {
+// delete removes an entry from the node table (used to evacuate
+// failed/non-bonded discovery peers).
+func (tab *Table) delete(node *Node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()
-
bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
for i := range bucket.entries {
if bucket.entries[i].ID == node.ID {
@@ -509,6 +515,27 @@ func (tab *Table) del(node *Node) {
}
}
+func (b *bucket) replace(n *Node, last *Node) bool {
+ // Don't add if b already contains n.
+ for i := range b.entries {
+ if b.entries[i].ID == n.ID {
+ return false
+ }
+ }
+ // Replace last if it is still the last entry or just add n if b
+ // isn't full. If is no longer the last entry, it has either been
+ // replaced with someone else or became active.
+ if len(b.entries) == bucketSize && (last == nil || b.entries[bucketSize-1].ID != last.ID) {
+ return false
+ }
+ if len(b.entries) < bucketSize {
+ b.entries = append(b.entries, nil)
+ }
+ copy(b.entries[1:], b.entries)
+ b.entries[0] = n
+ return true
+}
+
func (b *bucket) bump(n *Node) bool {
for i := range b.entries {
if b.entries[i].ID == n.ID {