From bde2ff034317db977354e0855e6406f9428899ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 23 Jun 2015 19:12:48 +0300 Subject: cmd/geth, rpc/api: move the metrics into the new console --- rpc/api/debug.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++++ rpc/api/debug_js.go | 5 +++++ 2 files changed, 69 insertions(+) (limited to 'rpc') diff --git a/rpc/api/debug.go b/rpc/api/debug.go index b451d8662..871786c6f 100644 --- a/rpc/api/debug.go +++ b/rpc/api/debug.go @@ -2,6 +2,8 @@ package api import ( "fmt" + "strings" + "time" "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/core/state" @@ -11,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" "github.com/ethereum/go-ethereum/xeth" + "github.com/rcrowley/go-metrics" ) const ( @@ -26,6 +29,7 @@ var ( "debug_processBlock": (*debugApi).ProcessBlock, "debug_seedHash": (*debugApi).SeedHash, "debug_setHead": (*debugApi).SetHead, + "debug_metrics": (*debugApi).Metrics, } ) @@ -171,3 +175,63 @@ func (self *debugApi) SeedHash(req *shared.Request) (interface{}, error) { return nil, err } } + +func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { + // Create a rate formatter + units := []string{"", "K", "M", "G", "T", "E", "P"} + round := func(value float64, prec int) string { + unit := 0 + for value >= 1000 { + unit, value, prec = unit+1, value/1000, 2 + } + return fmt.Sprintf(fmt.Sprintf("%%.%df%s", prec, units[unit]), value) + } + format := func(total float64, rate float64) string { + return fmt.Sprintf("%s (%s/s)", round(total, 0), round(rate, 2)) + } + // Iterate over all the metrics, and just dump for now + counters := make(map[string]interface{}) + metrics.DefaultRegistry.Each(func(name string, metric interface{}) { + // Create or retrieve the counter hierarchy for this metric + root, parts := counters, strings.Split(name, "/") + for _, part := range parts[:len(parts)-1] { + if _, ok := root[part]; !ok { + root[part] = make(map[string]interface{}) + } + root = root[part].(map[string]interface{}) + } + name = parts[len(parts)-1] + + // Fill the counter with the metric details + switch metric := metric.(type) { + case metrics.Meter: + root[name] = map[string]interface{}{ + "Avg01Min": format(metric.Rate1()*60, metric.Rate1()), + "Avg05Min": format(metric.Rate5()*300, metric.Rate5()), + "Avg15Min": format(metric.Rate15()*900, metric.Rate15()), + "Total": format(float64(metric.Count()), metric.RateMean()), + } + + case metrics.Timer: + root[name] = map[string]interface{}{ + "Avg01Min": format(metric.Rate1()*60, metric.Rate1()), + "Avg05Min": format(metric.Rate5()*300, metric.Rate5()), + "Avg15Min": format(metric.Rate15()*900, metric.Rate15()), + "Count": format(float64(metric.Count()), metric.RateMean()), + "Maximum": time.Duration(metric.Max()).String(), + "Minimum": time.Duration(metric.Min()).String(), + "Percentile": map[string]interface{}{ + "20": time.Duration(metric.Percentile(0.2)).String(), + "50": time.Duration(metric.Percentile(0.5)).String(), + "80": time.Duration(metric.Percentile(0.8)).String(), + "95": time.Duration(metric.Percentile(0.95)).String(), + "99": time.Duration(metric.Percentile(0.99)).String(), + }, + } + + default: + root[name] = "Unknown metric type" + } + }) + return counters, nil +} diff --git a/rpc/api/debug_js.go b/rpc/api/debug_js.go index 35fecb75f..e48e4df06 100644 --- a/rpc/api/debug_js.go +++ b/rpc/api/debug_js.go @@ -50,6 +50,11 @@ web3._extend({ ], properties: [ + new web3._extend.Property({ + name: 'metrics', + getter: 'debug_metrics', + outputFormatter: function(obj) { return obj; } + }) ] }); ` -- cgit v1.2.3 From e5b820c47b9343d3801e1ebbeb4a8f40843ea87c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 24 Jun 2015 14:38:58 +0300 Subject: cmd/geth, rpc/api: extend metrics API, add a basic monitor command --- rpc/api/debug.go | 92 ++++++++++++++++++++++++++++++++++++--------------- rpc/api/debug_args.go | 24 ++++++++++++++ rpc/api/debug_js.go | 12 ++++--- rpc/xeth.go | 52 +++++++++++++++++++++++++++++ 4 files changed, 148 insertions(+), 32 deletions(-) create mode 100644 rpc/xeth.go (limited to 'rpc') diff --git a/rpc/api/debug.go b/rpc/api/debug.go index 871786c6f..45c99f295 100644 --- a/rpc/api/debug.go +++ b/rpc/api/debug.go @@ -177,6 +177,10 @@ func (self *debugApi) SeedHash(req *shared.Request) (interface{}, error) { } func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { + args := new(MetricsArgs) + if err := self.codec.Decode(req.Params, &args); err != nil { + return nil, shared.NewDecodeParamError(err.Error()) + } // Create a rate formatter units := []string{"", "K", "M", "G", "T", "E", "P"} round := func(value float64, prec int) string { @@ -202,35 +206,69 @@ func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { } name = parts[len(parts)-1] - // Fill the counter with the metric details - switch metric := metric.(type) { - case metrics.Meter: - root[name] = map[string]interface{}{ - "Avg01Min": format(metric.Rate1()*60, metric.Rate1()), - "Avg05Min": format(metric.Rate5()*300, metric.Rate5()), - "Avg15Min": format(metric.Rate15()*900, metric.Rate15()), - "Total": format(float64(metric.Count()), metric.RateMean()), + // Fill the counter with the metric details, formatting if requested + if args.Raw { + switch metric := metric.(type) { + case metrics.Meter: + root[name] = map[string]interface{}{ + "Avg01Min": metric.Rate1(), + "Avg05Min": metric.Rate5(), + "Avg15Min": metric.Rate15(), + "AvgTotal": metric.RateMean(), + "Total": float64(metric.Count()), + } + + case metrics.Timer: + root[name] = map[string]interface{}{ + "Avg01Min": metric.Rate1(), + "Avg05Min": metric.Rate5(), + "Avg15Min": metric.Rate15(), + "AvgTotal": metric.RateMean(), + "Total": float64(metric.Count()), + "Maximum": metric.Max(), + "Minimum": metric.Min(), + "Percentile": map[string]interface{}{ + "20": metric.Percentile(0.2), + "50": metric.Percentile(0.5), + "80": metric.Percentile(0.8), + "95": metric.Percentile(0.95), + "99": metric.Percentile(0.99), + }, + } + + default: + root[name] = "Unknown metric type" } - - case metrics.Timer: - root[name] = map[string]interface{}{ - "Avg01Min": format(metric.Rate1()*60, metric.Rate1()), - "Avg05Min": format(metric.Rate5()*300, metric.Rate5()), - "Avg15Min": format(metric.Rate15()*900, metric.Rate15()), - "Count": format(float64(metric.Count()), metric.RateMean()), - "Maximum": time.Duration(metric.Max()).String(), - "Minimum": time.Duration(metric.Min()).String(), - "Percentile": map[string]interface{}{ - "20": time.Duration(metric.Percentile(0.2)).String(), - "50": time.Duration(metric.Percentile(0.5)).String(), - "80": time.Duration(metric.Percentile(0.8)).String(), - "95": time.Duration(metric.Percentile(0.95)).String(), - "99": time.Duration(metric.Percentile(0.99)).String(), - }, + } else { + switch metric := metric.(type) { + case metrics.Meter: + root[name] = map[string]interface{}{ + "Avg01Min": format(metric.Rate1()*60, metric.Rate1()), + "Avg05Min": format(metric.Rate5()*300, metric.Rate5()), + "Avg15Min": format(metric.Rate15()*900, metric.Rate15()), + "Total": format(float64(metric.Count()), metric.RateMean()), + } + + case metrics.Timer: + root[name] = map[string]interface{}{ + "Avg01Min": format(metric.Rate1()*60, metric.Rate1()), + "Avg05Min": format(metric.Rate5()*300, metric.Rate5()), + "Avg15Min": format(metric.Rate15()*900, metric.Rate15()), + "Count": format(float64(metric.Count()), metric.RateMean()), + "Maximum": time.Duration(metric.Max()).String(), + "Minimum": time.Duration(metric.Min()).String(), + "Percentile": map[string]interface{}{ + "20": time.Duration(metric.Percentile(0.2)).String(), + "50": time.Duration(metric.Percentile(0.5)).String(), + "80": time.Duration(metric.Percentile(0.8)).String(), + "95": time.Duration(metric.Percentile(0.95)).String(), + "99": time.Duration(metric.Percentile(0.99)).String(), + }, + } + + default: + root[name] = "Unknown metric type" } - - default: - root[name] = "Unknown metric type" } }) return counters, nil diff --git a/rpc/api/debug_args.go b/rpc/api/debug_args.go index b9b5aa27e..b72fb03ae 100644 --- a/rpc/api/debug_args.go +++ b/rpc/api/debug_args.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "math/big" + "reflect" "github.com/ethereum/go-ethereum/rpc/shared" ) @@ -45,3 +46,26 @@ func (args *WaitForBlockArgs) UnmarshalJSON(b []byte) (err error) { return nil } + +type MetricsArgs struct { + Raw bool +} + +func (args *MetricsArgs) UnmarshalJSON(b []byte) (err error) { + var obj []interface{} + if err := json.Unmarshal(b, &obj); err != nil { + return shared.NewDecodeParamError(err.Error()) + } + if len(obj) > 1 { + return fmt.Errorf("metricsArgs needs 0, 1 arguments") + } + // default values when not provided + if len(obj) >= 1 && obj[0] != nil { + if value, ok := obj[0].(bool); !ok { + return fmt.Errorf("invalid argument %v", reflect.TypeOf(obj[0])) + } else { + args.Raw = value + } + } + return nil +} diff --git a/rpc/api/debug_js.go b/rpc/api/debug_js.go index e48e4df06..bd3a6dfb2 100644 --- a/rpc/api/debug_js.go +++ b/rpc/api/debug_js.go @@ -46,15 +46,17 @@ web3._extend({ params: 1, inputFormatter: [web3._extend.formatters.formatInputInt], outputFormatter: function(obj) { return obj; } + }), + new web3._extend.Method({ + name: 'metrics', + call: 'debug_metrics', + params: 1, + inputFormatter: [web3._extend.formatters.formatInputBool], + outputFormatter: function(obj) { return obj; } }) ], properties: [ - new web3._extend.Property({ - name: 'metrics', - getter: 'debug_metrics', - outputFormatter: function(obj) { return obj; } - }) ] }); ` diff --git a/rpc/xeth.go b/rpc/xeth.go new file mode 100644 index 000000000..b3e844380 --- /dev/null +++ b/rpc/xeth.go @@ -0,0 +1,52 @@ +package rpc + +import ( + "encoding/json" + "fmt" + "reflect" + "sync/atomic" + + "github.com/ethereum/go-ethereum/rpc/comms" + "github.com/ethereum/go-ethereum/rpc/shared" +) + +// Xeth is a native API interface to a remote node. +type Xeth struct { + client comms.EthereumClient + reqId uint32 +} + +// NewXeth constructs a new native API interface to a remote node. +func NewXeth(client comms.EthereumClient) *Xeth { + return &Xeth{ + client: client, + } +} + +// Call invokes a method with the given parameters are the remote node. +func (self *Xeth) Call(method string, params []interface{}) (map[string]interface{}, error) { + // Assemble the json RPC request + data, err := json.Marshal(params) + if err != nil { + return nil, err + } + req := &shared.Request{ + Id: atomic.AddUint32(&self.reqId, 1), + Jsonrpc: "2.0", + Method: method, + Params: data, + } + // Send the request over and process the response + if err := self.client.Send(req); err != nil { + return nil, err + } + res, err := self.client.Recv() + if err != nil { + return nil, err + } + value, ok := res.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("Invalid response type: have %v, want %v", reflect.TypeOf(res), reflect.TypeOf(make(map[string]interface{}))) + } + return value, nil +} -- cgit v1.2.3 From 92ef33d97a437dce2d7b55f06342de388d95f575 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 24 Jun 2015 18:30:00 +0300 Subject: rpc/api, cmd/geth: retrievel all percentiles, add time units --- rpc/api/debug.go | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) (limited to 'rpc') diff --git a/rpc/api/debug.go b/rpc/api/debug.go index 45c99f295..5975f88ab 100644 --- a/rpc/api/debug.go +++ b/rpc/api/debug.go @@ -193,6 +193,11 @@ func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { format := func(total float64, rate float64) string { return fmt.Sprintf("%s (%s/s)", round(total, 0), round(rate, 2)) } + // Create the percentile units + percentiles := make([]float64, 101) + for i := 0; i <= 100; i++ { + percentiles[i] = float64(i) / 100 + } // Iterate over all the metrics, and just dump for now counters := make(map[string]interface{}) metrics.DefaultRegistry.Each(func(name string, metric interface{}) { @@ -211,29 +216,25 @@ func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { switch metric := metric.(type) { case metrics.Meter: root[name] = map[string]interface{}{ - "Avg01Min": metric.Rate1(), - "Avg05Min": metric.Rate5(), - "Avg15Min": metric.Rate15(), - "AvgTotal": metric.RateMean(), - "Total": float64(metric.Count()), + "AvgRate01Min": metric.Rate1(), + "AvgRate05Min": metric.Rate5(), + "AvgRate15Min": metric.Rate15(), + "MeanRate": metric.RateMean(), + "Total": float64(metric.Count()), } case metrics.Timer: + ps := make(map[string]interface{}) + for i, p := range metric.Percentiles(percentiles) { + ps[fmt.Sprintf("%d", i)] = p + } root[name] = map[string]interface{}{ - "Avg01Min": metric.Rate1(), - "Avg05Min": metric.Rate5(), - "Avg15Min": metric.Rate15(), - "AvgTotal": metric.RateMean(), - "Total": float64(metric.Count()), - "Maximum": metric.Max(), - "Minimum": metric.Min(), - "Percentile": map[string]interface{}{ - "20": metric.Percentile(0.2), - "50": metric.Percentile(0.5), - "80": metric.Percentile(0.8), - "95": metric.Percentile(0.95), - "99": metric.Percentile(0.99), - }, + "AvgRate01Min": metric.Rate1(), + "AvgRate05Min": metric.Rate5(), + "AvgRate15Min": metric.Rate15(), + "MeanRate": metric.RateMean(), + "Total": float64(metric.Count()), + "Percentiles": ps, } default: @@ -254,10 +255,10 @@ func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { "Avg01Min": format(metric.Rate1()*60, metric.Rate1()), "Avg05Min": format(metric.Rate5()*300, metric.Rate5()), "Avg15Min": format(metric.Rate15()*900, metric.Rate15()), - "Count": format(float64(metric.Count()), metric.RateMean()), + "Total": format(float64(metric.Count()), metric.RateMean()), "Maximum": time.Duration(metric.Max()).String(), "Minimum": time.Duration(metric.Min()).String(), - "Percentile": map[string]interface{}{ + "Percentiles": map[string]interface{}{ "20": time.Duration(metric.Percentile(0.2)).String(), "50": time.Duration(metric.Percentile(0.5)).String(), "80": time.Duration(metric.Percentile(0.8)).String(), -- cgit v1.2.3 From 6d92fdc0df12337c2ffb8e6d19c83653f1a00ff2 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 12:01:28 +0200 Subject: added support for batch requests --- rpc/codec/codec.go | 2 +- rpc/codec/json.go | 59 ++++++++++++++++++++++++++++++++++++++++++++---------- rpc/comms/comms.go | 38 ++++++++++++++++++++++++++--------- 3 files changed, 78 insertions(+), 21 deletions(-) (limited to 'rpc') diff --git a/rpc/codec/codec.go b/rpc/codec/codec.go index 5e8f38438..3177f77e4 100644 --- a/rpc/codec/codec.go +++ b/rpc/codec/codec.go @@ -12,7 +12,7 @@ type Codec int // (de)serialization support for rpc interface type ApiCoder interface { // Parse message to request from underlying stream - ReadRequest() (*shared.Request, error) + ReadRequest() ([]*shared.Request, bool, error) // Parse response message from underlying stream ReadResponse() (interface{}, error) // Encode response to encoded form in underlying stream diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 31024ee74..380b4cba7 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -8,33 +8,53 @@ import ( ) const ( - MAX_RESPONSE_SIZE = 64 * 1024 + MAX_REQUEST_SIZE = 1024 * 1024 + MAX_RESPONSE_SIZE = 1024 * 1024 ) // Json serialization support type JsonCodec struct { c net.Conn - d *json.Decoder - e *json.Encoder + buffer []byte + bytesInBuffer int } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ c: conn, - d: json.NewDecoder(conn), - e: json.NewEncoder(conn), + buffer: make([]byte, MAX_REQUEST_SIZE), + bytesInBuffer: 0, } } // Serialize obj to JSON and write it to conn -func (self *JsonCodec) ReadRequest() (*shared.Request, error) { - req := shared.Request{} - err := self.d.Decode(&req) +func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { + n, err := self.c.Read(self.buffer[self.bytesInBuffer:]) + if err != nil { + self.bytesInBuffer = 0 + return nil, false, err + } + + self.bytesInBuffer += n + + singleRequest := shared.Request{} + err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &singleRequest) if err == nil { - return &req, nil + self.bytesInBuffer = 0 + requests := make([]*shared.Request, 1) + requests[0] = &singleRequest + return requests, false, nil } - return nil, err + + requests = make([]*shared.Request, 0) + err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &requests) + if err == nil { + self.bytesInBuffer = 0 + return requests, true, nil + } + + return nil, false, err } func (self *JsonCodec) ReadResponse() (interface{}, error) { @@ -66,7 +86,24 @@ func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) { // Parse JSON data from conn to obj func (self *JsonCodec) WriteResponse(res interface{}) error { - return self.e.Encode(&res) + data, err := json.Marshal(res) + if err != nil { + self.c.Close() + return err + } + + bytesWritten := 0 + + for bytesWritten < len(data) { + n, err := self.c.Write(data[bytesWritten:]) + if err != nil { + self.c.Close() + return err + } + bytesWritten += n + } + + return nil } // Close decoder and encoder diff --git a/rpc/comms/comms.go b/rpc/comms/comms.go index bfe625758..1374bde3f 100644 --- a/rpc/comms/comms.go +++ b/rpc/comms/comms.go @@ -47,7 +47,7 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { codec := c.New(conn) for { - req, err := codec.ReadRequest() + requests, isBatch, err := codec.ReadRequest() if err == io.EOF { codec.Close() return @@ -57,15 +57,35 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { return } - var rpcResponse interface{} - res, err := api.Execute(req) + if isBatch { + responses := make([]*interface{}, len(requests)) + responseCount := 0 + for _, req := range requests { + res, err := api.Execute(req) + if req.Id != nil { + rpcResponse := shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) + responses[responseCount] = rpcResponse + responseCount += 1 + } + } - rpcResponse = shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) - err = codec.WriteResponse(rpcResponse) - if err != nil { - glog.V(logger.Error).Infof("comms send err - %v\n", err) - codec.Close() - return + err = codec.WriteResponse(responses[:responseCount]) + if err != nil { + glog.V(logger.Error).Infof("comms send err - %v\n", err) + codec.Close() + return + } + } else { + var rpcResponse interface{} + res, err := api.Execute(requests[0]) + + rpcResponse = shared.NewRpcResponse(requests[0].Id, requests[0].Jsonrpc, res, err) + err = codec.WriteResponse(rpcResponse) + if err != nil { + glog.V(logger.Error).Infof("comms send err - %v\n", err) + codec.Close() + return + } } } } -- cgit v1.2.3 From ffbe5656ff2cba43c813f46f743fde4d1ab2dd58 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 13:18:10 +0200 Subject: support for large requests/responses --- rpc/codec/json.go | 46 ++++++++++++++++++++++++++++++---------------- rpc/comms/ipc.go | 1 + rpc/comms/ipc_unix.go | 2 +- 3 files changed, 32 insertions(+), 17 deletions(-) (limited to 'rpc') diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 380b4cba7..1de649c21 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -2,28 +2,30 @@ package codec import ( "encoding/json" + "fmt" "net" + "time" "github.com/ethereum/go-ethereum/rpc/shared" ) const ( - MAX_REQUEST_SIZE = 1024 * 1024 + MAX_REQUEST_SIZE = 1024 * 1024 MAX_RESPONSE_SIZE = 1024 * 1024 ) // Json serialization support type JsonCodec struct { - c net.Conn - buffer []byte + c net.Conn + buffer []byte bytesInBuffer int } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ - c: conn, - buffer: make([]byte, MAX_REQUEST_SIZE), + c: conn, + buffer: make([]byte, MAX_REQUEST_SIZE), bytesInBuffer: 0, } } @@ -58,28 +60,40 @@ func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, } func (self *JsonCodec) ReadResponse() (interface{}, error) { - var err error + bytesInBuffer := 0 buf := make([]byte, MAX_RESPONSE_SIZE) - n, _ := self.c.Read(buf) - var failure shared.ErrorResponse - if err = json.Unmarshal(buf[:n], &failure); err == nil && failure.Error != nil { - return failure, nil - } + deadline := time.Now().Add(15 * time.Second) + self.c.SetDeadline(deadline) + + for { + n, err := self.c.Read(buf[bytesInBuffer:]) + if err != nil { + return nil, err + } + bytesInBuffer += n - var success shared.SuccessResponse - if err = json.Unmarshal(buf[:n], &success); err == nil { - return success, nil + var success shared.SuccessResponse + if err = json.Unmarshal(buf[:bytesInBuffer], &success); err == nil { + return success, nil + } + + var failure shared.ErrorResponse + if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil { + return failure, nil + } } - return nil, err + self.c.Close() + return nil, fmt.Errorf("Unable to read response") } -// Encode response to encoded form in underlying stream +// Decode data func (self *JsonCodec) Decode(data []byte, msg interface{}) error { return json.Unmarshal(data, msg) } +// Encode message func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) { return json.Marshal(msg) } diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index 068a1288f..3cfcbf3cf 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -16,6 +16,7 @@ type IpcConfig struct { type ipcClient struct { endpoint string + c net.Conn codec codec.Codec coder codec.ApiCoder } diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index 295eb916b..5724231f4 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -18,7 +18,7 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { return nil, err } - return &ipcClient{cfg.Endpoint, codec, codec.New(c)}, nil + return &ipcClient{cfg.Endpoint, c, codec, codec.New(c)}, nil } func (self *ipcClient) reconnect() error { -- cgit v1.2.3 From 04910c902a7654706cc0f97b86627661fcd22b36 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 25 Jun 2015 04:53:41 -0700 Subject: support for large request/response on windows --- rpc/comms/ipc_windows.go | 39 ++++++++------------------------------- 1 file changed, 8 insertions(+), 31 deletions(-) (limited to 'rpc') diff --git a/rpc/comms/ipc_windows.go b/rpc/comms/ipc_windows.go index 44c82ef8a..4914a99c4 100644 --- a/rpc/comms/ipc_windows.go +++ b/rpc/comms/ipc_windows.go @@ -640,7 +640,7 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { return nil, err } - return &ipcClient{cfg.Endpoint, codec, codec.New(c)}, nil + return &ipcClient{cfg.Endpoint, c, codec, codec.New(c)}, nil } func (self *ipcClient) reconnect() error { @@ -667,36 +667,13 @@ func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error { glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) continue } - - go func(conn net.Conn) { - codec := codec.New(conn) - - for { - req, err := codec.ReadRequest() - if err == io.EOF { - codec.Close() - return - } else if err != nil { - glog.V(logger.Error).Infof("IPC recv err - %v\n", err) - codec.Close() - return - } - - var rpcResponse interface{} - res, err := api.Execute(req) - - rpcResponse = shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) - err = codec.WriteResponse(rpcResponse) - if err != nil { - glog.V(logger.Error).Infof("IPC send err - %v\n", err) - codec.Close() - return - } - } - }(conn) - } - }() - + + go handle(conn, api, codec) + } + + os.Remove(cfg.Endpoint) + }() + glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) return nil -- cgit v1.2.3 From 5757a0edb59f854433d11982b2ba4831cceb167e Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 14:32:22 +0200 Subject: added IPC timeout support --- rpc/codec/json.go | 60 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 26 deletions(-) (limited to 'rpc') diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 1de649c21..0b1a90562 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -10,61 +10,69 @@ import ( ) const ( + READ_TIMEOUT = 15 // read timeout in seconds MAX_REQUEST_SIZE = 1024 * 1024 MAX_RESPONSE_SIZE = 1024 * 1024 ) // Json serialization support type JsonCodec struct { - c net.Conn - buffer []byte - bytesInBuffer int + c net.Conn } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ - c: conn, - buffer: make([]byte, MAX_REQUEST_SIZE), - bytesInBuffer: 0, + c: conn, } } // Serialize obj to JSON and write it to conn func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { - n, err := self.c.Read(self.buffer[self.bytesInBuffer:]) - if err != nil { - self.bytesInBuffer = 0 + bytesInBuffer := 0 + buf := make([]byte, MAX_REQUEST_SIZE) + + deadline := time.Now().Add(READ_TIMEOUT * time.Second) + if err := self.c.SetDeadline(deadline); err != nil { return nil, false, err } - self.bytesInBuffer += n + for { + n, err := self.c.Read(buf[bytesInBuffer:]) + if err != nil { + self.c.Close() + return nil, false, err + } + + bytesInBuffer += n - singleRequest := shared.Request{} - err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &singleRequest) - if err == nil { - self.bytesInBuffer = 0 - requests := make([]*shared.Request, 1) - requests[0] = &singleRequest - return requests, false, nil - } + singleRequest := shared.Request{} + err = json.Unmarshal(buf[:bytesInBuffer], &singleRequest) + if err == nil { + requests := make([]*shared.Request, 1) + requests[0] = &singleRequest + return requests, false, nil + } - requests = make([]*shared.Request, 0) - err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &requests) - if err == nil { - self.bytesInBuffer = 0 - return requests, true, nil + requests = make([]*shared.Request, 0) + err = json.Unmarshal(buf[:bytesInBuffer], &requests) + if err == nil { + return requests, true, nil + } } - return nil, false, err + self.c.Close() // timeout + return nil, false, fmt.Errorf("Unable to read response") } func (self *JsonCodec) ReadResponse() (interface{}, error) { bytesInBuffer := 0 buf := make([]byte, MAX_RESPONSE_SIZE) - deadline := time.Now().Add(15 * time.Second) - self.c.SetDeadline(deadline) + deadline := time.Now().Add(READ_TIMEOUT * time.Second) + if err := self.c.SetDeadline(deadline); err != nil { + return nil, err + } for { n, err := self.c.Read(buf[bytesInBuffer:]) -- cgit v1.2.3 From fdbf8be7356cb8a80c6fdfe0d24b0863903e1832 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 25 Jun 2015 15:33:26 +0300 Subject: cmd/geth, rpc/api: fix reported metrics issues --- rpc/api/debug.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) (limited to 'rpc') diff --git a/rpc/api/debug.go b/rpc/api/debug.go index 5975f88ab..f16f62d2e 100644 --- a/rpc/api/debug.go +++ b/rpc/api/debug.go @@ -193,11 +193,6 @@ func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { format := func(total float64, rate float64) string { return fmt.Sprintf("%s (%s/s)", round(total, 0), round(rate, 2)) } - // Create the percentile units - percentiles := make([]float64, 101) - for i := 0; i <= 100; i++ { - percentiles[i] = float64(i) / 100 - } // Iterate over all the metrics, and just dump for now counters := make(map[string]interface{}) metrics.DefaultRegistry.Each(func(name string, metric interface{}) { @@ -220,21 +215,23 @@ func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { "AvgRate05Min": metric.Rate5(), "AvgRate15Min": metric.Rate15(), "MeanRate": metric.RateMean(), - "Total": float64(metric.Count()), + "Overall": float64(metric.Count()), } case metrics.Timer: - ps := make(map[string]interface{}) - for i, p := range metric.Percentiles(percentiles) { - ps[fmt.Sprintf("%d", i)] = p - } root[name] = map[string]interface{}{ "AvgRate01Min": metric.Rate1(), "AvgRate05Min": metric.Rate5(), "AvgRate15Min": metric.Rate15(), "MeanRate": metric.RateMean(), - "Total": float64(metric.Count()), - "Percentiles": ps, + "Overall": float64(metric.Count()), + "Percentiles": map[string]interface{}{ + "5": metric.Percentile(0.05), + "20": metric.Percentile(0.2), + "50": metric.Percentile(0.5), + "80": metric.Percentile(0.8), + "95": metric.Percentile(0.95), + }, } default: @@ -247,7 +244,7 @@ func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { "Avg01Min": format(metric.Rate1()*60, metric.Rate1()), "Avg05Min": format(metric.Rate5()*300, metric.Rate5()), "Avg15Min": format(metric.Rate15()*900, metric.Rate15()), - "Total": format(float64(metric.Count()), metric.RateMean()), + "Overall": format(float64(metric.Count()), metric.RateMean()), } case metrics.Timer: @@ -255,15 +252,15 @@ func (self *debugApi) Metrics(req *shared.Request) (interface{}, error) { "Avg01Min": format(metric.Rate1()*60, metric.Rate1()), "Avg05Min": format(metric.Rate5()*300, metric.Rate5()), "Avg15Min": format(metric.Rate15()*900, metric.Rate15()), - "Total": format(float64(metric.Count()), metric.RateMean()), + "Overall": format(float64(metric.Count()), metric.RateMean()), "Maximum": time.Duration(metric.Max()).String(), "Minimum": time.Duration(metric.Min()).String(), "Percentiles": map[string]interface{}{ + "5": time.Duration(metric.Percentile(0.05)).String(), "20": time.Duration(metric.Percentile(0.2)).String(), "50": time.Duration(metric.Percentile(0.5)).String(), "80": time.Duration(metric.Percentile(0.8)).String(), "95": time.Duration(metric.Percentile(0.95)).String(), - "99": time.Duration(metric.Percentile(0.99)).String(), }, } -- cgit v1.2.3 From 662285074e55a3915f7236a04fec355c3f416eb8 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 15:54:16 +0200 Subject: improved logging for IPC connection lifetime management --- rpc/comms/comms.go | 8 ++++---- rpc/comms/ipc.go | 5 +++++ rpc/comms/ipc_unix.go | 5 ++++- rpc/comms/ipc_windows.go | 17 ++++++++++------- 4 files changed, 23 insertions(+), 12 deletions(-) (limited to 'rpc') diff --git a/rpc/comms/comms.go b/rpc/comms/comms.go index 1374bde3f..6e980149f 100644 --- a/rpc/comms/comms.go +++ b/rpc/comms/comms.go @@ -43,7 +43,7 @@ type EthereumClient interface { SupportedModules() (map[string]string, error) } -func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { +func handle(id int, conn net.Conn, api shared.EthereumApi, c codec.Codec) { codec := c.New(conn) for { @@ -52,8 +52,8 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { codec.Close() return } else if err != nil { - glog.V(logger.Error).Infof("comms recv err - %v\n", err) codec.Close() + glog.V(logger.Debug).Infof("Closed IPC Conn %06d recv err - %v\n", id, err) return } @@ -71,8 +71,8 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { err = codec.WriteResponse(responses[:responseCount]) if err != nil { - glog.V(logger.Error).Infof("comms send err - %v\n", err) codec.Close() + glog.V(logger.Debug).Infof("Closed IPC Conn %06d send err - %v\n", id, err) return } } else { @@ -82,8 +82,8 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { rpcResponse = shared.NewRpcResponse(requests[0].Id, requests[0].Jsonrpc, res, err) err = codec.WriteResponse(rpcResponse) if err != nil { - glog.V(logger.Error).Infof("comms send err - %v\n", err) codec.Close() + glog.V(logger.Debug).Infof("Closed IPC Conn %06d send err - %v\n", id, err) return } } diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index 3cfcbf3cf..f3dda5581 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -2,6 +2,7 @@ package comms import ( "fmt" + "math/rand" "net" "encoding/json" @@ -95,3 +96,7 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { func StartIpc(cfg IpcConfig, codec codec.Codec, offeredApi shared.EthereumApi) error { return startIpc(cfg, codec, offeredApi) } + +func newIpcConnId() int { + return rand.Int() % 1000000 +} diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index 5724231f4..3e71c7d32 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -48,7 +48,10 @@ func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error { continue } - go handle(conn, api, codec) + id := newIpcConnId() + glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) + + go handle(id, conn, api, codec) } os.Remove(cfg.Endpoint) diff --git a/rpc/comms/ipc_windows.go b/rpc/comms/ipc_windows.go index 4914a99c4..203cd2d7b 100644 --- a/rpc/comms/ipc_windows.go +++ b/rpc/comms/ipc_windows.go @@ -667,13 +667,16 @@ func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error { glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) continue } - - go handle(conn, api, codec) - } - - os.Remove(cfg.Endpoint) - }() - + + id := newIpcConnId() + glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) + + go handle(id, conn, api, codec) + } + + os.Remove(cfg.Endpoint) + }() + glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) return nil -- cgit v1.2.3 From 7098ec691c349f9b4473756e4ccaf97d46956da3 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 24 Jun 2015 01:35:39 +0200 Subject: rpc: unmask pending block fields This pleases the RPC tests. --- rpc/api/eth.go | 8 -------- 1 file changed, 8 deletions(-) (limited to 'rpc') diff --git a/rpc/api/eth.go b/rpc/api/eth.go index 0dff138c6..962c8d0f9 100644 --- a/rpc/api/eth.go +++ b/rpc/api/eth.go @@ -348,14 +348,6 @@ func (self *ethApi) GetBlockByNumber(req *shared.Request) (interface{}, error) { block := self.xeth.EthBlockByNumber(args.BlockNumber) br := NewBlockRes(block, args.IncludeTxs) - // If request was for "pending", nil nonsensical fields - if args.BlockNumber == -2 { - br.BlockHash = nil - br.BlockNumber = nil - br.Miner = nil - br.Nonce = nil - br.LogsBloom = nil - } return br, nil } -- cgit v1.2.3 From 76821d167acd7da15e13b23beeceb6779138ffe5 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Sat, 27 Jun 2015 03:08:50 +0200 Subject: core, eth, rpc: avoid unnecessary block header copying --- rpc/api/parsing.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) (limited to 'rpc') diff --git a/rpc/api/parsing.go b/rpc/api/parsing.go index 85a9165e5..632462c31 100644 --- a/rpc/api/parsing.go +++ b/rpc/api/parsing.go @@ -270,29 +270,31 @@ func NewBlockRes(block *types.Block, fullTx bool) *BlockRes { res.BlockHash = newHexData(block.Hash()) res.ParentHash = newHexData(block.ParentHash()) res.Nonce = newHexData(block.Nonce()) - res.Sha3Uncles = newHexData(block.Header().UncleHash) + res.Sha3Uncles = newHexData(block.UncleHash()) res.LogsBloom = newHexData(block.Bloom()) - res.TransactionRoot = newHexData(block.Header().TxHash) + res.TransactionRoot = newHexData(block.TxHash()) res.StateRoot = newHexData(block.Root()) - res.Miner = newHexData(block.Header().Coinbase) + res.Miner = newHexData(block.Coinbase()) res.Difficulty = newHexNum(block.Difficulty()) res.TotalDifficulty = newHexNum(block.Td) res.Size = newHexNum(block.Size().Int64()) - res.ExtraData = newHexData(block.Header().Extra) + res.ExtraData = newHexData(block.Extra()) res.GasLimit = newHexNum(block.GasLimit()) res.GasUsed = newHexNum(block.GasUsed()) res.UnixTimestamp = newHexNum(block.Time()) - res.Transactions = make([]*TransactionRes, len(block.Transactions())) - for i, tx := range block.Transactions() { + txs := block.Transactions() + res.Transactions = make([]*TransactionRes, len(txs)) + for i, tx := range txs { res.Transactions[i] = NewTransactionRes(tx) res.Transactions[i].BlockHash = res.BlockHash res.Transactions[i].BlockNumber = res.BlockNumber res.Transactions[i].TxIndex = newHexNum(i) } - res.Uncles = make([]*UncleRes, len(block.Uncles())) - for i, uncle := range block.Uncles() { + uncles := block.Uncles() + res.Uncles = make([]*UncleRes, len(uncles)) + for i, uncle := range uncles { res.Uncles[i] = NewUncleRes(uncle) } -- cgit v1.2.3