aboutsummaryrefslogtreecommitdiffstats
path: root/packages/connect/src/ws_orderbook_channel.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/connect/src/ws_orderbook_channel.ts')
-rw-r--r--packages/connect/src/ws_orderbook_channel.ts185
1 files changed, 62 insertions, 123 deletions
diff --git a/packages/connect/src/ws_orderbook_channel.ts b/packages/connect/src/ws_orderbook_channel.ts
index bdcc8a75d..fa9f5e37f 100644
--- a/packages/connect/src/ws_orderbook_channel.ts
+++ b/packages/connect/src/ws_orderbook_channel.ts
@@ -1,166 +1,105 @@
-import { assert } from '@0xproject/assert';
-import { schemas } from '@0xproject/json-schemas';
import * as _ from 'lodash';
import * as WebSocket from 'websocket';
-import { schemas as clientSchemas } from './schemas/schemas';
import {
OrderbookChannel,
OrderbookChannelHandler,
OrderbookChannelMessageTypes,
OrderbookChannelSubscriptionOpts,
- WebsocketClientEventType,
- WebsocketConnectionEventType,
- WebSocketOrderbookChannelConfig,
} from './types';
+import { assert } from './utils/assert';
import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser';
-const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000;
-const MINIMUM_HEARTBEAT_INTERVAL_MS = 10;
-
/**
* This class includes all the functionality related to interacting with a websocket endpoint
* that implements the standard relayer API v0
*/
export class WebSocketOrderbookChannel implements OrderbookChannel {
- private _apiEndpointUrl: string;
- private _client: WebSocket.client;
- private _connectionIfExists?: WebSocket.connection;
- private _heartbeatTimerIfExists?: NodeJS.Timer;
- private _subscriptionCounter = 0;
- private _heartbeatIntervalMs: number;
+ private readonly _client: WebSocket.w3cwebsocket;
+ private readonly _handler: OrderbookChannelHandler;
+ private readonly _subscriptionOptsList: OrderbookChannelSubscriptionOpts[] = [];
/**
* Instantiates a new WebSocketOrderbookChannel instance
- * @param url The relayer API base WS url you would like to interact with
- * @param config The configuration object. Look up the type for the description.
+ * @param client A WebSocket client
+ * @param handler An OrderbookChannelHandler instance that responds to various
+ * channel updates
* @return An instance of WebSocketOrderbookChannel
*/
- constructor(url: string, config?: WebSocketOrderbookChannelConfig) {
- assert.isUri('url', url);
- if (!_.isUndefined(config)) {
- assert.doesConformToSchema('config', config, clientSchemas.webSocketOrderbookChannelConfigSchema);
- }
- this._apiEndpointUrl = url;
- this._heartbeatIntervalMs =
- _.isUndefined(config) || _.isUndefined(config.heartbeatIntervalMs)
- ? DEFAULT_HEARTBEAT_INTERVAL_MS
- : config.heartbeatIntervalMs;
- this._client = new WebSocket.client();
+ constructor(client: WebSocket.w3cwebsocket, handler: OrderbookChannelHandler) {
+ assert.isOrderbookChannelHandler('handler', handler);
+ // set private members
+ this._client = client;
+ this._handler = handler;
+ // attach client callbacks
+ this._client.onerror = err => {
+ this._handler.onError(this, err);
+ };
+ this._client.onclose = () => {
+ this._handler.onClose(this);
+ };
+ this._client.onmessage = message => {
+ this._handleWebSocketMessage(message);
+ };
}
/**
* Subscribe to orderbook snapshots and updates from the websocket
* @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which
* token pair to subscribe to
- * @param handler An OrderbookChannelHandler instance that responds to various
- * channel updates
*/
- public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
- assert.doesConformToSchema(
- 'subscriptionOpts',
- subscriptionOpts,
- schemas.relayerApiOrderbookChannelSubscribePayload,
- );
- assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot'));
- assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate'));
- assert.isFunction('handler.onError', _.get(handler, 'onError'));
- assert.isFunction('handler.onClose', _.get(handler, 'onClose'));
- this._subscriptionCounter += 1;
+ public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts): void {
+ assert.isOrderbookChannelSubscriptionOpts('subscriptionOpts', subscriptionOpts);
+ assert.assert(this._client.readyState === WebSocket.w3cwebsocket.OPEN, 'WebSocket connection is closed');
+ this._subscriptionOptsList.push(subscriptionOpts);
+ // TODO: update requestId management to use UUIDs for v2
const subscribeMessage = {
type: 'subscribe',
channel: 'orderbook',
- requestId: this._subscriptionCounter,
+ requestId: this._subscriptionOptsList.length - 1,
payload: subscriptionOpts,
};
- this._getConnection((error, connection) => {
- if (!_.isUndefined(error)) {
- handler.onError(this, subscriptionOpts, error);
- } else if (!_.isUndefined(connection) && connection.connected) {
- connection.on(WebsocketConnectionEventType.Error, wsError => {
- handler.onError(this, subscriptionOpts, wsError);
- });
- connection.on(WebsocketConnectionEventType.Close, (_code: number, _desc: string) => {
- handler.onClose(this, subscriptionOpts);
- });
- connection.on(WebsocketConnectionEventType.Message, message => {
- this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler);
- });
- connection.sendUTF(JSON.stringify(subscribeMessage));
- }
- });
+ this._client.send(JSON.stringify(subscribeMessage));
}
/**
* Close the websocket and stop receiving updates
*/
public close(): void {
- if (!_.isUndefined(this._connectionIfExists)) {
- this._connectionIfExists.close();
- }
- if (!_.isUndefined(this._heartbeatTimerIfExists)) {
- clearInterval(this._heartbeatTimerIfExists);
- }
+ this._client.close();
}
- private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void): void {
- if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) {
- callback(undefined, this._connectionIfExists);
- } else {
- this._client.on(WebsocketClientEventType.Connect, connection => {
- this._connectionIfExists = connection;
- if (this._heartbeatIntervalMs >= MINIMUM_HEARTBEAT_INTERVAL_MS) {
- this._heartbeatTimerIfExists = setInterval(() => {
- connection.ping('');
- }, this._heartbeatIntervalMs);
- } else {
- callback(
- new Error(
- `Heartbeat interval is ${
- this._heartbeatIntervalMs
- }ms which is less than the required minimum of ${MINIMUM_HEARTBEAT_INTERVAL_MS}ms`,
- ),
- undefined,
- );
- }
- callback(undefined, this._connectionIfExists);
- });
- this._client.on(WebsocketClientEventType.ConnectFailed, error => {
- callback(error, undefined);
- });
- this._client.connect(this._apiEndpointUrl);
+ private _handleWebSocketMessage(message: any): void {
+ if (_.isUndefined(message.data)) {
+ this._handler.onError(this, new Error(`Message does not contain data. Url: ${this._client.url}`));
+ return;
}
- }
- private _handleWebSocketMessage(
- requestId: number,
- subscriptionOpts: OrderbookChannelSubscriptionOpts,
- message: WebSocket.IMessage,
- handler: OrderbookChannelHandler,
- ): void {
- if (!_.isUndefined(message.utf8Data)) {
- try {
- const utf8Data = message.utf8Data;
- const parserResult = orderbookChannelMessageParser.parse(utf8Data);
- if (parserResult.requestId === requestId) {
- switch (parserResult.type) {
- case OrderbookChannelMessageTypes.Snapshot: {
- handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
- break;
- }
- case OrderbookChannelMessageTypes.Update: {
- handler.onUpdate(this, subscriptionOpts, parserResult.payload);
- break;
- }
- default: {
- handler.onError(
- this,
- subscriptionOpts,
- new Error(`Message has missing a type parameter: ${utf8Data}`),
- );
- }
- }
+ try {
+ const data = message.data;
+ const parserResult = orderbookChannelMessageParser.parse(data);
+ const subscriptionOpts = this._subscriptionOptsList[parserResult.requestId];
+ if (_.isUndefined(subscriptionOpts)) {
+ this._handler.onError(
+ this,
+ new Error(`Message has unknown requestId. Url: ${this._client.url} Message: ${data}`),
+ );
+ return;
+ }
+ switch (parserResult.type) {
+ case OrderbookChannelMessageTypes.Snapshot: {
+ this._handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
+ break;
+ }
+ case OrderbookChannelMessageTypes.Update: {
+ this._handler.onUpdate(this, subscriptionOpts, parserResult.payload);
+ break;
+ }
+ default: {
+ this._handler.onError(
+ this,
+ new Error(`Message has unknown type parameter. Url: ${this._client.url} Message: ${data}`),
+ subscriptionOpts,
+ );
}
- } catch (error) {
- handler.onError(this, subscriptionOpts, error);
}
- } else {
- handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`));
+ } catch (error) {
+ this._handler.onError(this, error);
}
}
}