From 87ffa5d7ab19d2288bf68131a7e7ec77578c564c Mon Sep 17 00:00:00 2001 From: zkao Date: Tue, 4 Dec 2018 13:21:46 -0800 Subject: Token_orderbook_snapshots for Ddex and Paradex(#1354) * Implements the TokenOrderbookSnapshot Table * Scripts, Data Sources and Entities to pull Ddex and Paradex API data. --- .../1543434472116-TokenOrderbookSnapshots.ts | 30 +++++++ packages/pipeline/src/data_sources/ddex/index.ts | 78 ++++++++++++++++++ .../pipeline/src/data_sources/paradex/index.ts | 92 ++++++++++++++++++++++ packages/pipeline/src/entities/index.ts | 1 + packages/pipeline/src/entities/token_order.ts | 29 +++++++ packages/pipeline/src/ormconfig.ts | 2 + packages/pipeline/src/parsers/ddex_orders/index.ts | 77 ++++++++++++++++++ .../pipeline/src/parsers/paradex_orders/index.ts | 66 ++++++++++++++++ .../src/scripts/pull_ddex_orderbook_snapshots.ts | 55 +++++++++++++ .../scripts/pull_paradex_orderbook_snapshots.ts | 87 ++++++++++++++++++++ packages/pipeline/src/types.ts | 1 + .../pipeline/test/entities/token_order_test.ts | 31 ++++++++ .../test/parsers/ddex_orders/index_test.ts | 66 ++++++++++++++++ .../test/parsers/paradex_orders/index_test.ts | 54 +++++++++++++ 14 files changed, 669 insertions(+) create mode 100644 packages/pipeline/migrations/1543434472116-TokenOrderbookSnapshots.ts create mode 100644 packages/pipeline/src/data_sources/ddex/index.ts create mode 100644 packages/pipeline/src/data_sources/paradex/index.ts create mode 100644 packages/pipeline/src/entities/token_order.ts create mode 100644 packages/pipeline/src/parsers/ddex_orders/index.ts create mode 100644 packages/pipeline/src/parsers/paradex_orders/index.ts create mode 100644 packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts create mode 100644 packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts create mode 100644 packages/pipeline/test/entities/token_order_test.ts create mode 100644 packages/pipeline/test/parsers/ddex_orders/index_test.ts create mode 100644 packages/pipeline/test/parsers/paradex_orders/index_test.ts diff --git a/packages/pipeline/migrations/1543434472116-TokenOrderbookSnapshots.ts b/packages/pipeline/migrations/1543434472116-TokenOrderbookSnapshots.ts new file mode 100644 index 000000000..a7117c753 --- /dev/null +++ b/packages/pipeline/migrations/1543434472116-TokenOrderbookSnapshots.ts @@ -0,0 +1,30 @@ +import { MigrationInterface, QueryRunner, Table } from 'typeorm'; + +const tokenOrderbookSnapshots = new Table({ + name: 'raw.token_orderbook_snapshots', + columns: [ + { name: 'observed_timestamp', type: 'bigint', isPrimary: true }, + { name: 'source', type: 'varchar', isPrimary: true }, + { name: 'order_type', type: 'order_t' }, + { name: 'price', type: 'numeric', isPrimary: true }, + + { name: 'base_asset_symbol', type: 'varchar', isPrimary: true }, + { name: 'base_asset_address', type: 'char(42)' }, + { name: 'base_volume', type: 'numeric' }, + + { name: 'quote_asset_symbol', type: 'varchar', isPrimary: true }, + { name: 'quote_asset_address', type: 'char(42)' }, + { name: 'quote_volume', type: 'numeric' }, + ], +}); + +export class TokenOrderbookSnapshots1543434472116 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`CREATE TYPE order_t AS enum('bid', 'ask');`); + await queryRunner.createTable(tokenOrderbookSnapshots); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.dropTable(tokenOrderbookSnapshots.name); + } +} diff --git a/packages/pipeline/src/data_sources/ddex/index.ts b/packages/pipeline/src/data_sources/ddex/index.ts new file mode 100644 index 000000000..2bbd8c29b --- /dev/null +++ b/packages/pipeline/src/data_sources/ddex/index.ts @@ -0,0 +1,78 @@ +import { fetchAsync, logUtils } from '@0x/utils'; + +const DDEX_BASE_URL = 'https://api.ddex.io/v2'; +const ACTIVE_MARKETS_URL = `${DDEX_BASE_URL}/markets`; +const NO_AGGREGATION_LEVEL = 3; // See https://docs.ddex.io/#get-orderbook +const ORDERBOOK_ENDPOINT = `/orderbook?level=${NO_AGGREGATION_LEVEL}`; +export const DDEX_SOURCE = 'ddex'; + +export interface DdexActiveMarketsResponse { + status: number; + desc: string; + data: { + markets: DdexMarket[]; + }; +} + +export interface DdexMarket { + id: string; + quoteToken: string; + quoteTokenDecimals: number; + quoteTokenAddress: string; + baseToken: string; + baseTokenDecimals: number; + baseTokenAddress: string; + minOrderSize: string; + maxOrderSize: string; + pricePrecision: number; + priceDecimals: number; + amountDecimals: number; +} + +export interface DdexOrderbookResponse { + status: number; + desc: string; + data: { + orderBook: DdexOrderbook; + }; +} + +export interface DdexOrderbook { + marketId: string; + bids: DdexOrder[]; + asks: DdexOrder[]; +} + +export interface DdexOrder { + price: string; + amount: string; + orderId: string; +} + +// tslint:disable:prefer-function-over-method +// ^ Keep consistency with other sources and help logical organization +export class DdexSource { + /** + * Call Ddex API to find out which markets they are maintaining orderbooks for. + */ + public async getActiveMarketsAsync(): Promise { + logUtils.log('Getting all active DDEX markets'); + const resp = await fetchAsync(ACTIVE_MARKETS_URL); + const respJson: DdexActiveMarketsResponse = await resp.json(); + const markets = respJson.data.markets; + logUtils.log(`Got ${markets.length} markets.`); + return markets; + } + + /** + * Retrieve orderbook from Ddex API for a given market. + * @param marketId String identifying the market we want data for. Eg. 'REP/AUG' + */ + public async getMarketOrderbookAsync(marketId: string): Promise { + logUtils.log(`${marketId}: Retrieving orderbook.`); + const marketOrderbookUrl = `${ACTIVE_MARKETS_URL}/${marketId}${ORDERBOOK_ENDPOINT}`; + const resp = await fetchAsync(marketOrderbookUrl); + const respJson: DdexOrderbookResponse = await resp.json(); + return respJson.data.orderBook; + } +} diff --git a/packages/pipeline/src/data_sources/paradex/index.ts b/packages/pipeline/src/data_sources/paradex/index.ts new file mode 100644 index 000000000..69a03d553 --- /dev/null +++ b/packages/pipeline/src/data_sources/paradex/index.ts @@ -0,0 +1,92 @@ +import { fetchAsync, logUtils } from '@0x/utils'; + +const PARADEX_BASE_URL = 'https://api.paradex.io/consumer/v0'; +const ACTIVE_MARKETS_URL = PARADEX_BASE_URL + '/markets'; +const ORDERBOOK_ENDPOINT = PARADEX_BASE_URL + '/orderbook'; +const TOKEN_INFO_ENDPOINT = PARADEX_BASE_URL + '/tokens'; +export const PARADEX_SOURCE = 'paradex'; + +export type ParadexActiveMarketsResponse = ParadexMarket[]; + +export interface ParadexMarket { + id: string; + symbol: string; + baseToken: string; + quoteToken: string; + minOrderSize: string; + maxOrderSize: string; + priceMaxDecimals: number; + amountMaxDecimals: number; + // These are not native to the Paradex API response. We tag them on later + // by calling the token endpoint and joining on symbol. + baseTokenAddress?: string; + quoteTokenAddress?: string; +} + +export interface ParadexOrderbookResponse { + marketId: number; + marketSymbol: string; + bids: ParadexOrder[]; + asks: ParadexOrder[]; +} + +export interface ParadexOrder { + amount: string; + price: string; +} + +export type ParadexTokenInfoResponse = ParadexTokenInfo[]; + +export interface ParadexTokenInfo { + name: string; + symbol: string; + address: string; +} + +export class ParadexSource { + private readonly _apiKey: string; + + constructor(apiKey: string) { + this._apiKey = apiKey; + } + + /** + * Call Paradex API to find out which markets they are maintaining orderbooks for. + */ + public async getActiveMarketsAsync(): Promise { + logUtils.log('Getting all active Paradex markets.'); + const resp = await fetchAsync(ACTIVE_MARKETS_URL, { + headers: { 'API-KEY': this._apiKey }, + }); + const markets: ParadexActiveMarketsResponse = await resp.json(); + logUtils.log(`Got ${markets.length} markets.`); + return markets; + } + + /** + * Call Paradex API to find out their token information. + */ + public async getTokenInfoAsync(): Promise { + logUtils.log('Getting token information from Paradex.'); + const resp = await fetchAsync(TOKEN_INFO_ENDPOINT, { + headers: { 'API-KEY': this._apiKey }, + }); + const tokens: ParadexTokenInfoResponse = await resp.json(); + logUtils.log(`Got information for ${tokens.length} tokens.`); + return tokens; + } + + /** + * Retrieve orderbook from Paradex API for a given market. + * @param marketSymbol String representing the market we want data for. + */ + public async getMarketOrderbookAsync(marketSymbol: string): Promise { + logUtils.log(`${marketSymbol}: Retrieving orderbook.`); + const marketOrderbookUrl = `${ORDERBOOK_ENDPOINT}?market=${marketSymbol}`; + const resp = await fetchAsync(marketOrderbookUrl, { + headers: { 'API-KEY': this._apiKey }, + }); + const orderbookResponse: ParadexOrderbookResponse = await resp.json(); + return orderbookResponse; + } +} diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts index d3056a477..db0814e38 100644 --- a/packages/pipeline/src/entities/index.ts +++ b/packages/pipeline/src/entities/index.ts @@ -12,6 +12,7 @@ export { Relayer } from './relayer'; export { SraOrder } from './sra_order'; export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp'; export { TokenMetadata } from './token_metadata'; +export { TokenOrderbookSnapshot } from './token_order'; export { Transaction } from './transaction'; export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; diff --git a/packages/pipeline/src/entities/token_order.ts b/packages/pipeline/src/entities/token_order.ts new file mode 100644 index 000000000..557705767 --- /dev/null +++ b/packages/pipeline/src/entities/token_order.ts @@ -0,0 +1,29 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { OrderType } from '../types'; +import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'token_orderbook_snapshots', schema: 'raw' }) +export class TokenOrderbookSnapshot { + @PrimaryColumn({ name: 'observed_timestamp', type: 'bigint', transformer: numberToBigIntTransformer }) + public observedTimestamp!: number; + @PrimaryColumn({ name: 'source' }) + public source!: string; + @Column({ name: 'order_type' }) + public orderType!: OrderType; + @PrimaryColumn({ name: 'price', type: 'numeric', transformer: bigNumberTransformer }) + public price!: BigNumber; + @PrimaryColumn({ name: 'base_asset_symbol' }) + public baseAssetSymbol!: string; + @Column({ name: 'base_asset_address' }) + public baseAssetAddress!: string; + @Column({ name: 'base_volume', type: 'numeric', transformer: bigNumberTransformer }) + public baseVolume!: BigNumber; + @PrimaryColumn({ name: 'quote_asset_symbol' }) + public quoteAssetSymbol!: string; + @Column({ name: 'quote_asset_address' }) + public quoteAssetAddress!: string; + @Column({ name: 'quote_volume', type: 'numeric', transformer: bigNumberTransformer }) + public quoteVolume!: BigNumber; +} diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts index fd6c7c39b..c135c399b 100644 --- a/packages/pipeline/src/ormconfig.ts +++ b/packages/pipeline/src/ormconfig.ts @@ -10,6 +10,7 @@ import { SraOrder, SraOrdersObservedTimeStamp, TokenMetadata, + TokenOrderbookSnapshot, Transaction, } from './entities'; @@ -23,6 +24,7 @@ const entities = [ SraOrder, SraOrdersObservedTimeStamp, TokenMetadata, + TokenOrderbookSnapshot, Transaction, ]; diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts new file mode 100644 index 000000000..81132e8f0 --- /dev/null +++ b/packages/pipeline/src/parsers/ddex_orders/index.ts @@ -0,0 +1,77 @@ +import { BigNumber } from '@0x/utils'; +import * as R from 'ramda'; + +import { DdexMarket, DdexOrder, DdexOrderbook } from '../../data_sources/ddex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; +import { OrderType } from '../../types'; + +/** + * Marque function of this file. + * 1) Takes in orders from an orderbook, + * other information attached. + * @param ddexOrderbook A raw orderbook that we pull from the Ddex API. + * @param ddexMarket An object containing market data also directly from the API. + * @param observedTimestamp Time at which the orders for the market were pulled. + * @param source The exchange where these orders are placed. In this case 'ddex'. + */ +export function parseDdexOrders( + ddexOrderbook: DdexOrderbook, + ddexMarket: DdexMarket, + observedTimestamp: number, + source: string, +): TokenOrder[] { + const aggregatedBids = aggregateOrders(ddexOrderbook.bids); + const aggregatedAsks = aggregateOrders(ddexOrderbook.asks); + const parsedBids = aggregatedBids.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'bid', source, order)); + const parsedAsks = aggregatedAsks.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'ask', source, order)); + return parsedBids.concat(parsedAsks); +} + +/** + * Aggregates orders by price point for consistency with other exchanges. + * Querying the Ddex API at level 3 setting returns a breakdown of + * individual orders at each price point. Other exchanges only give total amount + * at each price point. Returns an array of tuples. + * @param ddexOrders A list of Ddex orders awaiting aggregation. + */ +export function aggregateOrders(ddexOrders: DdexOrder[]): Array<[string, BigNumber]> { + const sumAmount = (acc: BigNumber, order: DdexOrder): BigNumber => acc.plus(order.amount); + const aggregatedPricePoints = R.reduceBy(sumAmount, new BigNumber(0), R.prop('price'), ddexOrders); + return Object.entries(aggregatedPricePoints); +} + +/** + * Parse a single aggregated Ddex order in order to form a tokenOrder entity + * which can be saved into the database. + * @param ddexMarket An object containing information about the market where these + * trades have been placed. + * @param observedTimestamp The time when the API response returned back to us. + * @param orderType 'bid' or 'ask' enum. + * @param source Exchange where these orders were placed. + * @param ddexOrder A tuple which we will convert to volume-basis. + */ +export function parseDdexOrder( + ddexMarket: DdexMarket, + observedTimestamp: number, + orderType: OrderType, + source: string, + ddexOrder: [string, BigNumber], +): TokenOrder { + const tokenOrder = new TokenOrder(); + const price = new BigNumber(ddexOrder[0]); + const amount = ddexOrder[1]; + + tokenOrder.source = source; + tokenOrder.observedTimestamp = observedTimestamp; + tokenOrder.orderType = orderType; + tokenOrder.price = price; + + tokenOrder.baseAssetSymbol = ddexMarket.baseToken; + tokenOrder.baseAssetAddress = ddexMarket.baseTokenAddress; + tokenOrder.baseVolume = price.times(amount); + + tokenOrder.quoteAssetSymbol = ddexMarket.quoteToken; + tokenOrder.quoteAssetAddress = ddexMarket.quoteTokenAddress; + tokenOrder.quoteVolume = amount; + return tokenOrder; +} diff --git a/packages/pipeline/src/parsers/paradex_orders/index.ts b/packages/pipeline/src/parsers/paradex_orders/index.ts new file mode 100644 index 000000000..7966658a7 --- /dev/null +++ b/packages/pipeline/src/parsers/paradex_orders/index.ts @@ -0,0 +1,66 @@ +import { BigNumber } from '@0x/utils'; + +import { ParadexMarket, ParadexOrder, ParadexOrderbookResponse } from '../../data_sources/paradex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; +import { OrderType } from '../../types'; + +/** + * Marque function of this file. + * 1) Takes in orders from an orderbook (orders are already aggregated by price point), + * 2) For each aggregated order, forms a TokenOrder entity with market data and + * other information attached. + * @param paradexOrderbookResponse An orderbook response from the Paradex API. + * @param paradexMarket An object containing market data also directly from the API. + * @param observedTimestamp Time at which the orders for the market were pulled. + * @param source The exchange where these orders are placed. In this case 'paradex'. + */ +export function parseParadexOrders( + paradexOrderbookResponse: ParadexOrderbookResponse, + paradexMarket: ParadexMarket, + observedTimestamp: number, + source: string, +): TokenOrder[] { + const parsedBids = paradexOrderbookResponse.bids.map(order => + parseParadexOrder(paradexMarket, observedTimestamp, 'bid', source, order), + ); + const parsedAsks = paradexOrderbookResponse.asks.map(order => + parseParadexOrder(paradexMarket, observedTimestamp, 'ask', source, order), + ); + return parsedBids.concat(parsedAsks); +} + +/** + * Parse a single aggregated Ddex order in order to form a tokenOrder entity + * which can be saved into the database. + * @param paradexMarket An object containing information about the market where these + * orders have been placed. + * @param observedTimestamp The time when the API response returned back to us. + * @param orderType 'bid' or 'ask' enum. + * @param source Exchange where these orders were placed. + * @param paradexOrder A ParadexOrder object; basically price, amount tuple. + */ +export function parseParadexOrder( + paradexMarket: ParadexMarket, + observedTimestamp: number, + orderType: OrderType, + source: string, + paradexOrder: ParadexOrder, +): TokenOrder { + const tokenOrder = new TokenOrder(); + const price = new BigNumber(paradexOrder.price); + const amount = new BigNumber(paradexOrder.amount); + + tokenOrder.source = source; + tokenOrder.observedTimestamp = observedTimestamp; + tokenOrder.orderType = orderType; + tokenOrder.price = price; + + tokenOrder.baseAssetSymbol = paradexMarket.baseToken; + tokenOrder.baseAssetAddress = paradexMarket.baseTokenAddress as string; + tokenOrder.baseVolume = price.times(amount); + + tokenOrder.quoteAssetSymbol = paradexMarket.quoteToken; + tokenOrder.quoteAssetAddress = paradexMarket.quoteTokenAddress as string; + tokenOrder.quoteVolume = amount; + return tokenOrder; +} diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts new file mode 100644 index 000000000..b02468e9b --- /dev/null +++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts @@ -0,0 +1,55 @@ +import { logUtils } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseDdexOrders } from '../parsers/ddex_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +// Number of markets to retrieve orderbooks for at once. +const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; + +// Delay between market orderbook requests. +const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const ddexSource = new DdexSource(); + const markets = await ddexSource.getActiveMarketsAsync(); + for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { + await Promise.all( + marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)), + ); + await new Promise(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + } + process.exit(0); +})().catch(handleError); + +/** + * Retrieve orderbook from Ddex API for a given market. Parse orders and insert + * them into our database. + * @param ddexSource Data source which can query Ddex API. + * @param market Object from Ddex API containing market data. + */ +async function getAndSaveMarketOrderbook(ddexSource: DdexSource, market: DdexMarket): Promise { + const orderBook = await ddexSource.getMarketOrderbookAsync(market.id); + const observedTimestamp = Date.now(); + + logUtils.log(`${market.id}: Parsing orders.`); + const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${market.id}: Saving ${orders.length} orders.`); + const TokenOrderRepository = connection.getRepository(TokenOrder); + await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${market.id}: 0 orders to save.`); + } +} diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts new file mode 100644 index 000000000..bae1fbede --- /dev/null +++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts @@ -0,0 +1,87 @@ +import { logUtils } from '@0x/utils'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { + PARADEX_SOURCE, + ParadexActiveMarketsResponse, + ParadexMarket, + ParadexSource, + ParadexTokenInfoResponse, +} from '../data_sources/paradex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseParadexOrders } from '../parsers/paradex_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY; + if (apiKey === undefined) { + throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY'); + } + const paradexSource = new ParadexSource(apiKey); + const markets = await paradexSource.getActiveMarketsAsync(); + const tokenInfoResponse = await paradexSource.getTokenInfoAsync(); + const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse); + await Promise.all( + extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbook(paradexSource, market)), + ); + process.exit(0); +})().catch(handleError); + +/** + * Extend the default ParadexMarket objects with token addresses. + * @param markets An array of ParadexMarket objects. + * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses. + */ +function addTokenAddresses( + markets: ParadexActiveMarketsResponse, + tokenInfoResponse: ParadexTokenInfoResponse, +): ParadexMarket[] { + const symbolAddressMapping = new Map(); + tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address)); + + markets.forEach((market: ParadexMarket) => { + if (symbolAddressMapping.has(market.baseToken)) { + market.baseTokenAddress = symbolAddressMapping.get(market.baseToken); + } else { + market.quoteTokenAddress = ''; + logUtils.warn(`${market.baseToken}: No address found.`); + } + + if (symbolAddressMapping.has(market.quoteToken)) { + market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken); + } else { + market.quoteTokenAddress = ''; + logUtils.warn(`${market.quoteToken}: No address found.`); + } + }); + return markets; +} + +/** + * Retrieve orderbook from Paradex API for a given market. Parse orders and insert + * them into our database. + * @param paradexSource Data source which can query the Paradex API. + * @param market Object from the Paradex API with information about the market in question. + */ +async function getAndSaveMarketOrderbook(paradexSource: ParadexSource, market: ParadexMarket): Promise { + const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol); + const observedTimestamp = Date.now(); + + logUtils.log(`${market.symbol}: Parsing orders.`); + const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`); + const tokenOrderRepository = connection.getRepository(TokenOrder); + await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${market.symbol}: 0 orders to save.`); + } +} diff --git a/packages/pipeline/src/types.ts b/packages/pipeline/src/types.ts index a12d37895..e02b42a40 100644 --- a/packages/pipeline/src/types.ts +++ b/packages/pipeline/src/types.ts @@ -1 +1,2 @@ export type AssetType = 'erc20' | 'erc721'; +export type OrderType = 'bid' | 'ask'; diff --git a/packages/pipeline/test/entities/token_order_test.ts b/packages/pipeline/test/entities/token_order_test.ts new file mode 100644 index 000000000..c6057f5aa --- /dev/null +++ b/packages/pipeline/test/entities/token_order_test.ts @@ -0,0 +1,31 @@ +import { BigNumber } from '@0x/utils'; +import 'mocha'; + +import { TokenOrderbookSnapshot } from '../../src/entities'; +import { createDbConnectionOnceAsync } from '../db_setup'; +import { chaiSetup } from '../utils/chai_setup'; + +import { testSaveAndFindEntityAsync } from './util'; + +chaiSetup.configure(); + +const tokenOrderbookSnapshot: TokenOrderbookSnapshot = { + source: 'ddextest', + observedTimestamp: Date.now(), + orderType: 'bid', + price: new BigNumber(10.1), + baseAssetSymbol: 'ETH', + baseAssetAddress: '0x818e6fecd516ecc3849daf6845e3ec868087b755', + baseVolume: new BigNumber(143), + quoteAssetSymbol: 'ABC', + quoteAssetAddress: '0x00923b9a074762b93650716333b3e1473a15048e', + quoteVolume: new BigNumber(12.3234234), +}; + +describe('TokenOrderbookSnapshot entity', () => { + it('save/find', async () => { + const connection = await createDbConnectionOnceAsync(); + const tokenOrderbookSnapshotRepository = connection.getRepository(TokenOrderbookSnapshot); + await testSaveAndFindEntityAsync(tokenOrderbookSnapshotRepository, tokenOrderbookSnapshot); + }); +}); diff --git a/packages/pipeline/test/parsers/ddex_orders/index_test.ts b/packages/pipeline/test/parsers/ddex_orders/index_test.ts new file mode 100644 index 000000000..213100f44 --- /dev/null +++ b/packages/pipeline/test/parsers/ddex_orders/index_test.ts @@ -0,0 +1,66 @@ +import { BigNumber } from '@0x/utils'; +import * as chai from 'chai'; +import 'mocha'; + +import { DdexMarket } from '../../../src/data_sources/ddex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../../src/entities'; +import { aggregateOrders, parseDdexOrder } from '../../../src/parsers/ddex_orders'; +import { OrderType } from '../../../src/types'; +import { chaiSetup } from '../../utils/chai_setup'; + +chaiSetup.configure(); +const expect = chai.expect; + +// tslint:disable:custom-no-magic-numbers +describe('ddex_orders', () => { + describe('aggregateOrders', () => { + it('aggregates orders by price point', () => { + const input = [ + { price: '1', amount: '20', orderId: 'testtest' }, + { price: '1', amount: '30', orderId: 'testone' }, + { price: '2', amount: '100', orderId: 'testtwo' }, + ]; + const expected = [['1', new BigNumber(50)], ['2', new BigNumber(100)]]; + const actual = aggregateOrders(input); + expect(actual).deep.equal(expected); + }); + }); + + describe('parseDdexOrder', () => { + it('converts ddexOrder to TokenOrder entity', () => { + const ddexOrder: [string, BigNumber] = ['0.5', new BigNumber(10)]; + const ddexMarket: DdexMarket = { + id: 'ABC-DEF', + quoteToken: 'ABC', + quoteTokenDecimals: 5, + quoteTokenAddress: '0x0000000000000000000000000000000000000000', + baseToken: 'DEF', + baseTokenDecimals: 2, + baseTokenAddress: '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81', + minOrderSize: '0.1', + maxOrderSize: '1000', + pricePrecision: 1, + priceDecimals: 1, + amountDecimals: 0, + }; + const observedTimestamp: number = Date.now(); + const orderType: OrderType = 'bid'; + const source: string = 'ddex'; + + const expected = new TokenOrder(); + expected.source = 'ddex'; + expected.observedTimestamp = observedTimestamp; + expected.orderType = 'bid'; + expected.price = new BigNumber(0.5); + expected.baseAssetSymbol = 'DEF'; + expected.baseAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81'; + expected.baseVolume = new BigNumber(5); + expected.quoteAssetSymbol = 'ABC'; + expected.quoteAssetAddress = '0x0000000000000000000000000000000000000000'; + expected.quoteVolume = new BigNumber(10); + + const actual = parseDdexOrder(ddexMarket, observedTimestamp, orderType, source, ddexOrder); + expect(actual).deep.equal(expected); + }); + }); +}); diff --git a/packages/pipeline/test/parsers/paradex_orders/index_test.ts b/packages/pipeline/test/parsers/paradex_orders/index_test.ts new file mode 100644 index 000000000..1522806bf --- /dev/null +++ b/packages/pipeline/test/parsers/paradex_orders/index_test.ts @@ -0,0 +1,54 @@ +import { BigNumber } from '@0x/utils'; +import * as chai from 'chai'; +import 'mocha'; + +import { ParadexMarket, ParadexOrder } from '../../../src/data_sources/paradex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../../src/entities'; +import { parseParadexOrder } from '../../../src/parsers/paradex_orders'; +import { OrderType } from '../../../src/types'; +import { chaiSetup } from '../../utils/chai_setup'; + +chaiSetup.configure(); +const expect = chai.expect; + +// tslint:disable:custom-no-magic-numbers +describe('paradex_orders', () => { + describe('parseParadexOrder', () => { + it('converts ParadexOrder to TokenOrder entity', () => { + const paradexOrder: ParadexOrder = { + amount: '412', + price: '0.1245', + }; + const paradexMarket: ParadexMarket = { + id: '2', + symbol: 'ABC/DEF', + baseToken: 'DEF', + quoteToken: 'ABC', + minOrderSize: '0.1', + maxOrderSize: '1000', + priceMaxDecimals: 5, + amountMaxDecimals: 5, + baseTokenAddress: '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81', + quoteTokenAddress: '0x0000000000000000000000000000000000000000', + }; + const observedTimestamp: number = Date.now(); + const orderType: OrderType = 'bid'; + const source: string = 'paradex'; + + const expected = new TokenOrder(); + expected.source = 'paradex'; + expected.observedTimestamp = observedTimestamp; + expected.orderType = 'bid'; + expected.price = new BigNumber(0.1245); + expected.baseAssetSymbol = 'DEF'; + expected.baseAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81'; + expected.baseVolume = new BigNumber(412 * 0.1245); + expected.quoteAssetSymbol = 'ABC'; + expected.quoteAssetAddress = '0x0000000000000000000000000000000000000000'; + expected.quoteVolume = new BigNumber(412); + + const actual = parseParadexOrder(paradexMarket, observedTimestamp, orderType, source, paradexOrder); + expect(actual).deep.equal(expected); + }); + }); +}); -- cgit v1.2.3