aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-04-10 23:59:32 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-04-10 23:59:32 +0800
commit92fbb61625c8fa8f07c6ab9458b53884b8a4e95c (patch)
tree7cdf4fce82ff599fcd86aa01bd952ddf310fb2f1 /p2p/server.go
parentfc1d1f9afd155fab1f614c6a0340233f90afafd6 (diff)
parentc5332537f5726610c3c1606ead8cbaa83144b537 (diff)
downloadgo-tangerine-92fbb61625c8fa8f07c6ab9458b53884b8a4e95c.tar
go-tangerine-92fbb61625c8fa8f07c6ab9458b53884b8a4e95c.tar.gz
go-tangerine-92fbb61625c8fa8f07c6ab9458b53884b8a4e95c.tar.bz2
go-tangerine-92fbb61625c8fa8f07c6ab9458b53884b8a4e95c.tar.lz
go-tangerine-92fbb61625c8fa8f07c6ab9458b53884b8a4e95c.tar.xz
go-tangerine-92fbb61625c8fa8f07c6ab9458b53884b8a4e95c.tar.zst
go-tangerine-92fbb61625c8fa8f07c6ab9458b53884b8a4e95c.zip
Merge pull request #691 from fjl/discovery-fixes
p2p: a bunch of fixes
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go176
1 files changed, 108 insertions, 68 deletions
diff --git a/p2p/server.go b/p2p/server.go
index 0a2621aa8..5cd3dc2ad 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -3,6 +3,7 @@ package p2p
import (
"bytes"
"crypto/ecdsa"
+ "crypto/rand"
"errors"
"fmt"
"net"
@@ -20,6 +21,11 @@ const (
defaultDialTimeout = 10 * time.Second
refreshPeersInterval = 30 * time.Second
+ // This is the maximum number of inbound connection
+ // that are allowed to linger between 'accepted' and
+ // 'added as peer'.
+ maxAcceptConns = 50
+
// total timeout for encryption handshake and protocol
// handshake in both directions.
handshakeTimeout = 5 * time.Second
@@ -85,12 +91,12 @@ type Server struct {
ourHandshake *protoHandshake
- lock sync.RWMutex
- running bool
- listener net.Listener
- peers map[discover.NodeID]*Peer
+ lock sync.RWMutex // protects running and peers
+ running bool
+ peers map[discover.NodeID]*Peer
- ntab *discover.Table
+ ntab *discover.Table
+ listener net.Listener
quit chan struct{}
loopWG sync.WaitGroup // {dial,listen,nat}Loop
@@ -98,7 +104,7 @@ type Server struct {
peerConnect chan *discover.Node
}
-type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error)
+type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, bool) (*conn, error)
type newPeerHook func(*Peer)
// Peers returns all connected peers.
@@ -260,62 +266,94 @@ func (srv *Server) Stop() {
srv.peerWG.Wait()
}
+// Self returns the local node's endpoint information.
+func (srv *Server) Self() *discover.Node {
+ return srv.ntab.Self()
+}
+
// main loop for adding connections via listening
func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
+
+ // This channel acts as a semaphore limiting
+ // active inbound connections that are lingering pre-handshake.
+ // If all slots are taken, no further connections are accepted.
+ slots := make(chan struct{}, maxAcceptConns)
+ for i := 0; i < maxAcceptConns; i++ {
+ slots <- struct{}{}
+ }
+
glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
for {
+ <-slots
conn, err := srv.listener.Accept()
if err != nil {
return
}
glog.V(logger.Debug).Infof("Accepted conn %v\n", conn.RemoteAddr())
srv.peerWG.Add(1)
- go srv.startPeer(conn, nil)
+ go func() {
+ srv.startPeer(conn, nil)
+ slots <- struct{}{}
+ }()
}
}
func (srv *Server) dialLoop() {
+ var (
+ dialed = make(chan *discover.Node)
+ dialing = make(map[discover.NodeID]bool)
+ findresults = make(chan []*discover.Node)
+ refresh = time.NewTimer(0)
+ )
defer srv.loopWG.Done()
- refresh := time.NewTicker(refreshPeersInterval)
defer refresh.Stop()
- srv.ntab.Bootstrap(srv.BootstrapNodes)
- go srv.findPeers()
-
- dialed := make(chan *discover.Node)
- dialing := make(map[discover.NodeID]bool)
+ // TODO: maybe limit number of active dials
+ dial := func(dest *discover.Node) {
+ // Don't dial nodes that would fail the checks in addPeer.
+ // This is important because the connection handshake is a lot
+ // of work and we'd rather avoid doing that work for peers
+ // that can't be added.
+ srv.lock.RLock()
+ ok, _ := srv.checkPeer(dest.ID)
+ srv.lock.RUnlock()
+ if !ok || dialing[dest.ID] {
+ return
+ }
- // TODO: limit number of active dials
- // TODO: ensure only one findPeers goroutine is running
- // TODO: pause findPeers when we're at capacity
+ dialing[dest.ID] = true
+ srv.peerWG.Add(1)
+ go func() {
+ srv.dialNode(dest)
+ dialed <- dest
+ }()
+ }
+ srv.ntab.Bootstrap(srv.BootstrapNodes)
for {
select {
case <-refresh.C:
-
- go srv.findPeers()
+ // Grab some nodes to connect to if we're not at capacity.
+ srv.lock.RLock()
+ needpeers := len(srv.peers) < srv.MaxPeers
+ srv.lock.RUnlock()
+ if needpeers {
+ go func() {
+ var target discover.NodeID
+ rand.Read(target[:])
+ findresults <- srv.ntab.Lookup(target)
+ }()
+ refresh.Stop()
+ }
case dest := <-srv.peerConnect:
- // avoid dialing nodes that are already connected.
- // there is another check for this in addPeer,
- // which runs after the handshake.
- srv.lock.Lock()
- _, isconnected := srv.peers[dest.ID]
- srv.lock.Unlock()
- if isconnected || dialing[dest.ID] || dest.ID == srv.Self().ID {
- continue
+ dial(dest)
+ case dests := <-findresults:
+ for _, dest := range dests {
+ dial(dest)
}
-
- dialing[dest.ID] = true
- srv.peerWG.Add(1)
- go func() {
- srv.dialNode(dest)
- // at this point, the peer has been added
- // or discarded. either way, we're not dialing it anymore.
- dialed <- dest
- }()
-
+ refresh.Reset(refreshPeersInterval)
case dest := <-dialed:
delete(dialing, dest.ID)
@@ -331,44 +369,34 @@ func (srv *Server) dialNode(dest *discover.Node) {
glog.V(logger.Debug).Infof("Dialing %v\n", dest)
conn, err := srv.Dialer.Dial("tcp", addr.String())
if err != nil {
+ // dialLoop adds to the wait group counter when launching
+ // dialNode, so we need to count it down again. startPeer also
+ // does that when an error occurs.
+ srv.peerWG.Done()
glog.V(logger.Detail).Infof("dial error: %v", err)
return
}
srv.startPeer(conn, dest)
}
-func (srv *Server) Self() *discover.Node {
- return srv.ntab.Self()
-}
-
-func (srv *Server) findPeers() {
- far := srv.Self().ID
- for i := range far {
- far[i] = ^far[i]
- }
- closeToSelf := srv.ntab.Lookup(srv.Self().ID)
- farFromSelf := srv.ntab.Lookup(far)
-
- for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
- if i < len(closeToSelf) {
- srv.peerConnect <- closeToSelf[i]
- }
- if i < len(farFromSelf) {
- srv.peerConnect <- farFromSelf[i]
- }
- }
-}
-
func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
// TODO: handle/store session token
+
+ // Run setupFunc, which should create an authenticated connection
+ // and run the capability exchange. Note that any early error
+ // returns during that exchange need to call peerWG.Done because
+ // the callers of startPeer added the peer to the wait group already.
fd.SetDeadline(time.Now().Add(handshakeTimeout))
- conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest)
+ srv.lock.RLock()
+ atcap := len(srv.peers) == srv.MaxPeers
+ srv.lock.RUnlock()
+ conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, atcap)
if err != nil {
fd.Close()
glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
+ srv.peerWG.Done()
return
}
-
conn.MsgReadWriter = &netWrapper{
wrapped: conn.MsgReadWriter,
conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
@@ -377,26 +405,30 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
if ok, reason := srv.addPeer(conn.ID, p); !ok {
glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason)
p.politeDisconnect(reason)
+ srv.peerWG.Done()
return
}
+ // The handshakes are done and it passed all checks.
+ // Spawn the Peer loops.
+ go srv.runPeer(p)
+}
+func (srv *Server) runPeer(p *Peer) {
glog.V(logger.Debug).Infof("Added %v\n", p)
srvjslog.LogJson(&logger.P2PConnected{
- RemoteId: fmt.Sprintf("%x", conn.ID[:]),
- RemoteAddress: fd.RemoteAddr().String(),
- RemoteVersionString: conn.Name,
+ RemoteId: p.ID().String(),
+ RemoteAddress: p.RemoteAddr().String(),
+ RemoteVersionString: p.Name(),
NumConnections: srv.PeerCount(),
})
-
if srv.newPeerHook != nil {
srv.newPeerHook(p)
}
discreason := p.run()
srv.removePeer(p)
-
glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
srvjslog.LogJson(&logger.P2PDisconnected{
- RemoteId: fmt.Sprintf("%x", conn.ID[:]),
+ RemoteId: p.ID().String(),
NumConnections: srv.PeerCount(),
})
}
@@ -404,6 +436,14 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
srv.lock.Lock()
defer srv.lock.Unlock()
+ if ok, reason := srv.checkPeer(id); !ok {
+ return false, reason
+ }
+ srv.peers[id] = p
+ return true, 0
+}
+
+func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) {
switch {
case !srv.running:
return false, DiscQuitting
@@ -413,9 +453,9 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
return false, DiscAlreadyConnected
case id == srv.Self().ID:
return false, DiscSelf
+ default:
+ return true, 0
}
- srv.peers[id] = p
- return true, 0
}
func (srv *Server) removePeer(p *Peer) {