aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r--packages/pipeline/src/scripts/pull_competing_dex_trades.ts13
-rw-r--r--packages/pipeline/src/scripts/pull_copper.ts19
-rw-r--r--packages/pipeline/src/scripts/pull_exchange_events.ts32
-rw-r--r--packages/pipeline/src/scripts/pull_missing_blocks.ts17
-rw-r--r--packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts13
-rw-r--r--packages/pipeline/src/scripts/pull_radar_relay_orders.ts11
-rw-r--r--packages/pipeline/src/scripts/pull_trusted_tokens.ts20
-rw-r--r--packages/pipeline/src/scripts/update_relayer_info.ts9
8 files changed, 68 insertions, 66 deletions
diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
index 1478d5615..14644bb2e 100644
--- a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
+++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
@@ -1,7 +1,8 @@
-// tslint:disable:no-console
import 'reflect-metadata';
import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+import { logUtils } from '@0x/utils';
+
import { BloxySource } from '../data_sources/bloxy';
import { DexTrade } from '../entities';
import * as ormConfig from '../ormconfig';
@@ -27,14 +28,14 @@ async function getAndSaveTradesAsync(): Promise<void> {
const bloxySource = new BloxySource(apiKey);
const tradesRepository = connection.getRepository(DexTrade);
const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository);
- console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`);
- console.log('Getting latest dex trades...');
+ logUtils.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`);
+ logUtils.log('Getting latest dex trades...');
const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp);
- console.log(`Parsing ${rawTrades.length} trades...`);
+ logUtils.log(`Parsing ${rawTrades.length} trades...`);
const trades = parseBloxyTrades(rawTrades);
- console.log(`Saving ${trades.length} trades...`);
+ logUtils.log(`Saving ${trades.length} trades...`);
await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) });
- console.log('Done saving trades.');
+ logUtils.log('Done saving trades.');
}
async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> {
diff --git a/packages/pipeline/src/scripts/pull_copper.ts b/packages/pipeline/src/scripts/pull_copper.ts
index 69814f209..5e4a6a643 100644
--- a/packages/pipeline/src/scripts/pull_copper.ts
+++ b/packages/pipeline/src/scripts/pull_copper.ts
@@ -1,7 +1,8 @@
-// tslint:disable:no-console
import * as R from 'ramda';
import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+import { logUtils } from '@0x/utils';
+
import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper';
import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities';
import * as ormConfig from '../ormconfig';
@@ -43,14 +44,14 @@ let connection: Connection;
async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> {
const repository = connection.getRepository(CopperLead);
const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads');
- console.log(`Fetching Copper leads starting from ${startTime}...`);
+ logUtils.log(`Fetching Copper leads starting from ${startTime}...`);
await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository);
}
async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> {
const repository = connection.getRepository(CopperOpportunity);
const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities');
- console.log(`Fetching Copper opportunities starting from ${startTime}...`);
+ logUtils.log(`Fetching Copper opportunities starting from ${startTime}...`);
await fetchAndSaveAsync(
CopperEndpoint.Opportunities,
source,
@@ -67,7 +68,7 @@ async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void>
const searchParams = {
minimum_activity_date: Math.floor(startTime / ONE_SECOND),
};
- console.log(`Fetching Copper activities starting from ${startTime}...`);
+ logUtils.log(`Fetching Copper activities starting from ${startTime}...`);
await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository);
}
@@ -97,7 +98,7 @@ async function fetchAndSaveAsync<T extends CopperSearchResponse, E>(
const numPages = await source.fetchNumberOfPagesAsync(endpoint);
try {
for (let i = numPages; i > 0; i--) {
- console.log(`Fetching page ${i}/${numPages} of ${endpoint}...`);
+ logUtils.log(`Fetching page ${i}/${numPages} of ${endpoint}...`);
const raw = await source.fetchSearchResultsAsync<T>(endpoint, {
...searchParams,
page_number: i,
@@ -108,21 +109,21 @@ async function fetchAndSaveAsync<T extends CopperSearchResponse, E>(
saved += newRecords.length;
}
} catch (err) {
- console.log(`Error fetching ${endpoint}, stopping: ${err.stack}`);
+ logUtils.log(`Error fetching ${endpoint}, stopping: ${err.stack}`);
} finally {
- console.log(`Saved ${saved} items from ${endpoint}, done.`);
+ logUtils.log(`Saved ${saved} items from ${endpoint}, done.`);
}
}
async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> {
- console.log(`Fetching Copper activity types...`);
+ logUtils.log(`Fetching Copper activity types...`);
const activityTypes = await source.fetchActivityTypesAsync();
const repository = connection.getRepository(CopperActivityType);
await repository.save(parseActivityTypes(activityTypes));
}
async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> {
- console.log(`Fetching Copper custom fields...`);
+ logUtils.log(`Fetching Copper custom fields...`);
const customFields = await source.fetchCustomFieldsAsync();
const repository = connection.getRepository(CopperCustomField);
await repository.save(parseCustomFields(customFields));
diff --git a/packages/pipeline/src/scripts/pull_exchange_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts
index e98fc6629..f8ce4038d 100644
--- a/packages/pipeline/src/scripts/pull_exchange_events.ts
+++ b/packages/pipeline/src/scripts/pull_exchange_events.ts
@@ -1,6 +1,6 @@
-// tslint:disable:no-console
import { web3Factory } from '@0x/dev-utils';
import { Web3ProviderEngine } from '@0x/subproviders';
+import { logUtils } from '@0x/utils';
import { Web3Wrapper } from '@0x/web3-wrapper';
import R = require('ramda');
import 'reflect-metadata';
@@ -32,38 +32,38 @@ let connection: Connection;
})().catch(handleError);
async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> {
- console.log('Checking existing fill events...');
+ logUtils.log('Checking existing fill events...');
const repository = connection.getRepository(ExchangeFillEvent);
const startBlock = await getStartBlockAsync(repository);
- console.log(`Getting fill events starting at ${startBlock}...`);
+ logUtils.log(`Getting fill events starting at ${startBlock}...`);
const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock);
- console.log('Parsing fill events...');
+ logUtils.log('Parsing fill events...');
const events = parseExchangeFillEvents(eventLogs);
- console.log(`Retrieved and parsed ${events.length} total fill events.`);
+ logUtils.log(`Retrieved and parsed ${events.length} total fill events.`);
await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
}
async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> {
- console.log('Checking existing cancel events...');
+ logUtils.log('Checking existing cancel events...');
const repository = connection.getRepository(ExchangeCancelEvent);
const startBlock = await getStartBlockAsync(repository);
- console.log(`Getting cancel events starting at ${startBlock}...`);
+ logUtils.log(`Getting cancel events starting at ${startBlock}...`);
const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock);
- console.log('Parsing cancel events...');
+ logUtils.log('Parsing cancel events...');
const events = parseExchangeCancelEvents(eventLogs);
- console.log(`Retrieved and parsed ${events.length} total cancel events.`);
+ logUtils.log(`Retrieved and parsed ${events.length} total cancel events.`);
await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
}
async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> {
- console.log('Checking existing CancelUpTo events...');
+ logUtils.log('Checking existing CancelUpTo events...');
const repository = connection.getRepository(ExchangeCancelUpToEvent);
const startBlock = await getStartBlockAsync(repository);
- console.log(`Getting CancelUpTo events starting at ${startBlock}...`);
+ logUtils.log(`Getting CancelUpTo events starting at ${startBlock}...`);
const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock);
- console.log('Parsing CancelUpTo events...');
+ logUtils.log('Parsing CancelUpTo events...');
const events = parseExchangeCancelUpToEvents(eventLogs);
- console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`);
+ logUtils.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`);
await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
}
@@ -72,7 +72,7 @@ const tableNameRegex = /^[a-zA-Z_]*$/;
async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> {
const fillEventCount = await repository.count();
if (fillEventCount === 0) {
- console.log(`No existing ${repository.metadata.name}s found.`);
+ logUtils.log(`No existing ${repository.metadata.name}s found.`);
return EXCHANGE_START_BLOCK;
}
const tableName = repository.metadata.tableName;
@@ -91,7 +91,7 @@ async function saveEventsAsync<T extends ExchangeEvent>(
repository: Repository<T>,
events: T[],
): Promise<void> {
- console.log(`Saving ${repository.metadata.name}s...`);
+ logUtils.log(`Saving ${repository.metadata.name}s...`);
if (isInitialPull) {
// Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE
// each.
@@ -104,7 +104,7 @@ async function saveEventsAsync<T extends ExchangeEvent>(
await saveIndividuallyWithFallbackAsync(repository, events);
}
const totalEvents = await repository.count();
- console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`);
+ logUtils.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`);
}
async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>(
diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts
index ced9d99eb..345ea38fe 100644
--- a/packages/pipeline/src/scripts/pull_missing_blocks.ts
+++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts
@@ -1,5 +1,6 @@
-// tslint:disable:no-console
import { web3Factory } from '@0x/dev-utils';
+import { logUtils } from '@0x/utils';
+
import * as Parallel from 'async-parallel';
import R = require('ramda');
import 'reflect-metadata';
@@ -47,7 +48,7 @@ interface MissingBlocksResponse {
async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise<void> {
const blocksRepository = connection.getRepository(Block);
while (true) {
- console.log(`Checking for missing blocks in ${tableName}...`);
+ logUtils.log(`Checking for missing blocks in ${tableName}...`);
const blockNumbers = await getMissingBlockNumbersAsync(tableName);
if (blockNumbers.length === 0) {
// There are no more missing blocks. We're done.
@@ -56,7 +57,7 @@ async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: strin
await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers);
}
const totalBlocks = await blocksRepository.count();
- console.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`);
+ logUtils.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`);
}
async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> {
@@ -68,7 +69,7 @@ async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]>
)) as MissingBlocksResponse[];
const blockNumberStrings = R.pluck('block_number', response);
const blockNumbers = R.map(parseInt, blockNumberStrings);
- console.log(`Found ${blockNumbers.length} missing blocks.`);
+ logUtils.log(`Found ${blockNumbers.length} missing blocks.`);
return blockNumbers;
}
@@ -77,14 +78,14 @@ async function getAndSaveBlocksAsync(
blocksRepository: Repository<Block>,
blockNumbers: number[],
): Promise<void> {
- console.log(`Getting block data for ${blockNumbers.length} blocks...`);
+ logUtils.log(`Getting block data for ${blockNumbers.length} blocks...`);
Parallel.setConcurrency(MAX_CONCURRENCY);
const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) =>
web3Source.getBlockInfoAsync(blockNumber),
);
- console.log(`Parsing ${rawBlocks.length} blocks...`);
+ logUtils.log(`Parsing ${rawBlocks.length} blocks...`);
const blocks = R.map(parseBlock, rawBlocks);
- console.log(`Saving ${blocks.length} blocks...`);
+ logUtils.log(`Saving ${blocks.length} blocks...`);
await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) });
- console.log('Done saving this batch of blocks');
+ logUtils.log('Done saving this batch of blocks');
}
diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
index d44eb5cc6..caac7b9d4 100644
--- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -1,6 +1,7 @@
-// tslint:disable:no-console
import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+import { logUtils } from '@0x/utils';
+
import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare';
import { OHLCVExternal } from '../entities';
import * as ormConfig from '../ormconfig';
@@ -24,14 +25,14 @@ let connection: Connection;
const jobTime = new Date().getTime();
const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
- console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`);
+ logUtils.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`);
const fetchAndSavePromises = tradingPairs.map(async pair => {
const pairs = source.generateBackfillIntervals(pair);
return fetchAndSaveAsync(source, repository, jobTime, pairs);
});
await Promise.all(fetchAndSavePromises);
- console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`);
+ logUtils.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`);
process.exit(0);
})().catch(handleError);
@@ -60,7 +61,7 @@ async function fetchAndSaveAsync(
}
try {
const records = await source.getHourlyOHLCVAsync(pair);
- console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`);
+ logUtils.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`);
if (records.length > 0) {
const metadata: OHLCVMetadata = {
exchange: source.defaultExchange,
@@ -75,7 +76,7 @@ async function fetchAndSaveAsync(
}
i++;
} catch (err) {
- console.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
+ logUtils.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
break;
}
}
@@ -90,6 +91,6 @@ async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records:
new Date(records[records.length - 1].endTime),
];
- console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`);
+ logUtils.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`);
await repository.save(records);
}
diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
index 03fc764f2..8e8720803 100644
--- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
+++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
@@ -1,5 +1,6 @@
-// tslint:disable:no-console
import { HttpClient } from '@0x/connect';
+import { logUtils } from '@0x/utils';
+
import * as R from 'ramda';
import 'reflect-metadata';
import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm';
@@ -21,13 +22,13 @@ let connection: Connection;
})().catch(handleError);
async function getOrderbookAsync(): Promise<void> {
- console.log('Getting all orders...');
+ logUtils.log('Getting all orders...');
const connectClient = new HttpClient(RADAR_RELAY_URL);
const rawOrders = await connectClient.getOrdersAsync({
perPage: ORDERS_PER_PAGE,
});
- console.log(`Got ${rawOrders.records.length} orders.`);
- console.log('Parsing orders...');
+ logUtils.log(`Got ${rawOrders.records.length} orders.`);
+ logUtils.log('Parsing orders...');
// Parse the sra orders, then add source url to each.
const orders = R.pipe(
parseSraOrders,
@@ -35,7 +36,7 @@ async function getOrderbookAsync(): Promise<void> {
)(rawOrders);
// Save all the orders and update the observed time stamps in a single
// transaction.
- console.log('Saving orders and updating timestamps...');
+ logUtils.log('Saving orders and updating timestamps...');
const observedTimestamp = Date.now();
await connection.transaction(
async (manager: EntityManager): Promise<void> => {
diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
index 5906deee6..8afb3e052 100644
--- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -1,6 +1,8 @@
import 'reflect-metadata';
import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+import { logUtils } from '@0x/utils';
+
import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens';
import { TokenMetadata } from '../entities';
import * as ormConfig from '../ormconfig';
@@ -22,31 +24,25 @@ let connection: Connection;
})().catch(handleError);
async function getMetamaskTrustedTokensAsync(): Promise<void> {
- // tslint:disable-next-line:no-console
- console.log('Getting latest metamask trusted tokens list ...');
+ logUtils.log('Getting latest metamask trusted tokens list ...');
const trustedTokensRepository = connection.getRepository(TokenMetadata);
const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(
METAMASK_TRUSTED_TOKENS_URL,
);
const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
const trustedTokens = parseMetamaskTrustedTokens(resp);
- // tslint:disable-next-line:no-console
- console.log('Saving metamask trusted tokens list');
+ logUtils.log('Saving metamask trusted tokens list');
await trustedTokensRepository.save(trustedTokens);
- // tslint:disable-next-line:no-console
- console.log('Done saving metamask trusted tokens.');
+ logUtils.log('Done saving metamask trusted tokens.');
}
async function getZeroExTrustedTokensAsync(): Promise<void> {
- // tslint:disable-next-line:no-console
- console.log('Getting latest 0x trusted tokens list ...');
+ logUtils.log('Getting latest 0x trusted tokens list ...');
const trustedTokensRepository = connection.getRepository(TokenMetadata);
const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL);
const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
const trustedTokens = parseZeroExTrustedTokens(resp);
- // tslint:disable-next-line:no-console
- console.log('Saving metamask trusted tokens list');
+ logUtils.log('Saving metamask trusted tokens list');
await trustedTokensRepository.save(trustedTokens);
- // tslint:disable-next-line:no-console
- console.log('Done saving metamask trusted tokens.');
+ logUtils.log('Done saving metamask trusted tokens.');
}
diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts
index 41d29b385..910a0157c 100644
--- a/packages/pipeline/src/scripts/update_relayer_info.ts
+++ b/packages/pipeline/src/scripts/update_relayer_info.ts
@@ -1,7 +1,8 @@
-// tslint:disable:no-console
import 'reflect-metadata';
import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+import { logUtils } from '@0x/utils';
+
import { RelayerRegistrySource } from '../data_sources/relayer-registry';
import { Relayer } from '../entities';
import * as ormConfig from '../ormconfig';
@@ -22,12 +23,12 @@ let connection: Connection;
})().catch(handleError);
async function getRelayersAsync(): Promise<void> {
- console.log('Getting latest relayer info...');
+ logUtils.log('Getting latest relayer info...');
const relayerRepository = connection.getRepository(Relayer);
const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL);
const relayersResp = await relayerSource.getRelayerInfoAsync();
const relayers = parseRelayers(relayersResp);
- console.log('Saving relayer info...');
+ logUtils.log('Saving relayer info...');
await relayerRepository.save(relayers);
- console.log('Done saving relayer info.');
+ logUtils.log('Done saving relayer info.');
}