diff options
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r-- | p2p/discover/udp.go | 91 |
1 files changed, 55 insertions, 36 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 95862c72b..008e63937 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -1,23 +1,24 @@ // Copyright 2015 The go-ethereum Authors -// This file is part of go-ethereum. +// This file is part of the go-ethereum library. // -// go-ethereum is free software: you can redistribute it and/or modify +// The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // -// go-ethereum is distributed in the hope that it will be useful, +// The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License -// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. package discover import ( "bytes" + "container/list" "crypto/ecdsa" "errors" "fmt" @@ -25,6 +26,7 @@ import ( "time" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/fdtrack" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p/nat" @@ -42,6 +44,7 @@ var ( errUnsolicitedReply = errors.New("unsolicited reply") errUnknownNode = errors.New("unknown node") errTimeout = errors.New("RPC timeout") + errClockWarp = errors.New("reply deadline too far in the future") errClosed = errors.New("socket closed") ) @@ -197,6 +200,7 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBP if err != nil { return nil, err } + fdtrack.Open("p2p") conn, err := net.ListenUDP("udp", addr) if err != nil { return nil, err @@ -234,6 +238,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath strin func (t *udp) close() { close(t.closing) + fdtrack.Close("p2p") t.conn.Close() // TODO: wait for the loops to end. } @@ -293,7 +298,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <- } func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { - matched := make(chan bool) + matched := make(chan bool, 1) select { case t.gotreply <- reply{from, ptype, req, matched}: // loop will handle it @@ -307,68 +312,82 @@ func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { // the refresh timer and the pending reply queue. func (t *udp) loop() { var ( - pending []*pending - nextDeadline time.Time - timeout = time.NewTimer(0) - refresh = time.NewTicker(refreshInterval) + plist = list.New() + timeout = time.NewTimer(0) + nextTimeout *pending // head of plist when timeout was last reset + refresh = time.NewTicker(refreshInterval) ) <-timeout.C // ignore first timeout defer refresh.Stop() defer timeout.Stop() - rearmTimeout := func() { - now := time.Now() - if len(pending) == 0 || now.Before(nextDeadline) { + resetTimeout := func() { + if plist.Front() == nil || nextTimeout == plist.Front().Value { return } - nextDeadline = pending[0].deadline - timeout.Reset(nextDeadline.Sub(now)) + // Start the timer so it fires when the next pending reply has expired. + now := time.Now() + for el := plist.Front(); el != nil; el = el.Next() { + nextTimeout = el.Value.(*pending) + if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout { + timeout.Reset(dist) + return + } + // Remove pending replies whose deadline is too far in the + // future. These can occur if the system clock jumped + // backwards after the deadline was assigned. + nextTimeout.errc <- errClockWarp + plist.Remove(el) + } + nextTimeout = nil + timeout.Stop() } for { + resetTimeout() + select { case <-refresh.C: go t.refresh() case <-t.closing: - for _, p := range pending { - p.errc <- errClosed + for el := plist.Front(); el != nil; el = el.Next() { + el.Value.(*pending).errc <- errClosed } - pending = nil return case p := <-t.addpending: p.deadline = time.Now().Add(respTimeout) - pending = append(pending, p) - rearmTimeout() + plist.PushBack(p) case r := <-t.gotreply: var matched bool - for i := 0; i < len(pending); i++ { - if p := pending[i]; p.from == r.from && p.ptype == r.ptype { + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*pending) + if p.from == r.from && p.ptype == r.ptype { matched = true + // Remove the matcher if its callback indicates + // that all replies have been received. This is + // required for packet types that expect multiple + // reply packets. if p.callback(r.data) { - // callback indicates the request is done, remove it. p.errc <- nil - copy(pending[i:], pending[i+1:]) - pending = pending[:len(pending)-1] - i-- + plist.Remove(el) } } } r.matched <- matched case now := <-timeout.C: - // notify and remove callbacks whose deadline is in the past. - i := 0 - for ; i < len(pending) && now.After(pending[i].deadline); i++ { - pending[i].errc <- errTimeout - } - if i > 0 { - copy(pending, pending[i:]) - pending = pending[:len(pending)-i] + nextTimeout = nil + // Notify and remove callbacks whose deadline is in the past. + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*pending) + if now.After(p.deadline) || now.Equal(p.deadline) { + p.errc <- errTimeout + plist.Remove(el) + } } - rearmTimeout() } } } @@ -382,7 +401,7 @@ const ( var ( headSpace = make([]byte, headSize) - // Neighbors responses are sent across multiple packets to + // Neighbors replies are sent across multiple packets to // stay below the 1280 byte limit. We compute the maximum number // of entries by stuffing a packet until it grows too large. maxNeighbors int |