diff options
author | Péter Szilágyi <peterke@gmail.com> | 2016-02-13 20:53:59 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2016-02-13 20:53:59 +0800 |
commit | cb85923828d5ea45b53189b3b29ab014d9601f09 (patch) | |
tree | 6ced6a59dc3ac84db21ce8021ebc36cfe1f9d8fe | |
parent | 770b29fd80b8f25be31c2db5af6904cc5d0688ef (diff) | |
parent | 987c1a595a6a318ac83b68e3b87812497070bf9b (diff) | |
download | dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.gz dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.bz2 dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.lz dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.xz dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.tar.zst dexon-cb85923828d5ea45b53189b3b29ab014d9601f09.zip |
Merge pull request #2205 from obscuren/pending-filters
eth/filters: ✨ pending logs ✨
-rw-r--r-- | core/blockchain.go | 2 | ||||
-rw-r--r-- | core/blockchain_test.go | 4 | ||||
-rw-r--r-- | core/events.go | 7 | ||||
-rw-r--r-- | eth/filters/api.go | 29 | ||||
-rw-r--r-- | eth/filters/filter.go | 3 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 101 | ||||
-rw-r--r-- | eth/filters/filter_system_test.go | 26 | ||||
-rw-r--r-- | miner/worker.go | 23 |
8 files changed, 143 insertions, 52 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 95ed06d8d..22dd617ad 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1358,7 +1358,7 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error { go self.eventMux.Post(RemovedTransactionEvent{diff}) } if len(deletedLogs) > 0 { - go self.eventMux.Post(RemovedLogEvent{deletedLogs}) + go self.eventMux.Post(RemovedLogsEvent{deletedLogs}) } return nil diff --git a/core/blockchain_test.go b/core/blockchain_test.go index b4ac1696a..1bb5f646d 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -982,7 +982,7 @@ func TestLogReorgs(t *testing.T) { evmux := &event.TypeMux{} blockchain, _ := NewBlockChain(db, FakePow{}, evmux) - subs := evmux.Subscribe(RemovedLogEvent{}) + subs := evmux.Subscribe(RemovedLogsEvent{}) chain, _ := GenerateChain(genesis, db, 2, func(i int, gen *BlockGen) { if i == 1 { tx, err := types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code).SignECDSA(key1) @@ -1002,7 +1002,7 @@ func TestLogReorgs(t *testing.T) { } ev := <-subs.Chan() - if len(ev.Data.(RemovedLogEvent).Logs) == 0 { + if len(ev.Data.(RemovedLogsEvent).Logs) == 0 { t.Error("expected logs") } } diff --git a/core/events.go b/core/events.go index 1a760c71c..c23206cad 100644 --- a/core/events.go +++ b/core/events.go @@ -30,6 +30,11 @@ type TxPreEvent struct{ Tx *types.Transaction } // TxPostEvent is posted when a transaction has been processed. type TxPostEvent struct{ Tx *types.Transaction } +// PendingLogsEvent is posted pre mining and notifies of pending logs. +type PendingLogsEvent struct { + Logs vm.Logs +} + // NewBlockEvent is posted when a block has been imported. type NewBlockEvent struct{ Block *types.Block } @@ -40,7 +45,7 @@ type NewMinedBlockEvent struct{ Block *types.Block } type RemovedTransactionEvent struct{ Txs types.Transactions } // RemovedLogEvent is posted when a reorg happens -type RemovedLogEvent struct{ Logs vm.Logs } +type RemovedLogsEvent struct{ Logs vm.Logs } // ChainSplit is posted when a new head is detected type ChainSplitEvent struct { diff --git a/eth/filters/api.go b/eth/filters/api.go index 148daa649..6cd184b80 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -142,7 +142,11 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) { s.blockMu.Lock() filter := New(s.chainDb) - id := s.filterManager.Add(filter) + id, err := s.filterManager.Add(filter, ChainFilter) + if err != nil { + return "", err + } + s.blockQueue[id] = &hashQueue{timeout: time.Now()} filter.BlockCallback = func(block *types.Block, logs vm.Logs) { @@ -174,7 +178,11 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { defer s.transactionMu.Unlock() filter := New(s.chainDb) - id := s.filterManager.Add(filter) + id, err := s.filterManager.Add(filter, PendingTxFilter) + if err != nil { + return "", err + } + s.transactionQueue[id] = &hashQueue{timeout: time.Now()} filter.TransactionCallback = func(tx *types.Transaction) { @@ -194,12 +202,16 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { } // newLogFilter creates a new log filter. -func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) int { +func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) { s.logMu.Lock() defer s.logMu.Unlock() filter := New(s.chainDb) - id := s.filterManager.Add(filter) + id, err := s.filterManager.Add(filter, LogFilter) + if err != nil { + return 0, err + } + s.logQueue[id] = &logQueue{timeout: time.Now()} filter.SetBeginBlock(earliest) @@ -215,7 +227,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo } } - return id + return id, nil } // NewFilterArgs represents a request to create a new filter. @@ -352,9 +364,12 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) { var id int if len(args.Addresses) > 0 { - id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics) } else { - id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics) + } + if err != nil { + return "", err } s.filterMapMu.Lock() diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 2c92d20b1..96af93c4a 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -18,6 +18,7 @@ package filters import ( "math" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -32,6 +33,8 @@ type AccountChange struct { // Filtering interface type Filter struct { + created time.Time + db ethdb.Database begin, end int64 addresses []common.Address diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 04e58a08c..b61a493b6 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -19,6 +19,7 @@ package filters import ( + "fmt" "sync" "time" @@ -27,26 +28,47 @@ import ( "github.com/ethereum/go-ethereum/event" ) +// FilterType determines the type of filter and is used to put the filter in to +// the correct bucket when added. +type FilterType byte + +const ( + ChainFilter FilterType = iota // new block events filter + PendingTxFilter // pending transaction filter + LogFilter // new or removed log filter + PendingLogFilter // pending log filter +) + // FilterSystem manages filters that filter specific events such as // block, transaction and log events. The Filtering system can be used to listen // for specific LOG events fired by the EVM (Ethereum Virtual Machine). type FilterSystem struct { filterMu sync.RWMutex filterId int - filters map[int]*Filter - created map[int]time.Time - sub event.Subscription + + chainFilters map[int]*Filter + pendingTxFilters map[int]*Filter + logFilters map[int]*Filter + pendingLogFilters map[int]*Filter + + // generic is an ugly hack for Get + generic map[int]*Filter + + sub event.Subscription } // NewFilterSystem returns a newly allocated filter manager func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ - filters: make(map[int]*Filter), - created: make(map[int]time.Time), + chainFilters: make(map[int]*Filter), + pendingTxFilters: make(map[int]*Filter), + logFilters: make(map[int]*Filter), + pendingLogFilters: make(map[int]*Filter), + generic: make(map[int]*Filter), } fs.sub = mux.Subscribe( - //core.PendingBlockEvent{}, - core.RemovedLogEvent{}, + core.PendingLogsEvent{}, + core.RemovedLogsEvent{}, core.ChainEvent{}, core.TxPreEvent{}, vm.Logs(nil), @@ -61,15 +83,30 @@ func (fs *FilterSystem) Stop() { } // Add adds a filter to the filter manager -func (fs *FilterSystem) Add(filter *Filter) (id int) { +func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - id = fs.filterId - fs.filters[id] = filter - fs.created[id] = time.Now() + + id := fs.filterId + filter.created = time.Now() + + switch filterType { + case ChainFilter: + fs.chainFilters[id] = filter + case PendingTxFilter: + fs.pendingTxFilters[id] = filter + case LogFilter: + fs.logFilters[id] = filter + case PendingLogFilter: + fs.pendingLogFilters[id] = filter + default: + return 0, fmt.Errorf("unknown filter type %v", filterType) + } + fs.generic[id] = filter + fs.filterId++ - return id + return id, nil } // Remove removes a filter by filter id @@ -77,16 +114,18 @@ func (fs *FilterSystem) Remove(id int) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - delete(fs.filters, id) - delete(fs.created, id) + delete(fs.chainFilters, id) + delete(fs.pendingTxFilters, id) + delete(fs.logFilters, id) + delete(fs.pendingLogFilters, id) + delete(fs.generic, id) } -// Get retrieves a filter installed using Add The filter may not be modified. func (fs *FilterSystem) Get(id int) *Filter { fs.filterMu.RLock() defer fs.filterMu.RUnlock() - return fs.filters[id] + return fs.generic[id] } // filterLoop waits for specific events from ethereum and fires their handlers @@ -96,17 +135,16 @@ func (fs *FilterSystem) filterLoop() { switch ev := event.Data.(type) { case core.ChainEvent: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.BlockCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.chainFilters { + if filter.BlockCallback != nil && !filter.created.After(event.Time) { filter.BlockCallback(ev.Block, ev.Logs) } } fs.filterMu.RUnlock() - case core.TxPreEvent: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.pendingTxFilters { + if filter.TransactionCallback != nil && !filter.created.After(event.Time) { filter.TransactionCallback(ev.Tx) } } @@ -114,25 +152,34 @@ func (fs *FilterSystem) filterLoop() { case vm.Logs: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.logFilters { + if filter.LogCallback != nil && !filter.created.After(event.Time) { for _, log := range filter.FilterLogs(ev) { filter.LogCallback(log, false) } } } fs.filterMu.RUnlock() - - case core.RemovedLogEvent: + case core.RemovedLogsEvent: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.logFilters { + if filter.LogCallback != nil && !filter.created.After(event.Time) { for _, removedLog := range ev.Logs { filter.LogCallback(removedLog, true) } } } fs.filterMu.RUnlock() + case core.PendingLogsEvent: + fs.filterMu.RLock() + for _, filter := range fs.pendingLogFilters { + if filter.LogCallback != nil && !filter.created.After(event.Time) { + for _, pendingLog := range ev.Logs { + filter.LogCallback(pendingLog, false) + } + } + } + fs.filterMu.RUnlock() } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 7ddeb02bc..3ad7dd9cb 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -18,6 +18,7 @@ func TestCallbacks(t *testing.T) { txDone = make(chan struct{}) logDone = make(chan struct{}) removedLogDone = make(chan struct{}) + pendingLogDone = make(chan struct{}) ) blockFilter := &Filter{ @@ -37,7 +38,6 @@ func TestCallbacks(t *testing.T) { } }, } - removedLogFilter := &Filter{ LogCallback: func(l *vm.Log, oob bool) { if oob { @@ -45,16 +45,23 @@ func TestCallbacks(t *testing.T) { } }, } + pendingLogFilter := &Filter{ + LogCallback: func(*vm.Log, bool) { + close(pendingLogDone) + }, + } - fs.Add(blockFilter) - fs.Add(txFilter) - fs.Add(logFilter) - fs.Add(removedLogFilter) + fs.Add(blockFilter, ChainFilter) + fs.Add(txFilter, PendingTxFilter) + fs.Add(logFilter, LogFilter) + fs.Add(removedLogFilter, LogFilter) + fs.Add(pendingLogFilter, PendingLogFilter) mux.Post(core.ChainEvent{}) mux.Post(core.TxPreEvent{}) - mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}}) mux.Post(vm.Logs{&vm.Log{}}) + mux.Post(core.RemovedLogsEvent{vm.Logs{&vm.Log{}}}) + mux.Post(core.PendingLogsEvent{vm.Logs{&vm.Log{}}}) const dura = 5 * time.Second failTimer := time.NewTimer(dura) @@ -84,4 +91,11 @@ func TestCallbacks(t *testing.T) { case <-failTimer.C: t.Error("removed log filter failed to trigger (timeout)") } + + failTimer.Reset(dura) + select { + case <-pendingLogDone: + case <-failTimer.C: + t.Error("pending log filter failed to trigger (timout)") + } } diff --git a/miner/worker.go b/miner/worker.go index 9c29d2250..81f7b16ac 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -243,7 +243,7 @@ func (self *worker) update() { // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - self.current.commitTransactions(types.Transactions{ev.Tx}, self.gasPrice, self.chain) + self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain) self.currentMu.Unlock() } } @@ -529,7 +529,7 @@ func (self *worker) commitNewWork() { transactions := append(singleTxOwner, multiTxOwner...) */ - work.commitTransactions(transactions, self.gasPrice, self.chain) + work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain) self.eth.TxPool().RemoveTransactions(work.lowGasTxs) // compute uncles for the new block. @@ -588,8 +588,10 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error { return nil } -func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) { +func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) { gp := new(core.GasPool).AddGas(env.header.GasLimit) + + var coalescedLogs vm.Logs for _, tx := range transactions { // We can skip err. It has already been validated in the tx pool from, _ := tx.From() @@ -627,7 +629,7 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b env.state.StartRecord(tx.Hash(), common.Hash{}, 0) - err := env.commitTransaction(tx, bc, gp) + err, logs := env.commitTransaction(tx, bc, gp) switch { case core.IsGasLimitErr(err): // ignore the transactor so no nonce errors will be thrown for this account @@ -643,20 +645,25 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b } default: env.tcount++ + coalescedLogs = append(coalescedLogs, logs...) } } + if len(coalescedLogs) > 0 { + go mux.Post(core.PendingLogsEvent{Logs: coalescedLogs}) + } } -func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) error { +func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) (error, vm.Logs) { snap := env.state.Copy() - receipt, _, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed) + receipt, logs, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed) if err != nil { env.state.Set(snap) - return err + return err, nil } env.txs = append(env.txs, tx) env.receipts = append(env.receipts, receipt) - return nil + + return nil, logs } // TODO: remove or use |