diff options
Diffstat (limited to 'packages')
-rw-r--r-- | packages/pipeline/src/entities/index.ts | 1 | ||||
-rw-r--r-- | packages/pipeline/src/entities/sra_order.ts | 4 | ||||
-rw-r--r-- | packages/pipeline/src/entities/sra_order_observed_timestamp.ts | 30 | ||||
-rw-r--r-- | packages/pipeline/src/ormconfig.ts | 2 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/sra_orders/index.ts | 4 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_radar_relay_orders.ts | 33 | ||||
-rw-r--r-- | packages/pipeline/test/parsers/sra_orders/index_test.ts | 2 |
7 files changed, 52 insertions, 24 deletions
diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts index 6e024f280..442eec9cd 100644 --- a/packages/pipeline/src/entities/index.ts +++ b/packages/pipeline/src/entities/index.ts @@ -6,3 +6,4 @@ export { Relayer } from './relayer'; export { SraOrder } from './sra_order'; export { Transaction } from './transaction'; export { TokenOnChainMetadata } from './token_on_chain_metadata'; +export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp'; diff --git a/packages/pipeline/src/entities/sra_order.ts b/packages/pipeline/src/entities/sra_order.ts index 4b7f652d3..2bdb1ba2e 100644 --- a/packages/pipeline/src/entities/sra_order.ts +++ b/packages/pipeline/src/entities/sra_order.ts @@ -19,9 +19,9 @@ export class SraOrder { public feeRecipientAddress!: string; @Column({ name: 'sender_address' }) public senderAddress!: string; - @Column({ name: 'maker_asset_amount' }) + @Column({ name: 'maker_asset_filled_amount' }) public makerAssetAmount!: string; - @Column({ name: 'taker_asset_amount' }) + @Column({ name: 'taker_asset_filled_amount' }) public takerAssetAmount!: string; @Column({ name: 'maker_fee' }) public makerFee!: string; diff --git a/packages/pipeline/src/entities/sra_order_observed_timestamp.ts b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts new file mode 100644 index 000000000..bdb6cd36b --- /dev/null +++ b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts @@ -0,0 +1,30 @@ +import { Entity, PrimaryColumn } from 'typeorm'; + +import { SraOrder } from './sra_order'; + +@Entity({ name: 'sra_orders_observed_timestamps', schema: 'raw' }) +export class SraOrdersObservedTimeStamp { + @PrimaryColumn({ name: 'exchange_address' }) + public exchangeAddress!: string; + @PrimaryColumn({ name: 'order_hash_hex' }) + public orderHashHex!: string; + @PrimaryColumn({ name: 'source_url' }) + public sourceUrl!: string; + + @PrimaryColumn({ name: 'observed_timestamp' }) + public observedTimestamp!: number; +} + +/** + * Returns a new SraOrdersObservedTimeStamp for the given order based on the + * current time. + * @param order The order to generate a timestamp for. + */ +export function createObservedTimestampForOrder(order: SraOrder): SraOrdersObservedTimeStamp { + const observed = new SraOrdersObservedTimeStamp(); + observed.exchangeAddress = order.exchangeAddress; + observed.orderHashHex = order.orderHashHex; + observed.sourceUrl = order.sourceUrl; + observed.observedTimestamp = Date.now(); + return observed; +} diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts index 2fb6b3d3c..2f5f7df33 100644 --- a/packages/pipeline/src/ormconfig.ts +++ b/packages/pipeline/src/ormconfig.ts @@ -7,6 +7,7 @@ import { ExchangeFillEvent, Relayer, SraOrder, + SraOrdersObservedTimeStamp, Transaction, } from './entities'; @@ -17,6 +18,7 @@ const entities = [ ExchangeFillEvent, Relayer, SraOrder, + SraOrdersObservedTimeStamp, Transaction, ]; diff --git a/packages/pipeline/src/parsers/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts index 3d7f73fca..800521955 100644 --- a/packages/pipeline/src/parsers/sra_orders/index.ts +++ b/packages/pipeline/src/parsers/sra_orders/index.ts @@ -30,10 +30,6 @@ export function _convertToEntity(apiOrder: APIOrder): SraOrder { sraOrder.exchangeAddress = apiOrder.order.exchangeAddress; sraOrder.orderHashHex = orderHashUtils.getOrderHashHex(apiOrder.order); - // TODO(albrow): Set these fields to the correct values upstack. - sraOrder.lastUpdatedTimestamp = 0; - sraOrder.firstSeenTimestamp = 0; - sraOrder.makerAddress = apiOrder.order.makerAddress; sraOrder.takerAddress = apiOrder.order.takerAddress; sraOrder.feeRecipientAddress = apiOrder.order.feeRecipientAddress; diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts index c4d3f7095..b3a4d887e 100644 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -2,15 +2,14 @@ import { HttpClient } from '@0x/connect'; import * as R from 'ramda'; import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm'; -import { SraOrder } from '../entities'; +import { createObservedTimestampForOrder, SraOrder } from '../entities'; import * as ormConfig from '../ormconfig'; import { parseSraOrders } from '../parsers/sra_orders'; import { handleError } from '../utils'; const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; -const BATCH_SAVE_SIZE = 1000; // Number of orders to save at once. const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. let connection: Connection; @@ -29,24 +28,26 @@ async function getOrderbook(): Promise<void> { }); console.log(`Got ${rawOrders.records.length} orders.`); console.log('Parsing orders...'); + // Parse the sra orders, then add source url to each. const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); - const ordersRepository = connection.getRepository(SraOrder); - // TODO(albrow): Move batch saving to a utility function to reduce - // duplicated code. - for (const ordersBatch of R.splitEvery(BATCH_SAVE_SIZE, orders)) { - await ordersRepository.save(ordersBatch); - } + // Save all the orders and update the observed time stamps in a single + // transaction. + console.log('Saving orders and updating timestamps...'); + await connection.transaction(async (manager: EntityManager): Promise<void> => { + for (const order of orders) { + await manager.save(SraOrder, order); + const observedTimestamp = createObservedTimestampForOrder(order); + await manager.save(observedTimestamp); + } + }); } const sourceUrlProp = R.lensProp('sourceUrl'); +/** + * Sets the source url for a single order. Returns a new order instead of + * mutating the given one. + */ const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { return R.set(sourceUrlProp, sourceURL, order); }); - -const firstSeenProp = R.lensProp('firstSeenTimestamp'); -const lastUpdatedProp = R.lensProp('lastUpdatedTimestamp'); - -const setFirstSeen = R.curry((sourceURL: string, order: SraOrder): SraOrder => { - return R.set(firstSeenTimestampProp, sourceURL, order); -}); diff --git a/packages/pipeline/test/parsers/sra_orders/index_test.ts b/packages/pipeline/test/parsers/sra_orders/index_test.ts index 4b2e7c2d0..534d84ab3 100644 --- a/packages/pipeline/test/parsers/sra_orders/index_test.ts +++ b/packages/pipeline/test/parsers/sra_orders/index_test.ts @@ -37,8 +37,6 @@ describe('sra_orders', () => { const expected = new SraOrder(); expected.exchangeAddress = '0x4f833a24e1f95d70f028921e27040ca56e09ab0b'; expected.orderHashHex = '0x1bdbeb0d088a33da28b9ee6d94e8771452f90f4a69107da2fa75195d61b9a1c9'; - expected.lastUpdatedTimestamp = 0; - expected.firstSeenTimestamp = 0; expected.makerAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81'; expected.takerAddress = '0x0000000000000000000000000000000000000000'; expected.feeRecipientAddress = '0xa258b39954cef5cb142fd567a46cddb31a670124'; |