From 51f8ce26cf6dbc20ddc548af305739db981fdd41 Mon Sep 17 00:00:00 2001 From: Henning Diedrich Date: Fri, 17 Jun 2016 09:53:54 +0200 Subject: eth: fix #2710 filter races and locking bugs found in its wake. --- eth/filters/api.go | 63 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 22 deletions(-) (limited to 'eth/filters/api.go') diff --git a/eth/filters/api.go b/eth/filters/api.go index 393019f8b..65c5b9380 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -68,8 +68,6 @@ type PublicFilterAPI struct { transactionMu sync.RWMutex transactionQueue map[int]*hashQueue - - transactMu sync.Mutex } // NewPublicFilterAPI returns a new PublicFilterAPI instance. @@ -100,6 +98,7 @@ done: for { select { case <-timer.C: + s.filterManager.Lock() // lock order like filterLoop() s.logMu.Lock() for id, filter := range s.logQueue { if time.Since(filter.timeout) > filterTickerTime { @@ -126,6 +125,7 @@ done: } } s.transactionMu.Unlock() + s.filterManager.Unlock() case <-s.quit: break done } @@ -135,19 +135,24 @@ done: // NewBlockFilter create a new filter that returns blocks that are included into the canonical chain. func (s *PublicFilterAPI) NewBlockFilter() (string, error) { + // protect filterManager.Add() and setting of filter fields + s.filterManager.Lock() + defer s.filterManager.Unlock() + externalId, err := newFilterId() if err != nil { return "", err } - s.blockMu.Lock() filter := New(s.chainDb) id, err := s.filterManager.Add(filter, ChainFilter) if err != nil { return "", err } + s.blockMu.Lock() s.blockQueue[id] = &hashQueue{timeout: time.Now()} + s.blockMu.Unlock() filter.BlockCallback = func(block *types.Block, logs vm.Logs) { s.blockMu.Lock() @@ -158,8 +163,6 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) { } } - defer s.blockMu.Unlock() - s.filterMapMu.Lock() s.filterMapping[externalId] = id s.filterMapMu.Unlock() @@ -169,21 +172,24 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) { // NewPendingTransactionFilter creates a filter that returns new pending transactions. func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { + // protect filterManager.Add() and setting of filter fields + s.filterManager.Lock() + defer s.filterManager.Unlock() + externalId, err := newFilterId() if err != nil { return "", err } - s.transactionMu.Lock() - defer s.transactionMu.Unlock() - filter := New(s.chainDb) id, err := s.filterManager.Add(filter, PendingTxFilter) if err != nil { return "", err } + s.transactionMu.Lock() s.transactionQueue[id] = &hashQueue{timeout: time.Now()} + s.transactionMu.Unlock() filter.TransactionCallback = func(tx *types.Transaction) { s.transactionMu.Lock() @@ -203,8 +209,9 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { // newLogFilter creates a new log filter. func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) { - s.logMu.Lock() - defer s.logMu.Unlock() + // protect filterManager.Add() and setting of filter fields + s.filterManager.Lock() + defer s.filterManager.Unlock() filter := New(s.chainDb) id, err := s.filterManager.Add(filter, LogFilter) @@ -212,7 +219,9 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo return 0, err } + s.logMu.Lock() s.logQueue[id] = &logQueue{timeout: time.Now()} + s.logMu.Unlock() filter.SetBeginBlock(earliest) filter.SetEndBlock(latest) @@ -443,35 +452,43 @@ func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog { // UninstallFilter removes the filter with the given filter id. func (s *PublicFilterAPI) UninstallFilter(filterId string) bool { - s.filterMapMu.Lock() - defer s.filterMapMu.Unlock() + s.filterManager.Lock() + defer s.filterManager.Unlock() + s.filterMapMu.Lock() id, ok := s.filterMapping[filterId] if !ok { + s.filterMapMu.Unlock() return false } - - defer s.filterManager.Remove(id) delete(s.filterMapping, filterId) + s.filterMapMu.Unlock() + s.filterManager.Remove(id) + + s.logMu.Lock() if _, ok := s.logQueue[id]; ok { - s.logMu.Lock() - defer s.logMu.Unlock() delete(s.logQueue, id) + s.logMu.Unlock() return true } + s.logMu.Unlock() + + s.blockMu.Lock() if _, ok := s.blockQueue[id]; ok { - s.blockMu.Lock() - defer s.blockMu.Unlock() delete(s.blockQueue, id) + s.blockMu.Unlock() return true } + s.blockMu.Unlock() + + s.transactionMu.Lock() if _, ok := s.transactionQueue[id]; ok { - s.transactionMu.Lock() - defer s.transactionMu.Unlock() delete(s.transactionQueue, id) + s.transactionMu.Unlock() return true } + s.transactionMu.Unlock() return false } @@ -525,7 +542,9 @@ func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog { // GetFilterLogs returns the logs for the filter with the given id. func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog { + s.filterMapMu.RLock() id, ok := s.filterMapping[filterId] + s.filterMapMu.RUnlock() if !ok { return toRPCLogs(nil, false) } @@ -540,9 +559,9 @@ func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog { // GetFilterChanges returns the logs for the filter with the given id since last time is was called. // This can be used for polling. func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} { - s.filterMapMu.Lock() + s.filterMapMu.RLock() id, ok := s.filterMapping[filterId] - s.filterMapMu.Unlock() + s.filterMapMu.RUnlock() if !ok { // filter not found return []interface{}{} -- cgit v1.2.3