From 47ff8130124b479f1f051312eed50c33f0a38e6f Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Wed, 27 Jul 2016 17:47:46 +0200 Subject: rpc: refactor subscriptions and filters --- eth/downloader/api.go | 157 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 111 insertions(+), 46 deletions(-) (limited to 'eth/downloader/api.go') diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 94cd6515f..c36dfb7e0 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -19,53 +19,102 @@ package downloader import ( "sync" - "golang.org/x/net/context" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/context" ) // PublicDownloaderAPI provides an API which gives information about the current synchronisation status. // It offers only methods that operates on data that can be available to anyone without security risks. type PublicDownloaderAPI struct { - d *Downloader - mux *event.TypeMux - muSyncSubscriptions sync.Mutex - syncSubscriptions map[string]rpc.Subscription + d *Downloader + mux *event.TypeMux + installSyncSubscription chan chan interface{} + uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest } -// NewPublicDownloaderAPI create a new PublicDownloaderAPI. +// NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that +// listens for events from the downloader through the global event mux. In case it receives one of +// these events it broadcasts it to all syncing subscriptions that are installed through the +// installSyncSubscription channel. func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI { - api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)} + api := &PublicDownloaderAPI{ + d: d, + mux: m, + installSyncSubscription: make(chan chan interface{}), + uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest), + } - go api.run() + go api.eventLoop() return api } -func (api *PublicDownloaderAPI) run() { - sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) - - for event := range sub.Chan() { - var notification interface{} +// eventLoop runs an loop until the event mux closes. It will install and uninstall new +// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions. +func (api *PublicDownloaderAPI) eventLoop() { + var ( + sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) + syncSubscriptions = make(map[chan interface{}]struct{}) + ) + + for { + select { + case i := <-api.installSyncSubscription: + syncSubscriptions[i] = struct{}{} + case u := <-api.uninstallSyncSubscription: + delete(syncSubscriptions, u.c) + close(u.uninstalled) + case event := <-sub.Chan(): + if event == nil { + return + } - switch event.Data.(type) { - case StartEvent: - result := &SyncingResult{Syncing: true} - result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress() - notification = result - case DoneEvent, FailedEvent: - notification = false + var notification interface{} + switch event.Data.(type) { + case StartEvent: + result := &SyncingResult{Syncing: true} + result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress() + notification = result + case DoneEvent, FailedEvent: + notification = false + } + // broadcast + for c := range syncSubscriptions { + c <- notification + } } + } +} + +// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. +func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } - api.muSyncSubscriptions.Lock() - for id, sub := range api.syncSubscriptions { - if sub.Notify(notification) == rpc.ErrNotificationNotFound { - delete(api.syncSubscriptions, id) + rpcSub := notifier.CreateSubscription() + + go func() { + statuses := make(chan interface{}) + sub := api.SubscribeSyncStatus(statuses) + + for { + select { + case status := <-statuses: + notifier.Notify(rpcSub.ID, status) + case <-rpcSub.Err(): + sub.Unsubscribe() + return + case <-notifier.Closed(): + sub.Unsubscribe() + return } } - api.muSyncSubscriptions.Unlock() - } + }() + + return rpcSub, nil } // Progress gives progress indications when the node is synchronising with the Ethereum network. @@ -83,26 +132,42 @@ type SyncingResult struct { Status Progress `json:"status"` } -// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. -func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, rpc.ErrNotificationsUnsupported - } - - subscription, err := notifier.NewSubscription(func(id string) { - api.muSyncSubscriptions.Lock() - delete(api.syncSubscriptions, id) - api.muSyncSubscriptions.Unlock() - }) +// uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop. +type uninstallSyncSubscriptionRequest struct { + c chan interface{} + uninstalled chan interface{} +} - if err != nil { - return nil, err - } +// SyncStatusSubscription represents a syncing subscription. +type SyncStatusSubscription struct { + api *PublicDownloaderAPI // register subscription in event loop of this api instance + c chan interface{} // channel where events are broadcasted to + unsubOnce sync.Once // make sure unsubscribe logic is executed once +} - api.muSyncSubscriptions.Lock() - api.syncSubscriptions[subscription.ID()] = subscription - api.muSyncSubscriptions.Unlock() +// Unsubscribe uninstalls the subscription from the DownloadAPI event loop. +// The status channel that was passed to subscribeSyncStatus isn't used anymore +// after this method returns. +func (s *SyncStatusSubscription) Unsubscribe() { + s.unsubOnce.Do(func() { + req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})} + s.api.uninstallSyncSubscription <- &req + + for { + select { + case <-s.c: + // drop new status events until uninstall confirmation + continue + case <-req.uninstalled: + return + } + } + }) +} - return subscription, nil +// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates. +// The given channel must receive interface values, the result can either +func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription { + api.installSyncSubscription <- status + return &SyncStatusSubscription{api: api, c: status} } -- cgit v1.2.3