From a2e43d28d01ef9642c7f6992b78b86bd0696c847 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 10 May 2018 15:04:45 +0800 Subject: all: collate new transaction events together --- eth/filters/api.go | 14 +++++++++----- eth/filters/filter.go | 2 +- eth/filters/filter_system.go | 38 +++++++++++++++++++++----------------- eth/filters/filter_system_test.go | 7 ++----- 4 files changed, 33 insertions(+), 28 deletions(-) (limited to 'eth/filters') diff --git a/eth/filters/api.go b/eth/filters/api.go index 1297b7478..d2c9258f9 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -104,7 +104,7 @@ func (api *PublicFilterAPI) timeoutLoop() { // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { var ( - pendingTxs = make(chan common.Hash) + pendingTxs = make(chan []common.Hash) pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs) ) @@ -118,7 +118,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { case ph := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { - f.hashes = append(f.hashes, ph) + f.hashes = append(f.hashes, ph...) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -144,13 +144,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su rpcSub := notifier.CreateSubscription() go func() { - txHashes := make(chan common.Hash) + txHashes := make(chan []common.Hash, 128) pendingTxSub := api.events.SubscribePendingTxEvents(txHashes) for { select { - case h := <-txHashes: - notifier.Notify(rpcSub.ID, h) + case hashes := <-txHashes: + // To keep the original behaviour, send a single tx hash in one notification. + // TODO(rjl493456442) Send a batch of tx hashes in one notification + for _, h := range hashes { + notifier.Notify(rpcSub.ID, h) + } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() return diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 5dfe60e77..45d91bea0 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -36,7 +36,7 @@ type Backend interface { GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeTxPreEvent(chan<- core.TxsPreEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index bb11734a7..5f1c12c6a 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -59,7 +59,7 @@ const ( const ( - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to TxsPreEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. @@ -80,7 +80,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - hashes chan common.Hash + hashes chan []common.Hash headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -95,7 +95,7 @@ type EventSystem struct { lastHead *types.Header // Subscriptions - txSub event.Subscription // Subscription for new transaction event + txsSub event.Subscription // Subscription for new transaction event logsSub event.Subscription // Subscription for new log event rmLogsSub event.Subscription // Subscription for removed log event chainSub event.Subscription // Subscription for new chain event @@ -104,7 +104,7 @@ type EventSystem struct { // Channels install chan *subscription // install filter for event notification uninstall chan *subscription // remove filter for event notification - txCh chan core.TxPreEvent // Channel to receive new transaction event + txsCh chan core.TxsPreEvent // Channel to receive new transactions event logsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event @@ -123,14 +123,14 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), - txCh: make(chan core.TxPreEvent, txChanSize), + txsCh: make(chan core.TxsPreEvent, txChanSize), logsCh: make(chan []*types.Log, logsChanSize), rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events - m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) + m.txsSub = m.backend.SubscribeTxPreEvent(m.txsCh) m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) @@ -138,7 +138,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) // Make sure none of the subscriptions are empty - if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogSub.Closed() { log.Crit("Subscribe for event system failed") } @@ -240,7 +240,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -257,7 +257,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -274,7 +274,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -290,7 +290,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: headers, installed: make(chan struct{}), err: make(chan error), @@ -300,7 +300,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti // SubscribePendingTxEvents creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { +func (es *EventSystem) SubscribePendingTxEvents(hashes chan []common.Hash) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, @@ -348,9 +348,13 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { } } } - case core.TxPreEvent: + case core.TxsPreEvent: + hashes := make([]common.Hash, 0, e.Txs.Len()) + for _, tx := range e.Txs { + hashes = append(hashes, tx.Hash()) + } for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- e.Tx.Hash() + f.hashes <- hashes } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { @@ -446,7 +450,7 @@ func (es *EventSystem) eventLoop() { // Ensure all subscriptions get cleaned up defer func() { es.pendingLogSub.Unsubscribe() - es.txSub.Unsubscribe() + es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() es.chainSub.Unsubscribe() @@ -460,7 +464,7 @@ func (es *EventSystem) eventLoop() { for { select { // Handle subscribed events - case ev := <-es.txCh: + case ev := <-es.txsCh: es.broadcast(index, ev) case ev := <-es.logsCh: es.broadcast(index, ev) @@ -495,7 +499,7 @@ func (es *EventSystem) eventLoop() { close(f.err) // System stopped - case <-es.txSub.Err(): + case <-es.txsSub.Err(): return case <-es.logsSub.Err(): return diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index b4df24b47..c43d282ae 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -96,7 +96,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types return logs, nil } -func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return b.txFeed.Subscribe(ch) } @@ -232,10 +232,7 @@ func TestPendingTxFilter(t *testing.T) { fid0 := api.NewPendingTransactionFilter() time.Sleep(1 * time.Second) - for _, tx := range transactions { - ev := core.TxPreEvent{Tx: tx} - txFeed.Send(ev) - } + txFeed.Send(core.TxsPreEvent{transactions}) timeout := time.Now().Add(1 * time.Second) for { -- cgit v1.2.3