aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-10-16 01:44:30 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-10-16 01:44:30 +0800
commitcefe5c80b1cdcab606a169c0be65d9d2ba9bc941 (patch)
treedbbb116fa71ee396fdeb743a725b14007b43e845 /eth
parent2f1f2e4811a6f3094f99b55f6553fe27d83f9aad (diff)
parent402fd6e8c6a2e379351e0aae10a833fae6bcae6c (diff)
downloadgo-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar
go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.gz
go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.bz2
go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.lz
go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.xz
go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.zst
go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.zip
Merge pull request #1898 from karalabe/eventmux-post-race
core, eth, event, miner, xeth: fix event post / subscription race
Diffstat (limited to 'eth')
-rw-r--r--eth/filters/filter_system.go44
-rw-r--r--eth/gasprice.go15
-rw-r--r--eth/handler.go4
3 files changed, 36 insertions, 27 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 4972dcd59..ae6093525 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -20,6 +20,7 @@ package filters
import (
"sync"
+ "time"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/vm"
@@ -35,6 +36,7 @@ type FilterSystem struct {
filterMu sync.RWMutex
filterId int
filters map[int]*Filter
+ created map[int]time.Time
quit chan struct{}
}
@@ -44,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
eventMux: mux,
filters: make(map[int]*Filter),
+ created: make(map[int]time.Time),
}
go fs.filterLoop()
return fs
@@ -60,6 +63,7 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) {
defer fs.filterMu.Unlock()
id = fs.filterId
fs.filters[id] = filter
+ fs.created[id] = time.Now()
fs.filterId++
return id
@@ -69,15 +73,16 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) {
func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- if _, ok := fs.filters[id]; ok {
- delete(fs.filters, id)
- }
+
+ delete(fs.filters, id)
+ delete(fs.created, id)
}
// Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()
+
return fs.filters[id]
}
@@ -85,42 +90,49 @@ func (fs *FilterSystem) Get(id int) *Filter {
// when the filter matches the requirements.
func (fs *FilterSystem) filterLoop() {
// Subscribe to events
- events := fs.eventMux.Subscribe(
+ eventCh := fs.eventMux.Subscribe(
//core.PendingBlockEvent{},
core.ChainEvent{},
core.TxPreEvent{},
- vm.Logs(nil))
+ vm.Logs(nil),
+ ).Chan()
out:
for {
select {
case <-fs.quit:
break out
- case event := <-events.Chan():
- switch event := event.(type) {
+ case event, ok := <-eventCh:
+ if !ok {
+ // Event subscription closed, set the channel to nil to stop spinning
+ eventCh = nil
+ continue
+ }
+ // A real event arrived, notify the registered filters
+ switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
- for _, filter := range fs.filters {
- if filter.BlockCallback != nil {
- filter.BlockCallback(event.Block, event.Logs)
+ for id, filter := range fs.filters {
+ if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
+ filter.BlockCallback(ev.Block, ev.Logs)
}
}
fs.filterMu.RUnlock()
case core.TxPreEvent:
fs.filterMu.RLock()
- for _, filter := range fs.filters {
- if filter.TransactionCallback != nil {
- filter.TransactionCallback(event.Tx)
+ for id, filter := range fs.filters {
+ if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
+ filter.TransactionCallback(ev.Tx)
}
}
fs.filterMu.RUnlock()
case vm.Logs:
fs.filterMu.RLock()
- for _, filter := range fs.filters {
- if filter.LogsCallback != nil {
- msgs := filter.FilterLogs(event)
+ for id, filter := range fs.filters {
+ if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
+ msgs := filter.FilterLogs(ev)
if len(msgs) > 0 {
filter.LogsCallback(msgs)
}
diff --git a/eth/gasprice.go b/eth/gasprice.go
index c08b96129..b4409f346 100644
--- a/eth/gasprice.go
+++ b/eth/gasprice.go
@@ -84,19 +84,16 @@ func (self *GasPriceOracle) processPastBlocks() {
}
func (self *GasPriceOracle) listenLoop() {
- for {
- ev, isopen := <-self.events.Chan()
- if !isopen {
- break
- }
- switch ev := ev.(type) {
+ defer self.events.Unsubscribe()
+
+ for event := range self.events.Chan() {
+ switch event := event.Data.(type) {
case core.ChainEvent:
- self.processBlock(ev.Block)
+ self.processBlock(event.Block)
case core.ChainSplitEvent:
- self.processBlock(ev.Block)
+ self.processBlock(event.Block)
}
}
- self.events.Unsubscribe()
}
func (self *GasPriceOracle) processBlock(block *types.Block) {
diff --git a/eth/handler.go b/eth/handler.go
index fc92338b4..3fc909672 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -687,7 +687,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
func (self *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
for obj := range self.minedBlockSub.Chan() {
- switch ev := obj.(type) {
+ switch ev := obj.Data.(type) {
case core.NewMinedBlockEvent:
self.BroadcastBlock(ev.Block, true) // First propagate block to peers
self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
@@ -698,7 +698,7 @@ func (self *ProtocolManager) minedBroadcastLoop() {
func (self *ProtocolManager) txBroadcastLoop() {
// automatically stops if unsubscribe
for obj := range self.txSub.Chan() {
- event := obj.(core.TxPreEvent)
+ event := obj.Data.(core.TxPreEvent)
self.BroadcastTx(event.Tx.Hash(), event.Tx)
}
}