aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbas-vk <bas-vk@users.noreply.github.com>2016-07-25 16:07:05 +0800
committerGitHub <noreply@github.com>2016-07-25 16:07:05 +0800
commit771655e3fee585ce4bc47dfaa279557c6c1c2421 (patch)
tree9071e157a54c40f06f0e5895643c82ca9a0b037a
parent60cd5bf9397bd8331bce3bb1884524d43c31dbb5 (diff)
parent91b769042857f542b2792b23ec407e1c9bd4fe8d (diff)
downloadgo-tangerine-771655e3fee585ce4bc47dfaa279557c6c1c2421.tar
go-tangerine-771655e3fee585ce4bc47dfaa279557c6c1c2421.tar.gz
go-tangerine-771655e3fee585ce4bc47dfaa279557c6c1c2421.tar.bz2
go-tangerine-771655e3fee585ce4bc47dfaa279557c6c1c2421.tar.lz
go-tangerine-771655e3fee585ce4bc47dfaa279557c6c1c2421.tar.xz
go-tangerine-771655e3fee585ce4bc47dfaa279557c6c1c2421.tar.zst
go-tangerine-771655e3fee585ce4bc47dfaa279557c6c1c2421.zip
Merge pull request #2808 from fjl/rpc-client-3
rpc: add new client, use it everywhere
-rw-r--r--accounts/abi/bind/backends/remote.go160
-rw-r--r--cmd/geth/consolecmd.go19
-rw-r--r--cmd/geth/monitorcmd.go39
-rw-r--r--cmd/utils/client.go55
-rw-r--r--console/bridge.go174
-rw-r--r--console/console.go4
-rw-r--r--internal/jsre/pretty.go2
-rw-r--r--node/node.go6
-rw-r--r--node/node_test.go34
-rw-r--r--rpc/client.go740
-rw-r--r--rpc/client_context_go1.4.go60
-rw-r--r--rpc/client_context_go1.5.go61
-rw-r--r--rpc/client_context_go1.6.go55
-rw-r--r--rpc/client_context_go1.7.go51
-rw-r--r--rpc/client_example_test.go83
-rw-r--r--rpc/client_test.go489
-rw-r--r--rpc/errors.go63
-rw-r--r--rpc/http.go163
-rw-r--r--rpc/inproc.go49
-rw-r--r--rpc/ipc.go79
-rw-r--r--rpc/ipc_unix.go6
-rw-r--r--rpc/ipc_windows.go15
-rw-r--r--rpc/json.go77
-rw-r--r--rpc/notification.go2
-rw-r--r--rpc/notification_test.go45
-rw-r--r--rpc/server.go24
-rw-r--r--rpc/server_test.go14
-rw-r--r--rpc/types.go32
-rw-r--r--rpc/utils.go29
-rw-r--r--rpc/websocket.go160
30 files changed, 2021 insertions, 769 deletions
diff --git a/accounts/abi/bind/backends/remote.go b/accounts/abi/bind/backends/remote.go
index 4793143e4..58edd791a 100644
--- a/accounts/abi/bind/backends/remote.go
+++ b/accounts/abi/bind/backends/remote.go
@@ -17,11 +17,7 @@
package backends
import (
- "encoding/json"
- "fmt"
"math/big"
- "sync"
- "sync/atomic"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
@@ -37,119 +33,34 @@ var _ bind.ContractBackend = (*rpcBackend)(nil)
// rpcBackend implements bind.ContractBackend, and acts as the data provider to
// Ethereum contracts bound to Go structs. It uses an RPC connection to delegate
// all its functionality.
-//
-// Note: The current implementation is a blocking one. This should be replaced
-// by a proper async version when a real RPC client is created.
type rpcBackend struct {
- client rpc.Client // RPC client connection to interact with an API server
- autoid uint32 // ID number to use for the next API request
- lock sync.Mutex // Singleton access until we get to request multiplexing
+ client *rpc.Client // RPC client connection to interact with an API server
}
// NewRPCBackend creates a new binding backend to an RPC provider that can be
// used to interact with remote contracts.
-func NewRPCBackend(client rpc.Client) bind.ContractBackend {
- return &rpcBackend{
- client: client,
- }
-}
-
-// request is a JSON RPC request package assembled internally from the client
-// method calls.
-type request struct {
- JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
- ID int `json:"id"` // Auto incrementing ID number for this request
- Method string `json:"method"` // Remote procedure name to invoke on the server
- Params []interface{} `json:"params"` // List of parameters to pass through (keep types simple)
-}
-
-// response is a JSON RPC response package sent back from the API server.
-type response struct {
- JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
- ID int `json:"id"` // Auto incrementing ID number for this request
- Error *failure `json:"error"` // Any error returned by the remote side
- Result json.RawMessage `json:"result"` // Whatever the remote side sends us in reply
-}
-
-// failure is a JSON RPC response error field sent back from the API server.
-type failure struct {
- Code int `json:"code"` // JSON RPC error code associated with the failure
- Message string `json:"message"` // Specific error message of the failure
-}
-
-// request forwards an API request to the RPC server, and parses the response.
-//
-// This is currently painfully non-concurrent, but it will have to do until we
-// find the time for niceties like this :P
-func (b *rpcBackend) request(ctx context.Context, method string, params []interface{}) (json.RawMessage, error) {
- b.lock.Lock()
- defer b.lock.Unlock()
-
- if ctx == nil {
- ctx = context.Background()
- }
-
- // Ugly hack to serialize an empty list properly
- if params == nil {
- params = []interface{}{}
- }
- // Assemble the request object
- reqID := int(atomic.AddUint32(&b.autoid, 1))
- req := &request{
- JSONRPC: "2.0",
- ID: reqID,
- Method: method,
- Params: params,
- }
- if err := b.client.Send(req); err != nil {
- return nil, err
- }
- res := new(response)
- errc := make(chan error, 1)
- go func() {
- errc <- b.client.Recv(res)
- }()
- select {
- case err := <-errc:
- if err != nil {
- return nil, err
- }
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- if res.Error != nil {
- if res.Error.Message == bind.ErrNoCode.Error() {
- return nil, bind.ErrNoCode
- }
- return nil, fmt.Errorf("remote error: %s", res.Error.Message)
- }
- return res.Result, nil
+func NewRPCBackend(client *rpc.Client) bind.ContractBackend {
+ return &rpcBackend{client: client}
}
// HasCode implements ContractVerifier.HasCode by retrieving any code associated
// with the contract from the remote node, and checking its size.
func (b *rpcBackend) HasCode(ctx context.Context, contract common.Address, pending bool) (bool, error) {
- // Execute the RPC code retrieval
block := "latest"
if pending {
block = "pending"
}
- res, err := b.request(ctx, "eth_getCode", []interface{}{contract.Hex(), block})
- if err != nil {
- return false, err
- }
var hex string
- if err := json.Unmarshal(res, &hex); err != nil {
+ err := b.client.CallContext(ctx, &hex, "eth_getCode", contract, block)
+ if err != nil {
return false, err
}
- // Convert the response back to a Go byte slice and return
return len(common.FromHex(hex)) > 0, nil
}
// ContractCall implements ContractCaller.ContractCall, delegating the execution of
// a contract call to the remote node, returning the reply to for local processing.
func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, data []byte, pending bool) ([]byte, error) {
- // Pack up the request into an RPC argument
args := struct {
To common.Address `json:"to"`
Data string `json:"data"`
@@ -157,63 +68,43 @@ func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address,
To: contract,
Data: common.ToHex(data),
}
- // Execute the RPC call and retrieve the response
block := "latest"
if pending {
block = "pending"
}
- res, err := b.request(ctx, "eth_call", []interface{}{args, block})
- if err != nil {
- return nil, err
- }
var hex string
- if err := json.Unmarshal(res, &hex); err != nil {
+ err := b.client.CallContext(ctx, &hex, "eth_call", args, block)
+ if err != nil {
return nil, err
}
- // Convert the response back to a Go byte slice and return
return common.FromHex(hex), nil
+
}
// PendingAccountNonce implements ContractTransactor.PendingAccountNonce, delegating
// the current account nonce retrieval to the remote node.
func (b *rpcBackend) PendingAccountNonce(ctx context.Context, account common.Address) (uint64, error) {
- res, err := b.request(ctx, "eth_getTransactionCount", []interface{}{account.Hex(), "pending"})
+ var hex rpc.HexNumber
+ err := b.client.CallContext(ctx, &hex, "eth_getTransactionCount", account.Hex(), "pending")
if err != nil {
return 0, err
}
- var hex string
- if err := json.Unmarshal(res, &hex); err != nil {
- return 0, err
- }
- nonce, ok := new(big.Int).SetString(hex, 0)
- if !ok {
- return 0, fmt.Errorf("invalid nonce hex: %s", hex)
- }
- return nonce.Uint64(), nil
+ return hex.Uint64(), nil
}
// SuggestGasPrice implements ContractTransactor.SuggestGasPrice, delegating the
// gas price oracle request to the remote node.
func (b *rpcBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
- res, err := b.request(ctx, "eth_gasPrice", nil)
- if err != nil {
- return nil, err
- }
- var hex string
- if err := json.Unmarshal(res, &hex); err != nil {
+ var hex rpc.HexNumber
+ if err := b.client.CallContext(ctx, &hex, "eth_gasPrice"); err != nil {
return nil, err
}
- price, ok := new(big.Int).SetString(hex, 0)
- if !ok {
- return nil, fmt.Errorf("invalid price hex: %s", hex)
- }
- return price, nil
+ return (*big.Int)(&hex), nil
}
// EstimateGasLimit implements ContractTransactor.EstimateGasLimit, delegating
// the gas estimation to the remote node.
func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) {
- // Pack up the request into an RPC argument
args := struct {
From common.Address `json:"from"`
To *common.Address `json:"to"`
@@ -226,19 +117,12 @@ func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address
Value: rpc.NewHexNumber(value),
}
// Execute the RPC call and retrieve the response
- res, err := b.request(ctx, "eth_estimateGas", []interface{}{args})
+ var hex rpc.HexNumber
+ err := b.client.CallContext(ctx, &hex, "eth_estimateGas", args)
if err != nil {
return nil, err
}
- var hex string
- if err := json.Unmarshal(res, &hex); err != nil {
- return nil, err
- }
- estimate, ok := new(big.Int).SetString(hex, 0)
- if !ok {
- return nil, fmt.Errorf("invalid estimate hex: %s", hex)
- }
- return estimate, nil
+ return (*big.Int)(&hex), nil
}
// SendTransaction implements ContractTransactor.SendTransaction, delegating the
@@ -248,13 +132,5 @@ func (b *rpcBackend) SendTransaction(ctx context.Context, tx *types.Transaction)
if err != nil {
return err
}
- res, err := b.request(ctx, "eth_sendRawTransaction", []interface{}{common.ToHex(data)})
- if err != nil {
- return err
- }
- var hex string
- if err := json.Unmarshal(res, &hex); err != nil {
- return err
- }
- return nil
+ return b.client.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data))
}
diff --git a/cmd/geth/consolecmd.go b/cmd/geth/consolecmd.go
index 257050a62..8d53809ce 100644
--- a/cmd/geth/consolecmd.go
+++ b/cmd/geth/consolecmd.go
@@ -19,9 +19,12 @@ package main
import (
"os"
"os/signal"
+ "strings"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/console"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/rpc"
"gopkg.in/urfave/cli.v1"
)
@@ -99,7 +102,7 @@ func localConsole(ctx *cli.Context) error {
// console to it.
func remoteConsole(ctx *cli.Context) error {
// Attach to a remotely running geth instance and start the JavaScript console
- client, err := utils.NewRemoteRPCClient(ctx)
+ client, err := dialRPC(ctx.Args().First())
if err != nil {
utils.Fatalf("Unable to attach to remote geth: %v", err)
}
@@ -127,6 +130,20 @@ func remoteConsole(ctx *cli.Context) error {
return nil
}
+// dialRPC returns a RPC client which connects to the given endpoint.
+// The check for empty endpoint implements the defaulting logic
+// for "geth attach" and "geth monitor" with no argument.
+func dialRPC(endpoint string) (*rpc.Client, error) {
+ if endpoint == "" {
+ endpoint = node.DefaultIPCEndpoint()
+ } else if strings.HasPrefix(endpoint, "rpc:") || strings.HasPrefix(endpoint, "ipc:") {
+ // Backwards compatibility with geth < 1.5 which required
+ // these prefixes.
+ endpoint = endpoint[4:]
+ }
+ return rpc.Dial(endpoint)
+}
+
// ephemeralConsole starts a new geth node, attaches an ephemeral JavaScript
// console to it, and each of the files specified as arguments and tears the
// everything down.
diff --git a/cmd/geth/monitorcmd.go b/cmd/geth/monitorcmd.go
index 11fdca89c..d1490dce2 100644
--- a/cmd/geth/monitorcmd.go
+++ b/cmd/geth/monitorcmd.go
@@ -21,11 +21,10 @@ import (
"math"
"reflect"
"runtime"
+ "sort"
"strings"
"time"
- "sort"
-
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
@@ -36,7 +35,7 @@ import (
var (
monitorCommandAttachFlag = cli.StringFlag{
Name: "attach",
- Value: "ipc:" + node.DefaultIPCEndpoint(),
+ Value: node.DefaultIPCEndpoint(),
Usage: "API endpoint to attach to",
}
monitorCommandRowsFlag = cli.IntFlag{
@@ -69,12 +68,12 @@ to display multiple metrics simultaneously.
// monitor starts a terminal UI based monitoring tool for the requested metrics.
func monitor(ctx *cli.Context) error {
var (
- client rpc.Client
+ client *rpc.Client
err error
)
// Attach to an Ethereum node over IPC or RPC
endpoint := ctx.String(monitorCommandAttachFlag.Name)
- if client, err = utils.NewRemoteRPCClientFromString(endpoint); err != nil {
+ if client, err = dialRPC(endpoint); err != nil {
utils.Fatalf("Unable to attach to geth node: %v", err)
}
defer client.Close()
@@ -159,30 +158,10 @@ func monitor(ctx *cli.Context) error {
// retrieveMetrics contacts the attached geth node and retrieves the entire set
// of collected system metrics.
-func retrieveMetrics(client rpc.Client) (map[string]interface{}, error) {
- req := map[string]interface{}{
- "id": new(int64),
- "method": "debug_metrics",
- "jsonrpc": "2.0",
- "params": []interface{}{true},
- }
-
- if err := client.Send(req); err != nil {
- return nil, err
- }
-
- var res rpc.JSONSuccessResponse
- if err := client.Recv(&res); err != nil {
- return nil, err
- }
-
- if res.Result != nil {
- if mets, ok := res.Result.(map[string]interface{}); ok {
- return mets, nil
- }
- }
-
- return nil, fmt.Errorf("unable to retrieve metrics")
+func retrieveMetrics(client *rpc.Client) (map[string]interface{}, error) {
+ var metrics map[string]interface{}
+ err := client.Call(&metrics, "debug_metrics", true)
+ return metrics, err
}
// resolveMetrics takes a list of input metric patterns, and resolves each to one
@@ -270,7 +249,7 @@ func fetchMetric(metrics map[string]interface{}, metric string) float64 {
// refreshCharts retrieves a next batch of metrics, and inserts all the new
// values into the active datasets and charts
-func refreshCharts(client rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) {
+func refreshCharts(client *rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) {
values, err := retrieveMetrics(client)
for i, metric := range metrics {
if len(data) < 512 {
diff --git a/cmd/utils/client.go b/cmd/utils/client.go
deleted file mode 100644
index cc9647580..000000000
--- a/cmd/utils/client.go
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of go-ethereum.
-//
-// go-ethereum is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// go-ethereum is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
-
-package utils
-
-import (
- "fmt"
- "strings"
-
- "github.com/ethereum/go-ethereum/node"
- "github.com/ethereum/go-ethereum/rpc"
- "gopkg.in/urfave/cli.v1"
-)
-
-// NewRemoteRPCClient returns a RPC client which connects to a running geth instance.
-// Depending on the given context this can either be a IPC or a HTTP client.
-func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) {
- if ctx.Args().Present() {
- endpoint := ctx.Args().First()
- return NewRemoteRPCClientFromString(endpoint)
- }
- // use IPC by default
- return rpc.NewIPCClient(node.DefaultIPCEndpoint())
-}
-
-// NewRemoteRPCClientFromString returns a RPC client which connects to the given
-// endpoint. It must start with either `ipc:` or `rpc:` (HTTP).
-func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) {
- if strings.HasPrefix(endpoint, "ipc:") {
- return rpc.NewIPCClient(endpoint[4:])
- }
- if strings.HasPrefix(endpoint, "rpc:") {
- return rpc.NewHTTPClient(endpoint[4:])
- }
- if strings.HasPrefix(endpoint, "http://") {
- return rpc.NewHTTPClient(endpoint)
- }
- if strings.HasPrefix(endpoint, "ws:") {
- return rpc.NewWSClient(endpoint)
- }
- return nil, fmt.Errorf("invalid endpoint")
-}
diff --git a/console/bridge.go b/console/bridge.go
index b23e06837..06cb41d80 100644
--- a/console/bridge.go
+++ b/console/bridge.go
@@ -31,13 +31,13 @@ import (
// bridge is a collection of JavaScript utility methods to bride the .js runtime
// environment and the Go RPC connection backing the remote method calls.
type bridge struct {
- client rpc.Client // RPC client to execute Ethereum requests through
+ client *rpc.Client // RPC client to execute Ethereum requests through
prompter UserPrompter // Input prompter to allow interactive user feedback
printer io.Writer // Output writer to serialize any display strings to
}
// newBridge creates a new JavaScript wrapper around an RPC client.
-func newBridge(client rpc.Client, prompter UserPrompter, printer io.Writer) *bridge {
+func newBridge(client *rpc.Client, prompter UserPrompter, printer io.Writer) *bridge {
return &bridge{
client: client,
prompter: prompter,
@@ -188,88 +188,86 @@ func (b *bridge) SleepBlocks(call otto.FunctionCall) (response otto.Value) {
return otto.FalseValue()
}
-// Send will serialize the first argument, send it to the node and returns the response.
+type jsonrpcCall struct {
+ Id int64
+ Method string
+ Params []interface{}
+}
+
+// Send implements the web3 provider "send" method.
func (b *bridge) Send(call otto.FunctionCall) (response otto.Value) {
- // Ensure that we've got a batch request (array) or a single request (object)
- arg := call.Argument(0).Object()
- if arg == nil || (arg.Class() != "Array" && arg.Class() != "Object") {
- throwJSException("request must be an object or array")
- }
- // Convert the otto VM arguments to Go values
- data, err := call.Otto.Call("JSON.stringify", nil, arg)
+ // Remarshal the request into a Go value.
+ JSON, _ := call.Otto.Object("JSON")
+ reqVal, err := JSON.Call("stringify", call.Argument(0))
if err != nil {
throwJSException(err.Error())
}
- reqjson, err := data.ToString()
- if err != nil {
- throwJSException(err.Error())
- }
-
var (
- reqs []rpc.JSONRequest
- batch = true
+ rawReq = []byte(reqVal.String())
+ reqs []jsonrpcCall
+ batch bool
)
- if err = json.Unmarshal([]byte(reqjson), &reqs); err != nil {
- // single request?
- reqs = make([]rpc.JSONRequest, 1)
- if err = json.Unmarshal([]byte(reqjson), &reqs[0]); err != nil {
- throwJSException("invalid request")
- }
+ if rawReq[0] == '[' {
+ batch = true
+ json.Unmarshal(rawReq, &reqs)
+ } else {
batch = false
+ reqs = make([]jsonrpcCall, 1)
+ json.Unmarshal(rawReq, &reqs[0])
}
- // Iteratively execute the requests
- call.Otto.Set("response_len", len(reqs))
- call.Otto.Run("var ret_response = new Array(response_len);")
- for i, req := range reqs {
- // Execute the RPC request and parse the reply
- if err = b.client.Send(&req); err != nil {
- return newErrorResponse(call, -32603, err.Error(), req.Id)
- }
- result := make(map[string]interface{})
- if err = b.client.Recv(&result); err != nil {
- return newErrorResponse(call, -32603, err.Error(), req.Id)
+ // Execute the requests.
+ resps, _ := call.Otto.Object("new Array()")
+ for _, req := range reqs {
+ resp, _ := call.Otto.Object(`({"jsonrpc":"2.0"})`)
+ resp.Set("id", req.Id)
+ var result json.RawMessage
+ err = b.client.Call(&result, req.Method, req.Params...)
+ switch err := err.(type) {
+ case nil:
+ if result == nil {
+ // Special case null because it is decoded as an empty
+ // raw message for some reason.
+ resp.Set("result", otto.NullValue())
+ } else {
+ resultVal, err := JSON.Call("parse", string(result))
+ if err != nil {
+ resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
+ } else {
+ resp.Set("result", resultVal)
+ }
+ }
+ case rpc.Error:
+ resp.Set("error", map[string]interface{}{
+ "code": err.ErrorCode(),
+ "message": err.Error(),
+ })
+ default:
+ resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
}
- // Feed the reply back into the JavaScript runtime environment
- id, _ := result["id"]
- jsonver, _ := result["jsonrpc"]
-
- call.Otto.Set("ret_id", id)
- call.Otto.Set("ret_jsonrpc", jsonver)
- call.Otto.Set("response_idx", i)
+ resps.Call("push", resp)
+ }
- if res, ok := result["result"]; ok {
- payload, _ := json.Marshal(res)
- call.Otto.Set("ret_result", string(payload))
- response, err = call.Otto.Run(`
- ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, result: JSON.parse(ret_result) };
- `)
- continue
- }
- if res, ok := result["error"]; ok {
- payload, _ := json.Marshal(res)
- call.Otto.Set("ret_result", string(payload))
- response, err = call.Otto.Run(`
- ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, error: JSON.parse(ret_result) };
- `)
- continue
- }
- return newErrorResponse(call, -32603, fmt.Sprintf("Invalid response"), new(int64))
+ // Return the responses either to the callback (if supplied)
+ // or directly as the return value.
+ if batch {
+ response = resps.Value()
+ } else {
+ response, _ = resps.Get("0")
}
- // Convert single requests back from batch ones
- if !batch {
- call.Otto.Run("ret_response = ret_response[0];")
+ if fn := call.Argument(1).Object(); fn != nil && fn.Class() == "function" {
+ fn.Call("apply", response)
+ return otto.UndefinedValue()
}
- // Execute any registered callbacks
- if call.Argument(1).IsObject() {
- call.Otto.Set("callback", call.Argument(1))
- call.Otto.Run(`
- if (Object.prototype.toString.call(callback) == '[object Function]') {
- callback(null, ret_response);
- }
- `)
- }
- return
+ return response
+}
+
+func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) otto.Value {
+ // Bundle the error into a JSON RPC call response
+ m := map[string]interface{}{"version": "2.0", "id": id, "error": map[string]interface{}{"code": code, msg: msg}}
+ res, _ := json.Marshal(m)
+ val, _ := call.Otto.Run("(" + string(res) + ")")
+ return val
}
// throwJSException panics on an otto.Value. The Otto VM will recover from the
@@ -281,37 +279,3 @@ func throwJSException(msg interface{}) otto.Value {
}
panic(val)
}
-
-// newErrorResponse creates a JSON RPC error response for a specific request id,
-// containing the specified error code and error message. Beside returning the
-// error to the caller, it also sets the ret_error and ret_response JavaScript
-// variables.
-func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) (response otto.Value) {
- // Bundle the error into a JSON RPC call response
- res := rpc.JSONErrResponse{
- Version: rpc.JSONRPCVersion,
- Id: id,
- Error: rpc.JSONError{
- Code: code,
- Message: msg,
- },
- }
- // Serialize the error response into JavaScript variables
- errObj, err := json.Marshal(res.Error)
- if err != nil {
- glog.V(logger.Error).Infof("Failed to serialize JSON RPC error: %v", err)
- }
- resObj, err := json.Marshal(res)
- if err != nil {
- glog.V(logger.Error).Infof("Failed to serialize JSON RPC error response: %v", err)
- }
-
- if _, err = call.Otto.Run("ret_error = " + string(errObj)); err != nil {
- glog.V(logger.Error).Infof("Failed to set `ret_error` to the occurred error: %v", err)
- }
- resVal, err := call.Otto.Run("ret_response = " + string(resObj))
- if err != nil {
- glog.V(logger.Error).Infof("Failed to set `ret_response` to the JSON RPC response: %v", err)
- }
- return resVal
-}
diff --git a/console/console.go b/console/console.go
index 00d1fea1d..f224f0c2e 100644
--- a/console/console.go
+++ b/console/console.go
@@ -52,7 +52,7 @@ const DefaultPrompt = "> "
type Config struct {
DataDir string // Data directory to store the console history at
DocRoot string // Filesystem path from where to load JavaScript files from
- Client rpc.Client // RPC client to execute Ethereum requests through
+ Client *rpc.Client // RPC client to execute Ethereum requests through
Prompt string // Input prompt prefix string (defaults to DefaultPrompt)
Prompter UserPrompter // Input prompter to allow interactive user feedback (defaults to TerminalPrompter)
Printer io.Writer // Output writer to serialize any display strings to (defaults to os.Stdout)
@@ -63,7 +63,7 @@ type Config struct {
// JavaScript console attached to a running node via an external or in-process RPC
// client.
type Console struct {
- client rpc.Client // RPC client to execute Ethereum requests through
+ client *rpc.Client // RPC client to execute Ethereum requests through
jsre *jsre.JSRE // JavaScript runtime environment running the interpreter
prompt string // Input prompt prefix string
prompter UserPrompter // Input prompter to allow interactive user feedback
diff --git a/internal/jsre/pretty.go b/internal/jsre/pretty.go
index 30d8660ff..f32e16243 100644
--- a/internal/jsre/pretty.go
+++ b/internal/jsre/pretty.go
@@ -116,7 +116,7 @@ func (ctx ppctx) printValue(v otto.Value, level int, inArray bool) {
func (ctx ppctx) printObject(obj *otto.Object, level int, inArray bool) {
switch obj.Class() {
- case "Array":
+ case "Array", "GoArray":
lv, _ := obj.Get("length")
len, _ := lv.ToInteger()
if len == 0 {
diff --git a/node/node.go b/node/node.go
index 1f517a027..ac8a7e8f0 100644
--- a/node/node.go
+++ b/node/node.go
@@ -505,16 +505,14 @@ func (n *Node) Restart() error {
}
// Attach creates an RPC client attached to an in-process API handler.
-func (n *Node) Attach() (rpc.Client, error) {
+func (n *Node) Attach() (*rpc.Client, error) {
n.lock.RLock()
defer n.lock.RUnlock()
- // Short circuit if the node's not running
if n.server == nil {
return nil, ErrNodeStopped
}
- // Otherwise attach to the API and return
- return rpc.NewInProcRPCClient(n.inprocHandler), nil
+ return rpc.DialInProc(n.inprocHandler), nil
}
// Server retrieves the currently running P2P network layer. This method is meant
diff --git a/node/node_test.go b/node/node_test.go
index 372fc6b10..d9b26453b 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -507,21 +507,27 @@ func TestAPIGather(t *testing.T) {
}
// Register a batch of services with some configured APIs
calls := make(chan string, 1)
-
+ makeAPI := func(result string) *OneMethodApi {
+ return &OneMethodApi{fun: func() { calls <- result }}
+ }
services := map[string]struct {
APIs []rpc.API
Maker InstrumentingWrapper
}{
- "Zero APIs": {[]rpc.API{}, InstrumentedServiceMakerA},
- "Single API": {[]rpc.API{
- {"single", "1", &OneMethodApi{fun: func() { calls <- "single.v1" }}, true},
- }, InstrumentedServiceMakerB},
- "Many APIs": {[]rpc.API{
- {"multi", "1", &OneMethodApi{fun: func() { calls <- "multi.v1" }}, true},
- {"multi.v2", "2", &OneMethodApi{fun: func() { calls <- "multi.v2" }}, true},
- {"multi.v2.nested", "2", &OneMethodApi{fun: func() { calls <- "multi.v2.nested" }}, true},
- }, InstrumentedServiceMakerC},
+ "Zero APIs": {
+ []rpc.API{}, InstrumentedServiceMakerA},
+ "Single API": {
+ []rpc.API{
+ {Namespace: "single", Version: "1", Service: makeAPI("single.v1"), Public: true},
+ }, InstrumentedServiceMakerB},
+ "Many APIs": {
+ []rpc.API{
+ {Namespace: "multi", Version: "1", Service: makeAPI("multi.v1"), Public: true},
+ {Namespace: "multi.v2", Version: "2", Service: makeAPI("multi.v2"), Public: true},
+ {Namespace: "multi.v2.nested", Version: "2", Service: makeAPI("multi.v2.nested"), Public: true},
+ }, InstrumentedServiceMakerC},
}
+
for id, config := range services {
config := config
constructor := func(*ServiceContext) (Service, error) {
@@ -554,12 +560,8 @@ func TestAPIGather(t *testing.T) {
{"multi.v2.nested_theOneMethod", "multi.v2.nested"},
}
for i, test := range tests {
- if err := client.Send(rpc.JSONRequest{Id: []byte("1"), Version: "2.0", Method: test.Method}); err != nil {
- t.Fatalf("test %d: failed to send API request: %v", i, err)
- }
- reply := new(rpc.JSONSuccessResponse)
- if err := client.Recv(reply); err != nil {
- t.Fatalf("test %d: failed to read API reply: %v", i, err)
+ if err := client.Call(nil, test.Method); err != nil {
+ t.Errorf("test %d: API request failed: %v", i, err)
}
select {
case result := <-calls:
diff --git a/rpc/client.go b/rpc/client.go
new file mode 100644
index 000000000..4ff9a8cb9
--- /dev/null
+++ b/rpc/client.go
@@ -0,0 +1,740 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "net/url"
+ "reflect"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "golang.org/x/net/context"
+)
+
+var (
+ ErrClientQuit = errors.New("client is closed")
+ ErrNoResult = errors.New("no result in JSON-RPC response")
+)
+
+const (
+ clientSubscriptionBuffer = 100 // if exceeded, the client stops reading
+ tcpKeepAliveInterval = 30 * time.Second
+ defaultDialTimeout = 10 * time.Second // used when dialing if the context has no deadline
+ defaultWriteTimeout = 10 * time.Second // used for calls if the context has no deadline
+ subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
+)
+
+// BatchElem is an element in a batch request.
+type BatchElem struct {
+ Method string
+ Args []interface{}
+ // The result is unmarshaled into this field. Result must be set to a
+ // non-nil pointer value of the desired type, otherwise the response will be
+ // discarded.
+ Result interface{}
+ // Error is set if the server returns an error for this request, or if
+ // unmarshaling into Result fails. It is not set for I/O errors.
+ Error error
+}
+
+// A value of this type can a JSON-RPC request, notification, successful response or
+// error response. Which one it is depends on the fields.
+type jsonrpcMessage struct {
+ Version string `json:"jsonrpc"`
+ ID json.RawMessage `json:"id,omitempty"`
+ Method string `json:"method,omitempty"`
+ Params json.RawMessage `json:"params,omitempty"`
+ Error *jsonError `json:"error,omitempty"`
+ Result json.RawMessage `json:"result,omitempty"`
+}
+
+func (msg *jsonrpcMessage) isNotification() bool {
+ return msg.ID == nil && msg.Method != ""
+}
+
+func (msg *jsonrpcMessage) isResponse() bool {
+ return msg.hasValidID() && msg.Method == "" && len(msg.Params) == 0
+}
+
+func (msg *jsonrpcMessage) hasValidID() bool {
+ return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '['
+}
+
+func (msg *jsonrpcMessage) String() string {
+ b, _ := json.Marshal(msg)
+ return string(b)
+}
+
+// Client represents a connection to an RPC server.
+type Client struct {
+ idCounter uint32
+ connectFunc func(ctx context.Context) (net.Conn, error)
+ isHTTP bool
+
+ // writeConn is only safe to access outside dispatch, with the
+ // write lock held. The write lock is taken by sending on
+ // requestOp and released by sending on sendDone.
+ writeConn net.Conn
+
+ // for dispatch
+ close chan struct{}
+ didQuit chan struct{} // closed when client quits
+ reconnected chan net.Conn // where write/reconnect sends the new connection
+ readErr chan error // errors from read
+ readResp chan []*jsonrpcMessage // valid messages from read
+ requestOp chan *requestOp // for registering response IDs
+ sendDone chan error // signals write completion, releases write lock
+ respWait map[string]*requestOp // active requests
+ subs map[string]*ClientSubscription // active subscriptions
+}
+
+type requestOp struct {
+ ids []json.RawMessage
+ err error
+ resp chan *jsonrpcMessage // receives up to len(ids) responses
+ sub *ClientSubscription // only set for EthSubscribe requests
+}
+
+func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case resp := <-op.resp:
+ return resp, op.err
+ }
+}
+
+// Dial creates a new client for the given URL.
+//
+// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a
+// file name with no URL scheme, a local socket connection is established using UNIX
+// domain sockets on supported platforms and named pipes on Windows. If you want to
+// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
+//
+// For websocket connections, the origin is set to the local host name.
+//
+// The client reconnects automatically if the connection is lost.
+func Dial(rawurl string) (*Client, error) {
+ return DialContext(context.Background(), rawurl)
+}
+
+// DialContext creates a new RPC client, just like Dial.
+//
+// The context is used to cancel or time out the initial connection establishment. It does
+// not affect subsequent interactions with the client.
+func DialContext(ctx context.Context, rawurl string) (*Client, error) {
+ u, err := url.Parse(rawurl)
+ if err != nil {
+ return nil, err
+ }
+ switch u.Scheme {
+ case "http", "https":
+ return DialHTTP(rawurl)
+ case "ws", "wss":
+ return DialWebsocket(ctx, rawurl, "")
+ case "":
+ return DialIPC(ctx, rawurl)
+ default:
+ return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
+ }
+}
+
+func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) {
+ conn, err := connectFunc(initctx)
+ if err != nil {
+ return nil, err
+ }
+ _, isHTTP := conn.(*httpConn)
+
+ c := &Client{
+ writeConn: conn,
+ isHTTP: isHTTP,
+ connectFunc: connectFunc,
+ close: make(chan struct{}),
+ didQuit: make(chan struct{}),
+ reconnected: make(chan net.Conn),
+ readErr: make(chan error),
+ readResp: make(chan []*jsonrpcMessage),
+ requestOp: make(chan *requestOp),
+ sendDone: make(chan error, 1),
+ respWait: make(map[string]*requestOp),
+ subs: make(map[string]*ClientSubscription),
+ }
+ if !isHTTP {
+ go c.dispatch(conn)
+ }
+ return c, nil
+}
+
+func (c *Client) nextID() json.RawMessage {
+ id := atomic.AddUint32(&c.idCounter, 1)
+ return []byte(strconv.FormatUint(uint64(id), 10))
+}
+
+// SupportedModules calls the rpc_modules method, retrieving the list of
+// APIs that are available on the server.
+func (c *Client) SupportedModules() (map[string]string, error) {
+ var result map[string]string
+ ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
+ defer cancel()
+ err := c.CallContext(ctx, &result, "rpc_modules")
+ return result, err
+}
+
+// Close closes the client, aborting any in-flight requests.
+func (c *Client) Close() {
+ if c.isHTTP {
+ return
+ }
+ select {
+ case c.close <- struct{}{}:
+ <-c.didQuit
+ case <-c.didQuit:
+ }
+}
+
+// Call performs a JSON-RPC call with the given arguments and unmarshals into
+// result if no error occurred.
+//
+// The result must be a pointer so that package json can unmarshal into it. You
+// can also pass nil, in which case the result is ignored.
+func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
+ ctx := context.Background()
+ return c.CallContext(ctx, result, method, args...)
+}
+
+// CallContext performs a JSON-RPC call with the given arguments. If the context is
+// canceled before the call has successfully returned, CallContext returns immediately.
+//
+// The result must be a pointer so that package json can unmarshal into it. You
+// can also pass nil, in which case the result is ignored.
+func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
+ msg, err := c.newMessage(method, args...)
+ if err != nil {
+ return err
+ }
+ op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
+
+ if c.isHTTP {
+ err = c.sendHTTP(ctx, op, msg)
+ } else {
+ err = c.send(ctx, op, msg)
+ }
+ if err != nil {
+ return err
+ }
+
+ // dispatch has accepted the request and will close the channel it when it quits.
+ switch resp, err := op.wait(ctx); {
+ case err != nil:
+ return err
+ case resp.Error != nil:
+ return resp.Error
+ case len(resp.Result) == 0:
+ return ErrNoResult
+ default:
+ return json.Unmarshal(resp.Result, &result)
+ }
+}
+
+// BatchCall sends all given requests as a single batch and waits for the server
+// to return a response for all of them.
+//
+// In contrast to Call, BatchCall only returns I/O errors. Any error specific to
+// a request is reported through the Error field of the corresponding BatchElem.
+//
+// Note that batch calls may not be executed atomically on the server side.
+func (c *Client) BatchCall(b []BatchElem) error {
+ ctx := context.Background()
+ return c.BatchCallContext(ctx, b)
+}
+
+// BatchCall sends all given requests as a single batch and waits for the server
+// to return a response for all of them. The wait duration is bounded by the
+// context's deadline.
+//
+// In contrast to CallContext, BatchCallContext only returns I/O errors. Any
+// error specific to a request is reported through the Error field of the
+// corresponding BatchElem.
+//
+// Note that batch calls may not be executed atomically on the server side.
+func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
+ msgs := make([]*jsonrpcMessage, len(b))
+ op := &requestOp{
+ ids: make([]json.RawMessage, len(b)),
+ resp: make(chan *jsonrpcMessage, len(b)),
+ }
+ for i, elem := range b {
+ msg, err := c.newMessage(elem.Method, elem.Args...)
+ if err != nil {
+ return err
+ }
+ msgs[i] = msg
+ op.ids[i] = msg.ID
+ }
+
+ var err error
+ if c.isHTTP {
+ err = c.sendBatchHTTP(ctx, op, msgs)
+ } else {
+ err = c.send(ctx, op, msgs)
+ }
+
+ // Wait for all responses to come back.
+ for n := 0; n < len(b) && err == nil; n++ {
+ var resp *jsonrpcMessage
+ resp, err = op.wait(ctx)
+ if err != nil {
+ break
+ }
+ // Find the element corresponding to this response.
+ // The element is guaranteed to be present because dispatch
+ // only sends valid IDs to our channel.
+ var elem *BatchElem
+ for i := range msgs {
+ if bytes.Equal(msgs[i].ID, resp.ID) {
+ elem = &b[i]
+ break
+ }
+ }
+ if resp.Error != nil {
+ elem.Error = resp.Error
+ continue
+ }
+ if len(resp.Result) == 0 {
+ elem.Error = ErrNoResult
+ continue
+ }
+ elem.Error = json.Unmarshal(resp.Result, elem.Result)
+ }
+ return err
+}
+
+// EthSubscribe calls the "eth_subscribe" method with the given arguments,
+// registering a subscription. Server notifications for the subscription are
+// sent to the given channel. The element type of the channel must match the
+// expected type of content returned by the subscription.
+//
+// Callers should not use the same channel for multiple calls to EthSubscribe.
+// The channel is closed when the notification is unsubscribed or an error
+// occurs. The error can be retrieved via the Err method of the subscription.
+//
+// Slow subscribers will block the clients ingress path eventually.
+func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+ // Check type of channel first.
+ chanVal := reflect.ValueOf(channel)
+ if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
+ panic("first argument to EthSubscribe must be a writable channel")
+ }
+ if chanVal.IsNil() {
+ panic("channel given to EthSubscribe must not be nil")
+ }
+ if c.isHTTP {
+ return nil, ErrNotificationsUnsupported
+ }
+
+ msg, err := c.newMessage(subscribeMethod, args...)
+ if err != nil {
+ return nil, err
+ }
+ op := &requestOp{
+ ids: []json.RawMessage{msg.ID},
+ resp: make(chan *jsonrpcMessage),
+ sub: newClientSubscription(c, chanVal),
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
+ defer cancel()
+
+ // Send the subscription request.
+ // The arrival and validity of the response is signaled on sub.quit.
+ if err := c.send(ctx, op, msg); err != nil {
+ return nil, err
+ }
+ if _, err := op.wait(ctx); err != nil {
+ return nil, err
+ }
+ return op.sub, nil
+}
+
+func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
+ params, err := json.Marshal(paramsIn)
+ if err != nil {
+ return nil, err
+ }
+ return &jsonrpcMessage{Version: "2.0", ID: c.nextID(), Method: method, Params: params}, nil
+}
+
+// send registers op with the dispatch loop, then sends msg on the connection.
+// if sending fails, op is deregistered.
+func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
+ select {
+ case c.requestOp <- op:
+ if glog.V(logger.Detail) {
+ glog.Info("sending ", msg)
+ }
+ err := c.write(ctx, msg)
+ c.sendDone <- err
+ return err
+ case <-c.didQuit:
+ return ErrClientQuit
+ }
+}
+
+func (c *Client) write(ctx context.Context, msg interface{}) error {
+ deadline, ok := ctx.Deadline()
+ if !ok {
+ deadline = time.Now().Add(defaultWriteTimeout)
+ }
+ // The previous write failed. Try to establish a new connection.
+ if c.writeConn == nil {
+ if err := c.reconnect(ctx); err != nil {
+ return err
+ }
+ }
+ c.writeConn.SetWriteDeadline(deadline)
+ err := json.NewEncoder(c.writeConn).Encode(msg)
+ if err != nil {
+ c.writeConn = nil
+ }
+ return err
+}
+
+func (c *Client) reconnect(ctx context.Context) error {
+ newconn, err := c.connectFunc(ctx)
+ if err != nil {
+ glog.V(logger.Detail).Infof("reconnect failed: %v", err)
+ return err
+ }
+ select {
+ case c.reconnected <- newconn:
+ c.writeConn = newconn
+ return nil
+ case <-c.didQuit:
+ newconn.Close()
+ return ErrClientQuit
+ }
+}
+
+// dispatch is the main loop of the client.
+// It sends read messages to waiting calls to Call and BatchCall
+// and subscription notifications to registered subscriptions.
+func (c *Client) dispatch(conn net.Conn) {
+ // Spawn the initial read loop.
+ go c.read(conn)
+
+ var (
+ lastOp *requestOp // tracks last send operation
+ requestOpLock = c.requestOp // nil while the send lock is held
+ reading = true // if true, a read loop is running
+ )
+ defer close(c.didQuit)
+ defer func() {
+ c.closeRequestOps(ErrClientQuit)
+ conn.Close()
+ if reading {
+ // Empty read channels until read is dead.
+ for {
+ select {
+ case <-c.readResp:
+ case <-c.readErr:
+ return
+ }
+ }
+ }
+ }()
+
+ for {
+ select {
+ case <-c.close:
+ return
+
+ // Read path.
+ case batch := <-c.readResp:
+ for _, msg := range batch {
+ switch {
+ case msg.isNotification():
+ if glog.V(logger.Detail) {
+ glog.Info("<-readResp: notification ", msg)
+ }
+ c.handleNotification(msg)
+ case msg.isResponse():
+ if glog.V(logger.Detail) {
+ glog.Info("<-readResp: response ", msg)
+ }
+ c.handleResponse(msg)
+ default:
+ if glog.V(logger.Debug) {
+ glog.Error("<-readResp: dropping weird message", msg)
+ }
+ // TODO: maybe close
+ }
+ }
+
+ case err := <-c.readErr:
+ glog.V(logger.Debug).Infof("<-readErr: %v", err)
+ c.closeRequestOps(err)
+ conn.Close()
+ reading = false
+
+ case newconn := <-c.reconnected:
+ glog.V(logger.Debug).Infof("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr())
+ if reading {
+ // Wait for the previous read loop to exit. This is a rare case.
+ conn.Close()
+ <-c.readErr
+ }
+ go c.read(newconn)
+ reading = true
+ conn = newconn
+
+ // Send path.
+ case op := <-requestOpLock:
+ // Stop listening for further send ops until the current one is done.
+ requestOpLock = nil
+ lastOp = op
+ for _, id := range op.ids {
+ c.respWait[string(id)] = op
+ }
+
+ case err := <-c.sendDone:
+ if err != nil {
+ // Remove response handlers for the last send. We remove those here
+ // because the error is already handled in Call or BatchCall. When the
+ // read loop goes down, it will signal all other current operations.
+ for _, id := range lastOp.ids {
+ delete(c.respWait, string(id))
+ }
+ }
+ // Listen for send ops again.
+ requestOpLock = c.requestOp
+ lastOp = nil
+ }
+ }
+}
+
+// closeRequestOps unblocks pending send ops and active subscriptions.
+func (c *Client) closeRequestOps(err error) {
+ didClose := make(map[*requestOp]bool)
+
+ for id, op := range c.respWait {
+ // Remove the op so that later calls will not close op.resp again.
+ delete(c.respWait, id)
+
+ if !didClose[op] {
+ op.err = err
+ close(op.resp)
+ didClose[op] = true
+ }
+ }
+ for id, sub := range c.subs {
+ delete(c.subs, id)
+ sub.quitWithError(err, false)
+ }
+}
+
+func (c *Client) handleNotification(msg *jsonrpcMessage) {
+ if msg.Method != notificationMethod {
+ glog.V(logger.Debug).Info("dropping non-subscription message: ", msg)
+ return
+ }
+ var subResult struct {
+ ID string `json:"subscription"`
+ Result json.RawMessage `json:"result"`
+ }
+ if err := json.Unmarshal(msg.Params, &subResult); err != nil {
+ glog.V(logger.Debug).Info("dropping invalid subscription message: ", msg)
+ return
+ }
+ if c.subs[subResult.ID] != nil {
+ c.subs[subResult.ID].deliver(subResult.Result)
+ }
+}
+
+func (c *Client) handleResponse(msg *jsonrpcMessage) {
+ op := c.respWait[string(msg.ID)]
+ if op == nil {
+ glog.V(logger.Debug).Infof("unsolicited response %v", msg)
+ return
+ }
+ delete(c.respWait, string(msg.ID))
+ // For normal responses, just forward the reply to Call/BatchCall.
+ if op.sub == nil {
+ op.resp <- msg
+ return
+ }
+ // For subscription responses, start the subscription if the server
+ // indicates success. EthSubscribe gets unblocked in either case through
+ // the op.resp channel.
+ defer close(op.resp)
+ if msg.Error != nil {
+ op.err = msg.Error
+ return
+ }
+ if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
+ go op.sub.start()
+ c.subs[op.sub.subid] = op.sub
+ }
+}
+
+// Reading happens on a dedicated goroutine.
+
+func (c *Client) read(conn net.Conn) error {
+ var (
+ buf json.RawMessage
+ dec = json.NewDecoder(conn)
+ )
+ readMessage := func() (rs []*jsonrpcMessage, err error) {
+ buf = buf[:0]
+ if err = dec.Decode(&buf); err != nil {
+ return nil, err
+ }
+ if isBatch(buf) {
+ err = json.Unmarshal(buf, &rs)
+ } else {
+ rs = make([]*jsonrpcMessage, 1)
+ err = json.Unmarshal(buf, &rs[0])
+ }
+ return rs, err
+ }
+
+ for {
+ resp, err := readMessage()
+ if err != nil {
+ c.readErr <- err
+ return err
+ }
+ c.readResp <- resp
+ }
+}
+
+// Subscriptions.
+
+// A ClientSubscription represents a subscription established through EthSubscribe.
+type ClientSubscription struct {
+ client *Client
+ etype reflect.Type
+ channel reflect.Value
+ subid string
+ in chan json.RawMessage
+
+ quitOnce sync.Once // ensures quit is closed once
+ quit chan struct{} // quit is closed when the subscription exits
+ errOnce sync.Once // ensures err is closed once
+ err chan error
+}
+
+func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription {
+ sub := &ClientSubscription{
+ client: c,
+ etype: channel.Type().Elem(),
+ channel: channel,
+ quit: make(chan struct{}),
+ err: make(chan error, 1),
+ // in is buffered so dispatch can continue even if the subscriber is slow.
+ in: make(chan json.RawMessage, clientSubscriptionBuffer),
+ }
+ return sub
+}
+
+// Err returns the subscription error channel. The intended use of Err is to schedule
+// resubscription when the client connection is closed unexpectedly.
+//
+// The error channel receives a value when the subscription has ended due
+// to an error. The received error is ErrClientQuit if Close has been called
+// on the underlying client and no other error has occurred.
+//
+// The error channel is closed when Unsubscribe is called on the subscription.
+func (sub *ClientSubscription) Err() <-chan error {
+ return sub.err
+}
+
+// Unsubscribe unsubscribes the notification and closes the error channel.
+// It can safely be called more than once.
+func (sub *ClientSubscription) Unsubscribe() {
+ sub.quitWithError(nil, true)
+ sub.errOnce.Do(func() { close(sub.err) })
+}
+
+func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
+ sub.quitOnce.Do(func() {
+ if unsubscribeServer {
+ sub.requestUnsubscribe()
+ }
+ if err != nil {
+ sub.err <- err
+ }
+ close(sub.quit)
+ })
+}
+
+func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
+ select {
+ case sub.in <- result:
+ return true
+ case <-sub.quit:
+ return false
+ }
+}
+
+func (sub *ClientSubscription) start() {
+ sub.quitWithError(sub.forward())
+}
+
+func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
+ cases := []reflect.SelectCase{
+ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
+ {Dir: reflect.SelectSend, Chan: sub.channel},
+ }
+ for {
+ select {
+ case result := <-sub.in:
+ val, err := sub.unmarshal(result)
+ if err != nil {
+ return err, true
+ }
+ cases[1].Send = val
+ switch chosen, _, _ := reflect.Select(cases); chosen {
+ case 0: // <-sub.quit
+ return nil, false
+ case 1: // sub.channel<-
+ continue
+ }
+ case <-sub.quit:
+ return nil, false
+ }
+ }
+}
+
+func (sub *ClientSubscription) unmarshal(result json.RawMessage) (reflect.Value, error) {
+ val := reflect.New(sub.etype)
+ err := json.Unmarshal(result, val.Interface())
+ return val.Elem(), err
+}
+
+func (sub *ClientSubscription) requestUnsubscribe() error {
+ var result interface{}
+ return sub.client.Call(&result, unsubscribeMethod, sub.subid)
+}
diff --git a/rpc/client_context_go1.4.go b/rpc/client_context_go1.4.go
new file mode 100644
index 000000000..ac956a17d
--- /dev/null
+++ b/rpc/client_context_go1.4.go
@@ -0,0 +1,60 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build !go1.5
+
+package rpc
+
+import (
+ "net"
+ "net/http"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// In older versions of Go (below 1.5), dials cannot be canceled
+// via a channel or context. The context deadline can still applied.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+ dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Deadline = deadline
+ } else {
+ dialer.Deadline = time.Now().Add(defaultDialTimeout)
+ }
+ return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+ return contextDialer(ctx).Dial(network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+ // Set Timeout on the client if the context has a deadline.
+ // Note that there is no default timeout (unlike in contextDialer) because
+ // the timeout applies to the entire request, including reads from body.
+ if deadline, ok := ctx.Deadline(); ok {
+ c2 := *c
+ c2.Timeout = deadline.Sub(time.Now())
+ c = &c2
+ }
+ req2 := *req
+ return c, &req2
+}
diff --git a/rpc/client_context_go1.5.go b/rpc/client_context_go1.5.go
new file mode 100644
index 000000000..4a007d9f8
--- /dev/null
+++ b/rpc/client_context_go1.5.go
@@ -0,0 +1,61 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build go1.5,!go1.6
+
+package rpc
+
+import (
+ "net"
+ "net/http"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// In Go 1.5, dials cannot be canceled via a channel or context. The context deadline can
+// still be applied. Go 1.5 adds the ability to cancel HTTP requests via a channel.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+ dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Deadline = deadline
+ } else {
+ dialer.Deadline = time.Now().Add(defaultDialTimeout)
+ }
+ return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+ return contextDialer(ctx).Dial(network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+ // Set Timeout on the client if the context has a deadline.
+ // Note that there is no default timeout (unlike in contextDialer) because
+ // the timeout applies to the entire request, including reads from body.
+ if deadline, ok := ctx.Deadline(); ok {
+ c2 := *c
+ c2.Timeout = deadline.Sub(time.Now())
+ c = &c2
+ }
+ req2 := *req
+ req2.Cancel = ctx.Done()
+ return c, &req2
+}
diff --git a/rpc/client_context_go1.6.go b/rpc/client_context_go1.6.go
new file mode 100644
index 000000000..67777ddc6
--- /dev/null
+++ b/rpc/client_context_go1.6.go
@@ -0,0 +1,55 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build go1.6,!go1.7
+
+package rpc
+
+import (
+ "net"
+ "net/http"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// In Go 1.6, net.Dialer gained the ability to cancel via a channel.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+ dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Deadline = deadline
+ } else {
+ dialer.Deadline = time.Now().Add(defaultDialTimeout)
+ }
+ return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+ return contextDialer(ctx).Dial(network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+ // We set Timeout on the client for Go <= 1.5. There
+ // is no need to do that here because the dial will be canceled
+ // by package http.
+ req2 := *req
+ req2.Cancel = ctx.Done()
+ return c, &req2
+}
diff --git a/rpc/client_context_go1.7.go b/rpc/client_context_go1.7.go
new file mode 100644
index 000000000..56ce12ab8
--- /dev/null
+++ b/rpc/client_context_go1.7.go
@@ -0,0 +1,51 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build go1.7
+
+package rpc
+
+import (
+ "context"
+ "net"
+ "net/http"
+ "time"
+)
+
+// In Go 1.7, context moved into the standard library and support
+// for cancelation via context was added to net.Dialer and http.Request.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+ dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Deadline = deadline
+ } else {
+ dialer.Deadline = time.Now().Add(defaultDialTimeout)
+ }
+ return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+ d := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
+ return d.DialContext(ctx, network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+ return c, req.WithContext(ctx)
+}
diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go
new file mode 100644
index 000000000..84b4b67bb
--- /dev/null
+++ b/rpc/client_example_test.go
@@ -0,0 +1,83 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc_test
+
+import (
+ "fmt"
+ "math/big"
+ "time"
+
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// In this example, our client whishes to track the latest 'block number'
+// known to the server. The server supports two methods:
+//
+// eth_getBlockByNumber("latest", {})
+// returns the latest block object.
+//
+// eth_subscribe("newBlocks")
+// creates a subscription which fires block objects when new blocks arrive.
+
+type Block struct {
+ Number *big.Int
+}
+
+func ExampleClientSubscription() {
+ // Connect the client.
+ client, _ := rpc.Dial("ws://127.0.0.1:8485")
+ subch := make(chan Block)
+ go subscribeBlocks(client, subch)
+
+ // Print events from the subscription as they arrive.
+ for block := range subch {
+ fmt.Println("latest block:", block.Number)
+ }
+}
+
+// subscribeBlocks runs in its own goroutine and maintains
+// a subscription for new blocks.
+func subscribeBlocks(client *rpc.Client, subch chan Block) {
+ for i := 0; ; i++ {
+ if i > 0 {
+ time.Sleep(2 * time.Second)
+ }
+
+ // Subscribe to new blocks.
+ sub, err := client.EthSubscribe(subch, "newBlocks")
+ if err == rpc.ErrClientQuit {
+ return // Stop reconnecting if the client was closed.
+ } else if err != nil {
+ fmt.Println("subscribe error:", err)
+ continue
+ }
+
+ // The connection is established now.
+ // Update the channel with the current block.
+ var lastBlock Block
+ if err := client.Call(&lastBlock, "eth_getBlockByNumber", "latest"); err != nil {
+ fmt.Println("can't get latest block:", err)
+ continue
+ }
+ subch <- lastBlock
+
+ // The subscription will deliver events to the channel. Wait for the
+ // subscription to end for any reason, then loop around to re-establish
+ // the connection.
+ fmt.Println("connection lost: ", <-sub.Err())
+ }
+}
diff --git a/rpc/client_test.go b/rpc/client_test.go
new file mode 100644
index 000000000..58dceada0
--- /dev/null
+++ b/rpc/client_test.go
@@ -0,0 +1,489 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "fmt"
+ "math/rand"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "reflect"
+ "runtime"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/davecgh/go-spew/spew"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "golang.org/x/net/context"
+)
+
+func TestClientRequest(t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ var resp Result
+ if err := client.Call(&resp, "service_echo", "hello", 10, &Args{"world"}); err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) {
+ t.Errorf("incorrect result %#v", resp)
+ }
+}
+
+func TestClientBatchRequest(t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ batch := []BatchElem{
+ {
+ Method: "service_echo",
+ Args: []interface{}{"hello", 10, &Args{"world"}},
+ Result: new(Result),
+ },
+ {
+ Method: "service_echo",
+ Args: []interface{}{"hello2", 11, &Args{"world"}},
+ Result: new(Result),
+ },
+ {
+ Method: "no_such_method",
+ Args: []interface{}{1, 2, 3},
+ Result: new(int),
+ },
+ }
+ if err := client.BatchCall(batch); err != nil {
+ t.Fatal(err)
+ }
+ wantResult := []BatchElem{
+ {
+ Method: "service_echo",
+ Args: []interface{}{"hello", 10, &Args{"world"}},
+ Result: &Result{"hello", 10, &Args{"world"}},
+ },
+ {
+ Method: "service_echo",
+ Args: []interface{}{"hello2", 11, &Args{"world"}},
+ Result: &Result{"hello2", 11, &Args{"world"}},
+ },
+ {
+ Method: "no_such_method",
+ Args: []interface{}{1, 2, 3},
+ Result: new(int),
+ Error: &jsonError{Code: -32601, Message: "The method no_such_method_ does not exist/is not available"},
+ },
+ }
+ if !reflect.DeepEqual(batch, wantResult) {
+ t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
+ }
+}
+
+// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
+func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
+func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
+func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
+
+// This test checks that requests made through CallContext can be canceled by canceling
+// the context.
+func testClientCancel(transport string, t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+
+ // What we want to achieve is that the context gets canceled
+ // at various stages of request processing. The interesting cases
+ // are:
+ // - cancel during dial
+ // - cancel while performing a HTTP request
+ // - cancel while waiting for a response
+ //
+ // To trigger those, the times are chosen such that connections
+ // are killed within the deadline for every other call (maxKillTimeout
+ // is 2x maxCancelTimeout).
+ //
+ // Once a connection is dead, there is a fair chance it won't connect
+ // successfully because the accept is delayed by 1s.
+ maxContextCancelTimeout := 300 * time.Millisecond
+ fl := &flakeyListener{
+ maxAcceptDelay: 1 * time.Second,
+ maxKillTimeout: 600 * time.Millisecond,
+ }
+
+ var client *Client
+ switch transport {
+ case "ws", "http":
+ c, hs := httpTestClient(server, transport, fl)
+ defer hs.Close()
+ client = c
+ case "ipc":
+ c, l := ipcTestClient(server, fl)
+ defer l.Close()
+ client = c
+ default:
+ panic("unknown transport: " + transport)
+ }
+
+ // These tests take a lot of time, run them all at once.
+ // You probably want to run with -parallel 1 or comment out
+ // the call to t.Parallel if you enable the logging.
+ t.Parallel()
+ // glog.SetV(6)
+ // glog.SetToStderr(true)
+ // defer glog.SetToStderr(false)
+ // glog.Infoln("testing ", transport)
+
+ // The actual test starts here.
+ var (
+ wg sync.WaitGroup
+ nreqs = 10
+ ncallers = 6
+ )
+ caller := func(index int) {
+ defer wg.Done()
+ for i := 0; i < nreqs; i++ {
+ var (
+ ctx context.Context
+ cancel func()
+ timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
+ )
+ if index < ncallers/2 {
+ // For half of the callers, create a context without deadline
+ // and cancel it later.
+ ctx, cancel = context.WithCancel(context.Background())
+ time.AfterFunc(timeout, cancel)
+ } else {
+ // For the other half, create a context with a deadline instead. This is
+ // different because the context deadline is used to set the socket write
+ // deadline.
+ ctx, cancel = context.WithTimeout(context.Background(), timeout)
+ }
+ // Now perform a call with the context.
+ // The key thing here is that no call will ever complete successfully.
+ err := client.CallContext(ctx, nil, "service_sleep", 2*maxContextCancelTimeout)
+ if err != nil {
+ glog.V(logger.Debug).Infoln("got expected error:", err)
+ } else {
+ t.Errorf("no error for call with %v wait time", timeout)
+ }
+ cancel()
+ }
+ }
+ wg.Add(ncallers)
+ for i := 0; i < ncallers; i++ {
+ go caller(i)
+ }
+ wg.Wait()
+}
+
+func TestClientSubscribeInvalidArg(t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ check := func(shouldPanic bool, arg interface{}) {
+ defer func() {
+ err := recover()
+ if shouldPanic && err == nil {
+ t.Errorf("EthSubscribe should've panicked for %#v", arg)
+ }
+ if !shouldPanic && err != nil {
+ t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg)
+ buf := make([]byte, 1024*1024)
+ buf = buf[:runtime.Stack(buf, false)]
+ t.Error(err)
+ t.Error(string(buf))
+ }
+ }()
+ client.EthSubscribe(arg, "foo_bar")
+ }
+ check(true, nil)
+ check(true, 1)
+ check(true, (chan int)(nil))
+ check(true, make(<-chan int))
+ check(false, make(chan int))
+ check(false, make(chan<- int))
+}
+
+func TestClientSubscribe(t *testing.T) {
+ server := newTestServer("eth", new(NotificationTestService))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ nc := make(chan int)
+ count := 10
+ sub, err := client.EthSubscribe(nc, "someSubscription", count, 0)
+ if err != nil {
+ t.Fatal("can't subscribe:", err)
+ }
+ for i := 0; i < count; i++ {
+ if val := <-nc; val != i {
+ t.Fatalf("value mismatch: got %d, want %d", val, i)
+ }
+ }
+
+ sub.Unsubscribe()
+ select {
+ case v := <-nc:
+ t.Fatal("received value after unsubscribe:", v)
+ case err := <-sub.Err():
+ if err != nil {
+ t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
+ }
+ case <-time.After(1 * time.Second):
+ t.Fatalf("subscription not closed within 1s after unsubscribe")
+ }
+}
+
+// In this test, the connection drops while EthSubscribe is
+// waiting for a response.
+func TestClientSubscribeClose(t *testing.T) {
+ service := &NotificationTestService{
+ gotHangSubscriptionReq: make(chan struct{}),
+ unblockHangSubscription: make(chan struct{}),
+ }
+ server := newTestServer("eth", service)
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ var (
+ nc = make(chan int)
+ errc = make(chan error)
+ sub *ClientSubscription
+ err error
+ )
+ go func() {
+ sub, err = client.EthSubscribe(nc, "hangSubscription", 999)
+ errc <- err
+ }()
+
+ <-service.gotHangSubscriptionReq
+ client.Close()
+ service.unblockHangSubscription <- struct{}{}
+
+ select {
+ case err := <-errc:
+ if err == nil {
+ t.Errorf("EthSubscribe returned nil error after Close")
+ }
+ if sub != nil {
+ t.Error("EthSubscribe returned non-nil subscription after Close")
+ }
+ case <-time.After(1 * time.Second):
+ t.Fatalf("EthSubscribe did not return within 1s after Close")
+ }
+}
+
+func TestClientHTTP(t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+
+ client, hs := httpTestClient(server, "http", nil)
+ defer hs.Close()
+ defer client.Close()
+
+ // Launch concurrent requests.
+ var (
+ results = make([]Result, 100)
+ errc = make(chan error)
+ wantResult = Result{"a", 1, new(Args)}
+ )
+ defer client.Close()
+ for i := range results {
+ i := i
+ go func() {
+ errc <- client.Call(&results[i], "service_echo",
+ wantResult.String, wantResult.Int, wantResult.Args)
+ }()
+ }
+
+ // Wait for all of them to complete.
+ timeout := time.NewTimer(5 * time.Second)
+ defer timeout.Stop()
+ for i := range results {
+ select {
+ case err := <-errc:
+ if err != nil {
+ t.Fatal(err)
+ }
+ case <-timeout.C:
+ t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
+ }
+ }
+
+ // Check results.
+ for i := range results {
+ if !reflect.DeepEqual(results[i], wantResult) {
+ t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
+ }
+ }
+}
+
+func TestClientReconnect(t *testing.T) {
+ startServer := func(addr string) (*Server, net.Listener) {
+ srv := newTestServer("service", new(Service))
+ l, err := net.Listen("tcp", addr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ go http.Serve(l, srv.WebsocketHandler("*"))
+ return srv, l
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // Start a server and corresponding client.
+ s1, l1 := startServer("127.0.0.1:0")
+ client, err := DialContext(ctx, "ws://"+l1.Addr().String())
+ if err != nil {
+ t.Fatal("can't dial", err)
+ }
+
+ // Perform a call. This should work because the server is up.
+ var resp Result
+ if err := client.CallContext(ctx, &resp, "service_echo", "", 1, nil); err != nil {
+ t.Fatal(err)
+ }
+
+ // Shut down the server and try calling again. It shouldn't work.
+ l1.Close()
+ s1.Stop()
+ if err := client.CallContext(ctx, &resp, "service_echo", "", 2, nil); err == nil {
+ t.Error("successful call while the server is down")
+ t.Logf("resp: %#v", resp)
+ }
+
+ // Allow for some cool down time so we can listen on the same address again.
+ time.Sleep(2 * time.Second)
+
+ // Start it up again and call again. The connection should be reestablished.
+ // We spawn multiple calls here to check whether this hangs somehow.
+ s2, l2 := startServer(l1.Addr().String())
+ defer l2.Close()
+ defer s2.Stop()
+
+ start := make(chan struct{})
+ errors := make(chan error, 20)
+ for i := 0; i < cap(errors); i++ {
+ go func() {
+ <-start
+ var resp Result
+ errors <- client.CallContext(ctx, &resp, "service_echo", "", 3, nil)
+ }()
+ }
+ close(start)
+ errcount := 0
+ for i := 0; i < cap(errors); i++ {
+ if err = <-errors; err != nil {
+ errcount++
+ }
+ }
+ t.Log("err:", err)
+ if errcount > 1 {
+ t.Errorf("expected one error after disconnect, got %d", errcount)
+ }
+}
+
+func newTestServer(serviceName string, service interface{}) *Server {
+ server := NewServer()
+ if err := server.RegisterName(serviceName, service); err != nil {
+ panic(err)
+ }
+ return server
+}
+
+func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
+ // Create the HTTP server.
+ var hs *httptest.Server
+ switch transport {
+ case "ws":
+ hs = httptest.NewUnstartedServer(srv.WebsocketHandler("*"))
+ case "http":
+ hs = httptest.NewUnstartedServer(srv)
+ default:
+ panic("unknown HTTP transport: " + transport)
+ }
+ // Wrap the listener if required.
+ if fl != nil {
+ fl.Listener = hs.Listener
+ hs.Listener = fl
+ }
+ // Connect the client.
+ hs.Start()
+ client, err := Dial(transport + "://" + hs.Listener.Addr().String())
+ if err != nil {
+ panic(err)
+ }
+ return client, hs
+}
+
+func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
+ // Listen on a random endpoint.
+ endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
+ if runtime.GOOS == "windows" {
+ endpoint = `\\.\pipe\` + endpoint
+ } else {
+ endpoint = os.TempDir() + "/" + endpoint
+ }
+ l, err := ipcListen(endpoint)
+ if err != nil {
+ panic(err)
+ }
+ // Connect the listener to the server.
+ if fl != nil {
+ fl.Listener = l
+ l = fl
+ }
+ go srv.ServeListener(l)
+ // Connect the client.
+ client, err := Dial(endpoint)
+ if err != nil {
+ panic(err)
+ }
+ return client, l
+}
+
+// flakeyListener kills accepted connections after a random timeout.
+type flakeyListener struct {
+ net.Listener
+ maxKillTimeout time.Duration
+ maxAcceptDelay time.Duration
+}
+
+func (l *flakeyListener) Accept() (net.Conn, error) {
+ delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
+ time.Sleep(delay)
+
+ c, err := l.Listener.Accept()
+ if err == nil {
+ timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
+ time.AfterFunc(timeout, func() {
+ glog.V(logger.Debug).Infof("killing conn %v after %v", c.LocalAddr(), timeout)
+ c.Close()
+ })
+ }
+ return c, err
+}
diff --git a/rpc/errors.go b/rpc/errors.go
index bc352fc45..9cf9dc60c 100644
--- a/rpc/errors.go
+++ b/rpc/errors.go
@@ -24,74 +24,43 @@ type methodNotFoundError struct {
method string
}
-func (e *methodNotFoundError) Code() int {
- return -32601
-}
+func (e *methodNotFoundError) ErrorCode() int { return -32601 }
func (e *methodNotFoundError) Error() string {
return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method)
}
// received message isn't a valid request
-type invalidRequestError struct {
- message string
-}
+type invalidRequestError struct{ message string }
-func (e *invalidRequestError) Code() int {
- return -32600
-}
+func (e *invalidRequestError) ErrorCode() int { return -32600 }
-func (e *invalidRequestError) Error() string {
- return e.message
-}
+func (e *invalidRequestError) Error() string { return e.message }
// received message is invalid
-type invalidMessageError struct {
- message string
-}
+type invalidMessageError struct{ message string }
-func (e *invalidMessageError) Code() int {
- return -32700
-}
+func (e *invalidMessageError) ErrorCode() int { return -32700 }
-func (e *invalidMessageError) Error() string {
- return e.message
-}
+func (e *invalidMessageError) Error() string { return e.message }
// unable to decode supplied params, or an invalid number of parameters
-type invalidParamsError struct {
- message string
-}
+type invalidParamsError struct{ message string }
-func (e *invalidParamsError) Code() int {
- return -32602
-}
+func (e *invalidParamsError) ErrorCode() int { return -32602 }
-func (e *invalidParamsError) Error() string {
- return e.message
-}
+func (e *invalidParamsError) Error() string { return e.message }
// logic error, callback returned an error
-type callbackError struct {
- message string
-}
+type callbackError struct{ message string }
-func (e *callbackError) Code() int {
- return -32000
-}
+func (e *callbackError) ErrorCode() int { return -32000 }
-func (e *callbackError) Error() string {
- return e.message
-}
+func (e *callbackError) Error() string { return e.message }
// issued when a request is received after the server is issued to stop.
-type shutdownError struct {
-}
+type shutdownError struct{}
-func (e *shutdownError) Code() int {
- return -32000
-}
+func (e *shutdownError) ErrorCode() int { return -32000 }
-func (e *shutdownError) Error() string {
- return "server is shutting down"
-}
+func (e *shutdownError) Error() string { return "server is shutting down" }
diff --git a/rpc/http.go b/rpc/http.go
index 9283ce0ec..afcdd4bd6 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -22,71 +22,108 @@ import (
"fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
- "net/url"
"strings"
+ "sync"
+ "time"
"github.com/rs/cors"
+ "golang.org/x/net/context"
)
const (
maxHTTPRequestContentLength = 1024 * 128
)
-// httpClient connects to a geth RPC server over HTTP.
-type httpClient struct {
- endpoint *url.URL // HTTP-RPC server endpoint
- httpClient http.Client // reuse connection
- lastRes []byte // HTTP requests are synchronous, store last response
+var nullAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:0")
+
+type httpConn struct {
+ client *http.Client
+ req *http.Request
+ closeOnce sync.Once
+ closed chan struct{}
+}
+
+// httpConn is treated specially by Client.
+func (hc *httpConn) LocalAddr() net.Addr { return nullAddr }
+func (hc *httpConn) RemoteAddr() net.Addr { return nullAddr }
+func (hc *httpConn) SetReadDeadline(time.Time) error { return nil }
+func (hc *httpConn) SetWriteDeadline(time.Time) error { return nil }
+func (hc *httpConn) SetDeadline(time.Time) error { return nil }
+func (hc *httpConn) Write([]byte) (int, error) { panic("Write called") }
+
+func (hc *httpConn) Read(b []byte) (int, error) {
+ <-hc.closed
+ return 0, io.EOF
+}
+
+func (hc *httpConn) Close() error {
+ hc.closeOnce.Do(func() { close(hc.closed) })
+ return nil
}
-// NewHTTPClient create a new RPC clients that connection to a geth RPC server
-// over HTTP.
-func NewHTTPClient(endpoint string) (Client, error) {
- url, err := url.Parse(endpoint)
+// DialHTTP creates a new RPC clients that connection to an RPC server over HTTP.
+func DialHTTP(endpoint string) (*Client, error) {
+ req, err := http.NewRequest("POST", endpoint, nil)
if err != nil {
return nil, err
}
- return &httpClient{endpoint: url}, nil
-}
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
-// Send will serialize the given msg to JSON and sends it to the RPC server.
-// Since HTTP is synchronous the response is stored until Recv is called.
-func (client *httpClient) Send(msg interface{}) error {
- var body []byte
- var err error
+ initctx := context.Background()
+ return newClient(initctx, func(context.Context) (net.Conn, error) {
+ return &httpConn{client: new(http.Client), req: req, closed: make(chan struct{})}, nil
+ })
+}
- client.lastRes = nil
- if body, err = json.Marshal(msg); err != nil {
+func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
+ hc := c.writeConn.(*httpConn)
+ respBody, err := hc.doRequest(ctx, msg)
+ if err != nil {
return err
}
+ defer respBody.Close()
+ var respmsg jsonrpcMessage
+ if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
+ return err
+ }
+ op.resp <- &respmsg
+ return nil
+}
- resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body))
+func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
+ hc := c.writeConn.(*httpConn)
+ respBody, err := hc.doRequest(ctx, msgs)
if err != nil {
return err
}
-
- defer resp.Body.Close()
- if resp.StatusCode == http.StatusOK {
- client.lastRes, err = ioutil.ReadAll(resp.Body)
+ defer respBody.Close()
+ var respmsgs []jsonrpcMessage
+ if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
return err
}
-
- return fmt.Errorf("request failed: %s", resp.Status)
-}
-
-// Recv will try to deserialize the last received response into the given msg.
-func (client *httpClient) Recv(msg interface{}) error {
- return json.Unmarshal(client.lastRes, &msg)
+ for _, respmsg := range respmsgs {
+ op.resp <- &respmsg
+ }
+ return nil
}
-// Close is not necessary for httpClient
-func (client *httpClient) Close() {
-}
+func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
+ body, err := json.Marshal(msg)
+ if err != nil {
+ return nil, err
+ }
+ client, req := requestWithContext(hc.client, hc.req, ctx)
+ req.Body = ioutil.NopCloser(bytes.NewReader(body))
+ req.ContentLength = int64(len(body))
-// SupportedModules will return the collection of offered RPC modules.
-func (client *httpClient) SupportedModules() (map[string]string, error) {
- return SupportedModules(client)
+ resp, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ return resp.Body, nil
}
// httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method.
@@ -100,43 +137,39 @@ func (t *httpReadWriteNopCloser) Close() error {
return nil
}
-// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests,
-// send the request to the given API provider and sends the response back to the caller.
-func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- if r.ContentLength > maxHTTPRequestContentLength {
- http.Error(w,
- fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
- http.StatusRequestEntityTooLarge)
- return
- }
-
- w.Header().Set("content-type", "application/json")
-
- // create a codec that reads direct from the request body until
- // EOF and writes the response to w and order the server to process
- // a single request.
- codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
- defer codec.Close()
- srv.ServeSingleRequest(codec, OptionMethodInvocation)
+// NewHTTPServer creates a new HTTP RPC server around an API provider.
+//
+// Deprecated: Server implements http.Handler
+func NewHTTPServer(corsString string, srv *Server) *http.Server {
+ return &http.Server{Handler: newCorsHandler(srv, corsString)}
+}
+
+// ServeHTTP serves JSON-RPC requests over HTTP.
+func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.ContentLength > maxHTTPRequestContentLength {
+ http.Error(w,
+ fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
+ http.StatusRequestEntityTooLarge)
+ return
}
+ w.Header().Set("content-type", "application/json")
+
+ // create a codec that reads direct from the request body until
+ // EOF and writes the response to w and order the server to process
+ // a single request.
+ codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
+ defer codec.Close()
+ srv.ServeSingleRequest(codec, OptionMethodInvocation)
}
-// NewHTTPServer creates a new HTTP RPC server around an API provider.
-func NewHTTPServer(corsString string, srv *Server) *http.Server {
+func newCorsHandler(srv *Server, corsString string) http.Handler {
var allowedOrigins []string
for _, domain := range strings.Split(corsString, ",") {
allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain))
}
-
c := cors.New(cors.Options{
AllowedOrigins: allowedOrigins,
AllowedMethods: []string{"POST", "GET"},
})
-
- handler := c.Handler(newJSONHTTPHandler(srv))
-
- return &http.Server{
- Handler: handler,
- }
+ return c.Handler(srv)
}
diff --git a/rpc/inproc.go b/rpc/inproc.go
index 250f5c787..f72b97497 100644
--- a/rpc/inproc.go
+++ b/rpc/inproc.go
@@ -17,45 +17,18 @@
package rpc
import (
- "encoding/json"
- "io"
"net"
-)
-
-// inProcClient is an in-process buffer stream attached to an RPC server.
-type inProcClient struct {
- server *Server
- cl io.Closer
- enc *json.Encoder
- dec *json.Decoder
-}
-// Close tears down the request channel of the in-proc client.
-func (c *inProcClient) Close() {
- c.cl.Close()
-}
-
-// NewInProcRPCClient creates an in-process buffer stream attachment to a given
-// RPC server.
-func NewInProcRPCClient(handler *Server) Client {
- p1, p2 := net.Pipe()
- go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
- return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
-}
-
-// Send marshals a message into a json format and injects in into the client
-// request channel.
-func (c *inProcClient) Send(msg interface{}) error {
- return c.enc.Encode(msg)
-}
-
-// Recv reads a message from the response channel and tries to parse it into the
-// given msg interface.
-func (c *inProcClient) Recv(msg interface{}) error {
- return c.dec.Decode(msg)
-}
+ "golang.org/x/net/context"
+)
-// Returns the collection of modules the RPC server offers.
-func (c *inProcClient) SupportedModules() (map[string]string, error) {
- return SupportedModules(c)
+// NewInProcClient attaches an in-process connection to the given RPC server.
+func DialInProc(handler *Server) *Client {
+ initctx := context.Background()
+ c, _ := newClient(initctx, func(context.Context) (net.Conn, error) {
+ p1, p2 := net.Pipe()
+ go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
+ return p2, nil
+ })
+ return c
}
diff --git a/rpc/ipc.go b/rpc/ipc.go
index 05d8909ca..c2b9e3871 100644
--- a/rpc/ipc.go
+++ b/rpc/ipc.go
@@ -17,68 +17,39 @@
package rpc
import (
- "encoding/json"
"net"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "golang.org/x/net/context"
)
-// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on Windows this is a named pipe
+// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on
+// Windows this is a named pipe
func CreateIPCListener(endpoint string) (net.Listener, error) {
return ipcListen(endpoint)
}
-// ipcClient represent an IPC RPC client. It will connect to a given endpoint and tries to communicate with a node using
-// JSON serialization.
-type ipcClient struct {
- endpoint string
- conn net.Conn
- out *json.Encoder
- in *json.Decoder
-}
-
-// NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded.
-// On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a
-// named pipe.
-func NewIPCClient(endpoint string) (Client, error) {
- conn, err := newIPCConnection(endpoint)
- if err != nil {
- return nil, err
- }
- return &ipcClient{endpoint: endpoint, conn: conn, in: json.NewDecoder(conn), out: json.NewEncoder(conn)}, nil
-}
-
-// Send will serialize the given message and send it to the server.
-// When sending the message fails it will try to reconnect once and send the message again.
-func (client *ipcClient) Send(msg interface{}) error {
- if err := client.out.Encode(msg); err == nil {
- return nil
- }
-
- // retry once
- client.conn.Close()
-
- conn, err := newIPCConnection(client.endpoint)
- if err != nil {
- return err
+// ServeListener accepts connections on l, serving JSON-RPC on them.
+func (srv *Server) ServeListener(l net.Listener) error {
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ return err
+ }
+ glog.V(logger.Detail).Infoln("accepted conn", conn.RemoteAddr())
+ go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
}
-
- client.conn = conn
- client.in = json.NewDecoder(conn)
- client.out = json.NewEncoder(conn)
-
- return client.out.Encode(msg)
-}
-
-// Recv will read a message from the connection and tries to parse it. It assumes the received message is JSON encoded.
-func (client *ipcClient) Recv(msg interface{}) error {
- return client.in.Decode(&msg)
-}
-
-// Close will close the underlying IPC connection
-func (client *ipcClient) Close() {
- client.conn.Close()
}
-// SupportedModules will return the collection of offered RPC modules.
-func (client *ipcClient) SupportedModules() (map[string]string, error) {
- return SupportedModules(client)
+// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes
+// the endpoint is the full path to a unix socket, and Windows the endpoint is an
+// identifier for a named pipe.
+//
+// The context is used for the initial connection establishment. It does not
+// affect subsequent interactions with the client.
+func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
+ return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
+ return newIPCConnection(ctx, endpoint)
+ })
}
diff --git a/rpc/ipc_unix.go b/rpc/ipc_unix.go
index 9ece01240..a25b21627 100644
--- a/rpc/ipc_unix.go
+++ b/rpc/ipc_unix.go
@@ -22,6 +22,8 @@ import (
"net"
"os"
"path/filepath"
+
+ "golang.org/x/net/context"
)
// ipcListen will create a Unix socket on the given endpoint.
@@ -40,6 +42,6 @@ func ipcListen(endpoint string) (net.Listener, error) {
}
// newIPCConnection will connect to a Unix socket on the given endpoint.
-func newIPCConnection(endpoint string) (net.Conn, error) {
- return net.DialUnix("unix", nil, &net.UnixAddr{Name: endpoint, Net: "unix"})
+func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
+ return dialContext(ctx, "unix", endpoint)
}
diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go
index 8342d04d5..68234d215 100644
--- a/rpc/ipc_windows.go
+++ b/rpc/ipc_windows.go
@@ -22,16 +22,27 @@ import (
"net"
"time"
+ "golang.org/x/net/context"
"gopkg.in/natefinch/npipe.v2"
)
+// This is used if the dialing context has no deadline. It is much smaller than the
+// defaultDialTimeout because named pipes are local and there is no need to wait so long.
+const defaultPipeDialTimeout = 2 * time.Second
+
// ipcListen will create a named pipe on the given endpoint.
func ipcListen(endpoint string) (net.Listener, error) {
return npipe.Listen(endpoint)
}
// newIPCConnection will connect to a named pipe with the given endpoint as name.
-func newIPCConnection(endpoint string) (net.Conn, error) {
- timeout := 5 * time.Second
+func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
+ timeout := defaultPipeDialTimeout
+ if deadline, ok := ctx.Deadline(); ok {
+ timeout = deadline.Sub(time.Now())
+ if timeout < 0 {
+ timeout = 0
+ }
+ }
return npipe.DialTimeout(endpoint, timeout)
}
diff --git a/rpc/json.go b/rpc/json.go
index 151ed546e..a7053e3f5 100644
--- a/rpc/json.go
+++ b/rpc/json.go
@@ -30,49 +30,43 @@ import (
)
const (
- JSONRPCVersion = "2.0"
+ jsonrpcVersion = "2.0"
serviceMethodSeparator = "_"
subscribeMethod = "eth_subscribe"
unsubscribeMethod = "eth_unsubscribe"
notificationMethod = "eth_subscription"
)
-// JSON-RPC request
-type JSONRequest struct {
+type jsonRequest struct {
Method string `json:"method"`
Version string `json:"jsonrpc"`
Id json.RawMessage `json:"id,omitempty"`
Payload json.RawMessage `json:"params,omitempty"`
}
-// JSON-RPC response
-type JSONSuccessResponse struct {
+type jsonSuccessResponse struct {
Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"`
Result interface{} `json:"result"`
}
-// JSON-RPC error object
-type JSONError struct {
+type jsonError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
-// JSON-RPC error response
-type JSONErrResponse struct {
+type jsonErrResponse struct {
Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"`
- Error JSONError `json:"error"`
+ Error jsonError `json:"error"`
}
-// JSON-RPC notification payload
type jsonSubscription struct {
Subscription string `json:"subscription"`
Result interface{} `json:"result,omitempty"`
}
-// JSON-RPC notification
type jsonNotification struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
@@ -91,6 +85,17 @@ type jsonCodec struct {
rw io.ReadWriteCloser // connection
}
+func (err *jsonError) Error() string {
+ if err.Message == "" {
+ return fmt.Sprintf("json-rpc error %d", err.Code)
+ }
+ return err.Message
+}
+
+func (err *jsonError) ErrorCode() int {
+ return err.Code
+}
+
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec {
d := json.NewDecoder(rwc)
@@ -113,7 +118,7 @@ func isBatch(msg json.RawMessage) bool {
// ReadRequestHeaders will read new requests without parsing the arguments. It will
// return a collection of requests, an indication if these requests are in batch
// form or an error when the incoming message could not be read/parsed.
-func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) {
+func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) {
c.decMu.Lock()
defer c.decMu.Unlock()
@@ -148,8 +153,8 @@ func checkReqId(reqId json.RawMessage) error {
// parseRequest will parse a single request from the given RawMessage. It will return
// the parsed request, an indication if the request was a batch or an error when
// the request could not be parsed.
-func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
- var in JSONRequest
+func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
+ var in jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
@@ -182,12 +187,12 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
method: unsubscribeMethod, params: in.Payload}}, false, nil
}
- // regular RPC call
elems := strings.Split(in.Method, serviceMethodSeparator)
if len(elems) != 2 {
return nil, false, &methodNotFoundError{in.Method, ""}
}
+ // regular RPC call
if len(in.Payload) == 0 {
return []rpcRequest{rpcRequest{service: elems[0], method: elems[1], id: &in.Id}}, false, nil
}
@@ -197,8 +202,8 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
// parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication
// if the request was a batch or an error when the request could not be read.
-func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
- var in []JSONRequest
+func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
+ var in []jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
@@ -236,15 +241,15 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
continue
}
- elems := strings.Split(r.Method, serviceMethodSeparator)
- if len(elems) != 2 {
- return nil, true, &methodNotFoundError{r.Method, ""}
- }
-
if len(r.Payload) == 0 {
- requests[i] = rpcRequest{service: elems[0], method: elems[1], id: id, params: nil}
+ requests[i] = rpcRequest{id: id, params: nil}
+ } else {
+ requests[i] = rpcRequest{id: id, params: r.Payload}
+ }
+ if elem := strings.Split(r.Method, serviceMethodSeparator); len(elem) == 2 {
+ requests[i].service, requests[i].method = elem[0], elem[1]
} else {
- requests[i] = rpcRequest{service: elems[0], method: elems[1], id: id, params: r.Payload}
+ requests[i].err = &methodNotFoundError{r.Method, ""}
}
}
@@ -253,7 +258,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
// ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed
// values or an error when the parsing failed.
-func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, RPCError) {
+func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error) {
if args, ok := params.(json.RawMessage); !ok {
return nil, &invalidParamsError{"Invalid params supplied"}
} else {
@@ -264,7 +269,7 @@ func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interf
// parsePositionalArguments tries to parse the given args to an array of values with the given types.
// It returns the parsed values or an error when the args could not be parsed. Missing optional arguments
// are returned as reflect.Zero values.
-func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, RPCError) {
+func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, Error) {
params := make([]interface{}, 0, len(callbackArgs))
for _, t := range callbackArgs {
params = append(params, reflect.New(t).Interface())
@@ -302,31 +307,31 @@ func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type)
// CreateResponse will create a JSON-RPC success response with the given id and reply as result.
func (c *jsonCodec) CreateResponse(id interface{}, reply interface{}) interface{} {
if isHexNum(reflect.TypeOf(reply)) {
- return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
+ return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
}
- return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: reply}
+ return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: reply}
}
// CreateErrorResponse will create a JSON-RPC error response with the given id and error.
-func (c *jsonCodec) CreateErrorResponse(id interface{}, err RPCError) interface{} {
- return &JSONErrResponse{Version: JSONRPCVersion, Id: id, Error: JSONError{Code: err.Code(), Message: err.Error()}}
+func (c *jsonCodec) CreateErrorResponse(id interface{}, err Error) interface{} {
+ return &jsonErrResponse{Version: jsonrpcVersion, Id: id, Error: jsonError{Code: err.ErrorCode(), Message: err.Error()}}
}
// CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error.
// info is optional and contains additional information about the error. When an empty string is passed it is ignored.
-func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} {
- return &JSONErrResponse{Version: JSONRPCVersion, Id: id,
- Error: JSONError{Code: err.Code(), Message: err.Error(), Data: info}}
+func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} {
+ return &jsonErrResponse{Version: jsonrpcVersion, Id: id,
+ Error: jsonError{Code: err.ErrorCode(), Message: err.Error(), Data: info}}
}
// CreateNotification will create a JSON-RPC notification with the given subscription id and event as params.
func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} {
if isHexNum(reflect.TypeOf(event)) {
- return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod,
+ return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}}
}
- return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod,
+ return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: event}}
}
diff --git a/rpc/notification.go b/rpc/notification.go
index e84e26a58..875433071 100644
--- a/rpc/notification.go
+++ b/rpc/notification.go
@@ -28,7 +28,7 @@ import (
var (
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
- ErrNotificationsUnsupported = errors.New("notifications not supported")
+ ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport")
// ErrNotificationNotFound is returned when the notification for the given id is not found
ErrNotificationNotFound = errors.New("notification not found")
diff --git a/rpc/notification_test.go b/rpc/notification_test.go
index 1bcede177..280503222 100644
--- a/rpc/notification_test.go
+++ b/rpc/notification_test.go
@@ -19,20 +19,31 @@ package rpc
import (
"encoding/json"
"net"
+ "sync"
"testing"
"time"
"golang.org/x/net/context"
)
-type NotificationTestService struct{}
+type NotificationTestService struct {
+ mu sync.Mutex
+ unsubscribed bool
-var (
- unsubCallbackCalled = false
-)
+ gotHangSubscriptionReq chan struct{}
+ unblockHangSubscription chan struct{}
+}
+
+func (s *NotificationTestService) wasUnsubCallbackCalled() bool {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.unsubscribed
+}
func (s *NotificationTestService) Unsubscribe(subid string) {
- unsubCallbackCalled = true
+ s.mu.Lock()
+ s.unsubscribed = true
+ s.mu.Unlock()
}
func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
@@ -60,6 +71,26 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
return subscription, nil
}
+// HangSubscription blocks on s.unblockHangSubscription before
+// sending anything.
+func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) {
+ notifier, supported := NotifierFromContext(ctx)
+ if !supported {
+ return nil, ErrNotificationsUnsupported
+ }
+
+ s.gotHangSubscriptionReq <- struct{}{}
+ <-s.unblockHangSubscription
+ subscription, err := notifier.NewSubscription(s.Unsubscribe)
+ if err != nil {
+ return nil, err
+ }
+ go func() {
+ subscription.Notify(val)
+ }()
+ return subscription, nil
+}
+
func TestNotifications(t *testing.T) {
server := NewServer()
service := &NotificationTestService{}
@@ -90,7 +121,7 @@ func TestNotifications(t *testing.T) {
}
var subid string
- response := JSONSuccessResponse{Result: subid}
+ response := jsonSuccessResponse{Result: subid}
if err := in.Decode(&response); err != nil {
t.Fatal(err)
}
@@ -114,7 +145,7 @@ func TestNotifications(t *testing.T) {
clientConn.Close() // causes notification unsubscribe callback to be called
time.Sleep(1 * time.Second)
- if !unsubCallbackCalled {
+ if !service.wasUnsubCallbackCalled() {
t.Error("unsubscribe callback not called after closing connection")
}
}
diff --git a/rpc/server.go b/rpc/server.go
index 7b7d22063..040805a5c 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -21,7 +21,6 @@ import (
"reflect"
"runtime"
"sync/atomic"
- "time"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
@@ -30,8 +29,6 @@ import (
)
const (
- stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
-
notificationBufferSize = 10000 // max buffered notifications before codec is closed
MetadataApi = "rpc"
@@ -183,7 +180,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
if err != nil {
- glog.V(logger.Debug).Infof("%v\n", err)
+ glog.V(logger.Debug).Infof("read error %v\n", err)
codec.Write(codec.CreateErrorResponse(nil, err))
return nil
}
@@ -240,13 +237,11 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) {
func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
glog.V(logger.Debug).Infoln("RPC Server shutdown initiatied")
- time.AfterFunc(stopPendingRequestTimeout, func() {
- s.codecsMu.Lock()
- defer s.codecsMu.Unlock()
- s.codecs.Each(func(c interface{}) bool {
- c.(ServerCodec).Close()
- return true
- })
+ s.codecsMu.Lock()
+ defer s.codecsMu.Unlock()
+ s.codecs.Each(func(c interface{}) bool {
+ c.(ServerCodec).Close()
+ return true
})
}
}
@@ -386,7 +381,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
// readRequest requests the next (batch) request from the codec. It will return the collection
// of requests, an indication if the request was a batch, the invalid request identifier and an
// error when the request could not be read/parsed.
-func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) {
+func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
reqs, batch, err := codec.ReadRequestHeaders()
if err != nil {
return nil, batch, err
@@ -399,6 +394,11 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro
var ok bool
var svc *service
+ if r.err != nil {
+ requests[i] = &serverRequest{id: r.id, err: r.err}
+ continue
+ }
+
if r.isPubSub && r.method == unsubscribeMethod {
requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
diff --git a/rpc/server_test.go b/rpc/server_test.go
index de47e1afd..e6840bde4 100644
--- a/rpc/server_test.go
+++ b/rpc/server_test.go
@@ -21,6 +21,7 @@ import (
"net"
"reflect"
"testing"
+ "time"
"golang.org/x/net/context"
)
@@ -48,6 +49,13 @@ func (s *Service) EchoWithCtx(ctx context.Context, str string, i int, args *Args
return Result{str, i, args}
}
+func (s *Service) Sleep(ctx context.Context, duration time.Duration) {
+ select {
+ case <-time.After(duration):
+ case <-ctx.Done():
+ }
+}
+
func (s *Service) Rets() (string, error) {
return "", nil
}
@@ -85,8 +93,8 @@ func TestServerRegisterName(t *testing.T) {
t.Fatalf("Expected service calc to be registered")
}
- if len(svc.callbacks) != 4 {
- t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks))
+ if len(svc.callbacks) != 5 {
+ t.Errorf("Expected 5 callbacks for service 'calc', got %d", len(svc.callbacks))
}
if len(svc.subscriptions) != 1 {
@@ -126,7 +134,7 @@ func testServerMethodExecution(t *testing.T, method string) {
t.Fatal(err)
}
- response := JSONSuccessResponse{Result: &Result{}}
+ response := jsonSuccessResponse{Result: &Result{}}
if err := in.Decode(&response); err != nil {
t.Fatal(err)
}
diff --git a/rpc/types.go b/rpc/types.go
index a1f36fbd2..2a7268ad8 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -62,7 +62,7 @@ type serverRequest struct {
callb *callback
args []reflect.Value
isUnsubscribe bool
- err RPCError
+ err Error
}
type serviceRegistry map[string]*service // collection of services
@@ -88,14 +88,13 @@ type rpcRequest struct {
id interface{}
isPubSub bool
params interface{}
+ err Error // invalid batch element
}
-// RPCError implements RPC error, is add support for error codec over regular go errors
-type RPCError interface {
- // RPC error code
- Code() int
- // Error message
- Error() string
+// Error wraps RPC errors, which contain an error code in addition to the message.
+type Error interface {
+ Error() string // returns the message
+ ErrorCode() int // returns the code
}
// ServerCodec implements reading, parsing and writing RPC messages for the server side of
@@ -103,15 +102,15 @@ type RPCError interface {
// multiple go-routines concurrently.
type ServerCodec interface {
// Read next request
- ReadRequestHeaders() ([]rpcRequest, bool, RPCError)
+ ReadRequestHeaders() ([]rpcRequest, bool, Error)
// Parse request argument to the given types
- ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, RPCError)
+ ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, Error)
// Assemble success response, expects response id and payload
CreateResponse(interface{}, interface{}) interface{}
// Assemble error response, expects response id and error
- CreateErrorResponse(interface{}, RPCError) interface{}
+ CreateErrorResponse(interface{}, Error) interface{}
// Assemble error response with extra information about the error through info
- CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{}
+ CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{}
// Create notification response
CreateNotification(string, interface{}) interface{}
// Write msg to client.
@@ -273,14 +272,3 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
func (bn *BlockNumber) Int64() int64 {
return (int64)(*bn)
}
-
-// Client defines the interface for go client that wants to connect to a geth RPC endpoint
-type Client interface {
- // SupportedModules returns the collection of API's the server offers
- SupportedModules() (map[string]string, error)
-
- Send(req interface{}) error
- Recv(msg interface{}) error
-
- Close()
-}
diff --git a/rpc/utils.go b/rpc/utils.go
index fe482e19d..1ac6698f5 100644
--- a/rpc/utils.go
+++ b/rpc/utils.go
@@ -20,7 +20,6 @@ import (
"crypto/rand"
"encoding/hex"
"errors"
- "fmt"
"math/big"
"reflect"
"unicode"
@@ -227,31 +226,3 @@ func newSubscriptionID() (string, error) {
}
return "0x" + hex.EncodeToString(subid[:]), nil
}
-
-// SupportedModules returns the collection of API's that the RPC server offers
-// on which the given client connects.
-func SupportedModules(client Client) (map[string]string, error) {
- req := JSONRequest{
- Id: []byte("1"),
- Version: "2.0",
- Method: MetadataApi + "_modules",
- }
- if err := client.Send(req); err != nil {
- return nil, err
- }
-
- var response JSONSuccessResponse
- if err := client.Recv(&response); err != nil {
- return nil, err
- }
- if response.Result != nil {
- mods := make(map[string]string)
- if modules, ok := response.Result.(map[string]interface{}); ok {
- for m, v := range modules {
- mods[m] = fmt.Sprintf("%s", v)
- }
- return mods, nil
- }
- }
- return nil, fmt.Errorf("unable to retrieve modules")
-}
diff --git a/rpc/websocket.go b/rpc/websocket.go
index fe9354d94..fc3cd0709 100644
--- a/rpc/websocket.go
+++ b/rpc/websocket.go
@@ -17,36 +17,39 @@
package rpc
import (
+ "crypto/tls"
"fmt"
+ "net"
"net/http"
+ "net/url"
"os"
"strings"
- "sync"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "golang.org/x/net/context"
"golang.org/x/net/websocket"
"gopkg.in/fatih/set.v0"
)
-// wsReaderWriterCloser reads and write payloads from and to a websocket connection.
-type wsReaderWriterCloser struct {
- c *websocket.Conn
-}
-
-// Read will read incoming payload data into p.
-func (rw *wsReaderWriterCloser) Read(p []byte) (int, error) {
- return rw.c.Read(p)
-}
-
-// Write writes p to the websocket.
-func (rw *wsReaderWriterCloser) Write(p []byte) (int, error) {
- return rw.c.Write(p)
+// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
+//
+// allowedOrigins should be a comma-separated list of allowed origin URLs.
+// To allow connections with any origin, pass "*".
+func (srv *Server) WebsocketHandler(allowedOrigins string) http.Handler {
+ return websocket.Server{
+ Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
+ Handler: func(conn *websocket.Conn) {
+ srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
+ },
+ }
}
-// Close closes the websocket connection.
-func (rw *wsReaderWriterCloser) Close() error {
- return rw.c.Close()
+// NewWSServer creates a new websocket RPC server around an API provider.
+//
+// Deprecated: use Server.WebsocketHandler
+func NewWSServer(allowedOrigins string, srv *Server) *http.Server {
+ return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)}
}
// wsHandshakeValidator returns a handler that verifies the origin during the
@@ -87,96 +90,63 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http
return f
}
-// NewWSServer creates a new websocket RPC server around an API provider.
-func NewWSServer(allowedOrigins string, handler *Server) *http.Server {
- return &http.Server{
- Handler: websocket.Server{
- Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
- Handler: func(conn *websocket.Conn) {
- handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}),
- OptionMethodInvocation|OptionSubscriptions)
- },
- },
+// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
+// that is listening on the given endpoint.
+//
+// The context is used for the initial connection establishment. It does not
+// affect subsequent interactions with the client.
+func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
+ if origin == "" {
+ var err error
+ if origin, err = os.Hostname(); err != nil {
+ return nil, err
+ }
+ if strings.HasPrefix(endpoint, "wss") {
+ origin = "https://" + strings.ToLower(origin)
+ } else {
+ origin = "http://" + strings.ToLower(origin)
+ }
+ }
+ config, err := websocket.NewConfig(endpoint, origin)
+ if err != nil {
+ return nil, err
}
-}
-
-// wsClient represents a RPC client that communicates over websockets with a
-// RPC server.
-type wsClient struct {
- endpoint string
- connMu sync.Mutex
- conn *websocket.Conn
-}
-// NewWSClientj creates a new RPC client that communicates with a RPC server
-// that is listening on the given endpoint using JSON encoding.
-func NewWSClient(endpoint string) (Client, error) {
- return &wsClient{endpoint: endpoint}, nil
+ return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
+ return wsDialContext(ctx, config)
+ })
}
-// connection will return a websocket connection to the RPC server. It will
-// (re)connect when necessary.
-func (client *wsClient) connection() (*websocket.Conn, error) {
- if client.conn != nil {
- return client.conn, nil
+func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) {
+ var conn net.Conn
+ var err error
+ switch config.Location.Scheme {
+ case "ws":
+ conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location))
+ case "wss":
+ dialer := contextDialer(ctx)
+ conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig)
+ default:
+ err = websocket.ErrBadScheme
}
-
- origin, err := os.Hostname()
if err != nil {
return nil, err
}
-
- origin = "http://" + origin
- client.conn, err = websocket.Dial(client.endpoint, "", origin)
-
- return client.conn, err
-}
-
-// SupportedModules is the collection of modules the RPC server offers.
-func (client *wsClient) SupportedModules() (map[string]string, error) {
- return SupportedModules(client)
-}
-
-// Send writes the JSON serialized msg to the websocket. It will create a new
-// websocket connection to the server if the client is currently not connected.
-func (client *wsClient) Send(msg interface{}) (err error) {
- client.connMu.Lock()
- defer client.connMu.Unlock()
-
- var conn *websocket.Conn
- if conn, err = client.connection(); err == nil {
- if err = websocket.JSON.Send(conn, msg); err != nil {
- client.conn.Close()
- client.conn = nil
- }
+ ws, err := websocket.NewClient(config, conn)
+ if err != nil {
+ conn.Close()
+ return nil, err
}
-
- return err
+ return ws, err
}
-// Recv reads a JSON message from the websocket and unmarshals it into msg.
-func (client *wsClient) Recv(msg interface{}) (err error) {
- client.connMu.Lock()
- defer client.connMu.Unlock()
+var wsPortMap = map[string]string{"ws": "80", "wss": "443"}
- var conn *websocket.Conn
- if conn, err = client.connection(); err == nil {
- if err = websocket.JSON.Receive(conn, msg); err != nil {
- client.conn.Close()
- client.conn = nil
+func wsDialAddress(location *url.URL) string {
+ if _, ok := wsPortMap[location.Scheme]; ok {
+ if _, _, err := net.SplitHostPort(location.Host); err != nil {
+ return net.JoinHostPort(location.Host, wsPortMap[location.Scheme])
}
}
- return
-}
-
-// Close closes the underlaying websocket connection.
-func (client *wsClient) Close() {
- client.connMu.Lock()
- defer client.connMu.Unlock()
-
- if client.conn != nil {
- client.conn.Close()
- client.conn = nil
- }
-
+ return location.Host
}