diff options
author | Leonid Logvinov <logvinov.leon@gmail.com> | 2019-01-09 19:02:25 +0800 |
---|---|---|
committer | Leonid Logvinov <logvinov.leon@gmail.com> | 2019-01-09 19:02:25 +0800 |
commit | ea14913b412e78ff458bdfba47182f7363e776e5 (patch) | |
tree | 3ee220bfbbd9923b5e1adc36ee51f9b5d39ad640 /packages/pipeline/src/scripts/pull_missing_blocks.ts | |
parent | 5868c91cfb54cfa9177572b201d88d1168bf5b06 (diff) | |
parent | 5dd55491b86bf8577405e37d0f2d668aa1273b10 (diff) | |
download | dexon-sol-tools-ea14913b412e78ff458bdfba47182f7363e776e5.tar dexon-sol-tools-ea14913b412e78ff458bdfba47182f7363e776e5.tar.gz dexon-sol-tools-ea14913b412e78ff458bdfba47182f7363e776e5.tar.bz2 dexon-sol-tools-ea14913b412e78ff458bdfba47182f7363e776e5.tar.lz dexon-sol-tools-ea14913b412e78ff458bdfba47182f7363e776e5.tar.xz dexon-sol-tools-ea14913b412e78ff458bdfba47182f7363e776e5.tar.zst dexon-sol-tools-ea14913b412e78ff458bdfba47182f7363e776e5.zip |
Merge development
Diffstat (limited to 'packages/pipeline/src/scripts/pull_missing_blocks.ts')
-rw-r--r-- | packages/pipeline/src/scripts/pull_missing_blocks.ts | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts new file mode 100644 index 000000000..ced9d99eb --- /dev/null +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -0,0 +1,90 @@ +// tslint:disable:no-console +import { web3Factory } from '@0x/dev-utils'; +import * as Parallel from 'async-parallel'; +import R = require('ramda'); +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { Web3Source } from '../data_sources/web3'; +import { Block } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseBlock } from '../parsers/web3'; +import { handleError, INFURA_ROOT_URL } from '../utils'; + +// Number of blocks to save at once. +const BATCH_SAVE_SIZE = 1000; +// Maximum number of requests to send at once. +const MAX_CONCURRENCY = 20; +// Maximum number of blocks to query for at once. This is also the maximum +// number of blocks we will hold in memory prior to being saved to the database. 
const MAX_BLOCKS_PER_QUERY = 1000;

// Shared database connection. It is assigned by the entry point below before
// any of the helper functions that read it are called.
let connection: Connection;

// Tables whose `block_number` values are checked against `raw.blocks`.
const tablesWithMissingBlocks = [
    'raw.exchange_fill_events',
    'raw.exchange_cancel_events',
    'raw.exchange_cancel_up_to_events',
    'raw.erc20_approval_events',
];

// Entry point: opens the database connection, builds a Web3Source on top of an
// RPC provider pointed at INFURA_ROOT_URL, back-fills the missing blocks for
// each table in turn, and then exits the process. Any rejection is passed to
// `handleError`.
(async () => {
    connection = await createConnection(ormConfig as ConnectionOptions);
    const provider = web3Factory.getRpcProvider({
        rpcUrl: INFURA_ROOT_URL,
    });
    const web3Source = new Web3Source(provider);
    // Tables are processed sequentially, one after another.
    for (const tableName of tablesWithMissingBlocks) {
        await getAllMissingBlocksAsync(web3Source, tableName);
    }
    process.exit(0);
})().catch(handleError);

// Shape of a single row returned by the missing-blocks query. The block number
// is typed as a string here and converted to a number by the caller.
interface MissingBlocksResponse {
    block_number: string;
}

/**
 * Repeatedly looks up block numbers that `tableName` references but that are
 * absent from `raw.blocks`, fetching and saving them batch by batch until the
 * lookup returns no more rows. Logs the total block count when finished.
 * @param web3Source Source used to fetch block data.
 * @param tableName Fully qualified name of the table to check.
 */
async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise<void> {
    const blocksRepository = connection.getRepository(Block);
    while (true) {
        console.log(`Checking for missing blocks in ${tableName}...`);
        const blockNumbers = await getMissingBlockNumbersAsync(tableName);
        if (blockNumbers.length === 0) {
            // There are no more missing blocks. We're done.
            break;
        }
        await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers);
    }
    const totalBlocks = await blocksRepository.count();
    console.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`);
}

/**
 * Queries for block numbers present in `tableName` but not in `raw.blocks`.
 * @param tableName Fully qualified table name. It is interpolated directly
 *     into the SQL text, so it must come from a trusted source (here, the
 *     hard-coded `tablesWithMissingBlocks` list).
 * @returns Up to `MAX_BLOCKS_PER_QUERY` distinct missing block numbers.
 */
async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> {
    // This query returns up to `MAX_BLOCKS_PER_QUERY` distinct block numbers
    // which are present in `tableName` but not in `raw.blocks`.
    const response = (await connection.query(
        `SELECT DISTINCT(block_number) FROM ${tableName} LEFT JOIN raw.blocks ON ${tableName}.block_number = raw.blocks.number WHERE number IS NULL LIMIT $1;`,
        [MAX_BLOCKS_PER_QUERY],
    )) as MissingBlocksResponse[];
    const blockNumberStrings = R.pluck('block_number', response);
    // Parse with an explicit radix so the values are always read as decimal.
    const blockNumbers = R.map((blockNumber: string) => parseInt(blockNumber, 10), blockNumberStrings);
    console.log(`Found ${blockNumbers.length} missing blocks.`);
    return blockNumbers;
}

/**
 * Fetches the given blocks through `web3Source`, parses them with
 * `parseBlock`, and saves the results through `blocksRepository`.
 * @param web3Source Source used to fetch block data.
 * @param blocksRepository Repository the parsed blocks are saved to.
 * @param blockNumbers Numbers of the blocks to fetch and save.
 */
async function getAndSaveBlocksAsync(
    web3Source: Web3Source,
    blocksRepository: Repository<Block>,
    blockNumbers: number[],
): Promise<void> {
    console.log(`Getting block data for ${blockNumbers.length} blocks...`);
    Parallel.setConcurrency(MAX_CONCURRENCY);
    const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) =>
        web3Source.getBlockInfoAsync(blockNumber),
    );
    console.log(`Parsing ${rawBlocks.length} blocks...`);
    const blocks = R.map(parseBlock, rawBlocks);
    console.log(`Saving ${blocks.length} blocks...`);
    // TypeORM's `chunk` save option is the number of entities per chunk, not
    // the number of chunks, so the batch size is passed directly. Dividing the
    // length by the batch size yielded a chunk size of 1 for any batch of up
    // to BATCH_SAVE_SIZE blocks, i.e. one save group per block.
    await blocksRepository.save(blocks, { chunk: BATCH_SAVE_SIZE });
    console.log('Done saving this batch of blocks');
}