author     Alex Browne <stephenalexbrowne@gmail.com>  2018-11-20 10:38:11 +0800
committer  Alex Browne <stephenalexbrowne@gmail.com>  2018-12-05 06:24:48 +0800
commit     9986717671fe8e14c2168f7479bdaffe406bedc0 (patch)
tree       740250d59b8679756c7595ef49eba0d73cacbec5 /packages/pipeline/src
parent     5cad2ad1744ab1c1e24ed52fc0a26ec5acf5c898 (diff)
Add script for pulling missing block data
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r--  packages/pipeline/src/entities/block.ts                |  4
-rw-r--r--  packages/pipeline/src/parsers/web3/index.ts            |  5
-rw-r--r--  packages/pipeline/src/scripts/pull_missing_blocks.ts   | 83
3 files changed, 89 insertions(+), 3 deletions(-)
diff --git a/packages/pipeline/src/entities/block.ts b/packages/pipeline/src/entities/block.ts
index 51be37703..f2efc6390 100644
--- a/packages/pipeline/src/entities/block.ts
+++ b/packages/pipeline/src/entities/block.ts
@@ -5,6 +5,6 @@ export class Block {
     @PrimaryColumn() public hash!: string;
     @PrimaryColumn() public number!: number;
 
-    @Column({ name: 'unix_timestamp_seconds' })
-    public unixTimestampSeconds!: number;
+    @Column({ name: 'timestamp' })
+    public timestamp!: number;
 }
diff --git a/packages/pipeline/src/parsers/web3/index.ts b/packages/pipeline/src/parsers/web3/index.ts
index 2ead4c0e4..9b5b3b55d 100644
--- a/packages/pipeline/src/parsers/web3/index.ts
+++ b/packages/pipeline/src/parsers/web3/index.ts
@@ -2,6 +2,8 @@ import { BlockWithoutTransactionData, Transaction as EthTransaction } from 'ethe
 
 import { Block, Transaction } from '../../entities';
 
+const MILLISECONDS_PER_SECOND = 1000;
+
 /**
  * Parses a raw block and returns a Block entity.
  * @param rawBlock a raw block (e.g. returned from web3-wrapper).
@@ -17,7 +19,8 @@ export function parseBlock(rawBlock: BlockWithoutTransactionData): Block {
     const block = new Block();
     block.hash = rawBlock.hash;
     block.number = rawBlock.number;
-    block.unixTimestampSeconds = rawBlock.timestamp;
+    // Block timestamps are in seconds, but we use milliseconds everywhere else.
+    block.timestamp = rawBlock.timestamp * MILLISECONDS_PER_SECOND;
     return block;
 }
 
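A quick worked example of the conversion introduced in this hunk (the sample timestamp is made up for illustration, not taken from the commit):

    const MILLISECONDS_PER_SECOND = 1000;
    const rawTimestampSeconds = 1542681600; // hypothetical block timestamp from web3, in seconds
    const timestampMs = rawTimestampSeconds * MILLISECONDS_PER_SECOND; // 1542681600000, the value stored on Block.timestamp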
diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts
new file mode 100644
index 000000000..4a1483ab9
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts
@@ -0,0 +1,83 @@
+// tslint:disable:no-console
+import { web3Factory } from '@0x/dev-utils';
+import * as Parallel from 'async-parallel';
+import R = require('ramda');
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { Web3Source } from '../data_sources/web3';
+import { Block } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseBlock } from '../parsers/web3';
+import { handleError } from '../utils';
+
+// Number of blocks to save at once.
+const BATCH_SAVE_SIZE = 1000;
+// Maximum number of requests to send at once.
+const MAX_CONCURRENCY = 10;
+// Maximum number of blocks to query for at once. This is also the maximum
+// number of blocks we will hold in memory prior to being saved to the database.
+const MAX_BLOCKS_PER_QUERY = 1000;
+// Block number when the Exchange contract was deployed to mainnet.
+// TODO(albrow): De-dupe this constant.
+const EXCHANGE_START_BLOCK = 6271590;
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const provider = web3Factory.getRpcProvider({
+        rpcUrl: `https://mainnet.infura.io/${process.env.INFURA_API_KEY}`,
+    });
+    const web3Source = new Web3Source(provider);
+    await getAllMissingBlocks(web3Source);
+    process.exit(0);
+})().catch(handleError);
+
+interface MissingBlocksResponse {
+    block_number: string;
+}
+
+async function getAllMissingBlocks(web3Source: Web3Source): Promise<void> {
+    const blocksRepository = connection.getRepository(Block);
+    let fromBlock = EXCHANGE_START_BLOCK;
+    while (true) {
+        const blockNumbers = await getMissingBlockNumbers(fromBlock);
+        if (blockNumbers.length === 0) {
+            // There are no more missing blocks. We're done.
+            break;
+        }
+        await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers);
+        fromBlock = Math.max(...blockNumbers) + 1;
+    }
+    const totalBlocks = await blocksRepository.count();
+    console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`);
+}
+
+async function getMissingBlockNumbers(fromBlock: number): Promise<number[]> {
+    console.log(`Checking for missing blocks starting at ${fromBlock}...`);
+    const response = (await connection.query(
+        'SELECT DISTINCT(block_number) FROM raw.exchange_fill_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) AND block_number >= $1 ORDER BY block_number ASC LIMIT $2',
+        [fromBlock, MAX_BLOCKS_PER_QUERY],
+    )) as MissingBlocksResponse[];
+    const blockNumberStrings = R.pluck('block_number', response);
+    const blockNumbers = R.map(parseInt, blockNumberStrings);
+    console.log(`Found ${blockNumbers.length} missing blocks in the given range.`);
+    return blockNumbers;
+}
+
+async function getAndSaveBlocks(
+    web3Source: Web3Source,
+    blocksRepository: Repository<Block>,
+    blockNumbers: number[],
+): Promise<void> {
+    console.log(`Getting block data for ${blockNumbers.length} blocks...`);
+    Parallel.setConcurrency(MAX_CONCURRENCY);
+    const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) =>
+        web3Source.getBlockInfoAsync(blockNumber),
+    );
+    console.log(`Parsing ${rawBlocks.length} blocks...`);
+    const blocks = R.map(parseBlock, rawBlocks);
+    console.log(`Saving ${blocks.length} blocks...`);
+    await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) });
+}
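The core of getAndSaveBlocks above is a concurrency-limited fetch followed by a batched save. Below is a minimal, self-contained sketch of that pattern using the same libraries; fetchBlock and its placeholder return value are hypothetical stand-ins for web3Source.getBlockInfoAsync and real block data, not part of the commit. Note that TypeORM's SaveOptions.chunk is the number of entities saved per batch, so the sketch passes the batch size directly.

    import * as Parallel from 'async-parallel';
    import { Repository } from 'typeorm';

    import { Block } from '../entities';
    import { parseBlock } from '../parsers/web3';

    const MAX_CONCURRENCY = 10; // cap on in-flight RPC requests
    const BATCH_SAVE_SIZE = 1000; // entities written to the database per batch

    // Hypothetical fetcher standing in for web3Source.getBlockInfoAsync.
    async function fetchBlock(blockNumber: number): Promise<any> {
        return { hash: '0x0', number: blockNumber, timestamp: 0 }; // placeholder block shape
    }

    async function fetchParseAndSave(blockNumbers: number[], repository: Repository<Block>): Promise<void> {
        // async-parallel keeps at most MAX_CONCURRENCY fetches in flight at once.
        Parallel.setConcurrency(MAX_CONCURRENCY);
        const rawBlocks = await Parallel.map(blockNumbers, async (n: number) => fetchBlock(n));
        const blocks = rawBlocks.map(parseBlock);
        // TypeORM's chunk option splits the save into batches of BATCH_SAVE_SIZE entities.
        await repository.save(blocks, { chunk: BATCH_SAVE_SIZE });
    }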