aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src
diff options
context:
space:
mode:
authorXianny <8582774+xianny@users.noreply.github.com>2018-12-05 05:36:18 +0800
committerAlex Browne <stephenalexbrowne@gmail.com>2018-12-05 06:26:03 +0800
commit8c21a700bae0c751f7f9ca47f9a47628a4478911 (patch)
tree2c3efb6ba20987ffaa68c39a3246ed7802849479 /packages/pipeline/src
parent87ffa5d7ab19d2288bf68131a7e7ec77578c564c (diff)
downloaddexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.gz
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.bz2
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.lz
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.xz
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.tar.zst
dexon-sol-tools-8c21a700bae0c751f7f9ca47f9a47628a4478911.zip
pull OHLCV records from Crypto Compare (#1349)
* [WIP] pull OHLCV records from Crypto Compare * lint * refactor to pull logic out of script and into modules * add entity test for ohlcv_external entity * implement rate limit and chronological backfill for ohlcv * add unit tests; cleanup variable names * Fetch OHLCV pairs params from events table * better method names * fix outdated test * lint * Clean up after review * oops * fix failing test * better filtering of most recent records * fix bug when generating pairs * fix default earliest backfill date * fix bug with retrieving backfill time * prettier
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r--packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts94
-rw-r--r--packages/pipeline/src/entities/ohlcv_external.ts24
-rw-r--r--packages/pipeline/src/ormconfig.ts2
-rw-r--r--packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts38
-rw-r--r--packages/pipeline/src/parsers/ohlcv_external/index.ts0
-rw-r--r--packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts101
-rw-r--r--packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts92
-rw-r--r--packages/pipeline/src/utils/index.ts9
8 files changed, 344 insertions, 16 deletions
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
new file mode 100644
index 000000000..6b10c29c5
--- /dev/null
+++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
@@ -0,0 +1,94 @@
+// tslint:disable:no-duplicate-imports
+import { fetchAsync } from '@0x/utils';
+import promiseLimit = require('p-limit');
+import { stringify } from 'querystring';
+import * as R from 'ramda';
+
+import { TradingPair } from '../../utils/get_ohlcv_trading_pairs';
+
+export interface CryptoCompareOHLCVResponse {
+ Data: Map<string, CryptoCompareOHLCVRecord[]>;
+ Response: string;
+ Message: string;
+}
+
+export interface CryptoCompareOHLCVRecord {
+ time: number; // in seconds, not milliseconds
+ close: number;
+ high: number;
+ low: number;
+ open: number;
+ volumefrom: number;
+ volumeto: number;
+}
+
+export interface CryptoCompareOHLCVParams {
+ fsym: string;
+ tsym: string;
+ e?: string;
+ aggregate?: string;
+ aggregatePredictableTimePeriods?: boolean;
+ limit?: number;
+ toTs?: number;
+}
+
+const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_SECOND = 1000;
+const HTTP_OK_STATUS = 200;
+
+export class CryptoCompareOHLCVSource {
+ public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time
+ public readonly default_exchange = 'CCCAGG';
+ public readonly intervalBetweenRecords = ONE_HOUR;
+ private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?';
+
+ // rate-limit for all API calls through this class instance
+ private readonly _promiseLimit: (fetchFn: () => Promise<Response>) => Promise<Response>;
+ constructor(maxConcurrentRequests: number = 50) {
+ this._promiseLimit = promiseLimit(maxConcurrentRequests);
+ }
+
+ // gets OHLCV records starting from pair.latest
+ public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> {
+ const params = {
+ e: this.default_exchange,
+ fsym: pair.fromSymbol,
+ tsym: pair.toSymbol,
+ toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms
+ };
+ const url = this._url + stringify(params);
+
+ // go through the instance-wide rate-limit
+ const fetchPromise: Promise<Response> = this._promiseLimit(() => {
+ // tslint:disable-next-line:no-console
+ console.log(`Scraping Crypto Compare at ${url}`);
+ return fetchAsync(url);
+ });
+
+ const response = await Promise.resolve(fetchPromise);
+ if (response.status !== HTTP_OK_STATUS) {
+ // tslint:disable-next-line:no-console
+ console.log(`Error scraping ${url}`);
+ return [];
+ }
+ const json: CryptoCompareOHLCVResponse = await response.json();
+ if (json.Response === 'Error' || Object.keys(json.Data).length === 0) {
+ // tslint:disable-next-line:no-console
+ console.log(`Error scraping ${url}: ${json.Message}`);
+ return [];
+ }
+ return Object.values(json.Data).filter(rec => rec.time * ONE_SECOND >= pair.latestSavedTime);
+ }
+ public generateBackfillIntervals(pair: TradingPair): TradingPair[] {
+ const now = new Date().getTime();
+ const f = (p: TradingPair): false | [TradingPair, TradingPair] => {
+ if (p.latestSavedTime > now) {
+ return false;
+ } else {
+ return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })];
+ }
+ };
+ return R.unfold(f, pair);
+ }
+}
diff --git a/packages/pipeline/src/entities/ohlcv_external.ts b/packages/pipeline/src/entities/ohlcv_external.ts
index 95cd4f2f5..4f55dd930 100644
--- a/packages/pipeline/src/entities/ohlcv_external.ts
+++ b/packages/pipeline/src/entities/ohlcv_external.ts
@@ -1,20 +1,30 @@
import { Column, Entity, PrimaryColumn } from 'typeorm';
+import { numberToBigIntTransformer } from '../utils';
+
@Entity({ name: 'ohlcv_external', schema: 'raw' })
export class OHLCVExternal {
@PrimaryColumn() public exchange!: string;
- @PrimaryColumn() public fromSymbol!: string;
- @PrimaryColumn() public toSymbol!: string;
- @PrimaryColumn() public startTime!: number;
- @PrimaryColumn() public endTime!: number;
+
+ @PrimaryColumn({ name: 'from_symbol', type: 'varchar' })
+ public fromSymbol!: string;
+ @PrimaryColumn({ name: 'to_symbol', type: 'varchar' })
+ public toSymbol!: string;
+ @PrimaryColumn({ name: 'start_time', transformer: numberToBigIntTransformer })
+ public startTime!: number;
+ @PrimaryColumn({ name: 'end_time', transformer: numberToBigIntTransformer })
+ public endTime!: number;
@Column() public open!: number;
@Column() public close!: number;
@Column() public low!: number;
@Column() public high!: number;
- @Column() public volumeFrom!: number;
- @Column() public volumeTo!: number;
+ @Column({ name: 'volume_from' })
+ public volumeFrom!: number;
+ @Column({ name: 'volume_to' })
+ public volumeTo!: number;
@PrimaryColumn() public source!: string;
- @PrimaryColumn() public observedTimestamp!: number;
+ @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer })
+ public observedTimestamp!: number;
}
diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts
index c135c399b..9f7815b4e 100644
--- a/packages/pipeline/src/ormconfig.ts
+++ b/packages/pipeline/src/ormconfig.ts
@@ -6,6 +6,7 @@ import {
ExchangeCancelEvent,
ExchangeCancelUpToEvent,
ExchangeFillEvent,
+ OHLCVExternal,
Relayer,
SraOrder,
SraOrdersObservedTimeStamp,
@@ -20,6 +21,7 @@ const entities = [
ExchangeCancelEvent,
ExchangeCancelUpToEvent,
ExchangeFillEvent,
+ OHLCVExternal,
Relayer,
SraOrder,
SraOrdersObservedTimeStamp,
diff --git a/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts
new file mode 100644
index 000000000..3efb90384
--- /dev/null
+++ b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts
@@ -0,0 +1,38 @@
+import { CryptoCompareOHLCVRecord } from '../../data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../../entities';
+
+const ONE_SECOND = 1000; // Crypto Compare uses timestamps in seconds instead of milliseconds
+
+export interface OHLCVMetadata {
+ exchange: string;
+ fromSymbol: string;
+ toSymbol: string;
+ source: string;
+ observedTimestamp: number;
+ interval: number;
+}
+/**
+ * Parses OHLCV records from Crypto Compare into an array of OHLCVExternal entities
+ * @param rawRecords an array of OHLCV records from Crypto Compare (not the full response)
+ */
+export function parseRecords(rawRecords: CryptoCompareOHLCVRecord[], metadata: OHLCVMetadata): OHLCVExternal[] {
+ return rawRecords.map(rec => {
+ const ohlcvRecord = new OHLCVExternal();
+ ohlcvRecord.exchange = metadata.exchange;
+ ohlcvRecord.fromSymbol = metadata.fromSymbol;
+ ohlcvRecord.toSymbol = metadata.toSymbol;
+ ohlcvRecord.startTime = rec.time * ONE_SECOND - metadata.interval;
+ ohlcvRecord.endTime = rec.time * ONE_SECOND;
+
+ ohlcvRecord.open = rec.open;
+ ohlcvRecord.close = rec.close;
+ ohlcvRecord.low = rec.low;
+ ohlcvRecord.high = rec.high;
+ ohlcvRecord.volumeFrom = rec.volumefrom;
+ ohlcvRecord.volumeTo = rec.volumeto;
+
+ ohlcvRecord.source = metadata.source;
+ ohlcvRecord.observedTimestamp = metadata.observedTimestamp;
+ return ohlcvRecord;
+ });
+}
diff --git a/packages/pipeline/src/parsers/ohlcv_external/index.ts b/packages/pipeline/src/parsers/ohlcv_external/index.ts
deleted file mode 100644
index e69de29bb..000000000
--- a/packages/pipeline/src/parsers/ohlcv_external/index.ts
+++ /dev/null
diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
new file mode 100644
index 000000000..6979cd10e
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -0,0 +1,101 @@
+// tslint:disable:no-console
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare';
+import { handleError } from '../utils';
+import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs';
+
+const SOURCE_NAME = 'CryptoCompare';
+const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_SECOND = 1000;
+
+const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
+const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare
+const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const repository = connection.getRepository(OHLCVExternal);
+ const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS);
+
+ const jobTime = new Date().getTime();
+ const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
+ console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`);
+
+ const fetchAndSavePromises = tradingPairs.map(async pair => {
+ const pairs = source.generateBackfillIntervals(pair);
+ return fetchAndSaveAsync(source, repository, jobTime, pairs);
+ });
+ await Promise.all(fetchAndSavePromises);
+ console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`);
+ process.exit(0);
+})().catch(handleError);
+
+async function fetchAndSaveAsync(
+ source: CryptoCompareOHLCVSource,
+ repository: Repository<OHLCVExternal>,
+ jobTime: number,
+ pairs: TradingPair[],
+): Promise<void> {
+ const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => {
+ if (a.latestSavedTime < b.latestSavedTime) {
+ return -1;
+ } else if (a.latestSavedTime > b.latestSavedTime) {
+ return 1;
+ } else {
+ return 0;
+ }
+ };
+ pairs.sort(sortAscTimestamp);
+
+ let i = 0;
+ while (i < pairs.length) {
+ const pair = pairs[i];
+ if (pair.latestSavedTime > TWO_HOURS_AGO) {
+ break;
+ }
+ const rawRecords = await source.getHourlyOHLCVAsync(pair);
+ const records = rawRecords.filter(rec => {
+ return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime;
+ }); // Crypto Compare can take ~30mins to finalise records
+ if (records.length === 0) {
+ console.log(`No more records, stopping task for ${JSON.stringify(pair)}`);
+ break;
+ }
+ const metadata: OHLCVMetadata = {
+ exchange: source.default_exchange,
+ fromSymbol: pair.fromSymbol,
+ toSymbol: pair.toSymbol,
+ source: SOURCE_NAME,
+ observedTimestamp: jobTime,
+ interval: source.intervalBetweenRecords,
+ };
+ const parsedRecords = parseRecords(records, metadata);
+ try {
+ await saveRecordsAsync(repository, parsedRecords);
+ i++;
+ } catch (err) {
+ console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
+ break;
+ }
+ }
+ return Promise.resolve();
+}
+
+async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> {
+ const metadata = [
+ records[0].fromSymbol,
+ records[0].toSymbol,
+ new Date(records[0].startTime),
+ new Date(records[records.length - 1].endTime),
+ ];
+
+ console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`);
+ await repository.save(records);
+}
diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
new file mode 100644
index 000000000..9d3ef2fba
--- /dev/null
+++ b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
@@ -0,0 +1,92 @@
+import { fetchAsync } from '@0x/utils';
+import * as R from 'ramda';
+import { Connection } from 'typeorm';
+
+export interface TradingPair {
+ fromSymbol: string;
+ toSymbol: string;
+ latestSavedTime: number;
+}
+
+const COINLIST_API = 'https://min-api.cryptocompare.com/data/all/coinlist?BuiltOn=7605';
+
+interface CryptoCompareCoinListResp {
+ Data: Map<string, CryptoCompareCoin>;
+}
+
+interface CryptoCompareCoin {
+ Symbol: string;
+ BuiltOn: string;
+ SmartContractAddress: string;
+}
+
+const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT'];
+const ETHEREUM_IDENTIFIER = '7605';
+const HTTP_OK_STATUS = 200;
+/**
+ * Get trading pairs with latest scraped time for OHLCV records
+ * @param conn a typeorm Connection to postgres
+ */
+export async function fetchOHLCVTradingPairsAsync(
+ conn: Connection,
+ source: string,
+ earliestBackfillTime: number,
+): Promise<TradingPair[]> {
+ // fetch existing ohlcv records
+ const latestTradingPairs: Array<{
+ from_symbol: string;
+ to_symbol: string;
+ latest: string;
+ }> = await conn.query(`SELECT
+ MAX(end_time) as latest,
+ from_symbol,
+ to_symbol
+ FROM raw.ohlcv_external
+ GROUP BY from_symbol, to_symbol;`);
+
+ const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {};
+ latestTradingPairs.forEach(pair => {
+ const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {};
+ latestIndex[pair.to_symbol] = parseInt(pair.latest, 10); // tslint:disable-line:custom-no-magic-numbers
+ latestTradingPairsIndex[pair.from_symbol] = latestIndex;
+ });
+
+ // get token symbols used by Crypto Compare
+ const allCoinsResp = await fetchAsync(COINLIST_API);
+ if (allCoinsResp.status !== HTTP_OK_STATUS) {
+ return [];
+ }
+ const allCoins: CryptoCompareCoinListResp = await allCoinsResp.json();
+ const erc20CoinsIndex: Map<string, string> = new Map();
+ Object.entries(allCoins.Data).forEach(pair => {
+ const [symbol, coinData] = pair;
+ if (coinData.BuiltOn === ETHEREUM_IDENTIFIER && coinData.SmartContractAddress !== 'N/A') {
+ erc20CoinsIndex.set(coinData.SmartContractAddress.toLowerCase(), symbol);
+ }
+ });
+
+ // fetch all tokens that are traded on 0x
+ const rawTokenAddresses: Array<{ tokenaddress: string }> = await conn.query(
+ `SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION
+ SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`,
+ );
+ const tokenAddresses = R.pluck('tokenaddress', rawTokenAddresses);
+
+ // join token addresses with CC symbols
+ const allTokenSymbols: string[] = tokenAddresses
+ .map(tokenAddress => erc20CoinsIndex.get(tokenAddress.toLowerCase()) || '')
+ .filter(x => x);
+
+ // generate list of all tokens with time of latest existing record OR default earliest time
+ const allTradingPairCombinations: TradingPair[] = R.chain(sym => {
+ return TO_CURRENCIES.map(fiat => {
+ return {
+ fromSymbol: sym,
+ toSymbol: fiat,
+ latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime,
+ };
+ });
+ }, allTokenSymbols);
+
+ return allTradingPairCombinations;
+}
diff --git a/packages/pipeline/src/utils/index.ts b/packages/pipeline/src/utils/index.ts
index 918cfc695..718ea6133 100644
--- a/packages/pipeline/src/utils/index.ts
+++ b/packages/pipeline/src/utils/index.ts
@@ -35,12 +35,3 @@ export function handleError(e: any): void {
}
process.exit(1);
}
-
-/**
- * Returns the unix timestamp of the current hour
- */
-export function getHourInUnixTime(): number {
- const currentTime: number = Date.now();
- // tslint:disable-next-line
- return currentTime - currentTime % (3600 * 1000);
-}