diff options
-rw-r--r-- | p2p/peer.go | 2 | ||||
-rw-r--r-- | p2p/server.go | 66 | ||||
-rw-r--r-- | p2p/server_test.go | 121 |
3 files changed, 184 insertions, 5 deletions
diff --git a/p2p/peer.go b/p2p/peer.go index af019d07a..4828d3234 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -434,6 +434,7 @@ type PeerInfo struct { RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection Inbound bool `json:"inbound"` Trusted bool `json:"trusted"` + Notary bool `json:"notary"` Static bool `json:"static"` } `json:"network"` Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields @@ -458,6 +459,7 @@ func (p *Peer) Info() *PeerInfo { info.Network.RemoteAddress = p.RemoteAddr().String() info.Network.Inbound = p.rw.is(inboundConn) info.Network.Trusted = p.rw.is(trustedConn) + info.Network.Notary = p.rw.is(notaryConn) info.Network.Static = p.rw.is(staticDialedConn) // Gather all the running protocol infos diff --git a/p2p/server.go b/p2p/server.go index 566f01ffc..15f6ad167 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -180,6 +180,8 @@ type Server struct { removestatic chan *enode.Node addtrusted chan *enode.Node removetrusted chan *enode.Node + addnotary chan *enode.Node + removenotary chan *enode.Node posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop @@ -203,6 +205,7 @@ const ( staticDialedConn inboundConn trustedConn + notaryConn ) // conn wraps a network connection with information gathered @@ -254,6 +257,9 @@ func (f connFlag) String() string { if f&inboundConn != 0 { s += "-inbound" } + if f¬aryConn != 0 { + s += "-notary" + } if s != "" { s = s[1:] } @@ -344,6 +350,27 @@ func (srv *Server) RemoveTrustedPeer(node *enode.Node) { } } +// AddNotaryPeer connects to the given node and maintains the connection until the +// server is shut down. If the connection fails for any reason, the server will +// attempt to reconnect the peer. +// AddNotaryPeer also adds the given node to a reserved whitelist which allows the +// node to always connect, even if the slot are full. +func (srv *Server) AddNotaryPeer(node *discover.Node) { + select { + case srv.addnotary <- node: + case <-srv.quit: + } +} + +// RemoveNotaryPeer disconnects from the given node. +// RemoveNotaryPeer also removes the given node from the notary peer set. +func (srv *Server) RemoveNotaryPeer(node *discover.Node) { + select { + case srv.removenotary <- node: + case <-srv.quit: + } +} + // SubscribePeers subscribes the given channel to peer events func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { return srv.peerFeed.Subscribe(ch) @@ -440,6 +467,8 @@ func (srv *Server) Start() (err error) { srv.removestatic = make(chan *enode.Node) srv.addtrusted = make(chan *enode.Node) srv.removetrusted = make(chan *enode.Node) + srv.addnotary = make(chan *enode.Node) + srv.removenotary = make(chan *enode.Node) srv.peerOp = make(chan peerOpFunc) srv.peerOpDone = make(chan struct{}) @@ -610,6 +639,7 @@ func (srv *Server) run(dialstate dialer) { peers = make(map[enode.ID]*Peer) inboundCount = 0 trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) + notary = make(map[enode.ID]bool) taskdone = make(chan task, maxActiveDialTasks) runningTasks []task queuedTasks []task // tasks that can't run yet @@ -693,6 +723,33 @@ running: if p, ok := peers[n.ID()]; ok { p.rw.set(trustedConn, false) } + case n := <-srv.addnotary: + // This channel is used by AddNotaryPeer to add to the + // ephemeral notary peer list. Add it to the dialer, + // it will keep the node connected. + srv.log.Trace("Adding notary node", "node", n) + notary[n.ID] = true + if p, ok := peers[n.ID]; ok { + p.rw.set(notaryConn, true) + } + dialstate.addStatic(n) + case n := <-srv.removenotary: + // This channel is used by RemoveNotaryPeer to send a + // disconnect request to a peer and begin the + // stop keeping the node connected. + srv.log.Trace("Removing notary node", "node", n) + if _, ok := notary[n.ID]; ok { + delete(notary, n.ID) + } + + if p, ok := peers[n.ID]; ok { + p.rw.set(notaryConn, false) + } + + dialstate.removeStatic(n) + if p, ok := peers[n.ID]; ok { + p.Disconnect(DiscRequested) + } case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. op(peers) @@ -711,6 +768,11 @@ running: // Ensure that the trusted flag is set before checking against MaxPeers. c.flags |= trustedConn } + + if notary[c.id] { + c.flags |= notaryConn + } + // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. select { case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c): @@ -791,9 +853,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error { switch { - case !c.is(trustedConn|staticDialedConn) && len(peers) >= srv.MaxPeers: + case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers - case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected diff --git a/p2p/server_test.go b/p2p/server_test.go index f665c1424..c3ff825a3 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "golang.org/x/crypto/sha3" @@ -172,10 +173,14 @@ func TestServerDial(t *testing.T) { } // Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags + // Test AddNotaryPeer/RemoveTrustedPeer and changing Notary flags. // Particularly for race conditions on changing the flag state. if peer := srv.Peers()[0]; peer.Info().Network.Trusted { t.Errorf("peer is trusted prematurely: %v", peer) } + if peer := srv.Peers()[0]; peer.Info().Network.Notary { + t.Errorf("peer is notary prematurely: %v", peer) + } done := make(chan bool) go func() { srv.AddTrustedPeer(node) @@ -186,6 +191,15 @@ func TestServerDial(t *testing.T) { if peer := srv.Peers()[0]; peer.Info().Network.Trusted { t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer) } + + srv.AddNotaryPeer(node) + if peer := srv.Peers()[0]; !peer.Info().Network.Notary { + t.Errorf("peer is not notary after AddNotaryPeer: %v", peer) + } + srv.RemoveNotaryPeer(node) + if peer := srv.Peers()[0]; peer.Info().Network.Notary { + t.Errorf("peer is notary after RemoveNotaryPeer: %v", peer) + } done <- true }() // Trigger potential race conditions @@ -202,6 +216,73 @@ func TestServerDial(t *testing.T) { } } +// TestNotaryPeer checks that the node is added to and remove from static when +// AddNotaryPeer and RemoveNotaryPeer is called. +func TestNotaryPeer(t *testing.T) { + var ( + returned = make(chan struct{}) + add, remove = make(chan *discover.Node), make(chan *discover.Node) + tg = taskgen{ + newFunc: func(running int, peers map[discover.NodeID]*Peer) []task { + return []task{} + }, + doneFunc: func(t task) {}, + addFunc: func(n *discover.Node) { + add <- n + }, + removeFunc: func(n *discover.Node) { + remove <- n + }, + } + ) + + srv := &Server{ + Config: Config{MaxPeers: 10}, + quit: make(chan struct{}), + ntab: fakeTable{}, + addnotary: make(chan *discover.Node), + removenotary: make(chan *discover.Node), + running: true, + log: log.New(), + } + srv.loopWG.Add(1) + go func() { + srv.run(tg) + close(returned) + }() + + notaryID := randomID() + go srv.AddNotaryPeer(&discover.Node{ID: notaryID}) + + select { + case n := <-add: + if n.ID != notaryID { + t.Errorf("node ID mismatched: got %s, want %s", + n.ID.String(), notaryID.String()) + } + case <-time.After(1 * time.Second): + t.Error("add static is not called within one second") + } + + go srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) + select { + case n := <-remove: + if n.ID != notaryID { + t.Errorf("node ID mismatched: got %s, want %s", + n.ID.String(), notaryID.String()) + } + case <-time.After(1 * time.Second): + t.Error("remove static is not called within one second") + } + + srv.Stop() + select { + case <-returned: + case <-time.After(500 * time.Millisecond): + t.Error("Server.run did not return within 500ms") + } +} + // This test checks that tasks generated by dialstate are // actually executed and taskdone is called for them. func TestServerTaskScheduling(t *testing.T) { @@ -326,6 +407,9 @@ func TestServerManyTasks(t *testing.T) { type taskgen struct { newFunc func(running int, peers map[enode.ID]*Peer) []task doneFunc func(task) + + addFunc func(*discover.Node) + removeFunc func(*discover.Node) } func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task { @@ -334,9 +418,11 @@ func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) func (tg taskgen) taskDone(t task, now time.Time) { tg.doneFunc(t) } -func (tg taskgen) addStatic(*enode.Node) { +func (tg taskgen) addStatic(n *enode.Node) { + tg.addFunc(n) } -func (tg taskgen) removeStatic(*enode.Node) { +func (tg taskgen) removeStatic(n *enode.Node) { + tg.removeFunc(n) } type testTask struct { @@ -350,10 +436,11 @@ func (t *testTask) Do(srv *Server) { // This test checks that connections are disconnected // just after the encryption handshake when the server is -// at capacity. Trusted connections should still be accepted. +// at capacity. Trusted and Notary connections should still be accepted. func TestServerAtCap(t *testing.T) { trustedNode := newkey() trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey) + notaryID := randomID() srv := &Server{ Config: Config{ PrivateKey: newkey(), @@ -366,6 +453,7 @@ func TestServerAtCap(t *testing.T) { t.Fatalf("could not start: %v", err) } defer srv.Stop() + srv.AddNotaryPeer(&discover.Node{ID: notaryID}) newconn := func(id enode.ID) *conn { fd, _ := net.Pipe() @@ -396,6 +484,15 @@ func TestServerAtCap(t *testing.T) { t.Error("Server did not set trusted flag") } + // Try inserting a notary connection. + c = newconn(notaryID) + if err := srv.checkpoint(c, srv.posthandshake); err != nil { + t.Error("unexpected error for notary conn @posthandshake:", err) + } + if !c.is(notaryConn) { + t.Error("Server did not set notary flag") + } + // Remove from trusted set and try again srv.RemoveTrustedPeer(newNode(trustedID, nil)) c = newconn(trustedID) @@ -412,6 +509,24 @@ func TestServerAtCap(t *testing.T) { if !c.is(trustedConn) { t.Error("Server did not set trusted flag") } + + // Remove from notary set and try again + srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) + c = newconn(notaryID) + if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers { + t.Error("wrong error for insert:", err) + } + + // Add anotherID to notary set and try again + anotherNotaryID := randomID() + srv.AddNotaryPeer(&discover.Node{ID: anotherNotaryID}) + c = newconn(anotherNotaryID) + if err := srv.checkpoint(c, srv.posthandshake); err != nil { + t.Error("unexpected error for notary conn @posthandshake:", err) + } + if !c.is(notaryConn) { + t.Error("Server did not set notary flag") + } } func TestServerPeerLimits(t *testing.T) { |