aboutsummaryrefslogtreecommitdiffstats
path: root/event
diff options
context:
space:
mode:
Diffstat (limited to 'event')
-rw-r--r--event/event.go183
-rw-r--r--event/event_test.go176
-rw-r--r--event/example_test.go42
-rw-r--r--event/filter/eth_filter.go111
-rw-r--r--event/filter/filter.go78
-rw-r--r--event/filter/filter_test.go34
-rw-r--r--event/filter/generic_filter.go32
7 files changed, 656 insertions, 0 deletions
diff --git a/event/event.go b/event/event.go
new file mode 100644
index 000000000..540fbba65
--- /dev/null
+++ b/event/event.go
@@ -0,0 +1,183 @@
+// Package event implements an event multiplexer.
+package event
+
+import (
+ "errors"
+ "fmt"
+ "reflect"
+ "sync"
+)
+
+// Subscription is implemented by event subscriptions.
+type Subscription interface {
+ // Chan returns a channel that carries events.
+ // Implementations should return the same channel
+ // for any subsequent calls to Chan.
+ Chan() <-chan interface{}
+
+ // Unsubscribe stops delivery of events to a subscription.
+ // The event channel is closed.
+ // Unsubscribe can be called more than once.
+ Unsubscribe()
+}
+
+// A TypeMux dispatches events to registered receivers. Receivers can be
+// registered to handle events of certain type. Any operation
+// called after mux is stopped will return ErrMuxClosed.
+//
+// The zero value is ready to use.
+type TypeMux struct {
+ mutex sync.RWMutex
+ subm map[reflect.Type][]*muxsub
+ stopped bool
+}
+
+// ErrMuxClosed is returned when Posting on a closed TypeMux.
+var ErrMuxClosed = errors.New("event: mux closed")
+
+// Subscribe creates a subscription for events of the given types. The
+// subscription's channel is closed when it is unsubscribed
+// or the mux is closed.
+func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
+ sub := newsub(mux)
+ mux.mutex.Lock()
+ defer mux.mutex.Unlock()
+ if mux.stopped {
+ close(sub.postC)
+ } else {
+ if mux.subm == nil {
+ mux.subm = make(map[reflect.Type][]*muxsub)
+ }
+ for _, t := range types {
+ rtyp := reflect.TypeOf(t)
+ oldsubs := mux.subm[rtyp]
+ if find(oldsubs, sub) != -1 {
+ panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
+ }
+ subs := make([]*muxsub, len(oldsubs)+1)
+ copy(subs, oldsubs)
+ subs[len(oldsubs)] = sub
+ mux.subm[rtyp] = subs
+ }
+ }
+ return sub
+}
+
+// Post sends an event to all receivers registered for the given type.
+// It returns ErrMuxClosed if the mux has been stopped.
+func (mux *TypeMux) Post(ev interface{}) error {
+ rtyp := reflect.TypeOf(ev)
+ mux.mutex.RLock()
+ if mux.stopped {
+ mux.mutex.RUnlock()
+ return ErrMuxClosed
+ }
+ subs := mux.subm[rtyp]
+ mux.mutex.RUnlock()
+ for _, sub := range subs {
+ sub.deliver(ev)
+ }
+ return nil
+}
+
+// Stop closes a mux. The mux can no longer be used.
+// Future Post calls will fail with ErrMuxClosed.
+// Stop blocks until all current deliveries have finished.
+func (mux *TypeMux) Stop() {
+ mux.mutex.Lock()
+ for _, subs := range mux.subm {
+ for _, sub := range subs {
+ sub.closewait()
+ }
+ }
+ mux.subm = nil
+ mux.stopped = true
+ mux.mutex.Unlock()
+}
+
+func (mux *TypeMux) del(s *muxsub) {
+ mux.mutex.Lock()
+ for typ, subs := range mux.subm {
+ if pos := find(subs, s); pos >= 0 {
+ if len(subs) == 1 {
+ delete(mux.subm, typ)
+ } else {
+ mux.subm[typ] = posdelete(subs, pos)
+ }
+ }
+ }
+ s.mux.mutex.Unlock()
+}
+
+func find(slice []*muxsub, item *muxsub) int {
+ for i, v := range slice {
+ if v == item {
+ return i
+ }
+ }
+ return -1
+}
+
+func posdelete(slice []*muxsub, pos int) []*muxsub {
+ news := make([]*muxsub, len(slice)-1)
+ copy(news[:pos], slice[:pos])
+ copy(news[pos:], slice[pos+1:])
+ return news
+}
+
+type muxsub struct {
+ mux *TypeMux
+ closeMu sync.Mutex
+ closing chan struct{}
+ closed bool
+
+ // these two are the same channel. they are stored separately so
+ // postC can be set to nil without affecting the return value of
+ // Chan.
+ postMu sync.RWMutex
+ readC <-chan interface{}
+ postC chan<- interface{}
+}
+
+func newsub(mux *TypeMux) *muxsub {
+ c := make(chan interface{})
+ return &muxsub{
+ mux: mux,
+ readC: c,
+ postC: c,
+ closing: make(chan struct{}),
+ }
+}
+
+func (s *muxsub) Chan() <-chan interface{} {
+ return s.readC
+}
+
+func (s *muxsub) Unsubscribe() {
+ s.mux.del(s)
+ s.closewait()
+}
+
+func (s *muxsub) closewait() {
+ s.closeMu.Lock()
+ defer s.closeMu.Unlock()
+ if s.closed {
+ return
+ }
+ close(s.closing)
+ s.closed = true
+
+ s.postMu.Lock()
+ close(s.postC)
+ s.postC = nil
+ s.postMu.Unlock()
+}
+
+func (s *muxsub) deliver(ev interface{}) {
+ s.postMu.RLock()
+ select {
+ case s.postC <- ev:
+ case <-s.closing:
+ }
+ s.postMu.RUnlock()
+}
diff --git a/event/event_test.go b/event/event_test.go
new file mode 100644
index 000000000..c7c0266c1
--- /dev/null
+++ b/event/event_test.go
@@ -0,0 +1,176 @@
+package event
+
+import (
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+)
+
+type testEvent int
+
+func TestSub(t *testing.T) {
+ mux := new(TypeMux)
+ defer mux.Stop()
+
+ sub := mux.Subscribe(testEvent(0))
+ go func() {
+ if err := mux.Post(testEvent(5)); err != nil {
+ t.Errorf("Post returned unexpected error: %v", err)
+ }
+ }()
+ ev := <-sub.Chan()
+
+ if ev.(testEvent) != testEvent(5) {
+ t.Errorf("Got %v (%T), expected event %v (%T)",
+ ev, ev, testEvent(5), testEvent(5))
+ }
+}
+
+func TestMuxErrorAfterStop(t *testing.T) {
+ mux := new(TypeMux)
+ mux.Stop()
+
+ sub := mux.Subscribe(testEvent(0))
+ if _, isopen := <-sub.Chan(); isopen {
+ t.Errorf("subscription channel was not closed")
+ }
+ if err := mux.Post(testEvent(0)); err != ErrMuxClosed {
+ t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed)
+ }
+}
+
+func TestUnsubscribeUnblockPost(t *testing.T) {
+ mux := new(TypeMux)
+ defer mux.Stop()
+
+ sub := mux.Subscribe(testEvent(0))
+ unblocked := make(chan bool)
+ go func() {
+ mux.Post(testEvent(5))
+ unblocked <- true
+ }()
+
+ select {
+ case <-unblocked:
+ t.Errorf("Post returned before Unsubscribe")
+ default:
+ sub.Unsubscribe()
+ <-unblocked
+ }
+}
+
+func TestSubscribeDuplicateType(t *testing.T) {
+ mux := new(TypeMux)
+ expected := "event: duplicate type event.testEvent in Subscribe"
+
+ defer func() {
+ err := recover()
+ if err == nil {
+ t.Errorf("Subscribe didn't panic for duplicate type")
+ } else if err != expected {
+ t.Errorf("panic mismatch: got %#v, expected %#v", err, expected)
+ }
+ }()
+ mux.Subscribe(testEvent(1), testEvent(2))
+}
+
+func TestMuxConcurrent(t *testing.T) {
+ rand.Seed(time.Now().Unix())
+ mux := new(TypeMux)
+ defer mux.Stop()
+
+ recv := make(chan int)
+ poster := func() {
+ for {
+ err := mux.Post(testEvent(0))
+ if err != nil {
+ return
+ }
+ }
+ }
+ sub := func(i int) {
+ time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
+ sub := mux.Subscribe(testEvent(0))
+ <-sub.Chan()
+ sub.Unsubscribe()
+ recv <- i
+ }
+
+ go poster()
+ go poster()
+ go poster()
+ nsubs := 1000
+ for i := 0; i < nsubs; i++ {
+ go sub(i)
+ }
+
+ // wait until everyone has been served
+ counts := make(map[int]int, nsubs)
+ for i := 0; i < nsubs; i++ {
+ counts[<-recv]++
+ }
+ for i, count := range counts {
+ if count != 1 {
+ t.Errorf("receiver %d called %d times, expected only 1 call", i, count)
+ }
+ }
+}
+
+func emptySubscriber(mux *TypeMux, types ...interface{}) {
+ s := mux.Subscribe(testEvent(0))
+ go func() {
+ for _ = range s.Chan() {
+ }
+ }()
+}
+
+func BenchmarkPost3(b *testing.B) {
+ var mux = new(TypeMux)
+ defer mux.Stop()
+ emptySubscriber(mux, testEvent(0))
+ emptySubscriber(mux, testEvent(0))
+ emptySubscriber(mux, testEvent(0))
+
+ for i := 0; i < b.N; i++ {
+ mux.Post(testEvent(0))
+ }
+}
+
+func BenchmarkPostConcurrent(b *testing.B) {
+ var mux = new(TypeMux)
+ defer mux.Stop()
+ emptySubscriber(mux, testEvent(0))
+ emptySubscriber(mux, testEvent(0))
+ emptySubscriber(mux, testEvent(0))
+
+ var wg sync.WaitGroup
+ poster := func() {
+ for i := 0; i < b.N; i++ {
+ mux.Post(testEvent(0))
+ }
+ wg.Done()
+ }
+ wg.Add(5)
+ for i := 0; i < 5; i++ {
+ go poster()
+ }
+ wg.Wait()
+}
+
+// for comparison
+func BenchmarkChanSend(b *testing.B) {
+ c := make(chan interface{})
+ closed := make(chan struct{})
+ go func() {
+ for _ = range c {
+ }
+ }()
+
+ for i := 0; i < b.N; i++ {
+ select {
+ case c <- i:
+ case <-closed:
+ }
+ }
+}
diff --git a/event/example_test.go b/event/example_test.go
new file mode 100644
index 000000000..2f47f6f27
--- /dev/null
+++ b/event/example_test.go
@@ -0,0 +1,42 @@
+package event
+
+import "fmt"
+
+func ExampleTypeMux() {
+ type someEvent struct{ I int }
+ type otherEvent struct{ S string }
+ type yetAnotherEvent struct{ X, Y int }
+
+ var mux TypeMux
+
+ // Start a subscriber.
+ done := make(chan struct{})
+ sub := mux.Subscribe(someEvent{}, otherEvent{})
+ go func() {
+ for event := range sub.Chan() {
+ fmt.Printf("Received: %#v\n", event)
+ }
+ fmt.Println("done")
+ close(done)
+ }()
+
+ // Post some events.
+ mux.Post(someEvent{5})
+ mux.Post(yetAnotherEvent{X: 3, Y: 4})
+ mux.Post(someEvent{6})
+ mux.Post(otherEvent{"whoa"})
+
+ // Stop closes all subscription channels.
+ // The subscriber goroutine will print "done"
+ // and exit.
+ mux.Stop()
+
+ // Wait for subscriber to return.
+ <-done
+
+ // Output:
+ // Received: event.someEvent{I:5}
+ // Received: event.someEvent{I:6}
+ // Received: event.otherEvent{S:"whoa"}
+ // done
+}
diff --git a/event/filter/eth_filter.go b/event/filter/eth_filter.go
new file mode 100644
index 000000000..dc032b5c2
--- /dev/null
+++ b/event/filter/eth_filter.go
@@ -0,0 +1,111 @@
+package filter
+
+// TODO make use of the generic filtering system
+
+import (
+ "sync"
+
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/core/state"
+)
+
+type FilterManager struct {
+ eventMux *event.TypeMux
+
+ filterMu sync.RWMutex
+ filterId int
+ filters map[int]*core.Filter
+
+ quit chan struct{}
+}
+
+func NewFilterManager(mux *event.TypeMux) *FilterManager {
+ return &FilterManager{
+ eventMux: mux,
+ filters: make(map[int]*core.Filter),
+ }
+}
+
+func (self *FilterManager) Start() {
+ go self.filterLoop()
+}
+
+func (self *FilterManager) Stop() {
+ close(self.quit)
+}
+
+func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) {
+ self.filterMu.Lock()
+ defer self.filterMu.Unlock()
+ id = self.filterId
+ self.filters[id] = filter
+ self.filterId++
+
+ return id
+}
+
+func (self *FilterManager) UninstallFilter(id int) {
+ self.filterMu.Lock()
+ defer self.filterMu.Unlock()
+ if _, ok := self.filters[id]; ok {
+ delete(self.filters, id)
+ }
+}
+
+// GetFilter retrieves a filter installed using InstallFilter.
+// The filter may not be modified.
+func (self *FilterManager) GetFilter(id int) *core.Filter {
+ self.filterMu.RLock()
+ defer self.filterMu.RUnlock()
+ return self.filters[id]
+}
+
+func (self *FilterManager) filterLoop() {
+ // Subscribe to events
+ events := self.eventMux.Subscribe(
+ //core.PendingBlockEvent{},
+ core.ChainEvent{},
+ core.TxPreEvent{},
+ state.Logs(nil))
+
+out:
+ for {
+ select {
+ case <-self.quit:
+ break out
+ case event := <-events.Chan():
+ switch event := event.(type) {
+ case core.ChainEvent:
+ self.filterMu.RLock()
+ for _, filter := range self.filters {
+ if filter.BlockCallback != nil {
+ filter.BlockCallback(event.Block, event.Logs)
+ }
+ }
+ self.filterMu.RUnlock()
+
+ case core.TxPreEvent:
+ self.filterMu.RLock()
+ for _, filter := range self.filters {
+ if filter.PendingCallback != nil {
+ filter.PendingCallback(event.Tx)
+ }
+ }
+ self.filterMu.RUnlock()
+
+ case state.Logs:
+ self.filterMu.RLock()
+ for _, filter := range self.filters {
+ if filter.LogsCallback != nil {
+ msgs := filter.FilterLogs(event)
+ if len(msgs) > 0 {
+ filter.LogsCallback(msgs)
+ }
+ }
+ }
+ self.filterMu.RUnlock()
+ }
+ }
+ }
+}
diff --git a/event/filter/filter.go b/event/filter/filter.go
new file mode 100644
index 000000000..ca767f413
--- /dev/null
+++ b/event/filter/filter.go
@@ -0,0 +1,78 @@
+package filter
+
+import "reflect"
+
+type Filter interface {
+ Compare(Filter) bool
+ Trigger(data interface{})
+}
+
+type FilterEvent struct {
+ filter Filter
+ data interface{}
+}
+
+type Filters struct {
+ id int
+ watchers map[int]Filter
+ ch chan FilterEvent
+
+ quit chan struct{}
+}
+
+func New() *Filters {
+ return &Filters{
+ ch: make(chan FilterEvent),
+ watchers: make(map[int]Filter),
+ quit: make(chan struct{}),
+ }
+}
+
+func (self *Filters) Start() {
+ go self.loop()
+}
+
+func (self *Filters) Stop() {
+ close(self.quit)
+}
+
+func (self *Filters) Notify(filter Filter, data interface{}) {
+ self.ch <- FilterEvent{filter, data}
+}
+
+func (self *Filters) Install(watcher Filter) int {
+ self.watchers[self.id] = watcher
+ self.id++
+
+ return self.id - 1
+}
+
+func (self *Filters) Uninstall(id int) {
+ delete(self.watchers, id)
+}
+
+func (self *Filters) loop() {
+out:
+ for {
+ select {
+ case <-self.quit:
+ break out
+ case event := <-self.ch:
+ for _, watcher := range self.watchers {
+ if reflect.TypeOf(watcher) == reflect.TypeOf(event.filter) {
+ if watcher.Compare(event.filter) {
+ watcher.Trigger(event.data)
+ }
+ }
+ }
+ }
+ }
+}
+
+func (self *Filters) Match(a, b Filter) bool {
+ return reflect.TypeOf(a) == reflect.TypeOf(b) && a.Compare(b)
+}
+
+func (self *Filters) Get(i int) Filter {
+ return self.watchers[i]
+}
diff --git a/event/filter/filter_test.go b/event/filter/filter_test.go
new file mode 100644
index 000000000..815deb63a
--- /dev/null
+++ b/event/filter/filter_test.go
@@ -0,0 +1,34 @@
+package filter
+
+import "testing"
+
+func TestFilters(t *testing.T) {
+ var success bool
+ var failure bool
+
+ fm := New()
+ fm.Start()
+ fm.Install(Generic{
+ Str1: "hello",
+ Fn: func(data interface{}) {
+ success = data.(bool)
+ },
+ })
+ fm.Install(Generic{
+ Str1: "hello1",
+ Str2: "hello",
+ Fn: func(data interface{}) {
+ failure = true
+ },
+ })
+ fm.Notify(Generic{Str1: "hello"}, true)
+ fm.Stop()
+
+ if !success {
+ t.Error("expected 'hello' to be posted")
+ }
+
+ if failure {
+ t.Error("hello1 was triggered")
+ }
+}
diff --git a/event/filter/generic_filter.go b/event/filter/generic_filter.go
new file mode 100644
index 000000000..2ce0f0642
--- /dev/null
+++ b/event/filter/generic_filter.go
@@ -0,0 +1,32 @@
+package filter
+
+type Generic struct {
+ Str1, Str2, Str3 string
+ Data map[string]struct{}
+
+ Fn func(data interface{})
+}
+
+// self = registered, f = incoming
+func (self Generic) Compare(f Filter) bool {
+ var strMatch, dataMatch = true, true
+
+ filter := f.(Generic)
+ if (len(self.Str1) > 0 && filter.Str1 != self.Str1) ||
+ (len(self.Str2) > 0 && filter.Str2 != self.Str2) ||
+ (len(self.Str3) > 0 && filter.Str3 != self.Str3) {
+ strMatch = false
+ }
+
+ for k, _ := range self.Data {
+ if _, ok := filter.Data[k]; !ok {
+ return false
+ }
+ }
+
+ return strMatch && dataMatch
+}
+
+func (self Generic) Trigger(data interface{}) {
+ self.Fn(data)
+}