diff options
author | Péter Szilágyi <peterke@gmail.com> | 2018-05-20 00:39:28 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-05-20 00:39:28 +0800 |
commit | 953b5ac015da7fdef39c6864bb0cb490ffaa7959 (patch) | |
tree | 0a1b93effde4276c64a1c62ff97f25a5cd2deb7f /core/tx_pool.go | |
parent | f9c456e02d6b8320389a7ea95c0eef3e10a87e2e (diff) | |
parent | 49719e21bcd740c5890334f8c0ec8ac3777fb4c6 (diff) | |
download | dexon-953b5ac015da7fdef39c6864bb0cb490ffaa7959.tar dexon-953b5ac015da7fdef39c6864bb0cb490ffaa7959.tar.gz dexon-953b5ac015da7fdef39c6864bb0cb490ffaa7959.tar.bz2 dexon-953b5ac015da7fdef39c6864bb0cb490ffaa7959.tar.lz dexon-953b5ac015da7fdef39c6864bb0cb490ffaa7959.tar.xz dexon-953b5ac015da7fdef39c6864bb0cb490ffaa7959.tar.zst dexon-953b5ac015da7fdef39c6864bb0cb490ffaa7959.zip |
Merge pull request #16720 from rjl493456442/PreTxsEvent
all: collate new transaction events together
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 30 |
1 files changed, 20 insertions, 10 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 388b40058..f89e11441 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -238,7 +238,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block if !config.NoLocals && config.Journal != "" { pool.journal = newTxJournal(config.Journal) - if err := pool.journal.load(pool.AddLocal); err != nil { + if err := pool.journal.load(pool.AddLocals); err != nil { log.Warn("Failed to load transaction journal", "err", err) } if err := pool.journal.rotate(pool.local()); err != nil { @@ -444,9 +444,9 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } -// SubscribeTxPreEvent registers a subscription of TxPreEvent and +// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and // starts sending event to the given channel. -func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription { +func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription { return pool.scope.Track(pool.txFeed.Subscribe(ch)) } @@ -653,7 +653,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // We've directly injected a replacement transaction, notify subsystems - go pool.txFeed.Send(TxPreEvent{tx}) + go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}}) return old != nil, nil } @@ -712,10 +712,11 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) { } } -// promoteTx adds a transaction to the pending (processable) list of transactions. +// promoteTx adds a transaction to the pending (processable) list of transactions +// and returns whether it was inserted or an older was better. // // Note, this method assumes the pool lock is held! -func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) { +func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool { // Try to insert the transaction into the pending queue if pool.pending[addr] == nil { pool.pending[addr] = newTxList(true) @@ -729,7 +730,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pool.priced.Removed() pendingDiscardCounter.Inc(1) - return + return false } // Otherwise discard any previous transaction and mark this if old != nil { @@ -747,7 +748,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) - go pool.txFeed.Send(TxPreEvent{tx}) + return true } // AddLocal enqueues a single transaction into the pool if it is valid, marking @@ -907,6 +908,9 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(accounts []common.Address) { + // Track the promoted transactions to broadcast them at once + var promoted []*types.Transaction + // Gather all the accounts potentially needing updates if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) @@ -939,8 +943,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Gather all executable transactions and promote them for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { hash := tx.Hash() - log.Trace("Promoting queued transaction", "hash", hash) - pool.promoteTx(addr, hash, tx) + if pool.promoteTx(addr, hash, tx) { + log.Trace("Promoting queued transaction", "hash", hash) + promoted = append(promoted, tx) + } } // Drop all transactions over the allowed limit if !pool.locals.contains(addr) { @@ -957,6 +963,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { delete(pool.queue, addr) } } + // Notify subsystem for new promoted transactions. + if len(promoted) > 0 { + pool.txFeed.Send(NewTxsEvent{promoted}) + } // If the pending limit is overflown, start equalizing allowances pending := uint64(0) for _, list := range pool.pending { |