diff options
author | Felix Lange <fjl@users.noreply.github.com> | 2018-02-12 20:36:09 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-02-12 20:36:09 +0800 |
commit | 9123eceb0f78f69e88d909a56ad7fadb75570198 (patch) | |
tree | f3f02d178a7b8452e29e8902490bc308ebae0bde /p2p/server.go | |
parent | 1d39912a9b9ec710460c82c403cc8f72ab7d30d9 (diff) | |
download | go-tangerine-9123eceb0f78f69e88d909a56ad7fadb75570198.tar go-tangerine-9123eceb0f78f69e88d909a56ad7fadb75570198.tar.gz go-tangerine-9123eceb0f78f69e88d909a56ad7fadb75570198.tar.bz2 go-tangerine-9123eceb0f78f69e88d909a56ad7fadb75570198.tar.lz go-tangerine-9123eceb0f78f69e88d909a56ad7fadb75570198.tar.xz go-tangerine-9123eceb0f78f69e88d909a56ad7fadb75570198.tar.zst go-tangerine-9123eceb0f78f69e88d909a56ad7fadb75570198.zip |
p2p, p2p/discover: misc connectivity improvements (#16069)
* p2p: add DialRatio for configuration of inbound vs. dialed connections
* p2p: add connection flags to PeerInfo
* p2p/netutil: add SameNet, DistinctNetSet
* p2p/discover: improve revalidation and seeding
This changes node revalidation to be periodic instead of on-demand. This
should prevent issues where dead nodes get stuck in closer buckets
because no other node will ever come along to replace them.
Every 5 seconds (on average), the last node in a random bucket is
checked and moved to the front of the bucket if it is still responding.
If revalidation fails, the last node is replaced by an entry of the
'replacement list' containing recently-seen nodes.
Most close buckets are removed because it's very unlikely we'll ever
encounter a node that would fall into any of those buckets.
Table seeding is also improved: we now require a few minutes of table
membership before considering a node as a potential seed node. This
should make it less likely to store short-lived nodes as potential
seeds.
* p2p/discover: fix nits in UDP transport
We would skip sending neighbors replies if there were fewer than
maxNeighbors results and CheckRelayIP returned an error for the last
one. While here, also resolve a TODO about pong reply tokens.
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 77 |
1 files changed, 50 insertions, 27 deletions
diff --git a/p2p/server.go b/p2p/server.go index 2cff94ea5..edc1d9d21 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -40,11 +40,10 @@ const ( refreshPeersInterval = 30 * time.Second staticPeerCheckInterval = 15 * time.Second - // Maximum number of concurrently handshaking inbound connections. - maxAcceptConns = 50 - - // Maximum number of concurrently dialing outbound connections. - maxActiveDialTasks = 16 + // Connectivity defaults. + maxActiveDialTasks = 16 + defaultMaxPendingPeers = 50 + defaultDialRatio = 3 // Maximum time allowed for reading a complete message. // This is effectively the amount of time a connection can be idle. @@ -70,6 +69,11 @@ type Config struct { // Zero defaults to preset values. MaxPendingPeers int `toml:",omitempty"` + // DialRatio controls the ratio of inbound to dialed connections. + // Example: a DialRatio of 2 allows 1/2 of connections to be dialed. + // Setting DialRatio to zero defaults it to 3. + DialRatio int `toml:",omitempty"` + // NoDiscovery can be used to disable the peer discovery mechanism. // Disabling is useful for protocol debugging (manual topology). NoDiscovery bool @@ -427,7 +431,6 @@ func (srv *Server) Start() (err error) { if err != nil { return err } - realaddr = conn.LocalAddr().(*net.UDPAddr) if srv.NAT != nil { if !realaddr.IP.IsLoopback() { @@ -447,11 +450,16 @@ func (srv *Server) Start() (err error) { // node table if !srv.NoDiscovery { - ntab, err := discover.ListenUDP(srv.PrivateKey, conn, realaddr, unhandled, srv.NodeDatabase, srv.NetRestrict) - if err != nil { - return err + cfg := discover.Config{ + PrivateKey: srv.PrivateKey, + AnnounceAddr: realaddr, + NodeDBPath: srv.NodeDatabase, + NetRestrict: srv.NetRestrict, + Bootnodes: srv.BootstrapNodes, + Unhandled: unhandled, } - if err := ntab.SetFallbackNodes(srv.BootstrapNodes); err != nil { + ntab, err := discover.ListenUDP(conn, cfg) + if err != nil { return err } srv.ntab = ntab @@ -476,10 +484,7 @@ func (srv *Server) Start() (err error) { srv.DiscV5 = ntab } - dynPeers := (srv.MaxPeers + 1) / 2 - if srv.NoDiscovery { - dynPeers = 0 - } + dynPeers := srv.maxDialedConns() dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict) // handshake @@ -536,6 +541,7 @@ func (srv *Server) run(dialstate dialer) { defer srv.loopWG.Done() var ( peers = make(map[discover.NodeID]*Peer) + inboundCount = 0 trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes)) taskdone = make(chan task, maxActiveDialTasks) runningTasks []task @@ -621,14 +627,14 @@ running: } // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. select { - case c.cont <- srv.encHandshakeChecks(peers, c): + case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c): case <-srv.quit: break running } case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. - err := srv.protoHandshakeChecks(peers, c) + err := srv.protoHandshakeChecks(peers, inboundCount, c) if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) @@ -639,8 +645,11 @@ running: } name := truncateName(c.name) srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) - peers[c.id] = p go srv.runPeer(p) + peers[c.id] = p + if p.Inbound() { + inboundCount++ + } } // The dialer logic relies on the assumption that // dial tasks complete after the peer has been added or @@ -655,6 +664,9 @@ running: d := common.PrettyDuration(mclock.Now() - pd.created) pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err) delete(peers, pd.ID()) + if pd.Inbound() { + inboundCount-- + } } } @@ -681,20 +693,22 @@ running: } } -func (srv *Server) protoHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error { +func (srv *Server) protoHandshakeChecks(peers map[discover.NodeID]*Peer, inboundCount int, c *conn) error { // Drop connections with no matching protocols. if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, c.caps) == 0 { return DiscUselessPeer } // Repeat the encryption handshake checks because the // peer set might have changed between the handshakes. - return srv.encHandshakeChecks(peers, c) + return srv.encHandshakeChecks(peers, inboundCount, c) } -func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error { +func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, inboundCount int, c *conn) error { switch { case !c.is(trustedConn|staticDialedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers + case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + return DiscTooManyPeers case peers[c.id] != nil: return DiscAlreadyConnected case c.id == srv.Self().ID: @@ -704,6 +718,21 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) } } +func (srv *Server) maxInboundConns() int { + return srv.MaxPeers - srv.maxDialedConns() +} + +func (srv *Server) maxDialedConns() int { + if srv.NoDiscovery || srv.NoDial { + return 0 + } + r := srv.DialRatio + if r == 0 { + r = defaultDialRatio + } + return srv.MaxPeers / r +} + type tempError interface { Temporary() bool } @@ -714,10 +743,7 @@ func (srv *Server) listenLoop() { defer srv.loopWG.Done() srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) - // This channel acts as a semaphore limiting - // active inbound connections that are lingering pre-handshake. - // If all slots are taken, no further connections are accepted. - tokens := maxAcceptConns + tokens := defaultMaxPendingPeers if srv.MaxPendingPeers > 0 { tokens = srv.MaxPendingPeers } @@ -758,9 +784,6 @@ func (srv *Server) listenLoop() { fd = newMeteredConn(fd, true) srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) - - // Spawn the handler. It will give the slot back when the connection - // has been established. go func() { srv.SetupConn(fd, inboundConn, nil) slots <- struct{}{} |