diff options
Diffstat (limited to 'rpc/subscription_test.go')
-rw-r--r-- | rpc/subscription_test.go | 160 |
1 files changed, 160 insertions, 0 deletions
diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index 345b4e5f2..0ed15ddfe 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -19,6 +19,7 @@ package rpc import ( "context" "encoding/json" + "fmt" "net" "sync" "testing" @@ -162,3 +163,162 @@ func TestNotifications(t *testing.T) { t.Error("unsubscribe callback not called after closing connection") } } + +func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse, + failures chan<- jsonErrResponse, notifications chan<- jsonNotification) { + + // read and parse server messages + for { + var rmsg json.RawMessage + if err := in.Decode(&rmsg); err != nil { + return + } + + var responses []map[string]interface{} + if rmsg[0] == '[' { + if err := json.Unmarshal(rmsg, &responses); err != nil { + t.Fatalf("Received invalid message: %s", rmsg) + } + } else { + var msg map[string]interface{} + if err := json.Unmarshal(rmsg, &msg); err != nil { + t.Fatalf("Received invalid message: %s", rmsg) + } + responses = append(responses, msg) + } + + for _, msg := range responses { + // determine what kind of msg was received and broadcast + // it to over the corresponding channel + if _, found := msg["result"]; found { + successes <- jsonSuccessResponse{ + Version: msg["jsonrpc"].(string), + Id: msg["id"], + Result: msg["result"], + } + continue + } + if _, found := msg["error"]; found { + params := msg["params"].(map[string]interface{}) + failures <- jsonErrResponse{ + Version: msg["jsonrpc"].(string), + Id: msg["id"], + Error: jsonError{int(params["subscription"].(float64)), params["message"].(string), params["data"]}, + } + continue + } + if _, found := msg["params"]; found { + params := msg["params"].(map[string]interface{}) + notifications <- jsonNotification{ + Version: msg["jsonrpc"].(string), + Method: msg["method"].(string), + Params: jsonSubscription{params["subscription"].(string), params["result"]}, + } + continue + } + t.Fatalf("Received invalid message: %s", msg) + } + } +} + +// TestSubscriptionMultipleNamespaces ensures that subscriptions can exists +// for multiple different namespaces. +func TestSubscriptionMultipleNamespaces(t *testing.T) { + var ( + namespaces = []string{"eth", "shh", "bzz"} + server = NewServer() + service = NotificationTestService{} + clientConn, serverConn = net.Pipe() + + out = json.NewEncoder(clientConn) + in = json.NewDecoder(clientConn) + successes = make(chan jsonSuccessResponse) + failures = make(chan jsonErrResponse) + notifications = make(chan jsonNotification) + ) + + // setup and start server + for _, namespace := range namespaces { + if err := server.RegisterName(namespace, &service); err != nil { + t.Fatalf("unable to register test service %v", err) + } + } + + go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions) + defer server.Stop() + + // wait for message and write them to the given channels + go waitForMessages(t, in, successes, failures, notifications) + + // create subscriptions one by one + n := 3 + for i, namespace := range namespaces { + request := map[string]interface{}{ + "id": i, + "method": fmt.Sprintf("%s_subscribe", namespace), + "version": "2.0", + "params": []interface{}{"someSubscription", n, i}, + } + + if err := out.Encode(&request); err != nil { + t.Fatalf("Could not create subscription: %v", err) + } + } + + // create all subscriptions in 1 batch + var requests []interface{} + for i, namespace := range namespaces { + requests = append(requests, map[string]interface{}{ + "id": i, + "method": fmt.Sprintf("%s_subscribe", namespace), + "version": "2.0", + "params": []interface{}{"someSubscription", n, i}, + }) + } + + if err := out.Encode(&requests); err != nil { + t.Fatalf("Could not create subscription in batch form: %v", err) + } + + timeout := time.After(30 * time.Second) + subids := make(map[string]string, 2*len(namespaces)) + count := make(map[string]int, 2*len(namespaces)) + + for { + done := true + for id, _ := range count { + if count, found := count[id]; !found || count < (2*n) { + done = false + } + } + + if done && len(count) == len(namespaces) { + break + } + + select { + case suc := <-successes: // subscription created + subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string) + case failure := <-failures: + t.Errorf("received error: %v", failure.Error) + case notification := <-notifications: + if cnt, found := count[notification.Params.Subscription]; found { + count[notification.Params.Subscription] = cnt + 1 + } else { + count[notification.Params.Subscription] = 1 + } + case <-timeout: + for _, namespace := range namespaces { + subid, found := subids[namespace] + if !found { + t.Errorf("Subscription for '%s' not created", namespace) + continue + } + if count, found := count[subid]; !found || count < n { + t.Errorf("Didn't receive all notifications (%d<%d) in time for namespace '%s'", count, n, namespace) + } + } + return + } + } +} |