aboutsummaryrefslogblamecommitdiffstats
path: root/event/event_test.go
blob: 385bd70b7987f1fcff5491a9ecad335e18fb14e9 (plain) (tree)
































































































































































                                                                                               
package event

import (
    "math/rand"
    "sync"
    "testing"
    "time"
)

// testEvent is the event type that the tests and benchmarks in this file
// subscribe to and post through the mux.
type testEvent int

func TestSub(t *testing.T) {
    mux := NewTypeMux()
    defer mux.Stop()

    sub := mux.Subscribe(testEvent(0))
    go func() {
        if err := mux.Post(testEvent(5)); err != nil {
            t.Errorf("Post returned unexpected error: %v", err)
        }
    }()
    ev := <-sub.Chan()

    if ev.(testEvent) != testEvent(5) {
        t.Errorf("Got %v (%T), expected event %v (%T)",
            ev, ev, testEvent(5), testEvent(5))
    }
}

// TestMuxErrorAfterStop checks a mux that has already been stopped: the
// channel of a subscription created afterwards must be closed, and Post must
// return ErrMuxClosed.
func TestMuxErrorAfterStop(t *testing.T) {
	mux := NewTypeMux()
	mux.Stop()

	sub := mux.Subscribe(testEvent(0))
	if _, isopen := <-sub.Chan(); isopen {
		t.Errorf("subscription channel was not closed")
	}
	// Use %v rather than %s: err is nil precisely when Post wrongly succeeds,
	// and %s would render a nil error as "%!s(<nil>)" in the failure message.
	if err := mux.Post(testEvent(0)); err != ErrMuxClosed {
		t.Errorf("Post error mismatch, got: %v, expected: %v", err, ErrMuxClosed)
	}
}

// TestUnsubscribeUnblockPost starts a Post in the background while nothing is
// reading from the subscription, then unsubscribes and waits for the Post
// call to return.
func TestUnsubscribeUnblockPost(t *testing.T) {
	mux := NewTypeMux()
	defer mux.Stop()

	subscription := mux.Subscribe(testEvent(0))
	postReturned := make(chan bool)
	go func() {
		mux.Post(testEvent(5))
		postReturned <- true
	}()

	// Non-blocking probe: if the signal is already available, Post came back
	// even though the subscription was never read from or cancelled.
	select {
	case <-postReturned:
		t.Errorf("Post returned before Unsubscribe")
		return
	default:
	}

	// Cancelling the subscription is what should let the pending Post finish.
	subscription.Unsubscribe()
	<-postReturned
}

// TestMuxConcurrent runs three goroutines that post continuously alongside
// 1000 goroutines that each subscribe after a random delay, take one event,
// unsubscribe and report their id. Every id must be reported exactly once.
func TestMuxConcurrent(t *testing.T) {
	const (
		numPosters     = 3
		numSubscribers = 1000
	)

	rand.Seed(time.Now().Unix())
	mux := NewTypeMux()
	defer mux.Stop()

	served := make(chan int)

	// Posters keep posting until Post reports an error.
	for p := 0; p < numPosters; p++ {
		go func() {
			for {
				if err := mux.Post(testEvent(0)); err != nil {
					return
				}
			}
		}()
	}

	// Each subscriber waits up to 98ms, receives a single event, cancels its
	// subscription and then announces its id.
	for id := 0; id < numSubscribers; id++ {
		go func(id int) {
			delay := time.Duration(rand.Intn(99)) * time.Millisecond
			time.Sleep(delay)
			s := mux.Subscribe(testEvent(0))
			<-s.Chan()
			s.Unsubscribe()
			served <- id
		}(id)
	}

	// wait until everyone has been served
	seen := make(map[int]int, numSubscribers)
	for n := 0; n < numSubscribers; n++ {
		id := <-served
		seen[id]++
	}
	for id, times := range seen {
		if times == 1 {
			continue
		}
		t.Errorf("receiver %d called %d times, expected only 1 call", id, times)
	}
}

// emptySubscriber subscribes to the given event types on mux and starts a
// goroutine that receives and discards every delivered event until the
// subscription's channel is closed.
//
// The types argument is forwarded to Subscribe; previously it was ignored and
// the helper always subscribed to testEvent(0) regardless of what was passed.
func emptySubscriber(mux *TypeMux, types ...interface{}) {
	s := mux.Subscribe(types...)
	go func() {
		for range s.Chan() {
		}
	}()
}

// BenchmarkPost3 measures Post called sequentially on a mux that has three
// discarding subscribers registered for the posted event type.
func BenchmarkPost3(b *testing.B) {
	mux := NewTypeMux()
	defer mux.Stop()

	for n := 0; n < 3; n++ {
		emptySubscriber(mux, testEvent(0))
	}

	for remaining := b.N; remaining > 0; remaining-- {
		mux.Post(testEvent(0))
	}
}

// BenchmarkPostConcurrent measures Post on a mux with three discarding
// subscribers while five goroutines each post b.N events at the same time.
func BenchmarkPostConcurrent(b *testing.B) {
	const posters = 5

	mux := NewTypeMux()
	defer mux.Stop()

	for n := 0; n < 3; n++ {
		emptySubscriber(mux, testEvent(0))
	}

	var pending sync.WaitGroup
	pending.Add(posters)
	for p := 0; p < posters; p++ {
		go func() {
			for n := 0; n < b.N; n++ {
				mux.Post(testEvent(0))
			}
			pending.Done()
		}()
	}
	// Return only once every poster has finished its b.N posts.
	pending.Wait()
}

// BenchmarkChanSend is a baseline for comparison with the mux benchmarks: it
// measures sending b.N values over a plain unbuffered channel, through a
// two-case select, to a goroutine that discards them.
func BenchmarkChanSend(b *testing.B) {
	c := make(chan interface{})
	// Closing c on return ends the range loop below, so the draining
	// goroutine exits instead of leaking on every benchmark invocation.
	defer close(c)

	// closed is never closed in this function, so its select case can never
	// fire; it only makes the send go through a select with a second case.
	closed := make(chan struct{})
	go func() {
		for range c {
		}
	}()

	for i := 0; i < b.N; i++ {
		select {
		case c <- i:
		case <-closed:
		}
	}
}