From f7328c5ecbd1076582a71ef7bf436485f3868b1f Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Tue, 29 Mar 2016 15:07:40 +0200 Subject: rpc: add pub/sub support --- rpc/server.go | 228 +++++++++++++++++++++++++++------------------------------- 1 file changed, 107 insertions(+), 121 deletions(-) (limited to 'rpc/server.go') diff --git a/rpc/server.go b/rpc/server.go index 22448f8e3..cf90eba02 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -23,7 +23,6 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "golang.org/x/net/context" @@ -33,10 +32,26 @@ import ( const ( stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped + // NotifierContextKey is the key where the notifier associated with the codec is stored in the context + NotifierContextKey = 1 + + notificationBufferSize = 10000 // max buffered notifications before codec is closed + DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" DefaultHTTPApis = "eth,net,web3" ) +// CodecOption specifies which type of messages this codec supports +type CodecOption int + +const ( + // OptionMethodInvocation is an indication that the codec supports RPC method calls + OptionMethodInvocation CodecOption = 1 << iota + + // OptionSubscriptions is an indication that the codec suports RPC notifications + OptionSubscriptions = 1 << iota // support pub sub +) + // NewServer will create a new server instance with no registered handlers. func NewServer() *Server { server := &Server{ @@ -63,7 +78,7 @@ type RPCService struct { // Modules returns the list of RPC services with their version number func (s *RPCService) Modules() map[string]string { modules := make(map[string]string) - for name, _ := range s.server.services { + for name := range s.server.services { modules[name] = "1.0" } return modules @@ -92,7 +107,7 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { if regsvc, present := s.services[name]; present { methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) if len(methods) == 0 && len(subscriptions) == 0 { - return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose") + return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } for _, m := range methods { @@ -109,7 +124,7 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ) if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 { - return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose") + return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } s.services[svc.name] = svc @@ -117,12 +132,23 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { return nil } +// hasOption returns true if option is included in options, otherwise false +func hasOption(option CodecOption, options []CodecOption) bool { + for _, o := range options { + if option == o { + return true + } + } + return false +} + // serveRequest will reads requests from the codec, calls the RPC callback and // writes the response to the given codec. +// // If singleShot is true it will process a single request, otherwise it will handle // requests until the codec returns an error when reading a request (in most cases // an EOF). It executes requests in parallel when singleShot is false. -func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error { +func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error { defer func() { if err := recover(); err != nil { const size = 64 << 10 @@ -141,6 +167,12 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // if the codec supports notification include a notifier that callbacks can use + // to send notification to clients. It is thight to the codec/connection. If the + // connection is closed the notifier will stop and cancels all active subscriptions. + if options&OptionSubscriptions == OptionSubscriptions { + ctx = context.WithValue(ctx, NotifierContextKey, newBufferedNotifier(codec, notificationBufferSize)) + } s.codecsMu.Lock() if atomic.LoadInt32(&s.run) != 1 { // server stopped s.codecsMu.Unlock() @@ -193,20 +225,16 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error { // ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the // response back using the given codec. It will block until the codec is closed or the server is // stopped. In either case the codec is closed. -// -// This server will: -// 1. allow for asynchronous and parallel request execution -// 2. supports notifications (pub/sub) -// 3. supports request batches -func (s *Server) ServeCodec(codec ServerCodec) { +func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { defer codec.Close() - s.serveRequest(codec, false) + s.serveRequest(codec, false, options) } // ServeSingleRequest reads and processes a single RPC request from the given codec. It will not -// close the codec unless a non-recoverable error has occurred. -func (s *Server) ServeSingleRequest(codec ServerCodec) { - s.serveRequest(codec, true) +// close the codec unless a non-recoverable error has occurred. Note, this method will return after +// a single request has been processed! +func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) { + s.serveRequest(codec, true, options) } // Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish, @@ -225,122 +253,64 @@ func (s *Server) Stop() { } } -// sendNotification will create a notification from the given event by serializing member fields of the event. -// It will then send the notification to the client, when it fails the codec is closed. When the event has multiple -// fields an array of values is returned. -func sendNotification(codec ServerCodec, subid string, event interface{}) { - notification := codec.CreateNotification(subid, event) - - if err := codec.Write(notification); err != nil { - codec.Close() - } -} - -// createSubscription will register a new subscription and waits for raised events. When an event is raised it will: -// 1. test if the event is raised matches the criteria the user has (optionally) specified -// 2. create a notification of the event and send it the client when it matches the criteria -// It will unsubscribe the subscription when the socket is closed or the subscription is unsubscribed by the user. -func (s *Server) createSubscription(c ServerCodec, req *serverRequest) (string, error) { - args := []reflect.Value{req.callb.rcvr} - if len(req.args) > 0 { - args = append(args, req.args...) - } - - subid, err := newSubscriptionId() - if err != nil { - return "", err - } - +// createSubscription will call the subscription callback and returns the subscription id or error. +func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) { + // subscription have as first argument the context following optional arguments + args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)} + args = append(args, req.args...) reply := req.callb.method.Func.Call(args) - if reply[1].IsNil() { // no error - if subscription, ok := reply[0].Interface().(Subscription); ok { - s.muSubcriptions.Lock() - s.subscriptions[subid] = subscription - s.muSubcriptions.Unlock() - go func() { - cases := []reflect.SelectCase{ - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(subscription.Chan())}, // new event - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.Closed())}, // connection closed - } - - for { - idx, notification, recvOk := reflect.Select(cases) - switch idx { - case 0: // new event, or channel closed - if recvOk { // send notification - if event, ok := notification.Interface().(*event.Event); ok { - if subscription.match == nil || subscription.match(event.Data) { - sendNotification(c, subid, subscription.format(event.Data)) - } - } - } else { // user send an eth_unsubscribe request - return - } - case 1: // connection closed - s.unsubscribe(subid) - return - } - } - }() - } else { // unable to create subscription - s.muSubcriptions.Lock() - delete(s.subscriptions, subid) - s.muSubcriptions.Unlock() - } - } else { - return "", fmt.Errorf("Unable to create subscription") + if !reply[1].IsNil() { // subscription creation failed + return "", reply[1].Interface().(error) } - return subid, nil -} - -// unsubscribe calls the Unsubscribe method on the subscription and removes a subscription from the subscription -// registry. -func (s *Server) unsubscribe(subid string) bool { - s.muSubcriptions.Lock() - defer s.muSubcriptions.Unlock() - if sub, ok := s.subscriptions[subid]; ok { - sub.Unsubscribe() - delete(s.subscriptions, subid) - return true - } - return false + return reply[0].Interface().(Subscription).ID(), nil } // handle executes a request and returns the response from the callback. -func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) interface{} { +func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) { if req.err != nil { - return codec.CreateErrorResponse(&req.id, req.err) + return codec.CreateErrorResponse(&req.id, req.err), nil } - if req.isUnsubscribe { // first param must be the subscription id + if req.isUnsubscribe { // cancel subscription, first param must be the subscription id if len(req.args) >= 1 && req.args[0].Kind() == reflect.String { + notifier, supported := ctx.Value(NotifierContextKey).(*bufferedNotifier) + if !supported { // interface doesn't support subscriptions (e.g. http) + return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil + } + subid := req.args[0].String() - if s.unsubscribe(subid) { - return codec.CreateResponse(req.id, true) - } else { - return codec.CreateErrorResponse(&req.id, - &callbackError{fmt.Sprintf("subscription '%s' not found", subid)}) + if err := notifier.Unsubscribe(subid); err != nil { + return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil } + + return codec.CreateResponse(req.id, true), nil } - return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as argument"}) + return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil } if req.callb.isSubscribe { - subid, err := s.createSubscription(codec, req) + subid, err := s.createSubscription(ctx, codec, req) if err != nil { - return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}) + return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil + } + + // active the subscription after the sub id was successful sent to the client + activateSub := func() { + notifier, _ := ctx.Value(NotifierContextKey).(*bufferedNotifier) + notifier.activate(subid) } - return codec.CreateResponse(req.id, subid) + + return codec.CreateResponse(req.id, subid), activateSub } - // regular RPC call + // regular RPC call, prepare arguments if len(req.args) != len(req.callb.argTypes) { rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d", req.svcname, serviceMethodSeparator, req.callb.method.Name, len(req.callb.argTypes), len(req.args))} - return codec.CreateErrorResponse(&req.id, rpcErr) + return codec.CreateErrorResponse(&req.id, rpcErr), nil } arguments := []reflect.Value{req.callb.rcvr} @@ -351,45 +321,56 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque arguments = append(arguments, req.args...) } + // execute RPC method and return result reply := req.callb.method.Func.Call(arguments) - if len(reply) == 0 { - return codec.CreateResponse(req.id, nil) + return codec.CreateResponse(req.id, nil), nil } if req.callb.errPos >= 0 { // test if method returned an error if !reply[req.callb.errPos].IsNil() { e := reply[req.callb.errPos].Interface().(error) res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()}) - return res + return res, nil } } - return codec.CreateResponse(req.id, reply[0].Interface()) + return codec.CreateResponse(req.id, reply[0].Interface()), nil } // exec executes the given request and writes the result back using the codec. func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) { var response interface{} + var callback func() if req.err != nil { response = codec.CreateErrorResponse(&req.id, req.err) } else { - response = s.handle(ctx, codec, req) + response, callback = s.handle(ctx, codec, req) } + if err := codec.Write(response); err != nil { glog.V(logger.Error).Infof("%v\n", err) codec.Close() } + + // when request was a subscribe request this allows these subscriptions to be actived + if callback != nil { + callback() + } } -// execBatch executes the given requests and writes the result back using the codec. It will only write the response -// back when the last request is processed. +// execBatch executes the given requests and writes the result back using the codec. +// It will only write the response back when the last request is processed. func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) { responses := make([]interface{}, len(requests)) + var callbacks []func() for i, req := range requests { if req.err != nil { responses[i] = codec.CreateErrorResponse(&req.id, req.err) } else { - responses[i] = s.handle(ctx, codec, req) + var callback func() + if responses[i], callback = s.handle(ctx, codec, req); callback != nil { + callbacks = append(callbacks, callback) + } } } @@ -397,11 +378,16 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s glog.V(logger.Error).Infof("%v\n", err) codec.Close() } + + // when request holds one of more subscribe requests this allows these subscriptions to be actived + for _, c := range callbacks { + c() + } } -// readRequest requests the next (batch) request from the codec. It will return the collection of requests, an -// indication if the request was a batch, the invalid request identifier and an error when the request could not be -// read/parsed. +// readRequest requests the next (batch) request from the codec. It will return the collection +// of requests, an indication if the request was a batch, the invalid request identifier and an +// error when the request could not be read/parsed. func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) { reqs, batch, err := codec.ReadRequestHeaders() if err != nil { @@ -417,7 +403,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro if r.isPubSub && r.method == unsubscribeMethod { requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} - argTypes := []reflect.Type{reflect.TypeOf("")} + argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { requests[i].args = args } else { @@ -426,12 +412,12 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro continue } - if svc, ok = s.services[r.service]; !ok { + if svc, ok = s.services[r.service]; !ok { // rpc method isn't available requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} continue } - if r.isPubSub { // eth_subscribe + if r.isPubSub { // eth_subscribe, r.method contains the subscription method name if callb, ok := svc.subscriptions[r.method]; ok { requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} if r.params != nil && len(callb.argTypes) > 0 { @@ -449,7 +435,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro continue } - if callb, ok := svc.callbacks[r.method]; ok { + if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} if r.params != nil && len(callb.argTypes) > 0 { if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil { -- cgit v1.2.3