aboutsummaryrefslogtreecommitdiffstats
path: root/core/tx_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r--core/tx_pool.go581
1 files changed, 317 insertions, 264 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 596356377..f8b11a7ce 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -45,8 +45,11 @@ var (
ErrNegativeValue = errors.New("Negative value")
)
-const (
- maxQueued = 64 // max limit of queued txs per address
+var (
+ maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
+ maxQueuedInTotal = uint64(65536) // Max limit of queued transactions from all accounts
+ maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
+ evictionInterval = time.Minute // Time interval to check for evictable transactions
)
type stateFn func() (*state.StateDB, error)
@@ -68,10 +71,14 @@ type TxPool struct {
events event.Subscription
localTx *txSet
mu sync.RWMutex
- pending map[common.Hash]*types.Transaction // processable transactions
- queue map[common.Address]map[common.Hash]*types.Transaction
- wg sync.WaitGroup // for shutdown sync
+ pending map[common.Address]*txList // All currently processable transactions
+ queue map[common.Address]*txList // Queued but non-processable transactions
+ all map[common.Hash]*types.Transaction // All transactions to allow lookups
+ beats map[common.Address]time.Time // Last heartbeat from each known account
+
+ wg sync.WaitGroup // for shutdown sync
+ quit chan struct{}
homestead bool
}
@@ -79,8 +86,10 @@ type TxPool struct {
func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
pool := &TxPool{
config: config,
- pending: make(map[common.Hash]*types.Transaction),
- queue: make(map[common.Address]map[common.Hash]*types.Transaction),
+ pending: make(map[common.Address]*txList),
+ queue: make(map[common.Address]*txList),
+ all: make(map[common.Hash]*types.Transaction),
+ beats: make(map[common.Address]time.Time),
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
@@ -88,10 +97,12 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
pendingState: nil,
localTx: newTxSet(),
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
+ quit: make(chan struct{}),
}
- pool.wg.Add(1)
+ pool.wg.Add(2)
go pool.eventLoop()
+ go pool.expirationLoop()
return pool
}
@@ -117,7 +128,7 @@ func (pool *TxPool) eventLoop() {
pool.minGasPrice = ev.Price
pool.mu.Unlock()
case RemovedTransactionEvent:
- pool.AddTransactions(ev.Txs)
+ pool.AddBatch(ev.Txs)
}
}
}
@@ -125,12 +136,12 @@ func (pool *TxPool) eventLoop() {
func (pool *TxPool) resetState() {
currentState, err := pool.currentState()
if err != nil {
- glog.V(logger.Info).Infoln("failed to get current state: %v", err)
+ glog.V(logger.Error).Infof("Failed to get current state: %v", err)
return
}
managedState := state.ManageState(currentState)
if err != nil {
- glog.V(logger.Info).Infoln("failed to get managed state: %v", err)
+ glog.V(logger.Error).Infof("Failed to get managed state: %v", err)
return
}
pool.pendingState = managedState
@@ -139,26 +150,21 @@ func (pool *TxPool) resetState() {
// any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g.
// higher gas price)
- pool.validatePool()
-
- // Loop over the pending transactions and base the nonce of the new
- // pending transaction set.
- for _, tx := range pool.pending {
- if addr, err := tx.From(); err == nil {
- // Set the nonce. Transaction nonce can never be lower
- // than the state nonce; validatePool took care of that.
- if pool.pendingState.GetNonce(addr) <= tx.Nonce() {
- pool.pendingState.SetNonce(addr, tx.Nonce()+1)
- }
- }
+ pool.demoteUnexecutables()
+
+ // Update all accounts to the latest known pending nonce
+ for addr, list := range pool.pending {
+ txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
+ pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
}
// Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid
- pool.checkQueue()
+ pool.promoteExecutables()
}
func (pool *TxPool) Stop() {
pool.events.Unsubscribe()
+ close(pool.quit)
pool.wg.Wait()
glog.V(logger.Info).Infoln("Transaction pool stopped")
}
@@ -170,47 +176,58 @@ func (pool *TxPool) State() *state.ManagedState {
return pool.pendingState
}
+// Stats retrieves the current pool stats, namely the number of pending and the
+// number of queued (non-executable) transactions.
func (pool *TxPool) Stats() (pending int, queued int) {
pool.mu.RLock()
defer pool.mu.RUnlock()
- pending = len(pool.pending)
- for _, txs := range pool.queue {
- queued += len(txs)
+ for _, list := range pool.pending {
+ pending += list.Len()
+ }
+ for _, list := range pool.queue {
+ queued += list.Len()
}
return
}
// Content retrieves the data content of the transaction pool, returning all the
-// pending as well as queued transactions, grouped by account and nonce.
-func (pool *TxPool) Content() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) {
+// pending as well as queued transactions, grouped by account and sorted by nonce.
+func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
pool.mu.RLock()
defer pool.mu.RUnlock()
- // Retrieve all the pending transactions and sort by account and by nonce
- pending := make(map[common.Address]map[uint64][]*types.Transaction)
- for _, tx := range pool.pending {
- account, _ := tx.From()
-
- owned, ok := pending[account]
- if !ok {
- owned = make(map[uint64][]*types.Transaction)
- pending[account] = owned
- }
- owned[tx.Nonce()] = append(owned[tx.Nonce()], tx)
- }
- // Retrieve all the queued transactions and sort by account and by nonce
- queued := make(map[common.Address]map[uint64][]*types.Transaction)
- for account, txs := range pool.queue {
- owned := make(map[uint64][]*types.Transaction)
- for _, tx := range txs {
- owned[tx.Nonce()] = append(owned[tx.Nonce()], tx)
- }
- queued[account] = owned
+ pending := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.pending {
+ pending[addr] = list.Flatten()
+ }
+ queued := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.queue {
+ queued[addr] = list.Flatten()
}
return pending, queued
}
+// Pending retrieves all currently processable transactions, groupped by origin
+// account and sorted by nonce. The returned transaction set is a copy and can be
+// freely modified by calling code.
+func (pool *TxPool) Pending() map[common.Address]types.Transactions {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ // check queue first
+ pool.promoteExecutables()
+
+ // invalidate any txs
+ pool.demoteUnexecutables()
+
+ pending := make(map[common.Address]types.Transactions)
+ for addr, list := range pool.pending {
+ pending[addr] = list.Flatten()
+ }
+ return pending
+}
+
// SetLocal marks a transaction as local, skipping gas price
// check against local miner minimum in the future
func (pool *TxPool) SetLocal(tx *types.Transaction) {
@@ -276,312 +293,348 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
return nil
}
-// validate and queue transactions.
-func (self *TxPool) add(tx *types.Transaction) error {
+// add validates a transaction and inserts it into the non-executable queue for
+// later pending promotion and execution.
+func (pool *TxPool) add(tx *types.Transaction) error {
+ // If the transaction is alreayd known, discard it
hash := tx.Hash()
-
- if self.pending[hash] != nil {
- return fmt.Errorf("Known transaction (%x)", hash[:4])
+ if pool.all[hash] != nil {
+ return fmt.Errorf("Known transaction: %x", hash[:4])
}
- err := self.validateTx(tx)
- if err != nil {
+ // Otherwise ensure basic validation passes and queue it up
+ if err := pool.validateTx(tx); err != nil {
return err
}
- self.queueTx(hash, tx)
+ pool.enqueueTx(hash, tx)
+ // Print a log message if low enough level is set
if glog.V(logger.Debug) {
- var toname string
+ rcpt := "[NEW_CONTRACT]"
if to := tx.To(); to != nil {
- toname = common.Bytes2Hex(to[:4])
- } else {
- toname = "[NEW_CONTRACT]"
+ rcpt = common.Bytes2Hex(to[:4])
}
- // we can ignore the error here because From is
- // verified in ValidateTransaction.
- f, _ := tx.From()
- from := common.Bytes2Hex(f[:4])
- glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
+ from, _ := tx.From() // from already verified during tx validation
+ glog.Infof("(t) 0x%x => %s (%v) %x\n", from[:4], rcpt, tx.Value, hash)
}
-
return nil
}
-// queueTx will queue an unknown transaction
-func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
+// enqueueTx inserts a new transaction into the non-executable transaction queue.
+//
+// Note, this method assumes the pool lock is held!
+func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
+ // Try to insert the transaction into the future queue
from, _ := tx.From() // already validated
- if self.queue[from] == nil {
- self.queue[from] = make(map[common.Hash]*types.Transaction)
+ if pool.queue[from] == nil {
+ pool.queue[from] = newTxList(false)
+ }
+ inserted, old := pool.queue[from].Add(tx)
+ if !inserted {
+ return // An older transaction was better, discard this
+ }
+ // Discard any previous transaction and mark this
+ if old != nil {
+ delete(pool.all, old.Hash())
}
- self.queue[from][hash] = tx
+ pool.all[hash] = tx
}
-// addTx will add a transaction to the pending (processable queue) list of transactions
-func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
- // init delayed since tx pool could have been started before any state sync
+// promoteTx adds a transaction to the pending (processable) list of transactions.
+//
+// Note, this method assumes the pool lock is held!
+func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
+ // Init delayed since tx pool could have been started before any state sync
if pool.pendingState == nil {
pool.resetState()
}
+ // Try to insert the transaction into the pending queue
+ if pool.pending[addr] == nil {
+ pool.pending[addr] = newTxList(true)
+ }
+ list := pool.pending[addr]
- if _, ok := pool.pending[hash]; !ok {
- pool.pending[hash] = tx
-
- // Increment the nonce on the pending state. This can only happen if
- // the nonce is +1 to the previous one.
- pool.pendingState.SetNonce(addr, tx.Nonce()+1)
- // Notify the subscribers. This event is posted in a goroutine
- // because it's possible that somewhere during the post "Remove transaction"
- // gets called which will then wait for the global tx pool lock and deadlock.
- go pool.eventMux.Post(TxPreEvent{tx})
+ inserted, old := list.Add(tx)
+ if !inserted {
+ // An older transaction was better, discard this
+ delete(pool.all, hash)
+ return
+ }
+ // Otherwise discard any previous transaction and mark this
+ if old != nil {
+ delete(pool.all, old.Hash())
}
+ pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
+
+ // Set the potentially new pending nonce and notify any subsystems of the new tx
+ pool.beats[addr] = time.Now()
+ pool.pendingState.SetNonce(addr, tx.Nonce()+1)
+ go pool.eventMux.Post(TxPreEvent{tx})
}
// Add queues a single transaction in the pool if it is valid.
-func (self *TxPool) Add(tx *types.Transaction) error {
- self.mu.Lock()
- defer self.mu.Unlock()
+func (pool *TxPool) Add(tx *types.Transaction) error {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
- if err := self.add(tx); err != nil {
+ if err := pool.add(tx); err != nil {
return err
}
- self.checkQueue()
+ pool.promoteExecutables()
+
return nil
}
-// AddTransactions attempts to queue all valid transactions in txs.
-func (self *TxPool) AddTransactions(txs []*types.Transaction) {
- self.mu.Lock()
- defer self.mu.Unlock()
+// AddBatch attempts to queue a batch of transactions.
+func (pool *TxPool) AddBatch(txs []*types.Transaction) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
for _, tx := range txs {
- if err := self.add(tx); err != nil {
+ if err := pool.add(tx); err != nil {
glog.V(logger.Debug).Infoln("tx error:", err)
- } else {
- h := tx.Hash()
- glog.V(logger.Debug).Infof("tx %x\n", h[:4])
}
}
-
- // check and validate the queue
- self.checkQueue()
+ pool.promoteExecutables()
}
-// GetTransaction returns a transaction if it is contained in the pool
+// Get returns a transaction if it is contained in the pool
// and nil otherwise.
-func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
- tp.mu.RLock()
- defer tp.mu.RUnlock()
-
- // check the txs first
- if tx, ok := tp.pending[hash]; ok {
- return tx
- }
- // check queue
- for _, txs := range tp.queue {
- if tx, ok := txs[hash]; ok {
- return tx
- }
- }
- return nil
-}
+func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
-// GetTransactions returns all currently processable transactions.
-// The returned slice may be modified by the caller.
-func (self *TxPool) GetTransactions() (txs types.Transactions) {
- self.mu.Lock()
- defer self.mu.Unlock()
+ return pool.all[hash]
+}
- // check queue first
- self.checkQueue()
- // invalidate any txs
- self.validatePool()
+// Remove removes the transaction with the given hash from the pool.
+func (pool *TxPool) Remove(hash common.Hash) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
- txs = make(types.Transactions, len(self.pending))
- i := 0
- for _, tx := range self.pending {
- txs[i] = tx
- i++
- }
- return txs
+ pool.removeTx(hash)
}
-// GetQueuedTransactions returns all non-processable transactions.
-func (self *TxPool) GetQueuedTransactions() types.Transactions {
- self.mu.RLock()
- defer self.mu.RUnlock()
+// RemoveBatch removes all given transactions from the pool.
+func (pool *TxPool) RemoveBatch(txs types.Transactions) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
- var ret types.Transactions
- for _, txs := range self.queue {
- for _, tx := range txs {
- ret = append(ret, tx)
- }
+ for _, tx := range txs {
+ pool.removeTx(tx.Hash())
}
- sort.Sort(types.TxByNonce(ret))
- return ret
}
-// RemoveTransactions removes all given transactions from the pool.
-func (self *TxPool) RemoveTransactions(txs types.Transactions) {
- self.mu.Lock()
- defer self.mu.Unlock()
- for _, tx := range txs {
- self.removeTx(tx.Hash())
+// removeTx removes a single transaction from the queue, moving all subsequent
+// transactions back to the future queue.
+func (pool *TxPool) removeTx(hash common.Hash) {
+ // Fetch the transaction we wish to delete
+ tx, ok := pool.all[hash]
+ if !ok {
+ return
}
-}
+ addr, _ := tx.From() // already validated during insertion
-// RemoveTx removes the transaction with the given hash from the pool.
-func (pool *TxPool) RemoveTx(hash common.Hash) {
- pool.mu.Lock()
- defer pool.mu.Unlock()
- pool.removeTx(hash)
-}
+ // Remove it from the list of known transactions
+ delete(pool.all, hash)
-func (pool *TxPool) removeTx(hash common.Hash) {
- // delete from pending pool
- delete(pool.pending, hash)
- // delete from queue
- for address, txs := range pool.queue {
- if _, ok := txs[hash]; ok {
- if len(txs) == 1 {
- // if only one tx, remove entire address entry.
- delete(pool.queue, address)
+ // Remove the transaction from the pending lists and reset the account nonce
+ if pending := pool.pending[addr]; pending != nil {
+ if removed, invalids := pending.Remove(tx); removed {
+ // If no more transactions are left, remove the list
+ if pending.Empty() {
+ delete(pool.pending, addr)
+ delete(pool.beats, addr)
} else {
- delete(txs, hash)
+ // Otherwise postpone any invalidated transactions
+ for _, tx := range invalids {
+ pool.enqueueTx(tx.Hash(), tx)
+ }
+ }
+ // Update the account nonce if needed
+ if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
+ pool.pendingState.SetNonce(addr, tx.Nonce())
}
- break
+ }
+ }
+ // Transaction is in the future queue
+ if future := pool.queue[addr]; future != nil {
+ future.Remove(tx)
+ if future.Empty() {
+ delete(pool.queue, addr)
}
}
}
-// checkQueue moves transactions that have become processable to main pool.
-func (pool *TxPool) checkQueue() {
- // init delayed since tx pool could have been started before any state sync
+// promoteExecutables moves transactions that have become processable from the
+// future queue to the set of pending transactions. During this process, all
+// invalidated transactions (low nonce, low balance) are deleted.
+func (pool *TxPool) promoteExecutables() {
+ // Init delayed since tx pool could have been started before any state sync
if pool.pendingState == nil {
pool.resetState()
}
+ // Retrieve the current state to allow nonce and balance checking
+ state, err := pool.currentState()
+ if err != nil {
+ glog.Errorf("Could not get current state: %v", err)
+ return
+ }
+ // Iterate over all accounts and promote any executable transactions
+ queued := uint64(0)
- var promote txQueue
- for address, txs := range pool.queue {
- currentState, err := pool.currentState()
- if err != nil {
- glog.Errorf("could not get current state: %v", err)
- return
+ for addr, list := range pool.queue {
+ // Drop all transactions that are deemed too old (low nonce)
+ for _, tx := range list.Forward(state.GetNonce(addr)) {
+ if glog.V(logger.Core) {
+ glog.Infof("Removed old queued transaction: %v", tx)
+ }
+ delete(pool.all, tx.Hash())
}
- balance := currentState.GetBalance(address)
-
- var (
- guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state)
- trueNonce = currentState.GetNonce(address) // nonce known by the last state
- )
- promote = promote[:0]
- for hash, tx := range txs {
- // Drop processed or out of fund transactions
- if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 {
- if glog.V(logger.Core) {
- glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx)
- }
- delete(txs, hash)
- continue
+ // Drop all transactions that are too costly (low balance)
+ drops, _ := list.Filter(state.GetBalance(addr))
+ for _, tx := range drops {
+ if glog.V(logger.Core) {
+ glog.Infof("Removed unpayable queued transaction: %v", tx)
}
- // Collect the remaining transactions for the next pass.
- promote = append(promote, txQueueEntry{hash, address, tx})
+ delete(pool.all, tx.Hash())
}
- // Find the next consecutive nonce range starting at the current account nonce,
- // pushing the guessed nonce forward if we add consecutive transactions.
- sort.Sort(promote)
- for i, entry := range promote {
- // If we reached a gap in the nonces, enforce transaction limit and stop
- if entry.Nonce() > guessedNonce {
- if len(promote)-i > maxQueued {
- if glog.V(logger.Debug) {
- glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:]))
- }
- for _, drop := range promote[i+maxQueued:] {
- delete(txs, drop.hash)
- }
- }
- break
+ // Gather all executable transactions and promote them
+ for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
+ if glog.V(logger.Core) {
+ glog.Infof("Promoting queued transaction: %v", tx)
}
- // Otherwise promote the transaction and move the guess nonce if needed
- pool.addTx(entry.hash, address, entry.Transaction)
- delete(txs, entry.hash)
-
- if entry.Nonce() == guessedNonce {
- guessedNonce++
+ pool.promoteTx(addr, tx.Hash(), tx)
+ }
+ // Drop all transactions over the allowed limit
+ for _, tx := range list.Cap(int(maxQueuedPerAccount)) {
+ if glog.V(logger.Core) {
+ glog.Infof("Removed cap-exceeding queued transaction: %v", tx)
}
+ delete(pool.all, tx.Hash())
}
+ queued += uint64(list.Len())
+
// Delete the entire queue entry if it became empty.
- if len(txs) == 0 {
- delete(pool.queue, address)
+ if list.Empty() {
+ delete(pool.queue, addr)
+ }
+ }
+ // If we've queued more transactions than the hard limit, drop oldest ones
+ if queued > maxQueuedInTotal {
+ // Sort all accounts with queued transactions by heartbeat
+ addresses := make(addresssByHeartbeat, 0, len(pool.queue))
+ for addr, _ := range pool.queue {
+ addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
+ }
+ sort.Sort(addresses)
+
+ // Drop transactions until the total is below the limit
+ for drop := queued - maxQueuedInTotal; drop > 0; {
+ addr := addresses[len(addresses)-1]
+ list := pool.queue[addr.address]
+
+ addresses = addresses[:len(addresses)-1]
+
+ // Drop all transactions if they are less than the overflow
+ if size := uint64(list.Len()); size <= drop {
+ for _, tx := range list.Flatten() {
+ pool.removeTx(tx.Hash())
+ }
+ drop -= size
+ continue
+ }
+ // Otherwise drop only last few transactions
+ txs := list.Flatten()
+ for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
+ pool.removeTx(txs[i].Hash())
+ drop--
+ }
}
}
}
-// validatePool removes invalid and processed transactions from the main pool.
-// If a transaction is removed for being invalid (e.g. out of funds), all sub-
-// sequent (Still valid) transactions are moved back into the future queue. This
-// is important to prevent a drained account from DOSing the network with non
-// executable transactions.
-func (pool *TxPool) validatePool() {
+// demoteUnexecutables removes invalid and processed transactions from the pools
+// executable/pending queue and any subsequent transactions that become unexecutable
+// are moved back into the future queue.
+func (pool *TxPool) demoteUnexecutables() {
+ // Retrieve the current state to allow nonce and balance checking
state, err := pool.currentState()
if err != nil {
glog.V(logger.Info).Infoln("failed to get current state: %v", err)
return
}
- balanceCache := make(map[common.Address]*big.Int)
-
- // Clean up the pending pool, accumulating invalid nonces
- gaps := make(map[common.Address]uint64)
+ // Iterate over all accounts and demote any non-executable transactions
+ for addr, list := range pool.pending {
+ nonce := state.GetNonce(addr)
- for hash, tx := range pool.pending {
- sender, _ := tx.From() // err already checked
-
- // Perform light nonce and balance validation
- balance := balanceCache[sender]
- if balance == nil {
- balance = state.GetBalance(sender)
- balanceCache[sender] = balance
+ // Drop all transactions that are deemed too old (low nonce)
+ for _, tx := range list.Forward(nonce) {
+ if glog.V(logger.Core) {
+ glog.Infof("Removed old pending transaction: %v", tx)
+ }
+ delete(pool.all, tx.Hash())
}
- if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 {
- // Remove an already past it invalidated transaction
+ // Drop all transactions that are too costly (low balance), and queue any invalids back for later
+ drops, invalids := list.Filter(state.GetBalance(addr))
+ for _, tx := range drops {
if glog.V(logger.Core) {
- glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx)
+ glog.Infof("Removed unpayable pending transaction: %v", tx)
}
- delete(pool.pending, hash)
-
- // Track the smallest invalid nonce to postpone subsequent transactions
- if !past {
- if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev {
- gaps[sender] = tx.Nonce()
- }
+ delete(pool.all, tx.Hash())
+ }
+ for _, tx := range invalids {
+ if glog.V(logger.Core) {
+ glog.Infof("Demoting pending transaction: %v", tx)
}
+ pool.enqueueTx(tx.Hash(), tx)
+ }
+ // Delete the entire queue entry if it became empty.
+ if list.Empty() {
+ delete(pool.pending, addr)
+ delete(pool.beats, addr)
}
}
- // Move all transactions after a gap back to the future queue
- if len(gaps) > 0 {
- for hash, tx := range pool.pending {
- sender, _ := tx.From()
- if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap {
- if glog.V(logger.Core) {
- glog.Infof("postponed tx (%v) due to introduced gap\n", tx)
+}
+
+// expirationLoop is a loop that periodically iterates over all accounts with
+// queued transactions and drop all that have been inactive for a prolonged amount
+// of time.
+func (pool *TxPool) expirationLoop() {
+ defer pool.wg.Done()
+
+ evict := time.NewTicker(evictionInterval)
+ defer evict.Stop()
+
+ for {
+ select {
+ case <-evict.C:
+ pool.mu.Lock()
+ for addr := range pool.queue {
+ if time.Since(pool.beats[addr]) > maxQueuedLifetime {
+ for _, tx := range pool.queue[addr].Flatten() {
+ pool.removeTx(tx.Hash())
+ }
}
- pool.queueTx(hash, tx)
- delete(pool.pending, hash)
}
+ pool.mu.Unlock()
+
+ case <-pool.quit:
+ return
}
}
}
-type txQueue []txQueueEntry
-
-type txQueueEntry struct {
- hash common.Hash
- addr common.Address
- *types.Transaction
+// addressByHeartbeat is an account address tagged with its last activity timestamp.
+type addressByHeartbeat struct {
+ address common.Address
+ heartbeat time.Time
}
-func (q txQueue) Len() int { return len(q) }
-func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
-func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() }
+type addresssByHeartbeat []addressByHeartbeat
+
+func (a addresssByHeartbeat) Len() int { return len(a) }
+func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
+func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// txSet represents a set of transaction hashes in which entries
// are automatically dropped after txSetDuration time