aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src/scripts
diff options
context:
space:
mode:
authorAlex Browne <stephenalexbrowne@gmail.com>2018-09-18 02:27:38 +0800
committerFred Carlsen <fred@sjelfull.no>2018-12-06 19:01:16 +0800
commitdda44500c52a4b72f6a27fb2384a3f43100a1b26 (patch)
tree49de937e27a2e0bfbc925493c881f411366df513 /packages/pipeline/src/scripts
parent7b81bd831fb4b43380ba4bc256d154ab0b5f8fca (diff)
downloaddexon-sol-tools-dda44500c52a4b72f6a27fb2384a3f43100a1b26.tar
dexon-sol-tools-dda44500c52a4b72f6a27fb2384a3f43100a1b26.tar.gz
dexon-sol-tools-dda44500c52a4b72f6a27fb2384a3f43100a1b26.tar.bz2
dexon-sol-tools-dda44500c52a4b72f6a27fb2384a3f43100a1b26.tar.lz
dexon-sol-tools-dda44500c52a4b72f6a27fb2384a3f43100a1b26.tar.xz
dexon-sol-tools-dda44500c52a4b72f6a27fb2384a3f43100a1b26.tar.zst
dexon-sol-tools-dda44500c52a4b72f6a27fb2384a3f43100a1b26.zip
Rebase pipeline branch off development
Diffstat (limited to 'packages/pipeline/src/scripts')
-rw-r--r--packages/pipeline/src/scripts/create_tables.ts258
-rw-r--r--packages/pipeline/src/scripts/join_tables.ts234
-rw-r--r--packages/pipeline/src/scripts/query_data.ts87
-rw-r--r--packages/pipeline/src/scripts/scrape_data.ts649
4 files changed, 1228 insertions, 0 deletions
diff --git a/packages/pipeline/src/scripts/create_tables.ts b/packages/pipeline/src/scripts/create_tables.ts
new file mode 100644
index 000000000..fd0d2b78b
--- /dev/null
+++ b/packages/pipeline/src/scripts/create_tables.ts
@@ -0,0 +1,258 @@
+import * as commandLineArgs from 'command-line-args';
+
+import { postgresClient } from '../postgres';
+import { formatters } from '../utils';
+const tableQueries: any = {
+ events_full: `CREATE TABLE IF NOT EXISTS events_full (
+ timestamp TIMESTAMP WITH TIME ZONE,
+ event_type VARCHAR,
+ error_id VARCHAR,
+ order_hash CHAR(66),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ txn_hash CHAR(66),
+ gas_used NUMERIC(78),
+ gas_price NUMERIC(78),
+ fee_recipient CHAR(42),
+ method_id CHAR(10),
+ salt VARCHAR,
+ block_number BIGINT,
+ log_index BIGINT,
+ taker_symbol VARCHAR,
+ taker_name VARCHAR,
+ taker_decimals BIGINT,
+ taker_usd_price NUMERIC(78),
+ taker_txn_usd_value NUMERIC(78),
+ maker_symbol VARCHAR,
+ maker_name VARCHAR,
+ maker_decimals BIGINT,
+ maker_usd_price NUMERIC(78),
+ maker_txn_usd_value NUMERIC(78),
+ PRIMARY KEY (txn_hash, order_hash, log_index)
+ )`,
+ events: `CREATE TABLE IF NOT EXISTS events (
+ timestamp TIMESTAMP WITH TIME ZONE,
+ event_type VARCHAR,
+ error_id VARCHAR,
+ order_hash CHAR(66),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ txn_hash CHAR(66),
+ gas_used NUMERIC(78),
+ gas_price NUMERIC(78),
+ fee_recipient CHAR(42),
+ method_id CHAR(10),
+ salt VARCHAR,
+ block_number BIGINT,
+ log_index BIGINT,
+ PRIMARY KEY (txn_hash, order_hash, log_index)
+ )`,
+ events_staging: `CREATE TABLE IF NOT EXISTS events_staging (
+ timestamp TIMESTAMP WITH TIME ZONE,
+ event_type VARCHAR,
+ error_id VARCHAR,
+ order_hash CHAR(66),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ txn_hash CHAR(66),
+ fee_recipient CHAR(42),
+ block_number BIGINT,
+ log_index BIGINT,
+ PRIMARY KEY (txn_hash, order_hash, log_index)
+ )`,
+ events_raw: `CREATE TABLE IF NOT EXISTS events_raw (
+ event_type VARCHAR,
+ error_id VARCHAR,
+ order_hash CHAR(66),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ txn_hash CHAR(66),
+ fee_recipient CHAR(42),
+ block_number BIGINT,
+ log_index BIGINT,
+ PRIMARY KEY (txn_hash, order_hash, log_index)
+ )`,
+ blocks: `CREATE TABLE IF NOT EXISTS blocks (
+ timestamp TIMESTAMP WITH TIME ZONE,
+ block_hash CHAR(66) UNIQUE,
+ block_number BIGINT,
+ PRIMARY KEY (block_hash)
+ )`,
+ transactions: `CREATE TABLE IF NOT EXISTS transactions (
+ txn_hash CHAR(66) UNIQUE,
+ block_hash CHAR(66),
+ block_number BIGINT,
+ gas_used NUMERIC(78),
+ gas_price NUMERIC(78),
+ method_id CHAR(10),
+ salt VARCHAR,
+ PRIMARY KEY (txn_hash)
+ )`,
+ tokens: `CREATE TABLE IF NOT EXISTS tokens (
+ address CHAR(42) UNIQUE,
+ name VARCHAR,
+ symbol VARCHAR,
+ decimals INT,
+ PRIMARY KEY (address)
+ )`,
+ prices: `CREATE TABLE IF NOT EXISTS prices (
+ address CHAR(42) UNIQUE,
+ timestamp TIMESTAMP WITH TIME ZONE,
+ price NUMERIC(78, 18),
+ PRIMARY KEY (address, timestamp)
+ )`,
+ relayers: `CREATE TABLE IF NOT EXISTS relayers (
+ name VARCHAR UNIQUE,
+ url VARCHAR DEFAULT '',
+ sra_http_endpoint VARCHAR DEFAULT '',
+ sra_ws_endpoint VARCHAR DEFAULT '',
+ fee_recipient_addresses CHAR(42)[] DEFAULT '{}',
+ taker_addresses CHAR(42)[] DEFAULT '{}',
+ PRIMARY KEY(name)`,
+ historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices (
+ token VARCHAR,
+ base VARCHAR,
+ timestamp TIMESTAMP WITH TIME ZONE,
+ close NUMERIC(78, 18),
+ high NUMERIC(78, 18),
+ low NUMERIC(78, 18),
+ open NUMERIC(78, 18),
+ volume_from NUMERIC(78, 18),
+ volume_to NUMERIC(78, 18),
+ PRIMARY KEY (token, base, timestamp)
+ )`,
+ orders: `CREATE TABLE IF NOT EXISTS orders (
+ relayer_id VARCHAR,
+ exchange_contract_address CHAR(42),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ fee_recipient CHAR(42),
+ expiration_unix_timestamp_sec NUMERIC(78),
+ salt VARCHAR,
+ order_hash CHAR(66),
+ PRIMARY KEY (relayer_id, order_hash)
+ )`,
+};
+function _safeQuery(query: string): any {
+ return new Promise((resolve, reject) => {
+ postgresClient
+ .query(query)
+ .then((data: any) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ reject(err);
+ });
+ });
+}
+export const tableScripts = {
+ createTable(query: string): any {
+ return _safeQuery(query);
+ },
+ createAllTables(): any {
+ for (const tableName of tableQueries) {
+ _safeQuery(tableQueries[tableName]);
+ }
+ },
+};
+export const insertDataScripts = {
+ insertSingleRow(table: string, object: any): any {
+ return new Promise((resolve, reject) => {
+ const columns = Object.keys(object);
+ const safeArray: any = [];
+ for (const key of columns) {
+ if (key in object) {
+ if (key === 'timestamp') {
+ safeArray.push('to_timestamp(' + object[key] + ')');
+ } else if (typeof object[key] === 'string' || object[key] instanceof String) {
+ safeArray.push(formatters.escapeSQLParam(object[key]));
+ } else {
+ safeArray.push(object[key]);
+ }
+ } else {
+ safeArray.push('default');
+ }
+ }
+ const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`;
+ console.log(queryString);
+ postgresClient
+ .query(queryString)
+ .then((data: any) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ reject(err);
+ });
+ });
+ },
+ insertMultipleRows(table: string, rows: any[], columns: any[]): any {
+ return new Promise((resolve, reject) => {
+ if (rows.length > 0) {
+ const rowsSplit = rows.map((value, index) => {
+ const safeArray: any = [];
+ for (const key of columns) {
+ if (key in value) {
+ if (key === 'timestamp') {
+ safeArray.push('to_timestamp(' + value[key] + ')');
+ } else if (typeof value[key] === 'string' || value[key] instanceof String) {
+ safeArray.push(formatters.escapeSQLParam(value[key]));
+ } else if (value[key] instanceof Array) {
+ const escapedArray = value[key].map((subValue: string, subIndex: number) => {
+ return formatters.escapeSQLParam(subValue);
+ });
+ safeArray.push('ARRAY[' + escapedArray.toString() + ']');
+ } else {
+ safeArray.push(value[key]);
+ }
+ } else {
+ safeArray.push('default');
+ }
+ }
+ return '(' + safeArray + ')';
+ });
+ const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`;
+ postgresClient
+ .query(queryString)
+ .then((data: any) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ // console.log(err);
+ reject(err);
+ });
+ } else {
+ resolve({});
+ }
+ });
+ },
+};
diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts
new file mode 100644
index 000000000..e7c05b39a
--- /dev/null
+++ b/packages/pipeline/src/scripts/join_tables.ts
@@ -0,0 +1,234 @@
+import * as commandLineArgs from 'command-line-args';
+
+import { postgresClient } from '../postgres';
+import { formatters } from '../utils';
+const optionDefinitions = [
+ { name: 'name', alias: 'n', type: String },
+ { name: 'from', alias: 'f', type: Number },
+ { name: 'to', alias: 't', type: Number },
+];
+const cli = commandLineArgs(optionDefinitions);
+const dataInsertionQueries: any = {
+ events_staging: `INSERT INTO events_staging (
+ timestamp,
+ event_type,
+ error_id,
+ order_hash,
+ maker,
+ maker_amount,
+ maker_fee,
+ maker_token,
+ taker,
+ taker_amount,
+ taker_fee,
+ taker_token,
+ txn_hash,
+ fee_recipient,
+ block_number,
+ log_index
+ )
+ (SELECT
+ b.timestamp,
+ a.event_type,
+ a.error_id,
+ a.order_hash,
+ a.maker,
+ a.maker_amount,
+ a.maker_fee,
+ a.maker_token,
+ a.taker,
+ a.taker_amount,
+ a.taker_fee,
+ a.taker_token,
+ a.txn_hash,
+ a.fee_recipient,
+ a.block_number,
+ a.log_index
+ FROM
+ events_raw a
+ JOIN
+ blocks b
+ ON
+ a.block_number = b.block_number
+ AND
+ b.block_number >= $1
+ AND
+ b.block_number <= $2
+ ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
+ events: `INSERT INTO events (
+ timestamp,
+ event_type,
+ error_id,
+ order_hash,
+ maker,
+ maker_amount,
+ maker_fee,
+ maker_token,
+ taker,
+ taker_amount,
+ taker_fee,
+ taker_token,
+ txn_hash,
+ fee_recipient,
+ block_number,
+ log_index,
+ gas_used,
+ gas_price,
+ method_id,
+ salt
+ )
+ (SELECT
+ a.timestamp,
+ a.event_type,
+ a.error_id,
+ a.order_hash,
+ a.maker,
+ a.maker_amount,
+ a.maker_fee,
+ a.maker_token,
+ a.taker,
+ a.taker_amount,
+ a.taker_fee,
+ a.taker_token,
+ a.txn_hash,
+ a.fee_recipient,
+ a.block_number,
+ a.log_index,
+ b.gas_used,
+ b.gas_price,
+ b.method_id,
+ b.salt
+ FROM
+ events_staging a
+ JOIN
+ transactions b
+ ON
+ a.txn_hash = b.txn_hash
+ AND
+ a.block_number >= $1
+ AND
+ a.block_number <= $2
+ ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
+ events_full: `
+ INSERT INTO events_full (
+ timestamp,
+ event_type,
+ error_id,
+ order_hash,
+ maker,
+ maker_amount,
+ maker_fee,
+ maker_token,
+ taker,
+ taker_amount,
+ taker_fee,
+ taker_token,
+ txn_hash,
+ fee_recipient,
+ block_number,
+ log_index,
+ gas_used,
+ gas_price,
+ method_id,
+ salt,
+ taker_symbol,
+ taker_name,
+ taker_decimals,
+ taker_usd_price,
+ taker_txn_usd_value,
+ maker_symbol,
+ maker_name,
+ maker_decimals,
+ maker_usd_price,
+ maker_txn_usd_value
+ )
+ (SELECT
+ events.timestamp,
+ events.event_type,
+ events.error_id,
+ events.order_hash,
+ events.maker,
+ events.maker_amount,
+ events.maker_fee,
+ events.maker_token,
+ events.taker,
+ events.taker_amount,
+ events.taker_fee,
+ events.taker_token,
+ events.txn_hash,
+ events.fee_recipient,
+ events.block_number,
+ events.log_index,
+ events.gas_used,
+ events.gas_price,
+ events.method_id,
+ events.salt,
+ taker_token_prices.symbol,
+ taker_token_prices.name,
+ taker_token_prices.decimals,
+ taker_token_prices.price,
+ (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price,
+ maker_token_prices.symbol,
+ maker_token_prices.name,
+ maker_token_prices.decimals,
+ maker_token_prices.price,
+ (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price
+ FROM
+ events
+ LEFT JOIN
+ (SELECT
+ tokens.address,
+ tokens.name,
+ tokens.symbol,
+ tokens.decimals,
+ prices.timestamp,
+ prices.price
+ FROM
+ tokens
+ LEFT JOIN
+ prices
+ ON
+ tokens.symbol = prices.symbol) taker_token_prices
+ ON
+ (events.taker_token = taker_token_prices.address
+ AND
+ (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL))
+ LEFT JOIN
+ (SELECT
+ tokens.address,
+ tokens.name,
+ tokens.symbol,
+ tokens.decimals,
+ prices.timestamp,
+ prices.price
+ FROM
+ tokens
+ LEFT JOIN
+ prices
+ ON
+ tokens.symbol = prices.symbol) maker_token_prices
+ ON
+ (events.maker_token = maker_token_prices.address
+ AND
+ (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL))
+ WHERE
+ events.block_number >= $1
+ AND
+ events.block_number <= $2
+ ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
+};
+if (cli.name) {
+ const query = dataInsertionQueries[cli.name];
+ if (query && cli.from) {
+ const fromBlock = cli.from;
+ const toBlock = cli.to ? cli.to : cli.from + 1;
+ postgresClient
+ .query(query, [fromBlock, toBlock])
+ .then((data: any) => {
+ console.log(data);
+ })
+ .catch((err: any) => {
+ console.error(err);
+ });
+ }
+}
diff --git a/packages/pipeline/src/scripts/query_data.ts b/packages/pipeline/src/scripts/query_data.ts
new file mode 100644
index 000000000..97e3749ea
--- /dev/null
+++ b/packages/pipeline/src/scripts/query_data.ts
@@ -0,0 +1,87 @@
+import { formatters } from '../utils';
+export const dataFetchingQueries: any = {
+ get_missing_txn_hashes: `
+ SELECT
+ a.txn_hash
+ FROM
+ events_raw a
+ WHERE NOT EXISTS
+ (
+ SELECT
+ *
+ FROM
+ transactions b
+ WHERE
+ b.txn_hash = a.txn_hash
+ )
+ AND
+ a.block_number >= $1
+ AND
+ a.block_number < $2`,
+ get_used_block_numbers: `
+ SELECT DISTINCT
+ a.block_number
+ FROM
+ events_raw a
+ WHERE NOT EXISTS
+ (
+ SELECT
+ *
+ FROM
+ blocks b
+ WHERE
+ b.block_number = a.block_number
+ )
+ AND
+ a.block_number >= $1
+ AND
+ a.block_number < $2`,
+ get_token_registry: `
+ SELECT
+ *
+ FROM
+ tokens`,
+ get_max_block: `
+ SELECT
+ MAX(block_number)
+ FROM
+ events_raw`,
+ get_relayers: `
+ SELECT
+ *
+ FROM
+ relayers`,
+ get_most_recent_pricing_date: `
+ SELECT
+ MAX(DATE(timestamp))
+ FROM
+ prices
+ `,
+ get_top_unknown_token_addresses: `
+ SELECT a.token_address as address, a.txn_value / 2 as total_txn_value
+FROM
+(SELECT token_address, SUM(txn_value) as txn_value
+FROM
+(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL
+ THEN a.taker_txn_usd_value
+ ELSE a.maker_txn_usd_value END) as txn_value
+ from events_full a
+ where a.event_type = 'LogFill'
+ and a.timestamp > (NOW() + INTERVAL '-24 hours')
+ union
+ select a.timestamp, a.taker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL
+ THEN a.taker_txn_usd_value
+ ELSE a.maker_txn_usd_value END) as txn_value
+ from events_full a
+ where a.event_type = 'LogFill'
+ and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values
+WHERE token_address IS NOT NULL
+AND txn_value > 0
+GROUP BY 1
+ORDER BY 2 DESC) a
+LEFT JOIN tokens b
+ON a.token_address = b.address
+WHERE symbol is NULL
+ORDER BY 2 DESC
+`,
+};
diff --git a/packages/pipeline/src/scripts/scrape_data.ts b/packages/pipeline/src/scripts/scrape_data.ts
new file mode 100644
index 000000000..963670b47
--- /dev/null
+++ b/packages/pipeline/src/scripts/scrape_data.ts
@@ -0,0 +1,649 @@
+import { ExchangeEvents, ZeroEx } from '0x.js';
+import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect';
+import * as Airtable from 'airtable';
+import * as commandLineArgs from 'command-line-args';
+import * as _ from 'lodash';
+import * as querystring from 'querystring';
+import * as queue from 'queue';
+import * as request from 'request';
+import * as rpn from 'request-promise-native';
+
+import { HttpRequestOptions } from '../../../connect/lib/src/types.js';
+import { relayer } from '../models/relayer.js';
+import { token } from '../models/tokens.js';
+import { postgresClient } from '../postgres.js';
+import { typeConverters } from '../utils.js';
+import { web3, zrx } from '../zrx.js';
+
+import { insertDataScripts } from './create_tables.js';
+import { dataFetchingQueries } from './query_data.js';
+const optionDefinitions = [
+ { name: 'from', alias: 'f', type: Number },
+ { name: 'to', alias: 't', type: Number },
+ { name: 'type', type: String },
+ { name: 'id', type: String },
+ { name: 'force', type: Boolean },
+ { name: 'token', type: String },
+];
+const cli = commandLineArgs(optionDefinitions);
+const q = queue({ concurrency: 6, autostart: true });
+const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE);
+const BLOCK_INCREMENTS = 1000;
+const BASE_SYMBOL = 'USD'; // use USD as base currency against
+const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days
+const SECONDS_PER_DAY = 86400;
+const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical';
+const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json';
+const METAMASK_ETH_CONTRACT_METADATA_JSON =
+ 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json';
+const ETHPLORER_BASE_URL = 'http://api.ethplorer.io';
+const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`;
+// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday';
+const AIRTABLE_RELAYER_INFO = 'Relayer Info';
+export const pullDataScripts = {
+ getAllEvents(fromBlockNumber: number, toBlockNumber: number): any {
+ return new Promise((resolve, reject) => {
+ const getLogsPromises: any[] = [];
+ getLogsPromises.push(
+ zrx.exchange.getLogsAsync(
+ ExchangeEvents.LogFill,
+ { fromBlock: fromBlockNumber, toBlock: toBlockNumber },
+ {},
+ ),
+ zrx.exchange.getLogsAsync(
+ ExchangeEvents.LogCancel,
+ { fromBlock: fromBlockNumber, toBlock: toBlockNumber },
+ {},
+ ),
+ zrx.exchange.getLogsAsync(
+ ExchangeEvents.LogError,
+ { fromBlock: fromBlockNumber, toBlock: toBlockNumber },
+ {},
+ ),
+ );
+ Promise.all(getLogsPromises)
+ .then((data: any[]) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ reject(err);
+ });
+ });
+ },
+ getBlockInfo(blockNumber: number): any {
+ return new Promise((resolve, reject) => {
+ web3.eth.getBlock(blockNumber, (err, result) => {
+ if (err) {
+ reject(err);
+ } else {
+ resolve(result);
+ }
+ });
+ });
+ },
+ getTransactionInfo(transactionHash: string): any {
+ return new Promise((resolve, reject) => {
+ web3.eth.getTransaction(transactionHash, (err, result) => {
+ if (err) {
+ reject(err);
+ } else {
+ resolve(result);
+ }
+ });
+ });
+ },
+ getTokenRegistry(): any {
+ return new Promise((resolve, reject) => {
+ zrx.tokenRegistry
+ .getTokensAsync()
+ .then((data: any) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ reject(err);
+ });
+ });
+ },
+ getMetaMaskTokens(): any {
+ return new Promise((resolve, reject) => {
+ request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve(JSON.parse(body));
+ }
+ });
+ });
+ },
+ getEthplorerTopTokens(): any {
+ return new Promise((resolve, reject) => {
+ request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve(JSON.parse(body));
+ }
+ });
+ });
+ },
+ getEthplorerToken(tokenAddress: string): any {
+ return new Promise((resolve, reject) => {
+ const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`;
+ request(url, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ try {
+ const json = JSON.parse(body);
+ resolve(json);
+ } catch (err) {
+ resolve({ error: 'error' });
+ }
+ }
+ });
+ });
+ },
+ getPriceData(symbol: string, timestamp: number, timeDelay?: number): any {
+ return new Promise((resolve, reject) => {
+ if (symbol === 'WETH') {
+ symbol = 'ETH';
+ }
+ let parsedParams = querystring.stringify({
+ fsym: symbol,
+ tsyms: 'USD',
+ ts: timestamp / 1000,
+ });
+ console.debug(parsedParams);
+ setTimeout(() => {
+ request(PRICE_API_ENDPOINT + '?' + parsedParams, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve(JSON.parse(body));
+ }
+ });
+ }, timeDelay);
+ });
+ },
+ getRelayers(): any {
+ return new Promise((resolve, reject) => {
+ request(RELAYER_REGISTRY_JSON, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve(JSON.parse(body));
+ }
+ });
+ });
+ },
+ async getOrderBook(sraEndpoint: string): Promise<Object> {
+ const relayerClient = new HttpClient(sraEndpoint);
+ const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync();
+ const fullOrderBook: OrderbookResponse[] = [];
+ for (const tokenPair of tokenResponse) {
+ const orderBookRequest: OrderbookRequest = {
+ baseTokenAddress: tokenPair.tokenA.address,
+ quoteTokenAddress: tokenPair.tokenB.address,
+ };
+ const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest);
+ fullOrderBook.push(orderBook);
+ }
+ return fullOrderBook;
+ },
+ // async getHistoricalPrices(
+ // fromSymbol: string,
+ // toSymbol: string,
+ // fromTimestamp: number,
+ // toTimestamp: number,
+ // ): Promise<HistoricalPriceResponse> {
+ // const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY));
+ // if(fromSymbol === 'WETH') {
+ // fromSymbol = 'ETH';
+ // }
+ // var parsedParams = {
+ // fsym: fromSymbol,
+ // tsym: toSymbol,
+ // limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT),
+ // toTs: toTimestamp,
+ // };
+ // var options = {
+ // uri: HIST_PRICE_API_ENDPOINT,
+ // qs: parsedParams,
+ // json: false,
+ // };
+ // try {
+ // const response = await rpn(options);
+ // return Promise.resolve(JSON.parse(response));
+ // } catch (error) {
+ // console.debug(error);
+ // return Promise.reject(error);
+ // }
+ // },
+};
+export const scrapeDataScripts = {
+ scrapeAllPricesToDB(fromTime: number, toTime: number) {
+ const fromDate = new Date(fromTime);
+ fromDate.setUTCHours(0);
+ fromDate.setUTCMinutes(0);
+ fromDate.setUTCSeconds(0);
+ fromDate.setUTCMilliseconds(0);
+ const toDate = new Date(toTime);
+ postgresClient
+ .query(dataFetchingQueries.get_token_registry, [])
+ .then((result: any) => {
+ for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) {
+ for (const token of Object.values(result.rows)) {
+ console.debug('Scraping ' + curDate + ' ' + token);
+ q.push(_scrapePriceToDB(curDate.getTime(), token, 500));
+ }
+ }
+ })
+ .catch((err: any) => {
+ console.debug(err);
+ });
+ },
+};
+function _scrapeEventsToDB(fromBlock: number, toBlock: number): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getAllEvents(fromBlock, toBlock)
+ .then((data: any) => {
+ const parsedEvents: any = {};
+ parsedEvents[ExchangeEvents.LogFill] = [];
+ parsedEvents[ExchangeEvents.LogCancel] = [];
+ parsedEvents[ExchangeEvents.LogError] = [];
+ for (const index in data) {
+ for (const datum of data[index]) {
+ const event = typeConverters.convertLogEventToEventObject(datum);
+ parsedEvents[event.event_type].push(event);
+ }
+ }
+ console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length);
+ for (const event_type in parsedEvents) {
+ if (parsedEvents[event_type].length > 0) {
+ insertDataScripts
+ .insertMultipleRows(
+ 'events_raw',
+ parsedEvents[event_type],
+ Object.keys(parsedEvents[event_type][0]),
+ )
+ .then(() => {})
+ .catch((error: any) => {});
+ }
+ }
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeBlockToDB(block: number): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getBlockInfo(block)
+ .then((data: any) => {
+ const parsedBlock = typeConverters.convertLogBlockToBlockObject(data);
+ insertDataScripts
+ .insertSingleRow('blocks', parsedBlock)
+ .then((result: any) => {
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+// function _scrapeAllRelayersToDB(): any {
+// return (cb: () => void) => {
+// airtableBase(AIRTABLE_RELAYER_INFO)
+// .select()
+// .eachPage((records: any, fetchNextPage: () => void) => {
+// const parsedRelayers: any[] = [];
+// for(const record of records) {
+// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record));
+// }
+// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0]))
+// .then((result: any) => {
+// cb();
+// })
+// .catch((err: any) => {
+// cb();
+// });
+// })
+// .catch((err: any) => {
+// cb();
+// });
+// };
+// }
+function _scrapeAllRelayersToDB(): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getRelayers()
+ .then((relayers: any[]) => {
+ console.log(relayers);
+ const parsedRelayers: any[] = [];
+ for (const relayer of relayers) {
+ parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer));
+ }
+ console.log(parsedRelayers);
+ insertDataScripts
+ .insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties))
+ .then((result: any) => {
+ console.log(result);
+ cb();
+ })
+ .catch((err: any) => {
+ console.log(err);
+ cb();
+ });
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeTransactionToDB(transactionHash: string): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getTransactionInfo(transactionHash)
+ .then((data: any) => {
+ const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data);
+ insertDataScripts
+ .insertSingleRow('transactions', parsedTransaction)
+ .then((result: any) => {
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeTokenRegistryToDB(): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getTokenRegistry()
+ .then((data: any) => {
+ const parsedTokens: any = [];
+ for (const token of data) {
+ parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token));
+ }
+ insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeMetaMaskEthContractMetadataToDB(): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getMetaMaskTokens()
+ .then((data: any) => {
+ const parsedTokens: any = [];
+ const dataArray = _.map(_.keys(data), (tokenAddress: string) => {
+ const value = _.get(data, tokenAddress);
+ return {
+ address: tokenAddress,
+ ...value,
+ };
+ });
+ const erc20TokensOnly = _.filter(dataArray, entry => {
+ const isErc20 = _.get(entry, 'erc20');
+ return isErc20;
+ });
+ for (const token of erc20TokensOnly) {
+ parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token));
+ }
+ insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeEthplorerTopTokensToDB(): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getEthplorerTopTokens()
+ .then((data: any) => {
+ const parsedTokens: any = [];
+ const tokens = _.get(data, 'tokens');
+ for (const token of tokens) {
+ parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token));
+ }
+ insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeUnknownTokenInformationToDB(): any {
+ return (cb: () => void) => {
+ postgresClient
+ .query(dataFetchingQueries.get_top_unknown_token_addresses)
+ .then(async (result: any) => {
+ const addresses = _.map(result.rows, row => _.get(row, 'address'));
+ const responses = await Promise.all(
+ _.map(addresses, address => pullDataScripts.getEthplorerToken(address)),
+ );
+ const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error')));
+ const parsedTokens = _.map(tokens, tokenInfo =>
+ typeConverters.convertEthplorerTokenToTokenObject(tokenInfo),
+ );
+ insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getPriceData(token.symbol, timestamp, timeDelay)
+ .then((data: any) => {
+ const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol;
+ const parsedPrice = {
+ timestamp: timestamp / 1000,
+ symbol: token.symbol,
+ base: 'USD',
+ price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0,
+ };
+ console.debug('Inserting ' + timestamp);
+ console.debug(parsedPrice);
+ insertDataScripts.insertSingleRow('prices', parsedPrice);
+ cb();
+ })
+ .catch((err: any) => {
+ console.debug(err);
+ cb();
+ });
+ };
+}
+// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any {
+// return (cb: () => void) => {
+// pullDataScripts
+// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp)
+// .then((data: any) => {
+// const parsedHistoricalPrices: any = [];
+// for (const historicalPrice of data['Data']) {
+// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice);
+// parsedHistoricalPrice['token'] = token;
+// parsedHistoricalPrice['base'] = BASE_SYMBOL;
+// parsedHistoricalPrices.push(parsedHistoricalPrice);
+// }
+// if (parsedHistoricalPrices.length > 0) {
+// insertDataScripts
+// .insertMultipleRows(
+// 'historical_prices',
+// parsedHistoricalPrices,
+// Object.keys(parsedHistoricalPrices[0]),
+// )
+// .catch((error: any) => {
+// console.error(error);
+// });
+// }
+// cb();
+// })
+// .catch((error: any) => {
+// console.error(error);
+// cb();
+// });
+// };
+// }
+function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getOrderBook(sraEndpoint)
+ .then((data: any) => {
+ for (const book of data) {
+ for (const order of book.bids) {
+ console.debug(order);
+ const parsedOrder = typeConverters.convertLogOrderToOrderObject(order);
+ parsedOrder.relayer_id = id;
+ parsedOrder.order_hash = ZeroEx.getOrderHashHex(order);
+ insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => {
+ console.error(error);
+ });
+ }
+ for (const order of book.asks) {
+ console.debug(order);
+ const parsedOrder = typeConverters.convertLogOrderToOrderObject(order);
+ parsedOrder.relayer_id = id;
+ parsedOrder.order_hash = ZeroEx.getOrderHashHex(order);
+ insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => {
+ console.error(error);
+ });
+ }
+ }
+ cb();
+ })
+ .catch((error: any) => {
+ console.error(error);
+ cb();
+ });
+ };
+}
+if (cli.type === 'events') {
+ if (cli.from && cli.to) {
+ const destToBlock = cli.to ? cli.to : cli.from;
+ let curFromBlock = cli.from;
+ let curToBlock = curFromBlock;
+ do {
+ curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS;
+ q.push(_scrapeEventsToDB(curFromBlock, curToBlock));
+ curFromBlock = curToBlock + 1;
+ } while (curToBlock < destToBlock);
+ }
+} else if (cli.type === 'blocks') {
+ if (cli.from && cli.to) {
+ if (cli.force) {
+ const destToBlock = cli.to ? cli.to : cli.from;
+ let curFromBlock = cli.from;
+ const curToBlock = curFromBlock;
+ for (; curFromBlock < destToBlock; curFromBlock++) {
+ q.push(_scrapeBlockToDB(curFromBlock));
+ }
+ } else {
+ const fetchFrom = cli.from;
+ const fetchTo = cli.to ? cli.to : cli.from + 1;
+ postgresClient
+ .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo])
+ .then((data: any) => {
+ for (const row of data.rows) {
+ q.push(_scrapeBlockToDB(row.block_number));
+ }
+ })
+ .catch((err: any) => {
+ // console.debug(err);
+ });
+ }
+ }
+} else if (cli.type === 'transactions') {
+ if (cli.id) {
+ q.push(_scrapeTransactionToDB(cli.id));
+ } else if (cli.from) {
+ const fetchFrom = cli.from;
+ const fetchTo = cli.to ? cli.to : cli.from + 1;
+ postgresClient
+ .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo])
+ .then((data: any) => {
+ for (const row of data.rows) {
+ q.push(_scrapeTransactionToDB(row.txn_hash));
+ }
+ })
+ .catch((err: any) => {
+ // console.debug(err);
+ });
+ }
+} else if (cli.type === 'tokens') {
+ q.push(_scrapeMetaMaskEthContractMetadataToDB());
+ q.push(_scrapeEthplorerTopTokensToDB());
+} else if (cli.type === 'unknown_tokens') {
+ q.push(_scrapeUnknownTokenInformationToDB());
+} else if (cli.type === 'prices' && cli.from && cli.to) {
+ const fromDate = new Date(cli.from);
+ console.debug(fromDate);
+ fromDate.setUTCHours(0);
+ fromDate.setUTCMinutes(0);
+ fromDate.setUTCSeconds(0);
+ fromDate.setUTCMilliseconds(0);
+ console.debug(fromDate);
+ const toDate = new Date(cli.to);
+ postgresClient
+ .query(dataFetchingQueries.get_token_registry, [])
+ .then((result: any) => {
+ for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) {
+ for (const token of Object.values(result.rows)) {
+ console.debug('Scraping ' + curDate + ' ' + token);
+ q.push(_scrapePriceToDB(curDate.getTime(), token));
+ }
+ }
+ })
+ .catch((err: any) => {
+ console.debug(err);
+ });
+ // } else if (cli.type === 'historical_prices') {
+ // if (cli.token && cli.from && cli.to) {
+ // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to));
+ // }
+ // } else if (cli.type === 'all_historical_prices') {
+ // if (cli.from && cli.to) {
+ // postgresClient
+ // .query(dataFetchingQueries.get_token_registry, [])
+ // .then((result: any) => {
+ // const curTokens: any = result.rows.map((a: any): any => a.symbol);
+ // for (const curToken of curTokens) {
+ // console.debug('Historical data backfill: Pushing coin ' + curToken);
+ // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to));
+ // }
+ // })
+ // .catch((err: any) => {
+ // console.debug(err);
+ // });
+ // }
+} else if (cli.type === 'relayers') {
+ q.push(_scrapeAllRelayersToDB());
+} else if (cli.type === 'orders') {
+ postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => {
+ for (const relayer of result.rows) {
+ if (relayer.sra_http_url) {
+ q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url));
+ }
+ }
+ });
+}