aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r--packages/pipeline/src/global.d.ts6
-rw-r--r--packages/pipeline/src/models/block.ts22
-rw-r--r--packages/pipeline/src/models/event.ts84
-rw-r--r--packages/pipeline/src/models/historical_prices.ts43
-rw-r--r--packages/pipeline/src/models/order.ts30
-rw-r--r--packages/pipeline/src/models/price.ts15
-rw-r--r--packages/pipeline/src/models/relayer.ts75
-rw-r--r--packages/pipeline/src/models/tokens.ts24
-rw-r--r--packages/pipeline/src/models/transaction.ts36
-rw-r--r--packages/pipeline/src/postgres.ts12
-rw-r--r--packages/pipeline/src/run_jobs.ts87
-rw-r--r--packages/pipeline/src/scrape_prices.ts10
-rw-r--r--packages/pipeline/src/scripts/create_tables.ts258
-rw-r--r--packages/pipeline/src/scripts/join_tables.ts234
-rw-r--r--packages/pipeline/src/scripts/query_data.ts87
-rw-r--r--packages/pipeline/src/scripts/scrape_data.ts649
-rw-r--r--packages/pipeline/src/utils.ts181
-rw-r--r--packages/pipeline/src/zrx.ts11
18 files changed, 1864 insertions, 0 deletions
diff --git a/packages/pipeline/src/global.d.ts b/packages/pipeline/src/global.d.ts
new file mode 100644
index 000000000..0af4ddb30
--- /dev/null
+++ b/packages/pipeline/src/global.d.ts
@@ -0,0 +1,6 @@
+declare module 'queue';
+declare module 'airtable';
+declare module '*.json' {
+ const value: any;
+ export default value;
+}
diff --git a/packages/pipeline/src/models/block.ts b/packages/pipeline/src/models/block.ts
new file mode 100644
index 000000000..a81cdb293
--- /dev/null
+++ b/packages/pipeline/src/models/block.ts
@@ -0,0 +1,22 @@
+const block = {
+ tableName: 'blocks',
+ tableProperties: {
+ id: {
+ type: 'key',
+ },
+ timestamp: {
+ type: 'timestamp',
+ required: true,
+ },
+ block_number: {
+ type: 'bigint',
+ required: true,
+ },
+ },
+};
+const logToBlockSchemaMapping: any = {
+ number: 'block_number',
+ hash: 'block_hash',
+ timestamp: 'timestamp',
+};
+export { block, logToBlockSchemaMapping };
diff --git a/packages/pipeline/src/models/event.ts b/packages/pipeline/src/models/event.ts
new file mode 100644
index 000000000..cb1c939e5
--- /dev/null
+++ b/packages/pipeline/src/models/event.ts
@@ -0,0 +1,84 @@
+const event = {
+ tableName: 'events',
+ tableProperties: {
+ id: {
+ type: 'key',
+ },
+ timestamp: {
+ type: 'timestamp',
+ required: true,
+ },
+ event_type: {
+ type: 'varchar',
+ required: true,
+ },
+ error_id: {
+ type: 'varchar',
+ },
+ order_hash: {
+ type: 'char(66)',
+ },
+ maker: {
+ type: 'char(42)',
+ },
+ maker_amount: {
+ type: 'varchar',
+ },
+ maker_fee: {
+ type: 'varchar',
+ },
+ maker_token: {
+ type: 'char(42)',
+ },
+ taker_amount: {
+ type: 'varchar',
+ },
+ taker_fee: {
+ type: 'varchar',
+ },
+ taker_token: {
+ type: 'char(42)',
+ },
+ txn_hash: {
+ type: 'char(66)',
+ },
+ gas_used: {
+ type: 'varchar',
+ },
+ gas_price: {
+ type: 'varchar',
+ },
+ fee_recipient: {
+ type: 'char(42)',
+ },
+ method_id: {
+ type: 'char(10)',
+ },
+ salt: {
+ type: 'varchar',
+ },
+ block_number: {
+ type: 'bigint',
+ },
+ },
+};
+const logToEventSchemaMapping: any = {
+ blockNumber: 'block_number',
+ transactionHash: 'txn_hash',
+ event: 'event_type',
+ logIndex: 'log_index',
+ 'args.maker': 'maker',
+ 'args.taker': 'taker',
+ 'args.feeRecipient': 'fee_recipient',
+ 'args.makerToken': 'maker_token',
+ 'args.takerToken': 'taker_token',
+ 'args.filledMakerTokenAmount': 'maker_amount',
+ 'args.filledTakerTokenAmount': 'taker_amount',
+ 'args.paidMakerFee': 'maker_fee',
+ 'args.paidTakerFee': 'taker_fee',
+ 'args.orderHash': 'order_hash',
+ 'args.cancelledMakerTokenAmount': 'maker_amount',
+ 'args.cancelledTakerTokenAmount': 'taker_amount',
+ 'args.errorId': 'error_id',
+};
+export { event, logToEventSchemaMapping };
diff --git a/packages/pipeline/src/models/historical_prices.ts b/packages/pipeline/src/models/historical_prices.ts
new file mode 100644
index 000000000..cf49b579e
--- /dev/null
+++ b/packages/pipeline/src/models/historical_prices.ts
@@ -0,0 +1,43 @@
+const historicalPrices = {
+ tableName: 'historical_prices',
+ tableProperties: {
+ token: {
+ type: 'varchar',
+ },
+ base: {
+ type: 'varchar',
+ },
+ timestamp: {
+ type: 'timestamp',
+ },
+ close: {
+ type: 'numeric(50)',
+ },
+ high: {
+ type: 'numeric(50)',
+ },
+ low: {
+ type: 'numeric(50)',
+ },
+ open: {
+ type: 'numeric(50)',
+ },
+ volume_from: {
+ type: 'numeric(50)',
+ },
+ volume_to: {
+ type: 'numeric(50)',
+ },
+ },
+};
+const logToHistoricalPricesSchema: { [log: string]: string } = {
+ token: 'token',
+ time: 'timestamp',
+ close: 'close',
+ high: 'high',
+ low: 'low',
+ open: 'open',
+ volumefrom: 'volume_from',
+ volumeto: 'volume_to',
+};
+export { historicalPrices, logToHistoricalPricesSchema };
diff --git a/packages/pipeline/src/models/order.ts b/packages/pipeline/src/models/order.ts
new file mode 100644
index 000000000..dadae2883
--- /dev/null
+++ b/packages/pipeline/src/models/order.ts
@@ -0,0 +1,30 @@
+const order = {
+ tableName: 'orders',
+ tableProperties: {
+ id: {
+ type: 'key',
+ },
+ timestamp: {
+ type: 'timestamp',
+ required: true,
+ },
+ block_number: {
+ type: 'bigint',
+ required: true,
+ },
+ },
+};
+const logToOrderSchemaMapping: any = {
+ exchangeContractAddress: 'exchange_contract_address',
+ maker: 'maker',
+ makerTokenAddress: 'maker_token',
+ makerTokenAmount: 'maker_amount',
+ makerFee: 'maker_fee',
+ taker: 'taker',
+ takerTokenAddress: 'taker_token',
+ takerTokenAmount: 'taker_amount',
+ takerFee: 'taker_fee',
+ expirationUnixTimestampSec: 'expiration_unix_timestamp_sec',
+ salt: 'salt',
+};
+export { order, logToOrderSchemaMapping };
diff --git a/packages/pipeline/src/models/price.ts b/packages/pipeline/src/models/price.ts
new file mode 100644
index 000000000..9e45cc2a9
--- /dev/null
+++ b/packages/pipeline/src/models/price.ts
@@ -0,0 +1,15 @@
+const price = {
+ tableName: 'prices',
+ tableProperties: {
+ address: {
+ type: 'char(42)',
+ },
+ timestamp: {
+ type: 'timestamp',
+ },
+ price: {
+ type: 'numeric(50)',
+ },
+ },
+};
+export { price };
diff --git a/packages/pipeline/src/models/relayer.ts b/packages/pipeline/src/models/relayer.ts
new file mode 100644
index 000000000..77a123e8b
--- /dev/null
+++ b/packages/pipeline/src/models/relayer.ts
@@ -0,0 +1,75 @@
+// const relayer = {
+// tableName: 'relayers',
+// tableProperties: {
+// id: {
+// type: 'integer',
+// },
+// name: {
+// type: 'varchar',
+// },
+// url : {
+// type: 'varchar',
+// },
+// model: {
+// type: 'varchar[]',
+// },
+// status: {
+// type: 'varchar',
+// },
+// sra_status: {
+// type: 'varchar',
+// },
+// sra_http_url: {
+// type: 'varchar',
+// },
+// known_fee_addresses: {
+// type: 'char(42)[]',
+// },
+// known_taker_addresses: {
+// type: 'char(42)[]',
+// },
+// relayer_type: {
+// type: 'varchar',
+// },
+// },
+// };
+const relayer = {
+ tableName: 'relayers',
+ tableProperties: {
+ name: {
+ type: 'varchar',
+ },
+ url: {
+ type: 'varchar',
+ },
+ sra_http_endpoint: {
+ type: 'varchar',
+ },
+ sra_ws_endpoint: {
+ type: 'varchar',
+ },
+ fee_recipient_addresses: {
+ type: 'char(42)[]',
+ },
+ taker_addresses: {
+ type: 'char(42)[]',
+ },
+ },
+};
+// const logToRelayerSchemaMapping: any = {
+// 'id' : 'id',
+// 'fields[\'Name\']': 'name',
+// 'fields[\'URL\']': 'url',
+// 'fields[\'Model\']': 'model',
+// 'fields[\'Status\']': 'status',
+// 'fields[\'SRA Status\']': 'sra_status',
+// 'fields[\'SRA HTTP URL\']': 'sra_http_url',
+// 'fields[\'Known Fee Addresses\']': 'known_fee_addresses',
+// 'fields[\'Known Taker Addresses\']': 'known_taker_addresses',
+// 'fields[\'Relayer Type\']': 'relayer_type',
+// };
+const logToRelayerSchemaMapping: any = {
+ name: 'name',
+ homepage_url: 'url',
+};
+export { relayer, logToRelayerSchemaMapping };
diff --git a/packages/pipeline/src/models/tokens.ts b/packages/pipeline/src/models/tokens.ts
new file mode 100644
index 000000000..96e8a31af
--- /dev/null
+++ b/packages/pipeline/src/models/tokens.ts
@@ -0,0 +1,24 @@
+const token = {
+ tableName: 'tokens',
+ tableProperties: {
+ address: {
+ type: 'char(66)',
+ },
+ decimals: {
+ type: 'bigint',
+ },
+ name: {
+ type: 'varchar',
+ },
+ symbol: {
+ type: 'varchar',
+ },
+ },
+};
+const logToTokenSchemaMapping: any = {
+ address: 'address',
+ decimals: 'decimals',
+ name: 'name',
+ symbol: 'symbol',
+};
+export { token, logToTokenSchemaMapping };
diff --git a/packages/pipeline/src/models/transaction.ts b/packages/pipeline/src/models/transaction.ts
new file mode 100644
index 000000000..715cc9480
--- /dev/null
+++ b/packages/pipeline/src/models/transaction.ts
@@ -0,0 +1,36 @@
+const transaction = {
+ tableName: 'transactions',
+ tableProperties: {
+ txn_hash: {
+ type: 'char(66)',
+ },
+ block_hash: {
+ type: 'char(66)',
+ },
+ block_number: {
+ type: 'bigint',
+ },
+ gas_used: {
+ type: 'varchar',
+ },
+ gas_price: {
+ type: 'varchar',
+ },
+ method_id: {
+ type: 'char(10)',
+ },
+ salt: {
+ type: 'varchar',
+ },
+ },
+};
+const logToTransactionSchemaMapping: any = {
+ hash: 'txn_hash',
+ gas: 'gas_used',
+ gasPrice: 'gas_price',
+ blockHash: 'block_hash',
+ blockNumber: 'block_number',
+ method_id: 'method_id',
+ salt: 'salt',
+};
+export { transaction, logToTransactionSchemaMapping };
diff --git a/packages/pipeline/src/postgres.ts b/packages/pipeline/src/postgres.ts
new file mode 100644
index 000000000..d095e5c9e
--- /dev/null
+++ b/packages/pipeline/src/postgres.ts
@@ -0,0 +1,12 @@
+import * as dotenv from 'dotenv';
+import { Pool, PoolConfig } from 'pg';
+dotenv.config();
+const client: PoolConfig = {
+ user: process.env.AURORA_USER,
+ database: process.env.AURORA_DB,
+ password: process.env.AURORA_PASSWORD,
+ port: parseInt(process.env.AURORA_PORT || '5432', 10),
+ host: process.env.AURORA_HOST,
+};
+const postgresClient = new Pool(client);
+export { postgresClient };
diff --git a/packages/pipeline/src/run_jobs.ts b/packages/pipeline/src/run_jobs.ts
new file mode 100644
index 000000000..4d82d4e2d
--- /dev/null
+++ b/packages/pipeline/src/run_jobs.ts
@@ -0,0 +1,87 @@
+import { exec } from 'child_process';
+
+import { postgresClient } from './postgres.js';
+import { dataFetchingQueries } from './scripts/query_data.js';
+import { web3, zrx } from './zrx.js';
+const CUR_BLOCK_OFFSET = 20;
+postgresClient.query(dataFetchingQueries.get_max_block, []).then((data: any) => {
+ const maxBlockNumber = data.rows[0].max;
+ const safeCurBlockNumber = web3.eth.blockNumber - CUR_BLOCK_OFFSET;
+ console.log('Scraping ' + maxBlockNumber + ' to ' + safeCurBlockNumber);
+ exec(
+ 'node ./lib/scripts/scrape_data --type events --from ' + maxBlockNumber + ' --to ' + safeCurBlockNumber,
+ (error, stdout, stderr) => {
+ if (error) {
+ console.log(error);
+ return;
+ }
+ console.log('Scraped events');
+ console.log('Scraping blocks');
+ exec(
+ 'node ./lib/scripts/scrape_data --type blocks --from ' + maxBlockNumber + ' --to ' + safeCurBlockNumber,
+ (error, stdout, stderr) => {
+ if (error) {
+ console.log(error);
+ return;
+ }
+ console.log('Scraped blocks');
+ console.log('Scraping transactions');
+ exec(
+ 'node ./lib/scripts/scrape_data --type transactions --from ' +
+ maxBlockNumber +
+ ' --to ' +
+ safeCurBlockNumber,
+ (error, stdout, stderr) => {
+ if (error) {
+ console.log(error);
+ return;
+ }
+ console.log('Scraped transactions');
+ console.log('Joining events_staging');
+ exec(
+ 'node ./lib/scripts/join_tables --name events_staging --from ' +
+ maxBlockNumber +
+ ' --to ' +
+ safeCurBlockNumber,
+ (error, stdout, stderr) => {
+ if (error) {
+ console.log(error);
+ return;
+ }
+ console.log('Joined events_staging');
+ console.log('Joining events');
+ exec(
+ 'node ./lib/scripts/join_tables --name events --from ' +
+ maxBlockNumber +
+ ' --to ' +
+ safeCurBlockNumber,
+ (error, stdout, stderr) => {
+ if (error) {
+ console.log(error);
+ return;
+ }
+ console.log('Joined events');
+ console.log('Joining events_full');
+ exec(
+ 'node ./lib/scripts/join_tables --name events_full --from ' +
+ maxBlockNumber +
+ ' --to ' +
+ safeCurBlockNumber,
+ (error, stdout, stderr) => {
+ if (error) {
+ console.log(error);
+ return;
+ }
+ },
+ );
+ },
+ );
+ },
+ );
+ },
+ );
+ },
+ );
+ },
+ );
+});
diff --git a/packages/pipeline/src/scrape_prices.ts b/packages/pipeline/src/scrape_prices.ts
new file mode 100644
index 000000000..c26062fac
--- /dev/null
+++ b/packages/pipeline/src/scrape_prices.ts
@@ -0,0 +1,10 @@
+import { postgresClient } from './postgres.js';
+import { dataFetchingQueries } from './scripts/query_data.js';
+import { scrapeDataScripts } from './scripts/scrape_data.js';
+import { web3, zrx } from './zrx.js';
+const CUR_BLOCK_OFFSET = 20;
+postgresClient.query(dataFetchingQueries.get_most_recent_pricing_date, []).then((data: any) => {
+ const curMaxScrapedDate = new Date(data.rows[0].max);
+ const curDate = new Date();
+ scrapeDataScripts.scrapeAllPricesToDB(curMaxScrapedDate.getTime(), curDate.getTime());
+});
diff --git a/packages/pipeline/src/scripts/create_tables.ts b/packages/pipeline/src/scripts/create_tables.ts
new file mode 100644
index 000000000..fd0d2b78b
--- /dev/null
+++ b/packages/pipeline/src/scripts/create_tables.ts
@@ -0,0 +1,258 @@
+import * as commandLineArgs from 'command-line-args';
+
+import { postgresClient } from '../postgres';
+import { formatters } from '../utils';
+const tableQueries: any = {
+ events_full: `CREATE TABLE IF NOT EXISTS events_full (
+ timestamp TIMESTAMP WITH TIME ZONE,
+ event_type VARCHAR,
+ error_id VARCHAR,
+ order_hash CHAR(66),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ txn_hash CHAR(66),
+ gas_used NUMERIC(78),
+ gas_price NUMERIC(78),
+ fee_recipient CHAR(42),
+ method_id CHAR(10),
+ salt VARCHAR,
+ block_number BIGINT,
+ log_index BIGINT,
+ taker_symbol VARCHAR,
+ taker_name VARCHAR,
+ taker_decimals BIGINT,
+ taker_usd_price NUMERIC(78),
+ taker_txn_usd_value NUMERIC(78),
+ maker_symbol VARCHAR,
+ maker_name VARCHAR,
+ maker_decimals BIGINT,
+ maker_usd_price NUMERIC(78),
+ maker_txn_usd_value NUMERIC(78),
+ PRIMARY KEY (txn_hash, order_hash, log_index)
+ )`,
+ events: `CREATE TABLE IF NOT EXISTS events (
+ timestamp TIMESTAMP WITH TIME ZONE,
+ event_type VARCHAR,
+ error_id VARCHAR,
+ order_hash CHAR(66),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ txn_hash CHAR(66),
+ gas_used NUMERIC(78),
+ gas_price NUMERIC(78),
+ fee_recipient CHAR(42),
+ method_id CHAR(10),
+ salt VARCHAR,
+ block_number BIGINT,
+ log_index BIGINT,
+ PRIMARY KEY (txn_hash, order_hash, log_index)
+ )`,
+ events_staging: `CREATE TABLE IF NOT EXISTS events_staging (
+ timestamp TIMESTAMP WITH TIME ZONE,
+ event_type VARCHAR,
+ error_id VARCHAR,
+ order_hash CHAR(66),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ txn_hash CHAR(66),
+ fee_recipient CHAR(42),
+ block_number BIGINT,
+ log_index BIGINT,
+ PRIMARY KEY (txn_hash, order_hash, log_index)
+ )`,
+ events_raw: `CREATE TABLE IF NOT EXISTS events_raw (
+ event_type VARCHAR,
+ error_id VARCHAR,
+ order_hash CHAR(66),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ txn_hash CHAR(66),
+ fee_recipient CHAR(42),
+ block_number BIGINT,
+ log_index BIGINT,
+ PRIMARY KEY (txn_hash, order_hash, log_index)
+ )`,
+ blocks: `CREATE TABLE IF NOT EXISTS blocks (
+ timestamp TIMESTAMP WITH TIME ZONE,
+ block_hash CHAR(66) UNIQUE,
+ block_number BIGINT,
+ PRIMARY KEY (block_hash)
+ )`,
+ transactions: `CREATE TABLE IF NOT EXISTS transactions (
+ txn_hash CHAR(66) UNIQUE,
+ block_hash CHAR(66),
+ block_number BIGINT,
+ gas_used NUMERIC(78),
+ gas_price NUMERIC(78),
+ method_id CHAR(10),
+ salt VARCHAR,
+ PRIMARY KEY (txn_hash)
+ )`,
+ tokens: `CREATE TABLE IF NOT EXISTS tokens (
+ address CHAR(42) UNIQUE,
+ name VARCHAR,
+ symbol VARCHAR,
+ decimals INT,
+ PRIMARY KEY (address)
+ )`,
+ prices: `CREATE TABLE IF NOT EXISTS prices (
+ address CHAR(42) UNIQUE,
+ timestamp TIMESTAMP WITH TIME ZONE,
+ price NUMERIC(78, 18),
+ PRIMARY KEY (address, timestamp)
+ )`,
+ relayers: `CREATE TABLE IF NOT EXISTS relayers (
+ name VARCHAR UNIQUE,
+ url VARCHAR DEFAULT '',
+ sra_http_endpoint VARCHAR DEFAULT '',
+ sra_ws_endpoint VARCHAR DEFAULT '',
+ fee_recipient_addresses CHAR(42)[] DEFAULT '{}',
+ taker_addresses CHAR(42)[] DEFAULT '{}',
+ PRIMARY KEY(name)`,
+ historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices (
+ token VARCHAR,
+ base VARCHAR,
+ timestamp TIMESTAMP WITH TIME ZONE,
+ close NUMERIC(78, 18),
+ high NUMERIC(78, 18),
+ low NUMERIC(78, 18),
+ open NUMERIC(78, 18),
+ volume_from NUMERIC(78, 18),
+ volume_to NUMERIC(78, 18),
+ PRIMARY KEY (token, base, timestamp)
+ )`,
+ orders: `CREATE TABLE IF NOT EXISTS orders (
+ relayer_id VARCHAR,
+ exchange_contract_address CHAR(42),
+ maker CHAR(42),
+ maker_amount NUMERIC(78),
+ maker_fee NUMERIC(78),
+ maker_token CHAR(42),
+ taker CHAR(42),
+ taker_amount NUMERIC(78),
+ taker_fee NUMERIC(78),
+ taker_token CHAR(42),
+ fee_recipient CHAR(42),
+ expiration_unix_timestamp_sec NUMERIC(78),
+ salt VARCHAR,
+ order_hash CHAR(66),
+ PRIMARY KEY (relayer_id, order_hash)
+ )`,
+};
+function _safeQuery(query: string): any {
+ return new Promise((resolve, reject) => {
+ postgresClient
+ .query(query)
+ .then((data: any) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ reject(err);
+ });
+ });
+}
+export const tableScripts = {
+ createTable(query: string): any {
+ return _safeQuery(query);
+ },
+ createAllTables(): any {
+ for (const tableName of tableQueries) {
+ _safeQuery(tableQueries[tableName]);
+ }
+ },
+};
+export const insertDataScripts = {
+ insertSingleRow(table: string, object: any): any {
+ return new Promise((resolve, reject) => {
+ const columns = Object.keys(object);
+ const safeArray: any = [];
+ for (const key of columns) {
+ if (key in object) {
+ if (key === 'timestamp') {
+ safeArray.push('to_timestamp(' + object[key] + ')');
+ } else if (typeof object[key] === 'string' || object[key] instanceof String) {
+ safeArray.push(formatters.escapeSQLParam(object[key]));
+ } else {
+ safeArray.push(object[key]);
+ }
+ } else {
+ safeArray.push('default');
+ }
+ }
+ const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`;
+ console.log(queryString);
+ postgresClient
+ .query(queryString)
+ .then((data: any) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ reject(err);
+ });
+ });
+ },
+ insertMultipleRows(table: string, rows: any[], columns: any[]): any {
+ return new Promise((resolve, reject) => {
+ if (rows.length > 0) {
+ const rowsSplit = rows.map((value, index) => {
+ const safeArray: any = [];
+ for (const key of columns) {
+ if (key in value) {
+ if (key === 'timestamp') {
+ safeArray.push('to_timestamp(' + value[key] + ')');
+ } else if (typeof value[key] === 'string' || value[key] instanceof String) {
+ safeArray.push(formatters.escapeSQLParam(value[key]));
+ } else if (value[key] instanceof Array) {
+ const escapedArray = value[key].map((subValue: string, subIndex: number) => {
+ return formatters.escapeSQLParam(subValue);
+ });
+ safeArray.push('ARRAY[' + escapedArray.toString() + ']');
+ } else {
+ safeArray.push(value[key]);
+ }
+ } else {
+ safeArray.push('default');
+ }
+ }
+ return '(' + safeArray + ')';
+ });
+ const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`;
+ postgresClient
+ .query(queryString)
+ .then((data: any) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ // console.log(err);
+ reject(err);
+ });
+ } else {
+ resolve({});
+ }
+ });
+ },
+};
diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts
new file mode 100644
index 000000000..e7c05b39a
--- /dev/null
+++ b/packages/pipeline/src/scripts/join_tables.ts
@@ -0,0 +1,234 @@
+import * as commandLineArgs from 'command-line-args';
+
+import { postgresClient } from '../postgres';
+import { formatters } from '../utils';
+const optionDefinitions = [
+ { name: 'name', alias: 'n', type: String },
+ { name: 'from', alias: 'f', type: Number },
+ { name: 'to', alias: 't', type: Number },
+];
+const cli = commandLineArgs(optionDefinitions);
+const dataInsertionQueries: any = {
+ events_staging: `INSERT INTO events_staging (
+ timestamp,
+ event_type,
+ error_id,
+ order_hash,
+ maker,
+ maker_amount,
+ maker_fee,
+ maker_token,
+ taker,
+ taker_amount,
+ taker_fee,
+ taker_token,
+ txn_hash,
+ fee_recipient,
+ block_number,
+ log_index
+ )
+ (SELECT
+ b.timestamp,
+ a.event_type,
+ a.error_id,
+ a.order_hash,
+ a.maker,
+ a.maker_amount,
+ a.maker_fee,
+ a.maker_token,
+ a.taker,
+ a.taker_amount,
+ a.taker_fee,
+ a.taker_token,
+ a.txn_hash,
+ a.fee_recipient,
+ a.block_number,
+ a.log_index
+ FROM
+ events_raw a
+ JOIN
+ blocks b
+ ON
+ a.block_number = b.block_number
+ AND
+ b.block_number >= $1
+ AND
+ b.block_number <= $2
+ ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
+ events: `INSERT INTO events (
+ timestamp,
+ event_type,
+ error_id,
+ order_hash,
+ maker,
+ maker_amount,
+ maker_fee,
+ maker_token,
+ taker,
+ taker_amount,
+ taker_fee,
+ taker_token,
+ txn_hash,
+ fee_recipient,
+ block_number,
+ log_index,
+ gas_used,
+ gas_price,
+ method_id,
+ salt
+ )
+ (SELECT
+ a.timestamp,
+ a.event_type,
+ a.error_id,
+ a.order_hash,
+ a.maker,
+ a.maker_amount,
+ a.maker_fee,
+ a.maker_token,
+ a.taker,
+ a.taker_amount,
+ a.taker_fee,
+ a.taker_token,
+ a.txn_hash,
+ a.fee_recipient,
+ a.block_number,
+ a.log_index,
+ b.gas_used,
+ b.gas_price,
+ b.method_id,
+ b.salt
+ FROM
+ events_staging a
+ JOIN
+ transactions b
+ ON
+ a.txn_hash = b.txn_hash
+ AND
+ a.block_number >= $1
+ AND
+ a.block_number <= $2
+ ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
+ events_full: `
+ INSERT INTO events_full (
+ timestamp,
+ event_type,
+ error_id,
+ order_hash,
+ maker,
+ maker_amount,
+ maker_fee,
+ maker_token,
+ taker,
+ taker_amount,
+ taker_fee,
+ taker_token,
+ txn_hash,
+ fee_recipient,
+ block_number,
+ log_index,
+ gas_used,
+ gas_price,
+ method_id,
+ salt,
+ taker_symbol,
+ taker_name,
+ taker_decimals,
+ taker_usd_price,
+ taker_txn_usd_value,
+ maker_symbol,
+ maker_name,
+ maker_decimals,
+ maker_usd_price,
+ maker_txn_usd_value
+ )
+ (SELECT
+ events.timestamp,
+ events.event_type,
+ events.error_id,
+ events.order_hash,
+ events.maker,
+ events.maker_amount,
+ events.maker_fee,
+ events.maker_token,
+ events.taker,
+ events.taker_amount,
+ events.taker_fee,
+ events.taker_token,
+ events.txn_hash,
+ events.fee_recipient,
+ events.block_number,
+ events.log_index,
+ events.gas_used,
+ events.gas_price,
+ events.method_id,
+ events.salt,
+ taker_token_prices.symbol,
+ taker_token_prices.name,
+ taker_token_prices.decimals,
+ taker_token_prices.price,
+ (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price,
+ maker_token_prices.symbol,
+ maker_token_prices.name,
+ maker_token_prices.decimals,
+ maker_token_prices.price,
+ (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price
+ FROM
+ events
+ LEFT JOIN
+ (SELECT
+ tokens.address,
+ tokens.name,
+ tokens.symbol,
+ tokens.decimals,
+ prices.timestamp,
+ prices.price
+ FROM
+ tokens
+ LEFT JOIN
+ prices
+ ON
+ tokens.symbol = prices.symbol) taker_token_prices
+ ON
+ (events.taker_token = taker_token_prices.address
+ AND
+ (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL))
+ LEFT JOIN
+ (SELECT
+ tokens.address,
+ tokens.name,
+ tokens.symbol,
+ tokens.decimals,
+ prices.timestamp,
+ prices.price
+ FROM
+ tokens
+ LEFT JOIN
+ prices
+ ON
+ tokens.symbol = prices.symbol) maker_token_prices
+ ON
+ (events.maker_token = maker_token_prices.address
+ AND
+ (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL))
+ WHERE
+ events.block_number >= $1
+ AND
+ events.block_number <= $2
+ ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
+};
+if (cli.name) {
+ const query = dataInsertionQueries[cli.name];
+ if (query && cli.from) {
+ const fromBlock = cli.from;
+ const toBlock = cli.to ? cli.to : cli.from + 1;
+ postgresClient
+ .query(query, [fromBlock, toBlock])
+ .then((data: any) => {
+ console.log(data);
+ })
+ .catch((err: any) => {
+ console.error(err);
+ });
+ }
+}
diff --git a/packages/pipeline/src/scripts/query_data.ts b/packages/pipeline/src/scripts/query_data.ts
new file mode 100644
index 000000000..97e3749ea
--- /dev/null
+++ b/packages/pipeline/src/scripts/query_data.ts
@@ -0,0 +1,87 @@
+import { formatters } from '../utils';
+export const dataFetchingQueries: any = {
+ get_missing_txn_hashes: `
+ SELECT
+ a.txn_hash
+ FROM
+ events_raw a
+ WHERE NOT EXISTS
+ (
+ SELECT
+ *
+ FROM
+ transactions b
+ WHERE
+ b.txn_hash = a.txn_hash
+ )
+ AND
+ a.block_number >= $1
+ AND
+ a.block_number < $2`,
+ get_used_block_numbers: `
+ SELECT DISTINCT
+ a.block_number
+ FROM
+ events_raw a
+ WHERE NOT EXISTS
+ (
+ SELECT
+ *
+ FROM
+ blocks b
+ WHERE
+ b.block_number = a.block_number
+ )
+ AND
+ a.block_number >= $1
+ AND
+ a.block_number < $2`,
+ get_token_registry: `
+ SELECT
+ *
+ FROM
+ tokens`,
+ get_max_block: `
+ SELECT
+ MAX(block_number)
+ FROM
+ events_raw`,
+ get_relayers: `
+ SELECT
+ *
+ FROM
+ relayers`,
+ get_most_recent_pricing_date: `
+ SELECT
+ MAX(DATE(timestamp))
+ FROM
+ prices
+ `,
+ get_top_unknown_token_addresses: `
+ SELECT a.token_address as address, a.txn_value / 2 as total_txn_value
+FROM
+(SELECT token_address, SUM(txn_value) as txn_value
+FROM
+(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL
+ THEN a.taker_txn_usd_value
+ ELSE a.maker_txn_usd_value END) as txn_value
+ from events_full a
+ where a.event_type = 'LogFill'
+ and a.timestamp > (NOW() + INTERVAL '-24 hours')
+ union
+ select a.timestamp, a.taker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL
+ THEN a.taker_txn_usd_value
+ ELSE a.maker_txn_usd_value END) as txn_value
+ from events_full a
+ where a.event_type = 'LogFill'
+ and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values
+WHERE token_address IS NOT NULL
+AND txn_value > 0
+GROUP BY 1
+ORDER BY 2 DESC) a
+LEFT JOIN tokens b
+ON a.token_address = b.address
+WHERE symbol is NULL
+ORDER BY 2 DESC
+`,
+};
diff --git a/packages/pipeline/src/scripts/scrape_data.ts b/packages/pipeline/src/scripts/scrape_data.ts
new file mode 100644
index 000000000..963670b47
--- /dev/null
+++ b/packages/pipeline/src/scripts/scrape_data.ts
@@ -0,0 +1,649 @@
+import { ExchangeEvents, ZeroEx } from '0x.js';
+import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect';
+import * as Airtable from 'airtable';
+import * as commandLineArgs from 'command-line-args';
+import * as _ from 'lodash';
+import * as querystring from 'querystring';
+import * as queue from 'queue';
+import * as request from 'request';
+import * as rpn from 'request-promise-native';
+
+import { HttpRequestOptions } from '../../../connect/lib/src/types.js';
+import { relayer } from '../models/relayer.js';
+import { token } from '../models/tokens.js';
+import { postgresClient } from '../postgres.js';
+import { typeConverters } from '../utils.js';
+import { web3, zrx } from '../zrx.js';
+
+import { insertDataScripts } from './create_tables.js';
+import { dataFetchingQueries } from './query_data.js';
+const optionDefinitions = [
+ { name: 'from', alias: 'f', type: Number },
+ { name: 'to', alias: 't', type: Number },
+ { name: 'type', type: String },
+ { name: 'id', type: String },
+ { name: 'force', type: Boolean },
+ { name: 'token', type: String },
+];
+const cli = commandLineArgs(optionDefinitions);
+const q = queue({ concurrency: 6, autostart: true });
+const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE);
+const BLOCK_INCREMENTS = 1000;
+const BASE_SYMBOL = 'USD'; // use USD as base currency against
+const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days
+const SECONDS_PER_DAY = 86400;
+const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical';
+const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json';
+const METAMASK_ETH_CONTRACT_METADATA_JSON =
+ 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json';
+const ETHPLORER_BASE_URL = 'http://api.ethplorer.io';
+const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`;
+// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday';
+const AIRTABLE_RELAYER_INFO = 'Relayer Info';
+export const pullDataScripts = {
+ getAllEvents(fromBlockNumber: number, toBlockNumber: number): any {
+ return new Promise((resolve, reject) => {
+ const getLogsPromises: any[] = [];
+ getLogsPromises.push(
+ zrx.exchange.getLogsAsync(
+ ExchangeEvents.LogFill,
+ { fromBlock: fromBlockNumber, toBlock: toBlockNumber },
+ {},
+ ),
+ zrx.exchange.getLogsAsync(
+ ExchangeEvents.LogCancel,
+ { fromBlock: fromBlockNumber, toBlock: toBlockNumber },
+ {},
+ ),
+ zrx.exchange.getLogsAsync(
+ ExchangeEvents.LogError,
+ { fromBlock: fromBlockNumber, toBlock: toBlockNumber },
+ {},
+ ),
+ );
+ Promise.all(getLogsPromises)
+ .then((data: any[]) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ reject(err);
+ });
+ });
+ },
+ getBlockInfo(blockNumber: number): any {
+ return new Promise((resolve, reject) => {
+ web3.eth.getBlock(blockNumber, (err, result) => {
+ if (err) {
+ reject(err);
+ } else {
+ resolve(result);
+ }
+ });
+ });
+ },
+ getTransactionInfo(transactionHash: string): any {
+ return new Promise((resolve, reject) => {
+ web3.eth.getTransaction(transactionHash, (err, result) => {
+ if (err) {
+ reject(err);
+ } else {
+ resolve(result);
+ }
+ });
+ });
+ },
+ getTokenRegistry(): any {
+ return new Promise((resolve, reject) => {
+ zrx.tokenRegistry
+ .getTokensAsync()
+ .then((data: any) => {
+ resolve(data);
+ })
+ .catch((err: any) => {
+ reject(err);
+ });
+ });
+ },
+ getMetaMaskTokens(): any {
+ return new Promise((resolve, reject) => {
+ request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve(JSON.parse(body));
+ }
+ });
+ });
+ },
+ getEthplorerTopTokens(): any {
+ return new Promise((resolve, reject) => {
+ request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve(JSON.parse(body));
+ }
+ });
+ });
+ },
+ getEthplorerToken(tokenAddress: string): any {
+ return new Promise((resolve, reject) => {
+ const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`;
+ request(url, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ try {
+ const json = JSON.parse(body);
+ resolve(json);
+ } catch (err) {
+ resolve({ error: 'error' });
+ }
+ }
+ });
+ });
+ },
+ getPriceData(symbol: string, timestamp: number, timeDelay?: number): any {
+ return new Promise((resolve, reject) => {
+ if (symbol === 'WETH') {
+ symbol = 'ETH';
+ }
+ let parsedParams = querystring.stringify({
+ fsym: symbol,
+ tsyms: 'USD',
+ ts: timestamp / 1000,
+ });
+ console.debug(parsedParams);
+ setTimeout(() => {
+ request(PRICE_API_ENDPOINT + '?' + parsedParams, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve(JSON.parse(body));
+ }
+ });
+ }, timeDelay);
+ });
+ },
+ getRelayers(): any {
+ return new Promise((resolve, reject) => {
+ request(RELAYER_REGISTRY_JSON, (error, response, body) => {
+ if (error) {
+ reject(error);
+ } else {
+ resolve(JSON.parse(body));
+ }
+ });
+ });
+ },
+ async getOrderBook(sraEndpoint: string): Promise<Object> {
+ const relayerClient = new HttpClient(sraEndpoint);
+ const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync();
+ const fullOrderBook: OrderbookResponse[] = [];
+ for (const tokenPair of tokenResponse) {
+ const orderBookRequest: OrderbookRequest = {
+ baseTokenAddress: tokenPair.tokenA.address,
+ quoteTokenAddress: tokenPair.tokenB.address,
+ };
+ const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest);
+ fullOrderBook.push(orderBook);
+ }
+ return fullOrderBook;
+ },
+ // async getHistoricalPrices(
+ // fromSymbol: string,
+ // toSymbol: string,
+ // fromTimestamp: number,
+ // toTimestamp: number,
+ // ): Promise<HistoricalPriceResponse> {
+ // const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY));
+ // if(fromSymbol === 'WETH') {
+ // fromSymbol = 'ETH';
+ // }
+ // var parsedParams = {
+ // fsym: fromSymbol,
+ // tsym: toSymbol,
+ // limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT),
+ // toTs: toTimestamp,
+ // };
+ // var options = {
+ // uri: HIST_PRICE_API_ENDPOINT,
+ // qs: parsedParams,
+ // json: false,
+ // };
+ // try {
+ // const response = await rpn(options);
+ // return Promise.resolve(JSON.parse(response));
+ // } catch (error) {
+ // console.debug(error);
+ // return Promise.reject(error);
+ // }
+ // },
+};
+export const scrapeDataScripts = {
+ scrapeAllPricesToDB(fromTime: number, toTime: number) {
+ const fromDate = new Date(fromTime);
+ fromDate.setUTCHours(0);
+ fromDate.setUTCMinutes(0);
+ fromDate.setUTCSeconds(0);
+ fromDate.setUTCMilliseconds(0);
+ const toDate = new Date(toTime);
+ postgresClient
+ .query(dataFetchingQueries.get_token_registry, [])
+ .then((result: any) => {
+ for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) {
+ for (const token of Object.values(result.rows)) {
+ console.debug('Scraping ' + curDate + ' ' + token);
+ q.push(_scrapePriceToDB(curDate.getTime(), token, 500));
+ }
+ }
+ })
+ .catch((err: any) => {
+ console.debug(err);
+ });
+ },
+};
+function _scrapeEventsToDB(fromBlock: number, toBlock: number): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getAllEvents(fromBlock, toBlock)
+ .then((data: any) => {
+ const parsedEvents: any = {};
+ parsedEvents[ExchangeEvents.LogFill] = [];
+ parsedEvents[ExchangeEvents.LogCancel] = [];
+ parsedEvents[ExchangeEvents.LogError] = [];
+ for (const index in data) {
+ for (const datum of data[index]) {
+ const event = typeConverters.convertLogEventToEventObject(datum);
+ parsedEvents[event.event_type].push(event);
+ }
+ }
+ console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length);
+ for (const event_type in parsedEvents) {
+ if (parsedEvents[event_type].length > 0) {
+ insertDataScripts
+ .insertMultipleRows(
+ 'events_raw',
+ parsedEvents[event_type],
+ Object.keys(parsedEvents[event_type][0]),
+ )
+ .then(() => {})
+ .catch((error: any) => {});
+ }
+ }
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeBlockToDB(block: number): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getBlockInfo(block)
+ .then((data: any) => {
+ const parsedBlock = typeConverters.convertLogBlockToBlockObject(data);
+ insertDataScripts
+ .insertSingleRow('blocks', parsedBlock)
+ .then((result: any) => {
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+// function _scrapeAllRelayersToDB(): any {
+// return (cb: () => void) => {
+// airtableBase(AIRTABLE_RELAYER_INFO)
+// .select()
+// .eachPage((records: any, fetchNextPage: () => void) => {
+// const parsedRelayers: any[] = [];
+// for(const record of records) {
+// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record));
+// }
+// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0]))
+// .then((result: any) => {
+// cb();
+// })
+// .catch((err: any) => {
+// cb();
+// });
+// })
+// .catch((err: any) => {
+// cb();
+// });
+// };
+// }
+function _scrapeAllRelayersToDB(): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getRelayers()
+ .then((relayers: any[]) => {
+ console.log(relayers);
+ const parsedRelayers: any[] = [];
+ for (const relayer of relayers) {
+ parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer));
+ }
+ console.log(parsedRelayers);
+ insertDataScripts
+ .insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties))
+ .then((result: any) => {
+ console.log(result);
+ cb();
+ })
+ .catch((err: any) => {
+ console.log(err);
+ cb();
+ });
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeTransactionToDB(transactionHash: string): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getTransactionInfo(transactionHash)
+ .then((data: any) => {
+ const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data);
+ insertDataScripts
+ .insertSingleRow('transactions', parsedTransaction)
+ .then((result: any) => {
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeTokenRegistryToDB(): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getTokenRegistry()
+ .then((data: any) => {
+ const parsedTokens: any = [];
+ for (const token of data) {
+ parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token));
+ }
+ insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeMetaMaskEthContractMetadataToDB(): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getMetaMaskTokens()
+ .then((data: any) => {
+ const parsedTokens: any = [];
+ const dataArray = _.map(_.keys(data), (tokenAddress: string) => {
+ const value = _.get(data, tokenAddress);
+ return {
+ address: tokenAddress,
+ ...value,
+ };
+ });
+ const erc20TokensOnly = _.filter(dataArray, entry => {
+ const isErc20 = _.get(entry, 'erc20');
+ return isErc20;
+ });
+ for (const token of erc20TokensOnly) {
+ parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token));
+ }
+ insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeEthplorerTopTokensToDB(): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getEthplorerTopTokens()
+ .then((data: any) => {
+ const parsedTokens: any = [];
+ const tokens = _.get(data, 'tokens');
+ for (const token of tokens) {
+ parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token));
+ }
+ insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapeUnknownTokenInformationToDB(): any {
+ return (cb: () => void) => {
+ postgresClient
+ .query(dataFetchingQueries.get_top_unknown_token_addresses)
+ .then(async (result: any) => {
+ const addresses = _.map(result.rows, row => _.get(row, 'address'));
+ const responses = await Promise.all(
+ _.map(addresses, address => pullDataScripts.getEthplorerToken(address)),
+ );
+ const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error')));
+ const parsedTokens = _.map(tokens, tokenInfo =>
+ typeConverters.convertEthplorerTokenToTokenObject(tokenInfo),
+ );
+ insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
+ cb();
+ })
+ .catch((err: any) => {
+ cb();
+ });
+ };
+}
+function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getPriceData(token.symbol, timestamp, timeDelay)
+ .then((data: any) => {
+ const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol;
+ const parsedPrice = {
+ timestamp: timestamp / 1000,
+ symbol: token.symbol,
+ base: 'USD',
+ price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0,
+ };
+ console.debug('Inserting ' + timestamp);
+ console.debug(parsedPrice);
+ insertDataScripts.insertSingleRow('prices', parsedPrice);
+ cb();
+ })
+ .catch((err: any) => {
+ console.debug(err);
+ cb();
+ });
+ };
+}
+// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any {
+// return (cb: () => void) => {
+// pullDataScripts
+// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp)
+// .then((data: any) => {
+// const parsedHistoricalPrices: any = [];
+// for (const historicalPrice of data['Data']) {
+// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice);
+// parsedHistoricalPrice['token'] = token;
+// parsedHistoricalPrice['base'] = BASE_SYMBOL;
+// parsedHistoricalPrices.push(parsedHistoricalPrice);
+// }
+// if (parsedHistoricalPrices.length > 0) {
+// insertDataScripts
+// .insertMultipleRows(
+// 'historical_prices',
+// parsedHistoricalPrices,
+// Object.keys(parsedHistoricalPrices[0]),
+// )
+// .catch((error: any) => {
+// console.error(error);
+// });
+// }
+// cb();
+// })
+// .catch((error: any) => {
+// console.error(error);
+// cb();
+// });
+// };
+// }
+function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any {
+ return (cb: () => void) => {
+ pullDataScripts
+ .getOrderBook(sraEndpoint)
+ .then((data: any) => {
+ for (const book of data) {
+ for (const order of book.bids) {
+ console.debug(order);
+ const parsedOrder = typeConverters.convertLogOrderToOrderObject(order);
+ parsedOrder.relayer_id = id;
+ parsedOrder.order_hash = ZeroEx.getOrderHashHex(order);
+ insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => {
+ console.error(error);
+ });
+ }
+ for (const order of book.asks) {
+ console.debug(order);
+ const parsedOrder = typeConverters.convertLogOrderToOrderObject(order);
+ parsedOrder.relayer_id = id;
+ parsedOrder.order_hash = ZeroEx.getOrderHashHex(order);
+ insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => {
+ console.error(error);
+ });
+ }
+ }
+ cb();
+ })
+ .catch((error: any) => {
+ console.error(error);
+ cb();
+ });
+ };
+}
+if (cli.type === 'events') {
+ if (cli.from && cli.to) {
+ const destToBlock = cli.to ? cli.to : cli.from;
+ let curFromBlock = cli.from;
+ let curToBlock = curFromBlock;
+ do {
+ curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS;
+ q.push(_scrapeEventsToDB(curFromBlock, curToBlock));
+ curFromBlock = curToBlock + 1;
+ } while (curToBlock < destToBlock);
+ }
+} else if (cli.type === 'blocks') {
+ if (cli.from && cli.to) {
+ if (cli.force) {
+ const destToBlock = cli.to ? cli.to : cli.from;
+ let curFromBlock = cli.from;
+ const curToBlock = curFromBlock;
+ for (; curFromBlock < destToBlock; curFromBlock++) {
+ q.push(_scrapeBlockToDB(curFromBlock));
+ }
+ } else {
+ const fetchFrom = cli.from;
+ const fetchTo = cli.to ? cli.to : cli.from + 1;
+ postgresClient
+ .query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo])
+ .then((data: any) => {
+ for (const row of data.rows) {
+ q.push(_scrapeBlockToDB(row.block_number));
+ }
+ })
+ .catch((err: any) => {
+ // console.debug(err);
+ });
+ }
+ }
+} else if (cli.type === 'transactions') {
+ if (cli.id) {
+ q.push(_scrapeTransactionToDB(cli.id));
+ } else if (cli.from) {
+ const fetchFrom = cli.from;
+ const fetchTo = cli.to ? cli.to : cli.from + 1;
+ postgresClient
+ .query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo])
+ .then((data: any) => {
+ for (const row of data.rows) {
+ q.push(_scrapeTransactionToDB(row.txn_hash));
+ }
+ })
+ .catch((err: any) => {
+ // console.debug(err);
+ });
+ }
+} else if (cli.type === 'tokens') {
+ q.push(_scrapeMetaMaskEthContractMetadataToDB());
+ q.push(_scrapeEthplorerTopTokensToDB());
+} else if (cli.type === 'unknown_tokens') {
+ q.push(_scrapeUnknownTokenInformationToDB());
+} else if (cli.type === 'prices' && cli.from && cli.to) {
+ const fromDate = new Date(cli.from);
+ console.debug(fromDate);
+ fromDate.setUTCHours(0);
+ fromDate.setUTCMinutes(0);
+ fromDate.setUTCSeconds(0);
+ fromDate.setUTCMilliseconds(0);
+ console.debug(fromDate);
+ const toDate = new Date(cli.to);
+ postgresClient
+ .query(dataFetchingQueries.get_token_registry, [])
+ .then((result: any) => {
+ for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) {
+ for (const token of Object.values(result.rows)) {
+ console.debug('Scraping ' + curDate + ' ' + token);
+ q.push(_scrapePriceToDB(curDate.getTime(), token));
+ }
+ }
+ })
+ .catch((err: any) => {
+ console.debug(err);
+ });
+ // } else if (cli.type === 'historical_prices') {
+ // if (cli.token && cli.from && cli.to) {
+ // q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to));
+ // }
+ // } else if (cli.type === 'all_historical_prices') {
+ // if (cli.from && cli.to) {
+ // postgresClient
+ // .query(dataFetchingQueries.get_token_registry, [])
+ // .then((result: any) => {
+ // const curTokens: any = result.rows.map((a: any): any => a.symbol);
+ // for (const curToken of curTokens) {
+ // console.debug('Historical data backfill: Pushing coin ' + curToken);
+ // q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to));
+ // }
+ // })
+ // .catch((err: any) => {
+ // console.debug(err);
+ // });
+ // }
+} else if (cli.type === 'relayers') {
+ q.push(_scrapeAllRelayersToDB());
+} else if (cli.type === 'orders') {
+ postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => {
+ for (const relayer of result.rows) {
+ if (relayer.sra_http_url) {
+ q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url));
+ }
+ }
+ });
+}
diff --git a/packages/pipeline/src/utils.ts b/packages/pipeline/src/utils.ts
new file mode 100644
index 000000000..3474cd16b
--- /dev/null
+++ b/packages/pipeline/src/utils.ts
@@ -0,0 +1,181 @@
+import * as _ from 'lodash';
+
+import { block, logToBlockSchemaMapping } from './models/block';
+import { event, logToEventSchemaMapping } from './models/event';
+import { historicalPrices, logToHistoricalPricesSchema } from './models/historical_prices';
+import { logToOrderSchemaMapping, order } from './models/order';
+import { logToRelayerSchemaMapping } from './models/relayer';
+import { logToTokenSchemaMapping, token } from './models/tokens';
+import { logToTransactionSchemaMapping, transaction } from './models/transaction';
+export const typeConverters = {
+ convertLogEventToEventObject(log: any): any {
+ const newEvent: any = {};
+ for (const key in logToEventSchemaMapping) {
+ if (_.has(log, key)) {
+ newEvent[logToEventSchemaMapping[key]] = _.get(log, key);
+ if (newEvent[logToEventSchemaMapping[key]].constructor.name === 'BigNumber') {
+ newEvent[logToEventSchemaMapping[key]] = newEvent[logToEventSchemaMapping[key]].toString();
+ }
+ }
+ }
+ return newEvent;
+ },
+ convertLogBlockToBlockObject(logBlock: any): any {
+ const newBlock: any = {};
+ for (const key in logToBlockSchemaMapping) {
+ if (_.has(logBlock, key)) {
+ newBlock[logToBlockSchemaMapping[key]] = _.get(logBlock, key);
+ if (newBlock[logToBlockSchemaMapping[key]].constructor.name === 'BigNumber') {
+ newBlock[logToBlockSchemaMapping[key]] = newBlock[logToBlockSchemaMapping[key]].toString();
+ }
+ }
+ }
+ return newBlock;
+ },
+ convertLogTokenToTokenObject(logToken: any): any {
+ const newToken: any = {};
+ for (const key in logToTokenSchemaMapping) {
+ if (_.has(logToken, key)) {
+ newToken[logToTokenSchemaMapping[key]] = _.get(logToken, key);
+ if (newToken[logToTokenSchemaMapping[key]].constructor.name === 'BigNumber') {
+ newToken[logToTokenSchemaMapping[key]] = newToken[logToTokenSchemaMapping[key]].toString();
+ }
+ }
+ }
+ newToken[logToTokenSchemaMapping.address] = newToken[logToTokenSchemaMapping.address].toLowerCase();
+ return newToken;
+ },
+ convertMetaMaskTokenToTokenObject(metaMaskToken: any): any {
+ const newToken: any = {};
+ for (const key in logToTokenSchemaMapping) {
+ if (_.has(metaMaskToken, key)) {
+ newToken[logToTokenSchemaMapping[key]] = _.get(metaMaskToken, key);
+ }
+ }
+ newToken[logToTokenSchemaMapping.address] = newToken[logToTokenSchemaMapping.address].toLowerCase();
+ console.log(newToken);
+ return newToken;
+ },
+ convertEthplorerTokenToTokenObject(ethplorerToken: any): any {
+ const newToken: any = {};
+ for (const key in logToTokenSchemaMapping) {
+ if (_.has(ethplorerToken, key)) {
+ newToken[logToTokenSchemaMapping[key]] = _.get(ethplorerToken, key);
+ }
+ }
+ newToken[logToTokenSchemaMapping.address] = newToken[logToTokenSchemaMapping.address].toLowerCase();
+ return newToken;
+ },
+ convertLogTransactionToTransactionObject(logTransaction: any): any {
+ const newTransaction: any = {};
+ for (const key in logToTransactionSchemaMapping) {
+ if (_.has(logTransaction, key)) {
+ newTransaction[logToTransactionSchemaMapping[key]] = _.get(logTransaction, key);
+ if (newTransaction[logToTransactionSchemaMapping[key]].constructor.name === 'BigNumber') {
+ newTransaction[logToTransactionSchemaMapping[key]] = newTransaction[
+ logToTransactionSchemaMapping[key]
+ ].toString();
+ }
+ } else {
+ if (key === 'method_id') {
+ newTransaction[logToTransactionSchemaMapping[key]] = logTransaction.input.substring(0, 10);
+ } else if (key === 'salt') {
+ newTransaction[logToTransactionSchemaMapping[key]] =
+ '0x' + logTransaction.input.substring(714, 778); // Only God can judge me
+ }
+ }
+ }
+ return newTransaction;
+ },
+ // convertRelayerToRelayerObject(logRelayer: any): any {
+ // const newRelayer: any = {};
+ // for (const key in logToRelayerSchemaMapping) {
+ // if (_.has(logRelayer, key)) {
+ // newRelayer[logToRelayerSchemaMapping[key]] = _.get(logRelayer, key);
+ // if (newRelayer[logToRelayerSchemaMapping[key]].constructor.name === 'BigNumber') {
+ // newRelayer[logToRelayerSchemaMapping[key]] = newRelayer[logToRelayerSchemaMapping[key]].toString();
+ // }
+ // } else if((logToRelayerSchemaMapping[key] === 'known_fee_addresses' || logToRelayerSchemaMapping[key] === 'known_taker_addresses')) {
+ // newRelayer[logToRelayerSchemaMapping[key]] = '{}';
+ // } else {
+ // newRelayer[logToRelayerSchemaMapping[key]] = '';
+ // }
+ // }
+ // return newRelayer;
+ // },
+ convertRelayerToRelayerObject(logRelayer: any): any {
+ const newRelayer: any = {};
+ for (const key in logToRelayerSchemaMapping) {
+ if (_.has(logRelayer, key)) {
+ newRelayer[logToRelayerSchemaMapping[key]] = _.get(logRelayer, key);
+ if (newRelayer[logToRelayerSchemaMapping[key]].constructor.name === 'BigNumber') {
+ newRelayer[logToRelayerSchemaMapping[key]] = newRelayer[logToRelayerSchemaMapping[key]].toString();
+ }
+ } else if (
+ logToRelayerSchemaMapping[key] === 'known_fee_addresses' ||
+ logToRelayerSchemaMapping[key] === 'known_taker_addresses'
+ ) {
+ newRelayer[logToRelayerSchemaMapping[key]] = '{}';
+ } else {
+ newRelayer[logToRelayerSchemaMapping[key]] = '';
+ }
+ }
+ if (_.has(logRelayer, 'networks')) {
+ for (const network of logRelayer.networks) {
+ if (network.networkId === 1) {
+ if (_.has(network, 'sra_http_endpoint')) {
+ newRelayer.sra_http_endpoint = network.sra_http_endpoint;
+ }
+ if (_.has(network, 'sra_ws_endpoint')) {
+ newRelayer.sra_ws_endpoint = network.sra_ws_endpoint;
+ }
+ if (_.has(network, 'static_order_fields')) {
+ if (_.has(network, 'static_order_fields.fee_recipient_addresses')) {
+ newRelayer.fee_recipient_addresses = network.static_order_fields.fee_recipient_addresses;
+ }
+ if (_.has(network, 'static_order_fields.taker_addresses')) {
+ newRelayer.taker_addresses = network.static_order_fields.taker_addresses;
+ }
+ }
+ }
+ }
+ }
+ return newRelayer;
+ },
+ convertLogHistoricalPricesToHistoricalPricesObject(logHistoricalPrice: any): any {
+ const newHistoricalPrices: any = {};
+ for (const key in logToHistoricalPricesSchema) {
+ if (_.has(logHistoricalPrice, key)) {
+ newHistoricalPrices[logToHistoricalPricesSchema[key]] = _.get(logHistoricalPrice, key);
+ }
+ }
+ return newHistoricalPrices;
+ },
+ convertLogOrderToOrderObject(logOrder: any): any {
+ const newOrder: any = {};
+ for (const key in logToOrderSchemaMapping) {
+ if (_.has(logOrder, key)) {
+ console.log(key);
+ console.log(logOrder[key]);
+ newOrder[logToOrderSchemaMapping[key]] = _.get(logOrder, key);
+ if (newOrder[logToOrderSchemaMapping[key]].constructor.name === 'BigNumber') {
+ newOrder[logToOrderSchemaMapping[key]] = newOrder[logToOrderSchemaMapping[key]].toString();
+ }
+ }
+ }
+ console.log(newOrder);
+ return newOrder;
+ },
+};
+export const formatters = {
+ escapeSQLParams(params: any[]): string {
+ let escapedString = '';
+ for (const i in params) {
+ escapedString += "'" + params[i] + "',";
+ }
+ return escapedString.slice(0, -1);
+ },
+ escapeSQLParam(param: string): string {
+ return "'" + param + "'";
+ },
+};
diff --git a/packages/pipeline/src/zrx.ts b/packages/pipeline/src/zrx.ts
new file mode 100644
index 000000000..cbe10a55a
--- /dev/null
+++ b/packages/pipeline/src/zrx.ts
@@ -0,0 +1,11 @@
+import { ExchangeEvents, ZeroEx } from '0x.js';
+import * as dotenv from 'dotenv';
+import * as Web3 from 'web3';
+dotenv.config();
+const provider = new Web3.providers.HttpProvider(process.env.WEB3_PROVIDER_URL);
+const web3 = new Web3(provider);
+const MAINNET = 1;
+const zrx = new ZeroEx(provider, {
+ networkId: MAINNET,
+});
+export { web3, zrx };