aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBrandon Millman <brandon@0xproject.com>2018-02-08 03:16:24 +0800
committerGitHub <noreply@github.com>2018-02-08 03:16:24 +0800
commit852811b314de3d62ee9068e3c5cbcfaa0650b058 (patch)
tree2740ad3525a18986ebf09e7e24bba2a755fc35f4
parente443cb2a3b58003c5e0dffc0ea2aaa955fc12366 (diff)
parent2404ff030422e5ad021536135d57a19e98bedcd8 (diff)
downloaddexon-sol-tools-852811b314de3d62ee9068e3c5cbcfaa0650b058.tar
dexon-sol-tools-852811b314de3d62ee9068e3c5cbcfaa0650b058.tar.gz
dexon-sol-tools-852811b314de3d62ee9068e3c5cbcfaa0650b058.tar.bz2
dexon-sol-tools-852811b314de3d62ee9068e3c5cbcfaa0650b058.tar.lz
dexon-sol-tools-852811b314de3d62ee9068e3c5cbcfaa0650b058.tar.xz
dexon-sol-tools-852811b314de3d62ee9068e3c5cbcfaa0650b058.tar.zst
dexon-sol-tools-852811b314de3d62ee9068e3c5cbcfaa0650b058.zip
Merge pull request #375 from 0xProject/feature/testnet-faucets/queue-by-network
Organize async task queues by network
-rw-r--r--packages/testnet-faucets/src/ts/dispatch_queue.ts54
-rw-r--r--packages/testnet-faucets/src/ts/dispense_asset_tasks.ts44
-rw-r--r--packages/testnet-faucets/src/ts/error_reporter.ts3
-rw-r--r--packages/testnet-faucets/src/ts/ether_request_queue.ts27
-rw-r--r--packages/testnet-faucets/src/ts/handler.ts97
-rw-r--r--packages/testnet-faucets/src/ts/request_queue.ts51
-rw-r--r--packages/testnet-faucets/src/ts/zrx_request_queue.ts43
7 files changed, 150 insertions, 169 deletions
diff --git a/packages/testnet-faucets/src/ts/dispatch_queue.ts b/packages/testnet-faucets/src/ts/dispatch_queue.ts
new file mode 100644
index 000000000..672511619
--- /dev/null
+++ b/packages/testnet-faucets/src/ts/dispatch_queue.ts
@@ -0,0 +1,54 @@
+import { intervalUtils } from '@0xproject/utils';
+import * as _ from 'lodash';
+
+import { errorReporter } from './error_reporter';
+import { utils } from './utils';
+
+const MAX_QUEUE_SIZE = 500;
+const DEFAULT_QUEUE_INTERVAL_MS = 1000;
+
+export class DispatchQueue {
+ private _queueIntervalMs: number;
+ private _queue: Array<() => Promise<void>>;
+ private _queueIntervalIdIfExists?: NodeJS.Timer;
+ constructor() {
+ this._queueIntervalMs = DEFAULT_QUEUE_INTERVAL_MS;
+ this._queue = [];
+ this._start();
+ }
+ public add(taskAsync: () => Promise<void>): boolean {
+ if (this.isFull()) {
+ return false;
+ }
+ this._queue.push(taskAsync);
+ return true;
+ }
+ public size(): number {
+ return this._queue.length;
+ }
+ public isFull(): boolean {
+ return this.size() >= MAX_QUEUE_SIZE;
+ }
+ public stop() {
+ if (!_.isUndefined(this._queueIntervalIdIfExists)) {
+ intervalUtils.clearAsyncExcludingInterval(this._queueIntervalIdIfExists);
+ }
+ }
+ private _start() {
+ this._queueIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
+ async () => {
+ const taskAsync = this._queue.shift();
+ if (_.isUndefined(taskAsync)) {
+ return Promise.resolve();
+ }
+ await taskAsync();
+ },
+ this._queueIntervalMs,
+ (err: Error) => {
+ utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
+ // tslint:disable-next-line:no-floating-promises
+ errorReporter.reportAsync(err);
+ },
+ );
+ }
+}
diff --git a/packages/testnet-faucets/src/ts/dispense_asset_tasks.ts b/packages/testnet-faucets/src/ts/dispense_asset_tasks.ts
new file mode 100644
index 000000000..9aa47463c
--- /dev/null
+++ b/packages/testnet-faucets/src/ts/dispense_asset_tasks.ts
@@ -0,0 +1,44 @@
+import { ZeroEx } from '0x.js';
+import { BigNumber, promisify } from '@0xproject/utils';
+import * as _ from 'lodash';
+import * as Web3 from 'web3';
+
+import { configs } from './configs';
+import { errorReporter } from './error_reporter';
+import { utils } from './utils';
+
+const DISPENSE_AMOUNT_ETHER = 0.1;
+const DISPENSE_AMOUNT_TOKEN = 0.1;
+
+export const dispenseAssetTasks = {
+ dispenseEtherTask(recipientAddress: string, web3: Web3) {
+ return async () => {
+ utils.consoleLog(`Processing ETH ${recipientAddress}`);
+ const sendTransactionAsync = promisify(web3.eth.sendTransaction);
+ const txHash = await sendTransactionAsync({
+ from: configs.DISPENSER_ADDRESS,
+ to: recipientAddress,
+ value: web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'),
+ });
+ utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`);
+ };
+ },
+ dispenseTokenTask(recipientAddress: string, tokenSymbol: string, zeroEx: ZeroEx) {
+ return async () => {
+ utils.consoleLog(`Processing ${tokenSymbol} ${recipientAddress}`);
+ const amountToDispense = new BigNumber(DISPENSE_AMOUNT_TOKEN);
+ const token = await zeroEx.tokenRegistry.getTokenBySymbolIfExistsAsync(tokenSymbol);
+ if (_.isUndefined(token)) {
+ throw new Error(`Unsupported asset type: ${tokenSymbol}`);
+ }
+ const baseUnitAmount = ZeroEx.toBaseUnitAmount(amountToDispense, token.decimals);
+ const txHash = await zeroEx.token.transferAsync(
+ token.address,
+ configs.DISPENSER_ADDRESS,
+ recipientAddress,
+ baseUnitAmount,
+ );
+ utils.consoleLog(`Sent ${amountToDispense} ZRX to ${recipientAddress} tx: ${txHash}`);
+ };
+ },
+};
diff --git a/packages/testnet-faucets/src/ts/error_reporter.ts b/packages/testnet-faucets/src/ts/error_reporter.ts
index 6865d3893..7fd76bde5 100644
--- a/packages/testnet-faucets/src/ts/error_reporter.ts
+++ b/packages/testnet-faucets/src/ts/error_reporter.ts
@@ -9,9 +9,7 @@ export const errorReporter = {
rollbar.init(configs.ROLLBAR_ACCESS_KEY, {
environment: configs.ENVIRONMENT,
});
-
rollbar.handleUncaughtExceptions(configs.ROLLBAR_ACCESS_KEY);
-
process.on('unhandledRejection', async (err: Error) => {
utils.consoleLog(`Uncaught exception ${err}. Stack: ${err.stack}`);
await this.reportAsync(err);
@@ -22,7 +20,6 @@ export const errorReporter = {
if (configs.ENVIRONMENT === 'development') {
return; // Do not log development environment errors
}
-
return new Promise((resolve, reject) => {
rollbar.handleError(err, req, (rollbarErr: Error) => {
if (rollbarErr) {
diff --git a/packages/testnet-faucets/src/ts/ether_request_queue.ts b/packages/testnet-faucets/src/ts/ether_request_queue.ts
deleted file mode 100644
index 710d49f39..000000000
--- a/packages/testnet-faucets/src/ts/ether_request_queue.ts
+++ /dev/null
@@ -1,27 +0,0 @@
-import { promisify } from '@0xproject/utils';
-import * as _ from 'lodash';
-
-import { configs } from './configs';
-import { errorReporter } from './error_reporter';
-import { RequestQueue } from './request_queue';
-import { utils } from './utils';
-
-const DISPENSE_AMOUNT_ETHER = 0.1;
-
-export class EtherRequestQueue extends RequestQueue {
- protected async _processNextRequestFireAndForgetAsync(recipientAddress: string) {
- utils.consoleLog(`Processing ETH ${recipientAddress}`);
- const sendTransactionAsync = promisify(this._web3.eth.sendTransaction);
- try {
- const txHash = await sendTransactionAsync({
- from: configs.DISPENSER_ADDRESS,
- to: recipientAddress,
- value: this._web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'),
- });
- utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`);
- } catch (err) {
- utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
- await errorReporter.reportAsync(err);
- }
- }
-}
diff --git a/packages/testnet-faucets/src/ts/handler.ts b/packages/testnet-faucets/src/ts/handler.ts
index d96b90802..5ed9ce628 100644
--- a/packages/testnet-faucets/src/ts/handler.ts
+++ b/packages/testnet-faucets/src/ts/handler.ts
@@ -1,5 +1,4 @@
import { Order, SignedOrder, ZeroEx } from '0x.js';
-import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import { BigNumber } from '@0xproject/utils';
import * as express from 'express';
import * as _ from 'lodash';
@@ -10,17 +9,23 @@ import * as Web3 from 'web3';
// we are not running in a browser env.
// Filed issue: https://github.com/ethereum/web3.js/issues/844
(global as any).XMLHttpRequest = undefined;
+import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import ProviderEngine = require('web3-provider-engine');
import HookedWalletSubprovider = require('web3-provider-engine/subproviders/hooked-wallet');
import RpcSubprovider = require('web3-provider-engine/subproviders/rpc');
import { configs } from './configs';
-import { EtherRequestQueue } from './ether_request_queue';
+import { DispatchQueue } from './dispatch_queue';
+import { dispenseAssetTasks } from './dispense_asset_tasks';
import { idManagement } from './id_management';
-import { RequestQueue } from './request_queue';
import { rpcUrls } from './rpc_urls';
import { utils } from './utils';
-import { ZRXRequestQueue } from './zrx_request_queue';
+
+interface NetworkConfig {
+ dispatchQueue: DispatchQueue;
+ web3: Web3;
+ zeroEx: ZeroEx;
+}
interface ItemByNetworkId<T> {
[networkId: string]: T;
@@ -35,30 +40,7 @@ enum RequestedAssetType {
const FIVE_DAYS_IN_MS = 4.32e8; // TODO: make this configurable
export class Handler {
- private _zeroExByNetworkId: ItemByNetworkId<ZeroEx> = {};
- private _etherRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
- private _zrxRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
- private static _dispenseAsset(
- req: express.Request,
- res: express.Response,
- requestQueueByNetworkId: ItemByNetworkId<RequestQueue>,
- requestedAssetType: RequestedAssetType,
- ) {
- const requestQueue = _.get(requestQueueByNetworkId, req.params.networkId);
- if (_.isUndefined(requestQueue)) {
- res.status(400).send('UNSUPPORTED_NETWORK_ID');
- return;
- }
- const didAddToQueue = requestQueue.add(req.params.recipient);
- if (!didAddToQueue) {
- res.status(503).send('QUEUE_IS_FULL');
- return;
- }
- utils.consoleLog(
- `Added ${req.params.recipient} to queue: ${requestedAssetType} networkId: ${req.params.networkId}`,
- );
- res.status(200).end();
- }
+ private _networkConfigByNetworkId: ItemByNetworkId<NetworkConfig> = {};
private static _createProviderEngine(rpcUrl: string) {
const engine = new ProviderEngine();
engine.addProvider(new NonceTrackerSubprovider());
@@ -79,35 +61,31 @@ export class Handler {
networkId: +networkId,
};
const zeroEx = new ZeroEx(web3.currentProvider, zeroExConfig);
- this._zeroExByNetworkId[networkId] = zeroEx;
- this._etherRequestQueueByNetworkId[networkId] = new EtherRequestQueue(web3);
- this._zrxRequestQueueByNetworkId[networkId] = new ZRXRequestQueue(web3, zeroEx);
+ const dispatchQueue = new DispatchQueue();
+ this._networkConfigByNetworkId[networkId] = {
+ dispatchQueue,
+ web3,
+ zeroEx,
+ };
});
}
public getQueueInfo(req: express.Request, res: express.Response) {
res.setHeader('Content-Type', 'application/json');
const queueInfo = _.mapValues(rpcUrls, (rpcUrl: string, networkId: string) => {
- const etherRequestQueue = this._etherRequestQueueByNetworkId[networkId];
- const zrxRequestQueue = this._zrxRequestQueueByNetworkId[networkId];
+ const dispatchQueue = this._networkConfigByNetworkId[networkId].dispatchQueue;
return {
- ether: {
- full: etherRequestQueue.isFull(),
- size: etherRequestQueue.size(),
- },
- zrx: {
- full: zrxRequestQueue.isFull(),
- size: zrxRequestQueue.size(),
- },
+ full: dispatchQueue.isFull(),
+ size: dispatchQueue.size(),
};
});
const payload = JSON.stringify(queueInfo);
res.status(200).send(payload);
}
public dispenseEther(req: express.Request, res: express.Response) {
- Handler._dispenseAsset(req, res, this._etherRequestQueueByNetworkId, RequestedAssetType.ETH);
+ this._dispenseAsset(req, res, RequestedAssetType.ETH);
}
public dispenseZRX(req: express.Request, res: express.Response) {
- Handler._dispenseAsset(req, res, this._zrxRequestQueueByNetworkId, RequestedAssetType.ZRX);
+ this._dispenseAsset(req, res, RequestedAssetType.ZRX);
}
public async dispenseWETHOrder(req: express.Request, res: express.Response) {
await this._dispenseOrder(req, res, RequestedAssetType.WETH);
@@ -115,12 +93,41 @@ export class Handler {
public async dispenseZRXOrder(req: express.Request, res: express.Response, next: express.NextFunction) {
await this._dispenseOrder(req, res, RequestedAssetType.ZRX);
}
+ private _dispenseAsset(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
+ const networkId = req.params.networkId;
+ const recipient = req.params.recipient;
+ const networkConfig = this._networkConfigByNetworkId[networkId];
+ let dispenserTask;
+ switch (requestedAssetType) {
+ case RequestedAssetType.ETH:
+ dispenserTask = dispenseAssetTasks.dispenseEtherTask(recipient, networkConfig.web3);
+ break;
+ case RequestedAssetType.WETH:
+ case RequestedAssetType.ZRX:
+ dispenserTask = dispenseAssetTasks.dispenseTokenTask(
+ recipient,
+ requestedAssetType,
+ networkConfig.zeroEx,
+ );
+ break;
+ default:
+ throw new Error(`Unsupported asset type: ${requestedAssetType}`);
+ }
+ const didAddToQueue = networkConfig.dispatchQueue.add(dispenserTask);
+ if (!didAddToQueue) {
+ res.status(503).send('QUEUE_IS_FULL');
+ return;
+ }
+ utils.consoleLog(`Added ${recipient} to queue: ${requestedAssetType} networkId: ${networkId}`);
+ res.status(200).end();
+ }
private async _dispenseOrder(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
- const zeroEx = _.get(this._zeroExByNetworkId, req.params.networkId);
- if (_.isUndefined(zeroEx)) {
+ const networkConfig = _.get(this._networkConfigByNetworkId, req.params.networkId);
+ if (_.isUndefined(networkConfig)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
+ const zeroEx = networkConfig.zeroEx;
res.setHeader('Content-Type', 'application/json');
const makerTokenAddress = await zeroEx.tokenRegistry.getTokenAddressBySymbolIfExistsAsync(requestedAssetType);
if (_.isUndefined(makerTokenAddress)) {
diff --git a/packages/testnet-faucets/src/ts/request_queue.ts b/packages/testnet-faucets/src/ts/request_queue.ts
deleted file mode 100644
index f128528a5..000000000
--- a/packages/testnet-faucets/src/ts/request_queue.ts
+++ /dev/null
@@ -1,51 +0,0 @@
-import * as _ from 'lodash';
-import * as timers from 'timers';
-
-// HACK: web3 leaks XMLHttpRequest into the global scope and causes requests to hang
-// because they are using the wrong XHR package.
-// Filed issue: https://github.com/ethereum/web3.js/issues/844
-// tslint:disable-next-line:ordered-imports
-import * as Web3 from 'web3';
-
-const MAX_QUEUE_SIZE = 500;
-const DEFAULT_QUEUE_INTERVAL_MS = 1000;
-
-export class RequestQueue {
- protected _queueIntervalMs: number;
- protected _queue: string[];
- protected _queueIntervalId?: NodeJS.Timer;
- protected _web3: Web3;
- constructor(web3: any) {
- this._queueIntervalMs = DEFAULT_QUEUE_INTERVAL_MS;
- this._queue = [];
- this._web3 = web3;
- this._start();
- }
- public add(recipientAddress: string): boolean {
- if (this.isFull()) {
- return false;
- }
- this._queue.push(recipientAddress);
- return true;
- }
- public size(): number {
- return this._queue.length;
- }
- public isFull(): boolean {
- return this.size() >= MAX_QUEUE_SIZE;
- }
- protected _start() {
- this._queueIntervalId = timers.setInterval(() => {
- const recipientAddress = this._queue.shift();
- if (_.isUndefined(recipientAddress)) {
- return;
- }
- // tslint:disable-next-line:no-floating-promises
- this._processNextRequestFireAndForgetAsync(recipientAddress);
- }, this._queueIntervalMs);
- }
- // tslint:disable-next-line:prefer-function-over-method
- protected async _processNextRequestFireAndForgetAsync(recipientAddress: string) {
- throw new Error('Expected processNextRequestFireAndForgetAsync to be implemented by a subclass');
- }
-}
diff --git a/packages/testnet-faucets/src/ts/zrx_request_queue.ts b/packages/testnet-faucets/src/ts/zrx_request_queue.ts
deleted file mode 100644
index 3659f4856..000000000
--- a/packages/testnet-faucets/src/ts/zrx_request_queue.ts
+++ /dev/null
@@ -1,43 +0,0 @@
-import { ZeroEx } from '0x.js';
-import { BigNumber } from '@0xproject/utils';
-import * as _ from 'lodash';
-
-import { configs } from './configs';
-import { errorReporter } from './error_reporter';
-import { RequestQueue } from './request_queue';
-import { utils } from './utils';
-
-// HACK: web3 leaks XMLHttpRequest into the global scope and causes requests to hang
-// because they are using the wrong XHR package.
-// Filed issue: https://github.com/ethereum/web3.js/issues/844
-// tslint:disable-next-line:ordered-imports
-import * as Web3 from 'web3';
-
-const DISPENSE_AMOUNT_ZRX = new BigNumber(0.1);
-const QUEUE_INTERVAL_MS = 5000;
-
-export class ZRXRequestQueue extends RequestQueue {
- private _zeroEx: ZeroEx;
- constructor(web3: Web3, zeroEx: ZeroEx) {
- super(web3);
- this._queueIntervalMs = QUEUE_INTERVAL_MS;
- this._zeroEx = zeroEx;
- }
- protected async _processNextRequestFireAndForgetAsync(recipientAddress: string) {
- utils.consoleLog(`Processing ZRX ${recipientAddress}`);
- const baseUnitAmount = ZeroEx.toBaseUnitAmount(DISPENSE_AMOUNT_ZRX, 18);
- try {
- const zrxTokenAddress = this._zeroEx.exchange.getZRXTokenAddress();
- const txHash = await this._zeroEx.token.transferAsync(
- zrxTokenAddress,
- configs.DISPENSER_ADDRESS,
- recipientAddress,
- baseUnitAmount,
- );
- utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ZRX} ZRX to ${recipientAddress} tx: ${txHash}`);
- } catch (err) {
- utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
- await errorReporter.reportAsync(err);
- }
- }
-}