diff options
Diffstat (limited to 'packages/pipeline/src/data_sources')
13 files changed, 979 insertions, 0 deletions
diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts new file mode 100644 index 000000000..94468d25a --- /dev/null +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -0,0 +1,133 @@ +import axios from 'axios'; +import * as R from 'ramda'; + +// URL to use for getting dex trades from Bloxy. +export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades'; +// Number of trades to get at once. Must be less than or equal to MAX_OFFSET. +const TRADES_PER_QUERY = 10000; +// Maximum offset supported by the Bloxy API. +const MAX_OFFSET = 100000; +// Buffer to subtract from offset. This means we will request some trades twice +// but we have less chance on missing out on any data. +const OFFSET_BUFFER = 1000; +// Maximum number of days supported by the Bloxy API. +const MAX_DAYS = 30; +// Buffer used for comparing the last seen timestamp to the last returned +// timestamp. Increasing this reduces chances of data loss but also creates more +// redundancy and can impact performance. +// tslint:disable-next-line:custom-no-magic-numbers +const LAST_SEEN_TIMESTAMP_BUFFER_MS = 1000 * 60 * 30; // 30 minutes + +// tslint:disable-next-line:custom-no-magic-numbers +const millisecondsPerDay = 1000 * 60 * 60 * 24; // ms/d = ms/s * s/m * m/h * h/d + +export interface BloxyTrade { + tx_hash: string; + tx_time: string; + tx_date: string; + tx_sender: string; + smart_contract_id: number; + smart_contract_address: string; + contract_type: string; + maker: string; + taker: string; + amountBuy: number; + makerFee: number; + buyCurrencyId: number; + buySymbol: string; + amountSell: number; + takerFee: number; + sellCurrencyId: number; + sellSymbol: string; + maker_annotation: string; + taker_annotation: string; + protocol: string; + buyAddress: string | null; + sellAddress: string | null; +} + +interface BloxyError { + error: string; +} + +type BloxyResponse<T> = T | BloxyError; +type BloxyTradeResponse = BloxyResponse<BloxyTrade[]>; + +function isError<T>(response: BloxyResponse<T>): response is BloxyError { + return (response as BloxyError).error !== undefined; +} + +export class BloxySource { + private readonly _apiKey: string; + + constructor(apiKey: string) { + this._apiKey = apiKey; + } + + /** + * Gets all latest trades between the lastSeenTimestamp (minus some buffer) + * and the current time. Note that because the Bloxy API has some hard + * limits it might not always be possible to get *all* the trades in the + * desired time range. + * @param lastSeenTimestamp The latest timestamp for trades that have + * already been seen. + */ + public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> { + let allTrades: BloxyTrade[] = []; + + // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive) + const numberOfDays = R.clamp(1, MAX_DAYS, getDaysSinceTimestamp(lastSeenTimestamp)); + + // Keep getting trades until we hit one of the following conditions: + // + // 1. Offset hits MAX_OFFSET (we can't go back any further). + // 2. There are no more trades in the response. + // 3. We see a tx_time equal to or earlier than lastSeenTimestamp (plus + // some buffer). + // + for (let offset = 0; offset <= MAX_OFFSET; offset += TRADES_PER_QUERY - OFFSET_BUFFER) { + const trades = await this._getTradesWithOffsetAsync(numberOfDays, offset); + if (trades.length === 0) { + // There are no more trades left for the days we are querying. + // This means we are done. + return filterDuplicateTrades(allTrades); + } + const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades)); + allTrades = allTrades.concat(sortedTrades); + + // Check if lastReturnedTimestamp < lastSeenTimestamp + const lastReturnedTimestamp = new Date(sortedTrades[0].tx_time).getTime(); + if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) { + // We are at the point where we have already seen trades for the + // timestamp range that is being returned. We're done. + return filterDuplicateTrades(allTrades); + } + } + return filterDuplicateTrades(allTrades); + } + + private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> { + const resp = await axios.get<BloxyTradeResponse>(BLOXY_DEX_TRADES_URL, { + params: { + key: this._apiKey, + days: numberOfDays, + limit: TRADES_PER_QUERY, + offset, + }, + }); + if (isError(resp.data)) { + throw new Error(`Error in Bloxy API response: ${resp.data.error}`); + } + return resp.data; + } +} + +// Computes the number of days between the given timestamp and the current +// timestamp (rounded up). +function getDaysSinceTimestamp(timestamp: number): number { + const msSinceTimestamp = Date.now() - timestamp; + const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay; + return Math.ceil(daysSinceTimestamp); +} + +const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash); diff --git a/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts new file mode 100644 index 000000000..e0098122f --- /dev/null +++ b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts @@ -0,0 +1,45 @@ +import { + ContractWrappers, + ERC20TokenApprovalEventArgs, + ERC20TokenEvents, + ERC20TokenWrapper, +} from '@0x/contract-wrappers'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { LogWithDecodedArgs } from 'ethereum-types'; + +import { GetEventsFunc, getEventsWithPaginationAsync } from './utils'; + +export class ERC20EventsSource { + private readonly _erc20Wrapper: ERC20TokenWrapper; + private readonly _tokenAddress: string; + constructor(provider: Web3ProviderEngine, networkId: number, tokenAddress: string) { + const contractWrappers = new ContractWrappers(provider, { networkId }); + this._erc20Wrapper = contractWrappers.erc20Token; + this._tokenAddress = tokenAddress; + } + + public async getApprovalEventsAsync( + startBlock: number, + endBlock: number, + ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> { + return getEventsWithPaginationAsync( + this._getApprovalEventsForRangeAsync.bind(this) as GetEventsFunc<ERC20TokenApprovalEventArgs>, + startBlock, + endBlock, + ); + } + + // Gets all approval events of for a specific sub-range. This getter + // function will be called during each step of pagination. + private async _getApprovalEventsForRangeAsync( + fromBlock: number, + toBlock: number, + ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> { + return this._erc20Wrapper.getLogsAsync<ERC20TokenApprovalEventArgs>( + this._tokenAddress, + ERC20TokenEvents.Approval, + { fromBlock, toBlock }, + {}, + ); + } +} diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts new file mode 100644 index 000000000..58691e2ab --- /dev/null +++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts @@ -0,0 +1,59 @@ +import { + ContractWrappers, + ExchangeCancelEventArgs, + ExchangeCancelUpToEventArgs, + ExchangeEventArgs, + ExchangeEvents, + ExchangeFillEventArgs, + ExchangeWrapper, +} from '@0x/contract-wrappers'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { LogWithDecodedArgs } from 'ethereum-types'; + +import { GetEventsFunc, getEventsWithPaginationAsync } from './utils'; + +export class ExchangeEventsSource { + private readonly _exchangeWrapper: ExchangeWrapper; + constructor(provider: Web3ProviderEngine, networkId: number) { + const contractWrappers = new ContractWrappers(provider, { networkId }); + this._exchangeWrapper = contractWrappers.exchange; + } + + public async getFillEventsAsync( + startBlock: number, + endBlock: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { + const getFillEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeFillEventArgs>(ExchangeEvents.Fill); + return getEventsWithPaginationAsync(getFillEventsForRangeAsync, startBlock, endBlock); + } + + public async getCancelEventsAsync( + startBlock: number, + endBlock: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> { + const getCancelEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelEventArgs>( + ExchangeEvents.Cancel, + ); + return getEventsWithPaginationAsync(getCancelEventsForRangeAsync, startBlock, endBlock); + } + + public async getCancelUpToEventsAsync( + startBlock: number, + endBlock: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> { + const getCancelUpToEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelUpToEventArgs>( + ExchangeEvents.CancelUpTo, + ); + return getEventsWithPaginationAsync(getCancelUpToEventsForRangeAsync, startBlock, endBlock); + } + + // Returns a getter function which gets all events of a specific type for a + // specific sub-range. This getter function will be called during each step + // of pagination. + private _makeGetterFuncForEventType<ArgsType extends ExchangeEventArgs>( + eventType: ExchangeEvents, + ): GetEventsFunc<ArgsType> { + return async (fromBlock: number, toBlock: number) => + this._exchangeWrapper.getLogsAsync<ArgsType>(eventType, { fromBlock, toBlock }, {}); + } +} diff --git a/packages/pipeline/src/data_sources/contract-wrappers/utils.ts b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts new file mode 100644 index 000000000..67660a37e --- /dev/null +++ b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts @@ -0,0 +1,67 @@ +import { DecodedLogArgs, LogWithDecodedArgs } from 'ethereum-types'; + +const NUM_BLOCKS_PER_QUERY = 10000; // Number of blocks to query for events at a time. +const NUM_RETRIES = 3; // Number of retries if a request fails or times out. + +export type GetEventsFunc<ArgsType extends DecodedLogArgs> = ( + fromBlock: number, + toBlock: number, +) => Promise<Array<LogWithDecodedArgs<ArgsType>>>; + +/** + * Gets all events between the given startBlock and endBlock by querying for + * NUM_BLOCKS_PER_QUERY at a time. Accepts a getter function in order to + * maximize code re-use and allow for getting different types of events for + * different contracts. If the getter function throws with a retryable error, + * it will automatically be retried up to NUM_RETRIES times. + * @param getEventsAsync A getter function which will be called for each step during pagination. + * @param startBlock The start of the entire block range to get events for. + * @param endBlock The end of the entire block range to get events for. + */ +export async function getEventsWithPaginationAsync<ArgsType extends DecodedLogArgs>( + getEventsAsync: GetEventsFunc<ArgsType>, + startBlock: number, + endBlock: number, +): Promise<Array<LogWithDecodedArgs<ArgsType>>> { + let events: Array<LogWithDecodedArgs<ArgsType>> = []; + for (let fromBlock = startBlock; fromBlock <= endBlock; fromBlock += NUM_BLOCKS_PER_QUERY) { + const toBlock = Math.min(fromBlock + NUM_BLOCKS_PER_QUERY - 1, endBlock); + const eventsInRange = await _getEventsWithRetriesAsync(getEventsAsync, NUM_RETRIES, fromBlock, toBlock); + events = events.concat(eventsInRange); + } + return events; +} + +/** + * Calls the getEventsAsync function and retries up to numRetries times if it + * throws with an error that is considered retryable. + * @param getEventsAsync a function that will be called on each iteration. + * @param numRetries the maximum number times to retry getEventsAsync if it fails with a retryable error. + * @param fromBlock the start of the sub-range of blocks we are getting events for. + * @param toBlock the end of the sub-range of blocks we are getting events for. + */ +export async function _getEventsWithRetriesAsync<ArgsType extends DecodedLogArgs>( + getEventsAsync: GetEventsFunc<ArgsType>, + numRetries: number, + fromBlock: number, + toBlock: number, +): Promise<Array<LogWithDecodedArgs<ArgsType>>> { + let eventsInRange: Array<LogWithDecodedArgs<ArgsType>> = []; + for (let i = 0; i <= numRetries; i++) { + try { + eventsInRange = await getEventsAsync(fromBlock, toBlock); + } catch (err) { + if (isErrorRetryable(err) && i < numRetries) { + continue; + } else { + throw err; + } + } + break; + } + return eventsInRange; +} + +function isErrorRetryable(err: Error): boolean { + return err.message.includes('network timeout'); +} diff --git a/packages/pipeline/src/data_sources/copper/index.ts b/packages/pipeline/src/data_sources/copper/index.ts new file mode 100644 index 000000000..15df2fd7d --- /dev/null +++ b/packages/pipeline/src/data_sources/copper/index.ts @@ -0,0 +1,126 @@ +import { fetchAsync } from '@0x/utils'; +import Bottleneck from 'bottleneck'; + +import { + CopperActivityTypeCategory, + CopperActivityTypeResponse, + CopperCustomFieldResponse, + CopperSearchResponse, +} from '../../parsers/copper'; + +const HTTP_OK_STATUS = 200; +const COPPER_URI = 'https://api.prosperworks.com/developer_api/v1'; + +const DEFAULT_PAGINATION_PARAMS = { + page_size: 200, + sort_by: 'date_modified', + sort_direction: 'desc', +}; + +export type CopperSearchParams = CopperLeadSearchParams | CopperActivitySearchParams | CopperOpportunitySearchParams; +export interface CopperLeadSearchParams { + page_number?: number; +} + +export interface CopperActivitySearchParams { + minimum_activity_date: number; + page_number?: number; +} + +export interface CopperOpportunitySearchParams { + sort_by: string; // must override the default 'date_modified' for this endpoint + page_number?: number; +} +export enum CopperEndpoint { + Leads = '/leads/search', + Opportunities = '/opportunities/search', + Activities = '/activities/search', +} +const ONE_SECOND = 1000; + +function httpErrorCheck(response: Response): void { + if (response.status !== HTTP_OK_STATUS) { + throw new Error(`HTTP error while scraping Copper: [${JSON.stringify(response)}]`); + } +} +export class CopperSource { + private readonly _accessToken: string; + private readonly _userEmail: string; + private readonly _defaultHeaders: any; + private readonly _limiter: Bottleneck; + + constructor(maxConcurrentRequests: number, accessToken: string, userEmail: string) { + this._accessToken = accessToken; + this._userEmail = userEmail; + this._defaultHeaders = { + 'Content-Type': 'application/json', + 'X-PW-AccessToken': this._accessToken, + 'X-PW-Application': 'developer_api', + 'X-PW-UserEmail': this._userEmail, + }; + this._limiter = new Bottleneck({ + minTime: ONE_SECOND / maxConcurrentRequests, + reservoir: 30, + reservoirRefreshAmount: 30, + reservoirRefreshInterval: maxConcurrentRequests, + }); + } + + public async fetchNumberOfPagesAsync(endpoint: CopperEndpoint, searchParams?: CopperSearchParams): Promise<number> { + const resp = await this._limiter.schedule(() => + fetchAsync(COPPER_URI + endpoint, { + method: 'POST', + body: JSON.stringify({ ...DEFAULT_PAGINATION_PARAMS, ...searchParams }), + headers: this._defaultHeaders, + }), + ); + + httpErrorCheck(resp); + + // total number of records that match the request parameters + if (resp.headers.has('X-Pw-Total')) { + const totalRecords: number = parseInt(resp.headers.get('X-Pw-Total') as string, 10); // tslint:disable-line:custom-no-magic-numbers + return Math.ceil(totalRecords / DEFAULT_PAGINATION_PARAMS.page_size); + } else { + return 1; + } + } + public async fetchSearchResultsAsync<T extends CopperSearchResponse>( + endpoint: CopperEndpoint, + searchParams?: CopperSearchParams, + ): Promise<T[]> { + const request = { ...DEFAULT_PAGINATION_PARAMS, ...searchParams }; + const response = await this._limiter.schedule(() => + fetchAsync(COPPER_URI + endpoint, { + method: 'POST', + body: JSON.stringify(request), + headers: this._defaultHeaders, + }), + ); + httpErrorCheck(response); + const json: T[] = await response.json(); + return json; + } + + public async fetchActivityTypesAsync(): Promise<Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>> { + const response = await this._limiter.schedule(() => + fetchAsync(`${COPPER_URI}/activity_types`, { + method: 'GET', + headers: this._defaultHeaders, + }), + ); + httpErrorCheck(response); + return response.json(); + } + + public async fetchCustomFieldsAsync(): Promise<CopperCustomFieldResponse[]> { + const response = await this._limiter.schedule(() => + fetchAsync(`${COPPER_URI}/custom_field_definitions`, { + method: 'GET', + headers: this._defaultHeaders, + }), + ); + httpErrorCheck(response); + return response.json(); + } +} diff --git a/packages/pipeline/src/data_sources/ddex/index.ts b/packages/pipeline/src/data_sources/ddex/index.ts new file mode 100644 index 000000000..2bbd8c29b --- /dev/null +++ b/packages/pipeline/src/data_sources/ddex/index.ts @@ -0,0 +1,78 @@ +import { fetchAsync, logUtils } from '@0x/utils'; + +const DDEX_BASE_URL = 'https://api.ddex.io/v2'; +const ACTIVE_MARKETS_URL = `${DDEX_BASE_URL}/markets`; +const NO_AGGREGATION_LEVEL = 3; // See https://docs.ddex.io/#get-orderbook +const ORDERBOOK_ENDPOINT = `/orderbook?level=${NO_AGGREGATION_LEVEL}`; +export const DDEX_SOURCE = 'ddex'; + +export interface DdexActiveMarketsResponse { + status: number; + desc: string; + data: { + markets: DdexMarket[]; + }; +} + +export interface DdexMarket { + id: string; + quoteToken: string; + quoteTokenDecimals: number; + quoteTokenAddress: string; + baseToken: string; + baseTokenDecimals: number; + baseTokenAddress: string; + minOrderSize: string; + maxOrderSize: string; + pricePrecision: number; + priceDecimals: number; + amountDecimals: number; +} + +export interface DdexOrderbookResponse { + status: number; + desc: string; + data: { + orderBook: DdexOrderbook; + }; +} + +export interface DdexOrderbook { + marketId: string; + bids: DdexOrder[]; + asks: DdexOrder[]; +} + +export interface DdexOrder { + price: string; + amount: string; + orderId: string; +} + +// tslint:disable:prefer-function-over-method +// ^ Keep consistency with other sources and help logical organization +export class DdexSource { + /** + * Call Ddex API to find out which markets they are maintaining orderbooks for. + */ + public async getActiveMarketsAsync(): Promise<DdexMarket[]> { + logUtils.log('Getting all active DDEX markets'); + const resp = await fetchAsync(ACTIVE_MARKETS_URL); + const respJson: DdexActiveMarketsResponse = await resp.json(); + const markets = respJson.data.markets; + logUtils.log(`Got ${markets.length} markets.`); + return markets; + } + + /** + * Retrieve orderbook from Ddex API for a given market. + * @param marketId String identifying the market we want data for. Eg. 'REP/AUG' + */ + public async getMarketOrderbookAsync(marketId: string): Promise<DdexOrderbook> { + logUtils.log(`${marketId}: Retrieving orderbook.`); + const marketOrderbookUrl = `${ACTIVE_MARKETS_URL}/${marketId}${ORDERBOOK_ENDPOINT}`; + const resp = await fetchAsync(marketOrderbookUrl); + const respJson: DdexOrderbookResponse = await resp.json(); + return respJson.data.orderBook; + } +} diff --git a/packages/pipeline/src/data_sources/idex/index.ts b/packages/pipeline/src/data_sources/idex/index.ts new file mode 100644 index 000000000..c1e53c08d --- /dev/null +++ b/packages/pipeline/src/data_sources/idex/index.ts @@ -0,0 +1,82 @@ +import { fetchAsync } from '@0x/utils'; + +const IDEX_BASE_URL = 'https://api.idex.market'; +const MARKETS_URL = `${IDEX_BASE_URL}/returnTicker`; +const ORDERBOOK_URL = `${IDEX_BASE_URL}/returnOrderBook`; +const MAX_ORDER_COUNT = 100; // Maximum based on https://github.com/AuroraDAO/idex-api-docs#returnorderbook +export const IDEX_SOURCE = 'idex'; + +export interface IdexMarketsResponse { + [marketName: string]: IdexMarket; +} + +export interface IdexMarket { + last: string; + high: string; + low: string; + lowestAsk: string; + highestBid: string; + percentChange: string; + baseVolume: string; + quoteVolume: string; +} + +export interface IdexOrderbook { + asks: IdexOrder[]; + bids: IdexOrder[]; +} + +export interface IdexOrder { + price: string; + amount: string; + total: string; + orderHash: string; + params: IdexOrderParam; +} + +export interface IdexOrderParam { + tokenBuy: string; + buySymbol: string; + buyPrecision: number; + amountBuy: string; + tokenSell: string; + sellSymbol: string; + sellPrecision: number; + amountSell: string; + expires: number; + nonce: number; + user: string; +} + +// tslint:disable:prefer-function-over-method +// ^ Keep consistency with other sources and help logical organization +export class IdexSource { + /** + * Call Idex API to find out which markets they are maintaining orderbooks for. + */ + public async getMarketsAsync(): Promise<string[]> { + const params = { method: 'POST' }; + const resp = await fetchAsync(MARKETS_URL, params); + const respJson: IdexMarketsResponse = await resp.json(); + const markets: string[] = Object.keys(respJson); + return markets; + } + + /** + * Retrieve orderbook from Idex API for a given market. + * @param marketId String identifying the market we want data for. Eg. 'REP_AUG' + */ + public async getMarketOrderbookAsync(marketId: string): Promise<IdexOrderbook> { + const params = { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + market: marketId, + count: MAX_ORDER_COUNT, + }), + }; + const resp = await fetchAsync(ORDERBOOK_URL, params); + const respJson: IdexOrderbook = await resp.json(); + return respJson; + } +} diff --git a/packages/pipeline/src/data_sources/oasis/index.ts b/packages/pipeline/src/data_sources/oasis/index.ts new file mode 100644 index 000000000..3b30e9dfd --- /dev/null +++ b/packages/pipeline/src/data_sources/oasis/index.ts @@ -0,0 +1,103 @@ +import { fetchAsync } from '@0x/utils'; + +const OASIS_BASE_URL = 'https://data.makerdao.com/v1'; +const OASIS_MARKET_QUERY = `query { + oasisMarkets(period: "1 week") { + nodes { + id + base + quote + buyVol + sellVol + price + high + low + } + } +}`; +const OASIS_ORDERBOOK_QUERY = `query ($market: String!) { + allOasisOrders(condition: { market: $market }) { + totalCount + nodes { + market + offerId + price + amount + act + } + } +}`; +export const OASIS_SOURCE = 'oasis'; + +export interface OasisMarket { + id: string; // market symbol e.g MKRDAI + base: string; // base symbol e.g MKR + quote: string; // quote symbol e.g DAI + buyVol: number; // total buy volume (base) + sellVol: number; // total sell volume (base) + price: number; // volume weighted price (quote) + high: number; // max sell price + low: number; // min buy price +} + +export interface OasisMarketResponse { + data: { + oasisMarkets: { + nodes: OasisMarket[]; + }; + }; +} + +export interface OasisOrder { + offerId: number; // Offer Id + market: string; // Market symbol (base/quote) + price: string; // Offer price (quote) + amount: string; // Offer amount (base) + act: string; // Action (ask|bid) +} + +export interface OasisOrderbookResponse { + data: { + allOasisOrders: { + totalCount: number; + nodes: OasisOrder[]; + }; + }; +} + +// tslint:disable:prefer-function-over-method +// ^ Keep consistency with other sources and help logical organization +export class OasisSource { + /** + * Call Ddex API to find out which markets they are maintaining orderbooks for. + */ + public async getActiveMarketsAsync(): Promise<OasisMarket[]> { + const params = { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ query: OASIS_MARKET_QUERY }), + }; + const resp = await fetchAsync(OASIS_BASE_URL, params); + const respJson: OasisMarketResponse = await resp.json(); + const markets = respJson.data.oasisMarkets.nodes; + return markets; + } + + /** + * Retrieve orderbook from Oasis API for a given market. + * @param marketId String identifying the market we want data for. Eg. 'REPAUG'. + */ + public async getMarketOrderbookAsync(marketId: string): Promise<OasisOrder[]> { + const input = { + market: marketId, + }; + const params = { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ query: OASIS_ORDERBOOK_QUERY, variables: input }), + }; + const resp = await fetchAsync(OASIS_BASE_URL, params); + const respJson: OasisOrderbookResponse = await resp.json(); + return respJson.data.allOasisOrders.nodes; + } +} diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts new file mode 100644 index 000000000..85042501b --- /dev/null +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -0,0 +1,110 @@ +// tslint:disable:no-duplicate-imports +import { fetchAsync } from '@0x/utils'; +import Bottleneck from 'bottleneck'; +import { stringify } from 'querystring'; +import * as R from 'ramda'; + +import { TradingPair } from '../../utils/get_ohlcv_trading_pairs'; + +export interface CryptoCompareOHLCVResponse { + Data: CryptoCompareOHLCVRecord[]; + Response: string; + Message: string; + Type: number; +} + +export interface CryptoCompareOHLCVRecord { + time: number; // in seconds, not milliseconds + close: number; + high: number; + low: number; + open: number; + volumefrom: number; + volumeto: number; +} + +export interface CryptoCompareOHLCVParams { + fsym: string; + tsym: string; + e?: string; + aggregate?: string; + aggregatePredictableTimePeriods?: boolean; + limit?: number; + toTs?: number; +} + +const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_SECOND = 1000; +const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; +const HTTP_OK_STATUS = 200; +const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96; +const MAX_PAGE_SIZE = 2000; + +export class CryptoCompareOHLCVSource { + public readonly intervalBetweenRecords = ONE_HOUR; + public readonly defaultExchange = 'CCCAGG'; + public readonly interval = this.intervalBetweenRecords * MAX_PAGE_SIZE; // the hourly API returns data for one interval at a time + private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; + + // rate-limit for all API calls through this class instance + private readonly _limiter: Bottleneck; + constructor(maxReqsPerSecond: number) { + this._limiter = new Bottleneck({ + minTime: ONE_SECOND / maxReqsPerSecond, + reservoir: 30, + reservoirRefreshAmount: 30, + reservoirRefreshInterval: ONE_SECOND, + }); + } + + // gets OHLCV records starting from pair.latest + public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> { + const params = { + e: this.defaultExchange, + fsym: pair.fromSymbol, + tsym: pair.toSymbol, + limit: MAX_PAGE_SIZE, + toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms + }; + const url = this._url + stringify(params); + const response = await this._limiter.schedule(() => fetchAsync(url)); + if (response.status !== HTTP_OK_STATUS) { + throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`); + } + const json: CryptoCompareOHLCVResponse = await response.json(); + if ( + (json.Response === 'Error' || json.Data.length === 0) && + json.Type !== CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE + ) { + throw new Error(JSON.stringify(json)); + } + return json.Data.filter(rec => { + return ( + // Crypto Compare takes ~30 mins to finalise records + rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime && hasData(rec) + ); + }); + } + public generateBackfillIntervals(pair: TradingPair): TradingPair[] { + const now = new Date().getTime(); + const f = (p: TradingPair): false | [TradingPair, TradingPair] => { + if (p.latestSavedTime > now) { + return false; + } else { + return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })]; + } + }; + return R.unfold(f, pair); + } +} + +function hasData(record: CryptoCompareOHLCVRecord): boolean { + return ( + record.close !== 0 || + record.open !== 0 || + record.high !== 0 || + record.low !== 0 || + record.volumefrom !== 0 || + record.volumeto !== 0 + ); +} diff --git a/packages/pipeline/src/data_sources/paradex/index.ts b/packages/pipeline/src/data_sources/paradex/index.ts new file mode 100644 index 000000000..46d448f4b --- /dev/null +++ b/packages/pipeline/src/data_sources/paradex/index.ts @@ -0,0 +1,92 @@ +import { fetchAsync, logUtils } from '@0x/utils'; + +const PARADEX_BASE_URL = 'https://api.paradex.io/consumer/v0'; +const ACTIVE_MARKETS_URL = `${PARADEX_BASE_URL}/markets`; +const ORDERBOOK_ENDPOINT = `${PARADEX_BASE_URL}/orderbook`; +const TOKEN_INFO_ENDPOINT = `${PARADEX_BASE_URL}/tokens`; +export const PARADEX_SOURCE = 'paradex'; + +export type ParadexActiveMarketsResponse = ParadexMarket[]; + +export interface ParadexMarket { + id: string; + symbol: string; + baseToken: string; + quoteToken: string; + minOrderSize: string; + maxOrderSize: string; + priceMaxDecimals: number; + amountMaxDecimals: number; + // These are not native to the Paradex API response. We tag them on later + // by calling the token endpoint and joining on symbol. + baseTokenAddress?: string; + quoteTokenAddress?: string; +} + +export interface ParadexOrderbookResponse { + marketId: number; + marketSymbol: string; + bids: ParadexOrder[]; + asks: ParadexOrder[]; +} + +export interface ParadexOrder { + amount: string; + price: string; +} + +export type ParadexTokenInfoResponse = ParadexTokenInfo[]; + +export interface ParadexTokenInfo { + name: string; + symbol: string; + address: string; +} + +export class ParadexSource { + private readonly _apiKey: string; + + constructor(apiKey: string) { + this._apiKey = apiKey; + } + + /** + * Call Paradex API to find out which markets they are maintaining orderbooks for. + */ + public async getActiveMarketsAsync(): Promise<ParadexActiveMarketsResponse> { + logUtils.log('Getting all active Paradex markets.'); + const resp = await fetchAsync(ACTIVE_MARKETS_URL, { + headers: { 'API-KEY': this._apiKey }, + }); + const markets: ParadexActiveMarketsResponse = await resp.json(); + logUtils.log(`Got ${markets.length} markets.`); + return markets; + } + + /** + * Call Paradex API to find out their token information. + */ + public async getTokenInfoAsync(): Promise<ParadexTokenInfoResponse> { + logUtils.log('Getting token information from Paradex.'); + const resp = await fetchAsync(TOKEN_INFO_ENDPOINT, { + headers: { 'API-KEY': this._apiKey }, + }); + const tokens: ParadexTokenInfoResponse = await resp.json(); + logUtils.log(`Got information for ${tokens.length} tokens.`); + return tokens; + } + + /** + * Retrieve orderbook from Paradex API for a given market. + * @param marketSymbol String representing the market we want data for. + */ + public async getMarketOrderbookAsync(marketSymbol: string): Promise<ParadexOrderbookResponse> { + logUtils.log(`${marketSymbol}: Retrieving orderbook.`); + const marketOrderbookUrl = `${ORDERBOOK_ENDPOINT}?market=${marketSymbol}`; + const resp = await fetchAsync(marketOrderbookUrl, { + headers: { 'API-KEY': this._apiKey }, + }); + const orderbookResponse: ParadexOrderbookResponse = await resp.json(); + return orderbookResponse; + } +} diff --git a/packages/pipeline/src/data_sources/relayer-registry/index.ts b/packages/pipeline/src/data_sources/relayer-registry/index.ts new file mode 100644 index 000000000..8133f5eae --- /dev/null +++ b/packages/pipeline/src/data_sources/relayer-registry/index.ts @@ -0,0 +1,33 @@ +import axios from 'axios'; + +export interface RelayerResponse { + name: string; + homepage_url: string; + app_url: string; + header_img: string; + logo_img: string; + networks: RelayerResponseNetwork[]; +} + +export interface RelayerResponseNetwork { + networkId: number; + sra_http_endpoint?: string; + sra_ws_endpoint?: string; + static_order_fields?: { + fee_recipient_addresses?: string[]; + taker_addresses?: string[]; + }; +} + +export class RelayerRegistrySource { + private readonly _url: string; + + constructor(url: string) { + this._url = url; + } + + public async getRelayerInfoAsync(): Promise<Map<string, RelayerResponse>> { + const resp = await axios.get<Map<string, RelayerResponse>>(this._url); + return resp.data; + } +} diff --git a/packages/pipeline/src/data_sources/trusted_tokens/index.ts b/packages/pipeline/src/data_sources/trusted_tokens/index.ts new file mode 100644 index 000000000..552739fb9 --- /dev/null +++ b/packages/pipeline/src/data_sources/trusted_tokens/index.ts @@ -0,0 +1,29 @@ +import axios from 'axios'; + +export interface ZeroExTrustedTokenMeta { + address: string; + name: string; + symbol: string; + decimals: number; +} + +export interface MetamaskTrustedTokenMeta { + address: string; + name: string; + erc20: boolean; + symbol: string; + decimals: number; +} + +export class TrustedTokenSource<T> { + private readonly _url: string; + + constructor(url: string) { + this._url = url; + } + + public async getTrustedTokenMetaAsync(): Promise<T> { + const resp = await axios.get<T>(this._url); + return resp.data; + } +} diff --git a/packages/pipeline/src/data_sources/web3/index.ts b/packages/pipeline/src/data_sources/web3/index.ts new file mode 100644 index 000000000..45a9ea161 --- /dev/null +++ b/packages/pipeline/src/data_sources/web3/index.ts @@ -0,0 +1,22 @@ +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import { BlockWithoutTransactionData, Transaction } from 'ethereum-types'; + +export class Web3Source { + private readonly _web3Wrapper: Web3Wrapper; + constructor(provider: Web3ProviderEngine) { + this._web3Wrapper = new Web3Wrapper(provider); + } + + public async getBlockInfoAsync(blockNumber: number): Promise<BlockWithoutTransactionData> { + const block = await this._web3Wrapper.getBlockIfExistsAsync(blockNumber); + if (block == null) { + return Promise.reject(new Error(`Could not find block for given block number: ${blockNumber}`)); + } + return block; + } + + public async getTransactionInfoAsync(txHash: string): Promise<Transaction> { + return this._web3Wrapper.getTransactionByHashAsync(txHash); + } +} |