diff options
author | Felix Lange <fjl@users.noreply.github.com> | 2019-02-04 20:47:34 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-04 20:47:34 +0800 |
commit | 245f3146c26698193c4b479e7bc5825b058c444a (patch) | |
tree | c1196f7579e99e89e3e38cd2c7e442ef49a95731 /rpc/subscription.go | |
parent | ec3432bccbb058567c0ea3f1e6537460f1f0aa29 (diff) | |
download | go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.gz go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.bz2 go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.lz go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.xz go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.tar.zst go-tangerine-245f3146c26698193c4b479e7bc5825b058c444a.zip |
rpc: implement full bi-directional communication (#18471)
New APIs added:
client.RegisterName(namespace, service) // makes service available to server
client.Notify(ctx, method, args...) // sends a notification
ClientFromContext(ctx) // to get a client in handler method
This is essentially a rewrite of the server-side code. JSON-RPC
processing code is now the same on both server and client side. Many
minor issues were fixed in the process and there is a new test suite for
JSON-RPC spec compliance (and non-compliance in some cases).
List of behavior changes:
- Method handlers are now called with a per-request context instead of a
per-connection context. The context is canceled right after the method
returns.
- Subscription error channels are always closed when the connection
ends. There is no need to also wait on the Notifier's Closed channel
to detect whether the subscription has ended.
- Client now omits "params" instead of sending "params": null when there
are no arguments to a call. The previous behavior was not compliant
with the spec. The server still accepts "params": null.
- Floating point numbers are allowed as "id". The spec doesn't allow
them, but we handle request "id" as json.RawMessage and guarantee that
the same number will be sent back.
- Logging is improved significantly. There is now a message at DEBUG
level for each RPC call served.
Diffstat (limited to 'rpc/subscription.go')
-rw-r--r-- | rpc/subscription.go | 338 |
1 files changed, 258 insertions, 80 deletions
diff --git a/rpc/subscription.go b/rpc/subscription.go index 6bbb6f75d..c1e869b8a 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -17,9 +17,19 @@ package rpc import ( + "bufio" + "container/list" "context" + crand "crypto/rand" + "encoding/binary" + "encoding/hex" + "encoding/json" "errors" + "math/rand" + "reflect" + "strings" "sync" + "time" ) var ( @@ -29,121 +39,289 @@ var ( ErrSubscriptionNotFound = errors.New("subscription not found") ) +var globalGen = randomIDGenerator() + // ID defines a pseudo random number that is used to identify RPC subscriptions. type ID string -// a Subscription is created by a notifier and tight to that notifier. The client can use -// this subscription to wait for an unsubscribe request for the client, see Err(). -type Subscription struct { - ID ID - namespace string - err chan error // closed on unsubscribe +// NewID returns a new, random ID. +func NewID() ID { + return globalGen() } -// Err returns a channel that is closed when the client send an unsubscribe request. -func (s *Subscription) Err() <-chan error { - return s.err +// randomIDGenerator returns a function generates a random IDs. +func randomIDGenerator() func() ID { + seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)) + if err != nil { + seed = int64(time.Now().Nanosecond()) + } + var ( + mu sync.Mutex + rng = rand.New(rand.NewSource(seed)) + ) + return func() ID { + mu.Lock() + defer mu.Unlock() + id := make([]byte, 16) + rng.Read(id) + return encodeID(id) + } } -// notifierKey is used to store a notifier within the connection context. -type notifierKey struct{} - -// Notifier is tight to a RPC connection that supports subscriptions. -// Server callbacks use the notifier to send notifications. -type Notifier struct { - codec ServerCodec - subMu sync.Mutex - active map[ID]*Subscription - inactive map[ID]*Subscription - buffer map[ID][]interface{} // unsent notifications of inactive subscriptions -} - -// newNotifier creates a new notifier that can be used to send subscription -// notifications to the client. -func newNotifier(codec ServerCodec) *Notifier { - return &Notifier{ - codec: codec, - active: make(map[ID]*Subscription), - inactive: make(map[ID]*Subscription), - buffer: make(map[ID][]interface{}), +func encodeID(b []byte) ID { + id := hex.EncodeToString(b) + id = strings.TrimLeft(id, "0") + if id == "" { + id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0. } + return ID("0x" + id) } +type notifierKey struct{} + // NotifierFromContext returns the Notifier value stored in ctx, if any. func NotifierFromContext(ctx context.Context) (*Notifier, bool) { n, ok := ctx.Value(notifierKey{}).(*Notifier) return n, ok } +// Notifier is tied to a RPC connection that supports subscriptions. +// Server callbacks use the notifier to send notifications. +type Notifier struct { + h *handler + namespace string + + mu sync.Mutex + sub *Subscription + buffer []json.RawMessage + callReturned bool + activated bool +} + // CreateSubscription returns a new subscription that is coupled to the // RPC connection. By default subscriptions are inactive and notifications // are dropped until the subscription is marked as active. This is done // by the RPC server after the subscription ID is send to the client. func (n *Notifier) CreateSubscription() *Subscription { - s := &Subscription{ID: NewID(), err: make(chan error)} - n.subMu.Lock() - n.inactive[s.ID] = s - n.subMu.Unlock() - return s + n.mu.Lock() + defer n.mu.Unlock() + + if n.sub != nil { + panic("can't create multiple subscriptions with Notifier") + } else if n.callReturned { + panic("can't create subscription after subscribe call has returned") + } + n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)} + return n.sub } // Notify sends a notification to the client with the given data as payload. // If an error occurs the RPC connection is closed and the error is returned. func (n *Notifier) Notify(id ID, data interface{}) error { - n.subMu.Lock() - defer n.subMu.Unlock() + enc, err := json.Marshal(data) + if err != nil { + return err + } - if sub, active := n.active[id]; active { - n.send(sub, data) - } else { - n.buffer[id] = append(n.buffer[id], data) + n.mu.Lock() + defer n.mu.Unlock() + + if n.sub == nil { + panic("can't Notify before subscription is created") + } else if n.sub.ID != id { + panic("Notify with wrong ID") + } + if n.activated { + return n.send(n.sub, enc) } + n.buffer = append(n.buffer, enc) return nil } -func (n *Notifier) send(sub *Subscription, data interface{}) error { - notification := n.codec.CreateNotification(string(sub.ID), sub.namespace, data) - err := n.codec.Write(notification) - if err != nil { - n.codec.Close() +// Closed returns a channel that is closed when the RPC connection is closed. +// Deprecated: use subscription error channel +func (n *Notifier) Closed() <-chan interface{} { + return n.h.conn.Closed() +} + +// takeSubscription returns the subscription (if one has been created). No subscription can +// be created after this call. +func (n *Notifier) takeSubscription() *Subscription { + n.mu.Lock() + defer n.mu.Unlock() + n.callReturned = true + return n.sub +} + +// acticate is called after the subscription ID was sent to client. Notifications are +// buffered before activation. This prevents notifications being sent to the client before +// the subscription ID is sent to the client. +func (n *Notifier) activate() error { + n.mu.Lock() + defer n.mu.Unlock() + + for _, data := range n.buffer { + if err := n.send(n.sub, data); err != nil { + return err + } } - return err + n.activated = true + return nil } -// Closed returns a channel that is closed when the RPC connection is closed. -func (n *Notifier) Closed() <-chan interface{} { - return n.codec.Closed() -} - -// unsubscribe a subscription. -// If the subscription could not be found ErrSubscriptionNotFound is returned. -func (n *Notifier) unsubscribe(id ID) error { - n.subMu.Lock() - defer n.subMu.Unlock() - if s, found := n.active[id]; found { - close(s.err) - delete(n.active, id) - return nil +func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { + params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) + ctx := context.Background() + return n.h.conn.Write(ctx, &jsonrpcMessage{ + Version: vsn, + Method: n.namespace + notificationMethodSuffix, + Params: params, + }) +} + +// A Subscription is created by a notifier and tight to that notifier. The client can use +// this subscription to wait for an unsubscribe request for the client, see Err(). +type Subscription struct { + ID ID + namespace string + err chan error // closed on unsubscribe +} + +// Err returns a channel that is closed when the client send an unsubscribe request. +func (s *Subscription) Err() <-chan error { + return s.err +} + +// MarshalJSON marshals a subscription as its ID. +func (s *Subscription) MarshalJSON() ([]byte, error) { + return json.Marshal(s.ID) +} + +// ClientSubscription is a subscription established through the Client's Subscribe or +// EthSubscribe methods. +type ClientSubscription struct { + client *Client + etype reflect.Type + channel reflect.Value + namespace string + subid string + in chan json.RawMessage + + quitOnce sync.Once // ensures quit is closed once + quit chan struct{} // quit is closed when the subscription exits + errOnce sync.Once // ensures err is closed once + err chan error +} + +func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { + sub := &ClientSubscription{ + client: c, + namespace: namespace, + etype: channel.Type().Elem(), + channel: channel, + quit: make(chan struct{}), + err: make(chan error, 1), + in: make(chan json.RawMessage), } - return ErrSubscriptionNotFound -} - -// activate enables a subscription. Until a subscription is enabled all -// notifications are dropped. This method is called by the RPC server after -// the subscription ID was sent to client. This prevents notifications being -// send to the client before the subscription ID is send to the client. -func (n *Notifier) activate(id ID, namespace string) { - n.subMu.Lock() - defer n.subMu.Unlock() - - if sub, found := n.inactive[id]; found { - sub.namespace = namespace - n.active[id] = sub - delete(n.inactive, id) - // Send buffered notifications. - for _, data := range n.buffer[id] { - n.send(sub, data) + return sub +} + +// Err returns the subscription error channel. The intended use of Err is to schedule +// resubscription when the client connection is closed unexpectedly. +// +// The error channel receives a value when the subscription has ended due +// to an error. The received error is nil if Close has been called +// on the underlying client and no other error has occurred. +// +// The error channel is closed when Unsubscribe is called on the subscription. +func (sub *ClientSubscription) Err() <-chan error { + return sub.err +} + +// Unsubscribe unsubscribes the notification and closes the error channel. +// It can safely be called more than once. +func (sub *ClientSubscription) Unsubscribe() { + sub.quitWithError(nil, true) + sub.errOnce.Do(func() { close(sub.err) }) +} + +func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) { + sub.quitOnce.Do(func() { + // The dispatch loop won't be able to execute the unsubscribe call + // if it is blocked on deliver. Close sub.quit first because it + // unblocks deliver. + close(sub.quit) + if unsubscribeServer { + sub.requestUnsubscribe() + } + if err != nil { + if err == ErrClientQuit { + err = nil // Adhere to subscription semantics. + } + sub.err <- err } - delete(n.buffer, id) + }) +} + +func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { + select { + case sub.in <- result: + return true + case <-sub.quit: + return false } } + +func (sub *ClientSubscription) start() { + sub.quitWithError(sub.forward()) +} + +func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { + cases := []reflect.SelectCase{ + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, + {Dir: reflect.SelectSend, Chan: sub.channel}, + } + buffer := list.New() + defer buffer.Init() + for { + var chosen int + var recv reflect.Value + if buffer.Len() == 0 { + // Idle, omit send case. + chosen, recv, _ = reflect.Select(cases[:2]) + } else { + // Non-empty buffer, send the first queued item. + cases[2].Send = reflect.ValueOf(buffer.Front().Value) + chosen, recv, _ = reflect.Select(cases) + } + + switch chosen { + case 0: // <-sub.quit + return nil, false + case 1: // <-sub.in + val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) + if err != nil { + return err, true + } + if buffer.Len() == maxClientSubscriptionBuffer { + return ErrSubscriptionQueueOverflow, true + } + buffer.PushBack(val) + case 2: // sub.channel<- + cases[2].Send = reflect.Value{} // Don't hold onto the value. + buffer.Remove(buffer.Front()) + } + } +} + +func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { + val := reflect.New(sub.etype) + err := json.Unmarshal(result, val.Interface()) + return val.Elem().Interface(), err +} + +func (sub *ClientSubscription) requestUnsubscribe() error { + var result interface{} + return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid) +} |