aboutsummaryrefslogblamecommitdiffstats
path: root/rpc/server.go
blob: 040805a5c2c3869a98d3f7704c7d93b5db1ffc75 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16















                                                                                  
           



                 
                 
                     
 

                                                     
                                  



                               

                                                                                           

                                                                              
                                        

 










                                                                                           

                                                                           





                                                          



                                                                                                                        
                                                    












                                                                     
                                             




                                     
                                                                                                                     





















                                                                                                                  
                                                                                                                       















                                                                              
                                                                                                               






                                  









                                                                           

                                                                              
  


                                                                                   
                                                                                              






                                                             





                                      

           


                                                               



                                                                                            
                                                                                                               
         


                                                            
                                       



                           
                                                
                                           

                                                        
                                                                          
                                                                        
                                  

                 

                                                                             










                                                                                        
                                  

                 






                                                     
                                                        
                        
                                                      

                 






                                                                                                
                                                                     
                           
                                             


                                                                                                



                                                                                                   

 
                                                                                                               
                                                                     


                                                                             




                                                        



                  




                                                                                                             

                                                 

                                                              

         
                                                            


                                                                        
                                                                                                           
                           
                                                                       

         
                                                                                              
                                                                               
                                                                       



                                                                                                                                   
                                                     

                                                                                                           
                         

                                                                      
                 
                                                                                                                                 


                                  
                                                                   
                               


                                                                                                   
                                                                                               
                                       

                                                                    
                 

                                                                       

         
                                              



                                                                                                 
                                                                      


                                                    


                                                                   



                                                          
                                               
                                                      
                            
                                                             





                                                                                            
                                       

                 
                                                                      


                                                                              
                                                                                   
                                
                           


                                                                      
                                                              
         
 



                                                       




                                                                                             

 

                                                                                    
                                                                                               
                                                       
                              



                                                                                  



                                                                                                






                                                       




                                                                                                          

 


                                                                                               
                                                                                 











                                                      




                                                                          

                                                                                   
                                                                                                             







                                                                                                     
                                                                                       



                                                                                                              
                                                                                                
















                                                                                                                                              
                                                                                  















                                                                                                                   
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rpc

import (
    "fmt"
    "reflect"
    "runtime"
    "sync/atomic"

    "github.com/ethereum/go-ethereum/logger"
    "github.com/ethereum/go-ethereum/logger/glog"
    "golang.org/x/net/context"
    "gopkg.in/fatih/set.v0"
)

const (
    // notificationBufferSize is the buffer size handed to newBufferedNotifier for
    // every codec that is served with subscription support.
    notificationBufferSize = 10000 // max buffered notifications before codec is closed

    // MetadataApi is the name under which NewServer registers the RPCService that
    // reports meta information about the server.
    MetadataApi     = "rpc"
    // DefaultIPCApis is a comma separated list of API names.
    // NOTE(review): presumably the APIs offered by default on the IPC endpoint - confirm against callers.
    DefaultIPCApis  = "admin,debug,eth,miner,net,personal,shh,txpool,web3"
    // DefaultHTTPApis is a comma separated list of API names.
    // NOTE(review): presumably the APIs offered by default on the HTTP endpoint - confirm against callers.
    DefaultHTTPApis = "eth,net,web3"
)

// CodecOption specifies which type of messages a codec supports. The values are
// bit flags; serveRequest tests for a flag with a bitwise AND.
type CodecOption int

const (
    // OptionMethodInvocation is an indication that the codec supports RPC method calls.
    // It has the value 1 (1 << 0).
    OptionMethodInvocation CodecOption = 1 << iota

    // OptionSubscriptions is an indication that the codec supports RPC notifications.
    // It has the value 2 (1 << 1). Because the expression is spelled out again without
    // a type, this is an untyped integer constant rather than a CodecOption; it still
    // converts implicitly wherever a CodecOption is expected.
    OptionSubscriptions = 1 << iota // support pub sub
)

// NewServer creates a server instance without any user supplied handlers. The only
// service registered at creation time is the metadata service, available under the
// MetadataApi name, which reports the services this server offers.
func NewServer() *Server {
    srv := &Server{
        run:           1, // 1 marks the server as running, Stop flips it to 0
        codecs:        set.New(),
        services:      make(serviceRegistry),
        subscriptions: make(subscriptionRegistry),
    }

    // expose the meta information service that describes this very server
    srv.RegisterName(MetadataApi, &RPCService{srv})

    return srv
}

// RPCService gives meta information about the server,
// e.g. information about the loaded modules (see Modules).
// NewServer registers an instance of it under the MetadataApi name.
type RPCService struct {
    server *Server // server whose registered services are reported
}

// Modules returns a mapping from the name of every service registered on the
// server to its version. All services are reported with version "1.0".
func (s *RPCService) Modules() map[string]string {
    registry := s.server.services
    versions := make(map[string]string, len(registry))
    for svcName := range registry {
        versions[svcName] = "1.0"
    }
    return versions
}

// RegisterName creates a service for the given rcvr under the given name and adds it to
// the collection of services this server offers. When a service is already registered
// under name, the callbacks and subscriptions found on rcvr are merged into the existing
// service, replacing entries that have the same formatted method name.
//
// An error is returned when:
//   - rcvr is nil (either a nil interface or a nil pointer),
//   - name is empty,
//   - the (dereferenced) type of rcvr is not exported,
//   - rcvr has no methods that qualify as RPC method or subscription.
func (s *Server) RegisterName(name string, rcvr interface{}) error {
    if s.services == nil {
        s.services = make(serviceRegistry)
    }

    rcvrVal := reflect.ValueOf(rcvr)

    // reflect.Indirect yields the zero Value for a nil interface as well as for a nil
    // pointer. Asking the zero Value for its Type panics, therefore reject such
    // receivers with an error before any type information is used.
    elem := reflect.Indirect(rcvrVal)
    if !elem.IsValid() {
        return fmt.Errorf("cannot register nil receiver as service %q", name)
    }

    typ := reflect.TypeOf(rcvr)
    if name == "" {
        return fmt.Errorf("no service name for type %s", typ.String())
    }
    if typeName := elem.Type().Name(); !isExported(typeName) {
        return fmt.Errorf("%s is not exported", typeName)
    }

    // collect the callbacks once, they are needed for a new service and for a merge
    methods, subs := suitableCallbacks(rcvrVal, typ)
    if len(methods) == 0 && len(subs) == 0 {
        return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
    }

    // a service was registered under this name before, merge methods/subscriptions into it
    if existing, present := s.services[name]; present {
        for _, m := range methods {
            existing.callbacks[formatName(m.method.Name)] = m
        }
        for _, sub := range subs {
            existing.subscriptions[formatName(sub.method.Name)] = sub
        }
        return nil
    }

    s.services[name] = &service{
        name:          name,
        typ:           typ,
        callbacks:     methods,
        subscriptions: subs,
    }

    return nil
}

// hasOption reports whether options contains an element that is equal to option.
// Elements are compared as whole values, not as bit masks.
func hasOption(option CodecOption, options []CodecOption) bool {
    for i := range options {
        if options[i] == option {
            return true
        }
    }
    return false
}

// serveRequest reads requests from the codec, calls the RPC callback and
// writes the response to the given codec.
//
// If singleShot is true it will process a single request, otherwise it will handle
// requests until the codec returns an error when reading a request (in most cases
// an EOF) or until the server is stopped. Requests are executed in their own
// goroutine when singleShot is false.
//
// The codec is tracked in s.codecs while it is being served and removed again when
// this method returns. A *shutdownError is returned when the server was already
// stopped on entry, in all other cases nil is returned.
func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
    defer func() {
        // recover only sees panics raised on this goroutine, so it covers reading
        // requests and single shot execution but not the goroutines started below
        if r := recover(); r != nil {
            const size = 64 << 10
            buf := make([]byte, size)
            buf = buf[:runtime.Stack(buf, false)]
            // include the panic value, the stack dump alone doesn't tell what went wrong
            glog.Errorln(fmt.Sprintf("RPC request handling panicked: %v\n%s", r, buf))
        }

        s.codecsMu.Lock()
        s.codecs.Remove(codec)
        s.codecsMu.Unlock()
    }()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // if the codec supports notifications include a notifier that callbacks can use
    // to send notifications to clients. It is tied to the codec/connection.
    if options&OptionSubscriptions == OptionSubscriptions {
        ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize))
    }

    // the run flag is checked while holding the lock, that way Stop either sees
    // this codec in the set or this method sees that the server was stopped
    s.codecsMu.Lock()
    if atomic.LoadInt32(&s.run) != 1 { // server stopped
        s.codecsMu.Unlock()
        return &shutdownError{}
    }
    s.codecs.Add(codec)
    s.codecsMu.Unlock()

    // keep serving for as long as the server isn't ordered to stop
    for atomic.LoadInt32(&s.run) == 1 {
        reqs, batch, err := s.readRequest(codec)
        if err != nil {
            glog.V(logger.Debug).Infof("read error %v\n", err)
            codec.Write(codec.CreateErrorResponse(nil, err))
            return nil
        }

        // the server may have been stopped while the request was read, in that
        // case tell the client that his request failed instead of executing it
        if atomic.LoadInt32(&s.run) != 1 {
            err = &shutdownError{}
            if batch {
                resps := make([]interface{}, len(reqs))
                for i, req := range reqs {
                    resps[i] = codec.CreateErrorResponse(&req.id, err)
                }
                codec.Write(resps)
            } else {
                codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
            }
            return nil
        }

        switch {
        case singleShot && batch:
            s.execBatch(ctx, codec, reqs)
            return nil
        case singleShot:
            s.exec(ctx, codec, reqs[0])
            return nil
        case batch:
            go s.execBatch(ctx, codec, reqs)
        default:
            go s.exec(ctx, codec, reqs[0])
        }
    }

    return nil
}

// ServeCodec serves the requests that arrive on codec: each request is read, the
// matching callback is invoked and the response is written back through the codec.
// The call blocks until the request loop ends, which happens when reading from the
// codec fails or when the server is stopped. The codec is closed before returning.
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
    const singleShot = false // keep serving until reading fails or the server stops

    defer codec.Close()
    _ = s.serveRequest(codec, singleShot, options)
}

// ServeSingleRequest reads one (batch) request from the given codec, processes it
// and returns. In contrast to ServeCodec this method doesn't close the codec itself;
// the codec is only closed by the request executor when the response could not be
// written.
func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) {
    const singleShot = true // return after the first request has been handled

    _ = s.serveRequest(codec, singleShot, options)
}

// Stop marks the server as stopped, which makes the request loops stop reading new
// requests, and closes all codecs that are currently tracked by the server. Only the
// call that actually flips the run flag does this work, later calls are no-ops.
func (s *Server) Stop() {
    if !atomic.CompareAndSwapInt32(&s.run, 1, 0) {
        return // not running, nothing to stop
    }

    glog.V(logger.Debug).Infoln("RPC Server shutdown initiatied")

    s.codecsMu.Lock()
    defer s.codecsMu.Unlock()

    s.codecs.Each(func(item interface{}) bool {
        item.(ServerCodec).Close()
        return true // continue with the next codec
    })
}

// createSubscription invokes the subscription callback of the request and returns
// the id of the created subscription, or the error the callback returned.
func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) {
    // a subscription callback is called with its receiver, the context and
    // then the arguments that were supplied with the request
    callArgs := make([]reflect.Value, 0, 2+len(req.args))
    callArgs = append(callArgs, req.callb.rcvr, reflect.ValueOf(ctx))
    callArgs = append(callArgs, req.args...)

    results := req.callb.method.Func.Call(callArgs)

    // the second result is the error, non-nil means the subscription wasn't created
    if errVal := results[1]; !errVal.IsNil() {
        return "", errVal.Interface().(error)
    }

    return results[0].Interface().(Subscription).ID(), nil
}

// handle executes a request and returns the response that must be sent to the client.
// The second return value is only non-nil for successful subscribe requests; it is a
// function that activates the created subscription and is meant to be called after
// the response has been written.
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
    // the request was already found to be invalid, report why
    if req.err != nil {
        return codec.CreateErrorResponse(&req.id, req.err), nil
    }

    // unsubscribe request, the first param must be the subscription id
    if req.isUnsubscribe {
        if len(req.args) < 1 || req.args[0].Kind() != reflect.String {
            return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
        }

        notifier, supported := NotifierFromContext(ctx)
        if !supported { // interface doesn't support subscriptions (e.g. http)
            return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
        }

        if err := notifier.Unsubscribe(req.args[0].String()); err != nil {
            return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
        }

        return codec.CreateResponse(req.id, true), nil
    }

    // subscribe request, create the subscription and let the caller activate it
    if req.callb.isSubscribe {
        subid, err := s.createSubscription(ctx, codec, req)
        if err != nil {
            return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
        }

        // the subscription is activated once the sub id was sent to the client
        activate := func() {
            notifier, _ := NotifierFromContext(ctx)
            notifier.(*bufferedNotifier).activate(subid)
        }

        return codec.CreateResponse(req.id, subid), activate
    }

    // regular RPC call, the number of supplied arguments must match the callback
    if want, got := len(req.callb.argTypes), len(req.args); got != want {
        msg := fmt.Sprintf("%s%s%s expects %d parameters, got %d",
            req.svcname, serviceMethodSeparator, req.callb.method.Name, want, got)
        return codec.CreateErrorResponse(&req.id, &invalidParamsError{msg}), nil
    }

    // call arguments are the receiver, optionally the context and the request arguments
    callArgs := make([]reflect.Value, 0, 2+len(req.args))
    callArgs = append(callArgs, req.callb.rcvr)
    if req.callb.hasCtx {
        callArgs = append(callArgs, reflect.ValueOf(ctx))
    }
    callArgs = append(callArgs, req.args...)

    // execute RPC method and turn its results into a response
    results := req.callb.method.Func.Call(callArgs)
    if len(results) == 0 {
        return codec.CreateResponse(req.id, nil), nil
    }

    // a non negative errPos tells which result holds the error of the method
    if pos := req.callb.errPos; pos >= 0 && !results[pos].IsNil() {
        callErr := results[pos].Interface().(error)
        return codec.CreateErrorResponse(&req.id, &callbackError{callErr.Error()}), nil
    }

    return codec.CreateResponse(req.id, results[0].Interface()), nil
}

// exec executes a single request and writes the response back using the codec.
// The codec is closed when the response cannot be written.
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
    var (
        reply    interface{}
        activate func()
    )
    if req.err == nil {
        reply, activate = s.handle(ctx, codec, req)
    } else {
        reply = codec.CreateErrorResponse(&req.id, req.err)
    }

    if werr := codec.Write(reply); werr != nil {
        glog.V(logger.Error).Infof("%v\n", werr)
        codec.Close()
    }

    // a subscribe request hands back a function that activates the subscription,
    // it is called only now that the response has been written
    if activate != nil {
        activate()
    }
}

// execBatch executes all requests of a batch and writes their responses back in a
// single write, in the order of the requests, after the last request was processed.
// The codec is closed when the responses cannot be written.
func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) {
    var activations []func()
    replies := make([]interface{}, 0, len(requests))

    for _, req := range requests {
        if req.err != nil {
            replies = append(replies, codec.CreateErrorResponse(&req.id, req.err))
            continue
        }

        reply, activate := s.handle(ctx, codec, req)
        replies = append(replies, reply)
        if activate != nil {
            activations = append(activations, activate)
        }
    }

    if werr := codec.Write(replies); werr != nil {
        glog.V(logger.Error).Infof("%v\n", werr)
        codec.Close()
    }

    // subscribe requests in the batch handed back functions that activate their
    // subscription, they are called only now that the responses have been written
    for _, activate := range activations {
        activate()
    }
}

// readRequest reads the next (batch) request from the codec and resolves every
// request in it to the callback that must be invoked. It returns the collection of
// requests, an indication if the request was a batch and an error when the request
// headers could not be read by the codec.
//
// A request that could be read but is invalid (unknown service/method, arguments
// that cannot be parsed) doesn't fail the whole read; it is returned with its err
// field set instead.
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
    headers, isBatch, err := codec.ReadRequestHeaders()
    if err != nil {
        return nil, isBatch, err
    }

    stringType := reflect.TypeOf("")
    parsed := make([]*serverRequest, len(headers))

    for i, hdr := range headers {
        // the codec already marked this request as invalid
        if hdr.err != nil {
            parsed[i] = &serverRequest{id: hdr.id, err: hdr.err}
            continue
        }

        // unsubscribe isn't looked up in a service, it only expects the subscription id as first arg
        if hdr.isPubSub && hdr.method == unsubscribeMethod {
            req := &serverRequest{id: hdr.id, isUnsubscribe: true}
            args, perr := codec.ParseRequestArguments([]reflect.Type{stringType}, hdr.params)
            if perr != nil {
                req.err = &invalidParamsError{perr.Error()}
            } else {
                req.args = args
            }
            parsed[i] = req
            continue
        }

        svc, known := s.services[hdr.service]
        if !known { // rpc method isn't available
            parsed[i] = &serverRequest{id: hdr.id, err: &methodNotFoundError{hdr.service, hdr.method}}
            continue
        }

        // subscribe request, hdr.method contains the subscription method name
        if hdr.isPubSub {
            callb, found := svc.subscriptions[hdr.method]
            if !found {
                parsed[i] = &serverRequest{id: hdr.id, err: &methodNotFoundError{subscribeMethod, hdr.method}}
                continue
            }

            req := &serverRequest{id: hdr.id, svcname: svc.name, callb: callb}
            if hdr.params != nil && len(callb.argTypes) > 0 {
                // the params start with the subscription method name, followed by the callback arguments
                argTypes := append([]reflect.Type{stringType}, callb.argTypes...)
                args, perr := codec.ParseRequestArguments(argTypes, hdr.params)
                if perr != nil {
                    req.err = &invalidParamsError{perr.Error()}
                } else {
                    req.args = args[1:] // drop the method name, it isn't an actual argument
                }
            }
            parsed[i] = req
            continue
        }

        // regular RPC method call
        callb, found := svc.callbacks[hdr.method]
        if !found {
            parsed[i] = &serverRequest{id: hdr.id, err: &methodNotFoundError{hdr.service, hdr.method}}
            continue
        }

        req := &serverRequest{id: hdr.id, svcname: svc.name, callb: callb}
        if hdr.params != nil && len(callb.argTypes) > 0 {
            args, perr := codec.ParseRequestArguments(callb.argTypes, hdr.params)
            if perr != nil {
                req.err = &invalidParamsError{perr.Error()}
            } else {
                req.args = args
            }
        }
        parsed[i] = req
    }

    return parsed, isBatch, nil
}