diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/tx_pool.go | 65 | ||||
-rw-r--r-- | core/tx_pool_test.go | 95 |
2 files changed, 124 insertions, 36 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index edcbc21eb..b805cf226 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -176,7 +176,7 @@ func (pool *TxPool) resetState() { // any transactions that have been included in the block or // have been invalidated because of another transaction (e.g. // higher gas price) - pool.demoteUnexecutables() + pool.demoteUnexecutables(currentState) // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { @@ -185,7 +185,7 @@ func (pool *TxPool) resetState() { } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid - pool.promoteExecutables() + pool.promoteExecutables(currentState) } func (pool *TxPool) Stop() { @@ -196,8 +196,12 @@ func (pool *TxPool) Stop() { } func (pool *TxPool) State() *state.ManagedState { - pool.mu.RLock() - defer pool.mu.RUnlock() + pool.mu.Lock() + defer pool.mu.Unlock() + + if pool.pendingState == nil { + pool.resetState() + } return pool.pendingState } @@ -237,21 +241,26 @@ func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common // Pending retrieves all currently processable transactions, groupped by origin // account and sorted by nonce. The returned transaction set is a copy and can be // freely modified by calling code. -func (pool *TxPool) Pending() map[common.Address]types.Transactions { +func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { pool.mu.Lock() defer pool.mu.Unlock() + state, err := pool.currentState() + if err != nil { + return nil, err + } + // check queue first - pool.promoteExecutables() + pool.promoteExecutables(state) // invalidate any txs - pool.demoteUnexecutables() + pool.demoteUnexecutables(state) pending := make(map[common.Address]types.Transactions) for addr, list := range pool.pending { pending[addr] = list.Flatten() } - return pending + return pending, nil } // SetLocal marks a transaction as local, skipping gas price @@ -410,13 +419,19 @@ func (pool *TxPool) Add(tx *types.Transaction) error { if err := pool.add(tx); err != nil { return err } - pool.promoteExecutables() + + state, err := pool.currentState() + if err != nil { + return err + } + + pool.promoteExecutables(state) return nil } // AddBatch attempts to queue a batch of transactions. -func (pool *TxPool) AddBatch(txs []*types.Transaction) { +func (pool *TxPool) AddBatch(txs []*types.Transaction) error { pool.mu.Lock() defer pool.mu.Unlock() @@ -425,7 +440,15 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) { glog.V(logger.Debug).Infoln("tx error:", err) } } - pool.promoteExecutables() + + state, err := pool.currentState() + if err != nil { + return err + } + + pool.promoteExecutables(state) + + return nil } // Get returns a transaction if it is contained in the pool @@ -499,17 +522,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { // promoteExecutables moves transactions that have become processable from the // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. -func (pool *TxPool) promoteExecutables() { - // Init delayed since tx pool could have been started before any state sync - if pool.pendingState == nil { - pool.resetState() - } - // Retrieve the current state to allow nonce and balance checking - state, err := pool.currentState() - if err != nil { - glog.Errorf("Could not get current state: %v", err) - return - } +func (pool *TxPool) promoteExecutables(state *state.StateDB) { // Iterate over all accounts and promote any executable transactions queued := uint64(0) for addr, list := range pool.queue { @@ -645,13 +658,7 @@ func (pool *TxPool) promoteExecutables() { // demoteUnexecutables removes invalid and processed transactions from the pools // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. -func (pool *TxPool) demoteUnexecutables() { - // Retrieve the current state to allow nonce and balance checking - state, err := pool.currentState() - if err != nil { - glog.V(logger.Info).Infoln("failed to get current state: %v", err) - return - } +func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { nonce := state.GetNonce(addr) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 009d19886..3e516735b 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -51,6 +51,80 @@ func deriveSender(tx *types.Transaction) (common.Address, error) { return types.Sender(types.HomesteadSigner{}, tx) } +// This test simulates a scenario where a new block is imported during a +// state reset and tests whether the pending state is in sync with the +// block head event that initiated the resetState(). +func TestStateChangeDuringPoolReset(t *testing.T) { + var ( + db, _ = ethdb.NewMemDatabase() + key, _ = crypto.GenerateKey() + address = crypto.PubkeyToAddress(key.PublicKey) + mux = new(event.TypeMux) + statedb, _ = state.New(common.Hash{}, db) + trigger = false + ) + + // setup pool with 2 transaction in it + statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether)) + + tx0 := transaction(0, big.NewInt(100000), key) + tx1 := transaction(1, big.NewInt(100000), key) + + // stateFunc is used multiple times to reset the pending state. + // when simulate is true it will create a state that indicates + // that tx0 and tx1 are included in the chain. + stateFunc := func() (*state.StateDB, error) { + // delay "state change" by one. The tx pool fetches the + // state multiple times and by delaying it a bit we simulate + // a state change between those fetches. + stdb := statedb + if trigger { + statedb, _ = state.New(common.Hash{}, db) + // simulate that the new head block included tx0 and tx1 + statedb.SetNonce(address, 2) + statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether)) + trigger = false + } + return stdb, nil + } + + gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) } + + txpool := NewTxPool(testChainConfig(), mux, stateFunc, gasLimitFunc) + txpool.resetState() + + nonce := txpool.State().GetNonce(address) + if nonce != 0 { + t.Fatalf("Invalid nonce, want 0, got %d", nonce) + } + + txpool.AddBatch(types.Transactions{tx0, tx1}) + + nonce = txpool.State().GetNonce(address) + if nonce != 2 { + t.Fatalf("Invalid nonce, want 2, got %d", nonce) + } + + // trigger state change in the background + trigger = true + + txpool.resetState() + + pendingTx, err := txpool.Pending() + if err != nil { + t.Fatalf("Could not fetch pending transactions: %v", err) + } + + for addr, txs := range pendingTx { + t.Logf("%0x: %d\n", addr, len(txs)) + } + + nonce = txpool.State().GetNonce(address) + if nonce != 2 { + t.Fatalf("Invalid nonce, want 2, got %d", nonce) + } +} + func TestInvalidTransactions(t *testing.T) { pool, key := setupTxPool() @@ -97,9 +171,10 @@ func TestTransactionQueue(t *testing.T) { from, _ := deriveSender(tx) currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1000)) + pool.resetState() pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables() + pool.promoteExecutables(currentState) if len(pool.pending) != 1 { t.Error("expected valid txs to be 1 is", len(pool.pending)) } @@ -108,7 +183,7 @@ func TestTransactionQueue(t *testing.T) { from, _ = deriveSender(tx) currentState.SetNonce(from, 2) pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables() + pool.promoteExecutables(currentState) if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } @@ -124,11 +199,13 @@ func TestTransactionQueue(t *testing.T) { from, _ = deriveSender(tx1) currentState, _ = pool.currentState() currentState.AddBalance(from, big.NewInt(1000)) + pool.resetState() + pool.enqueueTx(tx1.Hash(), tx1) pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx3.Hash(), tx3) - pool.promoteExecutables() + pool.promoteExecutables(currentState) if len(pool.pending) != 1 { t.Error("expected tx pool to be 1, got", len(pool.pending)) @@ -225,7 +302,8 @@ func TestTransactionDoubleNonce(t *testing.T) { if err := pool.add(tx2); err != nil { t.Error("didn't expect error", err) } - pool.promoteExecutables() + state, _ := pool.currentState() + pool.promoteExecutables(state) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } @@ -236,7 +314,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if err := pool.add(tx3); err != nil { t.Error("didn't expect error", err) } - pool.promoteExecutables() + pool.promoteExecutables(state) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } @@ -295,6 +373,7 @@ func TestRemovedTxEvent(t *testing.T) { from, _ := deriveSender(tx) currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1000000000000)) + pool.resetState() pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) pool.eventMux.Post(ChainHeadEvent{nil}) if pool.pending[from].Len() != 1 { @@ -452,6 +531,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) + pool.resetState() // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(1); i <= maxQueuedPerAccount+5; i++ { @@ -564,6 +644,7 @@ func TestTransactionPendingLimiting(t *testing.T) { state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) + pool.resetState() // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(0); i < maxQueuedPerAccount+5; i++ { @@ -733,7 +814,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.demoteUnexecutables() + pool.demoteUnexecutables(state) } } @@ -757,7 +838,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.promoteExecutables() + pool.promoteExecutables(state) } } |