aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
blob: 34345f3557b9ddeb30ac0cfe2057c7717661c429 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import { logUtils } from '@0x/utils';
import { Connection, ConnectionOptions, createConnection } from 'typeorm';

import {
    PARADEX_SOURCE,
    ParadexActiveMarketsResponse,
    ParadexMarket,
    ParadexSource,
    ParadexTokenInfoResponse,
} from '../data_sources/paradex';
import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
import * as ormConfig from '../ormconfig';
import { parseParadexOrders } from '../parsers/paradex_orders';
import { handleError } from '../utils';

// Number of orders to save at once.
const BATCH_SAVE_SIZE = 1000;

let connection: Connection;

(async () => {
    connection = await createConnection(ormConfig as ConnectionOptions);
    const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY;
    if (apiKey === undefined) {
        throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY');
    }
    const paradexSource = new ParadexSource(apiKey);
    const markets = await paradexSource.getActiveMarketsAsync();
    const tokenInfoResponse = await paradexSource.getTokenInfoAsync();
    const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse);
    await Promise.all(
        extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbookAsync(paradexSource, market)),
    );
    process.exit(0);
})().catch(handleError);

/**
 * Extend the default ParadexMarket objects with token addresses.
 * @param markets An array of ParadexMarket objects.
 * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses.
 */
function addTokenAddresses(
    markets: ParadexActiveMarketsResponse,
    tokenInfoResponse: ParadexTokenInfoResponse,
): ParadexMarket[] {
    const symbolAddressMapping = new Map<string, string>();
    tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address));

    markets.forEach((market: ParadexMarket) => {
        if (symbolAddressMapping.has(market.baseToken)) {
            market.baseTokenAddress = symbolAddressMapping.get(market.baseToken);
        } else {
            market.quoteTokenAddress = '';
            logUtils.warn(`${market.baseToken}: No address found.`);
        }

        if (symbolAddressMapping.has(market.quoteToken)) {
            market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken);
        } else {
            market.quoteTokenAddress = '';
            logUtils.warn(`${market.quoteToken}: No address found.`);
        }
    });
    return markets;
}

/**
 * Retrieve orderbook from Paradex API for a given market. Parse orders and insert
 * them into our database.
 * @param paradexSource Data source which can query the Paradex API.
 * @param market Object from the Paradex API with information about the market in question.
 */
async function getAndSaveMarketOrderbookAsync(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> {
    const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol);
    const observedTimestamp = Date.now();

    logUtils.log(`${market.symbol}: Parsing orders.`);
    const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE);

    if (orders.length > 0) {
        logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`);
        const tokenOrderRepository = connection.getRepository(TokenOrder);
        await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
    } else {
        logUtils.log(`${market.symbol}: 0 orders to save.`);
    }
}