From b4cdb14b9b79589d7b24fd7655406c15b6bb00f6 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Tue, 11 Dec 2018 15:16:05 -0800 Subject: Refactor event scraping and add support for scraping ERC20 approval events (#1401) * Refactor event scraping and add support for scraping ERC20 approval events * Add tests for data_sources/contract-wrappers/utils --- .../pipeline/src/scripts/pull_missing_events.ts | 136 --------------------- 1 file changed, 136 deletions(-) delete mode 100644 packages/pipeline/src/scripts/pull_missing_events.ts (limited to 'packages/pipeline/src/scripts/pull_missing_events.ts') diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts deleted file mode 100644 index 80abbb8b0..000000000 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ /dev/null @@ -1,136 +0,0 @@ -// tslint:disable:no-console -import { web3Factory } from '@0x/dev-utils'; -import R = require('ramda'); -import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; - -import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; -import * as ormConfig from '../ormconfig'; -import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; -import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; - -const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. - -let connection: Connection; - -(async () => { - connection = await createConnection(ormConfig as ConnectionOptions); - const provider = web3Factory.getRpcProvider({ - rpcUrl: INFURA_ROOT_URL, - }); - const eventsSource = new ExchangeEventsSource(provider, 1); - await getFillEventsAsync(eventsSource); - await getCancelEventsAsync(eventsSource); - await getCancelUpToEventsAsync(eventsSource); - process.exit(0); -})().catch(handleError); - -async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise { - console.log('Checking existing fill events...'); - const repository = connection.getRepository(ExchangeFillEvent); - const startBlock = await getStartBlockAsync(repository); - console.log(`Getting fill events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getFillEventsAsync(startBlock); - console.log('Parsing fill events...'); - const events = parseExchangeFillEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total fill events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise { - console.log('Checking existing cancel events...'); - const repository = connection.getRepository(ExchangeCancelEvent); - const startBlock = await getStartBlockAsync(repository); - console.log(`Getting cancel events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); - console.log('Parsing cancel events...'); - const events = parseExchangeCancelEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total cancel events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise { - console.log('Checking existing CancelUpTo events...'); - const repository = connection.getRepository(ExchangeCancelUpToEvent); - const startBlock = await getStartBlockAsync(repository); - console.log(`Getting CancelUpTo events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); - console.log('Parsing CancelUpTo events...'); - const events = parseExchangeCancelUpToEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); - await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); -} - -const tableNameRegex = /^[a-zA-Z_]*$/; - -async function getStartBlockAsync(repository: Repository): Promise { - const fillEventCount = await repository.count(); - if (fillEventCount === 0) { - console.log(`No existing ${repository.metadata.name}s found.`); - return EXCHANGE_START_BLOCK; - } - const tableName = repository.metadata.tableName; - if (!tableNameRegex.test(tableName)) { - throw new Error(`Unexpected special character in table name: ${tableName}`); - } - const queryResult = await connection.query( - `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, - ); - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} - -async function saveEventsAsync( - isInitialPull: boolean, - repository: Repository, - events: T[], -): Promise { - console.log(`Saving ${repository.metadata.name}s...`); - if (isInitialPull) { - // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE - // each. - for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { - await repository.insert(eventsBatch); - } - } else { - // If we possibly have some overlap where we need to update some - // existing events, we need to use our workaround/fallback. - await saveIndividuallyWithFallbackAsync(repository, events); - } - const totalEvents = await repository.count(); - console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); -} - -async function saveIndividuallyWithFallbackAsync( - repository: Repository, - events: T[], -): Promise { - // Note(albrow): This is a temporary hack because `save` is not working as - // documented and is causing a foreign key constraint violation. Hopefully - // can remove later because this "poor man's upsert" implementation operates - // on one event at a time and is therefore much slower. - for (const event of events) { - try { - // First try an insert. - await repository.insert(event); - } catch { - // If it fails, assume it was a foreign key constraint error and try - // doing an update instead. - // Note(albrow): Unfortunately the `as any` hack here seems - // required. I can't figure out how to convince the type-checker - // that the criteria and the entity itself are the correct type for - // the given repository. If we can remove the `save` hack then this - // will probably no longer be necessary. - await repository.update( - { - contractAddress: event.contractAddress, - blockNumber: event.blockNumber, - logIndex: event.logIndex, - } as any, - event as any, - ); - } - } -} -- cgit v1.2.3