diff options
author | Felix Lange <fjl@twurst.com> | 2015-12-15 00:38:10 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2015-12-15 00:38:10 +0800 |
commit | fa187a366dda1894179635eeec2a929bfacc4ad3 (patch) | |
tree | 4494d4dcded47dd49f2fe7374e85fefa9249246e /rpc | |
parent | 787d71d6595df98586c625e82f4decb034215203 (diff) | |
parent | eae81465c1c815c317cd30e4de6bdf4d59df2340 (diff) | |
download | go-tangerine-fa187a366dda1894179635eeec2a929bfacc4ad3.tar go-tangerine-fa187a366dda1894179635eeec2a929bfacc4ad3.tar.gz go-tangerine-fa187a366dda1894179635eeec2a929bfacc4ad3.tar.bz2 go-tangerine-fa187a366dda1894179635eeec2a929bfacc4ad3.tar.lz go-tangerine-fa187a366dda1894179635eeec2a929bfacc4ad3.tar.xz go-tangerine-fa187a366dda1894179635eeec2a929bfacc4ad3.tar.zst go-tangerine-fa187a366dda1894179635eeec2a929bfacc4ad3.zip |
Merge pull request #2035 from bas-vk/rcp-v2-rebase
rpc: new RPC implementation with pub/sub support
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/api/mergedapi.go | 8 | ||||
-rw-r--r-- | rpc/api/personal_js.go | 5 | ||||
-rw-r--r-- | rpc/api/utils.go | 2 | ||||
-rw-r--r-- | rpc/comms/ipc.go | 22 | ||||
-rw-r--r-- | rpc/jeth.go | 72 | ||||
-rw-r--r-- | rpc/v2/doc.go | 102 | ||||
-rw-r--r-- | rpc/v2/errors.go | 85 | ||||
-rw-r--r-- | rpc/v2/json.go | 343 | ||||
-rw-r--r-- | rpc/v2/json_test.go | 73 | ||||
-rw-r--r-- | rpc/v2/server.go | 378 | ||||
-rw-r--r-- | rpc/v2/server_test.go | 219 | ||||
-rw-r--r-- | rpc/v2/types.go | 352 | ||||
-rw-r--r-- | rpc/v2/types_test.go | 57 | ||||
-rw-r--r-- | rpc/v2/utils.go | 205 |
14 files changed, 1919 insertions, 4 deletions
diff --git a/rpc/api/mergedapi.go b/rpc/api/mergedapi.go index 8f4ef8e60..92e1e2bb7 100644 --- a/rpc/api/mergedapi.go +++ b/rpc/api/mergedapi.go @@ -39,9 +39,11 @@ func newMergedApi(apis ...shared.EthereumApi) *MergedApi { mergedApi.methods = make(map[string]shared.EthereumApi) for _, api := range apis { - mergedApi.apis[api.Name()] = api.ApiVersion() - for _, method := range api.Methods() { - mergedApi.methods[method] = api + if api != nil { + mergedApi.apis[api.Name()] = api.ApiVersion() + for _, method := range api.Methods() { + mergedApi.methods[method] = api + } } } return mergedApi diff --git a/rpc/api/personal_js.go b/rpc/api/personal_js.go index 81c5d4a36..84c669af7 100644 --- a/rpc/api/personal_js.go +++ b/rpc/api/personal_js.go @@ -33,6 +33,11 @@ web3._extend({ call: 'personal_unlockAccount', params: 3, inputFormatter: [null, null, null] + }), + new web3._extend.Method({ + name: 'lockAccount', + call: 'personal_lockAccount', + params: 1 }) ], properties: diff --git a/rpc/api/utils.go b/rpc/api/utils.go index d6820cd2e..794b6abee 100644 --- a/rpc/api/utils.go +++ b/rpc/api/utils.go @@ -191,6 +191,8 @@ func ParseApiString(apistr string, codec codec.Codec, xeth *xeth.XEth, stack *no apis[i] = NewPersonalApi(xeth, eth, codec) case shared.Web3ApiName: apis[i] = NewWeb3Api(xeth, codec) + case "rpc": // gives information about the RPC interface + continue default: return nil, fmt.Errorf("Unknown API '%s'", name) } diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index 882d62ab4..3ba747b1d 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -69,13 +69,28 @@ func (self *ipcClient) SupportedModules() (map[string]string, error) { req := shared.Request{ Id: 1, Jsonrpc: "2.0", - Method: "modules", + Method: "rpc_modules", } if err := self.coder.WriteResponse(req); err != nil { return nil, err } + res, _ := self.coder.ReadResponse() + if sucRes, ok := res.(*shared.SuccessResponse); ok { + data, _ := json.Marshal(sucRes.Result) + modules := make(map[string]string) + if err := json.Unmarshal(data, &modules); err == nil { + return modules, nil + } + } + + // old version uses modules instead of rpc_modules, this can be removed after full migration + req.Method = "modules" + if err := self.coder.WriteResponse(req); err != nil { + return nil, err + } + res, err := self.coder.ReadResponse() if err != nil { return nil, err @@ -108,6 +123,11 @@ func StartIpc(cfg IpcConfig, codec codec.Codec, initializer InitFunc) error { return nil } +// CreateListener creates an listener, on Unix platforms this is a unix socket, on Windows this is a named pipe +func CreateListener(cfg IpcConfig) (net.Listener, error) { + return ipcListen(cfg) +} + func ipcLoop(cfg IpcConfig, codec codec.Codec, initializer InitFunc, l net.Listener) { glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) defer os.Remove(cfg.Endpoint) diff --git a/rpc/jeth.go b/rpc/jeth.go index 1260a3404..de7dd1e76 100644 --- a/rpc/jeth.go +++ b/rpc/jeth.go @@ -54,6 +54,78 @@ func (self *Jeth) err(call otto.FunctionCall, code int, msg string, id interface return res } +// UnlockAccount asks the user for the password and than executes the jeth.UnlockAccount callback in the jsre +func (self *Jeth) UnlockAccount(call otto.FunctionCall) (response otto.Value) { + var cmd, account, passwd string + timeout := int64(300) + var ok bool + + if len(call.ArgumentList) == 0 { + fmt.Println("expected address of account to unlock") + return otto.FalseValue() + } + + if len(call.ArgumentList) >= 1 { + if accountExport, err := call.Argument(0).Export(); err == nil { + if account, ok = accountExport.(string); ok { + if len(call.ArgumentList) == 1 { + fmt.Printf("Unlock account %s\n", account) + passwd, err = utils.PromptPassword("Passphrase: ", true) + if err != nil { + return otto.FalseValue() + } + } + } + } + } + if len(call.ArgumentList) >= 2 { + if passwdExport, err := call.Argument(1).Export(); err == nil { + passwd, _ = passwdExport.(string) + } + } + + if len(call.ArgumentList) >= 3 { + if timeoutExport, err := call.Argument(2).Export(); err == nil { + timeout, _ = timeoutExport.(int64) + } + } + + cmd = fmt.Sprintf("jeth.unlockAccount('%s', '%s', %d)", account, passwd, timeout) + if val, err := call.Otto.Run(cmd); err == nil { + return val + } + + return otto.FalseValue() +} + +// NewAccount asks the user for the password and than executes the jeth.newAccount callback in the jsre +func (self *Jeth) NewAccount(call otto.FunctionCall) (response otto.Value) { + if len(call.ArgumentList) == 0 { + passwd, err := utils.PromptPassword("Passphrase: ", true) + if err != nil { + return otto.FalseValue() + } + passwd2, err := utils.PromptPassword("Repeat passphrase: ", true) + if err != nil { + return otto.FalseValue() + } + + if passwd != passwd2 { + fmt.Println("Passphrases don't match") + return otto.FalseValue() + } + + cmd := fmt.Sprintf("jeth.newAccount('%s')", passwd) + if val, err := call.Otto.Run(cmd); err == nil { + return val + } + } else { + fmt.Println("New account doesn't expect argument(s), you will be prompted for a password") + } + + return otto.FalseValue() +} + func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) { reqif, err := call.Argument(0).Export() if err != nil { diff --git a/rpc/v2/doc.go b/rpc/v2/doc.go new file mode 100644 index 000000000..e51494adb --- /dev/null +++ b/rpc/v2/doc.go @@ -0,0 +1,102 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +/* +Package rpc provides access to the exported methods of an object across a network +or other I/O connection. After creating a server instance objects can be registered, +making it visible from the outside. Exported methods that follow specific +conventions can be called remotely. It also has support for the publish/subscribe +pattern. + +Methods that satisfy the following criteria are made available for remote access: + - object must be exported + - method must be exported + - method returns 0, 1 (response or error) or 2 (response and error) values + - method argument(s) must be exported or builtin types + - method returned value(s) must be exported or builtin types + +An example method: + func (s *CalcService) Div(a, b int) (int, error) + +When the returned error isn't nil the returned integer is ignored and the error is +send back to the client. Otherwise the returned integer is send back to the client. + +The server offers the ServeCodec method which accepts a ServerCodec instance. It will +read requests from the codec, process the request and sends the response back to the +client using the codec. The server can execute requests concurrently. Responses +can be send back to the client out of order. + +An example server which uses the JSON codec: + type CalculatorService struct {} + + func (s *CalculatorService) Add(a, b int) int { + return a + b + } + + func (s *CalculatorService Div(a, b int) (int, error) { + if b == 0 { + return 0, errors.New("divide by zero") + } + return a/b, nil + } + + calculator := new(CalculatorService) + server := NewServer() + server.RegisterName("calculator", calculator") + + l, _ := net.ListenUnix("unix", &net.UnixAddr{Net: "unix", Name: "/tmp/calculator.sock"}) + for { + c, _ := l.AcceptUnix() + codec := v2.NewJSONCodec(c) + go server.ServeCodec(codec) + } + +The package also supports the publish subscribe pattern through the use of subscriptions. +A method that is considered eligible for notifications must satisfy the following criteria: + - object must be exported + - method must be exported + - method argument(s) must be exported or builtin types + - method must return the tuple Subscription, error + + +An example method: + func (s *BlockChainService) Head() (Subscription, error) { + sub := s.bc.eventMux.Subscribe(ChainHeadEvent{}) + return v2.NewSubscription(sub), nil + } + +This method will push all raised ChainHeadEvents to subscribed clients. If the client is only +interested in every N'th block it is possible to add a criteria. + + func (s *BlockChainService) HeadFiltered(nth uint64) (Subscription, error) { + sub := s.bc.eventMux.Subscribe(ChainHeadEvent{}) + + criteria := func(event interface{}) bool { + chainHeadEvent := event.(ChainHeadEvent) + if chainHeadEvent.Block.NumberU64() % nth == 0 { + return true + } + return false + } + + return v2.NewSubscriptionFiltered(sub, criteria), nil + } + +Subscriptions are deleted when: + - the user sends an unsubscribe request + - the connection which was used to create the subscription is closed +*/ +package v2 diff --git a/rpc/v2/errors.go b/rpc/v2/errors.go new file mode 100644 index 000000000..a06d19d84 --- /dev/null +++ b/rpc/v2/errors.go @@ -0,0 +1,85 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package v2 + +import "fmt" + +// request is for an unknown service +type methodNotFoundError struct { + service string + method string +} + +func (e *methodNotFoundError) Code() int { + return -32601 +} + +func (e *methodNotFoundError) Error() string { + return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method) +} + +// received message isn't a valid request +type invalidRequestError struct { + message string +} + +func (e *invalidRequestError) Code() int { + return -32600 +} + +func (e *invalidRequestError) Error() string { + return e.message +} + +// received message is invalid +type invalidMessageError struct { + message string +} + +func (e *invalidMessageError) Code() int { + return -32700 +} + +func (e *invalidMessageError) Error() string { + return e.message +} + +// unable to decode supplied params, or an invalid number of parameters +type invalidParamsError struct { + message string +} + +func (e *invalidParamsError) Code() int { + return -32602 +} + +func (e *invalidParamsError) Error() string { + return e.message +} + +// logic error, callback returned an error +type callbackError struct { + message string +} + +func (e *callbackError) Code() int { + return -32000 +} + +func (e *callbackError) Error() string { + return e.message +} diff --git a/rpc/v2/json.go b/rpc/v2/json.go new file mode 100644 index 000000000..9208e2d37 --- /dev/null +++ b/rpc/v2/json.go @@ -0,0 +1,343 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package v2 + +import ( + "encoding/json" + "fmt" + "io" + "reflect" + "strings" + "sync/atomic" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +const ( + jsonRPCVersion = "2.0" + serviceMethodSeparator = "_" + subscribeMethod = "eth_subscribe" + unsubscribeMethod = "eth_unsubscribe" + notificationMethod = "eth_subscription" +) + +// JSON-RPC request +type jsonRequest struct { + Method string `json:"method"` + Version string `json:"jsonrpc"` + Id *int64 `json:"id,omitempty"` + Payload json.RawMessage `json:"params"` +} + +// JSON-RPC response +type jsonSuccessResponse struct { + Version string `json:"jsonrpc"` + Id int64 `json:"id"` + Result interface{} `json:"result,omitempty"` +} + +// JSON-RPC error object +type jsonError struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +// JSON-RPC error response +type jsonErrResponse struct { + Version string `json:"jsonrpc"` + Id *int64 `json:"id,omitempty"` + Error jsonError `json:"error"` +} + +// JSON-RPC notification payload +type jsonSubscription struct { + Subscription string `json:"subscription"` + Result interface{} `json:"result,omitempty"` +} + +// JSON-RPC notification +type jsonNotification struct { + Version string `json:"jsonrpc"` + Method string `json:"method"` + Params jsonSubscription `json:"params"` +} + +// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments +// and serializing (result) objects. +type jsonCodec struct { + closed chan interface{} + isClosed int32 + d *json.Decoder + e *json.Encoder + req jsonRequest + rw io.ReadWriteCloser +} + +// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0 +func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec { + d := json.NewDecoder(rwc) + d.UseNumber() + return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc, isClosed: 0} +} + +// isBatch returns true when the first non-whitespace characters is '[' +func isBatch(msg json.RawMessage) bool { + for _, c := range msg { + // skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt) + if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d { + continue + } + return c == '[' + } + return false +} + +// ReadRequestHeaders will read new requests without parsing the arguments. It will return a collection of requests, an +// indication if these requests are in batch form or an error when the incoming message could not be read/parsed. +func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) { + var incomingMsg json.RawMessage + if err := c.d.Decode(&incomingMsg); err != nil { + return nil, false, &invalidRequestError{err.Error()} + } + + if isBatch(incomingMsg) { + return parseBatchRequest(incomingMsg) + } + + return parseRequest(incomingMsg) +} + +// parseRequest will parse a single request from the given RawMessage. It will return the parsed request, an indication +// if the request was a batch or an error when the request could not be parsed. +func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { + var in jsonRequest + if err := json.Unmarshal(incomingMsg, &in); err != nil { + return nil, false, &invalidMessageError{err.Error()} + } + + if in.Id == nil { + return nil, false, &invalidMessageError{"Server cannot handle notifications"} + } + + // subscribe are special, they will always use `subscribeMethod` as service method + if in.Method == subscribeMethod { + reqs := []rpcRequest{rpcRequest{id: *in.Id, isPubSub: true}} + if len(in.Payload) > 0 { + // first param must be subscription name + var subscribeMethod [1]string + if err := json.Unmarshal(in.Payload, &subscribeMethod); err != nil { + glog.V(logger.Debug).Infof("Unable to parse subscription method: %v\n", err) + return nil, false, &invalidRequestError{"Unable to parse subscription request"} + } + + // all subscriptions are made on the eth service + reqs[0].service, reqs[0].method = "eth", subscribeMethod[0] + reqs[0].params = in.Payload + return reqs, false, nil + } + return nil, false, &invalidRequestError{"Unable to parse subscription request"} + } + + if in.Method == unsubscribeMethod { + return []rpcRequest{rpcRequest{id: *in.Id, isPubSub: true, + method: unsubscribeMethod, params: in.Payload}}, false, nil + } + + // regular RPC call + elems := strings.Split(in.Method, serviceMethodSeparator) + if len(elems) != 2 { + return nil, false, &methodNotFoundError{in.Method, ""} + } + + if len(in.Payload) == 0 { + return []rpcRequest{rpcRequest{service: elems[0], method: elems[1], id: *in.Id}}, false, nil + } + + return []rpcRequest{rpcRequest{service: elems[0], method: elems[1], id: *in.Id, params: in.Payload}}, false, nil +} + +// parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication +// if the request was a batch or an error when the request could not be read. +func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { + var in []jsonRequest + if err := json.Unmarshal(incomingMsg, &in); err != nil { + return nil, false, &invalidMessageError{err.Error()} + } + + requests := make([]rpcRequest, len(in)) + for i, r := range in { + if r.Id == nil { + return nil, true, &invalidMessageError{"Server cannot handle notifications"} + } + + // (un)subscribe are special, they will always use the same service.method + if r.Method == subscribeMethod { + requests[i] = rpcRequest{id: *r.Id, isPubSub: true} + if len(r.Payload) > 0 { + var subscribeMethod [1]string + if err := json.Unmarshal(r.Payload, &subscribeMethod); err != nil { + glog.V(logger.Debug).Infof("Unable to parse subscription method: %v\n", err) + return nil, false, &invalidRequestError{"Unable to parse subscription request"} + } + + // all subscriptions are made on the eth service + requests[i].service, requests[i].method = "eth", subscribeMethod[0] + requests[i].params = r.Payload + continue + } + + return nil, true, &invalidRequestError{"Unable to parse (un)subscribe request arguments"} + } + + if r.Method == unsubscribeMethod { + requests[i] = rpcRequest{id: *r.Id, isPubSub: true, method: unsubscribeMethod, params: r.Payload} + continue + } + + elems := strings.Split(r.Method, serviceMethodSeparator) + if len(elems) != 2 { + return nil, true, &methodNotFoundError{r.Method, ""} + } + + if len(r.Payload) == 0 { + requests[i] = rpcRequest{service: elems[0], method: elems[1], id: *r.Id, params: nil} + } else { + requests[i] = rpcRequest{service: elems[0], method: elems[1], id: *r.Id, params: r.Payload} + } + } + + return requests, true, nil +} + +// ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed +// values or an error when the parsing failed. +func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, RPCError) { + if args, ok := params.(json.RawMessage); !ok { + return nil, &invalidParamsError{"Invalid params supplied"} + } else { + return parsePositionalArguments(args, argTypes) + } +} + +func countArguments(args json.RawMessage) (int, error) { + var cnt []interface{} + if err := json.Unmarshal(args, &cnt); err != nil { + return -1, nil + } + return len(cnt), nil +} + +// parsePositionalArguments tries to parse the given args to an array of values with the given types. It returns the +// parsed values or an error when the args could not be parsed. +func parsePositionalArguments(args json.RawMessage, argTypes []reflect.Type) ([]reflect.Value, RPCError) { + argValues := make([]reflect.Value, len(argTypes)) + params := make([]interface{}, len(argTypes)) + + n, err := countArguments(args) + if err != nil { + return nil, &invalidParamsError{err.Error()} + } + if n != len(argTypes) { + return nil, &invalidParamsError{fmt.Sprintf("insufficient params, want %d have %d", len(argTypes), n)} + } + + for i, t := range argTypes { + if t.Kind() == reflect.Ptr { + // values must be pointers for the Unmarshal method, reflect. + // Dereference otherwise reflect.New would create **SomeType + argValues[i] = reflect.New(t.Elem()) + params[i] = argValues[i].Interface() + + // when not specified blockNumbers are by default latest (-1) + if blockNumber, ok := params[i].(*BlockNumber); ok { + *blockNumber = BlockNumber(-1) + } + } else { + argValues[i] = reflect.New(t) + params[i] = argValues[i].Interface() + + // when not specified blockNumbers are by default latest (-1) + if blockNumber, ok := params[i].(*BlockNumber); ok { + *blockNumber = BlockNumber(-1) + } + } + } + + if err := json.Unmarshal(args, ¶ms); err != nil { + return nil, &invalidParamsError{err.Error()} + } + + // Convert pointers back to values where necessary + for i, a := range argValues { + if a.Kind() != argTypes[i].Kind() { + argValues[i] = reflect.Indirect(argValues[i]) + } + } + + return argValues, nil +} + +// CreateResponse will create a JSON-RPC success response with the given id and reply as result. +func (c *jsonCodec) CreateResponse(id int64, reply interface{}) interface{} { + if isHexNum(reflect.TypeOf(reply)) { + return &jsonSuccessResponse{Version: jsonRPCVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)} + } + return &jsonSuccessResponse{Version: jsonRPCVersion, Id: id, Result: reply} +} + +// CreateErrorResponse will create a JSON-RPC error response with the given id and error. +func (c *jsonCodec) CreateErrorResponse(id *int64, err RPCError) interface{} { + return &jsonErrResponse{Version: jsonRPCVersion, Id: id, Error: jsonError{Code: err.Code(), Message: err.Error()}} +} + +// CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error. +// info is optional and contains additional information about the error. When an empty string is passed it is ignored. +func (c *jsonCodec) CreateErrorResponseWithInfo(id *int64, err RPCError, info interface{}) interface{} { + return &jsonErrResponse{Version: jsonRPCVersion, Id: id, + Error: jsonError{Code: err.Code(), Message: err.Error(), Data: info}} +} + +// CreateNotification will create a JSON-RPC notification with the given subscription id and event as params. +func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} { + if isHexNum(reflect.TypeOf(event)) { + return &jsonNotification{Version: jsonRPCVersion, Method: notificationMethod, + Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}} + } + + return &jsonNotification{Version: jsonRPCVersion, Method: notificationMethod, + Params: jsonSubscription{Subscription: subid, Result: event}} +} + +// Write message to client +func (c *jsonCodec) Write(res interface{}) error { + return c.e.Encode(res) +} + +// Close the underlying connection +func (c *jsonCodec) Close() { + if atomic.CompareAndSwapInt32(&c.isClosed, 0, 1) { + close(c.closed) + c.rw.Close() + } +} + +// Closed returns a channel which will be closed when Close is called +func (c *jsonCodec) Closed() <-chan interface{} { + return c.closed +} diff --git a/rpc/v2/json_test.go b/rpc/v2/json_test.go new file mode 100644 index 000000000..dc8a345d7 --- /dev/null +++ b/rpc/v2/json_test.go @@ -0,0 +1,73 @@ +package v2 + +import ( + "bufio" + "bytes" + "reflect" + "testing" +) + +type RWC struct { + *bufio.ReadWriter +} + +func (rwc *RWC) Close() error { + return nil +} + +func TestJSONRequestParsing(t *testing.T) { + server := NewServer() + service := new(Service) + + if err := server.RegisterName("calc", service); err != nil { + t.Fatalf("%v", err) + } + + req := bytes.NewBufferString(`{"id": 1234, "jsonrpc": "2.0", "method": "calc_add", "params": [11, 22]}`) + var str string + reply := bytes.NewBufferString(str) + rw := &RWC{bufio.NewReadWriter(bufio.NewReader(req), bufio.NewWriter(reply))} + + codec := NewJSONCodec(rw) + + requests, batch, err := codec.ReadRequestHeaders() + if err != nil { + t.Fatalf("%v", err) + } + + if batch { + t.Fatalf("Request isn't a batch") + } + + if len(requests) != 1 { + t.Fatalf("Expected 1 request but got %d requests - %v", len(requests), requests) + } + + if requests[0].service != "calc" { + t.Fatalf("Expected service 'calc' but got '%s'", requests[0].service) + } + + if requests[0].method != "add" { + t.Fatalf("Expected method 'Add' but got '%s'", requests[0].method) + } + + if requests[0].id != 1234 { + t.Fatalf("Expected id 1234 but got %d", requests[0].id) + } + + var arg int + args := []reflect.Type{reflect.TypeOf(arg), reflect.TypeOf(arg)} + + v, err := codec.ParseRequestArguments(args, requests[0].params) + if err != nil { + t.Fatalf("%v", err) + } + + if len(v) != 2 { + t.Fatalf("Expected 2 argument values, got %d", len(v)) + } + + if v[0].Int() != 11 || v[1].Int() != 22 { + t.Fatalf("expected %d == 11 && %d == 22", v[0].Int(), v[1].Int()) + } +} diff --git a/rpc/v2/server.go b/rpc/v2/server.go new file mode 100644 index 000000000..ff6b69015 --- /dev/null +++ b/rpc/v2/server.go @@ -0,0 +1,378 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package v2 + +import ( + "fmt" + "reflect" + + "runtime" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +// NewServer will create a new server instance with no registered handlers. +func NewServer() *Server { + server := &Server{services: make(serviceRegistry), subscriptions: make(subscriptionRegistry)} + + // register a default service which will provide meta information about the RPC service such as the services and + // methods it offers. + rpcService := &RPCService{server} + server.RegisterName("rpc", rpcService) + + return server +} + +// RPCService gives meta information about the server. +// e.g. gives information about the loaded modules. +type RPCService struct { + server *Server +} + +// Modules returns the list of RPC services with their version number +func (s *RPCService) Modules() map[string]string { + modules := make(map[string]string) + for name, _ := range s.server.services { + modules[name] = "1.0" + } + return modules +} + +// RegisterName will create an service for the given rcvr type under the given name. When no methods on the given rcvr +// match the criteria to be either a RPC method or a subscription an error is returned. Otherwise a new service is +// created and added to the service collection this server instance serves. +func (s *Server) RegisterName(name string, rcvr interface{}) error { + if s.services == nil { + s.services = make(serviceRegistry) + } + + svc := new(service) + svc.typ = reflect.TypeOf(rcvr) + rcvrVal := reflect.ValueOf(rcvr) + + if name == "" { + return fmt.Errorf("no service name for type %s", svc.typ.String()) + } + if !isExported(reflect.Indirect(rcvrVal).Type().Name()) { + return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name()) + } + + // already a previous service register under given sname, merge methods/subscriptions + if regsvc, present := s.services[name]; present { + methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) + if len(methods) == 0 && len(subscriptions) == 0 { + return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose") + } + + for _, m := range methods { + regsvc.callbacks[formatName(m.method.Name)] = m + } + for _, s := range subscriptions { + regsvc.subscriptions[formatName(s.method.Name)] = s + } + + return nil + } + + svc.name = name + svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ) + + if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 { + return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose") + } + + s.services[svc.name] = svc + + return nil +} + +// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the +// response back using the given codec. It will block until the codec is closed. +// +// This server will: +// 1. allow for asynchronous and parallel request execution +// 2. supports notifications (pub/sub) +// 3. supports request batches +func (s *Server) ServeCodec(codec ServerCodec) { + defer func() { + if err := recover(); err != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + glog.Errorln(string(buf)) + } + codec.Close() + }() + + for { + reqs, batch, err := s.readRequest(codec) + if err != nil { + glog.V(logger.Debug).Infof("%v\n", err) + codec.Write(codec.CreateErrorResponse(nil, err)) + break + } + + if batch { + go s.execBatch(codec, reqs) + } else { + go s.exec(codec, reqs[0]) + } + } +} + +// sendNotification will create a notification from the given event by serializing member fields of the event. +// It will then send the notification to the client, when it fails the codec is closed. When the event has multiple +// fields an array of values is returned. +func sendNotification(codec ServerCodec, subid string, event interface{}) { + notification := codec.CreateNotification(subid, event) + + if err := codec.Write(notification); err != nil { + codec.Close() + } +} + +// createSubscription will register a new subscription and waits for raised events. When an event is raised it will: +// 1. test if the event is raised matches the criteria the user has (optionally) specified +// 2. create a notification of the event and send it the client when it matches the criteria +// It will unsubscribe the subscription when the socket is closed or the subscription is unsubscribed by the user. +func (s *Server) createSubscription(c ServerCodec, req *serverRequest) (string, error) { + args := []reflect.Value{req.callb.rcvr} + if len(req.args) > 0 { + args = append(args, req.args...) + } + + subid, err := newSubscriptionId() + if err != nil { + return "", err + } + + reply := req.callb.method.Func.Call(args) + + if reply[1].IsNil() { // no error + if subscription, ok := reply[0].Interface().(Subscription); ok { + s.muSubcriptions.Lock() + s.subscriptions[subid] = subscription + s.muSubcriptions.Unlock() + go func() { + cases := []reflect.SelectCase{ + reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(subscription.Chan())}, // new event + reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.Closed())}, // connection closed + } + + for { + idx, notification, recvOk := reflect.Select(cases) + switch idx { + case 0: // new event, or channel closed + if recvOk { // send notification + if event, ok := notification.Interface().(*event.Event); ok { + if subscription.match == nil || subscription.match(event.Data) { + sendNotification(c, subid, subscription.format(event.Data)) + } + } + } else { // user send an eth_unsubscribe request + return + } + case 1: // connection closed + s.unsubscribe(subid) + return + } + } + }() + } else { // unable to create subscription + s.muSubcriptions.Lock() + delete(s.subscriptions, subid) + s.muSubcriptions.Unlock() + } + } else { + return "", fmt.Errorf("Unable to create subscription") + } + + return subid, nil +} + +// unsubscribe calls the Unsubscribe method on the subscription and removes a subscription from the subscription +// registry. +func (s *Server) unsubscribe(subid string) bool { + s.muSubcriptions.Lock() + defer s.muSubcriptions.Unlock() + if sub, ok := s.subscriptions[subid]; ok { + sub.Unsubscribe() + delete(s.subscriptions, subid) + return true + } + return false +} + +// handle executes a request and returns the response from the callback. +func (s *Server) handle(codec ServerCodec, req *serverRequest) interface{} { + if req.err != nil { + return codec.CreateErrorResponse(&req.id, req.err) + } + + if req.isUnsubscribe { // first param must be the subscription id + if len(req.args) >= 1 && req.args[0].Kind() == reflect.String { + subid := req.args[0].String() + if s.unsubscribe(subid) { + return codec.CreateResponse(req.id, true) + } else { + return codec.CreateErrorResponse(&req.id, + &callbackError{fmt.Sprintf("subscription '%s' not found", subid)}) + } + } + return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as argument"}) + } + + if req.callb.isSubscribe { + subid, err := s.createSubscription(codec, req) + if err != nil { + return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}) + } + return codec.CreateResponse(req.id, subid) + } + + // regular RPC call + if len(req.args) != len(req.callb.argTypes) { + rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d", + req.svcname, serviceMethodSeparator, req.callb.method.Name, + len(req.callb.argTypes), len(req.args))} + return codec.CreateErrorResponse(&req.id, rpcErr) + } + + arguments := []reflect.Value{req.callb.rcvr} + if len(req.args) > 0 { + arguments = append(arguments, req.args...) + } + + reply := req.callb.method.Func.Call(arguments) + + if len(reply) == 0 { + return codec.CreateResponse(req.id, nil) + } + + if req.callb.errPos >= 0 { // test if method returned an error + if !reply[req.callb.errPos].IsNil() { + e := reply[req.callb.errPos].Interface().(error) + res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()}) + return res + } + } + + return codec.CreateResponse(req.id, reply[0].Interface()) +} + +// exec executes the given request and writes the result back using the codec. +func (s *Server) exec(codec ServerCodec, req *serverRequest) { + var response interface{} + if req.err != nil { + response = codec.CreateErrorResponse(&req.id, req.err) + } else { + response = s.handle(codec, req) + } + + if err := codec.Write(response); err != nil { + glog.V(logger.Error).Infof("%v\n", err) + codec.Close() + } +} + +// execBatch executes the given requests and writes the result back using the codec. It will only write the response +// back when the last request is processed. +func (s *Server) execBatch(codec ServerCodec, requests []*serverRequest) { + responses := make([]interface{}, len(requests)) + for i, req := range requests { + if req.err != nil { + responses[i] = codec.CreateErrorResponse(&req.id, req.err) + } else { + responses[i] = s.handle(codec, req) + } + } + + if err := codec.Write(responses); err != nil { + glog.V(logger.Error).Infof("%v\n", err) + codec.Close() + } +} + +// readRequest requests the next (batch) request from the codec. It will return the collection of requests, an +// indication if the request was a batch, the invalid request identifier and an error when the request could not be +// read/parsed. +func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) { + reqs, batch, err := codec.ReadRequestHeaders() + if err != nil { + return nil, batch, err + } + + requests := make([]*serverRequest, len(reqs)) + + // verify requests + for i, r := range reqs { + var ok bool + var svc *service + + if r.isPubSub && r.method == unsubscribeMethod { + requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} + argTypes := []reflect.Type{reflect.TypeOf("")} + if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { + requests[i].args = args + } else { + requests[i].err = &invalidParamsError{err.Error()} + } + continue + } + + if svc, ok = s.services[r.service]; !ok { + requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} + continue + } + + if r.isPubSub { // eth_subscribe + if callb, ok := svc.subscriptions[r.method]; ok { + requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} + if r.params != nil && len(callb.argTypes) > 0 { + argTypes := []reflect.Type{reflect.TypeOf("")} + argTypes = append(argTypes, callb.argTypes...) + if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { + requests[i].args = args[1:] // first one is service.method name which isn't an actual argument + } else { + requests[i].err = &invalidParamsError{err.Error()} + } + } + } else { + requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{subscribeMethod, r.method}} + } + continue + } + + if callb, ok := svc.callbacks[r.method]; ok { + requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} + if r.params != nil && len(callb.argTypes) > 0 { + if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil { + requests[i].args = args + } else { + requests[i].err = &invalidParamsError{err.Error()} + } + } + continue + } + + requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} + } + + return requests, batch, nil +} diff --git a/rpc/v2/server_test.go b/rpc/v2/server_test.go new file mode 100644 index 000000000..f4f77672f --- /dev/null +++ b/rpc/v2/server_test.go @@ -0,0 +1,219 @@ +package v2 + +import ( + "encoding/json" + "fmt" + "reflect" + "testing" + "time" +) + +type Service struct{} + +type Args struct { + S string +} + +func (s *Service) NoArgsRets() { +} + +type Result struct { + String string + Int int + Args *Args +} + +func (s *Service) Echo(str string, i int, args *Args) Result { + return Result{str, i, args} +} + +func (s *Service) Rets() (string, error) { + return "", nil +} + +func (s *Service) InvalidRets1() (error, string) { + return nil, "" +} + +func (s *Service) InvalidRets2() (string, string) { + return "", "" +} + +func (s *Service) InvalidRets3() (string, string, error) { + return "", "", nil +} + +func (s *Service) Subscription() (Subscription, error) { + return NewSubscription(nil), nil +} + +func TestServerRegisterName(t *testing.T) { + server := NewServer() + service := new(Service) + + if err := server.RegisterName("calc", service); err != nil { + t.Fatalf("%v", err) + } + + if len(server.services) != 2 { + t.Fatalf("Expected 2 service entries, got %d", len(server.services)) + } + + svc, ok := server.services["calc"] + if !ok { + t.Fatalf("Expected service calc to be registered") + } + + if len(svc.callbacks) != 3 { + t.Errorf("Expected 3 callbacks for service 'calc', got %d", len(svc.callbacks)) + } + + if len(svc.subscriptions) != 1 { + t.Errorf("Expected 1 subscription for service 'calc', got %d", len(svc.subscriptions)) + } +} + +// dummy codec used for testing RPC method execution +type ServerTestCodec struct { + counter int + input []byte + output string + closer chan interface{} +} + +func (c *ServerTestCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) { + c.counter += 1 + + if c.counter == 1 { + var req jsonRequest + json.Unmarshal(c.input, &req) + return []rpcRequest{rpcRequest{id: *req.Id, isPubSub: false, service: "test", method: req.Method, params: req.Payload}}, false, nil + } + + // requests are executes in parallel, wait a bit before returning an error so that the previous request has time to + // be executed + timer := time.NewTimer(time.Duration(2) * time.Second) + <-timer.C + + return nil, false, &invalidRequestError{"connection closed"} +} + +func (c *ServerTestCodec) ParseRequestArguments(argTypes []reflect.Type, payload interface{}) ([]reflect.Value, RPCError) { + + args, _ := payload.(json.RawMessage) + + argValues := make([]reflect.Value, len(argTypes)) + params := make([]interface{}, len(argTypes)) + + n, err := countArguments(args) + if err != nil { + return nil, &invalidParamsError{err.Error()} + } + if n != len(argTypes) { + return nil, &invalidParamsError{fmt.Sprintf("insufficient params, want %d have %d", len(argTypes), n)} + + } + + for i, t := range argTypes { + if t.Kind() == reflect.Ptr { + // values must be pointers for the Unmarshal method, reflect. + // Dereference otherwise reflect.New would create **SomeType + argValues[i] = reflect.New(t.Elem()) + params[i] = argValues[i].Interface() + + // when not specified blockNumbers are by default latest (-1) + if blockNumber, ok := params[i].(*BlockNumber); ok { + *blockNumber = BlockNumber(-1) + } + } else { + argValues[i] = reflect.New(t) + params[i] = argValues[i].Interface() + + // when not specified blockNumbers are by default latest (-1) + if blockNumber, ok := params[i].(*BlockNumber); ok { + *blockNumber = BlockNumber(-1) + } + } + } + + if err := json.Unmarshal(args, ¶ms); err != nil { + return nil, &invalidParamsError{err.Error()} + } + + // Convert pointers back to values where necessary + for i, a := range argValues { + if a.Kind() != argTypes[i].Kind() { + argValues[i] = reflect.Indirect(argValues[i]) + } + } + + return argValues, nil +} + +func (c *ServerTestCodec) CreateResponse(id int64, reply interface{}) interface{} { + return &jsonSuccessResponse{Version: jsonRPCVersion, Id: id, Result: reply} +} + +func (c *ServerTestCodec) CreateErrorResponse(id *int64, err RPCError) interface{} { + return &jsonErrResponse{Version: jsonRPCVersion, Id: id, Error: jsonError{Code: err.Code(), Message: err.Error()}} +} + +func (c *ServerTestCodec) CreateErrorResponseWithInfo(id *int64, err RPCError, info interface{}) interface{} { + return &jsonErrResponse{Version: jsonRPCVersion, Id: id, + Error: jsonError{Code: err.Code(), Message: err.Error(), Data: info}} +} + +func (c *ServerTestCodec) CreateNotification(subid string, event interface{}) interface{} { + return &jsonNotification{Version: jsonRPCVersion, Method: notificationMethod, + Params: jsonSubscription{Subscription: subid, Result: event}} +} + +func (c *ServerTestCodec) Write(msg interface{}) error { + if len(c.output) == 0 { // only capture first response + if o, err := json.Marshal(msg); err != nil { + return err + } else { + c.output = string(o) + } + } + + return nil +} + +func (c *ServerTestCodec) Close() { + close(c.closer) +} + +func (c *ServerTestCodec) Closed() <-chan interface{} { + return c.closer +} + +func TestServerMethodExecution(t *testing.T) { + server := NewServer() + service := new(Service) + + if err := server.RegisterName("test", service); err != nil { + t.Fatalf("%v", err) + } + + id := int64(12345) + req := jsonRequest{ + Method: "echo", + Version: "2.0", + Id: &id, + } + args := []interface{}{"string arg", 1122, &Args{"qwerty"}} + req.Payload, _ = json.Marshal(&args) + + input, _ := json.Marshal(&req) + codec := &ServerTestCodec{input: input, closer: make(chan interface{})} + go server.ServeCodec(codec) + + <-codec.closer + + expected := `{"jsonrpc":"2.0","id":12345,"result":{"String":"string arg","Int":1122,"Args":{"S":"qwerty"}}}` + + if expected != codec.output { + t.Fatalf("expected %s, got %s\n", expected, codec.output) + } +} diff --git a/rpc/v2/types.go b/rpc/v2/types.go new file mode 100644 index 000000000..d538e0a3f --- /dev/null +++ b/rpc/v2/types.go @@ -0,0 +1,352 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package v2 + +import ( + "fmt" + "math" + "math/big" + "reflect" + "strings" + + "sync" + + "github.com/ethereum/go-ethereum/event" +) + +// API describes the set of methods offered over the RPC interface +type API struct { + Namespace string // namespace under which the rpc methods of Service are exposed + Version string // api version for DApp's + Service interface{} // receiver instance which holds the methods + Public bool // indication if the methods must be considered safe for public use +} + +// callback is a method callback which was registered in the server +type callback struct { + rcvr reflect.Value // receiver of method + method reflect.Method // callback + argTypes []reflect.Type // input argument types + errPos int // err return idx, of -1 when method cannot return error + isSubscribe bool // indication if the callback is a subscription +} + +// service represents a registered object +type service struct { + name string // name for service + rcvr reflect.Value // receiver of methods for the service + typ reflect.Type // receiver type + callbacks callbacks // registered handlers + subscriptions subscriptions // available subscriptions/notifications +} + +// serverRequest is an incoming request +type serverRequest struct { + id int64 + svcname string + rcvr reflect.Value + callb *callback + args []reflect.Value + isUnsubscribe bool + err RPCError +} + +type serviceRegistry map[string]*service // collection of services +type callbacks map[string]*callback // collection of RPC callbacks +type subscriptions map[string]*callback // collection of subscription callbacks +type subscriptionRegistry map[string]Subscription // collection of subscriptions + +// Server represents a RPC server +type Server struct { + services serviceRegistry + muSubcriptions sync.Mutex // protects subscriptions + subscriptions subscriptionRegistry +} + +// rpcRequest represents a raw incoming RPC request +type rpcRequest struct { + service string + method string + id int64 + isPubSub bool + params interface{} +} + +// RPCError implements RPC error, is add support for error codec over regular go errors +type RPCError interface { + // RPC error code + Code() int + // Error message + Error() string +} + +// ServerCodec implements reading, parsing and writing RPC messages for the server side of +// a RPC session. Implementations must be go-routine safe since the codec can be called in +// multiple go-routines concurrently. +type ServerCodec interface { + // Read next request + ReadRequestHeaders() ([]rpcRequest, bool, RPCError) + // Parse request argument to the given types + ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, RPCError) + // Assemble success response + CreateResponse(int64, interface{}) interface{} + // Assemble error response + CreateErrorResponse(*int64, RPCError) interface{} + // Assemble error response with extra information about the error through info + CreateErrorResponseWithInfo(id *int64, err RPCError, info interface{}) interface{} + // Create notification response + CreateNotification(string, interface{}) interface{} + // Write msg to client. + Write(interface{}) error + // Close underlying data stream + Close() + // Closed when underlying connection is closed + Closed() <-chan interface{} +} + +// SubscriptionMatcher returns true if the given value matches the criteria specified by the user +type SubscriptionMatcher func(interface{}) bool + +// SubscriptionOutputFormat accepts event data and has the ability to format the data before it is send to the client +type SubscriptionOutputFormat func(interface{}) interface{} + +// defaultSubscriptionOutputFormatter returns data and is used as default output format for notifications +func defaultSubscriptionOutputFormatter(data interface{}) interface{} { + return data +} + +// Subscription is used by the server to send notifications to the client +type Subscription struct { + sub event.Subscription + match SubscriptionMatcher + format SubscriptionOutputFormat +} + +// NewSubscription create a new RPC subscription +func NewSubscription(sub event.Subscription) Subscription { + return Subscription{sub, nil, defaultSubscriptionOutputFormatter} +} + +// NewSubscriptionWithOutputFormat create a new RPC subscription which a custom notification output format +func NewSubscriptionWithOutputFormat(sub event.Subscription, formatter SubscriptionOutputFormat) Subscription { + return Subscription{sub, nil, formatter} +} + +// NewSubscriptionFiltered will create a new subscription. For each raised event the given matcher is +// called. If it returns true the event is send as notification to the client, otherwise it is ignored. +func NewSubscriptionFiltered(sub event.Subscription, match SubscriptionMatcher) Subscription { + return Subscription{sub, match, defaultSubscriptionOutputFormatter} +} + +// Chan returns the channel where new events will be published. It's up the user to call the matcher to +// determine if the events are interesting for the client. +func (s *Subscription) Chan() <-chan *event.Event { + return s.sub.Chan() +} + +// Unsubscribe will end the subscription and closes the event channel +func (s *Subscription) Unsubscribe() { + s.sub.Unsubscribe() +} + +// HexNumber serializes a number to hex format using the "%#x" format +type HexNumber big.Int + +// NewHexNumber creates a new hex number instance which will serialize the given val with `%#x` on marshal. +func NewHexNumber(val interface{}) *HexNumber { + if val == nil { + return nil + } + + if v, ok := val.(*big.Int); ok && v != nil { + hn := new(big.Int).Set(v) + return (*HexNumber)(hn) + } + + rval := reflect.ValueOf(val) + + var unsigned uint64 + utype := reflect.TypeOf(unsigned) + if t := rval.Type(); t.ConvertibleTo(utype) { + hn := new(big.Int).SetUint64(rval.Convert(utype).Uint()) + return (*HexNumber)(hn) + } + + var signed int64 + stype := reflect.TypeOf(signed) + if t := rval.Type(); t.ConvertibleTo(stype) { + hn := new(big.Int).SetInt64(rval.Convert(stype).Int()) + return (*HexNumber)(hn) + } + + return nil +} + +func (h *HexNumber) UnmarshalJSON(input []byte) error { + length := len(input) + if length >= 2 && input[0] == '"' && input[length-1] == '"' { + input = input[1 : length-1] + } + + hn := (*big.Int)(h) + if _, ok := hn.SetString(string(input), 0); ok { + return nil + } + + return fmt.Errorf("Unable to parse number") +} + +// MarshalJSON serialize the hex number instance to a hex representation. +func (h *HexNumber) MarshalJSON() ([]byte, error) { + if h != nil { + hn := (*big.Int)(h) + if hn.BitLen() == 0 { + return []byte(`"0x0"`), nil + } + return []byte(fmt.Sprintf(`"0x%x"`, hn)), nil + } + return nil, nil +} + +func (h *HexNumber) Int() int { + hn := (*big.Int)(h) + return int(hn.Int64()) +} + +func (h *HexNumber) Int64() int64 { + hn := (*big.Int)(h) + return hn.Int64() +} + +func (h *HexNumber) Uint() uint { + hn := (*big.Int)(h) + return uint(hn.Uint64()) +} + +func (h *HexNumber) Uint64() uint64 { + hn := (*big.Int)(h) + return hn.Uint64() +} + +func (h *HexNumber) BigInt() *big.Int { + return (*big.Int)(h) +} + +type Number int64 + +func (n *Number) UnmarshalJSON(data []byte) error { + input := strings.TrimSpace(string(data)) + + if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' { + input = input[1 : len(input)-1] + } + + if len(input) == 0 { + *n = Number(latestBlockNumber.Int64()) + return nil + } + + in := new(big.Int) + _, ok := in.SetString(input, 0) + + if !ok { // test if user supplied string tag + return fmt.Errorf(`invalid number %s`, data) + } + + if in.Cmp(earliestBlockNumber) >= 0 && in.Cmp(maxBlockNumber) <= 0 { + *n = Number(in.Int64()) + return nil + } + + return fmt.Errorf("blocknumber not in range [%d, %d]", earliestBlockNumber, maxBlockNumber) +} + +func (n *Number) Int64() int64 { + return *(*int64)(n) +} + +func (n *Number) BigInt() *big.Int { + return big.NewInt(n.Int64()) +} + +var ( + pendingBlockNumber = big.NewInt(-2) + latestBlockNumber = big.NewInt(-1) + earliestBlockNumber = big.NewInt(0) + maxBlockNumber = big.NewInt(math.MaxInt64) +) + +type BlockNumber int64 + +const ( + PendingBlockNumber = BlockNumber(-2) + LatestBlockNumber = BlockNumber(-1) +) + +// UnmarshalJSON parses the given JSON fragement into a BlockNumber. It supports: +// - "latest" or "earliest" as string arguments +// - the block number +// Returned errors: +// - an unsupported error when "pending" is specified (not yet implemented) +// - an invalid block number error when the given argument isn't a known strings +// - an out of range error when the given block number is either too little or too large +func (bn *BlockNumber) UnmarshalJSON(data []byte) error { + input := strings.TrimSpace(string(data)) + + if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' { + input = input[1 : len(input)-1] + } + + if len(input) == 0 { + *bn = BlockNumber(latestBlockNumber.Int64()) + return nil + } + + in := new(big.Int) + _, ok := in.SetString(input, 0) + + if !ok { // test if user supplied string tag + strBlockNumber := input + if strBlockNumber == "latest" { + *bn = BlockNumber(latestBlockNumber.Int64()) + return nil + } + + if strBlockNumber == "earliest" { + *bn = BlockNumber(earliestBlockNumber.Int64()) + return nil + } + + if strBlockNumber == "pending" { + *bn = BlockNumber(pendingBlockNumber.Int64()) + return nil + } + + return fmt.Errorf(`invalid blocknumber %s`, data) + } + + if in.Cmp(earliestBlockNumber) >= 0 && in.Cmp(maxBlockNumber) <= 0 { + *bn = BlockNumber(in.Int64()) + return nil + } + + return fmt.Errorf("blocknumber not in range [%d, %d]", earliestBlockNumber, maxBlockNumber) +} + +func (bn *BlockNumber) Int64() int64 { + return (int64)(*bn) +} diff --git a/rpc/v2/types_test.go b/rpc/v2/types_test.go new file mode 100644 index 000000000..f73a2369e --- /dev/null +++ b/rpc/v2/types_test.go @@ -0,0 +1,57 @@ +package v2 + +import ( + "bytes" + "encoding/json" + "math/big" + "testing" +) + +func TestNewHexNumber(t *testing.T) { + tests := []interface{}{big.NewInt(123), int64(123), uint64(123), int8(123), uint8(123)} + + for i, v := range tests { + hn := NewHexNumber(v) + if hn == nil { + t.Fatalf("Unable to create hex number instance for tests[%d]", i) + } + if hn.Int64() != 123 { + t.Fatalf("expected %d, got %d on value tests[%d]", 123, hn.Int64(), i) + } + } + + failures := []interface{}{"", nil, []byte{1, 2, 3, 4}} + for i, v := range failures { + hn := NewHexNumber(v) + if hn != nil { + t.Fatalf("Creating a nex number instance of %T should fail (failures[%d])", failures[i], i) + } + } +} + +func TestHexNumberUnmarshalJSON(t *testing.T) { + tests := []string{`"0x4d2"`, "1234", `"1234"`} + for i, v := range tests { + var hn HexNumber + if err := json.Unmarshal([]byte(v), &hn); err != nil { + t.Fatalf("Test %d failed - %s", i, err) + } + + if hn.Int64() != 1234 { + t.Fatalf("Expected %d, got %d for test[%d]", 1234, hn.Int64(), i) + } + } +} + +func TestHexNumberMarshalJSON(t *testing.T) { + hn := NewHexNumber(1234567890) + got, err := json.Marshal(hn) + if err != nil { + t.Fatalf("Unable to marshal hex number - %s", err) + } + + exp := []byte(`"0x499602d2"`) + if bytes.Compare(exp, got) != 0 { + t.Fatalf("Invalid json.Marshal, expected '%s', got '%s'", exp, got) + } +} diff --git a/rpc/v2/utils.go b/rpc/v2/utils.go new file mode 100644 index 000000000..a564b2473 --- /dev/null +++ b/rpc/v2/utils.go @@ -0,0 +1,205 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package v2 + +import ( + "crypto/rand" + "encoding/hex" + "errors" + "math/big" + "reflect" + "unicode" + "unicode/utf8" +) + +// Is this an exported - upper case - name? +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +// Is this type exported or a builtin? +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +var errorType = reflect.TypeOf((*error)(nil)).Elem() + +// Implements this type the error interface +func isErrorType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t.Implements(errorType) +} + +var subscriptionType = reflect.TypeOf((*Subscription)(nil)).Elem() + +func isSubscriptionType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t == subscriptionType +} + +// isPubSub tests whether the given method return the pair (v2.Subscription, error) +func isPubSub(methodType reflect.Type) bool { + if methodType.NumOut() != 2 { + return false + } + return isSubscriptionType(methodType.Out(0)) && isErrorType(methodType.Out(1)) +} + +// formatName will convert to first character to lower case +func formatName(name string) string { + ret := []rune(name) + if len(ret) > 0 { + ret[0] = unicode.ToLower(ret[0]) + } + return string(ret) +} + +var bigIntType = reflect.TypeOf((*big.Int)(nil)).Elem() + +// Indication if this type should be serialized in hex +func isHexNum(t reflect.Type) bool { + if t == nil { + return false + } + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + + return t == bigIntType +} + +var blockNumberType = reflect.TypeOf((*BlockNumber)(nil)).Elem() + +// Indication if the given block is a BlockNumber +func isBlockNumber(t reflect.Type) bool { + if t == nil { + return false + } + + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + + return t == blockNumberType +} + +// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria +// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server +// documentation for a summary of these criteria. +func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) { + callbacks := make(callbacks) + subscriptions := make(subscriptions) + +METHODS: + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + mtype := method.Type + mname := formatName(method.Name) + if method.PkgPath != "" { // method must be exported + continue + } + + var h callback + h.isSubscribe = isPubSub(mtype) + h.rcvr = rcvr + h.method = method + h.errPos = -1 + + if h.isSubscribe { + h.argTypes = make([]reflect.Type, mtype.NumIn()-1) // skip rcvr type + for i := 1; i < mtype.NumIn(); i++ { + argType := mtype.In(i) + if isExportedOrBuiltinType(argType) { + h.argTypes[i-1] = argType + } else { + continue METHODS + } + } + + subscriptions[mname] = &h + continue METHODS + } + + numIn := mtype.NumIn() + + // determine method arguments, ignore first arg since it's the receiver type + // Arguments must be exported or builtin types + h.argTypes = make([]reflect.Type, numIn-1) + for i := 1; i < numIn; i++ { + argType := mtype.In(i) + if !isExportedOrBuiltinType(argType) { + continue METHODS + } + h.argTypes[i-1] = argType + } + + // check that all returned values are exported or builtin types + for i := 0; i < mtype.NumOut(); i++ { + if !isExportedOrBuiltinType(mtype.Out(i)) { + continue METHODS + } + } + + // when a method returns an error it must be the last returned value + h.errPos = -1 + for i := 0; i < mtype.NumOut(); i++ { + if isErrorType(mtype.Out(i)) { + h.errPos = i + break + } + } + + if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 { + continue METHODS + } + + switch mtype.NumOut() { + case 0, 1: + break + case 2: + if h.errPos == -1 { // method must one return value and 1 error + continue METHODS + } + break + default: + continue METHODS + } + + callbacks[mname] = &h + } + + return callbacks, subscriptions +} + +func newSubscriptionId() (string, error) { + var subid [16]byte + n, _ := rand.Read(subid[:]) + if n != 16 { + return "", errors.New("Unable to generate subscription id") + } + return "0x" + hex.EncodeToString(subid[:]), nil +} |