aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter_system.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r--eth/filters/filter_system.go131
1 files changed, 107 insertions, 24 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index c2c072a9f..b59718aea 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -43,13 +43,17 @@ const (
UnknownSubscription Type = iota
// LogsSubscription queries for new or removed (chain reorg) logs
LogsSubscription
- // PendingLogsSubscription queries for logs for the pending block
+ // PendingLogsSubscription queries for logs in pending blocks
PendingLogsSubscription
+ // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
+ MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries tx hashes for pending
// transactions entering the pending state
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
+ // LastSubscription keeps track of the last index
+ LastIndexSubscription
)
var (
@@ -63,19 +67,26 @@ type Log struct {
Removed bool `json:"removed"`
}
+// MarshalJSON returns *l as the JSON encoding of l.
func (l *Log) MarshalJSON() ([]byte, error) {
fields := map[string]interface{}{
"address": l.Address,
"data": fmt.Sprintf("0x%x", l.Data),
- "blockNumber": fmt.Sprintf("%#x", l.BlockNumber),
+ "blockNumber": nil,
"logIndex": fmt.Sprintf("%#x", l.Index),
- "blockHash": l.BlockHash,
+ "blockHash": nil,
"transactionHash": l.TxHash,
"transactionIndex": fmt.Sprintf("%#x", l.TxIndex),
"topics": l.Topics,
"removed": l.Removed,
}
+ // mined logs
+ if l.BlockHash != (common.Hash{}) {
+ fields["blockNumber"] = fmt.Sprintf("%#x", l.BlockNumber)
+ fields["blockHash"] = l.BlockHash
+ }
+
return json.Marshal(fields)
}
@@ -169,11 +180,50 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription {
}
// SubscribeLogs creates a subscription that will write all logs matching the
-// given criteria to the given logs channel.
-func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+// given criteria to the given logs channel. Default value for the from and to
+// block is "latest". If the fromBlock > toBlock an error is returned.
+func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) (*Subscription, error) {
+ var from, to rpc.BlockNumber
+ if crit.FromBlock == nil {
+ from = rpc.LatestBlockNumber
+ } else {
+ from = rpc.BlockNumber(crit.FromBlock.Int64())
+ }
+ if crit.ToBlock == nil {
+ to = rpc.LatestBlockNumber
+ } else {
+ to = rpc.BlockNumber(crit.ToBlock.Int64())
+ }
+
+ // only interested in pending logs
+ if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
+ return es.subscribePendingLogs(crit, logs), nil
+ }
+ // only interested in new mined logs
+ if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ // only interested in mined logs within a specific block range
+ if from >= 0 && to >= 0 && to >= from {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ // interested in mined logs from a specific block number, new logs and pending logs
+ if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
+ return es.subscribeMinedPendingLogs(crit, logs), nil
+ }
+ // interested in logs from a specific block number to new mined blocks
+ if from >= 0 && to == rpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ return nil, fmt.Errorf("invalid from and to block combination: from > to")
+}
+
+// subscribeMinedPendingLogs creates a subscription that returned mined and
+// pending logs that match the given criteria.
+func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
- typ: LogsSubscription,
+ typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
@@ -186,12 +236,12 @@ func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subs
return es.subscribe(sub)
}
-// SubscribePendingLogs creates a subscription that will write pending logs matching the
-// given criteria to the given channel.
-func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+// subscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel.
+func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
- typ: PendingLogsSubscription,
+ typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
@@ -204,15 +254,16 @@ func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log
return es.subscribe(sub)
}
-// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for
+// subscribePendingLogs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
-func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
+func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
- typ: PendingTransactionsSubscription,
+ typ: PendingLogsSubscription,
+ logsCrit: crit,
created: time.Now(),
- logs: make(chan []Log),
- hashes: hashes,
+ logs: logs,
+ hashes: make(chan common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
@@ -238,6 +289,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
return es.subscribe(sub)
}
+// SubscribePendingTxEvents creates a subscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingTransactionsSubscription,
+ created: time.Now(),
+ logs: make(chan []Log),
+ hashes: hashes,
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+
+ return es.subscribe(sub)
+}
+
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
@@ -251,7 +319,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@@ -260,7 +328,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@@ -268,7 +336,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
case core.PendingLogsEvent:
for _, f := range filters[PendingLogsSubscription] {
if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ if matchedLogs := filterLogs(convertLogs(e.Logs, false), nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@@ -351,8 +419,8 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
}
unfiltered = append(unfiltered, rl...)
}
- logs := filterLogs(unfiltered, addresses, topics)
- //fmt.Println("found", len(logs))
+
+ logs := filterLogs(unfiltered, nil, nil, addresses, topics)
return logs
}
return nil
@@ -364,6 +432,11 @@ func (es *EventSystem) eventLoop() {
index = make(filterIndex)
sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{})
)
+
+ for i := UnknownSubscription; i < LastIndexSubscription; i++ {
+ index[i] = make(map[rpc.ID]*subscription)
+ }
+
for {
select {
case ev, active := <-sub.Chan():
@@ -372,13 +445,22 @@ func (es *EventSystem) eventLoop() {
}
es.broadcast(index, ev)
case f := <-es.install:
- if _, found := index[f.typ]; !found {
- index[f.typ] = make(map[rpc.ID]*subscription)
+ if f.typ == MinedAndPendingLogsSubscription {
+ // the type are logs and pending logs subscriptions
+ index[LogsSubscription][f.id] = f
+ index[PendingLogsSubscription][f.id] = f
+ } else {
+ index[f.typ][f.id] = f
}
- index[f.typ][f.id] = f
close(f.installed)
case f := <-es.uninstall:
- delete(index[f.typ], f.id)
+ if f.typ == MinedAndPendingLogsSubscription {
+ // the type are logs and pending logs subscriptions
+ delete(index[LogsSubscription], f.id)
+ delete(index[PendingLogsSubscription], f.id)
+ } else {
+ delete(index[f.typ], f.id)
+ }
close(f.err)
}
}
@@ -386,6 +468,7 @@ func (es *EventSystem) eventLoop() {
// convertLogs is a helper utility that converts vm.Logs to []filter.Log.
func convertLogs(in vm.Logs, removed bool) []Log {
+
logs := make([]Log, len(in))
for i, l := range in {
logs[i] = Log{l, removed}