aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-09-06 18:00:35 +0800
committerGitHub <noreply@github.com>2017-09-06 18:00:35 +0800
commitc4d21bc8e548d909d5b2940cb00d3d068f5697ec (patch)
tree1747315b32a6c9d9158fb9f36cfbd2277672ddfb /eth
parent160add8570ea1653b46cc1f7c21cb6eb69220f7d (diff)
parent564c8f3ae6f80d039ef27479e5ad15145f488710 (diff)
downloadgo-tangerine-c4d21bc8e548d909d5b2940cb00d3d068f5697ec.tar
go-tangerine-c4d21bc8e548d909d5b2940cb00d3d068f5697ec.tar.gz
go-tangerine-c4d21bc8e548d909d5b2940cb00d3d068f5697ec.tar.bz2
go-tangerine-c4d21bc8e548d909d5b2940cb00d3d068f5697ec.tar.lz
go-tangerine-c4d21bc8e548d909d5b2940cb00d3d068f5697ec.tar.xz
go-tangerine-c4d21bc8e548d909d5b2940cb00d3d068f5697ec.tar.zst
go-tangerine-c4d21bc8e548d909d5b2940cb00d3d068f5697ec.zip
Merge pull request #14631 from zsfelfoldi/bloombits2
core/bloombits, eth/filter: transformed bloom bitmap based log search
Diffstat (limited to 'eth')
-rw-r--r--eth/api_backend.go12
-rw-r--r--eth/backend.go19
-rw-r--r--eth/backend_test.go74
-rw-r--r--eth/bloombits.go142
-rw-r--r--eth/db_upgrade.go44
-rw-r--r--eth/filters/api.go47
-rw-r--r--eth/filters/bench_test.go201
-rw-r--r--eth/filters/filter.go247
-rw-r--r--eth/filters/filter_system_test.go46
-rw-r--r--eth/filters/filter_test.go84
10 files changed, 579 insertions, 337 deletions
diff --git a/eth/api_backend.go b/eth/api_backend.go
index 19ef79f23..91f392f94 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@@ -194,3 +195,14 @@ func (b *EthApiBackend) EventMux() *event.TypeMux {
func (b *EthApiBackend) AccountManager() *accounts.Manager {
return b.eth.AccountManager()
}
+
+func (b *EthApiBackend) BloomStatus() (uint64, uint64) {
+ sections, _, _ := b.eth.bloomIndexer.Sections()
+ return params.BloomBitsBlocks, sections
+}
+
+func (b *EthApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ for i := 0; i < bloomFilterThreads; i++ {
+ go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
+ }
+}
diff --git a/eth/backend.go b/eth/backend.go
index 5837c8564..99a1928fe 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
@@ -77,6 +78,9 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager
+ bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
+ bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
+
ApiBackend *EthApiBackend
miner *miner.Miner
@@ -125,11 +129,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
+ bloomRequests: make(chan chan *bloombits.Retrieval),
+ bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
}
- if err := addMipmapBloomBins(chainDb); err != nil {
- return nil, err
- }
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck {
@@ -151,6 +154,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
+ eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent)
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
@@ -358,14 +362,17 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManage
func (s *Ethereum) Protocols() []p2p.Protocol {
if s.lesServer == nil {
return s.protocolManager.SubProtocols
- } else {
- return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
+ return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
+ // Start the bloom bits servicing goroutines
+ s.startBloomHandlers()
+
+ // Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
// Figure out a max peers count based on the server limits
@@ -376,6 +383,7 @@ func (s *Ethereum) Start(srvr *p2p.Server) error {
maxPeers = srvr.MaxPeers / 2
}
}
+ // Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
@@ -389,6 +397,7 @@ func (s *Ethereum) Stop() error {
if s.stopDbUpgrade != nil {
s.stopDbUpgrade()
}
+ s.bloomIndexer.Close()
s.blockchain.Stop()
s.protocolManager.Stop()
if s.lesServer != nil {
diff --git a/eth/backend_test.go b/eth/backend_test.go
deleted file mode 100644
index 1fd25e95a..000000000
--- a/eth/backend_test.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package eth
-
-import (
- "math/big"
- "testing"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/params"
-)
-
-func TestMipmapUpgrade(t *testing.T) {
- db, _ := ethdb.NewMemDatabase()
- addr := common.BytesToAddress([]byte("jeff"))
- genesis := new(core.Genesis).MustCommit(db)
-
- chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {
- switch i {
- case 1:
- receipt := types.NewReceipt(nil, false, new(big.Int))
- receipt.Logs = []*types.Log{{Address: addr}}
- gen.AddUncheckedReceipt(receipt)
- case 2:
- receipt := types.NewReceipt(nil, false, new(big.Int))
- receipt.Logs = []*types.Log{{Address: addr}}
- gen.AddUncheckedReceipt(receipt)
- }
- })
- for i, block := range chain {
- core.WriteBlock(db, block)
- if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
- t.Fatalf("failed to insert block number: %v", err)
- }
- if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
- t.Fatalf("failed to insert block number: %v", err)
- }
- if err := core.WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil {
- t.Fatal("error writing block receipts:", err)
- }
- }
-
- err := addMipmapBloomBins(db)
- if err != nil {
- t.Fatal(err)
- }
-
- bloom := core.GetMipmapBloom(db, 1, core.MIPMapLevels[0])
- if (bloom == types.Bloom{}) {
- t.Error("got empty bloom filter")
- }
-
- data, _ := db.Get([]byte("setting-mipmap-version"))
- if len(data) == 0 {
- t.Error("setting-mipmap-version not written to database")
- }
-}
diff --git a/eth/bloombits.go b/eth/bloombits.go
new file mode 100644
index 000000000..32f6c7b31
--- /dev/null
+++ b/eth/bloombits.go
@@ -0,0 +1,142 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+const (
+ // bloomServiceThreads is the number of goroutines used globally by an Ethereum
+ // instance to service bloombits lookups for all running filters.
+ bloomServiceThreads = 16
+
+ // bloomFilterThreads is the number of goroutines used locally per filter to
+ // multiplex requests onto the global servicing goroutines.
+ bloomFilterThreads = 3
+
+ // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
+ // in a single batch.
+ bloomRetrievalBatch = 16
+
+ // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
+ // to accumulate request an entire batch (avoiding hysteresis).
+ bloomRetrievalWait = time.Duration(0)
+)
+
+// startBloomHandlers starts a batch of goroutines to accept bloom bit database
+// retrievals from possibly a range of filters and serving the data to satisfy.
+func (eth *Ethereum) startBloomHandlers() {
+ for i := 0; i < bloomServiceThreads; i++ {
+ go func() {
+ for {
+ select {
+ case <-eth.shutdownChan:
+ return
+
+ case request := <-eth.bloomRequests:
+ task := <-request
+
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ head := core.GetCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1)
+ blob, err := bitutil.DecompressBytes(core.GetBloomBits(eth.chainDb, task.Bit, section, head), int(params.BloomBitsBlocks)/8)
+ if err != nil {
+ panic(err)
+ }
+ task.Bitsets[i] = blob
+ }
+ request <- task
+ }
+ }
+ }()
+ }
+}
+
+const (
+ // bloomConfirms is the number of confirmation blocks before a bloom section is
+ // considered probably final and its rotated bits are calculated.
+ bloomConfirms = 256
+
+ // bloomThrottling is the time to wait between processing two consecutive index
+ // sections. It's useful during chain upgrades to prevent disk overload.
+ bloomThrottling = 100 * time.Millisecond
+)
+
+// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
+// for the Ethereum header bloom filters, permitting blazing fast filtering.
+type BloomIndexer struct {
+ size uint64 // section size to generate bloombits for
+
+ db ethdb.Database // database instance to write index data and metadata into
+ gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
+
+ section uint64 // Section is the section number being processed currently
+ head common.Hash // Head is the hash of the last header processed
+}
+
+// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
+// canonical chain for fast logs filtering.
+func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
+ backend := &BloomIndexer{
+ db: db,
+ size: size,
+ }
+ table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix))
+
+ return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits")
+}
+
+// Reset implements core.ChainIndexerBackend, starting a new bloombits index
+// section.
+func (b *BloomIndexer) Reset(section uint64) {
+ gen, err := bloombits.NewGenerator(uint(b.size))
+ if err != nil {
+ panic(err)
+ }
+ b.gen, b.section, b.head = gen, section, common.Hash{}
+}
+
+// Process implements core.ChainIndexerBackend, adding a new header's bloom into
+// the index.
+func (b *BloomIndexer) Process(header *types.Header) {
+ b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
+ b.head = header.Hash()
+}
+
+// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
+// writing it out into the database.
+func (b *BloomIndexer) Commit() error {
+ batch := b.db.NewBatch()
+
+ for i := 0; i < types.BloomBitLength; i++ {
+ bits, err := b.gen.Bitset(uint(i))
+ if err != nil {
+ return err
+ }
+ core.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
+ }
+ return batch.Write()
+}
diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go
index 90111b2b3..d41afa17c 100644
--- a/eth/db_upgrade.go
+++ b/eth/db_upgrade.go
@@ -19,7 +19,6 @@ package eth
import (
"bytes"
- "fmt"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -134,46 +133,3 @@ func upgradeDeduplicateData(db ethdb.Database) func() error {
return <-errc
}
}
-
-func addMipmapBloomBins(db ethdb.Database) (err error) {
- const mipmapVersion uint = 2
-
- // check if the version is set. We ignore data for now since there's
- // only one version so we can easily ignore it for now
- var data []byte
- data, _ = db.Get([]byte("setting-mipmap-version"))
- if len(data) > 0 {
- var version uint
- if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion {
- return nil
- }
- }
-
- defer func() {
- if err == nil {
- var val []byte
- val, err = rlp.EncodeToBytes(mipmapVersion)
- if err == nil {
- err = db.Put([]byte("setting-mipmap-version"), val)
- }
- return
- }
- }()
- latestHash := core.GetHeadBlockHash(db)
- latestBlock := core.GetBlock(db, latestHash, core.GetBlockNumber(db, latestHash))
- if latestBlock == nil { // clean database
- return
- }
-
- tstart := time.Now()
- log.Warn("Upgrading db log bloom bins")
- for i := uint64(0); i <= latestBlock.NumberU64(); i++ {
- hash := core.GetCanonicalHash(db, i)
- if (hash == common.Hash{}) {
- return fmt.Errorf("chain db corrupted. Could not find block %d.", i)
- }
- core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash, i))
- }
- log.Info("Bloom-bin upgrade completed", "elapsed", common.PrettyDuration(time.Since(tstart)))
- return nil
-}
diff --git a/eth/filters/api.go b/eth/filters/api.go
index fff58a268..6e1d48adb 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -52,8 +52,8 @@ type filter struct {
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
backend Backend
- useMipMap bool
mux *event.TypeMux
+ quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
@@ -63,14 +63,12 @@ type PublicFilterAPI struct {
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
- backend: backend,
- useMipMap: !lightMode,
- mux: backend.EventMux(),
- chainDb: backend.ChainDb(),
- events: NewEventSystem(backend.EventMux(), backend, lightMode),
- filters: make(map[rpc.ID]*filter),
+ backend: backend,
+ mux: backend.EventMux(),
+ chainDb: backend.ChainDb(),
+ events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ filters: make(map[rpc.ID]*filter),
}
-
go api.timeoutLoop()
return api
@@ -325,20 +323,20 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
+ // Convert the RPC block numbers into internal representations
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
+ // Create and run the filter to get all the logs
+ filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
- filter := New(api.backend, api.useMipMap)
- filter.SetBeginBlock(crit.FromBlock.Int64())
- filter.SetEndBlock(crit.ToBlock.Int64())
- filter.SetAddresses(crit.Addresses)
- filter.SetTopics(crit.Topics)
-
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
return returnLogs(logs), err
}
@@ -372,21 +370,18 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
return nil, fmt.Errorf("filter not found")
}
- filter := New(api.backend, api.useMipMap)
+ begin := rpc.LatestBlockNumber.Int64()
if f.crit.FromBlock != nil {
- filter.SetBeginBlock(f.crit.FromBlock.Int64())
- } else {
- filter.SetBeginBlock(rpc.LatestBlockNumber.Int64())
+ begin = f.crit.FromBlock.Int64()
}
+ end := rpc.LatestBlockNumber.Int64()
if f.crit.ToBlock != nil {
- filter.SetEndBlock(f.crit.ToBlock.Int64())
- } else {
- filter.SetEndBlock(rpc.LatestBlockNumber.Int64())
+ end = f.crit.ToBlock.Int64()
}
- filter.SetAddresses(f.crit.Addresses)
- filter.SetTopics(f.crit.Topics)
+ // Create and run the filter to get all the logs
+ filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
}
@@ -394,7 +389,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
}
// GetFilterChanges returns the logs for the filter with the given id since
-// last time is was called. This can be used for polling.
+// last time it was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go
new file mode 100644
index 000000000..abbf4593e
--- /dev/null
+++ b/eth/filters/bench_test.go
@@ -0,0 +1,201 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/node"
+)
+
+func BenchmarkBloomBits512(b *testing.B) {
+ benchmarkBloomBits(b, 512)
+}
+
+func BenchmarkBloomBits1k(b *testing.B) {
+ benchmarkBloomBits(b, 1024)
+}
+
+func BenchmarkBloomBits2k(b *testing.B) {
+ benchmarkBloomBits(b, 2048)
+}
+
+func BenchmarkBloomBits4k(b *testing.B) {
+ benchmarkBloomBits(b, 4096)
+}
+
+func BenchmarkBloomBits8k(b *testing.B) {
+ benchmarkBloomBits(b, 8192)
+}
+
+func BenchmarkBloomBits16k(b *testing.B) {
+ benchmarkBloomBits(b, 16384)
+}
+
+func BenchmarkBloomBits32k(b *testing.B) {
+ benchmarkBloomBits(b, 32768)
+}
+
+const benchFilterCnt = 2000
+
+func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
+ benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
+ fmt.Println("Running bloombits benchmark section size:", sectionSize)
+
+ db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ if err != nil {
+ b.Fatalf("error opening database at %v: %v", benchDataDir, err)
+ }
+ head := core.GetHeadBlockHash(db)
+ if head == (common.Hash{}) {
+ b.Fatalf("chain data not found at %v", benchDataDir)
+ }
+
+ clearBloomBits(db)
+ fmt.Println("Generating bloombits data...")
+ headNum := core.GetBlockNumber(db, head)
+ if headNum < sectionSize+512 {
+ b.Fatalf("not enough blocks for running a benchmark")
+ }
+
+ start := time.Now()
+ cnt := (headNum - 512) / sectionSize
+ var dataSize, compSize uint64
+ for sectionIdx := uint64(0); sectionIdx < cnt; sectionIdx++ {
+ bc, err := bloombits.NewGenerator(uint(sectionSize))
+ if err != nil {
+ b.Fatalf("failed to create generator: %v", err)
+ }
+ var header *types.Header
+ for i := sectionIdx * sectionSize; i < (sectionIdx+1)*sectionSize; i++ {
+ hash := core.GetCanonicalHash(db, i)
+ header = core.GetHeader(db, hash, i)
+ if header == nil {
+ b.Fatalf("Error creating bloomBits data")
+ }
+ bc.AddBloom(uint(i-sectionIdx*sectionSize), header.Bloom)
+ }
+ sectionHead := core.GetCanonicalHash(db, (sectionIdx+1)*sectionSize-1)
+ for i := 0; i < types.BloomBitLength; i++ {
+ data, err := bc.Bitset(uint(i))
+ if err != nil {
+ b.Fatalf("failed to retrieve bitset: %v", err)
+ }
+ comp := bitutil.CompressBytes(data)
+ dataSize += uint64(len(data))
+ compSize += uint64(len(comp))
+ core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp)
+ }
+ //if sectionIdx%50 == 0 {
+ // fmt.Println(" section", sectionIdx, "/", cnt)
+ //}
+ }
+
+ d := time.Since(start)
+ fmt.Println("Finished generating bloombits data")
+ fmt.Println(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block")
+ fmt.Println(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize))
+
+ fmt.Println("Running filter benchmarks...")
+ start = time.Now()
+ mux := new(event.TypeMux)
+ var backend *testBackend
+
+ for i := 0; i < benchFilterCnt; i++ {
+ if i%20 == 0 {
+ db.Close()
+ db, _ = ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+ }
+ var addr common.Address
+ addr[0] = byte(i)
+ addr[1] = byte(i / 256)
+ filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
+ if _, err := filter.Logs(context.Background()); err != nil {
+ b.Error("filter.Find error:", err)
+ }
+ }
+ d = time.Since(start)
+ fmt.Println("Finished running filter benchmarks")
+ fmt.Println(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
+ db.Close()
+}
+
+func forEachKey(db ethdb.Database, startPrefix, endPrefix []byte, fn func(key []byte)) {
+ it := db.(*ethdb.LDBDatabase).NewIterator()
+ it.Seek(startPrefix)
+ for it.Valid() {
+ key := it.Key()
+ cmpLen := len(key)
+ if len(endPrefix) < cmpLen {
+ cmpLen = len(endPrefix)
+ }
+ if bytes.Compare(key[:cmpLen], endPrefix) == 1 {
+ break
+ }
+ fn(common.CopyBytes(key))
+ it.Next()
+ }
+ it.Release()
+}
+
+var bloomBitsPrefix = []byte("bloomBits-")
+
+func clearBloomBits(db ethdb.Database) {
+ fmt.Println("Clearing bloombits data...")
+ forEachKey(db, bloomBitsPrefix, bloomBitsPrefix, func(key []byte) {
+ db.Delete(key)
+ })
+}
+
+func BenchmarkNoBloomBits(b *testing.B) {
+ benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
+ fmt.Println("Running benchmark without bloombits")
+ db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ if err != nil {
+ b.Fatalf("error opening database at %v: %v", benchDataDir, err)
+ }
+ head := core.GetHeadBlockHash(db)
+ if head == (common.Hash{}) {
+ b.Fatalf("chain data not found at %v", benchDataDir)
+ }
+ headNum := core.GetBlockNumber(db, head)
+
+ clearBloomBits(db)
+
+ fmt.Println("Running filter benchmarks...")
+ start := time.Now()
+ mux := new(event.TypeMux)
+ backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+ filter := New(backend, 0, int64(headNum), []common.Address{common.Address{}}, nil)
+ filter.Logs(context.Background())
+ d := time.Since(start)
+ fmt.Println("Finished running filter benchmarks")
+ fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
+ db.Close()
+}
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index f848bc6af..4f6c30058 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,11 +18,12 @@ package filters
import (
"context"
- "math"
"math/big"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -34,167 +35,179 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+
SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+
+ BloomStatus() (uint64, uint64)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
// Filter can be used to retrieve and filter logs.
type Filter struct {
- backend Backend
- useMipMap bool
+ backend Backend
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
+
+ matcher *bloombits.Matcher
}
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
-// MipMaps allow past blocks to be searched much more efficiently, but are not available
-// to light clients.
-func New(backend Backend, useMipMap bool) *Filter {
+func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Flatten the address and topic filter clauses into a single filter system
+ var filters [][][]byte
+ if len(addresses) > 0 {
+ filter := make([][]byte, len(addresses))
+ for i, address := range addresses {
+ filter[i] = address.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ for _, topicList := range topics {
+ filter := make([][]byte, len(topicList))
+ for i, topic := range topicList {
+ filter[i] = topic.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ // Assemble and return the filter
+ size, _ := backend.BloomStatus()
+
return &Filter{
backend: backend,
- useMipMap: useMipMap,
+ begin: begin,
+ end: end,
+ addresses: addresses,
+ topics: topics,
db: backend.ChainDb(),
+ matcher: bloombits.NewMatcher(size, filters),
}
}
-// SetBeginBlock sets the earliest block for filtering.
-// -1 = latest block (i.e., the current block)
-// hash = particular hash from-to
-func (f *Filter) SetBeginBlock(begin int64) {
- f.begin = begin
-}
-
-// SetEndBlock sets the latest block for filtering.
-func (f *Filter) SetEndBlock(end int64) {
- f.end = end
-}
-
-// SetAddresses matches only logs that are generated from addresses that are included
-// in the given addresses.
-func (f *Filter) SetAddresses(addr []common.Address) {
- f.addresses = addr
-}
-
-// SetTopics matches only logs that have topics matching the given topics.
-func (f *Filter) SetTopics(topics [][]common.Hash) {
- f.topics = topics
-}
-
-// FindOnce searches the blockchain for matching log entries, returning
-// all matching entries from the first block that contains matches,
-// updating the start point of the filter accordingly. If no results are
-// found, a nil slice is returned.
-func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) {
- head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
- if head == nil {
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+ // Figure out the limits of the filter range
+ header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ if header == nil {
return nil, nil
}
- headBlockNumber := head.Number.Uint64()
+ head := header.Number.Uint64()
- var beginBlockNo uint64 = uint64(f.begin)
if f.begin == -1 {
- beginBlockNo = headBlockNumber
+ f.begin = int64(head)
}
- var endBlockNo uint64 = uint64(f.end)
+ end := uint64(f.end)
if f.end == -1 {
- endBlockNo = headBlockNumber
- }
-
- // if no addresses are present we can't make use of fast search which
- // uses the mipmap bloom filters to check for fast inclusion and uses
- // higher range probability in order to ensure at least a false positive
- if !f.useMipMap || len(f.addresses) == 0 {
- logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
- f.begin = int64(blockNumber + 1)
- return logs, err
+ end = head
}
-
- logs, blockNumber := f.mipFind(beginBlockNo, endBlockNo, 0)
- f.begin = int64(blockNumber + 1)
- return logs, nil
-}
-
-// Run filters logs with the current parameters set
-func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) {
- for {
- newLogs, err := f.FindOnce(ctx)
- if len(newLogs) == 0 || err != nil {
+ // Gather all indexed logs, and finish with non indexed ones
+ var (
+ logs []*types.Log
+ err error
+ )
+ size, sections := f.backend.BloomStatus()
+ if indexed := sections * size; indexed > uint64(f.begin) {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
return logs, err
}
- logs = append(logs, newLogs...)
}
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
}
-func (f *Filter) mipFind(start, end uint64, depth int) (logs []*types.Log, blockNumber uint64) {
- level := core.MIPMapLevels[depth]
- // normalise numerator so we can work in level specific batches and
- // work with the proper range checks
- for num := start / level * level; num <= end; num += level {
- // find addresses in bloom filters
- bloom := core.GetMipmapBloom(f.db, num, level)
- // Don't bother checking the first time through the loop - we're probably picking
- // up where a previous run left off.
- first := true
- for _, addr := range f.addresses {
- if first || bloom.TestBytes(addr[:]) {
- first = false
- // range check normalised values and make sure that
- // we're resolving the correct range instead of the
- // normalised values.
- start := uint64(math.Max(float64(num), float64(start)))
- end := uint64(math.Min(float64(num+level-1), float64(end)))
- if depth+1 == len(core.MIPMapLevels) {
- l, blockNumber, _ := f.getLogs(context.Background(), start, end)
- if len(l) > 0 {
- return l, blockNumber
- }
- } else {
- l, blockNumber := f.mipFind(start, end, depth+1)
- if len(l) > 0 {
- return l, blockNumber
- }
- }
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits indexed available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(uint64(f.begin), end, matches)
+ if err != nil {
+ return nil, err
+ }
+ defer session.Close(time.Second)
+
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*types.Log
+
+ for {
+ select {
+ case number, ok := <-matches:
+ // Abort if all matches have been fulfilled
+ if !ok {
+ f.begin = int64(end) + 1
+ return logs, nil
+ }
+ // Retrieve the suggested block and pull any truly matching logs
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
}
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
}
}
-
- return nil, end
}
-func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) {
- for i := start; i <= end; i++ {
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
+// indexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ var logs []*types.Log
+
+ for ; f.begin <= int64(end); f.begin++ {
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
- return logs, end, err
+ return logs, err
}
-
- // Use bloom filtering to see if this block is interesting given the
- // current parameters
- if f.bloomFilter(header.Bloom) {
- // Get the logs of the block
- receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if bloomFilter(header.Bloom, f.addresses, f.topics) {
+ found, err := f.checkMatches(ctx, header)
if err != nil {
- return nil, end, err
- }
- var unfiltered []*types.Log
- for _, receipt := range receipts {
- unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
- }
- logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
- if len(logs) > 0 {
- return logs, uint64(blockNumber), nil
+ return logs, err
}
+ logs = append(logs, found...)
}
}
+ return logs, nil
+}
- return logs, end, nil
+// checkMatches checks if the receipts belonging to the given header contain any log events that
+// match the filter criteria. This function is called when the bloom filter signals a potential match.
+func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ // Get the logs of the block
+ receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ var unfiltered []*types.Log
+ for _, receipt := range receipts {
+ unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ if len(logs) > 0 {
+ return logs, nil
+ }
+ return nil, nil
}
func includes(addresses []common.Address, a common.Address) bool {
@@ -251,10 +264,6 @@ Logs:
return ret
}
-func (f *Filter) bloomFilter(bloom types.Bloom) bool {
- return bloomFilter(bloom, f.addresses, f.topics)
-}
-
func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
if len(addresses) > 0 {
var included bool
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index fcc888b8c..664ce07a5 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -20,12 +20,14 @@ import (
"context"
"fmt"
"math/big"
+ "math/rand"
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -36,6 +38,7 @@ import (
type testBackend struct {
mux *event.TypeMux
db ethdb.Database
+ sections uint64
txFeed *event.Feed
rmLogsFeed *event.Feed
logsFeed *event.Feed
@@ -84,6 +87,37 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
+func (b *testBackend) BloomStatus() (uint64, uint64) {
+ return params.BloomBitsBlocks, b.sections
+}
+
+func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ requests := make(chan chan *bloombits.Retrieval)
+
+ go session.Multiplex(16, 0, requests)
+ go func() {
+ for {
+ // Wait for a service request or a shutdown
+ select {
+ case <-ctx.Done():
+ return
+
+ case request := <-requests:
+ task := <-request
+
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ if rand.Int()%4 != 0 { // Handle occasional missing deliveries
+ head := core.GetCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
+ task.Bitsets[i] = core.GetBloomBits(b.db, task.Bit, section, head)
+ }
+ }
+ request <- task
+ }
+ }
+ }()
+}
+
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
@@ -99,7 +133,7 @@ func TestBlockSubscription(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
@@ -156,7 +190,7 @@ func TestPendingTxFilter(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
@@ -219,7 +253,7 @@ func TestLogFilterCreation(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
testCases = []struct {
@@ -268,7 +302,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
)
@@ -298,7 +332,7 @@ func TestLogFilter(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
@@ -415,7 +449,7 @@ func TestPendingLogsSubscription(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index cf508a218..11235e95a 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -41,8 +41,8 @@ func makeReceipt(addr common.Address) *types.Receipt {
return receipt
}
-func BenchmarkMipmaps(b *testing.B) {
- dir, err := ioutil.TempDir("", "mipmap")
+func BenchmarkFilters(b *testing.B) {
+ dir, err := ioutil.TempDir("", "filtertest")
if err != nil {
b.Fatal(err)
}
@@ -55,7 +55,7 @@ func BenchmarkMipmaps(b *testing.B) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
@@ -66,27 +66,21 @@ func BenchmarkMipmaps(b *testing.B) {
genesis := core.GenesisBlockForTesting(db, addr1, big.NewInt(1000000))
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 100010, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
switch i {
case 2403:
receipt := makeReceipt(addr1)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 1034:
receipt := makeReceipt(addr2)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 34:
receipt := makeReceipt(addr3)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 99999:
receipt := makeReceipt(addr4)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
}
- core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
core.WriteBlock(db, block)
@@ -102,13 +96,10 @@ func BenchmarkMipmaps(b *testing.B) {
}
b.ResetTimer()
- filter := New(backend, true)
- filter.SetAddresses([]common.Address{addr1, addr2, addr3, addr4})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
for i := 0; i < b.N; i++ {
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
b.Fatal("expected 4 logs, got", len(logs))
}
@@ -116,7 +107,7 @@ func BenchmarkMipmaps(b *testing.B) {
}
func TestFilters(t *testing.T) {
- dir, err := ioutil.TempDir("", "mipmap")
+ dir, err := ioutil.TempDir("", "filtertest")
if err != nil {
t.Fatal(err)
}
@@ -129,7 +120,7 @@ func TestFilters(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
@@ -142,7 +133,6 @@ func TestFilters(t *testing.T) {
genesis := core.GenesisBlockForTesting(db, addr, big.NewInt(1000000))
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 1000, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
switch i {
case 1:
receipt := types.NewReceipt(nil, false, new(big.Int))
@@ -153,7 +143,6 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 2:
receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
@@ -163,7 +152,6 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 998:
receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
@@ -173,7 +161,6 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 999:
receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
@@ -183,12 +170,7 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
}
- // i is used as block number for the writes but since the i
- // starts at 0 and block 0 (genesis) is already present increment
- // by one
- core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
core.WriteBlock(db, block)
@@ -203,23 +185,15 @@ func TestFilters(t *testing.T) {
}
}
- filter := New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash1, hash2, hash3, hash4}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
t.Error("expected 4 log, got", len(logs))
}
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash3}})
- filter.SetBeginBlock(900)
- filter.SetEndBlock(999)
- logs, _ = filter.Find(context.Background())
+ filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -227,12 +201,8 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash3}})
- filter.SetBeginBlock(990)
- filter.SetEndBlock(-1)
- logs, _ = filter.Find(context.Background())
+ filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -240,44 +210,32 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{hash1, hash2}})
- filter.SetBeginBlock(1)
- filter.SetEndBlock(10)
+ filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 2 {
t.Error("expected 2 log, got", len(logs))
}
failHash := common.BytesToHash([]byte("fail"))
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{failHash}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
failAddr := common.BytesToAddress([]byte("failmenow"))
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{failAddr})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, []common.Address{failAddr}, nil)
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{failHash}, {hash1}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}