aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter_system.go
diff options
context:
space:
mode:
authorJeffrey Wilcke <geffobscura@gmail.com>2016-02-13 08:40:44 +0800
committerJeffrey Wilcke <geffobscura@gmail.com>2016-02-13 20:14:02 +0800
commit987c1a595a6a318ac83b68e3b87812497070bf9b (patch)
tree30859d085d7533c7ad610eed8a186f88724eadfe /eth/filters/filter_system.go
parentb05e472c076d30035233d6a8b5fb3360b236e3ff (diff)
downloadgo-tangerine-987c1a595a6a318ac83b68e3b87812497070bf9b.tar
go-tangerine-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.gz
go-tangerine-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.bz2
go-tangerine-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.lz
go-tangerine-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.xz
go-tangerine-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.zst
go-tangerine-987c1a595a6a318ac83b68e3b87812497070bf9b.zip
eth/filters: ✨ pending logs ✨
Pending logs are now filterable through the Go API. Filter API changed such that each filter type has it's own bucket and adding filter explicitly requires you specify the bucket to put it in.
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r--eth/filters/filter_system.go101
1 files changed, 74 insertions, 27 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 04e58a08c..b61a493b6 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -19,6 +19,7 @@
package filters
import (
+ "fmt"
"sync"
"time"
@@ -27,26 +28,47 @@ import (
"github.com/ethereum/go-ethereum/event"
)
+// FilterType determines the type of filter and is used to put the filter in to
+// the correct bucket when added.
+type FilterType byte
+
+const (
+ ChainFilter FilterType = iota // new block events filter
+ PendingTxFilter // pending transaction filter
+ LogFilter // new or removed log filter
+ PendingLogFilter // pending log filter
+)
+
// FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
filterMu sync.RWMutex
filterId int
- filters map[int]*Filter
- created map[int]time.Time
- sub event.Subscription
+
+ chainFilters map[int]*Filter
+ pendingTxFilters map[int]*Filter
+ logFilters map[int]*Filter
+ pendingLogFilters map[int]*Filter
+
+ // generic is an ugly hack for Get
+ generic map[int]*Filter
+
+ sub event.Subscription
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
- filters: make(map[int]*Filter),
- created: make(map[int]time.Time),
+ chainFilters: make(map[int]*Filter),
+ pendingTxFilters: make(map[int]*Filter),
+ logFilters: make(map[int]*Filter),
+ pendingLogFilters: make(map[int]*Filter),
+ generic: make(map[int]*Filter),
}
fs.sub = mux.Subscribe(
- //core.PendingBlockEvent{},
- core.RemovedLogEvent{},
+ core.PendingLogsEvent{},
+ core.RemovedLogsEvent{},
core.ChainEvent{},
core.TxPreEvent{},
vm.Logs(nil),
@@ -61,15 +83,30 @@ func (fs *FilterSystem) Stop() {
}
// Add adds a filter to the filter manager
-func (fs *FilterSystem) Add(filter *Filter) (id int) {
+func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- id = fs.filterId
- fs.filters[id] = filter
- fs.created[id] = time.Now()
+
+ id := fs.filterId
+ filter.created = time.Now()
+
+ switch filterType {
+ case ChainFilter:
+ fs.chainFilters[id] = filter
+ case PendingTxFilter:
+ fs.pendingTxFilters[id] = filter
+ case LogFilter:
+ fs.logFilters[id] = filter
+ case PendingLogFilter:
+ fs.pendingLogFilters[id] = filter
+ default:
+ return 0, fmt.Errorf("unknown filter type %v", filterType)
+ }
+ fs.generic[id] = filter
+
fs.filterId++
- return id
+ return id, nil
}
// Remove removes a filter by filter id
@@ -77,16 +114,18 @@ func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- delete(fs.filters, id)
- delete(fs.created, id)
+ delete(fs.chainFilters, id)
+ delete(fs.pendingTxFilters, id)
+ delete(fs.logFilters, id)
+ delete(fs.pendingLogFilters, id)
+ delete(fs.generic, id)
}
-// Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()
- return fs.filters[id]
+ return fs.generic[id]
}
// filterLoop waits for specific events from ethereum and fires their handlers
@@ -96,17 +135,16 @@ func (fs *FilterSystem) filterLoop() {
switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.BlockCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.chainFilters {
+ if filter.BlockCallback != nil && !filter.created.After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs)
}
}
fs.filterMu.RUnlock()
-
case core.TxPreEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.pendingTxFilters {
+ if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
filter.TransactionCallback(ev.Tx)
}
}
@@ -114,25 +152,34 @@ func (fs *FilterSystem) filterLoop() {
case vm.Logs:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, log := range filter.FilterLogs(ev) {
filter.LogCallback(log, false)
}
}
}
fs.filterMu.RUnlock()
-
- case core.RemovedLogEvent:
+ case core.RemovedLogsEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, removedLog := range ev.Logs {
filter.LogCallback(removedLog, true)
}
}
}
fs.filterMu.RUnlock()
+ case core.PendingLogsEvent:
+ fs.filterMu.RLock()
+ for _, filter := range fs.pendingLogFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
+ for _, pendingLog := range ev.Logs {
+ filter.LogCallback(pendingLog, false)
+ }
+ }
+ }
+ fs.filterMu.RUnlock()
}
}
}