From 3953bf0031b6b2a4302b333aa65fc8ccaa7d788c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 30 Apr 2015 15:06:05 +0300 Subject: p2p: limit the outbound dialing too --- p2p/server.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 5e0c917fc..5424b5463 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -27,6 +27,9 @@ const ( // 'added as peer'. maxAcceptConns = 50 + // Maximum number of concurrently dialing outbound connections. + maxDialingConns = 50 + // total timeout for encryption handshake and protocol // handshake in both directions. handshakeTimeout = 5 * time.Second @@ -401,7 +404,11 @@ func (srv *Server) dialLoop() { defer srv.loopWG.Done() defer refresh.Stop() - // TODO: maybe limit number of active dials + // Limit the number of concurrent dials + slots := make(chan struct{}, maxDialingConns) + for i := 0; i < maxDialingConns; i++ { + slots <- struct{}{} + } dial := func(dest *discover.Node) { // Don't dial nodes that would fail the checks in addPeer. // This is important because the connection handshake is a lot @@ -413,6 +420,9 @@ func (srv *Server) dialLoop() { if !ok || dialing[dest.ID] { return } + // Request a dial slot to prevent CPU exhaustion + <-slots + defer func() { slots <- struct{}{} }() dialing[dest.ID] = true srv.peerWG.Add(1) -- cgit v1.2.3 From 29fef349efd87dcca76b0593e6b68ca9f3ccf2cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 30 Apr 2015 15:21:09 +0300 Subject: p2p: fix a dialing race in the throttler --- p2p/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 5424b5463..16768f920 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -422,13 +422,13 @@ func (srv *Server) dialLoop() { } // Request a dial slot to prevent CPU exhaustion <-slots - defer func() { slots <- struct{}{} }() dialing[dest.ID] = true srv.peerWG.Add(1) go func() { srv.dialNode(dest) dialed <- dest + slots <- struct{}{} }() } -- cgit v1.2.3 From 2060bc8bac0e803c661e0c0b233284ce52630c1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 30 Apr 2015 17:12:23 +0300 Subject: p2p: fix dial throttling race condition --- p2p/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 16768f920..b7a92ce55 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -427,8 +427,8 @@ func (srv *Server) dialLoop() { srv.peerWG.Add(1) go func() { srv.dialNode(dest) - dialed <- dest slots <- struct{}{} + dialed <- dest }() } -- cgit v1.2.3 From af932177755f5f839ab29b16dc490d3e1bb3708d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 30 Apr 2015 17:39:08 +0300 Subject: p2p: reduce the concurrent handshakes to 10/10 in/out --- p2p/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index b7a92ce55..164aaba37 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -25,10 +25,10 @@ const ( // This is the maximum number of inbound connection // that are allowed to linger between 'accepted' and // 'added as peer'. - maxAcceptConns = 50 + maxAcceptConns = 10 // Maximum number of concurrently dialing outbound connections. - maxDialingConns = 50 + maxDialingConns = 10 // total timeout for encryption handshake and protocol // handshake in both directions. -- cgit v1.2.3 From 4d5a719f256d7dfbaab2cc9c632cd7996067508f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 4 May 2015 17:35:49 +0300 Subject: cmd, eth, p2p: introduce pending peer cli arg, add tests --- p2p/server.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 164aaba37..77f66f167 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -22,9 +22,7 @@ const ( refreshPeersInterval = 30 * time.Second staticPeerCheckInterval = 15 * time.Second - // This is the maximum number of inbound connection - // that are allowed to linger between 'accepted' and - // 'added as peer'. + // Maximum number of concurrently handshaking inbound connections. maxAcceptConns = 10 // Maximum number of concurrently dialing outbound connections. @@ -55,6 +53,11 @@ type Server struct { // connected. It must be greater than zero. MaxPeers int + // MaxPendingPeers is the maximum number of peers that can be pending in the + // handshake phase, counted separately for inbound and outbound connections. + // Zero defaults to preset values. + MaxPendingPeers int + // Name sets the node name of this server. // Use common.MakeName to create a name that follows existing conventions. Name string @@ -334,8 +337,12 @@ func (srv *Server) listenLoop() { // This channel acts as a semaphore limiting // active inbound connections that are lingering pre-handshake. // If all slots are taken, no further connections are accepted. - slots := make(chan struct{}, maxAcceptConns) - for i := 0; i < maxAcceptConns; i++ { + tokens := maxAcceptConns + if srv.MaxPendingPeers > 0 { + tokens = srv.MaxPendingPeers + } + slots := make(chan struct{}, tokens) + for i := 0; i < tokens; i++ { slots <- struct{}{} } @@ -405,8 +412,12 @@ func (srv *Server) dialLoop() { defer refresh.Stop() // Limit the number of concurrent dials - slots := make(chan struct{}, maxDialingConns) - for i := 0; i < maxDialingConns; i++ { + tokens := maxAcceptConns + if srv.MaxPendingPeers > 0 { + tokens = srv.MaxPendingPeers + } + slots := make(chan struct{}, tokens) + for i := 0; i < tokens; i++ { slots <- struct{}{} } dial := func(dest *discover.Node) { -- cgit v1.2.3 From 914e57e49bea0617515e1935972c5990a222cd7b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 8 May 2015 15:54:35 +0200 Subject: p2p: fix disconnect at capacity With the introduction of static/trusted nodes, the peer count can go above MaxPeers. Update the capacity check to handle this. While here, decouple the trusted nodes check from the handshake by passing a function instead. --- p2p/server.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 77f66f167..0598547e4 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -126,7 +126,7 @@ type Server struct { peerWG sync.WaitGroup // active peer goroutines } -type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, bool, map[discover.NodeID]bool) (*conn, error) +type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, func(discover.NodeID) bool) (*conn, error) type newPeerHook func(*Peer) // Peers returns all connected peers. @@ -506,17 +506,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // the callers of startPeer added the peer to the wait group already. fd.SetDeadline(time.Now().Add(handshakeTimeout)) - // Check capacity, but override for static nodes - srv.lock.RLock() - atcap := len(srv.peers) == srv.MaxPeers - if dest != nil { - if _, ok := srv.staticNodes[dest.ID]; ok { - atcap = false - } - } - srv.lock.RUnlock() - - conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, atcap, srv.trustedNodes) + conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, srv.keepconn) if err != nil { fd.Close() glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err) @@ -539,6 +529,21 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { go srv.runPeer(p) } +// preflight checks whether a connection should be kept. it runs +// after the encryption handshake, as soon as the remote identity is +// known. +func (srv *Server) keepconn(id discover.NodeID) bool { + srv.lock.RLock() + defer srv.lock.RUnlock() + if _, ok := srv.staticNodes[id]; ok { + return true // static nodes are always allowed + } + if _, ok := srv.trustedNodes[id]; ok { + return true // trusted nodes are always allowed + } + return len(srv.peers) < srv.MaxPeers +} + func (srv *Server) runPeer(p *Peer) { glog.V(logger.Debug).Infof("Added %v\n", p) srvjslog.LogJson(&logger.P2PConnected{ -- cgit v1.2.3 From 9c0f36c46dd85f02c6c02cc646714b2576a70f27 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 8 May 2015 15:58:19 +0200 Subject: p2p: use maxDialingConns instead of maxAcceptConns as dial limit --- p2p/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 0598547e4..171798a1d 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -412,7 +412,7 @@ func (srv *Server) dialLoop() { defer refresh.Stop() // Limit the number of concurrent dials - tokens := maxAcceptConns + tokens := maxDialingConns if srv.MaxPendingPeers > 0 { tokens = srv.MaxPendingPeers } -- cgit v1.2.3 From d4f0a67323dec12e5b84ba4907970267a2e27601 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 8 May 2015 16:09:38 +0200 Subject: p2p: drop connections with no matching protocols --- p2p/server.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 171798a1d..3c6fb5893 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -518,7 +518,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout, } p := newPeer(fd, conn, srv.Protocols) - if ok, reason := srv.addPeer(conn.ID, p); !ok { + if ok, reason := srv.addPeer(conn, p); !ok { glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason) p.politeDisconnect(reason) srv.peerWG.Done() @@ -564,13 +564,18 @@ func (srv *Server) runPeer(p *Peer) { }) } -func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { +func (srv *Server) addPeer(conn *conn, p *Peer) (bool, DiscReason) { + // drop connections with no matching protocols. + if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, conn.protoHandshake.Caps) == 0 { + return false, DiscUselessPeer + } + // add the peer if it passes the other checks. srv.lock.Lock() defer srv.lock.Unlock() - if ok, reason := srv.checkPeer(id); !ok { + if ok, reason := srv.checkPeer(conn.ID); !ok { return false, reason } - srv.peers[id] = p + srv.peers[conn.ID] = p return true, 0 } -- cgit v1.2.3