diff options
author | Bas van Kervel <basvankervel@gmail.com> | 2017-04-06 14:56:41 +0800 |
---|---|---|
committer | Bas van Kervel <basvankervel@gmail.com> | 2017-04-25 17:13:22 +0800 |
commit | 37e3f561f15cbedf10c01847e58a079f9b86bf6f (patch) | |
tree | d7cdc3e8a5b74261a3359f6029e927cca0fc738b /rpc/subscription.go | |
parent | ba3bcd16a6d99bc0e58516556df8e96b730c2d60 (diff) | |
download | go-tangerine-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar go-tangerine-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.gz go-tangerine-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.bz2 go-tangerine-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.lz go-tangerine-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.xz go-tangerine-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.zst go-tangerine-37e3f561f15cbedf10c01847e58a079f9b86bf6f.zip |
rpc: support subscriptions under custom namespaces
Diffstat (limited to 'rpc/subscription.go')
-rw-r--r-- | rpc/subscription.go | 14 |
1 files changed, 8 insertions, 6 deletions
diff --git a/rpc/subscription.go b/rpc/subscription.go index 9ab6af9e1..720e4dd06 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -35,8 +35,9 @@ type ID string // a Subscription is created by a notifier and tight to that notifier. The client can use // this subscription to wait for an unsubscribe request for the client, see Err(). type Subscription struct { - ID ID - err chan error // closed on unsubscribe + ID ID + namespace string + err chan error // closed on unsubscribe } // Err returns a channel that is closed when the client send an unsubscribe request. @@ -78,7 +79,7 @@ func NotifierFromContext(ctx context.Context) (*Notifier, bool) { // are dropped until the subscription is marked as active. This is done // by the RPC server after the subscription ID is send to the client. func (n *Notifier) CreateSubscription() *Subscription { - s := &Subscription{NewID(), make(chan error)} + s := &Subscription{ID: NewID(), err: make(chan error)} n.subMu.Lock() n.inactive[s.ID] = s n.subMu.Unlock() @@ -91,9 +92,9 @@ func (n *Notifier) Notify(id ID, data interface{}) error { n.subMu.RLock() defer n.subMu.RUnlock() - _, active := n.active[id] + sub, active := n.active[id] if active { - notification := n.codec.CreateNotification(string(id), data) + notification := n.codec.CreateNotification(string(id), sub.namespace, data) if err := n.codec.Write(notification); err != nil { n.codec.Close() return err @@ -124,10 +125,11 @@ func (n *Notifier) unsubscribe(id ID) error { // notifications are dropped. This method is called by the RPC server after // the subscription ID was sent to client. This prevents notifications being // send to the client before the subscription ID is send to the client. -func (n *Notifier) activate(id ID) { +func (n *Notifier) activate(id ID, namespace string) { n.subMu.Lock() defer n.subMu.Unlock() if sub, found := n.inactive[id]; found { + sub.namespace = namespace n.active[id] = sub delete(n.inactive, id) } |