diff options
Diffstat (limited to 'packages/pipeline')
-rw-r--r-- | packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts | 36 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts | 38 |
2 files changed, 43 insertions, 31 deletions
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 6b10c29c5..8804c34d0 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -7,9 +7,10 @@ import * as R from 'ramda'; import { TradingPair } from '../../utils/get_ohlcv_trading_pairs'; export interface CryptoCompareOHLCVResponse { - Data: Map<string, CryptoCompareOHLCVRecord[]>; + Data: CryptoCompareOHLCVRecord[]; Response: string; Message: string; + Type: number; } export interface CryptoCompareOHLCVRecord { @@ -35,7 +36,9 @@ export interface CryptoCompareOHLCVParams { const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_SECOND = 1000; +const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; const HTTP_OK_STATUS = 200; +const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96; export class CryptoCompareOHLCVSource { public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time @@ -68,17 +71,21 @@ export class CryptoCompareOHLCVSource { const response = await Promise.resolve(fetchPromise); if (response.status !== HTTP_OK_STATUS) { - // tslint:disable-next-line:no-console - console.log(`Error scraping ${url}`); - return []; + throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`); } const json: CryptoCompareOHLCVResponse = await response.json(); - if (json.Response === 'Error' || Object.keys(json.Data).length === 0) { - // tslint:disable-next-line:no-console - console.log(`Error scraping ${url}: ${json.Message}`); - return []; + if ( + (json.Response === 'Error' || json.Data.length === 0) && + json.Type !== CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE + ) { + throw new Error(JSON.stringify(json)); } - return Object.values(json.Data).filter(rec => rec.time * ONE_SECOND >= pair.latestSavedTime); + return json.Data.filter(rec => { + return ( + // Crypto Compare takes ~30 mins to finalise records + rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime && hasData(rec) + ); + }); } public generateBackfillIntervals(pair: TradingPair): TradingPair[] { const now = new Date().getTime(); @@ -92,3 +99,14 @@ export class CryptoCompareOHLCVSource { return R.unfold(f, pair); } } + +function hasData(record: CryptoCompareOHLCVRecord): boolean { + return ( + record.close !== 0 || + record.open !== 0 || + record.high !== 0 || + record.low !== 0 || + record.volumefrom !== 0 || + record.volumeto !== 0 + ); +} diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts index 6979cd10e..7377a64d8 100644 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -10,11 +10,9 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra const SOURCE_NAME = 'CryptoCompare'; const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const ONE_SECOND = 1000; const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers -const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare +const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01'; const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); let connection: Connection; @@ -60,28 +58,24 @@ async function fetchAndSaveAsync( if (pair.latestSavedTime > TWO_HOURS_AGO) { break; } - const rawRecords = await source.getHourlyOHLCVAsync(pair); - const records = rawRecords.filter(rec => { - return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime; - }); // Crypto Compare can take ~30mins to finalise records - if (records.length === 0) { - console.log(`No more records, stopping task for ${JSON.stringify(pair)}`); - break; - } - const metadata: OHLCVMetadata = { - exchange: source.default_exchange, - fromSymbol: pair.fromSymbol, - toSymbol: pair.toSymbol, - source: SOURCE_NAME, - observedTimestamp: jobTime, - interval: source.intervalBetweenRecords, - }; - const parsedRecords = parseRecords(records, metadata); try { - await saveRecordsAsync(repository, parsedRecords); + const records = await source.getHourlyOHLCVAsync(pair); + console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); + if (records.length > 0) { + const metadata: OHLCVMetadata = { + exchange: source.default_exchange, + fromSymbol: pair.fromSymbol, + toSymbol: pair.toSymbol, + source: SOURCE_NAME, + observedTimestamp: jobTime, + interval: source.intervalBetweenRecords, + }; + const parsedRecords = parseRecords(records, metadata); + await saveRecordsAsync(repository, parsedRecords); + } i++; } catch (err) { - console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); + console.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); break; } } |