From 687749460d026ae8b16e355c85c70e1e79b63252 Mon Sep 17 00:00:00 2001 From: kao Date: Mon, 26 Nov 2018 17:12:33 -0800 Subject: WIP: OrderWatcher WebSocket Currently incomplete. Main challenge is to figure out how to test a client + server setup in the single-threaded javascript environment. --- packages/order-watcher/package.json | 3 +- .../src/order_watcher/order_watcher_websocket.ts | 184 +++++++++++++++++ .../src/schemas/websocket_utf8_message_schema.ts | 8 + .../test/order_watcher_websocket_test.ts | 226 +++++++++++++++++++++ 4 files changed, 420 insertions(+), 1 deletion(-) create mode 100644 packages/order-watcher/src/order_watcher/order_watcher_websocket.ts create mode 100644 packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts create mode 100644 packages/order-watcher/test/order_watcher_websocket_test.ts (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/package.json b/packages/order-watcher/package.json index 9a51203f4..cfaf5d724 100644 --- a/packages/order-watcher/package.json +++ b/packages/order-watcher/package.json @@ -74,7 +74,8 @@ "ethereum-types": "^1.1.2", "ethereumjs-blockstream": "6.0.0", "ethers": "~4.0.4", - "lodash": "^4.17.5" + "lodash": "^4.17.5", + "websocket": "^1.0.25" }, "publishConfig": { "access": "public" diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts new file mode 100644 index 000000000..84afc4000 --- /dev/null +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts @@ -0,0 +1,184 @@ +import { ContractAddresses } from '@0x/contract-addresses'; +import { SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Provider } from 'ethereum-types'; +import * as http from 'http'; +import * as WebSocket from 'websocket'; + +import { webSocketUtf8MessageSchema } from '../schemas/websocket_utf8_message_schema'; +import { OnOrderStateChangeCallback, OrderWatcherConfig } from '../types'; +import { assert } from '../utils/assert'; + +import { OrderWatcher } from './order_watcher'; + +const DEFAULT_HTTP_PORT = 8080; + +const enum OrderWatcherAction { + // Actions initiated by the user. + getStats = 'getStats', + addOrderAsync = 'addOrderAsync', + removeOrder = 'removeOrder', + // These are spontaneous; they are primarily orderstate changes. + orderWatcherUpdate = 'orderWatcherUpdate', + // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't + // need to expose them to the WebSocket server user because the user implicitly + // subscribes and unsubscribes by connecting and disconnecting from the server. +} + +// Users have to create a json object of this format and attach it to +// the data field of their WebSocket message to interact with this server. +interface WebSocketRequestData { + action: OrderWatcherAction; + params: any; +} + +// Users should expect a json object of this format in the data field +// of the WebSocket messages that this server sends out. +interface WebSocketResponseData { + action: OrderWatcherAction; + success: number; + result: any; +} + +// Wraps the OrderWatcher functionality in a WebSocket server. Motivations: +// 1) Users can watch orders via non-typescript programs. +// 2) Better encapsulation so that users can work +export class OrderWatcherWebSocketServer { + public httpServer: http.Server; + public readonly _orderWatcher: OrderWatcher; + private readonly _connectionStore: Set; + private readonly _wsServer: WebSocket.server; + /** + * Instantiate a new web socket server which provides OrderWatcher functionality + * @param provider Web3 provider to use for JSON RPC calls (for OrderWatcher) + * @param networkId NetworkId to watch orders on (for OrderWatcher) + * @param contractAddresses Optional contract addresses. Defaults to known + * addresses based on networkId (for OrderWatcher) + * @param partialConfig Optional configurations (for OrderWatcher) + */ + constructor( + provider: Provider, + networkId: number, + contractAddresses?: ContractAddresses, + partialConfig?: Partial, + ) { + this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); + this._connectionStore = new Set(); + this.httpServer = http.createServer(); + this._wsServer = new WebSocket.server({ + httpServer: this.httpServer, + autoAcceptConnections: false, + }); + + this._wsServer.on('request', (request: any) => { + // Designed for usage pattern where client and server are run on the same + // machine by the same user. As such, no security checks are in place. + const connection: WebSocket.connection = request.accept(null, request.origin); + logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); + connection.on('message', this._messageCallback.bind(this, connection)); + connection.on('close', this._closeCallback.bind(this, connection)); + this._connectionStore.add(connection); + }); + + // Have the WebSocket server subscribe to the OrderWatcher to receive updates. + // These updates are then broadcast to clients in the _connectionStore. + this._orderWatcher.subscribe(this._broadcastCallback); + } + + /** + * Activates the WebSocket server by having its HTTP server start listening. + */ + public listen(): void { + this.httpServer.listen(DEFAULT_HTTP_PORT, () => { + logUtils.log(`${new Date()} [Server] Listening on port ${DEFAULT_HTTP_PORT}`); + }); + } + + public close(): void { + this.httpServer.close(); + } + + private _messageCallback(connection: WebSocket.connection, message: any): void { + assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); + const requestData: WebSocketRequestData = JSON.parse(message.utf8Data); + const responseData = this._routeRequest(requestData); + logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(responseData)}`); + connection.sendUTF(JSON.stringify(responseData)); + } + + private _closeCallback(connection: WebSocket.connection): void { + this._connectionStore.delete(connection); + logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); + } + + private _routeRequest(requestData: WebSocketRequestData): WebSocketResponseData { + const responseData: WebSocketResponseData = { + action: requestData.action, + success: 0, + result: undefined, + }; + + try { + logUtils.log(`${new Date()} [Server] Request received: ${requestData.action}`); + switch (requestData.action) { + case 'addOrderAsync': { + const signedOrder: SignedOrder = this._parseSignedOrder(requestData); + // tslint:disable-next-line:no-floating-promises + this._orderWatcher.addOrderAsync(signedOrder); // Ok to fireNforget + break; + } + case 'removeOrder': { + this._orderWatcher.removeOrder(requestData.params.orderHash); + break; + } + case 'getStats': { + responseData.result = this._orderWatcher.getStats(); + break; + } + default: + throw new Error(`[Server] Invalid request action: ${requestData.action}`); + } + responseData.success = 1; + } catch (err) { + responseData.result = { error: err.toString() }; + } + return responseData; + } + + /** + * Broadcasts OrderState changes to ALL connected clients. At the moment, + * we do not support clients subscribing to only a subset of orders. As such, + * Client B will be notified of changes to an order that Client A added. + */ + private readonly _broadcastCallback: OnOrderStateChangeCallback = (err, orderState) => { + this._connectionStore.forEach((connection: WebSocket.connection) => { + const responseData: WebSocketResponseData = { + action: OrderWatcherAction.orderWatcherUpdate, + success: 1, + result: orderState || err, + }; + connection.sendUTF(JSON.stringify(responseData)); + }); + // tslint:disable-next-line:semicolon + }; // tslint thinks this is a class method, It's actally a property that holds a function. + + /** + * Recover types lost when the payload is stringified. + */ + private readonly _parseSignedOrder = (requestData: WebSocketRequestData) => { + const signedOrder = requestData.params.signedOrder; + const bigNumberFields = [ + 'salt', + 'makerFee', + 'takerFee', + 'makerAssetAmount', + 'takerAssetAmount', + 'expirationTimeSeconds', + ]; + for (const field of bigNumberFields) { + signedOrder[field] = new BigNumber(signedOrder[field]); + } + return signedOrder; + // tslint:disable-next-line:semicolon + }; // tslint thinks this is a class method, It's actally a property that holds a function. +} diff --git a/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts b/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts new file mode 100644 index 000000000..0a0eed407 --- /dev/null +++ b/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts @@ -0,0 +1,8 @@ +export const webSocketUtf8MessageSchema = { + id: '/WebSocketUtf8MessageSchema', + properties: { + utf8Data: { type: 'string' }, + }, + type: 'object', + required: ['utf8Data'], +}; diff --git a/packages/order-watcher/test/order_watcher_websocket_test.ts b/packages/order-watcher/test/order_watcher_websocket_test.ts new file mode 100644 index 000000000..e7b18e44b --- /dev/null +++ b/packages/order-watcher/test/order_watcher_websocket_test.ts @@ -0,0 +1,226 @@ +import { ContractWrappers } from '@0x/contract-wrappers'; +import { tokenUtils } from '@0x/contract-wrappers/lib/test/utils/token_utils'; +import { BlockchainLifecycle } from '@0x/dev-utils'; +import { FillScenarios } from '@0x/fill-scenarios'; +import { assetDataUtils, orderHashUtils } from '@0x/order-utils'; +import { ExchangeContractErrs, OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import * as chai from 'chai'; +import 'mocha'; +import * as WebSocket from 'websocket'; + +import { OrderWatcherWebSocketServer } from '../src/order_watcher/order_watcher_websocket'; + +import { chaiSetup } from './utils/chai_setup'; +import { constants } from './utils/constants'; +import { migrateOnceAsync } from './utils/migrate'; +import { provider, web3Wrapper } from './utils/web3_wrapper'; + +chaiSetup.configure(); +const expect = chai.expect; +const blockchainLifecycle = new BlockchainLifecycle(web3Wrapper); + +interface WsMessage { + data: string; +} + +describe.only('OrderWatcherWebSocket', async () => { + let contractWrappers: ContractWrappers; + let wsServer: OrderWatcherWebSocketServer; + let wsClient: WebSocket.w3cwebsocket; + let wsClientTwo: WebSocket.w3cwebsocket; + let fillScenarios: FillScenarios; + let userAddresses: string[]; + let makerAssetData: string; + let takerAssetData: string; + let makerTokenAddress: string; + let takerTokenAddress: string; + let makerAddress: string; + let takerAddress: string; + let zrxTokenAddress: string; + let signedOrder: SignedOrder; + let orderHash: string; + let addOrderPayload: { action: string; params: { signedOrder: SignedOrder } }; + let removeOrderPayload: { action: string; params: { orderHash: string } }; + const decimals = constants.ZRX_DECIMALS; + const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); + // createFillableSignedOrderAsync is Promise-based, which forces us + // to use Promises instead of the done() callbacks for tests. + // onmessage callback must thus be wrapped as a Promise. + const _onMessageAsync = async (client: WebSocket.w3cwebsocket) => + new Promise(resolve => { + client.onmessage = (msg: WsMessage) => resolve(msg); + }); + + before(async () => { + // Set up constants + const contractAddresses = await migrateOnceAsync(); + await blockchainLifecycle.startAsync(); + const networkId = constants.TESTRPC_NETWORK_ID; + const config = { + networkId, + contractAddresses, + }; + contractWrappers = new ContractWrappers(provider, config); + userAddresses = await web3Wrapper.getAvailableAddressesAsync(); + zrxTokenAddress = contractAddresses.zrxToken; + [makerAddress, takerAddress] = userAddresses; + [makerTokenAddress, takerTokenAddress] = tokenUtils.getDummyERC20TokenAddresses(); + [makerAssetData, takerAssetData] = [ + assetDataUtils.encodeERC20AssetData(makerTokenAddress), + assetDataUtils.encodeERC20AssetData(takerTokenAddress), + ]; + fillScenarios = new FillScenarios( + provider, + userAddresses, + zrxTokenAddress, + contractAddresses.exchange, + contractAddresses.erc20Proxy, + contractAddresses.erc721Proxy, + ); + signedOrder = await fillScenarios.createFillableSignedOrderAsync( + makerAssetData, + takerAssetData, + makerAddress, + takerAddress, + fillableAmount, + ); + orderHash = orderHashUtils.getOrderHashHex(signedOrder); + addOrderPayload = { + action: 'addOrderAsync', + params: { signedOrder }, + }; + removeOrderPayload = { + action: 'removeOrder', + params: { orderHash }, + }; + + // Prepare OrderWatcher WebSocket server + const orderWatcherConfig = {}; + wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); + wsServer.listen(); + }); + after(async () => { + await blockchainLifecycle.revertAsync(); + wsServer.close(); + }); + beforeEach(async () => { + await blockchainLifecycle.startAsync(); + wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); + logUtils.log(`${new Date()} [Client] Connected.`); + }); + afterEach(async () => { + await blockchainLifecycle.revertAsync(); + wsClient.close(); + logUtils.log(`${new Date()} [Client] Closed.`); + }); + + it('responds to getStats requests correctly', (done: any) => { + const payload = { + action: 'getStats', + params: {}, + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); + wsClient.onmessage = (msg: any) => { + const responseData = JSON.parse(msg.data); + expect(responseData.action).to.be.eq('getStats'); + expect(responseData.success).to.be.eq(1); + expect(responseData.result.orderCount).to.be.eq(0); + done(); + }; + }); + + it('throws an error when an invalid action is attempted', async () => { + const invalidActionPayload = { + action: 'badAction', + params: {}, + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(invalidActionPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + expect(errorData.action).to.be.eq('badAction'); + expect(errorData.success).to.be.eq(0); + expect(errorData.result.error).to.be.eq('Error: [Server] Invalid request action: badAction'); + }); + + it('executes addOrderAsync and removeOrder requests correctly', async () => { + wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); + const addOrderMsg = await _onMessageAsync(wsClient); + const addOrderData = JSON.parse(addOrderMsg.data); + expect(addOrderData.action).to.be.eq('addOrderAsync'); + expect(addOrderData.success).to.be.eq(1); + expect((wsServer._orderWatcher as any)._orderByOrderHash).to.deep.include({ + [orderHash]: signedOrder, + }); + + wsClient.send(JSON.stringify(removeOrderPayload)); + const removeOrderMsg = await _onMessageAsync(wsClient); + const removeOrderData = JSON.parse(removeOrderMsg.data); + expect(removeOrderData.action).to.be.eq('removeOrder'); + expect(removeOrderData.success).to.be.eq(1); + expect((wsServer._orderWatcher as any)._orderByOrderHash).to.not.deep.include({ + [orderHash]: signedOrder, + }); + }); + + it('broadcasts orderStateInvalid message when makerAddress allowance set to 0 for watched order', async () => { + // Add the regular order + wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); + await _onMessageAsync(wsClient); + + // Set the allowance to 0 + await contractWrappers.erc20Token.setProxyAllowanceAsync(makerTokenAddress, makerAddress, new BigNumber(0)); + + // Ensure that orderStateInvalid message is received. + const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); + const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); + expect(orderWatcherUpdateData.action).to.be.eq('orderWatcherUpdate'); + expect(orderWatcherUpdateData.success).to.be.eq(1); + const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; + expect(invalidOrderState.isValid).to.be.false(); + expect(invalidOrderState.orderHash).to.be.eq(orderHash); + expect(invalidOrderState.error).to.be.eq(ExchangeContractErrs.InsufficientMakerAllowance); + }); + + it('broadcasts to multiple clients when an order backing ZRX allowance changes', async () => { + // Prepare order + const makerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(2), decimals); + const takerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(0), decimals); + const nonZeroMakerFeeSignedOrder = await fillScenarios.createFillableSignedOrderWithFeesAsync( + makerAssetData, + takerAssetData, + makerFee, + takerFee, + makerAddress, + takerAddress, + fillableAmount, + takerAddress, + ); + const nonZeroMakerFeeOrderPayload = { + action: 'addOrderAsync', + params: { nonZeroMakerFeeSignedOrder }, + }; + + // Set up a second client and have it add the order + wsClientTwo = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); + logUtils.log(`${new Date()} [Client] Connected.`); + wsClientTwo.onopen = () => wsClientTwo.send(JSON.stringify(nonZeroMakerFeeOrderPayload)); + await _onMessageAsync(wsClientTwo); + + // Change the allowance + await contractWrappers.erc20Token.setProxyAllowanceAsync(zrxTokenAddress, makerAddress, new BigNumber(0)); + + // Check that both clients receive the emitted event + for (const client of [wsClient, wsClientTwo]) { + const updateMsg = await _onMessageAsync(client); + const updateData = JSON.parse(updateMsg.data); + const orderState = updateData.result as OrderStateValid; + expect(orderState.isValid).to.be.true(); + expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); + } + + wsClientTwo.close(); + logUtils.log(`${new Date()} [Client] Closed.`); + }); +}); -- cgit v1.2.3 From d9b58848346a4be41684eea244e393afaab6a617 Mon Sep 17 00:00:00 2001 From: kao Date: Thu, 13 Dec 2018 13:33:46 -0800 Subject: Respond to CR --- packages/order-watcher/README.md | 81 ++++++++ .../src/order_watcher/order_watcher_websocket.ts | 209 ++++++++++----------- .../order-watcher/src/schemas/websocket_schemas.ts | 32 ++++ .../src/schemas/websocket_utf8_message_schema.ts | 8 - packages/order-watcher/src/types.ts | 30 ++- .../test/order_watcher_websocket_test.ts | 79 +++++--- 6 files changed, 295 insertions(+), 144 deletions(-) create mode 100644 packages/order-watcher/src/schemas/websocket_schemas.ts delete mode 100644 packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/README.md b/packages/order-watcher/README.md index c0b99b272..7eae0ae16 100644 --- a/packages/order-watcher/README.md +++ b/packages/order-watcher/README.md @@ -4,6 +4,9 @@ An order watcher daemon that watches for order validity. #### Read the wiki [article](https://0xproject.com/wiki#0x-OrderWatcher). +OrderWatcher also comes with a WebSocket server to provide language-agnostic access +to order watching functionality. We used the [WebSocket Client and Server Implementation for Node](https://www.npmjs.com/package/websocket). + ## Installation **Install** @@ -26,6 +29,84 @@ If your project is in [TypeScript](https://www.typescriptlang.org/), add the fol } ``` +## Using the WebSocket Server + +**Setup** + +**Environmental Variables** +Several environmental variables can be set to configure the server: + +* `ORDER_WATCHER_HTTP_PORT` specifies the port that the http server will listen on + and accept connections from. When this is not set, we default to 8080. + +**Requests** +The server accepts three types of requests: `ADD_ORDER`, `REMOVE_ORDER` and `GET_STATS`. These mirror what the underlying OrderWatcher does. You can read more in the [wiki](https://0xproject.com/wiki#0x-OrderWatcher). Unlike the OrderWatcher, it does not expose any subscribe or unsubscribe functionality because the client implicitly subscribes and unsubscribes by connecting to the server. + +The first step for making a request is establishing a connection with the server. In Javascript: + +``` +var W3CWebSocket = require('websocket').w3cwebsocket; +wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080'); +``` + +In Python, you could use the [websocket-client library](http://pypi.python.org/pypi/websocket-client/) and run: + +``` +from websocket import create_connection +wsClient = create_connection("ws://127.0.0.1:8080") +``` + +With the connection established, you prepare the payload for your request. The payload is a json object with the following structure: + +* For `GET_STATE`, the payload is `{ action: 'GET_STATS }`. +* For `ADD_ORDER`, use `{ action: 'ADD_ORDER', signedOrder: }`. +* For `REMOVE_ORDER`, use `{ action: 'REMOVE_ORDER', orderHash: }`. + +Next, convert the payload to a string and send it through the connection. +In Javascript: + +``` +const addOrderPayload = { + action: 'ADD_ORDER', + signedOrder: , +}; +wsClient.send(JSON.stringify(addOrderPayload)); +``` + +In Python: + +``` +import json +remove_order_payload = { + 'action': 'REMOVE_ORDER', + 'orderHash': '0x6edc16bf37fde79f5012088c33784c730e2f103d9ab1caf73060c386ad107b7e', +} +wsClient.send(json.dumps(remove_order_payload)); +``` + +**Response** +The server responds to all requests in a similar format. In the data field, you'll find another json object that has been converted into a string. This json object contains the following fields: + +* `action`: The action the server is responding to. Eg. `ADD_ORDER`. When order states change the server may also initiate a response. In this case, action will be listed as `UPDATE`. +* `success`: `true` or `false`; Indicates whether the server handled the request without problems. +* `result`: This field varies based on the action. `UPDATE` responses contained the new order state. `GET_STATS` responses contain the current order count. When there are errors, the error messages are stored in here. + +In Javascript, the responses can be parsed using the `onmessage` callback: + +``` +wsClient.onmessage = (msg) => { + const responseData = JSON.parse(msg.data); + const action = responseData.action +}; +``` + +In Python, `recv` is a lightweight way to receive a response: + +``` +result = wsClient.recv() +action = result.action +``` + ## Contributing We strongly recommend that the community help us make improvements and determine the future direction of the protocol. To report bugs within this package, please create an issue in this repository. diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts index 84afc4000..806c7c6a5 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts @@ -1,60 +1,57 @@ import { ContractAddresses } from '@0x/contract-addresses'; -import { SignedOrder } from '@0x/types'; +import { OrderState, SignedOrder } from '@0x/types'; import { BigNumber, logUtils } from '@0x/utils'; import { Provider } from 'ethereum-types'; import * as http from 'http'; import * as WebSocket from 'websocket'; -import { webSocketUtf8MessageSchema } from '../schemas/websocket_utf8_message_schema'; -import { OnOrderStateChangeCallback, OrderWatcherConfig } from '../types'; +import { webSocketRequestSchema, webSocketUtf8MessageSchema } from '../schemas/websocket_schemas'; +import { + OnOrderStateChangeCallback, + OrderWatcherAction, + OrderWatcherConfig, + WebSocketRequest, + WebSocketResponse, +} from '../types'; import { assert } from '../utils/assert'; import { OrderWatcher } from './order_watcher'; const DEFAULT_HTTP_PORT = 8080; -const enum OrderWatcherAction { - // Actions initiated by the user. - getStats = 'getStats', - addOrderAsync = 'addOrderAsync', - removeOrder = 'removeOrder', - // These are spontaneous; they are primarily orderstate changes. - orderWatcherUpdate = 'orderWatcherUpdate', - // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't - // need to expose them to the WebSocket server user because the user implicitly - // subscribes and unsubscribes by connecting and disconnecting from the server. -} - -// Users have to create a json object of this format and attach it to -// the data field of their WebSocket message to interact with this server. -interface WebSocketRequestData { - action: OrderWatcherAction; - params: any; -} - -// Users should expect a json object of this format in the data field -// of the WebSocket messages that this server sends out. -interface WebSocketResponseData { - action: OrderWatcherAction; - success: number; - result: any; -} - // Wraps the OrderWatcher functionality in a WebSocket server. Motivations: // 1) Users can watch orders via non-typescript programs. // 2) Better encapsulation so that users can work export class OrderWatcherWebSocketServer { - public httpServer: http.Server; - public readonly _orderWatcher: OrderWatcher; + public readonly _orderWatcher: OrderWatcher; // public for testing + private readonly _httpServer: http.Server; private readonly _connectionStore: Set; private readonly _wsServer: WebSocket.server; /** - * Instantiate a new web socket server which provides OrderWatcher functionality - * @param provider Web3 provider to use for JSON RPC calls (for OrderWatcher) - * @param networkId NetworkId to watch orders on (for OrderWatcher) + * Recover types lost when the payload is stringified. + */ + private static _parseSignedOrder(rawRequest: any): SignedOrder { + const bigNumberFields = [ + 'salt', + 'makerFee', + 'takerFee', + 'makerAssetAmount', + 'takerAssetAmount', + 'expirationTimeSeconds', + ]; + for (const field of bigNumberFields) { + rawRequest[field] = new BigNumber(rawRequest[field]); + } + return rawRequest; + } + + /** + * Instantiate a new WebSocket server which provides OrderWatcher functionality + * @param provider Web3 provider to use for JSON RPC calls. + * @param networkId NetworkId to watch orders on. * @param contractAddresses Optional contract addresses. Defaults to known - * addresses based on networkId (for OrderWatcher) - * @param partialConfig Optional configurations (for OrderWatcher) + * addresses based on networkId. + * @param partialConfig Optional configurations. */ constructor( provider: Provider, @@ -64,85 +61,98 @@ export class OrderWatcherWebSocketServer { ) { this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); this._connectionStore = new Set(); - this.httpServer = http.createServer(); + this._httpServer = http.createServer(); this._wsServer = new WebSocket.server({ - httpServer: this.httpServer, + httpServer: this._httpServer, + // Avoid setting autoAcceptConnections to true as it defeats all + // standard cross-origin protection facilities built into the protocol + // and the browser. Also ensures that a request event is emitted by + // the server whenever a new WebSocket request is made. autoAcceptConnections: false, }); - this._wsServer.on('request', (request: any) => { + this._wsServer.on('request', async (request: any) => { // Designed for usage pattern where client and server are run on the same // machine by the same user. As such, no security checks are in place. const connection: WebSocket.connection = request.accept(null, request.origin); logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); - connection.on('message', this._messageCallback.bind(this, connection)); - connection.on('close', this._closeCallback.bind(this, connection)); + connection.on('message', await this._onMessageCallbackAsync.bind(this, connection)); + connection.on('close', this._onCloseCallback.bind(this, connection)); this._connectionStore.add(connection); }); // Have the WebSocket server subscribe to the OrderWatcher to receive updates. // These updates are then broadcast to clients in the _connectionStore. - this._orderWatcher.subscribe(this._broadcastCallback); + const broadcastCallback: OnOrderStateChangeCallback = this._broadcastCallback.bind(this); + this._orderWatcher.subscribe(broadcastCallback); } /** * Activates the WebSocket server by having its HTTP server start listening. */ public listen(): void { - this.httpServer.listen(DEFAULT_HTTP_PORT, () => { - logUtils.log(`${new Date()} [Server] Listening on port ${DEFAULT_HTTP_PORT}`); + const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; + this._httpServer.listen(port, () => { + logUtils.log(`${new Date()} [Server] Listening on port ${port}`); }); } + /** + * Deactivates the WebSocket server by stopping the HTTP server from accepting + * new connections. + */ public close(): void { - this.httpServer.close(); + this._httpServer.close(); } - private _messageCallback(connection: WebSocket.connection, message: any): void { - assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); - const requestData: WebSocketRequestData = JSON.parse(message.utf8Data); - const responseData = this._routeRequest(requestData); - logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(responseData)}`); - connection.sendUTF(JSON.stringify(responseData)); + private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { + const response: WebSocketResponse = { + action: null, + success: false, + result: null, + }; + try { + assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); + const request: WebSocketRequest = JSON.parse(message.utf8Data); + assert.doesConformToSchema('request', request, webSocketRequestSchema); + response.action = request.action; + response.success = true; + response.result = await this._routeRequestAsync(request); + } catch (err) { + response.result = err.toString(); + } + logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); + connection.sendUTF(JSON.stringify(response)); } - private _closeCallback(connection: WebSocket.connection): void { + private _onCloseCallback(connection: WebSocket.connection): void { this._connectionStore.delete(connection); logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); } - private _routeRequest(requestData: WebSocketRequestData): WebSocketResponseData { - const responseData: WebSocketResponseData = { - action: requestData.action, - success: 0, - result: undefined, - }; - - try { - logUtils.log(`${new Date()} [Server] Request received: ${requestData.action}`); - switch (requestData.action) { - case 'addOrderAsync': { - const signedOrder: SignedOrder = this._parseSignedOrder(requestData); - // tslint:disable-next-line:no-floating-promises - this._orderWatcher.addOrderAsync(signedOrder); // Ok to fireNforget - break; - } - case 'removeOrder': { - this._orderWatcher.removeOrder(requestData.params.orderHash); - break; - } - case 'getStats': { - responseData.result = this._orderWatcher.getStats(); - break; - } - default: - throw new Error(`[Server] Invalid request action: ${requestData.action}`); + private async _routeRequestAsync(request: WebSocketRequest): Promise { + logUtils.log(`${new Date()} [Server] Request received: ${request.action}`); + let result = null; + switch (request.action) { + case OrderWatcherAction.AddOrder: { + const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder(request.signedOrder); + await this._orderWatcher.addOrderAsync(signedOrder); + break; } - responseData.success = 1; - } catch (err) { - responseData.result = { error: err.toString() }; + case OrderWatcherAction.RemoveOrder: { + const orderHash = request.orderHash || '_'; + this._orderWatcher.removeOrder(orderHash); + break; + } + case OrderWatcherAction.GetStats: { + result = this._orderWatcher.getStats(); + break; + } + default: + // Should never reach here. Should be caught by JSON schema check. + throw new Error(`[Server] Invalid request action: ${request.action}`); } - return responseData; + return result; } /** @@ -150,35 +160,14 @@ export class OrderWatcherWebSocketServer { * we do not support clients subscribing to only a subset of orders. As such, * Client B will be notified of changes to an order that Client A added. */ - private readonly _broadcastCallback: OnOrderStateChangeCallback = (err, orderState) => { + private _broadcastCallback(err: Error | null, orderState?: OrderState): void { this._connectionStore.forEach((connection: WebSocket.connection) => { - const responseData: WebSocketResponseData = { - action: OrderWatcherAction.orderWatcherUpdate, - success: 1, + const response: WebSocketResponse = { + action: OrderWatcherAction.Update, + success: true, result: orderState || err, }; - connection.sendUTF(JSON.stringify(responseData)); + connection.sendUTF(JSON.stringify(response)); }); - // tslint:disable-next-line:semicolon - }; // tslint thinks this is a class method, It's actally a property that holds a function. - - /** - * Recover types lost when the payload is stringified. - */ - private readonly _parseSignedOrder = (requestData: WebSocketRequestData) => { - const signedOrder = requestData.params.signedOrder; - const bigNumberFields = [ - 'salt', - 'makerFee', - 'takerFee', - 'makerAssetAmount', - 'takerAssetAmount', - 'expirationTimeSeconds', - ]; - for (const field of bigNumberFields) { - signedOrder[field] = new BigNumber(signedOrder[field]); - } - return signedOrder; - // tslint:disable-next-line:semicolon - }; // tslint thinks this is a class method, It's actally a property that holds a function. + } } diff --git a/packages/order-watcher/src/schemas/websocket_schemas.ts b/packages/order-watcher/src/schemas/websocket_schemas.ts new file mode 100644 index 000000000..c250d12f1 --- /dev/null +++ b/packages/order-watcher/src/schemas/websocket_schemas.ts @@ -0,0 +1,32 @@ +export const webSocketUtf8MessageSchema = { + id: '/webSocketUtf8MessageSchema', + properties: { + utf8Data: { type: 'string' }, + }, + type: 'object', + required: ['utf8Data'], +}; + +export const webSocketRequestSchema = { + id: '/webSocketRequestSchema', + properties: { + action: { enum: ['GET_STATS', 'ADD_ORDER', 'REMOVE_ORDER'] }, + signedOrder: { $ref: '/signedOrderSchema' }, + orderHash: { type: 'string' }, + }, + anyOf: [ + { + properties: { action: { enum: ['ADD_ORDER'] } }, + required: ['signedOrder'], + }, + { + properties: { action: { enum: ['REMOVE_ORDER'] } }, + required: ['orderHash'], + }, + { + properties: { action: { enum: ['GET_STATS'] } }, + required: [], + }, + ], + type: 'object', +}; diff --git a/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts b/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts deleted file mode 100644 index 0a0eed407..000000000 --- a/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts +++ /dev/null @@ -1,8 +0,0 @@ -export const webSocketUtf8MessageSchema = { - id: '/WebSocketUtf8MessageSchema', - properties: { - utf8Data: { type: 'string' }, - }, - type: 'object', - required: ['utf8Data'], -}; diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index 8078dd971..7f6219732 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -1,4 +1,4 @@ -import { OrderState } from '@0x/types'; +import { OrderState, SignedOrder } from '@0x/types'; import { LogEntryEvent } from 'ethereum-types'; export enum OrderWatcherError { @@ -31,3 +31,31 @@ export enum InternalOrderWatcherError { ZrxNotInTokenRegistry = 'ZRX_NOT_IN_TOKEN_REGISTRY', WethNotInTokenRegistry = 'WETH_NOT_IN_TOKEN_REGISTRY', } + +export enum OrderWatcherAction { + // Actions initiated by the user. + GetStats = 'GET_STATS', + AddOrder = 'ADD_ORDER', + RemoveOrder = 'REMOVE_ORDER', + // These are spontaneous; they are primarily orderstate changes. + Update = 'UPDATE', + // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't + // need to expose them to the WebSocket server user because the user implicitly + // subscribes and unsubscribes by connecting and disconnecting from the server. +} + +// Users have to create a json object of this format and attach it to +// the data field of their WebSocket message to interact with the server. +export interface WebSocketRequest { + action: OrderWatcherAction; + signedOrder?: SignedOrder; + orderHash?: string; +} + +// Users should expect a json object of this format in the data field +// of the WebSocket messages that the server sends out. +export interface WebSocketResponse { + action: OrderWatcherAction | null; + success: boolean; + result: any; +} diff --git a/packages/order-watcher/test/order_watcher_websocket_test.ts b/packages/order-watcher/test/order_watcher_websocket_test.ts index e7b18e44b..a9e72ce21 100644 --- a/packages/order-watcher/test/order_watcher_websocket_test.ts +++ b/packages/order-watcher/test/order_watcher_websocket_test.ts @@ -41,11 +41,11 @@ describe.only('OrderWatcherWebSocket', async () => { let zrxTokenAddress: string; let signedOrder: SignedOrder; let orderHash: string; - let addOrderPayload: { action: string; params: { signedOrder: SignedOrder } }; - let removeOrderPayload: { action: string; params: { orderHash: string } }; + let addOrderPayload: { action: string; signedOrder: SignedOrder }; + let removeOrderPayload: { action: string; orderHash: string }; const decimals = constants.ZRX_DECIMALS; const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); - // createFillableSignedOrderAsync is Promise-based, which forces us + // HACK: createFillableSignedOrderAsync is Promise-based, which forces us // to use Promises instead of the done() callbacks for tests. // onmessage callback must thus be wrapped as a Promise. const _onMessageAsync = async (client: WebSocket.w3cwebsocket) => @@ -88,12 +88,12 @@ describe.only('OrderWatcherWebSocket', async () => { ); orderHash = orderHashUtils.getOrderHashHex(signedOrder); addOrderPayload = { - action: 'addOrderAsync', - params: { signedOrder }, + action: 'ADD_ORDER', + signedOrder, }; removeOrderPayload = { - action: 'removeOrder', - params: { orderHash }, + action: 'REMOVE_ORDER', + orderHash, }; // Prepare OrderWatcher WebSocket server @@ -118,14 +118,13 @@ describe.only('OrderWatcherWebSocket', async () => { it('responds to getStats requests correctly', (done: any) => { const payload = { - action: 'getStats', - params: {}, + action: 'GET_STATS', }; wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); wsClient.onmessage = (msg: any) => { const responseData = JSON.parse(msg.data); - expect(responseData.action).to.be.eq('getStats'); - expect(responseData.success).to.be.eq(1); + expect(responseData.action).to.be.eq('GET_STATS'); + expect(responseData.success).to.be.true(); expect(responseData.result.orderCount).to.be.eq(0); done(); }; @@ -133,23 +132,53 @@ describe.only('OrderWatcherWebSocket', async () => { it('throws an error when an invalid action is attempted', async () => { const invalidActionPayload = { - action: 'badAction', - params: {}, + action: 'BAD_ACTION', }; wsClient.onopen = () => wsClient.send(JSON.stringify(invalidActionPayload)); const errorMsg = await _onMessageAsync(wsClient); const errorData = JSON.parse(errorMsg.data); - expect(errorData.action).to.be.eq('badAction'); - expect(errorData.success).to.be.eq(0); - expect(errorData.result.error).to.be.eq('Error: [Server] Invalid request action: badAction'); + // tslint:disable-next-line:no-unused-expression + expect(errorData.action).to.be.null; + expect(errorData.success).to.be.false(); + expect(errorData.result).to.match(/^Error: Expected request to conform to schema/); }); - it('executes addOrderAsync and removeOrder requests correctly', async () => { + it('throws an error when we try to add an order without a signedOrder', async () => { + const noSignedOrderAddOrderPayload = { + action: 'ADD_ORDER', + orderHash: '0x0', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(noSignedOrderAddOrderPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.action).to.be.null; + expect(errorData.success).to.be.false(); + expect(errorData.result).to.match(/^Error: Expected request to conform to schema/); + }); + + it('throws an error when we try to add a bad signedOrder', async () => { + const invalidAddOrderPayload = { + action: 'ADD_ORDER', + signedOrder: { + makerAddress: '0x0', + }, + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(invalidAddOrderPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.action).to.be.null; + expect(errorData.success).to.be.false(); + expect(errorData.result).to.match(/^Error: Expected request to conform to schema/); + }); + + it('executes addOrder and removeOrder requests correctly', async () => { wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); const addOrderMsg = await _onMessageAsync(wsClient); const addOrderData = JSON.parse(addOrderMsg.data); - expect(addOrderData.action).to.be.eq('addOrderAsync'); - expect(addOrderData.success).to.be.eq(1); + expect(addOrderData.action).to.be.eq('ADD_ORDER'); + expect(addOrderData.success).to.be.true(); expect((wsServer._orderWatcher as any)._orderByOrderHash).to.deep.include({ [orderHash]: signedOrder, }); @@ -157,8 +186,8 @@ describe.only('OrderWatcherWebSocket', async () => { wsClient.send(JSON.stringify(removeOrderPayload)); const removeOrderMsg = await _onMessageAsync(wsClient); const removeOrderData = JSON.parse(removeOrderMsg.data); - expect(removeOrderData.action).to.be.eq('removeOrder'); - expect(removeOrderData.success).to.be.eq(1); + expect(removeOrderData.action).to.be.eq('REMOVE_ORDER'); + expect(removeOrderData.success).to.be.true(); expect((wsServer._orderWatcher as any)._orderByOrderHash).to.not.deep.include({ [orderHash]: signedOrder, }); @@ -175,8 +204,8 @@ describe.only('OrderWatcherWebSocket', async () => { // Ensure that orderStateInvalid message is received. const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); - expect(orderWatcherUpdateData.action).to.be.eq('orderWatcherUpdate'); - expect(orderWatcherUpdateData.success).to.be.eq(1); + expect(orderWatcherUpdateData.action).to.be.eq('UPDATE'); + expect(orderWatcherUpdateData.success).to.be.true(); const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; expect(invalidOrderState.isValid).to.be.false(); expect(invalidOrderState.orderHash).to.be.eq(orderHash); @@ -198,8 +227,8 @@ describe.only('OrderWatcherWebSocket', async () => { takerAddress, ); const nonZeroMakerFeeOrderPayload = { - action: 'addOrderAsync', - params: { nonZeroMakerFeeSignedOrder }, + action: 'ADD_ORDER', + signedOrder: nonZeroMakerFeeSignedOrder, }; // Set up a second client and have it add the order -- cgit v1.2.3 From 6bb2ef923894a3572c3fa824c3bf1a69759eb43a Mon Sep 17 00:00:00 2001 From: kao Date: Sat, 15 Dec 2018 01:23:08 -0800 Subject: Respond to CR --- packages/order-watcher/README.md | 37 +++++--- .../src/order_watcher/order_watcher_websocket.ts | 69 ++++++++------ .../order-watcher/src/schemas/websocket_schemas.ts | 53 ++++++++--- packages/order-watcher/src/types.ts | 50 ++++++++-- .../test/order_watcher_websocket_test.ts | 101 ++++++++++++++------- 5 files changed, 209 insertions(+), 101 deletions(-) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/README.md b/packages/order-watcher/README.md index 7eae0ae16..aad90a59a 100644 --- a/packages/order-watcher/README.md +++ b/packages/order-watcher/README.md @@ -5,7 +5,7 @@ An order watcher daemon that watches for order validity. #### Read the wiki [article](https://0xproject.com/wiki#0x-OrderWatcher). OrderWatcher also comes with a WebSocket server to provide language-agnostic access -to order watching functionality. We used the [WebSocket Client and Server Implementation for Node](https://www.npmjs.com/package/websocket). +to order watching functionality. We used the [WebSocket Client and Server Implementation for Node](https://www.npmjs.com/package/websocket). The server sends and receives messages that conform to the [JSON RPC specifications](https://www.jsonrpc.org/specification). ## Installation @@ -46,7 +46,7 @@ The first step for making a request is establishing a connection with the server ``` var W3CWebSocket = require('websocket').w3cwebsocket; -wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080'); +wsClient = new W3CWebSocket('ws://127.0.0.1:8080'); ``` In Python, you could use the [websocket-client library](http://pypi.python.org/pypi/websocket-client/) and run: @@ -56,19 +56,22 @@ from websocket import create_connection wsClient = create_connection("ws://127.0.0.1:8080") ``` -With the connection established, you prepare the payload for your request. The payload is a json object with the following structure: +With the connection established, you prepare the payload for your request. The payload is a json object with a format established by the [JSON RPC specification](https://www.jsonrpc.org/specification): -* For `GET_STATE`, the payload is `{ action: 'GET_STATS }`. -* For `ADD_ORDER`, use `{ action: 'ADD_ORDER', signedOrder: }`. -* For `REMOVE_ORDER`, use `{ action: 'REMOVE_ORDER', orderHash: }`. +* `id`: All requests require you to specify a string as an id. When the server responds to the request, it provides an id as well to allow you to determine which request it is responding to. +* `jsonrpc`: This is always the string `'2.0'`. +* `method`: This specifies the OrderWatcher method you want to call. I.e., `'ADD_ORDER'`, `'REMOVE_ORDER'`, and `'GET_STATS'`. +* `params`: These contain the parameters needed by OrderWatcher to execute the method you called. For `ADD_ORDER`, provide `{ signedOrder: }`. For `REMOVE_ORDER`, provide `{ orderHash: }`. For `GET_STATS`, no parameters are needed, so you may leave this empty. Next, convert the payload to a string and send it through the connection. In Javascript: ``` const addOrderPayload = { - action: 'ADD_ORDER', - signedOrder: , + id: 'order32', + jsonrpc: '2.0', + method: 'ADD_ORDER', + params: { signedOrder: }, }; wsClient.send(JSON.stringify(addOrderPayload)); ``` @@ -78,8 +81,10 @@ In Python: ``` import json remove_order_payload = { - 'action': 'REMOVE_ORDER', - 'orderHash': '0x6edc16bf37fde79f5012088c33784c730e2f103d9ab1caf73060c386ad107b7e', + 'id': 'order33', + 'jsonrpc': '2.0', + 'method': 'REMOVE_ORDER', + 'params': {'orderHash': '0x6edc16bf37fde79f5012088c33784c730e2f103d9ab1caf73060c386ad107b7e'}, } wsClient.send(json.dumps(remove_order_payload)); ``` @@ -87,16 +92,18 @@ wsClient.send(json.dumps(remove_order_payload)); **Response** The server responds to all requests in a similar format. In the data field, you'll find another json object that has been converted into a string. This json object contains the following fields: -* `action`: The action the server is responding to. Eg. `ADD_ORDER`. When order states change the server may also initiate a response. In this case, action will be listed as `UPDATE`. -* `success`: `true` or `false`; Indicates whether the server handled the request without problems. -* `result`: This field varies based on the action. `UPDATE` responses contained the new order state. `GET_STATS` responses contain the current order count. When there are errors, the error messages are stored in here. +* `id`: The id corresponding to the request that the server is responding to. `UPDATE` responses are not based on any requests so the `id` field is `null`. +* `jsonrpc`: Always `'2.0'`. +* `method`: The method the server is responding to. Eg. `ADD_ORDER`. When order states change the server may also initiate a response. In this case, method will be listed as `UPDATE`. +* `result`: This field varies based on the method. `UPDATE` responses contained the new order state. `GET_STATS` responses contain the current order count. When there are errors, this field is `null`. +* `error`: When there is an error executing a request, the error message is listed here. When the server responds successfully, this field is `null`. In Javascript, the responses can be parsed using the `onmessage` callback: ``` wsClient.onmessage = (msg) => { const responseData = JSON.parse(msg.data); - const action = responseData.action + const method = responseData.method }; ``` @@ -104,7 +111,7 @@ In Python, `recv` is a lightweight way to receive a response: ``` result = wsClient.recv() -action = result.action +method = result.method ``` ## Contributing diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts index 806c7c6a5..7a88597ef 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts @@ -7,9 +7,10 @@ import * as WebSocket from 'websocket'; import { webSocketRequestSchema, webSocketUtf8MessageSchema } from '../schemas/websocket_schemas'; import { + GetStatsResult, OnOrderStateChangeCallback, - OrderWatcherAction, OrderWatcherConfig, + OrderWatcherMethod, WebSocketRequest, WebSocketResponse, } from '../types'; @@ -18,12 +19,13 @@ import { assert } from '../utils/assert'; import { OrderWatcher } from './order_watcher'; const DEFAULT_HTTP_PORT = 8080; +const JSONRPC_VERSION = '2.0'; // Wraps the OrderWatcher functionality in a WebSocket server. Motivations: // 1) Users can watch orders via non-typescript programs. // 2) Better encapsulation so that users can work export class OrderWatcherWebSocketServer { - public readonly _orderWatcher: OrderWatcher; // public for testing + private readonly _orderWatcher: OrderWatcher; private readonly _httpServer: http.Server; private readonly _connectionStore: Set; private readonly _wsServer: WebSocket.server; @@ -66,7 +68,9 @@ export class OrderWatcherWebSocketServer { httpServer: this._httpServer, // Avoid setting autoAcceptConnections to true as it defeats all // standard cross-origin protection facilities built into the protocol - // and the browser. Also ensures that a request event is emitted by + // and the browser. + // Source: https://www.npmjs.com/package/websocket#server-example + // Also ensures that a request event is emitted by // the server whenever a new WebSocket request is made. autoAcceptConnections: false, }); @@ -76,7 +80,7 @@ export class OrderWatcherWebSocketServer { // machine by the same user. As such, no security checks are in place. const connection: WebSocket.connection = request.accept(null, request.origin); logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); - connection.on('message', await this._onMessageCallbackAsync.bind(this, connection)); + connection.on('message', this._onMessageCallbackAsync.bind(this, connection)); connection.on('close', this._onCloseCallback.bind(this, connection)); this._connectionStore.add(connection); }); @@ -106,20 +110,25 @@ export class OrderWatcherWebSocketServer { } private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { - const response: WebSocketResponse = { - action: null, - success: false, - result: null, - }; + let response: WebSocketResponse; try { assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); const request: WebSocketRequest = JSON.parse(message.utf8Data); assert.doesConformToSchema('request', request, webSocketRequestSchema); - response.action = request.action; - response.success = true; - response.result = await this._routeRequestAsync(request); + assert.isString(request.jsonrpc, JSONRPC_VERSION); + response = { + id: request.id, + jsonrpc: JSONRPC_VERSION, + method: request.method, + result: await this._routeRequestAsync(request), + }; } catch (err) { - response.result = err.toString(); + response = { + id: null, + jsonrpc: JSONRPC_VERSION, + method: null, + error: err.toString(), + }; } logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); connection.sendUTF(JSON.stringify(response)); @@ -130,29 +139,28 @@ export class OrderWatcherWebSocketServer { logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); } - private async _routeRequestAsync(request: WebSocketRequest): Promise { - logUtils.log(`${new Date()} [Server] Request received: ${request.action}`); - let result = null; - switch (request.action) { - case OrderWatcherAction.AddOrder: { - const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder(request.signedOrder); + private async _routeRequestAsync(request: WebSocketRequest): Promise { + logUtils.log(`${new Date()} [Server] Request received: ${request.method}`); + switch (request.method) { + case OrderWatcherMethod.AddOrder: { + const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder( + request.params.signedOrder, + ); await this._orderWatcher.addOrderAsync(signedOrder); break; } - case OrderWatcherAction.RemoveOrder: { - const orderHash = request.orderHash || '_'; - this._orderWatcher.removeOrder(orderHash); + case OrderWatcherMethod.RemoveOrder: { + this._orderWatcher.removeOrder(request.params.orderHash || 'undefined'); break; } - case OrderWatcherAction.GetStats: { - result = this._orderWatcher.getStats(); + case OrderWatcherMethod.GetStats: { + return this._orderWatcher.getStats(); break; } default: - // Should never reach here. Should be caught by JSON schema check. - throw new Error(`[Server] Invalid request action: ${request.action}`); + // Should never reach here. Should be caught by JSON schema check. } - return result; + return undefined; } /** @@ -163,9 +171,10 @@ export class OrderWatcherWebSocketServer { private _broadcastCallback(err: Error | null, orderState?: OrderState): void { this._connectionStore.forEach((connection: WebSocket.connection) => { const response: WebSocketResponse = { - action: OrderWatcherAction.Update, - success: true, - result: orderState || err, + id: null, + jsonrpc: JSONRPC_VERSION, + method: OrderWatcherMethod.Update, + result: orderState, }; connection.sendUTF(JSON.stringify(response)); }); diff --git a/packages/order-watcher/src/schemas/websocket_schemas.ts b/packages/order-watcher/src/schemas/websocket_schemas.ts index c250d12f1..5e4e1ab74 100644 --- a/packages/order-watcher/src/schemas/websocket_schemas.ts +++ b/packages/order-watcher/src/schemas/websocket_schemas.ts @@ -9,24 +9,53 @@ export const webSocketUtf8MessageSchema = { export const webSocketRequestSchema = { id: '/webSocketRequestSchema', - properties: { - action: { enum: ['GET_STATS', 'ADD_ORDER', 'REMOVE_ORDER'] }, - signedOrder: { $ref: '/signedOrderSchema' }, - orderHash: { type: 'string' }, + type: 'object', + definitions: { + signedOrderParam: { + type: 'object', + properties: { + signedOrder: { $ref: '/signedOrderSchema' }, + }, + required: ['signedOrder'], + }, + orderHashParam: { + type: 'object', + properties: { + orderHash: { $ref: '/hexSchema' }, + }, + required: ['orderHash'], + }, }, - anyOf: [ + oneOf: [ { - properties: { action: { enum: ['ADD_ORDER'] } }, - required: ['signedOrder'], + type: 'object', + properties: { + id: { type: 'string' }, + jsonrpc: { type: 'string' }, + method: { enum: ['ADD_ORDER'] }, + params: { $ref: '#/definitions/signedOrderParam' }, + }, + required: ['id', 'jsonrpc', 'method', 'params'], }, { - properties: { action: { enum: ['REMOVE_ORDER'] } }, - required: ['orderHash'], + type: 'object', + properties: { + id: { type: 'string' }, + jsonrpc: { type: 'string' }, + method: { enum: ['REMOVE_ORDER'] }, + params: { $ref: '#/definitions/orderHashParam' }, + }, + required: ['id', 'jsonrpc', 'method', 'params'], }, { - properties: { action: { enum: ['GET_STATS'] } }, - required: [], + type: 'object', + properties: { + id: { type: 'string' }, + jsonrpc: { type: 'string' }, + method: { enum: ['GET_STATS'] }, + params: {}, + }, + required: ['id', 'jsonrpc', 'method'], }, ], - type: 'object', }; diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index 7f6219732..90d383660 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -32,8 +32,8 @@ export enum InternalOrderWatcherError { WethNotInTokenRegistry = 'WETH_NOT_IN_TOKEN_REGISTRY', } -export enum OrderWatcherAction { - // Actions initiated by the user. +export enum OrderWatcherMethod { + // Methods initiated by the user. GetStats = 'GET_STATS', AddOrder = 'ADD_ORDER', RemoveOrder = 'REMOVE_ORDER', @@ -46,16 +46,46 @@ export enum OrderWatcherAction { // Users have to create a json object of this format and attach it to // the data field of their WebSocket message to interact with the server. -export interface WebSocketRequest { - action: OrderWatcherAction; - signedOrder?: SignedOrder; - orderHash?: string; +export type WebSocketRequest = AddOrderRequest | RemoveOrderRequest | GetStatsRequest; + +interface AddOrderRequest { + id: string; + jsonrpc: string; + method: OrderWatcherMethod.AddOrder; + params: { signedOrder: SignedOrder }; +} + +interface RemoveOrderRequest { + id: string; + jsonrpc: string; + method: OrderWatcherMethod.RemoveOrder; + params: { orderHash: string }; +} + +interface GetStatsRequest { + id: string; + jsonrpc: string; + method: OrderWatcherMethod.GetStats; } // Users should expect a json object of this format in the data field // of the WebSocket messages that the server sends out. -export interface WebSocketResponse { - action: OrderWatcherAction | null; - success: boolean; - result: any; +export type WebSocketResponse = SuccessfulWebSocketResponse | ErrorWebSocketResponse; + +interface SuccessfulWebSocketResponse { + id: string | null; // id is null for UPDATE + jsonrpc: string; + method: OrderWatcherMethod; + result: OrderState | GetStatsResult | undefined; // result is undefined for ADD_ORDER and REMOVE_ORDER +} + +interface ErrorWebSocketResponse { + id: null; + jsonrpc: string; + method: null; + error: string; +} + +export interface GetStatsResult { + orderCount: number; } diff --git a/packages/order-watcher/test/order_watcher_websocket_test.ts b/packages/order-watcher/test/order_watcher_websocket_test.ts index a9e72ce21..c4d1ede45 100644 --- a/packages/order-watcher/test/order_watcher_websocket_test.ts +++ b/packages/order-watcher/test/order_watcher_websocket_test.ts @@ -41,8 +41,10 @@ describe.only('OrderWatcherWebSocket', async () => { let zrxTokenAddress: string; let signedOrder: SignedOrder; let orderHash: string; - let addOrderPayload: { action: string; signedOrder: SignedOrder }; - let removeOrderPayload: { action: string; orderHash: string }; + // Manually encode types rather than use /src/types to mimick real data that user + // would input. Otherwise we would be forced to use enums, which hide problems. + let addOrderPayload: { id: string; jsonrpc: string; method: string; params: { signedOrder: SignedOrder } }; + let removeOrderPayload: { id: string; jsonrpc: string; method: string; params: { orderHash: string } }; const decimals = constants.ZRX_DECIMALS; const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); // HACK: createFillableSignedOrderAsync is Promise-based, which forces us @@ -88,12 +90,16 @@ describe.only('OrderWatcherWebSocket', async () => { ); orderHash = orderHashUtils.getOrderHashHex(signedOrder); addOrderPayload = { - action: 'ADD_ORDER', - signedOrder, + id: 'addOrderPayload', + jsonrpc: '2.0', + method: 'ADD_ORDER', + params: { signedOrder }, }; removeOrderPayload = { - action: 'REMOVE_ORDER', - orderHash, + id: 'removeOrderPayload', + jsonrpc: '2.0', + method: 'REMOVE_ORDER', + params: { orderHash }, }; // Prepare OrderWatcher WebSocket server @@ -118,48 +124,75 @@ describe.only('OrderWatcherWebSocket', async () => { it('responds to getStats requests correctly', (done: any) => { const payload = { - action: 'GET_STATS', + id: 'getStats', + jsonrpc: '2.0', + method: 'GET_STATS', }; wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); wsClient.onmessage = (msg: any) => { const responseData = JSON.parse(msg.data); - expect(responseData.action).to.be.eq('GET_STATS'); - expect(responseData.success).to.be.true(); + expect(responseData.id).to.be.eq('getStats'); + expect(responseData.jsonrpc).to.be.eq('2.0'); + expect(responseData.method).to.be.eq('GET_STATS'); expect(responseData.result.orderCount).to.be.eq(0); done(); }; }); - it('throws an error when an invalid action is attempted', async () => { - const invalidActionPayload = { - action: 'BAD_ACTION', + it('throws an error when an invalid method is attempted', async () => { + const invalidMethodPayload = { + id: 'invalidMethodPayload', + jsonrpc: '2.0', + method: 'BAD_METHOD', }; - wsClient.onopen = () => wsClient.send(JSON.stringify(invalidActionPayload)); + wsClient.onopen = () => wsClient.send(JSON.stringify(invalidMethodPayload)); const errorMsg = await _onMessageAsync(wsClient); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression - expect(errorData.action).to.be.null; - expect(errorData.success).to.be.false(); - expect(errorData.result).to.match(/^Error: Expected request to conform to schema/); + expect(errorData.id).to.be.null; + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.jsonrpc).to.be.eq('2.0'); + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); + }); + + it('throws an error when jsonrpc field missing from request', async () => { + const noJsonRpcPayload = { + id: 'noJsonRpcPayload', + method: 'GET_STATS', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(noJsonRpcPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.jsonrpc).to.be.eq('2.0'); + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); }); it('throws an error when we try to add an order without a signedOrder', async () => { const noSignedOrderAddOrderPayload = { - action: 'ADD_ORDER', - orderHash: '0x0', + id: 'noSignedOrderAddOrderPayload', + jsonrpc: '2.0', + method: 'ADD_ORDER', + orderHash: '0x7337e2f2a9aa2ed6afe26edc2df7ad79c3ffa9cf9b81a964f707ea63f5272355', }; wsClient.onopen = () => wsClient.send(JSON.stringify(noSignedOrderAddOrderPayload)); const errorMsg = await _onMessageAsync(wsClient); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression - expect(errorData.action).to.be.null; - expect(errorData.success).to.be.false(); - expect(errorData.result).to.match(/^Error: Expected request to conform to schema/); + expect(errorData.id).to.be.null; + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.jsonrpc).to.be.eq('2.0'); + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); }); it('throws an error when we try to add a bad signedOrder', async () => { const invalidAddOrderPayload = { - action: 'ADD_ORDER', + id: 'invalidAddOrderPayload', + jsonrpc: '2.0', + method: 'ADD_ORDER', signedOrder: { makerAddress: '0x0', }, @@ -168,27 +201,26 @@ describe.only('OrderWatcherWebSocket', async () => { const errorMsg = await _onMessageAsync(wsClient); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression - expect(errorData.action).to.be.null; - expect(errorData.success).to.be.false(); - expect(errorData.result).to.match(/^Error: Expected request to conform to schema/); + expect(errorData.id).to.be.null; + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); }); it('executes addOrder and removeOrder requests correctly', async () => { wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); const addOrderMsg = await _onMessageAsync(wsClient); const addOrderData = JSON.parse(addOrderMsg.data); - expect(addOrderData.action).to.be.eq('ADD_ORDER'); - expect(addOrderData.success).to.be.true(); - expect((wsServer._orderWatcher as any)._orderByOrderHash).to.deep.include({ + expect(addOrderData.method).to.be.eq('ADD_ORDER'); + expect((wsServer as any)._orderWatcher._orderByOrderHash).to.deep.include({ [orderHash]: signedOrder, }); wsClient.send(JSON.stringify(removeOrderPayload)); const removeOrderMsg = await _onMessageAsync(wsClient); const removeOrderData = JSON.parse(removeOrderMsg.data); - expect(removeOrderData.action).to.be.eq('REMOVE_ORDER'); - expect(removeOrderData.success).to.be.true(); - expect((wsServer._orderWatcher as any)._orderByOrderHash).to.not.deep.include({ + expect(removeOrderData.method).to.be.eq('REMOVE_ORDER'); + expect((wsServer as any)._orderWatcher._orderByOrderHash).to.not.deep.include({ [orderHash]: signedOrder, }); }); @@ -204,8 +236,7 @@ describe.only('OrderWatcherWebSocket', async () => { // Ensure that orderStateInvalid message is received. const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); - expect(orderWatcherUpdateData.action).to.be.eq('UPDATE'); - expect(orderWatcherUpdateData.success).to.be.true(); + expect(orderWatcherUpdateData.method).to.be.eq('UPDATE'); const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; expect(invalidOrderState.isValid).to.be.false(); expect(invalidOrderState.orderHash).to.be.eq(orderHash); @@ -227,7 +258,9 @@ describe.only('OrderWatcherWebSocket', async () => { takerAddress, ); const nonZeroMakerFeeOrderPayload = { - action: 'ADD_ORDER', + id: 'nonZeroMakerFeeOrderPayload', + jsonrpc: '2.0', + method: 'ADD_ORDER', signedOrder: nonZeroMakerFeeSignedOrder, }; -- cgit v1.2.3 From 7cafe396de676cec3859c76d6407a0948a8e398e Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sat, 15 Dec 2018 21:34:35 -0800 Subject: Ensure fileName matches class name, fix broadcast --- packages/order-watcher/src/index.ts | 1 + .../src/order_watcher/order_watcher_websocket.ts | 182 ------------- .../order_watcher_websocket_server.ts | 186 +++++++++++++ packages/order-watcher/src/types.ts | 8 +- .../test/order_watcher_websocket_server_test.ts | 288 +++++++++++++++++++++ .../test/order_watcher_websocket_test.ts | 288 --------------------- 6 files changed, 482 insertions(+), 471 deletions(-) delete mode 100644 packages/order-watcher/src/order_watcher/order_watcher_websocket.ts create mode 100644 packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts create mode 100644 packages/order-watcher/test/order_watcher_websocket_server_test.ts delete mode 100644 packages/order-watcher/test/order_watcher_websocket_test.ts (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/src/index.ts b/packages/order-watcher/src/index.ts index 5eeba3e87..5bdef4504 100644 --- a/packages/order-watcher/src/index.ts +++ b/packages/order-watcher/src/index.ts @@ -1,4 +1,5 @@ export { OrderWatcher } from './order_watcher/order_watcher'; +export { OrderWatcherWebSocketServer } from './order_watcher/order_watcher_websocket_server'; export { ExpirationWatcher } from './order_watcher/expiration_watcher'; export { diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts deleted file mode 100644 index 7a88597ef..000000000 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts +++ /dev/null @@ -1,182 +0,0 @@ -import { ContractAddresses } from '@0x/contract-addresses'; -import { OrderState, SignedOrder } from '@0x/types'; -import { BigNumber, logUtils } from '@0x/utils'; -import { Provider } from 'ethereum-types'; -import * as http from 'http'; -import * as WebSocket from 'websocket'; - -import { webSocketRequestSchema, webSocketUtf8MessageSchema } from '../schemas/websocket_schemas'; -import { - GetStatsResult, - OnOrderStateChangeCallback, - OrderWatcherConfig, - OrderWatcherMethod, - WebSocketRequest, - WebSocketResponse, -} from '../types'; -import { assert } from '../utils/assert'; - -import { OrderWatcher } from './order_watcher'; - -const DEFAULT_HTTP_PORT = 8080; -const JSONRPC_VERSION = '2.0'; - -// Wraps the OrderWatcher functionality in a WebSocket server. Motivations: -// 1) Users can watch orders via non-typescript programs. -// 2) Better encapsulation so that users can work -export class OrderWatcherWebSocketServer { - private readonly _orderWatcher: OrderWatcher; - private readonly _httpServer: http.Server; - private readonly _connectionStore: Set; - private readonly _wsServer: WebSocket.server; - /** - * Recover types lost when the payload is stringified. - */ - private static _parseSignedOrder(rawRequest: any): SignedOrder { - const bigNumberFields = [ - 'salt', - 'makerFee', - 'takerFee', - 'makerAssetAmount', - 'takerAssetAmount', - 'expirationTimeSeconds', - ]; - for (const field of bigNumberFields) { - rawRequest[field] = new BigNumber(rawRequest[field]); - } - return rawRequest; - } - - /** - * Instantiate a new WebSocket server which provides OrderWatcher functionality - * @param provider Web3 provider to use for JSON RPC calls. - * @param networkId NetworkId to watch orders on. - * @param contractAddresses Optional contract addresses. Defaults to known - * addresses based on networkId. - * @param partialConfig Optional configurations. - */ - constructor( - provider: Provider, - networkId: number, - contractAddresses?: ContractAddresses, - partialConfig?: Partial, - ) { - this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); - this._connectionStore = new Set(); - this._httpServer = http.createServer(); - this._wsServer = new WebSocket.server({ - httpServer: this._httpServer, - // Avoid setting autoAcceptConnections to true as it defeats all - // standard cross-origin protection facilities built into the protocol - // and the browser. - // Source: https://www.npmjs.com/package/websocket#server-example - // Also ensures that a request event is emitted by - // the server whenever a new WebSocket request is made. - autoAcceptConnections: false, - }); - - this._wsServer.on('request', async (request: any) => { - // Designed for usage pattern where client and server are run on the same - // machine by the same user. As such, no security checks are in place. - const connection: WebSocket.connection = request.accept(null, request.origin); - logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); - connection.on('message', this._onMessageCallbackAsync.bind(this, connection)); - connection.on('close', this._onCloseCallback.bind(this, connection)); - this._connectionStore.add(connection); - }); - - // Have the WebSocket server subscribe to the OrderWatcher to receive updates. - // These updates are then broadcast to clients in the _connectionStore. - const broadcastCallback: OnOrderStateChangeCallback = this._broadcastCallback.bind(this); - this._orderWatcher.subscribe(broadcastCallback); - } - - /** - * Activates the WebSocket server by having its HTTP server start listening. - */ - public listen(): void { - const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; - this._httpServer.listen(port, () => { - logUtils.log(`${new Date()} [Server] Listening on port ${port}`); - }); - } - - /** - * Deactivates the WebSocket server by stopping the HTTP server from accepting - * new connections. - */ - public close(): void { - this._httpServer.close(); - } - - private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { - let response: WebSocketResponse; - try { - assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); - const request: WebSocketRequest = JSON.parse(message.utf8Data); - assert.doesConformToSchema('request', request, webSocketRequestSchema); - assert.isString(request.jsonrpc, JSONRPC_VERSION); - response = { - id: request.id, - jsonrpc: JSONRPC_VERSION, - method: request.method, - result: await this._routeRequestAsync(request), - }; - } catch (err) { - response = { - id: null, - jsonrpc: JSONRPC_VERSION, - method: null, - error: err.toString(), - }; - } - logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); - connection.sendUTF(JSON.stringify(response)); - } - - private _onCloseCallback(connection: WebSocket.connection): void { - this._connectionStore.delete(connection); - logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); - } - - private async _routeRequestAsync(request: WebSocketRequest): Promise { - logUtils.log(`${new Date()} [Server] Request received: ${request.method}`); - switch (request.method) { - case OrderWatcherMethod.AddOrder: { - const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder( - request.params.signedOrder, - ); - await this._orderWatcher.addOrderAsync(signedOrder); - break; - } - case OrderWatcherMethod.RemoveOrder: { - this._orderWatcher.removeOrder(request.params.orderHash || 'undefined'); - break; - } - case OrderWatcherMethod.GetStats: { - return this._orderWatcher.getStats(); - break; - } - default: - // Should never reach here. Should be caught by JSON schema check. - } - return undefined; - } - - /** - * Broadcasts OrderState changes to ALL connected clients. At the moment, - * we do not support clients subscribing to only a subset of orders. As such, - * Client B will be notified of changes to an order that Client A added. - */ - private _broadcastCallback(err: Error | null, orderState?: OrderState): void { - this._connectionStore.forEach((connection: WebSocket.connection) => { - const response: WebSocketResponse = { - id: null, - jsonrpc: JSONRPC_VERSION, - method: OrderWatcherMethod.Update, - result: orderState, - }; - connection.sendUTF(JSON.stringify(response)); - }); - } -} diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts new file mode 100644 index 000000000..2d2d9e82e --- /dev/null +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts @@ -0,0 +1,186 @@ +import { ContractAddresses } from '@0x/contract-addresses'; +import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Provider } from 'ethereum-types'; +import * as http from 'http'; +import * as WebSocket from 'websocket'; + +import { webSocketRequestSchema, webSocketUtf8MessageSchema } from '../schemas/websocket_schemas'; +import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types'; +import { assert } from '../utils/assert'; + +import { OrderWatcher } from './order_watcher'; + +const DEFAULT_HTTP_PORT = 8080; +const JSONRPC_VERSION = '2.0'; + +// Wraps the OrderWatcher functionality in a WebSocket server. Motivations: +// 1) Users can watch orders via non-typescript programs. +// 2) Better encapsulation so that users can work +export class OrderWatcherWebSocketServer { + private readonly _orderWatcher: OrderWatcher; + private readonly _httpServer: http.Server; + private readonly _connectionStore: Set; + private readonly _wsServer: WebSocket.server; + private _jsonRpcRequestId: number; + /** + * Recover types lost when the payload is stringified. + */ + private static _parseSignedOrder(rawRequest: any): SignedOrder { + const bigNumberFields = [ + 'salt', + 'makerFee', + 'takerFee', + 'makerAssetAmount', + 'takerAssetAmount', + 'expirationTimeSeconds', + ]; + for (const field of bigNumberFields) { + rawRequest[field] = new BigNumber(rawRequest[field]); + } + return rawRequest; + } + + /** + * Instantiate a new WebSocket server which provides OrderWatcher functionality + * @param provider Web3 provider to use for JSON RPC calls. + * @param networkId NetworkId to watch orders on. + * @param contractAddresses Optional contract addresses. Defaults to known + * addresses based on networkId. + * @param partialConfig Optional configurations. + */ + constructor( + provider: Provider, + networkId: number, + contractAddresses?: ContractAddresses, + partialConfig?: Partial, + ) { + this._jsonRpcRequestId = 1; + this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); + this._connectionStore = new Set(); + this._httpServer = http.createServer(); + this._wsServer = new WebSocket.server({ + httpServer: this._httpServer, + // Avoid setting autoAcceptConnections to true as it defeats all + // standard cross-origin protection facilities built into the protocol + // and the browser. + // Source: https://www.npmjs.com/package/websocket#server-example + // Also ensures that a request event is emitted by + // the server whenever a new WebSocket request is made. + autoAcceptConnections: false, + }); + + this._wsServer.on('request', async (request: any) => { + // Designed for usage pattern where client and server are run on the same + // machine by the same user. As such, no security checks are in place. + const connection: WebSocket.connection = request.accept(null, request.origin); + logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); + connection.on('message', this._onMessageCallbackAsync.bind(this, connection)); + connection.on('close', this._onCloseCallback.bind(this, connection)); + this._connectionStore.add(connection); + }); + + // Have the WebSocket server subscribe to the OrderWatcher to receive updates. + // These updates are then broadcast to clients in the _connectionStore. + this._orderWatcher.subscribe(this._broadcastCallback.bind(this)); + } + + /** + * Activates the WebSocket server by having its HTTP server start listening. + */ + public listen(): void { + const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; + this._httpServer.listen(port, () => { + logUtils.log(`${new Date()} [Server] Listening on port ${port}`); + }); + } + + /** + * Deactivates the WebSocket server by stopping the HTTP server from accepting + * new connections. + */ + public close(): void { + this._httpServer.close(); + } + + private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { + let response: WebSocketResponse; + try { + assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); + const request: WebSocketRequest = JSON.parse(message.utf8Data); + assert.doesConformToSchema('request', request, webSocketRequestSchema); + assert.isString(request.jsonrpc, JSONRPC_VERSION); + response = { + id: request.id, + jsonrpc: JSONRPC_VERSION, + method: request.method, + result: await this._routeRequestAsync(request), + }; + } catch (err) { + response = { + id: null, + jsonrpc: JSONRPC_VERSION, + method: null, + error: err.toString(), + }; + } + logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); + connection.sendUTF(JSON.stringify(response)); + } + + private _onCloseCallback(connection: WebSocket.connection): void { + this._connectionStore.delete(connection); + logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); + } + + private async _routeRequestAsync(request: WebSocketRequest): Promise { + logUtils.log(`${new Date()} [Server] Request received: ${request.method}`); + switch (request.method) { + case OrderWatcherMethod.AddOrder: { + const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder( + request.params.signedOrder, + ); + await this._orderWatcher.addOrderAsync(signedOrder); + break; + } + case OrderWatcherMethod.RemoveOrder: { + this._orderWatcher.removeOrder(request.params.orderHash || 'undefined'); + break; + } + case OrderWatcherMethod.GetStats: { + return this._orderWatcher.getStats(); + break; + } + default: + // Should never reach here. Should be caught by JSON schema check. + } + return undefined; + } + + /** + * Broadcasts OrderState changes to ALL connected clients. At the moment, + * we do not support clients subscribing to only a subset of orders. As such, + * Client B will be notified of changes to an order that Client A added. + */ + private _broadcastCallback(err: Error | null, orderState?: OrderStateValid | OrderStateInvalid | undefined): void { + const method = OrderWatcherMethod.Update; + const response = + err === null + ? { + jsonrpc: JSONRPC_VERSION, + method, + result: orderState, + } + : { + jsonrpc: JSONRPC_VERSION, + method, + error: { + code: -32000, + message: err.message, + }, + }; + this._connectionStore.forEach((connection: WebSocket.connection) => { + connection.sendUTF(JSON.stringify(response)); + }); + } +} diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index 90d383660..ecbebe305 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -83,7 +83,13 @@ interface ErrorWebSocketResponse { id: null; jsonrpc: string; method: null; - error: string; + error: JSONRPCError; +} + +interface JSONRPCError { + code: number; + message: string; + data?: string | object; } export interface GetStatsResult { diff --git a/packages/order-watcher/test/order_watcher_websocket_server_test.ts b/packages/order-watcher/test/order_watcher_websocket_server_test.ts new file mode 100644 index 000000000..9f9db7b1f --- /dev/null +++ b/packages/order-watcher/test/order_watcher_websocket_server_test.ts @@ -0,0 +1,288 @@ +import { ContractWrappers } from '@0x/contract-wrappers'; +import { tokenUtils } from '@0x/contract-wrappers/lib/test/utils/token_utils'; +import { BlockchainLifecycle } from '@0x/dev-utils'; +import { FillScenarios } from '@0x/fill-scenarios'; +import { assetDataUtils, orderHashUtils } from '@0x/order-utils'; +import { ExchangeContractErrs, OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import * as chai from 'chai'; +import 'mocha'; +import * as WebSocket from 'websocket'; + +import { OrderWatcherWebSocketServer } from '../src/order_watcher/order_watcher_websocket_server'; + +import { chaiSetup } from './utils/chai_setup'; +import { constants } from './utils/constants'; +import { migrateOnceAsync } from './utils/migrate'; +import { provider, web3Wrapper } from './utils/web3_wrapper'; + +chaiSetup.configure(); +const expect = chai.expect; +const blockchainLifecycle = new BlockchainLifecycle(web3Wrapper); + +interface WsMessage { + data: string; +} + +describe.only('OrderWatcherWebSocketServer', async () => { + let contractWrappers: ContractWrappers; + let wsServer: OrderWatcherWebSocketServer; + let wsClient: WebSocket.w3cwebsocket; + let wsClientTwo: WebSocket.w3cwebsocket; + let fillScenarios: FillScenarios; + let userAddresses: string[]; + let makerAssetData: string; + let takerAssetData: string; + let makerTokenAddress: string; + let takerTokenAddress: string; + let makerAddress: string; + let takerAddress: string; + let zrxTokenAddress: string; + let signedOrder: SignedOrder; + let orderHash: string; + // Manually encode types rather than use /src/types to mimick real data that user + // would input. Otherwise we would be forced to use enums, which hide problems. + let addOrderPayload: { id: string; jsonrpc: string; method: string; params: { signedOrder: SignedOrder } }; + let removeOrderPayload: { id: string; jsonrpc: string; method: string; params: { orderHash: string } }; + const decimals = constants.ZRX_DECIMALS; + const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); + // HACK: createFillableSignedOrderAsync is Promise-based, which forces us + // to use Promises instead of the done() callbacks for tests. + // onmessage callback must thus be wrapped as a Promise. + const _onMessageAsync = async (client: WebSocket.w3cwebsocket) => + new Promise(resolve => { + client.onmessage = (msg: WsMessage) => resolve(msg); + }); + + before(async () => { + // Set up constants + const contractAddresses = await migrateOnceAsync(); + await blockchainLifecycle.startAsync(); + const networkId = constants.TESTRPC_NETWORK_ID; + const config = { + networkId, + contractAddresses, + }; + contractWrappers = new ContractWrappers(provider, config); + userAddresses = await web3Wrapper.getAvailableAddressesAsync(); + zrxTokenAddress = contractAddresses.zrxToken; + [makerAddress, takerAddress] = userAddresses; + [makerTokenAddress, takerTokenAddress] = tokenUtils.getDummyERC20TokenAddresses(); + [makerAssetData, takerAssetData] = [ + assetDataUtils.encodeERC20AssetData(makerTokenAddress), + assetDataUtils.encodeERC20AssetData(takerTokenAddress), + ]; + fillScenarios = new FillScenarios( + provider, + userAddresses, + zrxTokenAddress, + contractAddresses.exchange, + contractAddresses.erc20Proxy, + contractAddresses.erc721Proxy, + ); + signedOrder = await fillScenarios.createFillableSignedOrderAsync( + makerAssetData, + takerAssetData, + makerAddress, + takerAddress, + fillableAmount, + ); + orderHash = orderHashUtils.getOrderHashHex(signedOrder); + addOrderPayload = { + id: 'addOrderPayload', + jsonrpc: '2.0', + method: 'ADD_ORDER', + params: { signedOrder }, + }; + removeOrderPayload = { + id: 'removeOrderPayload', + jsonrpc: '2.0', + method: 'REMOVE_ORDER', + params: { orderHash }, + }; + + // Prepare OrderWatcher WebSocket server + const orderWatcherConfig = {}; + wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); + wsServer.listen(); + }); + after(async () => { + await blockchainLifecycle.revertAsync(); + wsServer.close(); + }); + beforeEach(async () => { + await blockchainLifecycle.startAsync(); + wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); + logUtils.log(`${new Date()} [Client] Connected.`); + }); + afterEach(async () => { + await blockchainLifecycle.revertAsync(); + wsClient.close(); + logUtils.log(`${new Date()} [Client] Closed.`); + }); + + it('responds to getStats requests correctly', (done: any) => { + const payload = { + id: 'getStats', + jsonrpc: '2.0', + method: 'GET_STATS', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); + wsClient.onmessage = (msg: any) => { + const responseData = JSON.parse(msg.data); + expect(responseData.id).to.be.eq('getStats'); + expect(responseData.jsonrpc).to.be.eq('2.0'); + expect(responseData.method).to.be.eq('GET_STATS'); + expect(responseData.result.orderCount).to.be.eq(0); + done(); + }; + }); + + it('throws an error when an invalid method is attempted', async () => { + const invalidMethodPayload = { + id: 'invalidMethodPayload', + jsonrpc: '2.0', + method: 'BAD_METHOD', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(invalidMethodPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.id).to.be.null; + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.jsonrpc).to.be.eq('2.0'); + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); + }); + + it('throws an error when jsonrpc field missing from request', async () => { + const noJsonRpcPayload = { + id: 'noJsonRpcPayload', + method: 'GET_STATS', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(noJsonRpcPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.jsonrpc).to.be.eq('2.0'); + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); + }); + + it('throws an error when we try to add an order without a signedOrder', async () => { + const noSignedOrderAddOrderPayload = { + id: 'noSignedOrderAddOrderPayload', + jsonrpc: '2.0', + method: 'ADD_ORDER', + orderHash: '0x7337e2f2a9aa2ed6afe26edc2df7ad79c3ffa9cf9b81a964f707ea63f5272355', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(noSignedOrderAddOrderPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.id).to.be.null; + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.jsonrpc).to.be.eq('2.0'); + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); + }); + + it('throws an error when we try to add a bad signedOrder', async () => { + const invalidAddOrderPayload = { + id: 'invalidAddOrderPayload', + jsonrpc: '2.0', + method: 'ADD_ORDER', + signedOrder: { + makerAddress: '0x0', + }, + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(invalidAddOrderPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.id).to.be.null; + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); + }); + + it('executes addOrder and removeOrder requests correctly', async () => { + wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); + const addOrderMsg = await _onMessageAsync(wsClient); + const addOrderData = JSON.parse(addOrderMsg.data); + expect(addOrderData.method).to.be.eq('ADD_ORDER'); + expect((wsServer as any)._orderWatcher._orderByOrderHash).to.deep.include({ + [orderHash]: signedOrder, + }); + + wsClient.send(JSON.stringify(removeOrderPayload)); + const removeOrderMsg = await _onMessageAsync(wsClient); + const removeOrderData = JSON.parse(removeOrderMsg.data); + expect(removeOrderData.method).to.be.eq('REMOVE_ORDER'); + expect((wsServer as any)._orderWatcher._orderByOrderHash).to.not.deep.include({ + [orderHash]: signedOrder, + }); + }); + + it('broadcasts orderStateInvalid message when makerAddress allowance set to 0 for watched order', async () => { + // Add the regular order + wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); + await _onMessageAsync(wsClient); + + // Set the allowance to 0 + await contractWrappers.erc20Token.setProxyAllowanceAsync(makerTokenAddress, makerAddress, new BigNumber(0)); + + // Ensure that orderStateInvalid message is received. + const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); + const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); + expect(orderWatcherUpdateData.method).to.be.eq('UPDATE'); + const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; + expect(invalidOrderState.isValid).to.be.false(); + expect(invalidOrderState.orderHash).to.be.eq(orderHash); + expect(invalidOrderState.error).to.be.eq(ExchangeContractErrs.InsufficientMakerAllowance); + }); + + it('broadcasts to multiple clients when an order backing ZRX allowance changes', async () => { + // Prepare order + const makerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(2), decimals); + const takerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(0), decimals); + const nonZeroMakerFeeSignedOrder = await fillScenarios.createFillableSignedOrderWithFeesAsync( + makerAssetData, + takerAssetData, + makerFee, + takerFee, + makerAddress, + takerAddress, + fillableAmount, + takerAddress, + ); + const nonZeroMakerFeeOrderPayload = { + id: 'nonZeroMakerFeeOrderPayload', + jsonrpc: '2.0', + method: 'ADD_ORDER', + signedOrder: nonZeroMakerFeeSignedOrder, + }; + + // Set up a second client and have it add the order + wsClientTwo = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); + logUtils.log(`${new Date()} [Client] Connected.`); + wsClientTwo.onopen = () => wsClientTwo.send(JSON.stringify(nonZeroMakerFeeOrderPayload)); + await _onMessageAsync(wsClientTwo); + + // Change the allowance + await contractWrappers.erc20Token.setProxyAllowanceAsync(zrxTokenAddress, makerAddress, new BigNumber(0)); + + // Check that both clients receive the emitted event + for (const client of [wsClient, wsClientTwo]) { + const updateMsg = await _onMessageAsync(client); + const updateData = JSON.parse(updateMsg.data); + const orderState = updateData.result as OrderStateValid; + expect(orderState.isValid).to.be.true(); + expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); + } + + wsClientTwo.close(); + logUtils.log(`${new Date()} [Client] Closed.`); + }); +}); diff --git a/packages/order-watcher/test/order_watcher_websocket_test.ts b/packages/order-watcher/test/order_watcher_websocket_test.ts deleted file mode 100644 index c4d1ede45..000000000 --- a/packages/order-watcher/test/order_watcher_websocket_test.ts +++ /dev/null @@ -1,288 +0,0 @@ -import { ContractWrappers } from '@0x/contract-wrappers'; -import { tokenUtils } from '@0x/contract-wrappers/lib/test/utils/token_utils'; -import { BlockchainLifecycle } from '@0x/dev-utils'; -import { FillScenarios } from '@0x/fill-scenarios'; -import { assetDataUtils, orderHashUtils } from '@0x/order-utils'; -import { ExchangeContractErrs, OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; -import { BigNumber, logUtils } from '@0x/utils'; -import { Web3Wrapper } from '@0x/web3-wrapper'; -import * as chai from 'chai'; -import 'mocha'; -import * as WebSocket from 'websocket'; - -import { OrderWatcherWebSocketServer } from '../src/order_watcher/order_watcher_websocket'; - -import { chaiSetup } from './utils/chai_setup'; -import { constants } from './utils/constants'; -import { migrateOnceAsync } from './utils/migrate'; -import { provider, web3Wrapper } from './utils/web3_wrapper'; - -chaiSetup.configure(); -const expect = chai.expect; -const blockchainLifecycle = new BlockchainLifecycle(web3Wrapper); - -interface WsMessage { - data: string; -} - -describe.only('OrderWatcherWebSocket', async () => { - let contractWrappers: ContractWrappers; - let wsServer: OrderWatcherWebSocketServer; - let wsClient: WebSocket.w3cwebsocket; - let wsClientTwo: WebSocket.w3cwebsocket; - let fillScenarios: FillScenarios; - let userAddresses: string[]; - let makerAssetData: string; - let takerAssetData: string; - let makerTokenAddress: string; - let takerTokenAddress: string; - let makerAddress: string; - let takerAddress: string; - let zrxTokenAddress: string; - let signedOrder: SignedOrder; - let orderHash: string; - // Manually encode types rather than use /src/types to mimick real data that user - // would input. Otherwise we would be forced to use enums, which hide problems. - let addOrderPayload: { id: string; jsonrpc: string; method: string; params: { signedOrder: SignedOrder } }; - let removeOrderPayload: { id: string; jsonrpc: string; method: string; params: { orderHash: string } }; - const decimals = constants.ZRX_DECIMALS; - const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); - // HACK: createFillableSignedOrderAsync is Promise-based, which forces us - // to use Promises instead of the done() callbacks for tests. - // onmessage callback must thus be wrapped as a Promise. - const _onMessageAsync = async (client: WebSocket.w3cwebsocket) => - new Promise(resolve => { - client.onmessage = (msg: WsMessage) => resolve(msg); - }); - - before(async () => { - // Set up constants - const contractAddresses = await migrateOnceAsync(); - await blockchainLifecycle.startAsync(); - const networkId = constants.TESTRPC_NETWORK_ID; - const config = { - networkId, - contractAddresses, - }; - contractWrappers = new ContractWrappers(provider, config); - userAddresses = await web3Wrapper.getAvailableAddressesAsync(); - zrxTokenAddress = contractAddresses.zrxToken; - [makerAddress, takerAddress] = userAddresses; - [makerTokenAddress, takerTokenAddress] = tokenUtils.getDummyERC20TokenAddresses(); - [makerAssetData, takerAssetData] = [ - assetDataUtils.encodeERC20AssetData(makerTokenAddress), - assetDataUtils.encodeERC20AssetData(takerTokenAddress), - ]; - fillScenarios = new FillScenarios( - provider, - userAddresses, - zrxTokenAddress, - contractAddresses.exchange, - contractAddresses.erc20Proxy, - contractAddresses.erc721Proxy, - ); - signedOrder = await fillScenarios.createFillableSignedOrderAsync( - makerAssetData, - takerAssetData, - makerAddress, - takerAddress, - fillableAmount, - ); - orderHash = orderHashUtils.getOrderHashHex(signedOrder); - addOrderPayload = { - id: 'addOrderPayload', - jsonrpc: '2.0', - method: 'ADD_ORDER', - params: { signedOrder }, - }; - removeOrderPayload = { - id: 'removeOrderPayload', - jsonrpc: '2.0', - method: 'REMOVE_ORDER', - params: { orderHash }, - }; - - // Prepare OrderWatcher WebSocket server - const orderWatcherConfig = {}; - wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); - wsServer.listen(); - }); - after(async () => { - await blockchainLifecycle.revertAsync(); - wsServer.close(); - }); - beforeEach(async () => { - await blockchainLifecycle.startAsync(); - wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); - logUtils.log(`${new Date()} [Client] Connected.`); - }); - afterEach(async () => { - await blockchainLifecycle.revertAsync(); - wsClient.close(); - logUtils.log(`${new Date()} [Client] Closed.`); - }); - - it('responds to getStats requests correctly', (done: any) => { - const payload = { - id: 'getStats', - jsonrpc: '2.0', - method: 'GET_STATS', - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); - wsClient.onmessage = (msg: any) => { - const responseData = JSON.parse(msg.data); - expect(responseData.id).to.be.eq('getStats'); - expect(responseData.jsonrpc).to.be.eq('2.0'); - expect(responseData.method).to.be.eq('GET_STATS'); - expect(responseData.result.orderCount).to.be.eq(0); - done(); - }; - }); - - it('throws an error when an invalid method is attempted', async () => { - const invalidMethodPayload = { - id: 'invalidMethodPayload', - jsonrpc: '2.0', - method: 'BAD_METHOD', - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(invalidMethodPayload)); - const errorMsg = await _onMessageAsync(wsClient); - const errorData = JSON.parse(errorMsg.data); - // tslint:disable-next-line:no-unused-expression - expect(errorData.id).to.be.null; - // tslint:disable-next-line:no-unused-expression - expect(errorData.method).to.be.null; - expect(errorData.jsonrpc).to.be.eq('2.0'); - expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); - }); - - it('throws an error when jsonrpc field missing from request', async () => { - const noJsonRpcPayload = { - id: 'noJsonRpcPayload', - method: 'GET_STATS', - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(noJsonRpcPayload)); - const errorMsg = await _onMessageAsync(wsClient); - const errorData = JSON.parse(errorMsg.data); - // tslint:disable-next-line:no-unused-expression - expect(errorData.method).to.be.null; - expect(errorData.jsonrpc).to.be.eq('2.0'); - expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); - }); - - it('throws an error when we try to add an order without a signedOrder', async () => { - const noSignedOrderAddOrderPayload = { - id: 'noSignedOrderAddOrderPayload', - jsonrpc: '2.0', - method: 'ADD_ORDER', - orderHash: '0x7337e2f2a9aa2ed6afe26edc2df7ad79c3ffa9cf9b81a964f707ea63f5272355', - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(noSignedOrderAddOrderPayload)); - const errorMsg = await _onMessageAsync(wsClient); - const errorData = JSON.parse(errorMsg.data); - // tslint:disable-next-line:no-unused-expression - expect(errorData.id).to.be.null; - // tslint:disable-next-line:no-unused-expression - expect(errorData.method).to.be.null; - expect(errorData.jsonrpc).to.be.eq('2.0'); - expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); - }); - - it('throws an error when we try to add a bad signedOrder', async () => { - const invalidAddOrderPayload = { - id: 'invalidAddOrderPayload', - jsonrpc: '2.0', - method: 'ADD_ORDER', - signedOrder: { - makerAddress: '0x0', - }, - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(invalidAddOrderPayload)); - const errorMsg = await _onMessageAsync(wsClient); - const errorData = JSON.parse(errorMsg.data); - // tslint:disable-next-line:no-unused-expression - expect(errorData.id).to.be.null; - // tslint:disable-next-line:no-unused-expression - expect(errorData.method).to.be.null; - expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); - }); - - it('executes addOrder and removeOrder requests correctly', async () => { - wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); - const addOrderMsg = await _onMessageAsync(wsClient); - const addOrderData = JSON.parse(addOrderMsg.data); - expect(addOrderData.method).to.be.eq('ADD_ORDER'); - expect((wsServer as any)._orderWatcher._orderByOrderHash).to.deep.include({ - [orderHash]: signedOrder, - }); - - wsClient.send(JSON.stringify(removeOrderPayload)); - const removeOrderMsg = await _onMessageAsync(wsClient); - const removeOrderData = JSON.parse(removeOrderMsg.data); - expect(removeOrderData.method).to.be.eq('REMOVE_ORDER'); - expect((wsServer as any)._orderWatcher._orderByOrderHash).to.not.deep.include({ - [orderHash]: signedOrder, - }); - }); - - it('broadcasts orderStateInvalid message when makerAddress allowance set to 0 for watched order', async () => { - // Add the regular order - wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); - await _onMessageAsync(wsClient); - - // Set the allowance to 0 - await contractWrappers.erc20Token.setProxyAllowanceAsync(makerTokenAddress, makerAddress, new BigNumber(0)); - - // Ensure that orderStateInvalid message is received. - const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); - const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); - expect(orderWatcherUpdateData.method).to.be.eq('UPDATE'); - const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; - expect(invalidOrderState.isValid).to.be.false(); - expect(invalidOrderState.orderHash).to.be.eq(orderHash); - expect(invalidOrderState.error).to.be.eq(ExchangeContractErrs.InsufficientMakerAllowance); - }); - - it('broadcasts to multiple clients when an order backing ZRX allowance changes', async () => { - // Prepare order - const makerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(2), decimals); - const takerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(0), decimals); - const nonZeroMakerFeeSignedOrder = await fillScenarios.createFillableSignedOrderWithFeesAsync( - makerAssetData, - takerAssetData, - makerFee, - takerFee, - makerAddress, - takerAddress, - fillableAmount, - takerAddress, - ); - const nonZeroMakerFeeOrderPayload = { - id: 'nonZeroMakerFeeOrderPayload', - jsonrpc: '2.0', - method: 'ADD_ORDER', - signedOrder: nonZeroMakerFeeSignedOrder, - }; - - // Set up a second client and have it add the order - wsClientTwo = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); - logUtils.log(`${new Date()} [Client] Connected.`); - wsClientTwo.onopen = () => wsClientTwo.send(JSON.stringify(nonZeroMakerFeeOrderPayload)); - await _onMessageAsync(wsClientTwo); - - // Change the allowance - await contractWrappers.erc20Token.setProxyAllowanceAsync(zrxTokenAddress, makerAddress, new BigNumber(0)); - - // Check that both clients receive the emitted event - for (const client of [wsClient, wsClientTwo]) { - const updateMsg = await _onMessageAsync(client); - const updateData = JSON.parse(updateMsg.data); - const orderState = updateData.result as OrderStateValid; - expect(orderState.isValid).to.be.true(); - expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); - } - - wsClientTwo.close(); - logUtils.log(`${new Date()} [Client] Closed.`); - }); -}); -- cgit v1.2.3 From f510f9df997633830e93e174ba598a45cae51f48 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sat, 15 Dec 2018 21:34:56 -0800 Subject: remove unused instance variable --- .../order-watcher/src/order_watcher/order_watcher_websocket_server.ts | 2 -- 1 file changed, 2 deletions(-) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts index 2d2d9e82e..a1b63128f 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts @@ -22,7 +22,6 @@ export class OrderWatcherWebSocketServer { private readonly _httpServer: http.Server; private readonly _connectionStore: Set; private readonly _wsServer: WebSocket.server; - private _jsonRpcRequestId: number; /** * Recover types lost when the payload is stringified. */ @@ -55,7 +54,6 @@ export class OrderWatcherWebSocketServer { contractAddresses?: ContractAddresses, partialConfig?: Partial, ) { - this._jsonRpcRequestId = 1; this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); this._connectionStore = new Set(); this._httpServer = http.createServer(); -- cgit v1.2.3 From 7661cfc85ef9e267d15bd4d7bd06c3b6cc3f7931 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sun, 16 Dec 2018 16:21:27 -0800 Subject: Improve our compliance to the JSON RPC spec --- packages/order-watcher/README.md | 18 +++++------ .../order_watcher_websocket_server.ts | 36 ++++++++++++---------- .../order-watcher/src/schemas/websocket_schemas.ts | 2 ++ packages/order-watcher/src/types.ts | 10 +++--- .../test/order_watcher_websocket_server_test.ts | 4 +-- 5 files changed, 37 insertions(+), 33 deletions(-) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/README.md b/packages/order-watcher/README.md index aad90a59a..385fe4715 100644 --- a/packages/order-watcher/README.md +++ b/packages/order-watcher/README.md @@ -40,7 +40,7 @@ Several environmental variables can be set to configure the server: and accept connections from. When this is not set, we default to 8080. **Requests** -The server accepts three types of requests: `ADD_ORDER`, `REMOVE_ORDER` and `GET_STATS`. These mirror what the underlying OrderWatcher does. You can read more in the [wiki](https://0xproject.com/wiki#0x-OrderWatcher). Unlike the OrderWatcher, it does not expose any subscribe or unsubscribe functionality because the client implicitly subscribes and unsubscribes by connecting to the server. +The server accepts three types of requests: `ADD_ORDER`, `REMOVE_ORDER` and `GET_STATS`. These mirror what the underlying OrderWatcher does. You can read more in the [wiki](https://0xproject.com/wiki#0x-OrderWatcher). Unlike the OrderWatcher, it does not expose any `subscribe` or `unsubscribe` functionality because the WebSocket server keeps a single subscription open for all clients. The first step for making a request is establishing a connection with the server. In Javascript: @@ -58,9 +58,9 @@ wsClient = create_connection("ws://127.0.0.1:8080") With the connection established, you prepare the payload for your request. The payload is a json object with a format established by the [JSON RPC specification](https://www.jsonrpc.org/specification): -* `id`: All requests require you to specify a string as an id. When the server responds to the request, it provides an id as well to allow you to determine which request it is responding to. +* `id`: All requests require you to specify a numerical `id`. When the server responds to the request, the response will have the same `id` as the one supplied with your request. * `jsonrpc`: This is always the string `'2.0'`. -* `method`: This specifies the OrderWatcher method you want to call. I.e., `'ADD_ORDER'`, `'REMOVE_ORDER'`, and `'GET_STATS'`. +* `method`: This specifies the OrderWatcher method you want to call. I.e., `'ADD_ORDER'`, `'REMOVE_ORDER'` or `'GET_STATS'`. * `params`: These contain the parameters needed by OrderWatcher to execute the method you called. For `ADD_ORDER`, provide `{ signedOrder: }`. For `REMOVE_ORDER`, provide `{ orderHash: }`. For `GET_STATS`, no parameters are needed, so you may leave this empty. Next, convert the payload to a string and send it through the connection. @@ -68,7 +68,7 @@ In Javascript: ``` const addOrderPayload = { - id: 'order32', + id: 1, jsonrpc: '2.0', method: 'ADD_ORDER', params: { signedOrder: }, @@ -81,7 +81,7 @@ In Python: ``` import json remove_order_payload = { - 'id': 'order33', + 'id': 1, 'jsonrpc': '2.0', 'method': 'REMOVE_ORDER', 'params': {'orderHash': '0x6edc16bf37fde79f5012088c33784c730e2f103d9ab1caf73060c386ad107b7e'}, @@ -90,13 +90,13 @@ wsClient.send(json.dumps(remove_order_payload)); ``` **Response** -The server responds to all requests in a similar format. In the data field, you'll find another json object that has been converted into a string. This json object contains the following fields: +The server responds to all requests in a similar format. In the data field, you'll find another object containing the following fields: -* `id`: The id corresponding to the request that the server is responding to. `UPDATE` responses are not based on any requests so the `id` field is `null`. +* `id`: The id corresponding to the request that the server is responding to. `UPDATE` responses are not based on any requests so the `id` field is omitted`. * `jsonrpc`: Always `'2.0'`. * `method`: The method the server is responding to. Eg. `ADD_ORDER`. When order states change the server may also initiate a response. In this case, method will be listed as `UPDATE`. -* `result`: This field varies based on the method. `UPDATE` responses contained the new order state. `GET_STATS` responses contain the current order count. When there are errors, this field is `null`. -* `error`: When there is an error executing a request, the error message is listed here. When the server responds successfully, this field is `null`. +* `result`: This field varies based on the method. `UPDATE` responses contain the new order state. `GET_STATS` responses contain the current order count. When there are errors, this field is omitted. +* `error`: When there is an error executing a request, the [JSON RPC](https://www.jsonrpc.org/specification) error object is listed here. When the server responds successfully, this field is omitted. In Javascript, the responses can be parsed using the `onmessage` callback: diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts index a1b63128f..eac48f849 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts @@ -12,7 +12,7 @@ import { assert } from '../utils/assert'; import { OrderWatcher } from './order_watcher'; const DEFAULT_HTTP_PORT = 8080; -const JSONRPC_VERSION = '2.0'; +const JSON_RPC_VERSION = '2.0'; // Wraps the OrderWatcher functionality in a WebSocket server. Motivations: // 1) Users can watch orders via non-typescript programs. @@ -77,16 +77,17 @@ export class OrderWatcherWebSocketServer { connection.on('close', this._onCloseCallback.bind(this, connection)); this._connectionStore.add(connection); }); + } + /** + * Activates the WebSocket server by subscribing to the OrderWatcher and + * starting the WebSocket's HTTP server + */ + public start(): void { // Have the WebSocket server subscribe to the OrderWatcher to receive updates. // These updates are then broadcast to clients in the _connectionStore. this._orderWatcher.subscribe(this._broadcastCallback.bind(this)); - } - /** - * Activates the WebSocket server by having its HTTP server start listening. - */ - public listen(): void { const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; this._httpServer.listen(port, () => { logUtils.log(`${new Date()} [Server] Listening on port ${port}`); @@ -95,29 +96,30 @@ export class OrderWatcherWebSocketServer { /** * Deactivates the WebSocket server by stopping the HTTP server from accepting - * new connections. + * new connections and unsubscribing from the OrderWatcher */ - public close(): void { + public stop(): void { this._httpServer.close(); + this._orderWatcher.unsubscribe(); } private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { let response: WebSocketResponse; + assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); + const request: WebSocketRequest = JSON.parse(message.utf8Data); + assert.doesConformToSchema('request', request, webSocketRequestSchema); + assert.isString(request.jsonrpc, JSON_RPC_VERSION); try { - assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); - const request: WebSocketRequest = JSON.parse(message.utf8Data); - assert.doesConformToSchema('request', request, webSocketRequestSchema); - assert.isString(request.jsonrpc, JSONRPC_VERSION); response = { id: request.id, - jsonrpc: JSONRPC_VERSION, + jsonrpc: JSON_RPC_VERSION, method: request.method, result: await this._routeRequestAsync(request), }; } catch (err) { response = { - id: null, - jsonrpc: JSONRPC_VERSION, + id: request.id, + jsonrpc: JSON_RPC_VERSION, method: null, error: err.toString(), }; @@ -165,12 +167,12 @@ export class OrderWatcherWebSocketServer { const response = err === null ? { - jsonrpc: JSONRPC_VERSION, + jsonrpc: JSON_RPC_VERSION, method, result: orderState, } : { - jsonrpc: JSONRPC_VERSION, + jsonrpc: JSON_RPC_VERSION, method, error: { code: -32000, diff --git a/packages/order-watcher/src/schemas/websocket_schemas.ts b/packages/order-watcher/src/schemas/websocket_schemas.ts index 5e4e1ab74..263dd45b3 100644 --- a/packages/order-watcher/src/schemas/websocket_schemas.ts +++ b/packages/order-watcher/src/schemas/websocket_schemas.ts @@ -1,3 +1,5 @@ +// TODO: Move these schemas to the `json-schemas` package and convert to JSON +// Rename to `OrderWatcherWebSocketRequestSchema`, etc... export const webSocketUtf8MessageSchema = { id: '/webSocketUtf8MessageSchema', properties: { diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index ecbebe305..536363d8a 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -49,21 +49,21 @@ export enum OrderWatcherMethod { export type WebSocketRequest = AddOrderRequest | RemoveOrderRequest | GetStatsRequest; interface AddOrderRequest { - id: string; + id: number; jsonrpc: string; method: OrderWatcherMethod.AddOrder; params: { signedOrder: SignedOrder }; } interface RemoveOrderRequest { - id: string; + id: number; jsonrpc: string; method: OrderWatcherMethod.RemoveOrder; params: { orderHash: string }; } interface GetStatsRequest { - id: string; + id: number; jsonrpc: string; method: OrderWatcherMethod.GetStats; } @@ -73,14 +73,14 @@ interface GetStatsRequest { export type WebSocketResponse = SuccessfulWebSocketResponse | ErrorWebSocketResponse; interface SuccessfulWebSocketResponse { - id: string | null; // id is null for UPDATE + id: number; jsonrpc: string; method: OrderWatcherMethod; result: OrderState | GetStatsResult | undefined; // result is undefined for ADD_ORDER and REMOVE_ORDER } interface ErrorWebSocketResponse { - id: null; + id: number; jsonrpc: string; method: null; error: JSONRPCError; diff --git a/packages/order-watcher/test/order_watcher_websocket_server_test.ts b/packages/order-watcher/test/order_watcher_websocket_server_test.ts index 9f9db7b1f..8a6deede8 100644 --- a/packages/order-watcher/test/order_watcher_websocket_server_test.ts +++ b/packages/order-watcher/test/order_watcher_websocket_server_test.ts @@ -105,11 +105,11 @@ describe.only('OrderWatcherWebSocketServer', async () => { // Prepare OrderWatcher WebSocket server const orderWatcherConfig = {}; wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); - wsServer.listen(); + wsServer.start(); }); after(async () => { await blockchainLifecycle.revertAsync(); - wsServer.close(); + wsServer.stop(); }); beforeEach(async () => { await blockchainLifecycle.startAsync(); -- cgit v1.2.3 From ee4185ab465c76b64b65efefb92e11b0ca4ecad4 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sun, 16 Dec 2018 16:52:37 -0800 Subject: Move OrderWatcher Websocket schemas to json-schemas and convert to JSON so that they are language agnostic --- .../order_watcher_websocket_server.ts | 6 +-- .../order-watcher/src/schemas/websocket_schemas.ts | 61 ---------------------- 2 files changed, 3 insertions(+), 64 deletions(-) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts index eac48f849..f90961cc8 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts @@ -1,11 +1,11 @@ import { ContractAddresses } from '@0x/contract-addresses'; +import { schemas } from '@0x/json-schemas'; import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; import { BigNumber, logUtils } from '@0x/utils'; import { Provider } from 'ethereum-types'; import * as http from 'http'; import * as WebSocket from 'websocket'; -import { webSocketRequestSchema, webSocketUtf8MessageSchema } from '../schemas/websocket_schemas'; import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types'; import { assert } from '../utils/assert'; @@ -105,9 +105,9 @@ export class OrderWatcherWebSocketServer { private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { let response: WebSocketResponse; - assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); + assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema); const request: WebSocketRequest = JSON.parse(message.utf8Data); - assert.doesConformToSchema('request', request, webSocketRequestSchema); + assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema); assert.isString(request.jsonrpc, JSON_RPC_VERSION); try { response = { diff --git a/packages/order-watcher/src/schemas/websocket_schemas.ts b/packages/order-watcher/src/schemas/websocket_schemas.ts index 263dd45b3..df54a38e1 100644 --- a/packages/order-watcher/src/schemas/websocket_schemas.ts +++ b/packages/order-watcher/src/schemas/websocket_schemas.ts @@ -1,63 +1,2 @@ // TODO: Move these schemas to the `json-schemas` package and convert to JSON // Rename to `OrderWatcherWebSocketRequestSchema`, etc... -export const webSocketUtf8MessageSchema = { - id: '/webSocketUtf8MessageSchema', - properties: { - utf8Data: { type: 'string' }, - }, - type: 'object', - required: ['utf8Data'], -}; - -export const webSocketRequestSchema = { - id: '/webSocketRequestSchema', - type: 'object', - definitions: { - signedOrderParam: { - type: 'object', - properties: { - signedOrder: { $ref: '/signedOrderSchema' }, - }, - required: ['signedOrder'], - }, - orderHashParam: { - type: 'object', - properties: { - orderHash: { $ref: '/hexSchema' }, - }, - required: ['orderHash'], - }, - }, - oneOf: [ - { - type: 'object', - properties: { - id: { type: 'string' }, - jsonrpc: { type: 'string' }, - method: { enum: ['ADD_ORDER'] }, - params: { $ref: '#/definitions/signedOrderParam' }, - }, - required: ['id', 'jsonrpc', 'method', 'params'], - }, - { - type: 'object', - properties: { - id: { type: 'string' }, - jsonrpc: { type: 'string' }, - method: { enum: ['REMOVE_ORDER'] }, - params: { $ref: '#/definitions/orderHashParam' }, - }, - required: ['id', 'jsonrpc', 'method', 'params'], - }, - { - type: 'object', - properties: { - id: { type: 'string' }, - jsonrpc: { type: 'string' }, - method: { enum: ['GET_STATS'] }, - params: {}, - }, - required: ['id', 'jsonrpc', 'method'], - }, - ], -}; -- cgit v1.2.3 From 896c8d17c16c4f1e9670ab0747ae8934ce5400a5 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sun, 16 Dec 2018 17:31:38 -0800 Subject: Fix schemas and tests --- .../order_watcher_websocket_server.ts | 14 ++++++----- packages/order-watcher/src/types.ts | 14 +++++------ .../test/order_watcher_websocket_server_test.ts | 27 +++++++++++----------- 3 files changed, 29 insertions(+), 26 deletions(-) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts index f90961cc8..da5667db3 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts @@ -105,20 +105,22 @@ export class OrderWatcherWebSocketServer { private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { let response: WebSocketResponse; - assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema); - const request: WebSocketRequest = JSON.parse(message.utf8Data); - assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema); - assert.isString(request.jsonrpc, JSON_RPC_VERSION); + let id: number | null = null; try { + assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema); + const request: WebSocketRequest = JSON.parse(message.utf8Data); + id = request.id; + assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema); + assert.isString(request.jsonrpc, JSON_RPC_VERSION); response = { - id: request.id, + id, jsonrpc: JSON_RPC_VERSION, method: request.method, result: await this._routeRequestAsync(request), }; } catch (err) { response = { - id: request.id, + id, jsonrpc: JSON_RPC_VERSION, method: null, error: err.toString(), diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index 536363d8a..2b529a939 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -48,21 +48,21 @@ export enum OrderWatcherMethod { // the data field of their WebSocket message to interact with the server. export type WebSocketRequest = AddOrderRequest | RemoveOrderRequest | GetStatsRequest; -interface AddOrderRequest { +export interface AddOrderRequest { id: number; jsonrpc: string; method: OrderWatcherMethod.AddOrder; params: { signedOrder: SignedOrder }; } -interface RemoveOrderRequest { +export interface RemoveOrderRequest { id: number; jsonrpc: string; method: OrderWatcherMethod.RemoveOrder; params: { orderHash: string }; } -interface GetStatsRequest { +export interface GetStatsRequest { id: number; jsonrpc: string; method: OrderWatcherMethod.GetStats; @@ -72,21 +72,21 @@ interface GetStatsRequest { // of the WebSocket messages that the server sends out. export type WebSocketResponse = SuccessfulWebSocketResponse | ErrorWebSocketResponse; -interface SuccessfulWebSocketResponse { +export interface SuccessfulWebSocketResponse { id: number; jsonrpc: string; method: OrderWatcherMethod; result: OrderState | GetStatsResult | undefined; // result is undefined for ADD_ORDER and REMOVE_ORDER } -interface ErrorWebSocketResponse { - id: number; +export interface ErrorWebSocketResponse { + id: number | null; jsonrpc: string; method: null; error: JSONRPCError; } -interface JSONRPCError { +export interface JSONRPCError { code: number; message: string; data?: string | object; diff --git a/packages/order-watcher/test/order_watcher_websocket_server_test.ts b/packages/order-watcher/test/order_watcher_websocket_server_test.ts index 8a6deede8..d21c676fc 100644 --- a/packages/order-watcher/test/order_watcher_websocket_server_test.ts +++ b/packages/order-watcher/test/order_watcher_websocket_server_test.ts @@ -11,6 +11,7 @@ import 'mocha'; import * as WebSocket from 'websocket'; import { OrderWatcherWebSocketServer } from '../src/order_watcher/order_watcher_websocket_server'; +import { AddOrderRequest, OrderWatcherMethod, RemoveOrderRequest } from '../src/types'; import { chaiSetup } from './utils/chai_setup'; import { constants } from './utils/constants'; @@ -43,8 +44,8 @@ describe.only('OrderWatcherWebSocketServer', async () => { let orderHash: string; // Manually encode types rather than use /src/types to mimick real data that user // would input. Otherwise we would be forced to use enums, which hide problems. - let addOrderPayload: { id: string; jsonrpc: string; method: string; params: { signedOrder: SignedOrder } }; - let removeOrderPayload: { id: string; jsonrpc: string; method: string; params: { orderHash: string } }; + let addOrderPayload: AddOrderRequest; + let removeOrderPayload: RemoveOrderRequest; const decimals = constants.ZRX_DECIMALS; const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); // HACK: createFillableSignedOrderAsync is Promise-based, which forces us @@ -90,15 +91,15 @@ describe.only('OrderWatcherWebSocketServer', async () => { ); orderHash = orderHashUtils.getOrderHashHex(signedOrder); addOrderPayload = { - id: 'addOrderPayload', + id: 1, jsonrpc: '2.0', - method: 'ADD_ORDER', + method: OrderWatcherMethod.AddOrder, params: { signedOrder }, }; removeOrderPayload = { - id: 'removeOrderPayload', + id: 1, jsonrpc: '2.0', - method: 'REMOVE_ORDER', + method: OrderWatcherMethod.RemoveOrder, params: { orderHash }, }; @@ -124,14 +125,14 @@ describe.only('OrderWatcherWebSocketServer', async () => { it('responds to getStats requests correctly', (done: any) => { const payload = { - id: 'getStats', + id: 1, jsonrpc: '2.0', method: 'GET_STATS', }; wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); wsClient.onmessage = (msg: any) => { const responseData = JSON.parse(msg.data); - expect(responseData.id).to.be.eq('getStats'); + expect(responseData.id).to.be.eq(1); expect(responseData.jsonrpc).to.be.eq('2.0'); expect(responseData.method).to.be.eq('GET_STATS'); expect(responseData.result.orderCount).to.be.eq(0); @@ -141,7 +142,7 @@ describe.only('OrderWatcherWebSocketServer', async () => { it('throws an error when an invalid method is attempted', async () => { const invalidMethodPayload = { - id: 'invalidMethodPayload', + id: 1, jsonrpc: '2.0', method: 'BAD_METHOD', }; @@ -158,7 +159,7 @@ describe.only('OrderWatcherWebSocketServer', async () => { it('throws an error when jsonrpc field missing from request', async () => { const noJsonRpcPayload = { - id: 'noJsonRpcPayload', + id: 1, method: 'GET_STATS', }; wsClient.onopen = () => wsClient.send(JSON.stringify(noJsonRpcPayload)); @@ -172,7 +173,7 @@ describe.only('OrderWatcherWebSocketServer', async () => { it('throws an error when we try to add an order without a signedOrder', async () => { const noSignedOrderAddOrderPayload = { - id: 'noSignedOrderAddOrderPayload', + id: 1, jsonrpc: '2.0', method: 'ADD_ORDER', orderHash: '0x7337e2f2a9aa2ed6afe26edc2df7ad79c3ffa9cf9b81a964f707ea63f5272355', @@ -190,7 +191,7 @@ describe.only('OrderWatcherWebSocketServer', async () => { it('throws an error when we try to add a bad signedOrder', async () => { const invalidAddOrderPayload = { - id: 'invalidAddOrderPayload', + id: 1, jsonrpc: '2.0', method: 'ADD_ORDER', signedOrder: { @@ -258,7 +259,7 @@ describe.only('OrderWatcherWebSocketServer', async () => { takerAddress, ); const nonZeroMakerFeeOrderPayload = { - id: 'nonZeroMakerFeeOrderPayload', + id: 1, jsonrpc: '2.0', method: 'ADD_ORDER', signedOrder: nonZeroMakerFeeSignedOrder, -- cgit v1.2.3 From 5d0e715d9ac9f358c1cdf23c9c96d622e0f1060c Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sun, 16 Dec 2018 17:46:28 -0800 Subject: Add isVerbose option to enable/disable logging --- .../order_watcher/order_watcher_websocket_server.ts | 20 +++++++++++++++----- .../test/order_watcher_websocket_server_test.ts | 9 ++++++++- 2 files changed, 23 insertions(+), 6 deletions(-) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts index da5667db3..2e29e775a 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts @@ -22,6 +22,7 @@ export class OrderWatcherWebSocketServer { private readonly _httpServer: http.Server; private readonly _connectionStore: Set; private readonly _wsServer: WebSocket.server; + private readonly _isVerbose: boolean; /** * Recover types lost when the payload is stringified. */ @@ -47,13 +48,16 @@ export class OrderWatcherWebSocketServer { * @param contractAddresses Optional contract addresses. Defaults to known * addresses based on networkId. * @param partialConfig Optional configurations. + * @param isVerbose Whether to enable verbose logging. Defaults to true. */ constructor( provider: Provider, networkId: number, contractAddresses?: ContractAddresses, + isVerbose: boolean = true, partialConfig?: Partial, ) { + this._isVerbose = isVerbose; this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); this._connectionStore = new Set(); this._httpServer = http.createServer(); @@ -72,7 +76,7 @@ export class OrderWatcherWebSocketServer { // Designed for usage pattern where client and server are run on the same // machine by the same user. As such, no security checks are in place. const connection: WebSocket.connection = request.accept(null, request.origin); - logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); + this._log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); connection.on('message', this._onMessageCallbackAsync.bind(this, connection)); connection.on('close', this._onCloseCallback.bind(this, connection)); this._connectionStore.add(connection); @@ -90,7 +94,7 @@ export class OrderWatcherWebSocketServer { const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; this._httpServer.listen(port, () => { - logUtils.log(`${new Date()} [Server] Listening on port ${port}`); + this._log(`${new Date()} [Server] Listening on port ${port}`); }); } @@ -103,6 +107,12 @@ export class OrderWatcherWebSocketServer { this._orderWatcher.unsubscribe(); } + private _log(...args: any[]): void { + if (this._isVerbose) { + logUtils.log(...args); + } + } + private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { let response: WebSocketResponse; let id: number | null = null; @@ -126,17 +136,17 @@ export class OrderWatcherWebSocketServer { error: err.toString(), }; } - logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); + this._log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); connection.sendUTF(JSON.stringify(response)); } private _onCloseCallback(connection: WebSocket.connection): void { this._connectionStore.delete(connection); - logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); + this._log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); } private async _routeRequestAsync(request: WebSocketRequest): Promise { - logUtils.log(`${new Date()} [Server] Request received: ${request.method}`); + this._log(`${new Date()} [Server] Request received: ${request.method}`); switch (request.method) { case OrderWatcherMethod.AddOrder: { const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder( diff --git a/packages/order-watcher/test/order_watcher_websocket_server_test.ts b/packages/order-watcher/test/order_watcher_websocket_server_test.ts index d21c676fc..d1a947105 100644 --- a/packages/order-watcher/test/order_watcher_websocket_server_test.ts +++ b/packages/order-watcher/test/order_watcher_websocket_server_test.ts @@ -105,7 +105,14 @@ describe.only('OrderWatcherWebSocketServer', async () => { // Prepare OrderWatcher WebSocket server const orderWatcherConfig = {}; - wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); + const isVerbose = true; + wsServer = new OrderWatcherWebSocketServer( + provider, + networkId, + contractAddresses, + isVerbose, + orderWatcherConfig, + ); wsServer.start(); }); after(async () => { -- cgit v1.2.3 From a12b9e82f61ac136876f9d4b72b45aad266317cf Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sun, 16 Dec 2018 18:00:23 -0800 Subject: Consolidate use of isVerbose in orderWatcherConfig --- .../src/order_watcher/order_watcher_websocket_server.ts | 16 +++++++++------- .../test/order_watcher_websocket_server_test.ts | 17 +++++------------ 2 files changed, 14 insertions(+), 19 deletions(-) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts index 2e29e775a..b75b07603 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts @@ -47,18 +47,20 @@ export class OrderWatcherWebSocketServer { * @param networkId NetworkId to watch orders on. * @param contractAddresses Optional contract addresses. Defaults to known * addresses based on networkId. - * @param partialConfig Optional configurations. + * @param orderWatcherConfig OrderWatcher configurations. isVerbose sets the verbosity for the WebSocket server aswell. * @param isVerbose Whether to enable verbose logging. Defaults to true. */ constructor( provider: Provider, networkId: number, contractAddresses?: ContractAddresses, - isVerbose: boolean = true, - partialConfig?: Partial, + orderWatcherConfig?: Partial, ) { - this._isVerbose = isVerbose; - this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); + this._isVerbose = + orderWatcherConfig !== undefined && orderWatcherConfig.isVerbose !== undefined + ? orderWatcherConfig.isVerbose + : true; + this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, orderWatcherConfig); this._connectionStore = new Set(); this._httpServer = http.createServer(); this._wsServer = new WebSocket.server({ @@ -161,10 +163,10 @@ export class OrderWatcherWebSocketServer { } case OrderWatcherMethod.GetStats: { return this._orderWatcher.getStats(); - break; } default: - // Should never reach here. Should be caught by JSON schema check. + // Should never reach here. Should be caught by JSON schema check. + throw new Error(`Unexpected default case hit for request.method`); } return undefined; } diff --git a/packages/order-watcher/test/order_watcher_websocket_server_test.ts b/packages/order-watcher/test/order_watcher_websocket_server_test.ts index d1a947105..a66d2c6c2 100644 --- a/packages/order-watcher/test/order_watcher_websocket_server_test.ts +++ b/packages/order-watcher/test/order_watcher_websocket_server_test.ts @@ -26,7 +26,7 @@ interface WsMessage { data: string; } -describe.only('OrderWatcherWebSocketServer', async () => { +describe('OrderWatcherWebSocketServer', async () => { let contractWrappers: ContractWrappers; let wsServer: OrderWatcherWebSocketServer; let wsClient: WebSocket.w3cwebsocket; @@ -42,8 +42,6 @@ describe.only('OrderWatcherWebSocketServer', async () => { let zrxTokenAddress: string; let signedOrder: SignedOrder; let orderHash: string; - // Manually encode types rather than use /src/types to mimick real data that user - // would input. Otherwise we would be forced to use enums, which hide problems. let addOrderPayload: AddOrderRequest; let removeOrderPayload: RemoveOrderRequest; const decimals = constants.ZRX_DECIMALS; @@ -104,15 +102,10 @@ describe.only('OrderWatcherWebSocketServer', async () => { }; // Prepare OrderWatcher WebSocket server - const orderWatcherConfig = {}; - const isVerbose = true; - wsServer = new OrderWatcherWebSocketServer( - provider, - networkId, - contractAddresses, - isVerbose, - orderWatcherConfig, - ); + const orderWatcherConfig = { + isVerbose: true, + }; + wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); wsServer.start(); }); after(async () => { -- cgit v1.2.3 From 6382f986086ee83374d3b78fed7f02e9d52b668f Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sun, 16 Dec 2018 18:05:20 -0800 Subject: Fix file name --- packages/order-watcher/src/index.ts | 2 +- .../order_watcher_web_socket_server.ts | 200 ++++++++++++++ .../order_watcher_websocket_server.ts | 200 -------------- .../test/order_watcher_web_socket_server_test.ts | 289 +++++++++++++++++++++ .../test/order_watcher_websocket_server_test.ts | 289 --------------------- 5 files changed, 490 insertions(+), 490 deletions(-) create mode 100644 packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts delete mode 100644 packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts create mode 100644 packages/order-watcher/test/order_watcher_web_socket_server_test.ts delete mode 100644 packages/order-watcher/test/order_watcher_websocket_server_test.ts (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/src/index.ts b/packages/order-watcher/src/index.ts index 5bdef4504..e275a0c6a 100644 --- a/packages/order-watcher/src/index.ts +++ b/packages/order-watcher/src/index.ts @@ -1,5 +1,5 @@ export { OrderWatcher } from './order_watcher/order_watcher'; -export { OrderWatcherWebSocketServer } from './order_watcher/order_watcher_websocket_server'; +export { OrderWatcherWebSocketServer } from './order_watcher/order_watcher_web_socket_server'; export { ExpirationWatcher } from './order_watcher/expiration_watcher'; export { diff --git a/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts new file mode 100644 index 000000000..b75b07603 --- /dev/null +++ b/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts @@ -0,0 +1,200 @@ +import { ContractAddresses } from '@0x/contract-addresses'; +import { schemas } from '@0x/json-schemas'; +import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Provider } from 'ethereum-types'; +import * as http from 'http'; +import * as WebSocket from 'websocket'; + +import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types'; +import { assert } from '../utils/assert'; + +import { OrderWatcher } from './order_watcher'; + +const DEFAULT_HTTP_PORT = 8080; +const JSON_RPC_VERSION = '2.0'; + +// Wraps the OrderWatcher functionality in a WebSocket server. Motivations: +// 1) Users can watch orders via non-typescript programs. +// 2) Better encapsulation so that users can work +export class OrderWatcherWebSocketServer { + private readonly _orderWatcher: OrderWatcher; + private readonly _httpServer: http.Server; + private readonly _connectionStore: Set; + private readonly _wsServer: WebSocket.server; + private readonly _isVerbose: boolean; + /** + * Recover types lost when the payload is stringified. + */ + private static _parseSignedOrder(rawRequest: any): SignedOrder { + const bigNumberFields = [ + 'salt', + 'makerFee', + 'takerFee', + 'makerAssetAmount', + 'takerAssetAmount', + 'expirationTimeSeconds', + ]; + for (const field of bigNumberFields) { + rawRequest[field] = new BigNumber(rawRequest[field]); + } + return rawRequest; + } + + /** + * Instantiate a new WebSocket server which provides OrderWatcher functionality + * @param provider Web3 provider to use for JSON RPC calls. + * @param networkId NetworkId to watch orders on. + * @param contractAddresses Optional contract addresses. Defaults to known + * addresses based on networkId. + * @param orderWatcherConfig OrderWatcher configurations. isVerbose sets the verbosity for the WebSocket server aswell. + * @param isVerbose Whether to enable verbose logging. Defaults to true. + */ + constructor( + provider: Provider, + networkId: number, + contractAddresses?: ContractAddresses, + orderWatcherConfig?: Partial, + ) { + this._isVerbose = + orderWatcherConfig !== undefined && orderWatcherConfig.isVerbose !== undefined + ? orderWatcherConfig.isVerbose + : true; + this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, orderWatcherConfig); + this._connectionStore = new Set(); + this._httpServer = http.createServer(); + this._wsServer = new WebSocket.server({ + httpServer: this._httpServer, + // Avoid setting autoAcceptConnections to true as it defeats all + // standard cross-origin protection facilities built into the protocol + // and the browser. + // Source: https://www.npmjs.com/package/websocket#server-example + // Also ensures that a request event is emitted by + // the server whenever a new WebSocket request is made. + autoAcceptConnections: false, + }); + + this._wsServer.on('request', async (request: any) => { + // Designed for usage pattern where client and server are run on the same + // machine by the same user. As such, no security checks are in place. + const connection: WebSocket.connection = request.accept(null, request.origin); + this._log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); + connection.on('message', this._onMessageCallbackAsync.bind(this, connection)); + connection.on('close', this._onCloseCallback.bind(this, connection)); + this._connectionStore.add(connection); + }); + } + + /** + * Activates the WebSocket server by subscribing to the OrderWatcher and + * starting the WebSocket's HTTP server + */ + public start(): void { + // Have the WebSocket server subscribe to the OrderWatcher to receive updates. + // These updates are then broadcast to clients in the _connectionStore. + this._orderWatcher.subscribe(this._broadcastCallback.bind(this)); + + const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; + this._httpServer.listen(port, () => { + this._log(`${new Date()} [Server] Listening on port ${port}`); + }); + } + + /** + * Deactivates the WebSocket server by stopping the HTTP server from accepting + * new connections and unsubscribing from the OrderWatcher + */ + public stop(): void { + this._httpServer.close(); + this._orderWatcher.unsubscribe(); + } + + private _log(...args: any[]): void { + if (this._isVerbose) { + logUtils.log(...args); + } + } + + private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { + let response: WebSocketResponse; + let id: number | null = null; + try { + assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema); + const request: WebSocketRequest = JSON.parse(message.utf8Data); + id = request.id; + assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema); + assert.isString(request.jsonrpc, JSON_RPC_VERSION); + response = { + id, + jsonrpc: JSON_RPC_VERSION, + method: request.method, + result: await this._routeRequestAsync(request), + }; + } catch (err) { + response = { + id, + jsonrpc: JSON_RPC_VERSION, + method: null, + error: err.toString(), + }; + } + this._log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); + connection.sendUTF(JSON.stringify(response)); + } + + private _onCloseCallback(connection: WebSocket.connection): void { + this._connectionStore.delete(connection); + this._log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); + } + + private async _routeRequestAsync(request: WebSocketRequest): Promise { + this._log(`${new Date()} [Server] Request received: ${request.method}`); + switch (request.method) { + case OrderWatcherMethod.AddOrder: { + const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder( + request.params.signedOrder, + ); + await this._orderWatcher.addOrderAsync(signedOrder); + break; + } + case OrderWatcherMethod.RemoveOrder: { + this._orderWatcher.removeOrder(request.params.orderHash || 'undefined'); + break; + } + case OrderWatcherMethod.GetStats: { + return this._orderWatcher.getStats(); + } + default: + // Should never reach here. Should be caught by JSON schema check. + throw new Error(`Unexpected default case hit for request.method`); + } + return undefined; + } + + /** + * Broadcasts OrderState changes to ALL connected clients. At the moment, + * we do not support clients subscribing to only a subset of orders. As such, + * Client B will be notified of changes to an order that Client A added. + */ + private _broadcastCallback(err: Error | null, orderState?: OrderStateValid | OrderStateInvalid | undefined): void { + const method = OrderWatcherMethod.Update; + const response = + err === null + ? { + jsonrpc: JSON_RPC_VERSION, + method, + result: orderState, + } + : { + jsonrpc: JSON_RPC_VERSION, + method, + error: { + code: -32000, + message: err.message, + }, + }; + this._connectionStore.forEach((connection: WebSocket.connection) => { + connection.sendUTF(JSON.stringify(response)); + }); + } +} diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts deleted file mode 100644 index b75b07603..000000000 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts +++ /dev/null @@ -1,200 +0,0 @@ -import { ContractAddresses } from '@0x/contract-addresses'; -import { schemas } from '@0x/json-schemas'; -import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; -import { BigNumber, logUtils } from '@0x/utils'; -import { Provider } from 'ethereum-types'; -import * as http from 'http'; -import * as WebSocket from 'websocket'; - -import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types'; -import { assert } from '../utils/assert'; - -import { OrderWatcher } from './order_watcher'; - -const DEFAULT_HTTP_PORT = 8080; -const JSON_RPC_VERSION = '2.0'; - -// Wraps the OrderWatcher functionality in a WebSocket server. Motivations: -// 1) Users can watch orders via non-typescript programs. -// 2) Better encapsulation so that users can work -export class OrderWatcherWebSocketServer { - private readonly _orderWatcher: OrderWatcher; - private readonly _httpServer: http.Server; - private readonly _connectionStore: Set; - private readonly _wsServer: WebSocket.server; - private readonly _isVerbose: boolean; - /** - * Recover types lost when the payload is stringified. - */ - private static _parseSignedOrder(rawRequest: any): SignedOrder { - const bigNumberFields = [ - 'salt', - 'makerFee', - 'takerFee', - 'makerAssetAmount', - 'takerAssetAmount', - 'expirationTimeSeconds', - ]; - for (const field of bigNumberFields) { - rawRequest[field] = new BigNumber(rawRequest[field]); - } - return rawRequest; - } - - /** - * Instantiate a new WebSocket server which provides OrderWatcher functionality - * @param provider Web3 provider to use for JSON RPC calls. - * @param networkId NetworkId to watch orders on. - * @param contractAddresses Optional contract addresses. Defaults to known - * addresses based on networkId. - * @param orderWatcherConfig OrderWatcher configurations. isVerbose sets the verbosity for the WebSocket server aswell. - * @param isVerbose Whether to enable verbose logging. Defaults to true. - */ - constructor( - provider: Provider, - networkId: number, - contractAddresses?: ContractAddresses, - orderWatcherConfig?: Partial, - ) { - this._isVerbose = - orderWatcherConfig !== undefined && orderWatcherConfig.isVerbose !== undefined - ? orderWatcherConfig.isVerbose - : true; - this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, orderWatcherConfig); - this._connectionStore = new Set(); - this._httpServer = http.createServer(); - this._wsServer = new WebSocket.server({ - httpServer: this._httpServer, - // Avoid setting autoAcceptConnections to true as it defeats all - // standard cross-origin protection facilities built into the protocol - // and the browser. - // Source: https://www.npmjs.com/package/websocket#server-example - // Also ensures that a request event is emitted by - // the server whenever a new WebSocket request is made. - autoAcceptConnections: false, - }); - - this._wsServer.on('request', async (request: any) => { - // Designed for usage pattern where client and server are run on the same - // machine by the same user. As such, no security checks are in place. - const connection: WebSocket.connection = request.accept(null, request.origin); - this._log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); - connection.on('message', this._onMessageCallbackAsync.bind(this, connection)); - connection.on('close', this._onCloseCallback.bind(this, connection)); - this._connectionStore.add(connection); - }); - } - - /** - * Activates the WebSocket server by subscribing to the OrderWatcher and - * starting the WebSocket's HTTP server - */ - public start(): void { - // Have the WebSocket server subscribe to the OrderWatcher to receive updates. - // These updates are then broadcast to clients in the _connectionStore. - this._orderWatcher.subscribe(this._broadcastCallback.bind(this)); - - const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; - this._httpServer.listen(port, () => { - this._log(`${new Date()} [Server] Listening on port ${port}`); - }); - } - - /** - * Deactivates the WebSocket server by stopping the HTTP server from accepting - * new connections and unsubscribing from the OrderWatcher - */ - public stop(): void { - this._httpServer.close(); - this._orderWatcher.unsubscribe(); - } - - private _log(...args: any[]): void { - if (this._isVerbose) { - logUtils.log(...args); - } - } - - private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise { - let response: WebSocketResponse; - let id: number | null = null; - try { - assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema); - const request: WebSocketRequest = JSON.parse(message.utf8Data); - id = request.id; - assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema); - assert.isString(request.jsonrpc, JSON_RPC_VERSION); - response = { - id, - jsonrpc: JSON_RPC_VERSION, - method: request.method, - result: await this._routeRequestAsync(request), - }; - } catch (err) { - response = { - id, - jsonrpc: JSON_RPC_VERSION, - method: null, - error: err.toString(), - }; - } - this._log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); - connection.sendUTF(JSON.stringify(response)); - } - - private _onCloseCallback(connection: WebSocket.connection): void { - this._connectionStore.delete(connection); - this._log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); - } - - private async _routeRequestAsync(request: WebSocketRequest): Promise { - this._log(`${new Date()} [Server] Request received: ${request.method}`); - switch (request.method) { - case OrderWatcherMethod.AddOrder: { - const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder( - request.params.signedOrder, - ); - await this._orderWatcher.addOrderAsync(signedOrder); - break; - } - case OrderWatcherMethod.RemoveOrder: { - this._orderWatcher.removeOrder(request.params.orderHash || 'undefined'); - break; - } - case OrderWatcherMethod.GetStats: { - return this._orderWatcher.getStats(); - } - default: - // Should never reach here. Should be caught by JSON schema check. - throw new Error(`Unexpected default case hit for request.method`); - } - return undefined; - } - - /** - * Broadcasts OrderState changes to ALL connected clients. At the moment, - * we do not support clients subscribing to only a subset of orders. As such, - * Client B will be notified of changes to an order that Client A added. - */ - private _broadcastCallback(err: Error | null, orderState?: OrderStateValid | OrderStateInvalid | undefined): void { - const method = OrderWatcherMethod.Update; - const response = - err === null - ? { - jsonrpc: JSON_RPC_VERSION, - method, - result: orderState, - } - : { - jsonrpc: JSON_RPC_VERSION, - method, - error: { - code: -32000, - message: err.message, - }, - }; - this._connectionStore.forEach((connection: WebSocket.connection) => { - connection.sendUTF(JSON.stringify(response)); - }); - } -} diff --git a/packages/order-watcher/test/order_watcher_web_socket_server_test.ts b/packages/order-watcher/test/order_watcher_web_socket_server_test.ts new file mode 100644 index 000000000..fd388e907 --- /dev/null +++ b/packages/order-watcher/test/order_watcher_web_socket_server_test.ts @@ -0,0 +1,289 @@ +import { ContractWrappers } from '@0x/contract-wrappers'; +import { tokenUtils } from '@0x/contract-wrappers/lib/test/utils/token_utils'; +import { BlockchainLifecycle } from '@0x/dev-utils'; +import { FillScenarios } from '@0x/fill-scenarios'; +import { assetDataUtils, orderHashUtils } from '@0x/order-utils'; +import { ExchangeContractErrs, OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import * as chai from 'chai'; +import 'mocha'; +import * as WebSocket from 'websocket'; + +import { OrderWatcherWebSocketServer } from '../src/order_watcher/order_watcher_web_socket_server'; +import { AddOrderRequest, OrderWatcherMethod, RemoveOrderRequest } from '../src/types'; + +import { chaiSetup } from './utils/chai_setup'; +import { constants } from './utils/constants'; +import { migrateOnceAsync } from './utils/migrate'; +import { provider, web3Wrapper } from './utils/web3_wrapper'; + +chaiSetup.configure(); +const expect = chai.expect; +const blockchainLifecycle = new BlockchainLifecycle(web3Wrapper); + +interface WsMessage { + data: string; +} + +describe('OrderWatcherWebSocketServer', async () => { + let contractWrappers: ContractWrappers; + let wsServer: OrderWatcherWebSocketServer; + let wsClient: WebSocket.w3cwebsocket; + let wsClientTwo: WebSocket.w3cwebsocket; + let fillScenarios: FillScenarios; + let userAddresses: string[]; + let makerAssetData: string; + let takerAssetData: string; + let makerTokenAddress: string; + let takerTokenAddress: string; + let makerAddress: string; + let takerAddress: string; + let zrxTokenAddress: string; + let signedOrder: SignedOrder; + let orderHash: string; + let addOrderPayload: AddOrderRequest; + let removeOrderPayload: RemoveOrderRequest; + const decimals = constants.ZRX_DECIMALS; + const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); + // HACK: createFillableSignedOrderAsync is Promise-based, which forces us + // to use Promises instead of the done() callbacks for tests. + // onmessage callback must thus be wrapped as a Promise. + const _onMessageAsync = async (client: WebSocket.w3cwebsocket) => + new Promise(resolve => { + client.onmessage = (msg: WsMessage) => resolve(msg); + }); + + before(async () => { + // Set up constants + const contractAddresses = await migrateOnceAsync(); + await blockchainLifecycle.startAsync(); + const networkId = constants.TESTRPC_NETWORK_ID; + const config = { + networkId, + contractAddresses, + }; + contractWrappers = new ContractWrappers(provider, config); + userAddresses = await web3Wrapper.getAvailableAddressesAsync(); + zrxTokenAddress = contractAddresses.zrxToken; + [makerAddress, takerAddress] = userAddresses; + [makerTokenAddress, takerTokenAddress] = tokenUtils.getDummyERC20TokenAddresses(); + [makerAssetData, takerAssetData] = [ + assetDataUtils.encodeERC20AssetData(makerTokenAddress), + assetDataUtils.encodeERC20AssetData(takerTokenAddress), + ]; + fillScenarios = new FillScenarios( + provider, + userAddresses, + zrxTokenAddress, + contractAddresses.exchange, + contractAddresses.erc20Proxy, + contractAddresses.erc721Proxy, + ); + signedOrder = await fillScenarios.createFillableSignedOrderAsync( + makerAssetData, + takerAssetData, + makerAddress, + takerAddress, + fillableAmount, + ); + orderHash = orderHashUtils.getOrderHashHex(signedOrder); + addOrderPayload = { + id: 1, + jsonrpc: '2.0', + method: OrderWatcherMethod.AddOrder, + params: { signedOrder }, + }; + removeOrderPayload = { + id: 1, + jsonrpc: '2.0', + method: OrderWatcherMethod.RemoveOrder, + params: { orderHash }, + }; + + // Prepare OrderWatcher WebSocket server + const orderWatcherConfig = { + isVerbose: true, + }; + wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); + wsServer.start(); + }); + after(async () => { + await blockchainLifecycle.revertAsync(); + wsServer.stop(); + }); + beforeEach(async () => { + await blockchainLifecycle.startAsync(); + wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); + logUtils.log(`${new Date()} [Client] Connected.`); + }); + afterEach(async () => { + await blockchainLifecycle.revertAsync(); + wsClient.close(); + logUtils.log(`${new Date()} [Client] Closed.`); + }); + + it('responds to getStats requests correctly', (done: any) => { + const payload = { + id: 1, + jsonrpc: '2.0', + method: 'GET_STATS', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); + wsClient.onmessage = (msg: any) => { + const responseData = JSON.parse(msg.data); + expect(responseData.id).to.be.eq(1); + expect(responseData.jsonrpc).to.be.eq('2.0'); + expect(responseData.method).to.be.eq('GET_STATS'); + expect(responseData.result.orderCount).to.be.eq(0); + done(); + }; + }); + + it('throws an error when an invalid method is attempted', async () => { + const invalidMethodPayload = { + id: 1, + jsonrpc: '2.0', + method: 'BAD_METHOD', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(invalidMethodPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.id).to.be.null; + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.jsonrpc).to.be.eq('2.0'); + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); + }); + + it('throws an error when jsonrpc field missing from request', async () => { + const noJsonRpcPayload = { + id: 1, + method: 'GET_STATS', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(noJsonRpcPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.jsonrpc).to.be.eq('2.0'); + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); + }); + + it('throws an error when we try to add an order without a signedOrder', async () => { + const noSignedOrderAddOrderPayload = { + id: 1, + jsonrpc: '2.0', + method: 'ADD_ORDER', + orderHash: '0x7337e2f2a9aa2ed6afe26edc2df7ad79c3ffa9cf9b81a964f707ea63f5272355', + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(noSignedOrderAddOrderPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.id).to.be.null; + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.jsonrpc).to.be.eq('2.0'); + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); + }); + + it('throws an error when we try to add a bad signedOrder', async () => { + const invalidAddOrderPayload = { + id: 1, + jsonrpc: '2.0', + method: 'ADD_ORDER', + signedOrder: { + makerAddress: '0x0', + }, + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(invalidAddOrderPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + // tslint:disable-next-line:no-unused-expression + expect(errorData.id).to.be.null; + // tslint:disable-next-line:no-unused-expression + expect(errorData.method).to.be.null; + expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); + }); + + it('executes addOrder and removeOrder requests correctly', async () => { + wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); + const addOrderMsg = await _onMessageAsync(wsClient); + const addOrderData = JSON.parse(addOrderMsg.data); + expect(addOrderData.method).to.be.eq('ADD_ORDER'); + expect((wsServer as any)._orderWatcher._orderByOrderHash).to.deep.include({ + [orderHash]: signedOrder, + }); + + wsClient.send(JSON.stringify(removeOrderPayload)); + const removeOrderMsg = await _onMessageAsync(wsClient); + const removeOrderData = JSON.parse(removeOrderMsg.data); + expect(removeOrderData.method).to.be.eq('REMOVE_ORDER'); + expect((wsServer as any)._orderWatcher._orderByOrderHash).to.not.deep.include({ + [orderHash]: signedOrder, + }); + }); + + it('broadcasts orderStateInvalid message when makerAddress allowance set to 0 for watched order', async () => { + // Add the regular order + wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); + await _onMessageAsync(wsClient); + + // Set the allowance to 0 + await contractWrappers.erc20Token.setProxyAllowanceAsync(makerTokenAddress, makerAddress, new BigNumber(0)); + + // Ensure that orderStateInvalid message is received. + const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); + const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); + expect(orderWatcherUpdateData.method).to.be.eq('UPDATE'); + const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; + expect(invalidOrderState.isValid).to.be.false(); + expect(invalidOrderState.orderHash).to.be.eq(orderHash); + expect(invalidOrderState.error).to.be.eq(ExchangeContractErrs.InsufficientMakerAllowance); + }); + + it('broadcasts to multiple clients when an order backing ZRX allowance changes', async () => { + // Prepare order + const makerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(2), decimals); + const takerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(0), decimals); + const nonZeroMakerFeeSignedOrder = await fillScenarios.createFillableSignedOrderWithFeesAsync( + makerAssetData, + takerAssetData, + makerFee, + takerFee, + makerAddress, + takerAddress, + fillableAmount, + takerAddress, + ); + const nonZeroMakerFeeOrderPayload = { + id: 1, + jsonrpc: '2.0', + method: 'ADD_ORDER', + signedOrder: nonZeroMakerFeeSignedOrder, + }; + + // Set up a second client and have it add the order + wsClientTwo = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); + logUtils.log(`${new Date()} [Client] Connected.`); + wsClientTwo.onopen = () => wsClientTwo.send(JSON.stringify(nonZeroMakerFeeOrderPayload)); + await _onMessageAsync(wsClientTwo); + + // Change the allowance + await contractWrappers.erc20Token.setProxyAllowanceAsync(zrxTokenAddress, makerAddress, new BigNumber(0)); + + // Check that both clients receive the emitted event + for (const client of [wsClient, wsClientTwo]) { + const updateMsg = await _onMessageAsync(client); + const updateData = JSON.parse(updateMsg.data); + const orderState = updateData.result as OrderStateValid; + expect(orderState.isValid).to.be.true(); + expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); + } + + wsClientTwo.close(); + logUtils.log(`${new Date()} [Client] Closed.`); + }); +}); diff --git a/packages/order-watcher/test/order_watcher_websocket_server_test.ts b/packages/order-watcher/test/order_watcher_websocket_server_test.ts deleted file mode 100644 index a66d2c6c2..000000000 --- a/packages/order-watcher/test/order_watcher_websocket_server_test.ts +++ /dev/null @@ -1,289 +0,0 @@ -import { ContractWrappers } from '@0x/contract-wrappers'; -import { tokenUtils } from '@0x/contract-wrappers/lib/test/utils/token_utils'; -import { BlockchainLifecycle } from '@0x/dev-utils'; -import { FillScenarios } from '@0x/fill-scenarios'; -import { assetDataUtils, orderHashUtils } from '@0x/order-utils'; -import { ExchangeContractErrs, OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; -import { BigNumber, logUtils } from '@0x/utils'; -import { Web3Wrapper } from '@0x/web3-wrapper'; -import * as chai from 'chai'; -import 'mocha'; -import * as WebSocket from 'websocket'; - -import { OrderWatcherWebSocketServer } from '../src/order_watcher/order_watcher_websocket_server'; -import { AddOrderRequest, OrderWatcherMethod, RemoveOrderRequest } from '../src/types'; - -import { chaiSetup } from './utils/chai_setup'; -import { constants } from './utils/constants'; -import { migrateOnceAsync } from './utils/migrate'; -import { provider, web3Wrapper } from './utils/web3_wrapper'; - -chaiSetup.configure(); -const expect = chai.expect; -const blockchainLifecycle = new BlockchainLifecycle(web3Wrapper); - -interface WsMessage { - data: string; -} - -describe('OrderWatcherWebSocketServer', async () => { - let contractWrappers: ContractWrappers; - let wsServer: OrderWatcherWebSocketServer; - let wsClient: WebSocket.w3cwebsocket; - let wsClientTwo: WebSocket.w3cwebsocket; - let fillScenarios: FillScenarios; - let userAddresses: string[]; - let makerAssetData: string; - let takerAssetData: string; - let makerTokenAddress: string; - let takerTokenAddress: string; - let makerAddress: string; - let takerAddress: string; - let zrxTokenAddress: string; - let signedOrder: SignedOrder; - let orderHash: string; - let addOrderPayload: AddOrderRequest; - let removeOrderPayload: RemoveOrderRequest; - const decimals = constants.ZRX_DECIMALS; - const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); - // HACK: createFillableSignedOrderAsync is Promise-based, which forces us - // to use Promises instead of the done() callbacks for tests. - // onmessage callback must thus be wrapped as a Promise. - const _onMessageAsync = async (client: WebSocket.w3cwebsocket) => - new Promise(resolve => { - client.onmessage = (msg: WsMessage) => resolve(msg); - }); - - before(async () => { - // Set up constants - const contractAddresses = await migrateOnceAsync(); - await blockchainLifecycle.startAsync(); - const networkId = constants.TESTRPC_NETWORK_ID; - const config = { - networkId, - contractAddresses, - }; - contractWrappers = new ContractWrappers(provider, config); - userAddresses = await web3Wrapper.getAvailableAddressesAsync(); - zrxTokenAddress = contractAddresses.zrxToken; - [makerAddress, takerAddress] = userAddresses; - [makerTokenAddress, takerTokenAddress] = tokenUtils.getDummyERC20TokenAddresses(); - [makerAssetData, takerAssetData] = [ - assetDataUtils.encodeERC20AssetData(makerTokenAddress), - assetDataUtils.encodeERC20AssetData(takerTokenAddress), - ]; - fillScenarios = new FillScenarios( - provider, - userAddresses, - zrxTokenAddress, - contractAddresses.exchange, - contractAddresses.erc20Proxy, - contractAddresses.erc721Proxy, - ); - signedOrder = await fillScenarios.createFillableSignedOrderAsync( - makerAssetData, - takerAssetData, - makerAddress, - takerAddress, - fillableAmount, - ); - orderHash = orderHashUtils.getOrderHashHex(signedOrder); - addOrderPayload = { - id: 1, - jsonrpc: '2.0', - method: OrderWatcherMethod.AddOrder, - params: { signedOrder }, - }; - removeOrderPayload = { - id: 1, - jsonrpc: '2.0', - method: OrderWatcherMethod.RemoveOrder, - params: { orderHash }, - }; - - // Prepare OrderWatcher WebSocket server - const orderWatcherConfig = { - isVerbose: true, - }; - wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); - wsServer.start(); - }); - after(async () => { - await blockchainLifecycle.revertAsync(); - wsServer.stop(); - }); - beforeEach(async () => { - await blockchainLifecycle.startAsync(); - wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); - logUtils.log(`${new Date()} [Client] Connected.`); - }); - afterEach(async () => { - await blockchainLifecycle.revertAsync(); - wsClient.close(); - logUtils.log(`${new Date()} [Client] Closed.`); - }); - - it('responds to getStats requests correctly', (done: any) => { - const payload = { - id: 1, - jsonrpc: '2.0', - method: 'GET_STATS', - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); - wsClient.onmessage = (msg: any) => { - const responseData = JSON.parse(msg.data); - expect(responseData.id).to.be.eq(1); - expect(responseData.jsonrpc).to.be.eq('2.0'); - expect(responseData.method).to.be.eq('GET_STATS'); - expect(responseData.result.orderCount).to.be.eq(0); - done(); - }; - }); - - it('throws an error when an invalid method is attempted', async () => { - const invalidMethodPayload = { - id: 1, - jsonrpc: '2.0', - method: 'BAD_METHOD', - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(invalidMethodPayload)); - const errorMsg = await _onMessageAsync(wsClient); - const errorData = JSON.parse(errorMsg.data); - // tslint:disable-next-line:no-unused-expression - expect(errorData.id).to.be.null; - // tslint:disable-next-line:no-unused-expression - expect(errorData.method).to.be.null; - expect(errorData.jsonrpc).to.be.eq('2.0'); - expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); - }); - - it('throws an error when jsonrpc field missing from request', async () => { - const noJsonRpcPayload = { - id: 1, - method: 'GET_STATS', - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(noJsonRpcPayload)); - const errorMsg = await _onMessageAsync(wsClient); - const errorData = JSON.parse(errorMsg.data); - // tslint:disable-next-line:no-unused-expression - expect(errorData.method).to.be.null; - expect(errorData.jsonrpc).to.be.eq('2.0'); - expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); - }); - - it('throws an error when we try to add an order without a signedOrder', async () => { - const noSignedOrderAddOrderPayload = { - id: 1, - jsonrpc: '2.0', - method: 'ADD_ORDER', - orderHash: '0x7337e2f2a9aa2ed6afe26edc2df7ad79c3ffa9cf9b81a964f707ea63f5272355', - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(noSignedOrderAddOrderPayload)); - const errorMsg = await _onMessageAsync(wsClient); - const errorData = JSON.parse(errorMsg.data); - // tslint:disable-next-line:no-unused-expression - expect(errorData.id).to.be.null; - // tslint:disable-next-line:no-unused-expression - expect(errorData.method).to.be.null; - expect(errorData.jsonrpc).to.be.eq('2.0'); - expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); - }); - - it('throws an error when we try to add a bad signedOrder', async () => { - const invalidAddOrderPayload = { - id: 1, - jsonrpc: '2.0', - method: 'ADD_ORDER', - signedOrder: { - makerAddress: '0x0', - }, - }; - wsClient.onopen = () => wsClient.send(JSON.stringify(invalidAddOrderPayload)); - const errorMsg = await _onMessageAsync(wsClient); - const errorData = JSON.parse(errorMsg.data); - // tslint:disable-next-line:no-unused-expression - expect(errorData.id).to.be.null; - // tslint:disable-next-line:no-unused-expression - expect(errorData.method).to.be.null; - expect(errorData.error).to.match(/^Error: Expected request to conform to schema/); - }); - - it('executes addOrder and removeOrder requests correctly', async () => { - wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); - const addOrderMsg = await _onMessageAsync(wsClient); - const addOrderData = JSON.parse(addOrderMsg.data); - expect(addOrderData.method).to.be.eq('ADD_ORDER'); - expect((wsServer as any)._orderWatcher._orderByOrderHash).to.deep.include({ - [orderHash]: signedOrder, - }); - - wsClient.send(JSON.stringify(removeOrderPayload)); - const removeOrderMsg = await _onMessageAsync(wsClient); - const removeOrderData = JSON.parse(removeOrderMsg.data); - expect(removeOrderData.method).to.be.eq('REMOVE_ORDER'); - expect((wsServer as any)._orderWatcher._orderByOrderHash).to.not.deep.include({ - [orderHash]: signedOrder, - }); - }); - - it('broadcasts orderStateInvalid message when makerAddress allowance set to 0 for watched order', async () => { - // Add the regular order - wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); - await _onMessageAsync(wsClient); - - // Set the allowance to 0 - await contractWrappers.erc20Token.setProxyAllowanceAsync(makerTokenAddress, makerAddress, new BigNumber(0)); - - // Ensure that orderStateInvalid message is received. - const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); - const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); - expect(orderWatcherUpdateData.method).to.be.eq('UPDATE'); - const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; - expect(invalidOrderState.isValid).to.be.false(); - expect(invalidOrderState.orderHash).to.be.eq(orderHash); - expect(invalidOrderState.error).to.be.eq(ExchangeContractErrs.InsufficientMakerAllowance); - }); - - it('broadcasts to multiple clients when an order backing ZRX allowance changes', async () => { - // Prepare order - const makerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(2), decimals); - const takerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(0), decimals); - const nonZeroMakerFeeSignedOrder = await fillScenarios.createFillableSignedOrderWithFeesAsync( - makerAssetData, - takerAssetData, - makerFee, - takerFee, - makerAddress, - takerAddress, - fillableAmount, - takerAddress, - ); - const nonZeroMakerFeeOrderPayload = { - id: 1, - jsonrpc: '2.0', - method: 'ADD_ORDER', - signedOrder: nonZeroMakerFeeSignedOrder, - }; - - // Set up a second client and have it add the order - wsClientTwo = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); - logUtils.log(`${new Date()} [Client] Connected.`); - wsClientTwo.onopen = () => wsClientTwo.send(JSON.stringify(nonZeroMakerFeeOrderPayload)); - await _onMessageAsync(wsClientTwo); - - // Change the allowance - await contractWrappers.erc20Token.setProxyAllowanceAsync(zrxTokenAddress, makerAddress, new BigNumber(0)); - - // Check that both clients receive the emitted event - for (const client of [wsClient, wsClientTwo]) { - const updateMsg = await _onMessageAsync(client); - const updateData = JSON.parse(updateMsg.data); - const orderState = updateData.result as OrderStateValid; - expect(orderState.isValid).to.be.true(); - expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); - } - - wsClientTwo.close(); - logUtils.log(`${new Date()} [Client] Closed.`); - }); -}); -- cgit v1.2.3 From e295eeb8938468b1527d5d81f212766cef40bc81 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Tue, 18 Dec 2018 16:25:26 +0000 Subject: Remove unused file --- packages/order-watcher/src/schemas/websocket_schemas.ts | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 packages/order-watcher/src/schemas/websocket_schemas.ts (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/src/schemas/websocket_schemas.ts b/packages/order-watcher/src/schemas/websocket_schemas.ts deleted file mode 100644 index df54a38e1..000000000 --- a/packages/order-watcher/src/schemas/websocket_schemas.ts +++ /dev/null @@ -1,2 +0,0 @@ -// TODO: Move these schemas to the `json-schemas` package and convert to JSON -// Rename to `OrderWatcherWebSocketRequestSchema`, etc... -- cgit v1.2.3 From e2510ed28f97feb33404ec0cb773214236620343 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Wed, 19 Dec 2018 10:44:32 +0000 Subject: Add temporary console.log to test failing on CI --- packages/order-watcher/test/order_watcher_web_socket_server_test.ts | 1 + 1 file changed, 1 insertion(+) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/test/order_watcher_web_socket_server_test.ts b/packages/order-watcher/test/order_watcher_web_socket_server_test.ts index fd388e907..fa64ac305 100644 --- a/packages/order-watcher/test/order_watcher_web_socket_server_test.ts +++ b/packages/order-watcher/test/order_watcher_web_socket_server_test.ts @@ -278,6 +278,7 @@ describe('OrderWatcherWebSocketServer', async () => { for (const client of [wsClient, wsClientTwo]) { const updateMsg = await _onMessageAsync(client); const updateData = JSON.parse(updateMsg.data); + console.log('-------------------------- UPDATE_DATA: ', updateData); const orderState = updateData.result as OrderStateValid; expect(orderState.isValid).to.be.true(); expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); -- cgit v1.2.3 From 84c8b83694cc16ed42cb01315803f124c582aab7 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Wed, 19 Dec 2018 12:18:53 +0000 Subject: Fix WS tests to remove race-condition and be more specific about the message expected --- .../test/order_watcher_web_socket_server_test.ts | 57 +++++++++++++--------- 1 file changed, 34 insertions(+), 23 deletions(-) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/test/order_watcher_web_socket_server_test.ts b/packages/order-watcher/test/order_watcher_web_socket_server_test.ts index fa64ac305..8070860e7 100644 --- a/packages/order-watcher/test/order_watcher_web_socket_server_test.ts +++ b/packages/order-watcher/test/order_watcher_web_socket_server_test.ts @@ -26,7 +26,7 @@ interface WsMessage { data: string; } -describe('OrderWatcherWebSocketServer', async () => { +describe.only('OrderWatcherWebSocketServer', async () => { let contractWrappers: ContractWrappers; let wsServer: OrderWatcherWebSocketServer; let wsClient: WebSocket.w3cwebsocket; @@ -49,9 +49,14 @@ describe('OrderWatcherWebSocketServer', async () => { // HACK: createFillableSignedOrderAsync is Promise-based, which forces us // to use Promises instead of the done() callbacks for tests. // onmessage callback must thus be wrapped as a Promise. - const _onMessageAsync = async (client: WebSocket.w3cwebsocket) => + const _getOnMessagePromise = async (client: WebSocket.w3cwebsocket, method: string | null) => new Promise(resolve => { - client.onmessage = (msg: WsMessage) => resolve(msg); + client.onmessage = (msg: WsMessage) => { + const data = JSON.parse(msg.data); + if (data.method === method) { + resolve(msg); + } + }; }); before(async () => { @@ -106,20 +111,20 @@ describe('OrderWatcherWebSocketServer', async () => { isVerbose: true, }; wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); - wsServer.start(); }); after(async () => { await blockchainLifecycle.revertAsync(); - wsServer.stop(); }); beforeEach(async () => { + wsServer.start(); await blockchainLifecycle.startAsync(); wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); logUtils.log(`${new Date()} [Client] Connected.`); }); afterEach(async () => { - await blockchainLifecycle.revertAsync(); wsClient.close(); + await blockchainLifecycle.revertAsync(); + wsServer.stop(); logUtils.log(`${new Date()} [Client] Closed.`); }); @@ -147,7 +152,7 @@ describe('OrderWatcherWebSocketServer', async () => { method: 'BAD_METHOD', }; wsClient.onopen = () => wsClient.send(JSON.stringify(invalidMethodPayload)); - const errorMsg = await _onMessageAsync(wsClient); + const errorMsg = await _getOnMessagePromise(wsClient, null); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression expect(errorData.id).to.be.null; @@ -163,7 +168,7 @@ describe('OrderWatcherWebSocketServer', async () => { method: 'GET_STATS', }; wsClient.onopen = () => wsClient.send(JSON.stringify(noJsonRpcPayload)); - const errorMsg = await _onMessageAsync(wsClient); + const errorMsg = await _getOnMessagePromise(wsClient, null); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression expect(errorData.method).to.be.null; @@ -179,7 +184,7 @@ describe('OrderWatcherWebSocketServer', async () => { orderHash: '0x7337e2f2a9aa2ed6afe26edc2df7ad79c3ffa9cf9b81a964f707ea63f5272355', }; wsClient.onopen = () => wsClient.send(JSON.stringify(noSignedOrderAddOrderPayload)); - const errorMsg = await _onMessageAsync(wsClient); + const errorMsg = await _getOnMessagePromise(wsClient, null); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression expect(errorData.id).to.be.null; @@ -199,7 +204,7 @@ describe('OrderWatcherWebSocketServer', async () => { }, }; wsClient.onopen = () => wsClient.send(JSON.stringify(invalidAddOrderPayload)); - const errorMsg = await _onMessageAsync(wsClient); + const errorMsg = await _getOnMessagePromise(wsClient, null); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression expect(errorData.id).to.be.null; @@ -210,15 +215,16 @@ describe('OrderWatcherWebSocketServer', async () => { it('executes addOrder and removeOrder requests correctly', async () => { wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); - const addOrderMsg = await _onMessageAsync(wsClient); + const addOrderMsg = await _getOnMessagePromise(wsClient, OrderWatcherMethod.AddOrder); const addOrderData = JSON.parse(addOrderMsg.data); expect(addOrderData.method).to.be.eq('ADD_ORDER'); expect((wsServer as any)._orderWatcher._orderByOrderHash).to.deep.include({ [orderHash]: signedOrder, }); + const clientOnMessagePromise = _getOnMessagePromise(wsClient, OrderWatcherMethod.RemoveOrder); wsClient.send(JSON.stringify(removeOrderPayload)); - const removeOrderMsg = await _onMessageAsync(wsClient); + const removeOrderMsg = await clientOnMessagePromise; const removeOrderData = JSON.parse(removeOrderMsg.data); expect(removeOrderData.method).to.be.eq('REMOVE_ORDER'); expect((wsServer as any)._orderWatcher._orderByOrderHash).to.not.deep.include({ @@ -229,13 +235,13 @@ describe('OrderWatcherWebSocketServer', async () => { it('broadcasts orderStateInvalid message when makerAddress allowance set to 0 for watched order', async () => { // Add the regular order wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); - await _onMessageAsync(wsClient); + const clientOnMessagePromise = _getOnMessagePromise(wsClient, OrderWatcherMethod.Update); // Set the allowance to 0 await contractWrappers.erc20Token.setProxyAllowanceAsync(makerTokenAddress, makerAddress, new BigNumber(0)); // Ensure that orderStateInvalid message is received. - const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); + const orderWatcherUpdateMsg = await clientOnMessagePromise; const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); expect(orderWatcherUpdateData.method).to.be.eq('UPDATE'); const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; @@ -269,20 +275,25 @@ describe('OrderWatcherWebSocketServer', async () => { wsClientTwo = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); logUtils.log(`${new Date()} [Client] Connected.`); wsClientTwo.onopen = () => wsClientTwo.send(JSON.stringify(nonZeroMakerFeeOrderPayload)); - await _onMessageAsync(wsClientTwo); + + const clientOneOnMessagePromise = _getOnMessagePromise(wsClient, OrderWatcherMethod.Update); + const clientTwoOnMessagePromise = _getOnMessagePromise(wsClientTwo, OrderWatcherMethod.Update); // Change the allowance await contractWrappers.erc20Token.setProxyAllowanceAsync(zrxTokenAddress, makerAddress, new BigNumber(0)); // Check that both clients receive the emitted event - for (const client of [wsClient, wsClientTwo]) { - const updateMsg = await _onMessageAsync(client); - const updateData = JSON.parse(updateMsg.data); - console.log('-------------------------- UPDATE_DATA: ', updateData); - const orderState = updateData.result as OrderStateValid; - expect(orderState.isValid).to.be.true(); - expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); - } + let updateMsg = await clientOneOnMessagePromise; + let updateData = JSON.parse(updateMsg.data); + let orderState = updateData.result as OrderStateValid; + expect(orderState.isValid).to.be.true(); + expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); + + updateMsg = await clientTwoOnMessagePromise; + updateData = JSON.parse(updateMsg.data); + orderState = updateData.result as OrderStateValid; + expect(orderState.isValid).to.be.true(); + expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); wsClientTwo.close(); logUtils.log(`${new Date()} [Client] Closed.`); -- cgit v1.2.3 From 90ee70db23251bfc68c7d4235be1bf1c4c6e6a92 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Wed, 19 Dec 2018 12:49:51 +0000 Subject: Move onMessageAsync outside of tests and add comments --- .../test/order_watcher_web_socket_server_test.ts | 53 ++++++++++++---------- 1 file changed, 30 insertions(+), 23 deletions(-) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/test/order_watcher_web_socket_server_test.ts b/packages/order-watcher/test/order_watcher_web_socket_server_test.ts index 8070860e7..578e0de61 100644 --- a/packages/order-watcher/test/order_watcher_web_socket_server_test.ts +++ b/packages/order-watcher/test/order_watcher_web_socket_server_test.ts @@ -46,18 +46,6 @@ describe.only('OrderWatcherWebSocketServer', async () => { let removeOrderPayload: RemoveOrderRequest; const decimals = constants.ZRX_DECIMALS; const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); - // HACK: createFillableSignedOrderAsync is Promise-based, which forces us - // to use Promises instead of the done() callbacks for tests. - // onmessage callback must thus be wrapped as a Promise. - const _getOnMessagePromise = async (client: WebSocket.w3cwebsocket, method: string | null) => - new Promise(resolve => { - client.onmessage = (msg: WsMessage) => { - const data = JSON.parse(msg.data); - if (data.method === method) { - resolve(msg); - } - }; - }); before(async () => { // Set up constants @@ -152,7 +140,7 @@ describe.only('OrderWatcherWebSocketServer', async () => { method: 'BAD_METHOD', }; wsClient.onopen = () => wsClient.send(JSON.stringify(invalidMethodPayload)); - const errorMsg = await _getOnMessagePromise(wsClient, null); + const errorMsg = await onMessageAsync(wsClient, null); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression expect(errorData.id).to.be.null; @@ -168,7 +156,7 @@ describe.only('OrderWatcherWebSocketServer', async () => { method: 'GET_STATS', }; wsClient.onopen = () => wsClient.send(JSON.stringify(noJsonRpcPayload)); - const errorMsg = await _getOnMessagePromise(wsClient, null); + const errorMsg = await onMessageAsync(wsClient, null); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression expect(errorData.method).to.be.null; @@ -184,7 +172,7 @@ describe.only('OrderWatcherWebSocketServer', async () => { orderHash: '0x7337e2f2a9aa2ed6afe26edc2df7ad79c3ffa9cf9b81a964f707ea63f5272355', }; wsClient.onopen = () => wsClient.send(JSON.stringify(noSignedOrderAddOrderPayload)); - const errorMsg = await _getOnMessagePromise(wsClient, null); + const errorMsg = await onMessageAsync(wsClient, null); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression expect(errorData.id).to.be.null; @@ -204,7 +192,7 @@ describe.only('OrderWatcherWebSocketServer', async () => { }, }; wsClient.onopen = () => wsClient.send(JSON.stringify(invalidAddOrderPayload)); - const errorMsg = await _getOnMessagePromise(wsClient, null); + const errorMsg = await onMessageAsync(wsClient, null); const errorData = JSON.parse(errorMsg.data); // tslint:disable-next-line:no-unused-expression expect(errorData.id).to.be.null; @@ -215,14 +203,14 @@ describe.only('OrderWatcherWebSocketServer', async () => { it('executes addOrder and removeOrder requests correctly', async () => { wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); - const addOrderMsg = await _getOnMessagePromise(wsClient, OrderWatcherMethod.AddOrder); + const addOrderMsg = await onMessageAsync(wsClient, OrderWatcherMethod.AddOrder); const addOrderData = JSON.parse(addOrderMsg.data); expect(addOrderData.method).to.be.eq('ADD_ORDER'); expect((wsServer as any)._orderWatcher._orderByOrderHash).to.deep.include({ [orderHash]: signedOrder, }); - const clientOnMessagePromise = _getOnMessagePromise(wsClient, OrderWatcherMethod.RemoveOrder); + const clientOnMessagePromise = onMessageAsync(wsClient, OrderWatcherMethod.RemoveOrder); wsClient.send(JSON.stringify(removeOrderPayload)); const removeOrderMsg = await clientOnMessagePromise; const removeOrderData = JSON.parse(removeOrderMsg.data); @@ -235,12 +223,16 @@ describe.only('OrderWatcherWebSocketServer', async () => { it('broadcasts orderStateInvalid message when makerAddress allowance set to 0 for watched order', async () => { // Add the regular order wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); - const clientOnMessagePromise = _getOnMessagePromise(wsClient, OrderWatcherMethod.Update); + + // We register the onMessage callback before calling `setProxyAllowanceAsync` which we + // expect will cause a message to be emitted. We do now "await" here, since we want to + // check for messages _after_ calling `setProxyAllowanceAsync` + const clientOnMessagePromise = onMessageAsync(wsClient, OrderWatcherMethod.Update); // Set the allowance to 0 await contractWrappers.erc20Token.setProxyAllowanceAsync(makerTokenAddress, makerAddress, new BigNumber(0)); - // Ensure that orderStateInvalid message is received. + // We now await the `onMessage` promise to check for the message const orderWatcherUpdateMsg = await clientOnMessagePromise; const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); expect(orderWatcherUpdateData.method).to.be.eq('UPDATE'); @@ -276,13 +268,14 @@ describe.only('OrderWatcherWebSocketServer', async () => { logUtils.log(`${new Date()} [Client] Connected.`); wsClientTwo.onopen = () => wsClientTwo.send(JSON.stringify(nonZeroMakerFeeOrderPayload)); - const clientOneOnMessagePromise = _getOnMessagePromise(wsClient, OrderWatcherMethod.Update); - const clientTwoOnMessagePromise = _getOnMessagePromise(wsClientTwo, OrderWatcherMethod.Update); + // Setup the onMessage callbacks, but don't await them yet + const clientOneOnMessagePromise = onMessageAsync(wsClient, OrderWatcherMethod.Update); + const clientTwoOnMessagePromise = onMessageAsync(wsClientTwo, OrderWatcherMethod.Update); // Change the allowance await contractWrappers.erc20Token.setProxyAllowanceAsync(zrxTokenAddress, makerAddress, new BigNumber(0)); - // Check that both clients receive the emitted event + // Check that both clients receive the emitted event by awaiting the onMessageAsync promises let updateMsg = await clientOneOnMessagePromise; let updateData = JSON.parse(updateMsg.data); let orderState = updateData.result as OrderStateValid; @@ -299,3 +292,17 @@ describe.only('OrderWatcherWebSocketServer', async () => { logUtils.log(`${new Date()} [Client] Closed.`); }); }); + +// HACK: createFillableSignedOrderAsync is Promise-based, which forces us +// to use Promises instead of the done() callbacks for tests. +// onmessage callback must thus be wrapped as a Promise. +async function onMessageAsync(client: WebSocket.w3cwebsocket, method: string | null): Promise { + return new Promise(resolve => { + client.onmessage = (msg: WsMessage) => { + const data = JSON.parse(msg.data); + if (data.method === method) { + resolve(msg); + } + }; + }); +} -- cgit v1.2.3 From 5c24596d812a80011f9e92a13bf91923fbcc2a64 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Wed, 19 Dec 2018 13:21:06 +0000 Subject: Add missing CHANGELOG entry for OrderWatcher WS interface --- packages/order-watcher/CHANGELOG.json | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'packages/order-watcher') diff --git a/packages/order-watcher/CHANGELOG.json b/packages/order-watcher/CHANGELOG.json index c1fd8d4a9..304dc45fd 100644 --- a/packages/order-watcher/CHANGELOG.json +++ b/packages/order-watcher/CHANGELOG.json @@ -1,4 +1,14 @@ [ + { + "version": "2.3.0", + "changes": [ + { + "note": + "Added a WebSocket interface to OrderWatcher so that it can be used by a client written in any language", + "pr": 1427 + } + ] + }, { "version": "2.2.8", "changes": [ -- cgit v1.2.3