-rw-r--r--  core/tx_pool.go       833
-rw-r--r--  core/tx_pool_test.go  163
2 files changed, 543 insertions(+), 453 deletions(-)
diff --git a/core/tx_pool.go b/core/tx_pool.go
index b16825332..5d37fcffc 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -208,16 +208,14 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
- config TxPoolConfig
- chainconfig *params.ChainConfig
- chain blockChain
- gasPrice *big.Int
- txFeed event.Feed
- scope event.SubscriptionScope
- chainHeadCh chan ChainHeadEvent
- chainHeadSub event.Subscription
- signer types.Signer
- mu sync.RWMutex
+ config TxPoolConfig
+ chainconfig *params.ChainConfig
+ chain blockChain
+ gasPrice *big.Int
+ txFeed event.Feed
+ scope event.SubscriptionScope
+ signer types.Signer
+ mu sync.RWMutex
currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
@@ -232,9 +230,18 @@ type TxPool struct {
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
- wg sync.WaitGroup // for shutdown sync
+ chainHeadCh chan ChainHeadEvent
+ chainHeadSub event.Subscription
+ reqResetCh chan *txpoolResetRequest
+ reqPromoteCh chan *accountSet
+ queueTxEventCh chan *types.Transaction
+ reorgDoneCh chan chan struct{}
+ reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
+ wg sync.WaitGroup // tracks loop, scheduleReorgLoop
+}
- homestead bool
+type txpoolResetRequest struct {
+ oldHead, newHead *types.Header
}
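
The new channels route every reset and promotion through a single scheduling goroutine; callers that need synchronous behavior block on a done channel handed back by the request helpers. A minimal sketch of the call shape (the same shape the updated tests below rely on):

	// Block until the pool has been reset against the new head.
	<-pool.requestReset(oldHead, newHead)

	// Request promotion checks for a set of dirty senders and wait for them.
	<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))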
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
@@ -245,16 +252,21 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
// Create the transaction pool with its initial settings
pool := &TxPool{
- config: config,
- chainconfig: chainconfig,
- chain: chain,
- signer: types.NewEIP155Signer(chainconfig.ChainID),
- pending: make(map[common.Address]*txList),
- queue: make(map[common.Address]*txList),
- beats: make(map[common.Address]time.Time),
- all: newTxLookup(),
- chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
- gasPrice: new(big.Int).SetUint64(config.PriceLimit),
+ config: config,
+ chainconfig: chainconfig,
+ chain: chain,
+ signer: types.NewEIP155Signer(chainconfig.ChainID),
+ pending: make(map[common.Address]*txList),
+ queue: make(map[common.Address]*txList),
+ beats: make(map[common.Address]time.Time),
+ all: newTxLookup(),
+ chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
+ reqResetCh: make(chan *txpoolResetRequest),
+ reqPromoteCh: make(chan *accountSet),
+ queueTxEventCh: make(chan *types.Transaction),
+ reorgDoneCh: make(chan chan struct{}),
+ reorgShutdownCh: make(chan struct{}),
+ gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
@@ -264,6 +276,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pool.priced = newTxPricedList(pool.all)
pool.reset(nil, chain.CurrentBlock().Header())
+ // Start the reorg loop early so it can handle requests generated during journal loading.
+ pool.wg.Add(1)
+ go pool.scheduleReorgLoop()
+
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
@@ -275,10 +291,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
- // Subscribe events from blockchain
- pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
- // Start the event loop and return
+ // Subscribe events from blockchain and start the main event loop.
+ pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.wg.Add(1)
go pool.loop()
@@ -291,38 +306,31 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
func (pool *TxPool) loop() {
defer pool.wg.Done()
- // Start the stats reporting and transaction eviction tickers
- var prevPending, prevQueued, prevStales int
-
- report := time.NewTicker(statsReportInterval)
+ var (
+ prevPending, prevQueued, prevStales int
+ // Start the stats reporting and transaction eviction tickers
+ report = time.NewTicker(statsReportInterval)
+ evict = time.NewTicker(evictionInterval)
+ journal = time.NewTicker(pool.config.Rejournal)
+ // Track the previous head headers for transaction reorgs
+ head = pool.chain.CurrentBlock()
+ )
defer report.Stop()
-
- evict := time.NewTicker(evictionInterval)
defer evict.Stop()
-
- journal := time.NewTicker(pool.config.Rejournal)
defer journal.Stop()
- // Track the previous head headers for transaction reorgs
- head := pool.chain.CurrentBlock()
-
- // Keep waiting for and reacting to the various events
for {
select {
// Handle ChainHeadEvent
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
- pool.mu.Lock()
- if pool.chainconfig.IsHomestead(ev.Block.Number()) {
- pool.homestead = true
- }
- pool.reset(head.Header(), ev.Block.Header())
+ pool.requestReset(head.Header(), ev.Block.Header())
head = ev.Block
-
- pool.mu.Unlock()
}
- // Be unsubscribed due to system stopped
+
+ // System shutdown.
case <-pool.chainHeadSub.Err():
+ close(pool.reorgShutdownCh)
return
// Handle stats reporting ticks
@@ -367,114 +375,6 @@ func (pool *TxPool) loop() {
}
}
-// lockedReset is a wrapper around reset to allow calling it in a thread safe
-// manner. This method is only ever used in the tester!
-func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) {
- pool.mu.Lock()
- defer pool.mu.Unlock()
-
- pool.reset(oldHead, newHead)
-}
-
-// reset retrieves the current state of the blockchain and ensures the content
-// of the transaction pool is valid with regard to the chain state.
-func (pool *TxPool) reset(oldHead, newHead *types.Header) {
- // If we're reorging an old state, reinject all dropped transactions
- var reinject types.Transactions
-
- if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
- // If the reorg is too deep, avoid doing it (will happen during fast sync)
- oldNum := oldHead.Number.Uint64()
- newNum := newHead.Number.Uint64()
-
- if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
- log.Debug("Skipping deep transaction reorg", "depth", depth)
- } else {
- // Reorg seems shallow enough to pull in all transactions into memory
- var discarded, included types.Transactions
- var (
- rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
- add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
- )
- if rem == nil {
- // This can happen if a setHead is performed, where we simply discard the old
- // head from the chain.
- // If that is the case, we don't have the lost transactions any more, and
- // there's nothing to add
- if newNum < oldNum {
- // If the reorg ended up on a lower number, it's indicative of setHead being the cause
- log.Debug("Skipping transaction reset caused by setHead",
- "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
- } else {
- // If we reorged to a same or higher number, then it's not a case of setHead
- log.Warn("Transaction pool reset with missing oldhead",
- "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
- }
- return
- }
- for rem.NumberU64() > add.NumberU64() {
- discarded = append(discarded, rem.Transactions()...)
- if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
- log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
- return
- }
- }
- for add.NumberU64() > rem.NumberU64() {
- included = append(included, add.Transactions()...)
- if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
- log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
- return
- }
- }
- for rem.Hash() != add.Hash() {
- discarded = append(discarded, rem.Transactions()...)
- if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
- log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
- return
- }
- included = append(included, add.Transactions()...)
- if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
- log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
- return
- }
- }
- reinject = types.TxDifference(discarded, included)
- }
- }
- // Initialize the internal state to the current head
- if newHead == nil {
- newHead = pool.chain.CurrentBlock().Header() // Special case during testing
- }
- statedb, err := pool.chain.StateAt(newHead.Root)
- if err != nil {
- log.Error("Failed to reset txpool state", "err", err)
- return
- }
- pool.currentState = statedb
- pool.pendingState = state.ManageState(statedb)
- pool.currentMaxGas = newHead.GasLimit
-
- // Inject any transactions discarded due to reorgs
- log.Debug("Reinjecting stale transactions", "count", len(reinject))
- senderCacher.recover(pool.signer, reinject)
- pool.addTxsLocked(reinject, false)
-
- // validate the pool of pending transactions, this will remove
- // any transactions that have been included in the block or
- // have been invalidated because of another transaction (e.g.
- // higher gas price)
- pool.demoteUnexecutables()
-
- // Update all accounts to the latest known pending nonce
- for addr, list := range pool.pending {
- txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
- pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
- }
- // Check the queue and move transactions over to the pending if possible
- // or remove those that have become invalid
- pool.promoteExecutables(nil)
-}
-
// Stop terminates the transaction pool.
func (pool *TxPool) Stop() {
// Unsubscribe all subscriptions registered from txpool
@@ -638,7 +538,8 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
- intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
+ // Ensure the transaction has more gas than the basic tx fee.
+ intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, true)
if err != nil {
return err
}
@@ -648,27 +549,28 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return nil
}
-// add validates a transaction and inserts it into the non-executable queue for
-// later pending promotion and execution. If the transaction is a replacement for
-// an already pending or queued one, it overwrites the previous and returns this
-// so outer code doesn't uselessly call promote.
+// add validates a transaction and inserts it into the non-executable queue for later
+// pending promotion and execution. If the transaction is a replacement for an already
+// pending or queued one, it overwrites the previous transaction if its price is higher.
//
// If a newly added transaction is marked as local, its sending account will be
-// whitelisted, preventing any associated transaction from being dropped out of
-// the pool due to pricing constraints.
-func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
+// whitelisted, preventing any associated transaction from being dropped out of the pool
+// due to pricing constraints.
+func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
+
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxMeter.Mark(1)
return false, err
}
+
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
@@ -685,7 +587,8 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.removeTx(tx.Hash(), false)
}
}
- // If the transaction is replacing an already pending one, do directly
+
+ // Try to replace an existing transaction in the pending pool
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
@@ -703,19 +606,17 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.all.Add(tx)
pool.priced.Put(tx)
pool.journalTx(from, tx)
-
+ pool.queueTxEvent(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
-
- // We've directly injected a replacement transaction, notify subsystems
- go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
-
return old != nil, nil
}
+
// New transaction isn't replacing a pending one, push into queue
- replace, err := pool.enqueueTx(hash, tx)
+ replaced, err = pool.enqueueTx(hash, tx)
if err != nil {
return false, err
}
+
// Mark local addresses and journal local transactions
if local {
if !pool.locals.contains(from) {
@@ -729,7 +630,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
- return replace, nil
+ return replaced, nil
}
// enqueueTx inserts a new transaction into the non-executable transaction queue.
@@ -817,94 +718,83 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
return true
}
-// AddLocal enqueues a single transaction into the pool if it is valid, marking
-// the sender as a local one in the mean time, ensuring it goes around the local
-// pricing constraints.
-func (pool *TxPool) AddLocal(tx *types.Transaction) error {
- return pool.addTx(tx, !pool.config.NoLocals)
-}
-
-// AddRemote enqueues a single transaction into the pool if it is valid. If the
-// sender is not among the locally tracked ones, full pricing constraints will
-// apply.
-func (pool *TxPool) AddRemote(tx *types.Transaction) error {
- return pool.addTx(tx, false)
+// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
+// senders as local ones, ensuring they go around the local pricing constraints.
+//
+// This method is used to add transactions from the RPC API and performs synchronous pool
+// reorganization and event propagation.
+func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, !pool.config.NoLocals, true)
}
-// AddLocals enqueues a batch of transactions into the pool if they are valid,
-// marking the senders as a local ones in the mean time, ensuring they go around
-// the local pricing constraints.
-func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
- return pool.addTxs(txs, !pool.config.NoLocals)
+// AddLocal enqueues a single local transaction into the pool if it is valid. This is
+// a convenience wrapper around AddLocals.
+func (pool *TxPool) AddLocal(tx *types.Transaction) error {
+ errs := pool.AddLocals([]*types.Transaction{tx})
+ return errs[0]
}
-// AddRemotes enqueues a batch of transactions into the pool if they are valid.
-// If the senders are not among the locally tracked ones, full pricing constraints
-// will apply.
+// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the
+// senders are not among the locally tracked ones, full pricing constraints will apply.
+//
+// This method is used to add transactions from the p2p network and does not wait for pool
+// reorganization and internal event propagation.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
- return pool.addTxs(txs, false)
+ return pool.addTxs(txs, false, false)
}
-// addTx enqueues a single transaction into the pool if it is valid.
-func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
- // Cache sender in transaction before obtaining lock (pool.signer is immutable)
- types.Sender(pool.signer, tx)
-
- pool.mu.Lock()
- defer pool.mu.Unlock()
+// addRemotesSync is like AddRemotes, but waits for pool reorganization. It is used in tests.
+func (pool *TxPool) addRemotesSync(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, false, true)
+}
- // Try to inject the transaction and update any state
- replace, err := pool.add(tx, local)
- if err != nil {
- return err
- }
- validMeter.Mark(1)
+// addRemoteSync is like AddRemotes with a single transaction, but waits for pool reorganization. It is used in tests.
+func (pool *TxPool) addRemoteSync(tx *types.Transaction) error {
+ errs := pool.addRemotesSync([]*types.Transaction{tx})
+ return errs[0]
+}
- // If we added a new transaction, run promotion checks and return
- if !replace {
- from, _ := types.Sender(pool.signer, tx) // already validated
- pool.promoteExecutables([]common.Address{from})
- }
- return nil
+// AddRemote enqueues a single transaction into the pool if it is valid. This is a convenience
+// wrapper around AddRemotes.
+//
+// Deprecated: Use AddRemotes instead.
+func (pool *TxPool) AddRemote(tx *types.Transaction) error {
+ errs := pool.AddRemotes([]*types.Transaction{tx})
+ return errs[0]
}
// addTxs attempts to queue a batch of transactions if they are valid.
-func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error {
+func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Cache senders in transactions before obtaining lock (pool.signer is immutable)
for _, tx := range txs {
types.Sender(pool.signer, tx)
}
+
pool.mu.Lock()
- defer pool.mu.Unlock()
+ errs, dirtyAddrs := pool.addTxsLocked(txs, local)
+ pool.mu.Unlock()
- return pool.addTxsLocked(txs, local)
+ done := pool.requestPromoteExecutables(dirtyAddrs)
+ if sync {
+ <-done
+ }
+ return errs
}
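
The sync flag is what distinguishes the two entry points: AddLocals (the RPC path) waits for the promotion round and event propagation, while AddRemotes (the p2p path) returns as soon as the transactions are queued. In sketch form:

	errs := pool.AddRemotes(txs) // p2p import: sync=false, returns without waiting
	errs = pool.AddLocals(txs)   // RPC submission: sync=true, blocks on the done channel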
-// addTxsLocked attempts to queue a batch of transactions if they are valid,
-// whilst assuming the transaction pool lock is already held.
-func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
- // Add the batch of transactions, tracking the accepted ones
- dirty := make(map[common.Address]struct{})
+// addTxsLocked attempts to queue a batch of transactions if they are valid.
+// The transaction pool lock must be held.
+func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
+ dirty := newAccountSet(pool.signer)
errs := make([]error, len(txs))
-
for i, tx := range txs {
- var replace bool
- if replace, errs[i] = pool.add(tx, local); errs[i] == nil && !replace {
- from, _ := types.Sender(pool.signer, tx) // already validated
- dirty[from] = struct{}{}
- }
- }
- validMeter.Mark(int64(len(dirty)))
-
- // Only reprocess the internal state if something was actually added
- if len(dirty) > 0 {
- addrs := make([]common.Address, 0, len(dirty))
- for addr := range dirty {
- addrs = append(addrs, addr)
+ replaced, err := pool.add(tx, local)
+ errs[i] = err
+ if err == nil && !replaced {
+ dirty.addTx(tx)
}
- pool.promoteExecutables(addrs)
}
- return errs
+ validMeter.Mark(int64(len(dirty.accounts)))
+ return errs, dirty
}
// Status returns the status (unknown/pending/queued) of a batch of transactions
@@ -927,8 +817,7 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
return status
}
-// Get returns a transaction if it is contained in the pool
-// and nil otherwise.
+// Get returns a transaction if it is contained in the pool and nil otherwise.
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
return pool.all.Get(hash)
}
@@ -984,20 +873,261 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
}
}
+// requestReset requests a pool reset to the new head block.
+// The returned channel is closed when the reset has occurred.
+func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
+ select {
+ case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
+ return <-pool.reorgDoneCh
+ case <-pool.reorgShutdownCh:
+ return pool.reorgShutdownCh
+ }
+}
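+
Returning pool.reorgShutdownCh in the shutdown case keeps the caller-side pattern uniform: a receive from a closed channel never blocks, so <-pool.requestReset(...) returns immediately once the pool has stopped instead of hanging. A self-contained illustration of that property:

	done := make(chan struct{})
	close(done)
	<-done // receiving from a closed channel returns at once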
+
+// requestPromoteExecutables requests transaction promotion checks for the given addresses.
+// The returned channel is closed when the promotion checks have occurred.
+func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
+ select {
+ case pool.reqPromoteCh <- set:
+ return <-pool.reorgDoneCh
+ case <-pool.reorgShutdownCh:
+ return pool.reorgShutdownCh
+ }
+}
+
+// queueTxEvent enqueues a transaction event to be sent in the next reorg run.
+func (pool *TxPool) queueTxEvent(tx *types.Transaction) {
+ select {
+ case pool.queueTxEventCh <- tx:
+ case <-pool.reorgShutdownCh:
+ }
+}
+
+// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
+// call those methods directly, but request them to be run through requestReset and
+// requestPromoteExecutables instead.
+func (pool *TxPool) scheduleReorgLoop() {
+ defer pool.wg.Done()
+
+ var (
+ curDone chan struct{} // non-nil while runReorg is active
+ nextDone = make(chan struct{})
+ launchNextRun bool
+ reset *txpoolResetRequest
+ dirtyAccounts *accountSet
+ queuedEvents = make(map[common.Address]*txSortedMap)
+ )
+ for {
+ // Launch next background reorg if needed
+ if curDone == nil && launchNextRun {
+ // Run the background reorg and announcements
+ go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
+
+ // Prepare everything for the next round of reorg
+ curDone, nextDone = nextDone, make(chan struct{})
+ launchNextRun = false
+
+ reset, dirtyAccounts = nil, nil
+ queuedEvents = make(map[common.Address]*txSortedMap)
+ }
+
+ select {
+ case req := <-pool.reqResetCh:
+ // Reset request: update the new head if a reset is already pending.
+ if reset == nil {
+ reset = req
+ } else {
+ reset.newHead = req.newHead
+ }
+ launchNextRun = true
+ pool.reorgDoneCh <- nextDone
+
+ case req := <-pool.reqPromoteCh:
+ // Promote request: update address set if request is already pending.
+ if dirtyAccounts == nil {
+ dirtyAccounts = req
+ } else {
+ dirtyAccounts.merge(req)
+ }
+ launchNextRun = true
+ pool.reorgDoneCh <- nextDone
+
+ case tx := <-pool.queueTxEventCh:
+ // Queue up the event, but don't schedule a reorg. It's up to the caller to
+ // request one later if they want the events sent.
+ addr, _ := types.Sender(pool.signer, tx)
+ if _, ok := queuedEvents[addr]; !ok {
+ queuedEvents[addr] = newTxSortedMap()
+ }
+ queuedEvents[addr].Put(tx)
+
+ case <-curDone:
+ curDone = nil
+
+ case <-pool.reorgShutdownCh:
+ // Wait for current run to finish.
+ if curDone != nil {
+ <-curDone
+ }
+ close(nextDone)
+ return
+ }
+ }
+}
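+
While one runReorg is in flight, the loop coalesces further requests: reset heads are collapsed into one, dirty-account sets are merged, and every requester is handed the same nextDone channel for the next run. A stripped-down, self-contained sketch of this pattern (all names hypothetical, not pool code):

	package main

	import "fmt"

	// scheduler coalesces incoming requests into batched background runs,
	// mirroring the shape of scheduleReorgLoop. All names are hypothetical.
	type scheduler struct {
		reqCh  chan int           // incoming requests
		doneCh chan chan struct{} // hands each requester the next run's done channel
		quitCh chan struct{}      // closed to request shutdown
	}

	func (s *scheduler) loop() {
		var (
			curDone  chan struct{} // non-nil while a background run is active
			nextDone = make(chan struct{})
			pending  []int
		)
		for {
			// Launch the next background run if one is needed and none is active.
			if curDone == nil && len(pending) > 0 {
				go func(done chan struct{}, batch []int) {
					fmt.Println("running coalesced batch of", len(batch))
					close(done) // wake every requester waiting on this run
				}(nextDone, pending)
				curDone, nextDone = nextDone, make(chan struct{})
				pending = nil
			}
			select {
			case req := <-s.reqCh:
				pending = append(pending, req) // merge requests while a run is active
				s.doneCh <- nextDone
			case <-curDone:
				curDone = nil
			case <-s.quitCh:
				if curDone != nil {
					<-curDone // wait for the active run to finish
				}
				close(nextDone) // release requesters queued for the next run
				return
			}
		}
	}

	func main() {
		s := &scheduler{
			reqCh:  make(chan int),
			doneCh: make(chan chan struct{}),
			quitCh: make(chan struct{}),
		}
		go s.loop()
		s.reqCh <- 1 // submit a request
		done := <-s.doneCh
		<-done          // wait for the run that includes it
		close(s.quitCh) // shut the loop down
	}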
+
+// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
+func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
+ defer close(done)
+
+ var promoteAddrs []common.Address
+ if dirtyAccounts != nil {
+ promoteAddrs = dirtyAccounts.flatten()
+ }
+ pool.mu.Lock()
+ if reset != nil {
+ // Reset from the old head to the new, rescheduling any reorged transactions
+ pool.reset(reset.oldHead, reset.newHead)
+
+ // Nonces were reset, discard any events that became stale
+ for addr := range events {
+ events[addr].Forward(pool.pendingState.GetNonce(addr))
+ if events[addr].Len() == 0 {
+ delete(events, addr)
+ }
+ }
+ // Reset needs promote for all addresses
+ promoteAddrs = promoteAddrs[:0]
+ for addr := range pool.queue {
+ promoteAddrs = append(promoteAddrs, addr)
+ }
+ }
+ // Check for pending transactions for every account that sent new ones
+ promoted := pool.promoteExecutables(promoteAddrs)
+ for _, tx := range promoted {
+ addr, _ := types.Sender(pool.signer, tx)
+ if _, ok := events[addr]; !ok {
+ events[addr] = newTxSortedMap()
+ }
+ events[addr].Put(tx)
+ }
+ // If a new block appeared, validate the pool of pending transactions. This will
+ // remove any transaction that has been included in the block or was invalidated
+ // because of another transaction (e.g. higher gas price).
+ if reset != nil {
+ pool.demoteUnexecutables()
+ }
+ // Ensure pool.queue and pool.pending sizes stay within the configured limits.
+ pool.truncatePending()
+ pool.truncateQueue()
+
+ // Update all accounts to the latest known pending nonce
+ for addr, list := range pool.pending {
+ txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
+ pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
+ }
+ pool.mu.Unlock()
+
+ // Notify subsystems for newly added transactions
+ if len(events) > 0 {
+ var txs []*types.Transaction
+ for _, set := range events {
+ txs = append(txs, set.Flatten()...)
+ }
+ pool.txFeed.Send(NewTxsEvent{txs})
+ }
+}
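+
Event delivery now happens once per reorg run, after pool.mu is released, instead of via per-transaction go pool.txFeed.Send(...) calls scattered through the insert paths. Consumers observe the batched announcements through the usual subscription; a minimal sketch, assuming the pool's existing SubscribeNewTxsEvent accessor:

	ch := make(chan NewTxsEvent, 16)
	sub := pool.SubscribeNewTxsEvent(ch)
	defer sub.Unsubscribe()
	for ev := range ch {
		fmt.Println("announced", len(ev.Txs), "transactions")
	}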
+
+// reset retrieves the current state of the blockchain and ensures the content
+// of the transaction pool is valid with regard to the chain state.
+func (pool *TxPool) reset(oldHead, newHead *types.Header) {
+ // If we're reorging an old state, reinject all dropped transactions
+ var reinject types.Transactions
+
+ if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
+ // If the reorg is too deep, avoid doing it (will happen during fast sync)
+ oldNum := oldHead.Number.Uint64()
+ newNum := newHead.Number.Uint64()
+
+ if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
+ log.Debug("Skipping deep transaction reorg", "depth", depth)
+ } else {
+ // Reorg seems shallow enough to pull in all transactions into memory
+ var discarded, included types.Transactions
+ var (
+ rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
+ add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
+ )
+ if rem == nil {
+ // This can happen if a setHead is performed, where we simply discard the old
+ // head from the chain.
+ // If that is the case, we don't have the lost transactions any more, and
+ // there's nothing to add
+ if newNum < oldNum {
+ // If the reorg ended up on a lower number, it's indicative of setHead being the cause
+ log.Debug("Skipping transaction reset caused by setHead",
+ "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
+ } else {
+ // If we reorged to a same or higher number, then it's not a case of setHead
+ log.Warn("Transaction pool reset with missing oldhead",
+ "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
+ }
+ return
+ }
+ for rem.NumberU64() > add.NumberU64() {
+ discarded = append(discarded, rem.Transactions()...)
+ if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
+ log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
+ return
+ }
+ }
+ for add.NumberU64() > rem.NumberU64() {
+ included = append(included, add.Transactions()...)
+ if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
+ log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
+ return
+ }
+ }
+ for rem.Hash() != add.Hash() {
+ discarded = append(discarded, rem.Transactions()...)
+ if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
+ log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
+ return
+ }
+ included = append(included, add.Transactions()...)
+ if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
+ log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
+ return
+ }
+ }
+ reinject = types.TxDifference(discarded, included)
+ }
+ }
+ // Initialize the internal state to the current head
+ if newHead == nil {
+ newHead = pool.chain.CurrentBlock().Header() // Special case during testing
+ }
+ statedb, err := pool.chain.StateAt(newHead.Root)
+ if err != nil {
+ log.Error("Failed to reset txpool state", "err", err)
+ return
+ }
+ pool.currentState = statedb
+ pool.pendingState = state.ManageState(statedb)
+ pool.currentMaxGas = newHead.GasLimit
+
+ // Inject any transactions discarded due to reorgs
+ log.Debug("Reinjecting stale transactions", "count", len(reinject))
+ senderCacher.recover(pool.signer, reinject)
+ pool.addTxsLocked(reinject, false)
+}
+
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
-func (pool *TxPool) promoteExecutables(accounts []common.Address) {
+func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
- // Gather all the accounts potentially needing updates
- if accounts == nil {
- accounts = make([]common.Address, 0, len(pool.queue))
- for addr := range pool.queue {
- accounts = append(accounts, addr)
- }
- }
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
list := pool.queue[addr]
@@ -1053,69 +1183,46 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
delete(pool.queue, addr)
}
}
- // Notify subsystem for new promoted transactions.
- if len(promoted) > 0 {
- go pool.txFeed.Send(NewTxsEvent{promoted})
- }
- // If the pending limit is overflown, start equalizing allowances
+ return promoted
+}
+
+// truncatePending removes transactions from the pending queue if the pool is above the
+// pending limit. The algorithm tries to reduce transaction counts by an approximately
+// equal number for all accounts with many pending transactions.
+func (pool *TxPool) truncatePending() {
pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
}
- if pending > pool.config.GlobalSlots {
- pendingBeforeCap := pending
- // Assemble a spam order to penalize large transactors first
- spammers := prque.New(nil)
- for addr, list := range pool.pending {
- // Only evict transactions from high rollers
- if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
- spammers.Push(addr, int64(list.Len()))
- }
- }
- // Gradually drop transactions from offenders
- offenders := []common.Address{}
- for pending > pool.config.GlobalSlots && !spammers.Empty() {
- // Retrieve the next offender if not local address
- offender, _ := spammers.Pop()
- offenders = append(offenders, offender.(common.Address))
-
- // Equalize balances until all the same or below threshold
- if len(offenders) > 1 {
- // Calculate the equalization threshold for all current offenders
- threshold := pool.pending[offender.(common.Address)].Len()
-
- // Iteratively reduce all offenders until below limit or threshold reached
- for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
- for i := 0; i < len(offenders)-1; i++ {
- list := pool.pending[offenders[i]]
-
- caps := list.Cap(list.Len() - 1)
- for _, tx := range caps {
- // Drop the transaction from the global pools too
- hash := tx.Hash()
- pool.all.Remove(hash)
-
- // Update the account nonce to the dropped transaction
- if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
- pool.pendingState.SetNonce(offenders[i], nonce)
- }
- log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
- }
- pool.priced.Removed(len(caps))
- pendingCounter.Dec(int64(len(caps)))
- if pool.locals.contains(offenders[i]) {
- localCounter.Dec(int64(len(caps)))
- }
- pending--
- }
- }
- }
+ if pending <= pool.config.GlobalSlots {
+ return
+ }
+
+ pendingBeforeCap := pending
+ // Assemble a spam order to penalize large transactors first
+ spammers := prque.New(nil)
+ for addr, list := range pool.pending {
+ // Only evict transactions from high rollers
+ if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
+ spammers.Push(addr, int64(list.Len()))
}
- // If still above threshold, reduce to limit or min allowance
- if pending > pool.config.GlobalSlots && len(offenders) > 0 {
- for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
- for _, addr := range offenders {
- list := pool.pending[addr]
+ }
+ // Gradually drop transactions from offenders
+ offenders := []common.Address{}
+ for pending > pool.config.GlobalSlots && !spammers.Empty() {
+ // Retrieve the next offender if not local address
+ offender, _ := spammers.Pop()
+ offenders = append(offenders, offender.(common.Address))
+
+ // Equalize balances until all the same or below threshold
+ if len(offenders) > 1 {
+ // Calculate the equalization threshold for all current offenders
+ threshold := pool.pending[offender.(common.Address)].Len()
+
+ // Iteratively reduce all offenders until below limit or threshold reached
+ for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
+ for i := 0; i < len(offenders)-1; i++ {
+ list := pool.pending[offenders[i]]
caps := list.Cap(list.Len() - 1)
for _, tx := range caps {
@@ -1124,60 +1231,93 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
pool.all.Remove(hash)
// Update the account nonce to the dropped transaction
- if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
- pool.pendingState.SetNonce(addr, nonce)
+ if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
+ pool.pendingState.SetNonce(offenders[i], nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pool.priced.Removed(len(caps))
pendingCounter.Dec(int64(len(caps)))
- if pool.locals.contains(addr) {
+ if pool.locals.contains(offenders[i]) {
localCounter.Dec(int64(len(caps)))
}
pending--
}
}
}
- pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
}
- // If we've queued more transactions than the hard limit, drop oldest ones
+
+ // If still above threshold, reduce to limit or min allowance
+ if pending > pool.config.GlobalSlots && len(offenders) > 0 {
+ for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
+ for _, addr := range offenders {
+ list := pool.pending[addr]
+
+ caps := list.Cap(list.Len() - 1)
+ for _, tx := range caps {
+ // Drop the transaction from the global pools too
+ hash := tx.Hash()
+ pool.all.Remove(hash)
+
+ // Update the account nonce to the dropped transaction
+ if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
+ pool.pendingState.SetNonce(addr, nonce)
+ }
+ log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
+ }
+ pool.priced.Removed(len(caps))
+ pendingCounter.Dec(int64(len(caps)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(caps)))
+ }
+ pending--
+ }
+ }
+ }
+ pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
+}
+
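truncatePending keeps the old two-phase behavior: first the largest offenders are leveled down toward each other, then all offenders are trimmed round-robin until the pool fits. A self-contained sketch of that equalization on plain counts (accounts and limits here are hypothetical, not pool code):

	package main

	import (
		"fmt"
		"sort"
	)

	// equalize mimics truncatePending's strategy: level the biggest offenders
	// against each other, then cap all offenders round-robin.
	func equalize(counts map[string]int, globalSlots, accountSlots int) {
		total := 0
		for _, n := range counts {
			total += n
		}
		if total <= globalSlots {
			return
		}
		// Offenders are accounts above the per-account allowance, largest first.
		var offenders []string
		for addr, n := range counts {
			if n > accountSlots {
				offenders = append(offenders, addr)
			}
		}
		sort.Slice(offenders, func(i, j int) bool { return counts[offenders[i]] > counts[offenders[j]] })

		// Phase 1: reduce earlier offenders down to each new offender's size.
		for k := 1; k < len(offenders) && total > globalSlots; k++ {
			threshold := counts[offenders[k]]
			for total > globalSlots && counts[offenders[k-1]] > threshold {
				for i := 0; i < k; i++ {
					counts[offenders[i]]--
					total--
				}
			}
		}
		// Phase 2: if still over the limit, trim every offender toward accountSlots.
		for len(offenders) > 0 && total > globalSlots && counts[offenders[len(offenders)-1]] > accountSlots {
			for _, addr := range offenders {
				counts[addr]--
				total--
			}
		}
	}

	func main() {
		counts := map[string]int{"alice": 40, "bob": 30, "carol": 10}
		equalize(counts, 60, 16)
		fmt.Println(counts) // map[alice:25 bob:25 carol:10]
	}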
+// truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit.
+func (pool *TxPool) truncateQueue() {
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
- if queued > pool.config.GlobalQueue {
- // Sort all accounts with queued transactions by heartbeat
- addresses := make(addressesByHeartbeat, 0, len(pool.queue))
- for addr := range pool.queue {
- if !pool.locals.contains(addr) { // don't drop locals
- addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
- }
+ if queued <= pool.config.GlobalQueue {
+ return
+ }
+
+ // Sort all accounts with queued transactions by heartbeat
+ addresses := make(addressesByHeartbeat, 0, len(pool.queue))
+ for addr := range pool.queue {
+ if !pool.locals.contains(addr) { // don't drop locals
+ addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
- sort.Sort(addresses)
+ }
+ sort.Sort(addresses)
- // Drop transactions until the total is below the limit or only locals remain
- for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
- addr := addresses[len(addresses)-1]
- list := pool.queue[addr.address]
+ // Drop transactions until the total is below the limit or only locals remain
+ for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
+ addr := addresses[len(addresses)-1]
+ list := pool.queue[addr.address]
- addresses = addresses[:len(addresses)-1]
+ addresses = addresses[:len(addresses)-1]
- // Drop all transactions if they are less than the overflow
- if size := uint64(list.Len()); size <= drop {
- for _, tx := range list.Flatten() {
- pool.removeTx(tx.Hash(), true)
- }
- drop -= size
- queuedRateLimitMeter.Mark(int64(size))
- continue
- }
- // Otherwise drop only last few transactions
- txs := list.Flatten()
- for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
- pool.removeTx(txs[i].Hash(), true)
- drop--
- queuedRateLimitMeter.Mark(1)
+ // Drop all transactions if they are less than the overflow
+ if size := uint64(list.Len()); size <= drop {
+ for _, tx := range list.Flatten() {
+ pool.removeTx(tx.Hash(), true)
}
+ drop -= size
+ queuedRateLimitMeter.Mark(int64(size))
+ continue
+ }
+ // Otherwise drop only last few transactions
+ txs := list.Flatten()
+ for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
+ pool.removeTx(txs[i].Hash(), true)
+ drop--
+ queuedRateLimitMeter.Mark(1)
}
}
}
@@ -1224,7 +1364,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Error("Demoting invalidated transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
- pendingCounter.Inc(int64(len(gapped)))
+ pendingCounter.Dec(int64(len(gapped)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
@@ -1256,11 +1396,15 @@ type accountSet struct {
// newAccountSet creates a new address set with an associated signer for sender
// derivations.
-func newAccountSet(signer types.Signer) *accountSet {
- return &accountSet{
+func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
+ as := &accountSet{
accounts: make(map[common.Address]struct{}),
signer: signer,
}
+ for _, addr := range addrs {
+ as.add(addr)
+ }
+ return as
}
// contains checks if a given address is contained within the set.
@@ -1284,6 +1428,13 @@ func (as *accountSet) add(addr common.Address) {
as.cache = nil
}
+// addTx adds the sender of tx into the set.
+func (as *accountSet) addTx(tx *types.Transaction) {
+ if addr, err := types.Sender(as.signer, tx); err == nil {
+ as.add(addr)
+ }
+}
+
// flatten returns the list of addresses within this set, also caching it for later
// reuse. The returned slice should not be changed!
func (as *accountSet) flatten() []common.Address {
@@ -1297,6 +1448,14 @@ func (as *accountSet) flatten() []common.Address {
return *as.cache
}
+// merge adds all addresses from the 'other' set into 'as'.
+func (as *accountSet) merge(other *accountSet) {
+ for addr := range other.accounts {
+ as.accounts[addr] = struct{}{}
+ }
+ as.cache = nil
+}
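+
Together with the variadic constructor and addTx, merge lets addTxsLocked build a dirty-sender set cheaply and hand it to the reorg loop. A short usage sketch (signer, tx and from assumed in scope):

	dirty := newAccountSet(pool.signer)           // empty set bound to a signer
	dirty.addTx(tx)                               // add tx's sender; recovery errors are ignored
	dirty.merge(newAccountSet(pool.signer, from)) // union another set; resets the cache
	addrs := dirty.flatten()                      // cached slice; callers must not mutate it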
+
// txLookup is used internally by TxPool to track transactions while allowing lookup without
// mutex contention.
//
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 50c73cf53..7f1832a43 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -200,7 +200,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
t.Fatalf("Invalid nonce, want 0, got %d", nonce)
}
- pool.AddRemotes(types.Transactions{tx0, tx1})
+ pool.addRemotesSync([]*types.Transaction{tx0, tx1})
nonce = pool.State().GetNonce(address)
if nonce != 2 {
@@ -209,8 +209,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
// trigger state change in the background
trigger = true
-
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
_, err := pool.Pending()
if err != nil {
@@ -268,10 +267,10 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction(0, 100, key)
from, _ := deriveSender(tx)
pool.currentState.AddBalance(from, big.NewInt(1000))
- pool.lockedReset(nil, nil)
- pool.enqueueTx(tx.Hash(), tx)
+ <-pool.requestReset(nil, nil)
- pool.promoteExecutables([]common.Address{from})
+ pool.enqueueTx(tx.Hash(), tx)
+ <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending))
}
@@ -280,33 +279,36 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx)
pool.currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx)
- pool.promoteExecutables([]common.Address{from})
+
+ <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool")
}
-
if len(pool.queue) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue))
}
+}
- pool, key = setupTxPool()
+func TestTransactionQueue2(t *testing.T) {
+ t.Parallel()
+
+ pool, key := setupTxPool()
defer pool.Stop()
tx1 := transaction(0, 100, key)
tx2 := transaction(10, 100, key)
tx3 := transaction(11, 100, key)
- from, _ = deriveSender(tx1)
+ from, _ := deriveSender(tx1)
pool.currentState.AddBalance(from, big.NewInt(1000))
- pool.lockedReset(nil, nil)
+ pool.reset(nil, nil)
pool.enqueueTx(tx1.Hash(), tx1)
pool.enqueueTx(tx2.Hash(), tx2)
pool.enqueueTx(tx3.Hash(), tx3)
pool.promoteExecutables([]common.Address{from})
-
if len(pool.pending) != 1 {
- t.Error("expected tx pool to be 1, got", len(pool.pending))
+ t.Error("expected pending length to be 1, got", len(pool.pending))
}
if pool.queue[from].Len() != 2 {
t.Error("expected len(queue) == 2, got", pool.queue[from].Len())
@@ -339,7 +341,7 @@ func TestTransactionChainFork(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000))
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
}
resetState()
@@ -368,7 +370,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000))
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
}
resetState()
@@ -384,16 +386,17 @@ func TestTransactionDoubleNonce(t *testing.T) {
if replace, err := pool.add(tx2, false); err != nil || !replace {
t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
}
- pool.promoteExecutables([]common.Address{addr})
+ <-pool.requestPromoteExecutables(newAccountSet(signer, addr))
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() {
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
}
+
// Add the third transaction and ensure it's not saved (smaller price)
pool.add(tx3, false)
- pool.promoteExecutables([]common.Address{addr})
+ <-pool.requestPromoteExecutables(newAccountSet(signer, addr))
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
@@ -439,7 +442,7 @@ func TestTransactionNonceRecovery(t *testing.T) {
addr := crypto.PubkeyToAddress(key.PublicKey)
pool.currentState.SetNonce(addr, n)
pool.currentState.AddBalance(addr, big.NewInt(100000000000000))
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
tx := transaction(n, 100000, key)
if err := pool.AddRemote(tx); err != nil {
@@ -447,7 +450,7 @@ func TestTransactionNonceRecovery(t *testing.T) {
}
// simulate some weird re-order of transactions and missing nonce(s)
pool.currentState.SetNonce(addr, n-1)
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
if fn := pool.pendingState.GetNonce(addr); fn != n-1 {
t.Errorf("expected nonce to be %d, got %d", n-1, fn)
}
@@ -491,7 +494,7 @@ func TestTransactionDropping(t *testing.T) {
if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
}
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
if pool.pending[account].Len() != 3 {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
}
@@ -503,7 +506,7 @@ func TestTransactionDropping(t *testing.T) {
}
// Reduce the balance of the account, and check that invalidated transactions are dropped
pool.currentState.AddBalance(account, big.NewInt(-650))
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
@@ -528,7 +531,7 @@ func TestTransactionDropping(t *testing.T) {
}
// Reduce the block gas limit, check that invalidated transactions are dropped
pool.chain.(*testBlockChain).gasLimit = 100
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
@@ -584,7 +587,7 @@ func TestTransactionPostponing(t *testing.T) {
txs = append(txs, tx)
}
}
- for i, err := range pool.AddRemotes(txs) {
+ for i, err := range pool.addRemotesSync(txs) {
if err != nil {
t.Fatalf("tx %d: failed to add transactions: %v", i, err)
}
@@ -599,7 +602,7 @@ func TestTransactionPostponing(t *testing.T) {
if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
}
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) {
t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs))
}
@@ -613,7 +616,7 @@ func TestTransactionPostponing(t *testing.T) {
for _, addr := range accs {
pool.currentState.AddBalance(addr, big.NewInt(-1))
}
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
// The first account's first transaction remains valid, check that subsequent
// ones are either filtered out, or queued up for later.
@@ -680,12 +683,10 @@ func TestTransactionGapFilling(t *testing.T) {
defer sub.Unsubscribe()
// Create a pending and a queued transaction with a nonce-gap in between
- if err := pool.AddRemote(transaction(0, 100000, key)); err != nil {
- t.Fatalf("failed to add pending transaction: %v", err)
- }
- if err := pool.AddRemote(transaction(2, 100000, key)); err != nil {
- t.Fatalf("failed to add queued transaction: %v", err)
- }
+ pool.addRemotesSync([]*types.Transaction{
+ transaction(0, 100000, key),
+ transaction(2, 100000, key),
+ })
pending, queued := pool.Stats()
if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
@@ -700,7 +701,7 @@ func TestTransactionGapFilling(t *testing.T) {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Fill the nonce gap and ensure all transactions become pending
- if err := pool.AddRemote(transaction(1, 100000, key)); err != nil {
+ if err := pool.addRemoteSync(transaction(1, 100000, key)); err != nil {
t.Fatalf("failed to add gapped transaction: %v", err)
}
pending, queued = pool.Stats()
@@ -732,7 +733,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ {
- if err := pool.AddRemote(transaction(i, 100000, key)); err != nil {
+ if err := pool.addRemoteSync(transaction(i, 100000, key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
if len(pool.pending) != 0 {
@@ -799,7 +800,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
nonces[addr]++
}
// Import the batch and verify that limits have been enforced
- pool.AddRemotes(txs)
+ pool.addRemotesSync(txs)
queued := 0
for addr, list := range pool.queue {
@@ -932,7 +933,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
- if err := pool.AddRemote(transaction(i, 100000, key)); err != nil {
+ if err := pool.addRemoteSync(transaction(i, 100000, key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
if pool.pending[account].Len() != int(i)+1 {
@@ -953,57 +954,6 @@ func TestTransactionPendingLimiting(t *testing.T) {
}
}
-// Tests that the transaction limits are enforced the same way irrelevant whether
-// the transactions are added one by one or in batches.
-func TestTransactionQueueLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 1) }
-func TestTransactionPendingLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 0) }
-
-func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
- t.Parallel()
-
- // Add a batch of transactions to a pool one by one
- pool1, key1 := setupTxPool()
- defer pool1.Stop()
-
- account1, _ := deriveSender(transaction(0, 0, key1))
- pool1.currentState.AddBalance(account1, big.NewInt(1000000))
-
- for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
- if err := pool1.AddRemote(transaction(origin+i, 100000, key1)); err != nil {
- t.Fatalf("tx %d: failed to add transaction: %v", i, err)
- }
- }
- // Add a batch of transactions to a pool in one big batch
- pool2, key2 := setupTxPool()
- defer pool2.Stop()
-
- account2, _ := deriveSender(transaction(0, 0, key2))
- pool2.currentState.AddBalance(account2, big.NewInt(1000000))
-
- txs := []*types.Transaction{}
- for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
- txs = append(txs, transaction(origin+i, 100000, key2))
- }
- pool2.AddRemotes(txs)
-
- // Ensure the batch optimization honors the same pool mechanics
- if len(pool1.pending) != len(pool2.pending) {
- t.Errorf("pending transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.pending), len(pool2.pending))
- }
- if len(pool1.queue) != len(pool2.queue) {
- t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue), len(pool2.queue))
- }
- if pool1.all.Count() != pool2.all.Count() {
- t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", pool1.all.Count(), pool2.all.Count())
- }
- if err := validateTxPoolInternals(pool1); err != nil {
- t.Errorf("pool 1 internal state corrupted: %v", err)
- }
- if err := validateTxPoolInternals(pool2); err != nil {
- t.Errorf("pool 2 internal state corrupted: %v", err)
- }
-}
-
// Tests that if the transaction count belonging to multiple accounts go above
// some hard threshold, the higher transactions are dropped to prevent DOS
// attacks.
@@ -1038,7 +988,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
}
}
// Import the batch and verify that limits have been enforced
- pool.AddRemotes(txs)
+ pool.addRemotesSync(txs)
pending := 0
for _, list := range pool.pending {
@@ -1118,7 +1068,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
}
}
// Import the batch and verify that limits have been enforced
- pool.AddRemotes(txs)
+ pool.addRemotesSync(txs)
for addr, list := range pool.pending {
if list.Len() != int(config.AccountSlots) {
@@ -1174,7 +1124,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3])
// Import the batch and that both pending and queued transactions match up
- pool.AddRemotes(txs)
+ pool.addRemotesSync(txs)
pool.AddLocal(ltx)
pending, queued := pool.Stats()
@@ -1454,7 +1404,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
for i := uint64(0); i < config.GlobalSlots; i++ {
txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0]))
}
- pool.AddRemotes(txs)
+ pool.addRemotesSync(txs)
pending, queued := pool.Stats()
if pending != int(config.GlobalSlots) {
@@ -1470,7 +1420,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap
- if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
+ if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
t.Fatalf("failed to add well priced transaction: %v", err)
}
pending, queued = pool.Stats()
@@ -1513,7 +1463,7 @@ func TestTransactionReplacement(t *testing.T) {
price := int64(100)
threshold := (price * (100 + int64(testTxPoolConfig.PriceBump))) / 100
- if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil {
+ if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original cheap pending transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(1), key)); err != ErrReplaceUnderpriced {
@@ -1526,7 +1476,7 @@ func TestTransactionReplacement(t *testing.T) {
t.Fatalf("cheap replacement event firing failed: %v", err)
}
- if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil {
+ if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil {
t.Fatalf("failed to add original proper pending transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(threshold-1), key)); err != ErrReplaceUnderpriced {
@@ -1538,6 +1488,7 @@ func TestTransactionReplacement(t *testing.T) {
if err := validateEvents(events, 2); err != nil {
t.Fatalf("proper replacement event firing failed: %v", err)
}
+
// Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original cheap queued transaction: %v", err)
@@ -1615,7 +1566,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
- if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil {
+ if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
pending, queued := pool.Stats()
@@ -1653,7 +1604,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
}
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
- pool.lockedReset(nil, nil)
+ <-pool.requestReset(nil, nil)
time.Sleep(2 * config.Rejournal)
pool.Stop()
@@ -1707,7 +1658,7 @@ func TestTransactionStatusCheck(t *testing.T) {
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(1), keys[2])) // Queued only
// Import the transaction and ensure they are correctly added
- pool.AddRemotes(txs)
+ pool.addRemotesSync(txs)
pending, queued := pool.Stats()
if pending != 2 {
@@ -1786,26 +1737,6 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
}
}
-// Benchmarks the speed of iterative transaction insertion.
-func BenchmarkPoolInsert(b *testing.B) {
- // Generate a batch of transactions to enqueue into the pool
- pool, key := setupTxPool()
- defer pool.Stop()
-
- account, _ := deriveSender(transaction(0, 0, key))
- pool.currentState.AddBalance(account, big.NewInt(1000000))
-
- txs := make(types.Transactions, b.N)
- for i := 0; i < b.N; i++ {
- txs[i] = transaction(uint64(i), 100000, key)
- }
- // Benchmark importing the transactions into the queue
- b.ResetTimer()
- for _, tx := range txs {
- pool.AddRemote(tx)
- }
-}
-
// Benchmarks the speed of batched transaction insertion.
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) }
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) }