diff options
Diffstat (limited to 'rpc/subscription.go')
-rw-r--r-- | rpc/subscription.go | 36 |
1 files changed, 25 insertions, 11 deletions
diff --git a/rpc/subscription.go b/rpc/subscription.go index 6ce7befa1..6bbb6f75d 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -52,9 +52,10 @@ type notifierKey struct{} // Server callbacks use the notifier to send notifications. type Notifier struct { codec ServerCodec - subMu sync.RWMutex // guards active and inactive maps + subMu sync.Mutex active map[ID]*Subscription inactive map[ID]*Subscription + buffer map[ID][]interface{} // unsent notifications of inactive subscriptions } // newNotifier creates a new notifier that can be used to send subscription @@ -64,6 +65,7 @@ func newNotifier(codec ServerCodec) *Notifier { codec: codec, active: make(map[ID]*Subscription), inactive: make(map[ID]*Subscription), + buffer: make(map[ID][]interface{}), } } @@ -88,20 +90,26 @@ func (n *Notifier) CreateSubscription() *Subscription { // Notify sends a notification to the client with the given data as payload. // If an error occurs the RPC connection is closed and the error is returned. func (n *Notifier) Notify(id ID, data interface{}) error { - n.subMu.RLock() - defer n.subMu.RUnlock() - - sub, active := n.active[id] - if active { - notification := n.codec.CreateNotification(string(id), sub.namespace, data) - if err := n.codec.Write(notification); err != nil { - n.codec.Close() - return err - } + n.subMu.Lock() + defer n.subMu.Unlock() + + if sub, active := n.active[id]; active { + n.send(sub, data) + } else { + n.buffer[id] = append(n.buffer[id], data) } return nil } +func (n *Notifier) send(sub *Subscription, data interface{}) error { + notification := n.codec.CreateNotification(string(sub.ID), sub.namespace, data) + err := n.codec.Write(notification) + if err != nil { + n.codec.Close() + } + return err +} + // Closed returns a channel that is closed when the RPC connection is closed. func (n *Notifier) Closed() <-chan interface{} { return n.codec.Closed() @@ -127,9 +135,15 @@ func (n *Notifier) unsubscribe(id ID) error { func (n *Notifier) activate(id ID, namespace string) { n.subMu.Lock() defer n.subMu.Unlock() + if sub, found := n.inactive[id]; found { sub.namespace = namespace n.active[id] = sub delete(n.inactive, id) + // Send buffered notifications. + for _, data := range n.buffer[id] { + n.send(sub, data) + } + delete(n.buffer, id) } } |