From 47ff8130124b479f1f051312eed50c33f0a38e6f Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Wed, 27 Jul 2016 17:47:46 +0200 Subject: rpc: refactor subscriptions and filters --- rpc/subscription_test.go | 165 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 rpc/subscription_test.go (limited to 'rpc/subscription_test.go') diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go new file mode 100644 index 000000000..8bb341694 --- /dev/null +++ b/rpc/subscription_test.go @@ -0,0 +1,165 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "encoding/json" + "net" + "sync" + "testing" + "time" + + "golang.org/x/net/context" +) + +type NotificationTestService struct { + mu sync.Mutex + unsubscribed bool + + gotHangSubscriptionReq chan struct{} + unblockHangSubscription chan struct{} +} + +func (s *NotificationTestService) Echo(i int) int { + return i +} + +func (s *NotificationTestService) wasUnsubCallbackCalled() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.unsubscribed +} + +func (s *NotificationTestService) Unsubscribe(subid string) { + s.mu.Lock() + s.unsubscribed = true + s.mu.Unlock() +} + +func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) { + notifier, supported := NotifierFromContext(ctx) + if !supported { + return nil, ErrNotificationsUnsupported + } + + // by explicitly creating an subscription we make sure that the subscription id is send back to the client + // before the first subscription.Notify is called. Otherwise the events might be send before the response + // for the eth_subscribe method. + subscription := notifier.CreateSubscription() + + go func() { + // test expects n events, if we begin sending event immediatly some events + // will probably be dropped since the subscription ID might not be send to + // the client. + time.Sleep(5 * time.Second) + for i := 0; i < n; i++ { + if err := notifier.Notify(subscription.ID, val+i); err != nil { + return + } + } + + select { + case <-notifier.Closed(): + s.mu.Lock() + s.unsubscribed = true + s.mu.Unlock() + case <-subscription.Err(): + s.mu.Lock() + s.unsubscribed = true + s.mu.Unlock() + } + }() + + return subscription, nil +} + +// HangSubscription blocks on s.unblockHangSubscription before +// sending anything. +func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) { + notifier, supported := NotifierFromContext(ctx) + if !supported { + return nil, ErrNotificationsUnsupported + } + + s.gotHangSubscriptionReq <- struct{}{} + <-s.unblockHangSubscription + subscription := notifier.CreateSubscription() + + go func() { + notifier.Notify(subscription.ID, val) + }() + return subscription, nil +} + +func TestNotifications(t *testing.T) { + server := NewServer() + service := &NotificationTestService{} + + if err := server.RegisterName("eth", service); err != nil { + t.Fatalf("unable to register test service %v", err) + } + + clientConn, serverConn := net.Pipe() + + go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions) + + out := json.NewEncoder(clientConn) + in := json.NewDecoder(clientConn) + + n := 5 + val := 12345 + request := map[string]interface{}{ + "id": 1, + "method": "eth_subscribe", + "version": "2.0", + "params": []interface{}{"someSubscription", n, val}, + } + + // create subscription + if err := out.Encode(request); err != nil { + t.Fatal(err) + } + + var subid string + response := jsonSuccessResponse{Result: subid} + if err := in.Decode(&response); err != nil { + t.Fatal(err) + } + + var ok bool + if subid, ok = response.Result.(string); !ok { + t.Fatalf("expected subscription id, got %T", response.Result) + } + + for i := 0; i < n; i++ { + var notification jsonNotification + if err := in.Decode(¬ification); err != nil { + t.Fatalf("%v", err) + } + + if int(notification.Params.Result.(float64)) != val+i { + t.Fatalf("expected %d, got %d", val+i, notification.Params.Result) + } + } + + clientConn.Close() // causes notification unsubscribe callback to be called + time.Sleep(1 * time.Second) + + if !service.wasUnsubCallbackCalled() { + t.Error("unsubscribe callback not called after closing connection") + } +} -- cgit v1.2.3