aboutsummaryrefslogtreecommitdiffstats
path: root/packages/connect/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/connect/src')
-rw-r--r--packages/connect/src/index.ts3
-rw-r--r--packages/connect/src/orderbook_channel_factory.ts32
-rw-r--r--packages/connect/src/schemas/schemas.ts2
-rw-r--r--packages/connect/src/schemas/websocket_orderbook_channel_config_schema.ts10
-rw-r--r--packages/connect/src/types.ts13
-rw-r--r--packages/connect/src/utils/assert.ts26
-rw-r--r--packages/connect/src/utils/orderbook_channel_message_parser.ts8
-rw-r--r--packages/connect/src/ws_orderbook_channel.ts185
8 files changed, 131 insertions, 148 deletions
diff --git a/packages/connect/src/index.ts b/packages/connect/src/index.ts
index ef5d8683e..7f5eb8ed3 100644
--- a/packages/connect/src/index.ts
+++ b/packages/connect/src/index.ts
@@ -1,5 +1,5 @@
export { HttpClient } from './http_client';
-export { WebSocketOrderbookChannel } from './ws_orderbook_channel';
+export { orderbookChannelFactory } from './orderbook_channel_factory';
export {
Client,
FeesRequest,
@@ -14,7 +14,6 @@ export {
TokenPairsItem,
TokenPairsRequestOpts,
TokenTradeInfo,
- WebSocketOrderbookChannelConfig,
} from './types';
export { Order, SignedOrder } from '@0xproject/types';
diff --git a/packages/connect/src/orderbook_channel_factory.ts b/packages/connect/src/orderbook_channel_factory.ts
new file mode 100644
index 000000000..5134af323
--- /dev/null
+++ b/packages/connect/src/orderbook_channel_factory.ts
@@ -0,0 +1,32 @@
+import * as WebSocket from 'websocket';
+
+import { OrderbookChannel, OrderbookChannelHandler } from './types';
+import { assert } from './utils/assert';
+import { WebSocketOrderbookChannel } from './ws_orderbook_channel';
+
+export const orderbookChannelFactory = {
+ /**
+ * Instantiates a new WebSocketOrderbookChannel instance
+ * @param url The relayer API base WS url you would like to interact with
+ * @param handler An OrderbookChannelHandler instance that responds to various
+ * channel updates
+ * @return An OrderbookChannel Promise
+ */
+ async createWebSocketOrderbookChannelAsync(
+ url: string,
+ handler: OrderbookChannelHandler,
+ ): Promise<OrderbookChannel> {
+ assert.isUri('url', url);
+ assert.isOrderbookChannelHandler('handler', handler);
+ return new Promise<OrderbookChannel>((resolve, reject) => {
+ const client = new WebSocket.w3cwebsocket(url);
+ client.onopen = () => {
+ const orderbookChannel = new WebSocketOrderbookChannel(client, handler);
+ resolve(orderbookChannel);
+ };
+ client.onerror = err => {
+ reject(err);
+ };
+ });
+ },
+};
diff --git a/packages/connect/src/schemas/schemas.ts b/packages/connect/src/schemas/schemas.ts
index b9a8472fb..0b8b798a9 100644
--- a/packages/connect/src/schemas/schemas.ts
+++ b/packages/connect/src/schemas/schemas.ts
@@ -3,7 +3,6 @@ import { orderBookRequestSchema } from './orderbook_request_schema';
import { ordersRequestOptsSchema } from './orders_request_opts_schema';
import { pagedRequestOptsSchema } from './paged_request_opts_schema';
import { tokenPairsRequestOptsSchema } from './token_pairs_request_opts_schema';
-import { webSocketOrderbookChannelConfigSchema } from './websocket_orderbook_channel_config_schema';
export const schemas = {
feesRequestSchema,
@@ -11,5 +10,4 @@ export const schemas = {
ordersRequestOptsSchema,
pagedRequestOptsSchema,
tokenPairsRequestOptsSchema,
- webSocketOrderbookChannelConfigSchema,
};
diff --git a/packages/connect/src/schemas/websocket_orderbook_channel_config_schema.ts b/packages/connect/src/schemas/websocket_orderbook_channel_config_schema.ts
deleted file mode 100644
index 81c0cac9c..000000000
--- a/packages/connect/src/schemas/websocket_orderbook_channel_config_schema.ts
+++ /dev/null
@@ -1,10 +0,0 @@
-export const webSocketOrderbookChannelConfigSchema = {
- id: '/WebSocketOrderbookChannelConfig',
- type: 'object',
- properties: {
- heartbeatIntervalMs: {
- type: 'number',
- minimum: 10,
- },
- },
-};
diff --git a/packages/connect/src/types.ts b/packages/connect/src/types.ts
index f5e52f50d..fc7a4b24d 100644
--- a/packages/connect/src/types.ts
+++ b/packages/connect/src/types.ts
@@ -11,18 +11,11 @@ export interface Client {
}
export interface OrderbookChannel {
- subscribe: (subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler) => void;
+ subscribe: (subscriptionOpts: OrderbookChannelSubscriptionOpts) => void;
close: () => void;
}
/**
- * heartbeatInterval: Interval in milliseconds that the orderbook channel should ping the underlying websocket. Default: 15000
- */
-export interface WebSocketOrderbookChannelConfig {
- heartbeatIntervalMs?: number;
-}
-
-/**
* baseTokenAddress: The address of token designated as the baseToken in the currency pair calculation of price
* quoteTokenAddress: The address of token designated as the quoteToken in the currency pair calculation of price
* snapshot: If true, a snapshot of the orderbook will be sent before the updates to the orderbook
@@ -46,8 +39,8 @@ export interface OrderbookChannelHandler {
subscriptionOpts: OrderbookChannelSubscriptionOpts,
order: SignedOrder,
) => void;
- onError: (channel: OrderbookChannel, subscriptionOpts: OrderbookChannelSubscriptionOpts, err: Error) => void;
- onClose: (channel: OrderbookChannel, subscriptionOpts: OrderbookChannelSubscriptionOpts) => void;
+ onError: (channel: OrderbookChannel, err: Error, subscriptionOpts?: OrderbookChannelSubscriptionOpts) => void;
+ onClose: (channel: OrderbookChannel) => void;
}
export type OrderbookChannelMessage =
diff --git a/packages/connect/src/utils/assert.ts b/packages/connect/src/utils/assert.ts
new file mode 100644
index 000000000..a0fd12fbd
--- /dev/null
+++ b/packages/connect/src/utils/assert.ts
@@ -0,0 +1,26 @@
+import { assert as sharedAssert } from '@0xproject/assert';
+// HACK: We need those two unused imports because they're actually used by sharedAssert which gets injected here
+// tslint:disable-next-line:no-unused-variable
+import { Schema, schemas } from '@0xproject/json-schemas';
+// tslint:disable-next-line:no-unused-variable
+import { ECSignature } from '@0xproject/types';
+// tslint:disable-next-line:no-unused-variable
+import { BigNumber } from '@0xproject/utils';
+import * as _ from 'lodash';
+
+export const assert = {
+ ...sharedAssert,
+ isOrderbookChannelSubscriptionOpts(variableName: string, subscriptionOpts: any): void {
+ sharedAssert.doesConformToSchema(
+ variableName,
+ subscriptionOpts,
+ schemas.relayerApiOrderbookChannelSubscribePayload,
+ );
+ },
+ isOrderbookChannelHandler(variableName: string, handler: any): void {
+ sharedAssert.isFunction(`${variableName}.onSnapshot`, _.get(handler, 'onSnapshot'));
+ sharedAssert.isFunction(`${variableName}.onUpdate`, _.get(handler, 'onUpdate'));
+ sharedAssert.isFunction(`${variableName}.onError`, _.get(handler, 'onError'));
+ sharedAssert.isFunction(`${variableName}.onClose`, _.get(handler, 'onClose'));
+ },
+};
diff --git a/packages/connect/src/utils/orderbook_channel_message_parser.ts b/packages/connect/src/utils/orderbook_channel_message_parser.ts
index 9a9ca8901..593288078 100644
--- a/packages/connect/src/utils/orderbook_channel_message_parser.ts
+++ b/packages/connect/src/utils/orderbook_channel_message_parser.ts
@@ -8,10 +8,16 @@ import { relayerResponseJsonParsers } from './relayer_response_json_parsers';
export const orderbookChannelMessageParser = {
parse(utf8Data: string): OrderbookChannelMessage {
+ // parse the message
const messageObj = JSON.parse(utf8Data);
+ // ensure we have a type parameter to switch on
const type: string = _.get(messageObj, 'type');
assert.assert(!_.isUndefined(type), `Message is missing a type parameter: ${utf8Data}`);
assert.isString('type', type);
+ // ensure we have a request id for the resulting message
+ const requestId: number = _.get(messageObj, 'requestId');
+ assert.assert(!_.isUndefined(requestId), `Message is missing a requestId parameter: ${utf8Data}`);
+ assert.isNumber('requestId', requestId);
switch (type) {
case OrderbookChannelMessageTypes.Snapshot: {
assert.doesConformToSchema('message', messageObj, schemas.relayerApiOrderbookChannelSnapshotSchema);
@@ -28,7 +34,7 @@ export const orderbookChannelMessageParser = {
default: {
return {
type: OrderbookChannelMessageTypes.Unknown,
- requestId: 0,
+ requestId,
payload: undefined,
};
}
diff --git a/packages/connect/src/ws_orderbook_channel.ts b/packages/connect/src/ws_orderbook_channel.ts
index bdcc8a75d..e1c55cce3 100644
--- a/packages/connect/src/ws_orderbook_channel.ts
+++ b/packages/connect/src/ws_orderbook_channel.ts
@@ -1,166 +1,105 @@
-import { assert } from '@0xproject/assert';
-import { schemas } from '@0xproject/json-schemas';
import * as _ from 'lodash';
import * as WebSocket from 'websocket';
-import { schemas as clientSchemas } from './schemas/schemas';
import {
OrderbookChannel,
OrderbookChannelHandler,
OrderbookChannelMessageTypes,
OrderbookChannelSubscriptionOpts,
- WebsocketClientEventType,
- WebsocketConnectionEventType,
- WebSocketOrderbookChannelConfig,
} from './types';
+import { assert } from './utils/assert';
import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser';
-const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000;
-const MINIMUM_HEARTBEAT_INTERVAL_MS = 10;
-
/**
* This class includes all the functionality related to interacting with a websocket endpoint
* that implements the standard relayer API v0
*/
export class WebSocketOrderbookChannel implements OrderbookChannel {
- private _apiEndpointUrl: string;
- private _client: WebSocket.client;
- private _connectionIfExists?: WebSocket.connection;
- private _heartbeatTimerIfExists?: NodeJS.Timer;
- private _subscriptionCounter = 0;
- private _heartbeatIntervalMs: number;
+ private _client: WebSocket.w3cwebsocket;
+ private _handler: OrderbookChannelHandler;
+ private _subscriptionOptsList: OrderbookChannelSubscriptionOpts[] = [];
/**
* Instantiates a new WebSocketOrderbookChannel instance
- * @param url The relayer API base WS url you would like to interact with
- * @param config The configuration object. Look up the type for the description.
+ * @param client A WebSocket client
+ * @param handler An OrderbookChannelHandler instance that responds to various
+ * channel updates
* @return An instance of WebSocketOrderbookChannel
*/
- constructor(url: string, config?: WebSocketOrderbookChannelConfig) {
- assert.isUri('url', url);
- if (!_.isUndefined(config)) {
- assert.doesConformToSchema('config', config, clientSchemas.webSocketOrderbookChannelConfigSchema);
- }
- this._apiEndpointUrl = url;
- this._heartbeatIntervalMs =
- _.isUndefined(config) || _.isUndefined(config.heartbeatIntervalMs)
- ? DEFAULT_HEARTBEAT_INTERVAL_MS
- : config.heartbeatIntervalMs;
- this._client = new WebSocket.client();
+ constructor(client: WebSocket.w3cwebsocket, handler: OrderbookChannelHandler) {
+ assert.isOrderbookChannelHandler('handler', handler);
+ // set private members
+ this._client = client;
+ this._handler = handler;
+ // attach client callbacks
+ this._client.onerror = err => {
+ this._handler.onError(this, err);
+ };
+ this._client.onclose = () => {
+ this._handler.onClose(this);
+ };
+ this._client.onmessage = message => {
+ this._handleWebSocketMessage(message);
+ };
}
/**
* Subscribe to orderbook snapshots and updates from the websocket
* @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which
* token pair to subscribe to
- * @param handler An OrderbookChannelHandler instance that responds to various
- * channel updates
*/
- public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
- assert.doesConformToSchema(
- 'subscriptionOpts',
- subscriptionOpts,
- schemas.relayerApiOrderbookChannelSubscribePayload,
- );
- assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot'));
- assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate'));
- assert.isFunction('handler.onError', _.get(handler, 'onError'));
- assert.isFunction('handler.onClose', _.get(handler, 'onClose'));
- this._subscriptionCounter += 1;
+ public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts): void {
+ assert.isOrderbookChannelSubscriptionOpts('subscriptionOpts', subscriptionOpts);
+ assert.assert(this._client.readyState === WebSocket.w3cwebsocket.OPEN, 'WebSocket connection is closed');
+ this._subscriptionOptsList.push(subscriptionOpts);
+ // TODO: update requestId management to use UUIDs for v2
const subscribeMessage = {
type: 'subscribe',
channel: 'orderbook',
- requestId: this._subscriptionCounter,
+ requestId: this._subscriptionOptsList.length - 1,
payload: subscriptionOpts,
};
- this._getConnection((error, connection) => {
- if (!_.isUndefined(error)) {
- handler.onError(this, subscriptionOpts, error);
- } else if (!_.isUndefined(connection) && connection.connected) {
- connection.on(WebsocketConnectionEventType.Error, wsError => {
- handler.onError(this, subscriptionOpts, wsError);
- });
- connection.on(WebsocketConnectionEventType.Close, (_code: number, _desc: string) => {
- handler.onClose(this, subscriptionOpts);
- });
- connection.on(WebsocketConnectionEventType.Message, message => {
- this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler);
- });
- connection.sendUTF(JSON.stringify(subscribeMessage));
- }
- });
+ this._client.send(JSON.stringify(subscribeMessage));
}
/**
* Close the websocket and stop receiving updates
*/
public close(): void {
- if (!_.isUndefined(this._connectionIfExists)) {
- this._connectionIfExists.close();
- }
- if (!_.isUndefined(this._heartbeatTimerIfExists)) {
- clearInterval(this._heartbeatTimerIfExists);
- }
+ this._client.close();
}
- private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void): void {
- if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) {
- callback(undefined, this._connectionIfExists);
- } else {
- this._client.on(WebsocketClientEventType.Connect, connection => {
- this._connectionIfExists = connection;
- if (this._heartbeatIntervalMs >= MINIMUM_HEARTBEAT_INTERVAL_MS) {
- this._heartbeatTimerIfExists = setInterval(() => {
- connection.ping('');
- }, this._heartbeatIntervalMs);
- } else {
- callback(
- new Error(
- `Heartbeat interval is ${
- this._heartbeatIntervalMs
- }ms which is less than the required minimum of ${MINIMUM_HEARTBEAT_INTERVAL_MS}ms`,
- ),
- undefined,
- );
- }
- callback(undefined, this._connectionIfExists);
- });
- this._client.on(WebsocketClientEventType.ConnectFailed, error => {
- callback(error, undefined);
- });
- this._client.connect(this._apiEndpointUrl);
+ private _handleWebSocketMessage(message: any): void {
+ if (_.isUndefined(message.data)) {
+ this._handler.onError(this, new Error(`Message does not contain data. Url: ${this._client.url}`));
+ return;
}
- }
- private _handleWebSocketMessage(
- requestId: number,
- subscriptionOpts: OrderbookChannelSubscriptionOpts,
- message: WebSocket.IMessage,
- handler: OrderbookChannelHandler,
- ): void {
- if (!_.isUndefined(message.utf8Data)) {
- try {
- const utf8Data = message.utf8Data;
- const parserResult = orderbookChannelMessageParser.parse(utf8Data);
- if (parserResult.requestId === requestId) {
- switch (parserResult.type) {
- case OrderbookChannelMessageTypes.Snapshot: {
- handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
- break;
- }
- case OrderbookChannelMessageTypes.Update: {
- handler.onUpdate(this, subscriptionOpts, parserResult.payload);
- break;
- }
- default: {
- handler.onError(
- this,
- subscriptionOpts,
- new Error(`Message has missing a type parameter: ${utf8Data}`),
- );
- }
- }
+ try {
+ const data = message.data;
+ const parserResult = orderbookChannelMessageParser.parse(data);
+ const subscriptionOpts = this._subscriptionOptsList[parserResult.requestId];
+ if (_.isUndefined(subscriptionOpts)) {
+ this._handler.onError(
+ this,
+ new Error(`Message has unknown requestId. Url: ${this._client.url} Message: ${data}`),
+ );
+ return;
+ }
+ switch (parserResult.type) {
+ case OrderbookChannelMessageTypes.Snapshot: {
+ this._handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
+ break;
+ }
+ case OrderbookChannelMessageTypes.Update: {
+ this._handler.onUpdate(this, subscriptionOpts, parserResult.payload);
+ break;
+ }
+ default: {
+ this._handler.onError(
+ this,
+ new Error(`Message has unknown type parameter. Url: ${this._client.url} Message: ${data}`),
+ subscriptionOpts,
+ );
}
- } catch (error) {
- handler.onError(this, subscriptionOpts, error);
}
- } else {
- handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`));
+ } catch (error) {
+ this._handler.onError(this, error);
}
}
}