diff options
Diffstat (limited to 'p2p/simulations/adapters')
-rw-r--r-- | p2p/simulations/adapters/docker.go | 182 | ||||
-rw-r--r-- | p2p/simulations/adapters/exec.go | 504 | ||||
-rw-r--r-- | p2p/simulations/adapters/inproc.go | 314 | ||||
-rw-r--r-- | p2p/simulations/adapters/types.go | 215 |
4 files changed, 1215 insertions, 0 deletions
diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go new file mode 100644 index 000000000..022314b3d --- /dev/null +++ b/p2p/simulations/adapters/docker.go @@ -0,0 +1,182 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +// DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker +// containers. +// +// A Docker image is built which contains the current binary at /bin/p2p-node +// which when executed runs the underlying service (see the description +// of the execP2PNode function for more details) +type DockerAdapter struct { + ExecAdapter +} + +// NewDockerAdapter builds the p2p-node Docker image containing the current +// binary and returns a DockerAdapter +func NewDockerAdapter() (*DockerAdapter, error) { + // Since Docker containers run on Linux and this adapter runs the + // current binary in the container, it must be compiled for Linux. + // + // It is reasonable to require this because the caller can just + // compile the current binary in a Docker container. + if runtime.GOOS != "linux" { + return nil, errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)") + } + + if err := buildDockerImage(); err != nil { + return nil, err + } + + return &DockerAdapter{ + ExecAdapter{ + nodes: make(map[discover.NodeID]*ExecNode), + }, + }, nil +} + +// Name returns the name of the adapter for logging purposes +func (d *DockerAdapter) Name() string { + return "docker-adapter" +} + +// NewNode returns a new DockerNode using the given config +func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) { + if len(config.Services) == 0 { + return nil, errors.New("node must have at least one service") + } + for _, service := range config.Services { + if _, exists := serviceFuncs[service]; !exists { + return nil, fmt.Errorf("unknown node service %q", service) + } + } + + // generate the config + conf := &execNodeConfig{ + Stack: node.DefaultConfig, + Node: config, + } + conf.Stack.DataDir = "/data" + conf.Stack.WSHost = "0.0.0.0" + conf.Stack.WSOrigins = []string{"*"} + conf.Stack.WSExposeAll = true + conf.Stack.P2P.EnableMsgEvents = false + conf.Stack.P2P.NoDiscovery = true + conf.Stack.P2P.NAT = nil + conf.Stack.NoUSB = true + + node := &DockerNode{ + ExecNode: ExecNode{ + ID: config.ID, + Config: conf, + adapter: &d.ExecAdapter, + }, + } + node.newCmd = node.dockerCommand + d.ExecAdapter.nodes[node.ID] = &node.ExecNode + return node, nil +} + +// DockerNode wraps an ExecNode but exec's the current binary in a docker +// container rather than locally +type DockerNode struct { + ExecNode +} + +// dockerCommand returns a command which exec's the binary in a Docker +// container. +// +// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment +// variable to the container using the --env flag. +func (n *DockerNode) dockerCommand() *exec.Cmd { + return exec.Command( + "sh", "-c", + fmt.Sprintf( + `exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`, + dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(), + ), + ) +} + +// dockerImage is the name of the Docker image which gets built to run the +// simulation node +const dockerImage = "p2p-node" + +// buildDockerImage builds the Docker image which is used to run the simulation +// node in a Docker container. +// +// It adds the current binary as "p2p-node" so that it runs execP2PNode +// when executed. +func buildDockerImage() error { + // create a directory to use as the build context + dir, err := ioutil.TempDir("", "p2p-docker") + if err != nil { + return err + } + defer os.RemoveAll(dir) + + // copy the current binary into the build context + bin, err := os.Open(reexec.Self()) + if err != nil { + return err + } + defer bin.Close() + dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755) + if err != nil { + return err + } + defer dst.Close() + if _, err := io.Copy(dst, bin); err != nil { + return err + } + + // create the Dockerfile + dockerfile := []byte(` +FROM ubuntu:16.04 +RUN mkdir /data +ADD self.bin /bin/p2p-node + `) + if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil { + return err + } + + // run 'docker build' + cmd := exec.Command("docker", "build", "-t", dockerImage, dir) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + return fmt.Errorf("error building docker image: %s", err) + } + + return nil +} diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go new file mode 100644 index 000000000..bdb92cc1d --- /dev/null +++ b/p2p/simulations/adapters/exec.go @@ -0,0 +1,504 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "bufio" + "context" + "crypto/ecdsa" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "os/exec" + "os/signal" + "path/filepath" + "regexp" + "strings" + "sync" + "syscall" + "time" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/websocket" +) + +// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the +// current binary as a child process. +// +// An init hook is used so that the child process executes the node services +// (rather than whataver the main() function would normally do), see the +// execP2PNode function for more information. +type ExecAdapter struct { + // BaseDir is the directory under which the data directories for each + // simulation node are created. + BaseDir string + + nodes map[discover.NodeID]*ExecNode +} + +// NewExecAdapter returns an ExecAdapter which stores node data in +// subdirectories of the given base directory +func NewExecAdapter(baseDir string) *ExecAdapter { + return &ExecAdapter{ + BaseDir: baseDir, + nodes: make(map[discover.NodeID]*ExecNode), + } +} + +// Name returns the name of the adapter for logging purposes +func (e *ExecAdapter) Name() string { + return "exec-adapter" +} + +// NewNode returns a new ExecNode using the given config +func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) { + if len(config.Services) == 0 { + return nil, errors.New("node must have at least one service") + } + for _, service := range config.Services { + if _, exists := serviceFuncs[service]; !exists { + return nil, fmt.Errorf("unknown node service %q", service) + } + } + + // create the node directory using the first 12 characters of the ID + // as Unix socket paths cannot be longer than 256 characters + dir := filepath.Join(e.BaseDir, config.ID.String()[:12]) + if err := os.Mkdir(dir, 0755); err != nil { + return nil, fmt.Errorf("error creating node directory: %s", err) + } + + // generate the config + conf := &execNodeConfig{ + Stack: node.DefaultConfig, + Node: config, + } + conf.Stack.DataDir = filepath.Join(dir, "data") + conf.Stack.WSHost = "127.0.0.1" + conf.Stack.WSPort = 0 + conf.Stack.WSOrigins = []string{"*"} + conf.Stack.WSExposeAll = true + conf.Stack.P2P.EnableMsgEvents = false + conf.Stack.P2P.NoDiscovery = true + conf.Stack.P2P.NAT = nil + conf.Stack.NoUSB = true + + // listen on a random localhost port (we'll get the actual port after + // starting the node through the RPC admin.nodeInfo method) + conf.Stack.P2P.ListenAddr = "127.0.0.1:0" + + node := &ExecNode{ + ID: config.ID, + Dir: dir, + Config: conf, + adapter: e, + } + node.newCmd = node.execCommand + e.nodes[node.ID] = node + return node, nil +} + +// ExecNode starts a simulation node by exec'ing the current binary and +// running the configured services +type ExecNode struct { + ID discover.NodeID + Dir string + Config *execNodeConfig + Cmd *exec.Cmd + Info *p2p.NodeInfo + + adapter *ExecAdapter + client *rpc.Client + wsAddr string + newCmd func() *exec.Cmd + key *ecdsa.PrivateKey +} + +// Addr returns the node's enode URL +func (n *ExecNode) Addr() []byte { + if n.Info == nil { + return nil + } + return []byte(n.Info.Enode) +} + +// Client returns an rpc.Client which can be used to communicate with the +// underlying services (it is set once the node has started) +func (n *ExecNode) Client() (*rpc.Client, error) { + return n.client, nil +} + +// wsAddrPattern is a regex used to read the WebSocket address from the node's +// log +var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`) + +// Start exec's the node passing the ID and service as command line arguments +// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment +// variable +func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { + if n.Cmd != nil { + return errors.New("already started") + } + defer func() { + if err != nil { + log.Error("node failed to start", "err", err) + n.Stop() + } + }() + + // encode a copy of the config containing the snapshot + confCopy := *n.Config + confCopy.Snapshots = snapshots + confCopy.PeerAddrs = make(map[string]string) + for id, node := range n.adapter.nodes { + confCopy.PeerAddrs[id.String()] = node.wsAddr + } + confData, err := json.Marshal(confCopy) + if err != nil { + return fmt.Errorf("error generating node config: %s", err) + } + + // use a pipe for stderr so we can both copy the node's stderr to + // os.Stderr and read the WebSocket address from the logs + stderrR, stderrW := io.Pipe() + stderr := io.MultiWriter(os.Stderr, stderrW) + + // start the node + cmd := n.newCmd() + cmd.Stdout = os.Stdout + cmd.Stderr = stderr + cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData)) + if err := cmd.Start(); err != nil { + return fmt.Errorf("error starting node: %s", err) + } + n.Cmd = cmd + + // read the WebSocket address from the stderr logs + var wsAddr string + wsAddrC := make(chan string) + go func() { + s := bufio.NewScanner(stderrR) + for s.Scan() { + if strings.Contains(s.Text(), "WebSocket endpoint opened:") { + wsAddrC <- wsAddrPattern.FindString(s.Text()) + } + } + }() + select { + case wsAddr = <-wsAddrC: + if wsAddr == "" { + return errors.New("failed to read WebSocket address from stderr") + } + case <-time.After(10 * time.Second): + return errors.New("timed out waiting for WebSocket address on stderr") + } + + // create the RPC client and load the node info + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + client, err := rpc.DialWebsocket(ctx, wsAddr, "") + if err != nil { + return fmt.Errorf("error dialing rpc websocket: %s", err) + } + var info p2p.NodeInfo + if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil { + return fmt.Errorf("error getting node info: %s", err) + } + n.client = client + n.wsAddr = wsAddr + n.Info = &info + + return nil +} + +// execCommand returns a command which runs the node locally by exec'ing +// the current binary but setting argv[0] to "p2p-node" so that the child +// runs execP2PNode +func (n *ExecNode) execCommand() *exec.Cmd { + return &exec.Cmd{ + Path: reexec.Self(), + Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()}, + } +} + +// Stop stops the node by first sending SIGTERM and then SIGKILL if the node +// doesn't stop within 5s +func (n *ExecNode) Stop() error { + if n.Cmd == nil { + return nil + } + defer func() { + n.Cmd = nil + }() + + if n.client != nil { + n.client.Close() + n.client = nil + n.wsAddr = "" + n.Info = nil + } + + if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil { + return n.Cmd.Process.Kill() + } + waitErr := make(chan error) + go func() { + waitErr <- n.Cmd.Wait() + }() + select { + case err := <-waitErr: + return err + case <-time.After(5 * time.Second): + return n.Cmd.Process.Kill() + } +} + +// NodeInfo returns information about the node +func (n *ExecNode) NodeInfo() *p2p.NodeInfo { + info := &p2p.NodeInfo{ + ID: n.ID.String(), + } + if n.client != nil { + n.client.Call(&info, "admin_nodeInfo") + } + return info +} + +// ServeRPC serves RPC requests over the given connection by dialling the +// node's WebSocket address and joining the two connections +func (n *ExecNode) ServeRPC(clientConn net.Conn) error { + conn, err := websocket.Dial(n.wsAddr, "", "http://localhost") + if err != nil { + return err + } + var wg sync.WaitGroup + wg.Add(2) + join := func(src, dst net.Conn) { + defer wg.Done() + io.Copy(dst, src) + // close the write end of the destination connection + if cw, ok := dst.(interface { + CloseWrite() error + }); ok { + cw.CloseWrite() + } else { + dst.Close() + } + } + go join(conn, clientConn) + go join(clientConn, conn) + wg.Wait() + return nil +} + +// Snapshots creates snapshots of the services by calling the +// simulation_snapshot RPC method +func (n *ExecNode) Snapshots() (map[string][]byte, error) { + if n.client == nil { + return nil, errors.New("RPC not started") + } + var snapshots map[string][]byte + return snapshots, n.client.Call(&snapshots, "simulation_snapshot") +} + +func init() { + // register a reexec function to start a devp2p node when the current + // binary is executed as "p2p-node" + reexec.Register("p2p-node", execP2PNode) +} + +// execNodeConfig is used to serialize the node configuration so it can be +// passed to the child process as a JSON encoded environment variable +type execNodeConfig struct { + Stack node.Config `json:"stack"` + Node *NodeConfig `json:"node"` + Snapshots map[string][]byte `json:"snapshots,omitempty"` + PeerAddrs map[string]string `json:"peer_addrs,omitempty"` +} + +// execP2PNode starts a devp2p node when the current binary is executed with +// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2] +// and the node config from the _P2P_NODE_CONFIG environment variable +func execP2PNode() { + glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat())) + glogger.Verbosity(log.LvlInfo) + log.Root().SetHandler(glogger) + + // read the services from argv + serviceNames := strings.Split(os.Args[1], ",") + + // decode the config + confEnv := os.Getenv("_P2P_NODE_CONFIG") + if confEnv == "" { + log.Crit("missing _P2P_NODE_CONFIG") + } + var conf execNodeConfig + if err := json.Unmarshal([]byte(confEnv), &conf); err != nil { + log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) + } + conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey + + // use explicit IP address in ListenAddr so that Enode URL is usable + externalIP := func() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + log.Crit("error getting IP address", "err", err) + } + for _, addr := range addrs { + if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() { + return ip.IP.String() + } + } + log.Crit("unable to determine explicit IP address") + return "" + } + if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") { + conf.Stack.P2P.ListenAddr = externalIP() + conf.Stack.P2P.ListenAddr + } + if conf.Stack.WSHost == "0.0.0.0" { + conf.Stack.WSHost = externalIP() + } + + // initialize the devp2p stack + stack, err := node.New(&conf.Stack) + if err != nil { + log.Crit("error creating node stack", "err", err) + } + + // register the services, collecting them into a map so we can wrap + // them in a snapshot service + services := make(map[string]node.Service, len(serviceNames)) + for _, name := range serviceNames { + serviceFunc, exists := serviceFuncs[name] + if !exists { + log.Crit("unknown node service", "name", name) + } + constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) { + ctx := &ServiceContext{ + RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs}, + NodeContext: nodeCtx, + Config: conf.Node, + } + if conf.Snapshots != nil { + ctx.Snapshot = conf.Snapshots[name] + } + service, err := serviceFunc(ctx) + if err != nil { + return nil, err + } + services[name] = service + return service, nil + } + if err := stack.Register(constructor); err != nil { + log.Crit("error starting service", "name", name, "err", err) + } + } + + // register the snapshot service + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + return &snapshotService{services}, nil + }); err != nil { + log.Crit("error starting snapshot service", "err", err) + } + + // start the stack + if err := stack.Start(); err != nil { + log.Crit("error stating node stack", "err", err) + } + + // stop the stack if we get a SIGTERM signal + go func() { + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGTERM) + defer signal.Stop(sigc) + <-sigc + log.Info("Received SIGTERM, shutting down...") + stack.Stop() + }() + + // wait for the stack to exit + stack.Wait() +} + +// snapshotService is a node.Service which wraps a list of services and +// exposes an API to generate a snapshot of those services +type snapshotService struct { + services map[string]node.Service +} + +func (s *snapshotService) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "simulation", + Version: "1.0", + Service: SnapshotAPI{s.services}, + }} +} + +func (s *snapshotService) Protocols() []p2p.Protocol { + return nil +} + +func (s *snapshotService) Start(*p2p.Server) error { + return nil +} + +func (s *snapshotService) Stop() error { + return nil +} + +// SnapshotAPI provides an RPC method to create snapshots of services +type SnapshotAPI struct { + services map[string]node.Service +} + +func (api SnapshotAPI) Snapshot() (map[string][]byte, error) { + snapshots := make(map[string][]byte) + for name, service := range api.services { + if s, ok := service.(interface { + Snapshot() ([]byte, error) + }); ok { + snap, err := s.Snapshot() + if err != nil { + return nil, err + } + snapshots[name] = snap + } + } + return snapshots, nil +} + +type wsRPCDialer struct { + addrs map[string]string +} + +// DialRPC implements the RPCDialer interface by creating a WebSocket RPC +// client of the given node +func (w *wsRPCDialer) DialRPC(id discover.NodeID) (*rpc.Client, error) { + addr, ok := w.addrs[id.String()] + if !ok { + return nil, fmt.Errorf("unknown node: %s", id) + } + return rpc.DialWebsocket(context.Background(), addr, "http://localhost") +} diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go new file mode 100644 index 000000000..c97188def --- /dev/null +++ b/p2p/simulations/adapters/inproc.go @@ -0,0 +1,314 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "errors" + "fmt" + "math" + "net" + "sync" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" +) + +// SimAdapter is a NodeAdapter which creates in-memory simulation nodes and +// connects them using in-memory net.Pipe connections +type SimAdapter struct { + mtx sync.RWMutex + nodes map[discover.NodeID]*SimNode + services map[string]ServiceFunc +} + +// NewSimAdapter creates a SimAdapter which is capable of running in-memory +// simulation nodes running any of the given services (the services to run on a +// particular node are passed to the NewNode function in the NodeConfig) +func NewSimAdapter(services map[string]ServiceFunc) *SimAdapter { + return &SimAdapter{ + nodes: make(map[discover.NodeID]*SimNode), + services: services, + } +} + +// Name returns the name of the adapter for logging purposes +func (s *SimAdapter) Name() string { + return "sim-adapter" +} + +// NewNode returns a new SimNode using the given config +func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // check a node with the ID doesn't already exist + id := config.ID + if _, exists := s.nodes[id]; exists { + return nil, fmt.Errorf("node already exists: %s", id) + } + + // check the services are valid + if len(config.Services) == 0 { + return nil, errors.New("node must have at least one service") + } + for _, service := range config.Services { + if _, exists := s.services[service]; !exists { + return nil, fmt.Errorf("unknown node service %q", service) + } + } + + n, err := node.New(&node.Config{ + P2P: p2p.Config{ + PrivateKey: config.PrivateKey, + MaxPeers: math.MaxInt32, + NoDiscovery: true, + Dialer: s, + EnableMsgEvents: true, + }, + NoUSB: true, + }) + if err != nil { + return nil, err + } + + simNode := &SimNode{ + ID: id, + config: config, + node: n, + adapter: s, + running: make(map[string]node.Service), + } + s.nodes[id] = simNode + return simNode, nil +} + +// Dial implements the p2p.NodeDialer interface by connecting to the node using +// an in-memory net.Pipe connection +func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) { + node, ok := s.GetNode(dest.ID) + if !ok { + return nil, fmt.Errorf("unknown node: %s", dest.ID) + } + srv := node.Server() + if srv == nil { + return nil, fmt.Errorf("node not running: %s", dest.ID) + } + pipe1, pipe2 := net.Pipe() + go srv.SetupConn(pipe1, 0, nil) + return pipe2, nil +} + +// DialRPC implements the RPCDialer interface by creating an in-memory RPC +// client of the given node +func (s *SimAdapter) DialRPC(id discover.NodeID) (*rpc.Client, error) { + node, ok := s.GetNode(id) + if !ok { + return nil, fmt.Errorf("unknown node: %s", id) + } + handler, err := node.node.RPCHandler() + if err != nil { + return nil, err + } + return rpc.DialInProc(handler), nil +} + +// GetNode returns the node with the given ID if it exists +func (s *SimAdapter) GetNode(id discover.NodeID) (*SimNode, bool) { + s.mtx.RLock() + defer s.mtx.RUnlock() + node, ok := s.nodes[id] + return node, ok +} + +// SimNode is an in-memory simulation node which connects to other nodes using +// an in-memory net.Pipe connection (see SimAdapter.Dial), running devp2p +// protocols directly over that pipe +type SimNode struct { + lock sync.RWMutex + ID discover.NodeID + config *NodeConfig + adapter *SimAdapter + node *node.Node + running map[string]node.Service + client *rpc.Client + registerOnce sync.Once +} + +// Addr returns the node's discovery address +func (self *SimNode) Addr() []byte { + return []byte(self.Node().String()) +} + +// Node returns a discover.Node representing the SimNode +func (self *SimNode) Node() *discover.Node { + return discover.NewNode(self.ID, net.IP{127, 0, 0, 1}, 30303, 30303) +} + +// Client returns an rpc.Client which can be used to communicate with the +// underlying services (it is set once the node has started) +func (self *SimNode) Client() (*rpc.Client, error) { + self.lock.RLock() + defer self.lock.RUnlock() + if self.client == nil { + return nil, errors.New("node not started") + } + return self.client, nil +} + +// ServeRPC serves RPC requests over the given connection by creating an +// in-memory client to the node's RPC server +func (self *SimNode) ServeRPC(conn net.Conn) error { + handler, err := self.node.RPCHandler() + if err != nil { + return err + } + handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions) + return nil +} + +// Snapshots creates snapshots of the services by calling the +// simulation_snapshot RPC method +func (self *SimNode) Snapshots() (map[string][]byte, error) { + self.lock.RLock() + services := make(map[string]node.Service, len(self.running)) + for name, service := range self.running { + services[name] = service + } + self.lock.RUnlock() + if len(services) == 0 { + return nil, errors.New("no running services") + } + snapshots := make(map[string][]byte) + for name, service := range services { + if s, ok := service.(interface { + Snapshot() ([]byte, error) + }); ok { + snap, err := s.Snapshot() + if err != nil { + return nil, err + } + snapshots[name] = snap + } + } + return snapshots, nil +} + +// Start registers the services and starts the underlying devp2p node +func (self *SimNode) Start(snapshots map[string][]byte) error { + newService := func(name string) func(ctx *node.ServiceContext) (node.Service, error) { + return func(nodeCtx *node.ServiceContext) (node.Service, error) { + ctx := &ServiceContext{ + RPCDialer: self.adapter, + NodeContext: nodeCtx, + Config: self.config, + } + if snapshots != nil { + ctx.Snapshot = snapshots[name] + } + serviceFunc := self.adapter.services[name] + service, err := serviceFunc(ctx) + if err != nil { + return nil, err + } + self.running[name] = service + return service, nil + } + } + + // ensure we only register the services once in the case of the node + // being stopped and then started again + var regErr error + self.registerOnce.Do(func() { + for _, name := range self.config.Services { + if err := self.node.Register(newService(name)); err != nil { + regErr = err + return + } + } + }) + if regErr != nil { + return regErr + } + + if err := self.node.Start(); err != nil { + return err + } + + // create an in-process RPC client + handler, err := self.node.RPCHandler() + if err != nil { + return err + } + + self.lock.Lock() + self.client = rpc.DialInProc(handler) + self.lock.Unlock() + + return nil +} + +// Stop closes the RPC client and stops the underlying devp2p node +func (self *SimNode) Stop() error { + self.lock.Lock() + if self.client != nil { + self.client.Close() + self.client = nil + } + self.lock.Unlock() + return self.node.Stop() +} + +// Services returns a copy of the underlying services +func (self *SimNode) Services() []node.Service { + self.lock.RLock() + defer self.lock.RUnlock() + services := make([]node.Service, 0, len(self.running)) + for _, service := range self.running { + services = append(services, service) + } + return services +} + +// Server returns the underlying p2p.Server +func (self *SimNode) Server() *p2p.Server { + return self.node.Server() +} + +// SubscribeEvents subscribes the given channel to peer events from the +// underlying p2p.Server +func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription { + srv := self.Server() + if srv == nil { + panic("node not running") + } + return srv.SubscribeEvents(ch) +} + +// NodeInfo returns information about the node +func (self *SimNode) NodeInfo() *p2p.NodeInfo { + server := self.Server() + if server == nil { + return &p2p.NodeInfo{ + ID: self.ID.String(), + Enode: self.Node().String(), + } + } + return server.NodeInfo() +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go new file mode 100644 index 000000000..ed6cfc504 --- /dev/null +++ b/p2p/simulations/adapters/types.go @@ -0,0 +1,215 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "crypto/ecdsa" + "encoding/hex" + "encoding/json" + "fmt" + "net" + "os" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" +) + +// Node represents a node in a simulation network which is created by a +// NodeAdapter, for example: +// +// * SimNode - An in-memory node +// * ExecNode - A child process node +// * DockerNode - A Docker container node +// +type Node interface { + // Addr returns the node's address (e.g. an Enode URL) + Addr() []byte + + // Client returns the RPC client which is created once the node is + // up and running + Client() (*rpc.Client, error) + + // ServeRPC serves RPC requests over the given connection + ServeRPC(net.Conn) error + + // Start starts the node with the given snapshots + Start(snapshots map[string][]byte) error + + // Stop stops the node + Stop() error + + // NodeInfo returns information about the node + NodeInfo() *p2p.NodeInfo + + // Snapshots creates snapshots of the running services + Snapshots() (map[string][]byte, error) +} + +// NodeAdapter is used to create Nodes in a simulation network +type NodeAdapter interface { + // Name returns the name of the adapter for logging purposes + Name() string + + // NewNode creates a new node with the given configuration + NewNode(config *NodeConfig) (Node, error) +} + +// NodeConfig is the configuration used to start a node in a simulation +// network +type NodeConfig struct { + // ID is the node's ID which is used to identify the node in the + // simulation network + ID discover.NodeID + + // PrivateKey is the node's private key which is used by the devp2p + // stack to encrypt communications + PrivateKey *ecdsa.PrivateKey + + // Name is a human friendly name for the node like "node01" + Name string + + // Services are the names of the services which should be run when + // starting the node (for SimNodes it should be the names of services + // contained in SimAdapter.services, for other nodes it should be + // services registered by calling the RegisterService function) + Services []string +} + +// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding +// all fields as strings +type nodeConfigJSON struct { + ID string `json:"id"` + PrivateKey string `json:"private_key"` + Name string `json:"name"` + Services []string `json:"services"` +} + +// MarshalJSON implements the json.Marshaler interface by encoding the config +// fields as strings +func (n *NodeConfig) MarshalJSON() ([]byte, error) { + confJSON := nodeConfigJSON{ + ID: n.ID.String(), + Name: n.Name, + Services: n.Services, + } + if n.PrivateKey != nil { + confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) + } + return json.Marshal(confJSON) +} + +// UnmarshalJSON implements the json.Unmarshaler interface by decoding the json +// string values into the config fields +func (n *NodeConfig) UnmarshalJSON(data []byte) error { + var confJSON nodeConfigJSON + if err := json.Unmarshal(data, &confJSON); err != nil { + return err + } + + if confJSON.ID != "" { + nodeID, err := discover.HexID(confJSON.ID) + if err != nil { + return err + } + n.ID = nodeID + } + + if confJSON.PrivateKey != "" { + key, err := hex.DecodeString(confJSON.PrivateKey) + if err != nil { + return err + } + privKey, err := crypto.ToECDSA(key) + if err != nil { + return err + } + n.PrivateKey = privKey + } + + n.Name = confJSON.Name + n.Services = confJSON.Services + + return nil +} + +// RandomNodeConfig returns node configuration with a randomly generated ID and +// PrivateKey +func RandomNodeConfig() *NodeConfig { + key, err := crypto.GenerateKey() + if err != nil { + panic("unable to generate key") + } + var id discover.NodeID + pubkey := crypto.FromECDSAPub(&key.PublicKey) + copy(id[:], pubkey[1:]) + return &NodeConfig{ + ID: id, + PrivateKey: key, + } +} + +// ServiceContext is a collection of options and methods which can be utilised +// when starting services +type ServiceContext struct { + RPCDialer + + NodeContext *node.ServiceContext + Config *NodeConfig + Snapshot []byte +} + +// RPCDialer is used when initialising services which need to connect to +// other nodes in the network (for example a simulated Swarm node which needs +// to connect to a Geth node to resolve ENS names) +type RPCDialer interface { + DialRPC(id discover.NodeID) (*rpc.Client, error) +} + +// Services is a collection of services which can be run in a simulation +type Services map[string]ServiceFunc + +// ServiceFunc returns a node.Service which can be used to boot a devp2p node +type ServiceFunc func(ctx *ServiceContext) (node.Service, error) + +// serviceFuncs is a map of registered services which are used to boot devp2p +// nodes +var serviceFuncs = make(Services) + +// RegisterServices registers the given Services which can then be used to +// start devp2p nodes using either the Exec or Docker adapters. +// +// It should be called in an init function so that it has the opportunity to +// execute the services before main() is called. +func RegisterServices(services Services) { + for name, f := range services { + if _, exists := serviceFuncs[name]; exists { + panic(fmt.Sprintf("node service already exists: %q", name)) + } + serviceFuncs[name] = f + } + + // now we have registered the services, run reexec.Init() which will + // potentially start one of the services if the current binary has + // been exec'd with argv[0] set to "p2p-node" + if reexec.Init() { + os.Exit(0) + } +} |