aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline')
-rw-r--r--packages/pipeline/package.json22
-rw-r--r--packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts51
-rw-r--r--packages/pipeline/src/data_sources/etherscan/index.ts52
-rw-r--r--packages/pipeline/src/data_types/events/event_utils.ts35
-rw-r--r--packages/pipeline/src/index.ts43
-rw-r--r--packages/pipeline/src/parsers/events/index.ts (renamed from packages/pipeline/src/data_types/events/exchange_events.ts)25
-rw-r--r--packages/pipeline/src/parsers/sra_orders/index.ts (renamed from packages/pipeline/src/data_types/sra_orders/index.ts)0
-rw-r--r--packages/pipeline/test/data_types/events/event_utils_test.ts86
-rw-r--r--packages/pipeline/test/parsers/events/index_test.ts (renamed from packages/pipeline/test/data_types/events/exchange_events_test.ts)2
-rw-r--r--packages/pipeline/test/parsers/sra_orders/index_test.ts (renamed from packages/pipeline/test/data_types/sra_orders/index_test.ts)11
10 files changed, 85 insertions, 242 deletions
diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json
index be23bfe2c..0071fab2c 100644
--- a/packages/pipeline/package.json
+++ b/packages/pipeline/package.json
@@ -1,5 +1,5 @@
{
- "name": "@0xproject/pipeline",
+ "name": "@0x/pipeline",
"version": "0.0.1",
"private": true,
"description": "Data pipeline for offline analysis",
@@ -25,7 +25,8 @@
},
"license": "Apache-2.0",
"devDependencies": {
- "@0xproject/tslint-config": "^1.0.7",
+ "@types/ramda": "^0.25.38",
+ "@0x/tslint-config": "^1.0.9",
"chai": "^4.1.2",
"chai-as-promised": "^7.1.1",
"chai-bignumber": "^2.0.2",
@@ -35,14 +36,15 @@
"typescript": "3.0.1"
},
"dependencies": {
- "@0xproject/contract-artifacts": "^1.0.0",
- "@0xproject/connect": "^2.0.4",
- "@0xproject/contract-wrappers": "^1.0.1",
- "@0xproject/order-utils": "^1.0.2",
- "@0xproject/subproviders": "^2.0.2",
- "@0xproject/types": "^1.0.1",
- "@0xproject/utils": "^1.0.8",
- "@types/ramda": "^0.25.38",
+ "@0x/dev-utils": "^1.0.13",
+ "@0x/contract-artifacts": "^1.0.1",
+ "@0x/connect": "^3.0.2",
+ "@0x/contract-wrappers": "^3.0.0",
+ "@0x/order-utils": "^2.0.0",
+ "@0x/subproviders": "^2.1.0",
+ "@0x/types": "^1.2.0",
+ "@0x/utils": "^2.0.3",
+ "@0x/web3-wrapper": "^3.1.0",
"axios": "^0.18.0",
"ethereum-types": "^1.0.6",
"ramda": "^0.25.0",
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
new file mode 100644
index 000000000..77217c601
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
@@ -0,0 +1,51 @@
+import { ContractWrappers, ExchangeEvents, ExchangeFillEventArgs, ExchangeWrapper } from '@0xproject/contract-wrappers';
+import { Web3ProviderEngine } from '@0xproject/subproviders';
+import { Web3Wrapper } from '@0xproject/web3-wrapper';
+import { LogWithDecodedArgs } from 'ethereum-types';
+
+const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default toBlock.
+const NUM_BLOCKS_PER_QUERY = 100000; // Number of blocks to query for events at a time.
+const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet.
+
+export class ExchangeEventsSource {
+ private _exchangeWrapper: ExchangeWrapper;
+ private _web3Wrapper: Web3Wrapper;
+ constructor(provider: Web3ProviderEngine, networkId: number) {
+ this._web3Wrapper = new Web3Wrapper(provider);
+ const contractWrappers = new ContractWrappers(provider, { networkId });
+ this._exchangeWrapper = contractWrappers.exchange;
+ }
+
+ // TODO(albrow): Get Cancel and CancelUpTo events.
+
+ public async getFillEventsAsync(
+ fromBlock: number = EXCHANGE_START_BLOCK,
+ toBlock?: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> {
+ const calculatedToBlock =
+ toBlock === undefined
+ ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD
+ : toBlock;
+ let events: Array<LogWithDecodedArgs<ExchangeFillEventArgs>> = [];
+ for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) {
+ events = events.concat(
+ await this._getFillEventsForRangeAsync(currFromBlock, currFromBlock + NUM_BLOCKS_PER_QUERY - 1),
+ );
+ }
+ return events;
+ }
+
+ private async _getFillEventsForRangeAsync(
+ fromBlock: number,
+ toBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> {
+ return this._exchangeWrapper.getLogsAsync<ExchangeFillEventArgs>(
+ ExchangeEvents.Fill,
+ {
+ fromBlock,
+ toBlock,
+ },
+ {},
+ );
+ }
+}
diff --git a/packages/pipeline/src/data_sources/etherscan/index.ts b/packages/pipeline/src/data_sources/etherscan/index.ts
deleted file mode 100644
index 044fff02e..000000000
--- a/packages/pipeline/src/data_sources/etherscan/index.ts
+++ /dev/null
@@ -1,52 +0,0 @@
-import { default as axios } from 'axios';
-import { BlockParam, BlockParamLiteral } from 'ethereum-types';
-
-const ETHERSCAN_URL = 'https://api.etherscan.io/api';
-
-export class Etherscan {
- private readonly _apiKey: string;
- constructor(apiKey: string) {
- this._apiKey = apiKey;
- }
-
- /**
- * Gets the raw events for a specific contract and block range.
- * @param contractAddress The address of the contract to get the events for.
- * @param fromBlock The start of the block range to get events for (inclusive).
- * @param toBlock The end of the block range to get events for (inclusive).
- * @returns A list of decoded events.
- */
- public async getContractEventsAsync(
- contractAddress: string,
- fromBlock: BlockParam = BlockParamLiteral.Earliest,
- toBlock: BlockParam = BlockParamLiteral.Latest,
- ): Promise<EventsResponse> {
- const fullURL = `${ETHERSCAN_URL}?module=logs&action=getLogs&address=${contractAddress}&fromBlock=${fromBlock}&toBlock=${toBlock}&apikey=${
- this._apiKey
- }`;
- const resp = await axios.get<EventsResponse>(fullURL);
- // TODO(albrow): Check response code.
- return resp.data;
- }
-}
-
-// Raw events response from etherescan.io
-export interface EventsResponse {
- status: string;
- message: string;
- result: EventsResponseResult[];
-}
-
-// Events as represented in the response from etherscan.io
-export interface EventsResponseResult {
- address: string;
- topics: string[];
- data: string;
- blockNumber: string;
- timeStamp: string;
- gasPrice: string;
- gasUsed: string;
- logIndex: string;
- transactionHash: string;
- transactionIndex: string;
-}
diff --git a/packages/pipeline/src/data_types/events/event_utils.ts b/packages/pipeline/src/data_types/events/event_utils.ts
deleted file mode 100644
index 6be964807..000000000
--- a/packages/pipeline/src/data_types/events/event_utils.ts
+++ /dev/null
@@ -1,35 +0,0 @@
-import { AbiDecoder } from '@0xproject/utils';
-import { AbiDefinition, LogEntry, LogWithDecodedArgs } from 'ethereum-types';
-
-import { EventsResponseResult } from '../../data_sources/etherscan';
-
-const hexRadix = 16;
-
-function hexToInt(hex: string): number {
- return parseInt(hex.replace('0x', ''), hexRadix);
-}
-
-// Converts a raw event response to a LogEntry
-export function convertResponseToLogEntry(result: EventsResponseResult): LogEntry {
- return {
- logIndex: hexToInt(result.logIndex),
- transactionIndex: hexToInt(result.transactionIndex),
- transactionHash: result.transactionHash,
- blockHash: '',
- blockNumber: hexToInt(result.blockNumber),
- address: result.address,
- data: result.data,
- topics: result.topics,
- };
-}
-
-// Decodes a LogEntry into a LogWithDecodedArgs
-export function decodeLogEntry<EventArgsType>(
- contractAbi: AbiDefinition[],
- log: LogEntry,
-): LogWithDecodedArgs<EventArgsType> {
- const abiDecoder = new AbiDecoder([contractAbi]);
- const logWithDecodedArgs = abiDecoder.tryToDecodeLogOrNoop<EventArgsType>(log);
- // tslint:disable-next-line:no-unnecessary-type-assertion
- return logWithDecodedArgs as LogWithDecodedArgs<EventArgsType>;
-}
diff --git a/packages/pipeline/src/index.ts b/packages/pipeline/src/index.ts
index a1dbb35ff..77c92cc34 100644
--- a/packages/pipeline/src/index.ts
+++ b/packages/pipeline/src/index.ts
@@ -1,52 +1,43 @@
import { HttpClient } from '@0xproject/connect';
+import { web3Factory } from '@0xproject/dev-utils';
import 'reflect-metadata';
import { Connection, createConnection } from 'typeorm';
-import { Etherscan } from './data_sources/etherscan';
-import { parseExchangeEvents } from './data_types/events/exchange_events';
-import { parseSraOrders } from './data_types/sra_orders';
-import { ExchangeCancelEvent } from './entities/ExchangeCancelEvent';
-import { ExchangeCancelUpToEvent } from './entities/ExchangeCancelUpToEvent';
-import { ExchangeFillEvent } from './entities/ExchangeFillEvent';
+import { ExchangeEventsSource } from './data_sources/contract-wrappers/exchange_events';
import { SraOrder } from './entities/SraOrder';
import { config } from './ormconfig';
-
-const etherscan = new Etherscan(process.env.ETHERSCAN_API_KEY as string);
-const EXCHANGE_ADDRESS = '0x4f833a24e1f95d70f028921e27040ca56e09ab0b';
+import { parseExchangeEvents } from './parsers/events';
+import { parseSraOrders } from './parsers/sra_orders';
let connection: Connection;
(async () => {
connection = await createConnection(config);
await getExchangeEventsAsync();
- await getSraOrdersAsync();
+ // await getSraOrdersAsync();
})();
+// TODO(albrow): Separately: Errors do not appear to be handled correctly. If you use the
+// wrong rpcUrl it just returns early with no error.
async function getExchangeEventsAsync(): Promise<void> {
- const fillRepository = connection.getRepository(ExchangeFillEvent);
- const cancelRepository = connection.getRepository(ExchangeCancelEvent);
- const cancelUpToRepository = connection.getRepository(ExchangeCancelUpToEvent);
- console.log(
- `found ${(await fillRepository.count()) +
- (await cancelRepository.count()) +
- (await cancelUpToRepository.count())} existing events`,
- );
- const rawEvents = await etherscan.getContractEventsAsync(EXCHANGE_ADDRESS);
- const events = parseExchangeEvents(rawEvents);
+ const provider = web3Factory.getRpcProvider({
+ rpcUrl: 'https://mainnet.infura.io',
+ });
+ const exchangeEvents = new ExchangeEventsSource(provider, 1);
+ const eventLogs = await exchangeEvents.getFillEventsAsync();
+ const events = parseExchangeEvents(eventLogs);
+ console.log('Got events: ' + events.length);
for (const event of events) {
await event.save();
}
- console.log(
- `now there are ${(await fillRepository.count()) +
- (await cancelRepository.count()) +
- (await cancelUpToRepository.count())} total events`,
- );
+ console.log('Saved events.');
+ console.log('Exiting process');
+ process.exit(0);
}
async function getSraOrdersAsync(): Promise<void> {
const orderRepository = connection.getRepository(SraOrder);
console.log(`found ${await orderRepository.count()} existing orders`);
-
const sraUrl = 'https://api.radarrelay.com/0x/v2';
const connect = new HttpClient(sraUrl);
const rawOrders = await connect.getOrdersAsync();
diff --git a/packages/pipeline/src/data_types/events/exchange_events.ts b/packages/pipeline/src/parsers/events/index.ts
index 30ef058f3..66f382dda 100644
--- a/packages/pipeline/src/data_types/events/exchange_events.ts
+++ b/packages/pipeline/src/parsers/events/index.ts
@@ -1,4 +1,3 @@
-import { Exchange } from '@0xproject/contract-artifacts';
import {
ExchangeCancelEventArgs,
ExchangeCancelUpToEventArgs,
@@ -10,34 +9,16 @@ import { AssetProxyId, ERC721AssetData } from '@0xproject/types';
import { LogWithDecodedArgs } from 'ethereum-types';
import * as R from 'ramda';
-import { EventsResponse } from '../../data_sources/etherscan';
import { ExchangeCancelEvent } from '../../entities/ExchangeCancelEvent';
import { ExchangeCancelUpToEvent } from '../../entities/ExchangeCancelUpToEvent';
import { ExchangeFillEvent } from '../../entities/ExchangeFillEvent';
import { bigNumbertoStringOrNull } from '../../utils';
-import { convertResponseToLogEntry, decodeLogEntry } from './event_utils';
-
export type ExchangeEventEntity = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent;
-export function parseExchangeEvents(rawEventsResponse: EventsResponse): ExchangeEventEntity[] {
- const logEntries = R.map(convertResponseToLogEntry, rawEventsResponse.result);
- const decodedLogEntries = R.map(
- eventResponse => decodeLogEntry<ExchangeEventArgs>(Exchange.compilerOutput.abi, eventResponse),
- logEntries,
- );
- const filteredLogEntries = R.filter(shouldIncludeLogEntry, decodedLogEntries);
- return R.map(_convertToEntity, filteredLogEntries);
-}
-
-export function shouldIncludeLogEntry(logEntry: LogWithDecodedArgs<ExchangeEventArgs>): boolean {
- if (!R.contains(logEntry.event, ['Fill', 'Cancel', 'CancelUpTo'])) {
- return false;
- } else if (logEntry.logIndex == null || isNaN(logEntry.logIndex)) {
- return false;
- }
- return true;
-}
+export const parseExchangeEvents: (
+ eventLogs: Array<LogWithDecodedArgs<ExchangeEventArgs>>,
+) => ExchangeEventEntity[] = R.map(_convertToEntity);
export function _convertToEntity(eventLog: LogWithDecodedArgs<ExchangeEventArgs>): ExchangeEventEntity {
switch (eventLog.event) {
diff --git a/packages/pipeline/src/data_types/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts
index fb2b74dfe..fb2b74dfe 100644
--- a/packages/pipeline/src/data_types/sra_orders/index.ts
+++ b/packages/pipeline/src/parsers/sra_orders/index.ts
diff --git a/packages/pipeline/test/data_types/events/event_utils_test.ts b/packages/pipeline/test/data_types/events/event_utils_test.ts
deleted file mode 100644
index 731819106..000000000
--- a/packages/pipeline/test/data_types/events/event_utils_test.ts
+++ /dev/null
@@ -1,86 +0,0 @@
-import { Exchange } from '@0xproject/contract-artifacts';
-import { BigNumber } from '@0xproject/utils';
-import * as chai from 'chai';
-import { DecodedLogArgs, LogEntry, LogWithDecodedArgs } from 'ethereum-types';
-import 'mocha';
-
-import { EventsResponseResult } from '../../../src/data_sources/etherscan';
-import { convertResponseToLogEntry, decodeLogEntry } from '../../../src/data_types/events/event_utils';
-import { chaiSetup } from '../../utils/chai_setup';
-
-chaiSetup.configure();
-const expect = chai.expect;
-
-describe('event_utils', () => {
- describe('convertResponseToLogEntry', () => {
- it('converts EventsResponseResult to LogEntry', () => {
- const input: EventsResponseResult = {
- address: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b',
- topics: [
- '0x82af639571738f4ebd4268fb0363d8957ebe1bbb9e78dba5ebd69eed39b154f0',
- '0x00000000000000000000000067032ef7be8fa07c4335d0134099db0f3875e930',
- '0x0000000000000000000000000000000000000000000000000000000000000000',
- ],
- data: '0x00000000000000000000000000000000000000000000000000000165f2d3f94d',
- blockNumber: '0x61127b',
- timeStamp: '0x5ba2878e',
- gasPrice: '0x1a13b8600',
- gasUsed: '0xd9dc',
- logIndex: '0x63',
- transactionHash: '0xa3f71931ddab6e758b9d1755b2715b376759f49f23fff60755f7e073367d61b5',
- transactionIndex: '0x35',
- };
- const expected: LogEntry = {
- logIndex: 99,
- transactionIndex: 53,
- transactionHash: input.transactionHash,
- blockHash: '',
- blockNumber: 6361723,
- address: input.address,
- data: input.data,
- topics: input.topics,
- };
- const actual = convertResponseToLogEntry(input);
- expect(actual).deep.equal(expected);
- });
- });
- describe('decodeLogEntry', () => {
- it('decodes LogEntry into LogWithDecodedArgs', () => {
- const input: LogEntry = {
- logIndex: 96,
- transactionIndex: 52,
- transactionHash: '0x02b59043e9b38b430c8c66abe67ab4a9e5509def8f8552b54231e88db1839831',
- blockHash: '',
- blockNumber: 6361723,
- address: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b',
- data:
- '0x00000000000000000000000067032ef7be8fa07c4335d0134099db0f3875e93000000000000000000000000067032ef7be8fa07c4335d0134099db0f3875e930000000000000000000000000000000000000000000000000000000174876e8000000000000000000000000000000000000000000000000000000000013ab668000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc200000000000000000000000000000000000000000000000000000000',
- topics: [
- '0x0bcc4c97732e47d9946f229edb95f5b6323f601300e4690de719993f3c371129',
- '0x0000000000000000000000003f7f832abb3be28442c0e48b7222e02b322c78f3',
- '0x000000000000000000000000a258b39954cef5cb142fd567a46cddb31a670124',
- '0x523404b4e6f847d9aefcf5be024be396449b4635590291fd7a28a8c940843858',
- ],
- };
- const expected: LogWithDecodedArgs<DecodedLogArgs> = {
- ...input,
- event: 'Fill',
- args: {
- makerAddress: '0x3f7f832abb3be28442c0e48b7222e02b322c78f3',
- feeRecipientAddress: '0xa258b39954cef5cb142fd567a46cddb31a670124',
- takerAddress: '0x67032ef7be8fa07c4335d0134099db0f3875e930',
- senderAddress: '0x67032ef7be8fa07c4335d0134099db0f3875e930',
- makerAssetFilledAmount: new BigNumber('100000000000'),
- takerAssetFilledAmount: new BigNumber('330000000'),
- makerFeePaid: new BigNumber('0'),
- takerFeePaid: new BigNumber('0'),
- orderHash: '0x523404b4e6f847d9aefcf5be024be396449b4635590291fd7a28a8c940843858',
- makerAssetData: '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498',
- takerAssetData: '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
- },
- };
- const actual = decodeLogEntry(Exchange.compilerOutput.abi, input);
- expect(actual).deep.equal(expected);
- });
- });
-});
diff --git a/packages/pipeline/test/data_types/events/exchange_events_test.ts b/packages/pipeline/test/parsers/events/index_test.ts
index f1432892d..2a2db1a94 100644
--- a/packages/pipeline/test/data_types/events/exchange_events_test.ts
+++ b/packages/pipeline/test/parsers/events/index_test.ts
@@ -4,8 +4,8 @@ import * as chai from 'chai';
import { LogWithDecodedArgs } from 'ethereum-types';
import 'mocha';
-import { _convertToEntity } from '../../../src/data_types/events/exchange_events';
import { ExchangeFillEvent } from '../../../src/entities/ExchangeFillEvent';
+import { _convertToEntity } from '../../../src/parsers/events';
import { chaiSetup } from '../../utils/chai_setup';
chaiSetup.configure();
diff --git a/packages/pipeline/test/data_types/sra_orders/index_test.ts b/packages/pipeline/test/parsers/sra_orders/index_test.ts
index 174f89b4f..952a6f3c6 100644
--- a/packages/pipeline/test/data_types/sra_orders/index_test.ts
+++ b/packages/pipeline/test/parsers/sra_orders/index_test.ts
@@ -2,26 +2,17 @@ import { APIOrder } from '@0xproject/types';
import { BigNumber } from '@0xproject/utils';
import * as chai from 'chai';
import 'mocha';
-import { Connection, createConnection } from 'typeorm';
-import { _convertToEntity } from '../../../src/data_types/sra_orders';
import { SraOrder } from '../../../src/entities/SraOrder';
+import { _convertToEntity } from '../../../src/parsers/sra_orders';
import { chaiSetup } from '../../utils/chai_setup';
-import { config } from '../../../src/ormconfig';
-
chaiSetup.configure();
const expect = chai.expect;
// tslint:disable:custom-no-magic-numbers
describe('sra_orders', () => {
describe('_convertToEntity', () => {
- before(async () => {
- // HACK(albrow): We don't actually use this connection but it seems
- // to be required because chai calls the inspect method of the
- // entity and that method requires a "default" connection.
- await createConnection(config);
- });
it('converts ApiOrder to SraOrder entity', () => {
const input: APIOrder = {
order: {