aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
authorLewis Marshall <lewis@lmars.net>2017-09-25 16:08:07 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-09-25 16:08:07 +0800
commit9feec51e2dd754819e5c730ac5985d28d57adb48 (patch)
tree32b07b659cf7d0b4c1a7da67b5c49daf7a10a9d3 /p2p/server.go
parent673007d7aed1d2678ea3277eceb7b55dc29cf092 (diff)
downloaddexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.gz
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.bz2
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.lz
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.xz
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.zst
dexon-9feec51e2dd754819e5c730ac5985d28d57adb48.zip
p2p: add network simulation framework (#14982)
This commit introduces a network simulation framework which can be used to run simulated networks of devp2p nodes. The intention is to use this for testing protocols, performing benchmarks and visualising emergent network behaviour.
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go56
1 files changed, 48 insertions, 8 deletions
diff --git a/p2p/server.go b/p2p/server.go
index d7909d53a..d1d578401 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
@@ -130,10 +131,14 @@ type Config struct {
// If Dialer is set to a non-nil value, the given Dialer
// is used to dial outbound peer connections.
- Dialer *net.Dialer `toml:"-"`
+ Dialer NodeDialer `toml:"-"`
// If NoDial is true, the server will not dial any peers.
NoDial bool `toml:",omitempty"`
+
+ // If EnableMsgEvents is set then the server will emit PeerEvents
+ // whenever a message is sent to or received from a peer
+ EnableMsgEvents bool
}
// Server manages all peer connections.
@@ -166,6 +171,7 @@ type Server struct {
addpeer chan *conn
delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
+ peerFeed event.Feed
}
type peerOpFunc func(map[discover.NodeID]*Peer)
@@ -191,7 +197,7 @@ type conn struct {
fd net.Conn
transport
flags connFlag
- cont chan error // The run loop uses cont to signal errors to setupConn.
+ cont chan error // The run loop uses cont to signal errors to SetupConn.
id discover.NodeID // valid after the encryption handshake
caps []Cap // valid after the protocol handshake
name string // valid after the protocol handshake
@@ -291,6 +297,11 @@ func (srv *Server) RemovePeer(node *discover.Node) {
}
}
+// SubscribePeers subscribes the given channel to peer events
+func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
+ return srv.peerFeed.Subscribe(ch)
+}
+
// Self returns the local node's endpoint information.
func (srv *Server) Self() *discover.Node {
srv.lock.Lock()
@@ -358,7 +369,7 @@ func (srv *Server) Start() (err error) {
srv.newTransport = newRLPX
}
if srv.Dialer == nil {
- srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
+ srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
@@ -536,7 +547,11 @@ running:
c.flags |= trustedConn
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
- c.cont <- srv.encHandshakeChecks(peers, c)
+ select {
+ case c.cont <- srv.encHandshakeChecks(peers, c):
+ case <-srv.quit:
+ break running
+ }
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
@@ -544,6 +559,11 @@ running:
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
+ // If message events are enabled, pass the peerFeed
+ // to the peer
+ if srv.EnableMsgEvents {
+ p.events = &srv.peerFeed
+ }
name := truncateName(c.name)
log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
peers[c.id] = p
@@ -552,7 +572,11 @@ running:
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
- c.cont <- err
+ select {
+ case c.cont <- err:
+ case <-srv.quit:
+ break running
+ }
case pd := <-srv.delpeer:
// A peer disconnected.
d := common.PrettyDuration(mclock.Now() - pd.created)
@@ -665,16 +689,16 @@ func (srv *Server) listenLoop() {
// Spawn the handler. It will give the slot back when the connection
// has been established.
go func() {
- srv.setupConn(fd, inboundConn, nil)
+ srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}
}
-// setupConn runs the handshakes and attempts to add the connection
+// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
-func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
+func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
@@ -755,7 +779,23 @@ func (srv *Server) runPeer(p *Peer) {
if srv.newPeerHook != nil {
srv.newPeerHook(p)
}
+
+ // broadcast peer add
+ srv.peerFeed.Send(&PeerEvent{
+ Type: PeerEventTypeAdd,
+ Peer: p.ID(),
+ })
+
+ // run the protocol
remoteRequested, err := p.run()
+
+ // broadcast peer drop
+ srv.peerFeed.Send(&PeerEvent{
+ Type: PeerEventTypeDrop,
+ Peer: p.ID(),
+ Error: err.Error(),
+ })
+
// Note: run waits for existing peers to be sent on srv.delpeer
// before returning, so this send should not select on srv.quit.
srv.delpeer <- peerDrop{p, err, remoteRequested}