From 47ff8130124b479f1f051312eed50c33f0a38e6f Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Wed, 27 Jul 2016 17:47:46 +0200 Subject: rpc: refactor subscriptions and filters --- internal/ethapi/api.go | 126 ++----------------------------------------------- 1 file changed, 4 insertions(+), 122 deletions(-) (limited to 'internal/ethapi') diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 88bacc45b..e1729d1d2 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -24,7 +24,6 @@ import ( "fmt" "math/big" "strings" - "sync" "time" "github.com/ethereum/ethash" @@ -345,37 +344,12 @@ func (s *PrivateAccountAPI) SignAndSendTransaction(ctx context.Context, args Sen // PublicBlockChainAPI provides an API to access the Ethereum blockchain. // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockChainAPI struct { - b Backend - muNewBlockSubscriptions sync.Mutex // protects newBlocksSubscriptions - newBlockSubscriptions map[string]func(core.ChainEvent) error // callbacks for new block subscriptions + b Backend } // NewPublicBlockChainAPI creates a new Etheruem blockchain API. func NewPublicBlockChainAPI(b Backend) *PublicBlockChainAPI { - api := &PublicBlockChainAPI{ - b: b, - newBlockSubscriptions: make(map[string]func(core.ChainEvent) error), - } - - go api.subscriptionLoop() - - return api -} - -// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions. -func (s *PublicBlockChainAPI) subscriptionLoop() { - sub := s.b.EventMux().Subscribe(core.ChainEvent{}) - for event := range sub.Chan() { - if chainEvent, ok := event.Data.(core.ChainEvent); ok { - s.muNewBlockSubscriptions.Lock() - for id, notifyOf := range s.newBlockSubscriptions { - if notifyOf(chainEvent) == rpc.ErrNotificationNotFound { - delete(s.newBlockSubscriptions, id) - } - } - s.muNewBlockSubscriptions.Unlock() - } - } + return &PublicBlockChainAPI{b} } // BlockNumber returns the block number of the chain head. @@ -470,45 +444,6 @@ func (s *PublicBlockChainAPI) GetUncleCountByBlockHash(ctx context.Context, bloc return nil } -// NewBlocksArgs allows the user to specify if the returned block should include transactions and in which format. -type NewBlocksArgs struct { - IncludeTransactions bool `json:"includeTransactions"` - TransactionDetails bool `json:"transactionDetails"` -} - -// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows -// the caller to specify whether the output should contain transactions and in what format. -func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, rpc.ErrNotificationsUnsupported - } - - // create a subscription that will remove itself when unsubscribed/cancelled - subscription, err := notifier.NewSubscription(func(subId string) { - s.muNewBlockSubscriptions.Lock() - delete(s.newBlockSubscriptions, subId) - s.muNewBlockSubscriptions.Unlock() - }) - - if err != nil { - return nil, err - } - - // add a callback that is called on chain events which will format the block and notify the client - s.muNewBlockSubscriptions.Lock() - s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error { - notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails) - if err == nil { - return subscription.Notify(notification) - } - glog.V(logger.Warn).Info("unable to format block %v\n", err) - return nil - } - s.muNewBlockSubscriptions.Unlock() - return subscription, nil -} - // GetCode returns the code stored at the given address in the state for the given block number. func (s *PublicBlockChainAPI) GetCode(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (string, error) { state, _, err := s.b.StateAndHeaderByNumber(blockNr) @@ -867,40 +802,12 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err // PublicTransactionPoolAPI exposes methods for the RPC interface type PublicTransactionPoolAPI struct { - b Backend - muPendingTxSubs sync.Mutex - pendingTxSubs map[string]rpc.Subscription + b Backend } // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI { - api := &PublicTransactionPoolAPI{ - b: b, - pendingTxSubs: make(map[string]rpc.Subscription), - } - - go api.subscriptionLoop() - - return api -} - -// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions. -func (s *PublicTransactionPoolAPI) subscriptionLoop() { - sub := s.b.EventMux().Subscribe(core.TxPreEvent{}) - for event := range sub.Chan() { - tx := event.Data.(core.TxPreEvent) - if from, err := tx.Tx.FromFrontier(); err == nil { - if s.b.AccountManager().HasAddress(from) { - s.muPendingTxSubs.Lock() - for id, sub := range s.pendingTxSubs { - if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound { - delete(s.pendingTxSubs, id) - } - } - s.muPendingTxSubs.Unlock() - } - } - } + return &PublicTransactionPoolAPI{b} } func getTransaction(chainDb ethdb.Database, b Backend, txHash common.Hash) (*types.Transaction, bool, error) { @@ -1353,31 +1260,6 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction { return transactions } -// NewPendingTransactions creates a subscription that is triggered each time a transaction enters the transaction pool -// and is send from one of the transactions this nodes manages. -func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, rpc.ErrNotificationsUnsupported - } - - subscription, err := notifier.NewSubscription(func(id string) { - s.muPendingTxSubs.Lock() - delete(s.pendingTxSubs, id) - s.muPendingTxSubs.Unlock() - }) - - if err != nil { - return nil, err - } - - s.muPendingTxSubs.Lock() - s.pendingTxSubs[subscription.ID()] = subscription - s.muPendingTxSubs.Unlock() - - return subscription, nil -} - // Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the // pool and reinsert it with the new gas price and limit. func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, tx *Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) { -- cgit v1.2.3