From eb1317a59abec368f3855eb3690e40d8db2a8984 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 17 Sep 2018 11:27:38 -0700 Subject: Rebase pipeline branch off development --- packages/pipeline/src/scripts/create_tables.ts | 258 ++++++++++ packages/pipeline/src/scripts/join_tables.ts | 234 +++++++++ packages/pipeline/src/scripts/query_data.ts | 87 ++++ packages/pipeline/src/scripts/scrape_data.ts | 649 +++++++++++++++++++++++++ 4 files changed, 1228 insertions(+) create mode 100644 packages/pipeline/src/scripts/create_tables.ts create mode 100644 packages/pipeline/src/scripts/join_tables.ts create mode 100644 packages/pipeline/src/scripts/query_data.ts create mode 100644 packages/pipeline/src/scripts/scrape_data.ts (limited to 'packages/pipeline/src/scripts') diff --git a/packages/pipeline/src/scripts/create_tables.ts b/packages/pipeline/src/scripts/create_tables.ts new file mode 100644 index 000000000..fd0d2b78b --- /dev/null +++ b/packages/pipeline/src/scripts/create_tables.ts @@ -0,0 +1,258 @@ +import * as commandLineArgs from 'command-line-args'; + +import { postgresClient } from '../postgres'; +import { formatters } from '../utils'; +const tableQueries: any = { + events_full: `CREATE TABLE IF NOT EXISTS events_full ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + gas_used NUMERIC(78), + gas_price NUMERIC(78), + fee_recipient CHAR(42), + method_id CHAR(10), + salt VARCHAR, + block_number BIGINT, + log_index BIGINT, + taker_symbol VARCHAR, + taker_name VARCHAR, + taker_decimals BIGINT, + taker_usd_price NUMERIC(78), + taker_txn_usd_value NUMERIC(78), + maker_symbol VARCHAR, + maker_name VARCHAR, + maker_decimals BIGINT, + maker_usd_price NUMERIC(78), + maker_txn_usd_value NUMERIC(78), + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events: `CREATE TABLE IF NOT EXISTS events ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + gas_used NUMERIC(78), + gas_price NUMERIC(78), + fee_recipient CHAR(42), + method_id CHAR(10), + salt VARCHAR, + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events_staging: `CREATE TABLE IF NOT EXISTS events_staging ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + fee_recipient CHAR(42), + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events_raw: `CREATE TABLE IF NOT EXISTS events_raw ( + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + fee_recipient CHAR(42), + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + blocks: `CREATE TABLE IF NOT EXISTS blocks ( + timestamp TIMESTAMP WITH TIME ZONE, + block_hash CHAR(66) UNIQUE, + block_number BIGINT, + PRIMARY KEY (block_hash) + )`, + transactions: `CREATE TABLE IF NOT EXISTS transactions ( + txn_hash CHAR(66) UNIQUE, + block_hash CHAR(66), + block_number BIGINT, + gas_used NUMERIC(78), + gas_price NUMERIC(78), + method_id CHAR(10), + salt VARCHAR, + PRIMARY KEY (txn_hash) + )`, + tokens: `CREATE TABLE IF NOT EXISTS tokens ( + address CHAR(42) UNIQUE, + name VARCHAR, + symbol VARCHAR, + decimals INT, + PRIMARY KEY (address) + )`, + prices: `CREATE TABLE IF NOT EXISTS prices ( + address CHAR(42) UNIQUE, + timestamp TIMESTAMP WITH TIME ZONE, + price NUMERIC(78, 18), + PRIMARY KEY (address, timestamp) + )`, + relayers: `CREATE TABLE IF NOT EXISTS relayers ( + name VARCHAR UNIQUE, + url VARCHAR DEFAULT '', + sra_http_endpoint VARCHAR DEFAULT '', + sra_ws_endpoint VARCHAR DEFAULT '', + fee_recipient_addresses CHAR(42)[] DEFAULT '{}', + taker_addresses CHAR(42)[] DEFAULT '{}', + PRIMARY KEY(name)`, + historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices ( + token VARCHAR, + base VARCHAR, + timestamp TIMESTAMP WITH TIME ZONE, + close NUMERIC(78, 18), + high NUMERIC(78, 18), + low NUMERIC(78, 18), + open NUMERIC(78, 18), + volume_from NUMERIC(78, 18), + volume_to NUMERIC(78, 18), + PRIMARY KEY (token, base, timestamp) + )`, + orders: `CREATE TABLE IF NOT EXISTS orders ( + relayer_id VARCHAR, + exchange_contract_address CHAR(42), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + fee_recipient CHAR(42), + expiration_unix_timestamp_sec NUMERIC(78), + salt VARCHAR, + order_hash CHAR(66), + PRIMARY KEY (relayer_id, order_hash) + )`, +}; +function _safeQuery(query: string): any { + return new Promise((resolve, reject) => { + postgresClient + .query(query) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); +} +export const tableScripts = { + createTable(query: string): any { + return _safeQuery(query); + }, + createAllTables(): any { + for (const tableName of tableQueries) { + _safeQuery(tableQueries[tableName]); + } + }, +}; +export const insertDataScripts = { + insertSingleRow(table: string, object: any): any { + return new Promise((resolve, reject) => { + const columns = Object.keys(object); + const safeArray: any = []; + for (const key of columns) { + if (key in object) { + if (key === 'timestamp') { + safeArray.push('to_timestamp(' + object[key] + ')'); + } else if (typeof object[key] === 'string' || object[key] instanceof String) { + safeArray.push(formatters.escapeSQLParam(object[key])); + } else { + safeArray.push(object[key]); + } + } else { + safeArray.push('default'); + } + } + const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`; + console.log(queryString); + postgresClient + .query(queryString) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + insertMultipleRows(table: string, rows: any[], columns: any[]): any { + return new Promise((resolve, reject) => { + if (rows.length > 0) { + const rowsSplit = rows.map((value, index) => { + const safeArray: any = []; + for (const key of columns) { + if (key in value) { + if (key === 'timestamp') { + safeArray.push('to_timestamp(' + value[key] + ')'); + } else if (typeof value[key] === 'string' || value[key] instanceof String) { + safeArray.push(formatters.escapeSQLParam(value[key])); + } else if (value[key] instanceof Array) { + const escapedArray = value[key].map((subValue: string, subIndex: number) => { + return formatters.escapeSQLParam(subValue); + }); + safeArray.push('ARRAY[' + escapedArray.toString() + ']'); + } else { + safeArray.push(value[key]); + } + } else { + safeArray.push('default'); + } + } + return '(' + safeArray + ')'; + }); + const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`; + postgresClient + .query(queryString) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + // console.log(err); + reject(err); + }); + } else { + resolve({}); + } + }); + }, +}; diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts new file mode 100644 index 000000000..e7c05b39a --- /dev/null +++ b/packages/pipeline/src/scripts/join_tables.ts @@ -0,0 +1,234 @@ +import * as commandLineArgs from 'command-line-args'; + +import { postgresClient } from '../postgres'; +import { formatters } from '../utils'; +const optionDefinitions = [ + { name: 'name', alias: 'n', type: String }, + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, +]; +const cli = commandLineArgs(optionDefinitions); +const dataInsertionQueries: any = { + events_staging: `INSERT INTO events_staging ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index + ) + (SELECT + b.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index + FROM + events_raw a + JOIN + blocks b + ON + a.block_number = b.block_number + AND + b.block_number >= $1 + AND + b.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events: `INSERT INTO events ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt + ) + (SELECT + a.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index, + b.gas_used, + b.gas_price, + b.method_id, + b.salt + FROM + events_staging a + JOIN + transactions b + ON + a.txn_hash = b.txn_hash + AND + a.block_number >= $1 + AND + a.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events_full: ` + INSERT INTO events_full ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt, + taker_symbol, + taker_name, + taker_decimals, + taker_usd_price, + taker_txn_usd_value, + maker_symbol, + maker_name, + maker_decimals, + maker_usd_price, + maker_txn_usd_value + ) + (SELECT + events.timestamp, + events.event_type, + events.error_id, + events.order_hash, + events.maker, + events.maker_amount, + events.maker_fee, + events.maker_token, + events.taker, + events.taker_amount, + events.taker_fee, + events.taker_token, + events.txn_hash, + events.fee_recipient, + events.block_number, + events.log_index, + events.gas_used, + events.gas_price, + events.method_id, + events.salt, + taker_token_prices.symbol, + taker_token_prices.name, + taker_token_prices.decimals, + taker_token_prices.price, + (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price, + maker_token_prices.symbol, + maker_token_prices.name, + maker_token_prices.decimals, + maker_token_prices.price, + (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price + FROM + events + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) taker_token_prices + ON + (events.taker_token = taker_token_prices.address + AND + (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL)) + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) maker_token_prices + ON + (events.maker_token = maker_token_prices.address + AND + (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL)) + WHERE + events.block_number >= $1 + AND + events.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, +}; +if (cli.name) { + const query = dataInsertionQueries[cli.name]; + if (query && cli.from) { + const fromBlock = cli.from; + const toBlock = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(query, [fromBlock, toBlock]) + .then((data: any) => { + console.log(data); + }) + .catch((err: any) => { + console.error(err); + }); + } +} diff --git a/packages/pipeline/src/scripts/query_data.ts b/packages/pipeline/src/scripts/query_data.ts new file mode 100644 index 000000000..97e3749ea --- /dev/null +++ b/packages/pipeline/src/scripts/query_data.ts @@ -0,0 +1,87 @@ +import { formatters } from '../utils'; +export const dataFetchingQueries: any = { + get_missing_txn_hashes: ` + SELECT + a.txn_hash + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + transactions b + WHERE + b.txn_hash = a.txn_hash + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_used_block_numbers: ` + SELECT DISTINCT + a.block_number + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + blocks b + WHERE + b.block_number = a.block_number + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_token_registry: ` + SELECT + * + FROM + tokens`, + get_max_block: ` + SELECT + MAX(block_number) + FROM + events_raw`, + get_relayers: ` + SELECT + * + FROM + relayers`, + get_most_recent_pricing_date: ` + SELECT + MAX(DATE(timestamp)) + FROM + prices + `, + get_top_unknown_token_addresses: ` + SELECT a.token_address as address, a.txn_value / 2 as total_txn_value +FROM +(SELECT token_address, SUM(txn_value) as txn_value +FROM +(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours') + union + select a.timestamp, a.taker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values +WHERE token_address IS NOT NULL +AND txn_value > 0 +GROUP BY 1 +ORDER BY 2 DESC) a +LEFT JOIN tokens b +ON a.token_address = b.address +WHERE symbol is NULL +ORDER BY 2 DESC +`, +}; diff --git a/packages/pipeline/src/scripts/scrape_data.ts b/packages/pipeline/src/scripts/scrape_data.ts new file mode 100644 index 000000000..963670b47 --- /dev/null +++ b/packages/pipeline/src/scripts/scrape_data.ts @@ -0,0 +1,649 @@ +import { ExchangeEvents, ZeroEx } from '0x.js'; +import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect'; +import * as Airtable from 'airtable'; +import * as commandLineArgs from 'command-line-args'; +import * as _ from 'lodash'; +import * as querystring from 'querystring'; +import * as queue from 'queue'; +import * as request from 'request'; +import * as rpn from 'request-promise-native'; + +import { HttpRequestOptions } from '../../../connect/lib/src/types.js'; +import { relayer } from '../models/relayer.js'; +import { token } from '../models/tokens.js'; +import { postgresClient } from '../postgres.js'; +import { typeConverters } from '../utils.js'; +import { web3, zrx } from '../zrx.js'; + +import { insertDataScripts } from './create_tables.js'; +import { dataFetchingQueries } from './query_data.js'; +const optionDefinitions = [ + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, + { name: 'type', type: String }, + { name: 'id', type: String }, + { name: 'force', type: Boolean }, + { name: 'token', type: String }, +]; +const cli = commandLineArgs(optionDefinitions); +const q = queue({ concurrency: 6, autostart: true }); +const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE); +const BLOCK_INCREMENTS = 1000; +const BASE_SYMBOL = 'USD'; // use USD as base currency against +const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days +const SECONDS_PER_DAY = 86400; +const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical'; +const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json'; +const METAMASK_ETH_CONTRACT_METADATA_JSON = + 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; +const ETHPLORER_BASE_URL = 'http://api.ethplorer.io'; +const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`; +// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday'; +const AIRTABLE_RELAYER_INFO = 'Relayer Info'; +export const pullDataScripts = { + getAllEvents(fromBlockNumber: number, toBlockNumber: number): any { + return new Promise((resolve, reject) => { + const getLogsPromises: any[] = []; + getLogsPromises.push( + zrx.exchange.getLogsAsync( + ExchangeEvents.LogFill, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogCancel, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogError, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + ); + Promise.all(getLogsPromises) + .then((data: any[]) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getBlockInfo(blockNumber: number): any { + return new Promise((resolve, reject) => { + web3.eth.getBlock(blockNumber, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTransactionInfo(transactionHash: string): any { + return new Promise((resolve, reject) => { + web3.eth.getTransaction(transactionHash, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTokenRegistry(): any { + return new Promise((resolve, reject) => { + zrx.tokenRegistry + .getTokensAsync() + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getMetaMaskTokens(): any { + return new Promise((resolve, reject) => { + request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerTopTokens(): any { + return new Promise((resolve, reject) => { + request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerToken(tokenAddress: string): any { + return new Promise((resolve, reject) => { + const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`; + request(url, (error, response, body) => { + if (error) { + reject(error); + } else { + try { + const json = JSON.parse(body); + resolve(json); + } catch (err) { + resolve({ error: 'error' }); + } + } + }); + }); + }, + getPriceData(symbol: string, timestamp: number, timeDelay?: number): any { + return new Promise((resolve, reject) => { + if (symbol === 'WETH') { + symbol = 'ETH'; + } + let parsedParams = querystring.stringify({ + fsym: symbol, + tsyms: 'USD', + ts: timestamp / 1000, + }); + console.debug(parsedParams); + setTimeout(() => { + request(PRICE_API_ENDPOINT + '?' + parsedParams, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }, timeDelay); + }); + }, + getRelayers(): any { + return new Promise((resolve, reject) => { + request(RELAYER_REGISTRY_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + async getOrderBook(sraEndpoint: string): Promise { + const relayerClient = new HttpClient(sraEndpoint); + const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync(); + const fullOrderBook: OrderbookResponse[] = []; + for (const tokenPair of tokenResponse) { + const orderBookRequest: OrderbookRequest = { + baseTokenAddress: tokenPair.tokenA.address, + quoteTokenAddress: tokenPair.tokenB.address, + }; + const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest); + fullOrderBook.push(orderBook); + } + return fullOrderBook; + }, + // async getHistoricalPrices( + // fromSymbol: string, + // toSymbol: string, + // fromTimestamp: number, + // toTimestamp: number, + // ): Promise { + // const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY)); + // if(fromSymbol === 'WETH') { + // fromSymbol = 'ETH'; + // } + // var parsedParams = { + // fsym: fromSymbol, + // tsym: toSymbol, + // limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT), + // toTs: toTimestamp, + // }; + // var options = { + // uri: HIST_PRICE_API_ENDPOINT, + // qs: parsedParams, + // json: false, + // }; + // try { + // const response = await rpn(options); + // return Promise.resolve(JSON.parse(response)); + // } catch (error) { + // console.debug(error); + // return Promise.reject(error); + // } + // }, +}; +export const scrapeDataScripts = { + scrapeAllPricesToDB(fromTime: number, toTime: number) { + const fromDate = new Date(fromTime); + fromDate.setUTCHours(0); + fromDate.setUTCMinutes(0); + fromDate.setUTCSeconds(0); + fromDate.setUTCMilliseconds(0); + const toDate = new Date(toTime); + postgresClient + .query(dataFetchingQueries.get_token_registry, []) + .then((result: any) => { + for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { + for (const token of Object.values(result.rows)) { + console.debug('Scraping ' + curDate + ' ' + token); + q.push(_scrapePriceToDB(curDate.getTime(), token, 500)); + } + } + }) + .catch((err: any) => { + console.debug(err); + }); + }, +}; +function _scrapeEventsToDB(fromBlock: number, toBlock: number): any { + return (cb: () => void) => { + pullDataScripts + .getAllEvents(fromBlock, toBlock) + .then((data: any) => { + const parsedEvents: any = {}; + parsedEvents[ExchangeEvents.LogFill] = []; + parsedEvents[ExchangeEvents.LogCancel] = []; + parsedEvents[ExchangeEvents.LogError] = []; + for (const index in data) { + for (const datum of data[index]) { + const event = typeConverters.convertLogEventToEventObject(datum); + parsedEvents[event.event_type].push(event); + } + } + console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length); + for (const event_type in parsedEvents) { + if (parsedEvents[event_type].length > 0) { + insertDataScripts + .insertMultipleRows( + 'events_raw', + parsedEvents[event_type], + Object.keys(parsedEvents[event_type][0]), + ) + .then(() => {}) + .catch((error: any) => {}); + } + } + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeBlockToDB(block: number): any { + return (cb: () => void) => { + pullDataScripts + .getBlockInfo(block) + .then((data: any) => { + const parsedBlock = typeConverters.convertLogBlockToBlockObject(data); + insertDataScripts + .insertSingleRow('blocks', parsedBlock) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +// function _scrapeAllRelayersToDB(): any { +// return (cb: () => void) => { +// airtableBase(AIRTABLE_RELAYER_INFO) +// .select() +// .eachPage((records: any, fetchNextPage: () => void) => { +// const parsedRelayers: any[] = []; +// for(const record of records) { +// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record)); +// } +// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0])) +// .then((result: any) => { +// cb(); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }; +// } +function _scrapeAllRelayersToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getRelayers() + .then((relayers: any[]) => { + console.log(relayers); + const parsedRelayers: any[] = []; + for (const relayer of relayers) { + parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer)); + } + console.log(parsedRelayers); + insertDataScripts + .insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties)) + .then((result: any) => { + console.log(result); + cb(); + }) + .catch((err: any) => { + console.log(err); + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTransactionToDB(transactionHash: string): any { + return (cb: () => void) => { + pullDataScripts + .getTransactionInfo(transactionHash) + .then((data: any) => { + const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data); + insertDataScripts + .insertSingleRow('transactions', parsedTransaction) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTokenRegistryToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getTokenRegistry() + .then((data: any) => { + const parsedTokens: any = []; + for (const token of data) { + parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeMetaMaskEthContractMetadataToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getMetaMaskTokens() + .then((data: any) => { + const parsedTokens: any = []; + const dataArray = _.map(_.keys(data), (tokenAddress: string) => { + const value = _.get(data, tokenAddress); + return { + address: tokenAddress, + ...value, + }; + }); + const erc20TokensOnly = _.filter(dataArray, entry => { + const isErc20 = _.get(entry, 'erc20'); + return isErc20; + }); + for (const token of erc20TokensOnly) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeEthplorerTopTokensToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getEthplorerTopTokens() + .then((data: any) => { + const parsedTokens: any = []; + const tokens = _.get(data, 'tokens'); + for (const token of tokens) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeUnknownTokenInformationToDB(): any { + return (cb: () => void) => { + postgresClient + .query(dataFetchingQueries.get_top_unknown_token_addresses) + .then(async (result: any) => { + const addresses = _.map(result.rows, row => _.get(row, 'address')); + const responses = await Promise.all( + _.map(addresses, address => pullDataScripts.getEthplorerToken(address)), + ); + const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error'))); + const parsedTokens = _.map(tokens, tokenInfo => + typeConverters.convertEthplorerTokenToTokenObject(tokenInfo), + ); + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any { + return (cb: () => void) => { + pullDataScripts + .getPriceData(token.symbol, timestamp, timeDelay) + .then((data: any) => { + const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol; + const parsedPrice = { + timestamp: timestamp / 1000, + symbol: token.symbol, + base: 'USD', + price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0, + }; + console.debug('Inserting ' + timestamp); + console.debug(parsedPrice); + insertDataScripts.insertSingleRow('prices', parsedPrice); + cb(); + }) + .catch((err: any) => { + console.debug(err); + cb(); + }); + }; +} +// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any { +// return (cb: () => void) => { +// pullDataScripts +// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp) +// .then((data: any) => { +// const parsedHistoricalPrices: any = []; +// for (const historicalPrice of data['Data']) { +// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice); +// parsedHistoricalPrice['token'] = token; +// parsedHistoricalPrice['base'] = BASE_SYMBOL; +// parsedHistoricalPrices.push(parsedHistoricalPrice); +// } +// if (parsedHistoricalPrices.length > 0) { +// insertDataScripts +// .insertMultipleRows( +// 'historical_prices', +// parsedHistoricalPrices, +// Object.keys(parsedHistoricalPrices[0]), +// ) +// .catch((error: any) => { +// console.error(error); +// }); +// } +// cb(); +// }) +// .catch((error: any) => { +// console.error(error); +// cb(); +// }); +// }; +// } +function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any { + return (cb: () => void) => { + pullDataScripts + .getOrderBook(sraEndpoint) + .then((data: any) => { + for (const book of data) { + for (const order of book.bids) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + for (const order of book.asks) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + } + cb(); + }) + .catch((error: any) => { + console.error(error); + cb(); + }); + }; +} +if (cli.type === 'events') { + if (cli.from && cli.to) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + let curToBlock = curFromBlock; + do { + curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS; + q.push(_scrapeEventsToDB(curFromBlock, curToBlock)); + curFromBlock = curToBlock + 1; + } while (curToBlock < destToBlock); + } +} else if (cli.type === 'blocks') { + if (cli.from && cli.to) { + if (cli.force) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + const curToBlock = curFromBlock; + for (; curFromBlock < destToBlock; curFromBlock++) { + q.push(_scrapeBlockToDB(curFromBlock)); + } + } else { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeBlockToDB(row.block_number)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } + } +} else if (cli.type === 'transactions') { + if (cli.id) { + q.push(_scrapeTransactionToDB(cli.id)); + } else if (cli.from) { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeTransactionToDB(row.txn_hash)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } +} else if (cli.type === 'tokens') { + q.push(_scrapeMetaMaskEthContractMetadataToDB()); + q.push(_scrapeEthplorerTopTokensToDB()); +} else if (cli.type === 'unknown_tokens') { + q.push(_scrapeUnknownTokenInformationToDB()); +} else if (cli.type === 'prices' && cli.from && cli.to) { + const fromDate = new Date(cli.from); + console.debug(fromDate); + fromDate.setUTCHours(0); + fromDate.setUTCMinutes(0); + fromDate.setUTCSeconds(0); + fromDate.setUTCMilliseconds(0); + console.debug(fromDate); + const toDate = new Date(cli.to); + postgresClient + .query(dataFetchingQueries.get_token_registry, []) + .then((result: any) => { + for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { + for (const token of Object.values(result.rows)) { + console.debug('Scraping ' + curDate + ' ' + token); + q.push(_scrapePriceToDB(curDate.getTime(), token)); + } + } + }) + .catch((err: any) => { + console.debug(err); + }); + // } else if (cli.type === 'historical_prices') { + // if (cli.token && cli.from && cli.to) { + // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to)); + // } + // } else if (cli.type === 'all_historical_prices') { + // if (cli.from && cli.to) { + // postgresClient + // .query(dataFetchingQueries.get_token_registry, []) + // .then((result: any) => { + // const curTokens: any = result.rows.map((a: any): any => a.symbol); + // for (const curToken of curTokens) { + // console.debug('Historical data backfill: Pushing coin ' + curToken); + // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to)); + // } + // }) + // .catch((err: any) => { + // console.debug(err); + // }); + // } +} else if (cli.type === 'relayers') { + q.push(_scrapeAllRelayersToDB()); +} else if (cli.type === 'orders') { + postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => { + for (const relayer of result.rows) { + if (relayer.sra_http_url) { + q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url)); + } + } + }); +} -- cgit v1.2.3