aboutsummaryrefslogtreecommitdiffstats
path: root/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts
blob: b75b0760386869d8482f08b87f8a8878b95f0eb6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import { ContractAddresses } from '@0x/contract-addresses';
import { schemas } from '@0x/json-schemas';
import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types';
import { BigNumber, logUtils } from '@0x/utils';
import { Provider } from 'ethereum-types';
import * as http from 'http';
import * as WebSocket from 'websocket';

import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types';
import { assert } from '../utils/assert';

import { OrderWatcher } from './order_watcher';

const DEFAULT_HTTP_PORT = 8080;
const JSON_RPC_VERSION = '2.0';

// Wraps the OrderWatcher functionality in a WebSocket server. Motivations:
// 1) Users can watch orders via non-typescript programs.
// 2) Better encapsulation so that users can work
export class OrderWatcherWebSocketServer {
    private readonly _orderWatcher: OrderWatcher;
    private readonly _httpServer: http.Server;
    private readonly _connectionStore: Set<WebSocket.connection>;
    private readonly _wsServer: WebSocket.server;
    private readonly _isVerbose: boolean;
    /**
     *  Recover types lost when the payload is stringified.
     */
    private static _parseSignedOrder(rawRequest: any): SignedOrder {
        const bigNumberFields = [
            'salt',
            'makerFee',
            'takerFee',
            'makerAssetAmount',
            'takerAssetAmount',
            'expirationTimeSeconds',
        ];
        for (const field of bigNumberFields) {
            rawRequest[field] = new BigNumber(rawRequest[field]);
        }
        return rawRequest;
    }

    /**
     * Instantiate a new WebSocket server which provides OrderWatcher functionality
     *  @param provider Web3 provider to use for JSON RPC calls.
     *  @param networkId NetworkId to watch orders on.
     *  @param contractAddresses Optional contract addresses. Defaults to known
     *  addresses based on networkId.
     *  @param orderWatcherConfig OrderWatcher configurations. isVerbose sets the verbosity for the WebSocket server aswell.
     *  @param isVerbose Whether to enable verbose logging. Defaults to true.
     */
    constructor(
        provider: Provider,
        networkId: number,
        contractAddresses?: ContractAddresses,
        orderWatcherConfig?: Partial<OrderWatcherConfig>,
    ) {
        this._isVerbose =
            orderWatcherConfig !== undefined && orderWatcherConfig.isVerbose !== undefined
                ? orderWatcherConfig.isVerbose
                : true;
        this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, orderWatcherConfig);
        this._connectionStore = new Set();
        this._httpServer = http.createServer();
        this._wsServer = new WebSocket.server({
            httpServer: this._httpServer,
            // Avoid setting autoAcceptConnections to true as it defeats all
            // standard cross-origin protection facilities built into the protocol
            // and the browser.
            // Source: https://www.npmjs.com/package/websocket#server-example
            // Also ensures that a request event is emitted by
            // the server whenever a new WebSocket request is made.
            autoAcceptConnections: false,
        });

        this._wsServer.on('request', async (request: any) => {
            // Designed for usage pattern where client and server are run on the same
            // machine by the same user. As such, no security checks are in place.
            const connection: WebSocket.connection = request.accept(null, request.origin);
            this._log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`);
            connection.on('message', this._onMessageCallbackAsync.bind(this, connection));
            connection.on('close', this._onCloseCallback.bind(this, connection));
            this._connectionStore.add(connection);
        });
    }

    /**
     * Activates the WebSocket server by subscribing to the OrderWatcher and
     * starting the WebSocket's HTTP server
     */
    public start(): void {
        // Have the WebSocket server subscribe to the OrderWatcher to receive updates.
        // These updates are then broadcast to clients in the _connectionStore.
        this._orderWatcher.subscribe(this._broadcastCallback.bind(this));

        const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT;
        this._httpServer.listen(port, () => {
            this._log(`${new Date()} [Server] Listening on port ${port}`);
        });
    }

    /**
     * Deactivates the WebSocket server by stopping the HTTP server from accepting
     * new connections and unsubscribing from the OrderWatcher
     */
    public stop(): void {
        this._httpServer.close();
        this._orderWatcher.unsubscribe();
    }

    private _log(...args: any[]): void {
        if (this._isVerbose) {
            logUtils.log(...args);
        }
    }

    private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise<void> {
        let response: WebSocketResponse;
        let id: number | null = null;
        try {
            assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema);
            const request: WebSocketRequest = JSON.parse(message.utf8Data);
            id = request.id;
            assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema);
            assert.isString(request.jsonrpc, JSON_RPC_VERSION);
            response = {
                id,
                jsonrpc: JSON_RPC_VERSION,
                method: request.method,
                result: await this._routeRequestAsync(request),
            };
        } catch (err) {
            response = {
                id,
                jsonrpc: JSON_RPC_VERSION,
                method: null,
                error: err.toString(),
            };
        }
        this._log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`);
        connection.sendUTF(JSON.stringify(response));
    }

    private _onCloseCallback(connection: WebSocket.connection): void {
        this._connectionStore.delete(connection);
        this._log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`);
    }

    private async _routeRequestAsync(request: WebSocketRequest): Promise<GetStatsResult | undefined> {
        this._log(`${new Date()} [Server] Request received: ${request.method}`);
        switch (request.method) {
            case OrderWatcherMethod.AddOrder: {
                const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder(
                    request.params.signedOrder,
                );
                await this._orderWatcher.addOrderAsync(signedOrder);
                break;
            }
            case OrderWatcherMethod.RemoveOrder: {
                this._orderWatcher.removeOrder(request.params.orderHash || 'undefined');
                break;
            }
            case OrderWatcherMethod.GetStats: {
                return this._orderWatcher.getStats();
            }
            default:
                // Should never reach here. Should be caught by JSON schema check.
                throw new Error(`Unexpected default case hit for request.method`);
        }
        return undefined;
    }

    /**
     * Broadcasts OrderState changes to ALL connected clients. At the moment,
     * we do not support clients subscribing to only a subset of orders. As such,
     * Client B will be notified of changes to an order that Client A added.
     */
    private _broadcastCallback(err: Error | null, orderState?: OrderStateValid | OrderStateInvalid | undefined): void {
        const method = OrderWatcherMethod.Update;
        const response =
            err === null
                ? {
                      jsonrpc: JSON_RPC_VERSION,
                      method,
                      result: orderState,
                  }
                : {
                      jsonrpc: JSON_RPC_VERSION,
                      method,
                      error: {
                          code: -32000,
                          message: err.message,
                      },
                  };
        this._connectionStore.forEach((connection: WebSocket.connection) => {
            connection.sendUTF(JSON.stringify(response));
        });
    }
}