From 78d0ab1aa2393b7a0b21108b9811e0b0a4406faf Mon Sep 17 00:00:00 2001 From: Xianny <8582774+xianny@users.noreply.github.com> Date: Wed, 5 Dec 2018 16:05:06 -0800 Subject: Fix/pipeline/ohlcv (#1393) The OHLCV script in data pipeline quits early when we get no data from Crypto Compare. Sometimes Crypto Compare gives us a valid empty response (e.g. when we query for way back in time) and we need to just continue. This adds better filtering for the types of Crypto Compare responses to detect when we should continue and when we should really quit. --- .../data_sources/ohlcv_external/crypto_compare.ts | 36 +++++++++++++++----- .../src/scripts/pull_ohlcv_cryptocompare.ts | 38 +++++++++------------- 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 6b10c29c5..8804c34d0 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -7,9 +7,10 @@ import * as R from 'ramda'; import { TradingPair } from '../../utils/get_ohlcv_trading_pairs'; export interface CryptoCompareOHLCVResponse { - Data: Map; + Data: CryptoCompareOHLCVRecord[]; Response: string; Message: string; + Type: number; } export interface CryptoCompareOHLCVRecord { @@ -35,7 +36,9 @@ export interface CryptoCompareOHLCVParams { const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_SECOND = 1000; +const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; const HTTP_OK_STATUS = 200; +const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96; export class CryptoCompareOHLCVSource { public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time @@ -68,17 +71,21 @@ export class CryptoCompareOHLCVSource { const response = await Promise.resolve(fetchPromise); if (response.status !== HTTP_OK_STATUS) { - // tslint:disable-next-line:no-console - console.log(`Error scraping ${url}`); - return []; + throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`); } const json: CryptoCompareOHLCVResponse = await response.json(); - if (json.Response === 'Error' || Object.keys(json.Data).length === 0) { - // tslint:disable-next-line:no-console - console.log(`Error scraping ${url}: ${json.Message}`); - return []; + if ( + (json.Response === 'Error' || json.Data.length === 0) && + json.Type !== CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE + ) { + throw new Error(JSON.stringify(json)); } - return Object.values(json.Data).filter(rec => rec.time * ONE_SECOND >= pair.latestSavedTime); + return json.Data.filter(rec => { + return ( + // Crypto Compare takes ~30 mins to finalise records + rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime && hasData(rec) + ); + }); } public generateBackfillIntervals(pair: TradingPair): TradingPair[] { const now = new Date().getTime(); @@ -92,3 +99,14 @@ export class CryptoCompareOHLCVSource { return R.unfold(f, pair); } } + +function hasData(record: CryptoCompareOHLCVRecord): boolean { + return ( + record.close !== 0 || + record.open !== 0 || + record.high !== 0 || + record.low !== 0 || + record.volumefrom !== 0 || + record.volumeto !== 0 + ); +} diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts index 6979cd10e..7377a64d8 100644 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -10,11 +10,9 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra const SOURCE_NAME = 'CryptoCompare'; const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const ONE_SECOND = 1000; const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers -const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare +const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01'; const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); let connection: Connection; @@ -60,28 +58,24 @@ async function fetchAndSaveAsync( if (pair.latestSavedTime > TWO_HOURS_AGO) { break; } - const rawRecords = await source.getHourlyOHLCVAsync(pair); - const records = rawRecords.filter(rec => { - return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime; - }); // Crypto Compare can take ~30mins to finalise records - if (records.length === 0) { - console.log(`No more records, stopping task for ${JSON.stringify(pair)}`); - break; - } - const metadata: OHLCVMetadata = { - exchange: source.default_exchange, - fromSymbol: pair.fromSymbol, - toSymbol: pair.toSymbol, - source: SOURCE_NAME, - observedTimestamp: jobTime, - interval: source.intervalBetweenRecords, - }; - const parsedRecords = parseRecords(records, metadata); try { - await saveRecordsAsync(repository, parsedRecords); + const records = await source.getHourlyOHLCVAsync(pair); + console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); + if (records.length > 0) { + const metadata: OHLCVMetadata = { + exchange: source.default_exchange, + fromSymbol: pair.fromSymbol, + toSymbol: pair.toSymbol, + source: SOURCE_NAME, + observedTimestamp: jobTime, + interval: source.intervalBetweenRecords, + }; + const parsedRecords = parseRecords(records, metadata); + await saveRecordsAsync(repository, parsedRecords); + } i++; } catch (err) { - console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); + console.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); break; } } -- cgit v1.2.3