aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--event/feed.go31
-rw-r--r--event/feed_test.go68
-rw-r--r--event/subscription.go10
3 files changed, 93 insertions, 16 deletions
diff --git a/event/feed.go b/event/feed.go
index bd8e26321..4568304df 100644
--- a/event/feed.go
+++ b/event/feed.go
@@ -33,7 +33,9 @@ var errBadChannel = errors.New("event: Subscribe argument does not have sendable
//
// The zero value is ready to use.
type Feed struct {
- sendLock chan struct{} // one-element buffer, empty when held
+ // sendLock has a one-element buffer and is empty when held.
+ // It protects sendCases.
+ sendLock chan struct{}
removeSub chan interface{} // interrupts Send
sendCases caseList // the active set of select cases used by Send
@@ -44,6 +46,10 @@ type Feed struct {
closed bool
}
+// This is the index of the first actual subscription channel in sendCases.
+// sendCases[0] is a SelectRecv case for the removeSub channel.
+const firstSubSendCase = 1
+
type feedTypeError struct {
got, want reflect.Type
op string
@@ -67,6 +73,7 @@ func (f *Feed) init() {
// until the subscription is canceled. All channels added must have the same element type.
//
// The channel should have ample buffer space to avoid blocking other subscribers.
+// Slow subscribers are not dropped.
func (f *Feed) Subscribe(channel interface{}) Subscription {
chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
@@ -125,13 +132,14 @@ func (f *Feed) remove(sub *feedSub) {
func (f *Feed) Send(value interface{}) (nsent int) {
f.mu.Lock()
f.init()
+ f.mu.Unlock()
+
<-f.sendLock
- // Add new subscriptions from the inbox, then clear it.
+
+ // Add new cases from the inbox after taking the send lock.
+ f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
- for i := range f.inbox {
- f.inbox[i] = reflect.SelectCase{}
- }
- f.inbox = f.inbox[:0]
+ f.inbox = nil
f.mu.Unlock()
// Set the sent value on all channels.
@@ -140,7 +148,7 @@ func (f *Feed) Send(value interface{}) (nsent int) {
f.sendLock <- struct{}{}
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
- for i := 1; i < len(f.sendCases); i++ {
+ for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}
@@ -150,13 +158,14 @@ func (f *Feed) Send(value interface{}) (nsent int) {
// Fast path: try sending without blocking before adding to the select set.
// This should usually succeed if subscribers are fast enough and have free
// buffer space.
- for i := 1; i < len(cases); i++ {
+ for i := firstSubSendCase; i < len(cases); i++ {
if cases[i].Chan.TrySend(rvalue) {
- cases = cases.deactivate(i)
nsent++
+ cases = cases.deactivate(i)
+ i--
}
}
- if len(cases) == 1 {
+ if len(cases) == firstSubSendCase {
break
}
// Select on all the receivers, waiting for them to unblock.
@@ -174,7 +183,7 @@ func (f *Feed) Send(value interface{}) (nsent int) {
}
// Forget about the sent value and hand off the send lock.
- for i := 1; i < len(f.sendCases); i++ {
+ for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
f.sendLock <- struct{}{}
diff --git a/event/feed_test.go b/event/feed_test.go
index 4f897c162..a82c10303 100644
--- a/event/feed_test.go
+++ b/event/feed_test.go
@@ -167,6 +167,74 @@ func TestFeedSubscribeSameChannel(t *testing.T) {
done.Wait()
}
+func TestFeedSubscribeBlockedPost(t *testing.T) {
+ var (
+ feed Feed
+ nsends = 2000
+ ch1 = make(chan int)
+ ch2 = make(chan int)
+ wg sync.WaitGroup
+ )
+ defer wg.Wait()
+
+ feed.Subscribe(ch1)
+ wg.Add(nsends)
+ for i := 0; i < nsends; i++ {
+ go func() {
+ feed.Send(99)
+ wg.Done()
+ }()
+ }
+
+ sub2 := feed.Subscribe(ch2)
+ defer sub2.Unsubscribe()
+
+ // We're done when ch1 has received N times.
+ // The number of receives on ch2 depends on scheduling.
+ for i := 0; i < nsends; {
+ select {
+ case <-ch1:
+ i++
+ case <-ch2:
+ }
+ }
+}
+
+func TestFeedUnsubscribeBlockedPost(t *testing.T) {
+ var (
+ feed Feed
+ nsends = 200
+ chans = make([]chan int, 2000)
+ subs = make([]Subscription, len(chans))
+ bchan = make(chan int)
+ bsub = feed.Subscribe(bchan)
+ wg sync.WaitGroup
+ )
+ for i := range chans {
+ chans[i] = make(chan int, nsends)
+ }
+
+ // Queue up some Sends. None of these can make progress while bchan isn't read.
+ wg.Add(nsends)
+ for i := 0; i < nsends; i++ {
+ go func() {
+ feed.Send(99)
+ wg.Done()
+ }()
+ }
+ // Subscribe the other channels.
+ for i, ch := range chans {
+ subs[i] = feed.Subscribe(ch)
+ }
+ // Unsubscribe them again.
+ for _, sub := range subs {
+ sub.Unsubscribe()
+ }
+ // Unblock the Sends.
+ bsub.Unsubscribe()
+ wg.Wait()
+}
+
func TestFeedUnsubscribeFromInbox(t *testing.T) {
var (
feed Feed
diff --git a/event/subscription.go b/event/subscription.go
index 7f2619b2d..83bd21213 100644
--- a/event/subscription.go
+++ b/event/subscription.go
@@ -43,14 +43,14 @@ type Subscription interface {
Unsubscribe() // cancels sending of events, closing the error channel
}
-// NewSubscription runs fn as a subscription in a new goroutine. The channel given to fn
-// is closed when Unsubscribe is called. If fn returns an error, it is sent on the
-// subscription's error channel.
-func NewSubscription(fn func(<-chan struct{}) error) Subscription {
+// NewSubscription runs a producer function as a subscription in a new goroutine. The
+// channel given to the producer is closed when Unsubscribe is called. If fn returns an
+// error, it is sent on the subscription's error channel.
+func NewSubscription(producer func(<-chan struct{}) error) Subscription {
s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
go func() {
defer close(s.err)
- err := fn(s.unsub)
+ err := producer(s.unsub)
s.mu.Lock()
defer s.mu.Unlock()
if !s.unsubscribed {