From 2a75fe3308faf4d77054e00b55566c9f18591572 Mon Sep 17 00:00:00 2001
From: Andrey Petrov <andrey.petrov@shazow.net>
Date: Sun, 25 Feb 2018 15:39:29 -0500
Subject: rpc: Add admin_addTrustedPeer and admin_removeTrustedPeer.

These RPC calls are analogous to Parity's parity_addReservedPeer and
parity_removeReservedPeer.

They are useful for adjusting the trusted peer set during runtime,
without requiring restarting the server.
---
 p2p/server.go | 46 +++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 43 insertions(+), 3 deletions(-)

(limited to 'p2p')

diff --git a/p2p/server.go b/p2p/server.go
index cdb5b1926..39ff2f51e 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -169,6 +169,8 @@ type Server struct {
 	quit          chan struct{}
 	addstatic     chan *discover.Node
 	removestatic  chan *discover.Node
+	addtrusted    chan *discover.Node
+	removetrusted chan *discover.Node
 	posthandshake chan *conn
 	addpeer       chan *conn
 	delpeer       chan peerDrop
@@ -300,6 +302,23 @@ func (srv *Server) RemovePeer(node *discover.Node) {
 	}
 }
 
+// AddTrustedPeer adds the given node to a reserved whitelist which allows the
+// node to always connect, even if the slot are full.
+func (srv *Server) AddTrustedPeer(node *discover.Node) {
+	select {
+	case srv.addtrusted <- node:
+	case <-srv.quit:
+	}
+}
+
+// RemoveTrustedPeer removes the given node from the trusted peer set.
+func (srv *Server) RemoveTrustedPeer(node *discover.Node) {
+	select {
+	case srv.removetrusted <- node:
+	case <-srv.quit:
+	}
+}
+
 // SubscribePeers subscribes the given channel to peer events
 func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
 	return srv.peerFeed.Subscribe(ch)
@@ -410,6 +429,8 @@ func (srv *Server) Start() (err error) {
 	srv.posthandshake = make(chan *conn)
 	srv.addstatic = make(chan *discover.Node)
 	srv.removestatic = make(chan *discover.Node)
+	srv.addtrusted = make(chan *discover.Node)
+	srv.removetrusted = make(chan *discover.Node)
 	srv.peerOp = make(chan peerOpFunc)
 	srv.peerOpDone = make(chan struct{})
 
@@ -546,8 +567,7 @@ func (srv *Server) run(dialstate dialer) {
 		queuedTasks  []task // tasks that can't run yet
 	)
 	// Put trusted nodes into a map to speed up checks.
-	// Trusted peers are loaded on startup and cannot be
-	// modified while the server is running.
+	// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
 	for _, n := range srv.TrustedNodes {
 		trusted[n.ID] = true
 	}
@@ -599,12 +619,32 @@ running:
 		case n := <-srv.removestatic:
 			// This channel is used by RemovePeer to send a
 			// disconnect request to a peer and begin the
-			// stop keeping the node connected
+			// stop keeping the node connected.
 			srv.log.Trace("Removing static node", "node", n)
 			dialstate.removeStatic(n)
 			if p, ok := peers[n.ID]; ok {
 				p.Disconnect(DiscRequested)
 			}
+		case n := <-srv.addtrusted:
+			// This channel is used by AddTrustedPeer to add an enode
+			// to the trusted node set.
+			srv.log.Trace("Adding trusted node", "node", n)
+			trusted[n.ID] = true
+			// Mark any already-connected peer as trusted
+			if p, ok := peers[n.ID]; ok {
+				p.rw.flags |= trustedConn
+			}
+		case n := <-srv.removetrusted:
+			// This channel is used by RemoveTrustedPeer to remove an enode
+			// from the trusted node set.
+			srv.log.Trace("Removing trusted node", "node", n)
+			if _, ok := trusted[n.ID]; ok {
+				delete(trusted, n.ID)
+			}
+			// Unmark any already-connected peer as trusted
+			if p, ok := peers[n.ID]; ok {
+				p.rw.flags &= ^trustedConn
+			}
 		case op := <-srv.peerOp:
 			// This channel is used by Peers and PeerCount.
 			op(peers)
-- 
cgit v1.2.3


From 773857a5242a3fe7458a9c9b60a4ea6333582e56 Mon Sep 17 00:00:00 2001
From: Andrey Petrov <andrey.petrov@shazow.net>
Date: Sun, 18 Mar 2018 12:25:35 -0400
Subject: p2p: Test for MaxPeers=0 and TrustedPeer override

---
 p2p/server_test.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 54 insertions(+)

(limited to 'p2p')

diff --git a/p2p/server_test.go b/p2p/server_test.go
index 10c36528e..efab9bb5e 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -363,7 +363,61 @@ func TestServerAtCap(t *testing.T) {
 	if !c.is(trustedConn) {
 		t.Error("Server did not set trusted flag")
 	}
+}
+
+func TestServerPeerLimits(t *testing.T) {
+	srvkey := newkey()
+	srvid := discover.PubkeyID(&srvkey.PublicKey)
+
+	var tp *setupTransport = &setupTransport{id: srvid, phs: &protoHandshake{ID: srvid}}
+	var flags connFlag = dynDialedConn
+	var dialDest *discover.Node = &discover.Node{ID: srvid}
 
+	srv := &Server{
+		Config: Config{
+			PrivateKey: srvkey,
+			MaxPeers:   0,
+			NoDial:     true,
+			Protocols:  []Protocol{discard},
+		},
+		newTransport: func(fd net.Conn) transport { return tp },
+		log:          log.New(),
+	}
+	if err := srv.Start(); err != nil {
+		t.Fatalf("couldn't start server: %v", err)
+	}
+
+	// Check that server is full (MaxPeers=0)
+	conn, _ := net.Pipe()
+	srv.SetupConn(conn, flags, dialDest)
+	if tp.closeErr != DiscTooManyPeers {
+		t.Errorf("unexpected close error: %q", tp.closeErr)
+	}
+	conn.Close()
+
+	srv.AddTrustedPeer(dialDest)
+
+	// Check that server allows a trusted peer despite being full.
+	conn, _ = net.Pipe()
+	srv.SetupConn(conn, flags, dialDest)
+	if tp.closeErr == DiscTooManyPeers {
+		t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
+	}
+
+	if tp.closeErr != DiscSelf {
+		t.Errorf("unexpected close error: %q", tp.closeErr)
+	}
+	conn.Close()
+
+	srv.RemoveTrustedPeer(dialDest)
+
+	// Check that server is full again.
+	conn, _ = net.Pipe()
+	srv.SetupConn(conn, flags, dialDest)
+	if tp.closeErr != DiscTooManyPeers {
+		t.Errorf("unexpected close error: %q", tp.closeErr)
+	}
+	conn.Close()
 }
 
 func TestServerSetupConn(t *testing.T) {
-- 
cgit v1.2.3


From 699794d88d86c30fa8ac74c0bbe3e0ac9cde88a2 Mon Sep 17 00:00:00 2001
From: Andrey Petrov <andrey.petrov@shazow.net>
Date: Tue, 5 Jun 2018 15:45:43 -0400
Subject: p2p: More tests for AddTrustedPeer/RemoveTrustedPeer

---
 p2p/server_test.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 52 insertions(+), 8 deletions(-)

(limited to 'p2p')

diff --git a/p2p/server_test.go b/p2p/server_test.go
index efab9bb5e..5fad1d0a7 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -148,7 +148,8 @@ func TestServerDial(t *testing.T) {
 
 	// tell the server to connect
 	tcpAddr := listener.Addr().(*net.TCPAddr)
-	srv.AddPeer(&discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)})
+	node := &discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}
+	srv.AddPeer(node)
 
 	select {
 	case conn := <-accepted:
@@ -170,6 +171,21 @@ func TestServerDial(t *testing.T) {
 			if !reflect.DeepEqual(peers, []*Peer{peer}) {
 				t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
 			}
+
+			// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
+			// Particularly for race conditions on changing the flag state.
+			if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
+				t.Errorf("peer is trusted prematurely: %v", peer)
+			}
+			srv.AddTrustedPeer(node)
+			if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
+				t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
+			}
+			srv.RemoveTrustedPeer(node)
+			if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
+				t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
+			}
+
 		case <-time.After(1 * time.Second):
 			t.Error("server did not launch peer within one second")
 		}
@@ -351,7 +367,8 @@ func TestServerAtCap(t *testing.T) {
 		}
 	}
 	// Try inserting a non-trusted connection.
-	c := newconn(randomID())
+	anotherID := randomID()
+	c := newconn(anotherID)
 	if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
 		t.Error("wrong error for insert:", err)
 	}
@@ -363,15 +380,41 @@ func TestServerAtCap(t *testing.T) {
 	if !c.is(trustedConn) {
 		t.Error("Server did not set trusted flag")
 	}
+
+	// Remove from trusted set and try again
+	srv.RemoveTrustedPeer(&discover.Node{ID: trustedID})
+	c = newconn(trustedID)
+	if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
+		t.Error("wrong error for insert:", err)
+	}
+
+	// Add anotherID to trusted set and try again
+	srv.AddTrustedPeer(&discover.Node{ID: anotherID})
+	c = newconn(anotherID)
+	if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+		t.Error("unexpected error for trusted conn @posthandshake:", err)
+	}
+	if !c.is(trustedConn) {
+		t.Error("Server did not set trusted flag")
+	}
 }
 
 func TestServerPeerLimits(t *testing.T) {
 	srvkey := newkey()
-	srvid := discover.PubkeyID(&srvkey.PublicKey)
 
-	var tp *setupTransport = &setupTransport{id: srvid, phs: &protoHandshake{ID: srvid}}
+	clientid := randomID()
+	clientnode := &discover.Node{ID: clientid}
+
+	var tp *setupTransport = &setupTransport{
+		id: clientid,
+		phs: &protoHandshake{
+			ID: clientid,
+			// Force "DiscUselessPeer" due to unmatching caps
+			// Caps: []Cap{discard.cap()},
+		},
+	}
 	var flags connFlag = dynDialedConn
-	var dialDest *discover.Node = &discover.Node{ID: srvid}
+	var dialDest *discover.Node = &discover.Node{ID: clientid}
 
 	srv := &Server{
 		Config: Config{
@@ -386,6 +429,7 @@ func TestServerPeerLimits(t *testing.T) {
 	if err := srv.Start(); err != nil {
 		t.Fatalf("couldn't start server: %v", err)
 	}
+	defer srv.Stop()
 
 	// Check that server is full (MaxPeers=0)
 	conn, _ := net.Pipe()
@@ -395,7 +439,7 @@ func TestServerPeerLimits(t *testing.T) {
 	}
 	conn.Close()
 
-	srv.AddTrustedPeer(dialDest)
+	srv.AddTrustedPeer(clientnode)
 
 	// Check that server allows a trusted peer despite being full.
 	conn, _ = net.Pipe()
@@ -404,12 +448,12 @@ func TestServerPeerLimits(t *testing.T) {
 		t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
 	}
 
-	if tp.closeErr != DiscSelf {
+	if tp.closeErr != DiscUselessPeer {
 		t.Errorf("unexpected close error: %q", tp.closeErr)
 	}
 	conn.Close()
 
-	srv.RemoveTrustedPeer(dialDest)
+	srv.RemoveTrustedPeer(clientnode)
 
 	// Check that server is full again.
 	conn, _ = net.Pipe()
-- 
cgit v1.2.3


From 399aa710d514561be571dc180aa4afe9fcc2138d Mon Sep 17 00:00:00 2001
From: Andrey Petrov <andrey.petrov@shazow.net>
Date: Thu, 7 Jun 2018 10:31:09 -0400
Subject: p2p: Attempt to race check peer.Inbound() in TestServerDial

---
 p2p/server_test.go | 3 +++
 1 file changed, 3 insertions(+)

(limited to 'p2p')

diff --git a/p2p/server_test.go b/p2p/server_test.go
index 5fad1d0a7..7eca46938 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -181,6 +181,9 @@ func TestServerDial(t *testing.T) {
 			if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
 				t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
 			}
+			if peer := srv.Peers()[0]; peer.Inbound() {
+				t.Errorf("peer is marked inbound")
+			}
 			srv.RemoveTrustedPeer(node)
 			if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
 				t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
-- 
cgit v1.2.3


From dcca66bce8ec79bcf0e06c32f57d0011f8d9fa93 Mon Sep 17 00:00:00 2001
From: Andrey Petrov <andrey.petrov@shazow.net>
Date: Thu, 7 Jun 2018 10:42:40 -0400
Subject: p2p: Cache inbound flag on Peer.isInbound to avoid a race

---
 p2p/peer.go | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

(limited to 'p2p')

diff --git a/p2p/peer.go b/p2p/peer.go
index c3907349f..ff8602602 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -95,10 +95,11 @@ type PeerEvent struct {
 
 // Peer represents a connected remote node.
 type Peer struct {
-	rw      *conn
-	running map[string]*protoRW
-	log     log.Logger
-	created mclock.AbsTime
+	rw        *conn
+	isInbound bool // Cached from rw.flags to avoid a race condition
+	running   map[string]*protoRW
+	log       log.Logger
+	created   mclock.AbsTime
 
 	wg       sync.WaitGroup
 	protoErr chan error
@@ -160,19 +161,20 @@ func (p *Peer) String() string {
 
 // Inbound returns true if the peer is an inbound connection
 func (p *Peer) Inbound() bool {
-	return p.rw.flags&inboundConn != 0
+	return p.isInbound
 }
 
 func newPeer(conn *conn, protocols []Protocol) *Peer {
 	protomap := matchProtocols(protocols, conn.caps, conn)
 	p := &Peer{
-		rw:       conn,
-		running:  protomap,
-		created:  mclock.Now(),
-		disc:     make(chan DiscReason),
-		protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
-		closed:   make(chan struct{}),
-		log:      log.New("id", conn.id, "conn", conn.flags),
+		rw:        conn,
+		isInbound: conn.is(inboundConn),
+		running:   protomap,
+		created:   mclock.Now(),
+		disc:      make(chan DiscReason),
+		protoErr:  make(chan error, len(protomap)+1), // protocols + pingLoop
+		closed:    make(chan struct{}),
+		log:       log.New("id", conn.id, "conn", conn.flags),
 	}
 	return p
 }
-- 
cgit v1.2.3


From 193a402cc08e69f8c6b92106e8e81104d260d26c Mon Sep 17 00:00:00 2001
From: Andrey Petrov <andrey.petrov@shazow.net>
Date: Thu, 7 Jun 2018 12:49:07 -0400
Subject: p2p: Test for peer.rw.flags race conditions

---
 p2p/server_test.go | 29 ++++++++++++++++++-----------
 1 file changed, 18 insertions(+), 11 deletions(-)

(limited to 'p2p')

diff --git a/p2p/server_test.go b/p2p/server_test.go
index 7eca46938..65897e018 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -177,18 +177,25 @@ func TestServerDial(t *testing.T) {
 			if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
 				t.Errorf("peer is trusted prematurely: %v", peer)
 			}
-			srv.AddTrustedPeer(node)
-			if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
-				t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
-			}
-			if peer := srv.Peers()[0]; peer.Inbound() {
-				t.Errorf("peer is marked inbound")
-			}
-			srv.RemoveTrustedPeer(node)
-			if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
-				t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
-			}
+			done := make(chan bool)
+			go func() {
+				srv.AddTrustedPeer(node)
+				if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
+					t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
+				}
+				srv.RemoveTrustedPeer(node)
+				if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
+					t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
+				}
+				done <- true
+			}()
+
+			// Trigger potential race conditions
+			peer = srv.Peers()[0]
+			_ = peer.Inbound()
+			_ = peer.Info()
 
+			<-done
 		case <-time.After(1 * time.Second):
 			t.Error("server did not launch peer within one second")
 		}
-- 
cgit v1.2.3


From 6209545083f656f2dccbe4561644a757ff6443b5 Mon Sep 17 00:00:00 2001
From: Andrey Petrov <andrey.petrov@shazow.net>
Date: Thu, 7 Jun 2018 21:50:08 -0400
Subject: p2p: Wrap conn.flags ops with atomic.Load/Store

---
 p2p/peer.go        | 26 ++++++++++++--------------
 p2p/server.go      | 20 ++++++++++++++++----
 p2p/server_test.go |  2 --
 3 files changed, 28 insertions(+), 20 deletions(-)

(limited to 'p2p')

diff --git a/p2p/peer.go b/p2p/peer.go
index ff8602602..c4c1fcd7c 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -95,11 +95,10 @@ type PeerEvent struct {
 
 // Peer represents a connected remote node.
 type Peer struct {
-	rw        *conn
-	isInbound bool // Cached from rw.flags to avoid a race condition
-	running   map[string]*protoRW
-	log       log.Logger
-	created   mclock.AbsTime
+	rw      *conn
+	running map[string]*protoRW
+	log     log.Logger
+	created mclock.AbsTime
 
 	wg       sync.WaitGroup
 	protoErr chan error
@@ -161,20 +160,19 @@ func (p *Peer) String() string {
 
 // Inbound returns true if the peer is an inbound connection
 func (p *Peer) Inbound() bool {
-	return p.isInbound
+	return p.rw.is(inboundConn)
 }
 
 func newPeer(conn *conn, protocols []Protocol) *Peer {
 	protomap := matchProtocols(protocols, conn.caps, conn)
 	p := &Peer{
-		rw:        conn,
-		isInbound: conn.is(inboundConn),
-		running:   protomap,
-		created:   mclock.Now(),
-		disc:      make(chan DiscReason),
-		protoErr:  make(chan error, len(protomap)+1), // protocols + pingLoop
-		closed:    make(chan struct{}),
-		log:       log.New("id", conn.id, "conn", conn.flags),
+		rw:       conn,
+		running:  protomap,
+		created:  mclock.Now(),
+		disc:     make(chan DiscReason),
+		protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
+		closed:   make(chan struct{}),
+		log:      log.New("id", conn.id, "conn", conn.flags),
 	}
 	return p
 }
diff --git a/p2p/server.go b/p2p/server.go
index 39ff2f51e..d2cb94925 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"net"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -187,7 +188,7 @@ type peerDrop struct {
 	requested bool // true if signaled by the peer
 }
 
-type connFlag int
+type connFlag int32
 
 const (
 	dynDialedConn connFlag = 1 << iota
@@ -252,7 +253,18 @@ func (f connFlag) String() string {
 }
 
 func (c *conn) is(f connFlag) bool {
-	return c.flags&f != 0
+	flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
+	return flags&f != 0
+}
+
+func (c *conn) set(f connFlag, val bool) {
+	flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
+	if val {
+		flags |= f
+	} else {
+		flags &= ^f
+	}
+	atomic.StoreInt32((*int32)(&c.flags), int32(flags))
 }
 
 // Peers returns all connected peers.
@@ -632,7 +644,7 @@ running:
 			trusted[n.ID] = true
 			// Mark any already-connected peer as trusted
 			if p, ok := peers[n.ID]; ok {
-				p.rw.flags |= trustedConn
+				p.rw.set(trustedConn, true)
 			}
 		case n := <-srv.removetrusted:
 			// This channel is used by RemoveTrustedPeer to remove an enode
@@ -643,7 +655,7 @@ running:
 			}
 			// Unmark any already-connected peer as trusted
 			if p, ok := peers[n.ID]; ok {
-				p.rw.flags &= ^trustedConn
+				p.rw.set(trustedConn, false)
 			}
 		case op := <-srv.peerOp:
 			// This channel is used by Peers and PeerCount.
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 65897e018..3f24a79ba 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -189,12 +189,10 @@ func TestServerDial(t *testing.T) {
 				}
 				done <- true
 			}()
-
 			// Trigger potential race conditions
 			peer = srv.Peers()[0]
 			_ = peer.Inbound()
 			_ = peer.Info()
-
 			<-done
 		case <-time.After(1 * time.Second):
 			t.Error("server did not launch peer within one second")
-- 
cgit v1.2.3