diff options
author | Xianny <8582774+xianny@users.noreply.github.com> | 2018-12-05 05:36:18 +0800 |
---|---|---|
committer | Fred Carlsen <fred@sjelfull.no> | 2018-12-06 19:06:34 +0800 |
commit | 6f5787b2c43957c1c5db5a6123399e8baeb0ed78 (patch) | |
tree | 811b4c2269f716262d1263bf53d0e51d5a8a82a7 /packages/pipeline/src/data_sources | |
parent | f96711bac373ac7caaca647defd68d91ba43a181 (diff) | |
download | dexon-0x-contracts-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar dexon-0x-contracts-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.gz dexon-0x-contracts-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.bz2 dexon-0x-contracts-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.lz dexon-0x-contracts-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.xz dexon-0x-contracts-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.zst dexon-0x-contracts-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.zip |
pull OHLCV records from Crypto Compare (#1349)
* [WIP] pull OHLCV records from Crypto Compare
* lint
* refactor to pull logic out of script and into modules
* add entity test for ohlcv_external entity
* implement rate limit and chronological backfill for ohlcv
* add unit tests; cleanup variable names
* Fetch OHLCV pairs params from events table
* better method names
* fix outdated test
* lint
* Clean up after review
* oops
* fix failing test
* better filtering of most recent records
* fix bug when generating pairs
* fix default earliest backfill date
* fix bug with retrieving backfill time
* prettier
Diffstat (limited to 'packages/pipeline/src/data_sources')
-rw-r--r-- | packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts new file mode 100644 index 000000000..6b10c29c5 --- /dev/null +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -0,0 +1,94 @@ +// tslint:disable:no-duplicate-imports +import { fetchAsync } from '@0x/utils'; +import promiseLimit = require('p-limit'); +import { stringify } from 'querystring'; +import * as R from 'ramda'; + +import { TradingPair } from '../../utils/get_ohlcv_trading_pairs'; + +export interface CryptoCompareOHLCVResponse { + Data: Map<string, CryptoCompareOHLCVRecord[]>; + Response: string; + Message: string; +} + +export interface CryptoCompareOHLCVRecord { + time: number; // in seconds, not milliseconds + close: number; + high: number; + low: number; + open: number; + volumefrom: number; + volumeto: number; +} + +export interface CryptoCompareOHLCVParams { + fsym: string; + tsym: string; + e?: string; + aggregate?: string; + aggregatePredictableTimePeriods?: boolean; + limit?: number; + toTs?: number; +} + +const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_SECOND = 1000; +const HTTP_OK_STATUS = 200; + +export class CryptoCompareOHLCVSource { + public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time + public readonly default_exchange = 'CCCAGG'; + public readonly intervalBetweenRecords = ONE_HOUR; + private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; + + // rate-limit for all API calls through this class instance + private readonly _promiseLimit: (fetchFn: () => Promise<Response>) => Promise<Response>; + constructor(maxConcurrentRequests: number = 50) { + this._promiseLimit = promiseLimit(maxConcurrentRequests); + } + + // gets OHLCV records starting from pair.latest + public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> { + const params = { + e: this.default_exchange, + fsym: pair.fromSymbol, + tsym: pair.toSymbol, + toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms + }; + const url = this._url + stringify(params); + + // go through the instance-wide rate-limit + const fetchPromise: Promise<Response> = this._promiseLimit(() => { + // tslint:disable-next-line:no-console + console.log(`Scraping Crypto Compare at ${url}`); + return fetchAsync(url); + }); + + const response = await Promise.resolve(fetchPromise); + if (response.status !== HTTP_OK_STATUS) { + // tslint:disable-next-line:no-console + console.log(`Error scraping ${url}`); + return []; + } + const json: CryptoCompareOHLCVResponse = await response.json(); + if (json.Response === 'Error' || Object.keys(json.Data).length === 0) { + // tslint:disable-next-line:no-console + console.log(`Error scraping ${url}: ${json.Message}`); + return []; + } + return Object.values(json.Data).filter(rec => rec.time * ONE_SECOND >= pair.latestSavedTime); + } + public generateBackfillIntervals(pair: TradingPair): TradingPair[] { + const now = new Date().getTime(); + const f = (p: TradingPair): false | [TradingPair, TradingPair] => { + if (p.latestSavedTime > now) { + return false; + } else { + return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })]; + } + }; + return R.unfold(f, pair); + } +} |