aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/discover')
-rw-r--r--p2p/discover/database.go17
-rw-r--r--p2p/discover/database_test.go11
-rw-r--r--p2p/discover/table.go131
3 files changed, 126 insertions, 33 deletions
diff --git a/p2p/discover/database.go b/p2p/discover/database.go
index 3a3f1254b..1b73c3dea 100644
--- a/p2p/discover/database.go
+++ b/p2p/discover/database.go
@@ -44,9 +44,10 @@ var (
nodeDBVersionKey = []byte("version") // Version of the database to flush if changes
nodeDBItemPrefix = []byte("n:") // Identifier to prefix node entries with
- nodeDBDiscoverRoot = ":discover"
- nodeDBDiscoverPing = nodeDBDiscoverRoot + ":lastping"
- nodeDBDiscoverPong = nodeDBDiscoverRoot + ":lastpong"
+ nodeDBDiscoverRoot = ":discover"
+ nodeDBDiscoverPing = nodeDBDiscoverRoot + ":lastping"
+ nodeDBDiscoverPong = nodeDBDiscoverRoot + ":lastpong"
+ nodeDBDiscoverFindFails = nodeDBDiscoverRoot + ":findfail"
)
// newNodeDB creates a new node database for storing and retrieving infos about
@@ -275,6 +276,16 @@ func (db *nodeDB) updateLastPong(id NodeID, instance time.Time) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverPong), instance.Unix())
}
+// findFails retrieves the number of findnode failures since bonding.
+func (db *nodeDB) findFails(id NodeID) int {
+ return int(db.fetchInt64(makeKey(id, nodeDBDiscoverFindFails)))
+}
+
+// updateFindFails updates the number of findnode failures since bonding.
+func (db *nodeDB) updateFindFails(id NodeID, fails int) error {
+ return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails))
+}
+
// querySeeds retrieves a batch of nodes to be used as potential seed servers
// during bootstrapping the node into the network.
//
diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go
index 88f5d2155..4fce164ca 100644
--- a/p2p/discover/database_test.go
+++ b/p2p/discover/database_test.go
@@ -93,6 +93,7 @@ func TestNodeDBFetchStore(t *testing.T) {
30303,
)
inst := time.Now()
+ num := 314
db, _ := newNodeDB("", Version, NodeID{})
defer db.close()
@@ -117,6 +118,16 @@ func TestNodeDBFetchStore(t *testing.T) {
if stored := db.lastPong(node.ID); stored.Unix() != inst.Unix() {
t.Errorf("pong: value mismatch: have %v, want %v", stored, inst)
}
+ // Check fetch/store operations on a node findnode-failure object
+ if stored := db.findFails(node.ID); stored != 0 {
+ t.Errorf("find-node fails: non-existing object: %v", stored)
+ }
+ if err := db.updateFindFails(node.ID, num); err != nil {
+ t.Errorf("find-node fails: failed to update: %v", err)
+ }
+ if stored := db.findFails(node.ID); stored != num {
+ t.Errorf("find-node fails: value mismatch: have %v, want %v", stored, num)
+ }
// Check fetch/store operations on an actual node object
if stored := db.node(node.ID); stored != nil {
t.Errorf("node: non-existing object: %v", stored)
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index b523a0684..4b7ddb775 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -27,6 +27,7 @@ const (
nBuckets = hashBits + 1 // Number of buckets
maxBondingPingPongs = 16
+ maxFindnodeFailures = 5
)
type Table struct {
@@ -190,6 +191,12 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
result := tab.closest(target, bucketSize)
tab.mutex.Unlock()
+ // If the result set is empty, all nodes were dropped, refresh
+ if len(result.entries) == 0 {
+ tab.refresh()
+ return nil
+ }
+
for {
// ask the alpha closest nodes that we haven't asked yet
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
@@ -198,7 +205,19 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
asked[n.ID] = true
pendingQueries++
go func() {
- r, _ := tab.net.findnode(n.ID, n.addr(), targetID)
+ // Find potential neighbors to bond with
+ r, err := tab.net.findnode(n.ID, n.addr(), targetID)
+ if err != nil {
+ // Bump the failure counter to detect and evacuate non-bonded entries
+ fails := tab.db.findFails(n.ID) + 1
+ tab.db.updateFindFails(n.ID, fails)
+ glog.V(logger.Detail).Infof("Bumping failures for %x: %d", n.ID[:8], fails)
+
+ if fails >= maxFindnodeFailures {
+ glog.V(logger.Detail).Infof("Evacuating node %x: %d findnode failures", n.ID[:8], fails)
+ tab.del(n)
+ }
+ }
reply <- tab.bondall(r)
}()
}
@@ -219,30 +238,53 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
return result.entries
}
-// refresh performs a lookup for a random target to keep buckets full.
+// refresh performs a lookup for a random target to keep buckets full, or seeds
+// the table if it is empty (initial bootstrap or discarded faulty peers).
func (tab *Table) refresh() {
- // The Kademlia paper specifies that the bucket refresh should
- // perform a refresh in the least recently used bucket. We cannot
- // adhere to this because the findnode target is a 512bit value
- // (not hash-sized) and it is not easily possible to generate a
- // sha3 preimage that falls into a chosen bucket.
- //
- // We perform a lookup with a random target instead.
- var target NodeID
- rand.Read(target[:])
- result := tab.Lookup(target)
- if len(result) == 0 {
+ seed := true
+
+ // If the discovery table is empty, seed with previously known nodes
+ tab.mutex.Lock()
+ for _, bucket := range tab.buckets {
+ if len(bucket.entries) > 0 {
+ seed = false
+ break
+ }
+ }
+ tab.mutex.Unlock()
+
+ // If the table is not empty, try to refresh using the live entries
+ if !seed {
+ // The Kademlia paper specifies that the bucket refresh should
+ // perform a refresh in the least recently used bucket. We cannot
+ // adhere to this because the findnode target is a 512bit value
+ // (not hash-sized) and it is not easily possible to generate a
+ // sha3 preimage that falls into a chosen bucket.
+ //
+ // We perform a lookup with a random target instead.
+ var target NodeID
+ rand.Read(target[:])
+
+ result := tab.Lookup(target)
+ if len(result) == 0 {
+ // Lookup failed, seed after all
+ seed = true
+ }
+ }
+
+ if seed {
// Pick a batch of previously know seeds to lookup with
seeds := tab.db.querySeeds(10)
for _, seed := range seeds {
glog.V(logger.Debug).Infoln("Seeding network with", seed)
}
- // Bootstrap the table with a self lookup
- all := tab.bondall(append(tab.nursery, seeds...))
- tab.mutex.Lock()
- tab.add(all)
- tab.mutex.Unlock()
- tab.Lookup(tab.self.ID)
+ nodes := append(tab.nursery, seeds...)
+
+ // Bond with all the seed nodes (will pingpong only if failed recently)
+ bonded := tab.bondall(nodes)
+ if len(bonded) > 0 {
+ tab.Lookup(tab.self.ID)
+ }
// TODO: the Kademlia paper says that we're supposed to perform
// random lookups in all buckets further away than our closest neighbor.
}
@@ -305,8 +347,16 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) {
// If pinged is true, the remote node has just pinged us and one half
// of the process can be skipped.
func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
- var n *Node
- if n = tab.db.node(id); n == nil {
+ // Retrieve a previously known node and any recent findnode failures
+ node, fails := tab.db.node(id), 0
+ if node != nil {
+ fails = tab.db.findFails(id)
+ }
+ // If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch
+ var result error
+ if node == nil || fails > 0 {
+ glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails)
+
tab.bondmu.Lock()
w := tab.bonding[id]
if w != nil {
@@ -325,18 +375,24 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
delete(tab.bonding, id)
tab.bondmu.Unlock()
}
- n = w.n
- if w.err != nil {
- return nil, w.err
+ // Retrieve the bonding results
+ result = w.err
+ if result == nil {
+ node = w.n
}
}
- tab.mutex.Lock()
- defer tab.mutex.Unlock()
- b := tab.buckets[logdist(tab.self.sha, n.sha)]
- if !b.bump(n) {
- tab.pingreplace(n, b)
+ // Even if bonding temporarily failed, give the node a chance
+ if node != nil {
+ tab.mutex.Lock()
+ defer tab.mutex.Unlock()
+
+ b := tab.buckets[logdist(tab.self.sha, node.sha)]
+ if !b.bump(node) {
+ tab.pingreplace(node, b)
+ }
+ tab.db.updateFindFails(id, 0)
}
- return n, nil
+ return node, result
}
func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {
@@ -414,6 +470,21 @@ outer:
}
}
+// del removes an entry from the node table (used to evacuate failed/non-bonded
+// discovery peers).
+func (tab *Table) del(node *Node) {
+ tab.mutex.Lock()
+ defer tab.mutex.Unlock()
+
+ bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
+ for i := range bucket.entries {
+ if bucket.entries[i].ID == node.ID {
+ bucket.entries = append(bucket.entries[:i], bucket.entries[i+1:]...)
+ return
+ }
+ }
+}
+
func (b *bucket) bump(n *Node) bool {
for i := range b.entries {
if b.entries[i].ID == n.ID {