aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r--packages/pipeline/src/entities/token_order.ts3
-rw-r--r--packages/pipeline/src/parsers/ddex_orders/index.ts22
-rw-r--r--packages/pipeline/src/parsers/events/exchange_events.ts38
-rw-r--r--packages/pipeline/src/parsers/idex_orders/index.ts12
-rw-r--r--packages/pipeline/src/parsers/oasis_orders/index.ts12
-rw-r--r--packages/pipeline/src/parsers/paradex_orders/index.ts8
-rw-r--r--packages/pipeline/src/parsers/sra_orders/index.ts20
-rw-r--r--packages/pipeline/src/scripts/pull_erc20_events.ts68
-rw-r--r--packages/pipeline/src/scripts/pull_missing_blocks.ts49
-rw-r--r--packages/pipeline/src/types.ts11
-rw-r--r--packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts44
-rw-r--r--packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts20
-rw-r--r--packages/pipeline/src/utils/transformers/index.ts1
13 files changed, 207 insertions, 101 deletions
diff --git a/packages/pipeline/src/entities/token_order.ts b/packages/pipeline/src/entities/token_order.ts
index 4b8f0abc3..2709747cb 100644
--- a/packages/pipeline/src/entities/token_order.ts
+++ b/packages/pipeline/src/entities/token_order.ts
@@ -1,7 +1,6 @@
import { BigNumber } from '@0x/utils';
import { Column, Entity, PrimaryColumn } from 'typeorm';
-import { OrderType } from '../types';
import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
@Entity({ name: 'token_orderbook_snapshots', schema: 'raw' })
@@ -11,7 +10,7 @@ export class TokenOrderbookSnapshot {
@PrimaryColumn({ name: 'source' })
public source!: string;
@PrimaryColumn({ name: 'order_type' })
- public orderType!: OrderType;
+ public orderType!: string;
@PrimaryColumn({ name: 'price', type: 'numeric', transformer: bigNumberTransformer })
public price!: BigNumber;
@PrimaryColumn({ name: 'base_asset_symbol' })
diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts
index 52a998f9f..eeb9c9d5b 100644
--- a/packages/pipeline/src/parsers/ddex_orders/index.ts
+++ b/packages/pipeline/src/parsers/ddex_orders/index.ts
@@ -23,8 +23,12 @@ export function parseDdexOrders(
): TokenOrder[] {
const aggregatedBids = aggregateOrders(ddexOrderbook.bids);
const aggregatedAsks = aggregateOrders(ddexOrderbook.asks);
- const parsedBids = aggregatedBids.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'bid', source, order));
- const parsedAsks = aggregatedAsks.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'ask', source, order));
+ const parsedBids = aggregatedBids.map(order =>
+ parseDdexOrder(ddexMarket, observedTimestamp, OrderType.Bid, source, order),
+ );
+ const parsedAsks = aggregatedAsks.map(order =>
+ parseDdexOrder(ddexMarket, observedTimestamp, OrderType.Ask, source, order),
+ );
return parsedBids.concat(parsedAsks);
}
@@ -54,12 +58,14 @@ export function parseDdexOrder(
tokenOrder.orderType = orderType;
tokenOrder.price = price;
- tokenOrder.baseAssetSymbol = ddexMarket.baseToken;
- tokenOrder.baseAssetAddress = ddexMarket.baseTokenAddress;
- tokenOrder.baseVolume = price.times(amount);
+ // ddex currently confuses quote and base assets.
+ // We switch them here to maintain our internal consistency.
+ tokenOrder.baseAssetSymbol = ddexMarket.quoteToken;
+ tokenOrder.baseAssetAddress = ddexMarket.quoteTokenAddress;
+ tokenOrder.baseVolume = amount;
- tokenOrder.quoteAssetSymbol = ddexMarket.quoteToken;
- tokenOrder.quoteAssetAddress = ddexMarket.quoteTokenAddress;
- tokenOrder.quoteVolume = amount;
+ tokenOrder.quoteAssetSymbol = ddexMarket.baseToken;
+ tokenOrder.quoteAssetAddress = ddexMarket.baseTokenAddress;
+ tokenOrder.quoteVolume = price.times(amount);
return tokenOrder;
}
diff --git a/packages/pipeline/src/parsers/events/exchange_events.ts b/packages/pipeline/src/parsers/events/exchange_events.ts
index e18106c75..9c4a5f89a 100644
--- a/packages/pipeline/src/parsers/events/exchange_events.ts
+++ b/packages/pipeline/src/parsers/events/exchange_events.ts
@@ -5,7 +5,7 @@ import { LogWithDecodedArgs } from 'ethereum-types';
import * as R from 'ramda';
import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities';
-import { bigNumbertoStringOrNull } from '../../utils';
+import { bigNumbertoStringOrNull, convertAssetProxyIdToType } from '../../utils';
/**
* Parses raw event logs for a fill event and returns an array of
@@ -40,9 +40,7 @@ export const parseExchangeCancelUpToEvents: (
*/
export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent {
const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
- const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
- const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
const exchangeFillEvent = new ExchangeFillEvent();
exchangeFillEvent.contractAddress = eventLog.address as string;
exchangeFillEvent.blockNumber = eventLog.blockNumber as number;
@@ -59,16 +57,24 @@ export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<Exchang
exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid;
exchangeFillEvent.orderHash = eventLog.args.orderHash;
exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData;
- exchangeFillEvent.makerAssetType = makerAssetType;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeFillEvent.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId);
exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId;
- exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ exchangeFillEvent.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.makerAssetData).nestedAssetData[0].tokenAddress
+ : makerAssetData.tokenAddress;
// tslint has a false positive here. Type assertion is required.
// tslint:disable-next-line:no-unnecessary-type-assertion
exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData;
- exchangeFillEvent.takerAssetType = takerAssetType;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeFillEvent.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId);
exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId;
- exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ exchangeFillEvent.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.takerAssetData).nestedAssetData[0].tokenAddress
+ : takerAssetData.tokenAddress;
// tslint:disable-next-line:no-unnecessary-type-assertion
exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
return exchangeFillEvent;
@@ -83,9 +89,7 @@ export function _convertToExchangeCancelEvent(
eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>,
): ExchangeCancelEvent {
const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
- const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
- const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
const exchangeCancelEvent = new ExchangeCancelEvent();
exchangeCancelEvent.contractAddress = eventLog.address as string;
exchangeCancelEvent.blockNumber = eventLog.blockNumber as number;
@@ -98,15 +102,23 @@ export function _convertToExchangeCancelEvent(
exchangeCancelEvent.senderAddress = eventLog.args.senderAddress;
exchangeCancelEvent.orderHash = eventLog.args.orderHash;
exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData;
- exchangeCancelEvent.makerAssetType = makerAssetType;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeCancelEvent.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId);
exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId;
- exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ exchangeCancelEvent.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.makerAssetData).nestedAssetData[0].tokenAddress
+ : makerAssetData.tokenAddress;
// tslint:disable-next-line:no-unnecessary-type-assertion
exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData;
- exchangeCancelEvent.takerAssetType = takerAssetType;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeCancelEvent.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId);
exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId;
- exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ exchangeCancelEvent.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.takerAssetData).nestedAssetData[0].tokenAddress
+ : takerAssetData.tokenAddress;
// tslint:disable-next-line:no-unnecessary-type-assertion
exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
return exchangeCancelEvent;
diff --git a/packages/pipeline/src/parsers/idex_orders/index.ts b/packages/pipeline/src/parsers/idex_orders/index.ts
index dfe27455c..14b871195 100644
--- a/packages/pipeline/src/parsers/idex_orders/index.ts
+++ b/packages/pipeline/src/parsers/idex_orders/index.ts
@@ -2,7 +2,7 @@ import { BigNumber } from '@0x/utils';
import { aggregateOrders } from '../utils';
-import { IdexOrder, IdexOrderbook, IdexOrderParam } from '../../data_sources/idex';
+import { IdexOrderbook, IdexOrderParam } from '../../data_sources/idex';
import { TokenOrderbookSnapshot as TokenOrder } from '../../entities';
import { OrderType } from '../../types';
@@ -21,7 +21,9 @@ export function parseIdexOrders(idexOrderbook: IdexOrderbook, observedTimestamp:
const idexBidOrder = idexOrderbook.bids[0];
const parsedBids =
aggregatedBids.length > 0
- ? aggregatedBids.map(order => parseIdexOrder(idexBidOrder.params, observedTimestamp, 'bid', source, order))
+ ? aggregatedBids.map(order =>
+ parseIdexOrder(idexBidOrder.params, observedTimestamp, OrderType.Bid, source, order),
+ )
: [];
const aggregatedAsks = aggregateOrders(idexOrderbook.asks);
@@ -29,7 +31,9 @@ export function parseIdexOrders(idexOrderbook: IdexOrderbook, observedTimestamp:
const idexAskOrder = idexOrderbook.asks[0];
const parsedAsks =
aggregatedAsks.length > 0
- ? aggregatedAsks.map(order => parseIdexOrder(idexAskOrder.params, observedTimestamp, 'ask', source, order))
+ ? aggregatedAsks.map(order =>
+ parseIdexOrder(idexAskOrder.params, observedTimestamp, OrderType.Ask, source, order),
+ )
: [];
return parsedBids.concat(parsedAsks);
}
@@ -62,7 +66,7 @@ export function parseIdexOrder(
tokenOrder.baseVolume = amount;
tokenOrder.quoteVolume = price.times(amount);
- if (orderType === 'bid') {
+ if (orderType === OrderType.Bid) {
tokenOrder.baseAssetSymbol = idexOrderParam.buySymbol;
tokenOrder.baseAssetAddress = idexOrderParam.tokenBuy;
tokenOrder.quoteAssetSymbol = idexOrderParam.sellSymbol;
diff --git a/packages/pipeline/src/parsers/oasis_orders/index.ts b/packages/pipeline/src/parsers/oasis_orders/index.ts
index 7aafbf460..b71fb65b9 100644
--- a/packages/pipeline/src/parsers/oasis_orders/index.ts
+++ b/packages/pipeline/src/parsers/oasis_orders/index.ts
@@ -23,13 +23,13 @@ export function parseOasisOrders(
observedTimestamp: number,
source: string,
): TokenOrder[] {
- const aggregatedBids = aggregateOrders(R.filter(R.propEq('act', 'bid'), oasisOrderbook));
- const aggregatedAsks = aggregateOrders(R.filter(R.propEq('act', 'ask'), oasisOrderbook));
+ const aggregatedBids = aggregateOrders(R.filter(R.propEq('act', OrderType.Bid), oasisOrderbook));
+ const aggregatedAsks = aggregateOrders(R.filter(R.propEq('act', OrderType.Ask), oasisOrderbook));
const parsedBids = aggregatedBids.map(order =>
- parseOasisOrder(oasisMarket, observedTimestamp, 'bid', source, order),
+ parseOasisOrder(oasisMarket, observedTimestamp, OrderType.Bid, source, order),
);
const parsedAsks = aggregatedAsks.map(order =>
- parseOasisOrder(oasisMarket, observedTimestamp, 'ask', source, order),
+ parseOasisOrder(oasisMarket, observedTimestamp, OrderType.Ask, source, order),
);
return parsedBids.concat(parsedAsks);
}
@@ -62,10 +62,10 @@ export function parseOasisOrder(
tokenOrder.baseAssetSymbol = oasisMarket.base;
tokenOrder.baseAssetAddress = null; // Oasis doesn't provide address information
- tokenOrder.baseVolume = price.times(amount);
+ tokenOrder.baseVolume = amount;
tokenOrder.quoteAssetSymbol = oasisMarket.quote;
tokenOrder.quoteAssetAddress = null; // Oasis doesn't provide address information
- tokenOrder.quoteVolume = amount;
+ tokenOrder.quoteVolume = price.times(amount);
return tokenOrder;
}
diff --git a/packages/pipeline/src/parsers/paradex_orders/index.ts b/packages/pipeline/src/parsers/paradex_orders/index.ts
index 7966658a7..85990dae4 100644
--- a/packages/pipeline/src/parsers/paradex_orders/index.ts
+++ b/packages/pipeline/src/parsers/paradex_orders/index.ts
@@ -21,10 +21,10 @@ export function parseParadexOrders(
source: string,
): TokenOrder[] {
const parsedBids = paradexOrderbookResponse.bids.map(order =>
- parseParadexOrder(paradexMarket, observedTimestamp, 'bid', source, order),
+ parseParadexOrder(paradexMarket, observedTimestamp, OrderType.Bid, source, order),
);
const parsedAsks = paradexOrderbookResponse.asks.map(order =>
- parseParadexOrder(paradexMarket, observedTimestamp, 'ask', source, order),
+ parseParadexOrder(paradexMarket, observedTimestamp, OrderType.Ask, source, order),
);
return parsedBids.concat(parsedAsks);
}
@@ -57,10 +57,10 @@ export function parseParadexOrder(
tokenOrder.baseAssetSymbol = paradexMarket.baseToken;
tokenOrder.baseAssetAddress = paradexMarket.baseTokenAddress as string;
- tokenOrder.baseVolume = price.times(amount);
+ tokenOrder.baseVolume = amount;
tokenOrder.quoteAssetSymbol = paradexMarket.quoteToken;
tokenOrder.quoteAssetAddress = paradexMarket.quoteTokenAddress as string;
- tokenOrder.quoteVolume = amount;
+ tokenOrder.quoteVolume = price.times(amount);
return tokenOrder;
}
diff --git a/packages/pipeline/src/parsers/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts
index ef8901e40..13fe632a4 100644
--- a/packages/pipeline/src/parsers/sra_orders/index.ts
+++ b/packages/pipeline/src/parsers/sra_orders/index.ts
@@ -4,7 +4,7 @@ import { AssetProxyId, ERC721AssetData } from '@0x/types';
import * as R from 'ramda';
import { SraOrder } from '../../entities';
-import { bigNumbertoStringOrNull } from '../../utils';
+import { bigNumbertoStringOrNull, convertAssetProxyIdToType } from '../../utils';
/**
* Parses a raw order response from an SRA endpoint and returns an array of
@@ -22,9 +22,7 @@ export function parseSraOrders(rawOrdersResponse: OrdersResponse): SraOrder[] {
export function _convertToEntity(apiOrder: APIOrder): SraOrder {
// TODO(albrow): refactor out common asset data decoding code.
const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.makerAssetData);
- const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.takerAssetData);
- const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
const sraOrder = new SraOrder();
sraOrder.exchangeAddress = apiOrder.order.exchangeAddress;
@@ -43,16 +41,24 @@ export function _convertToEntity(apiOrder: APIOrder): SraOrder {
sraOrder.signature = apiOrder.order.signature;
sraOrder.rawMakerAssetData = apiOrder.order.makerAssetData;
- sraOrder.makerAssetType = makerAssetType;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ sraOrder.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId);
sraOrder.makerAssetProxyId = makerAssetData.assetProxyId;
- sraOrder.makerTokenAddress = makerAssetData.tokenAddress;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ sraOrder.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(apiOrder.order.makerAssetData).nestedAssetData[0].tokenAddress
+ : makerAssetData.tokenAddress;
// tslint has a false positive here. Type assertion is required.
// tslint:disable-next-line:no-unnecessary-type-assertion
sraOrder.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
sraOrder.rawTakerAssetData = apiOrder.order.takerAssetData;
- sraOrder.takerAssetType = takerAssetType;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ sraOrder.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId);
sraOrder.takerAssetProxyId = takerAssetData.assetProxyId;
- sraOrder.takerTokenAddress = takerAssetData.tokenAddress;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ sraOrder.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(apiOrder.order.takerAssetData).nestedAssetData[0].tokenAddress
+ : takerAssetData.tokenAddress;
// tslint:disable-next-line:no-unnecessary-type-assertion
sraOrder.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts
index 0ad12c97a..bd520c610 100644
--- a/packages/pipeline/src/scripts/pull_erc20_events.ts
+++ b/packages/pipeline/src/scripts/pull_erc20_events.ts
@@ -1,10 +1,10 @@
-// tslint:disable:no-console
import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses';
import { web3Factory } from '@0x/dev-utils';
import { Web3ProviderEngine } from '@0x/subproviders';
+import { logUtils } from '@0x/utils';
import { Web3Wrapper } from '@0x/web3-wrapper';
import 'reflect-metadata';
-import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events';
import { ERC20ApprovalEvent } from '../entities';
@@ -16,33 +16,63 @@ const NETWORK_ID = 1;
const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events.
const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock.
-const WETH_START_BLOCK = 4719568; // Block number when the WETH contract was deployed.
let connection: Connection;
+interface Token {
+ // name is used for logging only.
+ name: string;
+ address: string;
+ defaultStartBlock: number;
+}
+
+const tokensToGetApprovalEvents: Token[] = [
+ {
+ name: 'WETH',
+ address: getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken,
+ defaultStartBlock: 4719568, // Block when the WETH contract was deployed.
+ },
+ {
+ name: 'ZRX',
+ address: getContractAddressesForNetworkOrThrow(NETWORK_ID).zrxToken,
+ defaultStartBlock: 4145415, // Block when the ZRX contract was deployed.
+ },
+ {
+ name: 'DAI',
+ address: '0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359',
+ defaultStartBlock: 4752008, // Block when the DAI contract was deployed.
+ },
+];
+
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
const provider = web3Factory.getRpcProvider({
rpcUrl: INFURA_ROOT_URL,
});
const endBlock = await calculateEndBlockAsync(provider);
- await getAndSaveWETHApprovalEventsAsync(provider, endBlock);
+ for (const token of tokensToGetApprovalEvents) {
+ await getAndSaveApprovalEventsAsync(provider, token, endBlock);
+ }
process.exit(0);
})().catch(handleError);
-async function getAndSaveWETHApprovalEventsAsync(provider: Web3ProviderEngine, endBlock: number): Promise<void> {
- console.log('Checking existing approval events...');
+async function getAndSaveApprovalEventsAsync(
+ provider: Web3ProviderEngine,
+ token: Token,
+ endBlock: number,
+): Promise<void> {
+ logUtils.log(`Getting approval events for ${token.name}...`);
+ logUtils.log('Checking existing approval events...');
const repository = connection.getRepository(ERC20ApprovalEvent);
- const startBlock = (await getStartBlockAsync(repository)) || WETH_START_BLOCK;
+ const startBlock = (await getStartBlockAsync(token)) || token.defaultStartBlock;
- console.log(`Getting WETH approval events starting at ${startBlock}...`);
- const wethTokenAddress = getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken;
- const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, wethTokenAddress);
+ logUtils.log(`Getting approval events starting at ${startBlock}...`);
+ const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, token.address);
const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock);
- console.log(`Parsing ${eventLogs.length} WETH approval events...`);
+ logUtils.log(`Parsing ${eventLogs.length} approval events...`);
const events = parseERC20ApprovalEvents(eventLogs);
- console.log(`Retrieved and parsed ${events.length} total WETH approval events.`);
+ logUtils.log(`Retrieved and parsed ${events.length} total approval events.`);
await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) });
}
@@ -52,15 +82,15 @@ async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<num
return currentBlock - BLOCK_FINALITY_THRESHOLD;
}
-async function getStartBlockAsync(repository: Repository<ERC20ApprovalEvent>): Promise<number | null> {
- const fillEventCount = await repository.count();
- if (fillEventCount === 0) {
- console.log(`No existing approval events found.`);
- return null;
- }
+async function getStartBlockAsync(token: Token): Promise<number | null> {
const queryResult = await connection.query(
- `SELECT block_number FROM raw.erc20_approval_events ORDER BY block_number DESC LIMIT 1`,
+ `SELECT block_number FROM raw.erc20_approval_events WHERE token_address = $1 ORDER BY block_number DESC LIMIT 1`,
+ [token.address],
);
+ if (queryResult.length === 0) {
+ logUtils.log(`No existing approval events found for ${token.name}.`);
+ return null;
+ }
const lastKnownBlock = queryResult[0].block_number;
return lastKnownBlock - START_BLOCK_OFFSET;
}
diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts
index a5203824c..ced9d99eb 100644
--- a/packages/pipeline/src/scripts/pull_missing_blocks.ts
+++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts
@@ -9,25 +9,34 @@ import { Web3Source } from '../data_sources/web3';
import { Block } from '../entities';
import * as ormConfig from '../ormconfig';
import { parseBlock } from '../parsers/web3';
-import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';
+import { handleError, INFURA_ROOT_URL } from '../utils';
// Number of blocks to save at once.
const BATCH_SAVE_SIZE = 1000;
// Maximum number of requests to send at once.
-const MAX_CONCURRENCY = 10;
+const MAX_CONCURRENCY = 20;
// Maximum number of blocks to query for at once. This is also the maximum
// number of blocks we will hold in memory prior to being saved to the database.
const MAX_BLOCKS_PER_QUERY = 1000;
let connection: Connection;
+const tablesWithMissingBlocks = [
+ 'raw.exchange_fill_events',
+ 'raw.exchange_cancel_events',
+ 'raw.exchange_cancel_up_to_events',
+ 'raw.erc20_approval_events',
+];
+
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
const provider = web3Factory.getRpcProvider({
rpcUrl: INFURA_ROOT_URL,
});
const web3Source = new Web3Source(provider);
- await getAllMissingBlocksAsync(web3Source);
+ for (const tableName of tablesWithMissingBlocks) {
+ await getAllMissingBlocksAsync(web3Source, tableName);
+ }
process.exit(0);
})().catch(handleError);
@@ -35,44 +44,31 @@ interface MissingBlocksResponse {
block_number: string;
}
-async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise<void> {
+async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise<void> {
const blocksRepository = connection.getRepository(Block);
- let fromBlock = EXCHANGE_START_BLOCK;
while (true) {
- const blockNumbers = await getMissingBlockNumbersAsync(fromBlock);
+ console.log(`Checking for missing blocks in ${tableName}...`);
+ const blockNumbers = await getMissingBlockNumbersAsync(tableName);
if (blockNumbers.length === 0) {
// There are no more missing blocks. We're done.
break;
}
await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers);
- fromBlock = Math.max(...blockNumbers) + 1;
}
const totalBlocks = await blocksRepository.count();
- console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`);
+ console.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`);
}
-async function getMissingBlockNumbersAsync(fromBlock: number): Promise<number[]> {
- console.log(`Checking for missing blocks starting at ${fromBlock}...`);
- // Note(albrow): The easiest way to get all the blocks we need is to
- // consider all the events tables together in a single query. If this query
- // gets too slow, we should consider re-architecting so that we can work on
- // getting the blocks for one type of event at a time.
+async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> {
+ // This query returns up to `MAX_BLOCKS_PER_QUERY` distinct block numbers
+ // which are present in `tableName` but not in `raw.blocks`.
const response = (await connection.query(
- `WITH all_events AS (
- SELECT block_number FROM raw.exchange_fill_events
- UNION SELECT block_number FROM raw.exchange_cancel_events
- UNION SELECT block_number FROM raw.exchange_cancel_up_to_events
- UNION SELECT block_number FROM raw.erc20_approval_events
- )
- SELECT DISTINCT(block_number) FROM all_events
- WHERE block_number NOT IN (SELECT number FROM raw.blocks)
- AND block_number >= $1
- ORDER BY block_number ASC LIMIT $2`,
- [fromBlock, MAX_BLOCKS_PER_QUERY],
+ `SELECT DISTINCT(block_number) FROM ${tableName} LEFT JOIN raw.blocks ON ${tableName}.block_number = raw.blocks.number WHERE number IS NULL LIMIT $1;`,
+ [MAX_BLOCKS_PER_QUERY],
)) as MissingBlocksResponse[];
const blockNumberStrings = R.pluck('block_number', response);
const blockNumbers = R.map(parseInt, blockNumberStrings);
- console.log(`Found ${blockNumbers.length} missing blocks in the given range.`);
+ console.log(`Found ${blockNumbers.length} missing blocks.`);
return blockNumbers;
}
@@ -90,4 +86,5 @@ async function getAndSaveBlocksAsync(
const blocks = R.map(parseBlock, rawBlocks);
console.log(`Saving ${blocks.length} blocks...`);
await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) });
+ console.log('Done saving this batch of blocks');
}
diff --git a/packages/pipeline/src/types.ts b/packages/pipeline/src/types.ts
index e02b42a40..5f2121807 100644
--- a/packages/pipeline/src/types.ts
+++ b/packages/pipeline/src/types.ts
@@ -1,2 +1,9 @@
-export type AssetType = 'erc20' | 'erc721';
-export type OrderType = 'bid' | 'ask';
+export enum AssetType {
+ ERC20 = 'erc20',
+ ERC721 = 'erc721',
+ MultiAsset = 'multiAsset',
+}
+export enum OrderType {
+ Bid = 'bid',
+ Ask = 'ask',
+}
diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
index 9d3ef2fba..19f81344e 100644
--- a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
+++ b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
@@ -23,6 +23,18 @@ interface CryptoCompareCoin {
const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT'];
const ETHEREUM_IDENTIFIER = '7605';
const HTTP_OK_STATUS = 200;
+
+interface StaticPair {
+ fromSymbol: string;
+ toSymbol: string;
+}
+const SPECIAL_CASES: StaticPair[] = [
+ {
+ fromSymbol: 'ETH',
+ toSymbol: 'USD',
+ },
+];
+
/**
* Get trading pairs with latest scraped time for OHLCV records
* @param conn a typeorm Connection to postgres
@@ -44,6 +56,7 @@ export async function fetchOHLCVTradingPairsAsync(
FROM raw.ohlcv_external
GROUP BY from_symbol, to_symbol;`);
+ // build addressable index: { fromsym: { tosym: time }}
const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {};
latestTradingPairs.forEach(pair => {
const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {};
@@ -51,6 +64,13 @@ export async function fetchOHLCVTradingPairsAsync(
latestTradingPairsIndex[pair.from_symbol] = latestIndex;
});
+ // match time to special cases
+ const specialCases: TradingPair[] = SPECIAL_CASES.map(pair => {
+ const latestSavedTime =
+ R.path<number>([pair.fromSymbol, pair.toSymbol], latestTradingPairsIndex) || earliestBackfillTime;
+ return R.assoc('latestSavedTime', latestSavedTime, pair);
+ });
+
// get token symbols used by Crypto Compare
const allCoinsResp = await fetchAsync(COINLIST_API);
if (allCoinsResp.status !== HTTP_OK_STATUS) {
@@ -66,27 +86,31 @@ export async function fetchOHLCVTradingPairsAsync(
});
// fetch all tokens that are traded on 0x
- const rawTokenAddresses: Array<{ tokenaddress: string }> = await conn.query(
+ const rawEventTokenAddresses: Array<{ tokenaddress: string }> = await conn.query(
`SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION
SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`,
);
- const tokenAddresses = R.pluck('tokenaddress', rawTokenAddresses);
+
+ // tslint:disable-next-line:no-unbound-method
+ const eventTokenAddresses = R.pluck('tokenaddress', rawEventTokenAddresses).map(R.toLower);
// join token addresses with CC symbols
- const allTokenSymbols: string[] = tokenAddresses
- .map(tokenAddress => erc20CoinsIndex.get(tokenAddress.toLowerCase()) || '')
- .filter(x => x);
+ const eventTokenSymbols: string[] = eventTokenAddresses
+ .filter(tokenAddress => erc20CoinsIndex.has(tokenAddress))
+ .map(tokenAddress => erc20CoinsIndex.get(tokenAddress) as string);
- // generate list of all tokens with time of latest existing record OR default earliest time
- const allTradingPairCombinations: TradingPair[] = R.chain(sym => {
+ // join traded tokens with fiat and latest backfill time
+ const eventTradingPairs: TradingPair[] = R.chain(sym => {
return TO_CURRENCIES.map(fiat => {
- return {
+ const pair = {
fromSymbol: sym,
toSymbol: fiat,
latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime,
};
+ return pair;
});
- }, allTokenSymbols);
+ }, eventTokenSymbols);
- return allTradingPairCombinations;
+ // join with special cases
+ return R.concat(eventTradingPairs, specialCases);
}
diff --git a/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts b/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts
new file mode 100644
index 000000000..2cd05a616
--- /dev/null
+++ b/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts
@@ -0,0 +1,20 @@
+import { AssetProxyId } from '@0x/types';
+
+import { AssetType } from '../../types';
+
+/**
+ * Converts an assetProxyId to its string equivalent
+ * @param assetProxyId Id of AssetProxy
+ */
+export function convertAssetProxyIdToType(assetProxyId: AssetProxyId): AssetType {
+ switch (assetProxyId) {
+ case AssetProxyId.ERC20:
+ return AssetType.ERC20;
+ case AssetProxyId.ERC721:
+ return AssetType.ERC721;
+ case AssetProxyId.MultiAsset:
+ return AssetType.MultiAsset;
+ default:
+ throw new Error(`${assetProxyId} not a supported assetProxyId`);
+ }
+}
diff --git a/packages/pipeline/src/utils/transformers/index.ts b/packages/pipeline/src/utils/transformers/index.ts
index 232c1c5de..31a4c9223 100644
--- a/packages/pipeline/src/utils/transformers/index.ts
+++ b/packages/pipeline/src/utils/transformers/index.ts
@@ -1,2 +1,3 @@
export * from './big_number';
export * from './number_to_bigint';
+export * from './asset_proxy_id_types';