diff options
author | Lewis Marshall <lewis@lmars.net> | 2017-09-25 16:08:07 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-09-25 16:08:07 +0800 |
commit | 9feec51e2dd754819e5c730ac5985d28d57adb48 (patch) | |
tree | 32b07b659cf7d0b4c1a7da67b5c49daf7a10a9d3 /p2p/server.go | |
parent | 673007d7aed1d2678ea3277eceb7b55dc29cf092 (diff) | |
download | dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.gz dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.bz2 dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.lz dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.xz dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.zst dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.zip |
p2p: add network simulation framework (#14982)
This commit introduces a network simulation framework which
can be used to run simulated networks of devp2p nodes. The
intention is to use this for testing protocols, performing
benchmarks and visualising emergent network behaviour.
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 56 |
1 files changed, 48 insertions, 8 deletions
diff --git a/p2p/server.go b/p2p/server.go index d7909d53a..d1d578401 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" @@ -130,10 +131,14 @@ type Config struct { // If Dialer is set to a non-nil value, the given Dialer // is used to dial outbound peer connections. - Dialer *net.Dialer `toml:"-"` + Dialer NodeDialer `toml:"-"` // If NoDial is true, the server will not dial any peers. NoDial bool `toml:",omitempty"` + + // If EnableMsgEvents is set then the server will emit PeerEvents + // whenever a message is sent to or received from a peer + EnableMsgEvents bool } // Server manages all peer connections. @@ -166,6 +171,7 @@ type Server struct { addpeer chan *conn delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop + peerFeed event.Feed } type peerOpFunc func(map[discover.NodeID]*Peer) @@ -191,7 +197,7 @@ type conn struct { fd net.Conn transport flags connFlag - cont chan error // The run loop uses cont to signal errors to setupConn. + cont chan error // The run loop uses cont to signal errors to SetupConn. id discover.NodeID // valid after the encryption handshake caps []Cap // valid after the protocol handshake name string // valid after the protocol handshake @@ -291,6 +297,11 @@ func (srv *Server) RemovePeer(node *discover.Node) { } } +// SubscribePeers subscribes the given channel to peer events +func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { + return srv.peerFeed.Subscribe(ch) +} + // Self returns the local node's endpoint information. func (srv *Server) Self() *discover.Node { srv.lock.Lock() @@ -358,7 +369,7 @@ func (srv *Server) Start() (err error) { srv.newTransport = newRLPX } if srv.Dialer == nil { - srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} + srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}} } srv.quit = make(chan struct{}) srv.addpeer = make(chan *conn) @@ -536,7 +547,11 @@ running: c.flags |= trustedConn } // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. - c.cont <- srv.encHandshakeChecks(peers, c) + select { + case c.cont <- srv.encHandshakeChecks(peers, c): + case <-srv.quit: + break running + } case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. @@ -544,6 +559,11 @@ running: if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) + // If message events are enabled, pass the peerFeed + // to the peer + if srv.EnableMsgEvents { + p.events = &srv.peerFeed + } name := truncateName(c.name) log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) peers[c.id] = p @@ -552,7 +572,11 @@ running: // The dialer logic relies on the assumption that // dial tasks complete after the peer has been added or // discarded. Unblock the task last. - c.cont <- err + select { + case c.cont <- err: + case <-srv.quit: + break running + } case pd := <-srv.delpeer: // A peer disconnected. d := common.PrettyDuration(mclock.Now() - pd.created) @@ -665,16 +689,16 @@ func (srv *Server) listenLoop() { // Spawn the handler. It will give the slot back when the connection // has been established. go func() { - srv.setupConn(fd, inboundConn, nil) + srv.SetupConn(fd, inboundConn, nil) slots <- struct{}{} }() } } -// setupConn runs the handshakes and attempts to add the connection +// SetupConn runs the handshakes and attempts to add the connection // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. -func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { +func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running @@ -755,7 +779,23 @@ func (srv *Server) runPeer(p *Peer) { if srv.newPeerHook != nil { srv.newPeerHook(p) } + + // broadcast peer add + srv.peerFeed.Send(&PeerEvent{ + Type: PeerEventTypeAdd, + Peer: p.ID(), + }) + + // run the protocol remoteRequested, err := p.run() + + // broadcast peer drop + srv.peerFeed.Send(&PeerEvent{ + Type: PeerEventTypeDrop, + Peer: p.ID(), + Error: err.Error(), + }) + // Note: run waits for existing peers to be sent on srv.delpeer // before returning, so this send should not select on srv.quit. srv.delpeer <- peerDrop{p, err, remoteRequested} |