diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-09-05 18:39:18 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-05 18:39:18 +0800 |
commit | c91f7beb53ff1ab0376d9aa5fab5a8de8b04631c (patch) | |
tree | da60bb454f0b527f83986d2f1d319679e772261b /core/tx_pool.go | |
parent | 2bacf36d8095ac7936f69552e2727ac6f276479f (diff) | |
parent | da7d57e07c04dcbb7cc20b35f6606ef3f4c400e3 (diff) | |
download | dexon-c91f7beb53ff1ab0376d9aa5fab5a8de8b04631c.tar dexon-c91f7beb53ff1ab0376d9aa5fab5a8de8b04631c.tar.gz dexon-c91f7beb53ff1ab0376d9aa5fab5a8de8b04631c.tar.bz2 dexon-c91f7beb53ff1ab0376d9aa5fab5a8de8b04631c.tar.lz dexon-c91f7beb53ff1ab0376d9aa5fab5a8de8b04631c.tar.xz dexon-c91f7beb53ff1ab0376d9aa5fab5a8de8b04631c.tar.zst dexon-c91f7beb53ff1ab0376d9aa5fab5a8de8b04631c.zip |
Merge pull request #15085 from karalabe/txpool-immutable
core: make txpool operate on immutable state
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 203 |
1 files changed, 114 insertions, 89 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index d835c94d1..f41fbe069 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -105,10 +105,11 @@ var ( // blockChain provides the state of blockchain and current gas limit to do // some pre checks in tx pool and event subscribers. type blockChain interface { - State() (*state.StateDB, error) - GasLimit() *big.Int + CurrentHeader() *types.Header SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription - SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription + + GetBlock(hash common.Hash, number uint64) *types.Block + StateAt(root common.Hash) (*state.StateDB, error) } // TxPoolConfig are the configuration parameters of the transaction pool. @@ -174,18 +175,19 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { type TxPool struct { config TxPoolConfig chainconfig *params.ChainConfig - blockChain blockChain - pendingState *state.ManagedState + chain blockChain gasPrice *big.Int txFeed event.Feed scope event.SubscriptionScope chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription - rmTxCh chan RemovedTransactionEvent - rmTxSub event.Subscription signer types.Signer mu sync.RWMutex + currentState *state.StateDB // Current state in the blockchain head + pendingState *state.ManagedState // Pending state tracking virtual nonces + currentMaxGas *big.Int // Current gas limit for transaction caps + locals *accountSet // Set of local transaction to exepmt from evicion rules journal *txJournal // Journal of local transaction to back up to disk @@ -202,28 +204,26 @@ type TxPool struct { // NewTxPool creates a new transaction pool to gather, sort and filter inbound // trnsactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - blockChain: blockChain, - signer: types.NewEIP155Signer(chainconfig.ChainId), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: make(map[common.Hash]*types.Transaction), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - rmTxCh: make(chan RemovedTransactionEvent, rmTxChanSize), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), - pendingState: nil, + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.NewEIP155Signer(chainconfig.ChainId), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: make(map[common.Hash]*types.Transaction), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) - pool.reset() + pool.reset(nil, chain.CurrentHeader()) // If local transactions and journaling is enabled, load from disk if !config.NoLocals && config.Journal != "" { @@ -237,8 +237,8 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain } } // Subscribe events from blockchain - pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh) - pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh) + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) + // Start the event loop and return pool.wg.Add(1) go pool.loop() @@ -264,31 +264,28 @@ func (pool *TxPool) loop() { journal := time.NewTicker(pool.config.Rejournal) defer journal.Stop() + // Track the previous head headers for transaction reorgs + head := pool.chain.CurrentHeader() + // Keep waiting for and reacting to the various events for { select { // Handle ChainHeadEvent case ev := <-pool.chainHeadCh: - pool.mu.Lock() if ev.Block != nil { + pool.mu.Lock() if pool.chainconfig.IsHomestead(ev.Block.Number()) { pool.homestead = true } + pool.reset(head, ev.Block.Header()) + head = ev.Block.Header() + pool.mu.Unlock() } - pool.reset() - pool.mu.Unlock() // Be unsubscribed due to system stopped case <-pool.chainHeadSub.Err(): return - // Handle RemovedTransactionEvent - case ev := <-pool.rmTxCh: - pool.addTxs(ev.Txs, false) - // Be unsubscribed due to system stopped - case <-pool.rmTxSub.Err(): - return - // Handle stats reporting ticks case <-report.C: pool.mu.RLock() @@ -333,28 +330,76 @@ func (pool *TxPool) loop() { // lockedReset is a wrapper around reset to allow calling it in a thread safe // manner. This method is only ever used in the tester! -func (pool *TxPool) lockedReset() { +func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) { pool.mu.Lock() defer pool.mu.Unlock() - pool.reset() + pool.reset(oldHead, newHead) } // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. -func (pool *TxPool) reset() { - currentState, err := pool.blockChain.State() +func (pool *TxPool) reset(oldHead, newHead *types.Header) { + // If we're reorging an old state, reinject all dropped transactions + var reinject types.Transactions + + if oldHead != nil && oldHead.Hash() != newHead.ParentHash { + var discarded, included types.Transactions + + var ( + rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) + add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) + ) + for rem.NumberU64() > add.NumberU64() { + discarded = append(discarded, rem.Transactions()...) + if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { + log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) + return + } + } + for add.NumberU64() > rem.NumberU64() { + included = append(included, add.Transactions()...) + if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { + log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) + return + } + } + for rem.Hash() != add.Hash() { + discarded = append(discarded, rem.Transactions()...) + if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { + log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) + return + } + included = append(included, add.Transactions()...) + if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { + log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) + return + } + } + reinject = types.TxDifference(discarded, included) + } + // Initialize the internal state to the current head + if newHead == nil { + newHead = pool.chain.CurrentHeader() // Special case during testing + } + statedb, err := pool.chain.StateAt(newHead.Root) if err != nil { - log.Error("Failed reset txpool state", "err", err) + log.Error("Failed to reset txpool state", "err", err) return } - pool.pendingState = state.ManageState(currentState) + pool.currentState = statedb + pool.pendingState = state.ManageState(statedb) + pool.currentMaxGas = newHead.GasLimit + + // Inject any transactions discarded due to reorgs + log.Debug("Reinjecting stale transactions", "count", len(reinject)) + pool.addTxsLocked(reinject, false) // validate the pool of pending transactions, this will remove // any transactions that have been included in the block or // have been invalidated because of another transaction (e.g. // higher gas price) - pool.demoteUnexecutables(currentState) + pool.demoteUnexecutables() // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { @@ -363,16 +408,16 @@ func (pool *TxPool) reset() { } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid - pool.promoteExecutables(currentState, nil) + pool.promoteExecutables(nil) } // Stop terminates the transaction pool. func (pool *TxPool) Stop() { // Unsubscribe all subscriptions registered from txpool pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain pool.chainHeadSub.Unsubscribe() - pool.rmTxSub.Unsubscribe() pool.wg.Wait() if pool.journal != nil { @@ -442,8 +487,8 @@ func (pool *TxPool) stats() (int, int) { // Content retrieves the data content of the transaction pool, returning all the // pending as well as queued transactions, grouped by account and sorted by nonce. func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { - pool.mu.RLock() - defer pool.mu.RUnlock() + pool.mu.Lock() + defer pool.mu.Unlock() pending := make(map[common.Address]types.Transactions) for addr, list := range pool.pending { @@ -499,7 +544,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. - if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 { + if pool.currentMaxGas.Cmp(tx.Gas()) < 0 { return ErrGasLimit } // Make sure the transaction is signed properly @@ -513,16 +558,12 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering - currentState, err := pool.blockChain.State() - if err != nil { - return err - } - if currentState.GetNonce(from) > tx.Nonce() { + if pool.currentState.GetNonce(from) > tx.Nonce() { return ErrNonceTooLow } // Transactor should have enough funds to cover the costs // cost == V + GP * GL - if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { + if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { return ErrInsufficientFunds } intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) @@ -721,12 +762,8 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { } // If we added a new transaction, run promotion checks and return if !replace { - state, err := pool.blockChain.State() - if err != nil { - return err - } from, _ := types.Sender(pool.signer, tx) // already validated - pool.promoteExecutables(state, []common.Address{from}) + pool.promoteExecutables([]common.Address{from}) } return nil } @@ -736,6 +773,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { pool.mu.Lock() defer pool.mu.Unlock() + return pool.addTxsLocked(txs, local) +} + +// addTxsLocked attempts to queue a batch of transactions if they are valid, +// whilst assuming the transaction pool lock is already held. +func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error { // Add the batch of transaction, tracking the accepted ones dirty := make(map[common.Address]struct{}) for _, tx := range txs { @@ -748,15 +791,11 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { } // Only reprocess the internal state if something was actually added if len(dirty) > 0 { - state, err := pool.blockChain.State() - if err != nil { - return err - } addrs := make([]common.Address, 0, len(dirty)) for addr, _ := range dirty { addrs = append(addrs, addr) } - pool.promoteExecutables(state, addrs) + pool.promoteExecutables(addrs) } return nil } @@ -770,24 +809,6 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction { return pool.all[hash] } -// Remove removes the transaction with the given hash from the pool. -func (pool *TxPool) Remove(hash common.Hash) { - pool.mu.Lock() - defer pool.mu.Unlock() - - pool.removeTx(hash) -} - -// RemoveBatch removes all given transactions from the pool. -func (pool *TxPool) RemoveBatch(txs types.Transactions) { - pool.mu.Lock() - defer pool.mu.Unlock() - - for _, tx := range txs { - pool.removeTx(tx.Hash()) - } -} - // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash) { @@ -834,9 +855,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { // promoteExecutables moves transactions that have become processable from the // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. -func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) { - gaslimit := pool.blockChain.GasLimit() - +func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Gather all the accounts potentially needing updates if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) @@ -851,14 +870,14 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A continue // Just in case someone calls with a non existing account } // Drop all transactions that are deemed too old (low nonce) - for _, tx := range list.Forward(state.GetNonce(addr)) { + for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) { hash := tx.Hash() log.Trace("Removed old queued transaction", "hash", hash) delete(pool.all, hash) pool.priced.Removed() } // Drop all transactions that are too costly (low balance or out of gas) - drops, _ := list.Filter(state.GetBalance(addr), gaslimit) + drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace("Removed unpayable queued transaction", "hash", hash) @@ -1003,12 +1022,10 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A // demoteUnexecutables removes invalid and processed transactions from the pools // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. -func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { - gaslimit := pool.blockChain.GasLimit() - +func (pool *TxPool) demoteUnexecutables() { // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { - nonce := state.GetNonce(addr) + nonce := pool.currentState.GetNonce(addr) // Drop all transactions that are deemed too old (low nonce) for _, tx := range list.Forward(nonce) { @@ -1018,7 +1035,7 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { pool.priced.Removed() } // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later - drops, invalids := list.Filter(state.GetBalance(addr), gaslimit) + drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace("Removed unpayable pending transaction", "hash", hash) @@ -1031,6 +1048,14 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { log.Trace("Demoting pending transaction", "hash", hash) pool.enqueueTx(hash, tx) } + // If there's a gap in front, warn (should never happen) and postpone all transactions + if list.Len() > 0 && list.txs.Get(nonce) == nil { + for _, tx := range list.Cap(0) { + hash := tx.Hash() + log.Error("Demoting invalidated transaction", "hash", hash) + pool.enqueueTx(hash, tx) + } + } // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.pending, addr) |