aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2016-07-20 05:14:08 +0800
committerGitHub <noreply@github.com>2016-07-20 05:14:08 +0800
commit74ec95e7f6d0250dcc9bf51b8bd7e9c8c5431dfc (patch)
tree28e778c3b4babdb8443a78b3c7c60909186948f9
parent65f340bb95bd10f59b3086f322e751bf46a4e446 (diff)
parent51f8ce26cf6dbc20ddc548af305739db981fdd41 (diff)
downloaddexon-74ec95e7f6d0250dcc9bf51b8bd7e9c8c5431dfc.tar
dexon-74ec95e7f6d0250dcc9bf51b8bd7e9c8c5431dfc.tar.gz
dexon-74ec95e7f6d0250dcc9bf51b8bd7e9c8c5431dfc.tar.bz2
dexon-74ec95e7f6d0250dcc9bf51b8bd7e9c8c5431dfc.tar.lz
dexon-74ec95e7f6d0250dcc9bf51b8bd7e9c8c5431dfc.tar.xz
dexon-74ec95e7f6d0250dcc9bf51b8bd7e9c8c5431dfc.tar.zst
dexon-74ec95e7f6d0250dcc9bf51b8bd7e9c8c5431dfc.zip
Merge pull request #2711 from hdiedrich/1.4.7-filter-races-cleanup
Fix #2710 Filter race: concurrent map read and map write
-rw-r--r--eth/filters/api.go63
-rw-r--r--eth/filters/filter_system.go19
2 files changed, 54 insertions, 28 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 393019f8b..65c5b9380 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -68,8 +68,6 @@ type PublicFilterAPI struct {
transactionMu sync.RWMutex
transactionQueue map[int]*hashQueue
-
- transactMu sync.Mutex
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
@@ -100,6 +98,7 @@ done:
for {
select {
case <-timer.C:
+ s.filterManager.Lock() // lock order like filterLoop()
s.logMu.Lock()
for id, filter := range s.logQueue {
if time.Since(filter.timeout) > filterTickerTime {
@@ -126,6 +125,7 @@ done:
}
}
s.transactionMu.Unlock()
+ s.filterManager.Unlock()
case <-s.quit:
break done
}
@@ -135,19 +135,24 @@ done:
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
+ // protect filterManager.Add() and setting of filter fields
+ s.filterManager.Lock()
+ defer s.filterManager.Unlock()
+
externalId, err := newFilterId()
if err != nil {
return "", err
}
- s.blockMu.Lock()
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, ChainFilter)
if err != nil {
return "", err
}
+ s.blockMu.Lock()
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
+ s.blockMu.Unlock()
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
s.blockMu.Lock()
@@ -158,8 +163,6 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
}
}
- defer s.blockMu.Unlock()
-
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
@@ -169,21 +172,24 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
// NewPendingTransactionFilter creates a filter that returns new pending transactions.
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
+ // protect filterManager.Add() and setting of filter fields
+ s.filterManager.Lock()
+ defer s.filterManager.Unlock()
+
externalId, err := newFilterId()
if err != nil {
return "", err
}
- s.transactionMu.Lock()
- defer s.transactionMu.Unlock()
-
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, PendingTxFilter)
if err != nil {
return "", err
}
+ s.transactionMu.Lock()
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
+ s.transactionMu.Unlock()
filter.TransactionCallback = func(tx *types.Transaction) {
s.transactionMu.Lock()
@@ -203,8 +209,9 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
// newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
- s.logMu.Lock()
- defer s.logMu.Unlock()
+ // protect filterManager.Add() and setting of filter fields
+ s.filterManager.Lock()
+ defer s.filterManager.Unlock()
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, LogFilter)
@@ -212,7 +219,9 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
return 0, err
}
+ s.logMu.Lock()
s.logQueue[id] = &logQueue{timeout: time.Now()}
+ s.logMu.Unlock()
filter.SetBeginBlock(earliest)
filter.SetEndBlock(latest)
@@ -443,35 +452,43 @@ func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
// UninstallFilter removes the filter with the given filter id.
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
- s.filterMapMu.Lock()
- defer s.filterMapMu.Unlock()
+ s.filterManager.Lock()
+ defer s.filterManager.Unlock()
+ s.filterMapMu.Lock()
id, ok := s.filterMapping[filterId]
if !ok {
+ s.filterMapMu.Unlock()
return false
}
-
- defer s.filterManager.Remove(id)
delete(s.filterMapping, filterId)
+ s.filterMapMu.Unlock()
+ s.filterManager.Remove(id)
+
+ s.logMu.Lock()
if _, ok := s.logQueue[id]; ok {
- s.logMu.Lock()
- defer s.logMu.Unlock()
delete(s.logQueue, id)
+ s.logMu.Unlock()
return true
}
+ s.logMu.Unlock()
+
+ s.blockMu.Lock()
if _, ok := s.blockQueue[id]; ok {
- s.blockMu.Lock()
- defer s.blockMu.Unlock()
delete(s.blockQueue, id)
+ s.blockMu.Unlock()
return true
}
+ s.blockMu.Unlock()
+
+ s.transactionMu.Lock()
if _, ok := s.transactionQueue[id]; ok {
- s.transactionMu.Lock()
- defer s.transactionMu.Unlock()
delete(s.transactionQueue, id)
+ s.transactionMu.Unlock()
return true
}
+ s.transactionMu.Unlock()
return false
}
@@ -525,7 +542,9 @@ func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
// GetFilterLogs returns the logs for the filter with the given id.
func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
+ s.filterMapMu.RLock()
id, ok := s.filterMapping[filterId]
+ s.filterMapMu.RUnlock()
if !ok {
return toRPCLogs(nil, false)
}
@@ -540,9 +559,9 @@ func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
// This can be used for polling.
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
- s.filterMapMu.Lock()
+ s.filterMapMu.RLock()
id, ok := s.filterMapping[filterId]
- s.filterMapMu.Unlock()
+ s.filterMapMu.RUnlock()
if !ok { // filter not found
return []interface{}{}
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 4343dfa21..256464213 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -82,11 +82,20 @@ func (fs *FilterSystem) Stop() {
fs.sub.Unsubscribe()
}
-// Add adds a filter to the filter manager
-func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
+// Acquire filter system maps lock, required to force lock acquisition
+// sequence with filterMu acquired first to avoid deadlocks by callbacks
+func (fs *FilterSystem) Lock() {
fs.filterMu.Lock()
- defer fs.filterMu.Unlock()
+}
+
+// Release filter system maps lock
+func (fs *FilterSystem) Unlock() {
+ fs.filterMu.Unlock()
+}
+// Add adds a filter to the filter manager
+// Expects filterMu to be locked.
+func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
id := fs.filterId
filter.created = time.Now()
@@ -110,10 +119,8 @@ func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error)
}
// Remove removes a filter by filter id
+// Expects filterMu to be locked.
func (fs *FilterSystem) Remove(id int) {
- fs.filterMu.Lock()
- defer fs.filterMu.Unlock()
-
delete(fs.chainFilters, id)
delete(fs.pendingTxFilters, id)
delete(fs.logFilters, id)