diff options
author | bas-vk <bas-vk@users.noreply.github.com> | 2016-11-28 21:59:06 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2016-11-28 21:59:06 +0800 |
commit | b5be6b72cb06ded22075a41db6abc670d33c5ea7 (patch) | |
tree | 288845ba8db1db4d1e0d01ed612240c6229d8a9e /eth/filters/filter_system.go | |
parent | 318ad3c1e424912ff69c9febc4e08a0137f8803f (diff) | |
download | go-tangerine-b5be6b72cb06ded22075a41db6abc670d33c5ea7.tar go-tangerine-b5be6b72cb06ded22075a41db6abc670d33c5ea7.tar.gz go-tangerine-b5be6b72cb06ded22075a41db6abc670d33c5ea7.tar.bz2 go-tangerine-b5be6b72cb06ded22075a41db6abc670d33c5ea7.tar.lz go-tangerine-b5be6b72cb06ded22075a41db6abc670d33c5ea7.tar.xz go-tangerine-b5be6b72cb06ded22075a41db6abc670d33c5ea7.tar.zst go-tangerine-b5be6b72cb06ded22075a41db6abc670d33c5ea7.zip |
eth/filter: add support for pending logs (#3219)
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r-- | eth/filters/filter_system.go | 131 |
1 files changed, 107 insertions, 24 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index c2c072a9f..b59718aea 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -43,13 +43,17 @@ const ( UnknownSubscription Type = iota // LogsSubscription queries for new or removed (chain reorg) logs LogsSubscription - // PendingLogsSubscription queries for logs for the pending block + // PendingLogsSubscription queries for logs in pending blocks PendingLogsSubscription + // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. + MinedAndPendingLogsSubscription // PendingTransactionsSubscription queries tx hashes for pending // transactions entering the pending state PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + // LastSubscription keeps track of the last index + LastIndexSubscription ) var ( @@ -63,19 +67,26 @@ type Log struct { Removed bool `json:"removed"` } +// MarshalJSON returns *l as the JSON encoding of l. func (l *Log) MarshalJSON() ([]byte, error) { fields := map[string]interface{}{ "address": l.Address, "data": fmt.Sprintf("0x%x", l.Data), - "blockNumber": fmt.Sprintf("%#x", l.BlockNumber), + "blockNumber": nil, "logIndex": fmt.Sprintf("%#x", l.Index), - "blockHash": l.BlockHash, + "blockHash": nil, "transactionHash": l.TxHash, "transactionIndex": fmt.Sprintf("%#x", l.TxIndex), "topics": l.Topics, "removed": l.Removed, } + // mined logs + if l.BlockHash != (common.Hash{}) { + fields["blockNumber"] = fmt.Sprintf("%#x", l.BlockNumber) + fields["blockHash"] = l.BlockHash + } + return json.Marshal(fields) } @@ -169,11 +180,50 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription { } // SubscribeLogs creates a subscription that will write all logs matching the -// given criteria to the given logs channel. -func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription { +// given criteria to the given logs channel. Default value for the from and to +// block is "latest". If the fromBlock > toBlock an error is returned. +func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) (*Subscription, error) { + var from, to rpc.BlockNumber + if crit.FromBlock == nil { + from = rpc.LatestBlockNumber + } else { + from = rpc.BlockNumber(crit.FromBlock.Int64()) + } + if crit.ToBlock == nil { + to = rpc.LatestBlockNumber + } else { + to = rpc.BlockNumber(crit.ToBlock.Int64()) + } + + // only interested in pending logs + if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber { + return es.subscribePendingLogs(crit, logs), nil + } + // only interested in new mined logs + if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + // only interested in mined logs within a specific block range + if from >= 0 && to >= 0 && to >= from { + return es.subscribeLogs(crit, logs), nil + } + // interested in mined logs from a specific block number, new logs and pending logs + if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber { + return es.subscribeMinedPendingLogs(crit, logs), nil + } + // interested in logs from a specific block number to new mined blocks + if from >= 0 && to == rpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + return nil, fmt.Errorf("invalid from and to block combination: from > to") +} + +// subscribeMinedPendingLogs creates a subscription that returned mined and +// pending logs that match the given criteria. +func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { sub := &subscription{ id: rpc.NewID(), - typ: LogsSubscription, + typ: MinedAndPendingLogsSubscription, logsCrit: crit, created: time.Now(), logs: logs, @@ -186,12 +236,12 @@ func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subs return es.subscribe(sub) } -// SubscribePendingLogs creates a subscription that will write pending logs matching the -// given criteria to the given channel. -func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { +// subscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. +func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription { sub := &subscription{ id: rpc.NewID(), - typ: PendingLogsSubscription, + typ: LogsSubscription, logsCrit: crit, created: time.Now(), logs: logs, @@ -204,15 +254,16 @@ func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log return es.subscribe(sub) } -// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for +// subscribePendingLogs creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { +func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { sub := &subscription{ id: rpc.NewID(), - typ: PendingTransactionsSubscription, + typ: PendingLogsSubscription, + logsCrit: crit, created: time.Now(), - logs: make(chan []Log), - hashes: hashes, + logs: logs, + hashes: make(chan common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -238,6 +289,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti return es.subscribe(sub) } +// SubscribePendingTxEvents creates a subscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []Log), + hashes: hashes, + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + + return es.subscribe(sub) +} + type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. @@ -251,7 +319,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { if len(e) > 0 { for _, f := range filters[LogsSubscription] { if ev.Time.After(f.created) { - if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } @@ -260,7 +328,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { case core.RemovedLogsEvent: for _, f := range filters[LogsSubscription] { if ev.Time.After(f.created) { - if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } @@ -268,7 +336,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { case core.PendingLogsEvent: for _, f := range filters[PendingLogsSubscription] { if ev.Time.After(f.created) { - if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + if matchedLogs := filterLogs(convertLogs(e.Logs, false), nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } @@ -351,8 +419,8 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. } unfiltered = append(unfiltered, rl...) } - logs := filterLogs(unfiltered, addresses, topics) - //fmt.Println("found", len(logs)) + + logs := filterLogs(unfiltered, nil, nil, addresses, topics) return logs } return nil @@ -364,6 +432,11 @@ func (es *EventSystem) eventLoop() { index = make(filterIndex) sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{}) ) + + for i := UnknownSubscription; i < LastIndexSubscription; i++ { + index[i] = make(map[rpc.ID]*subscription) + } + for { select { case ev, active := <-sub.Chan(): @@ -372,13 +445,22 @@ func (es *EventSystem) eventLoop() { } es.broadcast(index, ev) case f := <-es.install: - if _, found := index[f.typ]; !found { - index[f.typ] = make(map[rpc.ID]*subscription) + if f.typ == MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + index[LogsSubscription][f.id] = f + index[PendingLogsSubscription][f.id] = f + } else { + index[f.typ][f.id] = f } - index[f.typ][f.id] = f close(f.installed) case f := <-es.uninstall: - delete(index[f.typ], f.id) + if f.typ == MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + delete(index[LogsSubscription], f.id) + delete(index[PendingLogsSubscription], f.id) + } else { + delete(index[f.typ], f.id) + } close(f.err) } } @@ -386,6 +468,7 @@ func (es *EventSystem) eventLoop() { // convertLogs is a helper utility that converts vm.Logs to []filter.Log. func convertLogs(in vm.Logs, removed bool) []Log { + logs := make([]Log, len(in)) for i, l := range in { logs[i] = Log{l, removed} |