diff options
author | Alex Browne <stephenalexbrowne@gmail.com> | 2018-12-12 07:16:05 +0800 |
---|---|---|
committer | Fred Carlsen <fred@sjelfull.no> | 2018-12-13 01:19:40 +0800 |
commit | 8f911206f7d9475d040e41458f28898604b79e62 (patch) | |
tree | 0eede93454b4ae266be968681fd6295b0629911f /packages/pipeline/src | |
parent | 3496725db659e69df8394510b86d564105acac4e (diff) | |
download | dexon-sol-tools-8f911206f7d9475d040e41458f28898604b79e62.tar dexon-sol-tools-8f911206f7d9475d040e41458f28898604b79e62.tar.gz dexon-sol-tools-8f911206f7d9475d040e41458f28898604b79e62.tar.bz2 dexon-sol-tools-8f911206f7d9475d040e41458f28898604b79e62.tar.lz dexon-sol-tools-8f911206f7d9475d040e41458f28898604b79e62.tar.xz dexon-sol-tools-8f911206f7d9475d040e41458f28898604b79e62.tar.zst dexon-sol-tools-8f911206f7d9475d040e41458f28898604b79e62.zip |
Refactor event scraping and add support for scraping ERC20 approval events (#1401)
* Refactor event scraping and add support for scraping ERC20 approval events
* Add tests for data_sources/contract-wrappers/utils
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r-- | packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts | 45 | ||||
-rw-r--r-- | packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts | 76 | ||||
-rw-r--r-- | packages/pipeline/src/data_sources/contract-wrappers/utils.ts | 67 | ||||
-rw-r--r-- | packages/pipeline/src/entities/erc20_approval_event.ts | 26 | ||||
-rw-r--r-- | packages/pipeline/src/entities/index.ts | 1 | ||||
-rw-r--r-- | packages/pipeline/src/ormconfig.ts | 2 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/events/erc20_events.ts | 34 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/events/exchange_events.ts | 133 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/events/index.ts | 135 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_erc20_events.ts | 66 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_exchange_events.ts (renamed from packages/pipeline/src/scripts/pull_missing_events.ts) | 28 | ||||
-rw-r--r-- | packages/pipeline/src/scripts/pull_missing_blocks.ts | 2 |
12 files changed, 421 insertions, 194 deletions
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts new file mode 100644 index 000000000..e0098122f --- /dev/null +++ b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts @@ -0,0 +1,45 @@ +import { + ContractWrappers, + ERC20TokenApprovalEventArgs, + ERC20TokenEvents, + ERC20TokenWrapper, +} from '@0x/contract-wrappers'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { LogWithDecodedArgs } from 'ethereum-types'; + +import { GetEventsFunc, getEventsWithPaginationAsync } from './utils'; + +export class ERC20EventsSource { + private readonly _erc20Wrapper: ERC20TokenWrapper; + private readonly _tokenAddress: string; + constructor(provider: Web3ProviderEngine, networkId: number, tokenAddress: string) { + const contractWrappers = new ContractWrappers(provider, { networkId }); + this._erc20Wrapper = contractWrappers.erc20Token; + this._tokenAddress = tokenAddress; + } + + public async getApprovalEventsAsync( + startBlock: number, + endBlock: number, + ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> { + return getEventsWithPaginationAsync( + this._getApprovalEventsForRangeAsync.bind(this) as GetEventsFunc<ERC20TokenApprovalEventArgs>, + startBlock, + endBlock, + ); + } + + // Gets all approval events of for a specific sub-range. This getter + // function will be called during each step of pagination. + private async _getApprovalEventsForRangeAsync( + fromBlock: number, + toBlock: number, + ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> { + return this._erc20Wrapper.getLogsAsync<ERC20TokenApprovalEventArgs>( + this._tokenAddress, + ERC20TokenEvents.Approval, + { fromBlock, toBlock }, + {}, + ); + } +} diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts index 1717eb8b3..58691e2ab 100644 --- a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts +++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts @@ -8,78 +8,52 @@ import { ExchangeWrapper, } from '@0x/contract-wrappers'; import { Web3ProviderEngine } from '@0x/subproviders'; -import { Web3Wrapper } from '@0x/web3-wrapper'; import { LogWithDecodedArgs } from 'ethereum-types'; -import { EXCHANGE_START_BLOCK } from '../../utils'; - -const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default toBlock. -const NUM_BLOCKS_PER_QUERY = 20000; // Number of blocks to query for events at a time. +import { GetEventsFunc, getEventsWithPaginationAsync } from './utils'; export class ExchangeEventsSource { private readonly _exchangeWrapper: ExchangeWrapper; - private readonly _web3Wrapper: Web3Wrapper; constructor(provider: Web3ProviderEngine, networkId: number) { - this._web3Wrapper = new Web3Wrapper(provider); const contractWrappers = new ContractWrappers(provider, { networkId }); this._exchangeWrapper = contractWrappers.exchange; } public async getFillEventsAsync( - fromBlock?: number, - toBlock?: number, + startBlock: number, + endBlock: number, ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { - return this._getEventsAsync<ExchangeFillEventArgs>(ExchangeEvents.Fill, fromBlock, toBlock); + const getFillEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeFillEventArgs>(ExchangeEvents.Fill); + return getEventsWithPaginationAsync(getFillEventsForRangeAsync, startBlock, endBlock); } public async getCancelEventsAsync( - fromBlock?: number, - toBlock?: number, + startBlock: number, + endBlock: number, ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> { - return this._getEventsAsync<ExchangeCancelEventArgs>(ExchangeEvents.Cancel, fromBlock, toBlock); + const getCancelEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelEventArgs>( + ExchangeEvents.Cancel, + ); + return getEventsWithPaginationAsync(getCancelEventsForRangeAsync, startBlock, endBlock); } public async getCancelUpToEventsAsync( - fromBlock?: number, - toBlock?: number, + startBlock: number, + endBlock: number, ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> { - return this._getEventsAsync<ExchangeCancelUpToEventArgs>(ExchangeEvents.CancelUpTo, fromBlock, toBlock); - } - - private async _getEventsAsync<ArgsType extends ExchangeEventArgs>( - eventName: ExchangeEvents, - fromBlock: number = EXCHANGE_START_BLOCK, - toBlock?: number, - ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { - const calculatedToBlock = - toBlock === undefined - ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD - : toBlock; - let events: Array<LogWithDecodedArgs<ArgsType>> = []; - for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) { - events = events.concat( - await this._getEventsForRangeAsync<ArgsType>( - eventName, - currFromBlock, - Math.min(currFromBlock + NUM_BLOCKS_PER_QUERY - 1, calculatedToBlock), - ), - ); - } - return events; + const getCancelUpToEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelUpToEventArgs>( + ExchangeEvents.CancelUpTo, + ); + return getEventsWithPaginationAsync(getCancelUpToEventsForRangeAsync, startBlock, endBlock); } - private async _getEventsForRangeAsync<ArgsType extends ExchangeEventArgs>( - eventName: ExchangeEvents, - fromBlock: number, - toBlock: number, - ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { - return this._exchangeWrapper.getLogsAsync<ArgsType>( - eventName, - { - fromBlock, - toBlock, - }, - {}, - ); + // Returns a getter function which gets all events of a specific type for a + // specific sub-range. This getter function will be called during each step + // of pagination. + private _makeGetterFuncForEventType<ArgsType extends ExchangeEventArgs>( + eventType: ExchangeEvents, + ): GetEventsFunc<ArgsType> { + return async (fromBlock: number, toBlock: number) => + this._exchangeWrapper.getLogsAsync<ArgsType>(eventType, { fromBlock, toBlock }, {}); } } diff --git a/packages/pipeline/src/data_sources/contract-wrappers/utils.ts b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts new file mode 100644 index 000000000..67660a37e --- /dev/null +++ b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts @@ -0,0 +1,67 @@ +import { DecodedLogArgs, LogWithDecodedArgs } from 'ethereum-types'; + +const NUM_BLOCKS_PER_QUERY = 10000; // Number of blocks to query for events at a time. +const NUM_RETRIES = 3; // Number of retries if a request fails or times out. + +export type GetEventsFunc<ArgsType extends DecodedLogArgs> = ( + fromBlock: number, + toBlock: number, +) => Promise<Array<LogWithDecodedArgs<ArgsType>>>; + +/** + * Gets all events between the given startBlock and endBlock by querying for + * NUM_BLOCKS_PER_QUERY at a time. Accepts a getter function in order to + * maximize code re-use and allow for getting different types of events for + * different contracts. If the getter function throws with a retryable error, + * it will automatically be retried up to NUM_RETRIES times. + * @param getEventsAsync A getter function which will be called for each step during pagination. + * @param startBlock The start of the entire block range to get events for. + * @param endBlock The end of the entire block range to get events for. + */ +export async function getEventsWithPaginationAsync<ArgsType extends DecodedLogArgs>( + getEventsAsync: GetEventsFunc<ArgsType>, + startBlock: number, + endBlock: number, +): Promise<Array<LogWithDecodedArgs<ArgsType>>> { + let events: Array<LogWithDecodedArgs<ArgsType>> = []; + for (let fromBlock = startBlock; fromBlock <= endBlock; fromBlock += NUM_BLOCKS_PER_QUERY) { + const toBlock = Math.min(fromBlock + NUM_BLOCKS_PER_QUERY - 1, endBlock); + const eventsInRange = await _getEventsWithRetriesAsync(getEventsAsync, NUM_RETRIES, fromBlock, toBlock); + events = events.concat(eventsInRange); + } + return events; +} + +/** + * Calls the getEventsAsync function and retries up to numRetries times if it + * throws with an error that is considered retryable. + * @param getEventsAsync a function that will be called on each iteration. + * @param numRetries the maximum number times to retry getEventsAsync if it fails with a retryable error. + * @param fromBlock the start of the sub-range of blocks we are getting events for. + * @param toBlock the end of the sub-range of blocks we are getting events for. + */ +export async function _getEventsWithRetriesAsync<ArgsType extends DecodedLogArgs>( + getEventsAsync: GetEventsFunc<ArgsType>, + numRetries: number, + fromBlock: number, + toBlock: number, +): Promise<Array<LogWithDecodedArgs<ArgsType>>> { + let eventsInRange: Array<LogWithDecodedArgs<ArgsType>> = []; + for (let i = 0; i <= numRetries; i++) { + try { + eventsInRange = await getEventsAsync(fromBlock, toBlock); + } catch (err) { + if (isErrorRetryable(err) && i < numRetries) { + continue; + } else { + throw err; + } + } + break; + } + return eventsInRange; +} + +function isErrorRetryable(err: Error): boolean { + return err.message.includes('network timeout'); +} diff --git a/packages/pipeline/src/entities/erc20_approval_event.ts b/packages/pipeline/src/entities/erc20_approval_event.ts new file mode 100644 index 000000000..69cdfcb0b --- /dev/null +++ b/packages/pipeline/src/entities/erc20_approval_event.ts @@ -0,0 +1,26 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'erc20_approval_events', schema: 'raw' }) +export class ERC20ApprovalEvent { + @PrimaryColumn({ name: 'token_address' }) + public tokenAddress!: string; + @PrimaryColumn({ name: 'log_index' }) + public logIndex!: number; + @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) + public blockNumber!: number; + + @Column({ name: 'raw_data' }) + public rawData!: string; + + @Column({ name: 'transaction_hash' }) + public transactionHash!: string; + @Column({ name: 'owner_address' }) + public ownerAddress!: string; + @Column({ name: 'spender_address' }) + public spenderAddress!: string; + @Column({ name: 'amount', type: 'numeric', transformer: bigNumberTransformer }) + public amount!: BigNumber; +} diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts index db0814e38..cc3de78bb 100644 --- a/packages/pipeline/src/entities/index.ts +++ b/packages/pipeline/src/entities/index.ts @@ -14,5 +14,6 @@ export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './s export { TokenMetadata } from './token_metadata'; export { TokenOrderbookSnapshot } from './token_order'; export { Transaction } from './transaction'; +export { ERC20ApprovalEvent } from './erc20_approval_event'; export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts index b60703790..27991c430 100644 --- a/packages/pipeline/src/ormconfig.ts +++ b/packages/pipeline/src/ormconfig.ts @@ -3,6 +3,7 @@ import { ConnectionOptions } from 'typeorm'; import { Block, DexTrade, + ERC20ApprovalEvent, ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent, @@ -22,6 +23,7 @@ const entities = [ ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent, + ERC20ApprovalEvent, OHLCVExternal, Relayer, SraOrder, diff --git a/packages/pipeline/src/parsers/events/erc20_events.ts b/packages/pipeline/src/parsers/events/erc20_events.ts new file mode 100644 index 000000000..caf9984d0 --- /dev/null +++ b/packages/pipeline/src/parsers/events/erc20_events.ts @@ -0,0 +1,34 @@ +import { ERC20TokenApprovalEventArgs } from '@0x/contract-wrappers'; +import { LogWithDecodedArgs } from 'ethereum-types'; +import * as R from 'ramda'; + +import { ERC20ApprovalEvent } from '../../entities'; + +/** + * Parses raw event logs for an ERC20 approval event and returns an array of + * ERC20ApprovalEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseERC20ApprovalEvents: ( + eventLogs: Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>, +) => ERC20ApprovalEvent[] = R.map(_convertToERC20ApprovalEvent); + +/** + * Converts a raw event log for an ERC20 approval event into an + * ERC20ApprovalEvent entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToERC20ApprovalEvent( + eventLog: LogWithDecodedArgs<ERC20TokenApprovalEventArgs>, +): ERC20ApprovalEvent { + const erc20ApprovalEvent = new ERC20ApprovalEvent(); + erc20ApprovalEvent.tokenAddress = eventLog.address as string; + erc20ApprovalEvent.blockNumber = eventLog.blockNumber as number; + erc20ApprovalEvent.logIndex = eventLog.logIndex as number; + erc20ApprovalEvent.rawData = eventLog.data as string; + erc20ApprovalEvent.transactionHash = eventLog.transactionHash; + erc20ApprovalEvent.ownerAddress = eventLog.args._owner; + erc20ApprovalEvent.spenderAddress = eventLog.args._spender; + erc20ApprovalEvent.amount = eventLog.args._value; + return erc20ApprovalEvent; +} diff --git a/packages/pipeline/src/parsers/events/exchange_events.ts b/packages/pipeline/src/parsers/events/exchange_events.ts new file mode 100644 index 000000000..e18106c75 --- /dev/null +++ b/packages/pipeline/src/parsers/events/exchange_events.ts @@ -0,0 +1,133 @@ +import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers'; +import { assetDataUtils } from '@0x/order-utils'; +import { AssetProxyId, ERC721AssetData } from '@0x/types'; +import { LogWithDecodedArgs } from 'ethereum-types'; +import * as R from 'ramda'; + +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities'; +import { bigNumbertoStringOrNull } from '../../utils'; + +/** + * Parses raw event logs for a fill event and returns an array of + * ExchangeFillEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeFillEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>, +) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent); + +/** + * Parses raw event logs for a cancel event and returns an array of + * ExchangeCancelEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeCancelEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>, +) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent); + +/** + * Parses raw event logs for a CancelUpTo event and returns an array of + * ExchangeCancelUpToEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeCancelUpToEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>, +) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent); + +/** + * Converts a raw event log for a fill event into an ExchangeFillEvent entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent { + const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); + const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); + const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const exchangeFillEvent = new ExchangeFillEvent(); + exchangeFillEvent.contractAddress = eventLog.address as string; + exchangeFillEvent.blockNumber = eventLog.blockNumber as number; + exchangeFillEvent.logIndex = eventLog.logIndex as number; + exchangeFillEvent.rawData = eventLog.data as string; + exchangeFillEvent.transactionHash = eventLog.transactionHash; + exchangeFillEvent.makerAddress = eventLog.args.makerAddress; + exchangeFillEvent.takerAddress = eventLog.args.takerAddress; + exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; + exchangeFillEvent.senderAddress = eventLog.args.senderAddress; + exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount; + exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount; + exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid; + exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid; + exchangeFillEvent.orderHash = eventLog.args.orderHash; + exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData; + exchangeFillEvent.makerAssetType = makerAssetType; + exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId; + exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress; + // tslint has a false positive here. Type assertion is required. + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); + exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData; + exchangeFillEvent.takerAssetType = takerAssetType; + exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId; + exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); + return exchangeFillEvent; +} + +/** + * Converts a raw event log for a cancel event into an ExchangeCancelEvent + * entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToExchangeCancelEvent( + eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>, +): ExchangeCancelEvent { + const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); + const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); + const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const exchangeCancelEvent = new ExchangeCancelEvent(); + exchangeCancelEvent.contractAddress = eventLog.address as string; + exchangeCancelEvent.blockNumber = eventLog.blockNumber as number; + exchangeCancelEvent.logIndex = eventLog.logIndex as number; + exchangeCancelEvent.rawData = eventLog.data as string; + exchangeCancelEvent.transactionHash = eventLog.transactionHash; + exchangeCancelEvent.makerAddress = eventLog.args.makerAddress; + exchangeCancelEvent.takerAddress = eventLog.args.takerAddress; + exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; + exchangeCancelEvent.senderAddress = eventLog.args.senderAddress; + exchangeCancelEvent.orderHash = eventLog.args.orderHash; + exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData; + exchangeCancelEvent.makerAssetType = makerAssetType; + exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId; + exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); + exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData; + exchangeCancelEvent.takerAssetType = takerAssetType; + exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId; + exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); + return exchangeCancelEvent; +} + +/** + * Converts a raw event log for a cancelUpTo event into an + * ExchangeCancelUpToEvent entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToExchangeCancelUpToEvent( + eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>, +): ExchangeCancelUpToEvent { + const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent(); + exchangeCancelUpToEvent.contractAddress = eventLog.address as string; + exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number; + exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number; + exchangeCancelUpToEvent.rawData = eventLog.data as string; + exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash; + exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress; + exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress; + exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch; + return exchangeCancelUpToEvent; +} diff --git a/packages/pipeline/src/parsers/events/index.ts b/packages/pipeline/src/parsers/events/index.ts index e18106c75..3f9915e8b 100644 --- a/packages/pipeline/src/parsers/events/index.ts +++ b/packages/pipeline/src/parsers/events/index.ts @@ -1,133 +1,2 @@ -import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers'; -import { assetDataUtils } from '@0x/order-utils'; -import { AssetProxyId, ERC721AssetData } from '@0x/types'; -import { LogWithDecodedArgs } from 'ethereum-types'; -import * as R from 'ramda'; - -import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities'; -import { bigNumbertoStringOrNull } from '../../utils'; - -/** - * Parses raw event logs for a fill event and returns an array of - * ExchangeFillEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseExchangeFillEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>, -) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent); - -/** - * Parses raw event logs for a cancel event and returns an array of - * ExchangeCancelEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseExchangeCancelEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>, -) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent); - -/** - * Parses raw event logs for a CancelUpTo event and returns an array of - * ExchangeCancelUpToEvent entities. - * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). - */ -export const parseExchangeCancelUpToEvents: ( - eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>, -) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent); - -/** - * Converts a raw event log for a fill event into an ExchangeFillEvent entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent { - const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); - const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; - const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); - const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; - const exchangeFillEvent = new ExchangeFillEvent(); - exchangeFillEvent.contractAddress = eventLog.address as string; - exchangeFillEvent.blockNumber = eventLog.blockNumber as number; - exchangeFillEvent.logIndex = eventLog.logIndex as number; - exchangeFillEvent.rawData = eventLog.data as string; - exchangeFillEvent.transactionHash = eventLog.transactionHash; - exchangeFillEvent.makerAddress = eventLog.args.makerAddress; - exchangeFillEvent.takerAddress = eventLog.args.takerAddress; - exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; - exchangeFillEvent.senderAddress = eventLog.args.senderAddress; - exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount; - exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount; - exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid; - exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid; - exchangeFillEvent.orderHash = eventLog.args.orderHash; - exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData; - exchangeFillEvent.makerAssetType = makerAssetType; - exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId; - exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress; - // tslint has a false positive here. Type assertion is required. - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); - exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData; - exchangeFillEvent.takerAssetType = takerAssetType; - exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId; - exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); - return exchangeFillEvent; -} - -/** - * Converts a raw event log for a cancel event into an ExchangeCancelEvent - * entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToExchangeCancelEvent( - eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>, -): ExchangeCancelEvent { - const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); - const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; - const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); - const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; - const exchangeCancelEvent = new ExchangeCancelEvent(); - exchangeCancelEvent.contractAddress = eventLog.address as string; - exchangeCancelEvent.blockNumber = eventLog.blockNumber as number; - exchangeCancelEvent.logIndex = eventLog.logIndex as number; - exchangeCancelEvent.rawData = eventLog.data as string; - exchangeCancelEvent.transactionHash = eventLog.transactionHash; - exchangeCancelEvent.makerAddress = eventLog.args.makerAddress; - exchangeCancelEvent.takerAddress = eventLog.args.takerAddress; - exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; - exchangeCancelEvent.senderAddress = eventLog.args.senderAddress; - exchangeCancelEvent.orderHash = eventLog.args.orderHash; - exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData; - exchangeCancelEvent.makerAssetType = makerAssetType; - exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId; - exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); - exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData; - exchangeCancelEvent.takerAssetType = takerAssetType; - exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId; - exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress; - // tslint:disable-next-line:no-unnecessary-type-assertion - exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); - return exchangeCancelEvent; -} - -/** - * Converts a raw event log for a cancelUpTo event into an - * ExchangeCancelUpToEvent entity. - * @param eventLog Raw event log (e.g. returned from contract-wrappers). - */ -export function _convertToExchangeCancelUpToEvent( - eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>, -): ExchangeCancelUpToEvent { - const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent(); - exchangeCancelUpToEvent.contractAddress = eventLog.address as string; - exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number; - exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number; - exchangeCancelUpToEvent.rawData = eventLog.data as string; - exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash; - exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress; - exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress; - exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch; - return exchangeCancelUpToEvent; -} +export { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from './exchange_events'; +export { parseERC20ApprovalEvents } from './erc20_events'; diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts new file mode 100644 index 000000000..0ad12c97a --- /dev/null +++ b/packages/pipeline/src/scripts/pull_erc20_events.ts @@ -0,0 +1,66 @@ +// tslint:disable:no-console +import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses'; +import { web3Factory } from '@0x/dev-utils'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events'; +import { ERC20ApprovalEvent } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseERC20ApprovalEvents } from '../parsers/events'; +import { handleError, INFURA_ROOT_URL } from '../utils'; + +const NETWORK_ID = 1; +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. +const WETH_START_BLOCK = 4719568; // Block number when the WETH contract was deployed. + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: INFURA_ROOT_URL, + }); + const endBlock = await calculateEndBlockAsync(provider); + await getAndSaveWETHApprovalEventsAsync(provider, endBlock); + process.exit(0); +})().catch(handleError); + +async function getAndSaveWETHApprovalEventsAsync(provider: Web3ProviderEngine, endBlock: number): Promise<void> { + console.log('Checking existing approval events...'); + const repository = connection.getRepository(ERC20ApprovalEvent); + const startBlock = (await getStartBlockAsync(repository)) || WETH_START_BLOCK; + + console.log(`Getting WETH approval events starting at ${startBlock}...`); + const wethTokenAddress = getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken; + const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, wethTokenAddress); + const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock); + + console.log(`Parsing ${eventLogs.length} WETH approval events...`); + const events = parseERC20ApprovalEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total WETH approval events.`); + await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); +} + +async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { + const web3Wrapper = new Web3Wrapper(provider); + const currentBlock = await web3Wrapper.getBlockNumberAsync(); + return currentBlock - BLOCK_FINALITY_THRESHOLD; +} + +async function getStartBlockAsync(repository: Repository<ERC20ApprovalEvent>): Promise<number | null> { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing approval events found.`); + return null; + } + const queryResult = await connection.query( + `SELECT block_number FROM raw.erc20_approval_events ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts index 80abbb8b0..e98fc6629 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_exchange_events.ts @@ -1,5 +1,7 @@ // tslint:disable:no-console import { web3Factory } from '@0x/dev-utils'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; import R = require('ramda'); import 'reflect-metadata'; import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; @@ -12,6 +14,7 @@ import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. let connection: Connection; @@ -20,43 +23,44 @@ let connection: Connection; const provider = web3Factory.getRpcProvider({ rpcUrl: INFURA_ROOT_URL, }); + const endBlock = await calculateEndBlockAsync(provider); const eventsSource = new ExchangeEventsSource(provider, 1); - await getFillEventsAsync(eventsSource); - await getCancelEventsAsync(eventsSource); - await getCancelUpToEventsAsync(eventsSource); + await getFillEventsAsync(eventsSource, endBlock); + await getCancelEventsAsync(eventsSource, endBlock); + await getCancelUpToEventsAsync(eventsSource, endBlock); process.exit(0); })().catch(handleError); -async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { +async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { console.log('Checking existing fill events...'); const repository = connection.getRepository(ExchangeFillEvent); const startBlock = await getStartBlockAsync(repository); console.log(`Getting fill events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getFillEventsAsync(startBlock); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock); console.log('Parsing fill events...'); const events = parseExchangeFillEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total fill events.`); await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } -async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { console.log('Checking existing cancel events...'); const repository = connection.getRepository(ExchangeCancelEvent); const startBlock = await getStartBlockAsync(repository); console.log(`Getting cancel events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock); console.log('Parsing cancel events...'); const events = parseExchangeCancelEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total cancel events.`); await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); } -async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> { console.log('Checking existing CancelUpTo events...'); const repository = connection.getRepository(ExchangeCancelUpToEvent); const startBlock = await getStartBlockAsync(repository); console.log(`Getting CancelUpTo events starting at ${startBlock}...`); - const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock); console.log('Parsing CancelUpTo events...'); const events = parseExchangeCancelUpToEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); @@ -134,3 +138,9 @@ async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>( } } } + +async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> { + const web3Wrapper = new Web3Wrapper(provider); + const currentBlock = await web3Wrapper.getBlockNumberAsync(); + return currentBlock - BLOCK_FINALITY_THRESHOLD; +} diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index b7bd51f08..18fe1b700 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -24,7 +24,7 @@ let connection: Connection; (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ - rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`, + rpcUrl: INFURA_ROOT_URL, }); const web3Source = new Web3Source(provider); await getAllMissingBlocks(web3Source); |