diff options
author | Sonic <sonic@dexon.org> | 2019-01-31 19:40:39 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:27:22 +0800 |
commit | 30d6c027cda04c08a1216b32d9fda2e19f53f4c0 (patch) | |
tree | da62879f109f3669cfaa419891c023f0d1b3121e /p2p/server.go | |
parent | 13651db29d3534620bf156be1539857cf558d352 (diff) | |
download | go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.gz go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.bz2 go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.lz go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.xz go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.tar.zst go-tangerine-30d6c027cda04c08a1216b32d9fda2e19f53f4c0.zip |
p2p, dex: rework connection management (#183)
* p2p, dex: rework connection management
* dex: refresh our node record periodically
* dex: don't send new record event if no new record
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 146 |
1 files changed, 26 insertions, 120 deletions
diff --git a/p2p/server.go b/p2p/server.go index 36b1721a5..58b76a708 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -182,8 +182,6 @@ type Server struct { removedirect chan *enode.Node addtrusted chan *enode.Node removetrusted chan *enode.Node - addgroup chan *dialGroup - removegroup chan *dialGroup posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop @@ -206,7 +204,6 @@ const ( dynDialedConn connFlag = 1 << iota staticDialedConn directDialedConn - groupDialedConn inboundConn trustedConn ) @@ -260,9 +257,6 @@ func (f connFlag) String() string { if f&directDialedConn != 0 { s += "-directdial" } - if f&groupDialedConn != 0 { - s += "-groupdial" - } if f&inboundConn != 0 { s += "-inbound" } @@ -357,26 +351,6 @@ func (srv *Server) RemoveDirectPeer(node *enode.Node) { } } -func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) { - m := map[enode.ID]*enode.Node{} - for _, node := range nodes { - m[node.ID()] = node - } - g := &dialGroup{name: name, nodes: m, num: num} - select { - case srv.addgroup <- g: - case <-srv.quit: - } -} - -func (srv *Server) RemoveGroup(name string) { - g := &dialGroup{name: name} - select { - case srv.removegroup <- g: - case <-srv.quit: - } -} - // AddTrustedPeer adds the given node to a reserved whitelist which allows the // node to always connect, even if the slot are full. func (srv *Server) AddTrustedPeer(node *enode.Node) { @@ -524,8 +498,6 @@ func (srv *Server) Start() (err error) { srv.removestatic = make(chan *enode.Node) srv.adddirect = make(chan *enode.Node) srv.removedirect = make(chan *enode.Node) - srv.addgroup = make(chan *dialGroup) - srv.removegroup = make(chan *dialGroup) srv.addtrusted = make(chan *enode.Node) srv.removetrusted = make(chan *enode.Node) srv.peerOp = make(chan peerOpFunc) @@ -689,8 +661,6 @@ type dialer interface { removeStatic(*enode.Node) addDirect(*enode.Node) removeDirect(*enode.Node) - addGroup(*dialGroup) - removeGroup(*dialGroup) } func (srv *Server) run(dialstate dialer) { @@ -699,15 +669,12 @@ func (srv *Server) run(dialstate dialer) { defer srv.nodedb.Close() var ( - peers = make(map[enode.ID]*Peer) - inboundCount = 0 - trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) - peerflags = make(map[enode.ID]connFlag) - groupRefCount = make(map[enode.ID]int32) - groups = make(map[string]*dialGroup) - taskdone = make(chan task, maxActiveDialTasks) - runningTasks []task - queuedTasks []task // tasks that can't run yet + peers = make(map[enode.ID]*Peer) + inboundCount = 0 + trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) + taskdone = make(chan task, maxActiveDialTasks) + runningTasks []task + queuedTasks []task // tasks that can't run yet ) // Put trusted nodes into a map to speed up checks. // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. @@ -745,60 +712,6 @@ func (srv *Server) run(dialstate dialer) { } } - // remember and maintain the connection flags locally - setConnFlags := func(id enode.ID, f connFlag, val bool) { - if p, ok := peers[id]; ok { - p.rw.set(f, val) - } - if val { - peerflags[id] |= f - } else { - peerflags[id] &= ^f - } - if peerflags[id] == 0 { - delete(peerflags, id) - } - } - - // Put trusted nodes into a map to speed up checks. - // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. - for _, n := range srv.TrustedNodes { - setConnFlags(n.ID(), trustedConn, true) - } - - canDisconnect := func(p *Peer) bool { - f, ok := peerflags[p.ID()] - if ok && f != 0 { - return false - } - return !p.rw.is(dynDialedConn | inboundConn) - } - - removeGroup := func(g *dialGroup) { - if gg, ok := groups[g.name]; ok { - for id := range gg.nodes { - groupRefCount[id]-- - if groupRefCount[id] == 0 { - setConnFlags(id, groupDialedConn, false) - delete(groupRefCount, id) - } - } - } - } - - addGroup := func(g *dialGroup) { - if _, ok := groups[g.name]; ok { - removeGroup(groups[g.name]) - } - for id := range g.nodes { - groupRefCount[id]++ - if groupRefCount[id] > 0 { - setConnFlags(id, groupDialedConn, true) - } - } - groups[g.name] = g - } - running: for { scheduleTasks() @@ -812,16 +725,14 @@ running: // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. srv.log.Trace("Adding static node", "node", n) - setConnFlags(n.ID(), staticDialedConn, true) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected. srv.log.Trace("Removing static node", "node", n) - setConnFlags(n.ID(), staticDialedConn, false) dialstate.removeStatic(n) - if p, ok := peers[n.ID()]; ok && canDisconnect(p) { + if p, ok := peers[n.ID()]; ok { p.Disconnect(DiscRequested) } case n := <-srv.adddirect: @@ -829,42 +740,36 @@ running: // ephemeral direct peer list. Add it to the dialer, // it will keep the node connected. srv.log.Trace("Adding direct node", "node", n) - setConnFlags(n.ID(), directDialedConn, true) - if p, ok := peers[n.ID()]; ok { - p.rw.set(directDialedConn, true) - } dialstate.addDirect(n) case n := <-srv.removedirect: // This channel is used by RemoveDirectPeer to send a // disconnect request to a peer and begin the // stop keeping the node connected. srv.log.Trace("Removing direct node", "node", n) - setConnFlags(n.ID(), directDialedConn, false) + dialstate.removeDirect(n) if p, ok := peers[n.ID()]; ok { - p.rw.set(directDialedConn, false) - if !p.rw.is(trustedConn | groupDialedConn) { - p.Disconnect(DiscRequested) - } + p.Disconnect(DiscRequested) } - dialstate.removeDirect(n) - case g := <-srv.addgroup: - srv.log.Trace("Adding group", "group", g) - addGroup(g) - dialstate.addGroup(g) - case g := <-srv.removegroup: - srv.log.Trace("Removing group", "group", g) - removeGroup(g) - dialstate.removeGroup(g) case n := <-srv.addtrusted: // This channel is used by AddTrustedPeer to add an enode // to the trusted node set. srv.log.Trace("Adding trusted node", "node", n) - setConnFlags(n.ID(), trustedConn, true) + trusted[n.ID()] = true + // Mark any already-connected peer as trusted + if p, ok := peers[n.ID()]; ok { + p.rw.set(trustedConn, true) + } case n := <-srv.removetrusted: // This channel is used by RemoveTrustedPeer to remove an enode // from the trusted node set. srv.log.Trace("Removing trusted node", "node", n) - setConnFlags(n.ID(), trustedConn, false) + if _, ok := trusted[n.ID()]; ok { + delete(trusted, n.ID()) + } + // Unmark any already-connected peer as trusted + if p, ok := peers[n.ID()]; ok { + p.rw.set(trustedConn, false) + } case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. op(peers) @@ -879,8 +784,9 @@ running: case c := <-srv.posthandshake: // A connection has passed the encryption handshake so // the remote identity is known (but hasn't been verified yet). - if f, ok := peerflags[c.node.ID()]; ok { - c.flags |= f + if trusted[c.node.ID()] { + // Ensure that the trusted flag is set before checking against MaxPeers. + c.flags |= trustedConn } // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. select { @@ -962,9 +868,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error { switch { - case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers: + case !c.is(trustedConn|staticDialedConn|directDialedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers - case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + case !c.is(trustedConn|directDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected |