aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter_system.go
diff options
context:
space:
mode:
authorBas van Kervel <bas@ethdev.com>2016-07-27 23:47:46 +0800
committerBas van Kervel <basvankervel@gmail.com>2016-08-17 18:59:58 +0800
commit47ff8130124b479f1f051312eed50c33f0a38e6f (patch)
treecb29e4550f63f3a763dd04b267261e354e56d7eb /eth/filters/filter_system.go
parent3b39d4d1c15df2697284c3d7a61564f98ab45c70 (diff)
downloadgo-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.tar
go-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.gz
go-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.bz2
go-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.lz
go-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.xz
go-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.zst
go-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.zip
rpc: refactor subscriptions and filters
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r--eth/filters/filter_system.go370
1 files changed, 248 insertions, 122 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 256464213..04a55fd09 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -14,179 +14,305 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-// package filters implements an ethereum filtering system for block,
+// Package filters implements an ethereum filtering system for block,
// transactions and log events.
package filters
import (
+ "encoding/json"
+ "errors"
"fmt"
"sync"
"time"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/rpc"
)
-// FilterType determines the type of filter and is used to put the filter in to
+// Type determines the kind of filter and is used to put the filter in to
// the correct bucket when added.
-type FilterType byte
+type Type byte
const (
- ChainFilter FilterType = iota // new block events filter
- PendingTxFilter // pending transaction filter
- LogFilter // new or removed log filter
- PendingLogFilter // pending log filter
+ // UnknownSubscription indicates an unkown subscription type
+ UnknownSubscription Type = iota
+ // LogsSubscription queries for new or removed (chain reorg) logs
+ LogsSubscription
+ // PendingLogsSubscription queries for logs for the pending block
+ PendingLogsSubscription
+ // PendingTransactionsSubscription queries tx hashes for pending
+ // transactions entering the pending state
+ PendingTransactionsSubscription
+ // BlocksSubscription queries hashes for blocks that are imported
+ BlocksSubscription
)
-// FilterSystem manages filters that filter specific events such as
-// block, transaction and log events. The Filtering system can be used to listen
-// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
-type FilterSystem struct {
- filterMu sync.RWMutex
- filterId int
+var (
+ ErrInvalidSubscriptionID = errors.New("invalid id")
+)
+
+// Log is a helper that can hold additional information about vm.Log
+// necessary for the RPC interface.
+type Log struct {
+ *vm.Log
+ Removed bool `json:"removed"`
+}
- chainFilters map[int]*Filter
- pendingTxFilters map[int]*Filter
- logFilters map[int]*Filter
- pendingLogFilters map[int]*Filter
+func (l *Log) MarshalJSON() ([]byte, error) {
+ fields := map[string]interface{}{
+ "address": l.Address,
+ "data": fmt.Sprintf("0x%x", l.Data),
+ "blockNumber": fmt.Sprintf("%#x", l.BlockNumber),
+ "logIndex": fmt.Sprintf("%#x", l.Index),
+ "blockHash": l.BlockHash,
+ "transactionHash": l.TxHash,
+ "transactionIndex": fmt.Sprintf("%#x", l.TxIndex),
+ "topics": l.Topics,
+ "removed": l.Removed,
+ }
- // generic is an ugly hack for Get
- generic map[int]*Filter
+ return json.Marshal(fields)
+}
- sub event.Subscription
+type subscription struct {
+ id rpc.ID
+ typ Type
+ created time.Time
+ logsCrit FilterCriteria
+ logs chan []Log
+ hashes chan common.Hash
+ headers chan *types.Header
+ installed chan struct{} // closed when the filter is installed
+ err chan error // closed when the filter is uninstalled
}
-// NewFilterSystem returns a newly allocated filter manager
-func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
- fs := &FilterSystem{
- chainFilters: make(map[int]*Filter),
- pendingTxFilters: make(map[int]*Filter),
- logFilters: make(map[int]*Filter),
- pendingLogFilters: make(map[int]*Filter),
- generic: make(map[int]*Filter),
+// EventSystem creates subscriptions, processes events and broadcasts them to the
+// subscription which match the subscription criteria.
+type EventSystem struct {
+ mux *event.TypeMux
+ sub event.Subscription
+ install chan *subscription // install filter for event notification
+ uninstall chan *subscription // remove filter for event notification
+}
+
+// NewEventSystem creates a new manager that listens for event on the given mux,
+// parses and filters them. It uses the all map to retrieve filter changes. The
+// work loop holds its own index that is used to forward events to filters.
+//
+// The returned manager has a loop that needs to be stopped with the Stop function
+// or by stopping the given mux.
+func NewEventSystem(mux *event.TypeMux) *EventSystem {
+ m := &EventSystem{
+ mux: mux,
+ install: make(chan *subscription),
+ uninstall: make(chan *subscription),
}
- fs.sub = mux.Subscribe(
- core.PendingLogsEvent{},
- core.RemovedLogsEvent{},
- core.ChainEvent{},
- core.TxPreEvent{},
- vm.Logs(nil),
- )
- go fs.filterLoop()
- return fs
+
+ go m.eventLoop()
+
+ return m
}
-// Stop quits the filter loop required for polling events
-func (fs *FilterSystem) Stop() {
- fs.sub.Unsubscribe()
+// Subscription is created when the client registers itself for a particular event.
+type Subscription struct {
+ ID rpc.ID
+ f *subscription
+ es *EventSystem
+ unsubOnce sync.Once
}
-// Acquire filter system maps lock, required to force lock acquisition
-// sequence with filterMu acquired first to avoid deadlocks by callbacks
-func (fs *FilterSystem) Lock() {
- fs.filterMu.Lock()
+// Err returns a channel that is closed when unsubscribed.
+func (sub *Subscription) Err() <-chan error {
+ return sub.f.err
}
-// Release filter system maps lock
-func (fs *FilterSystem) Unlock() {
- fs.filterMu.Unlock()
+// Unsubscribe uninstalls the subscription from the event broadcast loop.
+func (sub *Subscription) Unsubscribe() {
+ sub.unsubOnce.Do(func() {
+ uninstallLoop:
+ for {
+ // write uninstall request and consume logs/hashes. This prevents
+ // the eventLoop broadcast method to deadlock when writing to the
+ // filter event channel while the subscription loop is waiting for
+ // this method to return (and thus not reading these events).
+ select {
+ case sub.es.uninstall <- sub.f:
+ break uninstallLoop
+ case <-sub.f.logs:
+ case <-sub.f.hashes:
+ case <-sub.f.headers:
+ }
+ }
+
+ // wait for filter to be uninstalled in work loop before returning
+ // this ensures that the manager won't use the event channel which
+ // will probably be closed by the client asap after this method returns.
+ <-sub.Err()
+ })
}
-// Add adds a filter to the filter manager
-// Expects filterMu to be locked.
-func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
- id := fs.filterId
- filter.created = time.Now()
+// subscribe installs the subscription in the event broadcast loop.
+func (es *EventSystem) subscribe(sub *subscription) *Subscription {
+ es.install <- sub
+ <-sub.installed
+ return &Subscription{ID: sub.id, f: sub, es: es}
+}
- switch filterType {
- case ChainFilter:
- fs.chainFilters[id] = filter
- case PendingTxFilter:
- fs.pendingTxFilters[id] = filter
- case LogFilter:
- fs.logFilters[id] = filter
- case PendingLogFilter:
- fs.pendingLogFilters[id] = filter
- default:
- return 0, fmt.Errorf("unknown filter type %v", filterType)
+// SubscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel.
+func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: LogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
}
- fs.generic[id] = filter
- fs.filterId++
+ return es.subscribe(sub)
+}
- return id, nil
+// SubscribePendingLogs creates a subscription that will write pending logs matching the
+// given criteria to the given channel.
+func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingLogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+
+ return es.subscribe(sub)
}
-// Remove removes a filter by filter id
-// Expects filterMu to be locked.
-func (fs *FilterSystem) Remove(id int) {
- delete(fs.chainFilters, id)
- delete(fs.pendingTxFilters, id)
- delete(fs.logFilters, id)
- delete(fs.pendingLogFilters, id)
- delete(fs.generic, id)
+// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingTransactionsSubscription,
+ created: time.Now(),
+ logs: make(chan []Log),
+ hashes: hashes,
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+
+ return es.subscribe(sub)
}
-func (fs *FilterSystem) Get(id int) *Filter {
- fs.filterMu.RLock()
- defer fs.filterMu.RUnlock()
+// SubscribeNewHeads creates a subscription that writes the header of a block that is
+// imported in the chain.
+func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: BlocksSubscription,
+ created: time.Now(),
+ logs: make(chan []Log),
+ hashes: make(chan common.Hash),
+ headers: headers,
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
- return fs.generic[id]
+ return es.subscribe(sub)
}
-// filterLoop waits for specific events from ethereum and fires their handlers
-// when the filter matches the requirements.
-func (fs *FilterSystem) filterLoop() {
- for event := range fs.sub.Chan() {
- switch ev := event.Data.(type) {
- case core.ChainEvent:
- fs.filterMu.RLock()
- for _, filter := range fs.chainFilters {
- if filter.BlockCallback != nil && !filter.created.After(event.Time) {
- filter.BlockCallback(ev.Block, ev.Logs)
+type filterIndex map[Type]map[rpc.ID]*subscription
+
+// broadcast event to filters that match criteria.
+func broadcast(filters filterIndex, ev *event.Event) {
+ if ev == nil {
+ return
+ }
+
+ switch e := ev.Data.(type) {
+ case vm.Logs:
+ if len(e) > 0 {
+ for _, f := range filters[LogsSubscription] {
+ if ev.Time.After(f.created) {
+ if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
}
}
- fs.filterMu.RUnlock()
- case core.TxPreEvent:
- fs.filterMu.RLock()
- for _, filter := range fs.pendingTxFilters {
- if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
- filter.TransactionCallback(ev.Tx)
+ }
+ case core.RemovedLogsEvent:
+ for _, f := range filters[LogsSubscription] {
+ if ev.Time.After(f.created) {
+ if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- fs.filterMu.RUnlock()
-
- case vm.Logs:
- fs.filterMu.RLock()
- for _, filter := range fs.logFilters {
- if filter.LogCallback != nil && !filter.created.After(event.Time) {
- for _, log := range filter.FilterLogs(ev) {
- filter.LogCallback(log, false)
- }
+ }
+ case core.PendingLogsEvent:
+ for _, f := range filters[PendingLogsSubscription] {
+ if ev.Time.After(f.created) {
+ if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- fs.filterMu.RUnlock()
- case core.RemovedLogsEvent:
- fs.filterMu.RLock()
- for _, filter := range fs.logFilters {
- if filter.LogCallback != nil && !filter.created.After(event.Time) {
- for _, removedLog := range filter.FilterLogs(ev.Logs) {
- filter.LogCallback(removedLog, true)
- }
- }
+ }
+ case core.TxPreEvent:
+ for _, f := range filters[PendingTransactionsSubscription] {
+ if ev.Time.After(f.created) {
+ f.hashes <- e.Tx.Hash()
}
- fs.filterMu.RUnlock()
- case core.PendingLogsEvent:
- fs.filterMu.RLock()
- for _, filter := range fs.pendingLogFilters {
- if filter.LogCallback != nil && !filter.created.After(event.Time) {
- for _, pendingLog := range ev.Logs {
- filter.LogCallback(pendingLog, false)
- }
- }
+ }
+ case core.ChainEvent:
+ for _, f := range filters[BlocksSubscription] {
+ if ev.Time.After(f.created) {
+ f.headers <- e.Block.Header()
}
- fs.filterMu.RUnlock()
}
}
}
+
+// eventLoop (un)installs filters and processes mux events.
+func (es *EventSystem) eventLoop() {
+ var (
+ index = make(filterIndex)
+ sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{})
+ )
+ for {
+ select {
+ case ev, active := <-sub.Chan():
+ if !active { // system stopped
+ return
+ }
+ broadcast(index, ev)
+ case f := <-es.install:
+ if _, found := index[f.typ]; !found {
+ index[f.typ] = make(map[rpc.ID]*subscription)
+ }
+ index[f.typ][f.id] = f
+ close(f.installed)
+ case f := <-es.uninstall:
+ delete(index[f.typ], f.id)
+ close(f.err)
+ }
+ }
+}
+
+// convertLogs is a helper utility that converts vm.Logs to []filter.Log.
+func convertLogs(in vm.Logs, removed bool) []Log {
+ logs := make([]Log, len(in))
+ for i, l := range in {
+ logs[i] = Log{l, removed}
+ }
+ return logs
+}