aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2016-02-13 20:53:59 +0800
committerPéter Szilágyi <peterke@gmail.com>2016-02-13 20:53:59 +0800
commitcb85923828d5ea45b53189b3b29ab014d9601f09 (patch)
tree6ced6a59dc3ac84db21ce8021ebc36cfe1f9d8fe
parent770b29fd80b8f25be31c2db5af6904cc5d0688ef (diff)
parent987c1a595a6a318ac83b68e3b87812497070bf9b (diff)
downloaddexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar
dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.gz
dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.bz2
dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.lz
dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.xz
dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.zst
dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.zip
Merge pull request #2205 from obscuren/pending-filters
eth/filters: ✨ pending logs ✨
-rw-r--r--core/blockchain.go2
-rw-r--r--core/blockchain_test.go4
-rw-r--r--core/events.go7
-rw-r--r--eth/filters/api.go29
-rw-r--r--eth/filters/filter.go3
-rw-r--r--eth/filters/filter_system.go101
-rw-r--r--eth/filters/filter_system_test.go26
-rw-r--r--miner/worker.go23
8 files changed, 143 insertions, 52 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 95ed06d8d..22dd617ad 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1358,7 +1358,7 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
go self.eventMux.Post(RemovedTransactionEvent{diff})
}
if len(deletedLogs) > 0 {
- go self.eventMux.Post(RemovedLogEvent{deletedLogs})
+ go self.eventMux.Post(RemovedLogsEvent{deletedLogs})
}
return nil
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index b4ac1696a..1bb5f646d 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -982,7 +982,7 @@ func TestLogReorgs(t *testing.T) {
evmux := &event.TypeMux{}
blockchain, _ := NewBlockChain(db, FakePow{}, evmux)
- subs := evmux.Subscribe(RemovedLogEvent{})
+ subs := evmux.Subscribe(RemovedLogsEvent{})
chain, _ := GenerateChain(genesis, db, 2, func(i int, gen *BlockGen) {
if i == 1 {
tx, err := types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code).SignECDSA(key1)
@@ -1002,7 +1002,7 @@ func TestLogReorgs(t *testing.T) {
}
ev := <-subs.Chan()
- if len(ev.Data.(RemovedLogEvent).Logs) == 0 {
+ if len(ev.Data.(RemovedLogsEvent).Logs) == 0 {
t.Error("expected logs")
}
}
diff --git a/core/events.go b/core/events.go
index 1a760c71c..c23206cad 100644
--- a/core/events.go
+++ b/core/events.go
@@ -30,6 +30,11 @@ type TxPreEvent struct{ Tx *types.Transaction }
// TxPostEvent is posted when a transaction has been processed.
type TxPostEvent struct{ Tx *types.Transaction }
+// PendingLogsEvent is posted pre mining and notifies of pending logs.
+type PendingLogsEvent struct {
+ Logs vm.Logs
+}
+
// NewBlockEvent is posted when a block has been imported.
type NewBlockEvent struct{ Block *types.Block }
@@ -40,7 +45,7 @@ type NewMinedBlockEvent struct{ Block *types.Block }
type RemovedTransactionEvent struct{ Txs types.Transactions }
// RemovedLogEvent is posted when a reorg happens
-type RemovedLogEvent struct{ Logs vm.Logs }
+type RemovedLogsEvent struct{ Logs vm.Logs }
// ChainSplit is posted when a new head is detected
type ChainSplitEvent struct {
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 148daa649..6cd184b80 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -142,7 +142,11 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
s.blockMu.Lock()
filter := New(s.chainDb)
- id := s.filterManager.Add(filter)
+ id, err := s.filterManager.Add(filter, ChainFilter)
+ if err != nil {
+ return "", err
+ }
+
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
@@ -174,7 +178,11 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
defer s.transactionMu.Unlock()
filter := New(s.chainDb)
- id := s.filterManager.Add(filter)
+ id, err := s.filterManager.Add(filter, PendingTxFilter)
+ if err != nil {
+ return "", err
+ }
+
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) {
@@ -194,12 +202,16 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
}
// newLogFilter creates a new log filter.
-func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) int {
+func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
s.logMu.Lock()
defer s.logMu.Unlock()
filter := New(s.chainDb)
- id := s.filterManager.Add(filter)
+ id, err := s.filterManager.Add(filter, LogFilter)
+ if err != nil {
+ return 0, err
+ }
+
s.logQueue[id] = &logQueue{timeout: time.Now()}
filter.SetBeginBlock(earliest)
@@ -215,7 +227,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
}
}
- return id
+ return id, nil
}
// NewFilterArgs represents a request to create a new filter.
@@ -352,9 +364,12 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
var id int
if len(args.Addresses) > 0 {
- id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
} else {
- id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
+ }
+ if err != nil {
+ return "", err
}
s.filterMapMu.Lock()
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 2c92d20b1..96af93c4a 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,6 +18,7 @@ package filters
import (
"math"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -32,6 +33,8 @@ type AccountChange struct {
// Filtering interface
type Filter struct {
+ created time.Time
+
db ethdb.Database
begin, end int64
addresses []common.Address
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 04e58a08c..b61a493b6 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -19,6 +19,7 @@
package filters
import (
+ "fmt"
"sync"
"time"
@@ -27,26 +28,47 @@ import (
"github.com/ethereum/go-ethereum/event"
)
+// FilterType determines the type of filter and is used to put the filter in to
+// the correct bucket when added.
+type FilterType byte
+
+const (
+ ChainFilter FilterType = iota // new block events filter
+ PendingTxFilter // pending transaction filter
+ LogFilter // new or removed log filter
+ PendingLogFilter // pending log filter
+)
+
// FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
filterMu sync.RWMutex
filterId int
- filters map[int]*Filter
- created map[int]time.Time
- sub event.Subscription
+
+ chainFilters map[int]*Filter
+ pendingTxFilters map[int]*Filter
+ logFilters map[int]*Filter
+ pendingLogFilters map[int]*Filter
+
+ // generic is an ugly hack for Get
+ generic map[int]*Filter
+
+ sub event.Subscription
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
- filters: make(map[int]*Filter),
- created: make(map[int]time.Time),
+ chainFilters: make(map[int]*Filter),
+ pendingTxFilters: make(map[int]*Filter),
+ logFilters: make(map[int]*Filter),
+ pendingLogFilters: make(map[int]*Filter),
+ generic: make(map[int]*Filter),
}
fs.sub = mux.Subscribe(
- //core.PendingBlockEvent{},
- core.RemovedLogEvent{},
+ core.PendingLogsEvent{},
+ core.RemovedLogsEvent{},
core.ChainEvent{},
core.TxPreEvent{},
vm.Logs(nil),
@@ -61,15 +83,30 @@ func (fs *FilterSystem) Stop() {
}
// Add adds a filter to the filter manager
-func (fs *FilterSystem) Add(filter *Filter) (id int) {
+func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- id = fs.filterId
- fs.filters[id] = filter
- fs.created[id] = time.Now()
+
+ id := fs.filterId
+ filter.created = time.Now()
+
+ switch filterType {
+ case ChainFilter:
+ fs.chainFilters[id] = filter
+ case PendingTxFilter:
+ fs.pendingTxFilters[id] = filter
+ case LogFilter:
+ fs.logFilters[id] = filter
+ case PendingLogFilter:
+ fs.pendingLogFilters[id] = filter
+ default:
+ return 0, fmt.Errorf("unknown filter type %v", filterType)
+ }
+ fs.generic[id] = filter
+
fs.filterId++
- return id
+ return id, nil
}
// Remove removes a filter by filter id
@@ -77,16 +114,18 @@ func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
- delete(fs.filters, id)
- delete(fs.created, id)
+ delete(fs.chainFilters, id)
+ delete(fs.pendingTxFilters, id)
+ delete(fs.logFilters, id)
+ delete(fs.pendingLogFilters, id)
+ delete(fs.generic, id)
}
-// Get retrieves a filter installed using Add The filter may not be modified.
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()
- return fs.filters[id]
+ return fs.generic[id]
}
// filterLoop waits for specific events from ethereum and fires their handlers
@@ -96,17 +135,16 @@ func (fs *FilterSystem) filterLoop() {
switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.BlockCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.chainFilters {
+ if filter.BlockCallback != nil && !filter.created.After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs)
}
}
fs.filterMu.RUnlock()
-
case core.TxPreEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.pendingTxFilters {
+ if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
filter.TransactionCallback(ev.Tx)
}
}
@@ -114,25 +152,34 @@ func (fs *FilterSystem) filterLoop() {
case vm.Logs:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, log := range filter.FilterLogs(ev) {
filter.LogCallback(log, false)
}
}
}
fs.filterMu.RUnlock()
-
- case core.RemovedLogEvent:
+ case core.RemovedLogsEvent:
fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, filter := range fs.logFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, removedLog := range ev.Logs {
filter.LogCallback(removedLog, true)
}
}
}
fs.filterMu.RUnlock()
+ case core.PendingLogsEvent:
+ fs.filterMu.RLock()
+ for _, filter := range fs.pendingLogFilters {
+ if filter.LogCallback != nil && !filter.created.After(event.Time) {
+ for _, pendingLog := range ev.Logs {
+ filter.LogCallback(pendingLog, false)
+ }
+ }
+ }
+ fs.filterMu.RUnlock()
}
}
}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 7ddeb02bc..3ad7dd9cb 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -18,6 +18,7 @@ func TestCallbacks(t *testing.T) {
txDone = make(chan struct{})
logDone = make(chan struct{})
removedLogDone = make(chan struct{})
+ pendingLogDone = make(chan struct{})
)
blockFilter := &Filter{
@@ -37,7 +38,6 @@ func TestCallbacks(t *testing.T) {
}
},
}
-
removedLogFilter := &Filter{
LogCallback: func(l *vm.Log, oob bool) {
if oob {
@@ -45,16 +45,23 @@ func TestCallbacks(t *testing.T) {
}
},
}
+ pendingLogFilter := &Filter{
+ LogCallback: func(*vm.Log, bool) {
+ close(pendingLogDone)
+ },
+ }
- fs.Add(blockFilter)
- fs.Add(txFilter)
- fs.Add(logFilter)
- fs.Add(removedLogFilter)
+ fs.Add(blockFilter, ChainFilter)
+ fs.Add(txFilter, PendingTxFilter)
+ fs.Add(logFilter, LogFilter)
+ fs.Add(removedLogFilter, LogFilter)
+ fs.Add(pendingLogFilter, PendingLogFilter)
mux.Post(core.ChainEvent{})
mux.Post(core.TxPreEvent{})
- mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}})
mux.Post(vm.Logs{&vm.Log{}})
+ mux.Post(core.RemovedLogsEvent{vm.Logs{&vm.Log{}}})
+ mux.Post(core.PendingLogsEvent{vm.Logs{&vm.Log{}}})
const dura = 5 * time.Second
failTimer := time.NewTimer(dura)
@@ -84,4 +91,11 @@ func TestCallbacks(t *testing.T) {
case <-failTimer.C:
t.Error("removed log filter failed to trigger (timeout)")
}
+
+ failTimer.Reset(dura)
+ select {
+ case <-pendingLogDone:
+ case <-failTimer.C:
+ t.Error("pending log filter failed to trigger (timout)")
+ }
}
diff --git a/miner/worker.go b/miner/worker.go
index 9c29d2250..81f7b16ac 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -243,7 +243,7 @@ func (self *worker) update() {
// Apply transaction to the pending state if we're not mining
if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock()
- self.current.commitTransactions(types.Transactions{ev.Tx}, self.gasPrice, self.chain)
+ self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
self.currentMu.Unlock()
}
}
@@ -529,7 +529,7 @@ func (self *worker) commitNewWork() {
transactions := append(singleTxOwner, multiTxOwner...)
*/
- work.commitTransactions(transactions, self.gasPrice, self.chain)
+ work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain)
self.eth.TxPool().RemoveTransactions(work.lowGasTxs)
// compute uncles for the new block.
@@ -588,8 +588,10 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
return nil
}
-func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) {
+func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) {
gp := new(core.GasPool).AddGas(env.header.GasLimit)
+
+ var coalescedLogs vm.Logs
for _, tx := range transactions {
// We can skip err. It has already been validated in the tx pool
from, _ := tx.From()
@@ -627,7 +629,7 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b
env.state.StartRecord(tx.Hash(), common.Hash{}, 0)
- err := env.commitTransaction(tx, bc, gp)
+ err, logs := env.commitTransaction(tx, bc, gp)
switch {
case core.IsGasLimitErr(err):
// ignore the transactor so no nonce errors will be thrown for this account
@@ -643,20 +645,25 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b
}
default:
env.tcount++
+ coalescedLogs = append(coalescedLogs, logs...)
}
}
+ if len(coalescedLogs) > 0 {
+ go mux.Post(core.PendingLogsEvent{Logs: coalescedLogs})
+ }
}
-func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) error {
+func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) (error, vm.Logs) {
snap := env.state.Copy()
- receipt, _, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed)
+ receipt, logs, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed)
if err != nil {
env.state.Set(snap)
- return err
+ return err, nil
}
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
- return nil
+
+ return nil, logs
}
// TODO: remove or use