aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2016-04-05 15:43:32 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2016-04-05 15:43:32 +0800
commited92f116f72646e73613afc2f2e7b83472a61434 (patch)
tree6c6d3cf414e21ee37d93e30e782ec028ff0144f8 /eth
parent6a185531d2cd2003bb4352c391f9dca023894d5a (diff)
parentf7328c5ecbd1076582a71ef7bf436485f3868b1f (diff)
downloaddexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.gz
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.bz2
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.lz
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.xz
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.tar.zst
dexon-ed92f116f72646e73613afc2f2e7b83472a61434.zip
Merge pull request #2407 from bas-vk/rpc-notifications
RPC pub sub
Diffstat (limited to 'eth')
-rw-r--r--eth/api.go206
-rw-r--r--eth/backend.go2
-rw-r--r--eth/downloader/api.go75
-rw-r--r--eth/filters/api.go79
4 files changed, 271 insertions, 91 deletions
diff --git a/eth/api.go b/eth/api.go
index a257639ba..676191fc2 100644
--- a/eth/api.go
+++ b/eth/api.go
@@ -28,6 +28,8 @@ import (
"sync"
"time"
+ "golang.org/x/net/context"
+
"github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
@@ -457,16 +459,46 @@ func (s *PrivateAccountAPI) LockAccount(addr common.Address) bool {
// It offers only methods that operate on public data that is freely available to anyone.
type PublicBlockChainAPI struct {
config *core.ChainConfig
- bc *core.BlockChain
- chainDb ethdb.Database
- eventMux *event.TypeMux
- am *accounts.Manager
- miner *miner.Miner
+ bc *core.BlockChain
+ chainDb ethdb.Database
+ eventMux *event.TypeMux
+ muNewBlockSubscriptions sync.Mutex // protects newBlocksSubscriptions
+ newBlockSubscriptions map[string]func(core.ChainEvent) error // callbacks for new block subscriptions
+ am *accounts.Manager
+ miner *miner.Miner
}
// NewPublicBlockChainAPI creates a new Etheruem blockchain API.
func NewPublicBlockChainAPI(config *core.ChainConfig, bc *core.BlockChain, m *miner.Miner, chainDb ethdb.Database, eventMux *event.TypeMux, am *accounts.Manager) *PublicBlockChainAPI {
- return &PublicBlockChainAPI{config: config, bc: bc, miner: m, chainDb: chainDb, eventMux: eventMux, am: am}
+ api := &PublicBlockChainAPI{
+ config: config,
+ bc: bc,
+ miner: m,
+ chainDb: chainDb,
+ eventMux: eventMux,
+ am: am,
+ newBlockSubscriptions: make(map[string]func(core.ChainEvent) error),
+ }
+
+ go api.subscriptionLoop()
+
+ return api
+}
+
+// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions.
+func (s *PublicBlockChainAPI) subscriptionLoop() {
+ sub := s.eventMux.Subscribe(core.ChainEvent{})
+ for event := range sub.Chan() {
+ if chainEvent, ok := event.Data.(core.ChainEvent); ok {
+ s.muNewBlockSubscriptions.Lock()
+ for id, notifyOf := range s.newBlockSubscriptions {
+ if notifyOf(chainEvent) == rpc.ErrNotificationNotFound {
+ delete(s.newBlockSubscriptions, id)
+ }
+ }
+ s.muNewBlockSubscriptions.Unlock()
+ }
+ }
}
// BlockNumber returns the block number of the chain head.
@@ -564,20 +596,36 @@ type NewBlocksArgs struct {
// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows
// the caller to specify whether the output should contain transactions and in what format.
-func (s *PublicBlockChainAPI) NewBlocks(args NewBlocksArgs) (rpc.Subscription, error) {
- sub := s.eventMux.Subscribe(core.ChainEvent{})
+func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) {
+ notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
- output := func(rawBlock interface{}) interface{} {
- if event, ok := rawBlock.(core.ChainEvent); ok {
- notification, err := s.rpcOutputBlock(event.Block, args.IncludeTransactions, args.TransactionDetails)
- if err == nil {
- return notification
- }
+ // create a subscription that will remove itself when unsubscribed/cancelled
+ subscription, err := notifier.NewSubscription(func(subId string) {
+ s.muNewBlockSubscriptions.Lock()
+ delete(s.newBlockSubscriptions, subId)
+ s.muNewBlockSubscriptions.Unlock()
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ // add a callback that is called on chain events which will format the block and notify the client
+ s.muNewBlockSubscriptions.Lock()
+ s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error {
+ if notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails); err == nil {
+ return subscription.Notify(notification)
+ } else {
+ glog.V(logger.Warn).Info("unable to format block %v\n", err)
}
- return rawBlock
+ return nil
}
+ s.muNewBlockSubscriptions.Unlock()
- return rpc.NewSubscriptionWithOutputFormat(sub, output), nil
+ return subscription, nil
}
// GetCode returns the code stored at the given address in the state for the given block number.
@@ -821,26 +869,75 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err
// PublicTransactionPoolAPI exposes methods for the RPC interface
type PublicTransactionPoolAPI struct {
- eventMux *event.TypeMux
- chainDb ethdb.Database
- gpo *GasPriceOracle
- bc *core.BlockChain
- miner *miner.Miner
- am *accounts.Manager
- txPool *core.TxPool
- txMu sync.Mutex
+ eventMux *event.TypeMux
+ chainDb ethdb.Database
+ gpo *GasPriceOracle
+ bc *core.BlockChain
+ miner *miner.Miner
+ am *accounts.Manager
+ txPool *core.TxPool
+ txMu sync.Mutex
+ muPendingTxSubs sync.Mutex
+ pendingTxSubs map[string]rpc.Subscription
}
// NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
func NewPublicTransactionPoolAPI(e *Ethereum) *PublicTransactionPoolAPI {
- return &PublicTransactionPoolAPI{
- eventMux: e.EventMux(),
- gpo: NewGasPriceOracle(e),
- chainDb: e.ChainDb(),
- bc: e.BlockChain(),
- am: e.AccountManager(),
- txPool: e.TxPool(),
- miner: e.Miner(),
+ api := &PublicTransactionPoolAPI{
+ eventMux: e.EventMux(),
+ gpo: NewGasPriceOracle(e),
+ chainDb: e.ChainDb(),
+ bc: e.BlockChain(),
+ am: e.AccountManager(),
+ txPool: e.TxPool(),
+ miner: e.Miner(),
+ pendingTxSubs: make(map[string]rpc.Subscription),
+ }
+
+ go api.subscriptionLoop()
+
+ return api
+}
+
+// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions.
+func (s *PublicTransactionPoolAPI) subscriptionLoop() {
+ sub := s.eventMux.Subscribe(core.TxPreEvent{})
+ accountTimeout := time.NewTicker(10 * time.Second)
+
+ // only publish pending tx signed by one of the accounts in the node
+ accountSet := set.New()
+ accounts, _ := s.am.Accounts()
+ for _, acc := range accounts {
+ accountSet.Add(acc.Address)
+ }
+
+ for {
+ select {
+ case event := <-sub.Chan():
+ if event == nil {
+ continue
+ }
+ tx := event.Data.(core.TxPreEvent)
+ if from, err := tx.Tx.FromFrontier(); err == nil {
+ if accountSet.Has(from) {
+ s.muPendingTxSubs.Lock()
+ for id, sub := range s.pendingTxSubs {
+ if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound {
+ delete(s.pendingTxSubs, id)
+ }
+ }
+ s.muPendingTxSubs.Unlock()
+ }
+ }
+ case <-accountTimeout.C:
+ // refresh account list when accounts are added/removed from the node.
+ if accounts, err := s.am.Accounts(); err == nil {
+ accountSet.Clear()
+ for _, acc := range accounts {
+ accountSet.Add(acc.Address)
+ }
+ }
+ }
}
}
@@ -1275,40 +1372,27 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, err
// NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool
// and is send from one of the transactions this nodes manages.
-func (s *PublicTransactionPoolAPI) NewPendingTransactions() (rpc.Subscription, error) {
- sub := s.eventMux.Subscribe(core.TxPreEvent{})
-
- accounts, err := s.am.Accounts()
- if err != nil {
- return rpc.Subscription{}, err
+func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) {
+ notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
}
- accountSet := set.New()
- for _, account := range accounts {
- accountSet.Add(account.Address)
- }
- accountSetLastUpdates := time.Now()
- output := func(transaction interface{}) interface{} {
- if time.Since(accountSetLastUpdates) > (time.Duration(2) * time.Second) {
- if accounts, err = s.am.Accounts(); err != nil {
- accountSet.Clear()
- for _, account := range accounts {
- accountSet.Add(account.Address)
- }
- accountSetLastUpdates = time.Now()
- }
- }
+ subscription, err := notifier.NewSubscription(func(id string) {
+ s.muPendingTxSubs.Lock()
+ delete(s.pendingTxSubs, id)
+ s.muPendingTxSubs.Unlock()
+ })
- tx := transaction.(core.TxPreEvent)
- if from, err := tx.Tx.FromFrontier(); err == nil {
- if accountSet.Has(from) {
- return tx.Tx.Hash()
- }
- }
- return nil
+ if err != nil {
+ return nil, err
}
- return rpc.NewSubscriptionWithOutputFormat(sub, output), nil
+ s.muPendingTxSubs.Lock()
+ s.pendingTxSubs[subscription.ID()] = subscription
+ s.muPendingTxSubs.Unlock()
+
+ return subscription, nil
}
// Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the
diff --git a/eth/backend.go b/eth/backend.go
index 26af7ff91..20f516610 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -306,7 +306,7 @@ func (s *Ethereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Version: "1.0",
- Service: downloader.NewPublicDownloaderAPI(s.Downloader()),
+ Service: downloader.NewPublicDownloaderAPI(s.Downloader(), s.EventMux()),
Public: true,
}, {
Namespace: "miner",
diff --git a/eth/downloader/api.go b/eth/downloader/api.go
index 13d0ed46e..576b33f1d 100644
--- a/eth/downloader/api.go
+++ b/eth/downloader/api.go
@@ -17,18 +17,55 @@
package downloader
import (
+ "sync"
+
+ "golang.org/x/net/context"
+
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
// PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
// It offers only methods that operates on data that can be available to anyone without security risks.
type PublicDownloaderAPI struct {
- d *Downloader
+ d *Downloader
+ mux *event.TypeMux
+ muSyncSubscriptions sync.Mutex
+ syncSubscriptions map[string]rpc.Subscription
}
// NewPublicDownloaderAPI create a new PublicDownloaderAPI.
-func NewPublicDownloaderAPI(d *Downloader) *PublicDownloaderAPI {
- return &PublicDownloaderAPI{d}
+func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
+ api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)}
+
+ go api.run()
+
+ return api
+}
+
+func (api *PublicDownloaderAPI) run() {
+ sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+
+ for event := range sub.Chan() {
+ var notification interface{}
+
+ switch event.Data.(type) {
+ case StartEvent:
+ result := &SyncingResult{Syncing: true}
+ result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress()
+ notification = result
+ case DoneEvent, FailedEvent:
+ notification = false
+ }
+
+ api.muSyncSubscriptions.Lock()
+ for id, sub := range api.syncSubscriptions {
+ if sub.Notify(notification) == rpc.ErrNotificationNotFound {
+ delete(api.syncSubscriptions, id)
+ }
+ }
+ api.muSyncSubscriptions.Unlock()
+ }
}
// Progress gives progress indications when the node is synchronising with the Ethereum network.
@@ -47,19 +84,25 @@ type SyncingResult struct {
}
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
-func (s *PublicDownloaderAPI) Syncing() (rpc.Subscription, error) {
- sub := s.d.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) {
+ notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
- output := func(event interface{}) interface{} {
- switch event.(type) {
- case StartEvent:
- result := &SyncingResult{Syncing: true}
- result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = s.d.Progress()
- return result
- case DoneEvent, FailedEvent:
- return false
- }
- return nil
+ subscription, err := notifier.NewSubscription(func(id string) {
+ api.muSyncSubscriptions.Lock()
+ delete(api.syncSubscriptions, id)
+ api.muSyncSubscriptions.Unlock()
+ })
+
+ if err != nil {
+ return nil, err
}
- return rpc.NewSubscriptionWithOutputFormat(sub, output), nil
+
+ api.muSyncSubscriptions.Lock()
+ api.syncSubscriptions[subscription.ID()] = subscription
+ api.muSyncSubscriptions.Unlock()
+
+ return subscription, nil
}
diff --git a/eth/filters/api.go b/eth/filters/api.go
index e6a1ce3ab..956660363 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -17,15 +17,13 @@
package filters
import (
- "sync"
- "time"
-
"crypto/rand"
"encoding/hex"
- "errors"
-
"encoding/json"
+ "errors"
"fmt"
+ "sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -33,6 +31,8 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
+
+ "golang.org/x/net/context"
)
var (
@@ -202,7 +202,7 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
}
// newLogFilter creates a new log filter.
-func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
+func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
s.logMu.Lock()
defer s.logMu.Unlock()
@@ -219,17 +219,70 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
filter.SetAddresses(addresses)
filter.SetTopics(topics)
filter.LogCallback = func(log *vm.Log, removed bool) {
- s.logMu.Lock()
- defer s.logMu.Unlock()
-
- if queue := s.logQueue[id]; queue != nil {
- queue.add(vmlog{log, removed})
+ if callback != nil {
+ callback(log, removed)
+ } else {
+ s.logMu.Lock()
+ defer s.logMu.Unlock()
+ if queue := s.logQueue[id]; queue != nil {
+ queue.add(vmlog{log, removed})
+ }
}
}
return id, nil
}
+func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
+ notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
+
+ var (
+ externalId string
+ subscription rpc.Subscription
+ err error
+ )
+
+ if externalId, err = newFilterId(); err != nil {
+ return nil, err
+ }
+
+ // uninstall filter when subscription is unsubscribed/cancelled
+ if subscription, err = notifier.NewSubscription(func(string) {
+ s.UninstallFilter(externalId)
+ }); err != nil {
+ return nil, err
+ }
+
+ notifySubscriber := func(log *vm.Log, removed bool) {
+ rpcLog := toRPCLogs(vm.Logs{log}, removed)
+ if err := subscription.Notify(rpcLog); err != nil {
+ subscription.Cancel()
+ }
+ }
+
+ // from and to block number are not used since subscriptions don't allow you to travel to "time"
+ var id int
+ if len(args.Addresses) > 0 {
+ id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
+ } else {
+ id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
+ }
+
+ if err != nil {
+ subscription.Cancel()
+ return nil, err
+ }
+
+ s.filterMapMu.Lock()
+ s.filterMapping[externalId] = id
+ s.filterMapMu.Unlock()
+
+ return subscription, err
+}
+
// NewFilterArgs represents a request to create a new filter.
type NewFilterArgs struct {
FromBlock rpc.BlockNumber
@@ -364,9 +417,9 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
var id int
if len(args.Addresses) > 0 {
- id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
} else {
- id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
}
if err != nil {
return "", err