From 57e7119c0d4f1ab7dd1d4c0118e72dc1706e2151 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 17 Sep 2018 11:27:38 -0700 Subject: Rebase pipeline branch off development --- packages/pipeline/src/scripts/create_tables.ts | 258 ++++++++++ packages/pipeline/src/scripts/join_tables.ts | 234 +++++++++ packages/pipeline/src/scripts/query_data.ts | 87 ++++ packages/pipeline/src/scripts/scrape_data.ts | 649 +++++++++++++++++++++++++ 4 files changed, 1228 insertions(+) create mode 100644 packages/pipeline/src/scripts/create_tables.ts create mode 100644 packages/pipeline/src/scripts/join_tables.ts create mode 100644 packages/pipeline/src/scripts/query_data.ts create mode 100644 packages/pipeline/src/scripts/scrape_data.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/create_tables.ts b/packages/pipeline/src/scripts/create_tables.ts new file mode 100644 index 000000000..fd0d2b78b --- /dev/null +++ b/packages/pipeline/src/scripts/create_tables.ts @@ -0,0 +1,258 @@ +import * as commandLineArgs from 'command-line-args'; + +import { postgresClient } from '../postgres'; +import { formatters } from '../utils'; +const tableQueries: any = { + events_full: `CREATE TABLE IF NOT EXISTS events_full ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + gas_used NUMERIC(78), + gas_price NUMERIC(78), + fee_recipient CHAR(42), + method_id CHAR(10), + salt VARCHAR, + block_number BIGINT, + log_index BIGINT, + taker_symbol VARCHAR, + taker_name VARCHAR, + taker_decimals BIGINT, + taker_usd_price NUMERIC(78), + taker_txn_usd_value NUMERIC(78), + maker_symbol VARCHAR, + maker_name VARCHAR, + maker_decimals BIGINT, + maker_usd_price NUMERIC(78), + maker_txn_usd_value NUMERIC(78), + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events: `CREATE TABLE IF NOT EXISTS events ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + gas_used NUMERIC(78), + gas_price NUMERIC(78), + fee_recipient CHAR(42), + method_id CHAR(10), + salt VARCHAR, + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events_staging: `CREATE TABLE IF NOT EXISTS events_staging ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + fee_recipient CHAR(42), + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events_raw: `CREATE TABLE IF NOT EXISTS events_raw ( + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + fee_recipient CHAR(42), + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, 
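/* Annotation on column sizing (not part of the original patch): NUMERIC(78)
   is wide enough for any uint256 value, since 2^256 - 1 has 78 decimal
   digits; CHAR(42) holds a 0x-prefixed 20-byte address and CHAR(66) a
   0x-prefixed 32-byte hash. One apparent schema wrinkle below: the prices
   table declares address UNIQUE while also using the composite primary key
   (address, timestamp), so the standalone UNIQUE constraint would limit each
   token address to a single price row. */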
log_index) + )`, + blocks: `CREATE TABLE IF NOT EXISTS blocks ( + timestamp TIMESTAMP WITH TIME ZONE, + block_hash CHAR(66) UNIQUE, + block_number BIGINT, + PRIMARY KEY (block_hash) + )`, + transactions: `CREATE TABLE IF NOT EXISTS transactions ( + txn_hash CHAR(66) UNIQUE, + block_hash CHAR(66), + block_number BIGINT, + gas_used NUMERIC(78), + gas_price NUMERIC(78), + method_id CHAR(10), + salt VARCHAR, + PRIMARY KEY (txn_hash) + )`, + tokens: `CREATE TABLE IF NOT EXISTS tokens ( + address CHAR(42) UNIQUE, + name VARCHAR, + symbol VARCHAR, + decimals INT, + PRIMARY KEY (address) + )`, + prices: `CREATE TABLE IF NOT EXISTS prices ( + address CHAR(42) UNIQUE, + timestamp TIMESTAMP WITH TIME ZONE, + price NUMERIC(78, 18), + PRIMARY KEY (address, timestamp) + )`, + relayers: `CREATE TABLE IF NOT EXISTS relayers ( + name VARCHAR UNIQUE, + url VARCHAR DEFAULT '', + sra_http_endpoint VARCHAR DEFAULT '', + sra_ws_endpoint VARCHAR DEFAULT '', + fee_recipient_addresses CHAR(42)[] DEFAULT '{}', + taker_addresses CHAR(42)[] DEFAULT '{}', + PRIMARY KEY(name) + )`, + historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices ( + token VARCHAR, + base VARCHAR, + timestamp TIMESTAMP WITH TIME ZONE, + close NUMERIC(78, 18), + high NUMERIC(78, 18), + low NUMERIC(78, 18), + open NUMERIC(78, 18), + volume_from NUMERIC(78, 18), + volume_to NUMERIC(78, 18), + PRIMARY KEY (token, base, timestamp) + )`, + orders: `CREATE TABLE IF NOT EXISTS orders ( + relayer_id VARCHAR, + exchange_contract_address CHAR(42), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + fee_recipient CHAR(42), + expiration_unix_timestamp_sec NUMERIC(78), + salt VARCHAR, + order_hash CHAR(66), + PRIMARY KEY (relayer_id, order_hash) + )`, +}; +function _safeQuery(query: string): any { + return new Promise((resolve, reject) => { + postgresClient + .query(query) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); +} +export const tableScripts = { + createTable(query: string): any { + return _safeQuery(query); + }, + createAllTables(): any { + for (const tableName of Object.keys(tableQueries)) { + _safeQuery(tableQueries[tableName]); + } + }, +}; +export const insertDataScripts = { + insertSingleRow(table: string, object: any): any { + return new Promise((resolve, reject) => { + const columns = Object.keys(object); + const safeArray: any = []; + for (const key of columns) { + if (key in object) { + if (key === 'timestamp') { + safeArray.push('to_timestamp(' + object[key] + ')'); + } else if (typeof object[key] === 'string' || object[key] instanceof String) { + safeArray.push(formatters.escapeSQLParam(object[key])); + } else { + safeArray.push(object[key]); + } + } else { + safeArray.push('default'); + } + } + const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`; + console.log(queryString); + postgresClient + .query(queryString) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + insertMultipleRows(table: string, rows: any[], columns: any[]): any { + return new Promise((resolve, reject) => { + if (rows.length > 0) { + const rowsSplit = rows.map((value, index) => { + const safeArray: any = []; + for (const key of columns) { + if (key in value) { + if (key === 'timestamp') { + safeArray.push('to_timestamp(' + value[key] + ')'); + } else if (typeof value[key] === 'string' ||
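/* Review sketch (an assumption, not part of the original patch): both insert
   helpers build SQL by string concatenation and rely on
   formatters.escapeSQLParam for safety. node-postgres also supports
   parameterized queries, which avoid hand-rolled escaping for values; table
   and column names still cannot be parameterized and must come from trusted
   code. A minimal single-row variant, assuming postgresClient is the pg
   client imported above:

     const placeholders = columns.map((_, i) => '$' + String(i + 1)).join(', ');
     const text =
         'INSERT INTO ' + table + ' (' + columns.join(', ') + ') VALUES (' +
         placeholders + ') ON CONFLICT DO NOTHING';
     postgresClient.query(text, columns.map(key => object[key]));

   The to_timestamp(...) wrapping used above would then move into the SQL
   text, e.g. to_timestamp($1), rather than into a bound value. */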
value[key] instanceof String) { + safeArray.push(formatters.escapeSQLParam(value[key])); + } else if (value[key] instanceof Array) { + const escapedArray = value[key].map((subValue: string, subIndex: number) => { + return formatters.escapeSQLParam(subValue); + }); + safeArray.push('ARRAY[' + escapedArray.toString() + ']'); + } else { + safeArray.push(value[key]); + } + } else { + safeArray.push('default'); + } + } + return '(' + safeArray + ')'; + }); + const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`; + postgresClient + .query(queryString) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + // console.log(err); + reject(err); + }); + } else { + resolve({}); + } + }); + }, +}; diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts new file mode 100644 index 000000000..e7c05b39a --- /dev/null +++ b/packages/pipeline/src/scripts/join_tables.ts @@ -0,0 +1,234 @@ +import * as commandLineArgs from 'command-line-args'; + +import { postgresClient } from '../postgres'; +import { formatters } from '../utils'; +const optionDefinitions = [ + { name: 'name', alias: 'n', type: String }, + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, +]; +const cli = commandLineArgs(optionDefinitions); +const dataInsertionQueries: any = { + events_staging: `INSERT INTO events_staging ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index + ) + (SELECT + b.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index + FROM + events_raw a + JOIN + blocks b + ON + a.block_number = b.block_number + AND + b.block_number >= $1 + AND + b.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events: `INSERT INTO events ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt + ) + (SELECT + a.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index, + b.gas_used, + b.gas_price, + b.method_id, + b.salt + FROM + events_staging a + JOIN + transactions b + ON + a.txn_hash = b.txn_hash + AND + a.block_number >= $1 + AND + a.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events_full: ` + INSERT INTO events_full ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt, + taker_symbol, + taker_name, + taker_decimals, + taker_usd_price, + taker_txn_usd_value, + maker_symbol, + maker_name, + maker_decimals, + maker_usd_price, + maker_txn_usd_value + ) + (SELECT + events.timestamp, + events.event_type, + events.error_id, + events.order_hash, + events.maker, + events.maker_amount, + 
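/* Annotation: the two *_txn_usd_value columns computed below normalize raw
   on-chain amounts by token decimals before applying the daily USD price:
   usd_value = (raw_amount / 10 ^ decimals) * usd_price. For example, a fill
   of 5000000000000000000 base units of an 18-decimal token priced at 2.00
   USD values at (5 * 10^18 / 10^18) * 2.00 = 10.00 USD. Note that the
   token-price subqueries join ON tokens.symbol = prices.symbol, while the
   prices table as created in create_tables.ts keys on address rather than
   symbol, so this join appears to assume a different prices layout. */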
events.maker_fee, + events.maker_token, + events.taker, + events.taker_amount, + events.taker_fee, + events.taker_token, + events.txn_hash, + events.fee_recipient, + events.block_number, + events.log_index, + events.gas_used, + events.gas_price, + events.method_id, + events.salt, + taker_token_prices.symbol, + taker_token_prices.name, + taker_token_prices.decimals, + taker_token_prices.price, + (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price, + maker_token_prices.symbol, + maker_token_prices.name, + maker_token_prices.decimals, + maker_token_prices.price, + (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price + FROM + events + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) taker_token_prices + ON + (events.taker_token = taker_token_prices.address + AND + (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL)) + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) maker_token_prices + ON + (events.maker_token = maker_token_prices.address + AND + (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL)) + WHERE + events.block_number >= $1 + AND + events.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, +}; +if (cli.name) { + const query = dataInsertionQueries[cli.name]; + if (query && cli.from) { + const fromBlock = cli.from; + const toBlock = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(query, [fromBlock, toBlock]) + .then((data: any) => { + console.log(data); + }) + .catch((err: any) => { + console.error(err); + }); + } +} diff --git a/packages/pipeline/src/scripts/query_data.ts b/packages/pipeline/src/scripts/query_data.ts new file mode 100644 index 000000000..97e3749ea --- /dev/null +++ b/packages/pipeline/src/scripts/query_data.ts @@ -0,0 +1,87 @@ +import { formatters } from '../utils'; +export const dataFetchingQueries: any = { + get_missing_txn_hashes: ` + SELECT + a.txn_hash + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + transactions b + WHERE + b.txn_hash = a.txn_hash + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_used_block_numbers: ` + SELECT DISTINCT + a.block_number + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + blocks b + WHERE + b.block_number = a.block_number + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_token_registry: ` + SELECT + * + FROM + tokens`, + get_max_block: ` + SELECT + MAX(block_number) + FROM + events_raw`, + get_relayers: ` + SELECT + * + FROM + relayers`, + get_most_recent_pricing_date: ` + SELECT + MAX(DATE(timestamp)) + FROM + prices + `, + get_top_unknown_token_addresses: ` + SELECT a.token_address as address, a.txn_value / 2 as total_txn_value +FROM +(SELECT token_address, SUM(txn_value) as txn_value +FROM +(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours') + union + select a.timestamp, a.taker_token as 
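/* Annotation: the union below counts each LogFill's USD value once under its
   maker token and once under its taker token; the outer txn_value / 2 then
   appears to attribute half of each fill's value to each side of the trade.
   The final LEFT JOIN against tokens keeps only addresses with no registry
   entry (symbol IS NULL), i.e. the top unknown tokens by 24-hour volume. */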
token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values +WHERE token_address IS NOT NULL +AND txn_value > 0 +GROUP BY 1 +ORDER BY 2 DESC) a +LEFT JOIN tokens b +ON a.token_address = b.address +WHERE symbol is NULL +ORDER BY 2 DESC +`, +}; diff --git a/packages/pipeline/src/scripts/scrape_data.ts b/packages/pipeline/src/scripts/scrape_data.ts new file mode 100644 index 000000000..963670b47 --- /dev/null +++ b/packages/pipeline/src/scripts/scrape_data.ts @@ -0,0 +1,649 @@ +import { ExchangeEvents, ZeroEx } from '0x.js'; +import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect'; +import * as Airtable from 'airtable'; +import * as commandLineArgs from 'command-line-args'; +import * as _ from 'lodash'; +import * as querystring from 'querystring'; +import * as queue from 'queue'; +import * as request from 'request'; +import * as rpn from 'request-promise-native'; + +import { HttpRequestOptions } from '../../../connect/lib/src/types.js'; +import { relayer } from '../models/relayer.js'; +import { token } from '../models/tokens.js'; +import { postgresClient } from '../postgres.js'; +import { typeConverters } from '../utils.js'; +import { web3, zrx } from '../zrx.js'; + +import { insertDataScripts } from './create_tables.js'; +import { dataFetchingQueries } from './query_data.js'; +const optionDefinitions = [ + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, + { name: 'type', type: String }, + { name: 'id', type: String }, + { name: 'force', type: Boolean }, + { name: 'token', type: String }, +]; +const cli = commandLineArgs(optionDefinitions); +const q = queue({ concurrency: 6, autostart: true }); +const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE); +const BLOCK_INCREMENTS = 1000; +const BASE_SYMBOL = 'USD'; // use USD as base currency against +const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days +const SECONDS_PER_DAY = 86400; +const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical'; +const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json'; +const METAMASK_ETH_CONTRACT_METADATA_JSON = + 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; +const ETHPLORER_BASE_URL = 'http://api.ethplorer.io'; +const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`; +// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday'; +const AIRTABLE_RELAYER_INFO = 'Relayer Info'; +export const pullDataScripts = { + getAllEvents(fromBlockNumber: number, toBlockNumber: number): any { + return new Promise((resolve, reject) => { + const getLogsPromises: any[] = []; + getLogsPromises.push( + zrx.exchange.getLogsAsync( + ExchangeEvents.LogFill, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogCancel, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogError, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + ); + Promise.all(getLogsPromises) + .then((data: 
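/* Promise.all preserves input order, so data[0], data[1] and data[2] line up
   with the LogFill, LogCancel and LogError queries pushed above. */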
any[]) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getBlockInfo(blockNumber: number): any { + return new Promise((resolve, reject) => { + web3.eth.getBlock(blockNumber, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTransactionInfo(transactionHash: string): any { + return new Promise((resolve, reject) => { + web3.eth.getTransaction(transactionHash, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTokenRegistry(): any { + return new Promise((resolve, reject) => { + zrx.tokenRegistry + .getTokensAsync() + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getMetaMaskTokens(): any { + return new Promise((resolve, reject) => { + request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerTopTokens(): any { + return new Promise((resolve, reject) => { + request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerToken(tokenAddress: string): any { + return new Promise((resolve, reject) => { + const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`; + request(url, (error, response, body) => { + if (error) { + reject(error); + } else { + try { + const json = JSON.parse(body); + resolve(json); + } catch (err) { + resolve({ error: 'error' }); + } + } + }); + }); + }, + getPriceData(symbol: string, timestamp: number, timeDelay?: number): any { + return new Promise((resolve, reject) => { + if (symbol === 'WETH') { + symbol = 'ETH'; + } + let parsedParams = querystring.stringify({ + fsym: symbol, + tsyms: 'USD', + ts: timestamp / 1000, + }); + console.debug(parsedParams); + setTimeout(() => { + request(PRICE_API_ENDPOINT + '?' 
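/* The pricehistorical endpoint takes ts in Unix seconds, hence the
   timestamp / 1000 conversion above from the millisecond values produced by
   Date.getTime(). The timeDelay parameter staggers queued requests (500 ms
   per token in scrapeAllPricesToDB below), presumably to stay under the
   price API's rate limits; when timeDelay is undefined, setTimeout fires
   immediately. */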
+ parsedParams, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }, timeDelay); + }); + }, + getRelayers(): any { + return new Promise((resolve, reject) => { + request(RELAYER_REGISTRY_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + async getOrderBook(sraEndpoint: string): Promise { + const relayerClient = new HttpClient(sraEndpoint); + const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync(); + const fullOrderBook: OrderbookResponse[] = []; + for (const tokenPair of tokenResponse) { + const orderBookRequest: OrderbookRequest = { + baseTokenAddress: tokenPair.tokenA.address, + quoteTokenAddress: tokenPair.tokenB.address, + }; + const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest); + fullOrderBook.push(orderBook); + } + return fullOrderBook; + }, + // async getHistoricalPrices( + // fromSymbol: string, + // toSymbol: string, + // fromTimestamp: number, + // toTimestamp: number, + // ): Promise { + // const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY)); + // if(fromSymbol === 'WETH') { + // fromSymbol = 'ETH'; + // } + // var parsedParams = { + // fsym: fromSymbol, + // tsym: toSymbol, + // limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT), + // toTs: toTimestamp, + // }; + // var options = { + // uri: HIST_PRICE_API_ENDPOINT, + // qs: parsedParams, + // json: false, + // }; + // try { + // const response = await rpn(options); + // return Promise.resolve(JSON.parse(response)); + // } catch (error) { + // console.debug(error); + // return Promise.reject(error); + // } + // }, +}; +export const scrapeDataScripts = { + scrapeAllPricesToDB(fromTime: number, toTime: number) { + const fromDate = new Date(fromTime); + fromDate.setUTCHours(0); + fromDate.setUTCMinutes(0); + fromDate.setUTCSeconds(0); + fromDate.setUTCMilliseconds(0); + const toDate = new Date(toTime); + postgresClient + .query(dataFetchingQueries.get_token_registry, []) + .then((result: any) => { + for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { + for (const token of Object.values(result.rows)) { + console.debug('Scraping ' + curDate + ' ' + token); + q.push(_scrapePriceToDB(curDate.getTime(), token, 500)); + } + } + }) + .catch((err: any) => { + console.debug(err); + }); + }, +}; +function _scrapeEventsToDB(fromBlock: number, toBlock: number): any { + return (cb: () => void) => { + pullDataScripts + .getAllEvents(fromBlock, toBlock) + .then((data: any) => { + const parsedEvents: any = {}; + parsedEvents[ExchangeEvents.LogFill] = []; + parsedEvents[ExchangeEvents.LogCancel] = []; + parsedEvents[ExchangeEvents.LogError] = []; + for (const index in data) { + for (const datum of data[index]) { + const event = typeConverters.convertLogEventToEventObject(datum); + parsedEvents[event.event_type].push(event); + } + } + console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length); + for (const event_type in parsedEvents) { + if (parsedEvents[event_type].length > 0) { + insertDataScripts + .insertMultipleRows( + 'events_raw', + parsedEvents[event_type], + Object.keys(parsedEvents[event_type][0]), + ) + .then(() => {}) + .catch((error: any) => {}); + } + } + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeBlockToDB(block: number): any { + return (cb: () => void) => { + pullDataScripts + 
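/* Each _scrape...ToDB helper returns a (cb: () => void) => { ... } job
   rather than running immediately: the queue instance q created above
   (concurrency: 6, autostart: true) invokes each job and uses cb to mark it
   finished, so at most six scrapes run concurrently. Note that the jobs also
   call cb() from their catch handlers, so failures are skipped silently
   rather than retried. */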
.getBlockInfo(block) + .then((data: any) => { + const parsedBlock = typeConverters.convertLogBlockToBlockObject(data); + insertDataScripts + .insertSingleRow('blocks', parsedBlock) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +// function _scrapeAllRelayersToDB(): any { +// return (cb: () => void) => { +// airtableBase(AIRTABLE_RELAYER_INFO) +// .select() +// .eachPage((records: any, fetchNextPage: () => void) => { +// const parsedRelayers: any[] = []; +// for(const record of records) { +// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record)); +// } +// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0])) +// .then((result: any) => { +// cb(); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }; +// } +function _scrapeAllRelayersToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getRelayers() + .then((relayers: any[]) => { + console.log(relayers); + const parsedRelayers: any[] = []; + for (const relayer of relayers) { + parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer)); + } + console.log(parsedRelayers); + insertDataScripts + .insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties)) + .then((result: any) => { + console.log(result); + cb(); + }) + .catch((err: any) => { + console.log(err); + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTransactionToDB(transactionHash: string): any { + return (cb: () => void) => { + pullDataScripts + .getTransactionInfo(transactionHash) + .then((data: any) => { + const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data); + insertDataScripts + .insertSingleRow('transactions', parsedTransaction) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTokenRegistryToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getTokenRegistry() + .then((data: any) => { + const parsedTokens: any = []; + for (const token of data) { + parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeMetaMaskEthContractMetadataToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getMetaMaskTokens() + .then((data: any) => { + const parsedTokens: any = []; + const dataArray = _.map(_.keys(data), (tokenAddress: string) => { + const value = _.get(data, tokenAddress); + return { + address: tokenAddress, + ...value, + }; + }); + const erc20TokensOnly = _.filter(dataArray, entry => { + const isErc20 = _.get(entry, 'erc20'); + return isErc20; + }); + for (const token of erc20TokensOnly) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeEthplorerTopTokensToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getEthplorerTopTokens() + .then((data: any) => { + const parsedTokens: any = []; + const tokens = _.get(data, 'tokens'); + for (const token of tokens) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + 
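/* Review note: Object.keys(parsedTokens[0]) throws if the API returned no
   tokens, and the same pattern appears in the other token scrapers above. A
   minimal guard (sketch, assuming an empty scrape should be a no-op) would
   be: if (parsedTokens.length === 0) { cb(); return; } */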
insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeUnknownTokenInformationToDB(): any { + return (cb: () => void) => { + postgresClient + .query(dataFetchingQueries.get_top_unknown_token_addresses) + .then(async (result: any) => { + const addresses = _.map(result.rows, row => _.get(row, 'address')); + const responses = await Promise.all( + _.map(addresses, address => pullDataScripts.getEthplorerToken(address)), + ); + const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error'))); + const parsedTokens = _.map(tokens, tokenInfo => + typeConverters.convertEthplorerTokenToTokenObject(tokenInfo), + ); + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any { + return (cb: () => void) => { + pullDataScripts + .getPriceData(token.symbol, timestamp, timeDelay) + .then((data: any) => { + const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol; + const parsedPrice = { + timestamp: timestamp / 1000, + symbol: token.symbol, + base: 'USD', + price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0, + }; + console.debug('Inserting ' + timestamp); + console.debug(parsedPrice); + insertDataScripts.insertSingleRow('prices', parsedPrice); + cb(); + }) + .catch((err: any) => { + console.debug(err); + cb(); + }); + }; +} +// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any { +// return (cb: () => void) => { +// pullDataScripts +// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp) +// .then((data: any) => { +// const parsedHistoricalPrices: any = []; +// for (const historicalPrice of data['Data']) { +// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice); +// parsedHistoricalPrice['token'] = token; +// parsedHistoricalPrice['base'] = BASE_SYMBOL; +// parsedHistoricalPrices.push(parsedHistoricalPrice); +// } +// if (parsedHistoricalPrices.length > 0) { +// insertDataScripts +// .insertMultipleRows( +// 'historical_prices', +// parsedHistoricalPrices, +// Object.keys(parsedHistoricalPrices[0]), +// ) +// .catch((error: any) => { +// console.error(error); +// }); +// } +// cb(); +// }) +// .catch((error: any) => { +// console.error(error); +// cb(); +// }); +// }; +// } +function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any { + return (cb: () => void) => { + pullDataScripts + .getOrderBook(sraEndpoint) + .then((data: any) => { + for (const book of data) { + for (const order of book.bids) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + for (const order of book.asks) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + } + cb(); + }) + .catch((error: any) => { + console.error(error); + cb(); + }); + }; +} +if (cli.type === 'events') 
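/* CLI dispatch: --type selects the scraper, and events are pulled in windows
   of at most BLOCK_INCREMENTS (1000) blocks per queued job. Two review notes
   on the branches below: the truthiness checks (cli.from && cli.to) reject a
   legitimate --from 0, and the 'orders' branch reads relayer.id and
   relayer.sra_http_url while the relayers table defines name and
   sra_http_endpoint, so those lookups may never match a row. */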
{ + if (cli.from && cli.to) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + let curToBlock = curFromBlock; + do { + curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS; + q.push(_scrapeEventsToDB(curFromBlock, curToBlock)); + curFromBlock = curToBlock + 1; + } while (curToBlock < destToBlock); + } +} else if (cli.type === 'blocks') { + if (cli.from && cli.to) { + if (cli.force) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + const curToBlock = curFromBlock; + for (; curFromBlock < destToBlock; curFromBlock++) { + q.push(_scrapeBlockToDB(curFromBlock)); + } + } else { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeBlockToDB(row.block_number)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } + } +} else if (cli.type === 'transactions') { + if (cli.id) { + q.push(_scrapeTransactionToDB(cli.id)); + } else if (cli.from) { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeTransactionToDB(row.txn_hash)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } +} else if (cli.type === 'tokens') { + q.push(_scrapeMetaMaskEthContractMetadataToDB()); + q.push(_scrapeEthplorerTopTokensToDB()); +} else if (cli.type === 'unknown_tokens') { + q.push(_scrapeUnknownTokenInformationToDB()); +} else if (cli.type === 'prices' && cli.from && cli.to) { + const fromDate = new Date(cli.from); + console.debug(fromDate); + fromDate.setUTCHours(0); + fromDate.setUTCMinutes(0); + fromDate.setUTCSeconds(0); + fromDate.setUTCMilliseconds(0); + console.debug(fromDate); + const toDate = new Date(cli.to); + postgresClient + .query(dataFetchingQueries.get_token_registry, []) + .then((result: any) => { + for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { + for (const token of Object.values(result.rows)) { + console.debug('Scraping ' + curDate + ' ' + token); + q.push(_scrapePriceToDB(curDate.getTime(), token)); + } + } + }) + .catch((err: any) => { + console.debug(err); + }); + // } else if (cli.type === 'historical_prices') { + // if (cli.token && cli.from && cli.to) { + // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to)); + // } + // } else if (cli.type === 'all_historical_prices') { + // if (cli.from && cli.to) { + // postgresClient + // .query(dataFetchingQueries.get_token_registry, []) + // .then((result: any) => { + // const curTokens: any = result.rows.map((a: any): any => a.symbol); + // for (const curToken of curTokens) { + // console.debug('Historical data backfill: Pushing coin ' + curToken); + // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to)); + // } + // }) + // .catch((err: any) => { + // console.debug(err); + // }); + // } +} else if (cli.type === 'relayers') { + q.push(_scrapeAllRelayersToDB()); +} else if (cli.type === 'orders') { + postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => { + for (const relayer of result.rows) { + if (relayer.sra_http_url) { + q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url)); + } + } + }); +} -- cgit v1.2.3 From 
cd73a047efbc1b7afa884ef27a1cc4991e20efd8 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 19 Sep 2018 10:51:12 -0700 Subject: Remove old code. Create function for getting contract events via etherscan --- packages/pipeline/src/scripts/create_tables.ts | 258 ---------- packages/pipeline/src/scripts/join_tables.ts | 234 --------- packages/pipeline/src/scripts/query_data.ts | 87 ---- packages/pipeline/src/scripts/scrape_data.ts | 649 ------------------------- 4 files changed, 1228 deletions(-) delete mode 100644 packages/pipeline/src/scripts/create_tables.ts delete mode 100644 packages/pipeline/src/scripts/join_tables.ts delete mode 100644 packages/pipeline/src/scripts/query_data.ts delete mode 100644 packages/pipeline/src/scripts/scrape_data.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/create_tables.ts b/packages/pipeline/src/scripts/create_tables.ts deleted file mode 100644 index fd0d2b78b..000000000 --- a/packages/pipeline/src/scripts/create_tables.ts +++ /dev/null @@ -1,258 +0,0 @@ -import * as commandLineArgs from 'command-line-args'; - -import { postgresClient } from '../postgres'; -import { formatters } from '../utils'; -const tableQueries: any = { - events_full: `CREATE TABLE IF NOT EXISTS events_full ( - timestamp TIMESTAMP WITH TIME ZONE, - event_type VARCHAR, - error_id VARCHAR, - order_hash CHAR(66), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - txn_hash CHAR(66), - gas_used NUMERIC(78), - gas_price NUMERIC(78), - fee_recipient CHAR(42), - method_id CHAR(10), - salt VARCHAR, - block_number BIGINT, - log_index BIGINT, - taker_symbol VARCHAR, - taker_name VARCHAR, - taker_decimals BIGINT, - taker_usd_price NUMERIC(78), - taker_txn_usd_value NUMERIC(78), - maker_symbol VARCHAR, - maker_name VARCHAR, - maker_decimals BIGINT, - maker_usd_price NUMERIC(78), - maker_txn_usd_value NUMERIC(78), - PRIMARY KEY (txn_hash, order_hash, log_index) - )`, - events: `CREATE TABLE IF NOT EXISTS events ( - timestamp TIMESTAMP WITH TIME ZONE, - event_type VARCHAR, - error_id VARCHAR, - order_hash CHAR(66), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - txn_hash CHAR(66), - gas_used NUMERIC(78), - gas_price NUMERIC(78), - fee_recipient CHAR(42), - method_id CHAR(10), - salt VARCHAR, - block_number BIGINT, - log_index BIGINT, - PRIMARY KEY (txn_hash, order_hash, log_index) - )`, - events_staging: `CREATE TABLE IF NOT EXISTS events_staging ( - timestamp TIMESTAMP WITH TIME ZONE, - event_type VARCHAR, - error_id VARCHAR, - order_hash CHAR(66), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - txn_hash CHAR(66), - fee_recipient CHAR(42), - block_number BIGINT, - log_index BIGINT, - PRIMARY KEY (txn_hash, order_hash, log_index) - )`, - events_raw: `CREATE TABLE IF NOT EXISTS events_raw ( - event_type VARCHAR, - error_id VARCHAR, - order_hash CHAR(66), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - txn_hash CHAR(66), - fee_recipient CHAR(42), - block_number BIGINT, - log_index BIGINT, - 
PRIMARY KEY (txn_hash, order_hash, log_index) - )`, - blocks: `CREATE TABLE IF NOT EXISTS blocks ( - timestamp TIMESTAMP WITH TIME ZONE, - block_hash CHAR(66) UNIQUE, - block_number BIGINT, - PRIMARY KEY (block_hash) - )`, - transactions: `CREATE TABLE IF NOT EXISTS transactions ( - txn_hash CHAR(66) UNIQUE, - block_hash CHAR(66), - block_number BIGINT, - gas_used NUMERIC(78), - gas_price NUMERIC(78), - method_id CHAR(10), - salt VARCHAR, - PRIMARY KEY (txn_hash) - )`, - tokens: `CREATE TABLE IF NOT EXISTS tokens ( - address CHAR(42) UNIQUE, - name VARCHAR, - symbol VARCHAR, - decimals INT, - PRIMARY KEY (address) - )`, - prices: `CREATE TABLE IF NOT EXISTS prices ( - address CHAR(42) UNIQUE, - timestamp TIMESTAMP WITH TIME ZONE, - price NUMERIC(78, 18), - PRIMARY KEY (address, timestamp) - )`, - relayers: `CREATE TABLE IF NOT EXISTS relayers ( - name VARCHAR UNIQUE, - url VARCHAR DEFAULT '', - sra_http_endpoint VARCHAR DEFAULT '', - sra_ws_endpoint VARCHAR DEFAULT '', - fee_recipient_addresses CHAR(42)[] DEFAULT '{}', - taker_addresses CHAR(42)[] DEFAULT '{}', - PRIMARY KEY(name)`, - historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices ( - token VARCHAR, - base VARCHAR, - timestamp TIMESTAMP WITH TIME ZONE, - close NUMERIC(78, 18), - high NUMERIC(78, 18), - low NUMERIC(78, 18), - open NUMERIC(78, 18), - volume_from NUMERIC(78, 18), - volume_to NUMERIC(78, 18), - PRIMARY KEY (token, base, timestamp) - )`, - orders: `CREATE TABLE IF NOT EXISTS orders ( - relayer_id VARCHAR, - exchange_contract_address CHAR(42), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - fee_recipient CHAR(42), - expiration_unix_timestamp_sec NUMERIC(78), - salt VARCHAR, - order_hash CHAR(66), - PRIMARY KEY (relayer_id, order_hash) - )`, -}; -function _safeQuery(query: string): any { - return new Promise((resolve, reject) => { - postgresClient - .query(query) - .then((data: any) => { - resolve(data); - }) - .catch((err: any) => { - reject(err); - }); - }); -} -export const tableScripts = { - createTable(query: string): any { - return _safeQuery(query); - }, - createAllTables(): any { - for (const tableName of tableQueries) { - _safeQuery(tableQueries[tableName]); - } - }, -}; -export const insertDataScripts = { - insertSingleRow(table: string, object: any): any { - return new Promise((resolve, reject) => { - const columns = Object.keys(object); - const safeArray: any = []; - for (const key of columns) { - if (key in object) { - if (key === 'timestamp') { - safeArray.push('to_timestamp(' + object[key] + ')'); - } else if (typeof object[key] === 'string' || object[key] instanceof String) { - safeArray.push(formatters.escapeSQLParam(object[key])); - } else { - safeArray.push(object[key]); - } - } else { - safeArray.push('default'); - } - } - const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`; - console.log(queryString); - postgresClient - .query(queryString) - .then((data: any) => { - resolve(data); - }) - .catch((err: any) => { - reject(err); - }); - }); - }, - insertMultipleRows(table: string, rows: any[], columns: any[]): any { - return new Promise((resolve, reject) => { - if (rows.length > 0) { - const rowsSplit = rows.map((value, index) => { - const safeArray: any = []; - for (const key of columns) { - if (key in value) { - if (key === 'timestamp') { - safeArray.push('to_timestamp(' + value[key] + ')'); - } else if 
(typeof value[key] === 'string' || value[key] instanceof String) { - safeArray.push(formatters.escapeSQLParam(value[key])); - } else if (value[key] instanceof Array) { - const escapedArray = value[key].map((subValue: string, subIndex: number) => { - return formatters.escapeSQLParam(subValue); - }); - safeArray.push('ARRAY[' + escapedArray.toString() + ']'); - } else { - safeArray.push(value[key]); - } - } else { - safeArray.push('default'); - } - } - return '(' + safeArray + ')'; - }); - const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`; - postgresClient - .query(queryString) - .then((data: any) => { - resolve(data); - }) - .catch((err: any) => { - // console.log(err); - reject(err); - }); - } else { - resolve({}); - } - }); - }, -}; diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts deleted file mode 100644 index e7c05b39a..000000000 --- a/packages/pipeline/src/scripts/join_tables.ts +++ /dev/null @@ -1,234 +0,0 @@ -import * as commandLineArgs from 'command-line-args'; - -import { postgresClient } from '../postgres'; -import { formatters } from '../utils'; -const optionDefinitions = [ - { name: 'name', alias: 'n', type: String }, - { name: 'from', alias: 'f', type: Number }, - { name: 'to', alias: 't', type: Number }, -]; -const cli = commandLineArgs(optionDefinitions); -const dataInsertionQueries: any = { - events_staging: `INSERT INTO events_staging ( - timestamp, - event_type, - error_id, - order_hash, - maker, - maker_amount, - maker_fee, - maker_token, - taker, - taker_amount, - taker_fee, - taker_token, - txn_hash, - fee_recipient, - block_number, - log_index - ) - (SELECT - b.timestamp, - a.event_type, - a.error_id, - a.order_hash, - a.maker, - a.maker_amount, - a.maker_fee, - a.maker_token, - a.taker, - a.taker_amount, - a.taker_fee, - a.taker_token, - a.txn_hash, - a.fee_recipient, - a.block_number, - a.log_index - FROM - events_raw a - JOIN - blocks b - ON - a.block_number = b.block_number - AND - b.block_number >= $1 - AND - b.block_number <= $2 - ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, - events: `INSERT INTO events ( - timestamp, - event_type, - error_id, - order_hash, - maker, - maker_amount, - maker_fee, - maker_token, - taker, - taker_amount, - taker_fee, - taker_token, - txn_hash, - fee_recipient, - block_number, - log_index, - gas_used, - gas_price, - method_id, - salt - ) - (SELECT - a.timestamp, - a.event_type, - a.error_id, - a.order_hash, - a.maker, - a.maker_amount, - a.maker_fee, - a.maker_token, - a.taker, - a.taker_amount, - a.taker_fee, - a.taker_token, - a.txn_hash, - a.fee_recipient, - a.block_number, - a.log_index, - b.gas_used, - b.gas_price, - b.method_id, - b.salt - FROM - events_staging a - JOIN - transactions b - ON - a.txn_hash = b.txn_hash - AND - a.block_number >= $1 - AND - a.block_number <= $2 - ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, - events_full: ` - INSERT INTO events_full ( - timestamp, - event_type, - error_id, - order_hash, - maker, - maker_amount, - maker_fee, - maker_token, - taker, - taker_amount, - taker_fee, - taker_token, - txn_hash, - fee_recipient, - block_number, - log_index, - gas_used, - gas_price, - method_id, - salt, - taker_symbol, - taker_name, - taker_decimals, - taker_usd_price, - taker_txn_usd_value, - maker_symbol, - maker_name, - maker_decimals, - maker_usd_price, - maker_txn_usd_value - ) - (SELECT - events.timestamp, - events.event_type, - events.error_id, - events.order_hash, - 
events.maker, - events.maker_amount, - events.maker_fee, - events.maker_token, - events.taker, - events.taker_amount, - events.taker_fee, - events.taker_token, - events.txn_hash, - events.fee_recipient, - events.block_number, - events.log_index, - events.gas_used, - events.gas_price, - events.method_id, - events.salt, - taker_token_prices.symbol, - taker_token_prices.name, - taker_token_prices.decimals, - taker_token_prices.price, - (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price, - maker_token_prices.symbol, - maker_token_prices.name, - maker_token_prices.decimals, - maker_token_prices.price, - (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price - FROM - events - LEFT JOIN - (SELECT - tokens.address, - tokens.name, - tokens.symbol, - tokens.decimals, - prices.timestamp, - prices.price - FROM - tokens - LEFT JOIN - prices - ON - tokens.symbol = prices.symbol) taker_token_prices - ON - (events.taker_token = taker_token_prices.address - AND - (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL)) - LEFT JOIN - (SELECT - tokens.address, - tokens.name, - tokens.symbol, - tokens.decimals, - prices.timestamp, - prices.price - FROM - tokens - LEFT JOIN - prices - ON - tokens.symbol = prices.symbol) maker_token_prices - ON - (events.maker_token = maker_token_prices.address - AND - (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL)) - WHERE - events.block_number >= $1 - AND - events.block_number <= $2 - ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, -}; -if (cli.name) { - const query = dataInsertionQueries[cli.name]; - if (query && cli.from) { - const fromBlock = cli.from; - const toBlock = cli.to ? 
cli.to : cli.from + 1; - postgresClient - .query(query, [fromBlock, toBlock]) - .then((data: any) => { - console.log(data); - }) - .catch((err: any) => { - console.error(err); - }); - } -} diff --git a/packages/pipeline/src/scripts/query_data.ts b/packages/pipeline/src/scripts/query_data.ts deleted file mode 100644 index 97e3749ea..000000000 --- a/packages/pipeline/src/scripts/query_data.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { formatters } from '../utils'; -export const dataFetchingQueries: any = { - get_missing_txn_hashes: ` - SELECT - a.txn_hash - FROM - events_raw a - WHERE NOT EXISTS - ( - SELECT - * - FROM - transactions b - WHERE - b.txn_hash = a.txn_hash - ) - AND - a.block_number >= $1 - AND - a.block_number < $2`, - get_used_block_numbers: ` - SELECT DISTINCT - a.block_number - FROM - events_raw a - WHERE NOT EXISTS - ( - SELECT - * - FROM - blocks b - WHERE - b.block_number = a.block_number - ) - AND - a.block_number >= $1 - AND - a.block_number < $2`, - get_token_registry: ` - SELECT - * - FROM - tokens`, - get_max_block: ` - SELECT - MAX(block_number) - FROM - events_raw`, - get_relayers: ` - SELECT - * - FROM - relayers`, - get_most_recent_pricing_date: ` - SELECT - MAX(DATE(timestamp)) - FROM - prices - `, - get_top_unknown_token_addresses: ` - SELECT a.token_address as address, a.txn_value / 2 as total_txn_value -FROM -(SELECT token_address, SUM(txn_value) as txn_value -FROM -(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL - THEN a.taker_txn_usd_value - ELSE a.maker_txn_usd_value END) as txn_value - from events_full a - where a.event_type = 'LogFill' - and a.timestamp > (NOW() + INTERVAL '-24 hours') - union - select a.timestamp, a.taker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL - THEN a.taker_txn_usd_value - ELSE a.maker_txn_usd_value END) as txn_value - from events_full a - where a.event_type = 'LogFill' - and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values -WHERE token_address IS NOT NULL -AND txn_value > 0 -GROUP BY 1 -ORDER BY 2 DESC) a -LEFT JOIN tokens b -ON a.token_address = b.address -WHERE symbol is NULL -ORDER BY 2 DESC -`, -}; diff --git a/packages/pipeline/src/scripts/scrape_data.ts b/packages/pipeline/src/scripts/scrape_data.ts deleted file mode 100644 index 963670b47..000000000 --- a/packages/pipeline/src/scripts/scrape_data.ts +++ /dev/null @@ -1,649 +0,0 @@ -import { ExchangeEvents, ZeroEx } from '0x.js'; -import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect'; -import * as Airtable from 'airtable'; -import * as commandLineArgs from 'command-line-args'; -import * as _ from 'lodash'; -import * as querystring from 'querystring'; -import * as queue from 'queue'; -import * as request from 'request'; -import * as rpn from 'request-promise-native'; - -import { HttpRequestOptions } from '../../../connect/lib/src/types.js'; -import { relayer } from '../models/relayer.js'; -import { token } from '../models/tokens.js'; -import { postgresClient } from '../postgres.js'; -import { typeConverters } from '../utils.js'; -import { web3, zrx } from '../zrx.js'; - -import { insertDataScripts } from './create_tables.js'; -import { dataFetchingQueries } from './query_data.js'; -const optionDefinitions = [ - { name: 'from', alias: 'f', type: Number }, - { name: 'to', alias: 't', type: Number }, - { name: 'type', type: String }, - { name: 'id', 
type: String }, - { name: 'force', type: Boolean }, - { name: 'token', type: String }, -]; -const cli = commandLineArgs(optionDefinitions); -const q = queue({ concurrency: 6, autostart: true }); -const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE); -const BLOCK_INCREMENTS = 1000; -const BASE_SYMBOL = 'USD'; // use USD as base currency against -const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days -const SECONDS_PER_DAY = 86400; -const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical'; -const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json'; -const METAMASK_ETH_CONTRACT_METADATA_JSON = - 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; -const ETHPLORER_BASE_URL = 'http://api.ethplorer.io'; -const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`; -// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday'; -const AIRTABLE_RELAYER_INFO = 'Relayer Info'; -export const pullDataScripts = { - getAllEvents(fromBlockNumber: number, toBlockNumber: number): any { - return new Promise((resolve, reject) => { - const getLogsPromises: any[] = []; - getLogsPromises.push( - zrx.exchange.getLogsAsync( - ExchangeEvents.LogFill, - { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, - {}, - ), - zrx.exchange.getLogsAsync( - ExchangeEvents.LogCancel, - { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, - {}, - ), - zrx.exchange.getLogsAsync( - ExchangeEvents.LogError, - { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, - {}, - ), - ); - Promise.all(getLogsPromises) - .then((data: any[]) => { - resolve(data); - }) - .catch((err: any) => { - reject(err); - }); - }); - }, - getBlockInfo(blockNumber: number): any { - return new Promise((resolve, reject) => { - web3.eth.getBlock(blockNumber, (err, result) => { - if (err) { - reject(err); - } else { - resolve(result); - } - }); - }); - }, - getTransactionInfo(transactionHash: string): any { - return new Promise((resolve, reject) => { - web3.eth.getTransaction(transactionHash, (err, result) => { - if (err) { - reject(err); - } else { - resolve(result); - } - }); - }); - }, - getTokenRegistry(): any { - return new Promise((resolve, reject) => { - zrx.tokenRegistry - .getTokensAsync() - .then((data: any) => { - resolve(data); - }) - .catch((err: any) => { - reject(err); - }); - }); - }, - getMetaMaskTokens(): any { - return new Promise((resolve, reject) => { - request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => { - if (error) { - reject(error); - } else { - resolve(JSON.parse(body)); - } - }); - }); - }, - getEthplorerTopTokens(): any { - return new Promise((resolve, reject) => { - request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => { - if (error) { - reject(error); - } else { - resolve(JSON.parse(body)); - } - }); - }); - }, - getEthplorerToken(tokenAddress: string): any { - return new Promise((resolve, reject) => { - const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`; - request(url, (error, response, body) => { - if (error) { - reject(error); - } else { - try { - const json = JSON.parse(body); - resolve(json); - } catch (err) { - resolve({ error: 'error' }); - } - } - }); - }); - }, - getPriceData(symbol: string, timestamp: number, timeDelay?: number): any { - return new Promise((resolve, reject) 
=> { - if (symbol === 'WETH') { - symbol = 'ETH'; - } - let parsedParams = querystring.stringify({ - fsym: symbol, - tsyms: 'USD', - ts: timestamp / 1000, - }); - console.debug(parsedParams); - setTimeout(() => { - request(PRICE_API_ENDPOINT + '?' + parsedParams, (error, response, body) => { - if (error) { - reject(error); - } else { - resolve(JSON.parse(body)); - } - }); - }, timeDelay); - }); - }, - getRelayers(): any { - return new Promise((resolve, reject) => { - request(RELAYER_REGISTRY_JSON, (error, response, body) => { - if (error) { - reject(error); - } else { - resolve(JSON.parse(body)); - } - }); - }); - }, - async getOrderBook(sraEndpoint: string): Promise { - const relayerClient = new HttpClient(sraEndpoint); - const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync(); - const fullOrderBook: OrderbookResponse[] = []; - for (const tokenPair of tokenResponse) { - const orderBookRequest: OrderbookRequest = { - baseTokenAddress: tokenPair.tokenA.address, - quoteTokenAddress: tokenPair.tokenB.address, - }; - const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest); - fullOrderBook.push(orderBook); - } - return fullOrderBook; - }, - // async getHistoricalPrices( - // fromSymbol: string, - // toSymbol: string, - // fromTimestamp: number, - // toTimestamp: number, - // ): Promise { - // const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY)); - // if(fromSymbol === 'WETH') { - // fromSymbol = 'ETH'; - // } - // var parsedParams = { - // fsym: fromSymbol, - // tsym: toSymbol, - // limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT), - // toTs: toTimestamp, - // }; - // var options = { - // uri: HIST_PRICE_API_ENDPOINT, - // qs: parsedParams, - // json: false, - // }; - // try { - // const response = await rpn(options); - // return Promise.resolve(JSON.parse(response)); - // } catch (error) { - // console.debug(error); - // return Promise.reject(error); - // } - // }, -}; -export const scrapeDataScripts = { - scrapeAllPricesToDB(fromTime: number, toTime: number) { - const fromDate = new Date(fromTime); - fromDate.setUTCHours(0); - fromDate.setUTCMinutes(0); - fromDate.setUTCSeconds(0); - fromDate.setUTCMilliseconds(0); - const toDate = new Date(toTime); - postgresClient - .query(dataFetchingQueries.get_token_registry, []) - .then((result: any) => { - for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { - for (const token of Object.values(result.rows)) { - console.debug('Scraping ' + curDate + ' ' + token); - q.push(_scrapePriceToDB(curDate.getTime(), token, 500)); - } - } - }) - .catch((err: any) => { - console.debug(err); - }); - }, -}; -function _scrapeEventsToDB(fromBlock: number, toBlock: number): any { - return (cb: () => void) => { - pullDataScripts - .getAllEvents(fromBlock, toBlock) - .then((data: any) => { - const parsedEvents: any = {}; - parsedEvents[ExchangeEvents.LogFill] = []; - parsedEvents[ExchangeEvents.LogCancel] = []; - parsedEvents[ExchangeEvents.LogError] = []; - for (const index in data) { - for (const datum of data[index]) { - const event = typeConverters.convertLogEventToEventObject(datum); - parsedEvents[event.event_type].push(event); - } - } - console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length); - for (const event_type in parsedEvents) { - if (parsedEvents[event_type].length > 0) { - insertDataScripts - .insertMultipleRows( - 'events_raw', - parsedEvents[event_type], - 
Object.keys(parsedEvents[event_type][0]), - ) - .then(() => {}) - .catch((error: any) => {}); - } - } - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeBlockToDB(block: number): any { - return (cb: () => void) => { - pullDataScripts - .getBlockInfo(block) - .then((data: any) => { - const parsedBlock = typeConverters.convertLogBlockToBlockObject(data); - insertDataScripts - .insertSingleRow('blocks', parsedBlock) - .then((result: any) => { - cb(); - }) - .catch((err: any) => { - cb(); - }); - }) - .catch((err: any) => { - cb(); - }); - }; -} -// function _scrapeAllRelayersToDB(): any { -// return (cb: () => void) => { -// airtableBase(AIRTABLE_RELAYER_INFO) -// .select() -// .eachPage((records: any, fetchNextPage: () => void) => { -// const parsedRelayers: any[] = []; -// for(const record of records) { -// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record)); -// } -// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0])) -// .then((result: any) => { -// cb(); -// }) -// .catch((err: any) => { -// cb(); -// }); -// }) -// .catch((err: any) => { -// cb(); -// }); -// }; -// } -function _scrapeAllRelayersToDB(): any { - return (cb: () => void) => { - pullDataScripts - .getRelayers() - .then((relayers: any[]) => { - console.log(relayers); - const parsedRelayers: any[] = []; - for (const relayer of relayers) { - parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer)); - } - console.log(parsedRelayers); - insertDataScripts - .insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties)) - .then((result: any) => { - console.log(result); - cb(); - }) - .catch((err: any) => { - console.log(err); - cb(); - }); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeTransactionToDB(transactionHash: string): any { - return (cb: () => void) => { - pullDataScripts - .getTransactionInfo(transactionHash) - .then((data: any) => { - const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data); - insertDataScripts - .insertSingleRow('transactions', parsedTransaction) - .then((result: any) => { - cb(); - }) - .catch((err: any) => { - cb(); - }); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeTokenRegistryToDB(): any { - return (cb: () => void) => { - pullDataScripts - .getTokenRegistry() - .then((data: any) => { - const parsedTokens: any = []; - for (const token of data) { - parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token)); - } - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeMetaMaskEthContractMetadataToDB(): any { - return (cb: () => void) => { - pullDataScripts - .getMetaMaskTokens() - .then((data: any) => { - const parsedTokens: any = []; - const dataArray = _.map(_.keys(data), (tokenAddress: string) => { - const value = _.get(data, tokenAddress); - return { - address: tokenAddress, - ...value, - }; - }); - const erc20TokensOnly = _.filter(dataArray, entry => { - const isErc20 = _.get(entry, 'erc20'); - return isErc20; - }); - for (const token of erc20TokensOnly) { - parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); - } - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeEthplorerTopTokensToDB(): any { - return (cb: () => void) => 
{ - pullDataScripts - .getEthplorerTopTokens() - .then((data: any) => { - const parsedTokens: any = []; - const tokens = _.get(data, 'tokens'); - for (const token of tokens) { - parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); - } - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeUnknownTokenInformationToDB(): any { - return (cb: () => void) => { - postgresClient - .query(dataFetchingQueries.get_top_unknown_token_addresses) - .then(async (result: any) => { - const addresses = _.map(result.rows, row => _.get(row, 'address')); - const responses = await Promise.all( - _.map(addresses, address => pullDataScripts.getEthplorerToken(address)), - ); - const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error'))); - const parsedTokens = _.map(tokens, tokenInfo => - typeConverters.convertEthplorerTokenToTokenObject(tokenInfo), - ); - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any { - return (cb: () => void) => { - pullDataScripts - .getPriceData(token.symbol, timestamp, timeDelay) - .then((data: any) => { - const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol; - const parsedPrice = { - timestamp: timestamp / 1000, - symbol: token.symbol, - base: 'USD', - price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0, - }; - console.debug('Inserting ' + timestamp); - console.debug(parsedPrice); - insertDataScripts.insertSingleRow('prices', parsedPrice); - cb(); - }) - .catch((err: any) => { - console.debug(err); - cb(); - }); - }; -} -// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any { -// return (cb: () => void) => { -// pullDataScripts -// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp) -// .then((data: any) => { -// const parsedHistoricalPrices: any = []; -// for (const historicalPrice of data['Data']) { -// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice); -// parsedHistoricalPrice['token'] = token; -// parsedHistoricalPrice['base'] = BASE_SYMBOL; -// parsedHistoricalPrices.push(parsedHistoricalPrice); -// } -// if (parsedHistoricalPrices.length > 0) { -// insertDataScripts -// .insertMultipleRows( -// 'historical_prices', -// parsedHistoricalPrices, -// Object.keys(parsedHistoricalPrices[0]), -// ) -// .catch((error: any) => { -// console.error(error); -// }); -// } -// cb(); -// }) -// .catch((error: any) => { -// console.error(error); -// cb(); -// }); -// }; -// } -function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any { - return (cb: () => void) => { - pullDataScripts - .getOrderBook(sraEndpoint) - .then((data: any) => { - for (const book of data) { - for (const order of book.bids) { - console.debug(order); - const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); - parsedOrder.relayer_id = id; - parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); - insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { - console.error(error); - }); - } - for (const order of book.asks) { - console.debug(order); - const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); - parsedOrder.relayer_id = id; - parsedOrder.order_hash = 
ZeroEx.getOrderHashHex(order); - insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { - console.error(error); - }); - } - } - cb(); - }) - .catch((error: any) => { - console.error(error); - cb(); - }); - }; -} -if (cli.type === 'events') { - if (cli.from && cli.to) { - const destToBlock = cli.to ? cli.to : cli.from; - let curFromBlock = cli.from; - let curToBlock = curFromBlock; - do { - curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS; - q.push(_scrapeEventsToDB(curFromBlock, curToBlock)); - curFromBlock = curToBlock + 1; - } while (curToBlock < destToBlock); - } -} else if (cli.type === 'blocks') { - if (cli.from && cli.to) { - if (cli.force) { - const destToBlock = cli.to ? cli.to : cli.from; - let curFromBlock = cli.from; - const curToBlock = curFromBlock; - for (; curFromBlock < destToBlock; curFromBlock++) { - q.push(_scrapeBlockToDB(curFromBlock)); - } - } else { - const fetchFrom = cli.from; - const fetchTo = cli.to ? cli.to : cli.from + 1; - postgresClient - .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo]) - .then((data: any) => { - for (const row of data.rows) { - q.push(_scrapeBlockToDB(row.block_number)); - } - }) - .catch((err: any) => { - // console.debug(err); - }); - } - } -} else if (cli.type === 'transactions') { - if (cli.id) { - q.push(_scrapeTransactionToDB(cli.id)); - } else if (cli.from) { - const fetchFrom = cli.from; - const fetchTo = cli.to ? cli.to : cli.from + 1; - postgresClient - .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo]) - .then((data: any) => { - for (const row of data.rows) { - q.push(_scrapeTransactionToDB(row.txn_hash)); - } - }) - .catch((err: any) => { - // console.debug(err); - }); - } -} else if (cli.type === 'tokens') { - q.push(_scrapeMetaMaskEthContractMetadataToDB()); - q.push(_scrapeEthplorerTopTokensToDB()); -} else if (cli.type === 'unknown_tokens') { - q.push(_scrapeUnknownTokenInformationToDB()); -} else if (cli.type === 'prices' && cli.from && cli.to) { - const fromDate = new Date(cli.from); - console.debug(fromDate); - fromDate.setUTCHours(0); - fromDate.setUTCMinutes(0); - fromDate.setUTCSeconds(0); - fromDate.setUTCMilliseconds(0); - console.debug(fromDate); - const toDate = new Date(cli.to); - postgresClient - .query(dataFetchingQueries.get_token_registry, []) - .then((result: any) => { - for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { - for (const token of Object.values(result.rows)) { - console.debug('Scraping ' + curDate + ' ' + token); - q.push(_scrapePriceToDB(curDate.getTime(), token)); - } - } - }) - .catch((err: any) => { - console.debug(err); - }); - // } else if (cli.type === 'historical_prices') { - // if (cli.token && cli.from && cli.to) { - // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to)); - // } - // } else if (cli.type === 'all_historical_prices') { - // if (cli.from && cli.to) { - // postgresClient - // .query(dataFetchingQueries.get_token_registry, []) - // .then((result: any) => { - // const curTokens: any = result.rows.map((a: any): any => a.symbol); - // for (const curToken of curTokens) { - // console.debug('Historical data backfill: Pushing coin ' + curToken); - // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to)); - // } - // }) - // .catch((err: any) => { - // console.debug(err); - // }); - // } -} else if (cli.type === 'relayers') { - q.push(_scrapeAllRelayersToDB()); -} else if (cli.type === 'orders') { 
- postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => { - for (const relayer of result.rows) { - if (relayer.sra_http_url) { - q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url)); - } - } - }); -} -- cgit v1.2.3 From 954c3b9272556723c10fd02ad6b753ac98ca47fa Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 7 Nov 2018 16:48:25 -0800 Subject: Split index.ts into multiple scripts in scripts/ and detect last known block when pulling events --- packages/pipeline/src/scripts/merge_v2_events.ts | 81 ++++++++++++++++++++++ .../pipeline/src/scripts/pull_missing_events.ts | 60 ++++++++++++++++ .../pipeline/src/scripts/update_relayer_info.ts | 31 +++++++++ 3 files changed, 172 insertions(+) create mode 100644 packages/pipeline/src/scripts/merge_v2_events.ts create mode 100644 packages/pipeline/src/scripts/pull_missing_events.ts create mode 100644 packages/pipeline/src/scripts/update_relayer_info.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/merge_v2_events.ts b/packages/pipeline/src/scripts/merge_v2_events.ts new file mode 100644 index 000000000..227ece121 --- /dev/null +++ b/packages/pipeline/src/scripts/merge_v2_events.ts @@ -0,0 +1,81 @@ +import { web3Factory } from '@0x/dev-utils'; +import 'reflect-metadata'; +import { Connection, createConnection } from 'typeorm'; + +import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; +import { ExchangeFillEvent } from '../entities/ExchangeFillEvent'; +import { deployConfig } from '../ormconfig'; +import { parseExchangeEvents } from '../parsers/events'; + +let connection: Connection; + +(async () => { + connection = await createConnection(deployConfig); + await getExchangeEventsAsync(); + await mergeExchangeEventsAsync(); + console.log('Exiting process'); + process.exit(0); +})(); + +// TODO(albrow): Separately: Errors do not appear to be handled correctly. If you use the +// wrong rpcUrl it just returns early with no error. 
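A minimal sketch of how that failure mode could be surfaced instead of swallowed: catch rejections from the immediately invoked entry point and exit non-zero. A later commit in this series adopts exactly this shape via a shared handleError util; the inline handler below is only an illustration.

    // Sketch only: fail loudly on a bad rpcUrl or DB connection instead of
    // returning early with no error.
    (async () => {
        connection = await createConnection(deployConfig);
        await getExchangeEventsAsync();
        await mergeExchangeEventsAsync();
        process.exit(0);
    })().catch((err: Error) => {
        console.error(err);
        process.exit(1); // non-zero exit code so schedulers notice the failure
    });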
+async function getExchangeEventsAsync(): Promise<void> {
+    console.log('Getting event logs...');
+    const provider = web3Factory.getRpcProvider({
+        rpcUrl: 'https://mainnet.infura.io',
+    });
+    const eventsRepository = connection.getRepository(ExchangeFillEvent);
+    const exchangeEvents = new ExchangeEventsSource(provider, 1);
+    const eventLogs = await exchangeEvents.getFillEventsAsync();
+    console.log('Parsing events...');
+    const events = parseExchangeEvents(eventLogs);
+    console.log(`Retrieved and parsed ${events.length} total events.`);
+    console.log('Saving events...');
+    for (const event of events) {
+        await eventsRepository.save(event);
+    }
+    await eventsRepository.save(events);
+    console.log('Saved events.');
+}
+
+const insertEventsRawQuery = `INSERT INTO events_raw (
+    event_type,
+    error_id,
+    order_hash,
+    maker,
+    maker_amount,
+    maker_fee,
+    maker_token,
+    taker,
+    taker_amount,
+    taker_fee,
+    taker_token,
+    txn_hash,
+    fee_recipient,
+    block_number,
+    log_index
+)
+(
+    SELECT
+        'LogFill',
+        null,
+        "orderHash",
+        "makerAddress",
+        "makerAssetFilledAmount"::numeric(78),
+        "makerFeePaid"::numeric(78),
+        "makerTokenAddress",
+        "takerAddress",
+        "takerAssetFilledAmount"::numeric(78),
+        "takerFeePaid"::numeric(78),
+        "takerTokenAddress",
+        "transactionHash",
+        "feeRecipientAddress",
+        "blockNumber",
+        "logIndex"
+    FROM exchange_fill_event
+) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`;
+
+async function mergeExchangeEventsAsync(): Promise<void> {
+    console.log('Merging results into events_raw...');
+    await connection.query(insertEventsRawQuery);
+}
diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts
new file mode 100644
index 000000000..1f71722a3
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_missing_events.ts
@@ -0,0 +1,60 @@
+import { web3Factory } from '@0x/dev-utils';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import R = require('ramda');
+import 'reflect-metadata';
+import { Connection, createConnection, Repository } from 'typeorm';
+
+import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events';
+import { ExchangeFillEvent } from '../entities/ExchangeFillEvent';
+import { deployConfig } from '../ormconfig';
+import { parseExchangeEvents } from '../parsers/events';
+
+const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet.
+const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events.
+const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
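EXCHANGE_START_BLOCK and START_BLOCK_OFFSET together define the incremental-pull window: each run resumes a fixed distance behind the newest block already stored, so re-orged or partially indexed ranges are re-fetched instead of skipped. A distilled sketch of that resume rule, mirroring getStartBlockAsync below (the clamp to EXCHANGE_START_BLOCK is an added guard, not in the script itself):

    // Sketch only: resume a safety margin behind the newest stored block.
    function computeStartBlock(lastKnownBlock: number | undefined): number {
        if (lastKnownBlock === undefined) {
            return EXCHANGE_START_BLOCK; // first run: start at contract deployment
        }
        return Math.max(EXCHANGE_START_BLOCK, lastKnownBlock - START_BLOCK_OFFSET);
    }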
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(deployConfig);
+    const provider = web3Factory.getRpcProvider({
+        rpcUrl: 'https://mainnet.infura.io',
+    });
+    await getExchangeEventsAsync(provider);
+    process.exit(0);
+})();
+
+async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise<void> {
+    console.log('Checking existing event logs...');
+    const eventsRepository = connection.getRepository(ExchangeFillEvent);
+    const startBlock = await getStartBlockAsync(eventsRepository);
+    console.log(`Getting event logs starting at ${startBlock}...`);
+    const exchangeEvents = new ExchangeEventsSource(provider, 1);
+    const eventLogs = await exchangeEvents.getFillEventsAsync(startBlock);
+    console.log('Parsing events...');
+    const events = parseExchangeEvents(eventLogs);
+    console.log(`Retrieved and parsed ${events.length} total events.`);
+    console.log('Saving events...');
+    // Split the events into batches of size BATCH_SAVE_SIZE and save each batch
+    // in a single request. This reduces round-trip latency to the DB. We need
+    // to batch this way because saving an extremely large number of events in a
+    // single request causes problems.
+    for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) {
+        await eventsRepository.save(eventsBatch);
+    }
+    const totalEvents = await eventsRepository.count();
+    console.log(`Done saving events. There are now ${totalEvents} total events.`);
+}
+
+async function getStartBlockAsync(eventsRepository: Repository<ExchangeFillEvent>): Promise<number> {
+    const fillEventCount = await eventsRepository.count();
+    if (fillEventCount === 0) {
+        console.log('No existing fill events found.');
+        return EXCHANGE_START_BLOCK;
+    }
+    const queryResult = await connection.query(
+        'SELECT "blockNumber" FROM exchange_fill_event ORDER BY "blockNumber" DESC LIMIT 1',
+    );
+    const lastKnownBlock = queryResult[0].blockNumber;
+    return lastKnownBlock - START_BLOCK_OFFSET;
+}
diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts
new file mode 100644
index 000000000..05e045ff4
--- /dev/null
+++ b/packages/pipeline/src/scripts/update_relayer_info.ts
@@ -0,0 +1,31 @@
+import 'reflect-metadata';
+import { Connection, createConnection } from 'typeorm';
+
+import { RelayerRegistrySource } from '../data_sources/relayer-registry';
+import { Relayer } from '../entities/Relayer';
+import { deployConfig } from '../ormconfig';
+import { parseRelayers } from '../parsers/relayer_registry';
+
+// NOTE(albrow): We need to manually update this URL for now. Fix this when we
+// have the relayer-registry behind semantic versioning.
+const RELAYER_REGISTRY_URL = + 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/4701c85677d161ea729a466aebbc1826c6aa2c0b/relayers.json'; + +let connection: Connection; + +(async () => { + connection = await createConnection(deployConfig); + await getRelayers(); + process.exit(0); +})(); + +async function getRelayers(): Promise { + console.log('Getting latest relayer info...'); + const relayerRepository = connection.getRepository(Relayer); + const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL); + const relayersResp = await relayerSource.getRelayerInfoAsync(); + const relayers = parseRelayers(relayersResp); + console.log('Saving relayer info...'); + await relayerRepository.save(relayers); + console.log('Done saving relayer info.'); +} -- cgit v1.2.3 From ccad046eb649a60fdf7319a075fa41490d593ae8 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Thu, 8 Nov 2018 10:15:09 -0800 Subject: Reorganize entities. Make scripts work from any directory. --- packages/pipeline/src/scripts/merge_v2_events.ts | 2 +- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- packages/pipeline/src/scripts/update_relayer_info.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/merge_v2_events.ts b/packages/pipeline/src/scripts/merge_v2_events.ts index 227ece121..99a76aa61 100644 --- a/packages/pipeline/src/scripts/merge_v2_events.ts +++ b/packages/pipeline/src/scripts/merge_v2_events.ts @@ -3,7 +3,7 @@ import 'reflect-metadata'; import { Connection, createConnection } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities/ExchangeFillEvent'; +import { ExchangeFillEvent } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 1f71722a3..a108f012f 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -5,7 +5,7 @@ import 'reflect-metadata'; import { Connection, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities/ExchangeFillEvent'; +import { ExchangeFillEvent } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index 05e045ff4..f54e16b6c 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -2,7 +2,7 @@ import 'reflect-metadata'; import { Connection, createConnection } from 'typeorm'; import { RelayerRegistrySource } from '../data_sources/relayer-registry'; -import { Relayer } from '../entities/Relayer'; +import { Relayer } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseRelayers } from '../parsers/relayer_registry'; -- cgit v1.2.3 From 410a9244952b663fa8e56cc778b712ae228bbd82 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Thu, 8 Nov 2018 11:38:37 -0800 Subject: Add better error handling for immediately invoked async functions --- packages/pipeline/src/scripts/merge_v2_events.ts | 81 
---------------------- .../pipeline/src/scripts/pull_missing_events.ts | 7 +- .../pipeline/src/scripts/update_relayer_info.ts | 3 +- 3 files changed, 6 insertions(+), 85 deletions(-) delete mode 100644 packages/pipeline/src/scripts/merge_v2_events.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/merge_v2_events.ts b/packages/pipeline/src/scripts/merge_v2_events.ts deleted file mode 100644 index 99a76aa61..000000000 --- a/packages/pipeline/src/scripts/merge_v2_events.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { web3Factory } from '@0x/dev-utils'; -import 'reflect-metadata'; -import { Connection, createConnection } from 'typeorm'; - -import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities'; -import { deployConfig } from '../ormconfig'; -import { parseExchangeEvents } from '../parsers/events'; - -let connection: Connection; - -(async () => { - connection = await createConnection(deployConfig); - await getExchangeEventsAsync(); - await mergeExchangeEventsAsync(); - console.log('Exiting process'); - process.exit(0); -})(); - -// TODO(albrow): Separately: Errors do not appear to be handled correctly. If you use the -// wrong rpcUrl it just returns early with no error. -async function getExchangeEventsAsync(): Promise { - console.log('Getting event logs...'); - const provider = web3Factory.getRpcProvider({ - rpcUrl: 'https://mainnet.infura.io', - }); - const eventsRepository = connection.getRepository(ExchangeFillEvent); - const exchangeEvents = new ExchangeEventsSource(provider, 1); - const eventLogs = await exchangeEvents.getFillEventsAsync(); - console.log('Parsing events...'); - const events = parseExchangeEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total events.`); - console.log('Saving events...'); - for (const event of events) { - await eventsRepository.save(event); - } - await eventsRepository.save(events); - console.log('Saved events.'); -} - -const insertEventsRawQuery = `INSERT INTO events_raw ( - event_type, - error_id, - order_hash, - maker, - maker_amount, - maker_fee, - maker_token, - taker, - taker_amount, - taker_fee, - taker_token, - txn_hash, - fee_recipient, - block_number, - log_index -) -( - SELECT - 'LogFill', - null, - "orderHash", - "makerAddress", - "makerAssetFilledAmount"::numeric(78), - "makerFeePaid"::numeric(78), - "makerTokenAddress", - "takerAddress", - "takerAssetFilledAmount"::numeric(78), - "takerFeePaid"::numeric(78), - "takerTokenAddress", - "transactionHash", - "feeRecipientAddress", - "blockNumber", - "logIndex" - FROM exchange_fill_event -) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`; - -async function mergeExchangeEventsAsync(): Promise { - console.log('Merging results into events_raw...'); - await connection.query(insertEventsRawQuery); -} diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index a108f012f..cca0d9cfe 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -8,6 +8,7 @@ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange import { ExchangeFillEvent } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; +import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to 
mainnet. const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. @@ -22,7 +23,7 @@ let connection: Connection; }); await getExchangeEventsAsync(provider); process.exit(0); -})(); +})().catch(handleError); async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { console.log('Checking existing event logs...'); @@ -53,8 +54,8 @@ async function getStartBlockAsync(eventsRepository: Repository { console.log('Getting latest relayer info...'); -- cgit v1.2.3 From 329c68f610843ebded9ca31fc9cd6f3eed744a8e Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 12 Nov 2018 16:40:20 -0800 Subject: Configure TypeORM for migrations. Add new package.json scripts. --- packages/pipeline/src/scripts/pull_missing_events.ts | 6 +++--- packages/pipeline/src/scripts/update_relayer_info.ts | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index cca0d9cfe..e2b312280 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -2,11 +2,11 @@ import { web3Factory } from '@0x/dev-utils'; import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); import 'reflect-metadata'; -import { Connection, createConnection, Repository } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; import { ExchangeFillEvent } from '../entities'; -import { deployConfig } from '../ormconfig'; +import * as ormConfig from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; import { handleError } from '../utils'; @@ -17,7 +17,7 @@ const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. 
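This commit's subject mentions migration support, but the ormconfig module itself sits outside this diff. Since the scripts now do `import * as ormConfig`, the module's named exports are consumed as one namespace object, which is also why the `as ConnectionOptions` cast appears below: a namespace object is not statically typed as ConnectionOptions. A hypothetical shape for such a module (the env var name and glob paths are assumptions, not taken from this patch):

    // Hypothetical ormconfig.ts sketch; the TypeORM migration CLI can read the
    // same options that the scripts import at runtime.
    export const type = 'postgres';
    export const url = process.env.ZEROEX_DATA_PIPELINE_DB_URL;
    export const entities = ['./lib/entities/**/*.js'];
    export const migrations = ['./lib/migrations/**/*.js'];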
let connection: Connection; (async () => { - connection = await createConnection(deployConfig); + connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ rpcUrl: 'https://mainnet.infura.io', }); diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index 051289992..af9dd726e 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -1,9 +1,9 @@ import 'reflect-metadata'; -import { Connection, createConnection } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; import { RelayerRegistrySource } from '../data_sources/relayer-registry'; import { Relayer } from '../entities'; -import { deployConfig } from '../ormconfig'; +import * as ormConfig from '../ormconfig'; import { parseRelayers } from '../parsers/relayer_registry'; import { handleError } from '../utils'; @@ -15,7 +15,7 @@ const RELAYER_REGISTRY_URL = let connection: Connection; (async () => { - connection = await createConnection(deployConfig); + connection = await createConnection(ormConfig as ConnectionOptions); await getRelayers(); process.exit(0); })().catch(handleError); -- cgit v1.2.3 From 688d277b30b287f66f0dbd49f2a23cab8b256219 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 12 Nov 2018 17:36:33 -0800 Subject: Configure linter with --format stylish and fix linter errors --- packages/pipeline/src/scripts/pull_missing_events.ts | 1 + packages/pipeline/src/scripts/update_relayer_info.ts | 1 + 2 files changed, 2 insertions(+) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index e2b312280..bc0eac853 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -1,3 +1,4 @@ +// tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index af9dd726e..f8918728d 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -1,3 +1,4 @@ +// tslint:disable:no-console import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection } from 'typeorm'; -- cgit v1.2.3 From 55bbe1954b35ff0a6367f1ff820d32a32b48eff3 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 13 Nov 2018 14:05:49 -0800 Subject: Preliminary work for adding RR order book scraping --- .../src/scripts/pull_radar_relay_orders.ts | 52 ++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_radar_relay_orders.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts new file mode 100644 index 000000000..c4d3f7095 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -0,0 +1,52 @@ +// tslint:disable:no-console +import { HttpClient } from '@0x/connect'; +import * as R from 'ramda'; +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { SraOrder } from '../entities'; +import * as ormConfig from 
'../ormconfig'; +import { parseSraOrders } from '../parsers/sra_orders'; +import { handleError } from '../utils'; + +const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; +const BATCH_SAVE_SIZE = 1000; // Number of orders to save at once. +const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + await getOrderbook(); + process.exit(0); +})().catch(handleError); + +async function getOrderbook(): Promise { + console.log('Getting all orders...'); + const connectClient = new HttpClient(RADAR_RELAY_URL); + const rawOrders = await connectClient.getOrdersAsync({ + perPage: ORDERS_PER_PAGE, + }); + console.log(`Got ${rawOrders.records.length} orders.`); + console.log('Parsing orders...'); + const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); + const ordersRepository = connection.getRepository(SraOrder); + // TODO(albrow): Move batch saving to a utility function to reduce + // duplicated code. + for (const ordersBatch of R.splitEvery(BATCH_SAVE_SIZE, orders)) { + await ordersRepository.save(ordersBatch); + } +} + +const sourceUrlProp = R.lensProp('sourceUrl'); + +const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { + return R.set(sourceUrlProp, sourceURL, order); +}); + +const firstSeenProp = R.lensProp('firstSeenTimestamp'); +const lastUpdatedProp = R.lensProp('lastUpdatedTimestamp'); + +const setFirstSeen = R.curry((sourceURL: string, order: SraOrder): SraOrder => { + return R.set(firstSeenTimestampProp, sourceURL, order); +}); -- cgit v1.2.3 From 26280e4aba147ad6000b9df309e64db84b6932fc Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 13 Nov 2018 15:33:43 -0800 Subject: Implement scraping sra orders from radar relay --- .../src/scripts/pull_radar_relay_orders.ts | 33 +++++++++++----------- 1 file changed, 17 insertions(+), 16 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts index c4d3f7095..b3a4d887e 100644 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -2,15 +2,14 @@ import { HttpClient } from '@0x/connect'; import * as R from 'ramda'; import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm'; -import { SraOrder } from '../entities'; +import { createObservedTimestampForOrder, SraOrder } from '../entities'; import * as ormConfig from '../ormconfig'; import { parseSraOrders } from '../parsers/sra_orders'; import { handleError } from '../utils'; const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; -const BATCH_SAVE_SIZE = 1000; // Number of orders to save at once. const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. let connection: Connection; @@ -29,24 +28,26 @@ async function getOrderbook(): Promise { }); console.log(`Got ${rawOrders.records.length} orders.`); console.log('Parsing orders...'); + // Parse the sra orders, then add source url to each. const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); - const ordersRepository = connection.getRepository(SraOrder); - // TODO(albrow): Move batch saving to a utility function to reduce - // duplicated code. 
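The TODO being removed here, introduced when the file was created, asked for a shared batch-saving helper; this commit switches to a per-order transaction instead. For reference, a sketch of the generic utility that TODO envisioned, assuming only Ramda and a TypeORM repository:

    // Sketch only: save entities in fixed-size chunks to bound request size
    // and cut round trips, per the BATCH_SAVE_SIZE pattern used elsewhere.
    import * as R from 'ramda';
    import { Repository } from 'typeorm';

    async function batchSaveAsync<T>(repository: Repository<T>, entities: T[], batchSize: number): Promise<void> {
        for (const batch of R.splitEvery(batchSize, entities)) {
            // Cast: TypeORM's save() is typed for DeepPartial<T>[] inputs.
            await repository.save(batch as any[]);
        }
    }

A call site would read `await batchSaveAsync(ordersRepository, orders, BATCH_SAVE_SIZE);`.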
- for (const ordersBatch of R.splitEvery(BATCH_SAVE_SIZE, orders)) { - await ordersRepository.save(ordersBatch); - } + // Save all the orders and update the observed time stamps in a single + // transaction. + console.log('Saving orders and updating timestamps...'); + await connection.transaction(async (manager: EntityManager): Promise => { + for (const order of orders) { + await manager.save(SraOrder, order); + const observedTimestamp = createObservedTimestampForOrder(order); + await manager.save(observedTimestamp); + } + }); } const sourceUrlProp = R.lensProp('sourceUrl'); +/** + * Sets the source url for a single order. Returns a new order instead of + * mutating the given one. + */ const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { return R.set(sourceUrlProp, sourceURL, order); }); - -const firstSeenProp = R.lensProp('firstSeenTimestamp'); -const lastUpdatedProp = R.lensProp('lastUpdatedTimestamp'); - -const setFirstSeen = R.curry((sourceURL: string, order: SraOrder): SraOrder => { - return R.set(firstSeenTimestampProp, sourceURL, order); -}); -- cgit v1.2.3 From 10e93bb01ffcf029821430781ef582a24901a461 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 14 Nov 2018 14:39:34 -0800 Subject: Add raw schema prefix to query in pull_missing_events --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index bc0eac853..0af999a77 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -55,7 +55,7 @@ async function getStartBlockAsync(eventsRepository: Repository Date: Wed, 14 Nov 2018 15:58:36 -0800 Subject: Change some column types from varchar to numeric --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 0af999a77..1693bb59a 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -13,7 +13,7 @@ import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BATCH_SAVE_SIZE = 10000; // Number of events to save at once. 
let connection: Connection; -- cgit v1.2.3 From b0a2c10e11fa41e2800d4fe67d8008b5828128a3 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 14 Nov 2018 16:20:07 -0800 Subject: Use built-in chunk feature of TypeORM save method --- packages/pipeline/src/scripts/pull_missing_events.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 1693bb59a..b1b8665dd 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -29,6 +29,7 @@ let connection: Connection; async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { console.log('Checking existing event logs...'); const eventsRepository = connection.getRepository(ExchangeFillEvent); + const manager = connection.createEntityManager(); const startBlock = await getStartBlockAsync(eventsRepository); console.log(`Getting event logs starting at ${startBlock}...`); const exchangeEvents = new ExchangeEventsSource(provider, 1); @@ -37,13 +38,7 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise Date: Wed, 14 Nov 2018 17:40:19 -0800 Subject: Fix chunk size in pull_missing_events --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index b1b8665dd..f53bc12bd 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -38,7 +38,7 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise Date: Wed, 14 Nov 2018 18:39:06 -0800 Subject: Add workaround for broken save method --- .../pipeline/src/scripts/pull_missing_events.ts | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index f53bc12bd..2dc81f205 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -38,7 +38,27 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise Date: Thu, 15 Nov 2018 12:10:27 -0800 Subject: Optimize database operations in pull_missing_events script --- .../pipeline/src/scripts/pull_missing_events.ts | 35 ++++++++++++++++------ 1 file changed, 26 insertions(+), 9 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 2dc81f205..bceed299c 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -8,12 +8,12 @@ import { Connection, ConnectionOptions, createConnection, Repository } from 'typ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; import { ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; -import { parseExchangeEvents } from '../parsers/events'; +import { ExchangeEventEntity, parseExchangeEvents } from '../parsers/events'; import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the 
Exchange contract was deployed to mainnet. -const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 10000; // Number of events to save at once. +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. let connection: Connection; @@ -38,17 +38,36 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise, + events: ExchangeEventEntity[], +): Promise { // Note(albrow): This is a temporary hack because `save` is not working as // documented and is causing a foreign key constraint violation. Hopefully // can remove later because this "poor man's upsert" implementation operates // on one event at a time and is therefore much slower. - // await eventsRepository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); for (const event of events) { try { - await eventsRepository.save(event); + // First try and insert. + await eventsRepository.insert(event); } catch { - // Assume this is a foreign key constraint error and try doing an - // update instead. + // If it fails, assume it was a foreign key constraint error and try + // doing an update instead. await eventsRepository.update( { contractAddress: event.contractAddress, @@ -59,8 +78,6 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise): Promise { -- cgit v1.2.3 From 24fd2d9730d58a58929f401674175ad8a5a7fbc1 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Fri, 16 Nov 2018 12:55:54 -0800 Subject: Add support for pulling Cancel and CancelUpTo events --- .../pipeline/src/scripts/pull_missing_events.ts | 123 ++++++++++++++------- 1 file changed, 81 insertions(+), 42 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index bceed299c..b2a99e3c0 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -1,14 +1,13 @@ // tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; -import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities'; +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; -import { ExchangeEventEntity, parseExchangeEvents } from '../parsers/events'; +import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. 
@@ -22,40 +21,88 @@ let connection: Connection; const provider = web3Factory.getRpcProvider({ rpcUrl: 'https://mainnet.infura.io', }); - await getExchangeEventsAsync(provider); + const eventsSource = new ExchangeEventsSource(provider, 1); + await getFillEventsAsync(eventsSource); + await getCancelEventsAsync(eventsSource); + await getCancelUpToEventsAsync(eventsSource); process.exit(0); })().catch(handleError); -async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { - console.log('Checking existing event logs...'); - const eventsRepository = connection.getRepository(ExchangeFillEvent); - const manager = connection.createEntityManager(); - const startBlock = await getStartBlockAsync(eventsRepository); - console.log(`Getting event logs starting at ${startBlock}...`); - const exchangeEvents = new ExchangeEventsSource(provider, 1); - const eventLogs = await exchangeEvents.getFillEventsAsync(startBlock); - console.log('Parsing events...'); - const events = parseExchangeEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total events.`); - console.log('Saving events...'); - if (startBlock === EXCHANGE_START_BLOCK) { +async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing fill events...'); + const repository = connection.getRepository(ExchangeFillEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting fill events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock); + console.log('Parsing fill events...'); + const events = parseExchangeFillEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total fill events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing cancel events...'); + const repository = connection.getRepository(ExchangeCancelEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting cancel events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); + console.log('Parsing cancel events...'); + const events = parseExchangeCancelEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total cancel events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing CancelUpTo events...'); + const repository = connection.getRepository(ExchangeCancelUpToEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting CancelUpTo events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); + console.log('Parsing CancelUpTo events...'); + const events = parseExchangeCancelUpToEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getStartBlockAsync(repository: Repository): Promise { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing ${repository.metadata.name}s found.`); + return EXCHANGE_START_BLOCK; + } + const queryResult = await connection.query( + // TODO(albrow): Would prefer to use a prepared statement here 
to reduce + // surface area for SQL injections, but it doesn't appear to be working. + `SELECT block_number FROM raw.${repository.metadata.tableName} ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} + +async function saveEventsAsync( + isInitialPull: boolean, + repository: Repository, + events: T[], +): Promise { + console.log(`Saving ${repository.metadata.name}s...`); + if (isInitialPull) { // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE // each. for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { - await eventsRepository.insert(eventsBatch); + await repository.insert(eventsBatch); } } else { // If we possibly have some overlap where we need to update some // existing events, we need to use our workaround/fallback. - await saveIndividuallyWithFallbackAsync(eventsRepository, events); + await saveIndividuallyWithFallbackAsync(repository, events); } - const totalEvents = await eventsRepository.count(); - console.log(`Done saving events. There are now ${totalEvents} total events.`); + const totalEvents = await repository.count(); + console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); } -async function saveIndividuallyWithFallbackAsync( - eventsRepository: Repository, - events: ExchangeEventEntity[], +async function saveIndividuallyWithFallbackAsync( + repository: Repository, + events: T[], ): Promise { // Note(albrow): This is a temporary hack because `save` is not working as // documented and is causing a foreign key constraint violation. Hopefully @@ -63,32 +110,24 @@ async function saveIndividuallyWithFallbackAsync( // on one event at a time and is therefore much slower. for (const event of events) { try { - // First try and insert. - await eventsRepository.insert(event); + // First try an insert. + await repository.insert(event); } catch { // If it fails, assume it was a foreign key constraint error and try // doing an update instead. - await eventsRepository.update( + // Note(albrow): Unfortunately the `as any` hack here seems + // required. I can't figure out how to convince the type-checker + // that the criteria and the entity itself are the correct type for + // the given repository. If we can remove the `save` hack then this + // will probably no longer be necessary. 
+ await repository.update( { contractAddress: event.contractAddress, blockNumber: event.blockNumber, logIndex: event.logIndex, - }, - event, + } as any, + event as any, ); } } } - -async function getStartBlockAsync(eventsRepository: Repository): Promise { - const fillEventCount = await eventsRepository.count(); - if (fillEventCount === 0) { - console.log('No existing fill events found.'); - return EXCHANGE_START_BLOCK; - } - const queryResult = await connection.query( - 'SELECT block_number FROM raw.exchange_fill_events ORDER BY block_number DESC LIMIT 1', - ); - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} -- cgit v1.2.3 From 5cad2ad1744ab1c1e24ed52fc0a26ec5acf5c898 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Fri, 16 Nov 2018 13:16:17 -0800 Subject: Check for special characters in table name in pull_missing_events --- packages/pipeline/src/scripts/pull_missing_events.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index b2a99e3c0..0b7f6287f 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -64,16 +64,20 @@ async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Pro await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } +const tabelNameRegex = /^[a-zA-Z_]*$/; + async function getStartBlockAsync(repository: Repository): Promise { const fillEventCount = await repository.count(); if (fillEventCount === 0) { console.log(`No existing ${repository.metadata.name}s found.`); return EXCHANGE_START_BLOCK; } + const tableName = repository.metadata.tableName; + if (!tabelNameRegex.test(tableName)) { + throw new Error('Unexpected special character in table name: ' + tableName); + } const queryResult = await connection.query( - // TODO(albrow): Would prefer to use a prepared statement here to reduce - // surface area for SQL injections, but it doesn't appear to be working. 
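The removed TODO hoped for a prepared statement, but Postgres only binds values, never identifiers such as table names, so interpolation guarded by the whitelist regex added in this commit is the usual workaround. The same idea as a reusable helper (hypothetical, not part of this diff):

    // Sketch only: validate an identifier before interpolating it into SQL,
    // since identifiers cannot be bound as query parameters.
    const IDENTIFIER_REGEX = /^[a-zA-Z_]*$/;
    function assertSafeIdentifier(name: string): void {
        if (!IDENTIFIER_REGEX.test(name)) {
            throw new Error('Unexpected special character in identifier: ' + name);
        }
    }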
- `SELECT block_number FROM raw.${repository.metadata.tableName} ORDER BY block_number DESC LIMIT 1`, + `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, ); const lastKnownBlock = queryResult[0].block_number; return lastKnownBlock - START_BLOCK_OFFSET; -- cgit v1.2.3 From 9986717671fe8e14c2168f7479bdaffe406bedc0 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 19 Nov 2018 18:38:11 -0800 Subject: Add script for pulling missing block data --- .../pipeline/src/scripts/pull_missing_blocks.ts | 83 ++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_missing_blocks.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts new file mode 100644 index 000000000..4a1483ab9 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -0,0 +1,83 @@ +// tslint:disable:no-console +import { web3Factory } from '@0x/dev-utils'; +import * as Parallel from 'async-parallel'; +import R = require('ramda'); +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { Web3Source } from '../data_sources/web3'; +import { Block } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseBlock } from '../parsers/web3'; +import { handleError } from '../utils'; + +// Number of blocks to save at once. +const BATCH_SAVE_SIZE = 1000; +// Maximum number of requests to send at once. +const MAX_CONCURRENCY = 10; +// Maximum number of blocks to query for at once. This is also the maximum +// number of blocks we will hold in memory prior to being saved to the database. +const MAX_BLOCKS_PER_QUERY = 1000; +// Block number when the Exchange contract was deployed to mainnet. +// TODO(albrow): De-dupe this constant. +const EXCHANGE_START_BLOCK = 6271590; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: `https://mainnet.infura.io/${process.env.INFURA_API_KEY}`, + }); + const web3Source = new Web3Source(provider); + await getAllMissingBlocks(web3Source); + process.exit(0); +})().catch(handleError); + +interface MissingBlocksResponse { + block_number: string; +} + +async function getAllMissingBlocks(web3Source: Web3Source): Promise { + const blocksRepository = connection.getRepository(Block); + let fromBlock = EXCHANGE_START_BLOCK; + while (true) { + const blockNumbers = await getMissingBlockNumbers(fromBlock); + if (blockNumbers.length === 0) { + // There are no more missing blocks. We're done. + break; + } + await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers); + fromBlock = Math.max(...blockNumbers) + 1; + } + const totalBlocks = await blocksRepository.count(); + console.log(`Done saving blocks. 
There are now ${totalBlocks} total blocks.`); +} + +async function getMissingBlockNumbers(fromBlock: number): Promise { + console.log(`Checking for missing blocks starting at ${fromBlock}...`); + const response = (await connection.query( + 'SELECT DISTINCT(block_number) FROM raw.exchange_fill_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) AND block_number >= $1 ORDER BY block_number ASC LIMIT $2', + [fromBlock, MAX_BLOCKS_PER_QUERY], + )) as MissingBlocksResponse[]; + const blockNumberStrings = R.pluck('block_number', response); + const blockNumbers = R.map(parseInt, blockNumberStrings); + console.log(`Found ${blockNumbers.length} missing blocks in the given range.`); + return blockNumbers; +} + +async function getAndSaveBlocks( + web3Source: Web3Source, + blocksRepository: Repository, + blockNumbers: number[], +): Promise { + console.log(`Getting block data for ${blockNumbers.length} blocks...`); + Parallel.setConcurrency(MAX_CONCURRENCY); + const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) => + web3Source.getBlockInfoAsync(blockNumber), + ); + console.log(`Parsing ${rawBlocks.length} blocks...`); + const blocks = R.map(parseBlock, rawBlocks); + console.log(`Saving ${blocks.length} blocks...`); + await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); +} -- cgit v1.2.3 From c6af5131b0b06433d6294260274e187ad61f4ef7 Mon Sep 17 00:00:00 2001 From: Jake Ellowitz Date: Mon, 19 Nov 2018 16:24:07 -0800 Subject: Pull token metadata re trusted tokens --- .../pipeline/src/scripts/pull_missing_events.ts | 4 +- .../pipeline/src/scripts/pull_trusted_tokens.ts | 51 ++++++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 packages/pipeline/src/scripts/pull_trusted_tokens.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 0b7f6287f..68cabe3de 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -64,7 +64,7 @@ async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Pro await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } -const tabelNameRegex = /^[a-zA-Z_]*$/; +const tableNameRegex = /^[a-zA-Z_]*$/; async function getStartBlockAsync(repository: Repository): Promise { const fillEventCount = await repository.count(); @@ -73,7 +73,7 @@ async function getStartBlockAsync(repository: Repositor return EXCHANGE_START_BLOCK; } const tableName = repository.metadata.tableName; - if (!tabelNameRegex.test(tableName)) { + if (!tableNameRegex.test(tableName)) { throw new Error('Unexpected special character in table name: ' + tableName); } const queryResult = await connection.query( diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts new file mode 100644 index 000000000..67d9e08d1 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -0,0 +1,51 @@ +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens'; +import { TrustedToken } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/trusted_tokens'; +import { 
handleError } from '../utils'; + +const METAMASK_TRUSTED_TOKENS_URL = + 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; + +const ZEROEX_TRUSTED_TOKENS_URL = + 'https://website-api.0xproject.com/tokens'; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + await getMetamaskTrustedTokens(); + await getZeroExTrustedTokens(); + process.exit(0); +})().catch(handleError); + +async function getMetamaskTrustedTokens(): Promise<void> { + // tslint:disable-next-line + console.log('Getting latest metamask trusted tokens list ...'); + const trustedTokensRepository = connection.getRepository(TrustedToken); + const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(METAMASK_TRUSTED_TOKENS_URL); + const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); + const trustedTokens = parseMetamaskTrustedTokens(resp); + // tslint:disable-next-line + console.log('Saving metamask trusted tokens list'); + await trustedTokensRepository.save(trustedTokens); + // tslint:disable-next-line + console.log('Done saving metamask trusted tokens.') +} + +async function getZeroExTrustedTokens(): Promise<void> { + // tslint:disable-next-line + console.log('Getting latest 0x trusted tokens list ...'); + const trustedTokensRepository = connection.getRepository(TrustedToken); + const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL); + const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); + const trustedTokens = parseZeroExTrustedTokens(resp); + // tslint:disable-next-line + console.log('Saving metamask trusted tokens list'); + await trustedTokensRepository.save(trustedTokens); + // tslint:disable-next-line + console.log('Done saving metamask trusted tokens.'); +} -- cgit v1.2.3 From dea89c4e221d5b22de97b27573719cd27ce250c7 Mon Sep 17 00:00:00 2001 From: Jake Ellowitz Date: Mon, 19 Nov 2018 19:11:51 -0800 Subject: metadata and trusted sources in same raw table --- packages/pipeline/src/scripts/pull_trusted_tokens.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts index 67d9e08d1..48e20bea7 100644 --- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -2,9 +2,9 @@ import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection } from 'typeorm'; import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens'; -import { TrustedToken } from '../entities'; +import { TokenMetadata } from '../entities'; import * as ormConfig from '../ormconfig'; -import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/trusted_tokens'; +import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/token_metadata'; import { handleError } from '../utils'; const METAMASK_TRUSTED_TOKENS_URL = @@ -25,7 +25,7 @@ let connection: Connection; async function getMetamaskTrustedTokens(): Promise<void> { // tslint:disable-next-line console.log('Getting latest metamask trusted tokens list ...'); - const trustedTokensRepository = connection.getRepository(TrustedToken); + const trustedTokensRepository = connection.getRepository(TokenMetadata); const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(METAMASK_TRUSTED_TOKENS_URL); const resp = await
trustedTokensSource.getTrustedTokenMetaAsync(); const trustedTokens = parseMetamaskTrustedTokens(resp); @@ -39,7 +39,7 @@ async function getMetamaskTrustedTokens(): Promise<void> { async function getZeroExTrustedTokens(): Promise<void> { // tslint:disable-next-line console.log('Getting latest 0x trusted tokens list ...'); - const trustedTokensRepository = connection.getRepository(TrustedToken); + const trustedTokensRepository = connection.getRepository(TokenMetadata); const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL); const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); const trustedTokens = parseZeroExTrustedTokens(resp); -- cgit v1.2.3 From 1aa3f9d69f7a6570c6fa62c542063fa92fc0bf5a Mon Sep 17 00:00:00 2001 From: Jake Ellowitz Date: Mon, 26 Nov 2018 14:55:45 -0800 Subject: updating comment for 0x trusted tokens --- packages/pipeline/src/scripts/pull_trusted_tokens.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts index 48e20bea7..b3a4466bb 100644 --- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -10,8 +10,7 @@ import { handleError } from '../utils'; const METAMASK_TRUSTED_TOKENS_URL = 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; -const ZEROEX_TRUSTED_TOKENS_URL = - 'https://website-api.0xproject.com/tokens'; +const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens'; let connection: Connection; @@ -26,14 +25,16 @@ async function getMetamaskTrustedTokens(): Promise<void> { // tslint:disable-next-line console.log('Getting latest metamask trusted tokens list ...'); const trustedTokensRepository = connection.getRepository(TokenMetadata); - const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(METAMASK_TRUSTED_TOKENS_URL); + const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>( + METAMASK_TRUSTED_TOKENS_URL, + ); const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); const trustedTokens = parseMetamaskTrustedTokens(resp); // tslint:disable-next-line console.log('Saving metamask trusted tokens list'); await trustedTokensRepository.save(trustedTokens); // tslint:disable-next-line - console.log('Done saving metamask trusted tokens.') + console.log('Done saving metamask trusted tokens.'); } async function getZeroExTrustedTokens(): Promise<void> { -- cgit v1.2.3 From 7198b441e0d85785eec7244dd60bcd92269d954e Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Thu, 29 Nov 2018 11:40:09 -0800 Subject: Add script for parsing competing dex trades from Bloxy (#1355) --- .../src/scripts/pull_competing_dex_trades.ts | 51 ++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_competing_dex_trades.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts new file mode 100644 index 000000000..4e4c12dd0 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts @@ -0,0 +1,51 @@ +// tslint:disable:no-console +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { BloxySource } from '../data_sources/bloxy'; +import { DexTrade } from '../entities'; +import * as ormConfig from '../ormconfig'; +import {
parseBloxyTrades } from '../parsers/bloxy'; +import { handleError } from '../utils'; + +// Number of trades to save at once. +const BATCH_SAVE_SIZE = 1000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + await getAndSaveTrades(); + process.exit(0); +})().catch(handleError); + +async function getAndSaveTrades(): Promise { + const apiKey = process.env.BLOXY_API_KEY; + if (apiKey === undefined) { + throw new Error('Missing required env var: BLOXY_API_KEY'); + } + const bloxySource = new BloxySource(apiKey); + const tradesRepository = connection.getRepository(DexTrade); + const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository); + console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`); + console.log('Getting latest dex trades...'); + const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp); + console.log(`Parsing ${rawTrades.length} trades...`); + const trades = parseBloxyTrades(rawTrades); + console.log(`Saving ${trades.length} trades...`); + await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) }); + console.log('Done saving trades.'); +} + +async function getLastSeenTimestampAsync(tradesRepository: Repository): Promise { + if ((await tradesRepository.count()) === 0) { + return 0; + } + const response = (await connection.query( + 'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1', + )) as Array<{ tx_timestamp: number }>; + if (response.length === 0) { + return 0; + } + return response[0].tx_timestamp; +} -- cgit v1.2.3 From 87ffa5d7ab19d2288bf68131a7e7ec77578c564c Mon Sep 17 00:00:00 2001 From: zkao Date: Tue, 4 Dec 2018 13:21:46 -0800 Subject: Token_orderbook_snapshots for Ddex and Paradex(#1354) * Implements the TokenOrderbookSnapshot Table * Scripts, Data Sources and Entities to pull Ddex and Paradex API data. --- .../src/scripts/pull_ddex_orderbook_snapshots.ts | 55 ++++++++++++++ .../scripts/pull_paradex_orderbook_snapshots.ts | 87 ++++++++++++++++++++++ 2 files changed, 142 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts create mode 100644 packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts new file mode 100644 index 000000000..b02468e9b --- /dev/null +++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts @@ -0,0 +1,55 @@ +import { logUtils } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseDdexOrders } from '../parsers/ddex_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +// Number of markets to retrieve orderbooks for at once. +const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; + +// Delay between market orderbook requests. 
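+// (The main loop below combines three throttling knobs: markets are chunked
+// with R.splitEvery, each chunk is fetched concurrently via Promise.all, and
+// the delay below is awaited between chunks. A minimal sketch of the same
+// pattern in isolation, with illustrative names:
+//
+//     for (const chunk of R.splitEvery(batchSize, markets)) {
+//         await Promise.all(chunk.map(fetchAndSaveOne));
+//         await new Promise(resolve => setTimeout(resolve, delayMs));
+//     }
+// )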
+const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const ddexSource = new DdexSource(); + const markets = await ddexSource.getActiveMarketsAsync(); + for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { + await Promise.all( + marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)), + ); + await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + } + process.exit(0); +})().catch(handleError); + +/** + * Retrieve orderbook from Ddex API for a given market. Parse orders and insert + * them into our database. + * @param ddexSource Data source which can query Ddex API. + * @param market Object from Ddex API containing market data. + */ +async function getAndSaveMarketOrderbook(ddexSource: DdexSource, market: DdexMarket): Promise { + const orderBook = await ddexSource.getMarketOrderbookAsync(market.id); + const observedTimestamp = Date.now(); + + logUtils.log(`${market.id}: Parsing orders.`); + const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${market.id}: Saving ${orders.length} orders.`); + const TokenOrderRepository = connection.getRepository(TokenOrder); + await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${market.id}: 0 orders to save.`); + } +} diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts new file mode 100644 index 000000000..bae1fbede --- /dev/null +++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts @@ -0,0 +1,87 @@ +import { logUtils } from '@0x/utils'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { + PARADEX_SOURCE, + ParadexActiveMarketsResponse, + ParadexMarket, + ParadexSource, + ParadexTokenInfoResponse, +} from '../data_sources/paradex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseParadexOrders } from '../parsers/paradex_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY; + if (apiKey === undefined) { + throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY'); + } + const paradexSource = new ParadexSource(apiKey); + const markets = await paradexSource.getActiveMarketsAsync(); + const tokenInfoResponse = await paradexSource.getTokenInfoAsync(); + const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse); + await Promise.all( + extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbook(paradexSource, market)), + ); + process.exit(0); +})().catch(handleError); + +/** + * Extend the default ParadexMarket objects with token addresses. + * @param markets An array of ParadexMarket objects. + * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses. 
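+ * @returns The same markets array, mutated so that baseTokenAddress and
+ *          quoteTokenAddress are filled in where a matching symbol exists.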
+ */ +function addTokenAddresses( + markets: ParadexActiveMarketsResponse, + tokenInfoResponse: ParadexTokenInfoResponse, +): ParadexMarket[] { + const symbolAddressMapping = new Map<string, string>(); + tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address)); + + markets.forEach((market: ParadexMarket) => { + if (symbolAddressMapping.has(market.baseToken)) { + market.baseTokenAddress = symbolAddressMapping.get(market.baseToken); + } else { + market.baseTokenAddress = ''; + logUtils.warn(`${market.baseToken}: No address found.`); + } + + if (symbolAddressMapping.has(market.quoteToken)) { + market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken); + } else { + market.quoteTokenAddress = ''; + logUtils.warn(`${market.quoteToken}: No address found.`); + } + }); + return markets; +} + +/** + * Retrieve orderbook from Paradex API for a given market. Parse orders and insert + * them into our database. + * @param paradexSource Data source which can query the Paradex API. + * @param market Object from the Paradex API with information about the market in question. + */ +async function getAndSaveMarketOrderbook(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> { + const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol); + const observedTimestamp = Date.now(); + + logUtils.log(`${market.symbol}: Parsing orders.`); + const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`); + const tokenOrderRepository = connection.getRepository(TokenOrder); + await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${market.symbol}: 0 orders to save.`); + } +} -- cgit v1.2.3 From 8c21a700bae0c751f7f9ca47f9a47628a4478911 Mon Sep 17 00:00:00 2001 From: Xianny <8582774+xianny@users.noreply.github.com> Date: Tue, 4 Dec 2018 13:36:18 -0800 Subject: pull OHLCV records from Crypto Compare (#1349) * [WIP] pull OHLCV records from Crypto Compare * lint * refactor to pull logic out of script and into modules * add entity test for ohlcv_external entity * implement rate limit and chronological backfill for ohlcv * add unit tests; cleanup variable names * Fetch OHLCV pairs params from events table * better method names * fix outdated test * lint * Clean up after review * oops * fix failing test * better filtering of most recent records * fix bug when generating pairs * fix default earliest backfill date * fix bug with retrieving backfill time * prettier --- .../src/scripts/pull_ohlcv_cryptocompare.ts | 101 +++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts new file mode 100644 index 000000000..6979cd10e --- /dev/null +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -0,0 +1,101 @@ +// tslint:disable:no-console +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare'; +import { OHLCVExternal } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare';
+import { handleError } from '../utils'; +import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs'; + +const SOURCE_NAME = 'CryptoCompare'; +const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_SECOND = 1000; + +const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers +const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare +const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const repository = connection.getRepository(OHLCVExternal); + const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS); + + const jobTime = new Date().getTime(); + const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME); + console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`); + + const fetchAndSavePromises = tradingPairs.map(async pair => { + const pairs = source.generateBackfillIntervals(pair); + return fetchAndSaveAsync(source, repository, jobTime, pairs); + }); + await Promise.all(fetchAndSavePromises); + console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`); + process.exit(0); +})().catch(handleError); + +async function fetchAndSaveAsync( + source: CryptoCompareOHLCVSource, + repository: Repository, + jobTime: number, + pairs: TradingPair[], +): Promise { + const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => { + if (a.latestSavedTime < b.latestSavedTime) { + return -1; + } else if (a.latestSavedTime > b.latestSavedTime) { + return 1; + } else { + return 0; + } + }; + pairs.sort(sortAscTimestamp); + + let i = 0; + while (i < pairs.length) { + const pair = pairs[i]; + if (pair.latestSavedTime > TWO_HOURS_AGO) { + break; + } + const rawRecords = await source.getHourlyOHLCVAsync(pair); + const records = rawRecords.filter(rec => { + return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime; + }); // Crypto Compare can take ~30mins to finalise records + if (records.length === 0) { + console.log(`No more records, stopping task for ${JSON.stringify(pair)}`); + break; + } + const metadata: OHLCVMetadata = { + exchange: source.default_exchange, + fromSymbol: pair.fromSymbol, + toSymbol: pair.toSymbol, + source: SOURCE_NAME, + observedTimestamp: jobTime, + interval: source.intervalBetweenRecords, + }; + const parsedRecords = parseRecords(records, metadata); + try { + await saveRecordsAsync(repository, parsedRecords); + i++; + } catch (err) { + console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); + break; + } + } + return Promise.resolve(); +} + +async function saveRecordsAsync(repository: Repository, records: OHLCVExternal[]): Promise { + const metadata = [ + records[0].fromSymbol, + records[0].toSymbol, + new Date(records[0].startTime), + new Date(records[records.length - 1].endTime), + ]; + + console.log(`Saving ${records.length} records to ${repository.metadata.name}... 
${JSON.stringify(metadata)}`); + await repository.save(records); +} -- cgit v1.2.3 From 8721d4ed7a7a38f77847b45619f15315044c375c Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 4 Dec 2018 15:23:15 -0800 Subject: Fix linter --- packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts index b02468e9b..7868e9c5a 100644 --- a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts +++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts @@ -27,7 +27,7 @@ let connection: Connection; await Promise.all( marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)), ); - await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); } process.exit(0); })().catch(handleError); -- cgit v1.2.3 From 549f5e4655f246062dd6451065ec01eb789dbd8f Mon Sep 17 00:00:00 2001 From: Fabio B Date: Tue, 4 Dec 2018 19:56:23 -0800 Subject: Use a string template in packages/pipeline/src/scripts/pull_missing_events.ts Co-Authored-By: albrow --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 68cabe3de..c7e07c540 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -74,7 +74,7 @@ async function getStartBlockAsync(repository: Repositor } const tableName = repository.metadata.tableName; if (!tableNameRegex.test(tableName)) { - throw new Error('Unexpected special character in table name: ' + tableName); + throw new Error(`Unexpected special character in table name: ${tableName}`); } const queryResult = await connection.query( `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, -- cgit v1.2.3 From 00f86ca0f7871639d2b0be496f6f8c5e0d8d7ffe Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 4 Dec 2018 20:04:08 -0800 Subject: Address PR feedback --- .../pipeline/src/scripts/pull_missing_blocks.ts | 7 ++--- .../pipeline/src/scripts/pull_missing_events.ts | 5 ++-- .../src/scripts/pull_radar_relay_orders.ts | 33 +++++++++++++--------- .../pipeline/src/scripts/pull_trusted_tokens.ts | 14 ++++----- 4 files changed, 31 insertions(+), 28 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index 4a1483ab9..b7bd51f08 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -9,7 +9,7 @@ import { Web3Source } from '../data_sources/web3'; import { Block } from '../entities'; import * as ormConfig from '../ormconfig'; import { parseBlock } from '../parsers/web3'; -import { handleError } from '../utils'; +import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; // Number of blocks to save at once. const BATCH_SAVE_SIZE = 1000; @@ -18,16 +18,13 @@ const MAX_CONCURRENCY = 10; // Maximum number of blocks to query for at once. 
This is also the maximum // number of blocks we will hold in memory prior to being saved to the database. const MAX_BLOCKS_PER_QUERY = 1000; -// Block number when the Exchange contract was deployed to mainnet. -// TODO(albrow): De-dupe this constant. -const EXCHANGE_START_BLOCK = 6271590; let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ - rpcUrl: `https://mainnet.infura.io/${process.env.INFURA_API_KEY}`, + rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`, }); const web3Source = new Web3Source(provider); await getAllMissingBlocks(web3Source); diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index c7e07c540..80abbb8b0 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -8,9 +8,8 @@ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; -import { handleError } from '../utils'; +import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; -const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. @@ -19,7 +18,7 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ - rpcUrl: 'https://mainnet.infura.io', + rpcUrl: INFURA_ROOT_URL, }); const eventsSource = new ExchangeEventsSource(provider, 1); await getFillEventsAsync(eventsSource); diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts index b3a4d887e..bbbef9b47 100644 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -16,11 +16,11 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); - await getOrderbook(); + await getOrderbookAsync(); process.exit(0); })().catch(handleError); -async function getOrderbook(): Promise { +async function getOrderbookAsync(): Promise { console.log('Getting all orders...'); const connectClient = new HttpClient(RADAR_RELAY_URL); const rawOrders = await connectClient.getOrdersAsync({ @@ -29,17 +29,22 @@ async function getOrderbook(): Promise { console.log(`Got ${rawOrders.records.length} orders.`); console.log('Parsing orders...'); // Parse the sra orders, then add source url to each. - const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); + const orders = R.pipe( + parseSraOrders, + R.map(setSourceUrl(RADAR_RELAY_URL)), + )(rawOrders); // Save all the orders and update the observed time stamps in a single // transaction. 
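// (Using one transaction makes each order row and its observed-timestamp row
// atomic: either both are persisted or neither is.)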
console.log('Saving orders and updating timestamps...'); - await connection.transaction(async (manager: EntityManager): Promise<void> => { - for (const order of orders) { - await manager.save(SraOrder, order); - const observedTimestamp = createObservedTimestampForOrder(order); - await manager.save(observedTimestamp); - } - }); + await connection.transaction( + async (manager: EntityManager): Promise<void> => { + for (const order of orders) { + await manager.save(SraOrder, order); + const observedTimestamp = createObservedTimestampForOrder(order); + await manager.save(observedTimestamp); + } + }, + ); } const sourceUrlProp = R.lensProp('sourceUrl'); @@ -48,6 +53,8 @@ const sourceUrlProp = R.lensProp('sourceUrl'); * Sets the source url for a single order. Returns a new order instead of * mutating the given one. */ -const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { - return R.set(sourceUrlProp, sourceURL, order); -}); +const setSourceUrl = R.curry( + (sourceURL: string, order: SraOrder): SraOrder => { + return R.set(sourceUrlProp, sourceURL, order); + }, +); diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts index b3a4466bb..1befc4437 100644 --- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -8,7 +8,7 @@ import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers import { handleError } from '../utils'; const METAMASK_TRUSTED_TOKENS_URL = - 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; + 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/d45916c533116510cc8e9e048a8b5fc3732a6b6d/contract-map.json'; const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens'; @@ -22,7 +22,7 @@ let connection: Connection; })().catch(handleError); async function getMetamaskTrustedTokens(): Promise<void> { - // tslint:disable-next-line + // tslint:disable-next-line:no-console console.log('Getting latest metamask trusted tokens list ...'); const trustedTokensRepository = connection.getRepository(TokenMetadata); const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>( @@ -30,23 +30,23 @@ async function getMetamaskTrustedTokens(): Promise<void> { ); const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); const trustedTokens = parseMetamaskTrustedTokens(resp); - // tslint:disable-next-line + // tslint:disable-next-line:no-console console.log('Saving metamask trusted tokens list'); await trustedTokensRepository.save(trustedTokens); - // tslint:disable-next-line + // tslint:disable-next-line:no-console console.log('Done saving metamask trusted tokens.'); } async function getZeroExTrustedTokens(): Promise<void> { - // tslint:disable-next-line + // tslint:disable-next-line:no-console console.log('Getting latest 0x trusted tokens list ...'); const trustedTokensRepository = connection.getRepository(TokenMetadata); const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL); const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); const trustedTokens = parseZeroExTrustedTokens(resp); - // tslint:disable-next-line + // tslint:disable-next-line:no-console console.log('Saving metamask trusted tokens list'); await trustedTokensRepository.save(trustedTokens); - // tslint:disable-next-line + // tslint:disable-next-line:no-console console.log('Done saving metamask trusted tokens.'); } -- cgit v1.2.3 From 2e704ac01a077b0c73288aaa53c9cf66c73e27f1 Mon Sep 17
00:00:00 2001 From: Alex Browne Date: Tue, 4 Dec 2018 20:08:32 -0800 Subject: Fix prettier --- .../src/scripts/pull_radar_relay_orders.ts | 29 ++++++++-------------- 1 file changed, 11 insertions(+), 18 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts index bbbef9b47..6c18bcaef 100644 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -29,22 +29,17 @@ async function getOrderbookAsync(): Promise { console.log(`Got ${rawOrders.records.length} orders.`); console.log('Parsing orders...'); // Parse the sra orders, then add source url to each. - const orders = R.pipe( - parseSraOrders, - R.map(setSourceUrl(RADAR_RELAY_URL)), - )(rawOrders); + const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); // Save all the orders and update the observed time stamps in a single // transaction. console.log('Saving orders and updating timestamps...'); - await connection.transaction( - async (manager: EntityManager): Promise => { - for (const order of orders) { - await manager.save(SraOrder, order); - const observedTimestamp = createObservedTimestampForOrder(order); - await manager.save(observedTimestamp); - } - }, - ); + await connection.transaction(async (manager: EntityManager): Promise => { + for (const order of orders) { + await manager.save(SraOrder, order); + const observedTimestamp = createObservedTimestampForOrder(order); + await manager.save(observedTimestamp); + } + }); } const sourceUrlProp = R.lensProp('sourceUrl'); @@ -53,8 +48,6 @@ const sourceUrlProp = R.lensProp('sourceUrl'); * Sets the source url for a single order. Returns a new order instead of * mutating the given one. */ -const setSourceUrl = R.curry( - (sourceURL: string, order: SraOrder): SraOrder => { - return R.set(sourceUrlProp, sourceURL, order); - }, -); +const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { + return R.set(sourceUrlProp, sourceURL, order); +}); -- cgit v1.2.3 From 08eb0b91b6d0f0dc90ae920a18ca5dd080bf235c Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 5 Dec 2018 12:27:32 -0800 Subject: Fix RadarRelay timestamps (#1391) * Fixing rr timestamps * Apply prettier --- packages/pipeline/src/scripts/pull_radar_relay_orders.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts index 6c18bcaef..40bb6fc97 100644 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -33,11 +33,12 @@ async function getOrderbookAsync(): Promise { // Save all the orders and update the observed time stamps in a single // transaction. 
console.log('Saving orders and updating timestamps...'); + const observedTimestamp = Date.now(); await connection.transaction(async (manager: EntityManager): Promise => { for (const order of orders) { await manager.save(SraOrder, order); - const observedTimestamp = createObservedTimestampForOrder(order); - await manager.save(observedTimestamp); + const orderObservation = createObservedTimestampForOrder(order, observedTimestamp); + await manager.save(orderObservation); } }); } -- cgit v1.2.3 From 78d0ab1aa2393b7a0b21108b9811e0b0a4406faf Mon Sep 17 00:00:00 2001 From: Xianny <8582774+xianny@users.noreply.github.com> Date: Wed, 5 Dec 2018 16:05:06 -0800 Subject: Fix/pipeline/ohlcv (#1393) The OHLCV script in data pipeline quits early when we get no data from Crypto Compare. Sometimes Crypto Compare gives us a valid empty response (e.g. when we query for way back in time) and we need to just continue. This adds better filtering for the types of Crypto Compare responses to detect when we should continue and when we should really quit. --- .../src/scripts/pull_ohlcv_cryptocompare.ts | 38 +++++++++------------- 1 file changed, 16 insertions(+), 22 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts index 6979cd10e..7377a64d8 100644 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -10,11 +10,9 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra const SOURCE_NAME = 'CryptoCompare'; const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const ONE_SECOND = 1000; const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers -const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare +const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01'; const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); let connection: Connection; @@ -60,28 +58,24 @@ async function fetchAndSaveAsync( if (pair.latestSavedTime > TWO_HOURS_AGO) { break; } - const rawRecords = await source.getHourlyOHLCVAsync(pair); - const records = rawRecords.filter(rec => { - return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime; - }); // Crypto Compare can take ~30mins to finalise records - if (records.length === 0) { - console.log(`No more records, stopping task for ${JSON.stringify(pair)}`); - break; - } - const metadata: OHLCVMetadata = { - exchange: source.default_exchange, - fromSymbol: pair.fromSymbol, - toSymbol: pair.toSymbol, - source: SOURCE_NAME, - observedTimestamp: jobTime, - interval: source.intervalBetweenRecords, - }; - const parsedRecords = parseRecords(records, metadata); try { - await saveRecordsAsync(repository, parsedRecords); + const records = await source.getHourlyOHLCVAsync(pair); + console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); + if (records.length > 0) { + const metadata: OHLCVMetadata = { + exchange: source.default_exchange, + fromSymbol: pair.fromSymbol, + toSymbol: pair.toSymbol, + source: SOURCE_NAME, + 
observedTimestamp: jobTime, + interval: source.intervalBetweenRecords, + }; + const parsedRecords = parseRecords(records, metadata); + await saveRecordsAsync(repository, parsedRecords); + } i++; } catch (err) { - console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); + console.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); break; } } -- cgit v1.2.3 From 8b3b4d983f0f942fc4e42365d4cfcaf793d1d283 Mon Sep 17 00:00:00 2001 From: xianny Date: Thu, 6 Dec 2018 13:20:08 -0800 Subject: rename variable and define default in only 1 location --- packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts index 7377a64d8..a29a13bfc 100644 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -11,7 +11,7 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra const SOURCE_NAME = 'CryptoCompare'; const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers +const MAX_REQS_PER_SECOND = parseInt(process.env.CRYPTOCOMPARE_MAX_REQS_PER_SECOND || '15', 10); // tslint:disable-line:custom-no-magic-numbers const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01'; const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); @@ -20,7 +20,7 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const repository = connection.getRepository(OHLCVExternal); - const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS); + const source = new CryptoCompareOHLCVSource(MAX_REQS_PER_SECOND); const jobTime = new Date().getTime(); const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME); -- cgit v1.2.3 From 096c4c8f2b20b4ca909d4ba950e219c22a5f8882 Mon Sep 17 00:00:00 2001 From: xianny Date: Mon, 10 Dec 2018 10:57:36 -0800 Subject: change to camelCase --- packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts index a29a13bfc..d44eb5cc6 100644 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -63,7 +63,7 @@ async function fetchAndSaveAsync( console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); if (records.length > 0) { const metadata: OHLCVMetadata = { - exchange: source.default_exchange, + exchange: source.defaultExchange, fromSymbol: pair.fromSymbol, toSymbol: pair.toSymbol, source: SOURCE_NAME, -- cgit v1.2.3 From b4cdb14b9b79589d7b24fd7655406c15b6bb00f6 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 11 Dec 2018 15:16:05 -0800 Subject: Refactor event scraping and add support for scraping ERC20 approval events (#1401) * Refactor event scraping and add support for scraping ERC20 approval events * Add tests for 
data_sources/contract-wrappers/utils --- packages/pipeline/src/scripts/pull_erc20_events.ts | 66 ++++++++++ .../pipeline/src/scripts/pull_exchange_events.ts | 146 +++++++++++++++++++++ .../pipeline/src/scripts/pull_missing_blocks.ts | 2 +- .../pipeline/src/scripts/pull_missing_events.ts | 136 ------------------- 4 files changed, 213 insertions(+), 137 deletions(-) create mode 100644 packages/pipeline/src/scripts/pull_erc20_events.ts create mode 100644 packages/pipeline/src/scripts/pull_exchange_events.ts delete mode 100644 packages/pipeline/src/scripts/pull_missing_events.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts new file mode 100644 index 000000000..0ad12c97a --- /dev/null +++ b/packages/pipeline/src/scripts/pull_erc20_events.ts @@ -0,0 +1,66 @@ +// tslint:disable:no-console +import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses'; +import { web3Factory } from '@0x/dev-utils'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events'; +import { ERC20ApprovalEvent } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseERC20ApprovalEvents } from '../parsers/events'; +import { handleError, INFURA_ROOT_URL } from '../utils'; + +const NETWORK_ID = 1; +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. +const WETH_START_BLOCK = 4719568; // Block number when the WETH contract was deployed. 
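+// (Taken together, these constants define each run's scan window. A sketch of
+// the interval arithmetic, with lastKnownBlock and currentBlock standing in
+// for the repository and node queries performed below:
+//
+//     const startBlock = lastKnownBlock === null
+//         ? WETH_START_BLOCK                     // first run: scan from contract deployment
+//         : lastKnownBlock - START_BLOCK_OFFSET; // re-scan the tail to catch reorged events
+//     const endBlock = currentBlock - BLOCK_FINALITY_THRESHOLD; // only blocks assumed final
+// )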
+ +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: INFURA_ROOT_URL, + }); + const endBlock = await calculateEndBlockAsync(provider); + await getAndSaveWETHApprovalEventsAsync(provider, endBlock); + process.exit(0); +})().catch(handleError); + +async function getAndSaveWETHApprovalEventsAsync(provider: Web3ProviderEngine, endBlock: number): Promise { + console.log('Checking existing approval events...'); + const repository = connection.getRepository(ERC20ApprovalEvent); + const startBlock = (await getStartBlockAsync(repository)) || WETH_START_BLOCK; + + console.log(`Getting WETH approval events starting at ${startBlock}...`); + const wethTokenAddress = getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken; + const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, wethTokenAddress); + const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock); + + console.log(`Parsing ${eventLogs.length} WETH approval events...`); + const events = parseERC20ApprovalEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total WETH approval events.`); + await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); +} + +async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise { + const web3Wrapper = new Web3Wrapper(provider); + const currentBlock = await web3Wrapper.getBlockNumberAsync(); + return currentBlock - BLOCK_FINALITY_THRESHOLD; +} + +async function getStartBlockAsync(repository: Repository): Promise { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing approval events found.`); + return null; + } + const queryResult = await connection.query( + `SELECT block_number FROM raw.erc20_approval_events ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} diff --git a/packages/pipeline/src/scripts/pull_exchange_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts new file mode 100644 index 000000000..e98fc6629 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_exchange_events.ts @@ -0,0 +1,146 @@ +// tslint:disable:no-console +import { web3Factory } from '@0x/dev-utils'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import R = require('ramda'); +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; +import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; + +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. 
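+// (The save path below depends on whether this is the initial pull: a fresh
+// table gets plain batched inserts, while later runs may overlap previously
+// saved blocks by START_BLOCK_OFFSET and so fall back to row-by-row
+// insert-then-update. In outline:
+//
+//     if (isInitialPull) {
+//         for (const batch of R.splitEvery(BATCH_SAVE_SIZE, events)) {
+//             await repository.insert(batch);
+//         }
+//     } else {
+//         await saveIndividuallyWithFallbackAsync(repository, events);
+//     }
+// )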
+ +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: INFURA_ROOT_URL, + }); + const endBlock = await calculateEndBlockAsync(provider); + const eventsSource = new ExchangeEventsSource(provider, 1); + await getFillEventsAsync(eventsSource, endBlock); + await getCancelEventsAsync(eventsSource, endBlock); + await getCancelUpToEventsAsync(eventsSource, endBlock); + process.exit(0); +})().catch(handleError); + +async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise { + console.log('Checking existing fill events...'); + const repository = connection.getRepository(ExchangeFillEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting fill events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock); + console.log('Parsing fill events...'); + const events = parseExchangeFillEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total fill events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise { + console.log('Checking existing cancel events...'); + const repository = connection.getRepository(ExchangeCancelEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting cancel events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock); + console.log('Parsing cancel events...'); + const events = parseExchangeCancelEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total cancel events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise { + console.log('Checking existing CancelUpTo events...'); + const repository = connection.getRepository(ExchangeCancelUpToEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting CancelUpTo events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock); + console.log('Parsing CancelUpTo events...'); + const events = parseExchangeCancelUpToEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +const tableNameRegex = /^[a-zA-Z_]*$/; + +async function getStartBlockAsync(repository: Repository): Promise { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing ${repository.metadata.name}s found.`); + return EXCHANGE_START_BLOCK; + } + const tableName = repository.metadata.tableName; + if (!tableNameRegex.test(tableName)) { + throw new Error(`Unexpected special character in table name: ${tableName}`); + } + const queryResult = await connection.query( + `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} + +async function saveEventsAsync( + isInitialPull: boolean, + repository: Repository, + events: T[], +): Promise { + console.log(`Saving ${repository.metadata.name}s...`); + if (isInitialPull) { + // Split data 
into numChunks pieces of maximum size BATCH_SAVE_SIZE + // each. + for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { + await repository.insert(eventsBatch); + } + } else { + // If we possibly have some overlap where we need to update some + // existing events, we need to use our workaround/fallback. + await saveIndividuallyWithFallbackAsync(repository, events); + } + const totalEvents = await repository.count(); + console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); +} + +async function saveIndividuallyWithFallbackAsync( + repository: Repository, + events: T[], +): Promise { + // Note(albrow): This is a temporary hack because `save` is not working as + // documented and is causing a foreign key constraint violation. Hopefully + // can remove later because this "poor man's upsert" implementation operates + // on one event at a time and is therefore much slower. + for (const event of events) { + try { + // First try an insert. + await repository.insert(event); + } catch { + // If it fails, assume it was a foreign key constraint error and try + // doing an update instead. + // Note(albrow): Unfortunately the `as any` hack here seems + // required. I can't figure out how to convince the type-checker + // that the criteria and the entity itself are the correct type for + // the given repository. If we can remove the `save` hack then this + // will probably no longer be necessary. + await repository.update( + { + contractAddress: event.contractAddress, + blockNumber: event.blockNumber, + logIndex: event.logIndex, + } as any, + event as any, + ); + } + } +} + +async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise { + const web3Wrapper = new Web3Wrapper(provider); + const currentBlock = await web3Wrapper.getBlockNumberAsync(); + return currentBlock - BLOCK_FINALITY_THRESHOLD; +} diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index b7bd51f08..18fe1b700 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -24,7 +24,7 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ - rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`, + rpcUrl: INFURA_ROOT_URL, }); const web3Source = new Web3Source(provider); await getAllMissingBlocks(web3Source); diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts deleted file mode 100644 index 80abbb8b0..000000000 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ /dev/null @@ -1,136 +0,0 @@ -// tslint:disable:no-console -import { web3Factory } from '@0x/dev-utils'; -import R = require('ramda'); -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; -import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; - -const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when 
updating fill events. -const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const provider = web3Factory.getRpcProvider({ - rpcUrl: INFURA_ROOT_URL, - }); - const eventsSource = new ExchangeEventsSource(provider, 1); - await getFillEventsAsync(eventsSource); - await getCancelEventsAsync(eventsSource); - await getCancelUpToEventsAsync(eventsSource); - process.exit(0); -})().catch(handleError); - -async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise { - console.log('Checking existing fill events...'); - const repository = connection.getRepository(ExchangeFillEvent); - const startBlock = await getStartBlockAsync(repository); - console.log(`Getting fill events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getFillEventsAsync(startBlock); - console.log('Parsing fill events...'); - const events = parseExchangeFillEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total fill events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise { - console.log('Checking existing cancel events...'); - const repository = connection.getRepository(ExchangeCancelEvent); - const startBlock = await getStartBlockAsync(repository); - console.log(`Getting cancel events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); - console.log('Parsing cancel events...'); - const events = parseExchangeCancelEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total cancel events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise { - console.log('Checking existing CancelUpTo events...'); - const repository = connection.getRepository(ExchangeCancelUpToEvent); - const startBlock = await getStartBlockAsync(repository); - console.log(`Getting CancelUpTo events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); - console.log('Parsing CancelUpTo events...'); - const events = parseExchangeCancelUpToEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -const tableNameRegex = /^[a-zA-Z_]*$/; - -async function getStartBlockAsync(repository: Repository): Promise { - const fillEventCount = await repository.count(); - if (fillEventCount === 0) { - console.log(`No existing ${repository.metadata.name}s found.`); - return EXCHANGE_START_BLOCK; - } - const tableName = repository.metadata.tableName; - if (!tableNameRegex.test(tableName)) { - throw new Error(`Unexpected special character in table name: ${tableName}`); - } - const queryResult = await connection.query( - `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, - ); - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} - -async function saveEventsAsync( - isInitialPull: boolean, - repository: Repository, - events: T[], -): Promise { - console.log(`Saving ${repository.metadata.name}s...`); - if (isInitialPull) { - // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE - // each. 
- for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { - await repository.insert(eventsBatch); - } - } else { - // If we possibly have some overlap where we need to update some - // existing events, we need to use our workaround/fallback. - await saveIndividuallyWithFallbackAsync(repository, events); - } - const totalEvents = await repository.count(); - console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); -} - -async function saveIndividuallyWithFallbackAsync( - repository: Repository, - events: T[], -): Promise { - // Note(albrow): This is a temporary hack because `save` is not working as - // documented and is causing a foreign key constraint violation. Hopefully - // can remove later because this "poor man's upsert" implementation operates - // on one event at a time and is therefore much slower. - for (const event of events) { - try { - // First try an insert. - await repository.insert(event); - } catch { - // If it fails, assume it was a foreign key constraint error and try - // doing an update instead. - // Note(albrow): Unfortunately the `as any` hack here seems - // required. I can't figure out how to convince the type-checker - // that the criteria and the entity itself are the correct type for - // the given repository. If we can remove the `save` hack then this - // will probably no longer be necessary. - await repository.update( - { - contractAddress: event.contractAddress, - blockNumber: event.blockNumber, - logIndex: event.logIndex, - } as any, - event as any, - ); - } - } -} -- cgit v1.2.3 From 42be1a429fd9286a72e19b782c9b906cb3c0f8ad Mon Sep 17 00:00:00 2001 From: zkao Date: Tue, 11 Dec 2018 15:48:54 -0800 Subject: track idex orderbook snapshots (#1397) * Track Idex and Oasis Orderbook Snapshots --- .../src/scripts/pull_idex_orderbook_snapshots.ts | 63 ++++++++++++++++++++++ .../src/scripts/pull_oasis_orderbook_snapshots.ts | 58 ++++++++++++++++++++ 2 files changed, 121 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts create mode 100644 packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts new file mode 100644 index 000000000..d47c1dd3f --- /dev/null +++ b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts @@ -0,0 +1,63 @@ +import { logUtils } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { IDEX_SOURCE, IdexSource } from '../data_sources/idex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseIdexOrders } from '../parsers/idex_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +// Number of markets to retrieve orderbooks for at once. +const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 100; + +// Delay between market orderbook requests. 
+const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 2000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const idexSource = new IdexSource(); + logUtils.log('Getting all IDEX markets'); + const markets = await idexSource.getMarketsAsync(); + logUtils.log(`Got ${markets.length} markets.`); + for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { + await Promise.all( + marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbook(idexSource, marketId)), + ); + await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + } + process.exit(0); +})().catch(handleError); + +/** + * Retrieve orderbook from Idex API for a given market. Parse orders and insert + * them into our database. + * @param idexSource Data source which can query Idex API. + * @param marketId String representing market of interest, eg. 'ETH_TIC'. + */ +async function getAndSaveMarketOrderbook(idexSource: IdexSource, marketId: string): Promise { + logUtils.log(`${marketId}: Retrieving orderbook.`); + const orderBook = await idexSource.getMarketOrderbookAsync(marketId); + const observedTimestamp = Date.now(); + + if (!R.has('bids', orderBook) || !R.has('asks', orderBook)) { + logUtils.warn(`${marketId}: Orderbook faulty.`); + return; + } + + logUtils.log(`${marketId}: Parsing orders.`); + const orders = parseIdexOrders(orderBook, observedTimestamp, IDEX_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${marketId}: Saving ${orders.length} orders.`); + const TokenOrderRepository = connection.getRepository(TokenOrder); + await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${marketId}: 0 orders to save.`); + } +} diff --git a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts new file mode 100644 index 000000000..0ffa5fd47 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts @@ -0,0 +1,58 @@ +import { logUtils } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { OASIS_SOURCE, OasisMarket, OasisSource } from '../data_sources/oasis'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseOasisOrders } from '../parsers/oasis_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +// Number of markets to retrieve orderbooks for at once. +const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; + +// Delay between market orderbook requests. 
+const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 1000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const oasisSource = new OasisSource(); + logUtils.log('Getting all active Oasis markets'); + const markets = await oasisSource.getActiveMarketsAsync(); + logUtils.log(`Got ${markets.length} markets.`); + for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { + await Promise.all( + marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbook(oasisSource, market)), + ); + await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + } + process.exit(0); +})().catch(handleError); + +/** + * Retrieve orderbook from Oasis API for a given market. Parse orders and insert + * them into our database. + * @param oasisSource Data source which can query Oasis API. + * @param marketId String identifying market we want data for. eg. 'REPAUG'. + */ +async function getAndSaveMarketOrderbook(oasisSource: OasisSource, market: OasisMarket): Promise { + logUtils.log(`${market.id}: Retrieving orderbook.`); + const orderBook = await oasisSource.getMarketOrderbookAsync(market.id); + const observedTimestamp = Date.now(); + + logUtils.log(`${market.id}: Parsing orders.`); + const orders = parseOasisOrders(orderBook, market, observedTimestamp, OASIS_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${market.id}: Saving ${orders.length} orders.`); + const TokenOrderRepository = connection.getRepository(TokenOrder); + await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${market.id}: 0 orders to save.`); + } +} -- cgit v1.2.3 From 5cff9110355f22a0887febe8728d445dc3867125 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 11 Dec 2018 17:31:16 -0800 Subject: Make pull_missing_blocks script consider all events with block numbers (#1420) --- packages/pipeline/src/scripts/pull_missing_blocks.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index 18fe1b700..275141a12 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -53,8 +53,21 @@ async function getAllMissingBlocks(web3Source: Web3Source): Promise { async function getMissingBlockNumbers(fromBlock: number): Promise { console.log(`Checking for missing blocks starting at ${fromBlock}...`); + // Note(albrow): The easiest way to get all the blocks we need is to + // consider all the events tables together in a single query. If this query + // gets too slow, we should consider re-architecting so that we can work on + // getting the blocks for one type of event at a time. 
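Both orderbook snapshot scripts above throttle their API usage the same way: split the market list into fixed-size chunks, fetch one chunk concurrently, then sleep before starting the next. The pattern generalizes into a small helper; a sketch (the name forEachChunkedAsync and its signature are illustrative, not part of the pipeline):

import * as R from 'ramda';

// Run `handler` over `items` with at most `chunkSize` calls in flight at
// once, pausing `delayMs` between chunks to respect API rate limits.
async function forEachChunkedAsync<T>(
    items: T[],
    chunkSize: number,
    delayMs: number,
    handler: (item: T) => Promise<void>,
): Promise<void> {
    for (const chunk of R.splitEvery(chunkSize, items)) {
        await Promise.all(chunk.map(handler));
        await new Promise<void>(resolve => setTimeout(resolve, delayMs));
    }
}

With such a helper, each script's main loop reduces to a single call, e.g. await forEachChunkedAsync(markets, MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY, market => getAndSaveMarketOrderbook(oasisSource, market));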
const response = (await connection.query( - 'SELECT DISTINCT(block_number) FROM raw.exchange_fill_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) AND block_number >= $1 ORDER BY block_number ASC LIMIT $2', + `WITH all_events AS ( + SELECT block_number FROM raw.exchange_fill_events + UNION SELECT block_number FROM raw.exchange_cancel_events + UNION SELECT block_number FROM raw.exchange_cancel_up_to_events + UNION SELECT block_number FROM raw.erc20_approval_events + ) + SELECT DISTINCT(block_number) FROM all_events + WHERE block_number NOT IN (SELECT number FROM raw.blocks) + AND block_number >= $1 + ORDER BY block_number ASC LIMIT $2`, [fromBlock, MAX_BLOCKS_PER_QUERY], )) as MissingBlocksResponse[]; const blockNumberStrings = R.pluck('block_number', response); -- cgit v1.2.3 From 1534505652aa12a385dee46f876c22cc4ce8621a Mon Sep 17 00:00:00 2001 From: Leonid Logvinov Date: Wed, 12 Dec 2018 15:32:41 -0800 Subject: Fix linter errors --- packages/pipeline/src/scripts/pull_competing_dex_trades.ts | 4 ++-- .../pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts | 4 ++-- .../pipeline/src/scripts/pull_idex_orderbook_snapshots.ts | 4 ++-- packages/pipeline/src/scripts/pull_missing_blocks.ts | 12 ++++++------ .../pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts | 4 ++-- .../pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts | 4 ++-- packages/pipeline/src/scripts/pull_trusted_tokens.ts | 8 ++++---- packages/pipeline/src/scripts/update_relayer_info.ts | 4 ++-- 8 files changed, 22 insertions(+), 22 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts index 4e4c12dd0..1478d5615 100644 --- a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts +++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts @@ -15,11 +15,11 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); - await getAndSaveTrades(); + await getAndSaveTradesAsync(); process.exit(0); })().catch(handleError); -async function getAndSaveTrades(): Promise { +async function getAndSaveTradesAsync(): Promise { const apiKey = process.env.BLOXY_API_KEY; if (apiKey === undefined) { throw new Error('Missing required env var: BLOXY_API_KEY'); diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts index 7868e9c5a..4e00f258f 100644 --- a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts +++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts @@ -25,7 +25,7 @@ let connection: Connection; const markets = await ddexSource.getActiveMarketsAsync(); for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { await Promise.all( - marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)), + marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbookAsync(ddexSource, market)), ); await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); } @@ -38,7 +38,7 @@ let connection: Connection; * @param ddexSource Data source which can query Ddex API. * @param market Object from Ddex API containing market data. 
*/ -async function getAndSaveMarketOrderbook(ddexSource: DdexSource, market: DdexMarket): Promise { +async function getAndSaveMarketOrderbookAsync(ddexSource: DdexSource, market: DdexMarket): Promise { const orderBook = await ddexSource.getMarketOrderbookAsync(market.id); const observedTimestamp = Date.now(); diff --git a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts index d47c1dd3f..490b17766 100644 --- a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts +++ b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts @@ -27,7 +27,7 @@ let connection: Connection; logUtils.log(`Got ${markets.length} markets.`); for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { await Promise.all( - marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbook(idexSource, marketId)), + marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbookAsync(idexSource, marketId)), ); await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); } @@ -40,7 +40,7 @@ let connection: Connection; * @param idexSource Data source which can query Idex API. * @param marketId String representing market of interest, eg. 'ETH_TIC'. */ -async function getAndSaveMarketOrderbook(idexSource: IdexSource, marketId: string): Promise { +async function getAndSaveMarketOrderbookAsync(idexSource: IdexSource, marketId: string): Promise { logUtils.log(`${marketId}: Retrieving orderbook.`); const orderBook = await idexSource.getMarketOrderbookAsync(marketId); const observedTimestamp = Date.now(); diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index 275141a12..a5203824c 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -27,7 +27,7 @@ let connection: Connection; rpcUrl: INFURA_ROOT_URL, }); const web3Source = new Web3Source(provider); - await getAllMissingBlocks(web3Source); + await getAllMissingBlocksAsync(web3Source); process.exit(0); })().catch(handleError); @@ -35,23 +35,23 @@ interface MissingBlocksResponse { block_number: string; } -async function getAllMissingBlocks(web3Source: Web3Source): Promise { +async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise { const blocksRepository = connection.getRepository(Block); let fromBlock = EXCHANGE_START_BLOCK; while (true) { - const blockNumbers = await getMissingBlockNumbers(fromBlock); + const blockNumbers = await getMissingBlockNumbersAsync(fromBlock); if (blockNumbers.length === 0) { // There are no more missing blocks. We're done. break; } - await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers); + await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers); fromBlock = Math.max(...blockNumbers) + 1; } const totalBlocks = await blocksRepository.count(); console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`); } -async function getMissingBlockNumbers(fromBlock: number): Promise { +async function getMissingBlockNumbersAsync(fromBlock: number): Promise { console.log(`Checking for missing blocks starting at ${fromBlock}...`); // Note(albrow): The easiest way to get all the blocks we need is to // consider all the events tables together in a single query. 
If this query @@ -76,7 +76,7 @@ async function getMissingBlockNumbers(fromBlock: number): Promise { return blockNumbers; } -async function getAndSaveBlocks( +async function getAndSaveBlocksAsync( web3Source: Web3Source, blocksRepository: Repository, blockNumbers: number[], diff --git a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts index 0ffa5fd47..c4dcf6c83 100644 --- a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts +++ b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts @@ -27,7 +27,7 @@ let connection: Connection; logUtils.log(`Got ${markets.length} markets.`); for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { await Promise.all( - marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbook(oasisSource, market)), + marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbookAsync(oasisSource, market)), ); await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); } @@ -40,7 +40,7 @@ let connection: Connection; * @param oasisSource Data source which can query Oasis API. * @param marketId String identifying market we want data for. eg. 'REPAUG'. */ -async function getAndSaveMarketOrderbook(oasisSource: OasisSource, market: OasisMarket): Promise { +async function getAndSaveMarketOrderbookAsync(oasisSource: OasisSource, market: OasisMarket): Promise { logUtils.log(`${market.id}: Retrieving orderbook.`); const orderBook = await oasisSource.getMarketOrderbookAsync(market.id); const observedTimestamp = Date.now(); diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts index bae1fbede..34345f355 100644 --- a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts +++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts @@ -29,7 +29,7 @@ let connection: Connection; const tokenInfoResponse = await paradexSource.getTokenInfoAsync(); const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse); await Promise.all( - extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbook(paradexSource, market)), + extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbookAsync(paradexSource, market)), ); process.exit(0); })().catch(handleError); @@ -70,7 +70,7 @@ function addTokenAddresses( * @param paradexSource Data source which can query the Paradex API. * @param market Object from the Paradex API with information about the market in question. 
*/ -async function getAndSaveMarketOrderbook(paradexSource: ParadexSource, market: ParadexMarket): Promise { +async function getAndSaveMarketOrderbookAsync(paradexSource: ParadexSource, market: ParadexMarket): Promise { const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol); const observedTimestamp = Date.now(); diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts index 1befc4437..5906deee6 100644 --- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -16,12 +16,12 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); - await getMetamaskTrustedTokens(); - await getZeroExTrustedTokens(); + await getMetamaskTrustedTokensAsync(); + await getZeroExTrustedTokensAsync(); process.exit(0); })().catch(handleError); -async function getMetamaskTrustedTokens(): Promise { +async function getMetamaskTrustedTokensAsync(): Promise { // tslint:disable-next-line:no-console console.log('Getting latest metamask trusted tokens list ...'); const trustedTokensRepository = connection.getRepository(TokenMetadata); @@ -37,7 +37,7 @@ async function getMetamaskTrustedTokens(): Promise { console.log('Done saving metamask trusted tokens.'); } -async function getZeroExTrustedTokens(): Promise { +async function getZeroExTrustedTokensAsync(): Promise { // tslint:disable-next-line:no-console console.log('Getting latest 0x trusted tokens list ...'); const trustedTokensRepository = connection.getRepository(TokenMetadata); diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index f8918728d..41d29b385 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -17,11 +17,11 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); - await getRelayers(); + await getRelayersAsync(); process.exit(0); })().catch(handleError); -async function getRelayers(): Promise { +async function getRelayersAsync(): Promise { console.log('Getting latest relayer info...'); const relayerRepository = connection.getRepository(Relayer); const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL); -- cgit v1.2.3 From 28713bdb3859e24b78e784722813b03e95788e1f Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 17 Dec 2018 11:36:03 -0800 Subject: Pull approval events for ZRX and DAI (#1430) --- packages/pipeline/src/scripts/pull_erc20_events.ts | 68 ++++++++++++++++------ 1 file changed, 49 insertions(+), 19 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts index 0ad12c97a..bd520c610 100644 --- a/packages/pipeline/src/scripts/pull_erc20_events.ts +++ b/packages/pipeline/src/scripts/pull_erc20_events.ts @@ -1,10 +1,10 @@ -// tslint:disable:no-console import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses'; import { web3Factory } from '@0x/dev-utils'; import { Web3ProviderEngine } from '@0x/subproviders'; +import { logUtils } from '@0x/utils'; import { Web3Wrapper } from '@0x/web3-wrapper'; import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; 
import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events'; import { ERC20ApprovalEvent } from '../entities'; @@ -16,33 +16,63 @@ const NETWORK_ID = 1; const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. -const WETH_START_BLOCK = 4719568; // Block number when the WETH contract was deployed. let connection: Connection; +interface Token { + // name is used for logging only. + name: string; + address: string; + defaultStartBlock: number; +} + +const tokensToGetApprovalEvents: Token[] = [ + { + name: 'WETH', + address: getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken, + defaultStartBlock: 4719568, // Block when the WETH contract was deployed. + }, + { + name: 'ZRX', + address: getContractAddressesForNetworkOrThrow(NETWORK_ID).zrxToken, + defaultStartBlock: 4145415, // Block when the ZRX contract was deployed. + }, + { + name: 'DAI', + address: '0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359', + defaultStartBlock: 4752008, // Block when the DAI contract was deployed. + }, +]; + (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ rpcUrl: INFURA_ROOT_URL, }); const endBlock = await calculateEndBlockAsync(provider); - await getAndSaveWETHApprovalEventsAsync(provider, endBlock); + for (const token of tokensToGetApprovalEvents) { + await getAndSaveApprovalEventsAsync(provider, token, endBlock); + } process.exit(0); })().catch(handleError); -async function getAndSaveWETHApprovalEventsAsync(provider: Web3ProviderEngine, endBlock: number): Promise { - console.log('Checking existing approval events...'); +async function getAndSaveApprovalEventsAsync( + provider: Web3ProviderEngine, + token: Token, + endBlock: number, +): Promise { + logUtils.log(`Getting approval events for ${token.name}...`); + logUtils.log('Checking existing approval events...'); const repository = connection.getRepository(ERC20ApprovalEvent); - const startBlock = (await getStartBlockAsync(repository)) || WETH_START_BLOCK; + const startBlock = (await getStartBlockAsync(token)) || token.defaultStartBlock; - console.log(`Getting WETH approval events starting at ${startBlock}...`); - const wethTokenAddress = getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken; - const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, wethTokenAddress); + logUtils.log(`Getting approval events starting at ${startBlock}...`); + const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, token.address); const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock); - console.log(`Parsing ${eventLogs.length} WETH approval events...`); + logUtils.log(`Parsing ${eventLogs.length} approval events...`); const events = parseERC20ApprovalEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total WETH approval events.`); + logUtils.log(`Retrieved and parsed ${events.length} total approval events.`); await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); } @@ -52,15 +82,15 @@ async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise): Promise { - const fillEventCount = await repository.count(); - if (fillEventCount === 0) { - console.log(`No existing approval events found.`); - return null; - } +async function 
getStartBlockAsync(token: Token): Promise { const queryResult = await connection.query( - `SELECT block_number FROM raw.erc20_approval_events ORDER BY block_number DESC LIMIT 1`, + `SELECT block_number FROM raw.erc20_approval_events WHERE token_address = $1 ORDER BY block_number DESC LIMIT 1`, + [token.address], ); + if (queryResult.length === 0) { + logUtils.log(`No existing approval events found for ${token.name}.`); + return null; + } const lastKnownBlock = queryResult[0].block_number; return lastKnownBlock - START_BLOCK_OFFSET; } -- cgit v1.2.3 From 44d9cc53b87c5a19586c05030292b5b21351dc74 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 17 Dec 2018 17:07:22 -0800 Subject: Fix bug in pull_missing_blocks with incorrect start block (#1438) --- packages/pipeline/src/scripts/pull_missing_blocks.ts | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index a5203824c..bb5385126 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -9,7 +9,7 @@ import { Web3Source } from '../data_sources/web3'; import { Block } from '../entities'; import * as ormConfig from '../ormconfig'; import { parseBlock } from '../parsers/web3'; -import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; +import { handleError, INFURA_ROOT_URL } from '../utils'; // Number of blocks to save at once. const BATCH_SAVE_SIZE = 1000; @@ -37,22 +37,19 @@ interface MissingBlocksResponse { async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise { const blocksRepository = connection.getRepository(Block); - let fromBlock = EXCHANGE_START_BLOCK; while (true) { - const blockNumbers = await getMissingBlockNumbersAsync(fromBlock); + const blockNumbers = await getMissingBlockNumbersAsync(); if (blockNumbers.length === 0) { // There are no more missing blocks. We're done. break; } await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers); - fromBlock = Math.max(...blockNumbers) + 1; } const totalBlocks = await blocksRepository.count(); console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`); } -async function getMissingBlockNumbersAsync(fromBlock: number): Promise { - console.log(`Checking for missing blocks starting at ${fromBlock}...`); +async function getMissingBlockNumbersAsync(): Promise { // Note(albrow): The easiest way to get all the blocks we need is to // consider all the events tables together in a single query. 
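Because the approval-events patch above drives the pull off the tokensToGetApprovalEvents table, and getStartBlockAsync now scopes its watermark query by token_address, tracking an additional ERC-20 token is a one-entry change. A hypothetical entry (every value below is a placeholder, not a real address or block number):

// Hypothetical addition to tokensToGetApprovalEvents, using the Token
// interface declared in the patch above.
const exampleToken: Token = {
    name: 'EXAMPLE', // used for logging only
    address: '0x0000000000000000000000000000000000000000', // placeholder: the token's mainnet address
    defaultStartBlock: 0, // placeholder: block when the token contract was deployed
};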
If this query // gets too slow, we should consider re-architecting so that we can work on @@ -66,13 +63,12 @@ async function getMissingBlockNumbersAsync(fromBlock: number): Promise ) SELECT DISTINCT(block_number) FROM all_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) - AND block_number >= $1 - ORDER BY block_number ASC LIMIT $2`, - [fromBlock, MAX_BLOCKS_PER_QUERY], + ORDER BY block_number ASC LIMIT $1`, + [MAX_BLOCKS_PER_QUERY], )) as MissingBlocksResponse[]; const blockNumberStrings = R.pluck('block_number', response); const blockNumbers = R.map(parseInt, blockNumberStrings); - console.log(`Found ${blockNumbers.length} missing blocks in the given range.`); + console.log(`Found ${blockNumbers.length} missing blocks.`); return blockNumbers; } -- cgit v1.2.3 From 7dda953bc929e218121c331fedb3884b24555855 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 7 Jan 2019 15:04:00 -0800 Subject: Optimize SQL queries in pull_missing_blocks (#1458) * Optimize SQL queries in pull_missing_blocks * Update comment in pull_missing_blocks --- .../pipeline/src/scripts/pull_missing_blocks.ts | 39 +++++++++++----------- 1 file changed, 20 insertions(+), 19 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index bb5385126..ced9d99eb 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -14,20 +14,29 @@ import { handleError, INFURA_ROOT_URL } from '../utils'; // Number of blocks to save at once. const BATCH_SAVE_SIZE = 1000; // Maximum number of requests to send at once. -const MAX_CONCURRENCY = 10; +const MAX_CONCURRENCY = 20; // Maximum number of blocks to query for at once. This is also the maximum // number of blocks we will hold in memory prior to being saved to the database. const MAX_BLOCKS_PER_QUERY = 1000; let connection: Connection; +const tablesWithMissingBlocks = [ + 'raw.exchange_fill_events', + 'raw.exchange_cancel_events', + 'raw.exchange_cancel_up_to_events', + 'raw.erc20_approval_events', +]; + (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ rpcUrl: INFURA_ROOT_URL, }); const web3Source = new Web3Source(provider); - await getAllMissingBlocksAsync(web3Source); + for (const tableName of tablesWithMissingBlocks) { + await getAllMissingBlocksAsync(web3Source, tableName); + } process.exit(0); })().catch(handleError); @@ -35,10 +44,11 @@ interface MissingBlocksResponse { block_number: string; } -async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise { +async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise { const blocksRepository = connection.getRepository(Block); while (true) { - const blockNumbers = await getMissingBlockNumbersAsync(); + console.log(`Checking for missing blocks in ${tableName}...`); + const blockNumbers = await getMissingBlockNumbersAsync(tableName); if (blockNumbers.length === 0) { // There are no more missing blocks. We're done. break; @@ -46,24 +56,14 @@ async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise { await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers); } const totalBlocks = await blocksRepository.count(); - console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`); + console.log(`Done saving blocks for ${tableName}. 
There are now ${totalBlocks} total blocks.`); } -async function getMissingBlockNumbersAsync(): Promise { - // Note(albrow): The easiest way to get all the blocks we need is to - // consider all the events tables together in a single query. If this query - // gets too slow, we should consider re-architecting so that we can work on - // getting the blocks for one type of event at a time. +async function getMissingBlockNumbersAsync(tableName: string): Promise { + // This query returns up to `MAX_BLOCKS_PER_QUERY` distinct block numbers + // which are present in `tableName` but not in `raw.blocks`. const response = (await connection.query( - `WITH all_events AS ( - SELECT block_number FROM raw.exchange_fill_events - UNION SELECT block_number FROM raw.exchange_cancel_events - UNION SELECT block_number FROM raw.exchange_cancel_up_to_events - UNION SELECT block_number FROM raw.erc20_approval_events - ) - SELECT DISTINCT(block_number) FROM all_events - WHERE block_number NOT IN (SELECT number FROM raw.blocks) - ORDER BY block_number ASC LIMIT $1`, + `SELECT DISTINCT(block_number) FROM ${tableName} LEFT JOIN raw.blocks ON ${tableName}.block_number = raw.blocks.number WHERE number IS NULL LIMIT $1;`, [MAX_BLOCKS_PER_QUERY], )) as MissingBlocksResponse[]; const blockNumberStrings = R.pluck('block_number', response); @@ -86,4 +86,5 @@ async function getAndSaveBlocksAsync( const blocks = R.map(parseBlock, rawBlocks); console.log(`Saving ${blocks.length} blocks...`); await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); + console.log('Done saving this batch of blocks'); } -- cgit v1.2.3 From 27fc640a9eae70813e9f02cda2c27a4509a04591 Mon Sep 17 00:00:00 2001 From: Xianny <8582774+xianny@users.noreply.github.com> Date: Tue, 8 Jan 2019 13:50:51 -0800 Subject: fetch and save copper (#1472) Fetch and save Copper CRM --- packages/pipeline/src/scripts/pull_copper.ts | 129 +++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_copper.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_copper.ts b/packages/pipeline/src/scripts/pull_copper.ts new file mode 100644 index 000000000..69814f209 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_copper.ts @@ -0,0 +1,129 @@ +// tslint:disable:no-console +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper'; +import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { + CopperSearchResponse, + parseActivities, + parseActivityTypes, + parseCustomFields, + parseLeads, + parseOpportunities, +} from '../parsers/copper'; +import { handleError } from '../utils'; +const ONE_SECOND = 1000; +const COPPER_RATE_LIMIT = 10; +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + + const accessToken = process.env.COPPER_ACCESS_TOKEN; + const userEmail = process.env.COPPER_USER_EMAIL; + if (accessToken === undefined || userEmail === undefined) { + throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL'); + } + const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail); + + const fetchPromises = [ + fetchAndSaveLeadsAsync(source), + 
fetchAndSaveOpportunitiesAsync(source),
+        fetchAndSaveActivitiesAsync(source),
+        fetchAndSaveCustomFieldsAsync(source),
+        fetchAndSaveActivityTypesAsync(source),
+    ];
+    // Wait for every fetch to finish so that any rejection is routed
+    // through handleError.
+    await Promise.all(fetchPromises);
+})().catch(handleError);
+
+async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> {
+    const repository = connection.getRepository(CopperLead);
+    const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads');
+    console.log(`Fetching Copper leads starting from ${startTime}...`);
+    await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository);
+}
+
+async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> {
+    const repository = connection.getRepository(CopperOpportunity);
+    const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities');
+    console.log(`Fetching Copper opportunities starting from ${startTime}...`);
+    await fetchAndSaveAsync(
+        CopperEndpoint.Opportunities,
+        source,
+        startTime,
+        { sort_by: 'name' },
+        parseOpportunities,
+        repository,
+    );
+}
+
+async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> {
+    const repository = connection.getRepository(CopperActivity);
+    const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities');
+    const searchParams = {
+        minimum_activity_date: Math.floor(startTime / ONE_SECOND),
+    };
+    console.log(`Fetching Copper activities starting from ${startTime}...`);
+    await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository);
+}
+
+async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> {
+    const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`);
+    if (R.isEmpty(queryResult)) {
+        return 0;
+    } else {
+        return queryResult[0]._max;
+    }
+}
+
+// (Xianny): Copper API doesn't allow queries to filter by date. To ensure that we are filling in ascending chronological
+// order and not missing any records, we are scraping all available pages. If Copper data gets larger,
+// it would make sense to search for and start filling from the first page that contains a new record.
+// This search would increase our network calls and is not efficient to implement with our current small volume
+// of Copper records.
+async function fetchAndSaveAsync( + endpoint: CopperEndpoint, + source: CopperSource, + startTime: number, + searchParams: CopperSearchParams, + parseFn: (recs: T[]) => E[], + repository: Repository, +): Promise { + let saved = 0; + const numPages = await source.fetchNumberOfPagesAsync(endpoint); + try { + for (let i = numPages; i > 0; i--) { + console.log(`Fetching page ${i}/${numPages} of ${endpoint}...`); + const raw = await source.fetchSearchResultsAsync(endpoint, { + ...searchParams, + page_number: i, + }); + const newRecords = raw.filter(rec => rec.date_modified * ONE_SECOND > startTime); + const parsed = parseFn(newRecords); + await repository.save(parsed); + saved += newRecords.length; + } + } catch (err) { + console.log(`Error fetching ${endpoint}, stopping: ${err.stack}`); + } finally { + console.log(`Saved ${saved} items from ${endpoint}, done.`); + } +} + +async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise { + console.log(`Fetching Copper activity types...`); + const activityTypes = await source.fetchActivityTypesAsync(); + const repository = connection.getRepository(CopperActivityType); + await repository.save(parseActivityTypes(activityTypes)); +} + +async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise { + console.log(`Fetching Copper custom fields...`); + const customFields = await source.fetchCustomFieldsAsync(); + const repository = connection.getRepository(CopperCustomField); + await repository.save(parseCustomFields(customFields)); +} -- cgit v1.2.3
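fetchAndSaveAsync in the Copper patch above is generic over the endpoint, the parser, and the destination repository, so each entity-specific wrapper is pure wiring. A hypothetical additional Copper endpoint would follow the same shape (CopperEndpoint.Projects, CopperProject, parseProjects, and raw.copper_projects are illustrative placeholders, not existing exports):

// A sketch of wiring one more endpoint through fetchAndSaveAsync; every
// Projects-related name here is hypothetical.
async function fetchAndSaveProjectsAsync(source: CopperSource): Promise<void> {
    const repository = connection.getRepository(CopperProject);
    // Resume from the newest record already saved, as the other wrappers do.
    const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_projects');
    console.log(`Fetching Copper projects starting from ${startTime}...`);
    await fetchAndSaveAsync(CopperEndpoint.Projects, source, startTime, {}, parseProjects, repository);
}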