diff options
author | Felix Lange <fjl@users.noreply.github.com> | 2018-10-12 02:32:14 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-12 02:32:14 +0800 |
commit | dcae0d348bb7f5d9052e50a83383a33538ce376a (patch) | |
tree | 9dee15f209bd31c3adf32fef3835a06bda8f18e4 /p2p/simulations/adapters/exec.go | |
parent | f951e23fb5ad2f7017f314a95287bc0506a67d05 (diff) | |
download | go-tangerine-dcae0d348bb7f5d9052e50a83383a33538ce376a.tar go-tangerine-dcae0d348bb7f5d9052e50a83383a33538ce376a.tar.gz go-tangerine-dcae0d348bb7f5d9052e50a83383a33538ce376a.tar.bz2 go-tangerine-dcae0d348bb7f5d9052e50a83383a33538ce376a.tar.lz go-tangerine-dcae0d348bb7f5d9052e50a83383a33538ce376a.tar.xz go-tangerine-dcae0d348bb7f5d9052e50a83383a33538ce376a.tar.zst go-tangerine-dcae0d348bb7f5d9052e50a83383a33538ce376a.zip |
p2p/simulations: fix a deadlock and clean up adapters (#17891)
This fixes a rare deadlock with the inproc adapter:
- A node is stopped, which acquires Network.lock.
- The protocol code being simulated (swarm/network in my case)
waits for its goroutines to shut down.
- One of those goroutines calls into the simulation to add a peer,
which waits for Network.lock.
The fix for the deadlock is really simple, just release the lock
before stopping the simulation node.
Other changes in this PR clean up the exec adapter so it reports
node startup errors better and remove the docker adapter because
it just adds overhead.
In the exec adapter, node information is now posted to a one-shot
server. This avoids log parsing and allows reporting startup
errors to the simulation host.
A small change in package node was needed because simulation
nodes use port zero. Node.{HTTP,WS}Endpoint now return the live
endpoints after startup by checking the TCP listener.
Diffstat (limited to 'p2p/simulations/adapters/exec.go')
-rw-r--r-- | p2p/simulations/adapters/exec.go | 225 |
1 files changed, 127 insertions, 98 deletions
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index dc7d277ca..abb196717 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -17,7 +17,7 @@ package adapters import ( - "bufio" + "bytes" "context" "crypto/ecdsa" "encoding/json" @@ -25,6 +25,7 @@ import ( "fmt" "io" "net" + "net/http" "os" "os/exec" "os/signal" @@ -43,12 +44,14 @@ import ( "golang.org/x/net/websocket" ) -// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the -// current binary as a child process. -// -// An init hook is used so that the child process executes the node services -// (rather than whataver the main() function would normally do), see the -// execP2PNode function for more information. +func init() { + // Register a reexec function to start a simulation node when the current binary is + // executed as "p2p-node" (rather than whataver the main() function would normally do). + reexec.Register("p2p-node", execP2PNode) +} + +// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the current binary +// as a child process. type ExecAdapter struct { // BaseDir is the directory under which the data directories for each // simulation node are created. @@ -150,15 +153,13 @@ func (n *ExecNode) Client() (*rpc.Client, error) { } // Start exec's the node passing the ID and service as command line arguments -// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment -// variable +// and the node config encoded as JSON in an environment variable. func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { if n.Cmd != nil { return errors.New("already started") } defer func() { if err != nil { - log.Error("node failed to start", "err", err) n.Stop() } }() @@ -175,59 +176,78 @@ func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { return fmt.Errorf("error generating node config: %s", err) } - // use a pipe for stderr so we can both copy the node's stderr to - // os.Stderr and read the WebSocket address from the logs - stderrR, stderrW := io.Pipe() - stderr := io.MultiWriter(os.Stderr, stderrW) + // start the one-shot server that waits for startup information + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + statusURL, statusC := n.waitForStartupJSON(ctx) // start the node cmd := n.newCmd() cmd.Stdout = os.Stdout - cmd.Stderr = stderr - cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData)) + cmd.Stderr = os.Stderr + cmd.Env = append(os.Environ(), + envStatusURL+"="+statusURL, + envNodeConfig+"="+string(confData), + ) if err := cmd.Start(); err != nil { return fmt.Errorf("error starting node: %s", err) } n.Cmd = cmd // read the WebSocket address from the stderr logs - var wsAddr string - wsAddrC := make(chan string) - go func() { - s := bufio.NewScanner(stderrR) - for s.Scan() { - if strings.Contains(s.Text(), "WebSocket endpoint opened") { - wsAddrC <- wsAddrPattern.FindString(s.Text()) - } - } - }() - select { - case wsAddr = <-wsAddrC: - if wsAddr == "" { - return errors.New("failed to read WebSocket address from stderr") - } - case <-time.After(10 * time.Second): - return errors.New("timed out waiting for WebSocket address on stderr") + status := <-statusC + if status.Err != "" { + return errors.New(status.Err) } - - // create the RPC client and load the node info - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - client, err := rpc.DialWebsocket(ctx, wsAddr, "") + client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "http://localhost") if err != nil { - return fmt.Errorf("error dialing rpc websocket: %s", err) + return fmt.Errorf("can't connect to RPC server: %v", err) } - var info p2p.NodeInfo - if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil { - return fmt.Errorf("error getting node info: %s", err) - } - n.client = client - n.wsAddr = wsAddr - n.Info = &info + // node ready :) + n.client = client + n.wsAddr = status.WSEndpoint + n.Info = status.NodeInfo return nil } +// waitForStartupJSON runs a one-shot HTTP server to receive a startup report. +func (n *ExecNode) waitForStartupJSON(ctx context.Context) (string, chan nodeStartupJSON) { + var ( + ch = make(chan nodeStartupJSON, 1) + quitOnce sync.Once + srv http.Server + ) + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + ch <- nodeStartupJSON{Err: err.Error()} + return "", ch + } + quit := func(status nodeStartupJSON) { + quitOnce.Do(func() { + l.Close() + ch <- status + }) + } + srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var status nodeStartupJSON + if err := json.NewDecoder(r.Body).Decode(&status); err != nil { + status.Err = fmt.Sprintf("can't decode startup report: %v", err) + } + quit(status) + }) + // Run the HTTP server, but don't wait forever and shut it down + // if the context is canceled. + go srv.Serve(l) + go func() { + <-ctx.Done() + quit(nodeStartupJSON{Err: "didn't get startup report"}) + }() + + url := "http://" + l.Addr().String() + return url, ch +} + // execCommand returns a command which runs the node locally by exec'ing // the current binary but setting argv[0] to "p2p-node" so that the child // runs execP2PNode @@ -318,12 +338,6 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) { return snapshots, n.client.Call(&snapshots, "simulation_snapshot") } -func init() { - // register a reexec function to start a devp2p node when the current - // binary is executed as "p2p-node" - reexec.Register("p2p-node", execP2PNode) -} - // execNodeConfig is used to serialize the node configuration so it can be // passed to the child process as a JSON encoded environment variable type execNodeConfig struct { @@ -333,55 +347,69 @@ type execNodeConfig struct { PeerAddrs map[string]string `json:"peer_addrs,omitempty"` } -// ExternalIP gets an external IP address so that Enode URL is usable -func ExternalIP() net.IP { - addrs, err := net.InterfaceAddrs() - if err != nil { - log.Crit("error getting IP address", "err", err) - } - for _, addr := range addrs { - if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() && !ip.IP.IsLinkLocalUnicast() { - return ip.IP - } - } - log.Warn("unable to determine explicit IP address, falling back to loopback") - return net.IP{127, 0, 0, 1} -} - -// execP2PNode starts a devp2p node when the current binary is executed with +// execP2PNode starts a simulation node when the current binary is executed with // argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2] -// and the node config from the _P2P_NODE_CONFIG environment variable +// and the node config from an environment variable. func execP2PNode() { glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat())) glogger.Verbosity(log.LvlInfo) log.Root().SetHandler(glogger) + statusURL := os.Getenv(envStatusURL) + if statusURL == "" { + log.Crit("missing " + envStatusURL) + } + + // Start the node and gather startup report. + var status nodeStartupJSON + stack, stackErr := startExecNodeStack() + if stackErr != nil { + status.Err = stackErr.Error() + } else { + status.WSEndpoint = "ws://" + stack.WSEndpoint() + status.NodeInfo = stack.Server().NodeInfo() + } + + // Send status to the host. + statusJSON, _ := json.Marshal(status) + if _, err := http.Post(statusURL, "application/json", bytes.NewReader(statusJSON)); err != nil { + log.Crit("Can't post startup info", "url", statusURL, "err", err) + } + if stackErr != nil { + os.Exit(1) + } + + // Stop the stack if we get a SIGTERM signal. + go func() { + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGTERM) + defer signal.Stop(sigc) + <-sigc + log.Info("Received SIGTERM, shutting down...") + stack.Stop() + }() + stack.Wait() // Wait for the stack to exit. +} +func startExecNodeStack() (*node.Node, error) { // read the services from argv serviceNames := strings.Split(os.Args[1], ",") // decode the config - confEnv := os.Getenv("_P2P_NODE_CONFIG") + confEnv := os.Getenv(envNodeConfig) if confEnv == "" { - log.Crit("missing _P2P_NODE_CONFIG") + return nil, fmt.Errorf("missing " + envNodeConfig) } var conf execNodeConfig if err := json.Unmarshal([]byte(confEnv), &conf); err != nil { - log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) + return nil, fmt.Errorf("error decoding %s: %v", envNodeConfig, err) } conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey conf.Stack.Logger = log.New("node.id", conf.Node.ID.String()) - if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") { - conf.Stack.P2P.ListenAddr = ExternalIP().String() + conf.Stack.P2P.ListenAddr - } - if conf.Stack.WSHost == "0.0.0.0" { - conf.Stack.WSHost = ExternalIP().String() - } - // initialize the devp2p stack stack, err := node.New(&conf.Stack) if err != nil { - log.Crit("error creating node stack", "err", err) + return nil, fmt.Errorf("error creating node stack: %v", err) } // register the services, collecting them into a map so we can wrap @@ -390,7 +418,7 @@ func execP2PNode() { for _, name := range serviceNames { serviceFunc, exists := serviceFuncs[name] if !exists { - log.Crit("unknown node service", "name", name) + return nil, fmt.Errorf("unknown node service %q", err) } constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) { ctx := &ServiceContext{ @@ -409,34 +437,35 @@ func execP2PNode() { return service, nil } if err := stack.Register(constructor); err != nil { - log.Crit("error starting service", "name", name, "err", err) + return stack, fmt.Errorf("error registering service %q: %v", name, err) } } // register the snapshot service - if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { return &snapshotService{services}, nil - }); err != nil { - log.Crit("error starting snapshot service", "err", err) + }) + if err != nil { + return stack, fmt.Errorf("error starting snapshot service: %v", err) } // start the stack - if err := stack.Start(); err != nil { - log.Crit("error stating node stack", "err", err) + if err = stack.Start(); err != nil { + err = fmt.Errorf("error starting stack: %v", err) } + return stack, err +} - // stop the stack if we get a SIGTERM signal - go func() { - sigc := make(chan os.Signal, 1) - signal.Notify(sigc, syscall.SIGTERM) - defer signal.Stop(sigc) - <-sigc - log.Info("Received SIGTERM, shutting down...") - stack.Stop() - }() +const ( + envStatusURL = "_P2P_STATUS_URL" + envNodeConfig = "_P2P_NODE_CONFIG" +) - // wait for the stack to exit - stack.Wait() +// nodeStartupJSON is sent to the simulation host after startup. +type nodeStartupJSON struct { + Err string + WSEndpoint string + NodeInfo *p2p.NodeInfo } // snapshotService is a node.Service which wraps a list of services and |