From f7a71996fbbe9cea4445600ffa3c232a6cf42803 Mon Sep 17 00:00:00 2001 From: Jeffrey Wilcke Date: Sun, 30 Aug 2015 10:04:59 +0200 Subject: core, event/filter, xeth: refactored filter system Moved the filtering system from `event` to `eth/filters` package and removed the `core.Filter` object. The `filters.Filter` object now requires a `common.Database` rather than a `eth.Backend` and invokes the `core.GetBlockByX` directly rather than thru a "manager". --- eth/filters/filter.go | 211 +++++++++++++++++++++++++++++++++++++++++++ eth/filters/filter_system.go | 133 +++++++++++++++++++++++++++ 2 files changed, 344 insertions(+) create mode 100644 eth/filters/filter.go create mode 100644 eth/filters/filter_system.go (limited to 'eth') diff --git a/eth/filters/filter.go b/eth/filters/filter.go new file mode 100644 index 000000000..b7f795607 --- /dev/null +++ b/eth/filters/filter.go @@ -0,0 +1,211 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package filters + +import ( + "math" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" +) + +type AccountChange struct { + Address, StateAddress []byte +} + +// Filtering interface +type Filter struct { + db common.Database + earliest int64 + latest int64 + skip int + address []common.Address + max int + topics [][]common.Hash + + BlockCallback func(*types.Block, state.Logs) + TransactionCallback func(*types.Transaction) + LogsCallback func(state.Logs) +} + +// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block +// is interesting or not. +func New(db common.Database) *Filter { + return &Filter{db: db} +} + +// Set the earliest and latest block for filtering. +// -1 = latest block (i.e., the current block) +// hash = particular hash from-to +func (self *Filter) SetEarliestBlock(earliest int64) { + self.earliest = earliest +} + +func (self *Filter) SetLatestBlock(latest int64) { + self.latest = latest +} + +func (self *Filter) SetAddress(addr []common.Address) { + self.address = addr +} + +func (self *Filter) SetTopics(topics [][]common.Hash) { + self.topics = topics +} + +func (self *Filter) SetMax(max int) { + self.max = max +} + +func (self *Filter) SetSkip(skip int) { + self.skip = skip +} + +// Run filters logs with the current parameters set +func (self *Filter) Find() state.Logs { + earliestBlock := core.GetCurrentBlock(self.db) + var earliestBlockNo uint64 = uint64(self.earliest) + if self.earliest == -1 { + earliestBlockNo = earliestBlock.NumberU64() + } + var latestBlockNo uint64 = uint64(self.latest) + if self.latest == -1 { + latestBlockNo = earliestBlock.NumberU64() + } + + var ( + logs state.Logs + block = core.GetBlockByNumber(self.db, latestBlockNo) + ) + +done: + for i := 0; block != nil; i++ { + // Quit on latest + switch { + case block.NumberU64() == 0: + break done + case block.NumberU64() < earliestBlockNo: + break done + case self.max <= len(logs): + break done + } + + // Use bloom filtering to see if this block is interesting given the + // current parameters + if self.bloomFilter(block) { + // Get the logs of the block + var ( + receipts = core.GetBlockReceipts(self.db, block.Hash()) + unfiltered state.Logs + ) + for _, receipt := range receipts { + unfiltered = append(unfiltered, receipt.Logs()...) + } + logs = append(logs, self.FilterLogs(unfiltered)...) + } + + block = core.GetBlockByHash(self.db, block.ParentHash()) + } + + skip := int(math.Min(float64(len(logs)), float64(self.skip))) + + return logs[skip:] +} + +func includes(addresses []common.Address, a common.Address) bool { + for _, addr := range addresses { + if addr == a { + return true + } + } + + return false +} + +func (self *Filter) FilterLogs(logs state.Logs) state.Logs { + var ret state.Logs + + // Filter the logs for interesting stuff +Logs: + for _, log := range logs { + if len(self.address) > 0 && !includes(self.address, log.Address) { + continue + } + + logTopics := make([]common.Hash, len(self.topics)) + copy(logTopics, log.Topics) + + // If the to filtered topics is greater than the amount of topics in + // logs, skip. + if len(self.topics) > len(log.Topics) { + continue Logs + } + + for i, topics := range self.topics { + var match bool + for _, topic := range topics { + // common.Hash{} is a match all (wildcard) + if (topic == common.Hash{}) || log.Topics[i] == topic { + match = true + break + } + } + + if !match { + continue Logs + } + + } + + ret = append(ret, log) + } + + return ret +} + +func (self *Filter) bloomFilter(block *types.Block) bool { + if len(self.address) > 0 { + var included bool + for _, addr := range self.address { + if types.BloomLookup(block.Bloom(), addr) { + included = true + break + } + } + + if !included { + return false + } + } + + for _, sub := range self.topics { + var included bool + for _, topic := range sub { + if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) { + included = true + break + } + } + if !included { + return false + } + } + + return true +} diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go new file mode 100644 index 000000000..9ad73a896 --- /dev/null +++ b/eth/filters/filter_system.go @@ -0,0 +1,133 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// package filters implements an ethereum filtering system for block, +// transactions and log events. +package filters + +import ( + "sync" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/event" +) + +// FilterSystem manages filters that filter specific events such as +// block, transaction and log events. The Filtering system can be used to listen +// for specific LOG events fires by the EVM (Ethereum Virtual Machine). +type FilterSystem struct { + eventMux *event.TypeMux + + filterMu sync.RWMutex + filterId int + filters map[int]*Filter + + quit chan struct{} +} + +// NewFilterSystem returns a newly allocated filter manager +func NewFilterSystem(mux *event.TypeMux) *FilterSystem { + fs := &FilterSystem{ + eventMux: mux, + filters: make(map[int]*Filter), + } + go fs.filterLoop() + return fs +} + +// Stop quits the filter loop required for polling events +func (fs *FilterSystem) Stop() { + close(fs.quit) +} + +// Add adds a filter to the filter manager +func (fs *FilterSystem) Add(filter *Filter) (id int) { + fs.filterMu.Lock() + defer fs.filterMu.Unlock() + id = fs.filterId + fs.filters[id] = filter + fs.filterId++ + + return id +} + +// Remove removes a filter by filter id +func (fs *FilterSystem) Remove(id int) { + fs.filterMu.Lock() + defer fs.filterMu.Unlock() + if _, ok := fs.filters[id]; ok { + delete(fs.filters, id) + } +} + +// Get retrieves a filter installed using Add The filter may not be modified. +func (fs *FilterSystem) Get(id int) *Filter { + fs.filterMu.RLock() + defer fs.filterMu.RUnlock() + return fs.filters[id] +} + +// filterLoop waits for specific events from ethereum and fires their handlers +// when the filter matches the requirements. +func (fs *FilterSystem) filterLoop() { + // Subscribe to events + events := fs.eventMux.Subscribe( + //core.PendingBlockEvent{}, + core.ChainEvent{}, + core.TxPreEvent{}, + state.Logs(nil)) + +out: + for { + select { + case <-fs.quit: + break out + case event := <-events.Chan(): + switch event := event.(type) { + case core.ChainEvent: + fs.filterMu.RLock() + for _, filter := range fs.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block, event.Logs) + } + } + fs.filterMu.RUnlock() + + case core.TxPreEvent: + fs.filterMu.RLock() + for _, filter := range fs.filters { + if filter.TransactionCallback != nil { + filter.TransactionCallback(event.Tx) + } + } + fs.filterMu.RUnlock() + + case state.Logs: + fs.filterMu.RLock() + for _, filter := range fs.filters { + if filter.LogsCallback != nil { + msgs := filter.FilterLogs(event) + if len(msgs) > 0 { + filter.LogsCallback(msgs) + } + } + } + fs.filterMu.RUnlock() + } + } + } +} -- cgit v1.2.3