diff options
Diffstat (limited to 'packages/pipeline/src')
8 files changed, 68 insertions, 66 deletions
diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts index 1478d5615..14644bb2e 100644 --- a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts +++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts @@ -1,7 +1,8 @@ -// tslint:disable:no-console import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; +import { logUtils } from '@0x/utils'; + import { BloxySource } from '../data_sources/bloxy'; import { DexTrade } from '../entities'; import * as ormConfig from '../ormconfig'; @@ -27,14 +28,14 @@ async function getAndSaveTradesAsync(): Promise<void> { const bloxySource = new BloxySource(apiKey); const tradesRepository = connection.getRepository(DexTrade); const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository); - console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`); - console.log('Getting latest dex trades...'); + logUtils.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`); + logUtils.log('Getting latest dex trades...'); const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp); - console.log(`Parsing ${rawTrades.length} trades...`); + logUtils.log(`Parsing ${rawTrades.length} trades...`); const trades = parseBloxyTrades(rawTrades); - console.log(`Saving ${trades.length} trades...`); + logUtils.log(`Saving ${trades.length} trades...`); await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) }); - console.log('Done saving trades.'); + logUtils.log('Done saving trades.'); } async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> { diff --git a/packages/pipeline/src/scripts/pull_copper.ts b/packages/pipeline/src/scripts/pull_copper.ts index 69814f209..5e4a6a643 100644 --- a/packages/pipeline/src/scripts/pull_copper.ts +++ b/packages/pipeline/src/scripts/pull_copper.ts @@ -1,7 +1,8 @@ -// tslint:disable:no-console import * as R from 'ramda'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; +import { logUtils } from '@0x/utils'; + import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper'; import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities'; import * as ormConfig from '../ormconfig'; @@ -43,14 +44,14 @@ let connection: Connection; async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> { const repository = connection.getRepository(CopperLead); const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads'); - console.log(`Fetching Copper leads starting from ${startTime}...`); + logUtils.log(`Fetching Copper leads starting from ${startTime}...`); await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository); } async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> { const repository = connection.getRepository(CopperOpportunity); const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities'); - console.log(`Fetching Copper opportunities starting from ${startTime}...`); + logUtils.log(`Fetching Copper opportunities starting from ${startTime}...`); await fetchAndSaveAsync( CopperEndpoint.Opportunities, source, @@ -67,7 +68,7 @@ async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> const searchParams = { minimum_activity_date: Math.floor(startTime / ONE_SECOND), }; - console.log(`Fetching Copper activities starting from ${startTime}...`); + logUtils.log(`Fetching Copper activities starting from ${startTime}...`); await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository); } @@ -97,7 +98,7 @@ async function fetchAndSaveAsync<T extends CopperSearchResponse, E>( const numPages = await source.fetchNumberOfPagesAsync(endpoint); try { for (let i = numPages; i > 0; i--) { - console.log(`Fetching page ${i}/${numPages} of ${endpoint}...`); + logUtils.log(`Fetching page ${i}/${numPages} of ${endpoint}...`); const raw = await source.fetchSearchResultsAsync<T>(endpoint, { ...searchParams, page_number: i, @@ -108,21 +109,21 @@ async function fetchAndSaveAsync<T extends CopperSearchResponse, E>( saved += newRecords.length; } } catch (err) { - console.log(`Error fetching ${endpoint}, stopping: ${err.stack}`); + logUtils.log(`Error fetching ${endpoint}, stopping: ${err.stack}`); } finally { - console.log(`Saved ${saved} items from ${endpoint}, done.`); + logUtils.log(`Saved ${saved} items from ${endpoint}, done.`); } } async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> { - console.log(`Fetching Copper activity types...`); + logUtils.log(`Fetching Copper activity types...`); const activityTypes = await source.fetchActivityTypesAsync(); const repository = connection.getRepository(CopperActivityType); await repository.save(parseActivityTypes(activityTypes)); } async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> { - console.log(`Fetching Copper custom fields...`); + logUtils.log(`Fetching Copper custom fields...`); const customFields = await source.fetchCustomFieldsAsync(); const repository = connection.getRepository(CopperCustomField); await repository.save(parseCustomFields(customFields)); diff --git a/packages/pipeline/src/scripts/pull_exchange_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts index e98fc6629..f8ce4038d 100644 --- a/packages/pipeline/src/scripts/pull_exchange_events.ts +++ b/packages/pipeline/src/scripts/pull_exchange_events.ts @@ -1,6 +1,6 @@ -// tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; import { Web3ProviderEngine } from '@0x/subproviders'; +import { logUtils } from '@0x/utils'; import { Web3Wrapper } from '@0x/web3-wrapper'; import R = require('ramda'); import 'reflect-metadata'; @@ -32,38 +32,38 @@ let connection: Connection; })().catch(handleError); async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { - console.log('Checking existing fill events...'); + logUtils.log('Checking existing fill events...'); const repository = connection.getRepository(ExchangeFillEvent); const startBlock = await getStartBlockAsync(repository); - console.log(`Getting fill events starting at ${startBlock}...`); + logUtils.log(`Getting fill events starting at ${startBlock}...`); const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock); - console.log('Parsing fill events...'); + logUtils.log('Parsing fill events...'); const events = parseExchangeFillEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total fill events.`); + logUtils.log(`Retrieved and parsed ${events.length} total fill events.`); await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { - console.log('Checking existing cancel events...'); + logUtils.log('Checking existing cancel events...'); const repository = connection.getRepository(ExchangeCancelEvent); const startBlock = await getStartBlockAsync(repository); - console.log(`Getting cancel events starting at ${startBlock}...`); + logUtils.log(`Getting cancel events starting at ${startBlock}...`); const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock); - console.log('Parsing cancel events...'); + logUtils.log('Parsing cancel events...'); const events = parseExchangeCancelEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total cancel events.`); + logUtils.log(`Retrieved and parsed ${events.length} total cancel events.`); await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { - console.log('Checking existing CancelUpTo events...'); + logUtils.log('Checking existing CancelUpTo events...'); const repository = connection.getRepository(ExchangeCancelUpToEvent); const startBlock = await getStartBlockAsync(repository); - console.log(`Getting CancelUpTo events starting at ${startBlock}...`); + logUtils.log(`Getting CancelUpTo events starting at ${startBlock}...`); const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock); - console.log('Parsing CancelUpTo events...'); + logUtils.log('Parsing CancelUpTo events...'); const events = parseExchangeCancelUpToEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); + logUtils.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } @@ -72,7 +72,7 @@ const tableNameRegex = /^[a-zA-Z_]*$/; async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> { const fillEventCount = await repository.count(); if (fillEventCount === 0) { - console.log(`No existing ${repository.metadata.name}s found.`); + logUtils.log(`No existing ${repository.metadata.name}s found.`); return EXCHANGE_START_BLOCK; } const tableName = repository.metadata.tableName; @@ -91,7 +91,7 @@ async function saveEventsAsync<T extends ExchangeEvent>( repository: Repository<T>, events: T[], ): Promise<void> { - console.log(`Saving ${repository.metadata.name}s...`); + logUtils.log(`Saving ${repository.metadata.name}s...`); if (isInitialPull) { // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE // each. @@ -104,7 +104,7 @@ async function saveEventsAsync<T extends ExchangeEvent>( await saveIndividuallyWithFallbackAsync(repository, events); } const totalEvents = await repository.count(); - console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); + logUtils.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); } async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>( diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index ced9d99eb..345ea38fe 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -1,5 +1,6 @@ -// tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; +import { logUtils } from '@0x/utils'; + import * as Parallel from 'async-parallel'; import R = require('ramda'); import 'reflect-metadata'; @@ -47,7 +48,7 @@ interface MissingBlocksResponse { async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise<void> { const blocksRepository = connection.getRepository(Block); while (true) { - console.log(`Checking for missing blocks in ${tableName}...`); + logUtils.log(`Checking for missing blocks in ${tableName}...`); const blockNumbers = await getMissingBlockNumbersAsync(tableName); if (blockNumbers.length === 0) { // There are no more missing blocks. We're done. @@ -56,7 +57,7 @@ async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: strin await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers); } const totalBlocks = await blocksRepository.count(); - console.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`); + logUtils.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`); } async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> { @@ -68,7 +69,7 @@ async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> )) as MissingBlocksResponse[]; const blockNumberStrings = R.pluck('block_number', response); const blockNumbers = R.map(parseInt, blockNumberStrings); - console.log(`Found ${blockNumbers.length} missing blocks.`); + logUtils.log(`Found ${blockNumbers.length} missing blocks.`); return blockNumbers; } @@ -77,14 +78,14 @@ async function getAndSaveBlocksAsync( blocksRepository: Repository<Block>, blockNumbers: number[], ): Promise<void> { - console.log(`Getting block data for ${blockNumbers.length} blocks...`); + logUtils.log(`Getting block data for ${blockNumbers.length} blocks...`); Parallel.setConcurrency(MAX_CONCURRENCY); const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) => web3Source.getBlockInfoAsync(blockNumber), ); - console.log(`Parsing ${rawBlocks.length} blocks...`); + logUtils.log(`Parsing ${rawBlocks.length} blocks...`); const blocks = R.map(parseBlock, rawBlocks); - console.log(`Saving ${blocks.length} blocks...`); + logUtils.log(`Saving ${blocks.length} blocks...`); await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); - console.log('Done saving this batch of blocks'); + logUtils.log('Done saving this batch of blocks'); } diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts index d44eb5cc6..caac7b9d4 100644 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -1,6 +1,7 @@ -// tslint:disable:no-console import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; +import { logUtils } from '@0x/utils'; + import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare'; import { OHLCVExternal } from '../entities'; import * as ormConfig from '../ormconfig'; @@ -24,14 +25,14 @@ let connection: Connection; const jobTime = new Date().getTime(); const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME); - console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`); + logUtils.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`); const fetchAndSavePromises = tradingPairs.map(async pair => { const pairs = source.generateBackfillIntervals(pair); return fetchAndSaveAsync(source, repository, jobTime, pairs); }); await Promise.all(fetchAndSavePromises); - console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`); + logUtils.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`); process.exit(0); })().catch(handleError); @@ -60,7 +61,7 @@ async function fetchAndSaveAsync( } try { const records = await source.getHourlyOHLCVAsync(pair); - console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); + logUtils.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); if (records.length > 0) { const metadata: OHLCVMetadata = { exchange: source.defaultExchange, @@ -75,7 +76,7 @@ async function fetchAndSaveAsync( } i++; } catch (err) { - console.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); + logUtils.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); break; } } @@ -90,6 +91,6 @@ async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: new Date(records[records.length - 1].endTime), ]; - console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`); + logUtils.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`); await repository.save(records); } diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts index 03fc764f2..8e8720803 100644 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -1,5 +1,6 @@ -// tslint:disable:no-console import { HttpClient } from '@0x/connect'; +import { logUtils } from '@0x/utils'; + import * as R from 'ramda'; import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm'; @@ -21,13 +22,13 @@ let connection: Connection; })().catch(handleError); async function getOrderbookAsync(): Promise<void> { - console.log('Getting all orders...'); + logUtils.log('Getting all orders...'); const connectClient = new HttpClient(RADAR_RELAY_URL); const rawOrders = await connectClient.getOrdersAsync({ perPage: ORDERS_PER_PAGE, }); - console.log(`Got ${rawOrders.records.length} orders.`); - console.log('Parsing orders...'); + logUtils.log(`Got ${rawOrders.records.length} orders.`); + logUtils.log('Parsing orders...'); // Parse the sra orders, then add source url to each. const orders = R.pipe( parseSraOrders, @@ -35,7 +36,7 @@ async function getOrderbookAsync(): Promise<void> { )(rawOrders); // Save all the orders and update the observed time stamps in a single // transaction. - console.log('Saving orders and updating timestamps...'); + logUtils.log('Saving orders and updating timestamps...'); const observedTimestamp = Date.now(); await connection.transaction( async (manager: EntityManager): Promise<void> => { diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts index 5906deee6..8afb3e052 100644 --- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -1,6 +1,8 @@ import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection } from 'typeorm'; +import { logUtils } from '@0x/utils'; + import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens'; import { TokenMetadata } from '../entities'; import * as ormConfig from '../ormconfig'; @@ -22,31 +24,25 @@ let connection: Connection; })().catch(handleError); async function getMetamaskTrustedTokensAsync(): Promise<void> { - // tslint:disable-next-line:no-console - console.log('Getting latest metamask trusted tokens list ...'); + logUtils.log('Getting latest metamask trusted tokens list ...'); const trustedTokensRepository = connection.getRepository(TokenMetadata); const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>( METAMASK_TRUSTED_TOKENS_URL, ); const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); const trustedTokens = parseMetamaskTrustedTokens(resp); - // tslint:disable-next-line:no-console - console.log('Saving metamask trusted tokens list'); + logUtils.log('Saving metamask trusted tokens list'); await trustedTokensRepository.save(trustedTokens); - // tslint:disable-next-line:no-console - console.log('Done saving metamask trusted tokens.'); + logUtils.log('Done saving metamask trusted tokens.'); } async function getZeroExTrustedTokensAsync(): Promise<void> { - // tslint:disable-next-line:no-console - console.log('Getting latest 0x trusted tokens list ...'); + logUtils.log('Getting latest 0x trusted tokens list ...'); const trustedTokensRepository = connection.getRepository(TokenMetadata); const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL); const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); const trustedTokens = parseZeroExTrustedTokens(resp); - // tslint:disable-next-line:no-console - console.log('Saving metamask trusted tokens list'); + logUtils.log('Saving metamask trusted tokens list'); await trustedTokensRepository.save(trustedTokens); - // tslint:disable-next-line:no-console - console.log('Done saving metamask trusted tokens.'); + logUtils.log('Done saving metamask trusted tokens.'); } diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index 41d29b385..910a0157c 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -1,7 +1,8 @@ -// tslint:disable:no-console import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection } from 'typeorm'; +import { logUtils } from '@0x/utils'; + import { RelayerRegistrySource } from '../data_sources/relayer-registry'; import { Relayer } from '../entities'; import * as ormConfig from '../ormconfig'; @@ -22,12 +23,12 @@ let connection: Connection; })().catch(handleError); async function getRelayersAsync(): Promise<void> { - console.log('Getting latest relayer info...'); + logUtils.log('Getting latest relayer info...'); const relayerRepository = connection.getRepository(Relayer); const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL); const relayersResp = await relayerSource.getRelayerInfoAsync(); const relayers = parseRelayers(relayersResp); - console.log('Saving relayer info...'); + logUtils.log('Saving relayer info...'); await relayerRepository.save(relayers); - console.log('Done saving relayer info.'); + logUtils.log('Done saving relayer info.'); } |