diff options
Diffstat (limited to 'event/event_test.go')
-rw-r--r-- | event/event_test.go | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/event/event_test.go b/event/event_test.go new file mode 100644 index 000000000..385bd70b7 --- /dev/null +++ b/event/event_test.go @@ -0,0 +1,161 @@ +package event + +import ( + "math/rand" + "sync" + "testing" + "time" +) + +type testEvent int + +func TestSub(t *testing.T) { + mux := NewTypeMux() + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + go func() { + if err := mux.Post(testEvent(5)); err != nil { + t.Errorf("Post returned unexpected error: %v", err) + } + }() + ev := <-sub.Chan() + + if ev.(testEvent) != testEvent(5) { + t.Errorf("Got %v (%T), expected event %v (%T)", + ev, ev, testEvent(5), testEvent(5)) + } +} + +func TestMuxErrorAfterStop(t *testing.T) { + mux := NewTypeMux() + mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + if _, isopen := <-sub.Chan(); isopen { + t.Errorf("subscription channel was not closed") + } + if err := mux.Post(testEvent(0)); err != ErrMuxClosed { + t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed) + } +} + +func TestUnsubscribeUnblockPost(t *testing.T) { + mux := NewTypeMux() + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + unblocked := make(chan bool) + go func() { + mux.Post(testEvent(5)) + unblocked <- true + }() + + select { + case <-unblocked: + t.Errorf("Post returned before Unsubscribe") + default: + sub.Unsubscribe() + <-unblocked + } +} + +func TestMuxConcurrent(t *testing.T) { + rand.Seed(time.Now().Unix()) + mux := NewTypeMux() + defer mux.Stop() + + recv := make(chan int) + poster := func() { + for { + err := mux.Post(testEvent(0)) + if err != nil { + return + } + } + } + sub := func(i int) { + time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond) + sub := mux.Subscribe(testEvent(0)) + <-sub.Chan() + sub.Unsubscribe() + recv <- i + } + + go poster() + go poster() + go poster() + nsubs := 1000 + for i := 0; i < nsubs; i++ { + go sub(i) + } + + // wait until everyone has been served + counts := make(map[int]int, nsubs) + for i := 0; i < nsubs; i++ { + counts[<-recv]++ + } + for i, count := range counts { + if count != 1 { + t.Errorf("receiver %d called %d times, expected only 1 call", i, count) + } + } +} + +func emptySubscriber(mux *TypeMux, types ...interface{}) { + s := mux.Subscribe(testEvent(0)) + go func() { + for _ = range s.Chan() { + } + }() +} + +func BenchmarkPost3(b *testing.B) { + var mux = NewTypeMux() + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } +} + +func BenchmarkPostConcurrent(b *testing.B) { + var mux = NewTypeMux() + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + var wg sync.WaitGroup + poster := func() { + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } + wg.Done() + } + wg.Add(5) + for i := 0; i < 5; i++ { + go poster() + } + wg.Wait() +} + +// for comparison +func BenchmarkChanSend(b *testing.B) { + c := make(chan interface{}) + closed := make(chan struct{}) + go func() { + for _ = range c { + } + }() + + for i := 0; i < b.N; i++ { + select { + case c <- i: + case <-closed: + } + } +} |