aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter_system.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r--eth/filters/filter_system.go101
1 files changed, 74 insertions, 27 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 04e58a08c..b61a493b6 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -19,6 +19,7 @@
package filters
import (
+ "fmt"
"sync"
"time"
@@ -27,26 +28,47 @@ import (
"github.com/ethereum/go-ethereum/event"
)
+// FilterType determines the type of filter and is used to put the filter in to
+// the correct bucket when added.
+type FilterType byte
+
+const (
+ ChainFilter FilterType = iota // new block events filter
+ PendingTxFilter // pending transaction filter
+ LogFilter // new or removed log filter
+ PendingLogFilter // pending log filter
+)
+
// FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
filterMu sync.RWMutex
filterId int
- filters map[int]*Filter
- created map[int]time.Time
- sub event.Subscription
+
+ chainFilters map[int]*Filter
+ pendingTxFilters map[int]*Filter
+ logFilters map[int]*Filter
+ pendingLogFilters map[int]*Filter
+
+ // generic is an ugly hack for Get
+ generic map[int]*Filter
+
+ sub event.Subscription
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
- filters: make(map[int]*Filter),
- created: make(map[int]time.Time),
+ chainFilters: make(map[int]*Filter),
+ pendingTxFilters: make(map[int]*Filter),
+ logFilters: make(map[int]*Filter),
+ pendingLogFilters: make(map[int]*Filter),
+ generic: make(map[int]*Filter),
}
fs.sub = mux.Subscribe(
- //core.PendingBlockEvent{},
- core.RemovedLogEvent{},
+ core.PendingLogsEvent{},
+ core.RemovedLogsEvent{},
core.ChainEvent{},
core.TxPreEvent{},
vm.Logs(nil),
@@ -61,15 +83,30 @@ func (fs *FilterSystem) Stop() {
}
// Add adds a filter to the filter manager
-func (fs *FilterSystem) Add(filter *Filter) (id int) {
+func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- id = fs.filterId
- fs.filters[id] = filter
- fs.created[id] = time.Now()
+
+ id := fs.filterId
+ filter.created = time.Now()
+
+ switch filterType {
+ case ChainFilter:
+ fs.chainFilters[id] = filter
+ case PendingTxFilter:
+ fs.pendingTxFilters[id] = filter
+ case LogFilter:
+ fs.logFilters[id] = filter
+ case PendingLogFilter:
+ fs.pendingLogFilters[id] = filter
+ default:
+ return 0, fmt.Errorf("unknown filter type %v", filterType)
+ }
+ fs.generic[id] = filter
+
fs.filterId++
- return id
+ return id, nil
}
// Remove removes a filter by filter id
@@ -77,16 +114,18 @@ func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- delete(fs.filters, id)
- delete(fs.created, id)
+ delete(fs.chainFilters, id)
+ delete(fs.pendingTxFilters, id)
+ delete(fs.logFilters, id)
+ delete(fs.pendingLogFilters, id)
+ delete(fs.generic, id)
}
-// Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()
- return fs.filters[id]
+ return fs.generic[id]
}
// filterLoop waits for specific events from ethereum and fires their handlers
@@ -96,17 +135,16 @@ func (fs *FilterSystem) filterLoop() {
switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.BlockCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.chainFilters {
+ if filter.BlockCallback != nil && !filter.created.After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs)
}
}
fs.filterMu.RUnlock()
-
case core.TxPreEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.pendingTxFilters {
+ if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
filter.TransactionCallback(ev.Tx)
}
}
@@ -114,25 +152,34 @@ func (fs *FilterSystem) filterLoop() {
case vm.Logs:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, log := range filter.FilterLogs(ev) {
filter.LogCallback(log, false)
}
}
}
fs.filterMu.RUnlock()
-
- case core.RemovedLogEvent:
+ case core.RemovedLogsEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, removedLog := range ev.Logs {
filter.LogCallback(removedLog, true)
}
}
}
fs.filterMu.RUnlock()
+ case core.PendingLogsEvent:
+ fs.filterMu.RLock()
+ for _, filter := range fs.pendingLogFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
+ for _, pendingLog := range ev.Logs {
+ filter.LogCallback(pendingLog, false)
+ }
+ }
+ }
+ fs.filterMu.RUnlock()
}
}
}