diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-14 06:38:47 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-14 06:38:47 +0800 |
commit | 2ea98d9b74ac2d66dce6eeb92c371c0237245d79 (patch) | |
tree | 6b3bc65d95754f7ec249aa6eaa61ac40db150176 | |
parent | f6f9a0d515513463d6f9656d225939385a276608 (diff) | |
parent | 0217652d1b7e8f0c1c3002837d9f1277de27ef46 (diff) | |
download | go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.gz go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.bz2 go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.lz go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.xz go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.zst go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.zip |
Merge pull request #704 from fjl/p2p-concurrency-fixups
p2p: more concurrency fixups
-rw-r--r-- | p2p/discover/node.go | 15 | ||||
-rw-r--r-- | p2p/discover/table.go | 1 | ||||
-rw-r--r-- | p2p/discover/udp.go | 5 | ||||
-rw-r--r-- | p2p/handshake.go | 2 | ||||
-rw-r--r-- | p2p/peer.go | 30 | ||||
-rw-r--r-- | p2p/server.go | 2 |
6 files changed, 14 insertions, 41 deletions
diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 99cb549a5..6662a6cb7 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -14,8 +14,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" - "time" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" @@ -31,9 +29,6 @@ type Node struct { DiscPort int // UDP listening port for discovery protocol TCPPort int // TCP listening port for RLPx - - // this must be set/read using atomic load and store. - activeStamp int64 } func newNode(id NodeID, addr *net.UDPAddr) *Node { @@ -50,16 +45,6 @@ func (n *Node) isValid() bool { return !n.IP.IsMulticast() && !n.IP.IsUnspecified() && n.TCPPort != 0 && n.DiscPort != 0 } -func (n *Node) bumpActive() { - stamp := time.Now().Unix() - atomic.StoreInt64(&n.activeStamp, stamp) -} - -func (n *Node) active() time.Time { - stamp := atomic.LoadInt64(&n.activeStamp) - return time.Unix(stamp, 0) -} - func (n *Node) addr() *net.UDPAddr { return &net.UDPAddr{IP: n.IP, Port: n.DiscPort} } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index dbf86c084..e2e846456 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -326,7 +326,6 @@ outer: func (b *bucket) bump(n *Node) bool { for i := range b.entries { if b.entries[i].ID == n.ID { - n.bumpActive() // move it to the front copy(b.entries[1:], b.entries[:i]) b.entries[0] = n diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index d37260e7d..61a0abed9 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -267,11 +267,12 @@ func (t *udp) loop() { defer timeout.Stop() rearmTimeout := func() { - if len(pending) == 0 || nextDeadline == pending[0].deadline { + now := time.Now() + if len(pending) == 0 || now.Before(nextDeadline) { return } nextDeadline = pending[0].deadline - timeout.Reset(nextDeadline.Sub(time.Now())) + timeout.Reset(nextDeadline.Sub(now)) } for { diff --git a/p2p/handshake.go b/p2p/handshake.go index 43361364f..79395f23f 100644 --- a/p2p/handshake.go +++ b/p2p/handshake.go @@ -115,7 +115,7 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, // returning the handshake read error. If the remote side // disconnects us early with a valid reason, we should return it // as the error so it can be tracked elsewhere. - werr := make(chan error) + werr := make(chan error, 1) go func() { werr <- Send(rw, handshakeMsg, our) }() rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our) if err != nil { diff --git a/p2p/peer.go b/p2p/peer.go index 7bc4f9cf6..1262ba64a 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net" "sort" "sync" @@ -20,8 +19,7 @@ const ( baseProtocolLength = uint64(16) baseProtocolMaxMsgSize = 10 * 1024 * 1024 - pingInterval = 15 * time.Second - disconnectGracePeriod = 2 * time.Second + pingInterval = 15 * time.Second ) const ( @@ -129,39 +127,27 @@ func (p *Peer) run() DiscReason { case err := <-readErr: if r, ok := err.(DiscReason); ok { reason = r - break + } else { + // Note: We rely on protocols to abort if there is a write + // error. It might be more robust to handle them here as well. + p.DebugDetailf("Read error: %v\n", err) + reason = DiscNetworkError } - // Note: We rely on protocols to abort if there is a write - // error. It might be more robust to handle them here as well. - p.DebugDetailf("Read error: %v\n", err) - p.conn.Close() - reason = DiscNetworkError case err := <-p.protoErr: reason = discReasonForError(err) case reason = <-p.disc: } close(p.closed) + p.politeDisconnect(reason) p.wg.Wait() - if reason != DiscNetworkError { - p.politeDisconnect(reason) - } p.Debugf("Disconnected: %v\n", reason) return reason } func (p *Peer) politeDisconnect(reason DiscReason) { - done := make(chan struct{}) - go func() { + if reason != DiscNetworkError { SendItems(p.rw, discMsg, uint(reason)) - // Wait for the other side to close the connection. - // Discard any data that they send until then. - io.Copy(ioutil.Discard, p.conn) - close(done) - }() - select { - case <-done: - case <-time.After(disconnectGracePeriod): } p.conn.Close() } diff --git a/p2p/server.go b/p2p/server.go index 5cd3dc2ad..61e0d71e9 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -260,9 +260,11 @@ func (srv *Server) Stop() { // No new peers can be added at this point because dialLoop and // listenLoop are down. It is safe to call peerWG.Wait because // peerWG.Add is not called outside of those loops. + srv.lock.Lock() for _, peer := range srv.peers { peer.Disconnect(DiscQuitting) } + srv.lock.Unlock() srv.peerWG.Wait() } |