From a2e43d28d01ef9642c7f6992b78b86bd0696c847 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 10 May 2018 15:04:45 +0800 Subject: all: collate new transaction events together --- core/events.go | 4 ++-- core/tx_journal.go | 32 +++++++++++++++++++++++++------- core/tx_pool.go | 32 +++++++++++++++++++++----------- core/tx_pool_test.go | 31 ++++++++++++++++++------------- 4 files changed, 66 insertions(+), 33 deletions(-) (limited to 'core') diff --git a/core/events.go b/core/events.go index 6f404f612..fee34b39d 100644 --- a/core/events.go +++ b/core/events.go @@ -21,8 +21,8 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -// TxPreEvent is posted when a transaction enters the transaction pool. -type TxPreEvent struct{ Tx *types.Transaction } +// TxsPreEvent is posted when a batch of transactions enter the transaction pool. +type TxsPreEvent struct{ Txs types.Transactions } // PendingLogsEvent is posted pre mining and notifies of pending logs. type PendingLogsEvent struct { diff --git a/core/tx_journal.go b/core/tx_journal.go index e872d7b53..b344690b6 100644 --- a/core/tx_journal.go +++ b/core/tx_journal.go @@ -56,7 +56,7 @@ func newTxJournal(path string) *txJournal { // load parses a transaction journal dump from disk, loading its contents into // the specified pool. -func (journal *txJournal) load(add func(*types.Transaction) error) error { +func (journal *txJournal) load(add func([]*types.Transaction) []error) error { // Skip the parsing if the journal file doens't exist at all if _, err := os.Stat(journal.path); os.IsNotExist(err) { return nil @@ -76,7 +76,22 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error { stream := rlp.NewStream(input, 0) total, dropped := 0, 0 - var failure error + // flush imports a batch of transactions and bump the appropriate progress counters + flush := func(txs types.Transactions) { + errs := add(txs) + for _, err := range errs { + if err != nil { + log.Debug("Failed to add journaled transaction", "err", err) + dropped++ + } + } + } + + var ( + failure error + txs types.Transactions + ) + for { // Parse the next transaction and terminate on error tx := new(types.Transaction) @@ -86,14 +101,17 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error { } break } - // Import the transaction and bump the appropriate progress counters + txs = append(txs, tx) total++ - if err = add(tx); err != nil { - log.Debug("Failed to add journaled transaction", "err", err) - dropped++ - continue + if txs.Len() > 1024 { + flush(txs) + txs = types.Transactions{} } } + if txs.Len() > 0 { + flush(txs) + txs = types.Transactions{} + } log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped) return failure diff --git a/core/tx_pool.go b/core/tx_pool.go index 388b40058..f23828cb4 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -238,7 +238,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block if !config.NoLocals && config.Journal != "" { pool.journal = newTxJournal(config.Journal) - if err := pool.journal.load(pool.AddLocal); err != nil { + if err := pool.journal.load(pool.AddLocals); err != nil { log.Warn("Failed to load transaction journal", "err", err) } if err := pool.journal.rotate(pool.local()); err != nil { @@ -444,9 +444,9 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } -// SubscribeTxPreEvent registers a subscription of TxPreEvent and +// SubscribeTxPreEvent registers a subscription of TxsPreEvent and // starts sending event to the given channel. -func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription { +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxsPreEvent) event.Subscription { return pool.scope.Track(pool.txFeed.Subscribe(ch)) } @@ -653,7 +653,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // We've directly injected a replacement transaction, notify subsystems - go pool.txFeed.Send(TxPreEvent{tx}) + go pool.txFeed.Send(TxsPreEvent{types.Transactions{tx}}) return old != nil, nil } @@ -715,7 +715,7 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) { // promoteTx adds a transaction to the pending (processable) list of transactions. // // Note, this method assumes the pool lock is held! -func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) { +func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool { // Try to insert the transaction into the pending queue if pool.pending[addr] == nil { pool.pending[addr] = newTxList(true) @@ -729,7 +729,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pool.priced.Removed() pendingDiscardCounter.Inc(1) - return + return false } // Otherwise discard any previous transaction and mark this if old != nil { @@ -746,8 +746,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) - - go pool.txFeed.Send(TxPreEvent{tx}) + return true } // AddLocal enqueues a single transaction into the pool if it is valid, marking @@ -907,6 +906,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(accounts []common.Address) { + var promotedTxs types.Transactions // Gather all the accounts potentially needing updates if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) @@ -937,11 +937,16 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { queuedNofundsCounter.Inc(1) } // Gather all executable transactions and promote them - for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { + txs := list.Ready(pool.pendingState.GetNonce(addr)) + for _, tx := range txs { hash := tx.Hash() - log.Trace("Promoting queued transaction", "hash", hash) - pool.promoteTx(addr, hash, tx) + inserted := pool.promoteTx(addr, hash, tx) + if inserted { + log.Trace("Promoting queued transaction", "hash", hash) + promotedTxs = append(promotedTxs, tx) + } } + // Drop all transactions over the allowed limit if !pool.locals.contains(addr) { for _, tx := range list.Cap(int(pool.config.AccountQueue)) { @@ -957,6 +962,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { delete(pool.queue, addr) } } + // Notify subsystem for new promoted transactions. + if promotedTxs.Len() > 0 { + pool.txFeed.Send(TxsPreEvent{promotedTxs}) + } + // If the pending limit is overflown, start equalizing allowances pending := uint64(0) for _, list := range pool.pending { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index e7f52075e..c19581e3f 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -118,21 +118,26 @@ func validateTxPoolInternals(pool *TxPool) error { // validateEvents checks that the correct number of transaction addition events // were fired on the pool's event feed. -func validateEvents(events chan TxPreEvent, count int) error { - for i := 0; i < count; i++ { +func validateEvents(events chan TxsPreEvent, count int) error { + received := 0 + for { + if received == count { + break + } select { - case <-events: + case ev := <-events: + received += ev.Txs.Len() case <-time.After(time.Second): - return fmt.Errorf("event #%d not fired", i) + return fmt.Errorf("event #%d not fired", received) } } select { - case tx := <-events: - return fmt.Errorf("more than %d events fired: %v", count, tx.Tx) + case ev := <-events: + return fmt.Errorf("more than %d events fired: %v", count, ev.Txs) case <-time.After(50 * time.Millisecond): // This branch should be "default", but it's a data race between goroutines, - // reading the event channel and pushng into it, so better wait a bit ensuring + // reading the event channel and pushing into it, so better wait a bit ensuring // really nothing gets injected. } return nil @@ -669,7 +674,7 @@ func TestTransactionGapFilling(t *testing.T) { pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5) + events := make(chan TxsPreEvent, testTxPoolConfig.AccountQueue+5) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -920,7 +925,7 @@ func TestTransactionPendingLimiting(t *testing.T) { pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5) + events := make(chan TxsPreEvent, testTxPoolConfig.AccountQueue+5) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1140,7 +1145,7 @@ func TestTransactionPoolRepricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan TxsPreEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1327,7 +1332,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan TxsPreEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1433,7 +1438,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan TxsPreEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1495,7 +1500,7 @@ func TestTransactionReplacement(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan TxsPreEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() -- cgit v1.2.3 From 49719e21bcd740c5890334f8c0ec8ac3777fb4c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 18 May 2018 11:45:52 +0300 Subject: core, eth: minor txpool event cleanups --- core/events.go | 7 ++----- core/tx_journal.go | 29 ++++++++++++++--------------- core/tx_pool.go | 28 ++++++++++++++-------------- core/tx_pool_test.go | 27 ++++++++++++++------------- 4 files changed, 44 insertions(+), 47 deletions(-) (limited to 'core') diff --git a/core/events.go b/core/events.go index fee34b39d..8d200f2a2 100644 --- a/core/events.go +++ b/core/events.go @@ -21,8 +21,8 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -// TxsPreEvent is posted when a batch of transactions enter the transaction pool. -type TxsPreEvent struct{ Txs types.Transactions } +// NewTxsEvent is posted when a batch of transactions enter the transaction pool. +type NewTxsEvent struct{ Txs []*types.Transaction } // PendingLogsEvent is posted pre mining and notifies of pending logs. type PendingLogsEvent struct { @@ -35,9 +35,6 @@ type PendingStateEvent struct{} // NewMinedBlockEvent is posted when a block has been imported. type NewMinedBlockEvent struct{ Block *types.Block } -// RemovedTransactionEvent is posted when a reorg happens -type RemovedTransactionEvent struct{ Txs types.Transactions } - // RemovedLogsEvent is posted when a reorg happens type RemovedLogsEvent struct{ Logs []*types.Log } diff --git a/core/tx_journal.go b/core/tx_journal.go index b344690b6..1397e9fd3 100644 --- a/core/tx_journal.go +++ b/core/tx_journal.go @@ -76,22 +76,21 @@ func (journal *txJournal) load(add func([]*types.Transaction) []error) error { stream := rlp.NewStream(input, 0) total, dropped := 0, 0 - // flush imports a batch of transactions and bump the appropriate progress counters - flush := func(txs types.Transactions) { - errs := add(txs) - for _, err := range errs { + // Create a method to load a limited batch of transactions and bump the + // appropriate progress counters. Then use this method to load all the + // journalled transactions in small-ish batches. + loadBatch := func(txs types.Transactions) { + for _, err := range add(txs) { if err != nil { log.Debug("Failed to add journaled transaction", "err", err) dropped++ } } } - var ( failure error - txs types.Transactions + batch types.Transactions ) - for { // Parse the next transaction and terminate on error tx := new(types.Transaction) @@ -99,19 +98,19 @@ func (journal *txJournal) load(add func([]*types.Transaction) []error) error { if err != io.EOF { failure = err } + if batch.Len() > 0 { + loadBatch(batch) + } break } - txs = append(txs, tx) + // New transaction parsed, queue up for later, import if threnshold is reached total++ - if txs.Len() > 1024 { - flush(txs) - txs = types.Transactions{} + + if batch = append(batch, tx); batch.Len() > 1024 { + loadBatch(batch) + batch = batch[:0] } } - if txs.Len() > 0 { - flush(txs) - txs = types.Transactions{} - } log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped) return failure diff --git a/core/tx_pool.go b/core/tx_pool.go index f23828cb4..f89e11441 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -444,9 +444,9 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } -// SubscribeTxPreEvent registers a subscription of TxsPreEvent and +// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and // starts sending event to the given channel. -func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxsPreEvent) event.Subscription { +func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription { return pool.scope.Track(pool.txFeed.Subscribe(ch)) } @@ -653,7 +653,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // We've directly injected a replacement transaction, notify subsystems - go pool.txFeed.Send(TxsPreEvent{types.Transactions{tx}}) + go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}}) return old != nil, nil } @@ -712,7 +712,8 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) { } } -// promoteTx adds a transaction to the pending (processable) list of transactions. +// promoteTx adds a transaction to the pending (processable) list of transactions +// and returns whether it was inserted or an older was better. // // Note, this method assumes the pool lock is held! func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool { @@ -746,6 +747,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) + return true } @@ -906,7 +908,9 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(accounts []common.Address) { - var promotedTxs types.Transactions + // Track the promoted transactions to broadcast them at once + var promoted []*types.Transaction + // Gather all the accounts potentially needing updates if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) @@ -937,16 +941,13 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { queuedNofundsCounter.Inc(1) } // Gather all executable transactions and promote them - txs := list.Ready(pool.pendingState.GetNonce(addr)) - for _, tx := range txs { + for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { hash := tx.Hash() - inserted := pool.promoteTx(addr, hash, tx) - if inserted { + if pool.promoteTx(addr, hash, tx) { log.Trace("Promoting queued transaction", "hash", hash) - promotedTxs = append(promotedTxs, tx) + promoted = append(promoted, tx) } } - // Drop all transactions over the allowed limit if !pool.locals.contains(addr) { for _, tx := range list.Cap(int(pool.config.AccountQueue)) { @@ -963,10 +964,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { } } // Notify subsystem for new promoted transactions. - if promotedTxs.Len() > 0 { - pool.txFeed.Send(TxsPreEvent{promotedTxs}) + if len(promoted) > 0 { + pool.txFeed.Send(NewTxsEvent{promoted}) } - // If the pending limit is overflown, start equalizing allowances pending := uint64(0) for _, list := range pool.pending { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index c19581e3f..25993c258 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -118,19 +118,20 @@ func validateTxPoolInternals(pool *TxPool) error { // validateEvents checks that the correct number of transaction addition events // were fired on the pool's event feed. -func validateEvents(events chan TxsPreEvent, count int) error { - received := 0 - for { - if received == count { - break - } +func validateEvents(events chan NewTxsEvent, count int) error { + var received []*types.Transaction + + for len(received) < count { select { case ev := <-events: - received += ev.Txs.Len() + received = append(received, ev.Txs...) case <-time.After(time.Second): return fmt.Errorf("event #%d not fired", received) } } + if len(received) > count { + return fmt.Errorf("more than %d events fired: %v", count, received[count:]) + } select { case ev := <-events: return fmt.Errorf("more than %d events fired: %v", count, ev.Txs) @@ -674,7 +675,7 @@ func TestTransactionGapFilling(t *testing.T) { pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced - events := make(chan TxsPreEvent, testTxPoolConfig.AccountQueue+5) + events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -925,7 +926,7 @@ func TestTransactionPendingLimiting(t *testing.T) { pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced - events := make(chan TxsPreEvent, testTxPoolConfig.AccountQueue+5) + events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1145,7 +1146,7 @@ func TestTransactionPoolRepricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxsPreEvent, 32) + events := make(chan NewTxsEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1332,7 +1333,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxsPreEvent, 32) + events := make(chan NewTxsEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1438,7 +1439,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxsPreEvent, 32) + events := make(chan NewTxsEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1500,7 +1501,7 @@ func TestTransactionReplacement(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxsPreEvent, 32) + events := make(chan NewTxsEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() -- cgit v1.2.3