aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src/data_sources
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline/src/data_sources')
-rw-r--r--packages/pipeline/src/data_sources/bloxy/index.ts133
-rw-r--r--packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts45
-rw-r--r--packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts59
-rw-r--r--packages/pipeline/src/data_sources/contract-wrappers/utils.ts67
-rw-r--r--packages/pipeline/src/data_sources/copper/index.ts126
-rw-r--r--packages/pipeline/src/data_sources/ddex/index.ts78
-rw-r--r--packages/pipeline/src/data_sources/idex/index.ts82
-rw-r--r--packages/pipeline/src/data_sources/oasis/index.ts103
-rw-r--r--packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts110
-rw-r--r--packages/pipeline/src/data_sources/paradex/index.ts92
-rw-r--r--packages/pipeline/src/data_sources/relayer-registry/index.ts33
-rw-r--r--packages/pipeline/src/data_sources/trusted_tokens/index.ts29
-rw-r--r--packages/pipeline/src/data_sources/web3/index.ts22
13 files changed, 979 insertions, 0 deletions
diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts
new file mode 100644
index 000000000..94468d25a
--- /dev/null
+++ b/packages/pipeline/src/data_sources/bloxy/index.ts
@@ -0,0 +1,133 @@
+import axios from 'axios';
+import * as R from 'ramda';
+
+// URL to use for getting dex trades from Bloxy.
+export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades';
+// Number of trades to get at once. Must be less than or equal to MAX_OFFSET.
+const TRADES_PER_QUERY = 10000;
+// Maximum offset supported by the Bloxy API.
+const MAX_OFFSET = 100000;
+// Buffer to subtract from offset. This means we will request some trades twice
+// but we have less chance on missing out on any data.
+const OFFSET_BUFFER = 1000;
+// Maximum number of days supported by the Bloxy API.
+const MAX_DAYS = 30;
+// Buffer used for comparing the last seen timestamp to the last returned
+// timestamp. Increasing this reduces chances of data loss but also creates more
+// redundancy and can impact performance.
+// tslint:disable-next-line:custom-no-magic-numbers
+const LAST_SEEN_TIMESTAMP_BUFFER_MS = 1000 * 60 * 30; // 30 minutes
+
+// tslint:disable-next-line:custom-no-magic-numbers
+const millisecondsPerDay = 1000 * 60 * 60 * 24; // ms/d = ms/s * s/m * m/h * h/d
+
+export interface BloxyTrade {
+ tx_hash: string;
+ tx_time: string;
+ tx_date: string;
+ tx_sender: string;
+ smart_contract_id: number;
+ smart_contract_address: string;
+ contract_type: string;
+ maker: string;
+ taker: string;
+ amountBuy: number;
+ makerFee: number;
+ buyCurrencyId: number;
+ buySymbol: string;
+ amountSell: number;
+ takerFee: number;
+ sellCurrencyId: number;
+ sellSymbol: string;
+ maker_annotation: string;
+ taker_annotation: string;
+ protocol: string;
+ buyAddress: string | null;
+ sellAddress: string | null;
+}
+
+interface BloxyError {
+ error: string;
+}
+
+type BloxyResponse<T> = T | BloxyError;
+type BloxyTradeResponse = BloxyResponse<BloxyTrade[]>;
+
+function isError<T>(response: BloxyResponse<T>): response is BloxyError {
+ return (response as BloxyError).error !== undefined;
+}
+
+export class BloxySource {
+ private readonly _apiKey: string;
+
+ constructor(apiKey: string) {
+ this._apiKey = apiKey;
+ }
+
+ /**
+ * Gets all latest trades between the lastSeenTimestamp (minus some buffer)
+ * and the current time. Note that because the Bloxy API has some hard
+ * limits it might not always be possible to get *all* the trades in the
+ * desired time range.
+ * @param lastSeenTimestamp The latest timestamp for trades that have
+ * already been seen.
+ */
+ public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> {
+ let allTrades: BloxyTrade[] = [];
+
+ // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive)
+ const numberOfDays = R.clamp(1, MAX_DAYS, getDaysSinceTimestamp(lastSeenTimestamp));
+
+ // Keep getting trades until we hit one of the following conditions:
+ //
+ // 1. Offset hits MAX_OFFSET (we can't go back any further).
+ // 2. There are no more trades in the response.
+ // 3. We see a tx_time equal to or earlier than lastSeenTimestamp (plus
+ // some buffer).
+ //
+ for (let offset = 0; offset <= MAX_OFFSET; offset += TRADES_PER_QUERY - OFFSET_BUFFER) {
+ const trades = await this._getTradesWithOffsetAsync(numberOfDays, offset);
+ if (trades.length === 0) {
+ // There are no more trades left for the days we are querying.
+ // This means we are done.
+ return filterDuplicateTrades(allTrades);
+ }
+ const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades));
+ allTrades = allTrades.concat(sortedTrades);
+
+ // Check if lastReturnedTimestamp < lastSeenTimestamp
+ const lastReturnedTimestamp = new Date(sortedTrades[0].tx_time).getTime();
+ if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) {
+ // We are at the point where we have already seen trades for the
+ // timestamp range that is being returned. We're done.
+ return filterDuplicateTrades(allTrades);
+ }
+ }
+ return filterDuplicateTrades(allTrades);
+ }
+
+ private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> {
+ const resp = await axios.get<BloxyTradeResponse>(BLOXY_DEX_TRADES_URL, {
+ params: {
+ key: this._apiKey,
+ days: numberOfDays,
+ limit: TRADES_PER_QUERY,
+ offset,
+ },
+ });
+ if (isError(resp.data)) {
+ throw new Error(`Error in Bloxy API response: ${resp.data.error}`);
+ }
+ return resp.data;
+ }
+}
+
+// Computes the number of days between the given timestamp and the current
+// timestamp (rounded up).
+function getDaysSinceTimestamp(timestamp: number): number {
+ const msSinceTimestamp = Date.now() - timestamp;
+ const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay;
+ return Math.ceil(daysSinceTimestamp);
+}
+
+const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash);
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts
new file mode 100644
index 000000000..e0098122f
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts
@@ -0,0 +1,45 @@
+import {
+ ContractWrappers,
+ ERC20TokenApprovalEventArgs,
+ ERC20TokenEvents,
+ ERC20TokenWrapper,
+} from '@0x/contract-wrappers';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { LogWithDecodedArgs } from 'ethereum-types';
+
+import { GetEventsFunc, getEventsWithPaginationAsync } from './utils';
+
+export class ERC20EventsSource {
+ private readonly _erc20Wrapper: ERC20TokenWrapper;
+ private readonly _tokenAddress: string;
+ constructor(provider: Web3ProviderEngine, networkId: number, tokenAddress: string) {
+ const contractWrappers = new ContractWrappers(provider, { networkId });
+ this._erc20Wrapper = contractWrappers.erc20Token;
+ this._tokenAddress = tokenAddress;
+ }
+
+ public async getApprovalEventsAsync(
+ startBlock: number,
+ endBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> {
+ return getEventsWithPaginationAsync(
+ this._getApprovalEventsForRangeAsync.bind(this) as GetEventsFunc<ERC20TokenApprovalEventArgs>,
+ startBlock,
+ endBlock,
+ );
+ }
+
+ // Gets all approval events of for a specific sub-range. This getter
+ // function will be called during each step of pagination.
+ private async _getApprovalEventsForRangeAsync(
+ fromBlock: number,
+ toBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> {
+ return this._erc20Wrapper.getLogsAsync<ERC20TokenApprovalEventArgs>(
+ this._tokenAddress,
+ ERC20TokenEvents.Approval,
+ { fromBlock, toBlock },
+ {},
+ );
+ }
+}
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
new file mode 100644
index 000000000..58691e2ab
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
@@ -0,0 +1,59 @@
+import {
+ ContractWrappers,
+ ExchangeCancelEventArgs,
+ ExchangeCancelUpToEventArgs,
+ ExchangeEventArgs,
+ ExchangeEvents,
+ ExchangeFillEventArgs,
+ ExchangeWrapper,
+} from '@0x/contract-wrappers';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { LogWithDecodedArgs } from 'ethereum-types';
+
+import { GetEventsFunc, getEventsWithPaginationAsync } from './utils';
+
+export class ExchangeEventsSource {
+ private readonly _exchangeWrapper: ExchangeWrapper;
+ constructor(provider: Web3ProviderEngine, networkId: number) {
+ const contractWrappers = new ContractWrappers(provider, { networkId });
+ this._exchangeWrapper = contractWrappers.exchange;
+ }
+
+ public async getFillEventsAsync(
+ startBlock: number,
+ endBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> {
+ const getFillEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeFillEventArgs>(ExchangeEvents.Fill);
+ return getEventsWithPaginationAsync(getFillEventsForRangeAsync, startBlock, endBlock);
+ }
+
+ public async getCancelEventsAsync(
+ startBlock: number,
+ endBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> {
+ const getCancelEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelEventArgs>(
+ ExchangeEvents.Cancel,
+ );
+ return getEventsWithPaginationAsync(getCancelEventsForRangeAsync, startBlock, endBlock);
+ }
+
+ public async getCancelUpToEventsAsync(
+ startBlock: number,
+ endBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> {
+ const getCancelUpToEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelUpToEventArgs>(
+ ExchangeEvents.CancelUpTo,
+ );
+ return getEventsWithPaginationAsync(getCancelUpToEventsForRangeAsync, startBlock, endBlock);
+ }
+
+ // Returns a getter function which gets all events of a specific type for a
+ // specific sub-range. This getter function will be called during each step
+ // of pagination.
+ private _makeGetterFuncForEventType<ArgsType extends ExchangeEventArgs>(
+ eventType: ExchangeEvents,
+ ): GetEventsFunc<ArgsType> {
+ return async (fromBlock: number, toBlock: number) =>
+ this._exchangeWrapper.getLogsAsync<ArgsType>(eventType, { fromBlock, toBlock }, {});
+ }
+}
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/utils.ts b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts
new file mode 100644
index 000000000..67660a37e
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts
@@ -0,0 +1,67 @@
+import { DecodedLogArgs, LogWithDecodedArgs } from 'ethereum-types';
+
+const NUM_BLOCKS_PER_QUERY = 10000; // Number of blocks to query for events at a time.
+const NUM_RETRIES = 3; // Number of retries if a request fails or times out.
+
+export type GetEventsFunc<ArgsType extends DecodedLogArgs> = (
+ fromBlock: number,
+ toBlock: number,
+) => Promise<Array<LogWithDecodedArgs<ArgsType>>>;
+
+/**
+ * Gets all events between the given startBlock and endBlock by querying for
+ * NUM_BLOCKS_PER_QUERY at a time. Accepts a getter function in order to
+ * maximize code re-use and allow for getting different types of events for
+ * different contracts. If the getter function throws with a retryable error,
+ * it will automatically be retried up to NUM_RETRIES times.
+ * @param getEventsAsync A getter function which will be called for each step during pagination.
+ * @param startBlock The start of the entire block range to get events for.
+ * @param endBlock The end of the entire block range to get events for.
+ */
+export async function getEventsWithPaginationAsync<ArgsType extends DecodedLogArgs>(
+ getEventsAsync: GetEventsFunc<ArgsType>,
+ startBlock: number,
+ endBlock: number,
+): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
+ let events: Array<LogWithDecodedArgs<ArgsType>> = [];
+ for (let fromBlock = startBlock; fromBlock <= endBlock; fromBlock += NUM_BLOCKS_PER_QUERY) {
+ const toBlock = Math.min(fromBlock + NUM_BLOCKS_PER_QUERY - 1, endBlock);
+ const eventsInRange = await _getEventsWithRetriesAsync(getEventsAsync, NUM_RETRIES, fromBlock, toBlock);
+ events = events.concat(eventsInRange);
+ }
+ return events;
+}
+
+/**
+ * Calls the getEventsAsync function and retries up to numRetries times if it
+ * throws with an error that is considered retryable.
+ * @param getEventsAsync a function that will be called on each iteration.
+ * @param numRetries the maximum number times to retry getEventsAsync if it fails with a retryable error.
+ * @param fromBlock the start of the sub-range of blocks we are getting events for.
+ * @param toBlock the end of the sub-range of blocks we are getting events for.
+ */
+export async function _getEventsWithRetriesAsync<ArgsType extends DecodedLogArgs>(
+ getEventsAsync: GetEventsFunc<ArgsType>,
+ numRetries: number,
+ fromBlock: number,
+ toBlock: number,
+): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
+ let eventsInRange: Array<LogWithDecodedArgs<ArgsType>> = [];
+ for (let i = 0; i <= numRetries; i++) {
+ try {
+ eventsInRange = await getEventsAsync(fromBlock, toBlock);
+ } catch (err) {
+ if (isErrorRetryable(err) && i < numRetries) {
+ continue;
+ } else {
+ throw err;
+ }
+ }
+ break;
+ }
+ return eventsInRange;
+}
+
+function isErrorRetryable(err: Error): boolean {
+ return err.message.includes('network timeout');
+}
diff --git a/packages/pipeline/src/data_sources/copper/index.ts b/packages/pipeline/src/data_sources/copper/index.ts
new file mode 100644
index 000000000..15df2fd7d
--- /dev/null
+++ b/packages/pipeline/src/data_sources/copper/index.ts
@@ -0,0 +1,126 @@
+import { fetchAsync } from '@0x/utils';
+import Bottleneck from 'bottleneck';
+
+import {
+ CopperActivityTypeCategory,
+ CopperActivityTypeResponse,
+ CopperCustomFieldResponse,
+ CopperSearchResponse,
+} from '../../parsers/copper';
+
+const HTTP_OK_STATUS = 200;
+const COPPER_URI = 'https://api.prosperworks.com/developer_api/v1';
+
+const DEFAULT_PAGINATION_PARAMS = {
+ page_size: 200,
+ sort_by: 'date_modified',
+ sort_direction: 'desc',
+};
+
+export type CopperSearchParams = CopperLeadSearchParams | CopperActivitySearchParams | CopperOpportunitySearchParams;
+export interface CopperLeadSearchParams {
+ page_number?: number;
+}
+
+export interface CopperActivitySearchParams {
+ minimum_activity_date: number;
+ page_number?: number;
+}
+
+export interface CopperOpportunitySearchParams {
+ sort_by: string; // must override the default 'date_modified' for this endpoint
+ page_number?: number;
+}
+export enum CopperEndpoint {
+ Leads = '/leads/search',
+ Opportunities = '/opportunities/search',
+ Activities = '/activities/search',
+}
+const ONE_SECOND = 1000;
+
+function httpErrorCheck(response: Response): void {
+ if (response.status !== HTTP_OK_STATUS) {
+ throw new Error(`HTTP error while scraping Copper: [${JSON.stringify(response)}]`);
+ }
+}
+export class CopperSource {
+ private readonly _accessToken: string;
+ private readonly _userEmail: string;
+ private readonly _defaultHeaders: any;
+ private readonly _limiter: Bottleneck;
+
+ constructor(maxConcurrentRequests: number, accessToken: string, userEmail: string) {
+ this._accessToken = accessToken;
+ this._userEmail = userEmail;
+ this._defaultHeaders = {
+ 'Content-Type': 'application/json',
+ 'X-PW-AccessToken': this._accessToken,
+ 'X-PW-Application': 'developer_api',
+ 'X-PW-UserEmail': this._userEmail,
+ };
+ this._limiter = new Bottleneck({
+ minTime: ONE_SECOND / maxConcurrentRequests,
+ reservoir: 30,
+ reservoirRefreshAmount: 30,
+ reservoirRefreshInterval: maxConcurrentRequests,
+ });
+ }
+
+ public async fetchNumberOfPagesAsync(endpoint: CopperEndpoint, searchParams?: CopperSearchParams): Promise<number> {
+ const resp = await this._limiter.schedule(() =>
+ fetchAsync(COPPER_URI + endpoint, {
+ method: 'POST',
+ body: JSON.stringify({ ...DEFAULT_PAGINATION_PARAMS, ...searchParams }),
+ headers: this._defaultHeaders,
+ }),
+ );
+
+ httpErrorCheck(resp);
+
+ // total number of records that match the request parameters
+ if (resp.headers.has('X-Pw-Total')) {
+ const totalRecords: number = parseInt(resp.headers.get('X-Pw-Total') as string, 10); // tslint:disable-line:custom-no-magic-numbers
+ return Math.ceil(totalRecords / DEFAULT_PAGINATION_PARAMS.page_size);
+ } else {
+ return 1;
+ }
+ }
+ public async fetchSearchResultsAsync<T extends CopperSearchResponse>(
+ endpoint: CopperEndpoint,
+ searchParams?: CopperSearchParams,
+ ): Promise<T[]> {
+ const request = { ...DEFAULT_PAGINATION_PARAMS, ...searchParams };
+ const response = await this._limiter.schedule(() =>
+ fetchAsync(COPPER_URI + endpoint, {
+ method: 'POST',
+ body: JSON.stringify(request),
+ headers: this._defaultHeaders,
+ }),
+ );
+ httpErrorCheck(response);
+ const json: T[] = await response.json();
+ return json;
+ }
+
+ public async fetchActivityTypesAsync(): Promise<Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>> {
+ const response = await this._limiter.schedule(() =>
+ fetchAsync(`${COPPER_URI}/activity_types`, {
+ method: 'GET',
+ headers: this._defaultHeaders,
+ }),
+ );
+ httpErrorCheck(response);
+ return response.json();
+ }
+
+ public async fetchCustomFieldsAsync(): Promise<CopperCustomFieldResponse[]> {
+ const response = await this._limiter.schedule(() =>
+ fetchAsync(`${COPPER_URI}/custom_field_definitions`, {
+ method: 'GET',
+ headers: this._defaultHeaders,
+ }),
+ );
+ httpErrorCheck(response);
+ return response.json();
+ }
+}
diff --git a/packages/pipeline/src/data_sources/ddex/index.ts b/packages/pipeline/src/data_sources/ddex/index.ts
new file mode 100644
index 000000000..2bbd8c29b
--- /dev/null
+++ b/packages/pipeline/src/data_sources/ddex/index.ts
@@ -0,0 +1,78 @@
+import { fetchAsync, logUtils } from '@0x/utils';
+
+const DDEX_BASE_URL = 'https://api.ddex.io/v2';
+const ACTIVE_MARKETS_URL = `${DDEX_BASE_URL}/markets`;
+const NO_AGGREGATION_LEVEL = 3; // See https://docs.ddex.io/#get-orderbook
+const ORDERBOOK_ENDPOINT = `/orderbook?level=${NO_AGGREGATION_LEVEL}`;
+export const DDEX_SOURCE = 'ddex';
+
+export interface DdexActiveMarketsResponse {
+ status: number;
+ desc: string;
+ data: {
+ markets: DdexMarket[];
+ };
+}
+
+export interface DdexMarket {
+ id: string;
+ quoteToken: string;
+ quoteTokenDecimals: number;
+ quoteTokenAddress: string;
+ baseToken: string;
+ baseTokenDecimals: number;
+ baseTokenAddress: string;
+ minOrderSize: string;
+ maxOrderSize: string;
+ pricePrecision: number;
+ priceDecimals: number;
+ amountDecimals: number;
+}
+
+export interface DdexOrderbookResponse {
+ status: number;
+ desc: string;
+ data: {
+ orderBook: DdexOrderbook;
+ };
+}
+
+export interface DdexOrderbook {
+ marketId: string;
+ bids: DdexOrder[];
+ asks: DdexOrder[];
+}
+
+export interface DdexOrder {
+ price: string;
+ amount: string;
+ orderId: string;
+}
+
+// tslint:disable:prefer-function-over-method
+// ^ Keep consistency with other sources and help logical organization
+export class DdexSource {
+ /**
+ * Call Ddex API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getActiveMarketsAsync(): Promise<DdexMarket[]> {
+ logUtils.log('Getting all active DDEX markets');
+ const resp = await fetchAsync(ACTIVE_MARKETS_URL);
+ const respJson: DdexActiveMarketsResponse = await resp.json();
+ const markets = respJson.data.markets;
+ logUtils.log(`Got ${markets.length} markets.`);
+ return markets;
+ }
+
+ /**
+ * Retrieve orderbook from Ddex API for a given market.
+ * @param marketId String identifying the market we want data for. Eg. 'REP/AUG'
+ */
+ public async getMarketOrderbookAsync(marketId: string): Promise<DdexOrderbook> {
+ logUtils.log(`${marketId}: Retrieving orderbook.`);
+ const marketOrderbookUrl = `${ACTIVE_MARKETS_URL}/${marketId}${ORDERBOOK_ENDPOINT}`;
+ const resp = await fetchAsync(marketOrderbookUrl);
+ const respJson: DdexOrderbookResponse = await resp.json();
+ return respJson.data.orderBook;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/idex/index.ts b/packages/pipeline/src/data_sources/idex/index.ts
new file mode 100644
index 000000000..c1e53c08d
--- /dev/null
+++ b/packages/pipeline/src/data_sources/idex/index.ts
@@ -0,0 +1,82 @@
+import { fetchAsync } from '@0x/utils';
+
+const IDEX_BASE_URL = 'https://api.idex.market';
+const MARKETS_URL = `${IDEX_BASE_URL}/returnTicker`;
+const ORDERBOOK_URL = `${IDEX_BASE_URL}/returnOrderBook`;
+const MAX_ORDER_COUNT = 100; // Maximum based on https://github.com/AuroraDAO/idex-api-docs#returnorderbook
+export const IDEX_SOURCE = 'idex';
+
+export interface IdexMarketsResponse {
+ [marketName: string]: IdexMarket;
+}
+
+export interface IdexMarket {
+ last: string;
+ high: string;
+ low: string;
+ lowestAsk: string;
+ highestBid: string;
+ percentChange: string;
+ baseVolume: string;
+ quoteVolume: string;
+}
+
+export interface IdexOrderbook {
+ asks: IdexOrder[];
+ bids: IdexOrder[];
+}
+
+export interface IdexOrder {
+ price: string;
+ amount: string;
+ total: string;
+ orderHash: string;
+ params: IdexOrderParam;
+}
+
+export interface IdexOrderParam {
+ tokenBuy: string;
+ buySymbol: string;
+ buyPrecision: number;
+ amountBuy: string;
+ tokenSell: string;
+ sellSymbol: string;
+ sellPrecision: number;
+ amountSell: string;
+ expires: number;
+ nonce: number;
+ user: string;
+}
+
+// tslint:disable:prefer-function-over-method
+// ^ Keep consistency with other sources and help logical organization
+export class IdexSource {
+ /**
+ * Call Idex API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getMarketsAsync(): Promise<string[]> {
+ const params = { method: 'POST' };
+ const resp = await fetchAsync(MARKETS_URL, params);
+ const respJson: IdexMarketsResponse = await resp.json();
+ const markets: string[] = Object.keys(respJson);
+ return markets;
+ }
+
+ /**
+ * Retrieve orderbook from Idex API for a given market.
+ * @param marketId String identifying the market we want data for. Eg. 'REP_AUG'
+ */
+ public async getMarketOrderbookAsync(marketId: string): Promise<IdexOrderbook> {
+ const params = {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({
+ market: marketId,
+ count: MAX_ORDER_COUNT,
+ }),
+ };
+ const resp = await fetchAsync(ORDERBOOK_URL, params);
+ const respJson: IdexOrderbook = await resp.json();
+ return respJson;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/oasis/index.ts b/packages/pipeline/src/data_sources/oasis/index.ts
new file mode 100644
index 000000000..3b30e9dfd
--- /dev/null
+++ b/packages/pipeline/src/data_sources/oasis/index.ts
@@ -0,0 +1,103 @@
+import { fetchAsync } from '@0x/utils';
+
+const OASIS_BASE_URL = 'https://data.makerdao.com/v1';
+const OASIS_MARKET_QUERY = `query {
+ oasisMarkets(period: "1 week") {
+ nodes {
+ id
+ base
+ quote
+ buyVol
+ sellVol
+ price
+ high
+ low
+ }
+ }
+}`;
+const OASIS_ORDERBOOK_QUERY = `query ($market: String!) {
+ allOasisOrders(condition: { market: $market }) {
+ totalCount
+ nodes {
+ market
+ offerId
+ price
+ amount
+ act
+ }
+ }
+}`;
+export const OASIS_SOURCE = 'oasis';
+
+export interface OasisMarket {
+ id: string; // market symbol e.g MKRDAI
+ base: string; // base symbol e.g MKR
+ quote: string; // quote symbol e.g DAI
+ buyVol: number; // total buy volume (base)
+ sellVol: number; // total sell volume (base)
+ price: number; // volume weighted price (quote)
+ high: number; // max sell price
+ low: number; // min buy price
+}
+
+export interface OasisMarketResponse {
+ data: {
+ oasisMarkets: {
+ nodes: OasisMarket[];
+ };
+ };
+}
+
+export interface OasisOrder {
+ offerId: number; // Offer Id
+ market: string; // Market symbol (base/quote)
+ price: string; // Offer price (quote)
+ amount: string; // Offer amount (base)
+ act: string; // Action (ask|bid)
+}
+
+export interface OasisOrderbookResponse {
+ data: {
+ allOasisOrders: {
+ totalCount: number;
+ nodes: OasisOrder[];
+ };
+ };
+}
+
+// tslint:disable:prefer-function-over-method
+// ^ Keep consistency with other sources and help logical organization
+export class OasisSource {
+ /**
+ * Call Ddex API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getActiveMarketsAsync(): Promise<OasisMarket[]> {
+ const params = {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ query: OASIS_MARKET_QUERY }),
+ };
+ const resp = await fetchAsync(OASIS_BASE_URL, params);
+ const respJson: OasisMarketResponse = await resp.json();
+ const markets = respJson.data.oasisMarkets.nodes;
+ return markets;
+ }
+
+ /**
+ * Retrieve orderbook from Oasis API for a given market.
+ * @param marketId String identifying the market we want data for. Eg. 'REPAUG'.
+ */
+ public async getMarketOrderbookAsync(marketId: string): Promise<OasisOrder[]> {
+ const input = {
+ market: marketId,
+ };
+ const params = {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ query: OASIS_ORDERBOOK_QUERY, variables: input }),
+ };
+ const resp = await fetchAsync(OASIS_BASE_URL, params);
+ const respJson: OasisOrderbookResponse = await resp.json();
+ return respJson.data.allOasisOrders.nodes;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
new file mode 100644
index 000000000..85042501b
--- /dev/null
+++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
@@ -0,0 +1,110 @@
+// tslint:disable:no-duplicate-imports
+import { fetchAsync } from '@0x/utils';
+import Bottleneck from 'bottleneck';
+import { stringify } from 'querystring';
+import * as R from 'ramda';
+
+import { TradingPair } from '../../utils/get_ohlcv_trading_pairs';
+
+export interface CryptoCompareOHLCVResponse {
+ Data: CryptoCompareOHLCVRecord[];
+ Response: string;
+ Message: string;
+ Type: number;
+}
+
+export interface CryptoCompareOHLCVRecord {
+ time: number; // in seconds, not milliseconds
+ close: number;
+ high: number;
+ low: number;
+ open: number;
+ volumefrom: number;
+ volumeto: number;
+}
+
+export interface CryptoCompareOHLCVParams {
+ fsym: string;
+ tsym: string;
+ e?: string;
+ aggregate?: string;
+ aggregatePredictableTimePeriods?: boolean;
+ limit?: number;
+ toTs?: number;
+}
+
+const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_SECOND = 1000;
+const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR;
+const HTTP_OK_STATUS = 200;
+const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96;
+const MAX_PAGE_SIZE = 2000;
+
+export class CryptoCompareOHLCVSource {
+ public readonly intervalBetweenRecords = ONE_HOUR;
+ public readonly defaultExchange = 'CCCAGG';
+ public readonly interval = this.intervalBetweenRecords * MAX_PAGE_SIZE; // the hourly API returns data for one interval at a time
+ private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?';
+
+ // rate-limit for all API calls through this class instance
+ private readonly _limiter: Bottleneck;
+ constructor(maxReqsPerSecond: number) {
+ this._limiter = new Bottleneck({
+ minTime: ONE_SECOND / maxReqsPerSecond,
+ reservoir: 30,
+ reservoirRefreshAmount: 30,
+ reservoirRefreshInterval: ONE_SECOND,
+ });
+ }
+
+ // gets OHLCV records starting from pair.latest
+ public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> {
+ const params = {
+ e: this.defaultExchange,
+ fsym: pair.fromSymbol,
+ tsym: pair.toSymbol,
+ limit: MAX_PAGE_SIZE,
+ toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms
+ };
+ const url = this._url + stringify(params);
+ const response = await this._limiter.schedule(() => fetchAsync(url));
+ if (response.status !== HTTP_OK_STATUS) {
+ throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`);
+ }
+ const json: CryptoCompareOHLCVResponse = await response.json();
+ if (
+ (json.Response === 'Error' || json.Data.length === 0) &&
+ json.Type !== CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE
+ ) {
+ throw new Error(JSON.stringify(json));
+ }
+ return json.Data.filter(rec => {
+ return (
+ // Crypto Compare takes ~30 mins to finalise records
+ rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime && hasData(rec)
+ );
+ });
+ }
+ public generateBackfillIntervals(pair: TradingPair): TradingPair[] {
+ const now = new Date().getTime();
+ const f = (p: TradingPair): false | [TradingPair, TradingPair] => {
+ if (p.latestSavedTime > now) {
+ return false;
+ } else {
+ return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })];
+ }
+ };
+ return R.unfold(f, pair);
+ }
+}
+
+function hasData(record: CryptoCompareOHLCVRecord): boolean {
+ return (
+ record.close !== 0 ||
+ record.open !== 0 ||
+ record.high !== 0 ||
+ record.low !== 0 ||
+ record.volumefrom !== 0 ||
+ record.volumeto !== 0
+ );
+}
diff --git a/packages/pipeline/src/data_sources/paradex/index.ts b/packages/pipeline/src/data_sources/paradex/index.ts
new file mode 100644
index 000000000..46d448f4b
--- /dev/null
+++ b/packages/pipeline/src/data_sources/paradex/index.ts
@@ -0,0 +1,92 @@
+import { fetchAsync, logUtils } from '@0x/utils';
+
+const PARADEX_BASE_URL = 'https://api.paradex.io/consumer/v0';
+const ACTIVE_MARKETS_URL = `${PARADEX_BASE_URL}/markets`;
+const ORDERBOOK_ENDPOINT = `${PARADEX_BASE_URL}/orderbook`;
+const TOKEN_INFO_ENDPOINT = `${PARADEX_BASE_URL}/tokens`;
+export const PARADEX_SOURCE = 'paradex';
+
+export type ParadexActiveMarketsResponse = ParadexMarket[];
+
+export interface ParadexMarket {
+ id: string;
+ symbol: string;
+ baseToken: string;
+ quoteToken: string;
+ minOrderSize: string;
+ maxOrderSize: string;
+ priceMaxDecimals: number;
+ amountMaxDecimals: number;
+ // These are not native to the Paradex API response. We tag them on later
+ // by calling the token endpoint and joining on symbol.
+ baseTokenAddress?: string;
+ quoteTokenAddress?: string;
+}
+
+export interface ParadexOrderbookResponse {
+ marketId: number;
+ marketSymbol: string;
+ bids: ParadexOrder[];
+ asks: ParadexOrder[];
+}
+
+export interface ParadexOrder {
+ amount: string;
+ price: string;
+}
+
+export type ParadexTokenInfoResponse = ParadexTokenInfo[];
+
+export interface ParadexTokenInfo {
+ name: string;
+ symbol: string;
+ address: string;
+}
+
+export class ParadexSource {
+ private readonly _apiKey: string;
+
+ constructor(apiKey: string) {
+ this._apiKey = apiKey;
+ }
+
+ /**
+ * Call Paradex API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getActiveMarketsAsync(): Promise<ParadexActiveMarketsResponse> {
+ logUtils.log('Getting all active Paradex markets.');
+ const resp = await fetchAsync(ACTIVE_MARKETS_URL, {
+ headers: { 'API-KEY': this._apiKey },
+ });
+ const markets: ParadexActiveMarketsResponse = await resp.json();
+ logUtils.log(`Got ${markets.length} markets.`);
+ return markets;
+ }
+
+ /**
+ * Call Paradex API to find out their token information.
+ */
+ public async getTokenInfoAsync(): Promise<ParadexTokenInfoResponse> {
+ logUtils.log('Getting token information from Paradex.');
+ const resp = await fetchAsync(TOKEN_INFO_ENDPOINT, {
+ headers: { 'API-KEY': this._apiKey },
+ });
+ const tokens: ParadexTokenInfoResponse = await resp.json();
+ logUtils.log(`Got information for ${tokens.length} tokens.`);
+ return tokens;
+ }
+
+ /**
+ * Retrieve orderbook from Paradex API for a given market.
+ * @param marketSymbol String representing the market we want data for.
+ */
+ public async getMarketOrderbookAsync(marketSymbol: string): Promise<ParadexOrderbookResponse> {
+ logUtils.log(`${marketSymbol}: Retrieving orderbook.`);
+ const marketOrderbookUrl = `${ORDERBOOK_ENDPOINT}?market=${marketSymbol}`;
+ const resp = await fetchAsync(marketOrderbookUrl, {
+ headers: { 'API-KEY': this._apiKey },
+ });
+ const orderbookResponse: ParadexOrderbookResponse = await resp.json();
+ return orderbookResponse;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/relayer-registry/index.ts b/packages/pipeline/src/data_sources/relayer-registry/index.ts
new file mode 100644
index 000000000..8133f5eae
--- /dev/null
+++ b/packages/pipeline/src/data_sources/relayer-registry/index.ts
@@ -0,0 +1,33 @@
+import axios from 'axios';
+
+export interface RelayerResponse {
+ name: string;
+ homepage_url: string;
+ app_url: string;
+ header_img: string;
+ logo_img: string;
+ networks: RelayerResponseNetwork[];
+}
+
+export interface RelayerResponseNetwork {
+ networkId: number;
+ sra_http_endpoint?: string;
+ sra_ws_endpoint?: string;
+ static_order_fields?: {
+ fee_recipient_addresses?: string[];
+ taker_addresses?: string[];
+ };
+}
+
+export class RelayerRegistrySource {
+ private readonly _url: string;
+
+ constructor(url: string) {
+ this._url = url;
+ }
+
+ public async getRelayerInfoAsync(): Promise<Map<string, RelayerResponse>> {
+ const resp = await axios.get<Map<string, RelayerResponse>>(this._url);
+ return resp.data;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/trusted_tokens/index.ts b/packages/pipeline/src/data_sources/trusted_tokens/index.ts
new file mode 100644
index 000000000..552739fb9
--- /dev/null
+++ b/packages/pipeline/src/data_sources/trusted_tokens/index.ts
@@ -0,0 +1,29 @@
+import axios from 'axios';
+
+export interface ZeroExTrustedTokenMeta {
+ address: string;
+ name: string;
+ symbol: string;
+ decimals: number;
+}
+
+export interface MetamaskTrustedTokenMeta {
+ address: string;
+ name: string;
+ erc20: boolean;
+ symbol: string;
+ decimals: number;
+}
+
+export class TrustedTokenSource<T> {
+ private readonly _url: string;
+
+ constructor(url: string) {
+ this._url = url;
+ }
+
+ public async getTrustedTokenMetaAsync(): Promise<T> {
+ const resp = await axios.get<T>(this._url);
+ return resp.data;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/web3/index.ts b/packages/pipeline/src/data_sources/web3/index.ts
new file mode 100644
index 000000000..45a9ea161
--- /dev/null
+++ b/packages/pipeline/src/data_sources/web3/index.ts
@@ -0,0 +1,22 @@
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { Web3Wrapper } from '@0x/web3-wrapper';
+import { BlockWithoutTransactionData, Transaction } from 'ethereum-types';
+
+export class Web3Source {
+ private readonly _web3Wrapper: Web3Wrapper;
+ constructor(provider: Web3ProviderEngine) {
+ this._web3Wrapper = new Web3Wrapper(provider);
+ }
+
+ public async getBlockInfoAsync(blockNumber: number): Promise<BlockWithoutTransactionData> {
+ const block = await this._web3Wrapper.getBlockIfExistsAsync(blockNumber);
+ if (block == null) {
+ return Promise.reject(new Error(`Could not find block for given block number: ${blockNumber}`));
+ }
+ return block;
+ }
+
+ public async getTransactionInfoAsync(txHash: string): Promise<Transaction> {
+ return this._web3Wrapper.getTransactionByHashAsync(txHash);
+ }
+}