From f7328c5ecbd1076582a71ef7bf436485f3868b1f Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Tue, 29 Mar 2016 15:07:40 +0200 Subject: rpc: add pub/sub support --- eth/filters/api.go | 79 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 66 insertions(+), 13 deletions(-) (limited to 'eth/filters/api.go') diff --git a/eth/filters/api.go b/eth/filters/api.go index e6a1ce3ab..956660363 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -17,15 +17,13 @@ package filters import ( - "sync" - "time" - "crypto/rand" "encoding/hex" - "errors" - "encoding/json" + "errors" "fmt" + "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -33,6 +31,8 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" + + "golang.org/x/net/context" ) var ( @@ -202,7 +202,7 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { } // newLogFilter creates a new log filter. -func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) { +func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) { s.logMu.Lock() defer s.logMu.Unlock() @@ -219,17 +219,70 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo filter.SetAddresses(addresses) filter.SetTopics(topics) filter.LogCallback = func(log *vm.Log, removed bool) { - s.logMu.Lock() - defer s.logMu.Unlock() - - if queue := s.logQueue[id]; queue != nil { - queue.add(vmlog{log, removed}) + if callback != nil { + callback(log, removed) + } else { + s.logMu.Lock() + defer s.logMu.Unlock() + if queue := s.logQueue[id]; queue != nil { + queue.add(vmlog{log, removed}) + } } } return id, nil } +func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + var ( + externalId string + subscription rpc.Subscription + err error + ) + + if externalId, err = newFilterId(); err != nil { + return nil, err + } + + // uninstall filter when subscription is unsubscribed/cancelled + if subscription, err = notifier.NewSubscription(func(string) { + s.UninstallFilter(externalId) + }); err != nil { + return nil, err + } + + notifySubscriber := func(log *vm.Log, removed bool) { + rpcLog := toRPCLogs(vm.Logs{log}, removed) + if err := subscription.Notify(rpcLog); err != nil { + subscription.Cancel() + } + } + + // from and to block number are not used since subscriptions don't allow you to travel to "time" + var id int + if len(args.Addresses) > 0 { + id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber) + } else { + id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber) + } + + if err != nil { + subscription.Cancel() + return nil, err + } + + s.filterMapMu.Lock() + s.filterMapping[externalId] = id + s.filterMapMu.Unlock() + + return subscription, err +} + // NewFilterArgs represents a request to create a new filter. type NewFilterArgs struct { FromBlock rpc.BlockNumber @@ -364,9 +417,9 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) { var id int if len(args.Addresses) > 0 { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil) } else { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil) } if err != nil { return "", err -- cgit v1.2.3