aboutsummaryrefslogtreecommitdiffstats
path: root/event/subscription.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2016-07-19 07:39:12 +0800
committerFelix Lange <fjl@twurst.com>2017-01-26 01:44:20 +0800
commit6d5e100d0dc6fc0b905610850a75b5d4fa907739 (patch)
treea653dc592af1e7405faeaf9ec9b84f9b32aa3b88 /event/subscription.go
parent9b62facdd4bdabfed5ef98d131686c4d2606083a (diff)
downloadgo-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.gz
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.bz2
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.lz
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.xz
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.zst
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.zip
event: add new Subscription type and related utilities
This commit introduces a new Subscription type, which is synonymous with ethereum.Subscription. It also adds a couple of utilities that make working with Subscriptions easier. The mot complex utility is Feed, a synchronisation device that implements broadcast subscriptions. Feed is slightly faster than TypeMux and will replace uses of TypeMux across the go-ethereum codebase in the future.
Diffstat (limited to 'event/subscription.go')
-rw-r--r--event/subscription.go275
1 files changed, 275 insertions, 0 deletions
diff --git a/event/subscription.go b/event/subscription.go
new file mode 100644
index 000000000..7f2619b2d
--- /dev/null
+++ b/event/subscription.go
@@ -0,0 +1,275 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "golang.org/x/net/context"
+)
+
+// Subscription represents a stream of events. The carrier of the events is typically a
+// channel, but isn't part of the interface.
+//
+// Subscriptions can fail while established. Failures are reported through an error
+// channel. It receives a value if there is an issue with the subscription (e.g. the
+// network connection delivering the events has been closed). Only one value will ever be
+// sent.
+//
+// The error channel is closed when the subscription ends successfully (i.e. when the
+// source of events is closed). It is also closed when Unsubscribe is called.
+//
+// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
+// cases to ensure that resources related to the subscription are released. It can be
+// called any number of times.
+type Subscription interface {
+ Err() <-chan error // returns the error channel
+ Unsubscribe() // cancels sending of events, closing the error channel
+}
+
+// NewSubscription runs fn as a subscription in a new goroutine. The channel given to fn
+// is closed when Unsubscribe is called. If fn returns an error, it is sent on the
+// subscription's error channel.
+func NewSubscription(fn func(<-chan struct{}) error) Subscription {
+ s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
+ go func() {
+ defer close(s.err)
+ err := fn(s.unsub)
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.unsubscribed {
+ if err != nil {
+ s.err <- err
+ }
+ s.unsubscribed = true
+ }
+ }()
+ return s
+}
+
+type funcSub struct {
+ unsub chan struct{}
+ err chan error
+ mu sync.Mutex
+ unsubscribed bool
+}
+
+func (s *funcSub) Unsubscribe() {
+ s.mu.Lock()
+ if s.unsubscribed {
+ s.mu.Unlock()
+ return
+ }
+ s.unsubscribed = true
+ close(s.unsub)
+ s.mu.Unlock()
+ // Wait for producer shutdown.
+ <-s.err
+}
+
+func (s *funcSub) Err() <-chan error {
+ return s.err
+}
+
+// Resubscribe calls fn repeatedly to keep a subscription established. When the
+// subscription is established, Resubscribe waits for it to fail and calls fn again. This
+// process repeats until Unsubscribe is called or the active subscription ends
+// successfully.
+//
+// Resubscribe applies backoff between calls to fn. The time between calls is adapted
+// based on the error rate, but will never exceed backoffMax.
+func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
+ s := &resubscribeSub{
+ waitTime: backoffMax / 10,
+ backoffMax: backoffMax,
+ fn: fn,
+ err: make(chan error),
+ unsub: make(chan struct{}),
+ }
+ go s.loop()
+ return s
+}
+
+// A ResubscribeFunc attempts to establish a subscription.
+type ResubscribeFunc func(context.Context) (Subscription, error)
+
+type resubscribeSub struct {
+ fn ResubscribeFunc
+ err chan error
+ unsub chan struct{}
+ unsubOnce sync.Once
+ lastTry mclock.AbsTime
+ waitTime, backoffMax time.Duration
+}
+
+func (s *resubscribeSub) Unsubscribe() {
+ s.unsubOnce.Do(func() {
+ s.unsub <- struct{}{}
+ <-s.err
+ })
+}
+
+func (s *resubscribeSub) Err() <-chan error {
+ return s.err
+}
+
+func (s *resubscribeSub) loop() {
+ defer close(s.err)
+ var done bool
+ for !done {
+ sub := s.subscribe()
+ if sub == nil {
+ break
+ }
+ done = s.waitForError(sub)
+ sub.Unsubscribe()
+ }
+}
+
+func (s *resubscribeSub) subscribe() Subscription {
+ subscribed := make(chan error)
+ var sub Subscription
+retry:
+ for {
+ s.lastTry = mclock.Now()
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ rsub, err := s.fn(ctx)
+ sub = rsub
+ subscribed <- err
+ }()
+ select {
+ case err := <-subscribed:
+ cancel()
+ if err != nil {
+ // Subscribing failed, wait before launching the next try.
+ if s.backoffWait() {
+ return nil
+ }
+ continue retry
+ }
+ if sub == nil {
+ panic("event: ResubscribeFunc returned nil subscription and no error")
+ }
+ return sub
+ case <-s.unsub:
+ cancel()
+ return nil
+ }
+ }
+}
+
+func (s *resubscribeSub) waitForError(sub Subscription) bool {
+ defer sub.Unsubscribe()
+ select {
+ case err := <-sub.Err():
+ return err == nil
+ case <-s.unsub:
+ return true
+ }
+}
+
+func (s *resubscribeSub) backoffWait() bool {
+ if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
+ s.waitTime = s.backoffMax / 10
+ } else {
+ s.waitTime *= 2
+ if s.waitTime > s.backoffMax {
+ s.waitTime = s.backoffMax
+ }
+ }
+
+ t := time.NewTimer(s.waitTime)
+ defer t.Stop()
+ select {
+ case <-t.C:
+ return false
+ case <-s.unsub:
+ return true
+ }
+}
+
+// SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
+//
+// For code that handle more than one subscription, a scope can be used to conveniently
+// unsubscribe all of them with a single call. The example demonstrates a typical use in a
+// larger program.
+//
+// The zero value is ready to use.
+type SubscriptionScope struct {
+ mu sync.Mutex
+ subs map[*scopeSub]struct{}
+ closed bool
+}
+
+type scopeSub struct {
+ sc *SubscriptionScope
+ s Subscription
+}
+
+// Track starts tracking a subscription. If the scope is closed, Track returns nil. The
+// returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
+// scope.
+func (sc *SubscriptionScope) Track(s Subscription) Subscription {
+ sc.mu.Lock()
+ defer sc.mu.Unlock()
+ if sc.closed {
+ return nil
+ }
+ if sc.subs == nil {
+ sc.subs = make(map[*scopeSub]struct{})
+ }
+ ss := &scopeSub{sc, s}
+ sc.subs[ss] = struct{}{}
+ return ss
+}
+
+// Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
+// the tracked set. Calls to Track after Close return nil.
+func (sc *SubscriptionScope) Close() {
+ sc.mu.Lock()
+ defer sc.mu.Unlock()
+ if sc.closed {
+ return
+ }
+ sc.closed = true
+ for s := range sc.subs {
+ s.s.Unsubscribe()
+ }
+ sc.subs = nil
+}
+
+// Count returns the number of tracked subscriptions.
+// It is meant to be used for debugging.
+func (sc *SubscriptionScope) Count() int {
+ sc.mu.Lock()
+ defer sc.mu.Unlock()
+ return len(sc.subs)
+}
+
+func (s *scopeSub) Unsubscribe() {
+ s.s.Unsubscribe()
+ s.sc.mu.Lock()
+ defer s.sc.mu.Unlock()
+ delete(s.sc.subs, s)
+}
+
+func (s *scopeSub) Err() <-chan error {
+ return s.s.Err()
+}