aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-10-30 00:42:55 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-10-30 00:42:55 +0800
commitfc46cf337af614f4f9c96acd222089652fe7c76e (patch)
tree672ce52e11b768801f0b33f224424ba5f0fdc465 /eth
parentfd27f074feecec2f1e4c8041ff04ddac8d0ab6a3 (diff)
parentfbdb44dcc17240a01b45e55d3aa4e4b8db0868cd (diff)
downloadgo-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar
go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.gz
go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.bz2
go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.lz
go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.xz
go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.zst
go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.zip
Merge pull request #1946 from fjl/xeth-oom
Fix for xeth OOM issue
Diffstat (limited to 'eth')
-rw-r--r--eth/filters/filter_system.go88
1 files changed, 35 insertions, 53 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index ae6093525..df3ce90c6 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -31,30 +31,32 @@ import (
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
- eventMux *event.TypeMux
-
filterMu sync.RWMutex
filterId int
filters map[int]*Filter
created map[int]time.Time
-
- quit chan struct{}
+ sub event.Subscription
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
- eventMux: mux,
- filters: make(map[int]*Filter),
- created: make(map[int]time.Time),
+ filters: make(map[int]*Filter),
+ created: make(map[int]time.Time),
}
+ fs.sub = mux.Subscribe(
+ //core.PendingBlockEvent{},
+ core.ChainEvent{},
+ core.TxPreEvent{},
+ vm.Logs(nil),
+ )
go fs.filterLoop()
return fs
}
// Stop quits the filter loop required for polling events
func (fs *FilterSystem) Stop() {
- close(fs.quit)
+ fs.sub.Unsubscribe()
}
// Add adds a filter to the filter manager
@@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter {
// filterLoop waits for specific events from ethereum and fires their handlers
// when the filter matches the requirements.
func (fs *FilterSystem) filterLoop() {
- // Subscribe to events
- eventCh := fs.eventMux.Subscribe(
- //core.PendingBlockEvent{},
- core.ChainEvent{},
- core.TxPreEvent{},
- vm.Logs(nil),
- ).Chan()
-
-out:
- for {
- select {
- case <-fs.quit:
- break out
- case event, ok := <-eventCh:
- if !ok {
- // Event subscription closed, set the channel to nil to stop spinning
- eventCh = nil
- continue
- }
- // A real event arrived, notify the registered filters
- switch ev := event.Data.(type) {
- case core.ChainEvent:
- fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
- filter.BlockCallback(ev.Block, ev.Logs)
- }
+ for event := range fs.sub.Chan() {
+ switch ev := event.Data.(type) {
+ case core.ChainEvent:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
+ filter.BlockCallback(ev.Block, ev.Logs)
}
- fs.filterMu.RUnlock()
+ }
+ fs.filterMu.RUnlock()
- case core.TxPreEvent:
- fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
- filter.TransactionCallback(ev.Tx)
- }
+ case core.TxPreEvent:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
+ filter.TransactionCallback(ev.Tx)
}
- fs.filterMu.RUnlock()
-
- case vm.Logs:
- fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
- msgs := filter.FilterLogs(ev)
- if len(msgs) > 0 {
- filter.LogsCallback(msgs)
- }
+ }
+ fs.filterMu.RUnlock()
+
+ case vm.Logs:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
+ msgs := filter.FilterLogs(ev)
+ if len(msgs) > 0 {
+ filter.LogsCallback(msgs)
}
}
- fs.filterMu.RUnlock()
}
+ fs.filterMu.RUnlock()
}
}
}