aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2016-04-05 15:43:32 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2016-04-05 15:43:32 +0800
commited92f116f72646e73613afc2f2e7b83472a61434 (patch)
tree6c6d3cf414e21ee37d93e30e782ec028ff0144f8 /eth/downloader
parent6a185531d2cd2003bb4352c391f9dca023894d5a (diff)
parentf7328c5ecbd1076582a71ef7bf436485f3868b1f (diff)
downloadgo-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar
go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.gz
go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.bz2
go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.lz
go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.xz
go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.tar.zst
go-tangerine-ed92f116f72646e73613afc2f2e7b83472a61434.zip
Merge pull request #2407 from bas-vk/rpc-notifications
RPC pub sub
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/api.go75
1 files changed, 59 insertions, 16 deletions
diff --git a/eth/downloader/api.go b/eth/downloader/api.go
index 13d0ed46e..576b33f1d 100644
--- a/eth/downloader/api.go
+++ b/eth/downloader/api.go
@@ -17,18 +17,55 @@
package downloader
import (
+ "sync"
+
+ "golang.org/x/net/context"
+
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
// PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
// It offers only methods that operates on data that can be available to anyone without security risks.
type PublicDownloaderAPI struct {
- d *Downloader
+ d *Downloader
+ mux *event.TypeMux
+ muSyncSubscriptions sync.Mutex
+ syncSubscriptions map[string]rpc.Subscription
}
// NewPublicDownloaderAPI create a new PublicDownloaderAPI.
-func NewPublicDownloaderAPI(d *Downloader) *PublicDownloaderAPI {
- return &PublicDownloaderAPI{d}
+func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
+ api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)}
+
+ go api.run()
+
+ return api
+}
+
+func (api *PublicDownloaderAPI) run() {
+ sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+
+ for event := range sub.Chan() {
+ var notification interface{}
+
+ switch event.Data.(type) {
+ case StartEvent:
+ result := &SyncingResult{Syncing: true}
+ result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress()
+ notification = result
+ case DoneEvent, FailedEvent:
+ notification = false
+ }
+
+ api.muSyncSubscriptions.Lock()
+ for id, sub := range api.syncSubscriptions {
+ if sub.Notify(notification) == rpc.ErrNotificationNotFound {
+ delete(api.syncSubscriptions, id)
+ }
+ }
+ api.muSyncSubscriptions.Unlock()
+ }
}
// Progress gives progress indications when the node is synchronising with the Ethereum network.
@@ -47,19 +84,25 @@ type SyncingResult struct {
}
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
-func (s *PublicDownloaderAPI) Syncing() (rpc.Subscription, error) {
- sub := s.d.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) {
+ notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
- output := func(event interface{}) interface{} {
- switch event.(type) {
- case StartEvent:
- result := &SyncingResult{Syncing: true}
- result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = s.d.Progress()
- return result
- case DoneEvent, FailedEvent:
- return false
- }
- return nil
+ subscription, err := notifier.NewSubscription(func(id string) {
+ api.muSyncSubscriptions.Lock()
+ delete(api.syncSubscriptions, id)
+ api.muSyncSubscriptions.Unlock()
+ })
+
+ if err != nil {
+ return nil, err
}
- return rpc.NewSubscriptionWithOutputFormat(sub, output), nil
+
+ api.muSyncSubscriptions.Lock()
+ api.syncSubscriptions[subscription.ID()] = subscription
+ api.muSyncSubscriptions.Unlock()
+
+ return subscription, nil
}