aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-04-14 06:38:47 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-04-14 06:38:47 +0800
commit2ea98d9b74ac2d66dce6eeb92c371c0237245d79 (patch)
tree6b3bc65d95754f7ec249aa6eaa61ac40db150176
parentf6f9a0d515513463d6f9656d225939385a276608 (diff)
parent0217652d1b7e8f0c1c3002837d9f1277de27ef46 (diff)
downloadgo-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar
go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.gz
go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.bz2
go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.lz
go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.xz
go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.tar.zst
go-tangerine-2ea98d9b74ac2d66dce6eeb92c371c0237245d79.zip
Merge pull request #704 from fjl/p2p-concurrency-fixups
p2p: more concurrency fixups
-rw-r--r--p2p/discover/node.go15
-rw-r--r--p2p/discover/table.go1
-rw-r--r--p2p/discover/udp.go5
-rw-r--r--p2p/handshake.go2
-rw-r--r--p2p/peer.go30
-rw-r--r--p2p/server.go2
6 files changed, 14 insertions, 41 deletions
diff --git a/p2p/discover/node.go b/p2p/discover/node.go
index 99cb549a5..6662a6cb7 100644
--- a/p2p/discover/node.go
+++ b/p2p/discover/node.go
@@ -14,8 +14,6 @@ import (
"strconv"
"strings"
"sync"
- "sync/atomic"
- "time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
@@ -31,9 +29,6 @@ type Node struct {
DiscPort int // UDP listening port for discovery protocol
TCPPort int // TCP listening port for RLPx
-
- // this must be set/read using atomic load and store.
- activeStamp int64
}
func newNode(id NodeID, addr *net.UDPAddr) *Node {
@@ -50,16 +45,6 @@ func (n *Node) isValid() bool {
return !n.IP.IsMulticast() && !n.IP.IsUnspecified() && n.TCPPort != 0 && n.DiscPort != 0
}
-func (n *Node) bumpActive() {
- stamp := time.Now().Unix()
- atomic.StoreInt64(&n.activeStamp, stamp)
-}
-
-func (n *Node) active() time.Time {
- stamp := atomic.LoadInt64(&n.activeStamp)
- return time.Unix(stamp, 0)
-}
-
func (n *Node) addr() *net.UDPAddr {
return &net.UDPAddr{IP: n.IP, Port: n.DiscPort}
}
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index dbf86c084..e2e846456 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -326,7 +326,6 @@ outer:
func (b *bucket) bump(n *Node) bool {
for i := range b.entries {
if b.entries[i].ID == n.ID {
- n.bumpActive()
// move it to the front
copy(b.entries[1:], b.entries[:i])
b.entries[0] = n
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index d37260e7d..61a0abed9 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -267,11 +267,12 @@ func (t *udp) loop() {
defer timeout.Stop()
rearmTimeout := func() {
- if len(pending) == 0 || nextDeadline == pending[0].deadline {
+ now := time.Now()
+ if len(pending) == 0 || now.Before(nextDeadline) {
return
}
nextDeadline = pending[0].deadline
- timeout.Reset(nextDeadline.Sub(time.Now()))
+ timeout.Reset(nextDeadline.Sub(now))
}
for {
diff --git a/p2p/handshake.go b/p2p/handshake.go
index 43361364f..79395f23f 100644
--- a/p2p/handshake.go
+++ b/p2p/handshake.go
@@ -115,7 +115,7 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake,
// returning the handshake read error. If the remote side
// disconnects us early with a valid reason, we should return it
// as the error so it can be tracked elsewhere.
- werr := make(chan error)
+ werr := make(chan error, 1)
go func() { werr <- Send(rw, handshakeMsg, our) }()
rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our)
if err != nil {
diff --git a/p2p/peer.go b/p2p/peer.go
index 7bc4f9cf6..1262ba64a 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io"
- "io/ioutil"
"net"
"sort"
"sync"
@@ -20,8 +19,7 @@ const (
baseProtocolLength = uint64(16)
baseProtocolMaxMsgSize = 10 * 1024 * 1024
- pingInterval = 15 * time.Second
- disconnectGracePeriod = 2 * time.Second
+ pingInterval = 15 * time.Second
)
const (
@@ -129,39 +127,27 @@ func (p *Peer) run() DiscReason {
case err := <-readErr:
if r, ok := err.(DiscReason); ok {
reason = r
- break
+ } else {
+ // Note: We rely on protocols to abort if there is a write
+ // error. It might be more robust to handle them here as well.
+ p.DebugDetailf("Read error: %v\n", err)
+ reason = DiscNetworkError
}
- // Note: We rely on protocols to abort if there is a write
- // error. It might be more robust to handle them here as well.
- p.DebugDetailf("Read error: %v\n", err)
- p.conn.Close()
- reason = DiscNetworkError
case err := <-p.protoErr:
reason = discReasonForError(err)
case reason = <-p.disc:
}
close(p.closed)
+ p.politeDisconnect(reason)
p.wg.Wait()
- if reason != DiscNetworkError {
- p.politeDisconnect(reason)
- }
p.Debugf("Disconnected: %v\n", reason)
return reason
}
func (p *Peer) politeDisconnect(reason DiscReason) {
- done := make(chan struct{})
- go func() {
+ if reason != DiscNetworkError {
SendItems(p.rw, discMsg, uint(reason))
- // Wait for the other side to close the connection.
- // Discard any data that they send until then.
- io.Copy(ioutil.Discard, p.conn)
- close(done)
- }()
- select {
- case <-done:
- case <-time.After(disconnectGracePeriod):
}
p.conn.Close()
}
diff --git a/p2p/server.go b/p2p/server.go
index 5cd3dc2ad..61e0d71e9 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -260,9 +260,11 @@ func (srv *Server) Stop() {
// No new peers can be added at this point because dialLoop and
// listenLoop are down. It is safe to call peerWG.Wait because
// peerWG.Add is not called outside of those loops.
+ srv.lock.Lock()
for _, peer := range srv.peers {
peer.Disconnect(DiscQuitting)
}
+ srv.lock.Unlock()
srv.peerWG.Wait()
}