diff options
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r-- | p2p/discover/udp.go | 92 |
1 files changed, 48 insertions, 44 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 45fcce282..37a044902 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -23,12 +23,12 @@ import ( "errors" "fmt" "net" + "sync" "time" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/nat" "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/ethereum/go-ethereum/rlp" ) @@ -118,9 +118,11 @@ type ( ) func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint { - ip := addr.IP.To4() - if ip == nil { - ip = addr.IP.To16() + ip := net.IP{} + if ip4 := addr.IP.To4(); ip4 != nil { + ip = ip4 + } else if ip6 := addr.IP.To16(); ip6 != nil { + ip = ip6 } return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort} } @@ -165,20 +167,19 @@ type conn interface { LocalAddr() net.Addr } -// udp implements the RPC protocol. +// udp implements the discovery v4 UDP wire protocol. type udp struct { conn conn netrestrict *netutil.Netlist priv *ecdsa.PrivateKey - ourEndpoint rpcEndpoint + localNode *enode.LocalNode + db *enode.DB + tab *Table + wg sync.WaitGroup addpending chan *pending gotreply chan reply - - closing chan struct{} - nat nat.Interface - - *Table + closing chan struct{} } // pending represents a pending reply. @@ -230,60 +231,57 @@ type Config struct { PrivateKey *ecdsa.PrivateKey // These settings are optional: - AnnounceAddr *net.UDPAddr // local address announced in the DHT - NodeDBPath string // if set, the node database is stored at this filesystem location - NetRestrict *netutil.Netlist // network whitelist - Bootnodes []*enode.Node // list of bootstrap nodes - Unhandled chan<- ReadPacket // unhandled packets are sent on this channel + NetRestrict *netutil.Netlist // network whitelist + Bootnodes []*enode.Node // list of bootstrap nodes + Unhandled chan<- ReadPacket // unhandled packets are sent on this channel } // ListenUDP returns a new table that listens for UDP packets on laddr. -func ListenUDP(c conn, cfg Config) (*Table, error) { - tab, _, err := newUDP(c, cfg) +func ListenUDP(c conn, ln *enode.LocalNode, cfg Config) (*Table, error) { + tab, _, err := newUDP(c, ln, cfg) if err != nil { return nil, err } - log.Info("UDP listener up", "self", tab.self) return tab, nil } -func newUDP(c conn, cfg Config) (*Table, *udp, error) { - realaddr := c.LocalAddr().(*net.UDPAddr) - if cfg.AnnounceAddr != nil { - realaddr = cfg.AnnounceAddr - } - self := enode.NewV4(&cfg.PrivateKey.PublicKey, realaddr.IP, realaddr.Port, realaddr.Port) - db, err := enode.OpenDB(cfg.NodeDBPath) - if err != nil { - return nil, nil, err - } - +func newUDP(c conn, ln *enode.LocalNode, cfg Config) (*Table, *udp, error) { udp := &udp{ conn: c, priv: cfg.PrivateKey, netrestrict: cfg.NetRestrict, + localNode: ln, + db: ln.Database(), closing: make(chan struct{}), gotreply: make(chan reply), addpending: make(chan *pending), } - // TODO: separate TCP port - udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port)) - tab, err := newTable(udp, self, db, cfg.Bootnodes) + tab, err := newTable(udp, ln.Database(), cfg.Bootnodes) if err != nil { return nil, nil, err } - udp.Table = tab + udp.tab = tab + udp.wg.Add(2) go udp.loop() go udp.readLoop(cfg.Unhandled) - return udp.Table, udp, nil + return udp.tab, udp, nil +} + +func (t *udp) self() *enode.Node { + return t.localNode.Node() } func (t *udp) close() { close(t.closing) t.conn.Close() - t.db.Close() - // TODO: wait for the loops to end. + t.wg.Wait() +} + +func (t *udp) ourEndpoint() rpcEndpoint { + n := t.self() + a := &net.UDPAddr{IP: n.IP(), Port: n.UDP()} + return makeEndpoint(a, uint16(n.TCP())) } // ping sends a ping message to the given node and waits for a reply. @@ -296,7 +294,7 @@ func (t *udp) ping(toid enode.ID, toaddr *net.UDPAddr) error { func (t *udp) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <-chan error { req := &ping{ Version: 4, - From: t.ourEndpoint, + From: t.ourEndpoint(), To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB Expiration: uint64(time.Now().Add(expiration).Unix()), } @@ -313,6 +311,7 @@ func (t *udp) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <-ch } return ok }) + t.localNode.UDPContact(toaddr) t.write(toaddr, req.name(), packet) return errc } @@ -381,6 +380,8 @@ func (t *udp) handleReply(from enode.ID, ptype byte, req packet) bool { // loop runs in its own goroutine. it keeps track of // the refresh timer and the pending reply queue. func (t *udp) loop() { + defer t.wg.Done() + var ( plist = list.New() timeout = time.NewTimer(0) @@ -542,10 +543,11 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (packet, // readLoop runs in its own goroutine. it handles incoming UDP packets. func (t *udp) readLoop(unhandled chan<- ReadPacket) { - defer t.conn.Close() + defer t.wg.Done() if unhandled != nil { defer close(unhandled) } + // Discovery packets are defined to be no larger than 1280 bytes. // Packets larger than this size will be cut at the end and treated // as invalid because their hash won't match. @@ -629,10 +631,11 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte n := wrapNode(enode.NewV4(key, from.IP, int(req.From.TCP), from.Port)) t.handleReply(n.ID(), pingPacket, req) if time.Since(t.db.LastPongReceived(n.ID())) > bondExpiration { - t.sendPing(n.ID(), from, func() { t.addThroughPing(n) }) + t.sendPing(n.ID(), from, func() { t.tab.addThroughPing(n) }) } else { - t.addThroughPing(n) + t.tab.addThroughPing(n) } + t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)}) t.db.UpdateLastPingReceived(n.ID(), time.Now()) return nil } @@ -647,6 +650,7 @@ func (req *pong) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte if !t.handleReply(fromID, pongPacket, req) { return errUnsolicitedReply } + t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)}) t.db.UpdateLastPongReceived(fromID, time.Now()) return nil } @@ -668,9 +672,9 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac [] return errUnknownNode } target := enode.ID(crypto.Keccak256Hash(req.Target[:])) - t.mutex.Lock() - closest := t.closest(target, bucketSize).entries - t.mutex.Unlock() + t.tab.mutex.Lock() + closest := t.tab.closest(target, bucketSize).entries + t.tab.mutex.Unlock() p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())} var sent bool |