aboutsummaryrefslogtreecommitdiffstats
path: root/packages/connect/src/ws_orderbook_channel.ts
diff options
context:
space:
mode:
authorBrandon Millman <brandon.millman@gmail.com>2017-11-15 07:26:36 +0800
committerBrandon Millman <brandon.millman@gmail.com>2017-11-15 10:55:29 +0800
commit655b0636facc110e9192cc7c3190f4b16f212be9 (patch)
tree562109951a69321e6d8857798d4d5310248b71d7 /packages/connect/src/ws_orderbook_channel.ts
parent5bd8e172c9415e9d8eca2d3893fe767684018351 (diff)
downloaddexon-sol-tools-655b0636facc110e9192cc7c3190f4b16f212be9.tar
dexon-sol-tools-655b0636facc110e9192cc7c3190f4b16f212be9.tar.gz
dexon-sol-tools-655b0636facc110e9192cc7c3190f4b16f212be9.tar.bz2
dexon-sol-tools-655b0636facc110e9192cc7c3190f4b16f212be9.tar.lz
dexon-sol-tools-655b0636facc110e9192cc7c3190f4b16f212be9.tar.xz
dexon-sol-tools-655b0636facc110e9192cc7c3190f4b16f212be9.tar.zst
dexon-sol-tools-655b0636facc110e9192cc7c3190f4b16f212be9.zip
Add connect to monorepo
Diffstat (limited to 'packages/connect/src/ws_orderbook_channel.ts')
-rw-r--r--packages/connect/src/ws_orderbook_channel.ts127
1 files changed, 127 insertions, 0 deletions
diff --git a/packages/connect/src/ws_orderbook_channel.ts b/packages/connect/src/ws_orderbook_channel.ts
new file mode 100644
index 000000000..78b823dbe
--- /dev/null
+++ b/packages/connect/src/ws_orderbook_channel.ts
@@ -0,0 +1,127 @@
+import * as _ from 'lodash';
+import * as WebSocket from 'websocket';
+import {assert} from '@0xproject/assert';
+import {schemas} from '@0xproject/json-schemas';
+import {SignedOrder} from '0x.js';
+import {
+ OrderbookChannel,
+ OrderbookChannelHandler,
+ OrderbookChannelMessageTypes,
+ OrderbookChannelSubscriptionOpts,
+} from './types';
+import {orderbookChannelMessageParsers} from './utils/orderbook_channel_message_parsers';
+
+enum ConnectionEventType {
+ Close = 'close',
+ Error = 'error',
+ Message = 'message',
+}
+
+enum ClientEventType {
+ Connect = 'connect',
+ ConnectFailed = 'connectFailed',
+}
+
+/**
+ * This class includes all the functionality related to interacting with a websocket endpoint
+ * that implements the standard relayer API v0
+ */
+export class WebSocketOrderbookChannel implements OrderbookChannel {
+ private apiEndpointUrl: string;
+ private client: WebSocket.client;
+ private connectionIfExists?: WebSocket.connection;
+ /**
+ * Instantiates a new WebSocketOrderbookChannel instance
+ * @param url The base url for making API calls
+ * @return An instance of WebSocketOrderbookChannel
+ */
+ constructor(url: string) {
+ assert.isUri('url', url);
+ this.apiEndpointUrl = url;
+ this.client = new WebSocket.client();
+ }
+ /**
+ * Subscribe to orderbook snapshots and updates from the websocket
+ * @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which
+ * token pair to subscribe to
+ * @param handler An OrderbookChannelHandler instance that responds to various
+ * channel updates
+ */
+ public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
+ assert.doesConformToSchema(
+ 'subscriptionOpts', subscriptionOpts, schemas.relayerApiOrderbookChannelSubscribePayload);
+ assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot'));
+ assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate'));
+ assert.isFunction('handler.onError', _.get(handler, 'onError'));
+ assert.isFunction('handler.onClose', _.get(handler, 'onClose'));
+ const subscribeMessage = {
+ type: 'subscribe',
+ channel: 'orderbook',
+ payload: subscriptionOpts,
+ };
+ this._getConnection((error, connection) => {
+ if (!_.isUndefined(error)) {
+ handler.onError(this, error);
+ } else if (!_.isUndefined(connection) && connection.connected) {
+ connection.on(ConnectionEventType.Error, wsError => {
+ handler.onError(this, wsError);
+ });
+ connection.on(ConnectionEventType.Close, () => {
+ handler.onClose(this);
+ });
+ connection.on(ConnectionEventType.Message, message => {
+ this._handleWebSocketMessage(message, handler);
+ });
+ connection.sendUTF(JSON.stringify(subscribeMessage));
+ }
+ });
+ }
+ /**
+ * Close the websocket and stop receiving updates
+ */
+ public close() {
+ if (!_.isUndefined(this.connectionIfExists)) {
+ this.connectionIfExists.close();
+ }
+ }
+ private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void) {
+ if (!_.isUndefined(this.connectionIfExists) && this.connectionIfExists.connected) {
+ callback(undefined, this.connectionIfExists);
+ } else {
+ this.client.on(ClientEventType.Connect, connection => {
+ this.connectionIfExists = connection;
+ callback(undefined, this.connectionIfExists);
+ });
+ this.client.on(ClientEventType.ConnectFailed, error => {
+ callback(error, undefined);
+ });
+ this.client.connect(this.apiEndpointUrl);
+ }
+ }
+ private _handleWebSocketMessage(message: WebSocket.IMessage, handler: OrderbookChannelHandler): void {
+ if (!_.isUndefined(message.utf8Data)) {
+ try {
+ const utf8Data = message.utf8Data;
+ const parserResult = orderbookChannelMessageParsers.parser(utf8Data);
+ const type = parserResult.type;
+ switch (parserResult.type) {
+ case (OrderbookChannelMessageTypes.Snapshot): {
+ handler.onSnapshot(this, parserResult.payload);
+ break;
+ }
+ case (OrderbookChannelMessageTypes.Update): {
+ handler.onUpdate(this, parserResult.payload);
+ break;
+ }
+ default: {
+ handler.onError(this, new Error(`Message has missing a type parameter: ${utf8Data}`));
+ }
+ }
+ } catch (error) {
+ handler.onError(this, error);
+ }
+ } else {
+ handler.onError(this, new Error(`Message does not contain utf8Data`));
+ }
+ }
+}