From fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 29 Oct 2015 13:28:00 +0100 Subject: cmd/utils, rpc/comms: stop XEth when IPC connection ends There are a bunch of changes required to make this work: - in miner: allow unregistering agents, fix RemoteAgent.Stop - in eth/filters: make FilterSystem.Stop not crash - in rpc/comms: move listen loop to platform-independent code Fixes #1930. I ran the shell loop there for a few minutes and didn't see any changes in the memory profile. --- eth/filters/filter_system.go | 88 ++++++++++++++++++-------------------------- 1 file changed, 35 insertions(+), 53 deletions(-) (limited to 'eth') diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ae6093525..df3ce90c6 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -31,30 +31,32 @@ import ( // block, transaction and log events. The Filtering system can be used to listen // for specific LOG events fired by the EVM (Ethereum Virtual Machine). type FilterSystem struct { - eventMux *event.TypeMux - filterMu sync.RWMutex filterId int filters map[int]*Filter created map[int]time.Time - - quit chan struct{} + sub event.Subscription } // NewFilterSystem returns a newly allocated filter manager func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ - eventMux: mux, - filters: make(map[int]*Filter), - created: make(map[int]time.Time), + filters: make(map[int]*Filter), + created: make(map[int]time.Time), } + fs.sub = mux.Subscribe( + //core.PendingBlockEvent{}, + core.ChainEvent{}, + core.TxPreEvent{}, + vm.Logs(nil), + ) go fs.filterLoop() return fs } // Stop quits the filter loop required for polling events func (fs *FilterSystem) Stop() { - close(fs.quit) + fs.sub.Unsubscribe() } // Add adds a filter to the filter manager @@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter { // filterLoop waits for specific events from ethereum and fires their handlers // when the filter matches the requirements. func (fs *FilterSystem) filterLoop() { - // Subscribe to events - eventCh := fs.eventMux.Subscribe( - //core.PendingBlockEvent{}, - core.ChainEvent{}, - core.TxPreEvent{}, - vm.Logs(nil), - ).Chan() - -out: - for { - select { - case <-fs.quit: - break out - case event, ok := <-eventCh: - if !ok { - // Event subscription closed, set the channel to nil to stop spinning - eventCh = nil - continue - } - // A real event arrived, notify the registered filters - switch ev := event.Data.(type) { - case core.ChainEvent: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { - filter.BlockCallback(ev.Block, ev.Logs) - } + for event := range fs.sub.Chan() { + switch ev := event.Data.(type) { + case core.ChainEvent: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { + filter.BlockCallback(ev.Block, ev.Logs) } - fs.filterMu.RUnlock() + } + fs.filterMu.RUnlock() - case core.TxPreEvent: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { - filter.TransactionCallback(ev.Tx) - } + case core.TxPreEvent: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { + filter.TransactionCallback(ev.Tx) } - fs.filterMu.RUnlock() - - case vm.Logs: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { - msgs := filter.FilterLogs(ev) - if len(msgs) > 0 { - filter.LogsCallback(msgs) - } + } + fs.filterMu.RUnlock() + + case vm.Logs: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { + msgs := filter.FilterLogs(ev) + if len(msgs) > 0 { + filter.LogsCallback(msgs) } } - fs.filterMu.RUnlock() } + fs.filterMu.RUnlock() } } } -- cgit v1.2.3