diff options
6 files changed, 295 insertions, 144 deletions
diff --git a/packages/order-watcher/README.md b/packages/order-watcher/README.md index c0b99b272..7eae0ae16 100644 --- a/packages/order-watcher/README.md +++ b/packages/order-watcher/README.md @@ -4,6 +4,9 @@ An order watcher daemon that watches for order validity. #### Read the wiki [article](https://0xproject.com/wiki#0x-OrderWatcher). +OrderWatcher also comes with a WebSocket server to provide language-agnostic access +to order watching functionality. We used the [WebSocket Client and Server Implementation for Node](https://www.npmjs.com/package/websocket). + ## Installation **Install** @@ -26,6 +29,84 @@ If your project is in [TypeScript](https://www.typescriptlang.org/), add the fol } ``` +## Using the WebSocket Server + +**Setup** + +**Environmental Variables** +Several environmental variables can be set to configure the server: + +* `ORDER_WATCHER_HTTP_PORT` specifies the port that the http server will listen on + and accept connections from. When this is not set, we default to 8080. + +**Requests** +The server accepts three types of requests: `ADD_ORDER`, `REMOVE_ORDER` and `GET_STATS`. These mirror what the underlying OrderWatcher does. You can read more in the [wiki](https://0xproject.com/wiki#0x-OrderWatcher). Unlike the OrderWatcher, it does not expose any subscribe or unsubscribe functionality because the client implicitly subscribes and unsubscribes by connecting to the server. + +The first step for making a request is establishing a connection with the server. In Javascript: + +``` +var W3CWebSocket = require('websocket').w3cwebsocket; +wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080'); +``` + +In Python, you could use the [websocket-client library](http://pypi.python.org/pypi/websocket-client/) and run: + +``` +from websocket import create_connection +wsClient = create_connection("ws://127.0.0.1:8080") +``` + +With the connection established, you prepare the payload for your request. The payload is a json object with the following structure: + +* For `GET_STATE`, the payload is `{ action: 'GET_STATS }`. +* For `ADD_ORDER`, use `{ action: 'ADD_ORDER', signedOrder: <your signedOrder> }`. +* For `REMOVE_ORDER`, use `{ action: 'REMOVE_ORDER', orderHash: <your orderHash> }`. + +Next, convert the payload to a string and send it through the connection. +In Javascript: + +``` +const addOrderPayload = { + action: 'ADD_ORDER', + signedOrder: <your signedOrder>, +}; +wsClient.send(JSON.stringify(addOrderPayload)); +``` + +In Python: + +``` +import json +remove_order_payload = { + 'action': 'REMOVE_ORDER', + 'orderHash': '0x6edc16bf37fde79f5012088c33784c730e2f103d9ab1caf73060c386ad107b7e', +} +wsClient.send(json.dumps(remove_order_payload)); +``` + +**Response** +The server responds to all requests in a similar format. In the data field, you'll find another json object that has been converted into a string. This json object contains the following fields: + +* `action`: The action the server is responding to. Eg. `ADD_ORDER`. When order states change the server may also initiate a response. In this case, action will be listed as `UPDATE`. +* `success`: `true` or `false`; Indicates whether the server handled the request without problems. +* `result`: This field varies based on the action. `UPDATE` responses contained the new order state. `GET_STATS` responses contain the current order count. When there are errors, the error messages are stored in here. + +In Javascript, the responses can be parsed using the `onmessage` callback: + +``` +wsClient.onmessage = (msg) => { + const responseData = JSON.parse(msg.data); + const action = responseData.action +}; +``` + +In Python, `recv` is a lightweight way to receive a response: + +``` +result = wsClient.recv() +action = result.action +``` + ## Contributing We strongly recommend that the community help us make improvements and determine the future direction of the protocol. To report bugs within this package, please create an issue in this repository. diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts index 84afc4000..806c7c6a5 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts @@ -1,60 +1,57 @@ import { ContractAddresses } from '@0x/contract-addresses'; -import { SignedOrder } from '@0x/types'; +import { OrderState, SignedOrder } from '@0x/types'; import { BigNumber, logUtils } from '@0x/utils'; import { Provider } from 'ethereum-types'; import * as http from 'http'; import * as WebSocket from 'websocket'; -import { webSocketUtf8MessageSchema } from '../schemas/websocket_utf8_message_schema'; -import { OnOrderStateChangeCallback, OrderWatcherConfig } from '../types'; +import { webSocketRequestSchema, webSocketUtf8MessageSchema } from '../schemas/websocket_schemas'; +import { + OnOrderStateChangeCallback, + OrderWatcherAction, + OrderWatcherConfig, + WebSocketRequest, + WebSocketResponse, +} from '../types'; import { assert } from '../utils/assert'; import { OrderWatcher } from './order_watcher'; const DEFAULT_HTTP_PORT = 8080; -const enum OrderWatcherAction { - // Actions initiated by the user. - getStats = 'getStats', - addOrderAsync = 'addOrderAsync', - removeOrder = 'removeOrder', - // These are spontaneous; they are primarily orderstate changes. - orderWatcherUpdate = 'orderWatcherUpdate', - // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't - // need to expose them to the WebSocket server user because the user implicitly - // subscribes and unsubscribes by connecting and disconnecting from the server. -} - -// Users have to create a json object of this format and attach it to -// the data field of their WebSocket message to interact with this server. -interface WebSocketRequestData { - action: OrderWatcherAction; - params: any; -} - -// Users should expect a json object of this format in the data field -// of the WebSocket messages that this server sends out. -interface WebSocketResponseData { - action: OrderWatcherAction; - success: number; - result: any; -} - // Wraps the OrderWatcher functionality in a WebSocket server. Motivations: // 1) Users can watch orders via non-typescript programs. // 2) Better encapsulation so that users can work export class OrderWatcherWebSocketServer { - public httpServer: http.Server; - public readonly _orderWatcher: OrderWatcher; + public readonly _orderWatcher: OrderWatcher; // public for testing + private readonly _httpServer: http.Server; private readonly _connectionStore: Set<WebSocket.connection>; private readonly _wsServer: WebSocket.server; /** - * Instantiate a new web socket server which provides OrderWatcher functionality - * @param provider Web3 provider to use for JSON RPC calls (for OrderWatcher) - * @param networkId NetworkId to watch orders on (for OrderWatcher) + * Recover types lost when the payload is stringified. + */ + private static _parseSignedOrder(rawRequest: any): SignedOrder { + const bigNumberFields = [ + 'salt', + 'makerFee', + 'takerFee', + 'makerAssetAmount', + 'takerAssetAmount', + 'expirationTimeSeconds', + ]; + for (const field of bigNumberFields) { + rawRequest[field] = new BigNumber(rawRequest[field]); + } + return rawRequest; + } + + /** + * Instantiate a new WebSocket server which provides OrderWatcher functionality + * @param provider Web3 provider to use for JSON RPC calls. + * @param networkId NetworkId to watch orders on. * @param contractAddresses Optional contract addresses. Defaults to known - * addresses based on networkId (for OrderWatcher) - * @param partialConfig Optional configurations (for OrderWatcher) + * addresses based on networkId. + * @param partialConfig Optional configurations. */ constructor( provider: Provider, @@ -64,85 +61,98 @@ export class OrderWatcherWebSocketServer { ) { this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); this._connectionStore = new Set(); - this.httpServer = http.createServer(); + this._httpServer = http.createServer(); this._wsServer = new WebSocket.server({ - httpServer: this.httpServer, + httpServer: this._httpServer, + // Avoid setting autoAcceptConnections to true as it defeats all + // standard cross-origin protection facilities built into the protocol + // and the browser. Also ensures that a request event is emitted by + // the server whenever a new WebSocket request is made. autoAcceptConnections: false, }); - this._wsServer.on('request', (request: any) => { + this._wsServer.on('request', async (request: any) => { // Designed for usage pattern where client and server are run on the same // machine by the same user. As such, no security checks are in place. const connection: WebSocket.connection = request.accept(null, request.origin); logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); - connection.on('message', this._messageCallback.bind(this, connection)); - connection.on('close', this._closeCallback.bind(this, connection)); + connection.on('message', await this._onMessageCallbackAsync.bind(this, connection)); + connection.on('close', this._onCloseCallback.bind(this, connection)); this._connectionStore.add(connection); }); // Have the WebSocket server subscribe to the OrderWatcher to receive updates. // These updates are then broadcast to clients in the _connectionStore. - this._orderWatcher.subscribe(this._broadcastCallback); + const broadcastCallback: OnOrderStateChangeCallback = this._broadcastCallback.bind(this); + this._orderWatcher.subscribe(broadcastCallback); } /** * Activates the WebSocket server by having its HTTP server start listening. */ public listen(): void { - this.httpServer.listen(DEFAULT_HTTP_PORT, () => { - logUtils.log(`${new Date()} [Server] Listening on port ${DEFAULT_HTTP_PORT}`); + const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; + this._httpServer.listen(port, () => { + logUtils.log(`${new Date()} [Server] Listening on port ${port}`); }); } + /** + * Deactivates the WebSocket server by stopping the HTTP server from accepting + * new connections. + */ public close(): void { - this.httpServer.close(); + this._httpServer.close(); } - private _messageCallback(connection: WebSocket.connection, message: any): void { - assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); - const requestData: WebSocketRequestData = JSON.parse(message.utf8Data); - const responseData = this._routeRequest(requestData); - logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(responseData)}`); - connection.sendUTF(JSON.stringify(responseData)); + private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise<void> { + const response: WebSocketResponse = { + action: null, + success: false, + result: null, + }; + try { + assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); + const request: WebSocketRequest = JSON.parse(message.utf8Data); + assert.doesConformToSchema('request', request, webSocketRequestSchema); + response.action = request.action; + response.success = true; + response.result = await this._routeRequestAsync(request); + } catch (err) { + response.result = err.toString(); + } + logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); + connection.sendUTF(JSON.stringify(response)); } - private _closeCallback(connection: WebSocket.connection): void { + private _onCloseCallback(connection: WebSocket.connection): void { this._connectionStore.delete(connection); logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); } - private _routeRequest(requestData: WebSocketRequestData): WebSocketResponseData { - const responseData: WebSocketResponseData = { - action: requestData.action, - success: 0, - result: undefined, - }; - - try { - logUtils.log(`${new Date()} [Server] Request received: ${requestData.action}`); - switch (requestData.action) { - case 'addOrderAsync': { - const signedOrder: SignedOrder = this._parseSignedOrder(requestData); - // tslint:disable-next-line:no-floating-promises - this._orderWatcher.addOrderAsync(signedOrder); // Ok to fireNforget - break; - } - case 'removeOrder': { - this._orderWatcher.removeOrder(requestData.params.orderHash); - break; - } - case 'getStats': { - responseData.result = this._orderWatcher.getStats(); - break; - } - default: - throw new Error(`[Server] Invalid request action: ${requestData.action}`); + private async _routeRequestAsync(request: WebSocketRequest): Promise<any> { + logUtils.log(`${new Date()} [Server] Request received: ${request.action}`); + let result = null; + switch (request.action) { + case OrderWatcherAction.AddOrder: { + const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder(request.signedOrder); + await this._orderWatcher.addOrderAsync(signedOrder); + break; } - responseData.success = 1; - } catch (err) { - responseData.result = { error: err.toString() }; + case OrderWatcherAction.RemoveOrder: { + const orderHash = request.orderHash || '_'; + this._orderWatcher.removeOrder(orderHash); + break; + } + case OrderWatcherAction.GetStats: { + result = this._orderWatcher.getStats(); + break; + } + default: + // Should never reach here. Should be caught by JSON schema check. + throw new Error(`[Server] Invalid request action: ${request.action}`); } - return responseData; + return result; } /** @@ -150,35 +160,14 @@ export class OrderWatcherWebSocketServer { * we do not support clients subscribing to only a subset of orders. As such, * Client B will be notified of changes to an order that Client A added. */ - private readonly _broadcastCallback: OnOrderStateChangeCallback = (err, orderState) => { + private _broadcastCallback(err: Error | null, orderState?: OrderState): void { this._connectionStore.forEach((connection: WebSocket.connection) => { - const responseData: WebSocketResponseData = { - action: OrderWatcherAction.orderWatcherUpdate, - success: 1, + const response: WebSocketResponse = { + action: OrderWatcherAction.Update, + success: true, result: orderState || err, }; - connection.sendUTF(JSON.stringify(responseData)); + connection.sendUTF(JSON.stringify(response)); }); - // tslint:disable-next-line:semicolon - }; // tslint thinks this is a class method, It's actally a property that holds a function. - - /** - * Recover types lost when the payload is stringified. - */ - private readonly _parseSignedOrder = (requestData: WebSocketRequestData) => { - const signedOrder = requestData.params.signedOrder; - const bigNumberFields = [ - 'salt', - 'makerFee', - 'takerFee', - 'makerAssetAmount', - 'takerAssetAmount', - 'expirationTimeSeconds', - ]; - for (const field of bigNumberFields) { - signedOrder[field] = new BigNumber(signedOrder[field]); - } - return signedOrder; - // tslint:disable-next-line:semicolon - }; // tslint thinks this is a class method, It's actally a property that holds a function. + } } diff --git a/packages/order-watcher/src/schemas/websocket_schemas.ts b/packages/order-watcher/src/schemas/websocket_schemas.ts new file mode 100644 index 000000000..c250d12f1 --- /dev/null +++ b/packages/order-watcher/src/schemas/websocket_schemas.ts @@ -0,0 +1,32 @@ +export const webSocketUtf8MessageSchema = { + id: '/webSocketUtf8MessageSchema', + properties: { + utf8Data: { type: 'string' }, + }, + type: 'object', + required: ['utf8Data'], +}; + +export const webSocketRequestSchema = { + id: '/webSocketRequestSchema', + properties: { + action: { enum: ['GET_STATS', 'ADD_ORDER', 'REMOVE_ORDER'] }, + signedOrder: { $ref: '/signedOrderSchema' }, + orderHash: { type: 'string' }, + }, + anyOf: [ + { + properties: { action: { enum: ['ADD_ORDER'] } }, + required: ['signedOrder'], + }, + { + properties: { action: { enum: ['REMOVE_ORDER'] } }, + required: ['orderHash'], + }, + { + properties: { action: { enum: ['GET_STATS'] } }, + required: [], + }, + ], + type: 'object', +}; diff --git a/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts b/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts deleted file mode 100644 index 0a0eed407..000000000 --- a/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts +++ /dev/null @@ -1,8 +0,0 @@ -export const webSocketUtf8MessageSchema = { - id: '/WebSocketUtf8MessageSchema', - properties: { - utf8Data: { type: 'string' }, - }, - type: 'object', - required: ['utf8Data'], -}; diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index 8078dd971..7f6219732 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -1,4 +1,4 @@ -import { OrderState } from '@0x/types'; +import { OrderState, SignedOrder } from '@0x/types'; import { LogEntryEvent } from 'ethereum-types'; export enum OrderWatcherError { @@ -31,3 +31,31 @@ export enum InternalOrderWatcherError { ZrxNotInTokenRegistry = 'ZRX_NOT_IN_TOKEN_REGISTRY', WethNotInTokenRegistry = 'WETH_NOT_IN_TOKEN_REGISTRY', } + +export enum OrderWatcherAction { + // Actions initiated by the user. + GetStats = 'GET_STATS', + AddOrder = 'ADD_ORDER', + RemoveOrder = 'REMOVE_ORDER', + // These are spontaneous; they are primarily orderstate changes. + Update = 'UPDATE', + // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't + // need to expose them to the WebSocket server user because the user implicitly + // subscribes and unsubscribes by connecting and disconnecting from the server. +} + +// Users have to create a json object of this format and attach it to +// the data field of their WebSocket message to interact with the server. +export interface WebSocketRequest { + action: OrderWatcherAction; + signedOrder?: SignedOrder; + orderHash?: string; +} + +// Users should expect a json object of this format in the data field +// of the WebSocket messages that the server sends out. +export interface WebSocketResponse { + action: OrderWatcherAction | null; + success: boolean; + result: any; +} diff --git a/packages/order-watcher/test/order_watcher_websocket_test.ts b/packages/order-watcher/test/order_watcher_websocket_test.ts index e7b18e44b..a9e72ce21 100644 --- a/packages/order-watcher/test/order_watcher_websocket_test.ts +++ b/packages/order-watcher/test/order_watcher_websocket_test.ts @@ -41,11 +41,11 @@ describe.only('OrderWatcherWebSocket', async () => { let zrxTokenAddress: string; let signedOrder: SignedOrder; let orderHash: string; - let addOrderPayload: { action: string; params: { signedOrder: SignedOrder } }; - let removeOrderPayload: { action: string; params: { orderHash: string } }; + let addOrderPayload: { action: string; signedOrder: SignedOrder }; + let removeOrderPayload: { action: string; orderHash: string }; const decimals = constants.ZRX_DECIMALS; const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); - // createFillableSignedOrderAsync is Promise-based, which forces us + // HACK: createFillableSignedOrderAsync is Promise-based, which forces us // to use Promises instead of the done() callbacks for tests. // onmessage callback must thus be wrapped as a Promise. const _onMessageAsync = async (client: WebSocket.w3cwebsocket) => @@ -88,12 +88,12 @@ describe.only('OrderWatcherWebSocket', async () => { ); orderHash = orderHashUtils.getOrderHashHex(signedOrder); addOrderPayload = { - action: 'addOrderAsync', - params: { signedOrder }, + action: 'ADD_ORDER', + signedOrder, }; removeOrderPayload = { - action: 'removeOrder', - params: { orderHash }, + action: 'REMOVE_ORDER', + orderHash, }; // Prepare OrderWatcher WebSocket server @@ -118,14 +118,13 @@ describe.only('OrderWatcherWebSocket', async () => { it('responds to getStats requests correctly', (done: any) => { const payload = { - action: 'getStats', - params: {}, + action: 'GET_STATS', }; wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); wsClient.onmessage = (msg: any) => { const responseData = JSON.parse(msg.data); - expect(responseData.action).to.be.eq('getStats'); - expect(responseData.success).to.be.eq(1); + expect(responseData.action).to.be.eq('GET_STATS'); + expect(responseData.success).to.be.true(); expect(responseData.result.orderCount).to.be.eq(0); done(); }; @@ -133,23 +132,53 @@ describe.only('OrderWatcherWebSocket', async () => { it('throws an error when an invalid action is attempted', async () => { const invalidActionPayload = { - action: 'badAction', - params: {}, + action: 'BAD_ACTION', }; wsClient.onopen = () => wsClient.send(JSON.stringify(invalidActionPayload)); const errorMsg = await _onMessageAsync(wsClient); const errorData = JSON.parse(errorMsg.data); - expect(errorData.action).to.be.eq('badAction'); - expect(errorData.success).to.be.eq(0); - expect(errorData.result.error).to.be.eq('Error: [Server] Invalid request action: badAction'); + // tslint:disable-next-line:no-unused-expression + expect(errorData.action).to.be.null; + expect(errorData.success).to.be.false(); + expect(errorData.result).to.match(/^Error: Expected request to conform to schema/); }); - it('executes addOrderAsync and removeOrder requests correctly', async () => { + it('throws an error when we try to add an order without a signedOrder', async () => { + const noSignedOrderAddOrderPayload = { + action: 'ADD_ORDER', + orderHash: '0x0', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(noSignedOrderAddOrderPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.action).to.be.null; + expect(errorData.success).to.be.false(); + expect(errorData.result).to.match(/^Error: Expected request to conform to schema/); + }); + + it('throws an error when we try to add a bad signedOrder', async () => { + const invalidAddOrderPayload = { + action: 'ADD_ORDER', + signedOrder: { + makerAddress: '0x0', + }, + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(invalidAddOrderPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.action).to.be.null; + expect(errorData.success).to.be.false(); + expect(errorData.result).to.match(/^Error: Expected request to conform to schema/); + }); + + it('executes addOrder and removeOrder requests correctly', async () => { wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); const addOrderMsg = await _onMessageAsync(wsClient); const addOrderData = JSON.parse(addOrderMsg.data); - expect(addOrderData.action).to.be.eq('addOrderAsync'); - expect(addOrderData.success).to.be.eq(1); + expect(addOrderData.action).to.be.eq('ADD_ORDER'); + expect(addOrderData.success).to.be.true(); expect((wsServer._orderWatcher as any)._orderByOrderHash).to.deep.include({ [orderHash]: signedOrder, }); @@ -157,8 +186,8 @@ describe.only('OrderWatcherWebSocket', async () => { wsClient.send(JSON.stringify(removeOrderPayload)); const removeOrderMsg = await _onMessageAsync(wsClient); const removeOrderData = JSON.parse(removeOrderMsg.data); - expect(removeOrderData.action).to.be.eq('removeOrder'); - expect(removeOrderData.success).to.be.eq(1); + expect(removeOrderData.action).to.be.eq('REMOVE_ORDER'); + expect(removeOrderData.success).to.be.true(); expect((wsServer._orderWatcher as any)._orderByOrderHash).to.not.deep.include({ [orderHash]: signedOrder, }); @@ -175,8 +204,8 @@ describe.only('OrderWatcherWebSocket', async () => { // Ensure that orderStateInvalid message is received. const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); - expect(orderWatcherUpdateData.action).to.be.eq('orderWatcherUpdate'); - expect(orderWatcherUpdateData.success).to.be.eq(1); + expect(orderWatcherUpdateData.action).to.be.eq('UPDATE'); + expect(orderWatcherUpdateData.success).to.be.true(); const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; expect(invalidOrderState.isValid).to.be.false(); expect(invalidOrderState.orderHash).to.be.eq(orderHash); @@ -198,8 +227,8 @@ describe.only('OrderWatcherWebSocket', async () => { takerAddress, ); const nonZeroMakerFeeOrderPayload = { - action: 'addOrderAsync', - params: { nonZeroMakerFeeSignedOrder }, + action: 'ADD_ORDER', + signedOrder: nonZeroMakerFeeSignedOrder, }; // Set up a second client and have it add the order |