diff options
author | Felix Lange <fjl@twurst.com> | 2015-08-08 08:49:28 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2015-08-11 17:42:17 +0800 |
commit | 590c99a98fd4e58fd34db7e31639b240461ee532 (patch) | |
tree | 0f2890729987d609dd3fa0ab64a9dd86aa05863a /p2p/discover/udp.go | |
parent | 01ed3fa1a9414328eb6c4fc839e1b2044a786a7a (diff) | |
download | go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.gz go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.bz2 go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.lz go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.xz go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.zst go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.zip |
p2p/discover: fix UDP reply packet timeout handling
If the timeout fired (even just nanoseconds) before the deadline of the
next pending reply, the timer was not rescheduled. The timer would've
been rescheduled anyway once the next packet was sent, but there were
cases where no next packet could ever be sent due to the locking issue
fixed in the previous commit.
As timing-related bugs go, this issue had been present for a long time
and I could never reproduce it. The test added in this commit did
reproduce the issue on about one out of 15 runs.
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r-- | p2p/discover/udp.go | 78 |
1 files changed, 47 insertions, 31 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index d7ca9000d..008e63937 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -18,6 +18,7 @@ package discover import ( "bytes" + "container/list" "crypto/ecdsa" "errors" "fmt" @@ -43,6 +44,7 @@ var ( errUnsolicitedReply = errors.New("unsolicited reply") errUnknownNode = errors.New("unknown node") errTimeout = errors.New("RPC timeout") + errClockWarp = errors.New("reply deadline too far in the future") errClosed = errors.New("socket closed") ) @@ -296,7 +298,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <- } func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { - matched := make(chan bool) + matched := make(chan bool, 1) select { case t.gotreply <- reply{from, ptype, req, matched}: // loop will handle it @@ -310,68 +312,82 @@ func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { // the refresh timer and the pending reply queue. func (t *udp) loop() { var ( - pending []*pending - nextDeadline time.Time - timeout = time.NewTimer(0) - refresh = time.NewTicker(refreshInterval) + plist = list.New() + timeout = time.NewTimer(0) + nextTimeout *pending // head of plist when timeout was last reset + refresh = time.NewTicker(refreshInterval) ) <-timeout.C // ignore first timeout defer refresh.Stop() defer timeout.Stop() - rearmTimeout := func() { - now := time.Now() - if len(pending) == 0 || now.Before(nextDeadline) { + resetTimeout := func() { + if plist.Front() == nil || nextTimeout == plist.Front().Value { return } - nextDeadline = pending[0].deadline - timeout.Reset(nextDeadline.Sub(now)) + // Start the timer so it fires when the next pending reply has expired. + now := time.Now() + for el := plist.Front(); el != nil; el = el.Next() { + nextTimeout = el.Value.(*pending) + if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout { + timeout.Reset(dist) + return + } + // Remove pending replies whose deadline is too far in the + // future. These can occur if the system clock jumped + // backwards after the deadline was assigned. + nextTimeout.errc <- errClockWarp + plist.Remove(el) + } + nextTimeout = nil + timeout.Stop() } for { + resetTimeout() + select { case <-refresh.C: go t.refresh() case <-t.closing: - for _, p := range pending { - p.errc <- errClosed + for el := plist.Front(); el != nil; el = el.Next() { + el.Value.(*pending).errc <- errClosed } - pending = nil return case p := <-t.addpending: p.deadline = time.Now().Add(respTimeout) - pending = append(pending, p) - rearmTimeout() + plist.PushBack(p) case r := <-t.gotreply: var matched bool - for i := 0; i < len(pending); i++ { - if p := pending[i]; p.from == r.from && p.ptype == r.ptype { + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*pending) + if p.from == r.from && p.ptype == r.ptype { matched = true + // Remove the matcher if its callback indicates + // that all replies have been received. This is + // required for packet types that expect multiple + // reply packets. if p.callback(r.data) { - // callback indicates the request is done, remove it. p.errc <- nil - copy(pending[i:], pending[i+1:]) - pending = pending[:len(pending)-1] - i-- + plist.Remove(el) } } } r.matched <- matched case now := <-timeout.C: - // notify and remove callbacks whose deadline is in the past. - i := 0 - for ; i < len(pending) && now.After(pending[i].deadline); i++ { - pending[i].errc <- errTimeout - } - if i > 0 { - copy(pending, pending[i:]) - pending = pending[:len(pending)-i] + nextTimeout = nil + // Notify and remove callbacks whose deadline is in the past. + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*pending) + if now.After(p.deadline) || now.Equal(p.deadline) { + p.errc <- errTimeout + plist.Remove(el) + } } - rearmTimeout() } } } @@ -385,7 +401,7 @@ const ( var ( headSpace = make([]byte, headSize) - // Neighbors responses are sent across multiple packets to + // Neighbors replies are sent across multiple packets to // stay below the 1280 byte limit. We compute the maximum number // of entries by stuffing a packet until it grows too large. maxNeighbors int |