aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/discover')
-rw-r--r--p2p/discover/node.go4
-rw-r--r--p2p/discover/table.go15
-rw-r--r--p2p/discover/udp.go12
3 files changed, 26 insertions, 5 deletions
diff --git a/p2p/discover/node.go b/p2p/discover/node.go
index b6956e197..a14f29424 100644
--- a/p2p/discover/node.go
+++ b/p2p/discover/node.go
@@ -48,6 +48,10 @@ type Node struct {
// In those tests, the content of sha will not actually correspond
// with ID.
sha common.Hash
+
+ // whether this node is currently being pinged in order to replace
+ // it in a bucket
+ contested bool
}
func newNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node {
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index b077f010c..972bc1077 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -455,24 +455,31 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
func (tab *Table) add(new *Node) {
b := tab.buckets[logdist(tab.self.sha, new.sha)]
tab.mutex.Lock()
+ defer tab.mutex.Unlock()
if b.bump(new) {
- tab.mutex.Unlock()
return
}
var oldest *Node
if len(b.entries) == bucketSize {
oldest = b.entries[bucketSize-1]
+ if oldest.contested {
+ // The node is already being replaced, don't attempt
+ // to replace it.
+ return
+ }
+ oldest.contested = true
// Let go of the mutex so other goroutines can access
// the table while we ping the least recently active node.
tab.mutex.Unlock()
- if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
+ err := tab.ping(oldest.ID, oldest.addr())
+ tab.mutex.Lock()
+ oldest.contested = false
+ if err == nil {
// The node responded, don't replace it.
return
}
- tab.mutex.Lock()
}
added := b.replace(new, oldest)
- tab.mutex.Unlock()
if added && tab.nodeAddedHook != nil {
tab.nodeAddedHook(new)
}
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 008e63937..6aefb68f7 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -458,6 +458,10 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte,
return packet, nil
}
+type tempError interface {
+ Temporary() bool
+}
+
// readLoop runs in its own goroutine. it handles incoming UDP packets.
func (t *udp) readLoop() {
defer t.conn.Close()
@@ -467,7 +471,13 @@ func (t *udp) readLoop() {
buf := make([]byte, 1280)
for {
nbytes, from, err := t.conn.ReadFromUDP(buf)
- if err != nil {
+ if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
+ // Ignore temporary read errors.
+ glog.V(logger.Debug).Infof("Temporary read error: %v", err)
+ continue
+ } else if err != nil {
+ // Shut down the loop for permament errors.
+ glog.V(logger.Debug).Infof("Read error: %v", err)
return
}
t.handlePacket(from, buf[:nbytes])