diff options
author | Xianny <8582774+xianny@users.noreply.github.com> | 2018-12-05 05:36:18 +0800 |
---|---|---|
committer | Fred Carlsen <fred@sjelfull.no> | 2018-12-06 19:06:34 +0800 |
commit | 6f5787b2c43957c1c5db5a6123399e8baeb0ed78 (patch) | |
tree | 811b4c2269f716262d1263bf53d0e51d5a8a82a7 /packages/pipeline/src | |
parent | f96711bac373ac7caaca647defd68d91ba43a181 (diff) | |
download | dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.gz dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.bz2 dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.lz dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.xz dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.tar.zst dexon-sol-tools-6f5787b2c43957c1c5db5a6123399e8baeb0ed78.zip |
pull OHLCV records from Crypto Compare (#1349)
* [WIP] pull OHLCV records from Crypto Compare
* lint
* refactor to pull logic out of script and into modules
* add entity test for ohlcv_external entity
* implement rate limit and chronological backfill for ohlcv
* add unit tests; cleanup variable names
* Fetch OHLCV pairs params from events table
* better method names
* fix outdated test
* lint
* Clean up after review
* oops
* fix failing test
* better filtering of most recent records
* fix bug when generating pairs
* fix default earliest backfill date
* fix bug with retrieving backfill time
* prettier
Diffstat (limited to 'packages/pipeline/src')
8 files changed, 344 insertions, 16 deletions
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts new file mode 100644 index 000000000..6b10c29c5 --- /dev/null +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -0,0 +1,94 @@ +// tslint:disable:no-duplicate-imports +import { fetchAsync } from '@0x/utils'; +import promiseLimit = require('p-limit'); +import { stringify } from 'querystring'; +import * as R from 'ramda'; + +import { TradingPair } from '../../utils/get_ohlcv_trading_pairs'; + +export interface CryptoCompareOHLCVResponse { + Data: Map<string, CryptoCompareOHLCVRecord[]>; + Response: string; + Message: string; +} + +export interface CryptoCompareOHLCVRecord { + time: number; // in seconds, not milliseconds + close: number; + high: number; + low: number; + open: number; + volumefrom: number; + volumeto: number; +} + +export interface CryptoCompareOHLCVParams { + fsym: string; + tsym: string; + e?: string; + aggregate?: string; + aggregatePredictableTimePeriods?: boolean; + limit?: number; + toTs?: number; +} + +const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_SECOND = 1000; +const HTTP_OK_STATUS = 200; + +export class CryptoCompareOHLCVSource { + public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time + public readonly default_exchange = 'CCCAGG'; + public readonly intervalBetweenRecords = ONE_HOUR; + private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; + + // rate-limit for all API calls through this class instance + private readonly _promiseLimit: (fetchFn: () => Promise<Response>) => Promise<Response>; + constructor(maxConcurrentRequests: number = 50) { + this._promiseLimit = promiseLimit(maxConcurrentRequests); + } + + // gets OHLCV records starting from pair.latest + public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> { + const params = { + e: this.default_exchange, + fsym: pair.fromSymbol, + tsym: pair.toSymbol, + toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms + }; + const url = this._url + stringify(params); + + // go through the instance-wide rate-limit + const fetchPromise: Promise<Response> = this._promiseLimit(() => { + // tslint:disable-next-line:no-console + console.log(`Scraping Crypto Compare at ${url}`); + return fetchAsync(url); + }); + + const response = await Promise.resolve(fetchPromise); + if (response.status !== HTTP_OK_STATUS) { + // tslint:disable-next-line:no-console + console.log(`Error scraping ${url}`); + return []; + } + const json: CryptoCompareOHLCVResponse = await response.json(); + if (json.Response === 'Error' || Object.keys(json.Data).length === 0) { + // tslint:disable-next-line:no-console + console.log(`Error scraping ${url}: ${json.Message}`); + return []; + } + return Object.values(json.Data).filter(rec => rec.time * ONE_SECOND >= pair.latestSavedTime); + } + public generateBackfillIntervals(pair: TradingPair): TradingPair[] { + const now = new Date().getTime(); + const f = (p: TradingPair): false | [TradingPair, TradingPair] => { + if (p.latestSavedTime > now) { + return false; + } else { + return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })]; + } + }; + return R.unfold(f, pair); + } +} diff --git a/packages/pipeline/src/entities/ohlcv_external.ts b/packages/pipeline/src/entities/ohlcv_external.ts index 95cd4f2f5..4f55dd930 100644 --- a/packages/pipeline/src/entities/ohlcv_external.ts +++ b/packages/pipeline/src/entities/ohlcv_external.ts @@ -1,20 +1,30 @@ import { Column, Entity, PrimaryColumn } from 'typeorm'; +import { numberToBigIntTransformer } from '../utils'; + @Entity({ name: 'ohlcv_external', schema: 'raw' }) export class OHLCVExternal { @PrimaryColumn() public exchange!: string; - @PrimaryColumn() public fromSymbol!: string; - @PrimaryColumn() public toSymbol!: string; - @PrimaryColumn() public startTime!: number; - @PrimaryColumn() public endTime!: number; + + @PrimaryColumn({ name: 'from_symbol', type: 'varchar' }) + public fromSymbol!: string; + @PrimaryColumn({ name: 'to_symbol', type: 'varchar' }) + public toSymbol!: string; + @PrimaryColumn({ name: 'start_time', transformer: numberToBigIntTransformer }) + public startTime!: number; + @PrimaryColumn({ name: 'end_time', transformer: numberToBigIntTransformer }) + public endTime!: number; @Column() public open!: number; @Column() public close!: number; @Column() public low!: number; @Column() public high!: number; - @Column() public volumeFrom!: number; - @Column() public volumeTo!: number; + @Column({ name: 'volume_from' }) + public volumeFrom!: number; + @Column({ name: 'volume_to' }) + public volumeTo!: number; @PrimaryColumn() public source!: string; - @PrimaryColumn() public observedTimestamp!: number; + @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer }) + public observedTimestamp!: number; } diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts index c135c399b..9f7815b4e 100644 --- a/packages/pipeline/src/ormconfig.ts +++ b/packages/pipeline/src/ormconfig.ts @@ -6,6 +6,7 @@ import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent, + OHLCVExternal, Relayer, SraOrder, SraOrdersObservedTimeStamp, @@ -20,6 +21,7 @@ const entities = [ ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent, + OHLCVExternal, Relayer, SraOrder, SraOrdersObservedTimeStamp, diff --git a/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts new file mode 100644 index 000000000..3efb90384 --- /dev/null +++ b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts @@ -0,0 +1,38 @@ +import { CryptoCompareOHLCVRecord } from '../../data_sources/ohlcv_external/crypto_compare'; +import { OHLCVExternal } from '../../entities'; + +const ONE_SECOND = 1000; // Crypto Compare uses timestamps in seconds instead of milliseconds + +export interface OHLCVMetadata { + exchange: string; + fromSymbol: string; + toSymbol: string; + source: string; + observedTimestamp: number; + interval: number; +} +/** + * Parses OHLCV records from Crypto Compare into an array of OHLCVExternal entities + * @param rawRecords an array of OHLCV records from Crypto Compare (not the full response) + */ +export function parseRecords(rawRecords: CryptoCompareOHLCVRecord[], metadata: OHLCVMetadata): OHLCVExternal[] { + return rawRecords.map(rec => { + const ohlcvRecord = new OHLCVExternal(); + ohlcvRecord.exchange = metadata.exchange; + ohlcvRecord.fromSymbol = metadata.fromSymbol; + ohlcvRecord.toSymbol = metadata.toSymbol; + ohlcvRecord.startTime = rec.time * ONE_SECOND - metadata.interval; + ohlcvRecord.endTime = rec.time * ONE_SECOND; + + ohlcvRecord.open = rec.open; + ohlcvRecord.close = rec.close; + ohlcvRecord.low = rec.low; + ohlcvRecord.high = rec.high; + ohlcvRecord.volumeFrom = rec.volumefrom; + ohlcvRecord.volumeTo = rec.volumeto; + + ohlcvRecord.source = metadata.source; + ohlcvRecord.observedTimestamp = metadata.observedTimestamp; + return ohlcvRecord; + }); +} diff --git a/packages/pipeline/src/parsers/ohlcv_external/index.ts b/packages/pipeline/src/parsers/ohlcv_external/index.ts deleted file mode 100644 index e69de29bb..000000000 --- a/packages/pipeline/src/parsers/ohlcv_external/index.ts +++ /dev/null diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts new file mode 100644 index 000000000..6979cd10e --- /dev/null +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -0,0 +1,101 @@ +// tslint:disable:no-console +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare'; +import { OHLCVExternal } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare'; +import { handleError } from '../utils'; +import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs'; + +const SOURCE_NAME = 'CryptoCompare'; +const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_SECOND = 1000; + +const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers +const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare +const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const repository = connection.getRepository(OHLCVExternal); + const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS); + + const jobTime = new Date().getTime(); + const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME); + console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`); + + const fetchAndSavePromises = tradingPairs.map(async pair => { + const pairs = source.generateBackfillIntervals(pair); + return fetchAndSaveAsync(source, repository, jobTime, pairs); + }); + await Promise.all(fetchAndSavePromises); + console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`); + process.exit(0); +})().catch(handleError); + +async function fetchAndSaveAsync( + source: CryptoCompareOHLCVSource, + repository: Repository<OHLCVExternal>, + jobTime: number, + pairs: TradingPair[], +): Promise<void> { + const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => { + if (a.latestSavedTime < b.latestSavedTime) { + return -1; + } else if (a.latestSavedTime > b.latestSavedTime) { + return 1; + } else { + return 0; + } + }; + pairs.sort(sortAscTimestamp); + + let i = 0; + while (i < pairs.length) { + const pair = pairs[i]; + if (pair.latestSavedTime > TWO_HOURS_AGO) { + break; + } + const rawRecords = await source.getHourlyOHLCVAsync(pair); + const records = rawRecords.filter(rec => { + return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime; + }); // Crypto Compare can take ~30mins to finalise records + if (records.length === 0) { + console.log(`No more records, stopping task for ${JSON.stringify(pair)}`); + break; + } + const metadata: OHLCVMetadata = { + exchange: source.default_exchange, + fromSymbol: pair.fromSymbol, + toSymbol: pair.toSymbol, + source: SOURCE_NAME, + observedTimestamp: jobTime, + interval: source.intervalBetweenRecords, + }; + const parsedRecords = parseRecords(records, metadata); + try { + await saveRecordsAsync(repository, parsedRecords); + i++; + } catch (err) { + console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); + break; + } + } + return Promise.resolve(); +} + +async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> { + const metadata = [ + records[0].fromSymbol, + records[0].toSymbol, + new Date(records[0].startTime), + new Date(records[records.length - 1].endTime), + ]; + + console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`); + await repository.save(records); +} diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts new file mode 100644 index 000000000..9d3ef2fba --- /dev/null +++ b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts @@ -0,0 +1,92 @@ +import { fetchAsync } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection } from 'typeorm'; + +export interface TradingPair { + fromSymbol: string; + toSymbol: string; + latestSavedTime: number; +} + +const COINLIST_API = 'https://min-api.cryptocompare.com/data/all/coinlist?BuiltOn=7605'; + +interface CryptoCompareCoinListResp { + Data: Map<string, CryptoCompareCoin>; +} + +interface CryptoCompareCoin { + Symbol: string; + BuiltOn: string; + SmartContractAddress: string; +} + +const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT']; +const ETHEREUM_IDENTIFIER = '7605'; +const HTTP_OK_STATUS = 200; +/** + * Get trading pairs with latest scraped time for OHLCV records + * @param conn a typeorm Connection to postgres + */ +export async function fetchOHLCVTradingPairsAsync( + conn: Connection, + source: string, + earliestBackfillTime: number, +): Promise<TradingPair[]> { + // fetch existing ohlcv records + const latestTradingPairs: Array<{ + from_symbol: string; + to_symbol: string; + latest: string; + }> = await conn.query(`SELECT + MAX(end_time) as latest, + from_symbol, + to_symbol + FROM raw.ohlcv_external + GROUP BY from_symbol, to_symbol;`); + + const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {}; + latestTradingPairs.forEach(pair => { + const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {}; + latestIndex[pair.to_symbol] = parseInt(pair.latest, 10); // tslint:disable-line:custom-no-magic-numbers + latestTradingPairsIndex[pair.from_symbol] = latestIndex; + }); + + // get token symbols used by Crypto Compare + const allCoinsResp = await fetchAsync(COINLIST_API); + if (allCoinsResp.status !== HTTP_OK_STATUS) { + return []; + } + const allCoins: CryptoCompareCoinListResp = await allCoinsResp.json(); + const erc20CoinsIndex: Map<string, string> = new Map(); + Object.entries(allCoins.Data).forEach(pair => { + const [symbol, coinData] = pair; + if (coinData.BuiltOn === ETHEREUM_IDENTIFIER && coinData.SmartContractAddress !== 'N/A') { + erc20CoinsIndex.set(coinData.SmartContractAddress.toLowerCase(), symbol); + } + }); + + // fetch all tokens that are traded on 0x + const rawTokenAddresses: Array<{ tokenaddress: string }> = await conn.query( + `SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION + SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`, + ); + const tokenAddresses = R.pluck('tokenaddress', rawTokenAddresses); + + // join token addresses with CC symbols + const allTokenSymbols: string[] = tokenAddresses + .map(tokenAddress => erc20CoinsIndex.get(tokenAddress.toLowerCase()) || '') + .filter(x => x); + + // generate list of all tokens with time of latest existing record OR default earliest time + const allTradingPairCombinations: TradingPair[] = R.chain(sym => { + return TO_CURRENCIES.map(fiat => { + return { + fromSymbol: sym, + toSymbol: fiat, + latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime, + }; + }); + }, allTokenSymbols); + + return allTradingPairCombinations; +} diff --git a/packages/pipeline/src/utils/index.ts b/packages/pipeline/src/utils/index.ts index 918cfc695..718ea6133 100644 --- a/packages/pipeline/src/utils/index.ts +++ b/packages/pipeline/src/utils/index.ts @@ -35,12 +35,3 @@ export function handleError(e: any): void { } process.exit(1); } - -/** - * Returns the unix timestamp of the current hour - */ -export function getHourInUnixTime(): number { - const currentTime: number = Date.now(); - // tslint:disable-next-line - return currentTime - currentTime % (3600 * 1000); -} |