aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/api.go
diff options
context:
space:
mode:
authorBas van Kervel <bas@ethdev.com>2016-03-29 21:07:40 +0800
committerBas van Kervel <bas@ethdev.com>2016-04-02 00:26:35 +0800
commitf7328c5ecbd1076582a71ef7bf436485f3868b1f (patch)
treea32f466f00306cb131bee254cbe14a4dcaa68973 /eth/downloader/api.go
parentfb578f4550a08617485d9146876489d1f3bb1b52 (diff)
downloaddexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.gz
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.bz2
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.lz
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.xz
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.zst
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.zip
rpc: add pub/sub support
Diffstat (limited to 'eth/downloader/api.go')
-rw-r--r--eth/downloader/api.go75
1 files changed, 59 insertions, 16 deletions
diff --git a/eth/downloader/api.go b/eth/downloader/api.go
index 13d0ed46e..576b33f1d 100644
--- a/eth/downloader/api.go
+++ b/eth/downloader/api.go
@@ -17,18 +17,55 @@
package downloader
import (
+ "sync"
+
+ "golang.org/x/net/context"
+
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
// PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
// It offers only methods that operates on data that can be available to anyone without security risks.
type PublicDownloaderAPI struct {
- d *Downloader
+ d *Downloader
+ mux *event.TypeMux
+ muSyncSubscriptions sync.Mutex
+ syncSubscriptions map[string]rpc.Subscription
}
// NewPublicDownloaderAPI create a new PublicDownloaderAPI.
-func NewPublicDownloaderAPI(d *Downloader) *PublicDownloaderAPI {
- return &PublicDownloaderAPI{d}
+func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
+ api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)}
+
+ go api.run()
+
+ return api
+}
+
+func (api *PublicDownloaderAPI) run() {
+ sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+
+ for event := range sub.Chan() {
+ var notification interface{}
+
+ switch event.Data.(type) {
+ case StartEvent:
+ result := &SyncingResult{Syncing: true}
+ result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress()
+ notification = result
+ case DoneEvent, FailedEvent:
+ notification = false
+ }
+
+ api.muSyncSubscriptions.Lock()
+ for id, sub := range api.syncSubscriptions {
+ if sub.Notify(notification) == rpc.ErrNotificationNotFound {
+ delete(api.syncSubscriptions, id)
+ }
+ }
+ api.muSyncSubscriptions.Unlock()
+ }
}
// Progress gives progress indications when the node is synchronising with the Ethereum network.
@@ -47,19 +84,25 @@ type SyncingResult struct {
}
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
-func (s *PublicDownloaderAPI) Syncing() (rpc.Subscription, error) {
- sub := s.d.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) {
+ notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
- output := func(event interface{}) interface{} {
- switch event.(type) {
- case StartEvent:
- result := &SyncingResult{Syncing: true}
- result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = s.d.Progress()
- return result
- case DoneEvent, FailedEvent:
- return false
- }
- return nil
+ subscription, err := notifier.NewSubscription(func(id string) {
+ api.muSyncSubscriptions.Lock()
+ delete(api.syncSubscriptions, id)
+ api.muSyncSubscriptions.Unlock()
+ })
+
+ if err != nil {
+ return nil, err
}
- return rpc.NewSubscriptionWithOutputFormat(sub, output), nil
+
+ api.muSyncSubscriptions.Lock()
+ api.syncSubscriptions[subscription.ID()] = subscription
+ api.muSyncSubscriptions.Unlock()
+
+ return subscription, nil
}