From f7328c5ecbd1076582a71ef7bf436485f3868b1f Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Tue, 29 Mar 2016 15:07:40 +0200 Subject: rpc: add pub/sub support --- eth/api.go | 206 +++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 145 insertions(+), 61 deletions(-) (limited to 'eth/api.go') diff --git a/eth/api.go b/eth/api.go index a257639ba..676191fc2 100644 --- a/eth/api.go +++ b/eth/api.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "golang.org/x/net/context" + "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -457,16 +459,46 @@ func (s *PrivateAccountAPI) LockAccount(addr common.Address) bool { // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockChainAPI struct { config *core.ChainConfig - bc *core.BlockChain - chainDb ethdb.Database - eventMux *event.TypeMux - am *accounts.Manager - miner *miner.Miner + bc *core.BlockChain + chainDb ethdb.Database + eventMux *event.TypeMux + muNewBlockSubscriptions sync.Mutex // protects newBlocksSubscriptions + newBlockSubscriptions map[string]func(core.ChainEvent) error // callbacks for new block subscriptions + am *accounts.Manager + miner *miner.Miner } // NewPublicBlockChainAPI creates a new Etheruem blockchain API. func NewPublicBlockChainAPI(config *core.ChainConfig, bc *core.BlockChain, m *miner.Miner, chainDb ethdb.Database, eventMux *event.TypeMux, am *accounts.Manager) *PublicBlockChainAPI { - return &PublicBlockChainAPI{config: config, bc: bc, miner: m, chainDb: chainDb, eventMux: eventMux, am: am} + api := &PublicBlockChainAPI{ + config: config, + bc: bc, + miner: m, + chainDb: chainDb, + eventMux: eventMux, + am: am, + newBlockSubscriptions: make(map[string]func(core.ChainEvent) error), + } + + go api.subscriptionLoop() + + return api +} + +// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions. +func (s *PublicBlockChainAPI) subscriptionLoop() { + sub := s.eventMux.Subscribe(core.ChainEvent{}) + for event := range sub.Chan() { + if chainEvent, ok := event.Data.(core.ChainEvent); ok { + s.muNewBlockSubscriptions.Lock() + for id, notifyOf := range s.newBlockSubscriptions { + if notifyOf(chainEvent) == rpc.ErrNotificationNotFound { + delete(s.newBlockSubscriptions, id) + } + } + s.muNewBlockSubscriptions.Unlock() + } + } } // BlockNumber returns the block number of the chain head. @@ -564,20 +596,36 @@ type NewBlocksArgs struct { // NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows // the caller to specify whether the output should contain transactions and in what format. -func (s *PublicBlockChainAPI) NewBlocks(args NewBlocksArgs) (rpc.Subscription, error) { - sub := s.eventMux.Subscribe(core.ChainEvent{}) +func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } - output := func(rawBlock interface{}) interface{} { - if event, ok := rawBlock.(core.ChainEvent); ok { - notification, err := s.rpcOutputBlock(event.Block, args.IncludeTransactions, args.TransactionDetails) - if err == nil { - return notification - } + // create a subscription that will remove itself when unsubscribed/cancelled + subscription, err := notifier.NewSubscription(func(subId string) { + s.muNewBlockSubscriptions.Lock() + delete(s.newBlockSubscriptions, subId) + s.muNewBlockSubscriptions.Unlock() + }) + + if err != nil { + return nil, err + } + + // add a callback that is called on chain events which will format the block and notify the client + s.muNewBlockSubscriptions.Lock() + s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error { + if notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails); err == nil { + return subscription.Notify(notification) + } else { + glog.V(logger.Warn).Info("unable to format block %v\n", err) } - return rawBlock + return nil } + s.muNewBlockSubscriptions.Unlock() - return rpc.NewSubscriptionWithOutputFormat(sub, output), nil + return subscription, nil } // GetCode returns the code stored at the given address in the state for the given block number. @@ -821,26 +869,75 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err // PublicTransactionPoolAPI exposes methods for the RPC interface type PublicTransactionPoolAPI struct { - eventMux *event.TypeMux - chainDb ethdb.Database - gpo *GasPriceOracle - bc *core.BlockChain - miner *miner.Miner - am *accounts.Manager - txPool *core.TxPool - txMu sync.Mutex + eventMux *event.TypeMux + chainDb ethdb.Database + gpo *GasPriceOracle + bc *core.BlockChain + miner *miner.Miner + am *accounts.Manager + txPool *core.TxPool + txMu sync.Mutex + muPendingTxSubs sync.Mutex + pendingTxSubs map[string]rpc.Subscription } // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. func NewPublicTransactionPoolAPI(e *Ethereum) *PublicTransactionPoolAPI { - return &PublicTransactionPoolAPI{ - eventMux: e.EventMux(), - gpo: NewGasPriceOracle(e), - chainDb: e.ChainDb(), - bc: e.BlockChain(), - am: e.AccountManager(), - txPool: e.TxPool(), - miner: e.Miner(), + api := &PublicTransactionPoolAPI{ + eventMux: e.EventMux(), + gpo: NewGasPriceOracle(e), + chainDb: e.ChainDb(), + bc: e.BlockChain(), + am: e.AccountManager(), + txPool: e.TxPool(), + miner: e.Miner(), + pendingTxSubs: make(map[string]rpc.Subscription), + } + + go api.subscriptionLoop() + + return api +} + +// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions. +func (s *PublicTransactionPoolAPI) subscriptionLoop() { + sub := s.eventMux.Subscribe(core.TxPreEvent{}) + accountTimeout := time.NewTicker(10 * time.Second) + + // only publish pending tx signed by one of the accounts in the node + accountSet := set.New() + accounts, _ := s.am.Accounts() + for _, acc := range accounts { + accountSet.Add(acc.Address) + } + + for { + select { + case event := <-sub.Chan(): + if event == nil { + continue + } + tx := event.Data.(core.TxPreEvent) + if from, err := tx.Tx.FromFrontier(); err == nil { + if accountSet.Has(from) { + s.muPendingTxSubs.Lock() + for id, sub := range s.pendingTxSubs { + if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound { + delete(s.pendingTxSubs, id) + } + } + s.muPendingTxSubs.Unlock() + } + } + case <-accountTimeout.C: + // refresh account list when accounts are added/removed from the node. + if accounts, err := s.am.Accounts(); err == nil { + accountSet.Clear() + for _, acc := range accounts { + accountSet.Add(acc.Address) + } + } + } } } @@ -1275,40 +1372,27 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, err // NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool // and is send from one of the transactions this nodes manages. -func (s *PublicTransactionPoolAPI) NewPendingTransactions() (rpc.Subscription, error) { - sub := s.eventMux.Subscribe(core.TxPreEvent{}) - - accounts, err := s.am.Accounts() - if err != nil { - return rpc.Subscription{}, err +func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported } - accountSet := set.New() - for _, account := range accounts { - accountSet.Add(account.Address) - } - accountSetLastUpdates := time.Now() - output := func(transaction interface{}) interface{} { - if time.Since(accountSetLastUpdates) > (time.Duration(2) * time.Second) { - if accounts, err = s.am.Accounts(); err != nil { - accountSet.Clear() - for _, account := range accounts { - accountSet.Add(account.Address) - } - accountSetLastUpdates = time.Now() - } - } + subscription, err := notifier.NewSubscription(func(id string) { + s.muPendingTxSubs.Lock() + delete(s.pendingTxSubs, id) + s.muPendingTxSubs.Unlock() + }) - tx := transaction.(core.TxPreEvent) - if from, err := tx.Tx.FromFrontier(); err == nil { - if accountSet.Has(from) { - return tx.Tx.Hash() - } - } - return nil + if err != nil { + return nil, err } - return rpc.NewSubscriptionWithOutputFormat(sub, output), nil + s.muPendingTxSubs.Lock() + s.pendingTxSubs[subscription.ID()] = subscription + s.muPendingTxSubs.Unlock() + + return subscription, nil } // Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the -- cgit v1.2.3