aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/dial.go23
-rw-r--r--p2p/dial_test.go2
-rw-r--r--p2p/discover/node.go40
-rw-r--r--p2p/discover/node_test.go30
-rw-r--r--p2p/message.go66
-rw-r--r--p2p/peer.go42
-rw-r--r--p2p/server.go56
-rw-r--r--p2p/server_test.go2
-rw-r--r--p2p/simulations/README.md181
-rw-r--r--p2p/simulations/adapters/docker.go182
-rw-r--r--p2p/simulations/adapters/exec.go504
-rw-r--r--p2p/simulations/adapters/inproc.go314
-rw-r--r--p2p/simulations/adapters/types.go215
-rw-r--r--p2p/simulations/events.go108
-rw-r--r--p2p/simulations/examples/README.md39
-rw-r--r--p2p/simulations/examples/ping-pong.go184
-rwxr-xr-xp2p/simulations/examples/ping-pong.sh40
-rw-r--r--p2p/simulations/http.go680
-rw-r--r--p2p/simulations/http_test.go823
-rw-r--r--p2p/simulations/network.go680
-rw-r--r--p2p/simulations/network_test.go159
-rw-r--r--p2p/simulations/simulation.go157
22 files changed, 4513 insertions, 14 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
index b77971396..2d9e3a0ed 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -47,6 +47,24 @@ const (
maxResolveDelay = time.Hour
)
+// NodeDialer is used to connect to nodes in the network, typically by using
+// an underlying net.Dialer but also using net.Pipe in tests
+type NodeDialer interface {
+ Dial(*discover.Node) (net.Conn, error)
+}
+
+// TCPDialer implements the NodeDialer interface by using a net.Dialer to
+// create TCP connections to nodes in the network
+type TCPDialer struct {
+ *net.Dialer
+}
+
+// Dial creates a TCP connection to the node
+func (t TCPDialer) Dial(dest *discover.Node) (net.Conn, error) {
+ addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
+ return t.Dialer.Dial("tcp", addr.String())
+}
+
// dialstate schedules dials and discovery lookups.
// it get's a chance to compute new tasks on every iteration
// of the main loop in Server.run.
@@ -318,14 +336,13 @@ func (t *dialTask) resolve(srv *Server) bool {
// dial performs the actual connection attempt.
func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
- addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
- fd, err := srv.Dialer.Dial("tcp", addr.String())
+ fd, err := srv.Dialer.Dial(dest)
if err != nil {
log.Trace("Dial error", "task", t, "err", err)
return false
}
mfd := newMeteredConn(fd, false)
- srv.setupConn(mfd, t.flags, dest)
+ srv.SetupConn(mfd, t.flags, dest)
return true
}
diff --git a/p2p/dial_test.go b/p2p/dial_test.go
index 08e863bae..ad18ef9ab 100644
--- a/p2p/dial_test.go
+++ b/p2p/dial_test.go
@@ -597,7 +597,7 @@ func TestDialResolve(t *testing.T) {
}
// Now run the task, it should resolve the ID once.
- config := Config{Dialer: &net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}}
+ config := Config{Dialer: TCPDialer{&net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}}}
srv := &Server{ntab: table, Config: config}
tasks[0].Do(srv)
if !reflect.DeepEqual(table.resolveCalls, []discover.NodeID{dest.ID}) {
diff --git a/p2p/discover/node.go b/p2p/discover/node.go
index d9cbd9448..fc928a91a 100644
--- a/p2p/discover/node.go
+++ b/p2p/discover/node.go
@@ -225,6 +225,11 @@ func (n *Node) UnmarshalText(text []byte) error {
// The node identifier is a marshaled elliptic curve public key.
type NodeID [NodeIDBits / 8]byte
+// Bytes returns a byte slice representation of the NodeID
+func (n NodeID) Bytes() []byte {
+ return n[:]
+}
+
// NodeID prints as a long hexadecimal number.
func (n NodeID) String() string {
return fmt.Sprintf("%x", n[:])
@@ -240,6 +245,41 @@ func (n NodeID) TerminalString() string {
return hex.EncodeToString(n[:8])
}
+// MarshalText implements the encoding.TextMarshaler interface.
+func (n NodeID) MarshalText() ([]byte, error) {
+ return []byte(hex.EncodeToString(n[:])), nil
+}
+
+// UnmarshalText implements the encoding.TextUnmarshaler interface.
+func (n *NodeID) UnmarshalText(text []byte) error {
+ id, err := HexID(string(text))
+ if err != nil {
+ return err
+ }
+ *n = id
+ return nil
+}
+
+// BytesID converts a byte slice to a NodeID
+func BytesID(b []byte) (NodeID, error) {
+ var id NodeID
+ if len(b) != len(id) {
+ return id, fmt.Errorf("wrong length, want %d bytes", len(id))
+ }
+ copy(id[:], b)
+ return id, nil
+}
+
+// MustBytesID converts a byte slice to a NodeID.
+// It panics if the byte slice is not a valid NodeID.
+func MustBytesID(b []byte) NodeID {
+ id, err := BytesID(b)
+ if err != nil {
+ panic(err)
+ }
+ return id
+}
+
// HexID converts a hex string to a NodeID.
// The string may be prefixed with 0x.
func HexID(in string) (NodeID, error) {
diff --git a/p2p/discover/node_test.go b/p2p/discover/node_test.go
index 3d1662d0b..ed8db4dc6 100644
--- a/p2p/discover/node_test.go
+++ b/p2p/discover/node_test.go
@@ -17,6 +17,7 @@
package discover
import (
+ "bytes"
"fmt"
"math/big"
"math/rand"
@@ -192,6 +193,35 @@ func TestHexID(t *testing.T) {
}
}
+func TestNodeID_textEncoding(t *testing.T) {
+ ref := NodeID{
+ 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10,
+ 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20,
+ 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30,
+ 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x40,
+ 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x50,
+ 0x51, 0x52, 0x53, 0x54, 0x55, 0x56, 0x57, 0x58, 0x59, 0x60,
+ 0x61, 0x62, 0x63, 0x64,
+ }
+ hex := "01020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364"
+
+ text, err := ref.MarshalText()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(text, []byte(hex)) {
+ t.Fatalf("text encoding did not match\nexpected: %s\ngot: %s", hex, text)
+ }
+
+ id := new(NodeID)
+ if err := id.UnmarshalText(text); err != nil {
+ t.Fatal(err)
+ }
+ if *id != ref {
+ t.Fatalf("text decoding did not match\nexpected: %s\ngot: %s", ref, id)
+ }
+}
+
func TestNodeID_recover(t *testing.T) {
prv := newkey()
hash := make([]byte, 32)
diff --git a/p2p/message.go b/p2p/message.go
index 1292d2121..5690494bf 100644
--- a/p2p/message.go
+++ b/p2p/message.go
@@ -27,6 +27,8 @@ import (
"sync/atomic"
"time"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -271,3 +273,67 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error {
}
return nil
}
+
+// msgEventer wraps a MsgReadWriter and sends events whenever a message is sent
+// or received
+type msgEventer struct {
+ MsgReadWriter
+
+ feed *event.Feed
+ peerID discover.NodeID
+ Protocol string
+}
+
+// newMsgEventer returns a msgEventer which sends message events to the given
+// feed
+func newMsgEventer(rw MsgReadWriter, feed *event.Feed, peerID discover.NodeID, proto string) *msgEventer {
+ return &msgEventer{
+ MsgReadWriter: rw,
+ feed: feed,
+ peerID: peerID,
+ Protocol: proto,
+ }
+}
+
+// ReadMsg reads a message from the underlying MsgReadWriter and emits a
+// "message received" event
+func (self *msgEventer) ReadMsg() (Msg, error) {
+ msg, err := self.MsgReadWriter.ReadMsg()
+ if err != nil {
+ return msg, err
+ }
+ self.feed.Send(&PeerEvent{
+ Type: PeerEventTypeMsgRecv,
+ Peer: self.peerID,
+ Protocol: self.Protocol,
+ MsgCode: &msg.Code,
+ MsgSize: &msg.Size,
+ })
+ return msg, nil
+}
+
+// WriteMsg writes a message to the underlying MsgReadWriter and emits a
+// "message sent" event
+func (self *msgEventer) WriteMsg(msg Msg) error {
+ err := self.MsgReadWriter.WriteMsg(msg)
+ if err != nil {
+ return err
+ }
+ self.feed.Send(&PeerEvent{
+ Type: PeerEventTypeMsgSend,
+ Peer: self.peerID,
+ Protocol: self.Protocol,
+ MsgCode: &msg.Code,
+ MsgSize: &msg.Size,
+ })
+ return nil
+}
+
+// Close closes the underlying MsgReadWriter if it implements the io.Closer
+// interface
+func (self *msgEventer) Close() error {
+ if v, ok := self.MsgReadWriter.(io.Closer); ok {
+ return v.Close()
+ }
+ return nil
+}
diff --git a/p2p/peer.go b/p2p/peer.go
index fb4b39e95..ebf7490c6 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp"
@@ -60,6 +61,38 @@ type protoHandshake struct {
Rest []rlp.RawValue `rlp:"tail"`
}
+// PeerEventType is the type of peer events emitted by a p2p.Server
+type PeerEventType string
+
+const (
+ // PeerEventTypeAdd is the type of event emitted when a peer is added
+ // to a p2p.Server
+ PeerEventTypeAdd PeerEventType = "add"
+
+ // PeerEventTypeDrop is the type of event emitted when a peer is
+ // dropped from a p2p.Server
+ PeerEventTypeDrop PeerEventType = "drop"
+
+ // PeerEventTypeMsgSend is the type of event emitted when a
+ // message is successfully sent to a peer
+ PeerEventTypeMsgSend PeerEventType = "msgsend"
+
+ // PeerEventTypeMsgRecv is the type of event emitted when a
+ // message is received from a peer
+ PeerEventTypeMsgRecv PeerEventType = "msgrecv"
+)
+
+// PeerEvent is an event emitted when peers are either added or dropped from
+// a p2p.Server or when a message is sent or received on a peer connection
+type PeerEvent struct {
+ Type PeerEventType `json:"type"`
+ Peer discover.NodeID `json:"peer"`
+ Error string `json:"error,omitempty"`
+ Protocol string `json:"protocol,omitempty"`
+ MsgCode *uint64 `json:"msg_code,omitempty"`
+ MsgSize *uint32 `json:"msg_size,omitempty"`
+}
+
// Peer represents a connected remote node.
type Peer struct {
rw *conn
@@ -71,6 +104,9 @@ type Peer struct {
protoErr chan error
closed chan struct{}
disc chan DiscReason
+
+ // events receives message send / receive events if set
+ events *event.Feed
}
// NewPeer returns a peer for testing purposes.
@@ -297,9 +333,13 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error)
proto.closed = p.closed
proto.wstart = writeStart
proto.werr = writeErr
+ var rw MsgReadWriter = proto
+ if p.events != nil {
+ rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
+ }
p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
go func() {
- err := proto.Run(p, proto)
+ err := proto.Run(p, rw)
if err == nil {
p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
err = errProtocolReturned
diff --git a/p2p/server.go b/p2p/server.go
index d7909d53a..d1d578401 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
@@ -130,10 +131,14 @@ type Config struct {
// If Dialer is set to a non-nil value, the given Dialer
// is used to dial outbound peer connections.
- Dialer *net.Dialer `toml:"-"`
+ Dialer NodeDialer `toml:"-"`
// If NoDial is true, the server will not dial any peers.
NoDial bool `toml:",omitempty"`
+
+ // If EnableMsgEvents is set then the server will emit PeerEvents
+ // whenever a message is sent to or received from a peer
+ EnableMsgEvents bool
}
// Server manages all peer connections.
@@ -166,6 +171,7 @@ type Server struct {
addpeer chan *conn
delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
+ peerFeed event.Feed
}
type peerOpFunc func(map[discover.NodeID]*Peer)
@@ -191,7 +197,7 @@ type conn struct {
fd net.Conn
transport
flags connFlag
- cont chan error // The run loop uses cont to signal errors to setupConn.
+ cont chan error // The run loop uses cont to signal errors to SetupConn.
id discover.NodeID // valid after the encryption handshake
caps []Cap // valid after the protocol handshake
name string // valid after the protocol handshake
@@ -291,6 +297,11 @@ func (srv *Server) RemovePeer(node *discover.Node) {
}
}
+// SubscribePeers subscribes the given channel to peer events
+func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
+ return srv.peerFeed.Subscribe(ch)
+}
+
// Self returns the local node's endpoint information.
func (srv *Server) Self() *discover.Node {
srv.lock.Lock()
@@ -358,7 +369,7 @@ func (srv *Server) Start() (err error) {
srv.newTransport = newRLPX
}
if srv.Dialer == nil {
- srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
+ srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
@@ -536,7 +547,11 @@ running:
c.flags |= trustedConn
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
- c.cont <- srv.encHandshakeChecks(peers, c)
+ select {
+ case c.cont <- srv.encHandshakeChecks(peers, c):
+ case <-srv.quit:
+ break running
+ }
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
@@ -544,6 +559,11 @@ running:
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
+ // If message events are enabled, pass the peerFeed
+ // to the peer
+ if srv.EnableMsgEvents {
+ p.events = &srv.peerFeed
+ }
name := truncateName(c.name)
log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
peers[c.id] = p
@@ -552,7 +572,11 @@ running:
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
- c.cont <- err
+ select {
+ case c.cont <- err:
+ case <-srv.quit:
+ break running
+ }
case pd := <-srv.delpeer:
// A peer disconnected.
d := common.PrettyDuration(mclock.Now() - pd.created)
@@ -665,16 +689,16 @@ func (srv *Server) listenLoop() {
// Spawn the handler. It will give the slot back when the connection
// has been established.
go func() {
- srv.setupConn(fd, inboundConn, nil)
+ srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}
}
-// setupConn runs the handshakes and attempts to add the connection
+// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
-func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
+func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
@@ -755,7 +779,23 @@ func (srv *Server) runPeer(p *Peer) {
if srv.newPeerHook != nil {
srv.newPeerHook(p)
}
+
+ // broadcast peer add
+ srv.peerFeed.Send(&PeerEvent{
+ Type: PeerEventTypeAdd,
+ Peer: p.ID(),
+ })
+
+ // run the protocol
remoteRequested, err := p.run()
+
+ // broadcast peer drop
+ srv.peerFeed.Send(&PeerEvent{
+ Type: PeerEventTypeDrop,
+ Peer: p.ID(),
+ Error: err.Error(),
+ })
+
// Note: run waits for existing peers to be sent on srv.delpeer
// before returning, so this send should not select on srv.quit.
srv.delpeer <- peerDrop{p, err, remoteRequested}
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 971faf002..11dd83e5d 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -435,7 +435,7 @@ func TestServerSetupConn(t *testing.T) {
}
}
p1, _ := net.Pipe()
- srv.setupConn(p1, test.flags, test.dialDest)
+ srv.SetupConn(p1, test.flags, test.dialDest)
if !reflect.DeepEqual(test.tt.closeErr, test.wantCloseErr) {
t.Errorf("test %d: close error mismatch: got %q, want %q", i, test.tt.closeErr, test.wantCloseErr)
}
diff --git a/p2p/simulations/README.md b/p2p/simulations/README.md
new file mode 100644
index 000000000..d1f8649ea
--- /dev/null
+++ b/p2p/simulations/README.md
@@ -0,0 +1,181 @@
+# devp2p Simulations
+
+The `p2p/simulations` package implements a simulation framework which supports
+creating a collection of devp2p nodes, connecting them together to form a
+simulation network, performing simulation actions in that network and then
+extracting useful information.
+
+## Nodes
+
+Each node in a simulation network runs multiple services by wrapping a collection
+of objects which implement the `node.Service` interface meaning they:
+
+* can be started and stopped
+* run p2p protocols
+* expose RPC APIs
+
+This means that any object which implements the `node.Service` interface can be
+used to run a node in the simulation.
+
+## Services
+
+Before running a simulation, a set of service initializers must be registered
+which can then be used to run nodes in the network.
+
+A service initializer is a function with the following signature:
+
+```go
+func(ctx *adapters.ServiceContext) (node.Service, error)
+```
+
+These initializers should be registered by calling the `adapters.RegisterServices`
+function in an `init()` hook:
+
+```go
+func init() {
+ adapters.RegisterServices(adapters.Services{
+ "service1": initService1,
+ "service2": initService2,
+ })
+}
+```
+
+## Node Adapters
+
+The simulation framework includes multiple "node adapters" which are
+responsible for creating an environment in which a node runs.
+
+### SimAdapter
+
+The `SimAdapter` runs nodes in-memory, connecting them using an in-memory,
+synchronous `net.Pipe` and connecting to their RPC server using an in-memory
+`rpc.Client`.
+
+### ExecAdapter
+
+The `ExecAdapter` runs nodes as child processes of the running simulation.
+
+It does this by executing the binary which is running the simulation but
+setting `argv[0]` (i.e. the program name) to `p2p-node` which is then
+detected by an init hook in the child process which runs the `node.Service`
+using the devp2p node stack rather than executing `main()`.
+
+The nodes listen for devp2p connections and WebSocket RPC clients on random
+localhost ports.
+
+### DockerAdapter
+
+The `DockerAdapter` is similar to the `ExecAdapter` but executes `docker run`
+to run the node in a Docker container using a Docker image containing the
+simulation binary at `/bin/p2p-node`.
+
+The Docker image is built using `docker build` when the adapter is initialised,
+meaning no prior setup is necessary other than having a working Docker client.
+
+Each node listens on the external IP of the container and the default p2p and
+RPC ports (`30303` and `8546` respectively).
+
+## Network
+
+A simulation network is created with an ID and default service (which is used
+if a node is created without an explicit service), exposes methods for
+creating, starting, stopping, connecting and disconnecting nodes, and emits
+events when certain actions occur.
+
+### Events
+
+A simulation network emits the following events:
+
+* node event - when nodes are created / started / stopped
+* connection event - when nodes are connected / disconnected
+* message event - when a protocol message is sent between two nodes
+
+The events have a "control" flag which when set indicates that the event is the
+outcome of a controlled simulation action (e.g. creating a node or explicitly
+connecting two nodes together).
+
+This is in contrast to a non-control event, otherwise called a "live" event,
+which is the outcome of something happening in the network as a result of a
+control event (e.g. a node actually started up or a connection was actually
+established between two nodes).
+
+Live events are detected by the simulation network by subscribing to node peer
+events via RPC when the nodes start up.
+
+## Testing Framework
+
+The `Simulation` type can be used in tests to perform actions in a simulation
+network and then wait for expectations to be met.
+
+With a running simulation network, the `Simulation.Run` method can be called
+with a `Step` which has the following fields:
+
+* `Action` - a function which performs some action in the network
+
+* `Expect` - an expectation function which returns whether or not a
+ given node meets the expectation
+
+* `Trigger` - a channel which receives node IDs which then trigger a check
+ of the expectation function to be performed against that node
+
+As a concrete example, consider a simulated network of Ethereum nodes. An
+`Action` could be the sending of a transaction, `Expect` it being included in
+a block, and `Trigger` a check for every block that is mined.
+
+On return, the `Simulation.Run` method returns a `StepResult` which can be used
+to determine if all nodes met the expectation, how long it took them to meet
+the expectation and what network events were emitted during the step run.
+
+## HTTP API
+
+The simulation framework includes a HTTP API which can be used to control the
+simulation.
+
+The API is initialised with a particular node adapter and has the following
+endpoints:
+
+```
+GET / Get network information
+POST /start Start all nodes in the network
+POST /stop Stop all nodes in the network
+GET /events Stream network events
+GET /snapshot Take a network snapshot
+POST /snapshot Load a network snapshot
+POST /nodes Create a node
+GET /nodes Get all nodes in the network
+GET /nodes/:nodeid Get node information
+POST /nodes/:nodeid/start Start a node
+POST /nodes/:nodeid/stop Stop a node
+POST /nodes/:nodeid/conn/:peerid Connect two nodes
+DELETE /nodes/:nodeid/conn/:peerid Disconnect two nodes
+GET /nodes/:nodeid/rpc Make RPC requests to a node via WebSocket
+```
+
+For convenience, `nodeid` in the URL can be the name of a node rather than its
+ID.
+
+## Command line client
+
+`p2psim` is a command line client for the HTTP API, located in
+`cmd/p2psim`.
+
+It provides the following commands:
+
+```
+p2psim show
+p2psim events [--current] [--filter=FILTER]
+p2psim snapshot
+p2psim load
+p2psim node create [--name=NAME] [--services=SERVICES] [--key=KEY]
+p2psim node list
+p2psim node show <node>
+p2psim node start <node>
+p2psim node stop <node>
+p2psim node connect <node> <peer>
+p2psim node disconnect <node> <peer>
+p2psim node rpc <node> <method> [<args>] [--subscribe]
+```
+
+## Example
+
+See [p2p/simulations/examples/README.md](examples/README.md).
diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go
new file mode 100644
index 000000000..022314b3d
--- /dev/null
+++ b/p2p/simulations/adapters/docker.go
@@ -0,0 +1,182 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "runtime"
+ "strings"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+// DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker
+// containers.
+//
+// A Docker image is built which contains the current binary at /bin/p2p-node
+// which when executed runs the underlying service (see the description
+// of the execP2PNode function for more details)
+type DockerAdapter struct {
+ ExecAdapter
+}
+
+// NewDockerAdapter builds the p2p-node Docker image containing the current
+// binary and returns a DockerAdapter
+func NewDockerAdapter() (*DockerAdapter, error) {
+ // Since Docker containers run on Linux and this adapter runs the
+ // current binary in the container, it must be compiled for Linux.
+ //
+ // It is reasonable to require this because the caller can just
+ // compile the current binary in a Docker container.
+ if runtime.GOOS != "linux" {
+ return nil, errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)")
+ }
+
+ if err := buildDockerImage(); err != nil {
+ return nil, err
+ }
+
+ return &DockerAdapter{
+ ExecAdapter{
+ nodes: make(map[discover.NodeID]*ExecNode),
+ },
+ }, nil
+}
+
+// Name returns the name of the adapter for logging purposes
+func (d *DockerAdapter) Name() string {
+ return "docker-adapter"
+}
+
+// NewNode returns a new DockerNode using the given config
+func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
+ if len(config.Services) == 0 {
+ return nil, errors.New("node must have at least one service")
+ }
+ for _, service := range config.Services {
+ if _, exists := serviceFuncs[service]; !exists {
+ return nil, fmt.Errorf("unknown node service %q", service)
+ }
+ }
+
+ // generate the config
+ conf := &execNodeConfig{
+ Stack: node.DefaultConfig,
+ Node: config,
+ }
+ conf.Stack.DataDir = "/data"
+ conf.Stack.WSHost = "0.0.0.0"
+ conf.Stack.WSOrigins = []string{"*"}
+ conf.Stack.WSExposeAll = true
+ conf.Stack.P2P.EnableMsgEvents = false
+ conf.Stack.P2P.NoDiscovery = true
+ conf.Stack.P2P.NAT = nil
+ conf.Stack.NoUSB = true
+
+ node := &DockerNode{
+ ExecNode: ExecNode{
+ ID: config.ID,
+ Config: conf,
+ adapter: &d.ExecAdapter,
+ },
+ }
+ node.newCmd = node.dockerCommand
+ d.ExecAdapter.nodes[node.ID] = &node.ExecNode
+ return node, nil
+}
+
+// DockerNode wraps an ExecNode but exec's the current binary in a docker
+// container rather than locally
+type DockerNode struct {
+ ExecNode
+}
+
+// dockerCommand returns a command which exec's the binary in a Docker
+// container.
+//
+// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment
+// variable to the container using the --env flag.
+func (n *DockerNode) dockerCommand() *exec.Cmd {
+ return exec.Command(
+ "sh", "-c",
+ fmt.Sprintf(
+ `exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`,
+ dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(),
+ ),
+ )
+}
+
+// dockerImage is the name of the Docker image which gets built to run the
+// simulation node
+const dockerImage = "p2p-node"
+
+// buildDockerImage builds the Docker image which is used to run the simulation
+// node in a Docker container.
+//
+// It adds the current binary as "p2p-node" so that it runs execP2PNode
+// when executed.
+func buildDockerImage() error {
+ // create a directory to use as the build context
+ dir, err := ioutil.TempDir("", "p2p-docker")
+ if err != nil {
+ return err
+ }
+ defer os.RemoveAll(dir)
+
+ // copy the current binary into the build context
+ bin, err := os.Open(reexec.Self())
+ if err != nil {
+ return err
+ }
+ defer bin.Close()
+ dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755)
+ if err != nil {
+ return err
+ }
+ defer dst.Close()
+ if _, err := io.Copy(dst, bin); err != nil {
+ return err
+ }
+
+ // create the Dockerfile
+ dockerfile := []byte(`
+FROM ubuntu:16.04
+RUN mkdir /data
+ADD self.bin /bin/p2p-node
+ `)
+ if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil {
+ return err
+ }
+
+ // run 'docker build'
+ cmd := exec.Command("docker", "build", "-t", dockerImage, dir)
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ if err := cmd.Run(); err != nil {
+ return fmt.Errorf("error building docker image: %s", err)
+ }
+
+ return nil
+}
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go
new file mode 100644
index 000000000..bdb92cc1d
--- /dev/null
+++ b/p2p/simulations/adapters/exec.go
@@ -0,0 +1,504 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "bufio"
+ "context"
+ "crypto/ecdsa"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "os"
+ "os/exec"
+ "os/signal"
+ "path/filepath"
+ "regexp"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
+ "golang.org/x/net/websocket"
+)
+
+// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the
+// current binary as a child process.
+//
+// An init hook is used so that the child process executes the node services
+// (rather than whataver the main() function would normally do), see the
+// execP2PNode function for more information.
+type ExecAdapter struct {
+ // BaseDir is the directory under which the data directories for each
+ // simulation node are created.
+ BaseDir string
+
+ nodes map[discover.NodeID]*ExecNode
+}
+
+// NewExecAdapter returns an ExecAdapter which stores node data in
+// subdirectories of the given base directory
+func NewExecAdapter(baseDir string) *ExecAdapter {
+ return &ExecAdapter{
+ BaseDir: baseDir,
+ nodes: make(map[discover.NodeID]*ExecNode),
+ }
+}
+
+// Name returns the name of the adapter for logging purposes
+func (e *ExecAdapter) Name() string {
+ return "exec-adapter"
+}
+
+// NewNode returns a new ExecNode using the given config
+func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
+ if len(config.Services) == 0 {
+ return nil, errors.New("node must have at least one service")
+ }
+ for _, service := range config.Services {
+ if _, exists := serviceFuncs[service]; !exists {
+ return nil, fmt.Errorf("unknown node service %q", service)
+ }
+ }
+
+ // create the node directory using the first 12 characters of the ID
+ // as Unix socket paths cannot be longer than 256 characters
+ dir := filepath.Join(e.BaseDir, config.ID.String()[:12])
+ if err := os.Mkdir(dir, 0755); err != nil {
+ return nil, fmt.Errorf("error creating node directory: %s", err)
+ }
+
+ // generate the config
+ conf := &execNodeConfig{
+ Stack: node.DefaultConfig,
+ Node: config,
+ }
+ conf.Stack.DataDir = filepath.Join(dir, "data")
+ conf.Stack.WSHost = "127.0.0.1"
+ conf.Stack.WSPort = 0
+ conf.Stack.WSOrigins = []string{"*"}
+ conf.Stack.WSExposeAll = true
+ conf.Stack.P2P.EnableMsgEvents = false
+ conf.Stack.P2P.NoDiscovery = true
+ conf.Stack.P2P.NAT = nil
+ conf.Stack.NoUSB = true
+
+ // listen on a random localhost port (we'll get the actual port after
+ // starting the node through the RPC admin.nodeInfo method)
+ conf.Stack.P2P.ListenAddr = "127.0.0.1:0"
+
+ node := &ExecNode{
+ ID: config.ID,
+ Dir: dir,
+ Config: conf,
+ adapter: e,
+ }
+ node.newCmd = node.execCommand
+ e.nodes[node.ID] = node
+ return node, nil
+}
+
+// ExecNode starts a simulation node by exec'ing the current binary and
+// running the configured services
+type ExecNode struct {
+ ID discover.NodeID
+ Dir string
+ Config *execNodeConfig
+ Cmd *exec.Cmd
+ Info *p2p.NodeInfo
+
+ adapter *ExecAdapter
+ client *rpc.Client
+ wsAddr string
+ newCmd func() *exec.Cmd
+ key *ecdsa.PrivateKey
+}
+
+// Addr returns the node's enode URL
+func (n *ExecNode) Addr() []byte {
+ if n.Info == nil {
+ return nil
+ }
+ return []byte(n.Info.Enode)
+}
+
+// Client returns an rpc.Client which can be used to communicate with the
+// underlying services (it is set once the node has started)
+func (n *ExecNode) Client() (*rpc.Client, error) {
+ return n.client, nil
+}
+
+// wsAddrPattern is a regex used to read the WebSocket address from the node's
+// log
+var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`)
+
+// Start exec's the node passing the ID and service as command line arguments
+// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
+// variable
+func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
+ if n.Cmd != nil {
+ return errors.New("already started")
+ }
+ defer func() {
+ if err != nil {
+ log.Error("node failed to start", "err", err)
+ n.Stop()
+ }
+ }()
+
+ // encode a copy of the config containing the snapshot
+ confCopy := *n.Config
+ confCopy.Snapshots = snapshots
+ confCopy.PeerAddrs = make(map[string]string)
+ for id, node := range n.adapter.nodes {
+ confCopy.PeerAddrs[id.String()] = node.wsAddr
+ }
+ confData, err := json.Marshal(confCopy)
+ if err != nil {
+ return fmt.Errorf("error generating node config: %s", err)
+ }
+
+ // use a pipe for stderr so we can both copy the node's stderr to
+ // os.Stderr and read the WebSocket address from the logs
+ stderrR, stderrW := io.Pipe()
+ stderr := io.MultiWriter(os.Stderr, stderrW)
+
+ // start the node
+ cmd := n.newCmd()
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = stderr
+ cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData))
+ if err := cmd.Start(); err != nil {
+ return fmt.Errorf("error starting node: %s", err)
+ }
+ n.Cmd = cmd
+
+ // read the WebSocket address from the stderr logs
+ var wsAddr string
+ wsAddrC := make(chan string)
+ go func() {
+ s := bufio.NewScanner(stderrR)
+ for s.Scan() {
+ if strings.Contains(s.Text(), "WebSocket endpoint opened:") {
+ wsAddrC <- wsAddrPattern.FindString(s.Text())
+ }
+ }
+ }()
+ select {
+ case wsAddr = <-wsAddrC:
+ if wsAddr == "" {
+ return errors.New("failed to read WebSocket address from stderr")
+ }
+ case <-time.After(10 * time.Second):
+ return errors.New("timed out waiting for WebSocket address on stderr")
+ }
+
+ // create the RPC client and load the node info
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ client, err := rpc.DialWebsocket(ctx, wsAddr, "")
+ if err != nil {
+ return fmt.Errorf("error dialing rpc websocket: %s", err)
+ }
+ var info p2p.NodeInfo
+ if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil {
+ return fmt.Errorf("error getting node info: %s", err)
+ }
+ n.client = client
+ n.wsAddr = wsAddr
+ n.Info = &info
+
+ return nil
+}
+
+// execCommand returns a command which runs the node locally by exec'ing
+// the current binary but setting argv[0] to "p2p-node" so that the child
+// runs execP2PNode
+func (n *ExecNode) execCommand() *exec.Cmd {
+ return &exec.Cmd{
+ Path: reexec.Self(),
+ Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()},
+ }
+}
+
+// Stop stops the node by first sending SIGTERM and then SIGKILL if the node
+// doesn't stop within 5s
+func (n *ExecNode) Stop() error {
+ if n.Cmd == nil {
+ return nil
+ }
+ defer func() {
+ n.Cmd = nil
+ }()
+
+ if n.client != nil {
+ n.client.Close()
+ n.client = nil
+ n.wsAddr = ""
+ n.Info = nil
+ }
+
+ if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
+ return n.Cmd.Process.Kill()
+ }
+ waitErr := make(chan error)
+ go func() {
+ waitErr <- n.Cmd.Wait()
+ }()
+ select {
+ case err := <-waitErr:
+ return err
+ case <-time.After(5 * time.Second):
+ return n.Cmd.Process.Kill()
+ }
+}
+
+// NodeInfo returns information about the node
+func (n *ExecNode) NodeInfo() *p2p.NodeInfo {
+ info := &p2p.NodeInfo{
+ ID: n.ID.String(),
+ }
+ if n.client != nil {
+ n.client.Call(&info, "admin_nodeInfo")
+ }
+ return info
+}
+
+// ServeRPC serves RPC requests over the given connection by dialling the
+// node's WebSocket address and joining the two connections
+func (n *ExecNode) ServeRPC(clientConn net.Conn) error {
+ conn, err := websocket.Dial(n.wsAddr, "", "http://localhost")
+ if err != nil {
+ return err
+ }
+ var wg sync.WaitGroup
+ wg.Add(2)
+ join := func(src, dst net.Conn) {
+ defer wg.Done()
+ io.Copy(dst, src)
+ // close the write end of the destination connection
+ if cw, ok := dst.(interface {
+ CloseWrite() error
+ }); ok {
+ cw.CloseWrite()
+ } else {
+ dst.Close()
+ }
+ }
+ go join(conn, clientConn)
+ go join(clientConn, conn)
+ wg.Wait()
+ return nil
+}
+
+// Snapshots creates snapshots of the services by calling the
+// simulation_snapshot RPC method
+func (n *ExecNode) Snapshots() (map[string][]byte, error) {
+ if n.client == nil {
+ return nil, errors.New("RPC not started")
+ }
+ var snapshots map[string][]byte
+ return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
+}
+
+func init() {
+ // register a reexec function to start a devp2p node when the current
+ // binary is executed as "p2p-node"
+ reexec.Register("p2p-node", execP2PNode)
+}
+
+// execNodeConfig is used to serialize the node configuration so it can be
+// passed to the child process as a JSON encoded environment variable
+type execNodeConfig struct {
+ Stack node.Config `json:"stack"`
+ Node *NodeConfig `json:"node"`
+ Snapshots map[string][]byte `json:"snapshots,omitempty"`
+ PeerAddrs map[string]string `json:"peer_addrs,omitempty"`
+}
+
+// execP2PNode starts a devp2p node when the current binary is executed with
+// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
+// and the node config from the _P2P_NODE_CONFIG environment variable
+func execP2PNode() {
+ glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
+ glogger.Verbosity(log.LvlInfo)
+ log.Root().SetHandler(glogger)
+
+ // read the services from argv
+ serviceNames := strings.Split(os.Args[1], ",")
+
+ // decode the config
+ confEnv := os.Getenv("_P2P_NODE_CONFIG")
+ if confEnv == "" {
+ log.Crit("missing _P2P_NODE_CONFIG")
+ }
+ var conf execNodeConfig
+ if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
+ log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
+ }
+ conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
+
+ // use explicit IP address in ListenAddr so that Enode URL is usable
+ externalIP := func() string {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ log.Crit("error getting IP address", "err", err)
+ }
+ for _, addr := range addrs {
+ if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() {
+ return ip.IP.String()
+ }
+ }
+ log.Crit("unable to determine explicit IP address")
+ return ""
+ }
+ if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") {
+ conf.Stack.P2P.ListenAddr = externalIP() + conf.Stack.P2P.ListenAddr
+ }
+ if conf.Stack.WSHost == "0.0.0.0" {
+ conf.Stack.WSHost = externalIP()
+ }
+
+ // initialize the devp2p stack
+ stack, err := node.New(&conf.Stack)
+ if err != nil {
+ log.Crit("error creating node stack", "err", err)
+ }
+
+ // register the services, collecting them into a map so we can wrap
+ // them in a snapshot service
+ services := make(map[string]node.Service, len(serviceNames))
+ for _, name := range serviceNames {
+ serviceFunc, exists := serviceFuncs[name]
+ if !exists {
+ log.Crit("unknown node service", "name", name)
+ }
+ constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) {
+ ctx := &ServiceContext{
+ RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs},
+ NodeContext: nodeCtx,
+ Config: conf.Node,
+ }
+ if conf.Snapshots != nil {
+ ctx.Snapshot = conf.Snapshots[name]
+ }
+ service, err := serviceFunc(ctx)
+ if err != nil {
+ return nil, err
+ }
+ services[name] = service
+ return service, nil
+ }
+ if err := stack.Register(constructor); err != nil {
+ log.Crit("error starting service", "name", name, "err", err)
+ }
+ }
+
+ // register the snapshot service
+ if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
+ return &snapshotService{services}, nil
+ }); err != nil {
+ log.Crit("error starting snapshot service", "err", err)
+ }
+
+ // start the stack
+ if err := stack.Start(); err != nil {
+ log.Crit("error stating node stack", "err", err)
+ }
+
+ // stop the stack if we get a SIGTERM signal
+ go func() {
+ sigc := make(chan os.Signal, 1)
+ signal.Notify(sigc, syscall.SIGTERM)
+ defer signal.Stop(sigc)
+ <-sigc
+ log.Info("Received SIGTERM, shutting down...")
+ stack.Stop()
+ }()
+
+ // wait for the stack to exit
+ stack.Wait()
+}
+
+// snapshotService is a node.Service which wraps a list of services and
+// exposes an API to generate a snapshot of those services
+type snapshotService struct {
+ services map[string]node.Service
+}
+
+func (s *snapshotService) APIs() []rpc.API {
+ return []rpc.API{{
+ Namespace: "simulation",
+ Version: "1.0",
+ Service: SnapshotAPI{s.services},
+ }}
+}
+
+func (s *snapshotService) Protocols() []p2p.Protocol {
+ return nil
+}
+
+func (s *snapshotService) Start(*p2p.Server) error {
+ return nil
+}
+
+func (s *snapshotService) Stop() error {
+ return nil
+}
+
+// SnapshotAPI provides an RPC method to create snapshots of services
+type SnapshotAPI struct {
+ services map[string]node.Service
+}
+
+func (api SnapshotAPI) Snapshot() (map[string][]byte, error) {
+ snapshots := make(map[string][]byte)
+ for name, service := range api.services {
+ if s, ok := service.(interface {
+ Snapshot() ([]byte, error)
+ }); ok {
+ snap, err := s.Snapshot()
+ if err != nil {
+ return nil, err
+ }
+ snapshots[name] = snap
+ }
+ }
+ return snapshots, nil
+}
+
+type wsRPCDialer struct {
+ addrs map[string]string
+}
+
+// DialRPC implements the RPCDialer interface by creating a WebSocket RPC
+// client of the given node
+func (w *wsRPCDialer) DialRPC(id discover.NodeID) (*rpc.Client, error) {
+ addr, ok := w.addrs[id.String()]
+ if !ok {
+ return nil, fmt.Errorf("unknown node: %s", id)
+ }
+ return rpc.DialWebsocket(context.Background(), addr, "http://localhost")
+}
diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go
new file mode 100644
index 000000000..c97188def
--- /dev/null
+++ b/p2p/simulations/adapters/inproc.go
@@ -0,0 +1,314 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "net"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// SimAdapter is a NodeAdapter which creates in-memory simulation nodes and
+// connects them using in-memory net.Pipe connections
+type SimAdapter struct {
+ mtx sync.RWMutex
+ nodes map[discover.NodeID]*SimNode
+ services map[string]ServiceFunc
+}
+
+// NewSimAdapter creates a SimAdapter which is capable of running in-memory
+// simulation nodes running any of the given services (the services to run on a
+// particular node are passed to the NewNode function in the NodeConfig)
+func NewSimAdapter(services map[string]ServiceFunc) *SimAdapter {
+ return &SimAdapter{
+ nodes: make(map[discover.NodeID]*SimNode),
+ services: services,
+ }
+}
+
+// Name returns the name of the adapter for logging purposes
+func (s *SimAdapter) Name() string {
+ return "sim-adapter"
+}
+
+// NewNode returns a new SimNode using the given config
+func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ // check a node with the ID doesn't already exist
+ id := config.ID
+ if _, exists := s.nodes[id]; exists {
+ return nil, fmt.Errorf("node already exists: %s", id)
+ }
+
+ // check the services are valid
+ if len(config.Services) == 0 {
+ return nil, errors.New("node must have at least one service")
+ }
+ for _, service := range config.Services {
+ if _, exists := s.services[service]; !exists {
+ return nil, fmt.Errorf("unknown node service %q", service)
+ }
+ }
+
+ n, err := node.New(&node.Config{
+ P2P: p2p.Config{
+ PrivateKey: config.PrivateKey,
+ MaxPeers: math.MaxInt32,
+ NoDiscovery: true,
+ Dialer: s,
+ EnableMsgEvents: true,
+ },
+ NoUSB: true,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ simNode := &SimNode{
+ ID: id,
+ config: config,
+ node: n,
+ adapter: s,
+ running: make(map[string]node.Service),
+ }
+ s.nodes[id] = simNode
+ return simNode, nil
+}
+
+// Dial implements the p2p.NodeDialer interface by connecting to the node using
+// an in-memory net.Pipe connection
+func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) {
+ node, ok := s.GetNode(dest.ID)
+ if !ok {
+ return nil, fmt.Errorf("unknown node: %s", dest.ID)
+ }
+ srv := node.Server()
+ if srv == nil {
+ return nil, fmt.Errorf("node not running: %s", dest.ID)
+ }
+ pipe1, pipe2 := net.Pipe()
+ go srv.SetupConn(pipe1, 0, nil)
+ return pipe2, nil
+}
+
+// DialRPC implements the RPCDialer interface by creating an in-memory RPC
+// client of the given node
+func (s *SimAdapter) DialRPC(id discover.NodeID) (*rpc.Client, error) {
+ node, ok := s.GetNode(id)
+ if !ok {
+ return nil, fmt.Errorf("unknown node: %s", id)
+ }
+ handler, err := node.node.RPCHandler()
+ if err != nil {
+ return nil, err
+ }
+ return rpc.DialInProc(handler), nil
+}
+
+// GetNode returns the node with the given ID if it exists
+func (s *SimAdapter) GetNode(id discover.NodeID) (*SimNode, bool) {
+ s.mtx.RLock()
+ defer s.mtx.RUnlock()
+ node, ok := s.nodes[id]
+ return node, ok
+}
+
+// SimNode is an in-memory simulation node which connects to other nodes using
+// an in-memory net.Pipe connection (see SimAdapter.Dial), running devp2p
+// protocols directly over that pipe
+type SimNode struct {
+ lock sync.RWMutex
+ ID discover.NodeID
+ config *NodeConfig
+ adapter *SimAdapter
+ node *node.Node
+ running map[string]node.Service
+ client *rpc.Client
+ registerOnce sync.Once
+}
+
+// Addr returns the node's discovery address
+func (self *SimNode) Addr() []byte {
+ return []byte(self.Node().String())
+}
+
+// Node returns a discover.Node representing the SimNode
+func (self *SimNode) Node() *discover.Node {
+ return discover.NewNode(self.ID, net.IP{127, 0, 0, 1}, 30303, 30303)
+}
+
+// Client returns an rpc.Client which can be used to communicate with the
+// underlying services (it is set once the node has started)
+func (self *SimNode) Client() (*rpc.Client, error) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.client == nil {
+ return nil, errors.New("node not started")
+ }
+ return self.client, nil
+}
+
+// ServeRPC serves RPC requests over the given connection by creating an
+// in-memory client to the node's RPC server
+func (self *SimNode) ServeRPC(conn net.Conn) error {
+ handler, err := self.node.RPCHandler()
+ if err != nil {
+ return err
+ }
+ handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
+ return nil
+}
+
+// Snapshots creates snapshots of the services by calling the
+// simulation_snapshot RPC method
+func (self *SimNode) Snapshots() (map[string][]byte, error) {
+ self.lock.RLock()
+ services := make(map[string]node.Service, len(self.running))
+ for name, service := range self.running {
+ services[name] = service
+ }
+ self.lock.RUnlock()
+ if len(services) == 0 {
+ return nil, errors.New("no running services")
+ }
+ snapshots := make(map[string][]byte)
+ for name, service := range services {
+ if s, ok := service.(interface {
+ Snapshot() ([]byte, error)
+ }); ok {
+ snap, err := s.Snapshot()
+ if err != nil {
+ return nil, err
+ }
+ snapshots[name] = snap
+ }
+ }
+ return snapshots, nil
+}
+
+// Start registers the services and starts the underlying devp2p node
+func (self *SimNode) Start(snapshots map[string][]byte) error {
+ newService := func(name string) func(ctx *node.ServiceContext) (node.Service, error) {
+ return func(nodeCtx *node.ServiceContext) (node.Service, error) {
+ ctx := &ServiceContext{
+ RPCDialer: self.adapter,
+ NodeContext: nodeCtx,
+ Config: self.config,
+ }
+ if snapshots != nil {
+ ctx.Snapshot = snapshots[name]
+ }
+ serviceFunc := self.adapter.services[name]
+ service, err := serviceFunc(ctx)
+ if err != nil {
+ return nil, err
+ }
+ self.running[name] = service
+ return service, nil
+ }
+ }
+
+ // ensure we only register the services once in the case of the node
+ // being stopped and then started again
+ var regErr error
+ self.registerOnce.Do(func() {
+ for _, name := range self.config.Services {
+ if err := self.node.Register(newService(name)); err != nil {
+ regErr = err
+ return
+ }
+ }
+ })
+ if regErr != nil {
+ return regErr
+ }
+
+ if err := self.node.Start(); err != nil {
+ return err
+ }
+
+ // create an in-process RPC client
+ handler, err := self.node.RPCHandler()
+ if err != nil {
+ return err
+ }
+
+ self.lock.Lock()
+ self.client = rpc.DialInProc(handler)
+ self.lock.Unlock()
+
+ return nil
+}
+
+// Stop closes the RPC client and stops the underlying devp2p node
+func (self *SimNode) Stop() error {
+ self.lock.Lock()
+ if self.client != nil {
+ self.client.Close()
+ self.client = nil
+ }
+ self.lock.Unlock()
+ return self.node.Stop()
+}
+
+// Services returns a copy of the underlying services
+func (self *SimNode) Services() []node.Service {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ services := make([]node.Service, 0, len(self.running))
+ for _, service := range self.running {
+ services = append(services, service)
+ }
+ return services
+}
+
+// Server returns the underlying p2p.Server
+func (self *SimNode) Server() *p2p.Server {
+ return self.node.Server()
+}
+
+// SubscribeEvents subscribes the given channel to peer events from the
+// underlying p2p.Server
+func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription {
+ srv := self.Server()
+ if srv == nil {
+ panic("node not running")
+ }
+ return srv.SubscribeEvents(ch)
+}
+
+// NodeInfo returns information about the node
+func (self *SimNode) NodeInfo() *p2p.NodeInfo {
+ server := self.Server()
+ if server == nil {
+ return &p2p.NodeInfo{
+ ID: self.ID.String(),
+ Enode: self.Node().String(),
+ }
+ }
+ return server.NodeInfo()
+}
diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go
new file mode 100644
index 000000000..ed6cfc504
--- /dev/null
+++ b/p2p/simulations/adapters/types.go
@@ -0,0 +1,215 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "crypto/ecdsa"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "net"
+ "os"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// Node represents a node in a simulation network which is created by a
+// NodeAdapter, for example:
+//
+// * SimNode - An in-memory node
+// * ExecNode - A child process node
+// * DockerNode - A Docker container node
+//
+type Node interface {
+ // Addr returns the node's address (e.g. an Enode URL)
+ Addr() []byte
+
+ // Client returns the RPC client which is created once the node is
+ // up and running
+ Client() (*rpc.Client, error)
+
+ // ServeRPC serves RPC requests over the given connection
+ ServeRPC(net.Conn) error
+
+ // Start starts the node with the given snapshots
+ Start(snapshots map[string][]byte) error
+
+ // Stop stops the node
+ Stop() error
+
+ // NodeInfo returns information about the node
+ NodeInfo() *p2p.NodeInfo
+
+ // Snapshots creates snapshots of the running services
+ Snapshots() (map[string][]byte, error)
+}
+
+// NodeAdapter is used to create Nodes in a simulation network
+type NodeAdapter interface {
+ // Name returns the name of the adapter for logging purposes
+ Name() string
+
+ // NewNode creates a new node with the given configuration
+ NewNode(config *NodeConfig) (Node, error)
+}
+
+// NodeConfig is the configuration used to start a node in a simulation
+// network
+type NodeConfig struct {
+ // ID is the node's ID which is used to identify the node in the
+ // simulation network
+ ID discover.NodeID
+
+ // PrivateKey is the node's private key which is used by the devp2p
+ // stack to encrypt communications
+ PrivateKey *ecdsa.PrivateKey
+
+ // Name is a human friendly name for the node like "node01"
+ Name string
+
+ // Services are the names of the services which should be run when
+ // starting the node (for SimNodes it should be the names of services
+ // contained in SimAdapter.services, for other nodes it should be
+ // services registered by calling the RegisterService function)
+ Services []string
+}
+
+// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding
+// all fields as strings
+type nodeConfigJSON struct {
+ ID string `json:"id"`
+ PrivateKey string `json:"private_key"`
+ Name string `json:"name"`
+ Services []string `json:"services"`
+}
+
+// MarshalJSON implements the json.Marshaler interface by encoding the config
+// fields as strings
+func (n *NodeConfig) MarshalJSON() ([]byte, error) {
+ confJSON := nodeConfigJSON{
+ ID: n.ID.String(),
+ Name: n.Name,
+ Services: n.Services,
+ }
+ if n.PrivateKey != nil {
+ confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey))
+ }
+ return json.Marshal(confJSON)
+}
+
+// UnmarshalJSON implements the json.Unmarshaler interface by decoding the json
+// string values into the config fields
+func (n *NodeConfig) UnmarshalJSON(data []byte) error {
+ var confJSON nodeConfigJSON
+ if err := json.Unmarshal(data, &confJSON); err != nil {
+ return err
+ }
+
+ if confJSON.ID != "" {
+ nodeID, err := discover.HexID(confJSON.ID)
+ if err != nil {
+ return err
+ }
+ n.ID = nodeID
+ }
+
+ if confJSON.PrivateKey != "" {
+ key, err := hex.DecodeString(confJSON.PrivateKey)
+ if err != nil {
+ return err
+ }
+ privKey, err := crypto.ToECDSA(key)
+ if err != nil {
+ return err
+ }
+ n.PrivateKey = privKey
+ }
+
+ n.Name = confJSON.Name
+ n.Services = confJSON.Services
+
+ return nil
+}
+
+// RandomNodeConfig returns node configuration with a randomly generated ID and
+// PrivateKey
+func RandomNodeConfig() *NodeConfig {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ panic("unable to generate key")
+ }
+ var id discover.NodeID
+ pubkey := crypto.FromECDSAPub(&key.PublicKey)
+ copy(id[:], pubkey[1:])
+ return &NodeConfig{
+ ID: id,
+ PrivateKey: key,
+ }
+}
+
+// ServiceContext is a collection of options and methods which can be utilised
+// when starting services
+type ServiceContext struct {
+ RPCDialer
+
+ NodeContext *node.ServiceContext
+ Config *NodeConfig
+ Snapshot []byte
+}
+
+// RPCDialer is used when initialising services which need to connect to
+// other nodes in the network (for example a simulated Swarm node which needs
+// to connect to a Geth node to resolve ENS names)
+type RPCDialer interface {
+ DialRPC(id discover.NodeID) (*rpc.Client, error)
+}
+
+// Services is a collection of services which can be run in a simulation
+type Services map[string]ServiceFunc
+
+// ServiceFunc returns a node.Service which can be used to boot a devp2p node
+type ServiceFunc func(ctx *ServiceContext) (node.Service, error)
+
+// serviceFuncs is a map of registered services which are used to boot devp2p
+// nodes
+var serviceFuncs = make(Services)
+
+// RegisterServices registers the given Services which can then be used to
+// start devp2p nodes using either the Exec or Docker adapters.
+//
+// It should be called in an init function so that it has the opportunity to
+// execute the services before main() is called.
+func RegisterServices(services Services) {
+ for name, f := range services {
+ if _, exists := serviceFuncs[name]; exists {
+ panic(fmt.Sprintf("node service already exists: %q", name))
+ }
+ serviceFuncs[name] = f
+ }
+
+ // now we have registered the services, run reexec.Init() which will
+ // potentially start one of the services if the current binary has
+ // been exec'd with argv[0] set to "p2p-node"
+ if reexec.Init() {
+ os.Exit(0)
+ }
+}
diff --git a/p2p/simulations/events.go b/p2p/simulations/events.go
new file mode 100644
index 000000000..f17958c68
--- /dev/null
+++ b/p2p/simulations/events.go
@@ -0,0 +1,108 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package simulations
+
+import (
+ "fmt"
+ "time"
+)
+
+// EventType is the type of event emitted by a simulation network
+type EventType string
+
+const (
+ // EventTypeNode is the type of event emitted when a node is either
+ // created, started or stopped
+ EventTypeNode EventType = "node"
+
+ // EventTypeConn is the type of event emitted when a connection is
+ // is either established or dropped between two nodes
+ EventTypeConn EventType = "conn"
+
+ // EventTypeMsg is the type of event emitted when a p2p message it
+ // sent between two nodes
+ EventTypeMsg EventType = "msg"
+)
+
+// Event is an event emitted by a simulation network
+type Event struct {
+ // Type is the type of the event
+ Type EventType `json:"type"`
+
+ // Time is the time the event happened
+ Time time.Time `json:"time"`
+
+ // Control indicates whether the event is the result of a controlled
+ // action in the network
+ Control bool `json:"control"`
+
+ // Node is set if the type is EventTypeNode
+ Node *Node `json:"node,omitempty"`
+
+ // Conn is set if the type is EventTypeConn
+ Conn *Conn `json:"conn,omitempty"`
+
+ // Msg is set if the type is EventTypeMsg
+ Msg *Msg `json:"msg,omitempty"`
+}
+
+// NewEvent creates a new event for the given object which should be either a
+// Node, Conn or Msg.
+//
+// The object is copied so that the event represents the state of the object
+// when NewEvent is called.
+func NewEvent(v interface{}) *Event {
+ event := &Event{Time: time.Now()}
+ switch v := v.(type) {
+ case *Node:
+ event.Type = EventTypeNode
+ node := *v
+ event.Node = &node
+ case *Conn:
+ event.Type = EventTypeConn
+ conn := *v
+ event.Conn = &conn
+ case *Msg:
+ event.Type = EventTypeMsg
+ msg := *v
+ event.Msg = &msg
+ default:
+ panic(fmt.Sprintf("invalid event type: %T", v))
+ }
+ return event
+}
+
+// ControlEvent creates a new control event
+func ControlEvent(v interface{}) *Event {
+ event := NewEvent(v)
+ event.Control = true
+ return event
+}
+
+// String returns the string representation of the event
+func (e *Event) String() string {
+ switch e.Type {
+ case EventTypeNode:
+ return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up)
+ case EventTypeConn:
+ return fmt.Sprintf("<conn-event> nodes: %s->%s up: %t", e.Conn.One.TerminalString(), e.Conn.Other.TerminalString(), e.Conn.Up)
+ case EventTypeMsg:
+ return fmt.Sprintf("<msg-event> nodes: %s->%s proto: %s, code: %d, received: %t", e.Msg.One.TerminalString(), e.Msg.Other.TerminalString(), e.Msg.Protocol, e.Msg.Code, e.Msg.Received)
+ default:
+ return ""
+ }
+}
diff --git a/p2p/simulations/examples/README.md b/p2p/simulations/examples/README.md
new file mode 100644
index 000000000..822a48dcb
--- /dev/null
+++ b/p2p/simulations/examples/README.md
@@ -0,0 +1,39 @@
+# devp2p simulation examples
+
+## ping-pong
+
+`ping-pong.go` implements a simulation network which contains nodes running a
+simple "ping-pong" protocol where nodes send a ping message to all their
+connected peers every 10s and receive pong messages in return.
+
+To run the simulation, run `go run ping-pong.go` in one terminal to start the
+simulation API and `./ping-pong.sh` in another to start and connect the nodes:
+
+```
+$ go run ping-pong.go
+INFO [08-15|13:53:49] using sim adapter
+INFO [08-15|13:53:49] starting simulation server on 0.0.0.0:8888...
+```
+
+```
+$ ./ping-pong.sh
+---> 13:58:12 creating 10 nodes
+Created node01
+Started node01
+...
+Created node10
+Started node10
+---> 13:58:13 connecting node01 to all other nodes
+Connected node01 to node02
+...
+Connected node01 to node10
+---> 13:58:14 done
+```
+
+Use the `--adapter` flag to choose the adapter type:
+
+```
+$ go run ping-pong.go --adapter exec
+INFO [08-15|14:01:14] using exec adapter tmpdir=/var/folders/k6/wpsgfg4n23ddbc6f5cnw5qg00000gn/T/p2p-example992833779
+INFO [08-15|14:01:14] starting simulation server on 0.0.0.0:8888...
+```
diff --git a/p2p/simulations/examples/ping-pong.go b/p2p/simulations/examples/ping-pong.go
new file mode 100644
index 000000000..6a0ead53a
--- /dev/null
+++ b/p2p/simulations/examples/ping-pong.go
@@ -0,0 +1,184 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+var adapterType = flag.String("adapter", "sim", `node adapter to use (one of "sim", "exec" or "docker")`)
+
+// main() starts a simulation network which contains nodes running a simple
+// ping-pong protocol
+func main() {
+ flag.Parse()
+
+ // set the log level to Trace
+ log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
+
+ // register a single ping-pong service
+ services := map[string]adapters.ServiceFunc{
+ "ping-pong": func(ctx *adapters.ServiceContext) (node.Service, error) {
+ return newPingPongService(ctx.Config.ID), nil
+ },
+ }
+ adapters.RegisterServices(services)
+
+ // create the NodeAdapter
+ var adapter adapters.NodeAdapter
+
+ switch *adapterType {
+
+ case "sim":
+ log.Info("using sim adapter")
+ adapter = adapters.NewSimAdapter(services)
+
+ case "exec":
+ tmpdir, err := ioutil.TempDir("", "p2p-example")
+ if err != nil {
+ log.Crit("error creating temp dir", "err", err)
+ }
+ defer os.RemoveAll(tmpdir)
+ log.Info("using exec adapter", "tmpdir", tmpdir)
+ adapter = adapters.NewExecAdapter(tmpdir)
+
+ case "docker":
+ log.Info("using docker adapter")
+ var err error
+ adapter, err = adapters.NewDockerAdapter()
+ if err != nil {
+ log.Crit("error creating docker adapter", "err", err)
+ }
+
+ default:
+ log.Crit(fmt.Sprintf("unknown node adapter %q", *adapterType))
+ }
+
+ // start the HTTP API
+ log.Info("starting simulation server on 0.0.0.0:8888...")
+ network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
+ DefaultService: "ping-pong",
+ })
+ if err := http.ListenAndServe(":8888", simulations.NewServer(network)); err != nil {
+ log.Crit("error starting simulation server", "err", err)
+ }
+}
+
+// pingPongService runs a ping-pong protocol between nodes where each node
+// sends a ping to all its connected peers every 10s and receives a pong in
+// return
+type pingPongService struct {
+ id discover.NodeID
+ log log.Logger
+ received int64
+}
+
+func newPingPongService(id discover.NodeID) *pingPongService {
+ return &pingPongService{
+ id: id,
+ log: log.New("node.id", id),
+ }
+}
+
+func (p *pingPongService) Protocols() []p2p.Protocol {
+ return []p2p.Protocol{{
+ Name: "ping-pong",
+ Version: 1,
+ Length: 2,
+ Run: p.Run,
+ NodeInfo: p.Info,
+ }}
+}
+
+func (p *pingPongService) APIs() []rpc.API {
+ return nil
+}
+
+func (p *pingPongService) Start(server *p2p.Server) error {
+ p.log.Info("ping-pong service starting")
+ return nil
+}
+
+func (p *pingPongService) Stop() error {
+ p.log.Info("ping-pong service stopping")
+ return nil
+}
+
+func (p *pingPongService) Info() interface{} {
+ return struct {
+ Received int64 `json:"received"`
+ }{
+ atomic.LoadInt64(&p.received),
+ }
+}
+
+const (
+ pingMsgCode = iota
+ pongMsgCode
+)
+
+// Run implements the ping-pong protocol which sends ping messages to the peer
+// at 10s intervals, and responds to pings with pong messages.
+func (p *pingPongService) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+ log := p.log.New("peer.id", peer.ID())
+
+ errC := make(chan error)
+ go func() {
+ for range time.Tick(10 * time.Second) {
+ log.Info("sending ping")
+ if err := p2p.Send(rw, pingMsgCode, "PING"); err != nil {
+ errC <- err
+ return
+ }
+ }
+ }()
+ go func() {
+ for {
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ errC <- err
+ return
+ }
+ payload, err := ioutil.ReadAll(msg.Payload)
+ if err != nil {
+ errC <- err
+ return
+ }
+ log.Info("received message", "msg.code", msg.Code, "msg.payload", string(payload))
+ atomic.AddInt64(&p.received, 1)
+ if msg.Code == pingMsgCode {
+ log.Info("sending pong")
+ go p2p.Send(rw, pongMsgCode, "PONG")
+ }
+ }
+ }()
+ return <-errC
+}
diff --git a/p2p/simulations/examples/ping-pong.sh b/p2p/simulations/examples/ping-pong.sh
new file mode 100755
index 000000000..47936bd9a
--- /dev/null
+++ b/p2p/simulations/examples/ping-pong.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+#
+# Boot a ping-pong network simulation using the HTTP API started by ping-pong.go
+
+set -e
+
+main() {
+ if ! which p2psim &>/dev/null; then
+ fail "missing p2psim binary (you need to build cmd/p2psim and put it in \$PATH)"
+ fi
+
+ info "creating 10 nodes"
+ for i in $(seq 1 10); do
+ p2psim node create --name "$(node_name $i)"
+ p2psim node start "$(node_name $i)"
+ done
+
+ info "connecting node01 to all other nodes"
+ for i in $(seq 2 10); do
+ p2psim node connect "node01" "$(node_name $i)"
+ done
+
+ info "done"
+}
+
+node_name() {
+ local num=$1
+ echo "node$(printf '%02d' $num)"
+}
+
+info() {
+ echo -e "\033[1;32m---> $(date +%H:%M:%S) ${@}\033[0m"
+}
+
+fail() {
+ echo -e "\033[1;31mERROR: ${@}\033[0m" >&2
+ exit 1
+}
+
+main "$@"
diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go
new file mode 100644
index 000000000..3fa8b9292
--- /dev/null
+++ b/p2p/simulations/http.go
@@ -0,0 +1,680 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package simulations
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strconv"
+ "strings"
+
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/julienschmidt/httprouter"
+ "golang.org/x/net/websocket"
+)
+
+// DefaultClient is the default simulation API client which expects the API
+// to be running at http://localhost:8888
+var DefaultClient = NewClient("http://localhost:8888")
+
+// Client is a client for the simulation HTTP API which supports creating
+// and managing simulation networks
+type Client struct {
+ URL string
+
+ client *http.Client
+}
+
+// NewClient returns a new simulation API client
+func NewClient(url string) *Client {
+ return &Client{
+ URL: url,
+ client: http.DefaultClient,
+ }
+}
+
+// GetNetwork returns details of the network
+func (c *Client) GetNetwork() (*Network, error) {
+ network := &Network{}
+ return network, c.Get("/", network)
+}
+
+// StartNetwork starts all existing nodes in the simulation network
+func (c *Client) StartNetwork() error {
+ return c.Post("/start", nil, nil)
+}
+
+// StopNetwork stops all existing nodes in a simulation network
+func (c *Client) StopNetwork() error {
+ return c.Post("/stop", nil, nil)
+}
+
+// CreateSnapshot creates a network snapshot
+func (c *Client) CreateSnapshot() (*Snapshot, error) {
+ snap := &Snapshot{}
+ return snap, c.Get("/snapshot", snap)
+}
+
+// LoadSnapshot loads a snapshot into the network
+func (c *Client) LoadSnapshot(snap *Snapshot) error {
+ return c.Post("/snapshot", snap, nil)
+}
+
+// SubscribeOpts is a collection of options to use when subscribing to network
+// events
+type SubscribeOpts struct {
+ // Current instructs the server to send events for existing nodes and
+ // connections first
+ Current bool
+
+ // Filter instructs the server to only send a subset of message events
+ Filter string
+}
+
+// SubscribeNetwork subscribes to network events which are sent from the server
+// as a server-sent-events stream, optionally receiving events for existing
+// nodes and connections and filtering message events
+func (c *Client) SubscribeNetwork(events chan *Event, opts SubscribeOpts) (event.Subscription, error) {
+ url := fmt.Sprintf("%s/events?current=%t&filter=%s", c.URL, opts.Current, opts.Filter)
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, err
+ }
+ req.Header.Set("Accept", "text/event-stream")
+ res, err := c.client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ if res.StatusCode != http.StatusOK {
+ response, _ := ioutil.ReadAll(res.Body)
+ res.Body.Close()
+ return nil, fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response)
+ }
+
+ // define a producer function to pass to event.Subscription
+ // which reads server-sent events from res.Body and sends
+ // them to the events channel
+ producer := func(stop <-chan struct{}) error {
+ defer res.Body.Close()
+
+ // read lines from res.Body in a goroutine so that we are
+ // always reading from the stop channel
+ lines := make(chan string)
+ errC := make(chan error, 1)
+ go func() {
+ s := bufio.NewScanner(res.Body)
+ for s.Scan() {
+ select {
+ case lines <- s.Text():
+ case <-stop:
+ return
+ }
+ }
+ errC <- s.Err()
+ }()
+
+ // detect any lines which start with "data:", decode the data
+ // into an event and send it to the events channel
+ for {
+ select {
+ case line := <-lines:
+ if !strings.HasPrefix(line, "data:") {
+ continue
+ }
+ data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
+ event := &Event{}
+ if err := json.Unmarshal([]byte(data), event); err != nil {
+ return fmt.Errorf("error decoding SSE event: %s", err)
+ }
+ select {
+ case events <- event:
+ case <-stop:
+ return nil
+ }
+ case err := <-errC:
+ return err
+ case <-stop:
+ return nil
+ }
+ }
+ }
+
+ return event.NewSubscription(producer), nil
+}
+
+// GetNodes returns all nodes which exist in the network
+func (c *Client) GetNodes() ([]*p2p.NodeInfo, error) {
+ var nodes []*p2p.NodeInfo
+ return nodes, c.Get("/nodes", &nodes)
+}
+
+// CreateNode creates a node in the network using the given configuration
+func (c *Client) CreateNode(config *adapters.NodeConfig) (*p2p.NodeInfo, error) {
+ node := &p2p.NodeInfo{}
+ return node, c.Post("/nodes", config, node)
+}
+
+// GetNode returns details of a node
+func (c *Client) GetNode(nodeID string) (*p2p.NodeInfo, error) {
+ node := &p2p.NodeInfo{}
+ return node, c.Get(fmt.Sprintf("/nodes/%s", nodeID), node)
+}
+
+// StartNode starts a node
+func (c *Client) StartNode(nodeID string) error {
+ return c.Post(fmt.Sprintf("/nodes/%s/start", nodeID), nil, nil)
+}
+
+// StopNode stops a node
+func (c *Client) StopNode(nodeID string) error {
+ return c.Post(fmt.Sprintf("/nodes/%s/stop", nodeID), nil, nil)
+}
+
+// ConnectNode connects a node to a peer node
+func (c *Client) ConnectNode(nodeID, peerID string) error {
+ return c.Post(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID), nil, nil)
+}
+
+// DisconnectNode disconnects a node from a peer node
+func (c *Client) DisconnectNode(nodeID, peerID string) error {
+ return c.Delete(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID))
+}
+
+// RPCClient returns an RPC client connected to a node
+func (c *Client) RPCClient(ctx context.Context, nodeID string) (*rpc.Client, error) {
+ baseURL := strings.Replace(c.URL, "http", "ws", 1)
+ return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/nodes/%s/rpc", baseURL, nodeID), "")
+}
+
+// Get performs a HTTP GET request decoding the resulting JSON response
+// into "out"
+func (c *Client) Get(path string, out interface{}) error {
+ return c.Send("GET", path, nil, out)
+}
+
+// Post performs a HTTP POST request sending "in" as the JSON body and
+// decoding the resulting JSON response into "out"
+func (c *Client) Post(path string, in, out interface{}) error {
+ return c.Send("POST", path, in, out)
+}
+
+// Delete performs a HTTP DELETE request
+func (c *Client) Delete(path string) error {
+ return c.Send("DELETE", path, nil, nil)
+}
+
+// Send performs a HTTP request, sending "in" as the JSON request body and
+// decoding the JSON response into "out"
+func (c *Client) Send(method, path string, in, out interface{}) error {
+ var body []byte
+ if in != nil {
+ var err error
+ body, err = json.Marshal(in)
+ if err != nil {
+ return err
+ }
+ }
+ req, err := http.NewRequest(method, c.URL+path, bytes.NewReader(body))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
+ res, err := c.client.Do(req)
+ if err != nil {
+ return err
+ }
+ defer res.Body.Close()
+ if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated {
+ response, _ := ioutil.ReadAll(res.Body)
+ return fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response)
+ }
+ if out != nil {
+ if err := json.NewDecoder(res.Body).Decode(out); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Server is an HTTP server providing an API to manage a simulation network
+type Server struct {
+ router *httprouter.Router
+ network *Network
+}
+
+// NewServer returns a new simulation API server
+func NewServer(network *Network) *Server {
+ s := &Server{
+ router: httprouter.New(),
+ network: network,
+ }
+
+ s.OPTIONS("/", s.Options)
+ s.GET("/", s.GetNetwork)
+ s.POST("/start", s.StartNetwork)
+ s.POST("/stop", s.StopNetwork)
+ s.GET("/events", s.StreamNetworkEvents)
+ s.GET("/snapshot", s.CreateSnapshot)
+ s.POST("/snapshot", s.LoadSnapshot)
+ s.POST("/nodes", s.CreateNode)
+ s.GET("/nodes", s.GetNodes)
+ s.GET("/nodes/:nodeid", s.GetNode)
+ s.POST("/nodes/:nodeid/start", s.StartNode)
+ s.POST("/nodes/:nodeid/stop", s.StopNode)
+ s.POST("/nodes/:nodeid/conn/:peerid", s.ConnectNode)
+ s.DELETE("/nodes/:nodeid/conn/:peerid", s.DisconnectNode)
+ s.GET("/nodes/:nodeid/rpc", s.NodeRPC)
+
+ return s
+}
+
+// GetNetwork returns details of the network
+func (s *Server) GetNetwork(w http.ResponseWriter, req *http.Request) {
+ s.JSON(w, http.StatusOK, s.network)
+}
+
+// StartNetwork starts all nodes in the network
+func (s *Server) StartNetwork(w http.ResponseWriter, req *http.Request) {
+ if err := s.network.StartAll(); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
+}
+
+// StopNetwork stops all nodes in the network
+func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) {
+ if err := s.network.StopAll(); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
+}
+
+// StreamNetworkEvents streams network events as a server-sent-events stream
+func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) {
+ events := make(chan *Event)
+ sub := s.network.events.Subscribe(events)
+ defer sub.Unsubscribe()
+
+ // stop the stream if the client goes away
+ var clientGone <-chan bool
+ if cn, ok := w.(http.CloseNotifier); ok {
+ clientGone = cn.CloseNotify()
+ }
+
+ // write writes the given event and data to the stream like:
+ //
+ // event: <event>
+ // data: <data>
+ //
+ write := func(event, data string) {
+ fmt.Fprintf(w, "event: %s\n", event)
+ fmt.Fprintf(w, "data: %s\n\n", data)
+ if fw, ok := w.(http.Flusher); ok {
+ fw.Flush()
+ }
+ }
+ writeEvent := func(event *Event) error {
+ data, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+ write("network", string(data))
+ return nil
+ }
+ writeErr := func(err error) {
+ write("error", err.Error())
+ }
+
+ // check if filtering has been requested
+ var filters MsgFilters
+ if filterParam := req.URL.Query().Get("filter"); filterParam != "" {
+ var err error
+ filters, err = NewMsgFilters(filterParam)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ }
+
+ w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "\n\n")
+ if fw, ok := w.(http.Flusher); ok {
+ fw.Flush()
+ }
+
+ // optionally send the existing nodes and connections
+ if req.URL.Query().Get("current") == "true" {
+ snap, err := s.network.Snapshot()
+ if err != nil {
+ writeErr(err)
+ return
+ }
+ for _, node := range snap.Nodes {
+ event := NewEvent(&node.Node)
+ if err := writeEvent(event); err != nil {
+ writeErr(err)
+ return
+ }
+ }
+ for _, conn := range snap.Conns {
+ event := NewEvent(&conn)
+ if err := writeEvent(event); err != nil {
+ writeErr(err)
+ return
+ }
+ }
+ }
+
+ for {
+ select {
+ case event := <-events:
+ // only send message events which match the filters
+ if event.Msg != nil && !filters.Match(event.Msg) {
+ continue
+ }
+ if err := writeEvent(event); err != nil {
+ writeErr(err)
+ return
+ }
+ case <-clientGone:
+ return
+ }
+ }
+}
+
+// NewMsgFilters constructs a collection of message filters from a URL query
+// parameter.
+//
+// The parameter is expected to be a dash-separated list of individual filters,
+// each having the format '<proto>:<codes>', where <proto> is the name of a
+// protocol and <codes> is a comma-separated list of message codes.
+//
+// A message code of '*' or '-1' is considered a wildcard and matches any code.
+func NewMsgFilters(filterParam string) (MsgFilters, error) {
+ filters := make(MsgFilters)
+ for _, filter := range strings.Split(filterParam, "-") {
+ protoCodes := strings.SplitN(filter, ":", 2)
+ if len(protoCodes) != 2 || protoCodes[0] == "" || protoCodes[1] == "" {
+ return nil, fmt.Errorf("invalid message filter: %s", filter)
+ }
+ proto := protoCodes[0]
+ for _, code := range strings.Split(protoCodes[1], ",") {
+ if code == "*" || code == "-1" {
+ filters[MsgFilter{Proto: proto, Code: -1}] = struct{}{}
+ continue
+ }
+ n, err := strconv.ParseUint(code, 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("invalid message code: %s", code)
+ }
+ filters[MsgFilter{Proto: proto, Code: int64(n)}] = struct{}{}
+ }
+ }
+ return filters, nil
+}
+
+// MsgFilters is a collection of filters which are used to filter message
+// events
+type MsgFilters map[MsgFilter]struct{}
+
+// Match checks if the given message matches any of the filters
+func (m MsgFilters) Match(msg *Msg) bool {
+ // check if there is a wildcard filter for the message's protocol
+ if _, ok := m[MsgFilter{Proto: msg.Protocol, Code: -1}]; ok {
+ return true
+ }
+
+ // check if there is a filter for the message's protocol and code
+ if _, ok := m[MsgFilter{Proto: msg.Protocol, Code: int64(msg.Code)}]; ok {
+ return true
+ }
+
+ return false
+}
+
+// MsgFilter is used to filter message events based on protocol and message
+// code
+type MsgFilter struct {
+ // Proto is matched against a message's protocol
+ Proto string
+
+ // Code is matched against a message's code, with -1 matching all codes
+ Code int64
+}
+
+// CreateSnapshot creates a network snapshot
+func (s *Server) CreateSnapshot(w http.ResponseWriter, req *http.Request) {
+ snap, err := s.network.Snapshot()
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ s.JSON(w, http.StatusOK, snap)
+}
+
+// LoadSnapshot loads a snapshot into the network
+func (s *Server) LoadSnapshot(w http.ResponseWriter, req *http.Request) {
+ snap := &Snapshot{}
+ if err := json.NewDecoder(req.Body).Decode(snap); err != nil {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ if err := s.network.Load(snap); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ s.JSON(w, http.StatusOK, s.network)
+}
+
+// CreateNode creates a node in the network using the given configuration
+func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) {
+ config := adapters.RandomNodeConfig()
+ err := json.NewDecoder(req.Body).Decode(config)
+ if err != nil && err != io.EOF {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ node, err := s.network.NewNodeWithConfig(config)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ s.JSON(w, http.StatusCreated, node.NodeInfo())
+}
+
+// GetNodes returns all nodes which exist in the network
+func (s *Server) GetNodes(w http.ResponseWriter, req *http.Request) {
+ nodes := s.network.GetNodes()
+
+ infos := make([]*p2p.NodeInfo, len(nodes))
+ for i, node := range nodes {
+ infos[i] = node.NodeInfo()
+ }
+
+ s.JSON(w, http.StatusOK, infos)
+}
+
+// GetNode returns details of a node
+func (s *Server) GetNode(w http.ResponseWriter, req *http.Request) {
+ node := req.Context().Value("node").(*Node)
+
+ s.JSON(w, http.StatusOK, node.NodeInfo())
+}
+
+// StartNode starts a node
+func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) {
+ node := req.Context().Value("node").(*Node)
+
+ if err := s.network.Start(node.ID()); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ s.JSON(w, http.StatusOK, node.NodeInfo())
+}
+
+// StopNode stops a node
+func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) {
+ node := req.Context().Value("node").(*Node)
+
+ if err := s.network.Stop(node.ID()); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ s.JSON(w, http.StatusOK, node.NodeInfo())
+}
+
+// ConnectNode connects a node to a peer node
+func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) {
+ node := req.Context().Value("node").(*Node)
+ peer := req.Context().Value("peer").(*Node)
+
+ if err := s.network.Connect(node.ID(), peer.ID()); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ s.JSON(w, http.StatusOK, node.NodeInfo())
+}
+
+// DisconnectNode disconnects a node from a peer node
+func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) {
+ node := req.Context().Value("node").(*Node)
+ peer := req.Context().Value("peer").(*Node)
+
+ if err := s.network.Disconnect(node.ID(), peer.ID()); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ s.JSON(w, http.StatusOK, node.NodeInfo())
+}
+
+// Options responds to the OPTIONS HTTP method by returning a 200 OK response
+// with the "Access-Control-Allow-Headers" header set to "Content-Type"
+func (s *Server) Options(w http.ResponseWriter, req *http.Request) {
+ w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
+ w.WriteHeader(http.StatusOK)
+}
+
+// NodeRPC forwards RPC requests to a node in the network via a WebSocket
+// connection
+func (s *Server) NodeRPC(w http.ResponseWriter, req *http.Request) {
+ node := req.Context().Value("node").(*Node)
+
+ handler := func(conn *websocket.Conn) {
+ node.ServeRPC(conn)
+ }
+
+ websocket.Server{Handler: handler}.ServeHTTP(w, req)
+}
+
+// ServeHTTP implements the http.Handler interface by delegating to the
+// underlying httprouter.Router
+func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ s.router.ServeHTTP(w, req)
+}
+
+// GET registers a handler for GET requests to a particular path
+func (s *Server) GET(path string, handle http.HandlerFunc) {
+ s.router.GET(path, s.wrapHandler(handle))
+}
+
+// POST registers a handler for POST requests to a particular path
+func (s *Server) POST(path string, handle http.HandlerFunc) {
+ s.router.POST(path, s.wrapHandler(handle))
+}
+
+// DELETE registers a handler for DELETE requests to a particular path
+func (s *Server) DELETE(path string, handle http.HandlerFunc) {
+ s.router.DELETE(path, s.wrapHandler(handle))
+}
+
+// OPTIONS registers a handler for OPTIONS requests to a particular path
+func (s *Server) OPTIONS(path string, handle http.HandlerFunc) {
+ s.router.OPTIONS("/*path", s.wrapHandler(handle))
+}
+
+// JSON sends "data" as a JSON HTTP response
+func (s *Server) JSON(w http.ResponseWriter, status int, data interface{}) {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(status)
+ json.NewEncoder(w).Encode(data)
+}
+
+// wrapHandler returns a httprouter.Handle which wraps a http.HandlerFunc by
+// populating request.Context with any objects from the URL params
+func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle {
+ return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
+
+ ctx := context.Background()
+
+ if id := params.ByName("nodeid"); id != "" {
+ var node *Node
+ if nodeID, err := discover.HexID(id); err == nil {
+ node = s.network.GetNode(nodeID)
+ } else {
+ node = s.network.GetNodeByName(id)
+ }
+ if node == nil {
+ http.NotFound(w, req)
+ return
+ }
+ ctx = context.WithValue(ctx, "node", node)
+ }
+
+ if id := params.ByName("peerid"); id != "" {
+ var peer *Node
+ if peerID, err := discover.HexID(id); err == nil {
+ peer = s.network.GetNode(peerID)
+ } else {
+ peer = s.network.GetNodeByName(id)
+ }
+ if peer == nil {
+ http.NotFound(w, req)
+ return
+ }
+ ctx = context.WithValue(ctx, "peer", peer)
+ }
+
+ handler(w, req.WithContext(ctx))
+ }
+}
diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go
new file mode 100644
index 000000000..677a8fb14
--- /dev/null
+++ b/p2p/simulations/http_test.go
@@ -0,0 +1,823 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package simulations
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "net/http/httptest"
+ "reflect"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// testService implements the node.Service interface and provides protocols
+// and APIs which are useful for testing nodes in a simulation network
+type testService struct {
+ id discover.NodeID
+
+ // peerCount is incremented once a peer handshake has been performed
+ peerCount int64
+
+ peers map[discover.NodeID]*testPeer
+ peersMtx sync.Mutex
+
+ // state stores []byte which is used to test creating and loading
+ // snapshots
+ state atomic.Value
+}
+
+func newTestService(ctx *adapters.ServiceContext) (node.Service, error) {
+ svc := &testService{
+ id: ctx.Config.ID,
+ peers: make(map[discover.NodeID]*testPeer),
+ }
+ svc.state.Store(ctx.Snapshot)
+ return svc, nil
+}
+
+type testPeer struct {
+ testReady chan struct{}
+ dumReady chan struct{}
+}
+
+func (t *testService) peer(id discover.NodeID) *testPeer {
+ t.peersMtx.Lock()
+ defer t.peersMtx.Unlock()
+ if peer, ok := t.peers[id]; ok {
+ return peer
+ }
+ peer := &testPeer{
+ testReady: make(chan struct{}),
+ dumReady: make(chan struct{}),
+ }
+ t.peers[id] = peer
+ return peer
+}
+
+func (t *testService) Protocols() []p2p.Protocol {
+ return []p2p.Protocol{
+ {
+ Name: "test",
+ Version: 1,
+ Length: 3,
+ Run: t.RunTest,
+ },
+ {
+ Name: "dum",
+ Version: 1,
+ Length: 1,
+ Run: t.RunDum,
+ },
+ {
+ Name: "prb",
+ Version: 1,
+ Length: 1,
+ Run: t.RunPrb,
+ },
+ }
+}
+
+func (t *testService) APIs() []rpc.API {
+ return []rpc.API{{
+ Namespace: "test",
+ Version: "1.0",
+ Service: &TestAPI{
+ state: &t.state,
+ peerCount: &t.peerCount,
+ },
+ }}
+}
+
+func (t *testService) Start(server *p2p.Server) error {
+ return nil
+}
+
+func (t *testService) Stop() error {
+ return nil
+}
+
+// handshake performs a peer handshake by sending and expecting an empty
+// message with the given code
+func (t *testService) handshake(rw p2p.MsgReadWriter, code uint64) error {
+ errc := make(chan error, 2)
+ go func() { errc <- p2p.Send(rw, code, struct{}{}) }()
+ go func() { errc <- p2p.ExpectMsg(rw, code, struct{}{}) }()
+ for i := 0; i < 2; i++ {
+ if err := <-errc; err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (t *testService) RunTest(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := t.peer(p.ID())
+
+ // perform three handshakes with three different message codes,
+ // used to test message sending and filtering
+ if err := t.handshake(rw, 2); err != nil {
+ return err
+ }
+ if err := t.handshake(rw, 1); err != nil {
+ return err
+ }
+ if err := t.handshake(rw, 0); err != nil {
+ return err
+ }
+
+ // close the testReady channel so that other protocols can run
+ close(peer.testReady)
+
+ // track the peer
+ atomic.AddInt64(&t.peerCount, 1)
+ defer atomic.AddInt64(&t.peerCount, -1)
+
+ // block until the peer is dropped
+ for {
+ _, err := rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ }
+}
+
+func (t *testService) RunDum(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := t.peer(p.ID())
+
+ // wait for the test protocol to perform its handshake
+ <-peer.testReady
+
+ // perform a handshake
+ if err := t.handshake(rw, 0); err != nil {
+ return err
+ }
+
+ // close the dumReady channel so that other protocols can run
+ close(peer.dumReady)
+
+ // block until the peer is dropped
+ for {
+ _, err := rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ }
+}
+func (t *testService) RunPrb(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := t.peer(p.ID())
+
+ // wait for the dum protocol to perform its handshake
+ <-peer.dumReady
+
+ // perform a handshake
+ if err := t.handshake(rw, 0); err != nil {
+ return err
+ }
+
+ // block until the peer is dropped
+ for {
+ _, err := rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ }
+}
+
+func (t *testService) Snapshot() ([]byte, error) {
+ return t.state.Load().([]byte), nil
+}
+
+// TestAPI provides a test API to:
+// * get the peer count
+// * get and set an arbitrary state byte slice
+// * get and increment a counter
+// * subscribe to counter increment events
+type TestAPI struct {
+ state *atomic.Value
+ peerCount *int64
+ counter int64
+ feed event.Feed
+}
+
+func (t *TestAPI) PeerCount() int64 {
+ return atomic.LoadInt64(t.peerCount)
+}
+
+func (t *TestAPI) Get() int64 {
+ return atomic.LoadInt64(&t.counter)
+}
+
+func (t *TestAPI) Add(delta int64) {
+ atomic.AddInt64(&t.counter, delta)
+ t.feed.Send(delta)
+}
+
+func (t *TestAPI) GetState() []byte {
+ return t.state.Load().([]byte)
+}
+
+func (t *TestAPI) SetState(state []byte) {
+ t.state.Store(state)
+}
+
+func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
+
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ events := make(chan int64)
+ sub := t.feed.Subscribe(events)
+ defer sub.Unsubscribe()
+
+ for {
+ select {
+ case event := <-events:
+ notifier.Notify(rpcSub.ID, event)
+ case <-sub.Err():
+ return
+ case <-rpcSub.Err():
+ return
+ case <-notifier.Closed():
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+var testServices = adapters.Services{
+ "test": newTestService,
+}
+
+func testHTTPServer(t *testing.T) (*Network, *httptest.Server) {
+ adapter := adapters.NewSimAdapter(testServices)
+ network := NewNetwork(adapter, &NetworkConfig{
+ DefaultService: "test",
+ })
+ return network, httptest.NewServer(NewServer(network))
+}
+
+// TestHTTPNetwork tests interacting with a simulation network using the HTTP
+// API
+func TestHTTPNetwork(t *testing.T) {
+ // start the server
+ network, s := testHTTPServer(t)
+ defer s.Close()
+
+ // subscribe to events so we can check them later
+ client := NewClient(s.URL)
+ events := make(chan *Event, 100)
+ var opts SubscribeOpts
+ sub, err := client.SubscribeNetwork(events, opts)
+ if err != nil {
+ t.Fatalf("error subscribing to network events: %s", err)
+ }
+ defer sub.Unsubscribe()
+
+ // check we can retrieve details about the network
+ gotNetwork, err := client.GetNetwork()
+ if err != nil {
+ t.Fatalf("error getting network: %s", err)
+ }
+ if gotNetwork.ID != network.ID {
+ t.Fatalf("expected network to have ID %q, got %q", network.ID, gotNetwork.ID)
+ }
+
+ // start a simulation network
+ nodeIDs := startTestNetwork(t, client)
+
+ // check we got all the events
+ x := &expectEvents{t, events, sub}
+ x.expect(
+ x.nodeEvent(nodeIDs[0], false),
+ x.nodeEvent(nodeIDs[1], false),
+ x.nodeEvent(nodeIDs[0], true),
+ x.nodeEvent(nodeIDs[1], true),
+ x.connEvent(nodeIDs[0], nodeIDs[1], false),
+ x.connEvent(nodeIDs[0], nodeIDs[1], true),
+ )
+
+ // reconnect the stream and check we get the current nodes and conns
+ events = make(chan *Event, 100)
+ opts.Current = true
+ sub, err = client.SubscribeNetwork(events, opts)
+ if err != nil {
+ t.Fatalf("error subscribing to network events: %s", err)
+ }
+ defer sub.Unsubscribe()
+ x = &expectEvents{t, events, sub}
+ x.expect(
+ x.nodeEvent(nodeIDs[0], true),
+ x.nodeEvent(nodeIDs[1], true),
+ x.connEvent(nodeIDs[0], nodeIDs[1], true),
+ )
+}
+
+func startTestNetwork(t *testing.T, client *Client) []string {
+ // create two nodes
+ nodeCount := 2
+ nodeIDs := make([]string, nodeCount)
+ for i := 0; i < nodeCount; i++ {
+ node, err := client.CreateNode(nil)
+ if err != nil {
+ t.Fatalf("error creating node: %s", err)
+ }
+ nodeIDs[i] = node.ID
+ }
+
+ // check both nodes exist
+ nodes, err := client.GetNodes()
+ if err != nil {
+ t.Fatalf("error getting nodes: %s", err)
+ }
+ if len(nodes) != nodeCount {
+ t.Fatalf("expected %d nodes, got %d", nodeCount, len(nodes))
+ }
+ for i, nodeID := range nodeIDs {
+ if nodes[i].ID != nodeID {
+ t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID)
+ }
+ node, err := client.GetNode(nodeID)
+ if err != nil {
+ t.Fatalf("error getting node %d: %s", i, err)
+ }
+ if node.ID != nodeID {
+ t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, node.ID)
+ }
+ }
+
+ // start both nodes
+ for _, nodeID := range nodeIDs {
+ if err := client.StartNode(nodeID); err != nil {
+ t.Fatalf("error starting node %q: %s", nodeID, err)
+ }
+ }
+
+ // connect the nodes
+ for i := 0; i < nodeCount-1; i++ {
+ peerId := i + 1
+ if i == nodeCount-1 {
+ peerId = 0
+ }
+ if err := client.ConnectNode(nodeIDs[i], nodeIDs[peerId]); err != nil {
+ t.Fatalf("error connecting nodes: %s", err)
+ }
+ }
+
+ return nodeIDs
+}
+
+type expectEvents struct {
+ *testing.T
+
+ events chan *Event
+ sub event.Subscription
+}
+
+func (t *expectEvents) nodeEvent(id string, up bool) *Event {
+ return &Event{
+ Type: EventTypeNode,
+ Node: &Node{
+ Config: &adapters.NodeConfig{
+ ID: discover.MustHexID(id),
+ },
+ Up: up,
+ },
+ }
+}
+
+func (t *expectEvents) connEvent(one, other string, up bool) *Event {
+ return &Event{
+ Type: EventTypeConn,
+ Conn: &Conn{
+ One: discover.MustHexID(one),
+ Other: discover.MustHexID(other),
+ Up: up,
+ },
+ }
+}
+
+func (t *expectEvents) expectMsgs(expected map[MsgFilter]int) {
+ actual := make(map[MsgFilter]int)
+ timeout := time.After(10 * time.Second)
+loop:
+ for {
+ select {
+ case event := <-t.events:
+ t.Logf("received %s event: %s", event.Type, event)
+
+ if event.Type != EventTypeMsg || event.Msg.Received {
+ continue loop
+ }
+ if event.Msg == nil {
+ t.Fatal("expected event.Msg to be set")
+ }
+ filter := MsgFilter{
+ Proto: event.Msg.Protocol,
+ Code: int64(event.Msg.Code),
+ }
+ actual[filter]++
+ if actual[filter] > expected[filter] {
+ t.Fatalf("received too many msgs for filter: %v", filter)
+ }
+ if reflect.DeepEqual(actual, expected) {
+ return
+ }
+
+ case err := <-t.sub.Err():
+ t.Fatalf("network stream closed unexpectedly: %s", err)
+
+ case <-timeout:
+ t.Fatal("timed out waiting for expected events")
+ }
+ }
+}
+
+func (t *expectEvents) expect(events ...*Event) {
+ timeout := time.After(10 * time.Second)
+ i := 0
+ for {
+ select {
+ case event := <-t.events:
+ t.Logf("received %s event: %s", event.Type, event)
+
+ expected := events[i]
+ if event.Type != expected.Type {
+ t.Fatalf("expected event %d to have type %q, got %q", i, expected.Type, event.Type)
+ }
+
+ switch expected.Type {
+
+ case EventTypeNode:
+ if event.Node == nil {
+ t.Fatal("expected event.Node to be set")
+ }
+ if event.Node.ID() != expected.Node.ID() {
+ t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString())
+ }
+ if event.Node.Up != expected.Node.Up {
+ t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up)
+ }
+
+ case EventTypeConn:
+ if event.Conn == nil {
+ t.Fatal("expected event.Conn to be set")
+ }
+ if event.Conn.One != expected.Conn.One {
+ t.Fatalf("expected conn event %d to have one=%q, got one=%q", i, expected.Conn.One.TerminalString(), event.Conn.One.TerminalString())
+ }
+ if event.Conn.Other != expected.Conn.Other {
+ t.Fatalf("expected conn event %d to have other=%q, got other=%q", i, expected.Conn.Other.TerminalString(), event.Conn.Other.TerminalString())
+ }
+ if event.Conn.Up != expected.Conn.Up {
+ t.Fatalf("expected conn event %d to have up=%t, got up=%t", i, expected.Conn.Up, event.Conn.Up)
+ }
+
+ }
+
+ i++
+ if i == len(events) {
+ return
+ }
+
+ case err := <-t.sub.Err():
+ t.Fatalf("network stream closed unexpectedly: %s", err)
+
+ case <-timeout:
+ t.Fatal("timed out waiting for expected events")
+ }
+ }
+}
+
+// TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API
+func TestHTTPNodeRPC(t *testing.T) {
+ // start the server
+ _, s := testHTTPServer(t)
+ defer s.Close()
+
+ // start a node in the network
+ client := NewClient(s.URL)
+ node, err := client.CreateNode(nil)
+ if err != nil {
+ t.Fatalf("error creating node: %s", err)
+ }
+ if err := client.StartNode(node.ID); err != nil {
+ t.Fatalf("error starting node: %s", err)
+ }
+
+ // create two RPC clients
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ rpcClient1, err := client.RPCClient(ctx, node.ID)
+ if err != nil {
+ t.Fatalf("error getting node RPC client: %s", err)
+ }
+ rpcClient2, err := client.RPCClient(ctx, node.ID)
+ if err != nil {
+ t.Fatalf("error getting node RPC client: %s", err)
+ }
+
+ // subscribe to events using client 1
+ events := make(chan int64, 1)
+ sub, err := rpcClient1.Subscribe(ctx, "test", events, "events")
+ if err != nil {
+ t.Fatalf("error subscribing to events: %s", err)
+ }
+ defer sub.Unsubscribe()
+
+ // call some RPC methods using client 2
+ if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil {
+ t.Fatalf("error calling RPC method: %s", err)
+ }
+ var result int64
+ if err := rpcClient2.CallContext(ctx, &result, "test_get"); err != nil {
+ t.Fatalf("error calling RPC method: %s", err)
+ }
+ if result != 10 {
+ t.Fatalf("expected result to be 10, got %d", result)
+ }
+
+ // check we got an event from client 1
+ select {
+ case event := <-events:
+ if event != 10 {
+ t.Fatalf("expected event to be 10, got %d", event)
+ }
+ case <-ctx.Done():
+ t.Fatal(ctx.Err())
+ }
+}
+
+// TestHTTPSnapshot tests creating and loading network snapshots
+func TestHTTPSnapshot(t *testing.T) {
+ // start the server
+ _, s := testHTTPServer(t)
+ defer s.Close()
+
+ // create a two-node network
+ client := NewClient(s.URL)
+ nodeCount := 2
+ nodes := make([]*p2p.NodeInfo, nodeCount)
+ for i := 0; i < nodeCount; i++ {
+ node, err := client.CreateNode(nil)
+ if err != nil {
+ t.Fatalf("error creating node: %s", err)
+ }
+ if err := client.StartNode(node.ID); err != nil {
+ t.Fatalf("error starting node: %s", err)
+ }
+ nodes[i] = node
+ }
+ if err := client.ConnectNode(nodes[0].ID, nodes[1].ID); err != nil {
+ t.Fatalf("error connecting nodes: %s", err)
+ }
+
+ // store some state in the test services
+ states := make([]string, nodeCount)
+ for i, node := range nodes {
+ rpc, err := client.RPCClient(context.Background(), node.ID)
+ if err != nil {
+ t.Fatalf("error getting RPC client: %s", err)
+ }
+ defer rpc.Close()
+ state := fmt.Sprintf("%x", rand.Int())
+ if err := rpc.Call(nil, "test_setState", []byte(state)); err != nil {
+ t.Fatalf("error setting service state: %s", err)
+ }
+ states[i] = state
+ }
+
+ // create a snapshot
+ snap, err := client.CreateSnapshot()
+ if err != nil {
+ t.Fatalf("error creating snapshot: %s", err)
+ }
+ for i, state := range states {
+ gotState := snap.Nodes[i].Snapshots["test"]
+ if string(gotState) != state {
+ t.Fatalf("expected snapshot state %q, got %q", state, gotState)
+ }
+ }
+
+ // create another network
+ _, s = testHTTPServer(t)
+ defer s.Close()
+ client = NewClient(s.URL)
+
+ // subscribe to events so we can check them later
+ events := make(chan *Event, 100)
+ var opts SubscribeOpts
+ sub, err := client.SubscribeNetwork(events, opts)
+ if err != nil {
+ t.Fatalf("error subscribing to network events: %s", err)
+ }
+ defer sub.Unsubscribe()
+
+ // load the snapshot
+ if err := client.LoadSnapshot(snap); err != nil {
+ t.Fatalf("error loading snapshot: %s", err)
+ }
+
+ // check the nodes and connection exists
+ net, err := client.GetNetwork()
+ if err != nil {
+ t.Fatalf("error getting network: %s", err)
+ }
+ if len(net.Nodes) != nodeCount {
+ t.Fatalf("expected network to have %d nodes, got %d", nodeCount, len(net.Nodes))
+ }
+ for i, node := range nodes {
+ id := net.Nodes[i].ID().String()
+ if id != node.ID {
+ t.Fatalf("expected node %d to have ID %s, got %s", i, node.ID, id)
+ }
+ }
+ if len(net.Conns) != 1 {
+ t.Fatalf("expected network to have 1 connection, got %d", len(net.Conns))
+ }
+ conn := net.Conns[0]
+ if conn.One.String() != nodes[0].ID {
+ t.Fatalf("expected connection to have one=%q, got one=%q", nodes[0].ID, conn.One)
+ }
+ if conn.Other.String() != nodes[1].ID {
+ t.Fatalf("expected connection to have other=%q, got other=%q", nodes[1].ID, conn.Other)
+ }
+
+ // check the node states were restored
+ for i, node := range nodes {
+ rpc, err := client.RPCClient(context.Background(), node.ID)
+ if err != nil {
+ t.Fatalf("error getting RPC client: %s", err)
+ }
+ defer rpc.Close()
+ var state []byte
+ if err := rpc.Call(&state, "test_getState"); err != nil {
+ t.Fatalf("error getting service state: %s", err)
+ }
+ if string(state) != states[i] {
+ t.Fatalf("expected snapshot state %q, got %q", states[i], state)
+ }
+ }
+
+ // check we got all the events
+ x := &expectEvents{t, events, sub}
+ x.expect(
+ x.nodeEvent(nodes[0].ID, false),
+ x.nodeEvent(nodes[0].ID, true),
+ x.nodeEvent(nodes[1].ID, false),
+ x.nodeEvent(nodes[1].ID, true),
+ x.connEvent(nodes[0].ID, nodes[1].ID, false),
+ x.connEvent(nodes[0].ID, nodes[1].ID, true),
+ )
+}
+
+// TestMsgFilterPassMultiple tests streaming message events using a filter
+// with multiple protocols
+func TestMsgFilterPassMultiple(t *testing.T) {
+ // start the server
+ _, s := testHTTPServer(t)
+ defer s.Close()
+
+ // subscribe to events with a message filter
+ client := NewClient(s.URL)
+ events := make(chan *Event, 10)
+ opts := SubscribeOpts{
+ Filter: "prb:0-test:0",
+ }
+ sub, err := client.SubscribeNetwork(events, opts)
+ if err != nil {
+ t.Fatalf("error subscribing to network events: %s", err)
+ }
+ defer sub.Unsubscribe()
+
+ // start a simulation network
+ startTestNetwork(t, client)
+
+ // check we got the expected events
+ x := &expectEvents{t, events, sub}
+ x.expectMsgs(map[MsgFilter]int{
+ {"test", 0}: 2,
+ {"prb", 0}: 2,
+ })
+}
+
+// TestMsgFilterPassWildcard tests streaming message events using a filter
+// with a code wildcard
+func TestMsgFilterPassWildcard(t *testing.T) {
+ // start the server
+ _, s := testHTTPServer(t)
+ defer s.Close()
+
+ // subscribe to events with a message filter
+ client := NewClient(s.URL)
+ events := make(chan *Event, 10)
+ opts := SubscribeOpts{
+ Filter: "prb:0,2-test:*",
+ }
+ sub, err := client.SubscribeNetwork(events, opts)
+ if err != nil {
+ t.Fatalf("error subscribing to network events: %s", err)
+ }
+ defer sub.Unsubscribe()
+
+ // start a simulation network
+ startTestNetwork(t, client)
+
+ // check we got the expected events
+ x := &expectEvents{t, events, sub}
+ x.expectMsgs(map[MsgFilter]int{
+ {"test", 2}: 2,
+ {"test", 1}: 2,
+ {"test", 0}: 2,
+ {"prb", 0}: 2,
+ })
+}
+
+// TestMsgFilterPassSingle tests streaming message events using a filter
+// with a single protocol and code
+func TestMsgFilterPassSingle(t *testing.T) {
+ // start the server
+ _, s := testHTTPServer(t)
+ defer s.Close()
+
+ // subscribe to events with a message filter
+ client := NewClient(s.URL)
+ events := make(chan *Event, 10)
+ opts := SubscribeOpts{
+ Filter: "dum:0",
+ }
+ sub, err := client.SubscribeNetwork(events, opts)
+ if err != nil {
+ t.Fatalf("error subscribing to network events: %s", err)
+ }
+ defer sub.Unsubscribe()
+
+ // start a simulation network
+ startTestNetwork(t, client)
+
+ // check we got the expected events
+ x := &expectEvents{t, events, sub}
+ x.expectMsgs(map[MsgFilter]int{
+ {"dum", 0}: 2,
+ })
+}
+
+// TestMsgFilterPassSingle tests streaming message events using an invalid
+// filter
+func TestMsgFilterFailBadParams(t *testing.T) {
+ // start the server
+ _, s := testHTTPServer(t)
+ defer s.Close()
+
+ client := NewClient(s.URL)
+ events := make(chan *Event, 10)
+ opts := SubscribeOpts{
+ Filter: "foo:",
+ }
+ _, err := client.SubscribeNetwork(events, opts)
+ if err == nil {
+ t.Fatalf("expected event subscription to fail but succeeded!")
+ }
+
+ opts.Filter = "bzz:aa"
+ _, err = client.SubscribeNetwork(events, opts)
+ if err == nil {
+ t.Fatalf("expected event subscription to fail but succeeded!")
+ }
+
+ opts.Filter = "invalid"
+ _, err = client.SubscribeNetwork(events, opts)
+ if err == nil {
+ t.Fatalf("expected event subscription to fail but succeeded!")
+ }
+}
diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go
new file mode 100644
index 000000000..06890ffcf
--- /dev/null
+++ b/p2p/simulations/network.go
@@ -0,0 +1,680 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package simulations
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+)
+
+// NetworkConfig defines configuration options for starting a Network
+type NetworkConfig struct {
+ ID string `json:"id"`
+ DefaultService string `json:"default_service,omitempty"`
+}
+
+// Network models a p2p simulation network which consists of a collection of
+// simulated nodes and the connections which exist between them.
+//
+// The Network has a single NodeAdapter which is responsible for actually
+// starting nodes and connecting them together.
+//
+// The Network emits events when nodes are started and stopped, when they are
+// connected and disconnected, and also when messages are sent between nodes.
+type Network struct {
+ NetworkConfig
+
+ Nodes []*Node `json:"nodes"`
+ nodeMap map[discover.NodeID]int
+
+ Conns []*Conn `json:"conns"`
+ connMap map[string]int
+
+ nodeAdapter adapters.NodeAdapter
+ events event.Feed
+ lock sync.RWMutex
+ quitc chan struct{}
+}
+
+// NewNetwork returns a Network which uses the given NodeAdapter and NetworkConfig
+func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network {
+ return &Network{
+ NetworkConfig: *conf,
+ nodeAdapter: nodeAdapter,
+ nodeMap: make(map[discover.NodeID]int),
+ connMap: make(map[string]int),
+ quitc: make(chan struct{}),
+ }
+}
+
+// Events returns the output event feed of the Network.
+func (self *Network) Events() *event.Feed {
+ return &self.events
+}
+
+// NewNode adds a new node to the network with a random ID
+func (self *Network) NewNode() (*Node, error) {
+ conf := adapters.RandomNodeConfig()
+ conf.Services = []string{self.DefaultService}
+ return self.NewNodeWithConfig(conf)
+}
+
+// NewNodeWithConfig adds a new node to the network with the given config,
+// returning an error if a node with the same ID or name already exists
+func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ // create a random ID and PrivateKey if not set
+ if conf.ID == (discover.NodeID{}) {
+ c := adapters.RandomNodeConfig()
+ conf.ID = c.ID
+ conf.PrivateKey = c.PrivateKey
+ }
+ id := conf.ID
+
+ // assign a name to the node if not set
+ if conf.Name == "" {
+ conf.Name = fmt.Sprintf("node%02d", len(self.Nodes)+1)
+ }
+
+ // check the node doesn't already exist
+ if node := self.getNode(id); node != nil {
+ return nil, fmt.Errorf("node with ID %q already exists", id)
+ }
+ if node := self.getNodeByName(conf.Name); node != nil {
+ return nil, fmt.Errorf("node with name %q already exists", conf.Name)
+ }
+
+ // if no services are configured, use the default service
+ if len(conf.Services) == 0 {
+ conf.Services = []string{self.DefaultService}
+ }
+
+ // use the NodeAdapter to create the node
+ adapterNode, err := self.nodeAdapter.NewNode(conf)
+ if err != nil {
+ return nil, err
+ }
+ node := &Node{
+ Node: adapterNode,
+ Config: conf,
+ }
+ log.Trace(fmt.Sprintf("node %v created", id))
+ self.nodeMap[id] = len(self.Nodes)
+ self.Nodes = append(self.Nodes, node)
+
+ // emit a "control" event
+ self.events.Send(ControlEvent(node))
+
+ return node, nil
+}
+
+// Config returns the network configuration
+func (self *Network) Config() *NetworkConfig {
+ return &self.NetworkConfig
+}
+
+// StartAll starts all nodes in the network
+func (self *Network) StartAll() error {
+ for _, node := range self.Nodes {
+ if node.Up {
+ continue
+ }
+ if err := self.Start(node.ID()); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// StopAll stops all nodes in the network
+func (self *Network) StopAll() error {
+ for _, node := range self.Nodes {
+ if !node.Up {
+ continue
+ }
+ if err := self.Stop(node.ID()); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Start starts the node with the given ID
+func (self *Network) Start(id discover.NodeID) error {
+ return self.startWithSnapshots(id, nil)
+}
+
+// startWithSnapshots starts the node with the given ID using the give
+// snapshots
+func (self *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error {
+ node := self.GetNode(id)
+ if node == nil {
+ return fmt.Errorf("node %v does not exist", id)
+ }
+ if node.Up {
+ return fmt.Errorf("node %v already up", id)
+ }
+ log.Trace(fmt.Sprintf("starting node %v: %v using %v", id, node.Up, self.nodeAdapter.Name()))
+ if err := node.Start(snapshots); err != nil {
+ log.Warn(fmt.Sprintf("start up failed: %v", err))
+ return err
+ }
+ node.Up = true
+ log.Info(fmt.Sprintf("started node %v: %v", id, node.Up))
+
+ self.events.Send(NewEvent(node))
+
+ // subscribe to peer events
+ client, err := node.Client()
+ if err != nil {
+ return fmt.Errorf("error getting rpc client for node %v: %s", id, err)
+ }
+ events := make(chan *p2p.PeerEvent)
+ sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
+ if err != nil {
+ return fmt.Errorf("error getting peer events for node %v: %s", id, err)
+ }
+ go self.watchPeerEvents(id, events, sub)
+ return nil
+}
+
+// watchPeerEvents reads peer events from the given channel and emits
+// corresponding network events
+func (self *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEvent, sub event.Subscription) {
+ defer func() {
+ sub.Unsubscribe()
+
+ // assume the node is now down
+ self.lock.Lock()
+ node := self.getNode(id)
+ node.Up = false
+ self.lock.Unlock()
+ self.events.Send(NewEvent(node))
+ }()
+ for {
+ select {
+ case event, ok := <-events:
+ if !ok {
+ return
+ }
+ peer := event.Peer
+ switch event.Type {
+
+ case p2p.PeerEventTypeAdd:
+ self.DidConnect(id, peer)
+
+ case p2p.PeerEventTypeDrop:
+ self.DidDisconnect(id, peer)
+
+ case p2p.PeerEventTypeMsgSend:
+ self.DidSend(id, peer, event.Protocol, *event.MsgCode)
+
+ case p2p.PeerEventTypeMsgRecv:
+ self.DidReceive(peer, id, event.Protocol, *event.MsgCode)
+
+ }
+
+ case err := <-sub.Err():
+ if err != nil {
+ log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err)
+ }
+ return
+ }
+ }
+}
+
+// Stop stops the node with the given ID
+func (self *Network) Stop(id discover.NodeID) error {
+ node := self.GetNode(id)
+ if node == nil {
+ return fmt.Errorf("node %v does not exist", id)
+ }
+ if !node.Up {
+ return fmt.Errorf("node %v already down", id)
+ }
+ if err := node.Stop(); err != nil {
+ return err
+ }
+ node.Up = false
+ log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up))
+
+ self.events.Send(ControlEvent(node))
+ return nil
+}
+
+// Connect connects two nodes together by calling the "admin_addPeer" RPC
+// method on the "one" node so that it connects to the "other" node
+func (self *Network) Connect(oneID, otherID discover.NodeID) error {
+ log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID))
+ conn, err := self.GetOrCreateConn(oneID, otherID)
+ if err != nil {
+ return err
+ }
+ if conn.Up {
+ return fmt.Errorf("%v and %v already connected", oneID, otherID)
+ }
+ if err := conn.nodesUp(); err != nil {
+ return err
+ }
+ client, err := conn.one.Client()
+ if err != nil {
+ return err
+ }
+ self.events.Send(ControlEvent(conn))
+ return client.Call(nil, "admin_addPeer", string(conn.other.Addr()))
+}
+
+// Disconnect disconnects two nodes by calling the "admin_removePeer" RPC
+// method on the "one" node so that it disconnects from the "other" node
+func (self *Network) Disconnect(oneID, otherID discover.NodeID) error {
+ conn := self.GetConn(oneID, otherID)
+ if conn == nil {
+ return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID)
+ }
+ if !conn.Up {
+ return fmt.Errorf("%v and %v already disconnected", oneID, otherID)
+ }
+ client, err := conn.one.Client()
+ if err != nil {
+ return err
+ }
+ self.events.Send(ControlEvent(conn))
+ return client.Call(nil, "admin_removePeer", string(conn.other.Addr()))
+}
+
+// DidConnect tracks the fact that the "one" node connected to the "other" node
+func (self *Network) DidConnect(one, other discover.NodeID) error {
+ conn, err := self.GetOrCreateConn(one, other)
+ if err != nil {
+ return fmt.Errorf("connection between %v and %v does not exist", one, other)
+ }
+ if conn.Up {
+ return fmt.Errorf("%v and %v already connected", one, other)
+ }
+ conn.Up = true
+ self.events.Send(NewEvent(conn))
+ return nil
+}
+
+// DidDisconnect tracks the fact that the "one" node disconnected from the
+// "other" node
+func (self *Network) DidDisconnect(one, other discover.NodeID) error {
+ conn, err := self.GetOrCreateConn(one, other)
+ if err != nil {
+ return fmt.Errorf("connection between %v and %v does not exist", one, other)
+ }
+ if !conn.Up {
+ return fmt.Errorf("%v and %v already disconnected", one, other)
+ }
+ conn.Up = false
+ self.events.Send(NewEvent(conn))
+ return nil
+}
+
+// DidSend tracks the fact that "sender" sent a message to "receiver"
+func (self *Network) DidSend(sender, receiver discover.NodeID, proto string, code uint64) error {
+ msg := &Msg{
+ One: sender,
+ Other: receiver,
+ Protocol: proto,
+ Code: code,
+ Received: false,
+ }
+ self.events.Send(NewEvent(msg))
+ return nil
+}
+
+// DidReceive tracks the fact that "receiver" received a message from "sender"
+func (self *Network) DidReceive(sender, receiver discover.NodeID, proto string, code uint64) error {
+ msg := &Msg{
+ One: sender,
+ Other: receiver,
+ Protocol: proto,
+ Code: code,
+ Received: true,
+ }
+ self.events.Send(NewEvent(msg))
+ return nil
+}
+
+// GetNode gets the node with the given ID, returning nil if the node does not
+// exist
+func (self *Network) GetNode(id discover.NodeID) *Node {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.getNode(id)
+}
+
+// GetNode gets the node with the given name, returning nil if the node does
+// not exist
+func (self *Network) GetNodeByName(name string) *Node {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.getNodeByName(name)
+}
+
+func (self *Network) getNode(id discover.NodeID) *Node {
+ i, found := self.nodeMap[id]
+ if !found {
+ return nil
+ }
+ return self.Nodes[i]
+}
+
+func (self *Network) getNodeByName(name string) *Node {
+ for _, node := range self.Nodes {
+ if node.Config.Name == name {
+ return node
+ }
+ }
+ return nil
+}
+
+// GetNodes returns the existing nodes
+func (self *Network) GetNodes() []*Node {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.Nodes
+}
+
+// GetConn returns the connection which exists between "one" and "other"
+// regardless of which node initiated the connection
+func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.getConn(oneID, otherID)
+}
+
+// GetOrCreateConn is like GetConn but creates the connection if it doesn't
+// already exist
+func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if conn := self.getConn(oneID, otherID); conn != nil {
+ return conn, nil
+ }
+
+ one := self.getNode(oneID)
+ if one == nil {
+ return nil, fmt.Errorf("node %v does not exist", oneID)
+ }
+ other := self.getNode(otherID)
+ if other == nil {
+ return nil, fmt.Errorf("node %v does not exist", otherID)
+ }
+ conn := &Conn{
+ One: oneID,
+ Other: otherID,
+ one: one,
+ other: other,
+ }
+ label := ConnLabel(oneID, otherID)
+ self.connMap[label] = len(self.Conns)
+ self.Conns = append(self.Conns, conn)
+ return conn, nil
+}
+
+func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn {
+ label := ConnLabel(oneID, otherID)
+ i, found := self.connMap[label]
+ if !found {
+ return nil
+ }
+ return self.Conns[i]
+}
+
+// Shutdown stops all nodes in the network and closes the quit channel
+func (self *Network) Shutdown() {
+ for _, node := range self.Nodes {
+ log.Debug(fmt.Sprintf("stopping node %s", node.ID().TerminalString()))
+ if err := node.Stop(); err != nil {
+ log.Warn(fmt.Sprintf("error stopping node %s", node.ID().TerminalString()), "err", err)
+ }
+ }
+ close(self.quitc)
+}
+
+// Node is a wrapper around adapters.Node which is used to track the status
+// of a node in the network
+type Node struct {
+ adapters.Node `json:"-"`
+
+ // Config if the config used to created the node
+ Config *adapters.NodeConfig `json:"config"`
+
+ // Up tracks whether or not the node is running
+ Up bool `json:"up"`
+}
+
+// ID returns the ID of the node
+func (self *Node) ID() discover.NodeID {
+ return self.Config.ID
+}
+
+// String returns a log-friendly string
+func (self *Node) String() string {
+ return fmt.Sprintf("Node %v", self.ID().TerminalString())
+}
+
+// NodeInfo returns information about the node
+func (self *Node) NodeInfo() *p2p.NodeInfo {
+ // avoid a panic if the node is not started yet
+ if self.Node == nil {
+ return nil
+ }
+ info := self.Node.NodeInfo()
+ info.Name = self.Config.Name
+ return info
+}
+
+// MarshalJSON implements the json.Marshaler interface so that the encoded
+// JSON includes the NodeInfo
+func (self *Node) MarshalJSON() ([]byte, error) {
+ return json.Marshal(struct {
+ Info *p2p.NodeInfo `json:"info,omitempty"`
+ Config *adapters.NodeConfig `json:"config,omitempty"`
+ Up bool `json:"up"`
+ }{
+ Info: self.NodeInfo(),
+ Config: self.Config,
+ Up: self.Up,
+ })
+}
+
+// Conn represents a connection between two nodes in the network
+type Conn struct {
+ // One is the node which initiated the connection
+ One discover.NodeID `json:"one"`
+
+ // Other is the node which the connection was made to
+ Other discover.NodeID `json:"other"`
+
+ // Up tracks whether or not the connection is active
+ Up bool `json:"up"`
+
+ one *Node
+ other *Node
+}
+
+// nodesUp returns whether both nodes are currently up
+func (self *Conn) nodesUp() error {
+ if !self.one.Up {
+ return fmt.Errorf("one %v is not up", self.One)
+ }
+ if !self.other.Up {
+ return fmt.Errorf("other %v is not up", self.Other)
+ }
+ return nil
+}
+
+// String returns a log-friendly string
+func (self *Conn) String() string {
+ return fmt.Sprintf("Conn %v->%v", self.One.TerminalString(), self.Other.TerminalString())
+}
+
+// Msg represents a p2p message sent between two nodes in the network
+type Msg struct {
+ One discover.NodeID `json:"one"`
+ Other discover.NodeID `json:"other"`
+ Protocol string `json:"protocol"`
+ Code uint64 `json:"code"`
+ Received bool `json:"received"`
+}
+
+// String returns a log-friendly string
+func (self *Msg) String() string {
+ return fmt.Sprintf("Msg(%d) %v->%v", self.Code, self.One.TerminalString(), self.Other.TerminalString())
+}
+
+// ConnLabel generates a deterministic string which represents a connection
+// between two nodes, used to compare if two connections are between the same
+// nodes
+func ConnLabel(source, target discover.NodeID) string {
+ var first, second discover.NodeID
+ if bytes.Compare(source.Bytes(), target.Bytes()) > 0 {
+ first = target
+ second = source
+ } else {
+ first = source
+ second = target
+ }
+ return fmt.Sprintf("%v-%v", first, second)
+}
+
+// Snapshot represents the state of a network at a single point in time and can
+// be used to restore the state of a network
+type Snapshot struct {
+ Nodes []NodeSnapshot `json:"nodes,omitempty"`
+ Conns []Conn `json:"conns,omitempty"`
+}
+
+// NodeSnapshot represents the state of a node in the network
+type NodeSnapshot struct {
+ Node Node `json:"node,omitempty"`
+
+ // Snapshots is arbitrary data gathered from calling node.Snapshots()
+ Snapshots map[string][]byte `json:"snapshots,omitempty"`
+}
+
+// Snapshot creates a network snapshot
+func (self *Network) Snapshot() (*Snapshot, error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ snap := &Snapshot{
+ Nodes: make([]NodeSnapshot, len(self.Nodes)),
+ Conns: make([]Conn, len(self.Conns)),
+ }
+ for i, node := range self.Nodes {
+ snap.Nodes[i] = NodeSnapshot{Node: *node}
+ if !node.Up {
+ continue
+ }
+ snapshots, err := node.Snapshots()
+ if err != nil {
+ return nil, err
+ }
+ snap.Nodes[i].Snapshots = snapshots
+ }
+ for i, conn := range self.Conns {
+ snap.Conns[i] = *conn
+ }
+ return snap, nil
+}
+
+// Load loads a network snapshot
+func (self *Network) Load(snap *Snapshot) error {
+ for _, n := range snap.Nodes {
+ if _, err := self.NewNodeWithConfig(n.Node.Config); err != nil {
+ return err
+ }
+ if !n.Node.Up {
+ continue
+ }
+ if err := self.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
+ return err
+ }
+ }
+ for _, conn := range snap.Conns {
+ if err := self.Connect(conn.One, conn.Other); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Subscribe reads control events from a channel and executes them
+func (self *Network) Subscribe(events chan *Event) {
+ for {
+ select {
+ case event, ok := <-events:
+ if !ok {
+ return
+ }
+ if event.Control {
+ self.executeControlEvent(event)
+ }
+ case <-self.quitc:
+ return
+ }
+ }
+}
+
+func (self *Network) executeControlEvent(event *Event) {
+ log.Trace("execute control event", "type", event.Type, "event", event)
+ switch event.Type {
+ case EventTypeNode:
+ if err := self.executeNodeEvent(event); err != nil {
+ log.Error("error executing node event", "event", event, "err", err)
+ }
+ case EventTypeConn:
+ if err := self.executeConnEvent(event); err != nil {
+ log.Error("error executing conn event", "event", event, "err", err)
+ }
+ case EventTypeMsg:
+ log.Warn("ignoring control msg event")
+ }
+}
+
+func (self *Network) executeNodeEvent(e *Event) error {
+ if !e.Node.Up {
+ return self.Stop(e.Node.ID())
+ }
+
+ if _, err := self.NewNodeWithConfig(e.Node.Config); err != nil {
+ return err
+ }
+ return self.Start(e.Node.ID())
+}
+
+func (self *Network) executeConnEvent(e *Event) error {
+ if e.Conn.Up {
+ return self.Connect(e.Conn.One, e.Conn.Other)
+ } else {
+ return self.Disconnect(e.Conn.One, e.Conn.Other)
+ }
+}
diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go
new file mode 100644
index 000000000..2a062121b
--- /dev/null
+++ b/p2p/simulations/network_test.go
@@ -0,0 +1,159 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package simulations
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+)
+
+// TestNetworkSimulation creates a multi-node simulation network with each node
+// connected in a ring topology, checks that all nodes successfully handshake
+// with each other and that a snapshot fully represents the desired topology
+func TestNetworkSimulation(t *testing.T) {
+ // create simulation network with 20 testService nodes
+ adapter := adapters.NewSimAdapter(adapters.Services{
+ "test": newTestService,
+ })
+ network := NewNetwork(adapter, &NetworkConfig{
+ DefaultService: "test",
+ })
+ defer network.Shutdown()
+ nodeCount := 20
+ ids := make([]discover.NodeID, nodeCount)
+ for i := 0; i < nodeCount; i++ {
+ node, err := network.NewNode()
+ if err != nil {
+ t.Fatalf("error creating node: %s", err)
+ }
+ if err := network.Start(node.ID()); err != nil {
+ t.Fatalf("error starting node: %s", err)
+ }
+ ids[i] = node.ID()
+ }
+
+ // perform a check which connects the nodes in a ring (so each node is
+ // connected to exactly two peers) and then checks that all nodes
+ // performed two handshakes by checking their peerCount
+ action := func(_ context.Context) error {
+ for i, id := range ids {
+ peerID := ids[(i+1)%len(ids)]
+ if err := network.Connect(id, peerID); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ check := func(ctx context.Context, id discover.NodeID) (bool, error) {
+ // check we haven't run out of time
+ select {
+ case <-ctx.Done():
+ return false, ctx.Err()
+ default:
+ }
+
+ // get the node
+ node := network.GetNode(id)
+ if node == nil {
+ return false, fmt.Errorf("unknown node: %s", id)
+ }
+
+ // check it has exactly two peers
+ client, err := node.Client()
+ if err != nil {
+ return false, err
+ }
+ var peerCount int64
+ if err := client.CallContext(ctx, &peerCount, "test_peerCount"); err != nil {
+ return false, err
+ }
+ switch {
+ case peerCount < 2:
+ return false, nil
+ case peerCount == 2:
+ return true, nil
+ default:
+ return false, fmt.Errorf("unexpected peerCount: %d", peerCount)
+ }
+ }
+
+ timeout := 30 * time.Second
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ // trigger a check every 100ms
+ trigger := make(chan discover.NodeID)
+ go triggerChecks(ctx, ids, trigger, 100*time.Millisecond)
+
+ result := NewSimulation(network).Run(ctx, &Step{
+ Action: action,
+ Trigger: trigger,
+ Expect: &Expectation{
+ Nodes: ids,
+ Check: check,
+ },
+ })
+ if result.Error != nil {
+ t.Fatalf("simulation failed: %s", result.Error)
+ }
+
+ // take a network snapshot and check it contains the correct topology
+ snap, err := network.Snapshot()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(snap.Nodes) != nodeCount {
+ t.Fatalf("expected snapshot to contain %d nodes, got %d", nodeCount, len(snap.Nodes))
+ }
+ if len(snap.Conns) != nodeCount {
+ t.Fatalf("expected snapshot to contain %d connections, got %d", nodeCount, len(snap.Conns))
+ }
+ for i, id := range ids {
+ conn := snap.Conns[i]
+ if conn.One != id {
+ t.Fatalf("expected conn[%d].One to be %s, got %s", i, id, conn.One)
+ }
+ peerID := ids[(i+1)%len(ids)]
+ if conn.Other != peerID {
+ t.Fatalf("expected conn[%d].Other to be %s, got %s", i, peerID, conn.Other)
+ }
+ }
+}
+
+func triggerChecks(ctx context.Context, ids []discover.NodeID, trigger chan discover.NodeID, interval time.Duration) {
+ tick := time.NewTicker(interval)
+ defer tick.Stop()
+ for {
+ select {
+ case <-tick.C:
+ for _, id := range ids {
+ select {
+ case trigger <- id:
+ case <-ctx.Done():
+ return
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
diff --git a/p2p/simulations/simulation.go b/p2p/simulations/simulation.go
new file mode 100644
index 000000000..28886e924
--- /dev/null
+++ b/p2p/simulations/simulation.go
@@ -0,0 +1,157 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package simulations
+
+import (
+ "context"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+// Simulation provides a framework for running actions in a simulated network
+// and then waiting for expectations to be met
+type Simulation struct {
+ network *Network
+}
+
+// NewSimulation returns a new simulation which runs in the given network
+func NewSimulation(network *Network) *Simulation {
+ return &Simulation{
+ network: network,
+ }
+}
+
+// Run performs a step of the simulation by performing the step's action and
+// then waiting for the step's expectation to be met
+func (s *Simulation) Run(ctx context.Context, step *Step) (result *StepResult) {
+ result = newStepResult()
+
+ result.StartedAt = time.Now()
+ defer func() { result.FinishedAt = time.Now() }()
+
+ // watch network events for the duration of the step
+ stop := s.watchNetwork(result)
+ defer stop()
+
+ // perform the action
+ if err := step.Action(ctx); err != nil {
+ result.Error = err
+ return
+ }
+
+ // wait for all node expectations to either pass, error or timeout
+ nodes := make(map[discover.NodeID]struct{}, len(step.Expect.Nodes))
+ for _, id := range step.Expect.Nodes {
+ nodes[id] = struct{}{}
+ }
+ for len(result.Passes) < len(nodes) {
+ select {
+ case id := <-step.Trigger:
+ // skip if we aren't checking the node
+ if _, ok := nodes[id]; !ok {
+ continue
+ }
+
+ // skip if the node has already passed
+ if _, ok := result.Passes[id]; ok {
+ continue
+ }
+
+ // run the node expectation check
+ pass, err := step.Expect.Check(ctx, id)
+ if err != nil {
+ result.Error = err
+ return
+ }
+ if pass {
+ result.Passes[id] = time.Now()
+ }
+ case <-ctx.Done():
+ result.Error = ctx.Err()
+ return
+ }
+ }
+
+ return
+}
+
+func (s *Simulation) watchNetwork(result *StepResult) func() {
+ stop := make(chan struct{})
+ done := make(chan struct{})
+ events := make(chan *Event)
+ sub := s.network.Events().Subscribe(events)
+ go func() {
+ defer close(done)
+ defer sub.Unsubscribe()
+ for {
+ select {
+ case event := <-events:
+ result.NetworkEvents = append(result.NetworkEvents, event)
+ case <-stop:
+ return
+ }
+ }
+ }()
+ return func() {
+ close(stop)
+ <-done
+ }
+}
+
+type Step struct {
+ // Action is the action to perform for this step
+ Action func(context.Context) error
+
+ // Trigger is a channel which receives node ids and triggers an
+ // expectation check for that node
+ Trigger chan discover.NodeID
+
+ // Expect is the expectation to wait for when performing this step
+ Expect *Expectation
+}
+
+type Expectation struct {
+ // Nodes is a list of nodes to check
+ Nodes []discover.NodeID
+
+ // Check checks whether a given node meets the expectation
+ Check func(context.Context, discover.NodeID) (bool, error)
+}
+
+func newStepResult() *StepResult {
+ return &StepResult{
+ Passes: make(map[discover.NodeID]time.Time),
+ }
+}
+
+type StepResult struct {
+ // Error is the error encountered whilst running the step
+ Error error
+
+ // StartedAt is the time the step started
+ StartedAt time.Time
+
+ // FinishedAt is the time the step finished
+ FinishedAt time.Time
+
+ // Passes are the timestamps of the successful node expectations
+ Passes map[discover.NodeID]time.Time
+
+ // NetworkEvents are the network events which occurred during the step
+ NetworkEvents []*Event
+}