From 8c21a700bae0c751f7f9ca47f9a47628a4478911 Mon Sep 17 00:00:00 2001 From: Xianny <8582774+xianny@users.noreply.github.com> Date: Tue, 4 Dec 2018 13:36:18 -0800 Subject: pull OHLCV records from Crypto Compare (#1349) * [WIP] pull OHLCV records from Crypto Compare * lint * refactor to pull logic out of script and into modules * add entity test for ohlcv_external entity * implement rate limit and chronological backfill for ohlcv * add unit tests; cleanup variable names * Fetch OHLCV pairs params from events table * better method names * fix outdated test * lint * Clean up after review * oops * fix failing test * better filtering of most recent records * fix bug when generating pairs * fix default earliest backfill date * fix bug with retrieving backfill time * prettier --- .../data_sources/ohlcv_external/crypto_compare.ts | 94 ++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts (limited to 'packages/pipeline/src/data_sources') diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts new file mode 100644 index 000000000..6b10c29c5 --- /dev/null +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -0,0 +1,94 @@ +// tslint:disable:no-duplicate-imports +import { fetchAsync } from '@0x/utils'; +import promiseLimit = require('p-limit'); +import { stringify } from 'querystring'; +import * as R from 'ramda'; + +import { TradingPair } from '../../utils/get_ohlcv_trading_pairs'; + +export interface CryptoCompareOHLCVResponse { + Data: Map; + Response: string; + Message: string; +} + +export interface CryptoCompareOHLCVRecord { + time: number; // in seconds, not milliseconds + close: number; + high: number; + low: number; + open: number; + volumefrom: number; + volumeto: number; +} + +export interface CryptoCompareOHLCVParams { + fsym: string; + tsym: string; + e?: string; + aggregate?: string; + aggregatePredictableTimePeriods?: boolean; + limit?: number; + toTs?: number; +} + +const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_SECOND = 1000; +const HTTP_OK_STATUS = 200; + +export class CryptoCompareOHLCVSource { + public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time + public readonly default_exchange = 'CCCAGG'; + public readonly intervalBetweenRecords = ONE_HOUR; + private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; + + // rate-limit for all API calls through this class instance + private readonly _promiseLimit: (fetchFn: () => Promise) => Promise; + constructor(maxConcurrentRequests: number = 50) { + this._promiseLimit = promiseLimit(maxConcurrentRequests); + } + + // gets OHLCV records starting from pair.latest + public async getHourlyOHLCVAsync(pair: TradingPair): Promise { + const params = { + e: this.default_exchange, + fsym: pair.fromSymbol, + tsym: pair.toSymbol, + toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms + }; + const url = this._url + stringify(params); + + // go through the instance-wide rate-limit + const fetchPromise: Promise = this._promiseLimit(() => { + // tslint:disable-next-line:no-console + console.log(`Scraping Crypto Compare at ${url}`); + return fetchAsync(url); + }); + + const response = await Promise.resolve(fetchPromise); + if (response.status !== HTTP_OK_STATUS) { + // tslint:disable-next-line:no-console + console.log(`Error scraping ${url}`); + return []; + } + const json: CryptoCompareOHLCVResponse = await response.json(); + if (json.Response === 'Error' || Object.keys(json.Data).length === 0) { + // tslint:disable-next-line:no-console + console.log(`Error scraping ${url}: ${json.Message}`); + return []; + } + return Object.values(json.Data).filter(rec => rec.time * ONE_SECOND >= pair.latestSavedTime); + } + public generateBackfillIntervals(pair: TradingPair): TradingPair[] { + const now = new Date().getTime(); + const f = (p: TradingPair): false | [TradingPair, TradingPair] => { + if (p.latestSavedTime > now) { + return false; + } else { + return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })]; + } + }; + return R.unfold(f, pair); + } +} -- cgit v1.2.3