From eb1317a59abec368f3855eb3690e40d8db2a8984 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Mon, 17 Sep 2018 11:27:38 -0700 Subject: Rebase pipeline branch off development --- packages/pipeline/src/global.d.ts | 6 + packages/pipeline/src/models/block.ts | 22 + packages/pipeline/src/models/event.ts | 84 +++ packages/pipeline/src/models/historical_prices.ts | 43 ++ packages/pipeline/src/models/order.ts | 30 + packages/pipeline/src/models/price.ts | 15 + packages/pipeline/src/models/relayer.ts | 75 +++ packages/pipeline/src/models/tokens.ts | 24 + packages/pipeline/src/models/transaction.ts | 36 ++ packages/pipeline/src/postgres.ts | 12 + packages/pipeline/src/run_jobs.ts | 87 +++ packages/pipeline/src/scrape_prices.ts | 10 + packages/pipeline/src/scripts/create_tables.ts | 258 +++++++++ packages/pipeline/src/scripts/join_tables.ts | 234 ++++++++ packages/pipeline/src/scripts/query_data.ts | 87 +++ packages/pipeline/src/scripts/scrape_data.ts | 649 ++++++++++++++++++++++ packages/pipeline/src/utils.ts | 181 ++++++ packages/pipeline/src/zrx.ts | 11 + 18 files changed, 1864 insertions(+) create mode 100644 packages/pipeline/src/global.d.ts create mode 100644 packages/pipeline/src/models/block.ts create mode 100644 packages/pipeline/src/models/event.ts create mode 100644 packages/pipeline/src/models/historical_prices.ts create mode 100644 packages/pipeline/src/models/order.ts create mode 100644 packages/pipeline/src/models/price.ts create mode 100644 packages/pipeline/src/models/relayer.ts create mode 100644 packages/pipeline/src/models/tokens.ts create mode 100644 packages/pipeline/src/models/transaction.ts create mode 100644 packages/pipeline/src/postgres.ts create mode 100644 packages/pipeline/src/run_jobs.ts create mode 100644 packages/pipeline/src/scrape_prices.ts create mode 100644 packages/pipeline/src/scripts/create_tables.ts create mode 100644 packages/pipeline/src/scripts/join_tables.ts create mode 100644 packages/pipeline/src/scripts/query_data.ts create mode 100644 packages/pipeline/src/scripts/scrape_data.ts create mode 100644 packages/pipeline/src/utils.ts create mode 100644 packages/pipeline/src/zrx.ts (limited to 'packages/pipeline') diff --git a/packages/pipeline/src/global.d.ts b/packages/pipeline/src/global.d.ts new file mode 100644 index 000000000..0af4ddb30 --- /dev/null +++ b/packages/pipeline/src/global.d.ts @@ -0,0 +1,6 @@ +declare module 'queue'; +declare module 'airtable'; +declare module '*.json' { + const value: any; + export default value; +} diff --git a/packages/pipeline/src/models/block.ts b/packages/pipeline/src/models/block.ts new file mode 100644 index 000000000..a81cdb293 --- /dev/null +++ b/packages/pipeline/src/models/block.ts @@ -0,0 +1,22 @@ +const block = { + tableName: 'blocks', + tableProperties: { + id: { + type: 'key', + }, + timestamp: { + type: 'timestamp', + required: true, + }, + block_number: { + type: 'bigint', + required: true, + }, + }, +}; +const logToBlockSchemaMapping: any = { + number: 'block_number', + hash: 'block_hash', + timestamp: 'timestamp', +}; +export { block, logToBlockSchemaMapping }; diff --git a/packages/pipeline/src/models/event.ts b/packages/pipeline/src/models/event.ts new file mode 100644 index 000000000..cb1c939e5 --- /dev/null +++ b/packages/pipeline/src/models/event.ts @@ -0,0 +1,84 @@ +const event = { + tableName: 'events', + tableProperties: { + id: { + type: 'key', + }, + timestamp: { + type: 'timestamp', + required: true, + }, + event_type: { + type: 'varchar', + required: true, + }, + error_id: { + type: 'varchar', + }, + order_hash: { + type: 'char(66)', + }, + maker: { + type: 'char(42)', + }, + maker_amount: { + type: 'varchar', + }, + maker_fee: { + type: 'varchar', + }, + maker_token: { + type: 'char(42)', + }, + taker_amount: { + type: 'varchar', + }, + taker_fee: { + type: 'varchar', + }, + taker_token: { + type: 'char(42)', + }, + txn_hash: { + type: 'char(66)', + }, + gas_used: { + type: 'varchar', + }, + gas_price: { + type: 'varchar', + }, + fee_recipient: { + type: 'char(42)', + }, + method_id: { + type: 'char(10)', + }, + salt: { + type: 'varchar', + }, + block_number: { + type: 'bigint', + }, + }, +}; +const logToEventSchemaMapping: any = { + blockNumber: 'block_number', + transactionHash: 'txn_hash', + event: 'event_type', + logIndex: 'log_index', + 'args.maker': 'maker', + 'args.taker': 'taker', + 'args.feeRecipient': 'fee_recipient', + 'args.makerToken': 'maker_token', + 'args.takerToken': 'taker_token', + 'args.filledMakerTokenAmount': 'maker_amount', + 'args.filledTakerTokenAmount': 'taker_amount', + 'args.paidMakerFee': 'maker_fee', + 'args.paidTakerFee': 'taker_fee', + 'args.orderHash': 'order_hash', + 'args.cancelledMakerTokenAmount': 'maker_amount', + 'args.cancelledTakerTokenAmount': 'taker_amount', + 'args.errorId': 'error_id', +}; +export { event, logToEventSchemaMapping }; diff --git a/packages/pipeline/src/models/historical_prices.ts b/packages/pipeline/src/models/historical_prices.ts new file mode 100644 index 000000000..cf49b579e --- /dev/null +++ b/packages/pipeline/src/models/historical_prices.ts @@ -0,0 +1,43 @@ +const historicalPrices = { + tableName: 'historical_prices', + tableProperties: { + token: { + type: 'varchar', + }, + base: { + type: 'varchar', + }, + timestamp: { + type: 'timestamp', + }, + close: { + type: 'numeric(50)', + }, + high: { + type: 'numeric(50)', + }, + low: { + type: 'numeric(50)', + }, + open: { + type: 'numeric(50)', + }, + volume_from: { + type: 'numeric(50)', + }, + volume_to: { + type: 'numeric(50)', + }, + }, +}; +const logToHistoricalPricesSchema: { [log: string]: string } = { + token: 'token', + time: 'timestamp', + close: 'close', + high: 'high', + low: 'low', + open: 'open', + volumefrom: 'volume_from', + volumeto: 'volume_to', +}; +export { historicalPrices, logToHistoricalPricesSchema }; diff --git a/packages/pipeline/src/models/order.ts b/packages/pipeline/src/models/order.ts new file mode 100644 index 000000000..dadae2883 --- /dev/null +++ b/packages/pipeline/src/models/order.ts @@ -0,0 +1,30 @@ +const order = { + tableName: 'orders', + tableProperties: { + id: { + type: 'key', + }, + timestamp: { + type: 'timestamp', + required: true, + }, + block_number: { + type: 'bigint', + required: true, + }, + }, +}; +const logToOrderSchemaMapping: any = { + exchangeContractAddress: 'exchange_contract_address', + maker: 'maker', + makerTokenAddress: 'maker_token', + makerTokenAmount: 'maker_amount', + makerFee: 'maker_fee', + taker: 'taker', + takerTokenAddress: 'taker_token', + takerTokenAmount: 'taker_amount', + takerFee: 'taker_fee', + expirationUnixTimestampSec: 'expiration_unix_timestamp_sec', + salt: 'salt', +}; +export { order, logToOrderSchemaMapping }; diff --git a/packages/pipeline/src/models/price.ts b/packages/pipeline/src/models/price.ts new file mode 100644 index 000000000..9e45cc2a9 --- /dev/null +++ b/packages/pipeline/src/models/price.ts @@ -0,0 +1,15 @@ +const price = { + tableName: 'prices', + tableProperties: { + address: { + type: 'char(42)', + }, + timestamp: { + type: 'timestamp', + }, + price: { + type: 'numeric(50)', + }, + }, +}; +export { price }; diff --git a/packages/pipeline/src/models/relayer.ts b/packages/pipeline/src/models/relayer.ts new file mode 100644 index 000000000..77a123e8b --- /dev/null +++ b/packages/pipeline/src/models/relayer.ts @@ -0,0 +1,75 @@ +// const relayer = { +// tableName: 'relayers', +// tableProperties: { +// id: { +// type: 'integer', +// }, +// name: { +// type: 'varchar', +// }, +// url : { +// type: 'varchar', +// }, +// model: { +// type: 'varchar[]', +// }, +// status: { +// type: 'varchar', +// }, +// sra_status: { +// type: 'varchar', +// }, +// sra_http_url: { +// type: 'varchar', +// }, +// known_fee_addresses: { +// type: 'char(42)[]', +// }, +// known_taker_addresses: { +// type: 'char(42)[]', +// }, +// relayer_type: { +// type: 'varchar', +// }, +// }, +// }; +const relayer = { + tableName: 'relayers', + tableProperties: { + name: { + type: 'varchar', + }, + url: { + type: 'varchar', + }, + sra_http_endpoint: { + type: 'varchar', + }, + sra_ws_endpoint: { + type: 'varchar', + }, + fee_recipient_addresses: { + type: 'char(42)[]', + }, + taker_addresses: { + type: 'char(42)[]', + }, + }, +}; +// const logToRelayerSchemaMapping: any = { +// 'id' : 'id', +// 'fields[\'Name\']': 'name', +// 'fields[\'URL\']': 'url', +// 'fields[\'Model\']': 'model', +// 'fields[\'Status\']': 'status', +// 'fields[\'SRA Status\']': 'sra_status', +// 'fields[\'SRA HTTP URL\']': 'sra_http_url', +// 'fields[\'Known Fee Addresses\']': 'known_fee_addresses', +// 'fields[\'Known Taker Addresses\']': 'known_taker_addresses', +// 'fields[\'Relayer Type\']': 'relayer_type', +// }; +const logToRelayerSchemaMapping: any = { + name: 'name', + homepage_url: 'url', +}; +export { relayer, logToRelayerSchemaMapping }; diff --git a/packages/pipeline/src/models/tokens.ts b/packages/pipeline/src/models/tokens.ts new file mode 100644 index 000000000..96e8a31af --- /dev/null +++ b/packages/pipeline/src/models/tokens.ts @@ -0,0 +1,24 @@ +const token = { + tableName: 'tokens', + tableProperties: { + address: { + type: 'char(66)', + }, + decimals: { + type: 'bigint', + }, + name: { + type: 'varchar', + }, + symbol: { + type: 'varchar', + }, + }, +}; +const logToTokenSchemaMapping: any = { + address: 'address', + decimals: 'decimals', + name: 'name', + symbol: 'symbol', +}; +export { token, logToTokenSchemaMapping }; diff --git a/packages/pipeline/src/models/transaction.ts b/packages/pipeline/src/models/transaction.ts new file mode 100644 index 000000000..715cc9480 --- /dev/null +++ b/packages/pipeline/src/models/transaction.ts @@ -0,0 +1,36 @@ +const transaction = { + tableName: 'transactions', + tableProperties: { + txn_hash: { + type: 'char(66)', + }, + block_hash: { + type: 'char(66)', + }, + block_number: { + type: 'bigint', + }, + gas_used: { + type: 'varchar', + }, + gas_price: { + type: 'varchar', + }, + method_id: { + type: 'char(10)', + }, + salt: { + type: 'varchar', + }, + }, +}; +const logToTransactionSchemaMapping: any = { + hash: 'txn_hash', + gas: 'gas_used', + gasPrice: 'gas_price', + blockHash: 'block_hash', + blockNumber: 'block_number', + method_id: 'method_id', + salt: 'salt', +}; +export { transaction, logToTransactionSchemaMapping }; diff --git a/packages/pipeline/src/postgres.ts b/packages/pipeline/src/postgres.ts new file mode 100644 index 000000000..d095e5c9e --- /dev/null +++ b/packages/pipeline/src/postgres.ts @@ -0,0 +1,12 @@ +import * as dotenv from 'dotenv'; +import { Pool, PoolConfig } from 'pg'; +dotenv.config(); +const client: PoolConfig = { + user: process.env.AURORA_USER, + database: process.env.AURORA_DB, + password: process.env.AURORA_PASSWORD, + port: parseInt(process.env.AURORA_PORT || '5432', 10), + host: process.env.AURORA_HOST, +}; +const postgresClient = new Pool(client); +export { postgresClient }; diff --git a/packages/pipeline/src/run_jobs.ts b/packages/pipeline/src/run_jobs.ts new file mode 100644 index 000000000..4d82d4e2d --- /dev/null +++ b/packages/pipeline/src/run_jobs.ts @@ -0,0 +1,87 @@ +import { exec } from 'child_process'; + +import { postgresClient } from './postgres.js'; +import { dataFetchingQueries } from './scripts/query_data.js'; +import { web3, zrx } from './zrx.js'; +const CUR_BLOCK_OFFSET = 20; +postgresClient.query(dataFetchingQueries.get_max_block, []).then((data: any) => { + const maxBlockNumber = data.rows[0].max; + const safeCurBlockNumber = web3.eth.blockNumber - CUR_BLOCK_OFFSET; + console.log('Scraping ' + maxBlockNumber + ' to ' + safeCurBlockNumber); + exec( + 'node ./lib/scripts/scrape_data --type events --from ' + maxBlockNumber + ' --to ' + safeCurBlockNumber, + (error, stdout, stderr) => { + if (error) { + console.log(error); + return; + } + console.log('Scraped events'); + console.log('Scraping blocks'); + exec( + 'node ./lib/scripts/scrape_data --type blocks --from ' + maxBlockNumber + ' --to ' + safeCurBlockNumber, + (error, stdout, stderr) => { + if (error) { + console.log(error); + return; + } + console.log('Scraped blocks'); + console.log('Scraping transactions'); + exec( + 'node ./lib/scripts/scrape_data --type transactions --from ' + + maxBlockNumber + + ' --to ' + + safeCurBlockNumber, + (error, stdout, stderr) => { + if (error) { + console.log(error); + return; + } + console.log('Scraped transactions'); + console.log('Joining events_staging'); + exec( + 'node ./lib/scripts/join_tables --name events_staging --from ' + + maxBlockNumber + + ' --to ' + + safeCurBlockNumber, + (error, stdout, stderr) => { + if (error) { + console.log(error); + return; + } + console.log('Joined events_staging'); + console.log('Joining events'); + exec( + 'node ./lib/scripts/join_tables --name events --from ' + + maxBlockNumber + + ' --to ' + + safeCurBlockNumber, + (error, stdout, stderr) => { + if (error) { + console.log(error); + return; + } + console.log('Joined events'); + console.log('Joining events_full'); + exec( + 'node ./lib/scripts/join_tables --name events_full --from ' + + maxBlockNumber + + ' --to ' + + safeCurBlockNumber, + (error, stdout, stderr) => { + if (error) { + console.log(error); + return; + } + }, + ); + }, + ); + }, + ); + }, + ); + }, + ); + }, + ); +}); diff --git a/packages/pipeline/src/scrape_prices.ts b/packages/pipeline/src/scrape_prices.ts new file mode 100644 index 000000000..c26062fac --- /dev/null +++ b/packages/pipeline/src/scrape_prices.ts @@ -0,0 +1,10 @@ +import { postgresClient } from './postgres.js'; +import { dataFetchingQueries } from './scripts/query_data.js'; +import { scrapeDataScripts } from './scripts/scrape_data.js'; +import { web3, zrx } from './zrx.js'; +const CUR_BLOCK_OFFSET = 20; +postgresClient.query(dataFetchingQueries.get_most_recent_pricing_date, []).then((data: any) => { + const curMaxScrapedDate = new Date(data.rows[0].max); + const curDate = new Date(); + scrapeDataScripts.scrapeAllPricesToDB(curMaxScrapedDate.getTime(), curDate.getTime()); +}); diff --git a/packages/pipeline/src/scripts/create_tables.ts b/packages/pipeline/src/scripts/create_tables.ts new file mode 100644 index 000000000..fd0d2b78b --- /dev/null +++ b/packages/pipeline/src/scripts/create_tables.ts @@ -0,0 +1,258 @@ +import * as commandLineArgs from 'command-line-args'; + +import { postgresClient } from '../postgres'; +import { formatters } from '../utils'; +const tableQueries: any = { + events_full: `CREATE TABLE IF NOT EXISTS events_full ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + gas_used NUMERIC(78), + gas_price NUMERIC(78), + fee_recipient CHAR(42), + method_id CHAR(10), + salt VARCHAR, + block_number BIGINT, + log_index BIGINT, + taker_symbol VARCHAR, + taker_name VARCHAR, + taker_decimals BIGINT, + taker_usd_price NUMERIC(78), + taker_txn_usd_value NUMERIC(78), + maker_symbol VARCHAR, + maker_name VARCHAR, + maker_decimals BIGINT, + maker_usd_price NUMERIC(78), + maker_txn_usd_value NUMERIC(78), + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events: `CREATE TABLE IF NOT EXISTS events ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + gas_used NUMERIC(78), + gas_price NUMERIC(78), + fee_recipient CHAR(42), + method_id CHAR(10), + salt VARCHAR, + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events_staging: `CREATE TABLE IF NOT EXISTS events_staging ( + timestamp TIMESTAMP WITH TIME ZONE, + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + fee_recipient CHAR(42), + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + events_raw: `CREATE TABLE IF NOT EXISTS events_raw ( + event_type VARCHAR, + error_id VARCHAR, + order_hash CHAR(66), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + txn_hash CHAR(66), + fee_recipient CHAR(42), + block_number BIGINT, + log_index BIGINT, + PRIMARY KEY (txn_hash, order_hash, log_index) + )`, + blocks: `CREATE TABLE IF NOT EXISTS blocks ( + timestamp TIMESTAMP WITH TIME ZONE, + block_hash CHAR(66) UNIQUE, + block_number BIGINT, + PRIMARY KEY (block_hash) + )`, + transactions: `CREATE TABLE IF NOT EXISTS transactions ( + txn_hash CHAR(66) UNIQUE, + block_hash CHAR(66), + block_number BIGINT, + gas_used NUMERIC(78), + gas_price NUMERIC(78), + method_id CHAR(10), + salt VARCHAR, + PRIMARY KEY (txn_hash) + )`, + tokens: `CREATE TABLE IF NOT EXISTS tokens ( + address CHAR(42) UNIQUE, + name VARCHAR, + symbol VARCHAR, + decimals INT, + PRIMARY KEY (address) + )`, + prices: `CREATE TABLE IF NOT EXISTS prices ( + address CHAR(42) UNIQUE, + timestamp TIMESTAMP WITH TIME ZONE, + price NUMERIC(78, 18), + PRIMARY KEY (address, timestamp) + )`, + relayers: `CREATE TABLE IF NOT EXISTS relayers ( + name VARCHAR UNIQUE, + url VARCHAR DEFAULT '', + sra_http_endpoint VARCHAR DEFAULT '', + sra_ws_endpoint VARCHAR DEFAULT '', + fee_recipient_addresses CHAR(42)[] DEFAULT '{}', + taker_addresses CHAR(42)[] DEFAULT '{}', + PRIMARY KEY(name)`, + historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices ( + token VARCHAR, + base VARCHAR, + timestamp TIMESTAMP WITH TIME ZONE, + close NUMERIC(78, 18), + high NUMERIC(78, 18), + low NUMERIC(78, 18), + open NUMERIC(78, 18), + volume_from NUMERIC(78, 18), + volume_to NUMERIC(78, 18), + PRIMARY KEY (token, base, timestamp) + )`, + orders: `CREATE TABLE IF NOT EXISTS orders ( + relayer_id VARCHAR, + exchange_contract_address CHAR(42), + maker CHAR(42), + maker_amount NUMERIC(78), + maker_fee NUMERIC(78), + maker_token CHAR(42), + taker CHAR(42), + taker_amount NUMERIC(78), + taker_fee NUMERIC(78), + taker_token CHAR(42), + fee_recipient CHAR(42), + expiration_unix_timestamp_sec NUMERIC(78), + salt VARCHAR, + order_hash CHAR(66), + PRIMARY KEY (relayer_id, order_hash) + )`, +}; +function _safeQuery(query: string): any { + return new Promise((resolve, reject) => { + postgresClient + .query(query) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); +} +export const tableScripts = { + createTable(query: string): any { + return _safeQuery(query); + }, + createAllTables(): any { + for (const tableName of tableQueries) { + _safeQuery(tableQueries[tableName]); + } + }, +}; +export const insertDataScripts = { + insertSingleRow(table: string, object: any): any { + return new Promise((resolve, reject) => { + const columns = Object.keys(object); + const safeArray: any = []; + for (const key of columns) { + if (key in object) { + if (key === 'timestamp') { + safeArray.push('to_timestamp(' + object[key] + ')'); + } else if (typeof object[key] === 'string' || object[key] instanceof String) { + safeArray.push(formatters.escapeSQLParam(object[key])); + } else { + safeArray.push(object[key]); + } + } else { + safeArray.push('default'); + } + } + const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`; + console.log(queryString); + postgresClient + .query(queryString) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + insertMultipleRows(table: string, rows: any[], columns: any[]): any { + return new Promise((resolve, reject) => { + if (rows.length > 0) { + const rowsSplit = rows.map((value, index) => { + const safeArray: any = []; + for (const key of columns) { + if (key in value) { + if (key === 'timestamp') { + safeArray.push('to_timestamp(' + value[key] + ')'); + } else if (typeof value[key] === 'string' || value[key] instanceof String) { + safeArray.push(formatters.escapeSQLParam(value[key])); + } else if (value[key] instanceof Array) { + const escapedArray = value[key].map((subValue: string, subIndex: number) => { + return formatters.escapeSQLParam(subValue); + }); + safeArray.push('ARRAY[' + escapedArray.toString() + ']'); + } else { + safeArray.push(value[key]); + } + } else { + safeArray.push('default'); + } + } + return '(' + safeArray + ')'; + }); + const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`; + postgresClient + .query(queryString) + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + // console.log(err); + reject(err); + }); + } else { + resolve({}); + } + }); + }, +}; diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts new file mode 100644 index 000000000..e7c05b39a --- /dev/null +++ b/packages/pipeline/src/scripts/join_tables.ts @@ -0,0 +1,234 @@ +import * as commandLineArgs from 'command-line-args'; + +import { postgresClient } from '../postgres'; +import { formatters } from '../utils'; +const optionDefinitions = [ + { name: 'name', alias: 'n', type: String }, + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, +]; +const cli = commandLineArgs(optionDefinitions); +const dataInsertionQueries: any = { + events_staging: `INSERT INTO events_staging ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index + ) + (SELECT + b.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index + FROM + events_raw a + JOIN + blocks b + ON + a.block_number = b.block_number + AND + b.block_number >= $1 + AND + b.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events: `INSERT INTO events ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt + ) + (SELECT + a.timestamp, + a.event_type, + a.error_id, + a.order_hash, + a.maker, + a.maker_amount, + a.maker_fee, + a.maker_token, + a.taker, + a.taker_amount, + a.taker_fee, + a.taker_token, + a.txn_hash, + a.fee_recipient, + a.block_number, + a.log_index, + b.gas_used, + b.gas_price, + b.method_id, + b.salt + FROM + events_staging a + JOIN + transactions b + ON + a.txn_hash = b.txn_hash + AND + a.block_number >= $1 + AND + a.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, + events_full: ` + INSERT INTO events_full ( + timestamp, + event_type, + error_id, + order_hash, + maker, + maker_amount, + maker_fee, + maker_token, + taker, + taker_amount, + taker_fee, + taker_token, + txn_hash, + fee_recipient, + block_number, + log_index, + gas_used, + gas_price, + method_id, + salt, + taker_symbol, + taker_name, + taker_decimals, + taker_usd_price, + taker_txn_usd_value, + maker_symbol, + maker_name, + maker_decimals, + maker_usd_price, + maker_txn_usd_value + ) + (SELECT + events.timestamp, + events.event_type, + events.error_id, + events.order_hash, + events.maker, + events.maker_amount, + events.maker_fee, + events.maker_token, + events.taker, + events.taker_amount, + events.taker_fee, + events.taker_token, + events.txn_hash, + events.fee_recipient, + events.block_number, + events.log_index, + events.gas_used, + events.gas_price, + events.method_id, + events.salt, + taker_token_prices.symbol, + taker_token_prices.name, + taker_token_prices.decimals, + taker_token_prices.price, + (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price, + maker_token_prices.symbol, + maker_token_prices.name, + maker_token_prices.decimals, + maker_token_prices.price, + (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price + FROM + events + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) taker_token_prices + ON + (events.taker_token = taker_token_prices.address + AND + (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL)) + LEFT JOIN + (SELECT + tokens.address, + tokens.name, + tokens.symbol, + tokens.decimals, + prices.timestamp, + prices.price + FROM + tokens + LEFT JOIN + prices + ON + tokens.symbol = prices.symbol) maker_token_prices + ON + (events.maker_token = maker_token_prices.address + AND + (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL)) + WHERE + events.block_number >= $1 + AND + events.block_number <= $2 + ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`, +}; +if (cli.name) { + const query = dataInsertionQueries[cli.name]; + if (query && cli.from) { + const fromBlock = cli.from; + const toBlock = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(query, [fromBlock, toBlock]) + .then((data: any) => { + console.log(data); + }) + .catch((err: any) => { + console.error(err); + }); + } +} diff --git a/packages/pipeline/src/scripts/query_data.ts b/packages/pipeline/src/scripts/query_data.ts new file mode 100644 index 000000000..97e3749ea --- /dev/null +++ b/packages/pipeline/src/scripts/query_data.ts @@ -0,0 +1,87 @@ +import { formatters } from '../utils'; +export const dataFetchingQueries: any = { + get_missing_txn_hashes: ` + SELECT + a.txn_hash + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + transactions b + WHERE + b.txn_hash = a.txn_hash + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_used_block_numbers: ` + SELECT DISTINCT + a.block_number + FROM + events_raw a + WHERE NOT EXISTS + ( + SELECT + * + FROM + blocks b + WHERE + b.block_number = a.block_number + ) + AND + a.block_number >= $1 + AND + a.block_number < $2`, + get_token_registry: ` + SELECT + * + FROM + tokens`, + get_max_block: ` + SELECT + MAX(block_number) + FROM + events_raw`, + get_relayers: ` + SELECT + * + FROM + relayers`, + get_most_recent_pricing_date: ` + SELECT + MAX(DATE(timestamp)) + FROM + prices + `, + get_top_unknown_token_addresses: ` + SELECT a.token_address as address, a.txn_value / 2 as total_txn_value +FROM +(SELECT token_address, SUM(txn_value) as txn_value +FROM +(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours') + union + select a.timestamp, a.taker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL + THEN a.taker_txn_usd_value + ELSE a.maker_txn_usd_value END) as txn_value + from events_full a + where a.event_type = 'LogFill' + and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values +WHERE token_address IS NOT NULL +AND txn_value > 0 +GROUP BY 1 +ORDER BY 2 DESC) a +LEFT JOIN tokens b +ON a.token_address = b.address +WHERE symbol is NULL +ORDER BY 2 DESC +`, +}; diff --git a/packages/pipeline/src/scripts/scrape_data.ts b/packages/pipeline/src/scripts/scrape_data.ts new file mode 100644 index 000000000..963670b47 --- /dev/null +++ b/packages/pipeline/src/scripts/scrape_data.ts @@ -0,0 +1,649 @@ +import { ExchangeEvents, ZeroEx } from '0x.js'; +import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect'; +import * as Airtable from 'airtable'; +import * as commandLineArgs from 'command-line-args'; +import * as _ from 'lodash'; +import * as querystring from 'querystring'; +import * as queue from 'queue'; +import * as request from 'request'; +import * as rpn from 'request-promise-native'; + +import { HttpRequestOptions } from '../../../connect/lib/src/types.js'; +import { relayer } from '../models/relayer.js'; +import { token } from '../models/tokens.js'; +import { postgresClient } from '../postgres.js'; +import { typeConverters } from '../utils.js'; +import { web3, zrx } from '../zrx.js'; + +import { insertDataScripts } from './create_tables.js'; +import { dataFetchingQueries } from './query_data.js'; +const optionDefinitions = [ + { name: 'from', alias: 'f', type: Number }, + { name: 'to', alias: 't', type: Number }, + { name: 'type', type: String }, + { name: 'id', type: String }, + { name: 'force', type: Boolean }, + { name: 'token', type: String }, +]; +const cli = commandLineArgs(optionDefinitions); +const q = queue({ concurrency: 6, autostart: true }); +const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE); +const BLOCK_INCREMENTS = 1000; +const BASE_SYMBOL = 'USD'; // use USD as base currency against +const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days +const SECONDS_PER_DAY = 86400; +const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical'; +const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json'; +const METAMASK_ETH_CONTRACT_METADATA_JSON = + 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json'; +const ETHPLORER_BASE_URL = 'http://api.ethplorer.io'; +const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`; +// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday'; +const AIRTABLE_RELAYER_INFO = 'Relayer Info'; +export const pullDataScripts = { + getAllEvents(fromBlockNumber: number, toBlockNumber: number): any { + return new Promise((resolve, reject) => { + const getLogsPromises: any[] = []; + getLogsPromises.push( + zrx.exchange.getLogsAsync( + ExchangeEvents.LogFill, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogCancel, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + zrx.exchange.getLogsAsync( + ExchangeEvents.LogError, + { fromBlock: fromBlockNumber, toBlock: toBlockNumber }, + {}, + ), + ); + Promise.all(getLogsPromises) + .then((data: any[]) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getBlockInfo(blockNumber: number): any { + return new Promise((resolve, reject) => { + web3.eth.getBlock(blockNumber, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTransactionInfo(transactionHash: string): any { + return new Promise((resolve, reject) => { + web3.eth.getTransaction(transactionHash, (err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + }, + getTokenRegistry(): any { + return new Promise((resolve, reject) => { + zrx.tokenRegistry + .getTokensAsync() + .then((data: any) => { + resolve(data); + }) + .catch((err: any) => { + reject(err); + }); + }); + }, + getMetaMaskTokens(): any { + return new Promise((resolve, reject) => { + request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerTopTokens(): any { + return new Promise((resolve, reject) => { + request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + getEthplorerToken(tokenAddress: string): any { + return new Promise((resolve, reject) => { + const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`; + request(url, (error, response, body) => { + if (error) { + reject(error); + } else { + try { + const json = JSON.parse(body); + resolve(json); + } catch (err) { + resolve({ error: 'error' }); + } + } + }); + }); + }, + getPriceData(symbol: string, timestamp: number, timeDelay?: number): any { + return new Promise((resolve, reject) => { + if (symbol === 'WETH') { + symbol = 'ETH'; + } + let parsedParams = querystring.stringify({ + fsym: symbol, + tsyms: 'USD', + ts: timestamp / 1000, + }); + console.debug(parsedParams); + setTimeout(() => { + request(PRICE_API_ENDPOINT + '?' + parsedParams, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }, timeDelay); + }); + }, + getRelayers(): any { + return new Promise((resolve, reject) => { + request(RELAYER_REGISTRY_JSON, (error, response, body) => { + if (error) { + reject(error); + } else { + resolve(JSON.parse(body)); + } + }); + }); + }, + async getOrderBook(sraEndpoint: string): Promise { + const relayerClient = new HttpClient(sraEndpoint); + const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync(); + const fullOrderBook: OrderbookResponse[] = []; + for (const tokenPair of tokenResponse) { + const orderBookRequest: OrderbookRequest = { + baseTokenAddress: tokenPair.tokenA.address, + quoteTokenAddress: tokenPair.tokenB.address, + }; + const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest); + fullOrderBook.push(orderBook); + } + return fullOrderBook; + }, + // async getHistoricalPrices( + // fromSymbol: string, + // toSymbol: string, + // fromTimestamp: number, + // toTimestamp: number, + // ): Promise { + // const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY)); + // if(fromSymbol === 'WETH') { + // fromSymbol = 'ETH'; + // } + // var parsedParams = { + // fsym: fromSymbol, + // tsym: toSymbol, + // limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT), + // toTs: toTimestamp, + // }; + // var options = { + // uri: HIST_PRICE_API_ENDPOINT, + // qs: parsedParams, + // json: false, + // }; + // try { + // const response = await rpn(options); + // return Promise.resolve(JSON.parse(response)); + // } catch (error) { + // console.debug(error); + // return Promise.reject(error); + // } + // }, +}; +export const scrapeDataScripts = { + scrapeAllPricesToDB(fromTime: number, toTime: number) { + const fromDate = new Date(fromTime); + fromDate.setUTCHours(0); + fromDate.setUTCMinutes(0); + fromDate.setUTCSeconds(0); + fromDate.setUTCMilliseconds(0); + const toDate = new Date(toTime); + postgresClient + .query(dataFetchingQueries.get_token_registry, []) + .then((result: any) => { + for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { + for (const token of Object.values(result.rows)) { + console.debug('Scraping ' + curDate + ' ' + token); + q.push(_scrapePriceToDB(curDate.getTime(), token, 500)); + } + } + }) + .catch((err: any) => { + console.debug(err); + }); + }, +}; +function _scrapeEventsToDB(fromBlock: number, toBlock: number): any { + return (cb: () => void) => { + pullDataScripts + .getAllEvents(fromBlock, toBlock) + .then((data: any) => { + const parsedEvents: any = {}; + parsedEvents[ExchangeEvents.LogFill] = []; + parsedEvents[ExchangeEvents.LogCancel] = []; + parsedEvents[ExchangeEvents.LogError] = []; + for (const index in data) { + for (const datum of data[index]) { + const event = typeConverters.convertLogEventToEventObject(datum); + parsedEvents[event.event_type].push(event); + } + } + console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length); + for (const event_type in parsedEvents) { + if (parsedEvents[event_type].length > 0) { + insertDataScripts + .insertMultipleRows( + 'events_raw', + parsedEvents[event_type], + Object.keys(parsedEvents[event_type][0]), + ) + .then(() => {}) + .catch((error: any) => {}); + } + } + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeBlockToDB(block: number): any { + return (cb: () => void) => { + pullDataScripts + .getBlockInfo(block) + .then((data: any) => { + const parsedBlock = typeConverters.convertLogBlockToBlockObject(data); + insertDataScripts + .insertSingleRow('blocks', parsedBlock) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +// function _scrapeAllRelayersToDB(): any { +// return (cb: () => void) => { +// airtableBase(AIRTABLE_RELAYER_INFO) +// .select() +// .eachPage((records: any, fetchNextPage: () => void) => { +// const parsedRelayers: any[] = []; +// for(const record of records) { +// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record)); +// } +// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0])) +// .then((result: any) => { +// cb(); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }) +// .catch((err: any) => { +// cb(); +// }); +// }; +// } +function _scrapeAllRelayersToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getRelayers() + .then((relayers: any[]) => { + console.log(relayers); + const parsedRelayers: any[] = []; + for (const relayer of relayers) { + parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer)); + } + console.log(parsedRelayers); + insertDataScripts + .insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties)) + .then((result: any) => { + console.log(result); + cb(); + }) + .catch((err: any) => { + console.log(err); + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTransactionToDB(transactionHash: string): any { + return (cb: () => void) => { + pullDataScripts + .getTransactionInfo(transactionHash) + .then((data: any) => { + const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data); + insertDataScripts + .insertSingleRow('transactions', parsedTransaction) + .then((result: any) => { + cb(); + }) + .catch((err: any) => { + cb(); + }); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeTokenRegistryToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getTokenRegistry() + .then((data: any) => { + const parsedTokens: any = []; + for (const token of data) { + parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeMetaMaskEthContractMetadataToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getMetaMaskTokens() + .then((data: any) => { + const parsedTokens: any = []; + const dataArray = _.map(_.keys(data), (tokenAddress: string) => { + const value = _.get(data, tokenAddress); + return { + address: tokenAddress, + ...value, + }; + }); + const erc20TokensOnly = _.filter(dataArray, entry => { + const isErc20 = _.get(entry, 'erc20'); + return isErc20; + }); + for (const token of erc20TokensOnly) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeEthplorerTopTokensToDB(): any { + return (cb: () => void) => { + pullDataScripts + .getEthplorerTopTokens() + .then((data: any) => { + const parsedTokens: any = []; + const tokens = _.get(data, 'tokens'); + for (const token of tokens) { + parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token)); + } + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapeUnknownTokenInformationToDB(): any { + return (cb: () => void) => { + postgresClient + .query(dataFetchingQueries.get_top_unknown_token_addresses) + .then(async (result: any) => { + const addresses = _.map(result.rows, row => _.get(row, 'address')); + const responses = await Promise.all( + _.map(addresses, address => pullDataScripts.getEthplorerToken(address)), + ); + const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error'))); + const parsedTokens = _.map(tokens, tokenInfo => + typeConverters.convertEthplorerTokenToTokenObject(tokenInfo), + ); + insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0])); + cb(); + }) + .catch((err: any) => { + cb(); + }); + }; +} +function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any { + return (cb: () => void) => { + pullDataScripts + .getPriceData(token.symbol, timestamp, timeDelay) + .then((data: any) => { + const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol; + const parsedPrice = { + timestamp: timestamp / 1000, + symbol: token.symbol, + base: 'USD', + price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0, + }; + console.debug('Inserting ' + timestamp); + console.debug(parsedPrice); + insertDataScripts.insertSingleRow('prices', parsedPrice); + cb(); + }) + .catch((err: any) => { + console.debug(err); + cb(); + }); + }; +} +// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any { +// return (cb: () => void) => { +// pullDataScripts +// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp) +// .then((data: any) => { +// const parsedHistoricalPrices: any = []; +// for (const historicalPrice of data['Data']) { +// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice); +// parsedHistoricalPrice['token'] = token; +// parsedHistoricalPrice['base'] = BASE_SYMBOL; +// parsedHistoricalPrices.push(parsedHistoricalPrice); +// } +// if (parsedHistoricalPrices.length > 0) { +// insertDataScripts +// .insertMultipleRows( +// 'historical_prices', +// parsedHistoricalPrices, +// Object.keys(parsedHistoricalPrices[0]), +// ) +// .catch((error: any) => { +// console.error(error); +// }); +// } +// cb(); +// }) +// .catch((error: any) => { +// console.error(error); +// cb(); +// }); +// }; +// } +function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any { + return (cb: () => void) => { + pullDataScripts + .getOrderBook(sraEndpoint) + .then((data: any) => { + for (const book of data) { + for (const order of book.bids) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + for (const order of book.asks) { + console.debug(order); + const parsedOrder = typeConverters.convertLogOrderToOrderObject(order); + parsedOrder.relayer_id = id; + parsedOrder.order_hash = ZeroEx.getOrderHashHex(order); + insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => { + console.error(error); + }); + } + } + cb(); + }) + .catch((error: any) => { + console.error(error); + cb(); + }); + }; +} +if (cli.type === 'events') { + if (cli.from && cli.to) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + let curToBlock = curFromBlock; + do { + curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS; + q.push(_scrapeEventsToDB(curFromBlock, curToBlock)); + curFromBlock = curToBlock + 1; + } while (curToBlock < destToBlock); + } +} else if (cli.type === 'blocks') { + if (cli.from && cli.to) { + if (cli.force) { + const destToBlock = cli.to ? cli.to : cli.from; + let curFromBlock = cli.from; + const curToBlock = curFromBlock; + for (; curFromBlock < destToBlock; curFromBlock++) { + q.push(_scrapeBlockToDB(curFromBlock)); + } + } else { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeBlockToDB(row.block_number)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } + } +} else if (cli.type === 'transactions') { + if (cli.id) { + q.push(_scrapeTransactionToDB(cli.id)); + } else if (cli.from) { + const fetchFrom = cli.from; + const fetchTo = cli.to ? cli.to : cli.from + 1; + postgresClient + .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo]) + .then((data: any) => { + for (const row of data.rows) { + q.push(_scrapeTransactionToDB(row.txn_hash)); + } + }) + .catch((err: any) => { + // console.debug(err); + }); + } +} else if (cli.type === 'tokens') { + q.push(_scrapeMetaMaskEthContractMetadataToDB()); + q.push(_scrapeEthplorerTopTokensToDB()); +} else if (cli.type === 'unknown_tokens') { + q.push(_scrapeUnknownTokenInformationToDB()); +} else if (cli.type === 'prices' && cli.from && cli.to) { + const fromDate = new Date(cli.from); + console.debug(fromDate); + fromDate.setUTCHours(0); + fromDate.setUTCMinutes(0); + fromDate.setUTCSeconds(0); + fromDate.setUTCMilliseconds(0); + console.debug(fromDate); + const toDate = new Date(cli.to); + postgresClient + .query(dataFetchingQueries.get_token_registry, []) + .then((result: any) => { + for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) { + for (const token of Object.values(result.rows)) { + console.debug('Scraping ' + curDate + ' ' + token); + q.push(_scrapePriceToDB(curDate.getTime(), token)); + } + } + }) + .catch((err: any) => { + console.debug(err); + }); + // } else if (cli.type === 'historical_prices') { + // if (cli.token && cli.from && cli.to) { + // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to)); + // } + // } else if (cli.type === 'all_historical_prices') { + // if (cli.from && cli.to) { + // postgresClient + // .query(dataFetchingQueries.get_token_registry, []) + // .then((result: any) => { + // const curTokens: any = result.rows.map((a: any): any => a.symbol); + // for (const curToken of curTokens) { + // console.debug('Historical data backfill: Pushing coin ' + curToken); + // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to)); + // } + // }) + // .catch((err: any) => { + // console.debug(err); + // }); + // } +} else if (cli.type === 'relayers') { + q.push(_scrapeAllRelayersToDB()); +} else if (cli.type === 'orders') { + postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => { + for (const relayer of result.rows) { + if (relayer.sra_http_url) { + q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url)); + } + } + }); +} diff --git a/packages/pipeline/src/utils.ts b/packages/pipeline/src/utils.ts new file mode 100644 index 000000000..3474cd16b --- /dev/null +++ b/packages/pipeline/src/utils.ts @@ -0,0 +1,181 @@ +import * as _ from 'lodash'; + +import { block, logToBlockSchemaMapping } from './models/block'; +import { event, logToEventSchemaMapping } from './models/event'; +import { historicalPrices, logToHistoricalPricesSchema } from './models/historical_prices'; +import { logToOrderSchemaMapping, order } from './models/order'; +import { logToRelayerSchemaMapping } from './models/relayer'; +import { logToTokenSchemaMapping, token } from './models/tokens'; +import { logToTransactionSchemaMapping, transaction } from './models/transaction'; +export const typeConverters = { + convertLogEventToEventObject(log: any): any { + const newEvent: any = {}; + for (const key in logToEventSchemaMapping) { + if (_.has(log, key)) { + newEvent[logToEventSchemaMapping[key]] = _.get(log, key); + if (newEvent[logToEventSchemaMapping[key]].constructor.name === 'BigNumber') { + newEvent[logToEventSchemaMapping[key]] = newEvent[logToEventSchemaMapping[key]].toString(); + } + } + } + return newEvent; + }, + convertLogBlockToBlockObject(logBlock: any): any { + const newBlock: any = {}; + for (const key in logToBlockSchemaMapping) { + if (_.has(logBlock, key)) { + newBlock[logToBlockSchemaMapping[key]] = _.get(logBlock, key); + if (newBlock[logToBlockSchemaMapping[key]].constructor.name === 'BigNumber') { + newBlock[logToBlockSchemaMapping[key]] = newBlock[logToBlockSchemaMapping[key]].toString(); + } + } + } + return newBlock; + }, + convertLogTokenToTokenObject(logToken: any): any { + const newToken: any = {}; + for (const key in logToTokenSchemaMapping) { + if (_.has(logToken, key)) { + newToken[logToTokenSchemaMapping[key]] = _.get(logToken, key); + if (newToken[logToTokenSchemaMapping[key]].constructor.name === 'BigNumber') { + newToken[logToTokenSchemaMapping[key]] = newToken[logToTokenSchemaMapping[key]].toString(); + } + } + } + newToken[logToTokenSchemaMapping.address] = newToken[logToTokenSchemaMapping.address].toLowerCase(); + return newToken; + }, + convertMetaMaskTokenToTokenObject(metaMaskToken: any): any { + const newToken: any = {}; + for (const key in logToTokenSchemaMapping) { + if (_.has(metaMaskToken, key)) { + newToken[logToTokenSchemaMapping[key]] = _.get(metaMaskToken, key); + } + } + newToken[logToTokenSchemaMapping.address] = newToken[logToTokenSchemaMapping.address].toLowerCase(); + console.log(newToken); + return newToken; + }, + convertEthplorerTokenToTokenObject(ethplorerToken: any): any { + const newToken: any = {}; + for (const key in logToTokenSchemaMapping) { + if (_.has(ethplorerToken, key)) { + newToken[logToTokenSchemaMapping[key]] = _.get(ethplorerToken, key); + } + } + newToken[logToTokenSchemaMapping.address] = newToken[logToTokenSchemaMapping.address].toLowerCase(); + return newToken; + }, + convertLogTransactionToTransactionObject(logTransaction: any): any { + const newTransaction: any = {}; + for (const key in logToTransactionSchemaMapping) { + if (_.has(logTransaction, key)) { + newTransaction[logToTransactionSchemaMapping[key]] = _.get(logTransaction, key); + if (newTransaction[logToTransactionSchemaMapping[key]].constructor.name === 'BigNumber') { + newTransaction[logToTransactionSchemaMapping[key]] = newTransaction[ + logToTransactionSchemaMapping[key] + ].toString(); + } + } else { + if (key === 'method_id') { + newTransaction[logToTransactionSchemaMapping[key]] = logTransaction.input.substring(0, 10); + } else if (key === 'salt') { + newTransaction[logToTransactionSchemaMapping[key]] = + '0x' + logTransaction.input.substring(714, 778); // Only God can judge me + } + } + } + return newTransaction; + }, + // convertRelayerToRelayerObject(logRelayer: any): any { + // const newRelayer: any = {}; + // for (const key in logToRelayerSchemaMapping) { + // if (_.has(logRelayer, key)) { + // newRelayer[logToRelayerSchemaMapping[key]] = _.get(logRelayer, key); + // if (newRelayer[logToRelayerSchemaMapping[key]].constructor.name === 'BigNumber') { + // newRelayer[logToRelayerSchemaMapping[key]] = newRelayer[logToRelayerSchemaMapping[key]].toString(); + // } + // } else if((logToRelayerSchemaMapping[key] === 'known_fee_addresses' || logToRelayerSchemaMapping[key] === 'known_taker_addresses')) { + // newRelayer[logToRelayerSchemaMapping[key]] = '{}'; + // } else { + // newRelayer[logToRelayerSchemaMapping[key]] = ''; + // } + // } + // return newRelayer; + // }, + convertRelayerToRelayerObject(logRelayer: any): any { + const newRelayer: any = {}; + for (const key in logToRelayerSchemaMapping) { + if (_.has(logRelayer, key)) { + newRelayer[logToRelayerSchemaMapping[key]] = _.get(logRelayer, key); + if (newRelayer[logToRelayerSchemaMapping[key]].constructor.name === 'BigNumber') { + newRelayer[logToRelayerSchemaMapping[key]] = newRelayer[logToRelayerSchemaMapping[key]].toString(); + } + } else if ( + logToRelayerSchemaMapping[key] === 'known_fee_addresses' || + logToRelayerSchemaMapping[key] === 'known_taker_addresses' + ) { + newRelayer[logToRelayerSchemaMapping[key]] = '{}'; + } else { + newRelayer[logToRelayerSchemaMapping[key]] = ''; + } + } + if (_.has(logRelayer, 'networks')) { + for (const network of logRelayer.networks) { + if (network.networkId === 1) { + if (_.has(network, 'sra_http_endpoint')) { + newRelayer.sra_http_endpoint = network.sra_http_endpoint; + } + if (_.has(network, 'sra_ws_endpoint')) { + newRelayer.sra_ws_endpoint = network.sra_ws_endpoint; + } + if (_.has(network, 'static_order_fields')) { + if (_.has(network, 'static_order_fields.fee_recipient_addresses')) { + newRelayer.fee_recipient_addresses = network.static_order_fields.fee_recipient_addresses; + } + if (_.has(network, 'static_order_fields.taker_addresses')) { + newRelayer.taker_addresses = network.static_order_fields.taker_addresses; + } + } + } + } + } + return newRelayer; + }, + convertLogHistoricalPricesToHistoricalPricesObject(logHistoricalPrice: any): any { + const newHistoricalPrices: any = {}; + for (const key in logToHistoricalPricesSchema) { + if (_.has(logHistoricalPrice, key)) { + newHistoricalPrices[logToHistoricalPricesSchema[key]] = _.get(logHistoricalPrice, key); + } + } + return newHistoricalPrices; + }, + convertLogOrderToOrderObject(logOrder: any): any { + const newOrder: any = {}; + for (const key in logToOrderSchemaMapping) { + if (_.has(logOrder, key)) { + console.log(key); + console.log(logOrder[key]); + newOrder[logToOrderSchemaMapping[key]] = _.get(logOrder, key); + if (newOrder[logToOrderSchemaMapping[key]].constructor.name === 'BigNumber') { + newOrder[logToOrderSchemaMapping[key]] = newOrder[logToOrderSchemaMapping[key]].toString(); + } + } + } + console.log(newOrder); + return newOrder; + }, +}; +export const formatters = { + escapeSQLParams(params: any[]): string { + let escapedString = ''; + for (const i in params) { + escapedString += "'" + params[i] + "',"; + } + return escapedString.slice(0, -1); + }, + escapeSQLParam(param: string): string { + return "'" + param + "'"; + }, +}; diff --git a/packages/pipeline/src/zrx.ts b/packages/pipeline/src/zrx.ts new file mode 100644 index 000000000..cbe10a55a --- /dev/null +++ b/packages/pipeline/src/zrx.ts @@ -0,0 +1,11 @@ +import { ExchangeEvents, ZeroEx } from '0x.js'; +import * as dotenv from 'dotenv'; +import * as Web3 from 'web3'; +dotenv.config(); +const provider = new Web3.providers.HttpProvider(process.env.WEB3_PROVIDER_URL); +const web3 = new Web3(provider); +const MAINNET = 1; +const zrx = new ZeroEx(provider, { + networkId: MAINNET, +}); +export { web3, zrx }; -- cgit v1.2.3