aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--packages/order-watcher/package.json3
-rw-r--r--packages/order-watcher/src/order_watcher/order_watcher_websocket.ts184
-rw-r--r--packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts8
-rw-r--r--packages/order-watcher/test/order_watcher_websocket_test.ts226
4 files changed, 420 insertions, 1 deletions
diff --git a/packages/order-watcher/package.json b/packages/order-watcher/package.json
index 9a51203f4..cfaf5d724 100644
--- a/packages/order-watcher/package.json
+++ b/packages/order-watcher/package.json
@@ -74,7 +74,8 @@
"ethereum-types": "^1.1.2",
"ethereumjs-blockstream": "6.0.0",
"ethers": "~4.0.4",
- "lodash": "^4.17.5"
+ "lodash": "^4.17.5",
+ "websocket": "^1.0.25"
},
"publishConfig": {
"access": "public"
diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts
new file mode 100644
index 000000000..84afc4000
--- /dev/null
+++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket.ts
@@ -0,0 +1,184 @@
+import { ContractAddresses } from '@0x/contract-addresses';
+import { SignedOrder } from '@0x/types';
+import { BigNumber, logUtils } from '@0x/utils';
+import { Provider } from 'ethereum-types';
+import * as http from 'http';
+import * as WebSocket from 'websocket';
+
+import { webSocketUtf8MessageSchema } from '../schemas/websocket_utf8_message_schema';
+import { OnOrderStateChangeCallback, OrderWatcherConfig } from '../types';
+import { assert } from '../utils/assert';
+
+import { OrderWatcher } from './order_watcher';
+
+const DEFAULT_HTTP_PORT = 8080;
+
+const enum OrderWatcherAction {
+ // Actions initiated by the user.
+ getStats = 'getStats',
+ addOrderAsync = 'addOrderAsync',
+ removeOrder = 'removeOrder',
+ // These are spontaneous; they are primarily orderstate changes.
+ orderWatcherUpdate = 'orderWatcherUpdate',
+ // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't
+ // need to expose them to the WebSocket server user because the user implicitly
+ // subscribes and unsubscribes by connecting and disconnecting from the server.
+}
+
+// Users have to create a json object of this format and attach it to
+// the data field of their WebSocket message to interact with this server.
+interface WebSocketRequestData {
+ action: OrderWatcherAction;
+ params: any;
+}
+
+// Users should expect a json object of this format in the data field
+// of the WebSocket messages that this server sends out.
+interface WebSocketResponseData {
+ action: OrderWatcherAction;
+ success: number;
+ result: any;
+}
+
+// Wraps the OrderWatcher functionality in a WebSocket server. Motivations:
+// 1) Users can watch orders via non-typescript programs.
+// 2) Better encapsulation so that users can work
+export class OrderWatcherWebSocketServer {
+ public httpServer: http.Server;
+ public readonly _orderWatcher: OrderWatcher;
+ private readonly _connectionStore: Set<WebSocket.connection>;
+ private readonly _wsServer: WebSocket.server;
+ /**
+ * Instantiate a new web socket server which provides OrderWatcher functionality
+ * @param provider Web3 provider to use for JSON RPC calls (for OrderWatcher)
+ * @param networkId NetworkId to watch orders on (for OrderWatcher)
+ * @param contractAddresses Optional contract addresses. Defaults to known
+ * addresses based on networkId (for OrderWatcher)
+ * @param partialConfig Optional configurations (for OrderWatcher)
+ */
+ constructor(
+ provider: Provider,
+ networkId: number,
+ contractAddresses?: ContractAddresses,
+ partialConfig?: Partial<OrderWatcherConfig>,
+ ) {
+ this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig);
+ this._connectionStore = new Set();
+ this.httpServer = http.createServer();
+ this._wsServer = new WebSocket.server({
+ httpServer: this.httpServer,
+ autoAcceptConnections: false,
+ });
+
+ this._wsServer.on('request', (request: any) => {
+ // Designed for usage pattern where client and server are run on the same
+ // machine by the same user. As such, no security checks are in place.
+ const connection: WebSocket.connection = request.accept(null, request.origin);
+ logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`);
+ connection.on('message', this._messageCallback.bind(this, connection));
+ connection.on('close', this._closeCallback.bind(this, connection));
+ this._connectionStore.add(connection);
+ });
+
+ // Have the WebSocket server subscribe to the OrderWatcher to receive updates.
+ // These updates are then broadcast to clients in the _connectionStore.
+ this._orderWatcher.subscribe(this._broadcastCallback);
+ }
+
+ /**
+ * Activates the WebSocket server by having its HTTP server start listening.
+ */
+ public listen(): void {
+ this.httpServer.listen(DEFAULT_HTTP_PORT, () => {
+ logUtils.log(`${new Date()} [Server] Listening on port ${DEFAULT_HTTP_PORT}`);
+ });
+ }
+
+ public close(): void {
+ this.httpServer.close();
+ }
+
+ private _messageCallback(connection: WebSocket.connection, message: any): void {
+ assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema);
+ const requestData: WebSocketRequestData = JSON.parse(message.utf8Data);
+ const responseData = this._routeRequest(requestData);
+ logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(responseData)}`);
+ connection.sendUTF(JSON.stringify(responseData));
+ }
+
+ private _closeCallback(connection: WebSocket.connection): void {
+ this._connectionStore.delete(connection);
+ logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`);
+ }
+
+ private _routeRequest(requestData: WebSocketRequestData): WebSocketResponseData {
+ const responseData: WebSocketResponseData = {
+ action: requestData.action,
+ success: 0,
+ result: undefined,
+ };
+
+ try {
+ logUtils.log(`${new Date()} [Server] Request received: ${requestData.action}`);
+ switch (requestData.action) {
+ case 'addOrderAsync': {
+ const signedOrder: SignedOrder = this._parseSignedOrder(requestData);
+ // tslint:disable-next-line:no-floating-promises
+ this._orderWatcher.addOrderAsync(signedOrder); // Ok to fireNforget
+ break;
+ }
+ case 'removeOrder': {
+ this._orderWatcher.removeOrder(requestData.params.orderHash);
+ break;
+ }
+ case 'getStats': {
+ responseData.result = this._orderWatcher.getStats();
+ break;
+ }
+ default:
+ throw new Error(`[Server] Invalid request action: ${requestData.action}`);
+ }
+ responseData.success = 1;
+ } catch (err) {
+ responseData.result = { error: err.toString() };
+ }
+ return responseData;
+ }
+
+ /**
+ * Broadcasts OrderState changes to ALL connected clients. At the moment,
+ * we do not support clients subscribing to only a subset of orders. As such,
+ * Client B will be notified of changes to an order that Client A added.
+ */
+ private readonly _broadcastCallback: OnOrderStateChangeCallback = (err, orderState) => {
+ this._connectionStore.forEach((connection: WebSocket.connection) => {
+ const responseData: WebSocketResponseData = {
+ action: OrderWatcherAction.orderWatcherUpdate,
+ success: 1,
+ result: orderState || err,
+ };
+ connection.sendUTF(JSON.stringify(responseData));
+ });
+ // tslint:disable-next-line:semicolon
+ }; // tslint thinks this is a class method, It's actally a property that holds a function.
+
+ /**
+ * Recover types lost when the payload is stringified.
+ */
+ private readonly _parseSignedOrder = (requestData: WebSocketRequestData) => {
+ const signedOrder = requestData.params.signedOrder;
+ const bigNumberFields = [
+ 'salt',
+ 'makerFee',
+ 'takerFee',
+ 'makerAssetAmount',
+ 'takerAssetAmount',
+ 'expirationTimeSeconds',
+ ];
+ for (const field of bigNumberFields) {
+ signedOrder[field] = new BigNumber(signedOrder[field]);
+ }
+ return signedOrder;
+ // tslint:disable-next-line:semicolon
+ }; // tslint thinks this is a class method, It's actally a property that holds a function.
+}
diff --git a/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts b/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts
new file mode 100644
index 000000000..0a0eed407
--- /dev/null
+++ b/packages/order-watcher/src/schemas/websocket_utf8_message_schema.ts
@@ -0,0 +1,8 @@
+export const webSocketUtf8MessageSchema = {
+ id: '/WebSocketUtf8MessageSchema',
+ properties: {
+ utf8Data: { type: 'string' },
+ },
+ type: 'object',
+ required: ['utf8Data'],
+};
diff --git a/packages/order-watcher/test/order_watcher_websocket_test.ts b/packages/order-watcher/test/order_watcher_websocket_test.ts
new file mode 100644
index 000000000..e7b18e44b
--- /dev/null
+++ b/packages/order-watcher/test/order_watcher_websocket_test.ts
@@ -0,0 +1,226 @@
+import { ContractWrappers } from '@0x/contract-wrappers';
+import { tokenUtils } from '@0x/contract-wrappers/lib/test/utils/token_utils';
+import { BlockchainLifecycle } from '@0x/dev-utils';
+import { FillScenarios } from '@0x/fill-scenarios';
+import { assetDataUtils, orderHashUtils } from '@0x/order-utils';
+import { ExchangeContractErrs, OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types';
+import { BigNumber, logUtils } from '@0x/utils';
+import { Web3Wrapper } from '@0x/web3-wrapper';
+import * as chai from 'chai';
+import 'mocha';
+import * as WebSocket from 'websocket';
+
+import { OrderWatcherWebSocketServer } from '../src/order_watcher/order_watcher_websocket';
+
+import { chaiSetup } from './utils/chai_setup';
+import { constants } from './utils/constants';
+import { migrateOnceAsync } from './utils/migrate';
+import { provider, web3Wrapper } from './utils/web3_wrapper';
+
+chaiSetup.configure();
+const expect = chai.expect;
+const blockchainLifecycle = new BlockchainLifecycle(web3Wrapper);
+
+interface WsMessage {
+ data: string;
+}
+
+describe.only('OrderWatcherWebSocket', async () => {
+ let contractWrappers: ContractWrappers;
+ let wsServer: OrderWatcherWebSocketServer;
+ let wsClient: WebSocket.w3cwebsocket;
+ let wsClientTwo: WebSocket.w3cwebsocket;
+ let fillScenarios: FillScenarios;
+ let userAddresses: string[];
+ let makerAssetData: string;
+ let takerAssetData: string;
+ let makerTokenAddress: string;
+ let takerTokenAddress: string;
+ let makerAddress: string;
+ let takerAddress: string;
+ let zrxTokenAddress: string;
+ let signedOrder: SignedOrder;
+ let orderHash: string;
+ let addOrderPayload: { action: string; params: { signedOrder: SignedOrder } };
+ let removeOrderPayload: { action: string; params: { orderHash: string } };
+ const decimals = constants.ZRX_DECIMALS;
+ const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals);
+ // createFillableSignedOrderAsync is Promise-based, which forces us
+ // to use Promises instead of the done() callbacks for tests.
+ // onmessage callback must thus be wrapped as a Promise.
+ const _onMessageAsync = async (client: WebSocket.w3cwebsocket) =>
+ new Promise<WsMessage>(resolve => {
+ client.onmessage = (msg: WsMessage) => resolve(msg);
+ });
+
+ before(async () => {
+ // Set up constants
+ const contractAddresses = await migrateOnceAsync();
+ await blockchainLifecycle.startAsync();
+ const networkId = constants.TESTRPC_NETWORK_ID;
+ const config = {
+ networkId,
+ contractAddresses,
+ };
+ contractWrappers = new ContractWrappers(provider, config);
+ userAddresses = await web3Wrapper.getAvailableAddressesAsync();
+ zrxTokenAddress = contractAddresses.zrxToken;
+ [makerAddress, takerAddress] = userAddresses;
+ [makerTokenAddress, takerTokenAddress] = tokenUtils.getDummyERC20TokenAddresses();
+ [makerAssetData, takerAssetData] = [
+ assetDataUtils.encodeERC20AssetData(makerTokenAddress),
+ assetDataUtils.encodeERC20AssetData(takerTokenAddress),
+ ];
+ fillScenarios = new FillScenarios(
+ provider,
+ userAddresses,
+ zrxTokenAddress,
+ contractAddresses.exchange,
+ contractAddresses.erc20Proxy,
+ contractAddresses.erc721Proxy,
+ );
+ signedOrder = await fillScenarios.createFillableSignedOrderAsync(
+ makerAssetData,
+ takerAssetData,
+ makerAddress,
+ takerAddress,
+ fillableAmount,
+ );
+ orderHash = orderHashUtils.getOrderHashHex(signedOrder);
+ addOrderPayload = {
+ action: 'addOrderAsync',
+ params: { signedOrder },
+ };
+ removeOrderPayload = {
+ action: 'removeOrder',
+ params: { orderHash },
+ };
+
+ // Prepare OrderWatcher WebSocket server
+ const orderWatcherConfig = {};
+ wsServer = new OrderWatcherWebSocketServer(provider, networkId, contractAddresses, orderWatcherConfig);
+ wsServer.listen();
+ });
+ after(async () => {
+ await blockchainLifecycle.revertAsync();
+ wsServer.close();
+ });
+ beforeEach(async () => {
+ await blockchainLifecycle.startAsync();
+ wsClient = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/');
+ logUtils.log(`${new Date()} [Client] Connected.`);
+ });
+ afterEach(async () => {
+ await blockchainLifecycle.revertAsync();
+ wsClient.close();
+ logUtils.log(`${new Date()} [Client] Closed.`);
+ });
+
+ it('responds to getStats requests correctly', (done: any) => {
+ const payload = {
+ action: 'getStats',
+ params: {},
+ };
+ wsClient.onopen = () => wsClient.send(JSON.stringify(payload));
+ wsClient.onmessage = (msg: any) => {
+ const responseData = JSON.parse(msg.data);
+ expect(responseData.action).to.be.eq('getStats');
+ expect(responseData.success).to.be.eq(1);
+ expect(responseData.result.orderCount).to.be.eq(0);
+ done();
+ };
+ });
+
+ it('throws an error when an invalid action is attempted', async () => {
+ const invalidActionPayload = {
+ action: 'badAction',
+ params: {},
+ };
+ wsClient.onopen = () => wsClient.send(JSON.stringify(invalidActionPayload));
+ const errorMsg = await _onMessageAsync(wsClient);
+ const errorData = JSON.parse(errorMsg.data);
+ expect(errorData.action).to.be.eq('badAction');
+ expect(errorData.success).to.be.eq(0);
+ expect(errorData.result.error).to.be.eq('Error: [Server] Invalid request action: badAction');
+ });
+
+ it('executes addOrderAsync and removeOrder requests correctly', async () => {
+ wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload));
+ const addOrderMsg = await _onMessageAsync(wsClient);
+ const addOrderData = JSON.parse(addOrderMsg.data);
+ expect(addOrderData.action).to.be.eq('addOrderAsync');
+ expect(addOrderData.success).to.be.eq(1);
+ expect((wsServer._orderWatcher as any)._orderByOrderHash).to.deep.include({
+ [orderHash]: signedOrder,
+ });
+
+ wsClient.send(JSON.stringify(removeOrderPayload));
+ const removeOrderMsg = await _onMessageAsync(wsClient);
+ const removeOrderData = JSON.parse(removeOrderMsg.data);
+ expect(removeOrderData.action).to.be.eq('removeOrder');
+ expect(removeOrderData.success).to.be.eq(1);
+ expect((wsServer._orderWatcher as any)._orderByOrderHash).to.not.deep.include({
+ [orderHash]: signedOrder,
+ });
+ });
+
+ it('broadcasts orderStateInvalid message when makerAddress allowance set to 0 for watched order', async () => {
+ // Add the regular order
+ wsClient.onopen = () => wsClient.send(JSON.stringify(addOrderPayload));
+ await _onMessageAsync(wsClient);
+
+ // Set the allowance to 0
+ await contractWrappers.erc20Token.setProxyAllowanceAsync(makerTokenAddress, makerAddress, new BigNumber(0));
+
+ // Ensure that orderStateInvalid message is received.
+ const orderWatcherUpdateMsg = await _onMessageAsync(wsClient);
+ const orderWatcherUpdateData = JSON.parse(orderWatcherUpdateMsg.data);
+ expect(orderWatcherUpdateData.action).to.be.eq('orderWatcherUpdate');
+ expect(orderWatcherUpdateData.success).to.be.eq(1);
+ const invalidOrderState = orderWatcherUpdateData.result as OrderStateInvalid;
+ expect(invalidOrderState.isValid).to.be.false();
+ expect(invalidOrderState.orderHash).to.be.eq(orderHash);
+ expect(invalidOrderState.error).to.be.eq(ExchangeContractErrs.InsufficientMakerAllowance);
+ });
+
+ it('broadcasts to multiple clients when an order backing ZRX allowance changes', async () => {
+ // Prepare order
+ const makerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(2), decimals);
+ const takerFee = Web3Wrapper.toBaseUnitAmount(new BigNumber(0), decimals);
+ const nonZeroMakerFeeSignedOrder = await fillScenarios.createFillableSignedOrderWithFeesAsync(
+ makerAssetData,
+ takerAssetData,
+ makerFee,
+ takerFee,
+ makerAddress,
+ takerAddress,
+ fillableAmount,
+ takerAddress,
+ );
+ const nonZeroMakerFeeOrderPayload = {
+ action: 'addOrderAsync',
+ params: { nonZeroMakerFeeSignedOrder },
+ };
+
+ // Set up a second client and have it add the order
+ wsClientTwo = new WebSocket.w3cwebsocket('ws://127.0.0.1:8080/');
+ logUtils.log(`${new Date()} [Client] Connected.`);
+ wsClientTwo.onopen = () => wsClientTwo.send(JSON.stringify(nonZeroMakerFeeOrderPayload));
+ await _onMessageAsync(wsClientTwo);
+
+ // Change the allowance
+ await contractWrappers.erc20Token.setProxyAllowanceAsync(zrxTokenAddress, makerAddress, new BigNumber(0));
+
+ // Check that both clients receive the emitted event
+ for (const client of [wsClient, wsClientTwo]) {
+ const updateMsg = await _onMessageAsync(client);
+ const updateData = JSON.parse(updateMsg.data);
+ const orderState = updateData.result as OrderStateValid;
+ expect(orderState.isValid).to.be.true();
+ expect(orderState.orderRelevantState.makerFeeProxyAllowance).to.be.eq('0');
+ }
+
+ wsClientTwo.close();
+ logUtils.log(`${new Date()} [Client] Closed.`);
+ });
+});