aboutsummaryrefslogtreecommitdiffstats
path: root/packages/connect/src/ws_orderbook_channel.ts
blob: 4a9d4058f7f9ab6a71018c963d6bf17f99a6fb7a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import { assert } from '@0xproject/assert';
import { schemas } from '@0xproject/json-schemas';
import * as _ from 'lodash';
import * as WebSocket from 'websocket';

import { schemas as clientSchemas } from './schemas/schemas';
import {
    OrderbookChannel,
    OrderbookChannelHandler,
    OrderbookChannelMessageTypes,
    OrderbookChannelSubscriptionOpts,
    WebsocketClientEventType,
    WebsocketConnectionEventType,
    WebSocketOrderbookChannelConfig,
} from './types';
import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser';

// Heartbeat (ping) interval, in milliseconds, used when the channel config does not
// provide `heartbeatIntervalMs`.
const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000;
// Lower bound, in milliseconds, for the heartbeat interval; a configured value below
// this is reported as an error when a connection is requested.
const MINIMUM_HEARTBEAT_INTERVAL_MS = 10;

/**
 * This class includes all the functionality related to interacting with a websocket endpoint
 * that implements the standard relayer API v0.
 *
 * A single websocket connection is shared by every subscription made through one instance.
 * The connection is opened lazily on the first `subscribe` call and kept alive with periodic pings.
 */
export class WebSocketOrderbookChannel implements OrderbookChannel {
    private _apiEndpointUrl: string;
    private _client: WebSocket.client;
    private _connectionIfExists?: WebSocket.connection;
    private _heartbeatTimerIfExists?: NodeJS.Timer;
    private _subscriptionCounter = 0;
    private _heartbeatIntervalMs: number;
    // Callbacks waiting on the connection attempt currently in flight (empty when there is none).
    private _pendingConnectionCallbacks: Array<(error?: Error, connection?: WebSocket.connection) => void> = [];
    /**
     * Instantiates a new WebSocketOrderbookChannel instance
     * @param   url                 The relayer API base WS url you would like to interact with
     * @param   config              The configuration object. Look up the type for the description.
     * @return  An instance of WebSocketOrderbookChannel
     */
    constructor(url: string, config?: WebSocketOrderbookChannelConfig) {
        assert.isUri('url', url);
        if (!_.isUndefined(config)) {
            assert.doesConformToSchema('config', config, clientSchemas.webSocketOrderbookChannelConfigSchema);
        }
        this._apiEndpointUrl = url;
        this._heartbeatIntervalMs =
            _.isUndefined(config) || _.isUndefined(config.heartbeatIntervalMs)
                ? DEFAULT_HEARTBEAT_INTERVAL_MS
                : config.heartbeatIntervalMs;
        this._client = new WebSocket.client();
    }
    /**
     * Subscribe to orderbook snapshots and updates from the websocket
     * @param   subscriptionOpts     An OrderbookChannelSubscriptionOpts instance describing which
     *                               token pair to subscribe to
     * @param   handler              An OrderbookChannelHandler instance that responds to various
     *                               channel updates
     */
    public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
        assert.doesConformToSchema(
            'subscriptionOpts',
            subscriptionOpts,
            schemas.relayerApiOrderbookChannelSubscribePayload,
        );
        assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot'));
        assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate'));
        assert.isFunction('handler.onError', _.get(handler, 'onError'));
        assert.isFunction('handler.onClose', _.get(handler, 'onClose'));
        // Each subscription gets its own requestId so that incoming messages can be routed
        // to the handler that asked for them.
        this._subscriptionCounter += 1;
        const subscribeMessage = {
            type: 'subscribe',
            channel: 'orderbook',
            requestId: this._subscriptionCounter,
            payload: subscriptionOpts,
        };
        this._getConnection((error, connection) => {
            if (!_.isUndefined(error)) {
                handler.onError(this, subscriptionOpts, error);
            } else if (!_.isUndefined(connection) && connection.connected) {
                connection.on(WebsocketConnectionEventType.Error, wsError => {
                    handler.onError(this, subscriptionOpts, wsError);
                });
                connection.on(WebsocketConnectionEventType.Close, () => {
                    handler.onClose(this, subscriptionOpts);
                });
                connection.on(WebsocketConnectionEventType.Message, message => {
                    this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler);
                });
                connection.sendUTF(JSON.stringify(subscribeMessage));
            }
        });
    }
    /**
     * Close the websocket and stop receiving updates
     */
    public close(): void {
        if (!_.isUndefined(this._connectionIfExists)) {
            this._connectionIfExists.close();
        }
        this._stopHeartbeat();
    }
    /**
     * Provides the shared websocket connection, opening it first if necessary.
     * The callback is invoked exactly once: with the live connection on success, or with an
     * error if the heartbeat interval is invalid or the connection attempt fails.
     * Calls made while a connection attempt is already in flight are queued behind that attempt
     * instead of opening additional sockets.
     * @param   callback    Receives either an error or the connected websocket connection
     */
    private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void): void {
        if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) {
            callback(undefined, this._connectionIfExists);
            return;
        }
        // Reject an invalid heartbeat configuration up front, before any socket is opened,
        // and make sure the callback is not subsequently invoked a second time with success.
        if (this._heartbeatIntervalMs < MINIMUM_HEARTBEAT_INTERVAL_MS) {
            callback(
                new Error(
                    `Heartbeat interval is ${
                        this._heartbeatIntervalMs
                    }ms which is less than the required minimum of ${MINIMUM_HEARTBEAT_INTERVAL_MS}ms`,
                ),
                undefined,
            );
            return;
        }
        this._pendingConnectionCallbacks.push(callback);
        if (this._pendingConnectionCallbacks.length > 1) {
            // A connection attempt is already in flight; its outcome will be delivered to this callback too.
            return;
        }
        const settlePending = (error?: Error, connection?: WebSocket.connection) => {
            // Swap the queue out before notifying so that a callback which re-subscribes
            // starts a fresh attempt rather than mutating the list being iterated.
            const pendingCallbacks = this._pendingConnectionCallbacks;
            this._pendingConnectionCallbacks = [];
            _.each(pendingCallbacks, pendingCallback => pendingCallback(error, connection));
        };
        const onConnect = (connection: WebSocket.connection) => {
            this._client.removeListener(WebsocketClientEventType.ConnectFailed, onConnectFailed);
            this._connectionIfExists = connection;
            // Never leave a timer from a previous connection running.
            this._stopHeartbeat();
            this._heartbeatTimerIfExists = setInterval(() => {
                connection.ping('');
            }, this._heartbeatIntervalMs);
            connection.on(WebsocketConnectionEventType.Close, () => {
                // Only stop the heartbeat if it still belongs to this connection.
                if (this._connectionIfExists === connection) {
                    this._stopHeartbeat();
                }
            });
            settlePending(undefined, connection);
        };
        const onConnectFailed = (error: Error) => {
            this._client.removeListener(WebsocketClientEventType.Connect, onConnect);
            settlePending(error, undefined);
        };
        // `once` (plus removing the counterpart above) keeps listeners from accumulating on the
        // shared client across repeated connection attempts.
        this._client.once(WebsocketClientEventType.Connect, onConnect);
        this._client.once(WebsocketClientEventType.ConnectFailed, onConnectFailed);
        this._client.connect(this._apiEndpointUrl);
    }
    /**
     * Stops the heartbeat ping timer if one is running.
     */
    private _stopHeartbeat(): void {
        if (!_.isUndefined(this._heartbeatTimerIfExists)) {
            clearInterval(this._heartbeatTimerIfExists);
            this._heartbeatTimerIfExists = undefined;
        }
    }
    /**
     * Parses an incoming websocket message and dispatches it to the handler when it belongs to
     * the given subscription. Messages carrying a different requestId are ignored; parse failures,
     * unrecognised message types and messages without utf8 data are reported through `handler.onError`.
     * @param   requestId           The requestId that was sent with the subscribe message
     * @param   subscriptionOpts    The options the subscription was created with
     * @param   message             The raw websocket message
     * @param   handler             The handler registered for the subscription
     */
    private _handleWebSocketMessage(
        requestId: number,
        subscriptionOpts: OrderbookChannelSubscriptionOpts,
        message: WebSocket.IMessage,
        handler: OrderbookChannelHandler,
    ): void {
        if (!_.isUndefined(message.utf8Data)) {
            try {
                const utf8Data = message.utf8Data;
                const parserResult = orderbookChannelMessageParser.parse(utf8Data);
                // Every subscription listens on the same connection, so only react to our own messages.
                if (parserResult.requestId === requestId) {
                    switch (parserResult.type) {
                        case OrderbookChannelMessageTypes.Snapshot: {
                            handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
                            break;
                        }
                        case OrderbookChannelMessageTypes.Update: {
                            handler.onUpdate(this, subscriptionOpts, parserResult.payload);
                            break;
                        }
                        default: {
                            handler.onError(
                                this,
                                subscriptionOpts,
                                new Error(`Message has missing a type parameter: ${utf8Data}`),
                            );
                        }
                    }
                }
            } catch (error) {
                handler.onError(this, subscriptionOpts, error);
            }
        } else {
            handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`));
        }
    }
}