From f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 01:56:24 +0200 Subject: event: new package for event multiplexer --- event/event_test.go | 161 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 event/event_test.go (limited to 'event/event_test.go') diff --git a/event/event_test.go b/event/event_test.go new file mode 100644 index 000000000..385bd70b7 --- /dev/null +++ b/event/event_test.go @@ -0,0 +1,161 @@ +package event + +import ( + "math/rand" + "sync" + "testing" + "time" +) + +type testEvent int + +func TestSub(t *testing.T) { + mux := NewTypeMux() + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + go func() { + if err := mux.Post(testEvent(5)); err != nil { + t.Errorf("Post returned unexpected error: %v", err) + } + }() + ev := <-sub.Chan() + + if ev.(testEvent) != testEvent(5) { + t.Errorf("Got %v (%T), expected event %v (%T)", + ev, ev, testEvent(5), testEvent(5)) + } +} + +func TestMuxErrorAfterStop(t *testing.T) { + mux := NewTypeMux() + mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + if _, isopen := <-sub.Chan(); isopen { + t.Errorf("subscription channel was not closed") + } + if err := mux.Post(testEvent(0)); err != ErrMuxClosed { + t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed) + } +} + +func TestUnsubscribeUnblockPost(t *testing.T) { + mux := NewTypeMux() + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + unblocked := make(chan bool) + go func() { + mux.Post(testEvent(5)) + unblocked <- true + }() + + select { + case <-unblocked: + t.Errorf("Post returned before Unsubscribe") + default: + sub.Unsubscribe() + <-unblocked + } +} + +func TestMuxConcurrent(t *testing.T) { + rand.Seed(time.Now().Unix()) + mux := NewTypeMux() + defer mux.Stop() + + recv := make(chan int) + poster := func() { + for { + err := mux.Post(testEvent(0)) + if err != nil { + return + } + } + } + sub := func(i int) { + time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond) + sub := mux.Subscribe(testEvent(0)) + <-sub.Chan() + sub.Unsubscribe() + recv <- i + } + + go poster() + go poster() + go poster() + nsubs := 1000 + for i := 0; i < nsubs; i++ { + go sub(i) + } + + // wait until everyone has been served + counts := make(map[int]int, nsubs) + for i := 0; i < nsubs; i++ { + counts[<-recv]++ + } + for i, count := range counts { + if count != 1 { + t.Errorf("receiver %d called %d times, expected only 1 call", i, count) + } + } +} + +func emptySubscriber(mux *TypeMux, types ...interface{}) { + s := mux.Subscribe(testEvent(0)) + go func() { + for _ = range s.Chan() { + } + }() +} + +func BenchmarkPost3(b *testing.B) { + var mux = NewTypeMux() + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } +} + +func BenchmarkPostConcurrent(b *testing.B) { + var mux = NewTypeMux() + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + var wg sync.WaitGroup + poster := func() { + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } + wg.Done() + } + wg.Add(5) + for i := 0; i < 5; i++ { + go poster() + } + wg.Wait() +} + +// for comparison +func BenchmarkChanSend(b *testing.B) { + c := make(chan interface{}) + closed := make(chan struct{}) + go func() { + for _ = range c { + } + }() + + for i := 0; i < b.N; i++ { + select { + case c <- i: + case <-closed: + } + } +} -- cgit v1.2.3 From 690690489610352d43f8547744b6c9486ad5affa Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:10:09 +0200 Subject: event: make TypeMux zero value ready to use --- event/event_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'event/event_test.go') diff --git a/event/event_test.go b/event/event_test.go index 385bd70b7..f65aaa0a2 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -10,7 +10,7 @@ import ( type testEvent int func TestSub(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -28,7 +28,7 @@ func TestSub(t *testing.T) { } func TestMuxErrorAfterStop(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -41,7 +41,7 @@ func TestMuxErrorAfterStop(t *testing.T) { } func TestUnsubscribeUnblockPost(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -62,7 +62,7 @@ func TestUnsubscribeUnblockPost(t *testing.T) { func TestMuxConcurrent(t *testing.T) { rand.Seed(time.Now().Unix()) - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() recv := make(chan int) @@ -111,7 +111,7 @@ func emptySubscriber(mux *TypeMux, types ...interface{}) { } func BenchmarkPost3(b *testing.B) { - var mux = NewTypeMux() + var mux = new(TypeMux) defer mux.Stop() emptySubscriber(mux, testEvent(0)) emptySubscriber(mux, testEvent(0)) @@ -123,7 +123,7 @@ func BenchmarkPost3(b *testing.B) { } func BenchmarkPostConcurrent(b *testing.B) { - var mux = NewTypeMux() + var mux = new(TypeMux) defer mux.Stop() emptySubscriber(mux, testEvent(0)) emptySubscriber(mux, testEvent(0)) -- cgit v1.2.3 From fa84e50ddb8e64d4cb92d58e235cfed13761f21e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:59:28 +0200 Subject: event: panic for duplicate type --- event/event_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'event/event_test.go') diff --git a/event/event_test.go b/event/event_test.go index f65aaa0a2..c7c0266c1 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -60,6 +60,21 @@ func TestUnsubscribeUnblockPost(t *testing.T) { } } +func TestSubscribeDuplicateType(t *testing.T) { + mux := new(TypeMux) + expected := "event: duplicate type event.testEvent in Subscribe" + + defer func() { + err := recover() + if err == nil { + t.Errorf("Subscribe didn't panic for duplicate type") + } else if err != expected { + t.Errorf("panic mismatch: got %#v, expected %#v", err, expected) + } + }() + mux.Subscribe(testEvent(1), testEvent(2)) +} + func TestMuxConcurrent(t *testing.T) { rand.Seed(time.Now().Unix()) mux := new(TypeMux) -- cgit v1.2.3