aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--p2p/discover/node.go4
-rw-r--r--p2p/discover/table.go15
-rw-r--r--p2p/discover/udp.go12
-rw-r--r--p2p/server.go31
-rw-r--r--rpc/comms/ipc.go8
5 files changed, 56 insertions, 14 deletions
diff --git a/p2p/discover/node.go b/p2p/discover/node.go
index b6956e197..a14f29424 100644
--- a/p2p/discover/node.go
+++ b/p2p/discover/node.go
@@ -48,6 +48,10 @@ type Node struct {
// In those tests, the content of sha will not actually correspond
// with ID.
sha common.Hash
+
+ // whether this node is currently being pinged in order to replace
+ // it in a bucket
+ contested bool
}
func newNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node {
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index b077f010c..972bc1077 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -455,24 +455,31 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
func (tab *Table) add(new *Node) {
b := tab.buckets[logdist(tab.self.sha, new.sha)]
tab.mutex.Lock()
+ defer tab.mutex.Unlock()
if b.bump(new) {
- tab.mutex.Unlock()
return
}
var oldest *Node
if len(b.entries) == bucketSize {
oldest = b.entries[bucketSize-1]
+ if oldest.contested {
+ // The node is already being replaced, don't attempt
+ // to replace it.
+ return
+ }
+ oldest.contested = true
// Let go of the mutex so other goroutines can access
// the table while we ping the least recently active node.
tab.mutex.Unlock()
- if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
+ err := tab.ping(oldest.ID, oldest.addr())
+ tab.mutex.Lock()
+ oldest.contested = false
+ if err == nil {
// The node responded, don't replace it.
return
}
- tab.mutex.Lock()
}
added := b.replace(new, oldest)
- tab.mutex.Unlock()
if added && tab.nodeAddedHook != nil {
tab.nodeAddedHook(new)
}
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index e98e8d0ba..afb31ee69 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -455,6 +455,10 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte,
return packet, nil
}
+type tempError interface {
+ Temporary() bool
+}
+
// readLoop runs in its own goroutine. it handles incoming UDP packets.
func (t *udp) readLoop() {
defer t.conn.Close()
@@ -464,7 +468,13 @@ func (t *udp) readLoop() {
buf := make([]byte, 1280)
for {
nbytes, from, err := t.conn.ReadFromUDP(buf)
- if err != nil {
+ if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
+ // Ignore temporary read errors.
+ glog.V(logger.Debug).Infof("Temporary read error: %v", err)
+ continue
+ } else if err != nil {
+ // Shut down the loop for permament errors.
+ glog.V(logger.Debug).Infof("Read error: %v", err)
return
}
t.handlePacket(from, buf[:nbytes])
diff --git a/p2p/server.go b/p2p/server.go
index ba83c5503..6060adc71 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -541,6 +541,10 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn)
}
}
+type tempError interface {
+ Temporary() bool
+}
+
// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
@@ -560,16 +564,31 @@ func (srv *Server) listenLoop() {
}
for {
+ // Wait for a handshake slot before accepting.
<-slots
- fd, err := srv.listener.Accept()
- if err != nil {
- return
+
+ var (
+ fd net.Conn
+ err error
+ )
+ for {
+ fd, err = srv.listener.Accept()
+ if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
+ glog.V(logger.Debug).Infof("Temporary read error: %v", err)
+ continue
+ } else if err != nil {
+ glog.V(logger.Debug).Infof("Read error: %v", err)
+ return
+ }
+ break
}
- mfd := newMeteredConn(fd, true)
+ fd = newMeteredConn(fd, true)
+ glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr())
- glog.V(logger.Debug).Infof("Accepted conn %v\n", mfd.RemoteAddr())
+ // Spawn the handler. It will give the slot back when the connection
+ // has been established.
go func() {
- srv.setupConn(mfd, inboundConn, nil)
+ srv.setupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}
diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go
index e982ada13..d897bf313 100644
--- a/rpc/comms/ipc.go
+++ b/rpc/comms/ipc.go
@@ -44,12 +44,14 @@ func (self *ipcClient) Close() {
func (self *ipcClient) Send(req interface{}) error {
var err error
- if err = self.coder.WriteResponse(req); err != nil {
- if _, ok := err.(*net.OpError); ok { // connection lost, retry once
+ if r, ok := req.(*shared.Request); ok {
+ if err = self.coder.WriteResponse(r); err != nil {
if err = self.reconnect(); err == nil {
- err = self.coder.WriteResponse(req)
+ err = self.coder.WriteResponse(r)
}
}
+
+ return err
}
return err
}