aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/tx_pool.go65
-rw-r--r--core/tx_pool_test.go90
2 files changed, 150 insertions, 5 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 4c9410d8e..2c8a5c396 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@@ -46,10 +47,12 @@ var (
)
var (
- maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
- maxQueuedInTotal = uint64(8192) // Max limit of queued transactions from all accounts
- maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
- evictionInterval = time.Minute // Time interval to check for evictable transactions
+ minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address
+ maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft)
+ maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
+ maxQueuedInTotal = uint64(1024) // Max limit of queued transactions from all accounts
+ maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
+ evictionInterval = time.Minute // Time interval to check for evictable transactions
)
type stateFn func() (*state.StateDB, error)
@@ -481,7 +484,6 @@ func (pool *TxPool) promoteExecutables() {
}
// Iterate over all accounts and promote any executable transactions
queued := uint64(0)
-
for addr, list := range pool.queue {
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(state.GetNonce(addr)) {
@@ -519,6 +521,59 @@ func (pool *TxPool) promoteExecutables() {
delete(pool.queue, addr)
}
}
+ // If the pending limit is overflown, start equalizing allowances
+ pending := uint64(0)
+ for _, list := range pool.pending {
+ pending += uint64(list.Len())
+ }
+ if pending > maxPendingTotal {
+ // Assemble a spam order to penalize large transactors first
+ spammers := prque.New()
+ for addr, list := range pool.pending {
+ // Only evict transactions from high rollers
+ if uint64(list.Len()) > minPendingPerAccount {
+ // Skip local accounts as pools should maintain backlogs for themselves
+ for _, tx := range list.txs.items {
+ if !pool.localTx.contains(tx.Hash()) {
+ spammers.Push(addr, float32(list.Len()))
+ }
+ break // Checking on transaction for locality is enough
+ }
+ }
+ }
+ // Gradually drop transactions from offenders
+ offenders := []common.Address{}
+ for pending > maxPendingTotal && !spammers.Empty() {
+ // Retrieve the next offender if not local address
+ offender, _ := spammers.Pop()
+ offenders = append(offenders, offender.(common.Address))
+
+ // Equalize balances until all the same or below threshold
+ if len(offenders) > 1 {
+ // Calculate the equalization threshold for all current offenders
+ threshold := pool.pending[offender.(common.Address)].Len()
+
+ // Iteratively reduce all offenders until below limit or threshold reached
+ for pending > maxPendingTotal && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
+ for i := 0; i < len(offenders)-1; i++ {
+ list := pool.pending[offenders[i]]
+ list.Cap(list.Len() - 1)
+ pending--
+ }
+ }
+ }
+ }
+ // If still above threshold, reduce to limit or min allowance
+ if pending > maxPendingTotal && len(offenders) > 0 {
+ for pending > maxPendingTotal && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > minPendingPerAccount {
+ for _, addr := range offenders {
+ list := pool.pending[addr]
+ list.Cap(list.Len() - 1)
+ pending--
+ }
+ }
+ }
+ }
// If we've queued more transactions than the hard limit, drop oldest ones
if queued > maxQueuedInTotal {
// Sort all accounts with queued transactions by heartbeat
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 4bc5aed38..dbe6fa635 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -618,6 +618,96 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
}
}
+// Tests that if the transaction count belonging to multiple accounts go above
+// some hard threshold, the higher transactions are dropped to prevent DOS
+// attacks.
+func TestTransactionPendingGlobalLimiting(t *testing.T) {
+ // Reduce the queue limits to shorten test time
+ defer func(old uint64) { maxPendingTotal = old }(maxPendingTotal)
+ maxPendingTotal = minPendingPerAccount * 10
+
+ // Create the pool to test the limit enforcement with
+ db, _ := ethdb.NewMemDatabase()
+ statedb, _ := state.New(common.Hash{}, db)
+
+ pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+ pool.resetState()
+
+ // Create a number of test accounts and fund them
+ state, _ := pool.currentState()
+
+ keys := make([]*ecdsa.PrivateKey, 5)
+ for i := 0; i < len(keys); i++ {
+ keys[i], _ = crypto.GenerateKey()
+ state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
+ }
+ // Generate and queue a batch of transactions
+ nonces := make(map[common.Address]uint64)
+
+ txs := types.Transactions{}
+ for _, key := range keys {
+ addr := crypto.PubkeyToAddress(key.PublicKey)
+ for j := 0; j < int(maxPendingTotal)/len(keys)*2; j++ {
+ txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key))
+ nonces[addr]++
+ }
+ }
+ // Import the batch and verify that limits have been enforced
+ pool.AddBatch(txs)
+
+ pending := 0
+ for _, list := range pool.pending {
+ pending += list.Len()
+ }
+ if pending > int(maxPendingTotal) {
+ t.Fatalf("total pending transactions overflow allowance: %d > %d", pending, maxPendingTotal)
+ }
+}
+
+// Tests that if the transaction count belonging to multiple accounts go above
+// some hard threshold, if they are under the minimum guaranteed slot count then
+// the transactions are still kept.
+func TestTransactionPendingMinimumAllowance(t *testing.T) {
+ // Reduce the queue limits to shorten test time
+ defer func(old uint64) { maxPendingTotal = old }(maxPendingTotal)
+ maxPendingTotal = 0
+
+ // Create the pool to test the limit enforcement with
+ db, _ := ethdb.NewMemDatabase()
+ statedb, _ := state.New(common.Hash{}, db)
+
+ pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+ pool.resetState()
+
+ // Create a number of test accounts and fund them
+ state, _ := pool.currentState()
+
+ keys := make([]*ecdsa.PrivateKey, 5)
+ for i := 0; i < len(keys); i++ {
+ keys[i], _ = crypto.GenerateKey()
+ state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
+ }
+ // Generate and queue a batch of transactions
+ nonces := make(map[common.Address]uint64)
+
+ txs := types.Transactions{}
+ for _, key := range keys {
+ addr := crypto.PubkeyToAddress(key.PublicKey)
+ for j := 0; j < int(minPendingPerAccount)*2; j++ {
+ txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key))
+ nonces[addr]++
+ }
+ }
+ // Import the batch and verify that limits have been enforced
+ pool.AddBatch(txs)
+
+ for addr, list := range pool.pending {
+ if list.Len() != int(minPendingPerAccount) {
+ t.Errorf("addr %x: total pending transactions mismatch: have %d, want %d", addr, list.Len(), minPendingPerAccount)
+ }
+ }
+}
+
// Benchmarks the speed of validating the contents of the pending queue of the
// transaction pool.
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }