From 44eafb15e0581ef37c3e3cfeccb703381acc2ae2 Mon Sep 17 00:00:00 2001 From: obscuren Date: Sat, 7 Feb 2015 17:03:12 +0100 Subject: Renamed filter --- event/filter/eth_filter.go | 104 +++++++++++++++++++++++++++++++++++++++++++++ event/filter/old_filter.go | 103 -------------------------------------------- 2 files changed, 104 insertions(+), 103 deletions(-) create mode 100644 event/filter/eth_filter.go delete mode 100644 event/filter/old_filter.go diff --git a/event/filter/eth_filter.go b/event/filter/eth_filter.go new file mode 100644 index 000000000..295fcfbbf --- /dev/null +++ b/event/filter/eth_filter.go @@ -0,0 +1,104 @@ +package filter + +// TODO make use of the generic filtering system + +import ( + "sync" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/state" +) + +type FilterManager struct { + eventMux *event.TypeMux + + filterMu sync.RWMutex + filterId int + filters map[int]*core.Filter + + quit chan struct{} +} + +func NewFilterManager(mux *event.TypeMux) *FilterManager { + return &FilterManager{ + eventMux: mux, + filters: make(map[int]*core.Filter), + } +} + +func (self *FilterManager) Start() { + go self.filterLoop() +} + +func (self *FilterManager) Stop() { + close(self.quit) +} + +func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { + self.filterMu.Lock() + id = self.filterId + self.filters[id] = filter + self.filterId++ + self.filterMu.Unlock() + return id +} + +func (self *FilterManager) UninstallFilter(id int) { + self.filterMu.Lock() + delete(self.filters, id) + self.filterMu.Unlock() +} + +// GetFilter retrieves a filter installed using InstallFilter. +// The filter may not be modified. +func (self *FilterManager) GetFilter(id int) *core.Filter { + self.filterMu.RLock() + defer self.filterMu.RUnlock() + return self.filters[id] +} + +func (self *FilterManager) filterLoop() { + // Subscribe to events + events := self.eventMux.Subscribe(core.PendingBlockEvent{}, core.NewBlockEvent{}, state.Logs(nil)) + +out: + for { + select { + case <-self.quit: + break out + case event := <-events.Chan(): + switch event := event.(type) { + case core.NewBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block) + } + } + self.filterMu.RUnlock() + + case core.PendingBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.PendingCallback != nil { + filter.PendingCallback(event.Block) + } + } + self.filterMu.RUnlock() + + case state.Logs: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.LogsCallback != nil { + msgs := filter.FilterLogs(event) + if len(msgs) > 0 { + filter.LogsCallback(msgs) + } + } + } + self.filterMu.RUnlock() + } + } + } +} diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go deleted file mode 100644 index ab0127ffb..000000000 --- a/event/filter/old_filter.go +++ /dev/null @@ -1,103 +0,0 @@ -// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring -package filter - -import ( - "sync" - - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/state" -) - -type FilterManager struct { - eventMux *event.TypeMux - - filterMu sync.RWMutex - filterId int - filters map[int]*core.Filter - - quit chan struct{} -} - -func NewFilterManager(mux *event.TypeMux) *FilterManager { - return &FilterManager{ - eventMux: mux, - filters: make(map[int]*core.Filter), - } -} - -func (self *FilterManager) Start() { - go self.filterLoop() -} - -func (self *FilterManager) Stop() { - close(self.quit) -} - -func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { - self.filterMu.Lock() - id = self.filterId - self.filters[id] = filter - self.filterId++ - self.filterMu.Unlock() - return id -} - -func (self *FilterManager) UninstallFilter(id int) { - self.filterMu.Lock() - delete(self.filters, id) - self.filterMu.Unlock() -} - -// GetFilter retrieves a filter installed using InstallFilter. -// The filter may not be modified. -func (self *FilterManager) GetFilter(id int) *core.Filter { - self.filterMu.RLock() - defer self.filterMu.RUnlock() - return self.filters[id] -} - -func (self *FilterManager) filterLoop() { - // Subscribe to events - events := self.eventMux.Subscribe(core.PendingBlockEvent{}, core.NewBlockEvent{}, state.Logs(nil)) - -out: - for { - select { - case <-self.quit: - break out - case event := <-events.Chan(): - switch event := event.(type) { - case core.NewBlockEvent: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(event.Block) - } - } - self.filterMu.RUnlock() - - case core.PendingBlockEvent: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.PendingCallback != nil { - filter.PendingCallback(event.Block) - } - } - self.filterMu.RUnlock() - - case state.Logs: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.LogsCallback != nil { - msgs := filter.FilterLogs(event) - if len(msgs) > 0 { - filter.LogsCallback(msgs) - } - } - } - self.filterMu.RUnlock() - } - } - } -} -- cgit v1.2.3