diff options
author | obscuren <geffobscura@gmail.com> | 2015-02-13 22:35:54 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-02-13 22:35:54 +0800 |
commit | 76fa75b39439e6c8c83ebc03fb32aa615227cc44 (patch) | |
tree | 9b755296ae96dd5e4ced28d6de4d5abfdeb33915 /p2p/server.go | |
parent | 75d164037fb9bbf75def7c5501727fd634ef124f (diff) | |
parent | 32a9c0ca809508c1648b8f44f3e09725af7a80d3 (diff) | |
download | go-tangerine-76fa75b39439e6c8c83ebc03fb32aa615227cc44.tar go-tangerine-76fa75b39439e6c8c83ebc03fb32aa615227cc44.tar.gz go-tangerine-76fa75b39439e6c8c83ebc03fb32aa615227cc44.tar.bz2 go-tangerine-76fa75b39439e6c8c83ebc03fb32aa615227cc44.tar.lz go-tangerine-76fa75b39439e6c8c83ebc03fb32aa615227cc44.tar.xz go-tangerine-76fa75b39439e6c8c83ebc03fb32aa615227cc44.tar.zst go-tangerine-76fa75b39439e6c8c83ebc03fb32aa615227cc44.zip |
wip
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 402 |
1 files changed, 191 insertions, 211 deletions
diff --git a/p2p/server.go b/p2p/server.go index e0d9f18a5..e510be521 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -2,37 +2,56 @@ package p2p import ( "bytes" + "crypto/ecdsa" "errors" "fmt" + "io" "net" + "runtime" "sync" "time" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/nat" ) const ( - outboundAddressPoolSize = 500 - defaultDialTimeout = 10 * time.Second - portMappingUpdateInterval = 15 * time.Minute - portMappingTimeout = 20 * time.Minute + handshakeTimeout = 5 * time.Second + defaultDialTimeout = 10 * time.Second + refreshPeersInterval = 30 * time.Second ) var srvlog = logger.NewLogger("P2P Server") +// MakeName creates a node name that follows the ethereum convention +// for such names. It adds the operation system name and Go runtime version +// the name. +func MakeName(name, version string) string { + return fmt.Sprintf("%s/v%s/%s/%s", name, version, runtime.GOOS, runtime.Version()) +} + // Server manages all peer connections. // // The fields of Server are used as configuration parameters. // You should set them before starting the Server. Fields may not be // modified while the server is running. type Server struct { - // This field must be set to a valid client identity. - Identity ClientIdentity + // This field must be set to a valid secp256k1 private key. + PrivateKey *ecdsa.PrivateKey // MaxPeers is the maximum number of peers that can be // connected. It must be greater than zero. MaxPeers int + // Name sets the node name of this server. + // Use MakeName to create a name that follows existing conventions. + Name string + + // Bootstrap nodes are used to establish connectivity + // with the rest of the network. + BootstrapNodes []*discover.Node + // Protocols should contain the protocols supported // by the server. Matching protocols are launched for // each peer. @@ -53,7 +72,7 @@ type Server struct { // If set to a non-nil value, the given NAT port mapper // is used to make the listening port available to the // Internet. - NAT NAT + NAT nat.Interface // If Dialer is set to a non-nil value, the given Dialer // is used to dial outbound peer connections. @@ -62,35 +81,26 @@ type Server struct { // If NoDial is true, the server will not dial any peers. NoDial bool - // Hook for testing. This is useful because we can inhibit + // Hooks for testing. These are useful because we can inhibit // the whole protocol stack. - newPeerFunc peerFunc + handshakeFunc + newPeerHook - lock sync.RWMutex - running bool - listener net.Listener - laddr *net.TCPAddr // real listen addr - peers []*Peer - peerSlots chan int - peerCount int - - quit chan struct{} - wg sync.WaitGroup - peerConnect chan *peerAddr - peerDisconnect chan *Peer -} + lock sync.RWMutex + running bool + listener net.Listener + peers map[discover.NodeID]*Peer -// NAT is implemented by NAT traversal methods. -type NAT interface { - GetExternalAddress() (net.IP, error) - AddPortMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error - DeletePortMapping(protocol string, extport, intport int) error + ntab *discover.Table - // Should return name of the method. - String() string + quit chan struct{} + loopWG sync.WaitGroup // {dial,listen,nat}Loop + peerWG sync.WaitGroup // active peer goroutines + peerConnect chan *discover.Node } -type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer +type handshakeFunc func(io.ReadWriter, *ecdsa.PrivateKey, *discover.Node) (discover.NodeID, []byte, error) +type newPeerHook func(*Peer) // Peers returns all connected peers. func (srv *Server) Peers() (peers []*Peer) { @@ -107,18 +117,15 @@ func (srv *Server) Peers() (peers []*Peer) { // PeerCount returns the number of connected peers. func (srv *Server) PeerCount() int { srv.lock.RLock() - defer srv.lock.RUnlock() - return srv.peerCount + n := len(srv.peers) + srv.lock.RUnlock() + return n } -// SuggestPeer injects an address into the outbound address pool. -func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) { - addr := &peerAddr{ip, uint64(port), nodeID} - select { - case srv.peerConnect <- addr: - default: // don't block - srvlog.Warnf("peer suggestion %v ignored", addr) - } +// SuggestPeer creates a connection to the given Node if it +// is not already connected. +func (srv *Server) SuggestPeer(n *discover.Node) { + srv.peerConnect <- n } // Broadcast sends an RLP-encoded message to all connected peers. @@ -152,47 +159,46 @@ func (srv *Server) Start() (err error) { } srvlog.Infoln("Starting Server") - // initialize fields - if srv.Identity == nil { - return fmt.Errorf("Server.Identity must be set to a non-nil identity") + // initialize all the fields + if srv.PrivateKey == nil { + return fmt.Errorf("Server.PrivateKey must be set to a non-nil key") } if srv.MaxPeers <= 0 { return fmt.Errorf("Server.MaxPeers must be > 0") } srv.quit = make(chan struct{}) - srv.peers = make([]*Peer, srv.MaxPeers) - srv.peerSlots = make(chan int, srv.MaxPeers) - srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize) - srv.peerDisconnect = make(chan *Peer) - if srv.newPeerFunc == nil { - srv.newPeerFunc = newServerPeer + srv.peers = make(map[discover.NodeID]*Peer) + srv.peerConnect = make(chan *discover.Node) + + if srv.handshakeFunc == nil { + srv.handshakeFunc = encHandshake } if srv.Blacklist == nil { srv.Blacklist = NewBlacklist() } - if srv.Dialer == nil { - srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} - } - if srv.ListenAddr != "" { if err := srv.startListening(); err != nil { return err } } + + // dial stuff + dt, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT) + if err != nil { + return err + } + srv.ntab = dt + if srv.Dialer == nil { + srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} + } if !srv.NoDial { - srv.wg.Add(1) + srv.loopWG.Add(1) go srv.dialLoop() } if srv.NoDial && srv.ListenAddr == "" { srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.") } - // make all slots available - for i := range srv.peers { - srv.peerSlots <- i - } - // note: discLoop is not part of WaitGroup - go srv.discLoop() srv.running = true return nil } @@ -202,14 +208,17 @@ func (srv *Server) startListening() error { if err != nil { return err } - srv.ListenAddr = listener.Addr().String() - srv.laddr = listener.Addr().(*net.TCPAddr) + laddr := listener.Addr().(*net.TCPAddr) + srv.ListenAddr = laddr.String() srv.listener = listener - srv.wg.Add(1) + srv.loopWG.Add(1) go srv.listenLoop() - if !srv.laddr.IP.IsLoopback() && srv.NAT != nil { - srv.wg.Add(1) - go srv.natLoop(srv.laddr.Port) + if !laddr.IP.IsLoopback() && srv.NAT != nil { + srv.loopWG.Add(1) + go func() { + nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p") + srv.loopWG.Done() + }() } return nil } @@ -225,200 +234,171 @@ func (srv *Server) Stop() { srv.running = false srv.lock.Unlock() - srvlog.Infoln("Stopping server") + srvlog.Infoln("Stopping Server") + srv.ntab.Close() if srv.listener != nil { // this unblocks listener Accept srv.listener.Close() } close(srv.quit) - for _, peer := range srv.Peers() { - peer.Disconnect(DiscQuitting) - } - srv.wg.Wait() - - // wait till they actually disconnect - // this is checked by claiming all peerSlots. - // slots become available as the peers disconnect. - for i := 0; i < cap(srv.peerSlots); i++ { - <-srv.peerSlots - } - // terminate discLoop - close(srv.peerDisconnect) -} + srv.loopWG.Wait() -func (srv *Server) discLoop() { - for peer := range srv.peerDisconnect { - srv.removePeer(peer) + // No new peers can be added at this point because dialLoop and + // listenLoop are down. It is safe to call peerWG.Wait because + // peerWG.Add is not called outside of those loops. + for _, peer := range srv.peers { + peer.Disconnect(DiscQuitting) } + srv.peerWG.Wait() } // main loop for adding connections via listening func (srv *Server) listenLoop() { - defer srv.wg.Done() - + defer srv.loopWG.Done() srvlog.Infoln("Listening on", srv.listener.Addr()) for { - select { - case slot := <-srv.peerSlots: - srvlog.Debugf("grabbed slot %v for listening", slot) - conn, err := srv.listener.Accept() - if err != nil { - srv.peerSlots <- slot - return - } - srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot) - srv.addPeer(conn, nil, slot) - case <-srv.quit: + conn, err := srv.listener.Accept() + if err != nil { return } + srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr()) + srv.peerWG.Add(1) + go srv.startPeer(conn, nil) } } -func (srv *Server) natLoop(port int) { - defer srv.wg.Done() +func (srv *Server) dialLoop() { + defer srv.loopWG.Done() + refresh := time.NewTicker(refreshPeersInterval) + defer refresh.Stop() + + srv.ntab.Bootstrap(srv.BootstrapNodes) + go srv.findPeers() + + dialed := make(chan *discover.Node) + dialing := make(map[discover.NodeID]bool) + + // TODO: limit number of active dials + // TODO: ensure only one findPeers goroutine is running + // TODO: pause findPeers when we're at capacity + for { - srv.updatePortMapping(port) select { - case <-time.After(portMappingUpdateInterval): - // one more round + case <-refresh.C: + + go srv.findPeers() + + case dest := <-srv.peerConnect: + // avoid dialing nodes that are already connected. + // there is another check for this in addPeer, + // which runs after the handshake. + srv.lock.Lock() + _, isconnected := srv.peers[dest.ID] + srv.lock.Unlock() + if isconnected || dialing[dest.ID] || dest.ID == srv.ntab.Self() { + continue + } + + dialing[dest.ID] = true + srv.peerWG.Add(1) + go func() { + srv.dialNode(dest) + // at this point, the peer has been added + // or discarded. either way, we're not dialing it anymore. + dialed <- dest + }() + + case dest := <-dialed: + delete(dialing, dest.ID) + case <-srv.quit: - srv.removePortMapping(port) + // TODO: maybe wait for active dials return } } } -func (srv *Server) updatePortMapping(port int) { - srvlog.Infoln("Attempting to map port", port, "with", srv.NAT) - err := srv.NAT.AddPortMapping("tcp", port, port, "ethereum p2p", portMappingTimeout) +func (srv *Server) dialNode(dest *discover.Node) { + addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort} + srvlog.Debugf("Dialing %v\n", dest) + conn, err := srv.Dialer.Dial("tcp", addr.String()) if err != nil { - srvlog.Errorln("Port mapping error:", err) - return - } - extip, err := srv.NAT.GetExternalAddress() - if err != nil { - srvlog.Errorln("Error getting external IP:", err) + srvlog.DebugDetailf("dial error: %v", err) return } - srv.lock.Lock() - extaddr := *(srv.listener.Addr().(*net.TCPAddr)) - extaddr.IP = extip - srvlog.Infoln("Mapped port, external addr is", &extaddr) - srv.laddr = &extaddr - srv.lock.Unlock() + srv.startPeer(conn, dest) } -func (srv *Server) removePortMapping(port int) { - srvlog.Infoln("Removing port mapping for", port, "with", srv.NAT) - srv.NAT.DeletePortMapping("tcp", port, port) -} - -func (srv *Server) dialLoop() { - defer srv.wg.Done() - var ( - suggest chan *peerAddr - slot *int - slots = srv.peerSlots - ) - for { - select { - case i := <-slots: - // we need a peer in slot i, slot reserved - slot = &i - // now we can watch for candidate peers in the next loop - suggest = srv.peerConnect - // do not consume more until candidate peer is found - slots = nil - - case desc := <-suggest: - // candidate peer found, will dial out asyncronously - // if connection fails slot will be released - srvlog.DebugDetailf("dial %v (%v)", desc, *slot) - go srv.dialPeer(desc, *slot) - // we can watch if more peers needed in the next loop - slots = srv.peerSlots - // until then we dont care about candidate peers - suggest = nil +func (srv *Server) findPeers() { + far := srv.ntab.Self() + for i := range far { + far[i] = ^far[i] + } + closeToSelf := srv.ntab.Lookup(srv.ntab.Self()) + farFromSelf := srv.ntab.Lookup(far) - case <-srv.quit: - // give back the currently reserved slot - if slot != nil { - srv.peerSlots <- *slot - } - return + for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ { + if i < len(closeToSelf) { + srv.peerConnect <- closeToSelf[i] + } + if i < len(farFromSelf) { + srv.peerConnect <- farFromSelf[i] } } } -// connect to peer via dial out -func (srv *Server) dialPeer(desc *peerAddr, slot int) { - srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot) - conn, err := srv.Dialer.Dial(desc.Network(), desc.String()) +func (srv *Server) startPeer(conn net.Conn, dest *discover.Node) { + // TODO: handle/store session token + conn.SetDeadline(time.Now().Add(handshakeTimeout)) + remoteID, _, err := srv.handshakeFunc(conn, srv.PrivateKey, dest) if err != nil { - srvlog.DebugDetailf("dial error: %v", err) - srv.peerSlots <- slot + conn.Close() + srvlog.Debugf("Encryption Handshake with %v failed: %v", conn.RemoteAddr(), err) return } - go srv.addPeer(conn, desc, slot) -} + ourID := srv.ntab.Self() + p := newPeer(conn, srv.Protocols, srv.Name, &ourID, &remoteID) + if ok, reason := srv.addPeer(remoteID, p); !ok { + srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason) + p.politeDisconnect(reason) + return + } + srvlog.Debugf("Added %v\n", p) -// creates the new peer object and inserts it into its slot -func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer { - srv.lock.Lock() - defer srv.lock.Unlock() - if !srv.running { - conn.Close() - srv.peerSlots <- slot // release slot - return nil + if srv.newPeerHook != nil { + srv.newPeerHook(p) } - peer := srv.newPeerFunc(srv, conn, desc) - peer.slot = slot - srv.peers[slot] = peer - srv.peerCount++ - go func() { - peer.loop() - srv.peerDisconnect <- peer - }() - return peer + discreason := p.run() + srv.removePeer(p) + srvlog.Debugf("Removed %v (%v)\n", p, discreason) } -// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot -func (srv *Server) removePeer(peer *Peer) { +func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { srv.lock.Lock() defer srv.lock.Unlock() - srvlog.Debugf("Removing %v (slot %v)\n", peer, peer.slot) - if srv.peers[peer.slot] != peer { - srvlog.Warnln("Invalid peer to remove:", peer) - return - } - // remove from list and index - srv.peerCount-- - srv.peers[peer.slot] = nil - // release slot to signal need for a new peer, last! - srv.peerSlots <- peer.slot + switch { + case !srv.running: + return false, DiscQuitting + case len(srv.peers) >= srv.MaxPeers: + return false, DiscTooManyPeers + case srv.peers[id] != nil: + return false, DiscAlreadyConnected + case srv.Blacklist.Exists(id[:]): + return false, DiscUselessPeer + case id == srv.ntab.Self(): + return false, DiscSelf + } + srv.peers[id] = p + return true, 0 } -func (srv *Server) verifyPeer(addr *peerAddr) error { - if srv.Blacklist.Exists(addr.Pubkey) { - return errors.New("blacklisted") - } - if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) { - return newPeerError(errPubkeyForbidden, "not allowed to connect to srv") - } - srv.lock.RLock() - defer srv.lock.RUnlock() - for _, peer := range srv.peers { - if peer != nil { - id := peer.Identity() - if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) { - return errors.New("already connected") - } - } - } - return nil +func (srv *Server) removePeer(p *Peer) { + srv.lock.Lock() + delete(srv.peers, *p.remoteID) + srv.lock.Unlock() + srv.peerWG.Done() } -// TODO replace with "Set" type Blacklist interface { Get([]byte) (bool, error) Put([]byte) error |