From 53a18d2e2734d078200ec607055ae551245ae74b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 10 May 2018 12:26:36 +0200 Subject: event: document select case slice use and add edge case test (#16680) Feed keeps active subscription channels in a slice called 'f.sendCases'. The Send method tracks the active cases in a local variable 'cases' whose value is f.sendCases initially. 'cases' shrinks to a shorter prefix of f.sendCases every time a send succeeds, moving the successful case out of range of the active case list. This can be confusing because the two slices share a backing array. Add more comments to document what is going on. Also add a test for removing a case that is in 'f.sentCases' but not 'cases'. --- event/feed.go | 5 ++++- event/feed_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/event/feed.go b/event/feed.go index 78fa3d98d..f578f00c1 100644 --- a/event/feed.go +++ b/event/feed.go @@ -148,7 +148,9 @@ func (f *Feed) Send(value interface{}) (nsent int) { f.sendCases[i].Send = rvalue } - // Send until all channels except removeSub have been chosen. + // Send until all channels except removeSub have been chosen. 'cases' tracks a prefix + // of sendCases. When a send succeeds, the corresponding case moves to the end of + // 'cases' and it shrinks by one element. cases := f.sendCases for { // Fast path: try sending without blocking before adding to the select set. @@ -170,6 +172,7 @@ func (f *Feed) Send(value interface{}) (nsent int) { index := f.sendCases.find(recv.Interface()) f.sendCases = f.sendCases.delete(index) if index >= 0 && index < len(cases) { + // Shrink 'cases' too because the removed case was still active. cases = f.sendCases[:len(cases)-1] } } else { diff --git a/event/feed_test.go b/event/feed_test.go index a82c10303..be8876932 100644 --- a/event/feed_test.go +++ b/event/feed_test.go @@ -235,6 +235,45 @@ func TestFeedUnsubscribeBlockedPost(t *testing.T) { wg.Wait() } +// Checks that unsubscribing a channel during Send works even if that +// channel has already been sent on. +func TestFeedUnsubscribeSentChan(t *testing.T) { + var ( + feed Feed + ch1 = make(chan int) + ch2 = make(chan int) + sub1 = feed.Subscribe(ch1) + sub2 = feed.Subscribe(ch2) + wg sync.WaitGroup + ) + defer sub2.Unsubscribe() + + wg.Add(1) + go func() { + feed.Send(0) + wg.Done() + }() + + // Wait for the value on ch1. + <-ch1 + // Unsubscribe ch1, removing it from the send cases. + sub1.Unsubscribe() + + // Receive ch2, finishing Send. + <-ch2 + wg.Wait() + + // Send again. This should send to ch2 only, so the wait group will unblock + // as soon as a value is received on ch2. + wg.Add(1) + go func() { + feed.Send(0) + wg.Done() + }() + <-ch2 + wg.Wait() +} + func TestFeedUnsubscribeFromInbox(t *testing.T) { var ( feed Feed -- cgit v1.2.3