From 5bdc1159433138d92ed6fefb253e3c6ed3a43995 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 5 Feb 2015 03:07:58 +0100 Subject: p2p: integrate p2p/discover Overview of changes: - ClientIdentity has been removed, use discover.NodeID - Server now requires a private key to be set (instead of public key) - Server performs the encryption handshake before launching Peer - Dial logic takes peers from discover table - Encryption handshake code has been cleaned up a bit - baseProtocol is gone because we don't exchange peers anymore - Some parts of baseProtocol have moved into Peer instead --- p2p/server.go | 343 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 184 insertions(+), 159 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 4fd1f7d03..9b8030541 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -2,37 +2,56 @@ package p2p import ( "bytes" + "crypto/ecdsa" "errors" "fmt" + "io" "net" + "runtime" "sync" "time" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p/discover" ) const ( - outboundAddressPoolSize = 500 defaultDialTimeout = 10 * time.Second + refreshPeersInterval = 30 * time.Second portMappingUpdateInterval = 15 * time.Minute portMappingTimeout = 20 * time.Minute ) var srvlog = logger.NewLogger("P2P Server") +// MakeName creates a node name that follows the ethereum convention +// for such names. It adds the operation system name and Go runtime version +// the name. +func MakeName(name, version string) string { + return fmt.Sprintf("%s/v%s/%s/%s", name, version, runtime.GOOS, runtime.Version()) +} + // Server manages all peer connections. // // The fields of Server are used as configuration parameters. // You should set them before starting the Server. Fields may not be // modified while the server is running. type Server struct { - // This field must be set to a valid client identity. - Identity ClientIdentity + // This field must be set to a valid secp256k1 private key. + PrivateKey *ecdsa.PrivateKey // MaxPeers is the maximum number of peers that can be // connected. It must be greater than zero. MaxPeers int + // Name sets the node name of this server. + // Use MakeName to create a name that follows existing conventions. + Name string + + // Bootstrap nodes are used to establish connectivity + // with the rest of the network. + BootstrapNodes []discover.Node + // Protocols should contain the protocols supported // by the server. Matching protocols are launched for // each peer. @@ -62,22 +81,23 @@ type Server struct { // If NoDial is true, the server will not dial any peers. NoDial bool - // Hook for testing. This is useful because we can inhibit + // Hooks for testing. These are useful because we can inhibit // the whole protocol stack. - newPeerFunc peerFunc + handshakeFunc + newPeerHook - lock sync.RWMutex - running bool - listener net.Listener - laddr *net.TCPAddr // real listen addr - peers []*Peer - peerSlots chan int - peerCount int - - quit chan struct{} - wg sync.WaitGroup - peerConnect chan *peerAddr - peerDisconnect chan *Peer + lock sync.RWMutex + running bool + listener net.Listener + laddr *net.TCPAddr // real listen addr + peers map[discover.NodeID]*Peer + + ntab *discover.Table + + quit chan struct{} + loopWG sync.WaitGroup // {dial,listen,nat}Loop + peerWG sync.WaitGroup // active peer goroutines + peerConnect chan *discover.Node } // NAT is implemented by NAT traversal methods. @@ -90,7 +110,8 @@ type NAT interface { String() string } -type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer +type handshakeFunc func(io.ReadWriter, *ecdsa.PrivateKey, *discover.Node) (discover.NodeID, []byte, error) +type newPeerHook func(*Peer) // Peers returns all connected peers. func (srv *Server) Peers() (peers []*Peer) { @@ -107,18 +128,15 @@ func (srv *Server) Peers() (peers []*Peer) { // PeerCount returns the number of connected peers. func (srv *Server) PeerCount() int { srv.lock.RLock() - defer srv.lock.RUnlock() - return srv.peerCount + n := len(srv.peers) + srv.lock.RUnlock() + return n } -// SuggestPeer injects an address into the outbound address pool. -func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) { - addr := &peerAddr{ip, uint64(port), nodeID} - select { - case srv.peerConnect <- addr: - default: // don't block - srvlog.Warnf("peer suggestion %v ignored", addr) - } +// SuggestPeer creates a connection to the given Node if it +// is not already connected. +func (srv *Server) SuggestPeer(ip net.IP, port int, id discover.NodeID) { + srv.peerConnect <- &discover.Node{ID: id, Addr: &net.UDPAddr{IP: ip, Port: port}} } // Broadcast sends an RLP-encoded message to all connected peers. @@ -152,47 +170,47 @@ func (srv *Server) Start() (err error) { } srvlog.Infoln("Starting Server") - // initialize fields - if srv.Identity == nil { - return fmt.Errorf("Server.Identity must be set to a non-nil identity") + // initialize all the fields + if srv.PrivateKey == nil { + return fmt.Errorf("Server.PrivateKey must be set to a non-nil key") } if srv.MaxPeers <= 0 { return fmt.Errorf("Server.MaxPeers must be > 0") } srv.quit = make(chan struct{}) - srv.peers = make([]*Peer, srv.MaxPeers) - srv.peerSlots = make(chan int, srv.MaxPeers) - srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize) - srv.peerDisconnect = make(chan *Peer) - if srv.newPeerFunc == nil { - srv.newPeerFunc = newServerPeer + srv.peers = make(map[discover.NodeID]*Peer) + srv.peerConnect = make(chan *discover.Node) + + if srv.handshakeFunc == nil { + srv.handshakeFunc = encHandshake } if srv.Blacklist == nil { srv.Blacklist = NewBlacklist() } - if srv.Dialer == nil { - srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} - } - if srv.ListenAddr != "" { if err := srv.startListening(); err != nil { return err } } + + // dial stuff + dt, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr) + if err != nil { + return err + } + srv.ntab = dt + if srv.Dialer == nil { + srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} + } if !srv.NoDial { - srv.wg.Add(1) + srv.loopWG.Add(1) go srv.dialLoop() } + if srv.NoDial && srv.ListenAddr == "" { srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.") } - // make all slots available - for i := range srv.peers { - srv.peerSlots <- i - } - // note: discLoop is not part of WaitGroup - go srv.discLoop() srv.running = true return nil } @@ -205,10 +223,10 @@ func (srv *Server) startListening() error { srv.ListenAddr = listener.Addr().String() srv.laddr = listener.Addr().(*net.TCPAddr) srv.listener = listener - srv.wg.Add(1) + srv.loopWG.Add(1) go srv.listenLoop() if !srv.laddr.IP.IsLoopback() && srv.NAT != nil { - srv.wg.Add(1) + srv.loopWG.Add(1) go srv.natLoop(srv.laddr.Port) } return nil @@ -225,57 +243,41 @@ func (srv *Server) Stop() { srv.running = false srv.lock.Unlock() - srvlog.Infoln("Stopping server") + srvlog.Infoln("Stopping Server") + srv.ntab.Close() if srv.listener != nil { // this unblocks listener Accept srv.listener.Close() } close(srv.quit) - for _, peer := range srv.Peers() { - peer.Disconnect(DiscQuitting) - } - srv.wg.Wait() + srv.loopWG.Wait() - // wait till they actually disconnect - // this is checked by claiming all peerSlots. - // slots become available as the peers disconnect. - for i := 0; i < cap(srv.peerSlots); i++ { - <-srv.peerSlots - } - // terminate discLoop - close(srv.peerDisconnect) -} - -func (srv *Server) discLoop() { - for peer := range srv.peerDisconnect { - srv.removePeer(peer) + // No new peers can be added at this point because dialLoop and + // listenLoop are down. It is safe to call peerWG.Wait because + // peerWG.Add is not called outside of those loops. + for _, peer := range srv.peers { + peer.Disconnect(DiscQuitting) } + srv.peerWG.Wait() } // main loop for adding connections via listening func (srv *Server) listenLoop() { - defer srv.wg.Done() - + defer srv.loopWG.Done() srvlog.Infoln("Listening on", srv.listener.Addr()) for { - select { - case slot := <-srv.peerSlots: - srvlog.Debugf("grabbed slot %v for listening", slot) - conn, err := srv.listener.Accept() - if err != nil { - srv.peerSlots <- slot - return - } - srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot) - srv.addPeer(conn, nil, slot) - case <-srv.quit: + conn, err := srv.listener.Accept() + if err != nil { return } + srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr()) + srv.peerWG.Add(1) + go srv.startPeer(conn, nil) } } func (srv *Server) natLoop(port int) { - defer srv.wg.Done() + defer srv.loopWG.Done() for { srv.updatePortMapping(port) select { @@ -314,108 +316,131 @@ func (srv *Server) removePortMapping(port int) { } func (srv *Server) dialLoop() { - defer srv.wg.Done() - var ( - suggest chan *peerAddr - slot *int - slots = srv.peerSlots - ) + defer srv.loopWG.Done() + refresh := time.NewTicker(refreshPeersInterval) + defer refresh.Stop() + + srv.ntab.Bootstrap(srv.BootstrapNodes) + go srv.findPeers() + + dialed := make(chan *discover.Node) + dialing := make(map[discover.NodeID]bool) + + // TODO: limit number of active dials + // TODO: ensure only one findPeers goroutine is running + // TODO: pause findPeers when we're at capacity + for { select { - case i := <-slots: - // we need a peer in slot i, slot reserved - slot = &i - // now we can watch for candidate peers in the next loop - suggest = srv.peerConnect - // do not consume more until candidate peer is found - slots = nil - - case desc := <-suggest: - // candidate peer found, will dial out asyncronously - // if connection fails slot will be released - srvlog.DebugDetailf("dial %v (%v)", desc, *slot) - go srv.dialPeer(desc, *slot) - // we can watch if more peers needed in the next loop - slots = srv.peerSlots - // until then we dont care about candidate peers - suggest = nil + case <-refresh.C: - case <-srv.quit: - // give back the currently reserved slot - if slot != nil { - srv.peerSlots <- *slot + go srv.findPeers() + + case dest := <-srv.peerConnect: + srv.lock.Lock() + _, isconnected := srv.peers[dest.ID] + srv.lock.Unlock() + if isconnected || dialing[dest.ID] { + continue } + + dialing[dest.ID] = true + srv.peerWG.Add(1) + go func() { + srv.dialNode(dest) + // at this point, the peer has been added + // or discarded. either way, we're not dialing it anymore. + dialed <- dest + }() + + case dest := <-dialed: + delete(dialing, dest.ID) + + case <-srv.quit: + // TODO: maybe wait for active dials return } } } -// connect to peer via dial out -func (srv *Server) dialPeer(desc *peerAddr, slot int) { - srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot) - conn, err := srv.Dialer.Dial(desc.Network(), desc.String()) +func (srv *Server) dialNode(dest *discover.Node) { + srvlog.Debugf("Dialing %v\n", dest.Addr) + conn, err := srv.Dialer.Dial("tcp", dest.Addr.String()) if err != nil { srvlog.DebugDetailf("dial error: %v", err) - srv.peerSlots <- slot return } - go srv.addPeer(conn, desc, slot) + srv.startPeer(conn, dest) } -// creates the new peer object and inserts it into its slot -func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer { - srv.lock.Lock() - defer srv.lock.Unlock() - if !srv.running { +func (srv *Server) findPeers() { + far := srv.ntab.Self() + for i := range far { + far[i] = ^far[i] + } + closeToSelf := srv.ntab.Lookup(srv.ntab.Self()) + farFromSelf := srv.ntab.Lookup(far) + + for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ { + if i < len(closeToSelf) { + srv.peerConnect <- closeToSelf[i] + } + if i < len(farFromSelf) { + srv.peerConnect <- farFromSelf[i] + } + } +} + +func (srv *Server) startPeer(conn net.Conn, dest *discover.Node) { + // TODO: I/O timeout, handle/store session token + remoteID, _, err := srv.handshakeFunc(conn, srv.PrivateKey, dest) + if err != nil { conn.Close() - srv.peerSlots <- slot // release slot - return nil + srvlog.Debugf("Encryption Handshake with %v failed: %v", conn.RemoteAddr(), err) + return + } + + ourID := srv.ntab.Self() + p := newPeer(conn, srv.Protocols, srv.Name, &ourID, &remoteID) + if ok, reason := srv.addPeer(remoteID, p); !ok { + p.Disconnect(reason) + return } - peer := srv.newPeerFunc(srv, conn, desc) - peer.slot = slot - srv.peers[slot] = peer - srv.peerCount++ - go func() { peer.loop(); srv.peerDisconnect <- peer }() - return peer + + srv.newPeerHook(p) + p.run() + srv.removePeer(p) } -// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot -func (srv *Server) removePeer(peer *Peer) { +func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { srv.lock.Lock() defer srv.lock.Unlock() - srvlog.Debugf("Removing %v (slot %v)\n", peer, peer.slot) - if srv.peers[peer.slot] != peer { - srvlog.Warnln("Invalid peer to remove:", peer) - return - } - // remove from list and index - srv.peerCount-- - srv.peers[peer.slot] = nil - // release slot to signal need for a new peer, last! - srv.peerSlots <- peer.slot + switch { + case !srv.running: + return false, DiscQuitting + case len(srv.peers) >= srv.MaxPeers: + return false, DiscTooManyPeers + case srv.peers[id] != nil: + return false, DiscAlreadyConnected + case srv.Blacklist.Exists(id[:]): + return false, DiscUselessPeer + case id == srv.ntab.Self(): + return false, DiscSelf + } + srvlog.Debugf("Adding %v\n", p) + srv.peers[id] = p + return true, 0 } -func (srv *Server) verifyPeer(addr *peerAddr) error { - if srv.Blacklist.Exists(addr.Pubkey) { - return errors.New("blacklisted") - } - if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) { - return newPeerError(errPubkeyForbidden, "not allowed to connect to srv") - } - srv.lock.RLock() - defer srv.lock.RUnlock() - for _, peer := range srv.peers { - if peer != nil { - id := peer.Identity() - if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) { - return errors.New("already connected") - } - } - } - return nil +// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot +func (srv *Server) removePeer(p *Peer) { + srvlog.Debugf("Removing %v\n", p) + srv.lock.Lock() + delete(srv.peers, *p.remoteID) + srv.lock.Unlock() + srv.peerWG.Done() } -// TODO replace with "Set" type Blacklist interface { Get([]byte) (bool, error) Put([]byte) error -- cgit v1.2.3 From 8564eb9f7efcc7a0e639f6a45d7e67037fedac5f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 6 Feb 2015 14:40:53 +0100 Subject: p2p/discover: add node URL functions, distinguish TCP/UDP ports The discovery RPC protocol does not yet distinguish TCP and UDP ports. But it can't hurt to do so in our internal model. --- p2p/server.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 9b8030541..87be97a2f 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -136,7 +136,7 @@ func (srv *Server) PeerCount() int { // SuggestPeer creates a connection to the given Node if it // is not already connected. func (srv *Server) SuggestPeer(ip net.IP, port int, id discover.NodeID) { - srv.peerConnect <- &discover.Node{ID: id, Addr: &net.UDPAddr{IP: ip, Port: port}} + srv.peerConnect <- &discover.Node{ID: id, IP: ip, TCPPort: port} } // Broadcast sends an RLP-encoded message to all connected peers. @@ -364,8 +364,9 @@ func (srv *Server) dialLoop() { } func (srv *Server) dialNode(dest *discover.Node) { - srvlog.Debugf("Dialing %v\n", dest.Addr) - conn, err := srv.Dialer.Dial("tcp", dest.Addr.String()) + addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort} + srvlog.Debugf("Dialing %v\n", dest) + conn, err := srv.Dialer.Dial("tcp", addr.String()) if err != nil { srvlog.DebugDetailf("dial error: %v", err) return -- cgit v1.2.3 From e34d1341022a51d8a86c4836c91e4e0ded888d27 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Sat, 7 Feb 2015 00:13:22 +0100 Subject: p2p: fixes for actual connections The unit test hooks were turned on 'in production'. --- p2p/server.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 87be97a2f..c6d7fc2e8 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -408,7 +408,9 @@ func (srv *Server) startPeer(conn net.Conn, dest *discover.Node) { return } - srv.newPeerHook(p) + if srv.newPeerHook != nil { + srv.newPeerHook(p) + } p.run() srv.removePeer(p) } -- cgit v1.2.3 From 2cf4fed11b01bb99e08b838f7df2b9396f42f758 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Sat, 7 Feb 2015 00:15:04 +0100 Subject: cmd/mist, eth, javascript, p2p: use Node URLs for peer suggestions --- p2p/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index c6d7fc2e8..bb3101485 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -135,8 +135,8 @@ func (srv *Server) PeerCount() int { // SuggestPeer creates a connection to the given Node if it // is not already connected. -func (srv *Server) SuggestPeer(ip net.IP, port int, id discover.NodeID) { - srv.peerConnect <- &discover.Node{ID: id, IP: ip, TCPPort: port} +func (srv *Server) SuggestPeer(n *discover.Node) { + srv.peerConnect <- n } // Broadcast sends an RLP-encoded message to all connected peers. -- cgit v1.2.3 From 028775a0863946c1e9ad51fe7b22faa5c59b2605 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Sat, 7 Feb 2015 00:38:36 +0100 Subject: cmd/ethereum, cmd/mist: add flag for discovery bootstrap nodes --- p2p/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index bb3101485..3cab61102 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -50,7 +50,7 @@ type Server struct { // Bootstrap nodes are used to establish connectivity // with the rest of the network. - BootstrapNodes []discover.Node + BootstrapNodes []*discover.Node // Protocols should contain the protocols supported // by the server. Matching protocols are launched for -- cgit v1.2.3 From d0a2e655c9599f462bb20bd49bc69b8e1e330a21 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 11 Feb 2015 17:19:31 +0100 Subject: cmd/ethereum, cmd/mist, eth, p2p: use package p2p/nat This deletes the old NAT implementation. --- p2p/server.go | 70 ++++++++++------------------------------------------------- 1 file changed, 11 insertions(+), 59 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 3cab61102..a0f2dee23 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -13,13 +13,12 @@ import ( "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/nat" ) const ( - defaultDialTimeout = 10 * time.Second - refreshPeersInterval = 30 * time.Second - portMappingUpdateInterval = 15 * time.Minute - portMappingTimeout = 20 * time.Minute + defaultDialTimeout = 10 * time.Second + refreshPeersInterval = 30 * time.Second ) var srvlog = logger.NewLogger("P2P Server") @@ -72,7 +71,7 @@ type Server struct { // If set to a non-nil value, the given NAT port mapper // is used to make the listening port available to the // Internet. - NAT NAT + NAT nat.Interface // If Dialer is set to a non-nil value, the given Dialer // is used to dial outbound peer connections. @@ -89,7 +88,6 @@ type Server struct { lock sync.RWMutex running bool listener net.Listener - laddr *net.TCPAddr // real listen addr peers map[discover.NodeID]*Peer ntab *discover.Table @@ -100,16 +98,6 @@ type Server struct { peerConnect chan *discover.Node } -// NAT is implemented by NAT traversal methods. -type NAT interface { - GetExternalAddress() (net.IP, error) - AddPortMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error - DeletePortMapping(protocol string, extport, intport int) error - - // Should return name of the method. - String() string -} - type handshakeFunc func(io.ReadWriter, *ecdsa.PrivateKey, *discover.Node) (discover.NodeID, []byte, error) type newPeerHook func(*Peer) @@ -220,14 +208,17 @@ func (srv *Server) startListening() error { if err != nil { return err } - srv.ListenAddr = listener.Addr().String() - srv.laddr = listener.Addr().(*net.TCPAddr) + laddr := listener.Addr().(*net.TCPAddr) + srv.ListenAddr = laddr.String() srv.listener = listener srv.loopWG.Add(1) go srv.listenLoop() - if !srv.laddr.IP.IsLoopback() && srv.NAT != nil { + if !laddr.IP.IsLoopback() && srv.NAT != nil { srv.loopWG.Add(1) - go srv.natLoop(srv.laddr.Port) + go func() { + nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p") + srv.loopWG.Done() + }() } return nil } @@ -276,45 +267,6 @@ func (srv *Server) listenLoop() { } } -func (srv *Server) natLoop(port int) { - defer srv.loopWG.Done() - for { - srv.updatePortMapping(port) - select { - case <-time.After(portMappingUpdateInterval): - // one more round - case <-srv.quit: - srv.removePortMapping(port) - return - } - } -} - -func (srv *Server) updatePortMapping(port int) { - srvlog.Infoln("Attempting to map port", port, "with", srv.NAT) - err := srv.NAT.AddPortMapping("tcp", port, port, "ethereum p2p", portMappingTimeout) - if err != nil { - srvlog.Errorln("Port mapping error:", err) - return - } - extip, err := srv.NAT.GetExternalAddress() - if err != nil { - srvlog.Errorln("Error getting external IP:", err) - return - } - srv.lock.Lock() - extaddr := *(srv.listener.Addr().(*net.TCPAddr)) - extaddr.IP = extip - srvlog.Infoln("Mapped port, external addr is", &extaddr) - srv.laddr = &extaddr - srv.lock.Unlock() -} - -func (srv *Server) removePortMapping(port int) { - srvlog.Infoln("Removing port mapping for", port, "with", srv.NAT) - srv.NAT.DeletePortMapping("tcp", port, port) -} - func (srv *Server) dialLoop() { defer srv.loopWG.Done() refresh := time.NewTicker(refreshPeersInterval) -- cgit v1.2.3 From 170eb3ac684231dc2ddebc34e7006e0f2b5fc0c1 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 13 Feb 2015 11:38:34 +0100 Subject: p2p/discover: map listening port using configured mechanism --- p2p/server.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index a0f2dee23..e44f3d7ab 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -182,7 +182,7 @@ func (srv *Server) Start() (err error) { } // dial stuff - dt, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr) + dt, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT) if err != nil { return err } @@ -194,7 +194,6 @@ func (srv *Server) Start() (err error) { srv.loopWG.Add(1) go srv.dialLoop() } - if srv.NoDial && srv.ListenAddr == "" { srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.") } -- cgit v1.2.3 From 22ee366ed6419516eece4436c9f7b5b63ea8a713 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 13 Feb 2015 14:47:05 +0100 Subject: p2p: fix goroutine leak for invalid peers The deflect logic called Disconnect on the peer, but the peer never ran and wouldn't process the disconnect request. --- p2p/server.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index e44f3d7ab..f61bb897e 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -351,19 +351,21 @@ func (srv *Server) startPeer(conn net.Conn, dest *discover.Node) { srvlog.Debugf("Encryption Handshake with %v failed: %v", conn.RemoteAddr(), err) return } - ourID := srv.ntab.Self() p := newPeer(conn, srv.Protocols, srv.Name, &ourID, &remoteID) if ok, reason := srv.addPeer(remoteID, p); !ok { - p.Disconnect(reason) + srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason) + p.politeDisconnect(reason) return } + srvlog.Debugf("Added %v\n", p) if srv.newPeerHook != nil { srv.newPeerHook(p) } - p.run() + discreason := p.run() srv.removePeer(p) + srvlog.Debugf("Removed %v (%v)\n", p, discreason) } func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { @@ -381,14 +383,11 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { case id == srv.ntab.Self(): return false, DiscSelf } - srvlog.Debugf("Adding %v\n", p) srv.peers[id] = p return true, 0 } -// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot func (srv *Server) removePeer(p *Peer) { - srvlog.Debugf("Removing %v\n", p) srv.lock.Lock() delete(srv.peers, *p.remoteID) srv.lock.Unlock() -- cgit v1.2.3 From 7101f4499873fbf6a68cbe08a45797ff8ec71e74 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 13 Feb 2015 14:49:49 +0100 Subject: p2p: add I/O timeout for encrytion handshake --- p2p/server.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index f61bb897e..b93340150 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -17,6 +17,7 @@ import ( ) const ( + handshakeTimeout = 5 * time.Second defaultDialTimeout = 10 * time.Second refreshPeersInterval = 30 * time.Second ) @@ -344,7 +345,8 @@ func (srv *Server) findPeers() { } func (srv *Server) startPeer(conn net.Conn, dest *discover.Node) { - // TODO: I/O timeout, handle/store session token + // TODO: handle/store session token + conn.SetDeadline(time.Now().Add(handshakeTimeout)) remoteID, _, err := srv.handshakeFunc(conn, srv.PrivateKey, dest) if err != nil { conn.Close() -- cgit v1.2.3 From 5cc1256fd679cbb8cb80502494b8c02befc757c8 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 13 Feb 2015 14:50:14 +0100 Subject: p2p: ensure we don't dial ourself addPeer doesn't allow self connects, but we can avoid opening connections in the first place. --- p2p/server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index b93340150..e510be521 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -289,10 +289,13 @@ func (srv *Server) dialLoop() { go srv.findPeers() case dest := <-srv.peerConnect: + // avoid dialing nodes that are already connected. + // there is another check for this in addPeer, + // which runs after the handshake. srv.lock.Lock() _, isconnected := srv.peers[dest.ID] srv.lock.Unlock() - if isconnected || dialing[dest.ID] { + if isconnected || dialing[dest.ID] || dest.ID == srv.ntab.Self() { continue } -- cgit v1.2.3