aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter.go
diff options
context:
space:
mode:
authorZsolt Felfoldi <zsfelfoldi@gmail.com>2017-08-19 03:52:20 +0800
committerPéter Szilágyi <peterke@gmail.com>2017-09-06 16:13:13 +0800
commit4ea4d2dc3473afd9d2eda6ef6b359accce1f0946 (patch)
treee651cfc2e3aa36083b333bf34dc3cccef2623f26 /eth/filters/filter.go
parent1e67378df879b1ce566f17dd95a3b126056254b5 (diff)
downloadgo-tangerine-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.tar
go-tangerine-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.tar.gz
go-tangerine-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.tar.bz2
go-tangerine-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.tar.lz
go-tangerine-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.tar.xz
go-tangerine-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.tar.zst
go-tangerine-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.zip
core, eth: add bloombit indexer, filter based on it
Diffstat (limited to 'eth/filters/filter.go')
-rw-r--r--eth/filters/filter.go232
1 files changed, 152 insertions, 80 deletions
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index f848bc6af..ea9ccf2f9 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,11 +18,14 @@ package filters
import (
"context"
- "math"
"math/big"
+ "sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -34,58 +37,51 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+ BloomBitsSections() uint64
+ BloomBitsConfig() BloomConfig
SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+ GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error)
+}
+
+type BloomConfig struct {
+ SectionSize uint64
+ MaxRequestLen int
+ MaxRequestWait time.Duration
}
// Filter can be used to retrieve and filter logs.
type Filter struct {
- backend Backend
- useMipMap bool
+ backend Backend
+ bloomBitsConfig BloomConfig
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
+
+ decompress func([]byte, int) ([]byte, error)
+ matcher *bloombits.Matcher
}
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
-// MipMaps allow past blocks to be searched much more efficiently, but are not available
-// to light clients.
-func New(backend Backend, useMipMap bool) *Filter {
+func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
return &Filter{
- backend: backend,
- useMipMap: useMipMap,
- db: backend.ChainDb(),
+ backend: backend,
+ begin: begin,
+ end: end,
+ addresses: addresses,
+ topics: topics,
+ bloomBitsConfig: backend.BloomBitsConfig(),
+ db: backend.ChainDb(),
+ matcher: bloombits.NewMatcher(backend.BloomBitsConfig().SectionSize, addresses, topics),
+ decompress: bitutil.DecompressBytes,
}
}
-// SetBeginBlock sets the earliest block for filtering.
-// -1 = latest block (i.e., the current block)
-// hash = particular hash from-to
-func (f *Filter) SetBeginBlock(begin int64) {
- f.begin = begin
-}
-
-// SetEndBlock sets the latest block for filtering.
-func (f *Filter) SetEndBlock(end int64) {
- f.end = end
-}
-
-// SetAddresses matches only logs that are generated from addresses that are included
-// in the given addresses.
-func (f *Filter) SetAddresses(addr []common.Address) {
- f.addresses = addr
-}
-
-// SetTopics matches only logs that have topics matching the given topics.
-func (f *Filter) SetTopics(topics [][]common.Hash) {
- f.topics = topics
-}
-
// FindOnce searches the blockchain for matching log entries, returning
// all matching entries from the first block that contains matches,
// updating the start point of the filter accordingly. If no results are
@@ -106,18 +102,9 @@ func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) {
endBlockNo = headBlockNumber
}
- // if no addresses are present we can't make use of fast search which
- // uses the mipmap bloom filters to check for fast inclusion and uses
- // higher range probability in order to ensure at least a false positive
- if !f.useMipMap || len(f.addresses) == 0 {
- logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
- f.begin = int64(blockNumber + 1)
- return logs, err
- }
-
- logs, blockNumber := f.mipFind(beginBlockNo, endBlockNo, 0)
+ logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
f.begin = int64(blockNumber + 1)
- return logs, nil
+ return logs, err
}
// Run filters logs with the current parameters set
@@ -131,43 +118,134 @@ func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) {
}
}
-func (f *Filter) mipFind(start, end uint64, depth int) (logs []*types.Log, blockNumber uint64) {
- level := core.MIPMapLevels[depth]
- // normalise numerator so we can work in level specific batches and
- // work with the proper range checks
- for num := start / level * level; num <= end; num += level {
- // find addresses in bloom filters
- bloom := core.GetMipmapBloom(f.db, num, level)
- // Don't bother checking the first time through the loop - we're probably picking
- // up where a previous run left off.
- first := true
- for _, addr := range f.addresses {
- if first || bloom.TestBytes(addr[:]) {
- first = false
- // range check normalised values and make sure that
- // we're resolving the correct range instead of the
- // normalised values.
- start := uint64(math.Max(float64(num), float64(start)))
- end := uint64(math.Min(float64(num+level-1), float64(end)))
- if depth+1 == len(core.MIPMapLevels) {
- l, blockNumber, _ := f.getLogs(context.Background(), start, end)
- if len(l) > 0 {
- return l, blockNumber
+// nextRequest returns the next request to retrieve for the bloombits matcher
+func (f *Filter) nextRequest() (bloombits uint, sections []uint64) {
+ bloomIndex, ok := f.matcher.AllocSectionQueue()
+ if !ok {
+ return 0, nil
+ }
+ if f.bloomBitsConfig.MaxRequestWait > 0 &&
+ (f.bloomBitsConfig.MaxRequestLen <= 1 || // SectionCount is always greater than zero after a successful alloc
+ f.matcher.SectionCount(bloomIndex) < f.bloomBitsConfig.MaxRequestLen) {
+ time.Sleep(f.bloomBitsConfig.MaxRequestWait)
+ }
+ return bloomIndex, f.matcher.FetchSections(bloomIndex, f.bloomBitsConfig.MaxRequestLen)
+}
+
+// serveMatcher serves the bloombits matcher by fetching the requested vectors
+// through the filter backend
+func (f *Filter) serveMatcher(ctx context.Context, stop chan struct{}, wg *sync.WaitGroup) chan error {
+ errChn := make(chan error, 1)
+ wg.Add(10)
+ for i := 0; i < 10; i++ {
+ go func(i int) {
+ defer wg.Done()
+
+ for {
+ b, s := f.nextRequest()
+ if s == nil {
+ return
+ }
+ data, err := f.backend.GetBloomBits(ctx, uint64(b), s)
+ if err != nil {
+ select {
+ case errChn <- err:
+ case <-stop:
}
- } else {
- l, blockNumber := f.mipFind(start, end, depth+1)
- if len(l) > 0 {
- return l, blockNumber
+ return
+ }
+ decomp := make([][]byte, len(data))
+ for i, d := range data {
+ var err error
+ if decomp[i], err = f.decompress(d, int(f.bloomBitsConfig.SectionSize/8)); err != nil {
+ select {
+ case errChn <- err:
+ case <-stop:
+ }
+ return
}
}
+ f.matcher.Deliver(b, s, decomp)
}
- }
+ }(i)
}
- return nil, end
+ return errChn
+}
+
+// checkMatches checks if the receipts belonging to the given header contain any log events that
+// match the filter criteria. This function is called when the bloom filter signals a potential match.
+func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ // Get the logs of the block
+ receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ var unfiltered []*types.Log
+ for _, receipt := range receipts {
+ unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ if len(logs) > 0 {
+ return logs, nil
+ }
+ return nil, nil
}
func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) {
+ haveBloomBitsBefore := f.backend.BloomBitsSections() * f.bloomBitsConfig.SectionSize
+ if haveBloomBitsBefore > start {
+ e := end
+ if haveBloomBitsBefore <= e {
+ e = haveBloomBitsBefore - 1
+ }
+
+ stop := make(chan struct{})
+ var wg sync.WaitGroup
+ matches := f.matcher.Start(start, e)
+ errChn := f.serveMatcher(ctx, stop, &wg)
+
+ defer func() {
+ f.matcher.Stop()
+ close(stop)
+ wg.Wait()
+ }()
+
+ loop:
+ for {
+ select {
+ case i, ok := <-matches:
+ if !ok {
+ break loop
+ }
+
+ blockNumber := rpc.BlockNumber(i)
+ header, err := f.backend.HeaderByNumber(ctx, blockNumber)
+ if header == nil || err != nil {
+ return logs, end, err
+ }
+
+ logs, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return nil, end, err
+ }
+ if logs != nil {
+ return logs, i, nil
+ }
+ case err := <-errChn:
+ return logs, end, err
+ case <-ctx.Done():
+ return nil, end, ctx.Err()
+ }
+ }
+
+ if end < haveBloomBitsBefore {
+ return logs, end, nil
+ }
+ start = haveBloomBitsBefore
+ }
+
+ // search the rest with regular block-by-block bloom filtering
for i := start; i <= end; i++ {
blockNumber := rpc.BlockNumber(i)
header, err := f.backend.HeaderByNumber(ctx, blockNumber)
@@ -178,18 +256,12 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.
// Use bloom filtering to see if this block is interesting given the
// current parameters
if f.bloomFilter(header.Bloom) {
- // Get the logs of the block
- receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ logs, err := f.checkMatches(ctx, header)
if err != nil {
return nil, end, err
}
- var unfiltered []*types.Log
- for _, receipt := range receipts {
- unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
- }
- logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
- if len(logs) > 0 {
- return logs, uint64(blockNumber), nil
+ if logs != nil {
+ return logs, i, nil
}
}
}