diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-12 20:04:38 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-10-12 21:22:03 +0800 |
commit | 402fd6e8c6a2e379351e0aae10a833fae6bcae6c (patch) | |
tree | 30ab93e93af7c70e5df213eb3665f51a293bc4a9 /eth | |
parent | 315a422ba754eae10db21990a809f608f7af62d4 (diff) | |
download | go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.gz go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.bz2 go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.lz go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.xz go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.zst go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.zip |
core, eth, event, miner, xeth: fix event post / subscription race
Diffstat (limited to 'eth')
-rw-r--r-- | eth/filters/filter_system.go | 44 | ||||
-rw-r--r-- | eth/gasprice.go | 15 | ||||
-rw-r--r-- | eth/handler.go | 4 |
3 files changed, 36 insertions, 27 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 4972dcd59..ae6093525 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -20,6 +20,7 @@ package filters import ( "sync" + "time" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/vm" @@ -35,6 +36,7 @@ type FilterSystem struct { filterMu sync.RWMutex filterId int filters map[int]*Filter + created map[int]time.Time quit chan struct{} } @@ -44,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ eventMux: mux, filters: make(map[int]*Filter), + created: make(map[int]time.Time), } go fs.filterLoop() return fs @@ -60,6 +63,7 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) { defer fs.filterMu.Unlock() id = fs.filterId fs.filters[id] = filter + fs.created[id] = time.Now() fs.filterId++ return id @@ -69,15 +73,16 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) { func (fs *FilterSystem) Remove(id int) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - if _, ok := fs.filters[id]; ok { - delete(fs.filters, id) - } + + delete(fs.filters, id) + delete(fs.created, id) } // Get retrieves a filter installed using Add The filter may not be modified. func (fs *FilterSystem) Get(id int) *Filter { fs.filterMu.RLock() defer fs.filterMu.RUnlock() + return fs.filters[id] } @@ -85,42 +90,49 @@ func (fs *FilterSystem) Get(id int) *Filter { // when the filter matches the requirements. func (fs *FilterSystem) filterLoop() { // Subscribe to events - events := fs.eventMux.Subscribe( + eventCh := fs.eventMux.Subscribe( //core.PendingBlockEvent{}, core.ChainEvent{}, core.TxPreEvent{}, - vm.Logs(nil)) + vm.Logs(nil), + ).Chan() out: for { select { case <-fs.quit: break out - case event := <-events.Chan(): - switch event := event.(type) { + case event, ok := <-eventCh: + if !ok { + // Event subscription closed, set the channel to nil to stop spinning + eventCh = nil + continue + } + // A real event arrived, notify the registered filters + switch ev := event.Data.(type) { case core.ChainEvent: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(event.Block, event.Logs) + for id, filter := range fs.filters { + if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { + filter.BlockCallback(ev.Block, ev.Logs) } } fs.filterMu.RUnlock() case core.TxPreEvent: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.TransactionCallback != nil { - filter.TransactionCallback(event.Tx) + for id, filter := range fs.filters { + if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { + filter.TransactionCallback(ev.Tx) } } fs.filterMu.RUnlock() case vm.Logs: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.LogsCallback != nil { - msgs := filter.FilterLogs(event) + for id, filter := range fs.filters { + if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { + msgs := filter.FilterLogs(ev) if len(msgs) > 0 { filter.LogsCallback(msgs) } diff --git a/eth/gasprice.go b/eth/gasprice.go index c08b96129..b4409f346 100644 --- a/eth/gasprice.go +++ b/eth/gasprice.go @@ -84,19 +84,16 @@ func (self *GasPriceOracle) processPastBlocks() { } func (self *GasPriceOracle) listenLoop() { - for { - ev, isopen := <-self.events.Chan() - if !isopen { - break - } - switch ev := ev.(type) { + defer self.events.Unsubscribe() + + for event := range self.events.Chan() { + switch event := event.Data.(type) { case core.ChainEvent: - self.processBlock(ev.Block) + self.processBlock(event.Block) case core.ChainSplitEvent: - self.processBlock(ev.Block) + self.processBlock(event.Block) } } - self.events.Unsubscribe() } func (self *GasPriceOracle) processBlock(block *types.Block) { diff --git a/eth/handler.go b/eth/handler.go index fc92338b4..3fc909672 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -687,7 +687,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) func (self *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.minedBlockSub.Chan() { - switch ev := obj.(type) { + switch ev := obj.Data.(type) { case core.NewMinedBlockEvent: self.BroadcastBlock(ev.Block, true) // First propagate block to peers self.BroadcastBlock(ev.Block, false) // Only then announce to the rest @@ -698,7 +698,7 @@ func (self *ProtocolManager) minedBroadcastLoop() { func (self *ProtocolManager) txBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.txSub.Chan() { - event := obj.(core.TxPreEvent) + event := obj.Data.(core.TxPreEvent) self.BroadcastTx(event.Tx.Hash(), event.Tx) } } |