diff options
Diffstat (limited to 'rpc/server.go')
-rw-r--r-- | rpc/server.go | 443 |
1 files changed, 74 insertions, 369 deletions
diff --git a/rpc/server.go b/rpc/server.go index 214e1d3ed..5a92847f2 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -18,11 +18,7 @@ package rpc import ( "context" - "fmt" - "reflect" - "runtime" - "strings" - "sync" + "io" "sync/atomic" mapset "github.com/deckarep/golang-set" @@ -31,7 +27,9 @@ import ( const MetadataApi = "rpc" -// CodecOption specifies which type of messages this codec supports +// CodecOption specifies which type of messages a codec supports. +// +// Deprecated: this option is no longer honored by Server. type CodecOption int const ( @@ -42,194 +40,87 @@ const ( OptionSubscriptions = 1 << iota // support pub sub ) -// NewServer will create a new server instance with no registered handlers. -func NewServer() *Server { - server := &Server{ - services: make(serviceRegistry), - codecs: mapset.NewSet(), - run: 1, - } +// Server is an RPC server. +type Server struct { + services serviceRegistry + idgen func() ID + run int32 + codecs mapset.Set +} - // register a default service which will provide meta information about the RPC service such as the services and - // methods it offers. +// NewServer creates a new server instance with no registered handlers. +func NewServer() *Server { + server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1} + // Register the default service providing meta information about the RPC service such + // as the services and methods it offers. rpcService := &RPCService{server} server.RegisterName(MetadataApi, rpcService) - return server } -// RPCService gives meta information about the server. -// e.g. gives information about the loaded modules. -type RPCService struct { - server *Server -} - -// Modules returns the list of RPC services with their version number -func (s *RPCService) Modules() map[string]string { - modules := make(map[string]string) - for name := range s.server.services { - modules[name] = "1.0" - } - return modules +// RegisterName creates a service for the given receiver type under the given name. When no +// methods on the given receiver match the criteria to be either a RPC method or a +// subscription an error is returned. Otherwise a new service is created and added to the +// service collection this server provides to clients. +func (s *Server) RegisterName(name string, receiver interface{}) error { + return s.services.registerName(name, receiver) } -// RegisterName will create a service for the given rcvr type under the given name. When no methods on the given rcvr -// match the criteria to be either a RPC method or a subscription an error is returned. Otherwise a new service is -// created and added to the service collection this server instance serves. -func (s *Server) RegisterName(name string, rcvr interface{}) error { - if s.services == nil { - s.services = make(serviceRegistry) - } - - svc := new(service) - svc.typ = reflect.TypeOf(rcvr) - rcvrVal := reflect.ValueOf(rcvr) - - if name == "" { - return fmt.Errorf("no service name for type %s", svc.typ.String()) - } - if !isExported(reflect.Indirect(rcvrVal).Type().Name()) { - return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name()) - } - - methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) +// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes +// the response back using the given codec. It will block until the codec is closed or the +// server is stopped. In either case the codec is closed. +// +// Note that codec options are no longer supported. +func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { + defer codec.Close() - if len(methods) == 0 && len(subscriptions) == 0 { - return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) + // Don't serve if server is stopped. + if atomic.LoadInt32(&s.run) == 0 { + return } - // already a previous service register under given name, merge methods/subscriptions - if regsvc, present := s.services[name]; present { - for _, m := range methods { - regsvc.callbacks[formatName(m.method.Name)] = m - } - for _, s := range subscriptions { - regsvc.subscriptions[formatName(s.method.Name)] = s - } - return nil - } - - svc.name = name - svc.callbacks, svc.subscriptions = methods, subscriptions + // Add the codec to the set so it can be closed by Stop. + s.codecs.Add(codec) + defer s.codecs.Remove(codec) - s.services[svc.name] = svc - return nil + c := initClient(codec, s.idgen, &s.services) + <-codec.Closed() + c.Close() } -// serveRequest will reads requests from the codec, calls the RPC callback and -// writes the response to the given codec. -// -// If singleShot is true it will process a single request, otherwise it will handle -// requests until the codec returns an error when reading a request (in most cases -// an EOF). It executes requests in parallel when singleShot is false. -func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error { - var pend sync.WaitGroup - - defer func() { - if err := recover(); err != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - log.Error(string(buf)) - } - s.codecsMu.Lock() - s.codecs.Remove(codec) - s.codecsMu.Unlock() - }() - - // ctx, cancel := context.WithCancel(context.Background()) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // if the codec supports notification include a notifier that callbacks can use - // to send notification to clients. It is tied to the codec/connection. If the - // connection is closed the notifier will stop and cancels all active subscriptions. - if options&OptionSubscriptions == OptionSubscriptions { - ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec)) - } - s.codecsMu.Lock() - if atomic.LoadInt32(&s.run) != 1 { // server stopped - s.codecsMu.Unlock() - return &shutdownError{} +// serveSingleRequest reads and processes a single RPC request from the given codec. This +// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in +// this mode. +func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { + // Don't serve if server is stopped. + if atomic.LoadInt32(&s.run) == 0 { + return } - s.codecs.Add(codec) - s.codecsMu.Unlock() - // test if the server is ordered to stop - for atomic.LoadInt32(&s.run) == 1 { - reqs, batch, err := s.readRequest(codec) - if err != nil { - // If a parsing error occurred, send an error - if err.Error() != "EOF" { - log.Debug(fmt.Sprintf("read error %v\n", err)) - codec.Write(codec.CreateErrorResponse(nil, err)) - } - // Error or end of stream, wait for requests and tear down - pend.Wait() - return nil - } + h := newHandler(ctx, codec, s.idgen, &s.services) + h.allowSubscribe = false + defer h.close(io.EOF, nil) - // check if server is ordered to shutdown and return an error - // telling the client that his request failed. - if atomic.LoadInt32(&s.run) != 1 { - err = &shutdownError{} - if batch { - resps := make([]interface{}, len(reqs)) - for i, r := range reqs { - resps[i] = codec.CreateErrorResponse(&r.id, err) - } - codec.Write(resps) - } else { - codec.Write(codec.CreateErrorResponse(&reqs[0].id, err)) - } - return nil - } - // If a single shot request is executing, run and return immediately - if singleShot { - if batch { - s.execBatch(ctx, codec, reqs) - } else { - s.exec(ctx, codec, reqs[0]) - } - return nil + reqs, batch, err := codec.Read() + if err != nil { + if err != io.EOF { + codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"})) } - // For multi-shot connections, start a goroutine to serve and loop back - pend.Add(1) - - go func(reqs []*serverRequest, batch bool) { - defer pend.Done() - if batch { - s.execBatch(ctx, codec, reqs) - } else { - s.exec(ctx, codec, reqs[0]) - } - }(reqs, batch) + return + } + if batch { + h.handleBatch(reqs) + } else { + h.handleMsg(reqs[0]) } - return nil -} - -// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the -// response back using the given codec. It will block until the codec is closed or the server is -// stopped. In either case the codec is closed. -func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { - defer codec.Close() - s.serveRequest(context.Background(), codec, false, options) -} - -// ServeSingleRequest reads and processes a single RPC request from the given codec. It will not -// close the codec unless a non-recoverable error has occurred. Note, this method will return after -// a single request has been processed! -func (s *Server) ServeSingleRequest(ctx context.Context, codec ServerCodec, options CodecOption) { - s.serveRequest(ctx, codec, true, options) } -// Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish, -// close all codecs which will cancel pending requests/subscriptions. +// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending +// requests to finish, then closes all codecs which will cancel pending requests and +// subscriptions. func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { - log.Debug("RPC Server shutdown initiatied") - s.codecsMu.Lock() - defer s.codecsMu.Unlock() + log.Debug("RPC server shutting down") s.codecs.Each(func(c interface{}) bool { c.(ServerCodec).Close() return true @@ -237,206 +128,20 @@ func (s *Server) Stop() { } } -// createSubscription will call the subscription callback and returns the subscription id or error. -func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) { - // subscription have as first argument the context following optional arguments - args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)} - args = append(args, req.args...) - reply := req.callb.method.Func.Call(args) - - if !reply[1].IsNil() { // subscription creation failed - return "", reply[1].Interface().(error) - } - - return reply[0].Interface().(*Subscription).ID, nil -} - -// handle executes a request and returns the response from the callback. -func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) { - if req.err != nil { - return codec.CreateErrorResponse(&req.id, req.err), nil - } - - if req.isUnsubscribe { // cancel subscription, first param must be the subscription id - if len(req.args) >= 1 && req.args[0].Kind() == reflect.String { - notifier, supported := NotifierFromContext(ctx) - if !supported { // interface doesn't support subscriptions (e.g. http) - return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil - } - - subid := ID(req.args[0].String()) - if err := notifier.unsubscribe(subid); err != nil { - return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil - } - - return codec.CreateResponse(req.id, true), nil - } - return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil - } - - if req.callb.isSubscribe { - subid, err := s.createSubscription(ctx, codec, req) - if err != nil { - return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil - } - - // active the subscription after the sub id was successfully sent to the client - activateSub := func() { - notifier, _ := NotifierFromContext(ctx) - notifier.activate(subid, req.svcname) - } - - return codec.CreateResponse(req.id, subid), activateSub - } - - // regular RPC call, prepare arguments - if len(req.args) != len(req.callb.argTypes) { - rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d", - req.svcname, serviceMethodSeparator, req.callb.method.Name, - len(req.callb.argTypes), len(req.args))} - return codec.CreateErrorResponse(&req.id, rpcErr), nil - } - - arguments := []reflect.Value{req.callb.rcvr} - if req.callb.hasCtx { - arguments = append(arguments, reflect.ValueOf(ctx)) - } - if len(req.args) > 0 { - arguments = append(arguments, req.args...) - } - - // execute RPC method and return result - reply := req.callb.method.Func.Call(arguments) - if len(reply) == 0 { - return codec.CreateResponse(req.id, nil), nil - } - if req.callb.errPos >= 0 { // test if method returned an error - if !reply[req.callb.errPos].IsNil() { - e := reply[req.callb.errPos].Interface().(error) - res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()}) - return res, nil - } - } - return codec.CreateResponse(req.id, reply[0].Interface()), nil -} - -// exec executes the given request and writes the result back using the codec. -func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) { - var response interface{} - var callback func() - if req.err != nil { - response = codec.CreateErrorResponse(&req.id, req.err) - } else { - response, callback = s.handle(ctx, codec, req) - } - - if err := codec.Write(response); err != nil { - log.Error(fmt.Sprintf("%v\n", err)) - codec.Close() - } - - // when request was a subscribe request this allows these subscriptions to be actived - if callback != nil { - callback() - } -} - -// execBatch executes the given requests and writes the result back using the codec. -// It will only write the response back when the last request is processed. -func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) { - responses := make([]interface{}, len(requests)) - var callbacks []func() - for i, req := range requests { - if req.err != nil { - responses[i] = codec.CreateErrorResponse(&req.id, req.err) - } else { - var callback func() - if responses[i], callback = s.handle(ctx, codec, req); callback != nil { - callbacks = append(callbacks, callback) - } - } - } - - if err := codec.Write(responses); err != nil { - log.Error(fmt.Sprintf("%v\n", err)) - codec.Close() - } - - // when request holds one of more subscribe requests this allows these subscriptions to be activated - for _, c := range callbacks { - c() - } +// RPCService gives meta information about the server. +// e.g. gives information about the loaded modules. +type RPCService struct { + server *Server } -// readRequest requests the next (batch) request from the codec. It will return the collection -// of requests, an indication if the request was a batch, the invalid request identifier and an -// error when the request could not be read/parsed. -func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) { - reqs, batch, err := codec.ReadRequestHeaders() - if err != nil { - return nil, batch, err - } - - requests := make([]*serverRequest, len(reqs)) - - // verify requests - for i, r := range reqs { - var ok bool - var svc *service - - if r.err != nil { - requests[i] = &serverRequest{id: r.id, err: r.err} - continue - } - - if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) { - requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} - argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg - if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { - requests[i].args = args - } else { - requests[i].err = &invalidParamsError{err.Error()} - } - continue - } - - if svc, ok = s.services[r.service]; !ok { // rpc method isn't available - requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} - continue - } - - if r.isPubSub { // eth_subscribe, r.method contains the subscription method name - if callb, ok := svc.subscriptions[r.method]; ok { - requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} - if r.params != nil && len(callb.argTypes) > 0 { - argTypes := []reflect.Type{reflect.TypeOf("")} - argTypes = append(argTypes, callb.argTypes...) - if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { - requests[i].args = args[1:] // first one is service.method name which isn't an actual argument - } else { - requests[i].err = &invalidParamsError{err.Error()} - } - } - } else { - requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} - } - continue - } - - if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method - requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} - if r.params != nil && len(callb.argTypes) > 0 { - if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil { - requests[i].args = args - } else { - requests[i].err = &invalidParamsError{err.Error()} - } - } - continue - } +// Modules returns the list of RPC services with their version number +func (s *RPCService) Modules() map[string]string { + s.server.services.mu.Lock() + defer s.server.services.mu.Unlock() - requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} + modules := make(map[string]string) + for name := range s.server.services.services { + modules[name] = "1.0" } - - return requests, batch, nil + return modules } |