From 6d92fdc0df12337c2ffb8e6d19c83653f1a00ff2 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 12:01:28 +0200 Subject: added support for batch requests --- rpc/comms/comms.go | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) (limited to 'rpc/comms') diff --git a/rpc/comms/comms.go b/rpc/comms/comms.go index bfe625758..1374bde3f 100644 --- a/rpc/comms/comms.go +++ b/rpc/comms/comms.go @@ -47,7 +47,7 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { codec := c.New(conn) for { - req, err := codec.ReadRequest() + requests, isBatch, err := codec.ReadRequest() if err == io.EOF { codec.Close() return @@ -57,15 +57,35 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { return } - var rpcResponse interface{} - res, err := api.Execute(req) + if isBatch { + responses := make([]*interface{}, len(requests)) + responseCount := 0 + for _, req := range requests { + res, err := api.Execute(req) + if req.Id != nil { + rpcResponse := shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) + responses[responseCount] = rpcResponse + responseCount += 1 + } + } - rpcResponse = shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) - err = codec.WriteResponse(rpcResponse) - if err != nil { - glog.V(logger.Error).Infof("comms send err - %v\n", err) - codec.Close() - return + err = codec.WriteResponse(responses[:responseCount]) + if err != nil { + glog.V(logger.Error).Infof("comms send err - %v\n", err) + codec.Close() + return + } + } else { + var rpcResponse interface{} + res, err := api.Execute(requests[0]) + + rpcResponse = shared.NewRpcResponse(requests[0].Id, requests[0].Jsonrpc, res, err) + err = codec.WriteResponse(rpcResponse) + if err != nil { + glog.V(logger.Error).Infof("comms send err - %v\n", err) + codec.Close() + return + } } } } -- cgit v1.2.3 From ffbe5656ff2cba43c813f46f743fde4d1ab2dd58 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 13:18:10 +0200 Subject: support for large requests/responses --- rpc/comms/ipc.go | 1 + rpc/comms/ipc_unix.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) (limited to 'rpc/comms') diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index 068a1288f..3cfcbf3cf 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -16,6 +16,7 @@ type IpcConfig struct { type ipcClient struct { endpoint string + c net.Conn codec codec.Codec coder codec.ApiCoder } diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index 295eb916b..5724231f4 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -18,7 +18,7 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { return nil, err } - return &ipcClient{cfg.Endpoint, codec, codec.New(c)}, nil + return &ipcClient{cfg.Endpoint, c, codec, codec.New(c)}, nil } func (self *ipcClient) reconnect() error { -- cgit v1.2.3 From 04910c902a7654706cc0f97b86627661fcd22b36 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 25 Jun 2015 04:53:41 -0700 Subject: support for large request/response on windows --- rpc/comms/ipc_windows.go | 39 ++++++++------------------------------- 1 file changed, 8 insertions(+), 31 deletions(-) (limited to 'rpc/comms') diff --git a/rpc/comms/ipc_windows.go b/rpc/comms/ipc_windows.go index 44c82ef8a..4914a99c4 100644 --- a/rpc/comms/ipc_windows.go +++ b/rpc/comms/ipc_windows.go @@ -640,7 +640,7 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { return nil, err } - return &ipcClient{cfg.Endpoint, codec, codec.New(c)}, nil + return &ipcClient{cfg.Endpoint, c, codec, codec.New(c)}, nil } func (self *ipcClient) reconnect() error { @@ -667,36 +667,13 @@ func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error { glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) continue } - - go func(conn net.Conn) { - codec := codec.New(conn) - - for { - req, err := codec.ReadRequest() - if err == io.EOF { - codec.Close() - return - } else if err != nil { - glog.V(logger.Error).Infof("IPC recv err - %v\n", err) - codec.Close() - return - } - - var rpcResponse interface{} - res, err := api.Execute(req) - - rpcResponse = shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) - err = codec.WriteResponse(rpcResponse) - if err != nil { - glog.V(logger.Error).Infof("IPC send err - %v\n", err) - codec.Close() - return - } - } - }(conn) - } - }() - + + go handle(conn, api, codec) + } + + os.Remove(cfg.Endpoint) + }() + glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) return nil -- cgit v1.2.3 From 662285074e55a3915f7236a04fec355c3f416eb8 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 15:54:16 +0200 Subject: improved logging for IPC connection lifetime management --- rpc/comms/comms.go | 8 ++++---- rpc/comms/ipc.go | 5 +++++ rpc/comms/ipc_unix.go | 5 ++++- rpc/comms/ipc_windows.go | 17 ++++++++++------- 4 files changed, 23 insertions(+), 12 deletions(-) (limited to 'rpc/comms') diff --git a/rpc/comms/comms.go b/rpc/comms/comms.go index 1374bde3f..6e980149f 100644 --- a/rpc/comms/comms.go +++ b/rpc/comms/comms.go @@ -43,7 +43,7 @@ type EthereumClient interface { SupportedModules() (map[string]string, error) } -func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { +func handle(id int, conn net.Conn, api shared.EthereumApi, c codec.Codec) { codec := c.New(conn) for { @@ -52,8 +52,8 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { codec.Close() return } else if err != nil { - glog.V(logger.Error).Infof("comms recv err - %v\n", err) codec.Close() + glog.V(logger.Debug).Infof("Closed IPC Conn %06d recv err - %v\n", id, err) return } @@ -71,8 +71,8 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { err = codec.WriteResponse(responses[:responseCount]) if err != nil { - glog.V(logger.Error).Infof("comms send err - %v\n", err) codec.Close() + glog.V(logger.Debug).Infof("Closed IPC Conn %06d send err - %v\n", id, err) return } } else { @@ -82,8 +82,8 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { rpcResponse = shared.NewRpcResponse(requests[0].Id, requests[0].Jsonrpc, res, err) err = codec.WriteResponse(rpcResponse) if err != nil { - glog.V(logger.Error).Infof("comms send err - %v\n", err) codec.Close() + glog.V(logger.Debug).Infof("Closed IPC Conn %06d send err - %v\n", id, err) return } } diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index 3cfcbf3cf..f3dda5581 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -2,6 +2,7 @@ package comms import ( "fmt" + "math/rand" "net" "encoding/json" @@ -95,3 +96,7 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { func StartIpc(cfg IpcConfig, codec codec.Codec, offeredApi shared.EthereumApi) error { return startIpc(cfg, codec, offeredApi) } + +func newIpcConnId() int { + return rand.Int() % 1000000 +} diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index 5724231f4..3e71c7d32 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -48,7 +48,10 @@ func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error { continue } - go handle(conn, api, codec) + id := newIpcConnId() + glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) + + go handle(id, conn, api, codec) } os.Remove(cfg.Endpoint) diff --git a/rpc/comms/ipc_windows.go b/rpc/comms/ipc_windows.go index 4914a99c4..203cd2d7b 100644 --- a/rpc/comms/ipc_windows.go +++ b/rpc/comms/ipc_windows.go @@ -667,13 +667,16 @@ func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error { glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) continue } - - go handle(conn, api, codec) - } - - os.Remove(cfg.Endpoint) - }() - + + id := newIpcConnId() + glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) + + go handle(id, conn, api, codec) + } + + os.Remove(cfg.Endpoint) + }() + glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) return nil -- cgit v1.2.3