diff options
author | Jeffrey Wilcke <obscuren@users.noreply.github.com> | 2014-10-10 22:57:50 +0800 |
---|---|---|
committer | Jeffrey Wilcke <obscuren@users.noreply.github.com> | 2014-10-10 22:57:50 +0800 |
commit | 6fec5bd32e64e15d16085591e732b26a243297fa (patch) | |
tree | 75269be1c098d1346fcb6460747969372b5f4cf3 | |
parent | a38dafcc57d296447db9748c8c85df6c58b243fb (diff) | |
parent | 44674cb96c64ddf9a8b3345f14329c030ecd4ed6 (diff) | |
download | go-tangerine-6fec5bd32e64e15d16085591e732b26a243297fa.tar go-tangerine-6fec5bd32e64e15d16085591e732b26a243297fa.tar.gz go-tangerine-6fec5bd32e64e15d16085591e732b26a243297fa.tar.bz2 go-tangerine-6fec5bd32e64e15d16085591e732b26a243297fa.tar.lz go-tangerine-6fec5bd32e64e15d16085591e732b26a243297fa.tar.xz go-tangerine-6fec5bd32e64e15d16085591e732b26a243297fa.tar.zst go-tangerine-6fec5bd32e64e15d16085591e732b26a243297fa.zip |
Merge pull request #56 from fjl/feature/raceless-eventer
Fix Eventer race
-rw-r--r-- | eventer/eventer.go | 12 | ||||
-rw-r--r-- | eventer/eventer_test.go | 57 |
2 files changed, 60 insertions, 9 deletions
diff --git a/eventer/eventer.go b/eventer/eventer.go index fb2f299a3..6e5ee2ec5 100644 --- a/eventer/eventer.go +++ b/eventer/eventer.go @@ -1,5 +1,7 @@ package eventer +import "sync" + // Basic receiver interface. type Receiver interface { Send(Event) @@ -27,17 +29,18 @@ type Event struct { type Channels map[string][]Receiver type EventMachine struct { + mu sync.RWMutex channels Channels } func New() *EventMachine { - return &EventMachine{ - channels: make(Channels), - } + return &EventMachine{channels: make(Channels)} } func (self *EventMachine) add(typ string, r Receiver) { + self.mu.Lock() self.channels[typ] = append(self.channels[typ], r) + self.mu.Unlock() } // Generalised methods for the known receiver types @@ -64,11 +67,11 @@ func (self *EventMachine) RegisterFunc(typ string, f Function) { func (self *EventMachine) Register(typ string) Channel { c := make(Channel, 1) self.add(typ, c) - return c } func (self *EventMachine) Post(typ string, data interface{}) { + self.mu.RLock() if self.channels[typ] != nil { ev := Event{typ, data} for _, receiver := range self.channels[typ] { @@ -76,4 +79,5 @@ func (self *EventMachine) Post(typ string, data interface{}) { receiver.Send(ev) } } + self.mu.RUnlock() } diff --git a/eventer/eventer_test.go b/eventer/eventer_test.go index b35267af6..a5db6d901 100644 --- a/eventer/eventer_test.go +++ b/eventer/eventer_test.go @@ -1,9 +1,13 @@ package eventer -import "testing" +import ( + "math/rand" + "testing" + "time" +) func TestChannel(t *testing.T) { - eventer := New(nil) + eventer := New() c := make(Channel, 1) eventer.RegisterChannel("test", c) @@ -17,7 +21,7 @@ func TestChannel(t *testing.T) { } func TestFunction(t *testing.T) { - eventer := New(nil) + eventer := New() var data string eventer.RegisterFunc("test", func(ev Event) { @@ -31,7 +35,7 @@ func TestFunction(t *testing.T) { } func TestRegister(t *testing.T) { - eventer := New(nil) + eventer := New() c := eventer.Register("test") eventer.Post("test", "hello world") @@ -44,7 +48,7 @@ func TestRegister(t *testing.T) { } func TestOn(t *testing.T) { - eventer := New(nil) + eventer := New() c := make(Channel, 1) eventer.On("test", c) @@ -64,3 +68,46 @@ func TestOn(t *testing.T) { t.Error("Expected function event with data 'hello world'. Got", data) } } + +func TestConcurrentUsage(t *testing.T) { + rand.Seed(time.Now().Unix()) + eventer := New() + stop := make(chan struct{}) + recv := make(chan int) + poster := func() { + for { + select { + case <-stop: + return + default: + eventer.Post("test", "hi") + } + } + } + listener := func(i int) { + time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond) + c := eventer.Register("test") + // wait for the first event + <-c + recv <- i + // keep receiving to prevent deadlock + for { + select { + case <-stop: + return + case <-c: + } + } + } + + nlisteners := 200 + go poster() + for i := 0; i < nlisteners; i++ { + go listener(i) + } + // wait until everyone has been served + for i := 0; i < nlisteners; i++ { + <-recv + } + close(stop) +} |