aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-09-25 20:37:11 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:49 +0800
commit8335eac4a488716c396f114cbe7522919b97e224 (patch)
treec680248b9a7346e63ddde403689c2a484a43c4b4 /p2p/server.go
parent6f442cd7793daad014aa0d55b3b7320392c22f02 (diff)
downloaddexon-8335eac4a488716c396f114cbe7522919b97e224.tar
dexon-8335eac4a488716c396f114cbe7522919b97e224.tar.gz
dexon-8335eac4a488716c396f114cbe7522919b97e224.tar.bz2
dexon-8335eac4a488716c396f114cbe7522919b97e224.tar.lz
dexon-8335eac4a488716c396f114cbe7522919b97e224.tar.xz
dexon-8335eac4a488716c396f114cbe7522919b97e224.tar.zst
dexon-8335eac4a488716c396f114cbe7522919b97e224.zip
dex: redesign p2p network topology
- Let p2p server support direct connection and group connection. - Introduce node meta table to maintain IP of all nodes in node set, in memory and let nodes in the network can sync this table. - Let peerSet able to manage direct connections to notary set and dkg set. The mechanism to refresh the network topology when configuration round change is not done yet.
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go271
1 files changed, 188 insertions, 83 deletions
diff --git a/p2p/server.go b/p2p/server.go
index c3b3a00d4..a58673342 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -33,13 +33,13 @@ import (
"github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/discv5"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/p2p/nat"
"github.com/dexon-foundation/dexon/p2p/netutil"
"github.com/dexon-foundation/dexon/rlp"
+ "github.com/ethereum/go-ethereum/p2p/discover"
)
const (
@@ -178,10 +178,12 @@ type Server struct {
quit chan struct{}
addstatic chan *enode.Node
removestatic chan *enode.Node
+ adddirect chan *enode.Node
+ removedirect chan *enode.Node
addtrusted chan *enode.Node
removetrusted chan *enode.Node
- addnotary chan *enode.Node
- removenotary chan *enode.Node
+ addgroup chan *dialGroup
+ removegroup chan *dialGroup
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
@@ -203,9 +205,10 @@ type connFlag int32
const (
dynDialedConn connFlag = 1 << iota
staticDialedConn
+ directDialedConn
+ groupDialedConn
inboundConn
trustedConn
- notaryConn
)
// conn wraps a network connection with information gathered
@@ -254,12 +257,15 @@ func (f connFlag) String() string {
if f&staticDialedConn != 0 {
s += "-staticdial"
}
+ if f&directDialedConn != 0 {
+ s += "-directdial"
+ }
+ if f&groupDialedConn != 0 {
+ s += "-groupdial"
+ }
if f&inboundConn != 0 {
s += "-inbound"
}
- if f&notaryConn != 0 {
- s += "-notary"
- }
if s != "" {
s = s[1:]
}
@@ -333,40 +339,57 @@ func (srv *Server) RemovePeer(node *enode.Node) {
}
}
-// AddTrustedPeer adds the given node to a reserved whitelist which allows the
-// node to always connect, even if the slot are full.
-func (srv *Server) AddTrustedPeer(node *enode.Node) {
+// AddDirectPeer connects to the given node and maintains the connection until the
+// server is shut down. If the connection fails for any reason, the server will
+// attempt to reconnect the peer.
+func (srv *Server) AddDirectPeer(node *enode.Node) {
select {
- case srv.addtrusted <- node:
+ case srv.adddirect <- node:
case <-srv.quit:
}
}
-// RemoveTrustedPeer removes the given node from the trusted peer set.
-func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
+// RemoveDirectPeer disconnects from the given node
+func (srv *Server) RemoveDirectPeer(node *enode.Node) {
select {
- case srv.removetrusted <- node:
+ case srv.removedirect <- node:
case <-srv.quit:
}
}
-// AddNotaryPeer connects to the given node and maintains the connection until the
-// server is shut down. If the connection fails for any reason, the server will
-// attempt to reconnect the peer.
-// AddNotaryPeer also adds the given node to a reserved whitelist which allows the
+func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) {
+ m := map[enode.ID]*enode.Node{}
+ for _, node := range nodes {
+ m[node.ID()] = node
+ }
+ g := &dialGroup{name: name, nodes: m, num: num}
+ select {
+ case srv.addgroup <- g:
+ case <-srv.quit:
+ }
+}
+
+func (srv *Server) RemoveGroup(name string) {
+ g := &dialGroup{name: name}
+ select {
+ case srv.removegroup <- g:
+ case <-srv.quit:
+ }
+}
+
+// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
-func (srv *Server) AddNotaryPeer(node *discover.Node) {
+func (srv *Server) AddTrustedPeer(node *enode.Node) {
select {
- case srv.addnotary <- node:
+ case srv.addtrusted <- node:
case <-srv.quit:
}
}
-// RemoveNotaryPeer disconnects from the given node.
-// RemoveNotaryPeer also removes the given node from the notary peer set.
-func (srv *Server) RemoveNotaryPeer(node *discover.Node) {
+// RemoveTrustedPeer removes the given node from the trusted peer set.
+func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
select {
- case srv.removenotary <- node:
+ case srv.removetrusted <- node:
case <-srv.quit:
}
}
@@ -388,6 +411,36 @@ func (srv *Server) Self() *enode.Node {
return ln.Node()
}
+func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *enode.Node {
+ // If the node is running but discovery is off, manually assemble the node infos.
+ if ntab == nil {
+ addr := srv.tcpAddr(listener)
+ return enode.NewV4(&srv.PrivateKey.PublicKey, addr.IP, addr.Port, 0)
+ }
+ // Otherwise return the discovery node.
+ return ntab.Self()
+}
+
+func (srv *Server) tcpAddr(listener net.Listener) net.TCPAddr {
+ addr := net.TCPAddr{IP: net.IP{0, 0, 0, 0}}
+ if listener == nil {
+ return addr // Inbound connections disabled, use zero address.
+ }
+ // Otherwise inject the listener address too.
+ if a, ok := listener.Addr().(*net.TCPAddr); ok {
+ addr = *a
+ }
+ if srv.NAT != nil {
+ if ip, err := srv.NAT.ExternalIP(); err == nil {
+ addr.IP = ip
+ }
+ }
+ if addr.IP.IsUnspecified() {
+ addr.IP = net.IP{127, 0, 0, 1}
+ }
+ return addr
+}
+
// Stop terminates the server and all active peer connections.
// It blocks until all active connections have been closed.
func (srv *Server) Stop() {
@@ -465,10 +518,12 @@ func (srv *Server) Start() (err error) {
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)
srv.removestatic = make(chan *enode.Node)
+ srv.adddirect = make(chan *enode.Node)
+ srv.removedirect = make(chan *enode.Node)
+ srv.addgroup = make(chan *dialGroup)
+ srv.removegroup = make(chan *dialGroup)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
- srv.addnotary = make(chan *enode.Node)
- srv.removenotary = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
@@ -628,6 +683,10 @@ type dialer interface {
taskDone(task, time.Time)
addStatic(*enode.Node)
removeStatic(*enode.Node)
+ addDirect(*enode.Node)
+ removeDirect(*enode.Node)
+ addGroup(*dialGroup)
+ removeGroup(*dialGroup)
}
func (srv *Server) run(dialstate dialer) {
@@ -636,13 +695,15 @@ func (srv *Server) run(dialstate dialer) {
defer srv.nodedb.Close()
var (
- peers = make(map[enode.ID]*Peer)
- inboundCount = 0
- trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
- notary = make(map[enode.ID]bool)
- taskdone = make(chan task, maxActiveDialTasks)
- runningTasks []task
- queuedTasks []task // tasks that can't run yet
+ peers = make(map[enode.ID]*Peer)
+ inboundCount = 0
+ trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
+ peerflags = make(map[enode.ID]connFlag)
+ groupRefCount = make(map[enode.ID]int32)
+ groups = make(map[string]*dialGroup)
+ taskdone = make(chan task, maxActiveDialTasks)
+ runningTasks []task
+ queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
@@ -680,6 +741,60 @@ func (srv *Server) run(dialstate dialer) {
}
}
+ // remember and maintain the connection flags locally
+ setConnFlags := func(id enode.ID, f connFlag, val bool) {
+ if p, ok := peers[id]; ok {
+ p.rw.set(f, val)
+ }
+ if val {
+ peerflags[id] |= f
+ } else {
+ peerflags[id] &= ^f
+ }
+ if peerflags[id] == 0 {
+ delete(peerflags, id)
+ }
+ }
+
+ // Put trusted nodes into a map to speed up checks.
+ // Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
+ for _, n := range srv.TrustedNodes {
+ setConnFlags(n.ID(), trustedConn, true)
+ }
+
+ canDisconnect := func(p *Peer) bool {
+ f, ok := peerflags[p.ID()]
+ if ok && f != 0 {
+ return false
+ }
+ return !p.rw.is(dynDialedConn | inboundConn)
+ }
+
+ removeGroup := func(g *dialGroup) {
+ if gg, ok := groups[g.name]; ok {
+ for id := range gg.nodes {
+ groupRefCount[id]--
+ if groupRefCount[id] == 0 {
+ setConnFlags(id, groupDialedConn, false)
+ delete(groupRefCount, id)
+ }
+ }
+ }
+ }
+
+ addGroup := func(g *dialGroup) {
+ if _, ok := groups[g.name]; ok {
+ removeGroup(groups[g.name])
+ }
+ for id := range g.nodes {
+ groupRefCount[id]++
+ if groupRefCount[id] > 0 {
+ setConnFlags(id, groupDialedConn, true)
+ }
+ }
+ groups[g.name] = g
+ }
+
running:
for {
scheduleTasks()
@@ -693,63 +808,59 @@ running:
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding static node", "node", n)
+ setConnFlags(n.ID(), staticDialedConn, true)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n)
+ setConnFlags(n.ID(), staticDialedConn, false)
dialstate.removeStatic(n)
- if p, ok := peers[n.ID()]; ok {
+ if p, ok := peers[n.ID()]; ok && canDisconnect(p) {
p.Disconnect(DiscRequested)
}
+ case n := <-srv.adddirect:
+ // This channel is used by AddDirectPeer to add to the
+ // ephemeral direct peer list. Add it to the dialer,
+ // it will keep the node connected.
+ srv.log.Trace("Adding direct node", "node", n)
+ setConnFlags(n.ID(), directDialedConn, true)
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(directDialedConn, true)
+ }
+ dialstate.addDirect(n)
+ case n := <-srv.removedirect:
+ // This channel is used by RemoveDirectPeer to send a
+ // disconnect request to a peer and begin the
+ // stop keeping the node connected.
+ srv.log.Trace("Removing direct node", "node", n)
+ setConnFlags(n.ID(), directDialedConn, false)
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(directDialedConn, false)
+ if !p.rw.is(trustedConn | groupDialedConn) {
+ p.Disconnect(DiscRequested)
+ }
+ }
+ dialstate.removeDirect(n)
+ case g := <-srv.addgroup:
+ srv.log.Trace("Adding group", "group", g)
+ addGroup(g)
+ dialstate.addGroup(g)
+ case g := <-srv.removegroup:
+ srv.log.Trace("Removing group", "group", g)
+ removeGroup(g)
+ dialstate.removeGroup(g)
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
- trusted[n.ID()] = true
- // Mark any already-connected peer as trusted
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(trustedConn, true)
- }
+ setConnFlags(n.ID(), trustedConn, true)
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
- if _, ok := trusted[n.ID()]; ok {
- delete(trusted, n.ID())
- }
- // Unmark any already-connected peer as trusted
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(trustedConn, false)
- }
- case n := <-srv.addnotary:
- // This channel is used by AddNotaryPeer to add to the
- // ephemeral notary peer list. Add it to the dialer,
- // it will keep the node connected.
- srv.log.Trace("Adding notary node", "node", n)
- notary[n.ID] = true
- if p, ok := peers[n.ID]; ok {
- p.rw.set(notaryConn, true)
- }
- dialstate.addStatic(n)
- case n := <-srv.removenotary:
- // This channel is used by RemoveNotaryPeer to send a
- // disconnect request to a peer and begin the
- // stop keeping the node connected.
- srv.log.Trace("Removing notary node", "node", n)
- if _, ok := notary[n.ID]; ok {
- delete(notary, n.ID)
- }
-
- if p, ok := peers[n.ID]; ok {
- p.rw.set(notaryConn, false)
- }
-
- dialstate.removeStatic(n)
- if p, ok := peers[n.ID]; ok {
- p.Disconnect(DiscRequested)
- }
+ setConnFlags(n.ID(), trustedConn, false)
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
@@ -764,15 +875,9 @@ running:
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
// the remote identity is known (but hasn't been verified yet).
- if trusted[c.node.ID()] {
- // Ensure that the trusted flag is set before checking against MaxPeers.
- c.flags |= trustedConn
+ if f, ok := peerflags[c.node.ID()]; ok {
+ c.flags |= f
}
-
- if notary[c.id] {
- c.flags |= notaryConn
- }
-
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
@@ -853,9 +958,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i
func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
switch {
- case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
+ case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
- case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
+ case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected