aboutsummaryrefslogtreecommitdiffstats
path: root/internal/ethapi/api.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2016-08-17 23:14:42 +0800
committerGitHub <noreply@github.com>2016-08-17 23:14:42 +0800
commit3369783e0a3e0c06388cf59fddfd799811381a2b (patch)
treecb29e4550f63f3a763dd04b267261e354e56d7eb /internal/ethapi/api.go
parent3b39d4d1c15df2697284c3d7a61564f98ab45c70 (diff)
parent47ff8130124b479f1f051312eed50c33f0a38e6f (diff)
downloaddexon-3369783e0a3e0c06388cf59fddfd799811381a2b.tar
dexon-3369783e0a3e0c06388cf59fddfd799811381a2b.tar.gz
dexon-3369783e0a3e0c06388cf59fddfd799811381a2b.tar.bz2
dexon-3369783e0a3e0c06388cf59fddfd799811381a2b.tar.lz
dexon-3369783e0a3e0c06388cf59fddfd799811381a2b.tar.xz
dexon-3369783e0a3e0c06388cf59fddfd799811381a2b.tar.zst
dexon-3369783e0a3e0c06388cf59fddfd799811381a2b.zip
Merge pull request #2885 from bas-vk/subscriptions
rpc: refactor subscriptions and filters
Diffstat (limited to 'internal/ethapi/api.go')
-rw-r--r--internal/ethapi/api.go126
1 files changed, 4 insertions, 122 deletions
diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go
index 88bacc45b..e1729d1d2 100644
--- a/internal/ethapi/api.go
+++ b/internal/ethapi/api.go
@@ -24,7 +24,6 @@ import (
"fmt"
"math/big"
"strings"
- "sync"
"time"
"github.com/ethereum/ethash"
@@ -345,37 +344,12 @@ func (s *PrivateAccountAPI) SignAndSendTransaction(ctx context.Context, args Sen
// PublicBlockChainAPI provides an API to access the Ethereum blockchain.
// It offers only methods that operate on public data that is freely available to anyone.
type PublicBlockChainAPI struct {
- b Backend
- muNewBlockSubscriptions sync.Mutex // protects newBlocksSubscriptions
- newBlockSubscriptions map[string]func(core.ChainEvent) error // callbacks for new block subscriptions
+ b Backend
}
// NewPublicBlockChainAPI creates a new Etheruem blockchain API.
func NewPublicBlockChainAPI(b Backend) *PublicBlockChainAPI {
- api := &PublicBlockChainAPI{
- b: b,
- newBlockSubscriptions: make(map[string]func(core.ChainEvent) error),
- }
-
- go api.subscriptionLoop()
-
- return api
-}
-
-// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions.
-func (s *PublicBlockChainAPI) subscriptionLoop() {
- sub := s.b.EventMux().Subscribe(core.ChainEvent{})
- for event := range sub.Chan() {
- if chainEvent, ok := event.Data.(core.ChainEvent); ok {
- s.muNewBlockSubscriptions.Lock()
- for id, notifyOf := range s.newBlockSubscriptions {
- if notifyOf(chainEvent) == rpc.ErrNotificationNotFound {
- delete(s.newBlockSubscriptions, id)
- }
- }
- s.muNewBlockSubscriptions.Unlock()
- }
- }
+ return &PublicBlockChainAPI{b}
}
// BlockNumber returns the block number of the chain head.
@@ -470,45 +444,6 @@ func (s *PublicBlockChainAPI) GetUncleCountByBlockHash(ctx context.Context, bloc
return nil
}
-// NewBlocksArgs allows the user to specify if the returned block should include transactions and in which format.
-type NewBlocksArgs struct {
- IncludeTransactions bool `json:"includeTransactions"`
- TransactionDetails bool `json:"transactionDetails"`
-}
-
-// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows
-// the caller to specify whether the output should contain transactions and in what format.
-func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) {
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return nil, rpc.ErrNotificationsUnsupported
- }
-
- // create a subscription that will remove itself when unsubscribed/cancelled
- subscription, err := notifier.NewSubscription(func(subId string) {
- s.muNewBlockSubscriptions.Lock()
- delete(s.newBlockSubscriptions, subId)
- s.muNewBlockSubscriptions.Unlock()
- })
-
- if err != nil {
- return nil, err
- }
-
- // add a callback that is called on chain events which will format the block and notify the client
- s.muNewBlockSubscriptions.Lock()
- s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error {
- notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails)
- if err == nil {
- return subscription.Notify(notification)
- }
- glog.V(logger.Warn).Info("unable to format block %v\n", err)
- return nil
- }
- s.muNewBlockSubscriptions.Unlock()
- return subscription, nil
-}
-
// GetCode returns the code stored at the given address in the state for the given block number.
func (s *PublicBlockChainAPI) GetCode(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (string, error) {
state, _, err := s.b.StateAndHeaderByNumber(blockNr)
@@ -867,40 +802,12 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err
// PublicTransactionPoolAPI exposes methods for the RPC interface
type PublicTransactionPoolAPI struct {
- b Backend
- muPendingTxSubs sync.Mutex
- pendingTxSubs map[string]rpc.Subscription
+ b Backend
}
// NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI {
- api := &PublicTransactionPoolAPI{
- b: b,
- pendingTxSubs: make(map[string]rpc.Subscription),
- }
-
- go api.subscriptionLoop()
-
- return api
-}
-
-// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions.
-func (s *PublicTransactionPoolAPI) subscriptionLoop() {
- sub := s.b.EventMux().Subscribe(core.TxPreEvent{})
- for event := range sub.Chan() {
- tx := event.Data.(core.TxPreEvent)
- if from, err := tx.Tx.FromFrontier(); err == nil {
- if s.b.AccountManager().HasAddress(from) {
- s.muPendingTxSubs.Lock()
- for id, sub := range s.pendingTxSubs {
- if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound {
- delete(s.pendingTxSubs, id)
- }
- }
- s.muPendingTxSubs.Unlock()
- }
- }
- }
+ return &PublicTransactionPoolAPI{b}
}
func getTransaction(chainDb ethdb.Database, b Backend, txHash common.Hash) (*types.Transaction, bool, error) {
@@ -1353,31 +1260,6 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
return transactions
}
-// NewPendingTransactions creates a subscription that is triggered each time a transaction enters the transaction pool
-// and is send from one of the transactions this nodes manages.
-func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) {
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return nil, rpc.ErrNotificationsUnsupported
- }
-
- subscription, err := notifier.NewSubscription(func(id string) {
- s.muPendingTxSubs.Lock()
- delete(s.pendingTxSubs, id)
- s.muPendingTxSubs.Unlock()
- })
-
- if err != nil {
- return nil, err
- }
-
- s.muPendingTxSubs.Lock()
- s.pendingTxSubs[subscription.ID()] = subscription
- s.muPendingTxSubs.Unlock()
-
- return subscription, nil
-}
-
// Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the
// pool and reinsert it with the new gas price and limit.
func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, tx *Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) {