From 57e7119c0d4f1ab7dd1d4c0118e72dc1706e2151 Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Mon, 17 Sep 2018 11:27:38 -0700
Subject: Rebase pipeline branch off development

---
 packages/pipeline/src/scripts/create_tables.ts | 258 ++++
 packages/pipeline/src/scripts/join_tables.ts   | 234 +++++
 packages/pipeline/src/scripts/query_data.ts    |  87 ++++
 packages/pipeline/src/scripts/scrape_data.ts   | 649 +++++++++++++++++++++++++
 4 files changed, 1228 insertions(+)
 create mode 100644 packages/pipeline/src/scripts/create_tables.ts
 create mode 100644 packages/pipeline/src/scripts/join_tables.ts
 create mode 100644 packages/pipeline/src/scripts/query_data.ts
 create mode 100644 packages/pipeline/src/scripts/scrape_data.ts

diff --git a/packages/pipeline/src/scripts/create_tables.ts b/packages/pipeline/src/scripts/create_tables.ts
new file mode 100644
index 000000000..fd0d2b78b
--- /dev/null
+++ b/packages/pipeline/src/scripts/create_tables.ts
@@ -0,0 +1,258 @@
+import * as commandLineArgs from 'command-line-args';
+
+import { postgresClient } from '../postgres';
+import { formatters } from '../utils';
+const tableQueries: any = {
+    events_full: `CREATE TABLE IF NOT EXISTS events_full (
+        timestamp TIMESTAMP WITH TIME ZONE,
+        event_type VARCHAR,
+        error_id VARCHAR,
+        order_hash CHAR(66),
+        maker CHAR(42),
+        maker_amount NUMERIC(78),
+        maker_fee NUMERIC(78),
+        maker_token CHAR(42),
+        taker CHAR(42),
+        taker_amount NUMERIC(78),
+        taker_fee NUMERIC(78),
+        taker_token CHAR(42),
+        txn_hash CHAR(66),
+        gas_used NUMERIC(78),
+        gas_price NUMERIC(78),
+        fee_recipient CHAR(42),
+        method_id CHAR(10),
+        salt VARCHAR,
+        block_number BIGINT,
+        log_index BIGINT,
+        taker_symbol VARCHAR,
+        taker_name VARCHAR,
+        taker_decimals BIGINT,
+        taker_usd_price NUMERIC(78),
+        taker_txn_usd_value NUMERIC(78),
+        maker_symbol VARCHAR,
+        maker_name VARCHAR,
+        maker_decimals BIGINT,
+        maker_usd_price NUMERIC(78),
+        maker_txn_usd_value NUMERIC(78),
+        PRIMARY KEY (txn_hash, order_hash, log_index)
+    )`,
+    events: `CREATE TABLE IF NOT EXISTS events (
+        timestamp TIMESTAMP WITH TIME ZONE,
+        event_type VARCHAR,
+        error_id VARCHAR,
+        order_hash CHAR(66),
+        maker CHAR(42),
+        maker_amount NUMERIC(78),
+        maker_fee NUMERIC(78),
+        maker_token CHAR(42),
+        taker CHAR(42),
+        taker_amount NUMERIC(78),
+        taker_fee NUMERIC(78),
+        taker_token CHAR(42),
+        txn_hash CHAR(66),
+        gas_used NUMERIC(78),
+        gas_price NUMERIC(78),
+        fee_recipient CHAR(42),
+        method_id CHAR(10),
+        salt VARCHAR,
+        block_number BIGINT,
+        log_index BIGINT,
+        PRIMARY KEY (txn_hash, order_hash, log_index)
+    )`,
+    events_staging: `CREATE TABLE IF NOT EXISTS events_staging (
+        timestamp TIMESTAMP WITH TIME ZONE,
+        event_type VARCHAR,
+        error_id VARCHAR,
+        order_hash CHAR(66),
+        maker CHAR(42),
+        maker_amount NUMERIC(78),
+        maker_fee NUMERIC(78),
+        maker_token CHAR(42),
+        taker CHAR(42),
+        taker_amount NUMERIC(78),
+        taker_fee NUMERIC(78),
+        taker_token CHAR(42),
+        txn_hash CHAR(66),
+        fee_recipient CHAR(42),
+        block_number BIGINT,
+        log_index BIGINT,
+        PRIMARY KEY (txn_hash, order_hash, log_index)
+    )`,
+    events_raw: `CREATE TABLE IF NOT EXISTS events_raw (
+        event_type VARCHAR,
+        error_id VARCHAR,
+        order_hash CHAR(66),
+        maker CHAR(42),
+        maker_amount NUMERIC(78),
+        maker_fee NUMERIC(78),
+        maker_token CHAR(42),
+        taker CHAR(42),
+        taker_amount NUMERIC(78),
+        taker_fee NUMERIC(78),
+        taker_token CHAR(42),
+        txn_hash CHAR(66),
+        fee_recipient CHAR(42),
+        block_number BIGINT,
+        log_index BIGINT,
+        PRIMARY KEY (txn_hash, order_hash, log_index)
+    )`,
+    blocks: `CREATE TABLE IF NOT EXISTS blocks (
+        timestamp TIMESTAMP WITH TIME ZONE,
+        block_hash CHAR(66) UNIQUE,
+        block_number BIGINT,
+        PRIMARY KEY (block_hash)
+    )`,
+    transactions: `CREATE TABLE IF NOT EXISTS transactions (
+        txn_hash CHAR(66) UNIQUE,
+        block_hash CHAR(66),
+        block_number BIGINT,
+        gas_used NUMERIC(78),
+        gas_price NUMERIC(78),
+        method_id CHAR(10),
+        salt VARCHAR,
+        PRIMARY KEY (txn_hash)
+    )`,
+    tokens: `CREATE TABLE IF NOT EXISTS tokens (
+        address CHAR(42) UNIQUE,
+        name VARCHAR,
+        symbol VARCHAR,
+        decimals INT,
+        PRIMARY KEY (address)
+    )`,
+    prices: `CREATE TABLE IF NOT EXISTS prices (
+        address CHAR(42) UNIQUE,
+        timestamp TIMESTAMP WITH TIME ZONE,
+        price NUMERIC(78, 18),
+        PRIMARY KEY (address, timestamp)
+    )`,
+    relayers: `CREATE TABLE IF NOT EXISTS relayers (
+        name VARCHAR UNIQUE,
+        url VARCHAR DEFAULT '',
+        sra_http_endpoint VARCHAR DEFAULT '',
+        sra_ws_endpoint VARCHAR DEFAULT '',
+        fee_recipient_addresses CHAR(42)[] DEFAULT '{}',
+        taker_addresses CHAR(42)[] DEFAULT '{}',
+        PRIMARY KEY(name))`,
+    historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices (
+        token VARCHAR,
+        base VARCHAR,
+        timestamp TIMESTAMP WITH TIME ZONE,
+        close NUMERIC(78, 18),
+        high NUMERIC(78, 18),
+        low NUMERIC(78, 18),
+        open NUMERIC(78, 18),
+        volume_from NUMERIC(78, 18),
+        volume_to NUMERIC(78, 18),
+        PRIMARY KEY (token, base, timestamp)
+    )`,
+    orders: `CREATE TABLE IF NOT EXISTS orders (
+        relayer_id VARCHAR,
+        exchange_contract_address CHAR(42),
+        maker CHAR(42),
+        maker_amount NUMERIC(78),
+        maker_fee NUMERIC(78),
+        maker_token CHAR(42),
+        taker CHAR(42),
+        taker_amount NUMERIC(78),
+        taker_fee NUMERIC(78),
+        taker_token CHAR(42),
+        fee_recipient CHAR(42),
+        expiration_unix_timestamp_sec NUMERIC(78),
+        salt VARCHAR,
+        order_hash CHAR(66),
+        PRIMARY KEY (relayer_id, order_hash)
+    )`,
+};
+function _safeQuery(query: string): any {
+    return new Promise((resolve, reject) => {
+        postgresClient
+            .query(query)
+            .then((data: any) => {
+                resolve(data);
+            })
+            .catch((err: any) => {
+                reject(err);
+            });
+    });
+}
+export const tableScripts = {
+    createTable(query: string): any {
+        return _safeQuery(query);
+    },
+    createAllTables(): any {
+        for (const tableName of Object.keys(tableQueries)) {
+            _safeQuery(tableQueries[tableName]);
+        }
+    },
+};
+export const insertDataScripts = {
+    insertSingleRow(table: string, object: any): any {
+        return new Promise((resolve, reject) => {
+            const columns = Object.keys(object);
+            const safeArray: any = [];
+            for (const key of columns) {
+                if (key in object) {
+                    if (key === 'timestamp') {
+                        safeArray.push('to_timestamp(' + object[key] + ')');
+                    } else if (typeof object[key] === 'string' || object[key] instanceof String) {
+                        safeArray.push(formatters.escapeSQLParam(object[key]));
+                    } else {
+                        safeArray.push(object[key]);
+                    }
+                } else {
+                    safeArray.push('default');
+                }
+            }
+            const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`;
+            console.log(queryString);
+            postgresClient
+                .query(queryString)
+                .then((data: any) => {
+                    resolve(data);
+                })
+                .catch((err: any) => {
+                    reject(err);
+                });
+        });
+    },
+    insertMultipleRows(table: string, rows: any[], columns: any[]): any {
+        return new Promise((resolve, reject) => {
+            if (rows.length > 0) {
+                const rowsSplit = rows.map((value, index) => {
+                    const safeArray: any = [];
+                    for (const key of columns) {
+                        if (key in value) {
+                            if (key === 'timestamp') {
+                                safeArray.push('to_timestamp(' + value[key] + ')');
+                            } else if (typeof value[key] === 'string' ||
value[key] instanceof String) { + safeArray.push(formatters.escapeSQLParam(value[key])); + } else if (value[key] instanceof Array) { + const escapedArray = value[key].map((subValue: string, subIndex: number) => { + return formatters.escapeSQLParam(subValue); + }); + safeArray.push('ARRAY[' + escapedArray.toString() + ']'); + } else { + safeArray.push(value[key]); + } + } else { + safeArray.push('default'); + } + } + return '(' + safeArray + ')'; + }); + const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`; + postgresClient + .query(queryString) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + // console.log(err); + reject(err); + }); + } else { + resolve({}); + } + }); + }, +}; diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts new file mode 100644 index 000000000..e7c05b39a --- /dev/null +++ b/packages/pipeline/src/scripts/join_tables.ts @@ -0,0 +1,234 @@ +import * as commandLineArgs from 'command-line-args'; + +import { postgresClient } from '../postgres'; +import { formatters } from '../utils'; +const optionDefinitions = [ + { name: 'name', alias: 'n', type: String }, + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, +]; +const cli = commandLineArgs(optionDefinitions); +const dataInsertionQueries: any = { + events_staging: `INSERT INTO events_staging ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index + ) + (SELECT + b.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index + FROM + events_raw a + JOIN + blocks b + ON + a.block_number = b.block_number + AND + b.block_number >= $1 + AND + b.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events: `INSERT INTO events ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt + ) + (SELECT + a.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index, + b.gas_used, + b.gas_price, + b.method_id, + b.salt + FROM + events_staging a + JOIN + transactions b + ON + a.txn_hash = b.txn_hash + AND + a.block_number >= $1 + AND + a.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events_full: ` + INSERT INTO events_full ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt, + taker_symbol, + taker_name, + taker_decimals, + taker_usd_price, + taker_txn_usd_value, + maker_symbol, + maker_name, + maker_decimals, + maker_usd_price, + maker_txn_usd_value + ) + (SELECT + events.timestamp, + events.event_type, + events.error_id, + events.order_hash, + events.maker, + events.maker_amount, + 
events.maker_fee, + events.maker_token, + events.taker, + events.taker_amount, + events.taker_fee, + events.taker_token, + events.txn_hash, + events.fee_recipient, + events.block_number, + events.log_index, + events.gas_used, + events.gas_price, + events.method_id, + events.salt, + taker_token_prices.symbol, + taker_token_prices.name, + taker_token_prices.decimals, + taker_token_prices.price, + (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price, + maker_token_prices.symbol, + maker_token_prices.name, + maker_token_prices.decimals, + maker_token_prices.price, + (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price + FROM + events + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) taker_token_prices + ON + (events.taker_token = taker_token_prices.address + AND + (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL)) + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) maker_token_prices + ON + (events.maker_token = maker_token_prices.address + AND + (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL)) + WHERE + events.block_number >= $1 + AND + events.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, +}; +if (cli.name) { + const query = dataInsertionQueries[cli.name]; + if (query && cli.from) { + const fromBlock = cli.from; + const toBlock = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(query, [fromBlock, toBlock]) + .then((data: any) => { + console.log(data); + }) + .catch((err: any) => { + console.error(err); + }); + } +} diff --git a/packages/pipeline/src/scripts/query_data.ts b/packages/pipeline/src/scripts/query_data.ts new file mode 100644 index 000000000..97e3749ea --- /dev/null +++ b/packages/pipeline/src/scripts/query_data.ts @@ -0,0 +1,87 @@ +import { formatters } from '../utils'; +export const dataFetchingQueries: any = { + get_missing_txn_hashes: ` + SELECT + a.txn_hash + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + transactions b + WHERE + b.txn_hash = a.txn_hash + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_used_block_numbers: ` + SELECT DISTINCT + a.block_number + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + blocks b + WHERE + b.block_number = a.block_number + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_token_registry: ` + SELECT + * + FROM + tokens`, + get_max_block: ` + SELECT + MAX(block_number) + FROM + events_raw`, + get_relayers: ` + SELECT + * + FROM + relayers`, + get_most_recent_pricing_date: ` + SELECT + MAX(DATE(timestamp)) + FROM + prices + `, + get_top_unknown_token_addresses: ` + SELECT a.token_address as address, a.txn_value / 2 as total_txn_value +FROM +(SELECT token_address, SUM(txn_value) as txn_value +FROM +(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours') + union + select a.timestamp, a.taker_token as 
token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values +WHERE token_address IS NOT NULL +AND txn_value > 0 +GROUP BY 1 +ORDER BY 2 DESC) a +LEFT JOIN tokens b +ON a.token_address = b.address +WHERE symbol is NULL +ORDER BY 2 DESC +`, +}; diff --git a/packages/pipeline/src/scripts/scrape_data.ts b/packages/pipeline/src/scripts/scrape_data.ts new file mode 100644 index 000000000..963670b47 --- /dev/null +++ b/packages/pipeline/src/scripts/scrape_data.ts @@ -0,0 +1,649 @@ +import { ExchangeEvents, ZeroEx } from '0x.js'; +import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect'; +import * as Airtable from 'airtable'; +import * as commandLineArgs from 'command-line-args'; +import * as _ from 'lodash'; +import * as querystring from 'querystring'; +import * as queue from 'queue'; +import * as request from 'request'; +import * as rpn from 'request-promise-native'; + +import { HttpRequestOptions } from '../../../connect/lib/src/types.js'; +import { relayer } from '../models/relayer.js'; +import { token } from '../models/tokens.js'; +import { postgresClient } from '../postgres.js'; +import { typeConverters } from '../utils.js'; +import { web3, zrx } from '../zrx.js'; + +import { insertDataScripts } from './create_tables.js'; +import { dataFetchingQueries } from './query_data.js'; +const optionDefinitions = [ + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, + { name: 'type', type: String }, + { name: 'id', type: String }, + { name: 'force', type: Boolean }, + { name: 'token', type: String }, +]; +const cli = commandLineArgs(optionDefinitions); +const q = queue({ concurrency: 6, autostart: true }); +const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE); +const BLOCK_INCREMENTS = 1000; +const BASE_SYMBOL = 'USD'; // use USD as base currency against +const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days +const SECONDS_PER_DAY = 86400; +const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical'; +const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json'; +const METAMASK_ETH_CONTRACT_METADATA_JSON = + 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; +const ETHPLORER_BASE_URL = 'http://api.ethplorer.io'; +const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`; +// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday'; +const AIRTABLE_RELAYER_INFO = 'Relayer Info'; +export const pullDataScripts = { + getAllEvents(fromBlockNumber: number, toBlockNumber: number): any { + return new Promise((resolve, reject) => { + const getLogsPromises: any[] = []; + getLogsPromises.push( + zrx.exchange.getLogsAsync( + ExchangeEvents.LogFill, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogCancel, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogError, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + ); + Promise.all(getLogsPromises) + .then((data: 
any[]) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getBlockInfo(blockNumber: number): any { + return new Promise((resolve, reject) => { + web3.eth.getBlock(blockNumber, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTransactionInfo(transactionHash: string): any { + return new Promise((resolve, reject) => { + web3.eth.getTransaction(transactionHash, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTokenRegistry(): any { + return new Promise((resolve, reject) => { + zrx.tokenRegistry + .getTokensAsync() + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getMetaMaskTokens(): any { + return new Promise((resolve, reject) => { + request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerTopTokens(): any { + return new Promise((resolve, reject) => { + request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerToken(tokenAddress: string): any { + return new Promise((resolve, reject) => { + const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`; + request(url, (error, response, body) => { + if (error) { + reject(error); + } else { + try { + const json = JSON.parse(body); + resolve(json); + } catch (err) { + resolve({ error: 'error' }); + } + } + }); + }); + }, + getPriceData(symbol: string, timestamp: number, timeDelay?: number): any { + return new Promise((resolve, reject) => { + if (symbol === 'WETH') { + symbol = 'ETH'; + } + let parsedParams = querystring.stringify({ + fsym: symbol, + tsyms: 'USD', + ts: timestamp / 1000, + }); + console.debug(parsedParams); + setTimeout(() => { + request(PRICE_API_ENDPOINT + '?' 
+ parsedParams, (error, response, body) => {
+                    if (error) {
+                        reject(error);
+                    } else {
+                        resolve(JSON.parse(body));
+                    }
+                });
+            }, timeDelay);
+        });
+    },
+    getRelayers(): any {
+        return new Promise((resolve, reject) => {
+            request(RELAYER_REGISTRY_JSON, (error, response, body) => {
+                if (error) {
+                    reject(error);
+                } else {
+                    resolve(JSON.parse(body));
+                }
+            });
+        });
+    },
+    async getOrderBook(sraEndpoint: string): Promise<OrderbookResponse[]> {
+        const relayerClient = new HttpClient(sraEndpoint);
+        const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync();
+        const fullOrderBook: OrderbookResponse[] = [];
+        for (const tokenPair of tokenResponse) {
+            const orderBookRequest: OrderbookRequest = {
+                baseTokenAddress: tokenPair.tokenA.address,
+                quoteTokenAddress: tokenPair.tokenB.address,
+            };
+            const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest);
+            fullOrderBook.push(orderBook);
+        }
+        return fullOrderBook;
+    },
+    // async getHistoricalPrices(
+    //     fromSymbol: string,
+    //     toSymbol: string,
+    //     fromTimestamp: number,
+    //     toTimestamp: number,
+    // ): Promise<any> {
+    //     const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY));
+    //     if(fromSymbol === 'WETH') {
+    //         fromSymbol = 'ETH';
+    //     }
+    //     var parsedParams = {
+    //         fsym: fromSymbol,
+    //         tsym: toSymbol,
+    //         limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT),
+    //         toTs: toTimestamp,
+    //     };
+    //     var options = {
+    //         uri: HIST_PRICE_API_ENDPOINT,
+    //         qs: parsedParams,
+    //         json: false,
+    //     };
+    //     try {
+    //         const response = await rpn(options);
+    //         return Promise.resolve(JSON.parse(response));
+    //     } catch (error) {
+    //         console.debug(error);
+    //         return Promise.reject(error);
+    //     }
+    // },
+};
+export const scrapeDataScripts = {
+    scrapeAllPricesToDB(fromTime: number, toTime: number) {
+        const fromDate = new Date(fromTime);
+        fromDate.setUTCHours(0);
+        fromDate.setUTCMinutes(0);
+        fromDate.setUTCSeconds(0);
+        fromDate.setUTCMilliseconds(0);
+        const toDate = new Date(toTime);
+        postgresClient
+            .query(dataFetchingQueries.get_token_registry, [])
+            .then((result: any) => {
+                for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) {
+                    for (const token of Object.values(result.rows)) {
+                        console.debug('Scraping ' + curDate + ' ' + token);
+                        q.push(_scrapePriceToDB(curDate.getTime(), token, 500));
+                    }
+                }
+            })
+            .catch((err: any) => {
+                console.debug(err);
+            });
+    },
+};
+function _scrapeEventsToDB(fromBlock: number, toBlock: number): any {
+    return (cb: () => void) => {
+        pullDataScripts
+            .getAllEvents(fromBlock, toBlock)
+            .then((data: any) => {
+                const parsedEvents: any = {};
+                parsedEvents[ExchangeEvents.LogFill] = [];
+                parsedEvents[ExchangeEvents.LogCancel] = [];
+                parsedEvents[ExchangeEvents.LogError] = [];
+                for (const index in data) {
+                    for (const datum of data[index]) {
+                        const event = typeConverters.convertLogEventToEventObject(datum);
+                        parsedEvents[event.event_type].push(event);
+                    }
+                }
+                console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length);
+                for (const event_type in parsedEvents) {
+                    if (parsedEvents[event_type].length > 0) {
+                        insertDataScripts
+                            .insertMultipleRows(
+                                'events_raw',
+                                parsedEvents[event_type],
+                                Object.keys(parsedEvents[event_type][0]),
+                            )
+                            .then(() => {})
+                            .catch((error: any) => {});
+                    }
+                }
+                cb();
+            })
+            .catch((err: any) => {
+                cb();
+            });
+    };
+}
+function _scrapeBlockToDB(block: number): any {
+    return (cb: () => void) => {
+        pullDataScripts
.getBlockInfo(block) + .then((data: any) => { + const parsedBlock = typeConverters.convertLogBlockToBlockObject(data); + insertDataScripts + .insertSingleRow('blocks', parsedBlock) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +// function _scrapeAllRelayersToDB(): any { +// return (cb: () => void) => { +// airtableBase(AIRTABLE_RELAYER_INFO) +// .select() +// .eachPage((records: any, fetchNextPage: () => void) => { +// const parsedRelayers: any[] = []; +// for(const record of records) { +// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record)); +// } +// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0])) +// .then((result: any) => { +// cb(); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }; +// } +function _scrapeAllRelayersToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getRelayers() + .then((relayers: any[]) => { + console.log(relayers); + const parsedRelayers: any[] = []; + for (const relayer of relayers) { + parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer)); + } + console.log(parsedRelayers); + insertDataScripts + .insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties)) + .then((result: any) => { + console.log(result); + cb(); + }) + .catch((err: any) => { + console.log(err); + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTransactionToDB(transactionHash: string): any { + return (cb: () => void) => { + pullDataScripts + .getTransactionInfo(transactionHash) + .then((data: any) => { + const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data); + insertDataScripts + .insertSingleRow('transactions', parsedTransaction) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTokenRegistryToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getTokenRegistry() + .then((data: any) => { + const parsedTokens: any = []; + for (const token of data) { + parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeMetaMaskEthContractMetadataToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getMetaMaskTokens() + .then((data: any) => { + const parsedTokens: any = []; + const dataArray = _.map(_.keys(data), (tokenAddress: string) => { + const value = _.get(data, tokenAddress); + return { + address: tokenAddress, + ...value, + }; + }); + const erc20TokensOnly = _.filter(dataArray, entry => { + const isErc20 = _.get(entry, 'erc20'); + return isErc20; + }); + for (const token of erc20TokensOnly) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeEthplorerTopTokensToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getEthplorerTopTokens() + .then((data: any) => { + const parsedTokens: any = []; + const tokens = _.get(data, 'tokens'); + for (const token of tokens) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + 
insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeUnknownTokenInformationToDB(): any { + return (cb: () => void) => { + postgresClient + .query(dataFetchingQueries.get_top_unknown_token_addresses) + .then(async (result: any) => { + const addresses = _.map(result.rows, row => _.get(row, 'address')); + const responses = await Promise.all( + _.map(addresses, address => pullDataScripts.getEthplorerToken(address)), + ); + const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error'))); + const parsedTokens = _.map(tokens, tokenInfo => + typeConverters.convertEthplorerTokenToTokenObject(tokenInfo), + ); + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any { + return (cb: () => void) => { + pullDataScripts + .getPriceData(token.symbol, timestamp, timeDelay) + .then((data: any) => { + const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol; + const parsedPrice = { + timestamp: timestamp / 1000, + symbol: token.symbol, + base: 'USD', + price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0, + }; + console.debug('Inserting ' + timestamp); + console.debug(parsedPrice); + insertDataScripts.insertSingleRow('prices', parsedPrice); + cb(); + }) + .catch((err: any) => { + console.debug(err); + cb(); + }); + }; +} +// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any { +// return (cb: () => void) => { +// pullDataScripts +// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp) +// .then((data: any) => { +// const parsedHistoricalPrices: any = []; +// for (const historicalPrice of data['Data']) { +// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice); +// parsedHistoricalPrice['token'] = token; +// parsedHistoricalPrice['base'] = BASE_SYMBOL; +// parsedHistoricalPrices.push(parsedHistoricalPrice); +// } +// if (parsedHistoricalPrices.length > 0) { +// insertDataScripts +// .insertMultipleRows( +// 'historical_prices', +// parsedHistoricalPrices, +// Object.keys(parsedHistoricalPrices[0]), +// ) +// .catch((error: any) => { +// console.error(error); +// }); +// } +// cb(); +// }) +// .catch((error: any) => { +// console.error(error); +// cb(); +// }); +// }; +// } +function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any { + return (cb: () => void) => { + pullDataScripts + .getOrderBook(sraEndpoint) + .then((data: any) => { + for (const book of data) { + for (const order of book.bids) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + for (const order of book.asks) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + } + cb(); + }) + .catch((error: any) => { + console.error(error); + cb(); + }); + }; +} +if (cli.type === 'events') 
{ + if (cli.from && cli.to) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + let curToBlock = curFromBlock; + do { + curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS; + q.push(_scrapeEventsToDB(curFromBlock, curToBlock)); + curFromBlock = curToBlock + 1; + } while (curToBlock < destToBlock); + } +} else if (cli.type === 'blocks') { + if (cli.from && cli.to) { + if (cli.force) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + const curToBlock = curFromBlock; + for (; curFromBlock < destToBlock; curFromBlock++) { + q.push(_scrapeBlockToDB(curFromBlock)); + } + } else { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeBlockToDB(row.block_number)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } + } +} else if (cli.type === 'transactions') { + if (cli.id) { + q.push(_scrapeTransactionToDB(cli.id)); + } else if (cli.from) { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeTransactionToDB(row.txn_hash)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } +} else if (cli.type === 'tokens') { + q.push(_scrapeMetaMaskEthContractMetadataToDB()); + q.push(_scrapeEthplorerTopTokensToDB()); +} else if (cli.type === 'unknown_tokens') { + q.push(_scrapeUnknownTokenInformationToDB()); +} else if (cli.type === 'prices' && cli.from && cli.to) { + const fromDate = new Date(cli.from); + console.debug(fromDate); + fromDate.setUTCHours(0); + fromDate.setUTCMinutes(0); + fromDate.setUTCSeconds(0); + fromDate.setUTCMilliseconds(0); + console.debug(fromDate); + const toDate = new Date(cli.to); + postgresClient + .query(dataFetchingQueries.get_token_registry, []) + .then((result: any) => { + for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { + for (const token of Object.values(result.rows)) { + console.debug('Scraping ' + curDate + ' ' + token); + q.push(_scrapePriceToDB(curDate.getTime(), token)); + } + } + }) + .catch((err: any) => { + console.debug(err); + }); + // } else if (cli.type === 'historical_prices') { + // if (cli.token && cli.from && cli.to) { + // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to)); + // } + // } else if (cli.type === 'all_historical_prices') { + // if (cli.from && cli.to) { + // postgresClient + // .query(dataFetchingQueries.get_token_registry, []) + // .then((result: any) => { + // const curTokens: any = result.rows.map((a: any): any => a.symbol); + // for (const curToken of curTokens) { + // console.debug('Historical data backfill: Pushing coin ' + curToken); + // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to)); + // } + // }) + // .catch((err: any) => { + // console.debug(err); + // }); + // } +} else if (cli.type === 'relayers') { + q.push(_scrapeAllRelayersToDB()); +} else if (cli.type === 'orders') { + postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => { + for (const relayer of result.rows) { + if (relayer.sra_http_url) { + q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url)); + } + } + }); +} -- cgit v1.2.3 From 
cd73a047efbc1b7afa884ef27a1cc4991e20efd8 Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Wed, 19 Sep 2018 10:51:12 -0700
Subject: Remove old code. Create function for getting contract events via etherscan

---
 packages/pipeline/src/scripts/create_tables.ts | 258 ----------
 packages/pipeline/src/scripts/join_tables.ts   | 234 ---------
 packages/pipeline/src/scripts/query_data.ts    |  87 ----
 packages/pipeline/src/scripts/scrape_data.ts   | 649 -------------------------
 4 files changed, 1228 deletions(-)
 delete mode 100644 packages/pipeline/src/scripts/create_tables.ts
 delete mode 100644 packages/pipeline/src/scripts/join_tables.ts
 delete mode 100644 packages/pipeline/src/scripts/query_data.ts
 delete mode 100644 packages/pipeline/src/scripts/scrape_data.ts
{ - pullDataScripts - .getEthplorerTopTokens() - .then((data: any) => { - const parsedTokens: any = []; - const tokens = _.get(data, 'tokens'); - for (const token of tokens) { - parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); - } - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeUnknownTokenInformationToDB(): any { - return (cb: () => void) => { - postgresClient - .query(dataFetchingQueries.get_top_unknown_token_addresses) - .then(async (result: any) => { - const addresses = _.map(result.rows, row => _.get(row, 'address')); - const responses = await Promise.all( - _.map(addresses, address => pullDataScripts.getEthplorerToken(address)), - ); - const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error'))); - const parsedTokens = _.map(tokens, tokenInfo => - typeConverters.convertEthplorerTokenToTokenObject(tokenInfo), - ); - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any { - return (cb: () => void) => { - pullDataScripts - .getPriceData(token.symbol, timestamp, timeDelay) - .then((data: any) => { - const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol; - const parsedPrice = { - timestamp: timestamp / 1000, - symbol: token.symbol, - base: 'USD', - price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0, - }; - console.debug('Inserting ' + timestamp); - console.debug(parsedPrice); - insertDataScripts.insertSingleRow('prices', parsedPrice); - cb(); - }) - .catch((err: any) => { - console.debug(err); - cb(); - }); - }; -} -// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any { -// return (cb: () => void) => { -// pullDataScripts -// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp) -// .then((data: any) => { -// const parsedHistoricalPrices: any = []; -// for (const historicalPrice of data['Data']) { -// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice); -// parsedHistoricalPrice['token'] = token; -// parsedHistoricalPrice['base'] = BASE_SYMBOL; -// parsedHistoricalPrices.push(parsedHistoricalPrice); -// } -// if (parsedHistoricalPrices.length > 0) { -// insertDataScripts -// .insertMultipleRows( -// 'historical_prices', -// parsedHistoricalPrices, -// Object.keys(parsedHistoricalPrices[0]), -// ) -// .catch((error: any) => { -// console.error(error); -// }); -// } -// cb(); -// }) -// .catch((error: any) => { -// console.error(error); -// cb(); -// }); -// }; -// } -function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any { - return (cb: () => void) => { - pullDataScripts - .getOrderBook(sraEndpoint) - .then((data: any) => { - for (const book of data) { - for (const order of book.bids) { - console.debug(order); - const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); - parsedOrder.relayer_id = id; - parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); - insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { - console.error(error); - }); - } - for (const order of book.asks) { - console.debug(order); - const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); - parsedOrder.relayer_id = id; - parsedOrder.order_hash = 
ZeroEx.getOrderHashHex(order); - insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { - console.error(error); - }); - } - } - cb(); - }) - .catch((error: any) => { - console.error(error); - cb(); - }); - }; -} -if (cli.type === 'events') { - if (cli.from && cli.to) { - const destToBlock = cli.to ? cli.to : cli.from; - let curFromBlock = cli.from; - let curToBlock = curFromBlock; - do { - curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS; - q.push(_scrapeEventsToDB(curFromBlock, curToBlock)); - curFromBlock = curToBlock + 1; - } while (curToBlock < destToBlock); - } -} else if (cli.type === 'blocks') { - if (cli.from && cli.to) { - if (cli.force) { - const destToBlock = cli.to ? cli.to : cli.from; - let curFromBlock = cli.from; - const curToBlock = curFromBlock; - for (; curFromBlock < destToBlock; curFromBlock++) { - q.push(_scrapeBlockToDB(curFromBlock)); - } - } else { - const fetchFrom = cli.from; - const fetchTo = cli.to ? cli.to : cli.from + 1; - postgresClient - .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo]) - .then((data: any) => { - for (const row of data.rows) { - q.push(_scrapeBlockToDB(row.block_number)); - } - }) - .catch((err: any) => { - // console.debug(err); - }); - } - } -} else if (cli.type === 'transactions') { - if (cli.id) { - q.push(_scrapeTransactionToDB(cli.id)); - } else if (cli.from) { - const fetchFrom = cli.from; - const fetchTo = cli.to ? cli.to : cli.from + 1; - postgresClient - .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo]) - .then((data: any) => { - for (const row of data.rows) { - q.push(_scrapeTransactionToDB(row.txn_hash)); - } - }) - .catch((err: any) => { - // console.debug(err); - }); - } -} else if (cli.type === 'tokens') { - q.push(_scrapeMetaMaskEthContractMetadataToDB()); - q.push(_scrapeEthplorerTopTokensToDB()); -} else if (cli.type === 'unknown_tokens') { - q.push(_scrapeUnknownTokenInformationToDB()); -} else if (cli.type === 'prices' && cli.from && cli.to) { - const fromDate = new Date(cli.from); - console.debug(fromDate); - fromDate.setUTCHours(0); - fromDate.setUTCMinutes(0); - fromDate.setUTCSeconds(0); - fromDate.setUTCMilliseconds(0); - console.debug(fromDate); - const toDate = new Date(cli.to); - postgresClient - .query(dataFetchingQueries.get_token_registry, []) - .then((result: any) => { - for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { - for (const token of Object.values(result.rows)) { - console.debug('Scraping ' + curDate + ' ' + token); - q.push(_scrapePriceToDB(curDate.getTime(), token)); - } - } - }) - .catch((err: any) => { - console.debug(err); - }); - // } else if (cli.type === 'historical_prices') { - // if (cli.token && cli.from && cli.to) { - // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to)); - // } - // } else if (cli.type === 'all_historical_prices') { - // if (cli.from && cli.to) { - // postgresClient - // .query(dataFetchingQueries.get_token_registry, []) - // .then((result: any) => { - // const curTokens: any = result.rows.map((a: any): any => a.symbol); - // for (const curToken of curTokens) { - // console.debug('Historical data backfill: Pushing coin ' + curToken); - // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to)); - // } - // }) - // .catch((err: any) => { - // console.debug(err); - // }); - // } -} else if (cli.type === 'relayers') { - q.push(_scrapeAllRelayersToDB()); -} else if (cli.type === 'orders') { 
-    postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => {
-        for (const relayer of result.rows) {
-            if (relayer.sra_http_url) {
-                q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url));
-            }
-        }
-    });
-}
--
cgit v1.2.3


From 954c3b9272556723c10fd02ad6b753ac98ca47fa Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Wed, 7 Nov 2018 16:48:25 -0800
Subject: Split index.ts into multiple scripts in scripts/ and detect last
 known block when pulling events

---
 packages/pipeline/src/scripts/merge_v2_events.ts   | 81 ++++++++++++++++++++++
 .../pipeline/src/scripts/pull_missing_events.ts    | 60 ++++++++++++++++
 .../pipeline/src/scripts/update_relayer_info.ts    | 31 +++++++++
 3 files changed, 172 insertions(+)
 create mode 100644 packages/pipeline/src/scripts/merge_v2_events.ts
 create mode 100644 packages/pipeline/src/scripts/pull_missing_events.ts
 create mode 100644 packages/pipeline/src/scripts/update_relayer_info.ts

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/merge_v2_events.ts b/packages/pipeline/src/scripts/merge_v2_events.ts
new file mode 100644
index 000000000..227ece121
--- /dev/null
+++ b/packages/pipeline/src/scripts/merge_v2_events.ts
@@ -0,0 +1,81 @@
+import { web3Factory } from '@0x/dev-utils';
+import 'reflect-metadata';
+import { Connection, createConnection } from 'typeorm';
+
+import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events';
+import { ExchangeFillEvent } from '../entities/ExchangeFillEvent';
+import { deployConfig } from '../ormconfig';
+import { parseExchangeEvents } from '../parsers/events';
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(deployConfig);
+    await getExchangeEventsAsync();
+    await mergeExchangeEventsAsync();
+    console.log('Exiting process');
+    process.exit(0);
+})();
+
+// TODO(albrow): Separately: Errors do not appear to be handled correctly. If you use the
+// wrong rpcUrl it just returns early with no error.
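A later patch in this series (410a924) resolves the TODO above by chaining `.catch(handleError)` onto these immediately invoked async functions. The `handleError` helper is imported from `../utils`, but its body never appears in these diffs; the following is only a hypothetical sketch of what such a top-level handler could look like, assuming the goal is simply to log the failure and exit non-zero:

    // Hypothetical sketch; the real '../utils' implementation is not shown in
    // this patch series. Logs the error and exits with a failure code so a
    // crashed script is visible to whatever scheduler runs it.
    export function handleError(err: any): void {
        console.error(err);
        process.exit(1);
    }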
+async function getExchangeEventsAsync(): Promise<void> {
+    console.log('Getting event logs...');
+    const provider = web3Factory.getRpcProvider({
+        rpcUrl: 'https://mainnet.infura.io',
+    });
+    const eventsRepository = connection.getRepository(ExchangeFillEvent);
+    const exchangeEvents = new ExchangeEventsSource(provider, 1);
+    const eventLogs = await exchangeEvents.getFillEventsAsync();
+    console.log('Parsing events...');
+    const events = parseExchangeEvents(eventLogs);
+    console.log(`Retrieved and parsed ${events.length} total events.`);
+    console.log('Saving events...');
+    for (const event of events) {
+        await eventsRepository.save(event);
+    }
+    await eventsRepository.save(events);
+    console.log('Saved events.');
+}
+
+const insertEventsRawQuery = `INSERT INTO events_raw (
+    event_type,
+    error_id,
+    order_hash,
+    maker,
+    maker_amount,
+    maker_fee,
+    maker_token,
+    taker,
+    taker_amount,
+    taker_fee,
+    taker_token,
+    txn_hash,
+    fee_recipient,
+    block_number,
+    log_index
+)
+(
+    SELECT
+        'LogFill',
+        null,
+        "orderHash",
+        "makerAddress",
+        "makerAssetFilledAmount"::numeric(78),
+        "makerFeePaid"::numeric(78),
+        "makerTokenAddress",
+        "takerAddress",
+        "takerAssetFilledAmount"::numeric(78),
+        "takerFeePaid"::numeric(78),
+        "takerTokenAddress",
+        "transactionHash",
+        "feeRecipientAddress",
+        "blockNumber",
+        "logIndex"
+    FROM exchange_fill_event
+) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`;
+
+async function mergeExchangeEventsAsync(): Promise<void> {
+    console.log('Merging results into events_raw...');
+    await connection.query(insertEventsRawQuery);
+}
diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts
new file mode 100644
index 000000000..1f71722a3
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_missing_events.ts
@@ -0,0 +1,60 @@
+import { web3Factory } from '@0x/dev-utils';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import R = require('ramda');
+import 'reflect-metadata';
+import { Connection, createConnection, Repository } from 'typeorm';
+
+import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events';
+import { ExchangeFillEvent } from '../entities/ExchangeFillEvent';
+import { deployConfig } from '../ormconfig';
+import { parseExchangeEvents } from '../parsers/events';
+
+const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet.
+const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events.
+const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
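The batch-saving loop further down in this file leans on Ramda's `splitEvery`, which breaks a list into consecutive chunks of at most the given size, with the final chunk holding whatever remains. A standalone illustration, independent of the pipeline code:

    import * as R from 'ramda';

    // Split five items into chunks of at most two.
    const batches = R.splitEvery(2, [1, 2, 3, 4, 5]);
    // => [[1, 2], [3, 4], [5]]
    // In the script below, each inner array becomes one save() round trip
    // to the database instead of one round trip per event.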
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(deployConfig);
+    const provider = web3Factory.getRpcProvider({
+        rpcUrl: 'https://mainnet.infura.io',
+    });
+    await getExchangeEventsAsync(provider);
+    process.exit(0);
+})();
+
+async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise<void> {
+    console.log('Checking existing event logs...');
+    const eventsRepository = connection.getRepository(ExchangeFillEvent);
+    const startBlock = await getStartBlockAsync(eventsRepository);
+    console.log(`Getting event logs starting at ${startBlock}...`);
+    const exchangeEvents = new ExchangeEventsSource(provider, 1);
+    const eventLogs = await exchangeEvents.getFillEventsAsync(startBlock);
+    console.log('Parsing events...');
+    const events = parseExchangeEvents(eventLogs);
+    console.log(`Retrieved and parsed ${events.length} total events.`);
+    console.log('Saving events...');
+    // Split the events into batches of size BATCH_SAVE_SIZE and save each batch
+    // in a single request. This reduces round-trip latency to the DB. We need
+    // to batch this way because saving an extremely large number of events in a
+    // single request causes problems.
+    for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) {
+        await eventsRepository.save(eventsBatch);
+    }
+    const totalEvents = await eventsRepository.count();
+    console.log(`Done saving events. There are now ${totalEvents} total events.`);
+}
+
+async function getStartBlockAsync(eventsRepository: Repository<ExchangeFillEvent>): Promise<number> {
+    const fillEventCount = await eventsRepository.count();
+    if (fillEventCount === 0) {
+        console.log('No existing fill events found.');
+        return EXCHANGE_START_BLOCK;
+    }
+    const queryResult = await connection.query(
+        'SELECT "blockNumber" FROM exchange_fill_event ORDER BY "blockNumber" DESC LIMIT 1',
+    );
+    const lastKnownBlock = queryResult[0].blockNumber;
+    return lastKnownBlock - START_BLOCK_OFFSET;
+}
diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts
new file mode 100644
index 000000000..05e045ff4
--- /dev/null
+++ b/packages/pipeline/src/scripts/update_relayer_info.ts
@@ -0,0 +1,31 @@
+import 'reflect-metadata';
+import { Connection, createConnection } from 'typeorm';
+
+import { RelayerRegistrySource } from '../data_sources/relayer-registry';
+import { Relayer } from '../entities/Relayer';
+import { deployConfig } from '../ormconfig';
+import { parseRelayers } from '../parsers/relayer_registry';
+
+// NOTE(albrow): We need to manually update this URL for now. Fix this when we
+// have the relayer-registry behind semantic versioning.
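Pinning the registry URL to a commit hash, as the constant below does, makes each run reproducible: a raw GitHub URL addressed by commit always serves the same JSON, while one addressed by `master` changes whenever the registry repo moves. Both forms appear in this patch series:

    // Floating: contents change whenever master is updated.
    const FLOATING =
        'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json';
    // Pinned: an immutable snapshot at one commit.
    const PINNED =
        'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/4701c85677d161ea729a466aebbc1826c6aa2c0b/relayers.json';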
+const RELAYER_REGISTRY_URL = + 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/4701c85677d161ea729a466aebbc1826c6aa2c0b/relayers.json'; + +let connection: Connection; + +(async () => { + connection = await createConnection(deployConfig); + await getRelayers(); + process.exit(0); +})(); + +async function getRelayers(): Promise { + console.log('Getting latest relayer info...'); + const relayerRepository = connection.getRepository(Relayer); + const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL); + const relayersResp = await relayerSource.getRelayerInfoAsync(); + const relayers = parseRelayers(relayersResp); + console.log('Saving relayer info...'); + await relayerRepository.save(relayers); + console.log('Done saving relayer info.'); +} -- cgit v1.2.3 From ccad046eb649a60fdf7319a075fa41490d593ae8 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Thu, 8 Nov 2018 10:15:09 -0800 Subject: Reorganize entities. Make scripts work from any directory. --- packages/pipeline/src/scripts/merge_v2_events.ts | 2 +- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- packages/pipeline/src/scripts/update_relayer_info.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/merge_v2_events.ts b/packages/pipeline/src/scripts/merge_v2_events.ts index 227ece121..99a76aa61 100644 --- a/packages/pipeline/src/scripts/merge_v2_events.ts +++ b/packages/pipeline/src/scripts/merge_v2_events.ts @@ -3,7 +3,7 @@ import 'reflect-metadata'; import { Connection, createConnection } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities/ExchangeFillEvent'; +import { ExchangeFillEvent } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 1f71722a3..a108f012f 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -5,7 +5,7 @@ import 'reflect-metadata'; import { Connection, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities/ExchangeFillEvent'; +import { ExchangeFillEvent } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index 05e045ff4..f54e16b6c 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -2,7 +2,7 @@ import 'reflect-metadata'; import { Connection, createConnection } from 'typeorm'; import { RelayerRegistrySource } from '../data_sources/relayer-registry'; -import { Relayer } from '../entities/Relayer'; +import { Relayer } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseRelayers } from '../parsers/relayer_registry'; -- cgit v1.2.3 From 410a9244952b663fa8e56cc778b712ae228bbd82 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Thu, 8 Nov 2018 11:38:37 -0800 Subject: Add better error handling for immediately invoked async functions --- packages/pipeline/src/scripts/merge_v2_events.ts | 81 
---------------------- .../pipeline/src/scripts/pull_missing_events.ts | 7 +- .../pipeline/src/scripts/update_relayer_info.ts | 3 +- 3 files changed, 6 insertions(+), 85 deletions(-) delete mode 100644 packages/pipeline/src/scripts/merge_v2_events.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/merge_v2_events.ts b/packages/pipeline/src/scripts/merge_v2_events.ts deleted file mode 100644 index 99a76aa61..000000000 --- a/packages/pipeline/src/scripts/merge_v2_events.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { web3Factory } from '@0x/dev-utils'; -import 'reflect-metadata'; -import { Connection, createConnection } from 'typeorm'; - -import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities'; -import { deployConfig } from '../ormconfig'; -import { parseExchangeEvents } from '../parsers/events'; - -let connection: Connection; - -(async () => { - connection = await createConnection(deployConfig); - await getExchangeEventsAsync(); - await mergeExchangeEventsAsync(); - console.log('Exiting process'); - process.exit(0); -})(); - -// TODO(albrow): Separately: Errors do not appear to be handled correctly. If you use the -// wrong rpcUrl it just returns early with no error. -async function getExchangeEventsAsync(): Promise { - console.log('Getting event logs...'); - const provider = web3Factory.getRpcProvider({ - rpcUrl: 'https://mainnet.infura.io', - }); - const eventsRepository = connection.getRepository(ExchangeFillEvent); - const exchangeEvents = new ExchangeEventsSource(provider, 1); - const eventLogs = await exchangeEvents.getFillEventsAsync(); - console.log('Parsing events...'); - const events = parseExchangeEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total events.`); - console.log('Saving events...'); - for (const event of events) { - await eventsRepository.save(event); - } - await eventsRepository.save(events); - console.log('Saved events.'); -} - -const insertEventsRawQuery = `INSERT INTO events_raw ( - event_type, - error_id, - order_hash, - maker, - maker_amount, - maker_fee, - maker_token, - taker, - taker_amount, - taker_fee, - taker_token, - txn_hash, - fee_recipient, - block_number, - log_index -) -( - SELECT - 'LogFill', - null, - "orderHash", - "makerAddress", - "makerAssetFilledAmount"::numeric(78), - "makerFeePaid"::numeric(78), - "makerTokenAddress", - "takerAddress", - "takerAssetFilledAmount"::numeric(78), - "takerFeePaid"::numeric(78), - "takerTokenAddress", - "transactionHash", - "feeRecipientAddress", - "blockNumber", - "logIndex" - FROM exchange_fill_event -) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`; - -async function mergeExchangeEventsAsync(): Promise { - console.log('Merging results into events_raw...'); - await connection.query(insertEventsRawQuery); -} diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index a108f012f..cca0d9cfe 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -8,6 +8,7 @@ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange import { ExchangeFillEvent } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; +import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to 
mainnet. const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. @@ -22,7 +23,7 @@ let connection: Connection; }); await getExchangeEventsAsync(provider); process.exit(0); -})(); +})().catch(handleError); async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { console.log('Checking existing event logs...'); @@ -53,8 +54,8 @@ async function getStartBlockAsync(eventsRepository: Repository { console.log('Getting latest relayer info...'); -- cgit v1.2.3 From 329c68f610843ebded9ca31fc9cd6f3eed744a8e Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 12 Nov 2018 16:40:20 -0800 Subject: Configure TypeORM for migrations. Add new package.json scripts. --- packages/pipeline/src/scripts/pull_missing_events.ts | 6 +++--- packages/pipeline/src/scripts/update_relayer_info.ts | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index cca0d9cfe..e2b312280 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -2,11 +2,11 @@ import { web3Factory } from '@0x/dev-utils'; import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); import 'reflect-metadata'; -import { Connection, createConnection, Repository } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; import { ExchangeFillEvent } from '../entities'; -import { deployConfig } from '../ormconfig'; +import * as ormConfig from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; import { handleError } from '../utils'; @@ -17,7 +17,7 @@ const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. 
let connection: Connection; (async () => { - connection = await createConnection(deployConfig); + connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ rpcUrl: 'https://mainnet.infura.io', }); diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index 051289992..af9dd726e 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -1,9 +1,9 @@ import 'reflect-metadata'; -import { Connection, createConnection } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; import { RelayerRegistrySource } from '../data_sources/relayer-registry'; import { Relayer } from '../entities'; -import { deployConfig } from '../ormconfig'; +import * as ormConfig from '../ormconfig'; import { parseRelayers } from '../parsers/relayer_registry'; import { handleError } from '../utils'; @@ -15,7 +15,7 @@ const RELAYER_REGISTRY_URL = let connection: Connection; (async () => { - connection = await createConnection(deployConfig); + connection = await createConnection(ormConfig as ConnectionOptions); await getRelayers(); process.exit(0); })().catch(handleError); -- cgit v1.2.3 From 688d277b30b287f66f0dbd49f2a23cab8b256219 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 12 Nov 2018 17:36:33 -0800 Subject: Configure linter with --format stylish and fix linter errors --- packages/pipeline/src/scripts/pull_missing_events.ts | 1 + packages/pipeline/src/scripts/update_relayer_info.ts | 1 + 2 files changed, 2 insertions(+) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index e2b312280..bc0eac853 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -1,3 +1,4 @@ +// tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index af9dd726e..f8918728d 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -1,3 +1,4 @@ +// tslint:disable:no-console import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection } from 'typeorm'; -- cgit v1.2.3 From 55bbe1954b35ff0a6367f1ff820d32a32b48eff3 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 13 Nov 2018 14:05:49 -0800 Subject: Preliminary work for adding RR order book scraping --- .../src/scripts/pull_radar_relay_orders.ts | 52 ++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_radar_relay_orders.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts new file mode 100644 index 000000000..c4d3f7095 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -0,0 +1,52 @@ +// tslint:disable:no-console +import { HttpClient } from '@0x/connect'; +import * as R from 'ramda'; +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { SraOrder } from '../entities'; +import * as ormConfig from 
'../ormconfig'; +import { parseSraOrders } from '../parsers/sra_orders'; +import { handleError } from '../utils'; + +const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; +const BATCH_SAVE_SIZE = 1000; // Number of orders to save at once. +const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + await getOrderbook(); + process.exit(0); +})().catch(handleError); + +async function getOrderbook(): Promise { + console.log('Getting all orders...'); + const connectClient = new HttpClient(RADAR_RELAY_URL); + const rawOrders = await connectClient.getOrdersAsync({ + perPage: ORDERS_PER_PAGE, + }); + console.log(`Got ${rawOrders.records.length} orders.`); + console.log('Parsing orders...'); + const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); + const ordersRepository = connection.getRepository(SraOrder); + // TODO(albrow): Move batch saving to a utility function to reduce + // duplicated code. + for (const ordersBatch of R.splitEvery(BATCH_SAVE_SIZE, orders)) { + await ordersRepository.save(ordersBatch); + } +} + +const sourceUrlProp = R.lensProp('sourceUrl'); + +const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { + return R.set(sourceUrlProp, sourceURL, order); +}); + +const firstSeenProp = R.lensProp('firstSeenTimestamp'); +const lastUpdatedProp = R.lensProp('lastUpdatedTimestamp'); + +const setFirstSeen = R.curry((sourceURL: string, order: SraOrder): SraOrder => { + return R.set(firstSeenTimestampProp, sourceURL, order); +}); -- cgit v1.2.3 From 26280e4aba147ad6000b9df309e64db84b6932fc Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 13 Nov 2018 15:33:43 -0800 Subject: Implement scraping sra orders from radar relay --- .../src/scripts/pull_radar_relay_orders.ts | 33 +++++++++++----------- 1 file changed, 17 insertions(+), 16 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts index c4d3f7095..b3a4d887e 100644 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -2,15 +2,14 @@ import { HttpClient } from '@0x/connect'; import * as R from 'ramda'; import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm'; -import { SraOrder } from '../entities'; +import { createObservedTimestampForOrder, SraOrder } from '../entities'; import * as ormConfig from '../ormconfig'; import { parseSraOrders } from '../parsers/sra_orders'; import { handleError } from '../utils'; const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; -const BATCH_SAVE_SIZE = 1000; // Number of orders to save at once. const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. let connection: Connection; @@ -29,24 +28,26 @@ async function getOrderbook(): Promise { }); console.log(`Got ${rawOrders.records.length} orders.`); console.log('Parsing orders...'); + // Parse the sra orders, then add source url to each. const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); - const ordersRepository = connection.getRepository(SraOrder); - // TODO(albrow): Move batch saving to a utility function to reduce - // duplicated code. 
- for (const ordersBatch of R.splitEvery(BATCH_SAVE_SIZE, orders)) { - await ordersRepository.save(ordersBatch); - } + // Save all the orders and update the observed time stamps in a single + // transaction. + console.log('Saving orders and updating timestamps...'); + await connection.transaction(async (manager: EntityManager): Promise => { + for (const order of orders) { + await manager.save(SraOrder, order); + const observedTimestamp = createObservedTimestampForOrder(order); + await manager.save(observedTimestamp); + } + }); } const sourceUrlProp = R.lensProp('sourceUrl'); +/** + * Sets the source url for a single order. Returns a new order instead of + * mutating the given one. + */ const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { return R.set(sourceUrlProp, sourceURL, order); }); - -const firstSeenProp = R.lensProp('firstSeenTimestamp'); -const lastUpdatedProp = R.lensProp('lastUpdatedTimestamp'); - -const setFirstSeen = R.curry((sourceURL: string, order: SraOrder): SraOrder => { - return R.set(firstSeenTimestampProp, sourceURL, order); -}); -- cgit v1.2.3 From 10e93bb01ffcf029821430781ef582a24901a461 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 14 Nov 2018 14:39:34 -0800 Subject: Add raw schema prefix to query in pull_missing_events --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index bc0eac853..0af999a77 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -55,7 +55,7 @@ async function getStartBlockAsync(eventsRepository: Repository Date: Wed, 14 Nov 2018 15:58:36 -0800 Subject: Change some column types from varchar to numeric --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 0af999a77..1693bb59a 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -13,7 +13,7 @@ import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BATCH_SAVE_SIZE = 10000; // Number of events to save at once. 
let connection: Connection; -- cgit v1.2.3 From b0a2c10e11fa41e2800d4fe67d8008b5828128a3 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 14 Nov 2018 16:20:07 -0800 Subject: Use built-in chunk feature of TypeORM save method --- packages/pipeline/src/scripts/pull_missing_events.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 1693bb59a..b1b8665dd 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -29,6 +29,7 @@ let connection: Connection; async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { console.log('Checking existing event logs...'); const eventsRepository = connection.getRepository(ExchangeFillEvent); + const manager = connection.createEntityManager(); const startBlock = await getStartBlockAsync(eventsRepository); console.log(`Getting event logs starting at ${startBlock}...`); const exchangeEvents = new ExchangeEventsSource(provider, 1); @@ -37,13 +38,7 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise Date: Wed, 14 Nov 2018 17:40:19 -0800 Subject: Fix chunk size in pull_missing_events --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index b1b8665dd..f53bc12bd 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -38,7 +38,7 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise Date: Wed, 14 Nov 2018 18:39:06 -0800 Subject: Add workaround for broken save method --- .../pipeline/src/scripts/pull_missing_events.ts | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index f53bc12bd..2dc81f205 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -38,7 +38,27 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise Date: Thu, 15 Nov 2018 12:10:27 -0800 Subject: Optimize database operations in pull_missing_events script --- .../pipeline/src/scripts/pull_missing_events.ts | 35 ++++++++++++++++------ 1 file changed, 26 insertions(+), 9 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 2dc81f205..bceed299c 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -8,12 +8,12 @@ import { Connection, ConnectionOptions, createConnection, Repository } from 'typ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; import { ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; -import { parseExchangeEvents } from '../parsers/events'; +import { ExchangeEventEntity, parseExchangeEvents } from '../parsers/events'; import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the 
Exchange contract was deployed to mainnet. -const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 10000; // Number of events to save at once. +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. let connection: Connection; @@ -38,17 +38,36 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise, + events: ExchangeEventEntity[], +): Promise { // Note(albrow): This is a temporary hack because `save` is not working as // documented and is causing a foreign key constraint violation. Hopefully // can remove later because this "poor man's upsert" implementation operates // on one event at a time and is therefore much slower. - // await eventsRepository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); for (const event of events) { try { - await eventsRepository.save(event); + // First try and insert. + await eventsRepository.insert(event); } catch { - // Assume this is a foreign key constraint error and try doing an - // update instead. + // If it fails, assume it was a foreign key constraint error and try + // doing an update instead. await eventsRepository.update( { contractAddress: event.contractAddress, @@ -59,8 +78,6 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise): Promise { -- cgit v1.2.3 From 24fd2d9730d58a58929f401674175ad8a5a7fbc1 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Fri, 16 Nov 2018 12:55:54 -0800 Subject: Add support for pulling Cancel and CancelUpTo events --- .../pipeline/src/scripts/pull_missing_events.ts | 123 ++++++++++++++------- 1 file changed, 81 insertions(+), 42 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index bceed299c..b2a99e3c0 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -1,14 +1,13 @@ // tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; -import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities'; +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; -import { ExchangeEventEntity, parseExchangeEvents } from '../parsers/events'; +import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. 
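The rescan window created by START_BLOCK_OFFSET (reduced to 100 a few patches earlier) works like this: if the most recent saved event came from block 6,300,000, the next run starts pulling logs 100 blocks before that, so blocks near the chain tip are fetched again in case a reorg changed them, and the insert-then-update fallback shown below makes the overlap idempotent. The arithmetic, with assumed example values rather than live data:

    const lastKnownBlock = 6300000; // e.g. MAX(block_number) from the events table
    const START_BLOCK_OFFSET = 100;
    const startBlock = lastKnownBlock - START_BLOCK_OFFSET; // 6299900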
@@ -22,40 +21,88 @@ let connection: Connection; const provider = web3Factory.getRpcProvider({ rpcUrl: 'https://mainnet.infura.io', }); - await getExchangeEventsAsync(provider); + const eventsSource = new ExchangeEventsSource(provider, 1); + await getFillEventsAsync(eventsSource); + await getCancelEventsAsync(eventsSource); + await getCancelUpToEventsAsync(eventsSource); process.exit(0); })().catch(handleError); -async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { - console.log('Checking existing event logs...'); - const eventsRepository = connection.getRepository(ExchangeFillEvent); - const manager = connection.createEntityManager(); - const startBlock = await getStartBlockAsync(eventsRepository); - console.log(`Getting event logs starting at ${startBlock}...`); - const exchangeEvents = new ExchangeEventsSource(provider, 1); - const eventLogs = await exchangeEvents.getFillEventsAsync(startBlock); - console.log('Parsing events...'); - const events = parseExchangeEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total events.`); - console.log('Saving events...'); - if (startBlock === EXCHANGE_START_BLOCK) { +async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing fill events...'); + const repository = connection.getRepository(ExchangeFillEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting fill events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock); + console.log('Parsing fill events...'); + const events = parseExchangeFillEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total fill events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing cancel events...'); + const repository = connection.getRepository(ExchangeCancelEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting cancel events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); + console.log('Parsing cancel events...'); + const events = parseExchangeCancelEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total cancel events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing CancelUpTo events...'); + const repository = connection.getRepository(ExchangeCancelUpToEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting CancelUpTo events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); + console.log('Parsing CancelUpTo events...'); + const events = parseExchangeCancelUpToEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getStartBlockAsync(repository: Repository): Promise { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing ${repository.metadata.name}s found.`); + return EXCHANGE_START_BLOCK; + } + const queryResult = await connection.query( + // TODO(albrow): Would prefer to use a prepared statement here 
to reduce + // surface area for SQL injections, but it doesn't appear to be working. + `SELECT block_number FROM raw.${repository.metadata.tableName} ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} + +async function saveEventsAsync( + isInitialPull: boolean, + repository: Repository, + events: T[], +): Promise { + console.log(`Saving ${repository.metadata.name}s...`); + if (isInitialPull) { // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE // each. for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { - await eventsRepository.insert(eventsBatch); + await repository.insert(eventsBatch); } } else { // If we possibly have some overlap where we need to update some // existing events, we need to use our workaround/fallback. - await saveIndividuallyWithFallbackAsync(eventsRepository, events); + await saveIndividuallyWithFallbackAsync(repository, events); } - const totalEvents = await eventsRepository.count(); - console.log(`Done saving events. There are now ${totalEvents} total events.`); + const totalEvents = await repository.count(); + console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); } -async function saveIndividuallyWithFallbackAsync( - eventsRepository: Repository, - events: ExchangeEventEntity[], +async function saveIndividuallyWithFallbackAsync( + repository: Repository, + events: T[], ): Promise { // Note(albrow): This is a temporary hack because `save` is not working as // documented and is causing a foreign key constraint violation. Hopefully @@ -63,32 +110,24 @@ async function saveIndividuallyWithFallbackAsync( // on one event at a time and is therefore much slower. for (const event of events) { try { - // First try and insert. - await eventsRepository.insert(event); + // First try an insert. + await repository.insert(event); } catch { // If it fails, assume it was a foreign key constraint error and try // doing an update instead. - await eventsRepository.update( + // Note(albrow): Unfortunately the `as any` hack here seems + // required. I can't figure out how to convince the type-checker + // that the criteria and the entity itself are the correct type for + // the given repository. If we can remove the `save` hack then this + // will probably no longer be necessary. 
+ await repository.update( { contractAddress: event.contractAddress, blockNumber: event.blockNumber, logIndex: event.logIndex, - }, - event, + } as any, + event as any, ); } } } - -async function getStartBlockAsync(eventsRepository: Repository): Promise { - const fillEventCount = await eventsRepository.count(); - if (fillEventCount === 0) { - console.log('No existing fill events found.'); - return EXCHANGE_START_BLOCK; - } - const queryResult = await connection.query( - 'SELECT block_number FROM raw.exchange_fill_events ORDER BY block_number DESC LIMIT 1', - ); - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} -- cgit v1.2.3 From 5cad2ad1744ab1c1e24ed52fc0a26ec5acf5c898 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Fri, 16 Nov 2018 13:16:17 -0800 Subject: Check for special characters in table name in pull_missing_events --- packages/pipeline/src/scripts/pull_missing_events.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index b2a99e3c0..0b7f6287f 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -64,16 +64,20 @@ async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Pro await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } +const tabelNameRegex = /^[a-zA-Z_]*$/; + async function getStartBlockAsync(repository: Repository): Promise { const fillEventCount = await repository.count(); if (fillEventCount === 0) { console.log(`No existing ${repository.metadata.name}s found.`); return EXCHANGE_START_BLOCK; } + const tableName = repository.metadata.tableName; + if (!tabelNameRegex.test(tableName)) { + throw new Error('Unexpected special character in table name: ' + tableName); + } const queryResult = await connection.query( - // TODO(albrow): Would prefer to use a prepared statement here to reduce - // surface area for SQL injections, but it doesn't appear to be working. 
- `SELECT block_number FROM raw.${repository.metadata.tableName} ORDER BY block_number DESC LIMIT 1`, + `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, ); const lastKnownBlock = queryResult[0].block_number; return lastKnownBlock - START_BLOCK_OFFSET; -- cgit v1.2.3 From 9986717671fe8e14c2168f7479bdaffe406bedc0 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 19 Nov 2018 18:38:11 -0800 Subject: Add script for pulling missing block data --- .../pipeline/src/scripts/pull_missing_blocks.ts | 83 ++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_missing_blocks.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts new file mode 100644 index 000000000..4a1483ab9 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -0,0 +1,83 @@ +// tslint:disable:no-console +import { web3Factory } from '@0x/dev-utils'; +import * as Parallel from 'async-parallel'; +import R = require('ramda'); +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { Web3Source } from '../data_sources/web3'; +import { Block } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseBlock } from '../parsers/web3'; +import { handleError } from '../utils'; + +// Number of blocks to save at once. +const BATCH_SAVE_SIZE = 1000; +// Maximum number of requests to send at once. +const MAX_CONCURRENCY = 10; +// Maximum number of blocks to query for at once. This is also the maximum +// number of blocks we will hold in memory prior to being saved to the database. +const MAX_BLOCKS_PER_QUERY = 1000; +// Block number when the Exchange contract was deployed to mainnet. +// TODO(albrow): De-dupe this constant. +const EXCHANGE_START_BLOCK = 6271590; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: `https://mainnet.infura.io/${process.env.INFURA_API_KEY}`, + }); + const web3Source = new Web3Source(provider); + await getAllMissingBlocks(web3Source); + process.exit(0); +})().catch(handleError); + +interface MissingBlocksResponse { + block_number: string; +} + +async function getAllMissingBlocks(web3Source: Web3Source): Promise { + const blocksRepository = connection.getRepository(Block); + let fromBlock = EXCHANGE_START_BLOCK; + while (true) { + const blockNumbers = await getMissingBlockNumbers(fromBlock); + if (blockNumbers.length === 0) { + // There are no more missing blocks. We're done. + break; + } + await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers); + fromBlock = Math.max(...blockNumbers) + 1; + } + const totalBlocks = await blocksRepository.count(); + console.log(`Done saving blocks. 
There are now ${totalBlocks} total blocks.`); +} + +async function getMissingBlockNumbers(fromBlock: number): Promise { + console.log(`Checking for missing blocks starting at ${fromBlock}...`); + const response = (await connection.query( + 'SELECT DISTINCT(block_number) FROM raw.exchange_fill_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) AND block_number >= $1 ORDER BY block_number ASC LIMIT $2', + [fromBlock, MAX_BLOCKS_PER_QUERY], + )) as MissingBlocksResponse[]; + const blockNumberStrings = R.pluck('block_number', response); + const blockNumbers = R.map(parseInt, blockNumberStrings); + console.log(`Found ${blockNumbers.length} missing blocks in the given range.`); + return blockNumbers; +} + +async function getAndSaveBlocks( + web3Source: Web3Source, + blocksRepository: Repository, + blockNumbers: number[], +): Promise { + console.log(`Getting block data for ${blockNumbers.length} blocks...`); + Parallel.setConcurrency(MAX_CONCURRENCY); + const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) => + web3Source.getBlockInfoAsync(blockNumber), + ); + console.log(`Parsing ${rawBlocks.length} blocks...`); + const blocks = R.map(parseBlock, rawBlocks); + console.log(`Saving ${blocks.length} blocks...`); + await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); +} -- cgit v1.2.3 From c6af5131b0b06433d6294260274e187ad61f4ef7 Mon Sep 17 00:00:00 2001 From: Jake Ellowitz Date: Mon, 19 Nov 2018 16:24:07 -0800 Subject: Pull token metadata re trusted tokens --- .../pipeline/src/scripts/pull_missing_events.ts | 4 +- .../pipeline/src/scripts/pull_trusted_tokens.ts | 51 ++++++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 packages/pipeline/src/scripts/pull_trusted_tokens.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 0b7f6287f..68cabe3de 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -64,7 +64,7 @@ async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Pro await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } -const tabelNameRegex = /^[a-zA-Z_]*$/; +const tableNameRegex = /^[a-zA-Z_]*$/; async function getStartBlockAsync(repository: Repository): Promise { const fillEventCount = await repository.count(); @@ -73,7 +73,7 @@ async function getStartBlockAsync(repository: Repositor return EXCHANGE_START_BLOCK; } const tableName = repository.metadata.tableName; - if (!tabelNameRegex.test(tableName)) { + if (!tableNameRegex.test(tableName)) { throw new Error('Unexpected special character in table name: ' + tableName); } const queryResult = await connection.query( diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts new file mode 100644 index 000000000..67d9e08d1 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -0,0 +1,51 @@ +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens'; +import { TrustedToken } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/trusted_tokens'; +import { 
diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
new file mode 100644
index 000000000..67d9e08d1
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -0,0 +1,51 @@
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens';
+import { TrustedToken } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/trusted_tokens';
+import { handleError } from '../utils';
+
+const METAMASK_TRUSTED_TOKENS_URL =
+    'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json';
+
+const ZEROEX_TRUSTED_TOKENS_URL =
+    'https://website-api.0xproject.com/tokens';
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    await getMetamaskTrustedTokens();
+    await getZeroExTrustedTokens();
+    process.exit(0);
+})().catch(handleError);
+
+async function getMetamaskTrustedTokens(): Promise<void> {
+    // tslint:disable-next-line
+    console.log('Getting latest metamask trusted tokens list ...');
+    const trustedTokensRepository = connection.getRepository(TrustedToken);
+    const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(METAMASK_TRUSTED_TOKENS_URL);
+    const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
+    const trustedTokens = parseMetamaskTrustedTokens(resp);
+    // tslint:disable-next-line
+    console.log('Saving metamask trusted tokens list');
+    await trustedTokensRepository.save(trustedTokens);
+    // tslint:disable-next-line
+    console.log('Done saving metamask trusted tokens.')
+}
+
+async function getZeroExTrustedTokens(): Promise<void> {
+    // tslint:disable-next-line
+    console.log('Getting latest 0x trusted tokens list ...');
+    const trustedTokensRepository = connection.getRepository(TrustedToken);
+    const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL);
+    const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
+    const trustedTokens = parseZeroExTrustedTokens(resp);
+    // tslint:disable-next-line
+    console.log('Saving metamask trusted tokens list');
+    await trustedTokensRepository.save(trustedTokens);
+    // tslint:disable-next-line
+    console.log('Done saving metamask trusted tokens.');
+}
-- cgit v1.2.3

From dea89c4e221d5b22de97b27573719cd27ce250c7 Mon Sep 17 00:00:00 2001
From: Jake Ellowitz
Date: Mon, 19 Nov 2018 19:11:51 -0800
Subject: metadata and trusted sources in same raw table

---
 packages/pipeline/src/scripts/pull_trusted_tokens.ts | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
index 67d9e08d1..48e20bea7 100644
--- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -2,9 +2,9 @@ import 'reflect-metadata';
 import { Connection, ConnectionOptions, createConnection } from 'typeorm';

 import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens';
-import { TrustedToken } from '../entities';
+import { TokenMetadata } from '../entities';
 import * as ormConfig from '../ormconfig';
-import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/trusted_tokens';
+import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/token_metadata';
 import { handleError } from '../utils';

 const METAMASK_TRUSTED_TOKENS_URL =
@@ -25,7 +25,7 @@ let connection: Connection;
 async function getMetamaskTrustedTokens(): Promise<void> {
     // tslint:disable-next-line
     console.log('Getting latest metamask trusted tokens list ...');
-    const trustedTokensRepository = connection.getRepository(TrustedToken);
+    const trustedTokensRepository = connection.getRepository(TokenMetadata);
     const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(METAMASK_TRUSTED_TOKENS_URL);
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseMetamaskTrustedTokens(resp);
@@ -39,7 +39,7 @@ async function getZeroExTrustedTokens(): Promise<void> {
     // tslint:disable-next-line
     console.log('Getting latest 0x trusted tokens list ...');
-    const trustedTokensRepository = connection.getRepository(TrustedToken);
+    const trustedTokensRepository = connection.getRepository(TokenMetadata);
     const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL);
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseZeroExTrustedTokens(resp);
-- cgit v1.2.3

From 1aa3f9d69f7a6570c6fa62c542063fa92fc0bf5a Mon Sep 17 00:00:00 2001
From: Jake Ellowitz
Date: Mon, 26 Nov 2018 14:55:45 -0800
Subject: updating comment for 0x trusted tokens

---
 packages/pipeline/src/scripts/pull_trusted_tokens.ts | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
index 48e20bea7..b3a4466bb 100644
--- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -10,8 +10,7 @@ import { handleError } from '../utils';
 const METAMASK_TRUSTED_TOKENS_URL =
     'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json';

-const ZEROEX_TRUSTED_TOKENS_URL =
-    'https://website-api.0xproject.com/tokens';
+const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens';

 let connection: Connection;

@@ -26,14 +25,16 @@ async function getMetamaskTrustedTokens(): Promise<void> {
     // tslint:disable-next-line
     console.log('Getting latest metamask trusted tokens list ...');
     const trustedTokensRepository = connection.getRepository(TokenMetadata);
-    const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(METAMASK_TRUSTED_TOKENS_URL);
+    const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(
+        METAMASK_TRUSTED_TOKENS_URL,
+    );
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseMetamaskTrustedTokens(resp);
     // tslint:disable-next-line
     console.log('Saving metamask trusted tokens list');
     await trustedTokensRepository.save(trustedTokens);
     // tslint:disable-next-line
-    console.log('Done saving metamask trusted tokens.')
+    console.log('Done saving metamask trusted tokens.');
 }

 async function getZeroExTrustedTokens(): Promise<void> {
-- cgit v1.2.3
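Note: both trusted-token pulls follow the same fetch -> parse -> save shape, differing only in the generic parameter given to TrustedTokenSource. A condensed sketch of that shape (helper name hypothetical; repository typing simplified):

    import { Repository } from 'typeorm';

    async function pullAndSaveAsync<TRaw, TEntity>(
        fetchAsync: () => Promise<TRaw>,          // e.g. trustedTokensSource.getTrustedTokenMetaAsync
        parse: (raw: TRaw) => TEntity[],          // e.g. parseMetamaskTrustedTokens
        repository: Repository<TEntity>,
    ): Promise<void> {
        const raw = await fetchAsync();
        const entities = parse(raw);
        await repository.save(entities);
    }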
From 7198b441e0d85785eec7244dd60bcd92269d954e Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Thu, 29 Nov 2018 11:40:09 -0800
Subject: Add script for parsing competing dex trades from Bloxy (#1355)

---
 .../src/scripts/pull_competing_dex_trades.ts | 51 ++++++++++++++++++++++
 1 file changed, 51 insertions(+)
 create mode 100644 packages/pipeline/src/scripts/pull_competing_dex_trades.ts
(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
new file mode 100644
index 000000000..4e4c12dd0
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
@@ -0,0 +1,51 @@
+// tslint:disable:no-console
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { BloxySource } from '../data_sources/bloxy';
+import { DexTrade } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseBloxyTrades } from '../parsers/bloxy';
+import { handleError } from '../utils';
+
+// Number of trades to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    await getAndSaveTrades();
+    process.exit(0);
+})().catch(handleError);
+
+async function getAndSaveTrades(): Promise<void> {
+    const apiKey = process.env.BLOXY_API_KEY;
+    if (apiKey === undefined) {
+        throw new Error('Missing required env var: BLOXY_API_KEY');
+    }
+    const bloxySource = new BloxySource(apiKey);
+    const tradesRepository = connection.getRepository(DexTrade);
+    const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository);
+    console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`);
+    console.log('Getting latest dex trades...');
+    const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp);
+    console.log(`Parsing ${rawTrades.length} trades...`);
+    const trades = parseBloxyTrades(rawTrades);
+    console.log(`Saving ${trades.length} trades...`);
+    await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) });
+    console.log('Done saving trades.');
+}
+
+async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> {
+    if ((await tradesRepository.count()) === 0) {
+        return 0;
+    }
+    const response = (await connection.query(
+        'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1',
+    )) as Array<{ tx_timestamp: number }>;
+    if (response.length === 0) {
+        return 0;
+    }
+    return response[0].tx_timestamp;
+}
-- cgit v1.2.3
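Note: the { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) } option splits one large save into several smaller INSERTs. TypeORM documents the chunk value as the number of rows per group, so the expression above yields roughly BATCH_SAVE_SIZE groups rather than groups of BATCH_SAVE_SIZE rows; a sketch of the more literal rows-per-batch form, assuming that reading of the API:

    import { Repository } from 'typeorm';
    import { DexTrade } from '../entities';

    const BATCH_SAVE_SIZE = 1000; // intended rows per INSERT batch

    async function saveTradesAsync(tradesRepository: Repository<DexTrade>, trades: DexTrade[]): Promise<void> {
        if (trades.length === 0) {
            return;
        }
        // Each group holds ~BATCH_SAVE_SIZE rows; TypeORM issues one save per group.
        await tradesRepository.save(trades, { chunk: BATCH_SAVE_SIZE });
    }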
From 87ffa5d7ab19d2288bf68131a7e7ec77578c564c Mon Sep 17 00:00:00 2001
From: zkao
Date: Tue, 4 Dec 2018 13:21:46 -0800
Subject: Token_orderbook_snapshots for Ddex and Paradex(#1354)

* Implements the TokenOrderbookSnapshot Table
* Scripts, Data Sources and Entities to pull Ddex and Paradex API data.

---
 .../src/scripts/pull_ddex_orderbook_snapshots.ts   | 55 ++++++++++++++
 .../scripts/pull_paradex_orderbook_snapshots.ts    | 87 ++++++++++++++++++++++
 2 files changed, 142 insertions(+)
 create mode 100644 packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
 create mode 100644 packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
new file mode 100644
index 000000000..b02468e9b
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
@@ -0,0 +1,55 @@
+import { logUtils } from '@0x/utils';
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseDdexOrders } from '../parsers/ddex_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+// Number of markets to retrieve orderbooks for at once.
+const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50;
+
+// Delay between market orderbook requests.
+const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000;
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const ddexSource = new DdexSource();
+    const markets = await ddexSource.getActiveMarketsAsync();
+    for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
+        await Promise.all(
+            marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)),
+        );
+        await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
+    }
+    process.exit(0);
+})().catch(handleError);
+
+/**
+ * Retrieve orderbook from Ddex API for a given market. Parse orders and insert
+ * them into our database.
+ * @param ddexSource Data source which can query Ddex API.
+ * @param market Object from Ddex API containing market data.
+ */
+async function getAndSaveMarketOrderbook(ddexSource: DdexSource, market: DdexMarket): Promise<void> {
+    const orderBook = await ddexSource.getMarketOrderbookAsync(market.id);
+    const observedTimestamp = Date.now();
+
+    logUtils.log(`${market.id}: Parsing orders.`);
+    const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE);
+
+    if (orders.length > 0) {
+        logUtils.log(`${market.id}: Saving ${orders.length} orders.`);
+        const TokenOrderRepository = connection.getRepository(TokenOrder);
+        await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+    } else {
+        logUtils.log(`${market.id}: 0 orders to save.`);
+    }
+}
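Note: the splitEvery/setTimeout loop above is a generic bounded-concurrency pattern: fire one batch of requests in parallel, wait for it, sleep, then move to the next batch. A standalone sketch, assuming any async worker function:

    import * as R from 'ramda';

    const BATCH_SIZE = 50;
    const DELAY_MS = 5000;

    async function processInBatchesAsync<T>(items: T[], workAsync: (item: T) => Promise<void>): Promise<void> {
        for (const batch of R.splitEvery(BATCH_SIZE, items)) {
            // One batch runs concurrently; the delay spaces out successive batches.
            await Promise.all(batch.map(workAsync));
            await new Promise<void>(resolve => setTimeout(resolve, DELAY_MS));
        }
    }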
diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
new file mode 100644
index 000000000..bae1fbede
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
@@ -0,0 +1,87 @@
+import { logUtils } from '@0x/utils';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import {
+    PARADEX_SOURCE,
+    ParadexActiveMarketsResponse,
+    ParadexMarket,
+    ParadexSource,
+    ParadexTokenInfoResponse,
+} from '../data_sources/paradex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseParadexOrders } from '../parsers/paradex_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY;
+    if (apiKey === undefined) {
+        throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY');
+    }
+    const paradexSource = new ParadexSource(apiKey);
+    const markets = await paradexSource.getActiveMarketsAsync();
+    const tokenInfoResponse = await paradexSource.getTokenInfoAsync();
+    const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse);
+    await Promise.all(
+        extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbook(paradexSource, market)),
+    );
+    process.exit(0);
+})().catch(handleError);
+
+/**
+ * Extend the default ParadexMarket objects with token addresses.
+ * @param markets An array of ParadexMarket objects.
+ * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses.
+ */
+function addTokenAddresses(
+    markets: ParadexActiveMarketsResponse,
+    tokenInfoResponse: ParadexTokenInfoResponse,
+): ParadexMarket[] {
+    const symbolAddressMapping = new Map<string, string>();
+    tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address));
+
+    markets.forEach((market: ParadexMarket) => {
+        if (symbolAddressMapping.has(market.baseToken)) {
+            market.baseTokenAddress = symbolAddressMapping.get(market.baseToken);
+        } else {
+            market.baseTokenAddress = '';
+            logUtils.warn(`${market.baseToken}: No address found.`);
+        }
+
+        if (symbolAddressMapping.has(market.quoteToken)) {
+            market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken);
+        } else {
+            market.quoteTokenAddress = '';
+            logUtils.warn(`${market.quoteToken}: No address found.`);
+        }
+    });
+    return markets;
+}
+
+/**
+ * Retrieve orderbook from Paradex API for a given market. Parse orders and insert
+ * them into our database.
+ * @param paradexSource Data source which can query the Paradex API.
+ * @param market Object from the Paradex API with information about the market in question.
+ */
+async function getAndSaveMarketOrderbook(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> {
+    const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol);
+    const observedTimestamp = Date.now();
+
+    logUtils.log(`${market.symbol}: Parsing orders.`);
+    const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE);
+
+    if (orders.length > 0) {
+        logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`);
+        const tokenOrderRepository = connection.getRepository(TokenOrder);
+        await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+    } else {
+        logUtils.log(`${market.symbol}: 0 orders to save.`);
+    }
+}
-- cgit v1.2.3
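Note: addTokenAddresses is a plain symbol -> address join with a warn-and-default fallback for unknown symbols. Reduced to its essentials (shapes assumed from the surrounding code):

    function toSymbolAddressMap(tokenInfos: Array<{ symbol: string; address: string }>): Map<string, string> {
        const mapping = new Map<string, string>();
        tokenInfos.forEach(info => mapping.set(info.symbol, info.address));
        return mapping;
    }

    // Unknown symbols fall back to the empty string rather than undefined.
    const lookupOrDefault = (mapping: Map<string, string>, symbol: string): string => mapping.get(symbol) || '';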
From 8c21a700bae0c751f7f9ca47f9a47628a4478911 Mon Sep 17 00:00:00 2001
From: Xianny <8582774+xianny@users.noreply.github.com>
Date: Tue, 4 Dec 2018 13:36:18 -0800
Subject: pull OHLCV records from Crypto Compare (#1349)

* [WIP] pull OHLCV records from Crypto Compare
* lint
* refactor to pull logic out of script and into modules
* add entity test for ohlcv_external entity
* implement rate limit and chronological backfill for ohlcv
* add unit tests; cleanup variable names
* Fetch OHLCV pairs params from events table
* better method names
* fix outdated test
* lint
* Clean up after review
* oops
* fix failing test
* better filtering of most recent records
* fix bug when generating pairs
* fix default earliest backfill date
* fix bug with retrieving backfill time
* prettier

---
 .../src/scripts/pull_ohlcv_cryptocompare.ts | 101 +++++++++++++++++++++
 1 file changed, 101 insertions(+)
 create mode 100644 packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
new file mode 100644
index 000000000..6979cd10e
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -0,0 +1,101 @@
+// tslint:disable:no-console
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare';
+import { handleError } from '../utils';
+import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs';
+
+const SOURCE_NAME = 'CryptoCompare';
+const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_SECOND = 1000;
+
+const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
+const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare
+const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const repository = connection.getRepository(OHLCVExternal);
+    const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS);
+
+    const jobTime = new Date().getTime();
+    const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
+    console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`);
+
+    const fetchAndSavePromises = tradingPairs.map(async pair => {
+        const pairs = source.generateBackfillIntervals(pair);
+        return fetchAndSaveAsync(source, repository, jobTime, pairs);
+    });
+    await Promise.all(fetchAndSavePromises);
+    console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`);
+    process.exit(0);
+})().catch(handleError);
+
+async function fetchAndSaveAsync(
+    source: CryptoCompareOHLCVSource,
+    repository: Repository<OHLCVExternal>,
+    jobTime: number,
+    pairs: TradingPair[],
+): Promise<void> {
+    const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => {
+        if (a.latestSavedTime < b.latestSavedTime) {
+            return -1;
+        } else if (a.latestSavedTime > b.latestSavedTime) {
+            return 1;
+        } else {
+            return 0;
+        }
+    };
+    pairs.sort(sortAscTimestamp);
+
+    let i = 0;
+    while (i < pairs.length) {
+        const pair = pairs[i];
+        if (pair.latestSavedTime > TWO_HOURS_AGO) {
+            break;
+        }
+        const rawRecords = await source.getHourlyOHLCVAsync(pair);
+        const records = rawRecords.filter(rec => {
+            return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime;
+        }); // Crypto Compare can take ~30mins to finalise records
+        if (records.length === 0) {
+            console.log(`No more records, stopping task for ${JSON.stringify(pair)}`);
+            break;
+        }
+        const metadata: OHLCVMetadata = {
+            exchange: source.default_exchange,
+            fromSymbol: pair.fromSymbol,
+            toSymbol: pair.toSymbol,
+            source: SOURCE_NAME,
+            observedTimestamp: jobTime,
+            interval: source.intervalBetweenRecords,
+        };
+        const parsedRecords = parseRecords(records, metadata);
+        try {
+            await saveRecordsAsync(repository, parsedRecords);
+            i++;
+        } catch (err) {
+            console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
+            break;
+        }
+    }
+    return Promise.resolve();
+}
+
+async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> {
+    const metadata = [
+        records[0].fromSymbol,
+        records[0].toSymbol,
+        new Date(records[0].startTime),
+        new Date(records[records.length - 1].endTime),
+    ];
+
+    console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`);
+    await repository.save(records);
+}
-- cgit v1.2.3
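Note: the filter inside fetchAndSaveAsync encodes two invariants -- never re-save rows at or before the saved checkpoint, and never save rows young enough that Crypto Compare may still revise them. Isolated for clarity (record shape assumed from the code above):

    const ONE_SECOND = 1000;
    const ONE_HOUR_MS = 60 * 60 * 1000;

    function selectSaveableRecords<T extends { time: number }>(rawRecords: T[], latestSavedTime: number): T[] {
        // Records younger than ~1h may still be revised by the source.
        const oneHourAgo = Date.now() - ONE_HOUR_MS;
        return rawRecords.filter(rec => rec.time * ONE_SECOND < oneHourAgo && rec.time * ONE_SECOND > latestSavedTime);
    }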
From 8721d4ed7a7a38f77847b45619f15315044c375c Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Tue, 4 Dec 2018 15:23:15 -0800
Subject: Fix linter

---
 packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
index b02468e9b..7868e9c5a 100644
--- a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
+++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
@@ -27,7 +27,7 @@ let connection: Connection;
         await Promise.all(
             marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)),
         );
-        await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
+        await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
     }
     process.exit(0);
 })().catch(handleError);
-- cgit v1.2.3

From 549f5e4655f246062dd6451065ec01eb789dbd8f Mon Sep 17 00:00:00 2001
From: Fabio B
Date: Tue, 4 Dec 2018 19:56:23 -0800
Subject: Use a string template in packages/pipeline/src/scripts/pull_missing_events.ts

Co-Authored-By: albrow

---
 packages/pipeline/src/scripts/pull_missing_events.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts
index 68cabe3de..c7e07c540 100644
--- a/packages/pipeline/src/scripts/pull_missing_events.ts
+++ b/packages/pipeline/src/scripts/pull_missing_events.ts
@@ -74,7 +74,7 @@ async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> {
     }
     const tableName = repository.metadata.tableName;
     if (!tableNameRegex.test(tableName)) {
-        throw new Error('Unexpected special character in table name: ' + tableName);
+        throw new Error(`Unexpected special character in table name: ${tableName}`);
     }
     const queryResult = await connection.query(
         `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`,
-- cgit v1.2.3
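Note: the swap above is what a tslint rule such as prefer-template asks for -- a template literal instead of string concatenation (the commit does not name the rule, so that attribution is an assumption). The two equivalent forms side by side:

    const tableName = 'exchange_fill_events';
    const viaConcat = 'Unexpected special character in table name: ' + tableName; // flagged
    const viaTemplate = `Unexpected special character in table name: ${tableName}`; // preferred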
From 00f86ca0f7871639d2b0be496f6f8c5e0d8d7ffe Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Tue, 4 Dec 2018 20:04:08 -0800
Subject: Address PR feedback

---
 .../pipeline/src/scripts/pull_missing_blocks.ts    |  7 ++---
 .../pipeline/src/scripts/pull_missing_events.ts    |  5 ++--
 .../src/scripts/pull_radar_relay_orders.ts         | 33 +++++++++++++---------
 .../pipeline/src/scripts/pull_trusted_tokens.ts    | 14 ++++-----
 4 files changed, 31 insertions(+), 28 deletions(-)
(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts
index 4a1483ab9..b7bd51f08 100644
--- a/packages/pipeline/src/scripts/pull_missing_blocks.ts
+++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts
@@ -9,7 +9,7 @@ import { Web3Source } from '../data_sources/web3';
 import { Block } from '../entities';
 import * as ormConfig from '../ormconfig';
 import { parseBlock } from '../parsers/web3';
-import { handleError } from '../utils';
+import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';

 // Number of blocks to save at once.
 const BATCH_SAVE_SIZE = 1000;
@@ -18,16 +18,13 @@ const MAX_CONCURRENCY = 10;
 // Maximum number of blocks to query for at once. This is also the maximum
 // number of blocks we will hold in memory prior to being saved to the database.
 const MAX_BLOCKS_PER_QUERY = 1000;
-// Block number when the Exchange contract was deployed to mainnet.
-// TODO(albrow): De-dupe this constant.
-const EXCHANGE_START_BLOCK = 6271590;

 let connection: Connection;

 (async () => {
     connection = await createConnection(ormConfig as ConnectionOptions);
     const provider = web3Factory.getRpcProvider({
-        rpcUrl: `https://mainnet.infura.io/${process.env.INFURA_API_KEY}`,
+        rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`,
     });
     const web3Source = new Web3Source(provider);
     await getAllMissingBlocks(web3Source);
diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts
index c7e07c540..80abbb8b0 100644
--- a/packages/pipeline/src/scripts/pull_missing_events.ts
+++ b/packages/pipeline/src/scripts/pull_missing_events.ts
@@ -8,9 +8,8 @@ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events';
 import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities';
 import * as ormConfig from '../ormconfig';
 import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events';
-import { handleError } from '../utils';
+import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';

-const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet.
 const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events.
 const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.

@@ -19,7 +18,7 @@ let connection: Connection;
 (async () => {
     connection = await createConnection(ormConfig as ConnectionOptions);
     const provider = web3Factory.getRpcProvider({
-        rpcUrl: 'https://mainnet.infura.io',
+        rpcUrl: INFURA_ROOT_URL,
     });
     const eventsSource = new ExchangeEventsSource(provider, 1);
     await getFillEventsAsync(eventsSource);
diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
index b3a4d887e..bbbef9b47 100644
--- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
+++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
@@ -16,11 +16,11 @@ let connection: Connection;

 (async () => {
     connection = await createConnection(ormConfig as ConnectionOptions);
-    await getOrderbook();
+    await getOrderbookAsync();
     process.exit(0);
 })().catch(handleError);

-async function getOrderbook(): Promise<void> {
+async function getOrderbookAsync(): Promise<void> {
     console.log('Getting all orders...');
     const connectClient = new HttpClient(RADAR_RELAY_URL);
     const rawOrders = await connectClient.getOrdersAsync({
@@ -29,17 +29,22 @@
     console.log(`Got ${rawOrders.records.length} orders.`);
     console.log('Parsing orders...');
     // Parse the sra orders, then add source url to each.
-    const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders);
+    const orders = R.pipe(
+        parseSraOrders,
+        R.map(setSourceUrl(RADAR_RELAY_URL)),
+    )(rawOrders);
     // Save all the orders and update the observed time stamps in a single
     // transaction.
     console.log('Saving orders and updating timestamps...');
-    await connection.transaction(async (manager: EntityManager): Promise<void> => {
-        for (const order of orders) {
-            await manager.save(SraOrder, order);
-            const observedTimestamp = createObservedTimestampForOrder(order);
-            await manager.save(observedTimestamp);
-        }
-    });
+    await connection.transaction(
+        async (manager: EntityManager): Promise<void> => {
+            for (const order of orders) {
+                await manager.save(SraOrder, order);
+                const observedTimestamp = createObservedTimestampForOrder(order);
+                await manager.save(observedTimestamp);
+            }
+        },
+    );
 }

 const sourceUrlProp = R.lensProp('sourceUrl');
@@ -48,6 +53,8 @@ const sourceUrlProp = R.lensProp('sourceUrl');
  * Sets the source url for a single order. Returns a new order instead of
  * mutating the given one.
  */
-const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => {
-    return R.set(sourceUrlProp, sourceURL, order);
-});
+const setSourceUrl = R.curry(
+    (sourceURL: string, order: SraOrder): SraOrder => {
+        return R.set(sourceUrlProp, sourceURL, order);
+    },
+);
diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
index b3a4466bb..1befc4437 100644
--- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -8,7 +8,7 @@ import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/token_metadata';
 import { handleError } from '../utils';

 const METAMASK_TRUSTED_TOKENS_URL =
-    'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json';
+    'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/d45916c533116510cc8e9e048a8b5fc3732a6b6d/contract-map.json';

 const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens';

@@ -22,7 +22,7 @@ let connection: Connection;
 })().catch(handleError);

 async function getMetamaskTrustedTokens(): Promise<void> {
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Getting latest metamask trusted tokens list ...');
     const trustedTokensRepository = connection.getRepository(TokenMetadata);
     const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(
@@ -30,23 +30,23 @@ async function getMetamaskTrustedTokens(): Promise<void> {
     );
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseMetamaskTrustedTokens(resp);
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Saving metamask trusted tokens list');
     await trustedTokensRepository.save(trustedTokens);
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Done saving metamask trusted tokens.');
 }

 async function getZeroExTrustedTokens(): Promise<void> {
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Getting latest 0x trusted tokens list ...');
     const trustedTokensRepository = connection.getRepository(TokenMetadata);
     const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL);
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseZeroExTrustedTokens(resp);
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Saving metamask trusted tokens list');
     await trustedTokensRepository.save(trustedTokens);
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Done saving metamask trusted tokens.');
 }
-- cgit v1.2.3
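Note: commit 00f86ca de-dupes per-script constants into the shared utils module, whose own diff is outside this path filter. The values implied by the deleted lines and the new imports (a reconstruction, not the actual utils file):

    // packages/pipeline/src/utils (relevant exports, reconstructed)
    // Block number when the Exchange contract was deployed to mainnet.
    export const EXCHANGE_START_BLOCK = 6271590;
    export const INFURA_ROOT_URL = 'https://mainnet.infura.io';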
From 2e704ac01a077b0c73288aaa53c9cf66c73e27f1 Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Tue, 4 Dec 2018 20:08:32 -0800
Subject: Fix prettier

---
 .../src/scripts/pull_radar_relay_orders.ts | 29 ++++++++--------------
 1 file changed, 11 insertions(+), 18 deletions(-)
(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
index bbbef9b47..6c18bcaef 100644
--- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
+++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
@@ -29,22 +29,17 @@ async function getOrderbookAsync(): Promise<void> {
     console.log(`Got ${rawOrders.records.length} orders.`);
     console.log('Parsing orders...');
     // Parse the sra orders, then add source url to each.
-    const orders = R.pipe(
-        parseSraOrders,
-        R.map(setSourceUrl(RADAR_RELAY_URL)),
-    )(rawOrders);
+    const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders);
     // Save all the orders and update the observed time stamps in a single
     // transaction.
     console.log('Saving orders and updating timestamps...');
-    await connection.transaction(
-        async (manager: EntityManager): Promise<void> => {
-            for (const order of orders) {
-                await manager.save(SraOrder, order);
-                const observedTimestamp = createObservedTimestampForOrder(order);
-                await manager.save(observedTimestamp);
-            }
-        },
-    );
+    await connection.transaction(async (manager: EntityManager): Promise<void> => {
+        for (const order of orders) {
+            await manager.save(SraOrder, order);
+            const observedTimestamp = createObservedTimestampForOrder(order);
+            await manager.save(observedTimestamp);
+        }
+    });
 }

 const sourceUrlProp = R.lensProp('sourceUrl');
@@ -53,8 +48,6 @@ const sourceUrlProp = R.lensProp('sourceUrl');
  * Sets the source url for a single order. Returns a new order instead of
  * mutating the given one.
  */
-const setSourceUrl = R.curry(
-    (sourceURL: string, order: SraOrder): SraOrder => {
-        return R.set(sourceUrlProp, sourceURL, order);
-    },
-);
+const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => {
+    return R.set(sourceUrlProp, sourceURL, order);
+});
-- cgit v1.2.3
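Note: across the last two commits the body of setSourceUrl never changes -- only the formatting preferred by different prettier versions does. The lens-based helper itself deserves a usage sketch (SraOrder reduced to the one field involved; URL illustrative):

    import * as R from 'ramda';

    interface SraOrder {
        sourceUrl: string;
    }

    const sourceUrlProp = R.lensProp('sourceUrl');
    // R.set returns a modified copy, so the input order is never mutated,
    // and R.curry lets the partially-applied form be dropped into R.map pipelines.
    const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => {
        return R.set(sourceUrlProp, sourceURL, order);
    });

    const tagged = setSourceUrl('https://example-relayer.test/sra', { sourceUrl: '' });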