diff options
4 files changed, 420 insertions, 1 deletions
diff --git a/packages/order-watcher/package.json b/packages/order-watcher/package.json index 9a51203f4..cfaf5d724 100644 --- a/packages/order-watcher/package.json +++ b/packages/order-watcher/package.json @@ -74,7 +74,8 @@ "ethereum-types": "^1.1.2", "ethereumjs-blockstream": "6.0.0", "ethers": "~4.0.4", - "lodash": "^4.17.5" + "lodash": "^4.17.5", + "websocket": "^1.0.25" }, "publishConfig": { "access": "public" diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts new file mode 100644 index 000000000..84afc4000 --- /dev/null +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts @@ -0,0 +1,184 @@ +import { ContractAddresses } from '@0x/contract-addresses'; +import { SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Provider } from 'ethereum-types'; +import * as http from 'http'; +import * as WebSocket from 'websocket'; + +import { webSocketUtf8MessageSchema } from '../schemas/websocket_utf8_message_schema'; +import { OnOrderStateChangeCallback, OrderWatcherConfig } from '../types'; +import { assert } from '../utils/assert'; + +import { OrderWatcher } from './order_watcher'; + +const DEFAULT_HTTP_PORT = 8080; + +const enum OrderWatcherAction { + // Actions initiated by the user. + getStats = 'getStats', + addOrderAsync = 'addOrderAsync', + removeOrder = 'removeOrder', + // These are spontaneous; they are primarily orderstate changes. + orderWatcherUpdate = 'orderWatcherUpdate', + // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't + // need to expose them to the WebSocket server user because the user implicitly + // subscribes and unsubscribes by connecting and disconnecting from the server. +} + +// Users have to create a json object of this format and attach it to +// the data field of their WebSocket message to interact with this server. +interface WebSocketRequestData { + action: OrderWatcherAction; + params: any; +} + +// Users should expect a json object of this format in the data field +// of the WebSocket messages that this server sends out. +interface WebSocketResponseData { + action: OrderWatcherAction; + success: number; + result: any; +} + +// Wraps the OrderWatcher functionality in a WebSocket server. Motivations: +// 1) Users can watch orders via non-typescript programs. +// 2) Better encapsulation so that users can work +export class OrderWatcherWebSocketServer { + public httpServer: http.Server; + public readonly _orderWatcher: OrderWatcher; + private readonly _connectionStore: Set<WebSocket.connection>; + private readonly _wsServer: WebSocket.server; + /** + * Instantiate a new web socket server which provides OrderWatcher functionality + * @param provider Web3 provider to use for JSON RPC calls (for OrderWatcher) + * @param networkId NetworkId to watch orders on (for OrderWatcher) + * @param contractAddresses Optional contract addresses. Defaults to known + * addresses based on networkId (for OrderWatcher) + * @param partialConfig Optional configurations (for OrderWatcher) + */ + constructor( + provider: Provider, + networkId: number, + contractAddresses?: ContractAddresses, + partialConfig?: Partial<OrderWatcherConfig>, + ) { + this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); + this._connectionStore = new Set(); + this.httpServer = http.createServer(); + this._wsServer = new WebSocket.server({ + httpServer: this.httpServer, + autoAcceptConnections: false, + }); + + this._wsServer.on('request', (request: any) => { + // Designed for usage pattern where client and server are run on the same + // machine by the same user. As such, no security checks are in place. + const connection: WebSocket.connection = request.accept(null, request.origin); + logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); + connection.on('message', this._messageCallback.bind(this, connection)); + connection.on('close', this._closeCallback.bind(this, connection)); + this._connectionStore.add(connection); + }); + + // Have the WebSocket server subscribe to the OrderWatcher to receive updates. + // These updates are then broadcast to clients in the _connectionStore. + this._orderWatcher.subscribe(this._broadcastCallback); + } + + /** + * Activates the WebSocket server by having its HTTP server start listening. + */ + public listen(): void { + this.httpServer.listen(DEFAULT_HTTP_PORT, () => { + logUtils.log(`${new Date()} [Server] Listening on port ${DEFAULT_HTTP_PORT}`); + }); + } + + public close(): void { + this.httpServer.close(); + } + + private _messageCallback(connection: WebSocket.connection, message: any): void { + assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); + const requestData: WebSocketRequestData = JSON.parse(message.utf8Data); + const responseData = this._routeRequest(requestData); + logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(responseData)}`); + connection.sendUTF(JSON.stringify(responseData)); + } + + private _closeCallback(connection: WebSocket.connection): void { + this._connectionStore.delete(connection); + logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); + } + + private _routeRequest(requestData: WebSocketRequestData): WebSocketResponseData { + const responseData: WebSocketResponseData = { + action: requestData.action, + success: 0, + result: undefined, + }; + + try { + logUtils.log(`${new Date()} [Server] Request received: ${requestData.action}`); + switch (requestData.action) { + case 'addOrderAsync': { + const signedOrder: SignedOrder = this._parseSignedOrder(requestData); + // tslint:disable-next-line:no-floating-promises + this._orderWatcher.addOrderAsync(signedOrder); // Ok to fireNforget + break; + } + case 'removeOrder': { + this._orderWatcher.removeOrder(requestData.params.orderHash); + break; + } + case 'getStats': { + responseData.result = this._orderWatcher.getStats(); + break; + } + default: + throw new Error(`[Server] Invalid request action: ${requestData.action}`); + } + responseData.success = 1; + } catch (err) { + responseData.result = { error: err.toString() }; + } + return responseData; + } + + /** + * Broadcasts OrderState changes to ALL connected clients. At the moment, + * we do not support clients subscribing to only a subset of orders. As such, + * Client B will be notified of changes to an order that Client A added. + */ + private readonly _broadcastCallback: OnOrderStateChangeCallback = (err, orderState) => { + this._connectionStore.forEach((connection: WebSocket.connection) => { + const responseData: WebSocketResponseData = { + action: OrderWatcherAction.orderWatcherUpdate, + success: 1, + result: orderState || err, + }; + connection.sendUTF(JSON.stringify(responseData)); + }); + // tslint:disable-next-line:semicolon + }; // tslint thinks this is a class method, It's actally a property that holds a function. + + /** + * Recover types lost when the payload is stringified. + */ + private readonly _parseSignedOrder = (requestData: WebSocketRequestData) => { + const signedOrder = requestData.params.signedOrder; + const bigNumberFields = [ + 'salt', + 'makerFee', + 'takerFee', + 'makerAssetAmount', + 'takerAssetAmount', + 'expirationTimeSeconds', + ]; + for (const field of bigNumberFields) { + signedOrder[field] = new BigNumber(signedOrder[field]); + } + return signedOrder; + // tslint:disable-next-line:semicolon + }; // tslint thinks this is a class method, It's actally a property that holds a function. +} diff --git a/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts b/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts new file mode 100644 index 000000000..0a0eed407 --- /dev/null +++ b/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts @@ -0,0 +1,8 @@ +export const webSocketUtf8MessageSchema = { + id: '/WebSocketUtf8MessageSchema', + properties: { + utf8Data: { type: 'string' }, + }, + type: 'object', + required: ['utf8Data'], +}; diff --git a/packages/order-watcher/test/order_watcher_websocket_test.ts b/packages/order-watcher/test/order_watcher_websocket_test.ts new file mode 100644 index 000000000..e7b18e44b --- /dev/null +++ b/packages/order-watcher/test/order_watcher_websocket_test.ts @@ -0,0 +1,226 @@ +import { ContractWrappers } from '@0x/contract-wrappers'; +import { tokenUtils } from '@0x/contract-wrappers/lib/test/utils/token_utils'; +import { BlockchainLifecycle } from '@0x/dev-utils'; +import { FillScenarios } from '@0x/fill-scenarios'; +import { assetDataUtils, orderHashUtils } from '@0x/order-utils'; +import { ExchangeContractErrs, OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import * as chai from 'chai'; +import 'mocha'; +import * as WebSocket from 'websocket'; + +import { OrderWatcherWebSocketServer } from '../src/order_watcher/order_watcher_websocket'; + +import { chaiSetup } from './utils/chai_setup'; +import { constants } from './utils/constants'; +import { migrateOnceAsync } from './utils/migrate'; +import { provider, web3Wrapper } from './utils/web3_wrapper'; + +chaiSetup.configure(); +const expect = chai.expect; +const blockchainLifecycle = new BlockchainLifecycle(web3Wrapper); + +interface WsMessage { + data: string; +} + +describe.only('OrderWatcherWebSocket', async () => { + let contractWrappers: ContractWrappers; + let wsServer: OrderWatcherWebSocketServer; + let wsClient: WebSocket.w3cwebsocket; + let wsClientTwo: WebSocket.w3cwebsocket; + let fillScenarios: FillScenarios; + let userAddresses: string[]; + let makerAssetData: string; + let takerAssetData: string; + let makerTokenAddress: string; + let takerTokenAddress: string; + let makerAddress: string; + let takerAddress: string; + let zrxTokenAddress: string; + let signedOrder: SignedOrder; + let orderHash: string; + let addOrderPayload: { action: string; params: { signedOrder: SignedOrder } }; + let removeOrderPayload: { action: string; params: { orderHash: string } }; + const decimals = constants.ZRX_DECIMALS; + const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); + // createFillableSignedOrderAsync is Promise-based, which forces us + // to use Promises instead of the done() callbacks for tests. + // onmessage callback must thus be wrapped as a Promise. + const _onMessageAsync = async (client: WebSocket.w3cwebsocket) => + new Promise<WsMessage>(resolve => { + client.onmessage = (msg: WsMessage) => resolve(msg); + }); + + before(async () => { + // Set up constants + const contractAddresses = await migrateOnceAsync(); + await blockchainLifecycle.startAsync(); + const networkId = constants.TESTRPC_NETWORK_ID; + const config = { + networkId, + contractAddresses, + }; + contractWrappers = new ContractWrappers(provider, config); + userAddresses = await web3Wrapper.getAvailableAddressesAsync(); + zrxTokenAddress = contractAddresses.zrxToken; + [makerAddress, takerAddress] = userAddresses; + [makerTokenAddress, takerTokenAddress] = tokenUtils.getDummyERC20TokenAddresses(); + [makerAssetData, takerAssetData] = [ + assetDataUtils.encodeERC20AssetData(makerTokenAddress), + assetDataUtils.encodeERC20AssetData(takerTokenAddress), + ]; + fillScenarios = new FillScenarios( + provider, + userAddresses, + zrxTokenAddress, + contractAddresses.exchange, + contractAddresses.erc20Proxy, + contractAddresses.erc721Proxy, + ); + signedOrder = await fillScenarios.createFillableSignedOrderAsync( + makerAssetData, + takerAssetData, + makerAddress, + takerAddress, + fillableAmount, + ); + orderHash = orderHashUtils.getOrderHashHex(signedOrder); + addOrderPayload = { + action: 'addOrderAsync', + params: { signedOrder }, + }; + removeOrderPayload = { + action: 'removeOrder', + params: { orderHash }, + }; + + // Prepare OrderWatcher WebSocket server + const orderWatcherConfig = {}; + wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig); + wsServer.listen(); + }); + after(async () => { + await blockchainLifecycle.revertAsync(); + wsServer.close(); + }); + beforeEach(async () => { + await blockchainLifecycle.startAsync(); + wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); + logUtils.log(`${new Date()} [Client] Connected.`); + }); + afterEach(async () => { + await blockchainLifecycle.revertAsync(); + wsClient.close(); + logUtils.log(`${new Date()} [Client] Closed.`); + }); + + it('responds to getStats requests correctly', (done: any) => { + const payload = { + action: 'getStats', + params: {}, + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(payload)); + wsClient.onmessage = (msg: any) => { + const responseData = JSON.parse(msg.data); + expect(responseData.action).to.be.eq('getStats'); + expect(responseData.success).to.be.eq(1); + expect(responseData.result.orderCount).to.be.eq(0); + done(); + }; + }); + + it('throws an error when an invalid action is attempted', async () => { + const invalidActionPayload = { + action: 'badAction', + params: {}, + }; + wsClient.onopen = () => wsClient.send(JSON.stringify(invalidActionPayload)); + const errorMsg = await _onMessageAsync(wsClient); + const errorData = JSON.parse(errorMsg.data); + expect(errorData.action).to.be.eq('badAction'); + expect(errorData.success).to.be.eq(0); + expect(errorData.result.error).to.be.eq('Error: [Server] Invalid request action: badAction'); + }); + + it('executes addOrderAsync and removeOrder requests correctly', async () => { + wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); + const addOrderMsg = await _onMessageAsync(wsClient); + const addOrderData = JSON.parse(addOrderMsg.data); + expect(addOrderData.action).to.be.eq('addOrderAsync'); + expect(addOrderData.success).to.be.eq(1); + expect((wsServer._orderWatcher as any)._orderByOrderHash).to.deep.include({ + [orderHash]: signedOrder, + }); + + wsClient.send(JSON.stringify(removeOrderPayload)); + const removeOrderMsg = await _onMessageAsync(wsClient); + const removeOrderData = JSON.parse(removeOrderMsg.data); + expect(removeOrderData.action).to.be.eq('removeOrder'); + expect(removeOrderData.success).to.be.eq(1); + expect((wsServer._orderWatcher as any)._orderByOrderHash).to.not.deep.include({ + [orderHash]: signedOrder, + }); + }); + + it('broadcasts orderStateInvalid message when makerAddress allowance set to 0 for watched order', async () => { + // Add the regular order + wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload)); + await _onMessageAsync(wsClient); + + // Set the allowance to 0 + await contractWrappers.erc20Token.setProxyAllowanceAsync(makerTokenAddress, makerAddress, new BigNumber(0)); + + // Ensure that orderStateInvalid message is received. + const orderWatcherUpdateMsg = await _onMessageAsync(wsClient); + const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data); + expect(orderWatcherUpdateData.action).to.be.eq('orderWatcherUpdate'); + expect(orderWatcherUpdateData.success).to.be.eq(1); + const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid; + expect(invalidOrderState.isValid).to.be.false(); + expect(invalidOrderState.orderHash).to.be.eq(orderHash); + expect(invalidOrderState.error).to.be.eq(ExchangeContractErrs.InsufficientMakerAllowance); + }); + + it('broadcasts to multiple clients when an order backing ZRX allowance changes', async () => { + // Prepare order + const makerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(2), decimals); + const takerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(0), decimals); + const nonZeroMakerFeeSignedOrder = await fillScenarios.createFillableSignedOrderWithFeesAsync( + makerAssetData, + takerAssetData, + makerFee, + takerFee, + makerAddress, + takerAddress, + fillableAmount, + takerAddress, + ); + const nonZeroMakerFeeOrderPayload = { + action: 'addOrderAsync', + params: { nonZeroMakerFeeSignedOrder }, + }; + + // Set up a second client and have it add the order + wsClientTwo = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/'); + logUtils.log(`${new Date()} [Client] Connected.`); + wsClientTwo.onopen = () => wsClientTwo.send(JSON.stringify(nonZeroMakerFeeOrderPayload)); + await _onMessageAsync(wsClientTwo); + + // Change the allowance + await contractWrappers.erc20Token.setProxyAllowanceAsync(zrxTokenAddress, makerAddress, new BigNumber(0)); + + // Check that both clients receive the emitted event + for (const client of [wsClient, wsClientTwo]) { + const updateMsg = await _onMessageAsync(client); + const updateData = JSON.parse(updateMsg.data); + const orderState = updateData.result as OrderStateValid; + expect(orderState.isValid).to.be.true(); + expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0'); + } + + wsClientTwo.close(); + logUtils.log(`${new Date()} [Client] Closed.`); + }); +}); |