From 2a75fe3308faf4d77054e00b55566c9f18591572 Mon Sep 17 00:00:00 2001 From: Andrey Petrov Date: Sun, 25 Feb 2018 15:39:29 -0500 Subject: rpc: Add admin_addTrustedPeer and admin_removeTrustedPeer. These RPC calls are analogous to Parity's parity_addReservedPeer and parity_removeReservedPeer. They are useful for adjusting the trusted peer set during runtime, without requiring restarting the server. --- p2p/server.go | 46 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) (limited to 'p2p') diff --git a/p2p/server.go b/p2p/server.go index cdb5b1926..39ff2f51e 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -169,6 +169,8 @@ type Server struct { quit chan struct{} addstatic chan *discover.Node removestatic chan *discover.Node + addtrusted chan *discover.Node + removetrusted chan *discover.Node posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop @@ -300,6 +302,23 @@ func (srv *Server) RemovePeer(node *discover.Node) { } } +// AddTrustedPeer adds the given node to a reserved whitelist which allows the +// node to always connect, even if the slot are full. +func (srv *Server) AddTrustedPeer(node *discover.Node) { + select { + case srv.addtrusted <- node: + case <-srv.quit: + } +} + +// RemoveTrustedPeer removes the given node from the trusted peer set. +func (srv *Server) RemoveTrustedPeer(node *discover.Node) { + select { + case srv.removetrusted <- node: + case <-srv.quit: + } +} + // SubscribePeers subscribes the given channel to peer events func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { return srv.peerFeed.Subscribe(ch) @@ -410,6 +429,8 @@ func (srv *Server) Start() (err error) { srv.posthandshake = make(chan *conn) srv.addstatic = make(chan *discover.Node) srv.removestatic = make(chan *discover.Node) + srv.addtrusted = make(chan *discover.Node) + srv.removetrusted = make(chan *discover.Node) srv.peerOp = make(chan peerOpFunc) srv.peerOpDone = make(chan struct{}) @@ -546,8 +567,7 @@ func (srv *Server) run(dialstate dialer) { queuedTasks []task // tasks that can't run yet ) // Put trusted nodes into a map to speed up checks. - // Trusted peers are loaded on startup and cannot be - // modified while the server is running. + // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. for _, n := range srv.TrustedNodes { trusted[n.ID] = true } @@ -599,12 +619,32 @@ running: case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the - // stop keeping the node connected + // stop keeping the node connected. srv.log.Trace("Removing static node", "node", n) dialstate.removeStatic(n) if p, ok := peers[n.ID]; ok { p.Disconnect(DiscRequested) } + case n := <-srv.addtrusted: + // This channel is used by AddTrustedPeer to add an enode + // to the trusted node set. + srv.log.Trace("Adding trusted node", "node", n) + trusted[n.ID] = true + // Mark any already-connected peer as trusted + if p, ok := peers[n.ID]; ok { + p.rw.flags |= trustedConn + } + case n := <-srv.removetrusted: + // This channel is used by RemoveTrustedPeer to remove an enode + // from the trusted node set. + srv.log.Trace("Removing trusted node", "node", n) + if _, ok := trusted[n.ID]; ok { + delete(trusted, n.ID) + } + // Unmark any already-connected peer as trusted + if p, ok := peers[n.ID]; ok { + p.rw.flags &= ^trustedConn + } case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. op(peers) -- cgit v1.2.3 From 773857a5242a3fe7458a9c9b60a4ea6333582e56 Mon Sep 17 00:00:00 2001 From: Andrey Petrov Date: Sun, 18 Mar 2018 12:25:35 -0400 Subject: p2p: Test for MaxPeers=0 and TrustedPeer override --- p2p/server_test.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) (limited to 'p2p') diff --git a/p2p/server_test.go b/p2p/server_test.go index 10c36528e..efab9bb5e 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -363,7 +363,61 @@ func TestServerAtCap(t *testing.T) { if !c.is(trustedConn) { t.Error("Server did not set trusted flag") } +} + +func TestServerPeerLimits(t *testing.T) { + srvkey := newkey() + srvid := discover.PubkeyID(&srvkey.PublicKey) + + var tp *setupTransport = &setupTransport{id: srvid, phs: &protoHandshake{ID: srvid}} + var flags connFlag = dynDialedConn + var dialDest *discover.Node = &discover.Node{ID: srvid} + srv := &Server{ + Config: Config{ + PrivateKey: srvkey, + MaxPeers: 0, + NoDial: true, + Protocols: []Protocol{discard}, + }, + newTransport: func(fd net.Conn) transport { return tp }, + log: log.New(), + } + if err := srv.Start(); err != nil { + t.Fatalf("couldn't start server: %v", err) + } + + // Check that server is full (MaxPeers=0) + conn, _ := net.Pipe() + srv.SetupConn(conn, flags, dialDest) + if tp.closeErr != DiscTooManyPeers { + t.Errorf("unexpected close error: %q", tp.closeErr) + } + conn.Close() + + srv.AddTrustedPeer(dialDest) + + // Check that server allows a trusted peer despite being full. + conn, _ = net.Pipe() + srv.SetupConn(conn, flags, dialDest) + if tp.closeErr == DiscTooManyPeers { + t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr) + } + + if tp.closeErr != DiscSelf { + t.Errorf("unexpected close error: %q", tp.closeErr) + } + conn.Close() + + srv.RemoveTrustedPeer(dialDest) + + // Check that server is full again. + conn, _ = net.Pipe() + srv.SetupConn(conn, flags, dialDest) + if tp.closeErr != DiscTooManyPeers { + t.Errorf("unexpected close error: %q", tp.closeErr) + } + conn.Close() } func TestServerSetupConn(t *testing.T) { -- cgit v1.2.3 From 699794d88d86c30fa8ac74c0bbe3e0ac9cde88a2 Mon Sep 17 00:00:00 2001 From: Andrey Petrov Date: Tue, 5 Jun 2018 15:45:43 -0400 Subject: p2p: More tests for AddTrustedPeer/RemoveTrustedPeer --- p2p/server_test.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 8 deletions(-) (limited to 'p2p') diff --git a/p2p/server_test.go b/p2p/server_test.go index efab9bb5e..5fad1d0a7 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -148,7 +148,8 @@ func TestServerDial(t *testing.T) { // tell the server to connect tcpAddr := listener.Addr().(*net.TCPAddr) - srv.AddPeer(&discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}) + node := &discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)} + srv.AddPeer(node) select { case conn := <-accepted: @@ -170,6 +171,21 @@ func TestServerDial(t *testing.T) { if !reflect.DeepEqual(peers, []*Peer{peer}) { t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer}) } + + // Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags + // Particularly for race conditions on changing the flag state. + if peer := srv.Peers()[0]; peer.Info().Network.Trusted { + t.Errorf("peer is trusted prematurely: %v", peer) + } + srv.AddTrustedPeer(node) + if peer := srv.Peers()[0]; !peer.Info().Network.Trusted { + t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer) + } + srv.RemoveTrustedPeer(node) + if peer := srv.Peers()[0]; peer.Info().Network.Trusted { + t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer) + } + case <-time.After(1 * time.Second): t.Error("server did not launch peer within one second") } @@ -351,7 +367,8 @@ func TestServerAtCap(t *testing.T) { } } // Try inserting a non-trusted connection. - c := newconn(randomID()) + anotherID := randomID() + c := newconn(anotherID) if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers { t.Error("wrong error for insert:", err) } @@ -363,15 +380,41 @@ func TestServerAtCap(t *testing.T) { if !c.is(trustedConn) { t.Error("Server did not set trusted flag") } + + // Remove from trusted set and try again + srv.RemoveTrustedPeer(&discover.Node{ID: trustedID}) + c = newconn(trustedID) + if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers { + t.Error("wrong error for insert:", err) + } + + // Add anotherID to trusted set and try again + srv.AddTrustedPeer(&discover.Node{ID: anotherID}) + c = newconn(anotherID) + if err := srv.checkpoint(c, srv.posthandshake); err != nil { + t.Error("unexpected error for trusted conn @posthandshake:", err) + } + if !c.is(trustedConn) { + t.Error("Server did not set trusted flag") + } } func TestServerPeerLimits(t *testing.T) { srvkey := newkey() - srvid := discover.PubkeyID(&srvkey.PublicKey) - var tp *setupTransport = &setupTransport{id: srvid, phs: &protoHandshake{ID: srvid}} + clientid := randomID() + clientnode := &discover.Node{ID: clientid} + + var tp *setupTransport = &setupTransport{ + id: clientid, + phs: &protoHandshake{ + ID: clientid, + // Force "DiscUselessPeer" due to unmatching caps + // Caps: []Cap{discard.cap()}, + }, + } var flags connFlag = dynDialedConn - var dialDest *discover.Node = &discover.Node{ID: srvid} + var dialDest *discover.Node = &discover.Node{ID: clientid} srv := &Server{ Config: Config{ @@ -386,6 +429,7 @@ func TestServerPeerLimits(t *testing.T) { if err := srv.Start(); err != nil { t.Fatalf("couldn't start server: %v", err) } + defer srv.Stop() // Check that server is full (MaxPeers=0) conn, _ := net.Pipe() @@ -395,7 +439,7 @@ func TestServerPeerLimits(t *testing.T) { } conn.Close() - srv.AddTrustedPeer(dialDest) + srv.AddTrustedPeer(clientnode) // Check that server allows a trusted peer despite being full. conn, _ = net.Pipe() @@ -404,12 +448,12 @@ func TestServerPeerLimits(t *testing.T) { t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr) } - if tp.closeErr != DiscSelf { + if tp.closeErr != DiscUselessPeer { t.Errorf("unexpected close error: %q", tp.closeErr) } conn.Close() - srv.RemoveTrustedPeer(dialDest) + srv.RemoveTrustedPeer(clientnode) // Check that server is full again. conn, _ = net.Pipe() -- cgit v1.2.3 From 399aa710d514561be571dc180aa4afe9fcc2138d Mon Sep 17 00:00:00 2001 From: Andrey Petrov Date: Thu, 7 Jun 2018 10:31:09 -0400 Subject: p2p: Attempt to race check peer.Inbound() in TestServerDial --- p2p/server_test.go | 3 +++ 1 file changed, 3 insertions(+) (limited to 'p2p') diff --git a/p2p/server_test.go b/p2p/server_test.go index 5fad1d0a7..7eca46938 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -181,6 +181,9 @@ func TestServerDial(t *testing.T) { if peer := srv.Peers()[0]; !peer.Info().Network.Trusted { t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer) } + if peer := srv.Peers()[0]; peer.Inbound() { + t.Errorf("peer is marked inbound") + } srv.RemoveTrustedPeer(node) if peer := srv.Peers()[0]; peer.Info().Network.Trusted { t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer) -- cgit v1.2.3 From dcca66bce8ec79bcf0e06c32f57d0011f8d9fa93 Mon Sep 17 00:00:00 2001 From: Andrey Petrov Date: Thu, 7 Jun 2018 10:42:40 -0400 Subject: p2p: Cache inbound flag on Peer.isInbound to avoid a race --- p2p/peer.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) (limited to 'p2p') diff --git a/p2p/peer.go b/p2p/peer.go index c3907349f..ff8602602 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -95,10 +95,11 @@ type PeerEvent struct { // Peer represents a connected remote node. type Peer struct { - rw *conn - running map[string]*protoRW - log log.Logger - created mclock.AbsTime + rw *conn + isInbound bool // Cached from rw.flags to avoid a race condition + running map[string]*protoRW + log log.Logger + created mclock.AbsTime wg sync.WaitGroup protoErr chan error @@ -160,19 +161,20 @@ func (p *Peer) String() string { // Inbound returns true if the peer is an inbound connection func (p *Peer) Inbound() bool { - return p.rw.flags&inboundConn != 0 + return p.isInbound } func newPeer(conn *conn, protocols []Protocol) *Peer { protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ - rw: conn, - running: protomap, - created: mclock.Now(), - disc: make(chan DiscReason), - protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop - closed: make(chan struct{}), - log: log.New("id", conn.id, "conn", conn.flags), + rw: conn, + isInbound: conn.is(inboundConn), + running: protomap, + created: mclock.Now(), + disc: make(chan DiscReason), + protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop + closed: make(chan struct{}), + log: log.New("id", conn.id, "conn", conn.flags), } return p } -- cgit v1.2.3 From 193a402cc08e69f8c6b92106e8e81104d260d26c Mon Sep 17 00:00:00 2001 From: Andrey Petrov Date: Thu, 7 Jun 2018 12:49:07 -0400 Subject: p2p: Test for peer.rw.flags race conditions --- p2p/server_test.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) (limited to 'p2p') diff --git a/p2p/server_test.go b/p2p/server_test.go index 7eca46938..65897e018 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -177,18 +177,25 @@ func TestServerDial(t *testing.T) { if peer := srv.Peers()[0]; peer.Info().Network.Trusted { t.Errorf("peer is trusted prematurely: %v", peer) } - srv.AddTrustedPeer(node) - if peer := srv.Peers()[0]; !peer.Info().Network.Trusted { - t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer) - } - if peer := srv.Peers()[0]; peer.Inbound() { - t.Errorf("peer is marked inbound") - } - srv.RemoveTrustedPeer(node) - if peer := srv.Peers()[0]; peer.Info().Network.Trusted { - t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer) - } + done := make(chan bool) + go func() { + srv.AddTrustedPeer(node) + if peer := srv.Peers()[0]; !peer.Info().Network.Trusted { + t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer) + } + srv.RemoveTrustedPeer(node) + if peer := srv.Peers()[0]; peer.Info().Network.Trusted { + t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer) + } + done <- true + }() + + // Trigger potential race conditions + peer = srv.Peers()[0] + _ = peer.Inbound() + _ = peer.Info() + <-done case <-time.After(1 * time.Second): t.Error("server did not launch peer within one second") } -- cgit v1.2.3 From 6209545083f656f2dccbe4561644a757ff6443b5 Mon Sep 17 00:00:00 2001 From: Andrey Petrov Date: Thu, 7 Jun 2018 21:50:08 -0400 Subject: p2p: Wrap conn.flags ops with atomic.Load/Store --- p2p/peer.go | 26 ++++++++++++-------------- p2p/server.go | 20 ++++++++++++++++---- p2p/server_test.go | 2 -- 3 files changed, 28 insertions(+), 20 deletions(-) (limited to 'p2p') diff --git a/p2p/peer.go b/p2p/peer.go index ff8602602..c4c1fcd7c 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -95,11 +95,10 @@ type PeerEvent struct { // Peer represents a connected remote node. type Peer struct { - rw *conn - isInbound bool // Cached from rw.flags to avoid a race condition - running map[string]*protoRW - log log.Logger - created mclock.AbsTime + rw *conn + running map[string]*protoRW + log log.Logger + created mclock.AbsTime wg sync.WaitGroup protoErr chan error @@ -161,20 +160,19 @@ func (p *Peer) String() string { // Inbound returns true if the peer is an inbound connection func (p *Peer) Inbound() bool { - return p.isInbound + return p.rw.is(inboundConn) } func newPeer(conn *conn, protocols []Protocol) *Peer { protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ - rw: conn, - isInbound: conn.is(inboundConn), - running: protomap, - created: mclock.Now(), - disc: make(chan DiscReason), - protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop - closed: make(chan struct{}), - log: log.New("id", conn.id, "conn", conn.flags), + rw: conn, + running: protomap, + created: mclock.Now(), + disc: make(chan DiscReason), + protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop + closed: make(chan struct{}), + log: log.New("id", conn.id, "conn", conn.flags), } return p } diff --git a/p2p/server.go b/p2p/server.go index 39ff2f51e..d2cb94925 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -23,6 +23,7 @@ import ( "fmt" "net" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -187,7 +188,7 @@ type peerDrop struct { requested bool // true if signaled by the peer } -type connFlag int +type connFlag int32 const ( dynDialedConn connFlag = 1 << iota @@ -252,7 +253,18 @@ func (f connFlag) String() string { } func (c *conn) is(f connFlag) bool { - return c.flags&f != 0 + flags := connFlag(atomic.LoadInt32((*int32)(&c.flags))) + return flags&f != 0 +} + +func (c *conn) set(f connFlag, val bool) { + flags := connFlag(atomic.LoadInt32((*int32)(&c.flags))) + if val { + flags |= f + } else { + flags &= ^f + } + atomic.StoreInt32((*int32)(&c.flags), int32(flags)) } // Peers returns all connected peers. @@ -632,7 +644,7 @@ running: trusted[n.ID] = true // Mark any already-connected peer as trusted if p, ok := peers[n.ID]; ok { - p.rw.flags |= trustedConn + p.rw.set(trustedConn, true) } case n := <-srv.removetrusted: // This channel is used by RemoveTrustedPeer to remove an enode @@ -643,7 +655,7 @@ running: } // Unmark any already-connected peer as trusted if p, ok := peers[n.ID]; ok { - p.rw.flags &= ^trustedConn + p.rw.set(trustedConn, false) } case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. diff --git a/p2p/server_test.go b/p2p/server_test.go index 65897e018..3f24a79ba 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -189,12 +189,10 @@ func TestServerDial(t *testing.T) { } done <- true }() - // Trigger potential race conditions peer = srv.Peers()[0] _ = peer.Inbound() _ = peer.Info() - <-done case <-time.After(1 * time.Second): t.Error("server did not launch peer within one second") -- cgit v1.2.3