diff options
author | Hsuan Lee <hsuan@cobinhood.com> | 2019-01-19 18:42:04 +0800 |
---|---|---|
committer | Hsuan Lee <hsuan@cobinhood.com> | 2019-01-19 18:42:04 +0800 |
commit | 7ae38906926dc09bc10670c361af0d2bf0050426 (patch) | |
tree | 5fb10ae366b987db09e4ddb4bc3ba0f75404ad08 /packages/pipeline/src | |
parent | b5fd3c72a08aaa6957917d74c333387a16edf66b (diff) | |
download | dexon-sol-tools-7ae38906926dc09bc10670c361af0d2bf0050426.tar dexon-sol-tools-7ae38906926dc09bc10670c361af0d2bf0050426.tar.gz dexon-sol-tools-7ae38906926dc09bc10670c361af0d2bf0050426.tar.bz2 dexon-sol-tools-7ae38906926dc09bc10670c361af0d2bf0050426.tar.lz dexon-sol-tools-7ae38906926dc09bc10670c361af0d2bf0050426.tar.xz dexon-sol-tools-7ae38906926dc09bc10670c361af0d2bf0050426.tar.zst dexon-sol-tools-7ae38906926dc09bc10670c361af0d2bf0050426.zip |
Update dependency packages
Diffstat (limited to 'packages/pipeline/src')
69 files changed, 0 insertions, 3976 deletions
diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts deleted file mode 100644 index 94468d25a..000000000 --- a/packages/pipeline/src/data_sources/bloxy/index.ts +++ /dev/null @@ -1,133 +0,0 @@ -import axios from 'axios'; -import * as R from 'ramda'; - -// URL to use for getting dex trades from Bloxy. -export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades'; -// Number of trades to get at once. Must be less than or equal to MAX_OFFSET. -const TRADES_PER_QUERY = 10000; -// Maximum offset supported by the Bloxy API. -const MAX_OFFSET = 100000; -// Buffer to subtract from offset. This means we will request some trades twice -// but we have less chance on missing out on any data. -const OFFSET_BUFFER = 1000; -// Maximum number of days supported by the Bloxy API. -const MAX_DAYS = 30; -// Buffer used for comparing the last seen timestamp to the last returned -// timestamp. Increasing this reduces chances of data loss but also creates more -// redundancy and can impact performance. -// tslint:disable-next-line:custom-no-magic-numbers -const LAST_SEEN_TIMESTAMP_BUFFER_MS = 1000 * 60 * 30; // 30 minutes - -// tslint:disable-next-line:custom-no-magic-numbers -const millisecondsPerDay = 1000 * 60 * 60 * 24; // ms/d = ms/s * s/m * m/h * h/d - -export interface BloxyTrade { - tx_hash: string; - tx_time: string; - tx_date: string; - tx_sender: string; - smart_contract_id: number; - smart_contract_address: string; - contract_type: string; - maker: string; - taker: string; - amountBuy: number; - makerFee: number; - buyCurrencyId: number; - buySymbol: string; - amountSell: number; - takerFee: number; - sellCurrencyId: number; - sellSymbol: string; - maker_annotation: string; - taker_annotation: string; - protocol: string; - buyAddress: string | null; - sellAddress: string | null; -} - -interface BloxyError { - error: string; -} - -type BloxyResponse<T> = T | BloxyError; -type BloxyTradeResponse = BloxyResponse<BloxyTrade[]>; - -function isError<T>(response: BloxyResponse<T>): response is BloxyError { - return (response as BloxyError).error !== undefined; -} - -export class BloxySource { - private readonly _apiKey: string; - - constructor(apiKey: string) { - this._apiKey = apiKey; - } - - /** - * Gets all latest trades between the lastSeenTimestamp (minus some buffer) - * and the current time. Note that because the Bloxy API has some hard - * limits it might not always be possible to get *all* the trades in the - * desired time range. - * @param lastSeenTimestamp The latest timestamp for trades that have - * already been seen. - */ - public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> { - let allTrades: BloxyTrade[] = []; - - // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive) - const numberOfDays = R.clamp(1, MAX_DAYS, getDaysSinceTimestamp(lastSeenTimestamp)); - - // Keep getting trades until we hit one of the following conditions: - // - // 1. Offset hits MAX_OFFSET (we can't go back any further). - // 2. There are no more trades in the response. - // 3. We see a tx_time equal to or earlier than lastSeenTimestamp (plus - // some buffer). - // - for (let offset = 0; offset <= MAX_OFFSET; offset += TRADES_PER_QUERY - OFFSET_BUFFER) { - const trades = await this._getTradesWithOffsetAsync(numberOfDays, offset); - if (trades.length === 0) { - // There are no more trades left for the days we are querying. - // This means we are done. - return filterDuplicateTrades(allTrades); - } - const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades)); - allTrades = allTrades.concat(sortedTrades); - - // Check if lastReturnedTimestamp < lastSeenTimestamp - const lastReturnedTimestamp = new Date(sortedTrades[0].tx_time).getTime(); - if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) { - // We are at the point where we have already seen trades for the - // timestamp range that is being returned. We're done. - return filterDuplicateTrades(allTrades); - } - } - return filterDuplicateTrades(allTrades); - } - - private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> { - const resp = await axios.get<BloxyTradeResponse>(BLOXY_DEX_TRADES_URL, { - params: { - key: this._apiKey, - days: numberOfDays, - limit: TRADES_PER_QUERY, - offset, - }, - }); - if (isError(resp.data)) { - throw new Error(`Error in Bloxy API response: ${resp.data.error}`); - } - return resp.data; - } -} - -// Computes the number of days between the given timestamp and the current -// timestamp (rounded up). -function getDaysSinceTimestamp(timestamp: number): number { - const msSinceTimestamp = Date.now() - timestamp; - const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay; - return Math.ceil(daysSinceTimestamp); -} - -const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash); diff --git a/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts deleted file mode 100644 index e0098122f..000000000 --- a/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { - ContractWrappers, - ERC20TokenApprovalEventArgs, - ERC20TokenEvents, - ERC20TokenWrapper, -} from '@0x/contract-wrappers'; -import { Web3ProviderEngine } from '@0x/subproviders'; -import { LogWithDecodedArgs } from 'ethereum-types'; - -import { GetEventsFunc, getEventsWithPaginationAsync } from './utils'; - -export class ERC20EventsSource { - private readonly _erc20Wrapper: ERC20TokenWrapper; - private readonly _tokenAddress: string; - constructor(provider: Web3ProviderEngine, networkId: number, tokenAddress: string) { - const contractWrappers = new ContractWrappers(provider, { networkId }); - this._erc20Wrapper = contractWrappers.erc20Token; - this._tokenAddress = tokenAddress; - } - - public async getApprovalEventsAsync( - startBlock: number, - endBlock: number, - ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> { - return getEventsWithPaginationAsync( - this._getApprovalEventsForRangeAsync.bind(this) as GetEventsFunc<ERC20TokenApprovalEventArgs>, - startBlock, - endBlock, - ); - } - - // Gets all approval events of for a specific sub-range. This getter - // function will be called during each step of pagination. - private async _getApprovalEventsForRangeAsync( - fromBlock: number, - toBlock: number, - ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> { - return this._erc20Wrapper.getLogsAsync<ERC20TokenApprovalEventArgs>( - this._tokenAddress, - ERC20TokenEvents.Approval, - { fromBlock, toBlock }, - {}, - ); - } -} diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts deleted file mode 100644 index 58691e2ab..000000000 --- a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { - ContractWrappers, - ExchangeCancelEventArgs, - ExchangeCancelUpToEventArgs, - ExchangeEventArgs, - ExchangeEvents, - ExchangeFillEventArgs, - ExchangeWrapper, -} from '@0x/contract-wrappers'; -import { Web3ProviderEngine } from '@0x/subproviders'; -import { LogWithDecodedArgs } from 'ethereum-types'; - -import { GetEventsFunc, getEventsWithPaginationAsync } from './utils'; - -export class ExchangeEventsSource { - private readonly _exchangeWrapper: ExchangeWrapper; - constructor(provider: Web3ProviderEngine, networkId: number) { - const contractWrappers = new ContractWrappers(provider, { networkId }); - this._exchangeWrapper = contractWrappers.exchange; - } - - public async getFillEventsAsync( - startBlock: number, - endBlock: number, - ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { - const getFillEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeFillEventArgs>(ExchangeEvents.Fill); - return getEventsWithPaginationAsync(getFillEventsForRangeAsync, startBlock, endBlock); - } - - public async getCancelEventsAsync( - startBlock: number, - endBlock: number, - ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> { - const getCancelEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelEventArgs>( - ExchangeEvents.Cancel, - ); - return getEventsWithPaginationAsync(getCancelEventsForRangeAsync, startBlock, endBlock); - } - - public async getCancelUpToEventsAsync( - startBlock: number, - endBlock: number, - ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> { - const getCancelUpToEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelUpToEventArgs>( - ExchangeEvents.CancelUpTo, - ); - return getEventsWithPaginationAsync(getCancelUpToEventsForRangeAsync, startBlock, endBlock); - } - - // Returns a getter function which gets all events of a specific type for a - // specific sub-range. This getter function will be called during each step - // of pagination. - private _makeGetterFuncForEventType<ArgsType extends ExchangeEventArgs>( - eventType: ExchangeEvents, - ): GetEventsFunc<ArgsType> { - return async (fromBlock: number, toBlock: number) => - this._exchangeWrapper.getLogsAsync<ArgsType>(eventType, { fromBlock, toBlock }, {}); - } -} diff --git a/packages/pipeline/src/data_sources/contract-wrappers/utils.ts b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts deleted file mode 100644 index 67660a37e..000000000 --- a/packages/pipeline/src/data_sources/contract-wrappers/utils.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { DecodedLogArgs, LogWithDecodedArgs } from 'ethereum-types'; - -const NUM_BLOCKS_PER_QUERY = 10000; // Number of blocks to query for events at a time. -const NUM_RETRIES = 3; // Number of retries if a request fails or times out. - -export type GetEventsFunc<ArgsType extends DecodedLogArgs> = ( - fromBlock: number, - toBlock: number, -) => Promise<Array<LogWithDecodedArgs<ArgsType>>>; - -/** - * Gets all events between the given startBlock and endBlock by querying for - * NUM_BLOCKS_PER_QUERY at a time. Accepts a getter function in order to - * maximize code re-use and allow for getting different types of events for - * different contracts. If the getter function throws with a retryable error, - * it will automatically be retried up to NUM_RETRIES times. - * @param getEventsAsync A getter function which will be called for each step during pagination. - * @param startBlock The start of the entire block range to get events for. - * @param endBlock The end of the entire block range to get events for. - */ -export async function getEventsWithPaginationAsync<ArgsType extends DecodedLogArgs>( - getEventsAsync: GetEventsFunc<ArgsType>, - startBlock: number, - endBlock: number, -): Promise<Array<LogWithDecodedArgs<ArgsType>>> { - let events: Array<LogWithDecodedArgs<ArgsType>> = []; - for (let fromBlock = startBlock; fromBlock <= endBlock; fromBlock += NUM_BLOCKS_PER_QUERY) { - const toBlock = Math.min(fromBlock + NUM_BLOCKS_PER_QUERY - 1, endBlock); - const eventsInRange = await _getEventsWithRetriesAsync(getEventsAsync, NUM_RETRIES, fromBlock, toBlock); - events = events.concat(eventsInRange); - } - return events; -} - -/** - * Calls the getEventsAsync function and retries up to numRetries times if it - * throws with an error that is considered retryable. - * @param getEventsAsync a function that will be called on each iteration. - * @param numRetries the maximum number times to retry getEventsAsync if it fails with a retryable error. - * @param fromBlock the start of the sub-range of blocks we are getting events for. - * @param toBlock the end of the sub-range of blocks we are getting events for. - */ -export async function _getEventsWithRetriesAsync<ArgsType extends DecodedLogArgs>( - getEventsAsync: GetEventsFunc<ArgsType>, - numRetries: number, - fromBlock: number, - toBlock: number, -): Promise<Array<LogWithDecodedArgs<ArgsType>>> { - let eventsInRange: Array<LogWithDecodedArgs<ArgsType>> = []; - for (let i = 0; i <= numRetries; i++) { - try { - eventsInRange = await getEventsAsync(fromBlock, toBlock); - } catch (err) { - if (isErrorRetryable(err) && i < numRetries) { - continue; - } else { - throw err; - } - } - break; - } - return eventsInRange; -} - -function isErrorRetryable(err: Error): boolean { - return err.message.includes('network timeout'); -} diff --git a/packages/pipeline/src/data_sources/copper/index.ts b/packages/pipeline/src/data_sources/copper/index.ts deleted file mode 100644 index 15df2fd7d..000000000 --- a/packages/pipeline/src/data_sources/copper/index.ts +++ /dev/null @@ -1,126 +0,0 @@ -import { fetchAsync } from '@0x/utils'; -import Bottleneck from 'bottleneck'; - -import { - CopperActivityTypeCategory, - CopperActivityTypeResponse, - CopperCustomFieldResponse, - CopperSearchResponse, -} from '../../parsers/copper'; - -const HTTP_OK_STATUS = 200; -const COPPER_URI = 'https://api.prosperworks.com/developer_api/v1'; - -const DEFAULT_PAGINATION_PARAMS = { - page_size: 200, - sort_by: 'date_modified', - sort_direction: 'desc', -}; - -export type CopperSearchParams = CopperLeadSearchParams | CopperActivitySearchParams | CopperOpportunitySearchParams; -export interface CopperLeadSearchParams { - page_number?: number; -} - -export interface CopperActivitySearchParams { - minimum_activity_date: number; - page_number?: number; -} - -export interface CopperOpportunitySearchParams { - sort_by: string; // must override the default 'date_modified' for this endpoint - page_number?: number; -} -export enum CopperEndpoint { - Leads = '/leads/search', - Opportunities = '/opportunities/search', - Activities = '/activities/search', -} -const ONE_SECOND = 1000; - -function httpErrorCheck(response: Response): void { - if (response.status !== HTTP_OK_STATUS) { - throw new Error(`HTTP error while scraping Copper: [${JSON.stringify(response)}]`); - } -} -export class CopperSource { - private readonly _accessToken: string; - private readonly _userEmail: string; - private readonly _defaultHeaders: any; - private readonly _limiter: Bottleneck; - - constructor(maxConcurrentRequests: number, accessToken: string, userEmail: string) { - this._accessToken = accessToken; - this._userEmail = userEmail; - this._defaultHeaders = { - 'Content-Type': 'application/json', - 'X-PW-AccessToken': this._accessToken, - 'X-PW-Application': 'developer_api', - 'X-PW-UserEmail': this._userEmail, - }; - this._limiter = new Bottleneck({ - minTime: ONE_SECOND / maxConcurrentRequests, - reservoir: 30, - reservoirRefreshAmount: 30, - reservoirRefreshInterval: maxConcurrentRequests, - }); - } - - public async fetchNumberOfPagesAsync(endpoint: CopperEndpoint, searchParams?: CopperSearchParams): Promise<number> { - const resp = await this._limiter.schedule(() => - fetchAsync(COPPER_URI + endpoint, { - method: 'POST', - body: JSON.stringify({ ...DEFAULT_PAGINATION_PARAMS, ...searchParams }), - headers: this._defaultHeaders, - }), - ); - - httpErrorCheck(resp); - - // total number of records that match the request parameters - if (resp.headers.has('X-Pw-Total')) { - const totalRecords: number = parseInt(resp.headers.get('X-Pw-Total') as string, 10); // tslint:disable-line:custom-no-magic-numbers - return Math.ceil(totalRecords / DEFAULT_PAGINATION_PARAMS.page_size); - } else { - return 1; - } - } - public async fetchSearchResultsAsync<T extends CopperSearchResponse>( - endpoint: CopperEndpoint, - searchParams?: CopperSearchParams, - ): Promise<T[]> { - const request = { ...DEFAULT_PAGINATION_PARAMS, ...searchParams }; - const response = await this._limiter.schedule(() => - fetchAsync(COPPER_URI + endpoint, { - method: 'POST', - body: JSON.stringify(request), - headers: this._defaultHeaders, - }), - ); - httpErrorCheck(response); - const json: T[] = await response.json(); - return json; - } - - public async fetchActivityTypesAsync(): Promise<Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>> { - const response = await this._limiter.schedule(() => - fetchAsync(`${COPPER_URI}/activity_types`, { - method: 'GET', - headers: this._defaultHeaders, - }), - ); - httpErrorCheck(response); - return response.json(); - } - - public async fetchCustomFieldsAsync(): Promise<CopperCustomFieldResponse[]> { - const response = await this._limiter.schedule(() => - fetchAsync(`${COPPER_URI}/custom_field_definitions`, { - method: 'GET', - headers: this._defaultHeaders, - }), - ); - httpErrorCheck(response); - return response.json(); - } -} diff --git a/packages/pipeline/src/data_sources/ddex/index.ts b/packages/pipeline/src/data_sources/ddex/index.ts deleted file mode 100644 index 7ef92b90f..000000000 --- a/packages/pipeline/src/data_sources/ddex/index.ts +++ /dev/null @@ -1,77 +0,0 @@ -import { fetchAsync, logUtils } from '@0x/utils'; - -const DDEX_BASE_URL = 'https://api.ddex.io/v3'; -const ACTIVE_MARKETS_URL = `${DDEX_BASE_URL}/markets`; -const NO_AGGREGATION_LEVEL = 3; // See https://docs.ddex.io/#get-orderbook -const ORDERBOOK_ENDPOINT = `/orderbook?level=${NO_AGGREGATION_LEVEL}`; -export const DDEX_SOURCE = 'ddex'; - -export interface DdexActiveMarketsResponse { - status: number; - desc: string; - data: { - markets: DdexMarket[]; - }; -} - -export interface DdexMarket { - id: string; - quoteToken: string; - quoteTokenDecimals: number; - quoteTokenAddress: string; - baseToken: string; - baseTokenDecimals: number; - baseTokenAddress: string; - minOrderSize: string; - pricePrecision: number; - priceDecimals: number; - amountDecimals: number; -} - -export interface DdexOrderbookResponse { - status: number; - desc: string; - data: { - orderBook: DdexOrderbook; - }; -} - -export interface DdexOrderbook { - marketId: string; - bids: DdexOrder[]; - asks: DdexOrder[]; -} - -export interface DdexOrder { - price: string; - amount: string; - orderId: string; -} - -// tslint:disable:prefer-function-over-method -// ^ Keep consistency with other sources and help logical organization -export class DdexSource { - /** - * Call Ddex API to find out which markets they are maintaining orderbooks for. - */ - public async getActiveMarketsAsync(): Promise<DdexMarket[]> { - logUtils.log('Getting all active DDEX markets'); - const resp = await fetchAsync(ACTIVE_MARKETS_URL); - const respJson: DdexActiveMarketsResponse = await resp.json(); - const markets = respJson.data.markets; - logUtils.log(`Got ${markets.length} markets.`); - return markets; - } - - /** - * Retrieve orderbook from Ddex API for a given market. - * @param marketId String identifying the market we want data for. Eg. 'REP/AUG' - */ - public async getMarketOrderbookAsync(marketId: string): Promise<DdexOrderbook> { - logUtils.log(`${marketId}: Retrieving orderbook.`); - const marketOrderbookUrl = `${ACTIVE_MARKETS_URL}/${marketId}${ORDERBOOK_ENDPOINT}`; - const resp = await fetchAsync(marketOrderbookUrl); - const respJson: DdexOrderbookResponse = await resp.json(); - return respJson.data.orderBook; - } -} diff --git a/packages/pipeline/src/data_sources/idex/index.ts b/packages/pipeline/src/data_sources/idex/index.ts deleted file mode 100644 index c1e53c08d..000000000 --- a/packages/pipeline/src/data_sources/idex/index.ts +++ /dev/null @@ -1,82 +0,0 @@ -import { fetchAsync } from '@0x/utils'; - -const IDEX_BASE_URL = 'https://api.idex.market'; -const MARKETS_URL = `${IDEX_BASE_URL}/returnTicker`; -const ORDERBOOK_URL = `${IDEX_BASE_URL}/returnOrderBook`; -const MAX_ORDER_COUNT = 100; // Maximum based on https://github.com/AuroraDAO/idex-api-docs#returnorderbook -export const IDEX_SOURCE = 'idex'; - -export interface IdexMarketsResponse { - [marketName: string]: IdexMarket; -} - -export interface IdexMarket { - last: string; - high: string; - low: string; - lowestAsk: string; - highestBid: string; - percentChange: string; - baseVolume: string; - quoteVolume: string; -} - -export interface IdexOrderbook { - asks: IdexOrder[]; - bids: IdexOrder[]; -} - -export interface IdexOrder { - price: string; - amount: string; - total: string; - orderHash: string; - params: IdexOrderParam; -} - -export interface IdexOrderParam { - tokenBuy: string; - buySymbol: string; - buyPrecision: number; - amountBuy: string; - tokenSell: string; - sellSymbol: string; - sellPrecision: number; - amountSell: string; - expires: number; - nonce: number; - user: string; -} - -// tslint:disable:prefer-function-over-method -// ^ Keep consistency with other sources and help logical organization -export class IdexSource { - /** - * Call Idex API to find out which markets they are maintaining orderbooks for. - */ - public async getMarketsAsync(): Promise<string[]> { - const params = { method: 'POST' }; - const resp = await fetchAsync(MARKETS_URL, params); - const respJson: IdexMarketsResponse = await resp.json(); - const markets: string[] = Object.keys(respJson); - return markets; - } - - /** - * Retrieve orderbook from Idex API for a given market. - * @param marketId String identifying the market we want data for. Eg. 'REP_AUG' - */ - public async getMarketOrderbookAsync(marketId: string): Promise<IdexOrderbook> { - const params = { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - market: marketId, - count: MAX_ORDER_COUNT, - }), - }; - const resp = await fetchAsync(ORDERBOOK_URL, params); - const respJson: IdexOrderbook = await resp.json(); - return respJson; - } -} diff --git a/packages/pipeline/src/data_sources/oasis/index.ts b/packages/pipeline/src/data_sources/oasis/index.ts deleted file mode 100644 index 3b30e9dfd..000000000 --- a/packages/pipeline/src/data_sources/oasis/index.ts +++ /dev/null @@ -1,103 +0,0 @@ -import { fetchAsync } from '@0x/utils'; - -const OASIS_BASE_URL = 'https://data.makerdao.com/v1'; -const OASIS_MARKET_QUERY = `query { - oasisMarkets(period: "1 week") { - nodes { - id - base - quote - buyVol - sellVol - price - high - low - } - } -}`; -const OASIS_ORDERBOOK_QUERY = `query ($market: String!) { - allOasisOrders(condition: { market: $market }) { - totalCount - nodes { - market - offerId - price - amount - act - } - } -}`; -export const OASIS_SOURCE = 'oasis'; - -export interface OasisMarket { - id: string; // market symbol e.g MKRDAI - base: string; // base symbol e.g MKR - quote: string; // quote symbol e.g DAI - buyVol: number; // total buy volume (base) - sellVol: number; // total sell volume (base) - price: number; // volume weighted price (quote) - high: number; // max sell price - low: number; // min buy price -} - -export interface OasisMarketResponse { - data: { - oasisMarkets: { - nodes: OasisMarket[]; - }; - }; -} - -export interface OasisOrder { - offerId: number; // Offer Id - market: string; // Market symbol (base/quote) - price: string; // Offer price (quote) - amount: string; // Offer amount (base) - act: string; // Action (ask|bid) -} - -export interface OasisOrderbookResponse { - data: { - allOasisOrders: { - totalCount: number; - nodes: OasisOrder[]; - }; - }; -} - -// tslint:disable:prefer-function-over-method -// ^ Keep consistency with other sources and help logical organization -export class OasisSource { - /** - * Call Ddex API to find out which markets they are maintaining orderbooks for. - */ - public async getActiveMarketsAsync(): Promise<OasisMarket[]> { - const params = { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ query: OASIS_MARKET_QUERY }), - }; - const resp = await fetchAsync(OASIS_BASE_URL, params); - const respJson: OasisMarketResponse = await resp.json(); - const markets = respJson.data.oasisMarkets.nodes; - return markets; - } - - /** - * Retrieve orderbook from Oasis API for a given market. - * @param marketId String identifying the market we want data for. Eg. 'REPAUG'. - */ - public async getMarketOrderbookAsync(marketId: string): Promise<OasisOrder[]> { - const input = { - market: marketId, - }; - const params = { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ query: OASIS_ORDERBOOK_QUERY, variables: input }), - }; - const resp = await fetchAsync(OASIS_BASE_URL, params); - const respJson: OasisOrderbookResponse = await resp.json(); - return respJson.data.allOasisOrders.nodes; - } -} diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts deleted file mode 100644 index 85042501b..000000000 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ /dev/null @@ -1,110 +0,0 @@ -// tslint:disable:no-duplicate-imports -import { fetchAsync } from '@0x/utils'; -import Bottleneck from 'bottleneck'; -import { stringify } from 'querystring'; -import * as R from 'ramda'; - -import { TradingPair } from '../../utils/get_ohlcv_trading_pairs'; - -export interface CryptoCompareOHLCVResponse { - Data: CryptoCompareOHLCVRecord[]; - Response: string; - Message: string; - Type: number; -} - -export interface CryptoCompareOHLCVRecord { - time: number; // in seconds, not milliseconds - close: number; - high: number; - low: number; - open: number; - volumefrom: number; - volumeto: number; -} - -export interface CryptoCompareOHLCVParams { - fsym: string; - tsym: string; - e?: string; - aggregate?: string; - aggregatePredictableTimePeriods?: boolean; - limit?: number; - toTs?: number; -} - -const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const ONE_SECOND = 1000; -const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; -const HTTP_OK_STATUS = 200; -const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96; -const MAX_PAGE_SIZE = 2000; - -export class CryptoCompareOHLCVSource { - public readonly intervalBetweenRecords = ONE_HOUR; - public readonly defaultExchange = 'CCCAGG'; - public readonly interval = this.intervalBetweenRecords * MAX_PAGE_SIZE; // the hourly API returns data for one interval at a time - private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; - - // rate-limit for all API calls through this class instance - private readonly _limiter: Bottleneck; - constructor(maxReqsPerSecond: number) { - this._limiter = new Bottleneck({ - minTime: ONE_SECOND / maxReqsPerSecond, - reservoir: 30, - reservoirRefreshAmount: 30, - reservoirRefreshInterval: ONE_SECOND, - }); - } - - // gets OHLCV records starting from pair.latest - public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> { - const params = { - e: this.defaultExchange, - fsym: pair.fromSymbol, - tsym: pair.toSymbol, - limit: MAX_PAGE_SIZE, - toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms - }; - const url = this._url + stringify(params); - const response = await this._limiter.schedule(() => fetchAsync(url)); - if (response.status !== HTTP_OK_STATUS) { - throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`); - } - const json: CryptoCompareOHLCVResponse = await response.json(); - if ( - (json.Response === 'Error' || json.Data.length === 0) && - json.Type !== CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE - ) { - throw new Error(JSON.stringify(json)); - } - return json.Data.filter(rec => { - return ( - // Crypto Compare takes ~30 mins to finalise records - rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime && hasData(rec) - ); - }); - } - public generateBackfillIntervals(pair: TradingPair): TradingPair[] { - const now = new Date().getTime(); - const f = (p: TradingPair): false | [TradingPair, TradingPair] => { - if (p.latestSavedTime > now) { - return false; - } else { - return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })]; - } - }; - return R.unfold(f, pair); - } -} - -function hasData(record: CryptoCompareOHLCVRecord): boolean { - return ( - record.close !== 0 || - record.open !== 0 || - record.high !== 0 || - record.low !== 0 || - record.volumefrom !== 0 || - record.volumeto !== 0 - ); -} diff --git a/packages/pipeline/src/data_sources/paradex/index.ts b/packages/pipeline/src/data_sources/paradex/index.ts deleted file mode 100644 index 46d448f4b..000000000 --- a/packages/pipeline/src/data_sources/paradex/index.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { fetchAsync, logUtils } from '@0x/utils'; - -const PARADEX_BASE_URL = 'https://api.paradex.io/consumer/v0'; -const ACTIVE_MARKETS_URL = `${PARADEX_BASE_URL}/markets`; -const ORDERBOOK_ENDPOINT = `${PARADEX_BASE_URL}/orderbook`; -const TOKEN_INFO_ENDPOINT = `${PARADEX_BASE_URL}/tokens`; -export const PARADEX_SOURCE = 'paradex'; - -export type ParadexActiveMarketsResponse = ParadexMarket[]; - -export interface ParadexMarket { - id: string; - symbol: string; - baseToken: string; - quoteToken: string; - minOrderSize: string; - maxOrderSize: string; - priceMaxDecimals: number; - amountMaxDecimals: number; - // These are not native to the Paradex API response. We tag them on later - // by calling the token endpoint and joining on symbol. - baseTokenAddress?: string; - quoteTokenAddress?: string; -} - -export interface ParadexOrderbookResponse { - marketId: number; - marketSymbol: string; - bids: ParadexOrder[]; - asks: ParadexOrder[]; -} - -export interface ParadexOrder { - amount: string; - price: string; -} - -export type ParadexTokenInfoResponse = ParadexTokenInfo[]; - -export interface ParadexTokenInfo { - name: string; - symbol: string; - address: string; -} - -export class ParadexSource { - private readonly _apiKey: string; - - constructor(apiKey: string) { - this._apiKey = apiKey; - } - - /** - * Call Paradex API to find out which markets they are maintaining orderbooks for. - */ - public async getActiveMarketsAsync(): Promise<ParadexActiveMarketsResponse> { - logUtils.log('Getting all active Paradex markets.'); - const resp = await fetchAsync(ACTIVE_MARKETS_URL, { - headers: { 'API-KEY': this._apiKey }, - }); - const markets: ParadexActiveMarketsResponse = await resp.json(); - logUtils.log(`Got ${markets.length} markets.`); - return markets; - } - - /** - * Call Paradex API to find out their token information. - */ - public async getTokenInfoAsync(): Promise<ParadexTokenInfoResponse> { - logUtils.log('Getting token information from Paradex.'); - const resp = await fetchAsync(TOKEN_INFO_ENDPOINT, { - headers: { 'API-KEY': this._apiKey }, - }); - const tokens: ParadexTokenInfoResponse = await resp.json(); - logUtils.log(`Got information for ${tokens.length} tokens.`); - return tokens; - } - - /** - * Retrieve orderbook from Paradex API for a given market. - * @param marketSymbol String representing the market we want data for. - */ - public async getMarketOrderbookAsync(marketSymbol: string): Promise<ParadexOrderbookResponse> { - logUtils.log(`${marketSymbol}: Retrieving orderbook.`); - const marketOrderbookUrl = `${ORDERBOOK_ENDPOINT}?market=${marketSymbol}`; - const resp = await fetchAsync(marketOrderbookUrl, { - headers: { 'API-KEY': this._apiKey }, - }); - const orderbookResponse: ParadexOrderbookResponse = await resp.json(); - return orderbookResponse; - } -} diff --git a/packages/pipeline/src/data_sources/relayer-registry/index.ts b/packages/pipeline/src/data_sources/relayer-registry/index.ts deleted file mode 100644 index 8133f5eae..000000000 --- a/packages/pipeline/src/data_sources/relayer-registry/index.ts +++ /dev/null @@ -1,33 +0,0 @@ -import axios from 'axios'; - -export interface RelayerResponse { - name: string; - homepage_url: string; - app_url: string; - header_img: string; - logo_img: string; - networks: RelayerResponseNetwork[]; -} - -export interface RelayerResponseNetwork { - networkId: number; - sra_http_endpoint?: string; - sra_ws_endpoint?: string; - static_order_fields?: { - fee_recipient_addresses?: string[]; - taker_addresses?: string[]; - }; -} - -export class RelayerRegistrySource { - private readonly _url: string; - - constructor(url: string) { - this._url = url; - } - - public async getRelayerInfoAsync(): Promise<Map<string, RelayerResponse>> { - const resp = await axios.get<Map<string, RelayerResponse>>(this._url); - return resp.data; - } -} diff --git a/packages/pipeline/src/data_sources/trusted_tokens/index.ts b/packages/pipeline/src/data_sources/trusted_tokens/index.ts deleted file mode 100644 index 552739fb9..000000000 --- a/packages/pipeline/src/data_sources/trusted_tokens/index.ts +++ /dev/null @@ -1,29 +0,0 @@ -import axios from 'axios'; - -export interface ZeroExTrustedTokenMeta { - address: string; - name: string; - symbol: string; - decimals: number; -} - -export interface MetamaskTrustedTokenMeta { - address: string; - name: string; - erc20: boolean; - symbol: string; - decimals: number; -} - -export class TrustedTokenSource<T> { - private readonly _url: string; - - constructor(url: string) { - this._url = url; - } - - public async getTrustedTokenMetaAsync(): Promise<T> { - const resp = await axios.get<T>(this._url); - return resp.data; - } -} diff --git a/packages/pipeline/src/data_sources/web3/index.ts b/packages/pipeline/src/data_sources/web3/index.ts deleted file mode 100644 index 45a9ea161..000000000 --- a/packages/pipeline/src/data_sources/web3/index.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Web3ProviderEngine } from '@0x/subproviders'; -import { Web3Wrapper } from '@0x/web3-wrapper'; -import { BlockWithoutTransactionData, Transaction } from 'ethereum-types'; - -export class Web3Source { - private readonly _web3Wrapper: Web3Wrapper; - constructor(provider: Web3ProviderEngine) { - this._web3Wrapper = new Web3Wrapper(provider); - } - - public async getBlockInfoAsync(blockNumber: number): Promise<BlockWithoutTransactionData> { - const block = await this._web3Wrapper.getBlockIfExistsAsync(blockNumber); - if (block == null) { - return Promise.reject(new Error(`Could not find block for given block number: ${blockNumber}`)); - } - return block; - } - - public async getTransactionInfoAsync(txHash: string): Promise<Transaction> { - return this._web3Wrapper.getTransactionByHashAsync(txHash); - } -} diff --git a/packages/pipeline/src/entities/block.ts b/packages/pipeline/src/entities/block.ts deleted file mode 100644 index 398946622..000000000 --- a/packages/pipeline/src/entities/block.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'blocks', schema: 'raw' }) -export class Block { - @PrimaryColumn() public hash!: string; - @PrimaryColumn({ transformer: numberToBigIntTransformer }) - public number!: number; - - @Column({ name: 'timestamp', transformer: numberToBigIntTransformer }) - public timestamp!: number; -} diff --git a/packages/pipeline/src/entities/copper_activity.ts b/packages/pipeline/src/entities/copper_activity.ts deleted file mode 100644 index cbc034285..000000000 --- a/packages/pipeline/src/entities/copper_activity.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { Column, Entity, Index, PrimaryColumn } from 'typeorm'; - -import { numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'copper_activities', schema: 'raw' }) -export class CopperActivity { - @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer }) - public id!: number; - - @Index() - @Column({ name: 'parent_id', type: 'bigint', transformer: numberToBigIntTransformer }) - public parentId!: number; - @Column({ name: 'parent_type', type: 'varchar' }) - public parentType!: string; - - // join with CopperActivityType - @Index() - @Column({ name: 'type_id', type: 'bigint', transformer: numberToBigIntTransformer }) - public typeId!: number; - @Column({ name: 'type_category', type: 'varchar' }) - public typeCategory!: string; - @Column({ name: 'type_name', type: 'varchar', nullable: true }) - public typeName?: string; - - @Column({ name: 'user_id', type: 'bigint', transformer: numberToBigIntTransformer }) - public userId!: number; - @Column({ name: 'old_value_id', type: 'bigint', nullable: true, transformer: numberToBigIntTransformer }) - public oldValueId?: number; - @Column({ name: 'old_value_name', type: 'varchar', nullable: true }) - public oldValueName?: string; - @Column({ name: 'new_value_id', type: 'bigint', nullable: true, transformer: numberToBigIntTransformer }) - public newValueId?: number; - @Column({ name: 'new_value_name', type: 'varchar', nullable: true }) - public newValueName?: string; - - @Index() - @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer }) - public dateCreated!: number; - @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer }) - public dateModified!: number; -} diff --git a/packages/pipeline/src/entities/copper_activity_type.ts b/packages/pipeline/src/entities/copper_activity_type.ts deleted file mode 100644 index 8fb2dcf70..000000000 --- a/packages/pipeline/src/entities/copper_activity_type.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'copper_activity_types', schema: 'raw' }) -export class CopperActivityType { - @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer }) - public id!: number; - @Column({ name: 'category', type: 'varchar' }) - public category!: string; - @Column({ name: 'name', type: 'varchar' }) - public name!: string; - @Column({ name: 'is_disabled', type: 'boolean', nullable: true }) - public isDisabled?: boolean; - @Column({ name: 'count_as_interaction', type: 'boolean', nullable: true }) - public countAsInteraction?: boolean; -} diff --git a/packages/pipeline/src/entities/copper_custom_field.ts b/packages/pipeline/src/entities/copper_custom_field.ts deleted file mode 100644 index f23f6ab22..000000000 --- a/packages/pipeline/src/entities/copper_custom_field.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'copper_custom_fields', schema: 'raw' }) -export class CopperCustomField { - @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer }) - public id!: number; - @Column({ name: 'data_type', type: 'varchar' }) - public dataType!: string; - @Column({ name: 'field_type', type: 'varchar', nullable: true }) - public fieldType?: string; - @Column({ name: 'name', type: 'varchar' }) - public name!: string; -} diff --git a/packages/pipeline/src/entities/copper_lead.ts b/packages/pipeline/src/entities/copper_lead.ts deleted file mode 100644 index c51ccd761..000000000 --- a/packages/pipeline/src/entities/copper_lead.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { Column, Entity, Index, PrimaryColumn } from 'typeorm'; - -import { numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'copper_leads', schema: 'raw' }) -export class CopperLead { - @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer }) - public id!: number; - - @Column({ name: 'name', type: 'varchar', nullable: true }) - public name?: string; - @Column({ name: 'first_name', type: 'varchar', nullable: true }) - public firstName?: string; - @Column({ name: 'last_name', type: 'varchar', nullable: true }) - public lastName?: string; - @Column({ name: 'middle_name', type: 'varchar', nullable: true }) - public middleName?: string; - @Column({ name: 'assignee_id', type: 'bigint', transformer: numberToBigIntTransformer, nullable: true }) - public assigneeId?: number; - @Column({ name: 'company_name', type: 'varchar', nullable: true }) - public companyName?: string; - @Column({ name: 'customer_source_id', type: 'bigint', transformer: numberToBigIntTransformer, nullable: true }) - public customerSourceId?: number; - @Column({ name: 'monetary_value', type: 'integer', nullable: true }) - public monetaryValue?: number; - @Column({ name: 'status', type: 'varchar' }) - public status!: string; - @Column({ name: 'status_id', type: 'bigint', transformer: numberToBigIntTransformer }) - public statusId!: number; - @Column({ name: 'title', type: 'varchar', nullable: true }) - public title?: string; - - @Index() - @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer }) - public dateCreated!: number; - @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer }) - public dateModified!: number; -} diff --git a/packages/pipeline/src/entities/copper_opportunity.ts b/packages/pipeline/src/entities/copper_opportunity.ts deleted file mode 100644 index e12bd69ce..000000000 --- a/packages/pipeline/src/entities/copper_opportunity.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'copper_opportunities', schema: 'raw' }) -export class CopperOpportunity { - @PrimaryColumn({ name: 'id', type: 'bigint', transformer: numberToBigIntTransformer }) - public id!: number; - @Column({ name: 'name', type: 'varchar' }) - public name!: string; - @Column({ name: 'assignee_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) - public assigneeId?: number; - @Column({ name: 'close_date', nullable: true, type: 'varchar' }) - public closeDate?: string; - @Column({ name: 'company_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) - public companyId?: number; - @Column({ name: 'company_name', nullable: true, type: 'varchar' }) - public companyName?: string; - @Column({ name: 'customer_source_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) - public customerSourceId?: number; - @Column({ name: 'loss_reason_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) - public lossReasonId?: number; - @Column({ name: 'pipeline_id', type: 'bigint', transformer: numberToBigIntTransformer }) - public pipelineId!: number; - @Column({ name: 'pipeline_stage_id', type: 'bigint', transformer: numberToBigIntTransformer }) - public pipelineStageId!: number; - @Column({ name: 'primary_contact_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) - public primaryContactId?: number; - @Column({ name: 'priority', nullable: true, type: 'varchar' }) - public priority?: string; - @Column({ name: 'status', type: 'varchar' }) - public status!: string; - @Column({ name: 'interaction_count', type: 'bigint', transformer: numberToBigIntTransformer }) - public interactionCount!: number; - @Column({ name: 'monetary_value', nullable: true, type: 'integer' }) - public monetaryValue?: number; - @Column({ name: 'win_probability', nullable: true, type: 'integer' }) - public winProbability?: number; - @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer }) - public dateCreated!: number; - @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer }) - public dateModified!: number; - @Column({ name: 'custom_fields', type: 'jsonb' }) - public customFields!: { [key: number]: number }; -} diff --git a/packages/pipeline/src/entities/dex_trade.ts b/packages/pipeline/src/entities/dex_trade.ts deleted file mode 100644 index 9d288cb51..000000000 --- a/packages/pipeline/src/entities/dex_trade.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'dex_trades', schema: 'raw' }) -export class DexTrade { - @PrimaryColumn({ name: 'source_url' }) - public sourceUrl!: string; - @PrimaryColumn({ name: 'tx_hash' }) - public txHash!: string; - - @Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer }) - public txTimestamp!: number; - @Column({ name: 'tx_date' }) - public txDate!: string; - @Column({ name: 'tx_sender' }) - public txSender!: string; - @Column({ name: 'smart_contract_id', type: 'bigint', transformer: numberToBigIntTransformer }) - public smartContractId!: number; - @Column({ name: 'smart_contract_address' }) - public smartContractAddress!: string; - @Column({ name: 'contract_type' }) - public contractType!: string; - @Column({ type: 'varchar' }) - public maker!: string; - @Column({ type: 'varchar' }) - public taker!: string; - @Column({ name: 'amount_buy', type: 'numeric', transformer: bigNumberTransformer }) - public amountBuy!: BigNumber; - @Column({ name: 'maker_fee_amount', type: 'numeric', transformer: bigNumberTransformer }) - public makerFeeAmount!: BigNumber; - @Column({ name: 'buy_currency_id', type: 'bigint', transformer: numberToBigIntTransformer }) - public buyCurrencyId!: number; - @Column({ name: 'buy_symbol' }) - public buySymbol!: string; - @Column({ name: 'amount_sell', type: 'numeric', transformer: bigNumberTransformer }) - public amountSell!: BigNumber; - @Column({ name: 'taker_fee_amount', type: 'numeric', transformer: bigNumberTransformer }) - public takerFeeAmount!: BigNumber; - @Column({ name: 'sell_currency_id', type: 'bigint', transformer: numberToBigIntTransformer }) - public sellCurrencyId!: number; - @Column({ name: 'sell_symbol' }) - public sellSymbol!: string; - @Column({ name: 'maker_annotation' }) - public makerAnnotation!: string; - @Column({ name: 'taker_annotation' }) - public takerAnnotation!: string; - @Column() public protocol!: string; - @Column({ name: 'buy_address', type: 'varchar', nullable: true }) - public buyAddress!: string | null; - @Column({ name: 'sell_address', type: 'varchar', nullable: true }) - public sellAddress!: string | null; -} diff --git a/packages/pipeline/src/entities/erc20_approval_event.ts b/packages/pipeline/src/entities/erc20_approval_event.ts deleted file mode 100644 index 69cdfcb0b..000000000 --- a/packages/pipeline/src/entities/erc20_approval_event.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'erc20_approval_events', schema: 'raw' }) -export class ERC20ApprovalEvent { - @PrimaryColumn({ name: 'token_address' }) - public tokenAddress!: string; - @PrimaryColumn({ name: 'log_index' }) - public logIndex!: number; - @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) - public blockNumber!: number; - - @Column({ name: 'raw_data' }) - public rawData!: string; - - @Column({ name: 'transaction_hash' }) - public transactionHash!: string; - @Column({ name: 'owner_address' }) - public ownerAddress!: string; - @Column({ name: 'spender_address' }) - public spenderAddress!: string; - @Column({ name: 'amount', type: 'numeric', transformer: bigNumberTransformer }) - public amount!: BigNumber; -} diff --git a/packages/pipeline/src/entities/exchange_cancel_event.ts b/packages/pipeline/src/entities/exchange_cancel_event.ts deleted file mode 100644 index 38f99c903..000000000 --- a/packages/pipeline/src/entities/exchange_cancel_event.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { AssetType } from '../types'; -import { numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'exchange_cancel_events', schema: 'raw' }) -export class ExchangeCancelEvent { - @PrimaryColumn({ name: 'contract_address' }) - public contractAddress!: string; - @PrimaryColumn({ name: 'log_index' }) - public logIndex!: number; - @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) - public blockNumber!: number; - - @Column({ name: 'raw_data' }) - public rawData!: string; - - @Column({ name: 'transaction_hash' }) - public transactionHash!: string; - @Column({ name: 'maker_address' }) - public makerAddress!: string; - @Column({ nullable: true, type: String, name: 'taker_address' }) - public takerAddress!: string; - @Column({ name: 'fee_recipient_address' }) - public feeRecipientAddress!: string; - @Column({ name: 'sender_address' }) - public senderAddress!: string; - @Column({ name: 'order_hash' }) - public orderHash!: string; - - @Column({ name: 'raw_maker_asset_data' }) - public rawMakerAssetData!: string; - @Column({ name: 'maker_asset_type' }) - public makerAssetType!: AssetType; - @Column({ name: 'maker_asset_proxy_id' }) - public makerAssetProxyId!: string; - @Column({ name: 'maker_token_address' }) - public makerTokenAddress!: string; - @Column({ nullable: true, type: String, name: 'maker_token_id' }) - public makerTokenId!: string | null; - @Column({ name: 'raw_taker_asset_data' }) - public rawTakerAssetData!: string; - @Column({ name: 'taker_asset_type' }) - public takerAssetType!: AssetType; - @Column({ name: 'taker_asset_proxy_id' }) - public takerAssetProxyId!: string; - @Column({ name: 'taker_token_address' }) - public takerTokenAddress!: string; - @Column({ nullable: true, type: String, name: 'taker_token_id' }) - public takerTokenId!: string | null; -} diff --git a/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts b/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts deleted file mode 100644 index 27580305e..000000000 --- a/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'exchange_cancel_up_to_events', schema: 'raw' }) -export class ExchangeCancelUpToEvent { - @PrimaryColumn({ name: 'contract_address' }) - public contractAddress!: string; - @PrimaryColumn({ name: 'log_index' }) - public logIndex!: number; - @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) - public blockNumber!: number; - - @Column({ name: 'raw_data' }) - public rawData!: string; - - @Column({ name: 'transaction_hash' }) - public transactionHash!: string; - @Column({ name: 'maker_address' }) - public makerAddress!: string; - @Column({ name: 'sender_address' }) - public senderAddress!: string; - @Column({ name: 'order_epoch', type: 'numeric', transformer: bigNumberTransformer }) - public orderEpoch!: BigNumber; -} diff --git a/packages/pipeline/src/entities/exchange_fill_event.ts b/packages/pipeline/src/entities/exchange_fill_event.ts deleted file mode 100644 index 9b7727615..000000000 --- a/packages/pipeline/src/entities/exchange_fill_event.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { AssetType } from '../types'; -import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'exchange_fill_events', schema: 'raw' }) -export class ExchangeFillEvent { - @PrimaryColumn({ name: 'contract_address' }) - public contractAddress!: string; - @PrimaryColumn({ name: 'log_index' }) - public logIndex!: number; - @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) - public blockNumber!: number; - - @Column({ name: 'raw_data' }) - public rawData!: string; - - @Column({ name: 'transaction_hash' }) - public transactionHash!: string; - @Column({ name: 'maker_address' }) - public makerAddress!: string; - @Column({ name: 'taker_address' }) - public takerAddress!: string; - @Column({ name: 'fee_recipient_address' }) - public feeRecipientAddress!: string; - @Column({ name: 'sender_address' }) - public senderAddress!: string; - @Column({ name: 'maker_asset_filled_amount', type: 'numeric', transformer: bigNumberTransformer }) - public makerAssetFilledAmount!: BigNumber; - @Column({ name: 'taker_asset_filled_amount', type: 'numeric', transformer: bigNumberTransformer }) - public takerAssetFilledAmount!: BigNumber; - @Column({ name: 'maker_fee_paid', type: 'numeric', transformer: bigNumberTransformer }) - public makerFeePaid!: BigNumber; - @Column({ name: 'taker_fee_paid', type: 'numeric', transformer: bigNumberTransformer }) - public takerFeePaid!: BigNumber; - @Column({ name: 'order_hash' }) - public orderHash!: string; - - @Column({ name: 'raw_maker_asset_data' }) - public rawMakerAssetData!: string; - @Column({ name: 'maker_asset_type' }) - public makerAssetType!: AssetType; - @Column({ name: 'maker_asset_proxy_id' }) - public makerAssetProxyId!: string; - @Column({ name: 'maker_token_address' }) - public makerTokenAddress!: string; - @Column({ nullable: true, type: String, name: 'maker_token_id' }) - public makerTokenId!: string | null; - @Column({ name: 'raw_taker_asset_data' }) - public rawTakerAssetData!: string; - @Column({ name: 'taker_asset_type' }) - public takerAssetType!: AssetType; - @Column({ name: 'taker_asset_proxy_id' }) - public takerAssetProxyId!: string; - @Column({ name: 'taker_token_address' }) - public takerTokenAddress!: string; - @Column({ nullable: true, type: String, name: 'taker_token_id' }) - public takerTokenId!: string | null; -} diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts deleted file mode 100644 index 27c153c07..000000000 --- a/packages/pipeline/src/entities/index.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { ExchangeCancelEvent } from './exchange_cancel_event'; -import { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event'; -import { ExchangeFillEvent } from './exchange_fill_event'; - -export { Block } from './block'; -export { DexTrade } from './dex_trade'; -export { ExchangeCancelEvent } from './exchange_cancel_event'; -export { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event'; -export { ExchangeFillEvent } from './exchange_fill_event'; -export { OHLCVExternal } from './ohlcv_external'; -export { Relayer } from './relayer'; -export { SraOrder } from './sra_order'; -export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp'; -export { TokenMetadata } from './token_metadata'; -export { TokenOrderbookSnapshot } from './token_order'; -export { Transaction } from './transaction'; -export { ERC20ApprovalEvent } from './erc20_approval_event'; - -export { CopperLead } from './copper_lead'; -export { CopperActivity } from './copper_activity'; -export { CopperOpportunity } from './copper_opportunity'; -export { CopperActivityType } from './copper_activity_type'; -export { CopperCustomField } from './copper_custom_field'; - -export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; diff --git a/packages/pipeline/src/entities/ohlcv_external.ts b/packages/pipeline/src/entities/ohlcv_external.ts deleted file mode 100644 index 4f55dd930..000000000 --- a/packages/pipeline/src/entities/ohlcv_external.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'ohlcv_external', schema: 'raw' }) -export class OHLCVExternal { - @PrimaryColumn() public exchange!: string; - - @PrimaryColumn({ name: 'from_symbol', type: 'varchar' }) - public fromSymbol!: string; - @PrimaryColumn({ name: 'to_symbol', type: 'varchar' }) - public toSymbol!: string; - @PrimaryColumn({ name: 'start_time', transformer: numberToBigIntTransformer }) - public startTime!: number; - @PrimaryColumn({ name: 'end_time', transformer: numberToBigIntTransformer }) - public endTime!: number; - - @Column() public open!: number; - @Column() public close!: number; - @Column() public low!: number; - @Column() public high!: number; - @Column({ name: 'volume_from' }) - public volumeFrom!: number; - @Column({ name: 'volume_to' }) - public volumeTo!: number; - - @PrimaryColumn() public source!: string; - @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer }) - public observedTimestamp!: number; -} diff --git a/packages/pipeline/src/entities/relayer.ts b/packages/pipeline/src/entities/relayer.ts deleted file mode 100644 index 5af8578b4..000000000 --- a/packages/pipeline/src/entities/relayer.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -@Entity({ name: 'relayers', schema: 'raw' }) -export class Relayer { - @PrimaryColumn() public uuid!: string; - - @Column() public name!: string; - @Column({ name: 'homepage_url', type: 'varchar' }) - public homepageUrl!: string; - @Column({ name: 'sra_http_endpoint', type: 'varchar', nullable: true }) - public sraHttpEndpoint!: string | null; - @Column({ name: 'sra_ws_endpoint', type: 'varchar', nullable: true }) - public sraWsEndpoint!: string | null; - @Column({ name: 'app_url', type: 'varchar', nullable: true }) - public appUrl!: string | null; - - @Column({ name: 'fee_recipient_addresses', type: 'varchar', array: true }) - public feeRecipientAddresses!: string[]; - @Column({ name: 'taker_addresses', type: 'varchar', array: true }) - public takerAddresses!: string[]; -} diff --git a/packages/pipeline/src/entities/sra_order.ts b/packages/pipeline/src/entities/sra_order.ts deleted file mode 100644 index 9c730a0bb..000000000 --- a/packages/pipeline/src/entities/sra_order.ts +++ /dev/null @@ -1,63 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { AssetType } from '../types'; -import { bigNumberTransformer } from '../utils'; - -@Entity({ name: 'sra_orders', schema: 'raw' }) -export class SraOrder { - @PrimaryColumn({ name: 'exchange_address' }) - public exchangeAddress!: string; - @PrimaryColumn({ name: 'order_hash_hex' }) - public orderHashHex!: string; - @PrimaryColumn({ name: 'source_url' }) - public sourceUrl!: string; - - @Column({ name: 'maker_address' }) - public makerAddress!: string; - @Column({ name: 'taker_address' }) - public takerAddress!: string; - @Column({ name: 'fee_recipient_address' }) - public feeRecipientAddress!: string; - @Column({ name: 'sender_address' }) - public senderAddress!: string; - @Column({ name: 'maker_asset_amount', type: 'numeric', transformer: bigNumberTransformer }) - public makerAssetAmount!: BigNumber; - @Column({ name: 'taker_asset_amount', type: 'numeric', transformer: bigNumberTransformer }) - public takerAssetAmount!: BigNumber; - @Column({ name: 'maker_fee', type: 'numeric', transformer: bigNumberTransformer }) - public makerFee!: BigNumber; - @Column({ name: 'taker_fee', type: 'numeric', transformer: bigNumberTransformer }) - public takerFee!: BigNumber; - @Column({ name: 'expiration_time_seconds', type: 'numeric', transformer: bigNumberTransformer }) - public expirationTimeSeconds!: BigNumber; - @Column({ name: 'salt', type: 'numeric', transformer: bigNumberTransformer }) - public salt!: BigNumber; - @Column({ name: 'signature' }) - public signature!: string; - - @Column({ name: 'raw_maker_asset_data' }) - public rawMakerAssetData!: string; - @Column({ name: 'maker_asset_type' }) - public makerAssetType!: AssetType; - @Column({ name: 'maker_asset_proxy_id' }) - public makerAssetProxyId!: string; - @Column({ name: 'maker_token_address' }) - public makerTokenAddress!: string; - @Column({ nullable: true, type: String, name: 'maker_token_id' }) - public makerTokenId!: string | null; - @Column({ name: 'raw_taker_asset_data' }) - public rawTakerAssetData!: string; - @Column({ name: 'taker_asset_type' }) - public takerAssetType!: AssetType; - @Column({ name: 'taker_asset_proxy_id' }) - public takerAssetProxyId!: string; - @Column({ name: 'taker_token_address' }) - public takerTokenAddress!: string; - @Column({ nullable: true, type: String, name: 'taker_token_id' }) - public takerTokenId!: string | null; - - // TODO(albrow): Make this optional? - @Column({ name: 'metadata_json' }) - public metadataJson!: string; -} diff --git a/packages/pipeline/src/entities/sra_order_observed_timestamp.ts b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts deleted file mode 100644 index cbec1c6d0..000000000 --- a/packages/pipeline/src/entities/sra_order_observed_timestamp.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { Entity, PrimaryColumn } from 'typeorm'; - -import { numberToBigIntTransformer } from '../utils'; - -import { SraOrder } from './sra_order'; - -@Entity({ name: 'sra_orders_observed_timestamps', schema: 'raw' }) -export class SraOrdersObservedTimeStamp { - @PrimaryColumn({ name: 'exchange_address' }) - public exchangeAddress!: string; - @PrimaryColumn({ name: 'order_hash_hex' }) - public orderHashHex!: string; - @PrimaryColumn({ name: 'source_url' }) - public sourceUrl!: string; - - @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer }) - public observedTimestamp!: number; -} - -/** - * Returns a new SraOrdersObservedTimeStamp for the given order based on the - * current time. - * @param order The order to generate a timestamp for. - */ -export function createObservedTimestampForOrder( - order: SraOrder, - observedTimestamp: number, -): SraOrdersObservedTimeStamp { - const observed = new SraOrdersObservedTimeStamp(); - observed.exchangeAddress = order.exchangeAddress; - observed.orderHashHex = order.orderHashHex; - observed.sourceUrl = order.sourceUrl; - observed.observedTimestamp = observedTimestamp; - return observed; -} diff --git a/packages/pipeline/src/entities/token_metadata.ts b/packages/pipeline/src/entities/token_metadata.ts deleted file mode 100644 index 911b53972..000000000 --- a/packages/pipeline/src/entities/token_metadata.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { bigNumberTransformer } from '../utils/transformers'; - -@Entity({ name: 'token_metadata', schema: 'raw' }) -export class TokenMetadata { - @PrimaryColumn({ type: 'varchar', nullable: false }) - public address!: string; - - @PrimaryColumn({ type: 'varchar', nullable: false }) - public authority!: string; - - @Column({ type: 'numeric', transformer: bigNumberTransformer, nullable: true }) - public decimals!: BigNumber | null; - - @Column({ type: 'varchar', nullable: true }) - public symbol!: string | null; - - @Column({ type: 'varchar', nullable: true }) - public name!: string | null; -} diff --git a/packages/pipeline/src/entities/token_order.ts b/packages/pipeline/src/entities/token_order.ts deleted file mode 100644 index 2709747cb..000000000 --- a/packages/pipeline/src/entities/token_order.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'token_orderbook_snapshots', schema: 'raw' }) -export class TokenOrderbookSnapshot { - @PrimaryColumn({ name: 'observed_timestamp', type: 'bigint', transformer: numberToBigIntTransformer }) - public observedTimestamp!: number; - @PrimaryColumn({ name: 'source' }) - public source!: string; - @PrimaryColumn({ name: 'order_type' }) - public orderType!: string; - @PrimaryColumn({ name: 'price', type: 'numeric', transformer: bigNumberTransformer }) - public price!: BigNumber; - @PrimaryColumn({ name: 'base_asset_symbol' }) - public baseAssetSymbol!: string; - @Column({ nullable: true, type: String, name: 'base_asset_address' }) - public baseAssetAddress!: string | null; - @Column({ name: 'base_volume', type: 'numeric', transformer: bigNumberTransformer }) - public baseVolume!: BigNumber; - @PrimaryColumn({ name: 'quote_asset_symbol' }) - public quoteAssetSymbol!: string; - @Column({ nullable: true, type: String, name: 'quote_asset_address' }) - public quoteAssetAddress!: string | null; - @Column({ name: 'quote_volume', type: 'numeric', transformer: bigNumberTransformer }) - public quoteVolume!: BigNumber; -} diff --git a/packages/pipeline/src/entities/transaction.ts b/packages/pipeline/src/entities/transaction.ts deleted file mode 100644 index 742050177..000000000 --- a/packages/pipeline/src/entities/transaction.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { Column, Entity, PrimaryColumn } from 'typeorm'; - -import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; - -@Entity({ name: 'transactions', schema: 'raw' }) -export class Transaction { - @PrimaryColumn({ name: 'transaction_hash' }) - public transactionHash!: string; - @PrimaryColumn({ name: 'block_hash' }) - public blockHash!: string; - @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) - public blockNumber!: number; - - @Column({ type: 'numeric', name: 'gas_used', transformer: bigNumberTransformer }) - public gasUsed!: BigNumber; - @Column({ type: 'numeric', name: 'gas_price', transformer: bigNumberTransformer }) - public gasPrice!: BigNumber; -} diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts deleted file mode 100644 index 2700714cd..000000000 --- a/packages/pipeline/src/ormconfig.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { ConnectionOptions } from 'typeorm'; - -import { - Block, - CopperActivity, - CopperActivityType, - CopperCustomField, - CopperLead, - CopperOpportunity, - DexTrade, - ERC20ApprovalEvent, - ExchangeCancelEvent, - ExchangeCancelUpToEvent, - ExchangeFillEvent, - OHLCVExternal, - Relayer, - SraOrder, - SraOrdersObservedTimeStamp, - TokenMetadata, - TokenOrderbookSnapshot, - Transaction, -} from './entities'; - -const entities = [ - Block, - CopperOpportunity, - CopperActivity, - CopperActivityType, - CopperCustomField, - CopperLead, - DexTrade, - ExchangeCancelEvent, - ExchangeCancelUpToEvent, - ExchangeFillEvent, - ERC20ApprovalEvent, - OHLCVExternal, - Relayer, - SraOrder, - SraOrdersObservedTimeStamp, - TokenMetadata, - TokenOrderbookSnapshot, - Transaction, -]; - -const config: ConnectionOptions = { - type: 'postgres', - url: process.env.ZEROEX_DATA_PIPELINE_DB_URL, - synchronize: false, - logging: ['error'], - entities, - migrations: ['./lib/migrations/**/*.js'], -}; - -module.exports = config; diff --git a/packages/pipeline/src/parsers/bloxy/index.ts b/packages/pipeline/src/parsers/bloxy/index.ts deleted file mode 100644 index caa55d289..000000000 --- a/packages/pipeline/src/parsers/bloxy/index.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import * as R from 'ramda'; - -import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../data_sources/bloxy'; -import { DexTrade } from '../../entities'; - -/** - * Parses a raw trades response from the Bloxy Dex API and returns an array of - * DexTrade entities. - * @param rawTrades A raw order response from an SRA endpoint. - */ -export function parseBloxyTrades(rawTrades: BloxyTrade[]): DexTrade[] { - return R.map(_parseBloxyTrade, rawTrades); -} - -/** - * Converts a single Bloxy trade into a DexTrade entity. - * @param rawTrade A single trade from the response from the Bloxy API. - */ -export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade { - const dexTrade = new DexTrade(); - dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL; - dexTrade.txHash = rawTrade.tx_hash; - dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime(); - dexTrade.txDate = rawTrade.tx_date; - dexTrade.txSender = rawTrade.tx_sender; - dexTrade.smartContractId = rawTrade.smart_contract_id; - dexTrade.smartContractAddress = rawTrade.smart_contract_address; - dexTrade.contractType = rawTrade.contract_type; - dexTrade.maker = rawTrade.maker; - dexTrade.taker = rawTrade.taker; - // TODO(albrow): The Bloxy API returns amounts and fees as a `number` type - // but some of their values have too many significant digits to be - // represented that way. Ideally they will switch to using strings and then - // we can update this code. - dexTrade.amountBuy = new BigNumber(rawTrade.amountBuy.toString()); - dexTrade.makerFeeAmount = new BigNumber(rawTrade.makerFee.toString()); - dexTrade.buyCurrencyId = rawTrade.buyCurrencyId; - dexTrade.buySymbol = filterNullCharacters(rawTrade.buySymbol); - dexTrade.amountSell = new BigNumber(rawTrade.amountSell.toString()); - dexTrade.takerFeeAmount = new BigNumber(rawTrade.takerFee.toString()); - dexTrade.sellCurrencyId = rawTrade.sellCurrencyId; - dexTrade.sellSymbol = filterNullCharacters(rawTrade.sellSymbol); - dexTrade.makerAnnotation = rawTrade.maker_annotation; - dexTrade.takerAnnotation = rawTrade.taker_annotation; - dexTrade.protocol = rawTrade.protocol; - dexTrade.buyAddress = rawTrade.buyAddress; - dexTrade.sellAddress = rawTrade.sellAddress; - return dexTrade; -} - -// Works with any form of escaped null character (e.g., '\0' and '\u0000'). -const filterNullCharacters = R.replace(/\0/g, ''); diff --git a/packages/pipeline/src/parsers/copper/index.ts b/packages/pipeline/src/parsers/copper/index.ts deleted file mode 100644 index 07da66d10..000000000 --- a/packages/pipeline/src/parsers/copper/index.ts +++ /dev/null @@ -1,259 +0,0 @@ -import * as R from 'ramda'; - -import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../../entities'; - -const ONE_SECOND = 1000; -export type CopperSearchResponse = CopperLeadResponse | CopperActivityResponse | CopperOpportunityResponse; -export interface CopperLeadResponse { - id: number; - name?: string; - first_name?: string; - last_name?: string; - middle_name?: string; - assignee_id?: number; - company_name?: string; - customer_source_id?: number; - monetary_value?: number; - status: string; - status_id: number; - title?: string; - date_created: number; // in seconds - date_modified: number; // in seconds -} - -export interface CopperActivityResponse { - id: number; - parent: CopperActivityParentResponse; - type: CopperActivityTypeResponse; - user_id: number; - activity_date: number; - old_value: CopperActivityValueResponse; - new_value: CopperActivityValueResponse; - date_created: number; // in seconds - date_modified: number; // in seconds -} - -export interface CopperActivityValueResponse { - id: number; - name: string; -} -export interface CopperActivityParentResponse { - id: number; - type: string; -} - -// custom activity types -export enum CopperActivityTypeCategory { - User = 'user', - System = 'system', -} -export interface CopperActivityTypeResponse { - id: number; - category: CopperActivityTypeCategory; - name: string; - is_disabled?: boolean; - count_as_interaction?: boolean; -} - -export interface CopperOpportunityResponse { - id: number; - name: string; - assignee_id?: number; - close_date?: string; - company_id?: number; - company_name?: string; - customer_source_id?: number; - loss_reason_id?: number; - pipeline_id: number; - pipeline_stage_id: number; - primary_contact_id?: number; - priority?: string; - status: string; - tags: string[]; - interaction_count: number; - monetary_value?: number; - win_probability?: number; - date_created: number; // in seconds - date_modified: number; // in seconds - custom_fields: CopperNestedCustomFieldResponse[]; -} -interface CopperNestedCustomFieldResponse { - custom_field_definition_id: number; - value: number | number[] | null; -} -// custom fields -export enum CopperCustomFieldType { - String = 'String', - Text = 'Text', - Dropdown = 'Dropdown', - MultiSelect = 'MultiSelect', // not in API documentation but shows up in results - Date = 'Date', - Checkbox = 'Checkbox', - Float = 'Float', - URL = 'URL', // tslint:disable-line:enum-naming - Percentage = 'Percentage', - Currency = 'Currency', - Connect = 'Connect', -} -export interface CopperCustomFieldOptionResponse { - id: number; - name: string; -} -export interface CopperCustomFieldResponse { - id: number; - name: string; - data_type: CopperCustomFieldType; - options?: CopperCustomFieldOptionResponse[]; -} -/** - * Parse response from Copper API /search/leads/ - * - * @param leads - The array of leads returned from the API - * @returns Returns an array of Copper Lead entities - */ -export function parseLeads(leads: CopperLeadResponse[]): CopperLead[] { - return leads.map(lead => { - const entity = new CopperLead(); - entity.id = lead.id; - entity.name = lead.name || undefined; - entity.firstName = lead.first_name || undefined; - entity.lastName = lead.last_name || undefined; - entity.middleName = lead.middle_name || undefined; - entity.assigneeId = lead.assignee_id || undefined; - entity.companyName = lead.company_name || undefined; - entity.customerSourceId = lead.customer_source_id || undefined; - entity.monetaryValue = lead.monetary_value || undefined; - entity.status = lead.status; - entity.statusId = lead.status_id; - entity.title = lead.title || undefined; - entity.dateCreated = lead.date_created * ONE_SECOND; - entity.dateModified = lead.date_modified * ONE_SECOND; - return entity; - }); -} - -/** - * Parse response from Copper API /search/activities/ - * - * @param activities - The array of activities returned from the API - * @returns Returns an array of Copper Activity entities - */ -export function parseActivities(activities: CopperActivityResponse[]): CopperActivity[] { - return activities.map(activity => { - const entity = new CopperActivity(); - entity.id = activity.id; - - entity.parentId = activity.parent.id; - entity.parentType = activity.parent.type; - - entity.typeId = activity.type.id; - entity.typeCategory = activity.type.category.toString(); - entity.typeName = activity.type.name; - - entity.userId = activity.user_id; - entity.dateCreated = activity.date_created * ONE_SECOND; - entity.dateModified = activity.date_modified * ONE_SECOND; - - // nested nullable fields - entity.oldValueId = R.path(['old_value', 'id'], activity); - entity.oldValueName = R.path(['old_value', 'name'], activity); - entity.newValueId = R.path(['new_value', 'id'], activity); - entity.newValueName = R.path(['new_value', 'name'], activity); - - return entity; - }); -} - -/** - * Parse response from Copper API /search/opportunities/ - * - * @param opportunities - The array of opportunities returned from the API - * @returns Returns an array of Copper Opportunity entities - */ -export function parseOpportunities(opportunities: CopperOpportunityResponse[]): CopperOpportunity[] { - return opportunities.map(opp => { - const customFields: { [key: number]: number } = opp.custom_fields - .filter(f => f.value !== null) - .map(f => ({ - ...f, - value: ([] as number[]).concat(f.value || []), // normalise all values to number[] - })) - .map(f => f.value.map(val => [f.custom_field_definition_id, val] as [number, number])) // pair each value with the custom_field_definition_id - .reduce((acc, pair) => acc.concat(pair)) // flatten - .reduce<{ [key: number]: number }>((obj, [key, value]) => { - // transform into object literal - obj[key] = value; - return obj; - }, {}); - - const entity = new CopperOpportunity(); - entity.id = opp.id; - entity.name = opp.name; - entity.assigneeId = opp.assignee_id || undefined; - entity.closeDate = opp.close_date || undefined; - entity.companyId = opp.company_id || undefined; - entity.companyName = opp.company_name || undefined; - entity.customerSourceId = opp.customer_source_id || undefined; - entity.lossReasonId = opp.loss_reason_id || undefined; - entity.pipelineId = opp.pipeline_id; - entity.pipelineStageId = opp.pipeline_stage_id; - entity.primaryContactId = opp.primary_contact_id || undefined; - entity.priority = opp.priority || undefined; - entity.status = opp.status; - entity.interactionCount = opp.interaction_count; - entity.monetaryValue = opp.monetary_value || undefined; - entity.winProbability = opp.win_probability === null ? undefined : opp.win_probability; - entity.dateCreated = opp.date_created * ONE_SECOND; - entity.dateModified = opp.date_modified * ONE_SECOND; - entity.customFields = customFields; - return entity; - }); -} - -/** - * Parse response from Copper API /activity_types/ - * - * @param activityTypeResponse - Activity Types response from the API, keyed by "user" or "system" - * @returns Returns an array of Copper Activity Type entities - */ -export function parseActivityTypes( - activityTypeResponse: Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>, -): CopperActivityType[] { - const values: CopperActivityTypeResponse[] = R.flatten(Object.values(activityTypeResponse)); - return values.map(activityType => ({ - id: activityType.id, - name: activityType.name, - category: activityType.category.toString(), - isDisabled: activityType.is_disabled, - countAsInteraction: activityType.count_as_interaction, - })); -} - -/** - * Parse response from Copper API /custom_field_definitions/ - * - * @param customFieldResponse - array of custom field definitions returned from the API, consisting of top-level fields and nested fields - * @returns Returns an array of Copper Custom Field entities - */ -export function parseCustomFields(customFieldResponse: CopperCustomFieldResponse[]): CopperCustomField[] { - function parseTopLevelField(field: CopperCustomFieldResponse): CopperCustomField[] { - const topLevelField: CopperCustomField = { - id: field.id, - name: field.name, - dataType: field.data_type.toString(), - }; - - if (field.options !== undefined) { - const nestedFields: CopperCustomField[] = field.options.map(option => ({ - id: option.id, - name: option.name, - dataType: field.name, - fieldType: 'option', - })); - return nestedFields.concat(topLevelField); - } else { - return [topLevelField]; - } - } - return R.chain(parseTopLevelField, customFieldResponse); -} diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts deleted file mode 100644 index 562f894ab..000000000 --- a/packages/pipeline/src/parsers/ddex_orders/index.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { BigNumber } from '@0x/utils'; - -import { aggregateOrders } from '../utils'; - -import { DdexMarket, DdexOrderbook } from '../../data_sources/ddex'; -import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; -import { OrderType } from '../../types'; - -/** - * Marque function of this file. - * 1) Takes in orders from an orderbook, - * other information attached. - * @param ddexOrderbook A raw orderbook that we pull from the Ddex API. - * @param ddexMarket An object containing market data also directly from the API. - * @param observedTimestamp Time at which the orders for the market were pulled. - * @param source The exchange where these orders are placed. In this case 'ddex'. - */ -export function parseDdexOrders( - ddexOrderbook: DdexOrderbook, - ddexMarket: DdexMarket, - observedTimestamp: number, - source: string, -): TokenOrder[] { - const aggregatedBids = aggregateOrders(ddexOrderbook.bids); - const aggregatedAsks = aggregateOrders(ddexOrderbook.asks); - const parsedBids = aggregatedBids.map(order => - parseDdexOrder(ddexMarket, observedTimestamp, OrderType.Bid, source, order), - ); - const parsedAsks = aggregatedAsks.map(order => - parseDdexOrder(ddexMarket, observedTimestamp, OrderType.Ask, source, order), - ); - return parsedBids.concat(parsedAsks); -} - -/** - * Parse a single aggregated Ddex order in order to form a tokenOrder entity - * which can be saved into the database. - * @param ddexMarket An object containing information about the market where these - * trades have been placed. - * @param observedTimestamp The time when the API response returned back to us. - * @param orderType 'bid' or 'ask' enum. - * @param source Exchange where these orders were placed. - * @param ddexOrder A <price, amount> tuple which we will convert to volume-basis. - */ -export function parseDdexOrder( - ddexMarket: DdexMarket, - observedTimestamp: number, - orderType: OrderType, - source: string, - ddexOrder: [string, BigNumber], -): TokenOrder { - const tokenOrder = new TokenOrder(); - const price = new BigNumber(ddexOrder[0]); - const amount = ddexOrder[1]; - - tokenOrder.source = source; - tokenOrder.observedTimestamp = observedTimestamp; - tokenOrder.orderType = orderType; - tokenOrder.price = price; - - tokenOrder.baseAssetSymbol = ddexMarket.baseToken; - tokenOrder.baseAssetAddress = ddexMarket.baseTokenAddress; - tokenOrder.baseVolume = amount; - - tokenOrder.quoteAssetSymbol = ddexMarket.quoteToken; - tokenOrder.quoteAssetAddress = ddexMarket.quoteTokenAddress; - tokenOrder.quoteVolume = price.times(amount); - return tokenOrder; -} diff --git a/packages/pipeline/src/parsers/events/erc20_events.ts b/packages/pipeline/src/parsers/events/erc20_events.ts deleted file mode 100644 index caf9984d0..000000000 --- a/packages/pipeline/src/parsers/events/erc20_events.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { ERC20TokenApprovalEventArgs } from '@0x/contract-wrappers'; -import { LogWithDecodedArgs } from 'ethereum-types'; -import * as R from 'ramda'; - -import { ERC20ApprovalEvent } from '../../entities'; - -/** - * Parses raw event logs for an ERC20 approval event and returns an array of - * ERC20ApprovalEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseERC20ApprovalEvents: ( - eventLogs: Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>, -) => ERC20ApprovalEvent[] = R.map(_convertToERC20ApprovalEvent); - -/** - * Converts a raw event log for an ERC20 approval event into an - * ERC20ApprovalEvent entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToERC20ApprovalEvent( - eventLog: LogWithDecodedArgs<ERC20TokenApprovalEventArgs>, -): ERC20ApprovalEvent { - const erc20ApprovalEvent = new ERC20ApprovalEvent(); - erc20ApprovalEvent.tokenAddress = eventLog.address as string; - erc20ApprovalEvent.blockNumber = eventLog.blockNumber as number; - erc20ApprovalEvent.logIndex = eventLog.logIndex as number; - erc20ApprovalEvent.rawData = eventLog.data as string; - erc20ApprovalEvent.transactionHash = eventLog.transactionHash; - erc20ApprovalEvent.ownerAddress = eventLog.args._owner; - erc20ApprovalEvent.spenderAddress = eventLog.args._spender; - erc20ApprovalEvent.amount = eventLog.args._value; - return erc20ApprovalEvent; -} diff --git a/packages/pipeline/src/parsers/events/exchange_events.ts b/packages/pipeline/src/parsers/events/exchange_events.ts deleted file mode 100644 index 9c4a5f89a..000000000 --- a/packages/pipeline/src/parsers/events/exchange_events.ts +++ /dev/null @@ -1,145 +0,0 @@ -import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers'; -import { assetDataUtils } from '@0x/order-utils'; -import { AssetProxyId, ERC721AssetData } from '@0x/types'; -import { LogWithDecodedArgs } from 'ethereum-types'; -import * as R from 'ramda'; - -import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities'; -import { bigNumbertoStringOrNull, convertAssetProxyIdToType } from '../../utils'; - -/** - * Parses raw event logs for a fill event and returns an array of - * ExchangeFillEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseExchangeFillEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>, -) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent); - -/** - * Parses raw event logs for a cancel event and returns an array of - * ExchangeCancelEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseExchangeCancelEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>, -) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent); - -/** - * Parses raw event logs for a CancelUpTo event and returns an array of - * ExchangeCancelUpToEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseExchangeCancelUpToEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>, -) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent); - -/** - * Converts a raw event log for a fill event into an ExchangeFillEvent entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent { - const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); - const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); - const exchangeFillEvent = new ExchangeFillEvent(); - exchangeFillEvent.contractAddress = eventLog.address as string; - exchangeFillEvent.blockNumber = eventLog.blockNumber as number; - exchangeFillEvent.logIndex = eventLog.logIndex as number; - exchangeFillEvent.rawData = eventLog.data as string; - exchangeFillEvent.transactionHash = eventLog.transactionHash; - exchangeFillEvent.makerAddress = eventLog.args.makerAddress; - exchangeFillEvent.takerAddress = eventLog.args.takerAddress; - exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; - exchangeFillEvent.senderAddress = eventLog.args.senderAddress; - exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount; - exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount; - exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid; - exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid; - exchangeFillEvent.orderHash = eventLog.args.orderHash; - exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeFillEvent.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId); - exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId; - // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData - exchangeFillEvent.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData) - ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.makerAssetData).nestedAssetData[0].tokenAddress - : makerAssetData.tokenAddress; - // tslint has a false positive here. Type assertion is required. - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); - exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeFillEvent.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId); - exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId; - // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData - exchangeFillEvent.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData) - ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.takerAssetData).nestedAssetData[0].tokenAddress - : takerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); - return exchangeFillEvent; -} - -/** - * Converts a raw event log for a cancel event into an ExchangeCancelEvent - * entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToExchangeCancelEvent( - eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>, -): ExchangeCancelEvent { - const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); - const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); - const exchangeCancelEvent = new ExchangeCancelEvent(); - exchangeCancelEvent.contractAddress = eventLog.address as string; - exchangeCancelEvent.blockNumber = eventLog.blockNumber as number; - exchangeCancelEvent.logIndex = eventLog.logIndex as number; - exchangeCancelEvent.rawData = eventLog.data as string; - exchangeCancelEvent.transactionHash = eventLog.transactionHash; - exchangeCancelEvent.makerAddress = eventLog.args.makerAddress; - exchangeCancelEvent.takerAddress = eventLog.args.takerAddress; - exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; - exchangeCancelEvent.senderAddress = eventLog.args.senderAddress; - exchangeCancelEvent.orderHash = eventLog.args.orderHash; - exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeCancelEvent.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId); - exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId; - // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData - exchangeCancelEvent.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData) - ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.makerAssetData).nestedAssetData[0].tokenAddress - : makerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); - exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeCancelEvent.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId); - exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId; - // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData - exchangeCancelEvent.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData) - ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.takerAssetData).nestedAssetData[0].tokenAddress - : takerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); - return exchangeCancelEvent; -} - -/** - * Converts a raw event log for a cancelUpTo event into an - * ExchangeCancelUpToEvent entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToExchangeCancelUpToEvent( - eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>, -): ExchangeCancelUpToEvent { - const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent(); - exchangeCancelUpToEvent.contractAddress = eventLog.address as string; - exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number; - exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number; - exchangeCancelUpToEvent.rawData = eventLog.data as string; - exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash; - exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress; - exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress; - exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch; - return exchangeCancelUpToEvent; -} diff --git a/packages/pipeline/src/parsers/events/index.ts b/packages/pipeline/src/parsers/events/index.ts deleted file mode 100644 index 3f9915e8b..000000000 --- a/packages/pipeline/src/parsers/events/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from './exchange_events'; -export { parseERC20ApprovalEvents } from './erc20_events'; diff --git a/packages/pipeline/src/parsers/idex_orders/index.ts b/packages/pipeline/src/parsers/idex_orders/index.ts deleted file mode 100644 index 14b871195..000000000 --- a/packages/pipeline/src/parsers/idex_orders/index.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { BigNumber } from '@0x/utils'; - -import { aggregateOrders } from '../utils'; - -import { IdexOrderbook, IdexOrderParam } from '../../data_sources/idex'; -import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; -import { OrderType } from '../../types'; - -/** - * Marque function of this file. - * 1) Takes in orders from an orderbook, - * 2) Aggregates them by price point, - * 3) Parses them into entities which are then saved into the database. - * @param idexOrderbook raw orderbook that we pull from the Idex API. - * @param observedTimestamp Time at which the orders for the market were pulled. - * @param source The exchange where these orders are placed. In this case 'idex'. - */ -export function parseIdexOrders(idexOrderbook: IdexOrderbook, observedTimestamp: number, source: string): TokenOrder[] { - const aggregatedBids = aggregateOrders(idexOrderbook.bids); - // Any of the bid orders' params will work - const idexBidOrder = idexOrderbook.bids[0]; - const parsedBids = - aggregatedBids.length > 0 - ? aggregatedBids.map(order => - parseIdexOrder(idexBidOrder.params, observedTimestamp, OrderType.Bid, source, order), - ) - : []; - - const aggregatedAsks = aggregateOrders(idexOrderbook.asks); - // Any of the ask orders' params will work - const idexAskOrder = idexOrderbook.asks[0]; - const parsedAsks = - aggregatedAsks.length > 0 - ? aggregatedAsks.map(order => - parseIdexOrder(idexAskOrder.params, observedTimestamp, OrderType.Ask, source, order), - ) - : []; - return parsedBids.concat(parsedAsks); -} - -/** - * Parse a single aggregated Idex order in order to form a tokenOrder entity - * which can be saved into the database. - * @param idexOrderParam An object containing information about the market where these - * trades have been placed. - * @param observedTimestamp The time when the API response returned back to us. - * @param orderType 'bid' or 'ask' enum. - * @param source Exchange where these orders were placed. - * @param idexOrder A <price, amount> tuple which we will convert to volume-basis. - */ -export function parseIdexOrder( - idexOrderParam: IdexOrderParam, - observedTimestamp: number, - orderType: OrderType, - source: string, - idexOrder: [string, BigNumber], -): TokenOrder { - const tokenOrder = new TokenOrder(); - const price = new BigNumber(idexOrder[0]); - const amount = idexOrder[1]; - - tokenOrder.source = source; - tokenOrder.observedTimestamp = observedTimestamp; - tokenOrder.orderType = orderType; - tokenOrder.price = price; - tokenOrder.baseVolume = amount; - tokenOrder.quoteVolume = price.times(amount); - - if (orderType === OrderType.Bid) { - tokenOrder.baseAssetSymbol = idexOrderParam.buySymbol; - tokenOrder.baseAssetAddress = idexOrderParam.tokenBuy; - tokenOrder.quoteAssetSymbol = idexOrderParam.sellSymbol; - tokenOrder.quoteAssetAddress = idexOrderParam.tokenSell; - } else { - tokenOrder.baseAssetSymbol = idexOrderParam.sellSymbol; - tokenOrder.baseAssetAddress = idexOrderParam.tokenSell; - tokenOrder.quoteAssetSymbol = idexOrderParam.buySymbol; - tokenOrder.quoteAssetAddress = idexOrderParam.tokenBuy; - } - return tokenOrder; -} diff --git a/packages/pipeline/src/parsers/oasis_orders/index.ts b/packages/pipeline/src/parsers/oasis_orders/index.ts deleted file mode 100644 index b71fb65b9..000000000 --- a/packages/pipeline/src/parsers/oasis_orders/index.ts +++ /dev/null @@ -1,71 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import * as R from 'ramda'; - -import { aggregateOrders } from '../utils'; - -import { OasisMarket, OasisOrder } from '../../data_sources/oasis'; -import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; -import { OrderType } from '../../types'; - -/** - * Marque function of this file. - * 1) Takes in orders from an orderbook, - * 2) Aggregates them according to price point, - * 3) Builds TokenOrder entity with other information attached. - * @param oasisOrderbook A raw orderbook that we pull from the Oasis API. - * @param oasisMarket An object containing market data also directly from the API. - * @param observedTimestamp Time at which the orders for the market were pulled. - * @param source The exchange where these orders are placed. In this case 'oasis'. - */ -export function parseOasisOrders( - oasisOrderbook: OasisOrder[], - oasisMarket: OasisMarket, - observedTimestamp: number, - source: string, -): TokenOrder[] { - const aggregatedBids = aggregateOrders(R.filter(R.propEq('act', OrderType.Bid), oasisOrderbook)); - const aggregatedAsks = aggregateOrders(R.filter(R.propEq('act', OrderType.Ask), oasisOrderbook)); - const parsedBids = aggregatedBids.map(order => - parseOasisOrder(oasisMarket, observedTimestamp, OrderType.Bid, source, order), - ); - const parsedAsks = aggregatedAsks.map(order => - parseOasisOrder(oasisMarket, observedTimestamp, OrderType.Ask, source, order), - ); - return parsedBids.concat(parsedAsks); -} - -/** - * Parse a single aggregated Oasis order to form a tokenOrder entity - * which can be saved into the database. - * @param oasisMarket An object containing information about the market where these - * trades have been placed. - * @param observedTimestamp The time when the API response returned back to us. - * @param orderType 'bid' or 'ask' enum. - * @param source Exchange where these orders were placed. - * @param oasisOrder A <price, amount> tuple which we will convert to volume-basis. - */ -export function parseOasisOrder( - oasisMarket: OasisMarket, - observedTimestamp: number, - orderType: OrderType, - source: string, - oasisOrder: [string, BigNumber], -): TokenOrder { - const tokenOrder = new TokenOrder(); - const price = new BigNumber(oasisOrder[0]); - const amount = oasisOrder[1]; - - tokenOrder.source = source; - tokenOrder.observedTimestamp = observedTimestamp; - tokenOrder.orderType = orderType; - tokenOrder.price = price; - - tokenOrder.baseAssetSymbol = oasisMarket.base; - tokenOrder.baseAssetAddress = null; // Oasis doesn't provide address information - tokenOrder.baseVolume = amount; - - tokenOrder.quoteAssetSymbol = oasisMarket.quote; - tokenOrder.quoteAssetAddress = null; // Oasis doesn't provide address information - tokenOrder.quoteVolume = price.times(amount); - return tokenOrder; -} diff --git a/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts deleted file mode 100644 index 3efb90384..000000000 --- a/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { CryptoCompareOHLCVRecord } from '../../data_sources/ohlcv_external/crypto_compare'; -import { OHLCVExternal } from '../../entities'; - -const ONE_SECOND = 1000; // Crypto Compare uses timestamps in seconds instead of milliseconds - -export interface OHLCVMetadata { - exchange: string; - fromSymbol: string; - toSymbol: string; - source: string; - observedTimestamp: number; - interval: number; -} -/** - * Parses OHLCV records from Crypto Compare into an array of OHLCVExternal entities - * @param rawRecords an array of OHLCV records from Crypto Compare (not the full response) - */ -export function parseRecords(rawRecords: CryptoCompareOHLCVRecord[], metadata: OHLCVMetadata): OHLCVExternal[] { - return rawRecords.map(rec => { - const ohlcvRecord = new OHLCVExternal(); - ohlcvRecord.exchange = metadata.exchange; - ohlcvRecord.fromSymbol = metadata.fromSymbol; - ohlcvRecord.toSymbol = metadata.toSymbol; - ohlcvRecord.startTime = rec.time * ONE_SECOND - metadata.interval; - ohlcvRecord.endTime = rec.time * ONE_SECOND; - - ohlcvRecord.open = rec.open; - ohlcvRecord.close = rec.close; - ohlcvRecord.low = rec.low; - ohlcvRecord.high = rec.high; - ohlcvRecord.volumeFrom = rec.volumefrom; - ohlcvRecord.volumeTo = rec.volumeto; - - ohlcvRecord.source = metadata.source; - ohlcvRecord.observedTimestamp = metadata.observedTimestamp; - return ohlcvRecord; - }); -} diff --git a/packages/pipeline/src/parsers/paradex_orders/index.ts b/packages/pipeline/src/parsers/paradex_orders/index.ts deleted file mode 100644 index 85990dae4..000000000 --- a/packages/pipeline/src/parsers/paradex_orders/index.ts +++ /dev/null @@ -1,66 +0,0 @@ -import { BigNumber } from '@0x/utils'; - -import { ParadexMarket, ParadexOrder, ParadexOrderbookResponse } from '../../data_sources/paradex'; -import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; -import { OrderType } from '../../types'; - -/** - * Marque function of this file. - * 1) Takes in orders from an orderbook (orders are already aggregated by price point), - * 2) For each aggregated order, forms a TokenOrder entity with market data and - * other information attached. - * @param paradexOrderbookResponse An orderbook response from the Paradex API. - * @param paradexMarket An object containing market data also directly from the API. - * @param observedTimestamp Time at which the orders for the market were pulled. - * @param source The exchange where these orders are placed. In this case 'paradex'. - */ -export function parseParadexOrders( - paradexOrderbookResponse: ParadexOrderbookResponse, - paradexMarket: ParadexMarket, - observedTimestamp: number, - source: string, -): TokenOrder[] { - const parsedBids = paradexOrderbookResponse.bids.map(order => - parseParadexOrder(paradexMarket, observedTimestamp, OrderType.Bid, source, order), - ); - const parsedAsks = paradexOrderbookResponse.asks.map(order => - parseParadexOrder(paradexMarket, observedTimestamp, OrderType.Ask, source, order), - ); - return parsedBids.concat(parsedAsks); -} - -/** - * Parse a single aggregated Ddex order in order to form a tokenOrder entity - * which can be saved into the database. - * @param paradexMarket An object containing information about the market where these - * orders have been placed. - * @param observedTimestamp The time when the API response returned back to us. - * @param orderType 'bid' or 'ask' enum. - * @param source Exchange where these orders were placed. - * @param paradexOrder A ParadexOrder object; basically price, amount tuple. - */ -export function parseParadexOrder( - paradexMarket: ParadexMarket, - observedTimestamp: number, - orderType: OrderType, - source: string, - paradexOrder: ParadexOrder, -): TokenOrder { - const tokenOrder = new TokenOrder(); - const price = new BigNumber(paradexOrder.price); - const amount = new BigNumber(paradexOrder.amount); - - tokenOrder.source = source; - tokenOrder.observedTimestamp = observedTimestamp; - tokenOrder.orderType = orderType; - tokenOrder.price = price; - - tokenOrder.baseAssetSymbol = paradexMarket.baseToken; - tokenOrder.baseAssetAddress = paradexMarket.baseTokenAddress as string; - tokenOrder.baseVolume = amount; - - tokenOrder.quoteAssetSymbol = paradexMarket.quoteToken; - tokenOrder.quoteAssetAddress = paradexMarket.quoteTokenAddress as string; - tokenOrder.quoteVolume = price.times(amount); - return tokenOrder; -} diff --git a/packages/pipeline/src/parsers/relayer_registry/index.ts b/packages/pipeline/src/parsers/relayer_registry/index.ts deleted file mode 100644 index 9723880a4..000000000 --- a/packages/pipeline/src/parsers/relayer_registry/index.ts +++ /dev/null @@ -1,37 +0,0 @@ -import * as R from 'ramda'; - -import { RelayerResponse, RelayerResponseNetwork } from '../../data_sources/relayer-registry'; -import { Relayer } from '../../entities'; - -/** - * Parses a raw relayer registry response into an array of Relayer entities. - * @param rawResp raw response from the relayer-registry json file. - */ -export function parseRelayers(rawResp: Map<string, RelayerResponse>): Relayer[] { - const parsedAsObject = R.mapObjIndexed(parseRelayer, rawResp); - return R.values(parsedAsObject); -} - -function parseRelayer(relayerResp: RelayerResponse, uuid: string): Relayer { - const relayer = new Relayer(); - relayer.uuid = uuid; - relayer.name = relayerResp.name; - relayer.homepageUrl = relayerResp.homepage_url; - relayer.appUrl = relayerResp.app_url; - const mainNetworkRelayerInfo = getMainNetwork(relayerResp); - if (mainNetworkRelayerInfo !== undefined) { - relayer.sraHttpEndpoint = mainNetworkRelayerInfo.sra_http_endpoint || null; - relayer.sraWsEndpoint = mainNetworkRelayerInfo.sra_ws_endpoint || null; - relayer.feeRecipientAddresses = - R.path(['static_order_fields', 'fee_recipient_addresses'], mainNetworkRelayerInfo) || []; - relayer.takerAddresses = R.path(['static_order_fields', 'taker_addresses'], mainNetworkRelayerInfo) || []; - } else { - relayer.feeRecipientAddresses = []; - relayer.takerAddresses = []; - } - return relayer; -} - -function getMainNetwork(relayerResp: RelayerResponse): RelayerResponseNetwork | undefined { - return R.find(network => network.networkId === 1, relayerResp.networks); -} diff --git a/packages/pipeline/src/parsers/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts deleted file mode 100644 index 13fe632a4..000000000 --- a/packages/pipeline/src/parsers/sra_orders/index.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { APIOrder, OrdersResponse } from '@0x/connect'; -import { assetDataUtils, orderHashUtils } from '@0x/order-utils'; -import { AssetProxyId, ERC721AssetData } from '@0x/types'; -import * as R from 'ramda'; - -import { SraOrder } from '../../entities'; -import { bigNumbertoStringOrNull, convertAssetProxyIdToType } from '../../utils'; - -/** - * Parses a raw order response from an SRA endpoint and returns an array of - * SraOrder entities. - * @param rawOrdersResponse A raw order response from an SRA endpoint. - */ -export function parseSraOrders(rawOrdersResponse: OrdersResponse): SraOrder[] { - return R.map(_convertToEntity, rawOrdersResponse.records); -} - -/** - * Converts a single APIOrder into an SraOrder entity. - * @param apiOrder A single order from the response from an SRA endpoint. - */ -export function _convertToEntity(apiOrder: APIOrder): SraOrder { - // TODO(albrow): refactor out common asset data decoding code. - const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.makerAssetData); - const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.takerAssetData); - - const sraOrder = new SraOrder(); - sraOrder.exchangeAddress = apiOrder.order.exchangeAddress; - sraOrder.orderHashHex = orderHashUtils.getOrderHashHex(apiOrder.order); - - sraOrder.makerAddress = apiOrder.order.makerAddress; - sraOrder.takerAddress = apiOrder.order.takerAddress; - sraOrder.feeRecipientAddress = apiOrder.order.feeRecipientAddress; - sraOrder.senderAddress = apiOrder.order.senderAddress; - sraOrder.makerAssetAmount = apiOrder.order.makerAssetAmount; - sraOrder.takerAssetAmount = apiOrder.order.takerAssetAmount; - sraOrder.makerFee = apiOrder.order.makerFee; - sraOrder.takerFee = apiOrder.order.takerFee; - sraOrder.expirationTimeSeconds = apiOrder.order.expirationTimeSeconds; - sraOrder.salt = apiOrder.order.salt; - sraOrder.signature = apiOrder.order.signature; - - sraOrder.rawMakerAssetData = apiOrder.order.makerAssetData; - // tslint:disable-next-line:no-unnecessary-type-assertion - sraOrder.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId); - sraOrder.makerAssetProxyId = makerAssetData.assetProxyId; - // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData - sraOrder.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData) - ? assetDataUtils.decodeMultiAssetDataRecursively(apiOrder.order.makerAssetData).nestedAssetData[0].tokenAddress - : makerAssetData.tokenAddress; - // tslint has a false positive here. Type assertion is required. - // tslint:disable-next-line:no-unnecessary-type-assertion - sraOrder.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); - sraOrder.rawTakerAssetData = apiOrder.order.takerAssetData; - // tslint:disable-next-line:no-unnecessary-type-assertion - sraOrder.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId); - sraOrder.takerAssetProxyId = takerAssetData.assetProxyId; - // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData - sraOrder.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData) - ? assetDataUtils.decodeMultiAssetDataRecursively(apiOrder.order.takerAssetData).nestedAssetData[0].tokenAddress - : takerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - sraOrder.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); - - sraOrder.metadataJson = JSON.stringify(apiOrder.metaData); - - return sraOrder; -} diff --git a/packages/pipeline/src/parsers/token_metadata/index.ts b/packages/pipeline/src/parsers/token_metadata/index.ts deleted file mode 100644 index 65e0aaa6e..000000000 --- a/packages/pipeline/src/parsers/token_metadata/index.ts +++ /dev/null @@ -1,46 +0,0 @@ -import * as R from 'ramda'; - -import { MetamaskTrustedTokenMeta, ZeroExTrustedTokenMeta } from '../../data_sources/trusted_tokens'; -import { TokenMetadata } from '../../entities'; -import { toBigNumberOrNull } from '../../utils'; - -/** - * Parses Metamask's trusted tokens list. - * @param rawResp raw response from the metamask json file. - */ -export function parseMetamaskTrustedTokens(rawResp: Map<string, MetamaskTrustedTokenMeta>): TokenMetadata[] { - const parsedAsObject = R.mapObjIndexed(parseMetamaskTrustedToken, rawResp); - return R.values(parsedAsObject); -} - -/** - * Parses 0x's trusted tokens list. - * @param rawResp raw response from the 0x trusted tokens file. - */ -export function parseZeroExTrustedTokens(rawResp: ZeroExTrustedTokenMeta[]): TokenMetadata[] { - return R.map(parseZeroExTrustedToken, rawResp); -} - -function parseMetamaskTrustedToken(resp: MetamaskTrustedTokenMeta, address: string): TokenMetadata { - const trustedToken = new TokenMetadata(); - - trustedToken.address = address; - trustedToken.decimals = toBigNumberOrNull(resp.decimals); - trustedToken.symbol = resp.symbol; - trustedToken.name = resp.name; - trustedToken.authority = 'metamask'; - - return trustedToken; -} - -function parseZeroExTrustedToken(resp: ZeroExTrustedTokenMeta): TokenMetadata { - const trustedToken = new TokenMetadata(); - - trustedToken.address = resp.address; - trustedToken.decimals = toBigNumberOrNull(resp.decimals); - trustedToken.symbol = resp.symbol; - trustedToken.name = resp.name; - trustedToken.authority = '0x'; - - return trustedToken; -} diff --git a/packages/pipeline/src/parsers/utils.ts b/packages/pipeline/src/parsers/utils.ts deleted file mode 100644 index 860729e9f..000000000 --- a/packages/pipeline/src/parsers/utils.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { BigNumber } from '@0x/utils'; - -export interface GenericRawOrder { - price: string; - amount: string; -} - -/** - * Aggregates individual orders by price point. Filters zero amount orders. - * @param rawOrders An array of objects that have price and amount information. - */ -export function aggregateOrders(rawOrders: GenericRawOrder[]): Array<[string, BigNumber]> { - const aggregatedOrders = new Map<string, BigNumber>(); - rawOrders.forEach(order => { - const amount = new BigNumber(order.amount); - if (amount.isZero()) { - return; - } - // Use string instead of BigNum to aggregate by value instead of variable. - // Convert to BigNumber first to consolidate different string - // representations of the same number. Eg. '0.0' and '0.00'. - const price = new BigNumber(order.price).toString(); - - const existingAmount = aggregatedOrders.get(price) || new BigNumber(0); - aggregatedOrders.set(price, amount.plus(existingAmount)); - }); - return Array.from(aggregatedOrders.entries()); -} diff --git a/packages/pipeline/src/parsers/web3/index.ts b/packages/pipeline/src/parsers/web3/index.ts deleted file mode 100644 index f986efc59..000000000 --- a/packages/pipeline/src/parsers/web3/index.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { BlockWithoutTransactionData, Transaction as EthTransaction } from 'ethereum-types'; - -import { Block, Transaction } from '../../entities'; - -const MILLISECONDS_PER_SECOND = 1000; - -/** - * Parses a raw block and returns a Block entity. - * @param rawBlock a raw block (e.g. returned from web3-wrapper). - */ -export function parseBlock(rawBlock: BlockWithoutTransactionData): Block { - if (rawBlock.hash == null) { - throw new Error('Tried to parse raw block but hash was null'); - } - if (rawBlock.number == null) { - throw new Error('Tried to parse raw block but number was null'); - } - - const block = new Block(); - block.hash = rawBlock.hash; - block.number = rawBlock.number; - // Block timestamps are in seconds, but we use milliseconds everywhere else. - block.timestamp = rawBlock.timestamp * MILLISECONDS_PER_SECOND; - return block; -} - -/** - * Parses a raw transaction and returns a Transaction entity. - * @param rawBlock a raw transaction (e.g. returned from web3-wrapper). - */ -export function parseTransaction(rawTransaction: EthTransaction): Transaction { - if (rawTransaction.blockHash == null) { - throw new Error('Tried to parse raw transaction but blockHash was null'); - } - if (rawTransaction.blockNumber == null) { - throw new Error('Tried to parse raw transaction but blockNumber was null'); - } - - const tx = new Transaction(); - tx.transactionHash = rawTransaction.hash; - tx.blockHash = rawTransaction.blockHash; - tx.blockNumber = rawTransaction.blockNumber; - - tx.gasUsed = new BigNumber(rawTransaction.gas); - tx.gasPrice = rawTransaction.gasPrice; - - return tx; -} diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts deleted file mode 100644 index 14644bb2e..000000000 --- a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts +++ /dev/null @@ -1,52 +0,0 @@ -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { BloxySource } from '../data_sources/bloxy'; -import { DexTrade } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseBloxyTrades } from '../parsers/bloxy'; -import { handleError } from '../utils'; - -// Number of trades to save at once. -const BATCH_SAVE_SIZE = 1000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - await getAndSaveTradesAsync(); - process.exit(0); -})().catch(handleError); - -async function getAndSaveTradesAsync(): Promise<void> { - const apiKey = process.env.BLOXY_API_KEY; - if (apiKey === undefined) { - throw new Error('Missing required env var: BLOXY_API_KEY'); - } - const bloxySource = new BloxySource(apiKey); - const tradesRepository = connection.getRepository(DexTrade); - const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository); - logUtils.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`); - logUtils.log('Getting latest dex trades...'); - const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp); - logUtils.log(`Parsing ${rawTrades.length} trades...`); - const trades = parseBloxyTrades(rawTrades); - logUtils.log(`Saving ${trades.length} trades...`); - await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) }); - logUtils.log('Done saving trades.'); -} - -async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> { - if ((await tradesRepository.count()) === 0) { - return 0; - } - const response = (await connection.query( - 'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1', - )) as Array<{ tx_timestamp: number }>; - if (response.length === 0) { - return 0; - } - return response[0].tx_timestamp; -} diff --git a/packages/pipeline/src/scripts/pull_copper.ts b/packages/pipeline/src/scripts/pull_copper.ts deleted file mode 100644 index 5e4a6a643..000000000 --- a/packages/pipeline/src/scripts/pull_copper.ts +++ /dev/null @@ -1,130 +0,0 @@ -import * as R from 'ramda'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper'; -import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { - CopperSearchResponse, - parseActivities, - parseActivityTypes, - parseCustomFields, - parseLeads, - parseOpportunities, -} from '../parsers/copper'; -import { handleError } from '../utils'; -const ONE_SECOND = 1000; -const COPPER_RATE_LIMIT = 10; -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - - const accessToken = process.env.COPPER_ACCESS_TOKEN; - const userEmail = process.env.COPPER_USER_EMAIL; - if (accessToken === undefined || userEmail === undefined) { - throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL'); - } - const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail); - - const fetchPromises = [ - fetchAndSaveLeadsAsync(source), - fetchAndSaveOpportunitiesAsync(source), - fetchAndSaveActivitiesAsync(source), - fetchAndSaveCustomFieldsAsync(source), - fetchAndSaveActivityTypesAsync(source), - ]; - fetchPromises.forEach(async fn => { - await fn; - }); -})().catch(handleError); - -async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> { - const repository = connection.getRepository(CopperLead); - const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads'); - logUtils.log(`Fetching Copper leads starting from ${startTime}...`); - await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository); -} - -async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> { - const repository = connection.getRepository(CopperOpportunity); - const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities'); - logUtils.log(`Fetching Copper opportunities starting from ${startTime}...`); - await fetchAndSaveAsync( - CopperEndpoint.Opportunities, - source, - startTime, - { sort_by: 'name' }, - parseOpportunities, - repository, - ); -} - -async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> { - const repository = connection.getRepository(CopperActivity); - const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities'); - const searchParams = { - minimum_activity_date: Math.floor(startTime / ONE_SECOND), - }; - logUtils.log(`Fetching Copper activities starting from ${startTime}...`); - await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository); -} - -async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> { - const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`); - if (R.isEmpty(queryResult)) { - return 0; - } else { - return queryResult[0]._max; - } -} - -// (Xianny): Copper API doesn't allow queries to filter by date. To ensure that we are filling in ascending chronological -// order and not missing any records, we are scraping all available pages. If Copper data gets larger, -// it would make sense to search for and start filling from the first page that contains a new record. -// This search would increase our network calls and is not efficient to implement with our current small volume -// of Copper records. -async function fetchAndSaveAsync<T extends CopperSearchResponse, E>( - endpoint: CopperEndpoint, - source: CopperSource, - startTime: number, - searchParams: CopperSearchParams, - parseFn: (recs: T[]) => E[], - repository: Repository<E>, -): Promise<void> { - let saved = 0; - const numPages = await source.fetchNumberOfPagesAsync(endpoint); - try { - for (let i = numPages; i > 0; i--) { - logUtils.log(`Fetching page ${i}/${numPages} of ${endpoint}...`); - const raw = await source.fetchSearchResultsAsync<T>(endpoint, { - ...searchParams, - page_number: i, - }); - const newRecords = raw.filter(rec => rec.date_modified * ONE_SECOND > startTime); - const parsed = parseFn(newRecords); - await repository.save<any>(parsed); - saved += newRecords.length; - } - } catch (err) { - logUtils.log(`Error fetching ${endpoint}, stopping: ${err.stack}`); - } finally { - logUtils.log(`Saved ${saved} items from ${endpoint}, done.`); - } -} - -async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> { - logUtils.log(`Fetching Copper activity types...`); - const activityTypes = await source.fetchActivityTypesAsync(); - const repository = connection.getRepository(CopperActivityType); - await repository.save(parseActivityTypes(activityTypes)); -} - -async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> { - logUtils.log(`Fetching Copper custom fields...`); - const customFields = await source.fetchCustomFieldsAsync(); - const repository = connection.getRepository(CopperCustomField); - await repository.save(parseCustomFields(customFields)); -} diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts deleted file mode 100644 index 4e00f258f..000000000 --- a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts +++ /dev/null @@ -1,55 +0,0 @@ -import { logUtils } from '@0x/utils'; -import * as R from 'ramda'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex'; -import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseDdexOrders } from '../parsers/ddex_orders'; -import { handleError } from '../utils'; - -// Number of orders to save at once. -const BATCH_SAVE_SIZE = 1000; - -// Number of markets to retrieve orderbooks for at once. -const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; - -// Delay between market orderbook requests. -const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const ddexSource = new DdexSource(); - const markets = await ddexSource.getActiveMarketsAsync(); - for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { - await Promise.all( - marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbookAsync(ddexSource, market)), - ); - await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); - } - process.exit(0); -})().catch(handleError); - -/** - * Retrieve orderbook from Ddex API for a given market. Parse orders and insert - * them into our database. - * @param ddexSource Data source which can query Ddex API. - * @param market Object from Ddex API containing market data. - */ -async function getAndSaveMarketOrderbookAsync(ddexSource: DdexSource, market: DdexMarket): Promise<void> { - const orderBook = await ddexSource.getMarketOrderbookAsync(market.id); - const observedTimestamp = Date.now(); - - logUtils.log(`${market.id}: Parsing orders.`); - const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE); - - if (orders.length > 0) { - logUtils.log(`${market.id}: Saving ${orders.length} orders.`); - const TokenOrderRepository = connection.getRepository(TokenOrder); - await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); - } else { - logUtils.log(`${market.id}: 0 orders to save.`); - } -} diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts deleted file mode 100644 index bd520c610..000000000 --- a/packages/pipeline/src/scripts/pull_erc20_events.ts +++ /dev/null @@ -1,96 +0,0 @@ -import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses'; -import { web3Factory } from '@0x/dev-utils'; -import { Web3ProviderEngine } from '@0x/subproviders'; -import { logUtils } from '@0x/utils'; -import { Web3Wrapper } from '@0x/web3-wrapper'; -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events'; -import { ERC20ApprovalEvent } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseERC20ApprovalEvents } from '../parsers/events'; -import { handleError, INFURA_ROOT_URL } from '../utils'; - -const NETWORK_ID = 1; -const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. -const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. - -let connection: Connection; - -interface Token { - // name is used for logging only. - name: string; - address: string; - defaultStartBlock: number; -} - -const tokensToGetApprovalEvents: Token[] = [ - { - name: 'WETH', - address: getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken, - defaultStartBlock: 4719568, // Block when the WETH contract was deployed. - }, - { - name: 'ZRX', - address: getContractAddressesForNetworkOrThrow(NETWORK_ID).zrxToken, - defaultStartBlock: 4145415, // Block when the ZRX contract was deployed. - }, - { - name: 'DAI', - address: '0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359', - defaultStartBlock: 4752008, // Block when the DAI contract was deployed. - }, -]; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const provider = web3Factory.getRpcProvider({ - rpcUrl: INFURA_ROOT_URL, - }); - const endBlock = await calculateEndBlockAsync(provider); - for (const token of tokensToGetApprovalEvents) { - await getAndSaveApprovalEventsAsync(provider, token, endBlock); - } - process.exit(0); -})().catch(handleError); - -async function getAndSaveApprovalEventsAsync( - provider: Web3ProviderEngine, - token: Token, - endBlock: number, -): Promise<void> { - logUtils.log(`Getting approval events for ${token.name}...`); - logUtils.log('Checking existing approval events...'); - const repository = connection.getRepository(ERC20ApprovalEvent); - const startBlock = (await getStartBlockAsync(token)) || token.defaultStartBlock; - - logUtils.log(`Getting approval events starting at ${startBlock}...`); - const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, token.address); - const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock); - - logUtils.log(`Parsing ${eventLogs.length} approval events...`); - const events = parseERC20ApprovalEvents(eventLogs); - logUtils.log(`Retrieved and parsed ${events.length} total approval events.`); - await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); -} - -async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { - const web3Wrapper = new Web3Wrapper(provider); - const currentBlock = await web3Wrapper.getBlockNumberAsync(); - return currentBlock - BLOCK_FINALITY_THRESHOLD; -} - -async function getStartBlockAsync(token: Token): Promise<number | null> { - const queryResult = await connection.query( - `SELECT block_number FROM raw.erc20_approval_events WHERE token_address = $1 ORDER BY block_number DESC LIMIT 1`, - [token.address], - ); - if (queryResult.length === 0) { - logUtils.log(`No existing approval events found for ${token.name}.`); - return null; - } - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} diff --git a/packages/pipeline/src/scripts/pull_exchange_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts deleted file mode 100644 index f8ce4038d..000000000 --- a/packages/pipeline/src/scripts/pull_exchange_events.ts +++ /dev/null @@ -1,146 +0,0 @@ -import { web3Factory } from '@0x/dev-utils'; -import { Web3ProviderEngine } from '@0x/subproviders'; -import { logUtils } from '@0x/utils'; -import { Web3Wrapper } from '@0x/web3-wrapper'; -import R = require('ramda'); -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; -import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; - -const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. -const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const provider = web3Factory.getRpcProvider({ - rpcUrl: INFURA_ROOT_URL, - }); - const endBlock = await calculateEndBlockAsync(provider); - const eventsSource = new ExchangeEventsSource(provider, 1); - await getFillEventsAsync(eventsSource, endBlock); - await getCancelEventsAsync(eventsSource, endBlock); - await getCancelUpToEventsAsync(eventsSource, endBlock); - process.exit(0); -})().catch(handleError); - -async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { - logUtils.log('Checking existing fill events...'); - const repository = connection.getRepository(ExchangeFillEvent); - const startBlock = await getStartBlockAsync(repository); - logUtils.log(`Getting fill events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock); - logUtils.log('Parsing fill events...'); - const events = parseExchangeFillEvents(eventLogs); - logUtils.log(`Retrieved and parsed ${events.length} total fill events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { - logUtils.log('Checking existing cancel events...'); - const repository = connection.getRepository(ExchangeCancelEvent); - const startBlock = await getStartBlockAsync(repository); - logUtils.log(`Getting cancel events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock); - logUtils.log('Parsing cancel events...'); - const events = parseExchangeCancelEvents(eventLogs); - logUtils.log(`Retrieved and parsed ${events.length} total cancel events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { - logUtils.log('Checking existing CancelUpTo events...'); - const repository = connection.getRepository(ExchangeCancelUpToEvent); - const startBlock = await getStartBlockAsync(repository); - logUtils.log(`Getting CancelUpTo events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock); - logUtils.log('Parsing CancelUpTo events...'); - const events = parseExchangeCancelUpToEvents(eventLogs); - logUtils.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -const tableNameRegex = /^[a-zA-Z_]*$/; - -async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> { - const fillEventCount = await repository.count(); - if (fillEventCount === 0) { - logUtils.log(`No existing ${repository.metadata.name}s found.`); - return EXCHANGE_START_BLOCK; - } - const tableName = repository.metadata.tableName; - if (!tableNameRegex.test(tableName)) { - throw new Error(`Unexpected special character in table name: ${tableName}`); - } - const queryResult = await connection.query( - `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, - ); - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} - -async function saveEventsAsync<T extends ExchangeEvent>( - isInitialPull: boolean, - repository: Repository<T>, - events: T[], -): Promise<void> { - logUtils.log(`Saving ${repository.metadata.name}s...`); - if (isInitialPull) { - // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE - // each. - for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { - await repository.insert(eventsBatch); - } - } else { - // If we possibly have some overlap where we need to update some - // existing events, we need to use our workaround/fallback. - await saveIndividuallyWithFallbackAsync(repository, events); - } - const totalEvents = await repository.count(); - logUtils.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); -} - -async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>( - repository: Repository<T>, - events: T[], -): Promise<void> { - // Note(albrow): This is a temporary hack because `save` is not working as - // documented and is causing a foreign key constraint violation. Hopefully - // can remove later because this "poor man's upsert" implementation operates - // on one event at a time and is therefore much slower. - for (const event of events) { - try { - // First try an insert. - await repository.insert(event); - } catch { - // If it fails, assume it was a foreign key constraint error and try - // doing an update instead. - // Note(albrow): Unfortunately the `as any` hack here seems - // required. I can't figure out how to convince the type-checker - // that the criteria and the entity itself are the correct type for - // the given repository. If we can remove the `save` hack then this - // will probably no longer be necessary. - await repository.update( - { - contractAddress: event.contractAddress, - blockNumber: event.blockNumber, - logIndex: event.logIndex, - } as any, - event as any, - ); - } - } -} - -async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { - const web3Wrapper = new Web3Wrapper(provider); - const currentBlock = await web3Wrapper.getBlockNumberAsync(); - return currentBlock - BLOCK_FINALITY_THRESHOLD; -} diff --git a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts deleted file mode 100644 index 490b17766..000000000 --- a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts +++ /dev/null @@ -1,63 +0,0 @@ -import { logUtils } from '@0x/utils'; -import * as R from 'ramda'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { IDEX_SOURCE, IdexSource } from '../data_sources/idex'; -import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseIdexOrders } from '../parsers/idex_orders'; -import { handleError } from '../utils'; - -// Number of orders to save at once. -const BATCH_SAVE_SIZE = 1000; - -// Number of markets to retrieve orderbooks for at once. -const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 100; - -// Delay between market orderbook requests. -const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 2000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const idexSource = new IdexSource(); - logUtils.log('Getting all IDEX markets'); - const markets = await idexSource.getMarketsAsync(); - logUtils.log(`Got ${markets.length} markets.`); - for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { - await Promise.all( - marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbookAsync(idexSource, marketId)), - ); - await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); - } - process.exit(0); -})().catch(handleError); - -/** - * Retrieve orderbook from Idex API for a given market. Parse orders and insert - * them into our database. - * @param idexSource Data source which can query Idex API. - * @param marketId String representing market of interest, eg. 'ETH_TIC'. - */ -async function getAndSaveMarketOrderbookAsync(idexSource: IdexSource, marketId: string): Promise<void> { - logUtils.log(`${marketId}: Retrieving orderbook.`); - const orderBook = await idexSource.getMarketOrderbookAsync(marketId); - const observedTimestamp = Date.now(); - - if (!R.has('bids', orderBook) || !R.has('asks', orderBook)) { - logUtils.warn(`${marketId}: Orderbook faulty.`); - return; - } - - logUtils.log(`${marketId}: Parsing orders.`); - const orders = parseIdexOrders(orderBook, observedTimestamp, IDEX_SOURCE); - - if (orders.length > 0) { - logUtils.log(`${marketId}: Saving ${orders.length} orders.`); - const TokenOrderRepository = connection.getRepository(TokenOrder); - await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); - } else { - logUtils.log(`${marketId}: 0 orders to save.`); - } -} diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts deleted file mode 100644 index 345ea38fe..000000000 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { web3Factory } from '@0x/dev-utils'; -import { logUtils } from '@0x/utils'; - -import * as Parallel from 'async-parallel'; -import R = require('ramda'); -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { Web3Source } from '../data_sources/web3'; -import { Block } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseBlock } from '../parsers/web3'; -import { handleError, INFURA_ROOT_URL } from '../utils'; - -// Number of blocks to save at once. -const BATCH_SAVE_SIZE = 1000; -// Maximum number of requests to send at once. -const MAX_CONCURRENCY = 20; -// Maximum number of blocks to query for at once. This is also the maximum -// number of blocks we will hold in memory prior to being saved to the database. -const MAX_BLOCKS_PER_QUERY = 1000; - -let connection: Connection; - -const tablesWithMissingBlocks = [ - 'raw.exchange_fill_events', - 'raw.exchange_cancel_events', - 'raw.exchange_cancel_up_to_events', - 'raw.erc20_approval_events', -]; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const provider = web3Factory.getRpcProvider({ - rpcUrl: INFURA_ROOT_URL, - }); - const web3Source = new Web3Source(provider); - for (const tableName of tablesWithMissingBlocks) { - await getAllMissingBlocksAsync(web3Source, tableName); - } - process.exit(0); -})().catch(handleError); - -interface MissingBlocksResponse { - block_number: string; -} - -async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise<void> { - const blocksRepository = connection.getRepository(Block); - while (true) { - logUtils.log(`Checking for missing blocks in ${tableName}...`); - const blockNumbers = await getMissingBlockNumbersAsync(tableName); - if (blockNumbers.length === 0) { - // There are no more missing blocks. We're done. - break; - } - await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers); - } - const totalBlocks = await blocksRepository.count(); - logUtils.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`); -} - -async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> { - // This query returns up to `MAX_BLOCKS_PER_QUERY` distinct block numbers - // which are present in `tableName` but not in `raw.blocks`. - const response = (await connection.query( - `SELECT DISTINCT(block_number) FROM ${tableName} LEFT JOIN raw.blocks ON ${tableName}.block_number = raw.blocks.number WHERE number IS NULL LIMIT $1;`, - [MAX_BLOCKS_PER_QUERY], - )) as MissingBlocksResponse[]; - const blockNumberStrings = R.pluck('block_number', response); - const blockNumbers = R.map(parseInt, blockNumberStrings); - logUtils.log(`Found ${blockNumbers.length} missing blocks.`); - return blockNumbers; -} - -async function getAndSaveBlocksAsync( - web3Source: Web3Source, - blocksRepository: Repository<Block>, - blockNumbers: number[], -): Promise<void> { - logUtils.log(`Getting block data for ${blockNumbers.length} blocks...`); - Parallel.setConcurrency(MAX_CONCURRENCY); - const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) => - web3Source.getBlockInfoAsync(blockNumber), - ); - logUtils.log(`Parsing ${rawBlocks.length} blocks...`); - const blocks = R.map(parseBlock, rawBlocks); - logUtils.log(`Saving ${blocks.length} blocks...`); - await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); - logUtils.log('Done saving this batch of blocks'); -} diff --git a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts deleted file mode 100644 index c4dcf6c83..000000000 --- a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { logUtils } from '@0x/utils'; -import * as R from 'ramda'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { OASIS_SOURCE, OasisMarket, OasisSource } from '../data_sources/oasis'; -import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseOasisOrders } from '../parsers/oasis_orders'; -import { handleError } from '../utils'; - -// Number of orders to save at once. -const BATCH_SAVE_SIZE = 1000; - -// Number of markets to retrieve orderbooks for at once. -const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; - -// Delay between market orderbook requests. -const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 1000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const oasisSource = new OasisSource(); - logUtils.log('Getting all active Oasis markets'); - const markets = await oasisSource.getActiveMarketsAsync(); - logUtils.log(`Got ${markets.length} markets.`); - for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { - await Promise.all( - marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbookAsync(oasisSource, market)), - ); - await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); - } - process.exit(0); -})().catch(handleError); - -/** - * Retrieve orderbook from Oasis API for a given market. Parse orders and insert - * them into our database. - * @param oasisSource Data source which can query Oasis API. - * @param marketId String identifying market we want data for. eg. 'REPAUG'. - */ -async function getAndSaveMarketOrderbookAsync(oasisSource: OasisSource, market: OasisMarket): Promise<void> { - logUtils.log(`${market.id}: Retrieving orderbook.`); - const orderBook = await oasisSource.getMarketOrderbookAsync(market.id); - const observedTimestamp = Date.now(); - - logUtils.log(`${market.id}: Parsing orders.`); - const orders = parseOasisOrders(orderBook, market, observedTimestamp, OASIS_SOURCE); - - if (orders.length > 0) { - logUtils.log(`${market.id}: Saving ${orders.length} orders.`); - const TokenOrderRepository = connection.getRepository(TokenOrder); - await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); - } else { - logUtils.log(`${market.id}: 0 orders to save.`); - } -} diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts deleted file mode 100644 index caac7b9d4..000000000 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ /dev/null @@ -1,96 +0,0 @@ -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare'; -import { OHLCVExternal } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare'; -import { handleError } from '../utils'; -import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs'; - -const SOURCE_NAME = 'CryptoCompare'; -const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers - -const MAX_REQS_PER_SECOND = parseInt(process.env.CRYPTOCOMPARE_MAX_REQS_PER_SECOND || '15', 10); // tslint:disable-line:custom-no-magic-numbers -const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01'; -const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const repository = connection.getRepository(OHLCVExternal); - const source = new CryptoCompareOHLCVSource(MAX_REQS_PER_SECOND); - - const jobTime = new Date().getTime(); - const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME); - logUtils.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`); - - const fetchAndSavePromises = tradingPairs.map(async pair => { - const pairs = source.generateBackfillIntervals(pair); - return fetchAndSaveAsync(source, repository, jobTime, pairs); - }); - await Promise.all(fetchAndSavePromises); - logUtils.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`); - process.exit(0); -})().catch(handleError); - -async function fetchAndSaveAsync( - source: CryptoCompareOHLCVSource, - repository: Repository<OHLCVExternal>, - jobTime: number, - pairs: TradingPair[], -): Promise<void> { - const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => { - if (a.latestSavedTime < b.latestSavedTime) { - return -1; - } else if (a.latestSavedTime > b.latestSavedTime) { - return 1; - } else { - return 0; - } - }; - pairs.sort(sortAscTimestamp); - - let i = 0; - while (i < pairs.length) { - const pair = pairs[i]; - if (pair.latestSavedTime > TWO_HOURS_AGO) { - break; - } - try { - const records = await source.getHourlyOHLCVAsync(pair); - logUtils.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); - if (records.length > 0) { - const metadata: OHLCVMetadata = { - exchange: source.defaultExchange, - fromSymbol: pair.fromSymbol, - toSymbol: pair.toSymbol, - source: SOURCE_NAME, - observedTimestamp: jobTime, - interval: source.intervalBetweenRecords, - }; - const parsedRecords = parseRecords(records, metadata); - await saveRecordsAsync(repository, parsedRecords); - } - i++; - } catch (err) { - logUtils.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); - break; - } - } - return Promise.resolve(); -} - -async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> { - const metadata = [ - records[0].fromSymbol, - records[0].toSymbol, - new Date(records[0].startTime), - new Date(records[records.length - 1].endTime), - ]; - - logUtils.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`); - await repository.save(records); -} diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts deleted file mode 100644 index 34345f355..000000000 --- a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { logUtils } from '@0x/utils'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { - PARADEX_SOURCE, - ParadexActiveMarketsResponse, - ParadexMarket, - ParadexSource, - ParadexTokenInfoResponse, -} from '../data_sources/paradex'; -import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseParadexOrders } from '../parsers/paradex_orders'; -import { handleError } from '../utils'; - -// Number of orders to save at once. -const BATCH_SAVE_SIZE = 1000; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY; - if (apiKey === undefined) { - throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY'); - } - const paradexSource = new ParadexSource(apiKey); - const markets = await paradexSource.getActiveMarketsAsync(); - const tokenInfoResponse = await paradexSource.getTokenInfoAsync(); - const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse); - await Promise.all( - extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbookAsync(paradexSource, market)), - ); - process.exit(0); -})().catch(handleError); - -/** - * Extend the default ParadexMarket objects with token addresses. - * @param markets An array of ParadexMarket objects. - * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses. - */ -function addTokenAddresses( - markets: ParadexActiveMarketsResponse, - tokenInfoResponse: ParadexTokenInfoResponse, -): ParadexMarket[] { - const symbolAddressMapping = new Map<string, string>(); - tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address)); - - markets.forEach((market: ParadexMarket) => { - if (symbolAddressMapping.has(market.baseToken)) { - market.baseTokenAddress = symbolAddressMapping.get(market.baseToken); - } else { - market.quoteTokenAddress = ''; - logUtils.warn(`${market.baseToken}: No address found.`); - } - - if (symbolAddressMapping.has(market.quoteToken)) { - market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken); - } else { - market.quoteTokenAddress = ''; - logUtils.warn(`${market.quoteToken}: No address found.`); - } - }); - return markets; -} - -/** - * Retrieve orderbook from Paradex API for a given market. Parse orders and insert - * them into our database. - * @param paradexSource Data source which can query the Paradex API. - * @param market Object from the Paradex API with information about the market in question. - */ -async function getAndSaveMarketOrderbookAsync(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> { - const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol); - const observedTimestamp = Date.now(); - - logUtils.log(`${market.symbol}: Parsing orders.`); - const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE); - - if (orders.length > 0) { - logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`); - const tokenOrderRepository = connection.getRepository(TokenOrder); - await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); - } else { - logUtils.log(`${market.symbol}: 0 orders to save.`); - } -} diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts deleted file mode 100644 index 8e8720803..000000000 --- a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts +++ /dev/null @@ -1,62 +0,0 @@ -import { HttpClient } from '@0x/connect'; -import { logUtils } from '@0x/utils'; - -import * as R from 'ramda'; -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm'; - -import { createObservedTimestampForOrder, SraOrder } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseSraOrders } from '../parsers/sra_orders'; -import { handleError } from '../utils'; - -const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; -const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - await getOrderbookAsync(); - process.exit(0); -})().catch(handleError); - -async function getOrderbookAsync(): Promise<void> { - logUtils.log('Getting all orders...'); - const connectClient = new HttpClient(RADAR_RELAY_URL); - const rawOrders = await connectClient.getOrdersAsync({ - perPage: ORDERS_PER_PAGE, - }); - logUtils.log(`Got ${rawOrders.records.length} orders.`); - logUtils.log('Parsing orders...'); - // Parse the sra orders, then add source url to each. - const orders = R.pipe( - parseSraOrders, - R.map(setSourceUrl(RADAR_RELAY_URL)), - )(rawOrders); - // Save all the orders and update the observed time stamps in a single - // transaction. - logUtils.log('Saving orders and updating timestamps...'); - const observedTimestamp = Date.now(); - await connection.transaction( - async (manager: EntityManager): Promise<void> => { - for (const order of orders) { - await manager.save(SraOrder, order); - const orderObservation = createObservedTimestampForOrder(order, observedTimestamp); - await manager.save(orderObservation); - } - }, - ); -} - -const sourceUrlProp = R.lensProp('sourceUrl'); - -/** - * Sets the source url for a single order. Returns a new order instead of - * mutating the given one. - */ -const setSourceUrl = R.curry( - (sourceURL: string, order: SraOrder): SraOrder => { - return R.set(sourceUrlProp, sourceURL, order); - }, -); diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts deleted file mode 100644 index 8afb3e052..000000000 --- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts +++ /dev/null @@ -1,48 +0,0 @@ -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens'; -import { TokenMetadata } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/token_metadata'; -import { handleError } from '../utils'; - -const METAMASK_TRUSTED_TOKENS_URL = - 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/d45916c533116510cc8e9e048a8b5fc3732a6b6d/contract-map.json'; - -const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens'; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - await getMetamaskTrustedTokensAsync(); - await getZeroExTrustedTokensAsync(); - process.exit(0); -})().catch(handleError); - -async function getMetamaskTrustedTokensAsync(): Promise<void> { - logUtils.log('Getting latest metamask trusted tokens list ...'); - const trustedTokensRepository = connection.getRepository(TokenMetadata); - const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>( - METAMASK_TRUSTED_TOKENS_URL, - ); - const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); - const trustedTokens = parseMetamaskTrustedTokens(resp); - logUtils.log('Saving metamask trusted tokens list'); - await trustedTokensRepository.save(trustedTokens); - logUtils.log('Done saving metamask trusted tokens.'); -} - -async function getZeroExTrustedTokensAsync(): Promise<void> { - logUtils.log('Getting latest 0x trusted tokens list ...'); - const trustedTokensRepository = connection.getRepository(TokenMetadata); - const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL); - const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); - const trustedTokens = parseZeroExTrustedTokens(resp); - logUtils.log('Saving metamask trusted tokens list'); - await trustedTokensRepository.save(trustedTokens); - logUtils.log('Done saving metamask trusted tokens.'); -} diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts deleted file mode 100644 index 910a0157c..000000000 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ /dev/null @@ -1,34 +0,0 @@ -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection } from 'typeorm'; - -import { logUtils } from '@0x/utils'; - -import { RelayerRegistrySource } from '../data_sources/relayer-registry'; -import { Relayer } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseRelayers } from '../parsers/relayer_registry'; -import { handleError } from '../utils'; - -// NOTE(albrow): We need to manually update this URL for now. Fix this when we -// have the relayer-registry behind semantic versioning. -const RELAYER_REGISTRY_URL = - 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/4701c85677d161ea729a466aebbc1826c6aa2c0b/relayers.json'; - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - await getRelayersAsync(); - process.exit(0); -})().catch(handleError); - -async function getRelayersAsync(): Promise<void> { - logUtils.log('Getting latest relayer info...'); - const relayerRepository = connection.getRepository(Relayer); - const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL); - const relayersResp = await relayerSource.getRelayerInfoAsync(); - const relayers = parseRelayers(relayersResp); - logUtils.log('Saving relayer info...'); - await relayerRepository.save(relayers); - logUtils.log('Done saving relayer info.'); -} diff --git a/packages/pipeline/src/types.ts b/packages/pipeline/src/types.ts deleted file mode 100644 index 5f2121807..000000000 --- a/packages/pipeline/src/types.ts +++ /dev/null @@ -1,9 +0,0 @@ -export enum AssetType { - ERC20 = 'erc20', - ERC721 = 'erc721', - MultiAsset = 'multiAsset', -} -export enum OrderType { - Bid = 'bid', - Ask = 'ask', -} diff --git a/packages/pipeline/src/utils/constants.ts b/packages/pipeline/src/utils/constants.ts deleted file mode 100644 index 56f3e82d8..000000000 --- a/packages/pipeline/src/utils/constants.ts +++ /dev/null @@ -1,3 +0,0 @@ -// Block number when the Exchange contract was deployed to mainnet. -export const EXCHANGE_START_BLOCK = 6271590; -export const INFURA_ROOT_URL = 'https://mainnet.infura.io'; diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts deleted file mode 100644 index 19f81344e..000000000 --- a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts +++ /dev/null @@ -1,116 +0,0 @@ -import { fetchAsync } from '@0x/utils'; -import * as R from 'ramda'; -import { Connection } from 'typeorm'; - -export interface TradingPair { - fromSymbol: string; - toSymbol: string; - latestSavedTime: number; -} - -const COINLIST_API = 'https://min-api.cryptocompare.com/data/all/coinlist?BuiltOn=7605'; - -interface CryptoCompareCoinListResp { - Data: Map<string, CryptoCompareCoin>; -} - -interface CryptoCompareCoin { - Symbol: string; - BuiltOn: string; - SmartContractAddress: string; -} - -const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT']; -const ETHEREUM_IDENTIFIER = '7605'; -const HTTP_OK_STATUS = 200; - -interface StaticPair { - fromSymbol: string; - toSymbol: string; -} -const SPECIAL_CASES: StaticPair[] = [ - { - fromSymbol: 'ETH', - toSymbol: 'USD', - }, -]; - -/** - * Get trading pairs with latest scraped time for OHLCV records - * @param conn a typeorm Connection to postgres - */ -export async function fetchOHLCVTradingPairsAsync( - conn: Connection, - source: string, - earliestBackfillTime: number, -): Promise<TradingPair[]> { - // fetch existing ohlcv records - const latestTradingPairs: Array<{ - from_symbol: string; - to_symbol: string; - latest: string; - }> = await conn.query(`SELECT - MAX(end_time) as latest, - from_symbol, - to_symbol - FROM raw.ohlcv_external - GROUP BY from_symbol, to_symbol;`); - - // build addressable index: { fromsym: { tosym: time }} - const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {}; - latestTradingPairs.forEach(pair => { - const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {}; - latestIndex[pair.to_symbol] = parseInt(pair.latest, 10); // tslint:disable-line:custom-no-magic-numbers - latestTradingPairsIndex[pair.from_symbol] = latestIndex; - }); - - // match time to special cases - const specialCases: TradingPair[] = SPECIAL_CASES.map(pair => { - const latestSavedTime = - R.path<number>([pair.fromSymbol, pair.toSymbol], latestTradingPairsIndex) || earliestBackfillTime; - return R.assoc('latestSavedTime', latestSavedTime, pair); - }); - - // get token symbols used by Crypto Compare - const allCoinsResp = await fetchAsync(COINLIST_API); - if (allCoinsResp.status !== HTTP_OK_STATUS) { - return []; - } - const allCoins: CryptoCompareCoinListResp = await allCoinsResp.json(); - const erc20CoinsIndex: Map<string, string> = new Map(); - Object.entries(allCoins.Data).forEach(pair => { - const [symbol, coinData] = pair; - if (coinData.BuiltOn === ETHEREUM_IDENTIFIER && coinData.SmartContractAddress !== 'N/A') { - erc20CoinsIndex.set(coinData.SmartContractAddress.toLowerCase(), symbol); - } - }); - - // fetch all tokens that are traded on 0x - const rawEventTokenAddresses: Array<{ tokenaddress: string }> = await conn.query( - `SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION - SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`, - ); - - // tslint:disable-next-line:no-unbound-method - const eventTokenAddresses = R.pluck('tokenaddress', rawEventTokenAddresses).map(R.toLower); - - // join token addresses with CC symbols - const eventTokenSymbols: string[] = eventTokenAddresses - .filter(tokenAddress => erc20CoinsIndex.has(tokenAddress)) - .map(tokenAddress => erc20CoinsIndex.get(tokenAddress) as string); - - // join traded tokens with fiat and latest backfill time - const eventTradingPairs: TradingPair[] = R.chain(sym => { - return TO_CURRENCIES.map(fiat => { - const pair = { - fromSymbol: sym, - toSymbol: fiat, - latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime, - }; - return pair; - }); - }, eventTokenSymbols); - - // join with special cases - return R.concat(eventTradingPairs, specialCases); -} diff --git a/packages/pipeline/src/utils/index.ts b/packages/pipeline/src/utils/index.ts deleted file mode 100644 index 094c0178e..000000000 --- a/packages/pipeline/src/utils/index.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { BigNumber } from '@0x/utils'; -export * from './transformers'; -export * from './constants'; - -/** - * If the given BigNumber is not null, returns the string representation of that - * number. Otherwise, returns null. - * @param n The number to convert. - */ -export function bigNumbertoStringOrNull(n: BigNumber): string | null { - if (n == null) { - return null; - } - return n.toString(); -} - -/** - * If value is null or undefined, returns null. Otherwise converts value to a - * BigNumber. - * @param value A string or number to be converted to a BigNumber - */ -export function toBigNumberOrNull(value: string | number | null): BigNumber | null { - switch (value) { - case null: - case undefined: - return null; - default: - return new BigNumber(value); - } -} - -/** - * Logs an error by intelligently checking for `message` and `stack` properties. - * Intended for use with top-level immediately invoked asynchronous functions. - * @param e the error to log. - */ -export function handleError(e: any): void { - if (e.message != null) { - // tslint:disable-next-line:no-console - console.error(e.message); - } else { - // tslint:disable-next-line:no-console - console.error('Unknown error'); - } - if (e.stack != null) { - // tslint:disable-next-line:no-console - console.error(e.stack); - } else { - // tslint:disable-next-line:no-console - console.error('(No stack trace)'); - } - process.exit(1); -} diff --git a/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts b/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts deleted file mode 100644 index 2cd05a616..000000000 --- a/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { AssetProxyId } from '@0x/types'; - -import { AssetType } from '../../types'; - -/** - * Converts an assetProxyId to its string equivalent - * @param assetProxyId Id of AssetProxy - */ -export function convertAssetProxyIdToType(assetProxyId: AssetProxyId): AssetType { - switch (assetProxyId) { - case AssetProxyId.ERC20: - return AssetType.ERC20; - case AssetProxyId.ERC721: - return AssetType.ERC721; - case AssetProxyId.MultiAsset: - return AssetType.MultiAsset; - default: - throw new Error(`${assetProxyId} not a supported assetProxyId`); - } -} diff --git a/packages/pipeline/src/utils/transformers/big_number.ts b/packages/pipeline/src/utils/transformers/big_number.ts deleted file mode 100644 index 5f2e4d565..000000000 --- a/packages/pipeline/src/utils/transformers/big_number.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { ValueTransformer } from 'typeorm/decorator/options/ValueTransformer'; - -export class BigNumberTransformer implements ValueTransformer { - // tslint:disable-next-line:prefer-function-over-method - public to(value: BigNumber | null): string | null { - return value === null ? null : value.toString(); - } - - // tslint:disable-next-line:prefer-function-over-method - public from(value: string | null): BigNumber | null { - return value === null ? null : new BigNumber(value); - } -} - -export const bigNumberTransformer = new BigNumberTransformer(); diff --git a/packages/pipeline/src/utils/transformers/index.ts b/packages/pipeline/src/utils/transformers/index.ts deleted file mode 100644 index 31a4c9223..000000000 --- a/packages/pipeline/src/utils/transformers/index.ts +++ /dev/null @@ -1,3 +0,0 @@ -export * from './big_number'; -export * from './number_to_bigint'; -export * from './asset_proxy_id_types'; diff --git a/packages/pipeline/src/utils/transformers/number_to_bigint.ts b/packages/pipeline/src/utils/transformers/number_to_bigint.ts deleted file mode 100644 index 8fbd52093..000000000 --- a/packages/pipeline/src/utils/transformers/number_to_bigint.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { BigNumber } from '@0x/utils'; -import { ValueTransformer } from 'typeorm/decorator/options/ValueTransformer'; - -const decimalRadix = 10; - -// Can be used to convert a JavaScript number type to a Postgres bigint type and -// vice versa. By default TypeORM will silently convert number types to string -// if the corresponding Postgres type is bigint. See -// https://github.com/typeorm/typeorm/issues/2400 for more information. -export class NumberToBigIntTransformer implements ValueTransformer { - // tslint:disable-next-line:prefer-function-over-method - public to(value: number): string | null { - if (value === null || value === undefined) { - return null; - } else { - return value.toString(); - } - } - - // tslint:disable-next-line:prefer-function-over-method - public from(value: string): number { - if (new BigNumber(value).isGreaterThan(Number.MAX_SAFE_INTEGER)) { - throw new Error( - `Attempted to convert PostgreSQL bigint value (${value}) to JavaScript number type but it is too big to safely convert`, - ); - } - return Number.parseInt(value, decimalRadix); - } -} - -export const numberToBigIntTransformer = new NumberToBigIntTransformer(); |