aboutsummaryrefslogtreecommitdiffstats
path: root/packages/order-watcher/src
diff options
context:
space:
mode:
authorFabio Berger <me@fabioberger.com>2018-12-19 21:21:16 +0800
committerFabio Berger <me@fabioberger.com>2018-12-19 21:21:16 +0800
commit040b402b6d558d13f2f4e032297b6723cdf2aafe (patch)
treec973d3f5dc7f45ff6ede9f1736315af3ed902828 /packages/order-watcher/src
parent5c24596d812a80011f9e92a13bf91923fbcc2a64 (diff)
parent552007cafbb577c92b89cedd7d83f5b6d00998e1 (diff)
downloaddexon-sol-tools-040b402b6d558d13f2f4e032297b6723cdf2aafe.tar
dexon-sol-tools-040b402b6d558d13f2f4e032297b6723cdf2aafe.tar.gz
dexon-sol-tools-040b402b6d558d13f2f4e032297b6723cdf2aafe.tar.bz2
dexon-sol-tools-040b402b6d558d13f2f4e032297b6723cdf2aafe.tar.lz
dexon-sol-tools-040b402b6d558d13f2f4e032297b6723cdf2aafe.tar.xz
dexon-sol-tools-040b402b6d558d13f2f4e032297b6723cdf2aafe.tar.zst
dexon-sol-tools-040b402b6d558d13f2f4e032297b6723cdf2aafe.zip
Merge branch 'development' of github.com:0xProject/0x-monorepo into development
* 'development' of github.com:0xProject/0x-monorepo: Move onMessageAsync outside of tests and add comments Fix WS tests to remove race-condition and be more specific about the message expected Add temporary console.log to test failing on CI Remove unused file Fix file name Consolidate use of isVerbose in orderWatcherConfig Add isVerbose option to enable/disable logging Fix schemas and tests Move OrderWatcher Websocket schemas to json-schemas and convert to JSON so that they are language agnostic Improve our compliance to the JSON RPC spec remove unused instance variable Ensure fileName matches class name, fix broadcast Respond to CR Respond to CR WIP: OrderWatcher WebSocket
Diffstat (limited to 'packages/order-watcher/src')
-rw-r--r--packages/order-watcher/src/index.ts1
-rw-r--r--packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts200
-rw-r--r--packages/order-watcher/src/types.ts66
3 files changed, 266 insertions, 1 deletions
diff --git a/packages/order-watcher/src/index.ts b/packages/order-watcher/src/index.ts
index 5eeba3e87..e275a0c6a 100644
--- a/packages/order-watcher/src/index.ts
+++ b/packages/order-watcher/src/index.ts
@@ -1,4 +1,5 @@
export { OrderWatcher } from './order_watcher/order_watcher';
+export { OrderWatcherWebSocketServer } from './order_watcher/order_watcher_web_socket_server';
export { ExpirationWatcher } from './order_watcher/expiration_watcher';
export {
diff --git a/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts
new file mode 100644
index 000000000..b75b07603
--- /dev/null
+++ b/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts
@@ -0,0 +1,200 @@
+import { ContractAddresses } from '@0x/contract-addresses';
+import { schemas } from '@0x/json-schemas';
+import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types';
+import { BigNumber, logUtils } from '@0x/utils';
+import { Provider } from 'ethereum-types';
+import * as http from 'http';
+import * as WebSocket from 'websocket';
+
+import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types';
+import { assert } from '../utils/assert';
+
+import { OrderWatcher } from './order_watcher';
+
+const DEFAULT_HTTP_PORT = 8080;
+const JSON_RPC_VERSION = '2.0';
+
+// Wraps the OrderWatcher functionality in a WebSocket server. Motivations:
+// 1) Users can watch orders via non-typescript programs.
+// 2) Better encapsulation so that users can work
+export class OrderWatcherWebSocketServer {
+ private readonly _orderWatcher: OrderWatcher;
+ private readonly _httpServer: http.Server;
+ private readonly _connectionStore: Set<WebSocket.connection>;
+ private readonly _wsServer: WebSocket.server;
+ private readonly _isVerbose: boolean;
+ /**
+ * Recover types lost when the payload is stringified.
+ */
+ private static _parseSignedOrder(rawRequest: any): SignedOrder {
+ const bigNumberFields = [
+ 'salt',
+ 'makerFee',
+ 'takerFee',
+ 'makerAssetAmount',
+ 'takerAssetAmount',
+ 'expirationTimeSeconds',
+ ];
+ for (const field of bigNumberFields) {
+ rawRequest[field] = new BigNumber(rawRequest[field]);
+ }
+ return rawRequest;
+ }
+
+ /**
+ * Instantiate a new WebSocket server which provides OrderWatcher functionality
+ * @param provider Web3 provider to use for JSON RPC calls.
+ * @param networkId NetworkId to watch orders on.
+ * @param contractAddresses Optional contract addresses. Defaults to known
+ * addresses based on networkId.
+ * @param orderWatcherConfig OrderWatcher configurations. isVerbose sets the verbosity for the WebSocket server aswell.
+ * @param isVerbose Whether to enable verbose logging. Defaults to true.
+ */
+ constructor(
+ provider: Provider,
+ networkId: number,
+ contractAddresses?: ContractAddresses,
+ orderWatcherConfig?: Partial<OrderWatcherConfig>,
+ ) {
+ this._isVerbose =
+ orderWatcherConfig !== undefined && orderWatcherConfig.isVerbose !== undefined
+ ? orderWatcherConfig.isVerbose
+ : true;
+ this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, orderWatcherConfig);
+ this._connectionStore = new Set();
+ this._httpServer = http.createServer();
+ this._wsServer = new WebSocket.server({
+ httpServer: this._httpServer,
+ // Avoid setting autoAcceptConnections to true as it defeats all
+ // standard cross-origin protection facilities built into the protocol
+ // and the browser.
+ // Source: https://www.npmjs.com/package/websocket#server-example
+ // Also ensures that a request event is emitted by
+ // the server whenever a new WebSocket request is made.
+ autoAcceptConnections: false,
+ });
+
+ this._wsServer.on('request', async (request: any) => {
+ // Designed for usage pattern where client and server are run on the same
+ // machine by the same user. As such, no security checks are in place.
+ const connection: WebSocket.connection = request.accept(null, request.origin);
+ this._log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`);
+ connection.on('message', this._onMessageCallbackAsync.bind(this, connection));
+ connection.on('close', this._onCloseCallback.bind(this, connection));
+ this._connectionStore.add(connection);
+ });
+ }
+
+ /**
+ * Activates the WebSocket server by subscribing to the OrderWatcher and
+ * starting the WebSocket's HTTP server
+ */
+ public start(): void {
+ // Have the WebSocket server subscribe to the OrderWatcher to receive updates.
+ // These updates are then broadcast to clients in the _connectionStore.
+ this._orderWatcher.subscribe(this._broadcastCallback.bind(this));
+
+ const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT;
+ this._httpServer.listen(port, () => {
+ this._log(`${new Date()} [Server] Listening on port ${port}`);
+ });
+ }
+
+ /**
+ * Deactivates the WebSocket server by stopping the HTTP server from accepting
+ * new connections and unsubscribing from the OrderWatcher
+ */
+ public stop(): void {
+ this._httpServer.close();
+ this._orderWatcher.unsubscribe();
+ }
+
+ private _log(...args: any[]): void {
+ if (this._isVerbose) {
+ logUtils.log(...args);
+ }
+ }
+
+ private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise<void> {
+ let response: WebSocketResponse;
+ let id: number | null = null;
+ try {
+ assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema);
+ const request: WebSocketRequest = JSON.parse(message.utf8Data);
+ id = request.id;
+ assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema);
+ assert.isString(request.jsonrpc, JSON_RPC_VERSION);
+ response = {
+ id,
+ jsonrpc: JSON_RPC_VERSION,
+ method: request.method,
+ result: await this._routeRequestAsync(request),
+ };
+ } catch (err) {
+ response = {
+ id,
+ jsonrpc: JSON_RPC_VERSION,
+ method: null,
+ error: err.toString(),
+ };
+ }
+ this._log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`);
+ connection.sendUTF(JSON.stringify(response));
+ }
+
+ private _onCloseCallback(connection: WebSocket.connection): void {
+ this._connectionStore.delete(connection);
+ this._log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`);
+ }
+
+ private async _routeRequestAsync(request: WebSocketRequest): Promise<GetStatsResult | undefined> {
+ this._log(`${new Date()} [Server] Request received: ${request.method}`);
+ switch (request.method) {
+ case OrderWatcherMethod.AddOrder: {
+ const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder(
+ request.params.signedOrder,
+ );
+ await this._orderWatcher.addOrderAsync(signedOrder);
+ break;
+ }
+ case OrderWatcherMethod.RemoveOrder: {
+ this._orderWatcher.removeOrder(request.params.orderHash || 'undefined');
+ break;
+ }
+ case OrderWatcherMethod.GetStats: {
+ return this._orderWatcher.getStats();
+ }
+ default:
+ // Should never reach here. Should be caught by JSON schema check.
+ throw new Error(`Unexpected default case hit for request.method`);
+ }
+ return undefined;
+ }
+
+ /**
+ * Broadcasts OrderState changes to ALL connected clients. At the moment,
+ * we do not support clients subscribing to only a subset of orders. As such,
+ * Client B will be notified of changes to an order that Client A added.
+ */
+ private _broadcastCallback(err: Error | null, orderState?: OrderStateValid | OrderStateInvalid | undefined): void {
+ const method = OrderWatcherMethod.Update;
+ const response =
+ err === null
+ ? {
+ jsonrpc: JSON_RPC_VERSION,
+ method,
+ result: orderState,
+ }
+ : {
+ jsonrpc: JSON_RPC_VERSION,
+ method,
+ error: {
+ code: -32000,
+ message: err.message,
+ },
+ };
+ this._connectionStore.forEach((connection: WebSocket.connection) => {
+ connection.sendUTF(JSON.stringify(response));
+ });
+ }
+}
diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts
index 8078dd971..2b529a939 100644
--- a/packages/order-watcher/src/types.ts
+++ b/packages/order-watcher/src/types.ts
@@ -1,4 +1,4 @@
-import { OrderState } from '@0x/types';
+import { OrderState, SignedOrder } from '@0x/types';
import { LogEntryEvent } from 'ethereum-types';
export enum OrderWatcherError {
@@ -31,3 +31,67 @@ export enum InternalOrderWatcherError {
ZrxNotInTokenRegistry = 'ZRX_NOT_IN_TOKEN_REGISTRY',
WethNotInTokenRegistry = 'WETH_NOT_IN_TOKEN_REGISTRY',
}
+
+export enum OrderWatcherMethod {
+ // Methods initiated by the user.
+ GetStats = 'GET_STATS',
+ AddOrder = 'ADD_ORDER',
+ RemoveOrder = 'REMOVE_ORDER',
+ // These are spontaneous; they are primarily orderstate changes.
+ Update = 'UPDATE',
+ // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't
+ // need to expose them to the WebSocket server user because the user implicitly
+ // subscribes and unsubscribes by connecting and disconnecting from the server.
+}
+
+// Users have to create a json object of this format and attach it to
+// the data field of their WebSocket message to interact with the server.
+export type WebSocketRequest = AddOrderRequest | RemoveOrderRequest | GetStatsRequest;
+
+export interface AddOrderRequest {
+ id: number;
+ jsonrpc: string;
+ method: OrderWatcherMethod.AddOrder;
+ params: { signedOrder: SignedOrder };
+}
+
+export interface RemoveOrderRequest {
+ id: number;
+ jsonrpc: string;
+ method: OrderWatcherMethod.RemoveOrder;
+ params: { orderHash: string };
+}
+
+export interface GetStatsRequest {
+ id: number;
+ jsonrpc: string;
+ method: OrderWatcherMethod.GetStats;
+}
+
+// Users should expect a json object of this format in the data field
+// of the WebSocket messages that the server sends out.
+export type WebSocketResponse = SuccessfulWebSocketResponse | ErrorWebSocketResponse;
+
+export interface SuccessfulWebSocketResponse {
+ id: number;
+ jsonrpc: string;
+ method: OrderWatcherMethod;
+ result: OrderState | GetStatsResult | undefined; // result is undefined for ADD_ORDER and REMOVE_ORDER
+}
+
+export interface ErrorWebSocketResponse {
+ id: number | null;
+ jsonrpc: string;
+ method: null;
+ error: JSONRPCError;
+}
+
+export interface JSONRPCError {
+ code: number;
+ message: string;
+ data?: string | object;
+}
+
+export interface GetStatsResult {
+ orderCount: number;
+}