aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/server.go')
-rw-r--r--rpc/server.go36
1 files changed, 18 insertions, 18 deletions
diff --git a/rpc/server.go b/rpc/server.go
index 7b7d22063..996c63700 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -21,7 +21,6 @@ import (
"reflect"
"runtime"
"sync/atomic"
- "time"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
@@ -30,8 +29,6 @@ import (
)
const (
- stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
-
notificationBufferSize = 10000 // max buffered notifications before codec is closed
MetadataApi = "rpc"
@@ -169,7 +166,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
// to send notification to clients. It is thight to the codec/connection. If the
// connection is closed the notifier will stop and cancels all active subscriptions.
if options&OptionSubscriptions == OptionSubscriptions {
- ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize))
+ ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
}
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
@@ -183,7 +180,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
if err != nil {
- glog.V(logger.Debug).Infof("%v\n", err)
+ glog.V(logger.Debug).Infof("read error %v\n", err)
codec.Write(codec.CreateErrorResponse(nil, err))
return nil
}
@@ -240,19 +237,17 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) {
func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
glog.V(logger.Debug).Infoln("RPC Server shutdown initiatied")
- time.AfterFunc(stopPendingRequestTimeout, func() {
- s.codecsMu.Lock()
- defer s.codecsMu.Unlock()
- s.codecs.Each(func(c interface{}) bool {
- c.(ServerCodec).Close()
- return true
- })
+ s.codecsMu.Lock()
+ defer s.codecsMu.Unlock()
+ s.codecs.Each(func(c interface{}) bool {
+ c.(ServerCodec).Close()
+ return true
})
}
}
// createSubscription will call the subscription callback and returns the subscription id or error.
-func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) {
+func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
// subscription have as first argument the context following optional arguments
args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
args = append(args, req.args...)
@@ -262,7 +257,7 @@ func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *ser
return "", reply[1].Interface().(error)
}
- return reply[0].Interface().(Subscription).ID(), nil
+ return reply[0].Interface().(*Subscription).ID, nil
}
// handle executes a request and returns the response from the callback.
@@ -278,8 +273,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
}
- subid := req.args[0].String()
- if err := notifier.Unsubscribe(subid); err != nil {
+ subid := ID(req.args[0].String())
+ if err := notifier.unsubscribe(subid); err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}
@@ -297,7 +292,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
// active the subscription after the sub id was successfully sent to the client
activateSub := func() {
notifier, _ := NotifierFromContext(ctx)
- notifier.(*bufferedNotifier).activate(subid)
+ notifier.activate(subid)
}
return codec.CreateResponse(req.id, subid), activateSub
@@ -386,7 +381,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
// readRequest requests the next (batch) request from the codec. It will return the collection
// of requests, an indication if the request was a batch, the invalid request identifier and an
// error when the request could not be read/parsed.
-func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) {
+func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
reqs, batch, err := codec.ReadRequestHeaders()
if err != nil {
return nil, batch, err
@@ -399,6 +394,11 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro
var ok bool
var svc *service
+ if r.err != nil {
+ requests[i] = &serverRequest{id: r.id, err: r.err}
+ continue
+ }
+
if r.isPubSub && r.method == unsubscribeMethod {
requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg