aboutsummaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2016-07-12 23:47:15 +0800
committerFelix Lange <fjl@twurst.com>2016-07-23 05:21:27 +0800
commit91b769042857f542b2792b23ec407e1c9bd4fe8d (patch)
treef6730b3e85a7ac5ca98f9a716505349958fcacd3 /rpc
parentbb01bea4e276dad359815c682a2dee730737f4dc (diff)
downloadgo-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.gz
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.bz2
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.lz
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.xz
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.tar.zst
go-tangerine-91b769042857f542b2792b23ec407e1c9bd4fe8d.zip
rpc: add new client, use it everywhere
The new client implementation supports concurrent requests, subscriptions and replaces the various ad hoc RPC clients throughout go-ethereum.
Diffstat (limited to 'rpc')
-rw-r--r--rpc/client.go740
-rw-r--r--rpc/client_context_go1.4.go60
-rw-r--r--rpc/client_context_go1.5.go61
-rw-r--r--rpc/client_context_go1.6.go55
-rw-r--r--rpc/client_context_go1.7.go51
-rw-r--r--rpc/client_example_test.go83
-rw-r--r--rpc/client_test.go489
-rw-r--r--rpc/errors.go63
-rw-r--r--rpc/http.go163
-rw-r--r--rpc/inproc.go49
-rw-r--r--rpc/ipc.go79
-rw-r--r--rpc/ipc_unix.go6
-rw-r--r--rpc/ipc_windows.go15
-rw-r--r--rpc/json.go61
-rw-r--r--rpc/notification.go2
-rw-r--r--rpc/notification_test.go45
-rw-r--r--rpc/server.go2
-rw-r--r--rpc/server_test.go14
-rw-r--r--rpc/types.go33
-rw-r--r--rpc/utils.go29
-rw-r--r--rpc/websocket.go160
21 files changed, 1865 insertions, 395 deletions
diff --git a/rpc/client.go b/rpc/client.go
new file mode 100644
index 000000000..4ff9a8cb9
--- /dev/null
+++ b/rpc/client.go
@@ -0,0 +1,740 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "net/url"
+ "reflect"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "golang.org/x/net/context"
+)
+
+var (
+ ErrClientQuit = errors.New("client is closed")
+ ErrNoResult = errors.New("no result in JSON-RPC response")
+)
+
+const (
+ clientSubscriptionBuffer = 100 // if exceeded, the client stops reading
+ tcpKeepAliveInterval = 30 * time.Second
+ defaultDialTimeout = 10 * time.Second // used when dialing if the context has no deadline
+ defaultWriteTimeout = 10 * time.Second // used for calls if the context has no deadline
+ subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
+)
+
+// BatchElem is an element in a batch request.
+type BatchElem struct {
+ Method string
+ Args []interface{}
+ // The result is unmarshaled into this field. Result must be set to a
+ // non-nil pointer value of the desired type, otherwise the response will be
+ // discarded.
+ Result interface{}
+ // Error is set if the server returns an error for this request, or if
+ // unmarshaling into Result fails. It is not set for I/O errors.
+ Error error
+}
+
+// A value of this type can a JSON-RPC request, notification, successful response or
+// error response. Which one it is depends on the fields.
+type jsonrpcMessage struct {
+ Version string `json:"jsonrpc"`
+ ID json.RawMessage `json:"id,omitempty"`
+ Method string `json:"method,omitempty"`
+ Params json.RawMessage `json:"params,omitempty"`
+ Error *jsonError `json:"error,omitempty"`
+ Result json.RawMessage `json:"result,omitempty"`
+}
+
+func (msg *jsonrpcMessage) isNotification() bool {
+ return msg.ID == nil && msg.Method != ""
+}
+
+func (msg *jsonrpcMessage) isResponse() bool {
+ return msg.hasValidID() && msg.Method == "" && len(msg.Params) == 0
+}
+
+func (msg *jsonrpcMessage) hasValidID() bool {
+ return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '['
+}
+
+func (msg *jsonrpcMessage) String() string {
+ b, _ := json.Marshal(msg)
+ return string(b)
+}
+
+// Client represents a connection to an RPC server.
+type Client struct {
+ idCounter uint32
+ connectFunc func(ctx context.Context) (net.Conn, error)
+ isHTTP bool
+
+ // writeConn is only safe to access outside dispatch, with the
+ // write lock held. The write lock is taken by sending on
+ // requestOp and released by sending on sendDone.
+ writeConn net.Conn
+
+ // for dispatch
+ close chan struct{}
+ didQuit chan struct{} // closed when client quits
+ reconnected chan net.Conn // where write/reconnect sends the new connection
+ readErr chan error // errors from read
+ readResp chan []*jsonrpcMessage // valid messages from read
+ requestOp chan *requestOp // for registering response IDs
+ sendDone chan error // signals write completion, releases write lock
+ respWait map[string]*requestOp // active requests
+ subs map[string]*ClientSubscription // active subscriptions
+}
+
+type requestOp struct {
+ ids []json.RawMessage
+ err error
+ resp chan *jsonrpcMessage // receives up to len(ids) responses
+ sub *ClientSubscription // only set for EthSubscribe requests
+}
+
+func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case resp := <-op.resp:
+ return resp, op.err
+ }
+}
+
+// Dial creates a new client for the given URL.
+//
+// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a
+// file name with no URL scheme, a local socket connection is established using UNIX
+// domain sockets on supported platforms and named pipes on Windows. If you want to
+// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
+//
+// For websocket connections, the origin is set to the local host name.
+//
+// The client reconnects automatically if the connection is lost.
+func Dial(rawurl string) (*Client, error) {
+ return DialContext(context.Background(), rawurl)
+}
+
+// DialContext creates a new RPC client, just like Dial.
+//
+// The context is used to cancel or time out the initial connection establishment. It does
+// not affect subsequent interactions with the client.
+func DialContext(ctx context.Context, rawurl string) (*Client, error) {
+ u, err := url.Parse(rawurl)
+ if err != nil {
+ return nil, err
+ }
+ switch u.Scheme {
+ case "http", "https":
+ return DialHTTP(rawurl)
+ case "ws", "wss":
+ return DialWebsocket(ctx, rawurl, "")
+ case "":
+ return DialIPC(ctx, rawurl)
+ default:
+ return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
+ }
+}
+
+func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) {
+ conn, err := connectFunc(initctx)
+ if err != nil {
+ return nil, err
+ }
+ _, isHTTP := conn.(*httpConn)
+
+ c := &Client{
+ writeConn: conn,
+ isHTTP: isHTTP,
+ connectFunc: connectFunc,
+ close: make(chan struct{}),
+ didQuit: make(chan struct{}),
+ reconnected: make(chan net.Conn),
+ readErr: make(chan error),
+ readResp: make(chan []*jsonrpcMessage),
+ requestOp: make(chan *requestOp),
+ sendDone: make(chan error, 1),
+ respWait: make(map[string]*requestOp),
+ subs: make(map[string]*ClientSubscription),
+ }
+ if !isHTTP {
+ go c.dispatch(conn)
+ }
+ return c, nil
+}
+
+func (c *Client) nextID() json.RawMessage {
+ id := atomic.AddUint32(&c.idCounter, 1)
+ return []byte(strconv.FormatUint(uint64(id), 10))
+}
+
+// SupportedModules calls the rpc_modules method, retrieving the list of
+// APIs that are available on the server.
+func (c *Client) SupportedModules() (map[string]string, error) {
+ var result map[string]string
+ ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
+ defer cancel()
+ err := c.CallContext(ctx, &result, "rpc_modules")
+ return result, err
+}
+
+// Close closes the client, aborting any in-flight requests.
+func (c *Client) Close() {
+ if c.isHTTP {
+ return
+ }
+ select {
+ case c.close <- struct{}{}:
+ <-c.didQuit
+ case <-c.didQuit:
+ }
+}
+
+// Call performs a JSON-RPC call with the given arguments and unmarshals into
+// result if no error occurred.
+//
+// The result must be a pointer so that package json can unmarshal into it. You
+// can also pass nil, in which case the result is ignored.
+func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
+ ctx := context.Background()
+ return c.CallContext(ctx, result, method, args...)
+}
+
+// CallContext performs a JSON-RPC call with the given arguments. If the context is
+// canceled before the call has successfully returned, CallContext returns immediately.
+//
+// The result must be a pointer so that package json can unmarshal into it. You
+// can also pass nil, in which case the result is ignored.
+func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
+ msg, err := c.newMessage(method, args...)
+ if err != nil {
+ return err
+ }
+ op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
+
+ if c.isHTTP {
+ err = c.sendHTTP(ctx, op, msg)
+ } else {
+ err = c.send(ctx, op, msg)
+ }
+ if err != nil {
+ return err
+ }
+
+ // dispatch has accepted the request and will close the channel it when it quits.
+ switch resp, err := op.wait(ctx); {
+ case err != nil:
+ return err
+ case resp.Error != nil:
+ return resp.Error
+ case len(resp.Result) == 0:
+ return ErrNoResult
+ default:
+ return json.Unmarshal(resp.Result, &result)
+ }
+}
+
+// BatchCall sends all given requests as a single batch and waits for the server
+// to return a response for all of them.
+//
+// In contrast to Call, BatchCall only returns I/O errors. Any error specific to
+// a request is reported through the Error field of the corresponding BatchElem.
+//
+// Note that batch calls may not be executed atomically on the server side.
+func (c *Client) BatchCall(b []BatchElem) error {
+ ctx := context.Background()
+ return c.BatchCallContext(ctx, b)
+}
+
+// BatchCall sends all given requests as a single batch and waits for the server
+// to return a response for all of them. The wait duration is bounded by the
+// context's deadline.
+//
+// In contrast to CallContext, BatchCallContext only returns I/O errors. Any
+// error specific to a request is reported through the Error field of the
+// corresponding BatchElem.
+//
+// Note that batch calls may not be executed atomically on the server side.
+func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
+ msgs := make([]*jsonrpcMessage, len(b))
+ op := &requestOp{
+ ids: make([]json.RawMessage, len(b)),
+ resp: make(chan *jsonrpcMessage, len(b)),
+ }
+ for i, elem := range b {
+ msg, err := c.newMessage(elem.Method, elem.Args...)
+ if err != nil {
+ return err
+ }
+ msgs[i] = msg
+ op.ids[i] = msg.ID
+ }
+
+ var err error
+ if c.isHTTP {
+ err = c.sendBatchHTTP(ctx, op, msgs)
+ } else {
+ err = c.send(ctx, op, msgs)
+ }
+
+ // Wait for all responses to come back.
+ for n := 0; n < len(b) && err == nil; n++ {
+ var resp *jsonrpcMessage
+ resp, err = op.wait(ctx)
+ if err != nil {
+ break
+ }
+ // Find the element corresponding to this response.
+ // The element is guaranteed to be present because dispatch
+ // only sends valid IDs to our channel.
+ var elem *BatchElem
+ for i := range msgs {
+ if bytes.Equal(msgs[i].ID, resp.ID) {
+ elem = &b[i]
+ break
+ }
+ }
+ if resp.Error != nil {
+ elem.Error = resp.Error
+ continue
+ }
+ if len(resp.Result) == 0 {
+ elem.Error = ErrNoResult
+ continue
+ }
+ elem.Error = json.Unmarshal(resp.Result, elem.Result)
+ }
+ return err
+}
+
+// EthSubscribe calls the "eth_subscribe" method with the given arguments,
+// registering a subscription. Server notifications for the subscription are
+// sent to the given channel. The element type of the channel must match the
+// expected type of content returned by the subscription.
+//
+// Callers should not use the same channel for multiple calls to EthSubscribe.
+// The channel is closed when the notification is unsubscribed or an error
+// occurs. The error can be retrieved via the Err method of the subscription.
+//
+// Slow subscribers will block the clients ingress path eventually.
+func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+ // Check type of channel first.
+ chanVal := reflect.ValueOf(channel)
+ if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
+ panic("first argument to EthSubscribe must be a writable channel")
+ }
+ if chanVal.IsNil() {
+ panic("channel given to EthSubscribe must not be nil")
+ }
+ if c.isHTTP {
+ return nil, ErrNotificationsUnsupported
+ }
+
+ msg, err := c.newMessage(subscribeMethod, args...)
+ if err != nil {
+ return nil, err
+ }
+ op := &requestOp{
+ ids: []json.RawMessage{msg.ID},
+ resp: make(chan *jsonrpcMessage),
+ sub: newClientSubscription(c, chanVal),
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
+ defer cancel()
+
+ // Send the subscription request.
+ // The arrival and validity of the response is signaled on sub.quit.
+ if err := c.send(ctx, op, msg); err != nil {
+ return nil, err
+ }
+ if _, err := op.wait(ctx); err != nil {
+ return nil, err
+ }
+ return op.sub, nil
+}
+
+func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
+ params, err := json.Marshal(paramsIn)
+ if err != nil {
+ return nil, err
+ }
+ return &jsonrpcMessage{Version: "2.0", ID: c.nextID(), Method: method, Params: params}, nil
+}
+
+// send registers op with the dispatch loop, then sends msg on the connection.
+// if sending fails, op is deregistered.
+func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
+ select {
+ case c.requestOp <- op:
+ if glog.V(logger.Detail) {
+ glog.Info("sending ", msg)
+ }
+ err := c.write(ctx, msg)
+ c.sendDone <- err
+ return err
+ case <-c.didQuit:
+ return ErrClientQuit
+ }
+}
+
+func (c *Client) write(ctx context.Context, msg interface{}) error {
+ deadline, ok := ctx.Deadline()
+ if !ok {
+ deadline = time.Now().Add(defaultWriteTimeout)
+ }
+ // The previous write failed. Try to establish a new connection.
+ if c.writeConn == nil {
+ if err := c.reconnect(ctx); err != nil {
+ return err
+ }
+ }
+ c.writeConn.SetWriteDeadline(deadline)
+ err := json.NewEncoder(c.writeConn).Encode(msg)
+ if err != nil {
+ c.writeConn = nil
+ }
+ return err
+}
+
+func (c *Client) reconnect(ctx context.Context) error {
+ newconn, err := c.connectFunc(ctx)
+ if err != nil {
+ glog.V(logger.Detail).Infof("reconnect failed: %v", err)
+ return err
+ }
+ select {
+ case c.reconnected <- newconn:
+ c.writeConn = newconn
+ return nil
+ case <-c.didQuit:
+ newconn.Close()
+ return ErrClientQuit
+ }
+}
+
+// dispatch is the main loop of the client.
+// It sends read messages to waiting calls to Call and BatchCall
+// and subscription notifications to registered subscriptions.
+func (c *Client) dispatch(conn net.Conn) {
+ // Spawn the initial read loop.
+ go c.read(conn)
+
+ var (
+ lastOp *requestOp // tracks last send operation
+ requestOpLock = c.requestOp // nil while the send lock is held
+ reading = true // if true, a read loop is running
+ )
+ defer close(c.didQuit)
+ defer func() {
+ c.closeRequestOps(ErrClientQuit)
+ conn.Close()
+ if reading {
+ // Empty read channels until read is dead.
+ for {
+ select {
+ case <-c.readResp:
+ case <-c.readErr:
+ return
+ }
+ }
+ }
+ }()
+
+ for {
+ select {
+ case <-c.close:
+ return
+
+ // Read path.
+ case batch := <-c.readResp:
+ for _, msg := range batch {
+ switch {
+ case msg.isNotification():
+ if glog.V(logger.Detail) {
+ glog.Info("<-readResp: notification ", msg)
+ }
+ c.handleNotification(msg)
+ case msg.isResponse():
+ if glog.V(logger.Detail) {
+ glog.Info("<-readResp: response ", msg)
+ }
+ c.handleResponse(msg)
+ default:
+ if glog.V(logger.Debug) {
+ glog.Error("<-readResp: dropping weird message", msg)
+ }
+ // TODO: maybe close
+ }
+ }
+
+ case err := <-c.readErr:
+ glog.V(logger.Debug).Infof("<-readErr: %v", err)
+ c.closeRequestOps(err)
+ conn.Close()
+ reading = false
+
+ case newconn := <-c.reconnected:
+ glog.V(logger.Debug).Infof("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr())
+ if reading {
+ // Wait for the previous read loop to exit. This is a rare case.
+ conn.Close()
+ <-c.readErr
+ }
+ go c.read(newconn)
+ reading = true
+ conn = newconn
+
+ // Send path.
+ case op := <-requestOpLock:
+ // Stop listening for further send ops until the current one is done.
+ requestOpLock = nil
+ lastOp = op
+ for _, id := range op.ids {
+ c.respWait[string(id)] = op
+ }
+
+ case err := <-c.sendDone:
+ if err != nil {
+ // Remove response handlers for the last send. We remove those here
+ // because the error is already handled in Call or BatchCall. When the
+ // read loop goes down, it will signal all other current operations.
+ for _, id := range lastOp.ids {
+ delete(c.respWait, string(id))
+ }
+ }
+ // Listen for send ops again.
+ requestOpLock = c.requestOp
+ lastOp = nil
+ }
+ }
+}
+
+// closeRequestOps unblocks pending send ops and active subscriptions.
+func (c *Client) closeRequestOps(err error) {
+ didClose := make(map[*requestOp]bool)
+
+ for id, op := range c.respWait {
+ // Remove the op so that later calls will not close op.resp again.
+ delete(c.respWait, id)
+
+ if !didClose[op] {
+ op.err = err
+ close(op.resp)
+ didClose[op] = true
+ }
+ }
+ for id, sub := range c.subs {
+ delete(c.subs, id)
+ sub.quitWithError(err, false)
+ }
+}
+
+func (c *Client) handleNotification(msg *jsonrpcMessage) {
+ if msg.Method != notificationMethod {
+ glog.V(logger.Debug).Info("dropping non-subscription message: ", msg)
+ return
+ }
+ var subResult struct {
+ ID string `json:"subscription"`
+ Result json.RawMessage `json:"result"`
+ }
+ if err := json.Unmarshal(msg.Params, &subResult); err != nil {
+ glog.V(logger.Debug).Info("dropping invalid subscription message: ", msg)
+ return
+ }
+ if c.subs[subResult.ID] != nil {
+ c.subs[subResult.ID].deliver(subResult.Result)
+ }
+}
+
+func (c *Client) handleResponse(msg *jsonrpcMessage) {
+ op := c.respWait[string(msg.ID)]
+ if op == nil {
+ glog.V(logger.Debug).Infof("unsolicited response %v", msg)
+ return
+ }
+ delete(c.respWait, string(msg.ID))
+ // For normal responses, just forward the reply to Call/BatchCall.
+ if op.sub == nil {
+ op.resp <- msg
+ return
+ }
+ // For subscription responses, start the subscription if the server
+ // indicates success. EthSubscribe gets unblocked in either case through
+ // the op.resp channel.
+ defer close(op.resp)
+ if msg.Error != nil {
+ op.err = msg.Error
+ return
+ }
+ if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
+ go op.sub.start()
+ c.subs[op.sub.subid] = op.sub
+ }
+}
+
+// Reading happens on a dedicated goroutine.
+
+func (c *Client) read(conn net.Conn) error {
+ var (
+ buf json.RawMessage
+ dec = json.NewDecoder(conn)
+ )
+ readMessage := func() (rs []*jsonrpcMessage, err error) {
+ buf = buf[:0]
+ if err = dec.Decode(&buf); err != nil {
+ return nil, err
+ }
+ if isBatch(buf) {
+ err = json.Unmarshal(buf, &rs)
+ } else {
+ rs = make([]*jsonrpcMessage, 1)
+ err = json.Unmarshal(buf, &rs[0])
+ }
+ return rs, err
+ }
+
+ for {
+ resp, err := readMessage()
+ if err != nil {
+ c.readErr <- err
+ return err
+ }
+ c.readResp <- resp
+ }
+}
+
+// Subscriptions.
+
+// A ClientSubscription represents a subscription established through EthSubscribe.
+type ClientSubscription struct {
+ client *Client
+ etype reflect.Type
+ channel reflect.Value
+ subid string
+ in chan json.RawMessage
+
+ quitOnce sync.Once // ensures quit is closed once
+ quit chan struct{} // quit is closed when the subscription exits
+ errOnce sync.Once // ensures err is closed once
+ err chan error
+}
+
+func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription {
+ sub := &ClientSubscription{
+ client: c,
+ etype: channel.Type().Elem(),
+ channel: channel,
+ quit: make(chan struct{}),
+ err: make(chan error, 1),
+ // in is buffered so dispatch can continue even if the subscriber is slow.
+ in: make(chan json.RawMessage, clientSubscriptionBuffer),
+ }
+ return sub
+}
+
+// Err returns the subscription error channel. The intended use of Err is to schedule
+// resubscription when the client connection is closed unexpectedly.
+//
+// The error channel receives a value when the subscription has ended due
+// to an error. The received error is ErrClientQuit if Close has been called
+// on the underlying client and no other error has occurred.
+//
+// The error channel is closed when Unsubscribe is called on the subscription.
+func (sub *ClientSubscription) Err() <-chan error {
+ return sub.err
+}
+
+// Unsubscribe unsubscribes the notification and closes the error channel.
+// It can safely be called more than once.
+func (sub *ClientSubscription) Unsubscribe() {
+ sub.quitWithError(nil, true)
+ sub.errOnce.Do(func() { close(sub.err) })
+}
+
+func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
+ sub.quitOnce.Do(func() {
+ if unsubscribeServer {
+ sub.requestUnsubscribe()
+ }
+ if err != nil {
+ sub.err <- err
+ }
+ close(sub.quit)
+ })
+}
+
+func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
+ select {
+ case sub.in <- result:
+ return true
+ case <-sub.quit:
+ return false
+ }
+}
+
+func (sub *ClientSubscription) start() {
+ sub.quitWithError(sub.forward())
+}
+
+func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
+ cases := []reflect.SelectCase{
+ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
+ {Dir: reflect.SelectSend, Chan: sub.channel},
+ }
+ for {
+ select {
+ case result := <-sub.in:
+ val, err := sub.unmarshal(result)
+ if err != nil {
+ return err, true
+ }
+ cases[1].Send = val
+ switch chosen, _, _ := reflect.Select(cases); chosen {
+ case 0: // <-sub.quit
+ return nil, false
+ case 1: // sub.channel<-
+ continue
+ }
+ case <-sub.quit:
+ return nil, false
+ }
+ }
+}
+
+func (sub *ClientSubscription) unmarshal(result json.RawMessage) (reflect.Value, error) {
+ val := reflect.New(sub.etype)
+ err := json.Unmarshal(result, val.Interface())
+ return val.Elem(), err
+}
+
+func (sub *ClientSubscription) requestUnsubscribe() error {
+ var result interface{}
+ return sub.client.Call(&result, unsubscribeMethod, sub.subid)
+}
diff --git a/rpc/client_context_go1.4.go b/rpc/client_context_go1.4.go
new file mode 100644
index 000000000..ac956a17d
--- /dev/null
+++ b/rpc/client_context_go1.4.go
@@ -0,0 +1,60 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build !go1.5
+
+package rpc
+
+import (
+ "net"
+ "net/http"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// In older versions of Go (below 1.5), dials cannot be canceled
+// via a channel or context. The context deadline can still applied.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+ dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Deadline = deadline
+ } else {
+ dialer.Deadline = time.Now().Add(defaultDialTimeout)
+ }
+ return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+ return contextDialer(ctx).Dial(network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+ // Set Timeout on the client if the context has a deadline.
+ // Note that there is no default timeout (unlike in contextDialer) because
+ // the timeout applies to the entire request, including reads from body.
+ if deadline, ok := ctx.Deadline(); ok {
+ c2 := *c
+ c2.Timeout = deadline.Sub(time.Now())
+ c = &c2
+ }
+ req2 := *req
+ return c, &req2
+}
diff --git a/rpc/client_context_go1.5.go b/rpc/client_context_go1.5.go
new file mode 100644
index 000000000..4a007d9f8
--- /dev/null
+++ b/rpc/client_context_go1.5.go
@@ -0,0 +1,61 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build go1.5,!go1.6
+
+package rpc
+
+import (
+ "net"
+ "net/http"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// In Go 1.5, dials cannot be canceled via a channel or context. The context deadline can
+// still be applied. Go 1.5 adds the ability to cancel HTTP requests via a channel.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+ dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Deadline = deadline
+ } else {
+ dialer.Deadline = time.Now().Add(defaultDialTimeout)
+ }
+ return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+ return contextDialer(ctx).Dial(network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+ // Set Timeout on the client if the context has a deadline.
+ // Note that there is no default timeout (unlike in contextDialer) because
+ // the timeout applies to the entire request, including reads from body.
+ if deadline, ok := ctx.Deadline(); ok {
+ c2 := *c
+ c2.Timeout = deadline.Sub(time.Now())
+ c = &c2
+ }
+ req2 := *req
+ req2.Cancel = ctx.Done()
+ return c, &req2
+}
diff --git a/rpc/client_context_go1.6.go b/rpc/client_context_go1.6.go
new file mode 100644
index 000000000..67777ddc6
--- /dev/null
+++ b/rpc/client_context_go1.6.go
@@ -0,0 +1,55 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build go1.6,!go1.7
+
+package rpc
+
+import (
+ "net"
+ "net/http"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// In Go 1.6, net.Dialer gained the ability to cancel via a channel.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+ dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Deadline = deadline
+ } else {
+ dialer.Deadline = time.Now().Add(defaultDialTimeout)
+ }
+ return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+ return contextDialer(ctx).Dial(network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+ // We set Timeout on the client for Go <= 1.5. There
+ // is no need to do that here because the dial will be canceled
+ // by package http.
+ req2 := *req
+ req2.Cancel = ctx.Done()
+ return c, &req2
+}
diff --git a/rpc/client_context_go1.7.go b/rpc/client_context_go1.7.go
new file mode 100644
index 000000000..56ce12ab8
--- /dev/null
+++ b/rpc/client_context_go1.7.go
@@ -0,0 +1,51 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build go1.7
+
+package rpc
+
+import (
+ "context"
+ "net"
+ "net/http"
+ "time"
+)
+
+// In Go 1.7, context moved into the standard library and support
+// for cancelation via context was added to net.Dialer and http.Request.
+
+// contextDialer returns a dialer that applies the deadline value from the given context.
+func contextDialer(ctx context.Context) *net.Dialer {
+ dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Deadline = deadline
+ } else {
+ dialer.Deadline = time.Now().Add(defaultDialTimeout)
+ }
+ return dialer
+}
+
+// dialContext connects to the given address, aborting the dial if ctx is canceled.
+func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+ d := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
+ return d.DialContext(ctx, network, addr)
+}
+
+// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
+func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
+ return c, req.WithContext(ctx)
+}
diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go
new file mode 100644
index 000000000..84b4b67bb
--- /dev/null
+++ b/rpc/client_example_test.go
@@ -0,0 +1,83 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc_test
+
+import (
+ "fmt"
+ "math/big"
+ "time"
+
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// In this example, our client whishes to track the latest 'block number'
+// known to the server. The server supports two methods:
+//
+// eth_getBlockByNumber("latest", {})
+// returns the latest block object.
+//
+// eth_subscribe("newBlocks")
+// creates a subscription which fires block objects when new blocks arrive.
+
+type Block struct {
+ Number *big.Int
+}
+
+func ExampleClientSubscription() {
+ // Connect the client.
+ client, _ := rpc.Dial("ws://127.0.0.1:8485")
+ subch := make(chan Block)
+ go subscribeBlocks(client, subch)
+
+ // Print events from the subscription as they arrive.
+ for block := range subch {
+ fmt.Println("latest block:", block.Number)
+ }
+}
+
+// subscribeBlocks runs in its own goroutine and maintains
+// a subscription for new blocks.
+func subscribeBlocks(client *rpc.Client, subch chan Block) {
+ for i := 0; ; i++ {
+ if i > 0 {
+ time.Sleep(2 * time.Second)
+ }
+
+ // Subscribe to new blocks.
+ sub, err := client.EthSubscribe(subch, "newBlocks")
+ if err == rpc.ErrClientQuit {
+ return // Stop reconnecting if the client was closed.
+ } else if err != nil {
+ fmt.Println("subscribe error:", err)
+ continue
+ }
+
+ // The connection is established now.
+ // Update the channel with the current block.
+ var lastBlock Block
+ if err := client.Call(&lastBlock, "eth_getBlockByNumber", "latest"); err != nil {
+ fmt.Println("can't get latest block:", err)
+ continue
+ }
+ subch <- lastBlock
+
+ // The subscription will deliver events to the channel. Wait for the
+ // subscription to end for any reason, then loop around to re-establish
+ // the connection.
+ fmt.Println("connection lost: ", <-sub.Err())
+ }
+}
diff --git a/rpc/client_test.go b/rpc/client_test.go
new file mode 100644
index 000000000..58dceada0
--- /dev/null
+++ b/rpc/client_test.go
@@ -0,0 +1,489 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "fmt"
+ "math/rand"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "reflect"
+ "runtime"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/davecgh/go-spew/spew"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "golang.org/x/net/context"
+)
+
+func TestClientRequest(t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ var resp Result
+ if err := client.Call(&resp, "service_echo", "hello", 10, &Args{"world"}); err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) {
+ t.Errorf("incorrect result %#v", resp)
+ }
+}
+
+func TestClientBatchRequest(t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ batch := []BatchElem{
+ {
+ Method: "service_echo",
+ Args: []interface{}{"hello", 10, &Args{"world"}},
+ Result: new(Result),
+ },
+ {
+ Method: "service_echo",
+ Args: []interface{}{"hello2", 11, &Args{"world"}},
+ Result: new(Result),
+ },
+ {
+ Method: "no_such_method",
+ Args: []interface{}{1, 2, 3},
+ Result: new(int),
+ },
+ }
+ if err := client.BatchCall(batch); err != nil {
+ t.Fatal(err)
+ }
+ wantResult := []BatchElem{
+ {
+ Method: "service_echo",
+ Args: []interface{}{"hello", 10, &Args{"world"}},
+ Result: &Result{"hello", 10, &Args{"world"}},
+ },
+ {
+ Method: "service_echo",
+ Args: []interface{}{"hello2", 11, &Args{"world"}},
+ Result: &Result{"hello2", 11, &Args{"world"}},
+ },
+ {
+ Method: "no_such_method",
+ Args: []interface{}{1, 2, 3},
+ Result: new(int),
+ Error: &jsonError{Code: -32601, Message: "The method no_such_method_ does not exist/is not available"},
+ },
+ }
+ if !reflect.DeepEqual(batch, wantResult) {
+ t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
+ }
+}
+
+// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
+func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
+func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
+func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
+
+// This test checks that requests made through CallContext can be canceled by canceling
+// the context.
+func testClientCancel(transport string, t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+
+ // What we want to achieve is that the context gets canceled
+ // at various stages of request processing. The interesting cases
+ // are:
+ // - cancel during dial
+ // - cancel while performing a HTTP request
+ // - cancel while waiting for a response
+ //
+ // To trigger those, the times are chosen such that connections
+ // are killed within the deadline for every other call (maxKillTimeout
+ // is 2x maxCancelTimeout).
+ //
+ // Once a connection is dead, there is a fair chance it won't connect
+ // successfully because the accept is delayed by 1s.
+ maxContextCancelTimeout := 300 * time.Millisecond
+ fl := &flakeyListener{
+ maxAcceptDelay: 1 * time.Second,
+ maxKillTimeout: 600 * time.Millisecond,
+ }
+
+ var client *Client
+ switch transport {
+ case "ws", "http":
+ c, hs := httpTestClient(server, transport, fl)
+ defer hs.Close()
+ client = c
+ case "ipc":
+ c, l := ipcTestClient(server, fl)
+ defer l.Close()
+ client = c
+ default:
+ panic("unknown transport: " + transport)
+ }
+
+ // These tests take a lot of time, run them all at once.
+ // You probably want to run with -parallel 1 or comment out
+ // the call to t.Parallel if you enable the logging.
+ t.Parallel()
+ // glog.SetV(6)
+ // glog.SetToStderr(true)
+ // defer glog.SetToStderr(false)
+ // glog.Infoln("testing ", transport)
+
+ // The actual test starts here.
+ var (
+ wg sync.WaitGroup
+ nreqs = 10
+ ncallers = 6
+ )
+ caller := func(index int) {
+ defer wg.Done()
+ for i := 0; i < nreqs; i++ {
+ var (
+ ctx context.Context
+ cancel func()
+ timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
+ )
+ if index < ncallers/2 {
+ // For half of the callers, create a context without deadline
+ // and cancel it later.
+ ctx, cancel = context.WithCancel(context.Background())
+ time.AfterFunc(timeout, cancel)
+ } else {
+ // For the other half, create a context with a deadline instead. This is
+ // different because the context deadline is used to set the socket write
+ // deadline.
+ ctx, cancel = context.WithTimeout(context.Background(), timeout)
+ }
+ // Now perform a call with the context.
+ // The key thing here is that no call will ever complete successfully.
+ err := client.CallContext(ctx, nil, "service_sleep", 2*maxContextCancelTimeout)
+ if err != nil {
+ glog.V(logger.Debug).Infoln("got expected error:", err)
+ } else {
+ t.Errorf("no error for call with %v wait time", timeout)
+ }
+ cancel()
+ }
+ }
+ wg.Add(ncallers)
+ for i := 0; i < ncallers; i++ {
+ go caller(i)
+ }
+ wg.Wait()
+}
+
+func TestClientSubscribeInvalidArg(t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ check := func(shouldPanic bool, arg interface{}) {
+ defer func() {
+ err := recover()
+ if shouldPanic && err == nil {
+ t.Errorf("EthSubscribe should've panicked for %#v", arg)
+ }
+ if !shouldPanic && err != nil {
+ t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg)
+ buf := make([]byte, 1024*1024)
+ buf = buf[:runtime.Stack(buf, false)]
+ t.Error(err)
+ t.Error(string(buf))
+ }
+ }()
+ client.EthSubscribe(arg, "foo_bar")
+ }
+ check(true, nil)
+ check(true, 1)
+ check(true, (chan int)(nil))
+ check(true, make(<-chan int))
+ check(false, make(chan int))
+ check(false, make(chan<- int))
+}
+
+func TestClientSubscribe(t *testing.T) {
+ server := newTestServer("eth", new(NotificationTestService))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ nc := make(chan int)
+ count := 10
+ sub, err := client.EthSubscribe(nc, "someSubscription", count, 0)
+ if err != nil {
+ t.Fatal("can't subscribe:", err)
+ }
+ for i := 0; i < count; i++ {
+ if val := <-nc; val != i {
+ t.Fatalf("value mismatch: got %d, want %d", val, i)
+ }
+ }
+
+ sub.Unsubscribe()
+ select {
+ case v := <-nc:
+ t.Fatal("received value after unsubscribe:", v)
+ case err := <-sub.Err():
+ if err != nil {
+ t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
+ }
+ case <-time.After(1 * time.Second):
+ t.Fatalf("subscription not closed within 1s after unsubscribe")
+ }
+}
+
+// In this test, the connection drops while EthSubscribe is
+// waiting for a response.
+func TestClientSubscribeClose(t *testing.T) {
+ service := &NotificationTestService{
+ gotHangSubscriptionReq: make(chan struct{}),
+ unblockHangSubscription: make(chan struct{}),
+ }
+ server := newTestServer("eth", service)
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ var (
+ nc = make(chan int)
+ errc = make(chan error)
+ sub *ClientSubscription
+ err error
+ )
+ go func() {
+ sub, err = client.EthSubscribe(nc, "hangSubscription", 999)
+ errc <- err
+ }()
+
+ <-service.gotHangSubscriptionReq
+ client.Close()
+ service.unblockHangSubscription <- struct{}{}
+
+ select {
+ case err := <-errc:
+ if err == nil {
+ t.Errorf("EthSubscribe returned nil error after Close")
+ }
+ if sub != nil {
+ t.Error("EthSubscribe returned non-nil subscription after Close")
+ }
+ case <-time.After(1 * time.Second):
+ t.Fatalf("EthSubscribe did not return within 1s after Close")
+ }
+}
+
+func TestClientHTTP(t *testing.T) {
+ server := newTestServer("service", new(Service))
+ defer server.Stop()
+
+ client, hs := httpTestClient(server, "http", nil)
+ defer hs.Close()
+ defer client.Close()
+
+ // Launch concurrent requests.
+ var (
+ results = make([]Result, 100)
+ errc = make(chan error)
+ wantResult = Result{"a", 1, new(Args)}
+ )
+ defer client.Close()
+ for i := range results {
+ i := i
+ go func() {
+ errc <- client.Call(&results[i], "service_echo",
+ wantResult.String, wantResult.Int, wantResult.Args)
+ }()
+ }
+
+ // Wait for all of them to complete.
+ timeout := time.NewTimer(5 * time.Second)
+ defer timeout.Stop()
+ for i := range results {
+ select {
+ case err := <-errc:
+ if err != nil {
+ t.Fatal(err)
+ }
+ case <-timeout.C:
+ t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
+ }
+ }
+
+ // Check results.
+ for i := range results {
+ if !reflect.DeepEqual(results[i], wantResult) {
+ t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
+ }
+ }
+}
+
+func TestClientReconnect(t *testing.T) {
+ startServer := func(addr string) (*Server, net.Listener) {
+ srv := newTestServer("service", new(Service))
+ l, err := net.Listen("tcp", addr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ go http.Serve(l, srv.WebsocketHandler("*"))
+ return srv, l
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // Start a server and corresponding client.
+ s1, l1 := startServer("127.0.0.1:0")
+ client, err := DialContext(ctx, "ws://"+l1.Addr().String())
+ if err != nil {
+ t.Fatal("can't dial", err)
+ }
+
+ // Perform a call. This should work because the server is up.
+ var resp Result
+ if err := client.CallContext(ctx, &resp, "service_echo", "", 1, nil); err != nil {
+ t.Fatal(err)
+ }
+
+ // Shut down the server and try calling again. It shouldn't work.
+ l1.Close()
+ s1.Stop()
+ if err := client.CallContext(ctx, &resp, "service_echo", "", 2, nil); err == nil {
+ t.Error("successful call while the server is down")
+ t.Logf("resp: %#v", resp)
+ }
+
+ // Allow for some cool down time so we can listen on the same address again.
+ time.Sleep(2 * time.Second)
+
+ // Start it up again and call again. The connection should be reestablished.
+ // We spawn multiple calls here to check whether this hangs somehow.
+ s2, l2 := startServer(l1.Addr().String())
+ defer l2.Close()
+ defer s2.Stop()
+
+ start := make(chan struct{})
+ errors := make(chan error, 20)
+ for i := 0; i < cap(errors); i++ {
+ go func() {
+ <-start
+ var resp Result
+ errors <- client.CallContext(ctx, &resp, "service_echo", "", 3, nil)
+ }()
+ }
+ close(start)
+ errcount := 0
+ for i := 0; i < cap(errors); i++ {
+ if err = <-errors; err != nil {
+ errcount++
+ }
+ }
+ t.Log("err:", err)
+ if errcount > 1 {
+ t.Errorf("expected one error after disconnect, got %d", errcount)
+ }
+}
+
+func newTestServer(serviceName string, service interface{}) *Server {
+ server := NewServer()
+ if err := server.RegisterName(serviceName, service); err != nil {
+ panic(err)
+ }
+ return server
+}
+
+func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
+ // Create the HTTP server.
+ var hs *httptest.Server
+ switch transport {
+ case "ws":
+ hs = httptest.NewUnstartedServer(srv.WebsocketHandler("*"))
+ case "http":
+ hs = httptest.NewUnstartedServer(srv)
+ default:
+ panic("unknown HTTP transport: " + transport)
+ }
+ // Wrap the listener if required.
+ if fl != nil {
+ fl.Listener = hs.Listener
+ hs.Listener = fl
+ }
+ // Connect the client.
+ hs.Start()
+ client, err := Dial(transport + "://" + hs.Listener.Addr().String())
+ if err != nil {
+ panic(err)
+ }
+ return client, hs
+}
+
+func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
+ // Listen on a random endpoint.
+ endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
+ if runtime.GOOS == "windows" {
+ endpoint = `\\.\pipe\` + endpoint
+ } else {
+ endpoint = os.TempDir() + "/" + endpoint
+ }
+ l, err := ipcListen(endpoint)
+ if err != nil {
+ panic(err)
+ }
+ // Connect the listener to the server.
+ if fl != nil {
+ fl.Listener = l
+ l = fl
+ }
+ go srv.ServeListener(l)
+ // Connect the client.
+ client, err := Dial(endpoint)
+ if err != nil {
+ panic(err)
+ }
+ return client, l
+}
+
+// flakeyListener kills accepted connections after a random timeout.
+type flakeyListener struct {
+ net.Listener
+ maxKillTimeout time.Duration
+ maxAcceptDelay time.Duration
+}
+
+func (l *flakeyListener) Accept() (net.Conn, error) {
+ delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
+ time.Sleep(delay)
+
+ c, err := l.Listener.Accept()
+ if err == nil {
+ timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
+ time.AfterFunc(timeout, func() {
+ glog.V(logger.Debug).Infof("killing conn %v after %v", c.LocalAddr(), timeout)
+ c.Close()
+ })
+ }
+ return c, err
+}
diff --git a/rpc/errors.go b/rpc/errors.go
index bc352fc45..9cf9dc60c 100644
--- a/rpc/errors.go
+++ b/rpc/errors.go
@@ -24,74 +24,43 @@ type methodNotFoundError struct {
method string
}
-func (e *methodNotFoundError) Code() int {
- return -32601
-}
+func (e *methodNotFoundError) ErrorCode() int { return -32601 }
func (e *methodNotFoundError) Error() string {
return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method)
}
// received message isn't a valid request
-type invalidRequestError struct {
- message string
-}
+type invalidRequestError struct{ message string }
-func (e *invalidRequestError) Code() int {
- return -32600
-}
+func (e *invalidRequestError) ErrorCode() int { return -32600 }
-func (e *invalidRequestError) Error() string {
- return e.message
-}
+func (e *invalidRequestError) Error() string { return e.message }
// received message is invalid
-type invalidMessageError struct {
- message string
-}
+type invalidMessageError struct{ message string }
-func (e *invalidMessageError) Code() int {
- return -32700
-}
+func (e *invalidMessageError) ErrorCode() int { return -32700 }
-func (e *invalidMessageError) Error() string {
- return e.message
-}
+func (e *invalidMessageError) Error() string { return e.message }
// unable to decode supplied params, or an invalid number of parameters
-type invalidParamsError struct {
- message string
-}
+type invalidParamsError struct{ message string }
-func (e *invalidParamsError) Code() int {
- return -32602
-}
+func (e *invalidParamsError) ErrorCode() int { return -32602 }
-func (e *invalidParamsError) Error() string {
- return e.message
-}
+func (e *invalidParamsError) Error() string { return e.message }
// logic error, callback returned an error
-type callbackError struct {
- message string
-}
+type callbackError struct{ message string }
-func (e *callbackError) Code() int {
- return -32000
-}
+func (e *callbackError) ErrorCode() int { return -32000 }
-func (e *callbackError) Error() string {
- return e.message
-}
+func (e *callbackError) Error() string { return e.message }
// issued when a request is received after the server is issued to stop.
-type shutdownError struct {
-}
+type shutdownError struct{}
-func (e *shutdownError) Code() int {
- return -32000
-}
+func (e *shutdownError) ErrorCode() int { return -32000 }
-func (e *shutdownError) Error() string {
- return "server is shutting down"
-}
+func (e *shutdownError) Error() string { return "server is shutting down" }
diff --git a/rpc/http.go b/rpc/http.go
index 9283ce0ec..afcdd4bd6 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -22,71 +22,108 @@ import (
"fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
- "net/url"
"strings"
+ "sync"
+ "time"
"github.com/rs/cors"
+ "golang.org/x/net/context"
)
const (
maxHTTPRequestContentLength = 1024 * 128
)
-// httpClient connects to a geth RPC server over HTTP.
-type httpClient struct {
- endpoint *url.URL // HTTP-RPC server endpoint
- httpClient http.Client // reuse connection
- lastRes []byte // HTTP requests are synchronous, store last response
+var nullAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:0")
+
+type httpConn struct {
+ client *http.Client
+ req *http.Request
+ closeOnce sync.Once
+ closed chan struct{}
+}
+
+// httpConn is treated specially by Client.
+func (hc *httpConn) LocalAddr() net.Addr { return nullAddr }
+func (hc *httpConn) RemoteAddr() net.Addr { return nullAddr }
+func (hc *httpConn) SetReadDeadline(time.Time) error { return nil }
+func (hc *httpConn) SetWriteDeadline(time.Time) error { return nil }
+func (hc *httpConn) SetDeadline(time.Time) error { return nil }
+func (hc *httpConn) Write([]byte) (int, error) { panic("Write called") }
+
+func (hc *httpConn) Read(b []byte) (int, error) {
+ <-hc.closed
+ return 0, io.EOF
+}
+
+func (hc *httpConn) Close() error {
+ hc.closeOnce.Do(func() { close(hc.closed) })
+ return nil
}
-// NewHTTPClient create a new RPC clients that connection to a geth RPC server
-// over HTTP.
-func NewHTTPClient(endpoint string) (Client, error) {
- url, err := url.Parse(endpoint)
+// DialHTTP creates a new RPC clients that connection to an RPC server over HTTP.
+func DialHTTP(endpoint string) (*Client, error) {
+ req, err := http.NewRequest("POST", endpoint, nil)
if err != nil {
return nil, err
}
- return &httpClient{endpoint: url}, nil
-}
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
-// Send will serialize the given msg to JSON and sends it to the RPC server.
-// Since HTTP is synchronous the response is stored until Recv is called.
-func (client *httpClient) Send(msg interface{}) error {
- var body []byte
- var err error
+ initctx := context.Background()
+ return newClient(initctx, func(context.Context) (net.Conn, error) {
+ return &httpConn{client: new(http.Client), req: req, closed: make(chan struct{})}, nil
+ })
+}
- client.lastRes = nil
- if body, err = json.Marshal(msg); err != nil {
+func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
+ hc := c.writeConn.(*httpConn)
+ respBody, err := hc.doRequest(ctx, msg)
+ if err != nil {
return err
}
+ defer respBody.Close()
+ var respmsg jsonrpcMessage
+ if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
+ return err
+ }
+ op.resp <- &respmsg
+ return nil
+}
- resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body))
+func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
+ hc := c.writeConn.(*httpConn)
+ respBody, err := hc.doRequest(ctx, msgs)
if err != nil {
return err
}
-
- defer resp.Body.Close()
- if resp.StatusCode == http.StatusOK {
- client.lastRes, err = ioutil.ReadAll(resp.Body)
+ defer respBody.Close()
+ var respmsgs []jsonrpcMessage
+ if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
return err
}
-
- return fmt.Errorf("request failed: %s", resp.Status)
-}
-
-// Recv will try to deserialize the last received response into the given msg.
-func (client *httpClient) Recv(msg interface{}) error {
- return json.Unmarshal(client.lastRes, &msg)
+ for _, respmsg := range respmsgs {
+ op.resp <- &respmsg
+ }
+ return nil
}
-// Close is not necessary for httpClient
-func (client *httpClient) Close() {
-}
+func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
+ body, err := json.Marshal(msg)
+ if err != nil {
+ return nil, err
+ }
+ client, req := requestWithContext(hc.client, hc.req, ctx)
+ req.Body = ioutil.NopCloser(bytes.NewReader(body))
+ req.ContentLength = int64(len(body))
-// SupportedModules will return the collection of offered RPC modules.
-func (client *httpClient) SupportedModules() (map[string]string, error) {
- return SupportedModules(client)
+ resp, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ return resp.Body, nil
}
// httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method.
@@ -100,43 +137,39 @@ func (t *httpReadWriteNopCloser) Close() error {
return nil
}
-// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests,
-// send the request to the given API provider and sends the response back to the caller.
-func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- if r.ContentLength > maxHTTPRequestContentLength {
- http.Error(w,
- fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
- http.StatusRequestEntityTooLarge)
- return
- }
-
- w.Header().Set("content-type", "application/json")
-
- // create a codec that reads direct from the request body until
- // EOF and writes the response to w and order the server to process
- // a single request.
- codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
- defer codec.Close()
- srv.ServeSingleRequest(codec, OptionMethodInvocation)
+// NewHTTPServer creates a new HTTP RPC server around an API provider.
+//
+// Deprecated: Server implements http.Handler
+func NewHTTPServer(corsString string, srv *Server) *http.Server {
+ return &http.Server{Handler: newCorsHandler(srv, corsString)}
+}
+
+// ServeHTTP serves JSON-RPC requests over HTTP.
+func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.ContentLength > maxHTTPRequestContentLength {
+ http.Error(w,
+ fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
+ http.StatusRequestEntityTooLarge)
+ return
}
+ w.Header().Set("content-type", "application/json")
+
+ // create a codec that reads direct from the request body until
+ // EOF and writes the response to w and order the server to process
+ // a single request.
+ codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
+ defer codec.Close()
+ srv.ServeSingleRequest(codec, OptionMethodInvocation)
}
-// NewHTTPServer creates a new HTTP RPC server around an API provider.
-func NewHTTPServer(corsString string, srv *Server) *http.Server {
+func newCorsHandler(srv *Server, corsString string) http.Handler {
var allowedOrigins []string
for _, domain := range strings.Split(corsString, ",") {
allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain))
}
-
c := cors.New(cors.Options{
AllowedOrigins: allowedOrigins,
AllowedMethods: []string{"POST", "GET"},
})
-
- handler := c.Handler(newJSONHTTPHandler(srv))
-
- return &http.Server{
- Handler: handler,
- }
+ return c.Handler(srv)
}
diff --git a/rpc/inproc.go b/rpc/inproc.go
index 250f5c787..f72b97497 100644
--- a/rpc/inproc.go
+++ b/rpc/inproc.go
@@ -17,45 +17,18 @@
package rpc
import (
- "encoding/json"
- "io"
"net"
-)
-
-// inProcClient is an in-process buffer stream attached to an RPC server.
-type inProcClient struct {
- server *Server
- cl io.Closer
- enc *json.Encoder
- dec *json.Decoder
-}
-// Close tears down the request channel of the in-proc client.
-func (c *inProcClient) Close() {
- c.cl.Close()
-}
-
-// NewInProcRPCClient creates an in-process buffer stream attachment to a given
-// RPC server.
-func NewInProcRPCClient(handler *Server) Client {
- p1, p2 := net.Pipe()
- go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
- return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
-}
-
-// Send marshals a message into a json format and injects in into the client
-// request channel.
-func (c *inProcClient) Send(msg interface{}) error {
- return c.enc.Encode(msg)
-}
-
-// Recv reads a message from the response channel and tries to parse it into the
-// given msg interface.
-func (c *inProcClient) Recv(msg interface{}) error {
- return c.dec.Decode(msg)
-}
+ "golang.org/x/net/context"
+)
-// Returns the collection of modules the RPC server offers.
-func (c *inProcClient) SupportedModules() (map[string]string, error) {
- return SupportedModules(c)
+// NewInProcClient attaches an in-process connection to the given RPC server.
+func DialInProc(handler *Server) *Client {
+ initctx := context.Background()
+ c, _ := newClient(initctx, func(context.Context) (net.Conn, error) {
+ p1, p2 := net.Pipe()
+ go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
+ return p2, nil
+ })
+ return c
}
diff --git a/rpc/ipc.go b/rpc/ipc.go
index 05d8909ca..c2b9e3871 100644
--- a/rpc/ipc.go
+++ b/rpc/ipc.go
@@ -17,68 +17,39 @@
package rpc
import (
- "encoding/json"
"net"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "golang.org/x/net/context"
)
-// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on Windows this is a named pipe
+// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on
+// Windows this is a named pipe
func CreateIPCListener(endpoint string) (net.Listener, error) {
return ipcListen(endpoint)
}
-// ipcClient represent an IPC RPC client. It will connect to a given endpoint and tries to communicate with a node using
-// JSON serialization.
-type ipcClient struct {
- endpoint string
- conn net.Conn
- out *json.Encoder
- in *json.Decoder
-}
-
-// NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded.
-// On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a
-// named pipe.
-func NewIPCClient(endpoint string) (Client, error) {
- conn, err := newIPCConnection(endpoint)
- if err != nil {
- return nil, err
- }
- return &ipcClient{endpoint: endpoint, conn: conn, in: json.NewDecoder(conn), out: json.NewEncoder(conn)}, nil
-}
-
-// Send will serialize the given message and send it to the server.
-// When sending the message fails it will try to reconnect once and send the message again.
-func (client *ipcClient) Send(msg interface{}) error {
- if err := client.out.Encode(msg); err == nil {
- return nil
- }
-
- // retry once
- client.conn.Close()
-
- conn, err := newIPCConnection(client.endpoint)
- if err != nil {
- return err
+// ServeListener accepts connections on l, serving JSON-RPC on them.
+func (srv *Server) ServeListener(l net.Listener) error {
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ return err
+ }
+ glog.V(logger.Detail).Infoln("accepted conn", conn.RemoteAddr())
+ go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
}
-
- client.conn = conn
- client.in = json.NewDecoder(conn)
- client.out = json.NewEncoder(conn)
-
- return client.out.Encode(msg)
-}
-
-// Recv will read a message from the connection and tries to parse it. It assumes the received message is JSON encoded.
-func (client *ipcClient) Recv(msg interface{}) error {
- return client.in.Decode(&msg)
-}
-
-// Close will close the underlying IPC connection
-func (client *ipcClient) Close() {
- client.conn.Close()
}
-// SupportedModules will return the collection of offered RPC modules.
-func (client *ipcClient) SupportedModules() (map[string]string, error) {
- return SupportedModules(client)
+// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes
+// the endpoint is the full path to a unix socket, and Windows the endpoint is an
+// identifier for a named pipe.
+//
+// The context is used for the initial connection establishment. It does not
+// affect subsequent interactions with the client.
+func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
+ return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
+ return newIPCConnection(ctx, endpoint)
+ })
}
diff --git a/rpc/ipc_unix.go b/rpc/ipc_unix.go
index 9ece01240..a25b21627 100644
--- a/rpc/ipc_unix.go
+++ b/rpc/ipc_unix.go
@@ -22,6 +22,8 @@ import (
"net"
"os"
"path/filepath"
+
+ "golang.org/x/net/context"
)
// ipcListen will create a Unix socket on the given endpoint.
@@ -40,6 +42,6 @@ func ipcListen(endpoint string) (net.Listener, error) {
}
// newIPCConnection will connect to a Unix socket on the given endpoint.
-func newIPCConnection(endpoint string) (net.Conn, error) {
- return net.DialUnix("unix", nil, &net.UnixAddr{Name: endpoint, Net: "unix"})
+func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
+ return dialContext(ctx, "unix", endpoint)
}
diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go
index 8342d04d5..68234d215 100644
--- a/rpc/ipc_windows.go
+++ b/rpc/ipc_windows.go
@@ -22,16 +22,27 @@ import (
"net"
"time"
+ "golang.org/x/net/context"
"gopkg.in/natefinch/npipe.v2"
)
+// This is used if the dialing context has no deadline. It is much smaller than the
+// defaultDialTimeout because named pipes are local and there is no need to wait so long.
+const defaultPipeDialTimeout = 2 * time.Second
+
// ipcListen will create a named pipe on the given endpoint.
func ipcListen(endpoint string) (net.Listener, error) {
return npipe.Listen(endpoint)
}
// newIPCConnection will connect to a named pipe with the given endpoint as name.
-func newIPCConnection(endpoint string) (net.Conn, error) {
- timeout := 5 * time.Second
+func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
+ timeout := defaultPipeDialTimeout
+ if deadline, ok := ctx.Deadline(); ok {
+ timeout = deadline.Sub(time.Now())
+ if timeout < 0 {
+ timeout = 0
+ }
+ }
return npipe.DialTimeout(endpoint, timeout)
}
diff --git a/rpc/json.go b/rpc/json.go
index ee931bc87..a7053e3f5 100644
--- a/rpc/json.go
+++ b/rpc/json.go
@@ -30,49 +30,43 @@ import (
)
const (
- JSONRPCVersion = "2.0"
+ jsonrpcVersion = "2.0"
serviceMethodSeparator = "_"
subscribeMethod = "eth_subscribe"
unsubscribeMethod = "eth_unsubscribe"
notificationMethod = "eth_subscription"
)
-// JSON-RPC request
-type JSONRequest struct {
+type jsonRequest struct {
Method string `json:"method"`
Version string `json:"jsonrpc"`
Id json.RawMessage `json:"id,omitempty"`
Payload json.RawMessage `json:"params,omitempty"`
}
-// JSON-RPC response
-type JSONSuccessResponse struct {
+type jsonSuccessResponse struct {
Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"`
Result interface{} `json:"result"`
}
-// JSON-RPC error object
-type JSONError struct {
+type jsonError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
-// JSON-RPC error response
-type JSONErrResponse struct {
+type jsonErrResponse struct {
Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"`
- Error JSONError `json:"error"`
+ Error jsonError `json:"error"`
}
-// JSON-RPC notification payload
type jsonSubscription struct {
Subscription string `json:"subscription"`
Result interface{} `json:"result,omitempty"`
}
-// JSON-RPC notification
type jsonNotification struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
@@ -91,6 +85,17 @@ type jsonCodec struct {
rw io.ReadWriteCloser // connection
}
+func (err *jsonError) Error() string {
+ if err.Message == "" {
+ return fmt.Sprintf("json-rpc error %d", err.Code)
+ }
+ return err.Message
+}
+
+func (err *jsonError) ErrorCode() int {
+ return err.Code
+}
+
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec {
d := json.NewDecoder(rwc)
@@ -113,7 +118,7 @@ func isBatch(msg json.RawMessage) bool {
// ReadRequestHeaders will read new requests without parsing the arguments. It will
// return a collection of requests, an indication if these requests are in batch
// form or an error when the incoming message could not be read/parsed.
-func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) {
+func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) {
c.decMu.Lock()
defer c.decMu.Unlock()
@@ -148,8 +153,8 @@ func checkReqId(reqId json.RawMessage) error {
// parseRequest will parse a single request from the given RawMessage. It will return
// the parsed request, an indication if the request was a batch or an error when
// the request could not be parsed.
-func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
- var in JSONRequest
+func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
+ var in jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
@@ -197,8 +202,8 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
// parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication
// if the request was a batch or an error when the request could not be read.
-func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
- var in []JSONRequest
+func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
+ var in []jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
@@ -253,7 +258,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
// ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed
// values or an error when the parsing failed.
-func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, RPCError) {
+func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error) {
if args, ok := params.(json.RawMessage); !ok {
return nil, &invalidParamsError{"Invalid params supplied"}
} else {
@@ -264,7 +269,7 @@ func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interf
// parsePositionalArguments tries to parse the given args to an array of values with the given types.
// It returns the parsed values or an error when the args could not be parsed. Missing optional arguments
// are returned as reflect.Zero values.
-func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, RPCError) {
+func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, Error) {
params := make([]interface{}, 0, len(callbackArgs))
for _, t := range callbackArgs {
params = append(params, reflect.New(t).Interface())
@@ -302,31 +307,31 @@ func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type)
// CreateResponse will create a JSON-RPC success response with the given id and reply as result.
func (c *jsonCodec) CreateResponse(id interface{}, reply interface{}) interface{} {
if isHexNum(reflect.TypeOf(reply)) {
- return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
+ return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
}
- return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: reply}
+ return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: reply}
}
// CreateErrorResponse will create a JSON-RPC error response with the given id and error.
-func (c *jsonCodec) CreateErrorResponse(id interface{}, err RPCError) interface{} {
- return &JSONErrResponse{Version: JSONRPCVersion, Id: id, Error: JSONError{Code: err.Code(), Message: err.Error()}}
+func (c *jsonCodec) CreateErrorResponse(id interface{}, err Error) interface{} {
+ return &jsonErrResponse{Version: jsonrpcVersion, Id: id, Error: jsonError{Code: err.ErrorCode(), Message: err.Error()}}
}
// CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error.
// info is optional and contains additional information about the error. When an empty string is passed it is ignored.
-func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} {
- return &JSONErrResponse{Version: JSONRPCVersion, Id: id,
- Error: JSONError{Code: err.Code(), Message: err.Error(), Data: info}}
+func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} {
+ return &jsonErrResponse{Version: jsonrpcVersion, Id: id,
+ Error: jsonError{Code: err.ErrorCode(), Message: err.Error(), Data: info}}
}
// CreateNotification will create a JSON-RPC notification with the given subscription id and event as params.
func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} {
if isHexNum(reflect.TypeOf(event)) {
- return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod,
+ return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}}
}
- return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod,
+ return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: event}}
}
diff --git a/rpc/notification.go b/rpc/notification.go
index e84e26a58..875433071 100644
--- a/rpc/notification.go
+++ b/rpc/notification.go
@@ -28,7 +28,7 @@ import (
var (
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
- ErrNotificationsUnsupported = errors.New("notifications not supported")
+ ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport")
// ErrNotificationNotFound is returned when the notification for the given id is not found
ErrNotificationNotFound = errors.New("notification not found")
diff --git a/rpc/notification_test.go b/rpc/notification_test.go
index 1bcede177..280503222 100644
--- a/rpc/notification_test.go
+++ b/rpc/notification_test.go
@@ -19,20 +19,31 @@ package rpc
import (
"encoding/json"
"net"
+ "sync"
"testing"
"time"
"golang.org/x/net/context"
)
-type NotificationTestService struct{}
+type NotificationTestService struct {
+ mu sync.Mutex
+ unsubscribed bool
-var (
- unsubCallbackCalled = false
-)
+ gotHangSubscriptionReq chan struct{}
+ unblockHangSubscription chan struct{}
+}
+
+func (s *NotificationTestService) wasUnsubCallbackCalled() bool {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.unsubscribed
+}
func (s *NotificationTestService) Unsubscribe(subid string) {
- unsubCallbackCalled = true
+ s.mu.Lock()
+ s.unsubscribed = true
+ s.mu.Unlock()
}
func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
@@ -60,6 +71,26 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
return subscription, nil
}
+// HangSubscription blocks on s.unblockHangSubscription before
+// sending anything.
+func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) {
+ notifier, supported := NotifierFromContext(ctx)
+ if !supported {
+ return nil, ErrNotificationsUnsupported
+ }
+
+ s.gotHangSubscriptionReq <- struct{}{}
+ <-s.unblockHangSubscription
+ subscription, err := notifier.NewSubscription(s.Unsubscribe)
+ if err != nil {
+ return nil, err
+ }
+ go func() {
+ subscription.Notify(val)
+ }()
+ return subscription, nil
+}
+
func TestNotifications(t *testing.T) {
server := NewServer()
service := &NotificationTestService{}
@@ -90,7 +121,7 @@ func TestNotifications(t *testing.T) {
}
var subid string
- response := JSONSuccessResponse{Result: subid}
+ response := jsonSuccessResponse{Result: subid}
if err := in.Decode(&response); err != nil {
t.Fatal(err)
}
@@ -114,7 +145,7 @@ func TestNotifications(t *testing.T) {
clientConn.Close() // causes notification unsubscribe callback to be called
time.Sleep(1 * time.Second)
- if !unsubCallbackCalled {
+ if !service.wasUnsubCallbackCalled() {
t.Error("unsubscribe callback not called after closing connection")
}
}
diff --git a/rpc/server.go b/rpc/server.go
index a9bdef285..040805a5c 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -381,7 +381,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
// readRequest requests the next (batch) request from the codec. It will return the collection
// of requests, an indication if the request was a batch, the invalid request identifier and an
// error when the request could not be read/parsed.
-func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) {
+func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
reqs, batch, err := codec.ReadRequestHeaders()
if err != nil {
return nil, batch, err
diff --git a/rpc/server_test.go b/rpc/server_test.go
index de47e1afd..e6840bde4 100644
--- a/rpc/server_test.go
+++ b/rpc/server_test.go
@@ -21,6 +21,7 @@ import (
"net"
"reflect"
"testing"
+ "time"
"golang.org/x/net/context"
)
@@ -48,6 +49,13 @@ func (s *Service) EchoWithCtx(ctx context.Context, str string, i int, args *Args
return Result{str, i, args}
}
+func (s *Service) Sleep(ctx context.Context, duration time.Duration) {
+ select {
+ case <-time.After(duration):
+ case <-ctx.Done():
+ }
+}
+
func (s *Service) Rets() (string, error) {
return "", nil
}
@@ -85,8 +93,8 @@ func TestServerRegisterName(t *testing.T) {
t.Fatalf("Expected service calc to be registered")
}
- if len(svc.callbacks) != 4 {
- t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks))
+ if len(svc.callbacks) != 5 {
+ t.Errorf("Expected 5 callbacks for service 'calc', got %d", len(svc.callbacks))
}
if len(svc.subscriptions) != 1 {
@@ -126,7 +134,7 @@ func testServerMethodExecution(t *testing.T, method string) {
t.Fatal(err)
}
- response := JSONSuccessResponse{Result: &Result{}}
+ response := jsonSuccessResponse{Result: &Result{}}
if err := in.Decode(&response); err != nil {
t.Fatal(err)
}
diff --git a/rpc/types.go b/rpc/types.go
index 460581715..2a7268ad8 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -62,7 +62,7 @@ type serverRequest struct {
callb *callback
args []reflect.Value
isUnsubscribe bool
- err RPCError
+ err Error
}
type serviceRegistry map[string]*service // collection of services
@@ -88,15 +88,13 @@ type rpcRequest struct {
id interface{}
isPubSub bool
params interface{}
- err RPCError // invalid batch element
+ err Error // invalid batch element
}
-// RPCError implements RPC error, is add support for error codec over regular go errors
-type RPCError interface {
- // RPC error code
- Code() int
- // Error message
- Error() string
+// Error wraps RPC errors, which contain an error code in addition to the message.
+type Error interface {
+ Error() string // returns the message
+ ErrorCode() int // returns the code
}
// ServerCodec implements reading, parsing and writing RPC messages for the server side of
@@ -104,15 +102,15 @@ type RPCError interface {
// multiple go-routines concurrently.
type ServerCodec interface {
// Read next request
- ReadRequestHeaders() ([]rpcRequest, bool, RPCError)
+ ReadRequestHeaders() ([]rpcRequest, bool, Error)
// Parse request argument to the given types
- ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, RPCError)
+ ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, Error)
// Assemble success response, expects response id and payload
CreateResponse(interface{}, interface{}) interface{}
// Assemble error response, expects response id and error
- CreateErrorResponse(interface{}, RPCError) interface{}
+ CreateErrorResponse(interface{}, Error) interface{}
// Assemble error response with extra information about the error through info
- CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{}
+ CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{}
// Create notification response
CreateNotification(string, interface{}) interface{}
// Write msg to client.
@@ -274,14 +272,3 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
func (bn *BlockNumber) Int64() int64 {
return (int64)(*bn)
}
-
-// Client defines the interface for go client that wants to connect to a geth RPC endpoint
-type Client interface {
- // SupportedModules returns the collection of API's the server offers
- SupportedModules() (map[string]string, error)
-
- Send(req interface{}) error
- Recv(msg interface{}) error
-
- Close()
-}
diff --git a/rpc/utils.go b/rpc/utils.go
index fe482e19d..1ac6698f5 100644
--- a/rpc/utils.go
+++ b/rpc/utils.go
@@ -20,7 +20,6 @@ import (
"crypto/rand"
"encoding/hex"
"errors"
- "fmt"
"math/big"
"reflect"
"unicode"
@@ -227,31 +226,3 @@ func newSubscriptionID() (string, error) {
}
return "0x" + hex.EncodeToString(subid[:]), nil
}
-
-// SupportedModules returns the collection of API's that the RPC server offers
-// on which the given client connects.
-func SupportedModules(client Client) (map[string]string, error) {
- req := JSONRequest{
- Id: []byte("1"),
- Version: "2.0",
- Method: MetadataApi + "_modules",
- }
- if err := client.Send(req); err != nil {
- return nil, err
- }
-
- var response JSONSuccessResponse
- if err := client.Recv(&response); err != nil {
- return nil, err
- }
- if response.Result != nil {
- mods := make(map[string]string)
- if modules, ok := response.Result.(map[string]interface{}); ok {
- for m, v := range modules {
- mods[m] = fmt.Sprintf("%s", v)
- }
- return mods, nil
- }
- }
- return nil, fmt.Errorf("unable to retrieve modules")
-}
diff --git a/rpc/websocket.go b/rpc/websocket.go
index fe9354d94..fc3cd0709 100644
--- a/rpc/websocket.go
+++ b/rpc/websocket.go
@@ -17,36 +17,39 @@
package rpc
import (
+ "crypto/tls"
"fmt"
+ "net"
"net/http"
+ "net/url"
"os"
"strings"
- "sync"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "golang.org/x/net/context"
"golang.org/x/net/websocket"
"gopkg.in/fatih/set.v0"
)
-// wsReaderWriterCloser reads and write payloads from and to a websocket connection.
-type wsReaderWriterCloser struct {
- c *websocket.Conn
-}
-
-// Read will read incoming payload data into p.
-func (rw *wsReaderWriterCloser) Read(p []byte) (int, error) {
- return rw.c.Read(p)
-}
-
-// Write writes p to the websocket.
-func (rw *wsReaderWriterCloser) Write(p []byte) (int, error) {
- return rw.c.Write(p)
+// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
+//
+// allowedOrigins should be a comma-separated list of allowed origin URLs.
+// To allow connections with any origin, pass "*".
+func (srv *Server) WebsocketHandler(allowedOrigins string) http.Handler {
+ return websocket.Server{
+ Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
+ Handler: func(conn *websocket.Conn) {
+ srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
+ },
+ }
}
-// Close closes the websocket connection.
-func (rw *wsReaderWriterCloser) Close() error {
- return rw.c.Close()
+// NewWSServer creates a new websocket RPC server around an API provider.
+//
+// Deprecated: use Server.WebsocketHandler
+func NewWSServer(allowedOrigins string, srv *Server) *http.Server {
+ return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)}
}
// wsHandshakeValidator returns a handler that verifies the origin during the
@@ -87,96 +90,63 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http
return f
}
-// NewWSServer creates a new websocket RPC server around an API provider.
-func NewWSServer(allowedOrigins string, handler *Server) *http.Server {
- return &http.Server{
- Handler: websocket.Server{
- Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
- Handler: func(conn *websocket.Conn) {
- handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}),
- OptionMethodInvocation|OptionSubscriptions)
- },
- },
+// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
+// that is listening on the given endpoint.
+//
+// The context is used for the initial connection establishment. It does not
+// affect subsequent interactions with the client.
+func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
+ if origin == "" {
+ var err error
+ if origin, err = os.Hostname(); err != nil {
+ return nil, err
+ }
+ if strings.HasPrefix(endpoint, "wss") {
+ origin = "https://" + strings.ToLower(origin)
+ } else {
+ origin = "http://" + strings.ToLower(origin)
+ }
+ }
+ config, err := websocket.NewConfig(endpoint, origin)
+ if err != nil {
+ return nil, err
}
-}
-
-// wsClient represents a RPC client that communicates over websockets with a
-// RPC server.
-type wsClient struct {
- endpoint string
- connMu sync.Mutex
- conn *websocket.Conn
-}
-// NewWSClientj creates a new RPC client that communicates with a RPC server
-// that is listening on the given endpoint using JSON encoding.
-func NewWSClient(endpoint string) (Client, error) {
- return &wsClient{endpoint: endpoint}, nil
+ return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
+ return wsDialContext(ctx, config)
+ })
}
-// connection will return a websocket connection to the RPC server. It will
-// (re)connect when necessary.
-func (client *wsClient) connection() (*websocket.Conn, error) {
- if client.conn != nil {
- return client.conn, nil
+func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) {
+ var conn net.Conn
+ var err error
+ switch config.Location.Scheme {
+ case "ws":
+ conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location))
+ case "wss":
+ dialer := contextDialer(ctx)
+ conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig)
+ default:
+ err = websocket.ErrBadScheme
}
-
- origin, err := os.Hostname()
if err != nil {
return nil, err
}
-
- origin = "http://" + origin
- client.conn, err = websocket.Dial(client.endpoint, "", origin)
-
- return client.conn, err
-}
-
-// SupportedModules is the collection of modules the RPC server offers.
-func (client *wsClient) SupportedModules() (map[string]string, error) {
- return SupportedModules(client)
-}
-
-// Send writes the JSON serialized msg to the websocket. It will create a new
-// websocket connection to the server if the client is currently not connected.
-func (client *wsClient) Send(msg interface{}) (err error) {
- client.connMu.Lock()
- defer client.connMu.Unlock()
-
- var conn *websocket.Conn
- if conn, err = client.connection(); err == nil {
- if err = websocket.JSON.Send(conn, msg); err != nil {
- client.conn.Close()
- client.conn = nil
- }
+ ws, err := websocket.NewClient(config, conn)
+ if err != nil {
+ conn.Close()
+ return nil, err
}
-
- return err
+ return ws, err
}
-// Recv reads a JSON message from the websocket and unmarshals it into msg.
-func (client *wsClient) Recv(msg interface{}) (err error) {
- client.connMu.Lock()
- defer client.connMu.Unlock()
+var wsPortMap = map[string]string{"ws": "80", "wss": "443"}
- var conn *websocket.Conn
- if conn, err = client.connection(); err == nil {
- if err = websocket.JSON.Receive(conn, msg); err != nil {
- client.conn.Close()
- client.conn = nil
+func wsDialAddress(location *url.URL) string {
+ if _, ok := wsPortMap[location.Scheme]; ok {
+ if _, _, err := net.SplitHostPort(location.Host); err != nil {
+ return net.JoinHostPort(location.Host, wsPortMap[location.Scheme])
}
}
- return
-}
-
-// Close closes the underlaying websocket connection.
-func (client *wsClient) Close() {
- client.connMu.Lock()
- defer client.connMu.Unlock()
-
- if client.conn != nil {
- client.conn.Close()
- client.conn = nil
- }
-
+ return location.Host
}