aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover/udp.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r--p2p/discover/udp.go92
1 files changed, 48 insertions, 44 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 45fcce282..37a044902 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -23,12 +23,12 @@ import (
"errors"
"fmt"
"net"
+ "sync"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -118,9 +118,11 @@ type (
)
func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
- ip := addr.IP.To4()
- if ip == nil {
- ip = addr.IP.To16()
+ ip := net.IP{}
+ if ip4 := addr.IP.To4(); ip4 != nil {
+ ip = ip4
+ } else if ip6 := addr.IP.To16(); ip6 != nil {
+ ip = ip6
}
return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
}
@@ -165,20 +167,19 @@ type conn interface {
LocalAddr() net.Addr
}
-// udp implements the RPC protocol.
+// udp implements the discovery v4 UDP wire protocol.
type udp struct {
conn conn
netrestrict *netutil.Netlist
priv *ecdsa.PrivateKey
- ourEndpoint rpcEndpoint
+ localNode *enode.LocalNode
+ db *enode.DB
+ tab *Table
+ wg sync.WaitGroup
addpending chan *pending
gotreply chan reply
-
- closing chan struct{}
- nat nat.Interface
-
- *Table
+ closing chan struct{}
}
// pending represents a pending reply.
@@ -230,60 +231,57 @@ type Config struct {
PrivateKey *ecdsa.PrivateKey
// These settings are optional:
- AnnounceAddr *net.UDPAddr // local address announced in the DHT
- NodeDBPath string // if set, the node database is stored at this filesystem location
- NetRestrict *netutil.Netlist // network whitelist
- Bootnodes []*enode.Node // list of bootstrap nodes
- Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
+ NetRestrict *netutil.Netlist // network whitelist
+ Bootnodes []*enode.Node // list of bootstrap nodes
+ Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
}
// ListenUDP returns a new table that listens for UDP packets on laddr.
-func ListenUDP(c conn, cfg Config) (*Table, error) {
- tab, _, err := newUDP(c, cfg)
+func ListenUDP(c conn, ln *enode.LocalNode, cfg Config) (*Table, error) {
+ tab, _, err := newUDP(c, ln, cfg)
if err != nil {
return nil, err
}
- log.Info("UDP listener up", "self", tab.self)
return tab, nil
}
-func newUDP(c conn, cfg Config) (*Table, *udp, error) {
- realaddr := c.LocalAddr().(*net.UDPAddr)
- if cfg.AnnounceAddr != nil {
- realaddr = cfg.AnnounceAddr
- }
- self := enode.NewV4(&cfg.PrivateKey.PublicKey, realaddr.IP, realaddr.Port, realaddr.Port)
- db, err := enode.OpenDB(cfg.NodeDBPath)
- if err != nil {
- return nil, nil, err
- }
-
+func newUDP(c conn, ln *enode.LocalNode, cfg Config) (*Table, *udp, error) {
udp := &udp{
conn: c,
priv: cfg.PrivateKey,
netrestrict: cfg.NetRestrict,
+ localNode: ln,
+ db: ln.Database(),
closing: make(chan struct{}),
gotreply: make(chan reply),
addpending: make(chan *pending),
}
- // TODO: separate TCP port
- udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
- tab, err := newTable(udp, self, db, cfg.Bootnodes)
+ tab, err := newTable(udp, ln.Database(), cfg.Bootnodes)
if err != nil {
return nil, nil, err
}
- udp.Table = tab
+ udp.tab = tab
+ udp.wg.Add(2)
go udp.loop()
go udp.readLoop(cfg.Unhandled)
- return udp.Table, udp, nil
+ return udp.tab, udp, nil
+}
+
+func (t *udp) self() *enode.Node {
+ return t.localNode.Node()
}
func (t *udp) close() {
close(t.closing)
t.conn.Close()
- t.db.Close()
- // TODO: wait for the loops to end.
+ t.wg.Wait()
+}
+
+func (t *udp) ourEndpoint() rpcEndpoint {
+ n := t.self()
+ a := &net.UDPAddr{IP: n.IP(), Port: n.UDP()}
+ return makeEndpoint(a, uint16(n.TCP()))
}
// ping sends a ping message to the given node and waits for a reply.
@@ -296,7 +294,7 @@ func (t *udp) ping(toid enode.ID, toaddr *net.UDPAddr) error {
func (t *udp) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <-chan error {
req := &ping{
Version: 4,
- From: t.ourEndpoint,
+ From: t.ourEndpoint(),
To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
Expiration: uint64(time.Now().Add(expiration).Unix()),
}
@@ -313,6 +311,7 @@ func (t *udp) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <-ch
}
return ok
})
+ t.localNode.UDPContact(toaddr)
t.write(toaddr, req.name(), packet)
return errc
}
@@ -381,6 +380,8 @@ func (t *udp) handleReply(from enode.ID, ptype byte, req packet) bool {
// loop runs in its own goroutine. it keeps track of
// the refresh timer and the pending reply queue.
func (t *udp) loop() {
+ defer t.wg.Done()
+
var (
plist = list.New()
timeout = time.NewTimer(0)
@@ -542,10 +543,11 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (packet,
// readLoop runs in its own goroutine. it handles incoming UDP packets.
func (t *udp) readLoop(unhandled chan<- ReadPacket) {
- defer t.conn.Close()
+ defer t.wg.Done()
if unhandled != nil {
defer close(unhandled)
}
+
// Discovery packets are defined to be no larger than 1280 bytes.
// Packets larger than this size will be cut at the end and treated
// as invalid because their hash won't match.
@@ -629,10 +631,11 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte
n := wrapNode(enode.NewV4(key, from.IP, int(req.From.TCP), from.Port))
t.handleReply(n.ID(), pingPacket, req)
if time.Since(t.db.LastPongReceived(n.ID())) > bondExpiration {
- t.sendPing(n.ID(), from, func() { t.addThroughPing(n) })
+ t.sendPing(n.ID(), from, func() { t.tab.addThroughPing(n) })
} else {
- t.addThroughPing(n)
+ t.tab.addThroughPing(n)
}
+ t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
t.db.UpdateLastPingReceived(n.ID(), time.Now())
return nil
}
@@ -647,6 +650,7 @@ func (req *pong) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte
if !t.handleReply(fromID, pongPacket, req) {
return errUnsolicitedReply
}
+ t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
t.db.UpdateLastPongReceived(fromID, time.Now())
return nil
}
@@ -668,9 +672,9 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []
return errUnknownNode
}
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
- t.mutex.Lock()
- closest := t.closest(target, bucketSize).entries
- t.mutex.Unlock()
+ t.tab.mutex.Lock()
+ closest := t.tab.closest(target, bucketSize).entries
+ t.tab.mutex.Unlock()
p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
var sent bool