aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--p2p/discover/node.go4
-rw-r--r--p2p/discover/table.go15
-rw-r--r--p2p/discover/udp.go12
-rw-r--r--p2p/server.go31
4 files changed, 51 insertions, 11 deletions
diff --git a/p2p/discover/node.go b/p2p/discover/node.go
index b6956e197..a14f29424 100644
--- a/p2p/discover/node.go
+++ b/p2p/discover/node.go
@@ -48,6 +48,10 @@ type Node struct {
// In those tests, the content of sha will not actually correspond
// with ID.
sha common.Hash
+
+ // whether this node is currently being pinged in order to replace
+ // it in a bucket
+ contested bool
}
func newNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node {
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index b077f010c..972bc1077 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -455,24 +455,31 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
func (tab *Table) add(new *Node) {
b := tab.buckets[logdist(tab.self.sha, new.sha)]
tab.mutex.Lock()
+ defer tab.mutex.Unlock()
if b.bump(new) {
- tab.mutex.Unlock()
return
}
var oldest *Node
if len(b.entries) == bucketSize {
oldest = b.entries[bucketSize-1]
+ if oldest.contested {
+ // The node is already being replaced, don't attempt
+ // to replace it.
+ return
+ }
+ oldest.contested = true
// Let go of the mutex so other goroutines can access
// the table while we ping the least recently active node.
tab.mutex.Unlock()
- if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
+ err := tab.ping(oldest.ID, oldest.addr())
+ tab.mutex.Lock()
+ oldest.contested = false
+ if err == nil {
// The node responded, don't replace it.
return
}
- tab.mutex.Lock()
}
added := b.replace(new, oldest)
- tab.mutex.Unlock()
if added && tab.nodeAddedHook != nil {
tab.nodeAddedHook(new)
}
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 008e63937..6aefb68f7 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -458,6 +458,10 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte,
return packet, nil
}
+type tempError interface {
+ Temporary() bool
+}
+
// readLoop runs in its own goroutine. it handles incoming UDP packets.
func (t *udp) readLoop() {
defer t.conn.Close()
@@ -467,7 +471,13 @@ func (t *udp) readLoop() {
buf := make([]byte, 1280)
for {
nbytes, from, err := t.conn.ReadFromUDP(buf)
- if err != nil {
+ if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
+ // Ignore temporary read errors.
+ glog.V(logger.Debug).Infof("Temporary read error: %v", err)
+ continue
+ } else if err != nil {
+ // Shut down the loop for permament errors.
+ glog.V(logger.Debug).Infof("Read error: %v", err)
return
}
t.handlePacket(from, buf[:nbytes])
diff --git a/p2p/server.go b/p2p/server.go
index 7351a2654..d8be85323 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -542,6 +542,10 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn)
}
}
+type tempError interface {
+ Temporary() bool
+}
+
// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
@@ -561,16 +565,31 @@ func (srv *Server) listenLoop() {
}
for {
+ // Wait for a handshake slot before accepting.
<-slots
- fd, err := srv.listener.Accept()
- if err != nil {
- return
+
+ var (
+ fd net.Conn
+ err error
+ )
+ for {
+ fd, err = srv.listener.Accept()
+ if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
+ glog.V(logger.Debug).Infof("Temporary read error: %v", err)
+ continue
+ } else if err != nil {
+ glog.V(logger.Debug).Infof("Read error: %v", err)
+ return
+ }
+ break
}
- mfd := newMeteredConn(fd, true)
+ fd = newMeteredConn(fd, true)
+ glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr())
- glog.V(logger.Debug).Infof("Accepted conn %v\n", mfd.RemoteAddr())
+ // Spawn the handler. It will give the slot back when the connection
+ // has been established.
go func() {
- srv.setupConn(mfd, inboundConn, nil)
+ srv.setupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}