diff options
Diffstat (limited to 'packages/pipeline')
-rw-r--r-- | packages/pipeline/package.json | 22 | ||||
-rw-r--r-- | packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts | 51 | ||||
-rw-r--r-- | packages/pipeline/src/data_sources/etherscan/index.ts | 52 | ||||
-rw-r--r-- | packages/pipeline/src/data_types/events/event_utils.ts | 35 | ||||
-rw-r--r-- | packages/pipeline/src/index.ts | 43 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/events/index.ts (renamed from packages/pipeline/src/data_types/events/exchange_events.ts) | 25 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/sra_orders/index.ts (renamed from packages/pipeline/src/data_types/sra_orders/index.ts) | 0 | ||||
-rw-r--r-- | packages/pipeline/test/data_types/events/event_utils_test.ts | 86 | ||||
-rw-r--r-- | packages/pipeline/test/parsers/events/index_test.ts (renamed from packages/pipeline/test/data_types/events/exchange_events_test.ts) | 2 | ||||
-rw-r--r-- | packages/pipeline/test/parsers/sra_orders/index_test.ts (renamed from packages/pipeline/test/data_types/sra_orders/index_test.ts) | 11 |
10 files changed, 85 insertions, 242 deletions
diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json index be23bfe2c..0071fab2c 100644 --- a/packages/pipeline/package.json +++ b/packages/pipeline/package.json @@ -1,5 +1,5 @@ { - "name": "@0xproject/pipeline", + "name": "@0x/pipeline", "version": "0.0.1", "private": true, "description": "Data pipeline for offline analysis", @@ -25,7 +25,8 @@ }, "license": "Apache-2.0", "devDependencies": { - "@0xproject/tslint-config": "^1.0.7", + "@types/ramda": "^0.25.38", + "@0x/tslint-config": "^1.0.9", "chai": "^4.1.2", "chai-as-promised": "^7.1.1", "chai-bignumber": "^2.0.2", @@ -35,14 +36,15 @@ "typescript": "3.0.1" }, "dependencies": { - "@0xproject/contract-artifacts": "^1.0.0", - "@0xproject/connect": "^2.0.4", - "@0xproject/contract-wrappers": "^1.0.1", - "@0xproject/order-utils": "^1.0.2", - "@0xproject/subproviders": "^2.0.2", - "@0xproject/types": "^1.0.1", - "@0xproject/utils": "^1.0.8", - "@types/ramda": "^0.25.38", + "@0x/dev-utils": "^1.0.13", + "@0x/contract-artifacts": "^1.0.1", + "@0x/connect": "^3.0.2", + "@0x/contract-wrappers": "^3.0.0", + "@0x/order-utils": "^2.0.0", + "@0x/subproviders": "^2.1.0", + "@0x/types": "^1.2.0", + "@0x/utils": "^2.0.3", + "@0x/web3-wrapper": "^3.1.0", "axios": "^0.18.0", "ethereum-types": "^1.0.6", "ramda": "^0.25.0", diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts new file mode 100644 index 000000000..77217c601 --- /dev/null +++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts @@ -0,0 +1,51 @@ +import { ContractWrappers, ExchangeEvents, ExchangeFillEventArgs, ExchangeWrapper } from '@0xproject/contract-wrappers'; +import { Web3ProviderEngine } from '@0xproject/subproviders'; +import { Web3Wrapper } from '@0xproject/web3-wrapper'; +import { LogWithDecodedArgs } from 'ethereum-types'; + +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default toBlock. +const NUM_BLOCKS_PER_QUERY = 100000; // Number of blocks to query for events at a time. +const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. + +export class ExchangeEventsSource { + private _exchangeWrapper: ExchangeWrapper; + private _web3Wrapper: Web3Wrapper; + constructor(provider: Web3ProviderEngine, networkId: number) { + this._web3Wrapper = new Web3Wrapper(provider); + const contractWrappers = new ContractWrappers(provider, { networkId }); + this._exchangeWrapper = contractWrappers.exchange; + } + + // TODO(albrow): Get Cancel and CancelUpTo events. + + public async getFillEventsAsync( + fromBlock: number = EXCHANGE_START_BLOCK, + toBlock?: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { + const calculatedToBlock = + toBlock === undefined + ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD + : toBlock; + let events: Array<LogWithDecodedArgs<ExchangeFillEventArgs>> = []; + for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) { + events = events.concat( + await this._getFillEventsForRangeAsync(currFromBlock, currFromBlock + NUM_BLOCKS_PER_QUERY - 1), + ); + } + return events; + } + + private async _getFillEventsForRangeAsync( + fromBlock: number, + toBlock: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { + return this._exchangeWrapper.getLogsAsync<ExchangeFillEventArgs>( + ExchangeEvents.Fill, + { + fromBlock, + toBlock, + }, + {}, + ); + } +} diff --git a/packages/pipeline/src/data_sources/etherscan/index.ts b/packages/pipeline/src/data_sources/etherscan/index.ts deleted file mode 100644 index 044fff02e..000000000 --- a/packages/pipeline/src/data_sources/etherscan/index.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { default as axios } from 'axios'; -import { BlockParam, BlockParamLiteral } from 'ethereum-types'; - -const ETHERSCAN_URL = 'https://api.etherscan.io/api'; - -export class Etherscan { - private readonly _apiKey: string; - constructor(apiKey: string) { - this._apiKey = apiKey; - } - - /** - * Gets the raw events for a specific contract and block range. - * @param contractAddress The address of the contract to get the events for. - * @param fromBlock The start of the block range to get events for (inclusive). - * @param toBlock The end of the block range to get events for (inclusive). - * @returns A list of decoded events. - */ - public async getContractEventsAsync( - contractAddress: string, - fromBlock: BlockParam = BlockParamLiteral.Earliest, - toBlock: BlockParam = BlockParamLiteral.Latest, - ): Promise<EventsResponse> { - const fullURL = `${ETHERSCAN_URL}?module=logs&action=getLogs&address=${contractAddress}&fromBlock=${fromBlock}&toBlock=${toBlock}&apikey=${ - this._apiKey - }`; - const resp = await axios.get<EventsResponse>(fullURL); - // TODO(albrow): Check response code. - return resp.data; - } -} - -// Raw events response from etherescan.io -export interface EventsResponse { - status: string; - message: string; - result: EventsResponseResult[]; -} - -// Events as represented in the response from etherscan.io -export interface EventsResponseResult { - address: string; - topics: string[]; - data: string; - blockNumber: string; - timeStamp: string; - gasPrice: string; - gasUsed: string; - logIndex: string; - transactionHash: string; - transactionIndex: string; -} diff --git a/packages/pipeline/src/data_types/events/event_utils.ts b/packages/pipeline/src/data_types/events/event_utils.ts deleted file mode 100644 index 6be964807..000000000 --- a/packages/pipeline/src/data_types/events/event_utils.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { AbiDecoder } from '@0xproject/utils'; -import { AbiDefinition, LogEntry, LogWithDecodedArgs } from 'ethereum-types'; - -import { EventsResponseResult } from '../../data_sources/etherscan'; - -const hexRadix = 16; - -function hexToInt(hex: string): number { - return parseInt(hex.replace('0x', ''), hexRadix); -} - -// Converts a raw event response to a LogEntry -export function convertResponseToLogEntry(result: EventsResponseResult): LogEntry { - return { - logIndex: hexToInt(result.logIndex), - transactionIndex: hexToInt(result.transactionIndex), - transactionHash: result.transactionHash, - blockHash: '', - blockNumber: hexToInt(result.blockNumber), - address: result.address, - data: result.data, - topics: result.topics, - }; -} - -// Decodes a LogEntry into a LogWithDecodedArgs -export function decodeLogEntry<EventArgsType>( - contractAbi: AbiDefinition[], - log: LogEntry, -): LogWithDecodedArgs<EventArgsType> { - const abiDecoder = new AbiDecoder([contractAbi]); - const logWithDecodedArgs = abiDecoder.tryToDecodeLogOrNoop<EventArgsType>(log); - // tslint:disable-next-line:no-unnecessary-type-assertion - return logWithDecodedArgs as LogWithDecodedArgs<EventArgsType>; -} diff --git a/packages/pipeline/src/index.ts b/packages/pipeline/src/index.ts index a1dbb35ff..77c92cc34 100644 --- a/packages/pipeline/src/index.ts +++ b/packages/pipeline/src/index.ts @@ -1,52 +1,43 @@ import { HttpClient } from '@0xproject/connect'; +import { web3Factory } from '@0xproject/dev-utils'; import 'reflect-metadata'; import { Connection, createConnection } from 'typeorm'; -import { Etherscan } from './data_sources/etherscan'; -import { parseExchangeEvents } from './data_types/events/exchange_events'; -import { parseSraOrders } from './data_types/sra_orders'; -import { ExchangeCancelEvent } from './entities/ExchangeCancelEvent'; -import { ExchangeCancelUpToEvent } from './entities/ExchangeCancelUpToEvent'; -import { ExchangeFillEvent } from './entities/ExchangeFillEvent'; +import { ExchangeEventsSource } from './data_sources/contract-wrappers/exchange_events'; import { SraOrder } from './entities/SraOrder'; import { config } from './ormconfig'; - -const etherscan = new Etherscan(process.env.ETHERSCAN_API_KEY as string); -const EXCHANGE_ADDRESS = '0x4f833a24e1f95d70f028921e27040ca56e09ab0b'; +import { parseExchangeEvents } from './parsers/events'; +import { parseSraOrders } from './parsers/sra_orders'; let connection: Connection; (async () => { connection = await createConnection(config); await getExchangeEventsAsync(); - await getSraOrdersAsync(); + // await getSraOrdersAsync(); })(); +// TODO(albrow): Separately: Errors do not appear to be handled correctly. If you use the +// wrong rpcUrl it just returns early with no error. async function getExchangeEventsAsync(): Promise<void> { - const fillRepository = connection.getRepository(ExchangeFillEvent); - const cancelRepository = connection.getRepository(ExchangeCancelEvent); - const cancelUpToRepository = connection.getRepository(ExchangeCancelUpToEvent); - console.log( - `found ${(await fillRepository.count()) + - (await cancelRepository.count()) + - (await cancelUpToRepository.count())} existing events`, - ); - const rawEvents = await etherscan.getContractEventsAsync(EXCHANGE_ADDRESS); - const events = parseExchangeEvents(rawEvents); + const provider = web3Factory.getRpcProvider({ + rpcUrl: 'https://mainnet.infura.io', + }); + const exchangeEvents = new ExchangeEventsSource(provider, 1); + const eventLogs = await exchangeEvents.getFillEventsAsync(); + const events = parseExchangeEvents(eventLogs); + console.log('Got events: ' + events.length); for (const event of events) { await event.save(); } - console.log( - `now there are ${(await fillRepository.count()) + - (await cancelRepository.count()) + - (await cancelUpToRepository.count())} total events`, - ); + console.log('Saved events.'); + console.log('Exiting process'); + process.exit(0); } async function getSraOrdersAsync(): Promise<void> { const orderRepository = connection.getRepository(SraOrder); console.log(`found ${await orderRepository.count()} existing orders`); - const sraUrl = 'https://api.radarrelay.com/0x/v2'; const connect = new HttpClient(sraUrl); const rawOrders = await connect.getOrdersAsync(); diff --git a/packages/pipeline/src/data_types/events/exchange_events.ts b/packages/pipeline/src/parsers/events/index.ts index 30ef058f3..66f382dda 100644 --- a/packages/pipeline/src/data_types/events/exchange_events.ts +++ b/packages/pipeline/src/parsers/events/index.ts @@ -1,4 +1,3 @@ -import { Exchange } from '@0xproject/contract-artifacts'; import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, @@ -10,34 +9,16 @@ import { AssetProxyId, ERC721AssetData } from '@0xproject/types'; import { LogWithDecodedArgs } from 'ethereum-types'; import * as R from 'ramda'; -import { EventsResponse } from '../../data_sources/etherscan'; import { ExchangeCancelEvent } from '../../entities/ExchangeCancelEvent'; import { ExchangeCancelUpToEvent } from '../../entities/ExchangeCancelUpToEvent'; import { ExchangeFillEvent } from '../../entities/ExchangeFillEvent'; import { bigNumbertoStringOrNull } from '../../utils'; -import { convertResponseToLogEntry, decodeLogEntry } from './event_utils'; - export type ExchangeEventEntity = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; -export function parseExchangeEvents(rawEventsResponse: EventsResponse): ExchangeEventEntity[] { - const logEntries = R.map(convertResponseToLogEntry, rawEventsResponse.result); - const decodedLogEntries = R.map( - eventResponse => decodeLogEntry<ExchangeEventArgs>(Exchange.compilerOutput.abi, eventResponse), - logEntries, - ); - const filteredLogEntries = R.filter(shouldIncludeLogEntry, decodedLogEntries); - return R.map(_convertToEntity, filteredLogEntries); -} - -export function shouldIncludeLogEntry(logEntry: LogWithDecodedArgs<ExchangeEventArgs>): boolean { - if (!R.contains(logEntry.event, ['Fill', 'Cancel', 'CancelUpTo'])) { - return false; - } else if (logEntry.logIndex == null || isNaN(logEntry.logIndex)) { - return false; - } - return true; -} +export const parseExchangeEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeEventArgs>>, +) => ExchangeEventEntity[] = R.map(_convertToEntity); export function _convertToEntity(eventLog: LogWithDecodedArgs<ExchangeEventArgs>): ExchangeEventEntity { switch (eventLog.event) { diff --git a/packages/pipeline/src/data_types/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts index fb2b74dfe..fb2b74dfe 100644 --- a/packages/pipeline/src/data_types/sra_orders/index.ts +++ b/packages/pipeline/src/parsers/sra_orders/index.ts diff --git a/packages/pipeline/test/data_types/events/event_utils_test.ts b/packages/pipeline/test/data_types/events/event_utils_test.ts deleted file mode 100644 index 731819106..000000000 --- a/packages/pipeline/test/data_types/events/event_utils_test.ts +++ /dev/null @@ -1,86 +0,0 @@ -import { Exchange } from '@0xproject/contract-artifacts'; -import { BigNumber } from '@0xproject/utils'; -import * as chai from 'chai'; -import { DecodedLogArgs, LogEntry, LogWithDecodedArgs } from 'ethereum-types'; -import 'mocha'; - -import { EventsResponseResult } from '../../../src/data_sources/etherscan'; -import { convertResponseToLogEntry, decodeLogEntry } from '../../../src/data_types/events/event_utils'; -import { chaiSetup } from '../../utils/chai_setup'; - -chaiSetup.configure(); -const expect = chai.expect; - -describe('event_utils', () => { - describe('convertResponseToLogEntry', () => { - it('converts EventsResponseResult to LogEntry', () => { - const input: EventsResponseResult = { - address: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b', - topics: [ - '0x82af639571738f4ebd4268fb0363d8957ebe1bbb9e78dba5ebd69eed39b154f0', - '0x00000000000000000000000067032ef7be8fa07c4335d0134099db0f3875e930', - '0x0000000000000000000000000000000000000000000000000000000000000000', - ], - data: '0x00000000000000000000000000000000000000000000000000000165f2d3f94d', - blockNumber: '0x61127b', - timeStamp: '0x5ba2878e', - gasPrice: '0x1a13b8600', - gasUsed: '0xd9dc', - logIndex: '0x63', - transactionHash: '0xa3f71931ddab6e758b9d1755b2715b376759f49f23fff60755f7e073367d61b5', - transactionIndex: '0x35', - }; - const expected: LogEntry = { - logIndex: 99, - transactionIndex: 53, - transactionHash: input.transactionHash, - blockHash: '', - blockNumber: 6361723, - address: input.address, - data: input.data, - topics: input.topics, - }; - const actual = convertResponseToLogEntry(input); - expect(actual).deep.equal(expected); - }); - }); - describe('decodeLogEntry', () => { - it('decodes LogEntry into LogWithDecodedArgs', () => { - const input: LogEntry = { - logIndex: 96, - transactionIndex: 52, - transactionHash: '0x02b59043e9b38b430c8c66abe67ab4a9e5509def8f8552b54231e88db1839831', - blockHash: '', - blockNumber: 6361723, - address: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b', - data: - '0x00000000000000000000000067032ef7be8fa07c4335d0134099db0f3875e93000000000000000000000000067032ef7be8fa07c4335d0134099db0f3875e930000000000000000000000000000000000000000000000000000000174876e8000000000000000000000000000000000000000000000000000000000013ab668000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000000000000000000000000000000000000', - topics: [ - '0x0bcc4c97732e47d9946f229edb95f5b6323f601300e4690de719993f3c371129', - '0x0000000000000000000000003f7f832abb3be28442c0e48b7222e02b322c78f3', - '0x000000000000000000000000a258b39954cef5cb142fd567a46cddb31a670124', - '0x523404b4e6f847d9aefcf5be024be396449b4635590291fd7a28a8c940843858', - ], - }; - const expected: LogWithDecodedArgs<DecodedLogArgs> = { - ...input, - event: 'Fill', - args: { - makerAddress: '0x3f7f832abb3be28442c0e48b7222e02b322c78f3', - feeRecipientAddress: '0xa258b39954cef5cb142fd567a46cddb31a670124', - takerAddress: '0x67032ef7be8fa07c4335d0134099db0f3875e930', - senderAddress: '0x67032ef7be8fa07c4335d0134099db0f3875e930', - makerAssetFilledAmount: new BigNumber('100000000000'), - takerAssetFilledAmount: new BigNumber('330000000'), - makerFeePaid: new BigNumber('0'), - takerFeePaid: new BigNumber('0'), - orderHash: '0x523404b4e6f847d9aefcf5be024be396449b4635590291fd7a28a8c940843858', - makerAssetData: '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498', - takerAssetData: '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2', - }, - }; - const actual = decodeLogEntry(Exchange.compilerOutput.abi, input); - expect(actual).deep.equal(expected); - }); - }); -}); diff --git a/packages/pipeline/test/data_types/events/exchange_events_test.ts b/packages/pipeline/test/parsers/events/index_test.ts index f1432892d..2a2db1a94 100644 --- a/packages/pipeline/test/data_types/events/exchange_events_test.ts +++ b/packages/pipeline/test/parsers/events/index_test.ts @@ -4,8 +4,8 @@ import * as chai from 'chai'; import { LogWithDecodedArgs } from 'ethereum-types'; import 'mocha'; -import { _convertToEntity } from '../../../src/data_types/events/exchange_events'; import { ExchangeFillEvent } from '../../../src/entities/ExchangeFillEvent'; +import { _convertToEntity } from '../../../src/parsers/events'; import { chaiSetup } from '../../utils/chai_setup'; chaiSetup.configure(); diff --git a/packages/pipeline/test/data_types/sra_orders/index_test.ts b/packages/pipeline/test/parsers/sra_orders/index_test.ts index 174f89b4f..952a6f3c6 100644 --- a/packages/pipeline/test/data_types/sra_orders/index_test.ts +++ b/packages/pipeline/test/parsers/sra_orders/index_test.ts @@ -2,26 +2,17 @@ import { APIOrder } from '@0xproject/types'; import { BigNumber } from '@0xproject/utils'; import * as chai from 'chai'; import 'mocha'; -import { Connection, createConnection } from 'typeorm'; -import { _convertToEntity } from '../../../src/data_types/sra_orders'; import { SraOrder } from '../../../src/entities/SraOrder'; +import { _convertToEntity } from '../../../src/parsers/sra_orders'; import { chaiSetup } from '../../utils/chai_setup'; -import { config } from '../../../src/ormconfig'; - chaiSetup.configure(); const expect = chai.expect; // tslint:disable:custom-no-magic-numbers describe('sra_orders', () => { describe('_convertToEntity', () => { - before(async () => { - // HACK(albrow): We don't actually use this connection but it seems - // to be required because chai calls the inspect method of the - // entity and that method requires a "default" connection. - await createConnection(config); - }); it('converts ApiOrder to SraOrder entity', () => { const input: APIOrder = { order: { |