diff options
author | rjl493456442 <garyrong0905@gmail.com> | 2018-05-10 15:04:45 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-05-18 16:46:44 +0800 |
commit | a2e43d28d01ef9642c7f6992b78b86bd0696c847 (patch) | |
tree | 2f5d3444071125e84155321db6fd79d941cfee0b /eth | |
parent | 6286c255f16a914b39ffd3389cba154a53e66a13 (diff) | |
download | go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar.gz go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar.bz2 go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar.lz go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar.xz go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar.zst go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.zip |
all: collate new transaction events together
Diffstat (limited to 'eth')
-rw-r--r-- | eth/api_backend.go | 2 | ||||
-rw-r--r-- | eth/filters/api.go | 14 | ||||
-rw-r--r-- | eth/filters/filter.go | 2 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 38 | ||||
-rw-r--r-- | eth/filters/filter_system_test.go | 7 | ||||
-rw-r--r-- | eth/handler.go | 41 | ||||
-rw-r--r-- | eth/helper_test.go | 2 | ||||
-rw-r--r-- | eth/protocol.go | 4 | ||||
-rw-r--r-- | eth/protocol_test.go | 2 |
9 files changed, 62 insertions, 50 deletions
diff --git a/eth/api_backend.go b/eth/api_backend.go index 4ace9b594..ef70b12b7 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -188,7 +188,7 @@ func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.TxPool().Content() } -func (b *EthAPIBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (b *EthAPIBackend) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return b.eth.TxPool().SubscribeTxPreEvent(ch) } diff --git a/eth/filters/api.go b/eth/filters/api.go index 1297b7478..d2c9258f9 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -104,7 +104,7 @@ func (api *PublicFilterAPI) timeoutLoop() { // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { var ( - pendingTxs = make(chan common.Hash) + pendingTxs = make(chan []common.Hash) pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs) ) @@ -118,7 +118,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { case ph := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { - f.hashes = append(f.hashes, ph) + f.hashes = append(f.hashes, ph...) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -144,13 +144,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su rpcSub := notifier.CreateSubscription() go func() { - txHashes := make(chan common.Hash) + txHashes := make(chan []common.Hash, 128) pendingTxSub := api.events.SubscribePendingTxEvents(txHashes) for { select { - case h := <-txHashes: - notifier.Notify(rpcSub.ID, h) + case hashes := <-txHashes: + // To keep the original behaviour, send a single tx hash in one notification. + // TODO(rjl493456442) Send a batch of tx hashes in one notification + for _, h := range hashes { + notifier.Notify(rpcSub.ID, h) + } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() return diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 5dfe60e77..45d91bea0 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -36,7 +36,7 @@ type Backend interface { GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeTxPreEvent(chan<- core.TxsPreEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index bb11734a7..5f1c12c6a 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -59,7 +59,7 @@ const ( const ( - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to TxsPreEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. @@ -80,7 +80,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - hashes chan common.Hash + hashes chan []common.Hash headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -95,7 +95,7 @@ type EventSystem struct { lastHead *types.Header // Subscriptions - txSub event.Subscription // Subscription for new transaction event + txsSub event.Subscription // Subscription for new transaction event logsSub event.Subscription // Subscription for new log event rmLogsSub event.Subscription // Subscription for removed log event chainSub event.Subscription // Subscription for new chain event @@ -104,7 +104,7 @@ type EventSystem struct { // Channels install chan *subscription // install filter for event notification uninstall chan *subscription // remove filter for event notification - txCh chan core.TxPreEvent // Channel to receive new transaction event + txsCh chan core.TxsPreEvent // Channel to receive new transactions event logsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event @@ -123,14 +123,14 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), - txCh: make(chan core.TxPreEvent, txChanSize), + txsCh: make(chan core.TxsPreEvent, txChanSize), logsCh: make(chan []*types.Log, logsChanSize), rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events - m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) + m.txsSub = m.backend.SubscribeTxPreEvent(m.txsCh) m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) @@ -138,7 +138,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) // Make sure none of the subscriptions are empty - if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogSub.Closed() { log.Crit("Subscribe for event system failed") } @@ -240,7 +240,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -257,7 +257,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -274,7 +274,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -290,7 +290,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: headers, installed: make(chan struct{}), err: make(chan error), @@ -300,7 +300,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti // SubscribePendingTxEvents creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { +func (es *EventSystem) SubscribePendingTxEvents(hashes chan []common.Hash) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, @@ -348,9 +348,13 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { } } } - case core.TxPreEvent: + case core.TxsPreEvent: + hashes := make([]common.Hash, 0, e.Txs.Len()) + for _, tx := range e.Txs { + hashes = append(hashes, tx.Hash()) + } for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- e.Tx.Hash() + f.hashes <- hashes } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { @@ -446,7 +450,7 @@ func (es *EventSystem) eventLoop() { // Ensure all subscriptions get cleaned up defer func() { es.pendingLogSub.Unsubscribe() - es.txSub.Unsubscribe() + es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() es.chainSub.Unsubscribe() @@ -460,7 +464,7 @@ func (es *EventSystem) eventLoop() { for { select { // Handle subscribed events - case ev := <-es.txCh: + case ev := <-es.txsCh: es.broadcast(index, ev) case ev := <-es.logsCh: es.broadcast(index, ev) @@ -495,7 +499,7 @@ func (es *EventSystem) eventLoop() { close(f.err) // System stopped - case <-es.txSub.Err(): + case <-es.txsSub.Err(): return case <-es.logsSub.Err(): return diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index b4df24b47..c43d282ae 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -96,7 +96,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types return logs, nil } -func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return b.txFeed.Subscribe(ch) } @@ -232,10 +232,7 @@ func TestPendingTxFilter(t *testing.T) { fid0 := api.NewPendingTransactionFilter() time.Sleep(1 * time.Second) - for _, tx := range transactions { - ev := core.TxPreEvent{Tx: tx} - txFeed.Send(ev) - } + txFeed.Send(core.TxsPreEvent{transactions}) timeout := time.Now().Add(1 * time.Second) for { diff --git a/eth/handler.go b/eth/handler.go index c8f7e13f1..7ef8e957a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -46,7 +46,7 @@ const ( softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to TxsPreEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 ) @@ -81,8 +81,8 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol eventMux *event.TypeMux - txCh chan core.TxPreEvent - txSub event.Subscription + txsCh chan core.TxsPreEvent + txsSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -204,8 +204,8 @@ func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers // broadcast transactions - pm.txCh = make(chan core.TxPreEvent, txChanSize) - pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) + pm.txsCh = make(chan core.TxsPreEvent, txChanSize) + pm.txsSub = pm.txpool.SubscribeTxPreEvent(pm.txsCh) go pm.txBroadcastLoop() // broadcast mined blocks @@ -220,7 +220,7 @@ func (pm *ProtocolManager) Start(maxPeers int) { func (pm *ProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") - pm.txSub.Unsubscribe() // quits txBroadcastLoop + pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop // Quit the sync loop. @@ -712,16 +712,23 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { } } -// BroadcastTx will propagate a transaction to all peers which are not known to +// BroadcastTxs will propagate a batch of transactions to all peers which are not known to // already have the given transaction. -func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { - // Broadcast transaction to a batch of peers not knowing about it - peers := pm.peers.PeersWithoutTx(hash) - //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] - for _, peer := range peers { - peer.SendTransactions(types.Transactions{tx}) +func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { + var txset = make(map[*peer]types.Transactions) + + // Broadcast transactions to a batch of peers not knowing about it + for _, tx := range txs { + peers := pm.peers.PeersWithoutTx(tx.Hash()) + for _, peer := range peers { + txset[peer] = append(txset[peer], tx) + } + log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) + } + // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] + for peer, txs := range txset { + peer.SendTransactions(txs) } - log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers)) } // Mined broadcast loop @@ -739,11 +746,11 @@ func (pm *ProtocolManager) minedBroadcastLoop() { func (pm *ProtocolManager) txBroadcastLoop() { for { select { - case event := <-pm.txCh: - pm.BroadcastTx(event.Tx.Hash(), event.Tx) + case event := <-pm.txsCh: + pm.BroadcastTxs(event.Txs) // Err() channel will be closed when unsubscribing. - case <-pm.txSub.Err(): + case <-pm.txsSub.Err(): return } } diff --git a/eth/helper_test.go b/eth/helper_test.go index 8a0260fc9..40a172ef8 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -124,7 +124,7 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { return batches, nil } -func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxsPreEvent) event.Subscription { return p.txFeed.Subscribe(ch) } diff --git a/eth/protocol.go b/eth/protocol.go index 328d5b993..bee3d8365 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -104,8 +104,8 @@ type txPool interface { Pending() (map[common.Address]types.Transactions, error) // SubscribeTxPreEvent should return an event subscription of - // TxPreEvent and send events to the given channel. - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + // TxsPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxsPreEvent) event.Subscription } // statusData is the network packet for the status message. diff --git a/eth/protocol_test.go b/eth/protocol_test.go index b2f93d8dd..1c32ea7d8 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -116,7 +116,7 @@ func testRecvTransactions(t *testing.T, protocol int) { t.Errorf("added wrong tx hash: got %v, want %v", added[0].Hash(), tx.Hash()) } case <-time.After(2 * time.Second): - t.Errorf("no TxPreEvent received within 2 seconds") + t.Errorf("no TxsPreEvent received within 2 seconds") } } |