aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBas van Kervel <bas@ethdev.com>2016-07-27 23:47:46 +0800
committerBas van Kervel <basvankervel@gmail.com>2016-08-17 18:59:58 +0800
commit47ff8130124b479f1f051312eed50c33f0a38e6f (patch)
treecb29e4550f63f3a763dd04b267261e354e56d7eb
parent3b39d4d1c15df2697284c3d7a61564f98ab45c70 (diff)
downloaddexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.gz
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.bz2
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.lz
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.xz
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.zst
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.zip
rpc: refactor subscriptions and filters
-rw-r--r--core/types/block.go22
-rw-r--r--eth/downloader/api.go157
-rw-r--r--eth/filters/api.go784
-rw-r--r--eth/filters/api_test.go27
-rw-r--r--eth/filters/filter.go120
-rw-r--r--eth/filters/filter_system.go370
-rw-r--r--eth/filters/filter_system_test.go349
-rw-r--r--internal/ethapi/api.go126
-rw-r--r--rpc/notification.go297
-rw-r--r--rpc/server.go12
-rw-r--r--rpc/server_test.go2
-rw-r--r--rpc/subscription.go135
-rw-r--r--rpc/subscription_test.go (renamed from rpc/notification_test.go)34
-rw-r--r--rpc/types.go4
-rw-r--r--rpc/utils.go42
15 files changed, 1265 insertions, 1216 deletions
diff --git a/core/types/block.go b/core/types/block.go
index 37b6f3ec1..599359247 100644
--- a/core/types/block.go
+++ b/core/types/block.go
@@ -114,6 +114,28 @@ func (h *Header) UnmarshalJSON(data []byte) error {
return nil
}
+func (h *Header) MarshalJSON() ([]byte, error) {
+ fields := map[string]interface{}{
+ "hash": h.Hash(),
+ "parentHash": h.ParentHash,
+ "number": fmt.Sprintf("%#x", h.Number),
+ "nonce": h.Nonce,
+ "receiptRoot": h.ReceiptHash,
+ "logsBloom": h.Bloom,
+ "sha3Uncles": h.UncleHash,
+ "stateRoot": h.Root,
+ "miner": h.Coinbase,
+ "difficulty": fmt.Sprintf("%#x", h.Difficulty),
+ "extraData": fmt.Sprintf("0x%x", h.Extra),
+ "gasLimit": fmt.Sprintf("%#x", h.GasLimit),
+ "gasUsed": fmt.Sprintf("%#x", h.GasUsed),
+ "timestamp": fmt.Sprintf("%#x", h.Time),
+ "transactionsRoot": h.TxHash,
+ }
+
+ return json.Marshal(fields)
+}
+
func rlpHash(x interface{}) (h common.Hash) {
hw := sha3.NewKeccak256()
rlp.Encode(hw, x)
diff --git a/eth/downloader/api.go b/eth/downloader/api.go
index 94cd6515f..c36dfb7e0 100644
--- a/eth/downloader/api.go
+++ b/eth/downloader/api.go
@@ -19,53 +19,102 @@ package downloader
import (
"sync"
- "golang.org/x/net/context"
-
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
+ "golang.org/x/net/context"
)
// PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
// It offers only methods that operates on data that can be available to anyone without security risks.
type PublicDownloaderAPI struct {
- d *Downloader
- mux *event.TypeMux
- muSyncSubscriptions sync.Mutex
- syncSubscriptions map[string]rpc.Subscription
+ d *Downloader
+ mux *event.TypeMux
+ installSyncSubscription chan chan interface{}
+ uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
}
-// NewPublicDownloaderAPI create a new PublicDownloaderAPI.
+// NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
+// listens for events from the downloader through the global event mux. In case it receives one of
+// these events it broadcasts it to all syncing subscriptions that are installed through the
+// installSyncSubscription channel.
func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
- api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)}
+ api := &PublicDownloaderAPI{
+ d: d,
+ mux: m,
+ installSyncSubscription: make(chan chan interface{}),
+ uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
+ }
- go api.run()
+ go api.eventLoop()
return api
}
-func (api *PublicDownloaderAPI) run() {
- sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
-
- for event := range sub.Chan() {
- var notification interface{}
+// eventLoop runs an loop until the event mux closes. It will install and uninstall new
+// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
+func (api *PublicDownloaderAPI) eventLoop() {
+ var (
+ sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+ syncSubscriptions = make(map[chan interface{}]struct{})
+ )
+
+ for {
+ select {
+ case i := <-api.installSyncSubscription:
+ syncSubscriptions[i] = struct{}{}
+ case u := <-api.uninstallSyncSubscription:
+ delete(syncSubscriptions, u.c)
+ close(u.uninstalled)
+ case event := <-sub.Chan():
+ if event == nil {
+ return
+ }
- switch event.Data.(type) {
- case StartEvent:
- result := &SyncingResult{Syncing: true}
- result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress()
- notification = result
- case DoneEvent, FailedEvent:
- notification = false
+ var notification interface{}
+ switch event.Data.(type) {
+ case StartEvent:
+ result := &SyncingResult{Syncing: true}
+ result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress()
+ notification = result
+ case DoneEvent, FailedEvent:
+ notification = false
+ }
+ // broadcast
+ for c := range syncSubscriptions {
+ c <- notification
+ }
}
+ }
+}
+
+// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
+func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
- api.muSyncSubscriptions.Lock()
- for id, sub := range api.syncSubscriptions {
- if sub.Notify(notification) == rpc.ErrNotificationNotFound {
- delete(api.syncSubscriptions, id)
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ statuses := make(chan interface{})
+ sub := api.SubscribeSyncStatus(statuses)
+
+ for {
+ select {
+ case status := <-statuses:
+ notifier.Notify(rpcSub.ID, status)
+ case <-rpcSub.Err():
+ sub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ sub.Unsubscribe()
+ return
}
}
- api.muSyncSubscriptions.Unlock()
- }
+ }()
+
+ return rpcSub, nil
}
// Progress gives progress indications when the node is synchronising with the Ethereum network.
@@ -83,26 +132,42 @@ type SyncingResult struct {
Status Progress `json:"status"`
}
-// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
-func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) {
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return nil, rpc.ErrNotificationsUnsupported
- }
-
- subscription, err := notifier.NewSubscription(func(id string) {
- api.muSyncSubscriptions.Lock()
- delete(api.syncSubscriptions, id)
- api.muSyncSubscriptions.Unlock()
- })
+// uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop.
+type uninstallSyncSubscriptionRequest struct {
+ c chan interface{}
+ uninstalled chan interface{}
+}
- if err != nil {
- return nil, err
- }
+// SyncStatusSubscription represents a syncing subscription.
+type SyncStatusSubscription struct {
+ api *PublicDownloaderAPI // register subscription in event loop of this api instance
+ c chan interface{} // channel where events are broadcasted to
+ unsubOnce sync.Once // make sure unsubscribe logic is executed once
+}
- api.muSyncSubscriptions.Lock()
- api.syncSubscriptions[subscription.ID()] = subscription
- api.muSyncSubscriptions.Unlock()
+// Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
+// The status channel that was passed to subscribeSyncStatus isn't used anymore
+// after this method returns.
+func (s *SyncStatusSubscription) Unsubscribe() {
+ s.unsubOnce.Do(func() {
+ req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
+ s.api.uninstallSyncSubscription <- &req
+
+ for {
+ select {
+ case <-s.c:
+ // drop new status events until uninstall confirmation
+ continue
+ case <-req.uninstalled:
+ return
+ }
+ }
+ })
+}
- return subscription, nil
+// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
+// The given channel must receive interface values, the result can either
+func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
+ api.installSyncSubscription <- status
+ return &SyncStatusSubscription{api: api, c: status}
}
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 65c5b9380..3bc220348 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -17,292 +17,414 @@
package filters
import (
- "crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
+ "math/big"
"sync"
"time"
+ "golang.org/x/net/context"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
-
- "golang.org/x/net/context"
)
var (
- filterTickerTime = 5 * time.Minute
+ deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
)
-// byte will be inferred
-const (
- unknownFilterTy = iota
- blockFilterTy
- transactionFilterTy
- logFilterTy
-)
+// filter is a helper struct that holds meta information over the filter type
+// and associated subscription in the event system.
+type filter struct {
+ typ Type
+ deadline *time.Timer // filter is inactiv when deadline triggers
+ hashes []common.Hash
+ crit FilterCriteria
+ logs []Log
+ s *Subscription // associated subscription in event system
+}
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
- mux *event.TypeMux
-
- quit chan struct{}
- chainDb ethdb.Database
-
- filterManager *FilterSystem
-
- filterMapMu sync.RWMutex
- filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers
-
- logMu sync.RWMutex
- logQueue map[int]*logQueue
-
- blockMu sync.RWMutex
- blockQueue map[int]*hashQueue
-
- transactionMu sync.RWMutex
- transactionQueue map[int]*hashQueue
+ mux *event.TypeMux
+ quit chan struct{}
+ chainDb ethdb.Database
+ events *EventSystem
+ filtersMu sync.Mutex
+ filters map[rpc.ID]*filter
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
- svc := &PublicFilterAPI{
- mux: mux,
- chainDb: chainDb,
- filterManager: NewFilterSystem(mux),
- filterMapping: make(map[string]int),
- logQueue: make(map[int]*logQueue),
- blockQueue: make(map[int]*hashQueue),
- transactionQueue: make(map[int]*hashQueue),
+ api := &PublicFilterAPI{
+ mux: mux,
+ chainDb: chainDb,
+ events: NewEventSystem(mux),
+ filters: make(map[rpc.ID]*filter),
}
- go svc.start()
- return svc
-}
-// Stop quits the work loop.
-func (s *PublicFilterAPI) Stop() {
- close(s.quit)
+ go api.timeoutLoop()
+
+ return api
}
-// start the work loop, wait and process events.
-func (s *PublicFilterAPI) start() {
- timer := time.NewTicker(2 * time.Second)
- defer timer.Stop()
-done:
+// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
+// Tt is started when the api is created.
+func (api *PublicFilterAPI) timeoutLoop() {
+ ticker := time.NewTicker(5 * time.Minute)
for {
- select {
- case <-timer.C:
- s.filterManager.Lock() // lock order like filterLoop()
- s.logMu.Lock()
- for id, filter := range s.logQueue {
- if time.Since(filter.timeout) > filterTickerTime {
- s.filterManager.Remove(id)
- delete(s.logQueue, id)
- }
+ <-ticker.C
+ api.filtersMu.Lock()
+ for id, f := range api.filters {
+ select {
+ case <-f.deadline.C:
+ f.s.Unsubscribe()
+ delete(api.filters, id)
+ default:
+ continue
}
- s.logMu.Unlock()
+ }
+ api.filtersMu.Unlock()
+ }
+}
- s.blockMu.Lock()
- for id, filter := range s.blockQueue {
- if time.Since(filter.timeout) > filterTickerTime {
- s.filterManager.Remove(id)
- delete(s.blockQueue, id)
- }
- }
- s.blockMu.Unlock()
+// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
+// as transactions enter the pending state.
+//
+// It is part of the filter package because this filter can be used throug the
+// `eth_getFilterChanges` polling method that is also used for log filters.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
+func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
+ var (
+ pendingTxs = make(chan common.Hash)
+ pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs)
+ )
- s.transactionMu.Lock()
- for id, filter := range s.transactionQueue {
- if time.Since(filter.timeout) > filterTickerTime {
- s.filterManager.Remove(id)
- delete(s.transactionQueue, id)
+ api.filtersMu.Lock()
+ api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case ph := <-pendingTxs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[pendingTxSub.ID]; found {
+ f.hashes = append(f.hashes, ph)
}
+ api.filtersMu.Unlock()
+ case <-pendingTxSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, pendingTxSub.ID)
+ api.filtersMu.Unlock()
+ return
}
- s.transactionMu.Unlock()
- s.filterManager.Unlock()
- case <-s.quit:
- break done
}
- }
+ }()
+ return pendingTxSub.ID
}
-// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
-func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
- // protect filterManager.Add() and setting of filter fields
- s.filterManager.Lock()
- defer s.filterManager.Unlock()
-
- externalId, err := newFilterId()
- if err != nil {
- return "", err
+// NewPendingTransactions creates a subscription that is triggered each time a transaction
+// enters the transaction pool and was signed from one of the transactions this nodes manages.
+func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
- filter := New(s.chainDb)
- id, err := s.filterManager.Add(filter, ChainFilter)
- if err != nil {
- return "", err
- }
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ txHashes := make(chan common.Hash)
+ pendingTxSub := api.events.SubscribePendingTxEvents(txHashes)
+
+ for {
+ select {
+ case h := <-txHashes:
+ notifier.Notify(rpcSub.ID, h)
+ case <-rpcSub.Err():
+ pendingTxSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ pendingTxSub.Unsubscribe()
+ return
+ }
+ }
+ }()
- s.blockMu.Lock()
- s.blockQueue[id] = &hashQueue{timeout: time.Now()}
- s.blockMu.Unlock()
+ return rpcSub, nil
+}
- filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
- s.blockMu.Lock()
- defer s.blockMu.Unlock()
+// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
+// It is part of the filter package since polling goes with eth_getFilterChanges.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
+func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
+ var (
+ headers = make(chan *types.Header)
+ headerSub = api.events.SubscribeNewHeads(headers)
+ )
- if queue := s.blockQueue[id]; queue != nil {
- queue.add(block.Hash())
+ api.filtersMu.Lock()
+ api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case h := <-headers:
+ api.filtersMu.Lock()
+ if f, found := api.filters[headerSub.ID]; found {
+ f.hashes = append(f.hashes, h.Hash())
+ }
+ api.filtersMu.Unlock()
+ case <-headerSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, headerSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
}
+ }()
+
+ return headerSub.ID
+}
+
+// NewHeads send a notification each time a new (header) block is appended to the chain.
+func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
- s.filterMapMu.Lock()
- s.filterMapping[externalId] = id
- s.filterMapMu.Unlock()
+ rpcSub := notifier.CreateSubscription()
- return externalId, nil
-}
+ go func() {
+ headers := make(chan *types.Header)
+ headersSub := api.events.SubscribeNewHeads(headers)
-// NewPendingTransactionFilter creates a filter that returns new pending transactions.
-func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
- // protect filterManager.Add() and setting of filter fields
- s.filterManager.Lock()
- defer s.filterManager.Unlock()
+ for {
+ select {
+ case h := <-headers:
+ notifier.Notify(rpcSub.ID, h)
+ case <-rpcSub.Err():
+ headersSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ headersSub.Unsubscribe()
+ return
+ }
+ }
+ }()
- externalId, err := newFilterId()
- if err != nil {
- return "", err
- }
+ return rpcSub, nil
+}
- filter := New(s.chainDb)
- id, err := s.filterManager.Add(filter, PendingTxFilter)
- if err != nil {
- return "", err
+// Logs creates a subscription that fires for all new log that match the given filter criteria.
+func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
- s.transactionMu.Lock()
- s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
- s.transactionMu.Unlock()
+ rpcSub := notifier.CreateSubscription()
- filter.TransactionCallback = func(tx *types.Transaction) {
- s.transactionMu.Lock()
- defer s.transactionMu.Unlock()
+ go func() {
+ matchedLogs := make(chan []Log)
+ logsSub := api.events.SubscribeLogs(crit, matchedLogs)
- if queue := s.transactionQueue[id]; queue != nil {
- queue.add(tx.Hash())
+ for {
+ select {
+ case logs := <-matchedLogs:
+ for _, log := range logs {
+ notifier.Notify(rpcSub.ID, &log)
+ }
+ case <-rpcSub.Err(): // client send an unsubscribe request
+ logsSub.Unsubscribe()
+ return
+ case <-notifier.Closed(): // connection dropped
+ logsSub.Unsubscribe()
+ return
+ }
}
- }
+ }()
- s.filterMapMu.Lock()
- s.filterMapping[externalId] = id
- s.filterMapMu.Unlock()
+ return rpcSub, nil
+}
- return externalId, nil
+// FilterCriteria represents a request to create a new filter.
+type FilterCriteria struct {
+ FromBlock *big.Int
+ ToBlock *big.Int
+ Addresses []common.Address
+ Topics [][]common.Hash
}
-// newLogFilter creates a new log filter.
-func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
- // protect filterManager.Add() and setting of filter fields
- s.filterManager.Lock()
- defer s.filterManager.Unlock()
+// NewFilter creates a new filter and returns the filter id. It can be
+// used to retrieve logs when the state changes. This method cannot be
+// used to fetch logs that are already stored in the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
+func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
+ var (
+ logs = make(chan []Log)
+ logsSub = api.events.SubscribeLogs(crit, logs)
+ )
- filter := New(s.chainDb)
- id, err := s.filterManager.Add(filter, LogFilter)
- if err != nil {
- return 0, err
+ if crit.FromBlock == nil {
+ crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
+ }
+ if crit.ToBlock == nil {
+ crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
- s.logMu.Lock()
- s.logQueue[id] = &logQueue{timeout: time.Now()}
- s.logMu.Unlock()
-
- filter.SetBeginBlock(earliest)
- filter.SetEndBlock(latest)
- filter.SetAddresses(addresses)
- filter.SetTopics(topics)
- filter.LogCallback = func(log *vm.Log, removed bool) {
- if callback != nil {
- callback(log, removed)
- } else {
- s.logMu.Lock()
- defer s.logMu.Unlock()
- if queue := s.logQueue[id]; queue != nil {
- queue.add(vmlog{log, removed})
+ api.filtersMu.Lock()
+ api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]Log, 0), s: logsSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case l := <-logs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[logsSub.ID]; found {
+ f.logs = append(f.logs, l...)
+ }
+ api.filtersMu.Unlock()
+ case <-logsSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, logsSub.ID)
+ api.filtersMu.Unlock()
+ return
}
}
- }
+ }()
- return id, nil
+ return logsSub.ID
}
-// Logs creates a subscription that fires for all new log that match the given filter criteria.
-func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return nil, rpc.ErrNotificationsUnsupported
+// GetLogs returns logs matching the given argument that are stored within the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
+func (api *PublicFilterAPI) GetLogs(crit FilterCriteria) []Log {
+ if crit.FromBlock == nil {
+ crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
+ }
+ if crit.ToBlock == nil {
+ crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
- var (
- externalId string
- subscription rpc.Subscription
- err error
- )
+ filter := New(api.chainDb)
+ filter.SetBeginBlock(crit.FromBlock.Int64())
+ filter.SetEndBlock(crit.ToBlock.Int64())
+ filter.SetAddresses(crit.Addresses)
+ filter.SetTopics(crit.Topics)
- if externalId, err = newFilterId(); err != nil {
- return nil, err
- }
+ return returnLogs(filter.Find())
+}
- // uninstall filter when subscription is unsubscribed/cancelled
- if subscription, err = notifier.NewSubscription(func(string) {
- s.UninstallFilter(externalId)
- }); err != nil {
- return nil, err
+// UninstallFilter removes the filter with the given filter id.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
+func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ if found {
+ delete(api.filters, id)
}
-
- notifySubscriber := func(log *vm.Log, removed bool) {
- rpcLog := toRPCLogs(vm.Logs{log}, removed)
- if err := subscription.Notify(rpcLog); err != nil {
- subscription.Cancel()
- }
+ api.filtersMu.Unlock()
+ if found {
+ f.s.Unsubscribe()
}
- // from and to block number are not used since subscriptions don't allow you to travel to "time"
- var id int
- if len(args.Addresses) > 0 {
- id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
- } else {
- id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
+ return found
+}
+
+// GetFilterLogs returns the logs for the filter with the given id.
+// If the filter could not be found an empty array of logs is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
+func (api *PublicFilterAPI) GetFilterLogs(id rpc.ID) []Log {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ api.filtersMu.Unlock()
+
+ if !found || f.typ != LogsSubscription {
+ return []Log{}
}
- if err != nil {
- subscription.Cancel()
- return nil, err
+ filter := New(api.chainDb)
+ filter.SetBeginBlock(f.crit.FromBlock.Int64())
+ filter.SetEndBlock(f.crit.ToBlock.Int64())
+ filter.SetAddresses(f.crit.Addresses)
+ filter.SetTopics(f.crit.Topics)
+
+ return returnLogs(filter.Find())
+}
+
+// GetFilterChanges returns the logs for the filter with the given id since
+// last time is was called. This can be used for polling.
+//
+// For pending transaction and block filters the result is []common.Hash.
+// (pending)Log filters return []Log. If the filter could not be found
+// []interface{}{} is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
+func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
+ api.filtersMu.Lock()
+ defer api.filtersMu.Unlock()
+
+ if f, found := api.filters[id]; found {
+ if !f.deadline.Stop() {
+ // timer expired but filter is not yet removed in timeout loop
+ // receive timer value and reset timer
+ <-f.deadline.C
+ }
+ f.deadline.Reset(deadline)
+
+ switch f.typ {
+ case PendingTransactionsSubscription, BlocksSubscription:
+ hashes := f.hashes
+ f.hashes = nil
+ return returnHashes(hashes)
+ case PendingLogsSubscription, LogsSubscription:
+ logs := f.logs
+ f.logs = nil
+ return returnLogs(logs)
+ }
}
- s.filterMapMu.Lock()
- s.filterMapping[externalId] = id
- s.filterMapMu.Unlock()
+ return []interface{}{}
+}
- return subscription, err
+// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
+// otherwise the given hashes array is returned.
+func returnHashes(hashes []common.Hash) []common.Hash {
+ if hashes == nil {
+ return []common.Hash{}
+ }
+ return hashes
}
-// NewFilterArgs represents a request to create a new filter.
-type NewFilterArgs struct {
- FromBlock rpc.BlockNumber
- ToBlock rpc.BlockNumber
- Addresses []common.Address
- Topics [][]common.Hash
+// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
+// otherwise the given logs array is returned.
+func returnLogs(logs []Log) []Log {
+ if logs == nil {
+ return []Log{}
+ }
+ return logs
}
// UnmarshalJSON sets *args fields with given data.
-func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
+func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
@@ -316,15 +438,15 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
}
if raw.From == nil || raw.From.Int64() < 0 {
- args.FromBlock = rpc.LatestBlockNumber
+ args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
- args.FromBlock = *raw.From
+ args.FromBlock = big.NewInt(raw.From.Int64())
}
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
- args.ToBlock = rpc.LatestBlockNumber
+ args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
- args.ToBlock = *raw.ToBlock
+ args.ToBlock = big.NewInt(raw.ToBlock.Int64())
}
args.Addresses = []common.Address{}
@@ -414,255 +536,3 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
return nil
}
-
-// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
-func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
- externalId, err := newFilterId()
- if err != nil {
- return "", err
- }
-
- var id int
- if len(args.Addresses) > 0 {
- id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
- } else {
- id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
- }
- if err != nil {
- return "", err
- }
-
- s.filterMapMu.Lock()
- s.filterMapping[externalId] = id
- s.filterMapMu.Unlock()
-
- return externalId, nil
-}
-
-// GetLogs returns the logs matching the given argument.
-func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
- filter := New(s.chainDb)
- filter.SetBeginBlock(args.FromBlock.Int64())
- filter.SetEndBlock(args.ToBlock.Int64())
- filter.SetAddresses(args.Addresses)
- filter.SetTopics(args.Topics)
-
- return toRPCLogs(filter.Find(), false)
-}
-
-// UninstallFilter removes the filter with the given filter id.
-func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
- s.filterManager.Lock()
- defer s.filterManager.Unlock()
-
- s.filterMapMu.Lock()
- id, ok := s.filterMapping[filterId]
- if !ok {
- s.filterMapMu.Unlock()
- return false
- }
- delete(s.filterMapping, filterId)
- s.filterMapMu.Unlock()
-
- s.filterManager.Remove(id)
-
- s.logMu.Lock()
- if _, ok := s.logQueue[id]; ok {
- delete(s.logQueue, id)
- s.logMu.Unlock()
- return true
- }
- s.logMu.Unlock()
-
- s.blockMu.Lock()
- if _, ok := s.blockQueue[id]; ok {
- delete(s.blockQueue, id)
- s.blockMu.Unlock()
- return true
- }
- s.blockMu.Unlock()
-
- s.transactionMu.Lock()
- if _, ok := s.transactionQueue[id]; ok {
- delete(s.transactionQueue, id)
- s.transactionMu.Unlock()
- return true
- }
- s.transactionMu.Unlock()
-
- return false
-}
-
-// getFilterType is a helper utility that determine the type of filter for the given filter id.
-func (s *PublicFilterAPI) getFilterType(id int) byte {
- if _, ok := s.blockQueue[id]; ok {
- return blockFilterTy
- } else if _, ok := s.transactionQueue[id]; ok {
- return transactionFilterTy
- } else if _, ok := s.logQueue[id]; ok {
- return logFilterTy
- }
-
- return unknownFilterTy
-}
-
-// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
-func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
- s.blockMu.Lock()
- defer s.blockMu.Unlock()
-
- if s.blockQueue[id] != nil {
- return s.blockQueue[id].get()
- }
- return nil
-}
-
-// transactionFilterChanged returns a collection of transaction hashes for the pending
-// transaction filter with the given id.
-func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
- s.blockMu.Lock()
- defer s.blockMu.Unlock()
-
- if s.transactionQueue[id] != nil {
- return s.transactionQueue[id].get()
- }
- return nil
-}
-
-// logFilterChanged returns a collection of logs for the log filter with the given id.
-func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
- s.logMu.Lock()
- defer s.logMu.Unlock()
-
- if s.logQueue[id] != nil {
- return s.logQueue[id].get()
- }
- return nil
-}
-
-// GetFilterLogs returns the logs for the filter with the given id.
-func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
- s.filterMapMu.RLock()
- id, ok := s.filterMapping[filterId]
- s.filterMapMu.RUnlock()
- if !ok {
- return toRPCLogs(nil, false)
- }
-
- if filter := s.filterManager.Get(id); filter != nil {
- return toRPCLogs(filter.Find(), false)
- }
-
- return toRPCLogs(nil, false)
-}
-
-// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
-// This can be used for polling.
-func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
- s.filterMapMu.RLock()
- id, ok := s.filterMapping[filterId]
- s.filterMapMu.RUnlock()
-
- if !ok { // filter not found
- return []interface{}{}
- }
-
- switch s.getFilterType(id) {
- case blockFilterTy:
- return returnHashes(s.blockFilterChanged(id))
- case transactionFilterTy:
- return returnHashes(s.transactionFilterChanged(id))
- case logFilterTy:
- return s.logFilterChanged(id)
- }
-
- return []interface{}{}
-}
-
-type vmlog struct {
- *vm.Log
- Removed bool `json:"removed"`
-}
-
-type logQueue struct {
- mu sync.Mutex
-
- logs []vmlog
- timeout time.Time
- id int
-}
-
-func (l *logQueue) add(logs ...vmlog) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- l.logs = append(l.logs, logs...)
-}
-
-func (l *logQueue) get() []vmlog {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- l.timeout = time.Now()
- tmp := l.logs
- l.logs = nil
- return tmp
-}
-
-type hashQueue struct {
- mu sync.Mutex
-
- hashes []common.Hash
- timeout time.Time
- id int
-}
-
-func (l *hashQueue) add(hashes ...common.Hash) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- l.hashes = append(l.hashes, hashes...)
-}
-
-func (l *hashQueue) get() []common.Hash {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- l.timeout = time.Now()
- tmp := l.hashes
- l.hashes = nil
- return tmp
-}
-
-// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
-// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them
-// causing the affected DApp to miss data.
-func newFilterId() (string, error) {
- var subid [16]byte
- n, _ := rand.Read(subid[:])
- if n != 16 {
- return "", errors.New("Unable to generate filter id")
- }
- return "0x" + hex.EncodeToString(subid[:]), nil
-}
-
-// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
-// can hold additional information about the logs such as whether it was deleted.
-// Additionally when nil is given it will by default instead create an empty slice
-// instead. This is required by the RPC specification.
-func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
- convertedLogs := make([]vmlog, len(logs))
- for i, log := range logs {
- convertedLogs[i] = vmlog{Log: log, Removed: removed}
- }
- return convertedLogs
-}
-
-// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
-// return the given hashes. The RPC interfaces defines that always an array is returned.
-func returnHashes(hashes []common.Hash) []common.Hash {
- if hashes == nil {
- return []common.Hash{}
- }
- return hashes
-}
diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go
index 9e8edc241..98eb6cbaa 100644
--- a/eth/filters/api_test.go
+++ b/eth/filters/api_test.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-package filters_test
+package filters
import (
"encoding/json"
@@ -22,7 +22,6 @@ import (
"testing"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rpc"
)
@@ -39,14 +38,14 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
)
// default values
- var test0 filters.NewFilterArgs
+ var test0 FilterCriteria
if err := json.Unmarshal([]byte("{}"), &test0); err != nil {
t.Fatal(err)
}
- if test0.FromBlock != rpc.LatestBlockNumber {
+ if test0.FromBlock.Int64() != rpc.LatestBlockNumber.Int64() {
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock)
}
- if test0.ToBlock != rpc.LatestBlockNumber {
+ if test0.ToBlock.Int64() != rpc.LatestBlockNumber.Int64() {
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock)
}
if len(test0.Addresses) != 0 {
@@ -57,20 +56,20 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// from, to block number
- var test1 filters.NewFilterArgs
+ var test1 FilterCriteria
vector := fmt.Sprintf(`{"fromBlock":"0x%x","toBlock":"0x%x"}`, fromBlock, toBlock)
if err := json.Unmarshal([]byte(vector), &test1); err != nil {
t.Fatal(err)
}
- if test1.FromBlock != fromBlock {
+ if test1.FromBlock.Int64() != fromBlock.Int64() {
t.Fatalf("expected FromBlock %d, got %d", fromBlock, test1.FromBlock)
}
- if test1.ToBlock != toBlock {
+ if test1.ToBlock.Int64() != toBlock.Int64() {
t.Fatalf("expected ToBlock %d, got %d", toBlock, test1.ToBlock)
}
// single address
- var test2 filters.NewFilterArgs
+ var test2 FilterCriteria
vector = fmt.Sprintf(`{"address": "%s"}`, address0.Hex())
if err := json.Unmarshal([]byte(vector), &test2); err != nil {
t.Fatal(err)
@@ -83,7 +82,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// multiple address
- var test3 filters.NewFilterArgs
+ var test3 FilterCriteria
vector = fmt.Sprintf(`{"address": ["%s", "%s"]}`, address0.Hex(), address1.Hex())
if err := json.Unmarshal([]byte(vector), &test3); err != nil {
t.Fatal(err)
@@ -99,7 +98,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// single topic
- var test4 filters.NewFilterArgs
+ var test4 FilterCriteria
vector = fmt.Sprintf(`{"topics": ["%s"]}`, topic0.Hex())
if err := json.Unmarshal([]byte(vector), &test4); err != nil {
t.Fatal(err)
@@ -115,7 +114,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// test multiple "AND" topics
- var test5 filters.NewFilterArgs
+ var test5 FilterCriteria
vector = fmt.Sprintf(`{"topics": ["%s", "%s"]}`, topic0.Hex(), topic1.Hex())
if err := json.Unmarshal([]byte(vector), &test5); err != nil {
t.Fatal(err)
@@ -137,7 +136,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// test optional topic
- var test6 filters.NewFilterArgs
+ var test6 FilterCriteria
vector = fmt.Sprintf(`{"topics": ["%s", null, "%s"]}`, topic0.Hex(), topic2.Hex())
if err := json.Unmarshal([]byte(vector), &test6); err != nil {
t.Fatal(err)
@@ -165,7 +164,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
}
// test OR topics
- var test7 filters.NewFilterArgs
+ var test7 FilterCriteria
vector = fmt.Sprintf(`{"topics": [["%s", "%s"], null, ["%s", null]]}`, topic0.Hex(), topic1.Hex(), topic2.Hex())
if err := json.Unmarshal([]byte(vector), &test7); err != nil {
t.Fatal(err)
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index fd739bf0e..4226620dc 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -23,15 +23,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
)
-type AccountChange struct {
- Address, StateAddress []byte
-}
-
-// Filtering interface
+// Filter can be used to retrieve and filter logs
type Filter struct {
created time.Time
@@ -39,70 +34,72 @@ type Filter struct {
begin, end int64
addresses []common.Address
topics [][]common.Hash
-
- BlockCallback func(*types.Block, vm.Logs)
- TransactionCallback func(*types.Transaction)
- LogCallback func(*vm.Log, bool)
}
-// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
-// is interesting or not.
+// New creates a new filter which uses a bloom filter on blocks to figure out whether
+// a particular block is interesting or not.
func New(db ethdb.Database) *Filter {
return &Filter{db: db}
}
-// Set the earliest and latest block for filtering.
+// SetBeginBlock sets the earliest block for filtering.
// -1 = latest block (i.e., the current block)
// hash = particular hash from-to
-func (self *Filter) SetBeginBlock(begin int64) {
- self.begin = begin
+func (f *Filter) SetBeginBlock(begin int64) {
+ f.begin = begin
}
-func (self *Filter) SetEndBlock(end int64) {
- self.end = end
+// SetEndBlock sets the latest block for filtering.
+func (f *Filter) SetEndBlock(end int64) {
+ f.end = end
}
-func (self *Filter) SetAddresses(addr []common.Address) {
- self.addresses = addr
+// SetAddresses matches only logs that are generated from addresses that are included
+// in the given addresses.
+func (f *Filter) SetAddresses(addr []common.Address) {
+ f.addresses = addr
}
-func (self *Filter) SetTopics(topics [][]common.Hash) {
- self.topics = topics
+// SetTopics matches only logs that have topics matching the given topics.
+func (f *Filter) SetTopics(topics [][]common.Hash) {
+ f.topics = topics
}
// Run filters logs with the current parameters set
-func (self *Filter) Find() vm.Logs {
- latestHash := core.GetHeadBlockHash(self.db)
- latestBlock := core.GetBlock(self.db, latestHash, core.GetBlockNumber(self.db, latestHash))
+func (f *Filter) Find() []Log {
+ latestHash := core.GetHeadBlockHash(f.db)
+ latestBlock := core.GetBlock(f.db, latestHash, core.GetBlockNumber(f.db, latestHash))
if latestBlock == nil {
- return vm.Logs{}
+ return []Log{}
}
- var beginBlockNo uint64 = uint64(self.begin)
- if self.begin == -1 {
+
+ var beginBlockNo uint64 = uint64(f.begin)
+ if f.begin == -1 {
beginBlockNo = latestBlock.NumberU64()
}
- var endBlockNo uint64 = uint64(self.end)
- if self.end == -1 {
+
+ endBlockNo := uint64(f.end)
+ if f.end == -1 {
endBlockNo = latestBlock.NumberU64()
}
// if no addresses are present we can't make use of fast search which
// uses the mipmap bloom filters to check for fast inclusion and uses
// higher range probability in order to ensure at least a false positive
- if len(self.addresses) == 0 {
- return self.getLogs(beginBlockNo, endBlockNo)
+ if len(f.addresses) == 0 {
+ return f.getLogs(beginBlockNo, endBlockNo)
}
- return self.mipFind(beginBlockNo, endBlockNo, 0)
+ return f.mipFind(beginBlockNo, endBlockNo, 0)
}
-func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) {
+func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) {
level := core.MIPMapLevels[depth]
// normalise numerator so we can work in level specific batches and
// work with the proper range checks
for num := start / level * level; num <= end; num += level {
// find addresses in bloom filters
- bloom := core.GetMipmapBloom(self.db, num, level)
- for _, addr := range self.addresses {
+ bloom := core.GetMipmapBloom(f.db, num, level)
+ for _, addr := range f.addresses {
if bloom.TestBytes(addr[:]) {
// range check normalised values and make sure that
// we're resolving the correct range instead of the
@@ -110,9 +107,9 @@ func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) {
start := uint64(math.Max(float64(num), float64(start)))
end := uint64(math.Min(float64(num+level-1), float64(end)))
if depth+1 == len(core.MIPMapLevels) {
- logs = append(logs, self.getLogs(start, end)...)
+ logs = append(logs, f.getLogs(start, end)...)
} else {
- logs = append(logs, self.mipFind(start, end, depth+1)...)
+ logs = append(logs, f.mipFind(start, end, depth+1)...)
}
// break so we don't check the same range for each
// possible address. Checks on multiple addresses
@@ -125,12 +122,15 @@ func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) {
return logs
}
-func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) {
+func (f *Filter) getLogs(start, end uint64) (logs []Log) {
+ var block *types.Block
+
for i := start; i <= end; i++ {
- var block *types.Block
- hash := core.GetCanonicalHash(self.db, i)
+ hash := core.GetCanonicalHash(f.db, i)
if hash != (common.Hash{}) {
- block = core.GetBlock(self.db, hash, i)
+ block = core.GetBlock(f.db, hash, i)
+ } else { // block not found
+ return logs
}
if block == nil { // block not found/written
return logs
@@ -138,16 +138,20 @@ func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) {
// Use bloom filtering to see if this block is interesting given the
// current parameters
- if self.bloomFilter(block) {
+ if f.bloomFilter(block) {
// Get the logs of the block
var (
- receipts = core.GetBlockReceipts(self.db, block.Hash(), i)
- unfiltered vm.Logs
+ receipts = core.GetBlockReceipts(f.db, block.Hash(), i)
+ unfiltered []Log
)
for _, receipt := range receipts {
- unfiltered = append(unfiltered, receipt.Logs...)
+ rl := make([]Log, len(receipt.Logs))
+ for i, l := range receipt.Logs {
+ rl[i] = Log{l, false}
+ }
+ unfiltered = append(unfiltered, rl...)
}
- logs = append(logs, self.FilterLogs(unfiltered)...)
+ logs = append(logs, filterLogs(unfiltered, f.addresses, f.topics)...)
}
}
@@ -164,26 +168,25 @@ func includes(addresses []common.Address, a common.Address) bool {
return false
}
-func (self *Filter) FilterLogs(logs vm.Logs) vm.Logs {
- var ret vm.Logs
+func filterLogs(logs []Log, addresses []common.Address, topics [][]common.Hash) []Log {
+ var ret []Log
// Filter the logs for interesting stuff
Logs:
for _, log := range logs {
- if len(self.addresses) > 0 && !includes(self.addresses, log.Address) {
+ if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
- logTopics := make([]common.Hash, len(self.topics))
+ logTopics := make([]common.Hash, len(topics))
copy(logTopics, log.Topics)
- // If the to filtered topics is greater than the amount of topics in
- // logs, skip.
- if len(self.topics) > len(log.Topics) {
+ // If the to filtered topics is greater than the amount of topics in logs, skip.
+ if len(topics) > len(log.Topics) {
continue Logs
}
- for i, topics := range self.topics {
+ for i, topics := range topics {
var match bool
for _, topic := range topics {
// common.Hash{} is a match all (wildcard)
@@ -196,7 +199,6 @@ Logs:
if !match {
continue Logs
}
-
}
ret = append(ret, log)
@@ -205,10 +207,10 @@ Logs:
return ret
}
-func (self *Filter) bloomFilter(block *types.Block) bool {
- if len(self.addresses) > 0 {
+func (f *Filter) bloomFilter(block *types.Block) bool {
+ if len(f.addresses) > 0 {
var included bool
- for _, addr := range self.addresses {
+ for _, addr := range f.addresses {
if types.BloomLookup(block.Bloom(), addr) {
included = true
break
@@ -220,7 +222,7 @@ func (self *Filter) bloomFilter(block *types.Block) bool {
}
}
- for _, sub := range self.topics {
+ for _, sub := range f.topics {
var included bool
for _, topic := range sub {
if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) {
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 256464213..04a55fd09 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -14,179 +14,305 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-// package filters implements an ethereum filtering system for block,
+// Package filters implements an ethereum filtering system for block,
// transactions and log events.
package filters
import (
+ "encoding/json"
+ "errors"
"fmt"
"sync"
"time"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/rpc"
)
-// FilterType determines the type of filter and is used to put the filter in to
+// Type determines the kind of filter and is used to put the filter in to
// the correct bucket when added.
-type FilterType byte
+type Type byte
const (
- ChainFilter FilterType = iota // new block events filter
- PendingTxFilter // pending transaction filter
- LogFilter // new or removed log filter
- PendingLogFilter // pending log filter
+ // UnknownSubscription indicates an unkown subscription type
+ UnknownSubscription Type = iota
+ // LogsSubscription queries for new or removed (chain reorg) logs
+ LogsSubscription
+ // PendingLogsSubscription queries for logs for the pending block
+ PendingLogsSubscription
+ // PendingTransactionsSubscription queries tx hashes for pending
+ // transactions entering the pending state
+ PendingTransactionsSubscription
+ // BlocksSubscription queries hashes for blocks that are imported
+ BlocksSubscription
)
-// FilterSystem manages filters that filter specific events such as
-// block, transaction and log events. The Filtering system can be used to listen
-// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
-type FilterSystem struct {
- filterMu sync.RWMutex
- filterId int
+var (
+ ErrInvalidSubscriptionID = errors.New("invalid id")
+)
+
+// Log is a helper that can hold additional information about vm.Log
+// necessary for the RPC interface.
+type Log struct {
+ *vm.Log
+ Removed bool `json:"removed"`
+}
- chainFilters map[int]*Filter
- pendingTxFilters map[int]*Filter
- logFilters map[int]*Filter
- pendingLogFilters map[int]*Filter
+func (l *Log) MarshalJSON() ([]byte, error) {
+ fields := map[string]interface{}{
+ "address": l.Address,
+ "data": fmt.Sprintf("0x%x", l.Data),
+ "blockNumber": fmt.Sprintf("%#x", l.BlockNumber),
+ "logIndex": fmt.Sprintf("%#x", l.Index),
+ "blockHash": l.BlockHash,
+ "transactionHash": l.TxHash,
+ "transactionIndex": fmt.Sprintf("%#x", l.TxIndex),
+ "topics": l.Topics,
+ "removed": l.Removed,
+ }
- // generic is an ugly hack for Get
- generic map[int]*Filter
+ return json.Marshal(fields)
+}
- sub event.Subscription
+type subscription struct {
+ id rpc.ID
+ typ Type
+ created time.Time
+ logsCrit FilterCriteria
+ logs chan []Log
+ hashes chan common.Hash
+ headers chan *types.Header
+ installed chan struct{} // closed when the filter is installed
+ err chan error // closed when the filter is uninstalled
}
-// NewFilterSystem returns a newly allocated filter manager
-func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
- fs := &FilterSystem{
- chainFilters: make(map[int]*Filter),
- pendingTxFilters: make(map[int]*Filter),
- logFilters: make(map[int]*Filter),
- pendingLogFilters: make(map[int]*Filter),
- generic: make(map[int]*Filter),
+// EventSystem creates subscriptions, processes events and broadcasts them to the
+// subscription which match the subscription criteria.
+type EventSystem struct {
+ mux *event.TypeMux
+ sub event.Subscription
+ install chan *subscription // install filter for event notification
+ uninstall chan *subscription // remove filter for event notification
+}
+
+// NewEventSystem creates a new manager that listens for event on the given mux,
+// parses and filters them. It uses the all map to retrieve filter changes. The
+// work loop holds its own index that is used to forward events to filters.
+//
+// The returned manager has a loop that needs to be stopped with the Stop function
+// or by stopping the given mux.
+func NewEventSystem(mux *event.TypeMux) *EventSystem {
+ m := &EventSystem{
+ mux: mux,
+ install: make(chan *subscription),
+ uninstall: make(chan *subscription),
}
- fs.sub = mux.Subscribe(
- core.PendingLogsEvent{},
- core.RemovedLogsEvent{},
- core.ChainEvent{},
- core.TxPreEvent{},
- vm.Logs(nil),
- )
- go fs.filterLoop()
- return fs
+
+ go m.eventLoop()
+
+ return m
}
-// Stop quits the filter loop required for polling events
-func (fs *FilterSystem) Stop() {
- fs.sub.Unsubscribe()
+// Subscription is created when the client registers itself for a particular event.
+type Subscription struct {
+ ID rpc.ID
+ f *subscription
+ es *EventSystem
+ unsubOnce sync.Once
}
-// Acquire filter system maps lock, required to force lock acquisition
-// sequence with filterMu acquired first to avoid deadlocks by callbacks
-func (fs *FilterSystem) Lock() {
- fs.filterMu.Lock()
+// Err returns a channel that is closed when unsubscribed.
+func (sub *Subscription) Err() <-chan error {
+ return sub.f.err
}
-// Release filter system maps lock
-func (fs *FilterSystem) Unlock() {
- fs.filterMu.Unlock()
+// Unsubscribe uninstalls the subscription from the event broadcast loop.
+func (sub *Subscription) Unsubscribe() {
+ sub.unsubOnce.Do(func() {
+ uninstallLoop:
+ for {
+ // write uninstall request and consume logs/hashes. This prevents
+ // the eventLoop broadcast method to deadlock when writing to the
+ // filter event channel while the subscription loop is waiting for
+ // this method to return (and thus not reading these events).
+ select {
+ case sub.es.uninstall <- sub.f:
+ break uninstallLoop
+ case <-sub.f.logs:
+ case <-sub.f.hashes:
+ case <-sub.f.headers:
+ }
+ }
+
+ // wait for filter to be uninstalled in work loop before returning
+ // this ensures that the manager won't use the event channel which
+ // will probably be closed by the client asap after this method returns.
+ <-sub.Err()
+ })
}
-// Add adds a filter to the filter manager
-// Expects filterMu to be locked.
-func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
- id := fs.filterId
- filter.created = time.Now()
+// subscribe installs the subscription in the event broadcast loop.
+func (es *EventSystem) subscribe(sub *subscription) *Subscription {
+ es.install <- sub
+ <-sub.installed
+ return &Subscription{ID: sub.id, f: sub, es: es}
+}
- switch filterType {
- case ChainFilter:
- fs.chainFilters[id] = filter
- case PendingTxFilter:
- fs.pendingTxFilters[id] = filter
- case LogFilter:
- fs.logFilters[id] = filter
- case PendingLogFilter:
- fs.pendingLogFilters[id] = filter
- default:
- return 0, fmt.Errorf("unknown filter type %v", filterType)
+// SubscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel.
+func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: LogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
}
- fs.generic[id] = filter
- fs.filterId++
+ return es.subscribe(sub)
+}
- return id, nil
+// SubscribePendingLogs creates a subscription that will write pending logs matching the
+// given criteria to the given channel.
+func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingLogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ hashes: make(chan common.Hash),
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+
+ return es.subscribe(sub)
}
-// Remove removes a filter by filter id
-// Expects filterMu to be locked.
-func (fs *FilterSystem) Remove(id int) {
- delete(fs.chainFilters, id)
- delete(fs.pendingTxFilters, id)
- delete(fs.logFilters, id)
- delete(fs.pendingLogFilters, id)
- delete(fs.generic, id)
+// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingTransactionsSubscription,
+ created: time.Now(),
+ logs: make(chan []Log),
+ hashes: hashes,
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+
+ return es.subscribe(sub)
}
-func (fs *FilterSystem) Get(id int) *Filter {
- fs.filterMu.RLock()
- defer fs.filterMu.RUnlock()
+// SubscribeNewHeads creates a subscription that writes the header of a block that is
+// imported in the chain.
+func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: BlocksSubscription,
+ created: time.Now(),
+ logs: make(chan []Log),
+ hashes: make(chan common.Hash),
+ headers: headers,
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
- return fs.generic[id]
+ return es.subscribe(sub)
}
-// filterLoop waits for specific events from ethereum and fires their handlers
-// when the filter matches the requirements.
-func (fs *FilterSystem) filterLoop() {
- for event := range fs.sub.Chan() {
- switch ev := event.Data.(type) {
- case core.ChainEvent:
- fs.filterMu.RLock()
- for _, filter := range fs.chainFilters {
- if filter.BlockCallback != nil && !filter.created.After(event.Time) {
- filter.BlockCallback(ev.Block, ev.Logs)
+type filterIndex map[Type]map[rpc.ID]*subscription
+
+// broadcast event to filters that match criteria.
+func broadcast(filters filterIndex, ev *event.Event) {
+ if ev == nil {
+ return
+ }
+
+ switch e := ev.Data.(type) {
+ case vm.Logs:
+ if len(e) > 0 {
+ for _, f := range filters[LogsSubscription] {
+ if ev.Time.After(f.created) {
+ if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
}
}
- fs.filterMu.RUnlock()
- case core.TxPreEvent:
- fs.filterMu.RLock()
- for _, filter := range fs.pendingTxFilters {
- if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
- filter.TransactionCallback(ev.Tx)
+ }
+ case core.RemovedLogsEvent:
+ for _, f := range filters[LogsSubscription] {
+ if ev.Time.After(f.created) {
+ if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- fs.filterMu.RUnlock()
-
- case vm.Logs:
- fs.filterMu.RLock()
- for _, filter := range fs.logFilters {
- if filter.LogCallback != nil && !filter.created.After(event.Time) {
- for _, log := range filter.FilterLogs(ev) {
- filter.LogCallback(log, false)
- }
+ }
+ case core.PendingLogsEvent:
+ for _, f := range filters[PendingLogsSubscription] {
+ if ev.Time.After(f.created) {
+ if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- fs.filterMu.RUnlock()
- case core.RemovedLogsEvent:
- fs.filterMu.RLock()
- for _, filter := range fs.logFilters {
- if filter.LogCallback != nil && !filter.created.After(event.Time) {
- for _, removedLog := range filter.FilterLogs(ev.Logs) {
- filter.LogCallback(removedLog, true)
- }
- }
+ }
+ case core.TxPreEvent:
+ for _, f := range filters[PendingTransactionsSubscription] {
+ if ev.Time.After(f.created) {
+ f.hashes <- e.Tx.Hash()
}
- fs.filterMu.RUnlock()
- case core.PendingLogsEvent:
- fs.filterMu.RLock()
- for _, filter := range fs.pendingLogFilters {
- if filter.LogCallback != nil && !filter.created.After(event.Time) {
- for _, pendingLog := range ev.Logs {
- filter.LogCallback(pendingLog, false)
- }
- }
+ }
+ case core.ChainEvent:
+ for _, f := range filters[BlocksSubscription] {
+ if ev.Time.After(f.created) {
+ f.headers <- e.Block.Header()
}
- fs.filterMu.RUnlock()
}
}
}
+
+// eventLoop (un)installs filters and processes mux events.
+func (es *EventSystem) eventLoop() {
+ var (
+ index = make(filterIndex)
+ sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{})
+ )
+ for {
+ select {
+ case ev, active := <-sub.Chan():
+ if !active { // system stopped
+ return
+ }
+ broadcast(index, ev)
+ case f := <-es.install:
+ if _, found := index[f.typ]; !found {
+ index[f.typ] = make(map[rpc.ID]*subscription)
+ }
+ index[f.typ][f.id] = f
+ close(f.installed)
+ case f := <-es.uninstall:
+ delete(index[f.typ], f.id)
+ close(f.err)
+ }
+ }
+}
+
+// convertLogs is a helper utility that converts vm.Logs to []filter.Log.
+func convertLogs(in vm.Logs, removed bool) []Log {
+ logs := make([]Log, len(in))
+ for i, l := range in {
+ logs[i] = Log{l, removed}
+ }
+ return logs
+}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 72824cb08..9e6fde1c6 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -17,101 +17,310 @@
package filters
import (
+ "math/big"
+ "reflect"
"testing"
"time"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/rpc"
)
-func TestCallbacks(t *testing.T) {
+var (
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ api = NewPublicFilterAPI(db, mux)
+)
+
+// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
+// It creates multiple subscriptions:
+// - one at the start and should receive all posted chain events and a second (blockHashes)
+// - one that is created after a cutoff moment and uninstalled after a second cutoff moment (blockHashes[cutoff1:cutoff2])
+// - one that is created after the second cutoff moment (blockHashes[cutoff2:])
+func TestBlockSubscription(t *testing.T) {
+ t.Parallel()
+
var (
- mux event.TypeMux
- fs = NewFilterSystem(&mux)
- blockDone = make(chan struct{})
- txDone = make(chan struct{})
- logDone = make(chan struct{})
- removedLogDone = make(chan struct{})
- pendingLogDone = make(chan struct{})
+ genesis = core.WriteGenesisBlockForTesting(db)
+ chain, _ = core.GenerateChain(nil, genesis, db, 10, func(i int, gen *core.BlockGen) {})
+ chainEvents = []core.ChainEvent{}
)
- blockFilter := &Filter{
- BlockCallback: func(*types.Block, vm.Logs) {
- close(blockDone)
- },
- }
- txFilter := &Filter{
- TransactionCallback: func(*types.Transaction) {
- close(txDone)
- },
+ for _, blk := range chain {
+ chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
}
- logFilter := &Filter{
- LogCallback: func(l *vm.Log, oob bool) {
- if !oob {
- close(logDone)
+
+ chan0 := make(chan *types.Header)
+ sub0 := api.events.SubscribeNewHeads(chan0)
+ chan1 := make(chan *types.Header)
+ sub1 := api.events.SubscribeNewHeads(chan1)
+
+ go func() { // simulate client
+ i1, i2 := 0, 0
+ for i1 != len(chainEvents) || i2 != len(chainEvents) {
+ select {
+ case header := <-chan0:
+ if chainEvents[i1].Hash != header.Hash() {
+ t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash())
+ }
+ i1++
+ case header := <-chan1:
+ if chainEvents[i2].Hash != header.Hash() {
+ t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash())
+ }
+ i2++
}
- },
+ }
+
+ sub0.Unsubscribe()
+ sub1.Unsubscribe()
+ }()
+
+ time.Sleep(1 * time.Second)
+ for _, e := range chainEvents {
+ mux.Post(e)
}
- removedLogFilter := &Filter{
- LogCallback: func(l *vm.Log, oob bool) {
- if oob {
- close(removedLogDone)
- }
- },
+
+ <-sub0.Err()
+ <-sub1.Err()
+}
+
+// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
+func TestPendingTxFilter(t *testing.T) {
+ t.Parallel()
+
+ var (
+ transactions = []*types.Transaction{
+ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+ types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+ types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+ types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+ types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+ }
+
+ hashes []common.Hash
+ )
+
+ fid0 := api.NewPendingTransactionFilter()
+
+ time.Sleep(1 * time.Second)
+ for _, tx := range transactions {
+ ev := core.TxPreEvent{Tx: tx}
+ mux.Post(ev)
}
- pendingLogFilter := &Filter{
- LogCallback: func(*vm.Log, bool) {
- close(pendingLogDone)
- },
+
+ for {
+ h := api.GetFilterChanges(fid0).([]common.Hash)
+ hashes = append(hashes, h...)
+
+ if len(hashes) >= len(transactions) {
+ break
+ }
+
+ time.Sleep(100 * time.Millisecond)
+ }
+
+ for i := range hashes {
+ if hashes[i] != transactions[i].Hash() {
+ t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
+ }
+ }
+}
+
+// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
+func TestLogFilter(t *testing.T) {
+ t.Parallel()
+
+ var (
+ firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
+ secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
+ thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
+ notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
+ firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
+ secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
+ notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
+
+ allLogs = vm.Logs{
+ // Note, these are used for comparison of the test cases.
+ vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0),
+ vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1),
+ vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1),
+ vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 2),
+ vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3),
+ }
+
+ testCases = []struct {
+ crit FilterCriteria
+ expected vm.Logs
+ id rpc.ID
+ }{
+ // match all
+ {FilterCriteria{}, allLogs, ""},
+ // match none due to no matching addresses
+ {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""},
+ // match logs based on addresses, ignore topics
+ {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
+ // match none due to no matching topics (match with address)
+ {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""},
+ // match logs based on addresses and topics
+ {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""},
+ // match logs based on multiple addresses and "or" topics
+ {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""},
+ // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes
+ {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, allLogs[:2], ""},
+ }
+
+ err error
+ )
+
+ // create all filters
+ for i := range testCases {
+ testCases[i].id = api.NewFilter(testCases[i].crit)
}
- fs.Add(blockFilter, ChainFilter)
- fs.Add(txFilter, PendingTxFilter)
- fs.Add(logFilter, LogFilter)
- fs.Add(removedLogFilter, LogFilter)
- fs.Add(pendingLogFilter, PendingLogFilter)
-
- mux.Post(core.ChainEvent{})
- mux.Post(core.TxPreEvent{})
- mux.Post(vm.Logs{&vm.Log{}})
- mux.Post(core.RemovedLogsEvent{Logs: vm.Logs{&vm.Log{}}})
- mux.Post(core.PendingLogsEvent{Logs: vm.Logs{&vm.Log{}}})
-
- const dura = 5 * time.Second
- failTimer := time.NewTimer(dura)
- select {
- case <-blockDone:
- case <-failTimer.C:
- t.Error("block filter failed to trigger (timeout)")
+ // raise events
+ time.Sleep(1 * time.Second)
+ if err = mux.Post(allLogs); err != nil {
+ t.Fatal(err)
}
- failTimer.Reset(dura)
- select {
- case <-txDone:
- case <-failTimer.C:
- t.Error("transaction filter failed to trigger (timeout)")
+ for i, tt := range testCases {
+ var fetched []Log
+ for { // fetch all expected logs
+ fetched = append(fetched, api.GetFilterChanges(tt.id).([]Log)...)
+ if len(fetched) >= len(tt.expected) {
+ break
+ }
+
+ time.Sleep(100 * time.Millisecond)
+ }
+
+ if len(fetched) != len(tt.expected) {
+ t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
+ return
+ }
+
+ for l := range fetched {
+ if fetched[l].Removed {
+ t.Errorf("expected log not to be removed for log %d in case %d", l, i)
+ }
+ if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
+ t.Errorf("invalid log on index %d for case %d", l, i)
+ }
+
+ }
}
+}
- failTimer.Reset(dura)
- select {
- case <-logDone:
- case <-failTimer.C:
- t.Error("log filter failed to trigger (timeout)")
+// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux.
+func TestPendingLogsSubscription(t *testing.T) {
+ t.Parallel()
+
+ var (
+ firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
+ secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
+ thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
+ notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
+ firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
+ secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
+ thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333")
+ forthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
+ notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
+
+ allLogs = []core.PendingLogsEvent{
+ core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0)}},
+ core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1)}},
+ core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 2)}},
+ core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3)}},
+ core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 4)}},
+ core.PendingLogsEvent{Logs: vm.Logs{
+ vm.NewLog(thirdAddress, []common.Hash{firstTopic}, []byte(""), 5),
+ vm.NewLog(thirdAddress, []common.Hash{thirdTopic}, []byte(""), 5),
+ vm.NewLog(thirdAddress, []common.Hash{forthTopic}, []byte(""), 5),
+ vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 5),
+ }},
+ }
+
+ convertLogs = func(pl []core.PendingLogsEvent) vm.Logs {
+ var logs vm.Logs
+ for _, l := range pl {
+ logs = append(logs, l.Logs...)
+ }
+ return logs
+ }
+
+ testCases = []struct {
+ crit FilterCriteria
+ expected vm.Logs
+ c chan []Log
+ sub *Subscription
+ }{
+ // match all
+ {FilterCriteria{}, convertLogs(allLogs), nil, nil},
+ // match none due to no matching addresses
+ {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{[]common.Hash{}}}, vm.Logs{}, nil, nil},
+ // match logs based on addresses, ignore topics
+ {FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
+ // match none due to no matching topics (match with address)
+ {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, nil, nil},
+ // match logs based on addresses and topics
+ {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil},
+ // match logs based on multiple addresses and "or" topics
+ {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil},
+ // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes
+ {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
+ // multiple pending logs, should match only 2 topics from the logs in block 5
+ {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, forthTopic}}}, vm.Logs{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
+ }
+ )
+
+ // create all subscriptions, this ensures all subscriptions are created before the events are posted.
+ // on slow machines this could otherwise lead to missing events when the subscription is created after
+ // (some) events are posted.
+ for i := range testCases {
+ testCases[i].c = make(chan []Log)
+ testCases[i].sub = api.events.SubscribePendingLogs(testCases[i].crit, testCases[i].c)
}
- failTimer.Reset(dura)
- select {
- case <-removedLogDone:
- case <-failTimer.C:
- t.Error("removed log filter failed to trigger (timeout)")
+ for n, test := range testCases {
+ i := n
+ tt := test
+ go func() {
+ var fetched []Log
+ fetchLoop:
+ for {
+ logs := <-tt.c
+ fetched = append(fetched, logs...)
+ if len(fetched) >= len(tt.expected) {
+ break fetchLoop
+ }
+ }
+
+ if len(fetched) != len(tt.expected) {
+ t.Fatalf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
+ }
+
+ for l := range fetched {
+ if fetched[l].Removed {
+ t.Errorf("expected log not to be removed for log %d in case %d", l, i)
+ }
+ if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
+ t.Errorf("invalid log on index %d for case %d", l, i)
+ }
+ }
+ }()
}
- failTimer.Reset(dura)
- select {
- case <-pendingLogDone:
- case <-failTimer.C:
- t.Error("pending log filter failed to trigger (timeout)")
+ // raise events
+ time.Sleep(1 * time.Second)
+ for _, l := range allLogs {
+ if err := mux.Post(l); err != nil {
+ t.Fatal(err)
+ }
}
}
diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go
index 88bacc45b..e1729d1d2 100644
--- a/internal/ethapi/api.go
+++ b/internal/ethapi/api.go
@@ -24,7 +24,6 @@ import (
"fmt"
"math/big"
"strings"
- "sync"
"time"
"github.com/ethereum/ethash"
@@ -345,37 +344,12 @@ func (s *PrivateAccountAPI) SignAndSendTransaction(ctx context.Context, args Sen
// PublicBlockChainAPI provides an API to access the Ethereum blockchain.
// It offers only methods that operate on public data that is freely available to anyone.
type PublicBlockChainAPI struct {
- b Backend
- muNewBlockSubscriptions sync.Mutex // protects newBlocksSubscriptions
- newBlockSubscriptions map[string]func(core.ChainEvent) error // callbacks for new block subscriptions
+ b Backend
}
// NewPublicBlockChainAPI creates a new Etheruem blockchain API.
func NewPublicBlockChainAPI(b Backend) *PublicBlockChainAPI {
- api := &PublicBlockChainAPI{
- b: b,
- newBlockSubscriptions: make(map[string]func(core.ChainEvent) error),
- }
-
- go api.subscriptionLoop()
-
- return api
-}
-
-// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions.
-func (s *PublicBlockChainAPI) subscriptionLoop() {
- sub := s.b.EventMux().Subscribe(core.ChainEvent{})
- for event := range sub.Chan() {
- if chainEvent, ok := event.Data.(core.ChainEvent); ok {
- s.muNewBlockSubscriptions.Lock()
- for id, notifyOf := range s.newBlockSubscriptions {
- if notifyOf(chainEvent) == rpc.ErrNotificationNotFound {
- delete(s.newBlockSubscriptions, id)
- }
- }
- s.muNewBlockSubscriptions.Unlock()
- }
- }
+ return &PublicBlockChainAPI{b}
}
// BlockNumber returns the block number of the chain head.
@@ -470,45 +444,6 @@ func (s *PublicBlockChainAPI) GetUncleCountByBlockHash(ctx context.Context, bloc
return nil
}
-// NewBlocksArgs allows the user to specify if the returned block should include transactions and in which format.
-type NewBlocksArgs struct {
- IncludeTransactions bool `json:"includeTransactions"`
- TransactionDetails bool `json:"transactionDetails"`
-}
-
-// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows
-// the caller to specify whether the output should contain transactions and in what format.
-func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) {
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return nil, rpc.ErrNotificationsUnsupported
- }
-
- // create a subscription that will remove itself when unsubscribed/cancelled
- subscription, err := notifier.NewSubscription(func(subId string) {
- s.muNewBlockSubscriptions.Lock()
- delete(s.newBlockSubscriptions, subId)
- s.muNewBlockSubscriptions.Unlock()
- })
-
- if err != nil {
- return nil, err
- }
-
- // add a callback that is called on chain events which will format the block and notify the client
- s.muNewBlockSubscriptions.Lock()
- s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error {
- notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails)
- if err == nil {
- return subscription.Notify(notification)
- }
- glog.V(logger.Warn).Info("unable to format block %v\n", err)
- return nil
- }
- s.muNewBlockSubscriptions.Unlock()
- return subscription, nil
-}
-
// GetCode returns the code stored at the given address in the state for the given block number.
func (s *PublicBlockChainAPI) GetCode(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (string, error) {
state, _, err := s.b.StateAndHeaderByNumber(blockNr)
@@ -867,40 +802,12 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err
// PublicTransactionPoolAPI exposes methods for the RPC interface
type PublicTransactionPoolAPI struct {
- b Backend
- muPendingTxSubs sync.Mutex
- pendingTxSubs map[string]rpc.Subscription
+ b Backend
}
// NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI {
- api := &PublicTransactionPoolAPI{
- b: b,
- pendingTxSubs: make(map[string]rpc.Subscription),
- }
-
- go api.subscriptionLoop()
-
- return api
-}
-
-// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions.
-func (s *PublicTransactionPoolAPI) subscriptionLoop() {
- sub := s.b.EventMux().Subscribe(core.TxPreEvent{})
- for event := range sub.Chan() {
- tx := event.Data.(core.TxPreEvent)
- if from, err := tx.Tx.FromFrontier(); err == nil {
- if s.b.AccountManager().HasAddress(from) {
- s.muPendingTxSubs.Lock()
- for id, sub := range s.pendingTxSubs {
- if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound {
- delete(s.pendingTxSubs, id)
- }
- }
- s.muPendingTxSubs.Unlock()
- }
- }
- }
+ return &PublicTransactionPoolAPI{b}
}
func getTransaction(chainDb ethdb.Database, b Backend, txHash common.Hash) (*types.Transaction, bool, error) {
@@ -1353,31 +1260,6 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
return transactions
}
-// NewPendingTransactions creates a subscription that is triggered each time a transaction enters the transaction pool
-// and is send from one of the transactions this nodes manages.
-func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) {
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return nil, rpc.ErrNotificationsUnsupported
- }
-
- subscription, err := notifier.NewSubscription(func(id string) {
- s.muPendingTxSubs.Lock()
- delete(s.pendingTxSubs, id)
- s.muPendingTxSubs.Unlock()
- })
-
- if err != nil {
- return nil, err
- }
-
- s.muPendingTxSubs.Lock()
- s.pendingTxSubs[subscription.ID()] = subscription
- s.muPendingTxSubs.Unlock()
-
- return subscription, nil
-}
-
// Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the
// pool and reinsert it with the new gas price and limit.
func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, tx *Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) {
diff --git a/rpc/notification.go b/rpc/notification.go
deleted file mode 100644
index 875433071..000000000
--- a/rpc/notification.go
+++ /dev/null
@@ -1,297 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package rpc
-
-import (
- "errors"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "golang.org/x/net/context"
-)
-
-var (
- // ErrNotificationsUnsupported is returned when the connection doesn't support notifications
- ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport")
-
- // ErrNotificationNotFound is returned when the notification for the given id is not found
- ErrNotificationNotFound = errors.New("notification not found")
-
- // errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed)
- errNotifierStopped = errors.New("unable to send notification")
-
- // errNotificationQueueFull is returns when there are too many notifications in the queue
- errNotificationQueueFull = errors.New("too many pending notifications")
-)
-
-// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered
-// notifications that might be pending in the internal queue.
-var unsubSignal = new(struct{})
-
-// UnsubscribeCallback defines a callback that is called when a subcription ends.
-// It receives the subscription id as argument.
-type UnsubscribeCallback func(id string)
-
-// notification is a helper object that holds event data for a subscription
-type notification struct {
- sub *bufferedSubscription // subscription id
- data interface{} // event data
-}
-
-// A Notifier type describes the interface for objects that can send create subscriptions
-type Notifier interface {
- // Create a new subscription. The given callback is called when this subscription
- // is cancelled (e.g. client send an unsubscribe, connection closed).
- NewSubscription(UnsubscribeCallback) (Subscription, error)
- // Cancel subscription
- Unsubscribe(id string) error
-}
-
-type notifierKey struct{}
-
-// NotifierFromContext returns the Notifier value stored in ctx, if any.
-func NotifierFromContext(ctx context.Context) (Notifier, bool) {
- n, ok := ctx.Value(notifierKey{}).(Notifier)
- return n, ok
-}
-
-// Subscription defines the interface for objects that can notify subscribers
-type Subscription interface {
- // Inform client of an event
- Notify(data interface{}) error
- // Unique identifier
- ID() string
- // Cancel subscription
- Cancel() error
-}
-
-// bufferedSubscription is a subscription that uses a bufferedNotifier to send
-// notifications to subscribers.
-type bufferedSubscription struct {
- id string
- unsubOnce sync.Once // call unsub method once
- unsub UnsubscribeCallback // called on Unsubscribed
- notifier *bufferedNotifier // forward notifications to
- pending chan interface{} // closed when active
- flushed chan interface{} // closed when all buffered notifications are send
- lastNotification time.Time // last time a notification was send
-}
-
-// ID returns the subscription identifier that the client uses to refer to this instance.
-func (s *bufferedSubscription) ID() string {
- return s.id
-}
-
-// Cancel informs the notifier that this subscription is cancelled by the API
-func (s *bufferedSubscription) Cancel() error {
- return s.notifier.Unsubscribe(s.id)
-}
-
-// Notify the subscriber of a particular event.
-func (s *bufferedSubscription) Notify(data interface{}) error {
- return s.notifier.send(s.id, data)
-}
-
-// bufferedNotifier is a notifier that queues notifications in an internal queue and
-// send them as fast as possible to the client from this queue. It will stop if the
-// queue grows past a given size.
-type bufferedNotifier struct {
- codec ServerCodec // underlying connection
- mu sync.Mutex // guard internal state
- subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec
- queueSize int // max number of items in queue
- queue chan *notification // notification queue
- stopped bool // indication if this notifier is ordered to stop
-}
-
-// newBufferedNotifier returns a notifier that queues notifications in an internal queue
-// from which notifications are send as fast as possible to the client. If the queue size
-// limit is reached (client is unable to keep up) it will stop and closes the codec.
-func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier {
- notifier := &bufferedNotifier{
- codec: codec,
- subscriptions: make(map[string]*bufferedSubscription),
- queue: make(chan *notification, size),
- queueSize: size,
- }
-
- go notifier.run()
-
- return notifier
-}
-
-// NewSubscription creates a new subscription that forwards events to this instance internal
-// queue. The given callback is called when the subscription is unsubscribed/cancelled.
-func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) {
- id, err := newSubscriptionID()
- if err != nil {
- return nil, err
- }
-
- n.mu.Lock()
- defer n.mu.Unlock()
-
- if n.stopped {
- return nil, errNotifierStopped
- }
-
- sub := &bufferedSubscription{
- id: id,
- unsub: callback,
- notifier: n,
- pending: make(chan interface{}),
- flushed: make(chan interface{}),
- lastNotification: time.Now(),
- }
-
- n.subscriptions[id] = sub
-
- return sub, nil
-}
-
-// Remove the given subscription. If subscription is not found notificationNotFoundErr is returned.
-func (n *bufferedNotifier) Unsubscribe(subid string) error {
- n.mu.Lock()
- sub, found := n.subscriptions[subid]
- n.mu.Unlock()
-
- if found {
- // send the unsubscribe signal, this will cause the notifier not to accept new events
- // for this subscription and will close the flushed channel after the last (buffered)
- // notification was send to the client.
- if err := n.send(subid, unsubSignal); err != nil {
- return err
- }
-
- // wait for confirmation that all (buffered) events are send for this subscription.
- // this ensures that the unsubscribe method response is not send before all buffered
- // events for this subscription are send.
- <-sub.flushed
-
- return nil
- }
-
- return ErrNotificationNotFound
-}
-
-// Send enques the given data for the subscription with public ID on the internal queue. t returns
-// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it
-// will remove the subscription with the given id from the subscription collection.
-func (n *bufferedNotifier) send(id string, data interface{}) error {
- n.mu.Lock()
- defer n.mu.Unlock()
-
- if n.stopped {
- return errNotifierStopped
- }
-
- var (
- subscription *bufferedSubscription
- found bool
- )
-
- // check if subscription is associated with this connection, it might be cancelled
- // (subscribe/connection closed)
- if subscription, found = n.subscriptions[id]; !found {
- glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id)
- return ErrNotificationNotFound
- }
-
- // received the unsubscribe signal. Add it to the queue to make sure any pending notifications
- // for this subscription are send. When the run loop receives this singal it will signal that
- // all pending subscriptions are flushed and that the confirmation of the unsubscribe can be
- // send to the user. Remove the subscriptions to make sure new notifications are not accepted.
- if data == unsubSignal {
- delete(n.subscriptions, id)
- if subscription.unsub != nil {
- subscription.unsubOnce.Do(func() { subscription.unsub(id) })
- }
- }
-
- subscription.lastNotification = time.Now()
-
- if len(n.queue) >= n.queueSize {
- glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection")
- n.codec.Close()
- return errNotificationQueueFull
- }
-
- n.queue <- &notification{subscription, data}
- return nil
-}
-
-// run reads notifications from the internal queue and sends them to the client. In case of an
-// error, or when the codec is closed it will cancel all active subscriptions and returns.
-func (n *bufferedNotifier) run() {
- defer func() {
- n.mu.Lock()
- defer n.mu.Unlock()
-
- n.stopped = true
- close(n.queue)
-
- // on exit call unsubscribe callback
- for id, sub := range n.subscriptions {
- if sub.unsub != nil {
- sub.unsubOnce.Do(func() { sub.unsub(id) })
- }
- close(sub.flushed)
- delete(n.subscriptions, id)
- }
- }()
-
- for {
- select {
- case notification := <-n.queue:
- // It can happen that an event is raised before the RPC server was able to send the sub
- // id to the client. Therefore subscriptions are marked as pending until the sub id was
- // send. The RPC server will activate the subscription by closing the pending chan.
- <-notification.sub.pending
-
- if notification.data == unsubSignal {
- // unsubSignal is the last accepted message for this subscription. Raise the signal
- // that all buffered notifications are sent by closing the flushed channel. This
- // indicates that the response for the unsubscribe can be send to the client.
- close(notification.sub.flushed)
- } else {
- msg := n.codec.CreateNotification(notification.sub.id, notification.data)
- if err := n.codec.Write(msg); err != nil {
- n.codec.Close()
- // unable to send notification to client, unsubscribe all subscriptions
- glog.V(logger.Warn).Infof("unable to send notification - %v\n", err)
- return
- }
- }
- case <-n.codec.Closed(): // connection was closed
- glog.V(logger.Debug).Infoln("codec closed, stop subscriptions")
- return
- }
- }
-}
-
-// Marks the subscription as active. This will causes the notifications for this subscription to be
-// forwarded to the client.
-func (n *bufferedNotifier) activate(subid string) {
- n.mu.Lock()
- defer n.mu.Unlock()
-
- if sub, found := n.subscriptions[subid]; found {
- close(sub.pending)
- }
-}
diff --git a/rpc/server.go b/rpc/server.go
index 040805a5c..996c63700 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -166,7 +166,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
// to send notification to clients. It is thight to the codec/connection. If the
// connection is closed the notifier will stop and cancels all active subscriptions.
if options&OptionSubscriptions == OptionSubscriptions {
- ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize))
+ ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
}
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
@@ -247,7 +247,7 @@ func (s *Server) Stop() {
}
// createSubscription will call the subscription callback and returns the subscription id or error.
-func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) {
+func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
// subscription have as first argument the context following optional arguments
args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
args = append(args, req.args...)
@@ -257,7 +257,7 @@ func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *ser
return "", reply[1].Interface().(error)
}
- return reply[0].Interface().(Subscription).ID(), nil
+ return reply[0].Interface().(*Subscription).ID, nil
}
// handle executes a request and returns the response from the callback.
@@ -273,8 +273,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
}
- subid := req.args[0].String()
- if err := notifier.Unsubscribe(subid); err != nil {
+ subid := ID(req.args[0].String())
+ if err := notifier.unsubscribe(subid); err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}
@@ -292,7 +292,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
// active the subscription after the sub id was successfully sent to the client
activateSub := func() {
notifier, _ := NotifierFromContext(ctx)
- notifier.(*bufferedNotifier).activate(subid)
+ notifier.activate(subid)
}
return codec.CreateResponse(req.id, subid), activateSub
diff --git a/rpc/server_test.go b/rpc/server_test.go
index e6840bde4..c3c88fab7 100644
--- a/rpc/server_test.go
+++ b/rpc/server_test.go
@@ -72,7 +72,7 @@ func (s *Service) InvalidRets3() (string, string, error) {
return "", "", nil
}
-func (s *Service) Subscription(ctx context.Context) (Subscription, error) {
+func (s *Service) Subscription(ctx context.Context) (*Subscription, error) {
return nil, nil
}
diff --git a/rpc/subscription.go b/rpc/subscription.go
new file mode 100644
index 000000000..863d34b20
--- /dev/null
+++ b/rpc/subscription.go
@@ -0,0 +1,135 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "errors"
+ "sync"
+
+ "golang.org/x/net/context"
+)
+
+var (
+ // ErrNotificationsUnsupported is returned when the connection doesn't support notifications
+ ErrNotificationsUnsupported = errors.New("notifications not supported")
+ // ErrNotificationNotFound is returned when the notification for the given id is not found
+ ErrSubscriptionNotFound = errors.New("subscription not found")
+)
+
+// ID defines a psuedo random number that is used to identify RPC subscriptions.
+type ID string
+
+// a Subscription is created by a notifier and tight to that notifier. The client can use
+// this subscription to wait for an unsubscribe request for the client, see Err().
+type Subscription struct {
+ ID ID
+ err chan error // closed on unsubscribe
+}
+
+// Err returns a channel that is closed when the client send an unsubscribe request.
+func (s *Subscription) Err() <-chan error {
+ return s.err
+}
+
+// notifierKey is used to store a notifier within the connection context.
+type notifierKey struct{}
+
+// Notifier is tight to a RPC connection that supports subscriptions.
+// Server callbacks use the notifier to send notifications.
+type Notifier struct {
+ codec ServerCodec
+ subMu sync.RWMutex // guards active and inactive maps
+ stopped bool
+ active map[ID]*Subscription
+ inactive map[ID]*Subscription
+}
+
+// newNotifier creates a new notifier that can be used to send subscription
+// notifications to the client.
+func newNotifier(codec ServerCodec) *Notifier {
+ return &Notifier{
+ codec: codec,
+ active: make(map[ID]*Subscription),
+ inactive: make(map[ID]*Subscription),
+ }
+}
+
+// NotifierFromContext returns the Notifier value stored in ctx, if any.
+func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
+ n, ok := ctx.Value(notifierKey{}).(*Notifier)
+ return n, ok
+}
+
+// CreateSubscription returns a new subscription that is coupled to the
+// RPC connection. By default subscriptions are inactive and notifications
+// are dropped until the subscription is marked as active. This is done
+// by the RPC server after the subscription ID is send to the client.
+func (n *Notifier) CreateSubscription() *Subscription {
+ s := &Subscription{NewID(), make(chan error)}
+ n.subMu.Lock()
+ n.inactive[s.ID] = s
+ n.subMu.Unlock()
+ return s
+}
+
+// Notify sends a notification to the client with the given data as payload.
+// If an error occurs the RPC connection is closed and the error is returned.
+func (n *Notifier) Notify(id ID, data interface{}) error {
+ n.subMu.RLock()
+ defer n.subMu.RUnlock()
+
+ _, active := n.active[id]
+ if active {
+ notification := n.codec.CreateNotification(string(id), data)
+ if err := n.codec.Write(notification); err != nil {
+ n.codec.Close()
+ return err
+ }
+ }
+ return nil
+}
+
+// Closed returns a channel that is closed when the RPC connection is closed.
+func (n *Notifier) Closed() <-chan interface{} {
+ return n.codec.Closed()
+}
+
+// unsubscribe a subscription.
+// If the subscription could not be found ErrSubscriptionNotFound is returned.
+func (n *Notifier) unsubscribe(id ID) error {
+ n.subMu.Lock()
+ defer n.subMu.Unlock()
+ if s, found := n.active[id]; found {
+ close(s.err)
+ delete(n.active, id)
+ return nil
+ }
+ return ErrSubscriptionNotFound
+}
+
+// activate enables a subscription. Until a subscription is enabled all
+// notifications are dropped. This method is called by the RPC server after
+// the subscription ID was sent to client. This prevents notifications being
+// send to the client before the subscription ID is send to the client.
+func (n *Notifier) activate(id ID) {
+ n.subMu.Lock()
+ defer n.subMu.Unlock()
+ if sub, found := n.inactive[id]; found {
+ n.active[id] = sub
+ delete(n.inactive, id)
+ }
+}
diff --git a/rpc/notification_test.go b/rpc/subscription_test.go
index 52352848c..8bb341694 100644
--- a/rpc/notification_test.go
+++ b/rpc/subscription_test.go
@@ -50,7 +50,7 @@ func (s *NotificationTestService) Unsubscribe(subid string) {
s.mu.Unlock()
}
-func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
+func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) {
notifier, supported := NotifierFromContext(ctx)
if !supported {
return nil, ErrNotificationsUnsupported
@@ -59,17 +59,29 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
// by explicitly creating an subscription we make sure that the subscription id is send back to the client
// before the first subscription.Notify is called. Otherwise the events might be send before the response
// for the eth_subscribe method.
- subscription, err := notifier.NewSubscription(s.Unsubscribe)
- if err != nil {
- return nil, err
- }
+ subscription := notifier.CreateSubscription()
go func() {
+ // test expects n events, if we begin sending event immediatly some events
+ // will probably be dropped since the subscription ID might not be send to
+ // the client.
+ time.Sleep(5 * time.Second)
for i := 0; i < n; i++ {
- if err := subscription.Notify(val + i); err != nil {
+ if err := notifier.Notify(subscription.ID, val+i); err != nil {
return
}
}
+
+ select {
+ case <-notifier.Closed():
+ s.mu.Lock()
+ s.unsubscribed = true
+ s.mu.Unlock()
+ case <-subscription.Err():
+ s.mu.Lock()
+ s.unsubscribed = true
+ s.mu.Unlock()
+ }
}()
return subscription, nil
@@ -77,7 +89,7 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
// HangSubscription blocks on s.unblockHangSubscription before
// sending anything.
-func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) {
+func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) {
notifier, supported := NotifierFromContext(ctx)
if !supported {
return nil, ErrNotificationsUnsupported
@@ -85,12 +97,10 @@ func (s *NotificationTestService) HangSubscription(ctx context.Context, val int)
s.gotHangSubscriptionReq <- struct{}{}
<-s.unblockHangSubscription
- subscription, err := notifier.NewSubscription(s.Unsubscribe)
- if err != nil {
- return nil, err
- }
+ subscription := notifier.CreateSubscription()
+
go func() {
- subscription.Notify(val)
+ notifier.Notify(subscription.ID, val)
}()
return subscription, nil
}
diff --git a/rpc/types.go b/rpc/types.go
index 2a7268ad8..89c5b5bc9 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -269,6 +269,6 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
return fmt.Errorf("blocknumber not in range [%d, %d]", earliestBlockNumber, maxBlockNumber)
}
-func (bn *BlockNumber) Int64() int64 {
- return (int64)(*bn)
+func (bn BlockNumber) Int64() int64 {
+ return (int64)(bn)
}
diff --git a/rpc/utils.go b/rpc/utils.go
index 1ac6698f5..b590ba62f 100644
--- a/rpc/utils.go
+++ b/rpc/utils.go
@@ -17,17 +17,26 @@
package rpc
import (
- "crypto/rand"
+ "bufio"
+ crand "crypto/rand"
+ "encoding/binary"
"encoding/hex"
- "errors"
"math/big"
+ "math/rand"
"reflect"
+ "sync"
+ "time"
"unicode"
"unicode/utf8"
"golang.org/x/net/context"
)
+var (
+ subscriptionIDGenMu sync.Mutex
+ subscriptionIDGen = idGenerator()
+)
+
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
@@ -218,11 +227,28 @@ METHODS:
return callbacks, subscriptions
}
-func newSubscriptionID() (string, error) {
- var subid [16]byte
- n, _ := rand.Read(subid[:])
- if n != 16 {
- return "", errors.New("Unable to generate subscription id")
+// idGenerator helper utility that generates a (pseudo) random sequence of
+// bytes that are used to generate identifiers.
+func idGenerator() *rand.Rand {
+ if seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)); err == nil {
+ return rand.New(rand.NewSource(seed))
+ }
+ return rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
+}
+
+// NewID generates a identifier that can be used as an identifier in the RPC interface.
+// e.g. filter and subscription identifier.
+func NewID() ID {
+ subscriptionIDGenMu.Lock()
+ defer subscriptionIDGenMu.Unlock()
+
+ id := make([]byte, 16)
+ for i := 0; i < len(id); i += 7 {
+ val := subscriptionIDGen.Int63()
+ for j := 0; i+j < len(id) && j < 7; j++ {
+ id[i+j] = byte(val)
+ val >>= 8
+ }
}
- return "0x" + hex.EncodeToString(subid[:]), nil
+ return ID("0x" + hex.EncodeToString(id))
}