diff options
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r-- | packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts | 44 |
1 files changed, 34 insertions, 10 deletions
diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts index 9d3ef2fba..19f81344e 100644 --- a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts +++ b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts @@ -23,6 +23,18 @@ interface CryptoCompareCoin { const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT']; const ETHEREUM_IDENTIFIER = '7605'; const HTTP_OK_STATUS = 200; + +interface StaticPair { + fromSymbol: string; + toSymbol: string; +} +const SPECIAL_CASES: StaticPair[] = [ + { + fromSymbol: 'ETH', + toSymbol: 'USD', + }, +]; + /** * Get trading pairs with latest scraped time for OHLCV records * @param conn a typeorm Connection to postgres @@ -44,6 +56,7 @@ export async function fetchOHLCVTradingPairsAsync( FROM raw.ohlcv_external GROUP BY from_symbol, to_symbol;`); + // build addressable index: { fromsym: { tosym: time }} const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {}; latestTradingPairs.forEach(pair => { const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {}; @@ -51,6 +64,13 @@ export async function fetchOHLCVTradingPairsAsync( latestTradingPairsIndex[pair.from_symbol] = latestIndex; }); + // match time to special cases + const specialCases: TradingPair[] = SPECIAL_CASES.map(pair => { + const latestSavedTime = + R.path<number>([pair.fromSymbol, pair.toSymbol], latestTradingPairsIndex) || earliestBackfillTime; + return R.assoc('latestSavedTime', latestSavedTime, pair); + }); + // get token symbols used by Crypto Compare const allCoinsResp = await fetchAsync(COINLIST_API); if (allCoinsResp.status !== HTTP_OK_STATUS) { @@ -66,27 +86,31 @@ export async function fetchOHLCVTradingPairsAsync( }); // fetch all tokens that are traded on 0x - const rawTokenAddresses: Array<{ tokenaddress: string }> = await conn.query( + const rawEventTokenAddresses: Array<{ tokenaddress: string }> = await conn.query( `SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`, ); - const tokenAddresses = R.pluck('tokenaddress', rawTokenAddresses); + + // tslint:disable-next-line:no-unbound-method + const eventTokenAddresses = R.pluck('tokenaddress', rawEventTokenAddresses).map(R.toLower); // join token addresses with CC symbols - const allTokenSymbols: string[] = tokenAddresses - .map(tokenAddress => erc20CoinsIndex.get(tokenAddress.toLowerCase()) || '') - .filter(x => x); + const eventTokenSymbols: string[] = eventTokenAddresses + .filter(tokenAddress => erc20CoinsIndex.has(tokenAddress)) + .map(tokenAddress => erc20CoinsIndex.get(tokenAddress) as string); - // generate list of all tokens with time of latest existing record OR default earliest time - const allTradingPairCombinations: TradingPair[] = R.chain(sym => { + // join traded tokens with fiat and latest backfill time + const eventTradingPairs: TradingPair[] = R.chain(sym => { return TO_CURRENCIES.map(fiat => { - return { + const pair = { fromSymbol: sym, toSymbol: fiat, latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime, }; + return pair; }); - }, allTokenSymbols); + }, eventTokenSymbols); - return allTradingPairCombinations; + // join with special cases + return R.concat(eventTradingPairs, specialCases); } |