aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go271
1 files changed, 188 insertions, 83 deletions
diff --git a/p2p/server.go b/p2p/server.go
index c3b3a00d4..a58673342 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -33,13 +33,13 @@ import (
"github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/discv5"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/p2p/nat"
"github.com/dexon-foundation/dexon/p2p/netutil"
"github.com/dexon-foundation/dexon/rlp"
+ "github.com/ethereum/go-ethereum/p2p/discover"
)
const (
@@ -178,10 +178,12 @@ type Server struct {
quit chan struct{}
addstatic chan *enode.Node
removestatic chan *enode.Node
+ adddirect chan *enode.Node
+ removedirect chan *enode.Node
addtrusted chan *enode.Node
removetrusted chan *enode.Node
- addnotary chan *enode.Node
- removenotary chan *enode.Node
+ addgroup chan *dialGroup
+ removegroup chan *dialGroup
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
@@ -203,9 +205,10 @@ type connFlag int32
const (
dynDialedConn connFlag = 1 << iota
staticDialedConn
+ directDialedConn
+ groupDialedConn
inboundConn
trustedConn
- notaryConn
)
// conn wraps a network connection with information gathered
@@ -254,12 +257,15 @@ func (f connFlag) String() string {
if f&staticDialedConn != 0 {
s += "-staticdial"
}
+ if f&directDialedConn != 0 {
+ s += "-directdial"
+ }
+ if f&groupDialedConn != 0 {
+ s += "-groupdial"
+ }
if f&inboundConn != 0 {
s += "-inbound"
}
- if f&notaryConn != 0 {
- s += "-notary"
- }
if s != "" {
s = s[1:]
}
@@ -333,40 +339,57 @@ func (srv *Server) RemovePeer(node *enode.Node) {
}
}
-// AddTrustedPeer adds the given node to a reserved whitelist which allows the
-// node to always connect, even if the slot are full.
-func (srv *Server) AddTrustedPeer(node *enode.Node) {
+// AddDirectPeer connects to the given node and maintains the connection until the
+// server is shut down. If the connection fails for any reason, the server will
+// attempt to reconnect the peer.
+func (srv *Server) AddDirectPeer(node *enode.Node) {
select {
- case srv.addtrusted <- node:
+ case srv.adddirect <- node:
case <-srv.quit:
}
}
-// RemoveTrustedPeer removes the given node from the trusted peer set.
-func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
+// RemoveDirectPeer disconnects from the given node
+func (srv *Server) RemoveDirectPeer(node *enode.Node) {
select {
- case srv.removetrusted <- node:
+ case srv.removedirect <- node:
case <-srv.quit:
}
}
-// AddNotaryPeer connects to the given node and maintains the connection until the
-// server is shut down. If the connection fails for any reason, the server will
-// attempt to reconnect the peer.
-// AddNotaryPeer also adds the given node to a reserved whitelist which allows the
+func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) {
+ m := map[enode.ID]*enode.Node{}
+ for _, node := range nodes {
+ m[node.ID()] = node
+ }
+ g := &dialGroup{name: name, nodes: m, num: num}
+ select {
+ case srv.addgroup <- g:
+ case <-srv.quit:
+ }
+}
+
+func (srv *Server) RemoveGroup(name string) {
+ g := &dialGroup{name: name}
+ select {
+ case srv.removegroup <- g:
+ case <-srv.quit:
+ }
+}
+
+// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
-func (srv *Server) AddNotaryPeer(node *discover.Node) {
+func (srv *Server) AddTrustedPeer(node *enode.Node) {
select {
- case srv.addnotary <- node:
+ case srv.addtrusted <- node:
case <-srv.quit:
}
}
-// RemoveNotaryPeer disconnects from the given node.
-// RemoveNotaryPeer also removes the given node from the notary peer set.
-func (srv *Server) RemoveNotaryPeer(node *discover.Node) {
+// RemoveTrustedPeer removes the given node from the trusted peer set.
+func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
select {
- case srv.removenotary <- node:
+ case srv.removetrusted <- node:
case <-srv.quit:
}
}
@@ -388,6 +411,36 @@ func (srv *Server) Self() *enode.Node {
return ln.Node()
}
+func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *enode.Node {
+ // If the node is running but discovery is off, manually assemble the node infos.
+ if ntab == nil {
+ addr := srv.tcpAddr(listener)
+ return enode.NewV4(&srv.PrivateKey.PublicKey, addr.IP, addr.Port, 0)
+ }
+ // Otherwise return the discovery node.
+ return ntab.Self()
+}
+
+func (srv *Server) tcpAddr(listener net.Listener) net.TCPAddr {
+ addr := net.TCPAddr{IP: net.IP{0, 0, 0, 0}}
+ if listener == nil {
+ return addr // Inbound connections disabled, use zero address.
+ }
+ // Otherwise inject the listener address too.
+ if a, ok := listener.Addr().(*net.TCPAddr); ok {
+ addr = *a
+ }
+ if srv.NAT != nil {
+ if ip, err := srv.NAT.ExternalIP(); err == nil {
+ addr.IP = ip
+ }
+ }
+ if addr.IP.IsUnspecified() {
+ addr.IP = net.IP{127, 0, 0, 1}
+ }
+ return addr
+}
+
// Stop terminates the server and all active peer connections.
// It blocks until all active connections have been closed.
func (srv *Server) Stop() {
@@ -465,10 +518,12 @@ func (srv *Server) Start() (err error) {
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)
srv.removestatic = make(chan *enode.Node)
+ srv.adddirect = make(chan *enode.Node)
+ srv.removedirect = make(chan *enode.Node)
+ srv.addgroup = make(chan *dialGroup)
+ srv.removegroup = make(chan *dialGroup)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
- srv.addnotary = make(chan *enode.Node)
- srv.removenotary = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
@@ -628,6 +683,10 @@ type dialer interface {
taskDone(task, time.Time)
addStatic(*enode.Node)
removeStatic(*enode.Node)
+ addDirect(*enode.Node)
+ removeDirect(*enode.Node)
+ addGroup(*dialGroup)
+ removeGroup(*dialGroup)
}
func (srv *Server) run(dialstate dialer) {
@@ -636,13 +695,15 @@ func (srv *Server) run(dialstate dialer) {
defer srv.nodedb.Close()
var (
- peers = make(map[enode.ID]*Peer)
- inboundCount = 0
- trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
- notary = make(map[enode.ID]bool)
- taskdone = make(chan task, maxActiveDialTasks)
- runningTasks []task
- queuedTasks []task // tasks that can't run yet
+ peers = make(map[enode.ID]*Peer)
+ inboundCount = 0
+ trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
+ peerflags = make(map[enode.ID]connFlag)
+ groupRefCount = make(map[enode.ID]int32)
+ groups = make(map[string]*dialGroup)
+ taskdone = make(chan task, maxActiveDialTasks)
+ runningTasks []task
+ queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
@@ -680,6 +741,60 @@ func (srv *Server) run(dialstate dialer) {
}
}
+ // remember and maintain the connection flags locally
+ setConnFlags := func(id enode.ID, f connFlag, val bool) {
+ if p, ok := peers[id]; ok {
+ p.rw.set(f, val)
+ }
+ if val {
+ peerflags[id] |= f
+ } else {
+ peerflags[id] &= ^f
+ }
+ if peerflags[id] == 0 {
+ delete(peerflags, id)
+ }
+ }
+
+ // Put trusted nodes into a map to speed up checks.
+ // Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
+ for _, n := range srv.TrustedNodes {
+ setConnFlags(n.ID(), trustedConn, true)
+ }
+
+ canDisconnect := func(p *Peer) bool {
+ f, ok := peerflags[p.ID()]
+ if ok && f != 0 {
+ return false
+ }
+ return !p.rw.is(dynDialedConn | inboundConn)
+ }
+
+ removeGroup := func(g *dialGroup) {
+ if gg, ok := groups[g.name]; ok {
+ for id := range gg.nodes {
+ groupRefCount[id]--
+ if groupRefCount[id] == 0 {
+ setConnFlags(id, groupDialedConn, false)
+ delete(groupRefCount, id)
+ }
+ }
+ }
+ }
+
+ addGroup := func(g *dialGroup) {
+ if _, ok := groups[g.name]; ok {
+ removeGroup(groups[g.name])
+ }
+ for id := range g.nodes {
+ groupRefCount[id]++
+ if groupRefCount[id] > 0 {
+ setConnFlags(id, groupDialedConn, true)
+ }
+ }
+ groups[g.name] = g
+ }
+
running:
for {
scheduleTasks()
@@ -693,63 +808,59 @@ running:
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding static node", "node", n)
+ setConnFlags(n.ID(), staticDialedConn, true)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n)
+ setConnFlags(n.ID(), staticDialedConn, false)
dialstate.removeStatic(n)
- if p, ok := peers[n.ID()]; ok {
+ if p, ok := peers[n.ID()]; ok && canDisconnect(p) {
p.Disconnect(DiscRequested)
}
+ case n := <-srv.adddirect:
+ // This channel is used by AddDirectPeer to add to the
+ // ephemeral direct peer list. Add it to the dialer,
+ // it will keep the node connected.
+ srv.log.Trace("Adding direct node", "node", n)
+ setConnFlags(n.ID(), directDialedConn, true)
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(directDialedConn, true)
+ }
+ dialstate.addDirect(n)
+ case n := <-srv.removedirect:
+ // This channel is used by RemoveDirectPeer to send a
+ // disconnect request to a peer and begin the
+ // stop keeping the node connected.
+ srv.log.Trace("Removing direct node", "node", n)
+ setConnFlags(n.ID(), directDialedConn, false)
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(directDialedConn, false)
+ if !p.rw.is(trustedConn | groupDialedConn) {
+ p.Disconnect(DiscRequested)
+ }
+ }
+ dialstate.removeDirect(n)
+ case g := <-srv.addgroup:
+ srv.log.Trace("Adding group", "group", g)
+ addGroup(g)
+ dialstate.addGroup(g)
+ case g := <-srv.removegroup:
+ srv.log.Trace("Removing group", "group", g)
+ removeGroup(g)
+ dialstate.removeGroup(g)
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
- trusted[n.ID()] = true
- // Mark any already-connected peer as trusted
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(trustedConn, true)
- }
+ setConnFlags(n.ID(), trustedConn, true)
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
- if _, ok := trusted[n.ID()]; ok {
- delete(trusted, n.ID())
- }
- // Unmark any already-connected peer as trusted
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(trustedConn, false)
- }
- case n := <-srv.addnotary:
- // This channel is used by AddNotaryPeer to add to the
- // ephemeral notary peer list. Add it to the dialer,
- // it will keep the node connected.
- srv.log.Trace("Adding notary node", "node", n)
- notary[n.ID] = true
- if p, ok := peers[n.ID]; ok {
- p.rw.set(notaryConn, true)
- }
- dialstate.addStatic(n)
- case n := <-srv.removenotary:
- // This channel is used by RemoveNotaryPeer to send a
- // disconnect request to a peer and begin the
- // stop keeping the node connected.
- srv.log.Trace("Removing notary node", "node", n)
- if _, ok := notary[n.ID]; ok {
- delete(notary, n.ID)
- }
-
- if p, ok := peers[n.ID]; ok {
- p.rw.set(notaryConn, false)
- }
-
- dialstate.removeStatic(n)
- if p, ok := peers[n.ID]; ok {
- p.Disconnect(DiscRequested)
- }
+ setConnFlags(n.ID(), trustedConn, false)
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
@@ -764,15 +875,9 @@ running:
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
// the remote identity is known (but hasn't been verified yet).
- if trusted[c.node.ID()] {
- // Ensure that the trusted flag is set before checking against MaxPeers.
- c.flags |= trustedConn
+ if f, ok := peerflags[c.node.ID()]; ok {
+ c.flags |= f
}
-
- if notary[c.id] {
- c.flags |= notaryConn
- }
-
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
@@ -853,9 +958,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i
func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
switch {
- case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
+ case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
- case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
+ case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected