From a2e43d28d01ef9642c7f6992b78b86bd0696c847 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 10 May 2018 15:04:45 +0800 Subject: all: collate new transaction events together --- eth/filters/filter_system.go | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) (limited to 'eth/filters/filter_system.go') diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index bb11734a7..5f1c12c6a 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -59,7 +59,7 @@ const ( const ( - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to TxsPreEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. @@ -80,7 +80,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - hashes chan common.Hash + hashes chan []common.Hash headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -95,7 +95,7 @@ type EventSystem struct { lastHead *types.Header // Subscriptions - txSub event.Subscription // Subscription for new transaction event + txsSub event.Subscription // Subscription for new transaction event logsSub event.Subscription // Subscription for new log event rmLogsSub event.Subscription // Subscription for removed log event chainSub event.Subscription // Subscription for new chain event @@ -104,7 +104,7 @@ type EventSystem struct { // Channels install chan *subscription // install filter for event notification uninstall chan *subscription // remove filter for event notification - txCh chan core.TxPreEvent // Channel to receive new transaction event + txsCh chan core.TxsPreEvent // Channel to receive new transactions event logsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event @@ -123,14 +123,14 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), - txCh: make(chan core.TxPreEvent, txChanSize), + txsCh: make(chan core.TxsPreEvent, txChanSize), logsCh: make(chan []*types.Log, logsChanSize), rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events - m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) + m.txsSub = m.backend.SubscribeTxPreEvent(m.txsCh) m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) @@ -138,7 +138,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) // Make sure none of the subscriptions are empty - if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogSub.Closed() { log.Crit("Subscribe for event system failed") } @@ -240,7 +240,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -257,7 +257,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -274,7 +274,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -290,7 +290,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: headers, installed: make(chan struct{}), err: make(chan error), @@ -300,7 +300,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti // SubscribePendingTxEvents creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { +func (es *EventSystem) SubscribePendingTxEvents(hashes chan []common.Hash) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, @@ -348,9 +348,13 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { } } } - case core.TxPreEvent: + case core.TxsPreEvent: + hashes := make([]common.Hash, 0, e.Txs.Len()) + for _, tx := range e.Txs { + hashes = append(hashes, tx.Hash()) + } for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- e.Tx.Hash() + f.hashes <- hashes } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { @@ -446,7 +450,7 @@ func (es *EventSystem) eventLoop() { // Ensure all subscriptions get cleaned up defer func() { es.pendingLogSub.Unsubscribe() - es.txSub.Unsubscribe() + es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() es.chainSub.Unsubscribe() @@ -460,7 +464,7 @@ func (es *EventSystem) eventLoop() { for { select { // Handle subscribed events - case ev := <-es.txCh: + case ev := <-es.txsCh: es.broadcast(index, ev) case ev := <-es.logsCh: es.broadcast(index, ev) @@ -495,7 +499,7 @@ func (es *EventSystem) eventLoop() { close(f.err) // System stopped - case <-es.txSub.Err(): + case <-es.txsSub.Err(): return case <-es.logsSub.Err(): return -- cgit v1.2.3