diff options
Diffstat (limited to 'p2p/simulations/http.go')
-rw-r--r-- | p2p/simulations/http.go | 680 |
1 files changed, 680 insertions, 0 deletions
diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go new file mode 100644 index 000000000..3fa8b9292 --- /dev/null +++ b/p2p/simulations/http.go @@ -0,0 +1,680 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + "strings" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" + "github.com/julienschmidt/httprouter" + "golang.org/x/net/websocket" +) + +// DefaultClient is the default simulation API client which expects the API +// to be running at http://localhost:8888 +var DefaultClient = NewClient("http://localhost:8888") + +// Client is a client for the simulation HTTP API which supports creating +// and managing simulation networks +type Client struct { + URL string + + client *http.Client +} + +// NewClient returns a new simulation API client +func NewClient(url string) *Client { + return &Client{ + URL: url, + client: http.DefaultClient, + } +} + +// GetNetwork returns details of the network +func (c *Client) GetNetwork() (*Network, error) { + network := &Network{} + return network, c.Get("/", network) +} + +// StartNetwork starts all existing nodes in the simulation network +func (c *Client) StartNetwork() error { + return c.Post("/start", nil, nil) +} + +// StopNetwork stops all existing nodes in a simulation network +func (c *Client) StopNetwork() error { + return c.Post("/stop", nil, nil) +} + +// CreateSnapshot creates a network snapshot +func (c *Client) CreateSnapshot() (*Snapshot, error) { + snap := &Snapshot{} + return snap, c.Get("/snapshot", snap) +} + +// LoadSnapshot loads a snapshot into the network +func (c *Client) LoadSnapshot(snap *Snapshot) error { + return c.Post("/snapshot", snap, nil) +} + +// SubscribeOpts is a collection of options to use when subscribing to network +// events +type SubscribeOpts struct { + // Current instructs the server to send events for existing nodes and + // connections first + Current bool + + // Filter instructs the server to only send a subset of message events + Filter string +} + +// SubscribeNetwork subscribes to network events which are sent from the server +// as a server-sent-events stream, optionally receiving events for existing +// nodes and connections and filtering message events +func (c *Client) SubscribeNetwork(events chan *Event, opts SubscribeOpts) (event.Subscription, error) { + url := fmt.Sprintf("%s/events?current=%t&filter=%s", c.URL, opts.Current, opts.Filter) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "text/event-stream") + res, err := c.client.Do(req) + if err != nil { + return nil, err + } + if res.StatusCode != http.StatusOK { + response, _ := ioutil.ReadAll(res.Body) + res.Body.Close() + return nil, fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + + // define a producer function to pass to event.Subscription + // which reads server-sent events from res.Body and sends + // them to the events channel + producer := func(stop <-chan struct{}) error { + defer res.Body.Close() + + // read lines from res.Body in a goroutine so that we are + // always reading from the stop channel + lines := make(chan string) + errC := make(chan error, 1) + go func() { + s := bufio.NewScanner(res.Body) + for s.Scan() { + select { + case lines <- s.Text(): + case <-stop: + return + } + } + errC <- s.Err() + }() + + // detect any lines which start with "data:", decode the data + // into an event and send it to the events channel + for { + select { + case line := <-lines: + if !strings.HasPrefix(line, "data:") { + continue + } + data := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + event := &Event{} + if err := json.Unmarshal([]byte(data), event); err != nil { + return fmt.Errorf("error decoding SSE event: %s", err) + } + select { + case events <- event: + case <-stop: + return nil + } + case err := <-errC: + return err + case <-stop: + return nil + } + } + } + + return event.NewSubscription(producer), nil +} + +// GetNodes returns all nodes which exist in the network +func (c *Client) GetNodes() ([]*p2p.NodeInfo, error) { + var nodes []*p2p.NodeInfo + return nodes, c.Get("/nodes", &nodes) +} + +// CreateNode creates a node in the network using the given configuration +func (c *Client) CreateNode(config *adapters.NodeConfig) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Post("/nodes", config, node) +} + +// GetNode returns details of a node +func (c *Client) GetNode(nodeID string) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Get(fmt.Sprintf("/nodes/%s", nodeID), node) +} + +// StartNode starts a node +func (c *Client) StartNode(nodeID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/start", nodeID), nil, nil) +} + +// StopNode stops a node +func (c *Client) StopNode(nodeID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/stop", nodeID), nil, nil) +} + +// ConnectNode connects a node to a peer node +func (c *Client) ConnectNode(nodeID, peerID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID), nil, nil) +} + +// DisconnectNode disconnects a node from a peer node +func (c *Client) DisconnectNode(nodeID, peerID string) error { + return c.Delete(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID)) +} + +// RPCClient returns an RPC client connected to a node +func (c *Client) RPCClient(ctx context.Context, nodeID string) (*rpc.Client, error) { + baseURL := strings.Replace(c.URL, "http", "ws", 1) + return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/nodes/%s/rpc", baseURL, nodeID), "") +} + +// Get performs a HTTP GET request decoding the resulting JSON response +// into "out" +func (c *Client) Get(path string, out interface{}) error { + return c.Send("GET", path, nil, out) +} + +// Post performs a HTTP POST request sending "in" as the JSON body and +// decoding the resulting JSON response into "out" +func (c *Client) Post(path string, in, out interface{}) error { + return c.Send("POST", path, in, out) +} + +// Delete performs a HTTP DELETE request +func (c *Client) Delete(path string) error { + return c.Send("DELETE", path, nil, nil) +} + +// Send performs a HTTP request, sending "in" as the JSON request body and +// decoding the JSON response into "out" +func (c *Client) Send(method, path string, in, out interface{}) error { + var body []byte + if in != nil { + var err error + body, err = json.Marshal(in) + if err != nil { + return err + } + } + req, err := http.NewRequest(method, c.URL+path, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + res, err := c.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated { + response, _ := ioutil.ReadAll(res.Body) + return fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + if out != nil { + if err := json.NewDecoder(res.Body).Decode(out); err != nil { + return err + } + } + return nil +} + +// Server is an HTTP server providing an API to manage a simulation network +type Server struct { + router *httprouter.Router + network *Network +} + +// NewServer returns a new simulation API server +func NewServer(network *Network) *Server { + s := &Server{ + router: httprouter.New(), + network: network, + } + + s.OPTIONS("/", s.Options) + s.GET("/", s.GetNetwork) + s.POST("/start", s.StartNetwork) + s.POST("/stop", s.StopNetwork) + s.GET("/events", s.StreamNetworkEvents) + s.GET("/snapshot", s.CreateSnapshot) + s.POST("/snapshot", s.LoadSnapshot) + s.POST("/nodes", s.CreateNode) + s.GET("/nodes", s.GetNodes) + s.GET("/nodes/:nodeid", s.GetNode) + s.POST("/nodes/:nodeid/start", s.StartNode) + s.POST("/nodes/:nodeid/stop", s.StopNode) + s.POST("/nodes/:nodeid/conn/:peerid", s.ConnectNode) + s.DELETE("/nodes/:nodeid/conn/:peerid", s.DisconnectNode) + s.GET("/nodes/:nodeid/rpc", s.NodeRPC) + + return s +} + +// GetNetwork returns details of the network +func (s *Server) GetNetwork(w http.ResponseWriter, req *http.Request) { + s.JSON(w, http.StatusOK, s.network) +} + +// StartNetwork starts all nodes in the network +func (s *Server) StartNetwork(w http.ResponseWriter, req *http.Request) { + if err := s.network.StartAll(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// StopNetwork stops all nodes in the network +func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) { + if err := s.network.StopAll(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// StreamNetworkEvents streams network events as a server-sent-events stream +func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { + events := make(chan *Event) + sub := s.network.events.Subscribe(events) + defer sub.Unsubscribe() + + // stop the stream if the client goes away + var clientGone <-chan bool + if cn, ok := w.(http.CloseNotifier); ok { + clientGone = cn.CloseNotify() + } + + // write writes the given event and data to the stream like: + // + // event: <event> + // data: <data> + // + write := func(event, data string) { + fmt.Fprintf(w, "event: %s\n", event) + fmt.Fprintf(w, "data: %s\n\n", data) + if fw, ok := w.(http.Flusher); ok { + fw.Flush() + } + } + writeEvent := func(event *Event) error { + data, err := json.Marshal(event) + if err != nil { + return err + } + write("network", string(data)) + return nil + } + writeErr := func(err error) { + write("error", err.Error()) + } + + // check if filtering has been requested + var filters MsgFilters + if filterParam := req.URL.Query().Get("filter"); filterParam != "" { + var err error + filters, err = NewMsgFilters(filterParam) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } + + w.Header().Set("Content-Type", "text/event-stream; charset=utf-8") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "\n\n") + if fw, ok := w.(http.Flusher); ok { + fw.Flush() + } + + // optionally send the existing nodes and connections + if req.URL.Query().Get("current") == "true" { + snap, err := s.network.Snapshot() + if err != nil { + writeErr(err) + return + } + for _, node := range snap.Nodes { + event := NewEvent(&node.Node) + if err := writeEvent(event); err != nil { + writeErr(err) + return + } + } + for _, conn := range snap.Conns { + event := NewEvent(&conn) + if err := writeEvent(event); err != nil { + writeErr(err) + return + } + } + } + + for { + select { + case event := <-events: + // only send message events which match the filters + if event.Msg != nil && !filters.Match(event.Msg) { + continue + } + if err := writeEvent(event); err != nil { + writeErr(err) + return + } + case <-clientGone: + return + } + } +} + +// NewMsgFilters constructs a collection of message filters from a URL query +// parameter. +// +// The parameter is expected to be a dash-separated list of individual filters, +// each having the format '<proto>:<codes>', where <proto> is the name of a +// protocol and <codes> is a comma-separated list of message codes. +// +// A message code of '*' or '-1' is considered a wildcard and matches any code. +func NewMsgFilters(filterParam string) (MsgFilters, error) { + filters := make(MsgFilters) + for _, filter := range strings.Split(filterParam, "-") { + protoCodes := strings.SplitN(filter, ":", 2) + if len(protoCodes) != 2 || protoCodes[0] == "" || protoCodes[1] == "" { + return nil, fmt.Errorf("invalid message filter: %s", filter) + } + proto := protoCodes[0] + for _, code := range strings.Split(protoCodes[1], ",") { + if code == "*" || code == "-1" { + filters[MsgFilter{Proto: proto, Code: -1}] = struct{}{} + continue + } + n, err := strconv.ParseUint(code, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid message code: %s", code) + } + filters[MsgFilter{Proto: proto, Code: int64(n)}] = struct{}{} + } + } + return filters, nil +} + +// MsgFilters is a collection of filters which are used to filter message +// events +type MsgFilters map[MsgFilter]struct{} + +// Match checks if the given message matches any of the filters +func (m MsgFilters) Match(msg *Msg) bool { + // check if there is a wildcard filter for the message's protocol + if _, ok := m[MsgFilter{Proto: msg.Protocol, Code: -1}]; ok { + return true + } + + // check if there is a filter for the message's protocol and code + if _, ok := m[MsgFilter{Proto: msg.Protocol, Code: int64(msg.Code)}]; ok { + return true + } + + return false +} + +// MsgFilter is used to filter message events based on protocol and message +// code +type MsgFilter struct { + // Proto is matched against a message's protocol + Proto string + + // Code is matched against a message's code, with -1 matching all codes + Code int64 +} + +// CreateSnapshot creates a network snapshot +func (s *Server) CreateSnapshot(w http.ResponseWriter, req *http.Request) { + snap, err := s.network.Snapshot() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, snap) +} + +// LoadSnapshot loads a snapshot into the network +func (s *Server) LoadSnapshot(w http.ResponseWriter, req *http.Request) { + snap := &Snapshot{} + if err := json.NewDecoder(req.Body).Decode(snap); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := s.network.Load(snap); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, s.network) +} + +// CreateNode creates a node in the network using the given configuration +func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { + config := adapters.RandomNodeConfig() + err := json.NewDecoder(req.Body).Decode(config) + if err != nil && err != io.EOF { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + node, err := s.network.NewNodeWithConfig(config) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusCreated, node.NodeInfo()) +} + +// GetNodes returns all nodes which exist in the network +func (s *Server) GetNodes(w http.ResponseWriter, req *http.Request) { + nodes := s.network.GetNodes() + + infos := make([]*p2p.NodeInfo, len(nodes)) + for i, node := range nodes { + infos[i] = node.NodeInfo() + } + + s.JSON(w, http.StatusOK, infos) +} + +// GetNode returns details of a node +func (s *Server) GetNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// StartNode starts a node +func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + if err := s.network.Start(node.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// StopNode stops a node +func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + if err := s.network.Stop(node.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// ConnectNode connects a node to a peer node +func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + peer := req.Context().Value("peer").(*Node) + + if err := s.network.Connect(node.ID(), peer.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// DisconnectNode disconnects a node from a peer node +func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + peer := req.Context().Value("peer").(*Node) + + if err := s.network.Disconnect(node.ID(), peer.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// Options responds to the OPTIONS HTTP method by returning a 200 OK response +// with the "Access-Control-Allow-Headers" header set to "Content-Type" +func (s *Server) Options(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + w.WriteHeader(http.StatusOK) +} + +// NodeRPC forwards RPC requests to a node in the network via a WebSocket +// connection +func (s *Server) NodeRPC(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + handler := func(conn *websocket.Conn) { + node.ServeRPC(conn) + } + + websocket.Server{Handler: handler}.ServeHTTP(w, req) +} + +// ServeHTTP implements the http.Handler interface by delegating to the +// underlying httprouter.Router +func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.router.ServeHTTP(w, req) +} + +// GET registers a handler for GET requests to a particular path +func (s *Server) GET(path string, handle http.HandlerFunc) { + s.router.GET(path, s.wrapHandler(handle)) +} + +// POST registers a handler for POST requests to a particular path +func (s *Server) POST(path string, handle http.HandlerFunc) { + s.router.POST(path, s.wrapHandler(handle)) +} + +// DELETE registers a handler for DELETE requests to a particular path +func (s *Server) DELETE(path string, handle http.HandlerFunc) { + s.router.DELETE(path, s.wrapHandler(handle)) +} + +// OPTIONS registers a handler for OPTIONS requests to a particular path +func (s *Server) OPTIONS(path string, handle http.HandlerFunc) { + s.router.OPTIONS("/*path", s.wrapHandler(handle)) +} + +// JSON sends "data" as a JSON HTTP response +func (s *Server) JSON(w http.ResponseWriter, status int, data interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(data) +} + +// wrapHandler returns a httprouter.Handle which wraps a http.HandlerFunc by +// populating request.Context with any objects from the URL params +func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { + return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + + ctx := context.Background() + + if id := params.ByName("nodeid"); id != "" { + var node *Node + if nodeID, err := discover.HexID(id); err == nil { + node = s.network.GetNode(nodeID) + } else { + node = s.network.GetNodeByName(id) + } + if node == nil { + http.NotFound(w, req) + return + } + ctx = context.WithValue(ctx, "node", node) + } + + if id := params.ByName("peerid"); id != "" { + var peer *Node + if peerID, err := discover.HexID(id); err == nil { + peer = s.network.GetNode(peerID) + } else { + peer = s.network.GetNodeByName(id) + } + if peer == nil { + http.NotFound(w, req) + return + } + ctx = context.WithValue(ctx, "peer", peer) + } + + handler(w, req.WithContext(ctx)) + } +} |