diff options
Diffstat (limited to 'packages/pipeline/src/scripts')
13 files changed, 0 insertions, 1024 deletions
diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts deleted file mode 100644 index 14644bb2e..000000000 --- a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts +++ /dev/null @@ -1,52 +0,0 @@ -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { BloxySource } from '../data_sources/bloxy'; -import { DexTrade } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseBloxyTrades } from '../parsers/bloxy'; -import { handleError } from '../utils'; - -// Number of trades to save at once. -const BATCH_SAVE_SIZE = 1000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - await getAndSaveTradesAsync(); - process.exit(0); -})().catch(handleError); - -async function getAndSaveTradesAsync(): Promise<void> { - const apiKey = process.env.BLOXY_API_KEY; - if (apiKey === undefined) { - throw new Error('Missing required env var: BLOXY_API_KEY'); - } - const bloxySource = new BloxySource(apiKey); - const tradesRepository = connection.getRepository(DexTrade); - const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository); - logUtils.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`); - logUtils.log('Getting latest dex trades...'); - const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp); - logUtils.log(`Parsing ${rawTrades.length} trades...`); - const trades = parseBloxyTrades(rawTrades); - logUtils.log(`Saving ${trades.length} trades...`); - await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) }); - logUtils.log('Done saving trades.'); -} - -async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> { - if ((await tradesRepository.count()) === 0) { - return 0; - } - const response = (await connection.query( - 'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1', - )) as Array<{ tx_timestamp: number }>; - if (response.length === 0) { - return 0; - } - return response[0].tx_timestamp; -} diff --git a/packages/pipeline/src/scripts/pull_copper.ts b/packages/pipeline/src/scripts/pull_copper.ts deleted file mode 100644 index 5e4a6a643..000000000 --- a/packages/pipeline/src/scripts/pull_copper.ts +++ /dev/null @@ -1,130 +0,0 @@ -import * as R from 'ramda'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper'; -import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { - CopperSearchResponse, - parseActivities, - parseActivityTypes, - parseCustomFields, - parseLeads, - parseOpportunities, -} from '../parsers/copper'; -import { handleError } from '../utils'; -const ONE_SECOND = 1000; -const COPPER_RATE_LIMIT = 10; -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - - const accessToken = process.env.COPPER_ACCESS_TOKEN; - const userEmail = process.env.COPPER_USER_EMAIL; - if (accessToken === undefined || userEmail === undefined) { - throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL'); - } - const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail); - - const fetchPromises = [ - fetchAndSaveLeadsAsync(source), - fetchAndSaveOpportunitiesAsync(source), - fetchAndSaveActivitiesAsync(source), - fetchAndSaveCustomFieldsAsync(source), - fetchAndSaveActivityTypesAsync(source), - ]; - fetchPromises.forEach(async fn => { - await fn; - }); -})().catch(handleError); - -async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> { - const repository = connection.getRepository(CopperLead); - const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads'); - logUtils.log(`Fetching Copper leads starting from ${startTime}...`); - await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository); -} - -async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> { - const repository = connection.getRepository(CopperOpportunity); - const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities'); - logUtils.log(`Fetching Copper opportunities starting from ${startTime}...`); - await fetchAndSaveAsync( - CopperEndpoint.Opportunities, - source, - startTime, - { sort_by: 'name' }, - parseOpportunities, - repository, - ); -} - -async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> { - const repository = connection.getRepository(CopperActivity); - const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities'); - const searchParams = { - minimum_activity_date: Math.floor(startTime / ONE_SECOND), - }; - logUtils.log(`Fetching Copper activities starting from ${startTime}...`); - await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository); -} - -async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> { - const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`); - if (R.isEmpty(queryResult)) { - return 0; - } else { - return queryResult[0]._max; - } -} - -// (Xianny): Copper API doesn't allow queries to filter by date. To ensure that we are filling in ascending chronological -// order and not missing any records, we are scraping all available pages. If Copper data gets larger, -// it would make sense to search for and start filling from the first page that contains a new record. -// This search would increase our network calls and is not efficient to implement with our current small volume -// of Copper records. -async function fetchAndSaveAsync<T extends CopperSearchResponse, E>( - endpoint: CopperEndpoint, - source: CopperSource, - startTime: number, - searchParams: CopperSearchParams, - parseFn: (recs: T[]) => E[], - repository: Repository<E>, -): Promise<void> { - let saved = 0; - const numPages = await source.fetchNumberOfPagesAsync(endpoint); - try { - for (let i = numPages; i > 0; i--) { - logUtils.log(`Fetching page ${i}/${numPages} of ${endpoint}...`); - const raw = await source.fetchSearchResultsAsync<T>(endpoint, { - ...searchParams, - page_number: i, - }); - const newRecords = raw.filter(rec => rec.date_modified * ONE_SECOND > startTime); - const parsed = parseFn(newRecords); - await repository.save<any>(parsed); - saved += newRecords.length; - } - } catch (err) { - logUtils.log(`Error fetching ${endpoint}, stopping: ${err.stack}`); - } finally { - logUtils.log(`Saved ${saved} items from ${endpoint}, done.`); - } -} - -async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> { - logUtils.log(`Fetching Copper activity types...`); - const activityTypes = await source.fetchActivityTypesAsync(); - const repository = connection.getRepository(CopperActivityType); - await repository.save(parseActivityTypes(activityTypes)); -} - -async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> { - logUtils.log(`Fetching Copper custom fields...`); - const customFields = await source.fetchCustomFieldsAsync(); - const repository = connection.getRepository(CopperCustomField); - await repository.save(parseCustomFields(customFields)); -} diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts deleted file mode 100644 index 4e00f258f..000000000 --- a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts +++ /dev/null @@ -1,55 +0,0 @@ -import { logUtils } from '@0x/utils'; -import * as R from 'ramda'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex'; -import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseDdexOrders } from '../parsers/ddex_orders'; -import { handleError } from '../utils'; - -// Number of orders to save at once. -const BATCH_SAVE_SIZE = 1000; - -// Number of markets to retrieve orderbooks for at once. -const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; - -// Delay between market orderbook requests. -const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const ddexSource = new DdexSource(); - const markets = await ddexSource.getActiveMarketsAsync(); - for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { - await Promise.all( - marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbookAsync(ddexSource, market)), - ); - await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); - } - process.exit(0); -})().catch(handleError); - -/** - * Retrieve orderbook from Ddex API for a given market. Parse orders and insert - * them into our database. - * @param ddexSource Data source which can query Ddex API. - * @param market Object from Ddex API containing market data. - */ -async function getAndSaveMarketOrderbookAsync(ddexSource: DdexSource, market: DdexMarket): Promise<void> { - const orderBook = await ddexSource.getMarketOrderbookAsync(market.id); - const observedTimestamp = Date.now(); - - logUtils.log(`${market.id}: Parsing orders.`); - const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE); - - if (orders.length > 0) { - logUtils.log(`${market.id}: Saving ${orders.length} orders.`); - const TokenOrderRepository = connection.getRepository(TokenOrder); - await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); - } else { - logUtils.log(`${market.id}: 0 orders to save.`); - } -} diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts deleted file mode 100644 index bd520c610..000000000 --- a/packages/pipeline/src/scripts/pull_erc20_events.ts +++ /dev/null @@ -1,96 +0,0 @@ -import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses'; -import { web3Factory } from '@0x/dev-utils'; -import { Web3ProviderEngine } from '@0x/subproviders'; -import { logUtils } from '@0x/utils'; -import { Web3Wrapper } from '@0x/web3-wrapper'; -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events'; -import { ERC20ApprovalEvent } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseERC20ApprovalEvents } from '../parsers/events'; -import { handleError, INFURA_ROOT_URL } from '../utils'; - -const NETWORK_ID = 1; -const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. -const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. - -let connection: Connection; - -interface Token { - // name is used for logging only. - name: string; - address: string; - defaultStartBlock: number; -} - -const tokensToGetApprovalEvents: Token[] = [ - { - name: 'WETH', - address: getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken, - defaultStartBlock: 4719568, // Block when the WETH contract was deployed. - }, - { - name: 'ZRX', - address: getContractAddressesForNetworkOrThrow(NETWORK_ID).zrxToken, - defaultStartBlock: 4145415, // Block when the ZRX contract was deployed. - }, - { - name: 'DAI', - address: '0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359', - defaultStartBlock: 4752008, // Block when the DAI contract was deployed. - }, -]; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const provider = web3Factory.getRpcProvider({ - rpcUrl: INFURA_ROOT_URL, - }); - const endBlock = await calculateEndBlockAsync(provider); - for (const token of tokensToGetApprovalEvents) { - await getAndSaveApprovalEventsAsync(provider, token, endBlock); - } - process.exit(0); -})().catch(handleError); - -async function getAndSaveApprovalEventsAsync( - provider: Web3ProviderEngine, - token: Token, - endBlock: number, -): Promise<void> { - logUtils.log(`Getting approval events for ${token.name}...`); - logUtils.log('Checking existing approval events...'); - const repository = connection.getRepository(ERC20ApprovalEvent); - const startBlock = (await getStartBlockAsync(token)) || token.defaultStartBlock; - - logUtils.log(`Getting approval events starting at ${startBlock}...`); - const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, token.address); - const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock); - - logUtils.log(`Parsing ${eventLogs.length} approval events...`); - const events = parseERC20ApprovalEvents(eventLogs); - logUtils.log(`Retrieved and parsed ${events.length} total approval events.`); - await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); -} - -async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { - const web3Wrapper = new Web3Wrapper(provider); - const currentBlock = await web3Wrapper.getBlockNumberAsync(); - return currentBlock - BLOCK_FINALITY_THRESHOLD; -} - -async function getStartBlockAsync(token: Token): Promise<number | null> { - const queryResult = await connection.query( - `SELECT block_number FROM raw.erc20_approval_events WHERE token_address = $1 ORDER BY block_number DESC LIMIT 1`, - [token.address], - ); - if (queryResult.length === 0) { - logUtils.log(`No existing approval events found for ${token.name}.`); - return null; - } - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} diff --git a/packages/pipeline/src/scripts/pull_exchange_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts deleted file mode 100644 index c2c56da6b..000000000 --- a/packages/pipeline/src/scripts/pull_exchange_events.ts +++ /dev/null @@ -1,152 +0,0 @@ -import { web3Factory } from '@0x/dev-utils'; -import { Web3ProviderEngine } from '@0x/subproviders'; -import { logUtils } from '@0x/utils'; -import { Web3Wrapper } from '@0x/web3-wrapper'; -import R = require('ramda'); -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; -import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; - -const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. -const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const provider = web3Factory.getRpcProvider({ - rpcUrl: INFURA_ROOT_URL, - }); - const endBlock = await calculateEndBlockAsync(provider); - const eventsSource = new ExchangeEventsSource(provider, 1); - await getFillEventsAsync(eventsSource, endBlock); - await getCancelEventsAsync(eventsSource, endBlock); - await getCancelUpToEventsAsync(eventsSource, endBlock); - process.exit(0); -})().catch(handleError); - -async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { - logUtils.log('Checking existing fill events...'); - const repository = connection.getRepository(ExchangeFillEvent); - const startBlock = await getStartBlockAsync(repository); - logUtils.log(`Getting fill events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock); - logUtils.log('Parsing fill events...'); - const events = parseExchangeFillEvents(eventLogs); - logUtils.log(`Retrieved and parsed ${events.length} total fill events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { - logUtils.log('Checking existing cancel events...'); - const repository = connection.getRepository(ExchangeCancelEvent); - const startBlock = await getStartBlockAsync(repository); - logUtils.log(`Getting cancel events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock); - logUtils.log('Parsing cancel events...'); - const events = parseExchangeCancelEvents(eventLogs); - logUtils.log(`Retrieved and parsed ${events.length} total cancel events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { - logUtils.log('Checking existing CancelUpTo events...'); - const repository = connection.getRepository(ExchangeCancelUpToEvent); - const startBlock = await getStartBlockAsync(repository); - logUtils.log(`Getting CancelUpTo events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock); - logUtils.log('Parsing CancelUpTo events...'); - const events = parseExchangeCancelUpToEvents(eventLogs); - logUtils.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -const tableNameRegex = /^[a-zA-Z_]*$/; - -async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> { - const fillEventCount = await repository.count(); - if (fillEventCount === 0) { - logUtils.log(`No existing ${repository.metadata.name}s found.`); - return EXCHANGE_START_BLOCK; - } - const tableName = repository.metadata.tableName; - if (!tableNameRegex.test(tableName)) { - throw new Error(`Unexpected special character in table name: ${tableName}`); - } - const queryResult = await connection.query( - `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, - ); - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} - -async function saveEventsAsync<T extends ExchangeEvent>( - isInitialPull: boolean, - repository: Repository<T>, - events: T[], -): Promise<void> { - logUtils.log(`Saving ${repository.metadata.name}s...`); - if (isInitialPull) { - // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE - // each. - for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { - await repository.insert(eventsBatch); - } - } else { - // If we possibly have some overlap where we need to update some - // existing events, we need to use our workaround/fallback. - await saveIndividuallyWithFallbackAsync(repository, events); - } - const totalEvents = await repository.count(); - logUtils.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); -} - -async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>( - repository: Repository<T>, - events: T[], -): Promise<void> { - // Note(albrow): This is a temporary hack because `save` is not working as - // documented and is causing a primary key constraint violation. Hopefully - // can remove later because this "poor man's upsert" implementation operates - // on one event at a time and is therefore much slower. - for (const event of events) { - try { - // First try an insert. - await repository.insert(event); - } catch (err) { - if (err.message.includes('duplicate key value violates unique constraint')) { - logUtils.log("Ignore the preceeding INSERT failure; it's not unexpected"); - } else { - throw err; - } - // If it fails, assume it was a primary key constraint error and try - // doing an update instead. - // Note(albrow): Unfortunately the `as any` hack here seems - // required. I can't figure out how to convince the type-checker - // that the criteria and the entity itself are the correct type for - // the given repository. If we can remove the `save` hack then this - // will probably no longer be necessary. - await repository.update( - { - contractAddress: event.contractAddress, - blockNumber: event.blockNumber, - logIndex: event.logIndex, - transactionHash: event.transactionHash, - } as any, - event as any, - ); - } - } -} - -async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { - const web3Wrapper = new Web3Wrapper(provider); - const currentBlock = await web3Wrapper.getBlockNumberAsync(); - return currentBlock - BLOCK_FINALITY_THRESHOLD; -} diff --git a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts deleted file mode 100644 index 490b17766..000000000 --- a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts +++ /dev/null @@ -1,63 +0,0 @@ -import { logUtils } from '@0x/utils'; -import * as R from 'ramda'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { IDEX_SOURCE, IdexSource } from '../data_sources/idex'; -import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseIdexOrders } from '../parsers/idex_orders'; -import { handleError } from '../utils'; - -// Number of orders to save at once. -const BATCH_SAVE_SIZE = 1000; - -// Number of markets to retrieve orderbooks for at once. -const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 100; - -// Delay between market orderbook requests. -const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 2000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const idexSource = new IdexSource(); - logUtils.log('Getting all IDEX markets'); - const markets = await idexSource.getMarketsAsync(); - logUtils.log(`Got ${markets.length} markets.`); - for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { - await Promise.all( - marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbookAsync(idexSource, marketId)), - ); - await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); - } - process.exit(0); -})().catch(handleError); - -/** - * Retrieve orderbook from Idex API for a given market. Parse orders and insert - * them into our database. - * @param idexSource Data source which can query Idex API. - * @param marketId String representing market of interest, eg. 'ETH_TIC'. - */ -async function getAndSaveMarketOrderbookAsync(idexSource: IdexSource, marketId: string): Promise<void> { - logUtils.log(`${marketId}: Retrieving orderbook.`); - const orderBook = await idexSource.getMarketOrderbookAsync(marketId); - const observedTimestamp = Date.now(); - - if (!R.has('bids', orderBook) || !R.has('asks', orderBook)) { - logUtils.warn(`${marketId}: Orderbook faulty.`); - return; - } - - logUtils.log(`${marketId}: Parsing orders.`); - const orders = parseIdexOrders(orderBook, observedTimestamp, IDEX_SOURCE); - - if (orders.length > 0) { - logUtils.log(`${marketId}: Saving ${orders.length} orders.`); - const TokenOrderRepository = connection.getRepository(TokenOrder); - await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); - } else { - logUtils.log(`${marketId}: 0 orders to save.`); - } -} diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts deleted file mode 100644 index 345ea38fe..000000000 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { web3Factory } from '@0x/dev-utils'; -import { logUtils } from '@0x/utils'; - -import * as Parallel from 'async-parallel'; -import R = require('ramda'); -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { Web3Source } from '../data_sources/web3'; -import { Block } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseBlock } from '../parsers/web3'; -import { handleError, INFURA_ROOT_URL } from '../utils'; - -// Number of blocks to save at once. -const BATCH_SAVE_SIZE = 1000; -// Maximum number of requests to send at once. -const MAX_CONCURRENCY = 20; -// Maximum number of blocks to query for at once. This is also the maximum -// number of blocks we will hold in memory prior to being saved to the database. -const MAX_BLOCKS_PER_QUERY = 1000; - -let connection: Connection; - -const tablesWithMissingBlocks = [ - 'raw.exchange_fill_events', - 'raw.exchange_cancel_events', - 'raw.exchange_cancel_up_to_events', - 'raw.erc20_approval_events', -]; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const provider = web3Factory.getRpcProvider({ - rpcUrl: INFURA_ROOT_URL, - }); - const web3Source = new Web3Source(provider); - for (const tableName of tablesWithMissingBlocks) { - await getAllMissingBlocksAsync(web3Source, tableName); - } - process.exit(0); -})().catch(handleError); - -interface MissingBlocksResponse { - block_number: string; -} - -async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise<void> { - const blocksRepository = connection.getRepository(Block); - while (true) { - logUtils.log(`Checking for missing blocks in ${tableName}...`); - const blockNumbers = await getMissingBlockNumbersAsync(tableName); - if (blockNumbers.length === 0) { - // There are no more missing blocks. We're done. - break; - } - await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers); - } - const totalBlocks = await blocksRepository.count(); - logUtils.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`); -} - -async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> { - // This query returns up to `MAX_BLOCKS_PER_QUERY` distinct block numbers - // which are present in `tableName` but not in `raw.blocks`. - const response = (await connection.query( - `SELECT DISTINCT(block_number) FROM ${tableName} LEFT JOIN raw.blocks ON ${tableName}.block_number = raw.blocks.number WHERE number IS NULL LIMIT $1;`, - [MAX_BLOCKS_PER_QUERY], - )) as MissingBlocksResponse[]; - const blockNumberStrings = R.pluck('block_number', response); - const blockNumbers = R.map(parseInt, blockNumberStrings); - logUtils.log(`Found ${blockNumbers.length} missing blocks.`); - return blockNumbers; -} - -async function getAndSaveBlocksAsync( - web3Source: Web3Source, - blocksRepository: Repository<Block>, - blockNumbers: number[], -): Promise<void> { - logUtils.log(`Getting block data for ${blockNumbers.length} blocks...`); - Parallel.setConcurrency(MAX_CONCURRENCY); - const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) => - web3Source.getBlockInfoAsync(blockNumber), - ); - logUtils.log(`Parsing ${rawBlocks.length} blocks...`); - const blocks = R.map(parseBlock, rawBlocks); - logUtils.log(`Saving ${blocks.length} blocks...`); - await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); - logUtils.log('Done saving this batch of blocks'); -} diff --git a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts deleted file mode 100644 index c4dcf6c83..000000000 --- a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { logUtils } from '@0x/utils'; -import * as R from 'ramda'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { OASIS_SOURCE, OasisMarket, OasisSource } from '../data_sources/oasis'; -import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseOasisOrders } from '../parsers/oasis_orders'; -import { handleError } from '../utils'; - -// Number of orders to save at once. -const BATCH_SAVE_SIZE = 1000; - -// Number of markets to retrieve orderbooks for at once. -const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; - -// Delay between market orderbook requests. -const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 1000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const oasisSource = new OasisSource(); - logUtils.log('Getting all active Oasis markets'); - const markets = await oasisSource.getActiveMarketsAsync(); - logUtils.log(`Got ${markets.length} markets.`); - for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { - await Promise.all( - marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbookAsync(oasisSource, market)), - ); - await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); - } - process.exit(0); -})().catch(handleError); - -/** - * Retrieve orderbook from Oasis API for a given market. Parse orders and insert - * them into our database. - * @param oasisSource Data source which can query Oasis API. - * @param marketId String identifying market we want data for. eg. 'REPAUG'. - */ -async function getAndSaveMarketOrderbookAsync(oasisSource: OasisSource, market: OasisMarket): Promise<void> { - logUtils.log(`${market.id}: Retrieving orderbook.`); - const orderBook = await oasisSource.getMarketOrderbookAsync(market.id); - const observedTimestamp = Date.now(); - - logUtils.log(`${market.id}: Parsing orders.`); - const orders = parseOasisOrders(orderBook, market, observedTimestamp, OASIS_SOURCE); - - if (orders.length > 0) { - logUtils.log(`${market.id}: Saving ${orders.length} orders.`); - const TokenOrderRepository = connection.getRepository(TokenOrder); - await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); - } else { - logUtils.log(`${market.id}: 0 orders to save.`); - } -} diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts deleted file mode 100644 index caac7b9d4..000000000 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ /dev/null @@ -1,96 +0,0 @@ -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare'; -import { OHLCVExternal } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare'; -import { handleError } from '../utils'; -import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs'; - -const SOURCE_NAME = 'CryptoCompare'; -const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers - -const MAX_REQS_PER_SECOND = parseInt(process.env.CRYPTOCOMPARE_MAX_REQS_PER_SECOND || '15', 10); // tslint:disable-line:custom-no-magic-numbers -const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01'; -const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const repository = connection.getRepository(OHLCVExternal); - const source = new CryptoCompareOHLCVSource(MAX_REQS_PER_SECOND); - - const jobTime = new Date().getTime(); - const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME); - logUtils.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`); - - const fetchAndSavePromises = tradingPairs.map(async pair => { - const pairs = source.generateBackfillIntervals(pair); - return fetchAndSaveAsync(source, repository, jobTime, pairs); - }); - await Promise.all(fetchAndSavePromises); - logUtils.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`); - process.exit(0); -})().catch(handleError); - -async function fetchAndSaveAsync( - source: CryptoCompareOHLCVSource, - repository: Repository<OHLCVExternal>, - jobTime: number, - pairs: TradingPair[], -): Promise<void> { - const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => { - if (a.latestSavedTime < b.latestSavedTime) { - return -1; - } else if (a.latestSavedTime > b.latestSavedTime) { - return 1; - } else { - return 0; - } - }; - pairs.sort(sortAscTimestamp); - - let i = 0; - while (i < pairs.length) { - const pair = pairs[i]; - if (pair.latestSavedTime > TWO_HOURS_AGO) { - break; - } - try { - const records = await source.getHourlyOHLCVAsync(pair); - logUtils.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); - if (records.length > 0) { - const metadata: OHLCVMetadata = { - exchange: source.defaultExchange, - fromSymbol: pair.fromSymbol, - toSymbol: pair.toSymbol, - source: SOURCE_NAME, - observedTimestamp: jobTime, - interval: source.intervalBetweenRecords, - }; - const parsedRecords = parseRecords(records, metadata); - await saveRecordsAsync(repository, parsedRecords); - } - i++; - } catch (err) { - logUtils.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); - break; - } - } - return Promise.resolve(); -} - -async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> { - const metadata = [ - records[0].fromSymbol, - records[0].toSymbol, - new Date(records[0].startTime), - new Date(records[records.length - 1].endTime), - ]; - - logUtils.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`); - await repository.save(records); -} diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts deleted file mode 100644 index 34345f355..000000000 --- a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { logUtils } from '@0x/utils'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { - PARADEX_SOURCE, - ParadexActiveMarketsResponse, - ParadexMarket, - ParadexSource, - ParadexTokenInfoResponse, -} from '../data_sources/paradex'; -import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseParadexOrders } from '../parsers/paradex_orders'; -import { handleError } from '../utils'; - -// Number of orders to save at once. -const BATCH_SAVE_SIZE = 1000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY; - if (apiKey === undefined) { - throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY'); - } - const paradexSource = new ParadexSource(apiKey); - const markets = await paradexSource.getActiveMarketsAsync(); - const tokenInfoResponse = await paradexSource.getTokenInfoAsync(); - const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse); - await Promise.all( - extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbookAsync(paradexSource, market)), - ); - process.exit(0); -})().catch(handleError); - -/** - * Extend the default ParadexMarket objects with token addresses. - * @param markets An array of ParadexMarket objects. - * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses. - */ -function addTokenAddresses( - markets: ParadexActiveMarketsResponse, - tokenInfoResponse: ParadexTokenInfoResponse, -): ParadexMarket[] { - const symbolAddressMapping = new Map<string, string>(); - tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address)); - - markets.forEach((market: ParadexMarket) => { - if (symbolAddressMapping.has(market.baseToken)) { - market.baseTokenAddress = symbolAddressMapping.get(market.baseToken); - } else { - market.quoteTokenAddress = ''; - logUtils.warn(`${market.baseToken}: No address found.`); - } - - if (symbolAddressMapping.has(market.quoteToken)) { - market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken); - } else { - market.quoteTokenAddress = ''; - logUtils.warn(`${market.quoteToken}: No address found.`); - } - }); - return markets; -} - -/** - * Retrieve orderbook from Paradex API for a given market. Parse orders and insert - * them into our database. - * @param paradexSource Data source which can query the Paradex API. - * @param market Object from the Paradex API with information about the market in question. - */ -async function getAndSaveMarketOrderbookAsync(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> { - const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol); - const observedTimestamp = Date.now(); - - logUtils.log(`${market.symbol}: Parsing orders.`); - const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE); - - if (orders.length > 0) { - logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`); - const tokenOrderRepository = connection.getRepository(TokenOrder); - await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); - } else { - logUtils.log(`${market.symbol}: 0 orders to save.`); - } -} diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts deleted file mode 100644 index 8e8720803..000000000 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ /dev/null @@ -1,62 +0,0 @@ -import { HttpClient } from '@0x/connect'; -import { logUtils } from '@0x/utils'; - -import * as R from 'ramda'; -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm'; - -import { createObservedTimestampForOrder, SraOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseSraOrders } from '../parsers/sra_orders'; -import { handleError } from '../utils'; - -const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; -const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - await getOrderbookAsync(); - process.exit(0); -})().catch(handleError); - -async function getOrderbookAsync(): Promise<void> { - logUtils.log('Getting all orders...'); - const connectClient = new HttpClient(RADAR_RELAY_URL); - const rawOrders = await connectClient.getOrdersAsync({ - perPage: ORDERS_PER_PAGE, - }); - logUtils.log(`Got ${rawOrders.records.length} orders.`); - logUtils.log('Parsing orders...'); - // Parse the sra orders, then add source url to each. - const orders = R.pipe( - parseSraOrders, - R.map(setSourceUrl(RADAR_RELAY_URL)), - )(rawOrders); - // Save all the orders and update the observed time stamps in a single - // transaction. - logUtils.log('Saving orders and updating timestamps...'); - const observedTimestamp = Date.now(); - await connection.transaction( - async (manager: EntityManager): Promise<void> => { - for (const order of orders) { - await manager.save(SraOrder, order); - const orderObservation = createObservedTimestampForOrder(order, observedTimestamp); - await manager.save(orderObservation); - } - }, - ); -} - -const sourceUrlProp = R.lensProp('sourceUrl'); - -/** - * Sets the source url for a single order. Returns a new order instead of - * mutating the given one. - */ -const setSourceUrl = R.curry( - (sourceURL: string, order: SraOrder): SraOrder => { - return R.set(sourceUrlProp, sourceURL, order); - }, -); diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts deleted file mode 100644 index 8afb3e052..000000000 --- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts +++ /dev/null @@ -1,48 +0,0 @@ -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens'; -import { TokenMetadata } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/token_metadata'; -import { handleError } from '../utils'; - -const METAMASK_TRUSTED_TOKENS_URL = - 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/d45916c533116510cc8e9e048a8b5fc3732a6b6d/contract-map.json'; - -const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens'; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - await getMetamaskTrustedTokensAsync(); - await getZeroExTrustedTokensAsync(); - process.exit(0); -})().catch(handleError); - -async function getMetamaskTrustedTokensAsync(): Promise<void> { - logUtils.log('Getting latest metamask trusted tokens list ...'); - const trustedTokensRepository = connection.getRepository(TokenMetadata); - const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>( - METAMASK_TRUSTED_TOKENS_URL, - ); - const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); - const trustedTokens = parseMetamaskTrustedTokens(resp); - logUtils.log('Saving metamask trusted tokens list'); - await trustedTokensRepository.save(trustedTokens); - logUtils.log('Done saving metamask trusted tokens.'); -} - -async function getZeroExTrustedTokensAsync(): Promise<void> { - logUtils.log('Getting latest 0x trusted tokens list ...'); - const trustedTokensRepository = connection.getRepository(TokenMetadata); - const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL); - const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); - const trustedTokens = parseZeroExTrustedTokens(resp); - logUtils.log('Saving metamask trusted tokens list'); - await trustedTokensRepository.save(trustedTokens); - logUtils.log('Done saving metamask trusted tokens.'); -} diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts deleted file mode 100644 index 910a0157c..000000000 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ /dev/null @@ -1,34 +0,0 @@ -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { RelayerRegistrySource } from '../data_sources/relayer-registry'; -import { Relayer } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseRelayers } from '../parsers/relayer_registry'; -import { handleError } from '../utils'; - -// NOTE(albrow): We need to manually update this URL for now. Fix this when we -// have the relayer-registry behind semantic versioning. -const RELAYER_REGISTRY_URL = - 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/4701c85677d161ea729a466aebbc1826c6aa2c0b/relayers.json'; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - await getRelayersAsync(); - process.exit(0); -})().catch(handleError); - -async function getRelayersAsync(): Promise<void> { - logUtils.log('Getting latest relayer info...'); - const relayerRepository = connection.getRepository(Relayer); - const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL); - const relayersResp = await relayerSource.getRelayerInfoAsync(); - const relayers = parseRelayers(relayersResp); - logUtils.log('Saving relayer info...'); - await relayerRepository.save(relayers); - logUtils.log('Done saving relayer info.'); -} |