aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go56
1 files changed, 48 insertions, 8 deletions
diff --git a/p2p/server.go b/p2p/server.go
index d7909d53a..d1d578401 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
@@ -130,10 +131,14 @@ type Config struct {
// If Dialer is set to a non-nil value, the given Dialer
// is used to dial outbound peer connections.
- Dialer *net.Dialer `toml:"-"`
+ Dialer NodeDialer `toml:"-"`
// If NoDial is true, the server will not dial any peers.
NoDial bool `toml:",omitempty"`
+
+ // If EnableMsgEvents is set then the server will emit PeerEvents
+ // whenever a message is sent to or received from a peer
+ EnableMsgEvents bool
}
// Server manages all peer connections.
@@ -166,6 +171,7 @@ type Server struct {
addpeer chan *conn
delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
+ peerFeed event.Feed
}
type peerOpFunc func(map[discover.NodeID]*Peer)
@@ -191,7 +197,7 @@ type conn struct {
fd net.Conn
transport
flags connFlag
- cont chan error // The run loop uses cont to signal errors to setupConn.
+ cont chan error // The run loop uses cont to signal errors to SetupConn.
id discover.NodeID // valid after the encryption handshake
caps []Cap // valid after the protocol handshake
name string // valid after the protocol handshake
@@ -291,6 +297,11 @@ func (srv *Server) RemovePeer(node *discover.Node) {
}
}
+// SubscribePeers subscribes the given channel to peer events
+func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
+ return srv.peerFeed.Subscribe(ch)
+}
+
// Self returns the local node's endpoint information.
func (srv *Server) Self() *discover.Node {
srv.lock.Lock()
@@ -358,7 +369,7 @@ func (srv *Server) Start() (err error) {
srv.newTransport = newRLPX
}
if srv.Dialer == nil {
- srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
+ srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
@@ -536,7 +547,11 @@ running:
c.flags |= trustedConn
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
- c.cont <- srv.encHandshakeChecks(peers, c)
+ select {
+ case c.cont <- srv.encHandshakeChecks(peers, c):
+ case <-srv.quit:
+ break running
+ }
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
@@ -544,6 +559,11 @@ running:
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
+ // If message events are enabled, pass the peerFeed
+ // to the peer
+ if srv.EnableMsgEvents {
+ p.events = &srv.peerFeed
+ }
name := truncateName(c.name)
log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
peers[c.id] = p
@@ -552,7 +572,11 @@ running:
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
- c.cont <- err
+ select {
+ case c.cont <- err:
+ case <-srv.quit:
+ break running
+ }
case pd := <-srv.delpeer:
// A peer disconnected.
d := common.PrettyDuration(mclock.Now() - pd.created)
@@ -665,16 +689,16 @@ func (srv *Server) listenLoop() {
// Spawn the handler. It will give the slot back when the connection
// has been established.
go func() {
- srv.setupConn(fd, inboundConn, nil)
+ srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}
}
-// setupConn runs the handshakes and attempts to add the connection
+// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
-func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
+func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
@@ -755,7 +779,23 @@ func (srv *Server) runPeer(p *Peer) {
if srv.newPeerHook != nil {
srv.newPeerHook(p)
}
+
+ // broadcast peer add
+ srv.peerFeed.Send(&PeerEvent{
+ Type: PeerEventTypeAdd,
+ Peer: p.ID(),
+ })
+
+ // run the protocol
remoteRequested, err := p.run()
+
+ // broadcast peer drop
+ srv.peerFeed.Send(&PeerEvent{
+ Type: PeerEventTypeDrop,
+ Peer: p.ID(),
+ Error: err.Error(),
+ })
+
// Note: run waits for existing peers to be sent on srv.delpeer
// before returning, so this send should not select on srv.quit.
srv.delpeer <- peerDrop{p, err, remoteRequested}