aboutsummaryrefslogtreecommitdiffstats
path: root/packages/order-watcher/src/order_watcher
diff options
context:
space:
mode:
authorkao <zichongkao@gmail.com>2018-12-14 05:33:46 +0800
committerkao <zichongkao@gmail.com>2018-12-15 04:52:55 +0800
commitd9b58848346a4be41684eea244e393afaab6a617 (patch)
treecec032005acf9ad8c70e05a4b5387c1f1df8b79f /packages/order-watcher/src/order_watcher
parent687749460d026ae8b16e355c85c70e1e79b63252 (diff)
downloaddexon-0x-contracts-d9b58848346a4be41684eea244e393afaab6a617.tar
dexon-0x-contracts-d9b58848346a4be41684eea244e393afaab6a617.tar.gz
dexon-0x-contracts-d9b58848346a4be41684eea244e393afaab6a617.tar.bz2
dexon-0x-contracts-d9b58848346a4be41684eea244e393afaab6a617.tar.lz
dexon-0x-contracts-d9b58848346a4be41684eea244e393afaab6a617.tar.xz
dexon-0x-contracts-d9b58848346a4be41684eea244e393afaab6a617.tar.zst
dexon-0x-contracts-d9b58848346a4be41684eea244e393afaab6a617.zip
Respond to CR
Diffstat (limited to 'packages/order-watcher/src/order_watcher')
-rw-r--r--packages/order-watcher/src/order_watcher/order_watcher_websocket.ts209
1 files changed, 99 insertions, 110 deletions
diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts
index 84afc4000..806c7c6a5 100644
--- a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts
+++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts
@@ -1,60 +1,57 @@
import { ContractAddresses } from '@0x/contract-addresses';
-import { SignedOrder } from '@0x/types';
+import { OrderState, SignedOrder } from '@0x/types';
import { BigNumber, logUtils } from '@0x/utils';
import { Provider } from 'ethereum-types';
import * as http from 'http';
import * as WebSocket from 'websocket';
-import { webSocketUtf8MessageSchema } from '../schemas/websocket_utf8_message_schema';
-import { OnOrderStateChangeCallback, OrderWatcherConfig } from '../types';
+import { webSocketRequestSchema, webSocketUtf8MessageSchema } from '../schemas/websocket_schemas';
+import {
+ OnOrderStateChangeCallback,
+ OrderWatcherAction,
+ OrderWatcherConfig,
+ WebSocketRequest,
+ WebSocketResponse,
+} from '../types';
import { assert } from '../utils/assert';
import { OrderWatcher } from './order_watcher';
const DEFAULT_HTTP_PORT = 8080;
-const enum OrderWatcherAction {
- // Actions initiated by the user.
- getStats = 'getStats',
- addOrderAsync = 'addOrderAsync',
- removeOrder = 'removeOrder',
- // These are spontaneous; they are primarily orderstate changes.
- orderWatcherUpdate = 'orderWatcherUpdate',
- // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't
- // need to expose them to the WebSocket server user because the user implicitly
- // subscribes and unsubscribes by connecting and disconnecting from the server.
-}
-
-// Users have to create a json object of this format and attach it to
-// the data field of their WebSocket message to interact with this server.
-interface WebSocketRequestData {
- action: OrderWatcherAction;
- params: any;
-}
-
-// Users should expect a json object of this format in the data field
-// of the WebSocket messages that this server sends out.
-interface WebSocketResponseData {
- action: OrderWatcherAction;
- success: number;
- result: any;
-}
-
// Wraps the OrderWatcher functionality in a WebSocket server. Motivations:
// 1) Users can watch orders via non-typescript programs.
// 2) Better encapsulation so that users can work
export class OrderWatcherWebSocketServer {
- public httpServer: http.Server;
- public readonly _orderWatcher: OrderWatcher;
+ public readonly _orderWatcher: OrderWatcher; // public for testing
+ private readonly _httpServer: http.Server;
private readonly _connectionStore: Set<WebSocket.connection>;
private readonly _wsServer: WebSocket.server;
/**
- * Instantiate a new web socket server which provides OrderWatcher functionality
- * @param provider Web3 provider to use for JSON RPC calls (for OrderWatcher)
- * @param networkId NetworkId to watch orders on (for OrderWatcher)
+ * Recover types lost when the payload is stringified.
+ */
+ private static _parseSignedOrder(rawRequest: any): SignedOrder {
+ const bigNumberFields = [
+ 'salt',
+ 'makerFee',
+ 'takerFee',
+ 'makerAssetAmount',
+ 'takerAssetAmount',
+ 'expirationTimeSeconds',
+ ];
+ for (const field of bigNumberFields) {
+ rawRequest[field] = new BigNumber(rawRequest[field]);
+ }
+ return rawRequest;
+ }
+
+ /**
+ * Instantiate a new WebSocket server which provides OrderWatcher functionality
+ * @param provider Web3 provider to use for JSON RPC calls.
+ * @param networkId NetworkId to watch orders on.
* @param contractAddresses Optional contract addresses. Defaults to known
- * addresses based on networkId (for OrderWatcher)
- * @param partialConfig Optional configurations (for OrderWatcher)
+ * addresses based on networkId.
+ * @param partialConfig Optional configurations.
*/
constructor(
provider: Provider,
@@ -64,85 +61,98 @@ export class OrderWatcherWebSocketServer {
) {
this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig);
this._connectionStore = new Set();
- this.httpServer = http.createServer();
+ this._httpServer = http.createServer();
this._wsServer = new WebSocket.server({
- httpServer: this.httpServer,
+ httpServer: this._httpServer,
+ // Avoid setting autoAcceptConnections to true as it defeats all
+ // standard cross-origin protection facilities built into the protocol
+ // and the browser. Also ensures that a request event is emitted by
+ // the server whenever a new WebSocket request is made.
autoAcceptConnections: false,
});
- this._wsServer.on('request', (request: any) => {
+ this._wsServer.on('request', async (request: any) => {
// Designed for usage pattern where client and server are run on the same
// machine by the same user. As such, no security checks are in place.
const connection: WebSocket.connection = request.accept(null, request.origin);
logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`);
- connection.on('message', this._messageCallback.bind(this, connection));
- connection.on('close', this._closeCallback.bind(this, connection));
+ connection.on('message', await this._onMessageCallbackAsync.bind(this, connection));
+ connection.on('close', this._onCloseCallback.bind(this, connection));
this._connectionStore.add(connection);
});
// Have the WebSocket server subscribe to the OrderWatcher to receive updates.
// These updates are then broadcast to clients in the _connectionStore.
- this._orderWatcher.subscribe(this._broadcastCallback);
+ const broadcastCallback: OnOrderStateChangeCallback = this._broadcastCallback.bind(this);
+ this._orderWatcher.subscribe(broadcastCallback);
}
/**
* Activates the WebSocket server by having its HTTP server start listening.
*/
public listen(): void {
- this.httpServer.listen(DEFAULT_HTTP_PORT, () => {
- logUtils.log(`${new Date()} [Server] Listening on port ${DEFAULT_HTTP_PORT}`);
+ const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT;
+ this._httpServer.listen(port, () => {
+ logUtils.log(`${new Date()} [Server] Listening on port ${port}`);
});
}
+ /**
+ * Deactivates the WebSocket server by stopping the HTTP server from accepting
+ * new connections.
+ */
public close(): void {
- this.httpServer.close();
+ this._httpServer.close();
}
- private _messageCallback(connection: WebSocket.connection, message: any): void {
- assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema);
- const requestData: WebSocketRequestData = JSON.parse(message.utf8Data);
- const responseData = this._routeRequest(requestData);
- logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(responseData)}`);
- connection.sendUTF(JSON.stringify(responseData));
+ private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise<void> {
+ const response: WebSocketResponse = {
+ action: null,
+ success: false,
+ result: null,
+ };
+ try {
+ assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema);
+ const request: WebSocketRequest = JSON.parse(message.utf8Data);
+ assert.doesConformToSchema('request', request, webSocketRequestSchema);
+ response.action = request.action;
+ response.success = true;
+ response.result = await this._routeRequestAsync(request);
+ } catch (err) {
+ response.result = err.toString();
+ }
+ logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`);
+ connection.sendUTF(JSON.stringify(response));
}
- private _closeCallback(connection: WebSocket.connection): void {
+ private _onCloseCallback(connection: WebSocket.connection): void {
this._connectionStore.delete(connection);
logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`);
}
- private _routeRequest(requestData: WebSocketRequestData): WebSocketResponseData {
- const responseData: WebSocketResponseData = {
- action: requestData.action,
- success: 0,
- result: undefined,
- };
-
- try {
- logUtils.log(`${new Date()} [Server] Request received: ${requestData.action}`);
- switch (requestData.action) {
- case 'addOrderAsync': {
- const signedOrder: SignedOrder = this._parseSignedOrder(requestData);
- // tslint:disable-next-line:no-floating-promises
- this._orderWatcher.addOrderAsync(signedOrder); // Ok to fireNforget
- break;
- }
- case 'removeOrder': {
- this._orderWatcher.removeOrder(requestData.params.orderHash);
- break;
- }
- case 'getStats': {
- responseData.result = this._orderWatcher.getStats();
- break;
- }
- default:
- throw new Error(`[Server] Invalid request action: ${requestData.action}`);
+ private async _routeRequestAsync(request: WebSocketRequest): Promise<any> {
+ logUtils.log(`${new Date()} [Server] Request received: ${request.action}`);
+ let result = null;
+ switch (request.action) {
+ case OrderWatcherAction.AddOrder: {
+ const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder(request.signedOrder);
+ await this._orderWatcher.addOrderAsync(signedOrder);
+ break;
}
- responseData.success = 1;
- } catch (err) {
- responseData.result = { error: err.toString() };
+ case OrderWatcherAction.RemoveOrder: {
+ const orderHash = request.orderHash || '_';
+ this._orderWatcher.removeOrder(orderHash);
+ break;
+ }
+ case OrderWatcherAction.GetStats: {
+ result = this._orderWatcher.getStats();
+ break;
+ }
+ default:
+ // Should never reach here. Should be caught by JSON schema check.
+ throw new Error(`[Server] Invalid request action: ${request.action}`);
}
- return responseData;
+ return result;
}
/**
@@ -150,35 +160,14 @@ export class OrderWatcherWebSocketServer {
* we do not support clients subscribing to only a subset of orders. As such,
* Client B will be notified of changes to an order that Client A added.
*/
- private readonly _broadcastCallback: OnOrderStateChangeCallback = (err, orderState) => {
+ private _broadcastCallback(err: Error | null, orderState?: OrderState): void {
this._connectionStore.forEach((connection: WebSocket.connection) => {
- const responseData: WebSocketResponseData = {
- action: OrderWatcherAction.orderWatcherUpdate,
- success: 1,
+ const response: WebSocketResponse = {
+ action: OrderWatcherAction.Update,
+ success: true,
result: orderState || err,
};
- connection.sendUTF(JSON.stringify(responseData));
+ connection.sendUTF(JSON.stringify(response));
});
- // tslint:disable-next-line:semicolon
- }; // tslint thinks this is a class method, It's actally a property that holds a function.
-
- /**
- * Recover types lost when the payload is stringified.
- */
- private readonly _parseSignedOrder = (requestData: WebSocketRequestData) => {
- const signedOrder = requestData.params.signedOrder;
- const bigNumberFields = [
- 'salt',
- 'makerFee',
- 'takerFee',
- 'makerAssetAmount',
- 'takerAssetAmount',
- 'expirationTimeSeconds',
- ];
- for (const field of bigNumberFields) {
- signedOrder[field] = new BigNumber(signedOrder[field]);
- }
- return signedOrder;
- // tslint:disable-next-line:semicolon
- }; // tslint thinks this is a class method, It's actally a property that holds a function.
+ }
}