From 86ec742f975d825f42dd69ebf17b0adaa66542c0 Mon Sep 17 00:00:00 2001
From: Felix Lange <fjl@users.noreply.github.com>
Date: Thu, 31 Jan 2019 11:48:54 +0100
Subject: p2p/discover: improve table addition code (#18974)

This change clears up confusion around the two ways in which nodes
can be added to the table.

When a neighbors packet is received as a reply to findnode, the nodes
contained in the reply are added as 'seen' entries if sufficient space
is available.

When a ping is received and the endpoint verification has taken place,
the remote node is added as a 'verified' entry or moved to the front of
the bucket if present. This also updates the node's IP address and port
if they have changed.
---
 p2p/discover/table.go           | 117 +++++++++++++++++++++++++++-------------
 p2p/discover/table_test.go      |  96 +++++++++++++++++++++++++++++++--
 p2p/discover/table_util_test.go |  20 +------
 p2p/discover/udp.go             |   4 +-
 4 files changed, 175 insertions(+), 62 deletions(-)

(limited to 'p2p')

diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index ba4c06327..ef0c08afc 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -328,7 +328,7 @@ func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
 	// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
 	// just remove those again during revalidation.
 	for _, n := range r {
-		tab.add(n)
+		tab.addSeenNode(n)
 	}
 	reply <- r
 }
@@ -443,7 +443,7 @@ func (tab *Table) loadSeedNodes() {
 		seed := seeds[i]
 		age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
 		log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
-		tab.add(seed)
+		tab.addSeenNode(seed)
 	}
 }
 
@@ -468,7 +468,7 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
 		// The node responded, move it to the front.
 		last.livenessChecks++
 		log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks)
-		b.bump(last)
+		tab.bumpInBucket(b, last)
 		return
 	}
 	// No reply received, pick a replacement or delete the node if there aren't
@@ -551,12 +551,12 @@ func (tab *Table) bucket(id enode.ID) *bucket {
 	return tab.buckets[d-bucketMinDistance-1]
 }
 
-// add attempts to add the given node to its corresponding bucket. If the bucket has space
-// available, adding the node succeeds immediately. Otherwise, the node is added if the
-// least recently active node in the bucket does not respond to a ping packet.
+// addSeenNode adds a node which may or may not be live to the end of a bucket. If the
+// bucket has space available, adding the node succeeds immediately. Otherwise, the node is
+// added to the replacements list.
 //
 // The caller must not hold tab.mutex.
-func (tab *Table) add(n *node) {
+func (tab *Table) addSeenNode(n *node) {
 	if n.ID() == tab.self().ID() {
 		return
 	}
@@ -564,23 +564,68 @@ func (tab *Table) add(n *node) {
 	tab.mutex.Lock()
 	defer tab.mutex.Unlock()
 	b := tab.bucket(n.ID())
-	if !tab.bumpOrAdd(b, n) {
-		// Node is not in table. Add it to the replacement list.
+	if contains(b.entries, n.ID()) {
+		// Already in bucket, don't add.
+		return
+	}
+	if len(b.entries) >= bucketSize {
+		// Bucket full, maybe add as replacement.
 		tab.addReplacement(b, n)
+		return
+	}
+	if !tab.addIP(b, n.IP()) {
+		// Can't add: IP limit reached.
+		return
+	}
+	// Add to end of bucket:
+	b.entries = append(b.entries, n)
+	b.replacements = deleteNode(b.replacements, n)
+	n.addedAt = time.Now()
+	if tab.nodeAddedHook != nil {
+		tab.nodeAddedHook(n)
 	}
 }
 
-// addThroughPing adds the given node to the table. Compared to plain
-// 'add' there is an additional safety measure: if the table is still
-// initializing the node is not added. This prevents an attack where the
-// table could be filled by just sending ping repeatedly.
+// addVerifiedNode adds a node whose existence has been verified recently to the front of a
+// bucket. If the node is already in the bucket, it is moved to the front. If the bucket
+// has no space, the node is added to the replacements list.
+//
+// There is an additional safety measure: if the table is still initializing the node
+// is not added. This prevents an attack where the table could be filled by just sending
+// ping repeatedly.
 //
 // The caller must not hold tab.mutex.
-func (tab *Table) addThroughPing(n *node) {
+func (tab *Table) addVerifiedNode(n *node) {
 	if !tab.isInitDone() {
 		return
 	}
-	tab.add(n)
+	if n.ID() == tab.self().ID() {
+		return
+	}
+
+	tab.mutex.Lock()
+	defer tab.mutex.Unlock()
+	b := tab.bucket(n.ID())
+	if tab.bumpInBucket(b, n) {
+		// Already in bucket, moved to front.
+		return
+	}
+	if len(b.entries) >= bucketSize {
+		// Bucket full, maybe add as replacement.
+		tab.addReplacement(b, n)
+		return
+	}
+	if !tab.addIP(b, n.IP()) {
+		// Can't add: IP limit reached.
+		return
+	}
+	// Add to front of bucket.
+	b.entries, _ = pushNode(b.entries, n, bucketSize)
+	b.replacements = deleteNode(b.replacements, n)
+	n.addedAt = time.Now()
+	if tab.nodeAddedHook != nil {
+		tab.nodeAddedHook(n)
+	}
 }
 
 // delete removes an entry from the node table. It is used to evacuate dead nodes.
@@ -651,12 +696,21 @@ func (tab *Table) replace(b *bucket, last *node) *node {
 	return r
 }
 
-// bump moves the given node to the front of the bucket entry list
+// bumpInBucket moves the given node to the front of the bucket entry list
 // if it is contained in that list.
-func (b *bucket) bump(n *node) bool {
+func (tab *Table) bumpInBucket(b *bucket, n *node) bool {
 	for i := range b.entries {
 		if b.entries[i].ID() == n.ID() {
-			// move it to the front
+			if !n.IP().Equal(b.entries[i].IP()) {
+				// Endpoint has changed, ensure that the new IP fits into table limits.
+				tab.removeIP(b, b.entries[i].IP())
+				if !tab.addIP(b, n.IP()) {
+					// It doesn't, put the previous one back.
+					tab.addIP(b, b.entries[i].IP())
+					return false
+				}
+			}
+			// Move it to the front.
 			copy(b.entries[1:], b.entries[:i])
 			b.entries[0] = n
 			return true
@@ -665,29 +719,20 @@ func (b *bucket) bump(n *node) bool {
 	return false
 }
 
-// bumpOrAdd moves n to the front of the bucket entry list or adds it if the list isn't
-// full. The return value is true if n is in the bucket.
-func (tab *Table) bumpOrAdd(b *bucket, n *node) bool {
-	if b.bump(n) {
-		return true
-	}
-	if len(b.entries) >= bucketSize || !tab.addIP(b, n.IP()) {
-		return false
-	}
-	b.entries, _ = pushNode(b.entries, n, bucketSize)
-	b.replacements = deleteNode(b.replacements, n)
-	n.addedAt = time.Now()
-	if tab.nodeAddedHook != nil {
-		tab.nodeAddedHook(n)
-	}
-	return true
-}
-
 func (tab *Table) deleteInBucket(b *bucket, n *node) {
 	b.entries = deleteNode(b.entries, n)
 	tab.removeIP(b, n.IP())
 }
 
+func contains(ns []*node, id enode.ID) bool {
+	for _, n := range ns {
+		if n.ID() == id {
+			return true
+		}
+	}
+	return false
+}
+
 // pushNode adds n to the front of list, keeping at most max items.
 func pushNode(list []*node, n *node, max int) ([]*node, *node) {
 	if len(list) < max {
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index b00a93211..b5622e3a2 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/enr"
+	"github.com/ethereum/go-ethereum/p2p/netutil"
 )
 
 func TestTable_pingReplace(t *testing.T) {
@@ -64,7 +65,7 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding
 	// its bucket if it is unresponsive. Revalidate again to ensure that
 	transport.dead[last.ID()] = !lastInBucketIsResponding
 	transport.dead[pingSender.ID()] = !newNodeIsResponding
-	tab.add(pingSender)
+	tab.addSeenNode(pingSender)
 	tab.doRevalidate(make(chan struct{}, 1))
 	tab.doRevalidate(make(chan struct{}, 1))
 
@@ -114,10 +115,14 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
 	}
 
 	prop := func(nodes []*node, bumps []int) (ok bool) {
+		tab, db := newTestTable(newPingRecorder())
+		defer db.Close()
+		defer tab.Close()
+
 		b := &bucket{entries: make([]*node, len(nodes))}
 		copy(b.entries, nodes)
 		for i, pos := range bumps {
-			b.bump(b.entries[pos])
+			tab.bumpInBucket(b, b.entries[pos])
 			if hasDuplicates(b.entries) {
 				t.Logf("bucket has duplicates after %d/%d bumps:", i+1, len(bumps))
 				for _, n := range b.entries {
@@ -126,6 +131,7 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
 				return false
 			}
 		}
+		checkIPLimitInvariant(t, tab)
 		return true
 	}
 	if err := quick.Check(prop, cfg); err != nil {
@@ -142,11 +148,12 @@ func TestTable_IPLimit(t *testing.T) {
 
 	for i := 0; i < tableIPLimit+1; i++ {
 		n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)})
-		tab.add(n)
+		tab.addSeenNode(n)
 	}
 	if tab.len() > tableIPLimit {
 		t.Errorf("too many nodes in table")
 	}
+	checkIPLimitInvariant(t, tab)
 }
 
 // This checks that the per-bucket IP limit is applied correctly.
@@ -159,11 +166,28 @@ func TestTable_BucketIPLimit(t *testing.T) {
 	d := 3
 	for i := 0; i < bucketIPLimit+1; i++ {
 		n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)})
-		tab.add(n)
+		tab.addSeenNode(n)
 	}
 	if tab.len() > bucketIPLimit {
 		t.Errorf("too many nodes in table")
 	}
+	checkIPLimitInvariant(t, tab)
+}
+
+// checkIPLimitInvariant checks that ip limit sets contain an entry for every
+// node in the table and no extra entries.
+func checkIPLimitInvariant(t *testing.T, tab *Table) {
+	t.Helper()
+
+	tabset := netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}
+	for _, b := range tab.buckets {
+		for _, n := range b.entries {
+			tabset.Add(n.IP())
+		}
+	}
+	if tabset.String() != tab.ips.String() {
+		t.Errorf("table IP set is incorrect:\nhave: %v\nwant: %v", tab.ips, tabset)
+	}
 }
 
 func TestTable_closest(t *testing.T) {
@@ -281,6 +305,69 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
 	return reflect.ValueOf(t)
 }
 
+func TestTable_addVerifiedNode(t *testing.T) {
+	tab, db := newTestTable(newPingRecorder())
+	<-tab.initDone
+	defer db.Close()
+	defer tab.Close()
+
+	// Insert two nodes.
+	n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1})
+	n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
+	tab.addSeenNode(n1)
+	tab.addSeenNode(n2)
+
+	// Verify bucket content:
+	bcontent := []*node{n1, n2}
+	if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
+		t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries)
+	}
+
+	// Add a changed version of n2.
+	newrec := n2.Record()
+	newrec.Set(enr.IP{99, 99, 99, 99})
+	newn2 := wrapNode(enode.SignNull(newrec, n2.ID()))
+	tab.addVerifiedNode(newn2)
+
+	// Check that bucket is updated correctly.
+	newBcontent := []*node{newn2, n1}
+	if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, newBcontent) {
+		t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries)
+	}
+	checkIPLimitInvariant(t, tab)
+}
+
+func TestTable_addSeenNode(t *testing.T) {
+	tab, db := newTestTable(newPingRecorder())
+	<-tab.initDone
+	defer db.Close()
+	defer tab.Close()
+
+	// Insert two nodes.
+	n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1})
+	n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
+	tab.addSeenNode(n1)
+	tab.addSeenNode(n2)
+
+	// Verify bucket content:
+	bcontent := []*node{n1, n2}
+	if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
+		t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries)
+	}
+
+	// Add a changed version of n2.
+	newrec := n2.Record()
+	newrec.Set(enr.IP{99, 99, 99, 99})
+	newn2 := wrapNode(enode.SignNull(newrec, n2.ID()))
+	tab.addSeenNode(newn2)
+
+	// Check that bucket content is unchanged.
+	if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
+		t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries)
+	}
+	checkIPLimitInvariant(t, tab)
+}
+
 func TestTable_Lookup(t *testing.T) {
 	tab, db := newTestTable(lookupTestnet)
 	defer db.Close()
@@ -535,7 +622,6 @@ func (tn *preminedTestnet) findnode(toid enode.ID, toaddr *net.UDPAddr, target e
 }
 
 func (*preminedTestnet) close()                                        {}
-func (*preminedTestnet) waitping(from enode.ID) error                  { return nil }
 func (*preminedTestnet) ping(toid enode.ID, toaddr *net.UDPAddr) error { return nil }
 
 // mine generates a testnet struct literal with nodes at
diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go
index 3ce582b99..e61c9e6fc 100644
--- a/p2p/discover/table_util_test.go
+++ b/p2p/discover/table_util_test.go
@@ -86,17 +86,8 @@ func fillBucket(tab *Table, n *node) (last *node) {
 // fillTable adds nodes the table to the end of their corresponding bucket
 // if the bucket is not full. The caller must not hold tab.mutex.
 func fillTable(tab *Table, nodes []*node) {
-	tab.mutex.Lock()
-	defer tab.mutex.Unlock()
-
 	for _, n := range nodes {
-		if n.ID() == tab.self().ID() {
-			continue // don't add self
-		}
-		b := tab.bucket(n.ID())
-		if len(b.entries) < bucketSize {
-			tab.bumpOrAdd(b, n)
-		}
+		tab.addSeenNode(n)
 	}
 }
 
@@ -154,15 +145,6 @@ func hasDuplicates(slice []*node) bool {
 	return false
 }
 
-func contains(ns []*node, id enode.ID) bool {
-	for _, n := range ns {
-		if n.ID() == id {
-			return true
-		}
-	}
-	return false
-}
-
 func sortedByDistanceTo(distbase enode.ID, slice []*node) bool {
 	var last enode.ID
 	for i, e := range slice {
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 5ce4c43dc..df9a3065f 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -661,10 +661,10 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte)
 	n := wrapNode(enode.NewV4(req.senderKey, from.IP, int(req.From.TCP), from.Port))
 	if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration {
 		t.sendPing(fromID, from, func() {
-			t.tab.addThroughPing(n)
+			t.tab.addVerifiedNode(n)
 		})
 	} else {
-		t.tab.addThroughPing(n)
+		t.tab.addVerifiedNode(n)
 	}
 
 	// Update node database and endpoint predictor.
-- 
cgit v1.2.3