aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/subscription.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@users.noreply.github.com>2019-02-04 20:47:34 +0800
committerGitHub <noreply@github.com>2019-02-04 20:47:34 +0800
commit245f3146c26698193c4b479e7bc5825b058c444a (patch)
treec1196f7579e99e89e3e38cd2c7e442ef49a95731 /rpc/subscription.go
parentec3432bccbb058567c0ea3f1e6537460f1f0aa29 (diff)
downloadgo-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.gz
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.bz2
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.lz
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.xz
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.zst
go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.zip
rpc: implement full bi-directional communication (#18471)
New APIs added: client.RegisterName(namespace, service) // makes service available to server client.Notify(ctx, method, args...) // sends a notification ClientFromContext(ctx) // to get a client in handler method This is essentially a rewrite of the server-side code. JSON-RPC processing code is now the same on both server and client side. Many minor issues were fixed in the process and there is a new test suite for JSON-RPC spec compliance (and non-compliance in some cases). List of behavior changes: - Method handlers are now called with a per-request context instead of a per-connection context. The context is canceled right after the method returns. - Subscription error channels are always closed when the connection ends. There is no need to also wait on the Notifier's Closed channel to detect whether the subscription has ended. - Client now omits "params" instead of sending "params": null when there are no arguments to a call. The previous behavior was not compliant with the spec. The server still accepts "params": null. - Floating point numbers are allowed as "id". The spec doesn't allow them, but we handle request "id" as json.RawMessage and guarantee that the same number will be sent back. - Logging is improved significantly. There is now a message at DEBUG level for each RPC call served.
Diffstat (limited to 'rpc/subscription.go')
-rw-r--r--rpc/subscription.go338
1 files changed, 258 insertions, 80 deletions
diff --git a/rpc/subscription.go b/rpc/subscription.go
index 6bbb6f75d..c1e869b8a 100644
--- a/rpc/subscription.go
+++ b/rpc/subscription.go
@@ -17,9 +17,19 @@
package rpc
import (
+ "bufio"
+ "container/list"
"context"
+ crand "crypto/rand"
+ "encoding/binary"
+ "encoding/hex"
+ "encoding/json"
"errors"
+ "math/rand"
+ "reflect"
+ "strings"
"sync"
+ "time"
)
var (
@@ -29,121 +39,289 @@ var (
ErrSubscriptionNotFound = errors.New("subscription not found")
)
+var globalGen = randomIDGenerator()
+
// ID defines a pseudo random number that is used to identify RPC subscriptions.
type ID string
-// a Subscription is created by a notifier and tight to that notifier. The client can use
-// this subscription to wait for an unsubscribe request for the client, see Err().
-type Subscription struct {
- ID ID
- namespace string
- err chan error // closed on unsubscribe
+// NewID returns a new, random ID.
+func NewID() ID {
+ return globalGen()
}
-// Err returns a channel that is closed when the client send an unsubscribe request.
-func (s *Subscription) Err() <-chan error {
- return s.err
+// randomIDGenerator returns a function generates a random IDs.
+func randomIDGenerator() func() ID {
+ seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader))
+ if err != nil {
+ seed = int64(time.Now().Nanosecond())
+ }
+ var (
+ mu sync.Mutex
+ rng = rand.New(rand.NewSource(seed))
+ )
+ return func() ID {
+ mu.Lock()
+ defer mu.Unlock()
+ id := make([]byte, 16)
+ rng.Read(id)
+ return encodeID(id)
+ }
}
-// notifierKey is used to store a notifier within the connection context.
-type notifierKey struct{}
-
-// Notifier is tight to a RPC connection that supports subscriptions.
-// Server callbacks use the notifier to send notifications.
-type Notifier struct {
- codec ServerCodec
- subMu sync.Mutex
- active map[ID]*Subscription
- inactive map[ID]*Subscription
- buffer map[ID][]interface{} // unsent notifications of inactive subscriptions
-}
-
-// newNotifier creates a new notifier that can be used to send subscription
-// notifications to the client.
-func newNotifier(codec ServerCodec) *Notifier {
- return &Notifier{
- codec: codec,
- active: make(map[ID]*Subscription),
- inactive: make(map[ID]*Subscription),
- buffer: make(map[ID][]interface{}),
+func encodeID(b []byte) ID {
+ id := hex.EncodeToString(b)
+ id = strings.TrimLeft(id, "0")
+ if id == "" {
+ id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0.
}
+ return ID("0x" + id)
}
+type notifierKey struct{}
+
// NotifierFromContext returns the Notifier value stored in ctx, if any.
func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
n, ok := ctx.Value(notifierKey{}).(*Notifier)
return n, ok
}
+// Notifier is tied to a RPC connection that supports subscriptions.
+// Server callbacks use the notifier to send notifications.
+type Notifier struct {
+ h *handler
+ namespace string
+
+ mu sync.Mutex
+ sub *Subscription
+ buffer []json.RawMessage
+ callReturned bool
+ activated bool
+}
+
// CreateSubscription returns a new subscription that is coupled to the
// RPC connection. By default subscriptions are inactive and notifications
// are dropped until the subscription is marked as active. This is done
// by the RPC server after the subscription ID is send to the client.
func (n *Notifier) CreateSubscription() *Subscription {
- s := &Subscription{ID: NewID(), err: make(chan error)}
- n.subMu.Lock()
- n.inactive[s.ID] = s
- n.subMu.Unlock()
- return s
+ n.mu.Lock()
+ defer n.mu.Unlock()
+
+ if n.sub != nil {
+ panic("can't create multiple subscriptions with Notifier")
+ } else if n.callReturned {
+ panic("can't create subscription after subscribe call has returned")
+ }
+ n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)}
+ return n.sub
}
// Notify sends a notification to the client with the given data as payload.
// If an error occurs the RPC connection is closed and the error is returned.
func (n *Notifier) Notify(id ID, data interface{}) error {
- n.subMu.Lock()
- defer n.subMu.Unlock()
+ enc, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
- if sub, active := n.active[id]; active {
- n.send(sub, data)
- } else {
- n.buffer[id] = append(n.buffer[id], data)
+ n.mu.Lock()
+ defer n.mu.Unlock()
+
+ if n.sub == nil {
+ panic("can't Notify before subscription is created")
+ } else if n.sub.ID != id {
+ panic("Notify with wrong ID")
+ }
+ if n.activated {
+ return n.send(n.sub, enc)
}
+ n.buffer = append(n.buffer, enc)
return nil
}
-func (n *Notifier) send(sub *Subscription, data interface{}) error {
- notification := n.codec.CreateNotification(string(sub.ID), sub.namespace, data)
- err := n.codec.Write(notification)
- if err != nil {
- n.codec.Close()
+// Closed returns a channel that is closed when the RPC connection is closed.
+// Deprecated: use subscription error channel
+func (n *Notifier) Closed() <-chan interface{} {
+ return n.h.conn.Closed()
+}
+
+// takeSubscription returns the subscription (if one has been created). No subscription can
+// be created after this call.
+func (n *Notifier) takeSubscription() *Subscription {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+ n.callReturned = true
+ return n.sub
+}
+
+// acticate is called after the subscription ID was sent to client. Notifications are
+// buffered before activation. This prevents notifications being sent to the client before
+// the subscription ID is sent to the client.
+func (n *Notifier) activate() error {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+
+ for _, data := range n.buffer {
+ if err := n.send(n.sub, data); err != nil {
+ return err
+ }
}
- return err
+ n.activated = true
+ return nil
}
-// Closed returns a channel that is closed when the RPC connection is closed.
-func (n *Notifier) Closed() <-chan interface{} {
- return n.codec.Closed()
-}
-
-// unsubscribe a subscription.
-// If the subscription could not be found ErrSubscriptionNotFound is returned.
-func (n *Notifier) unsubscribe(id ID) error {
- n.subMu.Lock()
- defer n.subMu.Unlock()
- if s, found := n.active[id]; found {
- close(s.err)
- delete(n.active, id)
- return nil
+func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
+ params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
+ ctx := context.Background()
+ return n.h.conn.Write(ctx, &jsonrpcMessage{
+ Version: vsn,
+ Method: n.namespace + notificationMethodSuffix,
+ Params: params,
+ })
+}
+
+// A Subscription is created by a notifier and tight to that notifier. The client can use
+// this subscription to wait for an unsubscribe request for the client, see Err().
+type Subscription struct {
+ ID ID
+ namespace string
+ err chan error // closed on unsubscribe
+}
+
+// Err returns a channel that is closed when the client send an unsubscribe request.
+func (s *Subscription) Err() <-chan error {
+ return s.err
+}
+
+// MarshalJSON marshals a subscription as its ID.
+func (s *Subscription) MarshalJSON() ([]byte, error) {
+ return json.Marshal(s.ID)
+}
+
+// ClientSubscription is a subscription established through the Client's Subscribe or
+// EthSubscribe methods.
+type ClientSubscription struct {
+ client *Client
+ etype reflect.Type
+ channel reflect.Value
+ namespace string
+ subid string
+ in chan json.RawMessage
+
+ quitOnce sync.Once // ensures quit is closed once
+ quit chan struct{} // quit is closed when the subscription exits
+ errOnce sync.Once // ensures err is closed once
+ err chan error
+}
+
+func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
+ sub := &ClientSubscription{
+ client: c,
+ namespace: namespace,
+ etype: channel.Type().Elem(),
+ channel: channel,
+ quit: make(chan struct{}),
+ err: make(chan error, 1),
+ in: make(chan json.RawMessage),
}
- return ErrSubscriptionNotFound
-}
-
-// activate enables a subscription. Until a subscription is enabled all
-// notifications are dropped. This method is called by the RPC server after
-// the subscription ID was sent to client. This prevents notifications being
-// send to the client before the subscription ID is send to the client.
-func (n *Notifier) activate(id ID, namespace string) {
- n.subMu.Lock()
- defer n.subMu.Unlock()
-
- if sub, found := n.inactive[id]; found {
- sub.namespace = namespace
- n.active[id] = sub
- delete(n.inactive, id)
- // Send buffered notifications.
- for _, data := range n.buffer[id] {
- n.send(sub, data)
+ return sub
+}
+
+// Err returns the subscription error channel. The intended use of Err is to schedule
+// resubscription when the client connection is closed unexpectedly.
+//
+// The error channel receives a value when the subscription has ended due
+// to an error. The received error is nil if Close has been called
+// on the underlying client and no other error has occurred.
+//
+// The error channel is closed when Unsubscribe is called on the subscription.
+func (sub *ClientSubscription) Err() <-chan error {
+ return sub.err
+}
+
+// Unsubscribe unsubscribes the notification and closes the error channel.
+// It can safely be called more than once.
+func (sub *ClientSubscription) Unsubscribe() {
+ sub.quitWithError(nil, true)
+ sub.errOnce.Do(func() { close(sub.err) })
+}
+
+func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
+ sub.quitOnce.Do(func() {
+ // The dispatch loop won't be able to execute the unsubscribe call
+ // if it is blocked on deliver. Close sub.quit first because it
+ // unblocks deliver.
+ close(sub.quit)
+ if unsubscribeServer {
+ sub.requestUnsubscribe()
+ }
+ if err != nil {
+ if err == ErrClientQuit {
+ err = nil // Adhere to subscription semantics.
+ }
+ sub.err <- err
}
- delete(n.buffer, id)
+ })
+}
+
+func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
+ select {
+ case sub.in <- result:
+ return true
+ case <-sub.quit:
+ return false
}
}
+
+func (sub *ClientSubscription) start() {
+ sub.quitWithError(sub.forward())
+}
+
+func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
+ cases := []reflect.SelectCase{
+ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
+ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
+ {Dir: reflect.SelectSend, Chan: sub.channel},
+ }
+ buffer := list.New()
+ defer buffer.Init()
+ for {
+ var chosen int
+ var recv reflect.Value
+ if buffer.Len() == 0 {
+ // Idle, omit send case.
+ chosen, recv, _ = reflect.Select(cases[:2])
+ } else {
+ // Non-empty buffer, send the first queued item.
+ cases[2].Send = reflect.ValueOf(buffer.Front().Value)
+ chosen, recv, _ = reflect.Select(cases)
+ }
+
+ switch chosen {
+ case 0: // <-sub.quit
+ return nil, false
+ case 1: // <-sub.in
+ val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
+ if err != nil {
+ return err, true
+ }
+ if buffer.Len() == maxClientSubscriptionBuffer {
+ return ErrSubscriptionQueueOverflow, true
+ }
+ buffer.PushBack(val)
+ case 2: // sub.channel<-
+ cases[2].Send = reflect.Value{} // Don't hold onto the value.
+ buffer.Remove(buffer.Front())
+ }
+ }
+}
+
+func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
+ val := reflect.New(sub.etype)
+ err := json.Unmarshal(result, val.Interface())
+ return val.Elem().Interface(), err
+}
+
+func (sub *ClientSubscription) requestUnsubscribe() error {
+ var result interface{}
+ return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
+}