diff options
Diffstat (limited to 'rpc/server.go')
-rw-r--r-- | rpc/server.go | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/rpc/server.go b/rpc/server.go index 7b7d22063..996c63700 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -21,7 +21,6 @@ import ( "reflect" "runtime" "sync/atomic" - "time" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -30,8 +29,6 @@ import ( ) const ( - stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped - notificationBufferSize = 10000 // max buffered notifications before codec is closed MetadataApi = "rpc" @@ -169,7 +166,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO // to send notification to clients. It is thight to the codec/connection. If the // connection is closed the notifier will stop and cancels all active subscriptions. if options&OptionSubscriptions == OptionSubscriptions { - ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize)) + ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec)) } s.codecsMu.Lock() if atomic.LoadInt32(&s.run) != 1 { // server stopped @@ -183,7 +180,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO for atomic.LoadInt32(&s.run) == 1 { reqs, batch, err := s.readRequest(codec) if err != nil { - glog.V(logger.Debug).Infof("%v\n", err) + glog.V(logger.Debug).Infof("read error %v\n", err) codec.Write(codec.CreateErrorResponse(nil, err)) return nil } @@ -240,19 +237,17 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) { func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { glog.V(logger.Debug).Infoln("RPC Server shutdown initiatied") - time.AfterFunc(stopPendingRequestTimeout, func() { - s.codecsMu.Lock() - defer s.codecsMu.Unlock() - s.codecs.Each(func(c interface{}) bool { - c.(ServerCodec).Close() - return true - }) + s.codecsMu.Lock() + defer s.codecsMu.Unlock() + s.codecs.Each(func(c interface{}) bool { + c.(ServerCodec).Close() + return true }) } } // createSubscription will call the subscription callback and returns the subscription id or error. -func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) { +func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) { // subscription have as first argument the context following optional arguments args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)} args = append(args, req.args...) @@ -262,7 +257,7 @@ func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *ser return "", reply[1].Interface().(error) } - return reply[0].Interface().(Subscription).ID(), nil + return reply[0].Interface().(*Subscription).ID, nil } // handle executes a request and returns the response from the callback. @@ -278,8 +273,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil } - subid := req.args[0].String() - if err := notifier.Unsubscribe(subid); err != nil { + subid := ID(req.args[0].String()) + if err := notifier.unsubscribe(subid); err != nil { return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil } @@ -297,7 +292,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque // active the subscription after the sub id was successfully sent to the client activateSub := func() { notifier, _ := NotifierFromContext(ctx) - notifier.(*bufferedNotifier).activate(subid) + notifier.activate(subid) } return codec.CreateResponse(req.id, subid), activateSub @@ -386,7 +381,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s // readRequest requests the next (batch) request from the codec. It will return the collection // of requests, an indication if the request was a batch, the invalid request identifier and an // error when the request could not be read/parsed. -func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) { +func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) { reqs, batch, err := codec.ReadRequestHeaders() if err != nil { return nil, batch, err @@ -399,6 +394,11 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro var ok bool var svc *service + if r.err != nil { + requests[i] = &serverRequest{id: r.id, err: r.err} + continue + } + if r.isPubSub && r.method == unsubscribeMethod { requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg |