aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/api.go
diff options
context:
space:
mode:
authorBas van Kervel <bas@ethdev.com>2016-07-27 23:47:46 +0800
committerBas van Kervel <basvankervel@gmail.com>2016-08-17 18:59:58 +0800
commit47ff8130124b479f1f051312eed50c33f0a38e6f (patch)
treecb29e4550f63f3a763dd04b267261e354e56d7eb /eth/filters/api.go
parent3b39d4d1c15df2697284c3d7a61564f98ab45c70 (diff)
downloaddexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.gz
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.bz2
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.lz
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.xz
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.zst
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.zip
rpc: refactor subscriptions and filters
Diffstat (limited to 'eth/filters/api.go')
-rw-r--r--eth/filters/api.go784
1 files changed, 327 insertions, 457 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 65c5b9380..3bc220348 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -17,292 +17,414 @@
package filters
import (
- "crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
+ "math/big"
"sync"
"time"
+ "golang.org/x/net/context"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
-
- "golang.org/x/net/context"
)
var (
- filterTickerTime = 5 * time.Minute
+ deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
)
-// byte will be inferred
-const (
- unknownFilterTy = iota
- blockFilterTy
- transactionFilterTy
- logFilterTy
-)
+// filter is a helper struct that holds meta information over the filter type
+// and associated subscription in the event system.
+type filter struct {
+ typ Type
+ deadline *time.Timer // filter is inactiv when deadline triggers
+ hashes []common.Hash
+ crit FilterCriteria
+ logs []Log
+ s *Subscription // associated subscription in event system
+}
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
- mux *event.TypeMux
-
- quit chan struct{}
- chainDb ethdb.Database
-
- filterManager *FilterSystem
-
- filterMapMu sync.RWMutex
- filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers
-
- logMu sync.RWMutex
- logQueue map[int]*logQueue
-
- blockMu sync.RWMutex
- blockQueue map[int]*hashQueue
-
- transactionMu sync.RWMutex
- transactionQueue map[int]*hashQueue
+ mux *event.TypeMux
+ quit chan struct{}
+ chainDb ethdb.Database
+ events *EventSystem
+ filtersMu sync.Mutex
+ filters map[rpc.ID]*filter
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
- svc := &PublicFilterAPI{
- mux: mux,
- chainDb: chainDb,
- filterManager: NewFilterSystem(mux),
- filterMapping: make(map[string]int),
- logQueue: make(map[int]*logQueue),
- blockQueue: make(map[int]*hashQueue),
- transactionQueue: make(map[int]*hashQueue),
+ api := &PublicFilterAPI{
+ mux: mux,
+ chainDb: chainDb,
+ events: NewEventSystem(mux),
+ filters: make(map[rpc.ID]*filter),
}
- go svc.start()
- return svc
-}
-// Stop quits the work loop.
-func (s *PublicFilterAPI) Stop() {
- close(s.quit)
+ go api.timeoutLoop()
+
+ return api
}
-// start the work loop, wait and process events.
-func (s *PublicFilterAPI) start() {
- timer := time.NewTicker(2 * time.Second)
- defer timer.Stop()
-done:
+// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
+// Tt is started when the api is created.
+func (api *PublicFilterAPI) timeoutLoop() {
+ ticker := time.NewTicker(5 * time.Minute)
for {
- select {
- case <-timer.C:
- s.filterManager.Lock() // lock order like filterLoop()
- s.logMu.Lock()
- for id, filter := range s.logQueue {
- if time.Since(filter.timeout) > filterTickerTime {
- s.filterManager.Remove(id)
- delete(s.logQueue, id)
- }
+ <-ticker.C
+ api.filtersMu.Lock()
+ for id, f := range api.filters {
+ select {
+ case <-f.deadline.C:
+ f.s.Unsubscribe()
+ delete(api.filters, id)
+ default:
+ continue
}
- s.logMu.Unlock()
+ }
+ api.filtersMu.Unlock()
+ }
+}
- s.blockMu.Lock()
- for id, filter := range s.blockQueue {
- if time.Since(filter.timeout) > filterTickerTime {
- s.filterManager.Remove(id)
- delete(s.blockQueue, id)
- }
- }
- s.blockMu.Unlock()
+// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
+// as transactions enter the pending state.
+//
+// It is part of the filter package because this filter can be used throug the
+// `eth_getFilterChanges` polling method that is also used for log filters.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
+func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
+ var (
+ pendingTxs = make(chan common.Hash)
+ pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs)
+ )
- s.transactionMu.Lock()
- for id, filter := range s.transactionQueue {
- if time.Since(filter.timeout) > filterTickerTime {
- s.filterManager.Remove(id)
- delete(s.transactionQueue, id)
+ api.filtersMu.Lock()
+ api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case ph := <-pendingTxs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[pendingTxSub.ID]; found {
+ f.hashes = append(f.hashes, ph)
}
+ api.filtersMu.Unlock()
+ case <-pendingTxSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, pendingTxSub.ID)
+ api.filtersMu.Unlock()
+ return
}
- s.transactionMu.Unlock()
- s.filterManager.Unlock()
- case <-s.quit:
- break done
}
- }
+ }()
+ return pendingTxSub.ID
}
-// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
-func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
- // protect filterManager.Add() and setting of filter fields
- s.filterManager.Lock()
- defer s.filterManager.Unlock()
-
- externalId, err := newFilterId()
- if err != nil {
- return "", err
+// NewPendingTransactions creates a subscription that is triggered each time a transaction
+// enters the transaction pool and was signed from one of the transactions this nodes manages.
+func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
- filter := New(s.chainDb)
- id, err := s.filterManager.Add(filter, ChainFilter)
- if err != nil {
- return "", err
- }
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ txHashes := make(chan common.Hash)
+ pendingTxSub := api.events.SubscribePendingTxEvents(txHashes)
+
+ for {
+ select {
+ case h := <-txHashes:
+ notifier.Notify(rpcSub.ID, h)
+ case <-rpcSub.Err():
+ pendingTxSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ pendingTxSub.Unsubscribe()
+ return
+ }
+ }
+ }()
- s.blockMu.Lock()
- s.blockQueue[id] = &hashQueue{timeout: time.Now()}
- s.blockMu.Unlock()
+ return rpcSub, nil
+}
- filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
- s.blockMu.Lock()
- defer s.blockMu.Unlock()
+// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
+// It is part of the filter package since polling goes with eth_getFilterChanges.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
+func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
+ var (
+ headers = make(chan *types.Header)
+ headerSub = api.events.SubscribeNewHeads(headers)
+ )
- if queue := s.blockQueue[id]; queue != nil {
- queue.add(block.Hash())
+ api.filtersMu.Lock()
+ api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case h := <-headers:
+ api.filtersMu.Lock()
+ if f, found := api.filters[headerSub.ID]; found {
+ f.hashes = append(f.hashes, h.Hash())
+ }
+ api.filtersMu.Unlock()
+ case <-headerSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, headerSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
}
+ }()
+
+ return headerSub.ID
+}
+
+// NewHeads send a notification each time a new (header) block is appended to the chain.
+func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
- s.filterMapMu.Lock()
- s.filterMapping[externalId] = id
- s.filterMapMu.Unlock()
+ rpcSub := notifier.CreateSubscription()
- return externalId, nil
-}
+ go func() {
+ headers := make(chan *types.Header)
+ headersSub := api.events.SubscribeNewHeads(headers)
-// NewPendingTransactionFilter creates a filter that returns new pending transactions.
-func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
- // protect filterManager.Add() and setting of filter fields
- s.filterManager.Lock()
- defer s.filterManager.Unlock()
+ for {
+ select {
+ case h := <-headers:
+ notifier.Notify(rpcSub.ID, h)
+ case <-rpcSub.Err():
+ headersSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ headersSub.Unsubscribe()
+ return
+ }
+ }
+ }()
- externalId, err := newFilterId()
- if err != nil {
- return "", err
- }
+ return rpcSub, nil
+}
- filter := New(s.chainDb)
- id, err := s.filterManager.Add(filter, PendingTxFilter)
- if err != nil {
- return "", err
+// Logs creates a subscription that fires for all new log that match the given filter criteria.
+func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
- s.transactionMu.Lock()
- s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
- s.transactionMu.Unlock()
+ rpcSub := notifier.CreateSubscription()
- filter.TransactionCallback = func(tx *types.Transaction) {
- s.transactionMu.Lock()
- defer s.transactionMu.Unlock()
+ go func() {
+ matchedLogs := make(chan []Log)
+ logsSub := api.events.SubscribeLogs(crit, matchedLogs)
- if queue := s.transactionQueue[id]; queue != nil {
- queue.add(tx.Hash())
+ for {
+ select {
+ case logs := <-matchedLogs:
+ for _, log := range logs {
+ notifier.Notify(rpcSub.ID, &log)
+ }
+ case <-rpcSub.Err(): // client send an unsubscribe request
+ logsSub.Unsubscribe()
+ return
+ case <-notifier.Closed(): // connection dropped
+ logsSub.Unsubscribe()
+ return
+ }
}
- }
+ }()
- s.filterMapMu.Lock()
- s.filterMapping[externalId] = id
- s.filterMapMu.Unlock()
+ return rpcSub, nil
+}
- return externalId, nil
+// FilterCriteria represents a request to create a new filter.
+type FilterCriteria struct {
+ FromBlock *big.Int
+ ToBlock *big.Int
+ Addresses []common.Address
+ Topics [][]common.Hash
}
-// newLogFilter creates a new log filter.
-func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
- // protect filterManager.Add() and setting of filter fields
- s.filterManager.Lock()
- defer s.filterManager.Unlock()
+// NewFilter creates a new filter and returns the filter id. It can be
+// used to retrieve logs when the state changes. This method cannot be
+// used to fetch logs that are already stored in the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
+func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
+ var (
+ logs = make(chan []Log)
+ logsSub = api.events.SubscribeLogs(crit, logs)
+ )
- filter := New(s.chainDb)
- id, err := s.filterManager.Add(filter, LogFilter)
- if err != nil {
- return 0, err
+ if crit.FromBlock == nil {
+ crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
+ }
+ if crit.ToBlock == nil {
+ crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
- s.logMu.Lock()
- s.logQueue[id] = &logQueue{timeout: time.Now()}
- s.logMu.Unlock()
-
- filter.SetBeginBlock(earliest)
- filter.SetEndBlock(latest)
- filter.SetAddresses(addresses)
- filter.SetTopics(topics)
- filter.LogCallback = func(log *vm.Log, removed bool) {
- if callback != nil {
- callback(log, removed)
- } else {
- s.logMu.Lock()
- defer s.logMu.Unlock()
- if queue := s.logQueue[id]; queue != nil {
- queue.add(vmlog{log, removed})
+ api.filtersMu.Lock()
+ api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]Log, 0), s: logsSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case l := <-logs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[logsSub.ID]; found {
+ f.logs = append(f.logs, l...)
+ }
+ api.filtersMu.Unlock()
+ case <-logsSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, logsSub.ID)
+ api.filtersMu.Unlock()
+ return
}
}
- }
+ }()
- return id, nil
+ return logsSub.ID
}
-// Logs creates a subscription that fires for all new log that match the given filter criteria.
-func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return nil, rpc.ErrNotificationsUnsupported
+// GetLogs returns logs matching the given argument that are stored within the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
+func (api *PublicFilterAPI) GetLogs(crit FilterCriteria) []Log {
+ if crit.FromBlock == nil {
+ crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
+ }
+ if crit.ToBlock == nil {
+ crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
- var (
- externalId string
- subscription rpc.Subscription
- err error
- )
+ filter := New(api.chainDb)
+ filter.SetBeginBlock(crit.FromBlock.Int64())
+ filter.SetEndBlock(crit.ToBlock.Int64())
+ filter.SetAddresses(crit.Addresses)
+ filter.SetTopics(crit.Topics)
- if externalId, err = newFilterId(); err != nil {
- return nil, err
- }
+ return returnLogs(filter.Find())
+}
- // uninstall filter when subscription is unsubscribed/cancelled
- if subscription, err = notifier.NewSubscription(func(string) {
- s.UninstallFilter(externalId)
- }); err != nil {
- return nil, err
+// UninstallFilter removes the filter with the given filter id.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
+func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ if found {
+ delete(api.filters, id)
}
-
- notifySubscriber := func(log *vm.Log, removed bool) {
- rpcLog := toRPCLogs(vm.Logs{log}, removed)
- if err := subscription.Notify(rpcLog); err != nil {
- subscription.Cancel()
- }
+ api.filtersMu.Unlock()
+ if found {
+ f.s.Unsubscribe()
}
- // from and to block number are not used since subscriptions don't allow you to travel to "time"
- var id int
- if len(args.Addresses) > 0 {
- id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
- } else {
- id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
+ return found
+}
+
+// GetFilterLogs returns the logs for the filter with the given id.
+// If the filter could not be found an empty array of logs is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
+func (api *PublicFilterAPI) GetFilterLogs(id rpc.ID) []Log {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ api.filtersMu.Unlock()
+
+ if !found || f.typ != LogsSubscription {
+ return []Log{}
}
- if err != nil {
- subscription.Cancel()
- return nil, err
+ filter := New(api.chainDb)
+ filter.SetBeginBlock(f.crit.FromBlock.Int64())
+ filter.SetEndBlock(f.crit.ToBlock.Int64())
+ filter.SetAddresses(f.crit.Addresses)
+ filter.SetTopics(f.crit.Topics)
+
+ return returnLogs(filter.Find())
+}
+
+// GetFilterChanges returns the logs for the filter with the given id since
+// last time is was called. This can be used for polling.
+//
+// For pending transaction and block filters the result is []common.Hash.
+// (pending)Log filters return []Log. If the filter could not be found
+// []interface{}{} is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
+func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
+ api.filtersMu.Lock()
+ defer api.filtersMu.Unlock()
+
+ if f, found := api.filters[id]; found {
+ if !f.deadline.Stop() {
+ // timer expired but filter is not yet removed in timeout loop
+ // receive timer value and reset timer
+ <-f.deadline.C
+ }
+ f.deadline.Reset(deadline)
+
+ switch f.typ {
+ case PendingTransactionsSubscription, BlocksSubscription:
+ hashes := f.hashes
+ f.hashes = nil
+ return returnHashes(hashes)
+ case PendingLogsSubscription, LogsSubscription:
+ logs := f.logs
+ f.logs = nil
+ return returnLogs(logs)
+ }
}
- s.filterMapMu.Lock()
- s.filterMapping[externalId] = id
- s.filterMapMu.Unlock()
+ return []interface{}{}
+}
- return subscription, err
+// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
+// otherwise the given hashes array is returned.
+func returnHashes(hashes []common.Hash) []common.Hash {
+ if hashes == nil {
+ return []common.Hash{}
+ }
+ return hashes
}
-// NewFilterArgs represents a request to create a new filter.
-type NewFilterArgs struct {
- FromBlock rpc.BlockNumber
- ToBlock rpc.BlockNumber
- Addresses []common.Address
- Topics [][]common.Hash
+// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
+// otherwise the given logs array is returned.
+func returnLogs(logs []Log) []Log {
+ if logs == nil {
+ return []Log{}
+ }
+ return logs
}
// UnmarshalJSON sets *args fields with given data.
-func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
+func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
@@ -316,15 +438,15 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
}
if raw.From == nil || raw.From.Int64() < 0 {
- args.FromBlock = rpc.LatestBlockNumber
+ args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
- args.FromBlock = *raw.From
+ args.FromBlock = big.NewInt(raw.From.Int64())
}
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
- args.ToBlock = rpc.LatestBlockNumber
+ args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
- args.ToBlock = *raw.ToBlock
+ args.ToBlock = big.NewInt(raw.ToBlock.Int64())
}
args.Addresses = []common.Address{}
@@ -414,255 +536,3 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
return nil
}
-
-// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
-func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
- externalId, err := newFilterId()
- if err != nil {
- return "", err
- }
-
- var id int
- if len(args.Addresses) > 0 {
- id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
- } else {
- id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
- }
- if err != nil {
- return "", err
- }
-
- s.filterMapMu.Lock()
- s.filterMapping[externalId] = id
- s.filterMapMu.Unlock()
-
- return externalId, nil
-}
-
-// GetLogs returns the logs matching the given argument.
-func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
- filter := New(s.chainDb)
- filter.SetBeginBlock(args.FromBlock.Int64())
- filter.SetEndBlock(args.ToBlock.Int64())
- filter.SetAddresses(args.Addresses)
- filter.SetTopics(args.Topics)
-
- return toRPCLogs(filter.Find(), false)
-}
-
-// UninstallFilter removes the filter with the given filter id.
-func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
- s.filterManager.Lock()
- defer s.filterManager.Unlock()
-
- s.filterMapMu.Lock()
- id, ok := s.filterMapping[filterId]
- if !ok {
- s.filterMapMu.Unlock()
- return false
- }
- delete(s.filterMapping, filterId)
- s.filterMapMu.Unlock()
-
- s.filterManager.Remove(id)
-
- s.logMu.Lock()
- if _, ok := s.logQueue[id]; ok {
- delete(s.logQueue, id)
- s.logMu.Unlock()
- return true
- }
- s.logMu.Unlock()
-
- s.blockMu.Lock()
- if _, ok := s.blockQueue[id]; ok {
- delete(s.blockQueue, id)
- s.blockMu.Unlock()
- return true
- }
- s.blockMu.Unlock()
-
- s.transactionMu.Lock()
- if _, ok := s.transactionQueue[id]; ok {
- delete(s.transactionQueue, id)
- s.transactionMu.Unlock()
- return true
- }
- s.transactionMu.Unlock()
-
- return false
-}
-
-// getFilterType is a helper utility that determine the type of filter for the given filter id.
-func (s *PublicFilterAPI) getFilterType(id int) byte {
- if _, ok := s.blockQueue[id]; ok {
- return blockFilterTy
- } else if _, ok := s.transactionQueue[id]; ok {
- return transactionFilterTy
- } else if _, ok := s.logQueue[id]; ok {
- return logFilterTy
- }
-
- return unknownFilterTy
-}
-
-// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
-func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
- s.blockMu.Lock()
- defer s.blockMu.Unlock()
-
- if s.blockQueue[id] != nil {
- return s.blockQueue[id].get()
- }
- return nil
-}
-
-// transactionFilterChanged returns a collection of transaction hashes for the pending
-// transaction filter with the given id.
-func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
- s.blockMu.Lock()
- defer s.blockMu.Unlock()
-
- if s.transactionQueue[id] != nil {
- return s.transactionQueue[id].get()
- }
- return nil
-}
-
-// logFilterChanged returns a collection of logs for the log filter with the given id.
-func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
- s.logMu.Lock()
- defer s.logMu.Unlock()
-
- if s.logQueue[id] != nil {
- return s.logQueue[id].get()
- }
- return nil
-}
-
-// GetFilterLogs returns the logs for the filter with the given id.
-func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
- s.filterMapMu.RLock()
- id, ok := s.filterMapping[filterId]
- s.filterMapMu.RUnlock()
- if !ok {
- return toRPCLogs(nil, false)
- }
-
- if filter := s.filterManager.Get(id); filter != nil {
- return toRPCLogs(filter.Find(), false)
- }
-
- return toRPCLogs(nil, false)
-}
-
-// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
-// This can be used for polling.
-func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
- s.filterMapMu.RLock()
- id, ok := s.filterMapping[filterId]
- s.filterMapMu.RUnlock()
-
- if !ok { // filter not found
- return []interface{}{}
- }
-
- switch s.getFilterType(id) {
- case blockFilterTy:
- return returnHashes(s.blockFilterChanged(id))
- case transactionFilterTy:
- return returnHashes(s.transactionFilterChanged(id))
- case logFilterTy:
- return s.logFilterChanged(id)
- }
-
- return []interface{}{}
-}
-
-type vmlog struct {
- *vm.Log
- Removed bool `json:"removed"`
-}
-
-type logQueue struct {
- mu sync.Mutex
-
- logs []vmlog
- timeout time.Time
- id int
-}
-
-func (l *logQueue) add(logs ...vmlog) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- l.logs = append(l.logs, logs...)
-}
-
-func (l *logQueue) get() []vmlog {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- l.timeout = time.Now()
- tmp := l.logs
- l.logs = nil
- return tmp
-}
-
-type hashQueue struct {
- mu sync.Mutex
-
- hashes []common.Hash
- timeout time.Time
- id int
-}
-
-func (l *hashQueue) add(hashes ...common.Hash) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- l.hashes = append(l.hashes, hashes...)
-}
-
-func (l *hashQueue) get() []common.Hash {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- l.timeout = time.Now()
- tmp := l.hashes
- l.hashes = nil
- return tmp
-}
-
-// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
-// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them
-// causing the affected DApp to miss data.
-func newFilterId() (string, error) {
- var subid [16]byte
- n, _ := rand.Read(subid[:])
- if n != 16 {
- return "", errors.New("Unable to generate filter id")
- }
- return "0x" + hex.EncodeToString(subid[:]), nil
-}
-
-// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
-// can hold additional information about the logs such as whether it was deleted.
-// Additionally when nil is given it will by default instead create an empty slice
-// instead. This is required by the RPC specification.
-func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
- convertedLogs := make([]vmlog, len(logs))
- for i, log := range logs {
- convertedLogs[i] = vmlog{Log: log, Removed: removed}
- }
- return convertedLogs
-}
-
-// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
-// return the given hashes. The RPC interfaces defines that always an array is returned.
-func returnHashes(hashes []common.Hash) []common.Hash {
- if hashes == nil {
- return []common.Hash{}
- }
- return hashes
-}