aboutsummaryrefslogblamecommitdiffstats
path: root/rpc/subscription.go
blob: c1e869b8a3a462f31b032e4857ab7bc3e56cdec4 (plain) (tree)


















                                                                                  

                        
                 



                           
                


                   
              
              








                                                                                                    

                                   
                                                                                

              


                                  

 
















                                                                     

 




                                                                                    
         
                            

 

                         





                                                                        












                                                                    




                                                                          









                                                                                                




                                                                             



                                      
 









                                                                    
         
                                        


                  

























                                                                                           
         

                          

 




















































                                                                                                    
         



































                                                                                     
                 








                                                                          

         





















































                                                                                         
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rpc

import (
    "bufio"
    "container/list"
    "context"
    crand "crypto/rand"
    "encoding/binary"
    "encoding/hex"
    "encoding/json"
    "errors"
    "math/rand"
    "reflect"
    "strings"
    "sync"
    "time"
)

var (
    // ErrNotificationsUnsupported is returned when the connection doesn't support notifications.
    ErrNotificationsUnsupported = errors.New("notifications not supported")
    // ErrSubscriptionNotFound is returned when the subscription for the given id is not found.
    ErrSubscriptionNotFound = errors.New("subscription not found")
)

// globalGen is the package-level ID generator that backs NewID. It is created
// once, when the package is initialized.
var globalGen = randomIDGenerator()

// ID defines a pseudo random number that is used to identify RPC subscriptions.
// IDs built by encodeID in this file are "0x"-prefixed hexadecimal strings
// without leading zero digits.
type ID string

// NewID returns a new, random ID drawn from the package-level generator
// (globalGen).
func NewID() ID {
    return globalGen()
}

// randomIDGenerator returns a function that generates random IDs. Every call
// to randomIDGenerator creates an independent generator with its own seed. The
// returned function may be called from multiple goroutines: access to the
// underlying math/rand generator is serialized with a mutex.
//
// The seed is a fixed-width 64-bit value read from crypto/rand. Decoding the
// seed as a varint is not suitable here: a varint ends at the first byte whose
// high bit is clear, so about half of all generators would receive a seed in
// [-64, 63] and emit identical ID sequences. If the system entropy source
// cannot be read, the current time in nanoseconds is used as the seed.
func randomIDGenerator() func() ID {
    // NOTE(review): a buffered reader is not needed for one fixed-size read.
    // The wrapper is kept because it is the only use of bufio in this file and
    // dropping it would leave an unused import; the minimum buffer size keeps
    // the amount of entropy drawn small. Remove wrapper and import together.
    entropy := bufio.NewReaderSize(crand.Reader, 16)

    var seed int64
    if err := binary.Read(entropy, binary.BigEndian, &seed); err != nil {
        seed = time.Now().UnixNano()
    }
    var (
        mu  sync.Mutex
        rng = rand.New(rand.NewSource(seed))
    )
    return func() ID {
        mu.Lock()
        defer mu.Unlock()
        id := make([]byte, 16)
        // (*rand.Rand).Read is documented to always return a nil error.
        rng.Read(id)
        return encodeID(id)
    }
}

// encodeID renders b as an ID: the bytes are hex-encoded, leading zero digits
// are stripped and the result is prefixed with "0x". IDs are RPC quantities, so
// a value with no non-zero digit (including empty input) becomes "0x0".
func encodeID(b []byte) ID {
    digits := strings.TrimLeft(hex.EncodeToString(b), "0")
    if len(digits) == 0 {
        return ID("0x0")
    }
    return ID("0x" + digits)
}

// notifierKey is the context key type under which NotifierFromContext looks up
// the *Notifier. Being an unexported type, it cannot collide with context keys
// defined by other packages.
type notifierKey struct{}

// NotifierFromContext looks up the Notifier stored in ctx under notifierKey.
// The boolean result reports whether a *Notifier was present; when it is false
// the returned pointer is nil.
func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
    if notifier, found := ctx.Value(notifierKey{}).(*Notifier); found {
        return notifier, true
    }
    return nil, false
}

// Notifier is tied to a RPC connection that supports subscriptions.
// Server callbacks use the notifier to send notifications.
//
// A Notifier manages at most one subscription: CreateSubscription panics when
// called a second time.
type Notifier struct {
    // h is the connection handler. In this file it supplies subscription IDs
    // (h.idgen) and the connection that notifications are written to (h.conn).
    h         *handler
    // namespace is copied into created subscriptions and prefixes the
    // notification method name.
    namespace string

    // mu is held by the methods in this file whenever they access the fields
    // below.
    mu           sync.Mutex
    // sub is the subscription created through CreateSubscription, or nil.
    sub          *Subscription
    // buffer holds encoded notifications queued by Notify before activation.
    buffer       []json.RawMessage
    // callReturned is set by takeSubscription; no subscription can be created
    // once it is true.
    callReturned bool
    // activated is set by activate; from then on Notify sends directly.
    activated    bool
}

// CreateSubscription returns a new subscription that is coupled to the
// RPC connection. Until the notifier is activated, notifications for the
// subscription are buffered rather than sent (see Notify and activate).
//
// It panics if the notifier already has a subscription, or if it is called
// after takeSubscription has marked the subscribe call as returned.
func (n *Notifier) CreateSubscription() *Subscription {
    n.mu.Lock()
    defer n.mu.Unlock()

    switch {
    case n.sub != nil:
        panic("can't create multiple subscriptions with Notifier")
    case n.callReturned:
        panic("can't create subscription after subscribe call has returned")
    }

    created := &Subscription{
        ID:        n.h.idgen(),
        namespace: n.namespace,
        err:       make(chan error, 1),
    }
    n.sub = created
    return created
}

// Notify sends a notification to the client with the given data as payload.
// The data is JSON-encoded first; an encoding error is returned without
// touching the notifier. Before activation the encoded payload is queued and
// nil is returned; afterwards it is written to the connection and the write
// error, if any, is returned.
//
// Notify panics if no subscription has been created yet or if id does not
// match the ID of the notifier's subscription.
func (n *Notifier) Notify(id ID, data interface{}) error {
    payload, err := json.Marshal(data)
    if err != nil {
        return err
    }

    n.mu.Lock()
    defer n.mu.Unlock()

    switch {
    case n.sub == nil:
        panic("can't Notify before subscription is created")
    case n.sub.ID != id:
        panic("Notify with wrong ID")
    }

    if !n.activated {
        // The subscription ID has not been confirmed to the client yet.
        n.buffer = append(n.buffer, payload)
        return nil
    }
    return n.send(n.sub, payload)
}

// Closed returns a channel that is closed when the RPC connection is closed.
// The channel is obtained from the underlying connection (h.conn.Closed).
//
// Deprecated: use subscription error channel
func (n *Notifier) Closed() <-chan interface{} {
    return n.h.conn.Closed()
}

// takeSubscription marks the subscribe call as returned and hands back the
// subscription created so far, which is nil if CreateSubscription was never
// called. After this call CreateSubscription panics.
func (n *Notifier) takeSubscription() *Subscription {
    n.mu.Lock()
    n.callReturned = true
    created := n.sub
    n.mu.Unlock()
    return created
}

// activate is called after the subscription ID was sent to client. Notifications are
// buffered before activation. This prevents notifications being sent to the client before
// the subscription ID is sent to the client.
//
// The buffered notifications are written in the order they were queued. If a
// write fails, the error is returned and the notifier stays inactive with its
// buffer untouched. On success the buffer is released and all later Notify
// calls send directly.
func (n *Notifier) activate() error {
    n.mu.Lock()
    defer n.mu.Unlock()

    for _, data := range n.buffer {
        if err := n.send(n.sub, data); err != nil {
            return err
        }
    }
    // Nothing reads the buffer once activated is set (Notify only appends while
    // inactive), so drop it instead of keeping the flushed payloads alive for
    // the lifetime of the notifier.
    n.buffer = nil
    n.activated = true
    return nil
}

// send writes one notification for sub to the connection. The payload is
// wrapped in a subscriptionResult carrying the subscription ID and sent as the
// params of a message whose method is the notifier's namespace followed by
// notificationMethodSuffix.
//
// It returns the error from encoding the params or from the connection write.
// The callers in this file (Notify and activate) hold n.mu while calling it.
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
    params, err := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
    if err != nil {
        // Do not emit a notification without params; report the failure.
        return err
    }
    ctx := context.Background()
    return n.h.conn.Write(ctx, &jsonrpcMessage{
        Version: vsn,
        Method:  n.namespace + notificationMethodSuffix,
        Params:  params,
    })
}

// A Subscription is created by a notifier and tied to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct {
    // ID identifies the subscription; Notify checks it against the ID it is
    // given, and it is the value produced by MarshalJSON.
    ID        ID
    // namespace is copied from the creating Notifier.
    namespace string
    err       chan error // closed on unsubscribe
}

// Err returns a channel that is closed when the client sends an unsubscribe request.
// The channel is receive-only for callers.
func (s *Subscription) Err() <-chan error {
    return s.err
}

// MarshalJSON marshals a subscription as its ID, i.e. the JSON encoding of a
// Subscription is the JSON string form of s.ID.
func (s *Subscription) MarshalJSON() ([]byte, error) {
    return json.Marshal(s.ID)
}

// ClientSubscription is a subscription established through the Client's Subscribe or
// EthSubscribe methods.
type ClientSubscription struct {
    // client is used to issue the unsubscribe call.
    client    *Client
    // etype is the element type of channel; notifications are decoded into
    // values of this type.
    etype     reflect.Type
    // channel is the channel that decoded notifications are sent on.
    channel   reflect.Value
    // namespace prefixes the unsubscribe method name.
    namespace string
    // subid is the subscription ID passed to the unsubscribe call.
    subid     string
    // in carries raw notifications from deliver to forward.
    in        chan json.RawMessage

    quitOnce sync.Once     // ensures quit is closed once
    quit     chan struct{} // quit is closed when the subscription exits
    errOnce  sync.Once     // ensures err is closed once
    // err is the channel returned by Err; it is created with a buffer of one.
    err      chan error
}

// newClientSubscription builds a ClientSubscription for client c that delivers
// to channel. The element type of channel is recorded as the decoding target.
// The quit and in channels are unbuffered; the err channel has room for one
// value. The subscription ID (subid) is left unset.
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
    elemType := channel.Type().Elem()
    return &ClientSubscription{
        client:    c,
        namespace: namespace,
        etype:     elemType,
        channel:   channel,
        quit:      make(chan struct{}),
        err:       make(chan error, 1),
        in:        make(chan json.RawMessage),
    }
}

// Err returns the subscription error channel. The intended use of Err is to schedule
// resubscription when the client connection is closed unexpectedly.
//
// The error channel receives a value when the subscription has ended due
// to an error. The received error is nil if Close has been called
// on the underlying client and no other error has occurred.
//
// The error channel is closed when Unsubscribe is called on the subscription.
//
// The channel is created with a buffer of one (see newClientSubscription).
func (sub *ClientSubscription) Err() <-chan error {
    return sub.err
}

// Unsubscribe ends the subscription, asking the server to unsubscribe if the
// subscription has not already been shut down, and then closes the error
// channel. Both steps are guarded by sync.Once, so calling Unsubscribe more
// than once is safe.
func (sub *ClientSubscription) Unsubscribe() {
    sub.quitWithError(nil, true)

    closeErr := func() { close(sub.err) }
    sub.errOnce.Do(closeErr)
}

// quitWithError shuts the subscription down exactly once; later calls are
// no-ops. It closes sub.quit, optionally sends an unsubscribe request to the
// server, and, when err is non-nil, publishes it on the error channel.
// ErrClientQuit is published as a nil error.
func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
    shutdown := func() {
        // sub.quit must be closed before the unsubscribe call: the dispatch
        // loop cannot execute that call while it is blocked in deliver, and
        // closing sub.quit is what unblocks deliver.
        close(sub.quit)
        if unsubscribeServer {
            sub.requestUnsubscribe()
        }
        if err == nil {
            return
        }
        if err == ErrClientQuit {
            // Adhere to subscription semantics: client shutdown reports nil.
            sub.err <- nil
            return
        }
        sub.err <- err
    }
    sub.quitOnce.Do(shutdown)
}

// deliver hands a raw notification to the forwarding loop. It blocks until the
// message has been accepted on sub.in or the subscription has quit, and reports
// whether the message was accepted.
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
    select {
    case <-sub.quit:
        ok = false
    case sub.in <- result:
        ok = true
    }
    return ok
}

// start runs the forwarding loop until it returns and then shuts the
// subscription down with the loop's results: the error to report and whether
// an unsubscribe request should be sent to the server. It blocks for as long as
// forward is running.
func (sub *ClientSubscription) start() {
    sub.quitWithError(sub.forward())
}

// forward is the subscription's forwarding loop. It receives raw notifications
// from sub.in, decodes them into values of the channel's element type, queues
// them in an in-memory list and sends them, oldest first, on the subscriber's
// channel.
//
// It returns when:
//   - sub.quit is closed: (nil, false);
//   - a notification cannot be decoded: (the decoding error, true);
//   - a notification arrives while the queue already holds
//     maxClientSubscriptionBuffer items: (ErrSubscriptionQueueOverflow, true).
//
// The boolean result tells the caller whether to unsubscribe on the server.
func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
    // The positions in this slice are the values that reflect.Select reports as
    // "chosen"; the switch below depends on this order.
    cases := []reflect.SelectCase{
        {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
        {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
        {Dir: reflect.SelectSend, Chan: sub.channel},
    }
    buffer := list.New()
    // Empty the queue on exit so queued values are no longer referenced by it.
    defer buffer.Init()
    for {
        var chosen int
        var recv reflect.Value
        if buffer.Len() == 0 {
            // Idle, omit send case.
            chosen, recv, _ = reflect.Select(cases[:2])
        } else {
            // Non-empty buffer, send the first queued item.
            cases[2].Send = reflect.ValueOf(buffer.Front().Value)
            chosen, recv, _ = reflect.Select(cases)
        }

        switch chosen {
        case 0: // <-sub.quit
            // The subscription was shut down elsewhere; nothing to report.
            return nil, false
        case 1: // <-sub.in
            val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
            if err != nil {
                return err, true
            }
            // The queue is full: the subscriber is not draining its channel
            // fast enough, so give up instead of growing without bound.
            if buffer.Len() == maxClientSubscriptionBuffer {
                return ErrSubscriptionQueueOverflow, true
            }
            buffer.PushBack(val)
        case 2: // sub.channel<-
            // The front item was sent to the subscriber; drop it from the queue.
            cases[2].Send = reflect.Value{} // Don't hold onto the value.
            buffer.Remove(buffer.Front())
        }
    }
}

// unmarshal decodes a raw notification into a freshly allocated value of the
// subscription's element type and returns that value (not a pointer to it).
// The value is returned together with the decoding error even when decoding
// fails.
func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
    target := reflect.New(sub.etype)
    if err := json.Unmarshal(result, target.Interface()); err != nil {
        return target.Elem().Interface(), err
    }
    return target.Elem().Interface(), nil
}

// requestUnsubscribe asks the server to cancel this subscription by calling
// the namespace's unsubscribe method with the subscription ID. The reply value
// is discarded; only the call's error is returned.
func (sub *ClientSubscription) requestUnsubscribe() error {
    var reply interface{}
    method := sub.namespace + unsubscribeMethodSuffix
    return sub.client.Call(&reply, method, sub.subid)
}