aboutsummaryrefslogtreecommitdiffstats
path: root/event
diff options
context:
space:
mode:
Diffstat (limited to 'event')
-rw-r--r--event/filter/filter.go78
-rw-r--r--event/filter/filter_test.go34
-rw-r--r--event/filter/generic_filter.go32
-rw-r--r--event/filter/old_filter.go94
4 files changed, 238 insertions, 0 deletions
diff --git a/event/filter/filter.go b/event/filter/filter.go
new file mode 100644
index 000000000..ca767f413
--- /dev/null
+++ b/event/filter/filter.go
@@ -0,0 +1,78 @@
+package filter
+
+import "reflect"
+
+type Filter interface {
+ Compare(Filter) bool
+ Trigger(data interface{})
+}
+
+type FilterEvent struct {
+ filter Filter
+ data interface{}
+}
+
+type Filters struct {
+ id int
+ watchers map[int]Filter
+ ch chan FilterEvent
+
+ quit chan struct{}
+}
+
+func New() *Filters {
+ return &Filters{
+ ch: make(chan FilterEvent),
+ watchers: make(map[int]Filter),
+ quit: make(chan struct{}),
+ }
+}
+
+func (self *Filters) Start() {
+ go self.loop()
+}
+
+func (self *Filters) Stop() {
+ close(self.quit)
+}
+
+func (self *Filters) Notify(filter Filter, data interface{}) {
+ self.ch <- FilterEvent{filter, data}
+}
+
+func (self *Filters) Install(watcher Filter) int {
+ self.watchers[self.id] = watcher
+ self.id++
+
+ return self.id - 1
+}
+
+func (self *Filters) Uninstall(id int) {
+ delete(self.watchers, id)
+}
+
+func (self *Filters) loop() {
+out:
+ for {
+ select {
+ case <-self.quit:
+ break out
+ case event := <-self.ch:
+ for _, watcher := range self.watchers {
+ if reflect.TypeOf(watcher) == reflect.TypeOf(event.filter) {
+ if watcher.Compare(event.filter) {
+ watcher.Trigger(event.data)
+ }
+ }
+ }
+ }
+ }
+}
+
+func (self *Filters) Match(a, b Filter) bool {
+ return reflect.TypeOf(a) == reflect.TypeOf(b) && a.Compare(b)
+}
+
+func (self *Filters) Get(i int) Filter {
+ return self.watchers[i]
+}
diff --git a/event/filter/filter_test.go b/event/filter/filter_test.go
new file mode 100644
index 000000000..815deb63a
--- /dev/null
+++ b/event/filter/filter_test.go
@@ -0,0 +1,34 @@
+package filter
+
+import "testing"
+
+func TestFilters(t *testing.T) {
+ var success bool
+ var failure bool
+
+ fm := New()
+ fm.Start()
+ fm.Install(Generic{
+ Str1: "hello",
+ Fn: func(data interface{}) {
+ success = data.(bool)
+ },
+ })
+ fm.Install(Generic{
+ Str1: "hello1",
+ Str2: "hello",
+ Fn: func(data interface{}) {
+ failure = true
+ },
+ })
+ fm.Notify(Generic{Str1: "hello"}, true)
+ fm.Stop()
+
+ if !success {
+ t.Error("expected 'hello' to be posted")
+ }
+
+ if failure {
+ t.Error("hello1 was triggered")
+ }
+}
diff --git a/event/filter/generic_filter.go b/event/filter/generic_filter.go
new file mode 100644
index 000000000..2ce0f0642
--- /dev/null
+++ b/event/filter/generic_filter.go
@@ -0,0 +1,32 @@
+package filter
+
+type Generic struct {
+ Str1, Str2, Str3 string
+ Data map[string]struct{}
+
+ Fn func(data interface{})
+}
+
+// self = registered, f = incoming
+func (self Generic) Compare(f Filter) bool {
+ var strMatch, dataMatch = true, true
+
+ filter := f.(Generic)
+ if (len(self.Str1) > 0 && filter.Str1 != self.Str1) ||
+ (len(self.Str2) > 0 && filter.Str2 != self.Str2) ||
+ (len(self.Str3) > 0 && filter.Str3 != self.Str3) {
+ strMatch = false
+ }
+
+ for k, _ := range self.Data {
+ if _, ok := filter.Data[k]; !ok {
+ return false
+ }
+ }
+
+ return strMatch && dataMatch
+}
+
+func (self Generic) Trigger(data interface{}) {
+ self.Fn(data)
+}
diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go
new file mode 100644
index 000000000..c30a7e584
--- /dev/null
+++ b/event/filter/old_filter.go
@@ -0,0 +1,94 @@
+// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring
+package filter
+
+import (
+ "sync"
+
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/state"
+)
+
+type FilterManager struct {
+ eventMux *event.TypeMux
+
+ filterMu sync.RWMutex
+ filterId int
+ filters map[int]*core.Filter
+
+ quit chan struct{}
+}
+
+func NewFilterManager(mux *event.TypeMux) *FilterManager {
+ return &FilterManager{
+ eventMux: mux,
+ filters: make(map[int]*core.Filter),
+ }
+}
+
+func (self *FilterManager) Start() {
+ go self.filterLoop()
+}
+
+func (self *FilterManager) Stop() {
+ close(self.quit)
+}
+
+func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) {
+ self.filterMu.Lock()
+ id = self.filterId
+ self.filters[id] = filter
+ self.filterId++
+ self.filterMu.Unlock()
+ return id
+}
+
+func (self *FilterManager) UninstallFilter(id int) {
+ self.filterMu.Lock()
+ delete(self.filters, id)
+ self.filterMu.Unlock()
+}
+
+// GetFilter retrieves a filter installed using InstallFilter.
+// The filter may not be modified.
+func (self *FilterManager) GetFilter(id int) *core.Filter {
+ self.filterMu.RLock()
+ defer self.filterMu.RUnlock()
+ return self.filters[id]
+}
+
+func (self *FilterManager) filterLoop() {
+ // Subscribe to events
+ events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
+
+out:
+ for {
+ select {
+ case <-self.quit:
+ break out
+ case event := <-events.Chan():
+ switch event := event.(type) {
+ case core.NewBlockEvent:
+ self.filterMu.RLock()
+ for _, filter := range self.filters {
+ if filter.BlockCallback != nil {
+ filter.BlockCallback(event.Block)
+ }
+ }
+ self.filterMu.RUnlock()
+
+ case state.Logs:
+ self.filterMu.RLock()
+ for _, filter := range self.filters {
+ if filter.LogsCallback != nil {
+ msgs := filter.FilterLogs(event)
+ if len(msgs) > 0 {
+ filter.LogsCallback(msgs)
+ }
+ }
+ }
+ self.filterMu.RUnlock()
+ }
+ }
+ }
+}