diff options
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r-- | eth/filters/filter_system.go | 101 |
1 files changed, 74 insertions, 27 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 04e58a08c..b61a493b6 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -19,6 +19,7 @@ package filters import ( + "fmt" "sync" "time" @@ -27,26 +28,47 @@ import ( "github.com/ethereum/go-ethereum/event" ) +// FilterType determines the type of filter and is used to put the filter in to +// the correct bucket when added. +type FilterType byte + +const ( + ChainFilter FilterType = iota // new block events filter + PendingTxFilter // pending transaction filter + LogFilter // new or removed log filter + PendingLogFilter // pending log filter +) + // FilterSystem manages filters that filter specific events such as // block, transaction and log events. The Filtering system can be used to listen // for specific LOG events fired by the EVM (Ethereum Virtual Machine). type FilterSystem struct { filterMu sync.RWMutex filterId int - filters map[int]*Filter - created map[int]time.Time - sub event.Subscription + + chainFilters map[int]*Filter + pendingTxFilters map[int]*Filter + logFilters map[int]*Filter + pendingLogFilters map[int]*Filter + + // generic is an ugly hack for Get + generic map[int]*Filter + + sub event.Subscription } // NewFilterSystem returns a newly allocated filter manager func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ - filters: make(map[int]*Filter), - created: make(map[int]time.Time), + chainFilters: make(map[int]*Filter), + pendingTxFilters: make(map[int]*Filter), + logFilters: make(map[int]*Filter), + pendingLogFilters: make(map[int]*Filter), + generic: make(map[int]*Filter), } fs.sub = mux.Subscribe( - //core.PendingBlockEvent{}, - core.RemovedLogEvent{}, + core.PendingLogsEvent{}, + core.RemovedLogsEvent{}, core.ChainEvent{}, core.TxPreEvent{}, vm.Logs(nil), @@ -61,15 +83,30 @@ func (fs *FilterSystem) Stop() { } // Add adds a filter to the filter manager -func (fs *FilterSystem) Add(filter *Filter) (id int) { +func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - id = fs.filterId - fs.filters[id] = filter - fs.created[id] = time.Now() + + id := fs.filterId + filter.created = time.Now() + + switch filterType { + case ChainFilter: + fs.chainFilters[id] = filter + case PendingTxFilter: + fs.pendingTxFilters[id] = filter + case LogFilter: + fs.logFilters[id] = filter + case PendingLogFilter: + fs.pendingLogFilters[id] = filter + default: + return 0, fmt.Errorf("unknown filter type %v", filterType) + } + fs.generic[id] = filter + fs.filterId++ - return id + return id, nil } // Remove removes a filter by filter id @@ -77,16 +114,18 @@ func (fs *FilterSystem) Remove(id int) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - delete(fs.filters, id) - delete(fs.created, id) + delete(fs.chainFilters, id) + delete(fs.pendingTxFilters, id) + delete(fs.logFilters, id) + delete(fs.pendingLogFilters, id) + delete(fs.generic, id) } -// Get retrieves a filter installed using Add The filter may not be modified. func (fs *FilterSystem) Get(id int) *Filter { fs.filterMu.RLock() defer fs.filterMu.RUnlock() - return fs.filters[id] + return fs.generic[id] } // filterLoop waits for specific events from ethereum and fires their handlers @@ -96,17 +135,16 @@ func (fs *FilterSystem) filterLoop() { switch ev := event.Data.(type) { case core.ChainEvent: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.BlockCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.chainFilters { + if filter.BlockCallback != nil && !filter.created.After(event.Time) { filter.BlockCallback(ev.Block, ev.Logs) } } fs.filterMu.RUnlock() - case core.TxPreEvent: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.pendingTxFilters { + if filter.TransactionCallback != nil && !filter.created.After(event.Time) { filter.TransactionCallback(ev.Tx) } } @@ -114,25 +152,34 @@ func (fs *FilterSystem) filterLoop() { case vm.Logs: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.logFilters { + if filter.LogCallback != nil && !filter.created.After(event.Time) { for _, log := range filter.FilterLogs(ev) { filter.LogCallback(log, false) } } } fs.filterMu.RUnlock() - - case core.RemovedLogEvent: + case core.RemovedLogsEvent: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.logFilters { + if filter.LogCallback != nil && !filter.created.After(event.Time) { for _, removedLog := range ev.Logs { filter.LogCallback(removedLog, true) } } } fs.filterMu.RUnlock() + case core.PendingLogsEvent: + fs.filterMu.RLock() + for _, filter := range fs.pendingLogFilters { + if filter.LogCallback != nil && !filter.created.After(event.Time) { + for _, pendingLog := range ev.Logs { + filter.LogCallback(pendingLog, false) + } + } + } + fs.filterMu.RUnlock() } } } |