diff options
author | Alex Browne <stephenalexbrowne@gmail.com> | 2018-12-12 07:16:05 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-12 07:16:05 +0800 |
commit | b4cdb14b9b79589d7b24fd7655406c15b6bb00f6 (patch) | |
tree | ea004984d12ff2337387356f4b14ad087740adc2 /packages/pipeline/src/scripts | |
parent | d37680610b772d7bb585203047bef0af0439df0a (diff) | |
download | dexon-sol-tools-b4cdb14b9b79589d7b24fd7655406c15b6bb00f6.tar dexon-sol-tools-b4cdb14b9b79589d7b24fd7655406c15b6bb00f6.tar.gz dexon-sol-tools-b4cdb14b9b79589d7b24fd7655406c15b6bb00f6.tar.bz2 dexon-sol-tools-b4cdb14b9b79589d7b24fd7655406c15b6bb00f6.tar.lz dexon-sol-tools-b4cdb14b9b79589d7b24fd7655406c15b6bb00f6.tar.xz dexon-sol-tools-b4cdb14b9b79589d7b24fd7655406c15b6bb00f6.tar.zst dexon-sol-tools-b4cdb14b9b79589d7b24fd7655406c15b6bb00f6.zip |
Refactor event scraping and add support for scraping ERC20 approval events (#1401)
* Refactor event scraping and add support for scraping ERC20 approval events
* Add tests for data_sources/contract-wrappers/utils
Diffstat (limited to 'packages/pipeline/src/scripts')
-rw-r--r-- | packages/pipeline/src/scripts/pull_erc20_events.ts | 66 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_exchange_events.ts (renamed from packages/pipeline/src/scripts/pull_missing_events.ts) | 28 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_missing_blocks.ts | 2 |
3 files changed, 86 insertions, 10 deletions
diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts new file mode 100644 index 000000000..0ad12c97a --- /dev/null +++ b/packages/pipeline/src/scripts/pull_erc20_events.ts @@ -0,0 +1,66 @@ +// tslint:disable:no-console +import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses'; +import { web3Factory } from '@0x/dev-utils'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events'; +import { ERC20ApprovalEvent } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseERC20ApprovalEvents } from '../parsers/events'; +import { handleError, INFURA_ROOT_URL } from '../utils'; + +const NETWORK_ID = 1; +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. +const WETH_START_BLOCK = 4719568; // Block number when the WETH contract was deployed. + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: INFURA_ROOT_URL, + }); + const endBlock = await calculateEndBlockAsync(provider); + await getAndSaveWETHApprovalEventsAsync(provider, endBlock); + process.exit(0); +})().catch(handleError); + +async function getAndSaveWETHApprovalEventsAsync(provider: Web3ProviderEngine, endBlock: number): Promise<void> { + console.log('Checking existing approval events...'); + const repository = connection.getRepository(ERC20ApprovalEvent); + const startBlock = (await getStartBlockAsync(repository)) || WETH_START_BLOCK; + + console.log(`Getting WETH approval events starting at ${startBlock}...`); + const wethTokenAddress = getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken; + const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, wethTokenAddress); + const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock); + + console.log(`Parsing ${eventLogs.length} WETH approval events...`); + const events = parseERC20ApprovalEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total WETH approval events.`); + await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); +} + +async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { + const web3Wrapper = new Web3Wrapper(provider); + const currentBlock = await web3Wrapper.getBlockNumberAsync(); + return currentBlock - BLOCK_FINALITY_THRESHOLD; +} + +async function getStartBlockAsync(repository: Repository<ERC20ApprovalEvent>): Promise<number | null> { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing approval events found.`); + return null; + } + const queryResult = await connection.query( + `SELECT block_number FROM raw.erc20_approval_events ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts index 80abbb8b0..e98fc6629 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_exchange_events.ts @@ -1,5 +1,7 @@ // tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; import R = require('ramda'); import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; @@ -12,6 +14,7 @@ import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. let connection: Connection; @@ -20,43 +23,44 @@ let connection: Connection; const provider = web3Factory.getRpcProvider({ rpcUrl: INFURA_ROOT_URL, }); + const endBlock = await calculateEndBlockAsync(provider); const eventsSource = new ExchangeEventsSource(provider, 1); - await getFillEventsAsync(eventsSource); - await getCancelEventsAsync(eventsSource); - await getCancelUpToEventsAsync(eventsSource); + await getFillEventsAsync(eventsSource, endBlock); + await getCancelEventsAsync(eventsSource, endBlock); + await getCancelUpToEventsAsync(eventsSource, endBlock); process.exit(0); })().catch(handleError); -async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { +async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { console.log('Checking existing fill events...'); const repository = connection.getRepository(ExchangeFillEvent); const startBlock = await getStartBlockAsync(repository); console.log(`Getting fill events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getFillEventsAsync(startBlock); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock); console.log('Parsing fill events...'); const events = parseExchangeFillEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total fill events.`); await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } -async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { console.log('Checking existing cancel events...'); const repository = connection.getRepository(ExchangeCancelEvent); const startBlock = await getStartBlockAsync(repository); console.log(`Getting cancel events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock); console.log('Parsing cancel events...'); const events = parseExchangeCancelEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total cancel events.`); await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } -async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { console.log('Checking existing CancelUpTo events...'); const repository = connection.getRepository(ExchangeCancelUpToEvent); const startBlock = await getStartBlockAsync(repository); console.log(`Getting CancelUpTo events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock); console.log('Parsing CancelUpTo events...'); const events = parseExchangeCancelUpToEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); @@ -134,3 +138,9 @@ async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>( } } } + +async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { + const web3Wrapper = new Web3Wrapper(provider); + const currentBlock = await web3Wrapper.getBlockNumberAsync(); + return currentBlock - BLOCK_FINALITY_THRESHOLD; +} diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index b7bd51f08..18fe1b700 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -24,7 +24,7 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ - rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`, + rpcUrl: INFURA_ROOT_URL, }); const web3Source = new Web3Source(provider); await getAllMissingBlocks(web3Source); |