aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/subscription.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/subscription.go')
-rw-r--r--rpc/subscription.go36
1 files changed, 25 insertions, 11 deletions
diff --git a/rpc/subscription.go b/rpc/subscription.go
index 6ce7befa1..6bbb6f75d 100644
--- a/rpc/subscription.go
+++ b/rpc/subscription.go
@@ -52,9 +52,10 @@ type notifierKey struct{}
// Server callbacks use the notifier to send notifications.
type Notifier struct {
codec ServerCodec
- subMu sync.RWMutex // guards active and inactive maps
+ subMu sync.Mutex
active map[ID]*Subscription
inactive map[ID]*Subscription
+ buffer map[ID][]interface{} // unsent notifications of inactive subscriptions
}
// newNotifier creates a new notifier that can be used to send subscription
@@ -64,6 +65,7 @@ func newNotifier(codec ServerCodec) *Notifier {
codec: codec,
active: make(map[ID]*Subscription),
inactive: make(map[ID]*Subscription),
+ buffer: make(map[ID][]interface{}),
}
}
@@ -88,20 +90,26 @@ func (n *Notifier) CreateSubscription() *Subscription {
// Notify sends a notification to the client with the given data as payload.
// If an error occurs the RPC connection is closed and the error is returned.
func (n *Notifier) Notify(id ID, data interface{}) error {
- n.subMu.RLock()
- defer n.subMu.RUnlock()
-
- sub, active := n.active[id]
- if active {
- notification := n.codec.CreateNotification(string(id), sub.namespace, data)
- if err := n.codec.Write(notification); err != nil {
- n.codec.Close()
- return err
- }
+ n.subMu.Lock()
+ defer n.subMu.Unlock()
+
+ if sub, active := n.active[id]; active {
+ n.send(sub, data)
+ } else {
+ n.buffer[id] = append(n.buffer[id], data)
}
return nil
}
+func (n *Notifier) send(sub *Subscription, data interface{}) error {
+ notification := n.codec.CreateNotification(string(sub.ID), sub.namespace, data)
+ err := n.codec.Write(notification)
+ if err != nil {
+ n.codec.Close()
+ }
+ return err
+}
+
// Closed returns a channel that is closed when the RPC connection is closed.
func (n *Notifier) Closed() <-chan interface{} {
return n.codec.Closed()
@@ -127,9 +135,15 @@ func (n *Notifier) unsubscribe(id ID) error {
func (n *Notifier) activate(id ID, namespace string) {
n.subMu.Lock()
defer n.subMu.Unlock()
+
if sub, found := n.inactive[id]; found {
sub.namespace = namespace
n.active[id] = sub
delete(n.inactive, id)
+ // Send buffered notifications.
+ for _, data := range n.buffer[id] {
+ n.send(sub, data)
+ }
+ delete(n.buffer, id)
}
}