From 8bac655dc1c0e9a133492fe816c67f2c08b3e46b Mon Sep 17 00:00:00 2001 From: fragosti Date: Wed, 30 Jan 2019 09:39:59 -0800 Subject: Stop omitting trades because of duplicate tx hashes --- packages/pipeline/src/data_sources/bloxy/index.ts | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts index 94468d25a..3d9147757 100644 --- a/packages/pipeline/src/data_sources/bloxy/index.ts +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -1,6 +1,7 @@ import axios from 'axios'; import * as R from 'ramda'; +import { logUtils } from '@0x/utils'; // URL to use for getting dex trades from Bloxy. export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades'; // Number of trades to get at once. Must be less than or equal to MAX_OFFSET. @@ -73,6 +74,15 @@ export class BloxySource { * already been seen. */ public async getDexTradesAsync(lastSeenTimestamp: number): Promise { + const allTrades = await this._scrapeAllDexTradesAsync(lastSeenTimestamp); + logUtils.log('Removing duplicate entries'); + const uniqueTrades = R.uniqBy(R.toString, allTrades) as BloxyTrade[]; + logUtils.log(`Removed ${allTrades.length - uniqueTrades.length} duplicate entries`); + return uniqueTrades; + } + + // Potentially returns duplicate trades. + private async _scrapeAllDexTradesAsync(lastSeenTimestamp: number): Promise { let allTrades: BloxyTrade[] = []; // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive) @@ -90,7 +100,7 @@ export class BloxySource { if (trades.length === 0) { // There are no more trades left for the days we are querying. // This means we are done. - return filterDuplicateTrades(allTrades); + return allTrades; } const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades)); allTrades = allTrades.concat(sortedTrades); @@ -100,10 +110,10 @@ export class BloxySource { if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) { // We are at the point where we have already seen trades for the // timestamp range that is being returned. We're done. - return filterDuplicateTrades(allTrades); + return allTrades; } } - return filterDuplicateTrades(allTrades); + return allTrades; } private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise { @@ -129,5 +139,3 @@ function getDaysSinceTimestamp(timestamp: number): number { const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay; return Math.ceil(daysSinceTimestamp); } - -const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash); -- cgit v1.2.3 From 5ad2e9d6b6b1ff7c5327975501e8c042a5817ab9 Mon Sep 17 00:00:00 2001 From: fragosti Date: Wed, 30 Jan 2019 10:06:04 -0800 Subject: write migration to add trade_index primary key to dex_trades --- ...8809952793-AllowDuplicateTxHashesInDexTrades.ts | 27 ++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts diff --git a/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts b/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts new file mode 100644 index 000000000..1b396918e --- /dev/null +++ b/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts @@ -0,0 +1,27 @@ +import { MigrationInterface, QueryRunner, TableColumn } from 'typeorm'; + +const DEX_TRADES_TABLE_NAME = 'raw.dex_trades'; + +export class AllowDuplicateTxHashesInDexTrades1548809952793 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + const dexTradesTable = await queryRunner.getTable(DEX_TRADES_TABLE_NAME); + if (dexTradesTable) { + // Composite key goes from (source_url, tx_hash) to (trade_index, source_url, tx_hash) + await queryRunner.addColumn( + DEX_TRADES_TABLE_NAME, + new TableColumn({ + name: 'trade_index', + type: 'varchar', + isPrimary: true, + }), + ); + } + } + + public async down(queryRunner: QueryRunner): Promise { + const dexTradesTable = await queryRunner.getTable(DEX_TRADES_TABLE_NAME); + if (dexTradesTable) { + await queryRunner.dropColumn(dexTradesTable, 'trade_index'); + } + } +} -- cgit v1.2.3 From c2ad95fd94611dba2030778553380cf07a85a4c3 Mon Sep 17 00:00:00 2001 From: fragosti Date: Wed, 30 Jan 2019 10:12:15 -0800 Subject: update types to adhere to new dex trades and bloxy api format --- packages/pipeline/src/data_sources/bloxy/index.ts | 1 + packages/pipeline/src/entities/dex_trade.ts | 2 ++ packages/pipeline/src/parsers/bloxy/index.ts | 1 + packages/pipeline/test/entities/dex_trades_test.ts | 1 + packages/pipeline/test/parsers/bloxy/index_test.ts | 2 ++ yarn.lock | 25 ++++++++++++++++++++++ 6 files changed, 32 insertions(+) diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts index 3d9147757..bba424f67 100644 --- a/packages/pipeline/src/data_sources/bloxy/index.ts +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -27,6 +27,7 @@ export interface BloxyTrade { tx_time: string; tx_date: string; tx_sender: string; + tradeIndex: string; smart_contract_id: number; smart_contract_address: string; contract_type: string; diff --git a/packages/pipeline/src/entities/dex_trade.ts b/packages/pipeline/src/entities/dex_trade.ts index 9d288cb51..93dcaf238 100644 --- a/packages/pipeline/src/entities/dex_trade.ts +++ b/packages/pipeline/src/entities/dex_trade.ts @@ -9,6 +9,8 @@ export class DexTrade { public sourceUrl!: string; @PrimaryColumn({ name: 'tx_hash' }) public txHash!: string; + @PrimaryColumn({ name: 'trade_index' }) + public tradeIndex!: string; @Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer }) public txTimestamp!: number; diff --git a/packages/pipeline/src/parsers/bloxy/index.ts b/packages/pipeline/src/parsers/bloxy/index.ts index caa55d289..3d797aff0 100644 --- a/packages/pipeline/src/parsers/bloxy/index.ts +++ b/packages/pipeline/src/parsers/bloxy/index.ts @@ -21,6 +21,7 @@ export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade { const dexTrade = new DexTrade(); dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL; dexTrade.txHash = rawTrade.tx_hash; + dexTrade.tradeIndex = rawTrade.tradeIndex; dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime(); dexTrade.txDate = rawTrade.tx_date; dexTrade.txSender = rawTrade.tx_sender; diff --git a/packages/pipeline/test/entities/dex_trades_test.ts b/packages/pipeline/test/entities/dex_trades_test.ts index 83aaeec8f..7c4829988 100644 --- a/packages/pipeline/test/entities/dex_trades_test.ts +++ b/packages/pipeline/test/entities/dex_trades_test.ts @@ -33,6 +33,7 @@ const baseTrade = { takerAnnotation: '', protocol: 'Kyber Network Proxy', sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e', + tradeIndex: '3', }; const tradeWithNullAddresses: DexTrade = R.merge(baseTrade, { diff --git a/packages/pipeline/test/parsers/bloxy/index_test.ts b/packages/pipeline/test/parsers/bloxy/index_test.ts index 6aabb091d..d270bd2a7 100644 --- a/packages/pipeline/test/parsers/bloxy/index_test.ts +++ b/packages/pipeline/test/parsers/bloxy/index_test.ts @@ -17,6 +17,7 @@ const baseInput: BloxyTrade = { tx_time: '2018-11-21T09:06:28.000+00:00', tx_date: '2018-11-21', tx_sender: '0x00923b9a074762b93650716333b3e1473a15048e', + tradeIndex: '1', smart_contract_id: 7091917, smart_contract_address: '0x818e6fecd516ecc3849daf6845e3ec868087b755', contract_type: 'DEX/Kyber Network Proxy', @@ -40,6 +41,7 @@ const baseInput: BloxyTrade = { const baseExpected: DexTrade = { sourceUrl: BLOXY_DEX_TRADES_URL, txHash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0bf', + tradeIndex: '1', txTimestamp: 1542791188000, txDate: '2018-11-21', txSender: '0x00923b9a074762b93650716333b3e1473a15048e', diff --git a/yarn.lock b/yarn.lock index 1b51eff11..3bc39a830 100644 --- a/yarn.lock +++ b/yarn.lock @@ -13495,6 +13495,15 @@ react-dom@^16.3.2: object-assign "^4.1.1" prop-types "^15.6.0" +react-dom@^16.4.2: + version "16.7.0" + resolved "https://registry.npmjs.org/react-dom/-/react-dom-16.7.0.tgz#a17b2a7ca89ee7390bc1ed5eb81783c7461748b8" + dependencies: + loose-envify "^1.1.0" + object-assign "^4.1.1" + prop-types "^15.6.2" + scheduler "^0.12.0" + react-dom@^16.5.2: version "16.5.2" resolved "https://registry.yarnpkg.com/react-dom/-/react-dom-16.5.2.tgz#b69ee47aa20bab5327b2b9d7c1fe2a30f2cfa9d7" @@ -13821,6 +13830,15 @@ react@^16.3.2: object-assign "^4.1.1" prop-types "^15.6.0" +react@^16.4.2: + version "16.7.0" + resolved "https://registry.npmjs.org/react/-/react-16.7.0.tgz#b674ec396b0a5715873b350446f7ea0802ab6381" + dependencies: + loose-envify "^1.1.0" + object-assign "^4.1.1" + prop-types "^15.6.2" + scheduler "^0.12.0" + react@^16.5.2: version "16.5.2" resolved "https://registry.yarnpkg.com/react/-/react-16.5.2.tgz#19f6b444ed139baa45609eee6dc3d318b3895d42" @@ -14709,6 +14727,13 @@ schedule@^0.5.0: dependencies: object-assign "^4.1.1" +scheduler@^0.12.0: + version "0.12.0" + resolved "https://registry.npmjs.org/scheduler/-/scheduler-0.12.0.tgz#8ab17699939c0aedc5a196a657743c496538647b" + dependencies: + loose-envify "^1.1.0" + object-assign "^4.1.1" + schema-utils@^0.4.4: version "0.4.7" resolved "https://registry.npmjs.org/schema-utils/-/schema-utils-0.4.7.tgz#ba74f597d2be2ea880131746ee17d0a093c68187" -- cgit v1.2.3 From 7b4a0d4f8aff373b632983400d28dfdce602114a Mon Sep 17 00:00:00 2001 From: fragosti Date: Wed, 30 Jan 2019 10:28:06 -0800 Subject: fix bloxy datasource getDexTradesAsync to allow for tx_hash repetitions in dedupe logic --- packages/pipeline/src/data_sources/bloxy/index.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts index bba424f67..68764ca98 100644 --- a/packages/pipeline/src/data_sources/bloxy/index.ts +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -2,6 +2,7 @@ import axios from 'axios'; import * as R from 'ramda'; import { logUtils } from '@0x/utils'; + // URL to use for getting dex trades from Bloxy. export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades'; // Number of trades to get at once. Must be less than or equal to MAX_OFFSET. @@ -76,8 +77,11 @@ export class BloxySource { */ public async getDexTradesAsync(lastSeenTimestamp: number): Promise { const allTrades = await this._scrapeAllDexTradesAsync(lastSeenTimestamp); - logUtils.log('Removing duplicate entries'); - const uniqueTrades = R.uniqBy(R.toString, allTrades) as BloxyTrade[]; + logUtils.log(`Removing duplicates from ${allTrades.length} entries`); + const uniqueTrades = R.uniqBy( + (trade: BloxyTrade) => `${trade.tradeIndex}-${trade.tx_hash}`, + allTrades, + ) as BloxyTrade[]; logUtils.log(`Removed ${allTrades.length - uniqueTrades.length} duplicate entries`); return uniqueTrades; } -- cgit v1.2.3 From 4996ae8d9d90c138776d6660965289ae7aba18de Mon Sep 17 00:00:00 2001 From: fragosti Date: Wed, 30 Jan 2019 10:30:50 -0800 Subject: run linter --- packages/pipeline/src/data_sources/bloxy/index.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts index 68764ca98..22ab195b3 100644 --- a/packages/pipeline/src/data_sources/bloxy/index.ts +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -78,10 +78,7 @@ export class BloxySource { public async getDexTradesAsync(lastSeenTimestamp: number): Promise { const allTrades = await this._scrapeAllDexTradesAsync(lastSeenTimestamp); logUtils.log(`Removing duplicates from ${allTrades.length} entries`); - const uniqueTrades = R.uniqBy( - (trade: BloxyTrade) => `${trade.tradeIndex}-${trade.tx_hash}`, - allTrades, - ) as BloxyTrade[]; + const uniqueTrades = R.uniqBy((trade: BloxyTrade) => `${trade.tradeIndex}-${trade.tx_hash}`, allTrades); logUtils.log(`Removed ${allTrades.length - uniqueTrades.length} duplicate entries`); return uniqueTrades; } -- cgit v1.2.3 From 4f3cab42131f0bbe5e3f08cc04a1d7f60c6d48a0 Mon Sep 17 00:00:00 2001 From: fragosti Date: Fri, 1 Feb 2019 15:50:04 -0800 Subject: add row deletion step to migration --- .../migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts b/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts index 1b396918e..21b08f0ef 100644 --- a/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts +++ b/packages/pipeline/migrations/1548809952793-AllowDuplicateTxHashesInDexTrades.ts @@ -6,7 +6,9 @@ export class AllowDuplicateTxHashesInDexTrades1548809952793 implements Migration public async up(queryRunner: QueryRunner): Promise { const dexTradesTable = await queryRunner.getTable(DEX_TRADES_TABLE_NAME); if (dexTradesTable) { - // Composite key goes from (source_url, tx_hash) to (trade_index, source_url, tx_hash) + // Need new primary key to be non-null. No default value makes sense, so drop table. + await queryRunner.query(`DELETE from ${DEX_TRADES_TABLE_NAME}`); + // Composite key goes from (source_url, tx_hash) to (source_url, tx_hash, trade_index) await queryRunner.addColumn( DEX_TRADES_TABLE_NAME, new TableColumn({ -- cgit v1.2.3