From 47ff8130124b479f1f051312eed50c33f0a38e6f Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Wed, 27 Jul 2016 17:47:46 +0200 Subject: rpc: refactor subscriptions and filters --- eth/filters/filter_system.go | 370 +++++++++++++++++++++++++++++-------------- 1 file changed, 248 insertions(+), 122 deletions(-) (limited to 'eth/filters/filter_system.go') diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 256464213..04a55fd09 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -14,179 +14,305 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// package filters implements an ethereum filtering system for block, +// Package filters implements an ethereum filtering system for block, // transactions and log events. package filters import ( + "encoding/json" + "errors" "fmt" "sync" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" ) -// FilterType determines the type of filter and is used to put the filter in to +// Type determines the kind of filter and is used to put the filter in to // the correct bucket when added. -type FilterType byte +type Type byte const ( - ChainFilter FilterType = iota // new block events filter - PendingTxFilter // pending transaction filter - LogFilter // new or removed log filter - PendingLogFilter // pending log filter + // UnknownSubscription indicates an unkown subscription type + UnknownSubscription Type = iota + // LogsSubscription queries for new or removed (chain reorg) logs + LogsSubscription + // PendingLogsSubscription queries for logs for the pending block + PendingLogsSubscription + // PendingTransactionsSubscription queries tx hashes for pending + // transactions entering the pending state + PendingTransactionsSubscription + // BlocksSubscription queries hashes for blocks that are imported + BlocksSubscription ) -// FilterSystem manages filters that filter specific events such as -// block, transaction and log events. The Filtering system can be used to listen -// for specific LOG events fired by the EVM (Ethereum Virtual Machine). -type FilterSystem struct { - filterMu sync.RWMutex - filterId int +var ( + ErrInvalidSubscriptionID = errors.New("invalid id") +) + +// Log is a helper that can hold additional information about vm.Log +// necessary for the RPC interface. +type Log struct { + *vm.Log + Removed bool `json:"removed"` +} - chainFilters map[int]*Filter - pendingTxFilters map[int]*Filter - logFilters map[int]*Filter - pendingLogFilters map[int]*Filter +func (l *Log) MarshalJSON() ([]byte, error) { + fields := map[string]interface{}{ + "address": l.Address, + "data": fmt.Sprintf("0x%x", l.Data), + "blockNumber": fmt.Sprintf("%#x", l.BlockNumber), + "logIndex": fmt.Sprintf("%#x", l.Index), + "blockHash": l.BlockHash, + "transactionHash": l.TxHash, + "transactionIndex": fmt.Sprintf("%#x", l.TxIndex), + "topics": l.Topics, + "removed": l.Removed, + } - // generic is an ugly hack for Get - generic map[int]*Filter + return json.Marshal(fields) +} - sub event.Subscription +type subscription struct { + id rpc.ID + typ Type + created time.Time + logsCrit FilterCriteria + logs chan []Log + hashes chan common.Hash + headers chan *types.Header + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled } -// NewFilterSystem returns a newly allocated filter manager -func NewFilterSystem(mux *event.TypeMux) *FilterSystem { - fs := &FilterSystem{ - chainFilters: make(map[int]*Filter), - pendingTxFilters: make(map[int]*Filter), - logFilters: make(map[int]*Filter), - pendingLogFilters: make(map[int]*Filter), - generic: make(map[int]*Filter), +// EventSystem creates subscriptions, processes events and broadcasts them to the +// subscription which match the subscription criteria. +type EventSystem struct { + mux *event.TypeMux + sub event.Subscription + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification +} + +// NewEventSystem creates a new manager that listens for event on the given mux, +// parses and filters them. It uses the all map to retrieve filter changes. The +// work loop holds its own index that is used to forward events to filters. +// +// The returned manager has a loop that needs to be stopped with the Stop function +// or by stopping the given mux. +func NewEventSystem(mux *event.TypeMux) *EventSystem { + m := &EventSystem{ + mux: mux, + install: make(chan *subscription), + uninstall: make(chan *subscription), } - fs.sub = mux.Subscribe( - core.PendingLogsEvent{}, - core.RemovedLogsEvent{}, - core.ChainEvent{}, - core.TxPreEvent{}, - vm.Logs(nil), - ) - go fs.filterLoop() - return fs + + go m.eventLoop() + + return m } -// Stop quits the filter loop required for polling events -func (fs *FilterSystem) Stop() { - fs.sub.Unsubscribe() +// Subscription is created when the client registers itself for a particular event. +type Subscription struct { + ID rpc.ID + f *subscription + es *EventSystem + unsubOnce sync.Once } -// Acquire filter system maps lock, required to force lock acquisition -// sequence with filterMu acquired first to avoid deadlocks by callbacks -func (fs *FilterSystem) Lock() { - fs.filterMu.Lock() +// Err returns a channel that is closed when unsubscribed. +func (sub *Subscription) Err() <-chan error { + return sub.f.err } -// Release filter system maps lock -func (fs *FilterSystem) Unlock() { - fs.filterMu.Unlock() +// Unsubscribe uninstalls the subscription from the event broadcast loop. +func (sub *Subscription) Unsubscribe() { + sub.unsubOnce.Do(func() { + uninstallLoop: + for { + // write uninstall request and consume logs/hashes. This prevents + // the eventLoop broadcast method to deadlock when writing to the + // filter event channel while the subscription loop is waiting for + // this method to return (and thus not reading these events). + select { + case sub.es.uninstall <- sub.f: + break uninstallLoop + case <-sub.f.logs: + case <-sub.f.hashes: + case <-sub.f.headers: + } + } + + // wait for filter to be uninstalled in work loop before returning + // this ensures that the manager won't use the event channel which + // will probably be closed by the client asap after this method returns. + <-sub.Err() + }) } -// Add adds a filter to the filter manager -// Expects filterMu to be locked. -func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) { - id := fs.filterId - filter.created = time.Now() +// subscribe installs the subscription in the event broadcast loop. +func (es *EventSystem) subscribe(sub *subscription) *Subscription { + es.install <- sub + <-sub.installed + return &Subscription{ID: sub.id, f: sub, es: es} +} - switch filterType { - case ChainFilter: - fs.chainFilters[id] = filter - case PendingTxFilter: - fs.pendingTxFilters[id] = filter - case LogFilter: - fs.logFilters[id] = filter - case PendingLogFilter: - fs.pendingLogFilters[id] = filter - default: - return 0, fmt.Errorf("unknown filter type %v", filterType) +// SubscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. +func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), } - fs.generic[id] = filter - fs.filterId++ + return es.subscribe(sub) +} - return id, nil +// SubscribePendingLogs creates a subscription that will write pending logs matching the +// given criteria to the given channel. +func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + + return es.subscribe(sub) } -// Remove removes a filter by filter id -// Expects filterMu to be locked. -func (fs *FilterSystem) Remove(id int) { - delete(fs.chainFilters, id) - delete(fs.pendingTxFilters, id) - delete(fs.logFilters, id) - delete(fs.pendingLogFilters, id) - delete(fs.generic, id) +// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []Log), + hashes: hashes, + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + + return es.subscribe(sub) } -func (fs *FilterSystem) Get(id int) *Filter { - fs.filterMu.RLock() - defer fs.filterMu.RUnlock() +// SubscribeNewHeads creates a subscription that writes the header of a block that is +// imported in the chain. +func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + logs: make(chan []Log), + hashes: make(chan common.Hash), + headers: headers, + installed: make(chan struct{}), + err: make(chan error), + } - return fs.generic[id] + return es.subscribe(sub) } -// filterLoop waits for specific events from ethereum and fires their handlers -// when the filter matches the requirements. -func (fs *FilterSystem) filterLoop() { - for event := range fs.sub.Chan() { - switch ev := event.Data.(type) { - case core.ChainEvent: - fs.filterMu.RLock() - for _, filter := range fs.chainFilters { - if filter.BlockCallback != nil && !filter.created.After(event.Time) { - filter.BlockCallback(ev.Block, ev.Logs) +type filterIndex map[Type]map[rpc.ID]*subscription + +// broadcast event to filters that match criteria. +func broadcast(filters filterIndex, ev *event.Event) { + if ev == nil { + return + } + + switch e := ev.Data.(type) { + case vm.Logs: + if len(e) > 0 { + for _, f := range filters[LogsSubscription] { + if ev.Time.After(f.created) { + if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } } } - fs.filterMu.RUnlock() - case core.TxPreEvent: - fs.filterMu.RLock() - for _, filter := range fs.pendingTxFilters { - if filter.TransactionCallback != nil && !filter.created.After(event.Time) { - filter.TransactionCallback(ev.Tx) + } + case core.RemovedLogsEvent: + for _, f := range filters[LogsSubscription] { + if ev.Time.After(f.created) { + if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - fs.filterMu.RUnlock() - - case vm.Logs: - fs.filterMu.RLock() - for _, filter := range fs.logFilters { - if filter.LogCallback != nil && !filter.created.After(event.Time) { - for _, log := range filter.FilterLogs(ev) { - filter.LogCallback(log, false) - } + } + case core.PendingLogsEvent: + for _, f := range filters[PendingLogsSubscription] { + if ev.Time.After(f.created) { + if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - fs.filterMu.RUnlock() - case core.RemovedLogsEvent: - fs.filterMu.RLock() - for _, filter := range fs.logFilters { - if filter.LogCallback != nil && !filter.created.After(event.Time) { - for _, removedLog := range filter.FilterLogs(ev.Logs) { - filter.LogCallback(removedLog, true) - } - } + } + case core.TxPreEvent: + for _, f := range filters[PendingTransactionsSubscription] { + if ev.Time.After(f.created) { + f.hashes <- e.Tx.Hash() } - fs.filterMu.RUnlock() - case core.PendingLogsEvent: - fs.filterMu.RLock() - for _, filter := range fs.pendingLogFilters { - if filter.LogCallback != nil && !filter.created.After(event.Time) { - for _, pendingLog := range ev.Logs { - filter.LogCallback(pendingLog, false) - } - } + } + case core.ChainEvent: + for _, f := range filters[BlocksSubscription] { + if ev.Time.After(f.created) { + f.headers <- e.Block.Header() } - fs.filterMu.RUnlock() } } } + +// eventLoop (un)installs filters and processes mux events. +func (es *EventSystem) eventLoop() { + var ( + index = make(filterIndex) + sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{}) + ) + for { + select { + case ev, active := <-sub.Chan(): + if !active { // system stopped + return + } + broadcast(index, ev) + case f := <-es.install: + if _, found := index[f.typ]; !found { + index[f.typ] = make(map[rpc.ID]*subscription) + } + index[f.typ][f.id] = f + close(f.installed) + case f := <-es.uninstall: + delete(index[f.typ], f.id) + close(f.err) + } + } +} + +// convertLogs is a helper utility that converts vm.Logs to []filter.Log. +func convertLogs(in vm.Logs, removed bool) []Log { + logs := make([]Log, len(in)) + for i, l := range in { + logs[i] = Log{l, removed} + } + return logs +} -- cgit v1.2.3