diff options
-rw-r--r-- | packages/pipeline/package.json | 3 | ||||
-rw-r--r-- | packages/pipeline/src/entities/block.ts | 4 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/web3/index.ts | 5 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_missing_blocks.ts | 83 | ||||
-rw-r--r-- | yarn.lock | 4 |
5 files changed, 95 insertions, 4 deletions
diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json index dcd19d150..a57fbf5bc 100644 --- a/packages/pipeline/package.json +++ b/packages/pipeline/package.json @@ -30,8 +30,8 @@ "license": "Apache-2.0", "devDependencies": { "@0x/tslint-config": "^1.0.9", - "@types/ramda": "^0.25.38", "@types/axios": "^0.14.0", + "@types/ramda": "^0.25.38", "chai": "^4.1.2", "chai-as-promised": "^7.1.1", "chai-bignumber": "^2.0.2", @@ -50,6 +50,7 @@ "@0x/types": "^1.2.0", "@0x/utils": "^2.0.3", "@0x/web3-wrapper": "^3.1.0", + "async-parallel": "^1.2.3", "axios": "^0.18.0", "ethereum-types": "^1.0.6", "pg": "^7.5.0", diff --git a/packages/pipeline/src/entities/block.ts b/packages/pipeline/src/entities/block.ts index 51be37703..f2efc6390 100644 --- a/packages/pipeline/src/entities/block.ts +++ b/packages/pipeline/src/entities/block.ts @@ -5,6 +5,6 @@ export class Block { @PrimaryColumn() public hash!: string; @PrimaryColumn() public number!: number; - @Column({ name: 'unix_timestamp_seconds' }) - public unixTimestampSeconds!: number; + @Column({ name: 'timestamp' }) + public timestamp!: number; } diff --git a/packages/pipeline/src/parsers/web3/index.ts b/packages/pipeline/src/parsers/web3/index.ts index 2ead4c0e4..9b5b3b55d 100644 --- a/packages/pipeline/src/parsers/web3/index.ts +++ b/packages/pipeline/src/parsers/web3/index.ts @@ -2,6 +2,8 @@ import { BlockWithoutTransactionData, Transaction as EthTransaction } from 'ethe import { Block, Transaction } from '../../entities'; +const MILLISECONDS_PER_SECOND = 1000; + /** * Parses a raw block and returns a Block entity. * @param rawBlock a raw block (e.g. returned from web3-wrapper). @@ -17,7 +19,8 @@ export function parseBlock(rawBlock: BlockWithoutTransactionData): Block { const block = new Block(); block.hash = rawBlock.hash; block.number = rawBlock.number; - block.unixTimestampSeconds = rawBlock.timestamp; + // Block timestamps are in seconds, but we use milliseconds everywhere else. + block.timestamp = rawBlock.timestamp * MILLISECONDS_PER_SECOND; return block; } diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts new file mode 100644 index 000000000..4a1483ab9 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -0,0 +1,83 @@ +// tslint:disable:no-console +import { web3Factory } from '@0x/dev-utils'; +import * as Parallel from 'async-parallel'; +import R = require('ramda'); +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { Web3Source } from '../data_sources/web3'; +import { Block } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseBlock } from '../parsers/web3'; +import { handleError } from '../utils'; + +// Number of blocks to save at once. +const BATCH_SAVE_SIZE = 1000; +// Maximum number of requests to send at once. +const MAX_CONCURRENCY = 10; +// Maximum number of blocks to query for at once. This is also the maximum +// number of blocks we will hold in memory prior to being saved to the database. +const MAX_BLOCKS_PER_QUERY = 1000; +// Block number when the Exchange contract was deployed to mainnet. +// TODO(albrow): De-dupe this constant. +const EXCHANGE_START_BLOCK = 6271590; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: `https://mainnet.infura.io/${process.env.INFURA_API_KEY}`, + }); + const web3Source = new Web3Source(provider); + await getAllMissingBlocks(web3Source); + process.exit(0); +})().catch(handleError); + +interface MissingBlocksResponse { + block_number: string; +} + +async function getAllMissingBlocks(web3Source: Web3Source): Promise<void> { + const blocksRepository = connection.getRepository(Block); + let fromBlock = EXCHANGE_START_BLOCK; + while (true) { + const blockNumbers = await getMissingBlockNumbers(fromBlock); + if (blockNumbers.length === 0) { + // There are no more missing blocks. We're done. + break; + } + await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers); + fromBlock = Math.max(...blockNumbers) + 1; + } + const totalBlocks = await blocksRepository.count(); + console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`); +} + +async function getMissingBlockNumbers(fromBlock: number): Promise<number[]> { + console.log(`Checking for missing blocks starting at ${fromBlock}...`); + const response = (await connection.query( + 'SELECT DISTINCT(block_number) FROM raw.exchange_fill_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) AND block_number >= $1 ORDER BY block_number ASC LIMIT $2', + [fromBlock, MAX_BLOCKS_PER_QUERY], + )) as MissingBlocksResponse[]; + const blockNumberStrings = R.pluck('block_number', response); + const blockNumbers = R.map(parseInt, blockNumberStrings); + console.log(`Found ${blockNumbers.length} missing blocks in the given range.`); + return blockNumbers; +} + +async function getAndSaveBlocks( + web3Source: Web3Source, + blocksRepository: Repository<Block>, + blockNumbers: number[], +): Promise<void> { + console.log(`Getting block data for ${blockNumbers.length} blocks...`); + Parallel.setConcurrency(MAX_CONCURRENCY); + const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) => + web3Source.getBlockInfoAsync(blockNumber), + ); + console.log(`Parsing ${rawBlocks.length} blocks...`); + const blocks = R.map(parseBlock, rawBlocks); + console.log(`Saving ${blocks.length} blocks...`); + await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); +} @@ -2373,6 +2373,10 @@ async-limiter@~1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.0.tgz#78faed8c3d074ab81f22b4e985d79e8738f720f8" +async-parallel@^1.2.3: + version "1.2.3" + resolved "https://registry.yarnpkg.com/async-parallel/-/async-parallel-1.2.3.tgz#0b90550aeffb7a365d8cee881eb0618f656a3450" + async@1.x, async@^1.4.0, async@^1.4.2, async@^1.5.2: version "1.5.2" resolved "https://registry.yarnpkg.com/async/-/async-1.5.2.tgz#ec6a61ae56480c0c3cb241c95618e20892f9672a" |