aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-10-12 20:04:38 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-10-12 21:22:03 +0800
commit402fd6e8c6a2e379351e0aae10a833fae6bcae6c (patch)
tree30ab93e93af7c70e5df213eb3665f51a293bc4a9 /eth/filters
parent315a422ba754eae10db21990a809f608f7af62d4 (diff)
downloadgo-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar
go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.gz
go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.bz2
go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.lz
go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.xz
go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.zst
go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.zip
core, eth, event, miner, xeth: fix event post / subscription race
Diffstat (limited to 'eth/filters')
-rw-r--r--eth/filters/filter_system.go44
1 files changed, 28 insertions, 16 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 4972dcd59..ae6093525 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -20,6 +20,7 @@ package filters
import (
"sync"
+ "time"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/vm"
@@ -35,6 +36,7 @@ type FilterSystem struct {
filterMu sync.RWMutex
filterId int
filters map[int]*Filter
+ created map[int]time.Time
quit chan struct{}
}
@@ -44,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
eventMux: mux,
filters: make(map[int]*Filter),
+ created: make(map[int]time.Time),
}
go fs.filterLoop()
return fs
@@ -60,6 +63,7 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) {
defer fs.filterMu.Unlock()
id = fs.filterId
fs.filters[id] = filter
+ fs.created[id] = time.Now()
fs.filterId++
return id
@@ -69,15 +73,16 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) {
func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- if _, ok := fs.filters[id]; ok {
- delete(fs.filters, id)
- }
+
+ delete(fs.filters, id)
+ delete(fs.created, id)
}
// Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()
+
return fs.filters[id]
}
@@ -85,42 +90,49 @@ func (fs *FilterSystem) Get(id int) *Filter {
// when the filter matches the requirements.
func (fs *FilterSystem) filterLoop() {
// Subscribe to events
- events := fs.eventMux.Subscribe(
+ eventCh := fs.eventMux.Subscribe(
//core.PendingBlockEvent{},
core.ChainEvent{},
core.TxPreEvent{},
- vm.Logs(nil))
+ vm.Logs(nil),
+ ).Chan()
out:
for {
select {
case <-fs.quit:
break out
- case event := <-events.Chan():
- switch event := event.(type) {
+ case event, ok := <-eventCh:
+ if !ok {
+ // Event subscription closed, set the channel to nil to stop spinning
+ eventCh = nil
+ continue
+ }
+ // A real event arrived, notify the registered filters
+ switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
- for _, filter := range fs.filters {
- if filter.BlockCallback != nil {
- filter.BlockCallback(event.Block, event.Logs)
+ for id, filter := range fs.filters {
+ if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
+ filter.BlockCallback(ev.Block, ev.Logs)
}
}
fs.filterMu.RUnlock()
case core.TxPreEvent:
fs.filterMu.RLock()
- for _, filter := range fs.filters {
- if filter.TransactionCallback != nil {
- filter.TransactionCallback(event.Tx)
+ for id, filter := range fs.filters {
+ if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
+ filter.TransactionCallback(ev.Tx)
}
}
fs.filterMu.RUnlock()
case vm.Logs:
fs.filterMu.RLock()
- for _, filter := range fs.filters {
- if filter.LogsCallback != nil {
- msgs := filter.FilterLogs(event)
+ for id, filter := range fs.filters {
+ if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
+ msgs := filter.FilterLogs(ev)
if len(msgs) > 0 {
filter.LogsCallback(msgs)
}