From a2e43d28d01ef9642c7f6992b78b86bd0696c847 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 10 May 2018 15:04:45 +0800 Subject: all: collate new transaction events together --- accounts/abi/bind/backends/simulated.go | 2 +- core/events.go | 4 +-- core/tx_journal.go | 32 ++++++++++++++++++----- core/tx_pool.go | 32 +++++++++++++++-------- core/tx_pool_test.go | 31 +++++++++++++---------- eth/api_backend.go | 2 +- eth/filters/api.go | 14 ++++++---- eth/filters/filter.go | 2 +- eth/filters/filter_system.go | 38 +++++++++++++++------------- eth/filters/filter_system_test.go | 7 ++--- eth/handler.go | 41 +++++++++++++++++------------- eth/helper_test.go | 2 +- eth/protocol.go | 4 +-- eth/protocol_test.go | 2 +- ethstats/ethstats.go | 8 +++--- internal/ethapi/backend.go | 2 +- les/api_backend.go | 2 +- light/txpool.go | 6 ++--- miner/worker.go | 45 ++++++++++++++++++++------------- 19 files changed, 165 insertions(+), 111 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 7b605e1c1..6262f3688 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -454,7 +454,7 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty return logs, nil } -func (fb *filterBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (fb *filterBackend) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return event.NewSubscription(func(quit <-chan struct{}) error { <-quit return nil diff --git a/core/events.go b/core/events.go index 6f404f612..fee34b39d 100644 --- a/core/events.go +++ b/core/events.go @@ -21,8 +21,8 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -// TxPreEvent is posted when a transaction enters the transaction pool. -type TxPreEvent struct{ Tx *types.Transaction } +// TxsPreEvent is posted when a batch of transactions enter the transaction pool. +type TxsPreEvent struct{ Txs types.Transactions } // PendingLogsEvent is posted pre mining and notifies of pending logs. type PendingLogsEvent struct { diff --git a/core/tx_journal.go b/core/tx_journal.go index e872d7b53..b344690b6 100644 --- a/core/tx_journal.go +++ b/core/tx_journal.go @@ -56,7 +56,7 @@ func newTxJournal(path string) *txJournal { // load parses a transaction journal dump from disk, loading its contents into // the specified pool. -func (journal *txJournal) load(add func(*types.Transaction) error) error { +func (journal *txJournal) load(add func([]*types.Transaction) []error) error { // Skip the parsing if the journal file doens't exist at all if _, err := os.Stat(journal.path); os.IsNotExist(err) { return nil @@ -76,7 +76,22 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error { stream := rlp.NewStream(input, 0) total, dropped := 0, 0 - var failure error + // flush imports a batch of transactions and bump the appropriate progress counters + flush := func(txs types.Transactions) { + errs := add(txs) + for _, err := range errs { + if err != nil { + log.Debug("Failed to add journaled transaction", "err", err) + dropped++ + } + } + } + + var ( + failure error + txs types.Transactions + ) + for { // Parse the next transaction and terminate on error tx := new(types.Transaction) @@ -86,14 +101,17 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error { } break } - // Import the transaction and bump the appropriate progress counters + txs = append(txs, tx) total++ - if err = add(tx); err != nil { - log.Debug("Failed to add journaled transaction", "err", err) - dropped++ - continue + if txs.Len() > 1024 { + flush(txs) + txs = types.Transactions{} } } + if txs.Len() > 0 { + flush(txs) + txs = types.Transactions{} + } log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped) return failure diff --git a/core/tx_pool.go b/core/tx_pool.go index 388b40058..f23828cb4 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -238,7 +238,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block if !config.NoLocals && config.Journal != "" { pool.journal = newTxJournal(config.Journal) - if err := pool.journal.load(pool.AddLocal); err != nil { + if err := pool.journal.load(pool.AddLocals); err != nil { log.Warn("Failed to load transaction journal", "err", err) } if err := pool.journal.rotate(pool.local()); err != nil { @@ -444,9 +444,9 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } -// SubscribeTxPreEvent registers a subscription of TxPreEvent and +// SubscribeTxPreEvent registers a subscription of TxsPreEvent and // starts sending event to the given channel. -func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription { +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxsPreEvent) event.Subscription { return pool.scope.Track(pool.txFeed.Subscribe(ch)) } @@ -653,7 +653,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // We've directly injected a replacement transaction, notify subsystems - go pool.txFeed.Send(TxPreEvent{tx}) + go pool.txFeed.Send(TxsPreEvent{types.Transactions{tx}}) return old != nil, nil } @@ -715,7 +715,7 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) { // promoteTx adds a transaction to the pending (processable) list of transactions. // // Note, this method assumes the pool lock is held! -func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) { +func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool { // Try to insert the transaction into the pending queue if pool.pending[addr] == nil { pool.pending[addr] = newTxList(true) @@ -729,7 +729,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pool.priced.Removed() pendingDiscardCounter.Inc(1) - return + return false } // Otherwise discard any previous transaction and mark this if old != nil { @@ -746,8 +746,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) - - go pool.txFeed.Send(TxPreEvent{tx}) + return true } // AddLocal enqueues a single transaction into the pool if it is valid, marking @@ -907,6 +906,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(accounts []common.Address) { + var promotedTxs types.Transactions // Gather all the accounts potentially needing updates if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) @@ -937,11 +937,16 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { queuedNofundsCounter.Inc(1) } // Gather all executable transactions and promote them - for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { + txs := list.Ready(pool.pendingState.GetNonce(addr)) + for _, tx := range txs { hash := tx.Hash() - log.Trace("Promoting queued transaction", "hash", hash) - pool.promoteTx(addr, hash, tx) + inserted := pool.promoteTx(addr, hash, tx) + if inserted { + log.Trace("Promoting queued transaction", "hash", hash) + promotedTxs = append(promotedTxs, tx) + } } + // Drop all transactions over the allowed limit if !pool.locals.contains(addr) { for _, tx := range list.Cap(int(pool.config.AccountQueue)) { @@ -957,6 +962,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { delete(pool.queue, addr) } } + // Notify subsystem for new promoted transactions. + if promotedTxs.Len() > 0 { + pool.txFeed.Send(TxsPreEvent{promotedTxs}) + } + // If the pending limit is overflown, start equalizing allowances pending := uint64(0) for _, list := range pool.pending { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index e7f52075e..c19581e3f 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -118,21 +118,26 @@ func validateTxPoolInternals(pool *TxPool) error { // validateEvents checks that the correct number of transaction addition events // were fired on the pool's event feed. -func validateEvents(events chan TxPreEvent, count int) error { - for i := 0; i < count; i++ { +func validateEvents(events chan TxsPreEvent, count int) error { + received := 0 + for { + if received == count { + break + } select { - case <-events: + case ev := <-events: + received += ev.Txs.Len() case <-time.After(time.Second): - return fmt.Errorf("event #%d not fired", i) + return fmt.Errorf("event #%d not fired", received) } } select { - case tx := <-events: - return fmt.Errorf("more than %d events fired: %v", count, tx.Tx) + case ev := <-events: + return fmt.Errorf("more than %d events fired: %v", count, ev.Txs) case <-time.After(50 * time.Millisecond): // This branch should be "default", but it's a data race between goroutines, - // reading the event channel and pushng into it, so better wait a bit ensuring + // reading the event channel and pushing into it, so better wait a bit ensuring // really nothing gets injected. } return nil @@ -669,7 +674,7 @@ func TestTransactionGapFilling(t *testing.T) { pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5) + events := make(chan TxsPreEvent, testTxPoolConfig.AccountQueue+5) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -920,7 +925,7 @@ func TestTransactionPendingLimiting(t *testing.T) { pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5) + events := make(chan TxsPreEvent, testTxPoolConfig.AccountQueue+5) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1140,7 +1145,7 @@ func TestTransactionPoolRepricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan TxsPreEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1327,7 +1332,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan TxsPreEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1433,7 +1438,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan TxsPreEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1495,7 +1500,7 @@ func TestTransactionReplacement(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan TxsPreEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() diff --git a/eth/api_backend.go b/eth/api_backend.go index 4ace9b594..ef70b12b7 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -188,7 +188,7 @@ func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.TxPool().Content() } -func (b *EthAPIBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (b *EthAPIBackend) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return b.eth.TxPool().SubscribeTxPreEvent(ch) } diff --git a/eth/filters/api.go b/eth/filters/api.go index 1297b7478..d2c9258f9 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -104,7 +104,7 @@ func (api *PublicFilterAPI) timeoutLoop() { // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { var ( - pendingTxs = make(chan common.Hash) + pendingTxs = make(chan []common.Hash) pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs) ) @@ -118,7 +118,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { case ph := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { - f.hashes = append(f.hashes, ph) + f.hashes = append(f.hashes, ph...) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -144,13 +144,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su rpcSub := notifier.CreateSubscription() go func() { - txHashes := make(chan common.Hash) + txHashes := make(chan []common.Hash, 128) pendingTxSub := api.events.SubscribePendingTxEvents(txHashes) for { select { - case h := <-txHashes: - notifier.Notify(rpcSub.ID, h) + case hashes := <-txHashes: + // To keep the original behaviour, send a single tx hash in one notification. + // TODO(rjl493456442) Send a batch of tx hashes in one notification + for _, h := range hashes { + notifier.Notify(rpcSub.ID, h) + } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() return diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 5dfe60e77..45d91bea0 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -36,7 +36,7 @@ type Backend interface { GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeTxPreEvent(chan<- core.TxsPreEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index bb11734a7..5f1c12c6a 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -59,7 +59,7 @@ const ( const ( - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to TxsPreEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. @@ -80,7 +80,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - hashes chan common.Hash + hashes chan []common.Hash headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -95,7 +95,7 @@ type EventSystem struct { lastHead *types.Header // Subscriptions - txSub event.Subscription // Subscription for new transaction event + txsSub event.Subscription // Subscription for new transaction event logsSub event.Subscription // Subscription for new log event rmLogsSub event.Subscription // Subscription for removed log event chainSub event.Subscription // Subscription for new chain event @@ -104,7 +104,7 @@ type EventSystem struct { // Channels install chan *subscription // install filter for event notification uninstall chan *subscription // remove filter for event notification - txCh chan core.TxPreEvent // Channel to receive new transaction event + txsCh chan core.TxsPreEvent // Channel to receive new transactions event logsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event @@ -123,14 +123,14 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), - txCh: make(chan core.TxPreEvent, txChanSize), + txsCh: make(chan core.TxsPreEvent, txChanSize), logsCh: make(chan []*types.Log, logsChanSize), rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events - m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) + m.txsSub = m.backend.SubscribeTxPreEvent(m.txsCh) m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) @@ -138,7 +138,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) // Make sure none of the subscriptions are empty - if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogSub.Closed() { log.Crit("Subscribe for event system failed") } @@ -240,7 +240,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -257,7 +257,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -274,7 +274,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -290,7 +290,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: headers, installed: make(chan struct{}), err: make(chan error), @@ -300,7 +300,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti // SubscribePendingTxEvents creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { +func (es *EventSystem) SubscribePendingTxEvents(hashes chan []common.Hash) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, @@ -348,9 +348,13 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { } } } - case core.TxPreEvent: + case core.TxsPreEvent: + hashes := make([]common.Hash, 0, e.Txs.Len()) + for _, tx := range e.Txs { + hashes = append(hashes, tx.Hash()) + } for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- e.Tx.Hash() + f.hashes <- hashes } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { @@ -446,7 +450,7 @@ func (es *EventSystem) eventLoop() { // Ensure all subscriptions get cleaned up defer func() { es.pendingLogSub.Unsubscribe() - es.txSub.Unsubscribe() + es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() es.chainSub.Unsubscribe() @@ -460,7 +464,7 @@ func (es *EventSystem) eventLoop() { for { select { // Handle subscribed events - case ev := <-es.txCh: + case ev := <-es.txsCh: es.broadcast(index, ev) case ev := <-es.logsCh: es.broadcast(index, ev) @@ -495,7 +499,7 @@ func (es *EventSystem) eventLoop() { close(f.err) // System stopped - case <-es.txSub.Err(): + case <-es.txsSub.Err(): return case <-es.logsSub.Err(): return diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index b4df24b47..c43d282ae 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -96,7 +96,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types return logs, nil } -func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return b.txFeed.Subscribe(ch) } @@ -232,10 +232,7 @@ func TestPendingTxFilter(t *testing.T) { fid0 := api.NewPendingTransactionFilter() time.Sleep(1 * time.Second) - for _, tx := range transactions { - ev := core.TxPreEvent{Tx: tx} - txFeed.Send(ev) - } + txFeed.Send(core.TxsPreEvent{transactions}) timeout := time.Now().Add(1 * time.Second) for { diff --git a/eth/handler.go b/eth/handler.go index c8f7e13f1..7ef8e957a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -46,7 +46,7 @@ const ( softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to TxsPreEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 ) @@ -81,8 +81,8 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol eventMux *event.TypeMux - txCh chan core.TxPreEvent - txSub event.Subscription + txsCh chan core.TxsPreEvent + txsSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -204,8 +204,8 @@ func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers // broadcast transactions - pm.txCh = make(chan core.TxPreEvent, txChanSize) - pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) + pm.txsCh = make(chan core.TxsPreEvent, txChanSize) + pm.txsSub = pm.txpool.SubscribeTxPreEvent(pm.txsCh) go pm.txBroadcastLoop() // broadcast mined blocks @@ -220,7 +220,7 @@ func (pm *ProtocolManager) Start(maxPeers int) { func (pm *ProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") - pm.txSub.Unsubscribe() // quits txBroadcastLoop + pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop // Quit the sync loop. @@ -712,16 +712,23 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { } } -// BroadcastTx will propagate a transaction to all peers which are not known to +// BroadcastTxs will propagate a batch of transactions to all peers which are not known to // already have the given transaction. -func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { - // Broadcast transaction to a batch of peers not knowing about it - peers := pm.peers.PeersWithoutTx(hash) - //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] - for _, peer := range peers { - peer.SendTransactions(types.Transactions{tx}) +func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { + var txset = make(map[*peer]types.Transactions) + + // Broadcast transactions to a batch of peers not knowing about it + for _, tx := range txs { + peers := pm.peers.PeersWithoutTx(tx.Hash()) + for _, peer := range peers { + txset[peer] = append(txset[peer], tx) + } + log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) + } + // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] + for peer, txs := range txset { + peer.SendTransactions(txs) } - log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers)) } // Mined broadcast loop @@ -739,11 +746,11 @@ func (pm *ProtocolManager) minedBroadcastLoop() { func (pm *ProtocolManager) txBroadcastLoop() { for { select { - case event := <-pm.txCh: - pm.BroadcastTx(event.Tx.Hash(), event.Tx) + case event := <-pm.txsCh: + pm.BroadcastTxs(event.Txs) // Err() channel will be closed when unsubscribing. - case <-pm.txSub.Err(): + case <-pm.txsSub.Err(): return } } diff --git a/eth/helper_test.go b/eth/helper_test.go index 8a0260fc9..40a172ef8 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -124,7 +124,7 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { return batches, nil } -func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return p.txFeed.Subscribe(ch) } diff --git a/eth/protocol.go b/eth/protocol.go index 328d5b993..bee3d8365 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -104,8 +104,8 @@ type txPool interface { Pending() (map[common.Address]types.Transactions, error) // SubscribeTxPreEvent should return an event subscription of - // TxPreEvent and send events to the given channel. - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + // TxsPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxsPreEvent) event.Subscription } // statusData is the network packet for the status message. diff --git a/eth/protocol_test.go b/eth/protocol_test.go index b2f93d8dd..1c32ea7d8 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -116,7 +116,7 @@ func testRecvTransactions(t *testing.T, protocol int) { t.Errorf("added wrong tx hash: got %v, want %v", added[0].Hash(), tx.Hash()) } case <-time.After(2 * time.Second): - t.Errorf("no TxPreEvent received within 2 seconds") + t.Errorf("no TxsPreEvent received within 2 seconds") } } diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index a15d84615..17f9f3c7a 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -49,7 +49,7 @@ const ( // history request. historyUpdateRange = 50 - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to TxsPreEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // chainHeadChanSize is the size of channel listening to ChainHeadEvent. @@ -58,8 +58,8 @@ const ( type txPool interface { // SubscribeTxPreEvent should return an event subscription of - // TxPreEvent and send events to the given channel. - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + // TxsPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxsPreEvent) event.Subscription } type blockChain interface { @@ -150,7 +150,7 @@ func (s *Service) loop() { headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh) defer headSub.Unsubscribe() - txEventCh := make(chan core.TxPreEvent, txChanSize) + txEventCh := make(chan core.TxsPreEvent, txChanSize) txSub := txpool.SubscribeTxPreEvent(txEventCh) defer txSub.Unsubscribe() diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index af95d7906..e351152b9 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -65,7 +65,7 @@ type Backend interface { GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) Stats() (pending int, queued int) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeTxPreEvent(chan<- core.TxsPreEvent) event.Subscription ChainConfig() *params.ChainConfig CurrentBlock() *types.Block diff --git a/les/api_backend.go b/les/api_backend.go index 1d3c99513..957c8ae64 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -136,7 +136,7 @@ func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.txPool.Content() } -func (b *LesApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (b *LesApiBackend) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return b.eth.txPool.SubscribeTxPreEvent(ch) } diff --git a/light/txpool.go b/light/txpool.go index 94c8139cb..bedfd48a2 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -321,9 +321,9 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } -// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and +// SubscribeTxPreEvent registers a subscription of core.TxsPreEvent and // starts sending event to the given channel. -func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return pool.scope.Track(pool.txFeed.Subscribe(ch)) } @@ -412,7 +412,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error { // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. - go self.txFeed.Send(core.TxPreEvent{Tx: tx}) + go self.txFeed.Send(core.TxsPreEvent{types.Transactions{tx}}) } // Print a log message if low enough level is set diff --git a/miner/worker.go b/miner/worker.go index 48b0b2765..3d086d6ae 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -42,7 +42,7 @@ const ( resultQueueSize = 10 miningLogAtDepth = 5 - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to TxsPreEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // chainHeadChanSize is the size of channel listening to ChainHeadEvent. @@ -71,6 +71,7 @@ type Work struct { family *set.Set // family set (used for checking uncle invalidity) uncles *set.Set // uncle set tcount int // tx count in cycle + gasPool *core.GasPool // available gas used to pack transaction. Block *types.Block // the new block @@ -95,8 +96,8 @@ type worker struct { // update loop mux *event.TypeMux - txCh chan core.TxPreEvent - txSub event.Subscription + txsCh chan core.TxsPreEvent + txsSub event.Subscription chainHeadCh chan core.ChainHeadEvent chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent @@ -137,7 +138,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com engine: engine, eth: eth, mux: mux, - txCh: make(chan core.TxPreEvent, txChanSize), + txsCh: make(chan core.TxsPreEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainDb: eth.ChainDb(), @@ -149,8 +150,8 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), } - // Subscribe TxPreEvent for tx pool - worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) + // Subscribe TxsPreEvent for tx pool + worker.txsSub = eth.TxPool().SubscribeTxPreEvent(worker.txsCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) @@ -241,7 +242,7 @@ func (self *worker) unregister(agent Agent) { } func (self *worker) update() { - defer self.txSub.Unsubscribe() + defer self.txsSub.Unsubscribe() defer self.chainHeadSub.Unsubscribe() defer self.chainSideSub.Unsubscribe() @@ -258,15 +259,21 @@ func (self *worker) update() { self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() - // Handle TxPreEvent - case ev := <-self.txCh: - // Apply transaction to the pending state if we're not mining + // Handle TxsPreEvent + case ev := <-self.txsCh: + // Apply transactions to the pending state if we're not mining. + // + // Note all transactions received may not be continuous with transactions + // already included in the current mining block. These transactions will + // be automatically eliminated. if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - acc, _ := types.Sender(self.current.signer, ev.Tx) - txs := map[common.Address]types.Transactions{acc: {ev.Tx}} + txs := make(map[common.Address]types.Transactions) + for _, tx := range ev.Txs { + acc, _ := types.Sender(self.current.signer, tx) + txs[acc] = append(txs[acc], tx) + } txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) - self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.updateSnapshot() self.currentMu.Unlock() @@ -278,7 +285,7 @@ func (self *worker) update() { } // System stopped - case <-self.txSub.Err(): + case <-self.txsSub.Err(): return case <-self.chainHeadSub.Err(): return @@ -522,14 +529,16 @@ func (self *worker) updateSnapshot() { } func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { - gp := new(core.GasPool).AddGas(env.header.GasLimit) + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit) + } var coalescedLogs []*types.Log for { // If we don't have enough gas for any further transactions then we're done - if gp.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "gp", gp) + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "gp", env.gasPool) break } // Retrieve the next transaction and abort if all done @@ -553,7 +562,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB // Start executing the transaction env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount) - err, logs := env.commitTransaction(tx, bc, coinbase, gp) + err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool) switch err { case core.ErrGasLimitReached: // Pop the current out-of-gas transaction without shifting in the next from the account -- cgit v1.2.3