aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src/scripts
diff options
context:
space:
mode:
authorXianny <8582774+xianny@users.noreply.github.com>2018-12-05 05:36:18 +0800
committerFred Carlsen <fred@sjelfull.no>2018-12-06 19:06:34 +0800
commit6f5787b2c43957c1c5db5a6123399e8baeb0ed78 (patch)
tree811b4c2269f716262d1263bf53d0e51d5a8a82a7 /packages/pipeline/src/scripts
parentf96711bac373ac7caaca647defd68d91ba43a181 (diff)
downloaddexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar
dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.gz
dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.bz2
dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.lz
dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.xz
dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.zst
dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.zip
pull OHLCV records from Crypto Compare (#1349)
* [WIP] pull OHLCV records from Crypto Compare
* lint
* refactor to pull logic out of script and into modules
* add entity test for ohlcv_external entity
* implement rate limit and chronological backfill for ohlcv
* add unit tests; cleanup variable names
* Fetch OHLCV pairs params from events table
* better method names
* fix outdated test
* lint
* Clean up after review
* oops
* fix failing test
* better filtering of most recent records
* fix bug when generating pairs
* fix default earliest backfill date
* fix bug with retrieving backfill time
* prettier
Diffstat (limited to 'packages/pipeline/src/scripts')
-rw-r--r--packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts101
1 files changed, 101 insertions, 0 deletions
diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
new file mode 100644
index 000000000..6979cd10e
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -0,0 +1,101 @@
+// tslint:disable:no-console
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare';
+import { handleError } from '../utils';
+import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs';
+
+// Label passed to the trading-pair lookup and stamped on every saved record's metadata.
+const SOURCE_NAME = 'CryptoCompare';
+// Cut-off timestamps in epoch milliseconds, fixed once when the module is loaded.
+const TWO_HOURS_AGO = Date.now() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_HOUR_AGO = Date.now() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+// Multiplier used to convert a record's `time` field into epoch milliseconds.
+const ONE_SECOND = 1000;
+
+// Both settings below can be overridden through environment variables.
+const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
+const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare
+const EARLIEST_BACKFILL_TIME = Date.parse(EARLIEST_BACKFILL_DATE);
+
+let connection: Connection;
+
+// Script entry point: opens the database connection, looks up the trading pairs to scrape,
+// runs one backfill job per pair concurrently, then exits the process with status 0.
+// Any rejection is passed to handleError.
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const repository = connection.getRepository(OHLCVExternal);
+    const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS);
+
+    // One timestamp is shared by every job started in this run.
+    const jobTime = Date.now();
+    const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
+    console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`);
+
+    await Promise.all(
+        tradingPairs.map(async pair =>
+            fetchAndSaveAsync(source, repository, jobTime, source.generateBackfillIntervals(pair)),
+        ),
+    );
+    console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`);
+    process.exit(0);
+})().catch(handleError);
+
+/**
+ * Fetches hourly OHLCV records for each backfill interval, oldest first, and saves them.
+ *
+ * Processing stops at the first interval that is already up to date (saved within the last
+ * two hours), that yields no new records, or whose save fails. Save failures are logged and
+ * not rethrown; errors thrown by the fetch itself propagate to the caller.
+ *
+ * @param source Data source used to fetch hourly OHLCV records.
+ * @param repository Repository the parsed records are saved to.
+ * @param jobTime Timestamp of this run, stored as `observedTimestamp` on every record.
+ * @param pairs Backfill intervals to process; this array is not modified.
+ */
+async function fetchAndSaveAsync(
+    source: CryptoCompareOHLCVSource,
+    repository: Repository<OHLCVExternal>,
+    jobTime: number,
+    pairs: TradingPair[],
+): Promise<void> {
+    // Sort a copy so the caller's array is left untouched; oldest intervals are handled first.
+    const sortedPairs = pairs.slice().sort((a, b) => a.latestSavedTime - b.latestSavedTime);
+
+    for (const pair of sortedPairs) {
+        if (pair.latestSavedTime > TWO_HOURS_AGO) {
+            // Intervals are sorted ascending, so every remaining one is at least as recent.
+            break;
+        }
+        const rawRecords = await source.getHourlyOHLCVAsync(pair);
+        // Keep only records newer than what is already saved and older than one hour.
+        const records = rawRecords.filter(rec => {
+            const recTimeMs = rec.time * ONE_SECOND;
+            return recTimeMs < ONE_HOUR_AGO && recTimeMs > pair.latestSavedTime;
+        }); // Crypto Compare can take ~30mins to finalise records
+        if (records.length === 0) {
+            console.log(`No more records, stopping task for ${JSON.stringify(pair)}`);
+            break;
+        }
+        const metadata: OHLCVMetadata = {
+            exchange: source.default_exchange,
+            fromSymbol: pair.fromSymbol,
+            toSymbol: pair.toSymbol,
+            source: SOURCE_NAME,
+            observedTimestamp: jobTime,
+            interval: source.intervalBetweenRecords,
+        };
+        const parsedRecords = parseRecords(records, metadata);
+        try {
+            await saveRecordsAsync(repository, parsedRecords);
+        } catch (err) {
+            console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
+            break;
+        }
+    }
+}
+
+/**
+ * Logs a one-line summary of a batch of OHLCV records and saves the batch.
+ *
+ * The summary lists the first record's symbols and start time and the last record's end time.
+ * An empty batch is a no-op: nothing is logged or saved.
+ *
+ * @param repository Repository the records are saved to.
+ * @param records Records to save.
+ */
+async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> {
+    if (records.length === 0) {
+        // Without this guard, reading records[0] below would throw a TypeError.
+        return;
+    }
+    const first = records[0];
+    const last = records[records.length - 1];
+    const metadata = [first.fromSymbol, first.toSymbol, new Date(first.startTime), new Date(last.endTime)];
+
+    console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`);
+    await repository.save(records);
+}