diff options
Diffstat (limited to 'packages/pipeline/src')
4 files changed, 150 insertions, 87 deletions
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts index e25c6a731..d09f59535 100644 --- a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts +++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts @@ -1,4 +1,12 @@ -import { ContractWrappers, ExchangeEvents, ExchangeFillEventArgs, ExchangeWrapper } from '@0x/contract-wrappers'; +import { + ContractWrappers, + ExchangeCancelEventArgs, + ExchangeCancelUpToEventArgs, + ExchangeEventArgs, + ExchangeEvents, + ExchangeFillEventArgs, + ExchangeWrapper, +} from '@0x/contract-wrappers'; import { Web3ProviderEngine } from '@0x/subproviders'; import { Web3Wrapper } from '@0x/web3-wrapper'; import { LogWithDecodedArgs } from 'ethereum-types'; @@ -16,20 +24,41 @@ export class ExchangeEventsSource { this._exchangeWrapper = contractWrappers.exchange; } - // TODO(albrow): Get Cancel and CancelUpTo events. - public async getFillEventsAsync( - fromBlock: number = EXCHANGE_START_BLOCK, + fromBlock?: number, toBlock?: number, ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { + return this._getEventsAsync<ExchangeFillEventArgs>(ExchangeEvents.Fill, fromBlock, toBlock); + } + + public async getCancelEventsAsync( + fromBlock?: number, + toBlock?: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> { + return this._getEventsAsync<ExchangeCancelEventArgs>(ExchangeEvents.Cancel, fromBlock, toBlock); + } + + public async getCancelUpToEventsAsync( + fromBlock?: number, + toBlock?: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> { + return this._getEventsAsync<ExchangeCancelUpToEventArgs>(ExchangeEvents.CancelUpTo, fromBlock, toBlock); + } + + private async _getEventsAsync<ArgsType extends ExchangeEventArgs>( + eventName: ExchangeEvents, + fromBlock: number = EXCHANGE_START_BLOCK, + toBlock?: number, + ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { const calculatedToBlock = toBlock === undefined ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD : toBlock; - let events: Array<LogWithDecodedArgs<ExchangeFillEventArgs>> = []; + let events: Array<LogWithDecodedArgs<ArgsType>> = []; for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) { events = events.concat( - await this._getFillEventsForRangeAsync( + await this._getEventsForRangeAsync<ArgsType>( + eventName, currFromBlock, Math.min(currFromBlock + NUM_BLOCKS_PER_QUERY - 1, calculatedToBlock), ), @@ -38,12 +67,13 @@ export class ExchangeEventsSource { return events; } - private async _getFillEventsForRangeAsync( + private async _getEventsForRangeAsync<ArgsType extends ExchangeEventArgs>( + eventName: ExchangeEvents, fromBlock: number, toBlock: number, - ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { - return this._exchangeWrapper.getLogsAsync<ExchangeFillEventArgs>( - ExchangeEvents.Fill, + ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { + return this._exchangeWrapper.getLogsAsync<ArgsType>( + eventName, { fromBlock, toBlock, diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts index 442eec9cd..d3c61bcb9 100644 --- a/packages/pipeline/src/entities/index.ts +++ b/packages/pipeline/src/entities/index.ts @@ -1,3 +1,7 @@ +import { ExchangeCancelEvent } from './exchange_cancel_event'; +import { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event'; +import { ExchangeFillEvent } from './exchange_fill_event'; + export { Block } from './block'; export { ExchangeCancelEvent } from './exchange_cancel_event'; export { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event'; @@ -7,3 +11,5 @@ export { SraOrder } from './sra_order'; export { Transaction } from './transaction'; export { TokenOnChainMetadata } from './token_on_chain_metadata'; export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp'; + +export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; diff --git a/packages/pipeline/src/parsers/events/index.ts b/packages/pipeline/src/parsers/events/index.ts index 407883078..d42d1c57a 100644 --- a/packages/pipeline/src/parsers/events/index.ts +++ b/packages/pipeline/src/parsers/events/index.ts @@ -1,9 +1,4 @@ -import { - ExchangeCancelEventArgs, - ExchangeCancelUpToEventArgs, - ExchangeEventArgs, - ExchangeFillEventArgs, -} from '@0x/contract-wrappers'; +import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers'; import { assetDataUtils } from '@0x/order-utils'; import { AssetProxyId, ERC721AssetData } from '@0x/types'; import { LogWithDecodedArgs } from 'ethereum-types'; @@ -12,39 +7,32 @@ import * as R from 'ramda'; import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities'; import { bigNumbertoStringOrNull } from '../../utils'; -export type ExchangeEventEntity = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; +/** + * Parses raw event logs for a fill event and returns an array of + * ExchangeFillEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeFillEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>, +) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent); -export const parseExchangeEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeEventArgs>>, -) => ExchangeEventEntity[] = R.map(_convertToEntity); +/** + * Parses raw event logs for a cancel event and returns an array of + * ExchangeCancelEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeCancelEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>, +) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent); /** - * Converts a raw event log to an Entity. Automatically detects the type of - * event and returns the appropriate entity type. Throws for unknown event - * types. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). + * Parses raw event logs for a CancelUpTo event and returns an array of + * ExchangeCancelUpToEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). */ -export function _convertToEntity(eventLog: LogWithDecodedArgs<ExchangeEventArgs>): ExchangeEventEntity { - switch (eventLog.event) { - case 'Fill': - // tslint has a false positive here. We need to type assert in order - // to change the type argument to the more specific - // ExchangeFillEventArgs. - // tslint:disable-next-line:no-unnecessary-type-assertion - return _convertToExchangeFillEvent(eventLog as LogWithDecodedArgs<ExchangeFillEventArgs>); - case 'Cancel': - // tslint:disable-next-line:no-unnecessary-type-assertion - return _convertToExchangeCancelEvent(eventLog as LogWithDecodedArgs<ExchangeCancelEventArgs>); - case 'CancelUpTo': - // tslint:disable-next-line:no-unnecessary-type-assertion - return _convertToExchangeCancelUpToEvent(eventLog as LogWithDecodedArgs<ExchangeCancelUpToEventArgs>); - default: - // Another false positive here. We are adding two strings, but - // tslint seems confused about the types. - // tslint:disable-next-line:restrict-plus-operands - throw new Error('unexpected eventLog.event type: ' + eventLog.event); - } -} +export const parseExchangeCancelUpToEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>, +) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent); /** * Converts a raw event log for a fill event into an ExchangeFillEvent entity. diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index bceed299c..b2a99e3c0 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -1,14 +1,13 @@ // tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; -import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities'; +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; -import { ExchangeEventEntity, parseExchangeEvents } from '../parsers/events'; +import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. @@ -22,40 +21,88 @@ let connection: Connection; const provider = web3Factory.getRpcProvider({ rpcUrl: 'https://mainnet.infura.io', }); - await getExchangeEventsAsync(provider); + const eventsSource = new ExchangeEventsSource(provider, 1); + await getFillEventsAsync(eventsSource); + await getCancelEventsAsync(eventsSource); + await getCancelUpToEventsAsync(eventsSource); process.exit(0); })().catch(handleError); -async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise<void> { - console.log('Checking existing event logs...'); - const eventsRepository = connection.getRepository(ExchangeFillEvent); - const manager = connection.createEntityManager(); - const startBlock = await getStartBlockAsync(eventsRepository); - console.log(`Getting event logs starting at ${startBlock}...`); - const exchangeEvents = new ExchangeEventsSource(provider, 1); - const eventLogs = await exchangeEvents.getFillEventsAsync(startBlock); - console.log('Parsing events...'); - const events = parseExchangeEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total events.`); - console.log('Saving events...'); - if (startBlock === EXCHANGE_START_BLOCK) { +async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { + console.log('Checking existing fill events...'); + const repository = connection.getRepository(ExchangeFillEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting fill events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock); + console.log('Parsing fill events...'); + const events = parseExchangeFillEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total fill events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { + console.log('Checking existing cancel events...'); + const repository = connection.getRepository(ExchangeCancelEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting cancel events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); + console.log('Parsing cancel events...'); + const events = parseExchangeCancelEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total cancel events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { + console.log('Checking existing CancelUpTo events...'); + const repository = connection.getRepository(ExchangeCancelUpToEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting CancelUpTo events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); + console.log('Parsing CancelUpTo events...'); + const events = parseExchangeCancelUpToEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing ${repository.metadata.name}s found.`); + return EXCHANGE_START_BLOCK; + } + const queryResult = await connection.query( + // TODO(albrow): Would prefer to use a prepared statement here to reduce + // surface area for SQL injections, but it doesn't appear to be working. + `SELECT block_number FROM raw.${repository.metadata.tableName} ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} + +async function saveEventsAsync<T extends ExchangeEvent>( + isInitialPull: boolean, + repository: Repository<T>, + events: T[], +): Promise<void> { + console.log(`Saving ${repository.metadata.name}s...`); + if (isInitialPull) { // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE // each. for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { - await eventsRepository.insert(eventsBatch); + await repository.insert(eventsBatch); } } else { // If we possibly have some overlap where we need to update some // existing events, we need to use our workaround/fallback. - await saveIndividuallyWithFallbackAsync(eventsRepository, events); + await saveIndividuallyWithFallbackAsync(repository, events); } - const totalEvents = await eventsRepository.count(); - console.log(`Done saving events. There are now ${totalEvents} total events.`); + const totalEvents = await repository.count(); + console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); } -async function saveIndividuallyWithFallbackAsync( - eventsRepository: Repository<ExchangeFillEvent>, - events: ExchangeEventEntity[], +async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>( + repository: Repository<T>, + events: T[], ): Promise<void> { // Note(albrow): This is a temporary hack because `save` is not working as // documented and is causing a foreign key constraint violation. Hopefully @@ -63,32 +110,24 @@ async function saveIndividuallyWithFallbackAsync( // on one event at a time and is therefore much slower. for (const event of events) { try { - // First try and insert. - await eventsRepository.insert(event); + // First try an insert. + await repository.insert(event); } catch { // If it fails, assume it was a foreign key constraint error and try // doing an update instead. - await eventsRepository.update( + // Note(albrow): Unfortunately the `as any` hack here seems + // required. I can't figure out how to convince the type-checker + // that the criteria and the entity itself are the correct type for + // the given repository. If we can remove the `save` hack then this + // will probably no longer be necessary. + await repository.update( { contractAddress: event.contractAddress, blockNumber: event.blockNumber, logIndex: event.logIndex, - }, - event, + } as any, + event as any, ); } } } - -async function getStartBlockAsync(eventsRepository: Repository<ExchangeFillEvent>): Promise<number> { - const fillEventCount = await eventsRepository.count(); - if (fillEventCount === 0) { - console.log('No existing fill events found.'); - return EXCHANGE_START_BLOCK; - } - const queryResult = await connection.query( - 'SELECT block_number FROM raw.exchange_fill_events ORDER BY block_number DESC LIMIT 1', - ); - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} |