aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src/data_sources
diff options
context:
space:
mode:
authorXianny <8582774+xianny@users.noreply.github.com>2018-12-05 05:36:18 +0800
committerAlex Browne <stephenalexbrowne@gmail.com>2018-12-05 06:26:03 +0800
commit8c21a700bae0c751f7f9ca47f9a47628a4478911 (patch)
tree2c3efb6ba20987ffaa68c39a3246ed7802849479 /packages/pipeline/src/data_sources
parent87ffa5d7ab19d2288bf68131a7e7ec77578c564c (diff)
downloaddexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.gz
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.bz2
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.lz
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.xz
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.zst
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.zip
pull OHLCV records from Crypto Compare (#1349)
* [WIP] pull OHLCV records from Crypto Compare * lint * refactor to pull logic out of script and into modules * add entity test for ohlcv_external entity * implement rate limit and chronological backfill for ohlcv * add unit tests; cleanup variable names * Fetch OHLCV pairs params from events table * better method names * fix outdated test * lint * Clean up after review * oops * fix failing test * better filtering of most recent records * fix bug when generating pairs * fix default earliest backfill date * fix bug with retrieving backfill time * prettier
Diffstat (limited to 'packages/pipeline/src/data_sources')
-rw-r--r--packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts94
1 files changed, 94 insertions, 0 deletions
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
new file mode 100644
index 000000000..6b10c29c5
--- /dev/null
+++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
@@ -0,0 +1,94 @@
+// tslint:disable:no-duplicate-imports
+import { fetchAsync } from '@0x/utils';
+import promiseLimit = require('p-limit');
+import { stringify } from 'querystring';
+import * as R from 'ramda';
+
+import { TradingPair } from '../../utils/get_ohlcv_trading_pairs';
+
+export interface CryptoCompareOHLCVResponse {
+ Data: Map<string, CryptoCompareOHLCVRecord[]>;
+ Response: string;
+ Message: string;
+}
+
+export interface CryptoCompareOHLCVRecord {
+ time: number; // in seconds, not milliseconds
+ close: number;
+ high: number;
+ low: number;
+ open: number;
+ volumefrom: number;
+ volumeto: number;
+}
+
+export interface CryptoCompareOHLCVParams {
+ fsym: string;
+ tsym: string;
+ e?: string;
+ aggregate?: string;
+ aggregatePredictableTimePeriods?: boolean;
+ limit?: number;
+ toTs?: number;
+}
+
+const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_SECOND = 1000;
+const HTTP_OK_STATUS = 200;
+
+export class CryptoCompareOHLCVSource {
+ public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time
+ public readonly default_exchange = 'CCCAGG';
+ public readonly intervalBetweenRecords = ONE_HOUR;
+ private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?';
+
+ // rate-limit for all API calls through this class instance
+ private readonly _promiseLimit: (fetchFn: () => Promise<Response>) => Promise<Response>;
+ constructor(maxConcurrentRequests: number = 50) {
+ this._promiseLimit = promiseLimit(maxConcurrentRequests);
+ }
+
+ // gets OHLCV records starting from pair.latest
+ public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> {
+ const params = {
+ e: this.default_exchange,
+ fsym: pair.fromSymbol,
+ tsym: pair.toSymbol,
+ toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms
+ };
+ const url = this._url + stringify(params);
+
+ // go through the instance-wide rate-limit
+ const fetchPromise: Promise<Response> = this._promiseLimit(() => {
+ // tslint:disable-next-line:no-console
+ console.log(`Scraping Crypto Compare at ${url}`);
+ return fetchAsync(url);
+ });
+
+ const response = await Promise.resolve(fetchPromise);
+ if (response.status !== HTTP_OK_STATUS) {
+ // tslint:disable-next-line:no-console
+ console.log(`Error scraping ${url}`);
+ return [];
+ }
+ const json: CryptoCompareOHLCVResponse = await response.json();
+ if (json.Response === 'Error' || Object.keys(json.Data).length === 0) {
+ // tslint:disable-next-line:no-console
+ console.log(`Error scraping ${url}: ${json.Message}`);
+ return [];
+ }
+ return Object.values(json.Data).filter(rec => rec.time * ONE_SECOND >= pair.latestSavedTime);
+ }
+ public generateBackfillIntervals(pair: TradingPair): TradingPair[] {
+ const now = new Date().getTime();
+ const f = (p: TradingPair): false | [TradingPair, TradingPair] => {
+ if (p.latestSavedTime > now) {
+ return false;
+ } else {
+ return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })];
+ }
+ };
+ return R.unfold(f, pair);
+ }
+}