From db48d312e44dab3a756aa6976412fe54f8e891ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 12 Apr 2018 12:17:52 +0300 Subject: core: txpool stable underprice drop order, perf fixes --- core/tx_list.go | 17 +++++++++-- core/tx_pool.go | 23 +++++++------- core/tx_pool_test.go | 85 +++++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 101 insertions(+), 24 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index 55fc42617..ea6ee7019 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -367,9 +367,20 @@ func (l *txList) Flatten() types.Transactions { // price-sorted transactions to discard when the pool fills up. type priceHeap []*types.Transaction -func (h priceHeap) Len() int { return len(h) } -func (h priceHeap) Less(i, j int) bool { return h[i].GasPrice().Cmp(h[j].GasPrice()) < 0 } -func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h priceHeap) Len() int { return len(h) } +func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h priceHeap) Less(i, j int) bool { + // Sort primarily by price, returning the cheaper one + switch h[i].GasPrice().Cmp(h[j].GasPrice()) { + case -1: + return true + case 1: + return false + } + // If the prices match, stabilize via nonces (high nonce is worse) + return h[i].Nonce() > h[j].Nonce() +} func (h *priceHeap) Push(x interface{}) { *h = append(*h, x.(*types.Transaction)) diff --git a/core/tx_pool.go b/core/tx_pool.go index 089bd215a..a554f6611 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -320,7 +320,7 @@ func (pool *TxPool) loop() { // Any non-locals old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { for _, tx := range pool.queue[addr].Flatten() { - pool.removeTx(tx.Hash()) + pool.removeTx(tx.Hash(), true) } } } @@ -468,7 +468,7 @@ func (pool *TxPool) SetGasPrice(price *big.Int) { pool.gasPrice = price for _, tx := range pool.priced.Cap(price, pool.locals) { - pool.removeTx(tx.Hash()) + pool.removeTx(tx.Hash(), false) } log.Info("Transaction pool price threshold updated", "price", price) } @@ -630,7 +630,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) underpricedTxCounter.Inc(1) - pool.removeTx(tx.Hash()) + pool.removeTx(tx.Hash(), false) } } // If the transaction is replacing an already pending one, do directly @@ -695,8 +695,10 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er pool.priced.Removed() queuedReplaceCounter.Inc(1) } - pool.all[hash] = tx - pool.priced.Put(tx) + if pool.all[hash] == nil { + pool.all[hash] = tx + pool.priced.Put(tx) + } return old != nil, nil } @@ -862,7 +864,7 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction { // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. -func (pool *TxPool) removeTx(hash common.Hash) { +func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // Fetch the transaction we wish to delete tx, ok := pool.all[hash] if !ok { @@ -872,8 +874,9 @@ func (pool *TxPool) removeTx(hash common.Hash) { // Remove it from the list of known transactions delete(pool.all, hash) - pool.priced.Removed() - + if outofbound { + pool.priced.Removed() + } // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { if removed, invalids := pending.Remove(tx); removed { @@ -1052,7 +1055,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Drop all transactions if they are less than the overflow if size := uint64(list.Len()); size <= drop { for _, tx := range list.Flatten() { - pool.removeTx(tx.Hash()) + pool.removeTx(tx.Hash(), true) } drop -= size queuedRateLimitCounter.Inc(int64(size)) @@ -1061,7 +1064,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Otherwise drop only last few transactions txs := list.Flatten() for i := len(txs) - 1; i >= 0 && drop > 0; i-- { - pool.removeTx(txs[i].Hash()) + pool.removeTx(txs[i].Hash(), true) drop-- queuedRateLimitCounter.Inc(1) } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 1cf533aa6..0cb14cb6a 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -209,15 +209,10 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) { pool.lockedReset(nil, nil) - pendingTx, err := pool.Pending() + _, err := pool.Pending() if err != nil { t.Fatalf("Could not fetch pending transactions: %v", err) } - - for addr, txs := range pendingTx { - t.Logf("%0x: %d\n", addr, len(txs)) - } - nonce = pool.State().GetNonce(address) if nonce != 2 { t.Fatalf("Invalid nonce, want 2, got %d", nonce) @@ -350,7 +345,7 @@ func TestTransactionChainFork(t *testing.T) { if _, err := pool.add(tx, false); err != nil { t.Error("didn't expect error", err) } - pool.removeTx(tx.Hash()) + pool.removeTx(tx.Hash(), true) // reset the pool's internal state resetState() @@ -1388,13 +1383,13 @@ func TestTransactionPoolUnderpricing(t *testing.T) { t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced) } // Ensure that adding high priced transactions drops cheap ones, but not own - if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { + if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { // +K1:0 => -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que - t.Fatalf("failed to add well priced transaction: %v", err) } - if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil { + if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2 t.Fatalf("failed to add well priced transaction: %v", err) } - if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { + if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3 t.Fatalf("failed to add well priced transaction: %v", err) } pending, queued = pool.Stats() @@ -1404,7 +1399,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } - if err := validateEvents(events, 2); err != nil { + if err := validateEvents(events, 1); err != nil { t.Fatalf("additional event firing failed: %v", err) } if err := validateTxPoolInternals(pool); err != nil { @@ -1430,6 +1425,74 @@ func TestTransactionPoolUnderpricing(t *testing.T) { } } +// Tests that more expensive transactions push out cheap ones from the pool, but +// without producing instability by creating gaps that start jumping transactions +// back and forth between queued/pending. +func TestTransactionPoolStableUnderpricing(t *testing.T) { + t.Parallel() + + // Create the pool to test the pricing enforcement with + db, _ := ethdb.NewMemDatabase() + statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + + config := testTxPoolConfig + config.GlobalSlots = 128 + config.GlobalQueue = 0 + + pool := NewTxPool(config, params.TestChainConfig, blockchain) + defer pool.Stop() + + // Keep track of transaction events to ensure all executables get announced + events := make(chan TxPreEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 2) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + // Fill up the entire queue with the same transaction price points + txs := types.Transactions{} + for i := uint64(0); i < config.GlobalSlots; i++ { + txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0])) + } + pool.AddRemotes(txs) + + pending, queued := pool.Stats() + if pending != int(config.GlobalSlots) { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots) + } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if err := validateEvents(events, int(config.GlobalSlots)); err != nil { + t.Fatalf("original event firing failed: %v", err) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap + if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { + t.Fatalf("failed to add well priced transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != int(config.GlobalSlots) { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots) + } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if err := validateEvents(events, 1); err != nil { + t.Fatalf("additional event firing failed: %v", err) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Tests that the pool rejects replacement transactions that don't meet the minimum // price bump required. func TestTransactionReplacement(t *testing.T) { -- cgit v1.2.3