aboutsummaryrefslogtreecommitdiffstats
path: root/packages/connect/src/ws_orderbook_channel.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/connect/src/ws_orderbook_channel.ts')
-rw-r--r--packages/connect/src/ws_orderbook_channel.ts238
1 files changed, 119 insertions, 119 deletions
diff --git a/packages/connect/src/ws_orderbook_channel.ts b/packages/connect/src/ws_orderbook_channel.ts
index 822a022f4..bbb47c649 100644
--- a/packages/connect/src/ws_orderbook_channel.ts
+++ b/packages/connect/src/ws_orderbook_channel.ts
@@ -4,12 +4,12 @@ import * as _ from 'lodash';
import * as WebSocket from 'websocket';
import {
- OrderbookChannel,
- OrderbookChannelHandler,
- OrderbookChannelMessageTypes,
- OrderbookChannelSubscriptionOpts,
- WebsocketClientEventType,
- WebsocketConnectionEventType,
+ OrderbookChannel,
+ OrderbookChannelHandler,
+ OrderbookChannelMessageTypes,
+ OrderbookChannelSubscriptionOpts,
+ WebsocketClientEventType,
+ WebsocketConnectionEventType,
} from './types';
import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser';
@@ -18,117 +18,117 @@ import { orderbookChannelMessageParser } from './utils/orderbook_channel_message
* that implements the standard relayer API v0
*/
export class WebSocketOrderbookChannel implements OrderbookChannel {
- private _apiEndpointUrl: string;
- private _client: WebSocket.client;
- private _connectionIfExists?: WebSocket.connection;
- private _subscriptionCounter = 0;
- /**
- * Instantiates a new WebSocketOrderbookChannel instance
- * @param url The relayer API base WS url you would like to interact with
- * @return An instance of WebSocketOrderbookChannel
- */
- constructor(url: string) {
- assert.isUri('url', url);
- this._apiEndpointUrl = url;
- this._client = new WebSocket.client();
- }
- /**
- * Subscribe to orderbook snapshots and updates from the websocket
- * @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which
- * token pair to subscribe to
- * @param handler An OrderbookChannelHandler instance that responds to various
- * channel updates
- */
- public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
- assert.doesConformToSchema(
- 'subscriptionOpts',
- subscriptionOpts,
- schemas.relayerApiOrderbookChannelSubscribePayload,
- );
- assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot'));
- assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate'));
- assert.isFunction('handler.onError', _.get(handler, 'onError'));
- assert.isFunction('handler.onClose', _.get(handler, 'onClose'));
- this._subscriptionCounter += 1;
- const subscribeMessage = {
- type: 'subscribe',
- channel: 'orderbook',
- requestId: this._subscriptionCounter,
- payload: subscriptionOpts,
- };
- this._getConnection((error, connection) => {
- if (!_.isUndefined(error)) {
- handler.onError(this, subscriptionOpts, error);
- } else if (!_.isUndefined(connection) && connection.connected) {
- connection.on(WebsocketConnectionEventType.Error, wsError => {
- handler.onError(this, subscriptionOpts, wsError);
- });
- connection.on(WebsocketConnectionEventType.Close, () => {
- handler.onClose(this, subscriptionOpts);
- });
- connection.on(WebsocketConnectionEventType.Message, message => {
- this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler);
- });
- connection.sendUTF(JSON.stringify(subscribeMessage));
- }
- });
- }
- /**
- * Close the websocket and stop receiving updates
- */
- public close() {
- if (!_.isUndefined(this._connectionIfExists)) {
- this._connectionIfExists.close();
- }
- }
- private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void) {
- if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) {
- callback(undefined, this._connectionIfExists);
- } else {
- this._client.on(WebsocketClientEventType.Connect, connection => {
- this._connectionIfExists = connection;
- callback(undefined, this._connectionIfExists);
- });
- this._client.on(WebsocketClientEventType.ConnectFailed, error => {
- callback(error, undefined);
- });
- this._client.connect(this._apiEndpointUrl);
- }
- }
- private _handleWebSocketMessage(
- requestId: number,
- subscriptionOpts: OrderbookChannelSubscriptionOpts,
- message: WebSocket.IMessage,
- handler: OrderbookChannelHandler,
- ): void {
- if (!_.isUndefined(message.utf8Data)) {
- try {
- const utf8Data = message.utf8Data;
- const parserResult = orderbookChannelMessageParser.parse(utf8Data);
- if (parserResult.requestId === requestId) {
- switch (parserResult.type) {
- case OrderbookChannelMessageTypes.Snapshot: {
- handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
- break;
- }
- case OrderbookChannelMessageTypes.Update: {
- handler.onUpdate(this, subscriptionOpts, parserResult.payload);
- break;
- }
- default: {
- handler.onError(
- this,
- subscriptionOpts,
- new Error(`Message has missing a type parameter: ${utf8Data}`),
- );
- }
- }
- }
- } catch (error) {
- handler.onError(this, subscriptionOpts, error);
- }
- } else {
- handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`));
- }
- }
+ private _apiEndpointUrl: string;
+ private _client: WebSocket.client;
+ private _connectionIfExists?: WebSocket.connection;
+ private _subscriptionCounter = 0;
+ /**
+ * Instantiates a new WebSocketOrderbookChannel instance
+ * @param url The relayer API base WS url you would like to interact with
+ * @return An instance of WebSocketOrderbookChannel
+ */
+ constructor(url: string) {
+ assert.isUri('url', url);
+ this._apiEndpointUrl = url;
+ this._client = new WebSocket.client();
+ }
+ /**
+ * Subscribe to orderbook snapshots and updates from the websocket
+ * @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which
+ * token pair to subscribe to
+ * @param handler An OrderbookChannelHandler instance that responds to various
+ * channel updates
+ */
+ public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
+ assert.doesConformToSchema(
+ 'subscriptionOpts',
+ subscriptionOpts,
+ schemas.relayerApiOrderbookChannelSubscribePayload,
+ );
+ assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot'));
+ assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate'));
+ assert.isFunction('handler.onError', _.get(handler, 'onError'));
+ assert.isFunction('handler.onClose', _.get(handler, 'onClose'));
+ this._subscriptionCounter += 1;
+ const subscribeMessage = {
+ type: 'subscribe',
+ channel: 'orderbook',
+ requestId: this._subscriptionCounter,
+ payload: subscriptionOpts,
+ };
+ this._getConnection((error, connection) => {
+ if (!_.isUndefined(error)) {
+ handler.onError(this, subscriptionOpts, error);
+ } else if (!_.isUndefined(connection) && connection.connected) {
+ connection.on(WebsocketConnectionEventType.Error, wsError => {
+ handler.onError(this, subscriptionOpts, wsError);
+ });
+ connection.on(WebsocketConnectionEventType.Close, () => {
+ handler.onClose(this, subscriptionOpts);
+ });
+ connection.on(WebsocketConnectionEventType.Message, message => {
+ this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler);
+ });
+ connection.sendUTF(JSON.stringify(subscribeMessage));
+ }
+ });
+ }
+ /**
+ * Close the websocket and stop receiving updates
+ */
+ public close() {
+ if (!_.isUndefined(this._connectionIfExists)) {
+ this._connectionIfExists.close();
+ }
+ }
+ private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void) {
+ if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) {
+ callback(undefined, this._connectionIfExists);
+ } else {
+ this._client.on(WebsocketClientEventType.Connect, connection => {
+ this._connectionIfExists = connection;
+ callback(undefined, this._connectionIfExists);
+ });
+ this._client.on(WebsocketClientEventType.ConnectFailed, error => {
+ callback(error, undefined);
+ });
+ this._client.connect(this._apiEndpointUrl);
+ }
+ }
+ private _handleWebSocketMessage(
+ requestId: number,
+ subscriptionOpts: OrderbookChannelSubscriptionOpts,
+ message: WebSocket.IMessage,
+ handler: OrderbookChannelHandler,
+ ): void {
+ if (!_.isUndefined(message.utf8Data)) {
+ try {
+ const utf8Data = message.utf8Data;
+ const parserResult = orderbookChannelMessageParser.parse(utf8Data);
+ if (parserResult.requestId === requestId) {
+ switch (parserResult.type) {
+ case OrderbookChannelMessageTypes.Snapshot: {
+ handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
+ break;
+ }
+ case OrderbookChannelMessageTypes.Update: {
+ handler.onUpdate(this, subscriptionOpts, parserResult.payload);
+ break;
+ }
+ default: {
+ handler.onError(
+ this,
+ subscriptionOpts,
+ new Error(`Message has missing a type parameter: ${utf8Data}`),
+ );
+ }
+ }
+ }
+ } catch (error) {
+ handler.onError(this, subscriptionOpts, error);
+ }
+ } else {
+ handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`));
+ }
+ }
}