From b4cdb14b9b79589d7b24fd7655406c15b6bb00f6 Mon Sep 17 00:00:00 2001
From: Alex Browne <stephenalexbrowne@gmail.com>
Date: Tue, 11 Dec 2018 15:16:05 -0800
Subject: Refactor event scraping and add support for scraping ERC20 approval
 events (#1401)

* Refactor event scraping and add support for scraping ERC20 approval events

* Add tests for data_sources/contract-wrappers/utils
---
 .../1544131464368-CreateERC20ApprovalEvents.ts     |  26 ++++
 packages/pipeline/package.json                     |   1 +
 .../data_sources/contract-wrappers/erc20_events.ts |  45 +++++++
 .../contract-wrappers/exchange_events.ts           |  76 ++++-------
 .../src/data_sources/contract-wrappers/utils.ts    |  67 ++++++++++
 .../pipeline/src/entities/erc20_approval_event.ts  |  26 ++++
 packages/pipeline/src/entities/index.ts            |   1 +
 packages/pipeline/src/ormconfig.ts                 |   2 +
 .../pipeline/src/parsers/events/erc20_events.ts    |  34 +++++
 .../pipeline/src/parsers/events/exchange_events.ts | 133 +++++++++++++++++++
 packages/pipeline/src/parsers/events/index.ts      | 135 +------------------
 packages/pipeline/src/scripts/pull_erc20_events.ts |  66 ++++++++++
 .../pipeline/src/scripts/pull_exchange_events.ts   | 146 +++++++++++++++++++++
 .../pipeline/src/scripts/pull_missing_blocks.ts    |   2 +-
 .../pipeline/src/scripts/pull_missing_events.ts    | 136 -------------------
 .../data_sources/contract-wrappers/utils_test.ts   | 109 +++++++++++++++
 .../test/entities/erc20_approval_events_test.ts    |  29 ++++
 packages/pipeline/test/parsers/bloxy/index_test.ts |   1 -
 .../test/parsers/events/erc20_events_test.ts       |  54 ++++++++
 .../test/parsers/events/exchange_events_test.ts    |  78 +++++++++++
 .../pipeline/test/parsers/events/index_test.ts     |  78 -----------
 21 files changed, 845 insertions(+), 400 deletions(-)
 create mode 100644 packages/pipeline/migrations/1544131464368-CreateERC20ApprovalEvents.ts
 create mode 100644 packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts
 create mode 100644 packages/pipeline/src/data_sources/contract-wrappers/utils.ts
 create mode 100644 packages/pipeline/src/entities/erc20_approval_event.ts
 create mode 100644 packages/pipeline/src/parsers/events/erc20_events.ts
 create mode 100644 packages/pipeline/src/parsers/events/exchange_events.ts
 create mode 100644 packages/pipeline/src/scripts/pull_erc20_events.ts
 create mode 100644 packages/pipeline/src/scripts/pull_exchange_events.ts
 delete mode 100644 packages/pipeline/src/scripts/pull_missing_events.ts
 create mode 100644 packages/pipeline/test/data_sources/contract-wrappers/utils_test.ts
 create mode 100644 packages/pipeline/test/entities/erc20_approval_events_test.ts
 create mode 100644 packages/pipeline/test/parsers/events/erc20_events_test.ts
 create mode 100644 packages/pipeline/test/parsers/events/exchange_events_test.ts
 delete mode 100644 packages/pipeline/test/parsers/events/index_test.ts

diff --git a/packages/pipeline/migrations/1544131464368-CreateERC20ApprovalEvents.ts b/packages/pipeline/migrations/1544131464368-CreateERC20ApprovalEvents.ts
new file mode 100644
index 000000000..2e84e0ec8
--- /dev/null
+++ b/packages/pipeline/migrations/1544131464368-CreateERC20ApprovalEvents.ts
@@ -0,0 +1,26 @@
+import { MigrationInterface, QueryRunner, Table } from 'typeorm';
+
+const erc20ApprovalEvents = new Table({
+    name: 'raw.erc20_approval_events',
+    columns: [
+        { name: 'token_address', type: 'varchar(42)', isPrimary: true },
+        { name: 'log_index', type: 'integer', isPrimary: true },
+        { name: 'block_number', type: 'bigint', isPrimary: true },
+
+        { name: 'raw_data', type: 'varchar' },
+        { name: 'transaction_hash', type: 'varchar' },
+        { name: 'owner_address', type: 'varchar(42)' },
+        { name: 'spender_address', type: 'varchar(42)' },
+        { name: 'amount', type: 'numeric' },
+    ],
+});
+
+export class CreateERC20TokenApprovalEvents1544131464368 implements MigrationInterface {
+    public async up(queryRunner: QueryRunner): Promise<any> {
+        await queryRunner.createTable(erc20ApprovalEvents);
+    }
+
+    public async down(queryRunner: QueryRunner): Promise<any> {
+        await queryRunner.dropTable(erc20ApprovalEvents);
+    }
+}
diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json
index 4fde906b8..ca95a7514 100644
--- a/packages/pipeline/package.json
+++ b/packages/pipeline/package.json
@@ -40,6 +40,7 @@
     },
     "dependencies": {
         "@0x/connect": "^3.0.2",
+        "@0x/contract-addresses": "^2.0.0",
         "@0x/contract-artifacts": "^1.0.1",
         "@0x/contract-wrappers": "^3.0.0",
         "@0x/dev-utils": "^1.0.13",
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts
new file mode 100644
index 000000000..e0098122f
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/erc20_events.ts
@@ -0,0 +1,45 @@
+import {
+    ContractWrappers,
+    ERC20TokenApprovalEventArgs,
+    ERC20TokenEvents,
+    ERC20TokenWrapper,
+} from '@0x/contract-wrappers';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { LogWithDecodedArgs } from 'ethereum-types';
+
+import { GetEventsFunc, getEventsWithPaginationAsync } from './utils';
+
+export class ERC20EventsSource {
+    private readonly _erc20Wrapper: ERC20TokenWrapper;
+    private readonly _tokenAddress: string;
+    constructor(provider: Web3ProviderEngine, networkId: number, tokenAddress: string) {
+        const contractWrappers = new ContractWrappers(provider, { networkId });
+        this._erc20Wrapper = contractWrappers.erc20Token;
+        this._tokenAddress = tokenAddress;
+    }
+
+    public async getApprovalEventsAsync(
+        startBlock: number,
+        endBlock: number,
+    ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> {
+        return getEventsWithPaginationAsync(
+            this._getApprovalEventsForRangeAsync.bind(this) as GetEventsFunc<ERC20TokenApprovalEventArgs>,
+            startBlock,
+            endBlock,
+        );
+    }
+
+    // Gets all approval events of for a specific sub-range. This getter
+    // function will be called during each step of pagination.
+    private async _getApprovalEventsForRangeAsync(
+        fromBlock: number,
+        toBlock: number,
+    ): Promise<Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>> {
+        return this._erc20Wrapper.getLogsAsync<ERC20TokenApprovalEventArgs>(
+            this._tokenAddress,
+            ERC20TokenEvents.Approval,
+            { fromBlock, toBlock },
+            {},
+        );
+    }
+}
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
index 1717eb8b3..58691e2ab 100644
--- a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
+++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
@@ -8,78 +8,52 @@ import {
     ExchangeWrapper,
 } from '@0x/contract-wrappers';
 import { Web3ProviderEngine } from '@0x/subproviders';
-import { Web3Wrapper } from '@0x/web3-wrapper';
 import { LogWithDecodedArgs } from 'ethereum-types';
 
-import { EXCHANGE_START_BLOCK } from '../../utils';
-
-const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default toBlock.
-const NUM_BLOCKS_PER_QUERY = 20000; // Number of blocks to query for events at a time.
+import { GetEventsFunc, getEventsWithPaginationAsync } from './utils';
 
 export class ExchangeEventsSource {
     private readonly _exchangeWrapper: ExchangeWrapper;
-    private readonly _web3Wrapper: Web3Wrapper;
     constructor(provider: Web3ProviderEngine, networkId: number) {
-        this._web3Wrapper = new Web3Wrapper(provider);
         const contractWrappers = new ContractWrappers(provider, { networkId });
         this._exchangeWrapper = contractWrappers.exchange;
     }
 
     public async getFillEventsAsync(
-        fromBlock?: number,
-        toBlock?: number,
+        startBlock: number,
+        endBlock: number,
     ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> {
-        return this._getEventsAsync<ExchangeFillEventArgs>(ExchangeEvents.Fill, fromBlock, toBlock);
+        const getFillEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeFillEventArgs>(ExchangeEvents.Fill);
+        return getEventsWithPaginationAsync(getFillEventsForRangeAsync, startBlock, endBlock);
     }
 
     public async getCancelEventsAsync(
-        fromBlock?: number,
-        toBlock?: number,
+        startBlock: number,
+        endBlock: number,
     ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> {
-        return this._getEventsAsync<ExchangeCancelEventArgs>(ExchangeEvents.Cancel, fromBlock, toBlock);
+        const getCancelEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelEventArgs>(
+            ExchangeEvents.Cancel,
+        );
+        return getEventsWithPaginationAsync(getCancelEventsForRangeAsync, startBlock, endBlock);
     }
 
     public async getCancelUpToEventsAsync(
-        fromBlock?: number,
-        toBlock?: number,
+        startBlock: number,
+        endBlock: number,
     ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> {
-        return this._getEventsAsync<ExchangeCancelUpToEventArgs>(ExchangeEvents.CancelUpTo, fromBlock, toBlock);
-    }
-
-    private async _getEventsAsync<ArgsType extends ExchangeEventArgs>(
-        eventName: ExchangeEvents,
-        fromBlock: number = EXCHANGE_START_BLOCK,
-        toBlock?: number,
-    ): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
-        const calculatedToBlock =
-            toBlock === undefined
-                ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD
-                : toBlock;
-        let events: Array<LogWithDecodedArgs<ArgsType>> = [];
-        for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) {
-            events = events.concat(
-                await this._getEventsForRangeAsync<ArgsType>(
-                    eventName,
-                    currFromBlock,
-                    Math.min(currFromBlock + NUM_BLOCKS_PER_QUERY - 1, calculatedToBlock),
-                ),
-            );
-        }
-        return events;
+        const getCancelUpToEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelUpToEventArgs>(
+            ExchangeEvents.CancelUpTo,
+        );
+        return getEventsWithPaginationAsync(getCancelUpToEventsForRangeAsync, startBlock, endBlock);
     }
 
-    private async _getEventsForRangeAsync<ArgsType extends ExchangeEventArgs>(
-        eventName: ExchangeEvents,
-        fromBlock: number,
-        toBlock: number,
-    ): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
-        return this._exchangeWrapper.getLogsAsync<ArgsType>(
-            eventName,
-            {
-                fromBlock,
-                toBlock,
-            },
-            {},
-        );
+    // Returns a getter function which gets all events of a specific type for a
+    // specific sub-range. This getter function will be called during each step
+    // of pagination.
+    private _makeGetterFuncForEventType<ArgsType extends ExchangeEventArgs>(
+        eventType: ExchangeEvents,
+    ): GetEventsFunc<ArgsType> {
+        return async (fromBlock: number, toBlock: number) =>
+            this._exchangeWrapper.getLogsAsync<ArgsType>(eventType, { fromBlock, toBlock }, {});
     }
 }
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/utils.ts b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts
new file mode 100644
index 000000000..67660a37e
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/utils.ts
@@ -0,0 +1,67 @@
+import { DecodedLogArgs, LogWithDecodedArgs } from 'ethereum-types';
+
+const NUM_BLOCKS_PER_QUERY = 10000; // Number of blocks to query for events at a time.
+const NUM_RETRIES = 3; // Number of retries if a request fails or times out.
+
+export type GetEventsFunc<ArgsType extends DecodedLogArgs> = (
+    fromBlock: number,
+    toBlock: number,
+) => Promise<Array<LogWithDecodedArgs<ArgsType>>>;
+
+/**
+ * Gets all events between the given startBlock and endBlock by querying for
+ * NUM_BLOCKS_PER_QUERY at a time. Accepts a getter function in order to
+ * maximize code re-use and allow for getting different types of events for
+ * different contracts. If the getter function throws with a retryable error,
+ * it will automatically be retried up to NUM_RETRIES times.
+ * @param getEventsAsync A getter function which will be called for each step during pagination.
+ * @param startBlock The start of the entire block range to get events for.
+ * @param endBlock The end of the entire block range to get events for.
+ */
+export async function getEventsWithPaginationAsync<ArgsType extends DecodedLogArgs>(
+    getEventsAsync: GetEventsFunc<ArgsType>,
+    startBlock: number,
+    endBlock: number,
+): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
+    let events: Array<LogWithDecodedArgs<ArgsType>> = [];
+    for (let fromBlock = startBlock; fromBlock <= endBlock; fromBlock += NUM_BLOCKS_PER_QUERY) {
+        const toBlock = Math.min(fromBlock + NUM_BLOCKS_PER_QUERY - 1, endBlock);
+        const eventsInRange = await _getEventsWithRetriesAsync(getEventsAsync, NUM_RETRIES, fromBlock, toBlock);
+        events = events.concat(eventsInRange);
+    }
+    return events;
+}
+
+/**
+ * Calls the getEventsAsync function and retries up to numRetries times if it
+ * throws with an error that is considered retryable.
+ * @param getEventsAsync a function that will be called on each iteration.
+ * @param numRetries the maximum number times to retry getEventsAsync if it fails with a retryable error.
+ * @param fromBlock the start of the sub-range of blocks we are getting events for.
+ * @param toBlock the end of the sub-range of blocks we are getting events for.
+ */
+export async function _getEventsWithRetriesAsync<ArgsType extends DecodedLogArgs>(
+    getEventsAsync: GetEventsFunc<ArgsType>,
+    numRetries: number,
+    fromBlock: number,
+    toBlock: number,
+): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
+    let eventsInRange: Array<LogWithDecodedArgs<ArgsType>> = [];
+    for (let i = 0; i <= numRetries; i++) {
+        try {
+            eventsInRange = await getEventsAsync(fromBlock, toBlock);
+        } catch (err) {
+            if (isErrorRetryable(err) && i < numRetries) {
+                continue;
+            } else {
+                throw err;
+            }
+        }
+        break;
+    }
+    return eventsInRange;
+}
+
+function isErrorRetryable(err: Error): boolean {
+    return err.message.includes('network timeout');
+}
diff --git a/packages/pipeline/src/entities/erc20_approval_event.ts b/packages/pipeline/src/entities/erc20_approval_event.ts
new file mode 100644
index 000000000..69cdfcb0b
--- /dev/null
+++ b/packages/pipeline/src/entities/erc20_approval_event.ts
@@ -0,0 +1,26 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'erc20_approval_events', schema: 'raw' })
+export class ERC20ApprovalEvent {
+    @PrimaryColumn({ name: 'token_address' })
+    public tokenAddress!: string;
+    @PrimaryColumn({ name: 'log_index' })
+    public logIndex!: number;
+    @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+    public blockNumber!: number;
+
+    @Column({ name: 'raw_data' })
+    public rawData!: string;
+
+    @Column({ name: 'transaction_hash' })
+    public transactionHash!: string;
+    @Column({ name: 'owner_address' })
+    public ownerAddress!: string;
+    @Column({ name: 'spender_address' })
+    public spenderAddress!: string;
+    @Column({ name: 'amount', type: 'numeric', transformer: bigNumberTransformer })
+    public amount!: BigNumber;
+}
diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts
index db0814e38..cc3de78bb 100644
--- a/packages/pipeline/src/entities/index.ts
+++ b/packages/pipeline/src/entities/index.ts
@@ -14,5 +14,6 @@ export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './s
 export { TokenMetadata } from './token_metadata';
 export { TokenOrderbookSnapshot } from './token_order';
 export { Transaction } from './transaction';
+export { ERC20ApprovalEvent } from './erc20_approval_event';
 
 export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent;
diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts
index 9f7815b4e..fe11d81d5 100644
--- a/packages/pipeline/src/ormconfig.ts
+++ b/packages/pipeline/src/ormconfig.ts
@@ -3,6 +3,7 @@ import { ConnectionOptions } from 'typeorm';
 import {
     Block,
     DexTrade,
+    ERC20ApprovalEvent,
     ExchangeCancelEvent,
     ExchangeCancelUpToEvent,
     ExchangeFillEvent,
@@ -21,6 +22,7 @@ const entities = [
     ExchangeCancelEvent,
     ExchangeCancelUpToEvent,
     ExchangeFillEvent,
+    ERC20ApprovalEvent,
     OHLCVExternal,
     Relayer,
     SraOrder,
diff --git a/packages/pipeline/src/parsers/events/erc20_events.ts b/packages/pipeline/src/parsers/events/erc20_events.ts
new file mode 100644
index 000000000..caf9984d0
--- /dev/null
+++ b/packages/pipeline/src/parsers/events/erc20_events.ts
@@ -0,0 +1,34 @@
+import { ERC20TokenApprovalEventArgs } from '@0x/contract-wrappers';
+import { LogWithDecodedArgs } from 'ethereum-types';
+import * as R from 'ramda';
+
+import { ERC20ApprovalEvent } from '../../entities';
+
+/**
+ * Parses raw event logs for an ERC20 approval event and returns an array of
+ * ERC20ApprovalEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseERC20ApprovalEvents: (
+    eventLogs: Array<LogWithDecodedArgs<ERC20TokenApprovalEventArgs>>,
+) => ERC20ApprovalEvent[] = R.map(_convertToERC20ApprovalEvent);
+
+/**
+ * Converts a raw event log for an ERC20 approval event into an
+ * ERC20ApprovalEvent entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToERC20ApprovalEvent(
+    eventLog: LogWithDecodedArgs<ERC20TokenApprovalEventArgs>,
+): ERC20ApprovalEvent {
+    const erc20ApprovalEvent = new ERC20ApprovalEvent();
+    erc20ApprovalEvent.tokenAddress = eventLog.address as string;
+    erc20ApprovalEvent.blockNumber = eventLog.blockNumber as number;
+    erc20ApprovalEvent.logIndex = eventLog.logIndex as number;
+    erc20ApprovalEvent.rawData = eventLog.data as string;
+    erc20ApprovalEvent.transactionHash = eventLog.transactionHash;
+    erc20ApprovalEvent.ownerAddress = eventLog.args._owner;
+    erc20ApprovalEvent.spenderAddress = eventLog.args._spender;
+    erc20ApprovalEvent.amount = eventLog.args._value;
+    return erc20ApprovalEvent;
+}
diff --git a/packages/pipeline/src/parsers/events/exchange_events.ts b/packages/pipeline/src/parsers/events/exchange_events.ts
new file mode 100644
index 000000000..e18106c75
--- /dev/null
+++ b/packages/pipeline/src/parsers/events/exchange_events.ts
@@ -0,0 +1,133 @@
+import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers';
+import { assetDataUtils } from '@0x/order-utils';
+import { AssetProxyId, ERC721AssetData } from '@0x/types';
+import { LogWithDecodedArgs } from 'ethereum-types';
+import * as R from 'ramda';
+
+import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities';
+import { bigNumbertoStringOrNull } from '../../utils';
+
+/**
+ * Parses raw event logs for a fill event and returns an array of
+ * ExchangeFillEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseExchangeFillEvents: (
+    eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>,
+) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent);
+
+/**
+ * Parses raw event logs for a cancel event and returns an array of
+ * ExchangeCancelEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseExchangeCancelEvents: (
+    eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>,
+) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent);
+
+/**
+ * Parses raw event logs for a CancelUpTo event and returns an array of
+ * ExchangeCancelUpToEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseExchangeCancelUpToEvents: (
+    eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>,
+) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent);
+
+/**
+ * Converts a raw event log for a fill event into an ExchangeFillEvent entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent {
+    const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
+    const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+    const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
+    const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+    const exchangeFillEvent = new ExchangeFillEvent();
+    exchangeFillEvent.contractAddress = eventLog.address as string;
+    exchangeFillEvent.blockNumber = eventLog.blockNumber as number;
+    exchangeFillEvent.logIndex = eventLog.logIndex as number;
+    exchangeFillEvent.rawData = eventLog.data as string;
+    exchangeFillEvent.transactionHash = eventLog.transactionHash;
+    exchangeFillEvent.makerAddress = eventLog.args.makerAddress;
+    exchangeFillEvent.takerAddress = eventLog.args.takerAddress;
+    exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress;
+    exchangeFillEvent.senderAddress = eventLog.args.senderAddress;
+    exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount;
+    exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount;
+    exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid;
+    exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid;
+    exchangeFillEvent.orderHash = eventLog.args.orderHash;
+    exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData;
+    exchangeFillEvent.makerAssetType = makerAssetType;
+    exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId;
+    exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress;
+    // tslint has a false positive here. Type assertion is required.
+    // tslint:disable-next-line:no-unnecessary-type-assertion
+    exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
+    exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData;
+    exchangeFillEvent.takerAssetType = takerAssetType;
+    exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId;
+    exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress;
+    // tslint:disable-next-line:no-unnecessary-type-assertion
+    exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
+    return exchangeFillEvent;
+}
+
+/**
+ * Converts a raw event log for a cancel event into an ExchangeCancelEvent
+ * entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToExchangeCancelEvent(
+    eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>,
+): ExchangeCancelEvent {
+    const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
+    const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+    const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
+    const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+    const exchangeCancelEvent = new ExchangeCancelEvent();
+    exchangeCancelEvent.contractAddress = eventLog.address as string;
+    exchangeCancelEvent.blockNumber = eventLog.blockNumber as number;
+    exchangeCancelEvent.logIndex = eventLog.logIndex as number;
+    exchangeCancelEvent.rawData = eventLog.data as string;
+    exchangeCancelEvent.transactionHash = eventLog.transactionHash;
+    exchangeCancelEvent.makerAddress = eventLog.args.makerAddress;
+    exchangeCancelEvent.takerAddress = eventLog.args.takerAddress;
+    exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress;
+    exchangeCancelEvent.senderAddress = eventLog.args.senderAddress;
+    exchangeCancelEvent.orderHash = eventLog.args.orderHash;
+    exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData;
+    exchangeCancelEvent.makerAssetType = makerAssetType;
+    exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId;
+    exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress;
+    // tslint:disable-next-line:no-unnecessary-type-assertion
+    exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
+    exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData;
+    exchangeCancelEvent.takerAssetType = takerAssetType;
+    exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId;
+    exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress;
+    // tslint:disable-next-line:no-unnecessary-type-assertion
+    exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
+    return exchangeCancelEvent;
+}
+
+/**
+ * Converts a raw event log for a cancelUpTo event into an
+ * ExchangeCancelUpToEvent entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToExchangeCancelUpToEvent(
+    eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>,
+): ExchangeCancelUpToEvent {
+    const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent();
+    exchangeCancelUpToEvent.contractAddress = eventLog.address as string;
+    exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number;
+    exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number;
+    exchangeCancelUpToEvent.rawData = eventLog.data as string;
+    exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash;
+    exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress;
+    exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress;
+    exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch;
+    return exchangeCancelUpToEvent;
+}
diff --git a/packages/pipeline/src/parsers/events/index.ts b/packages/pipeline/src/parsers/events/index.ts
index e18106c75..3f9915e8b 100644
--- a/packages/pipeline/src/parsers/events/index.ts
+++ b/packages/pipeline/src/parsers/events/index.ts
@@ -1,133 +1,2 @@
-import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers';
-import { assetDataUtils } from '@0x/order-utils';
-import { AssetProxyId, ERC721AssetData } from '@0x/types';
-import { LogWithDecodedArgs } from 'ethereum-types';
-import * as R from 'ramda';
-
-import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities';
-import { bigNumbertoStringOrNull } from '../../utils';
-
-/**
- * Parses raw event logs for a fill event and returns an array of
- * ExchangeFillEvent entities.
- * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
- */
-export const parseExchangeFillEvents: (
-    eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>,
-) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent);
-
-/**
- * Parses raw event logs for a cancel event and returns an array of
- * ExchangeCancelEvent entities.
- * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
- */
-export const parseExchangeCancelEvents: (
-    eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>,
-) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent);
-
-/**
- * Parses raw event logs for a CancelUpTo event and returns an array of
- * ExchangeCancelUpToEvent entities.
- * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
- */
-export const parseExchangeCancelUpToEvents: (
-    eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>,
-) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent);
-
-/**
- * Converts a raw event log for a fill event into an ExchangeFillEvent entity.
- * @param eventLog Raw event log (e.g. returned from contract-wrappers).
- */
-export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent {
-    const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
-    const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
-    const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
-    const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
-    const exchangeFillEvent = new ExchangeFillEvent();
-    exchangeFillEvent.contractAddress = eventLog.address as string;
-    exchangeFillEvent.blockNumber = eventLog.blockNumber as number;
-    exchangeFillEvent.logIndex = eventLog.logIndex as number;
-    exchangeFillEvent.rawData = eventLog.data as string;
-    exchangeFillEvent.transactionHash = eventLog.transactionHash;
-    exchangeFillEvent.makerAddress = eventLog.args.makerAddress;
-    exchangeFillEvent.takerAddress = eventLog.args.takerAddress;
-    exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress;
-    exchangeFillEvent.senderAddress = eventLog.args.senderAddress;
-    exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount;
-    exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount;
-    exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid;
-    exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid;
-    exchangeFillEvent.orderHash = eventLog.args.orderHash;
-    exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData;
-    exchangeFillEvent.makerAssetType = makerAssetType;
-    exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId;
-    exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress;
-    // tslint has a false positive here. Type assertion is required.
-    // tslint:disable-next-line:no-unnecessary-type-assertion
-    exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
-    exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData;
-    exchangeFillEvent.takerAssetType = takerAssetType;
-    exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId;
-    exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress;
-    // tslint:disable-next-line:no-unnecessary-type-assertion
-    exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
-    return exchangeFillEvent;
-}
-
-/**
- * Converts a raw event log for a cancel event into an ExchangeCancelEvent
- * entity.
- * @param eventLog Raw event log (e.g. returned from contract-wrappers).
- */
-export function _convertToExchangeCancelEvent(
-    eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>,
-): ExchangeCancelEvent {
-    const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
-    const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
-    const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
-    const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
-    const exchangeCancelEvent = new ExchangeCancelEvent();
-    exchangeCancelEvent.contractAddress = eventLog.address as string;
-    exchangeCancelEvent.blockNumber = eventLog.blockNumber as number;
-    exchangeCancelEvent.logIndex = eventLog.logIndex as number;
-    exchangeCancelEvent.rawData = eventLog.data as string;
-    exchangeCancelEvent.transactionHash = eventLog.transactionHash;
-    exchangeCancelEvent.makerAddress = eventLog.args.makerAddress;
-    exchangeCancelEvent.takerAddress = eventLog.args.takerAddress;
-    exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress;
-    exchangeCancelEvent.senderAddress = eventLog.args.senderAddress;
-    exchangeCancelEvent.orderHash = eventLog.args.orderHash;
-    exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData;
-    exchangeCancelEvent.makerAssetType = makerAssetType;
-    exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId;
-    exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress;
-    // tslint:disable-next-line:no-unnecessary-type-assertion
-    exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
-    exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData;
-    exchangeCancelEvent.takerAssetType = takerAssetType;
-    exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId;
-    exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress;
-    // tslint:disable-next-line:no-unnecessary-type-assertion
-    exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
-    return exchangeCancelEvent;
-}
-
-/**
- * Converts a raw event log for a cancelUpTo event into an
- * ExchangeCancelUpToEvent entity.
- * @param eventLog Raw event log (e.g. returned from contract-wrappers).
- */
-export function _convertToExchangeCancelUpToEvent(
-    eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>,
-): ExchangeCancelUpToEvent {
-    const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent();
-    exchangeCancelUpToEvent.contractAddress = eventLog.address as string;
-    exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number;
-    exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number;
-    exchangeCancelUpToEvent.rawData = eventLog.data as string;
-    exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash;
-    exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress;
-    exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress;
-    exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch;
-    return exchangeCancelUpToEvent;
-}
+export { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from './exchange_events';
+export { parseERC20ApprovalEvents } from './erc20_events';
diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts
new file mode 100644
index 000000000..0ad12c97a
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_erc20_events.ts
@@ -0,0 +1,66 @@
+// tslint:disable:no-console
+import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses';
+import { web3Factory } from '@0x/dev-utils';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { Web3Wrapper } from '@0x/web3-wrapper';
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events';
+import { ERC20ApprovalEvent } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseERC20ApprovalEvents } from '../parsers/events';
+import { handleError, INFURA_ROOT_URL } from '../utils';
+
+const NETWORK_ID = 1;
+const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events.
+const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
+const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock.
+const WETH_START_BLOCK = 4719568; // Block number when the WETH contract was deployed.
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const provider = web3Factory.getRpcProvider({
+        rpcUrl: INFURA_ROOT_URL,
+    });
+    const endBlock = await calculateEndBlockAsync(provider);
+    await getAndSaveWETHApprovalEventsAsync(provider, endBlock);
+    process.exit(0);
+})().catch(handleError);
+
+async function getAndSaveWETHApprovalEventsAsync(provider: Web3ProviderEngine, endBlock: number): Promise<void> {
+    console.log('Checking existing approval events...');
+    const repository = connection.getRepository(ERC20ApprovalEvent);
+    const startBlock = (await getStartBlockAsync(repository)) || WETH_START_BLOCK;
+
+    console.log(`Getting WETH approval events starting at ${startBlock}...`);
+    const wethTokenAddress = getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken;
+    const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, wethTokenAddress);
+    const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock);
+
+    console.log(`Parsing ${eventLogs.length} WETH approval events...`);
+    const events = parseERC20ApprovalEvents(eventLogs);
+    console.log(`Retrieved and parsed ${events.length} total WETH approval events.`);
+    await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) });
+}
+
+async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> {
+    const web3Wrapper = new Web3Wrapper(provider);
+    const currentBlock = await web3Wrapper.getBlockNumberAsync();
+    return currentBlock - BLOCK_FINALITY_THRESHOLD;
+}
+
+async function getStartBlockAsync(repository: Repository<ERC20ApprovalEvent>): Promise<number | null> {
+    const fillEventCount = await repository.count();
+    if (fillEventCount === 0) {
+        console.log(`No existing approval events found.`);
+        return null;
+    }
+    const queryResult = await connection.query(
+        `SELECT block_number FROM raw.erc20_approval_events ORDER BY block_number DESC LIMIT 1`,
+    );
+    const lastKnownBlock = queryResult[0].block_number;
+    return lastKnownBlock - START_BLOCK_OFFSET;
+}
diff --git a/packages/pipeline/src/scripts/pull_exchange_events.ts b/packages/pipeline/src/scripts/pull_exchange_events.ts
new file mode 100644
index 000000000..e98fc6629
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_exchange_events.ts
@@ -0,0 +1,146 @@
+// tslint:disable:no-console
+import { web3Factory } from '@0x/dev-utils';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { Web3Wrapper } from '@0x/web3-wrapper';
+import R = require('ramda');
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events';
+import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events';
+import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';
+
+const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events.
+const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
+const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock.
+
+let connection: Connection;
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+    const provider = web3Factory.getRpcProvider({
+        rpcUrl: INFURA_ROOT_URL,
+    });
+    const endBlock = await calculateEndBlockAsync(provider);
+    const eventsSource = new ExchangeEventsSource(provider, 1);
+    await getFillEventsAsync(eventsSource, endBlock);
+    await getCancelEventsAsync(eventsSource, endBlock);
+    await getCancelUpToEventsAsync(eventsSource, endBlock);
+    process.exit(0);
+})().catch(handleError);
+
+async function getFillEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> {
+    console.log('Checking existing fill events...');
+    const repository = connection.getRepository(ExchangeFillEvent);
+    const startBlock = await getStartBlockAsync(repository);
+    console.log(`Getting fill events starting at ${startBlock}...`);
+    const eventLogs = await eventsSource.getFillEventsAsync(startBlock, endBlock);
+    console.log('Parsing fill events...');
+    const events = parseExchangeFillEvents(eventLogs);
+    console.log(`Retrieved and parsed ${events.length} total fill events.`);
+    await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
+}
+
+async function getCancelEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> {
+    console.log('Checking existing cancel events...');
+    const repository = connection.getRepository(ExchangeCancelEvent);
+    const startBlock = await getStartBlockAsync(repository);
+    console.log(`Getting cancel events starting at ${startBlock}...`);
+    const eventLogs = await eventsSource.getCancelEventsAsync(startBlock, endBlock);
+    console.log('Parsing cancel events...');
+    const events = parseExchangeCancelEvents(eventLogs);
+    console.log(`Retrieved and parsed ${events.length} total cancel events.`);
+    await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
+}
+
+async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource, endBlock: number): Promise<void> {
+    console.log('Checking existing CancelUpTo events...');
+    const repository = connection.getRepository(ExchangeCancelUpToEvent);
+    const startBlock = await getStartBlockAsync(repository);
+    console.log(`Getting CancelUpTo events starting at ${startBlock}...`);
+    const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock, endBlock);
+    console.log('Parsing CancelUpTo events...');
+    const events = parseExchangeCancelUpToEvents(eventLogs);
+    console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`);
+    await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
+}
+
+const tableNameRegex = /^[a-zA-Z_]*$/;
+
+async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> {
+    const fillEventCount = await repository.count();
+    if (fillEventCount === 0) {
+        console.log(`No existing ${repository.metadata.name}s found.`);
+        return EXCHANGE_START_BLOCK;
+    }
+    const tableName = repository.metadata.tableName;
+    if (!tableNameRegex.test(tableName)) {
+        throw new Error(`Unexpected special character in table name: ${tableName}`);
+    }
+    const queryResult = await connection.query(
+        `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`,
+    );
+    const lastKnownBlock = queryResult[0].block_number;
+    return lastKnownBlock - START_BLOCK_OFFSET;
+}
+
+async function saveEventsAsync<T extends ExchangeEvent>(
+    isInitialPull: boolean,
+    repository: Repository<T>,
+    events: T[],
+): Promise<void> {
+    console.log(`Saving ${repository.metadata.name}s...`);
+    if (isInitialPull) {
+        // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE
+        // each.
+        for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) {
+            await repository.insert(eventsBatch);
+        }
+    } else {
+        // If we possibly have some overlap where we need to update some
+        // existing events, we need to use our workaround/fallback.
+        await saveIndividuallyWithFallbackAsync(repository, events);
+    }
+    const totalEvents = await repository.count();
+    console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`);
+}
+
+async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>(
+    repository: Repository<T>,
+    events: T[],
+): Promise<void> {
+    // Note(albrow): This is a temporary hack because `save` is not working as
+    // documented and is causing a foreign key constraint violation. Hopefully
+    // can remove later because this "poor man's upsert" implementation operates
+    // on one event at a time and is therefore much slower.
+    for (const event of events) {
+        try {
+            // First try an insert.
+            await repository.insert(event);
+        } catch {
+            // If it fails, assume it was a foreign key constraint error and try
+            // doing an update instead.
+            // Note(albrow): Unfortunately the `as any` hack here seems
+            // required. I can't figure out how to convince the type-checker
+            // that the criteria and the entity itself are the correct type for
+            // the given repository. If we can remove the `save` hack then this
+            // will probably no longer be necessary.
+            await repository.update(
+                {
+                    contractAddress: event.contractAddress,
+                    blockNumber: event.blockNumber,
+                    logIndex: event.logIndex,
+                } as any,
+                event as any,
+            );
+        }
+    }
+}
+
+async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<number> {
+    const web3Wrapper = new Web3Wrapper(provider);
+    const currentBlock = await web3Wrapper.getBlockNumberAsync();
+    return currentBlock - BLOCK_FINALITY_THRESHOLD;
+}
diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts
index b7bd51f08..18fe1b700 100644
--- a/packages/pipeline/src/scripts/pull_missing_blocks.ts
+++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts
@@ -24,7 +24,7 @@ let connection: Connection;
 (async () => {
     connection = await createConnection(ormConfig as ConnectionOptions);
     const provider = web3Factory.getRpcProvider({
-        rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`,
+        rpcUrl: INFURA_ROOT_URL,
     });
     const web3Source = new Web3Source(provider);
     await getAllMissingBlocks(web3Source);
diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts
deleted file mode 100644
index 80abbb8b0..000000000
--- a/packages/pipeline/src/scripts/pull_missing_events.ts
+++ /dev/null
@@ -1,136 +0,0 @@
-// tslint:disable:no-console
-import { web3Factory } from '@0x/dev-utils';
-import R = require('ramda');
-import 'reflect-metadata';
-import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
-
-import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events';
-import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities';
-import * as ormConfig from '../ormconfig';
-import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events';
-import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';
-
-const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events.
-const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
-
-let connection: Connection;
-
-(async () => {
-    connection = await createConnection(ormConfig as ConnectionOptions);
-    const provider = web3Factory.getRpcProvider({
-        rpcUrl: INFURA_ROOT_URL,
-    });
-    const eventsSource = new ExchangeEventsSource(provider, 1);
-    await getFillEventsAsync(eventsSource);
-    await getCancelEventsAsync(eventsSource);
-    await getCancelUpToEventsAsync(eventsSource);
-    process.exit(0);
-})().catch(handleError);
-
-async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> {
-    console.log('Checking existing fill events...');
-    const repository = connection.getRepository(ExchangeFillEvent);
-    const startBlock = await getStartBlockAsync(repository);
-    console.log(`Getting fill events starting at ${startBlock}...`);
-    const eventLogs = await eventsSource.getFillEventsAsync(startBlock);
-    console.log('Parsing fill events...');
-    const events = parseExchangeFillEvents(eventLogs);
-    console.log(`Retrieved and parsed ${events.length} total fill events.`);
-    await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
-}
-
-async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> {
-    console.log('Checking existing cancel events...');
-    const repository = connection.getRepository(ExchangeCancelEvent);
-    const startBlock = await getStartBlockAsync(repository);
-    console.log(`Getting cancel events starting at ${startBlock}...`);
-    const eventLogs = await eventsSource.getCancelEventsAsync(startBlock);
-    console.log('Parsing cancel events...');
-    const events = parseExchangeCancelEvents(eventLogs);
-    console.log(`Retrieved and parsed ${events.length} total cancel events.`);
-    await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
-}
-
-async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> {
-    console.log('Checking existing CancelUpTo events...');
-    const repository = connection.getRepository(ExchangeCancelUpToEvent);
-    const startBlock = await getStartBlockAsync(repository);
-    console.log(`Getting CancelUpTo events starting at ${startBlock}...`);
-    const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock);
-    console.log('Parsing CancelUpTo events...');
-    const events = parseExchangeCancelUpToEvents(eventLogs);
-    console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`);
-    await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
-}
-
-const tableNameRegex = /^[a-zA-Z_]*$/;
-
-async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> {
-    const fillEventCount = await repository.count();
-    if (fillEventCount === 0) {
-        console.log(`No existing ${repository.metadata.name}s found.`);
-        return EXCHANGE_START_BLOCK;
-    }
-    const tableName = repository.metadata.tableName;
-    if (!tableNameRegex.test(tableName)) {
-        throw new Error(`Unexpected special character in table name: ${tableName}`);
-    }
-    const queryResult = await connection.query(
-        `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`,
-    );
-    const lastKnownBlock = queryResult[0].block_number;
-    return lastKnownBlock - START_BLOCK_OFFSET;
-}
-
-async function saveEventsAsync<T extends ExchangeEvent>(
-    isInitialPull: boolean,
-    repository: Repository<T>,
-    events: T[],
-): Promise<void> {
-    console.log(`Saving ${repository.metadata.name}s...`);
-    if (isInitialPull) {
-        // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE
-        // each.
-        for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) {
-            await repository.insert(eventsBatch);
-        }
-    } else {
-        // If we possibly have some overlap where we need to update some
-        // existing events, we need to use our workaround/fallback.
-        await saveIndividuallyWithFallbackAsync(repository, events);
-    }
-    const totalEvents = await repository.count();
-    console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`);
-}
-
-async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>(
-    repository: Repository<T>,
-    events: T[],
-): Promise<void> {
-    // Note(albrow): This is a temporary hack because `save` is not working as
-    // documented and is causing a foreign key constraint violation. Hopefully
-    // can remove later because this "poor man's upsert" implementation operates
-    // on one event at a time and is therefore much slower.
-    for (const event of events) {
-        try {
-            // First try an insert.
-            await repository.insert(event);
-        } catch {
-            // If it fails, assume it was a foreign key constraint error and try
-            // doing an update instead.
-            // Note(albrow): Unfortunately the `as any` hack here seems
-            // required. I can't figure out how to convince the type-checker
-            // that the criteria and the entity itself are the correct type for
-            // the given repository. If we can remove the `save` hack then this
-            // will probably no longer be necessary.
-            await repository.update(
-                {
-                    contractAddress: event.contractAddress,
-                    blockNumber: event.blockNumber,
-                    logIndex: event.logIndex,
-                } as any,
-                event as any,
-            );
-        }
-    }
-}
diff --git a/packages/pipeline/test/data_sources/contract-wrappers/utils_test.ts b/packages/pipeline/test/data_sources/contract-wrappers/utils_test.ts
new file mode 100644
index 000000000..06f1a5e86
--- /dev/null
+++ b/packages/pipeline/test/data_sources/contract-wrappers/utils_test.ts
@@ -0,0 +1,109 @@
+// tslint:disable:custom-no-magic-numbers
+import * as chai from 'chai';
+import { LogWithDecodedArgs } from 'ethereum-types';
+import 'mocha';
+
+import { _getEventsWithRetriesAsync } from '../../../src/data_sources/contract-wrappers/utils';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+const retryableMessage = 'network timeout: (simulated network timeout error)';
+const retryableError = new Error(retryableMessage);
+
+describe('data_sources/contract-wrappers/utils', () => {
+    describe('_getEventsWithRetriesAsync', () => {
+        it('sends a single request if it was successful', async () => {
+            // Pre-declare values for the fromBlock and toBlock arguments.
+            const expectedFromBlock = 100;
+            const expectedToBlock = 200;
+            const expectedLogs: Array<LogWithDecodedArgs<any>> = [
+                {
+                    logIndex: 123,
+                    transactionIndex: 456,
+                    transactionHash: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe',
+                    blockHash: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657ff',
+                    blockNumber: 789,
+                    address: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd3365800',
+                    data: 'fake raw data',
+                    topics: [],
+                    event: 'TEST_EVENT',
+                    args: [1, 2, 3],
+                },
+            ];
+
+            // mockGetEventsAsync checks its arguments, increments `callCount`
+            // and returns `expectedLogs`.
+            let callCount = 0;
+            const mockGetEventsAsync = async (
+                fromBlock: number,
+                toBlock: number,
+            ): Promise<Array<LogWithDecodedArgs<any>>> => {
+                expect(fromBlock).equals(expectedFromBlock);
+                expect(toBlock).equals(expectedToBlock);
+                callCount += 1;
+                return expectedLogs;
+            };
+
+            // Make sure that we get what we expected and that the mock function
+            // was called exactly once.
+            const gotLogs = await _getEventsWithRetriesAsync(mockGetEventsAsync, 3, expectedFromBlock, expectedToBlock);
+            expect(gotLogs).deep.equals(expectedLogs);
+            expect(callCount).equals(
+                1,
+                'getEventsAsync function was called more than once even though it was successful',
+            );
+        });
+        it('retries and eventually succeeds', async () => {
+            const numRetries = 5;
+            let callCount = 0;
+            // mockGetEventsAsync throws unless callCount == numRetries + 1.
+            const mockGetEventsAsync = async (
+                _fromBlock: number,
+                _toBlock: number,
+            ): Promise<Array<LogWithDecodedArgs<any>>> => {
+                callCount += 1;
+                if (callCount === numRetries + 1) {
+                    return [];
+                }
+                throw retryableError;
+            };
+            await _getEventsWithRetriesAsync(mockGetEventsAsync, numRetries, 100, 300);
+            expect(callCount).equals(numRetries + 1, 'getEventsAsync function was called the wrong number of times');
+        });
+        it('throws for non-retryable errors', async () => {
+            const numRetries = 5;
+            const expectedMessage = 'Non-retryable error';
+            // mockGetEventsAsync always throws a non-retryable error.
+            const mockGetEventsAsync = async (
+                _fromBlock: number,
+                _toBlock: number,
+            ): Promise<Array<LogWithDecodedArgs<any>>> => {
+                throw new Error(expectedMessage);
+            };
+            // Note(albrow): This does actually return a promise (or at least a
+            // "promise-like object" and is a false positive in TSLint.
+            // tslint:disable-next-line:await-promise
+            await expect(_getEventsWithRetriesAsync(mockGetEventsAsync, numRetries, 100, 300)).to.be.rejectedWith(
+                expectedMessage,
+            );
+        });
+        it('throws after too many retries', async () => {
+            const numRetries = 5;
+            // mockGetEventsAsync always throws a retryable error.
+            const mockGetEventsAsync = async (
+                _fromBlock: number,
+                _toBlock: number,
+            ): Promise<Array<LogWithDecodedArgs<any>>> => {
+                throw retryableError;
+            };
+            // Note(albrow): This does actually return a promise (or at least a
+            // "promise-like object" and is a false positive in TSLint.
+            // tslint:disable-next-line:await-promise
+            await expect(_getEventsWithRetriesAsync(mockGetEventsAsync, numRetries, 100, 300)).to.be.rejectedWith(
+                retryableMessage,
+            );
+        });
+    });
+});
diff --git a/packages/pipeline/test/entities/erc20_approval_events_test.ts b/packages/pipeline/test/entities/erc20_approval_events_test.ts
new file mode 100644
index 000000000..1ecf41ee5
--- /dev/null
+++ b/packages/pipeline/test/entities/erc20_approval_events_test.ts
@@ -0,0 +1,29 @@
+import { BigNumber } from '@0x/utils';
+import 'mocha';
+import 'reflect-metadata';
+
+import { ERC20ApprovalEvent } from '../../src/entities';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+// tslint:disable:custom-no-magic-numbers
+describe('ERC20ApprovalEvent entity', () => {
+    it('save/find', async () => {
+        const connection = await createDbConnectionOnceAsync();
+        const event = new ERC20ApprovalEvent();
+        event.tokenAddress = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
+        event.blockNumber = 6281577;
+        event.rawData = '0x000000000000000000000000000000000000000000000002b9cba5ee21ad3df9';
+        event.logIndex = 43;
+        event.transactionHash = '0xcb46b19c786376a0a0140d51e3e606a4c4f926d8ca5434e96d2f69d04d8d9c7f';
+        event.ownerAddress = '0x0b65c5f6f3a05d6be5588a72b603360773b3fe04';
+        event.spenderAddress = '0x448a5065aebb8e423f0896e6c5d525c040f59af3';
+        event.amount = new BigNumber('50281464906893835769');
+        const blocksRepository = connection.getRepository(ERC20ApprovalEvent);
+        await testSaveAndFindEntityAsync(blocksRepository, event);
+    });
+});
diff --git a/packages/pipeline/test/parsers/bloxy/index_test.ts b/packages/pipeline/test/parsers/bloxy/index_test.ts
index 2b8d68f98..6aabb091d 100644
--- a/packages/pipeline/test/parsers/bloxy/index_test.ts
+++ b/packages/pipeline/test/parsers/bloxy/index_test.ts
@@ -7,7 +7,6 @@ import * as R from 'ramda';
 import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../../src/data_sources/bloxy';
 import { DexTrade } from '../../../src/entities';
 import { _parseBloxyTrade } from '../../../src/parsers/bloxy';
-import { _convertToExchangeFillEvent } from '../../../src/parsers/events';
 import { chaiSetup } from '../../utils/chai_setup';
 
 chaiSetup.configure();
diff --git a/packages/pipeline/test/parsers/events/erc20_events_test.ts b/packages/pipeline/test/parsers/events/erc20_events_test.ts
new file mode 100644
index 000000000..962c50f98
--- /dev/null
+++ b/packages/pipeline/test/parsers/events/erc20_events_test.ts
@@ -0,0 +1,54 @@
+import { ERC20TokenApprovalEventArgs } from '@0x/contract-wrappers';
+import { BigNumber } from '@0x/utils';
+import * as chai from 'chai';
+import { LogWithDecodedArgs } from 'ethereum-types';
+import 'mocha';
+
+import { ERC20ApprovalEvent } from '../../../src/entities';
+import { _convertToERC20ApprovalEvent } from '../../../src/parsers/events/erc20_events';
+import { _convertToExchangeFillEvent } from '../../../src/parsers/events/exchange_events';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+// tslint:disable:custom-no-magic-numbers
+describe('erc20_events', () => {
+    describe('_convertToERC20ApprovalEvent', () => {
+        it('converts LogWithDecodedArgs to ERC20ApprovalEvent entity', () => {
+            const input: LogWithDecodedArgs<ERC20TokenApprovalEventArgs> = {
+                address: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+                blockHash: '0xd2d7aafaa7102aec0bca8ef026d5a85133e87892334c46ee1e92e42912991c9b',
+                blockNumber: 6281577,
+                data: '0x000000000000000000000000000000000000000000000002b9cba5ee21ad3df9',
+                logIndex: 43,
+                topics: [
+                    '0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925',
+                    '0x0000000000000000000000000b65c5f6f3a05d6be5588a72b603360773b3fe04',
+                    '0x000000000000000000000000448a5065aebb8e423f0896e6c5d525c040f59af3',
+                ],
+                transactionHash: '0xcb46b19c786376a0a0140d51e3e606a4c4f926d8ca5434e96d2f69d04d8d9c7f',
+                transactionIndex: 103,
+                event: 'Approval',
+                args: {
+                    _owner: '0x0b65c5f6f3a05d6be5588a72b603360773b3fe04',
+                    _spender: '0x448a5065aebb8e423f0896e6c5d525c040f59af3',
+                    _value: new BigNumber('50281464906893835769'),
+                },
+            };
+
+            const expected = new ERC20ApprovalEvent();
+            expected.tokenAddress = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
+            expected.blockNumber = 6281577;
+            expected.rawData = '0x000000000000000000000000000000000000000000000002b9cba5ee21ad3df9';
+            expected.logIndex = 43;
+            expected.transactionHash = '0xcb46b19c786376a0a0140d51e3e606a4c4f926d8ca5434e96d2f69d04d8d9c7f';
+            expected.ownerAddress = '0x0b65c5f6f3a05d6be5588a72b603360773b3fe04';
+            expected.spenderAddress = '0x448a5065aebb8e423f0896e6c5d525c040f59af3';
+            expected.amount = new BigNumber('50281464906893835769');
+
+            const actual = _convertToERC20ApprovalEvent(input);
+            expect(actual).deep.equal(expected);
+        });
+    });
+});
diff --git a/packages/pipeline/test/parsers/events/exchange_events_test.ts b/packages/pipeline/test/parsers/events/exchange_events_test.ts
new file mode 100644
index 000000000..5d4b185a5
--- /dev/null
+++ b/packages/pipeline/test/parsers/events/exchange_events_test.ts
@@ -0,0 +1,78 @@
+import { ExchangeFillEventArgs } from '@0x/contract-wrappers';
+import { BigNumber } from '@0x/utils';
+import * as chai from 'chai';
+import { LogWithDecodedArgs } from 'ethereum-types';
+import 'mocha';
+
+import { ExchangeFillEvent } from '../../../src/entities';
+import { _convertToExchangeFillEvent } from '../../../src/parsers/events/exchange_events';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+// tslint:disable:custom-no-magic-numbers
+describe('exchange_events', () => {
+    describe('_convertToExchangeFillEvent', () => {
+        it('converts LogWithDecodedArgs to ExchangeFillEvent entity', () => {
+            const input: LogWithDecodedArgs<ExchangeFillEventArgs> = {
+                logIndex: 102,
+                transactionIndex: 38,
+                transactionHash: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe',
+                blockHash: '',
+                blockNumber: 6276262,
+                address: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b',
+                data:
+                    '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000000000000000000000000000002386f26fc10000000000000000000000000000000000000000000000000000016345785d8a000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f49800000000000000000000000000000000000000000000000000000000',
+                topics: [
+                    '0x0bcc4c97732e47d9946f229edb95f5b6323f601300e4690de719993f3c371129',
+                    '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428',
+                    '0x000000000000000000000000c370d2a5920344aa6b7d8d11250e3e861434cbdd',
+                    '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a',
+                ],
+                event: 'Fill',
+                args: {
+                    makerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+                    feeRecipientAddress: '0xc370d2a5920344aa6b7d8d11250e3e861434cbdd',
+                    takerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+                    senderAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+                    makerAssetFilledAmount: new BigNumber('10000000000000000'),
+                    takerAssetFilledAmount: new BigNumber('100000000000000000'),
+                    makerFeePaid: new BigNumber('0'),
+                    takerFeePaid: new BigNumber('12345'),
+                    orderHash: '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a',
+                    makerAssetData: '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+                    takerAssetData: '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498',
+                },
+            };
+            const expected = new ExchangeFillEvent();
+            expected.contractAddress = '0x4f833a24e1f95d70f028921e27040ca56e09ab0b';
+            expected.blockNumber = 6276262;
+            expected.logIndex = 102;
+            expected.rawData =
+                '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000000000000000000000000000002386f26fc10000000000000000000000000000000000000000000000000000016345785d8a000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f49800000000000000000000000000000000000000000000000000000000';
+            expected.transactionHash = '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe';
+            expected.makerAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
+            expected.takerAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
+            expected.feeRecipientAddress = '0xc370d2a5920344aa6b7d8d11250e3e861434cbdd';
+            expected.senderAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
+            expected.makerAssetFilledAmount = new BigNumber('10000000000000000');
+            expected.takerAssetFilledAmount = new BigNumber('100000000000000000');
+            expected.makerFeePaid = new BigNumber('0');
+            expected.takerFeePaid = new BigNumber('12345');
+            expected.orderHash = '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a';
+            expected.rawMakerAssetData = '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
+            expected.makerAssetType = 'erc20';
+            expected.makerAssetProxyId = '0xf47261b0';
+            expected.makerTokenAddress = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
+            expected.makerTokenId = null;
+            expected.rawTakerAssetData = '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498';
+            expected.takerAssetType = 'erc20';
+            expected.takerAssetProxyId = '0xf47261b0';
+            expected.takerTokenAddress = '0xe41d2489571d322189246dafa5ebde1f4699f498';
+            expected.takerTokenId = null;
+            const actual = _convertToExchangeFillEvent(input);
+            expect(actual).deep.equal(expected);
+        });
+    });
+});
diff --git a/packages/pipeline/test/parsers/events/index_test.ts b/packages/pipeline/test/parsers/events/index_test.ts
deleted file mode 100644
index 7e439ce39..000000000
--- a/packages/pipeline/test/parsers/events/index_test.ts
+++ /dev/null
@@ -1,78 +0,0 @@
-import { ExchangeFillEventArgs } from '@0x/contract-wrappers';
-import { BigNumber } from '@0x/utils';
-import * as chai from 'chai';
-import { LogWithDecodedArgs } from 'ethereum-types';
-import 'mocha';
-
-import { ExchangeFillEvent } from '../../../src/entities';
-import { _convertToExchangeFillEvent } from '../../../src/parsers/events';
-import { chaiSetup } from '../../utils/chai_setup';
-
-chaiSetup.configure();
-const expect = chai.expect;
-
-// tslint:disable:custom-no-magic-numbers
-describe('exchange_events', () => {
-    describe('_convertToExchangeFillEvent', () => {
-        it('converts LogWithDecodedArgs to ExchangeFillEvent entity', () => {
-            const input: LogWithDecodedArgs<ExchangeFillEventArgs> = {
-                logIndex: 102,
-                transactionIndex: 38,
-                transactionHash: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe',
-                blockHash: '',
-                blockNumber: 6276262,
-                address: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b',
-                data:
-                    '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000000000000000000000000000002386f26fc10000000000000000000000000000000000000000000000000000016345785d8a000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f49800000000000000000000000000000000000000000000000000000000',
-                topics: [
-                    '0x0bcc4c97732e47d9946f229edb95f5b6323f601300e4690de719993f3c371129',
-                    '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428',
-                    '0x000000000000000000000000c370d2a5920344aa6b7d8d11250e3e861434cbdd',
-                    '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a',
-                ],
-                event: 'Fill',
-                args: {
-                    makerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
-                    feeRecipientAddress: '0xc370d2a5920344aa6b7d8d11250e3e861434cbdd',
-                    takerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
-                    senderAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
-                    makerAssetFilledAmount: new BigNumber('10000000000000000'),
-                    takerAssetFilledAmount: new BigNumber('100000000000000000'),
-                    makerFeePaid: new BigNumber('0'),
-                    takerFeePaid: new BigNumber('12345'),
-                    orderHash: '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a',
-                    makerAssetData: '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
-                    takerAssetData: '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498',
-                },
-            };
-            const expected = new ExchangeFillEvent();
-            expected.contractAddress = '0x4f833a24e1f95d70f028921e27040ca56e09ab0b';
-            expected.blockNumber = 6276262;
-            expected.logIndex = 102;
-            expected.rawData =
-                '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000000000000000000000000000002386f26fc10000000000000000000000000000000000000000000000000000016345785d8a000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f49800000000000000000000000000000000000000000000000000000000';
-            expected.transactionHash = '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe';
-            expected.makerAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
-            expected.takerAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
-            expected.feeRecipientAddress = '0xc370d2a5920344aa6b7d8d11250e3e861434cbdd';
-            expected.senderAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
-            expected.makerAssetFilledAmount = new BigNumber('10000000000000000');
-            expected.takerAssetFilledAmount = new BigNumber('100000000000000000');
-            expected.makerFeePaid = new BigNumber('0');
-            expected.takerFeePaid = new BigNumber('12345');
-            expected.orderHash = '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a';
-            expected.rawMakerAssetData = '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
-            expected.makerAssetType = 'erc20';
-            expected.makerAssetProxyId = '0xf47261b0';
-            expected.makerTokenAddress = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
-            expected.makerTokenId = null;
-            expected.rawTakerAssetData = '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498';
-            expected.takerAssetType = 'erc20';
-            expected.takerAssetProxyId = '0xf47261b0';
-            expected.takerTokenAddress = '0xe41d2489571d322189246dafa5ebde1f4699f498';
-            expected.takerTokenId = null;
-            const actual = _convertToExchangeFillEvent(input);
-            expect(actual).deep.equal(expected);
-        });
-    });
-});
-- 
cgit v1.2.3