From 0ef327bbee79c01a69ba59258acc6ce3a48bc288 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 1 Jul 2016 18:59:55 +0300 Subject: core, eth, internal, miner: optimize txpool for quick ops --- core/tx_pool.go | 503 ++++++++++++++++++++++++-------------------------------- 1 file changed, 215 insertions(+), 288 deletions(-) (limited to 'core/tx_pool.go') diff --git a/core/tx_pool.go b/core/tx_pool.go index ec3d5c16b..c4dcceba0 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "math/big" - "sort" "sync" "time" @@ -51,11 +50,6 @@ const ( type stateFn func() (*state.StateDB, error) -// TxList is a "list" of transactions belonging to an account, sorted by account -// nonce. To allow gaps and avoid constant copying, the list is represented as a -// hash map. -type TxList map[uint64]*types.Transaction - // TxPool contains all currently known transactions. Transactions // enter the pool when they are received from the network or submitted // locally. They exit the pool when they are included in the blockchain. @@ -74,8 +68,8 @@ type TxPool struct { localTx *txSet mu sync.RWMutex - pending map[common.Address]TxList // All currently processable transactions - queue map[common.Address]TxList // Queued but non-processable transactions + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions all map[common.Hash]*types.Transaction // All transactions to allow lookups wg sync.WaitGroup // for shutdown sync @@ -86,8 +80,8 @@ type TxPool struct { func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { pool := &TxPool{ config: config, - pending: make(map[common.Address]TxList), - queue: make(map[common.Address]TxList), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), all: make(map[common.Hash]*types.Transaction), eventMux: eventMux, currentState: currentStateFn, @@ -125,7 +119,7 @@ func (pool *TxPool) eventLoop() { pool.minGasPrice = ev.Price pool.mu.Unlock() case RemovedTransactionEvent: - pool.AddTransactions(ev.Txs) + pool.AddBatch(ev.Txs) } } } @@ -133,12 +127,12 @@ func (pool *TxPool) eventLoop() { func (pool *TxPool) resetState() { currentState, err := pool.currentState() if err != nil { - glog.V(logger.Info).Infoln("failed to get current state: %v", err) + glog.V(logger.Error).Infof("Failed to get current state: %v", err) return } managedState := state.ManageState(currentState) if err != nil { - glog.V(logger.Info).Infoln("failed to get managed state: %v", err) + glog.V(logger.Error).Infof("Failed to get managed state: %v", err) return } pool.pendingState = managedState @@ -147,22 +141,15 @@ func (pool *TxPool) resetState() { // any transactions that have been included in the block or // have been invalidated because of another transaction (e.g. // higher gas price) - pool.validatePool() - - // Loop over the pending transactions and base the nonce of the new - // pending transaction set. - for addr, txs := range pool.pending { - // Set the nonce. Transaction nonce can never be lower - // than the state nonce; validatePool took care of that. - for nonce, _ := range txs { - if pool.pendingState.GetNonce(addr) <= nonce { - pool.pendingState.SetNonce(addr, nonce+1) - } - } + pool.demoteUnexecutables() + + // Update all accounts to the latest known pending nonce + for addr, list := range pool.pending { + pool.pendingState.SetNonce(addr, list.last+1) } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid - pool.checkQueue() + pool.promoteExecutables() } func (pool *TxPool) Stop() { @@ -178,46 +165,58 @@ func (pool *TxPool) State() *state.ManagedState { return pool.pendingState } +// Stats retrieves the current pool stats, namely the number of pending and the +// number of queued (non-executable) transactions. func (pool *TxPool) Stats() (pending int, queued int) { pool.mu.RLock() defer pool.mu.RUnlock() - for _, txs := range pool.pending { - pending += len(txs) + for _, list := range pool.pending { + pending += list.Len() } - for _, txs := range pool.queue { - queued += len(txs) + for _, list := range pool.queue { + queued += list.Len() } return } // Content retrieves the data content of the transaction pool, returning all the -// pending as well as queued transactions, grouped by account and nonce. -func (pool *TxPool) Content() (map[common.Address]TxList, map[common.Address]TxList) { +// pending as well as queued transactions, grouped by account and sorted by nonce. +func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { pool.mu.RLock() defer pool.mu.RUnlock() - // Retrieve all the pending transactions and sort by account and by nonce - pending := make(map[common.Address]TxList) - for addr, txs := range pool.pending { - copy := make(TxList) - for nonce, tx := range txs { - copy[nonce] = tx - } - pending[addr] = copy - } - // Retrieve all the queued transactions and sort by account and by nonce - queued := make(map[common.Address]TxList) - for addr, txs := range pool.queue { - copy := make(TxList) - for nonce, tx := range txs { - copy[nonce] = tx - } - queued[addr] = copy + pending := make(map[common.Address]types.Transactions) + for addr, list := range pool.pending { + pending[addr] = list.Flatten() + } + queued := make(map[common.Address]types.Transactions) + for addr, list := range pool.queue { + queued[addr] = list.Flatten() } return pending, queued } +// Pending retrieves all currently processable transactions, groupped by origin +// account and sorted by nonce. The returned transaction set is a copy and can be +// freely modified by calling code. +func (pool *TxPool) Pending() map[common.Address]types.Transactions { + pool.mu.Lock() + defer pool.mu.Unlock() + + // check queue first + pool.promoteExecutables() + + // invalidate any txs + pool.demoteUnexecutables() + + pending := make(map[common.Address]types.Transactions) + for addr, list := range pool.pending { + pending[addr] = list.Flatten() + } + return pending +} + // SetLocal marks a transaction as local, skipping gas price // check against local miner minimum in the future func (pool *TxPool) SetLocal(tx *types.Transaction) { @@ -283,340 +282,268 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { return nil } -// validate and queue transactions. -func (self *TxPool) add(tx *types.Transaction) error { +// add validates a transaction and inserts it into the non-executable queue for +// later pending promotion and execution. +func (pool *TxPool) add(tx *types.Transaction) error { + // If the transaction is alreayd known, discard it hash := tx.Hash() - - if self.all[hash] != nil { - return fmt.Errorf("Known transaction (%x)", hash[:4]) + if pool.all[hash] != nil { + return fmt.Errorf("Known transaction: %x", hash[:4]) } - err := self.validateTx(tx) - if err != nil { + // Otherwise ensure basic validation passes nd queue it up + if err := pool.validateTx(tx); err != nil { return err } - self.queueTx(hash, tx) + pool.enqueueTx(hash, tx) + // Print a log message if low enough level is set if glog.V(logger.Debug) { - var toname string + rcpt := "[NEW_CONTRACT]" if to := tx.To(); to != nil { - toname = common.Bytes2Hex(to[:4]) - } else { - toname = "[NEW_CONTRACT]" + rcpt = common.Bytes2Hex(to[:4]) } - // we can ignore the error here because From is - // verified in ValidateTransaction. - f, _ := tx.From() - from := common.Bytes2Hex(f[:4]) - glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) + from, _ := tx.From() // from already verified during tx validation + glog.Infof("(t) 0x%x => %s (%v) %x\n", from[:4], rcpt, tx.Value, hash) } - return nil } -// queueTx will queue an unknown transaction. -func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { - addr, _ := tx.From() // already validated - if self.queue[addr] == nil { - self.queue[addr] = make(TxList) +// enqueueTx inserts a new transction into the non-executable transaction queue. +// +// Note, this method assumes the pool lock is held! +func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) { + // Try to insert the transaction into the future queue + from, _ := tx.From() // already validated + if pool.queue[from] == nil { + pool.queue[from] = newTxList(false) } - // If the nonce is already used, discard the lower priced transaction - nonce := tx.Nonce() - - if old, ok := self.queue[addr][nonce]; ok { - if old.GasPrice().Cmp(tx.GasPrice()) >= 0 { - return // Old was better, discard this - } - delete(self.all, old.Hash()) // New is better, drop and overwrite old one + inserted, old := pool.queue[from].Add(tx) + if !inserted { + return // An older transaction was better, discard this + } + // Discard any previous transaction and mark this + if old != nil { + delete(pool.all, old.Hash()) } - self.queue[addr][nonce] = tx - self.all[hash] = tx + pool.all[hash] = tx } -// addTx will moves a transaction from the non-executable queue to the pending -// (processable) list of transactions. -func (pool *TxPool) addTx(addr common.Address, tx *types.Transaction) { +// promoteTx adds a transaction to the pending (processable) list of transactions. +// +// Note, this method assumes the pool lock is held! +func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) { // Init delayed since tx pool could have been started before any state sync if pool.pendingState == nil { pool.resetState() } - // If the nonce is already used, discard the lower priced transaction - hash, nonce := tx.Hash(), tx.Nonce() - - if old, ok := pool.pending[addr][nonce]; ok { - oldHash := old.Hash() + // Try to insert the transaction into the pending queue + if pool.pending[addr] == nil { + pool.pending[addr] = newTxList(true) + } + list := pool.pending[addr] - switch { - case oldHash == hash: // Nothing changed, noop - return - case old.GasPrice().Cmp(tx.GasPrice()) >= 0: // Old was better, discard this - delete(pool.all, hash) - return - default: // New is better, discard old - delete(pool.all, oldHash) - } + inserted, old := list.Add(tx) + if !inserted { + // An older transaction was better, discard this + delete(pool.all, hash) + return } - // The transaction is being kept, insert it into the tx pool - if _, ok := pool.pending[addr]; !ok { - pool.pending[addr] = make(TxList) + // Otherwise discard any previous transaction and mark this + if old != nil { + delete(pool.all, old.Hash()) } - pool.pending[addr][nonce] = tx - pool.all[hash] = tx - - // Increment the nonce on the pending state. This can only happen if - // the nonce is +1 to the previous one. - pool.pendingState.SetNonce(addr, nonce+1) + pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests) - // Notify the subscribers. This event is posted in a goroutine - // because it's possible that somewhere during the post "Remove transaction" - // gets called which will then wait for the global tx pool lock and deadlock. + // Set the potentially new pending nonce and notify any subsystems of the new tx + pool.pendingState.SetNonce(addr, list.last+1) go pool.eventMux.Post(TxPreEvent{tx}) } // Add queues a single transaction in the pool if it is valid. -func (self *TxPool) Add(tx *types.Transaction) error { - self.mu.Lock() - defer self.mu.Unlock() +func (pool *TxPool) Add(tx *types.Transaction) error { + pool.mu.Lock() + defer pool.mu.Unlock() - if err := self.add(tx); err != nil { + if err := pool.add(tx); err != nil { return err } - self.checkQueue() + pool.promoteExecutables() + return nil } -// AddTransactions attempts to queue all valid transactions in txs. -func (self *TxPool) AddTransactions(txs []*types.Transaction) { - self.mu.Lock() - defer self.mu.Unlock() +// AddBatch attempts to queue a batch of transactions. +func (pool *TxPool) AddBatch(txs []*types.Transaction) { + pool.mu.Lock() + defer pool.mu.Unlock() for _, tx := range txs { - if err := self.add(tx); err != nil { + if err := pool.add(tx); err != nil { glog.V(logger.Debug).Infoln("tx error:", err) - } else { - h := tx.Hash() - glog.V(logger.Debug).Infof("tx %x\n", h[:4]) } } - - // check and validate the queue - self.checkQueue() + pool.promoteExecutables() } -// GetTransaction returns a transaction if it is contained in the pool +// Get returns a transaction if it is contained in the pool // and nil otherwise. -func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { - tp.mu.RLock() - defer tp.mu.RUnlock() - - return tp.all[hash] -} - -// GetTransactions returns all currently processable transactions. -// The returned slice may be modified by the caller. -func (self *TxPool) GetTransactions() types.Transactions { - self.mu.Lock() - defer self.mu.Unlock() - - // check queue first - self.checkQueue() - - // invalidate any txs - self.validatePool() +func (pool *TxPool) Get(hash common.Hash) *types.Transaction { + pool.mu.RLock() + defer pool.mu.RUnlock() - count := 0 - for _, txs := range self.pending { - count += len(txs) - } - pending := make(types.Transactions, 0, count) - for _, txs := range self.pending { - for _, tx := range txs { - pending = append(pending, tx) - } - } - return pending + return pool.all[hash] } -// RemoveTransactions removes all given transactions from the pool. -func (self *TxPool) RemoveTransactions(txs types.Transactions) { - self.mu.Lock() - defer self.mu.Unlock() +// Remove removes the transaction with the given hash from the pool. +func (pool *TxPool) Remove(hash common.Hash) { + pool.mu.Lock() + defer pool.mu.Unlock() - for _, tx := range txs { - self.removeTx(tx.Hash()) - } + pool.removeTx(hash) } -// RemoveTx removes the transaction with the given hash from the pool. -func (pool *TxPool) RemoveTx(hash common.Hash) { +// RemoveBatch removes all given transactions from the pool. +func (pool *TxPool) RemoveBatch(txs types.Transactions) { pool.mu.Lock() defer pool.mu.Unlock() - pool.removeTx(hash) + for _, tx := range txs { + pool.removeTx(tx.Hash()) + } } +// removeTx iterates removes a single transaction from the queue, moving all +// subsequent transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash) { // Fetch the transaction we wish to delete tx, ok := pool.all[hash] if !ok { return } - addr, _ := tx.From() + addr, _ := tx.From() // already validated during insertion - // Remove it from all internal lists + // Remove it from the list of known transactions delete(pool.all, hash) - delete(pool.pending[addr], tx.Nonce()) - if len(pool.pending[addr]) == 0 { - delete(pool.pending, addr) + // Remove the transaction from the pending lists and reset the account nonce + if pending := pool.pending[addr]; pending != nil { + if removed, invalids := pending.Remove(tx); removed { + // If no more transactions are left, remove the list and reset the nonce + if pending.Empty() { + delete(pool.pending, addr) + pool.pendingState.SetNonce(addr, tx.Nonce()) + } else { + // Otherwise update the nonce and postpone any invalidated transactions + pool.pendingState.SetNonce(addr, pending.last) + for _, tx := range invalids { + pool.enqueueTx(tx.Hash(), tx) + } + } + } } - delete(pool.queue[addr], tx.Nonce()) - if len(pool.queue[addr]) == 0 { - delete(pool.queue, addr) + // Transaction is in the future queue + if future := pool.queue[addr]; future != nil { + future.Remove(tx) + if future.Empty() { + delete(pool.queue, addr) + } } } -// checkQueue moves transactions that have become processable from the pool's -// queue to the set of pending transactions. -func (pool *TxPool) checkQueue() { +// promoteExecutables moves transactions that have become processable from the +// future queue to the set of pending transactions. During this process, all +// invalidated transactions (low nonce, low balance) are deleted. +func (pool *TxPool) promoteExecutables() { // Init delayed since tx pool could have been started before any state sync if pool.pendingState == nil { pool.resetState() } - - var promote txQueue - for address, txs := range pool.queue { - currentState, err := pool.currentState() - if err != nil { - glog.Errorf("could not get current state: %v", err) - return + // Retrieve the current state to allow nonce and balance checking + state, err := pool.currentState() + if err != nil { + glog.Errorf("Could not get current state: %v", err) + return + } + // Iterate over all accounts and promote any executable transactions + for addr, list := range pool.queue { + // Drop all transactions that are deemed too old (low nonce) + for _, tx := range list.Forward(state.GetNonce(addr)) { + if glog.V(logger.Core) { + glog.Infof("Removed old queued transaction: %v", tx) + } + delete(pool.all, tx.Hash()) } - balance := currentState.GetBalance(address) - - var ( - guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state) - trueNonce = currentState.GetNonce(address) // nonce known by the last state - ) - promote = promote[:0] - for nonce, tx := range txs { - // Drop processed or out of fund transactions - if nonce < trueNonce || balance.Cmp(tx.Cost()) < 0 { - if glog.V(logger.Core) { - glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx) - } - delete(txs, nonce) - delete(pool.all, tx.Hash()) - - continue + // Drop all transactions that are too costly (low balance) + drops, _ := list.Filter(state.GetBalance(addr)) + for _, tx := range drops { + if glog.V(logger.Core) { + glog.Infof("Removed unpayable queued transaction: %v", tx) } - // Collect the remaining transactions for the next pass. - promote = append(promote, txQueueEntry{address, tx}) + delete(pool.all, tx.Hash()) } - // Find the next consecutive nonce range starting at the current account nonce, - // pushing the guessed nonce forward if we add consecutive transactions. - sort.Sort(promote) - for i, entry := range promote { - // If we reached a gap in the nonces, enforce transaction limit and stop - if entry.Nonce() > guessedNonce { - if len(promote)-i > maxQueued { - if glog.V(logger.Debug) { - glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.Hash().Bytes())) - } - for _, drop := range promote[i+maxQueued:] { - delete(txs, drop.Nonce()) - delete(pool.all, drop.Hash()) - } - } - break + // Gather all executable transactions and promote them + for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { + if glog.V(logger.Core) { + glog.Infof("Promoting queued transaction: %v", tx) } - // Otherwise promote the transaction and move the guess nonce if needed - pool.addTx(address, entry.Transaction) - delete(txs, entry.Nonce()) - - if entry.Nonce() == guessedNonce { - guessedNonce++ + pool.promoteTx(addr, tx.Hash(), tx) + } + // Drop all transactions over the allowed limit + for _, tx := range list.Cap(maxQueued) { + if glog.V(logger.Core) { + glog.Infof("Removed cap-exceeding queued transaction: %v", tx) } + delete(pool.all, tx.Hash()) } // Delete the entire queue entry if it became empty. - if len(txs) == 0 { - delete(pool.queue, address) + if list.Empty() { + delete(pool.queue, addr) } } } -// validatePool removes invalid and processed transactions from the main pool. -// If a transaction is removed for being invalid (e.g. out of funds), all sub- -// sequent (Still valid) transactions are moved back into the future queue. This -// is important to prevent a drained account from DOSing the network with non -// executable transactions. -func (pool *TxPool) validatePool() { +// demoteUnexecutables removes invalid and processed transactions from the pools +// executable/pending queue and any subsequent transactions that become unexecutable +// are moved back into the future queue. +func (pool *TxPool) demoteUnexecutables() { + // Retrieve the current state to allow nonce and balance checking state, err := pool.currentState() if err != nil { glog.V(logger.Info).Infoln("failed to get current state: %v", err) return } - balanceCache := make(map[common.Address]*big.Int) - - // Clean up the pending pool, accumulating invalid nonces - gaps := make(map[common.Address]uint64) + // Iterate over all accounts and demote any non-executable transactions + for addr, list := range pool.pending { + nonce := state.GetNonce(addr) - for addr, txs := range pool.pending { - for nonce, tx := range txs { - // Perform light nonce and balance validation - balance := balanceCache[addr] - if balance == nil { - balance = state.GetBalance(addr) - balanceCache[addr] = balance + // Drop all transactions that are deemed too old (low nonce) + for _, tx := range list.Forward(nonce) { + if glog.V(logger.Core) { + glog.Infof("Removed old pending transaction: %v", tx) } - if past := state.GetNonce(addr) > nonce; past || balance.Cmp(tx.Cost()) < 0 { - // Remove an already past it invalidated transaction - if glog.V(logger.Core) { - glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx) - } - delete(pool.pending[addr], nonce) - if len(pool.pending[addr]) == 0 { - delete(pool.pending, addr) - } - delete(pool.all, tx.Hash()) - - // Track the smallest invalid nonce to postpone subsequent transactions - if !past { - if prev, ok := gaps[addr]; !ok || nonce < prev { - gaps[addr] = nonce - } - } + delete(pool.all, tx.Hash()) + } + // Drop all transactions that are too costly (low balance), and queue any invalids back for later + drops, invalids := list.Filter(state.GetBalance(addr)) + for _, tx := range drops { + if glog.V(logger.Core) { + glog.Infof("Removed unpayable pending transaction: %v", tx) } + delete(pool.all, tx.Hash()) } - } - // Move all transactions after a gap back to the future queue - if len(gaps) > 0 { - for addr, txs := range pool.pending { - for nonce, tx := range txs { - if gap, ok := gaps[addr]; ok && nonce >= gap { - if glog.V(logger.Core) { - glog.Infof("postponed tx (%v) due to introduced gap\n", tx) - } - delete(pool.pending[addr], nonce) - if len(pool.pending[addr]) == 0 { - delete(pool.pending, addr) - } - pool.queueTx(tx.Hash(), tx) - } + for _, tx := range invalids { + if glog.V(logger.Core) { + glog.Infof("Demoting pending transaction: %v", tx) } + pool.enqueueTx(tx.Hash(), tx) + } + // Delete the entire queue entry if it became empty. + if list.Empty() { + delete(pool.pending, addr) } } } -type txQueue []txQueueEntry - -type txQueueEntry struct { - addr common.Address - *types.Transaction -} - -func (q txQueue) Len() int { return len(q) } -func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } -func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() } - // txSet represents a set of transaction hashes in which entries // are automatically dropped after txSetDuration time type txSet struct { -- cgit v1.2.3