aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover/udp.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2015-03-25 23:45:53 +0800
committerFelix Lange <fjl@twurst.com>2015-04-01 23:00:12 +0800
commitde7af720d6bb10b93d716fb0c6cf3ee0e51dc71a (patch)
treeb4ece2cb300a989ec349ff30ea84e8906131fdaf /p2p/discover/udp.go
parent92928309b2f0e274a6103b21a650eb07e78f88ea (diff)
downloaddexon-de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a.tar
dexon-de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a.tar.gz
dexon-de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a.tar.bz2
dexon-de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a.tar.lz
dexon-de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a.tar.xz
dexon-de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a.tar.zst
dexon-de7af720d6bb10b93d716fb0c6cf3ee0e51dc71a.zip
p2p/discover: implement node bonding
This a fix for an attack vector where the discovery protocol could be used to amplify traffic in a DDOS attack. A malicious actor would send a findnode request with the IP address and UDP port of the target as the source address. The recipient of the findnode packet would then send a neighbors packet (which is 16x the size of findnode) to the victim. Our solution is to require a 'bond' with the sender of findnode. If no bond exists, the findnode packet is not processed. A bond between nodes α and β is created when α replies to a ping from β. This (initial) version of the bonding implementation might still be vulnerable against replay attacks during the expiration time window. We will add stricter source address validation later.
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r--p2p/discover/udp.go214
1 files changed, 130 insertions, 84 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 738a01fb7..e9ede1397 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -20,12 +20,14 @@ const Version = 3
// Errors
var (
- errPacketTooSmall = errors.New("too small")
- errBadHash = errors.New("bad hash")
- errExpired = errors.New("expired")
- errBadVersion = errors.New("version mismatch")
- errTimeout = errors.New("RPC timeout")
- errClosed = errors.New("socket closed")
+ errPacketTooSmall = errors.New("too small")
+ errBadHash = errors.New("bad hash")
+ errExpired = errors.New("expired")
+ errBadVersion = errors.New("version mismatch")
+ errUnsolicitedReply = errors.New("unsolicited reply")
+ errUnknownNode = errors.New("unknown node")
+ errTimeout = errors.New("RPC timeout")
+ errClosed = errors.New("socket closed")
)
// Timeouts
@@ -80,14 +82,27 @@ type rpcNode struct {
ID NodeID
}
+type packet interface {
+ handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
+}
+
+type conn interface {
+ ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
+ WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
+ Close() error
+ LocalAddr() net.Addr
+}
+
// udp implements the RPC protocol.
type udp struct {
- conn *net.UDPConn
- priv *ecdsa.PrivateKey
+ conn conn
+ priv *ecdsa.PrivateKey
+
addpending chan *pending
- replies chan reply
- closing chan struct{}
- nat nat.Interface
+ gotreply chan reply
+
+ closing chan struct{}
+ nat nat.Interface
*Table
}
@@ -124,6 +139,9 @@ type reply struct {
from NodeID
ptype byte
data interface{}
+ // loop indicates whether there was
+ // a matching request by sending on this channel.
+ matched chan<- bool
}
// ListenUDP returns a new table that listens for UDP packets on laddr.
@@ -136,15 +154,20 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface) (*Table
if err != nil {
return nil, err
}
+ tab, _ := newUDP(priv, conn, natm)
+ log.Infoln("Listening,", tab.self)
+ return tab, nil
+}
+
+func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface) (*Table, *udp) {
udp := &udp{
- conn: conn,
+ conn: c,
priv: priv,
closing: make(chan struct{}),
+ gotreply: make(chan reply),
addpending: make(chan *pending),
- replies: make(chan reply),
}
-
- realaddr := conn.LocalAddr().(*net.UDPAddr)
+ realaddr := c.LocalAddr().(*net.UDPAddr)
if natm != nil {
if !realaddr.IP.IsLoopback() {
go nat.Map(natm, udp.closing, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
@@ -155,11 +178,9 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface) (*Table
}
}
udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr)
-
go udp.loop()
go udp.readLoop()
- log.Infoln("Listening, ", udp.self)
- return udp.Table, nil
+ return udp.Table, udp
}
func (t *udp) close() {
@@ -169,10 +190,10 @@ func (t *udp) close() {
}
// ping sends a ping message to the given node and waits for a reply.
-func (t *udp) ping(e *Node) error {
+func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
// TODO: maybe check for ReplyTo field in callback to measure RTT
- errc := t.pending(e.ID, pongPacket, func(interface{}) bool { return true })
- t.send(e, pingPacket, ping{
+ errc := t.pending(toid, pongPacket, func(interface{}) bool { return true })
+ t.send(toaddr, pingPacket, ping{
Version: Version,
IP: t.self.IP.String(),
Port: uint16(t.self.TCPPort),
@@ -181,12 +202,16 @@ func (t *udp) ping(e *Node) error {
return <-errc
}
+func (t *udp) waitping(from NodeID) error {
+ return <-t.pending(from, pingPacket, func(interface{}) bool { return true })
+}
+
// findnode sends a findnode request to the given node and waits until
// the node has sent up to k neighbors.
-func (t *udp) findnode(to *Node, target NodeID) ([]*Node, error) {
+func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
nodes := make([]*Node, 0, bucketSize)
nreceived := 0
- errc := t.pending(to.ID, neighborsPacket, func(r interface{}) bool {
+ errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
reply := r.(*neighbors)
for _, n := range reply.Nodes {
nreceived++
@@ -196,8 +221,7 @@ func (t *udp) findnode(to *Node, target NodeID) ([]*Node, error) {
}
return nreceived >= bucketSize
})
-
- t.send(to, findnodePacket, findnode{
+ t.send(toaddr, findnodePacket, findnode{
Target: target,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
@@ -219,6 +243,17 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-
return ch
}
+func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
+ matched := make(chan bool)
+ select {
+ case t.gotreply <- reply{from, ptype, req, matched}:
+ // loop will handle it
+ return <-matched
+ case <-t.closing:
+ return false
+ }
+}
+
// loop runs in its own goroutin. it keeps track of
// the refresh timer and the pending reply queue.
func (t *udp) loop() {
@@ -249,6 +284,7 @@ func (t *udp) loop() {
for _, p := range pending {
p.errc <- errClosed
}
+ pending = nil
return
case p := <-t.addpending:
@@ -256,18 +292,21 @@ func (t *udp) loop() {
pending = append(pending, p)
rearmTimeout()
- case reply := <-t.replies:
- // run matching callbacks, remove if they return false.
+ case r := <-t.gotreply:
+ var matched bool
for i := 0; i < len(pending); i++ {
- p := pending[i]
- if reply.from == p.from && reply.ptype == p.ptype && p.callback(reply.data) {
- p.errc <- nil
- copy(pending[i:], pending[i+1:])
- pending = pending[:len(pending)-1]
- i--
+ if p := pending[i]; p.from == r.from && p.ptype == r.ptype {
+ matched = true
+ if p.callback(r.data) {
+ // callback indicates the request is done, remove it.
+ p.errc <- nil
+ copy(pending[i:], pending[i+1:])
+ pending = pending[:len(pending)-1]
+ i--
+ }
}
}
- rearmTimeout()
+ r.matched <- matched
case now := <-timeout.C:
// notify and remove callbacks whose deadline is in the past.
@@ -292,33 +331,38 @@ const (
var headSpace = make([]byte, headSize)
-func (t *udp) send(to *Node, ptype byte, req interface{}) error {
+func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req interface{}) error {
+ packet, err := encodePacket(t.priv, ptype, req)
+ if err != nil {
+ return err
+ }
+ log.DebugDetailf(">>> %v %T %v\n", toaddr, req, req)
+ if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
+ log.DebugDetailln("UDP send failed:", err)
+ }
+ return err
+}
+
+func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, error) {
b := new(bytes.Buffer)
b.Write(headSpace)
b.WriteByte(ptype)
if err := rlp.Encode(b, req); err != nil {
log.Errorln("error encoding packet:", err)
- return err
+ return nil, err
}
-
packet := b.Bytes()
- sig, err := crypto.Sign(crypto.Sha3(packet[headSize:]), t.priv)
+ sig, err := crypto.Sign(crypto.Sha3(packet[headSize:]), priv)
if err != nil {
log.Errorln("could not sign packet:", err)
- return err
+ return nil, err
}
copy(packet[macSize:], sig)
// add the hash to the front. Note: this doesn't protect the
// packet in any way. Our public key will be part of this hash in
- // the future.
+ // The future.
copy(packet, crypto.Sha3(packet[macSize:]))
-
- toaddr := &net.UDPAddr{IP: to.IP, Port: to.DiscPort}
- log.DebugDetailf(">>> %v %T %v\n", toaddr, req, req)
- if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
- log.DebugDetailln("UDP send failed:", err)
- }
- return err
+ return packet, nil
}
// readLoop runs in its own goroutine. it handles incoming UDP packets.
@@ -330,29 +374,34 @@ func (t *udp) readLoop() {
if err != nil {
return
}
- if err := t.packetIn(from, buf[:nbytes]); err != nil {
+ packet, fromID, hash, err := decodePacket(buf[:nbytes])
+ if err != nil {
log.Debugf("Bad packet from %v: %v\n", from, err)
+ continue
}
+ log.DebugDetailf("<<< %v %T %v\n", from, packet, packet)
+ go func() {
+ if err := packet.handle(t, from, fromID, hash); err != nil {
+ log.Debugf("error handling %T from %v: %v", packet, from, err)
+ }
+ }()
}
}
-func (t *udp) packetIn(from *net.UDPAddr, buf []byte) error {
+func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
if len(buf) < headSize+1 {
- return errPacketTooSmall
+ return nil, NodeID{}, nil, errPacketTooSmall
}
hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
shouldhash := crypto.Sha3(buf[macSize:])
if !bytes.Equal(hash, shouldhash) {
- return errBadHash
+ return nil, NodeID{}, nil, errBadHash
}
fromID, err := recoverNodeID(crypto.Sha3(buf[headSize:]), sig)
if err != nil {
- return err
- }
-
- var req interface {
- handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
+ return nil, NodeID{}, hash, err
}
+ var req packet
switch ptype := sigdata[0]; ptype {
case pingPacket:
req = new(ping)
@@ -363,13 +412,10 @@ func (t *udp) packetIn(from *net.UDPAddr, buf []byte) error {
case neighborsPacket:
req = new(neighbors)
default:
- return fmt.Errorf("unknown type: %d", ptype)
+ return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
}
- if err := rlp.Decode(bytes.NewReader(sigdata[1:]), req); err != nil {
- return err
- }
- log.DebugDetailf("<<< %v %T %v\n", from, req, req)
- return req.handle(t, from, fromID, hash)
+ err = rlp.Decode(bytes.NewReader(sigdata[1:]), req)
+ return req, fromID, hash, err
}
func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
@@ -379,18 +425,14 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
if req.Version != Version {
return errBadVersion
}
- t.mutex.Lock()
- // Note: we're ignoring the provided IP address right now
- n := t.bumpOrAdd(fromID, from)
- if req.Port != 0 {
- n.TCPPort = int(req.Port)
- }
- t.mutex.Unlock()
-
- t.send(n, pongPacket, pong{
+ t.send(from, pongPacket, pong{
ReplyTok: mac,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
+ if !t.handleReply(fromID, pingPacket, req) {
+ // Note: we're ignoring the provided IP address right now
+ t.bond(true, fromID, from, req.Port)
+ }
return nil
}
@@ -398,11 +440,9 @@ func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
if expired(req.Expiration) {
return errExpired
}
- t.mutex.Lock()
- t.bump(fromID)
- t.mutex.Unlock()
-
- t.replies <- reply{fromID, pongPacket, req}
+ if !t.handleReply(fromID, pongPacket, req) {
+ return errUnsolicitedReply
+ }
return nil
}
@@ -410,12 +450,21 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
if expired(req.Expiration) {
return errExpired
}
+ if t.db.get(fromID) == nil {
+ // No bond exists, we don't process the packet. This prevents
+ // an attack vector where the discovery protocol could be used
+ // to amplify traffic in a DDOS attack. A malicious actor
+ // would send a findnode request with the IP address and UDP
+ // port of the target as the source address. The recipient of
+ // the findnode packet would then send a neighbors packet
+ // (which is a much bigger packet than findnode) to the victim.
+ return errUnknownNode
+ }
t.mutex.Lock()
- e := t.bumpOrAdd(fromID, from)
closest := t.closest(req.Target, bucketSize).entries
t.mutex.Unlock()
- t.send(e, neighborsPacket, neighbors{
+ t.send(from, neighborsPacket, neighbors{
Nodes: closest,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
@@ -426,12 +475,9 @@ func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byt
if expired(req.Expiration) {
return errExpired
}
- t.mutex.Lock()
- t.bump(fromID)
- t.add(req.Nodes)
- t.mutex.Unlock()
-
- t.replies <- reply{fromID, neighborsPacket, req}
+ if !t.handleReply(fromID, neighborsPacket, req) {
+ return errUnsolicitedReply
+ }
return nil
}