aboutsummaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/client.go64
-rw-r--r--rpc/client_test.go32
2 files changed, 46 insertions, 50 deletions
diff --git a/rpc/client.go b/rpc/client.go
index f02366a39..8aa84ec98 100644
--- a/rpc/client.go
+++ b/rpc/client.go
@@ -349,85 +349,49 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
return err
}
-// ShhSubscribe calls the "shh_subscribe" method with the given arguments,
-// registering a subscription. Server notifications for the subscription are
-// sent to the given channel. The element type of the channel must match the
-// expected type of content returned by the subscription.
-//
-// The context argument cancels the RPC request that sets up the subscription but has no
-// effect on the subscription after ShhSubscribe has returned.
-//
-// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
-// before considering the subscriber dead. The subscription Err channel will receive
-// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
-// that the channel usually has at least one reader to prevent this issue.
-func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
- // Check type of channel first.
- chanVal := reflect.ValueOf(channel)
- if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
- panic("first argument to ShhSubscribe must be a writable channel")
- }
- if chanVal.IsNil() {
- panic("channel given to ShhSubscribe must not be nil")
- }
- if c.isHTTP {
- return nil, ErrNotificationsUnsupported
- }
-
- msg, err := c.newMessage("shh"+subscribeMethodSuffix, args...)
- if err != nil {
- return nil, err
- }
- op := &requestOp{
- ids: []json.RawMessage{msg.ID},
- resp: make(chan *jsonrpcMessage),
- sub: newClientSubscription(c, "shh", chanVal),
- }
+// EthSubscribe registers a subscripion under the "eth" namespace.
+func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+ return c.Subscribe(ctx, "eth", channel, args...)
+}
- // Send the subscription request.
- // The arrival and validity of the response is signaled on sub.quit.
- if err := c.send(ctx, op, msg); err != nil {
- return nil, err
- }
- if _, err := op.wait(ctx); err != nil {
- return nil, err
- }
- return op.sub, nil
+// ShhSubscribe registers a subscripion under the "shh" namespace.
+func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+ return c.Subscribe(ctx, "shh", channel, args...)
}
-// EthSubscribe calls the "eth_subscribe" method with the given arguments,
+// Subscribe calls the "<namespace>_subscribe" method with the given arguments,
// registering a subscription. Server notifications for the subscription are
// sent to the given channel. The element type of the channel must match the
// expected type of content returned by the subscription.
//
// The context argument cancels the RPC request that sets up the subscription but has no
-// effect on the subscription after EthSubscribe has returned.
+// effect on the subscription after Subscribe has returned.
//
// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
// before considering the subscriber dead. The subscription Err channel will receive
// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
// that the channel usually has at least one reader to prevent this issue.
-func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
+func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
- panic("first argument to EthSubscribe must be a writable channel")
+ panic("first argument to Subscribe must be a writable channel")
}
if chanVal.IsNil() {
- panic("channel given to EthSubscribe must not be nil")
+ panic("channel given to Subscribe must not be nil")
}
if c.isHTTP {
return nil, ErrNotificationsUnsupported
}
- msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...)
+ msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
if err != nil {
return nil, err
}
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage),
- sub: newClientSubscription(c, "eth", chanVal),
+ sub: newClientSubscription(c, namespace, chanVal),
}
// Send the subscription request.
diff --git a/rpc/client_test.go b/rpc/client_test.go
index 10d74670b..4f354d389 100644
--- a/rpc/client_test.go
+++ b/rpc/client_test.go
@@ -251,6 +251,38 @@ func TestClientSubscribe(t *testing.T) {
}
}
+func TestClientSubscribeCustomNamespace(t *testing.T) {
+ namespace := "custom"
+ server := newTestServer(namespace, new(NotificationTestService))
+ defer server.Stop()
+ client := DialInProc(server)
+ defer client.Close()
+
+ nc := make(chan int)
+ count := 10
+ sub, err := client.Subscribe(context.Background(), namespace, nc, "someSubscription", count, 0)
+ if err != nil {
+ t.Fatal("can't subscribe:", err)
+ }
+ for i := 0; i < count; i++ {
+ if val := <-nc; val != i {
+ t.Fatalf("value mismatch: got %d, want %d", val, i)
+ }
+ }
+
+ sub.Unsubscribe()
+ select {
+ case v := <-nc:
+ t.Fatal("received value after unsubscribe:", v)
+ case err := <-sub.Err():
+ if err != nil {
+ t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
+ }
+ case <-time.After(1 * time.Second):
+ t.Fatalf("subscription not closed within 1s after unsubscribe")
+ }
+}
+
// In this test, the connection drops while EthSubscribe is
// waiting for a response.
func TestClientSubscribeClose(t *testing.T) {