aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2018-08-06 19:30:04 +0800
committerGitHub <noreply@github.com>2018-08-06 19:30:04 +0800
commitc4df67461f6e0d35389e0dabd5a932a991e3a42d (patch)
treec4baaa02385b397f9b872bb4ac8a0a36c7bfeb6a /p2p
parent941018b570aef77f1a47197a88e7a0b533b718f7 (diff)
parent6209545083f656f2dccbe4561644a757ff6443b5 (diff)
downloaddexon-c4df67461f6e0d35389e0dabd5a932a991e3a42d.tar
dexon-c4df67461f6e0d35389e0dabd5a932a991e3a42d.tar.gz
dexon-c4df67461f6e0d35389e0dabd5a932a991e3a42d.tar.bz2
dexon-c4df67461f6e0d35389e0dabd5a932a991e3a42d.tar.lz
dexon-c4df67461f6e0d35389e0dabd5a932a991e3a42d.tar.xz
dexon-c4df67461f6e0d35389e0dabd5a932a991e3a42d.tar.zst
dexon-c4df67461f6e0d35389e0dabd5a932a991e3a42d.zip
Merge pull request #16333 from shazow/addremovetrustedpeer
rpc: Add admin_addTrustedPeer and admin_removeTrustedPeer.
Diffstat (limited to 'p2p')
-rw-r--r--p2p/peer.go2
-rw-r--r--p2p/server.go62
-rw-r--r--p2p/server_test.go110
3 files changed, 166 insertions, 8 deletions
diff --git a/p2p/peer.go b/p2p/peer.go
index e8d1f2540..482e3d506 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -165,7 +165,7 @@ func (p *Peer) String() string {
// Inbound returns true if the peer is an inbound connection
func (p *Peer) Inbound() bool {
- return p.rw.flags&inboundConn != 0
+ return p.rw.is(inboundConn)
}
func newPeer(conn *conn, protocols []Protocol) *Peer {
diff --git a/p2p/server.go b/p2p/server.go
index 8f860d8f1..669ef740d 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -23,6 +23,7 @@ import (
"fmt"
"net"
"sync"
+ "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -169,6 +170,8 @@ type Server struct {
quit chan struct{}
addstatic chan *discover.Node
removestatic chan *discover.Node
+ addtrusted chan *discover.Node
+ removetrusted chan *discover.Node
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
@@ -185,7 +188,7 @@ type peerDrop struct {
requested bool // true if signaled by the peer
}
-type connFlag int
+type connFlag int32
const (
dynDialedConn connFlag = 1 << iota
@@ -250,7 +253,18 @@ func (f connFlag) String() string {
}
func (c *conn) is(f connFlag) bool {
- return c.flags&f != 0
+ flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
+ return flags&f != 0
+}
+
+func (c *conn) set(f connFlag, val bool) {
+ flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
+ if val {
+ flags |= f
+ } else {
+ flags &= ^f
+ }
+ atomic.StoreInt32((*int32)(&c.flags), int32(flags))
}
// Peers returns all connected peers.
@@ -300,6 +314,23 @@ func (srv *Server) RemovePeer(node *discover.Node) {
}
}
+// AddTrustedPeer adds the given node to a reserved whitelist which allows the
+// node to always connect, even if the slot are full.
+func (srv *Server) AddTrustedPeer(node *discover.Node) {
+ select {
+ case srv.addtrusted <- node:
+ case <-srv.quit:
+ }
+}
+
+// RemoveTrustedPeer removes the given node from the trusted peer set.
+func (srv *Server) RemoveTrustedPeer(node *discover.Node) {
+ select {
+ case srv.removetrusted <- node:
+ case <-srv.quit:
+ }
+}
+
// SubscribePeers subscribes the given channel to peer events
func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
return srv.peerFeed.Subscribe(ch)
@@ -411,6 +442,8 @@ func (srv *Server) Start() (err error) {
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *discover.Node)
srv.removestatic = make(chan *discover.Node)
+ srv.addtrusted = make(chan *discover.Node)
+ srv.removetrusted = make(chan *discover.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
@@ -547,8 +580,7 @@ func (srv *Server) run(dialstate dialer) {
queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
- // Trusted peers are loaded on startup and cannot be
- // modified while the server is running.
+ // Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
for _, n := range srv.TrustedNodes {
trusted[n.ID] = true
}
@@ -600,12 +632,32 @@ running:
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
- // stop keeping the node connected
+ // stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
}
+ case n := <-srv.addtrusted:
+ // This channel is used by AddTrustedPeer to add an enode
+ // to the trusted node set.
+ srv.log.Trace("Adding trusted node", "node", n)
+ trusted[n.ID] = true
+ // Mark any already-connected peer as trusted
+ if p, ok := peers[n.ID]; ok {
+ p.rw.set(trustedConn, true)
+ }
+ case n := <-srv.removetrusted:
+ // This channel is used by RemoveTrustedPeer to remove an enode
+ // from the trusted node set.
+ srv.log.Trace("Removing trusted node", "node", n)
+ if _, ok := trusted[n.ID]; ok {
+ delete(trusted, n.ID)
+ }
+ // Unmark any already-connected peer as trusted
+ if p, ok := peers[n.ID]; ok {
+ p.rw.set(trustedConn, false)
+ }
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 10c36528e..3f24a79ba 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -148,7 +148,8 @@ func TestServerDial(t *testing.T) {
// tell the server to connect
tcpAddr := listener.Addr().(*net.TCPAddr)
- srv.AddPeer(&discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)})
+ node := &discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}
+ srv.AddPeer(node)
select {
case conn := <-accepted:
@@ -170,6 +171,29 @@ func TestServerDial(t *testing.T) {
if !reflect.DeepEqual(peers, []*Peer{peer}) {
t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
}
+
+ // Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
+ // Particularly for race conditions on changing the flag state.
+ if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
+ t.Errorf("peer is trusted prematurely: %v", peer)
+ }
+ done := make(chan bool)
+ go func() {
+ srv.AddTrustedPeer(node)
+ if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
+ t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
+ }
+ srv.RemoveTrustedPeer(node)
+ if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
+ t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
+ }
+ done <- true
+ }()
+ // Trigger potential race conditions
+ peer = srv.Peers()[0]
+ _ = peer.Inbound()
+ _ = peer.Info()
+ <-done
case <-time.After(1 * time.Second):
t.Error("server did not launch peer within one second")
}
@@ -351,7 +375,8 @@ func TestServerAtCap(t *testing.T) {
}
}
// Try inserting a non-trusted connection.
- c := newconn(randomID())
+ anotherID := randomID()
+ c := newconn(anotherID)
if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err)
}
@@ -364,6 +389,87 @@ func TestServerAtCap(t *testing.T) {
t.Error("Server did not set trusted flag")
}
+ // Remove from trusted set and try again
+ srv.RemoveTrustedPeer(&discover.Node{ID: trustedID})
+ c = newconn(trustedID)
+ if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
+ t.Error("wrong error for insert:", err)
+ }
+
+ // Add anotherID to trusted set and try again
+ srv.AddTrustedPeer(&discover.Node{ID: anotherID})
+ c = newconn(anotherID)
+ if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+ t.Error("unexpected error for trusted conn @posthandshake:", err)
+ }
+ if !c.is(trustedConn) {
+ t.Error("Server did not set trusted flag")
+ }
+}
+
+func TestServerPeerLimits(t *testing.T) {
+ srvkey := newkey()
+
+ clientid := randomID()
+ clientnode := &discover.Node{ID: clientid}
+
+ var tp *setupTransport = &setupTransport{
+ id: clientid,
+ phs: &protoHandshake{
+ ID: clientid,
+ // Force "DiscUselessPeer" due to unmatching caps
+ // Caps: []Cap{discard.cap()},
+ },
+ }
+ var flags connFlag = dynDialedConn
+ var dialDest *discover.Node = &discover.Node{ID: clientid}
+
+ srv := &Server{
+ Config: Config{
+ PrivateKey: srvkey,
+ MaxPeers: 0,
+ NoDial: true,
+ Protocols: []Protocol{discard},
+ },
+ newTransport: func(fd net.Conn) transport { return tp },
+ log: log.New(),
+ }
+ if err := srv.Start(); err != nil {
+ t.Fatalf("couldn't start server: %v", err)
+ }
+ defer srv.Stop()
+
+ // Check that server is full (MaxPeers=0)
+ conn, _ := net.Pipe()
+ srv.SetupConn(conn, flags, dialDest)
+ if tp.closeErr != DiscTooManyPeers {
+ t.Errorf("unexpected close error: %q", tp.closeErr)
+ }
+ conn.Close()
+
+ srv.AddTrustedPeer(clientnode)
+
+ // Check that server allows a trusted peer despite being full.
+ conn, _ = net.Pipe()
+ srv.SetupConn(conn, flags, dialDest)
+ if tp.closeErr == DiscTooManyPeers {
+ t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
+ }
+
+ if tp.closeErr != DiscUselessPeer {
+ t.Errorf("unexpected close error: %q", tp.closeErr)
+ }
+ conn.Close()
+
+ srv.RemoveTrustedPeer(clientnode)
+
+ // Check that server is full again.
+ conn, _ = net.Pipe()
+ srv.SetupConn(conn, flags, dialDest)
+ if tp.closeErr != DiscTooManyPeers {
+ t.Errorf("unexpected close error: %q", tp.closeErr)
+ }
+ conn.Close()
}
func TestServerSetupConn(t *testing.T) {