aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go150
1 files changed, 119 insertions, 31 deletions
diff --git a/p2p/server.go b/p2p/server.go
index 922df55ba..90e92dc05 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -40,11 +40,10 @@ const (
refreshPeersInterval = 30 * time.Second
staticPeerCheckInterval = 15 * time.Second
- // Maximum number of concurrently handshaking inbound connections.
- maxAcceptConns = 50
-
- // Maximum number of concurrently dialing outbound connections.
- maxActiveDialTasks = 16
+ // Connectivity defaults.
+ maxActiveDialTasks = 16
+ defaultMaxPendingPeers = 50
+ defaultDialRatio = 3
// Maximum time allowed for reading a complete message.
// This is effectively the amount of time a connection can be idle.
@@ -70,6 +69,11 @@ type Config struct {
// Zero defaults to preset values.
MaxPendingPeers int `toml:",omitempty"`
+ // DialRatio controls the ratio of inbound to dialed connections.
+ // Example: a DialRatio of 2 allows 1/2 of connections to be dialed.
+ // Setting DialRatio to zero defaults it to 3.
+ DialRatio int `toml:",omitempty"`
+
// NoDiscovery can be used to disable the peer discovery mechanism.
// Disabling is useful for protocol debugging (manual topology).
NoDiscovery bool
@@ -78,9 +82,6 @@ type Config struct {
// protocol should be started or not.
DiscoveryV5 bool `toml:",omitempty"`
- // Listener address for the V5 discovery protocol UDP traffic.
- DiscoveryV5Addr string `toml:",omitempty"`
-
// Name sets the node name of this server.
// Use common.MakeName to create a name that follows existing conventions.
Name string `toml:"-"`
@@ -141,7 +142,7 @@ type Config struct {
EnableMsgEvents bool
// Logger is a custom logger to use with the p2p.Server.
- Logger log.Logger
+ Logger log.Logger `toml:",omitempty"`
}
// Server manages all peer connections.
@@ -354,6 +355,32 @@ func (srv *Server) Stop() {
srv.loopWG.Wait()
}
+// sharedUDPConn implements a shared connection. Write sends messages to the underlying connection while read returns
+// messages that were found unprocessable and sent to the unhandled channel by the primary listener.
+type sharedUDPConn struct {
+ *net.UDPConn
+ unhandled chan discover.ReadPacket
+}
+
+// ReadFromUDP implements discv5.conn
+func (s *sharedUDPConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
+ packet, ok := <-s.unhandled
+ if !ok {
+ return 0, nil, fmt.Errorf("Connection was closed")
+ }
+ l := len(packet.Data)
+ if l > len(b) {
+ l = len(b)
+ }
+ copy(b[:l], packet.Data[:l])
+ return l, packet.Addr, nil
+}
+
+// Close implements discv5.conn
+func (s *sharedUDPConn) Close() error {
+ return nil
+}
+
// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
@@ -388,20 +415,66 @@ func (srv *Server) Start() (err error) {
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
- // node table
- if !srv.NoDiscovery {
- ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase, srv.NetRestrict)
+ var (
+ conn *net.UDPConn
+ sconn *sharedUDPConn
+ realaddr *net.UDPAddr
+ unhandled chan discover.ReadPacket
+ )
+
+ if !srv.NoDiscovery || srv.DiscoveryV5 {
+ addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
if err != nil {
return err
}
- if err := ntab.SetFallbackNodes(srv.BootstrapNodes); err != nil {
+ conn, err = net.ListenUDP("udp", addr)
+ if err != nil {
+ return err
+ }
+ realaddr = conn.LocalAddr().(*net.UDPAddr)
+ if srv.NAT != nil {
+ if !realaddr.IP.IsLoopback() {
+ go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
+ }
+ // TODO: react to external IP changes over time.
+ if ext, err := srv.NAT.ExternalIP(); err == nil {
+ realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
+ }
+ }
+ }
+
+ if !srv.NoDiscovery && srv.DiscoveryV5 {
+ unhandled = make(chan discover.ReadPacket, 100)
+ sconn = &sharedUDPConn{conn, unhandled}
+ }
+
+ // node table
+ if !srv.NoDiscovery {
+ cfg := discover.Config{
+ PrivateKey: srv.PrivateKey,
+ AnnounceAddr: realaddr,
+ NodeDBPath: srv.NodeDatabase,
+ NetRestrict: srv.NetRestrict,
+ Bootnodes: srv.BootstrapNodes,
+ Unhandled: unhandled,
+ }
+ ntab, err := discover.ListenUDP(conn, cfg)
+ if err != nil {
return err
}
srv.ntab = ntab
}
if srv.DiscoveryV5 {
- ntab, err := discv5.ListenUDP(srv.PrivateKey, srv.DiscoveryV5Addr, srv.NAT, "", srv.NetRestrict) //srv.NodeDatabase)
+ var (
+ ntab *discv5.Network
+ err error
+ )
+ if sconn != nil {
+ ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
+ } else {
+ ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
+ }
if err != nil {
return err
}
@@ -411,10 +484,7 @@ func (srv *Server) Start() (err error) {
srv.DiscV5 = ntab
}
- dynPeers := (srv.MaxPeers + 1) / 2
- if srv.NoDiscovery {
- dynPeers = 0
- }
+ dynPeers := srv.maxDialedConns()
dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
// handshake
@@ -471,6 +541,7 @@ func (srv *Server) run(dialstate dialer) {
defer srv.loopWG.Done()
var (
peers = make(map[discover.NodeID]*Peer)
+ inboundCount = 0
trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
taskdone = make(chan task, maxActiveDialTasks)
runningTasks []task
@@ -556,14 +627,14 @@ running:
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
- case c.cont <- srv.encHandshakeChecks(peers, c):
+ case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
case <-srv.quit:
break running
}
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
- err := srv.protoHandshakeChecks(peers, c)
+ err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
@@ -574,8 +645,11 @@ running:
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
- peers[c.id] = p
go srv.runPeer(p)
+ peers[c.id] = p
+ if p.Inbound() {
+ inboundCount++
+ }
}
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
@@ -590,6 +664,9 @@ running:
d := common.PrettyDuration(mclock.Now() - pd.created)
pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
delete(peers, pd.ID())
+ if pd.Inbound() {
+ inboundCount--
+ }
}
}
@@ -616,20 +693,22 @@ running:
}
}
-func (srv *Server) protoHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error {
+func (srv *Server) protoHandshakeChecks(peers map[discover.NodeID]*Peer, inboundCount int, c *conn) error {
// Drop connections with no matching protocols.
if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, c.caps) == 0 {
return DiscUselessPeer
}
// Repeat the encryption handshake checks because the
// peer set might have changed between the handshakes.
- return srv.encHandshakeChecks(peers, c)
+ return srv.encHandshakeChecks(peers, inboundCount, c)
}
-func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) error {
+func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, inboundCount int, c *conn) error {
switch {
case !c.is(trustedConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
+ case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
+ return DiscTooManyPeers
case peers[c.id] != nil:
return DiscAlreadyConnected
case c.id == srv.Self().ID:
@@ -639,6 +718,21 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn)
}
}
+func (srv *Server) maxInboundConns() int {
+ return srv.MaxPeers - srv.maxDialedConns()
+}
+
+func (srv *Server) maxDialedConns() int {
+ if srv.NoDiscovery || srv.NoDial {
+ return 0
+ }
+ r := srv.DialRatio
+ if r == 0 {
+ r = defaultDialRatio
+ }
+ return srv.MaxPeers / r
+}
+
type tempError interface {
Temporary() bool
}
@@ -649,10 +743,7 @@ func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
- // This channel acts as a semaphore limiting
- // active inbound connections that are lingering pre-handshake.
- // If all slots are taken, no further connections are accepted.
- tokens := maxAcceptConns
+ tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
@@ -693,9 +784,6 @@ func (srv *Server) listenLoop() {
fd = newMeteredConn(fd, true)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
-
- // Spawn the handler. It will give the slot back when the connection
- // has been established.
go func() {
srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}