aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/simulations/adapters
diff options
context:
space:
mode:
authorLewis Marshall <lewis@lmars.net>2017-09-25 16:08:07 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-09-25 16:08:07 +0800
commit9feec51e2dd754819e5c730ac5985d28d57adb48 (patch)
tree32b07b659cf7d0b4c1a7da67b5c49daf7a10a9d3 /p2p/simulations/adapters
parent673007d7aed1d2678ea3277eceb7b55dc29cf092 (diff)
downloadgo-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar
go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.gz
go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.bz2
go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.lz
go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.xz
go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.zst
go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.zip
p2p: add network simulation framework (#14982)
This commit introduces a network simulation framework which can be used to run simulated networks of devp2p nodes. The intention is to use this for testing protocols, performing benchmarks and visualising emergent network behaviour.
Diffstat (limited to 'p2p/simulations/adapters')
-rw-r--r--p2p/simulations/adapters/docker.go182
-rw-r--r--p2p/simulations/adapters/exec.go504
-rw-r--r--p2p/simulations/adapters/inproc.go314
-rw-r--r--p2p/simulations/adapters/types.go215
4 files changed, 1215 insertions, 0 deletions
diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go
new file mode 100644
index 000000000..022314b3d
--- /dev/null
+++ b/p2p/simulations/adapters/docker.go
@@ -0,0 +1,182 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "runtime"
+ "strings"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+// DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker
+// containers.
+//
+// A Docker image is built which contains the current binary at /bin/p2p-node
+// which when executed runs the underlying service (see the description
+// of the execP2PNode function for more details)
+type DockerAdapter struct {
+ ExecAdapter
+}
+
+// NewDockerAdapter builds the p2p-node Docker image containing the current
+// binary and returns a DockerAdapter
+func NewDockerAdapter() (*DockerAdapter, error) {
+ // Since Docker containers run on Linux and this adapter runs the
+ // current binary in the container, it must be compiled for Linux.
+ //
+ // It is reasonable to require this because the caller can just
+ // compile the current binary in a Docker container.
+ if runtime.GOOS != "linux" {
+ return nil, errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)")
+ }
+
+ if err := buildDockerImage(); err != nil {
+ return nil, err
+ }
+
+ return &DockerAdapter{
+ ExecAdapter{
+ nodes: make(map[discover.NodeID]*ExecNode),
+ },
+ }, nil
+}
+
+// Name returns the name of the adapter for logging purposes
+func (d *DockerAdapter) Name() string {
+ return "docker-adapter"
+}
+
+// NewNode returns a new DockerNode using the given config
+func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
+ if len(config.Services) == 0 {
+ return nil, errors.New("node must have at least one service")
+ }
+ for _, service := range config.Services {
+ if _, exists := serviceFuncs[service]; !exists {
+ return nil, fmt.Errorf("unknown node service %q", service)
+ }
+ }
+
+ // generate the config
+ conf := &execNodeConfig{
+ Stack: node.DefaultConfig,
+ Node: config,
+ }
+ conf.Stack.DataDir = "/data"
+ conf.Stack.WSHost = "0.0.0.0"
+ conf.Stack.WSOrigins = []string{"*"}
+ conf.Stack.WSExposeAll = true
+ conf.Stack.P2P.EnableMsgEvents = false
+ conf.Stack.P2P.NoDiscovery = true
+ conf.Stack.P2P.NAT = nil
+ conf.Stack.NoUSB = true
+
+ node := &DockerNode{
+ ExecNode: ExecNode{
+ ID: config.ID,
+ Config: conf,
+ adapter: &d.ExecAdapter,
+ },
+ }
+ node.newCmd = node.dockerCommand
+ d.ExecAdapter.nodes[node.ID] = &node.ExecNode
+ return node, nil
+}
+
+// DockerNode wraps an ExecNode but exec's the current binary in a docker
+// container rather than locally
+type DockerNode struct {
+ ExecNode
+}
+
+// dockerCommand returns a command which exec's the binary in a Docker
+// container.
+//
+// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment
+// variable to the container using the --env flag.
+func (n *DockerNode) dockerCommand() *exec.Cmd {
+ return exec.Command(
+ "sh", "-c",
+ fmt.Sprintf(
+ `exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`,
+ dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(),
+ ),
+ )
+}
+
+// dockerImage is the name of the Docker image which gets built to run the
+// simulation node
+const dockerImage = "p2p-node"
+
+// buildDockerImage builds the Docker image which is used to run the simulation
+// node in a Docker container.
+//
+// It adds the current binary as "p2p-node" so that it runs execP2PNode
+// when executed.
+func buildDockerImage() error {
+ // create a directory to use as the build context
+ dir, err := ioutil.TempDir("", "p2p-docker")
+ if err != nil {
+ return err
+ }
+ defer os.RemoveAll(dir)
+
+ // copy the current binary into the build context
+ bin, err := os.Open(reexec.Self())
+ if err != nil {
+ return err
+ }
+ defer bin.Close()
+ dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755)
+ if err != nil {
+ return err
+ }
+ defer dst.Close()
+ if _, err := io.Copy(dst, bin); err != nil {
+ return err
+ }
+
+ // create the Dockerfile
+ dockerfile := []byte(`
+FROM ubuntu:16.04
+RUN mkdir /data
+ADD self.bin /bin/p2p-node
+ `)
+ if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil {
+ return err
+ }
+
+ // run 'docker build'
+ cmd := exec.Command("docker", "build", "-t", dockerImage, dir)
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ if err := cmd.Run(); err != nil {
+ return fmt.Errorf("error building docker image: %s", err)
+ }
+
+ return nil
+}
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go
new file mode 100644
index 000000000..bdb92cc1d
--- /dev/null
+++ b/p2p/simulations/adapters/exec.go
@@ -0,0 +1,504 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "bufio"
+ "context"
+ "crypto/ecdsa"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "os"
+ "os/exec"
+ "os/signal"
+ "path/filepath"
+ "regexp"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
+ "golang.org/x/net/websocket"
+)
+
+// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the
+// current binary as a child process.
+//
+// An init hook is used so that the child process executes the node services
+// (rather than whataver the main() function would normally do), see the
+// execP2PNode function for more information.
+type ExecAdapter struct {
+ // BaseDir is the directory under which the data directories for each
+ // simulation node are created.
+ BaseDir string
+
+ nodes map[discover.NodeID]*ExecNode
+}
+
+// NewExecAdapter returns an ExecAdapter which stores node data in
+// subdirectories of the given base directory
+func NewExecAdapter(baseDir string) *ExecAdapter {
+ return &ExecAdapter{
+ BaseDir: baseDir,
+ nodes: make(map[discover.NodeID]*ExecNode),
+ }
+}
+
+// Name returns the name of the adapter for logging purposes
+func (e *ExecAdapter) Name() string {
+ return "exec-adapter"
+}
+
+// NewNode returns a new ExecNode using the given config
+func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
+ if len(config.Services) == 0 {
+ return nil, errors.New("node must have at least one service")
+ }
+ for _, service := range config.Services {
+ if _, exists := serviceFuncs[service]; !exists {
+ return nil, fmt.Errorf("unknown node service %q", service)
+ }
+ }
+
+ // create the node directory using the first 12 characters of the ID
+ // as Unix socket paths cannot be longer than 256 characters
+ dir := filepath.Join(e.BaseDir, config.ID.String()[:12])
+ if err := os.Mkdir(dir, 0755); err != nil {
+ return nil, fmt.Errorf("error creating node directory: %s", err)
+ }
+
+ // generate the config
+ conf := &execNodeConfig{
+ Stack: node.DefaultConfig,
+ Node: config,
+ }
+ conf.Stack.DataDir = filepath.Join(dir, "data")
+ conf.Stack.WSHost = "127.0.0.1"
+ conf.Stack.WSPort = 0
+ conf.Stack.WSOrigins = []string{"*"}
+ conf.Stack.WSExposeAll = true
+ conf.Stack.P2P.EnableMsgEvents = false
+ conf.Stack.P2P.NoDiscovery = true
+ conf.Stack.P2P.NAT = nil
+ conf.Stack.NoUSB = true
+
+ // listen on a random localhost port (we'll get the actual port after
+ // starting the node through the RPC admin.nodeInfo method)
+ conf.Stack.P2P.ListenAddr = "127.0.0.1:0"
+
+ node := &ExecNode{
+ ID: config.ID,
+ Dir: dir,
+ Config: conf,
+ adapter: e,
+ }
+ node.newCmd = node.execCommand
+ e.nodes[node.ID] = node
+ return node, nil
+}
+
+// ExecNode starts a simulation node by exec'ing the current binary and
+// running the configured services
+type ExecNode struct {
+ ID discover.NodeID
+ Dir string
+ Config *execNodeConfig
+ Cmd *exec.Cmd
+ Info *p2p.NodeInfo
+
+ adapter *ExecAdapter
+ client *rpc.Client
+ wsAddr string
+ newCmd func() *exec.Cmd
+ key *ecdsa.PrivateKey
+}
+
+// Addr returns the node's enode URL
+func (n *ExecNode) Addr() []byte {
+ if n.Info == nil {
+ return nil
+ }
+ return []byte(n.Info.Enode)
+}
+
+// Client returns an rpc.Client which can be used to communicate with the
+// underlying services (it is set once the node has started)
+func (n *ExecNode) Client() (*rpc.Client, error) {
+ return n.client, nil
+}
+
+// wsAddrPattern is a regex used to read the WebSocket address from the node's
+// log
+var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`)
+
+// Start exec's the node passing the ID and service as command line arguments
+// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
+// variable
+func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
+ if n.Cmd != nil {
+ return errors.New("already started")
+ }
+ defer func() {
+ if err != nil {
+ log.Error("node failed to start", "err", err)
+ n.Stop()
+ }
+ }()
+
+ // encode a copy of the config containing the snapshot
+ confCopy := *n.Config
+ confCopy.Snapshots = snapshots
+ confCopy.PeerAddrs = make(map[string]string)
+ for id, node := range n.adapter.nodes {
+ confCopy.PeerAddrs[id.String()] = node.wsAddr
+ }
+ confData, err := json.Marshal(confCopy)
+ if err != nil {
+ return fmt.Errorf("error generating node config: %s", err)
+ }
+
+ // use a pipe for stderr so we can both copy the node's stderr to
+ // os.Stderr and read the WebSocket address from the logs
+ stderrR, stderrW := io.Pipe()
+ stderr := io.MultiWriter(os.Stderr, stderrW)
+
+ // start the node
+ cmd := n.newCmd()
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = stderr
+ cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData))
+ if err := cmd.Start(); err != nil {
+ return fmt.Errorf("error starting node: %s", err)
+ }
+ n.Cmd = cmd
+
+ // read the WebSocket address from the stderr logs
+ var wsAddr string
+ wsAddrC := make(chan string)
+ go func() {
+ s := bufio.NewScanner(stderrR)
+ for s.Scan() {
+ if strings.Contains(s.Text(), "WebSocket endpoint opened:") {
+ wsAddrC <- wsAddrPattern.FindString(s.Text())
+ }
+ }
+ }()
+ select {
+ case wsAddr = <-wsAddrC:
+ if wsAddr == "" {
+ return errors.New("failed to read WebSocket address from stderr")
+ }
+ case <-time.After(10 * time.Second):
+ return errors.New("timed out waiting for WebSocket address on stderr")
+ }
+
+ // create the RPC client and load the node info
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ client, err := rpc.DialWebsocket(ctx, wsAddr, "")
+ if err != nil {
+ return fmt.Errorf("error dialing rpc websocket: %s", err)
+ }
+ var info p2p.NodeInfo
+ if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil {
+ return fmt.Errorf("error getting node info: %s", err)
+ }
+ n.client = client
+ n.wsAddr = wsAddr
+ n.Info = &info
+
+ return nil
+}
+
+// execCommand returns a command which runs the node locally by exec'ing
+// the current binary but setting argv[0] to "p2p-node" so that the child
+// runs execP2PNode
+func (n *ExecNode) execCommand() *exec.Cmd {
+ return &exec.Cmd{
+ Path: reexec.Self(),
+ Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()},
+ }
+}
+
+// Stop stops the node by first sending SIGTERM and then SIGKILL if the node
+// doesn't stop within 5s
+func (n *ExecNode) Stop() error {
+ if n.Cmd == nil {
+ return nil
+ }
+ defer func() {
+ n.Cmd = nil
+ }()
+
+ if n.client != nil {
+ n.client.Close()
+ n.client = nil
+ n.wsAddr = ""
+ n.Info = nil
+ }
+
+ if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
+ return n.Cmd.Process.Kill()
+ }
+ waitErr := make(chan error)
+ go func() {
+ waitErr <- n.Cmd.Wait()
+ }()
+ select {
+ case err := <-waitErr:
+ return err
+ case <-time.After(5 * time.Second):
+ return n.Cmd.Process.Kill()
+ }
+}
+
+// NodeInfo returns information about the node
+func (n *ExecNode) NodeInfo() *p2p.NodeInfo {
+ info := &p2p.NodeInfo{
+ ID: n.ID.String(),
+ }
+ if n.client != nil {
+ n.client.Call(&info, "admin_nodeInfo")
+ }
+ return info
+}
+
+// ServeRPC serves RPC requests over the given connection by dialling the
+// node's WebSocket address and joining the two connections
+func (n *ExecNode) ServeRPC(clientConn net.Conn) error {
+ conn, err := websocket.Dial(n.wsAddr, "", "http://localhost")
+ if err != nil {
+ return err
+ }
+ var wg sync.WaitGroup
+ wg.Add(2)
+ join := func(src, dst net.Conn) {
+ defer wg.Done()
+ io.Copy(dst, src)
+ // close the write end of the destination connection
+ if cw, ok := dst.(interface {
+ CloseWrite() error
+ }); ok {
+ cw.CloseWrite()
+ } else {
+ dst.Close()
+ }
+ }
+ go join(conn, clientConn)
+ go join(clientConn, conn)
+ wg.Wait()
+ return nil
+}
+
+// Snapshots creates snapshots of the services by calling the
+// simulation_snapshot RPC method
+func (n *ExecNode) Snapshots() (map[string][]byte, error) {
+ if n.client == nil {
+ return nil, errors.New("RPC not started")
+ }
+ var snapshots map[string][]byte
+ return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
+}
+
+func init() {
+ // register a reexec function to start a devp2p node when the current
+ // binary is executed as "p2p-node"
+ reexec.Register("p2p-node", execP2PNode)
+}
+
+// execNodeConfig is used to serialize the node configuration so it can be
+// passed to the child process as a JSON encoded environment variable
+type execNodeConfig struct {
+ Stack node.Config `json:"stack"`
+ Node *NodeConfig `json:"node"`
+ Snapshots map[string][]byte `json:"snapshots,omitempty"`
+ PeerAddrs map[string]string `json:"peer_addrs,omitempty"`
+}
+
+// execP2PNode starts a devp2p node when the current binary is executed with
+// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
+// and the node config from the _P2P_NODE_CONFIG environment variable
+func execP2PNode() {
+ glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
+ glogger.Verbosity(log.LvlInfo)
+ log.Root().SetHandler(glogger)
+
+ // read the services from argv
+ serviceNames := strings.Split(os.Args[1], ",")
+
+ // decode the config
+ confEnv := os.Getenv("_P2P_NODE_CONFIG")
+ if confEnv == "" {
+ log.Crit("missing _P2P_NODE_CONFIG")
+ }
+ var conf execNodeConfig
+ if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
+ log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
+ }
+ conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
+
+ // use explicit IP address in ListenAddr so that Enode URL is usable
+ externalIP := func() string {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ log.Crit("error getting IP address", "err", err)
+ }
+ for _, addr := range addrs {
+ if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() {
+ return ip.IP.String()
+ }
+ }
+ log.Crit("unable to determine explicit IP address")
+ return ""
+ }
+ if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") {
+ conf.Stack.P2P.ListenAddr = externalIP() + conf.Stack.P2P.ListenAddr
+ }
+ if conf.Stack.WSHost == "0.0.0.0" {
+ conf.Stack.WSHost = externalIP()
+ }
+
+ // initialize the devp2p stack
+ stack, err := node.New(&conf.Stack)
+ if err != nil {
+ log.Crit("error creating node stack", "err", err)
+ }
+
+ // register the services, collecting them into a map so we can wrap
+ // them in a snapshot service
+ services := make(map[string]node.Service, len(serviceNames))
+ for _, name := range serviceNames {
+ serviceFunc, exists := serviceFuncs[name]
+ if !exists {
+ log.Crit("unknown node service", "name", name)
+ }
+ constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) {
+ ctx := &ServiceContext{
+ RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs},
+ NodeContext: nodeCtx,
+ Config: conf.Node,
+ }
+ if conf.Snapshots != nil {
+ ctx.Snapshot = conf.Snapshots[name]
+ }
+ service, err := serviceFunc(ctx)
+ if err != nil {
+ return nil, err
+ }
+ services[name] = service
+ return service, nil
+ }
+ if err := stack.Register(constructor); err != nil {
+ log.Crit("error starting service", "name", name, "err", err)
+ }
+ }
+
+ // register the snapshot service
+ if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
+ return &snapshotService{services}, nil
+ }); err != nil {
+ log.Crit("error starting snapshot service", "err", err)
+ }
+
+ // start the stack
+ if err := stack.Start(); err != nil {
+ log.Crit("error stating node stack", "err", err)
+ }
+
+ // stop the stack if we get a SIGTERM signal
+ go func() {
+ sigc := make(chan os.Signal, 1)
+ signal.Notify(sigc, syscall.SIGTERM)
+ defer signal.Stop(sigc)
+ <-sigc
+ log.Info("Received SIGTERM, shutting down...")
+ stack.Stop()
+ }()
+
+ // wait for the stack to exit
+ stack.Wait()
+}
+
+// snapshotService is a node.Service which wraps a list of services and
+// exposes an API to generate a snapshot of those services
+type snapshotService struct {
+ services map[string]node.Service
+}
+
+func (s *snapshotService) APIs() []rpc.API {
+ return []rpc.API{{
+ Namespace: "simulation",
+ Version: "1.0",
+ Service: SnapshotAPI{s.services},
+ }}
+}
+
+func (s *snapshotService) Protocols() []p2p.Protocol {
+ return nil
+}
+
+func (s *snapshotService) Start(*p2p.Server) error {
+ return nil
+}
+
+func (s *snapshotService) Stop() error {
+ return nil
+}
+
+// SnapshotAPI provides an RPC method to create snapshots of services
+type SnapshotAPI struct {
+ services map[string]node.Service
+}
+
+func (api SnapshotAPI) Snapshot() (map[string][]byte, error) {
+ snapshots := make(map[string][]byte)
+ for name, service := range api.services {
+ if s, ok := service.(interface {
+ Snapshot() ([]byte, error)
+ }); ok {
+ snap, err := s.Snapshot()
+ if err != nil {
+ return nil, err
+ }
+ snapshots[name] = snap
+ }
+ }
+ return snapshots, nil
+}
+
+type wsRPCDialer struct {
+ addrs map[string]string
+}
+
+// DialRPC implements the RPCDialer interface by creating a WebSocket RPC
+// client of the given node
+func (w *wsRPCDialer) DialRPC(id discover.NodeID) (*rpc.Client, error) {
+ addr, ok := w.addrs[id.String()]
+ if !ok {
+ return nil, fmt.Errorf("unknown node: %s", id)
+ }
+ return rpc.DialWebsocket(context.Background(), addr, "http://localhost")
+}
diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go
new file mode 100644
index 000000000..c97188def
--- /dev/null
+++ b/p2p/simulations/adapters/inproc.go
@@ -0,0 +1,314 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "net"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// SimAdapter is a NodeAdapter which creates in-memory simulation nodes and
+// connects them using in-memory net.Pipe connections
+type SimAdapter struct {
+ mtx sync.RWMutex
+ nodes map[discover.NodeID]*SimNode
+ services map[string]ServiceFunc
+}
+
+// NewSimAdapter creates a SimAdapter which is capable of running in-memory
+// simulation nodes running any of the given services (the services to run on a
+// particular node are passed to the NewNode function in the NodeConfig)
+func NewSimAdapter(services map[string]ServiceFunc) *SimAdapter {
+ return &SimAdapter{
+ nodes: make(map[discover.NodeID]*SimNode),
+ services: services,
+ }
+}
+
+// Name returns the name of the adapter for logging purposes
+func (s *SimAdapter) Name() string {
+ return "sim-adapter"
+}
+
+// NewNode returns a new SimNode using the given config
+func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ // check a node with the ID doesn't already exist
+ id := config.ID
+ if _, exists := s.nodes[id]; exists {
+ return nil, fmt.Errorf("node already exists: %s", id)
+ }
+
+ // check the services are valid
+ if len(config.Services) == 0 {
+ return nil, errors.New("node must have at least one service")
+ }
+ for _, service := range config.Services {
+ if _, exists := s.services[service]; !exists {
+ return nil, fmt.Errorf("unknown node service %q", service)
+ }
+ }
+
+ n, err := node.New(&node.Config{
+ P2P: p2p.Config{
+ PrivateKey: config.PrivateKey,
+ MaxPeers: math.MaxInt32,
+ NoDiscovery: true,
+ Dialer: s,
+ EnableMsgEvents: true,
+ },
+ NoUSB: true,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ simNode := &SimNode{
+ ID: id,
+ config: config,
+ node: n,
+ adapter: s,
+ running: make(map[string]node.Service),
+ }
+ s.nodes[id] = simNode
+ return simNode, nil
+}
+
+// Dial implements the p2p.NodeDialer interface by connecting to the node using
+// an in-memory net.Pipe connection
+func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) {
+ node, ok := s.GetNode(dest.ID)
+ if !ok {
+ return nil, fmt.Errorf("unknown node: %s", dest.ID)
+ }
+ srv := node.Server()
+ if srv == nil {
+ return nil, fmt.Errorf("node not running: %s", dest.ID)
+ }
+ pipe1, pipe2 := net.Pipe()
+ go srv.SetupConn(pipe1, 0, nil)
+ return pipe2, nil
+}
+
+// DialRPC implements the RPCDialer interface by creating an in-memory RPC
+// client of the given node
+func (s *SimAdapter) DialRPC(id discover.NodeID) (*rpc.Client, error) {
+ node, ok := s.GetNode(id)
+ if !ok {
+ return nil, fmt.Errorf("unknown node: %s", id)
+ }
+ handler, err := node.node.RPCHandler()
+ if err != nil {
+ return nil, err
+ }
+ return rpc.DialInProc(handler), nil
+}
+
+// GetNode returns the node with the given ID if it exists
+func (s *SimAdapter) GetNode(id discover.NodeID) (*SimNode, bool) {
+ s.mtx.RLock()
+ defer s.mtx.RUnlock()
+ node, ok := s.nodes[id]
+ return node, ok
+}
+
+// SimNode is an in-memory simulation node which connects to other nodes using
+// an in-memory net.Pipe connection (see SimAdapter.Dial), running devp2p
+// protocols directly over that pipe
+type SimNode struct {
+ lock sync.RWMutex
+ ID discover.NodeID
+ config *NodeConfig
+ adapter *SimAdapter
+ node *node.Node
+ running map[string]node.Service
+ client *rpc.Client
+ registerOnce sync.Once
+}
+
+// Addr returns the node's discovery address
+func (self *SimNode) Addr() []byte {
+ return []byte(self.Node().String())
+}
+
+// Node returns a discover.Node representing the SimNode
+func (self *SimNode) Node() *discover.Node {
+ return discover.NewNode(self.ID, net.IP{127, 0, 0, 1}, 30303, 30303)
+}
+
+// Client returns an rpc.Client which can be used to communicate with the
+// underlying services (it is set once the node has started)
+func (self *SimNode) Client() (*rpc.Client, error) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.client == nil {
+ return nil, errors.New("node not started")
+ }
+ return self.client, nil
+}
+
+// ServeRPC serves RPC requests over the given connection by creating an
+// in-memory client to the node's RPC server
+func (self *SimNode) ServeRPC(conn net.Conn) error {
+ handler, err := self.node.RPCHandler()
+ if err != nil {
+ return err
+ }
+ handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
+ return nil
+}
+
+// Snapshots creates snapshots of the services by calling the
+// simulation_snapshot RPC method
+func (self *SimNode) Snapshots() (map[string][]byte, error) {
+ self.lock.RLock()
+ services := make(map[string]node.Service, len(self.running))
+ for name, service := range self.running {
+ services[name] = service
+ }
+ self.lock.RUnlock()
+ if len(services) == 0 {
+ return nil, errors.New("no running services")
+ }
+ snapshots := make(map[string][]byte)
+ for name, service := range services {
+ if s, ok := service.(interface {
+ Snapshot() ([]byte, error)
+ }); ok {
+ snap, err := s.Snapshot()
+ if err != nil {
+ return nil, err
+ }
+ snapshots[name] = snap
+ }
+ }
+ return snapshots, nil
+}
+
+// Start registers the services and starts the underlying devp2p node
+func (self *SimNode) Start(snapshots map[string][]byte) error {
+ newService := func(name string) func(ctx *node.ServiceContext) (node.Service, error) {
+ return func(nodeCtx *node.ServiceContext) (node.Service, error) {
+ ctx := &ServiceContext{
+ RPCDialer: self.adapter,
+ NodeContext: nodeCtx,
+ Config: self.config,
+ }
+ if snapshots != nil {
+ ctx.Snapshot = snapshots[name]
+ }
+ serviceFunc := self.adapter.services[name]
+ service, err := serviceFunc(ctx)
+ if err != nil {
+ return nil, err
+ }
+ self.running[name] = service
+ return service, nil
+ }
+ }
+
+ // ensure we only register the services once in the case of the node
+ // being stopped and then started again
+ var regErr error
+ self.registerOnce.Do(func() {
+ for _, name := range self.config.Services {
+ if err := self.node.Register(newService(name)); err != nil {
+ regErr = err
+ return
+ }
+ }
+ })
+ if regErr != nil {
+ return regErr
+ }
+
+ if err := self.node.Start(); err != nil {
+ return err
+ }
+
+ // create an in-process RPC client
+ handler, err := self.node.RPCHandler()
+ if err != nil {
+ return err
+ }
+
+ self.lock.Lock()
+ self.client = rpc.DialInProc(handler)
+ self.lock.Unlock()
+
+ return nil
+}
+
+// Stop closes the RPC client and stops the underlying devp2p node
+func (self *SimNode) Stop() error {
+ self.lock.Lock()
+ if self.client != nil {
+ self.client.Close()
+ self.client = nil
+ }
+ self.lock.Unlock()
+ return self.node.Stop()
+}
+
+// Services returns a copy of the underlying services
+func (self *SimNode) Services() []node.Service {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ services := make([]node.Service, 0, len(self.running))
+ for _, service := range self.running {
+ services = append(services, service)
+ }
+ return services
+}
+
+// Server returns the underlying p2p.Server
+func (self *SimNode) Server() *p2p.Server {
+ return self.node.Server()
+}
+
+// SubscribeEvents subscribes the given channel to peer events from the
+// underlying p2p.Server
+func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription {
+ srv := self.Server()
+ if srv == nil {
+ panic("node not running")
+ }
+ return srv.SubscribeEvents(ch)
+}
+
+// NodeInfo returns information about the node
+func (self *SimNode) NodeInfo() *p2p.NodeInfo {
+ server := self.Server()
+ if server == nil {
+ return &p2p.NodeInfo{
+ ID: self.ID.String(),
+ Enode: self.Node().String(),
+ }
+ }
+ return server.NodeInfo()
+}
diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go
new file mode 100644
index 000000000..ed6cfc504
--- /dev/null
+++ b/p2p/simulations/adapters/types.go
@@ -0,0 +1,215 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "crypto/ecdsa"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "net"
+ "os"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// Node represents a node in a simulation network which is created by a
+// NodeAdapter, for example:
+//
+// * SimNode - An in-memory node
+// * ExecNode - A child process node
+// * DockerNode - A Docker container node
+//
+type Node interface {
+ // Addr returns the node's address (e.g. an Enode URL)
+ Addr() []byte
+
+ // Client returns the RPC client which is created once the node is
+ // up and running
+ Client() (*rpc.Client, error)
+
+ // ServeRPC serves RPC requests over the given connection
+ ServeRPC(net.Conn) error
+
+ // Start starts the node with the given snapshots
+ Start(snapshots map[string][]byte) error
+
+ // Stop stops the node
+ Stop() error
+
+ // NodeInfo returns information about the node
+ NodeInfo() *p2p.NodeInfo
+
+ // Snapshots creates snapshots of the running services
+ Snapshots() (map[string][]byte, error)
+}
+
+// NodeAdapter is used to create Nodes in a simulation network
+type NodeAdapter interface {
+ // Name returns the name of the adapter for logging purposes
+ Name() string
+
+ // NewNode creates a new node with the given configuration
+ NewNode(config *NodeConfig) (Node, error)
+}
+
+// NodeConfig is the configuration used to start a node in a simulation
+// network
+type NodeConfig struct {
+ // ID is the node's ID which is used to identify the node in the
+ // simulation network
+ ID discover.NodeID
+
+ // PrivateKey is the node's private key which is used by the devp2p
+ // stack to encrypt communications
+ PrivateKey *ecdsa.PrivateKey
+
+ // Name is a human friendly name for the node like "node01"
+ Name string
+
+ // Services are the names of the services which should be run when
+ // starting the node (for SimNodes it should be the names of services
+ // contained in SimAdapter.services, for other nodes it should be
+ // services registered by calling the RegisterService function)
+ Services []string
+}
+
+// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding
+// all fields as strings
+type nodeConfigJSON struct {
+ ID string `json:"id"`
+ PrivateKey string `json:"private_key"`
+ Name string `json:"name"`
+ Services []string `json:"services"`
+}
+
+// MarshalJSON implements the json.Marshaler interface by encoding the config
+// fields as strings
+func (n *NodeConfig) MarshalJSON() ([]byte, error) {
+ confJSON := nodeConfigJSON{
+ ID: n.ID.String(),
+ Name: n.Name,
+ Services: n.Services,
+ }
+ if n.PrivateKey != nil {
+ confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey))
+ }
+ return json.Marshal(confJSON)
+}
+
+// UnmarshalJSON implements the json.Unmarshaler interface by decoding the json
+// string values into the config fields
+func (n *NodeConfig) UnmarshalJSON(data []byte) error {
+ var confJSON nodeConfigJSON
+ if err := json.Unmarshal(data, &confJSON); err != nil {
+ return err
+ }
+
+ if confJSON.ID != "" {
+ nodeID, err := discover.HexID(confJSON.ID)
+ if err != nil {
+ return err
+ }
+ n.ID = nodeID
+ }
+
+ if confJSON.PrivateKey != "" {
+ key, err := hex.DecodeString(confJSON.PrivateKey)
+ if err != nil {
+ return err
+ }
+ privKey, err := crypto.ToECDSA(key)
+ if err != nil {
+ return err
+ }
+ n.PrivateKey = privKey
+ }
+
+ n.Name = confJSON.Name
+ n.Services = confJSON.Services
+
+ return nil
+}
+
+// RandomNodeConfig returns node configuration with a randomly generated ID and
+// PrivateKey
+func RandomNodeConfig() *NodeConfig {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ panic("unable to generate key")
+ }
+ var id discover.NodeID
+ pubkey := crypto.FromECDSAPub(&key.PublicKey)
+ copy(id[:], pubkey[1:])
+ return &NodeConfig{
+ ID: id,
+ PrivateKey: key,
+ }
+}
+
+// ServiceContext is a collection of options and methods which can be utilised
+// when starting services
+type ServiceContext struct {
+ RPCDialer
+
+ NodeContext *node.ServiceContext
+ Config *NodeConfig
+ Snapshot []byte
+}
+
+// RPCDialer is used when initialising services which need to connect to
+// other nodes in the network (for example a simulated Swarm node which needs
+// to connect to a Geth node to resolve ENS names)
+type RPCDialer interface {
+ DialRPC(id discover.NodeID) (*rpc.Client, error)
+}
+
+// Services is a collection of services which can be run in a simulation
+type Services map[string]ServiceFunc
+
+// ServiceFunc returns a node.Service which can be used to boot a devp2p node
+type ServiceFunc func(ctx *ServiceContext) (node.Service, error)
+
+// serviceFuncs is a map of registered services which are used to boot devp2p
+// nodes
+var serviceFuncs = make(Services)
+
+// RegisterServices registers the given Services which can then be used to
+// start devp2p nodes using either the Exec or Docker adapters.
+//
+// It should be called in an init function so that it has the opportunity to
+// execute the services before main() is called.
+func RegisterServices(services Services) {
+ for name, f := range services {
+ if _, exists := serviceFuncs[name]; exists {
+ panic(fmt.Sprintf("node service already exists: %q", name))
+ }
+ serviceFuncs[name] = f
+ }
+
+ // now we have registered the services, run reexec.Init() which will
+ // potentially start one of the services if the current binary has
+ // been exec'd with argv[0] set to "p2p-node"
+ if reexec.Init() {
+ os.Exit(0)
+ }
+}