aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/subscription_test.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@users.noreply.github.com>2019-02-04 20:47:34 +0800
committerGitHub <noreply@github.com>2019-02-04 20:47:34 +0800
commit245f3146c26698193c4b479e7bc5825b058c444a (patch)
treec1196f7579e99e89e3e38cd2c7e442ef49a95731 /rpc/subscription_test.go
parentec3432bccbb058567c0ea3f1e6537460f1f0aa29 (diff)
downloadgo-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.gz
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.bz2
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.lz
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.xz
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.zst
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.zip
rpc: implement full bi-directional communication (#18471)
New APIs added: client.RegisterName(namespace, service) // makes service available to server client.Notify(ctx, method, args...) // sends a notification ClientFromContext(ctx) // to get a client in handler method This is essentially a rewrite of the server-side code. JSON-RPC processing code is now the same on both server and client side. Many minor issues were fixed in the process and there is a new test suite for JSON-RPC spec compliance (and non-compliance in some cases). List of behavior changes: - Method handlers are now called with a per-request context instead of a per-connection context. The context is canceled right after the method returns. - Subscription error channels are always closed when the connection ends. There is no need to also wait on the Notifier's Closed channel to detect whether the subscription has ended. - Client now omits "params" instead of sending "params": null when there are no arguments to a call. The previous behavior was not compliant with the spec. The server still accepts "params": null. - Floating point numbers are allowed as "id". The spec doesn't allow them, but we handle request "id" as json.RawMessage and guarantee that the same number will be sent back. - Logging is improved significantly. There is now a message at DEBUG level for each RPC call served.
Diffstat (limited to 'rpc/subscription_test.go')
-rw-r--r--rpc/subscription_test.go319
1 files changed, 106 insertions, 213 deletions
diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go
index 24febc919..eba192450 100644
--- a/rpc/subscription_test.go
+++ b/rpc/subscription_test.go
@@ -17,232 +17,62 @@
package rpc
import (
- "context"
"encoding/json"
"fmt"
"net"
- "sync"
+ "strings"
"testing"
"time"
)
-type NotificationTestService struct {
- mu sync.Mutex
- unsubscribed chan string
- gotHangSubscriptionReq chan struct{}
- unblockHangSubscription chan struct{}
-}
-
-func (s *NotificationTestService) Echo(i int) int {
- return i
-}
-
-func (s *NotificationTestService) Unsubscribe(subid string) {
- if s.unsubscribed != nil {
- s.unsubscribed <- subid
- }
-}
-
-func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) {
- notifier, supported := NotifierFromContext(ctx)
- if !supported {
- return nil, ErrNotificationsUnsupported
- }
-
- // by explicitly creating an subscription we make sure that the subscription id is send back to the client
- // before the first subscription.Notify is called. Otherwise the events might be send before the response
- // for the eth_subscribe method.
- subscription := notifier.CreateSubscription()
-
- go func() {
- // test expects n events, if we begin sending event immediately some events
- // will probably be dropped since the subscription ID might not be send to
- // the client.
- for i := 0; i < n; i++ {
- if err := notifier.Notify(subscription.ID, val+i); err != nil {
- return
- }
- }
-
- select {
- case <-notifier.Closed():
- case <-subscription.Err():
- }
- if s.unsubscribed != nil {
- s.unsubscribed <- string(subscription.ID)
- }
- }()
-
- return subscription, nil
-}
-
-// HangSubscription blocks on s.unblockHangSubscription before
-// sending anything.
-func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) {
- notifier, supported := NotifierFromContext(ctx)
- if !supported {
- return nil, ErrNotificationsUnsupported
- }
-
- s.gotHangSubscriptionReq <- struct{}{}
- <-s.unblockHangSubscription
- subscription := notifier.CreateSubscription()
-
- go func() {
- notifier.Notify(subscription.ID, val)
- }()
- return subscription, nil
-}
-
-func TestNotifications(t *testing.T) {
- server := NewServer()
- service := &NotificationTestService{unsubscribed: make(chan string)}
-
- if err := server.RegisterName("eth", service); err != nil {
- t.Fatalf("unable to register test service %v", err)
- }
-
- clientConn, serverConn := net.Pipe()
-
- go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions)
-
- out := json.NewEncoder(clientConn)
- in := json.NewDecoder(clientConn)
-
- n := 5
- val := 12345
- request := map[string]interface{}{
- "id": 1,
- "method": "eth_subscribe",
- "version": "2.0",
- "params": []interface{}{"someSubscription", n, val},
- }
-
- // create subscription
- if err := out.Encode(request); err != nil {
- t.Fatal(err)
- }
-
- var subid string
- response := jsonSuccessResponse{Result: subid}
- if err := in.Decode(&response); err != nil {
- t.Fatal(err)
- }
-
- var ok bool
- if _, ok = response.Result.(string); !ok {
- t.Fatalf("expected subscription id, got %T", response.Result)
- }
-
- for i := 0; i < n; i++ {
- var notification jsonNotification
- if err := in.Decode(&notification); err != nil {
- t.Fatalf("%v", err)
- }
-
- if int(notification.Params.Result.(float64)) != val+i {
- t.Fatalf("expected %d, got %d", val+i, notification.Params.Result)
- }
- }
-
- clientConn.Close() // causes notification unsubscribe callback to be called
- select {
- case <-service.unsubscribed:
- case <-time.After(1 * time.Second):
- t.Fatal("Unsubscribe not called after one second")
- }
-}
-
-func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse,
- failures chan<- jsonErrResponse, notifications chan<- jsonNotification, errors chan<- error) {
-
- // read and parse server messages
- for {
- var rmsg json.RawMessage
- if err := in.Decode(&rmsg); err != nil {
- return
+func TestNewID(t *testing.T) {
+ hexchars := "0123456789ABCDEFabcdef"
+ for i := 0; i < 100; i++ {
+ id := string(NewID())
+ if !strings.HasPrefix(id, "0x") {
+ t.Fatalf("invalid ID prefix, want '0x...', got %s", id)
}
- var responses []map[string]interface{}
- if rmsg[0] == '[' {
- if err := json.Unmarshal(rmsg, &responses); err != nil {
- errors <- fmt.Errorf("Received invalid message: %s", rmsg)
- return
- }
- } else {
- var msg map[string]interface{}
- if err := json.Unmarshal(rmsg, &msg); err != nil {
- errors <- fmt.Errorf("Received invalid message: %s", rmsg)
- return
- }
- responses = append(responses, msg)
+ id = id[2:]
+ if len(id) == 0 || len(id) > 32 {
+ t.Fatalf("invalid ID length, want len(id) > 0 && len(id) <= 32), got %d", len(id))
}
- for _, msg := range responses {
- // determine what kind of msg was received and broadcast
- // it to over the corresponding channel
- if _, found := msg["result"]; found {
- successes <- jsonSuccessResponse{
- Version: msg["jsonrpc"].(string),
- Id: msg["id"],
- Result: msg["result"],
- }
- continue
- }
- if _, found := msg["error"]; found {
- params := msg["params"].(map[string]interface{})
- failures <- jsonErrResponse{
- Version: msg["jsonrpc"].(string),
- Id: msg["id"],
- Error: jsonError{int(params["subscription"].(float64)), params["message"].(string), params["data"]},
- }
- continue
+ for i := 0; i < len(id); i++ {
+ if strings.IndexByte(hexchars, id[i]) == -1 {
+ t.Fatalf("unexpected byte, want any valid hex char, got %c", id[i])
}
- if _, found := msg["params"]; found {
- params := msg["params"].(map[string]interface{})
- notifications <- jsonNotification{
- Version: msg["jsonrpc"].(string),
- Method: msg["method"].(string),
- Params: jsonSubscription{params["subscription"].(string), params["result"]},
- }
- continue
- }
- errors <- fmt.Errorf("Received invalid message: %s", msg)
}
}
}
-// TestSubscriptionMultipleNamespaces ensures that subscriptions can exists
-// for multiple different namespaces.
-func TestSubscriptionMultipleNamespaces(t *testing.T) {
+func TestSubscriptions(t *testing.T) {
var (
namespaces = []string{"eth", "shh", "bzz"}
- service = NotificationTestService{}
- subCount = len(namespaces) * 2
+ service = &notificationTestService{}
+ subCount = len(namespaces)
notificationCount = 3
server = NewServer()
clientConn, serverConn = net.Pipe()
out = json.NewEncoder(clientConn)
in = json.NewDecoder(clientConn)
- successes = make(chan jsonSuccessResponse)
- failures = make(chan jsonErrResponse)
- notifications = make(chan jsonNotification)
- errors = make(chan error, 10)
+ successes = make(chan subConfirmation)
+ notifications = make(chan subscriptionResult)
+ errors = make(chan error, subCount*notificationCount+1)
)
// setup and start server
for _, namespace := range namespaces {
- if err := server.RegisterName(namespace, &service); err != nil {
+ if err := server.RegisterName(namespace, service); err != nil {
t.Fatalf("unable to register test service %v", err)
}
}
-
go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions)
defer server.Stop()
// wait for message and write them to the given channels
- go waitForMessages(t, in, successes, failures, notifications, errors)
+ go waitForMessages(in, successes, notifications, errors)
// create subscriptions one by one
for i, namespace := range namespaces {
@@ -252,27 +82,11 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
"version": "2.0",
"params": []interface{}{"someSubscription", notificationCount, i},
}
-
if err := out.Encode(&request); err != nil {
t.Fatalf("Could not create subscription: %v", err)
}
}
- // create all subscriptions in 1 batch
- var requests []interface{}
- for i, namespace := range namespaces {
- requests = append(requests, map[string]interface{}{
- "id": i,
- "method": fmt.Sprintf("%s_subscribe", namespace),
- "version": "2.0",
- "params": []interface{}{"someSubscription", notificationCount, i},
- })
- }
-
- if err := out.Encode(&requests); err != nil {
- t.Fatalf("Could not create subscription in batch form: %v", err)
- }
-
timeout := time.After(30 * time.Second)
subids := make(map[string]string, subCount)
count := make(map[string]int, subCount)
@@ -285,17 +99,14 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
}
return done
}
-
for !allReceived() {
select {
- case suc := <-successes: // subscription created
- subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string)
+ case confirmation := <-successes: // subscription created
+ subids[namespaces[confirmation.reqid]] = string(confirmation.subid)
case notification := <-notifications:
- count[notification.Params.Subscription]++
+ count[notification.ID]++
case err := <-errors:
t.Fatal(err)
- case failure := <-failures:
- t.Errorf("received error: %v", failure.Error)
case <-timeout:
for _, namespace := range namespaces {
subid, found := subids[namespace]
@@ -311,3 +122,85 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
}
}
}
+
+// This test checks that unsubscribing works.
+func TestServerUnsubscribe(t *testing.T) {
+ // Start the server.
+ server := newTestServer()
+ service := &notificationTestService{unsubscribed: make(chan string)}
+ server.RegisterName("nftest2", service)
+ p1, p2 := net.Pipe()
+ go server.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
+
+ p2.SetDeadline(time.Now().Add(10 * time.Second))
+
+ // Subscribe.
+ p2.Write([]byte(`{"jsonrpc":"2.0","id":1,"method":"nftest2_subscribe","params":["someSubscription",0,10]}`))
+
+ // Handle received messages.
+ resps := make(chan subConfirmation)
+ notifications := make(chan subscriptionResult)
+ errors := make(chan error)
+ go waitForMessages(json.NewDecoder(p2), resps, notifications, errors)
+
+ // Receive the subscription ID.
+ var sub subConfirmation
+ select {
+ case sub = <-resps:
+ case err := <-errors:
+ t.Fatal(err)
+ }
+
+ // Unsubscribe and check that it is handled on the server side.
+ p2.Write([]byte(`{"jsonrpc":"2.0","method":"nftest2_unsubscribe","params":["` + sub.subid + `"]}`))
+ for {
+ select {
+ case id := <-service.unsubscribed:
+ if id != string(sub.subid) {
+ t.Errorf("wrong subscription ID unsubscribed")
+ }
+ return
+ case err := <-errors:
+ t.Fatal(err)
+ case <-notifications:
+ // drop notifications
+ }
+ }
+}
+
+type subConfirmation struct {
+ reqid int
+ subid ID
+}
+
+func waitForMessages(in *json.Decoder, successes chan subConfirmation, notifications chan subscriptionResult, errors chan error) {
+ for {
+ var msg jsonrpcMessage
+ if err := in.Decode(&msg); err != nil {
+ errors <- fmt.Errorf("decode error: %v", err)
+ return
+ }
+ switch {
+ case msg.isNotification():
+ var res subscriptionResult
+ if err := json.Unmarshal(msg.Params, &res); err != nil {
+ errors <- fmt.Errorf("invalid subscription result: %v", err)
+ } else {
+ notifications <- res
+ }
+ case msg.isResponse():
+ var c subConfirmation
+ if msg.Error != nil {
+ errors <- msg.Error
+ } else if err := json.Unmarshal(msg.Result, &c.subid); err != nil {
+ errors <- fmt.Errorf("invalid response: %v", err)
+ } else {
+ json.Unmarshal(msg.ID, &c.reqid)
+ successes <- c
+ }
+ default:
+ errors <- fmt.Errorf("unrecognized message: %v", msg)
+ return
+ }
+ }
+}