diff options
Diffstat (limited to 'packages/order-watcher/src')
-rw-r--r-- | packages/order-watcher/src/index.ts | 1 | ||||
-rw-r--r-- | packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts | 200 | ||||
-rw-r--r-- | packages/order-watcher/src/types.ts | 66 |
3 files changed, 266 insertions, 1 deletions
diff --git a/packages/order-watcher/src/index.ts b/packages/order-watcher/src/index.ts index 5eeba3e87..e275a0c6a 100644 --- a/packages/order-watcher/src/index.ts +++ b/packages/order-watcher/src/index.ts @@ -1,4 +1,5 @@ export { OrderWatcher } from './order_watcher/order_watcher'; +export { OrderWatcherWebSocketServer } from './order_watcher/order_watcher_web_socket_server'; export { ExpirationWatcher } from './order_watcher/expiration_watcher'; export { diff --git a/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts new file mode 100644 index 000000000..b75b07603 --- /dev/null +++ b/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts @@ -0,0 +1,200 @@ +import { ContractAddresses } from '@0x/contract-addresses'; +import { schemas } from '@0x/json-schemas'; +import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Provider } from 'ethereum-types'; +import * as http from 'http'; +import * as WebSocket from 'websocket'; + +import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types'; +import { assert } from '../utils/assert'; + +import { OrderWatcher } from './order_watcher'; + +const DEFAULT_HTTP_PORT = 8080; +const JSON_RPC_VERSION = '2.0'; + +// Wraps the OrderWatcher functionality in a WebSocket server. Motivations: +// 1) Users can watch orders via non-typescript programs. +// 2) Better encapsulation so that users can work +export class OrderWatcherWebSocketServer { + private readonly _orderWatcher: OrderWatcher; + private readonly _httpServer: http.Server; + private readonly _connectionStore: Set<WebSocket.connection>; + private readonly _wsServer: WebSocket.server; + private readonly _isVerbose: boolean; + /** + * Recover types lost when the payload is stringified. + */ + private static _parseSignedOrder(rawRequest: any): SignedOrder { + const bigNumberFields = [ + 'salt', + 'makerFee', + 'takerFee', + 'makerAssetAmount', + 'takerAssetAmount', + 'expirationTimeSeconds', + ]; + for (const field of bigNumberFields) { + rawRequest[field] = new BigNumber(rawRequest[field]); + } + return rawRequest; + } + + /** + * Instantiate a new WebSocket server which provides OrderWatcher functionality + * @param provider Web3 provider to use for JSON RPC calls. + * @param networkId NetworkId to watch orders on. + * @param contractAddresses Optional contract addresses. Defaults to known + * addresses based on networkId. + * @param orderWatcherConfig OrderWatcher configurations. isVerbose sets the verbosity for the WebSocket server aswell. + * @param isVerbose Whether to enable verbose logging. Defaults to true. + */ + constructor( + provider: Provider, + networkId: number, + contractAddresses?: ContractAddresses, + orderWatcherConfig?: Partial<OrderWatcherConfig>, + ) { + this._isVerbose = + orderWatcherConfig !== undefined && orderWatcherConfig.isVerbose !== undefined + ? orderWatcherConfig.isVerbose + : true; + this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, orderWatcherConfig); + this._connectionStore = new Set(); + this._httpServer = http.createServer(); + this._wsServer = new WebSocket.server({ + httpServer: this._httpServer, + // Avoid setting autoAcceptConnections to true as it defeats all + // standard cross-origin protection facilities built into the protocol + // and the browser. + // Source: https://www.npmjs.com/package/websocket#server-example + // Also ensures that a request event is emitted by + // the server whenever a new WebSocket request is made. + autoAcceptConnections: false, + }); + + this._wsServer.on('request', async (request: any) => { + // Designed for usage pattern where client and server are run on the same + // machine by the same user. As such, no security checks are in place. + const connection: WebSocket.connection = request.accept(null, request.origin); + this._log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); + connection.on('message', this._onMessageCallbackAsync.bind(this, connection)); + connection.on('close', this._onCloseCallback.bind(this, connection)); + this._connectionStore.add(connection); + }); + } + + /** + * Activates the WebSocket server by subscribing to the OrderWatcher and + * starting the WebSocket's HTTP server + */ + public start(): void { + // Have the WebSocket server subscribe to the OrderWatcher to receive updates. + // These updates are then broadcast to clients in the _connectionStore. + this._orderWatcher.subscribe(this._broadcastCallback.bind(this)); + + const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; + this._httpServer.listen(port, () => { + this._log(`${new Date()} [Server] Listening on port ${port}`); + }); + } + + /** + * Deactivates the WebSocket server by stopping the HTTP server from accepting + * new connections and unsubscribing from the OrderWatcher + */ + public stop(): void { + this._httpServer.close(); + this._orderWatcher.unsubscribe(); + } + + private _log(...args: any[]): void { + if (this._isVerbose) { + logUtils.log(...args); + } + } + + private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise<void> { + let response: WebSocketResponse; + let id: number | null = null; + try { + assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema); + const request: WebSocketRequest = JSON.parse(message.utf8Data); + id = request.id; + assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema); + assert.isString(request.jsonrpc, JSON_RPC_VERSION); + response = { + id, + jsonrpc: JSON_RPC_VERSION, + method: request.method, + result: await this._routeRequestAsync(request), + }; + } catch (err) { + response = { + id, + jsonrpc: JSON_RPC_VERSION, + method: null, + error: err.toString(), + }; + } + this._log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); + connection.sendUTF(JSON.stringify(response)); + } + + private _onCloseCallback(connection: WebSocket.connection): void { + this._connectionStore.delete(connection); + this._log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); + } + + private async _routeRequestAsync(request: WebSocketRequest): Promise<GetStatsResult | undefined> { + this._log(`${new Date()} [Server] Request received: ${request.method}`); + switch (request.method) { + case OrderWatcherMethod.AddOrder: { + const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder( + request.params.signedOrder, + ); + await this._orderWatcher.addOrderAsync(signedOrder); + break; + } + case OrderWatcherMethod.RemoveOrder: { + this._orderWatcher.removeOrder(request.params.orderHash || 'undefined'); + break; + } + case OrderWatcherMethod.GetStats: { + return this._orderWatcher.getStats(); + } + default: + // Should never reach here. Should be caught by JSON schema check. + throw new Error(`Unexpected default case hit for request.method`); + } + return undefined; + } + + /** + * Broadcasts OrderState changes to ALL connected clients. At the moment, + * we do not support clients subscribing to only a subset of orders. As such, + * Client B will be notified of changes to an order that Client A added. + */ + private _broadcastCallback(err: Error | null, orderState?: OrderStateValid | OrderStateInvalid | undefined): void { + const method = OrderWatcherMethod.Update; + const response = + err === null + ? { + jsonrpc: JSON_RPC_VERSION, + method, + result: orderState, + } + : { + jsonrpc: JSON_RPC_VERSION, + method, + error: { + code: -32000, + message: err.message, + }, + }; + this._connectionStore.forEach((connection: WebSocket.connection) => { + connection.sendUTF(JSON.stringify(response)); + }); + } +} diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index 8078dd971..2b529a939 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -1,4 +1,4 @@ -import { OrderState } from '@0x/types'; +import { OrderState, SignedOrder } from '@0x/types'; import { LogEntryEvent } from 'ethereum-types'; export enum OrderWatcherError { @@ -31,3 +31,67 @@ export enum InternalOrderWatcherError { ZrxNotInTokenRegistry = 'ZRX_NOT_IN_TOKEN_REGISTRY', WethNotInTokenRegistry = 'WETH_NOT_IN_TOKEN_REGISTRY', } + +export enum OrderWatcherMethod { + // Methods initiated by the user. + GetStats = 'GET_STATS', + AddOrder = 'ADD_ORDER', + RemoveOrder = 'REMOVE_ORDER', + // These are spontaneous; they are primarily orderstate changes. + Update = 'UPDATE', + // `subscribe` and `unsubscribe` are methods of OrderWatcher, but we don't + // need to expose them to the WebSocket server user because the user implicitly + // subscribes and unsubscribes by connecting and disconnecting from the server. +} + +// Users have to create a json object of this format and attach it to +// the data field of their WebSocket message to interact with the server. +export type WebSocketRequest = AddOrderRequest | RemoveOrderRequest | GetStatsRequest; + +export interface AddOrderRequest { + id: number; + jsonrpc: string; + method: OrderWatcherMethod.AddOrder; + params: { signedOrder: SignedOrder }; +} + +export interface RemoveOrderRequest { + id: number; + jsonrpc: string; + method: OrderWatcherMethod.RemoveOrder; + params: { orderHash: string }; +} + +export interface GetStatsRequest { + id: number; + jsonrpc: string; + method: OrderWatcherMethod.GetStats; +} + +// Users should expect a json object of this format in the data field +// of the WebSocket messages that the server sends out. +export type WebSocketResponse = SuccessfulWebSocketResponse | ErrorWebSocketResponse; + +export interface SuccessfulWebSocketResponse { + id: number; + jsonrpc: string; + method: OrderWatcherMethod; + result: OrderState | GetStatsResult | undefined; // result is undefined for ADD_ORDER and REMOVE_ORDER +} + +export interface ErrorWebSocketResponse { + id: number | null; + jsonrpc: string; + method: null; + error: JSONRPCError; +} + +export interface JSONRPCError { + code: number; + message: string; + data?: string | object; +} + +export interface GetStatsResult { + orderCount: number; +} |