aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r--packages/pipeline/src/data_sources/bloxy/index.ts133
-rw-r--r--packages/pipeline/src/entities/dex_trade.ts54
-rw-r--r--packages/pipeline/src/entities/index.ts7
-rw-r--r--packages/pipeline/src/ormconfig.ts4
-rw-r--r--packages/pipeline/src/parsers/bloxy/index.ts53
-rw-r--r--packages/pipeline/src/scripts/pull_competing_dex_trades.ts51
6 files changed, 298 insertions, 4 deletions
diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts
new file mode 100644
index 000000000..31cd5bfd6
--- /dev/null
+++ b/packages/pipeline/src/data_sources/bloxy/index.ts
@@ -0,0 +1,133 @@
+import axios from 'axios';
+import * as R from 'ramda';
+
+// URL to use for getting dex trades from Bloxy.
+export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades';
+// Number of trades to get at once. Must be less than or equal to MAX_OFFSET.
+const TRADES_PER_QUERY = 10000;
+// Maximum offset supported by the Bloxy API.
+const MAX_OFFSET = 100000;
+// Buffer to subtract from offset. This means we will request some trades twice
+// but we have less chance on missing out on any data.
+const OFFSET_BUFFER = 1000;
+// Maximum number of days supported by the Bloxy API.
+const MAX_DAYS = 30;
+// Buffer used for comparing the last seen timestamp to the last returned
+// timestamp. Increasing this reduces chances of data loss but also creates more
+// redundancy and can impact performance.
+// tslint:disable-next-line:custom-no-magic-numbers
+const LAST_SEEN_TIMESTAMP_BUFFER_MS = 1000 * 60 * 30; // 30 minutes
+
+// tslint:disable-next-line:custom-no-magic-numbers
+const millisecondsPerDay = 1000 * 60 * 60 * 24; // ms/d = ms/s * s/m * m/h * h/d
+
+export interface BloxyTrade {
+ tx_hash: string;
+ tx_time: string;
+ tx_date: string;
+ tx_sender: string;
+ smart_contract_id: number;
+ smart_contract_address: string;
+ contract_type: string;
+ maker: string;
+ taker: string;
+ amountBuy: number;
+ makerFee: number;
+ buyCurrencyId: number;
+ buySymbol: string;
+ amountSell: number;
+ takerFee: number;
+ sellCurrencyId: number;
+ sellSymbol: string;
+ maker_annotation: string;
+ taker_annotation: string;
+ protocol: string;
+ buyAddress: string | null;
+ sellAddress: string | null;
+}
+
+interface BloxyError {
+ error: string;
+}
+
+type BloxyResponse<T> = T | BloxyError;
+type BloxyTradeResponse = BloxyResponse<BloxyTrade[]>;
+
+function isError<T>(response: BloxyResponse<T>): response is BloxyError {
+ return (response as BloxyError).error !== undefined;
+}
+
+export class BloxySource {
+ private readonly _apiKey: string;
+
+ constructor(apiKey: string) {
+ this._apiKey = apiKey;
+ }
+
+ /**
+ * Gets all latest trades between the lastSeenTimestamp (minus some buffer)
+ * and the current time. Note that because the Bloxy API has some hard
+ * limits it might not always be possible to get *all* the trades in the
+ * desired time range.
+ * @param lastSeenTimestamp The latest timestamp for trades that have
+ * already been seen.
+ */
+ public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> {
+ let allTrades: BloxyTrade[] = [];
+
+ // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive)
+ const numberOfDays = R.clamp(1, MAX_DAYS, getDaysSinceTimestamp(lastSeenTimestamp));
+
+ // Keep getting trades until we hit one of the following conditions:
+ //
+ // 1. Offset hits MAX_OFFSET (we can't go back any further).
+ // 2. There are no more trades in the response.
+ // 3. We see a tx_time equal to or earlier than lastSeenTimestamp (plus
+ // some buffer).
+ //
+ for (let offset = 0; offset <= MAX_OFFSET; offset += TRADES_PER_QUERY - OFFSET_BUFFER) {
+ const trades = await this._getTradesWithOffsetAsync(numberOfDays, offset);
+ if (trades.length === 0) {
+ // There are no more trades left for the days we are querying.
+ // This means we are done.
+ return filterDuplicateTrades(allTrades);
+ }
+ const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades));
+ allTrades = allTrades.concat(sortedTrades);
+
+ // Check if lastReturnedTimestamp < lastSeenTimestamp
+ const lastReturnedTimestamp = new Date(sortedTrades[0].tx_time).getTime();
+ if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) {
+ // We are at the point where we have already seen trades for the
+ // timestamp range that is being returned. We're done.
+ return filterDuplicateTrades(allTrades);
+ }
+ }
+ return filterDuplicateTrades(allTrades);
+ }
+
+ private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> {
+ const resp = await axios.get<BloxyTradeResponse>(BLOXY_DEX_TRADES_URL, {
+ params: {
+ key: this._apiKey,
+ days: numberOfDays,
+ limit: TRADES_PER_QUERY,
+ offset,
+ },
+ });
+ if (isError(resp.data)) {
+ throw new Error('Error in Bloxy API response: ' + resp.data.error);
+ }
+ return resp.data;
+ }
+}
+
+// Computes the number of days between the given timestamp and the current
+// timestamp (rounded up).
+function getDaysSinceTimestamp(timestamp: number): number {
+ const msSinceTimestamp = Date.now() - timestamp;
+ const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay;
+ return Math.ceil(daysSinceTimestamp);
+}
+
+const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash);
diff --git a/packages/pipeline/src/entities/dex_trade.ts b/packages/pipeline/src/entities/dex_trade.ts
new file mode 100644
index 000000000..9d288cb51
--- /dev/null
+++ b/packages/pipeline/src/entities/dex_trade.ts
@@ -0,0 +1,54 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'dex_trades', schema: 'raw' })
+export class DexTrade {
+ @PrimaryColumn({ name: 'source_url' })
+ public sourceUrl!: string;
+ @PrimaryColumn({ name: 'tx_hash' })
+ public txHash!: string;
+
+ @Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer })
+ public txTimestamp!: number;
+ @Column({ name: 'tx_date' })
+ public txDate!: string;
+ @Column({ name: 'tx_sender' })
+ public txSender!: string;
+ @Column({ name: 'smart_contract_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public smartContractId!: number;
+ @Column({ name: 'smart_contract_address' })
+ public smartContractAddress!: string;
+ @Column({ name: 'contract_type' })
+ public contractType!: string;
+ @Column({ type: 'varchar' })
+ public maker!: string;
+ @Column({ type: 'varchar' })
+ public taker!: string;
+ @Column({ name: 'amount_buy', type: 'numeric', transformer: bigNumberTransformer })
+ public amountBuy!: BigNumber;
+ @Column({ name: 'maker_fee_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public makerFeeAmount!: BigNumber;
+ @Column({ name: 'buy_currency_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public buyCurrencyId!: number;
+ @Column({ name: 'buy_symbol' })
+ public buySymbol!: string;
+ @Column({ name: 'amount_sell', type: 'numeric', transformer: bigNumberTransformer })
+ public amountSell!: BigNumber;
+ @Column({ name: 'taker_fee_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public takerFeeAmount!: BigNumber;
+ @Column({ name: 'sell_currency_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public sellCurrencyId!: number;
+ @Column({ name: 'sell_symbol' })
+ public sellSymbol!: string;
+ @Column({ name: 'maker_annotation' })
+ public makerAnnotation!: string;
+ @Column({ name: 'taker_annotation' })
+ public takerAnnotation!: string;
+ @Column() public protocol!: string;
+ @Column({ name: 'buy_address', type: 'varchar', nullable: true })
+ public buyAddress!: string | null;
+ @Column({ name: 'sell_address', type: 'varchar', nullable: true })
+ public sellAddress!: string | null;
+}
diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts
index a8f159a2a..d3056a477 100644
--- a/packages/pipeline/src/entities/index.ts
+++ b/packages/pipeline/src/entities/index.ts
@@ -3,14 +3,15 @@ import { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event';
import { ExchangeFillEvent } from './exchange_fill_event';
export { Block } from './block';
+export { DexTrade } from './dex_trade';
export { ExchangeCancelEvent } from './exchange_cancel_event';
export { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event';
export { ExchangeFillEvent } from './exchange_fill_event';
+export { OHLCVExternal } from './ohlcv_external';
export { Relayer } from './relayer';
export { SraOrder } from './sra_order';
-export { Transaction } from './transaction';
-export { TokenMetadata } from './token_metadata';
export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp';
-export { OHLCVExternal } from './ohlcv_external';
+export { TokenMetadata } from './token_metadata';
+export { Transaction } from './transaction';
export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent;
diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts
index e8277a439..fd6c7c39b 100644
--- a/packages/pipeline/src/ormconfig.ts
+++ b/packages/pipeline/src/ormconfig.ts
@@ -2,6 +2,7 @@ import { ConnectionOptions } from 'typeorm';
import {
Block,
+ DexTrade,
ExchangeCancelEvent,
ExchangeCancelUpToEvent,
ExchangeFillEvent,
@@ -14,6 +15,7 @@ import {
const entities = [
Block,
+ DexTrade,
ExchangeCancelEvent,
ExchangeCancelUpToEvent,
ExchangeFillEvent,
@@ -28,7 +30,7 @@ const config: ConnectionOptions = {
type: 'postgres',
url: process.env.ZEROEX_DATA_PIPELINE_DB_URL,
synchronize: false,
- // logging: ['error'],
+ logging: ['error'],
entities,
migrations: ['./lib/migrations/**/*.js'],
};
diff --git a/packages/pipeline/src/parsers/bloxy/index.ts b/packages/pipeline/src/parsers/bloxy/index.ts
new file mode 100644
index 000000000..af07c7507
--- /dev/null
+++ b/packages/pipeline/src/parsers/bloxy/index.ts
@@ -0,0 +1,53 @@
+import { BigNumber } from '@0x/utils';
+import * as R from 'ramda';
+
+import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../data_sources/bloxy';
+import { DexTrade } from '../../entities';
+
+/**
+ * Parses a raw trades response from the Bloxy Dex API and returns an array of
+ * DexTrade entities.
+ * @param rawTrades A raw order response from an SRA endpoint.
+ */
+export function parseBloxyTrades(rawTrades: BloxyTrade[]): DexTrade[] {
+ return R.map(_parseBloxyTrade, rawTrades);
+}
+
+/**
+ * Converts a single Bloxy trade into a DexTrade entity.
+ * @param rawTrade A single trade from the response from the Bloxy API.
+ */
+export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade {
+ const dexTrade = new DexTrade();
+ dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL;
+ dexTrade.txHash = rawTrade.tx_hash;
+ dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime();
+ dexTrade.txDate = rawTrade.tx_date;
+ dexTrade.txSender = rawTrade.tx_sender;
+ dexTrade.smartContractId = rawTrade.smart_contract_id;
+ dexTrade.smartContractAddress = rawTrade.smart_contract_address;
+ dexTrade.contractType = rawTrade.contract_type;
+ dexTrade.maker = rawTrade.maker;
+ dexTrade.taker = rawTrade.taker;
+ // TODO(albrow): The Bloxy API returns amounts and fees as a `number` type
+ // but some of their values have too many significant digits to be
+ // represented that way. Ideally they will switch to using strings and then
+ // we can update this code.
+ dexTrade.amountBuy = new BigNumber(rawTrade.amountBuy.toString());
+ dexTrade.makerFeeAmount = new BigNumber(rawTrade.makerFee.toString());
+ dexTrade.buyCurrencyId = rawTrade.buyCurrencyId;
+ dexTrade.buySymbol = filterNullCharacters(rawTrade.buySymbol);
+ dexTrade.amountSell = new BigNumber(rawTrade.amountSell.toString());
+ dexTrade.takerFeeAmount = new BigNumber(rawTrade.takerFee.toString());
+ dexTrade.sellCurrencyId = rawTrade.sellCurrencyId;
+ dexTrade.sellSymbol = filterNullCharacters(rawTrade.sellSymbol);
+ dexTrade.makerAnnotation = rawTrade.maker_annotation;
+ dexTrade.takerAnnotation = rawTrade.taker_annotation;
+ dexTrade.protocol = rawTrade.protocol;
+ dexTrade.buyAddress = rawTrade.buyAddress;
+ dexTrade.sellAddress = rawTrade.sellAddress;
+ return dexTrade;
+}
+
+// Works with any form of escaoed null character (e.g., '\0' and '\u0000').
+const filterNullCharacters = R.replace(/\0/g, '');
diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
new file mode 100644
index 000000000..4e4c12dd0
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
@@ -0,0 +1,51 @@
+// tslint:disable:no-console
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { BloxySource } from '../data_sources/bloxy';
+import { DexTrade } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseBloxyTrades } from '../parsers/bloxy';
+import { handleError } from '../utils';
+
+// Number of trades to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ await getAndSaveTrades();
+ process.exit(0);
+})().catch(handleError);
+
+async function getAndSaveTrades(): Promise<void> {
+ const apiKey = process.env.BLOXY_API_KEY;
+ if (apiKey === undefined) {
+ throw new Error('Missing required env var: BLOXY_API_KEY');
+ }
+ const bloxySource = new BloxySource(apiKey);
+ const tradesRepository = connection.getRepository(DexTrade);
+ const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository);
+ console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`);
+ console.log('Getting latest dex trades...');
+ const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp);
+ console.log(`Parsing ${rawTrades.length} trades...`);
+ const trades = parseBloxyTrades(rawTrades);
+ console.log(`Saving ${trades.length} trades...`);
+ await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) });
+ console.log('Done saving trades.');
+}
+
+async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> {
+ if ((await tradesRepository.count()) === 0) {
+ return 0;
+ }
+ const response = (await connection.query(
+ 'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1',
+ )) as Array<{ tx_timestamp: number }>;
+ if (response.length === 0) {
+ return 0;
+ }
+ return response[0].tx_timestamp;
+}