aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover/udp.go
diff options
context:
space:
mode:
authorJanos Guljas <janos@resenje.org>2018-09-25 22:57:31 +0800
committerJanos Guljas <janos@resenje.org>2018-09-25 22:57:31 +0800
commit24349144b6c0642755569268bab56b9033743212 (patch)
tree9d9d2b6659fd8a56512dfc807aafe4b733165ae1 /p2p/discover/udp.go
parent7d56602391e155e2ce9ba7c261300a1804ab9972 (diff)
parentd3441ebb563439bac0837d70591f92e2c6080303 (diff)
downloaddexon-24349144b6c0642755569268bab56b9033743212.tar
dexon-24349144b6c0642755569268bab56b9033743212.tar.gz
dexon-24349144b6c0642755569268bab56b9033743212.tar.bz2
dexon-24349144b6c0642755569268bab56b9033743212.tar.lz
dexon-24349144b6c0642755569268bab56b9033743212.tar.xz
dexon-24349144b6c0642755569268bab56b9033743212.tar.zst
dexon-24349144b6c0642755569268bab56b9033743212.zip
Merge branch 'master' into max-stream-peer-servers
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r--p2p/discover/udp.go122
1 files changed, 72 insertions, 50 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 0ff47c5e4..45fcce282 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/rlp"
@@ -46,8 +47,9 @@ var (
// Timeouts
const (
- respTimeout = 500 * time.Millisecond
- expiration = 20 * time.Second
+ respTimeout = 500 * time.Millisecond
+ expiration = 20 * time.Second
+ bondExpiration = 24 * time.Hour
ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP
ntpWarningCooldown = 10 * time.Minute // Minimum amount of time to pass before repeating NTP warning
@@ -87,7 +89,7 @@ type (
// findnode is a query for nodes close to the given target.
findnode struct {
- Target NodeID // doesn't need to be an actual public key
+ Target encPubkey
Expiration uint64
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
@@ -105,7 +107,7 @@ type (
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP uint16 // for discovery protocol
TCP uint16 // for RLPx protocol
- ID NodeID
+ ID encPubkey
}
rpcEndpoint struct {
@@ -123,7 +125,7 @@ func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
}
-func (t *udp) nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*Node, error) {
+func (t *udp) nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*node, error) {
if rn.UDP <= 1024 {
return nil, errors.New("low port")
}
@@ -133,17 +135,26 @@ func (t *udp) nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*Node, error) {
if t.netrestrict != nil && !t.netrestrict.Contains(rn.IP) {
return nil, errors.New("not contained in netrestrict whitelist")
}
- n := NewNode(rn.ID, rn.IP, rn.UDP, rn.TCP)
- err := n.validateComplete()
+ key, err := decodePubkey(rn.ID)
+ if err != nil {
+ return nil, err
+ }
+ n := wrapNode(enode.NewV4(key, rn.IP, int(rn.TCP), int(rn.UDP)))
+ err = n.ValidateComplete()
return n, err
}
-func nodeToRPC(n *Node) rpcNode {
- return rpcNode{ID: n.ID, IP: n.IP, UDP: n.UDP, TCP: n.TCP}
+func nodeToRPC(n *node) rpcNode {
+ var key ecdsa.PublicKey
+ var ekey encPubkey
+ if err := n.Load((*enode.Secp256k1)(&key)); err == nil {
+ ekey = encodePubkey(&key)
+ }
+ return rpcNode{ID: ekey, IP: n.IP(), UDP: uint16(n.UDP()), TCP: uint16(n.TCP())}
}
type packet interface {
- handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
+ handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error
name() string
}
@@ -181,7 +192,7 @@ type udp struct {
// to all the callback functions for that node.
type pending struct {
// these fields must match in the reply.
- from NodeID
+ from enode.ID
ptype byte
// time when the request must complete
@@ -199,7 +210,7 @@ type pending struct {
}
type reply struct {
- from NodeID
+ from enode.ID
ptype byte
data interface{}
// loop indicates whether there was
@@ -222,7 +233,7 @@ type Config struct {
AnnounceAddr *net.UDPAddr // local address announced in the DHT
NodeDBPath string // if set, the node database is stored at this filesystem location
NetRestrict *netutil.Netlist // network whitelist
- Bootnodes []*Node // list of bootstrap nodes
+ Bootnodes []*enode.Node // list of bootstrap nodes
Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
}
@@ -237,6 +248,16 @@ func ListenUDP(c conn, cfg Config) (*Table, error) {
}
func newUDP(c conn, cfg Config) (*Table, *udp, error) {
+ realaddr := c.LocalAddr().(*net.UDPAddr)
+ if cfg.AnnounceAddr != nil {
+ realaddr = cfg.AnnounceAddr
+ }
+ self := enode.NewV4(&cfg.PrivateKey.PublicKey, realaddr.IP, realaddr.Port, realaddr.Port)
+ db, err := enode.OpenDB(cfg.NodeDBPath)
+ if err != nil {
+ return nil, nil, err
+ }
+
udp := &udp{
conn: c,
priv: cfg.PrivateKey,
@@ -245,13 +266,9 @@ func newUDP(c conn, cfg Config) (*Table, *udp, error) {
gotreply: make(chan reply),
addpending: make(chan *pending),
}
- realaddr := c.LocalAddr().(*net.UDPAddr)
- if cfg.AnnounceAddr != nil {
- realaddr = cfg.AnnounceAddr
- }
// TODO: separate TCP port
udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
- tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes)
+ tab, err := newTable(udp, self, db, cfg.Bootnodes)
if err != nil {
return nil, nil, err
}
@@ -265,17 +282,18 @@ func newUDP(c conn, cfg Config) (*Table, *udp, error) {
func (t *udp) close() {
close(t.closing)
t.conn.Close()
+ t.db.Close()
// TODO: wait for the loops to end.
}
// ping sends a ping message to the given node and waits for a reply.
-func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
+func (t *udp) ping(toid enode.ID, toaddr *net.UDPAddr) error {
return <-t.sendPing(toid, toaddr, nil)
}
// sendPing sends a ping message to the given node and invokes the callback
// when the reply arrives.
-func (t *udp) sendPing(toid NodeID, toaddr *net.UDPAddr, callback func()) <-chan error {
+func (t *udp) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <-chan error {
req := &ping{
Version: 4,
From: t.ourEndpoint,
@@ -299,21 +317,21 @@ func (t *udp) sendPing(toid NodeID, toaddr *net.UDPAddr, callback func()) <-chan
return errc
}
-func (t *udp) waitping(from NodeID) error {
+func (t *udp) waitping(from enode.ID) error {
return <-t.pending(from, pingPacket, func(interface{}) bool { return true })
}
// findnode sends a findnode request to the given node and waits until
// the node has sent up to k neighbors.
-func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
+func (t *udp) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]*node, error) {
// If we haven't seen a ping from the destination node for a while, it won't remember
// our endpoint proof and reject findnode. Solicit a ping first.
- if time.Since(t.db.lastPingReceived(toid)) > nodeDBNodeExpiration {
+ if time.Since(t.db.LastPingReceived(toid)) > bondExpiration {
t.ping(toid, toaddr)
t.waitping(toid)
}
- nodes := make([]*Node, 0, bucketSize)
+ nodes := make([]*node, 0, bucketSize)
nreceived := 0
errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
reply := r.(*neighbors)
@@ -337,7 +355,7 @@ func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node
// pending adds a reply callback to the pending reply queue.
// see the documentation of type pending for a detailed explanation.
-func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-chan error {
+func (t *udp) pending(id enode.ID, ptype byte, callback func(interface{}) bool) <-chan error {
ch := make(chan error, 1)
p := &pending{from: id, ptype: ptype, callback: callback, errc: ch}
select {
@@ -349,7 +367,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-
return ch
}
-func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
+func (t *udp) handleReply(from enode.ID, ptype byte, req packet) bool {
matched := make(chan bool, 1)
select {
case t.gotreply <- reply{from, ptype, req, matched}:
@@ -563,19 +581,20 @@ func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
return err
}
-func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
+func decodePacket(buf []byte) (packet, encPubkey, []byte, error) {
if len(buf) < headSize+1 {
- return nil, NodeID{}, nil, errPacketTooSmall
+ return nil, encPubkey{}, nil, errPacketTooSmall
}
hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
shouldhash := crypto.Keccak256(buf[macSize:])
if !bytes.Equal(hash, shouldhash) {
- return nil, NodeID{}, nil, errBadHash
+ return nil, encPubkey{}, nil, errBadHash
}
- fromID, err := recoverNodeID(crypto.Keccak256(buf[headSize:]), sig)
+ fromKey, err := recoverNodeKey(crypto.Keccak256(buf[headSize:]), sig)
if err != nil {
- return nil, NodeID{}, hash, err
+ return nil, fromKey, hash, err
}
+
var req packet
switch ptype := sigdata[0]; ptype {
case pingPacket:
@@ -587,56 +606,59 @@ func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
case neighborsPacket:
req = new(neighbors)
default:
- return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
+ return nil, fromKey, hash, fmt.Errorf("unknown type: %d", ptype)
}
s := rlp.NewStream(bytes.NewReader(sigdata[1:]), 0)
err = s.Decode(req)
- return req, fromID, hash, err
+ return req, fromKey, hash, err
}
-func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
+func (req *ping) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
+ key, err := decodePubkey(fromKey)
+ if err != nil {
+ return fmt.Errorf("invalid public key: %v", err)
+ }
t.send(from, pongPacket, &pong{
To: makeEndpoint(from, req.From.TCP),
ReplyTok: mac,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
- t.handleReply(fromID, pingPacket, req)
-
- // Add the node to the table. Before doing so, ensure that we have a recent enough pong
- // recorded in the database so their findnode requests will be accepted later.
- n := NewNode(fromID, from.IP, uint16(from.Port), req.From.TCP)
- if time.Since(t.db.lastPongReceived(fromID)) > nodeDBNodeExpiration {
- t.sendPing(fromID, from, func() { t.addThroughPing(n) })
+ n := wrapNode(enode.NewV4(key, from.IP, int(req.From.TCP), from.Port))
+ t.handleReply(n.ID(), pingPacket, req)
+ if time.Since(t.db.LastPongReceived(n.ID())) > bondExpiration {
+ t.sendPing(n.ID(), from, func() { t.addThroughPing(n) })
} else {
t.addThroughPing(n)
}
- t.db.updateLastPingReceived(fromID, time.Now())
+ t.db.UpdateLastPingReceived(n.ID(), time.Now())
return nil
}
func (req *ping) name() string { return "PING/v4" }
-func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
+func (req *pong) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
+ fromID := fromKey.id()
if !t.handleReply(fromID, pongPacket, req) {
return errUnsolicitedReply
}
- t.db.updateLastPongReceived(fromID, time.Now())
+ t.db.UpdateLastPongReceived(fromID, time.Now())
return nil
}
func (req *pong) name() string { return "PONG/v4" }
-func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
+func (req *findnode) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
- if !t.db.hasBond(fromID) {
+ fromID := fromKey.id()
+ if time.Since(t.db.LastPongReceived(fromID)) > bondExpiration {
// No endpoint proof pong exists, we don't process the packet. This prevents an
// attack vector where the discovery protocol could be used to amplify traffic in a
// DDOS attack. A malicious actor would send a findnode request with the IP address
@@ -645,7 +667,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
// findnode) to the victim.
return errUnknownNode
}
- target := crypto.Keccak256Hash(req.Target[:])
+ target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
t.mutex.Lock()
closest := t.closest(target, bucketSize).entries
t.mutex.Unlock()
@@ -655,7 +677,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
// Send neighbors in chunks with at most maxNeighbors per packet
// to stay below the 1280 byte limit.
for _, n := range closest {
- if netutil.CheckRelayIP(from.IP, n.IP) == nil {
+ if netutil.CheckRelayIP(from.IP, n.IP()) == nil {
p.Nodes = append(p.Nodes, nodeToRPC(n))
}
if len(p.Nodes) == maxNeighbors {
@@ -672,11 +694,11 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
func (req *findnode) name() string { return "FINDNODE/v4" }
-func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
+func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
- if !t.handleReply(fromID, neighborsPacket, req) {
+ if !t.handleReply(fromKey.id(), neighborsPacket, req) {
return errUnsolicitedReply
}
return nil