aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--p2p/peer.go2
-rw-r--r--p2p/server.go66
-rw-r--r--p2p/server_test.go121
3 files changed, 184 insertions, 5 deletions
diff --git a/p2p/peer.go b/p2p/peer.go
index af019d07a..4828d3234 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -434,6 +434,7 @@ type PeerInfo struct {
RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection
Inbound bool `json:"inbound"`
Trusted bool `json:"trusted"`
+ Notary bool `json:"notary"`
Static bool `json:"static"`
} `json:"network"`
Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields
@@ -458,6 +459,7 @@ func (p *Peer) Info() *PeerInfo {
info.Network.RemoteAddress = p.RemoteAddr().String()
info.Network.Inbound = p.rw.is(inboundConn)
info.Network.Trusted = p.rw.is(trustedConn)
+ info.Network.Notary = p.rw.is(notaryConn)
info.Network.Static = p.rw.is(staticDialedConn)
// Gather all the running protocol infos
diff --git a/p2p/server.go b/p2p/server.go
index 566f01ffc..15f6ad167 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -180,6 +180,8 @@ type Server struct {
removestatic chan *enode.Node
addtrusted chan *enode.Node
removetrusted chan *enode.Node
+ addnotary chan *enode.Node
+ removenotary chan *enode.Node
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
@@ -203,6 +205,7 @@ const (
staticDialedConn
inboundConn
trustedConn
+ notaryConn
)
// conn wraps a network connection with information gathered
@@ -254,6 +257,9 @@ func (f connFlag) String() string {
if f&inboundConn != 0 {
s += "-inbound"
}
+ if f&notaryConn != 0 {
+ s += "-notary"
+ }
if s != "" {
s = s[1:]
}
@@ -344,6 +350,27 @@ func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
}
}
+// AddNotaryPeer connects to the given node and maintains the connection until the
+// server is shut down. If the connection fails for any reason, the server will
+// attempt to reconnect the peer.
+// AddNotaryPeer also adds the given node to a reserved whitelist which allows the
+// node to always connect, even if the slot are full.
+func (srv *Server) AddNotaryPeer(node *discover.Node) {
+ select {
+ case srv.addnotary <- node:
+ case <-srv.quit:
+ }
+}
+
+// RemoveNotaryPeer disconnects from the given node.
+// RemoveNotaryPeer also removes the given node from the notary peer set.
+func (srv *Server) RemoveNotaryPeer(node *discover.Node) {
+ select {
+ case srv.removenotary <- node:
+ case <-srv.quit:
+ }
+}
+
// SubscribePeers subscribes the given channel to peer events
func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
return srv.peerFeed.Subscribe(ch)
@@ -440,6 +467,8 @@ func (srv *Server) Start() (err error) {
srv.removestatic = make(chan *enode.Node)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
+ srv.addnotary = make(chan *enode.Node)
+ srv.removenotary = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
@@ -610,6 +639,7 @@ func (srv *Server) run(dialstate dialer) {
peers = make(map[enode.ID]*Peer)
inboundCount = 0
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
+ notary = make(map[enode.ID]bool)
taskdone = make(chan task, maxActiveDialTasks)
runningTasks []task
queuedTasks []task // tasks that can't run yet
@@ -693,6 +723,33 @@ running:
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, false)
}
+ case n := <-srv.addnotary:
+ // This channel is used by AddNotaryPeer to add to the
+ // ephemeral notary peer list. Add it to the dialer,
+ // it will keep the node connected.
+ srv.log.Trace("Adding notary node", "node", n)
+ notary[n.ID] = true
+ if p, ok := peers[n.ID]; ok {
+ p.rw.set(notaryConn, true)
+ }
+ dialstate.addStatic(n)
+ case n := <-srv.removenotary:
+ // This channel is used by RemoveNotaryPeer to send a
+ // disconnect request to a peer and begin the
+ // stop keeping the node connected.
+ srv.log.Trace("Removing notary node", "node", n)
+ if _, ok := notary[n.ID]; ok {
+ delete(notary, n.ID)
+ }
+
+ if p, ok := peers[n.ID]; ok {
+ p.rw.set(notaryConn, false)
+ }
+
+ dialstate.removeStatic(n)
+ if p, ok := peers[n.ID]; ok {
+ p.Disconnect(DiscRequested)
+ }
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
@@ -711,6 +768,11 @@ running:
// Ensure that the trusted flag is set before checking against MaxPeers.
c.flags |= trustedConn
}
+
+ if notary[c.id] {
+ c.flags |= notaryConn
+ }
+
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
@@ -791,9 +853,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i
func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
switch {
- case !c.is(trustedConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
+ case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
- case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
+ case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected
diff --git a/p2p/server_test.go b/p2p/server_test.go
index f665c1424..c3ff825a3 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"golang.org/x/crypto/sha3"
@@ -172,10 +173,14 @@ func TestServerDial(t *testing.T) {
}
// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
+ // Test AddNotaryPeer/RemoveTrustedPeer and changing Notary flags.
// Particularly for race conditions on changing the flag state.
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted prematurely: %v", peer)
}
+ if peer := srv.Peers()[0]; peer.Info().Network.Notary {
+ t.Errorf("peer is notary prematurely: %v", peer)
+ }
done := make(chan bool)
go func() {
srv.AddTrustedPeer(node)
@@ -186,6 +191,15 @@ func TestServerDial(t *testing.T) {
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
}
+
+ srv.AddNotaryPeer(node)
+ if peer := srv.Peers()[0]; !peer.Info().Network.Notary {
+ t.Errorf("peer is not notary after AddNotaryPeer: %v", peer)
+ }
+ srv.RemoveNotaryPeer(node)
+ if peer := srv.Peers()[0]; peer.Info().Network.Notary {
+ t.Errorf("peer is notary after RemoveNotaryPeer: %v", peer)
+ }
done <- true
}()
// Trigger potential race conditions
@@ -202,6 +216,73 @@ func TestServerDial(t *testing.T) {
}
}
+// TestNotaryPeer checks that the node is added to and remove from static when
+// AddNotaryPeer and RemoveNotaryPeer is called.
+func TestNotaryPeer(t *testing.T) {
+ var (
+ returned = make(chan struct{})
+ add, remove = make(chan *discover.Node), make(chan *discover.Node)
+ tg = taskgen{
+ newFunc: func(running int, peers map[discover.NodeID]*Peer) []task {
+ return []task{}
+ },
+ doneFunc: func(t task) {},
+ addFunc: func(n *discover.Node) {
+ add <- n
+ },
+ removeFunc: func(n *discover.Node) {
+ remove <- n
+ },
+ }
+ )
+
+ srv := &Server{
+ Config: Config{MaxPeers: 10},
+ quit: make(chan struct{}),
+ ntab: fakeTable{},
+ addnotary: make(chan *discover.Node),
+ removenotary: make(chan *discover.Node),
+ running: true,
+ log: log.New(),
+ }
+ srv.loopWG.Add(1)
+ go func() {
+ srv.run(tg)
+ close(returned)
+ }()
+
+ notaryID := randomID()
+ go srv.AddNotaryPeer(&discover.Node{ID: notaryID})
+
+ select {
+ case n := <-add:
+ if n.ID != notaryID {
+ t.Errorf("node ID mismatched: got %s, want %s",
+ n.ID.String(), notaryID.String())
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("add static is not called within one second")
+ }
+
+ go srv.RemoveNotaryPeer(&discover.Node{ID: notaryID})
+ select {
+ case n := <-remove:
+ if n.ID != notaryID {
+ t.Errorf("node ID mismatched: got %s, want %s",
+ n.ID.String(), notaryID.String())
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("remove static is not called within one second")
+ }
+
+ srv.Stop()
+ select {
+ case <-returned:
+ case <-time.After(500 * time.Millisecond):
+ t.Error("Server.run did not return within 500ms")
+ }
+}
+
// This test checks that tasks generated by dialstate are
// actually executed and taskdone is called for them.
func TestServerTaskScheduling(t *testing.T) {
@@ -326,6 +407,9 @@ func TestServerManyTasks(t *testing.T) {
type taskgen struct {
newFunc func(running int, peers map[enode.ID]*Peer) []task
doneFunc func(task)
+
+ addFunc func(*discover.Node)
+ removeFunc func(*discover.Node)
}
func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task {
@@ -334,9 +418,11 @@ func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time)
func (tg taskgen) taskDone(t task, now time.Time) {
tg.doneFunc(t)
}
-func (tg taskgen) addStatic(*enode.Node) {
+func (tg taskgen) addStatic(n *enode.Node) {
+ tg.addFunc(n)
}
-func (tg taskgen) removeStatic(*enode.Node) {
+func (tg taskgen) removeStatic(n *enode.Node) {
+ tg.removeFunc(n)
}
type testTask struct {
@@ -350,10 +436,11 @@ func (t *testTask) Do(srv *Server) {
// This test checks that connections are disconnected
// just after the encryption handshake when the server is
-// at capacity. Trusted connections should still be accepted.
+// at capacity. Trusted and Notary connections should still be accepted.
func TestServerAtCap(t *testing.T) {
trustedNode := newkey()
trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
+ notaryID := randomID()
srv := &Server{
Config: Config{
PrivateKey: newkey(),
@@ -366,6 +453,7 @@ func TestServerAtCap(t *testing.T) {
t.Fatalf("could not start: %v", err)
}
defer srv.Stop()
+ srv.AddNotaryPeer(&discover.Node{ID: notaryID})
newconn := func(id enode.ID) *conn {
fd, _ := net.Pipe()
@@ -396,6 +484,15 @@ func TestServerAtCap(t *testing.T) {
t.Error("Server did not set trusted flag")
}
+ // Try inserting a notary connection.
+ c = newconn(notaryID)
+ if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+ t.Error("unexpected error for notary conn @posthandshake:", err)
+ }
+ if !c.is(notaryConn) {
+ t.Error("Server did not set notary flag")
+ }
+
// Remove from trusted set and try again
srv.RemoveTrustedPeer(newNode(trustedID, nil))
c = newconn(trustedID)
@@ -412,6 +509,24 @@ func TestServerAtCap(t *testing.T) {
if !c.is(trustedConn) {
t.Error("Server did not set trusted flag")
}
+
+ // Remove from notary set and try again
+ srv.RemoveNotaryPeer(&discover.Node{ID: notaryID})
+ c = newconn(notaryID)
+ if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
+ t.Error("wrong error for insert:", err)
+ }
+
+ // Add anotherID to notary set and try again
+ anotherNotaryID := randomID()
+ srv.AddNotaryPeer(&discover.Node{ID: anotherNotaryID})
+ c = newconn(anotherNotaryID)
+ if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+ t.Error("unexpected error for notary conn @posthandshake:", err)
+ }
+ if !c.is(notaryConn) {
+ t.Error("Server did not set notary flag")
+ }
}
func TestServerPeerLimits(t *testing.T) {