aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/filters/filter.go')
-rw-r--r--eth/filters/filter.go275
1 files changed, 140 insertions, 135 deletions
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 0a0b81224..e208f8f38 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,12 +18,11 @@ package filters
import (
"context"
- "math"
"math/big"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -35,165 +34,185 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+
+ SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+ SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+ SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
+ SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+
+ BloomStatus() (uint64, uint64)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
// Filter can be used to retrieve and filter logs.
type Filter struct {
- backend Backend
- useMipMap bool
-
- created time.Time
+ backend Backend
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
+
+ matcher *bloombits.Matcher
}
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
-// MipMaps allow past blocks to be searched much more efficiently, but are not available
-// to light clients.
-func New(backend Backend, useMipMap bool) *Filter {
+func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Flatten the address and topic filter clauses into a single bloombits filter
+ // system. Since the bloombits are not positional, nil topics are permitted,
+ // which get flattened into a nil byte slice.
+ var filters [][][]byte
+ if len(addresses) > 0 {
+ filter := make([][]byte, len(addresses))
+ for i, address := range addresses {
+ filter[i] = address.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ for _, topicList := range topics {
+ filter := make([][]byte, len(topicList))
+ for i, topic := range topicList {
+ filter[i] = topic.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ // Assemble and return the filter
+ size, _ := backend.BloomStatus()
+
return &Filter{
backend: backend,
- useMipMap: useMipMap,
+ begin: begin,
+ end: end,
+ addresses: addresses,
+ topics: topics,
db: backend.ChainDb(),
+ matcher: bloombits.NewMatcher(size, filters),
}
}
-// SetBeginBlock sets the earliest block for filtering.
-// -1 = latest block (i.e., the current block)
-// hash = particular hash from-to
-func (f *Filter) SetBeginBlock(begin int64) {
- f.begin = begin
-}
-
-// SetEndBlock sets the latest block for filtering.
-func (f *Filter) SetEndBlock(end int64) {
- f.end = end
-}
-
-// SetAddresses matches only logs that are generated from addresses that are included
-// in the given addresses.
-func (f *Filter) SetAddresses(addr []common.Address) {
- f.addresses = addr
-}
-
-// SetTopics matches only logs that have topics matching the given topics.
-func (f *Filter) SetTopics(topics [][]common.Hash) {
- f.topics = topics
-}
-
-// FindOnce searches the blockchain for matching log entries, returning
-// all matching entries from the first block that contains matches,
-// updating the start point of the filter accordingly. If no results are
-// found, a nil slice is returned.
-func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) {
- head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
- if head == nil {
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+ // Figure out the limits of the filter range
+ header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ if header == nil {
return nil, nil
}
- headBlockNumber := head.Number.Uint64()
+ head := header.Number.Uint64()
- var beginBlockNo uint64 = uint64(f.begin)
if f.begin == -1 {
- beginBlockNo = headBlockNumber
+ f.begin = int64(head)
}
- var endBlockNo uint64 = uint64(f.end)
+ end := uint64(f.end)
if f.end == -1 {
- endBlockNo = headBlockNumber
+ end = head
}
-
- // if no addresses are present we can't make use of fast search which
- // uses the mipmap bloom filters to check for fast inclusion and uses
- // higher range probability in order to ensure at least a false positive
- if !f.useMipMap || len(f.addresses) == 0 {
- logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
- f.begin = int64(blockNumber + 1)
- return logs, err
- }
-
- logs, blockNumber := f.mipFind(beginBlockNo, endBlockNo, 0)
- f.begin = int64(blockNumber + 1)
- return logs, nil
-}
-
-// Run filters logs with the current parameters set
-func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) {
- for {
- newLogs, err := f.FindOnce(ctx)
- if len(newLogs) == 0 || err != nil {
+ // Gather all indexed logs, and finish with non indexed ones
+ var (
+ logs []*types.Log
+ err error
+ )
+ size, sections := f.backend.BloomStatus()
+ if indexed := sections * size; indexed > uint64(f.begin) {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
return logs, err
}
- logs = append(logs, newLogs...)
}
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
}
-func (f *Filter) mipFind(start, end uint64, depth int) (logs []*types.Log, blockNumber uint64) {
- level := core.MIPMapLevels[depth]
- // normalise numerator so we can work in level specific batches and
- // work with the proper range checks
- for num := start / level * level; num <= end; num += level {
- // find addresses in bloom filters
- bloom := core.GetMipmapBloom(f.db, num, level)
- // Don't bother checking the first time through the loop - we're probably picking
- // up where a previous run left off.
- first := true
- for _, addr := range f.addresses {
- if first || bloom.TestBytes(addr[:]) {
- first = false
- // range check normalised values and make sure that
- // we're resolving the correct range instead of the
- // normalised values.
- start := uint64(math.Max(float64(num), float64(start)))
- end := uint64(math.Min(float64(num+level-1), float64(end)))
- if depth+1 == len(core.MIPMapLevels) {
- l, blockNumber, _ := f.getLogs(context.Background(), start, end)
- if len(l) > 0 {
- return l, blockNumber
- }
- } else {
- l, blockNumber := f.mipFind(start, end, depth+1)
- if len(l) > 0 {
- return l, blockNumber
- }
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits indexed available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
+ if err != nil {
+ return nil, err
+ }
+ defer session.Close()
+
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*types.Log
+
+ for {
+ select {
+ case number, ok := <-matches:
+ // Abort if all matches have been fulfilled
+ if !ok {
+ err := session.Error()
+ if err == nil {
+ f.begin = int64(end) + 1
}
+ return logs, err
+ }
+ f.begin = int64(number) + 1
+ // Retrieve the suggested block and pull any truly matching logs
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
+ if header == nil || err != nil {
+ return logs, err
}
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
}
}
-
- return nil, end
}
-func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) {
- for i := start; i <= end; i++ {
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
+// indexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ var logs []*types.Log
+
+ for ; f.begin <= int64(end); f.begin++ {
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
- return logs, end, err
+ return logs, err
}
-
- // Use bloom filtering to see if this block is interesting given the
- // current parameters
- if f.bloomFilter(header.Bloom) {
- // Get the logs of the block
- receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if bloomFilter(header.Bloom, f.addresses, f.topics) {
+ found, err := f.checkMatches(ctx, header)
if err != nil {
- return nil, end, err
- }
- var unfiltered []*types.Log
- for _, receipt := range receipts {
- unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
- }
- logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
- if len(logs) > 0 {
- return logs, uint64(blockNumber), nil
+ return logs, err
}
+ logs = append(logs, found...)
}
}
+ return logs, nil
+}
- return logs, end, nil
+// checkMatches checks if the receipts belonging to the given header contain any log events that
+// match the filter criteria. This function is called when the bloom filter signals a potential match.
+func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ // Get the logs of the block
+ receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ var unfiltered []*types.Log
+ for _, receipt := range receipts {
+ unfiltered = append(unfiltered, receipt.Logs...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ if len(logs) > 0 {
+ return logs, nil
+ }
+ return nil, nil
}
func includes(addresses []common.Address, a common.Address) bool {
@@ -221,39 +240,27 @@ Logs:
if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
-
- logTopics := make([]common.Hash, len(topics))
- copy(logTopics, log.Topics)
-
// If the to filtered topics is greater than the amount of topics in logs, skip.
if len(topics) > len(log.Topics) {
continue Logs
}
-
for i, topics := range topics {
- var match bool
+ match := len(topics) == 0 // empty rule set == wildcard
for _, topic := range topics {
- // common.Hash{} is a match all (wildcard)
- if (topic == common.Hash{}) || log.Topics[i] == topic {
+ if log.Topics[i] == topic {
match = true
break
}
}
-
if !match {
continue Logs
}
}
ret = append(ret, log)
}
-
return ret
}
-func (f *Filter) bloomFilter(bloom types.Bloom) bool {
- return bloomFilter(bloom, f.addresses, f.topics)
-}
-
func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
if len(addresses) > 0 {
var included bool
@@ -263,16 +270,15 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
break
}
}
-
if !included {
return false
}
}
for _, sub := range topics {
- var included bool
+ included := len(sub) == 0 // empty rule set == wildcard
for _, topic := range sub {
- if (topic == common.Hash{}) || types.BloomLookup(bloom, topic) {
+ if types.BloomLookup(bloom, topic) {
included = true
break
}
@@ -281,6 +287,5 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
return false
}
}
-
return true
}