aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline')
-rw-r--r--packages/pipeline/package.json26
-rw-r--r--packages/pipeline/src/ormconfig.ts2
-rw-r--r--packages/pipeline/src/parsers/ddex_orders/index.ts14
-rw-r--r--packages/pipeline/src/parsers/oasis_orders/index.ts4
-rw-r--r--packages/pipeline/src/parsers/paradex_orders/index.ts4
-rw-r--r--packages/pipeline/src/parsers/token_metadata/index.ts7
-rw-r--r--packages/pipeline/src/scripts/pull_competing_dex_trades.ts4
-rw-r--r--packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts4
-rw-r--r--packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts4
-rw-r--r--packages/pipeline/src/scripts/pull_missing_blocks.ts12
-rw-r--r--packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts4
-rw-r--r--packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts4
-rw-r--r--packages/pipeline/src/scripts/pull_trusted_tokens.ts8
-rw-r--r--packages/pipeline/src/scripts/update_relayer_info.ts4
-rw-r--r--packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts44
-rw-r--r--packages/pipeline/src/utils/index.ts15
-rw-r--r--packages/pipeline/test/parsers/ddex_orders/index_test.ts14
-rw-r--r--packages/pipeline/test/parsers/oasis_orders/index_test.ts4
-rw-r--r--packages/pipeline/test/parsers/paradex_orders/index_test.ts4
19 files changed, 108 insertions, 74 deletions
diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json
index 7ba14c6db..a40f3d21c 100644
--- a/packages/pipeline/package.json
+++ b/packages/pipeline/package.json
@@ -1,6 +1,6 @@
{
"name": "@0x/pipeline",
- "version": "1.0.1",
+ "version": "1.0.2",
"private": true,
"description": "Data pipeline for offline analysis",
"scripts": {
@@ -27,41 +27,35 @@
},
"license": "Apache-2.0",
"devDependencies": {
- "@0x/tslint-config": "^1.0.9",
+ "@0x/tslint-config": "^2.0.0",
"@types/axios": "^0.14.0",
"@types/ramda": "^0.25.38",
- "@types/axios": "^0.14.0",
"chai": "^4.1.2",
"chai-as-promised": "^7.1.1",
"chai-bignumber": "^2.0.2",
"dirty-chai": "^2.0.1",
"mocha": "^5.2.0",
"tslint": "5.11.0",
- "typescript": "3.0.1",
- "mocha": "^5.2.0",
- "chai": "^4.1.2",
- "chai-as-promised": "^7.1.1",
- "chai-bignumber": "^2.0.2",
- "dirty-chai": "^2.0.1"
+ "typescript": "3.0.1"
},
"dependencies": {
- "@0x/connect": "^3.0.9",
+ "@0x/connect": "^3.0.10",
"@0x/contract-addresses": "^2.0.0",
"@0x/contract-artifacts": "^1.0.1",
"@0x/contract-wrappers": "^3.0.0",
- "@0x/dev-utils": "^1.0.20",
+ "@0x/dev-utils": "^1.0.21",
"@0x/order-utils": "^2.0.0",
- "@0x/subproviders": "^2.1.7",
- "@0x/types": "^1.4.0",
- "@0x/utils": "^2.0.7",
- "@0x/web3-wrapper": "^3.2.0",
+ "@0x/subproviders": "^2.1.8",
+ "@0x/types": "^1.4.1",
+ "@0x/utils": "^2.0.8",
+ "@0x/web3-wrapper": "^3.2.1",
"@types/dockerode": "^2.5.9",
"@types/p-limit": "^2.0.0",
"async-parallel": "^1.2.3",
"axios": "^0.18.0",
"bottleneck": "^2.13.2",
"dockerode": "^2.5.7",
- "ethereum-types": "^1.1.3",
+ "ethereum-types": "^1.1.4",
"pg": "^7.5.0",
"prettier": "^1.15.3",
"ramda": "^0.25.0",
diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts
index 27991c430..fe11d81d5 100644
--- a/packages/pipeline/src/ormconfig.ts
+++ b/packages/pipeline/src/ormconfig.ts
@@ -14,7 +14,6 @@ import {
TokenMetadata,
TokenOrderbookSnapshot,
Transaction,
- TrustedToken,
} from './entities';
const entities = [
@@ -31,7 +30,6 @@ const entities = [
TokenMetadata,
TokenOrderbookSnapshot,
Transaction,
- TrustedToken,
];
const config: ConnectionOptions = {
diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts
index 52a998f9f..d7b97efbe 100644
--- a/packages/pipeline/src/parsers/ddex_orders/index.ts
+++ b/packages/pipeline/src/parsers/ddex_orders/index.ts
@@ -54,12 +54,14 @@ export function parseDdexOrder(
tokenOrder.orderType = orderType;
tokenOrder.price = price;
- tokenOrder.baseAssetSymbol = ddexMarket.baseToken;
- tokenOrder.baseAssetAddress = ddexMarket.baseTokenAddress;
- tokenOrder.baseVolume = price.times(amount);
+ // ddex currently confuses quote and base assets.
+ // We switch them here to maintain our internal consistency.
+ tokenOrder.baseAssetSymbol = ddexMarket.quoteToken;
+ tokenOrder.baseAssetAddress = ddexMarket.quoteTokenAddress;
+ tokenOrder.baseVolume = amount;
- tokenOrder.quoteAssetSymbol = ddexMarket.quoteToken;
- tokenOrder.quoteAssetAddress = ddexMarket.quoteTokenAddress;
- tokenOrder.quoteVolume = amount;
+ tokenOrder.quoteAssetSymbol = ddexMarket.baseToken;
+ tokenOrder.quoteAssetAddress = ddexMarket.baseTokenAddress;
+ tokenOrder.quoteVolume = price.times(amount);
return tokenOrder;
}
diff --git a/packages/pipeline/src/parsers/oasis_orders/index.ts b/packages/pipeline/src/parsers/oasis_orders/index.ts
index 7aafbf460..13997f31b 100644
--- a/packages/pipeline/src/parsers/oasis_orders/index.ts
+++ b/packages/pipeline/src/parsers/oasis_orders/index.ts
@@ -62,10 +62,10 @@ export function parseOasisOrder(
tokenOrder.baseAssetSymbol = oasisMarket.base;
tokenOrder.baseAssetAddress = null; // Oasis doesn't provide address information
- tokenOrder.baseVolume = price.times(amount);
+ tokenOrder.baseVolume = amount;
tokenOrder.quoteAssetSymbol = oasisMarket.quote;
tokenOrder.quoteAssetAddress = null; // Oasis doesn't provide address information
- tokenOrder.quoteVolume = amount;
+ tokenOrder.quoteVolume = price.times(amount);
return tokenOrder;
}
diff --git a/packages/pipeline/src/parsers/paradex_orders/index.ts b/packages/pipeline/src/parsers/paradex_orders/index.ts
index 7966658a7..5ceeb64a4 100644
--- a/packages/pipeline/src/parsers/paradex_orders/index.ts
+++ b/packages/pipeline/src/parsers/paradex_orders/index.ts
@@ -57,10 +57,10 @@ export function parseParadexOrder(
tokenOrder.baseAssetSymbol = paradexMarket.baseToken;
tokenOrder.baseAssetAddress = paradexMarket.baseTokenAddress as string;
- tokenOrder.baseVolume = price.times(amount);
+ tokenOrder.baseVolume = amount;
tokenOrder.quoteAssetSymbol = paradexMarket.quoteToken;
tokenOrder.quoteAssetAddress = paradexMarket.quoteTokenAddress as string;
- tokenOrder.quoteVolume = amount;
+ tokenOrder.quoteVolume = price.times(amount);
return tokenOrder;
}
diff --git a/packages/pipeline/src/parsers/token_metadata/index.ts b/packages/pipeline/src/parsers/token_metadata/index.ts
index f258af063..65e0aaa6e 100644
--- a/packages/pipeline/src/parsers/token_metadata/index.ts
+++ b/packages/pipeline/src/parsers/token_metadata/index.ts
@@ -1,9 +1,8 @@
-import { BigNumber } from '@0x/utils';
import * as R from 'ramda';
import { MetamaskTrustedTokenMeta, ZeroExTrustedTokenMeta } from '../../data_sources/trusted_tokens';
import { TokenMetadata } from '../../entities';
-import {} from '../../utils';
+import { toBigNumberOrNull } from '../../utils';
/**
* Parses Metamask's trusted tokens list.
@@ -26,7 +25,7 @@ function parseMetamaskTrustedToken(resp: MetamaskTrustedTokenMeta, address: stri
const trustedToken = new TokenMetadata();
trustedToken.address = address;
- trustedToken.decimals = new BigNumber(resp.decimals);
+ trustedToken.decimals = toBigNumberOrNull(resp.decimals);
trustedToken.symbol = resp.symbol;
trustedToken.name = resp.name;
trustedToken.authority = 'metamask';
@@ -38,7 +37,7 @@ function parseZeroExTrustedToken(resp: ZeroExTrustedTokenMeta): TokenMetadata {
const trustedToken = new TokenMetadata();
trustedToken.address = resp.address;
- trustedToken.decimals = resp.decimals ? new BigNumber(resp.decimals) : null;
+ trustedToken.decimals = toBigNumberOrNull(resp.decimals);
trustedToken.symbol = resp.symbol;
trustedToken.name = resp.name;
trustedToken.authority = '0x';
diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
index 4e4c12dd0..1478d5615 100644
--- a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
+++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
@@ -15,11 +15,11 @@ let connection: Connection;
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
- await getAndSaveTrades();
+ await getAndSaveTradesAsync();
process.exit(0);
})().catch(handleError);
-async function getAndSaveTrades(): Promise<void> {
+async function getAndSaveTradesAsync(): Promise<void> {
const apiKey = process.env.BLOXY_API_KEY;
if (apiKey === undefined) {
throw new Error('Missing required env var: BLOXY_API_KEY');
diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
index 7868e9c5a..4e00f258f 100644
--- a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
+++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
@@ -25,7 +25,7 @@ let connection: Connection;
const markets = await ddexSource.getActiveMarketsAsync();
for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
await Promise.all(
- marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)),
+ marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbookAsync(ddexSource, market)),
);
await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
}
@@ -38,7 +38,7 @@ let connection: Connection;
* @param ddexSource Data source which can query Ddex API.
* @param market Object from Ddex API containing market data.
*/
-async function getAndSaveMarketOrderbook(ddexSource: DdexSource, market: DdexMarket): Promise<void> {
+async function getAndSaveMarketOrderbookAsync(ddexSource: DdexSource, market: DdexMarket): Promise<void> {
const orderBook = await ddexSource.getMarketOrderbookAsync(market.id);
const observedTimestamp = Date.now();
diff --git a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts
index d47c1dd3f..490b17766 100644
--- a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts
+++ b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts
@@ -27,7 +27,7 @@ let connection: Connection;
logUtils.log(`Got ${markets.length} markets.`);
for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
await Promise.all(
- marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbook(idexSource, marketId)),
+ marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbookAsync(idexSource, marketId)),
);
await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
}
@@ -40,7 +40,7 @@ let connection: Connection;
* @param idexSource Data source which can query Idex API.
* @param marketId String representing market of interest, eg. 'ETH_TIC'.
*/
-async function getAndSaveMarketOrderbook(idexSource: IdexSource, marketId: string): Promise<void> {
+async function getAndSaveMarketOrderbookAsync(idexSource: IdexSource, marketId: string): Promise<void> {
logUtils.log(`${marketId}: Retrieving orderbook.`);
const orderBook = await idexSource.getMarketOrderbookAsync(marketId);
const observedTimestamp = Date.now();
diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts
index 275141a12..a5203824c 100644
--- a/packages/pipeline/src/scripts/pull_missing_blocks.ts
+++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts
@@ -27,7 +27,7 @@ let connection: Connection;
rpcUrl: INFURA_ROOT_URL,
});
const web3Source = new Web3Source(provider);
- await getAllMissingBlocks(web3Source);
+ await getAllMissingBlocksAsync(web3Source);
process.exit(0);
})().catch(handleError);
@@ -35,23 +35,23 @@ interface MissingBlocksResponse {
block_number: string;
}
-async function getAllMissingBlocks(web3Source: Web3Source): Promise<void> {
+async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise<void> {
const blocksRepository = connection.getRepository(Block);
let fromBlock = EXCHANGE_START_BLOCK;
while (true) {
- const blockNumbers = await getMissingBlockNumbers(fromBlock);
+ const blockNumbers = await getMissingBlockNumbersAsync(fromBlock);
if (blockNumbers.length === 0) {
// There are no more missing blocks. We're done.
break;
}
- await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers);
+ await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers);
fromBlock = Math.max(...blockNumbers) + 1;
}
const totalBlocks = await blocksRepository.count();
console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`);
}
-async function getMissingBlockNumbers(fromBlock: number): Promise<number[]> {
+async function getMissingBlockNumbersAsync(fromBlock: number): Promise<number[]> {
console.log(`Checking for missing blocks starting at ${fromBlock}...`);
// Note(albrow): The easiest way to get all the blocks we need is to
// consider all the events tables together in a single query. If this query
@@ -76,7 +76,7 @@ async function getMissingBlockNumbers(fromBlock: number): Promise<number[]> {
return blockNumbers;
}
-async function getAndSaveBlocks(
+async function getAndSaveBlocksAsync(
web3Source: Web3Source,
blocksRepository: Repository<Block>,
blockNumbers: number[],
diff --git a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts
index 0ffa5fd47..c4dcf6c83 100644
--- a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts
+++ b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts
@@ -27,7 +27,7 @@ let connection: Connection;
logUtils.log(`Got ${markets.length} markets.`);
for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
await Promise.all(
- marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbook(oasisSource, market)),
+ marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbookAsync(oasisSource, market)),
);
await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
}
@@ -40,7 +40,7 @@ let connection: Connection;
* @param oasisSource Data source which can query Oasis API.
* @param marketId String identifying market we want data for. eg. 'REPAUG'.
*/
-async function getAndSaveMarketOrderbook(oasisSource: OasisSource, market: OasisMarket): Promise<void> {
+async function getAndSaveMarketOrderbookAsync(oasisSource: OasisSource, market: OasisMarket): Promise<void> {
logUtils.log(`${market.id}: Retrieving orderbook.`);
const orderBook = await oasisSource.getMarketOrderbookAsync(market.id);
const observedTimestamp = Date.now();
diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
index bae1fbede..34345f355 100644
--- a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
+++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
@@ -29,7 +29,7 @@ let connection: Connection;
const tokenInfoResponse = await paradexSource.getTokenInfoAsync();
const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse);
await Promise.all(
- extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbook(paradexSource, market)),
+ extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbookAsync(paradexSource, market)),
);
process.exit(0);
})().catch(handleError);
@@ -70,7 +70,7 @@ function addTokenAddresses(
* @param paradexSource Data source which can query the Paradex API.
* @param market Object from the Paradex API with information about the market in question.
*/
-async function getAndSaveMarketOrderbook(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> {
+async function getAndSaveMarketOrderbookAsync(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> {
const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol);
const observedTimestamp = Date.now();
diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
index 1befc4437..5906deee6 100644
--- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -16,12 +16,12 @@ let connection: Connection;
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
- await getMetamaskTrustedTokens();
- await getZeroExTrustedTokens();
+ await getMetamaskTrustedTokensAsync();
+ await getZeroExTrustedTokensAsync();
process.exit(0);
})().catch(handleError);
-async function getMetamaskTrustedTokens(): Promise<void> {
+async function getMetamaskTrustedTokensAsync(): Promise<void> {
// tslint:disable-next-line:no-console
console.log('Getting latest metamask trusted tokens list ...');
const trustedTokensRepository = connection.getRepository(TokenMetadata);
@@ -37,7 +37,7 @@ async function getMetamaskTrustedTokens(): Promise<void> {
console.log('Done saving metamask trusted tokens.');
}
-async function getZeroExTrustedTokens(): Promise<void> {
+async function getZeroExTrustedTokensAsync(): Promise<void> {
// tslint:disable-next-line:no-console
console.log('Getting latest 0x trusted tokens list ...');
const trustedTokensRepository = connection.getRepository(TokenMetadata);
diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts
index f8918728d..41d29b385 100644
--- a/packages/pipeline/src/scripts/update_relayer_info.ts
+++ b/packages/pipeline/src/scripts/update_relayer_info.ts
@@ -17,11 +17,11 @@ let connection: Connection;
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
- await getRelayers();
+ await getRelayersAsync();
process.exit(0);
})().catch(handleError);
-async function getRelayers(): Promise<void> {
+async function getRelayersAsync(): Promise<void> {
console.log('Getting latest relayer info...');
const relayerRepository = connection.getRepository(Relayer);
const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL);
diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
index 9d3ef2fba..19f81344e 100644
--- a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
+++ b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
@@ -23,6 +23,18 @@ interface CryptoCompareCoin {
const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT'];
const ETHEREUM_IDENTIFIER = '7605';
const HTTP_OK_STATUS = 200;
+
+interface StaticPair {
+ fromSymbol: string;
+ toSymbol: string;
+}
+const SPECIAL_CASES: StaticPair[] = [
+ {
+ fromSymbol: 'ETH',
+ toSymbol: 'USD',
+ },
+];
+
/**
* Get trading pairs with latest scraped time for OHLCV records
* @param conn a typeorm Connection to postgres
@@ -44,6 +56,7 @@ export async function fetchOHLCVTradingPairsAsync(
FROM raw.ohlcv_external
GROUP BY from_symbol, to_symbol;`);
+ // build addressable index: { fromsym: { tosym: time }}
const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {};
latestTradingPairs.forEach(pair => {
const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {};
@@ -51,6 +64,13 @@ export async function fetchOHLCVTradingPairsAsync(
latestTradingPairsIndex[pair.from_symbol] = latestIndex;
});
+ // match time to special cases
+ const specialCases: TradingPair[] = SPECIAL_CASES.map(pair => {
+ const latestSavedTime =
+ R.path<number>([pair.fromSymbol, pair.toSymbol], latestTradingPairsIndex) || earliestBackfillTime;
+ return R.assoc('latestSavedTime', latestSavedTime, pair);
+ });
+
// get token symbols used by Crypto Compare
const allCoinsResp = await fetchAsync(COINLIST_API);
if (allCoinsResp.status !== HTTP_OK_STATUS) {
@@ -66,27 +86,31 @@ export async function fetchOHLCVTradingPairsAsync(
});
// fetch all tokens that are traded on 0x
- const rawTokenAddresses: Array<{ tokenaddress: string }> = await conn.query(
+ const rawEventTokenAddresses: Array<{ tokenaddress: string }> = await conn.query(
`SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION
SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`,
);
- const tokenAddresses = R.pluck('tokenaddress', rawTokenAddresses);
+
+ // tslint:disable-next-line:no-unbound-method
+ const eventTokenAddresses = R.pluck('tokenaddress', rawEventTokenAddresses).map(R.toLower);
// join token addresses with CC symbols
- const allTokenSymbols: string[] = tokenAddresses
- .map(tokenAddress => erc20CoinsIndex.get(tokenAddress.toLowerCase()) || '')
- .filter(x => x);
+ const eventTokenSymbols: string[] = eventTokenAddresses
+ .filter(tokenAddress => erc20CoinsIndex.has(tokenAddress))
+ .map(tokenAddress => erc20CoinsIndex.get(tokenAddress) as string);
- // generate list of all tokens with time of latest existing record OR default earliest time
- const allTradingPairCombinations: TradingPair[] = R.chain(sym => {
+ // join traded tokens with fiat and latest backfill time
+ const eventTradingPairs: TradingPair[] = R.chain(sym => {
return TO_CURRENCIES.map(fiat => {
- return {
+ const pair = {
fromSymbol: sym,
toSymbol: fiat,
latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime,
};
+ return pair;
});
- }, allTokenSymbols);
+ }, eventTokenSymbols);
- return allTradingPairCombinations;
+ // join with special cases
+ return R.concat(eventTradingPairs, specialCases);
}
diff --git a/packages/pipeline/src/utils/index.ts b/packages/pipeline/src/utils/index.ts
index 2096a0a39..094c0178e 100644
--- a/packages/pipeline/src/utils/index.ts
+++ b/packages/pipeline/src/utils/index.ts
@@ -15,6 +15,21 @@ export function bigNumbertoStringOrNull(n: BigNumber): string | null {
}
/**
+ * If value is null or undefined, returns null. Otherwise converts value to a
+ * BigNumber.
+ * @param value A string or number to be converted to a BigNumber
+ */
+export function toBigNumberOrNull(value: string | number | null): BigNumber | null {
+ switch (value) {
+ case null:
+ case undefined:
+ return null;
+ default:
+ return new BigNumber(value);
+ }
+}
+
+/**
* Logs an error by intelligently checking for `message` and `stack` properties.
* Intended for use with top-level immediately invoked asynchronous functions.
* @param e the error to log.
diff --git a/packages/pipeline/test/parsers/ddex_orders/index_test.ts b/packages/pipeline/test/parsers/ddex_orders/index_test.ts
index 9f4bfe7e3..4a4a86bf8 100644
--- a/packages/pipeline/test/parsers/ddex_orders/index_test.ts
+++ b/packages/pipeline/test/parsers/ddex_orders/index_test.ts
@@ -39,12 +39,14 @@ describe('ddex_orders', () => {
expected.observedTimestamp = observedTimestamp;
expected.orderType = 'bid';
expected.price = new BigNumber(0.5);
- expected.baseAssetSymbol = 'DEF';
- expected.baseAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81';
- expected.baseVolume = new BigNumber(5);
- expected.quoteAssetSymbol = 'ABC';
- expected.quoteAssetAddress = '0x0000000000000000000000000000000000000000';
- expected.quoteVolume = new BigNumber(10);
+ // ddex currently confuses base and quote assets.
+ // Switch them to maintain our internal consistency.
+ expected.baseAssetSymbol = 'ABC';
+ expected.baseAssetAddress = '0x0000000000000000000000000000000000000000';
+ expected.baseVolume = new BigNumber(10);
+ expected.quoteAssetSymbol = 'DEF';
+ expected.quoteAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81';
+ expected.quoteVolume = new BigNumber(5);
const actual = parseDdexOrder(ddexMarket, observedTimestamp, orderType, source, ddexOrder);
expect(actual).deep.equal(expected);
diff --git a/packages/pipeline/test/parsers/oasis_orders/index_test.ts b/packages/pipeline/test/parsers/oasis_orders/index_test.ts
index 9e8ba9a40..433bfb665 100644
--- a/packages/pipeline/test/parsers/oasis_orders/index_test.ts
+++ b/packages/pipeline/test/parsers/oasis_orders/index_test.ts
@@ -37,10 +37,10 @@ describe('oasis_orders', () => {
expected.price = new BigNumber(0.5);
expected.baseAssetSymbol = 'DEF';
expected.baseAssetAddress = null;
- expected.baseVolume = new BigNumber(5);
+ expected.baseVolume = new BigNumber(10);
expected.quoteAssetSymbol = 'ABC';
expected.quoteAssetAddress = null;
- expected.quoteVolume = new BigNumber(10);
+ expected.quoteVolume = new BigNumber(5);
const actual = parseOasisOrder(oasisMarket, observedTimestamp, orderType, source, oasisOrder);
expect(actual).deep.equal(expected);
diff --git a/packages/pipeline/test/parsers/paradex_orders/index_test.ts b/packages/pipeline/test/parsers/paradex_orders/index_test.ts
index 1522806bf..6b811b90d 100644
--- a/packages/pipeline/test/parsers/paradex_orders/index_test.ts
+++ b/packages/pipeline/test/parsers/paradex_orders/index_test.ts
@@ -42,10 +42,10 @@ describe('paradex_orders', () => {
expected.price = new BigNumber(0.1245);
expected.baseAssetSymbol = 'DEF';
expected.baseAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81';
- expected.baseVolume = new BigNumber(412 * 0.1245);
+ expected.baseVolume = new BigNumber(412);
expected.quoteAssetSymbol = 'ABC';
expected.quoteAssetAddress = '0x0000000000000000000000000000000000000000';
- expected.quoteVolume = new BigNumber(412);
+ expected.quoteVolume = new BigNumber(412 * 0.1245);
const actual = parseParadexOrder(paradexMarket, observedTimestamp, orderType, source, paradexOrder);
expect(actual).deep.equal(expected);