diff options
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r-- | packages/pipeline/src/data_sources/idex/index.ts | 82 | ||||
-rw-r--r-- | packages/pipeline/src/data_sources/oasis/index.ts | 103 | ||||
-rw-r--r-- | packages/pipeline/src/entities/token_order.ts | 10 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/ddex_orders/index.ts | 18 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/idex_orders/index.ts | 77 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/oasis_orders/index.ts | 71 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/utils.ts | 28 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts | 63 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts | 58 |
9 files changed, 490 insertions, 20 deletions
diff --git a/packages/pipeline/src/data_sources/idex/index.ts b/packages/pipeline/src/data_sources/idex/index.ts new file mode 100644 index 000000000..c1e53c08d --- /dev/null +++ b/packages/pipeline/src/data_sources/idex/index.ts @@ -0,0 +1,82 @@ +import { fetchAsync } from '@0x/utils'; + +const IDEX_BASE_URL = 'https://api.idex.market'; +const MARKETS_URL = `${IDEX_BASE_URL}/returnTicker`; +const ORDERBOOK_URL = `${IDEX_BASE_URL}/returnOrderBook`; +const MAX_ORDER_COUNT = 100; // Maximum based on https://github.com/AuroraDAO/idex-api-docs#returnorderbook +export const IDEX_SOURCE = 'idex'; + +export interface IdexMarketsResponse { + [marketName: string]: IdexMarket; +} + +export interface IdexMarket { + last: string; + high: string; + low: string; + lowestAsk: string; + highestBid: string; + percentChange: string; + baseVolume: string; + quoteVolume: string; +} + +export interface IdexOrderbook { + asks: IdexOrder[]; + bids: IdexOrder[]; +} + +export interface IdexOrder { + price: string; + amount: string; + total: string; + orderHash: string; + params: IdexOrderParam; +} + +export interface IdexOrderParam { + tokenBuy: string; + buySymbol: string; + buyPrecision: number; + amountBuy: string; + tokenSell: string; + sellSymbol: string; + sellPrecision: number; + amountSell: string; + expires: number; + nonce: number; + user: string; +} + +// tslint:disable:prefer-function-over-method +// ^ Keep consistency with other sources and help logical organization +export class IdexSource { + /** + * Call Idex API to find out which markets they are maintaining orderbooks for. + */ + public async getMarketsAsync(): Promise<string[]> { + const params = { method: 'POST' }; + const resp = await fetchAsync(MARKETS_URL, params); + const respJson: IdexMarketsResponse = await resp.json(); + const markets: string[] = Object.keys(respJson); + return markets; + } + + /** + * Retrieve orderbook from Idex API for a given market. + * @param marketId String identifying the market we want data for. Eg. 'REP_AUG' + */ + public async getMarketOrderbookAsync(marketId: string): Promise<IdexOrderbook> { + const params = { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + market: marketId, + count: MAX_ORDER_COUNT, + }), + }; + const resp = await fetchAsync(ORDERBOOK_URL, params); + const respJson: IdexOrderbook = await resp.json(); + return respJson; + } +} diff --git a/packages/pipeline/src/data_sources/oasis/index.ts b/packages/pipeline/src/data_sources/oasis/index.ts new file mode 100644 index 000000000..3b30e9dfd --- /dev/null +++ b/packages/pipeline/src/data_sources/oasis/index.ts @@ -0,0 +1,103 @@ +import { fetchAsync } from '@0x/utils'; + +const OASIS_BASE_URL = 'https://data.makerdao.com/v1'; +const OASIS_MARKET_QUERY = `query { + oasisMarkets(period: "1 week") { + nodes { + id + base + quote + buyVol + sellVol + price + high + low + } + } +}`; +const OASIS_ORDERBOOK_QUERY = `query ($market: String!) { + allOasisOrders(condition: { market: $market }) { + totalCount + nodes { + market + offerId + price + amount + act + } + } +}`; +export const OASIS_SOURCE = 'oasis'; + +export interface OasisMarket { + id: string; // market symbol e.g MKRDAI + base: string; // base symbol e.g MKR + quote: string; // quote symbol e.g DAI + buyVol: number; // total buy volume (base) + sellVol: number; // total sell volume (base) + price: number; // volume weighted price (quote) + high: number; // max sell price + low: number; // min buy price +} + +export interface OasisMarketResponse { + data: { + oasisMarkets: { + nodes: OasisMarket[]; + }; + }; +} + +export interface OasisOrder { + offerId: number; // Offer Id + market: string; // Market symbol (base/quote) + price: string; // Offer price (quote) + amount: string; // Offer amount (base) + act: string; // Action (ask|bid) +} + +export interface OasisOrderbookResponse { + data: { + allOasisOrders: { + totalCount: number; + nodes: OasisOrder[]; + }; + }; +} + +// tslint:disable:prefer-function-over-method +// ^ Keep consistency with other sources and help logical organization +export class OasisSource { + /** + * Call Ddex API to find out which markets they are maintaining orderbooks for. + */ + public async getActiveMarketsAsync(): Promise<OasisMarket[]> { + const params = { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ query: OASIS_MARKET_QUERY }), + }; + const resp = await fetchAsync(OASIS_BASE_URL, params); + const respJson: OasisMarketResponse = await resp.json(); + const markets = respJson.data.oasisMarkets.nodes; + return markets; + } + + /** + * Retrieve orderbook from Oasis API for a given market. + * @param marketId String identifying the market we want data for. Eg. 'REPAUG'. + */ + public async getMarketOrderbookAsync(marketId: string): Promise<OasisOrder[]> { + const input = { + market: marketId, + }; + const params = { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ query: OASIS_ORDERBOOK_QUERY, variables: input }), + }; + const resp = await fetchAsync(OASIS_BASE_URL, params); + const respJson: OasisOrderbookResponse = await resp.json(); + return respJson.data.allOasisOrders.nodes; + } +} diff --git a/packages/pipeline/src/entities/token_order.ts b/packages/pipeline/src/entities/token_order.ts index 557705767..4b8f0abc3 100644 --- a/packages/pipeline/src/entities/token_order.ts +++ b/packages/pipeline/src/entities/token_order.ts @@ -10,20 +10,20 @@ export class TokenOrderbookSnapshot { public observedTimestamp!: number; @PrimaryColumn({ name: 'source' }) public source!: string; - @Column({ name: 'order_type' }) + @PrimaryColumn({ name: 'order_type' }) public orderType!: OrderType; @PrimaryColumn({ name: 'price', type: 'numeric', transformer: bigNumberTransformer }) public price!: BigNumber; @PrimaryColumn({ name: 'base_asset_symbol' }) public baseAssetSymbol!: string; - @Column({ name: 'base_asset_address' }) - public baseAssetAddress!: string; + @Column({ nullable: true, type: String, name: 'base_asset_address' }) + public baseAssetAddress!: string | null; @Column({ name: 'base_volume', type: 'numeric', transformer: bigNumberTransformer }) public baseVolume!: BigNumber; @PrimaryColumn({ name: 'quote_asset_symbol' }) public quoteAssetSymbol!: string; - @Column({ name: 'quote_asset_address' }) - public quoteAssetAddress!: string; + @Column({ nullable: true, type: String, name: 'quote_asset_address' }) + public quoteAssetAddress!: string | null; @Column({ name: 'quote_volume', type: 'numeric', transformer: bigNumberTransformer }) public quoteVolume!: BigNumber; } diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts index 81132e8f0..52a998f9f 100644 --- a/packages/pipeline/src/parsers/ddex_orders/index.ts +++ b/packages/pipeline/src/parsers/ddex_orders/index.ts @@ -1,7 +1,8 @@ import { BigNumber } from '@0x/utils'; -import * as R from 'ramda'; -import { DdexMarket, DdexOrder, DdexOrderbook } from '../../data_sources/ddex'; +import { aggregateOrders } from '../utils'; + +import { DdexMarket, DdexOrderbook } from '../../data_sources/ddex'; import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; import { OrderType } from '../../types'; @@ -28,19 +29,6 @@ export function parseDdexOrders( } /** - * Aggregates orders by price point for consistency with other exchanges. - * Querying the Ddex API at level 3 setting returns a breakdown of - * individual orders at each price point. Other exchanges only give total amount - * at each price point. Returns an array of <price, amount> tuples. - * @param ddexOrders A list of Ddex orders awaiting aggregation. - */ -export function aggregateOrders(ddexOrders: DdexOrder[]): Array<[string, BigNumber]> { - const sumAmount = (acc: BigNumber, order: DdexOrder): BigNumber => acc.plus(order.amount); - const aggregatedPricePoints = R.reduceBy(sumAmount, new BigNumber(0), R.prop('price'), ddexOrders); - return Object.entries(aggregatedPricePoints); -} - -/** * Parse a single aggregated Ddex order in order to form a tokenOrder entity * which can be saved into the database. * @param ddexMarket An object containing information about the market where these diff --git a/packages/pipeline/src/parsers/idex_orders/index.ts b/packages/pipeline/src/parsers/idex_orders/index.ts new file mode 100644 index 000000000..dfe27455c --- /dev/null +++ b/packages/pipeline/src/parsers/idex_orders/index.ts @@ -0,0 +1,77 @@ +import { BigNumber } from '@0x/utils'; + +import { aggregateOrders } from '../utils'; + +import { IdexOrder, IdexOrderbook, IdexOrderParam } from '../../data_sources/idex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; +import { OrderType } from '../../types'; + +/** + * Marque function of this file. + * 1) Takes in orders from an orderbook, + * 2) Aggregates them by price point, + * 3) Parses them into entities which are then saved into the database. + * @param idexOrderbook raw orderbook that we pull from the Idex API. + * @param observedTimestamp Time at which the orders for the market were pulled. + * @param source The exchange where these orders are placed. In this case 'idex'. + */ +export function parseIdexOrders(idexOrderbook: IdexOrderbook, observedTimestamp: number, source: string): TokenOrder[] { + const aggregatedBids = aggregateOrders(idexOrderbook.bids); + // Any of the bid orders' params will work + const idexBidOrder = idexOrderbook.bids[0]; + const parsedBids = + aggregatedBids.length > 0 + ? aggregatedBids.map(order => parseIdexOrder(idexBidOrder.params, observedTimestamp, 'bid', source, order)) + : []; + + const aggregatedAsks = aggregateOrders(idexOrderbook.asks); + // Any of the ask orders' params will work + const idexAskOrder = idexOrderbook.asks[0]; + const parsedAsks = + aggregatedAsks.length > 0 + ? aggregatedAsks.map(order => parseIdexOrder(idexAskOrder.params, observedTimestamp, 'ask', source, order)) + : []; + return parsedBids.concat(parsedAsks); +} + +/** + * Parse a single aggregated Idex order in order to form a tokenOrder entity + * which can be saved into the database. + * @param idexOrderParam An object containing information about the market where these + * trades have been placed. + * @param observedTimestamp The time when the API response returned back to us. + * @param orderType 'bid' or 'ask' enum. + * @param source Exchange where these orders were placed. + * @param idexOrder A <price, amount> tuple which we will convert to volume-basis. + */ +export function parseIdexOrder( + idexOrderParam: IdexOrderParam, + observedTimestamp: number, + orderType: OrderType, + source: string, + idexOrder: [string, BigNumber], +): TokenOrder { + const tokenOrder = new TokenOrder(); + const price = new BigNumber(idexOrder[0]); + const amount = idexOrder[1]; + + tokenOrder.source = source; + tokenOrder.observedTimestamp = observedTimestamp; + tokenOrder.orderType = orderType; + tokenOrder.price = price; + tokenOrder.baseVolume = amount; + tokenOrder.quoteVolume = price.times(amount); + + if (orderType === 'bid') { + tokenOrder.baseAssetSymbol = idexOrderParam.buySymbol; + tokenOrder.baseAssetAddress = idexOrderParam.tokenBuy; + tokenOrder.quoteAssetSymbol = idexOrderParam.sellSymbol; + tokenOrder.quoteAssetAddress = idexOrderParam.tokenSell; + } else { + tokenOrder.baseAssetSymbol = idexOrderParam.sellSymbol; + tokenOrder.baseAssetAddress = idexOrderParam.tokenSell; + tokenOrder.quoteAssetSymbol = idexOrderParam.buySymbol; + tokenOrder.quoteAssetAddress = idexOrderParam.tokenBuy; + } + return tokenOrder; +} diff --git a/packages/pipeline/src/parsers/oasis_orders/index.ts b/packages/pipeline/src/parsers/oasis_orders/index.ts new file mode 100644 index 000000000..7aafbf460 --- /dev/null +++ b/packages/pipeline/src/parsers/oasis_orders/index.ts @@ -0,0 +1,71 @@ +import { BigNumber } from '@0x/utils'; +import * as R from 'ramda'; + +import { aggregateOrders } from '../utils'; + +import { OasisMarket, OasisOrder } from '../../data_sources/oasis'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; +import { OrderType } from '../../types'; + +/** + * Marque function of this file. + * 1) Takes in orders from an orderbook, + * 2) Aggregates them according to price point, + * 3) Builds TokenOrder entity with other information attached. + * @param oasisOrderbook A raw orderbook that we pull from the Oasis API. + * @param oasisMarket An object containing market data also directly from the API. + * @param observedTimestamp Time at which the orders for the market were pulled. + * @param source The exchange where these orders are placed. In this case 'oasis'. + */ +export function parseOasisOrders( + oasisOrderbook: OasisOrder[], + oasisMarket: OasisMarket, + observedTimestamp: number, + source: string, +): TokenOrder[] { + const aggregatedBids = aggregateOrders(R.filter(R.propEq('act', 'bid'), oasisOrderbook)); + const aggregatedAsks = aggregateOrders(R.filter(R.propEq('act', 'ask'), oasisOrderbook)); + const parsedBids = aggregatedBids.map(order => + parseOasisOrder(oasisMarket, observedTimestamp, 'bid', source, order), + ); + const parsedAsks = aggregatedAsks.map(order => + parseOasisOrder(oasisMarket, observedTimestamp, 'ask', source, order), + ); + return parsedBids.concat(parsedAsks); +} + +/** + * Parse a single aggregated Oasis order to form a tokenOrder entity + * which can be saved into the database. + * @param oasisMarket An object containing information about the market where these + * trades have been placed. + * @param observedTimestamp The time when the API response returned back to us. + * @param orderType 'bid' or 'ask' enum. + * @param source Exchange where these orders were placed. + * @param oasisOrder A <price, amount> tuple which we will convert to volume-basis. + */ +export function parseOasisOrder( + oasisMarket: OasisMarket, + observedTimestamp: number, + orderType: OrderType, + source: string, + oasisOrder: [string, BigNumber], +): TokenOrder { + const tokenOrder = new TokenOrder(); + const price = new BigNumber(oasisOrder[0]); + const amount = oasisOrder[1]; + + tokenOrder.source = source; + tokenOrder.observedTimestamp = observedTimestamp; + tokenOrder.orderType = orderType; + tokenOrder.price = price; + + tokenOrder.baseAssetSymbol = oasisMarket.base; + tokenOrder.baseAssetAddress = null; // Oasis doesn't provide address information + tokenOrder.baseVolume = price.times(amount); + + tokenOrder.quoteAssetSymbol = oasisMarket.quote; + tokenOrder.quoteAssetAddress = null; // Oasis doesn't provide address information + tokenOrder.quoteVolume = amount; + return tokenOrder; +} diff --git a/packages/pipeline/src/parsers/utils.ts b/packages/pipeline/src/parsers/utils.ts new file mode 100644 index 000000000..860729e9f --- /dev/null +++ b/packages/pipeline/src/parsers/utils.ts @@ -0,0 +1,28 @@ +import { BigNumber } from '@0x/utils'; + +export interface GenericRawOrder { + price: string; + amount: string; +} + +/** + * Aggregates individual orders by price point. Filters zero amount orders. + * @param rawOrders An array of objects that have price and amount information. + */ +export function aggregateOrders(rawOrders: GenericRawOrder[]): Array<[string, BigNumber]> { + const aggregatedOrders = new Map<string, BigNumber>(); + rawOrders.forEach(order => { + const amount = new BigNumber(order.amount); + if (amount.isZero()) { + return; + } + // Use string instead of BigNum to aggregate by value instead of variable. + // Convert to BigNumber first to consolidate different string + // representations of the same number. Eg. '0.0' and '0.00'. + const price = new BigNumber(order.price).toString(); + + const existingAmount = aggregatedOrders.get(price) || new BigNumber(0); + aggregatedOrders.set(price, amount.plus(existingAmount)); + }); + return Array.from(aggregatedOrders.entries()); +} diff --git a/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts new file mode 100644 index 000000000..d47c1dd3f --- /dev/null +++ b/packages/pipeline/src/scripts/pull_idex_orderbook_snapshots.ts @@ -0,0 +1,63 @@ +import { logUtils } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { IDEX_SOURCE, IdexSource } from '../data_sources/idex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseIdexOrders } from '../parsers/idex_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +// Number of markets to retrieve orderbooks for at once. +const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 100; + +// Delay between market orderbook requests. +const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 2000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const idexSource = new IdexSource(); + logUtils.log('Getting all IDEX markets'); + const markets = await idexSource.getMarketsAsync(); + logUtils.log(`Got ${markets.length} markets.`); + for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { + await Promise.all( + marketsChunk.map(async (marketId: string) => getAndSaveMarketOrderbook(idexSource, marketId)), + ); + await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + } + process.exit(0); +})().catch(handleError); + +/** + * Retrieve orderbook from Idex API for a given market. Parse orders and insert + * them into our database. + * @param idexSource Data source which can query Idex API. + * @param marketId String representing market of interest, eg. 'ETH_TIC'. + */ +async function getAndSaveMarketOrderbook(idexSource: IdexSource, marketId: string): Promise<void> { + logUtils.log(`${marketId}: Retrieving orderbook.`); + const orderBook = await idexSource.getMarketOrderbookAsync(marketId); + const observedTimestamp = Date.now(); + + if (!R.has('bids', orderBook) || !R.has('asks', orderBook)) { + logUtils.warn(`${marketId}: Orderbook faulty.`); + return; + } + + logUtils.log(`${marketId}: Parsing orders.`); + const orders = parseIdexOrders(orderBook, observedTimestamp, IDEX_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${marketId}: Saving ${orders.length} orders.`); + const TokenOrderRepository = connection.getRepository(TokenOrder); + await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${marketId}: 0 orders to save.`); + } +} diff --git a/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts new file mode 100644 index 000000000..0ffa5fd47 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_oasis_orderbook_snapshots.ts @@ -0,0 +1,58 @@ +import { logUtils } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { OASIS_SOURCE, OasisMarket, OasisSource } from '../data_sources/oasis'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseOasisOrders } from '../parsers/oasis_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +// Number of markets to retrieve orderbooks for at once. +const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; + +// Delay between market orderbook requests. +const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 1000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const oasisSource = new OasisSource(); + logUtils.log('Getting all active Oasis markets'); + const markets = await oasisSource.getActiveMarketsAsync(); + logUtils.log(`Got ${markets.length} markets.`); + for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { + await Promise.all( + marketsChunk.map(async (market: OasisMarket) => getAndSaveMarketOrderbook(oasisSource, market)), + ); + await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + } + process.exit(0); +})().catch(handleError); + +/** + * Retrieve orderbook from Oasis API for a given market. Parse orders and insert + * them into our database. + * @param oasisSource Data source which can query Oasis API. + * @param marketId String identifying market we want data for. eg. 'REPAUG'. + */ +async function getAndSaveMarketOrderbook(oasisSource: OasisSource, market: OasisMarket): Promise<void> { + logUtils.log(`${market.id}: Retrieving orderbook.`); + const orderBook = await oasisSource.getMarketOrderbookAsync(market.id); + const observedTimestamp = Date.now(); + + logUtils.log(`${market.id}: Parsing orders.`); + const orders = parseOasisOrders(orderBook, market, observedTimestamp, OASIS_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${market.id}: Saving ${orders.length} orders.`); + const TokenOrderRepository = connection.getRepository(TokenOrder); + await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${market.id}: 0 orders to save.`); + } +} |