From e1f1d3085c6b868de93313700cac8a325e9b148b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 12 Jul 2018 17:36:07 +0300 Subject: accounts, eth, les: blockhash based filtering on all code paths --- eth/api_backend.go | 4 ++ eth/filters/api.go | 62 +++++++++++++++---------------- eth/filters/bench_test.go | 4 +- eth/filters/filter.go | 77 ++++++++++++++++++++++++++++++--------- eth/filters/filter_system_test.go | 8 ++++ eth/filters/filter_test.go | 16 ++++---- 6 files changed, 111 insertions(+), 60 deletions(-) (limited to 'eth') diff --git a/eth/api_backend.go b/eth/api_backend.go index 016087dfe..03f6012d7 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -70,6 +70,10 @@ func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNum return b.eth.blockchain.GetHeaderByNumber(uint64(blockNr)), nil } +func (b *EthAPIBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + return b.eth.blockchain.GetHeaderByHash(hash), nil +} + func (b *EthAPIBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) { // Pending block is only known by the miner if blockNr == rpc.PendingBlockNumber { diff --git a/eth/filters/api.go b/eth/filters/api.go index f964d9232..4e686c0ce 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -28,7 +28,6 @@ import ( ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -325,36 +324,26 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { - var ( - fromBlock int64 - toBlock int64 - ) - + var filter *Filter if crit.BlockHash != nil { - // look up block number from block hash - if block := rawdb.ReadHeaderNumber(api.chainDb, *crit.BlockHash); block != nil { - // verify block is part of canonical chain - if canonical := rawdb.ReadCanonicalHash(api.chainDb, *block); canonical != *crit.BlockHash { - return nil, fmt.Errorf("Block with hash %s was removed from canonical chain", crit.BlockHash.Hex()) - } - fromBlock = int64(*block) - toBlock = fromBlock - } else { - return nil, fmt.Errorf("Block with hash %s was not found", crit.BlockHash.Hex()) - } + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics) } else { // Convert the RPC block numbers into internal representations + var ( + begin int64 + end int64 + ) if crit.FromBlock == nil { - fromBlock = int64(rpc.LatestBlockNumber) + begin = int64(rpc.LatestBlockNumber) } if crit.ToBlock == nil { - toBlock = int64(rpc.LatestBlockNumber) + end = int64(rpc.LatestBlockNumber) } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics) } - - // Create and run the filter to get all the logs - filter := New(api.backend, fromBlock, toBlock, crit.Addresses, crit.Topics) - + // Run the filter and return all the logs logs, err := filter.Logs(ctx) if err != nil { return nil, err @@ -392,17 +381,24 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty return nil, fmt.Errorf("filter not found") } - begin := rpc.LatestBlockNumber.Int64() - if f.crit.FromBlock != nil { - begin = f.crit.FromBlock.Int64() - } - end := rpc.LatestBlockNumber.Int64() - if f.crit.ToBlock != nil { - end = f.crit.ToBlock.Int64() + var filter *Filter + if f.crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if f.crit.FromBlock != nil { + begin = f.crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if f.crit.ToBlock != nil { + end = f.crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) } - // Create and run the filter to get all the logs - filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) - + // Run the filter and return all the logs logs, err := filter.Logs(ctx) if err != nil { return nil, err diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go index faffaa70b..c5f681e02 100644 --- a/eth/filters/bench_test.go +++ b/eth/filters/bench_test.go @@ -135,7 +135,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { var addr common.Address addr[0] = byte(i) addr[1] = byte(i / 256) - filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) + filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) if _, err := filter.Logs(context.Background()); err != nil { b.Error("filter.Find error:", err) } @@ -192,7 +192,7 @@ func BenchmarkNoBloomBits(b *testing.B) { start := time.Now() mux := new(event.TypeMux) backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} - filter := New(backend, 0, int64(*headNum), []common.Address{{}}, nil) + filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil) filter.Logs(context.Background()) d := time.Since(start) fmt.Println("Finished running filter benchmarks") diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 7000d74fa..071613ad7 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -18,6 +18,7 @@ package filters import ( "context" + "errors" "math/big" "github.com/ethereum/go-ethereum/common" @@ -33,6 +34,7 @@ type Backend interface { ChainDb() ethdb.Database EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) + HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) @@ -49,17 +51,19 @@ type Backend interface { type Filter struct { backend Backend - db ethdb.Database - begin, end int64 - addresses []common.Address - topics [][]common.Hash + db ethdb.Database + addresses []common.Address + topics [][]common.Hash + + block common.Hash // Block hash if filtering a single block + begin, end int64 // Range interval if filtering multiple blocks matcher *bloombits.Matcher } -// New creates a new filter which uses a bloom filter on blocks to figure out whether -// a particular block is interesting or not. -func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { +// NewRangeFilter creates a new filter which uses a bloom filter on blocks to +// figure out whether a particular block is interesting or not. +func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { // Flatten the address and topic filter clauses into a single bloombits filter // system. Since the bloombits are not positional, nil topics are permitted, // which get flattened into a nil byte slice. @@ -78,23 +82,52 @@ func New(backend Backend, begin, end int64, addresses []common.Address, topics [ } filters = append(filters, filter) } - // Assemble and return the filter size, _ := backend.BloomStatus() + // Create a generic filter and convert it into a range filter + filter := newFilter(backend, addresses, topics) + + filter.matcher = bloombits.NewMatcher(size, filters) + filter.begin = begin + filter.end = end + + return filter +} + +// NewBlockFilter creates a new filter which directly inspects the contents of +// a block to figure out whether it is interesting or not. +func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { + // Create a generic filter and convert it into a block filter + filter := newFilter(backend, addresses, topics) + filter.block = block + return filter +} + +// newFilter creates a generic filter that can either filter based on a block hash, +// or based on range queries. The search criteria needs to be explicitly set. +func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter { return &Filter{ backend: backend, - begin: begin, - end: end, addresses: addresses, topics: topics, db: backend.ChainDb(), - matcher: bloombits.NewMatcher(size, filters), } } // Logs searches the blockchain for matching log entries, returning all from the // first block that contains matches, updating the start of the filter accordingly. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { + // If we're doing singleton block filtering, execute and return + if f.block != (common.Hash{}) { + header, err := f.backend.HeaderByHash(ctx, f.block) + if err != nil { + return nil, err + } + if header == nil { + return nil, errors.New("unknown block") + } + return f.blockLogs(ctx, header) + } // Figure out the limits of the filter range header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if header == nil { @@ -187,13 +220,23 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e if header == nil || err != nil { return logs, err } - if bloomFilter(header.Bloom, f.addresses, f.topics) { - found, err := f.checkMatches(ctx, header) - if err != nil { - return logs, err - } - logs = append(logs, found...) + found, err := f.blockLogs(ctx, header) + if err != nil { + return logs, err + } + logs = append(logs, found...) + } + return logs, nil +} + +// blockLogs returns the logs matching the filter criteria within a single block. +func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { + if bloomFilter(header.Bloom, f.addresses, f.topics) { + found, err := f.checkMatches(ctx, header) + if err != nil { + return logs, err } + logs = append(logs, found...) } return logs, nil } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 2c1e8a2eb..e71080b1a 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -75,6 +75,14 @@ func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumbe return rawdb.ReadHeader(b.db, hash, num), nil } +func (b *testBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + number := rawdb.ReadHeaderNumber(b.db, hash) + if number == nil { + return nil, nil + } + return rawdb.ReadHeader(b.db, hash, *number), nil +} + func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { if number := rawdb.ReadHeaderNumber(b.db, hash); number != nil { return rawdb.ReadReceipts(b.db, hash, *number), nil diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index ccabe955c..396a03d61 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -92,7 +92,7 @@ func BenchmarkFilters(b *testing.B) { } b.ResetTimer() - filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) + filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) for i := 0; i < b.N; i++ { logs, _ := filter.Logs(context.Background()) @@ -175,14 +175,14 @@ func TestFilters(t *testing.T) { rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) } - filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) + filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) logs, _ := filter.Logs(context.Background()) if len(logs) != 4 { t.Error("expected 4 log, got", len(logs)) } - filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -191,7 +191,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -200,7 +200,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}}) + filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 2 { @@ -208,7 +208,7 @@ func TestFilters(t *testing.T) { } failHash := common.BytesToHash([]byte("fail")) - filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}}) + filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { @@ -216,14 +216,14 @@ func TestFilters(t *testing.T) { } failAddr := common.BytesToAddress([]byte("failmenow")) - filter = New(backend, 0, -1, []common.Address{failAddr}, nil) + filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } - filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) + filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { -- cgit v1.2.3