aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeffrey Wilcke <geffobscura@gmail.com>2016-02-13 08:40:44 +0800
committerJeffrey Wilcke <geffobscura@gmail.com>2016-02-13 20:14:02 +0800
commit987c1a595a6a318ac83b68e3b87812497070bf9b (patch)
tree30859d085d7533c7ad610eed8a186f88724eadfe
parentb05e472c076d30035233d6a8b5fb3360b236e3ff (diff)
downloaddexon-987c1a595a6a318ac83b68e3b87812497070bf9b.tar
dexon-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.gz
dexon-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.bz2
dexon-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.lz
dexon-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.xz
dexon-987c1a595a6a318ac83b68e3b87812497070bf9b.tar.zst
dexon-987c1a595a6a318ac83b68e3b87812497070bf9b.zip
eth/filters: ✨ pending logs ✨
Pending logs are now filterable through the Go API. Filter API changed such that each filter type has it's own bucket and adding filter explicitly requires you specify the bucket to put it in.
-rw-r--r--core/blockchain.go2
-rw-r--r--core/blockchain_test.go4
-rw-r--r--core/events.go7
-rw-r--r--eth/filters/api.go29
-rw-r--r--eth/filters/filter.go3
-rw-r--r--eth/filters/filter_system.go101
-rw-r--r--eth/filters/filter_system_test.go26
-rw-r--r--miner/worker.go23
8 files changed, 143 insertions, 52 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 95ed06d8d..22dd617ad 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1358,7 +1358,7 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
go self.eventMux.Post(RemovedTransactionEvent{diff})
}
if len(deletedLogs) > 0 {
- go self.eventMux.Post(RemovedLogEvent{deletedLogs})
+ go self.eventMux.Post(RemovedLogsEvent{deletedLogs})
}
return nil
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index b4ac1696a..1bb5f646d 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -982,7 +982,7 @@ func TestLogReorgs(t *testing.T) {
evmux := &event.TypeMux{}
blockchain, _ := NewBlockChain(db, FakePow{}, evmux)
- subs := evmux.Subscribe(RemovedLogEvent{})
+ subs := evmux.Subscribe(RemovedLogsEvent{})
chain, _ := GenerateChain(genesis, db, 2, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code).SignECDSA(key1)
@@ -1002,7 +1002,7 @@ func TestLogReorgs(t *testing.T) {
}
ev := <-subs.Chan()
- if len(ev.Data.(RemovedLogEvent).Logs) == 0 {
+ if len(ev.Data.(RemovedLogsEvent).Logs) == 0 {
t.Error("expected logs")
}
}
diff --git a/core/events.go b/core/events.go
index 1a760c71c..c23206cad 100644
--- a/core/events.go
+++ b/core/events.go
@@ -30,6 +30,11 @@ type TxPreEvent struct{ Tx *types.Transaction }
// TxPostEvent is posted when a transaction has been processed.
type TxPostEvent struct{ Tx *types.Transaction }
+// PendingLogsEvent is posted pre mining and notifies of pending logs.
+type PendingLogsEvent struct {
+ Logs vm.Logs
+}
+
// NewBlockEvent is posted when a block has been imported.
type NewBlockEvent struct{ Block *types.Block }
@@ -40,7 +45,7 @@ type NewMinedBlockEvent struct{ Block *types.Block }
type RemovedTransactionEvent struct{ Txs types.Transactions }
// RemovedLogEvent is posted when a reorg happens
-type RemovedLogEvent struct{ Logs vm.Logs }
+type RemovedLogsEvent struct{ Logs vm.Logs }
// ChainSplit is posted when a new head is detected
type ChainSplitEvent struct {
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 148daa649..6cd184b80 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -142,7 +142,11 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
s.blockMu.Lock()
filter := New(s.chainDb)
- id := s.filterManager.Add(filter)
+ id, err := s.filterManager.Add(filter, ChainFilter)
+ if err != nil {
+ return "", err
+ }
+
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
@@ -174,7 +178,11 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
defer s.transactionMu.Unlock()
filter := New(s.chainDb)
- id := s.filterManager.Add(filter)
+ id, err := s.filterManager.Add(filter, PendingTxFilter)
+ if err != nil {
+ return "", err
+ }
+
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) {
@@ -194,12 +202,16 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
}
// newLogFilter creates a new log filter.
-func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) int {
+func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
s.logMu.Lock()
defer s.logMu.Unlock()
filter := New(s.chainDb)
- id := s.filterManager.Add(filter)
+ id, err := s.filterManager.Add(filter, LogFilter)
+ if err != nil {
+ return 0, err
+ }
+
s.logQueue[id] = &logQueue{timeout: time.Now()}
filter.SetBeginBlock(earliest)
@@ -215,7 +227,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
}
}
- return id
+ return id, nil
}
// NewFilterArgs represents a request to create a new filter.
@@ -352,9 +364,12 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
var id int
if len(args.Addresses) > 0 {
- id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
} else {
- id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
+ }
+ if err != nil {
+ return "", err
}
s.filterMapMu.Lock()
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 2c92d20b1..96af93c4a 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,6 +18,7 @@ package filters
import (
"math"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -32,6 +33,8 @@ type AccountChange struct {
// Filtering interface
type Filter struct {
+ created time.Time
+
db ethdb.Database
begin, end int64
addresses []common.Address
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 04e58a08c..b61a493b6 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -19,6 +19,7 @@
package filters
import (
+ "fmt"
"sync"
"time"
@@ -27,26 +28,47 @@ import (
"github.com/ethereum/go-ethereum/event"
)
+// FilterType determines the type of filter and is used to put the filter in to
+// the correct bucket when added.
+type FilterType byte
+
+const (
+ ChainFilter FilterType = iota // new block events filter
+ PendingTxFilter // pending transaction filter
+ LogFilter // new or removed log filter
+ PendingLogFilter // pending log filter
+)
+
// FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
filterMu sync.RWMutex
filterId int
- filters map[int]*Filter
- created map[int]time.Time
- sub event.Subscription
+
+ chainFilters map[int]*Filter
+ pendingTxFilters map[int]*Filter
+ logFilters map[int]*Filter
+ pendingLogFilters map[int]*Filter
+
+ // generic is an ugly hack for Get
+ generic map[int]*Filter
+
+ sub event.Subscription
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
- filters: make(map[int]*Filter),
- created: make(map[int]time.Time),
+ chainFilters: make(map[int]*Filter),
+ pendingTxFilters: make(map[int]*Filter),
+ logFilters: make(map[int]*Filter),
+ pendingLogFilters: make(map[int]*Filter),
+ generic: make(map[int]*Filter),
}
fs.sub = mux.Subscribe(
- //core.PendingBlockEvent{},
- core.RemovedLogEvent{},
+ core.PendingLogsEvent{},
+ core.RemovedLogsEvent{},
core.ChainEvent{},
core.TxPreEvent{},
vm.Logs(nil),
@@ -61,15 +83,30 @@ func (fs *FilterSystem) Stop() {
}
// Add adds a filter to the filter manager
-func (fs *FilterSystem) Add(filter *Filter) (id int) {
+func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- id = fs.filterId
- fs.filters[id] = filter
- fs.created[id] = time.Now()
+
+ id := fs.filterId
+ filter.created = time.Now()
+
+ switch filterType {
+ case ChainFilter:
+ fs.chainFilters[id] = filter
+ case PendingTxFilter:
+ fs.pendingTxFilters[id] = filter
+ case LogFilter:
+ fs.logFilters[id] = filter
+ case PendingLogFilter:
+ fs.pendingLogFilters[id] = filter
+ default:
+ return 0, fmt.Errorf("unknown filter type %v", filterType)
+ }
+ fs.generic[id] = filter
+
fs.filterId++
- return id
+ return id, nil
}
// Remove removes a filter by filter id
@@ -77,16 +114,18 @@ func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- delete(fs.filters, id)
- delete(fs.created, id)
+ delete(fs.chainFilters, id)
+ delete(fs.pendingTxFilters, id)
+ delete(fs.logFilters, id)
+ delete(fs.pendingLogFilters, id)
+ delete(fs.generic, id)
}
-// Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()
- return fs.filters[id]
+ return fs.generic[id]
}
// filterLoop waits for specific events from ethereum and fires their handlers
@@ -96,17 +135,16 @@ func (fs *FilterSystem) filterLoop() {
switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.BlockCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.chainFilters {
+ if filter.BlockCallback != nil && !filter.created.After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs)
}
}
fs.filterMu.RUnlock()
-
case core.TxPreEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.pendingTxFilters {
+ if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
filter.TransactionCallback(ev.Tx)
}
}
@@ -114,25 +152,34 @@ func (fs *FilterSystem) filterLoop() {
case vm.Logs:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, log := range filter.FilterLogs(ev) {
filter.LogCallback(log, false)
}
}
}
fs.filterMu.RUnlock()
-
- case core.RemovedLogEvent:
+ case core.RemovedLogsEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, removedLog := range ev.Logs {
filter.LogCallback(removedLog, true)
}
}
}
fs.filterMu.RUnlock()
+ case core.PendingLogsEvent:
+ fs.filterMu.RLock()
+ for _, filter := range fs.pendingLogFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
+ for _, pendingLog := range ev.Logs {
+ filter.LogCallback(pendingLog, false)
+ }
+ }
+ }
+ fs.filterMu.RUnlock()
}
}
}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 7ddeb02bc..3ad7dd9cb 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -18,6 +18,7 @@ func TestCallbacks(t *testing.T) {
txDone = make(chan struct{})
logDone = make(chan struct{})
removedLogDone = make(chan struct{})
+ pendingLogDone = make(chan struct{})
)
blockFilter := &Filter{
@@ -37,7 +38,6 @@ func TestCallbacks(t *testing.T) {
}
},
}
-
removedLogFilter := &Filter{
LogCallback: func(l *vm.Log, oob bool) {
if oob {
@@ -45,16 +45,23 @@ func TestCallbacks(t *testing.T) {
}
},
}
+ pendingLogFilter := &Filter{
+ LogCallback: func(*vm.Log, bool) {
+ close(pendingLogDone)
+ },
+ }
- fs.Add(blockFilter)
- fs.Add(txFilter)
- fs.Add(logFilter)
- fs.Add(removedLogFilter)
+ fs.Add(blockFilter, ChainFilter)
+ fs.Add(txFilter, PendingTxFilter)
+ fs.Add(logFilter, LogFilter)
+ fs.Add(removedLogFilter, LogFilter)
+ fs.Add(pendingLogFilter, PendingLogFilter)
mux.Post(core.ChainEvent{})
mux.Post(core.TxPreEvent{})
- mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}})
mux.Post(vm.Logs{&vm.Log{}})
+ mux.Post(core.RemovedLogsEvent{vm.Logs{&vm.Log{}}})
+ mux.Post(core.PendingLogsEvent{vm.Logs{&vm.Log{}}})
const dura = 5 * time.Second
failTimer := time.NewTimer(dura)
@@ -84,4 +91,11 @@ func TestCallbacks(t *testing.T) {
case <-failTimer.C:
t.Error("removed log filter failed to trigger (timeout)")
}
+
+ failTimer.Reset(dura)
+ select {
+ case <-pendingLogDone:
+ case <-failTimer.C:
+ t.Error("pending log filter failed to trigger (timout)")
+ }
}
diff --git a/miner/worker.go b/miner/worker.go
index 9c29d2250..81f7b16ac 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -243,7 +243,7 @@ func (self *worker) update() {
// Apply transaction to the pending state if we're not mining
if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock()
- self.current.commitTransactions(types.Transactions{ev.Tx}, self.gasPrice, self.chain)
+ self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
self.currentMu.Unlock()
}
}
@@ -529,7 +529,7 @@ func (self *worker) commitNewWork() {
transactions := append(singleTxOwner, multiTxOwner...)
*/
- work.commitTransactions(transactions, self.gasPrice, self.chain)
+ work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain)
self.eth.TxPool().RemoveTransactions(work.lowGasTxs)
// compute uncles for the new block.
@@ -588,8 +588,10 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
return nil
}
-func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) {
+func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) {
gp := new(core.GasPool).AddGas(env.header.GasLimit)
+
+ var coalescedLogs vm.Logs
for _, tx := range transactions {
// We can skip err. It has already been validated in the tx pool
from, _ := tx.From()
@@ -627,7 +629,7 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b
env.state.StartRecord(tx.Hash(), common.Hash{}, 0)
- err := env.commitTransaction(tx, bc, gp)
+ err, logs := env.commitTransaction(tx, bc, gp)
switch {
case core.IsGasLimitErr(err):
// ignore the transactor so no nonce errors will be thrown for this account
@@ -643,20 +645,25 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b
}
default:
env.tcount++
+ coalescedLogs = append(coalescedLogs, logs...)
}
}
+ if len(coalescedLogs) > 0 {
+ go mux.Post(core.PendingLogsEvent{Logs: coalescedLogs})
+ }
}
-func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) error {
+func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) (error, vm.Logs) {
snap := env.state.Copy()
- receipt, _, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed)
+ receipt, logs, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed)
if err != nil {
env.state.Set(snap)
- return err
+ return err, nil
}
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
- return nil
+
+ return nil, logs
}
// TODO: remove or use