From 51f8ce26cf6dbc20ddc548af305739db981fdd41 Mon Sep 17 00:00:00 2001
From: Henning Diedrich <hd@eonblast.com>
Date: Fri, 17 Jun 2016 09:53:54 +0200
Subject: eth: fix #2710 filter races

and locking bugs found in its wake.
---
 eth/filters/api.go           | 63 ++++++++++++++++++++++++++++----------------
 eth/filters/filter_system.go | 19 ++++++++-----
 2 files changed, 54 insertions(+), 28 deletions(-)

(limited to 'eth')

diff --git a/eth/filters/api.go b/eth/filters/api.go
index 393019f8b..65c5b9380 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -68,8 +68,6 @@ type PublicFilterAPI struct {
 
 	transactionMu    sync.RWMutex
 	transactionQueue map[int]*hashQueue
-
-	transactMu sync.Mutex
 }
 
 // NewPublicFilterAPI returns a new PublicFilterAPI instance.
@@ -100,6 +98,7 @@ done:
 	for {
 		select {
 		case <-timer.C:
+			s.filterManager.Lock() // lock order like filterLoop()
 			s.logMu.Lock()
 			for id, filter := range s.logQueue {
 				if time.Since(filter.timeout) > filterTickerTime {
@@ -126,6 +125,7 @@ done:
 				}
 			}
 			s.transactionMu.Unlock()
+			s.filterManager.Unlock()
 		case <-s.quit:
 			break done
 		}
@@ -135,19 +135,24 @@ done:
 
 // NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
 func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
+	// protect filterManager.Add() and setting of filter fields
+	s.filterManager.Lock()
+	defer s.filterManager.Unlock()
+
 	externalId, err := newFilterId()
 	if err != nil {
 		return "", err
 	}
 
-	s.blockMu.Lock()
 	filter := New(s.chainDb)
 	id, err := s.filterManager.Add(filter, ChainFilter)
 	if err != nil {
 		return "", err
 	}
 
+	s.blockMu.Lock()
 	s.blockQueue[id] = &hashQueue{timeout: time.Now()}
+	s.blockMu.Unlock()
 
 	filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
 		s.blockMu.Lock()
@@ -158,8 +163,6 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
 		}
 	}
 
-	defer s.blockMu.Unlock()
-
 	s.filterMapMu.Lock()
 	s.filterMapping[externalId] = id
 	s.filterMapMu.Unlock()
@@ -169,21 +172,24 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
 
 // NewPendingTransactionFilter creates a filter that returns new pending transactions.
 func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
+	// protect filterManager.Add() and setting of filter fields
+	s.filterManager.Lock()
+	defer s.filterManager.Unlock()
+
 	externalId, err := newFilterId()
 	if err != nil {
 		return "", err
 	}
 
-	s.transactionMu.Lock()
-	defer s.transactionMu.Unlock()
-
 	filter := New(s.chainDb)
 	id, err := s.filterManager.Add(filter, PendingTxFilter)
 	if err != nil {
 		return "", err
 	}
 
+	s.transactionMu.Lock()
 	s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
+	s.transactionMu.Unlock()
 
 	filter.TransactionCallback = func(tx *types.Transaction) {
 		s.transactionMu.Lock()
@@ -203,8 +209,9 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
 
 // newLogFilter creates a new log filter.
 func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
-	s.logMu.Lock()
-	defer s.logMu.Unlock()
+	// protect filterManager.Add() and setting of filter fields
+	s.filterManager.Lock()
+	defer s.filterManager.Unlock()
 
 	filter := New(s.chainDb)
 	id, err := s.filterManager.Add(filter, LogFilter)
@@ -212,7 +219,9 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
 		return 0, err
 	}
 
+	s.logMu.Lock()
 	s.logQueue[id] = &logQueue{timeout: time.Now()}
+	s.logMu.Unlock()
 
 	filter.SetBeginBlock(earliest)
 	filter.SetEndBlock(latest)
@@ -443,35 +452,43 @@ func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
 
 // UninstallFilter removes the filter with the given filter id.
 func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
-	s.filterMapMu.Lock()
-	defer s.filterMapMu.Unlock()
+	s.filterManager.Lock()
+	defer s.filterManager.Unlock()
 
+	s.filterMapMu.Lock()
 	id, ok := s.filterMapping[filterId]
 	if !ok {
+		s.filterMapMu.Unlock()
 		return false
 	}
-
-	defer s.filterManager.Remove(id)
 	delete(s.filterMapping, filterId)
+	s.filterMapMu.Unlock()
 
+	s.filterManager.Remove(id)
+
+	s.logMu.Lock()
 	if _, ok := s.logQueue[id]; ok {
-		s.logMu.Lock()
-		defer s.logMu.Unlock()
 		delete(s.logQueue, id)
+		s.logMu.Unlock()
 		return true
 	}
+	s.logMu.Unlock()
+
+	s.blockMu.Lock()
 	if _, ok := s.blockQueue[id]; ok {
-		s.blockMu.Lock()
-		defer s.blockMu.Unlock()
 		delete(s.blockQueue, id)
+		s.blockMu.Unlock()
 		return true
 	}
+	s.blockMu.Unlock()
+
+	s.transactionMu.Lock()
 	if _, ok := s.transactionQueue[id]; ok {
-		s.transactionMu.Lock()
-		defer s.transactionMu.Unlock()
 		delete(s.transactionQueue, id)
+		s.transactionMu.Unlock()
 		return true
 	}
+	s.transactionMu.Unlock()
 
 	return false
 }
@@ -525,7 +542,9 @@ func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
 
 // GetFilterLogs returns the logs for the filter with the given id.
 func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
+	s.filterMapMu.RLock()
 	id, ok := s.filterMapping[filterId]
+	s.filterMapMu.RUnlock()
 	if !ok {
 		return toRPCLogs(nil, false)
 	}
@@ -540,9 +559,9 @@ func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
 // GetFilterChanges returns the logs for the filter with the given id since last time is was called.
 // This can be used for polling.
 func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
-	s.filterMapMu.Lock()
+	s.filterMapMu.RLock()
 	id, ok := s.filterMapping[filterId]
-	s.filterMapMu.Unlock()
+	s.filterMapMu.RUnlock()
 
 	if !ok { // filter not found
 		return []interface{}{}
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 4343dfa21..256464213 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -82,11 +82,20 @@ func (fs *FilterSystem) Stop() {
 	fs.sub.Unsubscribe()
 }
 
-// Add adds a filter to the filter manager
-func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
+// Acquire filter system maps lock, required to force lock acquisition
+// sequence with filterMu acquired first to avoid deadlocks by callbacks
+func (fs *FilterSystem) Lock() {
 	fs.filterMu.Lock()
-	defer fs.filterMu.Unlock()
+}
+
+// Release filter system maps lock
+func (fs *FilterSystem) Unlock() {
+	fs.filterMu.Unlock()
+}
 
+// Add adds a filter to the filter manager
+// Expects filterMu to be locked.
+func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
 	id := fs.filterId
 	filter.created = time.Now()
 
@@ -110,10 +119,8 @@ func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error)
 }
 
 // Remove removes a filter by filter id
+// Expects filterMu to be locked.
 func (fs *FilterSystem) Remove(id int) {
-	fs.filterMu.Lock()
-	defer fs.filterMu.Unlock()
-
 	delete(fs.chainFilters, id)
 	delete(fs.pendingTxFilters, id)
 	delete(fs.logFilters, id)
-- 
cgit v1.2.3