aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
Diffstat (limited to 'eth')
-rw-r--r--eth/filters/api.go29
-rw-r--r--eth/filters/filter.go3
-rw-r--r--eth/filters/filter_system.go101
-rw-r--r--eth/filters/filter_system_test.go26
4 files changed, 119 insertions, 40 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 148daa649..6cd184b80 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -142,7 +142,11 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
s.blockMu.Lock()
filter := New(s.chainDb)
- id := s.filterManager.Add(filter)
+ id, err := s.filterManager.Add(filter, ChainFilter)
+ if err != nil {
+ return "", err
+ }
+
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
@@ -174,7 +178,11 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
defer s.transactionMu.Unlock()
filter := New(s.chainDb)
- id := s.filterManager.Add(filter)
+ id, err := s.filterManager.Add(filter, PendingTxFilter)
+ if err != nil {
+ return "", err
+ }
+
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) {
@@ -194,12 +202,16 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
}
// newLogFilter creates a new log filter.
-func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) int {
+func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
s.logMu.Lock()
defer s.logMu.Unlock()
filter := New(s.chainDb)
- id := s.filterManager.Add(filter)
+ id, err := s.filterManager.Add(filter, LogFilter)
+ if err != nil {
+ return 0, err
+ }
+
s.logQueue[id] = &logQueue{timeout: time.Now()}
filter.SetBeginBlock(earliest)
@@ -215,7 +227,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
}
}
- return id
+ return id, nil
}
// NewFilterArgs represents a request to create a new filter.
@@ -352,9 +364,12 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
var id int
if len(args.Addresses) > 0 {
- id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
} else {
- id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
+ }
+ if err != nil {
+ return "", err
}
s.filterMapMu.Lock()
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 2c92d20b1..96af93c4a 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,6 +18,7 @@ package filters
import (
"math"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -32,6 +33,8 @@ type AccountChange struct {
// Filtering interface
type Filter struct {
+ created time.Time
+
db ethdb.Database
begin, end int64
addresses []common.Address
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 04e58a08c..b61a493b6 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -19,6 +19,7 @@
package filters
import (
+ "fmt"
"sync"
"time"
@@ -27,26 +28,47 @@ import (
"github.com/ethereum/go-ethereum/event"
)
+// FilterType determines the type of filter and is used to put the filter in to
+// the correct bucket when added.
+type FilterType byte
+
+const (
+ ChainFilter FilterType = iota // new block events filter
+ PendingTxFilter // pending transaction filter
+ LogFilter // new or removed log filter
+ PendingLogFilter // pending log filter
+)
+
// FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
filterMu sync.RWMutex
filterId int
- filters map[int]*Filter
- created map[int]time.Time
- sub event.Subscription
+
+ chainFilters map[int]*Filter
+ pendingTxFilters map[int]*Filter
+ logFilters map[int]*Filter
+ pendingLogFilters map[int]*Filter
+
+ // generic is an ugly hack for Get
+ generic map[int]*Filter
+
+ sub event.Subscription
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
- filters: make(map[int]*Filter),
- created: make(map[int]time.Time),
+ chainFilters: make(map[int]*Filter),
+ pendingTxFilters: make(map[int]*Filter),
+ logFilters: make(map[int]*Filter),
+ pendingLogFilters: make(map[int]*Filter),
+ generic: make(map[int]*Filter),
}
fs.sub = mux.Subscribe(
- //core.PendingBlockEvent{},
- core.RemovedLogEvent{},
+ core.PendingLogsEvent{},
+ core.RemovedLogsEvent{},
core.ChainEvent{},
core.TxPreEvent{},
vm.Logs(nil),
@@ -61,15 +83,30 @@ func (fs *FilterSystem) Stop() {
}
// Add adds a filter to the filter manager
-func (fs *FilterSystem) Add(filter *Filter) (id int) {
+func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- id = fs.filterId
- fs.filters[id] = filter
- fs.created[id] = time.Now()
+
+ id := fs.filterId
+ filter.created = time.Now()
+
+ switch filterType {
+ case ChainFilter:
+ fs.chainFilters[id] = filter
+ case PendingTxFilter:
+ fs.pendingTxFilters[id] = filter
+ case LogFilter:
+ fs.logFilters[id] = filter
+ case PendingLogFilter:
+ fs.pendingLogFilters[id] = filter
+ default:
+ return 0, fmt.Errorf("unknown filter type %v", filterType)
+ }
+ fs.generic[id] = filter
+
fs.filterId++
- return id
+ return id, nil
}
// Remove removes a filter by filter id
@@ -77,16 +114,18 @@ func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- delete(fs.filters, id)
- delete(fs.created, id)
+ delete(fs.chainFilters, id)
+ delete(fs.pendingTxFilters, id)
+ delete(fs.logFilters, id)
+ delete(fs.pendingLogFilters, id)
+ delete(fs.generic, id)
}
-// Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()
- return fs.filters[id]
+ return fs.generic[id]
}
// filterLoop waits for specific events from ethereum and fires their handlers
@@ -96,17 +135,16 @@ func (fs *FilterSystem) filterLoop() {
switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.BlockCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.chainFilters {
+ if filter.BlockCallback != nil && !filter.created.After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs)
}
}
fs.filterMu.RUnlock()
-
case core.TxPreEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.pendingTxFilters {
+ if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
filter.TransactionCallback(ev.Tx)
}
}
@@ -114,25 +152,34 @@ func (fs *FilterSystem) filterLoop() {
case vm.Logs:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, log := range filter.FilterLogs(ev) {
filter.LogCallback(log, false)
}
}
}
fs.filterMu.RUnlock()
-
- case core.RemovedLogEvent:
+ case core.RemovedLogsEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, removedLog := range ev.Logs {
filter.LogCallback(removedLog, true)
}
}
}
fs.filterMu.RUnlock()
+ case core.PendingLogsEvent:
+ fs.filterMu.RLock()
+ for _, filter := range fs.pendingLogFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
+ for _, pendingLog := range ev.Logs {
+ filter.LogCallback(pendingLog, false)
+ }
+ }
+ }
+ fs.filterMu.RUnlock()
}
}
}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 7ddeb02bc..3ad7dd9cb 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -18,6 +18,7 @@ func TestCallbacks(t *testing.T) {
txDone = make(chan struct{})
logDone = make(chan struct{})
removedLogDone = make(chan struct{})
+ pendingLogDone = make(chan struct{})
)
blockFilter := &Filter{
@@ -37,7 +38,6 @@ func TestCallbacks(t *testing.T) {
}
},
}
-
removedLogFilter := &Filter{
LogCallback: func(l *vm.Log, oob bool) {
if oob {
@@ -45,16 +45,23 @@ func TestCallbacks(t *testing.T) {
}
},
}
+ pendingLogFilter := &Filter{
+ LogCallback: func(*vm.Log, bool) {
+ close(pendingLogDone)
+ },
+ }
- fs.Add(blockFilter)
- fs.Add(txFilter)
- fs.Add(logFilter)
- fs.Add(removedLogFilter)
+ fs.Add(blockFilter, ChainFilter)
+ fs.Add(txFilter, PendingTxFilter)
+ fs.Add(logFilter, LogFilter)
+ fs.Add(removedLogFilter, LogFilter)
+ fs.Add(pendingLogFilter, PendingLogFilter)
mux.Post(core.ChainEvent{})
mux.Post(core.TxPreEvent{})
- mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}})
mux.Post(vm.Logs{&vm.Log{}})
+ mux.Post(core.RemovedLogsEvent{vm.Logs{&vm.Log{}}})
+ mux.Post(core.PendingLogsEvent{vm.Logs{&vm.Log{}}})
const dura = 5 * time.Second
failTimer := time.NewTimer(dura)
@@ -84,4 +91,11 @@ func TestCallbacks(t *testing.T) {
case <-failTimer.C:
t.Error("removed log filter failed to trigger (timeout)")
}
+
+ failTimer.Reset(dura)
+ select {
+ case <-pendingLogDone:
+ case <-failTimer.C:
+ t.Error("pending log filter failed to trigger (timout)")
+ }
}