From 08959bbc70ade02109c819fdee72be1ed9310726 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 26 May 2017 13:40:47 +0300 Subject: cmd, core, eth: configurable txpool parameters --- core/tx_pool.go | 101 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 73 insertions(+), 28 deletions(-) (limited to 'core/tx_pool.go') diff --git a/core/tx_pool.go b/core/tx_pool.go index 6f67aaa0a..7aea02101 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -48,14 +48,8 @@ var ( ) var ( - minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address - maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft) - maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address - maxQueuedTotal = uint64(1024) // Max limit of queued transactions from all accounts - maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued - minPriceBumpPercent = int64(10) // Minimum price bump needed to replace an old transaction - evictionInterval = time.Minute // Time interval to check for evictable transactions - statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats + evictionInterval = time.Minute // Time interval to check for evictable transactions + statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats ) var ( @@ -78,6 +72,48 @@ var ( type stateFn func() (*state.StateDB, error) +// TxPoolConfig are the configuration parameters of the transaction pool. +type TxPoolConfig struct { + PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool + PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) + + AccountSlots uint64 // Minimum number of executable transaction slots guaranteed per account + GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts + AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account + GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts + + Lifetime time.Duration // Maximum amount of time non-executable transaction are queued +} + +// DefaultTxPoolConfig contains the default configurations for the transaction +// pool. +var DefaultTxPoolConfig = TxPoolConfig{ + PriceLimit: 1, + PriceBump: 10, + + AccountSlots: 16, + GlobalSlots: 4096, + AccountQueue: 64, + GlobalQueue: 1024, + + Lifetime: 3 * time.Hour, +} + +// sanitize checks the provided user configurations and changes anything that's +// unreasonable or unworkable. +func (config *TxPoolConfig) sanitize() TxPoolConfig { + conf := *config + if conf.PriceLimit < 1 { + log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit) + conf.PriceLimit = DefaultTxPoolConfig.PriceLimit + } + if conf.PriceBump < 1 { + log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump) + conf.PriceBump = DefaultTxPoolConfig.PriceBump + } + return conf +} + // TxPool contains all currently known transactions. Transactions // enter the pool when they are received from the network or submitted // locally. They exit the pool when they are included in the blockchain. @@ -86,7 +122,8 @@ type stateFn func() (*state.StateDB, error) // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - config *params.ChainConfig + config TxPoolConfig + chainconfig *params.ChainConfig currentState stateFn // The state function which will allow us to do some pre checks pendingState *state.ManagedState gasLimit func() *big.Int // The current gas limit function callback @@ -109,10 +146,17 @@ type TxPool struct { homestead bool } -func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { +// NewTxPool creates a new transaction pool to gather, sort and filter inbound +// trnsactions from the network. +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { + // Sanitize the input to ensure no vulnerable gas prices are set + config = (&config).sanitize() + + // Create the transaction pool with its initial settings pool := &TxPool{ config: config, - signer: types.NewEIP155Signer(config.ChainId), + chainconfig: chainconfig, + signer: types.NewEIP155Signer(chainconfig.ChainId), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), @@ -120,7 +164,7 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState eventMux: eventMux, currentState: currentStateFn, gasLimit: gasLimitFn, - gasPrice: big.NewInt(1), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, locals: newTxSet(), events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), @@ -129,6 +173,7 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState pool.priced = newTxPricedList(&pool.all) pool.resetState() + // Start the various events loops and return pool.wg.Add(2) go pool.eventLoop() go pool.expirationLoop() @@ -159,7 +204,7 @@ func (pool *TxPool) eventLoop() { case ChainHeadEvent: pool.mu.Lock() if ev.Block != nil { - if pool.config.IsHomestead(ev.Block.Number()) { + if pool.chainconfig.IsHomestead(ev.Block.Number()) { pool.homestead = true } } @@ -388,7 +433,7 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) { return false, err } // If the transaction pool is full, discard underpriced transactions - if uint64(len(pool.all)) >= maxPendingTotal+maxQueuedTotal { + if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it if pool.priced.Underpriced(tx, pool.locals) { log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) @@ -396,7 +441,7 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) { return false, ErrUnderpriced } // New transaction is better than our worse ones, make room for it - drop := pool.priced.Discard(len(pool.all)-int(maxPendingTotal+maxQueuedTotal-1), pool.locals) + drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) underpricedTxCounter.Inc(1) @@ -407,7 +452,7 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) { from, _ := types.Sender(pool.signer, tx) // already validated if list := pool.pending[from]; list != nil && list.Overlaps(tx) { // Nonce already pending, check if required price bump is met - inserted, old := list.Add(tx) + inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { pendingDiscardCounter.Inc(1) return false, ErrReplaceUnderpriced @@ -442,7 +487,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er if pool.queue[from] == nil { pool.queue[from] = newTxList(false) } - inserted, old := pool.queue[from].Add(tx) + inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump) if !inserted { // An older transaction was better, discard this queuedDiscardCounter.Inc(1) @@ -469,7 +514,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T } list := pool.pending[addr] - inserted, old := list.Add(tx) + inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { // An older transaction was better, discard this delete(pool.all, hash) @@ -644,7 +689,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { pool.promoteTx(addr, hash, tx) } // Drop all transactions over the allowed limit - for _, tx := range list.Cap(int(maxQueuedPerAccount)) { + for _, tx := range list.Cap(int(pool.config.AccountQueue)) { hash := tx.Hash() log.Trace("Removed cap-exceeding queued transaction", "hash", hash) delete(pool.all, hash) @@ -663,13 +708,13 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { for _, list := range pool.pending { pending += uint64(list.Len()) } - if pending > maxPendingTotal { + if pending > pool.config.GlobalSlots { pendingBeforeCap := pending // Assemble a spam order to penalize large transactors first spammers := prque.New() for addr, list := range pool.pending { // Only evict transactions from high rollers - if uint64(list.Len()) > minPendingPerAccount { + if uint64(list.Len()) > pool.config.AccountSlots { // Skip local accounts as pools should maintain backlogs for themselves for _, tx := range list.txs.items { if !pool.locals.contains(tx.Hash()) { @@ -681,7 +726,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { } // Gradually drop transactions from offenders offenders := []common.Address{} - for pending > maxPendingTotal && !spammers.Empty() { + for pending > pool.config.GlobalSlots && !spammers.Empty() { // Retrieve the next offender if not local address offender, _ := spammers.Pop() offenders = append(offenders, offender.(common.Address)) @@ -692,7 +737,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { threshold := pool.pending[offender.(common.Address)].Len() // Iteratively reduce all offenders until below limit or threshold reached - for pending > maxPendingTotal && pool.pending[offenders[len(offenders)-2]].Len() > threshold { + for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { for i := 0; i < len(offenders)-1; i++ { list := pool.pending[offenders[i]] list.Cap(list.Len() - 1) @@ -702,8 +747,8 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { } } // If still above threshold, reduce to limit or min allowance - if pending > maxPendingTotal && len(offenders) > 0 { - for pending > maxPendingTotal && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > minPendingPerAccount { + if pending > pool.config.GlobalSlots && len(offenders) > 0 { + for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots { for _, addr := range offenders { list := pool.pending[addr] list.Cap(list.Len() - 1) @@ -714,7 +759,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { pendingRLCounter.Inc(int64(pendingBeforeCap - pending)) } // If we've queued more transactions than the hard limit, drop oldest ones - if queued > maxQueuedTotal { + if queued > pool.config.GlobalQueue { // Sort all accounts with queued transactions by heartbeat addresses := make(addresssByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { @@ -723,7 +768,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { sort.Sort(addresses) // Drop transactions until the total is below the limit - for drop := queued - maxQueuedTotal; drop > 0; { + for drop := queued - pool.config.GlobalQueue; drop > 0; { addr := addresses[len(addresses)-1] list := pool.queue[addr.address] @@ -800,7 +845,7 @@ func (pool *TxPool) expirationLoop() { case <-evict.C: pool.mu.Lock() for addr := range pool.queue { - if time.Since(pool.beats[addr]) > maxQueuedLifetime { + if time.Since(pool.beats[addr]) > pool.config.Lifetime { for _, tx := range pool.queue[addr].Flatten() { pool.removeTx(tx.Hash()) } -- cgit v1.2.3