From d9b1d31e7310f7f554f1740f93e4ccd5b5db90f5 Mon Sep 17 00:00:00 2001 From: Brandon Millman Date: Tue, 6 Feb 2018 15:15:24 -0800 Subject: Organize async task queues by network --- packages/testnet-faucets/src/ts/dispatch_queue.ts | 47 ++++++++++ .../testnet-faucets/src/ts/dispense_asset_tasks.ts | 54 +++++++++++ .../testnet-faucets/src/ts/ether_request_queue.ts | 27 ------ packages/testnet-faucets/src/ts/handler.ts | 100 +++++++++++---------- packages/testnet-faucets/src/ts/request_queue.ts | 56 ------------ .../testnet-faucets/src/ts/zrx_request_queue.ts | 43 --------- 6 files changed, 156 insertions(+), 171 deletions(-) create mode 100644 packages/testnet-faucets/src/ts/dispatch_queue.ts create mode 100644 packages/testnet-faucets/src/ts/dispense_asset_tasks.ts delete mode 100644 packages/testnet-faucets/src/ts/ether_request_queue.ts delete mode 100644 packages/testnet-faucets/src/ts/request_queue.ts delete mode 100644 packages/testnet-faucets/src/ts/zrx_request_queue.ts (limited to 'packages') diff --git a/packages/testnet-faucets/src/ts/dispatch_queue.ts b/packages/testnet-faucets/src/ts/dispatch_queue.ts new file mode 100644 index 000000000..94d094203 --- /dev/null +++ b/packages/testnet-faucets/src/ts/dispatch_queue.ts @@ -0,0 +1,47 @@ +import { intervalUtils } from '@0xproject/utils'; +import * as _ from 'lodash'; + +const MAX_QUEUE_SIZE = 500; +const DEFAULT_QUEUE_INTERVAL_MS = 1000; + +export class DispatchQueue { + private _queueIntervalMs: number; + private _queue: Array<() => Promise>; + private _queueIntervalIdIfExists?: NodeJS.Timer; + constructor() { + this._queueIntervalMs = DEFAULT_QUEUE_INTERVAL_MS; + this._queue = []; + this._start(); + } + public add(task: () => Promise): boolean { + if (this.isFull()) { + return false; + } + this._queue.push(task); + return true; + } + public size(): number { + return this._queue.length; + } + public isFull(): boolean { + return this.size() >= MAX_QUEUE_SIZE; + } + public stop() { + if (!_.isUndefined(this._queueIntervalIdIfExists)) { + intervalUtils.clearAsyncExcludingInterval(this._queueIntervalIdIfExists); + } + } + private _start() { + this._queueIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval( + async () => { + const task = this._queue.shift(); + if (_.isUndefined(task)) { + return Promise.resolve(); + } + await task(); + }, + this._queueIntervalMs, + _.noop, + ); + } +} diff --git a/packages/testnet-faucets/src/ts/dispense_asset_tasks.ts b/packages/testnet-faucets/src/ts/dispense_asset_tasks.ts new file mode 100644 index 000000000..c70458fbe --- /dev/null +++ b/packages/testnet-faucets/src/ts/dispense_asset_tasks.ts @@ -0,0 +1,54 @@ +import { ZeroEx } from '0x.js'; +import { BigNumber, promisify } from '@0xproject/utils'; +import * as _ from 'lodash'; +import * as Web3 from 'web3'; + +import { configs } from './configs'; +import { errorReporter } from './error_reporter'; +import { utils } from './utils'; + +const DISPENSE_AMOUNT_ETHER = 0.1; +const DISPENSE_AMOUNT_TOKEN = 0.1; + +export const dispenseAssetTasks = { + dispenseEtherTask(recipientAddress: string, web3: Web3) { + return async () => { + utils.consoleLog(`Processing ETH ${recipientAddress}`); + const sendTransactionAsync = promisify(web3.eth.sendTransaction); + try { + const txHash = await sendTransactionAsync({ + from: configs.DISPENSER_ADDRESS, + to: recipientAddress, + value: web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'), + }); + utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`); + } catch (err) { + utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`); + await errorReporter.reportAsync(err); + } + }; + }, + dispenseTokenTask(recipientAddress: string, tokenSymbol: string, zeroEx: ZeroEx) { + return async () => { + utils.consoleLog(`Processing ${tokenSymbol} ${recipientAddress}`); + const amountToDispense = new BigNumber(DISPENSE_AMOUNT_TOKEN); + try { + const token = await zeroEx.tokenRegistry.getTokenBySymbolIfExistsAsync(tokenSymbol); + if (_.isUndefined(token)) { + throw new Error(`Unsupported asset type: ${tokenSymbol}`); + } + const baseUnitAmount = ZeroEx.toBaseUnitAmount(amountToDispense, token.decimals); + const txHash = await zeroEx.token.transferAsync( + token.address, + configs.DISPENSER_ADDRESS, + recipientAddress, + baseUnitAmount, + ); + utils.consoleLog(`Sent ${amountToDispense} ZRX to ${recipientAddress} tx: ${txHash}`); + } catch (err) { + utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`); + await errorReporter.reportAsync(err); + } + }; + }, +}; diff --git a/packages/testnet-faucets/src/ts/ether_request_queue.ts b/packages/testnet-faucets/src/ts/ether_request_queue.ts deleted file mode 100644 index 710d49f39..000000000 --- a/packages/testnet-faucets/src/ts/ether_request_queue.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { promisify } from '@0xproject/utils'; -import * as _ from 'lodash'; - -import { configs } from './configs'; -import { errorReporter } from './error_reporter'; -import { RequestQueue } from './request_queue'; -import { utils } from './utils'; - -const DISPENSE_AMOUNT_ETHER = 0.1; - -export class EtherRequestQueue extends RequestQueue { - protected async _processNextRequestFireAndForgetAsync(recipientAddress: string) { - utils.consoleLog(`Processing ETH ${recipientAddress}`); - const sendTransactionAsync = promisify(this._web3.eth.sendTransaction); - try { - const txHash = await sendTransactionAsync({ - from: configs.DISPENSER_ADDRESS, - to: recipientAddress, - value: this._web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'), - }); - utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`); - } catch (err) { - utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`); - await errorReporter.reportAsync(err); - } - } -} diff --git a/packages/testnet-faucets/src/ts/handler.ts b/packages/testnet-faucets/src/ts/handler.ts index d96b90802..ce7a81eba 100644 --- a/packages/testnet-faucets/src/ts/handler.ts +++ b/packages/testnet-faucets/src/ts/handler.ts @@ -1,5 +1,4 @@ import { Order, SignedOrder, ZeroEx } from '0x.js'; -import { NonceTrackerSubprovider } from '@0xproject/subproviders'; import { BigNumber } from '@0xproject/utils'; import * as express from 'express'; import * as _ from 'lodash'; @@ -10,17 +9,23 @@ import * as Web3 from 'web3'; // we are not running in a browser env. // Filed issue: https://github.com/ethereum/web3.js/issues/844 (global as any).XMLHttpRequest = undefined; +import { NonceTrackerSubprovider } from '@0xproject/subproviders'; import ProviderEngine = require('web3-provider-engine'); import HookedWalletSubprovider = require('web3-provider-engine/subproviders/hooked-wallet'); import RpcSubprovider = require('web3-provider-engine/subproviders/rpc'); import { configs } from './configs'; -import { EtherRequestQueue } from './ether_request_queue'; +import { DispatchQueue } from './dispatch_queue'; +import { dispenseAssetTasks } from './dispense_asset_tasks'; import { idManagement } from './id_management'; -import { RequestQueue } from './request_queue'; import { rpcUrls } from './rpc_urls'; import { utils } from './utils'; -import { ZRXRequestQueue } from './zrx_request_queue'; + +interface NetworkConfig { + dispatchQueue: DispatchQueue; + web3: Web3; + zeroEx: ZeroEx; +} interface ItemByNetworkId { [networkId: string]: T; @@ -35,30 +40,7 @@ enum RequestedAssetType { const FIVE_DAYS_IN_MS = 4.32e8; // TODO: make this configurable export class Handler { - private _zeroExByNetworkId: ItemByNetworkId = {}; - private _etherRequestQueueByNetworkId: ItemByNetworkId = {}; - private _zrxRequestQueueByNetworkId: ItemByNetworkId = {}; - private static _dispenseAsset( - req: express.Request, - res: express.Response, - requestQueueByNetworkId: ItemByNetworkId, - requestedAssetType: RequestedAssetType, - ) { - const requestQueue = _.get(requestQueueByNetworkId, req.params.networkId); - if (_.isUndefined(requestQueue)) { - res.status(400).send('UNSUPPORTED_NETWORK_ID'); - return; - } - const didAddToQueue = requestQueue.add(req.params.recipient); - if (!didAddToQueue) { - res.status(503).send('QUEUE_IS_FULL'); - return; - } - utils.consoleLog( - `Added ${req.params.recipient} to queue: ${requestedAssetType} networkId: ${req.params.networkId}`, - ); - res.status(200).end(); - } + private _networkConfigByNetworkId: ItemByNetworkId = {}; private static _createProviderEngine(rpcUrl: string) { const engine = new ProviderEngine(); engine.addProvider(new NonceTrackerSubprovider()); @@ -79,35 +61,31 @@ export class Handler { networkId: +networkId, }; const zeroEx = new ZeroEx(web3.currentProvider, zeroExConfig); - this._zeroExByNetworkId[networkId] = zeroEx; - this._etherRequestQueueByNetworkId[networkId] = new EtherRequestQueue(web3); - this._zrxRequestQueueByNetworkId[networkId] = new ZRXRequestQueue(web3, zeroEx); + const dispatchQueue = new DispatchQueue(); + this._networkConfigByNetworkId[networkId] = { + dispatchQueue, + web3, + zeroEx, + }; }); } public getQueueInfo(req: express.Request, res: express.Response) { res.setHeader('Content-Type', 'application/json'); const queueInfo = _.mapValues(rpcUrls, (rpcUrl: string, networkId: string) => { - const etherRequestQueue = this._etherRequestQueueByNetworkId[networkId]; - const zrxRequestQueue = this._zrxRequestQueueByNetworkId[networkId]; + const dispatchQueue = this._networkConfigByNetworkId[networkId].dispatchQueue; return { - ether: { - full: etherRequestQueue.isFull(), - size: etherRequestQueue.size(), - }, - zrx: { - full: zrxRequestQueue.isFull(), - size: zrxRequestQueue.size(), - }, + full: dispatchQueue.isFull(), + size: dispatchQueue.size(), }; }); const payload = JSON.stringify(queueInfo); res.status(200).send(payload); } public dispenseEther(req: express.Request, res: express.Response) { - Handler._dispenseAsset(req, res, this._etherRequestQueueByNetworkId, RequestedAssetType.ETH); + this._dispenseAsset(req, res, RequestedAssetType.ETH); } public dispenseZRX(req: express.Request, res: express.Response) { - Handler._dispenseAsset(req, res, this._zrxRequestQueueByNetworkId, RequestedAssetType.ZRX); + this._dispenseAsset(req, res, RequestedAssetType.ZRX); } public async dispenseWETHOrder(req: express.Request, res: express.Response) { await this._dispenseOrder(req, res, RequestedAssetType.WETH); @@ -115,12 +93,44 @@ export class Handler { public async dispenseZRXOrder(req: express.Request, res: express.Response, next: express.NextFunction) { await this._dispenseOrder(req, res, RequestedAssetType.ZRX); } + private _dispenseAsset(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) { + const networkId = req.params.networkId; + const recipient = req.params.recipient; + const networkConfig = _.get(this._networkConfigByNetworkId, networkId); + if (_.isUndefined(networkConfig)) { + res.status(400).send('UNSUPPORTED_NETWORK_ID'); + return; + } + let dispenserTask; + switch (requestedAssetType) { + case RequestedAssetType.ETH: + dispenserTask = dispenseAssetTasks.dispenseEtherTask(recipient, networkConfig.web3); + break; + case RequestedAssetType.WETH: + dispenserTask = dispenseAssetTasks.dispenseTokenTask(recipient, requestedAssetType, networkConfig.zeroEx); + break; + case RequestedAssetType.ZRX: + dispenserTask = dispenseAssetTasks.dispenseTokenTask(recipient, requestedAssetType, networkConfig.zeroEx); + break; + default: + throw new Error(`Unsupported asset type: ${requestedAssetType}`); + } + + const didAddToQueue = networkConfig.dispatchQueue.add(dispenserTask); + if (!didAddToQueue) { + res.status(503).send('QUEUE_IS_FULL'); + return; + } + utils.consoleLog(`Added ${recipient} to queue: ${requestedAssetType} networkId: ${networkId}`); + res.status(200).end(); + } private async _dispenseOrder(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) { - const zeroEx = _.get(this._zeroExByNetworkId, req.params.networkId); - if (_.isUndefined(zeroEx)) { + const networkConfig = _.get(this._networkConfigByNetworkId, req.params.networkId); + if (_.isUndefined(networkConfig)) { res.status(400).send('UNSUPPORTED_NETWORK_ID'); return; } + const zeroEx = networkConfig.zeroEx; res.setHeader('Content-Type', 'application/json'); const makerTokenAddress = await zeroEx.tokenRegistry.getTokenAddressBySymbolIfExistsAsync(requestedAssetType); if (_.isUndefined(makerTokenAddress)) { diff --git a/packages/testnet-faucets/src/ts/request_queue.ts b/packages/testnet-faucets/src/ts/request_queue.ts deleted file mode 100644 index 718f8be0c..000000000 --- a/packages/testnet-faucets/src/ts/request_queue.ts +++ /dev/null @@ -1,56 +0,0 @@ -import * as _ from 'lodash'; -import * as timers from 'timers'; - -// HACK: web3 leaks XMLHttpRequest into the global scope and causes requests to hang -// because they are using the wrong XHR package. -// Filed issue: https://github.com/ethereum/web3.js/issues/844 -// tslint:disable-next-line:ordered-imports -import * as Web3 from 'web3'; - -const MAX_QUEUE_SIZE = 500; -const DEFAULT_QUEUE_INTERVAL_MS = 1000; - -export class RequestQueue { - protected _queueIntervalMs: number; - protected _queue: string[]; - protected _queueIntervalId?: NodeJS.Timer; - protected _web3: Web3; - constructor(web3: any) { - this._queueIntervalMs = DEFAULT_QUEUE_INTERVAL_MS; - this._queue = []; - this._web3 = web3; - this._start(); - } - public add(recipientAddress: string): boolean { - if (this.isFull()) { - return false; - } - this._queue.push(recipientAddress); - return true; - } - public size(): number { - return this._queue.length; - } - public isFull(): boolean { - return this.size() >= MAX_QUEUE_SIZE; - } - protected _start() { - this._queueIntervalId = timers.setInterval(() => { - const recipientAddress = this._queue.shift(); - if (_.isUndefined(recipientAddress)) { - return; - } - // tslint:disable-next-line:no-floating-promises - this._processNextRequestFireAndForgetAsync(recipientAddress); - }, this._queueIntervalMs); - } - protected _stop() { - if (!_.isUndefined(this._queueIntervalId)) { - clearInterval(this._queueIntervalId); - } - } - // tslint:disable-next-line:prefer-function-over-method - protected async _processNextRequestFireAndForgetAsync(recipientAddress: string) { - throw new Error('Expected processNextRequestFireAndForgetAsync to be implemented by a subclass'); - } -} diff --git a/packages/testnet-faucets/src/ts/zrx_request_queue.ts b/packages/testnet-faucets/src/ts/zrx_request_queue.ts deleted file mode 100644 index 3659f4856..000000000 --- a/packages/testnet-faucets/src/ts/zrx_request_queue.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { ZeroEx } from '0x.js'; -import { BigNumber } from '@0xproject/utils'; -import * as _ from 'lodash'; - -import { configs } from './configs'; -import { errorReporter } from './error_reporter'; -import { RequestQueue } from './request_queue'; -import { utils } from './utils'; - -// HACK: web3 leaks XMLHttpRequest into the global scope and causes requests to hang -// because they are using the wrong XHR package. -// Filed issue: https://github.com/ethereum/web3.js/issues/844 -// tslint:disable-next-line:ordered-imports -import * as Web3 from 'web3'; - -const DISPENSE_AMOUNT_ZRX = new BigNumber(0.1); -const QUEUE_INTERVAL_MS = 5000; - -export class ZRXRequestQueue extends RequestQueue { - private _zeroEx: ZeroEx; - constructor(web3: Web3, zeroEx: ZeroEx) { - super(web3); - this._queueIntervalMs = QUEUE_INTERVAL_MS; - this._zeroEx = zeroEx; - } - protected async _processNextRequestFireAndForgetAsync(recipientAddress: string) { - utils.consoleLog(`Processing ZRX ${recipientAddress}`); - const baseUnitAmount = ZeroEx.toBaseUnitAmount(DISPENSE_AMOUNT_ZRX, 18); - try { - const zrxTokenAddress = this._zeroEx.exchange.getZRXTokenAddress(); - const txHash = await this._zeroEx.token.transferAsync( - zrxTokenAddress, - configs.DISPENSER_ADDRESS, - recipientAddress, - baseUnitAmount, - ); - utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ZRX} ZRX to ${recipientAddress} tx: ${txHash}`); - } catch (err) { - utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`); - await errorReporter.reportAsync(err); - } - } -} -- cgit v1.2.3