From 57e7119c0d4f1ab7dd1d4c0118e72dc1706e2151 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 17 Sep 2018 11:27:38 -0700 Subject: Rebase pipeline branch off development --- packages/pipeline/src/scripts/create_tables.ts | 258 ++++++++++ packages/pipeline/src/scripts/join_tables.ts | 234 +++++++++ packages/pipeline/src/scripts/query_data.ts | 87 ++++ packages/pipeline/src/scripts/scrape_data.ts | 649 +++++++++++++++++++++++++ 4 files changed, 1228 insertions(+) create mode 100644 packages/pipeline/src/scripts/create_tables.ts create mode 100644 packages/pipeline/src/scripts/join_tables.ts create mode 100644 packages/pipeline/src/scripts/query_data.ts create mode 100644 packages/pipeline/src/scripts/scrape_data.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/create_tables.ts b/packages/pipeline/src/scripts/create_tables.ts new file mode 100644 index 000000000..fd0d2b78b --- /dev/null +++ b/packages/pipeline/src/scripts/create_tables.ts @@ -0,0 +1,258 @@ +import * as commandLineArgs from 'command-line-args'; + +import { postgresClient } from '../postgres'; +import { formatters } from '../utils'; +const tableQueries: any = { + events_full: `CREATE TABLE IF NOT EXISTS events_full ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + gas_used NUMERIC(78), + gas_price NUMERIC(78), + fee_recipient CHAR(42), + method_id CHAR(10), + salt VARCHAR, + block_number BIGINT, + log_index BIGINT, + taker_symbol VARCHAR, + taker_name VARCHAR, + taker_decimals BIGINT, + taker_usd_price NUMERIC(78), + taker_txn_usd_value NUMERIC(78), + maker_symbol VARCHAR, + maker_name VARCHAR, + maker_decimals BIGINT, + maker_usd_price NUMERIC(78), + maker_txn_usd_value NUMERIC(78), + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events: `CREATE TABLE IF NOT EXISTS events ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + gas_used NUMERIC(78), + gas_price NUMERIC(78), + fee_recipient CHAR(42), + method_id CHAR(10), + salt VARCHAR, + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events_staging: `CREATE TABLE IF NOT EXISTS events_staging ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + fee_recipient CHAR(42), + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events_raw: `CREATE TABLE IF NOT EXISTS events_raw ( + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + fee_recipient CHAR(42), + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, 
log_index) + )`, + blocks: `CREATE TABLE IF NOT EXISTS blocks ( + timestamp TIMESTAMP WITH TIME ZONE, + block_hash CHAR(66) UNIQUE, + block_number BIGINT, + PRIMARY KEY (block_hash) + )`, + transactions: `CREATE TABLE IF NOT EXISTS transactions ( + txn_hash CHAR(66) UNIQUE, + block_hash CHAR(66), + block_number BIGINT, + gas_used NUMERIC(78), + gas_price NUMERIC(78), + method_id CHAR(10), + salt VARCHAR, + PRIMARY KEY (txn_hash) + )`, + tokens: `CREATE TABLE IF NOT EXISTS tokens ( + address CHAR(42) UNIQUE, + name VARCHAR, + symbol VARCHAR, + decimals INT, + PRIMARY KEY (address) + )`, + prices: `CREATE TABLE IF NOT EXISTS prices ( + address CHAR(42) UNIQUE, + timestamp TIMESTAMP WITH TIME ZONE, + price NUMERIC(78, 18), + PRIMARY KEY (address, timestamp) + )`, + relayers: `CREATE TABLE IF NOT EXISTS relayers ( + name VARCHAR UNIQUE, + url VARCHAR DEFAULT '', + sra_http_endpoint VARCHAR DEFAULT '', + sra_ws_endpoint VARCHAR DEFAULT '', + fee_recipient_addresses CHAR(42)[] DEFAULT '{}', + taker_addresses CHAR(42)[] DEFAULT '{}', + PRIMARY KEY(name)`, + historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices ( + token VARCHAR, + base VARCHAR, + timestamp TIMESTAMP WITH TIME ZONE, + close NUMERIC(78, 18), + high NUMERIC(78, 18), + low NUMERIC(78, 18), + open NUMERIC(78, 18), + volume_from NUMERIC(78, 18), + volume_to NUMERIC(78, 18), + PRIMARY KEY (token, base, timestamp) + )`, + orders: `CREATE TABLE IF NOT EXISTS orders ( + relayer_id VARCHAR, + exchange_contract_address CHAR(42), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + fee_recipient CHAR(42), + expiration_unix_timestamp_sec NUMERIC(78), + salt VARCHAR, + order_hash CHAR(66), + PRIMARY KEY (relayer_id, order_hash) + )`, +}; +function _safeQuery(query: string): any { + return new Promise((resolve, reject) => { + postgresClient + .query(query) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); +} +export const tableScripts = { + createTable(query: string): any { + return _safeQuery(query); + }, + createAllTables(): any { + for (const tableName of tableQueries) { + _safeQuery(tableQueries[tableName]); + } + }, +}; +export const insertDataScripts = { + insertSingleRow(table: string, object: any): any { + return new Promise((resolve, reject) => { + const columns = Object.keys(object); + const safeArray: any = []; + for (const key of columns) { + if (key in object) { + if (key === 'timestamp') { + safeArray.push('to_timestamp(' + object[key] + ')'); + } else if (typeof object[key] === 'string' || object[key] instanceof String) { + safeArray.push(formatters.escapeSQLParam(object[key])); + } else { + safeArray.push(object[key]); + } + } else { + safeArray.push('default'); + } + } + const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`; + console.log(queryString); + postgresClient + .query(queryString) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + insertMultipleRows(table: string, rows: any[], columns: any[]): any { + return new Promise((resolve, reject) => { + if (rows.length > 0) { + const rowsSplit = rows.map((value, index) => { + const safeArray: any = []; + for (const key of columns) { + if (key in value) { + if (key === 'timestamp') { + safeArray.push('to_timestamp(' + value[key] + ')'); + } else if (typeof value[key] === 'string' || 
value[key] instanceof String) { + safeArray.push(formatters.escapeSQLParam(value[key])); + } else if (value[key] instanceof Array) { + const escapedArray = value[key].map((subValue: string, subIndex: number) => { + return formatters.escapeSQLParam(subValue); + }); + safeArray.push('ARRAY[' + escapedArray.toString() + ']'); + } else { + safeArray.push(value[key]); + } + } else { + safeArray.push('default'); + } + } + return '(' + safeArray + ')'; + }); + const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`; + postgresClient + .query(queryString) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + // console.log(err); + reject(err); + }); + } else { + resolve({}); + } + }); + }, +}; diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts new file mode 100644 index 000000000..e7c05b39a --- /dev/null +++ b/packages/pipeline/src/scripts/join_tables.ts @@ -0,0 +1,234 @@ +import * as commandLineArgs from 'command-line-args'; + +import { postgresClient } from '../postgres'; +import { formatters } from '../utils'; +const optionDefinitions = [ + { name: 'name', alias: 'n', type: String }, + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, +]; +const cli = commandLineArgs(optionDefinitions); +const dataInsertionQueries: any = { + events_staging: `INSERT INTO events_staging ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index + ) + (SELECT + b.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index + FROM + events_raw a + JOIN + blocks b + ON + a.block_number = b.block_number + AND + b.block_number >= $1 + AND + b.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events: `INSERT INTO events ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt + ) + (SELECT + a.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index, + b.gas_used, + b.gas_price, + b.method_id, + b.salt + FROM + events_staging a + JOIN + transactions b + ON + a.txn_hash = b.txn_hash + AND + a.block_number >= $1 + AND + a.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events_full: ` + INSERT INTO events_full ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt, + taker_symbol, + taker_name, + taker_decimals, + taker_usd_price, + taker_txn_usd_value, + maker_symbol, + maker_name, + maker_decimals, + maker_usd_price, + maker_txn_usd_value + ) + (SELECT + events.timestamp, + events.event_type, + events.error_id, + events.order_hash, + events.maker, + events.maker_amount, + 
events.maker_fee, + events.maker_token, + events.taker, + events.taker_amount, + events.taker_fee, + events.taker_token, + events.txn_hash, + events.fee_recipient, + events.block_number, + events.log_index, + events.gas_used, + events.gas_price, + events.method_id, + events.salt, + taker_token_prices.symbol, + taker_token_prices.name, + taker_token_prices.decimals, + taker_token_prices.price, + (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price, + maker_token_prices.symbol, + maker_token_prices.name, + maker_token_prices.decimals, + maker_token_prices.price, + (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price + FROM + events + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) taker_token_prices + ON + (events.taker_token = taker_token_prices.address + AND + (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL)) + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) maker_token_prices + ON + (events.maker_token = maker_token_prices.address + AND + (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL)) + WHERE + events.block_number >= $1 + AND + events.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, +}; +if (cli.name) { + const query = dataInsertionQueries[cli.name]; + if (query && cli.from) { + const fromBlock = cli.from; + const toBlock = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(query, [fromBlock, toBlock]) + .then((data: any) => { + console.log(data); + }) + .catch((err: any) => { + console.error(err); + }); + } +} diff --git a/packages/pipeline/src/scripts/query_data.ts b/packages/pipeline/src/scripts/query_data.ts new file mode 100644 index 000000000..97e3749ea --- /dev/null +++ b/packages/pipeline/src/scripts/query_data.ts @@ -0,0 +1,87 @@ +import { formatters } from '../utils'; +export const dataFetchingQueries: any = { + get_missing_txn_hashes: ` + SELECT + a.txn_hash + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + transactions b + WHERE + b.txn_hash = a.txn_hash + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_used_block_numbers: ` + SELECT DISTINCT + a.block_number + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + blocks b + WHERE + b.block_number = a.block_number + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_token_registry: ` + SELECT + * + FROM + tokens`, + get_max_block: ` + SELECT + MAX(block_number) + FROM + events_raw`, + get_relayers: ` + SELECT + * + FROM + relayers`, + get_most_recent_pricing_date: ` + SELECT + MAX(DATE(timestamp)) + FROM + prices + `, + get_top_unknown_token_addresses: ` + SELECT a.token_address as address, a.txn_value / 2 as total_txn_value +FROM +(SELECT token_address, SUM(txn_value) as txn_value +FROM +(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours') + union + select a.timestamp, a.taker_token as 
token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values +WHERE token_address IS NOT NULL +AND txn_value > 0 +GROUP BY 1 +ORDER BY 2 DESC) a +LEFT JOIN tokens b +ON a.token_address = b.address +WHERE symbol is NULL +ORDER BY 2 DESC +`, +}; diff --git a/packages/pipeline/src/scripts/scrape_data.ts b/packages/pipeline/src/scripts/scrape_data.ts new file mode 100644 index 000000000..963670b47 --- /dev/null +++ b/packages/pipeline/src/scripts/scrape_data.ts @@ -0,0 +1,649 @@ +import { ExchangeEvents, ZeroEx } from '0x.js'; +import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect'; +import * as Airtable from 'airtable'; +import * as commandLineArgs from 'command-line-args'; +import * as _ from 'lodash'; +import * as querystring from 'querystring'; +import * as queue from 'queue'; +import * as request from 'request'; +import * as rpn from 'request-promise-native'; + +import { HttpRequestOptions } from '../../../connect/lib/src/types.js'; +import { relayer } from '../models/relayer.js'; +import { token } from '../models/tokens.js'; +import { postgresClient } from '../postgres.js'; +import { typeConverters } from '../utils.js'; +import { web3, zrx } from '../zrx.js'; + +import { insertDataScripts } from './create_tables.js'; +import { dataFetchingQueries } from './query_data.js'; +const optionDefinitions = [ + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, + { name: 'type', type: String }, + { name: 'id', type: String }, + { name: 'force', type: Boolean }, + { name: 'token', type: String }, +]; +const cli = commandLineArgs(optionDefinitions); +const q = queue({ concurrency: 6, autostart: true }); +const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE); +const BLOCK_INCREMENTS = 1000; +const BASE_SYMBOL = 'USD'; // use USD as base currency against +const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days +const SECONDS_PER_DAY = 86400; +const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical'; +const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json'; +const METAMASK_ETH_CONTRACT_METADATA_JSON = + 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; +const ETHPLORER_BASE_URL = 'http://api.ethplorer.io'; +const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`; +// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday'; +const AIRTABLE_RELAYER_INFO = 'Relayer Info'; +export const pullDataScripts = { + getAllEvents(fromBlockNumber: number, toBlockNumber: number): any { + return new Promise((resolve, reject) => { + const getLogsPromises: any[] = []; + getLogsPromises.push( + zrx.exchange.getLogsAsync( + ExchangeEvents.LogFill, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogCancel, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogError, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + ); + Promise.all(getLogsPromises) + .then((data: 
any[]) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getBlockInfo(blockNumber: number): any { + return new Promise((resolve, reject) => { + web3.eth.getBlock(blockNumber, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTransactionInfo(transactionHash: string): any { + return new Promise((resolve, reject) => { + web3.eth.getTransaction(transactionHash, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTokenRegistry(): any { + return new Promise((resolve, reject) => { + zrx.tokenRegistry + .getTokensAsync() + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getMetaMaskTokens(): any { + return new Promise((resolve, reject) => { + request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerTopTokens(): any { + return new Promise((resolve, reject) => { + request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerToken(tokenAddress: string): any { + return new Promise((resolve, reject) => { + const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`; + request(url, (error, response, body) => { + if (error) { + reject(error); + } else { + try { + const json = JSON.parse(body); + resolve(json); + } catch (err) { + resolve({ error: 'error' }); + } + } + }); + }); + }, + getPriceData(symbol: string, timestamp: number, timeDelay?: number): any { + return new Promise((resolve, reject) => { + if (symbol === 'WETH') { + symbol = 'ETH'; + } + let parsedParams = querystring.stringify({ + fsym: symbol, + tsyms: 'USD', + ts: timestamp / 1000, + }); + console.debug(parsedParams); + setTimeout(() => { + request(PRICE_API_ENDPOINT + '?' 
+ parsedParams, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }, timeDelay); + }); + }, + getRelayers(): any { + return new Promise((resolve, reject) => { + request(RELAYER_REGISTRY_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + async getOrderBook(sraEndpoint: string): Promise { + const relayerClient = new HttpClient(sraEndpoint); + const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync(); + const fullOrderBook: OrderbookResponse[] = []; + for (const tokenPair of tokenResponse) { + const orderBookRequest: OrderbookRequest = { + baseTokenAddress: tokenPair.tokenA.address, + quoteTokenAddress: tokenPair.tokenB.address, + }; + const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest); + fullOrderBook.push(orderBook); + } + return fullOrderBook; + }, + // async getHistoricalPrices( + // fromSymbol: string, + // toSymbol: string, + // fromTimestamp: number, + // toTimestamp: number, + // ): Promise { + // const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY)); + // if(fromSymbol === 'WETH') { + // fromSymbol = 'ETH'; + // } + // var parsedParams = { + // fsym: fromSymbol, + // tsym: toSymbol, + // limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT), + // toTs: toTimestamp, + // }; + // var options = { + // uri: HIST_PRICE_API_ENDPOINT, + // qs: parsedParams, + // json: false, + // }; + // try { + // const response = await rpn(options); + // return Promise.resolve(JSON.parse(response)); + // } catch (error) { + // console.debug(error); + // return Promise.reject(error); + // } + // }, +}; +export const scrapeDataScripts = { + scrapeAllPricesToDB(fromTime: number, toTime: number) { + const fromDate = new Date(fromTime); + fromDate.setUTCHours(0); + fromDate.setUTCMinutes(0); + fromDate.setUTCSeconds(0); + fromDate.setUTCMilliseconds(0); + const toDate = new Date(toTime); + postgresClient + .query(dataFetchingQueries.get_token_registry, []) + .then((result: any) => { + for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { + for (const token of Object.values(result.rows)) { + console.debug('Scraping ' + curDate + ' ' + token); + q.push(_scrapePriceToDB(curDate.getTime(), token, 500)); + } + } + }) + .catch((err: any) => { + console.debug(err); + }); + }, +}; +function _scrapeEventsToDB(fromBlock: number, toBlock: number): any { + return (cb: () => void) => { + pullDataScripts + .getAllEvents(fromBlock, toBlock) + .then((data: any) => { + const parsedEvents: any = {}; + parsedEvents[ExchangeEvents.LogFill] = []; + parsedEvents[ExchangeEvents.LogCancel] = []; + parsedEvents[ExchangeEvents.LogError] = []; + for (const index in data) { + for (const datum of data[index]) { + const event = typeConverters.convertLogEventToEventObject(datum); + parsedEvents[event.event_type].push(event); + } + } + console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length); + for (const event_type in parsedEvents) { + if (parsedEvents[event_type].length > 0) { + insertDataScripts + .insertMultipleRows( + 'events_raw', + parsedEvents[event_type], + Object.keys(parsedEvents[event_type][0]), + ) + .then(() => {}) + .catch((error: any) => {}); + } + } + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeBlockToDB(block: number): any { + return (cb: () => void) => { + pullDataScripts + 
.getBlockInfo(block) + .then((data: any) => { + const parsedBlock = typeConverters.convertLogBlockToBlockObject(data); + insertDataScripts + .insertSingleRow('blocks', parsedBlock) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +// function _scrapeAllRelayersToDB(): any { +// return (cb: () => void) => { +// airtableBase(AIRTABLE_RELAYER_INFO) +// .select() +// .eachPage((records: any, fetchNextPage: () => void) => { +// const parsedRelayers: any[] = []; +// for(const record of records) { +// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record)); +// } +// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0])) +// .then((result: any) => { +// cb(); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }; +// } +function _scrapeAllRelayersToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getRelayers() + .then((relayers: any[]) => { + console.log(relayers); + const parsedRelayers: any[] = []; + for (const relayer of relayers) { + parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer)); + } + console.log(parsedRelayers); + insertDataScripts + .insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties)) + .then((result: any) => { + console.log(result); + cb(); + }) + .catch((err: any) => { + console.log(err); + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTransactionToDB(transactionHash: string): any { + return (cb: () => void) => { + pullDataScripts + .getTransactionInfo(transactionHash) + .then((data: any) => { + const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data); + insertDataScripts + .insertSingleRow('transactions', parsedTransaction) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTokenRegistryToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getTokenRegistry() + .then((data: any) => { + const parsedTokens: any = []; + for (const token of data) { + parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeMetaMaskEthContractMetadataToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getMetaMaskTokens() + .then((data: any) => { + const parsedTokens: any = []; + const dataArray = _.map(_.keys(data), (tokenAddress: string) => { + const value = _.get(data, tokenAddress); + return { + address: tokenAddress, + ...value, + }; + }); + const erc20TokensOnly = _.filter(dataArray, entry => { + const isErc20 = _.get(entry, 'erc20'); + return isErc20; + }); + for (const token of erc20TokensOnly) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeEthplorerTopTokensToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getEthplorerTopTokens() + .then((data: any) => { + const parsedTokens: any = []; + const tokens = _.get(data, 'tokens'); + for (const token of tokens) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + 
insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeUnknownTokenInformationToDB(): any { + return (cb: () => void) => { + postgresClient + .query(dataFetchingQueries.get_top_unknown_token_addresses) + .then(async (result: any) => { + const addresses = _.map(result.rows, row => _.get(row, 'address')); + const responses = await Promise.all( + _.map(addresses, address => pullDataScripts.getEthplorerToken(address)), + ); + const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error'))); + const parsedTokens = _.map(tokens, tokenInfo => + typeConverters.convertEthplorerTokenToTokenObject(tokenInfo), + ); + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any { + return (cb: () => void) => { + pullDataScripts + .getPriceData(token.symbol, timestamp, timeDelay) + .then((data: any) => { + const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol; + const parsedPrice = { + timestamp: timestamp / 1000, + symbol: token.symbol, + base: 'USD', + price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0, + }; + console.debug('Inserting ' + timestamp); + console.debug(parsedPrice); + insertDataScripts.insertSingleRow('prices', parsedPrice); + cb(); + }) + .catch((err: any) => { + console.debug(err); + cb(); + }); + }; +} +// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any { +// return (cb: () => void) => { +// pullDataScripts +// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp) +// .then((data: any) => { +// const parsedHistoricalPrices: any = []; +// for (const historicalPrice of data['Data']) { +// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice); +// parsedHistoricalPrice['token'] = token; +// parsedHistoricalPrice['base'] = BASE_SYMBOL; +// parsedHistoricalPrices.push(parsedHistoricalPrice); +// } +// if (parsedHistoricalPrices.length > 0) { +// insertDataScripts +// .insertMultipleRows( +// 'historical_prices', +// parsedHistoricalPrices, +// Object.keys(parsedHistoricalPrices[0]), +// ) +// .catch((error: any) => { +// console.error(error); +// }); +// } +// cb(); +// }) +// .catch((error: any) => { +// console.error(error); +// cb(); +// }); +// }; +// } +function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any { + return (cb: () => void) => { + pullDataScripts + .getOrderBook(sraEndpoint) + .then((data: any) => { + for (const book of data) { + for (const order of book.bids) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + for (const order of book.asks) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + } + cb(); + }) + .catch((error: any) => { + console.error(error); + cb(); + }); + }; +} +if (cli.type === 'events') 
{ + if (cli.from && cli.to) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + let curToBlock = curFromBlock; + do { + curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS; + q.push(_scrapeEventsToDB(curFromBlock, curToBlock)); + curFromBlock = curToBlock + 1; + } while (curToBlock < destToBlock); + } +} else if (cli.type === 'blocks') { + if (cli.from && cli.to) { + if (cli.force) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + const curToBlock = curFromBlock; + for (; curFromBlock < destToBlock; curFromBlock++) { + q.push(_scrapeBlockToDB(curFromBlock)); + } + } else { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeBlockToDB(row.block_number)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } + } +} else if (cli.type === 'transactions') { + if (cli.id) { + q.push(_scrapeTransactionToDB(cli.id)); + } else if (cli.from) { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeTransactionToDB(row.txn_hash)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } +} else if (cli.type === 'tokens') { + q.push(_scrapeMetaMaskEthContractMetadataToDB()); + q.push(_scrapeEthplorerTopTokensToDB()); +} else if (cli.type === 'unknown_tokens') { + q.push(_scrapeUnknownTokenInformationToDB()); +} else if (cli.type === 'prices' && cli.from && cli.to) { + const fromDate = new Date(cli.from); + console.debug(fromDate); + fromDate.setUTCHours(0); + fromDate.setUTCMinutes(0); + fromDate.setUTCSeconds(0); + fromDate.setUTCMilliseconds(0); + console.debug(fromDate); + const toDate = new Date(cli.to); + postgresClient + .query(dataFetchingQueries.get_token_registry, []) + .then((result: any) => { + for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { + for (const token of Object.values(result.rows)) { + console.debug('Scraping ' + curDate + ' ' + token); + q.push(_scrapePriceToDB(curDate.getTime(), token)); + } + } + }) + .catch((err: any) => { + console.debug(err); + }); + // } else if (cli.type === 'historical_prices') { + // if (cli.token && cli.from && cli.to) { + // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to)); + // } + // } else if (cli.type === 'all_historical_prices') { + // if (cli.from && cli.to) { + // postgresClient + // .query(dataFetchingQueries.get_token_registry, []) + // .then((result: any) => { + // const curTokens: any = result.rows.map((a: any): any => a.symbol); + // for (const curToken of curTokens) { + // console.debug('Historical data backfill: Pushing coin ' + curToken); + // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to)); + // } + // }) + // .catch((err: any) => { + // console.debug(err); + // }); + // } +} else if (cli.type === 'relayers') { + q.push(_scrapeAllRelayersToDB()); +} else if (cli.type === 'orders') { + postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => { + for (const relayer of result.rows) { + if (relayer.sra_http_url) { + q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url)); + } + } + }); +} -- cgit v1.2.3 From 
cd73a047efbc1b7afa884ef27a1cc4991e20efd8 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 19 Sep 2018 10:51:12 -0700 Subject: Remove old code. Create function for getting contract events via etherscan --- packages/pipeline/src/scripts/create_tables.ts | 258 ---------- packages/pipeline/src/scripts/join_tables.ts | 234 --------- packages/pipeline/src/scripts/query_data.ts | 87 ---- packages/pipeline/src/scripts/scrape_data.ts | 649 ------------------------- 4 files changed, 1228 deletions(-) delete mode 100644 packages/pipeline/src/scripts/create_tables.ts delete mode 100644 packages/pipeline/src/scripts/join_tables.ts delete mode 100644 packages/pipeline/src/scripts/query_data.ts delete mode 100644 packages/pipeline/src/scripts/scrape_data.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/create_tables.ts b/packages/pipeline/src/scripts/create_tables.ts deleted file mode 100644 index fd0d2b78b..000000000 --- a/packages/pipeline/src/scripts/create_tables.ts +++ /dev/null @@ -1,258 +0,0 @@ -import * as commandLineArgs from 'command-line-args'; - -import { postgresClient } from '../postgres'; -import { formatters } from '../utils'; -const tableQueries: any = { - events_full: `CREATE TABLE IF NOT EXISTS events_full ( - timestamp TIMESTAMP WITH TIME ZONE, - event_type VARCHAR, - error_id VARCHAR, - order_hash CHAR(66), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - txn_hash CHAR(66), - gas_used NUMERIC(78), - gas_price NUMERIC(78), - fee_recipient CHAR(42), - method_id CHAR(10), - salt VARCHAR, - block_number BIGINT, - log_index BIGINT, - taker_symbol VARCHAR, - taker_name VARCHAR, - taker_decimals BIGINT, - taker_usd_price NUMERIC(78), - taker_txn_usd_value NUMERIC(78), - maker_symbol VARCHAR, - maker_name VARCHAR, - maker_decimals BIGINT, - maker_usd_price NUMERIC(78), - maker_txn_usd_value NUMERIC(78), - PRIMARY KEY (txn_hash, order_hash, log_index) - )`, - events: `CREATE TABLE IF NOT EXISTS events ( - timestamp TIMESTAMP WITH TIME ZONE, - event_type VARCHAR, - error_id VARCHAR, - order_hash CHAR(66), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - txn_hash CHAR(66), - gas_used NUMERIC(78), - gas_price NUMERIC(78), - fee_recipient CHAR(42), - method_id CHAR(10), - salt VARCHAR, - block_number BIGINT, - log_index BIGINT, - PRIMARY KEY (txn_hash, order_hash, log_index) - )`, - events_staging: `CREATE TABLE IF NOT EXISTS events_staging ( - timestamp TIMESTAMP WITH TIME ZONE, - event_type VARCHAR, - error_id VARCHAR, - order_hash CHAR(66), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - txn_hash CHAR(66), - fee_recipient CHAR(42), - block_number BIGINT, - log_index BIGINT, - PRIMARY KEY (txn_hash, order_hash, log_index) - )`, - events_raw: `CREATE TABLE IF NOT EXISTS events_raw ( - event_type VARCHAR, - error_id VARCHAR, - order_hash CHAR(66), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - txn_hash CHAR(66), - fee_recipient CHAR(42), - block_number BIGINT, - log_index BIGINT, - 
PRIMARY KEY (txn_hash, order_hash, log_index) - )`, - blocks: `CREATE TABLE IF NOT EXISTS blocks ( - timestamp TIMESTAMP WITH TIME ZONE, - block_hash CHAR(66) UNIQUE, - block_number BIGINT, - PRIMARY KEY (block_hash) - )`, - transactions: `CREATE TABLE IF NOT EXISTS transactions ( - txn_hash CHAR(66) UNIQUE, - block_hash CHAR(66), - block_number BIGINT, - gas_used NUMERIC(78), - gas_price NUMERIC(78), - method_id CHAR(10), - salt VARCHAR, - PRIMARY KEY (txn_hash) - )`, - tokens: `CREATE TABLE IF NOT EXISTS tokens ( - address CHAR(42) UNIQUE, - name VARCHAR, - symbol VARCHAR, - decimals INT, - PRIMARY KEY (address) - )`, - prices: `CREATE TABLE IF NOT EXISTS prices ( - address CHAR(42) UNIQUE, - timestamp TIMESTAMP WITH TIME ZONE, - price NUMERIC(78, 18), - PRIMARY KEY (address, timestamp) - )`, - relayers: `CREATE TABLE IF NOT EXISTS relayers ( - name VARCHAR UNIQUE, - url VARCHAR DEFAULT '', - sra_http_endpoint VARCHAR DEFAULT '', - sra_ws_endpoint VARCHAR DEFAULT '', - fee_recipient_addresses CHAR(42)[] DEFAULT '{}', - taker_addresses CHAR(42)[] DEFAULT '{}', - PRIMARY KEY(name)`, - historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices ( - token VARCHAR, - base VARCHAR, - timestamp TIMESTAMP WITH TIME ZONE, - close NUMERIC(78, 18), - high NUMERIC(78, 18), - low NUMERIC(78, 18), - open NUMERIC(78, 18), - volume_from NUMERIC(78, 18), - volume_to NUMERIC(78, 18), - PRIMARY KEY (token, base, timestamp) - )`, - orders: `CREATE TABLE IF NOT EXISTS orders ( - relayer_id VARCHAR, - exchange_contract_address CHAR(42), - maker CHAR(42), - maker_amount NUMERIC(78), - maker_fee NUMERIC(78), - maker_token CHAR(42), - taker CHAR(42), - taker_amount NUMERIC(78), - taker_fee NUMERIC(78), - taker_token CHAR(42), - fee_recipient CHAR(42), - expiration_unix_timestamp_sec NUMERIC(78), - salt VARCHAR, - order_hash CHAR(66), - PRIMARY KEY (relayer_id, order_hash) - )`, -}; -function _safeQuery(query: string): any { - return new Promise((resolve, reject) => { - postgresClient - .query(query) - .then((data: any) => { - resolve(data); - }) - .catch((err: any) => { - reject(err); - }); - }); -} -export const tableScripts = { - createTable(query: string): any { - return _safeQuery(query); - }, - createAllTables(): any { - for (const tableName of tableQueries) { - _safeQuery(tableQueries[tableName]); - } - }, -}; -export const insertDataScripts = { - insertSingleRow(table: string, object: any): any { - return new Promise((resolve, reject) => { - const columns = Object.keys(object); - const safeArray: any = []; - for (const key of columns) { - if (key in object) { - if (key === 'timestamp') { - safeArray.push('to_timestamp(' + object[key] + ')'); - } else if (typeof object[key] === 'string' || object[key] instanceof String) { - safeArray.push(formatters.escapeSQLParam(object[key])); - } else { - safeArray.push(object[key]); - } - } else { - safeArray.push('default'); - } - } - const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`; - console.log(queryString); - postgresClient - .query(queryString) - .then((data: any) => { - resolve(data); - }) - .catch((err: any) => { - reject(err); - }); - }); - }, - insertMultipleRows(table: string, rows: any[], columns: any[]): any { - return new Promise((resolve, reject) => { - if (rows.length > 0) { - const rowsSplit = rows.map((value, index) => { - const safeArray: any = []; - for (const key of columns) { - if (key in value) { - if (key === 'timestamp') { - safeArray.push('to_timestamp(' + value[key] + ')'); - } else if 
(typeof value[key] === 'string' || value[key] instanceof String) { - safeArray.push(formatters.escapeSQLParam(value[key])); - } else if (value[key] instanceof Array) { - const escapedArray = value[key].map((subValue: string, subIndex: number) => { - return formatters.escapeSQLParam(subValue); - }); - safeArray.push('ARRAY[' + escapedArray.toString() + ']'); - } else { - safeArray.push(value[key]); - } - } else { - safeArray.push('default'); - } - } - return '(' + safeArray + ')'; - }); - const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`; - postgresClient - .query(queryString) - .then((data: any) => { - resolve(data); - }) - .catch((err: any) => { - // console.log(err); - reject(err); - }); - } else { - resolve({}); - } - }); - }, -}; diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts deleted file mode 100644 index e7c05b39a..000000000 --- a/packages/pipeline/src/scripts/join_tables.ts +++ /dev/null @@ -1,234 +0,0 @@ -import * as commandLineArgs from 'command-line-args'; - -import { postgresClient } from '../postgres'; -import { formatters } from '../utils'; -const optionDefinitions = [ - { name: 'name', alias: 'n', type: String }, - { name: 'from', alias: 'f', type: Number }, - { name: 'to', alias: 't', type: Number }, -]; -const cli = commandLineArgs(optionDefinitions); -const dataInsertionQueries: any = { - events_staging: `INSERT INTO events_staging ( - timestamp, - event_type, - error_id, - order_hash, - maker, - maker_amount, - maker_fee, - maker_token, - taker, - taker_amount, - taker_fee, - taker_token, - txn_hash, - fee_recipient, - block_number, - log_index - ) - (SELECT - b.timestamp, - a.event_type, - a.error_id, - a.order_hash, - a.maker, - a.maker_amount, - a.maker_fee, - a.maker_token, - a.taker, - a.taker_amount, - a.taker_fee, - a.taker_token, - a.txn_hash, - a.fee_recipient, - a.block_number, - a.log_index - FROM - events_raw a - JOIN - blocks b - ON - a.block_number = b.block_number - AND - b.block_number >= $1 - AND - b.block_number <= $2 - ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, - events: `INSERT INTO events ( - timestamp, - event_type, - error_id, - order_hash, - maker, - maker_amount, - maker_fee, - maker_token, - taker, - taker_amount, - taker_fee, - taker_token, - txn_hash, - fee_recipient, - block_number, - log_index, - gas_used, - gas_price, - method_id, - salt - ) - (SELECT - a.timestamp, - a.event_type, - a.error_id, - a.order_hash, - a.maker, - a.maker_amount, - a.maker_fee, - a.maker_token, - a.taker, - a.taker_amount, - a.taker_fee, - a.taker_token, - a.txn_hash, - a.fee_recipient, - a.block_number, - a.log_index, - b.gas_used, - b.gas_price, - b.method_id, - b.salt - FROM - events_staging a - JOIN - transactions b - ON - a.txn_hash = b.txn_hash - AND - a.block_number >= $1 - AND - a.block_number <= $2 - ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, - events_full: ` - INSERT INTO events_full ( - timestamp, - event_type, - error_id, - order_hash, - maker, - maker_amount, - maker_fee, - maker_token, - taker, - taker_amount, - taker_fee, - taker_token, - txn_hash, - fee_recipient, - block_number, - log_index, - gas_used, - gas_price, - method_id, - salt, - taker_symbol, - taker_name, - taker_decimals, - taker_usd_price, - taker_txn_usd_value, - maker_symbol, - maker_name, - maker_decimals, - maker_usd_price, - maker_txn_usd_value - ) - (SELECT - events.timestamp, - events.event_type, - events.error_id, - events.order_hash, - 
events.maker, - events.maker_amount, - events.maker_fee, - events.maker_token, - events.taker, - events.taker_amount, - events.taker_fee, - events.taker_token, - events.txn_hash, - events.fee_recipient, - events.block_number, - events.log_index, - events.gas_used, - events.gas_price, - events.method_id, - events.salt, - taker_token_prices.symbol, - taker_token_prices.name, - taker_token_prices.decimals, - taker_token_prices.price, - (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price, - maker_token_prices.symbol, - maker_token_prices.name, - maker_token_prices.decimals, - maker_token_prices.price, - (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price - FROM - events - LEFT JOIN - (SELECT - tokens.address, - tokens.name, - tokens.symbol, - tokens.decimals, - prices.timestamp, - prices.price - FROM - tokens - LEFT JOIN - prices - ON - tokens.symbol = prices.symbol) taker_token_prices - ON - (events.taker_token = taker_token_prices.address - AND - (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL)) - LEFT JOIN - (SELECT - tokens.address, - tokens.name, - tokens.symbol, - tokens.decimals, - prices.timestamp, - prices.price - FROM - tokens - LEFT JOIN - prices - ON - tokens.symbol = prices.symbol) maker_token_prices - ON - (events.maker_token = maker_token_prices.address - AND - (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL)) - WHERE - events.block_number >= $1 - AND - events.block_number <= $2 - ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, -}; -if (cli.name) { - const query = dataInsertionQueries[cli.name]; - if (query && cli.from) { - const fromBlock = cli.from; - const toBlock = cli.to ? 
cli.to : cli.from + 1; - postgresClient - .query(query, [fromBlock, toBlock]) - .then((data: any) => { - console.log(data); - }) - .catch((err: any) => { - console.error(err); - }); - } -} diff --git a/packages/pipeline/src/scripts/query_data.ts b/packages/pipeline/src/scripts/query_data.ts deleted file mode 100644 index 97e3749ea..000000000 --- a/packages/pipeline/src/scripts/query_data.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { formatters } from '../utils'; -export const dataFetchingQueries: any = { - get_missing_txn_hashes: ` - SELECT - a.txn_hash - FROM - events_raw a - WHERE NOT EXISTS - ( - SELECT - * - FROM - transactions b - WHERE - b.txn_hash = a.txn_hash - ) - AND - a.block_number >= $1 - AND - a.block_number < $2`, - get_used_block_numbers: ` - SELECT DISTINCT - a.block_number - FROM - events_raw a - WHERE NOT EXISTS - ( - SELECT - * - FROM - blocks b - WHERE - b.block_number = a.block_number - ) - AND - a.block_number >= $1 - AND - a.block_number < $2`, - get_token_registry: ` - SELECT - * - FROM - tokens`, - get_max_block: ` - SELECT - MAX(block_number) - FROM - events_raw`, - get_relayers: ` - SELECT - * - FROM - relayers`, - get_most_recent_pricing_date: ` - SELECT - MAX(DATE(timestamp)) - FROM - prices - `, - get_top_unknown_token_addresses: ` - SELECT a.token_address as address, a.txn_value / 2 as total_txn_value -FROM -(SELECT token_address, SUM(txn_value) as txn_value -FROM -(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL - THEN a.taker_txn_usd_value - ELSE a.maker_txn_usd_value END) as txn_value - from events_full a - where a.event_type = 'LogFill' - and a.timestamp > (NOW() + INTERVAL '-24 hours') - union - select a.timestamp, a.taker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL - THEN a.taker_txn_usd_value - ELSE a.maker_txn_usd_value END) as txn_value - from events_full a - where a.event_type = 'LogFill' - and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values -WHERE token_address IS NOT NULL -AND txn_value > 0 -GROUP BY 1 -ORDER BY 2 DESC) a -LEFT JOIN tokens b -ON a.token_address = b.address -WHERE symbol is NULL -ORDER BY 2 DESC -`, -}; diff --git a/packages/pipeline/src/scripts/scrape_data.ts b/packages/pipeline/src/scripts/scrape_data.ts deleted file mode 100644 index 963670b47..000000000 --- a/packages/pipeline/src/scripts/scrape_data.ts +++ /dev/null @@ -1,649 +0,0 @@ -import { ExchangeEvents, ZeroEx } from '0x.js'; -import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect'; -import * as Airtable from 'airtable'; -import * as commandLineArgs from 'command-line-args'; -import * as _ from 'lodash'; -import * as querystring from 'querystring'; -import * as queue from 'queue'; -import * as request from 'request'; -import * as rpn from 'request-promise-native'; - -import { HttpRequestOptions } from '../../../connect/lib/src/types.js'; -import { relayer } from '../models/relayer.js'; -import { token } from '../models/tokens.js'; -import { postgresClient } from '../postgres.js'; -import { typeConverters } from '../utils.js'; -import { web3, zrx } from '../zrx.js'; - -import { insertDataScripts } from './create_tables.js'; -import { dataFetchingQueries } from './query_data.js'; -const optionDefinitions = [ - { name: 'from', alias: 'f', type: Number }, - { name: 'to', alias: 't', type: Number }, - { name: 'type', type: String }, - { name: 'id', 
type: String }, - { name: 'force', type: Boolean }, - { name: 'token', type: String }, -]; -const cli = commandLineArgs(optionDefinitions); -const q = queue({ concurrency: 6, autostart: true }); -const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE); -const BLOCK_INCREMENTS = 1000; -const BASE_SYMBOL = 'USD'; // use USD as base currency against -const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days -const SECONDS_PER_DAY = 86400; -const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical'; -const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json'; -const METAMASK_ETH_CONTRACT_METADATA_JSON = - 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; -const ETHPLORER_BASE_URL = 'http://api.ethplorer.io'; -const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`; -// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday'; -const AIRTABLE_RELAYER_INFO = 'Relayer Info'; -export const pullDataScripts = { - getAllEvents(fromBlockNumber: number, toBlockNumber: number): any { - return new Promise((resolve, reject) => { - const getLogsPromises: any[] = []; - getLogsPromises.push( - zrx.exchange.getLogsAsync( - ExchangeEvents.LogFill, - { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, - {}, - ), - zrx.exchange.getLogsAsync( - ExchangeEvents.LogCancel, - { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, - {}, - ), - zrx.exchange.getLogsAsync( - ExchangeEvents.LogError, - { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, - {}, - ), - ); - Promise.all(getLogsPromises) - .then((data: any[]) => { - resolve(data); - }) - .catch((err: any) => { - reject(err); - }); - }); - }, - getBlockInfo(blockNumber: number): any { - return new Promise((resolve, reject) => { - web3.eth.getBlock(blockNumber, (err, result) => { - if (err) { - reject(err); - } else { - resolve(result); - } - }); - }); - }, - getTransactionInfo(transactionHash: string): any { - return new Promise((resolve, reject) => { - web3.eth.getTransaction(transactionHash, (err, result) => { - if (err) { - reject(err); - } else { - resolve(result); - } - }); - }); - }, - getTokenRegistry(): any { - return new Promise((resolve, reject) => { - zrx.tokenRegistry - .getTokensAsync() - .then((data: any) => { - resolve(data); - }) - .catch((err: any) => { - reject(err); - }); - }); - }, - getMetaMaskTokens(): any { - return new Promise((resolve, reject) => { - request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => { - if (error) { - reject(error); - } else { - resolve(JSON.parse(body)); - } - }); - }); - }, - getEthplorerTopTokens(): any { - return new Promise((resolve, reject) => { - request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => { - if (error) { - reject(error); - } else { - resolve(JSON.parse(body)); - } - }); - }); - }, - getEthplorerToken(tokenAddress: string): any { - return new Promise((resolve, reject) => { - const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`; - request(url, (error, response, body) => { - if (error) { - reject(error); - } else { - try { - const json = JSON.parse(body); - resolve(json); - } catch (err) { - resolve({ error: 'error' }); - } - } - }); - }); - }, - getPriceData(symbol: string, timestamp: number, timeDelay?: number): any { - return new Promise((resolve, reject) 
=> { - if (symbol === 'WETH') { - symbol = 'ETH'; - } - let parsedParams = querystring.stringify({ - fsym: symbol, - tsyms: 'USD', - ts: timestamp / 1000, - }); - console.debug(parsedParams); - setTimeout(() => { - request(PRICE_API_ENDPOINT + '?' + parsedParams, (error, response, body) => { - if (error) { - reject(error); - } else { - resolve(JSON.parse(body)); - } - }); - }, timeDelay); - }); - }, - getRelayers(): any { - return new Promise((resolve, reject) => { - request(RELAYER_REGISTRY_JSON, (error, response, body) => { - if (error) { - reject(error); - } else { - resolve(JSON.parse(body)); - } - }); - }); - }, - async getOrderBook(sraEndpoint: string): Promise { - const relayerClient = new HttpClient(sraEndpoint); - const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync(); - const fullOrderBook: OrderbookResponse[] = []; - for (const tokenPair of tokenResponse) { - const orderBookRequest: OrderbookRequest = { - baseTokenAddress: tokenPair.tokenA.address, - quoteTokenAddress: tokenPair.tokenB.address, - }; - const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest); - fullOrderBook.push(orderBook); - } - return fullOrderBook; - }, - // async getHistoricalPrices( - // fromSymbol: string, - // toSymbol: string, - // fromTimestamp: number, - // toTimestamp: number, - // ): Promise { - // const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY)); - // if(fromSymbol === 'WETH') { - // fromSymbol = 'ETH'; - // } - // var parsedParams = { - // fsym: fromSymbol, - // tsym: toSymbol, - // limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT), - // toTs: toTimestamp, - // }; - // var options = { - // uri: HIST_PRICE_API_ENDPOINT, - // qs: parsedParams, - // json: false, - // }; - // try { - // const response = await rpn(options); - // return Promise.resolve(JSON.parse(response)); - // } catch (error) { - // console.debug(error); - // return Promise.reject(error); - // } - // }, -}; -export const scrapeDataScripts = { - scrapeAllPricesToDB(fromTime: number, toTime: number) { - const fromDate = new Date(fromTime); - fromDate.setUTCHours(0); - fromDate.setUTCMinutes(0); - fromDate.setUTCSeconds(0); - fromDate.setUTCMilliseconds(0); - const toDate = new Date(toTime); - postgresClient - .query(dataFetchingQueries.get_token_registry, []) - .then((result: any) => { - for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { - for (const token of Object.values(result.rows)) { - console.debug('Scraping ' + curDate + ' ' + token); - q.push(_scrapePriceToDB(curDate.getTime(), token, 500)); - } - } - }) - .catch((err: any) => { - console.debug(err); - }); - }, -}; -function _scrapeEventsToDB(fromBlock: number, toBlock: number): any { - return (cb: () => void) => { - pullDataScripts - .getAllEvents(fromBlock, toBlock) - .then((data: any) => { - const parsedEvents: any = {}; - parsedEvents[ExchangeEvents.LogFill] = []; - parsedEvents[ExchangeEvents.LogCancel] = []; - parsedEvents[ExchangeEvents.LogError] = []; - for (const index in data) { - for (const datum of data[index]) { - const event = typeConverters.convertLogEventToEventObject(datum); - parsedEvents[event.event_type].push(event); - } - } - console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length); - for (const event_type in parsedEvents) { - if (parsedEvents[event_type].length > 0) { - insertDataScripts - .insertMultipleRows( - 'events_raw', - parsedEvents[event_type], - 
Object.keys(parsedEvents[event_type][0]), - ) - .then(() => {}) - .catch((error: any) => {}); - } - } - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeBlockToDB(block: number): any { - return (cb: () => void) => { - pullDataScripts - .getBlockInfo(block) - .then((data: any) => { - const parsedBlock = typeConverters.convertLogBlockToBlockObject(data); - insertDataScripts - .insertSingleRow('blocks', parsedBlock) - .then((result: any) => { - cb(); - }) - .catch((err: any) => { - cb(); - }); - }) - .catch((err: any) => { - cb(); - }); - }; -} -// function _scrapeAllRelayersToDB(): any { -// return (cb: () => void) => { -// airtableBase(AIRTABLE_RELAYER_INFO) -// .select() -// .eachPage((records: any, fetchNextPage: () => void) => { -// const parsedRelayers: any[] = []; -// for(const record of records) { -// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record)); -// } -// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0])) -// .then((result: any) => { -// cb(); -// }) -// .catch((err: any) => { -// cb(); -// }); -// }) -// .catch((err: any) => { -// cb(); -// }); -// }; -// } -function _scrapeAllRelayersToDB(): any { - return (cb: () => void) => { - pullDataScripts - .getRelayers() - .then((relayers: any[]) => { - console.log(relayers); - const parsedRelayers: any[] = []; - for (const relayer of relayers) { - parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer)); - } - console.log(parsedRelayers); - insertDataScripts - .insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties)) - .then((result: any) => { - console.log(result); - cb(); - }) - .catch((err: any) => { - console.log(err); - cb(); - }); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeTransactionToDB(transactionHash: string): any { - return (cb: () => void) => { - pullDataScripts - .getTransactionInfo(transactionHash) - .then((data: any) => { - const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data); - insertDataScripts - .insertSingleRow('transactions', parsedTransaction) - .then((result: any) => { - cb(); - }) - .catch((err: any) => { - cb(); - }); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeTokenRegistryToDB(): any { - return (cb: () => void) => { - pullDataScripts - .getTokenRegistry() - .then((data: any) => { - const parsedTokens: any = []; - for (const token of data) { - parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token)); - } - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeMetaMaskEthContractMetadataToDB(): any { - return (cb: () => void) => { - pullDataScripts - .getMetaMaskTokens() - .then((data: any) => { - const parsedTokens: any = []; - const dataArray = _.map(_.keys(data), (tokenAddress: string) => { - const value = _.get(data, tokenAddress); - return { - address: tokenAddress, - ...value, - }; - }); - const erc20TokensOnly = _.filter(dataArray, entry => { - const isErc20 = _.get(entry, 'erc20'); - return isErc20; - }); - for (const token of erc20TokensOnly) { - parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); - } - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeEthplorerTopTokensToDB(): any { - return (cb: () => void) => 
{ - pullDataScripts - .getEthplorerTopTokens() - .then((data: any) => { - const parsedTokens: any = []; - const tokens = _.get(data, 'tokens'); - for (const token of tokens) { - parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); - } - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapeUnknownTokenInformationToDB(): any { - return (cb: () => void) => { - postgresClient - .query(dataFetchingQueries.get_top_unknown_token_addresses) - .then(async (result: any) => { - const addresses = _.map(result.rows, row => _.get(row, 'address')); - const responses = await Promise.all( - _.map(addresses, address => pullDataScripts.getEthplorerToken(address)), - ); - const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error'))); - const parsedTokens = _.map(tokens, tokenInfo => - typeConverters.convertEthplorerTokenToTokenObject(tokenInfo), - ); - insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); - cb(); - }) - .catch((err: any) => { - cb(); - }); - }; -} -function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any { - return (cb: () => void) => { - pullDataScripts - .getPriceData(token.symbol, timestamp, timeDelay) - .then((data: any) => { - const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol; - const parsedPrice = { - timestamp: timestamp / 1000, - symbol: token.symbol, - base: 'USD', - price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0, - }; - console.debug('Inserting ' + timestamp); - console.debug(parsedPrice); - insertDataScripts.insertSingleRow('prices', parsedPrice); - cb(); - }) - .catch((err: any) => { - console.debug(err); - cb(); - }); - }; -} -// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any { -// return (cb: () => void) => { -// pullDataScripts -// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp) -// .then((data: any) => { -// const parsedHistoricalPrices: any = []; -// for (const historicalPrice of data['Data']) { -// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice); -// parsedHistoricalPrice['token'] = token; -// parsedHistoricalPrice['base'] = BASE_SYMBOL; -// parsedHistoricalPrices.push(parsedHistoricalPrice); -// } -// if (parsedHistoricalPrices.length > 0) { -// insertDataScripts -// .insertMultipleRows( -// 'historical_prices', -// parsedHistoricalPrices, -// Object.keys(parsedHistoricalPrices[0]), -// ) -// .catch((error: any) => { -// console.error(error); -// }); -// } -// cb(); -// }) -// .catch((error: any) => { -// console.error(error); -// cb(); -// }); -// }; -// } -function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any { - return (cb: () => void) => { - pullDataScripts - .getOrderBook(sraEndpoint) - .then((data: any) => { - for (const book of data) { - for (const order of book.bids) { - console.debug(order); - const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); - parsedOrder.relayer_id = id; - parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); - insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { - console.error(error); - }); - } - for (const order of book.asks) { - console.debug(order); - const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); - parsedOrder.relayer_id = id; - parsedOrder.order_hash = 
ZeroEx.getOrderHashHex(order); - insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { - console.error(error); - }); - } - } - cb(); - }) - .catch((error: any) => { - console.error(error); - cb(); - }); - }; -} -if (cli.type === 'events') { - if (cli.from && cli.to) { - const destToBlock = cli.to ? cli.to : cli.from; - let curFromBlock = cli.from; - let curToBlock = curFromBlock; - do { - curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS; - q.push(_scrapeEventsToDB(curFromBlock, curToBlock)); - curFromBlock = curToBlock + 1; - } while (curToBlock < destToBlock); - } -} else if (cli.type === 'blocks') { - if (cli.from && cli.to) { - if (cli.force) { - const destToBlock = cli.to ? cli.to : cli.from; - let curFromBlock = cli.from; - const curToBlock = curFromBlock; - for (; curFromBlock < destToBlock; curFromBlock++) { - q.push(_scrapeBlockToDB(curFromBlock)); - } - } else { - const fetchFrom = cli.from; - const fetchTo = cli.to ? cli.to : cli.from + 1; - postgresClient - .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo]) - .then((data: any) => { - for (const row of data.rows) { - q.push(_scrapeBlockToDB(row.block_number)); - } - }) - .catch((err: any) => { - // console.debug(err); - }); - } - } -} else if (cli.type === 'transactions') { - if (cli.id) { - q.push(_scrapeTransactionToDB(cli.id)); - } else if (cli.from) { - const fetchFrom = cli.from; - const fetchTo = cli.to ? cli.to : cli.from + 1; - postgresClient - .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo]) - .then((data: any) => { - for (const row of data.rows) { - q.push(_scrapeTransactionToDB(row.txn_hash)); - } - }) - .catch((err: any) => { - // console.debug(err); - }); - } -} else if (cli.type === 'tokens') { - q.push(_scrapeMetaMaskEthContractMetadataToDB()); - q.push(_scrapeEthplorerTopTokensToDB()); -} else if (cli.type === 'unknown_tokens') { - q.push(_scrapeUnknownTokenInformationToDB()); -} else if (cli.type === 'prices' && cli.from && cli.to) { - const fromDate = new Date(cli.from); - console.debug(fromDate); - fromDate.setUTCHours(0); - fromDate.setUTCMinutes(0); - fromDate.setUTCSeconds(0); - fromDate.setUTCMilliseconds(0); - console.debug(fromDate); - const toDate = new Date(cli.to); - postgresClient - .query(dataFetchingQueries.get_token_registry, []) - .then((result: any) => { - for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { - for (const token of Object.values(result.rows)) { - console.debug('Scraping ' + curDate + ' ' + token); - q.push(_scrapePriceToDB(curDate.getTime(), token)); - } - } - }) - .catch((err: any) => { - console.debug(err); - }); - // } else if (cli.type === 'historical_prices') { - // if (cli.token && cli.from && cli.to) { - // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to)); - // } - // } else if (cli.type === 'all_historical_prices') { - // if (cli.from && cli.to) { - // postgresClient - // .query(dataFetchingQueries.get_token_registry, []) - // .then((result: any) => { - // const curTokens: any = result.rows.map((a: any): any => a.symbol); - // for (const curToken of curTokens) { - // console.debug('Historical data backfill: Pushing coin ' + curToken); - // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to)); - // } - // }) - // .catch((err: any) => { - // console.debug(err); - // }); - // } -} else if (cli.type === 'relayers') { - q.push(_scrapeAllRelayersToDB()); -} else if (cli.type === 'orders') { 
- postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => { - for (const relayer of result.rows) { - if (relayer.sra_http_url) { - q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url)); - } - } - }); -} -- cgit v1.2.3 From 954c3b9272556723c10fd02ad6b753ac98ca47fa Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 7 Nov 2018 16:48:25 -0800 Subject: Split index.ts into multiple scripts in scripts/ and detect last known block when pulling events --- packages/pipeline/src/scripts/merge_v2_events.ts | 81 ++++++++++++++++++++++ .../pipeline/src/scripts/pull_missing_events.ts | 60 ++++++++++++++++ .../pipeline/src/scripts/update_relayer_info.ts | 31 +++++++++ 3 files changed, 172 insertions(+) create mode 100644 packages/pipeline/src/scripts/merge_v2_events.ts create mode 100644 packages/pipeline/src/scripts/pull_missing_events.ts create mode 100644 packages/pipeline/src/scripts/update_relayer_info.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/merge_v2_events.ts b/packages/pipeline/src/scripts/merge_v2_events.ts new file mode 100644 index 000000000..227ece121 --- /dev/null +++ b/packages/pipeline/src/scripts/merge_v2_events.ts @@ -0,0 +1,81 @@ +import { web3Factory } from '@0x/dev-utils'; +import 'reflect-metadata'; +import { Connection, createConnection } from 'typeorm'; + +import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; +import { ExchangeFillEvent } from '../entities/ExchangeFillEvent'; +import { deployConfig } from '../ormconfig'; +import { parseExchangeEvents } from '../parsers/events'; + +let connection: Connection; + +(async () => { + connection = await createConnection(deployConfig); + await getExchangeEventsAsync(); + await mergeExchangeEventsAsync(); + console.log('Exiting process'); + process.exit(0); +})(); + +// TODO(albrow): Separately: Errors do not appear to be handled correctly. If you use the +// wrong rpcUrl it just returns early with no error. 
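A note on the TODO above: an immediately invoked async function returns a promise, and if nothing handles that promise, a rejection (such as a bad rpcUrl surfacing as a connection error) disappears silently. A minimal sketch of the guard, in the spirit of the .catch(handleError) wrapper that later commits in this series adopt:

    (async () => {
        // ... script body ...
    })().catch((err: Error) => {
        // Without an explicit catch, this rejection would be dropped silently.
        console.error(err);
        process.exit(1);
    });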
+async function getExchangeEventsAsync(): Promise<void> { + console.log('Getting event logs...'); + const provider = web3Factory.getRpcProvider({ + rpcUrl: 'https://mainnet.infura.io', + }); + const eventsRepository = connection.getRepository(ExchangeFillEvent); + const exchangeEvents = new ExchangeEventsSource(provider, 1); + const eventLogs = await exchangeEvents.getFillEventsAsync(); + console.log('Parsing events...'); + const events = parseExchangeEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total events.`); + console.log('Saving events...'); + for (const event of events) { + await eventsRepository.save(event); + } + await eventsRepository.save(events); + console.log('Saved events.'); +} + +const insertEventsRawQuery = `INSERT INTO events_raw ( + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index +) +( + SELECT + 'LogFill', + null, + "orderHash", + "makerAddress", + "makerAssetFilledAmount"::numeric(78), + "makerFeePaid"::numeric(78), + "makerTokenAddress", + "takerAddress", + "takerAssetFilledAmount"::numeric(78), + "takerFeePaid"::numeric(78), + "takerTokenAddress", + "transactionHash", + "feeRecipientAddress", + "blockNumber", + "logIndex" + FROM exchange_fill_event +) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`; + +async function mergeExchangeEventsAsync(): Promise<void> { + console.log('Merging results into events_raw...'); + await connection.query(insertEventsRawQuery); +} diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts new file mode 100644 index 000000000..1f71722a3 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -0,0 +1,60 @@ +import { web3Factory } from '@0x/dev-utils'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import R = require('ramda'); +import 'reflect-metadata'; +import { Connection, createConnection, Repository } from 'typeorm'; + +import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; +import { ExchangeFillEvent } from '../entities/ExchangeFillEvent'; +import { deployConfig } from '../ormconfig'; +import { parseExchangeEvents } from '../parsers/events'; + +const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. +const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
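For reference, the batching below relies on Ramda's splitEvery, which chunks a list into fixed-size pieces (the final piece may be smaller):

    import * as R from 'ramda';

    R.splitEvery(3, [1, 2, 3, 4, 5, 6, 7]);
    // => [[1, 2, 3], [4, 5, 6], [7]]
    // Applied to parsed events with BATCH_SAVE_SIZE, each chunk becomes one
    // database round trip instead of one request per event.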
+ +let connection: Connection; + +(async () => { + connection = await createConnection(deployConfig); + const provider = web3Factory.getRpcProvider({ + rpcUrl: 'https://mainnet.infura.io', + }); + await getExchangeEventsAsync(provider); + process.exit(0); +})(); + +async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise<void> { + console.log('Checking existing event logs...'); + const eventsRepository = connection.getRepository(ExchangeFillEvent); + const startBlock = await getStartBlockAsync(eventsRepository); + console.log(`Getting event logs starting at ${startBlock}...`); + const exchangeEvents = new ExchangeEventsSource(provider, 1); + const eventLogs = await exchangeEvents.getFillEventsAsync(startBlock); + console.log('Parsing events...'); + const events = parseExchangeEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total events.`); + console.log('Saving events...'); + // Split the events into batches of size BATCH_SAVE_SIZE and save each batch + // in a single request. This reduces round-trip latency to the DB. We need + // to batch this way because saving an extremely large number of events in a + // single request causes problems. + for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { + await eventsRepository.save(eventsBatch); + } + const totalEvents = await eventsRepository.count(); + console.log(`Done saving events. There are now ${totalEvents} total events.`); +} + +async function getStartBlockAsync(eventsRepository: Repository<ExchangeFillEvent>): Promise<number> { + const fillEventCount = await eventsRepository.count(); + if (fillEventCount === 0) { + console.log('No existing fill events found.'); + return EXCHANGE_START_BLOCK; + } + const queryResult = await connection.query( + 'SELECT "blockNumber" FROM exchange_fill_event ORDER BY "blockNumber" DESC LIMIT 1', + ); + const lastKnownBlock = queryResult[0].blockNumber; + return lastKnownBlock - START_BLOCK_OFFSET; +} diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts new file mode 100644 index 000000000..05e045ff4 --- /dev/null +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -0,0 +1,31 @@ +import 'reflect-metadata'; +import { Connection, createConnection } from 'typeorm'; + +import { RelayerRegistrySource } from '../data_sources/relayer-registry'; +import { Relayer } from '../entities/Relayer'; +import { deployConfig } from '../ormconfig'; +import { parseRelayers } from '../parsers/relayer_registry'; + +// NOTE(albrow): We need to manually update this URL for now. Fix this when we +// have the relayer-registry behind semantic versioning.
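Pinning the registry URL to a commit hash keeps the input reproducible at the cost of manual bumps. The actual RelayerRegistrySource lives in ../data_sources/relayer-registry and is not shown in this diff; as a rough sketch of what such a source does (the HTTP client choice here is an assumption, not the package's real one):

    import axios from 'axios'; // hypothetical client; the real source may differ

    async function getRelayerInfoAsync(url: string): Promise<any> {
        const resp = await axios.get(url);
        return resp.data; // parsed JSON mapping relayer ids to metadata
    }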
+const RELAYER_REGISTRY_URL = + 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/4701c85677d161ea729a466aebbc1826c6aa2c0b/relayers.json'; + +let connection: Connection; + +(async () => { + connection = await createConnection(deployConfig); + await getRelayers(); + process.exit(0); +})(); + +async function getRelayers(): Promise<void> { + console.log('Getting latest relayer info...'); + const relayerRepository = connection.getRepository(Relayer); + const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL); + const relayersResp = await relayerSource.getRelayerInfoAsync(); + const relayers = parseRelayers(relayersResp); + console.log('Saving relayer info...'); + await relayerRepository.save(relayers); + console.log('Done saving relayer info.'); +} -- cgit v1.2.3 From ccad046eb649a60fdf7319a075fa41490d593ae8 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Thu, 8 Nov 2018 10:15:09 -0800 Subject: Reorganize entities. Make scripts work from any directory. --- packages/pipeline/src/scripts/merge_v2_events.ts | 2 +- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- packages/pipeline/src/scripts/update_relayer_info.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/merge_v2_events.ts b/packages/pipeline/src/scripts/merge_v2_events.ts index 227ece121..99a76aa61 100644 --- a/packages/pipeline/src/scripts/merge_v2_events.ts +++ b/packages/pipeline/src/scripts/merge_v2_events.ts @@ -3,7 +3,7 @@ import 'reflect-metadata'; import { Connection, createConnection } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities/ExchangeFillEvent'; +import { ExchangeFillEvent } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 1f71722a3..a108f012f 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -5,7 +5,7 @@ import 'reflect-metadata'; import { Connection, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities/ExchangeFillEvent'; +import { ExchangeFillEvent } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index 05e045ff4..f54e16b6c 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -2,7 +2,7 @@ import 'reflect-metadata'; import { Connection, createConnection } from 'typeorm'; import { RelayerRegistrySource } from '../data_sources/relayer-registry'; -import { Relayer } from '../entities/Relayer'; +import { Relayer } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseRelayers } from '../parsers/relayer_registry'; -- cgit v1.2.3 From 410a9244952b663fa8e56cc778b712ae228bbd82 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Thu, 8 Nov 2018 11:38:37 -0800 Subject: Add better error handling for immediately invoked async functions --- packages/pipeline/src/scripts/merge_v2_events.ts | 81
---------------------- .../pipeline/src/scripts/pull_missing_events.ts | 7 +- .../pipeline/src/scripts/update_relayer_info.ts | 3 +- 3 files changed, 6 insertions(+), 85 deletions(-) delete mode 100644 packages/pipeline/src/scripts/merge_v2_events.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/merge_v2_events.ts b/packages/pipeline/src/scripts/merge_v2_events.ts deleted file mode 100644 index 99a76aa61..000000000 --- a/packages/pipeline/src/scripts/merge_v2_events.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { web3Factory } from '@0x/dev-utils'; -import 'reflect-metadata'; -import { Connection, createConnection } from 'typeorm'; - -import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities'; -import { deployConfig } from '../ormconfig'; -import { parseExchangeEvents } from '../parsers/events'; - -let connection: Connection; - -(async () => { - connection = await createConnection(deployConfig); - await getExchangeEventsAsync(); - await mergeExchangeEventsAsync(); - console.log('Exiting process'); - process.exit(0); -})(); - -// TODO(albrow): Separately: Errors do not appear to be handled correctly. If you use the -// wrong rpcUrl it just returns early with no error. -async function getExchangeEventsAsync(): Promise { - console.log('Getting event logs...'); - const provider = web3Factory.getRpcProvider({ - rpcUrl: 'https://mainnet.infura.io', - }); - const eventsRepository = connection.getRepository(ExchangeFillEvent); - const exchangeEvents = new ExchangeEventsSource(provider, 1); - const eventLogs = await exchangeEvents.getFillEventsAsync(); - console.log('Parsing events...'); - const events = parseExchangeEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total events.`); - console.log('Saving events...'); - for (const event of events) { - await eventsRepository.save(event); - } - await eventsRepository.save(events); - console.log('Saved events.'); -} - -const insertEventsRawQuery = `INSERT INTO events_raw ( - event_type, - error_id, - order_hash, - maker, - maker_amount, - maker_fee, - maker_token, - taker, - taker_amount, - taker_fee, - taker_token, - txn_hash, - fee_recipient, - block_number, - log_index -) -( - SELECT - 'LogFill', - null, - "orderHash", - "makerAddress", - "makerAssetFilledAmount"::numeric(78), - "makerFeePaid"::numeric(78), - "makerTokenAddress", - "takerAddress", - "takerAssetFilledAmount"::numeric(78), - "takerFeePaid"::numeric(78), - "takerTokenAddress", - "transactionHash", - "feeRecipientAddress", - "blockNumber", - "logIndex" - FROM exchange_fill_event -) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`; - -async function mergeExchangeEventsAsync(): Promise { - console.log('Merging results into events_raw...'); - await connection.query(insertEventsRawQuery); -} diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index a108f012f..cca0d9cfe 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -8,6 +8,7 @@ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange import { ExchangeFillEvent } from '../entities'; import { deployConfig } from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; +import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to 
mainnet. const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. @@ -22,7 +23,7 @@ let connection: Connection; }); await getExchangeEventsAsync(provider); process.exit(0); -})(); +})().catch(handleError); async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { console.log('Checking existing event logs...'); @@ -53,8 +54,8 @@ async function getStartBlockAsync(eventsRepository: Repository { console.log('Getting latest relayer info...'); -- cgit v1.2.3 From 329c68f610843ebded9ca31fc9cd6f3eed744a8e Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 12 Nov 2018 16:40:20 -0800 Subject: Configure TypeORM for migrations. Add new package.json scripts. --- packages/pipeline/src/scripts/pull_missing_events.ts | 6 +++--- packages/pipeline/src/scripts/update_relayer_info.ts | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index cca0d9cfe..e2b312280 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -2,11 +2,11 @@ import { web3Factory } from '@0x/dev-utils'; import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); import 'reflect-metadata'; -import { Connection, createConnection, Repository } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; import { ExchangeFillEvent } from '../entities'; -import { deployConfig } from '../ormconfig'; +import * as ormConfig from '../ormconfig'; import { parseExchangeEvents } from '../parsers/events'; import { handleError } from '../utils'; @@ -17,7 +17,7 @@ const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. 
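The change below swaps the single deployConfig export for `import * as ormConfig`, so the module's exports are consumed wholesale and cast to TypeORM's ConnectionOptions. A minimal sketch of the shape such a module might have (values are placeholders; the real ../ormconfig.ts is not part of this diff):

    import { ConnectionOptions } from 'typeorm';

    const config: ConnectionOptions = {
        type: 'postgres',
        url: process.env.DATABASE_URL, // hypothetical env var name
        entities: ['lib/src/entities/**/*.js'], // hypothetical paths
        migrations: ['lib/src/migrations/**/*.js'],
    };

    export = config;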
let connection: Connection; (async () => { - connection = await createConnection(deployConfig); + connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ rpcUrl: 'https://mainnet.infura.io', }); diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index 051289992..af9dd726e 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -1,9 +1,9 @@ import 'reflect-metadata'; -import { Connection, createConnection } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; import { RelayerRegistrySource } from '../data_sources/relayer-registry'; import { Relayer } from '../entities'; -import { deployConfig } from '../ormconfig'; +import * as ormConfig from '../ormconfig'; import { parseRelayers } from '../parsers/relayer_registry'; import { handleError } from '../utils'; @@ -15,7 +15,7 @@ const RELAYER_REGISTRY_URL = let connection: Connection; (async () => { - connection = await createConnection(deployConfig); + connection = await createConnection(ormConfig as ConnectionOptions); await getRelayers(); process.exit(0); })().catch(handleError); -- cgit v1.2.3 From 688d277b30b287f66f0dbd49f2a23cab8b256219 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 12 Nov 2018 17:36:33 -0800 Subject: Configure linter with --format stylish and fix linter errors --- packages/pipeline/src/scripts/pull_missing_events.ts | 1 + packages/pipeline/src/scripts/update_relayer_info.ts | 1 + 2 files changed, 2 insertions(+) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index e2b312280..bc0eac853 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -1,3 +1,4 @@ +// tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index af9dd726e..f8918728d 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -1,3 +1,4 @@ +// tslint:disable:no-console import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection } from 'typeorm'; -- cgit v1.2.3 From 55bbe1954b35ff0a6367f1ff820d32a32b48eff3 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 13 Nov 2018 14:05:49 -0800 Subject: Preliminary work for adding RR order book scraping --- .../src/scripts/pull_radar_relay_orders.ts | 52 ++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_radar_relay_orders.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts new file mode 100644 index 000000000..c4d3f7095 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -0,0 +1,52 @@ +// tslint:disable:no-console +import { HttpClient } from '@0x/connect'; +import * as R from 'ramda'; +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { SraOrder } from '../entities'; +import * as ormConfig from 
'../ormconfig'; +import { parseSraOrders } from '../parsers/sra_orders'; +import { handleError } from '../utils'; + +const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; +const BATCH_SAVE_SIZE = 1000; // Number of orders to save at once. +const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + await getOrderbook(); + process.exit(0); +})().catch(handleError); + +async function getOrderbook(): Promise<void> { + console.log('Getting all orders...'); + const connectClient = new HttpClient(RADAR_RELAY_URL); + const rawOrders = await connectClient.getOrdersAsync({ + perPage: ORDERS_PER_PAGE, + }); + console.log(`Got ${rawOrders.records.length} orders.`); + console.log('Parsing orders...'); + const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); + const ordersRepository = connection.getRepository(SraOrder); + // TODO(albrow): Move batch saving to a utility function to reduce + // duplicated code. + for (const ordersBatch of R.splitEvery(BATCH_SAVE_SIZE, orders)) { + await ordersRepository.save(ordersBatch); + } +} + +const sourceUrlProp = R.lensProp('sourceUrl'); + +const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { + return R.set(sourceUrlProp, sourceURL, order); +}); + +const firstSeenProp = R.lensProp('firstSeenTimestamp'); +const lastUpdatedProp = R.lensProp('lastUpdatedTimestamp'); + +const setFirstSeen = R.curry((sourceURL: string, order: SraOrder): SraOrder => { + return R.set(firstSeenTimestampProp, sourceURL, order); +}); -- cgit v1.2.3 From 26280e4aba147ad6000b9df309e64db84b6932fc Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 13 Nov 2018 15:33:43 -0800 Subject: Implement scraping sra orders from radar relay --- .../src/scripts/pull_radar_relay_orders.ts | 33 +++++++++++----------- 1 file changed, 17 insertions(+), 16 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts index c4d3f7095..b3a4d887e 100644 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -2,15 +2,14 @@ import { HttpClient } from '@0x/connect'; import * as R from 'ramda'; import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm'; -import { SraOrder } from '../entities'; +import { createObservedTimestampForOrder, SraOrder } from '../entities'; import * as ormConfig from '../ormconfig'; import { parseSraOrders } from '../parsers/sra_orders'; import { handleError } from '../utils'; const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; -const BATCH_SAVE_SIZE = 1000; // Number of orders to save at once. const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. let connection: Connection; @@ -29,24 +28,26 @@ async function getOrderbook(): Promise<void> { }); console.log(`Got ${rawOrders.records.length} orders.`); console.log('Parsing orders...'); + // Parse the sra orders, then add source url to each. const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); - const ordersRepository = connection.getRepository(SraOrder); - // TODO(albrow): Move batch saving to a utility function to reduce - // duplicated code.
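One possible shape for the batch-save helper the TODO above asks for (a sketch under the same Ramda/TypeORM assumptions as the surrounding code, not the utility the codebase eventually adopted):

    import * as R from 'ramda';
    import { Repository } from 'typeorm';

    async function saveBatchesAsync<T>(repository: Repository<T>, records: T[], batchSize: number): Promise<void> {
        // Bound the size of each INSERT by saving fixed-size chunks in turn.
        for (const batch of R.splitEvery(batchSize, records)) {
            await repository.save(batch as any); // `as any` mirrors the save() typing workarounds used elsewhere here
        }
    }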
- for (const ordersBatch of R.splitEvery(BATCH_SAVE_SIZE, orders)) { - await ordersRepository.save(ordersBatch); - } + // Save all the orders and update the observed time stamps in a single + // transaction. + console.log('Saving orders and updating timestamps...'); + await connection.transaction(async (manager: EntityManager): Promise => { + for (const order of orders) { + await manager.save(SraOrder, order); + const observedTimestamp = createObservedTimestampForOrder(order); + await manager.save(observedTimestamp); + } + }); } const sourceUrlProp = R.lensProp('sourceUrl'); +/** + * Sets the source url for a single order. Returns a new order instead of + * mutating the given one. + */ const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { return R.set(sourceUrlProp, sourceURL, order); }); - -const firstSeenProp = R.lensProp('firstSeenTimestamp'); -const lastUpdatedProp = R.lensProp('lastUpdatedTimestamp'); - -const setFirstSeen = R.curry((sourceURL: string, order: SraOrder): SraOrder => { - return R.set(firstSeenTimestampProp, sourceURL, order); -}); -- cgit v1.2.3 From 10e93bb01ffcf029821430781ef582a24901a461 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 14 Nov 2018 14:39:34 -0800 Subject: Add raw schema prefix to query in pull_missing_events --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index bc0eac853..0af999a77 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -55,7 +55,7 @@ async function getStartBlockAsync(eventsRepository: Repository Date: Wed, 14 Nov 2018 15:58:36 -0800 Subject: Change some column types from varchar to numeric --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 0af999a77..1693bb59a 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -13,7 +13,7 @@ import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BATCH_SAVE_SIZE = 10000; // Number of events to save at once. 
let connection: Connection; -- cgit v1.2.3 From b0a2c10e11fa41e2800d4fe67d8008b5828128a3 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Wed, 14 Nov 2018 16:20:07 -0800 Subject: Use built-in chunk feature of TypeORM save method --- packages/pipeline/src/scripts/pull_missing_events.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 1693bb59a..b1b8665dd 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -29,6 +29,7 @@ let connection: Connection; async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { console.log('Checking existing event logs...'); const eventsRepository = connection.getRepository(ExchangeFillEvent); + const manager = connection.createEntityManager(); const startBlock = await getStartBlockAsync(eventsRepository); console.log(`Getting event logs starting at ${startBlock}...`); const exchangeEvents = new ExchangeEventsSource(provider, 1); @@ -37,13 +38,7 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise Date: Wed, 14 Nov 2018 17:40:19 -0800 Subject: Fix chunk size in pull_missing_events --- packages/pipeline/src/scripts/pull_missing_events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index b1b8665dd..f53bc12bd 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -38,7 +38,7 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise Date: Wed, 14 Nov 2018 18:39:06 -0800 Subject: Add workaround for broken save method --- .../pipeline/src/scripts/pull_missing_events.ts | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index f53bc12bd..2dc81f205 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -38,7 +38,27 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise Date: Thu, 15 Nov 2018 12:10:27 -0800 Subject: Optimize database operations in pull_missing_events script --- .../pipeline/src/scripts/pull_missing_events.ts | 35 ++++++++++++++++------ 1 file changed, 26 insertions(+), 9 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 2dc81f205..bceed299c 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -8,12 +8,12 @@ import { Connection, ConnectionOptions, createConnection, Repository } from 'typ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; import { ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; -import { parseExchangeEvents } from '../parsers/events'; +import { ExchangeEventEntity, parseExchangeEvents } from '../parsers/events'; import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the 
Exchange contract was deployed to mainnet. -const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 10000; // Number of events to save at once. +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. let connection: Connection; @@ -38,17 +38,36 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise, + events: ExchangeEventEntity[], +): Promise { // Note(albrow): This is a temporary hack because `save` is not working as // documented and is causing a foreign key constraint violation. Hopefully // can remove later because this "poor man's upsert" implementation operates // on one event at a time and is therefore much slower. - // await eventsRepository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); for (const event of events) { try { - await eventsRepository.save(event); + // First try and insert. + await eventsRepository.insert(event); } catch { - // Assume this is a foreign key constraint error and try doing an - // update instead. + // If it fails, assume it was a foreign key constraint error and try + // doing an update instead. await eventsRepository.update( { contractAddress: event.contractAddress, @@ -59,8 +78,6 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise): Promise { -- cgit v1.2.3 From 24fd2d9730d58a58929f401674175ad8a5a7fbc1 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Fri, 16 Nov 2018 12:55:54 -0800 Subject: Add support for pulling Cancel and CancelUpTo events --- .../pipeline/src/scripts/pull_missing_events.ts | 123 ++++++++++++++------- 1 file changed, 81 insertions(+), 42 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index bceed299c..b2a99e3c0 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -1,14 +1,13 @@ // tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; -import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities'; +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; -import { ExchangeEventEntity, parseExchangeEvents } from '../parsers/events'; +import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. 
@@ -22,40 +21,88 @@ let connection: Connection; const provider = web3Factory.getRpcProvider({ rpcUrl: 'https://mainnet.infura.io', }); - await getExchangeEventsAsync(provider); + const eventsSource = new ExchangeEventsSource(provider, 1); + await getFillEventsAsync(eventsSource); + await getCancelEventsAsync(eventsSource); + await getCancelUpToEventsAsync(eventsSource); process.exit(0); })().catch(handleError); -async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { - console.log('Checking existing event logs...'); - const eventsRepository = connection.getRepository(ExchangeFillEvent); - const manager = connection.createEntityManager(); - const startBlock = await getStartBlockAsync(eventsRepository); - console.log(`Getting event logs starting at ${startBlock}...`); - const exchangeEvents = new ExchangeEventsSource(provider, 1); - const eventLogs = await exchangeEvents.getFillEventsAsync(startBlock); - console.log('Parsing events...'); - const events = parseExchangeEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total events.`); - console.log('Saving events...'); - if (startBlock === EXCHANGE_START_BLOCK) { +async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing fill events...'); + const repository = connection.getRepository(ExchangeFillEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting fill events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock); + console.log('Parsing fill events...'); + const events = parseExchangeFillEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total fill events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing cancel events...'); + const repository = connection.getRepository(ExchangeCancelEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting cancel events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); + console.log('Parsing cancel events...'); + const events = parseExchangeCancelEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total cancel events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing CancelUpTo events...'); + const repository = connection.getRepository(ExchangeCancelUpToEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting CancelUpTo events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); + console.log('Parsing CancelUpTo events...'); + const events = parseExchangeCancelUpToEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getStartBlockAsync(repository: Repository): Promise { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing ${repository.metadata.name}s found.`); + return EXCHANGE_START_BLOCK; + } + const queryResult = await connection.query( + // TODO(albrow): Would prefer to use a prepared statement here 
to reduce + // surface area for SQL injections, but it doesn't appear to be working. + `SELECT block_number FROM raw.${repository.metadata.tableName} ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} + +async function saveEventsAsync( + isInitialPull: boolean, + repository: Repository, + events: T[], +): Promise { + console.log(`Saving ${repository.metadata.name}s...`); + if (isInitialPull) { // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE // each. for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { - await eventsRepository.insert(eventsBatch); + await repository.insert(eventsBatch); } } else { // If we possibly have some overlap where we need to update some // existing events, we need to use our workaround/fallback. - await saveIndividuallyWithFallbackAsync(eventsRepository, events); + await saveIndividuallyWithFallbackAsync(repository, events); } - const totalEvents = await eventsRepository.count(); - console.log(`Done saving events. There are now ${totalEvents} total events.`); + const totalEvents = await repository.count(); + console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); } -async function saveIndividuallyWithFallbackAsync( - eventsRepository: Repository, - events: ExchangeEventEntity[], +async function saveIndividuallyWithFallbackAsync( + repository: Repository, + events: T[], ): Promise { // Note(albrow): This is a temporary hack because `save` is not working as // documented and is causing a foreign key constraint violation. Hopefully @@ -63,32 +110,24 @@ async function saveIndividuallyWithFallbackAsync( // on one event at a time and is therefore much slower. for (const event of events) { try { - // First try and insert. - await eventsRepository.insert(event); + // First try an insert. + await repository.insert(event); } catch { // If it fails, assume it was a foreign key constraint error and try // doing an update instead. - await eventsRepository.update( + // Note(albrow): Unfortunately the `as any` hack here seems + // required. I can't figure out how to convince the type-checker + // that the criteria and the entity itself are the correct type for + // the given repository. If we can remove the `save` hack then this + // will probably no longer be necessary. 
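The workaround described above (insert each event, fall back to an update on conflict, continued just below) costs one round trip per event. On Postgres the same effect can be approximated in bulk with ON CONFLICT, which TypeORM's insert query builder exposes as onConflict; a sketch, assuming that feature is available in the version in use and that these are the snake_case column names of the composite primary key:

    await repository
        .createQueryBuilder()
        .insert()
        .values(events as any[])
        .onConflict(`("contract_address", "block_number", "log_index") DO NOTHING`)
        .execute();
    // DO NOTHING skips rows that already exist. Fully replacing the update
    // path would instead require DO UPDATE SET col = EXCLUDED.col for each
    // data column.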
+ await repository.update( { contractAddress: event.contractAddress, blockNumber: event.blockNumber, logIndex: event.logIndex, - }, - event, + } as any, + event as any, ); } } } - -async function getStartBlockAsync(eventsRepository: Repository): Promise { - const fillEventCount = await eventsRepository.count(); - if (fillEventCount === 0) { - console.log('No existing fill events found.'); - return EXCHANGE_START_BLOCK; - } - const queryResult = await connection.query( - 'SELECT block_number FROM raw.exchange_fill_events ORDER BY block_number DESC LIMIT 1', - ); - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} -- cgit v1.2.3 From 5cad2ad1744ab1c1e24ed52fc0a26ec5acf5c898 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Fri, 16 Nov 2018 13:16:17 -0800 Subject: Check for special characters in table name in pull_missing_events --- packages/pipeline/src/scripts/pull_missing_events.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index b2a99e3c0..0b7f6287f 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -64,16 +64,20 @@ async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Pro await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } +const tabelNameRegex = /^[a-zA-Z_]*$/; + async function getStartBlockAsync(repository: Repository): Promise { const fillEventCount = await repository.count(); if (fillEventCount === 0) { console.log(`No existing ${repository.metadata.name}s found.`); return EXCHANGE_START_BLOCK; } + const tableName = repository.metadata.tableName; + if (!tabelNameRegex.test(tableName)) { + throw new Error('Unexpected special character in table name: ' + tableName); + } const queryResult = await connection.query( - // TODO(albrow): Would prefer to use a prepared statement here to reduce - // surface area for SQL injections, but it doesn't appear to be working. 
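A likely reason the prepared-statement attempt mentioned above failed: Postgres placeholders such as $1 can bind values only, never identifiers like table or column names, so the table name must be interpolated into the SQL string and validated separately, which is exactly what the regex check added in this commit does. For contrast:

    // Values bind fine as parameters:
    await connection.query(
        'SELECT block_number FROM raw.exchange_fill_events WHERE block_number >= $1 LIMIT $2',
        [fromBlock, limit], // illustrative variables
    );

    // Identifiers do not -- a table name cannot be passed as $1:
    // await connection.query('SELECT block_number FROM raw.$1 ...', [tableName]); // fails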
- `SELECT block_number FROM raw.${repository.metadata.tableName} ORDER BY block_number DESC LIMIT 1`, + `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, ); const lastKnownBlock = queryResult[0].block_number; return lastKnownBlock - START_BLOCK_OFFSET; -- cgit v1.2.3 From 9986717671fe8e14c2168f7479bdaffe406bedc0 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 19 Nov 2018 18:38:11 -0800 Subject: Add script for pulling missing block data --- .../pipeline/src/scripts/pull_missing_blocks.ts | 83 ++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 packages/pipeline/src/scripts/pull_missing_blocks.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts new file mode 100644 index 000000000..4a1483ab9 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -0,0 +1,83 @@ +// tslint:disable:no-console +import { web3Factory } from '@0x/dev-utils'; +import * as Parallel from 'async-parallel'; +import R = require('ramda'); +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { Web3Source } from '../data_sources/web3'; +import { Block } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseBlock } from '../parsers/web3'; +import { handleError } from '../utils'; + +// Number of blocks to save at once. +const BATCH_SAVE_SIZE = 1000; +// Maximum number of requests to send at once. +const MAX_CONCURRENCY = 10; +// Maximum number of blocks to query for at once. This is also the maximum +// number of blocks we will hold in memory prior to being saved to the database. +const MAX_BLOCKS_PER_QUERY = 1000; +// Block number when the Exchange contract was deployed to mainnet. +// TODO(albrow): De-dupe this constant. +const EXCHANGE_START_BLOCK = 6271590; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: `https://mainnet.infura.io/${process.env.INFURA_API_KEY}`, + }); + const web3Source = new Web3Source(provider); + await getAllMissingBlocks(web3Source); + process.exit(0); +})().catch(handleError); + +interface MissingBlocksResponse { + block_number: string; +} + +async function getAllMissingBlocks(web3Source: Web3Source): Promise { + const blocksRepository = connection.getRepository(Block); + let fromBlock = EXCHANGE_START_BLOCK; + while (true) { + const blockNumbers = await getMissingBlockNumbers(fromBlock); + if (blockNumbers.length === 0) { + // There are no more missing blocks. We're done. + break; + } + await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers); + fromBlock = Math.max(...blockNumbers) + 1; + } + const totalBlocks = await blocksRepository.count(); + console.log(`Done saving blocks. 
There are now ${totalBlocks} total blocks.`); +} + +async function getMissingBlockNumbers(fromBlock: number): Promise { + console.log(`Checking for missing blocks starting at ${fromBlock}...`); + const response = (await connection.query( + 'SELECT DISTINCT(block_number) FROM raw.exchange_fill_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) AND block_number >= $1 ORDER BY block_number ASC LIMIT $2', + [fromBlock, MAX_BLOCKS_PER_QUERY], + )) as MissingBlocksResponse[]; + const blockNumberStrings = R.pluck('block_number', response); + const blockNumbers = R.map(parseInt, blockNumberStrings); + console.log(`Found ${blockNumbers.length} missing blocks in the given range.`); + return blockNumbers; +} + +async function getAndSaveBlocks( + web3Source: Web3Source, + blocksRepository: Repository, + blockNumbers: number[], +): Promise { + console.log(`Getting block data for ${blockNumbers.length} blocks...`); + Parallel.setConcurrency(MAX_CONCURRENCY); + const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) => + web3Source.getBlockInfoAsync(blockNumber), + ); + console.log(`Parsing ${rawBlocks.length} blocks...`); + const blocks = R.map(parseBlock, rawBlocks); + console.log(`Saving ${blocks.length} blocks...`); + await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); +} -- cgit v1.2.3 From c6af5131b0b06433d6294260274e187ad61f4ef7 Mon Sep 17 00:00:00 2001 From: Jake Ellowitz Date: Mon, 19 Nov 2018 16:24:07 -0800 Subject: Pull token metadata re trusted tokens --- .../pipeline/src/scripts/pull_missing_events.ts | 4 +- .../pipeline/src/scripts/pull_trusted_tokens.ts | 51 ++++++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 packages/pipeline/src/scripts/pull_trusted_tokens.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 0b7f6287f..68cabe3de 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -64,7 +64,7 @@ async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Pro await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } -const tabelNameRegex = /^[a-zA-Z_]*$/; +const tableNameRegex = /^[a-zA-Z_]*$/; async function getStartBlockAsync(repository: Repository): Promise { const fillEventCount = await repository.count(); @@ -73,7 +73,7 @@ async function getStartBlockAsync(repository: Repositor return EXCHANGE_START_BLOCK; } const tableName = repository.metadata.tableName; - if (!tabelNameRegex.test(tableName)) { + if (!tableNameRegex.test(tableName)) { throw new Error('Unexpected special character in table name: ' + tableName); } const queryResult = await connection.query( diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts new file mode 100644 index 000000000..67d9e08d1 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -0,0 +1,51 @@ +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens'; +import { TrustedToken } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/trusted_tokens'; +import { 
From dea89c4e221d5b22de97b27573719cd27ce250c7 Mon Sep 17 00:00:00 2001
From: Jake Ellowitz
Date: Mon, 19 Nov 2018 19:11:51 -0800
Subject: metadata and trusted sources in same raw table

---
 packages/pipeline/src/scripts/pull_trusted_tokens.ts | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
index 67d9e08d1..48e20bea7 100644
--- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -2,9 +2,9 @@ import 'reflect-metadata';
 import { Connection, ConnectionOptions, createConnection } from 'typeorm';
 
 import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens';
-import { TrustedToken } from '../entities';
+import { TokenMetadata } from '../entities';
 import * as ormConfig from '../ormconfig';
-import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/trusted_tokens';
+import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/token_metadata';
 import { handleError } from '../utils';
 
 const METAMASK_TRUSTED_TOKENS_URL =
@@ -25,7 +25,7 @@ let connection: Connection;
 async function getMetamaskTrustedTokens(): Promise<void> {
     // tslint:disable-next-line
     console.log('Getting latest metamask trusted tokens list ...');
-    const trustedTokensRepository = connection.getRepository(TrustedToken);
+    const trustedTokensRepository = connection.getRepository(TokenMetadata);
     const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(METAMASK_TRUSTED_TOKENS_URL);
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseMetamaskTrustedTokens(resp);
@@ -39,7 +39,7 @@ async function getMetamaskTrustedTokens(): Promise<void> {
 async function getZeroExTrustedTokens(): Promise<void> {
     // tslint:disable-next-line
     console.log('Getting latest 0x trusted tokens list ...');
-    const trustedTokensRepository = connection.getRepository(TrustedToken);
+    const trustedTokensRepository = connection.getRepository(TokenMetadata);
     const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL);
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseZeroExTrustedTokens(resp);
-- cgit v1.2.3

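Note: storing both lists in one raw table implies the entity carries a column saying which authority supplied each row. A hypothetical TypeORM entity in that spirit — the table, schema, and column names here are assumptions for illustration, not the real TokenMetadata in ../entities:

import { Column, Entity, PrimaryColumn } from 'typeorm';

// Hypothetical sketch of a single-table token metadata entity.
@Entity({ name: 'token_metadata', schema: 'raw' })
class TokenMetadataSketch {
    @PrimaryColumn() public address!: string;
    // Discriminator for the trusted source, e.g. 'metamask' or '0x'.
    @PrimaryColumn() public authority!: string;
    @Column({ type: 'varchar', nullable: true }) public symbol!: string | null;
    @Column({ type: 'varchar', nullable: true }) public name!: string | null;
    @Column({ type: 'integer', nullable: true }) public decimals!: number | null;
}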
From 1aa3f9d69f7a6570c6fa62c542063fa92fc0bf5a Mon Sep 17 00:00:00 2001
From: Jake Ellowitz
Date: Mon, 26 Nov 2018 14:55:45 -0800
Subject: updating comment for 0x trusted tokens

---
 packages/pipeline/src/scripts/pull_trusted_tokens.ts | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
index 48e20bea7..b3a4466bb 100644
--- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -10,8 +10,7 @@ import { handleError } from '../utils';
 const METAMASK_TRUSTED_TOKENS_URL =
     'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json';
 
-const ZEROEX_TRUSTED_TOKENS_URL =
-    'https://website-api.0xproject.com/tokens';
+const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens';
 
 let connection: Connection;
 
@@ -26,14 +25,16 @@ async function getMetamaskTrustedTokens(): Promise<void> {
     // tslint:disable-next-line
     console.log('Getting latest metamask trusted tokens list ...');
     const trustedTokensRepository = connection.getRepository(TokenMetadata);
-    const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(METAMASK_TRUSTED_TOKENS_URL);
+    const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(
+        METAMASK_TRUSTED_TOKENS_URL,
+    );
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseMetamaskTrustedTokens(resp);
     // tslint:disable-next-line
     console.log('Saving metamask trusted tokens list');
     await trustedTokensRepository.save(trustedTokens);
     // tslint:disable-next-line
-    console.log('Done saving metamask trusted tokens.')
+    console.log('Done saving metamask trusted tokens.');
 }
 
 async function getZeroExTrustedTokens(): Promise<void> {
-- cgit v1.2.3

From 7198b441e0d85785eec7244dd60bcd92269d954e Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Thu, 29 Nov 2018 11:40:09 -0800
Subject: Add script for parsing competing dex trades from Bloxy (#1355)

---
 .../src/scripts/pull_competing_dex_trades.ts       | 51 ++++++++++++++++++++++
 1 file changed, 51 insertions(+)
 create mode 100644 packages/pipeline/src/scripts/pull_competing_dex_trades.ts

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
new file mode 100644
index 000000000..4e4c12dd0
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
@@ -0,0 +1,51 @@
+// tslint:disable:no-console
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { BloxySource } from '../data_sources/bloxy';
+import { DexTrade } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseBloxyTrades } from '../parsers/bloxy';
+import { handleError } from '../utils';
+
+// Number of trades to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    await getAndSaveTrades();
+    process.exit(0);
+})().catch(handleError);
+
+async function getAndSaveTrades(): Promise<void> {
+    const apiKey = process.env.BLOXY_API_KEY;
+    if (apiKey === undefined) {
+        throw new Error('Missing required env var: BLOXY_API_KEY');
+    }
+    const bloxySource = new BloxySource(apiKey);
+    const tradesRepository = connection.getRepository(DexTrade);
+    const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository);
+    console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`);
+    console.log('Getting latest dex trades...');
+    const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp);
+    console.log(`Parsing ${rawTrades.length} trades...`);
+    const trades = parseBloxyTrades(rawTrades);
+    console.log(`Saving ${trades.length} trades...`);
+    await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) });
+    console.log('Done saving trades.');
+}
+
+async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> {
+    if ((await tradesRepository.count()) === 0) {
+        return 0;
+    }
+    const response = (await connection.query(
+        'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1',
+    )) as Array<{ tx_timestamp: number }>;
+    if (response.length === 0) {
+        return 0;
+    }
+    return response[0].tx_timestamp;
+}
-- cgit v1.2.3

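Note: getLastSeenTimestampAsync is a resume cursor — read the newest value already stored and scrape forward from there, with 0 meaning "backfill from scratch". The same helper generalizes to any append-only raw table; as with tableNameRegex earlier, the identifiers are interpolated rather than parameterized, so callers must pass trusted names (a sketch, not part of the package):

import { Connection } from 'typeorm';

// Generic resume-cursor read (sketch). table/column must be trusted,
// validated identifiers; they are interpolated, not parameterized.
async function getCursorAsync(connection: Connection, table: string, column: string): Promise<number> {
    const rows = (await connection.query(
        `SELECT ${column} AS cursor FROM ${table} ORDER BY ${column} DESC LIMIT 1`,
    )) as Array<{ cursor: number }>;
    return rows.length === 0 ? 0 : rows[0].cursor;
}

// e.g. const lastSeen = await getCursorAsync(connection, 'raw.dex_trades', 'tx_timestamp');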
From 87ffa5d7ab19d2288bf68131a7e7ec77578c564c Mon Sep 17 00:00:00 2001
From: zkao
Date: Tue, 4 Dec 2018 13:21:46 -0800
Subject: Token_orderbook_snapshots for Ddex and Paradex(#1354)

* Implements the TokenOrderbookSnapshot Table

* Scripts, Data Sources and Entities to pull Ddex and Paradex API data.
---
 .../src/scripts/pull_ddex_orderbook_snapshots.ts   | 55 ++++++++++++++
 .../scripts/pull_paradex_orderbook_snapshots.ts    | 87 ++++++++++++++++++++++
 2 files changed, 142 insertions(+)
 create mode 100644 packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
 create mode 100644 packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
new file mode 100644
index 000000000..b02468e9b
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
@@ -0,0 +1,55 @@
+import { logUtils } from '@0x/utils';
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseDdexOrders } from '../parsers/ddex_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+// Number of markets to retrieve orderbooks for at once.
+const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50;
+
+// Delay between market orderbook requests.
+const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000;
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const ddexSource = new DdexSource();
+    const markets = await ddexSource.getActiveMarketsAsync();
+    for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
+        await Promise.all(
+            marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)),
+        );
+        await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
+    }
+    process.exit(0);
+})().catch(handleError);
+
+/**
+ * Retrieve orderbook from Ddex API for a given market. Parse orders and insert
+ * them into our database.
+ * @param ddexSource Data source which can query Ddex API.
+ * @param market Object from Ddex API containing market data.
+ */
+async function getAndSaveMarketOrderbook(ddexSource: DdexSource, market: DdexMarket): Promise<void> {
+    const orderBook = await ddexSource.getMarketOrderbookAsync(market.id);
+    const observedTimestamp = Date.now();
+
+    logUtils.log(`${market.id}: Parsing orders.`);
+    const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE);
+
+    if (orders.length > 0) {
+        logUtils.log(`${market.id}: Saving ${orders.length} orders.`);
+        const TokenOrderRepository = connection.getRepository(TokenOrder);
+        await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+    } else {
+        logUtils.log(`${market.id}: 0 orders to save.`);
+    }
+}
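
Note: the main loop above is a simple client-side rate limiter — fire at most MARKET_ORDERBOOK_REQUEST_BATCH_SIZE requests at once, then pause for a fixed delay. Factored into a helper, the shape is as follows (a sketch; R.splitEvery is the same ramda function the script uses):

import * as R from 'ramda';

// Process items in fixed-size chunks with a delay between chunks (sketch).
async function forEachChunkedAsync<T>(
    items: T[],
    chunkSize: number,
    delayMs: number,
    handleOneAsync: (item: T) => Promise<void>,
): Promise<void> {
    for (const chunk of R.splitEvery(chunkSize, items)) {
        await Promise.all(chunk.map(handleOneAsync));
        await new Promise<void>(resolve => setTimeout(resolve, delayMs));
    }
}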
diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
new file mode 100644
index 000000000..bae1fbede
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
@@ -0,0 +1,87 @@
+import { logUtils } from '@0x/utils';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import {
+    PARADEX_SOURCE,
+    ParadexActiveMarketsResponse,
+    ParadexMarket,
+    ParadexSource,
+    ParadexTokenInfoResponse,
+} from '../data_sources/paradex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseParadexOrders } from '../parsers/paradex_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY;
+    if (apiKey === undefined) {
+        throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY');
+    }
+    const paradexSource = new ParadexSource(apiKey);
+    const markets = await paradexSource.getActiveMarketsAsync();
+    const tokenInfoResponse = await paradexSource.getTokenInfoAsync();
+    const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse);
+    await Promise.all(
+        extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbook(paradexSource, market)),
+    );
+    process.exit(0);
+})().catch(handleError);
+
+/**
+ * Extend the default ParadexMarket objects with token addresses.
+ * @param markets An array of ParadexMarket objects.
+ * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses.
+ */
+function addTokenAddresses(
+    markets: ParadexActiveMarketsResponse,
+    tokenInfoResponse: ParadexTokenInfoResponse,
+): ParadexMarket[] {
+    const symbolAddressMapping = new Map<string, string>();
+    tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address));
+
+    markets.forEach((market: ParadexMarket) => {
+        if (symbolAddressMapping.has(market.baseToken)) {
+            market.baseTokenAddress = symbolAddressMapping.get(market.baseToken);
+        } else {
+            market.quoteTokenAddress = '';
+            logUtils.warn(`${market.baseToken}: No address found.`);
+        }
+
+        if (symbolAddressMapping.has(market.quoteToken)) {
+            market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken);
+        } else {
+            market.quoteTokenAddress = '';
+            logUtils.warn(`${market.quoteToken}: No address found.`);
+        }
+    });
+    return markets;
+}
+
+/**
+ * Retrieve orderbook from Paradex API for a given market. Parse orders and insert
+ * them into our database.
+ * @param paradexSource Data source which can query the Paradex API.
+ * @param market Object from the Paradex API with information about the market in question.
+ */
+async function getAndSaveMarketOrderbook(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> {
+    const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol);
+    const observedTimestamp = Date.now();
+
+    logUtils.log(`${market.symbol}: Parsing orders.`);
+    const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE);
+
+    if (orders.length > 0) {
+        logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`);
+        const tokenOrderRepository = connection.getRepository(TokenOrder);
+        await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+    } else {
+        logUtils.log(`${market.symbol}: 0 orders to save.`);
+    }
+}
-- cgit v1.2.3

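Note: addTokenAddresses is a symbol-to-address join — build a Map from the token info once, then decorate every market with O(1) lookups, defaulting to '' when a symbol is unknown. The same join with simplified placeholder types (a sketch):

interface TokenInfo {
    symbol: string;
    address: string;
}
interface Market {
    baseToken: string;
    quoteToken: string;
    baseTokenAddress?: string;
    quoteTokenAddress?: string;
}

// Symbol-to-address join with an explicit '' fallback on both sides (sketch).
function joinAddresses(markets: Market[], tokens: TokenInfo[]): Market[] {
    const bySymbol = new Map<string, string>(tokens.map(t => [t.symbol, t.address] as [string, string]));
    for (const market of markets) {
        market.baseTokenAddress = bySymbol.get(market.baseToken) || '';
        market.quoteTokenAddress = bySymbol.get(market.quoteToken) || '';
    }
    return markets;
}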
From 8c21a700bae0c751f7f9ca47f9a47628a4478911 Mon Sep 17 00:00:00 2001
From: Xianny <8582774+xianny@users.noreply.github.com>
Date: Tue, 4 Dec 2018 13:36:18 -0800
Subject: pull OHLCV records from Crypto Compare (#1349)

* [WIP] pull OHLCV records from Crypto Compare

* lint

* refactor to pull logic out of script and into modules

* add entity test for ohlcv_external entity

* implement rate limit and chronological backfill for ohlcv

* add unit tests; cleanup variable names

* Fetch OHLCV pairs params from events table

* better method names

* fix outdated test

* lint

* Clean up after review

* oops

* fix failing test

* better filtering of most recent records

* fix bug when generating pairs

* fix default earliest backfill date

* fix bug with retrieving backfill time

* prettier
---
 .../src/scripts/pull_ohlcv_cryptocompare.ts        | 101 +++++++++++++++++++++
 1 file changed, 101 insertions(+)
 create mode 100644 packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
new file mode 100644
index 000000000..6979cd10e
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -0,0 +1,101 @@
+// tslint:disable:no-console
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare';
+import { handleError } from '../utils';
+import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs';
+
+const SOURCE_NAME = 'CryptoCompare';
+const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_SECOND = 1000;
+
+const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
+const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare
+const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const repository = connection.getRepository(OHLCVExternal);
+    const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS);
+
+    const jobTime = new Date().getTime();
+    const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
+    console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`);
+
+    const fetchAndSavePromises = tradingPairs.map(async pair => {
+        const pairs = source.generateBackfillIntervals(pair);
+        return fetchAndSaveAsync(source, repository, jobTime, pairs);
+    });
+    await Promise.all(fetchAndSavePromises);
+    console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`);
+    process.exit(0);
+})().catch(handleError);
+
+async function fetchAndSaveAsync(
+    source: CryptoCompareOHLCVSource,
+    repository: Repository<OHLCVExternal>,
+    jobTime: number,
+    pairs: TradingPair[],
+): Promise<void> {
+    const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => {
+        if (a.latestSavedTime < b.latestSavedTime) {
+            return -1;
+        } else if (a.latestSavedTime > b.latestSavedTime) {
+            return 1;
+        } else {
+            return 0;
+        }
+    };
+    pairs.sort(sortAscTimestamp);
+
+    let i = 0;
+    while (i < pairs.length) {
+        const pair = pairs[i];
+        if (pair.latestSavedTime > TWO_HOURS_AGO) {
+            break;
+        }
+        const rawRecords = await source.getHourlyOHLCVAsync(pair);
+        const records = rawRecords.filter(rec => {
+            return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime;
+        }); // Crypto Compare can take ~30mins to finalise records
+        if (records.length === 0) {
+            console.log(`No more records, stopping task for ${JSON.stringify(pair)}`);
+            break;
+        }
+        const metadata: OHLCVMetadata = {
+            exchange: source.default_exchange,
+            fromSymbol: pair.fromSymbol,
+            toSymbol: pair.toSymbol,
+            source: SOURCE_NAME,
+            observedTimestamp: jobTime,
+            interval: source.intervalBetweenRecords,
+        };
+        const parsedRecords = parseRecords(records, metadata);
+        try {
+            await saveRecordsAsync(repository, parsedRecords);
+            i++;
+        } catch (err) {
+            console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
+            break;
+        }
+    }
+    return Promise.resolve();
+}
+
+async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> {
+    const metadata = [
+        records[0].fromSymbol,
+        records[0].toSymbol,
+        new Date(records[0].startTime),
+        new Date(records[records.length - 1].endTime),
+    ];
+
+    console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`);
+    await repository.save(records);
+}
-- cgit v1.2.3

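Note: fetchAndSaveAsync sorts a pair's backfill intervals oldest-first and stops at the first interval that is too recent or that fails to save, so a rerun resumes exactly where the last run stopped. Stripped to its control flow (a sketch; the predicate and processor are hypothetical parameters):

// Oldest-first queue drain with early exit (sketch of the loop above).
async function drainOldestFirstAsync<T>(
    queue: T[],
    isTooRecent: (item: T) => boolean,
    processOneAsync: (item: T) => Promise<void>,
): Promise<void> {
    for (const item of queue) {
        if (isTooRecent(item)) {
            break; // Everything after this is even fresher; stop.
        }
        try {
            await processOneAsync(item);
        } catch (err) {
            break; // Leave a clean resume point for the next run.
        }
    }
}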
From 8721d4ed7a7a38f77847b45619f15315044c375c Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Tue, 4 Dec 2018 15:23:15 -0800
Subject: Fix linter

---
 packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
index b02468e9b..7868e9c5a 100644
--- a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
+++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
@@ -27,7 +27,7 @@ let connection: Connection;
         await Promise.all(
             marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)),
         );
-        await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
+        await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
     }
     process.exit(0);
 })().catch(handleError);
-- cgit v1.2.3


From 549f5e4655f246062dd6451065ec01eb789dbd8f Mon Sep 17 00:00:00 2001
From: Fabio B
Date: Tue, 4 Dec 2018 19:56:23 -0800
Subject: Use a string template in packages/pipeline/src/scripts/pull_missing_events.ts

Co-Authored-By: albrow

---
 packages/pipeline/src/scripts/pull_missing_events.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts
index 68cabe3de..c7e07c540 100644
--- a/packages/pipeline/src/scripts/pull_missing_events.ts
+++ b/packages/pipeline/src/scripts/pull_missing_events.ts
@@ -74,7 +74,7 @@ async function getStartBlockAsync(repository: Repositor
     }
     const tableName = repository.metadata.tableName;
     if (!tableNameRegex.test(tableName)) {
-        throw new Error('Unexpected special character in table name: ' + tableName);
+        throw new Error(`Unexpected special character in table name: ${tableName}`);
     }
     const queryResult = await connection.query(
         `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`,
-- cgit v1.2.3

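Note: the linter fix above boils down to giving the delay promise an explicit void type parameter. A small named helper makes that intent reusable (a sketch, not part of the package):

// Typed delay helper, equivalent to the inline new Promise<void>(...) above.
async function sleepAsync(ms: number): Promise<void> {
    return new Promise<void>(resolve => setTimeout(resolve, ms));
}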
From 00f86ca0f7871639d2b0be496f6f8c5e0d8d7ffe Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Tue, 4 Dec 2018 20:04:08 -0800
Subject: Address PR feedback

---
 .../pipeline/src/scripts/pull_missing_blocks.ts    |  7 ++---
 .../pipeline/src/scripts/pull_missing_events.ts    |  5 ++--
 .../src/scripts/pull_radar_relay_orders.ts         | 33 +++++++++++++---------
 .../pipeline/src/scripts/pull_trusted_tokens.ts    | 14 ++++-----
 4 files changed, 31 insertions(+), 28 deletions(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts
index 4a1483ab9..b7bd51f08 100644
--- a/packages/pipeline/src/scripts/pull_missing_blocks.ts
+++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts
@@ -9,7 +9,7 @@ import { Web3Source } from '../data_sources/web3';
 import { Block } from '../entities';
 import * as ormConfig from '../ormconfig';
 import { parseBlock } from '../parsers/web3';
-import { handleError } from '../utils';
+import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';
 
 // Number of blocks to save at once.
 const BATCH_SAVE_SIZE = 1000;
@@ -18,16 +18,13 @@ const MAX_CONCURRENCY = 10;
 // Maximum number of blocks to query for at once. This is also the maximum
 // number of blocks we will hold in memory prior to being saved to the database.
 const MAX_BLOCKS_PER_QUERY = 1000;
-// Block number when the Exchange contract was deployed to mainnet.
-// TODO(albrow): De-dupe this constant.
-const EXCHANGE_START_BLOCK = 6271590;
 
 let connection: Connection;
 
 (async () => {
     connection = await createConnection(ormConfig as ConnectionOptions);
     const provider = web3Factory.getRpcProvider({
-        rpcUrl: `https://mainnet.infura.io/${process.env.INFURA_API_KEY}`,
+        rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`,
     });
     const web3Source = new Web3Source(provider);
     await getAllMissingBlocks(web3Source);
diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts
index c7e07c540..80abbb8b0 100644
--- a/packages/pipeline/src/scripts/pull_missing_events.ts
+++ b/packages/pipeline/src/scripts/pull_missing_events.ts
@@ -8,9 +8,8 @@ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange
 import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities';
 import * as ormConfig from '../ormconfig';
 import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events';
-import { handleError } from '../utils';
+import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';
 
-const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet.
 const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events.
 const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
 
@@ -19,7 +18,7 @@ let connection: Connection;
 (async () => {
     connection = await createConnection(ormConfig as ConnectionOptions);
     const provider = web3Factory.getRpcProvider({
-        rpcUrl: 'https://mainnet.infura.io',
+        rpcUrl: INFURA_ROOT_URL,
     });
     const eventsSource = new ExchangeEventsSource(provider, 1);
     await getFillEventsAsync(eventsSource);
diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
index b3a4d887e..bbbef9b47 100644
--- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
+++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
@@ -16,11 +16,11 @@ let connection: Connection;
 
 (async () => {
     connection = await createConnection(ormConfig as ConnectionOptions);
-    await getOrderbook();
+    await getOrderbookAsync();
     process.exit(0);
 })().catch(handleError);
 
-async function getOrderbook(): Promise<void> {
+async function getOrderbookAsync(): Promise<void> {
     console.log('Getting all orders...');
     const connectClient = new HttpClient(RADAR_RELAY_URL);
     const rawOrders = await connectClient.getOrdersAsync({
@@ -29,17 +29,22 @@ async function getOrderbook(): Promise<void> {
     console.log(`Got ${rawOrders.records.length} orders.`);
     console.log('Parsing orders...');
     // Parse the sra orders, then add source url to each.
-    const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders);
+    const orders = R.pipe(
+        parseSraOrders,
+        R.map(setSourceUrl(RADAR_RELAY_URL)),
+    )(rawOrders);
     // Save all the orders and update the observed time stamps in a single
     // transaction.
     console.log('Saving orders and updating timestamps...');
-    await connection.transaction(async (manager: EntityManager): Promise<void> => {
-        for (const order of orders) {
-            await manager.save(SraOrder, order);
-            const observedTimestamp = createObservedTimestampForOrder(order);
-            await manager.save(observedTimestamp);
-        }
-    });
+    await connection.transaction(
+        async (manager: EntityManager): Promise<void> => {
+            for (const order of orders) {
+                await manager.save(SraOrder, order);
+                const observedTimestamp = createObservedTimestampForOrder(order);
+                await manager.save(observedTimestamp);
+            }
+        },
+    );
 }
 
 const sourceUrlProp = R.lensProp('sourceUrl');
@@ -48,6 +53,8 @@ const sourceUrlProp = R.lensProp('sourceUrl');
  * Sets the source url for a single order. Returns a new order instead of
  * mutating the given one.
 */
-const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => {
-    return R.set(sourceUrlProp, sourceURL, order);
-});
+const setSourceUrl = R.curry(
+    (sourceURL: string, order: SraOrder): SraOrder => {
+        return R.set(sourceUrlProp, sourceURL, order);
+    },
+);
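
Note: this commit de-dupes EXCHANGE_START_BLOCK and the Infura root URL into ../utils so every script imports a single definition. From the two diffs, the shared module plausibly contains something like the following (a sketch; only these two constants are implied by the patch):

// Plausible shape of the shared constants in ../utils (sketch).
export const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet.
export const INFURA_ROOT_URL = 'https://mainnet.infura.io';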
diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
index b3a4466bb..1befc4437 100644
--- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -8,7 +8,7 @@ import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers
 import { handleError } from '../utils';
 
 const METAMASK_TRUSTED_TOKENS_URL =
-    'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json';
+    'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/d45916c533116510cc8e9e048a8b5fc3732a6b6d/contract-map.json';
 
 const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens';
 
@@ -22,7 +22,7 @@ let connection: Connection;
 })().catch(handleError);
 
 async function getMetamaskTrustedTokens(): Promise<void> {
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Getting latest metamask trusted tokens list ...');
     const trustedTokensRepository = connection.getRepository(TokenMetadata);
     const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(
@@ -30,23 +30,23 @@ async function getMetamaskTrustedTokens(): Promise<void> {
     );
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseMetamaskTrustedTokens(resp);
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Saving metamask trusted tokens list');
     await trustedTokensRepository.save(trustedTokens);
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Done saving metamask trusted tokens.');
 }
 
 async function getZeroExTrustedTokens(): Promise<void> {
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Getting latest 0x trusted tokens list ...');
     const trustedTokensRepository = connection.getRepository(TokenMetadata);
     const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL);
     const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
     const trustedTokens = parseZeroExTrustedTokens(resp);
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Saving metamask trusted tokens list');
     await trustedTokensRepository.save(trustedTokens);
-    // tslint:disable-next-line
+    // tslint:disable-next-line:no-console
     console.log('Done saving metamask trusted tokens.');
 }
-- cgit v1.2.3

From 2e704ac01a077b0c73288aaa53c9cf66c73e27f1 Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Tue, 4 Dec 2018 20:08:32 -0800
Subject: Fix prettier

---
 .../src/scripts/pull_radar_relay_orders.ts         | 29 ++++++++--------------
 1 file changed, 11 insertions(+), 18 deletions(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
index bbbef9b47..6c18bcaef 100644
--- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
+++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
@@ -29,22 +29,17 @@ async function getOrderbookAsync(): Promise<void> {
     console.log(`Got ${rawOrders.records.length} orders.`);
     console.log('Parsing orders...');
     // Parse the sra orders, then add source url to each.
-    const orders = R.pipe(
-        parseSraOrders,
-        R.map(setSourceUrl(RADAR_RELAY_URL)),
-    )(rawOrders);
+    const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders);
     // Save all the orders and update the observed time stamps in a single
     // transaction.
     console.log('Saving orders and updating timestamps...');
-    await connection.transaction(
-        async (manager: EntityManager): Promise<void> => {
-            for (const order of orders) {
-                await manager.save(SraOrder, order);
-                const observedTimestamp = createObservedTimestampForOrder(order);
-                await manager.save(observedTimestamp);
-            }
-        },
-    );
+    await connection.transaction(async (manager: EntityManager): Promise<void> => {
+        for (const order of orders) {
+            await manager.save(SraOrder, order);
+            const observedTimestamp = createObservedTimestampForOrder(order);
+            await manager.save(observedTimestamp);
+        }
+    });
 }
 
 const sourceUrlProp = R.lensProp('sourceUrl');
@@ -53,8 +48,6 @@ const sourceUrlProp = R.lensProp('sourceUrl');
  * Sets the source url for a single order. Returns a new order instead of
  * mutating the given one.
 */
-const setSourceUrl = R.curry(
-    (sourceURL: string, order: SraOrder): SraOrder => {
-        return R.set(sourceUrlProp, sourceURL, order);
-    },
-);
+const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => {
+    return R.set(sourceUrlProp, sourceURL, order);
+});
-- cgit v1.2.3


From 08eb0b91b6d0f0dc90ae920a18ca5dd080bf235c Mon Sep 17 00:00:00 2001
From: Alex Browne
Date: Wed, 5 Dec 2018 12:27:32 -0800
Subject: Fix RadarRelay timestamps (#1391)

* Fixing rr timestamps

* Apply prettier
---
 packages/pipeline/src/scripts/pull_radar_relay_orders.ts | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
index 6c18bcaef..40bb6fc97 100644
--- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
+++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
@@ -33,11 +33,12 @@ async function getOrderbookAsync(): Promise<void> {
     // Save all the orders and update the observed time stamps in a single
     // transaction.
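(The rest of this hunk, below, hoists Date.now() out of the loop so every order saved by one run shares a single observedTimestamp, and a coherent snapshot can later be selected with one equality predicate. For illustration only — the table name here is an assumption, not taken from the package:)

import { Connection } from 'typeorm';

// Sketch: reconstruct one snapshot via its shared timestamp.
// The table name is assumed for illustration.
async function getSnapshotAsync(connection: Connection, observedTimestamp: number): Promise<object[]> {
    return connection.query(
        'SELECT * FROM raw.sra_orders_observed_timestamps WHERE observed_timestamp = $1',
        [observedTimestamp],
    );
}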
     console.log('Saving orders and updating timestamps...');
+    const observedTimestamp = Date.now();
     await connection.transaction(async (manager: EntityManager): Promise<void> => {
         for (const order of orders) {
             await manager.save(SraOrder, order);
-            const observedTimestamp = createObservedTimestampForOrder(order);
-            await manager.save(observedTimestamp);
+            const orderObservation = createObservedTimestampForOrder(order, observedTimestamp);
+            await manager.save(orderObservation);
         }
     });
 }
-- cgit v1.2.3


From 78d0ab1aa2393b7a0b21108b9811e0b0a4406faf Mon Sep 17 00:00:00 2001
From: Xianny <8582774+xianny@users.noreply.github.com>
Date: Wed, 5 Dec 2018 16:05:06 -0800
Subject: Fix/pipeline/ohlcv (#1393)

The OHLCV script in data pipeline quits early when we get no data from
Crypto Compare. Sometimes Crypto Compare gives us a valid empty response
(e.g. when we query for way back in time) and we need to just continue.

This adds better filtering for the types of Crypto Compare responses to
detect when we should continue and when we should really quit.
---
 .../src/scripts/pull_ohlcv_cryptocompare.ts        | 38 +++++++++-------------
 1 file changed, 16 insertions(+), 22 deletions(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
index 6979cd10e..7377a64d8 100644
--- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -10,11 +10,9 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra
 
 const SOURCE_NAME = 'CryptoCompare';
 const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
-const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
-const ONE_SECOND = 1000;
 
 const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
-const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare
+const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01';
 const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
 
 let connection: Connection;
@@ -60,28 +58,24 @@ async function fetchAndSaveAsync(
         if (pair.latestSavedTime > TWO_HOURS_AGO) {
             break;
         }
-        const rawRecords = await source.getHourlyOHLCVAsync(pair);
-        const records = rawRecords.filter(rec => {
-            return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime;
-        }); // Crypto Compare can take ~30mins to finalise records
-        if (records.length === 0) {
-            console.log(`No more records, stopping task for ${JSON.stringify(pair)}`);
-            break;
-        }
-        const metadata: OHLCVMetadata = {
-            exchange: source.default_exchange,
-            fromSymbol: pair.fromSymbol,
-            toSymbol: pair.toSymbol,
-            source: SOURCE_NAME,
-            observedTimestamp: jobTime,
-            interval: source.intervalBetweenRecords,
-        };
-        const parsedRecords = parseRecords(records, metadata);
         try {
-            await saveRecordsAsync(repository, parsedRecords);
+            const records = await source.getHourlyOHLCVAsync(pair);
+            console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`);
+            if (records.length > 0) {
+                const metadata: OHLCVMetadata = {
+                    exchange: source.default_exchange,
+                    fromSymbol: pair.fromSymbol,
+                    toSymbol: pair.toSymbol,
+                    source: SOURCE_NAME,
+                    observedTimestamp: jobTime,
+                    interval: source.intervalBetweenRecords,
+                };
+                const parsedRecords = parseRecords(records, metadata);
+                await saveRecordsAsync(repository, parsedRecords);
+            }
             i++;
         } catch (err) {
-            console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
+            console.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
             break;
         }
     }
-- cgit v1.2.3

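Note: the restructured loop now distinguishes three outcomes per interval: records to save (save, then advance), a valid empty response (just advance), and a thrown error (stop the pair's task). The decision logic in isolation (a sketch):

// Three-way outcome handling from the loop above (sketch).
async function stepAsync(fetchAsync: () => Promise<object[]>): Promise<'advance' | 'stop'> {
    try {
        const records = await fetchAsync();
        if (records.length > 0) {
            // ...parse and save here...
        }
        return 'advance'; // Empty responses are valid; keep going.
    } catch (err) {
        return 'stop'; // Real errors end the task for this pair.
    }
}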
From 8b3b4d983f0f942fc4e42365d4cfcaf793d1d283 Mon Sep 17 00:00:00 2001
From: xianny
Date: Thu, 6 Dec 2018 13:20:08 -0800
Subject: rename variable and define default in only 1 location

---
 packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
index 7377a64d8..a29a13bfc 100644
--- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -11,7 +11,7 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra
 
 const SOURCE_NAME = 'CryptoCompare';
 const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
 
-const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
+const MAX_REQS_PER_SECOND = parseInt(process.env.CRYPTOCOMPARE_MAX_REQS_PER_SECOND || '15', 10); // tslint:disable-line:custom-no-magic-numbers
 const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01';
 const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
 
@@ -20,7 +20,7 @@ let connection: Connection;
 (async () => {
     connection = await createConnection(ormConfig as ConnectionOptions);
     const repository = connection.getRepository(OHLCVExternal);
-    const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS);
+    const source = new CryptoCompareOHLCVSource(MAX_REQS_PER_SECOND);
 
     const jobTime = new Date().getTime();
     const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
-- cgit v1.2.3


From 096c4c8f2b20b4ca909d4ba950e219c22a5f8882 Mon Sep 17 00:00:00 2001
From: xianny
Date: Mon, 10 Dec 2018 10:57:36 -0800
Subject: change to camelCase

---
 packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'packages/pipeline/src/scripts')

diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
index a29a13bfc..d44eb5cc6 100644
--- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -63,7 +63,7 @@ async function fetchAndSaveAsync(
             console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`);
             if (records.length > 0) {
                 const metadata: OHLCVMetadata = {
-                    exchange: source.default_exchange,
+                    exchange: source.defaultExchange,
                     fromSymbol: pair.fromSymbol,
                     toSymbol: pair.toSymbol,
                     source: SOURCE_NAME,
-- cgit v1.2.3
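
Note: a pattern running through all of these scripts is env-var configuration — required secrets throw early (BLOXY_API_KEY, PARADEX_DATA_PIPELINE_API_KEY) while tunables fall back to defaults (CRYPTOCOMPARE_MAX_REQS_PER_SECOND, OHLCV_EARLIEST_BACKFILL_DATE). Two helpers capture both cases (a sketch, not part of the package):

// Required env var: throw early with a clear message (sketch).
function requireEnv(name: string): string {
    const value = process.env[name];
    if (value === undefined) {
        throw new Error(`Missing required env var: ${name}`);
    }
    return value;
}

// Optional numeric env var with a default, as with MAX_REQS_PER_SECOND above.
function envInt(name: string, defaultValue: number): number {
    const raw = process.env[name];
    return raw === undefined ? defaultValue : parseInt(raw, 10);
}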