diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2016-04-05 15:43:32 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2016-04-05 15:43:32 +0800 |
commit | ed92f116f72646e73613afc2f2e7b83472a61434 (patch) | |
tree | 6c6d3cf414e21ee37d93e30e782ec028ff0144f8 /eth/downloader | |
parent | 6a185531d2cd2003bb4352c391f9dca023894d5a (diff) | |
parent | f7328c5ecbd1076582a71ef7bf436485f3868b1f (diff) | |
download | go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.gz go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.bz2 go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.lz go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.xz go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.zst go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.zip |
Merge pull request #2407 from bas-vk/rpc-notifications
RPC pub sub
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/api.go | 75 |
1 files changed, 59 insertions, 16 deletions
diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 13d0ed46e..576b33f1d 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -17,18 +17,55 @@ package downloader import ( + "sync" + + "golang.org/x/net/context" + + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" ) // PublicDownloaderAPI provides an API which gives information about the current synchronisation status. // It offers only methods that operates on data that can be available to anyone without security risks. type PublicDownloaderAPI struct { - d *Downloader + d *Downloader + mux *event.TypeMux + muSyncSubscriptions sync.Mutex + syncSubscriptions map[string]rpc.Subscription } // NewPublicDownloaderAPI create a new PublicDownloaderAPI. -func NewPublicDownloaderAPI(d *Downloader) *PublicDownloaderAPI { - return &PublicDownloaderAPI{d} +func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI { + api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)} + + go api.run() + + return api +} + +func (api *PublicDownloaderAPI) run() { + sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) + + for event := range sub.Chan() { + var notification interface{} + + switch event.Data.(type) { + case StartEvent: + result := &SyncingResult{Syncing: true} + result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress() + notification = result + case DoneEvent, FailedEvent: + notification = false + } + + api.muSyncSubscriptions.Lock() + for id, sub := range api.syncSubscriptions { + if sub.Notify(notification) == rpc.ErrNotificationNotFound { + delete(api.syncSubscriptions, id) + } + } + api.muSyncSubscriptions.Unlock() + } } // Progress gives progress indications when the node is synchronising with the Ethereum network. @@ -47,19 +84,25 @@ type SyncingResult struct { } // Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. -func (s *PublicDownloaderAPI) Syncing() (rpc.Subscription, error) { - sub := s.d.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) +func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } - output := func(event interface{}) interface{} { - switch event.(type) { - case StartEvent: - result := &SyncingResult{Syncing: true} - result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = s.d.Progress() - return result - case DoneEvent, FailedEvent: - return false - } - return nil + subscription, err := notifier.NewSubscription(func(id string) { + api.muSyncSubscriptions.Lock() + delete(api.syncSubscriptions, id) + api.muSyncSubscriptions.Unlock() + }) + + if err != nil { + return nil, err } - return rpc.NewSubscriptionWithOutputFormat(sub, output), nil + + api.muSyncSubscriptions.Lock() + api.syncSubscriptions[subscription.ID()] = subscription + api.muSyncSubscriptions.Unlock() + + return subscription, nil } |