diff options
Diffstat (limited to 'event')
-rw-r--r-- | event/filter/filter.go | 78 | ||||
-rw-r--r-- | event/filter/filter_test.go | 34 | ||||
-rw-r--r-- | event/filter/generic_filter.go | 32 | ||||
-rw-r--r-- | event/filter/old_filter.go | 94 |
4 files changed, 238 insertions, 0 deletions
diff --git a/event/filter/filter.go b/event/filter/filter.go new file mode 100644 index 000000000..ca767f413 --- /dev/null +++ b/event/filter/filter.go @@ -0,0 +1,78 @@ +package filter + +import "reflect" + +type Filter interface { + Compare(Filter) bool + Trigger(data interface{}) +} + +type FilterEvent struct { + filter Filter + data interface{} +} + +type Filters struct { + id int + watchers map[int]Filter + ch chan FilterEvent + + quit chan struct{} +} + +func New() *Filters { + return &Filters{ + ch: make(chan FilterEvent), + watchers: make(map[int]Filter), + quit: make(chan struct{}), + } +} + +func (self *Filters) Start() { + go self.loop() +} + +func (self *Filters) Stop() { + close(self.quit) +} + +func (self *Filters) Notify(filter Filter, data interface{}) { + self.ch <- FilterEvent{filter, data} +} + +func (self *Filters) Install(watcher Filter) int { + self.watchers[self.id] = watcher + self.id++ + + return self.id - 1 +} + +func (self *Filters) Uninstall(id int) { + delete(self.watchers, id) +} + +func (self *Filters) loop() { +out: + for { + select { + case <-self.quit: + break out + case event := <-self.ch: + for _, watcher := range self.watchers { + if reflect.TypeOf(watcher) == reflect.TypeOf(event.filter) { + if watcher.Compare(event.filter) { + watcher.Trigger(event.data) + } + } + } + } + } +} + +func (self *Filters) Match(a, b Filter) bool { + return reflect.TypeOf(a) == reflect.TypeOf(b) && a.Compare(b) +} + +func (self *Filters) Get(i int) Filter { + return self.watchers[i] +} diff --git a/event/filter/filter_test.go b/event/filter/filter_test.go new file mode 100644 index 000000000..815deb63a --- /dev/null +++ b/event/filter/filter_test.go @@ -0,0 +1,34 @@ +package filter + +import "testing" + +func TestFilters(t *testing.T) { + var success bool + var failure bool + + fm := New() + fm.Start() + fm.Install(Generic{ + Str1: "hello", + Fn: func(data interface{}) { + success = data.(bool) + }, + }) + fm.Install(Generic{ + Str1: "hello1", + Str2: "hello", + Fn: func(data interface{}) { + failure = true + }, + }) + fm.Notify(Generic{Str1: "hello"}, true) + fm.Stop() + + if !success { + t.Error("expected 'hello' to be posted") + } + + if failure { + t.Error("hello1 was triggered") + } +} diff --git a/event/filter/generic_filter.go b/event/filter/generic_filter.go new file mode 100644 index 000000000..2ce0f0642 --- /dev/null +++ b/event/filter/generic_filter.go @@ -0,0 +1,32 @@ +package filter + +type Generic struct { + Str1, Str2, Str3 string + Data map[string]struct{} + + Fn func(data interface{}) +} + +// self = registered, f = incoming +func (self Generic) Compare(f Filter) bool { + var strMatch, dataMatch = true, true + + filter := f.(Generic) + if (len(self.Str1) > 0 && filter.Str1 != self.Str1) || + (len(self.Str2) > 0 && filter.Str2 != self.Str2) || + (len(self.Str3) > 0 && filter.Str3 != self.Str3) { + strMatch = false + } + + for k, _ := range self.Data { + if _, ok := filter.Data[k]; !ok { + return false + } + } + + return strMatch && dataMatch +} + +func (self Generic) Trigger(data interface{}) { + self.Fn(data) +} diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go new file mode 100644 index 000000000..c30a7e584 --- /dev/null +++ b/event/filter/old_filter.go @@ -0,0 +1,94 @@ +// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring +package filter + +import ( + "sync" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/state" +) + +type FilterManager struct { + eventMux *event.TypeMux + + filterMu sync.RWMutex + filterId int + filters map[int]*core.Filter + + quit chan struct{} +} + +func NewFilterManager(mux *event.TypeMux) *FilterManager { + return &FilterManager{ + eventMux: mux, + filters: make(map[int]*core.Filter), + } +} + +func (self *FilterManager) Start() { + go self.filterLoop() +} + +func (self *FilterManager) Stop() { + close(self.quit) +} + +func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { + self.filterMu.Lock() + id = self.filterId + self.filters[id] = filter + self.filterId++ + self.filterMu.Unlock() + return id +} + +func (self *FilterManager) UninstallFilter(id int) { + self.filterMu.Lock() + delete(self.filters, id) + self.filterMu.Unlock() +} + +// GetFilter retrieves a filter installed using InstallFilter. +// The filter may not be modified. +func (self *FilterManager) GetFilter(id int) *core.Filter { + self.filterMu.RLock() + defer self.filterMu.RUnlock() + return self.filters[id] +} + +func (self *FilterManager) filterLoop() { + // Subscribe to events + events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil)) + +out: + for { + select { + case <-self.quit: + break out + case event := <-events.Chan(): + switch event := event.(type) { + case core.NewBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block) + } + } + self.filterMu.RUnlock() + + case state.Logs: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.LogsCallback != nil { + msgs := filter.FilterLogs(event) + if len(msgs) > 0 { + filter.LogsCallback(msgs) + } + } + } + self.filterMu.RUnlock() + } + } + } +} |