aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline
diff options
context:
space:
mode:
authorXianny <8582774+xianny@users.noreply.github.com>2018-12-11 03:01:35 +0800
committerGitHub <noreply@github.com>2018-12-11 03:01:35 +0800
commit02e14f88d234d720341f163bf77325b82a7f3822 (patch)
tree9803a4d5fbf701a0fab8d95a5b129f749c5260a8 /packages/pipeline
parentd050a1bd534695288b6da0b01e58aba1dff0e63e (diff)
parent096c4c8f2b20b4ca909d4ba950e219c22a5f8882 (diff)
downloaddexon-0x-contracts-02e14f88d234d720341f163bf77325b82a7f3822.tar
dexon-0x-contracts-02e14f88d234d720341f163bf77325b82a7f3822.tar.gz
dexon-0x-contracts-02e14f88d234d720341f163bf77325b82a7f3822.tar.bz2
dexon-0x-contracts-02e14f88d234d720341f163bf77325b82a7f3822.tar.lz
dexon-0x-contracts-02e14f88d234d720341f163bf77325b82a7f3822.tar.xz
dexon-0x-contracts-02e14f88d234d720341f163bf77325b82a7f3822.tar.zst
dexon-0x-contracts-02e14f88d234d720341f163bf77325b82a7f3822.zip
Fix/pipeline/ohlcv ratelimit (#1403)
Use time-based throttling and increase batch size of CC query
Diffstat (limited to 'packages/pipeline')
-rw-r--r--packages/pipeline/package.json2
-rw-r--r--packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts32
-rw-r--r--packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts6
-rw-r--r--packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts4
4 files changed, 21 insertions, 23 deletions
diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json
index 0539618d4..4fde906b8 100644
--- a/packages/pipeline/package.json
+++ b/packages/pipeline/package.json
@@ -52,9 +52,9 @@
"@types/p-limit": "^2.0.0",
"async-parallel": "^1.2.3",
"axios": "^0.18.0",
+ "bottleneck": "^2.13.2",
"dockerode": "^2.5.7",
"ethereum-types": "^1.0.6",
- "p-limit": "^2.0.0",
"pg": "^7.5.0",
"prettier": "^1.15.3",
"ramda": "^0.25.0",
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
index 8804c34d0..85042501b 100644
--- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
+++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
@@ -1,6 +1,6 @@
// tslint:disable:no-duplicate-imports
import { fetchAsync } from '@0x/utils';
-import promiseLimit = require('p-limit');
+import Bottleneck from 'bottleneck';
import { stringify } from 'querystring';
import * as R from 'ramda';
@@ -33,43 +33,41 @@ export interface CryptoCompareOHLCVParams {
toTs?: number;
}
-const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
const ONE_SECOND = 1000;
const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR;
const HTTP_OK_STATUS = 200;
const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96;
+const MAX_PAGE_SIZE = 2000;
export class CryptoCompareOHLCVSource {
- public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time
- public readonly default_exchange = 'CCCAGG';
public readonly intervalBetweenRecords = ONE_HOUR;
+ public readonly defaultExchange = 'CCCAGG';
+ public readonly interval = this.intervalBetweenRecords * MAX_PAGE_SIZE; // the hourly API returns data for one interval at a time
private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?';
// rate-limit for all API calls through this class instance
- private readonly _promiseLimit: (fetchFn: () => Promise<Response>) => Promise<Response>;
- constructor(maxConcurrentRequests: number = 50) {
- this._promiseLimit = promiseLimit(maxConcurrentRequests);
+ private readonly _limiter: Bottleneck;
+ constructor(maxReqsPerSecond: number) {
+ this._limiter = new Bottleneck({
+ minTime: ONE_SECOND / maxReqsPerSecond,
+ reservoir: 30,
+ reservoirRefreshAmount: 30,
+ reservoirRefreshInterval: ONE_SECOND,
+ });
}
// gets OHLCV records starting from pair.latest
public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> {
const params = {
- e: this.default_exchange,
+ e: this.defaultExchange,
fsym: pair.fromSymbol,
tsym: pair.toSymbol,
+ limit: MAX_PAGE_SIZE,
toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms
};
const url = this._url + stringify(params);
-
- // go through the instance-wide rate-limit
- const fetchPromise: Promise<Response> = this._promiseLimit(() => {
- // tslint:disable-next-line:no-console
- console.log(`Scraping Crypto Compare at ${url}`);
- return fetchAsync(url);
- });
-
- const response = await Promise.resolve(fetchPromise);
+ const response = await this._limiter.schedule(() => fetchAsync(url));
if (response.status !== HTTP_OK_STATUS) {
throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`);
}
diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
index 7377a64d8..d44eb5cc6 100644
--- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -11,7 +11,7 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra
const SOURCE_NAME = 'CryptoCompare';
const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
-const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
+const MAX_REQS_PER_SECOND = parseInt(process.env.CRYPTOCOMPARE_MAX_REQS_PER_SECOND || '15', 10); // tslint:disable-line:custom-no-magic-numbers
const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01';
const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
@@ -20,7 +20,7 @@ let connection: Connection;
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
const repository = connection.getRepository(OHLCVExternal);
- const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS);
+ const source = new CryptoCompareOHLCVSource(MAX_REQS_PER_SECOND);
const jobTime = new Date().getTime();
const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
@@ -63,7 +63,7 @@ async function fetchAndSaveAsync(
console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`);
if (records.length > 0) {
const metadata: OHLCVMetadata = {
- exchange: source.default_exchange,
+ exchange: source.defaultExchange,
fromSymbol: pair.fromSymbol,
toSymbol: pair.toSymbol,
source: SOURCE_NAME,
diff --git a/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts b/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts
index cb374bbb1..2efe3f5ec 100644
--- a/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts
+++ b/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts
@@ -13,7 +13,7 @@ const expect = chai.expect;
describe('ohlcv_external data source (Crypto Compare)', () => {
describe('generateBackfillIntervals', () => {
it('generates pairs with intervals to query', () => {
- const source = new CryptoCompareOHLCVSource();
+ const source = new CryptoCompareOHLCVSource(20);
const pair: TradingPair = {
fromSymbol: 'ETH',
toSymbol: 'ZRX',
@@ -31,7 +31,7 @@ describe('ohlcv_external data source (Crypto Compare)', () => {
});
it('returns single pair if no backfill is needed', () => {
- const source = new CryptoCompareOHLCVSource();
+ const source = new CryptoCompareOHLCVSource(20);
const pair: TradingPair = {
fromSymbol: 'ETH',
toSymbol: 'ZRX',