diff options
Diffstat (limited to 'rpc/comms')
-rw-r--r-- | rpc/comms/comms.go | 104 | ||||
-rw-r--r-- | rpc/comms/http.go | 190 | ||||
-rw-r--r-- | rpc/comms/http_net.go | 166 | ||||
-rw-r--r-- | rpc/comms/inproc.go | 66 | ||||
-rw-r--r-- | rpc/comms/ipc.go | 73 | ||||
-rw-r--r-- | rpc/comms/ipc_unix.go | 43 | ||||
-rw-r--r-- | rpc/comms/ipc_windows.go | 13 |
7 files changed, 615 insertions, 40 deletions
diff --git a/rpc/comms/comms.go b/rpc/comms/comms.go index 244f5a7a6..bfe625758 100644 --- a/rpc/comms/comms.go +++ b/rpc/comms/comms.go @@ -1,7 +1,111 @@ package comms +import ( + "io" + "net" + + "fmt" + "strings" + + "strconv" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/rpc/codec" + "github.com/ethereum/go-ethereum/rpc/shared" +) + +const ( + maxHttpSizeReqLength = 1024 * 1024 // 1MB +) + +var ( + // List with all API's which are offered over the in proc interface by default + DefaultInProcApis = shared.AllApis + + // List with all API's which are offered over the IPC interface by default + DefaultIpcApis = shared.AllApis + + // List with API's which are offered over thr HTTP/RPC interface by default + DefaultHttpRpcApis = strings.Join([]string{ + shared.DbApiName, shared.EthApiName, shared.NetApiName, shared.Web3ApiName, + }, ",") +) + type EthereumClient interface { + // Close underlaying connection Close() + // Send request Send(interface{}) error + // Receive response Recv() (interface{}, error) + // List with modules this client supports + SupportedModules() (map[string]string, error) +} + +func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { + codec := c.New(conn) + + for { + req, err := codec.ReadRequest() + if err == io.EOF { + codec.Close() + return + } else if err != nil { + glog.V(logger.Error).Infof("comms recv err - %v\n", err) + codec.Close() + return + } + + var rpcResponse interface{} + res, err := api.Execute(req) + + rpcResponse = shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) + err = codec.WriteResponse(rpcResponse) + if err != nil { + glog.V(logger.Error).Infof("comms send err - %v\n", err) + codec.Close() + return + } + } +} + +// Endpoint must be in the form of: +// ${protocol}:${path} +// e.g. ipc:/tmp/geth.ipc +// rpc:localhost:8545 +func ClientFromEndpoint(endpoint string, c codec.Codec) (EthereumClient, error) { + if strings.HasPrefix(endpoint, "ipc:") { + cfg := IpcConfig{ + Endpoint: endpoint[4:], + } + return NewIpcClient(cfg, codec.JSON) + } + + if strings.HasPrefix(endpoint, "rpc:") { + parts := strings.Split(endpoint, ":") + addr := "http://localhost" + port := uint(8545) + if len(parts) >= 3 { + addr = parts[1] + ":" + parts[2] + } + + if len(parts) >= 4 { + p, err := strconv.Atoi(parts[3]) + + if err != nil { + return nil, err + } + port = uint(p) + } + + cfg := HttpConfig{ + ListenAddress: addr, + ListenPort: port, + } + + return NewHttpClient(cfg, codec.JSON), nil + } + + return nil, fmt.Errorf("Invalid endpoint") } diff --git a/rpc/comms/http.go b/rpc/comms/http.go new file mode 100644 index 000000000..ebee791bd --- /dev/null +++ b/rpc/comms/http.go @@ -0,0 +1,190 @@ +package comms + +import ( + "fmt" + "net/http" + "strings" + + "bytes" + "io/ioutil" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/rpc/codec" + "github.com/ethereum/go-ethereum/rpc/shared" + "github.com/rs/cors" +) + +var ( + // main HTTP rpc listener + httpListener *stoppableTCPListener + listenerStoppedError = fmt.Errorf("Listener has stopped") +) + +type HttpConfig struct { + ListenAddress string + ListenPort uint + CorsDomain string +} + +func StartHttp(cfg HttpConfig, codec codec.Codec, api shared.EthereumApi) error { + if httpListener != nil { + if fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort) != httpListener.Addr().String() { + return fmt.Errorf("RPC service already running on %s ", httpListener.Addr().String()) + } + return nil // RPC service already running on given host/port + } + + l, err := newStoppableTCPListener(fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort)) + if err != nil { + glog.V(logger.Error).Infof("Can't listen on %s:%d: %v", cfg.ListenAddress, cfg.ListenPort, err) + return err + } + httpListener = l + + var handler http.Handler + if len(cfg.CorsDomain) > 0 { + var opts cors.Options + opts.AllowedMethods = []string{"POST"} + opts.AllowedOrigins = strings.Split(cfg.CorsDomain, " ") + + c := cors.New(opts) + handler = newStoppableHandler(c.Handler(gethHttpHandler(codec, api)), l.stop) + } else { + handler = newStoppableHandler(gethHttpHandler(codec, api), l.stop) + } + + go http.Serve(l, handler) + + return nil +} + +func StopHttp() { + if httpListener != nil { + httpListener.Stop() + httpListener = nil + } +} + +type httpClient struct { + address string + port uint + codec codec.ApiCoder + lastRes interface{} + lastErr error +} + +// Create a new in process client +func NewHttpClient(cfg HttpConfig, c codec.Codec) *httpClient { + return &httpClient{ + address: cfg.ListenAddress, + port: cfg.ListenPort, + codec: c.New(nil), + } +} + +func (self *httpClient) Close() { + // do nothing +} + +func (self *httpClient) Send(req interface{}) error { + var body []byte + var err error + + self.lastRes = nil + self.lastErr = nil + + if body, err = self.codec.Encode(req); err != nil { + return err + } + + httpReq, err := http.NewRequest("POST", fmt.Sprintf("%s:%d", self.address, self.port), bytes.NewBuffer(body)) + if err != nil { + return err + } + httpReq.Header.Set("Content-Type", "application/json") + + client := http.Client{} + resp, err := client.Do(httpReq) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.Status == "200 OK" { + reply, _ := ioutil.ReadAll(resp.Body) + var rpcSuccessResponse shared.SuccessResponse + if err = self.codec.Decode(reply, &rpcSuccessResponse); err == nil { + self.lastRes = rpcSuccessResponse.Result + self.lastErr = err + return nil + } else { + var rpcErrorResponse shared.ErrorResponse + if err = self.codec.Decode(reply, &rpcErrorResponse); err == nil { + self.lastRes = rpcErrorResponse.Error + self.lastErr = err + return nil + } else { + return err + } + } + } + + return fmt.Errorf("Not implemented") +} + +func (self *httpClient) Recv() (interface{}, error) { + return self.lastRes, self.lastErr +} + +func (self *httpClient) SupportedModules() (map[string]string, error) { + var body []byte + var err error + + payload := shared.Request{ + Id: 1, + Jsonrpc: "2.0", + Method: "modules", + } + + if body, err = self.codec.Encode(payload); err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", fmt.Sprintf("%s:%d", self.address, self.port), bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + client := http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + + if resp.Status == "200 OK" { + reply, _ := ioutil.ReadAll(resp.Body) + var rpcRes shared.SuccessResponse + if err = self.codec.Decode(reply, &rpcRes); err != nil { + return nil, err + } + + result := make(map[string]string) + if modules, ok := rpcRes.Result.(map[string]interface{}); ok { + for a, v := range modules { + result[a] = fmt.Sprintf("%s", v) + } + return result, nil + } + err = fmt.Errorf("Unable to parse module response - %v", rpcRes.Result) + } else { + fmt.Printf("resp.Status = %s\n", resp.Status) + fmt.Printf("err = %v\n", err) + } + + return nil, err +} diff --git a/rpc/comms/http_net.go b/rpc/comms/http_net.go new file mode 100644 index 000000000..acc5f99a9 --- /dev/null +++ b/rpc/comms/http_net.go @@ -0,0 +1,166 @@ +package comms + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/rpc/codec" + "github.com/ethereum/go-ethereum/rpc/shared" +) + +// When https://github.com/golang/go/issues/4674 is implemented this could be replaced +type stoppableTCPListener struct { + *net.TCPListener + stop chan struct{} // closed when the listener must stop +} + +func newStoppableTCPListener(addr string) (*stoppableTCPListener, error) { + wl, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + if tcpl, ok := wl.(*net.TCPListener); ok { + stop := make(chan struct{}) + return &stoppableTCPListener{tcpl, stop}, nil + } + + return nil, fmt.Errorf("Unable to create TCP listener for RPC service") +} + +// Stop the listener and all accepted and still active connections. +func (self *stoppableTCPListener) Stop() { + close(self.stop) +} + +func (self *stoppableTCPListener) Accept() (net.Conn, error) { + for { + self.SetDeadline(time.Now().Add(time.Duration(1 * time.Second))) + c, err := self.TCPListener.AcceptTCP() + + select { + case <-self.stop: + if c != nil { // accept timeout + c.Close() + } + self.TCPListener.Close() + return nil, listenerStoppedError + default: + } + + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() && netErr.Temporary() { + continue // regular timeout + } + } + + return &closableConnection{c, self.stop}, err + } +} + +type closableConnection struct { + *net.TCPConn + closed chan struct{} +} + +func (self *closableConnection) Read(b []byte) (n int, err error) { + select { + case <-self.closed: + self.TCPConn.Close() + return 0, io.EOF + default: + return self.TCPConn.Read(b) + } +} + +// Wraps the default handler and checks if the RPC service was stopped. In that case it returns an +// error indicating that the service was stopped. This will only happen for connections which are +// kept open (HTTP keep-alive) when the RPC service was shutdown. +func newStoppableHandler(h http.Handler, stop chan struct{}) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-stop: + w.Header().Set("Content-Type", "application/json") + err := fmt.Errorf("RPC service stopped") + response := shared.NewRpcResponse(-1, shared.JsonRpcVersion, nil, err) + httpSend(w, response) + default: + h.ServeHTTP(w, r) + } + }) +} + +func httpSend(writer io.Writer, v interface{}) (n int, err error) { + var payload []byte + payload, err = json.MarshalIndent(v, "", "\t") + if err != nil { + glog.V(logger.Error).Infoln("Error marshalling JSON", err) + return 0, err + } + glog.V(logger.Detail).Infof("Sending payload: %s", payload) + + return writer.Write(payload) +} + +func gethHttpHandler(codec codec.Codec, a shared.EthereumApi) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // Limit request size to resist DoS + if req.ContentLength > maxHttpSizeReqLength { + err := fmt.Errorf("Request too large") + response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) + httpSend(w, &response) + return + } + + defer req.Body.Close() + payload, err := ioutil.ReadAll(req.Body) + if err != nil { + err := fmt.Errorf("Could not read request body") + response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) + httpSend(w, &response) + return + } + + c := codec.New(nil) + var rpcReq shared.Request + if err = c.Decode(payload, &rpcReq); err == nil { + reply, err := a.Execute(&rpcReq) + res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) + httpSend(w, &res) + return + } + + var reqBatch []shared.Request + if err = c.Decode(payload, &reqBatch); err == nil { + resBatch := make([]*interface{}, len(reqBatch)) + resCount := 0 + + for i, rpcReq := range reqBatch { + reply, err := a.Execute(&rpcReq) + if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal + resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) + resCount += 1 + } + } + + // make response omitting nil entries + resBatch = resBatch[:resCount] + httpSend(w, resBatch) + return + } + + // invalid request + err = fmt.Errorf("Could not decode request") + res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err) + httpSend(w, res) + }) +} diff --git a/rpc/comms/inproc.go b/rpc/comms/inproc.go new file mode 100644 index 000000000..5c84b8fd8 --- /dev/null +++ b/rpc/comms/inproc.go @@ -0,0 +1,66 @@ +package comms + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/rpc/codec" + "github.com/ethereum/go-ethereum/rpc/shared" +) + +type InProcClient struct { + api shared.EthereumApi + codec codec.Codec + lastId interface{} + lastJsonrpc string + lastErr error + lastRes interface{} +} + +// Create a new in process client +func NewInProcClient(codec codec.Codec) *InProcClient { + return &InProcClient{ + codec: codec, + } +} + +func (self *InProcClient) Close() { + // do nothing +} + +// Need to setup api support +func (self *InProcClient) Initialize(offeredApi shared.EthereumApi) { + self.api = offeredApi +} + +func (self *InProcClient) Send(req interface{}) error { + if r, ok := req.(*shared.Request); ok { + self.lastId = r.Id + self.lastJsonrpc = r.Jsonrpc + self.lastRes, self.lastErr = self.api.Execute(r) + return self.lastErr + } + + return fmt.Errorf("Invalid request (%T)", req) +} + +func (self *InProcClient) Recv() (interface{}, error) { + return self.lastRes, self.lastErr +} + +func (self *InProcClient) SupportedModules() (map[string]string, error) { + req := shared.Request{ + Id: 1, + Jsonrpc: "2.0", + Method: "modules", + } + + if res, err := self.api.Execute(&req); err == nil { + if result, ok := res.(map[string]string); ok { + return result, nil + } + } else { + return nil, err + } + + return nil, fmt.Errorf("Invalid response") +} diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index a75039d17..068a1288f 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -1,8 +1,13 @@ package comms import ( - "github.com/ethereum/go-ethereum/rpc/api" + "fmt" + "net" + + "encoding/json" + "github.com/ethereum/go-ethereum/rpc/codec" + "github.com/ethereum/go-ethereum/rpc/shared" ) type IpcConfig struct { @@ -10,19 +15,74 @@ type IpcConfig struct { } type ipcClient struct { - c codec.ApiCoder + endpoint string + codec codec.Codec + coder codec.ApiCoder } func (self *ipcClient) Close() { - self.c.Close() + self.coder.Close() } func (self *ipcClient) Send(req interface{}) error { - return self.c.WriteResponse(req) + var err error + if r, ok := req.(*shared.Request); ok { + if err = self.coder.WriteResponse(r); err != nil { + if _, ok := err.(*net.OpError); ok { // connection lost, retry once + if err = self.reconnect(); err == nil { + err = self.coder.WriteResponse(r) + } + } + } + return err + } + + return fmt.Errorf("Invalid request (%T)", req) } func (self *ipcClient) Recv() (interface{}, error) { - return self.c.ReadResponse() + res, err := self.coder.ReadResponse() + if err != nil { + return nil, err + } + + if r, ok := res.(shared.SuccessResponse); ok { + return r.Result, nil + } + + if r, ok := res.(shared.ErrorResponse); ok { + return r.Error, nil + } + + return res, err +} + +func (self *ipcClient) SupportedModules() (map[string]string, error) { + req := shared.Request{ + Id: 1, + Jsonrpc: "2.0", + Method: "modules", + } + + if err := self.coder.WriteResponse(req); err != nil { + return nil, err + } + + res, err := self.coder.ReadResponse() + if err != nil { + return nil, err + } + + if sucRes, ok := res.(shared.SuccessResponse); ok { + data, _ := json.Marshal(sucRes.Result) + modules := make(map[string]string) + err = json.Unmarshal(data, &modules) + if err == nil { + return modules, nil + } + } + + return nil, fmt.Errorf("Invalid response") } // Create a new IPC client, UNIX domain socket on posix, named pipe on Windows @@ -31,7 +91,6 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { } // Start IPC server -func StartIpc(cfg IpcConfig, codec codec.Codec, apis ...api.EthereumApi) error { - offeredApi := api.Merge(apis...) +func StartIpc(cfg IpcConfig, codec codec.Codec, offeredApi shared.EthereumApi) error { return startIpc(cfg, codec, offeredApi) } diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index 5a94fd1e0..295eb916b 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -3,13 +3,11 @@ package comms import ( - "io" "net" "os" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" - "github.com/ethereum/go-ethereum/rpc/api" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" ) @@ -20,10 +18,20 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { return nil, err } - return &ipcClient{codec.New(c)}, nil + return &ipcClient{cfg.Endpoint, codec, codec.New(c)}, nil } -func startIpc(cfg IpcConfig, codec codec.Codec, api api.EthereumApi) error { +func (self *ipcClient) reconnect() error { + self.coder.Close() + c, err := net.DialUnix("unix", nil, &net.UnixAddr{self.endpoint, "unix"}) + if err == nil { + self.coder = self.codec.New(c) + } + + return err +} + +func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error { os.Remove(cfg.Endpoint) // in case it still exists from a previous run l, err := net.ListenUnix("unix", &net.UnixAddr{Name: cfg.Endpoint, Net: "unix"}) @@ -40,32 +48,7 @@ func startIpc(cfg IpcConfig, codec codec.Codec, api api.EthereumApi) error { continue } - go func(conn net.Conn) { - codec := codec.New(conn) - - for { - req, err := codec.ReadRequest() - if err == io.EOF { - codec.Close() - return - } else if err != nil { - glog.V(logger.Error).Infof("IPC recv err - %v\n", err) - codec.Close() - return - } - - var rpcResponse interface{} - res, err := api.Execute(req) - - rpcResponse = shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) - err = codec.WriteResponse(rpcResponse) - if err != nil { - glog.V(logger.Error).Infof("IPC send err - %v\n", err) - codec.Close() - return - } - } - }(conn) + go handle(conn, api, codec) } os.Remove(cfg.Endpoint) diff --git a/rpc/comms/ipc_windows.go b/rpc/comms/ipc_windows.go index c48dfb7fb..44c82ef8a 100644 --- a/rpc/comms/ipc_windows.go +++ b/rpc/comms/ipc_windows.go @@ -14,7 +14,6 @@ import ( "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" - "github.com/ethereum/go-ethereum/rpc/api" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" ) @@ -641,10 +640,18 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { return nil, err } - return &ipcClient{codec.New(c)}, nil + return &ipcClient{cfg.Endpoint, codec, codec.New(c)}, nil } -func startIpc(cfg IpcConfig, codec codec.Codec, api api.EthereumApi) error { +func (self *ipcClient) reconnect() error { + c, err := Dial(self.endpoint) + if err == nil { + self.coder = self.codec.New(c) + } + return err +} + +func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error { os.Remove(cfg.Endpoint) // in case it still exists from a previous run l, err := Listen(cfg.Endpoint) |