diff options
Diffstat (limited to 'packages/pipeline')
45 files changed, 1414 insertions, 288 deletions
diff --git a/packages/pipeline/migrations/1544131464368-CreateERC20ApprovalEvents.ts b/packages/pipeline/migrations/1544131464368-CreateERC20ApprovalEvents.ts new file mode 100644 index 000000000..2e84e0ec8 --- /dev/null +++ b/packages/pipeline/migrations/1544131464368-CreateERC20ApprovalEvents.ts @@ -0,0 +1,26 @@ +import { MigrationInterface, QueryRunner, Table } from 'typeorm'; + +const erc20ApprovalEvents = new Table({ + name: 'raw.erc20_approval_events', + columns: [ + { name: 'token_address', type: 'varchar(42)', isPrimary: true }, + { name: 'log_index', type: 'integer', isPrimary: true }, + { name: 'block_number', type: 'bigint', isPrimary: true }, + + { name: 'raw_data', type: 'varchar' }, + { name: 'transaction_hash', type: 'varchar' }, + { name: 'owner_address', type: 'varchar(42)' }, + { name: 'spender_address', type: 'varchar(42)' }, + { name: 'amount', type: 'numeric' }, + ], +}); + +export class CreateERC20TokenApprovalEvents1544131464368 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise<any> { + await queryRunner.createTable(erc20ApprovalEvents); + } + + public async down(queryRunner: QueryRunner): Promise<any> { + await queryRunner.dropTable(erc20ApprovalEvents); + } +} diff --git a/packages/pipeline/migrations/1544131658904-TokenOrderbookSnapshotAddOrderType.ts b/packages/pipeline/migrations/1544131658904-TokenOrderbookSnapshotAddOrderType.ts new file mode 100644 index 000000000..a501ec6d8 --- /dev/null +++ b/packages/pipeline/migrations/1544131658904-TokenOrderbookSnapshotAddOrderType.ts @@ -0,0 +1,33 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class TokenOrderbookSnapshotAddOrderType1544131658904 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise<any> { + await queryRunner.query( + `ALTER TABLE raw.token_orderbook_snapshots + DROP CONSTRAINT "PK_8a16487e7cb6862ec5a84ed3495", + ADD PRIMARY KEY (observed_timestamp, source, order_type, price, base_asset_symbol, quote_asset_symbol); + `, + ); + await queryRunner.query( + `ALTER TABLE raw.token_orderbook_snapshots + ALTER COLUMN quote_asset_address DROP NOT NULL, + ALTER COLUMN base_asset_address DROP NOT NULL; + `, + ); + } + + public async down(queryRunner: QueryRunner): Promise<any> { + await queryRunner.query( + `ALTER TABLE raw.token_orderbook_snapshots + ALTER COLUMN quote_asset_address SET NOT NULL, + ALTER COLUMN base_asset_address SET NOT NULL; + `, + ); + await queryRunner.query( + `ALTER TABLE raw.token_orderbook_snapshots + DROP CONSTRAINT token_orderbook_snapshots_pkey, + ADD CONSTRAINT "PK_8a16487e7cb6862ec5a84ed3495" PRIMARY KEY (observed_timestamp, source, price, base_asset_symbol, quote_asset_symbol); + `, + ); + } +} diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json index 0539618d4..f8adf9055 100644 --- a/packages/pipeline/package.json +++ b/packages/pipeline/package.json @@ -1,6 +1,6 @@ { "name": "@0x/pipeline", - "version": "1.0.0", + "version": "1.0.1", "private": true, "description": "Data pipeline for offline analysis", "scripts": { @@ -39,22 +39,23 @@ "typescript": "3.0.1" }, "dependencies": { - "@0x/connect": "^3.0.2", + "@0x/connect": "^3.0.9", + "@0x/contract-addresses": "^2.0.0", "@0x/contract-artifacts": "^1.0.1", "@0x/contract-wrappers": "^3.0.0", - "@0x/dev-utils": "^1.0.13", + "@0x/dev-utils": "^1.0.20", "@0x/order-utils": "^2.0.0", - "@0x/subproviders": "^2.1.0", - "@0x/types": "^1.2.0", - "@0x/utils": "^2.0.3", - "@0x/web3-wrapper": "^3.1.0", + "@0x/subproviders": "^2.1.7", + "@0x/types": "^1.4.0", + "@0x/utils": "^2.0.7", + "@0x/web3-wrapper": "^3.2.0", "@types/dockerode": "^2.5.9", "@types/p-limit": "^2.0.0", "async-parallel": "^1.2.3", "axios": "^0.18.0", + "bottleneck": "^2.13.2", "dockerode": "^2.5.7", - "ethereum-types": "^1.0.6", - "p-limit": "^2.0.0", + "ethereum-types": "^1.1.3", "pg": "^7.5.0", "prettier": "^1.15.3", "ramda": "^0.25.0", diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts index 31cd5bfd6..94468d25a 100644 --- a/packages/pipeline/src/data_sources/bloxy/index.ts +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -116,7 +116,7 @@ export class BloxySource { }, }); if (isError(resp.data)) { - throw new Error('Error in Bloxy API response: ' + resp.data.error); + throw new Error(`Error in Bloxy API response: ${resp.data.error}`); } return resp.data; } diff --git a/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts new file mode 100644 index 000000000..e0098122f --- /dev/null +++ b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts @@ -0,0 +1,45 @@ +import { + ContractWrappers, + ERC20TokenApprovalEventArgs, + ERC20TokenEvents, + ERC20TokenWrapper, +} from '@0x/contract-wrappers'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { LogWithDecodedArgs } from 'ethereum-types'; + +import { GetEventsFunc, getEventsWithPaginationAsync } from './utils'; + +export class ERC20EventsSource { + private readonly _erc20Wrapper: ERC20TokenWrapper; + private readonly _tokenAddress: string; + constructor(provider: Web3ProviderEngine, networkId: number, tokenAddress: string) { + const contractWrappers = new ContractWrappers(provider, { networkId }); + this._erc20Wrapper = contractWrappers.erc20Token; + this._tokenAddress = tokenAddress; + } + + public async getApprovalEventsAsync( + startBlock: number, + endBlock: number, + ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> { + return getEventsWithPaginationAsync( + this._getApprovalEventsForRangeAsync.bind(this) as GetEventsFunc<ERC20TokenApprovalEventArgs>, + startBlock, + endBlock, + ); + } + + // Gets all approval events of for a specific sub-range. This getter + // function will be called during each step of pagination. + private async _getApprovalEventsForRangeAsync( + fromBlock: number, + toBlock: number, + ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> { + return this._erc20Wrapper.getLogsAsync<ERC20TokenApprovalEventArgs>( + this._tokenAddress, + ERC20TokenEvents.Approval, + { fromBlock, toBlock }, + {}, + ); + } +} diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts index 1717eb8b3..58691e2ab 100644 --- a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts +++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts @@ -8,78 +8,52 @@ import { ExchangeWrapper, } from '@0x/contract-wrappers'; import { Web3ProviderEngine } from '@0x/subproviders'; -import { Web3Wrapper } from '@0x/web3-wrapper'; import { LogWithDecodedArgs } from 'ethereum-types'; -import { EXCHANGE_START_BLOCK } from '../../utils'; - -const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default toBlock. -const NUM_BLOCKS_PER_QUERY = 20000; // Number of blocks to query for events at a time. +import { GetEventsFunc, getEventsWithPaginationAsync } from './utils'; export class ExchangeEventsSource { private readonly _exchangeWrapper: ExchangeWrapper; - private readonly _web3Wrapper: Web3Wrapper; constructor(provider: Web3ProviderEngine, networkId: number) { - this._web3Wrapper = new Web3Wrapper(provider); const contractWrappers = new ContractWrappers(provider, { networkId }); this._exchangeWrapper = contractWrappers.exchange; } public async getFillEventsAsync( - fromBlock?: number, - toBlock?: number, + startBlock: number, + endBlock: number, ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { - return this._getEventsAsync<ExchangeFillEventArgs>(ExchangeEvents.Fill, fromBlock, toBlock); + const getFillEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeFillEventArgs>(ExchangeEvents.Fill); + return getEventsWithPaginationAsync(getFillEventsForRangeAsync, startBlock, endBlock); } public async getCancelEventsAsync( - fromBlock?: number, - toBlock?: number, + startBlock: number, + endBlock: number, ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> { - return this._getEventsAsync<ExchangeCancelEventArgs>(ExchangeEvents.Cancel, fromBlock, toBlock); + const getCancelEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelEventArgs>( + ExchangeEvents.Cancel, + ); + return getEventsWithPaginationAsync(getCancelEventsForRangeAsync, startBlock, endBlock); } public async getCancelUpToEventsAsync( - fromBlock?: number, - toBlock?: number, + startBlock: number, + endBlock: number, ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> { - return this._getEventsAsync<ExchangeCancelUpToEventArgs>(ExchangeEvents.CancelUpTo, fromBlock, toBlock); - } - - private async _getEventsAsync<ArgsType extends ExchangeEventArgs>( - eventName: ExchangeEvents, - fromBlock: number = EXCHANGE_START_BLOCK, - toBlock?: number, - ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { - const calculatedToBlock = - toBlock === undefined - ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD - : toBlock; - let events: Array<LogWithDecodedArgs<ArgsType>> = []; - for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) { - events = events.concat( - await this._getEventsForRangeAsync<ArgsType>( - eventName, - currFromBlock, - Math.min(currFromBlock + NUM_BLOCKS_PER_QUERY - 1, calculatedToBlock), - ), - ); - } - return events; + const getCancelUpToEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelUpToEventArgs>( + ExchangeEvents.CancelUpTo, + ); + return getEventsWithPaginationAsync(getCancelUpToEventsForRangeAsync, startBlock, endBlock); } - private async _getEventsForRangeAsync<ArgsType extends ExchangeEventArgs>( - eventName: ExchangeEvents, - fromBlock: number, - toBlock: number, - ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { - return this._exchangeWrapper.getLogsAsync<ArgsType>( - eventName, - { - fromBlock, - toBlock, - }, - {}, - ); + // Returns a getter function which gets all events of a specific type for a + // specific sub-range. This getter function will be called during each step + // of pagination. + private _makeGetterFuncForEventType<ArgsType extends ExchangeEventArgs>( + eventType: ExchangeEvents, + ): GetEventsFunc<ArgsType> { + return async (fromBlock: number, toBlock: number) => + this._exchangeWrapper.getLogsAsync<ArgsType>(eventType, { fromBlock, toBlock }, {}); } } diff --git a/packages/pipeline/src/data_sources/contract-wrappers/utils.ts b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts new file mode 100644 index 000000000..67660a37e --- /dev/null +++ b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts @@ -0,0 +1,67 @@ +import { DecodedLogArgs, LogWithDecodedArgs } from 'ethereum-types'; + +const NUM_BLOCKS_PER_QUERY = 10000; // Number of blocks to query for events at a time. +const NUM_RETRIES = 3; // Number of retries if a request fails or times out. + +export type GetEventsFunc<ArgsType extends DecodedLogArgs> = ( + fromBlock: number, + toBlock: number, +) => Promise<Array<LogWithDecodedArgs<ArgsType>>>; + +/** + * Gets all events between the given startBlock and endBlock by querying for + * NUM_BLOCKS_PER_QUERY at a time. Accepts a getter function in order to + * maximize code re-use and allow for getting different types of events for + * different contracts. If the getter function throws with a retryable error, + * it will automatically be retried up to NUM_RETRIES times. + * @param getEventsAsync A getter function which will be called for each step during pagination. + * @param startBlock The start of the entire block range to get events for. + * @param endBlock The end of the entire block range to get events for. + */ +export async function getEventsWithPaginationAsync<ArgsType extends DecodedLogArgs>( + getEventsAsync: GetEventsFunc<ArgsType>, + startBlock: number, + endBlock: number, +): Promise<Array<LogWithDecodedArgs<ArgsType>>> { + let events: Array<LogWithDecodedArgs<ArgsType>> = []; + for (let fromBlock = startBlock; fromBlock <= endBlock; fromBlock += NUM_BLOCKS_PER_QUERY) { + const toBlock = Math.min(fromBlock + NUM_BLOCKS_PER_QUERY - 1, endBlock); + const eventsInRange = await _getEventsWithRetriesAsync(getEventsAsync, NUM_RETRIES, fromBlock, toBlock); + events = events.concat(eventsInRange); + } + return events; +} + +/** + * Calls the getEventsAsync function and retries up to numRetries times if it + * throws with an error that is considered retryable. + * @param getEventsAsync a function that will be called on each iteration. + * @param numRetries the maximum number times to retry getEventsAsync if it fails with a retryable error. + * @param fromBlock the start of the sub-range of blocks we are getting events for. + * @param toBlock the end of the sub-range of blocks we are getting events for. + */ +export async function _getEventsWithRetriesAsync<ArgsType extends DecodedLogArgs>( + getEventsAsync: GetEventsFunc<ArgsType>, + numRetries: number, + fromBlock: number, + toBlock: number, +): Promise<Array<LogWithDecodedArgs<ArgsType>>> { + let eventsInRange: Array<LogWithDecodedArgs<ArgsType>> = []; + for (let i = 0; i <= numRetries; i++) { + try { + eventsInRange = await getEventsAsync(fromBlock, toBlock); + } catch (err) { + if (isErrorRetryable(err) && i < numRetries) { + continue; + } else { + throw err; + } + } + break; + } + return eventsInRange; +} + +function isErrorRetryable(err: Error): boolean { + return err.message.includes('network timeout'); +} diff --git a/packages/pipeline/src/data_sources/idex/index.ts b/packages/pipeline/src/data_sources/idex/index.ts new file mode 100644 index 000000000..c1e53c08d --- /dev/null +++ b/packages/pipeline/src/data_sources/idex/index.ts @@ -0,0 +1,82 @@ +import { fetchAsync } from '@0x/utils'; + +const IDEX_BASE_URL = 'https://api.idex.market'; +const MARKETS_URL = `${IDEX_BASE_URL}/returnTicker`; +const ORDERBOOK_URL = `${IDEX_BASE_URL}/returnOrderBook`; +const MAX_ORDER_COUNT = 100; // Maximum based on https://github.com/AuroraDAO/idex-api-docs#returnorderbook +export const IDEX_SOURCE = 'idex'; + +export interface IdexMarketsResponse { + [marketName: string]: IdexMarket; +} + +export interface IdexMarket { + last: string; + high: string; + low: string; + lowestAsk: string; + highestBid: string; + percentChange: string; + baseVolume: string; + quoteVolume: string; +} + +export interface IdexOrderbook { + asks: IdexOrder[]; + bids: IdexOrder[]; +} + +export interface IdexOrder { + price: string; + amount: string; + total: string; + orderHash: string; + params: IdexOrderParam; +} + +export interface IdexOrderParam { + tokenBuy: string; + buySymbol: string; + buyPrecision: number; + amountBuy: string; + tokenSell: string; + sellSymbol: string; + sellPrecision: number; + amountSell: string; + expires: number; + nonce: number; + user: string; +} + +// tslint:disable:prefer-function-over-method +// ^ Keep consistency with other sources and help logical organization +export class IdexSource { + /** + * Call Idex API to find out which markets they are maintaining orderbooks for. + */ + public async getMarketsAsync(): Promise<string[]> { + const params = { method: 'POST' }; + const resp = await fetchAsync(MARKETS_URL, params); + const respJson: IdexMarketsResponse = await resp.json(); + const markets: string[] = Object.keys(respJson); + return markets; + } + + /** + * Retrieve orderbook from Idex API for a given market. + * @param marketId String identifying the market we want data for. Eg. 'REP_AUG' + */ + public async getMarketOrderbookAsync(marketId: string): Promise<IdexOrderbook> { + const params = { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + market: marketId, + count: MAX_ORDER_COUNT, + }), + }; + const resp = await fetchAsync(ORDERBOOK_URL, params); + const respJson: IdexOrderbook = await resp.json(); + return respJson; + } +} diff --git a/packages/pipeline/src/data_sources/oasis/index.ts b/packages/pipeline/src/data_sources/oasis/index.ts new file mode 100644 index 000000000..3b30e9dfd --- /dev/null +++ b/packages/pipeline/src/data_sources/oasis/index.ts @@ -0,0 +1,103 @@ +import { fetchAsync } from '@0x/utils'; + +const OASIS_BASE_URL = 'https://data.makerdao.com/v1'; +const OASIS_MARKET_QUERY = `query { + oasisMarkets(period: "1 week") { + nodes { + id + base + quote + buyVol + sellVol + price + high + low + } + } +}`; +const OASIS_ORDERBOOK_QUERY = `query ($market: String!) { + allOasisOrders(condition: { market: $market }) { + totalCount + nodes { + market + offerId + price + amount + act + } + } +}`; +export const OASIS_SOURCE = 'oasis'; + +export interface OasisMarket { + id: string; // market symbol e.g MKRDAI + base: string; // base symbol e.g MKR + quote: string; // quote symbol e.g DAI + buyVol: number; // total buy volume (base) + sellVol: number; // total sell volume (base) + price: number; // volume weighted price (quote) + high: number; // max sell price + low: number; // min buy price +} + +export interface OasisMarketResponse { + data: { + oasisMarkets: { + nodes: OasisMarket[]; + }; + }; +} + +export interface OasisOrder { + offerId: number; // Offer Id + market: string; // Market symbol (base/quote) + price: string; // Offer price (quote) + amount: string; // Offer amount (base) + act: string; // Action (ask|bid) +} + +export interface OasisOrderbookResponse { + data: { + allOasisOrders: { + totalCount: number; + nodes: OasisOrder[]; + }; + }; +} + +// tslint:disable:prefer-function-over-method +// ^ Keep consistency with other sources and help logical organization +export class OasisSource { + /** + * Call Ddex API to find out which markets they are maintaining orderbooks for. + */ + public async getActiveMarketsAsync(): Promise<OasisMarket[]> { + const params = { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ query: OASIS_MARKET_QUERY }), + }; + const resp = await fetchAsync(OASIS_BASE_URL, params); + const respJson: OasisMarketResponse = await resp.json(); + const markets = respJson.data.oasisMarkets.nodes; + return markets; + } + + /** + * Retrieve orderbook from Oasis API for a given market. + * @param marketId String identifying the market we want data for. Eg. 'REPAUG'. + */ + public async getMarketOrderbookAsync(marketId: string): Promise<OasisOrder[]> { + const input = { + market: marketId, + }; + const params = { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ query: OASIS_ORDERBOOK_QUERY, variables: input }), + }; + const resp = await fetchAsync(OASIS_BASE_URL, params); + const respJson: OasisOrderbookResponse = await resp.json(); + return respJson.data.allOasisOrders.nodes; + } +} diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 8804c34d0..85042501b 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -1,6 +1,6 @@ // tslint:disable:no-duplicate-imports import { fetchAsync } from '@0x/utils'; -import promiseLimit = require('p-limit'); +import Bottleneck from 'bottleneck'; import { stringify } from 'querystring'; import * as R from 'ramda'; @@ -33,43 +33,41 @@ export interface CryptoCompareOHLCVParams { toTs?: number; } -const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_SECOND = 1000; const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; const HTTP_OK_STATUS = 200; const CRYPTO_COMPARE_VALID_EMPTY_RESPONSE_TYPE = 96; +const MAX_PAGE_SIZE = 2000; export class CryptoCompareOHLCVSource { - public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time - public readonly default_exchange = 'CCCAGG'; public readonly intervalBetweenRecords = ONE_HOUR; + public readonly defaultExchange = 'CCCAGG'; + public readonly interval = this.intervalBetweenRecords * MAX_PAGE_SIZE; // the hourly API returns data for one interval at a time private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; // rate-limit for all API calls through this class instance - private readonly _promiseLimit: (fetchFn: () => Promise<Response>) => Promise<Response>; - constructor(maxConcurrentRequests: number = 50) { - this._promiseLimit = promiseLimit(maxConcurrentRequests); + private readonly _limiter: Bottleneck; + constructor(maxReqsPerSecond: number) { + this._limiter = new Bottleneck({ + minTime: ONE_SECOND / maxReqsPerSecond, + reservoir: 30, + reservoirRefreshAmount: 30, + reservoirRefreshInterval: ONE_SECOND, + }); } // gets OHLCV records starting from pair.latest public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> { const params = { - e: this.default_exchange, + e: this.defaultExchange, fsym: pair.fromSymbol, tsym: pair.toSymbol, + limit: MAX_PAGE_SIZE, toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms }; const url = this._url + stringify(params); - - // go through the instance-wide rate-limit - const fetchPromise: Promise<Response> = this._promiseLimit(() => { - // tslint:disable-next-line:no-console - console.log(`Scraping Crypto Compare at ${url}`); - return fetchAsync(url); - }); - - const response = await Promise.resolve(fetchPromise); + const response = await this._limiter.schedule(() => fetchAsync(url)); if (response.status !== HTTP_OK_STATUS) { throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`); } diff --git a/packages/pipeline/src/data_sources/paradex/index.ts b/packages/pipeline/src/data_sources/paradex/index.ts index 69a03d553..46d448f4b 100644 --- a/packages/pipeline/src/data_sources/paradex/index.ts +++ b/packages/pipeline/src/data_sources/paradex/index.ts @@ -1,9 +1,9 @@ import { fetchAsync, logUtils } from '@0x/utils'; const PARADEX_BASE_URL = 'https://api.paradex.io/consumer/v0'; -const ACTIVE_MARKETS_URL = PARADEX_BASE_URL + '/markets'; -const ORDERBOOK_ENDPOINT = PARADEX_BASE_URL + '/orderbook'; -const TOKEN_INFO_ENDPOINT = PARADEX_BASE_URL + '/tokens'; +const ACTIVE_MARKETS_URL = `${PARADEX_BASE_URL}/markets`; +const ORDERBOOK_ENDPOINT = `${PARADEX_BASE_URL}/orderbook`; +const TOKEN_INFO_ENDPOINT = `${PARADEX_BASE_URL}/tokens`; export const PARADEX_SOURCE = 'paradex'; export type ParadexActiveMarketsResponse = ParadexMarket[]; diff --git a/packages/pipeline/src/entities/erc20_approval_event.ts b/packages/pipeline/src/entities/erc20_approval_event.ts new file mode 100644 index 000000000..69cdfcb0b --- /dev/null +++ b/packages/pipeline/src/entities/erc20_approval_event.ts @@ -0,0 +1,26 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'erc20_approval_events', schema: 'raw' }) +export class ERC20ApprovalEvent { + @PrimaryColumn({ name: 'token_address' }) + public tokenAddress!: string; + @PrimaryColumn({ name: 'log_index' }) + public logIndex!: number; + @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) + public blockNumber!: number; + + @Column({ name: 'raw_data' }) + public rawData!: string; + + @Column({ name: 'transaction_hash' }) + public transactionHash!: string; + @Column({ name: 'owner_address' }) + public ownerAddress!: string; + @Column({ name: 'spender_address' }) + public spenderAddress!: string; + @Column({ name: 'amount', type: 'numeric', transformer: bigNumberTransformer }) + public amount!: BigNumber; +} diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts index db0814e38..cc3de78bb 100644 --- a/packages/pipeline/src/entities/index.ts +++ b/packages/pipeline/src/entities/index.ts @@ -14,5 +14,6 @@ export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './s export { TokenMetadata } from './token_metadata'; export { TokenOrderbookSnapshot } from './token_order'; export { Transaction } from './transaction'; +export { ERC20ApprovalEvent } from './erc20_approval_event'; export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; diff --git a/packages/pipeline/src/entities/token_order.ts b/packages/pipeline/src/entities/token_order.ts index 557705767..4b8f0abc3 100644 --- a/packages/pipeline/src/entities/token_order.ts +++ b/packages/pipeline/src/entities/token_order.ts @@ -10,20 +10,20 @@ export class TokenOrderbookSnapshot { public observedTimestamp!: number; @PrimaryColumn({ name: 'source' }) public source!: string; - @Column({ name: 'order_type' }) + @PrimaryColumn({ name: 'order_type' }) public orderType!: OrderType; @PrimaryColumn({ name: 'price', type: 'numeric', transformer: bigNumberTransformer }) public price!: BigNumber; @PrimaryColumn({ name: 'base_asset_symbol' }) public baseAssetSymbol!: string; - @Column({ name: 'base_asset_address' }) - public baseAssetAddress!: string; + @Column({ nullable: true, type: String, name: 'base_asset_address' }) + public baseAssetAddress!: string | null; @Column({ name: 'base_volume', type: 'numeric', transformer: bigNumberTransformer }) public baseVolume!: BigNumber; @PrimaryColumn({ name: 'quote_asset_symbol' }) public quoteAssetSymbol!: string; - @Column({ name: 'quote_asset_address' }) - public quoteAssetAddress!: string; + @Column({ nullable: true, type: String, name: 'quote_asset_address' }) + public quoteAssetAddress!: string | null; @Column({ name: 'quote_volume', type: 'numeric', transformer: bigNumberTransformer }) public quoteVolume!: BigNumber; } diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts index 9f7815b4e..fe11d81d5 100644 --- a/packages/pipeline/src/ormconfig.ts +++ b/packages/pipeline/src/ormconfig.ts @@ -3,6 +3,7 @@ import { ConnectionOptions } from 'typeorm'; import { Block, DexTrade, + ERC20ApprovalEvent, ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent, @@ -21,6 +22,7 @@ const entities = [ ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent, + ERC20ApprovalEvent, OHLCVExternal, Relayer, SraOrder, diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts index 81132e8f0..52a998f9f 100644 --- a/packages/pipeline/src/parsers/ddex_orders/index.ts +++ b/packages/pipeline/src/parsers/ddex_orders/index.ts @@ -1,7 +1,8 @@ import { BigNumber } from '@0x/utils'; -import * as R from 'ramda'; -import { DdexMarket, DdexOrder, DdexOrderbook } from '../../data_sources/ddex'; +import { aggregateOrders } from '../utils'; + +import { DdexMarket, DdexOrderbook } from '../../data_sources/ddex'; import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; import { OrderType } from '../../types'; @@ -28,19 +29,6 @@ export function parseDdexOrders( } /** - * Aggregates orders by price point for consistency with other exchanges. - * Querying the Ddex API at level 3 setting returns a breakdown of - * individual orders at each price point. Other exchanges only give total amount - * at each price point. Returns an array of <price, amount> tuples. - * @param ddexOrders A list of Ddex orders awaiting aggregation. - */ -export function aggregateOrders(ddexOrders: DdexOrder[]): Array<[string, BigNumber]> { - const sumAmount = (acc: BigNumber, order: DdexOrder): BigNumber => acc.plus(order.amount); - const aggregatedPricePoints = R.reduceBy(sumAmount, new BigNumber(0), R.prop('price'), ddexOrders); - return Object.entries(aggregatedPricePoints); -} - -/** * Parse a single aggregated Ddex order in order to form a tokenOrder entity * which can be saved into the database. * @param ddexMarket An object containing information about the market where these diff --git a/packages/pipeline/src/parsers/events/erc20_events.ts b/packages/pipeline/src/parsers/events/erc20_events.ts new file mode 100644 index 000000000..caf9984d0 --- /dev/null +++ b/packages/pipeline/src/parsers/events/erc20_events.ts @@ -0,0 +1,34 @@ +import { ERC20TokenApprovalEventArgs } from '@0x/contract-wrappers'; +import { LogWithDecodedArgs } from 'ethereum-types'; +import * as R from 'ramda'; + +import { ERC20ApprovalEvent } from '../../entities'; + +/** + * Parses raw event logs for an ERC20 approval event and returns an array of + * ERC20ApprovalEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseERC20ApprovalEvents: ( + eventLogs: Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>, +) => ERC20ApprovalEvent[] = R.map(_convertToERC20ApprovalEvent); + +/** + * Converts a raw event log for an ERC20 approval event into an + * ERC20ApprovalEvent entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToERC20ApprovalEvent( + eventLog: LogWithDecodedArgs<ERC20TokenApprovalEventArgs>, +): ERC20ApprovalEvent { + const erc20ApprovalEvent = new ERC20ApprovalEvent(); + erc20ApprovalEvent.tokenAddress = eventLog.address as string; + erc20ApprovalEvent.blockNumber = eventLog.blockNumber as number; + erc20ApprovalEvent.logIndex = eventLog.logIndex as number; + erc20ApprovalEvent.rawData = eventLog.data as string; + erc20ApprovalEvent.transactionHash = eventLog.transactionHash; + erc20ApprovalEvent.ownerAddress = eventLog.args._owner; + erc20ApprovalEvent.spenderAddress = eventLog.args._spender; + erc20ApprovalEvent.amount = eventLog.args._value; + return erc20ApprovalEvent; +} diff --git a/packages/pipeline/src/parsers/events/exchange_events.ts b/packages/pipeline/src/parsers/events/exchange_events.ts new file mode 100644 index 000000000..e18106c75 --- /dev/null +++ b/packages/pipeline/src/parsers/events/exchange_events.ts @@ -0,0 +1,133 @@ +import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers'; +import { assetDataUtils } from '@0x/order-utils'; +import { AssetProxyId, ERC721AssetData } from '@0x/types'; +import { LogWithDecodedArgs } from 'ethereum-types'; +import * as R from 'ramda'; + +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities'; +import { bigNumbertoStringOrNull } from '../../utils'; + +/** + * Parses raw event logs for a fill event and returns an array of + * ExchangeFillEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeFillEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>, +) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent); + +/** + * Parses raw event logs for a cancel event and returns an array of + * ExchangeCancelEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeCancelEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>, +) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent); + +/** + * Parses raw event logs for a CancelUpTo event and returns an array of + * ExchangeCancelUpToEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeCancelUpToEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>, +) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent); + +/** + * Converts a raw event log for a fill event into an ExchangeFillEvent entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent { + const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); + const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); + const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const exchangeFillEvent = new ExchangeFillEvent(); + exchangeFillEvent.contractAddress = eventLog.address as string; + exchangeFillEvent.blockNumber = eventLog.blockNumber as number; + exchangeFillEvent.logIndex = eventLog.logIndex as number; + exchangeFillEvent.rawData = eventLog.data as string; + exchangeFillEvent.transactionHash = eventLog.transactionHash; + exchangeFillEvent.makerAddress = eventLog.args.makerAddress; + exchangeFillEvent.takerAddress = eventLog.args.takerAddress; + exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; + exchangeFillEvent.senderAddress = eventLog.args.senderAddress; + exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount; + exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount; + exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid; + exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid; + exchangeFillEvent.orderHash = eventLog.args.orderHash; + exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData; + exchangeFillEvent.makerAssetType = makerAssetType; + exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId; + exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress; + // tslint has a false positive here. Type assertion is required. + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); + exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData; + exchangeFillEvent.takerAssetType = takerAssetType; + exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId; + exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); + return exchangeFillEvent; +} + +/** + * Converts a raw event log for a cancel event into an ExchangeCancelEvent + * entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToExchangeCancelEvent( + eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>, +): ExchangeCancelEvent { + const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); + const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); + const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const exchangeCancelEvent = new ExchangeCancelEvent(); + exchangeCancelEvent.contractAddress = eventLog.address as string; + exchangeCancelEvent.blockNumber = eventLog.blockNumber as number; + exchangeCancelEvent.logIndex = eventLog.logIndex as number; + exchangeCancelEvent.rawData = eventLog.data as string; + exchangeCancelEvent.transactionHash = eventLog.transactionHash; + exchangeCancelEvent.makerAddress = eventLog.args.makerAddress; + exchangeCancelEvent.takerAddress = eventLog.args.takerAddress; + exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; + exchangeCancelEvent.senderAddress = eventLog.args.senderAddress; + exchangeCancelEvent.orderHash = eventLog.args.orderHash; + exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData; + exchangeCancelEvent.makerAssetType = makerAssetType; + exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId; + exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); + exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData; + exchangeCancelEvent.takerAssetType = takerAssetType; + exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId; + exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); + return exchangeCancelEvent; +} + +/** + * Converts a raw event log for a cancelUpTo event into an + * ExchangeCancelUpToEvent entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToExchangeCancelUpToEvent( + eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>, +): ExchangeCancelUpToEvent { + const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent(); + exchangeCancelUpToEvent.contractAddress = eventLog.address as string; + exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number; + exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number; + exchangeCancelUpToEvent.rawData = eventLog.data as string; + exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash; + exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress; + exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress; + exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch; + return exchangeCancelUpToEvent; +} diff --git a/packages/pipeline/src/parsers/events/index.ts b/packages/pipeline/src/parsers/events/index.ts index e18106c75..3f9915e8b 100644 --- a/packages/pipeline/src/parsers/events/index.ts +++ b/packages/pipeline/src/parsers/events/index.ts @@ -1,133 +1,2 @@ -import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers'; -import { assetDataUtils } from '@0x/order-utils'; -import { AssetProxyId, ERC721AssetData } from '@0x/types'; -import { LogWithDecodedArgs } from 'ethereum-types'; -import * as R from 'ramda'; - -import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities'; -import { bigNumbertoStringOrNull } from '../../utils'; - -/** - * Parses raw event logs for a fill event and returns an array of - * ExchangeFillEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseExchangeFillEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>, -) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent); - -/** - * Parses raw event logs for a cancel event and returns an array of - * ExchangeCancelEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseExchangeCancelEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>, -) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent); - -/** - * Parses raw event logs for a CancelUpTo event and returns an array of - * ExchangeCancelUpToEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseExchangeCancelUpToEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>, -) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent); - -/** - * Converts a raw event log for a fill event into an ExchangeFillEvent entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent { - const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); - const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; - const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); - const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; - const exchangeFillEvent = new ExchangeFillEvent(); - exchangeFillEvent.contractAddress = eventLog.address as string; - exchangeFillEvent.blockNumber = eventLog.blockNumber as number; - exchangeFillEvent.logIndex = eventLog.logIndex as number; - exchangeFillEvent.rawData = eventLog.data as string; - exchangeFillEvent.transactionHash = eventLog.transactionHash; - exchangeFillEvent.makerAddress = eventLog.args.makerAddress; - exchangeFillEvent.takerAddress = eventLog.args.takerAddress; - exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; - exchangeFillEvent.senderAddress = eventLog.args.senderAddress; - exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount; - exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount; - exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid; - exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid; - exchangeFillEvent.orderHash = eventLog.args.orderHash; - exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData; - exchangeFillEvent.makerAssetType = makerAssetType; - exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId; - exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress; - // tslint has a false positive here. Type assertion is required. - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); - exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData; - exchangeFillEvent.takerAssetType = takerAssetType; - exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId; - exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); - return exchangeFillEvent; -} - -/** - * Converts a raw event log for a cancel event into an ExchangeCancelEvent - * entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToExchangeCancelEvent( - eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>, -): ExchangeCancelEvent { - const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); - const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; - const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); - const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; - const exchangeCancelEvent = new ExchangeCancelEvent(); - exchangeCancelEvent.contractAddress = eventLog.address as string; - exchangeCancelEvent.blockNumber = eventLog.blockNumber as number; - exchangeCancelEvent.logIndex = eventLog.logIndex as number; - exchangeCancelEvent.rawData = eventLog.data as string; - exchangeCancelEvent.transactionHash = eventLog.transactionHash; - exchangeCancelEvent.makerAddress = eventLog.args.makerAddress; - exchangeCancelEvent.takerAddress = eventLog.args.takerAddress; - exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; - exchangeCancelEvent.senderAddress = eventLog.args.senderAddress; - exchangeCancelEvent.orderHash = eventLog.args.orderHash; - exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData; - exchangeCancelEvent.makerAssetType = makerAssetType; - exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId; - exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); - exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData; - exchangeCancelEvent.takerAssetType = takerAssetType; - exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId; - exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); - return exchangeCancelEvent; -} - -/** - * Converts a raw event log for a cancelUpTo event into an - * ExchangeCancelUpToEvent entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToExchangeCancelUpToEvent( - eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>, -): ExchangeCancelUpToEvent { - const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent(); - exchangeCancelUpToEvent.contractAddress = eventLog.address as string; - exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number; - exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number; - exchangeCancelUpToEvent.rawData = eventLog.data as string; - exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash; - exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress; - exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress; - exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch; - return exchangeCancelUpToEvent; -} +export { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from './exchange_events'; +export { parseERC20ApprovalEvents } from './erc20_events'; diff --git a/packages/pipeline/src/parsers/idex_orders/index.ts b/packages/pipeline/src/parsers/idex_orders/index.ts new file mode 100644 index 000000000..dfe27455c --- /dev/null +++ b/packages/pipeline/src/parsers/idex_orders/index.ts @@ -0,0 +1,77 @@ +import { BigNumber } from '@0x/utils'; + +import { aggregateOrders } from '../utils'; + +import { IdexOrder, IdexOrderbook, IdexOrderParam } from '../../data_sources/idex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; +import { OrderType } from '../../types'; + +/** + * Marque function of this file. + * 1) Takes in orders from an orderbook, + * 2) Aggregates them by price point, + * 3) Parses them into entities which are then saved into the database. + * @param idexOrderbook raw orderbook that we pull from the Idex API. + * @param observedTimestamp Time at which the orders for the market were pulled. + * @param source The exchange where these orders are placed. In this case 'idex'. + */ +export function parseIdexOrders(idexOrderbook: IdexOrderbook, observedTimestamp: number, source: string): TokenOrder[] { + const aggregatedBids = aggregateOrders(idexOrderbook.bids); + // Any of the bid orders' params will work + const idexBidOrder = idexOrderbook.bids[0]; + const parsedBids = + aggregatedBids.length > 0 + ? aggregatedBids.map(order => parseIdexOrder(idexBidOrder.params, observedTimestamp, 'bid', source, order)) + : []; + + const aggregatedAsks = aggregateOrders(idexOrderbook.asks); + // Any of the ask orders' params will work + const idexAskOrder = idexOrderbook.asks[0]; + const parsedAsks = + aggregatedAsks.length > 0 + ? aggregatedAsks.map(order => parseIdexOrder(idexAskOrder.params, observedTimestamp, 'ask', source, order)) + : []; + return parsedBids.concat(parsedAsks); +} + +/** + * Parse a single aggregated Idex order in order to form a tokenOrder entity + * which can be saved into the database. + * @param idexOrderParam An object containing information about the market where these + * trades have been placed. + * @param observedTimestamp The time when the API response returned back to us. + * @param orderType 'bid' or 'ask' enum. + * @param source Exchange where these orders were placed. + * @param idexOrder A <price, amount> tuple which we will convert to volume-basis. + */ +export function parseIdexOrder( + idexOrderParam: IdexOrderParam, + observedTimestamp: number, + orderType: OrderType, + source: string, + idexOrder: [string, BigNumber], +): TokenOrder { + const tokenOrder = new TokenOrder(); + const price = new BigNumber(idexOrder[0]); + const amount = idexOrder[1]; + + tokenOrder.source = source; + tokenOrder.observedTimestamp = observedTimestamp; + tokenOrder.orderType = orderType; + tokenOrder.price = price; + tokenOrder.baseVolume = amount; + tokenOrder.quoteVolume = price.times(amount); + + if (orderType === 'bid') { + tokenOrder.baseAssetSymbol = idexOrderParam.buySymbol; + tokenOrder.baseAssetAddress = idexOrderParam.tokenBuy; + tokenOrder.quoteAssetSymbol = idexOrderParam.sellSymbol; + tokenOrder.quoteAssetAddress = idexOrderParam.tokenSell; + } else { + tokenOrder.baseAssetSymbol = idexOrderParam.sellSymbol; + tokenOrder.baseAssetAddress = idexOrderParam.tokenSell; + tokenOrder.quoteAssetSymbol = idexOrderParam.buySymbol; + tokenOrder.quoteAssetAddress = idexOrderParam.tokenBuy; + } + return tokenOrder; +} diff --git a/packages/pipeline/src/parsers/oasis_orders/index.ts b/packages/pipeline/src/parsers/oasis_orders/index.ts new file mode 100644 index 000000000..7aafbf460 --- /dev/null +++ b/packages/pipeline/src/parsers/oasis_orders/index.ts @@ -0,0 +1,71 @@ +import { BigNumber } from '@0x/utils'; +import * as R from 'ramda'; + +import { aggregateOrders } from '../utils'; + +import { OasisMarket, OasisOrder } from '../../data_sources/oasis'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; +import { OrderType } from '../../types'; + +/** + * Marque function of this file. + * 1) Takes in orders from an orderbook, + * 2) Aggregates them according to price point, + * 3) Builds TokenOrder entity with other information attached. + * @param oasisOrderbook A raw orderbook that we pull from the Oasis API. + * @param oasisMarket An object containing market data also directly from the API. + * @param observedTimestamp Time at which the orders for the market were pulled. + * @param source The exchange where these orders are placed. In this case 'oasis'. + */ +export function parseOasisOrders( + oasisOrderbook: OasisOrder[], + oasisMarket: OasisMarket, + observedTimestamp: number, + source: string, +): TokenOrder[] { + const aggregatedBids = aggregateOrders(R.filter(R.propEq('act', 'bid'), oasisOrderbook)); + const aggregatedAsks = aggregateOrders(R.filter(R.propEq('act', 'ask'), oasisOrderbook)); + const parsedBids = aggregatedBids.map(order => + parseOasisOrder(oasisMarket, observedTimestamp, 'bid', source, order), + ); + const parsedAsks = aggregatedAsks.map(order => + parseOasisOrder(oasisMarket, observedTimestamp, 'ask', source, order), + ); + return parsedBids.concat(parsedAsks); +} + +/** + * Parse a single aggregated Oasis order to form a tokenOrder entity + * which can be saved into the database. + * @param oasisMarket An object containing information about the market where these + * trades have been placed. + * @param observedTimestamp The time when the API response returned back to us. + * @param orderType 'bid' or 'ask' enum. + * @param source Exchange where these orders were placed. + * @param oasisOrder A <price, amount> tuple which we will convert to volume-basis. + */ +export function parseOasisOrder( + oasisMarket: OasisMarket, + observedTimestamp: number, + orderType: OrderType, + source: string, + oasisOrder: [string, BigNumber], +): TokenOrder { + const tokenOrder = new TokenOrder(); + const price = new BigNumber(oasisOrder[0]); + const amount = oasisOrder[1]; + + tokenOrder.source = source; + tokenOrder.observedTimestamp = observedTimestamp; + tokenOrder.orderType = orderType; + tokenOrder.price = price; + + tokenOrder.baseAssetSymbol = oasisMarket.base; + tokenOrder.baseAssetAddress = null; // Oasis doesn't provide address information + tokenOrder.baseVolume = price.times(amount); + + tokenOrder.quoteAssetSymbol = oasisMarket.quote; + tokenOrder.quoteAssetAddress = null; // Oasis doesn't provide address information + tokenOrder.quoteVolume = amount; + return tokenOrder; +} diff --git a/packages/pipeline/src/parsers/token_metadata/index.ts b/packages/pipeline/src/parsers/token_metadata/index.ts index 3b3e05d76..65e0aaa6e 100644 --- a/packages/pipeline/src/parsers/token_metadata/index.ts +++ b/packages/pipeline/src/parsers/token_metadata/index.ts @@ -1,9 +1,8 @@ -import { BigNumber } from '@0x/utils'; import * as R from 'ramda'; import { MetamaskTrustedTokenMeta, ZeroExTrustedTokenMeta } from '../../data_sources/trusted_tokens'; import { TokenMetadata } from '../../entities'; -import {} from '../../utils'; +import { toBigNumberOrNull } from '../../utils'; /** * Parses Metamask's trusted tokens list. @@ -26,7 +25,7 @@ function parseMetamaskTrustedToken(resp: MetamaskTrustedTokenMeta, address: stri const trustedToken = new TokenMetadata(); trustedToken.address = address; - trustedToken.decimals = new BigNumber(resp.decimals); + trustedToken.decimals = toBigNumberOrNull(resp.decimals); trustedToken.symbol = resp.symbol; trustedToken.name = resp.name; trustedToken.authority = 'metamask'; @@ -38,7 +37,7 @@ function parseZeroExTrustedToken(resp: ZeroExTrustedTokenMeta): TokenMetadata { const trustedToken = new TokenMetadata(); trustedToken.address = resp.address; - trustedToken.decimals = new BigNumber(resp.decimals); + trustedToken.decimals = toBigNumberOrNull(resp.decimals); trustedToken.symbol = resp.symbol; trustedToken.name = resp.name; trustedToken.authority = '0x'; diff --git a/packages/pipeline/src/parsers/utils.ts b/packages/pipeline/src/parsers/utils.ts new file mode 100644 index 000000000..860729e9f --- /dev/null +++ b/packages/pipeline/src/parsers/utils.ts @@ -0,0 +1,28 @@ +import { BigNumber } from '@0x/utils'; + +export interface GenericRawOrder { + price: string; + amount: string; +} + +/** + * Aggregates individual orders by price point. Filters zero amount orders. + * @param rawOrders An array of objects that have price and amount information. + */ +export function aggregateOrders(rawOrders: GenericRawOrder[]): Array<[string, BigNumber]> { + const aggregatedOrders = new Map<string, BigNumber>(); + rawOrders.forEach(order => { + const amount = new BigNumber(order.amount); + if (amount.isZero()) { + return; + } + // Use string instead of BigNum to aggregate by value instead of variable. + // Convert to BigNumber first to consolidate different string + // representations of the same number. Eg. '0.0' and '0.00'. + const price = new BigNumber(order.price).toString(); + + const existingAmount = aggregatedOrders.get(price) || new BigNumber(0); + aggregatedOrders.set(price, amount.plus(existingAmount)); + }); + return Array.from(aggregatedOrders.entries()); +} diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts index 4e4c12dd0..1478d5615 100644 --- a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts +++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts @@ -15,11 +15,11 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); - await getAndSaveTrades(); + await getAndSaveTradesAsync(); process.exit(0); })().catch(handleError); -async function getAndSaveTrades(): Promise<void> { +async function getAndSaveTradesAsync(): Promise<void> { const apiKey = process.env.BLOXY_API_KEY; if (apiKey === undefined) { throw new Error('Missing required env var: BLOXY_API_KEY'); diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts index 7868e9c5a..4e00f258f 100644 --- a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts +++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts @@ -25,7 +25,7 @@ let connection: Connection; const markets = await ddexSource.getActiveMarketsAsync(); for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { await Promise.all( - marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)), + marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbookAsync(ddexSource, market)), ); await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); } @@ -38,7 +38,7 @@ let connection: Connection; * @param ddexSource Data source which can query Ddex API. * @param market Object from Ddex API containing market data. */ -async function getAndSaveMarketOrderbook(ddexSource: DdexSource, market: DdexMarket): Promise<void> { +async function getAndSaveMarketOrderbookAsync(ddexSource: DdexSource, market: DdexMarket): Promise<void> { const orderBook = await ddexSource.getMarketOrderbookAsync(market.id); const observedTimestamp = Date.now(); diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts new file mode 100644 index 000000000..0ad12c97a --- /dev/null +++ b/packages/pipeline/src/scripts/pull_erc20_events.ts @@ -0,0 +1,66 @@ +// tslint:disable:no-console +import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses'; +import { web3Factory } from '@0x/dev-utils'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events'; +import { ERC20ApprovalEvent } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseERC20ApprovalEvents } from '../parsers/events'; +import { handleError, INFURA_ROOT_URL } from '../utils'; + +const NETWORK_ID = 1; +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. +const WETH_START_BLOCK = 4719568; // Block number when the WETH contract was deployed. + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: INFURA_ROOT_URL, + }); + const endBlock = await calculateEndBlockAsync(provider); + await getAndSaveWETHApprovalEventsAsync(provider, endBlock); + process.exit(0); +})().catch(handleError); + +async function getAndSaveWETHApprovalEventsAsync(provider: Web3ProviderEngine, endBlock: number): Promise<void> { + console.log('Checking existing approval events...'); + const repository = connection.getRepository(ERC20ApprovalEvent); + const startBlock = (await getStartBlockAsync(repository)) || WETH_START_BLOCK; + + console.log(`Getting WETH approval events starting at ${startBlock}...`); + const wethTokenAddress = getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken; + const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, wethTokenAddress); + const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock); + + console.log(`Parsing ${eventLogs.length} WETH approval events...`); + const events = parseERC20ApprovalEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total WETH approval events.`); + await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); +} + +async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { + const web3Wrapper = new Web3Wrapper(provider); + const currentBlock = await web3Wrapper.getBlockNumberAsync(); + return currentBlock - BLOCK_FINALITY_THRESHOLD; +} + +async function getStartBlockAsync(repository: Repository<ERC20ApprovalEvent>): Promise<number | null> { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing approval events found.`); + return null; + } + const queryResult = await connection.query( + `SELECT block_number FROM raw.erc20_approval_events ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts index 80abbb8b0..e98fc6629 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_exchange_events.ts @@ -1,5 +1,7 @@ // tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; import R = require('ramda'); import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; @@ -12,6 +14,7 @@ import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. let connection: Connection; @@ -20,43 +23,44 @@ let connection: Connection; const provider = web3Factory.getRpcProvider({ rpcUrl: INFURA_ROOT_URL, }); + const endBlock = await calculateEndBlockAsync(provider); const eventsSource = new ExchangeEventsSource(provider, 1); - await getFillEventsAsync(eventsSource); - await getCancelEventsAsync(eventsSource); - await getCancelUpToEventsAsync(eventsSource); + await getFillEventsAsync(eventsSource, endBlock); + await getCancelEventsAsync(eventsSource, endBlock); + await getCancelUpToEventsAsync(eventsSource, endBlock); process.exit(0); })().catch(handleError); -async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { +async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { console.log('Checking existing fill events...'); const repository = connection.getRepository(ExchangeFillEvent); const startBlock = await getStartBlockAsync(repository); console.log(`Getting fill events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getFillEventsAsync(startBlock); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock); console.log('Parsing fill events...'); const events = parseExchangeFillEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total fill events.`); await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } -async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { console.log('Checking existing cancel events...'); const repository = connection.getRepository(ExchangeCancelEvent); const startBlock = await getStartBlockAsync(repository); console.log(`Getting cancel events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock); console.log('Parsing cancel events...'); const events = parseExchangeCancelEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total cancel events.`); await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } -async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { console.log('Checking existing CancelUpTo events...'); const repository = connection.getRepository(ExchangeCancelUpToEvent); const startBlock = await getStartBlockAsync(repository); console.log(`Getting CancelUpTo events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock); console.log('Parsing CancelUpTo events...'); const events = parseExchangeCancelUpToEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); @@ -134,3 +138,9 @@ async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>( } } } + +async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { + const web3Wrapper = new Web3Wrapper(provider); + const currentBlock = await web3Wrapper.getBlockNumberAsync(); + return currentBlock - BLOCK_FINALITY_THRESHOLD; +} diff --git a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts new file mode 100644 index 000000000..490b17766 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts @@ -0,0 +1,63 @@ +import { logUtils } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { IDEX_SOURCE, IdexSource } from '../data_sources/idex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseIdexOrders } from '../parsers/idex_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +// Number of markets to retrieve orderbooks for at once. +const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 100; + +// Delay between market orderbook requests. +const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 2000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const idexSource = new IdexSource(); + logUtils.log('Getting all IDEX markets'); + const markets = await idexSource.getMarketsAsync(); + logUtils.log(`Got ${markets.length} markets.`); + for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { + await Promise.all( + marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbookAsync(idexSource, marketId)), + ); + await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + } + process.exit(0); +})().catch(handleError); + +/** + * Retrieve orderbook from Idex API for a given market. Parse orders and insert + * them into our database. + * @param idexSource Data source which can query Idex API. + * @param marketId String representing market of interest, eg. 'ETH_TIC'. + */ +async function getAndSaveMarketOrderbookAsync(idexSource: IdexSource, marketId: string): Promise<void> { + logUtils.log(`${marketId}: Retrieving orderbook.`); + const orderBook = await idexSource.getMarketOrderbookAsync(marketId); + const observedTimestamp = Date.now(); + + if (!R.has('bids', orderBook) || !R.has('asks', orderBook)) { + logUtils.warn(`${marketId}: Orderbook faulty.`); + return; + } + + logUtils.log(`${marketId}: Parsing orders.`); + const orders = parseIdexOrders(orderBook, observedTimestamp, IDEX_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${marketId}: Saving ${orders.length} orders.`); + const TokenOrderRepository = connection.getRepository(TokenOrder); + await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${marketId}: 0 orders to save.`); + } +} diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index b7bd51f08..a5203824c 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -24,10 +24,10 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ - rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`, + rpcUrl: INFURA_ROOT_URL, }); const web3Source = new Web3Source(provider); - await getAllMissingBlocks(web3Source); + await getAllMissingBlocksAsync(web3Source); process.exit(0); })().catch(handleError); @@ -35,26 +35,39 @@ interface MissingBlocksResponse { block_number: string; } -async function getAllMissingBlocks(web3Source: Web3Source): Promise<void> { +async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise<void> { const blocksRepository = connection.getRepository(Block); let fromBlock = EXCHANGE_START_BLOCK; while (true) { - const blockNumbers = await getMissingBlockNumbers(fromBlock); + const blockNumbers = await getMissingBlockNumbersAsync(fromBlock); if (blockNumbers.length === 0) { // There are no more missing blocks. We're done. break; } - await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers); + await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers); fromBlock = Math.max(...blockNumbers) + 1; } const totalBlocks = await blocksRepository.count(); console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`); } -async function getMissingBlockNumbers(fromBlock: number): Promise<number[]> { +async function getMissingBlockNumbersAsync(fromBlock: number): Promise<number[]> { console.log(`Checking for missing blocks starting at ${fromBlock}...`); + // Note(albrow): The easiest way to get all the blocks we need is to + // consider all the events tables together in a single query. If this query + // gets too slow, we should consider re-architecting so that we can work on + // getting the blocks for one type of event at a time. const response = (await connection.query( - 'SELECT DISTINCT(block_number) FROM raw.exchange_fill_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) AND block_number >= $1 ORDER BY block_number ASC LIMIT $2', + `WITH all_events AS ( + SELECT block_number FROM raw.exchange_fill_events + UNION SELECT block_number FROM raw.exchange_cancel_events + UNION SELECT block_number FROM raw.exchange_cancel_up_to_events + UNION SELECT block_number FROM raw.erc20_approval_events + ) + SELECT DISTINCT(block_number) FROM all_events + WHERE block_number NOT IN (SELECT number FROM raw.blocks) + AND block_number >= $1 + ORDER BY block_number ASC LIMIT $2`, [fromBlock, MAX_BLOCKS_PER_QUERY], )) as MissingBlocksResponse[]; const blockNumberStrings = R.pluck('block_number', response); @@ -63,7 +76,7 @@ async function getMissingBlockNumbers(fromBlock: number): Promise<number[]> { return blockNumbers; } -async function getAndSaveBlocks( +async function getAndSaveBlocksAsync( web3Source: Web3Source, blocksRepository: Repository<Block>, blockNumbers: number[], diff --git a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts new file mode 100644 index 000000000..c4dcf6c83 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts @@ -0,0 +1,58 @@ +import { logUtils } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { OASIS_SOURCE, OasisMarket, OasisSource } from '../data_sources/oasis'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseOasisOrders } from '../parsers/oasis_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +// Number of markets to retrieve orderbooks for at once. +const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; + +// Delay between market orderbook requests. +const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 1000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const oasisSource = new OasisSource(); + logUtils.log('Getting all active Oasis markets'); + const markets = await oasisSource.getActiveMarketsAsync(); + logUtils.log(`Got ${markets.length} markets.`); + for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { + await Promise.all( + marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbookAsync(oasisSource, market)), + ); + await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + } + process.exit(0); +})().catch(handleError); + +/** + * Retrieve orderbook from Oasis API for a given market. Parse orders and insert + * them into our database. + * @param oasisSource Data source which can query Oasis API. + * @param marketId String identifying market we want data for. eg. 'REPAUG'. + */ +async function getAndSaveMarketOrderbookAsync(oasisSource: OasisSource, market: OasisMarket): Promise<void> { + logUtils.log(`${market.id}: Retrieving orderbook.`); + const orderBook = await oasisSource.getMarketOrderbookAsync(market.id); + const observedTimestamp = Date.now(); + + logUtils.log(`${market.id}: Parsing orders.`); + const orders = parseOasisOrders(orderBook, market, observedTimestamp, OASIS_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${market.id}: Saving ${orders.length} orders.`); + const TokenOrderRepository = connection.getRepository(TokenOrder); + await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${market.id}: 0 orders to save.`); + } +} diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts index 7377a64d8..d44eb5cc6 100644 --- a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -11,7 +11,7 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra const SOURCE_NAME = 'CryptoCompare'; const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers -const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers +const MAX_REQS_PER_SECOND = parseInt(process.env.CRYPTOCOMPARE_MAX_REQS_PER_SECOND || '15', 10); // tslint:disable-line:custom-no-magic-numbers const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01'; const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); @@ -20,7 +20,7 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const repository = connection.getRepository(OHLCVExternal); - const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS); + const source = new CryptoCompareOHLCVSource(MAX_REQS_PER_SECOND); const jobTime = new Date().getTime(); const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME); @@ -63,7 +63,7 @@ async function fetchAndSaveAsync( console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`); if (records.length > 0) { const metadata: OHLCVMetadata = { - exchange: source.default_exchange, + exchange: source.defaultExchange, fromSymbol: pair.fromSymbol, toSymbol: pair.toSymbol, source: SOURCE_NAME, diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts index bae1fbede..34345f355 100644 --- a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts +++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts @@ -29,7 +29,7 @@ let connection: Connection; const tokenInfoResponse = await paradexSource.getTokenInfoAsync(); const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse); await Promise.all( - extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbook(paradexSource, market)), + extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbookAsync(paradexSource, market)), ); process.exit(0); })().catch(handleError); @@ -70,7 +70,7 @@ function addTokenAddresses( * @param paradexSource Data source which can query the Paradex API. * @param market Object from the Paradex API with information about the market in question. */ -async function getAndSaveMarketOrderbook(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> { +async function getAndSaveMarketOrderbookAsync(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> { const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol); const observedTimestamp = Date.now(); diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts index 1befc4437..5906deee6 100644 --- a/packages/pipeline/src/scripts/pull_trusted_tokens.ts +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -16,12 +16,12 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); - await getMetamaskTrustedTokens(); - await getZeroExTrustedTokens(); + await getMetamaskTrustedTokensAsync(); + await getZeroExTrustedTokensAsync(); process.exit(0); })().catch(handleError); -async function getMetamaskTrustedTokens(): Promise<void> { +async function getMetamaskTrustedTokensAsync(): Promise<void> { // tslint:disable-next-line:no-console console.log('Getting latest metamask trusted tokens list ...'); const trustedTokensRepository = connection.getRepository(TokenMetadata); @@ -37,7 +37,7 @@ async function getMetamaskTrustedTokens(): Promise<void> { console.log('Done saving metamask trusted tokens.'); } -async function getZeroExTrustedTokens(): Promise<void> { +async function getZeroExTrustedTokensAsync(): Promise<void> { // tslint:disable-next-line:no-console console.log('Getting latest 0x trusted tokens list ...'); const trustedTokensRepository = connection.getRepository(TokenMetadata); diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts index f8918728d..41d29b385 100644 --- a/packages/pipeline/src/scripts/update_relayer_info.ts +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -17,11 +17,11 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); - await getRelayers(); + await getRelayersAsync(); process.exit(0); })().catch(handleError); -async function getRelayers(): Promise<void> { +async function getRelayersAsync(): Promise<void> { console.log('Getting latest relayer info...'); const relayerRepository = connection.getRepository(Relayer); const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL); diff --git a/packages/pipeline/src/utils/index.ts b/packages/pipeline/src/utils/index.ts index 2096a0a39..094c0178e 100644 --- a/packages/pipeline/src/utils/index.ts +++ b/packages/pipeline/src/utils/index.ts @@ -15,6 +15,21 @@ export function bigNumbertoStringOrNull(n: BigNumber): string | null { } /** + * If value is null or undefined, returns null. Otherwise converts value to a + * BigNumber. + * @param value A string or number to be converted to a BigNumber + */ +export function toBigNumberOrNull(value: string | number | null): BigNumber | null { + switch (value) { + case null: + case undefined: + return null; + default: + return new BigNumber(value); + } +} + +/** * Logs an error by intelligently checking for `message` and `stack` properties. * Intended for use with top-level immediately invoked asynchronous functions. * @param e the error to log. diff --git a/packages/pipeline/test/data_sources/contract-wrappers/utils_test.ts b/packages/pipeline/test/data_sources/contract-wrappers/utils_test.ts new file mode 100644 index 000000000..06f1a5e86 --- /dev/null +++ b/packages/pipeline/test/data_sources/contract-wrappers/utils_test.ts @@ -0,0 +1,109 @@ +// tslint:disable:custom-no-magic-numbers +import * as chai from 'chai'; +import { LogWithDecodedArgs } from 'ethereum-types'; +import 'mocha'; + +import { _getEventsWithRetriesAsync } from '../../../src/data_sources/contract-wrappers/utils'; +import { chaiSetup } from '../../utils/chai_setup'; + +chaiSetup.configure(); +const expect = chai.expect; + +const retryableMessage = 'network timeout: (simulated network timeout error)'; +const retryableError = new Error(retryableMessage); + +describe('data_sources/contract-wrappers/utils', () => { + describe('_getEventsWithRetriesAsync', () => { + it('sends a single request if it was successful', async () => { + // Pre-declare values for the fromBlock and toBlock arguments. + const expectedFromBlock = 100; + const expectedToBlock = 200; + const expectedLogs: Array<LogWithDecodedArgs<any>> = [ + { + logIndex: 123, + transactionIndex: 456, + transactionHash: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe', + blockHash: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657ff', + blockNumber: 789, + address: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd3365800', + data: 'fake raw data', + topics: [], + event: 'TEST_EVENT', + args: [1, 2, 3], + }, + ]; + + // mockGetEventsAsync checks its arguments, increments `callCount` + // and returns `expectedLogs`. + let callCount = 0; + const mockGetEventsAsync = async ( + fromBlock: number, + toBlock: number, + ): Promise<Array<LogWithDecodedArgs<any>>> => { + expect(fromBlock).equals(expectedFromBlock); + expect(toBlock).equals(expectedToBlock); + callCount += 1; + return expectedLogs; + }; + + // Make sure that we get what we expected and that the mock function + // was called exactly once. + const gotLogs = await _getEventsWithRetriesAsync(mockGetEventsAsync, 3, expectedFromBlock, expectedToBlock); + expect(gotLogs).deep.equals(expectedLogs); + expect(callCount).equals( + 1, + 'getEventsAsync function was called more than once even though it was successful', + ); + }); + it('retries and eventually succeeds', async () => { + const numRetries = 5; + let callCount = 0; + // mockGetEventsAsync throws unless callCount == numRetries + 1. + const mockGetEventsAsync = async ( + _fromBlock: number, + _toBlock: number, + ): Promise<Array<LogWithDecodedArgs<any>>> => { + callCount += 1; + if (callCount === numRetries + 1) { + return []; + } + throw retryableError; + }; + await _getEventsWithRetriesAsync(mockGetEventsAsync, numRetries, 100, 300); + expect(callCount).equals(numRetries + 1, 'getEventsAsync function was called the wrong number of times'); + }); + it('throws for non-retryable errors', async () => { + const numRetries = 5; + const expectedMessage = 'Non-retryable error'; + // mockGetEventsAsync always throws a non-retryable error. + const mockGetEventsAsync = async ( + _fromBlock: number, + _toBlock: number, + ): Promise<Array<LogWithDecodedArgs<any>>> => { + throw new Error(expectedMessage); + }; + // Note(albrow): This does actually return a promise (or at least a + // "promise-like object" and is a false positive in TSLint. + // tslint:disable-next-line:await-promise + await expect(_getEventsWithRetriesAsync(mockGetEventsAsync, numRetries, 100, 300)).to.be.rejectedWith( + expectedMessage, + ); + }); + it('throws after too many retries', async () => { + const numRetries = 5; + // mockGetEventsAsync always throws a retryable error. + const mockGetEventsAsync = async ( + _fromBlock: number, + _toBlock: number, + ): Promise<Array<LogWithDecodedArgs<any>>> => { + throw retryableError; + }; + // Note(albrow): This does actually return a promise (or at least a + // "promise-like object" and is a false positive in TSLint. + // tslint:disable-next-line:await-promise + await expect(_getEventsWithRetriesAsync(mockGetEventsAsync, numRetries, 100, 300)).to.be.rejectedWith( + retryableMessage, + ); + }); + }); +}); diff --git a/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts b/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts index cb374bbb1..2efe3f5ec 100644 --- a/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts +++ b/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts @@ -13,7 +13,7 @@ const expect = chai.expect; describe('ohlcv_external data source (Crypto Compare)', () => { describe('generateBackfillIntervals', () => { it('generates pairs with intervals to query', () => { - const source = new CryptoCompareOHLCVSource(); + const source = new CryptoCompareOHLCVSource(20); const pair: TradingPair = { fromSymbol: 'ETH', toSymbol: 'ZRX', @@ -31,7 +31,7 @@ describe('ohlcv_external data source (Crypto Compare)', () => { }); it('returns single pair if no backfill is needed', () => { - const source = new CryptoCompareOHLCVSource(); + const source = new CryptoCompareOHLCVSource(20); const pair: TradingPair = { fromSymbol: 'ETH', toSymbol: 'ZRX', diff --git a/packages/pipeline/test/entities/erc20_approval_events_test.ts b/packages/pipeline/test/entities/erc20_approval_events_test.ts new file mode 100644 index 000000000..1ecf41ee5 --- /dev/null +++ b/packages/pipeline/test/entities/erc20_approval_events_test.ts @@ -0,0 +1,29 @@ +import { BigNumber } from '@0x/utils'; +import 'mocha'; +import 'reflect-metadata'; + +import { ERC20ApprovalEvent } from '../../src/entities'; +import { createDbConnectionOnceAsync } from '../db_setup'; +import { chaiSetup } from '../utils/chai_setup'; + +import { testSaveAndFindEntityAsync } from './util'; + +chaiSetup.configure(); + +// tslint:disable:custom-no-magic-numbers +describe('ERC20ApprovalEvent entity', () => { + it('save/find', async () => { + const connection = await createDbConnectionOnceAsync(); + const event = new ERC20ApprovalEvent(); + event.tokenAddress = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'; + event.blockNumber = 6281577; + event.rawData = '0x000000000000000000000000000000000000000000000002b9cba5ee21ad3df9'; + event.logIndex = 43; + event.transactionHash = '0xcb46b19c786376a0a0140d51e3e606a4c4f926d8ca5434e96d2f69d04d8d9c7f'; + event.ownerAddress = '0x0b65c5f6f3a05d6be5588a72b603360773b3fe04'; + event.spenderAddress = '0x448a5065aebb8e423f0896e6c5d525c040f59af3'; + event.amount = new BigNumber('50281464906893835769'); + const blocksRepository = connection.getRepository(ERC20ApprovalEvent); + await testSaveAndFindEntityAsync(blocksRepository, event); + }); +}); diff --git a/packages/pipeline/test/parsers/bloxy/index_test.ts b/packages/pipeline/test/parsers/bloxy/index_test.ts index 2b8d68f98..6aabb091d 100644 --- a/packages/pipeline/test/parsers/bloxy/index_test.ts +++ b/packages/pipeline/test/parsers/bloxy/index_test.ts @@ -7,7 +7,6 @@ import * as R from 'ramda'; import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../../src/data_sources/bloxy'; import { DexTrade } from '../../../src/entities'; import { _parseBloxyTrade } from '../../../src/parsers/bloxy'; -import { _convertToExchangeFillEvent } from '../../../src/parsers/events'; import { chaiSetup } from '../../utils/chai_setup'; chaiSetup.configure(); diff --git a/packages/pipeline/test/parsers/ddex_orders/index_test.ts b/packages/pipeline/test/parsers/ddex_orders/index_test.ts index 213100f44..9f4bfe7e3 100644 --- a/packages/pipeline/test/parsers/ddex_orders/index_test.ts +++ b/packages/pipeline/test/parsers/ddex_orders/index_test.ts @@ -4,7 +4,7 @@ import 'mocha'; import { DdexMarket } from '../../../src/data_sources/ddex'; import { TokenOrderbookSnapshot as TokenOrder } from '../../../src/entities'; -import { aggregateOrders, parseDdexOrder } from '../../../src/parsers/ddex_orders'; +import { parseDdexOrder } from '../../../src/parsers/ddex_orders'; import { OrderType } from '../../../src/types'; import { chaiSetup } from '../../utils/chai_setup'; @@ -13,19 +13,6 @@ const expect = chai.expect; // tslint:disable:custom-no-magic-numbers describe('ddex_orders', () => { - describe('aggregateOrders', () => { - it('aggregates orders by price point', () => { - const input = [ - { price: '1', amount: '20', orderId: 'testtest' }, - { price: '1', amount: '30', orderId: 'testone' }, - { price: '2', amount: '100', orderId: 'testtwo' }, - ]; - const expected = [['1', new BigNumber(50)], ['2', new BigNumber(100)]]; - const actual = aggregateOrders(input); - expect(actual).deep.equal(expected); - }); - }); - describe('parseDdexOrder', () => { it('converts ddexOrder to TokenOrder entity', () => { const ddexOrder: [string, BigNumber] = ['0.5', new BigNumber(10)]; diff --git a/packages/pipeline/test/parsers/events/erc20_events_test.ts b/packages/pipeline/test/parsers/events/erc20_events_test.ts new file mode 100644 index 000000000..962c50f98 --- /dev/null +++ b/packages/pipeline/test/parsers/events/erc20_events_test.ts @@ -0,0 +1,54 @@ +import { ERC20TokenApprovalEventArgs } from '@0x/contract-wrappers'; +import { BigNumber } from '@0x/utils'; +import * as chai from 'chai'; +import { LogWithDecodedArgs } from 'ethereum-types'; +import 'mocha'; + +import { ERC20ApprovalEvent } from '../../../src/entities'; +import { _convertToERC20ApprovalEvent } from '../../../src/parsers/events/erc20_events'; +import { _convertToExchangeFillEvent } from '../../../src/parsers/events/exchange_events'; +import { chaiSetup } from '../../utils/chai_setup'; + +chaiSetup.configure(); +const expect = chai.expect; + +// tslint:disable:custom-no-magic-numbers +describe('erc20_events', () => { + describe('_convertToERC20ApprovalEvent', () => { + it('converts LogWithDecodedArgs to ERC20ApprovalEvent entity', () => { + const input: LogWithDecodedArgs<ERC20TokenApprovalEventArgs> = { + address: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2', + blockHash: '0xd2d7aafaa7102aec0bca8ef026d5a85133e87892334c46ee1e92e42912991c9b', + blockNumber: 6281577, + data: '0x000000000000000000000000000000000000000000000002b9cba5ee21ad3df9', + logIndex: 43, + topics: [ + '0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925', + '0x0000000000000000000000000b65c5f6f3a05d6be5588a72b603360773b3fe04', + '0x000000000000000000000000448a5065aebb8e423f0896e6c5d525c040f59af3', + ], + transactionHash: '0xcb46b19c786376a0a0140d51e3e606a4c4f926d8ca5434e96d2f69d04d8d9c7f', + transactionIndex: 103, + event: 'Approval', + args: { + _owner: '0x0b65c5f6f3a05d6be5588a72b603360773b3fe04', + _spender: '0x448a5065aebb8e423f0896e6c5d525c040f59af3', + _value: new BigNumber('50281464906893835769'), + }, + }; + + const expected = new ERC20ApprovalEvent(); + expected.tokenAddress = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'; + expected.blockNumber = 6281577; + expected.rawData = '0x000000000000000000000000000000000000000000000002b9cba5ee21ad3df9'; + expected.logIndex = 43; + expected.transactionHash = '0xcb46b19c786376a0a0140d51e3e606a4c4f926d8ca5434e96d2f69d04d8d9c7f'; + expected.ownerAddress = '0x0b65c5f6f3a05d6be5588a72b603360773b3fe04'; + expected.spenderAddress = '0x448a5065aebb8e423f0896e6c5d525c040f59af3'; + expected.amount = new BigNumber('50281464906893835769'); + + const actual = _convertToERC20ApprovalEvent(input); + expect(actual).deep.equal(expected); + }); + }); +}); diff --git a/packages/pipeline/test/parsers/events/index_test.ts b/packages/pipeline/test/parsers/events/exchange_events_test.ts index 7e439ce39..5d4b185a5 100644 --- a/packages/pipeline/test/parsers/events/index_test.ts +++ b/packages/pipeline/test/parsers/events/exchange_events_test.ts @@ -5,7 +5,7 @@ import { LogWithDecodedArgs } from 'ethereum-types'; import 'mocha'; import { ExchangeFillEvent } from '../../../src/entities'; -import { _convertToExchangeFillEvent } from '../../../src/parsers/events'; +import { _convertToExchangeFillEvent } from '../../../src/parsers/events/exchange_events'; import { chaiSetup } from '../../utils/chai_setup'; chaiSetup.configure(); diff --git a/packages/pipeline/test/parsers/idex_orders/index_test.ts b/packages/pipeline/test/parsers/idex_orders/index_test.ts new file mode 100644 index 000000000..d54ecb9a8 --- /dev/null +++ b/packages/pipeline/test/parsers/idex_orders/index_test.ts @@ -0,0 +1,87 @@ +import { BigNumber } from '@0x/utils'; +import * as chai from 'chai'; +import 'mocha'; + +import { IdexOrderParam } from '../../../src/data_sources/idex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../../src/entities'; +import { parseIdexOrder } from '../../../src/parsers/idex_orders'; +import { OrderType } from '../../../src/types'; +import { chaiSetup } from '../../utils/chai_setup'; + +chaiSetup.configure(); +const expect = chai.expect; + +// tslint:disable:custom-no-magic-numbers +describe('idex_orders', () => { + describe('parseIdexOrder', () => { + // for market listed as 'DEF_ABC'. + it('correctly converts bid type idexOrder to TokenOrder entity', () => { + const idexOrder: [string, BigNumber] = ['0.5', new BigNumber(10)]; + const idexOrderParam: IdexOrderParam = { + tokenBuy: '0x0000000000000000000000000000000000000000', + buySymbol: 'ABC', + buyPrecision: 2, + amountBuy: '10', + tokenSell: '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81', + sellSymbol: 'DEF', + sellPrecision: 2, + amountSell: '5', + expires: Date.now() + 100000, + nonce: 1, + user: '0x212345667543456435324564345643453453333', + }; + const observedTimestamp: number = Date.now(); + const orderType: OrderType = 'bid'; + const source: string = 'idex'; + + const expected = new TokenOrder(); + expected.source = 'idex'; + expected.observedTimestamp = observedTimestamp; + expected.orderType = 'bid'; + expected.price = new BigNumber(0.5); + expected.baseAssetSymbol = 'ABC'; + expected.baseAssetAddress = '0x0000000000000000000000000000000000000000'; + expected.baseVolume = new BigNumber(10); + expected.quoteAssetSymbol = 'DEF'; + expected.quoteAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81'; + expected.quoteVolume = new BigNumber(5); + + const actual = parseIdexOrder(idexOrderParam, observedTimestamp, orderType, source, idexOrder); + expect(actual).deep.equal(expected); + }); + it('correctly converts ask type idexOrder to TokenOrder entity', () => { + const idexOrder: [string, BigNumber] = ['0.5', new BigNumber(10)]; + const idexOrderParam: IdexOrderParam = { + tokenBuy: '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81', + buySymbol: 'DEF', + buyPrecision: 2, + amountBuy: '5', + tokenSell: '0x0000000000000000000000000000000000000000', + sellSymbol: 'ABC', + sellPrecision: 2, + amountSell: '10', + expires: Date.now() + 100000, + nonce: 1, + user: '0x212345667543456435324564345643453453333', + }; + const observedTimestamp: number = Date.now(); + const orderType: OrderType = 'ask'; + const source: string = 'idex'; + + const expected = new TokenOrder(); + expected.source = 'idex'; + expected.observedTimestamp = observedTimestamp; + expected.orderType = 'ask'; + expected.price = new BigNumber(0.5); + expected.baseAssetSymbol = 'ABC'; + expected.baseAssetAddress = '0x0000000000000000000000000000000000000000'; + expected.baseVolume = new BigNumber(10); + expected.quoteAssetSymbol = 'DEF'; + expected.quoteAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81'; + expected.quoteVolume = new BigNumber(5); + + const actual = parseIdexOrder(idexOrderParam, observedTimestamp, orderType, source, idexOrder); + expect(actual).deep.equal(expected); + }); + }); +}); diff --git a/packages/pipeline/test/parsers/oasis_orders/index_test.ts b/packages/pipeline/test/parsers/oasis_orders/index_test.ts new file mode 100644 index 000000000..9e8ba9a40 --- /dev/null +++ b/packages/pipeline/test/parsers/oasis_orders/index_test.ts @@ -0,0 +1,49 @@ +import { BigNumber } from '@0x/utils'; +import * as chai from 'chai'; +import 'mocha'; + +import { OasisMarket } from '../../../src/data_sources/oasis'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../../src/entities'; +import { parseOasisOrder } from '../../../src/parsers/oasis_orders'; +import { OrderType } from '../../../src/types'; +import { chaiSetup } from '../../utils/chai_setup'; + +chaiSetup.configure(); +const expect = chai.expect; + +// tslint:disable:custom-no-magic-numbers +describe('oasis_orders', () => { + describe('parseOasisOrder', () => { + it('converts oasisOrder to TokenOrder entity', () => { + const oasisOrder: [string, BigNumber] = ['0.5', new BigNumber(10)]; + const oasisMarket: OasisMarket = { + id: 'ABCDEF', + base: 'DEF', + quote: 'ABC', + buyVol: 100, + sellVol: 200, + price: 1, + high: 1, + low: 0, + }; + const observedTimestamp: number = Date.now(); + const orderType: OrderType = 'bid'; + const source: string = 'oasis'; + + const expected = new TokenOrder(); + expected.source = 'oasis'; + expected.observedTimestamp = observedTimestamp; + expected.orderType = 'bid'; + expected.price = new BigNumber(0.5); + expected.baseAssetSymbol = 'DEF'; + expected.baseAssetAddress = null; + expected.baseVolume = new BigNumber(5); + expected.quoteAssetSymbol = 'ABC'; + expected.quoteAssetAddress = null; + expected.quoteVolume = new BigNumber(10); + + const actual = parseOasisOrder(oasisMarket, observedTimestamp, orderType, source, oasisOrder); + expect(actual).deep.equal(expected); + }); + }); +}); diff --git a/packages/pipeline/test/parsers/utils/index_test.ts b/packages/pipeline/test/parsers/utils/index_test.ts new file mode 100644 index 000000000..5a0d0f182 --- /dev/null +++ b/packages/pipeline/test/parsers/utils/index_test.ts @@ -0,0 +1,30 @@ +import { BigNumber } from '@0x/utils'; +import * as chai from 'chai'; +import 'mocha'; + +import { aggregateOrders, GenericRawOrder } from '../../../src/parsers/utils'; +import { chaiSetup } from '../../utils/chai_setup'; + +chaiSetup.configure(); +const expect = chai.expect; + +// tslint:disable:custom-no-magic-numbers +describe('aggregateOrders', () => { + it('aggregates order by price point', () => { + const input = [ + { price: '1', amount: '20', orderHash: 'testtest', total: '20' }, + { price: '1', amount: '30', orderHash: 'testone', total: '30' }, + { price: '2', amount: '100', orderHash: 'testtwo', total: '200' }, + ]; + const expected = [['1', new BigNumber(50)], ['2', new BigNumber(100)]]; + const actual = aggregateOrders(input); + expect(actual).deep.equal(expected); + }); + + it('handles empty orders gracefully', () => { + const input: GenericRawOrder[] = []; + const expected: Array<[string, BigNumber]> = []; + const actual = aggregateOrders(input); + expect(actual).deep.equal(expected); + }); +}); |