diff options
author | Felix Lange <fjl@users.noreply.github.com> | 2018-10-09 22:34:24 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-09 22:34:24 +0800 |
commit | 4e474c74dc2ac1d26b339c32064d0bac98775e77 (patch) | |
tree | 0d0d40fd543b039852252f1193c0eaa22a695a8e /rpc/subscription.go | |
parent | da290e9707d6b3074b571722bbb557815b03ad48 (diff) | |
download | go-tangerine-4e474c74dc2ac1d26b339c32064d0bac98775e77.tar go-tangerine-4e474c74dc2ac1d26b339c32064d0bac98775e77.tar.gz go-tangerine-4e474c74dc2ac1d26b339c32064d0bac98775e77.tar.bz2 go-tangerine-4e474c74dc2ac1d26b339c32064d0bac98775e77.tar.lz go-tangerine-4e474c74dc2ac1d26b339c32064d0bac98775e77.tar.xz go-tangerine-4e474c74dc2ac1d26b339c32064d0bac98775e77.tar.zst go-tangerine-4e474c74dc2ac1d26b339c32064d0bac98775e77.zip |
rpc: fix subscription corner case and speed up tests (#17874)
Notifier tracks whether subscription are 'active'. A subscription
becomes active when the subscription ID has been sent to the client. If
the client sends notifications in the request handler before the
subscription becomes active they are dropped. The tests tried to work
around this problem by always waiting 5s before sending the first
notification.
Fix it by buffering notifications until the subscription becomes active.
This speeds up all subscription tests.
Also fix TestSubscriptionMultipleNamespaces to wait for three messages
per subscription instead of six. The test now finishes just after all
notifications have been received and doesn't hit the 30s timeout anymore.
Diffstat (limited to 'rpc/subscription.go')
-rw-r--r-- | rpc/subscription.go | 36 |
1 files changed, 25 insertions, 11 deletions
diff --git a/rpc/subscription.go b/rpc/subscription.go index 6ce7befa1..6bbb6f75d 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -52,9 +52,10 @@ type notifierKey struct{} // Server callbacks use the notifier to send notifications. type Notifier struct { codec ServerCodec - subMu sync.RWMutex // guards active and inactive maps + subMu sync.Mutex active map[ID]*Subscription inactive map[ID]*Subscription + buffer map[ID][]interface{} // unsent notifications of inactive subscriptions } // newNotifier creates a new notifier that can be used to send subscription @@ -64,6 +65,7 @@ func newNotifier(codec ServerCodec) *Notifier { codec: codec, active: make(map[ID]*Subscription), inactive: make(map[ID]*Subscription), + buffer: make(map[ID][]interface{}), } } @@ -88,20 +90,26 @@ func (n *Notifier) CreateSubscription() *Subscription { // Notify sends a notification to the client with the given data as payload. // If an error occurs the RPC connection is closed and the error is returned. func (n *Notifier) Notify(id ID, data interface{}) error { - n.subMu.RLock() - defer n.subMu.RUnlock() - - sub, active := n.active[id] - if active { - notification := n.codec.CreateNotification(string(id), sub.namespace, data) - if err := n.codec.Write(notification); err != nil { - n.codec.Close() - return err - } + n.subMu.Lock() + defer n.subMu.Unlock() + + if sub, active := n.active[id]; active { + n.send(sub, data) + } else { + n.buffer[id] = append(n.buffer[id], data) } return nil } +func (n *Notifier) send(sub *Subscription, data interface{}) error { + notification := n.codec.CreateNotification(string(sub.ID), sub.namespace, data) + err := n.codec.Write(notification) + if err != nil { + n.codec.Close() + } + return err +} + // Closed returns a channel that is closed when the RPC connection is closed. func (n *Notifier) Closed() <-chan interface{} { return n.codec.Closed() @@ -127,9 +135,15 @@ func (n *Notifier) unsubscribe(id ID) error { func (n *Notifier) activate(id ID, namespace string) { n.subMu.Lock() defer n.subMu.Unlock() + if sub, found := n.inactive[id]; found { sub.namespace = namespace n.active[id] = sub delete(n.inactive, id) + // Send buffered notifications. + for _, data := range n.buffer[id] { + n.send(sub, data) + } + delete(n.buffer, id) } } |