aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/subscription_test.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-05-03 16:41:07 +0800
committerGitHub <noreply@github.com>2017-05-03 16:41:07 +0800
commita8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e (patch)
tree94c70a20c9e3b025a9223b70f49b8c0210840e89 /rpc/subscription_test.go
parentc3dc01caf18addd8466bf8667dca2052ad1342f8 (diff)
parent37e3f561f15cbedf10c01847e58a079f9b86bf6f (diff)
downloadgo-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.gz
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.bz2
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.lz
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.xz
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.tar.zst
go-tangerine-a8eafcdc0e01ba1adcb25bfb0e06e7575c1a436e.zip
Merge pull request #13885 from bas-vk/rpc_generic_pubsub
rpc: support subscriptions under custom namespaces
Diffstat (limited to 'rpc/subscription_test.go')
-rw-r--r--rpc/subscription_test.go160
1 files changed, 160 insertions, 0 deletions
diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go
index 345b4e5f2..0ed15ddfe 100644
--- a/rpc/subscription_test.go
+++ b/rpc/subscription_test.go
@@ -19,6 +19,7 @@ package rpc
import (
"context"
"encoding/json"
+ "fmt"
"net"
"sync"
"testing"
@@ -162,3 +163,162 @@ func TestNotifications(t *testing.T) {
t.Error("unsubscribe callback not called after closing connection")
}
}
+
+func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse,
+ failures chan<- jsonErrResponse, notifications chan<- jsonNotification) {
+
+ // read and parse server messages
+ for {
+ var rmsg json.RawMessage
+ if err := in.Decode(&rmsg); err != nil {
+ return
+ }
+
+ var responses []map[string]interface{}
+ if rmsg[0] == '[' {
+ if err := json.Unmarshal(rmsg, &responses); err != nil {
+ t.Fatalf("Received invalid message: %s", rmsg)
+ }
+ } else {
+ var msg map[string]interface{}
+ if err := json.Unmarshal(rmsg, &msg); err != nil {
+ t.Fatalf("Received invalid message: %s", rmsg)
+ }
+ responses = append(responses, msg)
+ }
+
+ for _, msg := range responses {
+ // determine what kind of msg was received and broadcast
+ // it to over the corresponding channel
+ if _, found := msg["result"]; found {
+ successes <- jsonSuccessResponse{
+ Version: msg["jsonrpc"].(string),
+ Id: msg["id"],
+ Result: msg["result"],
+ }
+ continue
+ }
+ if _, found := msg["error"]; found {
+ params := msg["params"].(map[string]interface{})
+ failures <- jsonErrResponse{
+ Version: msg["jsonrpc"].(string),
+ Id: msg["id"],
+ Error: jsonError{int(params["subscription"].(float64)), params["message"].(string), params["data"]},
+ }
+ continue
+ }
+ if _, found := msg["params"]; found {
+ params := msg["params"].(map[string]interface{})
+ notifications <- jsonNotification{
+ Version: msg["jsonrpc"].(string),
+ Method: msg["method"].(string),
+ Params: jsonSubscription{params["subscription"].(string), params["result"]},
+ }
+ continue
+ }
+ t.Fatalf("Received invalid message: %s", msg)
+ }
+ }
+}
+
+// TestSubscriptionMultipleNamespaces ensures that subscriptions can exists
+// for multiple different namespaces.
+func TestSubscriptionMultipleNamespaces(t *testing.T) {
+ var (
+ namespaces = []string{"eth", "shh", "bzz"}
+ server = NewServer()
+ service = NotificationTestService{}
+ clientConn, serverConn = net.Pipe()
+
+ out = json.NewEncoder(clientConn)
+ in = json.NewDecoder(clientConn)
+ successes = make(chan jsonSuccessResponse)
+ failures = make(chan jsonErrResponse)
+ notifications = make(chan jsonNotification)
+ )
+
+ // setup and start server
+ for _, namespace := range namespaces {
+ if err := server.RegisterName(namespace, &service); err != nil {
+ t.Fatalf("unable to register test service %v", err)
+ }
+ }
+
+ go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions)
+ defer server.Stop()
+
+ // wait for message and write them to the given channels
+ go waitForMessages(t, in, successes, failures, notifications)
+
+ // create subscriptions one by one
+ n := 3
+ for i, namespace := range namespaces {
+ request := map[string]interface{}{
+ "id": i,
+ "method": fmt.Sprintf("%s_subscribe", namespace),
+ "version": "2.0",
+ "params": []interface{}{"someSubscription", n, i},
+ }
+
+ if err := out.Encode(&request); err != nil {
+ t.Fatalf("Could not create subscription: %v", err)
+ }
+ }
+
+ // create all subscriptions in 1 batch
+ var requests []interface{}
+ for i, namespace := range namespaces {
+ requests = append(requests, map[string]interface{}{
+ "id": i,
+ "method": fmt.Sprintf("%s_subscribe", namespace),
+ "version": "2.0",
+ "params": []interface{}{"someSubscription", n, i},
+ })
+ }
+
+ if err := out.Encode(&requests); err != nil {
+ t.Fatalf("Could not create subscription in batch form: %v", err)
+ }
+
+ timeout := time.After(30 * time.Second)
+ subids := make(map[string]string, 2*len(namespaces))
+ count := make(map[string]int, 2*len(namespaces))
+
+ for {
+ done := true
+ for id, _ := range count {
+ if count, found := count[id]; !found || count < (2*n) {
+ done = false
+ }
+ }
+
+ if done && len(count) == len(namespaces) {
+ break
+ }
+
+ select {
+ case suc := <-successes: // subscription created
+ subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string)
+ case failure := <-failures:
+ t.Errorf("received error: %v", failure.Error)
+ case notification := <-notifications:
+ if cnt, found := count[notification.Params.Subscription]; found {
+ count[notification.Params.Subscription] = cnt + 1
+ } else {
+ count[notification.Params.Subscription] = 1
+ }
+ case <-timeout:
+ for _, namespace := range namespaces {
+ subid, found := subids[namespace]
+ if !found {
+ t.Errorf("Subscription for '%s' not created", namespace)
+ continue
+ }
+ if count, found := count[subid]; !found || count < n {
+ t.Errorf("Didn't receive all notifications (%d<%d) in time for namespace '%s'", count, n, namespace)
+ }
+ }
+ return
+ }
+ }
+}