From f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 01:56:24 +0200 Subject: event: new package for event multiplexer --- event/event.go | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++++ event/event_test.go | 161 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 323 insertions(+) create mode 100644 event/event.go create mode 100644 event/event_test.go (limited to 'event') diff --git a/event/event.go b/event/event.go new file mode 100644 index 000000000..74f8043da --- /dev/null +++ b/event/event.go @@ -0,0 +1,162 @@ +// Package event implements an event multiplexer. +package event + +import ( + "errors" + "reflect" + "sync" +) + +type Subscription interface { + Chan() <-chan interface{} + Unsubscribe() +} + +// A TypeMux dispatches events to registered receivers. Receivers can be +// registered to handle events of certain type. Any operation +// called after mux is stopped will return ErrMuxClosed. +type TypeMux struct { + mutex sync.RWMutex + subm map[reflect.Type][]*muxsub + stopped bool +} + +var ErrMuxClosed = errors.New("event: mux closed") + +// NewTypeMux creates a running mux. +func NewTypeMux() *TypeMux { + return &TypeMux{subm: make(map[reflect.Type][]*muxsub)} +} + +// Subscribe creates a subscription for events of the given types. The +// subscription's channel is closed when it is unsubscribed +// or the mux is closed. +func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { + sub := newsub(mux) + mux.mutex.Lock() + if mux.stopped { + mux.mutex.Unlock() + close(sub.postC) + } else { + for _, t := range types { + rtyp := reflect.TypeOf(t) + oldsubs := mux.subm[rtyp] + subs := make([]*muxsub, len(oldsubs)+1) + copy(subs, oldsubs) + subs[len(oldsubs)] = sub + mux.subm[rtyp] = subs + } + mux.mutex.Unlock() + } + return sub +} + +// Post sends an event to all receivers registered for the given type. +// It returns ErrMuxClosed if the mux has been stopped. +func (mux *TypeMux) Post(ev interface{}) error { + rtyp := reflect.TypeOf(ev) + mux.mutex.RLock() + if mux.stopped { + mux.mutex.RUnlock() + return ErrMuxClosed + } + subs := mux.subm[rtyp] + mux.mutex.RUnlock() + for _, sub := range subs { + sub.deliver(ev) + } + return nil +} + +// Stop closes a mux. The mux can no longer be used. +// Future Post calls will fail with ErrMuxClosed. +// Stop blocks until all current deliveries have finished. +func (mux *TypeMux) Stop() { + mux.mutex.Lock() + for _, subs := range mux.subm { + for _, sub := range subs { + sub.closewait() + } + } + mux.subm = nil + mux.stopped = true + mux.mutex.Unlock() +} + +func (mux *TypeMux) del(s *muxsub) { + mux.mutex.Lock() + for typ, subs := range mux.subm { + if pos := find(subs, s); pos >= 0 { + if len(subs) == 1 { + delete(mux.subm, typ) + } else { + mux.subm[typ] = posdelete(subs, pos) + } + } + } + s.mux.mutex.Unlock() +} + +func find(slice []*muxsub, item *muxsub) int { + for i, v := range slice { + if v == item { + return i + } + } + return -1 +} + +func posdelete(slice []*muxsub, pos int) []*muxsub { + news := make([]*muxsub, len(slice)-1) + copy(news[:pos], slice[:pos]) + copy(news[pos:], slice[pos+1:]) + return news +} + +type muxsub struct { + mux *TypeMux + mutex sync.RWMutex + closing chan struct{} + + // these two are the same channel. they are stored separately so + // postC can be set to nil without affecting the return value of + // Chan. + readC <-chan interface{} + postC chan<- interface{} +} + +func newsub(mux *TypeMux) *muxsub { + c := make(chan interface{}) + return &muxsub{ + mux: mux, + readC: c, + postC: c, + closing: make(chan struct{}), + } +} + +func (s *muxsub) Chan() <-chan interface{} { + return s.readC +} + +func (s *muxsub) Unsubscribe() { + s.mux.del(s) + s.closewait() +} + +func (s *muxsub) closewait() { + close(s.closing) + s.mutex.Lock() + close(s.postC) + s.postC = nil + s.mutex.Unlock() +} + +func (s *muxsub) deliver(ev interface{}) { + s.mutex.RLock() + select { + case s.postC <- ev: + case <-s.closing: + } + s.mutex.RUnlock() +} diff --git a/event/event_test.go b/event/event_test.go new file mode 100644 index 000000000..385bd70b7 --- /dev/null +++ b/event/event_test.go @@ -0,0 +1,161 @@ +package event + +import ( + "math/rand" + "sync" + "testing" + "time" +) + +type testEvent int + +func TestSub(t *testing.T) { + mux := NewTypeMux() + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + go func() { + if err := mux.Post(testEvent(5)); err != nil { + t.Errorf("Post returned unexpected error: %v", err) + } + }() + ev := <-sub.Chan() + + if ev.(testEvent) != testEvent(5) { + t.Errorf("Got %v (%T), expected event %v (%T)", + ev, ev, testEvent(5), testEvent(5)) + } +} + +func TestMuxErrorAfterStop(t *testing.T) { + mux := NewTypeMux() + mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + if _, isopen := <-sub.Chan(); isopen { + t.Errorf("subscription channel was not closed") + } + if err := mux.Post(testEvent(0)); err != ErrMuxClosed { + t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed) + } +} + +func TestUnsubscribeUnblockPost(t *testing.T) { + mux := NewTypeMux() + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + unblocked := make(chan bool) + go func() { + mux.Post(testEvent(5)) + unblocked <- true + }() + + select { + case <-unblocked: + t.Errorf("Post returned before Unsubscribe") + default: + sub.Unsubscribe() + <-unblocked + } +} + +func TestMuxConcurrent(t *testing.T) { + rand.Seed(time.Now().Unix()) + mux := NewTypeMux() + defer mux.Stop() + + recv := make(chan int) + poster := func() { + for { + err := mux.Post(testEvent(0)) + if err != nil { + return + } + } + } + sub := func(i int) { + time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond) + sub := mux.Subscribe(testEvent(0)) + <-sub.Chan() + sub.Unsubscribe() + recv <- i + } + + go poster() + go poster() + go poster() + nsubs := 1000 + for i := 0; i < nsubs; i++ { + go sub(i) + } + + // wait until everyone has been served + counts := make(map[int]int, nsubs) + for i := 0; i < nsubs; i++ { + counts[<-recv]++ + } + for i, count := range counts { + if count != 1 { + t.Errorf("receiver %d called %d times, expected only 1 call", i, count) + } + } +} + +func emptySubscriber(mux *TypeMux, types ...interface{}) { + s := mux.Subscribe(testEvent(0)) + go func() { + for _ = range s.Chan() { + } + }() +} + +func BenchmarkPost3(b *testing.B) { + var mux = NewTypeMux() + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } +} + +func BenchmarkPostConcurrent(b *testing.B) { + var mux = NewTypeMux() + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + var wg sync.WaitGroup + poster := func() { + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } + wg.Done() + } + wg.Add(5) + for i := 0; i < 5; i++ { + go poster() + } + wg.Wait() +} + +// for comparison +func BenchmarkChanSend(b *testing.B) { + c := make(chan interface{}) + closed := make(chan struct{}) + go func() { + for _ = range c { + } + }() + + for i := 0; i < b.N; i++ { + select { + case c <- i: + case <-closed: + } + } +} -- cgit v1.2.3 From dac4a8f113b35c67349115115af17c7f1874d939 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:07:27 +0200 Subject: event: add some documentation --- event/event.go | 9 +++++++++ event/example_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 event/example_test.go (limited to 'event') diff --git a/event/event.go b/event/event.go index 74f8043da..09759ee50 100644 --- a/event/event.go +++ b/event/event.go @@ -7,8 +7,16 @@ import ( "sync" ) +// Subscription is implemented by event subscriptions. type Subscription interface { + // Chan returns a channel that carries events. + // Implementations should return the same channel + // for any subsequent calls to Chan. Chan() <-chan interface{} + + // Unsubscribe stops delivery of events to a subscription. + // The event channel is closed. + // Unsubscribe can be called more than once. Unsubscribe() } @@ -21,6 +29,7 @@ type TypeMux struct { stopped bool } +// ErrMuxClosed is returned when Posting on a closed TypeMux. var ErrMuxClosed = errors.New("event: mux closed") // NewTypeMux creates a running mux. diff --git a/event/example_test.go b/event/example_test.go new file mode 100644 index 000000000..2f47f6f27 --- /dev/null +++ b/event/example_test.go @@ -0,0 +1,42 @@ +package event + +import "fmt" + +func ExampleTypeMux() { + type someEvent struct{ I int } + type otherEvent struct{ S string } + type yetAnotherEvent struct{ X, Y int } + + var mux TypeMux + + // Start a subscriber. + done := make(chan struct{}) + sub := mux.Subscribe(someEvent{}, otherEvent{}) + go func() { + for event := range sub.Chan() { + fmt.Printf("Received: %#v\n", event) + } + fmt.Println("done") + close(done) + }() + + // Post some events. + mux.Post(someEvent{5}) + mux.Post(yetAnotherEvent{X: 3, Y: 4}) + mux.Post(someEvent{6}) + mux.Post(otherEvent{"whoa"}) + + // Stop closes all subscription channels. + // The subscriber goroutine will print "done" + // and exit. + mux.Stop() + + // Wait for subscriber to return. + <-done + + // Output: + // Received: event.someEvent{I:5} + // Received: event.someEvent{I:6} + // Received: event.otherEvent{S:"whoa"} + // done +} -- cgit v1.2.3 From 10bbf265b2e8f1906602d2604f755241b8eb49e6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:08:48 +0200 Subject: event: make Unsubscribe idempotent --- event/event.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) (limited to 'event') diff --git a/event/event.go b/event/event.go index 09759ee50..344d1e3f6 100644 --- a/event/event.go +++ b/event/event.go @@ -124,14 +124,16 @@ func posdelete(slice []*muxsub, pos int) []*muxsub { type muxsub struct { mux *TypeMux - mutex sync.RWMutex + closeMu sync.Mutex closing chan struct{} + closed bool // these two are the same channel. they are stored separately so // postC can be set to nil without affecting the return value of // Chan. - readC <-chan interface{} - postC chan<- interface{} + postMu sync.RWMutex + readC <-chan interface{} + postC chan<- interface{} } func newsub(mux *TypeMux) *muxsub { @@ -154,18 +156,25 @@ func (s *muxsub) Unsubscribe() { } func (s *muxsub) closewait() { + s.closeMu.Lock() + defer s.closeMu.Unlock() + if s.closed { + return + } close(s.closing) - s.mutex.Lock() + s.closed = true + + s.postMu.Lock() close(s.postC) s.postC = nil - s.mutex.Unlock() + s.postMu.Unlock() } func (s *muxsub) deliver(ev interface{}) { - s.mutex.RLock() + s.postMu.RLock() select { case s.postC <- ev: case <-s.closing: } - s.mutex.RUnlock() + s.postMu.RUnlock() } -- cgit v1.2.3 From 690690489610352d43f8547744b6c9486ad5affa Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:10:09 +0200 Subject: event: make TypeMux zero value ready to use --- event/event.go | 13 ++++++------- event/event_test.go | 12 ++++++------ 2 files changed, 12 insertions(+), 13 deletions(-) (limited to 'event') diff --git a/event/event.go b/event/event.go index 344d1e3f6..d11a0e9bd 100644 --- a/event/event.go +++ b/event/event.go @@ -23,6 +23,8 @@ type Subscription interface { // A TypeMux dispatches events to registered receivers. Receivers can be // registered to handle events of certain type. Any operation // called after mux is stopped will return ErrMuxClosed. +// +// The zero value is ready to use. type TypeMux struct { mutex sync.RWMutex subm map[reflect.Type][]*muxsub @@ -32,11 +34,6 @@ type TypeMux struct { // ErrMuxClosed is returned when Posting on a closed TypeMux. var ErrMuxClosed = errors.New("event: mux closed") -// NewTypeMux creates a running mux. -func NewTypeMux() *TypeMux { - return &TypeMux{subm: make(map[reflect.Type][]*muxsub)} -} - // Subscribe creates a subscription for events of the given types. The // subscription's channel is closed when it is unsubscribed // or the mux is closed. @@ -44,9 +41,11 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { sub := newsub(mux) mux.mutex.Lock() if mux.stopped { - mux.mutex.Unlock() close(sub.postC) } else { + if mux.subm == nil { + mux.subm = make(map[reflect.Type][]*muxsub) + } for _, t := range types { rtyp := reflect.TypeOf(t) oldsubs := mux.subm[rtyp] @@ -55,8 +54,8 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { subs[len(oldsubs)] = sub mux.subm[rtyp] = subs } - mux.mutex.Unlock() } + mux.mutex.Unlock() return sub } diff --git a/event/event_test.go b/event/event_test.go index 385bd70b7..f65aaa0a2 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -10,7 +10,7 @@ import ( type testEvent int func TestSub(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -28,7 +28,7 @@ func TestSub(t *testing.T) { } func TestMuxErrorAfterStop(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -41,7 +41,7 @@ func TestMuxErrorAfterStop(t *testing.T) { } func TestUnsubscribeUnblockPost(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -62,7 +62,7 @@ func TestUnsubscribeUnblockPost(t *testing.T) { func TestMuxConcurrent(t *testing.T) { rand.Seed(time.Now().Unix()) - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() recv := make(chan int) @@ -111,7 +111,7 @@ func emptySubscriber(mux *TypeMux, types ...interface{}) { } func BenchmarkPost3(b *testing.B) { - var mux = NewTypeMux() + var mux = new(TypeMux) defer mux.Stop() emptySubscriber(mux, testEvent(0)) emptySubscriber(mux, testEvent(0)) @@ -123,7 +123,7 @@ func BenchmarkPost3(b *testing.B) { } func BenchmarkPostConcurrent(b *testing.B) { - var mux = NewTypeMux() + var mux = new(TypeMux) defer mux.Stop() emptySubscriber(mux, testEvent(0)) emptySubscriber(mux, testEvent(0)) -- cgit v1.2.3 From fa84e50ddb8e64d4cb92d58e235cfed13761f21e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:59:28 +0200 Subject: event: panic for duplicate type --- event/event.go | 6 +++++- event/event_test.go | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) (limited to 'event') diff --git a/event/event.go b/event/event.go index d11a0e9bd..540fbba65 100644 --- a/event/event.go +++ b/event/event.go @@ -3,6 +3,7 @@ package event import ( "errors" + "fmt" "reflect" "sync" ) @@ -40,6 +41,7 @@ var ErrMuxClosed = errors.New("event: mux closed") func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { sub := newsub(mux) mux.mutex.Lock() + defer mux.mutex.Unlock() if mux.stopped { close(sub.postC) } else { @@ -49,13 +51,15 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { for _, t := range types { rtyp := reflect.TypeOf(t) oldsubs := mux.subm[rtyp] + if find(oldsubs, sub) != -1 { + panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp)) + } subs := make([]*muxsub, len(oldsubs)+1) copy(subs, oldsubs) subs[len(oldsubs)] = sub mux.subm[rtyp] = subs } } - mux.mutex.Unlock() return sub } diff --git a/event/event_test.go b/event/event_test.go index f65aaa0a2..c7c0266c1 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -60,6 +60,21 @@ func TestUnsubscribeUnblockPost(t *testing.T) { } } +func TestSubscribeDuplicateType(t *testing.T) { + mux := new(TypeMux) + expected := "event: duplicate type event.testEvent in Subscribe" + + defer func() { + err := recover() + if err == nil { + t.Errorf("Subscribe didn't panic for duplicate type") + } else if err != expected { + t.Errorf("panic mismatch: got %#v, expected %#v", err, expected) + } + }() + mux.Subscribe(testEvent(1), testEvent(2)) +} + func TestMuxConcurrent(t *testing.T) { rand.Seed(time.Now().Unix()) mux := new(TypeMux) -- cgit v1.2.3 From 2a5af8fac72846a7354dd79143727bac4ee89cb9 Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Mon, 17 Nov 2014 10:40:40 -0600 Subject: enable `go vet` --- event/profile.tmp | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 event/profile.tmp (limited to 'event') diff --git a/event/profile.tmp b/event/profile.tmp new file mode 100644 index 000000000..c6496902d --- /dev/null +++ b/event/profile.tmp @@ -0,0 +1,40 @@ +mode: count +github.com/ethereum/go-ethereum/event/event.go:41.66,45.17 4 1005 +github.com/ethereum/go-ethereum/event/event.go:63.2,63.12 1 1004 +github.com/ethereum/go-ethereum/event/event.go:45.17,47.3 1 1 +github.com/ethereum/go-ethereum/event/event.go:47.3,48.22 1 1004 +github.com/ethereum/go-ethereum/event/event.go:51.3,51.27 1 1004 +github.com/ethereum/go-ethereum/event/event.go:48.22,50.4 1 5 +github.com/ethereum/go-ethereum/event/event.go:51.27,54.32 3 1006 +github.com/ethereum/go-ethereum/event/event.go:57.4,60.25 4 1005 +github.com/ethereum/go-ethereum/event/event.go:54.32,56.5 1 1 +github.com/ethereum/go-ethereum/event/event.go:68.48,71.17 3 3513 +github.com/ethereum/go-ethereum/event/event.go:75.2,77.27 3 3511 +github.com/ethereum/go-ethereum/event/event.go:80.2,80.12 1 3509 +github.com/ethereum/go-ethereum/event/event.go:71.17,74.3 2 2 +github.com/ethereum/go-ethereum/event/event.go:77.27,79.3 1 2576 +github.com/ethereum/go-ethereum/event/event.go:86.28,88.32 2 5 +github.com/ethereum/go-ethereum/event/event.go:93.2,95.20 3 5 +github.com/ethereum/go-ethereum/event/event.go:88.32,89.28 1 3 +github.com/ethereum/go-ethereum/event/event.go:89.28,91.4 1 3 +github.com/ethereum/go-ethereum/event/event.go:98.36,100.34 2 1001 +github.com/ethereum/go-ethereum/event/event.go:109.2,109.22 1 1001 +github.com/ethereum/go-ethereum/event/event.go:100.34,101.37 1 1001 +github.com/ethereum/go-ethereum/event/event.go:101.37,102.22 1 1001 +github.com/ethereum/go-ethereum/event/event.go:102.22,104.5 1 2 +github.com/ethereum/go-ethereum/event/event.go:104.5,106.5 1 999 +github.com/ethereum/go-ethereum/event/event.go:112.46,113.26 1 2007 +github.com/ethereum/go-ethereum/event/event.go:118.2,118.11 1 1005 +github.com/ethereum/go-ethereum/event/event.go:113.26,114.16 1 181499 +github.com/ethereum/go-ethereum/event/event.go:114.16,116.4 1 1002 +github.com/ethereum/go-ethereum/event/event.go:121.52,126.2 4 999 +github.com/ethereum/go-ethereum/event/event.go:142.35,150.2 2 1005 +github.com/ethereum/go-ethereum/event/event.go:152.44,154.2 1 1003 +github.com/ethereum/go-ethereum/event/event.go:156.32,159.2 2 1001 +github.com/ethereum/go-ethereum/event/event.go:161.30,164.14 3 1004 +github.com/ethereum/go-ethereum/event/event.go:167.2,173.19 6 1003 +github.com/ethereum/go-ethereum/event/event.go:164.14,166.3 1 1 +github.com/ethereum/go-ethereum/event/event.go:176.42,178.9 2 2575 +github.com/ethereum/go-ethereum/event/event.go:182.2,182.20 1 2575 +github.com/ethereum/go-ethereum/event/event.go:179.2,179.21 0 1004 +github.com/ethereum/go-ethereum/event/event.go:180.2,180.19 0 1571 -- cgit v1.2.3 From 14e2e488fdf0f4d6ed1a5a48ffbbe883faa7edb6 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 19 Nov 2014 12:25:52 +0100 Subject: Added `chain` tests & minor fixes * Fork tests (equal and larger chains) * `chain.link` fields are now exported * moved debug function from state to dump.go --- event/profile.tmp | 40 ---------------------------------------- 1 file changed, 40 deletions(-) delete mode 100644 event/profile.tmp (limited to 'event') diff --git a/event/profile.tmp b/event/profile.tmp deleted file mode 100644 index c6496902d..000000000 --- a/event/profile.tmp +++ /dev/null @@ -1,40 +0,0 @@ -mode: count -github.com/ethereum/go-ethereum/event/event.go:41.66,45.17 4 1005 -github.com/ethereum/go-ethereum/event/event.go:63.2,63.12 1 1004 -github.com/ethereum/go-ethereum/event/event.go:45.17,47.3 1 1 -github.com/ethereum/go-ethereum/event/event.go:47.3,48.22 1 1004 -github.com/ethereum/go-ethereum/event/event.go:51.3,51.27 1 1004 -github.com/ethereum/go-ethereum/event/event.go:48.22,50.4 1 5 -github.com/ethereum/go-ethereum/event/event.go:51.27,54.32 3 1006 -github.com/ethereum/go-ethereum/event/event.go:57.4,60.25 4 1005 -github.com/ethereum/go-ethereum/event/event.go:54.32,56.5 1 1 -github.com/ethereum/go-ethereum/event/event.go:68.48,71.17 3 3513 -github.com/ethereum/go-ethereum/event/event.go:75.2,77.27 3 3511 -github.com/ethereum/go-ethereum/event/event.go:80.2,80.12 1 3509 -github.com/ethereum/go-ethereum/event/event.go:71.17,74.3 2 2 -github.com/ethereum/go-ethereum/event/event.go:77.27,79.3 1 2576 -github.com/ethereum/go-ethereum/event/event.go:86.28,88.32 2 5 -github.com/ethereum/go-ethereum/event/event.go:93.2,95.20 3 5 -github.com/ethereum/go-ethereum/event/event.go:88.32,89.28 1 3 -github.com/ethereum/go-ethereum/event/event.go:89.28,91.4 1 3 -github.com/ethereum/go-ethereum/event/event.go:98.36,100.34 2 1001 -github.com/ethereum/go-ethereum/event/event.go:109.2,109.22 1 1001 -github.com/ethereum/go-ethereum/event/event.go:100.34,101.37 1 1001 -github.com/ethereum/go-ethereum/event/event.go:101.37,102.22 1 1001 -github.com/ethereum/go-ethereum/event/event.go:102.22,104.5 1 2 -github.com/ethereum/go-ethereum/event/event.go:104.5,106.5 1 999 -github.com/ethereum/go-ethereum/event/event.go:112.46,113.26 1 2007 -github.com/ethereum/go-ethereum/event/event.go:118.2,118.11 1 1005 -github.com/ethereum/go-ethereum/event/event.go:113.26,114.16 1 181499 -github.com/ethereum/go-ethereum/event/event.go:114.16,116.4 1 1002 -github.com/ethereum/go-ethereum/event/event.go:121.52,126.2 4 999 -github.com/ethereum/go-ethereum/event/event.go:142.35,150.2 2 1005 -github.com/ethereum/go-ethereum/event/event.go:152.44,154.2 1 1003 -github.com/ethereum/go-ethereum/event/event.go:156.32,159.2 2 1001 -github.com/ethereum/go-ethereum/event/event.go:161.30,164.14 3 1004 -github.com/ethereum/go-ethereum/event/event.go:167.2,173.19 6 1003 -github.com/ethereum/go-ethereum/event/event.go:164.14,166.3 1 1 -github.com/ethereum/go-ethereum/event/event.go:176.42,178.9 2 2575 -github.com/ethereum/go-ethereum/event/event.go:182.2,182.20 1 2575 -github.com/ethereum/go-ethereum/event/event.go:179.2,179.21 0 1004 -github.com/ethereum/go-ethereum/event/event.go:180.2,180.19 0 1571 -- cgit v1.2.3 From ed1538248f2e7a44680d22a052a234a31b736624 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 12 Dec 2014 22:19:39 +0100 Subject: Moved filter to events --- event/filter/filter.go | 70 ++++++++++++++++++++++++++++++++++++++++++ event/filter/filter_test.go | 34 ++++++++++++++++++++ event/filter/generic_filter.go | 22 +++++++++++++ 3 files changed, 126 insertions(+) create mode 100644 event/filter/filter.go create mode 100644 event/filter/filter_test.go create mode 100644 event/filter/generic_filter.go (limited to 'event') diff --git a/event/filter/filter.go b/event/filter/filter.go new file mode 100644 index 000000000..9817d5782 --- /dev/null +++ b/event/filter/filter.go @@ -0,0 +1,70 @@ +package filter + +import "reflect" + +type Filter interface { + Compare(Filter) bool + Trigger(data interface{}) +} + +type FilterEvent struct { + filter Filter + data interface{} +} + +type Filters struct { + id int + watchers map[int]Filter + ch chan FilterEvent + + quit chan struct{} +} + +func New() *Filters { + return &Filters{ + ch: make(chan FilterEvent), + watchers: make(map[int]Filter), + quit: make(chan struct{}), + } +} + +func (self *Filters) Start() { + go self.loop() +} + +func (self *Filters) Stop() { + close(self.quit) +} + +func (self *Filters) Notify(filter Filter, data interface{}) { + self.ch <- FilterEvent{filter, data} +} + +func (self *Filters) Install(watcher Filter) int { + self.watchers[self.id] = watcher + self.id++ + + return self.id - 1 +} + +func (self *Filters) Uninstall(id int) { + delete(self.watchers, id) +} + +func (self *Filters) loop() { +out: + for { + select { + case <-self.quit: + break out + case event := <-self.ch: + for _, watcher := range self.watchers { + if reflect.TypeOf(watcher) == reflect.TypeOf(event.filter) { + if watcher.Compare(event.filter) { + watcher.Trigger(event.data) + } + } + } + } + } +} diff --git a/event/filter/filter_test.go b/event/filter/filter_test.go new file mode 100644 index 000000000..815deb63a --- /dev/null +++ b/event/filter/filter_test.go @@ -0,0 +1,34 @@ +package filter + +import "testing" + +func TestFilters(t *testing.T) { + var success bool + var failure bool + + fm := New() + fm.Start() + fm.Install(Generic{ + Str1: "hello", + Fn: func(data interface{}) { + success = data.(bool) + }, + }) + fm.Install(Generic{ + Str1: "hello1", + Str2: "hello", + Fn: func(data interface{}) { + failure = true + }, + }) + fm.Notify(Generic{Str1: "hello"}, true) + fm.Stop() + + if !success { + t.Error("expected 'hello' to be posted") + } + + if failure { + t.Error("hello1 was triggered") + } +} diff --git a/event/filter/generic_filter.go b/event/filter/generic_filter.go new file mode 100644 index 000000000..b04b4801e --- /dev/null +++ b/event/filter/generic_filter.go @@ -0,0 +1,22 @@ +package filter + +type Generic struct { + Str1, Str2, Str3 string + + Fn func(data interface{}) +} + +func (self Generic) Compare(f Filter) bool { + filter := f.(Generic) + if (len(self.Str1) == 0 || filter.Str1 == self.Str1) && + (len(self.Str2) == 0 || filter.Str2 == self.Str2) && + (len(self.Str3) == 0 || filter.Str3 == self.Str3) { + return true + } + + return false +} + +func (self Generic) Trigger(data interface{}) { + self.Fn(data) +} -- cgit v1.2.3