aboutsummaryrefslogtreecommitdiffstats
path: root/packages/connect/src/node_ws_orderbook_channel.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/connect/src/node_ws_orderbook_channel.ts')
-rw-r--r--packages/connect/src/node_ws_orderbook_channel.ts158
1 files changed, 158 insertions, 0 deletions
diff --git a/packages/connect/src/node_ws_orderbook_channel.ts b/packages/connect/src/node_ws_orderbook_channel.ts
new file mode 100644
index 000000000..5f61ac4c8
--- /dev/null
+++ b/packages/connect/src/node_ws_orderbook_channel.ts
@@ -0,0 +1,158 @@
+import * as _ from 'lodash';
+import * as WebSocket from 'websocket';
+
+import { schemas as clientSchemas } from './schemas/schemas';
+import {
+ NodeWebSocketOrderbookChannelConfig,
+ OrderbookChannel,
+ OrderbookChannelHandler,
+ OrderbookChannelMessageTypes,
+ OrderbookChannelSubscriptionOpts,
+ WebsocketClientEventType,
+ WebsocketConnectionEventType,
+} from './types';
+import { assert } from './utils/assert';
+import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser';
+
+const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000;
+const MINIMUM_HEARTBEAT_INTERVAL_MS = 10;
+
+/**
+ * This class includes all the functionality related to interacting with a websocket endpoint
+ * that implements the standard relayer API v0 in a node environment
+ */
+export class NodeWebSocketOrderbookChannel implements OrderbookChannel {
+ private _apiEndpointUrl: string;
+ private _client: WebSocket.client;
+ private _connectionIfExists?: WebSocket.connection;
+ private _heartbeatTimerIfExists?: NodeJS.Timer;
+ private _subscriptionCounter = 0;
+ private _heartbeatIntervalMs: number;
+ /**
+ * Instantiates a new NodeWebSocketOrderbookChannelConfig instance
+ * @param url The relayer API base WS url you would like to interact with
+ * @param config The configuration object. Look up the type for the description.
+ * @return An instance of NodeWebSocketOrderbookChannelConfig
+ */
+ constructor(url: string, config?: NodeWebSocketOrderbookChannelConfig) {
+ assert.isUri('url', url);
+ if (!_.isUndefined(config)) {
+ assert.doesConformToSchema('config', config, clientSchemas.nodeWebSocketOrderbookChannelConfigSchema);
+ }
+ this._apiEndpointUrl = url;
+ this._heartbeatIntervalMs =
+ _.isUndefined(config) || _.isUndefined(config.heartbeatIntervalMs)
+ ? DEFAULT_HEARTBEAT_INTERVAL_MS
+ : config.heartbeatIntervalMs;
+ this._client = new WebSocket.client();
+ }
+ /**
+ * Subscribe to orderbook snapshots and updates from the websocket
+ * @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which
+ * token pair to subscribe to
+ * @param handler An OrderbookChannelHandler instance that responds to various
+ * channel updates
+ */
+ public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
+ assert.isOrderbookChannelSubscriptionOpts('subscriptionOpts', subscriptionOpts);
+ assert.isOrderbookChannelHandler('handler', handler);
+ this._subscriptionCounter += 1;
+ const subscribeMessage = {
+ type: 'subscribe',
+ channel: 'orderbook',
+ requestId: this._subscriptionCounter,
+ payload: subscriptionOpts,
+ };
+ this._getConnection((error, connection) => {
+ if (!_.isUndefined(error)) {
+ handler.onError(this, subscriptionOpts, error);
+ } else if (!_.isUndefined(connection) && connection.connected) {
+ connection.on(WebsocketConnectionEventType.Error, wsError => {
+ handler.onError(this, subscriptionOpts, wsError);
+ });
+ connection.on(WebsocketConnectionEventType.Close, (_code: number, _desc: string) => {
+ handler.onClose(this, subscriptionOpts);
+ });
+ connection.on(WebsocketConnectionEventType.Message, message => {
+ this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler);
+ });
+ connection.sendUTF(JSON.stringify(subscribeMessage));
+ }
+ });
+ }
+ /**
+ * Close the websocket and stop receiving updates
+ */
+ public close(): void {
+ if (!_.isUndefined(this._connectionIfExists)) {
+ this._connectionIfExists.close();
+ }
+ if (!_.isUndefined(this._heartbeatTimerIfExists)) {
+ clearInterval(this._heartbeatTimerIfExists);
+ }
+ }
+ private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void): void {
+ if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) {
+ callback(undefined, this._connectionIfExists);
+ } else {
+ this._client.on(WebsocketClientEventType.Connect, connection => {
+ this._connectionIfExists = connection;
+ if (this._heartbeatIntervalMs >= MINIMUM_HEARTBEAT_INTERVAL_MS) {
+ this._heartbeatTimerIfExists = setInterval(() => {
+ connection.ping('');
+ }, this._heartbeatIntervalMs);
+ } else {
+ callback(
+ new Error(
+ `Heartbeat interval is ${
+ this._heartbeatIntervalMs
+ }ms which is less than the required minimum of ${MINIMUM_HEARTBEAT_INTERVAL_MS}ms`,
+ ),
+ undefined,
+ );
+ }
+ callback(undefined, this._connectionIfExists);
+ });
+ this._client.on(WebsocketClientEventType.ConnectFailed, error => {
+ callback(error, undefined);
+ });
+ this._client.connect(this._apiEndpointUrl);
+ }
+ }
+ private _handleWebSocketMessage(
+ requestId: number,
+ subscriptionOpts: OrderbookChannelSubscriptionOpts,
+ message: WebSocket.IMessage,
+ handler: OrderbookChannelHandler,
+ ): void {
+ if (!_.isUndefined(message.utf8Data)) {
+ try {
+ const utf8Data = message.utf8Data;
+ const parserResult = orderbookChannelMessageParser.parse(utf8Data);
+ if (parserResult.requestId === requestId) {
+ switch (parserResult.type) {
+ case OrderbookChannelMessageTypes.Snapshot: {
+ handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
+ break;
+ }
+ case OrderbookChannelMessageTypes.Update: {
+ handler.onUpdate(this, subscriptionOpts, parserResult.payload);
+ break;
+ }
+ default: {
+ handler.onError(
+ this,
+ subscriptionOpts,
+ new Error(`Message has missing a type parameter: ${utf8Data}`),
+ );
+ }
+ }
+ }
+ } catch (error) {
+ handler.onError(this, subscriptionOpts, error);
+ }
+ } else {
+ handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`));
+ }
+ }
+}