diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-05-03 16:41:07 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-03 16:41:07 +0800 |
commit | a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e (patch) | |
tree | 94c70a20c9e3b025a9223b70f49b8c0210840e89 /rpc/server.go | |
parent | c3dc01caf18addd8466bf8667dca2052ad1342f8 (diff) | |
parent | 37e3f561f15cbedf10c01847e58a079f9b86bf6f (diff) | |
download | go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.gz go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.bz2 go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.lz go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.xz go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.zst go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.zip |
Merge pull request #13885 from bas-vk/rpc_generic_pubsub
rpc: support subscriptions under custom namespaces
Diffstat (limited to 'rpc/server.go')
-rw-r--r-- | rpc/server.go | 17 |
1 files changed, 8 insertions, 9 deletions
diff --git a/rpc/server.go b/rpc/server.go index 78df37e52..62b84af34 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -21,6 +21,7 @@ import ( "fmt" "reflect" "runtime" + "strings" "sync" "sync/atomic" @@ -96,32 +97,30 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name()) } + methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) + // already a previous service register under given sname, merge methods/subscriptions if regsvc, present := s.services[name]; present { - methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) if len(methods) == 0 && len(subscriptions) == 0 { return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } - for _, m := range methods { regsvc.callbacks[formatName(m.method.Name)] = m } for _, s := range subscriptions { regsvc.subscriptions[formatName(s.method.Name)] = s } - return nil } svc.name = name - svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ) + svc.callbacks, svc.subscriptions = methods, subscriptions if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 { return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } s.services[svc.name] = svc - return nil } @@ -303,7 +302,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque // active the subscription after the sub id was successfully sent to the client activateSub := func() { notifier, _ := NotifierFromContext(ctx) - notifier.activate(subid) + notifier.activate(subid, req.svcname) } return codec.CreateResponse(req.id, subid), activateSub @@ -383,7 +382,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s codec.Close() } - // when request holds one of more subscribe requests this allows these subscriptions to be actived + // when request holds one of more subscribe requests this allows these subscriptions to be activated for _, c := range callbacks { c() } @@ -410,7 +409,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) continue } - if r.isPubSub && r.method == unsubscribeMethod { + if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) { requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { @@ -439,7 +438,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) } } } else { - requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{subscribeMethod, r.method}} + requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.method, r.method}} } continue } |