aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/filters/filter.go')
-rw-r--r--eth/filters/filter.go270
1 files changed, 95 insertions, 175 deletions
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index ea9ccf2f9..3a2226f6b 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -19,11 +19,9 @@ package filters
import (
"context"
"math/big"
- "sync"
"time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
@@ -37,140 +35,143 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
- BloomBitsSections() uint64
- BloomBitsConfig() BloomConfig
+
SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
- GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error)
-}
-type BloomConfig struct {
- SectionSize uint64
- MaxRequestLen int
- MaxRequestWait time.Duration
+ BloomStatus() (uint64, uint64)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
// Filter can be used to retrieve and filter logs.
type Filter struct {
- backend Backend
- bloomBitsConfig BloomConfig
+ backend Backend
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
- decompress func([]byte, int) ([]byte, error)
- matcher *bloombits.Matcher
+ matcher *bloombits.Matcher
}
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ size, _ := backend.BloomStatus()
+
return &Filter{
- backend: backend,
- begin: begin,
- end: end,
- addresses: addresses,
- topics: topics,
- bloomBitsConfig: backend.BloomBitsConfig(),
- db: backend.ChainDb(),
- matcher: bloombits.NewMatcher(backend.BloomBitsConfig().SectionSize, addresses, topics),
- decompress: bitutil.DecompressBytes,
+ backend: backend,
+ begin: begin,
+ end: end,
+ addresses: addresses,
+ topics: topics,
+ db: backend.ChainDb(),
+ matcher: bloombits.NewMatcher(size, addresses, topics),
}
}
-// FindOnce searches the blockchain for matching log entries, returning
-// all matching entries from the first block that contains matches,
-// updating the start point of the filter accordingly. If no results are
-// found, a nil slice is returned.
-func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) {
- head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
- if head == nil {
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+ // Figure out the limits of the filter range
+ header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ if header == nil {
return nil, nil
}
- headBlockNumber := head.Number.Uint64()
+ head := header.Number.Uint64()
- var beginBlockNo uint64 = uint64(f.begin)
if f.begin == -1 {
- beginBlockNo = headBlockNumber
+ f.begin = int64(head)
}
- var endBlockNo uint64 = uint64(f.end)
+ end := uint64(f.end)
if f.end == -1 {
- endBlockNo = headBlockNumber
+ end = head
}
-
- logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
- f.begin = int64(blockNumber + 1)
- return logs, err
-}
-
-// Run filters logs with the current parameters set
-func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) {
- for {
- newLogs, err := f.FindOnce(ctx)
- if len(newLogs) == 0 || err != nil {
+ // Gather all indexed logs, and finish with non indexed ones
+ var (
+ logs []*types.Log
+ err error
+ )
+ size, sections := f.backend.BloomStatus()
+ if indexed := sections * size; indexed > uint64(f.begin) {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
return logs, err
}
- logs = append(logs, newLogs...)
}
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
}
-// nextRequest returns the next request to retrieve for the bloombits matcher
-func (f *Filter) nextRequest() (bloombits uint, sections []uint64) {
- bloomIndex, ok := f.matcher.AllocSectionQueue()
- if !ok {
- return 0, nil
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits indexed available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(uint64(f.begin), end, matches)
+ if err != nil {
+ return nil, err
}
- if f.bloomBitsConfig.MaxRequestWait > 0 &&
- (f.bloomBitsConfig.MaxRequestLen <= 1 || // SectionCount is always greater than zero after a successful alloc
- f.matcher.SectionCount(bloomIndex) < f.bloomBitsConfig.MaxRequestLen) {
- time.Sleep(f.bloomBitsConfig.MaxRequestWait)
+ defer session.Close(time.Second)
+
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*types.Log
+
+ for {
+ select {
+ case number, ok := <-matches:
+ // Abort if all matches have been fulfilled
+ if !ok {
+ f.begin = int64(end) + 1
+ return logs, nil
+ }
+ // Retrieve the suggested block and pull any truly matching logs
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
+ }
}
- return bloomIndex, f.matcher.FetchSections(bloomIndex, f.bloomBitsConfig.MaxRequestLen)
}
-// serveMatcher serves the bloombits matcher by fetching the requested vectors
-// through the filter backend
-func (f *Filter) serveMatcher(ctx context.Context, stop chan struct{}, wg *sync.WaitGroup) chan error {
- errChn := make(chan error, 1)
- wg.Add(10)
- for i := 0; i < 10; i++ {
- go func(i int) {
- defer wg.Done()
-
- for {
- b, s := f.nextRequest()
- if s == nil {
- return
- }
- data, err := f.backend.GetBloomBits(ctx, uint64(b), s)
- if err != nil {
- select {
- case errChn <- err:
- case <-stop:
- }
- return
- }
- decomp := make([][]byte, len(data))
- for i, d := range data {
- var err error
- if decomp[i], err = f.decompress(d, int(f.bloomBitsConfig.SectionSize/8)); err != nil {
- select {
- case errChn <- err:
- case <-stop:
- }
- return
- }
- }
- f.matcher.Deliver(b, s, decomp)
+// indexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ var logs []*types.Log
+
+ for ; f.begin <= int64(end); f.begin++ {
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ if bloomFilter(header.Bloom, f.addresses, f.topics) {
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
}
- }(i)
+ logs = append(logs, found...)
+ }
}
-
- return errChn
+ return logs, nil
}
// checkMatches checks if the receipts belonging to the given header contain any log events that
@@ -192,83 +193,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [
return nil, nil
}
-func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) {
- haveBloomBitsBefore := f.backend.BloomBitsSections() * f.bloomBitsConfig.SectionSize
- if haveBloomBitsBefore > start {
- e := end
- if haveBloomBitsBefore <= e {
- e = haveBloomBitsBefore - 1
- }
-
- stop := make(chan struct{})
- var wg sync.WaitGroup
- matches := f.matcher.Start(start, e)
- errChn := f.serveMatcher(ctx, stop, &wg)
-
- defer func() {
- f.matcher.Stop()
- close(stop)
- wg.Wait()
- }()
-
- loop:
- for {
- select {
- case i, ok := <-matches:
- if !ok {
- break loop
- }
-
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
- if header == nil || err != nil {
- return logs, end, err
- }
-
- logs, err := f.checkMatches(ctx, header)
- if err != nil {
- return nil, end, err
- }
- if logs != nil {
- return logs, i, nil
- }
- case err := <-errChn:
- return logs, end, err
- case <-ctx.Done():
- return nil, end, ctx.Err()
- }
- }
-
- if end < haveBloomBitsBefore {
- return logs, end, nil
- }
- start = haveBloomBitsBefore
- }
-
- // search the rest with regular block-by-block bloom filtering
- for i := start; i <= end; i++ {
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
- if header == nil || err != nil {
- return logs, end, err
- }
-
- // Use bloom filtering to see if this block is interesting given the
- // current parameters
- if f.bloomFilter(header.Bloom) {
- logs, err := f.checkMatches(ctx, header)
- if err != nil {
- return nil, end, err
- }
- if logs != nil {
- return logs, i, nil
- }
- }
- }
-
- return logs, end, nil
-}
-
func includes(addresses []common.Address, a common.Address) bool {
for _, addr := range addresses {
if addr == a {
@@ -323,10 +247,6 @@ Logs:
return ret
}
-func (f *Filter) bloomFilter(bloom types.Bloom) bool {
- return bloomFilter(bloom, f.addresses, f.topics)
-}
-
func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
if len(addresses) > 0 {
var included bool