From 01ed3fa1a9414328eb6c4fc839e1b2044a786a7a Mon Sep 17 00:00:00 2001
From: Felix Lange <fjl@twurst.com>
Date: Fri, 7 Aug 2015 00:10:26 +0200
Subject: p2p/discover: unlock the table during ping replacement

Table.mutex was being held while waiting for a reply packet, which
effectively made many parts of the whole stack block on that packet,
including the net_peerCount RPC call.
---
 p2p/discover/table.go      | 121 +++++++++++++++++++++++++++------------------
 p2p/discover/table_test.go |   6 +--
 p2p/discover/udp_test.go   |   2 +-
 3 files changed, 77 insertions(+), 52 deletions(-)

(limited to 'p2p')

diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 67f7ec46f..b077f010c 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -78,9 +78,8 @@ type transport interface {
 	close()
 }
 
-// bucket contains nodes, ordered by their last activity.
-// the entry that was most recently active is the last element
-// in entries.
+// bucket contains nodes, ordered by their last activity. the entry
+// that was most recently active is the first element in entries.
 type bucket struct {
 	lastLookup time.Time
 	entries    []*Node
@@ -235,7 +234,7 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
 
 						if fails >= maxFindnodeFailures {
 							glog.V(logger.Detail).Infof("Evacuating node %x: %d findnode failures", n.ID[:8], fails)
-							tab.del(n)
+							tab.delete(n)
 						}
 					}
 					reply <- tab.bondall(r)
@@ -401,15 +400,11 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
 			node = w.n
 		}
 	}
-	// Even if bonding temporarily failed, give the node a chance
 	if node != nil {
-		tab.mutex.Lock()
-		defer tab.mutex.Unlock()
-
-		b := tab.buckets[logdist(tab.self.sha, node.sha)]
-		if !b.bump(node) {
-			tab.pingreplace(node, b)
-		}
+		// Add the node to the table even if the bonding ping/pong
+		// fails. It will be relaced quickly if it continues to be
+		// unresponsive.
+		tab.add(node)
 		tab.db.updateFindFails(id, 0)
 	}
 	return node, result
@@ -420,7 +415,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
 	<-tab.bondslots
 	defer func() { tab.bondslots <- struct{}{} }()
 
-	// Ping the remote side and wait for a pong
+	// Ping the remote side and wait for a pong.
 	if w.err = tab.ping(id, addr); w.err != nil {
 		close(w.done)
 		return
@@ -431,33 +426,14 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
 		// waitping will simply time out.
 		tab.net.waitping(id)
 	}
-	// Bonding succeeded, update the node database
+	// Bonding succeeded, update the node database.
 	w.n = newNode(id, addr.IP, uint16(addr.Port), tcpPort)
 	tab.db.updateNode(w.n)
 	close(w.done)
 }
 
-func (tab *Table) pingreplace(new *Node, b *bucket) {
-	if len(b.entries) == bucketSize {
-		oldest := b.entries[bucketSize-1]
-		if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
-			// The node responded, we don't need to replace it.
-			return
-		}
-	} else {
-		// Add a slot at the end so the last entry doesn't
-		// fall off when adding the new node.
-		b.entries = append(b.entries, nil)
-	}
-	copy(b.entries[1:], b.entries)
-	b.entries[0] = new
-	if tab.nodeAddedHook != nil {
-		tab.nodeAddedHook(new)
-	}
-}
-
-// ping a remote endpoint and wait for a reply, also updating the node database
-// accordingly.
+// ping a remote endpoint and wait for a reply, also updating the node
+// database accordingly.
 func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
 	// Update the last ping and send the message
 	tab.db.updateLastPing(id, time.Now())
@@ -467,24 +443,53 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
 	// Pong received, update the database and return
 	tab.db.updateLastPong(id, time.Now())
 	tab.db.ensureExpirer()
-
 	return nil
 }
 
-// add puts the entries into the table if their corresponding
-// bucket is not full. The caller must hold tab.mutex.
-func (tab *Table) add(entries []*Node) {
+// add attempts to add the given node its corresponding bucket. If the
+// bucket has space available, adding the node succeeds immediately.
+// Otherwise, the node is added if the least recently active node in
+// the bucket does not respond to a ping packet.
+//
+// The caller must not hold tab.mutex.
+func (tab *Table) add(new *Node) {
+	b := tab.buckets[logdist(tab.self.sha, new.sha)]
+	tab.mutex.Lock()
+	if b.bump(new) {
+		tab.mutex.Unlock()
+		return
+	}
+	var oldest *Node
+	if len(b.entries) == bucketSize {
+		oldest = b.entries[bucketSize-1]
+		// Let go of the mutex so other goroutines can access
+		// the table while we ping the least recently active node.
+		tab.mutex.Unlock()
+		if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
+			// The node responded, don't replace it.
+			return
+		}
+		tab.mutex.Lock()
+	}
+	added := b.replace(new, oldest)
+	tab.mutex.Unlock()
+	if added && tab.nodeAddedHook != nil {
+		tab.nodeAddedHook(new)
+	}
+}
+
+// stuff adds nodes the table to the end of their corresponding bucket
+// if the bucket is not full. The caller must hold tab.mutex.
+func (tab *Table) stuff(nodes []*Node) {
 outer:
-	for _, n := range entries {
+	for _, n := range nodes {
 		if n.ID == tab.self.ID {
-			// don't add self.
-			continue
+			continue // don't add self
 		}
 		bucket := tab.buckets[logdist(tab.self.sha, n.sha)]
 		for i := range bucket.entries {
 			if bucket.entries[i].ID == n.ID {
-				// already in bucket
-				continue outer
+				continue outer // already in bucket
 			}
 		}
 		if len(bucket.entries) < bucketSize {
@@ -496,12 +501,11 @@ outer:
 	}
 }
 
-// del removes an entry from the node table (used to evacuate failed/non-bonded
-// discovery peers).
-func (tab *Table) del(node *Node) {
+// delete removes an entry from the node table (used to evacuate
+// failed/non-bonded discovery peers).
+func (tab *Table) delete(node *Node) {
 	tab.mutex.Lock()
 	defer tab.mutex.Unlock()
-
 	bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
 	for i := range bucket.entries {
 		if bucket.entries[i].ID == node.ID {
@@ -511,6 +515,27 @@ func (tab *Table) del(node *Node) {
 	}
 }
 
+func (b *bucket) replace(n *Node, last *Node) bool {
+	// Don't add if b already contains n.
+	for i := range b.entries {
+		if b.entries[i].ID == n.ID {
+			return false
+		}
+	}
+	// Replace last if it is still the last entry or just add n if b
+	// isn't full. If is no longer the last entry, it has either been
+	// replaced with someone else or became active.
+	if len(b.entries) == bucketSize && (last == nil || b.entries[bucketSize-1].ID != last.ID) {
+		return false
+	}
+	if len(b.entries) < bucketSize {
+		b.entries = append(b.entries, nil)
+	}
+	copy(b.entries[1:], b.entries)
+	b.entries[0] = n
+	return true
+}
+
 func (b *bucket) bump(n *Node) bool {
 	for i := range b.entries {
 		if b.entries[i].ID == n.ID {
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index d259177bf..426f4e9cc 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -178,8 +178,8 @@ func TestTable_closest(t *testing.T) {
 	test := func(test *closeTest) bool {
 		// for any node table, Target and N
 		tab := newTable(nil, test.Self, &net.UDPAddr{}, "")
-		tab.add(test.All)
 		defer tab.Close()
+		tab.stuff(test.All)
 
 		// check that doClosest(Target, N) returns nodes
 		result := tab.closest(test.Target, test.N).entries
@@ -240,7 +240,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) {
 		defer tab.Close()
 		for i := 0; i < len(buf); i++ {
 			ld := cfg.Rand.Intn(len(tab.buckets))
-			tab.add([]*Node{nodeAtDistance(tab.self.sha, ld)})
+			tab.stuff([]*Node{nodeAtDistance(tab.self.sha, ld)})
 		}
 		gotN := tab.ReadRandomNodes(buf)
 		if gotN != tab.len() {
@@ -288,7 +288,7 @@ func TestTable_Lookup(t *testing.T) {
 	}
 	// seed table with initial node (otherwise lookup will terminate immediately)
 	seed := newNode(lookupTestnet.dists[256][0], net.IP{}, 256, 0)
-	tab.add([]*Node{seed})
+	tab.stuff([]*Node{seed})
 
 	results := tab.Lookup(lookupTestnet.target)
 	t.Logf("results:")
diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go
index 8d6d3e855..b913424dd 100644
--- a/p2p/discover/udp_test.go
+++ b/p2p/discover/udp_test.go
@@ -167,7 +167,7 @@ func TestUDP_findnode(t *testing.T) {
 	for i := 0; i < bucketSize; i++ {
 		nodes.push(nodeAtDistance(test.table.self.sha, i+2), bucketSize)
 	}
-	test.table.add(nodes.entries)
+	test.table.stuff(nodes.entries)
 
 	// ensure there's a bond with the test node,
 	// findnode won't be accepted otherwise.
-- 
cgit v1.2.3