diff options
Diffstat (limited to 'p2p/simulations/adapters')
-rw-r--r-- | p2p/simulations/adapters/docker.go | 190 | ||||
-rw-r--r-- | p2p/simulations/adapters/exec.go | 225 | ||||
-rw-r--r-- | p2p/simulations/adapters/ws.go | 51 | ||||
-rw-r--r-- | p2p/simulations/adapters/ws_test.go | 21 |
4 files changed, 127 insertions, 360 deletions
diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go deleted file mode 100644 index 82eab0e9c..000000000 --- a/p2p/simulations/adapters/docker.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package adapters - -import ( - "errors" - "fmt" - "io" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "runtime" - "strings" - - "github.com/docker/docker/pkg/reexec" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p/enode" -) - -var ( - ErrLinuxOnly = errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)") -) - -// DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker -// containers. -// -// A Docker image is built which contains the current binary at /bin/p2p-node -// which when executed runs the underlying service (see the description -// of the execP2PNode function for more details) -type DockerAdapter struct { - ExecAdapter -} - -// NewDockerAdapter builds the p2p-node Docker image containing the current -// binary and returns a DockerAdapter -func NewDockerAdapter() (*DockerAdapter, error) { - // Since Docker containers run on Linux and this adapter runs the - // current binary in the container, it must be compiled for Linux. - // - // It is reasonable to require this because the caller can just - // compile the current binary in a Docker container. - if runtime.GOOS != "linux" { - return nil, ErrLinuxOnly - } - - if err := buildDockerImage(); err != nil { - return nil, err - } - - return &DockerAdapter{ - ExecAdapter{ - nodes: make(map[enode.ID]*ExecNode), - }, - }, nil -} - -// Name returns the name of the adapter for logging purposes -func (d *DockerAdapter) Name() string { - return "docker-adapter" -} - -// NewNode returns a new DockerNode using the given config -func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) { - if len(config.Services) == 0 { - return nil, errors.New("node must have at least one service") - } - for _, service := range config.Services { - if _, exists := serviceFuncs[service]; !exists { - return nil, fmt.Errorf("unknown node service %q", service) - } - } - - // generate the config - conf := &execNodeConfig{ - Stack: node.DefaultConfig, - Node: config, - } - conf.Stack.DataDir = "/data" - conf.Stack.WSHost = "0.0.0.0" - conf.Stack.WSOrigins = []string{"*"} - conf.Stack.WSExposeAll = true - conf.Stack.P2P.EnableMsgEvents = false - conf.Stack.P2P.NoDiscovery = true - conf.Stack.P2P.NAT = nil - conf.Stack.NoUSB = true - - // listen on all interfaces on a given port, which we set when we - // initialise NodeConfig (usually a random port) - conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port) - - node := &DockerNode{ - ExecNode: ExecNode{ - ID: config.ID, - Config: conf, - adapter: &d.ExecAdapter, - }, - } - node.newCmd = node.dockerCommand - d.ExecAdapter.nodes[node.ID] = &node.ExecNode - return node, nil -} - -// DockerNode wraps an ExecNode but exec's the current binary in a docker -// container rather than locally -type DockerNode struct { - ExecNode -} - -// dockerCommand returns a command which exec's the binary in a Docker -// container. -// -// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment -// variable to the container using the --env flag. -func (n *DockerNode) dockerCommand() *exec.Cmd { - return exec.Command( - "sh", "-c", - fmt.Sprintf( - `exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`, - dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(), - ), - ) -} - -// dockerImage is the name of the Docker image which gets built to run the -// simulation node -const dockerImage = "p2p-node" - -// buildDockerImage builds the Docker image which is used to run the simulation -// node in a Docker container. -// -// It adds the current binary as "p2p-node" so that it runs execP2PNode -// when executed. -func buildDockerImage() error { - // create a directory to use as the build context - dir, err := ioutil.TempDir("", "p2p-docker") - if err != nil { - return err - } - defer os.RemoveAll(dir) - - // copy the current binary into the build context - bin, err := os.Open(reexec.Self()) - if err != nil { - return err - } - defer bin.Close() - dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755) - if err != nil { - return err - } - defer dst.Close() - if _, err := io.Copy(dst, bin); err != nil { - return err - } - - // create the Dockerfile - dockerfile := []byte(` -FROM ubuntu:16.04 -RUN mkdir /data -ADD self.bin /bin/p2p-node - `) - if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil { - return err - } - - // run 'docker build' - cmd := exec.Command("docker", "build", "-t", dockerImage, dir) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { - return fmt.Errorf("error building docker image: %s", err) - } - - return nil -} diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index dc7d277ca..abb196717 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -17,7 +17,7 @@ package adapters import ( - "bufio" + "bytes" "context" "crypto/ecdsa" "encoding/json" @@ -25,6 +25,7 @@ import ( "fmt" "io" "net" + "net/http" "os" "os/exec" "os/signal" @@ -43,12 +44,14 @@ import ( "golang.org/x/net/websocket" ) -// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the -// current binary as a child process. -// -// An init hook is used so that the child process executes the node services -// (rather than whataver the main() function would normally do), see the -// execP2PNode function for more information. +func init() { + // Register a reexec function to start a simulation node when the current binary is + // executed as "p2p-node" (rather than whataver the main() function would normally do). + reexec.Register("p2p-node", execP2PNode) +} + +// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the current binary +// as a child process. type ExecAdapter struct { // BaseDir is the directory under which the data directories for each // simulation node are created. @@ -150,15 +153,13 @@ func (n *ExecNode) Client() (*rpc.Client, error) { } // Start exec's the node passing the ID and service as command line arguments -// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment -// variable +// and the node config encoded as JSON in an environment variable. func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { if n.Cmd != nil { return errors.New("already started") } defer func() { if err != nil { - log.Error("node failed to start", "err", err) n.Stop() } }() @@ -175,59 +176,78 @@ func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { return fmt.Errorf("error generating node config: %s", err) } - // use a pipe for stderr so we can both copy the node's stderr to - // os.Stderr and read the WebSocket address from the logs - stderrR, stderrW := io.Pipe() - stderr := io.MultiWriter(os.Stderr, stderrW) + // start the one-shot server that waits for startup information + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + statusURL, statusC := n.waitForStartupJSON(ctx) // start the node cmd := n.newCmd() cmd.Stdout = os.Stdout - cmd.Stderr = stderr - cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData)) + cmd.Stderr = os.Stderr + cmd.Env = append(os.Environ(), + envStatusURL+"="+statusURL, + envNodeConfig+"="+string(confData), + ) if err := cmd.Start(); err != nil { return fmt.Errorf("error starting node: %s", err) } n.Cmd = cmd // read the WebSocket address from the stderr logs - var wsAddr string - wsAddrC := make(chan string) - go func() { - s := bufio.NewScanner(stderrR) - for s.Scan() { - if strings.Contains(s.Text(), "WebSocket endpoint opened") { - wsAddrC <- wsAddrPattern.FindString(s.Text()) - } - } - }() - select { - case wsAddr = <-wsAddrC: - if wsAddr == "" { - return errors.New("failed to read WebSocket address from stderr") - } - case <-time.After(10 * time.Second): - return errors.New("timed out waiting for WebSocket address on stderr") + status := <-statusC + if status.Err != "" { + return errors.New(status.Err) } - - // create the RPC client and load the node info - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - client, err := rpc.DialWebsocket(ctx, wsAddr, "") + client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "http://localhost") if err != nil { - return fmt.Errorf("error dialing rpc websocket: %s", err) + return fmt.Errorf("can't connect to RPC server: %v", err) } - var info p2p.NodeInfo - if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil { - return fmt.Errorf("error getting node info: %s", err) - } - n.client = client - n.wsAddr = wsAddr - n.Info = &info + // node ready :) + n.client = client + n.wsAddr = status.WSEndpoint + n.Info = status.NodeInfo return nil } +// waitForStartupJSON runs a one-shot HTTP server to receive a startup report. +func (n *ExecNode) waitForStartupJSON(ctx context.Context) (string, chan nodeStartupJSON) { + var ( + ch = make(chan nodeStartupJSON, 1) + quitOnce sync.Once + srv http.Server + ) + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + ch <- nodeStartupJSON{Err: err.Error()} + return "", ch + } + quit := func(status nodeStartupJSON) { + quitOnce.Do(func() { + l.Close() + ch <- status + }) + } + srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var status nodeStartupJSON + if err := json.NewDecoder(r.Body).Decode(&status); err != nil { + status.Err = fmt.Sprintf("can't decode startup report: %v", err) + } + quit(status) + }) + // Run the HTTP server, but don't wait forever and shut it down + // if the context is canceled. + go srv.Serve(l) + go func() { + <-ctx.Done() + quit(nodeStartupJSON{Err: "didn't get startup report"}) + }() + + url := "http://" + l.Addr().String() + return url, ch +} + // execCommand returns a command which runs the node locally by exec'ing // the current binary but setting argv[0] to "p2p-node" so that the child // runs execP2PNode @@ -318,12 +338,6 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) { return snapshots, n.client.Call(&snapshots, "simulation_snapshot") } -func init() { - // register a reexec function to start a devp2p node when the current - // binary is executed as "p2p-node" - reexec.Register("p2p-node", execP2PNode) -} - // execNodeConfig is used to serialize the node configuration so it can be // passed to the child process as a JSON encoded environment variable type execNodeConfig struct { @@ -333,55 +347,69 @@ type execNodeConfig struct { PeerAddrs map[string]string `json:"peer_addrs,omitempty"` } -// ExternalIP gets an external IP address so that Enode URL is usable -func ExternalIP() net.IP { - addrs, err := net.InterfaceAddrs() - if err != nil { - log.Crit("error getting IP address", "err", err) - } - for _, addr := range addrs { - if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() && !ip.IP.IsLinkLocalUnicast() { - return ip.IP - } - } - log.Warn("unable to determine explicit IP address, falling back to loopback") - return net.IP{127, 0, 0, 1} -} - -// execP2PNode starts a devp2p node when the current binary is executed with +// execP2PNode starts a simulation node when the current binary is executed with // argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2] -// and the node config from the _P2P_NODE_CONFIG environment variable +// and the node config from an environment variable. func execP2PNode() { glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat())) glogger.Verbosity(log.LvlInfo) log.Root().SetHandler(glogger) + statusURL := os.Getenv(envStatusURL) + if statusURL == "" { + log.Crit("missing " + envStatusURL) + } + + // Start the node and gather startup report. + var status nodeStartupJSON + stack, stackErr := startExecNodeStack() + if stackErr != nil { + status.Err = stackErr.Error() + } else { + status.WSEndpoint = "ws://" + stack.WSEndpoint() + status.NodeInfo = stack.Server().NodeInfo() + } + + // Send status to the host. + statusJSON, _ := json.Marshal(status) + if _, err := http.Post(statusURL, "application/json", bytes.NewReader(statusJSON)); err != nil { + log.Crit("Can't post startup info", "url", statusURL, "err", err) + } + if stackErr != nil { + os.Exit(1) + } + + // Stop the stack if we get a SIGTERM signal. + go func() { + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGTERM) + defer signal.Stop(sigc) + <-sigc + log.Info("Received SIGTERM, shutting down...") + stack.Stop() + }() + stack.Wait() // Wait for the stack to exit. +} +func startExecNodeStack() (*node.Node, error) { // read the services from argv serviceNames := strings.Split(os.Args[1], ",") // decode the config - confEnv := os.Getenv("_P2P_NODE_CONFIG") + confEnv := os.Getenv(envNodeConfig) if confEnv == "" { - log.Crit("missing _P2P_NODE_CONFIG") + return nil, fmt.Errorf("missing " + envNodeConfig) } var conf execNodeConfig if err := json.Unmarshal([]byte(confEnv), &conf); err != nil { - log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) + return nil, fmt.Errorf("error decoding %s: %v", envNodeConfig, err) } conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey conf.Stack.Logger = log.New("node.id", conf.Node.ID.String()) - if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") { - conf.Stack.P2P.ListenAddr = ExternalIP().String() + conf.Stack.P2P.ListenAddr - } - if conf.Stack.WSHost == "0.0.0.0" { - conf.Stack.WSHost = ExternalIP().String() - } - // initialize the devp2p stack stack, err := node.New(&conf.Stack) if err != nil { - log.Crit("error creating node stack", "err", err) + return nil, fmt.Errorf("error creating node stack: %v", err) } // register the services, collecting them into a map so we can wrap @@ -390,7 +418,7 @@ func execP2PNode() { for _, name := range serviceNames { serviceFunc, exists := serviceFuncs[name] if !exists { - log.Crit("unknown node service", "name", name) + return nil, fmt.Errorf("unknown node service %q", err) } constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) { ctx := &ServiceContext{ @@ -409,34 +437,35 @@ func execP2PNode() { return service, nil } if err := stack.Register(constructor); err != nil { - log.Crit("error starting service", "name", name, "err", err) + return stack, fmt.Errorf("error registering service %q: %v", name, err) } } // register the snapshot service - if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { return &snapshotService{services}, nil - }); err != nil { - log.Crit("error starting snapshot service", "err", err) + }) + if err != nil { + return stack, fmt.Errorf("error starting snapshot service: %v", err) } // start the stack - if err := stack.Start(); err != nil { - log.Crit("error stating node stack", "err", err) + if err = stack.Start(); err != nil { + err = fmt.Errorf("error starting stack: %v", err) } + return stack, err +} - // stop the stack if we get a SIGTERM signal - go func() { - sigc := make(chan os.Signal, 1) - signal.Notify(sigc, syscall.SIGTERM) - defer signal.Stop(sigc) - <-sigc - log.Info("Received SIGTERM, shutting down...") - stack.Stop() - }() +const ( + envStatusURL = "_P2P_STATUS_URL" + envNodeConfig = "_P2P_NODE_CONFIG" +) - // wait for the stack to exit - stack.Wait() +// nodeStartupJSON is sent to the simulation host after startup. +type nodeStartupJSON struct { + Err string + WSEndpoint string + NodeInfo *p2p.NodeInfo } // snapshotService is a node.Service which wraps a list of services and diff --git a/p2p/simulations/adapters/ws.go b/p2p/simulations/adapters/ws.go deleted file mode 100644 index 979a21709..000000000 --- a/p2p/simulations/adapters/ws.go +++ /dev/null @@ -1,51 +0,0 @@ -package adapters - -import ( - "bufio" - "errors" - "io" - "regexp" - "strings" - "time" -) - -// wsAddrPattern is a regex used to read the WebSocket address from the node's -// log -var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`) - -func matchWSAddr(str string) (string, bool) { - if !strings.Contains(str, "WebSocket endpoint opened") { - return "", false - } - - return wsAddrPattern.FindString(str), true -} - -// findWSAddr scans through reader r, looking for the log entry with -// WebSocket address information. -func findWSAddr(r io.Reader, timeout time.Duration) (string, error) { - ch := make(chan string) - - go func() { - s := bufio.NewScanner(r) - for s.Scan() { - addr, ok := matchWSAddr(s.Text()) - if ok { - ch <- addr - } - } - close(ch) - }() - - var wsAddr string - select { - case wsAddr = <-ch: - if wsAddr == "" { - return "", errors.New("empty result") - } - case <-time.After(timeout): - return "", errors.New("timed out") - } - - return wsAddr, nil -} diff --git a/p2p/simulations/adapters/ws_test.go b/p2p/simulations/adapters/ws_test.go deleted file mode 100644 index 0bb9ed2b2..000000000 --- a/p2p/simulations/adapters/ws_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package adapters - -import ( - "bytes" - "testing" - "time" -) - -func TestFindWSAddr(t *testing.T) { - line := `t=2018-05-02T19:00:45+0200 lvl=info msg="WebSocket endpoint opened" node.id=26c65a606d1125a44695bc08573190d047152b6b9a776ccbbe593e90f91444d9c1ebdadac6a775ad9fdd0923468a1d698ed3a842c1fb89c1bc0f9d4801f8c39c url=ws://127.0.0.1:59975` - buf := bytes.NewBufferString(line) - got, err := findWSAddr(buf, 10*time.Second) - if err != nil { - t.Fatalf("Failed to find addr: %v", err) - } - expected := `ws://127.0.0.1:59975` - - if got != expected { - t.Fatalf("Expected to get '%s', but got '%s'", expected, got) - } -} |