aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src/scripts/join_tables.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline/src/scripts/join_tables.ts')
-rw-r--r--packages/pipeline/src/scripts/join_tables.ts234
1 files changed, 234 insertions, 0 deletions
diff --git a/packages/pipeline/src/scripts/join_tables.ts b/packages/pipeline/src/scripts/join_tables.ts
new file mode 100644
index 000000000..e7c05b39a
--- /dev/null
+++ b/packages/pipeline/src/scripts/join_tables.ts
@@ -0,0 +1,234 @@
+import * as commandLineArgs from 'command-line-args';
+
+import { postgresClient } from '../postgres';
+import { formatters } from '../utils';
+const optionDefinitions = [
+ { name: 'name', alias: 'n', type: String },
+ { name: 'from', alias: 'f', type: Number },
+ { name: 'to', alias: 't', type: Number },
+];
+const cli = commandLineArgs(optionDefinitions);
+const dataInsertionQueries: any = {
+ events_staging: `INSERT INTO events_staging (
+ timestamp,
+ event_type,
+ error_id,
+ order_hash,
+ maker,
+ maker_amount,
+ maker_fee,
+ maker_token,
+ taker,
+ taker_amount,
+ taker_fee,
+ taker_token,
+ txn_hash,
+ fee_recipient,
+ block_number,
+ log_index
+ )
+ (SELECT
+ b.timestamp,
+ a.event_type,
+ a.error_id,
+ a.order_hash,
+ a.maker,
+ a.maker_amount,
+ a.maker_fee,
+ a.maker_token,
+ a.taker,
+ a.taker_amount,
+ a.taker_fee,
+ a.taker_token,
+ a.txn_hash,
+ a.fee_recipient,
+ a.block_number,
+ a.log_index
+ FROM
+ events_raw a
+ JOIN
+ blocks b
+ ON
+ a.block_number = b.block_number
+ AND
+ b.block_number >= $1
+ AND
+ b.block_number <= $2
+ ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
+ events: `INSERT INTO events (
+ timestamp,
+ event_type,
+ error_id,
+ order_hash,
+ maker,
+ maker_amount,
+ maker_fee,
+ maker_token,
+ taker,
+ taker_amount,
+ taker_fee,
+ taker_token,
+ txn_hash,
+ fee_recipient,
+ block_number,
+ log_index,
+ gas_used,
+ gas_price,
+ method_id,
+ salt
+ )
+ (SELECT
+ a.timestamp,
+ a.event_type,
+ a.error_id,
+ a.order_hash,
+ a.maker,
+ a.maker_amount,
+ a.maker_fee,
+ a.maker_token,
+ a.taker,
+ a.taker_amount,
+ a.taker_fee,
+ a.taker_token,
+ a.txn_hash,
+ a.fee_recipient,
+ a.block_number,
+ a.log_index,
+ b.gas_used,
+ b.gas_price,
+ b.method_id,
+ b.salt
+ FROM
+ events_staging a
+ JOIN
+ transactions b
+ ON
+ a.txn_hash = b.txn_hash
+ AND
+ a.block_number >= $1
+ AND
+ a.block_number <= $2
+ ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
+ events_full: `
+ INSERT INTO events_full (
+ timestamp,
+ event_type,
+ error_id,
+ order_hash,
+ maker,
+ maker_amount,
+ maker_fee,
+ maker_token,
+ taker,
+ taker_amount,
+ taker_fee,
+ taker_token,
+ txn_hash,
+ fee_recipient,
+ block_number,
+ log_index,
+ gas_used,
+ gas_price,
+ method_id,
+ salt,
+ taker_symbol,
+ taker_name,
+ taker_decimals,
+ taker_usd_price,
+ taker_txn_usd_value,
+ maker_symbol,
+ maker_name,
+ maker_decimals,
+ maker_usd_price,
+ maker_txn_usd_value
+ )
+ (SELECT
+ events.timestamp,
+ events.event_type,
+ events.error_id,
+ events.order_hash,
+ events.maker,
+ events.maker_amount,
+ events.maker_fee,
+ events.maker_token,
+ events.taker,
+ events.taker_amount,
+ events.taker_fee,
+ events.taker_token,
+ events.txn_hash,
+ events.fee_recipient,
+ events.block_number,
+ events.log_index,
+ events.gas_used,
+ events.gas_price,
+ events.method_id,
+ events.salt,
+ taker_token_prices.symbol,
+ taker_token_prices.name,
+ taker_token_prices.decimals,
+ taker_token_prices.price,
+ (events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price,
+ maker_token_prices.symbol,
+ maker_token_prices.name,
+ maker_token_prices.decimals,
+ maker_token_prices.price,
+ (events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price
+ FROM
+ events
+ LEFT JOIN
+ (SELECT
+ tokens.address,
+ tokens.name,
+ tokens.symbol,
+ tokens.decimals,
+ prices.timestamp,
+ prices.price
+ FROM
+ tokens
+ LEFT JOIN
+ prices
+ ON
+ tokens.symbol = prices.symbol) taker_token_prices
+ ON
+ (events.taker_token = taker_token_prices.address
+ AND
+ (DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL))
+ LEFT JOIN
+ (SELECT
+ tokens.address,
+ tokens.name,
+ tokens.symbol,
+ tokens.decimals,
+ prices.timestamp,
+ prices.price
+ FROM
+ tokens
+ LEFT JOIN
+ prices
+ ON
+ tokens.symbol = prices.symbol) maker_token_prices
+ ON
+ (events.maker_token = maker_token_prices.address
+ AND
+ (DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL))
+ WHERE
+ events.block_number >= $1
+ AND
+ events.block_number <= $2
+ ) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
+};
+if (cli.name) {
+ const query = dataInsertionQueries[cli.name];
+ if (query && cli.from) {
+ const fromBlock = cli.from;
+ const toBlock = cli.to ? cli.to : cli.from + 1;
+ postgresClient
+ .query(query, [fromBlock, toBlock])
+ .then((data: any) => {
+ console.log(data);
+ })
+ .catch((err: any) => {
+ console.error(err);
+ });
+ }
+}