aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters
diff options
context:
space:
mode:
authorBas van Kervel <bas@ethdev.com>2016-03-29 21:07:40 +0800
committerBas van Kervel <bas@ethdev.com>2016-04-02 00:26:35 +0800
commitf7328c5ecbd1076582a71ef7bf436485f3868b1f (patch)
treea32f466f00306cb131bee254cbe14a4dcaa68973 /eth/filters
parentfb578f4550a08617485d9146876489d1f3bb1b52 (diff)
downloaddexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.gz
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.bz2
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.lz
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.xz
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.zst
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.zip
rpc: add pub/sub support
Diffstat (limited to 'eth/filters')
-rw-r--r--eth/filters/api.go79
1 files changed, 66 insertions, 13 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go
index e6a1ce3ab..956660363 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -17,15 +17,13 @@
package filters
import (
- "sync"
- "time"
-
"crypto/rand"
"encoding/hex"
- "errors"
-
"encoding/json"
+ "errors"
"fmt"
+ "sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -33,6 +31,8 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
+
+ "golang.org/x/net/context"
)
var (
@@ -202,7 +202,7 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
}
// newLogFilter creates a new log filter.
-func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
+func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
s.logMu.Lock()
defer s.logMu.Unlock()
@@ -219,17 +219,70 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
filter.SetAddresses(addresses)
filter.SetTopics(topics)
filter.LogCallback = func(log *vm.Log, removed bool) {
- s.logMu.Lock()
- defer s.logMu.Unlock()
-
- if queue := s.logQueue[id]; queue != nil {
- queue.add(vmlog{log, removed})
+ if callback != nil {
+ callback(log, removed)
+ } else {
+ s.logMu.Lock()
+ defer s.logMu.Unlock()
+ if queue := s.logQueue[id]; queue != nil {
+ queue.add(vmlog{log, removed})
+ }
}
}
return id, nil
}
+func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
+ notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
+
+ var (
+ externalId string
+ subscription rpc.Subscription
+ err error
+ )
+
+ if externalId, err = newFilterId(); err != nil {
+ return nil, err
+ }
+
+ // uninstall filter when subscription is unsubscribed/cancelled
+ if subscription, err = notifier.NewSubscription(func(string) {
+ s.UninstallFilter(externalId)
+ }); err != nil {
+ return nil, err
+ }
+
+ notifySubscriber := func(log *vm.Log, removed bool) {
+ rpcLog := toRPCLogs(vm.Logs{log}, removed)
+ if err := subscription.Notify(rpcLog); err != nil {
+ subscription.Cancel()
+ }
+ }
+
+ // from and to block number are not used since subscriptions don't allow you to travel to "time"
+ var id int
+ if len(args.Addresses) > 0 {
+ id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
+ } else {
+ id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
+ }
+
+ if err != nil {
+ subscription.Cancel()
+ return nil, err
+ }
+
+ s.filterMapMu.Lock()
+ s.filterMapping[externalId] = id
+ s.filterMapMu.Unlock()
+
+ return subscription, err
+}
+
// NewFilterArgs represents a request to create a new filter.
type NewFilterArgs struct {
FromBlock rpc.BlockNumber
@@ -364,9 +417,9 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
var id int
if len(args.Addresses) > 0 {
- id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
} else {
- id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
+ id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
}
if err != nil {
return "", err