aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/simulations/adapters
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/simulations/adapters')
-rw-r--r--p2p/simulations/adapters/docker.go182
-rw-r--r--p2p/simulations/adapters/exec.go504
-rw-r--r--p2p/simulations/adapters/inproc.go314
-rw-r--r--p2p/simulations/adapters/types.go215
4 files changed, 1215 insertions, 0 deletions
diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go
new file mode 100644
index 000000000..022314b3d
--- /dev/null
+++ b/p2p/simulations/adapters/docker.go
@@ -0,0 +1,182 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "runtime"
+ "strings"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+// DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker
+// containers.
+//
+// A Docker image is built which contains the current binary at /bin/p2p-node
+// which when executed runs the underlying service (see the description
+// of the execP2PNode function for more details)
+type DockerAdapter struct {
+ ExecAdapter
+}
+
+// NewDockerAdapter builds the p2p-node Docker image containing the current
+// binary and returns a DockerAdapter
+func NewDockerAdapter() (*DockerAdapter, error) {
+ // Since Docker containers run on Linux and this adapter runs the
+ // current binary in the container, it must be compiled for Linux.
+ //
+ // It is reasonable to require this because the caller can just
+ // compile the current binary in a Docker container.
+ if runtime.GOOS != "linux" {
+ return nil, errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)")
+ }
+
+ if err := buildDockerImage(); err != nil {
+ return nil, err
+ }
+
+ return &DockerAdapter{
+ ExecAdapter{
+ nodes: make(map[discover.NodeID]*ExecNode),
+ },
+ }, nil
+}
+
+// Name returns the name of the adapter for logging purposes
+func (d *DockerAdapter) Name() string {
+ return "docker-adapter"
+}
+
+// NewNode returns a new DockerNode using the given config
+func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
+ if len(config.Services) == 0 {
+ return nil, errors.New("node must have at least one service")
+ }
+ for _, service := range config.Services {
+ if _, exists := serviceFuncs[service]; !exists {
+ return nil, fmt.Errorf("unknown node service %q", service)
+ }
+ }
+
+ // generate the config
+ conf := &execNodeConfig{
+ Stack: node.DefaultConfig,
+ Node: config,
+ }
+ conf.Stack.DataDir = "/data"
+ conf.Stack.WSHost = "0.0.0.0"
+ conf.Stack.WSOrigins = []string{"*"}
+ conf.Stack.WSExposeAll = true
+ conf.Stack.P2P.EnableMsgEvents = false
+ conf.Stack.P2P.NoDiscovery = true
+ conf.Stack.P2P.NAT = nil
+ conf.Stack.NoUSB = true
+
+ node := &DockerNode{
+ ExecNode: ExecNode{
+ ID: config.ID,
+ Config: conf,
+ adapter: &d.ExecAdapter,
+ },
+ }
+ node.newCmd = node.dockerCommand
+ d.ExecAdapter.nodes[node.ID] = &node.ExecNode
+ return node, nil
+}
+
+// DockerNode wraps an ExecNode but exec's the current binary in a docker
+// container rather than locally
+type DockerNode struct {
+ ExecNode
+}
+
+// dockerCommand returns a command which exec's the binary in a Docker
+// container.
+//
+// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment
+// variable to the container using the --env flag.
+func (n *DockerNode) dockerCommand() *exec.Cmd {
+ return exec.Command(
+ "sh", "-c",
+ fmt.Sprintf(
+ `exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`,
+ dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(),
+ ),
+ )
+}
+
+// dockerImage is the name of the Docker image which gets built to run the
+// simulation node
+const dockerImage = "p2p-node"
+
+// buildDockerImage builds the Docker image which is used to run the simulation
+// node in a Docker container.
+//
+// It adds the current binary as "p2p-node" so that it runs execP2PNode
+// when executed.
+func buildDockerImage() error {
+ // create a directory to use as the build context
+ dir, err := ioutil.TempDir("", "p2p-docker")
+ if err != nil {
+ return err
+ }
+ defer os.RemoveAll(dir)
+
+ // copy the current binary into the build context
+ bin, err := os.Open(reexec.Self())
+ if err != nil {
+ return err
+ }
+ defer bin.Close()
+ dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755)
+ if err != nil {
+ return err
+ }
+ defer dst.Close()
+ if _, err := io.Copy(dst, bin); err != nil {
+ return err
+ }
+
+ // create the Dockerfile
+ dockerfile := []byte(`
+FROM ubuntu:16.04
+RUN mkdir /data
+ADD self.bin /bin/p2p-node
+ `)
+ if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil {
+ return err
+ }
+
+ // run 'docker build'
+ cmd := exec.Command("docker", "build", "-t", dockerImage, dir)
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ if err := cmd.Run(); err != nil {
+ return fmt.Errorf("error building docker image: %s", err)
+ }
+
+ return nil
+}
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go
new file mode 100644
index 000000000..bdb92cc1d
--- /dev/null
+++ b/p2p/simulations/adapters/exec.go
@@ -0,0 +1,504 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "bufio"
+ "context"
+ "crypto/ecdsa"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "os"
+ "os/exec"
+ "os/signal"
+ "path/filepath"
+ "regexp"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
+ "golang.org/x/net/websocket"
+)
+
+// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the
+// current binary as a child process.
+//
+// An init hook is used so that the child process executes the node services
+// (rather than whataver the main() function would normally do), see the
+// execP2PNode function for more information.
+type ExecAdapter struct {
+ // BaseDir is the directory under which the data directories for each
+ // simulation node are created.
+ BaseDir string
+
+ nodes map[discover.NodeID]*ExecNode
+}
+
+// NewExecAdapter returns an ExecAdapter which stores node data in
+// subdirectories of the given base directory
+func NewExecAdapter(baseDir string) *ExecAdapter {
+ return &ExecAdapter{
+ BaseDir: baseDir,
+ nodes: make(map[discover.NodeID]*ExecNode),
+ }
+}
+
+// Name returns the name of the adapter for logging purposes
+func (e *ExecAdapter) Name() string {
+ return "exec-adapter"
+}
+
+// NewNode returns a new ExecNode using the given config
+func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
+ if len(config.Services) == 0 {
+ return nil, errors.New("node must have at least one service")
+ }
+ for _, service := range config.Services {
+ if _, exists := serviceFuncs[service]; !exists {
+ return nil, fmt.Errorf("unknown node service %q", service)
+ }
+ }
+
+ // create the node directory using the first 12 characters of the ID
+ // as Unix socket paths cannot be longer than 256 characters
+ dir := filepath.Join(e.BaseDir, config.ID.String()[:12])
+ if err := os.Mkdir(dir, 0755); err != nil {
+ return nil, fmt.Errorf("error creating node directory: %s", err)
+ }
+
+ // generate the config
+ conf := &execNodeConfig{
+ Stack: node.DefaultConfig,
+ Node: config,
+ }
+ conf.Stack.DataDir = filepath.Join(dir, "data")
+ conf.Stack.WSHost = "127.0.0.1"
+ conf.Stack.WSPort = 0
+ conf.Stack.WSOrigins = []string{"*"}
+ conf.Stack.WSExposeAll = true
+ conf.Stack.P2P.EnableMsgEvents = false
+ conf.Stack.P2P.NoDiscovery = true
+ conf.Stack.P2P.NAT = nil
+ conf.Stack.NoUSB = true
+
+ // listen on a random localhost port (we'll get the actual port after
+ // starting the node through the RPC admin.nodeInfo method)
+ conf.Stack.P2P.ListenAddr = "127.0.0.1:0"
+
+ node := &ExecNode{
+ ID: config.ID,
+ Dir: dir,
+ Config: conf,
+ adapter: e,
+ }
+ node.newCmd = node.execCommand
+ e.nodes[node.ID] = node
+ return node, nil
+}
+
+// ExecNode starts a simulation node by exec'ing the current binary and
+// running the configured services
+type ExecNode struct {
+ ID discover.NodeID
+ Dir string
+ Config *execNodeConfig
+ Cmd *exec.Cmd
+ Info *p2p.NodeInfo
+
+ adapter *ExecAdapter
+ client *rpc.Client
+ wsAddr string
+ newCmd func() *exec.Cmd
+ key *ecdsa.PrivateKey
+}
+
+// Addr returns the node's enode URL
+func (n *ExecNode) Addr() []byte {
+ if n.Info == nil {
+ return nil
+ }
+ return []byte(n.Info.Enode)
+}
+
+// Client returns an rpc.Client which can be used to communicate with the
+// underlying services (it is set once the node has started)
+func (n *ExecNode) Client() (*rpc.Client, error) {
+ return n.client, nil
+}
+
+// wsAddrPattern is a regex used to read the WebSocket address from the node's
+// log
+var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`)
+
+// Start exec's the node passing the ID and service as command line arguments
+// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
+// variable
+func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
+ if n.Cmd != nil {
+ return errors.New("already started")
+ }
+ defer func() {
+ if err != nil {
+ log.Error("node failed to start", "err", err)
+ n.Stop()
+ }
+ }()
+
+ // encode a copy of the config containing the snapshot
+ confCopy := *n.Config
+ confCopy.Snapshots = snapshots
+ confCopy.PeerAddrs = make(map[string]string)
+ for id, node := range n.adapter.nodes {
+ confCopy.PeerAddrs[id.String()] = node.wsAddr
+ }
+ confData, err := json.Marshal(confCopy)
+ if err != nil {
+ return fmt.Errorf("error generating node config: %s", err)
+ }
+
+ // use a pipe for stderr so we can both copy the node's stderr to
+ // os.Stderr and read the WebSocket address from the logs
+ stderrR, stderrW := io.Pipe()
+ stderr := io.MultiWriter(os.Stderr, stderrW)
+
+ // start the node
+ cmd := n.newCmd()
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = stderr
+ cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData))
+ if err := cmd.Start(); err != nil {
+ return fmt.Errorf("error starting node: %s", err)
+ }
+ n.Cmd = cmd
+
+ // read the WebSocket address from the stderr logs
+ var wsAddr string
+ wsAddrC := make(chan string)
+ go func() {
+ s := bufio.NewScanner(stderrR)
+ for s.Scan() {
+ if strings.Contains(s.Text(), "WebSocket endpoint opened:") {
+ wsAddrC <- wsAddrPattern.FindString(s.Text())
+ }
+ }
+ }()
+ select {
+ case wsAddr = <-wsAddrC:
+ if wsAddr == "" {
+ return errors.New("failed to read WebSocket address from stderr")
+ }
+ case <-time.After(10 * time.Second):
+ return errors.New("timed out waiting for WebSocket address on stderr")
+ }
+
+ // create the RPC client and load the node info
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ client, err := rpc.DialWebsocket(ctx, wsAddr, "")
+ if err != nil {
+ return fmt.Errorf("error dialing rpc websocket: %s", err)
+ }
+ var info p2p.NodeInfo
+ if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil {
+ return fmt.Errorf("error getting node info: %s", err)
+ }
+ n.client = client
+ n.wsAddr = wsAddr
+ n.Info = &info
+
+ return nil
+}
+
+// execCommand returns a command which runs the node locally by exec'ing
+// the current binary but setting argv[0] to "p2p-node" so that the child
+// runs execP2PNode
+func (n *ExecNode) execCommand() *exec.Cmd {
+ return &exec.Cmd{
+ Path: reexec.Self(),
+ Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()},
+ }
+}
+
+// Stop stops the node by first sending SIGTERM and then SIGKILL if the node
+// doesn't stop within 5s
+func (n *ExecNode) Stop() error {
+ if n.Cmd == nil {
+ return nil
+ }
+ defer func() {
+ n.Cmd = nil
+ }()
+
+ if n.client != nil {
+ n.client.Close()
+ n.client = nil
+ n.wsAddr = ""
+ n.Info = nil
+ }
+
+ if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
+ return n.Cmd.Process.Kill()
+ }
+ waitErr := make(chan error)
+ go func() {
+ waitErr <- n.Cmd.Wait()
+ }()
+ select {
+ case err := <-waitErr:
+ return err
+ case <-time.After(5 * time.Second):
+ return n.Cmd.Process.Kill()
+ }
+}
+
+// NodeInfo returns information about the node
+func (n *ExecNode) NodeInfo() *p2p.NodeInfo {
+ info := &p2p.NodeInfo{
+ ID: n.ID.String(),
+ }
+ if n.client != nil {
+ n.client.Call(&info, "admin_nodeInfo")
+ }
+ return info
+}
+
+// ServeRPC serves RPC requests over the given connection by dialling the
+// node's WebSocket address and joining the two connections
+func (n *ExecNode) ServeRPC(clientConn net.Conn) error {
+ conn, err := websocket.Dial(n.wsAddr, "", "http://localhost")
+ if err != nil {
+ return err
+ }
+ var wg sync.WaitGroup
+ wg.Add(2)
+ join := func(src, dst net.Conn) {
+ defer wg.Done()
+ io.Copy(dst, src)
+ // close the write end of the destination connection
+ if cw, ok := dst.(interface {
+ CloseWrite() error
+ }); ok {
+ cw.CloseWrite()
+ } else {
+ dst.Close()
+ }
+ }
+ go join(conn, clientConn)
+ go join(clientConn, conn)
+ wg.Wait()
+ return nil
+}
+
+// Snapshots creates snapshots of the services by calling the
+// simulation_snapshot RPC method
+func (n *ExecNode) Snapshots() (map[string][]byte, error) {
+ if n.client == nil {
+ return nil, errors.New("RPC not started")
+ }
+ var snapshots map[string][]byte
+ return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
+}
+
+func init() {
+ // register a reexec function to start a devp2p node when the current
+ // binary is executed as "p2p-node"
+ reexec.Register("p2p-node", execP2PNode)
+}
+
+// execNodeConfig is used to serialize the node configuration so it can be
+// passed to the child process as a JSON encoded environment variable
+type execNodeConfig struct {
+ Stack node.Config `json:"stack"`
+ Node *NodeConfig `json:"node"`
+ Snapshots map[string][]byte `json:"snapshots,omitempty"`
+ PeerAddrs map[string]string `json:"peer_addrs,omitempty"`
+}
+
+// execP2PNode starts a devp2p node when the current binary is executed with
+// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
+// and the node config from the _P2P_NODE_CONFIG environment variable
+func execP2PNode() {
+ glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
+ glogger.Verbosity(log.LvlInfo)
+ log.Root().SetHandler(glogger)
+
+ // read the services from argv
+ serviceNames := strings.Split(os.Args[1], ",")
+
+ // decode the config
+ confEnv := os.Getenv("_P2P_NODE_CONFIG")
+ if confEnv == "" {
+ log.Crit("missing _P2P_NODE_CONFIG")
+ }
+ var conf execNodeConfig
+ if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
+ log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
+ }
+ conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
+
+ // use explicit IP address in ListenAddr so that Enode URL is usable
+ externalIP := func() string {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ log.Crit("error getting IP address", "err", err)
+ }
+ for _, addr := range addrs {
+ if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() {
+ return ip.IP.String()
+ }
+ }
+ log.Crit("unable to determine explicit IP address")
+ return ""
+ }
+ if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") {
+ conf.Stack.P2P.ListenAddr = externalIP() + conf.Stack.P2P.ListenAddr
+ }
+ if conf.Stack.WSHost == "0.0.0.0" {
+ conf.Stack.WSHost = externalIP()
+ }
+
+ // initialize the devp2p stack
+ stack, err := node.New(&conf.Stack)
+ if err != nil {
+ log.Crit("error creating node stack", "err", err)
+ }
+
+ // register the services, collecting them into a map so we can wrap
+ // them in a snapshot service
+ services := make(map[string]node.Service, len(serviceNames))
+ for _, name := range serviceNames {
+ serviceFunc, exists := serviceFuncs[name]
+ if !exists {
+ log.Crit("unknown node service", "name", name)
+ }
+ constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) {
+ ctx := &ServiceContext{
+ RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs},
+ NodeContext: nodeCtx,
+ Config: conf.Node,
+ }
+ if conf.Snapshots != nil {
+ ctx.Snapshot = conf.Snapshots[name]
+ }
+ service, err := serviceFunc(ctx)
+ if err != nil {
+ return nil, err
+ }
+ services[name] = service
+ return service, nil
+ }
+ if err := stack.Register(constructor); err != nil {
+ log.Crit("error starting service", "name", name, "err", err)
+ }
+ }
+
+ // register the snapshot service
+ if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
+ return &snapshotService{services}, nil
+ }); err != nil {
+ log.Crit("error starting snapshot service", "err", err)
+ }
+
+ // start the stack
+ if err := stack.Start(); err != nil {
+ log.Crit("error stating node stack", "err", err)
+ }
+
+ // stop the stack if we get a SIGTERM signal
+ go func() {
+ sigc := make(chan os.Signal, 1)
+ signal.Notify(sigc, syscall.SIGTERM)
+ defer signal.Stop(sigc)
+ <-sigc
+ log.Info("Received SIGTERM, shutting down...")
+ stack.Stop()
+ }()
+
+ // wait for the stack to exit
+ stack.Wait()
+}
+
+// snapshotService is a node.Service which wraps a list of services and
+// exposes an API to generate a snapshot of those services
+type snapshotService struct {
+ services map[string]node.Service
+}
+
+func (s *snapshotService) APIs() []rpc.API {
+ return []rpc.API{{
+ Namespace: "simulation",
+ Version: "1.0",
+ Service: SnapshotAPI{s.services},
+ }}
+}
+
+func (s *snapshotService) Protocols() []p2p.Protocol {
+ return nil
+}
+
+func (s *snapshotService) Start(*p2p.Server) error {
+ return nil
+}
+
+func (s *snapshotService) Stop() error {
+ return nil
+}
+
+// SnapshotAPI provides an RPC method to create snapshots of services
+type SnapshotAPI struct {
+ services map[string]node.Service
+}
+
+func (api SnapshotAPI) Snapshot() (map[string][]byte, error) {
+ snapshots := make(map[string][]byte)
+ for name, service := range api.services {
+ if s, ok := service.(interface {
+ Snapshot() ([]byte, error)
+ }); ok {
+ snap, err := s.Snapshot()
+ if err != nil {
+ return nil, err
+ }
+ snapshots[name] = snap
+ }
+ }
+ return snapshots, nil
+}
+
+type wsRPCDialer struct {
+ addrs map[string]string
+}
+
+// DialRPC implements the RPCDialer interface by creating a WebSocket RPC
+// client of the given node
+func (w *wsRPCDialer) DialRPC(id discover.NodeID) (*rpc.Client, error) {
+ addr, ok := w.addrs[id.String()]
+ if !ok {
+ return nil, fmt.Errorf("unknown node: %s", id)
+ }
+ return rpc.DialWebsocket(context.Background(), addr, "http://localhost")
+}
diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go
new file mode 100644
index 000000000..c97188def
--- /dev/null
+++ b/p2p/simulations/adapters/inproc.go
@@ -0,0 +1,314 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "net"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// SimAdapter is a NodeAdapter which creates in-memory simulation nodes and
+// connects them using in-memory net.Pipe connections
+type SimAdapter struct {
+ mtx sync.RWMutex
+ nodes map[discover.NodeID]*SimNode
+ services map[string]ServiceFunc
+}
+
+// NewSimAdapter creates a SimAdapter which is capable of running in-memory
+// simulation nodes running any of the given services (the services to run on a
+// particular node are passed to the NewNode function in the NodeConfig)
+func NewSimAdapter(services map[string]ServiceFunc) *SimAdapter {
+ return &SimAdapter{
+ nodes: make(map[discover.NodeID]*SimNode),
+ services: services,
+ }
+}
+
+// Name returns the name of the adapter for logging purposes
+func (s *SimAdapter) Name() string {
+ return "sim-adapter"
+}
+
+// NewNode returns a new SimNode using the given config
+func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ // check a node with the ID doesn't already exist
+ id := config.ID
+ if _, exists := s.nodes[id]; exists {
+ return nil, fmt.Errorf("node already exists: %s", id)
+ }
+
+ // check the services are valid
+ if len(config.Services) == 0 {
+ return nil, errors.New("node must have at least one service")
+ }
+ for _, service := range config.Services {
+ if _, exists := s.services[service]; !exists {
+ return nil, fmt.Errorf("unknown node service %q", service)
+ }
+ }
+
+ n, err := node.New(&node.Config{
+ P2P: p2p.Config{
+ PrivateKey: config.PrivateKey,
+ MaxPeers: math.MaxInt32,
+ NoDiscovery: true,
+ Dialer: s,
+ EnableMsgEvents: true,
+ },
+ NoUSB: true,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ simNode := &SimNode{
+ ID: id,
+ config: config,
+ node: n,
+ adapter: s,
+ running: make(map[string]node.Service),
+ }
+ s.nodes[id] = simNode
+ return simNode, nil
+}
+
+// Dial implements the p2p.NodeDialer interface by connecting to the node using
+// an in-memory net.Pipe connection
+func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) {
+ node, ok := s.GetNode(dest.ID)
+ if !ok {
+ return nil, fmt.Errorf("unknown node: %s", dest.ID)
+ }
+ srv := node.Server()
+ if srv == nil {
+ return nil, fmt.Errorf("node not running: %s", dest.ID)
+ }
+ pipe1, pipe2 := net.Pipe()
+ go srv.SetupConn(pipe1, 0, nil)
+ return pipe2, nil
+}
+
+// DialRPC implements the RPCDialer interface by creating an in-memory RPC
+// client of the given node
+func (s *SimAdapter) DialRPC(id discover.NodeID) (*rpc.Client, error) {
+ node, ok := s.GetNode(id)
+ if !ok {
+ return nil, fmt.Errorf("unknown node: %s", id)
+ }
+ handler, err := node.node.RPCHandler()
+ if err != nil {
+ return nil, err
+ }
+ return rpc.DialInProc(handler), nil
+}
+
+// GetNode returns the node with the given ID if it exists
+func (s *SimAdapter) GetNode(id discover.NodeID) (*SimNode, bool) {
+ s.mtx.RLock()
+ defer s.mtx.RUnlock()
+ node, ok := s.nodes[id]
+ return node, ok
+}
+
+// SimNode is an in-memory simulation node which connects to other nodes using
+// an in-memory net.Pipe connection (see SimAdapter.Dial), running devp2p
+// protocols directly over that pipe
+type SimNode struct {
+ lock sync.RWMutex
+ ID discover.NodeID
+ config *NodeConfig
+ adapter *SimAdapter
+ node *node.Node
+ running map[string]node.Service
+ client *rpc.Client
+ registerOnce sync.Once
+}
+
+// Addr returns the node's discovery address
+func (self *SimNode) Addr() []byte {
+ return []byte(self.Node().String())
+}
+
+// Node returns a discover.Node representing the SimNode
+func (self *SimNode) Node() *discover.Node {
+ return discover.NewNode(self.ID, net.IP{127, 0, 0, 1}, 30303, 30303)
+}
+
+// Client returns an rpc.Client which can be used to communicate with the
+// underlying services (it is set once the node has started)
+func (self *SimNode) Client() (*rpc.Client, error) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.client == nil {
+ return nil, errors.New("node not started")
+ }
+ return self.client, nil
+}
+
+// ServeRPC serves RPC requests over the given connection by creating an
+// in-memory client to the node's RPC server
+func (self *SimNode) ServeRPC(conn net.Conn) error {
+ handler, err := self.node.RPCHandler()
+ if err != nil {
+ return err
+ }
+ handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
+ return nil
+}
+
+// Snapshots creates snapshots of the services by calling the
+// simulation_snapshot RPC method
+func (self *SimNode) Snapshots() (map[string][]byte, error) {
+ self.lock.RLock()
+ services := make(map[string]node.Service, len(self.running))
+ for name, service := range self.running {
+ services[name] = service
+ }
+ self.lock.RUnlock()
+ if len(services) == 0 {
+ return nil, errors.New("no running services")
+ }
+ snapshots := make(map[string][]byte)
+ for name, service := range services {
+ if s, ok := service.(interface {
+ Snapshot() ([]byte, error)
+ }); ok {
+ snap, err := s.Snapshot()
+ if err != nil {
+ return nil, err
+ }
+ snapshots[name] = snap
+ }
+ }
+ return snapshots, nil
+}
+
+// Start registers the services and starts the underlying devp2p node
+func (self *SimNode) Start(snapshots map[string][]byte) error {
+ newService := func(name string) func(ctx *node.ServiceContext) (node.Service, error) {
+ return func(nodeCtx *node.ServiceContext) (node.Service, error) {
+ ctx := &ServiceContext{
+ RPCDialer: self.adapter,
+ NodeContext: nodeCtx,
+ Config: self.config,
+ }
+ if snapshots != nil {
+ ctx.Snapshot = snapshots[name]
+ }
+ serviceFunc := self.adapter.services[name]
+ service, err := serviceFunc(ctx)
+ if err != nil {
+ return nil, err
+ }
+ self.running[name] = service
+ return service, nil
+ }
+ }
+
+ // ensure we only register the services once in the case of the node
+ // being stopped and then started again
+ var regErr error
+ self.registerOnce.Do(func() {
+ for _, name := range self.config.Services {
+ if err := self.node.Register(newService(name)); err != nil {
+ regErr = err
+ return
+ }
+ }
+ })
+ if regErr != nil {
+ return regErr
+ }
+
+ if err := self.node.Start(); err != nil {
+ return err
+ }
+
+ // create an in-process RPC client
+ handler, err := self.node.RPCHandler()
+ if err != nil {
+ return err
+ }
+
+ self.lock.Lock()
+ self.client = rpc.DialInProc(handler)
+ self.lock.Unlock()
+
+ return nil
+}
+
+// Stop closes the RPC client and stops the underlying devp2p node
+func (self *SimNode) Stop() error {
+ self.lock.Lock()
+ if self.client != nil {
+ self.client.Close()
+ self.client = nil
+ }
+ self.lock.Unlock()
+ return self.node.Stop()
+}
+
+// Services returns a copy of the underlying services
+func (self *SimNode) Services() []node.Service {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ services := make([]node.Service, 0, len(self.running))
+ for _, service := range self.running {
+ services = append(services, service)
+ }
+ return services
+}
+
+// Server returns the underlying p2p.Server
+func (self *SimNode) Server() *p2p.Server {
+ return self.node.Server()
+}
+
+// SubscribeEvents subscribes the given channel to peer events from the
+// underlying p2p.Server
+func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription {
+ srv := self.Server()
+ if srv == nil {
+ panic("node not running")
+ }
+ return srv.SubscribeEvents(ch)
+}
+
+// NodeInfo returns information about the node
+func (self *SimNode) NodeInfo() *p2p.NodeInfo {
+ server := self.Server()
+ if server == nil {
+ return &p2p.NodeInfo{
+ ID: self.ID.String(),
+ Enode: self.Node().String(),
+ }
+ }
+ return server.NodeInfo()
+}
diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go
new file mode 100644
index 000000000..ed6cfc504
--- /dev/null
+++ b/p2p/simulations/adapters/types.go
@@ -0,0 +1,215 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package adapters
+
+import (
+ "crypto/ecdsa"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "net"
+ "os"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// Node represents a node in a simulation network which is created by a
+// NodeAdapter, for example:
+//
+// * SimNode - An in-memory node
+// * ExecNode - A child process node
+// * DockerNode - A Docker container node
+//
+type Node interface {
+ // Addr returns the node's address (e.g. an Enode URL)
+ Addr() []byte
+
+ // Client returns the RPC client which is created once the node is
+ // up and running
+ Client() (*rpc.Client, error)
+
+ // ServeRPC serves RPC requests over the given connection
+ ServeRPC(net.Conn) error
+
+ // Start starts the node with the given snapshots
+ Start(snapshots map[string][]byte) error
+
+ // Stop stops the node
+ Stop() error
+
+ // NodeInfo returns information about the node
+ NodeInfo() *p2p.NodeInfo
+
+ // Snapshots creates snapshots of the running services
+ Snapshots() (map[string][]byte, error)
+}
+
+// NodeAdapter is used to create Nodes in a simulation network
+type NodeAdapter interface {
+ // Name returns the name of the adapter for logging purposes
+ Name() string
+
+ // NewNode creates a new node with the given configuration
+ NewNode(config *NodeConfig) (Node, error)
+}
+
+// NodeConfig is the configuration used to start a node in a simulation
+// network
+type NodeConfig struct {
+ // ID is the node's ID which is used to identify the node in the
+ // simulation network
+ ID discover.NodeID
+
+ // PrivateKey is the node's private key which is used by the devp2p
+ // stack to encrypt communications
+ PrivateKey *ecdsa.PrivateKey
+
+ // Name is a human friendly name for the node like "node01"
+ Name string
+
+ // Services are the names of the services which should be run when
+ // starting the node (for SimNodes it should be the names of services
+ // contained in SimAdapter.services, for other nodes it should be
+ // services registered by calling the RegisterService function)
+ Services []string
+}
+
+// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding
+// all fields as strings
+type nodeConfigJSON struct {
+ ID string `json:"id"`
+ PrivateKey string `json:"private_key"`
+ Name string `json:"name"`
+ Services []string `json:"services"`
+}
+
+// MarshalJSON implements the json.Marshaler interface by encoding the config
+// fields as strings
+func (n *NodeConfig) MarshalJSON() ([]byte, error) {
+ confJSON := nodeConfigJSON{
+ ID: n.ID.String(),
+ Name: n.Name,
+ Services: n.Services,
+ }
+ if n.PrivateKey != nil {
+ confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey))
+ }
+ return json.Marshal(confJSON)
+}
+
+// UnmarshalJSON implements the json.Unmarshaler interface by decoding the json
+// string values into the config fields
+func (n *NodeConfig) UnmarshalJSON(data []byte) error {
+ var confJSON nodeConfigJSON
+ if err := json.Unmarshal(data, &confJSON); err != nil {
+ return err
+ }
+
+ if confJSON.ID != "" {
+ nodeID, err := discover.HexID(confJSON.ID)
+ if err != nil {
+ return err
+ }
+ n.ID = nodeID
+ }
+
+ if confJSON.PrivateKey != "" {
+ key, err := hex.DecodeString(confJSON.PrivateKey)
+ if err != nil {
+ return err
+ }
+ privKey, err := crypto.ToECDSA(key)
+ if err != nil {
+ return err
+ }
+ n.PrivateKey = privKey
+ }
+
+ n.Name = confJSON.Name
+ n.Services = confJSON.Services
+
+ return nil
+}
+
+// RandomNodeConfig returns node configuration with a randomly generated ID and
+// PrivateKey
+func RandomNodeConfig() *NodeConfig {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ panic("unable to generate key")
+ }
+ var id discover.NodeID
+ pubkey := crypto.FromECDSAPub(&key.PublicKey)
+ copy(id[:], pubkey[1:])
+ return &NodeConfig{
+ ID: id,
+ PrivateKey: key,
+ }
+}
+
+// ServiceContext is a collection of options and methods which can be utilised
+// when starting services
+type ServiceContext struct {
+ RPCDialer
+
+ NodeContext *node.ServiceContext
+ Config *NodeConfig
+ Snapshot []byte
+}
+
+// RPCDialer is used when initialising services which need to connect to
+// other nodes in the network (for example a simulated Swarm node which needs
+// to connect to a Geth node to resolve ENS names)
+type RPCDialer interface {
+ DialRPC(id discover.NodeID) (*rpc.Client, error)
+}
+
+// Services is a collection of services which can be run in a simulation
+type Services map[string]ServiceFunc
+
+// ServiceFunc returns a node.Service which can be used to boot a devp2p node
+type ServiceFunc func(ctx *ServiceContext) (node.Service, error)
+
+// serviceFuncs is a map of registered services which are used to boot devp2p
+// nodes
+var serviceFuncs = make(Services)
+
+// RegisterServices registers the given Services which can then be used to
+// start devp2p nodes using either the Exec or Docker adapters.
+//
+// It should be called in an init function so that it has the opportunity to
+// execute the services before main() is called.
+func RegisterServices(services Services) {
+ for name, f := range services {
+ if _, exists := serviceFuncs[name]; exists {
+ panic(fmt.Sprintf("node service already exists: %q", name))
+ }
+ serviceFuncs[name] = f
+ }
+
+ // now we have registered the services, run reexec.Init() which will
+ // potentially start one of the services if the current binary has
+ // been exec'd with argv[0] set to "p2p-node"
+ if reexec.Init() {
+ os.Exit(0)
+ }
+}