aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go425
1 files changed, 425 insertions, 0 deletions
diff --git a/p2p/server.go b/p2p/server.go
new file mode 100644
index 000000000..34000cb4c
--- /dev/null
+++ b/p2p/server.go
@@ -0,0 +1,425 @@
+package p2p
+
+import (
+ "bytes"
+ "crypto/ecdsa"
+ "errors"
+ "fmt"
+ "net"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/nat"
+)
+
+const (
+ defaultDialTimeout = 10 * time.Second
+ refreshPeersInterval = 30 * time.Second
+
+ // total timeout for encryption handshake and protocol
+ // handshake in both directions.
+ handshakeTimeout = 5 * time.Second
+ // maximum time allowed for reading a complete message.
+ // this is effectively the amount of time a connection can be idle.
+ frameReadTimeout = 1 * time.Minute
+ // maximum amount of time allowed for writing a complete message.
+ frameWriteTimeout = 5 * time.Second
+)
+
+var srvlog = logger.NewLogger("P2P Server")
+var srvjslog = logger.NewJsonLogger()
+
+// MakeName creates a node name that follows the ethereum convention
+// for such names. It adds the operation system name and Go runtime version
+// the name.
+func MakeName(name, version string) string {
+ return fmt.Sprintf("%s/v%s/%s/%s", name, version, runtime.GOOS, runtime.Version())
+}
+
+// Server manages all peer connections.
+//
+// The fields of Server are used as configuration parameters.
+// You should set them before starting the Server. Fields may not be
+// modified while the server is running.
+type Server struct {
+ // This field must be set to a valid secp256k1 private key.
+ PrivateKey *ecdsa.PrivateKey
+
+ // MaxPeers is the maximum number of peers that can be
+ // connected. It must be greater than zero.
+ MaxPeers int
+
+ // Name sets the node name of this server.
+ // Use MakeName to create a name that follows existing conventions.
+ Name string
+
+ // Bootstrap nodes are used to establish connectivity
+ // with the rest of the network.
+ BootstrapNodes []*discover.Node
+
+ // Protocols should contain the protocols supported
+ // by the server. Matching protocols are launched for
+ // each peer.
+ Protocols []Protocol
+
+ // If ListenAddr is set to a non-nil address, the server
+ // will listen for incoming connections.
+ //
+ // If the port is zero, the operating system will pick a port. The
+ // ListenAddr field will be updated with the actual address when
+ // the server is started.
+ ListenAddr string
+
+ // If set to a non-nil value, the given NAT port mapper
+ // is used to make the listening port available to the
+ // Internet.
+ NAT nat.Interface
+
+ // If Dialer is set to a non-nil value, the given Dialer
+ // is used to dial outbound peer connections.
+ Dialer *net.Dialer
+
+ // If NoDial is true, the server will not dial any peers.
+ NoDial bool
+
+ // Hooks for testing. These are useful because we can inhibit
+ // the whole protocol stack.
+ setupFunc
+ newPeerHook
+
+ ourHandshake *protoHandshake
+
+ lock sync.RWMutex
+ running bool
+ listener net.Listener
+ peers map[discover.NodeID]*Peer
+
+ ntab *discover.Table
+
+ quit chan struct{}
+ loopWG sync.WaitGroup // {dial,listen,nat}Loop
+ peerWG sync.WaitGroup // active peer goroutines
+ peerConnect chan *discover.Node
+}
+
+type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error)
+type newPeerHook func(*Peer)
+
+// Peers returns all connected peers.
+func (srv *Server) Peers() (peers []*Peer) {
+ srv.lock.RLock()
+ defer srv.lock.RUnlock()
+ for _, peer := range srv.peers {
+ if peer != nil {
+ peers = append(peers, peer)
+ }
+ }
+ return
+}
+
+// PeerCount returns the number of connected peers.
+func (srv *Server) PeerCount() int {
+ srv.lock.RLock()
+ n := len(srv.peers)
+ srv.lock.RUnlock()
+ return n
+}
+
+// SuggestPeer creates a connection to the given Node if it
+// is not already connected.
+func (srv *Server) SuggestPeer(n *discover.Node) {
+ srv.peerConnect <- n
+}
+
+// Broadcast sends an RLP-encoded message to all connected peers.
+// This method is deprecated and will be removed later.
+func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) {
+ var payload []byte
+ if data != nil {
+ payload = ethutil.Encode(data)
+ }
+ srv.lock.RLock()
+ defer srv.lock.RUnlock()
+ for _, peer := range srv.peers {
+ if peer != nil {
+ var msg = Msg{Code: code}
+ if data != nil {
+ msg.Payload = bytes.NewReader(payload)
+ msg.Size = uint32(len(payload))
+ }
+ peer.writeProtoMsg(protocol, msg)
+ }
+ }
+}
+
+// Start starts running the server.
+// Servers can be re-used and started again after stopping.
+func (srv *Server) Start() (err error) {
+ srv.lock.Lock()
+ defer srv.lock.Unlock()
+ if srv.running {
+ return errors.New("server already running")
+ }
+ srvlog.Infoln("Starting Server")
+
+ // static fields
+ if srv.PrivateKey == nil {
+ return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
+ }
+ if srv.MaxPeers <= 0 {
+ return fmt.Errorf("Server.MaxPeers must be > 0")
+ }
+ srv.quit = make(chan struct{})
+ srv.peers = make(map[discover.NodeID]*Peer)
+ srv.peerConnect = make(chan *discover.Node)
+ if srv.setupFunc == nil {
+ srv.setupFunc = setupConn
+ }
+
+ // node table
+ ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT)
+ if err != nil {
+ return err
+ }
+ srv.ntab = ntab
+
+ // handshake
+ srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self()}
+ for _, p := range srv.Protocols {
+ srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
+ }
+
+ // listen/dial
+ if srv.ListenAddr != "" {
+ if err := srv.startListening(); err != nil {
+ return err
+ }
+ }
+ if srv.Dialer == nil {
+ srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
+ }
+ if !srv.NoDial {
+ srv.loopWG.Add(1)
+ go srv.dialLoop()
+ }
+ if srv.NoDial && srv.ListenAddr == "" {
+ srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
+ }
+
+ srv.running = true
+ return nil
+}
+
+func (srv *Server) startListening() error {
+ listener, err := net.Listen("tcp", srv.ListenAddr)
+ if err != nil {
+ return err
+ }
+ laddr := listener.Addr().(*net.TCPAddr)
+ srv.ListenAddr = laddr.String()
+ srv.listener = listener
+ srv.loopWG.Add(1)
+ go srv.listenLoop()
+ if !laddr.IP.IsLoopback() && srv.NAT != nil {
+ srv.loopWG.Add(1)
+ go func() {
+ nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
+ srv.loopWG.Done()
+ }()
+ }
+ return nil
+}
+
+// Stop terminates the server and all active peer connections.
+// It blocks until all active connections have been closed.
+func (srv *Server) Stop() {
+ srv.lock.Lock()
+ if !srv.running {
+ srv.lock.Unlock()
+ return
+ }
+ srv.running = false
+ srv.lock.Unlock()
+
+ srvlog.Infoln("Stopping Server")
+ srv.ntab.Close()
+ if srv.listener != nil {
+ // this unblocks listener Accept
+ srv.listener.Close()
+ }
+ close(srv.quit)
+ srv.loopWG.Wait()
+
+ // No new peers can be added at this point because dialLoop and
+ // listenLoop are down. It is safe to call peerWG.Wait because
+ // peerWG.Add is not called outside of those loops.
+ for _, peer := range srv.peers {
+ peer.Disconnect(DiscQuitting)
+ }
+ srv.peerWG.Wait()
+}
+
+// main loop for adding connections via listening
+func (srv *Server) listenLoop() {
+ defer srv.loopWG.Done()
+ srvlog.Infoln("Listening on", srv.listener.Addr())
+ for {
+ conn, err := srv.listener.Accept()
+ if err != nil {
+ return
+ }
+ srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr())
+ srv.peerWG.Add(1)
+ go srv.startPeer(conn, nil)
+ }
+}
+
+func (srv *Server) dialLoop() {
+ defer srv.loopWG.Done()
+ refresh := time.NewTicker(refreshPeersInterval)
+ defer refresh.Stop()
+
+ srv.ntab.Bootstrap(srv.BootstrapNodes)
+ go srv.findPeers()
+
+ dialed := make(chan *discover.Node)
+ dialing := make(map[discover.NodeID]bool)
+
+ // TODO: limit number of active dials
+ // TODO: ensure only one findPeers goroutine is running
+ // TODO: pause findPeers when we're at capacity
+
+ for {
+ select {
+ case <-refresh.C:
+
+ go srv.findPeers()
+
+ case dest := <-srv.peerConnect:
+ // avoid dialing nodes that are already connected.
+ // there is another check for this in addPeer,
+ // which runs after the handshake.
+ srv.lock.Lock()
+ _, isconnected := srv.peers[dest.ID]
+ srv.lock.Unlock()
+ if isconnected || dialing[dest.ID] || dest.ID == srv.ntab.Self() {
+ continue
+ }
+
+ dialing[dest.ID] = true
+ srv.peerWG.Add(1)
+ go func() {
+ srv.dialNode(dest)
+ // at this point, the peer has been added
+ // or discarded. either way, we're not dialing it anymore.
+ dialed <- dest
+ }()
+
+ case dest := <-dialed:
+ delete(dialing, dest.ID)
+
+ case <-srv.quit:
+ // TODO: maybe wait for active dials
+ return
+ }
+ }
+}
+
+func (srv *Server) dialNode(dest *discover.Node) {
+ addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort}
+ srvlog.Debugf("Dialing %v\n", dest)
+ conn, err := srv.Dialer.Dial("tcp", addr.String())
+ if err != nil {
+ srvlog.DebugDetailf("dial error: %v", err)
+ return
+ }
+ srv.startPeer(conn, dest)
+}
+
+func (srv *Server) findPeers() {
+ far := srv.ntab.Self()
+ for i := range far {
+ far[i] = ^far[i]
+ }
+ closeToSelf := srv.ntab.Lookup(srv.ntab.Self())
+ farFromSelf := srv.ntab.Lookup(far)
+
+ for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
+ if i < len(closeToSelf) {
+ srv.peerConnect <- closeToSelf[i]
+ }
+ if i < len(farFromSelf) {
+ srv.peerConnect <- farFromSelf[i]
+ }
+ }
+}
+
+func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
+ // TODO: handle/store session token
+ fd.SetDeadline(time.Now().Add(handshakeTimeout))
+ conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest)
+ if err != nil {
+ fd.Close()
+ srvlog.Debugf("Handshake with %v failed: %v", fd.RemoteAddr(), err)
+ return
+ }
+
+ conn.MsgReadWriter = &netWrapper{
+ wrapped: conn.MsgReadWriter,
+ conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
+ }
+ p := newPeer(fd, conn, srv.Protocols)
+ if ok, reason := srv.addPeer(conn.ID, p); !ok {
+ srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason)
+ p.politeDisconnect(reason)
+ return
+ }
+
+ srvlog.Debugf("Added %v\n", p)
+ srvjslog.LogJson(&logger.P2PConnected{
+ RemoteId: fmt.Sprintf("%x", conn.ID[:]),
+ RemoteAddress: fd.RemoteAddr().String(),
+ RemoteVersionString: conn.Name,
+ NumConnections: srv.PeerCount(),
+ })
+
+ if srv.newPeerHook != nil {
+ srv.newPeerHook(p)
+ }
+ discreason := p.run()
+ srv.removePeer(p)
+
+ srvlog.Debugf("Removed %v (%v)\n", p, discreason)
+ srvjslog.LogJson(&logger.P2PDisconnected{
+ RemoteId: fmt.Sprintf("%x", conn.ID[:]),
+ NumConnections: srv.PeerCount(),
+ })
+}
+
+func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
+ srv.lock.Lock()
+ defer srv.lock.Unlock()
+ switch {
+ case !srv.running:
+ return false, DiscQuitting
+ case len(srv.peers) >= srv.MaxPeers:
+ return false, DiscTooManyPeers
+ case srv.peers[id] != nil:
+ return false, DiscAlreadyConnected
+ case id == srv.ntab.Self():
+ return false, DiscSelf
+ }
+ srv.peers[id] = p
+ return true, 0
+}
+
+func (srv *Server) removePeer(p *Peer) {
+ srv.lock.Lock()
+ delete(srv.peers, p.ID())
+ srv.lock.Unlock()
+ srv.peerWG.Done()
+}