From ac971685b32328fb0e99715c0dde2a10fdf3158a Mon Sep 17 00:00:00 2001 From: xianny Date: Thu, 6 Dec 2018 12:07:55 -0800 Subject: upgrade throttling code --- packages/pipeline/package.json | 2 +- .../data_sources/ohlcv_external/crypto_compare.ts | 24 ++++++++++------------ 2 files changed, 12 insertions(+), 14 deletions(-) (limited to 'packages') diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json index 0539618d4..4fde906b8 100644 --- a/packages/pipeline/package.json +++ b/packages/pipeline/package.json @@ -52,9 +52,9 @@ "@types/p-limit": "^2.0.0", "async-parallel": "^1.2.3", "axios": "^0.18.0", + "bottleneck": "^2.13.2", "dockerode": "^2.5.7", "ethereum-types": "^1.0.6", - "p-limit": "^2.0.0", "pg": "^7.5.0", "prettier": "^1.15.3", "ramda": "^0.25.0", diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 8804c34d0..e83e3b67d 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -1,6 +1,6 @@ // tslint:disable:no-duplicate-imports import { fetchAsync } from '@0x/utils'; -import promiseLimit = require('p-limit'); +import Bottleneck from 'bottleneck'; import { stringify } from 'querystring'; import * as R from 'ramda'; @@ -35,6 +35,7 @@ export interface CryptoCompareOHLCVParams { const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_MINUTE = 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_SECOND = 1000; const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; const HTTP_OK_STATUS = 200; @@ -47,9 +48,14 @@ export class CryptoCompareOHLCVSource { private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; // rate-limit for all API calls through this class instance - private readonly _promiseLimit: (fetchFn: () => Promise) => Promise; - constructor(maxConcurrentRequests: number = 50) { - this._promiseLimit = promiseLimit(maxConcurrentRequests); + private readonly _limiter: Bottleneck; + constructor(maxReqsPerSecond: number = 40) { + this._limiter = new Bottleneck({ + minTime: Math.ceil(ONE_SECOND / maxReqsPerSecond), + reservoir: 2000, + reservoirRefreshAmount: 2000, + reservoirRefreshInterval: ONE_MINUTE, + }); } // gets OHLCV records starting from pair.latest @@ -61,15 +67,7 @@ export class CryptoCompareOHLCVSource { toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms }; const url = this._url + stringify(params); - - // go through the instance-wide rate-limit - const fetchPromise: Promise = this._promiseLimit(() => { - // tslint:disable-next-line:no-console - console.log(`Scraping Crypto Compare at ${url}`); - return fetchAsync(url); - }); - - const response = await Promise.resolve(fetchPromise); + const response = await this._limiter.schedule(() => fetchAsync(url)); if (response.status !== HTTP_OK_STATUS) { throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`); } -- cgit v1.2.3 From 8b3b4d983f0f942fc4e42365d4cfcaf793d1d283 Mon Sep 17 00:00:00 2001 From: xianny Date: Thu, 6 Dec 2018 13:20:08 -0800 Subject: rename variable and define default in only 1 location --- packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts | 5 +++-- packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'packages') diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index e83e3b67d..998ef6bf3 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -49,13 +49,14 @@ export class CryptoCompareOHLCVSource { // rate-limit for all API calls through this class instance private readonly _limiter: Bottleneck; - constructor(maxReqsPerSecond: number = 40) { + constructor(maxReqsPerSecond: number) { this._limiter = new Bottleneck({ - minTime: Math.ceil(ONE_SECOND / maxReqsPerSecond), + minTime: ONE_SECOND / maxReqsPerSecond, reservoir: 2000, reservoirRefreshAmount: 2000, reservoirRefreshInterval: ONE_MINUTE, }); + console.log('mintime', Math.ceil(ONE_SECOND / maxReqsPerSecond)); // tslint:disable-line:no-console } // gets OHLCV records starting from pair.latest diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts index 7377a64d8..a29a13bfc 100644 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -11,7 +11,7 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra const SOURCE_NAME = 'CryptoCompare'; const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers +const MAX_REQS_PER_SECOND = parseInt(process.env.CRYPTOCOMPARE_MAX_REQS_PER_SECOND || '15', 10); // tslint:disable-line:custom-no-magic-numbers const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01'; const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); @@ -20,7 +20,7 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const repository = connection.getRepository(OHLCVExternal); - const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS); + const source = new CryptoCompareOHLCVSource(MAX_REQS_PER_SECOND); const jobTime = new Date().getTime(); const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME); -- cgit v1.2.3 From 7b1471ffe80661a209ef3b4b9d4b67116708c210 Mon Sep 17 00:00:00 2001 From: xianny Date: Thu, 6 Dec 2018 14:28:22 -0800 Subject: query CC with larger batch size --- packages/pipeline/package.json | 1 + .../pipeline/src/data_sources/ohlcv_external/crypto_compare.ts | 7 ++++--- .../test/data_sources/ohlcv_external/crypto_compare_test.ts | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) (limited to 'packages') diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json index 4fde906b8..5c95344b8 100644 --- a/packages/pipeline/package.json +++ b/packages/pipeline/package.json @@ -57,6 +57,7 @@ "ethereum-types": "^1.0.6", "pg": "^7.5.0", "prettier": "^1.15.3", + "pw-app-sdk": "^0.3.4", "ramda": "^0.25.0", "reflect-metadata": "^0.1.12", "sqlite3": "^4.0.2", diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 998ef6bf3..16f205629 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -33,18 +33,18 @@ export interface CryptoCompareOHLCVParams { toTs?: number; } -const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_MINUTE = 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_SECOND = 1000; const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; const HTTP_OK_STATUS = 200; const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96; +const MAX_LIMIT = 2000; export class CryptoCompareOHLCVSource { - public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time - public readonly default_exchange = 'CCCAGG'; public readonly intervalBetweenRecords = ONE_HOUR; + public readonly default_exchange = 'CCCAGG'; + public readonly interval = ONE_HOUR * MAX_LIMIT; // the hourly API returns data for one interval at a time private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; // rate-limit for all API calls through this class instance @@ -65,6 +65,7 @@ export class CryptoCompareOHLCVSource { e: this.default_exchange, fsym: pair.fromSymbol, tsym: pair.toSymbol, + limit: MAX_LIMIT, toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms }; const url = this._url + stringify(params); diff --git a/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts b/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts index cb374bbb1..2efe3f5ec 100644 --- a/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts +++ b/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts @@ -13,7 +13,7 @@ const expect = chai.expect; describe('ohlcv_external data source (Crypto Compare)', () => { describe('generateBackfillIntervals', () => { it('generates pairs with intervals to query', () => { - const source = new CryptoCompareOHLCVSource(); + const source = new CryptoCompareOHLCVSource(20); const pair: TradingPair = { fromSymbol: 'ETH', toSymbol: 'ZRX', @@ -31,7 +31,7 @@ describe('ohlcv_external data source (Crypto Compare)', () => { }); it('returns single pair if no backfill is needed', () => { - const source = new CryptoCompareOHLCVSource(); + const source = new CryptoCompareOHLCVSource(20); const pair: TradingPair = { fromSymbol: 'ETH', toSymbol: 'ZRX', -- cgit v1.2.3 From 41f90c697b0870ae3c3727c866241b25545f88ba Mon Sep 17 00:00:00 2001 From: xianny Date: Fri, 7 Dec 2018 09:48:26 -0800 Subject: cleanup: stray import, rename variable --- packages/pipeline/package.json | 1 - .../pipeline/src/data_sources/ohlcv_external/crypto_compare.ts | 7 +++---- 2 files changed, 3 insertions(+), 5 deletions(-) (limited to 'packages') diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json index 5c95344b8..4fde906b8 100644 --- a/packages/pipeline/package.json +++ b/packages/pipeline/package.json @@ -57,7 +57,6 @@ "ethereum-types": "^1.0.6", "pg": "^7.5.0", "prettier": "^1.15.3", - "pw-app-sdk": "^0.3.4", "ramda": "^0.25.0", "reflect-metadata": "^0.1.12", "sqlite3": "^4.0.2", diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 16f205629..0cfec4ef5 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -39,12 +39,12 @@ const ONE_SECOND = 1000; const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; const HTTP_OK_STATUS = 200; const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96; -const MAX_LIMIT = 2000; +const MAX_PAGE_SIZE = 2000; export class CryptoCompareOHLCVSource { public readonly intervalBetweenRecords = ONE_HOUR; public readonly default_exchange = 'CCCAGG'; - public readonly interval = ONE_HOUR * MAX_LIMIT; // the hourly API returns data for one interval at a time + public readonly interval = this.intervalBetweenRecords * MAX_PAGE_SIZE; // the hourly API returns data for one interval at a time private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; // rate-limit for all API calls through this class instance @@ -56,7 +56,6 @@ export class CryptoCompareOHLCVSource { reservoirRefreshAmount: 2000, reservoirRefreshInterval: ONE_MINUTE, }); - console.log('mintime', Math.ceil(ONE_SECOND / maxReqsPerSecond)); // tslint:disable-line:no-console } // gets OHLCV records starting from pair.latest @@ -65,7 +64,7 @@ export class CryptoCompareOHLCVSource { e: this.default_exchange, fsym: pair.fromSymbol, tsym: pair.toSymbol, - limit: MAX_LIMIT, + limit: MAX_PAGE_SIZE, toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms }; const url = this._url + stringify(params); -- cgit v1.2.3 From 2bea70a0a61a868bf951b46731ab5dea53c0aaec Mon Sep 17 00:00:00 2001 From: xianny Date: Fri, 7 Dec 2018 09:53:49 -0800 Subject: refresh rate limit every second instead of every minute --- packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'packages') diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 0cfec4ef5..29cef65b0 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -52,9 +52,9 @@ export class CryptoCompareOHLCVSource { constructor(maxReqsPerSecond: number) { this._limiter = new Bottleneck({ minTime: ONE_SECOND / maxReqsPerSecond, - reservoir: 2000, - reservoirRefreshAmount: 2000, - reservoirRefreshInterval: ONE_MINUTE, + reservoir: 30, + reservoirRefreshAmount: 30, + reservoirRefreshInterval: ONE_SECOND, }); } -- cgit v1.2.3 From 5febb595e929d736f4f0d72ef43a840109e9863a Mon Sep 17 00:00:00 2001 From: xianny Date: Fri, 7 Dec 2018 10:03:17 -0800 Subject: lint: remove unused variable --- packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts | 1 - 1 file changed, 1 deletion(-) (limited to 'packages') diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 29cef65b0..44448d241 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -34,7 +34,6 @@ export interface CryptoCompareOHLCVParams { } const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const ONE_MINUTE = 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_SECOND = 1000; const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; const HTTP_OK_STATUS = 200; -- cgit v1.2.3 From 096c4c8f2b20b4ca909d4ba950e219c22a5f8882 Mon Sep 17 00:00:00 2001 From: xianny Date: Mon, 10 Dec 2018 10:57:36 -0800 Subject: change to camelCase --- packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts | 4 ++-- packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'packages') diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 44448d241..85042501b 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -42,7 +42,7 @@ const MAX_PAGE_SIZE = 2000; export class CryptoCompareOHLCVSource { public readonly intervalBetweenRecords = ONE_HOUR; - public readonly default_exchange = 'CCCAGG'; + public readonly defaultExchange = 'CCCAGG'; public readonly interval = this.intervalBetweenRecords * MAX_PAGE_SIZE; // the hourly API returns data for one interval at a time private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; @@ -60,7 +60,7 @@ export class CryptoCompareOHLCVSource { // gets OHLCV records starting from pair.latest public async getHourlyOHLCVAsync(pair: TradingPair): Promise { const params = { - e: this.default_exchange, + e: this.defaultExchange, fsym: pair.fromSymbol, tsym: pair.toSymbol, limit: MAX_PAGE_SIZE, diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts index a29a13bfc..d44eb5cc6 100644 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -63,7 +63,7 @@ async function fetchAndSaveAsync( console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); if (records.length > 0) { const metadata: OHLCVMetadata = { - exchange: source.default_exchange, + exchange: source.defaultExchange, fromSymbol: pair.fromSymbol, toSymbol: pair.toSymbol, source: SOURCE_NAME, -- cgit v1.2.3