diff options
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 425 |
1 files changed, 425 insertions, 0 deletions
diff --git a/p2p/server.go b/p2p/server.go new file mode 100644 index 000000000..34000cb4c --- /dev/null +++ b/p2p/server.go @@ -0,0 +1,425 @@ +package p2p + +import ( + "bytes" + "crypto/ecdsa" + "errors" + "fmt" + "net" + "runtime" + "sync" + "time" + + "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/nat" +) + +const ( + defaultDialTimeout = 10 * time.Second + refreshPeersInterval = 30 * time.Second + + // total timeout for encryption handshake and protocol + // handshake in both directions. + handshakeTimeout = 5 * time.Second + // maximum time allowed for reading a complete message. + // this is effectively the amount of time a connection can be idle. + frameReadTimeout = 1 * time.Minute + // maximum amount of time allowed for writing a complete message. + frameWriteTimeout = 5 * time.Second +) + +var srvlog = logger.NewLogger("P2P Server") +var srvjslog = logger.NewJsonLogger() + +// MakeName creates a node name that follows the ethereum convention +// for such names. It adds the operation system name and Go runtime version +// the name. +func MakeName(name, version string) string { + return fmt.Sprintf("%s/v%s/%s/%s", name, version, runtime.GOOS, runtime.Version()) +} + +// Server manages all peer connections. +// +// The fields of Server are used as configuration parameters. +// You should set them before starting the Server. Fields may not be +// modified while the server is running. +type Server struct { + // This field must be set to a valid secp256k1 private key. + PrivateKey *ecdsa.PrivateKey + + // MaxPeers is the maximum number of peers that can be + // connected. It must be greater than zero. + MaxPeers int + + // Name sets the node name of this server. + // Use MakeName to create a name that follows existing conventions. + Name string + + // Bootstrap nodes are used to establish connectivity + // with the rest of the network. + BootstrapNodes []*discover.Node + + // Protocols should contain the protocols supported + // by the server. Matching protocols are launched for + // each peer. + Protocols []Protocol + + // If ListenAddr is set to a non-nil address, the server + // will listen for incoming connections. + // + // If the port is zero, the operating system will pick a port. The + // ListenAddr field will be updated with the actual address when + // the server is started. + ListenAddr string + + // If set to a non-nil value, the given NAT port mapper + // is used to make the listening port available to the + // Internet. + NAT nat.Interface + + // If Dialer is set to a non-nil value, the given Dialer + // is used to dial outbound peer connections. + Dialer *net.Dialer + + // If NoDial is true, the server will not dial any peers. + NoDial bool + + // Hooks for testing. These are useful because we can inhibit + // the whole protocol stack. + setupFunc + newPeerHook + + ourHandshake *protoHandshake + + lock sync.RWMutex + running bool + listener net.Listener + peers map[discover.NodeID]*Peer + + ntab *discover.Table + + quit chan struct{} + loopWG sync.WaitGroup // {dial,listen,nat}Loop + peerWG sync.WaitGroup // active peer goroutines + peerConnect chan *discover.Node +} + +type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error) +type newPeerHook func(*Peer) + +// Peers returns all connected peers. +func (srv *Server) Peers() (peers []*Peer) { + srv.lock.RLock() + defer srv.lock.RUnlock() + for _, peer := range srv.peers { + if peer != nil { + peers = append(peers, peer) + } + } + return +} + +// PeerCount returns the number of connected peers. +func (srv *Server) PeerCount() int { + srv.lock.RLock() + n := len(srv.peers) + srv.lock.RUnlock() + return n +} + +// SuggestPeer creates a connection to the given Node if it +// is not already connected. +func (srv *Server) SuggestPeer(n *discover.Node) { + srv.peerConnect <- n +} + +// Broadcast sends an RLP-encoded message to all connected peers. +// This method is deprecated and will be removed later. +func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) { + var payload []byte + if data != nil { + payload = ethutil.Encode(data) + } + srv.lock.RLock() + defer srv.lock.RUnlock() + for _, peer := range srv.peers { + if peer != nil { + var msg = Msg{Code: code} + if data != nil { + msg.Payload = bytes.NewReader(payload) + msg.Size = uint32(len(payload)) + } + peer.writeProtoMsg(protocol, msg) + } + } +} + +// Start starts running the server. +// Servers can be re-used and started again after stopping. +func (srv *Server) Start() (err error) { + srv.lock.Lock() + defer srv.lock.Unlock() + if srv.running { + return errors.New("server already running") + } + srvlog.Infoln("Starting Server") + + // static fields + if srv.PrivateKey == nil { + return fmt.Errorf("Server.PrivateKey must be set to a non-nil key") + } + if srv.MaxPeers <= 0 { + return fmt.Errorf("Server.MaxPeers must be > 0") + } + srv.quit = make(chan struct{}) + srv.peers = make(map[discover.NodeID]*Peer) + srv.peerConnect = make(chan *discover.Node) + if srv.setupFunc == nil { + srv.setupFunc = setupConn + } + + // node table + ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT) + if err != nil { + return err + } + srv.ntab = ntab + + // handshake + srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self()} + for _, p := range srv.Protocols { + srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap()) + } + + // listen/dial + if srv.ListenAddr != "" { + if err := srv.startListening(); err != nil { + return err + } + } + if srv.Dialer == nil { + srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} + } + if !srv.NoDial { + srv.loopWG.Add(1) + go srv.dialLoop() + } + if srv.NoDial && srv.ListenAddr == "" { + srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.") + } + + srv.running = true + return nil +} + +func (srv *Server) startListening() error { + listener, err := net.Listen("tcp", srv.ListenAddr) + if err != nil { + return err + } + laddr := listener.Addr().(*net.TCPAddr) + srv.ListenAddr = laddr.String() + srv.listener = listener + srv.loopWG.Add(1) + go srv.listenLoop() + if !laddr.IP.IsLoopback() && srv.NAT != nil { + srv.loopWG.Add(1) + go func() { + nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p") + srv.loopWG.Done() + }() + } + return nil +} + +// Stop terminates the server and all active peer connections. +// It blocks until all active connections have been closed. +func (srv *Server) Stop() { + srv.lock.Lock() + if !srv.running { + srv.lock.Unlock() + return + } + srv.running = false + srv.lock.Unlock() + + srvlog.Infoln("Stopping Server") + srv.ntab.Close() + if srv.listener != nil { + // this unblocks listener Accept + srv.listener.Close() + } + close(srv.quit) + srv.loopWG.Wait() + + // No new peers can be added at this point because dialLoop and + // listenLoop are down. It is safe to call peerWG.Wait because + // peerWG.Add is not called outside of those loops. + for _, peer := range srv.peers { + peer.Disconnect(DiscQuitting) + } + srv.peerWG.Wait() +} + +// main loop for adding connections via listening +func (srv *Server) listenLoop() { + defer srv.loopWG.Done() + srvlog.Infoln("Listening on", srv.listener.Addr()) + for { + conn, err := srv.listener.Accept() + if err != nil { + return + } + srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr()) + srv.peerWG.Add(1) + go srv.startPeer(conn, nil) + } +} + +func (srv *Server) dialLoop() { + defer srv.loopWG.Done() + refresh := time.NewTicker(refreshPeersInterval) + defer refresh.Stop() + + srv.ntab.Bootstrap(srv.BootstrapNodes) + go srv.findPeers() + + dialed := make(chan *discover.Node) + dialing := make(map[discover.NodeID]bool) + + // TODO: limit number of active dials + // TODO: ensure only one findPeers goroutine is running + // TODO: pause findPeers when we're at capacity + + for { + select { + case <-refresh.C: + + go srv.findPeers() + + case dest := <-srv.peerConnect: + // avoid dialing nodes that are already connected. + // there is another check for this in addPeer, + // which runs after the handshake. + srv.lock.Lock() + _, isconnected := srv.peers[dest.ID] + srv.lock.Unlock() + if isconnected || dialing[dest.ID] || dest.ID == srv.ntab.Self() { + continue + } + + dialing[dest.ID] = true + srv.peerWG.Add(1) + go func() { + srv.dialNode(dest) + // at this point, the peer has been added + // or discarded. either way, we're not dialing it anymore. + dialed <- dest + }() + + case dest := <-dialed: + delete(dialing, dest.ID) + + case <-srv.quit: + // TODO: maybe wait for active dials + return + } + } +} + +func (srv *Server) dialNode(dest *discover.Node) { + addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort} + srvlog.Debugf("Dialing %v\n", dest) + conn, err := srv.Dialer.Dial("tcp", addr.String()) + if err != nil { + srvlog.DebugDetailf("dial error: %v", err) + return + } + srv.startPeer(conn, dest) +} + +func (srv *Server) findPeers() { + far := srv.ntab.Self() + for i := range far { + far[i] = ^far[i] + } + closeToSelf := srv.ntab.Lookup(srv.ntab.Self()) + farFromSelf := srv.ntab.Lookup(far) + + for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ { + if i < len(closeToSelf) { + srv.peerConnect <- closeToSelf[i] + } + if i < len(farFromSelf) { + srv.peerConnect <- farFromSelf[i] + } + } +} + +func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { + // TODO: handle/store session token + fd.SetDeadline(time.Now().Add(handshakeTimeout)) + conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest) + if err != nil { + fd.Close() + srvlog.Debugf("Handshake with %v failed: %v", fd.RemoteAddr(), err) + return + } + + conn.MsgReadWriter = &netWrapper{ + wrapped: conn.MsgReadWriter, + conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout, + } + p := newPeer(fd, conn, srv.Protocols) + if ok, reason := srv.addPeer(conn.ID, p); !ok { + srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason) + p.politeDisconnect(reason) + return + } + + srvlog.Debugf("Added %v\n", p) + srvjslog.LogJson(&logger.P2PConnected{ + RemoteId: fmt.Sprintf("%x", conn.ID[:]), + RemoteAddress: fd.RemoteAddr().String(), + RemoteVersionString: conn.Name, + NumConnections: srv.PeerCount(), + }) + + if srv.newPeerHook != nil { + srv.newPeerHook(p) + } + discreason := p.run() + srv.removePeer(p) + + srvlog.Debugf("Removed %v (%v)\n", p, discreason) + srvjslog.LogJson(&logger.P2PDisconnected{ + RemoteId: fmt.Sprintf("%x", conn.ID[:]), + NumConnections: srv.PeerCount(), + }) +} + +func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { + srv.lock.Lock() + defer srv.lock.Unlock() + switch { + case !srv.running: + return false, DiscQuitting + case len(srv.peers) >= srv.MaxPeers: + return false, DiscTooManyPeers + case srv.peers[id] != nil: + return false, DiscAlreadyConnected + case id == srv.ntab.Self(): + return false, DiscSelf + } + srv.peers[id] = p + return true, 0 +} + +func (srv *Server) removePeer(p *Peer) { + srv.lock.Lock() + delete(srv.peers, p.ID()) + srv.lock.Unlock() + srv.peerWG.Done() +} |