aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-01-31 19:40:39 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:27:22 +0800
commit30d6c027cda04c08a1216b32d9fda2e19f53f4c0 (patch)
treeda62879f109f3669cfaa419891c023f0d1b3121e /p2p/server.go
parent13651db29d3534620bf156be1539857cf558d352 (diff)
downloadgo-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar
go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.gz
go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.bz2
go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.lz
go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.xz
go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.zst
go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.zip
p2p, dex: rework connection management (#183)
* p2p, dex: rework connection management * dex: refresh our node record periodically * dex: don't send new record event if no new record
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go146
1 files changed, 26 insertions, 120 deletions
diff --git a/p2p/server.go b/p2p/server.go
index 36b1721a5..58b76a708 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -182,8 +182,6 @@ type Server struct {
removedirect chan *enode.Node
addtrusted chan *enode.Node
removetrusted chan *enode.Node
- addgroup chan *dialGroup
- removegroup chan *dialGroup
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
@@ -206,7 +204,6 @@ const (
dynDialedConn connFlag = 1 << iota
staticDialedConn
directDialedConn
- groupDialedConn
inboundConn
trustedConn
)
@@ -260,9 +257,6 @@ func (f connFlag) String() string {
if f&directDialedConn != 0 {
s += "-directdial"
}
- if f&groupDialedConn != 0 {
- s += "-groupdial"
- }
if f&inboundConn != 0 {
s += "-inbound"
}
@@ -357,26 +351,6 @@ func (srv *Server) RemoveDirectPeer(node *enode.Node) {
}
}
-func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) {
- m := map[enode.ID]*enode.Node{}
- for _, node := range nodes {
- m[node.ID()] = node
- }
- g := &dialGroup{name: name, nodes: m, num: num}
- select {
- case srv.addgroup <- g:
- case <-srv.quit:
- }
-}
-
-func (srv *Server) RemoveGroup(name string) {
- g := &dialGroup{name: name}
- select {
- case srv.removegroup <- g:
- case <-srv.quit:
- }
-}
-
// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
func (srv *Server) AddTrustedPeer(node *enode.Node) {
@@ -524,8 +498,6 @@ func (srv *Server) Start() (err error) {
srv.removestatic = make(chan *enode.Node)
srv.adddirect = make(chan *enode.Node)
srv.removedirect = make(chan *enode.Node)
- srv.addgroup = make(chan *dialGroup)
- srv.removegroup = make(chan *dialGroup)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
@@ -689,8 +661,6 @@ type dialer interface {
removeStatic(*enode.Node)
addDirect(*enode.Node)
removeDirect(*enode.Node)
- addGroup(*dialGroup)
- removeGroup(*dialGroup)
}
func (srv *Server) run(dialstate dialer) {
@@ -699,15 +669,12 @@ func (srv *Server) run(dialstate dialer) {
defer srv.nodedb.Close()
var (
- peers = make(map[enode.ID]*Peer)
- inboundCount = 0
- trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
- peerflags = make(map[enode.ID]connFlag)
- groupRefCount = make(map[enode.ID]int32)
- groups = make(map[string]*dialGroup)
- taskdone = make(chan task, maxActiveDialTasks)
- runningTasks []task
- queuedTasks []task // tasks that can't run yet
+ peers = make(map[enode.ID]*Peer)
+ inboundCount = 0
+ trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
+ taskdone = make(chan task, maxActiveDialTasks)
+ runningTasks []task
+ queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
@@ -745,60 +712,6 @@ func (srv *Server) run(dialstate dialer) {
}
}
- // remember and maintain the connection flags locally
- setConnFlags := func(id enode.ID, f connFlag, val bool) {
- if p, ok := peers[id]; ok {
- p.rw.set(f, val)
- }
- if val {
- peerflags[id] |= f
- } else {
- peerflags[id] &= ^f
- }
- if peerflags[id] == 0 {
- delete(peerflags, id)
- }
- }
-
- // Put trusted nodes into a map to speed up checks.
- // Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
- for _, n := range srv.TrustedNodes {
- setConnFlags(n.ID(), trustedConn, true)
- }
-
- canDisconnect := func(p *Peer) bool {
- f, ok := peerflags[p.ID()]
- if ok && f != 0 {
- return false
- }
- return !p.rw.is(dynDialedConn | inboundConn)
- }
-
- removeGroup := func(g *dialGroup) {
- if gg, ok := groups[g.name]; ok {
- for id := range gg.nodes {
- groupRefCount[id]--
- if groupRefCount[id] == 0 {
- setConnFlags(id, groupDialedConn, false)
- delete(groupRefCount, id)
- }
- }
- }
- }
-
- addGroup := func(g *dialGroup) {
- if _, ok := groups[g.name]; ok {
- removeGroup(groups[g.name])
- }
- for id := range g.nodes {
- groupRefCount[id]++
- if groupRefCount[id] > 0 {
- setConnFlags(id, groupDialedConn, true)
- }
- }
- groups[g.name] = g
- }
-
running:
for {
scheduleTasks()
@@ -812,16 +725,14 @@ running:
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding static node", "node", n)
- setConnFlags(n.ID(), staticDialedConn, true)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n)
- setConnFlags(n.ID(), staticDialedConn, false)
dialstate.removeStatic(n)
- if p, ok := peers[n.ID()]; ok && canDisconnect(p) {
+ if p, ok := peers[n.ID()]; ok {
p.Disconnect(DiscRequested)
}
case n := <-srv.adddirect:
@@ -829,42 +740,36 @@ running:
// ephemeral direct peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding direct node", "node", n)
- setConnFlags(n.ID(), directDialedConn, true)
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(directDialedConn, true)
- }
dialstate.addDirect(n)
case n := <-srv.removedirect:
// This channel is used by RemoveDirectPeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected.
srv.log.Trace("Removing direct node", "node", n)
- setConnFlags(n.ID(), directDialedConn, false)
+ dialstate.removeDirect(n)
if p, ok := peers[n.ID()]; ok {
- p.rw.set(directDialedConn, false)
- if !p.rw.is(trustedConn | groupDialedConn) {
- p.Disconnect(DiscRequested)
- }
+ p.Disconnect(DiscRequested)
}
- dialstate.removeDirect(n)
- case g := <-srv.addgroup:
- srv.log.Trace("Adding group", "group", g)
- addGroup(g)
- dialstate.addGroup(g)
- case g := <-srv.removegroup:
- srv.log.Trace("Removing group", "group", g)
- removeGroup(g)
- dialstate.removeGroup(g)
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
- setConnFlags(n.ID(), trustedConn, true)
+ trusted[n.ID()] = true
+ // Mark any already-connected peer as trusted
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(trustedConn, true)
+ }
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
- setConnFlags(n.ID(), trustedConn, false)
+ if _, ok := trusted[n.ID()]; ok {
+ delete(trusted, n.ID())
+ }
+ // Unmark any already-connected peer as trusted
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(trustedConn, false)
+ }
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
@@ -879,8 +784,9 @@ running:
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
// the remote identity is known (but hasn't been verified yet).
- if f, ok := peerflags[c.node.ID()]; ok {
- c.flags |= f
+ if trusted[c.node.ID()] {
+ // Ensure that the trusted flag is set before checking against MaxPeers.
+ c.flags |= trustedConn
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
@@ -962,9 +868,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i
func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
switch {
- case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers:
+ case !c.is(trustedConn|staticDialedConn|directDialedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
- case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
+ case !c.is(trustedConn|directDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected