aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-08-29 19:13:11 +0800
committerPéter Szilágyi <peterke@gmail.com>2017-09-06 16:14:19 +0800
commitf585f9eee8cb18423c23fe8b517b5b4cbe3b3755 (patch)
tree08c232ee58318c20f971cf8e3f5dfa09f1e2caf7 /eth/filters
parent4ea4d2dc3473afd9d2eda6ef6b359accce1f0946 (diff)
downloaddexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.gz
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.bz2
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.lz
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.xz
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.zst
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.zip
core, eth: clean up bloom filtering, add some tests
Diffstat (limited to 'eth/filters')
-rw-r--r--eth/filters/api.go49
-rw-r--r--eth/filters/bench_test.go82
-rw-r--r--eth/filters/filter.go270
-rw-r--r--eth/filters/filter_system_test.go60
-rw-r--r--eth/filters/filter_test.go18
5 files changed, 184 insertions, 295 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 11767753e..6e1d48adb 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -51,27 +51,24 @@ type filter struct {
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
- backend Backend
- bloomBitsSection uint64
- mux *event.TypeMux
- quit chan struct{}
- chainDb ethdb.Database
- events *EventSystem
- filtersMu sync.Mutex
- filters map[rpc.ID]*filter
+ backend Backend
+ mux *event.TypeMux
+ quit chan struct{}
+ chainDb ethdb.Database
+ events *EventSystem
+ filtersMu sync.Mutex
+ filters map[rpc.ID]*filter
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
-func NewPublicFilterAPI(backend Backend, lightMode bool, bloomBitsSection uint64) *PublicFilterAPI {
+func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
- backend: backend,
- bloomBitsSection: bloomBitsSection,
- mux: backend.EventMux(),
- chainDb: backend.ChainDb(),
- events: NewEventSystem(backend.EventMux(), backend, lightMode),
- filters: make(map[rpc.ID]*filter),
+ backend: backend,
+ mux: backend.EventMux(),
+ chainDb: backend.ChainDb(),
+ events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ filters: make(map[rpc.ID]*filter),
}
-
go api.timeoutLoop()
return api
@@ -326,16 +323,20 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
+ // Convert the RPC block numbers into internal representations
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
-
+ // Create and run the filter to get all the logs
filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
return returnLogs(logs), err
}
@@ -369,20 +370,18 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
return nil, fmt.Errorf("filter not found")
}
- var begin, end int64
+ begin := rpc.LatestBlockNumber.Int64()
if f.crit.FromBlock != nil {
begin = f.crit.FromBlock.Int64()
- } else {
- begin = rpc.LatestBlockNumber.Int64()
}
+ end := rpc.LatestBlockNumber.Int64()
if f.crit.ToBlock != nil {
end = f.crit.ToBlock.Int64()
- } else {
- end = rpc.LatestBlockNumber.Int64()
}
+ // Create and run the filter to get all the logs
filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
}
@@ -390,7 +389,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
}
// GetFilterChanges returns the logs for the filter with the given id since
-// last time is was called. This can be used for polling.
+// last time it was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go
index 2487bc0eb..d994378fc 100644
--- a/eth/filters/bench_test.go
+++ b/eth/filters/bench_test.go
@@ -31,82 +31,41 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/node"
- "github.com/golang/snappy"
)
func BenchmarkBloomBits512(b *testing.B) {
- benchmarkBloomBitsForSize(b, 512)
+ benchmarkBloomBits(b, 512)
}
func BenchmarkBloomBits1k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 1024)
+ benchmarkBloomBits(b, 1024)
}
func BenchmarkBloomBits2k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 2048)
+ benchmarkBloomBits(b, 2048)
}
func BenchmarkBloomBits4k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 4096)
+ benchmarkBloomBits(b, 4096)
}
func BenchmarkBloomBits8k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 8192)
+ benchmarkBloomBits(b, 8192)
}
func BenchmarkBloomBits16k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 16384)
+ benchmarkBloomBits(b, 16384)
}
func BenchmarkBloomBits32k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 32768)
-}
-
-func benchmarkBloomBitsForSize(b *testing.B, sectionSize uint64) {
- benchmarkBloomBits(b, sectionSize, 0)
- benchmarkBloomBits(b, sectionSize, 1)
- benchmarkBloomBits(b, sectionSize, 2)
+ benchmarkBloomBits(b, 32768)
}
const benchFilterCnt = 2000
-func benchmarkBloomBits(b *testing.B, sectionSize uint64, comp int) {
+func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
- fmt.Println("Running bloombits benchmark section size:", sectionSize, " compression method:", comp)
-
- var (
- compressFn func([]byte) []byte
- decompressFn func([]byte, int) ([]byte, error)
- )
- switch comp {
- case 0:
- // no compression
- compressFn = func(data []byte) []byte {
- return data
- }
- decompressFn = func(data []byte, target int) ([]byte, error) {
- if len(data) != target {
- panic(nil)
- }
- return data, nil
- }
- case 1:
- // bitutil/compress.go
- compressFn = bitutil.CompressBytes
- decompressFn = bitutil.DecompressBytes
- case 2:
- // go snappy
- compressFn = func(data []byte) []byte {
- return snappy.Encode(nil, data)
- }
- decompressFn = func(data []byte, target int) ([]byte, error) {
- decomp, err := snappy.Decode(nil, data)
- if err != nil || len(decomp) != target {
- panic(err)
- }
- return decomp, nil
- }
- }
+ fmt.Println("Running bloombits benchmark section size:", sectionSize)
db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
if err != nil {
@@ -128,7 +87,10 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64, comp int) {
cnt := (headNum - 512) / sectionSize
var dataSize, compSize uint64
for sectionIdx := uint64(0); sectionIdx < cnt; sectionIdx++ {
- bc := bloombits.NewBloomBitsCreator(sectionSize)
+ bc, err := bloombits.NewGenerator(uint(sectionSize))
+ if err != nil {
+ b.Fatalf("failed to create generator: %v", err)
+ }
var header *types.Header
for i := sectionIdx * sectionSize; i < (sectionIdx+1)*sectionSize; i++ {
hash := core.GetCanonicalHash(db, i)
@@ -136,15 +98,18 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64, comp int) {
if header == nil {
b.Fatalf("Error creating bloomBits data")
}
- bc.AddHeaderBloom(header.Bloom)
+ bc.AddBloom(header.Bloom)
}
sectionHead := core.GetCanonicalHash(db, (sectionIdx+1)*sectionSize-1)
- for i := 0; i < bloombits.BloomLength; i++ {
- data := bc.GetBitVector(uint(i))
- comp := compressFn(data)
+ for i := 0; i < types.BloomBitLength; i++ {
+ data, err := bc.Bitset(uint(i))
+ if err != nil {
+ b.Fatalf("failed to retrieve bitset: %v", err)
+ }
+ comp := bitutil.CompressBytes(data)
dataSize += uint64(len(data))
compSize += uint64(len(comp))
- core.StoreBloomBits(db, uint64(i), sectionIdx, sectionHead, comp)
+ core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp)
}
//if sectionIdx%50 == 0 {
// fmt.Println(" section", sectionIdx, "/", cnt)
@@ -171,8 +136,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64, comp int) {
addr[0] = byte(i)
addr[1] = byte(i / 256)
filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
- filter.decompress = decompressFn
- if _, err := filter.Find(context.Background()); err != nil {
+ if _, err := filter.Logs(context.Background()); err != nil {
b.Error("filter.Find error:", err)
}
}
@@ -229,7 +193,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
mux := new(event.TypeMux)
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
filter := New(backend, 0, int64(headNum), []common.Address{common.Address{}}, nil)
- filter.Find(context.Background())
+ filter.Logs(context.Background())
d := time.Since(start)
fmt.Println("Finished running filter benchmarks")
fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index ea9ccf2f9..3a2226f6b 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -19,11 +19,9 @@ package filters
import (
"context"
"math/big"
- "sync"
"time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
@@ -37,140 +35,143 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
- BloomBitsSections() uint64
- BloomBitsConfig() BloomConfig
+
SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
- GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error)
-}
-type BloomConfig struct {
- SectionSize uint64
- MaxRequestLen int
- MaxRequestWait time.Duration
+ BloomStatus() (uint64, uint64)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
// Filter can be used to retrieve and filter logs.
type Filter struct {
- backend Backend
- bloomBitsConfig BloomConfig
+ backend Backend
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
- decompress func([]byte, int) ([]byte, error)
- matcher *bloombits.Matcher
+ matcher *bloombits.Matcher
}
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ size, _ := backend.BloomStatus()
+
return &Filter{
- backend: backend,
- begin: begin,
- end: end,
- addresses: addresses,
- topics: topics,
- bloomBitsConfig: backend.BloomBitsConfig(),
- db: backend.ChainDb(),
- matcher: bloombits.NewMatcher(backend.BloomBitsConfig().SectionSize, addresses, topics),
- decompress: bitutil.DecompressBytes,
+ backend: backend,
+ begin: begin,
+ end: end,
+ addresses: addresses,
+ topics: topics,
+ db: backend.ChainDb(),
+ matcher: bloombits.NewMatcher(size, addresses, topics),
}
}
-// FindOnce searches the blockchain for matching log entries, returning
-// all matching entries from the first block that contains matches,
-// updating the start point of the filter accordingly. If no results are
-// found, a nil slice is returned.
-func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) {
- head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
- if head == nil {
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+ // Figure out the limits of the filter range
+ header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ if header == nil {
return nil, nil
}
- headBlockNumber := head.Number.Uint64()
+ head := header.Number.Uint64()
- var beginBlockNo uint64 = uint64(f.begin)
if f.begin == -1 {
- beginBlockNo = headBlockNumber
+ f.begin = int64(head)
}
- var endBlockNo uint64 = uint64(f.end)
+ end := uint64(f.end)
if f.end == -1 {
- endBlockNo = headBlockNumber
+ end = head
}
-
- logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
- f.begin = int64(blockNumber + 1)
- return logs, err
-}
-
-// Run filters logs with the current parameters set
-func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) {
- for {
- newLogs, err := f.FindOnce(ctx)
- if len(newLogs) == 0 || err != nil {
+ // Gather all indexed logs, and finish with non indexed ones
+ var (
+ logs []*types.Log
+ err error
+ )
+ size, sections := f.backend.BloomStatus()
+ if indexed := sections * size; indexed > uint64(f.begin) {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
return logs, err
}
- logs = append(logs, newLogs...)
}
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
}
-// nextRequest returns the next request to retrieve for the bloombits matcher
-func (f *Filter) nextRequest() (bloombits uint, sections []uint64) {
- bloomIndex, ok := f.matcher.AllocSectionQueue()
- if !ok {
- return 0, nil
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits indexed available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(uint64(f.begin), end, matches)
+ if err != nil {
+ return nil, err
}
- if f.bloomBitsConfig.MaxRequestWait > 0 &&
- (f.bloomBitsConfig.MaxRequestLen <= 1 || // SectionCount is always greater than zero after a successful alloc
- f.matcher.SectionCount(bloomIndex) < f.bloomBitsConfig.MaxRequestLen) {
- time.Sleep(f.bloomBitsConfig.MaxRequestWait)
+ defer session.Close(time.Second)
+
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*types.Log
+
+ for {
+ select {
+ case number, ok := <-matches:
+ // Abort if all matches have been fulfilled
+ if !ok {
+ f.begin = int64(end) + 1
+ return logs, nil
+ }
+ // Retrieve the suggested block and pull any truly matching logs
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
+ }
}
- return bloomIndex, f.matcher.FetchSections(bloomIndex, f.bloomBitsConfig.MaxRequestLen)
}
-// serveMatcher serves the bloombits matcher by fetching the requested vectors
-// through the filter backend
-func (f *Filter) serveMatcher(ctx context.Context, stop chan struct{}, wg *sync.WaitGroup) chan error {
- errChn := make(chan error, 1)
- wg.Add(10)
- for i := 0; i < 10; i++ {
- go func(i int) {
- defer wg.Done()
-
- for {
- b, s := f.nextRequest()
- if s == nil {
- return
- }
- data, err := f.backend.GetBloomBits(ctx, uint64(b), s)
- if err != nil {
- select {
- case errChn <- err:
- case <-stop:
- }
- return
- }
- decomp := make([][]byte, len(data))
- for i, d := range data {
- var err error
- if decomp[i], err = f.decompress(d, int(f.bloomBitsConfig.SectionSize/8)); err != nil {
- select {
- case errChn <- err:
- case <-stop:
- }
- return
- }
- }
- f.matcher.Deliver(b, s, decomp)
+// indexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ var logs []*types.Log
+
+ for ; f.begin <= int64(end); f.begin++ {
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ if bloomFilter(header.Bloom, f.addresses, f.topics) {
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
}
- }(i)
+ logs = append(logs, found...)
+ }
}
-
- return errChn
+ return logs, nil
}
// checkMatches checks if the receipts belonging to the given header contain any log events that
@@ -192,83 +193,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [
return nil, nil
}
-func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) {
- haveBloomBitsBefore := f.backend.BloomBitsSections() * f.bloomBitsConfig.SectionSize
- if haveBloomBitsBefore > start {
- e := end
- if haveBloomBitsBefore <= e {
- e = haveBloomBitsBefore - 1
- }
-
- stop := make(chan struct{})
- var wg sync.WaitGroup
- matches := f.matcher.Start(start, e)
- errChn := f.serveMatcher(ctx, stop, &wg)
-
- defer func() {
- f.matcher.Stop()
- close(stop)
- wg.Wait()
- }()
-
- loop:
- for {
- select {
- case i, ok := <-matches:
- if !ok {
- break loop
- }
-
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
- if header == nil || err != nil {
- return logs, end, err
- }
-
- logs, err := f.checkMatches(ctx, header)
- if err != nil {
- return nil, end, err
- }
- if logs != nil {
- return logs, i, nil
- }
- case err := <-errChn:
- return logs, end, err
- case <-ctx.Done():
- return nil, end, ctx.Err()
- }
- }
-
- if end < haveBloomBitsBefore {
- return logs, end, nil
- }
- start = haveBloomBitsBefore
- }
-
- // search the rest with regular block-by-block bloom filtering
- for i := start; i <= end; i++ {
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
- if header == nil || err != nil {
- return logs, end, err
- }
-
- // Use bloom filtering to see if this block is interesting given the
- // current parameters
- if f.bloomFilter(header.Bloom) {
- logs, err := f.checkMatches(ctx, header)
- if err != nil {
- return nil, end, err
- }
- if logs != nil {
- return logs, i, nil
- }
- }
- }
-
- return logs, end, nil
-}
-
func includes(addresses []common.Address, a common.Address) bool {
for _, addr := range addresses {
if addr == a {
@@ -323,10 +247,6 @@ Logs:
return ret
}
-func (f *Filter) bloomFilter(bloom types.Bloom) bool {
- return bloomFilter(bloom, f.addresses, f.topics)
-}
-
func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
if len(addresses) > 0 {
var included bool
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 140fad555..664ce07a5 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -20,12 +20,14 @@ import (
"context"
"fmt"
"math/big"
+ "math/rand"
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -85,29 +87,35 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
-func (b *testBackend) GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error) {
- results := make([][]byte, len(sectionIdxList))
- var err error
- for i, sectionIdx := range sectionIdxList {
- sectionHead := core.GetCanonicalHash(b.db, (sectionIdx+1)*testBloomBitsSection-1)
- results[i], err = core.GetBloomBits(b.db, bitIdx, sectionIdx, sectionHead)
- if err != nil {
- return nil, err
- }
- }
- return results, nil
+func (b *testBackend) BloomStatus() (uint64, uint64) {
+ return params.BloomBitsBlocks, b.sections
}
-func (b *testBackend) BloomBitsSections() uint64 {
- return b.sections
-}
+func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ requests := make(chan chan *bloombits.Retrieval)
-func (b *testBackend) BloomBitsConfig() BloomConfig {
- return BloomConfig{
- SectionSize: testBloomBitsSection,
- MaxRequestLen: 16,
- MaxRequestWait: 0,
- }
+ go session.Multiplex(16, 0, requests)
+ go func() {
+ for {
+ // Wait for a service request or a shutdown
+ select {
+ case <-ctx.Done():
+ return
+
+ case request := <-requests:
+ task := <-request
+
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ if rand.Int()%4 != 0 { // Handle occasional missing deliveries
+ head := core.GetCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
+ task.Bitsets[i] = core.GetBloomBits(b.db, task.Bit, section, head)
+ }
+ }
+ request <- task
+ }
+ }
+ }()
}
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
@@ -126,7 +134,7 @@ func TestBlockSubscription(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
@@ -183,7 +191,7 @@ func TestPendingTxFilter(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -246,7 +254,7 @@ func TestLogFilterCreation(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@@ -295,7 +303,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@@ -325,7 +333,7 @@ func TestLogFilter(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -442,7 +450,7 @@ func TestPendingLogsSubscription(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index f1c6481d7..11235e95a 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -32,8 +32,6 @@ import (
"github.com/ethereum/go-ethereum/params"
)
-const testBloomBitsSection = 4096
-
func makeReceipt(addr common.Address) *types.Receipt {
receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
@@ -101,7 +99,7 @@ func BenchmarkFilters(b *testing.B) {
filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
for i := 0; i < b.N; i++ {
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
b.Fatal("expected 4 logs, got", len(logs))
}
@@ -189,13 +187,13 @@ func TestFilters(t *testing.T) {
filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
t.Error("expected 4 log, got", len(logs))
}
filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -204,7 +202,7 @@ func TestFilters(t *testing.T) {
}
filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -214,7 +212,7 @@ func TestFilters(t *testing.T) {
filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 2 {
t.Error("expected 2 log, got", len(logs))
}
@@ -222,7 +220,7 @@ func TestFilters(t *testing.T) {
failHash := common.BytesToHash([]byte("fail"))
filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
@@ -230,14 +228,14 @@ func TestFilters(t *testing.T) {
failAddr := common.BytesToAddress([]byte("failmenow"))
filter = New(backend, 0, -1, []common.Address{failAddr}, nil)
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}