aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-08-29 19:13:11 +0800
committerPéter Szilágyi <peterke@gmail.com>2017-09-06 16:14:19 +0800
commitf585f9eee8cb18423c23fe8b517b5b4cbe3b3755 (patch)
tree08c232ee58318c20f971cf8e3f5dfa09f1e2caf7 /eth
parent4ea4d2dc3473afd9d2eda6ef6b359accce1f0946 (diff)
downloaddexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.gz
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.bz2
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.lz
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.xz
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.tar.zst
dexon-f585f9eee8cb18423c23fe8b517b5b4cbe3b3755.zip
core, eth: clean up bloom filtering, add some tests
Diffstat (limited to 'eth')
-rw-r--r--eth/api_backend.go29
-rw-r--r--eth/backend.go21
-rw-r--r--eth/bloombits.go142
-rw-r--r--eth/db_upgrade.go38
-rw-r--r--eth/filters/api.go49
-rw-r--r--eth/filters/bench_test.go82
-rw-r--r--eth/filters/filter.go270
-rw-r--r--eth/filters/filter_system_test.go60
-rw-r--r--eth/filters/filter_test.go18
-rw-r--r--eth/handler.go5
10 files changed, 347 insertions, 367 deletions
diff --git a/eth/api_backend.go b/eth/api_backend.go
index fa3cf3f80..91f392f94 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -24,11 +24,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
- "github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -196,28 +196,13 @@ func (b *EthApiBackend) AccountManager() *accounts.Manager {
return b.eth.AccountManager()
}
-func (b *EthApiBackend) GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error) {
- results := make([][]byte, len(sectionIdxList))
- var err error
- for i, sectionIdx := range sectionIdxList {
- sectionHead := core.GetCanonicalHash(b.eth.chainDb, (sectionIdx+1)*bloomBitsSection-1)
- results[i], err = core.GetBloomBits(b.eth.chainDb, bitIdx, sectionIdx, sectionHead)
- if err != nil {
- return nil, err
- }
- }
- return results, nil
-}
-
-func (b *EthApiBackend) BloomBitsSections() uint64 {
- sections, _, _ := b.eth.bbIndexer.Sections()
- return sections
+func (b *EthApiBackend) BloomStatus() (uint64, uint64) {
+ sections, _, _ := b.eth.bloomIndexer.Sections()
+ return params.BloomBitsBlocks, sections
}
-func (b *EthApiBackend) BloomBitsConfig() filters.BloomConfig {
- return filters.BloomConfig{
- SectionSize: bloomBitsSection,
- MaxRequestLen: 16,
- MaxRequestWait: 0,
+func (b *EthApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ for i := 0; i < bloomFilterThreads; i++ {
+ go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
}
}
diff --git a/eth/backend.go b/eth/backend.go
index efc0a2317..99a1928fe 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
@@ -77,7 +78,8 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager
- bbIndexer *core.ChainIndexer
+ bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
+ bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
ApiBackend *EthApiBackend
@@ -127,7 +129,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
- bbIndexer: NewBloomBitsProcessor(chainDb, bloomBitsSection),
+ bloomRequests: make(chan chan *bloombits.Retrieval),
+ bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
@@ -151,7 +154,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
- eth.bbIndexer.Start(eth.blockchain)
+ eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent)
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
@@ -261,7 +264,7 @@ func (s *Ethereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Version: "1.0",
- Service: filters.NewPublicFilterAPI(s.ApiBackend, false, bloomBitsSection),
+ Service: filters.NewPublicFilterAPI(s.ApiBackend, false),
Public: true,
}, {
Namespace: "admin",
@@ -359,14 +362,17 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManage
func (s *Ethereum) Protocols() []p2p.Protocol {
if s.lesServer == nil {
return s.protocolManager.SubProtocols
- } else {
- return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
+ return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
+ // Start the bloom bits servicing goroutines
+ s.startBloomHandlers()
+
+ // Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
// Figure out a max peers count based on the server limits
@@ -377,6 +383,7 @@ func (s *Ethereum) Start(srvr *p2p.Server) error {
maxPeers = srvr.MaxPeers / 2
}
}
+ // Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
@@ -390,7 +397,7 @@ func (s *Ethereum) Stop() error {
if s.stopDbUpgrade != nil {
s.stopDbUpgrade()
}
- s.bbIndexer.Close()
+ s.bloomIndexer.Close()
s.blockchain.Stop()
s.protocolManager.Stop()
if s.lesServer != nil {
diff --git a/eth/bloombits.go b/eth/bloombits.go
new file mode 100644
index 000000000..02de408b0
--- /dev/null
+++ b/eth/bloombits.go
@@ -0,0 +1,142 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+const (
+ // bloomServiceThreads is the number of goroutines used globally by an Ethereum
+ // instance to service bloombits lookups for all running filters.
+ bloomServiceThreads = 16
+
+ // bloomFilterThreads is the number of goroutines used locally per filter to
+ // multiplex requests onto the global servicing goroutines.
+ bloomFilterThreads = 3
+
+ // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
+ // in a single batch.
+ bloomRetrievalBatch = 16
+
+ // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
+ // to accumulate request an entire batch (avoiding hysteresis).
+ bloomRetrievalWait = time.Duration(0)
+)
+
+// startBloomHandlers starts a batch of goroutines to accept bloom bit database
+// retrievals from possibly a range of filters and serving the data to satisfy.
+func (eth *Ethereum) startBloomHandlers() {
+ for i := 0; i < bloomServiceThreads; i++ {
+ go func() {
+ for {
+ select {
+ case <-eth.shutdownChan:
+ return
+
+ case request := <-eth.bloomRequests:
+ task := <-request
+
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ head := core.GetCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1)
+ blob, err := bitutil.DecompressBytes(core.GetBloomBits(eth.chainDb, task.Bit, section, head), int(params.BloomBitsBlocks)/8)
+ if err != nil {
+ panic(err)
+ }
+ task.Bitsets[i] = blob
+ }
+ request <- task
+ }
+ }
+ }()
+ }
+}
+
+const (
+ // bloomConfirms is the number of confirmation blocks before a bloom section is
+ // considered probably final and its rotated bits are calculated.
+ bloomConfirms = 256
+
+ // bloomThrottling is the time to wait between processing two consecutive index
+ // sections. It's useful during chain upgrades to prevent disk overload.
+ bloomThrottling = 100 * time.Millisecond
+)
+
+// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
+// for the Ethereum header bloom filters, permitting blazing fast filtering.
+type BloomIndexer struct {
+ size uint64 // section size to generate bloombits for
+
+ db ethdb.Database // database instance to write index data and metadata into
+ gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
+
+ section uint64 // Section is the section number being processed currently
+ head common.Hash // Head is the hash of the last header processed
+}
+
+// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
+// canonical chain for fast logs filtering.
+func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
+ backend := &BloomIndexer{
+ db: db,
+ size: size,
+ }
+ table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix))
+
+ return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits")
+}
+
+// Reset implements core.ChainIndexerBackend, starting a new bloombits index
+// section.
+func (b *BloomIndexer) Reset(section uint64) {
+ gen, err := bloombits.NewGenerator(uint(b.size))
+ if err != nil {
+ panic(err)
+ }
+ b.gen, b.section, b.head = gen, section, common.Hash{}
+}
+
+// Process implements core.ChainIndexerBackend, adding a new header's bloom into
+// the index.
+func (b *BloomIndexer) Process(header *types.Header) {
+ b.gen.AddBloom(header.Bloom)
+ b.head = header.Hash()
+}
+
+// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
+// writing it out into the database.
+func (b *BloomIndexer) Commit() error {
+ batch := b.db.NewBatch()
+
+ for i := 0; i < types.BloomBitLength; i++ {
+ bits, err := b.gen.Bitset(uint(i))
+ if err != nil {
+ return err
+ }
+ core.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
+ }
+ return batch.Write()
+}
diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go
index ce8ce699a..d41afa17c 100644
--- a/eth/db_upgrade.go
+++ b/eth/db_upgrade.go
@@ -22,10 +22,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
- "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
@@ -136,38 +133,3 @@ func upgradeDeduplicateData(db ethdb.Database) func() error {
return <-errc
}
}
-
-// BloomBitsIndex implements ChainIndex
-type BloomBitsIndex struct {
- db ethdb.Database
- bc *bloombits.BloomBitsCreator
- section, sectionSize uint64
- sectionHead common.Hash
-}
-
-// number of confirmation blocks before a section is considered probably final and its bloom bits are calculated
-const bloomBitsConfirmations = 256
-
-// NewBloomBitsProcessor returns a chain processor that generates bloom bits data for the canonical chain
-func NewBloomBitsProcessor(db ethdb.Database, sectionSize uint64) *core.ChainIndexer {
- backend := &BloomBitsIndex{db: db, sectionSize: sectionSize}
- return core.NewChainIndexer(db, ethdb.NewTable(db, "bbIndex-"), backend, sectionSize, bloomBitsConfirmations, time.Millisecond*100, "bloombits")
-}
-
-func (b *BloomBitsIndex) Reset(section uint64, lastSectionHead common.Hash) {
- b.bc = bloombits.NewBloomBitsCreator(b.sectionSize)
- b.section = section
-}
-
-func (b *BloomBitsIndex) Process(header *types.Header) {
- b.bc.AddHeaderBloom(header.Bloom)
- b.sectionHead = header.Hash()
-}
-
-func (b *BloomBitsIndex) Commit() error {
- for i := 0; i < bloombits.BloomLength; i++ {
- compVector := bitutil.CompressBytes(b.bc.GetBitVector(uint(i)))
- core.StoreBloomBits(b.db, uint64(i), b.section, b.sectionHead, compVector)
- }
- return nil
-}
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 11767753e..6e1d48adb 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -51,27 +51,24 @@ type filter struct {
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
- backend Backend
- bloomBitsSection uint64
- mux *event.TypeMux
- quit chan struct{}
- chainDb ethdb.Database
- events *EventSystem
- filtersMu sync.Mutex
- filters map[rpc.ID]*filter
+ backend Backend
+ mux *event.TypeMux
+ quit chan struct{}
+ chainDb ethdb.Database
+ events *EventSystem
+ filtersMu sync.Mutex
+ filters map[rpc.ID]*filter
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
-func NewPublicFilterAPI(backend Backend, lightMode bool, bloomBitsSection uint64) *PublicFilterAPI {
+func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
- backend: backend,
- bloomBitsSection: bloomBitsSection,
- mux: backend.EventMux(),
- chainDb: backend.ChainDb(),
- events: NewEventSystem(backend.EventMux(), backend, lightMode),
- filters: make(map[rpc.ID]*filter),
+ backend: backend,
+ mux: backend.EventMux(),
+ chainDb: backend.ChainDb(),
+ events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ filters: make(map[rpc.ID]*filter),
}
-
go api.timeoutLoop()
return api
@@ -326,16 +323,20 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
+ // Convert the RPC block numbers into internal representations
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
-
+ // Create and run the filter to get all the logs
filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
return returnLogs(logs), err
}
@@ -369,20 +370,18 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
return nil, fmt.Errorf("filter not found")
}
- var begin, end int64
+ begin := rpc.LatestBlockNumber.Int64()
if f.crit.FromBlock != nil {
begin = f.crit.FromBlock.Int64()
- } else {
- begin = rpc.LatestBlockNumber.Int64()
}
+ end := rpc.LatestBlockNumber.Int64()
if f.crit.ToBlock != nil {
end = f.crit.ToBlock.Int64()
- } else {
- end = rpc.LatestBlockNumber.Int64()
}
+ // Create and run the filter to get all the logs
filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
}
@@ -390,7 +389,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
}
// GetFilterChanges returns the logs for the filter with the given id since
-// last time is was called. This can be used for polling.
+// last time it was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go
index 2487bc0eb..d994378fc 100644
--- a/eth/filters/bench_test.go
+++ b/eth/filters/bench_test.go
@@ -31,82 +31,41 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/node"
- "github.com/golang/snappy"
)
func BenchmarkBloomBits512(b *testing.B) {
- benchmarkBloomBitsForSize(b, 512)
+ benchmarkBloomBits(b, 512)
}
func BenchmarkBloomBits1k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 1024)
+ benchmarkBloomBits(b, 1024)
}
func BenchmarkBloomBits2k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 2048)
+ benchmarkBloomBits(b, 2048)
}
func BenchmarkBloomBits4k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 4096)
+ benchmarkBloomBits(b, 4096)
}
func BenchmarkBloomBits8k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 8192)
+ benchmarkBloomBits(b, 8192)
}
func BenchmarkBloomBits16k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 16384)
+ benchmarkBloomBits(b, 16384)
}
func BenchmarkBloomBits32k(b *testing.B) {
- benchmarkBloomBitsForSize(b, 32768)
-}
-
-func benchmarkBloomBitsForSize(b *testing.B, sectionSize uint64) {
- benchmarkBloomBits(b, sectionSize, 0)
- benchmarkBloomBits(b, sectionSize, 1)
- benchmarkBloomBits(b, sectionSize, 2)
+ benchmarkBloomBits(b, 32768)
}
const benchFilterCnt = 2000
-func benchmarkBloomBits(b *testing.B, sectionSize uint64, comp int) {
+func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
- fmt.Println("Running bloombits benchmark section size:", sectionSize, " compression method:", comp)
-
- var (
- compressFn func([]byte) []byte
- decompressFn func([]byte, int) ([]byte, error)
- )
- switch comp {
- case 0:
- // no compression
- compressFn = func(data []byte) []byte {
- return data
- }
- decompressFn = func(data []byte, target int) ([]byte, error) {
- if len(data) != target {
- panic(nil)
- }
- return data, nil
- }
- case 1:
- // bitutil/compress.go
- compressFn = bitutil.CompressBytes
- decompressFn = bitutil.DecompressBytes
- case 2:
- // go snappy
- compressFn = func(data []byte) []byte {
- return snappy.Encode(nil, data)
- }
- decompressFn = func(data []byte, target int) ([]byte, error) {
- decomp, err := snappy.Decode(nil, data)
- if err != nil || len(decomp) != target {
- panic(err)
- }
- return decomp, nil
- }
- }
+ fmt.Println("Running bloombits benchmark section size:", sectionSize)
db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
if err != nil {
@@ -128,7 +87,10 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64, comp int) {
cnt := (headNum - 512) / sectionSize
var dataSize, compSize uint64
for sectionIdx := uint64(0); sectionIdx < cnt; sectionIdx++ {
- bc := bloombits.NewBloomBitsCreator(sectionSize)
+ bc, err := bloombits.NewGenerator(uint(sectionSize))
+ if err != nil {
+ b.Fatalf("failed to create generator: %v", err)
+ }
var header *types.Header
for i := sectionIdx * sectionSize; i < (sectionIdx+1)*sectionSize; i++ {
hash := core.GetCanonicalHash(db, i)
@@ -136,15 +98,18 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64, comp int) {
if header == nil {
b.Fatalf("Error creating bloomBits data")
}
- bc.AddHeaderBloom(header.Bloom)
+ bc.AddBloom(header.Bloom)
}
sectionHead := core.GetCanonicalHash(db, (sectionIdx+1)*sectionSize-1)
- for i := 0; i < bloombits.BloomLength; i++ {
- data := bc.GetBitVector(uint(i))
- comp := compressFn(data)
+ for i := 0; i < types.BloomBitLength; i++ {
+ data, err := bc.Bitset(uint(i))
+ if err != nil {
+ b.Fatalf("failed to retrieve bitset: %v", err)
+ }
+ comp := bitutil.CompressBytes(data)
dataSize += uint64(len(data))
compSize += uint64(len(comp))
- core.StoreBloomBits(db, uint64(i), sectionIdx, sectionHead, comp)
+ core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp)
}
//if sectionIdx%50 == 0 {
// fmt.Println(" section", sectionIdx, "/", cnt)
@@ -171,8 +136,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64, comp int) {
addr[0] = byte(i)
addr[1] = byte(i / 256)
filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
- filter.decompress = decompressFn
- if _, err := filter.Find(context.Background()); err != nil {
+ if _, err := filter.Logs(context.Background()); err != nil {
b.Error("filter.Find error:", err)
}
}
@@ -229,7 +193,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
mux := new(event.TypeMux)
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
filter := New(backend, 0, int64(headNum), []common.Address{common.Address{}}, nil)
- filter.Find(context.Background())
+ filter.Logs(context.Background())
d := time.Since(start)
fmt.Println("Finished running filter benchmarks")
fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index ea9ccf2f9..3a2226f6b 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -19,11 +19,9 @@ package filters
import (
"context"
"math/big"
- "sync"
"time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
@@ -37,140 +35,143 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
- BloomBitsSections() uint64
- BloomBitsConfig() BloomConfig
+
SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
- GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error)
-}
-type BloomConfig struct {
- SectionSize uint64
- MaxRequestLen int
- MaxRequestWait time.Duration
+ BloomStatus() (uint64, uint64)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
// Filter can be used to retrieve and filter logs.
type Filter struct {
- backend Backend
- bloomBitsConfig BloomConfig
+ backend Backend
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
- decompress func([]byte, int) ([]byte, error)
- matcher *bloombits.Matcher
+ matcher *bloombits.Matcher
}
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ size, _ := backend.BloomStatus()
+
return &Filter{
- backend: backend,
- begin: begin,
- end: end,
- addresses: addresses,
- topics: topics,
- bloomBitsConfig: backend.BloomBitsConfig(),
- db: backend.ChainDb(),
- matcher: bloombits.NewMatcher(backend.BloomBitsConfig().SectionSize, addresses, topics),
- decompress: bitutil.DecompressBytes,
+ backend: backend,
+ begin: begin,
+ end: end,
+ addresses: addresses,
+ topics: topics,
+ db: backend.ChainDb(),
+ matcher: bloombits.NewMatcher(size, addresses, topics),
}
}
-// FindOnce searches the blockchain for matching log entries, returning
-// all matching entries from the first block that contains matches,
-// updating the start point of the filter accordingly. If no results are
-// found, a nil slice is returned.
-func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) {
- head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
- if head == nil {
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+ // Figure out the limits of the filter range
+ header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ if header == nil {
return nil, nil
}
- headBlockNumber := head.Number.Uint64()
+ head := header.Number.Uint64()
- var beginBlockNo uint64 = uint64(f.begin)
if f.begin == -1 {
- beginBlockNo = headBlockNumber
+ f.begin = int64(head)
}
- var endBlockNo uint64 = uint64(f.end)
+ end := uint64(f.end)
if f.end == -1 {
- endBlockNo = headBlockNumber
+ end = head
}
-
- logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
- f.begin = int64(blockNumber + 1)
- return logs, err
-}
-
-// Run filters logs with the current parameters set
-func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) {
- for {
- newLogs, err := f.FindOnce(ctx)
- if len(newLogs) == 0 || err != nil {
+ // Gather all indexed logs, and finish with non indexed ones
+ var (
+ logs []*types.Log
+ err error
+ )
+ size, sections := f.backend.BloomStatus()
+ if indexed := sections * size; indexed > uint64(f.begin) {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
return logs, err
}
- logs = append(logs, newLogs...)
}
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
}
-// nextRequest returns the next request to retrieve for the bloombits matcher
-func (f *Filter) nextRequest() (bloombits uint, sections []uint64) {
- bloomIndex, ok := f.matcher.AllocSectionQueue()
- if !ok {
- return 0, nil
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits indexed available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(uint64(f.begin), end, matches)
+ if err != nil {
+ return nil, err
}
- if f.bloomBitsConfig.MaxRequestWait > 0 &&
- (f.bloomBitsConfig.MaxRequestLen <= 1 || // SectionCount is always greater than zero after a successful alloc
- f.matcher.SectionCount(bloomIndex) < f.bloomBitsConfig.MaxRequestLen) {
- time.Sleep(f.bloomBitsConfig.MaxRequestWait)
+ defer session.Close(time.Second)
+
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*types.Log
+
+ for {
+ select {
+ case number, ok := <-matches:
+ // Abort if all matches have been fulfilled
+ if !ok {
+ f.begin = int64(end) + 1
+ return logs, nil
+ }
+ // Retrieve the suggested block and pull any truly matching logs
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
+ }
}
- return bloomIndex, f.matcher.FetchSections(bloomIndex, f.bloomBitsConfig.MaxRequestLen)
}
-// serveMatcher serves the bloombits matcher by fetching the requested vectors
-// through the filter backend
-func (f *Filter) serveMatcher(ctx context.Context, stop chan struct{}, wg *sync.WaitGroup) chan error {
- errChn := make(chan error, 1)
- wg.Add(10)
- for i := 0; i < 10; i++ {
- go func(i int) {
- defer wg.Done()
-
- for {
- b, s := f.nextRequest()
- if s == nil {
- return
- }
- data, err := f.backend.GetBloomBits(ctx, uint64(b), s)
- if err != nil {
- select {
- case errChn <- err:
- case <-stop:
- }
- return
- }
- decomp := make([][]byte, len(data))
- for i, d := range data {
- var err error
- if decomp[i], err = f.decompress(d, int(f.bloomBitsConfig.SectionSize/8)); err != nil {
- select {
- case errChn <- err:
- case <-stop:
- }
- return
- }
- }
- f.matcher.Deliver(b, s, decomp)
+// indexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ var logs []*types.Log
+
+ for ; f.begin <= int64(end); f.begin++ {
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ if bloomFilter(header.Bloom, f.addresses, f.topics) {
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
}
- }(i)
+ logs = append(logs, found...)
+ }
}
-
- return errChn
+ return logs, nil
}
// checkMatches checks if the receipts belonging to the given header contain any log events that
@@ -192,83 +193,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [
return nil, nil
}
-func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) {
- haveBloomBitsBefore := f.backend.BloomBitsSections() * f.bloomBitsConfig.SectionSize
- if haveBloomBitsBefore > start {
- e := end
- if haveBloomBitsBefore <= e {
- e = haveBloomBitsBefore - 1
- }
-
- stop := make(chan struct{})
- var wg sync.WaitGroup
- matches := f.matcher.Start(start, e)
- errChn := f.serveMatcher(ctx, stop, &wg)
-
- defer func() {
- f.matcher.Stop()
- close(stop)
- wg.Wait()
- }()
-
- loop:
- for {
- select {
- case i, ok := <-matches:
- if !ok {
- break loop
- }
-
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
- if header == nil || err != nil {
- return logs, end, err
- }
-
- logs, err := f.checkMatches(ctx, header)
- if err != nil {
- return nil, end, err
- }
- if logs != nil {
- return logs, i, nil
- }
- case err := <-errChn:
- return logs, end, err
- case <-ctx.Done():
- return nil, end, ctx.Err()
- }
- }
-
- if end < haveBloomBitsBefore {
- return logs, end, nil
- }
- start = haveBloomBitsBefore
- }
-
- // search the rest with regular block-by-block bloom filtering
- for i := start; i <= end; i++ {
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
- if header == nil || err != nil {
- return logs, end, err
- }
-
- // Use bloom filtering to see if this block is interesting given the
- // current parameters
- if f.bloomFilter(header.Bloom) {
- logs, err := f.checkMatches(ctx, header)
- if err != nil {
- return nil, end, err
- }
- if logs != nil {
- return logs, i, nil
- }
- }
- }
-
- return logs, end, nil
-}
-
func includes(addresses []common.Address, a common.Address) bool {
for _, addr := range addresses {
if addr == a {
@@ -323,10 +247,6 @@ Logs:
return ret
}
-func (f *Filter) bloomFilter(bloom types.Bloom) bool {
- return bloomFilter(bloom, f.addresses, f.topics)
-}
-
func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
if len(addresses) > 0 {
var included bool
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 140fad555..664ce07a5 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -20,12 +20,14 @@ import (
"context"
"fmt"
"math/big"
+ "math/rand"
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -85,29 +87,35 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
-func (b *testBackend) GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error) {
- results := make([][]byte, len(sectionIdxList))
- var err error
- for i, sectionIdx := range sectionIdxList {
- sectionHead := core.GetCanonicalHash(b.db, (sectionIdx+1)*testBloomBitsSection-1)
- results[i], err = core.GetBloomBits(b.db, bitIdx, sectionIdx, sectionHead)
- if err != nil {
- return nil, err
- }
- }
- return results, nil
+func (b *testBackend) BloomStatus() (uint64, uint64) {
+ return params.BloomBitsBlocks, b.sections
}
-func (b *testBackend) BloomBitsSections() uint64 {
- return b.sections
-}
+func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ requests := make(chan chan *bloombits.Retrieval)
-func (b *testBackend) BloomBitsConfig() BloomConfig {
- return BloomConfig{
- SectionSize: testBloomBitsSection,
- MaxRequestLen: 16,
- MaxRequestWait: 0,
- }
+ go session.Multiplex(16, 0, requests)
+ go func() {
+ for {
+ // Wait for a service request or a shutdown
+ select {
+ case <-ctx.Done():
+ return
+
+ case request := <-requests:
+ task := <-request
+
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ if rand.Int()%4 != 0 { // Handle occasional missing deliveries
+ head := core.GetCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
+ task.Bitsets[i] = core.GetBloomBits(b.db, task.Bit, section, head)
+ }
+ }
+ request <- task
+ }
+ }
+ }()
}
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
@@ -126,7 +134,7 @@ func TestBlockSubscription(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
@@ -183,7 +191,7 @@ func TestPendingTxFilter(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -246,7 +254,7 @@ func TestLogFilterCreation(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@@ -295,7 +303,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@@ -325,7 +333,7 @@ func TestLogFilter(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -442,7 +450,7 @@ func TestPendingLogsSubscription(t *testing.T) {
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false, 0)
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index f1c6481d7..11235e95a 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -32,8 +32,6 @@ import (
"github.com/ethereum/go-ethereum/params"
)
-const testBloomBitsSection = 4096
-
func makeReceipt(addr common.Address) *types.Receipt {
receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
@@ -101,7 +99,7 @@ func BenchmarkFilters(b *testing.B) {
filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
for i := 0; i < b.N; i++ {
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
b.Fatal("expected 4 logs, got", len(logs))
}
@@ -189,13 +187,13 @@ func TestFilters(t *testing.T) {
filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
t.Error("expected 4 log, got", len(logs))
}
filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -204,7 +202,7 @@ func TestFilters(t *testing.T) {
}
filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -214,7 +212,7 @@ func TestFilters(t *testing.T) {
filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 2 {
t.Error("expected 2 log, got", len(logs))
}
@@ -222,7 +220,7 @@ func TestFilters(t *testing.T) {
failHash := common.BytesToHash([]byte("fail"))
filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
@@ -230,14 +228,14 @@ func TestFilters(t *testing.T) {
failAddr := common.BytesToAddress([]byte("failmenow"))
filter = New(backend, 0, -1, []common.Address{failAddr}, nil)
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
diff --git a/eth/handler.go b/eth/handler.go
index aadd771df..28ae208c0 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -49,8 +49,6 @@ const (
// txChanSize is the size of channel listening to TxPreEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
-
- bloomBitsSection = 4096
)
var (
@@ -94,8 +92,6 @@ type ProtocolManager struct {
quitSync chan struct{}
noMorePeers chan struct{}
- lesServer LesServer
-
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
@@ -118,7 +114,6 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
-
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")