aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/server.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-05-03 16:41:07 +0800
committerGitHub <noreply@github.com>2017-05-03 16:41:07 +0800
commita8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e (patch)
tree94c70a20c9e3b025a9223b70f49b8c0210840e89 /rpc/server.go
parentc3dc01caf18addd8466bf8667dca2052ad1342f8 (diff)
parent37e3f561f15cbedf10c01847e58a079f9b86bf6f (diff)
downloadgo-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.gz
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.bz2
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.lz
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.xz
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.zst
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.zip
Merge pull request #13885 from bas-vk/rpc_generic_pubsub
rpc: support subscriptions under custom namespaces
Diffstat (limited to 'rpc/server.go')
-rw-r--r--rpc/server.go17
1 files changed, 8 insertions, 9 deletions
diff --git a/rpc/server.go b/rpc/server.go
index 78df37e52..62b84af34 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"runtime"
+ "strings"
"sync"
"sync/atomic"
@@ -96,32 +97,30 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
}
+ methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
+
// already a previous service register under given sname, merge methods/subscriptions
if regsvc, present := s.services[name]; present {
- methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
if len(methods) == 0 && len(subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
-
for _, m := range methods {
regsvc.callbacks[formatName(m.method.Name)] = m
}
for _, s := range subscriptions {
regsvc.subscriptions[formatName(s.method.Name)] = s
}
-
return nil
}
svc.name = name
- svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ)
+ svc.callbacks, svc.subscriptions = methods, subscriptions
if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
s.services[svc.name] = svc
-
return nil
}
@@ -303,7 +302,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
// active the subscription after the sub id was successfully sent to the client
activateSub := func() {
notifier, _ := NotifierFromContext(ctx)
- notifier.activate(subid)
+ notifier.activate(subid, req.svcname)
}
return codec.CreateResponse(req.id, subid), activateSub
@@ -383,7 +382,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
codec.Close()
}
- // when request holds one of more subscribe requests this allows these subscriptions to be actived
+ // when request holds one of more subscribe requests this allows these subscriptions to be activated
for _, c := range callbacks {
c()
}
@@ -410,7 +409,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error)
continue
}
- if r.isPubSub && r.method == unsubscribeMethod {
+ if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
@@ -439,7 +438,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error)
}
}
} else {
- requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{subscribeMethod, r.method}}
+ requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.method, r.method}}
}
continue
}