Diffstat (limited to 'packages/pipeline/src')
 packages/pipeline/src/data_sources/bloxy/index.ts | 133
 packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts | 45
 packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts | 59
 packages/pipeline/src/data_sources/contract-wrappers/utils.ts | 67
 packages/pipeline/src/data_sources/copper/index.ts | 126
 packages/pipeline/src/data_sources/ddex/index.ts | 78
 packages/pipeline/src/data_sources/idex/index.ts | 82
 packages/pipeline/src/data_sources/oasis/index.ts | 103
 packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts | 110
 packages/pipeline/src/data_sources/paradex/index.ts | 92
 packages/pipeline/src/data_sources/relayer-registry/index.ts | 33
 packages/pipeline/src/data_sources/trusted_tokens/index.ts | 29
 packages/pipeline/src/data_sources/web3/index.ts | 22
 packages/pipeline/src/entities/block.ts | 13
 packages/pipeline/src/entities/copper_activity.ts | 41
 packages/pipeline/src/entities/copper_activity_type.ts | 17
 packages/pipeline/src/entities/copper_custom_field.ts | 15
 packages/pipeline/src/entities/copper_lead.ts | 38
 packages/pipeline/src/entities/copper_opportunity.ts | 45
 packages/pipeline/src/entities/dex_trade.ts | 54
 packages/pipeline/src/entities/erc20_approval_event.ts | 26
 packages/pipeline/src/entities/exchange_cancel_event.ts | 51
 packages/pipeline/src/entities/exchange_cancel_up_to_event.ts | 26
 packages/pipeline/src/entities/exchange_fill_event.ts | 60
 packages/pipeline/src/entities/index.ts | 25
 packages/pipeline/src/entities/ohlcv_external.ts | 30
 packages/pipeline/src/entities/relayer.ts | 21
 packages/pipeline/src/entities/sra_order.ts | 63
 packages/pipeline/src/entities/sra_order_observed_timestamp.ts | 35
 packages/pipeline/src/entities/token_metadata.ts | 22
 packages/pipeline/src/entities/token_order.ts | 28
 packages/pipeline/src/entities/transaction.ts | 19
 packages/pipeline/src/ormconfig.ts | 54
 packages/pipeline/src/parsers/bloxy/index.ts | 53
 packages/pipeline/src/parsers/copper/index.ts | 259
 packages/pipeline/src/parsers/ddex_orders/index.ts | 71
 packages/pipeline/src/parsers/events/erc20_events.ts | 34
 packages/pipeline/src/parsers/events/exchange_events.ts | 145
 packages/pipeline/src/parsers/events/index.ts | 2
 packages/pipeline/src/parsers/idex_orders/index.ts | 81
 packages/pipeline/src/parsers/oasis_orders/index.ts | 71
 packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts | 38
 packages/pipeline/src/parsers/paradex_orders/index.ts | 66
 packages/pipeline/src/parsers/relayer_registry/index.ts | 37
 packages/pipeline/src/parsers/sra_orders/index.ts | 68
 packages/pipeline/src/parsers/token_metadata/index.ts | 46
 packages/pipeline/src/parsers/utils.ts | 28
 packages/pipeline/src/parsers/web3/index.ts | 49
 packages/pipeline/src/scripts/pull_competing_dex_trades.ts | 51
 packages/pipeline/src/scripts/pull_copper.ts | 129
 packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts | 55
 packages/pipeline/src/scripts/pull_erc20_events.ts | 96
 packages/pipeline/src/scripts/pull_exchange_events.ts | 146
 packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts | 63
 packages/pipeline/src/scripts/pull_missing_blocks.ts | 90
 packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts | 58
 packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts | 95
 packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts | 87
 packages/pipeline/src/scripts/pull_radar_relay_orders.ts | 54
 packages/pipeline/src/scripts/pull_trusted_tokens.ts | 52
 packages/pipeline/src/scripts/update_relayer_info.ts | 33
 packages/pipeline/src/types.ts | 9
 packages/pipeline/src/utils/constants.ts | 3
 packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts | 116
 packages/pipeline/src/utils/index.ts | 53
 packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts | 20
 packages/pipeline/src/utils/transformers/big_number.ts | 16
 packages/pipeline/src/utils/transformers/index.ts | 3
 packages/pipeline/src/utils/transformers/number_to_bigint.ts | 31
 69 files changed, 3970 insertions(+), 0 deletions(-)
diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts
new file mode 100644
index 000000000..94468d25a
--- /dev/null
+++ b/packages/pipeline/src/data_sources/bloxy/index.ts
@@ -0,0 +1,133 @@
+import axios from 'axios';
+import * as R from 'ramda';
+
+// URL to use for getting dex trades from Bloxy.
+export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades';
+// Number of trades to get at once. Must be less than or equal to MAX_OFFSET.
+const TRADES_PER_QUERY = 10000;
+// Maximum offset supported by the Bloxy API.
+const MAX_OFFSET = 100000;
+// Buffer to subtract from the offset. This means we will request some trades
+// twice, but it reduces the chance of missing any data.
+const OFFSET_BUFFER = 1000;
+// Maximum number of days supported by the Bloxy API.
+const MAX_DAYS = 30;
+// Buffer used for comparing the last seen timestamp to the last returned
+// timestamp. Increasing this reduces chances of data loss but also creates more
+// redundancy and can impact performance.
+// tslint:disable-next-line:custom-no-magic-numbers
+const LAST_SEEN_TIMESTAMP_BUFFER_MS = 1000 * 60 * 30; // 30 minutes
+
+// tslint:disable-next-line:custom-no-magic-numbers
+const millisecondsPerDay = 1000 * 60 * 60 * 24; // ms/d = ms/s * s/m * m/h * h/d
+
+export interface BloxyTrade {
+ tx_hash: string;
+ tx_time: string;
+ tx_date: string;
+ tx_sender: string;
+ smart_contract_id: number;
+ smart_contract_address: string;
+ contract_type: string;
+ maker: string;
+ taker: string;
+ amountBuy: number;
+ makerFee: number;
+ buyCurrencyId: number;
+ buySymbol: string;
+ amountSell: number;
+ takerFee: number;
+ sellCurrencyId: number;
+ sellSymbol: string;
+ maker_annotation: string;
+ taker_annotation: string;
+ protocol: string;
+ buyAddress: string | null;
+ sellAddress: string | null;
+}
+
+interface BloxyError {
+ error: string;
+}
+
+type BloxyResponse<T> = T | BloxyError;
+type BloxyTradeResponse = BloxyResponse<BloxyTrade[]>;
+
+function isError<T>(response: BloxyResponse<T>): response is BloxyError {
+ return (response as BloxyError).error !== undefined;
+}
+
+export class BloxySource {
+ private readonly _apiKey: string;
+
+ constructor(apiKey: string) {
+ this._apiKey = apiKey;
+ }
+
+ /**
+ * Gets all latest trades between the lastSeenTimestamp (minus some buffer)
+ * and the current time. Note that because the Bloxy API has some hard
+ * limits it might not always be possible to get *all* the trades in the
+ * desired time range.
+ * @param lastSeenTimestamp The latest timestamp for trades that have
+ * already been seen.
+ */
+ public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> {
+ let allTrades: BloxyTrade[] = [];
+
+ // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive)
+ const numberOfDays = R.clamp(1, MAX_DAYS, getDaysSinceTimestamp(lastSeenTimestamp));
+
+ // Keep getting trades until we hit one of the following conditions:
+ //
+ // 1. Offset hits MAX_OFFSET (we can't go back any further).
+ // 2. There are no more trades in the response.
+        // 3. We see a tx_time earlier than lastSeenTimestamp (minus some
+        //    buffer).
+ //
+ for (let offset = 0; offset <= MAX_OFFSET; offset += TRADES_PER_QUERY - OFFSET_BUFFER) {
+ const trades = await this._getTradesWithOffsetAsync(numberOfDays, offset);
+ if (trades.length === 0) {
+ // There are no more trades left for the days we are querying.
+ // This means we are done.
+ return filterDuplicateTrades(allTrades);
+ }
+ const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades));
+ allTrades = allTrades.concat(sortedTrades);
+
+            // Check whether lastReturnedTimestamp < lastSeenTimestamp (minus the buffer)
+ const lastReturnedTimestamp = new Date(sortedTrades[0].tx_time).getTime();
+ if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) {
+ // We are at the point where we have already seen trades for the
+ // timestamp range that is being returned. We're done.
+ return filterDuplicateTrades(allTrades);
+ }
+ }
+ return filterDuplicateTrades(allTrades);
+ }
+
+ private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> {
+ const resp = await axios.get<BloxyTradeResponse>(BLOXY_DEX_TRADES_URL, {
+ params: {
+ key: this._apiKey,
+ days: numberOfDays,
+ limit: TRADES_PER_QUERY,
+ offset,
+ },
+ });
+ if (isError(resp.data)) {
+ throw new Error(`Error in Bloxy API response: ${resp.data.error}`);
+ }
+ return resp.data;
+ }
+}
+
+// Computes the number of days between the given timestamp and the current
+// timestamp (rounded up).
+function getDaysSinceTimestamp(timestamp: number): number {
+ const msSinceTimestamp = Date.now() - timestamp;
+ const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay;
+ return Math.ceil(daysSinceTimestamp);
+}
+
+const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash);
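
Usage sketch (not part of the diff): pulling all trades seen since 24 hours ago. The import path mirrors the layout above and the BLOXY_API_KEY environment variable is an assumption. Because consecutive offset windows overlap by OFFSET_BUFFER trades, the returned list has already been de-duplicated by tx_hash:

import { BloxySource } from './data_sources/bloxy';

async function fetchRecentBloxyTradesAsync(): Promise<void> {
    const source = new BloxySource(process.env.BLOXY_API_KEY as string);
    const oneDayAgoMs = Date.now() - 1000 * 60 * 60 * 24;
    const trades = await source.getDexTradesAsync(oneDayAgoMs);
    console.log(`Got ${trades.length} unique trades`);
}
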
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts
new file mode 100644
index 000000000..e0098122f
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts
@@ -0,0 +1,45 @@
+import {
+ ContractWrappers,
+ ERC20TokenApprovalEventArgs,
+ ERC20TokenEvents,
+ ERC20TokenWrapper,
+} from '@0x/contract-wrappers';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { LogWithDecodedArgs } from 'ethereum-types';
+
+import { GetEventsFunc, getEventsWithPaginationAsync } from './utils';
+
+export class ERC20EventsSource {
+ private readonly _erc20Wrapper: ERC20TokenWrapper;
+ private readonly _tokenAddress: string;
+ constructor(provider: Web3ProviderEngine, networkId: number, tokenAddress: string) {
+ const contractWrappers = new ContractWrappers(provider, { networkId });
+ this._erc20Wrapper = contractWrappers.erc20Token;
+ this._tokenAddress = tokenAddress;
+ }
+
+ public async getApprovalEventsAsync(
+ startBlock: number,
+ endBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> {
+ return getEventsWithPaginationAsync(
+ this._getApprovalEventsForRangeAsync.bind(this) as GetEventsFunc<ERC20TokenApprovalEventArgs>,
+ startBlock,
+ endBlock,
+ );
+ }
+
+    // Gets all approval events for a specific sub-range. This getter
+ // function will be called during each step of pagination.
+ private async _getApprovalEventsForRangeAsync(
+ fromBlock: number,
+ toBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> {
+ return this._erc20Wrapper.getLogsAsync<ERC20TokenApprovalEventArgs>(
+ this._tokenAddress,
+ ERC20TokenEvents.Approval,
+ { fromBlock, toBlock },
+ {},
+ );
+ }
+}
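
Usage sketch (the RPC URL, block range, and token address are placeholders; network id 1 is mainnet): wiring a provider into the source and pulling Approval events for a block range:

import { RPCSubprovider, Web3ProviderEngine } from '@0x/subproviders';

import { ERC20EventsSource } from './data_sources/contract-wrappers/erc20_events';

async function pullApprovalEventsExampleAsync(): Promise<void> {
    const provider = new Web3ProviderEngine();
    provider.addProvider(new RPCSubprovider('https://mainnet.example.com/rpc')); // placeholder endpoint
    provider.start();
    const tokenAddress = '0x0000000000000000000000000000000000000000'; // placeholder token
    const source = new ERC20EventsSource(provider, 1, tokenAddress);
    const approvalEvents = await source.getApprovalEventsAsync(6500000, 6600000);
    console.log(`Got ${approvalEvents.length} approval events`);
}
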
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
new file mode 100644
index 000000000..58691e2ab
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
@@ -0,0 +1,59 @@
+import {
+ ContractWrappers,
+ ExchangeCancelEventArgs,
+ ExchangeCancelUpToEventArgs,
+ ExchangeEventArgs,
+ ExchangeEvents,
+ ExchangeFillEventArgs,
+ ExchangeWrapper,
+} from '@0x/contract-wrappers';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { LogWithDecodedArgs } from 'ethereum-types';
+
+import { GetEventsFunc, getEventsWithPaginationAsync } from './utils';
+
+export class ExchangeEventsSource {
+ private readonly _exchangeWrapper: ExchangeWrapper;
+ constructor(provider: Web3ProviderEngine, networkId: number) {
+ const contractWrappers = new ContractWrappers(provider, { networkId });
+ this._exchangeWrapper = contractWrappers.exchange;
+ }
+
+ public async getFillEventsAsync(
+ startBlock: number,
+ endBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> {
+ const getFillEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeFillEventArgs>(ExchangeEvents.Fill);
+ return getEventsWithPaginationAsync(getFillEventsForRangeAsync, startBlock, endBlock);
+ }
+
+ public async getCancelEventsAsync(
+ startBlock: number,
+ endBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> {
+ const getCancelEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelEventArgs>(
+ ExchangeEvents.Cancel,
+ );
+ return getEventsWithPaginationAsync(getCancelEventsForRangeAsync, startBlock, endBlock);
+ }
+
+ public async getCancelUpToEventsAsync(
+ startBlock: number,
+ endBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> {
+ const getCancelUpToEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelUpToEventArgs>(
+ ExchangeEvents.CancelUpTo,
+ );
+ return getEventsWithPaginationAsync(getCancelUpToEventsForRangeAsync, startBlock, endBlock);
+ }
+
+ // Returns a getter function which gets all events of a specific type for a
+ // specific sub-range. This getter function will be called during each step
+ // of pagination.
+ private _makeGetterFuncForEventType<ArgsType extends ExchangeEventArgs>(
+ eventType: ExchangeEvents,
+ ): GetEventsFunc<ArgsType> {
+ return async (fromBlock: number, toBlock: number) =>
+ this._exchangeWrapper.getLogsAsync<ArgsType>(eventType, { fromBlock, toBlock }, {});
+ }
+}
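
Usage sketch (provider constructed as in the ERC20 example above): all three public getters delegate to the same factory-produced range getter, so fills, cancels, and cancel-up-tos paginate identically:

import { Web3ProviderEngine } from '@0x/subproviders';

import { ExchangeEventsSource } from './data_sources/contract-wrappers/exchange_events';

async function pullFillEventsExampleAsync(provider: Web3ProviderEngine): Promise<void> {
    const source = new ExchangeEventsSource(provider, 1); // mainnet
    const fills = await source.getFillEventsAsync(6500000, 6600000);
    console.log(`Got ${fills.length} fill events`);
}
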
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/utils.ts b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts
new file mode 100644
index 000000000..67660a37e
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts
@@ -0,0 +1,67 @@
+import { DecodedLogArgs, LogWithDecodedArgs } from 'ethereum-types';
+
+const NUM_BLOCKS_PER_QUERY = 10000; // Number of blocks to query for events at a time.
+const NUM_RETRIES = 3; // Number of retries if a request fails or times out.
+
+export type GetEventsFunc<ArgsType extends DecodedLogArgs> = (
+ fromBlock: number,
+ toBlock: number,
+) => Promise<Array<LogWithDecodedArgs<ArgsType>>>;
+
+/**
+ * Gets all events between the given startBlock and endBlock by querying for
+ * NUM_BLOCKS_PER_QUERY at a time. Accepts a getter function in order to
+ * maximize code re-use and allow for getting different types of events for
+ * different contracts. If the getter function throws with a retryable error,
+ * it will automatically be retried up to NUM_RETRIES times.
+ * @param getEventsAsync A getter function which will be called for each step during pagination.
+ * @param startBlock The start of the entire block range to get events for.
+ * @param endBlock The end of the entire block range to get events for.
+ */
+export async function getEventsWithPaginationAsync<ArgsType extends DecodedLogArgs>(
+ getEventsAsync: GetEventsFunc<ArgsType>,
+ startBlock: number,
+ endBlock: number,
+): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
+ let events: Array<LogWithDecodedArgs<ArgsType>> = [];
+ for (let fromBlock = startBlock; fromBlock <= endBlock; fromBlock += NUM_BLOCKS_PER_QUERY) {
+ const toBlock = Math.min(fromBlock + NUM_BLOCKS_PER_QUERY - 1, endBlock);
+ const eventsInRange = await _getEventsWithRetriesAsync(getEventsAsync, NUM_RETRIES, fromBlock, toBlock);
+ events = events.concat(eventsInRange);
+ }
+ return events;
+}
+
+/**
+ * Calls the getEventsAsync function and retries up to numRetries times if it
+ * throws with an error that is considered retryable.
+ * @param getEventsAsync a function that will be called on each iteration.
+ * @param numRetries the maximum number times to retry getEventsAsync if it fails with a retryable error.
+ * @param fromBlock the start of the sub-range of blocks we are getting events for.
+ * @param toBlock the end of the sub-range of blocks we are getting events for.
+ */
+export async function _getEventsWithRetriesAsync<ArgsType extends DecodedLogArgs>(
+ getEventsAsync: GetEventsFunc<ArgsType>,
+ numRetries: number,
+ fromBlock: number,
+ toBlock: number,
+): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
+ let eventsInRange: Array<LogWithDecodedArgs<ArgsType>> = [];
+ for (let i = 0; i <= numRetries; i++) {
+ try {
+ eventsInRange = await getEventsAsync(fromBlock, toBlock);
+ } catch (err) {
+ if (isErrorRetryable(err) && i < numRetries) {
+ continue;
+ } else {
+ throw err;
+ }
+ }
+ break;
+ }
+ return eventsInRange;
+}
+
+function isErrorRetryable(err: Error): boolean {
+ return err.message.includes('network timeout');
+}
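
To make the pagination contract concrete, a self-contained sketch with a hypothetical getter; any function matching GetEventsFunc can be driven this way, which is how the two event sources above share the helper:

import { DecodedLogArgs } from 'ethereum-types';

import { GetEventsFunc, getEventsWithPaginationAsync } from './data_sources/contract-wrappers/utils';

const logRangesAsync: GetEventsFunc<DecodedLogArgs> = async (fromBlock, toBlock) => {
    console.log(`querying blocks ${fromBlock}-${toBlock}`);
    return []; // a real getter would return decoded logs here
};

// With NUM_BLOCKS_PER_QUERY = 10000, this logs the sub-ranges 0-9999,
// 10000-19999, and 20000-25000.
getEventsWithPaginationAsync(logRangesAsync, 0, 25000).catch(console.error);
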
diff --git a/packages/pipeline/src/data_sources/copper/index.ts b/packages/pipeline/src/data_sources/copper/index.ts
new file mode 100644
index 000000000..15df2fd7d
--- /dev/null
+++ b/packages/pipeline/src/data_sources/copper/index.ts
@@ -0,0 +1,126 @@
+import { fetchAsync } from '@0x/utils';
+import Bottleneck from 'bottleneck';
+
+import {
+ CopperActivityTypeCategory,
+ CopperActivityTypeResponse,
+ CopperCustomFieldResponse,
+ CopperSearchResponse,
+} from '../../parsers/copper';
+
+const HTTP_OK_STATUS = 200;
+const COPPER_URI = 'https://api.prosperworks.com/developer_api/v1';
+
+const DEFAULT_PAGINATION_PARAMS = {
+ page_size: 200,
+ sort_by: 'date_modified',
+ sort_direction: 'desc',
+};
+
+export type CopperSearchParams = CopperLeadSearchParams | CopperActivitySearchParams | CopperOpportunitySearchParams;
+export interface CopperLeadSearchParams {
+ page_number?: number;
+}
+
+export interface CopperActivitySearchParams {
+ minimum_activity_date: number;
+ page_number?: number;
+}
+
+export interface CopperOpportunitySearchParams {
+ sort_by: string; // must override the default 'date_modified' for this endpoint
+ page_number?: number;
+}
+export enum CopperEndpoint {
+ Leads = '/leads/search',
+ Opportunities = '/opportunities/search',
+ Activities = '/activities/search',
+}
+const ONE_SECOND = 1000;
+
+function httpErrorCheck(response: Response): void {
+ if (response.status !== HTTP_OK_STATUS) {
+ throw new Error(`HTTP error while scraping Copper: [${JSON.stringify(response)}]`);
+ }
+}
+export class CopperSource {
+ private readonly _accessToken: string;
+ private readonly _userEmail: string;
+ private readonly _defaultHeaders: any;
+ private readonly _limiter: Bottleneck;
+
+ constructor(maxConcurrentRequests: number, accessToken: string, userEmail: string) {
+ this._accessToken = accessToken;
+ this._userEmail = userEmail;
+ this._defaultHeaders = {
+ 'Content-Type': 'application/json',
+ 'X-PW-AccessToken': this._accessToken,
+ 'X-PW-Application': 'developer_api',
+ 'X-PW-UserEmail': this._userEmail,
+ };
+        this._limiter = new Bottleneck({
+            minTime: ONE_SECOND / maxConcurrentRequests,
+            reservoir: 30,
+            reservoirRefreshAmount: 30,
+            reservoirRefreshInterval: ONE_SECOND, // interval is in ms; refresh the reservoir every second
+        });
+ }
+
+ public async fetchNumberOfPagesAsync(endpoint: CopperEndpoint, searchParams?: CopperSearchParams): Promise<number> {
+ const resp = await this._limiter.schedule(() =>
+ fetchAsync(COPPER_URI + endpoint, {
+ method: 'POST',
+ body: JSON.stringify({ ...DEFAULT_PAGINATION_PARAMS, ...searchParams }),
+ headers: this._defaultHeaders,
+ }),
+ );
+
+ httpErrorCheck(resp);
+
+ // total number of records that match the request parameters
+ if (resp.headers.has('X-Pw-Total')) {
+ const totalRecords: number = parseInt(resp.headers.get('X-Pw-Total') as string, 10); // tslint:disable-line:custom-no-magic-numbers
+ return Math.ceil(totalRecords / DEFAULT_PAGINATION_PARAMS.page_size);
+ } else {
+ return 1;
+ }
+ }
+ public async fetchSearchResultsAsync<T extends CopperSearchResponse>(
+ endpoint: CopperEndpoint,
+ searchParams?: CopperSearchParams,
+ ): Promise<T[]> {
+ const request = { ...DEFAULT_PAGINATION_PARAMS, ...searchParams };
+ const response = await this._limiter.schedule(() =>
+ fetchAsync(COPPER_URI + endpoint, {
+ method: 'POST',
+ body: JSON.stringify(request),
+ headers: this._defaultHeaders,
+ }),
+ );
+ httpErrorCheck(response);
+ const json: T[] = await response.json();
+ return json;
+ }
+
+ public async fetchActivityTypesAsync(): Promise<Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>> {
+ const response = await this._limiter.schedule(() =>
+ fetchAsync(`${COPPER_URI}/activity_types`, {
+ method: 'GET',
+ headers: this._defaultHeaders,
+ }),
+ );
+ httpErrorCheck(response);
+ return response.json();
+ }
+
+ public async fetchCustomFieldsAsync(): Promise<CopperCustomFieldResponse[]> {
+ const response = await this._limiter.schedule(() =>
+ fetchAsync(`${COPPER_URI}/custom_field_definitions`, {
+ method: 'GET',
+ headers: this._defaultHeaders,
+ }),
+ );
+ httpErrorCheck(response);
+ return response.json();
+ }
+}
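
Usage sketch (placeholder credentials): fetchNumberOfPagesAsync derives the page count from the X-Pw-Total header, and each page is then requested with an explicit page_number:

import { CopperEndpoint, CopperSource } from './data_sources/copper';

async function fetchAllLeadsExampleAsync(): Promise<void> {
    const source = new CopperSource(3, 'ACCESS_TOKEN', 'user@example.com');
    const numPages = await source.fetchNumberOfPagesAsync(CopperEndpoint.Leads);
    for (let page = 1; page <= numPages; page++) {
        // real callers would pass a concrete response type parameter here
        const records = await source.fetchSearchResultsAsync(CopperEndpoint.Leads, { page_number: page });
        console.log(`page ${page}/${numPages}: ${records.length} records`);
    }
}
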
diff --git a/packages/pipeline/src/data_sources/ddex/index.ts b/packages/pipeline/src/data_sources/ddex/index.ts
new file mode 100644
index 000000000..2bbd8c29b
--- /dev/null
+++ b/packages/pipeline/src/data_sources/ddex/index.ts
@@ -0,0 +1,78 @@
+import { fetchAsync, logUtils } from '@0x/utils';
+
+const DDEX_BASE_URL = 'https://api.ddex.io/v2';
+const ACTIVE_MARKETS_URL = `${DDEX_BASE_URL}/markets`;
+const NO_AGGREGATION_LEVEL = 3; // See https://docs.ddex.io/#get-orderbook
+const ORDERBOOK_ENDPOINT = `/orderbook?level=${NO_AGGREGATION_LEVEL}`;
+export const DDEX_SOURCE = 'ddex';
+
+export interface DdexActiveMarketsResponse {
+ status: number;
+ desc: string;
+ data: {
+ markets: DdexMarket[];
+ };
+}
+
+export interface DdexMarket {
+ id: string;
+ quoteToken: string;
+ quoteTokenDecimals: number;
+ quoteTokenAddress: string;
+ baseToken: string;
+ baseTokenDecimals: number;
+ baseTokenAddress: string;
+ minOrderSize: string;
+ maxOrderSize: string;
+ pricePrecision: number;
+ priceDecimals: number;
+ amountDecimals: number;
+}
+
+export interface DdexOrderbookResponse {
+ status: number;
+ desc: string;
+ data: {
+ orderBook: DdexOrderbook;
+ };
+}
+
+export interface DdexOrderbook {
+ marketId: string;
+ bids: DdexOrder[];
+ asks: DdexOrder[];
+}
+
+export interface DdexOrder {
+ price: string;
+ amount: string;
+ orderId: string;
+}
+
+// tslint:disable:prefer-function-over-method
+// ^ Keep consistency with other sources and help logical organization
+export class DdexSource {
+ /**
+ * Call Ddex API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getActiveMarketsAsync(): Promise<DdexMarket[]> {
+ logUtils.log('Getting all active DDEX markets');
+ const resp = await fetchAsync(ACTIVE_MARKETS_URL);
+ const respJson: DdexActiveMarketsResponse = await resp.json();
+ const markets = respJson.data.markets;
+ logUtils.log(`Got ${markets.length} markets.`);
+ return markets;
+ }
+
+ /**
+ * Retrieve orderbook from Ddex API for a given market.
+ * @param marketId String identifying the market we want data for. Eg. 'REP/AUG'
+ */
+ public async getMarketOrderbookAsync(marketId: string): Promise<DdexOrderbook> {
+ logUtils.log(`${marketId}: Retrieving orderbook.`);
+ const marketOrderbookUrl = `${ACTIVE_MARKETS_URL}/${marketId}${ORDERBOOK_ENDPOINT}`;
+ const resp = await fetchAsync(marketOrderbookUrl);
+ const respJson: DdexOrderbookResponse = await resp.json();
+ return respJson.data.orderBook;
+ }
+}
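
Usage sketch: enumerate the active markets, then fetch one orderbook at aggregation level 3 (individual orders):

import { DdexSource } from './data_sources/ddex';

async function ddexExampleAsync(): Promise<void> {
    const ddex = new DdexSource();
    const markets = await ddex.getActiveMarketsAsync();
    const orderbook = await ddex.getMarketOrderbookAsync(markets[0].id);
    console.log(`${markets[0].id}: ${orderbook.bids.length} bids, ${orderbook.asks.length} asks`);
}
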
diff --git a/packages/pipeline/src/data_sources/idex/index.ts b/packages/pipeline/src/data_sources/idex/index.ts
new file mode 100644
index 000000000..c1e53c08d
--- /dev/null
+++ b/packages/pipeline/src/data_sources/idex/index.ts
@@ -0,0 +1,82 @@
+import { fetchAsync } from '@0x/utils';
+
+const IDEX_BASE_URL = 'https://api.idex.market';
+const MARKETS_URL = `${IDEX_BASE_URL}/returnTicker`;
+const ORDERBOOK_URL = `${IDEX_BASE_URL}/returnOrderBook`;
+const MAX_ORDER_COUNT = 100; // Maximum based on https://github.com/AuroraDAO/idex-api-docs#returnorderbook
+export const IDEX_SOURCE = 'idex';
+
+export interface IdexMarketsResponse {
+ [marketName: string]: IdexMarket;
+}
+
+export interface IdexMarket {
+ last: string;
+ high: string;
+ low: string;
+ lowestAsk: string;
+ highestBid: string;
+ percentChange: string;
+ baseVolume: string;
+ quoteVolume: string;
+}
+
+export interface IdexOrderbook {
+ asks: IdexOrder[];
+ bids: IdexOrder[];
+}
+
+export interface IdexOrder {
+ price: string;
+ amount: string;
+ total: string;
+ orderHash: string;
+ params: IdexOrderParam;
+}
+
+export interface IdexOrderParam {
+ tokenBuy: string;
+ buySymbol: string;
+ buyPrecision: number;
+ amountBuy: string;
+ tokenSell: string;
+ sellSymbol: string;
+ sellPrecision: number;
+ amountSell: string;
+ expires: number;
+ nonce: number;
+ user: string;
+}
+
+// tslint:disable:prefer-function-over-method
+// ^ Keep consistency with other sources and help logical organization
+export class IdexSource {
+ /**
+ * Call Idex API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getMarketsAsync(): Promise<string[]> {
+ const params = { method: 'POST' };
+ const resp = await fetchAsync(MARKETS_URL, params);
+ const respJson: IdexMarketsResponse = await resp.json();
+ const markets: string[] = Object.keys(respJson);
+ return markets;
+ }
+
+ /**
+ * Retrieve orderbook from Idex API for a given market.
+ * @param marketId String identifying the market we want data for. Eg. 'REP_AUG'
+ */
+ public async getMarketOrderbookAsync(marketId: string): Promise<IdexOrderbook> {
+ const params = {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({
+ market: marketId,
+ count: MAX_ORDER_COUNT,
+ }),
+ };
+ const resp = await fetchAsync(ORDERBOOK_URL, params);
+ const respJson: IdexOrderbook = await resp.json();
+ return respJson;
+ }
+}
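
Usage sketch: both IDEX endpoints are POST requests, and the orderbook call is capped at the documented maximum of 100 orders per side:

import { IdexSource } from './data_sources/idex';

async function idexExampleAsync(): Promise<void> {
    const idex = new IdexSource();
    const markets = await idex.getMarketsAsync();
    const orderbook = await idex.getMarketOrderbookAsync(markets[0]);
    console.log(`${markets[0]}: ${orderbook.bids.length} bids, ${orderbook.asks.length} asks`);
}
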
diff --git a/packages/pipeline/src/data_sources/oasis/index.ts b/packages/pipeline/src/data_sources/oasis/index.ts
new file mode 100644
index 000000000..3b30e9dfd
--- /dev/null
+++ b/packages/pipeline/src/data_sources/oasis/index.ts
@@ -0,0 +1,103 @@
+import { fetchAsync } from '@0x/utils';
+
+const OASIS_BASE_URL = 'https://data.makerdao.com/v1';
+const OASIS_MARKET_QUERY = `query {
+ oasisMarkets(period: "1 week") {
+ nodes {
+ id
+ base
+ quote
+ buyVol
+ sellVol
+ price
+ high
+ low
+ }
+ }
+}`;
+const OASIS_ORDERBOOK_QUERY = `query ($market: String!) {
+ allOasisOrders(condition: { market: $market }) {
+ totalCount
+ nodes {
+ market
+ offerId
+ price
+ amount
+ act
+ }
+ }
+}`;
+export const OASIS_SOURCE = 'oasis';
+
+export interface OasisMarket {
+ id: string; // market symbol e.g MKRDAI
+ base: string; // base symbol e.g MKR
+ quote: string; // quote symbol e.g DAI
+ buyVol: number; // total buy volume (base)
+ sellVol: number; // total sell volume (base)
+ price: number; // volume weighted price (quote)
+ high: number; // max sell price
+ low: number; // min buy price
+}
+
+export interface OasisMarketResponse {
+ data: {
+ oasisMarkets: {
+ nodes: OasisMarket[];
+ };
+ };
+}
+
+export interface OasisOrder {
+ offerId: number; // Offer Id
+ market: string; // Market symbol (base/quote)
+ price: string; // Offer price (quote)
+ amount: string; // Offer amount (base)
+ act: string; // Action (ask|bid)
+}
+
+export interface OasisOrderbookResponse {
+ data: {
+ allOasisOrders: {
+ totalCount: number;
+ nodes: OasisOrder[];
+ };
+ };
+}
+
+// tslint:disable:prefer-function-over-method
+// ^ Keep consistency with other sources and help logical organization
+export class OasisSource {
+ /**
+     * Call Oasis API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getActiveMarketsAsync(): Promise<OasisMarket[]> {
+ const params = {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ query: OASIS_MARKET_QUERY }),
+ };
+ const resp = await fetchAsync(OASIS_BASE_URL, params);
+ const respJson: OasisMarketResponse = await resp.json();
+ const markets = respJson.data.oasisMarkets.nodes;
+ return markets;
+ }
+
+ /**
+ * Retrieve orderbook from Oasis API for a given market.
+ * @param marketId String identifying the market we want data for. Eg. 'REPAUG'.
+ */
+ public async getMarketOrderbookAsync(marketId: string): Promise<OasisOrder[]> {
+ const input = {
+ market: marketId,
+ };
+ const params = {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ query: OASIS_ORDERBOOK_QUERY, variables: input }),
+ };
+ const resp = await fetchAsync(OASIS_BASE_URL, params);
+ const respJson: OasisOrderbookResponse = await resp.json();
+ return respJson.data.allOasisOrders.nodes;
+ }
+}
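
Usage sketch: the orderbook query is parameterized GraphQL, so the market id travels in the variables object rather than the URL, and bids and asks come back interleaved under the `act` flag:

import { OasisSource } from './data_sources/oasis';

async function oasisExampleAsync(): Promise<void> {
    const oasis = new OasisSource();
    const markets = await oasis.getActiveMarketsAsync();
    const orders = await oasis.getMarketOrderbookAsync(markets[0].id);
    const bids = orders.filter(order => order.act === 'bid');
    const asks = orders.filter(order => order.act === 'ask');
    console.log(`${markets[0].id}: ${bids.length} bids, ${asks.length} asks`);
}
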
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
new file mode 100644
index 000000000..85042501b
--- /dev/null
+++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
@@ -0,0 +1,110 @@
+// tslint:disable:no-duplicate-imports
+import { fetchAsync } from '@0x/utils';
+import Bottleneck from 'bottleneck';
+import { stringify } from 'querystring';
+import * as R from 'ramda';
+
+import { TradingPair } from '../../utils/get_ohlcv_trading_pairs';
+
+export interface CryptoCompareOHLCVResponse {
+ Data: CryptoCompareOHLCVRecord[];
+ Response: string;
+ Message: string;
+ Type: number;
+}
+
+export interface CryptoCompareOHLCVRecord {
+ time: number; // in seconds, not milliseconds
+ close: number;
+ high: number;
+ low: number;
+ open: number;
+ volumefrom: number;
+ volumeto: number;
+}
+
+export interface CryptoCompareOHLCVParams {
+ fsym: string;
+ tsym: string;
+ e?: string;
+ aggregate?: string;
+ aggregatePredictableTimePeriods?: boolean;
+ limit?: number;
+ toTs?: number;
+}
+
+const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_SECOND = 1000;
+const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR;
+const HTTP_OK_STATUS = 200;
+const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96;
+const MAX_PAGE_SIZE = 2000;
+
+export class CryptoCompareOHLCVSource {
+ public readonly intervalBetweenRecords = ONE_HOUR;
+ public readonly defaultExchange = 'CCCAGG';
+ public readonly interval = this.intervalBetweenRecords * MAX_PAGE_SIZE; // the hourly API returns data for one interval at a time
+ private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?';
+
+ // rate-limit for all API calls through this class instance
+ private readonly _limiter: Bottleneck;
+ constructor(maxReqsPerSecond: number) {
+ this._limiter = new Bottleneck({
+ minTime: ONE_SECOND / maxReqsPerSecond,
+ reservoir: 30,
+ reservoirRefreshAmount: 30,
+ reservoirRefreshInterval: ONE_SECOND,
+ });
+ }
+
+    // gets OHLCV records starting from pair.latestSavedTime
+ public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> {
+ const params = {
+ e: this.defaultExchange,
+ fsym: pair.fromSymbol,
+ tsym: pair.toSymbol,
+ limit: MAX_PAGE_SIZE,
+            toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamps in seconds, not ms
+ };
+ const url = this._url + stringify(params);
+ const response = await this._limiter.schedule(() => fetchAsync(url));
+ if (response.status !== HTTP_OK_STATUS) {
+            throw new Error(`HTTP error while scraping Crypto Compare: [status ${response.status}]`);
+ }
+ const json: CryptoCompareOHLCVResponse = await response.json();
+ if (
+ (json.Response === 'Error' || json.Data.length === 0) &&
+ json.Type !== CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE
+ ) {
+ throw new Error(JSON.stringify(json));
+ }
+ return json.Data.filter(rec => {
+ return (
+ // Crypto Compare takes ~30 mins to finalise records
+ rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime && hasData(rec)
+ );
+ });
+ }
+ public generateBackfillIntervals(pair: TradingPair): TradingPair[] {
+ const now = new Date().getTime();
+ const f = (p: TradingPair): false | [TradingPair, TradingPair] => {
+ if (p.latestSavedTime > now) {
+ return false;
+ } else {
+ return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })];
+ }
+ };
+ return R.unfold(f, pair);
+ }
+}
+
+function hasData(record: CryptoCompareOHLCVRecord): boolean {
+ return (
+ record.close !== 0 ||
+ record.open !== 0 ||
+ record.high !== 0 ||
+ record.low !== 0 ||
+ record.volumefrom !== 0 ||
+ record.volumeto !== 0
+ );
+}
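
A worked sketch of generateBackfillIntervals: R.unfold repeatedly advances latestSavedTime by one page-sized interval (2000 hours) until it passes the current time, yielding one TradingPair per page that still needs fetching. This assumes TradingPair carries just the three fields the source reads (its full definition is outside this section):

import { CryptoCompareOHLCVSource } from './data_sources/ohlcv_external/crypto_compare';
import { TradingPair } from './utils/get_ohlcv_trading_pairs';

const source = new CryptoCompareOHLCVSource(15); // at most 15 requests per second
const pair: TradingPair = {
    fromSymbol: 'ZRX',
    toSymbol: 'WETH',
    latestSavedTime: Date.now() - 3 * source.interval, // roughly three pages behind
};
// Yields four pairs: the original plus three copies advanced by one interval
// each; unfolding stops once latestSavedTime passes the current time.
const backfillPairs = source.generateBackfillIntervals(pair);
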
diff --git a/packages/pipeline/src/data_sources/paradex/index.ts b/packages/pipeline/src/data_sources/paradex/index.ts
new file mode 100644
index 000000000..46d448f4b
--- /dev/null
+++ b/packages/pipeline/src/data_sources/paradex/index.ts
@@ -0,0 +1,92 @@
+import { fetchAsync, logUtils } from '@0x/utils';
+
+const PARADEX_BASE_URL = 'https://api.paradex.io/consumer/v0';
+const ACTIVE_MARKETS_URL = `${PARADEX_BASE_URL}/markets`;
+const ORDERBOOK_ENDPOINT = `${PARADEX_BASE_URL}/orderbook`;
+const TOKEN_INFO_ENDPOINT = `${PARADEX_BASE_URL}/tokens`;
+export const PARADEX_SOURCE = 'paradex';
+
+export type ParadexActiveMarketsResponse = ParadexMarket[];
+
+export interface ParadexMarket {
+ id: string;
+ symbol: string;
+ baseToken: string;
+ quoteToken: string;
+ minOrderSize: string;
+ maxOrderSize: string;
+ priceMaxDecimals: number;
+ amountMaxDecimals: number;
+ // These are not native to the Paradex API response. We tag them on later
+ // by calling the token endpoint and joining on symbol.
+ baseTokenAddress?: string;
+ quoteTokenAddress?: string;
+}
+
+export interface ParadexOrderbookResponse {
+ marketId: number;
+ marketSymbol: string;
+ bids: ParadexOrder[];
+ asks: ParadexOrder[];
+}
+
+export interface ParadexOrder {
+ amount: string;
+ price: string;
+}
+
+export type ParadexTokenInfoResponse = ParadexTokenInfo[];
+
+export interface ParadexTokenInfo {
+ name: string;
+ symbol: string;
+ address: string;
+}
+
+export class ParadexSource {
+ private readonly _apiKey: string;
+
+ constructor(apiKey: string) {
+ this._apiKey = apiKey;
+ }
+
+ /**
+ * Call Paradex API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getActiveMarketsAsync(): Promise<ParadexActiveMarketsResponse> {
+ logUtils.log('Getting all active Paradex markets.');
+ const resp = await fetchAsync(ACTIVE_MARKETS_URL, {
+ headers: { 'API-KEY': this._apiKey },
+ });
+ const markets: ParadexActiveMarketsResponse = await resp.json();
+ logUtils.log(`Got ${markets.length} markets.`);
+ return markets;
+ }
+
+ /**
+ * Call Paradex API to find out their token information.
+ */
+ public async getTokenInfoAsync(): Promise<ParadexTokenInfoResponse> {
+ logUtils.log('Getting token information from Paradex.');
+ const resp = await fetchAsync(TOKEN_INFO_ENDPOINT, {
+ headers: { 'API-KEY': this._apiKey },
+ });
+ const tokens: ParadexTokenInfoResponse = await resp.json();
+ logUtils.log(`Got information for ${tokens.length} tokens.`);
+ return tokens;
+ }
+
+ /**
+ * Retrieve orderbook from Paradex API for a given market.
+ * @param marketSymbol String representing the market we want data for.
+ */
+ public async getMarketOrderbookAsync(marketSymbol: string): Promise<ParadexOrderbookResponse> {
+ logUtils.log(`${marketSymbol}: Retrieving orderbook.`);
+ const marketOrderbookUrl = `${ORDERBOOK_ENDPOINT}?market=${marketSymbol}`;
+ const resp = await fetchAsync(marketOrderbookUrl, {
+ headers: { 'API-KEY': this._apiKey },
+ });
+ const orderbookResponse: ParadexOrderbookResponse = await resp.json();
+ return orderbookResponse;
+ }
+}
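
Sketch of the join described in the ParadexMarket comment above, assuming baseToken and quoteToken hold token symbols; the API key is a placeholder:

import { ParadexSource } from './data_sources/paradex';

async function tagTokenAddressesExampleAsync(): Promise<void> {
    const paradex = new ParadexSource('PARADEX_API_KEY');
    const [markets, tokens] = await Promise.all([paradex.getActiveMarketsAsync(), paradex.getTokenInfoAsync()]);
    const addressBySymbol = new Map<string, string>(tokens.map(token => [token.symbol, token.address] as [string, string]));
    for (const market of markets) {
        market.baseTokenAddress = addressBySymbol.get(market.baseToken);
        market.quoteTokenAddress = addressBySymbol.get(market.quoteToken);
    }
}
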
diff --git a/packages/pipeline/src/data_sources/relayer-registry/index.ts b/packages/pipeline/src/data_sources/relayer-registry/index.ts
new file mode 100644
index 000000000..8133f5eae
--- /dev/null
+++ b/packages/pipeline/src/data_sources/relayer-registry/index.ts
@@ -0,0 +1,33 @@
+import axios from 'axios';
+
+export interface RelayerResponse {
+ name: string;
+ homepage_url: string;
+ app_url: string;
+ header_img: string;
+ logo_img: string;
+ networks: RelayerResponseNetwork[];
+}
+
+export interface RelayerResponseNetwork {
+ networkId: number;
+ sra_http_endpoint?: string;
+ sra_ws_endpoint?: string;
+ static_order_fields?: {
+ fee_recipient_addresses?: string[];
+ taker_addresses?: string[];
+ };
+}
+
+export class RelayerRegistrySource {
+ private readonly _url: string;
+
+ constructor(url: string) {
+ this._url = url;
+ }
+
+ public async getRelayerInfoAsync(): Promise<Map<string, RelayerResponse>> {
+ const resp = await axios.get<Map<string, RelayerResponse>>(this._url);
+ return resp.data;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/trusted_tokens/index.ts b/packages/pipeline/src/data_sources/trusted_tokens/index.ts
new file mode 100644
index 000000000..552739fb9
--- /dev/null
+++ b/packages/pipeline/src/data_sources/trusted_tokens/index.ts
@@ -0,0 +1,29 @@
+import axios from 'axios';
+
+export interface ZeroExTrustedTokenMeta {
+ address: string;
+ name: string;
+ symbol: string;
+ decimals: number;
+}
+
+export interface MetamaskTrustedTokenMeta {
+ address: string;
+ name: string;
+ erc20: boolean;
+ symbol: string;
+ decimals: number;
+}
+
+export class TrustedTokenSource<T> {
+ private readonly _url: string;
+
+ constructor(url: string) {
+ this._url = url;
+ }
+
+ public async getTrustedTokenMetaAsync(): Promise<T> {
+ const resp = await axios.get<T>(this._url);
+ return resp.data;
+ }
+}
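
Usage sketch: the source is generic over the response shape, so the same class can serve different token registries; the URL and the array response shape are illustrative assumptions:

import { TrustedTokenSource, ZeroExTrustedTokenMeta } from './data_sources/trusted_tokens';

async function trustedTokensExampleAsync(): Promise<void> {
    const source = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>('https://example.com/trusted-tokens.json');
    const tokens = await source.getTrustedTokenMetaAsync();
    console.log(`Got ${tokens.length} trusted tokens`);
}
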
diff --git a/packages/pipeline/src/data_sources/web3/index.ts b/packages/pipeline/src/data_sources/web3/index.ts
new file mode 100644
index 000000000..45a9ea161
--- /dev/null
+++ b/packages/pipeline/src/data_sources/web3/index.ts
@@ -0,0 +1,22 @@
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { Web3Wrapper } from '@0x/web3-wrapper';
+import { BlockWithoutTransactionData, Transaction } from 'ethereum-types';
+
+export class Web3Source {
+ private readonly _web3Wrapper: Web3Wrapper;
+ constructor(provider: Web3ProviderEngine) {
+ this._web3Wrapper = new Web3Wrapper(provider);
+ }
+
+ public async getBlockInfoAsync(blockNumber: number): Promise<BlockWithoutTransactionData> {
+ const block = await this._web3Wrapper.getBlockIfExistsAsync(blockNumber);
+ if (block == null) {
+ return Promise.reject(new Error(`Could not find block for given block number: ${blockNumber}`));
+ }
+ return block;
+ }
+
+ public async getTransactionInfoAsync(txHash: string): Promise<Transaction> {
+ return this._web3Wrapper.getTransactionByHashAsync(txHash);
+ }
+}
diff --git a/packages/pipeline/src/entities/block.ts b/packages/pipeline/src/entities/block.ts
new file mode 100644
index 000000000..398946622
--- /dev/null
+++ b/packages/pipeline/src/entities/block.ts
@@ -0,0 +1,13 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'blocks', schema: 'raw' })
+export class Block {
+ @PrimaryColumn() public hash!: string;
+ @PrimaryColumn({ transformer: numberToBigIntTransformer })
+ public number!: number;
+
+ @Column({ name: 'timestamp', transformer: numberToBigIntTransformer })
+ public timestamp!: number;
+}
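
The entities from here on rely on numberToBigIntTransformer (utils/transformers/number_to_bigint.ts in the diffstat; its contents are outside this section). As a hedged sketch of what such a TypeORM ValueTransformer typically looks like (Postgres drivers return bigint columns as strings, so values need converting in both directions):

import { ValueTransformer } from 'typeorm';

// Hypothetical shape only; the real transformer lives in
// utils/transformers/number_to_bigint.ts and may differ.
const numberToBigIntTransformerSketch: ValueTransformer = {
    to: (value: number): string => value.toString(), // JS number -> bigint column
    from: (value: string): number => parseInt(value, 10), // bigint column -> JS number
};
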
diff --git a/packages/pipeline/src/entities/copper_activity.ts b/packages/pipeline/src/entities/copper_activity.ts
new file mode 100644
index 000000000..cbc034285
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_activity.ts
@@ -0,0 +1,41 @@
+import { Column, Entity, Index, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_activities', schema: 'raw' })
+export class CopperActivity {
+ @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+
+ @Index()
+ @Column({ name: 'parent_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public parentId!: number;
+ @Column({ name: 'parent_type', type: 'varchar' })
+ public parentType!: string;
+
+ // join with CopperActivityType
+ @Index()
+ @Column({ name: 'type_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public typeId!: number;
+ @Column({ name: 'type_category', type: 'varchar' })
+ public typeCategory!: string;
+ @Column({ name: 'type_name', type: 'varchar', nullable: true })
+ public typeName?: string;
+
+ @Column({ name: 'user_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public userId!: number;
+ @Column({ name: 'old_value_id', type: 'bigint', nullable: true, transformer: numberToBigIntTransformer })
+ public oldValueId?: number;
+ @Column({ name: 'old_value_name', type: 'varchar', nullable: true })
+ public oldValueName?: string;
+ @Column({ name: 'new_value_id', type: 'bigint', nullable: true, transformer: numberToBigIntTransformer })
+ public newValueId?: number;
+ @Column({ name: 'new_value_name', type: 'varchar', nullable: true })
+ public newValueName?: string;
+
+ @Index()
+ @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateCreated!: number;
+ @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateModified!: number;
+}
diff --git a/packages/pipeline/src/entities/copper_activity_type.ts b/packages/pipeline/src/entities/copper_activity_type.ts
new file mode 100644
index 000000000..8fb2dcf70
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_activity_type.ts
@@ -0,0 +1,17 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_activity_types', schema: 'raw' })
+export class CopperActivityType {
+ @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+ @Column({ name: 'category', type: 'varchar' })
+ public category!: string;
+ @Column({ name: 'name', type: 'varchar' })
+ public name!: string;
+ @Column({ name: 'is_disabled', type: 'boolean', nullable: true })
+ public isDisabled?: boolean;
+ @Column({ name: 'count_as_interaction', type: 'boolean', nullable: true })
+ public countAsInteraction?: boolean;
+}
diff --git a/packages/pipeline/src/entities/copper_custom_field.ts b/packages/pipeline/src/entities/copper_custom_field.ts
new file mode 100644
index 000000000..f23f6ab22
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_custom_field.ts
@@ -0,0 +1,15 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_custom_fields', schema: 'raw' })
+export class CopperCustomField {
+ @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+ @Column({ name: 'data_type', type: 'varchar' })
+ public dataType!: string;
+ @Column({ name: 'field_type', type: 'varchar', nullable: true })
+ public fieldType?: string;
+ @Column({ name: 'name', type: 'varchar' })
+ public name!: string;
+}
diff --git a/packages/pipeline/src/entities/copper_lead.ts b/packages/pipeline/src/entities/copper_lead.ts
new file mode 100644
index 000000000..c51ccd761
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_lead.ts
@@ -0,0 +1,38 @@
+import { Column, Entity, Index, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_leads', schema: 'raw' })
+export class CopperLead {
+ @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+
+ @Column({ name: 'name', type: 'varchar', nullable: true })
+ public name?: string;
+ @Column({ name: 'first_name', type: 'varchar', nullable: true })
+ public firstName?: string;
+ @Column({ name: 'last_name', type: 'varchar', nullable: true })
+ public lastName?: string;
+ @Column({ name: 'middle_name', type: 'varchar', nullable: true })
+ public middleName?: string;
+ @Column({ name: 'assignee_id', type: 'bigint', transformer: numberToBigIntTransformer, nullable: true })
+ public assigneeId?: number;
+ @Column({ name: 'company_name', type: 'varchar', nullable: true })
+ public companyName?: string;
+ @Column({ name: 'customer_source_id', type: 'bigint', transformer: numberToBigIntTransformer, nullable: true })
+ public customerSourceId?: number;
+ @Column({ name: 'monetary_value', type: 'integer', nullable: true })
+ public monetaryValue?: number;
+ @Column({ name: 'status', type: 'varchar' })
+ public status!: string;
+ @Column({ name: 'status_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public statusId!: number;
+ @Column({ name: 'title', type: 'varchar', nullable: true })
+ public title?: string;
+
+ @Index()
+ @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateCreated!: number;
+ @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateModified!: number;
+}
diff --git a/packages/pipeline/src/entities/copper_opportunity.ts b/packages/pipeline/src/entities/copper_opportunity.ts
new file mode 100644
index 000000000..e12bd69ce
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_opportunity.ts
@@ -0,0 +1,45 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_opportunities', schema: 'raw' })
+export class CopperOpportunity {
+ @PrimaryColumn({ name: 'id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+ @Column({ name: 'name', type: 'varchar' })
+ public name!: string;
+ @Column({ name: 'assignee_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public assigneeId?: number;
+ @Column({ name: 'close_date', nullable: true, type: 'varchar' })
+ public closeDate?: string;
+ @Column({ name: 'company_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public companyId?: number;
+ @Column({ name: 'company_name', nullable: true, type: 'varchar' })
+ public companyName?: string;
+ @Column({ name: 'customer_source_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public customerSourceId?: number;
+ @Column({ name: 'loss_reason_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public lossReasonId?: number;
+ @Column({ name: 'pipeline_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public pipelineId!: number;
+ @Column({ name: 'pipeline_stage_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public pipelineStageId!: number;
+ @Column({ name: 'primary_contact_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public primaryContactId?: number;
+ @Column({ name: 'priority', nullable: true, type: 'varchar' })
+ public priority?: string;
+ @Column({ name: 'status', type: 'varchar' })
+ public status!: string;
+ @Column({ name: 'interaction_count', type: 'bigint', transformer: numberToBigIntTransformer })
+ public interactionCount!: number;
+ @Column({ name: 'monetary_value', nullable: true, type: 'integer' })
+ public monetaryValue?: number;
+ @Column({ name: 'win_probability', nullable: true, type: 'integer' })
+ public winProbability?: number;
+ @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateCreated!: number;
+ @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateModified!: number;
+ @Column({ name: 'custom_fields', type: 'jsonb' })
+ public customFields!: { [key: number]: number };
+}
diff --git a/packages/pipeline/src/entities/dex_trade.ts b/packages/pipeline/src/entities/dex_trade.ts
new file mode 100644
index 000000000..9d288cb51
--- /dev/null
+++ b/packages/pipeline/src/entities/dex_trade.ts
@@ -0,0 +1,54 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'dex_trades', schema: 'raw' })
+export class DexTrade {
+ @PrimaryColumn({ name: 'source_url' })
+ public sourceUrl!: string;
+ @PrimaryColumn({ name: 'tx_hash' })
+ public txHash!: string;
+
+ @Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer })
+ public txTimestamp!: number;
+ @Column({ name: 'tx_date' })
+ public txDate!: string;
+ @Column({ name: 'tx_sender' })
+ public txSender!: string;
+ @Column({ name: 'smart_contract_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public smartContractId!: number;
+ @Column({ name: 'smart_contract_address' })
+ public smartContractAddress!: string;
+ @Column({ name: 'contract_type' })
+ public contractType!: string;
+ @Column({ type: 'varchar' })
+ public maker!: string;
+ @Column({ type: 'varchar' })
+ public taker!: string;
+ @Column({ name: 'amount_buy', type: 'numeric', transformer: bigNumberTransformer })
+ public amountBuy!: BigNumber;
+ @Column({ name: 'maker_fee_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public makerFeeAmount!: BigNumber;
+ @Column({ name: 'buy_currency_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public buyCurrencyId!: number;
+ @Column({ name: 'buy_symbol' })
+ public buySymbol!: string;
+ @Column({ name: 'amount_sell', type: 'numeric', transformer: bigNumberTransformer })
+ public amountSell!: BigNumber;
+ @Column({ name: 'taker_fee_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public takerFeeAmount!: BigNumber;
+ @Column({ name: 'sell_currency_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public sellCurrencyId!: number;
+ @Column({ name: 'sell_symbol' })
+ public sellSymbol!: string;
+ @Column({ name: 'maker_annotation' })
+ public makerAnnotation!: string;
+ @Column({ name: 'taker_annotation' })
+ public takerAnnotation!: string;
+ @Column() public protocol!: string;
+ @Column({ name: 'buy_address', type: 'varchar', nullable: true })
+ public buyAddress!: string | null;
+ @Column({ name: 'sell_address', type: 'varchar', nullable: true })
+ public sellAddress!: string | null;
+}
diff --git a/packages/pipeline/src/entities/erc20_approval_event.ts b/packages/pipeline/src/entities/erc20_approval_event.ts
new file mode 100644
index 000000000..69cdfcb0b
--- /dev/null
+++ b/packages/pipeline/src/entities/erc20_approval_event.ts
@@ -0,0 +1,26 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'erc20_approval_events', schema: 'raw' })
+export class ERC20ApprovalEvent {
+ @PrimaryColumn({ name: 'token_address' })
+ public tokenAddress!: string;
+ @PrimaryColumn({ name: 'log_index' })
+ public logIndex!: number;
+ @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+ public blockNumber!: number;
+
+ @Column({ name: 'raw_data' })
+ public rawData!: string;
+
+ @Column({ name: 'transaction_hash' })
+ public transactionHash!: string;
+ @Column({ name: 'owner_address' })
+ public ownerAddress!: string;
+ @Column({ name: 'spender_address' })
+ public spenderAddress!: string;
+ @Column({ name: 'amount', type: 'numeric', transformer: bigNumberTransformer })
+ public amount!: BigNumber;
+}
diff --git a/packages/pipeline/src/entities/exchange_cancel_event.ts b/packages/pipeline/src/entities/exchange_cancel_event.ts
new file mode 100644
index 000000000..38f99c903
--- /dev/null
+++ b/packages/pipeline/src/entities/exchange_cancel_event.ts
@@ -0,0 +1,51 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { AssetType } from '../types';
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'exchange_cancel_events', schema: 'raw' })
+export class ExchangeCancelEvent {
+ @PrimaryColumn({ name: 'contract_address' })
+ public contractAddress!: string;
+ @PrimaryColumn({ name: 'log_index' })
+ public logIndex!: number;
+ @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+ public blockNumber!: number;
+
+ @Column({ name: 'raw_data' })
+ public rawData!: string;
+
+ @Column({ name: 'transaction_hash' })
+ public transactionHash!: string;
+ @Column({ name: 'maker_address' })
+ public makerAddress!: string;
+ @Column({ nullable: true, type: String, name: 'taker_address' })
+ public takerAddress!: string;
+ @Column({ name: 'fee_recipient_address' })
+ public feeRecipientAddress!: string;
+ @Column({ name: 'sender_address' })
+ public senderAddress!: string;
+ @Column({ name: 'order_hash' })
+ public orderHash!: string;
+
+ @Column({ name: 'raw_maker_asset_data' })
+ public rawMakerAssetData!: string;
+ @Column({ name: 'maker_asset_type' })
+ public makerAssetType!: AssetType;
+ @Column({ name: 'maker_asset_proxy_id' })
+ public makerAssetProxyId!: string;
+ @Column({ name: 'maker_token_address' })
+ public makerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'maker_token_id' })
+ public makerTokenId!: string | null;
+ @Column({ name: 'raw_taker_asset_data' })
+ public rawTakerAssetData!: string;
+ @Column({ name: 'taker_asset_type' })
+ public takerAssetType!: AssetType;
+ @Column({ name: 'taker_asset_proxy_id' })
+ public takerAssetProxyId!: string;
+ @Column({ name: 'taker_token_address' })
+ public takerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'taker_token_id' })
+ public takerTokenId!: string | null;
+}
diff --git a/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts b/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts
new file mode 100644
index 000000000..27580305e
--- /dev/null
+++ b/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts
@@ -0,0 +1,26 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'exchange_cancel_up_to_events', schema: 'raw' })
+export class ExchangeCancelUpToEvent {
+ @PrimaryColumn({ name: 'contract_address' })
+ public contractAddress!: string;
+ @PrimaryColumn({ name: 'log_index' })
+ public logIndex!: number;
+ @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+ public blockNumber!: number;
+
+ @Column({ name: 'raw_data' })
+ public rawData!: string;
+
+ @Column({ name: 'transaction_hash' })
+ public transactionHash!: string;
+ @Column({ name: 'maker_address' })
+ public makerAddress!: string;
+ @Column({ name: 'sender_address' })
+ public senderAddress!: string;
+ @Column({ name: 'order_epoch', type: 'numeric', transformer: bigNumberTransformer })
+ public orderEpoch!: BigNumber;
+}
diff --git a/packages/pipeline/src/entities/exchange_fill_event.ts b/packages/pipeline/src/entities/exchange_fill_event.ts
new file mode 100644
index 000000000..9b7727615
--- /dev/null
+++ b/packages/pipeline/src/entities/exchange_fill_event.ts
@@ -0,0 +1,60 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { AssetType } from '../types';
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'exchange_fill_events', schema: 'raw' })
+export class ExchangeFillEvent {
+ @PrimaryColumn({ name: 'contract_address' })
+ public contractAddress!: string;
+ @PrimaryColumn({ name: 'log_index' })
+ public logIndex!: number;
+ @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+ public blockNumber!: number;
+
+ @Column({ name: 'raw_data' })
+ public rawData!: string;
+
+ @Column({ name: 'transaction_hash' })
+ public transactionHash!: string;
+ @Column({ name: 'maker_address' })
+ public makerAddress!: string;
+ @Column({ name: 'taker_address' })
+ public takerAddress!: string;
+ @Column({ name: 'fee_recipient_address' })
+ public feeRecipientAddress!: string;
+ @Column({ name: 'sender_address' })
+ public senderAddress!: string;
+ @Column({ name: 'maker_asset_filled_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public makerAssetFilledAmount!: BigNumber;
+ @Column({ name: 'taker_asset_filled_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public takerAssetFilledAmount!: BigNumber;
+ @Column({ name: 'maker_fee_paid', type: 'numeric', transformer: bigNumberTransformer })
+ public makerFeePaid!: BigNumber;
+ @Column({ name: 'taker_fee_paid', type: 'numeric', transformer: bigNumberTransformer })
+ public takerFeePaid!: BigNumber;
+ @Column({ name: 'order_hash' })
+ public orderHash!: string;
+
+ @Column({ name: 'raw_maker_asset_data' })
+ public rawMakerAssetData!: string;
+ @Column({ name: 'maker_asset_type' })
+ public makerAssetType!: AssetType;
+ @Column({ name: 'maker_asset_proxy_id' })
+ public makerAssetProxyId!: string;
+ @Column({ name: 'maker_token_address' })
+ public makerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'maker_token_id' })
+ public makerTokenId!: string | null;
+ @Column({ name: 'raw_taker_asset_data' })
+ public rawTakerAssetData!: string;
+ @Column({ name: 'taker_asset_type' })
+ public takerAssetType!: AssetType;
+ @Column({ name: 'taker_asset_proxy_id' })
+ public takerAssetProxyId!: string;
+ @Column({ name: 'taker_token_address' })
+ public takerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'taker_token_id' })
+ public takerTokenId!: string | null;
+}
diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts
new file mode 100644
index 000000000..27c153c07
--- /dev/null
+++ b/packages/pipeline/src/entities/index.ts
@@ -0,0 +1,25 @@
+import { ExchangeCancelEvent } from './exchange_cancel_event';
+import { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event';
+import { ExchangeFillEvent } from './exchange_fill_event';
+
+export { Block } from './block';
+export { DexTrade } from './dex_trade';
+export { ExchangeCancelEvent } from './exchange_cancel_event';
+export { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event';
+export { ExchangeFillEvent } from './exchange_fill_event';
+export { OHLCVExternal } from './ohlcv_external';
+export { Relayer } from './relayer';
+export { SraOrder } from './sra_order';
+export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp';
+export { TokenMetadata } from './token_metadata';
+export { TokenOrderbookSnapshot } from './token_order';
+export { Transaction } from './transaction';
+export { ERC20ApprovalEvent } from './erc20_approval_event';
+
+export { CopperLead } from './copper_lead';
+export { CopperActivity } from './copper_activity';
+export { CopperOpportunity } from './copper_opportunity';
+export { CopperActivityType } from './copper_activity_type';
+export { CopperCustomField } from './copper_custom_field';
+
+export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent;
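Each member of the ExchangeEvent union is a class, so consumers can narrow it with instanceof. A minimal sketch:

    import { ExchangeCancelEvent, ExchangeEvent, ExchangeFillEvent } from './entities';

    // Fill and cancel events carry an orderHash; cancel-up-to events carry
    // an orderEpoch instead.
    function describeEvent(event: ExchangeEvent): string {
        if (event instanceof ExchangeFillEvent) {
            return `fill ${event.orderHash}`;
        }
        if (event instanceof ExchangeCancelEvent) {
            return `cancel ${event.orderHash}`;
        }
        return `cancel-up-to, epoch ${event.orderEpoch}`;
    }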
diff --git a/packages/pipeline/src/entities/ohlcv_external.ts b/packages/pipeline/src/entities/ohlcv_external.ts
new file mode 100644
index 000000000..4f55dd930
--- /dev/null
+++ b/packages/pipeline/src/entities/ohlcv_external.ts
@@ -0,0 +1,30 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'ohlcv_external', schema: 'raw' })
+export class OHLCVExternal {
+ @PrimaryColumn() public exchange!: string;
+
+ @PrimaryColumn({ name: 'from_symbol', type: 'varchar' })
+ public fromSymbol!: string;
+ @PrimaryColumn({ name: 'to_symbol', type: 'varchar' })
+ public toSymbol!: string;
+ @PrimaryColumn({ name: 'start_time', transformer: numberToBigIntTransformer })
+ public startTime!: number;
+ @PrimaryColumn({ name: 'end_time', transformer: numberToBigIntTransformer })
+ public endTime!: number;
+
+ @Column() public open!: number;
+ @Column() public close!: number;
+ @Column() public low!: number;
+ @Column() public high!: number;
+ @Column({ name: 'volume_from' })
+ public volumeFrom!: number;
+ @Column({ name: 'volume_to' })
+ public volumeTo!: number;
+
+ @PrimaryColumn() public source!: string;
+ @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer })
+ public observedTimestamp!: number;
+}
diff --git a/packages/pipeline/src/entities/relayer.ts b/packages/pipeline/src/entities/relayer.ts
new file mode 100644
index 000000000..5af8578b4
--- /dev/null
+++ b/packages/pipeline/src/entities/relayer.ts
@@ -0,0 +1,21 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+@Entity({ name: 'relayers', schema: 'raw' })
+export class Relayer {
+ @PrimaryColumn() public uuid!: string;
+
+ @Column() public name!: string;
+ @Column({ name: 'homepage_url', type: 'varchar' })
+ public homepageUrl!: string;
+ @Column({ name: 'sra_http_endpoint', type: 'varchar', nullable: true })
+ public sraHttpEndpoint!: string | null;
+ @Column({ name: 'sra_ws_endpoint', type: 'varchar', nullable: true })
+ public sraWsEndpoint!: string | null;
+ @Column({ name: 'app_url', type: 'varchar', nullable: true })
+ public appUrl!: string | null;
+
+ @Column({ name: 'fee_recipient_addresses', type: 'varchar', array: true })
+ public feeRecipientAddresses!: string[];
+ @Column({ name: 'taker_addresses', type: 'varchar', array: true })
+ public takerAddresses!: string[];
+}
diff --git a/packages/pipeline/src/entities/sra_order.ts b/packages/pipeline/src/entities/sra_order.ts
new file mode 100644
index 000000000..9c730a0bb
--- /dev/null
+++ b/packages/pipeline/src/entities/sra_order.ts
@@ -0,0 +1,63 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { AssetType } from '../types';
+import { bigNumberTransformer } from '../utils';
+
+@Entity({ name: 'sra_orders', schema: 'raw' })
+export class SraOrder {
+ @PrimaryColumn({ name: 'exchange_address' })
+ public exchangeAddress!: string;
+ @PrimaryColumn({ name: 'order_hash_hex' })
+ public orderHashHex!: string;
+ @PrimaryColumn({ name: 'source_url' })
+ public sourceUrl!: string;
+
+ @Column({ name: 'maker_address' })
+ public makerAddress!: string;
+ @Column({ name: 'taker_address' })
+ public takerAddress!: string;
+ @Column({ name: 'fee_recipient_address' })
+ public feeRecipientAddress!: string;
+ @Column({ name: 'sender_address' })
+ public senderAddress!: string;
+ @Column({ name: 'maker_asset_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public makerAssetAmount!: BigNumber;
+ @Column({ name: 'taker_asset_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public takerAssetAmount!: BigNumber;
+ @Column({ name: 'maker_fee', type: 'numeric', transformer: bigNumberTransformer })
+ public makerFee!: BigNumber;
+ @Column({ name: 'taker_fee', type: 'numeric', transformer: bigNumberTransformer })
+ public takerFee!: BigNumber;
+ @Column({ name: 'expiration_time_seconds', type: 'numeric', transformer: bigNumberTransformer })
+ public expirationTimeSeconds!: BigNumber;
+ @Column({ name: 'salt', type: 'numeric', transformer: bigNumberTransformer })
+ public salt!: BigNumber;
+ @Column({ name: 'signature' })
+ public signature!: string;
+
+ @Column({ name: 'raw_maker_asset_data' })
+ public rawMakerAssetData!: string;
+ @Column({ name: 'maker_asset_type' })
+ public makerAssetType!: AssetType;
+ @Column({ name: 'maker_asset_proxy_id' })
+ public makerAssetProxyId!: string;
+ @Column({ name: 'maker_token_address' })
+ public makerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'maker_token_id' })
+ public makerTokenId!: string | null;
+ @Column({ name: 'raw_taker_asset_data' })
+ public rawTakerAssetData!: string;
+ @Column({ name: 'taker_asset_type' })
+ public takerAssetType!: AssetType;
+ @Column({ name: 'taker_asset_proxy_id' })
+ public takerAssetProxyId!: string;
+ @Column({ name: 'taker_token_address' })
+ public takerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'taker_token_id' })
+ public takerTokenId!: string | null;
+
+ // TODO(albrow): Make this optional?
+ @Column({ name: 'metadata_json' })
+ public metadataJson!: string;
+}
diff --git a/packages/pipeline/src/entities/sra_order_observed_timestamp.ts b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts
new file mode 100644
index 000000000..cbec1c6d0
--- /dev/null
+++ b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts
@@ -0,0 +1,35 @@
+import { Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+import { SraOrder } from './sra_order';
+
+@Entity({ name: 'sra_orders_observed_timestamps', schema: 'raw' })
+export class SraOrdersObservedTimeStamp {
+ @PrimaryColumn({ name: 'exchange_address' })
+ public exchangeAddress!: string;
+ @PrimaryColumn({ name: 'order_hash_hex' })
+ public orderHashHex!: string;
+ @PrimaryColumn({ name: 'source_url' })
+ public sourceUrl!: string;
+
+ @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer })
+ public observedTimestamp!: number;
+}
+
+/**
+ * Returns a new SraOrdersObservedTimeStamp for the given order and timestamp.
+ * @param order The order to generate a timestamp entry for.
+ * @param observedTimestamp The time (in ms) at which the order was observed.
+ */
+export function createObservedTimestampForOrder(
+ order: SraOrder,
+ observedTimestamp: number,
+): SraOrdersObservedTimeStamp {
+ const observed = new SraOrdersObservedTimeStamp();
+ observed.exchangeAddress = order.exchangeAddress;
+ observed.orderHashHex = order.orderHashHex;
+ observed.sourceUrl = order.sourceUrl;
+ observed.observedTimestamp = observedTimestamp;
+ return observed;
+}
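A usage sketch, assuming an order previously parsed from an SRA response:

    import { createObservedTimestampForOrder, SraOrder } from './entities';

    declare const order: SraOrder; // assumed: produced by parseSraOrders
    // Pair the order's composite primary key with the moment it was seen;
    // the resulting entity can be saved through a normal TypeORM repository.
    const observed = createObservedTimestampForOrder(order, Date.now());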
diff --git a/packages/pipeline/src/entities/token_metadata.ts b/packages/pipeline/src/entities/token_metadata.ts
new file mode 100644
index 000000000..911b53972
--- /dev/null
+++ b/packages/pipeline/src/entities/token_metadata.ts
@@ -0,0 +1,22 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer } from '../utils/transformers';
+
+@Entity({ name: 'token_metadata', schema: 'raw' })
+export class TokenMetadata {
+ @PrimaryColumn({ type: 'varchar', nullable: false })
+ public address!: string;
+
+ @PrimaryColumn({ type: 'varchar', nullable: false })
+ public authority!: string;
+
+ @Column({ type: 'numeric', transformer: bigNumberTransformer, nullable: true })
+ public decimals!: BigNumber | null;
+
+ @Column({ type: 'varchar', nullable: true })
+ public symbol!: string | null;
+
+ @Column({ type: 'varchar', nullable: true })
+ public name!: string | null;
+}
diff --git a/packages/pipeline/src/entities/token_order.ts b/packages/pipeline/src/entities/token_order.ts
new file mode 100644
index 000000000..2709747cb
--- /dev/null
+++ b/packages/pipeline/src/entities/token_order.ts
@@ -0,0 +1,28 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'token_orderbook_snapshots', schema: 'raw' })
+export class TokenOrderbookSnapshot {
+ @PrimaryColumn({ name: 'observed_timestamp', type: 'bigint', transformer: numberToBigIntTransformer })
+ public observedTimestamp!: number;
+ @PrimaryColumn({ name: 'source' })
+ public source!: string;
+ @PrimaryColumn({ name: 'order_type' })
+ public orderType!: string;
+ @PrimaryColumn({ name: 'price', type: 'numeric', transformer: bigNumberTransformer })
+ public price!: BigNumber;
+ @PrimaryColumn({ name: 'base_asset_symbol' })
+ public baseAssetSymbol!: string;
+ @Column({ nullable: true, type: String, name: 'base_asset_address' })
+ public baseAssetAddress!: string | null;
+ @Column({ name: 'base_volume', type: 'numeric', transformer: bigNumberTransformer })
+ public baseVolume!: BigNumber;
+ @PrimaryColumn({ name: 'quote_asset_symbol' })
+ public quoteAssetSymbol!: string;
+ @Column({ nullable: true, type: String, name: 'quote_asset_address' })
+ public quoteAssetAddress!: string | null;
+ @Column({ name: 'quote_volume', type: 'numeric', transformer: bigNumberTransformer })
+ public quoteVolume!: BigNumber;
+}
diff --git a/packages/pipeline/src/entities/transaction.ts b/packages/pipeline/src/entities/transaction.ts
new file mode 100644
index 000000000..742050177
--- /dev/null
+++ b/packages/pipeline/src/entities/transaction.ts
@@ -0,0 +1,19 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'transactions', schema: 'raw' })
+export class Transaction {
+ @PrimaryColumn({ name: 'transaction_hash' })
+ public transactionHash!: string;
+ @PrimaryColumn({ name: 'block_hash' })
+ public blockHash!: string;
+ @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+ public blockNumber!: number;
+
+ @Column({ type: 'numeric', name: 'gas_used', transformer: bigNumberTransformer })
+ public gasUsed!: BigNumber;
+ @Column({ type: 'numeric', name: 'gas_price', transformer: bigNumberTransformer })
+ public gasPrice!: BigNumber;
+}
diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts
new file mode 100644
index 000000000..2700714cd
--- /dev/null
+++ b/packages/pipeline/src/ormconfig.ts
@@ -0,0 +1,54 @@
+import { ConnectionOptions } from 'typeorm';
+
+import {
+ Block,
+ CopperActivity,
+ CopperActivityType,
+ CopperCustomField,
+ CopperLead,
+ CopperOpportunity,
+ DexTrade,
+ ERC20ApprovalEvent,
+ ExchangeCancelEvent,
+ ExchangeCancelUpToEvent,
+ ExchangeFillEvent,
+ OHLCVExternal,
+ Relayer,
+ SraOrder,
+ SraOrdersObservedTimeStamp,
+ TokenMetadata,
+ TokenOrderbookSnapshot,
+ Transaction,
+} from './entities';
+
+const entities = [
+ Block,
+ CopperOpportunity,
+ CopperActivity,
+ CopperActivityType,
+ CopperCustomField,
+ CopperLead,
+ DexTrade,
+ ExchangeCancelEvent,
+ ExchangeCancelUpToEvent,
+ ExchangeFillEvent,
+ ERC20ApprovalEvent,
+ OHLCVExternal,
+ Relayer,
+ SraOrder,
+ SraOrdersObservedTimeStamp,
+ TokenMetadata,
+ TokenOrderbookSnapshot,
+ Transaction,
+];
+
+const config: ConnectionOptions = {
+ type: 'postgres',
+ url: process.env.ZEROEX_DATA_PIPELINE_DB_URL,
+ synchronize: false,
+ logging: ['error'],
+ entities,
+ migrations: ['./lib/migrations/**/*.js'],
+};
+
+module.exports = config;
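The scripts under src/scripts consume this config roughly as follows (a sketch; the exact wiring lives in those files):

    import { ConnectionOptions, createConnection } from 'typeorm';
    import * as ormConfig from './ormconfig';

    (async () => {
        // synchronize is false, so the schema must come from the migrations
        // referenced above rather than being auto-generated.
        const connection = await createConnection(ormConfig as ConnectionOptions);
        console.log(`registered entities: ${connection.entityMetadatas.length}`);
    })();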
diff --git a/packages/pipeline/src/parsers/bloxy/index.ts b/packages/pipeline/src/parsers/bloxy/index.ts
new file mode 100644
index 000000000..caa55d289
--- /dev/null
+++ b/packages/pipeline/src/parsers/bloxy/index.ts
@@ -0,0 +1,53 @@
+import { BigNumber } from '@0x/utils';
+import * as R from 'ramda';
+
+import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../data_sources/bloxy';
+import { DexTrade } from '../../entities';
+
+/**
+ * Parses a raw trades response from the Bloxy Dex API and returns an array of
+ * DexTrade entities.
+ * @param rawTrades A raw trades response from the Bloxy Dex API.
+ */
+export function parseBloxyTrades(rawTrades: BloxyTrade[]): DexTrade[] {
+ return R.map(_parseBloxyTrade, rawTrades);
+}
+
+/**
+ * Converts a single Bloxy trade into a DexTrade entity.
+ * @param rawTrade A single trade from the response from the Bloxy API.
+ */
+export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade {
+ const dexTrade = new DexTrade();
+ dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL;
+ dexTrade.txHash = rawTrade.tx_hash;
+ dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime();
+ dexTrade.txDate = rawTrade.tx_date;
+ dexTrade.txSender = rawTrade.tx_sender;
+ dexTrade.smartContractId = rawTrade.smart_contract_id;
+ dexTrade.smartContractAddress = rawTrade.smart_contract_address;
+ dexTrade.contractType = rawTrade.contract_type;
+ dexTrade.maker = rawTrade.maker;
+ dexTrade.taker = rawTrade.taker;
+ // TODO(albrow): The Bloxy API returns amounts and fees as a `number` type
+ // but some of their values have too many significant digits to be
+ // represented that way. Ideally they will switch to using strings and then
+ // we can update this code.
+ dexTrade.amountBuy = new BigNumber(rawTrade.amountBuy.toString());
+ dexTrade.makerFeeAmount = new BigNumber(rawTrade.makerFee.toString());
+ dexTrade.buyCurrencyId = rawTrade.buyCurrencyId;
+ dexTrade.buySymbol = filterNullCharacters(rawTrade.buySymbol);
+ dexTrade.amountSell = new BigNumber(rawTrade.amountSell.toString());
+ dexTrade.takerFeeAmount = new BigNumber(rawTrade.takerFee.toString());
+ dexTrade.sellCurrencyId = rawTrade.sellCurrencyId;
+ dexTrade.sellSymbol = filterNullCharacters(rawTrade.sellSymbol);
+ dexTrade.makerAnnotation = rawTrade.maker_annotation;
+ dexTrade.takerAnnotation = rawTrade.taker_annotation;
+ dexTrade.protocol = rawTrade.protocol;
+ dexTrade.buyAddress = rawTrade.buyAddress;
+ dexTrade.sellAddress = rawTrade.sellAddress;
+ return dexTrade;
+}
+
+// Works with any form of escaped null character (e.g., '\0' and '\u0000').
+const filterNullCharacters = R.replace(/\0/g, '');
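Because R.replace is curried, filterNullCharacters is a plain string-to-string function:

    import * as R from 'ramda';

    const filterNullCharacters = R.replace(/\0/g, '');
    // Symbols sourced from fixed-width on-chain fields often arrive NUL-padded:
    filterNullCharacters('DAI\u0000\u0000'); // => 'DAI'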
diff --git a/packages/pipeline/src/parsers/copper/index.ts b/packages/pipeline/src/parsers/copper/index.ts
new file mode 100644
index 000000000..6c0c5abd5
--- /dev/null
+++ b/packages/pipeline/src/parsers/copper/index.ts
@@ -0,0 +1,259 @@
+import * as R from 'ramda';
+
+import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../../entities';
+
+const ONE_SECOND = 1000; // Copper timestamps are in seconds; convert to ms.
+export type CopperSearchResponse = CopperLeadResponse | CopperActivityResponse | CopperOpportunityResponse;
+export interface CopperLeadResponse {
+ id: number;
+ name?: string;
+ first_name?: string;
+ last_name?: string;
+ middle_name?: string;
+ assignee_id?: number;
+ company_name?: string;
+ customer_source_id?: number;
+ monetary_value?: number;
+ status: string;
+ status_id: number;
+ title?: string;
+ date_created: number; // in seconds
+ date_modified: number; // in seconds
+}
+
+export interface CopperActivityResponse {
+ id: number;
+ parent: CopperActivityParentResponse;
+ type: CopperActivityTypeResponse;
+ user_id: number;
+ activity_date: number;
+ old_value: CopperActivityValueResponse;
+ new_value: CopperActivityValueResponse;
+ date_created: number; // in seconds
+ date_modified: number; // in seconds
+}
+
+export interface CopperActivityValueResponse {
+ id: number;
+ name: string;
+}
+export interface CopperActivityParentResponse {
+ id: number;
+ type: string;
+}
+
+// custom activity types
+export enum CopperActivityTypeCategory {
+ user = 'user',
+ system = 'system',
+}
+export interface CopperActivityTypeResponse {
+ id: number;
+ category: CopperActivityTypeCategory;
+ name: string;
+ is_disabled?: boolean;
+ count_as_interaction?: boolean;
+}
+
+export interface CopperOpportunityResponse {
+ id: number;
+ name: string;
+ assignee_id?: number;
+ close_date?: string;
+ company_id?: number;
+ company_name?: string;
+ customer_source_id?: number;
+ loss_reason_id?: number;
+ pipeline_id: number;
+ pipeline_stage_id: number;
+ primary_contact_id?: number;
+ priority?: string;
+ status: string;
+ tags: string[];
+ interaction_count: number;
+ monetary_value?: number;
+ win_probability?: number;
+ date_created: number; // in seconds
+ date_modified: number; // in seconds
+ custom_fields: CopperNestedCustomFieldResponse[];
+}
+interface CopperNestedCustomFieldResponse {
+ custom_field_definition_id: number;
+ value: number | number[] | null;
+}
+// custom fields
+export enum CopperCustomFieldType {
+ String = 'String',
+ Text = 'Text',
+ Dropdown = 'Dropdown',
+ MultiSelect = 'MultiSelect', // not in API documentation but shows up in results
+ Date = 'Date',
+ Checkbox = 'Checkbox',
+ Float = 'Float',
+ URL = 'URL',
+ Percentage = 'Percentage',
+ Currency = 'Currency',
+ Connect = 'Connect',
+}
+export interface CopperCustomFieldOptionResponse {
+ id: number;
+ name: string;
+}
+export interface CopperCustomFieldResponse {
+ id: number;
+ name: string;
+ data_type: CopperCustomFieldType;
+ options?: CopperCustomFieldOptionResponse[];
+}
+/**
+ * Parse response from Copper API /search/leads/
+ *
+ * @param leads - The array of leads returned from the API
+ * @returns Returns an array of Copper Lead entities
+ */
+export function parseLeads(leads: CopperLeadResponse[]): CopperLead[] {
+ return leads.map(lead => {
+ const entity = new CopperLead();
+ entity.id = lead.id;
+ entity.name = lead.name || undefined;
+ entity.firstName = lead.first_name || undefined;
+ entity.lastName = lead.last_name || undefined;
+ entity.middleName = lead.middle_name || undefined;
+ entity.assigneeId = lead.assignee_id || undefined;
+ entity.companyName = lead.company_name || undefined;
+ entity.customerSourceId = lead.customer_source_id || undefined;
+ entity.monetaryValue = lead.monetary_value || undefined;
+ entity.status = lead.status;
+ entity.statusId = lead.status_id;
+ entity.title = lead.title || undefined;
+ entity.dateCreated = lead.date_created * ONE_SECOND;
+ entity.dateModified = lead.date_modified * ONE_SECOND;
+ return entity;
+ });
+}
+
+/**
+ * Parse response from Copper API /search/activities/
+ *
+ * @param activities - The array of activities returned from the API
+ * @returns Returns an array of Copper Activity entities
+ */
+export function parseActivities(activities: CopperActivityResponse[]): CopperActivity[] {
+ return activities.map(activity => {
+ const entity = new CopperActivity();
+ entity.id = activity.id;
+
+ entity.parentId = activity.parent.id;
+ entity.parentType = activity.parent.type;
+
+ entity.typeId = activity.type.id;
+ entity.typeCategory = activity.type.category.toString();
+ entity.typeName = activity.type.name;
+
+ entity.userId = activity.user_id;
+ entity.dateCreated = activity.date_created * ONE_SECOND;
+ entity.dateModified = activity.date_modified * ONE_SECOND;
+
+ // nested nullable fields
+ entity.oldValueId = R.path(['old_value', 'id'], activity);
+ entity.oldValueName = R.path(['old_value', 'name'], activity);
+ entity.newValueId = R.path(['new_value', 'id'], activity);
+ entity.newValueName = R.path(['new_value', 'name'], activity);
+
+ return entity;
+ });
+}
+
+/**
+ * Parse response from Copper API /search/opportunities/
+ *
+ * @param opportunities - The array of opportunities returned from the API
+ * @returns Returns an array of Copper Opportunity entities
+ */
+export function parseOpportunities(opportunities: CopperOpportunityResponse[]): CopperOpportunity[] {
+ return opportunities.map(opp => {
+ const customFields: { [key: number]: number } = opp.custom_fields
+ .filter(f => f.value !== null)
+ .map(f => ({
+ ...f,
+ value: ([] as number[]).concat(f.value || []), // normalise all values to number[]
+ }))
+ .map(f => f.value.map(val => [f.custom_field_definition_id, val] as [number, number])) // pair each value with the custom_field_definition_id
+ .reduce((acc, pair) => acc.concat(pair)) // flatten
+ .reduce<{ [key: number]: number }>((obj, [key, value]) => {
+ // transform into object literal
+ obj[key] = value;
+ return obj;
+ }, {});
+
+ const entity = new CopperOpportunity();
+ entity.id = opp.id;
+ entity.name = opp.name;
+ entity.assigneeId = opp.assignee_id || undefined;
+ entity.closeDate = opp.close_date || undefined;
+ entity.companyId = opp.company_id || undefined;
+ entity.companyName = opp.company_name || undefined;
+ entity.customerSourceId = opp.customer_source_id || undefined;
+ entity.lossReasonId = opp.loss_reason_id || undefined;
+ entity.pipelineId = opp.pipeline_id;
+ entity.pipelineStageId = opp.pipeline_stage_id;
+ entity.primaryContactId = opp.primary_contact_id || undefined;
+ entity.priority = opp.priority || undefined;
+ entity.status = opp.status;
+ entity.interactionCount = opp.interaction_count;
+ entity.monetaryValue = opp.monetary_value || undefined;
+ entity.winProbability = opp.win_probability === null ? undefined : opp.win_probability;
+ entity.dateCreated = opp.date_created * ONE_SECOND;
+ entity.dateModified = opp.date_modified * ONE_SECOND;
+ entity.customFields = customFields;
+ return entity;
+ });
+}
+
+/**
+ * Parse response from Copper API /activity_types/
+ *
+ * @param activityTypeResponse - Activity Types response from the API, keyed by "user" or "system"
+ * @returns Returns an array of Copper Activity Type entities
+ */
+export function parseActivityTypes(
+ activityTypeResponse: Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>,
+): CopperActivityType[] {
+ const values: CopperActivityTypeResponse[] = R.flatten(Object.values(activityTypeResponse));
+ return values.map(activityType => ({
+ id: activityType.id,
+ name: activityType.name,
+ category: activityType.category.toString(),
+ isDisabled: activityType.is_disabled,
+ countAsInteraction: activityType.count_as_interaction,
+ }));
+}
+
+/**
+ * Parse response from Copper API /custom_field_definitions/
+ *
+ * @param customFieldResponse - array of custom field definitions returned from the API, consisting of top-level fields and nested fields
+ * @returns Returns an array of Copper Custom Field entities
+ */
+export function parseCustomFields(customFieldResponse: CopperCustomFieldResponse[]): CopperCustomField[] {
+ function parseTopLevelField(field: CopperCustomFieldResponse): CopperCustomField[] {
+ const topLevelField: CopperCustomField = {
+ id: field.id,
+ name: field.name,
+ dataType: field.data_type.toString(),
+ };
+
+ if (field.options !== undefined) {
+ const nestedFields: CopperCustomField[] = field.options.map(option => ({
+ id: option.id,
+ name: option.name,
+ dataType: field.name,
+ fieldType: 'option',
+ }));
+ return nestedFields.concat(topLevelField);
+ } else {
+ return [topLevelField];
+ }
+ }
+ return R.chain(parseTopLevelField, customFieldResponse);
+}
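The custom-field pipeline in parseOpportunities is dense; a worked example on hypothetical input:

    // Hypothetical custom_fields array:
    //   [ { custom_field_definition_id: 1, value: 7 },
    //     { custom_field_definition_id: 2, value: [8, 9] },
    //     { custom_field_definition_id: 3, value: null } ]
    // After filtering nulls, normalising values to arrays, pairing each value
    // with its definition id, and flattening: [[1, 7], [2, 8], [2, 9]]
    // Folded into an object literal: { 1: 7, 2: 9 }
    // Multi-valued fields collapse to their last element, since later pairs
    // overwrite earlier ones under the same definition id.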
diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts
new file mode 100644
index 000000000..eeb9c9d5b
--- /dev/null
+++ b/packages/pipeline/src/parsers/ddex_orders/index.ts
@@ -0,0 +1,71 @@
+import { BigNumber } from '@0x/utils';
+
+import { aggregateOrders } from '../utils';
+
+import { DdexMarket, DdexOrderbook } from '../../data_sources/ddex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../../entities';
+import { OrderType } from '../../types';
+
+/**
+ * Marquee function of this file.
+ * 1) Takes in orders from an orderbook, 2) aggregates them by price point,
+ * 3) builds TokenOrder entities with other information attached.
+ * @param ddexOrderbook A raw orderbook that we pull from the Ddex API.
+ * @param ddexMarket An object containing market data also directly from the API.
+ * @param observedTimestamp Time at which the orders for the market were pulled.
+ * @param source The exchange where these orders are placed. In this case 'ddex'.
+ */
+export function parseDdexOrders(
+ ddexOrderbook: DdexOrderbook,
+ ddexMarket: DdexMarket,
+ observedTimestamp: number,
+ source: string,
+): TokenOrder[] {
+ const aggregatedBids = aggregateOrders(ddexOrderbook.bids);
+ const aggregatedAsks = aggregateOrders(ddexOrderbook.asks);
+ const parsedBids = aggregatedBids.map(order =>
+ parseDdexOrder(ddexMarket, observedTimestamp, OrderType.Bid, source, order),
+ );
+ const parsedAsks = aggregatedAsks.map(order =>
+ parseDdexOrder(ddexMarket, observedTimestamp, OrderType.Ask, source, order),
+ );
+ return parsedBids.concat(parsedAsks);
+}
+
+/**
+ * Parse a single aggregated Ddex order to form a tokenOrder entity
+ * which can be saved into the database.
+ * @param ddexMarket An object containing information about the market where these
+ * trades have been placed.
+ * @param observedTimestamp The time when the API response returned back to us.
+ * @param orderType 'bid' or 'ask' enum.
+ * @param source Exchange where these orders were placed.
+ * @param ddexOrder A <price, amount> tuple which we will convert to volume-basis.
+ */
+export function parseDdexOrder(
+ ddexMarket: DdexMarket,
+ observedTimestamp: number,
+ orderType: OrderType,
+ source: string,
+ ddexOrder: [string, BigNumber],
+): TokenOrder {
+ const tokenOrder = new TokenOrder();
+ const price = new BigNumber(ddexOrder[0]);
+ const amount = ddexOrder[1];
+
+ tokenOrder.source = source;
+ tokenOrder.observedTimestamp = observedTimestamp;
+ tokenOrder.orderType = orderType;
+ tokenOrder.price = price;
+
+ // ddex currently confuses quote and base assets.
+ // We switch them here to maintain our internal consistency.
+ tokenOrder.baseAssetSymbol = ddexMarket.quoteToken;
+ tokenOrder.baseAssetAddress = ddexMarket.quoteTokenAddress;
+ tokenOrder.baseVolume = amount;
+
+ tokenOrder.quoteAssetSymbol = ddexMarket.baseToken;
+ tokenOrder.quoteAssetAddress = ddexMarket.baseTokenAddress;
+ tokenOrder.quoteVolume = price.times(amount);
+ return tokenOrder;
+}
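A sketch of the swap in practice, written as if inside this module (so the imports above are in scope) and using hypothetical market data:

    const market = {
        baseToken: 'DAI', // becomes our quote asset
        baseTokenAddress: '0xdeadbeef...', // hypothetical
        quoteToken: 'WETH', // becomes our base asset
        quoteTokenAddress: '0xc0ffee...', // hypothetical
    } as DdexMarket;
    const order = parseDdexOrder(market, Date.now(), OrderType.Bid, 'ddex', ['0.005', new BigNumber(200)]);
    // order.baseAssetSymbol === 'WETH'
    // order.quoteVolume.toString() === '1' (price 0.005 * amount 200)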
diff --git a/packages/pipeline/src/parsers/events/erc20_events.ts b/packages/pipeline/src/parsers/events/erc20_events.ts
new file mode 100644
index 000000000..caf9984d0
--- /dev/null
+++ b/packages/pipeline/src/parsers/events/erc20_events.ts
@@ -0,0 +1,34 @@
+import { ERC20TokenApprovalEventArgs } from '@0x/contract-wrappers';
+import { LogWithDecodedArgs } from 'ethereum-types';
+import * as R from 'ramda';
+
+import { ERC20ApprovalEvent } from '../../entities';
+
+/**
+ * Parses raw event logs for an ERC20 approval event and returns an array of
+ * ERC20ApprovalEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseERC20ApprovalEvents: (
+ eventLogs: Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>,
+) => ERC20ApprovalEvent[] = R.map(_convertToERC20ApprovalEvent);
+
+/**
+ * Converts a raw event log for an ERC20 approval event into an
+ * ERC20ApprovalEvent entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToERC20ApprovalEvent(
+ eventLog: LogWithDecodedArgs<ERC20TokenApprovalEventArgs>,
+): ERC20ApprovalEvent {
+ const erc20ApprovalEvent = new ERC20ApprovalEvent();
+ erc20ApprovalEvent.tokenAddress = eventLog.address as string;
+ erc20ApprovalEvent.blockNumber = eventLog.blockNumber as number;
+ erc20ApprovalEvent.logIndex = eventLog.logIndex as number;
+ erc20ApprovalEvent.rawData = eventLog.data as string;
+ erc20ApprovalEvent.transactionHash = eventLog.transactionHash;
+ erc20ApprovalEvent.ownerAddress = eventLog.args._owner;
+ erc20ApprovalEvent.spenderAddress = eventLog.args._spender;
+ erc20ApprovalEvent.amount = eventLog.args._value;
+ return erc20ApprovalEvent;
+}
diff --git a/packages/pipeline/src/parsers/events/exchange_events.ts b/packages/pipeline/src/parsers/events/exchange_events.ts
new file mode 100644
index 000000000..9c4a5f89a
--- /dev/null
+++ b/packages/pipeline/src/parsers/events/exchange_events.ts
@@ -0,0 +1,145 @@
+import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers';
+import { assetDataUtils } from '@0x/order-utils';
+import { AssetProxyId, ERC721AssetData } from '@0x/types';
+import { LogWithDecodedArgs } from 'ethereum-types';
+import * as R from 'ramda';
+
+import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities';
+import { bigNumbertoStringOrNull, convertAssetProxyIdToType } from '../../utils';
+
+/**
+ * Parses raw event logs for a fill event and returns an array of
+ * ExchangeFillEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseExchangeFillEvents: (
+ eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>,
+) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent);
+
+/**
+ * Parses raw event logs for a cancel event and returns an array of
+ * ExchangeCancelEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseExchangeCancelEvents: (
+ eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>,
+) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent);
+
+/**
+ * Parses raw event logs for a CancelUpTo event and returns an array of
+ * ExchangeCancelUpToEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseExchangeCancelUpToEvents: (
+ eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>,
+) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent);
+
+/**
+ * Converts a raw event log for a fill event into an ExchangeFillEvent entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent {
+ const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
+ const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
+ const exchangeFillEvent = new ExchangeFillEvent();
+ exchangeFillEvent.contractAddress = eventLog.address as string;
+ exchangeFillEvent.blockNumber = eventLog.blockNumber as number;
+ exchangeFillEvent.logIndex = eventLog.logIndex as number;
+ exchangeFillEvent.rawData = eventLog.data as string;
+ exchangeFillEvent.transactionHash = eventLog.transactionHash;
+ exchangeFillEvent.makerAddress = eventLog.args.makerAddress;
+ exchangeFillEvent.takerAddress = eventLog.args.takerAddress;
+ exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress;
+ exchangeFillEvent.senderAddress = eventLog.args.senderAddress;
+ exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount;
+ exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount;
+ exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid;
+ exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid;
+ exchangeFillEvent.orderHash = eventLog.args.orderHash;
+ exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeFillEvent.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId);
+ exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ exchangeFillEvent.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.makerAssetData).nestedAssetData[0].tokenAddress
+ : makerAssetData.tokenAddress;
+ // tslint has a false positive here. Type assertion is required.
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
+ exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeFillEvent.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId);
+ exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ exchangeFillEvent.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.takerAssetData).nestedAssetData[0].tokenAddress
+ : takerAssetData.tokenAddress;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
+ return exchangeFillEvent;
+}
+
+/**
+ * Converts a raw event log for a cancel event into an ExchangeCancelEvent
+ * entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToExchangeCancelEvent(
+ eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>,
+): ExchangeCancelEvent {
+ const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
+ const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
+ const exchangeCancelEvent = new ExchangeCancelEvent();
+ exchangeCancelEvent.contractAddress = eventLog.address as string;
+ exchangeCancelEvent.blockNumber = eventLog.blockNumber as number;
+ exchangeCancelEvent.logIndex = eventLog.logIndex as number;
+ exchangeCancelEvent.rawData = eventLog.data as string;
+ exchangeCancelEvent.transactionHash = eventLog.transactionHash;
+ exchangeCancelEvent.makerAddress = eventLog.args.makerAddress;
+ exchangeCancelEvent.takerAddress = eventLog.args.takerAddress;
+ exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress;
+ exchangeCancelEvent.senderAddress = eventLog.args.senderAddress;
+ exchangeCancelEvent.orderHash = eventLog.args.orderHash;
+ exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeCancelEvent.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId);
+ exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ exchangeCancelEvent.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.makerAssetData).nestedAssetData[0].tokenAddress
+ : makerAssetData.tokenAddress;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
+ exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeCancelEvent.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId);
+ exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ exchangeCancelEvent.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.takerAssetData).nestedAssetData[0].tokenAddress
+ : takerAssetData.tokenAddress;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
+ return exchangeCancelEvent;
+}
+
+/**
+ * Converts a raw event log for a cancelUpTo event into an
+ * ExchangeCancelUpToEvent entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToExchangeCancelUpToEvent(
+ eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>,
+): ExchangeCancelUpToEvent {
+ const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent();
+ exchangeCancelUpToEvent.contractAddress = eventLog.address as string;
+ exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number;
+ exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number;
+ exchangeCancelUpToEvent.rawData = eventLog.data as string;
+ exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash;
+ exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress;
+ exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress;
+ exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch;
+ return exchangeCancelUpToEvent;
+}
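For reference, ERC20 assetData decodes to just a proxy id and a token address, which is why the tokenId probes above come back null for ERC20 assets (bigNumbertoStringOrNull maps the missing tokenId to null). A small example using the canonical WETH assetData:

    import { assetDataUtils } from '@0x/order-utils';

    const decoded = assetDataUtils.decodeERC20AssetData(
        '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
    );
    // decoded.assetProxyId === '0xf47261b0' (the ERC20 asset proxy id)
    // decoded.tokenAddress === '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2' (WETH)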
diff --git a/packages/pipeline/src/parsers/events/index.ts b/packages/pipeline/src/parsers/events/index.ts
new file mode 100644
index 000000000..3f9915e8b
--- /dev/null
+++ b/packages/pipeline/src/parsers/events/index.ts
@@ -0,0 +1,2 @@
+export { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from './exchange_events';
+export { parseERC20ApprovalEvents } from './erc20_events';
diff --git a/packages/pipeline/src/parsers/idex_orders/index.ts b/packages/pipeline/src/parsers/idex_orders/index.ts
new file mode 100644
index 000000000..14b871195
--- /dev/null
+++ b/packages/pipeline/src/parsers/idex_orders/index.ts
@@ -0,0 +1,81 @@
+import { BigNumber } from '@0x/utils';
+
+import { aggregateOrders } from '../utils';
+
+import { IdexOrderbook, IdexOrderParam } from '../../data_sources/idex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../../entities';
+import { OrderType } from '../../types';
+
+/**
+ * Marquee function of this file.
+ * 1) Takes in orders from an orderbook,
+ * 2) Aggregates them by price point,
+ * 3) Parses them into entities which are then saved into the database.
+ * @param idexOrderbook raw orderbook that we pull from the Idex API.
+ * @param observedTimestamp Time at which the orders for the market were pulled.
+ * @param source The exchange where these orders are placed. In this case 'idex'.
+ */
+export function parseIdexOrders(idexOrderbook: IdexOrderbook, observedTimestamp: number, source: string): TokenOrder[] {
+ const aggregatedBids = aggregateOrders(idexOrderbook.bids);
+ // Any of the bid orders' params will work
+ const idexBidOrder = idexOrderbook.bids[0];
+ const parsedBids =
+ aggregatedBids.length > 0
+ ? aggregatedBids.map(order =>
+ parseIdexOrder(idexBidOrder.params, observedTimestamp, OrderType.Bid, source, order),
+ )
+ : [];
+
+ const aggregatedAsks = aggregateOrders(idexOrderbook.asks);
+ // Any of the ask orders' params will work
+ const idexAskOrder = idexOrderbook.asks[0];
+ const parsedAsks =
+ aggregatedAsks.length > 0
+ ? aggregatedAsks.map(order =>
+ parseIdexOrder(idexAskOrder.params, observedTimestamp, OrderType.Ask, source, order),
+ )
+ : [];
+ return parsedBids.concat(parsedAsks);
+}
+
+/**
+ * Parse a single aggregated Idex order to form a tokenOrder entity
+ * which can be saved into the database.
+ * @param idexOrderParam An object containing information about the market where these
+ * trades have been placed.
+ * @param observedTimestamp The time when the API response returned back to us.
+ * @param orderType 'bid' or 'ask' enum.
+ * @param source Exchange where these orders were placed.
+ * @param idexOrder A <price, amount> tuple which we will convert to volume-basis.
+ */
+export function parseIdexOrder(
+ idexOrderParam: IdexOrderParam,
+ observedTimestamp: number,
+ orderType: OrderType,
+ source: string,
+ idexOrder: [string, BigNumber],
+): TokenOrder {
+ const tokenOrder = new TokenOrder();
+ const price = new BigNumber(idexOrder[0]);
+ const amount = idexOrder[1];
+
+ tokenOrder.source = source;
+ tokenOrder.observedTimestamp = observedTimestamp;
+ tokenOrder.orderType = orderType;
+ tokenOrder.price = price;
+ tokenOrder.baseVolume = amount;
+ tokenOrder.quoteVolume = price.times(amount);
+
+ if (orderType === OrderType.Bid) {
+ tokenOrder.baseAssetSymbol = idexOrderParam.buySymbol;
+ tokenOrder.baseAssetAddress = idexOrderParam.tokenBuy;
+ tokenOrder.quoteAssetSymbol = idexOrderParam.sellSymbol;
+ tokenOrder.quoteAssetAddress = idexOrderParam.tokenSell;
+ } else {
+ tokenOrder.baseAssetSymbol = idexOrderParam.sellSymbol;
+ tokenOrder.baseAssetAddress = idexOrderParam.tokenSell;
+ tokenOrder.quoteAssetSymbol = idexOrderParam.buySymbol;
+ tokenOrder.quoteAssetAddress = idexOrderParam.tokenBuy;
+ }
+ return tokenOrder;
+}
diff --git a/packages/pipeline/src/parsers/oasis_orders/index.ts b/packages/pipeline/src/parsers/oasis_orders/index.ts
new file mode 100644
index 000000000..b71fb65b9
--- /dev/null
+++ b/packages/pipeline/src/parsers/oasis_orders/index.ts
@@ -0,0 +1,71 @@
+import { BigNumber } from '@0x/utils';
+import * as R from 'ramda';
+
+import { aggregateOrders } from '../utils';
+
+import { OasisMarket, OasisOrder } from '../../data_sources/oasis';
+import { TokenOrderbookSnapshot as TokenOrder } from '../../entities';
+import { OrderType } from '../../types';
+
+/**
+ * Marquee function of this file.
+ * 1) Takes in orders from an orderbook,
+ * 2) Aggregates them according to price point,
+ * 3) Builds TokenOrder entity with other information attached.
+ * @param oasisOrderbook A raw orderbook that we pull from the Oasis API.
+ * @param oasisMarket An object containing market data also directly from the API.
+ * @param observedTimestamp Time at which the orders for the market were pulled.
+ * @param source The exchange where these orders are placed. In this case 'oasis'.
+ */
+export function parseOasisOrders(
+ oasisOrderbook: OasisOrder[],
+ oasisMarket: OasisMarket,
+ observedTimestamp: number,
+ source: string,
+): TokenOrder[] {
+ const aggregatedBids = aggregateOrders(R.filter(R.propEq('act', OrderType.Bid), oasisOrderbook));
+ const aggregatedAsks = aggregateOrders(R.filter(R.propEq('act', OrderType.Ask), oasisOrderbook));
+ const parsedBids = aggregatedBids.map(order =>
+ parseOasisOrder(oasisMarket, observedTimestamp, OrderType.Bid, source, order),
+ );
+ const parsedAsks = aggregatedAsks.map(order =>
+ parseOasisOrder(oasisMarket, observedTimestamp, OrderType.Ask, source, order),
+ );
+ return parsedBids.concat(parsedAsks);
+}
+
+/**
+ * Parse a single aggregated Oasis order to form a tokenOrder entity
+ * which can be saved into the database.
+ * @param oasisMarket An object containing information about the market where these
+ * trades have been placed.
+ * @param observedTimestamp The time when the API response returned back to us.
+ * @param orderType 'bid' or 'ask' enum.
+ * @param source Exchange where these orders were placed.
+ * @param oasisOrder A <price, amount> tuple which we will convert to volume-basis.
+ */
+export function parseOasisOrder(
+ oasisMarket: OasisMarket,
+ observedTimestamp: number,
+ orderType: OrderType,
+ source: string,
+ oasisOrder: [string, BigNumber],
+): TokenOrder {
+ const tokenOrder = new TokenOrder();
+ const price = new BigNumber(oasisOrder[0]);
+ const amount = oasisOrder[1];
+
+ tokenOrder.source = source;
+ tokenOrder.observedTimestamp = observedTimestamp;
+ tokenOrder.orderType = orderType;
+ tokenOrder.price = price;
+
+ tokenOrder.baseAssetSymbol = oasisMarket.base;
+ tokenOrder.baseAssetAddress = null; // Oasis doesn't provide address information
+ tokenOrder.baseVolume = amount;
+
+ tokenOrder.quoteAssetSymbol = oasisMarket.quote;
+ tokenOrder.quoteAssetAddress = null; // Oasis doesn't provide address information
+ tokenOrder.quoteVolume = price.times(amount);
+ return tokenOrder;
+}
diff --git a/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts
new file mode 100644
index 000000000..3efb90384
--- /dev/null
+++ b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts
@@ -0,0 +1,38 @@
+import { CryptoCompareOHLCVRecord } from '../../data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../../entities';
+
+const ONE_SECOND = 1000; // Crypto Compare uses timestamps in seconds instead of milliseconds
+
+export interface OHLCVMetadata {
+ exchange: string;
+ fromSymbol: string;
+ toSymbol: string;
+ source: string;
+ observedTimestamp: number;
+ interval: number;
+}
+/**
+ * Parses OHLCV records from Crypto Compare into an array of OHLCVExternal entities
+ * @param rawRecords an array of OHLCV records from Crypto Compare (not the full response)
+ */
+export function parseRecords(rawRecords: CryptoCompareOHLCVRecord[], metadata: OHLCVMetadata): OHLCVExternal[] {
+ return rawRecords.map(rec => {
+ const ohlcvRecord = new OHLCVExternal();
+ ohlcvRecord.exchange = metadata.exchange;
+ ohlcvRecord.fromSymbol = metadata.fromSymbol;
+ ohlcvRecord.toSymbol = metadata.toSymbol;
+ ohlcvRecord.startTime = rec.time * ONE_SECOND - metadata.interval;
+ ohlcvRecord.endTime = rec.time * ONE_SECOND;
+
+ ohlcvRecord.open = rec.open;
+ ohlcvRecord.close = rec.close;
+ ohlcvRecord.low = rec.low;
+ ohlcvRecord.high = rec.high;
+ ohlcvRecord.volumeFrom = rec.volumefrom;
+ ohlcvRecord.volumeTo = rec.volumeto;
+
+ ohlcvRecord.source = metadata.source;
+ ohlcvRecord.observedTimestamp = metadata.observedTimestamp;
+ return ohlcvRecord;
+ });
+}
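A quick arithmetic check of the start/end computation, written as if inside this module, with a hypothetical hourly record:

    const interval = 60 * 60 * 1000; // 1h in ms, carried in OHLCVMetadata
    const recTime = 1546300800; // 2019-01-01T00:00:00Z, in seconds
    const endTime = recTime * ONE_SECOND; // 1546300800000
    const startTime = endTime - interval; // 1546297200000, one hour earlier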
diff --git a/packages/pipeline/src/parsers/paradex_orders/index.ts b/packages/pipeline/src/parsers/paradex_orders/index.ts
new file mode 100644
index 000000000..85990dae4
--- /dev/null
+++ b/packages/pipeline/src/parsers/paradex_orders/index.ts
@@ -0,0 +1,66 @@
+import { BigNumber } from '@0x/utils';
+
+import { ParadexMarket, ParadexOrder, ParadexOrderbookResponse } from '../../data_sources/paradex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../../entities';
+import { OrderType } from '../../types';
+
+/**
+ * Marquee function of this file.
+ * 1) Takes in orders from an orderbook (orders are already aggregated by price point),
+ * 2) For each aggregated order, forms a TokenOrder entity with market data and
+ * other information attached.
+ * @param paradexOrderbookResponse An orderbook response from the Paradex API.
+ * @param paradexMarket An object containing market data also directly from the API.
+ * @param observedTimestamp Time at which the orders for the market were pulled.
+ * @param source The exchange where these orders are placed. In this case 'paradex'.
+ */
+export function parseParadexOrders(
+ paradexOrderbookResponse: ParadexOrderbookResponse,
+ paradexMarket: ParadexMarket,
+ observedTimestamp: number,
+ source: string,
+): TokenOrder[] {
+ const parsedBids = paradexOrderbookResponse.bids.map(order =>
+ parseParadexOrder(paradexMarket, observedTimestamp, OrderType.Bid, source, order),
+ );
+ const parsedAsks = paradexOrderbookResponse.asks.map(order =>
+ parseParadexOrder(paradexMarket, observedTimestamp, OrderType.Ask, source, order),
+ );
+ return parsedBids.concat(parsedAsks);
+}
+
+/**
+ * Parse a single aggregated Paradex order to form a tokenOrder entity
+ * which can be saved into the database.
+ * @param paradexMarket An object containing information about the market where these
+ * orders have been placed.
+ * @param observedTimestamp The time when the API response returned back to us.
+ * @param orderType 'bid' or 'ask' enum.
+ * @param source Exchange where these orders were placed.
+ * @param paradexOrder A ParadexOrder object; effectively a <price, amount> tuple.
+ */
+export function parseParadexOrder(
+ paradexMarket: ParadexMarket,
+ observedTimestamp: number,
+ orderType: OrderType,
+ source: string,
+ paradexOrder: ParadexOrder,
+): TokenOrder {
+ const tokenOrder = new TokenOrder();
+ const price = new BigNumber(paradexOrder.price);
+ const amount = new BigNumber(paradexOrder.amount);
+
+ tokenOrder.source = source;
+ tokenOrder.observedTimestamp = observedTimestamp;
+ tokenOrder.orderType = orderType;
+ tokenOrder.price = price;
+
+ tokenOrder.baseAssetSymbol = paradexMarket.baseToken;
+ tokenOrder.baseAssetAddress = paradexMarket.baseTokenAddress as string;
+ tokenOrder.baseVolume = amount;
+
+ tokenOrder.quoteAssetSymbol = paradexMarket.quoteToken;
+ tokenOrder.quoteAssetAddress = paradexMarket.quoteTokenAddress as string;
+ tokenOrder.quoteVolume = price.times(amount);
+ return tokenOrder;
+}
diff --git a/packages/pipeline/src/parsers/relayer_registry/index.ts b/packages/pipeline/src/parsers/relayer_registry/index.ts
new file mode 100644
index 000000000..9723880a4
--- /dev/null
+++ b/packages/pipeline/src/parsers/relayer_registry/index.ts
@@ -0,0 +1,37 @@
+import * as R from 'ramda';
+
+import { RelayerResponse, RelayerResponseNetwork } from '../../data_sources/relayer-registry';
+import { Relayer } from '../../entities';
+
+/**
+ * Parses a raw relayer registry response into an array of Relayer entities.
+ * @param rawResp raw response from the relayer-registry json file.
+ */
+export function parseRelayers(rawResp: Map<string, RelayerResponse>): Relayer[] {
+ const parsedAsObject = R.mapObjIndexed(parseRelayer, rawResp);
+ return R.values(parsedAsObject);
+}
+
+function parseRelayer(relayerResp: RelayerResponse, uuid: string): Relayer {
+ const relayer = new Relayer();
+ relayer.uuid = uuid;
+ relayer.name = relayerResp.name;
+ relayer.homepageUrl = relayerResp.homepage_url;
+ relayer.appUrl = relayerResp.app_url;
+ const mainNetworkRelayerInfo = getMainNetwork(relayerResp);
+ if (mainNetworkRelayerInfo !== undefined) {
+ relayer.sraHttpEndpoint = mainNetworkRelayerInfo.sra_http_endpoint || null;
+ relayer.sraWsEndpoint = mainNetworkRelayerInfo.sra_ws_endpoint || null;
+ relayer.feeRecipientAddresses =
+ R.path(['static_order_fields', 'fee_recipient_addresses'], mainNetworkRelayerInfo) || [];
+ relayer.takerAddresses = R.path(['static_order_fields', 'taker_addresses'], mainNetworkRelayerInfo) || [];
+ } else {
+ relayer.feeRecipientAddresses = [];
+ relayer.takerAddresses = [];
+ }
+ return relayer;
+}
+
+function getMainNetwork(relayerResp: RelayerResponse): RelayerResponseNetwork | undefined {
+ return R.find(network => network.networkId === 1, relayerResp.networks);
+}
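Note that R.mapObjIndexed invokes its callback as (value, key, obj), which is how parseRelayer receives each registry UUID as its second argument; and although the parameter is typed as a Map, the runtime value parsed from the registry JSON is a plain object, which is what mapObjIndexed operates on:

    import * as R from 'ramda';

    R.mapObjIndexed((value: number, key: string) => `${key}=${value}`, { a: 1, b: 2 });
    // => { a: 'a=1', b: 'b=2' }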
diff --git a/packages/pipeline/src/parsers/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts
new file mode 100644
index 000000000..13fe632a4
--- /dev/null
+++ b/packages/pipeline/src/parsers/sra_orders/index.ts
@@ -0,0 +1,68 @@
+import { APIOrder, OrdersResponse } from '@0x/connect';
+import { assetDataUtils, orderHashUtils } from '@0x/order-utils';
+import { AssetProxyId, ERC721AssetData } from '@0x/types';
+import * as R from 'ramda';
+
+import { SraOrder } from '../../entities';
+import { bigNumbertoStringOrNull, convertAssetProxyIdToType } from '../../utils';
+
+/**
+ * Parses a raw order response from an SRA endpoint and returns an array of
+ * SraOrder entities.
+ * @param rawOrdersResponse A raw order response from an SRA endpoint.
+ */
+export function parseSraOrders(rawOrdersResponse: OrdersResponse): SraOrder[] {
+ return R.map(_convertToEntity, rawOrdersResponse.records);
+}
+
+/**
+ * Converts a single APIOrder into an SraOrder entity.
+ * @param apiOrder A single order from the response from an SRA endpoint.
+ */
+export function _convertToEntity(apiOrder: APIOrder): SraOrder {
+ // TODO(albrow): refactor out common asset data decoding code.
+ const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.makerAssetData);
+ const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.takerAssetData);
+
+ const sraOrder = new SraOrder();
+ sraOrder.exchangeAddress = apiOrder.order.exchangeAddress;
+ sraOrder.orderHashHex = orderHashUtils.getOrderHashHex(apiOrder.order);
+
+ sraOrder.makerAddress = apiOrder.order.makerAddress;
+ sraOrder.takerAddress = apiOrder.order.takerAddress;
+ sraOrder.feeRecipientAddress = apiOrder.order.feeRecipientAddress;
+ sraOrder.senderAddress = apiOrder.order.senderAddress;
+ sraOrder.makerAssetAmount = apiOrder.order.makerAssetAmount;
+ sraOrder.takerAssetAmount = apiOrder.order.takerAssetAmount;
+ sraOrder.makerFee = apiOrder.order.makerFee;
+ sraOrder.takerFee = apiOrder.order.takerFee;
+ sraOrder.expirationTimeSeconds = apiOrder.order.expirationTimeSeconds;
+ sraOrder.salt = apiOrder.order.salt;
+ sraOrder.signature = apiOrder.order.signature;
+
+ sraOrder.rawMakerAssetData = apiOrder.order.makerAssetData;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ sraOrder.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId);
+ sraOrder.makerAssetProxyId = makerAssetData.assetProxyId;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ sraOrder.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(apiOrder.order.makerAssetData).nestedAssetData[0].tokenAddress
+ : makerAssetData.tokenAddress;
+ // tslint has a false positive here. Type assertion is required.
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ sraOrder.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
+ sraOrder.rawTakerAssetData = apiOrder.order.takerAssetData;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ sraOrder.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId);
+ sraOrder.takerAssetProxyId = takerAssetData.assetProxyId;
+ // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData
+ sraOrder.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData)
+ ? assetDataUtils.decodeMultiAssetDataRecursively(apiOrder.order.takerAssetData).nestedAssetData[0].tokenAddress
+ : takerAssetData.tokenAddress;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ sraOrder.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
+
+ sraOrder.metadataJson = JSON.stringify(apiOrder.metaData);
+
+ return sraOrder;
+}
diff --git a/packages/pipeline/src/parsers/token_metadata/index.ts b/packages/pipeline/src/parsers/token_metadata/index.ts
new file mode 100644
index 000000000..65e0aaa6e
--- /dev/null
+++ b/packages/pipeline/src/parsers/token_metadata/index.ts
@@ -0,0 +1,46 @@
+import * as R from 'ramda';
+
+import { MetamaskTrustedTokenMeta, ZeroExTrustedTokenMeta } from '../../data_sources/trusted_tokens';
+import { TokenMetadata } from '../../entities';
+import { toBigNumberOrNull } from '../../utils';
+
+/**
+ * Parses Metamask's trusted tokens list.
+ * @param rawResp raw response from the metamask json file.
+ */
+export function parseMetamaskTrustedTokens(rawResp: Map<string, MetamaskTrustedTokenMeta>): TokenMetadata[] {
+ const parsedAsObject = R.mapObjIndexed(parseMetamaskTrustedToken, rawResp);
+ return R.values(parsedAsObject);
+}
+
+/**
+ * Parses 0x's trusted tokens list.
+ * @param rawResp raw response from the 0x trusted tokens file.
+ */
+export function parseZeroExTrustedTokens(rawResp: ZeroExTrustedTokenMeta[]): TokenMetadata[] {
+ return R.map(parseZeroExTrustedToken, rawResp);
+}
+
+function parseMetamaskTrustedToken(resp: MetamaskTrustedTokenMeta, address: string): TokenMetadata {
+ const trustedToken = new TokenMetadata();
+
+ trustedToken.address = address;
+ trustedToken.decimals = toBigNumberOrNull(resp.decimals);
+ trustedToken.symbol = resp.symbol;
+ trustedToken.name = resp.name;
+ trustedToken.authority = 'metamask';
+
+ return trustedToken;
+}
+
+function parseZeroExTrustedToken(resp: ZeroExTrustedTokenMeta): TokenMetadata {
+ const trustedToken = new TokenMetadata();
+
+ trustedToken.address = resp.address;
+ trustedToken.decimals = toBigNumberOrNull(resp.decimals);
+ trustedToken.symbol = resp.symbol;
+ trustedToken.name = resp.name;
+ trustedToken.authority = '0x';
+
+ return trustedToken;
+}
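A small usage sketch, under the assumption that the MetaMask contract-map JSON parses to a plain object keyed by token address (despite the `Map` type annotation, `R.mapObjIndexed` iterates plain-object keys). The entry and its field names are illustrative:

```typescript
const rawMetamaskResp = {
    '0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359': {
        name: 'Dai Stablecoin',
        symbol: 'DAI',
        decimals: 18,
    },
} as any;

const tokens = parseMetamaskTrustedTokens(rawMetamaskResp);
// tokens[0].address === '0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359'
// tokens[0].authority === 'metamask'
```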
diff --git a/packages/pipeline/src/parsers/utils.ts b/packages/pipeline/src/parsers/utils.ts
new file mode 100644
index 000000000..860729e9f
--- /dev/null
+++ b/packages/pipeline/src/parsers/utils.ts
@@ -0,0 +1,28 @@
+import { BigNumber } from '@0x/utils';
+
+export interface GenericRawOrder {
+ price: string;
+ amount: string;
+}
+
+/**
+ * Aggregates individual orders by price point and filters out zero-amount orders.
+ * @param rawOrders An array of objects that have price and amount information.
+ */
+export function aggregateOrders(rawOrders: GenericRawOrder[]): Array<[string, BigNumber]> {
+ const aggregatedOrders = new Map<string, BigNumber>();
+ rawOrders.forEach(order => {
+ const amount = new BigNumber(order.amount);
+ if (amount.isZero()) {
+ return;
+ }
+ // Use a string key rather than a BigNumber so that orders are aggregated
+ // by value, not by object reference. Convert to BigNumber first to
+ // consolidate different string representations of the same number,
+ // e.g. '0.0' and '0.00'.
+ const price = new BigNumber(order.price).toString();
+
+ const existingAmount = aggregatedOrders.get(price) || new BigNumber(0);
+ aggregatedOrders.set(price, amount.plus(existingAmount));
+ });
+ return Array.from(aggregatedOrders.entries());
+}
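A worked example of `aggregateOrders`: the two '0.1' orders collapse into one price point because both strings normalize to the same BigNumber, and the zero-amount order is dropped.

```typescript
const rawOrders = [
    { price: '0.1', amount: '10' },
    { price: '0.10', amount: '5' }, // normalizes to the same price point as '0.1'
    { price: '0.2', amount: '0' }, // zero amount, filtered out
];

const aggregated = aggregateOrders(rawOrders);
// => [ ['0.1', new BigNumber(15)] ]
```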
diff --git a/packages/pipeline/src/parsers/web3/index.ts b/packages/pipeline/src/parsers/web3/index.ts
new file mode 100644
index 000000000..f986efc59
--- /dev/null
+++ b/packages/pipeline/src/parsers/web3/index.ts
@@ -0,0 +1,49 @@
+import { BigNumber } from '@0x/utils';
+import { BlockWithoutTransactionData, Transaction as EthTransaction } from 'ethereum-types';
+
+import { Block, Transaction } from '../../entities';
+
+const MILLISECONDS_PER_SECOND = 1000;
+
+/**
+ * Parses a raw block and returns a Block entity.
+ * @param rawBlock a raw block (e.g. returned from web3-wrapper).
+ */
+export function parseBlock(rawBlock: BlockWithoutTransactionData): Block {
+ if (rawBlock.hash == null) {
+ throw new Error('Tried to parse raw block but hash was null');
+ }
+ if (rawBlock.number == null) {
+ throw new Error('Tried to parse raw block but number was null');
+ }
+
+ const block = new Block();
+ block.hash = rawBlock.hash;
+ block.number = rawBlock.number;
+ // Block timestamps are in seconds, but we use milliseconds everywhere else.
+ block.timestamp = rawBlock.timestamp * MILLISECONDS_PER_SECOND;
+ return block;
+}
+
+/**
+ * Parses a raw transaction and returns a Transaction entity.
+ * @param rawTransaction a raw transaction (e.g. returned from web3-wrapper).
+ */
+export function parseTransaction(rawTransaction: EthTransaction): Transaction {
+ if (rawTransaction.blockHash == null) {
+ throw new Error('Tried to parse raw transaction but blockHash was null');
+ }
+ if (rawTransaction.blockNumber == null) {
+ throw new Error('Tried to parse raw transaction but blockNumber was null');
+ }
+
+ const tx = new Transaction();
+ tx.transactionHash = rawTransaction.hash;
+ tx.blockHash = rawTransaction.blockHash;
+ tx.blockNumber = rawTransaction.blockNumber;
+
+ // Note: `gas` on a transaction is the sender-supplied gas limit; the gas
+ // actually consumed is only available on the transaction receipt.
+ tx.gasUsed = new BigNumber(rawTransaction.gas);
+ tx.gasPrice = rawTransaction.gasPrice;
+
+ return tx;
+}
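A quick sketch of the seconds-to-milliseconds conversion performed by `parseBlock` (field values are illustrative):

```typescript
const rawBlock = {
    hash: '0xdeadbeef', // illustrative
    number: 6271590,
    timestamp: 1535466129, // seconds, as reported by the node
} as any;

const block = parseBlock(rawBlock);
// block.timestamp === 1535466129000 (milliseconds)
```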
diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
new file mode 100644
index 000000000..1478d5615
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
@@ -0,0 +1,51 @@
+// tslint:disable:no-console
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { BloxySource } from '../data_sources/bloxy';
+import { DexTrade } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseBloxyTrades } from '../parsers/bloxy';
+import { handleError } from '../utils';
+
+// Number of trades to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ await getAndSaveTradesAsync();
+ process.exit(0);
+})().catch(handleError);
+
+async function getAndSaveTradesAsync(): Promise<void> {
+ const apiKey = process.env.BLOXY_API_KEY;
+ if (apiKey === undefined) {
+ throw new Error('Missing required env var: BLOXY_API_KEY');
+ }
+ const bloxySource = new BloxySource(apiKey);
+ const tradesRepository = connection.getRepository(DexTrade);
+ const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository);
+ console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`);
+ console.log('Getting latest dex trades...');
+ const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp);
+ console.log(`Parsing ${rawTrades.length} trades...`);
+ const trades = parseBloxyTrades(rawTrades);
+ console.log(`Saving ${trades.length} trades...`);
+ await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) });
+ console.log('Done saving trades.');
+}
+
+async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> {
+ if ((await tradesRepository.count()) === 0) {
+ return 0;
+ }
+ const response = (await connection.query(
+ 'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1',
+ )) as Array<{ tx_timestamp: number }>;
+ if (response.length === 0) {
+ return 0;
+ }
+ return response[0].tx_timestamp;
+}
diff --git a/packages/pipeline/src/scripts/pull_copper.ts b/packages/pipeline/src/scripts/pull_copper.ts
new file mode 100644
index 000000000..69814f209
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_copper.ts
@@ -0,0 +1,129 @@
+// tslint:disable:no-console
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper';
+import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities';
+import * as ormConfig from '../ormconfig';
+import {
+ CopperSearchResponse,
+ parseActivities,
+ parseActivityTypes,
+ parseCustomFields,
+ parseLeads,
+ parseOpportunities,
+} from '../parsers/copper';
+import { handleError } from '../utils';
+const ONE_SECOND = 1000;
+const COPPER_RATE_LIMIT = 10;
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+
+ const accessToken = process.env.COPPER_ACCESS_TOKEN;
+ const userEmail = process.env.COPPER_USER_EMAIL;
+ if (accessToken === undefined || userEmail === undefined) {
+ throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL');
+ }
+ const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail);
+
+ const fetchPromises = [
+ fetchAndSaveLeadsAsync(source),
+ fetchAndSaveOpportunitiesAsync(source),
+ fetchAndSaveActivitiesAsync(source),
+ fetchAndSaveCustomFieldsAsync(source),
+ fetchAndSaveActivityTypesAsync(source),
+ ];
+ // Wait for all of the fetches to complete before exiting.
+ await Promise.all(fetchPromises);
+})().catch(handleError);
+
+async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> {
+ const repository = connection.getRepository(CopperLead);
+ const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads');
+ console.log(`Fetching Copper leads starting from ${startTime}...`);
+ await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository);
+}
+
+async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> {
+ const repository = connection.getRepository(CopperOpportunity);
+ const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities');
+ console.log(`Fetching Copper opportunities starting from ${startTime}...`);
+ await fetchAndSaveAsync(
+ CopperEndpoint.Opportunities,
+ source,
+ startTime,
+ { sort_by: 'name' },
+ parseOpportunities,
+ repository,
+ );
+}
+
+async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> {
+ const repository = connection.getRepository(CopperActivity);
+ const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities');
+ const searchParams = {
+ minimum_activity_date: Math.floor(startTime / ONE_SECOND),
+ };
+ console.log(`Fetching Copper activities starting from ${startTime}...`);
+ await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository);
+}
+
+async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> {
+ const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`);
+ if (R.isEmpty(queryResult)) {
+ return 0;
+ } else {
+ return queryResult[0]._max;
+ }
+}
+
+// NOTE(Xianny): The Copper API doesn't allow queries to filter by date. To ensure that we fill in ascending
+// chronological order without missing any records, we scrape all available pages. If the Copper data set grows,
+// it would make sense to search for, and start filling from, the first page that contains a new record.
+// That search would add network calls and is not worth implementing at our current small volume
+// of Copper records.
+async function fetchAndSaveAsync<T extends CopperSearchResponse, E>(
+ endpoint: CopperEndpoint,
+ source: CopperSource,
+ startTime: number,
+ searchParams: CopperSearchParams,
+ parseFn: (recs: T[]) => E[],
+ repository: Repository<E>,
+): Promise<void> {
+ let saved = 0;
+ const numPages = await source.fetchNumberOfPagesAsync(endpoint);
+ try {
+ for (let i = numPages; i > 0; i--) {
+ console.log(`Fetching page ${i}/${numPages} of ${endpoint}...`);
+ const raw = await source.fetchSearchResultsAsync<T>(endpoint, {
+ ...searchParams,
+ page_number: i,
+ });
+ const newRecords = raw.filter(rec => rec.date_modified * ONE_SECOND > startTime);
+ const parsed = parseFn(newRecords);
+ await repository.save<any>(parsed);
+ saved += newRecords.length;
+ }
+ } catch (err) {
+ console.log(`Error fetching ${endpoint}, stopping: ${err.stack}`);
+ } finally {
+ console.log(`Saved ${saved} items from ${endpoint}, done.`);
+ }
+}
+
+async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> {
+ console.log(`Fetching Copper activity types...`);
+ const activityTypes = await source.fetchActivityTypesAsync();
+ const repository = connection.getRepository(CopperActivityType);
+ await repository.save(parseActivityTypes(activityTypes));
+}
+
+async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> {
+ console.log(`Fetching Copper custom fields...`);
+ const customFields = await source.fetchCustomFieldsAsync();
+ const repository = connection.getRepository(CopperCustomField);
+ await repository.save(parseCustomFields(customFields));
+}
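One subtlety worth a sketch: Copper reports `date_modified` in seconds while the pipeline stores milliseconds, so the code above converts in both directions (values illustrative):

```typescript
const ONE_SECOND = 1000;

// High-water mark as stored in the database (milliseconds).
const startTime = 1546300800000;

// Sent to the Copper search API, which expects seconds.
const minimumActivityDate = Math.floor(startTime / ONE_SECOND); // 1546300800

// Returned by Copper in seconds; converted back to milliseconds to compare.
const isNewRecord = (dateModifiedSeconds: number): boolean => dateModifiedSeconds * ONE_SECOND > startTime;
```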
diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
new file mode 100644
index 000000000..4e00f258f
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
@@ -0,0 +1,55 @@
+import { logUtils } from '@0x/utils';
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseDdexOrders } from '../parsers/ddex_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+// Number of markets to retrieve orderbooks for at once.
+const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50;
+
+// Delay between market orderbook requests.
+const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const ddexSource = new DdexSource();
+ const markets = await ddexSource.getActiveMarketsAsync();
+ for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
+ await Promise.all(
+ marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbookAsync(ddexSource, market)),
+ );
+ await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
+ }
+ process.exit(0);
+})().catch(handleError);
+
+/**
+ * Retrieve orderbook from Ddex API for a given market. Parse orders and insert
+ * them into our database.
+ * @param ddexSource Data source which can query Ddex API.
+ * @param market Object from Ddex API containing market data.
+ */
+async function getAndSaveMarketOrderbookAsync(ddexSource: DdexSource, market: DdexMarket): Promise<void> {
+ const orderBook = await ddexSource.getMarketOrderbookAsync(market.id);
+ const observedTimestamp = Date.now();
+
+ logUtils.log(`${market.id}: Parsing orders.`);
+ const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE);
+
+ if (orders.length > 0) {
+ logUtils.log(`${market.id}: Saving ${orders.length} orders.`);
+ const tokenOrderRepository = connection.getRepository(TokenOrder);
+ await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+ } else {
+ logUtils.log(`${market.id}: 0 orders to save.`);
+ }
+}
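The batching-plus-delay pattern above is generic; a self-contained sketch (the helper name is my own, not from this diff):

```typescript
import * as R from 'ramda';

// Process items in chunks of `batchSize`, pausing `delayMs` between chunks
// so we stay under the remote API's rate limit.
async function forEachThrottledChunkAsync<T>(
    items: T[],
    batchSize: number,
    delayMs: number,
    fn: (item: T) => Promise<void>,
): Promise<void> {
    for (const chunk of R.splitEvery(batchSize, items)) {
        await Promise.all(chunk.map(fn));
        await new Promise<void>(resolve => setTimeout(resolve, delayMs));
    }
}
```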
diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts
new file mode 100644
index 000000000..bd520c610
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_erc20_events.ts
@@ -0,0 +1,96 @@
+import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses';
+import { web3Factory } from '@0x/dev-utils';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { logUtils } from '@0x/utils';
+import { Web3Wrapper } from '@0x/web3-wrapper';
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events';
+import { ERC20ApprovalEvent } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseERC20ApprovalEvents } from '../parsers/events';
+import { handleError, INFURA_ROOT_URL } from '../utils';
+
+const NETWORK_ID = 1;
+const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating approval events.
+const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
+const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock.
+
+let connection: Connection;
+
+interface Token {
+ // name is used for logging only.
+ name: string;
+ address: string;
+ defaultStartBlock: number;
+}
+
+const tokensToGetApprovalEvents: Token[] = [
+ {
+ name: 'WETH',
+ address: getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken,
+ defaultStartBlock: 4719568, // Block when the WETH contract was deployed.
+ },
+ {
+ name: 'ZRX',
+ address: getContractAddressesForNetworkOrThrow(NETWORK_ID).zrxToken,
+ defaultStartBlock: 4145415, // Block when the ZRX contract was deployed.
+ },
+ {
+ name: 'DAI',
+ address: '0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359',
+ defaultStartBlock: 4752008, // Block when the DAI contract was deployed.
+ },
+];
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const provider = web3Factory.getRpcProvider({
+ rpcUrl: INFURA_ROOT_URL,
+ });
+ const endBlock = await calculateEndBlockAsync(provider);
+ for (const token of tokensToGetApprovalEvents) {
+ await getAndSaveApprovalEventsAsync(provider, token, endBlock);
+ }
+ process.exit(0);
+})().catch(handleError);
+
+async function getAndSaveApprovalEventsAsync(
+ provider: Web3ProviderEngine,
+ token: Token,
+ endBlock: number,
+): Promise<void> {
+ logUtils.log(`Getting approval events for ${token.name}...`);
+ logUtils.log('Checking existing approval events...');
+ const repository = connection.getRepository(ERC20ApprovalEvent);
+ const startBlock = (await getStartBlockAsync(token)) || token.defaultStartBlock;
+
+ logUtils.log(`Getting approval events starting at ${startBlock}...`);
+ const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, token.address);
+ const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock);
+
+ logUtils.log(`Parsing ${eventLogs.length} approval events...`);
+ const events = parseERC20ApprovalEvents(eventLogs);
+ logUtils.log(`Retrieved and parsed ${events.length} total approval events.`);
+ await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) });
+}
+
+async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> {
+ const web3Wrapper = new Web3Wrapper(provider);
+ const currentBlock = await web3Wrapper.getBlockNumberAsync();
+ return currentBlock - BLOCK_FINALITY_THRESHOLD;
+}
+
+async function getStartBlockAsync(token: Token): Promise<number | null> {
+ const queryResult = await connection.query(
+ `SELECT block_number FROM raw.erc20_approval_events WHERE token_address = $1 ORDER BY block_number DESC LIMIT 1`,
+ [token.address],
+ );
+ if (queryResult.length === 0) {
+ logUtils.log(`No existing approval events found for ${token.name}.`);
+ return null;
+ }
+ const lastKnownBlock = queryResult[0].block_number;
+ return lastKnownBlock - START_BLOCK_OFFSET;
+}
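A worked example of the block-window arithmetic used above (block numbers illustrative):

```typescript
const START_BLOCK_OFFSET = 100;
const BLOCK_FINALITY_THRESHOLD = 10;

const currentBlock = 7000000; // chain tip at run time (illustrative)
const lastKnownBlock = 6950000; // newest block already saved (illustrative)

// Re-scan a 100-block window behind the last saved block so events moved
// by a chain reorganization are picked up and corrected on the next run.
const startBlock = lastKnownBlock - START_BLOCK_OFFSET; // 6949900

// Stop 10 blocks short of the tip so only blocks that are unlikely to be
// reorged away get indexed.
const endBlock = currentBlock - BLOCK_FINALITY_THRESHOLD; // 6999990
```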
diff --git a/packages/pipeline/src/scripts/pull_exchange_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts
new file mode 100644
index 000000000..e98fc6629
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_exchange_events.ts
@@ -0,0 +1,146 @@
+// tslint:disable:no-console
+import { web3Factory } from '@0x/dev-utils';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { Web3Wrapper } from '@0x/web3-wrapper';
+import R = require('ramda');
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events';
+import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events';
+import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';
+
+const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events.
+const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
+const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock.
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const provider = web3Factory.getRpcProvider({
+ rpcUrl: INFURA_ROOT_URL,
+ });
+ const endBlock = await calculateEndBlockAsync(provider);
+ const eventsSource = new ExchangeEventsSource(provider, 1);
+ await getFillEventsAsync(eventsSource, endBlock);
+ await getCancelEventsAsync(eventsSource, endBlock);
+ await getCancelUpToEventsAsync(eventsSource, endBlock);
+ process.exit(0);
+})().catch(handleError);
+
+async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> {
+ console.log('Checking existing fill events...');
+ const repository = connection.getRepository(ExchangeFillEvent);
+ const startBlock = await getStartBlockAsync(repository);
+ console.log(`Getting fill events starting at ${startBlock}...`);
+ const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock);
+ console.log('Parsing fill events...');
+ const events = parseExchangeFillEvents(eventLogs);
+ console.log(`Retrieved and parsed ${events.length} total fill events.`);
+ await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
+}
+
+async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> {
+ console.log('Checking existing cancel events...');
+ const repository = connection.getRepository(ExchangeCancelEvent);
+ const startBlock = await getStartBlockAsync(repository);
+ console.log(`Getting cancel events starting at ${startBlock}...`);
+ const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock);
+ console.log('Parsing cancel events...');
+ const events = parseExchangeCancelEvents(eventLogs);
+ console.log(`Retrieved and parsed ${events.length} total cancel events.`);
+ await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
+}
+
+async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> {
+ console.log('Checking existing CancelUpTo events...');
+ const repository = connection.getRepository(ExchangeCancelUpToEvent);
+ const startBlock = await getStartBlockAsync(repository);
+ console.log(`Getting CancelUpTo events starting at ${startBlock}...`);
+ const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock);
+ console.log('Parsing CancelUpTo events...');
+ const events = parseExchangeCancelUpToEvents(eventLogs);
+ console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`);
+ await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
+}
+
+const tableNameRegex = /^[a-zA-Z_]*$/;
+
+async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> {
+ const fillEventCount = await repository.count();
+ if (fillEventCount === 0) {
+ console.log(`No existing ${repository.metadata.name}s found.`);
+ return EXCHANGE_START_BLOCK;
+ }
+ const tableName = repository.metadata.tableName;
+ if (!tableNameRegex.test(tableName)) {
+ throw new Error(`Unexpected special character in table name: ${tableName}`);
+ }
+ const queryResult = await connection.query(
+ `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`,
+ );
+ const lastKnownBlock = queryResult[0].block_number;
+ return lastKnownBlock - START_BLOCK_OFFSET;
+}
+
+async function saveEventsAsync<T extends ExchangeEvent>(
+ isInitialPull: boolean,
+ repository: Repository<T>,
+ events: T[],
+): Promise<void> {
+ console.log(`Saving ${repository.metadata.name}s...`);
+ if (isInitialPull) {
+ // Split the events into batches of maximum size BATCH_SAVE_SIZE and
+ // insert each batch separately.
+ for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) {
+ await repository.insert(eventsBatch);
+ }
+ } else {
+ // If we possibly have some overlap where we need to update some
+ // existing events, we need to use our workaround/fallback.
+ await saveIndividuallyWithFallbackAsync(repository, events);
+ }
+ const totalEvents = await repository.count();
+ console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`);
+}
+
+async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>(
+ repository: Repository<T>,
+ events: T[],
+): Promise<void> {
+ // Note(albrow): This is a temporary hack because `save` is not working as
+ // documented and causes a foreign key constraint violation. Hopefully we
+ // can remove it later, because this "poor man's upsert" operates on one
+ // event at a time and is therefore much slower.
+ for (const event of events) {
+ try {
+ // First try an insert.
+ await repository.insert(event);
+ } catch {
+ // If it fails, assume it was a foreign key constraint error and try
+ // doing an update instead.
+ // Note(albrow): Unfortunately the `as any` hack here seems
+ // required. I can't figure out how to convince the type-checker
+ // that the criteria and the entity itself are the correct type for
+ // the given repository. If we can remove the `save` hack then this
+ // will probably no longer be necessary.
+ await repository.update(
+ {
+ contractAddress: event.contractAddress,
+ blockNumber: event.blockNumber,
+ logIndex: event.logIndex,
+ } as any,
+ event as any,
+ );
+ }
+ }
+}
+
+async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> {
+ const web3Wrapper = new Web3Wrapper(provider);
+ const currentBlock = await web3Wrapper.getBlockNumberAsync();
+ return currentBlock - BLOCK_FINALITY_THRESHOLD;
+}
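The fallback logic above reduces to a reusable "poor man's upsert"; extracted here for clarity (same technique, same caveat about the `as any` casts, and the import path assumes the directory layout of the script above):

```typescript
import { Repository } from 'typeorm';

import { ExchangeEvent } from '../entities';

// Try a plain insert; if it conflicts with an existing row, fall back to an
// update keyed by the event's primary key columns.
async function upsertEventAsync<T extends ExchangeEvent>(repository: Repository<T>, event: T): Promise<void> {
    try {
        await repository.insert(event);
    } catch {
        await repository.update(
            {
                contractAddress: event.contractAddress,
                blockNumber: event.blockNumber,
                logIndex: event.logIndex,
            } as any,
            event as any,
        );
    }
}
```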
diff --git a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts
new file mode 100644
index 000000000..490b17766
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts
@@ -0,0 +1,63 @@
+import { logUtils } from '@0x/utils';
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { IDEX_SOURCE, IdexSource } from '../data_sources/idex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseIdexOrders } from '../parsers/idex_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+// Number of markets to retrieve orderbooks for at once.
+const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 100;
+
+// Delay between market orderbook requests.
+const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 2000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const idexSource = new IdexSource();
+ logUtils.log('Getting all IDEX markets');
+ const markets = await idexSource.getMarketsAsync();
+ logUtils.log(`Got ${markets.length} markets.`);
+ for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
+ await Promise.all(
+ marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbookAsync(idexSource, marketId)),
+ );
+ await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
+ }
+ process.exit(0);
+})().catch(handleError);
+
+/**
+ * Retrieve orderbook from Idex API for a given market. Parse orders and insert
+ * them into our database.
+ * @param idexSource Data source which can query Idex API.
+ * @param marketId String representing the market of interest, e.g. 'ETH_TIC'.
+ */
+async function getAndSaveMarketOrderbookAsync(idexSource: IdexSource, marketId: string): Promise<void> {
+ logUtils.log(`${marketId}: Retrieving orderbook.`);
+ const orderBook = await idexSource.getMarketOrderbookAsync(marketId);
+ const observedTimestamp = Date.now();
+
+ if (!R.has('bids', orderBook) || !R.has('asks', orderBook)) {
+ logUtils.warn(`${marketId}: Orderbook response is missing bids or asks.`);
+ return;
+ }
+
+ logUtils.log(`${marketId}: Parsing orders.`);
+ const orders = parseIdexOrders(orderBook, observedTimestamp, IDEX_SOURCE);
+
+ if (orders.length > 0) {
+ logUtils.log(`${marketId}: Saving ${orders.length} orders.`);
+ const tokenOrderRepository = connection.getRepository(TokenOrder);
+ await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+ } else {
+ logUtils.log(`${marketId}: 0 orders to save.`);
+ }
+}
diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts
new file mode 100644
index 000000000..ced9d99eb
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts
@@ -0,0 +1,90 @@
+// tslint:disable:no-console
+import { web3Factory } from '@0x/dev-utils';
+import * as Parallel from 'async-parallel';
+import R = require('ramda');
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { Web3Source } from '../data_sources/web3';
+import { Block } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseBlock } from '../parsers/web3';
+import { handleError, INFURA_ROOT_URL } from '../utils';
+
+// Number of blocks to save at once.
+const BATCH_SAVE_SIZE = 1000;
+// Maximum number of requests to send at once.
+const MAX_CONCURRENCY = 20;
+// Maximum number of blocks to query for at once. This is also the maximum
+// number of blocks we will hold in memory prior to being saved to the database.
+const MAX_BLOCKS_PER_QUERY = 1000;
+
+let connection: Connection;
+
+const tablesWithMissingBlocks = [
+ 'raw.exchange_fill_events',
+ 'raw.exchange_cancel_events',
+ 'raw.exchange_cancel_up_to_events',
+ 'raw.erc20_approval_events',
+];
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const provider = web3Factory.getRpcProvider({
+ rpcUrl: INFURA_ROOT_URL,
+ });
+ const web3Source = new Web3Source(provider);
+ for (const tableName of tablesWithMissingBlocks) {
+ await getAllMissingBlocksAsync(web3Source, tableName);
+ }
+ process.exit(0);
+})().catch(handleError);
+
+interface MissingBlocksResponse {
+ block_number: string;
+}
+
+async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise<void> {
+ const blocksRepository = connection.getRepository(Block);
+ while (true) {
+ console.log(`Checking for missing blocks in ${tableName}...`);
+ const blockNumbers = await getMissingBlockNumbersAsync(tableName);
+ if (blockNumbers.length === 0) {
+ // There are no more missing blocks. We're done.
+ break;
+ }
+ await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers);
+ }
+ const totalBlocks = await blocksRepository.count();
+ console.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`);
+}
+
+async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> {
+ // This query returns up to `MAX_BLOCKS_PER_QUERY` distinct block numbers
+ // which are present in `tableName` but not in `raw.blocks`.
+ const response = (await connection.query(
+ `SELECT DISTINCT(block_number) FROM ${tableName} LEFT JOIN raw.blocks ON ${tableName}.block_number = raw.blocks.number WHERE number IS NULL LIMIT $1;`,
+ [MAX_BLOCKS_PER_QUERY],
+ )) as MissingBlocksResponse[];
+ const blockNumberStrings = R.pluck('block_number', response);
+ const blockNumbers = R.map(parseInt, blockNumberStrings);
+ console.log(`Found ${blockNumbers.length} missing blocks.`);
+ return blockNumbers;
+}
+
+async function getAndSaveBlocksAsync(
+ web3Source: Web3Source,
+ blocksRepository: Repository<Block>,
+ blockNumbers: number[],
+): Promise<void> {
+ console.log(`Getting block data for ${blockNumbers.length} blocks...`);
+ Parallel.setConcurrency(MAX_CONCURRENCY);
+ const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) =>
+ web3Source.getBlockInfoAsync(blockNumber),
+ );
+ console.log(`Parsing ${rawBlocks.length} blocks...`);
+ const blocks = R.map(parseBlock, rawBlocks);
+ console.log(`Saving ${blocks.length} blocks...`);
+ await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) });
+ console.log('Done saving this batch of blocks');
+}
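The missing-blocks query is a LEFT JOIN anti-join; an equivalent NOT EXISTS formulation, shown against one of the event tables above, may read more plainly:

```typescript
// Block numbers referenced by fill events that have no row in raw.blocks.
const missing = await connection.query(
    `SELECT DISTINCT(block_number)
     FROM raw.exchange_fill_events
     WHERE NOT EXISTS (
         SELECT 1 FROM raw.blocks WHERE raw.blocks.number = raw.exchange_fill_events.block_number
     )
     LIMIT $1;`,
    [MAX_BLOCKS_PER_QUERY],
);
```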
diff --git a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts
new file mode 100644
index 000000000..c4dcf6c83
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts
@@ -0,0 +1,58 @@
+import { logUtils } from '@0x/utils';
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { OASIS_SOURCE, OasisMarket, OasisSource } from '../data_sources/oasis';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseOasisOrders } from '../parsers/oasis_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+// Number of markets to retrieve orderbooks for at once.
+const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50;
+
+// Delay between market orderbook requests.
+const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 1000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const oasisSource = new OasisSource();
+ logUtils.log('Getting all active Oasis markets');
+ const markets = await oasisSource.getActiveMarketsAsync();
+ logUtils.log(`Got ${markets.length} markets.`);
+ for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
+ await Promise.all(
+ marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbookAsync(oasisSource, market)),
+ );
+ await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
+ }
+ process.exit(0);
+})().catch(handleError);
+
+/**
+ * Retrieve orderbook from Oasis API for a given market. Parse orders and insert
+ * them into our database.
+ * @param oasisSource Data source which can query Oasis API.
+ * @param market Object from the Oasis API containing data for the market of interest, e.g. the 'REPAUG' market.
+ */
+async function getAndSaveMarketOrderbookAsync(oasisSource: OasisSource, market: OasisMarket): Promise<void> {
+ logUtils.log(`${market.id}: Retrieving orderbook.`);
+ const orderBook = await oasisSource.getMarketOrderbookAsync(market.id);
+ const observedTimestamp = Date.now();
+
+ logUtils.log(`${market.id}: Parsing orders.`);
+ const orders = parseOasisOrders(orderBook, market, observedTimestamp, OASIS_SOURCE);
+
+ if (orders.length > 0) {
+ logUtils.log(`${market.id}: Saving ${orders.length} orders.`);
+ const tokenOrderRepository = connection.getRepository(TokenOrder);
+ await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+ } else {
+ logUtils.log(`${market.id}: 0 orders to save.`);
+ }
+}
diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
new file mode 100644
index 000000000..d44eb5cc6
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -0,0 +1,95 @@
+// tslint:disable:no-console
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare';
+import { handleError } from '../utils';
+import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs';
+
+const SOURCE_NAME = 'CryptoCompare';
+const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+
+const MAX_REQS_PER_SECOND = parseInt(process.env.CRYPTOCOMPARE_MAX_REQS_PER_SECOND || '15', 10); // tslint:disable-line:custom-no-magic-numbers
+const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01';
+const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const repository = connection.getRepository(OHLCVExternal);
+ const source = new CryptoCompareOHLCVSource(MAX_REQS_PER_SECOND);
+
+ const jobTime = new Date().getTime();
+ const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
+ console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`);
+
+ const fetchAndSavePromises = tradingPairs.map(async pair => {
+ const pairs = source.generateBackfillIntervals(pair);
+ return fetchAndSaveAsync(source, repository, jobTime, pairs);
+ });
+ await Promise.all(fetchAndSavePromises);
+ console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`);
+ process.exit(0);
+})().catch(handleError);
+
+async function fetchAndSaveAsync(
+ source: CryptoCompareOHLCVSource,
+ repository: Repository<OHLCVExternal>,
+ jobTime: number,
+ pairs: TradingPair[],
+): Promise<void> {
+ const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => {
+ if (a.latestSavedTime < b.latestSavedTime) {
+ return -1;
+ } else if (a.latestSavedTime > b.latestSavedTime) {
+ return 1;
+ } else {
+ return 0;
+ }
+ };
+ pairs.sort(sortAscTimestamp);
+
+ let i = 0;
+ while (i < pairs.length) {
+ const pair = pairs[i];
+ // Pairs are sorted by latestSavedTime ascending, so once one pair has
+ // been updated within the last two hours, every remaining pair has been
+ // too, and the scan can stop early.
+ if (pair.latestSavedTime > TWO_HOURS_AGO) {
+ break;
+ }
+ try {
+ const records = await source.getHourlyOHLCVAsync(pair);
+ console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`);
+ if (records.length > 0) {
+ const metadata: OHLCVMetadata = {
+ exchange: source.defaultExchange,
+ fromSymbol: pair.fromSymbol,
+ toSymbol: pair.toSymbol,
+ source: SOURCE_NAME,
+ observedTimestamp: jobTime,
+ interval: source.intervalBetweenRecords,
+ };
+ const parsedRecords = parseRecords(records, metadata);
+ await saveRecordsAsync(repository, parsedRecords);
+ }
+ i++;
+ } catch (err) {
+ console.log(`Error scraping OHLCV records, stopping task for ${JSON.stringify(pair)} [${err}]`);
+ break;
+ }
+ }
+}
+
+async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> {
+ const metadata = [
+ records[0].fromSymbol,
+ records[0].toSymbol,
+ new Date(records[0].startTime),
+ new Date(records[records.length - 1].endTime),
+ ];
+
+ console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`);
+ await repository.save(records);
+}
diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
new file mode 100644
index 000000000..34345f355
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
@@ -0,0 +1,87 @@
+import { logUtils } from '@0x/utils';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import {
+ PARADEX_SOURCE,
+ ParadexActiveMarketsResponse,
+ ParadexMarket,
+ ParadexSource,
+ ParadexTokenInfoResponse,
+} from '../data_sources/paradex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseParadexOrders } from '../parsers/paradex_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY;
+ if (apiKey === undefined) {
+ throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY');
+ }
+ const paradexSource = new ParadexSource(apiKey);
+ const markets = await paradexSource.getActiveMarketsAsync();
+ const tokenInfoResponse = await paradexSource.getTokenInfoAsync();
+ const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse);
+ await Promise.all(
+ extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbookAsync(paradexSource, market)),
+ );
+ process.exit(0);
+})().catch(handleError);
+
+/**
+ * Extend the default ParadexMarket objects with token addresses.
+ * @param markets An array of ParadexMarket objects.
+ * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses.
+ */
+function addTokenAddresses(
+ markets: ParadexActiveMarketsResponse,
+ tokenInfoResponse: ParadexTokenInfoResponse,
+): ParadexMarket[] {
+ const symbolAddressMapping = new Map<string, string>();
+ tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address));
+
+ markets.forEach((market: ParadexMarket) => {
+ if (symbolAddressMapping.has(market.baseToken)) {
+ market.baseTokenAddress = symbolAddressMapping.get(market.baseToken);
+ } else {
+ market.baseTokenAddress = '';
+ logUtils.warn(`${market.baseToken}: No address found.`);
+ }
+
+ if (symbolAddressMapping.has(market.quoteToken)) {
+ market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken);
+ } else {
+ market.quoteTokenAddress = '';
+ logUtils.warn(`${market.quoteToken}: No address found.`);
+ }
+ });
+ return markets;
+}
+
+/**
+ * Retrieve orderbook from Paradex API for a given market. Parse orders and insert
+ * them into our database.
+ * @param paradexSource Data source which can query the Paradex API.
+ * @param market Object from the Paradex API with information about the market in question.
+ */
+async function getAndSaveMarketOrderbookAsync(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> {
+ const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol);
+ const observedTimestamp = Date.now();
+
+ logUtils.log(`${market.symbol}: Parsing orders.`);
+ const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE);
+
+ if (orders.length > 0) {
+ logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`);
+ const tokenOrderRepository = connection.getRepository(TokenOrder);
+ await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+ } else {
+ logUtils.log(`${market.symbol}: 0 orders to save.`);
+ }
+}
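A toy run of `addTokenAddresses` with one known and one unknown symbol (addresses and symbols illustrative):

```typescript
const markets = [{ symbol: 'ZRX/WETH', baseToken: 'ZRX', quoteToken: 'WETH' }] as any;
const tokenInfo = [{ symbol: 'ZRX', address: '0xe41d2489571d322189246dafa5ebde1f4699f498' }] as any;

const extended = addTokenAddresses(markets, tokenInfo);
// extended[0].baseTokenAddress === '0xe41d2489571d322189246dafa5ebde1f4699f498'
// extended[0].quoteTokenAddress === '' and a warning is logged, because
// 'WETH' has no entry in tokenInfo.
```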
diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
new file mode 100644
index 000000000..40bb6fc97
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
@@ -0,0 +1,54 @@
+// tslint:disable:no-console
+import { HttpClient } from '@0x/connect';
+import * as R from 'ramda';
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm';
+
+import { createObservedTimestampForOrder, SraOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseSraOrders } from '../parsers/sra_orders';
+import { handleError } from '../utils';
+
+const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2';
+const ORDERS_PER_PAGE = 10000; // Number of orders to get per request.
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ await getOrderbookAsync();
+ process.exit(0);
+})().catch(handleError);
+
+async function getOrderbookAsync(): Promise<void> {
+ console.log('Getting all orders...');
+ const connectClient = new HttpClient(RADAR_RELAY_URL);
+ const rawOrders = await connectClient.getOrdersAsync({
+ perPage: ORDERS_PER_PAGE,
+ });
+ console.log(`Got ${rawOrders.records.length} orders.`);
+ console.log('Parsing orders...');
+ // Parse the sra orders, then add source url to each.
+ const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders);
+ // Save all the orders and update the observed time stamps in a single
+ // transaction.
+ console.log('Saving orders and updating timestamps...');
+ const observedTimestamp = Date.now();
+ await connection.transaction(async (manager: EntityManager): Promise<void> => {
+ for (const order of orders) {
+ await manager.save(SraOrder, order);
+ const orderObservation = createObservedTimestampForOrder(order, observedTimestamp);
+ await manager.save(orderObservation);
+ }
+ });
+}
+
+const sourceUrlProp = R.lensProp('sourceUrl');
+
+/**
+ * Sets the source url for a single order. Returns a new order instead of
+ * mutating the given one.
+ */
+const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => {
+ return R.set(sourceUrlProp, sourceURL, order);
+});
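The lens-based helper above never mutates its input; a short demonstration of that property:

```typescript
import * as R from 'ramda';

const order = { sourceUrl: '', orderHashHex: '0xabc' } as any;
const updated = R.set(R.lensProp('sourceUrl'), 'https://api.radarrelay.com/0x/v2', order);
// order.sourceUrl === ''  (the original object is unchanged)
// updated.sourceUrl === 'https://api.radarrelay.com/0x/v2'
```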
diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
new file mode 100644
index 000000000..5906deee6
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -0,0 +1,52 @@
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens';
+import { TokenMetadata } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/token_metadata';
+import { handleError } from '../utils';
+
+const METAMASK_TRUSTED_TOKENS_URL =
+ 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/d45916c533116510cc8e9e048a8b5fc3732a6b6d/contract-map.json';
+
+const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens';
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ await getMetamaskTrustedTokensAsync();
+ await getZeroExTrustedTokensAsync();
+ process.exit(0);
+})().catch(handleError);
+
+async function getMetamaskTrustedTokensAsync(): Promise<void> {
+ // tslint:disable-next-line:no-console
+ console.log('Getting latest metamask trusted tokens list ...');
+ const trustedTokensRepository = connection.getRepository(TokenMetadata);
+ const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(
+ METAMASK_TRUSTED_TOKENS_URL,
+ );
+ const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
+ const trustedTokens = parseMetamaskTrustedTokens(resp);
+ // tslint:disable-next-line:no-console
+ console.log('Saving metamask trusted tokens list');
+ await trustedTokensRepository.save(trustedTokens);
+ // tslint:disable-next-line:no-console
+ console.log('Done saving metamask trusted tokens.');
+}
+
+async function getZeroExTrustedTokensAsync(): Promise<void> {
+ // tslint:disable-next-line:no-console
+ console.log('Getting latest 0x trusted tokens list ...');
+ const trustedTokensRepository = connection.getRepository(TokenMetadata);
+ const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL);
+ const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
+ const trustedTokens = parseZeroExTrustedTokens(resp);
+ // tslint:disable-next-line:no-console
+ console.log('Saving 0x trusted tokens list');
+ await trustedTokensRepository.save(trustedTokens);
+ // tslint:disable-next-line:no-console
+ console.log('Done saving 0x trusted tokens.');
+}
diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts
new file mode 100644
index 000000000..41d29b385
--- /dev/null
+++ b/packages/pipeline/src/scripts/update_relayer_info.ts
@@ -0,0 +1,33 @@
+// tslint:disable:no-console
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { RelayerRegistrySource } from '../data_sources/relayer-registry';
+import { Relayer } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseRelayers } from '../parsers/relayer_registry';
+import { handleError } from '../utils';
+
+// NOTE(albrow): We need to manually update this URL for now. Fix this when we
+// have the relayer-registry behind semantic versioning.
+const RELAYER_REGISTRY_URL =
+ 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/4701c85677d161ea729a466aebbc1826c6aa2c0b/relayers.json';
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ await getRelayersAsync();
+ process.exit(0);
+})().catch(handleError);
+
+async function getRelayersAsync(): Promise<void> {
+ console.log('Getting latest relayer info...');
+ const relayerRepository = connection.getRepository(Relayer);
+ const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL);
+ const relayersResp = await relayerSource.getRelayerInfoAsync();
+ const relayers = parseRelayers(relayersResp);
+ console.log('Saving relayer info...');
+ await relayerRepository.save(relayers);
+ console.log('Done saving relayer info.');
+}
diff --git a/packages/pipeline/src/types.ts b/packages/pipeline/src/types.ts
new file mode 100644
index 000000000..5f2121807
--- /dev/null
+++ b/packages/pipeline/src/types.ts
@@ -0,0 +1,9 @@
+export enum AssetType {
+ ERC20 = 'erc20',
+ ERC721 = 'erc721',
+ MultiAsset = 'multiAsset',
+}
+export enum OrderType {
+ Bid = 'bid',
+ Ask = 'ask',
+}
diff --git a/packages/pipeline/src/utils/constants.ts b/packages/pipeline/src/utils/constants.ts
new file mode 100644
index 000000000..56f3e82d8
--- /dev/null
+++ b/packages/pipeline/src/utils/constants.ts
@@ -0,0 +1,3 @@
+// Block number when the Exchange contract was deployed to mainnet.
+export const EXCHANGE_START_BLOCK = 6271590;
+export const INFURA_ROOT_URL = 'https://mainnet.infura.io';
diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
new file mode 100644
index 000000000..19f81344e
--- /dev/null
+++ b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
@@ -0,0 +1,116 @@
+import { fetchAsync } from '@0x/utils';
+import * as R from 'ramda';
+import { Connection } from 'typeorm';
+
+export interface TradingPair {
+ fromSymbol: string;
+ toSymbol: string;
+ latestSavedTime: number;
+}
+
+const COINLIST_API = 'https://min-api.cryptocompare.com/data/all/coinlist?BuiltOn=7605';
+
+interface CryptoCompareCoinListResp {
+ Data: Map<string, CryptoCompareCoin>;
+}
+
+interface CryptoCompareCoin {
+ Symbol: string;
+ BuiltOn: string;
+ SmartContractAddress: string;
+}
+
+const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT'];
+const ETHEREUM_IDENTIFIER = '7605';
+const HTTP_OK_STATUS = 200;
+
+interface StaticPair {
+ fromSymbol: string;
+ toSymbol: string;
+}
+const SPECIAL_CASES: StaticPair[] = [
+ {
+ fromSymbol: 'ETH',
+ toSymbol: 'USD',
+ },
+];
+
+/**
+ * Get trading pairs with the latest scraped time for OHLCV records.
+ * @param conn a typeorm Connection to postgres.
+ * @param source the name of the OHLCV data source, e.g. 'CryptoCompare'.
+ * @param earliestBackfillTime the earliest timestamp (in ms) to use for pairs with no saved records.
+ */
+export async function fetchOHLCVTradingPairsAsync(
+ conn: Connection,
+ source: string,
+ earliestBackfillTime: number,
+): Promise<TradingPair[]> {
+ // fetch existing ohlcv records
+ const latestTradingPairs: Array<{
+ from_symbol: string;
+ to_symbol: string;
+ latest: string;
+ }> = await conn.query(`SELECT
+ MAX(end_time) as latest,
+ from_symbol,
+ to_symbol
+ FROM raw.ohlcv_external
+ GROUP BY from_symbol, to_symbol;`);
+
+ // build addressable index: { fromsym: { tosym: time }}
+ const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {};
+ latestTradingPairs.forEach(pair => {
+ const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {};
+ latestIndex[pair.to_symbol] = parseInt(pair.latest, 10); // tslint:disable-line:custom-no-magic-numbers
+ latestTradingPairsIndex[pair.from_symbol] = latestIndex;
+ });
+
+ // match time to special cases
+ const specialCases: TradingPair[] = SPECIAL_CASES.map(pair => {
+ const latestSavedTime =
+ R.path<number>([pair.fromSymbol, pair.toSymbol], latestTradingPairsIndex) || earliestBackfillTime;
+ return R.assoc('latestSavedTime', latestSavedTime, pair);
+ });
+
+ // get token symbols used by Crypto Compare
+ const allCoinsResp = await fetchAsync(COINLIST_API);
+ if (allCoinsResp.status !== HTTP_OK_STATUS) {
+ return [];
+ }
+ const allCoins: CryptoCompareCoinListResp = await allCoinsResp.json();
+ const erc20CoinsIndex: Map<string, string> = new Map();
+ Object.entries(allCoins.Data).forEach(pair => {
+ const [symbol, coinData] = pair;
+ if (coinData.BuiltOn === ETHEREUM_IDENTIFIER && coinData.SmartContractAddress !== 'N/A') {
+ erc20CoinsIndex.set(coinData.SmartContractAddress.toLowerCase(), symbol);
+ }
+ });
+
+ // fetch all tokens that are traded on 0x
+ const rawEventTokenAddresses: Array<{ tokenaddress: string }> = await conn.query(
+ `SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION
+ SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`,
+ );
+
+ // tslint:disable-next-line:no-unbound-method
+ const eventTokenAddresses = R.pluck('tokenaddress', rawEventTokenAddresses).map(R.toLower);
+
+ // join token addresses with CC symbols
+ const eventTokenSymbols: string[] = eventTokenAddresses
+ .filter(tokenAddress => erc20CoinsIndex.has(tokenAddress))
+ .map(tokenAddress => erc20CoinsIndex.get(tokenAddress) as string);
+
+ // join traded tokens with fiat and latest backfill time
+ const eventTradingPairs: TradingPair[] = R.chain(sym => {
+ return TO_CURRENCIES.map(fiat => {
+ const pair = {
+ fromSymbol: sym,
+ toSymbol: fiat,
+ latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime,
+ };
+ return pair;
+ });
+ }, eventTokenSymbols);
+
+ // join with special cases
+ return R.concat(eventTradingPairs, specialCases);
+}
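The `R.chain` call above builds a cross product of traded symbols and quote currencies; the same shape in miniature:

```typescript
import * as R from 'ramda';

// R.chain maps each element to a list and flattens the result once, so the
// traded symbols crossed with TO_CURRENCIES become one flat list of pairs.
const symbols = ['ZRX', 'MKR'];
const fiat = ['USD', 'EUR'];
const pairs = R.chain(sym => fiat.map(toSymbol => ({ fromSymbol: sym, toSymbol })), symbols);
// => [ { fromSymbol: 'ZRX', toSymbol: 'USD' }, { fromSymbol: 'ZRX', toSymbol: 'EUR' },
//      { fromSymbol: 'MKR', toSymbol: 'USD' }, { fromSymbol: 'MKR', toSymbol: 'EUR' } ]
```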
diff --git a/packages/pipeline/src/utils/index.ts b/packages/pipeline/src/utils/index.ts
new file mode 100644
index 000000000..094c0178e
--- /dev/null
+++ b/packages/pipeline/src/utils/index.ts
@@ -0,0 +1,53 @@
+import { BigNumber } from '@0x/utils';
+export * from './transformers';
+export * from './constants';
+
+/**
+ * If the given BigNumber is not null or undefined, returns the string
+ * representation of that number. Otherwise, returns null.
+ * @param n The number to convert.
+ */
+export function bigNumbertoStringOrNull(n: BigNumber | null | undefined): string | null {
+ if (n == null) {
+ return null;
+ }
+ return n.toString();
+}
+
+/**
+ * If value is null or undefined, returns null. Otherwise converts value to a
+ * BigNumber.
+ * @param value A string or number to be converted to a BigNumber
+ */
+export function toBigNumberOrNull(value: string | number | null | undefined): BigNumber | null {
+ switch (value) {
+ case null:
+ case undefined:
+ return null;
+ default:
+ return new BigNumber(value);
+ }
+}
+
+/**
+ * Logs an error by intelligently checking for `message` and `stack` properties.
+ * Intended for use with top-level immediately invoked asynchronous functions.
+ * @param e the error to log.
+ */
+export function handleError(e: any): void {
+ if (e.message != null) {
+ // tslint:disable-next-line:no-console
+ console.error(e.message);
+ } else {
+ // tslint:disable-next-line:no-console
+ console.error('Unknown error');
+ }
+ if (e.stack != null) {
+ // tslint:disable-next-line:no-console
+ console.error(e.stack);
+ } else {
+ // tslint:disable-next-line:no-console
+ console.error('(No stack trace)');
+ }
+ process.exit(1);
+}
diff --git a/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts b/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts
new file mode 100644
index 000000000..2cd05a616
--- /dev/null
+++ b/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts
@@ -0,0 +1,20 @@
+import { AssetProxyId } from '@0x/types';
+
+import { AssetType } from '../../types';
+
+/**
+ * Converts an assetProxyId to its string equivalent
+ * @param assetProxyId Id of AssetProxy
+ */
+export function convertAssetProxyIdToType(assetProxyId: AssetProxyId): AssetType {
+ switch (assetProxyId) {
+ case AssetProxyId.ERC20:
+ return AssetType.ERC20;
+ case AssetProxyId.ERC721:
+ return AssetType.ERC721;
+ case AssetProxyId.MultiAsset:
+ return AssetType.MultiAsset;
+ default:
+ throw new Error(`${assetProxyId} not a supported assetProxyId`);
+ }
+}
diff --git a/packages/pipeline/src/utils/transformers/big_number.ts b/packages/pipeline/src/utils/transformers/big_number.ts
new file mode 100644
index 000000000..5f2e4d565
--- /dev/null
+++ b/packages/pipeline/src/utils/transformers/big_number.ts
@@ -0,0 +1,16 @@
+import { BigNumber } from '@0x/utils';
+import { ValueTransformer } from 'typeorm/decorator/options/ValueTransformer';
+
+export class BigNumberTransformer implements ValueTransformer {
+ // tslint:disable-next-line:prefer-function-over-method
+ public to(value: BigNumber | null): string | null {
+ return value === null ? null : value.toString();
+ }
+
+ // tslint:disable-next-line:prefer-function-over-method
+ public from(value: string | null): BigNumber | null {
+ return value === null ? null : new BigNumber(value);
+ }
+}
+
+export const bigNumberTransformer = new BigNumberTransformer();
diff --git a/packages/pipeline/src/utils/transformers/index.ts b/packages/pipeline/src/utils/transformers/index.ts
new file mode 100644
index 000000000..31a4c9223
--- /dev/null
+++ b/packages/pipeline/src/utils/transformers/index.ts
@@ -0,0 +1,3 @@
+export * from './big_number';
+export * from './number_to_bigint';
+export * from './asset_proxy_id_types';
diff --git a/packages/pipeline/src/utils/transformers/number_to_bigint.ts b/packages/pipeline/src/utils/transformers/number_to_bigint.ts
new file mode 100644
index 000000000..9736d7c18
--- /dev/null
+++ b/packages/pipeline/src/utils/transformers/number_to_bigint.ts
@@ -0,0 +1,31 @@
+import { BigNumber } from '@0x/utils';
+import { ValueTransformer } from 'typeorm/decorator/options/ValueTransformer';
+
+const decimalRadix = 10;
+
+// Can be used to convert a JavaScript number type to a Postgres bigint type and
+// vice versa. By default TypeORM will silently convert number types to string
+// if the corresponding Postgres type is bigint. See
+// https://github.com/typeorm/typeorm/issues/2400 for more information.
+export class NumberToBigIntTransformer implements ValueTransformer {
+ // tslint:disable-next-line:prefer-function-over-method
+ public to(value: number | null | undefined): string | null {
+ if (value === null || value === undefined) {
+ return null;
+ } else {
+ return value.toString();
+ }
+ }
+
+ // tslint:disable-next-line:prefer-function-over-method
+ public from(value: string): number {
+ if (new BigNumber(value).greaterThan(Number.MAX_SAFE_INTEGER)) {
+ throw new Error(
+ `Attempted to convert PostgreSQL bigint value (${value}) to JavaScript number type but it is too big to safely convert`,
+ );
+ }
+ return Number.parseInt(value, decimalRadix);
+ }
+}
+
+export const numberToBigIntTransformer = new NumberToBigIntTransformer();
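A hedged sketch of how these transformers are typically wired into a TypeORM entity; the entity and column names here are illustrative, not taken from this diff:

```typescript
import { BigNumber } from '@0x/utils';
import { Column, Entity, PrimaryColumn } from 'typeorm';

import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';

@Entity({ name: 'example_events', schema: 'raw' })
export class ExampleEvent {
    // Stored as a Postgres bigint, surfaced as a JS number; the transformer
    // throws if the stored value cannot be represented safely.
    @PrimaryColumn({ name: 'block_number', type: 'bigint', transformer: numberToBigIntTransformer })
    public blockNumber!: number;

    // Stored as a Postgres numeric, surfaced as a BigNumber, so 256-bit
    // token amounts never pass through a lossy JS number.
    @Column({ name: 'amount', type: 'numeric', transformer: bigNumberTransformer })
    public amount!: BigNumber;
}
```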