diff options
author | Felix Lange <fjl@users.noreply.github.com> | 2019-05-15 12:47:45 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-15 12:47:45 +0800 |
commit | 350a87dd3c8250db50326fff076df3fd21d8f69f (patch) | |
tree | 5e7fe1c25aca988cb57970addb7fcca36e025637 | |
parent | 8deec2e45a315393025145ace43782b6294541d5 (diff) | |
download | go-tangerine-350a87dd3c8250db50326fff076df3fd21d8f69f.tar go-tangerine-350a87dd3c8250db50326fff076df3fd21d8f69f.tar.gz go-tangerine-350a87dd3c8250db50326fff076df3fd21d8f69f.tar.bz2 go-tangerine-350a87dd3c8250db50326fff076df3fd21d8f69f.tar.lz go-tangerine-350a87dd3c8250db50326fff076df3fd21d8f69f.tar.xz go-tangerine-350a87dd3c8250db50326fff076df3fd21d8f69f.tar.zst go-tangerine-350a87dd3c8250db50326fff076df3fd21d8f69f.zip |
p2p/discover: add support for EIP-868 (v4 ENR extension) (#19540)
This change implements EIP-868. The UDPv4 transport announces support
for the extension in ping/pong and handles enrRequest messages.
There are two uses of the extension: If a remote node announces support
for EIP-868 in their pong, node revalidation pulls the node's record.
The Resolve method requests the record unconditionally.
-rw-r--r-- | p2p/discover/table.go | 60 | ||||
-rw-r--r-- | p2p/discover/table_test.go | 28 | ||||
-rw-r--r-- | p2p/discover/table_util_test.go | 48 | ||||
-rw-r--r-- | p2p/discover/v4_udp.go | 304 | ||||
-rw-r--r-- | p2p/discover/v4_udp_lookup_test.go | 4 | ||||
-rw-r--r-- | p2p/discover/v4_udp_test.go | 98 |
6 files changed, 406 insertions, 136 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 61c62f187..3460e8377 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -53,15 +53,17 @@ const ( bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24 tableIPLimit, tableSubnet = 10, 24 - maxFindnodeFailures = 5 // Nodes exceeding this limit are dropped - refreshInterval = 30 * time.Minute - revalidateInterval = 10 * time.Second - copyNodesInterval = 30 * time.Second - seedMinTableTime = 5 * time.Minute - seedCount = 30 - seedMaxAge = 5 * 24 * time.Hour + refreshInterval = 30 * time.Minute + revalidateInterval = 10 * time.Second + copyNodesInterval = 30 * time.Second + seedMinTableTime = 5 * time.Minute + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) +// Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps +// itself up-to-date by verifying the liveness of neighbors and requesting their node +// records when announcements of a new record version are received. type Table struct { mutex sync.Mutex // protects buckets, bucket content, nursery, rand buckets [nBuckets]*bucket // index of known nodes by distance @@ -80,12 +82,13 @@ type Table struct { nodeAddedHook func(*node) // for testing } -// transport is implemented by UDP transports. +// transport is implemented by the UDP transports. type transport interface { Self() *enode.Node lookupRandom() []*enode.Node lookupSelf() []*enode.Node - ping(*enode.Node) error + ping(*enode.Node) (seq uint64, err error) + requestENR(*enode.Node) (*enode.Node, error) } // bucket contains nodes, ordered by their last activity. the entry @@ -175,14 +178,16 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) { return i + 1 } -// Resolve searches for a specific node with the given ID. -// It returns nil if the node could not be found. -func (tab *Table) Resolve(n *enode.Node) *enode.Node { +// getNode returns the node with the given ID or nil if it isn't in the table. +func (tab *Table) getNode(id enode.ID) *enode.Node { tab.mutex.Lock() - cl := tab.closest(n.ID(), 1, false) - tab.mutex.Unlock() - if len(cl.entries) > 0 && cl.entries[0].ID() == n.ID() { - return unwrapNode(cl.entries[0]) + defer tab.mutex.Unlock() + + b := tab.bucket(id) + for _, e := range b.entries { + if e.ID() == id { + return unwrapNode(e) + } } return nil } @@ -226,7 +231,7 @@ func (tab *Table) refresh() <-chan struct{} { return done } -// loop schedules refresh, revalidate runs and coordinates shutdown. +// loop schedules runs of doRefresh, doRevalidate and copyLiveNodes. func (tab *Table) loop() { var ( revalidate = time.NewTimer(tab.nextRevalidateTime()) @@ -288,9 +293,8 @@ loop: close(tab.closed) } -// doRefresh performs a lookup for a random target to keep buckets -// full. seed nodes are inserted if the table is empty (initial -// bootstrap or discarded faulty peers). +// doRefresh performs a lookup for a random target to keep buckets full. seed nodes are +// inserted if the table is empty (initial bootstrap or discarded faulty peers). func (tab *Table) doRefresh(done chan struct{}) { defer close(done) @@ -324,8 +328,8 @@ func (tab *Table) loadSeedNodes() { } } -// doRevalidate checks that the last node in a random bucket is still live -// and replaces or deletes the node if it isn't. +// doRevalidate checks that the last node in a random bucket is still live and replaces or +// deletes the node if it isn't. func (tab *Table) doRevalidate(done chan<- struct{}) { defer func() { done <- struct{}{} }() @@ -336,7 +340,17 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { } // Ping the selected node and wait for a pong. - err := tab.net.ping(unwrapNode(last)) + remoteSeq, err := tab.net.ping(unwrapNode(last)) + + // Also fetch record if the node replied and returned a higher sequence number. + if last.Seq() < remoteSeq { + n, err := tab.net.requestENR(unwrapNode(last)) + if err != nil { + tab.log.Debug("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) + } else { + last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks} + } + } tab.mutex.Lock() defer tab.mutex.Unlock() diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 81763e7fe..895c284b2 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -368,6 +368,34 @@ func TestTable_addSeenNode(t *testing.T) { checkIPLimitInvariant(t, tab) } +// This test checks that ENR updates happen during revalidation. If a node in the table +// announces a new sequence number, the new record should be pulled. +func TestTable_revalidateSyncRecord(t *testing.T) { + transport := newPingRecorder() + tab, db := newTestTable(transport) + <-tab.initDone + defer db.Close() + defer tab.close() + + // Insert a node. + var r enr.Record + r.Set(enr.IP(net.IP{127, 0, 0, 1})) + id := enode.ID{1} + n1 := wrapNode(enode.SignNull(&r, id)) + tab.addSeenNode(n1) + + // Update the node record. + r.Set(enr.WithEntry("foo", "bar")) + n2 := enode.SignNull(&r, id) + transport.updateRecord(n2) + + tab.doRevalidate(make(chan struct{}, 1)) + intable := tab.getNode(id) + if !reflect.DeepEqual(intable, n2) { + t.Fatalf("table contains old record with seq %d, want seq %d", intable.Seq(), n2.Seq()) + } +} + // gen wraps quick.Value so it's easier to use. // it generates a random value of the given value's type. func gen(typ interface{}, rand *rand.Rand) interface{} { diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 71cb1895b..3075c4340 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -98,6 +98,7 @@ func fillTable(tab *Table, nodes []*node) { type pingRecorder struct { mu sync.Mutex dead, pinged map[enode.ID]bool + records map[enode.ID]*enode.Node n *enode.Node } @@ -107,38 +108,53 @@ func newPingRecorder() *pingRecorder { n := enode.SignNull(&r, enode.ID{}) return &pingRecorder{ - dead: make(map[enode.ID]bool), - pinged: make(map[enode.ID]bool), - n: n, + dead: make(map[enode.ID]bool), + pinged: make(map[enode.ID]bool), + records: make(map[enode.ID]*enode.Node), + n: n, } } -func (t *pingRecorder) Self() *enode.Node { - return nullNode +// setRecord updates a node record. Future calls to ping and +// requestENR will return this record. +func (t *pingRecorder) updateRecord(n *enode.Node) { + t.mu.Lock() + defer t.mu.Unlock() + t.records[n.ID()] = n } -func (t *pingRecorder) ping(n *enode.Node) error { +// Stubs to satisfy the transport interface. +func (t *pingRecorder) Self() *enode.Node { return nullNode } +func (t *pingRecorder) lookupSelf() []*enode.Node { return nil } +func (t *pingRecorder) lookupRandom() []*enode.Node { return nil } +func (t *pingRecorder) close() {} + +// ping simulates a ping request. +func (t *pingRecorder) ping(n *enode.Node) (seq uint64, err error) { t.mu.Lock() defer t.mu.Unlock() t.pinged[n.ID()] = true if t.dead[n.ID()] { - return errTimeout - } else { - return nil + return 0, errTimeout } + if t.records[n.ID()] != nil { + seq = t.records[n.ID()].Seq() + } + return seq, nil } -func (t *pingRecorder) lookupSelf() []*enode.Node { - return nil -} +// requestENR simulates an ENR request. +func (t *pingRecorder) requestENR(n *enode.Node) (*enode.Node, error) { + t.mu.Lock() + defer t.mu.Unlock() -func (t *pingRecorder) lookupRandom() []*enode.Node { - return nil + if t.dead[n.ID()] || t.records[n.ID()] == nil { + return nil, errTimeout + } + return t.records[n.ID()], nil } -func (t *pingRecorder) close() {} - func hasDuplicates(slice []*node) bool { seen := make(map[enode.ID]bool) for i, e := range slice { diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 3c68beac1..b0b0053a7 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/ethereum/go-ethereum/rlp" ) @@ -47,12 +48,12 @@ var ( errClosed = errors.New("socket closed") ) -// Timeouts const ( respTimeout = 500 * time.Millisecond expiration = 20 * time.Second bondExpiration = 24 * time.Hour + maxFindnodeFailures = 5 // nodes exceeding this limit are dropped ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP ntpWarningCooldown = 10 * time.Minute // Minimum amount of time to pass before repeating NTP warning driftThreshold = 10 * time.Second // Allowed clock drift before warning user @@ -69,6 +70,8 @@ const ( p_pongV4 p_findnodeV4 p_neighborsV4 + p_enrRequestV4 + p_enrResponseV4 ) // RPC request structures @@ -112,6 +115,21 @@ type ( Rest []rlp.RawValue `rlp:"tail"` } + // enrRequestV4 queries for the remote node's record. + enrRequestV4 struct { + Expiration uint64 + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` + } + + // enrResponseV4 is the reply to enrRequestV4. + enrResponseV4 struct { + ReplyTok []byte // Hash of the enrRequest packet. + Record enr.Record + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` + } + rpcNode struct { IP net.IP // len 4 for IPv4 or 16 for IPv6 UDP uint16 // for discovery protocol @@ -126,14 +144,15 @@ type ( } ) -// packet is implemented by all v4 protocol messages. +// packetV4 is implemented by all v4 protocol messages. type packetV4 interface { // preverify checks whether the packet is valid and should be handled at all. preverify(t *UDPv4, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error // handle handles the packet. handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) - // name returns the name of the packet for logging purposes. + // packet name and type for logging purposes. name() string + kind() byte } func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint { @@ -191,7 +210,7 @@ type UDPv4 struct { closing chan struct{} } -// pending represents a pending reply. +// replyMatcher represents a pending reply. // // Some implementations of the protocol wish to send more than one // reply packet to findnode. In general, any neighbors packet cannot @@ -217,17 +236,20 @@ type replyMatcher struct { // errc receives nil when the callback indicates completion or an // error if no further reply is received within the timeout. - errc chan<- error + errc chan error + + // reply contains the most recent reply. This field is safe for reading after errc has + // received a value. + reply packetV4 } type replyMatchFunc func(interface{}) (matched bool, requestDone bool) +// reply is a reply packet from a certain node. type reply struct { - from enode.ID - ip net.IP - ptype byte - data packetV4 - + from enode.ID + ip net.IP + data packetV4 // loop indicates whether there was // a matching request by sending on this channel. matched chan<- bool @@ -377,7 +399,8 @@ func (t *UDPv4) lookupWorker(n *node, targetKey encPubkey, reply chan<- []*node) t.tab.delete(n) } } else if fails > 0 { - t.db.UpdateFindFails(n.ID(), n.IP(), fails-1) + // Reset failure counter because it counts _consecutive_ failures. + t.db.UpdateFindFails(n.ID(), n.IP(), 0) } // Grab as many nodes as possible. Some of them might not be alive anymore, but we'll @@ -388,23 +411,34 @@ func (t *UDPv4) lookupWorker(n *node, targetKey encPubkey, reply chan<- []*node) reply <- r } -// Resolve searches for a specific node with the given ID. -// It returns nil if the node could not be found. +// Resolve searches for a specific node with the given ID and tries to get the most recent +// version of the node record for it. It returns n if the node could not be resolved. func (t *UDPv4) Resolve(n *enode.Node) *enode.Node { - // If the node is present in the local table, no - // network interaction is required. - if intab := t.tab.Resolve(n); intab != nil { - return intab - } - // Otherwise, do a network lookup. - hash := n.ID() - result := t.LookupPubkey(n.Pubkey()) - for _, n := range result { - if n.ID() == hash { - return n + // Try asking directly. This works if the node is still responding on the endpoint we have. + if rn, err := t.requestENR(n); err == nil { + return rn + } + // Check table for the ID, we might have a newer version there. + if intable := t.tab.getNode(n.ID()); intable != nil && intable.Seq() > n.Seq() { + n = intable + if rn, err := t.requestENR(n); err == nil { + return rn } } - return nil + // Otherwise perform a network lookup. + var key *enode.Secp256k1 + if n.Load(key) != nil { + return n // no secp256k1 key + } + result := t.LookupPubkey((*ecdsa.PublicKey)(key)) + for _, rn := range result { + if rn.ID() == n.ID() { + if rn, err := t.requestENR(rn); err == nil { + return rn + } + } + } + return n } func (t *UDPv4) ourEndpoint() rpcEndpoint { @@ -414,28 +448,27 @@ func (t *UDPv4) ourEndpoint() rpcEndpoint { } // ping sends a ping message to the given node and waits for a reply. -func (t *UDPv4) ping(n *enode.Node) error { - return <-t.sendPing(n.ID(), &net.UDPAddr{IP: n.IP(), Port: n.UDP()}, nil) +func (t *UDPv4) ping(n *enode.Node) (seq uint64, err error) { + rm := t.sendPing(n.ID(), &net.UDPAddr{IP: n.IP(), Port: n.UDP()}, nil) + if err = <-rm.errc; err == nil { + seq = seqFromTail(rm.reply.(*pongV4).Rest) + } + return seq, err } // sendPing sends a ping message to the given node and invokes the callback // when the reply arrives. -func (t *UDPv4) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <-chan error { - req := &pingV4{ - Version: 4, - From: t.ourEndpoint(), - To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB - Expiration: uint64(time.Now().Add(expiration).Unix()), - } - packet, hash, err := t.encode(t.priv, p_pingV4, req) +func (t *UDPv4) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) *replyMatcher { + req := t.makePing(toaddr) + packet, hash, err := t.encode(t.priv, req) if err != nil { errc := make(chan error, 1) errc <- err - return errc + return &replyMatcher{errc: errc} } // Add a matcher for the reply to the pending reply queue. Pongs are matched if they // reference the ping we're about to send. - errc := t.pending(toid, toaddr.IP, p_pongV4, func(p interface{}) (matched bool, requestDone bool) { + rm := t.pending(toid, toaddr.IP, p_pongV4, func(p interface{}) (matched bool, requestDone bool) { matched = bytes.Equal(p.(*pongV4).ReplyTok, hash) if matched && callback != nil { callback() @@ -445,25 +478,30 @@ func (t *UDPv4) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <- // Send the packet. t.localNode.UDPContact(toaddr) t.write(toaddr, toid, req.name(), packet) - return errc + return rm +} + +func (t *UDPv4) makePing(toaddr *net.UDPAddr) *pingV4 { + seq, _ := rlp.EncodeToBytes(t.localNode.Node().Seq()) + return &pingV4{ + Version: 4, + From: t.ourEndpoint(), + To: makeEndpoint(toaddr, 0), + Expiration: uint64(time.Now().Add(expiration).Unix()), + Rest: []rlp.RawValue{seq}, + } } // findnode sends a findnode request to the given node and waits until // the node has sent up to k neighbors. func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]*node, error) { - // If we haven't seen a ping from the destination node for a while, it won't remember - // our endpoint proof and reject findnode. Solicit a ping first. - if time.Since(t.db.LastPingReceived(toid, toaddr.IP)) > bondExpiration { - <-t.sendPing(toid, toaddr, nil) - // Wait for them to ping back and process our pong. - time.Sleep(respTimeout) - } + t.ensureBond(toid, toaddr) // Add a matcher for 'neighbours' replies to the pending reply queue. The matcher is // active until enough nodes have been received. nodes := make([]*node, 0, bucketSize) nreceived := 0 - errc := t.pending(toid, toaddr.IP, p_neighborsV4, func(r interface{}) (matched bool, requestDone bool) { + rm := t.pending(toid, toaddr.IP, p_neighborsV4, func(r interface{}) (matched bool, requestDone bool) { reply := r.(*neighborsV4) for _, rn := range reply.Nodes { nreceived++ @@ -476,16 +514,56 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ( } return true, nreceived >= bucketSize }) - t.send(toaddr, toid, p_findnodeV4, &findnodeV4{ + t.send(toaddr, toid, &findnodeV4{ Target: target, Expiration: uint64(time.Now().Add(expiration).Unix()), }) - return nodes, <-errc + return nodes, <-rm.errc +} + +// requestENR sends enrRequest to the given node and waits for a response. +func (t *UDPv4) requestENR(n *enode.Node) (*enode.Node, error) { + addr := &net.UDPAddr{IP: n.IP(), Port: n.UDP()} + t.ensureBond(n.ID(), addr) + + req := &enrRequestV4{ + Expiration: uint64(time.Now().Add(expiration).Unix()), + } + packet, hash, err := t.encode(t.priv, req) + if err != nil { + return nil, err + } + // Add a matcher for the reply to the pending reply queue. Responses are matched if + // they reference the request we're about to send. + rm := t.pending(n.ID(), addr.IP, p_enrResponseV4, func(r interface{}) (matched bool, requestDone bool) { + matched = bytes.Equal(r.(*enrResponseV4).ReplyTok, hash) + return matched, matched + }) + // Send the packet and wait for the reply. + t.write(addr, n.ID(), req.name(), packet) + if err := <-rm.errc; err != nil { + return nil, err + } + // Verify the response record. + respN, err := enode.New(enode.ValidSchemes, &rm.reply.(*enrResponseV4).Record) + if err != nil { + return nil, err + } + if respN.ID() != n.ID() { + return nil, fmt.Errorf("invalid ID in response record") + } + if respN.Seq() < n.Seq() { + return n, nil // response record is older + } + if err := netutil.CheckRelayIP(addr.IP, respN.IP()); err != nil { + return nil, fmt.Errorf("invalid IP in response record: %v", err) + } + return respN, nil } // pending adds a reply matcher to the pending reply queue. // see the documentation of type replyMatcher for a detailed explanation. -func (t *UDPv4) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchFunc) <-chan error { +func (t *UDPv4) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchFunc) *replyMatcher { ch := make(chan error, 1) p := &replyMatcher{from: id, ip: ip, ptype: ptype, callback: callback, errc: ch} select { @@ -494,15 +572,15 @@ func (t *UDPv4) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchF case <-t.closing: ch <- errClosed } - return ch + return p } // handleReply dispatches a reply packet, invoking reply matchers. It returns // whether any matcher considered the packet acceptable. -func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, ptype byte, req packetV4) bool { +func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, req packetV4) bool { matched := make(chan bool, 1) select { - case t.gotreply <- reply{from, fromIP, ptype, req, matched}: + case t.gotreply <- reply{from, fromIP, req, matched}: // loop will handle it return <-matched case <-t.closing: @@ -565,11 +643,12 @@ func (t *UDPv4) loop() { var matched bool // whether any replyMatcher considered the reply acceptable. for el := plist.Front(); el != nil; el = el.Next() { p := el.Value.(*replyMatcher) - if p.from == r.from && p.ptype == r.ptype && p.ip.Equal(r.ip) { + if p.from == r.from && p.ptype == r.data.kind() && p.ip.Equal(r.ip) { ok, requestDone := p.callback(r.data) matched = matched || ok // Remove the matcher if callback indicates that all replies have been received. if requestDone { + p.reply = r.data p.errc <- nil plist.Remove(el) } @@ -635,8 +714,8 @@ func init() { } } -func (t *UDPv4) send(toaddr *net.UDPAddr, toid enode.ID, ptype byte, req packetV4) ([]byte, error) { - packet, hash, err := t.encode(t.priv, ptype, req) +func (t *UDPv4) send(toaddr *net.UDPAddr, toid enode.ID, req packetV4) ([]byte, error) { + packet, hash, err := t.encode(t.priv, req) if err != nil { return hash, err } @@ -649,18 +728,19 @@ func (t *UDPv4) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet [] return err } -func (t *UDPv4) encode(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (packet, hash []byte, err error) { +func (t *UDPv4) encode(priv *ecdsa.PrivateKey, req packetV4) (packet, hash []byte, err error) { + name := req.name() b := new(bytes.Buffer) b.Write(headSpace) - b.WriteByte(ptype) + b.WriteByte(req.kind()) if err := rlp.Encode(b, req); err != nil { - t.log.Error("Can't encode discv4 packet", "err", err) + t.log.Error(fmt.Sprintf("Can't encode %s packet", name), "err", err) return nil, nil, err } packet = b.Bytes() sig, err := crypto.Sign(crypto.Keccak256(packet[headSize:]), priv) if err != nil { - t.log.Error("Can't sign discv4 packet", "err", err) + t.log.Error(fmt.Sprintf("Can't sign %s packet", name), "err", err) return nil, nil, err } copy(packet[macSize:], sig) @@ -743,6 +823,10 @@ func decodeV4(buf []byte) (packetV4, encPubkey, []byte, error) { req = new(findnodeV4) case p_neighborsV4: req = new(neighborsV4) + case p_enrRequestV4: + req = new(enrRequestV4) + case p_enrResponseV4: + req = new(enrResponseV4) default: return nil, fromKey, hash, fmt.Errorf("unknown type: %d", ptype) } @@ -751,7 +835,41 @@ func decodeV4(buf []byte) (packetV4, encPubkey, []byte, error) { return req, fromKey, hash, err } -// Packet Handlers +// checkBond checks if the given node has a recent enough endpoint proof. +func (t *UDPv4) checkBond(id enode.ID, ip net.IP) bool { + return time.Since(t.db.LastPongReceived(id, ip)) < bondExpiration +} + +// ensureBond solicits a ping from a node if we haven't seen a ping from it for a while. +// This ensures there is a valid endpoint proof on the remote end. +func (t *UDPv4) ensureBond(toid enode.ID, toaddr *net.UDPAddr) { + tooOld := time.Since(t.db.LastPingReceived(toid, toaddr.IP)) > bondExpiration + if tooOld || t.db.FindFails(toid, toaddr.IP) > maxFindnodeFailures { + rm := t.sendPing(toid, toaddr, nil) + <-rm.errc + // Wait for them to ping back and process our pong. + time.Sleep(respTimeout) + } +} + +// expired checks whether the given UNIX time stamp is in the past. +func expired(ts uint64) bool { + return time.Unix(int64(ts), 0).Before(time.Now()) +} + +func seqFromTail(tail []rlp.RawValue) uint64 { + if len(tail) == 0 { + return 0 + } + var seq uint64 + rlp.DecodeBytes(tail[0], &seq) + return seq +} + +// PING/v4 + +func (req *pingV4) name() string { return "PING/v4" } +func (req *pingV4) kind() byte { return p_pingV4 } func (req *pingV4) preverify(t *UDPv4, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error { if expired(req.Expiration) { @@ -767,10 +885,12 @@ func (req *pingV4) preverify(t *UDPv4, from *net.UDPAddr, fromID enode.ID, fromK func (req *pingV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) { // Reply. - t.send(from, fromID, p_pongV4, &pongV4{ + seq, _ := rlp.EncodeToBytes(t.localNode.Node().Seq()) + t.send(from, fromID, &pongV4{ To: makeEndpoint(from, req.From.TCP), ReplyTok: mac, Expiration: uint64(time.Now().Add(expiration).Unix()), + Rest: []rlp.RawValue{seq}, }) // Ping back if our last pong on file is too far in the past. @@ -788,13 +908,16 @@ func (req *pingV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []by t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)}) } -func (req *pingV4) name() string { return "PING/v4" } +// PONG/v4 + +func (req *pongV4) name() string { return "PONG/v4" } +func (req *pongV4) kind() byte { return p_pongV4 } func (req *pongV4) preverify(t *UDPv4, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error { if expired(req.Expiration) { return errExpired } - if !t.handleReply(fromID, from.IP, p_pongV4, req) { + if !t.handleReply(fromID, from.IP, req) { return errUnsolicitedReply } return nil @@ -805,13 +928,16 @@ func (req *pongV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []by t.db.UpdateLastPongReceived(fromID, from.IP, time.Now()) } -func (req *pongV4) name() string { return "PONG/v4" } +// FINDNODE/v4 + +func (req *findnodeV4) name() string { return "FINDNODE/v4" } +func (req *findnodeV4) kind() byte { return p_findnodeV4 } func (req *findnodeV4) preverify(t *UDPv4, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error { if expired(req.Expiration) { return errExpired } - if time.Since(t.db.LastPongReceived(fromID, from.IP)) > bondExpiration { + if !t.checkBond(fromID, from.IP) { // No endpoint proof pong exists, we don't process the packet. This prevents an // attack vector where the discovery protocol could be used to amplify traffic in a // DDOS attack. A malicious actor would send a findnode request with the IP address @@ -839,23 +965,26 @@ func (req *findnodeV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac p.Nodes = append(p.Nodes, nodeToRPC(n)) } if len(p.Nodes) == maxNeighbors { - t.send(from, fromID, p_neighborsV4, &p) + t.send(from, fromID, &p) p.Nodes = p.Nodes[:0] sent = true } } if len(p.Nodes) > 0 || !sent { - t.send(from, fromID, p_neighborsV4, &p) + t.send(from, fromID, &p) } } -func (req *findnodeV4) name() string { return "FINDNODE/v4" } +// NEIGHBORS/v4 + +func (req *neighborsV4) name() string { return "NEIGHBORS/v4" } +func (req *neighborsV4) kind() byte { return p_neighborsV4 } func (req *neighborsV4) preverify(t *UDPv4, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error { if expired(req.Expiration) { return errExpired } - if !t.handleReply(fromID, from.IP, p_neighborsV4, req) { + if !t.handleReply(fromID, from.IP, req) { return errUnsolicitedReply } return nil @@ -864,8 +993,39 @@ func (req *neighborsV4) preverify(t *UDPv4, from *net.UDPAddr, fromID enode.ID, func (req *neighborsV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) { } -func (req *neighborsV4) name() string { return "NEIGHBORS/v4" } +// ENRREQUEST/v4 -func expired(ts uint64) bool { - return time.Unix(int64(ts), 0).Before(time.Now()) +func (req *enrRequestV4) name() string { return "ENRREQUEST/v4" } +func (req *enrRequestV4) kind() byte { return p_enrRequestV4 } + +func (req *enrRequestV4) preverify(t *UDPv4, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error { + if expired(req.Expiration) { + return errExpired + } + if !t.checkBond(fromID, from.IP) { + return errUnknownNode + } + return nil +} + +func (req *enrRequestV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) { + t.send(from, fromID, &enrResponseV4{ + ReplyTok: mac, + Record: *t.localNode.Node().Record(), + }) +} + +// ENRRESPONSE/v4 + +func (req *enrResponseV4) name() string { return "ENRRESPONSE/v4" } +func (req *enrResponseV4) kind() byte { return p_enrResponseV4 } + +func (req *enrResponseV4) preverify(t *UDPv4, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error { + if !t.handleReply(fromID, from.IP, req) { + return errUnsolicitedReply + } + return nil +} + +func (req *enrResponseV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) { } diff --git a/p2p/discover/v4_udp_lookup_test.go b/p2p/discover/v4_udp_lookup_test.go index 7e12aa498..bc1cdfb08 100644 --- a/p2p/discover/v4_udp_lookup_test.go +++ b/p2p/discover/v4_udp_lookup_test.go @@ -54,11 +54,11 @@ func TestUDPv4_Lookup(t *testing.T) { n, key := lookupTestnet.nodeByAddr(to) switch p.(type) { case *pingV4: - test.packetInFrom(nil, key, to, p_pongV4, &pongV4{Expiration: futureExp, ReplyTok: hash}) + test.packetInFrom(nil, key, to, &pongV4{Expiration: futureExp, ReplyTok: hash}) case *findnodeV4: dist := enode.LogDist(n.ID(), lookupTestnet.target.id()) nodes := lookupTestnet.nodesAtDistance(dist - 1) - test.packetInFrom(nil, key, to, p_neighborsV4, &neighborsV4{Expiration: futureExp, Nodes: nodes}) + test.packetInFrom(nil, key, to, &neighborsV4{Expiration: futureExp, Nodes: nodes}) } }) } diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 0aa4c01e5..9d7badea1 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -37,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/internal/testlog" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/rlp" ) @@ -91,19 +92,19 @@ func (test *udpTest) close() { } // handles a packet as if it had been sent to the transport. -func (test *udpTest) packetIn(wantError error, ptype byte, data packetV4) { +func (test *udpTest) packetIn(wantError error, data packetV4) { test.t.Helper() - test.packetInFrom(wantError, test.remotekey, test.remoteaddr, ptype, data) + test.packetInFrom(wantError, test.remotekey, test.remoteaddr, data) } // handles a packet as if it had been sent to the transport by the key/endpoint. -func (test *udpTest) packetInFrom(wantError error, key *ecdsa.PrivateKey, addr *net.UDPAddr, ptype byte, data packetV4) { +func (test *udpTest) packetInFrom(wantError error, key *ecdsa.PrivateKey, addr *net.UDPAddr, data packetV4) { test.t.Helper() - enc, _, err := test.udp.encode(key, ptype, data) + enc, _, err := test.udp.encode(key, data) if err != nil { - test.t.Errorf("packet (%d) encode error: %v", ptype, err) + test.t.Errorf("%s encode error: %v", data.name(), err) } test.sent = append(test.sent, enc) if err = test.udp.handlePacket(addr, enc); err != wantError { @@ -139,10 +140,10 @@ func TestUDPv4_packetErrors(t *testing.T) { test := newUDPTest(t) defer test.close() - test.packetIn(errExpired, p_pingV4, &pingV4{From: testRemote, To: testLocalAnnounced, Version: 4}) - test.packetIn(errUnsolicitedReply, p_pongV4, &pongV4{ReplyTok: []byte{}, Expiration: futureExp}) - test.packetIn(errUnknownNode, p_findnodeV4, &findnodeV4{Expiration: futureExp}) - test.packetIn(errUnsolicitedReply, p_neighborsV4, &neighborsV4{Expiration: futureExp}) + test.packetIn(errExpired, &pingV4{From: testRemote, To: testLocalAnnounced, Version: 4}) + test.packetIn(errUnsolicitedReply, &pongV4{ReplyTok: []byte{}, Expiration: futureExp}) + test.packetIn(errUnknownNode, &findnodeV4{Expiration: futureExp}) + test.packetIn(errUnsolicitedReply, &neighborsV4{Expiration: futureExp}) } func TestUDPv4_pingTimeout(t *testing.T) { @@ -153,11 +154,21 @@ func TestUDPv4_pingTimeout(t *testing.T) { key := newkey() toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222} node := enode.NewV4(&key.PublicKey, toaddr.IP, 0, toaddr.Port) - if err := test.udp.ping(node); err != errTimeout { + if _, err := test.udp.ping(node); err != errTimeout { t.Error("expected timeout error, got", err) } } +type testPacket byte + +func (req testPacket) kind() byte { return byte(req) } +func (req testPacket) name() string { return "" } +func (req testPacket) preverify(*UDPv4, *net.UDPAddr, enode.ID, encPubkey) error { + return nil +} +func (req testPacket) handle(*UDPv4, *net.UDPAddr, enode.ID, []byte) { +} + func TestUDPv4_responseTimeouts(t *testing.T) { t.Parallel() test := newUDPTest(t) @@ -192,7 +203,7 @@ func TestUDPv4_responseTimeouts(t *testing.T) { p.errc = nilErr test.udp.addReplyMatcher <- p time.AfterFunc(randomDuration(60*time.Millisecond), func() { - if !test.udp.handleReply(p.from, p.ip, p.ptype, nil) { + if !test.udp.handleReply(p.from, p.ip, testPacket(p.ptype)) { t.Logf("not matched: %v", p) } }) @@ -277,7 +288,7 @@ func TestUDPv4_findnode(t *testing.T) { // check that closest neighbors are returned. expected := test.table.closest(testTarget.id(), bucketSize, true) - test.packetIn(nil, p_findnodeV4, &findnodeV4{Target: testTarget, Expiration: futureExp}) + test.packetIn(nil, &findnodeV4{Target: testTarget, Expiration: futureExp}) waitNeighbors := func(want []*node) { test.waitPacketOut(func(p *neighborsV4, to *net.UDPAddr, hash []byte) { if len(p.Nodes) != len(want) { @@ -340,8 +351,8 @@ func TestUDPv4_findnodeMultiReply(t *testing.T) { for i := range list { rpclist[i] = nodeToRPC(list[i]) } - test.packetIn(nil, p_neighborsV4, &neighborsV4{Expiration: futureExp, Nodes: rpclist[:2]}) - test.packetIn(nil, p_neighborsV4, &neighborsV4{Expiration: futureExp, Nodes: rpclist[2:]}) + test.packetIn(nil, &neighborsV4{Expiration: futureExp, Nodes: rpclist[:2]}) + test.packetIn(nil, &neighborsV4{Expiration: futureExp, Nodes: rpclist[2:]}) // check that the sent neighbors are all returned by findnode select { @@ -357,6 +368,7 @@ func TestUDPv4_findnodeMultiReply(t *testing.T) { } } +// This test checks that reply matching of pong verifies the ping hash. func TestUDPv4_pingMatch(t *testing.T) { test := newUDPTest(t) defer test.close() @@ -364,22 +376,23 @@ func TestUDPv4_pingMatch(t *testing.T) { randToken := make([]byte, 32) crand.Read(randToken) - test.packetIn(nil, p_pingV4, &pingV4{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp}) + test.packetIn(nil, &pingV4{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp}) test.waitPacketOut(func(*pongV4, *net.UDPAddr, []byte) {}) test.waitPacketOut(func(*pingV4, *net.UDPAddr, []byte) {}) - test.packetIn(errUnsolicitedReply, p_pongV4, &pongV4{ReplyTok: randToken, To: testLocalAnnounced, Expiration: futureExp}) + test.packetIn(errUnsolicitedReply, &pongV4{ReplyTok: randToken, To: testLocalAnnounced, Expiration: futureExp}) } +// This test checks that reply matching of pong verifies the sender IP address. func TestUDPv4_pingMatchIP(t *testing.T) { test := newUDPTest(t) defer test.close() - test.packetIn(nil, p_pingV4, &pingV4{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp}) + test.packetIn(nil, &pingV4{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp}) test.waitPacketOut(func(*pongV4, *net.UDPAddr, []byte) {}) test.waitPacketOut(func(p *pingV4, to *net.UDPAddr, hash []byte) { wrongAddr := &net.UDPAddr{IP: net.IP{33, 44, 1, 2}, Port: 30000} - test.packetInFrom(errUnsolicitedReply, test.remotekey, wrongAddr, p_pongV4, &pongV4{ + test.packetInFrom(errUnsolicitedReply, test.remotekey, wrongAddr, &pongV4{ ReplyTok: hash, To: testLocalAnnounced, Expiration: futureExp, @@ -394,9 +407,9 @@ func TestUDPv4_successfulPing(t *testing.T) { defer test.close() // The remote side sends a ping packet to initiate the exchange. - go test.packetIn(nil, p_pingV4, &pingV4{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp}) + go test.packetIn(nil, &pingV4{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp}) - // the ping is replied to. + // The ping is replied to. test.waitPacketOut(func(p *pongV4, to *net.UDPAddr, hash []byte) { pinghash := test.sent[0][:macSize] if !bytes.Equal(p.ReplyTok, pinghash) { @@ -413,7 +426,7 @@ func TestUDPv4_successfulPing(t *testing.T) { } }) - // remote is unknown, the table pings back. + // Remote is unknown, the table pings back. test.waitPacketOut(func(p *pingV4, to *net.UDPAddr, hash []byte) { if !reflect.DeepEqual(p.From, test.udp.ourEndpoint()) { t.Errorf("got ping.From %#v, want %#v", p.From, test.udp.ourEndpoint()) @@ -427,10 +440,10 @@ func TestUDPv4_successfulPing(t *testing.T) { if !reflect.DeepEqual(p.To, wantTo) { t.Errorf("got ping.To %v, want %v", p.To, wantTo) } - test.packetIn(nil, p_pongV4, &pongV4{ReplyTok: hash, Expiration: futureExp}) + test.packetIn(nil, &pongV4{ReplyTok: hash, Expiration: futureExp}) }) - // the node should be added to the table shortly after getting the + // The node should be added to the table shortly after getting the // pong packet. select { case n := <-added: @@ -452,6 +465,45 @@ func TestUDPv4_successfulPing(t *testing.T) { } } +// This test checks that EIP-868 requests work. +func TestUDPv4_EIP868(t *testing.T) { + test := newUDPTest(t) + defer test.close() + + test.udp.localNode.Set(enr.WithEntry("foo", "bar")) + wantNode := test.udp.localNode.Node() + + // ENR requests aren't allowed before endpoint proof. + test.packetIn(errUnknownNode, &enrRequestV4{Expiration: futureExp}) + + // Perform endpoint proof and check for sequence number in packet tail. + test.packetIn(nil, &pingV4{Expiration: futureExp}) + test.waitPacketOut(func(p *pongV4, addr *net.UDPAddr, hash []byte) { + if seq := seqFromTail(p.Rest); seq != wantNode.Seq() { + t.Errorf("wrong sequence number in pong: %d, want %d", seq, wantNode.Seq()) + } + }) + test.waitPacketOut(func(p *pingV4, addr *net.UDPAddr, hash []byte) { + if seq := seqFromTail(p.Rest); seq != wantNode.Seq() { + t.Errorf("wrong sequence number in ping: %d, want %d", seq, wantNode.Seq()) + } + test.packetIn(nil, &pongV4{Expiration: futureExp, ReplyTok: hash}) + }) + + // Request should work now. + test.packetIn(nil, &enrRequestV4{Expiration: futureExp}) + test.waitPacketOut(func(p *enrResponseV4, addr *net.UDPAddr, hash []byte) { + n, err := enode.New(enode.ValidSchemes, &p.Record) + if err != nil { + t.Fatalf("invalid record: %v", err) + } + if !reflect.DeepEqual(n, wantNode) { + t.Fatalf("wrong node in enrResponse: %v", n) + } + }) +} + +// EIP-8 test vectors. var testPackets = []struct { input string wantPacket interface{} |