aboutsummaryrefslogblamecommitdiffstats
path: root/rpc/handler.go
blob: 92db89e2f8896c78b4873a29c3d7fd99b9a06813 (plain) (tree)










































































































































































































































































































                                                                                                           
                                                                                                                                      
































































































                                                                                                                              
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rpc

import (
    "context"
    "encoding/json"
    "reflect"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/log"
)

// handler handles JSON-RPC messages. There is one handler per connection. Note that
// handler is not safe for concurrent use. Message handling never blocks indefinitely
// because RPCs are processed on background goroutines launched by handler.
//
// The entry points for incoming messages are:
//
//    h.handleMsg(message)
//    h.handleBatch(message)
//
// Outgoing calls use the requestOp struct. Register the request before sending it
// on the connection:
//
//    op := &requestOp{ids: ...}
//    h.addRequestOp(op)
//
// Now send the request, then wait for the reply to be delivered through handleMsg:
//
//    if err := op.wait(...); err != nil {
//        h.removeRequestOp(op) // timeout, etc.
//    }
//
type handler struct {
    reg            *serviceRegistry
    unsubscribeCb  *callback
    idgen          func() ID                      // subscription ID generator
    respWait       map[string]*requestOp          // active client requests
    clientSubs     map[string]*ClientSubscription // active client subscriptions
    callWG         sync.WaitGroup                 // pending call goroutines
    rootCtx        context.Context                // canceled by close()
    cancelRoot     func()                         // cancel function for rootCtx
    conn           jsonWriter                     // where responses will be sent
    log            log.Logger
    allowSubscribe bool

    subLock    sync.Mutex
    serverSubs map[ID]*Subscription
}

type callProc struct {
    ctx       context.Context
    notifiers []*Notifier
}

func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
    rootCtx, cancelRoot := context.WithCancel(connCtx)
    h := &handler{
        reg:            reg,
        idgen:          idgen,
        conn:           conn,
        respWait:       make(map[string]*requestOp),
        clientSubs:     make(map[string]*ClientSubscription),
        rootCtx:        rootCtx,
        cancelRoot:     cancelRoot,
        allowSubscribe: true,
        serverSubs:     make(map[ID]*Subscription),
        log:            log.Root(),
    }
    if conn.RemoteAddr() != "" {
        h.log = h.log.New("conn", conn.RemoteAddr())
    }
    h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
    return h
}

// handleBatch executes all messages in a batch and returns the responses.
func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
    // Emit error response for empty batches:
    if len(msgs) == 0 {
        h.startCallProc(func(cp *callProc) {
            h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
        })
        return
    }

    // Handle non-call messages first:
    calls := make([]*jsonrpcMessage, 0, len(msgs))
    for _, msg := range msgs {
        if handled := h.handleImmediate(msg); !handled {
            calls = append(calls, msg)
        }
    }
    if len(calls) == 0 {
        return
    }
    // Process calls on a goroutine because they may block indefinitely:
    h.startCallProc(func(cp *callProc) {
        answers := make([]*jsonrpcMessage, 0, len(msgs))
        for _, msg := range calls {
            if answer := h.handleCallMsg(cp, msg); answer != nil {
                answers = append(answers, answer)
            }
        }
        h.addSubscriptions(cp.notifiers)
        if len(answers) > 0 {
            h.conn.Write(cp.ctx, answers)
        }
        for _, n := range cp.notifiers {
            n.activate()
        }
    })
}

// handleMsg handles a single message.
func (h *handler) handleMsg(msg *jsonrpcMessage) {
    if ok := h.handleImmediate(msg); ok {
        return
    }
    h.startCallProc(func(cp *callProc) {
        answer := h.handleCallMsg(cp, msg)
        h.addSubscriptions(cp.notifiers)
        if answer != nil {
            h.conn.Write(cp.ctx, answer)
        }
        for _, n := range cp.notifiers {
            n.activate()
        }
    })
}

// close cancels all requests except for inflightReq and waits for
// call goroutines to shut down.
func (h *handler) close(err error, inflightReq *requestOp) {
    h.cancelAllRequests(err, inflightReq)
    h.cancelRoot()
    h.callWG.Wait()
    h.cancelServerSubscriptions(err)
}

// addRequestOp registers a request operation.
func (h *handler) addRequestOp(op *requestOp) {
    for _, id := range op.ids {
        h.respWait[string(id)] = op
    }
}

// removeRequestOps stops waiting for the given request IDs.
func (h *handler) removeRequestOp(op *requestOp) {
    for _, id := range op.ids {
        delete(h.respWait, string(id))
    }
}

// cancelAllRequests unblocks and removes pending requests and active subscriptions.
func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
    didClose := make(map[*requestOp]bool)
    if inflightReq != nil {
        didClose[inflightReq] = true
    }

    for id, op := range h.respWait {
        // Remove the op so that later calls will not close op.resp again.
        delete(h.respWait, id)

        if !didClose[op] {
            op.err = err
            close(op.resp)
            didClose[op] = true
        }
    }
    for id, sub := range h.clientSubs {
        delete(h.clientSubs, id)
        sub.quitWithError(err, false)
    }
}

func (h *handler) addSubscriptions(nn []*Notifier) {
    h.subLock.Lock()
    defer h.subLock.Unlock()

    for _, n := range nn {
        if sub := n.takeSubscription(); sub != nil {
            h.serverSubs[sub.ID] = sub
        }
    }
}

// cancelServerSubscriptions removes all subscriptions and closes their error channels.
func (h *handler) cancelServerSubscriptions(err error) {
    h.subLock.Lock()
    defer h.subLock.Unlock()

    for id, s := range h.serverSubs {
        s.err <- err
        close(s.err)
        delete(h.serverSubs, id)
    }
}

// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
func (h *handler) startCallProc(fn func(*callProc)) {
    h.callWG.Add(1)
    go func() {
        ctx, cancel := context.WithCancel(h.rootCtx)
        defer h.callWG.Done()
        defer cancel()
        fn(&callProc{ctx: ctx})
    }()
}

// handleImmediate executes non-call messages. It returns false if the message is a
// call or requires a reply.
func (h *handler) handleImmediate(msg *jsonrpcMessage) bool {
    start := time.Now()
    switch {
    case msg.isNotification():
        if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
            h.handleSubscriptionResult(msg)
            return true
        }
        return false
    case msg.isResponse():
        h.handleResponse(msg)
        h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start))
        return true
    default:
        return false
    }
}

// handleSubscriptionResult processes subscription notifications.
func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) {
    var result subscriptionResult
    if err := json.Unmarshal(msg.Params, &result); err != nil {
        h.log.Debug("Dropping invalid subscription message")
        return
    }
    if h.clientSubs[result.ID] != nil {
        h.clientSubs[result.ID].deliver(result.Result)
    }
}

// handleResponse processes method call responses.
func (h *handler) handleResponse(msg *jsonrpcMessage) {
    op := h.respWait[string(msg.ID)]
    if op == nil {
        h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID})
        return
    }
    delete(h.respWait, string(msg.ID))
    // For normal responses, just forward the reply to Call/BatchCall.
    if op.sub == nil {
        op.resp <- msg
        return
    }
    // For subscription responses, start the subscription if the server
    // indicates success. EthSubscribe gets unblocked in either case through
    // the op.resp channel.
    defer close(op.resp)
    if msg.Error != nil {
        op.err = msg.Error
        return
    }
    if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
        go op.sub.start()
        h.clientSubs[op.sub.subid] = op.sub
    }
}

// handleCallMsg executes a call message and returns the answer.
func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
    start := time.Now()
    switch {
    case msg.isNotification():
        h.handleCall(ctx, msg)
        h.log.Debug("Served "+msg.Method, "t", time.Since(start))
        return nil
    case msg.isCall():
        resp := h.handleCall(ctx, msg)
        if resp.Error != nil {
            h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message)
        } else {
            h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start))
        }
        return resp
    case msg.hasValidID():
        return msg.errorResponse(&invalidRequestError{"invalid request"})
    default:
        return errorMessage(&invalidRequestError{"invalid request"})
    }
}

// handleCall processes method calls.
func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
    if msg.isSubscribe() {
        return h.handleSubscribe(cp, msg)
    }
    var callb *callback
    if msg.isUnsubscribe() {
        callb = h.unsubscribeCb
    } else {
        callb = h.reg.callback(msg.Method)
    }
    if callb == nil {
        return msg.errorResponse(&methodNotFoundError{method: msg.Method})
    }
    args, err := parsePositionalArguments(msg.Params, callb.argTypes)
    if err != nil {
        return msg.errorResponse(&invalidParamsError{err.Error()})
    }

    return h.runMethod(cp.ctx, msg, callb, args)
}

// handleSubscribe processes *_subscribe method calls.
func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
    if !h.allowSubscribe {
        return msg.errorResponse(ErrNotificationsUnsupported)
    }

    // Subscription method name is first argument.
    name, err := parseSubscriptionName(msg.Params)
    if err != nil {
        return msg.errorResponse(&invalidParamsError{err.Error()})
    }
    namespace := msg.namespace()
    callb := h.reg.subscription(namespace, name)
    if callb == nil {
        return msg.errorResponse(&subscriptionNotFoundError{namespace, name})
    }

    // Parse subscription name arg too, but remove it before calling the callback.
    argTypes := append([]reflect.Type{stringType}, callb.argTypes...)
    args, err := parsePositionalArguments(msg.Params, argTypes)
    if err != nil {
        return msg.errorResponse(&invalidParamsError{err.Error()})
    }
    args = args[1:]

    // Install notifier in context so the subscription handler can find it.
    n := &Notifier{h: h, namespace: namespace}
    cp.notifiers = append(cp.notifiers, n)
    ctx := context.WithValue(cp.ctx, notifierKey{}, n)

    return h.runMethod(ctx, msg, callb, args)
}

// runMethod runs the Go callback for an RPC method.
func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage {
    result, err := callb.call(ctx, msg.Method, args)
    if err != nil {
        return msg.errorResponse(err)
    }
    return msg.response(result)
}

// unsubscribe is the callback function for all *_unsubscribe calls.
func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) {
    h.subLock.Lock()
    defer h.subLock.Unlock()

    s := h.serverSubs[id]
    if s == nil {
        return false, ErrSubscriptionNotFound
    }
    close(s.err)
    delete(h.serverSubs, id)
    return true, nil
}

type idForLog struct{ json.RawMessage }

func (id idForLog) String() string {
    if s, err := strconv.Unquote(string(id.RawMessage)); err == nil {
        return s
    }
    return string(id.RawMessage)
}