diff options
author | Bas van Kervel <bas@ethdev.com> | 2016-03-29 21:07:40 +0800 |
---|---|---|
committer | Bas van Kervel <bas@ethdev.com> | 2016-04-02 00:26:35 +0800 |
commit | f7328c5ecbd1076582a71ef7bf436485f3868b1f (patch) | |
tree | a32f466f00306cb131bee254cbe14a4dcaa68973 /eth | |
parent | fb578f4550a08617485d9146876489d1f3bb1b52 (diff) | |
download | go-tangerine-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar go-tangerine-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.gz go-tangerine-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.bz2 go-tangerine-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.lz go-tangerine-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.xz go-tangerine-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.zst go-tangerine-f7328c5ecbd1076582a71ef7bf436485f3868b1f.zip |
rpc: add pub/sub support
Diffstat (limited to 'eth')
-rw-r--r-- | eth/api.go | 206 | ||||
-rw-r--r-- | eth/backend.go | 2 | ||||
-rw-r--r-- | eth/downloader/api.go | 75 | ||||
-rw-r--r-- | eth/filters/api.go | 79 |
4 files changed, 271 insertions, 91 deletions
diff --git a/eth/api.go b/eth/api.go index a257639ba..676191fc2 100644 --- a/eth/api.go +++ b/eth/api.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "golang.org/x/net/context" + "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -457,16 +459,46 @@ func (s *PrivateAccountAPI) LockAccount(addr common.Address) bool { // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockChainAPI struct { config *core.ChainConfig - bc *core.BlockChain - chainDb ethdb.Database - eventMux *event.TypeMux - am *accounts.Manager - miner *miner.Miner + bc *core.BlockChain + chainDb ethdb.Database + eventMux *event.TypeMux + muNewBlockSubscriptions sync.Mutex // protects newBlocksSubscriptions + newBlockSubscriptions map[string]func(core.ChainEvent) error // callbacks for new block subscriptions + am *accounts.Manager + miner *miner.Miner } // NewPublicBlockChainAPI creates a new Etheruem blockchain API. func NewPublicBlockChainAPI(config *core.ChainConfig, bc *core.BlockChain, m *miner.Miner, chainDb ethdb.Database, eventMux *event.TypeMux, am *accounts.Manager) *PublicBlockChainAPI { - return &PublicBlockChainAPI{config: config, bc: bc, miner: m, chainDb: chainDb, eventMux: eventMux, am: am} + api := &PublicBlockChainAPI{ + config: config, + bc: bc, + miner: m, + chainDb: chainDb, + eventMux: eventMux, + am: am, + newBlockSubscriptions: make(map[string]func(core.ChainEvent) error), + } + + go api.subscriptionLoop() + + return api +} + +// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions. +func (s *PublicBlockChainAPI) subscriptionLoop() { + sub := s.eventMux.Subscribe(core.ChainEvent{}) + for event := range sub.Chan() { + if chainEvent, ok := event.Data.(core.ChainEvent); ok { + s.muNewBlockSubscriptions.Lock() + for id, notifyOf := range s.newBlockSubscriptions { + if notifyOf(chainEvent) == rpc.ErrNotificationNotFound { + delete(s.newBlockSubscriptions, id) + } + } + s.muNewBlockSubscriptions.Unlock() + } + } } // BlockNumber returns the block number of the chain head. @@ -564,20 +596,36 @@ type NewBlocksArgs struct { // NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows // the caller to specify whether the output should contain transactions and in what format. -func (s *PublicBlockChainAPI) NewBlocks(args NewBlocksArgs) (rpc.Subscription, error) { - sub := s.eventMux.Subscribe(core.ChainEvent{}) +func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } - output := func(rawBlock interface{}) interface{} { - if event, ok := rawBlock.(core.ChainEvent); ok { - notification, err := s.rpcOutputBlock(event.Block, args.IncludeTransactions, args.TransactionDetails) - if err == nil { - return notification - } + // create a subscription that will remove itself when unsubscribed/cancelled + subscription, err := notifier.NewSubscription(func(subId string) { + s.muNewBlockSubscriptions.Lock() + delete(s.newBlockSubscriptions, subId) + s.muNewBlockSubscriptions.Unlock() + }) + + if err != nil { + return nil, err + } + + // add a callback that is called on chain events which will format the block and notify the client + s.muNewBlockSubscriptions.Lock() + s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error { + if notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails); err == nil { + return subscription.Notify(notification) + } else { + glog.V(logger.Warn).Info("unable to format block %v\n", err) } - return rawBlock + return nil } + s.muNewBlockSubscriptions.Unlock() - return rpc.NewSubscriptionWithOutputFormat(sub, output), nil + return subscription, nil } // GetCode returns the code stored at the given address in the state for the given block number. @@ -821,26 +869,75 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err // PublicTransactionPoolAPI exposes methods for the RPC interface type PublicTransactionPoolAPI struct { - eventMux *event.TypeMux - chainDb ethdb.Database - gpo *GasPriceOracle - bc *core.BlockChain - miner *miner.Miner - am *accounts.Manager - txPool *core.TxPool - txMu sync.Mutex + eventMux *event.TypeMux + chainDb ethdb.Database + gpo *GasPriceOracle + bc *core.BlockChain + miner *miner.Miner + am *accounts.Manager + txPool *core.TxPool + txMu sync.Mutex + muPendingTxSubs sync.Mutex + pendingTxSubs map[string]rpc.Subscription } // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. func NewPublicTransactionPoolAPI(e *Ethereum) *PublicTransactionPoolAPI { - return &PublicTransactionPoolAPI{ - eventMux: e.EventMux(), - gpo: NewGasPriceOracle(e), - chainDb: e.ChainDb(), - bc: e.BlockChain(), - am: e.AccountManager(), - txPool: e.TxPool(), - miner: e.Miner(), + api := &PublicTransactionPoolAPI{ + eventMux: e.EventMux(), + gpo: NewGasPriceOracle(e), + chainDb: e.ChainDb(), + bc: e.BlockChain(), + am: e.AccountManager(), + txPool: e.TxPool(), + miner: e.Miner(), + pendingTxSubs: make(map[string]rpc.Subscription), + } + + go api.subscriptionLoop() + + return api +} + +// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions. +func (s *PublicTransactionPoolAPI) subscriptionLoop() { + sub := s.eventMux.Subscribe(core.TxPreEvent{}) + accountTimeout := time.NewTicker(10 * time.Second) + + // only publish pending tx signed by one of the accounts in the node + accountSet := set.New() + accounts, _ := s.am.Accounts() + for _, acc := range accounts { + accountSet.Add(acc.Address) + } + + for { + select { + case event := <-sub.Chan(): + if event == nil { + continue + } + tx := event.Data.(core.TxPreEvent) + if from, err := tx.Tx.FromFrontier(); err == nil { + if accountSet.Has(from) { + s.muPendingTxSubs.Lock() + for id, sub := range s.pendingTxSubs { + if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound { + delete(s.pendingTxSubs, id) + } + } + s.muPendingTxSubs.Unlock() + } + } + case <-accountTimeout.C: + // refresh account list when accounts are added/removed from the node. + if accounts, err := s.am.Accounts(); err == nil { + accountSet.Clear() + for _, acc := range accounts { + accountSet.Add(acc.Address) + } + } + } } } @@ -1275,40 +1372,27 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, err // NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool // and is send from one of the transactions this nodes manages. -func (s *PublicTransactionPoolAPI) NewPendingTransactions() (rpc.Subscription, error) { - sub := s.eventMux.Subscribe(core.TxPreEvent{}) - - accounts, err := s.am.Accounts() - if err != nil { - return rpc.Subscription{}, err +func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported } - accountSet := set.New() - for _, account := range accounts { - accountSet.Add(account.Address) - } - accountSetLastUpdates := time.Now() - output := func(transaction interface{}) interface{} { - if time.Since(accountSetLastUpdates) > (time.Duration(2) * time.Second) { - if accounts, err = s.am.Accounts(); err != nil { - accountSet.Clear() - for _, account := range accounts { - accountSet.Add(account.Address) - } - accountSetLastUpdates = time.Now() - } - } + subscription, err := notifier.NewSubscription(func(id string) { + s.muPendingTxSubs.Lock() + delete(s.pendingTxSubs, id) + s.muPendingTxSubs.Unlock() + }) - tx := transaction.(core.TxPreEvent) - if from, err := tx.Tx.FromFrontier(); err == nil { - if accountSet.Has(from) { - return tx.Tx.Hash() - } - } - return nil + if err != nil { + return nil, err } - return rpc.NewSubscriptionWithOutputFormat(sub, output), nil + s.muPendingTxSubs.Lock() + s.pendingTxSubs[subscription.ID()] = subscription + s.muPendingTxSubs.Unlock() + + return subscription, nil } // Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the diff --git a/eth/backend.go b/eth/backend.go index 3c3440a53..d4f72515e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -310,7 +310,7 @@ func (s *Ethereum) APIs() []rpc.API { }, { Namespace: "eth", Version: "1.0", - Service: downloader.NewPublicDownloaderAPI(s.Downloader()), + Service: downloader.NewPublicDownloaderAPI(s.Downloader(), s.EventMux()), Public: true, }, { Namespace: "miner", diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 13d0ed46e..576b33f1d 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -17,18 +17,55 @@ package downloader import ( + "sync" + + "golang.org/x/net/context" + + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" ) // PublicDownloaderAPI provides an API which gives information about the current synchronisation status. // It offers only methods that operates on data that can be available to anyone without security risks. type PublicDownloaderAPI struct { - d *Downloader + d *Downloader + mux *event.TypeMux + muSyncSubscriptions sync.Mutex + syncSubscriptions map[string]rpc.Subscription } // NewPublicDownloaderAPI create a new PublicDownloaderAPI. -func NewPublicDownloaderAPI(d *Downloader) *PublicDownloaderAPI { - return &PublicDownloaderAPI{d} +func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI { + api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)} + + go api.run() + + return api +} + +func (api *PublicDownloaderAPI) run() { + sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) + + for event := range sub.Chan() { + var notification interface{} + + switch event.Data.(type) { + case StartEvent: + result := &SyncingResult{Syncing: true} + result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress() + notification = result + case DoneEvent, FailedEvent: + notification = false + } + + api.muSyncSubscriptions.Lock() + for id, sub := range api.syncSubscriptions { + if sub.Notify(notification) == rpc.ErrNotificationNotFound { + delete(api.syncSubscriptions, id) + } + } + api.muSyncSubscriptions.Unlock() + } } // Progress gives progress indications when the node is synchronising with the Ethereum network. @@ -47,19 +84,25 @@ type SyncingResult struct { } // Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. -func (s *PublicDownloaderAPI) Syncing() (rpc.Subscription, error) { - sub := s.d.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) +func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } - output := func(event interface{}) interface{} { - switch event.(type) { - case StartEvent: - result := &SyncingResult{Syncing: true} - result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = s.d.Progress() - return result - case DoneEvent, FailedEvent: - return false - } - return nil + subscription, err := notifier.NewSubscription(func(id string) { + api.muSyncSubscriptions.Lock() + delete(api.syncSubscriptions, id) + api.muSyncSubscriptions.Unlock() + }) + + if err != nil { + return nil, err } - return rpc.NewSubscriptionWithOutputFormat(sub, output), nil + + api.muSyncSubscriptions.Lock() + api.syncSubscriptions[subscription.ID()] = subscription + api.muSyncSubscriptions.Unlock() + + return subscription, nil } diff --git a/eth/filters/api.go b/eth/filters/api.go index e6a1ce3ab..956660363 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -17,15 +17,13 @@ package filters import ( - "sync" - "time" - "crypto/rand" "encoding/hex" - "errors" - "encoding/json" + "errors" "fmt" + "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -33,6 +31,8 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" + + "golang.org/x/net/context" ) var ( @@ -202,7 +202,7 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { } // newLogFilter creates a new log filter. -func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) { +func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) { s.logMu.Lock() defer s.logMu.Unlock() @@ -219,17 +219,70 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo filter.SetAddresses(addresses) filter.SetTopics(topics) filter.LogCallback = func(log *vm.Log, removed bool) { - s.logMu.Lock() - defer s.logMu.Unlock() - - if queue := s.logQueue[id]; queue != nil { - queue.add(vmlog{log, removed}) + if callback != nil { + callback(log, removed) + } else { + s.logMu.Lock() + defer s.logMu.Unlock() + if queue := s.logQueue[id]; queue != nil { + queue.add(vmlog{log, removed}) + } } } return id, nil } +func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + var ( + externalId string + subscription rpc.Subscription + err error + ) + + if externalId, err = newFilterId(); err != nil { + return nil, err + } + + // uninstall filter when subscription is unsubscribed/cancelled + if subscription, err = notifier.NewSubscription(func(string) { + s.UninstallFilter(externalId) + }); err != nil { + return nil, err + } + + notifySubscriber := func(log *vm.Log, removed bool) { + rpcLog := toRPCLogs(vm.Logs{log}, removed) + if err := subscription.Notify(rpcLog); err != nil { + subscription.Cancel() + } + } + + // from and to block number are not used since subscriptions don't allow you to travel to "time" + var id int + if len(args.Addresses) > 0 { + id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber) + } else { + id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber) + } + + if err != nil { + subscription.Cancel() + return nil, err + } + + s.filterMapMu.Lock() + s.filterMapping[externalId] = id + s.filterMapMu.Unlock() + + return subscription, err +} + // NewFilterArgs represents a request to create a new filter. type NewFilterArgs struct { FromBlock rpc.BlockNumber @@ -364,9 +417,9 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) { var id int if len(args.Addresses) > 0 { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil) } else { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil) } if err != nil { return "", err |