aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r--packages/pipeline/src/entities/index.ts1
-rw-r--r--packages/pipeline/src/entities/sra_order.ts4
-rw-r--r--packages/pipeline/src/entities/sra_order_observed_timestamp.ts30
-rw-r--r--packages/pipeline/src/ormconfig.ts2
-rw-r--r--packages/pipeline/src/parsers/sra_orders/index.ts4
-rw-r--r--packages/pipeline/src/scripts/pull_radar_relay_orders.ts33
6 files changed, 52 insertions, 22 deletions
diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts
index 6e024f280..442eec9cd 100644
--- a/packages/pipeline/src/entities/index.ts
+++ b/packages/pipeline/src/entities/index.ts
@@ -6,3 +6,4 @@ export { Relayer } from './relayer';
export { SraOrder } from './sra_order';
export { Transaction } from './transaction';
export { TokenOnChainMetadata } from './token_on_chain_metadata';
+export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp';
diff --git a/packages/pipeline/src/entities/sra_order.ts b/packages/pipeline/src/entities/sra_order.ts
index 4b7f652d3..2bdb1ba2e 100644
--- a/packages/pipeline/src/entities/sra_order.ts
+++ b/packages/pipeline/src/entities/sra_order.ts
@@ -19,9 +19,9 @@ export class SraOrder {
public feeRecipientAddress!: string;
@Column({ name: 'sender_address' })
public senderAddress!: string;
- @Column({ name: 'maker_asset_amount' })
+ @Column({ name: 'maker_asset_filled_amount' })
public makerAssetAmount!: string;
- @Column({ name: 'taker_asset_amount' })
+ @Column({ name: 'taker_asset_filled_amount' })
public takerAssetAmount!: string;
@Column({ name: 'maker_fee' })
public makerFee!: string;
diff --git a/packages/pipeline/src/entities/sra_order_observed_timestamp.ts b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts
new file mode 100644
index 000000000..bdb6cd36b
--- /dev/null
+++ b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts
@@ -0,0 +1,30 @@
+import { Entity, PrimaryColumn } from 'typeorm';
+
+import { SraOrder } from './sra_order';
+
+@Entity({ name: 'sra_orders_observed_timestamps', schema: 'raw' })
+export class SraOrdersObservedTimeStamp {
+ @PrimaryColumn({ name: 'exchange_address' })
+ public exchangeAddress!: string;
+ @PrimaryColumn({ name: 'order_hash_hex' })
+ public orderHashHex!: string;
+ @PrimaryColumn({ name: 'source_url' })
+ public sourceUrl!: string;
+
+ @PrimaryColumn({ name: 'observed_timestamp' })
+ public observedTimestamp!: number;
+}
+
+/**
+ * Returns a new SraOrdersObservedTimeStamp for the given order based on the
+ * current time.
+ * @param order The order to generate a timestamp for.
+ */
+export function createObservedTimestampForOrder(order: SraOrder): SraOrdersObservedTimeStamp {
+ const observed = new SraOrdersObservedTimeStamp();
+ observed.exchangeAddress = order.exchangeAddress;
+ observed.orderHashHex = order.orderHashHex;
+ observed.sourceUrl = order.sourceUrl;
+ observed.observedTimestamp = Date.now();
+ return observed;
+}
diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts
index 2fb6b3d3c..2f5f7df33 100644
--- a/packages/pipeline/src/ormconfig.ts
+++ b/packages/pipeline/src/ormconfig.ts
@@ -7,6 +7,7 @@ import {
ExchangeFillEvent,
Relayer,
SraOrder,
+ SraOrdersObservedTimeStamp,
Transaction,
} from './entities';
@@ -17,6 +18,7 @@ const entities = [
ExchangeFillEvent,
Relayer,
SraOrder,
+ SraOrdersObservedTimeStamp,
Transaction,
];
diff --git a/packages/pipeline/src/parsers/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts
index 3d7f73fca..800521955 100644
--- a/packages/pipeline/src/parsers/sra_orders/index.ts
+++ b/packages/pipeline/src/parsers/sra_orders/index.ts
@@ -30,10 +30,6 @@ export function _convertToEntity(apiOrder: APIOrder): SraOrder {
sraOrder.exchangeAddress = apiOrder.order.exchangeAddress;
sraOrder.orderHashHex = orderHashUtils.getOrderHashHex(apiOrder.order);
- // TODO(albrow): Set these fields to the correct values upstack.
- sraOrder.lastUpdatedTimestamp = 0;
- sraOrder.firstSeenTimestamp = 0;
-
sraOrder.makerAddress = apiOrder.order.makerAddress;
sraOrder.takerAddress = apiOrder.order.takerAddress;
sraOrder.feeRecipientAddress = apiOrder.order.feeRecipientAddress;
diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
index c4d3f7095..b3a4d887e 100644
--- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
+++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
@@ -2,15 +2,14 @@
import { HttpClient } from '@0x/connect';
import * as R from 'ramda';
import 'reflect-metadata';
-import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm';
-import { SraOrder } from '../entities';
+import { createObservedTimestampForOrder, SraOrder } from '../entities';
import * as ormConfig from '../ormconfig';
import { parseSraOrders } from '../parsers/sra_orders';
import { handleError } from '../utils';
const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2';
-const BATCH_SAVE_SIZE = 1000; // Number of orders to save at once.
const ORDERS_PER_PAGE = 10000; // Number of orders to get per request.
let connection: Connection;
@@ -29,24 +28,26 @@ async function getOrderbook(): Promise<void> {
});
console.log(`Got ${rawOrders.records.length} orders.`);
console.log('Parsing orders...');
+ // Parse the sra orders, then add source url to each.
const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders);
- const ordersRepository = connection.getRepository(SraOrder);
- // TODO(albrow): Move batch saving to a utility function to reduce
- // duplicated code.
- for (const ordersBatch of R.splitEvery(BATCH_SAVE_SIZE, orders)) {
- await ordersRepository.save(ordersBatch);
- }
+ // Save all the orders and update the observed time stamps in a single
+ // transaction.
+ console.log('Saving orders and updating timestamps...');
+ await connection.transaction(async (manager: EntityManager): Promise<void> => {
+ for (const order of orders) {
+ await manager.save(SraOrder, order);
+ const observedTimestamp = createObservedTimestampForOrder(order);
+ await manager.save(observedTimestamp);
+ }
+ });
}
const sourceUrlProp = R.lensProp('sourceUrl');
+/**
+ * Sets the source url for a single order. Returns a new order instead of
+ * mutating the given one.
+ */
const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => {
return R.set(sourceUrlProp, sourceURL, order);
});
-
-const firstSeenProp = R.lensProp('firstSeenTimestamp');
-const lastUpdatedProp = R.lensProp('lastUpdatedTimestamp');
-
-const setFirstSeen = R.curry((sourceURL: string, order: SraOrder): SraOrder => {
- return R.set(firstSeenTimestampProp, sourceURL, order);
-});