diff options
author | Fabio Berger <me@fabioberger.com> | 2018-12-16 13:34:35 +0800 |
---|---|---|
committer | Fabio Berger <me@fabioberger.com> | 2018-12-16 13:34:35 +0800 |
commit | 7cafe396de676cec3859c76d6407a0948a8e398e (patch) | |
tree | 746594ed8d3a83819b8c06f173f716c029068e3a /packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts | |
parent | 6bb2ef923894a3572c3fa824c3bf1a69759eb43a (diff) | |
download | dexon-0x-contracts-7cafe396de676cec3859c76d6407a0948a8e398e.tar dexon-0x-contracts-7cafe396de676cec3859c76d6407a0948a8e398e.tar.gz dexon-0x-contracts-7cafe396de676cec3859c76d6407a0948a8e398e.tar.bz2 dexon-0x-contracts-7cafe396de676cec3859c76d6407a0948a8e398e.tar.lz dexon-0x-contracts-7cafe396de676cec3859c76d6407a0948a8e398e.tar.xz dexon-0x-contracts-7cafe396de676cec3859c76d6407a0948a8e398e.tar.zst dexon-0x-contracts-7cafe396de676cec3859c76d6407a0948a8e398e.zip |
Ensure fileName matches class name, fix broadcast
Diffstat (limited to 'packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts')
-rw-r--r-- | packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts | 186 |
1 files changed, 186 insertions, 0 deletions
diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts new file mode 100644 index 000000000..2d2d9e82e --- /dev/null +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts @@ -0,0 +1,186 @@ +import { ContractAddresses } from '@0x/contract-addresses'; +import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; +import { BigNumber, logUtils } from '@0x/utils'; +import { Provider } from 'ethereum-types'; +import * as http from 'http'; +import * as WebSocket from 'websocket'; + +import { webSocketRequestSchema, webSocketUtf8MessageSchema } from '../schemas/websocket_schemas'; +import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types'; +import { assert } from '../utils/assert'; + +import { OrderWatcher } from './order_watcher'; + +const DEFAULT_HTTP_PORT = 8080; +const JSONRPC_VERSION = '2.0'; + +// Wraps the OrderWatcher functionality in a WebSocket server. Motivations: +// 1) Users can watch orders via non-typescript programs. +// 2) Better encapsulation so that users can work +export class OrderWatcherWebSocketServer { + private readonly _orderWatcher: OrderWatcher; + private readonly _httpServer: http.Server; + private readonly _connectionStore: Set<WebSocket.connection>; + private readonly _wsServer: WebSocket.server; + private _jsonRpcRequestId: number; + /** + * Recover types lost when the payload is stringified. + */ + private static _parseSignedOrder(rawRequest: any): SignedOrder { + const bigNumberFields = [ + 'salt', + 'makerFee', + 'takerFee', + 'makerAssetAmount', + 'takerAssetAmount', + 'expirationTimeSeconds', + ]; + for (const field of bigNumberFields) { + rawRequest[field] = new BigNumber(rawRequest[field]); + } + return rawRequest; + } + + /** + * Instantiate a new WebSocket server which provides OrderWatcher functionality + * @param provider Web3 provider to use for JSON RPC calls. + * @param networkId NetworkId to watch orders on. + * @param contractAddresses Optional contract addresses. Defaults to known + * addresses based on networkId. + * @param partialConfig Optional configurations. + */ + constructor( + provider: Provider, + networkId: number, + contractAddresses?: ContractAddresses, + partialConfig?: Partial<OrderWatcherConfig>, + ) { + this._jsonRpcRequestId = 1; + this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, partialConfig); + this._connectionStore = new Set(); + this._httpServer = http.createServer(); + this._wsServer = new WebSocket.server({ + httpServer: this._httpServer, + // Avoid setting autoAcceptConnections to true as it defeats all + // standard cross-origin protection facilities built into the protocol + // and the browser. + // Source: https://www.npmjs.com/package/websocket#server-example + // Also ensures that a request event is emitted by + // the server whenever a new WebSocket request is made. + autoAcceptConnections: false, + }); + + this._wsServer.on('request', async (request: any) => { + // Designed for usage pattern where client and server are run on the same + // machine by the same user. As such, no security checks are in place. + const connection: WebSocket.connection = request.accept(null, request.origin); + logUtils.log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); + connection.on('message', this._onMessageCallbackAsync.bind(this, connection)); + connection.on('close', this._onCloseCallback.bind(this, connection)); + this._connectionStore.add(connection); + }); + + // Have the WebSocket server subscribe to the OrderWatcher to receive updates. + // These updates are then broadcast to clients in the _connectionStore. + this._orderWatcher.subscribe(this._broadcastCallback.bind(this)); + } + + /** + * Activates the WebSocket server by having its HTTP server start listening. + */ + public listen(): void { + const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; + this._httpServer.listen(port, () => { + logUtils.log(`${new Date()} [Server] Listening on port ${port}`); + }); + } + + /** + * Deactivates the WebSocket server by stopping the HTTP server from accepting + * new connections. + */ + public close(): void { + this._httpServer.close(); + } + + private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise<void> { + let response: WebSocketResponse; + try { + assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); + const request: WebSocketRequest = JSON.parse(message.utf8Data); + assert.doesConformToSchema('request', request, webSocketRequestSchema); + assert.isString(request.jsonrpc, JSONRPC_VERSION); + response = { + id: request.id, + jsonrpc: JSONRPC_VERSION, + method: request.method, + result: await this._routeRequestAsync(request), + }; + } catch (err) { + response = { + id: null, + jsonrpc: JSONRPC_VERSION, + method: null, + error: err.toString(), + }; + } + logUtils.log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); + connection.sendUTF(JSON.stringify(response)); + } + + private _onCloseCallback(connection: WebSocket.connection): void { + this._connectionStore.delete(connection); + logUtils.log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); + } + + private async _routeRequestAsync(request: WebSocketRequest): Promise<GetStatsResult | undefined> { + logUtils.log(`${new Date()} [Server] Request received: ${request.method}`); + switch (request.method) { + case OrderWatcherMethod.AddOrder: { + const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder( + request.params.signedOrder, + ); + await this._orderWatcher.addOrderAsync(signedOrder); + break; + } + case OrderWatcherMethod.RemoveOrder: { + this._orderWatcher.removeOrder(request.params.orderHash || 'undefined'); + break; + } + case OrderWatcherMethod.GetStats: { + return this._orderWatcher.getStats(); + break; + } + default: + // Should never reach here. Should be caught by JSON schema check. + } + return undefined; + } + + /** + * Broadcasts OrderState changes to ALL connected clients. At the moment, + * we do not support clients subscribing to only a subset of orders. As such, + * Client B will be notified of changes to an order that Client A added. + */ + private _broadcastCallback(err: Error | null, orderState?: OrderStateValid | OrderStateInvalid | undefined): void { + const method = OrderWatcherMethod.Update; + const response = + err === null + ? { + jsonrpc: JSONRPC_VERSION, + method, + result: orderState, + } + : { + jsonrpc: JSONRPC_VERSION, + method, + error: { + code: -32000, + message: err.message, + }, + }; + this._connectionStore.forEach((connection: WebSocket.connection) => { + connection.sendUTF(JSON.stringify(response)); + }); + } +} |