From 649787a9bf76fbf4b28336d4c1f06922e44e6d49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 30 Dec 2015 18:31:37 +0200 Subject: core: fix transaction reorg issues within the tx pool --- core/transaction_pool.go | 119 +++++++++++++++++++++++++++++++---------------- 1 file changed, 78 insertions(+), 41 deletions(-) (limited to 'core/transaction_pool.go') diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 7dcc2aac2..abd5bb7b7 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -140,7 +140,6 @@ func (pool *TxPool) resetState() { } } } - // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid pool.checkQueue() @@ -301,17 +300,15 @@ func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Trans } // Add queues a single transaction in the pool if it is valid. -func (self *TxPool) Add(tx *types.Transaction) (err error) { +func (self *TxPool) Add(tx *types.Transaction) error { self.mu.Lock() defer self.mu.Unlock() - err = self.add(tx) - if err == nil { - // check and validate the queueue - self.checkQueue() + if err := self.add(tx); err != nil { + return err } - - return + self.checkQueue() + return nil } // AddTransactions attempts to queue all valid transactions in txs. @@ -417,51 +414,55 @@ func (pool *TxPool) checkQueue() { pool.resetState() } - var addq txQueue + var promote txQueue for address, txs := range pool.queue { - // guessed nonce is the nonce currently kept by the tx pool (pending state) - guessedNonce := pool.pendingState.GetNonce(address) - // true nonce is the nonce known by the last state currentState, err := pool.currentState() if err != nil { glog.Errorf("could not get current state: %v", err) return } - trueNonce := currentState.GetNonce(address) - addq := addq[:0] + balance := currentState.GetBalance(address) + + var ( + guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state) + trueNonce = currentState.GetNonce(address) // nonce known by the last state + ) + promote = promote[:0] for hash, tx := range txs { - if tx.Nonce() < trueNonce { - // Drop queued transactions whose nonce is lower than - // the account nonce because they have been processed. + // Drop processed or out of fund transactions + if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 { + if glog.V(logger.Core) { + glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx) + } delete(txs, hash) - } else { - // Collect the remaining transactions for the next pass. - addq = append(addq, txQueueEntry{hash, address, tx}) - } - } - // Find the next consecutive nonce range starting at the - // current account nonce. - sort.Sort(addq) - for i, e := range addq { - // start deleting the transactions from the queue if they exceed the limit - if i > maxQueued { - delete(pool.queue[address], e.hash) continue } - - if e.Nonce() > guessedNonce { - if len(addq)-i > maxQueued { + // Collect the remaining transactions for the next pass. + promote = append(promote, txQueueEntry{hash, address, tx}) + } + // Find the next consecutive nonce range starting at the current account nonce, + // pushing the guessed nonce forward if we add consecutive transactions. + sort.Sort(promote) + for i, entry := range promote { + // If we reached a gap in the nonces, enforce transaction limit and stop + if entry.Nonce() > guessedNonce { + if len(promote)-i > maxQueued { if glog.V(logger.Debug) { - glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:])) + glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:])) } - for j := i + maxQueued; j < len(addq); j++ { - delete(txs, addq[j].hash) + for _, drop := range promote[i+maxQueued:] { + delete(txs, drop.hash) } } break } - delete(txs, e.hash) - pool.addTx(e.hash, address, e.Transaction) + // Otherwise promote the transaction and move the guess nonce if needed + pool.addTx(entry.hash, address, entry.Transaction) + delete(txs, entry.hash) + + if entry.Nonce() == guessedNonce { + guessedNonce++ + } } // Delete the entire queue entry if it became empty. if len(txs) == 0 { @@ -471,20 +472,56 @@ func (pool *TxPool) checkQueue() { } // validatePool removes invalid and processed transactions from the main pool. +// If a transaction is removed for being invalid (e.g. out of funds), all sub- +// sequent (Still valid) transactions are moved back into the future queue. This +// is important to prevent a drained account from DOSing the network with non +// executable transactions. func (pool *TxPool) validatePool() { state, err := pool.currentState() if err != nil { glog.V(logger.Info).Infoln("failed to get current state: %v", err) return } + balanceCache := make(map[common.Address]*big.Int) + + // Clean up the pending pool, accumulating invalid nonces + gaps := make(map[common.Address]uint64) + for hash, tx := range pool.pending { - from, _ := tx.From() // err already checked - // perform light nonce validation - if state.GetNonce(from) > tx.Nonce() { + sender, _ := tx.From() // err already checked + + // Perform light nonce and balance validation + balance := balanceCache[sender] + if balance == nil { + balance = state.GetBalance(sender) + balanceCache[sender] = balance + } + if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 { + // Remove an already past it invalidated transaction if glog.V(logger.Core) { - glog.Infof("removed tx (%x) from pool: low tx nonce\n", hash[:4]) + glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx) } delete(pool.pending, hash) + + // Track the smallest invalid nonce to postpone subsequent transactions + if !past { + if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev { + gaps[sender] = tx.Nonce() + } + } + } + } + // Move all transactions after a gap back to the future queue + if len(gaps) > 0 { + for hash, tx := range pool.pending { + sender, _ := tx.From() + if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap { + if glog.V(logger.Core) { + glog.Infof("postponed tx (%v) due to introduced gap\n", tx) + } + pool.queueTx(hash, tx) + delete(pool.pending, hash) + } } } } -- cgit v1.2.3