diff options
6 files changed, 50 insertions, 5 deletions
diff --git a/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts b/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts new file mode 100644 index 000000000..21b08f0ef --- /dev/null +++ b/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts @@ -0,0 +1,29 @@ +import { MigrationInterface, QueryRunner, TableColumn } from 'typeorm'; + +const DEX_TRADES_TABLE_NAME = 'raw.dex_trades'; + +export class AllowDuplicateTxHashesInDexTrades1548809952793 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise<any> { + const dexTradesTable = await queryRunner.getTable(DEX_TRADES_TABLE_NAME); + if (dexTradesTable) { + // Need new primary key to be non-null. No default value makes sense, so drop table. + await queryRunner.query(`DELETE from ${DEX_TRADES_TABLE_NAME}`); + // Composite key goes from (source_url, tx_hash) to (source_url, tx_hash, trade_index) + await queryRunner.addColumn( + DEX_TRADES_TABLE_NAME, + new TableColumn({ + name: 'trade_index', + type: 'varchar', + isPrimary: true, + }), + ); + } + } + + public async down(queryRunner: QueryRunner): Promise<any> { + const dexTradesTable = await queryRunner.getTable(DEX_TRADES_TABLE_NAME); + if (dexTradesTable) { + await queryRunner.dropColumn(dexTradesTable, 'trade_index'); + } + } +} diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts index 94468d25a..22ab195b3 100644 --- a/packages/pipeline/src/data_sources/bloxy/index.ts +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -1,6 +1,8 @@ import axios from 'axios'; import * as R from 'ramda'; +import { logUtils } from '@0x/utils'; + // URL to use for getting dex trades from Bloxy. export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades'; // Number of trades to get at once. Must be less than or equal to MAX_OFFSET. @@ -26,6 +28,7 @@ export interface BloxyTrade { tx_time: string; tx_date: string; tx_sender: string; + tradeIndex: string; smart_contract_id: number; smart_contract_address: string; contract_type: string; @@ -73,6 +76,15 @@ export class BloxySource { * already been seen. */ public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> { + const allTrades = await this._scrapeAllDexTradesAsync(lastSeenTimestamp); + logUtils.log(`Removing duplicates from ${allTrades.length} entries`); + const uniqueTrades = R.uniqBy((trade: BloxyTrade) => `${trade.tradeIndex}-${trade.tx_hash}`, allTrades); + logUtils.log(`Removed ${allTrades.length - uniqueTrades.length} duplicate entries`); + return uniqueTrades; + } + + // Potentially returns duplicate trades. + private async _scrapeAllDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> { let allTrades: BloxyTrade[] = []; // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive) @@ -90,7 +102,7 @@ export class BloxySource { if (trades.length === 0) { // There are no more trades left for the days we are querying. // This means we are done. - return filterDuplicateTrades(allTrades); + return allTrades; } const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades)); allTrades = allTrades.concat(sortedTrades); @@ -100,10 +112,10 @@ export class BloxySource { if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) { // We are at the point where we have already seen trades for the // timestamp range that is being returned. We're done. - return filterDuplicateTrades(allTrades); + return allTrades; } } - return filterDuplicateTrades(allTrades); + return allTrades; } private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> { @@ -129,5 +141,3 @@ function getDaysSinceTimestamp(timestamp: number): number { const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay; return Math.ceil(daysSinceTimestamp); } - -const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash); diff --git a/packages/pipeline/src/entities/dex_trade.ts b/packages/pipeline/src/entities/dex_trade.ts index 9d288cb51..93dcaf238 100644 --- a/packages/pipeline/src/entities/dex_trade.ts +++ b/packages/pipeline/src/entities/dex_trade.ts @@ -9,6 +9,8 @@ export class DexTrade { public sourceUrl!: string; @PrimaryColumn({ name: 'tx_hash' }) public txHash!: string; + @PrimaryColumn({ name: 'trade_index' }) + public tradeIndex!: string; @Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer }) public txTimestamp!: number; diff --git a/packages/pipeline/src/parsers/bloxy/index.ts b/packages/pipeline/src/parsers/bloxy/index.ts index caa55d289..3d797aff0 100644 --- a/packages/pipeline/src/parsers/bloxy/index.ts +++ b/packages/pipeline/src/parsers/bloxy/index.ts @@ -21,6 +21,7 @@ export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade { const dexTrade = new DexTrade(); dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL; dexTrade.txHash = rawTrade.tx_hash; + dexTrade.tradeIndex = rawTrade.tradeIndex; dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime(); dexTrade.txDate = rawTrade.tx_date; dexTrade.txSender = rawTrade.tx_sender; diff --git a/packages/pipeline/test/entities/dex_trades_test.ts b/packages/pipeline/test/entities/dex_trades_test.ts index 83aaeec8f..7c4829988 100644 --- a/packages/pipeline/test/entities/dex_trades_test.ts +++ b/packages/pipeline/test/entities/dex_trades_test.ts @@ -33,6 +33,7 @@ const baseTrade = { takerAnnotation: '', protocol: 'Kyber Network Proxy', sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e', + tradeIndex: '3', }; const tradeWithNullAddresses: DexTrade = R.merge(baseTrade, { diff --git a/packages/pipeline/test/parsers/bloxy/index_test.ts b/packages/pipeline/test/parsers/bloxy/index_test.ts index 6aabb091d..d270bd2a7 100644 --- a/packages/pipeline/test/parsers/bloxy/index_test.ts +++ b/packages/pipeline/test/parsers/bloxy/index_test.ts @@ -17,6 +17,7 @@ const baseInput: BloxyTrade = { tx_time: '2018-11-21T09:06:28.000+00:00', tx_date: '2018-11-21', tx_sender: '0x00923b9a074762b93650716333b3e1473a15048e', + tradeIndex: '1', smart_contract_id: 7091917, smart_contract_address: '0x818e6fecd516ecc3849daf6845e3ec868087b755', contract_type: 'DEX/Kyber Network Proxy', @@ -40,6 +41,7 @@ const baseInput: BloxyTrade = { const baseExpected: DexTrade = { sourceUrl: BLOXY_DEX_TRADES_URL, txHash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0bf', + tradeIndex: '1', txTimestamp: 1542791188000, txDate: '2018-11-21', txSender: '0x00923b9a074762b93650716333b3e1473a15048e', |