From 16ddd1edfccdd7768447bfff9afec1f4a1ce014e Mon Sep 17 00:00:00 2001 From: Brandon Millman Date: Wed, 16 May 2018 11:15:02 -0700 Subject: Implement web browser socket --- packages/connect/src/node_ws_orderbook_channel.ts | 158 ++++++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 packages/connect/src/node_ws_orderbook_channel.ts (limited to 'packages/connect/src/node_ws_orderbook_channel.ts') diff --git a/packages/connect/src/node_ws_orderbook_channel.ts b/packages/connect/src/node_ws_orderbook_channel.ts new file mode 100644 index 000000000..5f61ac4c8 --- /dev/null +++ b/packages/connect/src/node_ws_orderbook_channel.ts @@ -0,0 +1,158 @@ +import * as _ from 'lodash'; +import * as WebSocket from 'websocket'; + +import { schemas as clientSchemas } from './schemas/schemas'; +import { + NodeWebSocketOrderbookChannelConfig, + OrderbookChannel, + OrderbookChannelHandler, + OrderbookChannelMessageTypes, + OrderbookChannelSubscriptionOpts, + WebsocketClientEventType, + WebsocketConnectionEventType, +} from './types'; +import { assert } from './utils/assert'; +import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser'; + +const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000; +const MINIMUM_HEARTBEAT_INTERVAL_MS = 10; + +/** + * This class includes all the functionality related to interacting with a websocket endpoint + * that implements the standard relayer API v0 in a node environment + */ +export class NodeWebSocketOrderbookChannel implements OrderbookChannel { + private _apiEndpointUrl: string; + private _client: WebSocket.client; + private _connectionIfExists?: WebSocket.connection; + private _heartbeatTimerIfExists?: NodeJS.Timer; + private _subscriptionCounter = 0; + private _heartbeatIntervalMs: number; + /** + * Instantiates a new NodeWebSocketOrderbookChannelConfig instance + * @param url The relayer API base WS url you would like to interact with + * @param config The configuration object. Look up the type for the description. + * @return An instance of NodeWebSocketOrderbookChannelConfig + */ + constructor(url: string, config?: NodeWebSocketOrderbookChannelConfig) { + assert.isUri('url', url); + if (!_.isUndefined(config)) { + assert.doesConformToSchema('config', config, clientSchemas.nodeWebSocketOrderbookChannelConfigSchema); + } + this._apiEndpointUrl = url; + this._heartbeatIntervalMs = + _.isUndefined(config) || _.isUndefined(config.heartbeatIntervalMs) + ? DEFAULT_HEARTBEAT_INTERVAL_MS + : config.heartbeatIntervalMs; + this._client = new WebSocket.client(); + } + /** + * Subscribe to orderbook snapshots and updates from the websocket + * @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which + * token pair to subscribe to + * @param handler An OrderbookChannelHandler instance that responds to various + * channel updates + */ + public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void { + assert.isOrderbookChannelSubscriptionOpts('subscriptionOpts', subscriptionOpts); + assert.isOrderbookChannelHandler('handler', handler); + this._subscriptionCounter += 1; + const subscribeMessage = { + type: 'subscribe', + channel: 'orderbook', + requestId: this._subscriptionCounter, + payload: subscriptionOpts, + }; + this._getConnection((error, connection) => { + if (!_.isUndefined(error)) { + handler.onError(this, subscriptionOpts, error); + } else if (!_.isUndefined(connection) && connection.connected) { + connection.on(WebsocketConnectionEventType.Error, wsError => { + handler.onError(this, subscriptionOpts, wsError); + }); + connection.on(WebsocketConnectionEventType.Close, (_code: number, _desc: string) => { + handler.onClose(this, subscriptionOpts); + }); + connection.on(WebsocketConnectionEventType.Message, message => { + this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler); + }); + connection.sendUTF(JSON.stringify(subscribeMessage)); + } + }); + } + /** + * Close the websocket and stop receiving updates + */ + public close(): void { + if (!_.isUndefined(this._connectionIfExists)) { + this._connectionIfExists.close(); + } + if (!_.isUndefined(this._heartbeatTimerIfExists)) { + clearInterval(this._heartbeatTimerIfExists); + } + } + private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void): void { + if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) { + callback(undefined, this._connectionIfExists); + } else { + this._client.on(WebsocketClientEventType.Connect, connection => { + this._connectionIfExists = connection; + if (this._heartbeatIntervalMs >= MINIMUM_HEARTBEAT_INTERVAL_MS) { + this._heartbeatTimerIfExists = setInterval(() => { + connection.ping(''); + }, this._heartbeatIntervalMs); + } else { + callback( + new Error( + `Heartbeat interval is ${ + this._heartbeatIntervalMs + }ms which is less than the required minimum of ${MINIMUM_HEARTBEAT_INTERVAL_MS}ms`, + ), + undefined, + ); + } + callback(undefined, this._connectionIfExists); + }); + this._client.on(WebsocketClientEventType.ConnectFailed, error => { + callback(error, undefined); + }); + this._client.connect(this._apiEndpointUrl); + } + } + private _handleWebSocketMessage( + requestId: number, + subscriptionOpts: OrderbookChannelSubscriptionOpts, + message: WebSocket.IMessage, + handler: OrderbookChannelHandler, + ): void { + if (!_.isUndefined(message.utf8Data)) { + try { + const utf8Data = message.utf8Data; + const parserResult = orderbookChannelMessageParser.parse(utf8Data); + if (parserResult.requestId === requestId) { + switch (parserResult.type) { + case OrderbookChannelMessageTypes.Snapshot: { + handler.onSnapshot(this, subscriptionOpts, parserResult.payload); + break; + } + case OrderbookChannelMessageTypes.Update: { + handler.onUpdate(this, subscriptionOpts, parserResult.payload); + break; + } + default: { + handler.onError( + this, + subscriptionOpts, + new Error(`Message has missing a type parameter: ${utf8Data}`), + ); + } + } + } + } catch (error) { + handler.onError(this, subscriptionOpts, error); + } + } else { + handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`)); + } + } +} -- cgit v1.2.3