aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/simulations/adapters
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/simulations/adapters')
-rw-r--r--p2p/simulations/adapters/docker.go190
-rw-r--r--p2p/simulations/adapters/exec.go225
-rw-r--r--p2p/simulations/adapters/ws.go51
-rw-r--r--p2p/simulations/adapters/ws_test.go21
4 files changed, 127 insertions, 360 deletions
diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go
deleted file mode 100644
index 82eab0e9c..000000000
--- a/p2p/simulations/adapters/docker.go
+++ /dev/null
@@ -1,190 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package adapters
-
-import (
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "os"
- "os/exec"
- "path/filepath"
- "runtime"
- "strings"
-
- "github.com/docker/docker/pkg/reexec"
- "github.com/ethereum/go-ethereum/node"
- "github.com/ethereum/go-ethereum/p2p/enode"
-)
-
-var (
- ErrLinuxOnly = errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)")
-)
-
-// DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker
-// containers.
-//
-// A Docker image is built which contains the current binary at /bin/p2p-node
-// which when executed runs the underlying service (see the description
-// of the execP2PNode function for more details)
-type DockerAdapter struct {
- ExecAdapter
-}
-
-// NewDockerAdapter builds the p2p-node Docker image containing the current
-// binary and returns a DockerAdapter
-func NewDockerAdapter() (*DockerAdapter, error) {
- // Since Docker containers run on Linux and this adapter runs the
- // current binary in the container, it must be compiled for Linux.
- //
- // It is reasonable to require this because the caller can just
- // compile the current binary in a Docker container.
- if runtime.GOOS != "linux" {
- return nil, ErrLinuxOnly
- }
-
- if err := buildDockerImage(); err != nil {
- return nil, err
- }
-
- return &DockerAdapter{
- ExecAdapter{
- nodes: make(map[enode.ID]*ExecNode),
- },
- }, nil
-}
-
-// Name returns the name of the adapter for logging purposes
-func (d *DockerAdapter) Name() string {
- return "docker-adapter"
-}
-
-// NewNode returns a new DockerNode using the given config
-func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
- if len(config.Services) == 0 {
- return nil, errors.New("node must have at least one service")
- }
- for _, service := range config.Services {
- if _, exists := serviceFuncs[service]; !exists {
- return nil, fmt.Errorf("unknown node service %q", service)
- }
- }
-
- // generate the config
- conf := &execNodeConfig{
- Stack: node.DefaultConfig,
- Node: config,
- }
- conf.Stack.DataDir = "/data"
- conf.Stack.WSHost = "0.0.0.0"
- conf.Stack.WSOrigins = []string{"*"}
- conf.Stack.WSExposeAll = true
- conf.Stack.P2P.EnableMsgEvents = false
- conf.Stack.P2P.NoDiscovery = true
- conf.Stack.P2P.NAT = nil
- conf.Stack.NoUSB = true
-
- // listen on all interfaces on a given port, which we set when we
- // initialise NodeConfig (usually a random port)
- conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port)
-
- node := &DockerNode{
- ExecNode: ExecNode{
- ID: config.ID,
- Config: conf,
- adapter: &d.ExecAdapter,
- },
- }
- node.newCmd = node.dockerCommand
- d.ExecAdapter.nodes[node.ID] = &node.ExecNode
- return node, nil
-}
-
-// DockerNode wraps an ExecNode but exec's the current binary in a docker
-// container rather than locally
-type DockerNode struct {
- ExecNode
-}
-
-// dockerCommand returns a command which exec's the binary in a Docker
-// container.
-//
-// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment
-// variable to the container using the --env flag.
-func (n *DockerNode) dockerCommand() *exec.Cmd {
- return exec.Command(
- "sh", "-c",
- fmt.Sprintf(
- `exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`,
- dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(),
- ),
- )
-}
-
-// dockerImage is the name of the Docker image which gets built to run the
-// simulation node
-const dockerImage = "p2p-node"
-
-// buildDockerImage builds the Docker image which is used to run the simulation
-// node in a Docker container.
-//
-// It adds the current binary as "p2p-node" so that it runs execP2PNode
-// when executed.
-func buildDockerImage() error {
- // create a directory to use as the build context
- dir, err := ioutil.TempDir("", "p2p-docker")
- if err != nil {
- return err
- }
- defer os.RemoveAll(dir)
-
- // copy the current binary into the build context
- bin, err := os.Open(reexec.Self())
- if err != nil {
- return err
- }
- defer bin.Close()
- dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755)
- if err != nil {
- return err
- }
- defer dst.Close()
- if _, err := io.Copy(dst, bin); err != nil {
- return err
- }
-
- // create the Dockerfile
- dockerfile := []byte(`
-FROM ubuntu:16.04
-RUN mkdir /data
-ADD self.bin /bin/p2p-node
- `)
- if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil {
- return err
- }
-
- // run 'docker build'
- cmd := exec.Command("docker", "build", "-t", dockerImage, dir)
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- if err := cmd.Run(); err != nil {
- return fmt.Errorf("error building docker image: %s", err)
- }
-
- return nil
-}
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go
index dc7d277ca..abb196717 100644
--- a/p2p/simulations/adapters/exec.go
+++ b/p2p/simulations/adapters/exec.go
@@ -17,7 +17,7 @@
package adapters
import (
- "bufio"
+ "bytes"
"context"
"crypto/ecdsa"
"encoding/json"
@@ -25,6 +25,7 @@ import (
"fmt"
"io"
"net"
+ "net/http"
"os"
"os/exec"
"os/signal"
@@ -43,12 +44,14 @@ import (
"golang.org/x/net/websocket"
)
-// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the
-// current binary as a child process.
-//
-// An init hook is used so that the child process executes the node services
-// (rather than whataver the main() function would normally do), see the
-// execP2PNode function for more information.
+func init() {
+ // Register a reexec function to start a simulation node when the current binary is
+ // executed as "p2p-node" (rather than whataver the main() function would normally do).
+ reexec.Register("p2p-node", execP2PNode)
+}
+
+// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the current binary
+// as a child process.
type ExecAdapter struct {
// BaseDir is the directory under which the data directories for each
// simulation node are created.
@@ -150,15 +153,13 @@ func (n *ExecNode) Client() (*rpc.Client, error) {
}
// Start exec's the node passing the ID and service as command line arguments
-// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
-// variable
+// and the node config encoded as JSON in an environment variable.
func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
if n.Cmd != nil {
return errors.New("already started")
}
defer func() {
if err != nil {
- log.Error("node failed to start", "err", err)
n.Stop()
}
}()
@@ -175,59 +176,78 @@ func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
return fmt.Errorf("error generating node config: %s", err)
}
- // use a pipe for stderr so we can both copy the node's stderr to
- // os.Stderr and read the WebSocket address from the logs
- stderrR, stderrW := io.Pipe()
- stderr := io.MultiWriter(os.Stderr, stderrW)
+ // start the one-shot server that waits for startup information
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ statusURL, statusC := n.waitForStartupJSON(ctx)
// start the node
cmd := n.newCmd()
cmd.Stdout = os.Stdout
- cmd.Stderr = stderr
- cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData))
+ cmd.Stderr = os.Stderr
+ cmd.Env = append(os.Environ(),
+ envStatusURL+"="+statusURL,
+ envNodeConfig+"="+string(confData),
+ )
if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting node: %s", err)
}
n.Cmd = cmd
// read the WebSocket address from the stderr logs
- var wsAddr string
- wsAddrC := make(chan string)
- go func() {
- s := bufio.NewScanner(stderrR)
- for s.Scan() {
- if strings.Contains(s.Text(), "WebSocket endpoint opened") {
- wsAddrC <- wsAddrPattern.FindString(s.Text())
- }
- }
- }()
- select {
- case wsAddr = <-wsAddrC:
- if wsAddr == "" {
- return errors.New("failed to read WebSocket address from stderr")
- }
- case <-time.After(10 * time.Second):
- return errors.New("timed out waiting for WebSocket address on stderr")
+ status := <-statusC
+ if status.Err != "" {
+ return errors.New(status.Err)
}
-
- // create the RPC client and load the node info
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- client, err := rpc.DialWebsocket(ctx, wsAddr, "")
+ client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "http://localhost")
if err != nil {
- return fmt.Errorf("error dialing rpc websocket: %s", err)
+ return fmt.Errorf("can't connect to RPC server: %v", err)
}
- var info p2p.NodeInfo
- if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil {
- return fmt.Errorf("error getting node info: %s", err)
- }
- n.client = client
- n.wsAddr = wsAddr
- n.Info = &info
+ // node ready :)
+ n.client = client
+ n.wsAddr = status.WSEndpoint
+ n.Info = status.NodeInfo
return nil
}
+// waitForStartupJSON runs a one-shot HTTP server to receive a startup report.
+func (n *ExecNode) waitForStartupJSON(ctx context.Context) (string, chan nodeStartupJSON) {
+ var (
+ ch = make(chan nodeStartupJSON, 1)
+ quitOnce sync.Once
+ srv http.Server
+ )
+ l, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ ch <- nodeStartupJSON{Err: err.Error()}
+ return "", ch
+ }
+ quit := func(status nodeStartupJSON) {
+ quitOnce.Do(func() {
+ l.Close()
+ ch <- status
+ })
+ }
+ srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var status nodeStartupJSON
+ if err := json.NewDecoder(r.Body).Decode(&status); err != nil {
+ status.Err = fmt.Sprintf("can't decode startup report: %v", err)
+ }
+ quit(status)
+ })
+ // Run the HTTP server, but don't wait forever and shut it down
+ // if the context is canceled.
+ go srv.Serve(l)
+ go func() {
+ <-ctx.Done()
+ quit(nodeStartupJSON{Err: "didn't get startup report"})
+ }()
+
+ url := "http://" + l.Addr().String()
+ return url, ch
+}
+
// execCommand returns a command which runs the node locally by exec'ing
// the current binary but setting argv[0] to "p2p-node" so that the child
// runs execP2PNode
@@ -318,12 +338,6 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) {
return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
}
-func init() {
- // register a reexec function to start a devp2p node when the current
- // binary is executed as "p2p-node"
- reexec.Register("p2p-node", execP2PNode)
-}
-
// execNodeConfig is used to serialize the node configuration so it can be
// passed to the child process as a JSON encoded environment variable
type execNodeConfig struct {
@@ -333,55 +347,69 @@ type execNodeConfig struct {
PeerAddrs map[string]string `json:"peer_addrs,omitempty"`
}
-// ExternalIP gets an external IP address so that Enode URL is usable
-func ExternalIP() net.IP {
- addrs, err := net.InterfaceAddrs()
- if err != nil {
- log.Crit("error getting IP address", "err", err)
- }
- for _, addr := range addrs {
- if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() && !ip.IP.IsLinkLocalUnicast() {
- return ip.IP
- }
- }
- log.Warn("unable to determine explicit IP address, falling back to loopback")
- return net.IP{127, 0, 0, 1}
-}
-
-// execP2PNode starts a devp2p node when the current binary is executed with
+// execP2PNode starts a simulation node when the current binary is executed with
// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
-// and the node config from the _P2P_NODE_CONFIG environment variable
+// and the node config from an environment variable.
func execP2PNode() {
glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
glogger.Verbosity(log.LvlInfo)
log.Root().SetHandler(glogger)
+ statusURL := os.Getenv(envStatusURL)
+ if statusURL == "" {
+ log.Crit("missing " + envStatusURL)
+ }
+
+ // Start the node and gather startup report.
+ var status nodeStartupJSON
+ stack, stackErr := startExecNodeStack()
+ if stackErr != nil {
+ status.Err = stackErr.Error()
+ } else {
+ status.WSEndpoint = "ws://" + stack.WSEndpoint()
+ status.NodeInfo = stack.Server().NodeInfo()
+ }
+
+ // Send status to the host.
+ statusJSON, _ := json.Marshal(status)
+ if _, err := http.Post(statusURL, "application/json", bytes.NewReader(statusJSON)); err != nil {
+ log.Crit("Can't post startup info", "url", statusURL, "err", err)
+ }
+ if stackErr != nil {
+ os.Exit(1)
+ }
+
+ // Stop the stack if we get a SIGTERM signal.
+ go func() {
+ sigc := make(chan os.Signal, 1)
+ signal.Notify(sigc, syscall.SIGTERM)
+ defer signal.Stop(sigc)
+ <-sigc
+ log.Info("Received SIGTERM, shutting down...")
+ stack.Stop()
+ }()
+ stack.Wait() // Wait for the stack to exit.
+}
+func startExecNodeStack() (*node.Node, error) {
// read the services from argv
serviceNames := strings.Split(os.Args[1], ",")
// decode the config
- confEnv := os.Getenv("_P2P_NODE_CONFIG")
+ confEnv := os.Getenv(envNodeConfig)
if confEnv == "" {
- log.Crit("missing _P2P_NODE_CONFIG")
+ return nil, fmt.Errorf("missing " + envNodeConfig)
}
var conf execNodeConfig
if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
- log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
+ return nil, fmt.Errorf("error decoding %s: %v", envNodeConfig, err)
}
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
- if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") {
- conf.Stack.P2P.ListenAddr = ExternalIP().String() + conf.Stack.P2P.ListenAddr
- }
- if conf.Stack.WSHost == "0.0.0.0" {
- conf.Stack.WSHost = ExternalIP().String()
- }
-
// initialize the devp2p stack
stack, err := node.New(&conf.Stack)
if err != nil {
- log.Crit("error creating node stack", "err", err)
+ return nil, fmt.Errorf("error creating node stack: %v", err)
}
// register the services, collecting them into a map so we can wrap
@@ -390,7 +418,7 @@ func execP2PNode() {
for _, name := range serviceNames {
serviceFunc, exists := serviceFuncs[name]
if !exists {
- log.Crit("unknown node service", "name", name)
+ return nil, fmt.Errorf("unknown node service %q", err)
}
constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) {
ctx := &ServiceContext{
@@ -409,34 +437,35 @@ func execP2PNode() {
return service, nil
}
if err := stack.Register(constructor); err != nil {
- log.Crit("error starting service", "name", name, "err", err)
+ return stack, fmt.Errorf("error registering service %q: %v", name, err)
}
}
// register the snapshot service
- if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
+ err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return &snapshotService{services}, nil
- }); err != nil {
- log.Crit("error starting snapshot service", "err", err)
+ })
+ if err != nil {
+ return stack, fmt.Errorf("error starting snapshot service: %v", err)
}
// start the stack
- if err := stack.Start(); err != nil {
- log.Crit("error stating node stack", "err", err)
+ if err = stack.Start(); err != nil {
+ err = fmt.Errorf("error starting stack: %v", err)
}
+ return stack, err
+}
- // stop the stack if we get a SIGTERM signal
- go func() {
- sigc := make(chan os.Signal, 1)
- signal.Notify(sigc, syscall.SIGTERM)
- defer signal.Stop(sigc)
- <-sigc
- log.Info("Received SIGTERM, shutting down...")
- stack.Stop()
- }()
+const (
+ envStatusURL = "_P2P_STATUS_URL"
+ envNodeConfig = "_P2P_NODE_CONFIG"
+)
- // wait for the stack to exit
- stack.Wait()
+// nodeStartupJSON is sent to the simulation host after startup.
+type nodeStartupJSON struct {
+ Err string
+ WSEndpoint string
+ NodeInfo *p2p.NodeInfo
}
// snapshotService is a node.Service which wraps a list of services and
diff --git a/p2p/simulations/adapters/ws.go b/p2p/simulations/adapters/ws.go
deleted file mode 100644
index 979a21709..000000000
--- a/p2p/simulations/adapters/ws.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package adapters
-
-import (
- "bufio"
- "errors"
- "io"
- "regexp"
- "strings"
- "time"
-)
-
-// wsAddrPattern is a regex used to read the WebSocket address from the node's
-// log
-var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`)
-
-func matchWSAddr(str string) (string, bool) {
- if !strings.Contains(str, "WebSocket endpoint opened") {
- return "", false
- }
-
- return wsAddrPattern.FindString(str), true
-}
-
-// findWSAddr scans through reader r, looking for the log entry with
-// WebSocket address information.
-func findWSAddr(r io.Reader, timeout time.Duration) (string, error) {
- ch := make(chan string)
-
- go func() {
- s := bufio.NewScanner(r)
- for s.Scan() {
- addr, ok := matchWSAddr(s.Text())
- if ok {
- ch <- addr
- }
- }
- close(ch)
- }()
-
- var wsAddr string
- select {
- case wsAddr = <-ch:
- if wsAddr == "" {
- return "", errors.New("empty result")
- }
- case <-time.After(timeout):
- return "", errors.New("timed out")
- }
-
- return wsAddr, nil
-}
diff --git a/p2p/simulations/adapters/ws_test.go b/p2p/simulations/adapters/ws_test.go
deleted file mode 100644
index 0bb9ed2b2..000000000
--- a/p2p/simulations/adapters/ws_test.go
+++ /dev/null
@@ -1,21 +0,0 @@
-package adapters
-
-import (
- "bytes"
- "testing"
- "time"
-)
-
-func TestFindWSAddr(t *testing.T) {
- line := `t=2018-05-02T19:00:45+0200 lvl=info msg="WebSocket endpoint opened" node.id=26c65a606d1125a44695bc08573190d047152b6b9a776ccbbe593e90f91444d9c1ebdadac6a775ad9fdd0923468a1d698ed3a842c1fb89c1bc0f9d4801f8c39c url=ws://127.0.0.1:59975`
- buf := bytes.NewBufferString(line)
- got, err := findWSAddr(buf, 10*time.Second)
- if err != nil {
- t.Fatalf("Failed to find addr: %v", err)
- }
- expected := `ws://127.0.0.1:59975`
-
- if got != expected {
- t.Fatalf("Expected to get '%s', but got '%s'", expected, got)
- }
-}