diff options
-rw-r--r-- | p2p/discover/node.go | 4 | ||||
-rw-r--r-- | p2p/discover/table.go | 15 | ||||
-rw-r--r-- | p2p/discover/udp.go | 12 | ||||
-rw-r--r-- | p2p/server.go | 31 | ||||
-rw-r--r-- | rpc/comms/ipc.go | 8 |
5 files changed, 56 insertions, 14 deletions
diff --git a/p2p/discover/node.go b/p2p/discover/node.go index b6956e197..a14f29424 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -48,6 +48,10 @@ type Node struct { // In those tests, the content of sha will not actually correspond // with ID. sha common.Hash + + // whether this node is currently being pinged in order to replace + // it in a bucket + contested bool } func newNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node { diff --git a/p2p/discover/table.go b/p2p/discover/table.go index b077f010c..972bc1077 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -455,24 +455,31 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { func (tab *Table) add(new *Node) { b := tab.buckets[logdist(tab.self.sha, new.sha)] tab.mutex.Lock() + defer tab.mutex.Unlock() if b.bump(new) { - tab.mutex.Unlock() return } var oldest *Node if len(b.entries) == bucketSize { oldest = b.entries[bucketSize-1] + if oldest.contested { + // The node is already being replaced, don't attempt + // to replace it. + return + } + oldest.contested = true // Let go of the mutex so other goroutines can access // the table while we ping the least recently active node. tab.mutex.Unlock() - if err := tab.ping(oldest.ID, oldest.addr()); err == nil { + err := tab.ping(oldest.ID, oldest.addr()) + tab.mutex.Lock() + oldest.contested = false + if err == nil { // The node responded, don't replace it. return } - tab.mutex.Lock() } added := b.replace(new, oldest) - tab.mutex.Unlock() if added && tab.nodeAddedHook != nil { tab.nodeAddedHook(new) } diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index e98e8d0ba..afb31ee69 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -455,6 +455,10 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, return packet, nil } +type tempError interface { + Temporary() bool +} + // readLoop runs in its own goroutine. it handles incoming UDP packets. func (t *udp) readLoop() { defer t.conn.Close() @@ -464,7 +468,13 @@ func (t *udp) readLoop() { buf := make([]byte, 1280) for { nbytes, from, err := t.conn.ReadFromUDP(buf) - if err != nil { + if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { + // Ignore temporary read errors. + glog.V(logger.Debug).Infof("Temporary read error: %v", err) + continue + } else if err != nil { + // Shut down the loop for permament errors. + glog.V(logger.Debug).Infof("Read error: %v", err) return } t.handlePacket(from, buf[:nbytes]) diff --git a/p2p/server.go b/p2p/server.go index ba83c5503..6060adc71 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -541,6 +541,10 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) } } +type tempError interface { + Temporary() bool +} + // listenLoop runs in its own goroutine and accepts // inbound connections. func (srv *Server) listenLoop() { @@ -560,16 +564,31 @@ func (srv *Server) listenLoop() { } for { + // Wait for a handshake slot before accepting. <-slots - fd, err := srv.listener.Accept() - if err != nil { - return + + var ( + fd net.Conn + err error + ) + for { + fd, err = srv.listener.Accept() + if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { + glog.V(logger.Debug).Infof("Temporary read error: %v", err) + continue + } else if err != nil { + glog.V(logger.Debug).Infof("Read error: %v", err) + return + } + break } - mfd := newMeteredConn(fd, true) + fd = newMeteredConn(fd, true) + glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr()) - glog.V(logger.Debug).Infof("Accepted conn %v\n", mfd.RemoteAddr()) + // Spawn the handler. It will give the slot back when the connection + // has been established. go func() { - srv.setupConn(mfd, inboundConn, nil) + srv.setupConn(fd, inboundConn, nil) slots <- struct{}{} }() } diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index e982ada13..d897bf313 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -44,12 +44,14 @@ func (self *ipcClient) Close() { func (self *ipcClient) Send(req interface{}) error { var err error - if err = self.coder.WriteResponse(req); err != nil { - if _, ok := err.(*net.OpError); ok { // connection lost, retry once + if r, ok := req.(*shared.Request); ok { + if err = self.coder.WriteResponse(r); err != nil { if err = self.reconnect(); err == nil { - err = self.coder.WriteResponse(req) + err = self.coder.WriteResponse(r) } } + + return err } return err } |