diff options
Diffstat (limited to 'packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts')
-rw-r--r-- | packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts index a1b63128f..eac48f849 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher_websocket_server.ts @@ -12,7 +12,7 @@ import { assert } from '../utils/assert'; import { OrderWatcher } from './order_watcher'; const DEFAULT_HTTP_PORT = 8080; -const JSONRPC_VERSION = '2.0'; +const JSON_RPC_VERSION = '2.0'; // Wraps the OrderWatcher functionality in a WebSocket server. Motivations: // 1) Users can watch orders via non-typescript programs. @@ -77,16 +77,17 @@ export class OrderWatcherWebSocketServer { connection.on('close', this._onCloseCallback.bind(this, connection)); this._connectionStore.add(connection); }); + } + /** + * Activates the WebSocket server by subscribing to the OrderWatcher and + * starting the WebSocket's HTTP server + */ + public start(): void { // Have the WebSocket server subscribe to the OrderWatcher to receive updates. // These updates are then broadcast to clients in the _connectionStore. this._orderWatcher.subscribe(this._broadcastCallback.bind(this)); - } - /** - * Activates the WebSocket server by having its HTTP server start listening. - */ - public listen(): void { const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; this._httpServer.listen(port, () => { logUtils.log(`${new Date()} [Server] Listening on port ${port}`); @@ -95,29 +96,30 @@ export class OrderWatcherWebSocketServer { /** * Deactivates the WebSocket server by stopping the HTTP server from accepting - * new connections. + * new connections and unsubscribing from the OrderWatcher */ - public close(): void { + public stop(): void { this._httpServer.close(); + this._orderWatcher.unsubscribe(); } private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise<void> { let response: WebSocketResponse; + assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); + const request: WebSocketRequest = JSON.parse(message.utf8Data); + assert.doesConformToSchema('request', request, webSocketRequestSchema); + assert.isString(request.jsonrpc, JSON_RPC_VERSION); try { - assert.doesConformToSchema('message', message, webSocketUtf8MessageSchema); - const request: WebSocketRequest = JSON.parse(message.utf8Data); - assert.doesConformToSchema('request', request, webSocketRequestSchema); - assert.isString(request.jsonrpc, JSONRPC_VERSION); response = { id: request.id, - jsonrpc: JSONRPC_VERSION, + jsonrpc: JSON_RPC_VERSION, method: request.method, result: await this._routeRequestAsync(request), }; } catch (err) { response = { - id: null, - jsonrpc: JSONRPC_VERSION, + id: request.id, + jsonrpc: JSON_RPC_VERSION, method: null, error: err.toString(), }; @@ -165,12 +167,12 @@ export class OrderWatcherWebSocketServer { const response = err === null ? { - jsonrpc: JSONRPC_VERSION, + jsonrpc: JSON_RPC_VERSION, method, result: orderState, } : { - jsonrpc: JSONRPC_VERSION, + jsonrpc: JSON_RPC_VERSION, method, error: { code: -32000, |