From 22d1f0faf1fcb20063b719f8fc70534e268485b6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 3 Apr 2015 03:56:17 +0200 Subject: p2p: improve peer selection logic This commit introduces a new (temporary) peer selection strategy based on random lookups. While we're here, also implement the TODOs in dialLoop. --- p2p/server.go | 117 ++++++++++++++++++++++++++++++---------------------------- 1 file changed, 61 insertions(+), 56 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 0a2621aa8..b21b83252 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -3,6 +3,7 @@ package p2p import ( "bytes" "crypto/ecdsa" + "crypto/rand" "errors" "fmt" "net" @@ -276,46 +277,58 @@ func (srv *Server) listenLoop() { } func (srv *Server) dialLoop() { + var ( + dialed = make(chan *discover.Node) + dialing = make(map[discover.NodeID]bool) + findresults = make(chan []*discover.Node) + refresh = time.NewTimer(0) + ) defer srv.loopWG.Done() - refresh := time.NewTicker(refreshPeersInterval) defer refresh.Stop() - srv.ntab.Bootstrap(srv.BootstrapNodes) - go srv.findPeers() - - dialed := make(chan *discover.Node) - dialing := make(map[discover.NodeID]bool) - - // TODO: limit number of active dials - // TODO: ensure only one findPeers goroutine is running - // TODO: pause findPeers when we're at capacity + // TODO: maybe limit number of active dials + dial := func(dest *discover.Node) { + srv.lock.Lock() + ok, _ := srv.checkPeer(dest.ID) + srv.lock.Unlock() + // Don't dial nodes that would fail the checks in addPeer. + // This is important because the connection handshake is a lot + // of work and we'd rather avoid doing that work for peers + // that can't be added. + if !ok || dialing[dest.ID] { + return + } + dialing[dest.ID] = true + srv.peerWG.Add(1) + go func() { + srv.dialNode(dest) + dialed <- dest + }() + } + srv.ntab.Bootstrap(srv.BootstrapNodes) for { select { case <-refresh.C: - - go srv.findPeers() - - case dest := <-srv.peerConnect: - // avoid dialing nodes that are already connected. - // there is another check for this in addPeer, - // which runs after the handshake. srv.lock.Lock() - _, isconnected := srv.peers[dest.ID] + needpeers := len(srv.peers) < srv.MaxPeers srv.lock.Unlock() - if isconnected || dialing[dest.ID] || dest.ID == srv.Self().ID { - continue + if needpeers { + go func() { + var target discover.NodeID + rand.Read(target[:]) + findresults <- srv.ntab.Lookup(target) + }() + refresh.Stop() } - dialing[dest.ID] = true - srv.peerWG.Add(1) - go func() { - srv.dialNode(dest) - // at this point, the peer has been added - // or discarded. either way, we're not dialing it anymore. - dialed <- dest - }() - + case dest := <-srv.peerConnect: + dial(dest) + case dests := <-findresults: + for _, dest := range dests { + dial(dest) + } + refresh.Reset(refreshPeersInterval) case dest := <-dialed: delete(dialing, dest.ID) @@ -341,24 +354,6 @@ func (srv *Server) Self() *discover.Node { return srv.ntab.Self() } -func (srv *Server) findPeers() { - far := srv.Self().ID - for i := range far { - far[i] = ^far[i] - } - closeToSelf := srv.ntab.Lookup(srv.Self().ID) - farFromSelf := srv.ntab.Lookup(far) - - for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ { - if i < len(closeToSelf) { - srv.peerConnect <- closeToSelf[i] - } - if i < len(farFromSelf) { - srv.peerConnect <- farFromSelf[i] - } - } -} - func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // TODO: handle/store session token fd.SetDeadline(time.Now().Add(handshakeTimeout)) @@ -368,7 +363,6 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err) return } - conn.MsgReadWriter = &netWrapper{ wrapped: conn.MsgReadWriter, conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout, @@ -379,24 +373,27 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { p.politeDisconnect(reason) return } + // The handshakes are done and it passed all checks. + // Spawn the Peer loops. + go srv.runPeer(p) +} +func (srv *Server) runPeer(p *Peer) { glog.V(logger.Debug).Infof("Added %v\n", p) srvjslog.LogJson(&logger.P2PConnected{ - RemoteId: fmt.Sprintf("%x", conn.ID[:]), - RemoteAddress: fd.RemoteAddr().String(), - RemoteVersionString: conn.Name, + RemoteId: p.ID().String(), + RemoteAddress: p.RemoteAddr().String(), + RemoteVersionString: p.Name(), NumConnections: srv.PeerCount(), }) - if srv.newPeerHook != nil { srv.newPeerHook(p) } discreason := p.run() srv.removePeer(p) - glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason) srvjslog.LogJson(&logger.P2PDisconnected{ - RemoteId: fmt.Sprintf("%x", conn.ID[:]), + RemoteId: p.ID().String(), NumConnections: srv.PeerCount(), }) } @@ -404,6 +401,14 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { srv.lock.Lock() defer srv.lock.Unlock() + if ok, reason := srv.checkPeer(id); !ok { + return false, reason + } + srv.peers[id] = p + return true, 0 +} + +func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) { switch { case !srv.running: return false, DiscQuitting @@ -413,9 +418,9 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { return false, DiscAlreadyConnected case id == srv.Self().ID: return false, DiscSelf + default: + return true, 0 } - srv.peers[id] = p - return true, 0 } func (srv *Server) removePeer(p *Peer) { -- cgit v1.2.3 From 145330fdf28b6d6bf3558dec0b512a5379d446f7 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 10 Apr 2015 13:20:26 +0200 Subject: p2p: properly decrement peer wait group counter for setup errors --- p2p/server.go | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index b21b83252..d227d477a 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -344,6 +344,10 @@ func (srv *Server) dialNode(dest *discover.Node) { glog.V(logger.Debug).Infof("Dialing %v\n", dest) conn, err := srv.Dialer.Dial("tcp", addr.String()) if err != nil { + // dialLoop adds to the wait group counter when launching + // dialNode, so we need to count it down again. startPeer also + // does that when an error occurs. + srv.peerWG.Done() glog.V(logger.Detail).Infof("dial error: %v", err) return } @@ -356,11 +360,17 @@ func (srv *Server) Self() *discover.Node { func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // TODO: handle/store session token + + // Run setupFunc, which should create an authenticated connection + // and run the capability exchange. Note that any early error + // returns during that exchange need to call peerWG.Done because + // the callers of startPeer added the peer to the wait group already. fd.SetDeadline(time.Now().Add(handshakeTimeout)) conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest) if err != nil { fd.Close() glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err) + srv.peerWG.Done() return } conn.MsgReadWriter = &netWrapper{ @@ -371,6 +381,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { if ok, reason := srv.addPeer(conn.ID, p); !ok { glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason) p.politeDisconnect(reason) + srv.peerWG.Done() return } // The handshakes are done and it passed all checks. -- cgit v1.2.3 From b3c058a9e4e9296583ba516c537768b96a2fb8a0 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 10 Apr 2015 13:25:35 +0200 Subject: p2p: improve disconnect signaling at handshake time As of this commit, p2p will disconnect nodes directly after the encryption handshake if too many peer connections are active. Errors in the protocol handshake packet are now handled more politely by sending a disconnect packet before closing the connection. --- p2p/server.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index d227d477a..88f7ba2ec 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -99,7 +99,7 @@ type Server struct { peerConnect chan *discover.Node } -type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error) +type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, bool) (*conn, error) type newPeerHook func(*Peer) // Peers returns all connected peers. @@ -261,6 +261,11 @@ func (srv *Server) Stop() { srv.peerWG.Wait() } +// Self returns the local node's endpoint information. +func (srv *Server) Self() *discover.Node { + return srv.ntab.Self() +} + // main loop for adding connections via listening func (srv *Server) listenLoop() { defer srv.loopWG.Done() @@ -354,10 +359,6 @@ func (srv *Server) dialNode(dest *discover.Node) { srv.startPeer(conn, dest) } -func (srv *Server) Self() *discover.Node { - return srv.ntab.Self() -} - func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // TODO: handle/store session token @@ -366,7 +367,10 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // returns during that exchange need to call peerWG.Done because // the callers of startPeer added the peer to the wait group already. fd.SetDeadline(time.Now().Add(handshakeTimeout)) - conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest) + srv.lock.RLock() + atcap := len(srv.peers) == srv.MaxPeers + srv.lock.RUnlock() + conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, atcap) if err != nil { fd.Close() glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err) -- cgit v1.2.3 From 56977c225ecbf7834e380c03714c3e8d70e0b184 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 10 Apr 2015 17:23:09 +0200 Subject: p2p: use RLock instead of Lock for pre-dial checks --- p2p/server.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 88f7ba2ec..7b880f77b 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -86,12 +86,12 @@ type Server struct { ourHandshake *protoHandshake - lock sync.RWMutex - running bool - listener net.Listener - peers map[discover.NodeID]*Peer + lock sync.RWMutex // protects running and peers + running bool + peers map[discover.NodeID]*Peer - ntab *discover.Table + ntab *discover.Table + listener net.Listener quit chan struct{} loopWG sync.WaitGroup // {dial,listen,nat}Loop @@ -293,16 +293,17 @@ func (srv *Server) dialLoop() { // TODO: maybe limit number of active dials dial := func(dest *discover.Node) { - srv.lock.Lock() - ok, _ := srv.checkPeer(dest.ID) - srv.lock.Unlock() // Don't dial nodes that would fail the checks in addPeer. // This is important because the connection handshake is a lot // of work and we'd rather avoid doing that work for peers // that can't be added. + srv.lock.RLock() + ok, _ := srv.checkPeer(dest.ID) + srv.lock.RUnlock() if !ok || dialing[dest.ID] { return } + dialing[dest.ID] = true srv.peerWG.Add(1) go func() { @@ -315,9 +316,10 @@ func (srv *Server) dialLoop() { for { select { case <-refresh.C: - srv.lock.Lock() + // Grab some nodes to connect to if we're not at capacity. + srv.lock.RLock() needpeers := len(srv.peers) < srv.MaxPeers - srv.lock.Unlock() + srv.lock.RUnlock() if needpeers { go func() { var target discover.NodeID -- cgit v1.2.3 From c5332537f5726610c3c1606ead8cbaa83144b537 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 10 Apr 2015 17:24:41 +0200 Subject: p2p: limit number of lingering inbound pre-handshake connections This is supposed to apply some back pressure so Server is not accepting more connections than it can actually handle. The current limit is 50. This doesn't really need to be configurable, but we'll see how it behaves in our test nodes and adjust accordingly. --- p2p/server.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 7b880f77b..5cd3dc2ad 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -21,6 +21,11 @@ const ( defaultDialTimeout = 10 * time.Second refreshPeersInterval = 30 * time.Second + // This is the maximum number of inbound connection + // that are allowed to linger between 'accepted' and + // 'added as peer'. + maxAcceptConns = 50 + // total timeout for encryption handshake and protocol // handshake in both directions. handshakeTimeout = 5 * time.Second @@ -269,15 +274,28 @@ func (srv *Server) Self() *discover.Node { // main loop for adding connections via listening func (srv *Server) listenLoop() { defer srv.loopWG.Done() + + // This channel acts as a semaphore limiting + // active inbound connections that are lingering pre-handshake. + // If all slots are taken, no further connections are accepted. + slots := make(chan struct{}, maxAcceptConns) + for i := 0; i < maxAcceptConns; i++ { + slots <- struct{}{} + } + glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr()) for { + <-slots conn, err := srv.listener.Accept() if err != nil { return } glog.V(logger.Debug).Infof("Accepted conn %v\n", conn.RemoteAddr()) srv.peerWG.Add(1) - go srv.startPeer(conn, nil) + go func() { + srv.startPeer(conn, nil) + slots <- struct{}{} + }() } } -- cgit v1.2.3