From 24fd2d9730d58a58929f401674175ad8a5a7fbc1 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Fri, 16 Nov 2018 12:55:54 -0800 Subject: Add support for pulling Cancel and CancelUpTo events --- .../1542401122477-MakeTakerAddressNullable.ts | 17 +++ .../contract-wrappers/exchange_events.ts | 50 +++++++-- packages/pipeline/src/entities/index.ts | 6 + packages/pipeline/src/parsers/events/index.ts | 58 ++++------ .../pipeline/src/scripts/pull_missing_events.ts | 123 ++++++++++++++------- .../pipeline/test/parsers/events/index_test.ts | 6 +- 6 files changed, 170 insertions(+), 90 deletions(-) create mode 100644 packages/pipeline/migrations/1542401122477-MakeTakerAddressNullable.ts diff --git a/packages/pipeline/migrations/1542401122477-MakeTakerAddressNullable.ts b/packages/pipeline/migrations/1542401122477-MakeTakerAddressNullable.ts new file mode 100644 index 000000000..957c85a36 --- /dev/null +++ b/packages/pipeline/migrations/1542401122477-MakeTakerAddressNullable.ts @@ -0,0 +1,17 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class MakeTakerAddressNullable1542401122477 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE raw.exchange_cancel_events + ALTER COLUMN taker_address DROP NOT NULL;`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE raw.exchange_cancel_events + ALTER COLUMN taker_address SET NOT NULL;`, + ); + } +} diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts index e25c6a731..d09f59535 100644 --- a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts +++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts @@ -1,4 +1,12 @@ -import { ContractWrappers, ExchangeEvents, ExchangeFillEventArgs, ExchangeWrapper } from '@0x/contract-wrappers'; +import { + ContractWrappers, + ExchangeCancelEventArgs, + ExchangeCancelUpToEventArgs, + ExchangeEventArgs, + ExchangeEvents, + ExchangeFillEventArgs, + ExchangeWrapper, +} from '@0x/contract-wrappers'; import { Web3ProviderEngine } from '@0x/subproviders'; import { Web3Wrapper } from '@0x/web3-wrapper'; import { LogWithDecodedArgs } from 'ethereum-types'; @@ -16,20 +24,41 @@ export class ExchangeEventsSource { this._exchangeWrapper = contractWrappers.exchange; } - // TODO(albrow): Get Cancel and CancelUpTo events. - public async getFillEventsAsync( - fromBlock: number = EXCHANGE_START_BLOCK, + fromBlock?: number, toBlock?: number, ): Promise>> { + return this._getEventsAsync(ExchangeEvents.Fill, fromBlock, toBlock); + } + + public async getCancelEventsAsync( + fromBlock?: number, + toBlock?: number, + ): Promise>> { + return this._getEventsAsync(ExchangeEvents.Cancel, fromBlock, toBlock); + } + + public async getCancelUpToEventsAsync( + fromBlock?: number, + toBlock?: number, + ): Promise>> { + return this._getEventsAsync(ExchangeEvents.CancelUpTo, fromBlock, toBlock); + } + + private async _getEventsAsync( + eventName: ExchangeEvents, + fromBlock: number = EXCHANGE_START_BLOCK, + toBlock?: number, + ): Promise>> { const calculatedToBlock = toBlock === undefined ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD : toBlock; - let events: Array> = []; + let events: Array> = []; for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) { events = events.concat( - await this._getFillEventsForRangeAsync( + await this._getEventsForRangeAsync( + eventName, currFromBlock, Math.min(currFromBlock + NUM_BLOCKS_PER_QUERY - 1, calculatedToBlock), ), @@ -38,12 +67,13 @@ export class ExchangeEventsSource { return events; } - private async _getFillEventsForRangeAsync( + private async _getEventsForRangeAsync( + eventName: ExchangeEvents, fromBlock: number, toBlock: number, - ): Promise>> { - return this._exchangeWrapper.getLogsAsync( - ExchangeEvents.Fill, + ): Promise>> { + return this._exchangeWrapper.getLogsAsync( + eventName, { fromBlock, toBlock, diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts index 442eec9cd..d3c61bcb9 100644 --- a/packages/pipeline/src/entities/index.ts +++ b/packages/pipeline/src/entities/index.ts @@ -1,3 +1,7 @@ +import { ExchangeCancelEvent } from './exchange_cancel_event'; +import { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event'; +import { ExchangeFillEvent } from './exchange_fill_event'; + export { Block } from './block'; export { ExchangeCancelEvent } from './exchange_cancel_event'; export { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event'; @@ -7,3 +11,5 @@ export { SraOrder } from './sra_order'; export { Transaction } from './transaction'; export { TokenOnChainMetadata } from './token_on_chain_metadata'; export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp'; + +export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; diff --git a/packages/pipeline/src/parsers/events/index.ts b/packages/pipeline/src/parsers/events/index.ts index 407883078..d42d1c57a 100644 --- a/packages/pipeline/src/parsers/events/index.ts +++ b/packages/pipeline/src/parsers/events/index.ts @@ -1,9 +1,4 @@ -import { - ExchangeCancelEventArgs, - ExchangeCancelUpToEventArgs, - ExchangeEventArgs, - ExchangeFillEventArgs, -} from '@0x/contract-wrappers'; +import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers'; import { assetDataUtils } from '@0x/order-utils'; import { AssetProxyId, ERC721AssetData } from '@0x/types'; import { LogWithDecodedArgs } from 'ethereum-types'; @@ -12,39 +7,32 @@ import * as R from 'ramda'; import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities'; import { bigNumbertoStringOrNull } from '../../utils'; -export type ExchangeEventEntity = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; +/** + * Parses raw event logs for a fill event and returns an array of + * ExchangeFillEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeFillEvents: ( + eventLogs: Array>, +) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent); -export const parseExchangeEvents: ( - eventLogs: Array>, -) => ExchangeEventEntity[] = R.map(_convertToEntity); +/** + * Parses raw event logs for a cancel event and returns an array of + * ExchangeCancelEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeCancelEvents: ( + eventLogs: Array>, +) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent); /** - * Converts a raw event log to an Entity. Automatically detects the type of - * event and returns the appropriate entity type. Throws for unknown event - * types. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). + * Parses raw event logs for a CancelUpTo event and returns an array of + * ExchangeCancelUpToEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). */ -export function _convertToEntity(eventLog: LogWithDecodedArgs): ExchangeEventEntity { - switch (eventLog.event) { - case 'Fill': - // tslint has a false positive here. We need to type assert in order - // to change the type argument to the more specific - // ExchangeFillEventArgs. - // tslint:disable-next-line:no-unnecessary-type-assertion - return _convertToExchangeFillEvent(eventLog as LogWithDecodedArgs); - case 'Cancel': - // tslint:disable-next-line:no-unnecessary-type-assertion - return _convertToExchangeCancelEvent(eventLog as LogWithDecodedArgs); - case 'CancelUpTo': - // tslint:disable-next-line:no-unnecessary-type-assertion - return _convertToExchangeCancelUpToEvent(eventLog as LogWithDecodedArgs); - default: - // Another false positive here. We are adding two strings, but - // tslint seems confused about the types. - // tslint:disable-next-line:restrict-plus-operands - throw new Error('unexpected eventLog.event type: ' + eventLog.event); - } -} +export const parseExchangeCancelUpToEvents: ( + eventLogs: Array>, +) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent); /** * Converts a raw event log for a fill event into an ExchangeFillEvent entity. diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index bceed299c..b2a99e3c0 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -1,14 +1,13 @@ // tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; -import { Web3ProviderEngine } from '@0x/subproviders'; import R = require('ramda'); import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; -import { ExchangeFillEvent } from '../entities'; +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; -import { ExchangeEventEntity, parseExchangeEvents } from '../parsers/events'; +import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. @@ -22,40 +21,88 @@ let connection: Connection; const provider = web3Factory.getRpcProvider({ rpcUrl: 'https://mainnet.infura.io', }); - await getExchangeEventsAsync(provider); + const eventsSource = new ExchangeEventsSource(provider, 1); + await getFillEventsAsync(eventsSource); + await getCancelEventsAsync(eventsSource); + await getCancelUpToEventsAsync(eventsSource); process.exit(0); })().catch(handleError); -async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise { - console.log('Checking existing event logs...'); - const eventsRepository = connection.getRepository(ExchangeFillEvent); - const manager = connection.createEntityManager(); - const startBlock = await getStartBlockAsync(eventsRepository); - console.log(`Getting event logs starting at ${startBlock}...`); - const exchangeEvents = new ExchangeEventsSource(provider, 1); - const eventLogs = await exchangeEvents.getFillEventsAsync(startBlock); - console.log('Parsing events...'); - const events = parseExchangeEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total events.`); - console.log('Saving events...'); - if (startBlock === EXCHANGE_START_BLOCK) { +async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing fill events...'); + const repository = connection.getRepository(ExchangeFillEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting fill events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock); + console.log('Parsing fill events...'); + const events = parseExchangeFillEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total fill events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing cancel events...'); + const repository = connection.getRepository(ExchangeCancelEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting cancel events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); + console.log('Parsing cancel events...'); + const events = parseExchangeCancelEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total cancel events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise { + console.log('Checking existing CancelUpTo events...'); + const repository = connection.getRepository(ExchangeCancelUpToEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting CancelUpTo events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); + console.log('Parsing CancelUpTo events...'); + const events = parseExchangeCancelUpToEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getStartBlockAsync(repository: Repository): Promise { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing ${repository.metadata.name}s found.`); + return EXCHANGE_START_BLOCK; + } + const queryResult = await connection.query( + // TODO(albrow): Would prefer to use a prepared statement here to reduce + // surface area for SQL injections, but it doesn't appear to be working. + `SELECT block_number FROM raw.${repository.metadata.tableName} ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} + +async function saveEventsAsync( + isInitialPull: boolean, + repository: Repository, + events: T[], +): Promise { + console.log(`Saving ${repository.metadata.name}s...`); + if (isInitialPull) { // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE // each. for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { - await eventsRepository.insert(eventsBatch); + await repository.insert(eventsBatch); } } else { // If we possibly have some overlap where we need to update some // existing events, we need to use our workaround/fallback. - await saveIndividuallyWithFallbackAsync(eventsRepository, events); + await saveIndividuallyWithFallbackAsync(repository, events); } - const totalEvents = await eventsRepository.count(); - console.log(`Done saving events. There are now ${totalEvents} total events.`); + const totalEvents = await repository.count(); + console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); } -async function saveIndividuallyWithFallbackAsync( - eventsRepository: Repository, - events: ExchangeEventEntity[], +async function saveIndividuallyWithFallbackAsync( + repository: Repository, + events: T[], ): Promise { // Note(albrow): This is a temporary hack because `save` is not working as // documented and is causing a foreign key constraint violation. Hopefully @@ -63,32 +110,24 @@ async function saveIndividuallyWithFallbackAsync( // on one event at a time and is therefore much slower. for (const event of events) { try { - // First try and insert. - await eventsRepository.insert(event); + // First try an insert. + await repository.insert(event); } catch { // If it fails, assume it was a foreign key constraint error and try // doing an update instead. - await eventsRepository.update( + // Note(albrow): Unfortunately the `as any` hack here seems + // required. I can't figure out how to convince the type-checker + // that the criteria and the entity itself are the correct type for + // the given repository. If we can remove the `save` hack then this + // will probably no longer be necessary. + await repository.update( { contractAddress: event.contractAddress, blockNumber: event.blockNumber, logIndex: event.logIndex, - }, - event, + } as any, + event as any, ); } } } - -async function getStartBlockAsync(eventsRepository: Repository): Promise { - const fillEventCount = await eventsRepository.count(); - if (fillEventCount === 0) { - console.log('No existing fill events found.'); - return EXCHANGE_START_BLOCK; - } - const queryResult = await connection.query( - 'SELECT block_number FROM raw.exchange_fill_events ORDER BY block_number DESC LIMIT 1', - ); - const lastKnownBlock = queryResult[0].block_number; - return lastKnownBlock - START_BLOCK_OFFSET; -} diff --git a/packages/pipeline/test/parsers/events/index_test.ts b/packages/pipeline/test/parsers/events/index_test.ts index 63e080edc..7e439ce39 100644 --- a/packages/pipeline/test/parsers/events/index_test.ts +++ b/packages/pipeline/test/parsers/events/index_test.ts @@ -5,7 +5,7 @@ import { LogWithDecodedArgs } from 'ethereum-types'; import 'mocha'; import { ExchangeFillEvent } from '../../../src/entities'; -import { _convertToEntity } from '../../../src/parsers/events'; +import { _convertToExchangeFillEvent } from '../../../src/parsers/events'; import { chaiSetup } from '../../utils/chai_setup'; chaiSetup.configure(); @@ -13,7 +13,7 @@ const expect = chai.expect; // tslint:disable:custom-no-magic-numbers describe('exchange_events', () => { - describe('_convertToEntity', () => { + describe('_convertToExchangeFillEvent', () => { it('converts LogWithDecodedArgs to ExchangeFillEvent entity', () => { const input: LogWithDecodedArgs = { logIndex: 102, @@ -71,7 +71,7 @@ describe('exchange_events', () => { expected.takerAssetProxyId = '0xf47261b0'; expected.takerTokenAddress = '0xe41d2489571d322189246dafa5ebde1f4699f498'; expected.takerTokenId = null; - const actual = _convertToEntity(input); + const actual = _convertToExchangeFillEvent(input); expect(actual).deep.equal(expected); }); }); -- cgit v1.2.3