aboutsummaryrefslogtreecommitdiffstats
path: root/core/tx_pool.go
diff options
context:
space:
mode:
authorRyan Schneider <ryanleeschneider@gmail.com>2018-05-23 20:55:42 +0800
committerPéter Szilágyi <peterke@gmail.com>2018-05-23 20:55:42 +0800
commit55b579e02ce6f374bf81061269eabde0d82ae567 (patch)
treebd342772873ac0eb434d49a339850df6a3cf633b /core/tx_pool.go
parentbe22ee8ddac890044ca66500f4f8b32c635e3d1f (diff)
downloaddexon-55b579e02ce6f374bf81061269eabde0d82ae567.tar
dexon-55b579e02ce6f374bf81061269eabde0d82ae567.tar.gz
dexon-55b579e02ce6f374bf81061269eabde0d82ae567.tar.bz2
dexon-55b579e02ce6f374bf81061269eabde0d82ae567.tar.lz
dexon-55b579e02ce6f374bf81061269eabde0d82ae567.tar.xz
dexon-55b579e02ce6f374bf81061269eabde0d82ae567.tar.zst
dexon-55b579e02ce6f374bf81061269eabde0d82ae567.zip
core: use a wrapped map to remove contention in `TxPool.Get`. (#16670)
* core: use a wrapped `map` and `sync.RWMutex` for `TxPool.all` to remove contention in `TxPool.Get`. * core: Remove redundant `txLookup.Find` and improve comments on txLookup methods.
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r--core/tx_pool.go130
1 files changed, 96 insertions, 34 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
index f89e11441..1c9516b1b 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -200,11 +200,11 @@ type TxPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk
- pending map[common.Address]*txList // All currently processable transactions
- queue map[common.Address]*txList // Queued but non-processable transactions
- beats map[common.Address]time.Time // Last heartbeat from each known account
- all map[common.Hash]*types.Transaction // All transactions to allow lookups
- priced *txPricedList // All transactions sorted by price
+ pending map[common.Address]*txList // All currently processable transactions
+ queue map[common.Address]*txList // Queued but non-processable transactions
+ beats map[common.Address]time.Time // Last heartbeat from each known account
+ all *txLookup // All transactions to allow lookups
+ priced *txPricedList // All transactions sorted by price
wg sync.WaitGroup // for shutdown sync
@@ -226,12 +226,12 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
- all: make(map[common.Hash]*types.Transaction),
+ all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
- pool.priced = newTxPricedList(&pool.all)
+ pool.priced = newTxPricedList(pool.all)
pool.reset(nil, chain.CurrentBlock().Header())
// If local transactions and journaling is enabled, load from disk
@@ -605,7 +605,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// If the transaction is already known, discard it
hash := tx.Hash()
- if pool.all[hash] != nil {
+ if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
@@ -616,7 +616,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
return false, err
}
// If the transaction pool is full, discard underpriced transactions
- if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
+ if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
if !local && pool.priced.Underpriced(tx, pool.locals) {
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
@@ -624,7 +624,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
return false, ErrUnderpriced
}
// New transaction is better than our worse ones, make room for it
- drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
+ drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
@@ -642,11 +642,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
}
// New transaction is better, replace old one
if old != nil {
- delete(pool.all, old.Hash())
+ pool.all.Remove(old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
- pool.all[tx.Hash()] = tx
+ pool.all.Add(tx)
pool.priced.Put(tx)
pool.journalTx(from, tx)
@@ -689,12 +689,12 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
}
// Discard any previous transaction and mark this
if old != nil {
- delete(pool.all, old.Hash())
+ pool.all.Remove(old.Hash())
pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
- if pool.all[hash] == nil {
- pool.all[hash] = tx
+ if pool.all.Get(hash) == nil {
+ pool.all.Add(tx)
pool.priced.Put(tx)
}
return old != nil, nil
@@ -726,7 +726,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
- delete(pool.all, hash)
+ pool.all.Remove(hash)
pool.priced.Removed()
pendingDiscardCounter.Inc(1)
@@ -734,14 +734,14 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
}
// Otherwise discard any previous transaction and mark this
if old != nil {
- delete(pool.all, old.Hash())
+ pool.all.Remove(old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
// Failsafe to work around direct pending inserts (tests)
- if pool.all[hash] == nil {
- pool.all[hash] = tx
+ if pool.all.Get(hash) == nil {
+ pool.all.Add(tx)
pool.priced.Put(tx)
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
@@ -840,7 +840,7 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
status := make([]TxStatus, len(hashes))
for i, hash := range hashes {
- if tx := pool.all[hash]; tx != nil {
+ if tx := pool.all.Get(hash); tx != nil {
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
status[i] = TxStatusPending
@@ -855,24 +855,21 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
// Get returns a transaction if it is contained in the pool
// and nil otherwise.
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
- pool.mu.RLock()
- defer pool.mu.RUnlock()
-
- return pool.all[hash]
+ return pool.all.Get(hash)
}
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Fetch the transaction we wish to delete
- tx, ok := pool.all[hash]
- if !ok {
+ tx := pool.all.Get(hash)
+ if tx == nil {
return
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
// Remove it from the list of known transactions
- delete(pool.all, hash)
+ pool.all.Remove(hash)
if outofbound {
pool.priced.Removed()
}
@@ -928,7 +925,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
- delete(pool.all, hash)
+ pool.all.Remove(hash)
pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance or out of gas)
@@ -936,7 +933,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
- delete(pool.all, hash)
+ pool.all.Remove(hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
@@ -952,7 +949,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
- delete(pool.all, hash)
+ pool.all.Remove(hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
@@ -1001,7 +998,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
- delete(pool.all, hash)
+ pool.all.Remove(hash)
pool.priced.Removed()
// Update the account nonce to the dropped transaction
@@ -1023,7 +1020,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
- delete(pool.all, hash)
+ pool.all.Remove(hash)
pool.priced.Removed()
// Update the account nonce to the dropped transaction
@@ -1092,7 +1089,7 @@ func (pool *TxPool) demoteUnexecutables() {
for _, tx := range list.Forward(nonce) {
hash := tx.Hash()
log.Trace("Removed old pending transaction", "hash", hash)
- delete(pool.all, hash)
+ pool.all.Remove(hash)
pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
@@ -1100,7 +1097,7 @@ func (pool *TxPool) demoteUnexecutables() {
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
- delete(pool.all, hash)
+ pool.all.Remove(hash)
pool.priced.Removed()
pendingNofundsCounter.Inc(1)
}
@@ -1172,3 +1169,68 @@ func (as *accountSet) containsTx(tx *types.Transaction) bool {
func (as *accountSet) add(addr common.Address) {
as.accounts[addr] = struct{}{}
}
+
+// txLookup is used internally by TxPool to track transactions while allowing lookup without
+// mutex contention.
+//
+// Note, although this type is properly protected against concurrent access, it
+// is **not** a type that should ever be mutated or even exposed outside of the
+// transaction pool, since its internal state is tightly coupled with the pools
+// internal mechanisms. The sole purpose of the type is to permit out-of-bound
+// peeking into the pool in TxPool.Get without having to acquire the widely scoped
+// TxPool.mu mutex.
+type txLookup struct {
+ all map[common.Hash]*types.Transaction
+ lock sync.RWMutex
+}
+
+// newTxLookup returns a new txLookup structure.
+func newTxLookup() *txLookup {
+ return &txLookup{
+ all: make(map[common.Hash]*types.Transaction),
+ }
+}
+
+// Range calls f on each key and value present in the map.
+func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ for key, value := range t.all {
+ if !f(key, value) {
+ break
+ }
+ }
+}
+
+// Get returns a transaction if it exists in the lookup, or nil if not found.
+func (t *txLookup) Get(hash common.Hash) *types.Transaction {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ return t.all[hash]
+}
+
+// Count returns the current number of items in the lookup.
+func (t *txLookup) Count() int {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ return len(t.all)
+}
+
+// Add adds a transaction to the lookup.
+func (t *txLookup) Add(tx *types.Transaction) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ t.all[tx.Hash()] = tx
+}
+
+// Remove removes a transaction from the lookup.
+func (t *txLookup) Remove(hash common.Hash) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ delete(t.all, hash)
+}