diff options
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r-- | p2p/discover/udp.go | 78 |
1 files changed, 47 insertions, 31 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index d7ca9000d..008e63937 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -18,6 +18,7 @@ package discover import ( "bytes" + "container/list" "crypto/ecdsa" "errors" "fmt" @@ -43,6 +44,7 @@ var ( errUnsolicitedReply = errors.New("unsolicited reply") errUnknownNode = errors.New("unknown node") errTimeout = errors.New("RPC timeout") + errClockWarp = errors.New("reply deadline too far in the future") errClosed = errors.New("socket closed") ) @@ -296,7 +298,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <- } func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { - matched := make(chan bool) + matched := make(chan bool, 1) select { case t.gotreply <- reply{from, ptype, req, matched}: // loop will handle it @@ -310,68 +312,82 @@ func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { // the refresh timer and the pending reply queue. func (t *udp) loop() { var ( - pending []*pending - nextDeadline time.Time - timeout = time.NewTimer(0) - refresh = time.NewTicker(refreshInterval) + plist = list.New() + timeout = time.NewTimer(0) + nextTimeout *pending // head of plist when timeout was last reset + refresh = time.NewTicker(refreshInterval) ) <-timeout.C // ignore first timeout defer refresh.Stop() defer timeout.Stop() - rearmTimeout := func() { - now := time.Now() - if len(pending) == 0 || now.Before(nextDeadline) { + resetTimeout := func() { + if plist.Front() == nil || nextTimeout == plist.Front().Value { return } - nextDeadline = pending[0].deadline - timeout.Reset(nextDeadline.Sub(now)) + // Start the timer so it fires when the next pending reply has expired. + now := time.Now() + for el := plist.Front(); el != nil; el = el.Next() { + nextTimeout = el.Value.(*pending) + if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout { + timeout.Reset(dist) + return + } + // Remove pending replies whose deadline is too far in the + // future. These can occur if the system clock jumped + // backwards after the deadline was assigned. + nextTimeout.errc <- errClockWarp + plist.Remove(el) + } + nextTimeout = nil + timeout.Stop() } for { + resetTimeout() + select { case <-refresh.C: go t.refresh() case <-t.closing: - for _, p := range pending { - p.errc <- errClosed + for el := plist.Front(); el != nil; el = el.Next() { + el.Value.(*pending).errc <- errClosed } - pending = nil return case p := <-t.addpending: p.deadline = time.Now().Add(respTimeout) - pending = append(pending, p) - rearmTimeout() + plist.PushBack(p) case r := <-t.gotreply: var matched bool - for i := 0; i < len(pending); i++ { - if p := pending[i]; p.from == r.from && p.ptype == r.ptype { + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*pending) + if p.from == r.from && p.ptype == r.ptype { matched = true + // Remove the matcher if its callback indicates + // that all replies have been received. This is + // required for packet types that expect multiple + // reply packets. if p.callback(r.data) { - // callback indicates the request is done, remove it. p.errc <- nil - copy(pending[i:], pending[i+1:]) - pending = pending[:len(pending)-1] - i-- + plist.Remove(el) } } } r.matched <- matched case now := <-timeout.C: - // notify and remove callbacks whose deadline is in the past. - i := 0 - for ; i < len(pending) && now.After(pending[i].deadline); i++ { - pending[i].errc <- errTimeout - } - if i > 0 { - copy(pending, pending[i:]) - pending = pending[:len(pending)-i] + nextTimeout = nil + // Notify and remove callbacks whose deadline is in the past. + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*pending) + if now.After(p.deadline) || now.Equal(p.deadline) { + p.errc <- errTimeout + plist.Remove(el) + } } - rearmTimeout() } } } @@ -385,7 +401,7 @@ const ( var ( headSpace = make([]byte, headSize) - // Neighbors responses are sent across multiple packets to + // Neighbors replies are sent across multiple packets to // stay below the 1280 byte limit. We compute the maximum number // of entries by stuffing a packet until it grows too large. maxNeighbors int |