aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover/udp.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2015-08-08 08:49:28 +0800
committerFelix Lange <fjl@twurst.com>2015-08-11 17:42:17 +0800
commit590c99a98fd4e58fd34db7e31639b240461ee532 (patch)
tree0f2890729987d609dd3fa0ab64a9dd86aa05863a /p2p/discover/udp.go
parent01ed3fa1a9414328eb6c4fc839e1b2044a786a7a (diff)
downloadgo-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar
go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.gz
go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.bz2
go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.lz
go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.xz
go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.tar.zst
go-tangerine-590c99a98fd4e58fd34db7e31639b240461ee532.zip
p2p/discover: fix UDP reply packet timeout handling
If the timeout fired (even just nanoseconds) before the deadline of the next pending reply, the timer was not rescheduled. The timer would've been rescheduled anyway once the next packet was sent, but there were cases where no next packet could ever be sent due to the locking issue fixed in the previous commit. As timing-related bugs go, this issue had been present for a long time and I could never reproduce it. The test added in this commit did reproduce the issue on about one out of 15 runs.
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r--p2p/discover/udp.go78
1 files changed, 47 insertions, 31 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index d7ca9000d..008e63937 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -18,6 +18,7 @@ package discover
import (
"bytes"
+ "container/list"
"crypto/ecdsa"
"errors"
"fmt"
@@ -43,6 +44,7 @@ var (
errUnsolicitedReply = errors.New("unsolicited reply")
errUnknownNode = errors.New("unknown node")
errTimeout = errors.New("RPC timeout")
+ errClockWarp = errors.New("reply deadline too far in the future")
errClosed = errors.New("socket closed")
)
@@ -296,7 +298,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-
}
func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
- matched := make(chan bool)
+ matched := make(chan bool, 1)
select {
case t.gotreply <- reply{from, ptype, req, matched}:
// loop will handle it
@@ -310,68 +312,82 @@ func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
// the refresh timer and the pending reply queue.
func (t *udp) loop() {
var (
- pending []*pending
- nextDeadline time.Time
- timeout = time.NewTimer(0)
- refresh = time.NewTicker(refreshInterval)
+ plist = list.New()
+ timeout = time.NewTimer(0)
+ nextTimeout *pending // head of plist when timeout was last reset
+ refresh = time.NewTicker(refreshInterval)
)
<-timeout.C // ignore first timeout
defer refresh.Stop()
defer timeout.Stop()
- rearmTimeout := func() {
- now := time.Now()
- if len(pending) == 0 || now.Before(nextDeadline) {
+ resetTimeout := func() {
+ if plist.Front() == nil || nextTimeout == plist.Front().Value {
return
}
- nextDeadline = pending[0].deadline
- timeout.Reset(nextDeadline.Sub(now))
+ // Start the timer so it fires when the next pending reply has expired.
+ now := time.Now()
+ for el := plist.Front(); el != nil; el = el.Next() {
+ nextTimeout = el.Value.(*pending)
+ if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
+ timeout.Reset(dist)
+ return
+ }
+ // Remove pending replies whose deadline is too far in the
+ // future. These can occur if the system clock jumped
+ // backwards after the deadline was assigned.
+ nextTimeout.errc <- errClockWarp
+ plist.Remove(el)
+ }
+ nextTimeout = nil
+ timeout.Stop()
}
for {
+ resetTimeout()
+
select {
case <-refresh.C:
go t.refresh()
case <-t.closing:
- for _, p := range pending {
- p.errc <- errClosed
+ for el := plist.Front(); el != nil; el = el.Next() {
+ el.Value.(*pending).errc <- errClosed
}
- pending = nil
return
case p := <-t.addpending:
p.deadline = time.Now().Add(respTimeout)
- pending = append(pending, p)
- rearmTimeout()
+ plist.PushBack(p)
case r := <-t.gotreply:
var matched bool
- for i := 0; i < len(pending); i++ {
- if p := pending[i]; p.from == r.from && p.ptype == r.ptype {
+ for el := plist.Front(); el != nil; el = el.Next() {
+ p := el.Value.(*pending)
+ if p.from == r.from && p.ptype == r.ptype {
matched = true
+ // Remove the matcher if its callback indicates
+ // that all replies have been received. This is
+ // required for packet types that expect multiple
+ // reply packets.
if p.callback(r.data) {
- // callback indicates the request is done, remove it.
p.errc <- nil
- copy(pending[i:], pending[i+1:])
- pending = pending[:len(pending)-1]
- i--
+ plist.Remove(el)
}
}
}
r.matched <- matched
case now := <-timeout.C:
- // notify and remove callbacks whose deadline is in the past.
- i := 0
- for ; i < len(pending) && now.After(pending[i].deadline); i++ {
- pending[i].errc <- errTimeout
- }
- if i > 0 {
- copy(pending, pending[i:])
- pending = pending[:len(pending)-i]
+ nextTimeout = nil
+ // Notify and remove callbacks whose deadline is in the past.
+ for el := plist.Front(); el != nil; el = el.Next() {
+ p := el.Value.(*pending)
+ if now.After(p.deadline) || now.Equal(p.deadline) {
+ p.errc <- errTimeout
+ plist.Remove(el)
+ }
}
- rearmTimeout()
}
}
}
@@ -385,7 +401,7 @@ const (
var (
headSpace = make([]byte, headSize)
- // Neighbors responses are sent across multiple packets to
+ // Neighbors replies are sent across multiple packets to
// stay below the 1280 byte limit. We compute the maximum number
// of entries by stuffing a packet until it grows too large.
maxNeighbors int