diff options
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 56 |
1 files changed, 48 insertions, 8 deletions
diff --git a/p2p/server.go b/p2p/server.go index d7909d53a..d1d578401 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" @@ -130,10 +131,14 @@ type Config struct { // If Dialer is set to a non-nil value, the given Dialer // is used to dial outbound peer connections. - Dialer *net.Dialer `toml:"-"` + Dialer NodeDialer `toml:"-"` // If NoDial is true, the server will not dial any peers. NoDial bool `toml:",omitempty"` + + // If EnableMsgEvents is set then the server will emit PeerEvents + // whenever a message is sent to or received from a peer + EnableMsgEvents bool } // Server manages all peer connections. @@ -166,6 +171,7 @@ type Server struct { addpeer chan *conn delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop + peerFeed event.Feed } type peerOpFunc func(map[discover.NodeID]*Peer) @@ -191,7 +197,7 @@ type conn struct { fd net.Conn transport flags connFlag - cont chan error // The run loop uses cont to signal errors to setupConn. + cont chan error // The run loop uses cont to signal errors to SetupConn. id discover.NodeID // valid after the encryption handshake caps []Cap // valid after the protocol handshake name string // valid after the protocol handshake @@ -291,6 +297,11 @@ func (srv *Server) RemovePeer(node *discover.Node) { } } +// SubscribePeers subscribes the given channel to peer events +func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { + return srv.peerFeed.Subscribe(ch) +} + // Self returns the local node's endpoint information. func (srv *Server) Self() *discover.Node { srv.lock.Lock() @@ -358,7 +369,7 @@ func (srv *Server) Start() (err error) { srv.newTransport = newRLPX } if srv.Dialer == nil { - srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} + srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}} } srv.quit = make(chan struct{}) srv.addpeer = make(chan *conn) @@ -536,7 +547,11 @@ running: c.flags |= trustedConn } // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. - c.cont <- srv.encHandshakeChecks(peers, c) + select { + case c.cont <- srv.encHandshakeChecks(peers, c): + case <-srv.quit: + break running + } case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. @@ -544,6 +559,11 @@ running: if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) + // If message events are enabled, pass the peerFeed + // to the peer + if srv.EnableMsgEvents { + p.events = &srv.peerFeed + } name := truncateName(c.name) log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) peers[c.id] = p @@ -552,7 +572,11 @@ running: // The dialer logic relies on the assumption that // dial tasks complete after the peer has been added or // discarded. Unblock the task last. - c.cont <- err + select { + case c.cont <- err: + case <-srv.quit: + break running + } case pd := <-srv.delpeer: // A peer disconnected. d := common.PrettyDuration(mclock.Now() - pd.created) @@ -665,16 +689,16 @@ func (srv *Server) listenLoop() { // Spawn the handler. It will give the slot back when the connection // has been established. go func() { - srv.setupConn(fd, inboundConn, nil) + srv.SetupConn(fd, inboundConn, nil) slots <- struct{}{} }() } } -// setupConn runs the handshakes and attempts to add the connection +// SetupConn runs the handshakes and attempts to add the connection // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. -func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { +func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running @@ -755,7 +779,23 @@ func (srv *Server) runPeer(p *Peer) { if srv.newPeerHook != nil { srv.newPeerHook(p) } + + // broadcast peer add + srv.peerFeed.Send(&PeerEvent{ + Type: PeerEventTypeAdd, + Peer: p.ID(), + }) + + // run the protocol remoteRequested, err := p.run() + + // broadcast peer drop + srv.peerFeed.Send(&PeerEvent{ + Type: PeerEventTypeDrop, + Peer: p.ID(), + Error: err.Error(), + }) + // Note: run waits for existing peers to be sent on srv.delpeer // before returning, so this send should not select on srv.quit. srv.delpeer <- peerDrop{p, err, remoteRequested} |