aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline
diff options
context:
space:
mode:
authorAlex Browne <stephenalexbrowne@gmail.com>2018-12-06 03:08:19 +0800
committerGitHub <noreply@github.com>2018-12-06 03:08:19 +0800
commitb2dd5495bcf13a9ea71498b5def12c75589b0156 (patch)
tree0e0d728d540e747c32a083d604d7916a35ea95cf /packages/pipeline
parent72a30260d88e722a6b076134693360c573f6c70f (diff)
parente0348f9c044b4909260e4864398b4f50232da620 (diff)
downloaddexon-sol-tools-b2dd5495bcf13a9ea71498b5def12c75589b0156.tar
dexon-sol-tools-b2dd5495bcf13a9ea71498b5def12c75589b0156.tar.gz
dexon-sol-tools-b2dd5495bcf13a9ea71498b5def12c75589b0156.tar.bz2
dexon-sol-tools-b2dd5495bcf13a9ea71498b5def12c75589b0156.tar.lz
dexon-sol-tools-b2dd5495bcf13a9ea71498b5def12c75589b0156.tar.xz
dexon-sol-tools-b2dd5495bcf13a9ea71498b5def12c75589b0156.tar.zst
dexon-sol-tools-b2dd5495bcf13a9ea71498b5def12c75589b0156.zip
Merge pull request #1377 from 0xProject/feature/pipeline-cleanup-mega-rebase
Merge all pipeline code into development
Diffstat (limited to 'packages/pipeline')
-rw-r--r--packages/pipeline/.npmignore7
-rw-r--r--packages/pipeline/README.md166
-rw-r--r--packages/pipeline/coverage/.gitkeep0
-rw-r--r--packages/pipeline/migrations/1542070840010-InitialSchema.ts187
-rw-r--r--packages/pipeline/migrations/1542147915364-NewSraOrderTimestampFormat.ts48
-rw-r--r--packages/pipeline/migrations/1542152278484-RenameSraOrdersFilledAmounts.ts13
-rw-r--r--packages/pipeline/migrations/1542234704666-ConvertBigNumberToNumeric.ts53
-rw-r--r--packages/pipeline/migrations/1542249766882-AddHomepageUrlToRelayers.ts14
-rw-r--r--packages/pipeline/migrations/1542401122477-MakeTakerAddressNullable.ts17
-rw-r--r--packages/pipeline/migrations/1542655823221-NewMetadataAndOHLCVTables.ts60
-rw-r--r--packages/pipeline/migrations/1543434472116-TokenOrderbookSnapshots.ts30
-rw-r--r--packages/pipeline/migrations/1543446690436-CreateDexTrades.ts41
-rw-r--r--packages/pipeline/migrations/1543980079179-ConvertTokenMetadataDecimalsToBigNumber.ts17
-rw-r--r--packages/pipeline/migrations/1543983324954-ConvertTransactionGasPriceToBigNumber.ts19
-rw-r--r--packages/pipeline/package.json65
-rw-r--r--packages/pipeline/src/data_sources/bloxy/index.ts133
-rw-r--r--packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts85
-rw-r--r--packages/pipeline/src/data_sources/ddex/index.ts78
-rw-r--r--packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts94
-rw-r--r--packages/pipeline/src/data_sources/paradex/index.ts92
-rw-r--r--packages/pipeline/src/data_sources/relayer-registry/index.ts33
-rw-r--r--packages/pipeline/src/data_sources/trusted_tokens/index.ts29
-rw-r--r--packages/pipeline/src/data_sources/web3/index.ts22
-rw-r--r--packages/pipeline/src/entities/block.ts13
-rw-r--r--packages/pipeline/src/entities/dex_trade.ts54
-rw-r--r--packages/pipeline/src/entities/exchange_cancel_event.ts51
-rw-r--r--packages/pipeline/src/entities/exchange_cancel_up_to_event.ts26
-rw-r--r--packages/pipeline/src/entities/exchange_fill_event.ts60
-rw-r--r--packages/pipeline/src/entities/index.ts18
-rw-r--r--packages/pipeline/src/entities/ohlcv_external.ts30
-rw-r--r--packages/pipeline/src/entities/relayer.ts21
-rw-r--r--packages/pipeline/src/entities/sra_order.ts63
-rw-r--r--packages/pipeline/src/entities/sra_order_observed_timestamp.ts32
-rw-r--r--packages/pipeline/src/entities/token_metadata.ts22
-rw-r--r--packages/pipeline/src/entities/token_order.ts29
-rw-r--r--packages/pipeline/src/entities/transaction.ts19
-rw-r--r--packages/pipeline/src/ormconfig.ts42
-rw-r--r--packages/pipeline/src/parsers/bloxy/index.ts53
-rw-r--r--packages/pipeline/src/parsers/ddex_orders/index.ts77
-rw-r--r--packages/pipeline/src/parsers/events/index.ts133
-rw-r--r--packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts38
-rw-r--r--packages/pipeline/src/parsers/paradex_orders/index.ts66
-rw-r--r--packages/pipeline/src/parsers/relayer_registry/index.ts37
-rw-r--r--packages/pipeline/src/parsers/sra_orders/index.ts62
-rw-r--r--packages/pipeline/src/parsers/token_metadata/index.ts47
-rw-r--r--packages/pipeline/src/parsers/web3/index.ts49
-rw-r--r--packages/pipeline/src/scripts/pull_competing_dex_trades.ts51
-rw-r--r--packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts55
-rw-r--r--packages/pipeline/src/scripts/pull_missing_blocks.ts80
-rw-r--r--packages/pipeline/src/scripts/pull_missing_events.ts136
-rw-r--r--packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts101
-rw-r--r--packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts87
-rw-r--r--packages/pipeline/src/scripts/pull_radar_relay_orders.ts53
-rw-r--r--packages/pipeline/src/scripts/pull_trusted_tokens.ts52
-rw-r--r--packages/pipeline/src/scripts/update_relayer_info.ts33
-rw-r--r--packages/pipeline/src/types.ts2
-rw-r--r--packages/pipeline/src/utils/constants.ts3
-rw-r--r--packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts92
-rw-r--r--packages/pipeline/src/utils/index.ts38
-rw-r--r--packages/pipeline/src/utils/transformers/big_number.ts16
-rw-r--r--packages/pipeline/src/utils/transformers/index.ts2
-rw-r--r--packages/pipeline/src/utils/transformers/number_to_bigint.ts27
-rw-r--r--packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts47
-rw-r--r--packages/pipeline/test/db_global_hooks.ts9
-rw-r--r--packages/pipeline/test/db_setup.ts174
-rw-r--r--packages/pipeline/test/entities/block_test.ts23
-rw-r--r--packages/pipeline/test/entities/dex_trades_test.ts60
-rw-r--r--packages/pipeline/test/entities/exchange_cancel_event_test.ts57
-rw-r--r--packages/pipeline/test/entities/exchange_cancel_up_to_event_test.ts29
-rw-r--r--packages/pipeline/test/entities/exchange_fill_event_test.ts62
-rw-r--r--packages/pipeline/test/entities/ohlcv_external_test.ts35
-rw-r--r--packages/pipeline/test/entities/relayer_test.ts55
-rw-r--r--packages/pipeline/test/entities/sra_order_test.ts84
-rw-r--r--packages/pipeline/test/entities/token_metadata_test.ts39
-rw-r--r--packages/pipeline/test/entities/token_order_test.ts31
-rw-r--r--packages/pipeline/test/entities/transaction_test.ts26
-rw-r--r--packages/pipeline/test/entities/util.ts25
-rw-r--r--packages/pipeline/test/parsers/bloxy/index_test.ts99
-rw-r--r--packages/pipeline/test/parsers/ddex_orders/index_test.ts66
-rw-r--r--packages/pipeline/test/parsers/events/index_test.ts78
-rw-r--r--packages/pipeline/test/parsers/ohlcv_external/crypto_compare_test.ts62
-rw-r--r--packages/pipeline/test/parsers/paradex_orders/index_test.ts54
-rw-r--r--packages/pipeline/test/parsers/sra_orders/index_test.ts68
-rw-r--r--packages/pipeline/test/utils/chai_setup.ts13
-rw-r--r--packages/pipeline/tsconfig.json10
-rw-r--r--packages/pipeline/tslint.json3
-rw-r--r--packages/pipeline/typedoc-tsconfig.json10
87 files changed, 4392 insertions, 0 deletions
diff --git a/packages/pipeline/.npmignore b/packages/pipeline/.npmignore
new file mode 100644
index 000000000..89302c908
--- /dev/null
+++ b/packages/pipeline/.npmignore
@@ -0,0 +1,7 @@
+.*
+yarn-error.log
+/scripts/
+/generated_docs/
+/src/
+tsconfig.json
+/lib/monorepo_scripts/
diff --git a/packages/pipeline/README.md b/packages/pipeline/README.md
new file mode 100644
index 000000000..794488cac
--- /dev/null
+++ b/packages/pipeline/README.md
@@ -0,0 +1,166 @@
+## @0xproject/pipeline
+
+This repository contains scripts used for scraping data from the Ethereum blockchain into SQL tables for analysis by the 0x team.
+
+## Contributing
+
+We strongly recommend that the community help us make improvements and determine the future direction of the protocol. To report bugs within this package, please create an issue in this repository.
+
+Please read our [contribution guidelines](../../CONTRIBUTING.md) before getting started.
+
+### Install dependencies:
+
+```bash
+yarn install
+```
+
+### Build
+
+```bash
+yarn build
+```
+
+### Clean
+
+```bash
+yarn clean
+```
+
+### Lint
+
+```bash
+yarn lint
+```
+
+### Migrations
+
+Create a new migration: `yarn migrate:create --name MigrationNameInCamelCase`
+Run migrations: `yarn migrate:run`
+Revert the most recent migration (CAUTION: may result in data loss!): `yarn migrate:revert`
+
+## Testing
+
+There are several test scripts in **package.json**. You can run all the tests
+with `yarn test:all` or run certain tests seprately by following the
+instructions below. Some tests may not work out of the box on certain platforms
+or operating systems (see the "Database tests" section below).
+
+### Unit tests
+
+The unit tests can be run with `yarn test`. These tests don't depend on any
+services or databases and will run in any environment that can run Node.
+
+### Database tests
+
+Database integration tests can be run with `yarn test:db`. These tests will
+attempt to automatically spin up a Postgres database via Docker. If this doesn't
+work you have two other options:
+
+1. Set the `DOCKER_SOCKET` environment variable to a valid socket path to use
+ for communicating with Docker.
+2. Start Postgres manually and set the `ZEROEX_DATA_PIPELINE_TEST_DB_URL`
+ environment variable. If this is set, the tests will use your existing
+ Postgres database instead of trying to create one with Docker.
+
+## Running locally
+
+`pipeline` requires access to a PostgreSQL database. The easiest way to start
+Postgres is via Docker. Depending on your platform, you may need to prepend
+`sudo` to the following command:
+
+```
+docker run --rm -d -p 5432:5432 --name pipeline_postgres postgres:11-alpine
+```
+
+This will start a Postgres server with the default username and database name
+(`postgres` and `postgres`). You should set the environment variable as follows:
+
+```
+export ZEROEX_DATA_PIPELINE_DB_URL=postgresql://postgres@localhost/postgres
+```
+
+First thing you will need to do is run the migrations:
+
+```
+yarn migrate:run
+```
+
+Now you can run scripts locally:
+
+```
+node packages/pipeline/lib/src/scripts/pull_radar_relay_orders.js
+```
+
+To stop the Postgres server (you may need to add `sudo`):
+
+```
+docker stop pipeline_postgres
+```
+
+This will remove all data from the database.
+
+If you prefer, you can also install Postgres with e.g.,
+[Homebrew](https://wiki.postgresql.org/wiki/Homebrew) or
+[Postgress.app](https://postgresapp.com/). Keep in mind that you will need to
+set the`ZEROEX_DATA_PIPELINE_DB_URL` environment variable to a valid
+[PostgreSQL connection url](https://stackoverflow.com/questions/3582552/postgresql-connection-url)
+
+## Directory structure
+
+```
+.
+├── lib: Code generated by the TypeScript compiler. Don't edit this directly.
+├── migrations: Code for creating and updating database schemas.
+├── node_modules:
+├── src: All TypeScript source code.
+│   ├── data_sources: Code responsible for getting raw data, typically from a third-party source.
+│   ├── entities: TypeORM entities which closely mirror our database schemas. Some other ORMs call these "models".
+│   ├── parsers: Code for converting raw data into entities.
+│   ├── scripts: Executable scripts which put all the pieces together.
+│   └── utils: Various utils used across packages/files.
+├── test: All tests go here and are organized in the same way as the folder/file that they test.
+```
+
+## Adding new data to the pipeline
+
+1. Create an entity in the _entities_ directory. Entities directly mirror our
+ database schemas. We follow the practice of having "dumb" entities, so
+ entity classes should typically not have any methods.
+2. Create a migration using the `yarn migrate:create` command. Create/update
+ tables as needed. Remember to fill in both the `up` and `down` methods. Try
+ to avoid data loss as much as possible in your migrations.
+3. Add basic tests for your entity and migrations to the **test/entities/**
+ directory.
+4. Create a class or function in the **data_sources/** directory for getting
+ raw data. This code should abstract away pagination and rate-limiting as
+ much as possible.
+5. Create a class or function in the **parsers/** directory for converting the
+ raw data into an entity. Also add tests in the **tests/** directory to test
+ the parser.
+6. Create an executable script in the **scripts/** directory for putting
+ everything together. Your script can accept environment variables for things
+ like API keys. It should pull the data, parse it, and save it to the
+ database. Scripts should be idempotent and atomic (when possible). What this
+ means is that your script may be responsible for determining _which_ data
+ needs to be updated. For example, you may need to query the database to find
+ the most recent block number that we have already pulled, then pull new data
+ starting from that block number.
+7. Run the migrations and then run your new script locally and verify it works
+ as expected.
+
+#### Additional guidelines and tips:
+
+* Table names should be plural and separated by underscores (e.g.,
+ `exchange_fill_events`).
+* Any table which contains data which comes directly from a third-party source
+ should be namespaced in the `raw` PostgreSQL schema.
+* Column names in the database should be separated by underscores (e.g.,
+ `maker_asset_type`).
+* Field names in entity classes (like any other fields in TypeScript) should
+ be camel-cased (e.g., `makerAssetType`).
+* All timestamps should be stored as milliseconds since the Unix Epoch.
+* Use the `BigNumber` type for TypeScript code which deals with 256-bit
+ numbers from smart contracts or for any case where we are dealing with large
+ floating point numbers.
+* [TypeORM documentation](http://typeorm.io/#/) is pretty robust and can be a
+ helpful resource.
diff --git a/packages/pipeline/coverage/.gitkeep b/packages/pipeline/coverage/.gitkeep
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/packages/pipeline/coverage/.gitkeep
diff --git a/packages/pipeline/migrations/1542070840010-InitialSchema.ts b/packages/pipeline/migrations/1542070840010-InitialSchema.ts
new file mode 100644
index 000000000..895f9e6c9
--- /dev/null
+++ b/packages/pipeline/migrations/1542070840010-InitialSchema.ts
@@ -0,0 +1,187 @@
+import { MigrationInterface, QueryRunner, Table } from 'typeorm';
+
+const blocks = new Table({
+ name: 'raw.blocks',
+ columns: [
+ { name: 'number', type: 'bigint', isPrimary: true },
+ { name: 'hash', type: 'varchar', isPrimary: true },
+ { name: 'timestamp', type: 'bigint' },
+ ],
+});
+
+const exchange_cancel_events = new Table({
+ name: 'raw.exchange_cancel_events',
+ columns: [
+ { name: 'contract_address', type: 'char(42)', isPrimary: true },
+ { name: 'log_index', type: 'integer', isPrimary: true },
+ { name: 'block_number', type: 'bigint', isPrimary: true },
+
+ { name: 'raw_data', type: 'varchar' },
+
+ { name: 'transaction_hash', type: 'varchar' },
+ { name: 'maker_address', type: 'char(42)' },
+ { name: 'taker_address', type: 'char(42)' },
+ { name: 'fee_recipient_address', type: 'char(42)' },
+ { name: 'sender_address', type: 'char(42)' },
+ { name: 'order_hash', type: 'varchar' },
+
+ { name: 'raw_maker_asset_data', type: 'varchar' },
+ { name: 'maker_asset_type', type: 'varchar' },
+ { name: 'maker_asset_proxy_id', type: 'varchar' },
+ { name: 'maker_token_address', type: 'char(42)' },
+ { name: 'maker_token_id', type: 'varchar', isNullable: true },
+ { name: 'raw_taker_asset_data', type: 'varchar' },
+ { name: 'taker_asset_type', type: 'varchar' },
+ { name: 'taker_asset_proxy_id', type: 'varchar' },
+ { name: 'taker_token_address', type: 'char(42)' },
+ { name: 'taker_token_id', type: 'varchar', isNullable: true },
+ ],
+});
+
+const exchange_cancel_up_to_events = new Table({
+ name: 'raw.exchange_cancel_up_to_events',
+ columns: [
+ { name: 'contract_address', type: 'char(42)', isPrimary: true },
+ { name: 'log_index', type: 'integer', isPrimary: true },
+ { name: 'block_number', type: 'bigint', isPrimary: true },
+
+ { name: 'raw_data', type: 'varchar' },
+
+ { name: 'transaction_hash', type: 'varchar' },
+ { name: 'maker_address', type: 'char(42)' },
+ { name: 'sender_address', type: 'char(42)' },
+ { name: 'order_epoch', type: 'varchar' },
+ ],
+});
+
+const exchange_fill_events = new Table({
+ name: 'raw.exchange_fill_events',
+ columns: [
+ { name: 'contract_address', type: 'char(42)', isPrimary: true },
+ { name: 'log_index', type: 'integer', isPrimary: true },
+ { name: 'block_number', type: 'bigint', isPrimary: true },
+
+ { name: 'raw_data', type: 'varchar' },
+
+ { name: 'transaction_hash', type: 'varchar' },
+ { name: 'maker_address', type: 'char(42)' },
+ { name: 'taker_address', type: 'char(42)' },
+ { name: 'fee_recipient_address', type: 'char(42)' },
+ { name: 'sender_address', type: 'char(42)' },
+ { name: 'maker_asset_filled_amount', type: 'varchar' },
+ { name: 'taker_asset_filled_amount', type: 'varchar' },
+ { name: 'maker_fee_paid', type: 'varchar' },
+ { name: 'taker_fee_paid', type: 'varchar' },
+ { name: 'order_hash', type: 'varchar' },
+
+ { name: 'raw_maker_asset_data', type: 'varchar' },
+ { name: 'maker_asset_type', type: 'varchar' },
+ { name: 'maker_asset_proxy_id', type: 'varchar' },
+ { name: 'maker_token_address', type: 'char(42)' },
+ { name: 'maker_token_id', type: 'varchar', isNullable: true },
+ { name: 'raw_taker_asset_data', type: 'varchar' },
+ { name: 'taker_asset_type', type: 'varchar' },
+ { name: 'taker_asset_proxy_id', type: 'varchar' },
+ { name: 'taker_token_address', type: 'char(42)' },
+ { name: 'taker_token_id', type: 'varchar', isNullable: true },
+ ],
+});
+
+const relayers = new Table({
+ name: 'raw.relayers',
+ columns: [
+ { name: 'uuid', type: 'varchar', isPrimary: true },
+ { name: 'name', type: 'varchar' },
+ { name: 'sra_http_endpoint', type: 'varchar', isNullable: true },
+ { name: 'sra_ws_endpoint', type: 'varchar', isNullable: true },
+ { name: 'app_url', type: 'varchar', isNullable: true },
+ { name: 'fee_recipient_addresses', type: 'char(42)', isArray: true },
+ { name: 'taker_addresses', type: 'char(42)', isArray: true },
+ ],
+});
+
+const sra_orders = new Table({
+ name: 'raw.sra_orders',
+ columns: [
+ { name: 'exchange_address', type: 'char(42)', isPrimary: true },
+ { name: 'order_hash_hex', type: 'varchar', isPrimary: true },
+
+ { name: 'source_url', type: 'varchar' },
+ { name: 'last_updated_timestamp', type: 'bigint' },
+ { name: 'first_seen_timestamp', type: 'bigint' },
+
+ { name: 'maker_address', type: 'char(42)' },
+ { name: 'taker_address', type: 'char(42)' },
+ { name: 'fee_recipient_address', type: 'char(42)' },
+ { name: 'sender_address', type: 'char(42)' },
+ { name: 'maker_asset_filled_amount', type: 'varchar' },
+ { name: 'taker_asset_filled_amount', type: 'varchar' },
+ { name: 'maker_fee', type: 'varchar' },
+ { name: 'taker_fee', type: 'varchar' },
+ { name: 'expiration_time_seconds', type: 'int' },
+ { name: 'salt', type: 'varchar' },
+ { name: 'signature', type: 'varchar' },
+
+ { name: 'raw_maker_asset_data', type: 'varchar' },
+ { name: 'maker_asset_type', type: 'varchar' },
+ { name: 'maker_asset_proxy_id', type: 'varchar' },
+ { name: 'maker_token_address', type: 'char(42)' },
+ { name: 'maker_token_id', type: 'varchar', isNullable: true },
+ { name: 'raw_taker_asset_data', type: 'varchar' },
+ { name: 'taker_asset_type', type: 'varchar' },
+ { name: 'taker_asset_proxy_id', type: 'varchar' },
+ { name: 'taker_token_address', type: 'char(42)' },
+ { name: 'taker_token_id', type: 'varchar', isNullable: true },
+
+ { name: 'metadata_json', type: 'varchar' },
+ ],
+});
+
+const token_on_chain_metadata = new Table({
+ name: 'raw.token_on_chain_metadata',
+ columns: [
+ { name: 'address', type: 'char(42)', isPrimary: true },
+ { name: 'decimals', type: 'integer' },
+ { name: 'symbol', type: 'varchar' },
+ { name: 'name', type: 'varchar' },
+ ],
+});
+
+const transactions = new Table({
+ name: 'raw.transactions',
+ columns: [
+ { name: 'block_number', type: 'bigint', isPrimary: true },
+ { name: 'block_hash', type: 'varchar', isPrimary: true },
+ { name: 'transaction_hash', type: 'varchar', isPrimary: true },
+ { name: 'gas_used', type: 'bigint' },
+ { name: 'gas_price', type: 'bigint' },
+ ],
+});
+
+export class InitialSchema1542070840010 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.createSchema('raw');
+
+ await queryRunner.createTable(blocks);
+ await queryRunner.createTable(exchange_cancel_events);
+ await queryRunner.createTable(exchange_cancel_up_to_events);
+ await queryRunner.createTable(exchange_fill_events);
+ await queryRunner.createTable(relayers);
+ await queryRunner.createTable(sra_orders);
+ await queryRunner.createTable(token_on_chain_metadata);
+ await queryRunner.createTable(transactions);
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.dropTable(blocks.name);
+ await queryRunner.dropTable(exchange_cancel_events.name);
+ await queryRunner.dropTable(exchange_cancel_up_to_events.name);
+ await queryRunner.dropTable(exchange_fill_events.name);
+ await queryRunner.dropTable(relayers.name);
+ await queryRunner.dropTable(sra_orders.name);
+ await queryRunner.dropTable(token_on_chain_metadata.name);
+ await queryRunner.dropTable(transactions.name);
+
+ await queryRunner.dropSchema('raw');
+ }
+}
diff --git a/packages/pipeline/migrations/1542147915364-NewSraOrderTimestampFormat.ts b/packages/pipeline/migrations/1542147915364-NewSraOrderTimestampFormat.ts
new file mode 100644
index 000000000..5a8f3fec8
--- /dev/null
+++ b/packages/pipeline/migrations/1542147915364-NewSraOrderTimestampFormat.ts
@@ -0,0 +1,48 @@
+import { MigrationInterface, QueryRunner, Table } from 'typeorm';
+
+export class NewSraOrderTimestampFormat1542147915364 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(
+ `ALTER TABLE raw.sra_orders
+ DROP CONSTRAINT "PK_09bfb9980715329563bd53d667e",
+ ADD PRIMARY KEY (order_hash_hex, exchange_address, source_url);
+ `,
+ );
+
+ await queryRunner.query(
+ `CREATE TABLE raw.sra_orders_observed_timestamps (
+ order_hash_hex varchar NOT NULL,
+ exchange_address varchar NOT NULL,
+ source_url varchar NOT NULL,
+ observed_timestamp bigint NOT NULL,
+ FOREIGN KEY
+ (order_hash_hex, exchange_address, source_url)
+ REFERENCES raw.sra_orders (order_hash_hex, exchange_address, source_url),
+ PRIMARY KEY (order_hash_hex, exchange_address, source_url, observed_timestamp)
+ );`,
+ );
+
+ await queryRunner.query(
+ `ALTER TABLE raw.sra_orders
+ DROP COLUMN last_updated_timestamp,
+ DROP COLUMN first_seen_timestamp;`,
+ );
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.dropTable('raw.sra_orders_observed_timestamps');
+
+ await queryRunner.query(
+ `ALTER TABLE raw.sra_orders
+ ADD COLUMN last_updated_timestamp bigint NOT NULL DEFAULT 0,
+ ADD COLUMN first_seen_timestamp bigint NOT NULL DEFAULT 0;`,
+ );
+
+ await queryRunner.query(
+ `ALTER TABLE raw.sra_orders
+ DROP CONSTRAINT sra_orders_pkey,
+ ADD CONSTRAINT "PK_09bfb9980715329563bd53d667e" PRIMARY KEY ("exchange_address", "order_hash_hex");
+ `,
+ );
+ }
+}
diff --git a/packages/pipeline/migrations/1542152278484-RenameSraOrdersFilledAmounts.ts b/packages/pipeline/migrations/1542152278484-RenameSraOrdersFilledAmounts.ts
new file mode 100644
index 000000000..a13e3efa5
--- /dev/null
+++ b/packages/pipeline/migrations/1542152278484-RenameSraOrdersFilledAmounts.ts
@@ -0,0 +1,13 @@
+import { MigrationInterface, QueryRunner } from 'typeorm';
+
+export class RenameSraOrdersFilledAmounts1542152278484 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.renameColumn('raw.sra_orders', 'maker_asset_filled_amount', 'maker_asset_amount');
+ await queryRunner.renameColumn('raw.sra_orders', 'taker_asset_filled_amount', 'taker_asset_amount');
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.renameColumn('raw.sra_orders', 'maker_asset_amount', 'maker_asset_filled_amount');
+ await queryRunner.renameColumn('raw.sra_orders', 'taker_asset_amount', 'taker_asset_filled_amount');
+ }
+}
diff --git a/packages/pipeline/migrations/1542234704666-ConvertBigNumberToNumeric.ts b/packages/pipeline/migrations/1542234704666-ConvertBigNumberToNumeric.ts
new file mode 100644
index 000000000..5200ef7cc
--- /dev/null
+++ b/packages/pipeline/migrations/1542234704666-ConvertBigNumberToNumeric.ts
@@ -0,0 +1,53 @@
+import { MigrationInterface, QueryRunner } from 'typeorm';
+
+export class ConvertBigNumberToNumeric1542234704666 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(
+ `ALTER TABLE raw.exchange_fill_events
+ ALTER COLUMN maker_asset_filled_amount TYPE numeric USING maker_asset_filled_amount::numeric,
+ ALTER COLUMN taker_asset_filled_amount TYPE numeric USING taker_asset_filled_amount::numeric,
+ ALTER COLUMN maker_fee_paid TYPE numeric USING maker_fee_paid::numeric,
+ ALTER COLUMN taker_fee_paid TYPE numeric USING taker_fee_paid::numeric;`,
+ );
+
+ await queryRunner.query(
+ `ALTER TABLE raw.exchange_cancel_up_to_events
+ ALTER COLUMN order_epoch TYPE numeric USING order_epoch::numeric;`,
+ );
+
+ await queryRunner.query(
+ `ALTER TABLE raw.sra_orders
+ ALTER COLUMN maker_asset_amount TYPE numeric USING maker_asset_amount::numeric,
+ ALTER COLUMN taker_asset_amount TYPE numeric USING taker_asset_amount::numeric,
+ ALTER COLUMN maker_fee TYPE numeric USING maker_fee::numeric,
+ ALTER COLUMN taker_fee TYPE numeric USING taker_fee::numeric,
+ ALTER COLUMN expiration_time_seconds TYPE numeric USING expiration_time_seconds::numeric,
+ ALTER COLUMN salt TYPE numeric USING salt::numeric;`,
+ );
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(
+ `ALTER TABLE raw.sra_orders
+ ALTER COLUMN maker_asset_amount TYPE varchar USING maker_asset_amount::varchar,
+ ALTER COLUMN taker_asset_amount TYPE varchar USING taker_asset_amount::varchar,
+ ALTER COLUMN maker_fee TYPE varchar USING maker_fee::varchar,
+ ALTER COLUMN taker_fee TYPE varchar USING taker_fee::varchar,
+ ALTER COLUMN expiration_time_seconds TYPE varchar USING expiration_time_seconds::varchar,
+ ALTER COLUMN salt TYPE varchar USING salt::varchar;`,
+ );
+
+ await queryRunner.query(
+ `ALTER TABLE raw.exchange_cancel_up_to_events
+ ALTER COLUMN order_epoch TYPE varchar USING order_epoch::varchar;`,
+ );
+
+ await queryRunner.query(
+ `ALTER TABLE raw.exchange_fill_events
+ ALTER COLUMN maker_asset_filled_amount TYPE varchar USING maker_asset_filled_amount::varchar,
+ ALTER COLUMN taker_asset_filled_amount TYPE varchar USING taker_asset_filled_amount::varchar,
+ ALTER COLUMN maker_fee_paid TYPE varchar USING maker_fee_paid::varchar,
+ ALTER COLUMN taker_fee_paid TYPE varchar USING taker_fee_paid::varchar;`,
+ );
+ }
+}
diff --git a/packages/pipeline/migrations/1542249766882-AddHomepageUrlToRelayers.ts b/packages/pipeline/migrations/1542249766882-AddHomepageUrlToRelayers.ts
new file mode 100644
index 000000000..9a4811ad5
--- /dev/null
+++ b/packages/pipeline/migrations/1542249766882-AddHomepageUrlToRelayers.ts
@@ -0,0 +1,14 @@
+import { MigrationInterface, QueryRunner, TableColumn } from 'typeorm';
+
+export class AddHomepageUrlToRelayers1542249766882 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.addColumn(
+ 'raw.relayers',
+ new TableColumn({ name: 'homepage_url', type: 'varchar', default: `'unknown'` }),
+ );
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.dropColumn('raw.relayers', 'homepage_url');
+ }
+}
diff --git a/packages/pipeline/migrations/1542401122477-MakeTakerAddressNullable.ts b/packages/pipeline/migrations/1542401122477-MakeTakerAddressNullable.ts
new file mode 100644
index 000000000..957c85a36
--- /dev/null
+++ b/packages/pipeline/migrations/1542401122477-MakeTakerAddressNullable.ts
@@ -0,0 +1,17 @@
+import { MigrationInterface, QueryRunner } from 'typeorm';
+
+export class MakeTakerAddressNullable1542401122477 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(
+ `ALTER TABLE raw.exchange_cancel_events
+ ALTER COLUMN taker_address DROP NOT NULL;`,
+ );
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(
+ `ALTER TABLE raw.exchange_cancel_events
+ ALTER COLUMN taker_address SET NOT NULL;`,
+ );
+ }
+}
diff --git a/packages/pipeline/migrations/1542655823221-NewMetadataAndOHLCVTables.ts b/packages/pipeline/migrations/1542655823221-NewMetadataAndOHLCVTables.ts
new file mode 100644
index 000000000..838f5ba9c
--- /dev/null
+++ b/packages/pipeline/migrations/1542655823221-NewMetadataAndOHLCVTables.ts
@@ -0,0 +1,60 @@
+import { MigrationInterface, QueryRunner } from 'typeorm';
+
+export class NewMetadataAndOHLCVTables1542655823221 implements MigrationInterface {
+ // tslint:disable-next-line
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(`
+ CREATE TABLE raw.token_metadata (
+ address VARCHAR NOT NULL,
+ authority VARCHAR NOT NULL,
+ decimals INT NULL,
+ symbol VARCHAR NULL,
+ name VARCHAR NULL,
+
+ PRIMARY KEY (address, authority)
+ );
+ `);
+
+ await queryRunner.dropTable('raw.token_on_chain_metadata');
+
+ await queryRunner.query(`
+ CREATE TABLE raw.ohlcv_external (
+ exchange VARCHAR NOT NULL,
+ from_symbol VARCHAR NOT NULL,
+ to_symbol VARCHAR NOT NULL,
+ start_time BIGINT NOT NULL,
+ end_time BIGINT NOT NULL,
+
+ open DOUBLE PRECISION NOT NULL,
+ close DOUBLE PRECISION NOT NULL,
+ low DOUBLE PRECISION NOT NULL,
+ high DOUBLE PRECISION NOT NULL,
+ volume_from DOUBLE PRECISION NOT NULL,
+ volume_to DOUBLE PRECISION NOT NULL,
+
+ source VARCHAR NOT NULL,
+ observed_timestamp BIGINT NOT NULL,
+
+ PRIMARY KEY (exchange, from_symbol, to_symbol, start_time, end_time, source, observed_timestamp)
+ );
+ `);
+ }
+
+ // tslint:disable-next-line
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(`
+ CREATE TABLE raw.token_on_chain_metadata (
+ address VARCHAR NOT NULL,
+ decimals INT NULL,
+ symbol VARCHAR NULL,
+ name VARCHAR NULL,
+
+ PRIMARY KEY (address)
+ );
+ `);
+
+ await queryRunner.dropTable('raw.token_metadata');
+
+ await queryRunner.dropTable('raw.ohlcv_external');
+ }
+}
diff --git a/packages/pipeline/migrations/1543434472116-TokenOrderbookSnapshots.ts b/packages/pipeline/migrations/1543434472116-TokenOrderbookSnapshots.ts
new file mode 100644
index 000000000..a7117c753
--- /dev/null
+++ b/packages/pipeline/migrations/1543434472116-TokenOrderbookSnapshots.ts
@@ -0,0 +1,30 @@
+import { MigrationInterface, QueryRunner, Table } from 'typeorm';
+
+const tokenOrderbookSnapshots = new Table({
+ name: 'raw.token_orderbook_snapshots',
+ columns: [
+ { name: 'observed_timestamp', type: 'bigint', isPrimary: true },
+ { name: 'source', type: 'varchar', isPrimary: true },
+ { name: 'order_type', type: 'order_t' },
+ { name: 'price', type: 'numeric', isPrimary: true },
+
+ { name: 'base_asset_symbol', type: 'varchar', isPrimary: true },
+ { name: 'base_asset_address', type: 'char(42)' },
+ { name: 'base_volume', type: 'numeric' },
+
+ { name: 'quote_asset_symbol', type: 'varchar', isPrimary: true },
+ { name: 'quote_asset_address', type: 'char(42)' },
+ { name: 'quote_volume', type: 'numeric' },
+ ],
+});
+
+export class TokenOrderbookSnapshots1543434472116 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(`CREATE TYPE order_t AS enum('bid', 'ask');`);
+ await queryRunner.createTable(tokenOrderbookSnapshots);
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.dropTable(tokenOrderbookSnapshots.name);
+ }
+}
diff --git a/packages/pipeline/migrations/1543446690436-CreateDexTrades.ts b/packages/pipeline/migrations/1543446690436-CreateDexTrades.ts
new file mode 100644
index 000000000..267cf144b
--- /dev/null
+++ b/packages/pipeline/migrations/1543446690436-CreateDexTrades.ts
@@ -0,0 +1,41 @@
+import { MigrationInterface, QueryRunner, Table } from 'typeorm';
+
+const dexTrades = new Table({
+ name: 'raw.dex_trades',
+ columns: [
+ { name: 'source_url', type: 'varchar', isPrimary: true },
+ { name: 'tx_hash', type: 'varchar', isPrimary: true },
+
+ { name: 'tx_timestamp', type: 'bigint' },
+ { name: 'tx_date', type: 'varchar' },
+ { name: 'tx_sender', type: 'varchar(42)' },
+ { name: 'smart_contract_id', type: 'bigint' },
+ { name: 'smart_contract_address', type: 'varchar(42)' },
+ { name: 'contract_type', type: 'varchar' },
+ { name: 'maker', type: 'varchar(42)' },
+ { name: 'taker', type: 'varchar(42)' },
+ { name: 'amount_buy', type: 'numeric' },
+ { name: 'maker_fee_amount', type: 'numeric' },
+ { name: 'buy_currency_id', type: 'bigint' },
+ { name: 'buy_symbol', type: 'varchar' },
+ { name: 'amount_sell', type: 'numeric' },
+ { name: 'taker_fee_amount', type: 'numeric' },
+ { name: 'sell_currency_id', type: 'bigint' },
+ { name: 'sell_symbol', type: 'varchar' },
+ { name: 'maker_annotation', type: 'varchar' },
+ { name: 'taker_annotation', type: 'varchar' },
+ { name: 'protocol', type: 'varchar' },
+ { name: 'buy_address', type: 'varchar(42)', isNullable: true },
+ { name: 'sell_address', type: 'varchar(42)', isNullable: true },
+ ],
+});
+
+export class CreateDexTrades1543446690436 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.createTable(dexTrades);
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.dropTable(dexTrades);
+ }
+}
diff --git a/packages/pipeline/migrations/1543980079179-ConvertTokenMetadataDecimalsToBigNumber.ts b/packages/pipeline/migrations/1543980079179-ConvertTokenMetadataDecimalsToBigNumber.ts
new file mode 100644
index 000000000..351bc7eb8
--- /dev/null
+++ b/packages/pipeline/migrations/1543980079179-ConvertTokenMetadataDecimalsToBigNumber.ts
@@ -0,0 +1,17 @@
+import { MigrationInterface, QueryRunner } from 'typeorm';
+
+export class ConvertTokenMetadataDecimalsToBigNumber1543980079179 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(
+ `ALTER TABLE raw.token_metadata
+ ALTER COLUMN decimals TYPE numeric USING decimals::numeric;`,
+ );
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(
+ `ALTER TABLE raw.token_metadata
+ ALTER COLUMN decimals TYPE numeric USING decimals::integer;`,
+ );
+ }
+}
diff --git a/packages/pipeline/migrations/1543983324954-ConvertTransactionGasPriceToBigNumber.ts b/packages/pipeline/migrations/1543983324954-ConvertTransactionGasPriceToBigNumber.ts
new file mode 100644
index 000000000..dcb0fd727
--- /dev/null
+++ b/packages/pipeline/migrations/1543983324954-ConvertTransactionGasPriceToBigNumber.ts
@@ -0,0 +1,19 @@
+import { MigrationInterface, QueryRunner } from 'typeorm';
+
+export class ConvertTransactionGasPriceToBigNumber1543983324954 implements MigrationInterface {
+ public async up(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(
+ `ALTER TABLE raw.transactions
+ ALTER COLUMN gas_price TYPE numeric USING gas_price::numeric,
+ ALTER COLUMN gas_used TYPE numeric USING gas_used::numeric;`,
+ );
+ }
+
+ public async down(queryRunner: QueryRunner): Promise<any> {
+ await queryRunner.query(
+ `ALTER TABLE raw.transactions
+ ALTER COLUMN gas_price TYPE numeric USING gas_price::bigint,
+ ALTER COLUMN gas_used TYPE numeric USING gas_used::bigint;`,
+ );
+ }
+}
diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json
new file mode 100644
index 000000000..0539618d4
--- /dev/null
+++ b/packages/pipeline/package.json
@@ -0,0 +1,65 @@
+{
+ "name": "@0x/pipeline",
+ "version": "1.0.0",
+ "private": true,
+ "description": "Data pipeline for offline analysis",
+ "scripts": {
+ "build": "yarn tsc -b",
+ "build:ci": "yarn build",
+ "test": "yarn run_mocha",
+ "rebuild_and_test": "run-s build test:all",
+ "test:db": "yarn run_mocha:db",
+ "test:all": "run-s test test:db",
+ "test:circleci": "yarn test:coverage",
+ "run_mocha": "mocha --require source-map-support/register --require make-promises-safe 'lib/test/!(entities)/**/*_test.js' --bail --exit",
+ "run_mocha:db": "mocha --require source-map-support/register --require make-promises-safe lib/test/db_global_hooks.js 'lib/test/entities/*_test.js' --bail --exit --timeout 60000",
+ "test:coverage": "nyc npm run test:all --all && yarn coverage:report:lcov",
+ "coverage:report:lcov": "nyc report --reporter=text-lcov > coverage/lcov.info",
+ "clean": "shx rm -rf lib",
+ "lint": "tslint --project . --format stylish --exclude ./migrations/**/*",
+ "migrate:run": "yarn typeorm migration:run --config ./lib/src/ormconfig",
+ "migrate:revert": "yarn typeorm migration:revert --config ./lib/src/ormconfig",
+ "migrate:create": "yarn typeorm migration:create --config ./lib/src/ormconfig --dir migrations"
+ },
+ "repository": {
+ "type": "git",
+ "url": "https://github.com/0xProject/0x-monorepo"
+ },
+ "license": "Apache-2.0",
+ "devDependencies": {
+ "@0x/tslint-config": "^1.0.9",
+ "@types/axios": "^0.14.0",
+ "@types/ramda": "^0.25.38",
+ "chai": "^4.1.2",
+ "chai-as-promised": "^7.1.1",
+ "chai-bignumber": "^2.0.2",
+ "dirty-chai": "^2.0.1",
+ "mocha": "^5.2.0",
+ "tslint": "5.11.0",
+ "typescript": "3.0.1"
+ },
+ "dependencies": {
+ "@0x/connect": "^3.0.2",
+ "@0x/contract-artifacts": "^1.0.1",
+ "@0x/contract-wrappers": "^3.0.0",
+ "@0x/dev-utils": "^1.0.13",
+ "@0x/order-utils": "^2.0.0",
+ "@0x/subproviders": "^2.1.0",
+ "@0x/types": "^1.2.0",
+ "@0x/utils": "^2.0.3",
+ "@0x/web3-wrapper": "^3.1.0",
+ "@types/dockerode": "^2.5.9",
+ "@types/p-limit": "^2.0.0",
+ "async-parallel": "^1.2.3",
+ "axios": "^0.18.0",
+ "dockerode": "^2.5.7",
+ "ethereum-types": "^1.0.6",
+ "p-limit": "^2.0.0",
+ "pg": "^7.5.0",
+ "prettier": "^1.15.3",
+ "ramda": "^0.25.0",
+ "reflect-metadata": "^0.1.12",
+ "sqlite3": "^4.0.2",
+ "typeorm": "^0.2.7"
+ }
+}
diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts
new file mode 100644
index 000000000..31cd5bfd6
--- /dev/null
+++ b/packages/pipeline/src/data_sources/bloxy/index.ts
@@ -0,0 +1,133 @@
+import axios from 'axios';
+import * as R from 'ramda';
+
+// URL to use for getting dex trades from Bloxy.
+export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades';
+// Number of trades to get at once. Must be less than or equal to MAX_OFFSET.
+const TRADES_PER_QUERY = 10000;
+// Maximum offset supported by the Bloxy API.
+const MAX_OFFSET = 100000;
+// Buffer to subtract from offset. This means we will request some trades twice
+// but we have less chance on missing out on any data.
+const OFFSET_BUFFER = 1000;
+// Maximum number of days supported by the Bloxy API.
+const MAX_DAYS = 30;
+// Buffer used for comparing the last seen timestamp to the last returned
+// timestamp. Increasing this reduces chances of data loss but also creates more
+// redundancy and can impact performance.
+// tslint:disable-next-line:custom-no-magic-numbers
+const LAST_SEEN_TIMESTAMP_BUFFER_MS = 1000 * 60 * 30; // 30 minutes
+
+// tslint:disable-next-line:custom-no-magic-numbers
+const millisecondsPerDay = 1000 * 60 * 60 * 24; // ms/d = ms/s * s/m * m/h * h/d
+
+export interface BloxyTrade {
+ tx_hash: string;
+ tx_time: string;
+ tx_date: string;
+ tx_sender: string;
+ smart_contract_id: number;
+ smart_contract_address: string;
+ contract_type: string;
+ maker: string;
+ taker: string;
+ amountBuy: number;
+ makerFee: number;
+ buyCurrencyId: number;
+ buySymbol: string;
+ amountSell: number;
+ takerFee: number;
+ sellCurrencyId: number;
+ sellSymbol: string;
+ maker_annotation: string;
+ taker_annotation: string;
+ protocol: string;
+ buyAddress: string | null;
+ sellAddress: string | null;
+}
+
+interface BloxyError {
+ error: string;
+}
+
+type BloxyResponse<T> = T | BloxyError;
+type BloxyTradeResponse = BloxyResponse<BloxyTrade[]>;
+
+function isError<T>(response: BloxyResponse<T>): response is BloxyError {
+ return (response as BloxyError).error !== undefined;
+}
+
+export class BloxySource {
+ private readonly _apiKey: string;
+
+ constructor(apiKey: string) {
+ this._apiKey = apiKey;
+ }
+
+ /**
+ * Gets all latest trades between the lastSeenTimestamp (minus some buffer)
+ * and the current time. Note that because the Bloxy API has some hard
+ * limits it might not always be possible to get *all* the trades in the
+ * desired time range.
+ * @param lastSeenTimestamp The latest timestamp for trades that have
+ * already been seen.
+ */
+ public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> {
+ let allTrades: BloxyTrade[] = [];
+
+ // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive)
+ const numberOfDays = R.clamp(1, MAX_DAYS, getDaysSinceTimestamp(lastSeenTimestamp));
+
+ // Keep getting trades until we hit one of the following conditions:
+ //
+ // 1. Offset hits MAX_OFFSET (we can't go back any further).
+ // 2. There are no more trades in the response.
+ // 3. We see a tx_time equal to or earlier than lastSeenTimestamp (plus
+ // some buffer).
+ //
+ for (let offset = 0; offset <= MAX_OFFSET; offset += TRADES_PER_QUERY - OFFSET_BUFFER) {
+ const trades = await this._getTradesWithOffsetAsync(numberOfDays, offset);
+ if (trades.length === 0) {
+ // There are no more trades left for the days we are querying.
+ // This means we are done.
+ return filterDuplicateTrades(allTrades);
+ }
+ const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades));
+ allTrades = allTrades.concat(sortedTrades);
+
+ // Check if lastReturnedTimestamp < lastSeenTimestamp
+ const lastReturnedTimestamp = new Date(sortedTrades[0].tx_time).getTime();
+ if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) {
+ // We are at the point where we have already seen trades for the
+ // timestamp range that is being returned. We're done.
+ return filterDuplicateTrades(allTrades);
+ }
+ }
+ return filterDuplicateTrades(allTrades);
+ }
+
+ private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> {
+ const resp = await axios.get<BloxyTradeResponse>(BLOXY_DEX_TRADES_URL, {
+ params: {
+ key: this._apiKey,
+ days: numberOfDays,
+ limit: TRADES_PER_QUERY,
+ offset,
+ },
+ });
+ if (isError(resp.data)) {
+ throw new Error('Error in Bloxy API response: ' + resp.data.error);
+ }
+ return resp.data;
+ }
+}
+
+// Computes the number of days between the given timestamp and the current
+// timestamp (rounded up).
+function getDaysSinceTimestamp(timestamp: number): number {
+ const msSinceTimestamp = Date.now() - timestamp;
+ const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay;
+ return Math.ceil(daysSinceTimestamp);
+}
+
+const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash);
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
new file mode 100644
index 000000000..1717eb8b3
--- /dev/null
+++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts
@@ -0,0 +1,85 @@
+import {
+ ContractWrappers,
+ ExchangeCancelEventArgs,
+ ExchangeCancelUpToEventArgs,
+ ExchangeEventArgs,
+ ExchangeEvents,
+ ExchangeFillEventArgs,
+ ExchangeWrapper,
+} from '@0x/contract-wrappers';
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { Web3Wrapper } from '@0x/web3-wrapper';
+import { LogWithDecodedArgs } from 'ethereum-types';
+
+import { EXCHANGE_START_BLOCK } from '../../utils';
+
+const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default toBlock.
+const NUM_BLOCKS_PER_QUERY = 20000; // Number of blocks to query for events at a time.
+
+export class ExchangeEventsSource {
+ private readonly _exchangeWrapper: ExchangeWrapper;
+ private readonly _web3Wrapper: Web3Wrapper;
+ constructor(provider: Web3ProviderEngine, networkId: number) {
+ this._web3Wrapper = new Web3Wrapper(provider);
+ const contractWrappers = new ContractWrappers(provider, { networkId });
+ this._exchangeWrapper = contractWrappers.exchange;
+ }
+
+ public async getFillEventsAsync(
+ fromBlock?: number,
+ toBlock?: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> {
+ return this._getEventsAsync<ExchangeFillEventArgs>(ExchangeEvents.Fill, fromBlock, toBlock);
+ }
+
+ public async getCancelEventsAsync(
+ fromBlock?: number,
+ toBlock?: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> {
+ return this._getEventsAsync<ExchangeCancelEventArgs>(ExchangeEvents.Cancel, fromBlock, toBlock);
+ }
+
+ public async getCancelUpToEventsAsync(
+ fromBlock?: number,
+ toBlock?: number,
+ ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> {
+ return this._getEventsAsync<ExchangeCancelUpToEventArgs>(ExchangeEvents.CancelUpTo, fromBlock, toBlock);
+ }
+
+ private async _getEventsAsync<ArgsType extends ExchangeEventArgs>(
+ eventName: ExchangeEvents,
+ fromBlock: number = EXCHANGE_START_BLOCK,
+ toBlock?: number,
+ ): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
+ const calculatedToBlock =
+ toBlock === undefined
+ ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD
+ : toBlock;
+ let events: Array<LogWithDecodedArgs<ArgsType>> = [];
+ for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) {
+ events = events.concat(
+ await this._getEventsForRangeAsync<ArgsType>(
+ eventName,
+ currFromBlock,
+ Math.min(currFromBlock + NUM_BLOCKS_PER_QUERY - 1, calculatedToBlock),
+ ),
+ );
+ }
+ return events;
+ }
+
+ private async _getEventsForRangeAsync<ArgsType extends ExchangeEventArgs>(
+ eventName: ExchangeEvents,
+ fromBlock: number,
+ toBlock: number,
+ ): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
+ return this._exchangeWrapper.getLogsAsync<ArgsType>(
+ eventName,
+ {
+ fromBlock,
+ toBlock,
+ },
+ {},
+ );
+ }
+}
diff --git a/packages/pipeline/src/data_sources/ddex/index.ts b/packages/pipeline/src/data_sources/ddex/index.ts
new file mode 100644
index 000000000..2bbd8c29b
--- /dev/null
+++ b/packages/pipeline/src/data_sources/ddex/index.ts
@@ -0,0 +1,78 @@
+import { fetchAsync, logUtils } from '@0x/utils';
+
+const DDEX_BASE_URL = 'https://api.ddex.io/v2';
+const ACTIVE_MARKETS_URL = `${DDEX_BASE_URL}/markets`;
+const NO_AGGREGATION_LEVEL = 3; // See https://docs.ddex.io/#get-orderbook
+const ORDERBOOK_ENDPOINT = `/orderbook?level=${NO_AGGREGATION_LEVEL}`;
+export const DDEX_SOURCE = 'ddex';
+
+export interface DdexActiveMarketsResponse {
+ status: number;
+ desc: string;
+ data: {
+ markets: DdexMarket[];
+ };
+}
+
+export interface DdexMarket {
+ id: string;
+ quoteToken: string;
+ quoteTokenDecimals: number;
+ quoteTokenAddress: string;
+ baseToken: string;
+ baseTokenDecimals: number;
+ baseTokenAddress: string;
+ minOrderSize: string;
+ maxOrderSize: string;
+ pricePrecision: number;
+ priceDecimals: number;
+ amountDecimals: number;
+}
+
+export interface DdexOrderbookResponse {
+ status: number;
+ desc: string;
+ data: {
+ orderBook: DdexOrderbook;
+ };
+}
+
+export interface DdexOrderbook {
+ marketId: string;
+ bids: DdexOrder[];
+ asks: DdexOrder[];
+}
+
+export interface DdexOrder {
+ price: string;
+ amount: string;
+ orderId: string;
+}
+
+// tslint:disable:prefer-function-over-method
+// ^ Keep consistency with other sources and help logical organization
+export class DdexSource {
+ /**
+ * Call Ddex API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getActiveMarketsAsync(): Promise<DdexMarket[]> {
+ logUtils.log('Getting all active DDEX markets');
+ const resp = await fetchAsync(ACTIVE_MARKETS_URL);
+ const respJson: DdexActiveMarketsResponse = await resp.json();
+ const markets = respJson.data.markets;
+ logUtils.log(`Got ${markets.length} markets.`);
+ return markets;
+ }
+
+ /**
+ * Retrieve orderbook from Ddex API for a given market.
+ * @param marketId String identifying the market we want data for. Eg. 'REP/AUG'
+ */
+ public async getMarketOrderbookAsync(marketId: string): Promise<DdexOrderbook> {
+ logUtils.log(`${marketId}: Retrieving orderbook.`);
+ const marketOrderbookUrl = `${ACTIVE_MARKETS_URL}/${marketId}${ORDERBOOK_ENDPOINT}`;
+ const resp = await fetchAsync(marketOrderbookUrl);
+ const respJson: DdexOrderbookResponse = await resp.json();
+ return respJson.data.orderBook;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
new file mode 100644
index 000000000..6b10c29c5
--- /dev/null
+++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
@@ -0,0 +1,94 @@
+// tslint:disable:no-duplicate-imports
+import { fetchAsync } from '@0x/utils';
+import promiseLimit = require('p-limit');
+import { stringify } from 'querystring';
+import * as R from 'ramda';
+
+import { TradingPair } from '../../utils/get_ohlcv_trading_pairs';
+
+export interface CryptoCompareOHLCVResponse {
+ Data: Map<string, CryptoCompareOHLCVRecord[]>;
+ Response: string;
+ Message: string;
+}
+
+export interface CryptoCompareOHLCVRecord {
+ time: number; // in seconds, not milliseconds
+ close: number;
+ high: number;
+ low: number;
+ open: number;
+ volumefrom: number;
+ volumeto: number;
+}
+
+export interface CryptoCompareOHLCVParams {
+ fsym: string;
+ tsym: string;
+ e?: string;
+ aggregate?: string;
+ aggregatePredictableTimePeriods?: boolean;
+ limit?: number;
+ toTs?: number;
+}
+
+const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_SECOND = 1000;
+const HTTP_OK_STATUS = 200;
+
+export class CryptoCompareOHLCVSource {
+ public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time
+ public readonly default_exchange = 'CCCAGG';
+ public readonly intervalBetweenRecords = ONE_HOUR;
+ private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?';
+
+ // rate-limit for all API calls through this class instance
+ private readonly _promiseLimit: (fetchFn: () => Promise<Response>) => Promise<Response>;
+ constructor(maxConcurrentRequests: number = 50) {
+ this._promiseLimit = promiseLimit(maxConcurrentRequests);
+ }
+
+ // gets OHLCV records starting from pair.latest
+ public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> {
+ const params = {
+ e: this.default_exchange,
+ fsym: pair.fromSymbol,
+ tsym: pair.toSymbol,
+ toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms
+ };
+ const url = this._url + stringify(params);
+
+ // go through the instance-wide rate-limit
+ const fetchPromise: Promise<Response> = this._promiseLimit(() => {
+ // tslint:disable-next-line:no-console
+ console.log(`Scraping Crypto Compare at ${url}`);
+ return fetchAsync(url);
+ });
+
+ const response = await Promise.resolve(fetchPromise);
+ if (response.status !== HTTP_OK_STATUS) {
+ // tslint:disable-next-line:no-console
+ console.log(`Error scraping ${url}`);
+ return [];
+ }
+ const json: CryptoCompareOHLCVResponse = await response.json();
+ if (json.Response === 'Error' || Object.keys(json.Data).length === 0) {
+ // tslint:disable-next-line:no-console
+ console.log(`Error scraping ${url}: ${json.Message}`);
+ return [];
+ }
+ return Object.values(json.Data).filter(rec => rec.time * ONE_SECOND >= pair.latestSavedTime);
+ }
+ public generateBackfillIntervals(pair: TradingPair): TradingPair[] {
+ const now = new Date().getTime();
+ const f = (p: TradingPair): false | [TradingPair, TradingPair] => {
+ if (p.latestSavedTime > now) {
+ return false;
+ } else {
+ return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })];
+ }
+ };
+ return R.unfold(f, pair);
+ }
+}
diff --git a/packages/pipeline/src/data_sources/paradex/index.ts b/packages/pipeline/src/data_sources/paradex/index.ts
new file mode 100644
index 000000000..69a03d553
--- /dev/null
+++ b/packages/pipeline/src/data_sources/paradex/index.ts
@@ -0,0 +1,92 @@
+import { fetchAsync, logUtils } from '@0x/utils';
+
+const PARADEX_BASE_URL = 'https://api.paradex.io/consumer/v0';
+const ACTIVE_MARKETS_URL = PARADEX_BASE_URL + '/markets';
+const ORDERBOOK_ENDPOINT = PARADEX_BASE_URL + '/orderbook';
+const TOKEN_INFO_ENDPOINT = PARADEX_BASE_URL + '/tokens';
+export const PARADEX_SOURCE = 'paradex';
+
+export type ParadexActiveMarketsResponse = ParadexMarket[];
+
+export interface ParadexMarket {
+ id: string;
+ symbol: string;
+ baseToken: string;
+ quoteToken: string;
+ minOrderSize: string;
+ maxOrderSize: string;
+ priceMaxDecimals: number;
+ amountMaxDecimals: number;
+ // These are not native to the Paradex API response. We tag them on later
+ // by calling the token endpoint and joining on symbol.
+ baseTokenAddress?: string;
+ quoteTokenAddress?: string;
+}
+
+export interface ParadexOrderbookResponse {
+ marketId: number;
+ marketSymbol: string;
+ bids: ParadexOrder[];
+ asks: ParadexOrder[];
+}
+
+export interface ParadexOrder {
+ amount: string;
+ price: string;
+}
+
+export type ParadexTokenInfoResponse = ParadexTokenInfo[];
+
+export interface ParadexTokenInfo {
+ name: string;
+ symbol: string;
+ address: string;
+}
+
+export class ParadexSource {
+ private readonly _apiKey: string;
+
+ constructor(apiKey: string) {
+ this._apiKey = apiKey;
+ }
+
+ /**
+ * Call Paradex API to find out which markets they are maintaining orderbooks for.
+ */
+ public async getActiveMarketsAsync(): Promise<ParadexActiveMarketsResponse> {
+ logUtils.log('Getting all active Paradex markets.');
+ const resp = await fetchAsync(ACTIVE_MARKETS_URL, {
+ headers: { 'API-KEY': this._apiKey },
+ });
+ const markets: ParadexActiveMarketsResponse = await resp.json();
+ logUtils.log(`Got ${markets.length} markets.`);
+ return markets;
+ }
+
+ /**
+ * Call Paradex API to find out their token information.
+ */
+ public async getTokenInfoAsync(): Promise<ParadexTokenInfoResponse> {
+ logUtils.log('Getting token information from Paradex.');
+ const resp = await fetchAsync(TOKEN_INFO_ENDPOINT, {
+ headers: { 'API-KEY': this._apiKey },
+ });
+ const tokens: ParadexTokenInfoResponse = await resp.json();
+ logUtils.log(`Got information for ${tokens.length} tokens.`);
+ return tokens;
+ }
+
+ /**
+ * Retrieve orderbook from Paradex API for a given market.
+ * @param marketSymbol String representing the market we want data for.
+ */
+ public async getMarketOrderbookAsync(marketSymbol: string): Promise<ParadexOrderbookResponse> {
+ logUtils.log(`${marketSymbol}: Retrieving orderbook.`);
+ const marketOrderbookUrl = `${ORDERBOOK_ENDPOINT}?market=${marketSymbol}`;
+ const resp = await fetchAsync(marketOrderbookUrl, {
+ headers: { 'API-KEY': this._apiKey },
+ });
+ const orderbookResponse: ParadexOrderbookResponse = await resp.json();
+ return orderbookResponse;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/relayer-registry/index.ts b/packages/pipeline/src/data_sources/relayer-registry/index.ts
new file mode 100644
index 000000000..8133f5eae
--- /dev/null
+++ b/packages/pipeline/src/data_sources/relayer-registry/index.ts
@@ -0,0 +1,33 @@
+import axios from 'axios';
+
+export interface RelayerResponse {
+ name: string;
+ homepage_url: string;
+ app_url: string;
+ header_img: string;
+ logo_img: string;
+ networks: RelayerResponseNetwork[];
+}
+
+export interface RelayerResponseNetwork {
+ networkId: number;
+ sra_http_endpoint?: string;
+ sra_ws_endpoint?: string;
+ static_order_fields?: {
+ fee_recipient_addresses?: string[];
+ taker_addresses?: string[];
+ };
+}
+
+export class RelayerRegistrySource {
+ private readonly _url: string;
+
+ constructor(url: string) {
+ this._url = url;
+ }
+
+ public async getRelayerInfoAsync(): Promise<Map<string, RelayerResponse>> {
+ const resp = await axios.get<Map<string, RelayerResponse>>(this._url);
+ return resp.data;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/trusted_tokens/index.ts b/packages/pipeline/src/data_sources/trusted_tokens/index.ts
new file mode 100644
index 000000000..552739fb9
--- /dev/null
+++ b/packages/pipeline/src/data_sources/trusted_tokens/index.ts
@@ -0,0 +1,29 @@
+import axios from 'axios';
+
+export interface ZeroExTrustedTokenMeta {
+ address: string;
+ name: string;
+ symbol: string;
+ decimals: number;
+}
+
+export interface MetamaskTrustedTokenMeta {
+ address: string;
+ name: string;
+ erc20: boolean;
+ symbol: string;
+ decimals: number;
+}
+
+export class TrustedTokenSource<T> {
+ private readonly _url: string;
+
+ constructor(url: string) {
+ this._url = url;
+ }
+
+ public async getTrustedTokenMetaAsync(): Promise<T> {
+ const resp = await axios.get<T>(this._url);
+ return resp.data;
+ }
+}
diff --git a/packages/pipeline/src/data_sources/web3/index.ts b/packages/pipeline/src/data_sources/web3/index.ts
new file mode 100644
index 000000000..45a9ea161
--- /dev/null
+++ b/packages/pipeline/src/data_sources/web3/index.ts
@@ -0,0 +1,22 @@
+import { Web3ProviderEngine } from '@0x/subproviders';
+import { Web3Wrapper } from '@0x/web3-wrapper';
+import { BlockWithoutTransactionData, Transaction } from 'ethereum-types';
+
+export class Web3Source {
+ private readonly _web3Wrapper: Web3Wrapper;
+ constructor(provider: Web3ProviderEngine) {
+ this._web3Wrapper = new Web3Wrapper(provider);
+ }
+
+ public async getBlockInfoAsync(blockNumber: number): Promise<BlockWithoutTransactionData> {
+ const block = await this._web3Wrapper.getBlockIfExistsAsync(blockNumber);
+ if (block == null) {
+ return Promise.reject(new Error(`Could not find block for given block number: ${blockNumber}`));
+ }
+ return block;
+ }
+
+ public async getTransactionInfoAsync(txHash: string): Promise<Transaction> {
+ return this._web3Wrapper.getTransactionByHashAsync(txHash);
+ }
+}
diff --git a/packages/pipeline/src/entities/block.ts b/packages/pipeline/src/entities/block.ts
new file mode 100644
index 000000000..398946622
--- /dev/null
+++ b/packages/pipeline/src/entities/block.ts
@@ -0,0 +1,13 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'blocks', schema: 'raw' })
+export class Block {
+ @PrimaryColumn() public hash!: string;
+ @PrimaryColumn({ transformer: numberToBigIntTransformer })
+ public number!: number;
+
+ @Column({ name: 'timestamp', transformer: numberToBigIntTransformer })
+ public timestamp!: number;
+}
diff --git a/packages/pipeline/src/entities/dex_trade.ts b/packages/pipeline/src/entities/dex_trade.ts
new file mode 100644
index 000000000..9d288cb51
--- /dev/null
+++ b/packages/pipeline/src/entities/dex_trade.ts
@@ -0,0 +1,54 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'dex_trades', schema: 'raw' })
+export class DexTrade {
+ @PrimaryColumn({ name: 'source_url' })
+ public sourceUrl!: string;
+ @PrimaryColumn({ name: 'tx_hash' })
+ public txHash!: string;
+
+ @Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer })
+ public txTimestamp!: number;
+ @Column({ name: 'tx_date' })
+ public txDate!: string;
+ @Column({ name: 'tx_sender' })
+ public txSender!: string;
+ @Column({ name: 'smart_contract_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public smartContractId!: number;
+ @Column({ name: 'smart_contract_address' })
+ public smartContractAddress!: string;
+ @Column({ name: 'contract_type' })
+ public contractType!: string;
+ @Column({ type: 'varchar' })
+ public maker!: string;
+ @Column({ type: 'varchar' })
+ public taker!: string;
+ @Column({ name: 'amount_buy', type: 'numeric', transformer: bigNumberTransformer })
+ public amountBuy!: BigNumber;
+ @Column({ name: 'maker_fee_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public makerFeeAmount!: BigNumber;
+ @Column({ name: 'buy_currency_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public buyCurrencyId!: number;
+ @Column({ name: 'buy_symbol' })
+ public buySymbol!: string;
+ @Column({ name: 'amount_sell', type: 'numeric', transformer: bigNumberTransformer })
+ public amountSell!: BigNumber;
+ @Column({ name: 'taker_fee_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public takerFeeAmount!: BigNumber;
+ @Column({ name: 'sell_currency_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public sellCurrencyId!: number;
+ @Column({ name: 'sell_symbol' })
+ public sellSymbol!: string;
+ @Column({ name: 'maker_annotation' })
+ public makerAnnotation!: string;
+ @Column({ name: 'taker_annotation' })
+ public takerAnnotation!: string;
+ @Column() public protocol!: string;
+ @Column({ name: 'buy_address', type: 'varchar', nullable: true })
+ public buyAddress!: string | null;
+ @Column({ name: 'sell_address', type: 'varchar', nullable: true })
+ public sellAddress!: string | null;
+}
diff --git a/packages/pipeline/src/entities/exchange_cancel_event.ts b/packages/pipeline/src/entities/exchange_cancel_event.ts
new file mode 100644
index 000000000..38f99c903
--- /dev/null
+++ b/packages/pipeline/src/entities/exchange_cancel_event.ts
@@ -0,0 +1,51 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { AssetType } from '../types';
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'exchange_cancel_events', schema: 'raw' })
+export class ExchangeCancelEvent {
+ @PrimaryColumn({ name: 'contract_address' })
+ public contractAddress!: string;
+ @PrimaryColumn({ name: 'log_index' })
+ public logIndex!: number;
+ @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+ public blockNumber!: number;
+
+ @Column({ name: 'raw_data' })
+ public rawData!: string;
+
+ @Column({ name: 'transaction_hash' })
+ public transactionHash!: string;
+ @Column({ name: 'maker_address' })
+ public makerAddress!: string;
+ @Column({ nullable: true, type: String, name: 'taker_address' })
+ public takerAddress!: string;
+ @Column({ name: 'fee_recipient_address' })
+ public feeRecipientAddress!: string;
+ @Column({ name: 'sender_address' })
+ public senderAddress!: string;
+ @Column({ name: 'order_hash' })
+ public orderHash!: string;
+
+ @Column({ name: 'raw_maker_asset_data' })
+ public rawMakerAssetData!: string;
+ @Column({ name: 'maker_asset_type' })
+ public makerAssetType!: AssetType;
+ @Column({ name: 'maker_asset_proxy_id' })
+ public makerAssetProxyId!: string;
+ @Column({ name: 'maker_token_address' })
+ public makerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'maker_token_id' })
+ public makerTokenId!: string | null;
+ @Column({ name: 'raw_taker_asset_data' })
+ public rawTakerAssetData!: string;
+ @Column({ name: 'taker_asset_type' })
+ public takerAssetType!: AssetType;
+ @Column({ name: 'taker_asset_proxy_id' })
+ public takerAssetProxyId!: string;
+ @Column({ name: 'taker_token_address' })
+ public takerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'taker_token_id' })
+ public takerTokenId!: string | null;
+}
diff --git a/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts b/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts
new file mode 100644
index 000000000..27580305e
--- /dev/null
+++ b/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts
@@ -0,0 +1,26 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'exchange_cancel_up_to_events', schema: 'raw' })
+export class ExchangeCancelUpToEvent {
+ @PrimaryColumn({ name: 'contract_address' })
+ public contractAddress!: string;
+ @PrimaryColumn({ name: 'log_index' })
+ public logIndex!: number;
+ @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+ public blockNumber!: number;
+
+ @Column({ name: 'raw_data' })
+ public rawData!: string;
+
+ @Column({ name: 'transaction_hash' })
+ public transactionHash!: string;
+ @Column({ name: 'maker_address' })
+ public makerAddress!: string;
+ @Column({ name: 'sender_address' })
+ public senderAddress!: string;
+ @Column({ name: 'order_epoch', type: 'numeric', transformer: bigNumberTransformer })
+ public orderEpoch!: BigNumber;
+}
diff --git a/packages/pipeline/src/entities/exchange_fill_event.ts b/packages/pipeline/src/entities/exchange_fill_event.ts
new file mode 100644
index 000000000..9b7727615
--- /dev/null
+++ b/packages/pipeline/src/entities/exchange_fill_event.ts
@@ -0,0 +1,60 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { AssetType } from '../types';
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'exchange_fill_events', schema: 'raw' })
+export class ExchangeFillEvent {
+ @PrimaryColumn({ name: 'contract_address' })
+ public contractAddress!: string;
+ @PrimaryColumn({ name: 'log_index' })
+ public logIndex!: number;
+ @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+ public blockNumber!: number;
+
+ @Column({ name: 'raw_data' })
+ public rawData!: string;
+
+ @Column({ name: 'transaction_hash' })
+ public transactionHash!: string;
+ @Column({ name: 'maker_address' })
+ public makerAddress!: string;
+ @Column({ name: 'taker_address' })
+ public takerAddress!: string;
+ @Column({ name: 'fee_recipient_address' })
+ public feeRecipientAddress!: string;
+ @Column({ name: 'sender_address' })
+ public senderAddress!: string;
+ @Column({ name: 'maker_asset_filled_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public makerAssetFilledAmount!: BigNumber;
+ @Column({ name: 'taker_asset_filled_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public takerAssetFilledAmount!: BigNumber;
+ @Column({ name: 'maker_fee_paid', type: 'numeric', transformer: bigNumberTransformer })
+ public makerFeePaid!: BigNumber;
+ @Column({ name: 'taker_fee_paid', type: 'numeric', transformer: bigNumberTransformer })
+ public takerFeePaid!: BigNumber;
+ @Column({ name: 'order_hash' })
+ public orderHash!: string;
+
+ @Column({ name: 'raw_maker_asset_data' })
+ public rawMakerAssetData!: string;
+ @Column({ name: 'maker_asset_type' })
+ public makerAssetType!: AssetType;
+ @Column({ name: 'maker_asset_proxy_id' })
+ public makerAssetProxyId!: string;
+ @Column({ name: 'maker_token_address' })
+ public makerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'maker_token_id' })
+ public makerTokenId!: string | null;
+ @Column({ name: 'raw_taker_asset_data' })
+ public rawTakerAssetData!: string;
+ @Column({ name: 'taker_asset_type' })
+ public takerAssetType!: AssetType;
+ @Column({ name: 'taker_asset_proxy_id' })
+ public takerAssetProxyId!: string;
+ @Column({ name: 'taker_token_address' })
+ public takerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'taker_token_id' })
+ public takerTokenId!: string | null;
+}
diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts
new file mode 100644
index 000000000..db0814e38
--- /dev/null
+++ b/packages/pipeline/src/entities/index.ts
@@ -0,0 +1,18 @@
+import { ExchangeCancelEvent } from './exchange_cancel_event';
+import { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event';
+import { ExchangeFillEvent } from './exchange_fill_event';
+
+export { Block } from './block';
+export { DexTrade } from './dex_trade';
+export { ExchangeCancelEvent } from './exchange_cancel_event';
+export { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event';
+export { ExchangeFillEvent } from './exchange_fill_event';
+export { OHLCVExternal } from './ohlcv_external';
+export { Relayer } from './relayer';
+export { SraOrder } from './sra_order';
+export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp';
+export { TokenMetadata } from './token_metadata';
+export { TokenOrderbookSnapshot } from './token_order';
+export { Transaction } from './transaction';
+
+export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent;
diff --git a/packages/pipeline/src/entities/ohlcv_external.ts b/packages/pipeline/src/entities/ohlcv_external.ts
new file mode 100644
index 000000000..4f55dd930
--- /dev/null
+++ b/packages/pipeline/src/entities/ohlcv_external.ts
@@ -0,0 +1,30 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'ohlcv_external', schema: 'raw' })
+export class OHLCVExternal {
+ @PrimaryColumn() public exchange!: string;
+
+ @PrimaryColumn({ name: 'from_symbol', type: 'varchar' })
+ public fromSymbol!: string;
+ @PrimaryColumn({ name: 'to_symbol', type: 'varchar' })
+ public toSymbol!: string;
+ @PrimaryColumn({ name: 'start_time', transformer: numberToBigIntTransformer })
+ public startTime!: number;
+ @PrimaryColumn({ name: 'end_time', transformer: numberToBigIntTransformer })
+ public endTime!: number;
+
+ @Column() public open!: number;
+ @Column() public close!: number;
+ @Column() public low!: number;
+ @Column() public high!: number;
+ @Column({ name: 'volume_from' })
+ public volumeFrom!: number;
+ @Column({ name: 'volume_to' })
+ public volumeTo!: number;
+
+ @PrimaryColumn() public source!: string;
+ @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer })
+ public observedTimestamp!: number;
+}
diff --git a/packages/pipeline/src/entities/relayer.ts b/packages/pipeline/src/entities/relayer.ts
new file mode 100644
index 000000000..5af8578b4
--- /dev/null
+++ b/packages/pipeline/src/entities/relayer.ts
@@ -0,0 +1,21 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+@Entity({ name: 'relayers', schema: 'raw' })
+export class Relayer {
+ @PrimaryColumn() public uuid!: string;
+
+ @Column() public name!: string;
+ @Column({ name: 'homepage_url', type: 'varchar' })
+ public homepageUrl!: string;
+ @Column({ name: 'sra_http_endpoint', type: 'varchar', nullable: true })
+ public sraHttpEndpoint!: string | null;
+ @Column({ name: 'sra_ws_endpoint', type: 'varchar', nullable: true })
+ public sraWsEndpoint!: string | null;
+ @Column({ name: 'app_url', type: 'varchar', nullable: true })
+ public appUrl!: string | null;
+
+ @Column({ name: 'fee_recipient_addresses', type: 'varchar', array: true })
+ public feeRecipientAddresses!: string[];
+ @Column({ name: 'taker_addresses', type: 'varchar', array: true })
+ public takerAddresses!: string[];
+}
diff --git a/packages/pipeline/src/entities/sra_order.ts b/packages/pipeline/src/entities/sra_order.ts
new file mode 100644
index 000000000..9c730a0bb
--- /dev/null
+++ b/packages/pipeline/src/entities/sra_order.ts
@@ -0,0 +1,63 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { AssetType } from '../types';
+import { bigNumberTransformer } from '../utils';
+
+@Entity({ name: 'sra_orders', schema: 'raw' })
+export class SraOrder {
+ @PrimaryColumn({ name: 'exchange_address' })
+ public exchangeAddress!: string;
+ @PrimaryColumn({ name: 'order_hash_hex' })
+ public orderHashHex!: string;
+ @PrimaryColumn({ name: 'source_url' })
+ public sourceUrl!: string;
+
+ @Column({ name: 'maker_address' })
+ public makerAddress!: string;
+ @Column({ name: 'taker_address' })
+ public takerAddress!: string;
+ @Column({ name: 'fee_recipient_address' })
+ public feeRecipientAddress!: string;
+ @Column({ name: 'sender_address' })
+ public senderAddress!: string;
+ @Column({ name: 'maker_asset_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public makerAssetAmount!: BigNumber;
+ @Column({ name: 'taker_asset_amount', type: 'numeric', transformer: bigNumberTransformer })
+ public takerAssetAmount!: BigNumber;
+ @Column({ name: 'maker_fee', type: 'numeric', transformer: bigNumberTransformer })
+ public makerFee!: BigNumber;
+ @Column({ name: 'taker_fee', type: 'numeric', transformer: bigNumberTransformer })
+ public takerFee!: BigNumber;
+ @Column({ name: 'expiration_time_seconds', type: 'numeric', transformer: bigNumberTransformer })
+ public expirationTimeSeconds!: BigNumber;
+ @Column({ name: 'salt', type: 'numeric', transformer: bigNumberTransformer })
+ public salt!: BigNumber;
+ @Column({ name: 'signature' })
+ public signature!: string;
+
+ @Column({ name: 'raw_maker_asset_data' })
+ public rawMakerAssetData!: string;
+ @Column({ name: 'maker_asset_type' })
+ public makerAssetType!: AssetType;
+ @Column({ name: 'maker_asset_proxy_id' })
+ public makerAssetProxyId!: string;
+ @Column({ name: 'maker_token_address' })
+ public makerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'maker_token_id' })
+ public makerTokenId!: string | null;
+ @Column({ name: 'raw_taker_asset_data' })
+ public rawTakerAssetData!: string;
+ @Column({ name: 'taker_asset_type' })
+ public takerAssetType!: AssetType;
+ @Column({ name: 'taker_asset_proxy_id' })
+ public takerAssetProxyId!: string;
+ @Column({ name: 'taker_token_address' })
+ public takerTokenAddress!: string;
+ @Column({ nullable: true, type: String, name: 'taker_token_id' })
+ public takerTokenId!: string | null;
+
+ // TODO(albrow): Make this optional?
+ @Column({ name: 'metadata_json' })
+ public metadataJson!: string;
+}
diff --git a/packages/pipeline/src/entities/sra_order_observed_timestamp.ts b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts
new file mode 100644
index 000000000..cd2d41397
--- /dev/null
+++ b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts
@@ -0,0 +1,32 @@
+import { Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+import { SraOrder } from './sra_order';
+
+@Entity({ name: 'sra_orders_observed_timestamps', schema: 'raw' })
+export class SraOrdersObservedTimeStamp {
+ @PrimaryColumn({ name: 'exchange_address' })
+ public exchangeAddress!: string;
+ @PrimaryColumn({ name: 'order_hash_hex' })
+ public orderHashHex!: string;
+ @PrimaryColumn({ name: 'source_url' })
+ public sourceUrl!: string;
+
+ @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer })
+ public observedTimestamp!: number;
+}
+
+/**
+ * Returns a new SraOrdersObservedTimeStamp for the given order based on the
+ * current time.
+ * @param order The order to generate a timestamp for.
+ */
+export function createObservedTimestampForOrder(order: SraOrder): SraOrdersObservedTimeStamp {
+ const observed = new SraOrdersObservedTimeStamp();
+ observed.exchangeAddress = order.exchangeAddress;
+ observed.orderHashHex = order.orderHashHex;
+ observed.sourceUrl = order.sourceUrl;
+ observed.observedTimestamp = Date.now();
+ return observed;
+}
diff --git a/packages/pipeline/src/entities/token_metadata.ts b/packages/pipeline/src/entities/token_metadata.ts
new file mode 100644
index 000000000..911b53972
--- /dev/null
+++ b/packages/pipeline/src/entities/token_metadata.ts
@@ -0,0 +1,22 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer } from '../utils/transformers';
+
+@Entity({ name: 'token_metadata', schema: 'raw' })
+export class TokenMetadata {
+ @PrimaryColumn({ type: 'varchar', nullable: false })
+ public address!: string;
+
+ @PrimaryColumn({ type: 'varchar', nullable: false })
+ public authority!: string;
+
+ @Column({ type: 'numeric', transformer: bigNumberTransformer, nullable: true })
+ public decimals!: BigNumber | null;
+
+ @Column({ type: 'varchar', nullable: true })
+ public symbol!: string | null;
+
+ @Column({ type: 'varchar', nullable: true })
+ public name!: string | null;
+}
diff --git a/packages/pipeline/src/entities/token_order.ts b/packages/pipeline/src/entities/token_order.ts
new file mode 100644
index 000000000..557705767
--- /dev/null
+++ b/packages/pipeline/src/entities/token_order.ts
@@ -0,0 +1,29 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { OrderType } from '../types';
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'token_orderbook_snapshots', schema: 'raw' })
+export class TokenOrderbookSnapshot {
+ @PrimaryColumn({ name: 'observed_timestamp', type: 'bigint', transformer: numberToBigIntTransformer })
+ public observedTimestamp!: number;
+ @PrimaryColumn({ name: 'source' })
+ public source!: string;
+ @Column({ name: 'order_type' })
+ public orderType!: OrderType;
+ @PrimaryColumn({ name: 'price', type: 'numeric', transformer: bigNumberTransformer })
+ public price!: BigNumber;
+ @PrimaryColumn({ name: 'base_asset_symbol' })
+ public baseAssetSymbol!: string;
+ @Column({ name: 'base_asset_address' })
+ public baseAssetAddress!: string;
+ @Column({ name: 'base_volume', type: 'numeric', transformer: bigNumberTransformer })
+ public baseVolume!: BigNumber;
+ @PrimaryColumn({ name: 'quote_asset_symbol' })
+ public quoteAssetSymbol!: string;
+ @Column({ name: 'quote_asset_address' })
+ public quoteAssetAddress!: string;
+ @Column({ name: 'quote_volume', type: 'numeric', transformer: bigNumberTransformer })
+ public quoteVolume!: BigNumber;
+}
diff --git a/packages/pipeline/src/entities/transaction.ts b/packages/pipeline/src/entities/transaction.ts
new file mode 100644
index 000000000..742050177
--- /dev/null
+++ b/packages/pipeline/src/entities/transaction.ts
@@ -0,0 +1,19 @@
+import { BigNumber } from '@0x/utils';
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'transactions', schema: 'raw' })
+export class Transaction {
+ @PrimaryColumn({ name: 'transaction_hash' })
+ public transactionHash!: string;
+ @PrimaryColumn({ name: 'block_hash' })
+ public blockHash!: string;
+ @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer })
+ public blockNumber!: number;
+
+ @Column({ type: 'numeric', name: 'gas_used', transformer: bigNumberTransformer })
+ public gasUsed!: BigNumber;
+ @Column({ type: 'numeric', name: 'gas_price', transformer: bigNumberTransformer })
+ public gasPrice!: BigNumber;
+}
diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts
new file mode 100644
index 000000000..9f7815b4e
--- /dev/null
+++ b/packages/pipeline/src/ormconfig.ts
@@ -0,0 +1,42 @@
+import { ConnectionOptions } from 'typeorm';
+
+import {
+ Block,
+ DexTrade,
+ ExchangeCancelEvent,
+ ExchangeCancelUpToEvent,
+ ExchangeFillEvent,
+ OHLCVExternal,
+ Relayer,
+ SraOrder,
+ SraOrdersObservedTimeStamp,
+ TokenMetadata,
+ TokenOrderbookSnapshot,
+ Transaction,
+} from './entities';
+
+const entities = [
+ Block,
+ DexTrade,
+ ExchangeCancelEvent,
+ ExchangeCancelUpToEvent,
+ ExchangeFillEvent,
+ OHLCVExternal,
+ Relayer,
+ SraOrder,
+ SraOrdersObservedTimeStamp,
+ TokenMetadata,
+ TokenOrderbookSnapshot,
+ Transaction,
+];
+
+const config: ConnectionOptions = {
+ type: 'postgres',
+ url: process.env.ZEROEX_DATA_PIPELINE_DB_URL,
+ synchronize: false,
+ logging: ['error'],
+ entities,
+ migrations: ['./lib/migrations/**/*.js'],
+};
+
+module.exports = config;
diff --git a/packages/pipeline/src/parsers/bloxy/index.ts b/packages/pipeline/src/parsers/bloxy/index.ts
new file mode 100644
index 000000000..caa55d289
--- /dev/null
+++ b/packages/pipeline/src/parsers/bloxy/index.ts
@@ -0,0 +1,53 @@
+import { BigNumber } from '@0x/utils';
+import * as R from 'ramda';
+
+import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../data_sources/bloxy';
+import { DexTrade } from '../../entities';
+
+/**
+ * Parses a raw trades response from the Bloxy Dex API and returns an array of
+ * DexTrade entities.
+ * @param rawTrades A raw order response from an SRA endpoint.
+ */
+export function parseBloxyTrades(rawTrades: BloxyTrade[]): DexTrade[] {
+ return R.map(_parseBloxyTrade, rawTrades);
+}
+
+/**
+ * Converts a single Bloxy trade into a DexTrade entity.
+ * @param rawTrade A single trade from the response from the Bloxy API.
+ */
+export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade {
+ const dexTrade = new DexTrade();
+ dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL;
+ dexTrade.txHash = rawTrade.tx_hash;
+ dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime();
+ dexTrade.txDate = rawTrade.tx_date;
+ dexTrade.txSender = rawTrade.tx_sender;
+ dexTrade.smartContractId = rawTrade.smart_contract_id;
+ dexTrade.smartContractAddress = rawTrade.smart_contract_address;
+ dexTrade.contractType = rawTrade.contract_type;
+ dexTrade.maker = rawTrade.maker;
+ dexTrade.taker = rawTrade.taker;
+ // TODO(albrow): The Bloxy API returns amounts and fees as a `number` type
+ // but some of their values have too many significant digits to be
+ // represented that way. Ideally they will switch to using strings and then
+ // we can update this code.
+ dexTrade.amountBuy = new BigNumber(rawTrade.amountBuy.toString());
+ dexTrade.makerFeeAmount = new BigNumber(rawTrade.makerFee.toString());
+ dexTrade.buyCurrencyId = rawTrade.buyCurrencyId;
+ dexTrade.buySymbol = filterNullCharacters(rawTrade.buySymbol);
+ dexTrade.amountSell = new BigNumber(rawTrade.amountSell.toString());
+ dexTrade.takerFeeAmount = new BigNumber(rawTrade.takerFee.toString());
+ dexTrade.sellCurrencyId = rawTrade.sellCurrencyId;
+ dexTrade.sellSymbol = filterNullCharacters(rawTrade.sellSymbol);
+ dexTrade.makerAnnotation = rawTrade.maker_annotation;
+ dexTrade.takerAnnotation = rawTrade.taker_annotation;
+ dexTrade.protocol = rawTrade.protocol;
+ dexTrade.buyAddress = rawTrade.buyAddress;
+ dexTrade.sellAddress = rawTrade.sellAddress;
+ return dexTrade;
+}
+
+// Works with any form of escaped null character (e.g., '\0' and '\u0000').
+const filterNullCharacters = R.replace(/\0/g, '');
diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts
new file mode 100644
index 000000000..81132e8f0
--- /dev/null
+++ b/packages/pipeline/src/parsers/ddex_orders/index.ts
@@ -0,0 +1,77 @@
+import { BigNumber } from '@0x/utils';
+import * as R from 'ramda';
+
+import { DdexMarket, DdexOrder, DdexOrderbook } from '../../data_sources/ddex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../../entities';
+import { OrderType } from '../../types';
+
+/**
+ * Marque function of this file.
+ * 1) Takes in orders from an orderbook,
+ * other information attached.
+ * @param ddexOrderbook A raw orderbook that we pull from the Ddex API.
+ * @param ddexMarket An object containing market data also directly from the API.
+ * @param observedTimestamp Time at which the orders for the market were pulled.
+ * @param source The exchange where these orders are placed. In this case 'ddex'.
+ */
+export function parseDdexOrders(
+ ddexOrderbook: DdexOrderbook,
+ ddexMarket: DdexMarket,
+ observedTimestamp: number,
+ source: string,
+): TokenOrder[] {
+ const aggregatedBids = aggregateOrders(ddexOrderbook.bids);
+ const aggregatedAsks = aggregateOrders(ddexOrderbook.asks);
+ const parsedBids = aggregatedBids.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'bid', source, order));
+ const parsedAsks = aggregatedAsks.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'ask', source, order));
+ return parsedBids.concat(parsedAsks);
+}
+
+/**
+ * Aggregates orders by price point for consistency with other exchanges.
+ * Querying the Ddex API at level 3 setting returns a breakdown of
+ * individual orders at each price point. Other exchanges only give total amount
+ * at each price point. Returns an array of <price, amount> tuples.
+ * @param ddexOrders A list of Ddex orders awaiting aggregation.
+ */
+export function aggregateOrders(ddexOrders: DdexOrder[]): Array<[string, BigNumber]> {
+ const sumAmount = (acc: BigNumber, order: DdexOrder): BigNumber => acc.plus(order.amount);
+ const aggregatedPricePoints = R.reduceBy(sumAmount, new BigNumber(0), R.prop('price'), ddexOrders);
+ return Object.entries(aggregatedPricePoints);
+}
+
+/**
+ * Parse a single aggregated Ddex order in order to form a tokenOrder entity
+ * which can be saved into the database.
+ * @param ddexMarket An object containing information about the market where these
+ * trades have been placed.
+ * @param observedTimestamp The time when the API response returned back to us.
+ * @param orderType 'bid' or 'ask' enum.
+ * @param source Exchange where these orders were placed.
+ * @param ddexOrder A <price, amount> tuple which we will convert to volume-basis.
+ */
+export function parseDdexOrder(
+ ddexMarket: DdexMarket,
+ observedTimestamp: number,
+ orderType: OrderType,
+ source: string,
+ ddexOrder: [string, BigNumber],
+): TokenOrder {
+ const tokenOrder = new TokenOrder();
+ const price = new BigNumber(ddexOrder[0]);
+ const amount = ddexOrder[1];
+
+ tokenOrder.source = source;
+ tokenOrder.observedTimestamp = observedTimestamp;
+ tokenOrder.orderType = orderType;
+ tokenOrder.price = price;
+
+ tokenOrder.baseAssetSymbol = ddexMarket.baseToken;
+ tokenOrder.baseAssetAddress = ddexMarket.baseTokenAddress;
+ tokenOrder.baseVolume = price.times(amount);
+
+ tokenOrder.quoteAssetSymbol = ddexMarket.quoteToken;
+ tokenOrder.quoteAssetAddress = ddexMarket.quoteTokenAddress;
+ tokenOrder.quoteVolume = amount;
+ return tokenOrder;
+}
diff --git a/packages/pipeline/src/parsers/events/index.ts b/packages/pipeline/src/parsers/events/index.ts
new file mode 100644
index 000000000..e18106c75
--- /dev/null
+++ b/packages/pipeline/src/parsers/events/index.ts
@@ -0,0 +1,133 @@
+import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers';
+import { assetDataUtils } from '@0x/order-utils';
+import { AssetProxyId, ERC721AssetData } from '@0x/types';
+import { LogWithDecodedArgs } from 'ethereum-types';
+import * as R from 'ramda';
+
+import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities';
+import { bigNumbertoStringOrNull } from '../../utils';
+
+/**
+ * Parses raw event logs for a fill event and returns an array of
+ * ExchangeFillEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseExchangeFillEvents: (
+ eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>,
+) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent);
+
+/**
+ * Parses raw event logs for a cancel event and returns an array of
+ * ExchangeCancelEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseExchangeCancelEvents: (
+ eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>,
+) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent);
+
+/**
+ * Parses raw event logs for a CancelUpTo event and returns an array of
+ * ExchangeCancelUpToEvent entities.
+ * @param eventLogs Raw event logs (e.g. returned from contract-wrappers).
+ */
+export const parseExchangeCancelUpToEvents: (
+ eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>,
+) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent);
+
+/**
+ * Converts a raw event log for a fill event into an ExchangeFillEvent entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent {
+ const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
+ const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+ const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
+ const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+ const exchangeFillEvent = new ExchangeFillEvent();
+ exchangeFillEvent.contractAddress = eventLog.address as string;
+ exchangeFillEvent.blockNumber = eventLog.blockNumber as number;
+ exchangeFillEvent.logIndex = eventLog.logIndex as number;
+ exchangeFillEvent.rawData = eventLog.data as string;
+ exchangeFillEvent.transactionHash = eventLog.transactionHash;
+ exchangeFillEvent.makerAddress = eventLog.args.makerAddress;
+ exchangeFillEvent.takerAddress = eventLog.args.takerAddress;
+ exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress;
+ exchangeFillEvent.senderAddress = eventLog.args.senderAddress;
+ exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount;
+ exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount;
+ exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid;
+ exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid;
+ exchangeFillEvent.orderHash = eventLog.args.orderHash;
+ exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData;
+ exchangeFillEvent.makerAssetType = makerAssetType;
+ exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId;
+ exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress;
+ // tslint has a false positive here. Type assertion is required.
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
+ exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData;
+ exchangeFillEvent.takerAssetType = takerAssetType;
+ exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId;
+ exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
+ return exchangeFillEvent;
+}
+
+/**
+ * Converts a raw event log for a cancel event into an ExchangeCancelEvent
+ * entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToExchangeCancelEvent(
+ eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>,
+): ExchangeCancelEvent {
+ const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData);
+ const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+ const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData);
+ const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+ const exchangeCancelEvent = new ExchangeCancelEvent();
+ exchangeCancelEvent.contractAddress = eventLog.address as string;
+ exchangeCancelEvent.blockNumber = eventLog.blockNumber as number;
+ exchangeCancelEvent.logIndex = eventLog.logIndex as number;
+ exchangeCancelEvent.rawData = eventLog.data as string;
+ exchangeCancelEvent.transactionHash = eventLog.transactionHash;
+ exchangeCancelEvent.makerAddress = eventLog.args.makerAddress;
+ exchangeCancelEvent.takerAddress = eventLog.args.takerAddress;
+ exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress;
+ exchangeCancelEvent.senderAddress = eventLog.args.senderAddress;
+ exchangeCancelEvent.orderHash = eventLog.args.orderHash;
+ exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData;
+ exchangeCancelEvent.makerAssetType = makerAssetType;
+ exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId;
+ exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
+ exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData;
+ exchangeCancelEvent.takerAssetType = takerAssetType;
+ exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId;
+ exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
+ return exchangeCancelEvent;
+}
+
+/**
+ * Converts a raw event log for a cancelUpTo event into an
+ * ExchangeCancelUpToEvent entity.
+ * @param eventLog Raw event log (e.g. returned from contract-wrappers).
+ */
+export function _convertToExchangeCancelUpToEvent(
+ eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>,
+): ExchangeCancelUpToEvent {
+ const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent();
+ exchangeCancelUpToEvent.contractAddress = eventLog.address as string;
+ exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number;
+ exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number;
+ exchangeCancelUpToEvent.rawData = eventLog.data as string;
+ exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash;
+ exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress;
+ exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress;
+ exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch;
+ return exchangeCancelUpToEvent;
+}
diff --git a/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts
new file mode 100644
index 000000000..3efb90384
--- /dev/null
+++ b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts
@@ -0,0 +1,38 @@
+import { CryptoCompareOHLCVRecord } from '../../data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../../entities';
+
+const ONE_SECOND = 1000; // Crypto Compare uses timestamps in seconds instead of milliseconds
+
+export interface OHLCVMetadata {
+ exchange: string;
+ fromSymbol: string;
+ toSymbol: string;
+ source: string;
+ observedTimestamp: number;
+ interval: number;
+}
+/**
+ * Parses OHLCV records from Crypto Compare into an array of OHLCVExternal entities
+ * @param rawRecords an array of OHLCV records from Crypto Compare (not the full response)
+ */
+export function parseRecords(rawRecords: CryptoCompareOHLCVRecord[], metadata: OHLCVMetadata): OHLCVExternal[] {
+ return rawRecords.map(rec => {
+ const ohlcvRecord = new OHLCVExternal();
+ ohlcvRecord.exchange = metadata.exchange;
+ ohlcvRecord.fromSymbol = metadata.fromSymbol;
+ ohlcvRecord.toSymbol = metadata.toSymbol;
+ ohlcvRecord.startTime = rec.time * ONE_SECOND - metadata.interval;
+ ohlcvRecord.endTime = rec.time * ONE_SECOND;
+
+ ohlcvRecord.open = rec.open;
+ ohlcvRecord.close = rec.close;
+ ohlcvRecord.low = rec.low;
+ ohlcvRecord.high = rec.high;
+ ohlcvRecord.volumeFrom = rec.volumefrom;
+ ohlcvRecord.volumeTo = rec.volumeto;
+
+ ohlcvRecord.source = metadata.source;
+ ohlcvRecord.observedTimestamp = metadata.observedTimestamp;
+ return ohlcvRecord;
+ });
+}
diff --git a/packages/pipeline/src/parsers/paradex_orders/index.ts b/packages/pipeline/src/parsers/paradex_orders/index.ts
new file mode 100644
index 000000000..7966658a7
--- /dev/null
+++ b/packages/pipeline/src/parsers/paradex_orders/index.ts
@@ -0,0 +1,66 @@
+import { BigNumber } from '@0x/utils';
+
+import { ParadexMarket, ParadexOrder, ParadexOrderbookResponse } from '../../data_sources/paradex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../../entities';
+import { OrderType } from '../../types';
+
+/**
+ * Marque function of this file.
+ * 1) Takes in orders from an orderbook (orders are already aggregated by price point),
+ * 2) For each aggregated order, forms a TokenOrder entity with market data and
+ * other information attached.
+ * @param paradexOrderbookResponse An orderbook response from the Paradex API.
+ * @param paradexMarket An object containing market data also directly from the API.
+ * @param observedTimestamp Time at which the orders for the market were pulled.
+ * @param source The exchange where these orders are placed. In this case 'paradex'.
+ */
+export function parseParadexOrders(
+ paradexOrderbookResponse: ParadexOrderbookResponse,
+ paradexMarket: ParadexMarket,
+ observedTimestamp: number,
+ source: string,
+): TokenOrder[] {
+ const parsedBids = paradexOrderbookResponse.bids.map(order =>
+ parseParadexOrder(paradexMarket, observedTimestamp, 'bid', source, order),
+ );
+ const parsedAsks = paradexOrderbookResponse.asks.map(order =>
+ parseParadexOrder(paradexMarket, observedTimestamp, 'ask', source, order),
+ );
+ return parsedBids.concat(parsedAsks);
+}
+
+/**
+ * Parse a single aggregated Ddex order in order to form a tokenOrder entity
+ * which can be saved into the database.
+ * @param paradexMarket An object containing information about the market where these
+ * orders have been placed.
+ * @param observedTimestamp The time when the API response returned back to us.
+ * @param orderType 'bid' or 'ask' enum.
+ * @param source Exchange where these orders were placed.
+ * @param paradexOrder A ParadexOrder object; basically price, amount tuple.
+ */
+export function parseParadexOrder(
+ paradexMarket: ParadexMarket,
+ observedTimestamp: number,
+ orderType: OrderType,
+ source: string,
+ paradexOrder: ParadexOrder,
+): TokenOrder {
+ const tokenOrder = new TokenOrder();
+ const price = new BigNumber(paradexOrder.price);
+ const amount = new BigNumber(paradexOrder.amount);
+
+ tokenOrder.source = source;
+ tokenOrder.observedTimestamp = observedTimestamp;
+ tokenOrder.orderType = orderType;
+ tokenOrder.price = price;
+
+ tokenOrder.baseAssetSymbol = paradexMarket.baseToken;
+ tokenOrder.baseAssetAddress = paradexMarket.baseTokenAddress as string;
+ tokenOrder.baseVolume = price.times(amount);
+
+ tokenOrder.quoteAssetSymbol = paradexMarket.quoteToken;
+ tokenOrder.quoteAssetAddress = paradexMarket.quoteTokenAddress as string;
+ tokenOrder.quoteVolume = amount;
+ return tokenOrder;
+}
diff --git a/packages/pipeline/src/parsers/relayer_registry/index.ts b/packages/pipeline/src/parsers/relayer_registry/index.ts
new file mode 100644
index 000000000..9723880a4
--- /dev/null
+++ b/packages/pipeline/src/parsers/relayer_registry/index.ts
@@ -0,0 +1,37 @@
+import * as R from 'ramda';
+
+import { RelayerResponse, RelayerResponseNetwork } from '../../data_sources/relayer-registry';
+import { Relayer } from '../../entities';
+
+/**
+ * Parses a raw relayer registry response into an array of Relayer entities.
+ * @param rawResp raw response from the relayer-registry json file.
+ */
+export function parseRelayers(rawResp: Map<string, RelayerResponse>): Relayer[] {
+ const parsedAsObject = R.mapObjIndexed(parseRelayer, rawResp);
+ return R.values(parsedAsObject);
+}
+
+function parseRelayer(relayerResp: RelayerResponse, uuid: string): Relayer {
+ const relayer = new Relayer();
+ relayer.uuid = uuid;
+ relayer.name = relayerResp.name;
+ relayer.homepageUrl = relayerResp.homepage_url;
+ relayer.appUrl = relayerResp.app_url;
+ const mainNetworkRelayerInfo = getMainNetwork(relayerResp);
+ if (mainNetworkRelayerInfo !== undefined) {
+ relayer.sraHttpEndpoint = mainNetworkRelayerInfo.sra_http_endpoint || null;
+ relayer.sraWsEndpoint = mainNetworkRelayerInfo.sra_ws_endpoint || null;
+ relayer.feeRecipientAddresses =
+ R.path(['static_order_fields', 'fee_recipient_addresses'], mainNetworkRelayerInfo) || [];
+ relayer.takerAddresses = R.path(['static_order_fields', 'taker_addresses'], mainNetworkRelayerInfo) || [];
+ } else {
+ relayer.feeRecipientAddresses = [];
+ relayer.takerAddresses = [];
+ }
+ return relayer;
+}
+
+function getMainNetwork(relayerResp: RelayerResponse): RelayerResponseNetwork | undefined {
+ return R.find(network => network.networkId === 1, relayerResp.networks);
+}
diff --git a/packages/pipeline/src/parsers/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts
new file mode 100644
index 000000000..ef8901e40
--- /dev/null
+++ b/packages/pipeline/src/parsers/sra_orders/index.ts
@@ -0,0 +1,62 @@
+import { APIOrder, OrdersResponse } from '@0x/connect';
+import { assetDataUtils, orderHashUtils } from '@0x/order-utils';
+import { AssetProxyId, ERC721AssetData } from '@0x/types';
+import * as R from 'ramda';
+
+import { SraOrder } from '../../entities';
+import { bigNumbertoStringOrNull } from '../../utils';
+
+/**
+ * Parses a raw order response from an SRA endpoint and returns an array of
+ * SraOrder entities.
+ * @param rawOrdersResponse A raw order response from an SRA endpoint.
+ */
+export function parseSraOrders(rawOrdersResponse: OrdersResponse): SraOrder[] {
+ return R.map(_convertToEntity, rawOrdersResponse.records);
+}
+
+/**
+ * Converts a single APIOrder into an SraOrder entity.
+ * @param apiOrder A single order from the response from an SRA endpoint.
+ */
+export function _convertToEntity(apiOrder: APIOrder): SraOrder {
+ // TODO(albrow): refactor out common asset data decoding code.
+ const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.makerAssetData);
+ const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+ const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.takerAssetData);
+ const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721';
+
+ const sraOrder = new SraOrder();
+ sraOrder.exchangeAddress = apiOrder.order.exchangeAddress;
+ sraOrder.orderHashHex = orderHashUtils.getOrderHashHex(apiOrder.order);
+
+ sraOrder.makerAddress = apiOrder.order.makerAddress;
+ sraOrder.takerAddress = apiOrder.order.takerAddress;
+ sraOrder.feeRecipientAddress = apiOrder.order.feeRecipientAddress;
+ sraOrder.senderAddress = apiOrder.order.senderAddress;
+ sraOrder.makerAssetAmount = apiOrder.order.makerAssetAmount;
+ sraOrder.takerAssetAmount = apiOrder.order.takerAssetAmount;
+ sraOrder.makerFee = apiOrder.order.makerFee;
+ sraOrder.takerFee = apiOrder.order.takerFee;
+ sraOrder.expirationTimeSeconds = apiOrder.order.expirationTimeSeconds;
+ sraOrder.salt = apiOrder.order.salt;
+ sraOrder.signature = apiOrder.order.signature;
+
+ sraOrder.rawMakerAssetData = apiOrder.order.makerAssetData;
+ sraOrder.makerAssetType = makerAssetType;
+ sraOrder.makerAssetProxyId = makerAssetData.assetProxyId;
+ sraOrder.makerTokenAddress = makerAssetData.tokenAddress;
+ // tslint has a false positive here. Type assertion is required.
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ sraOrder.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId);
+ sraOrder.rawTakerAssetData = apiOrder.order.takerAssetData;
+ sraOrder.takerAssetType = takerAssetType;
+ sraOrder.takerAssetProxyId = takerAssetData.assetProxyId;
+ sraOrder.takerTokenAddress = takerAssetData.tokenAddress;
+ // tslint:disable-next-line:no-unnecessary-type-assertion
+ sraOrder.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId);
+
+ sraOrder.metadataJson = JSON.stringify(apiOrder.metaData);
+
+ return sraOrder;
+}
diff --git a/packages/pipeline/src/parsers/token_metadata/index.ts b/packages/pipeline/src/parsers/token_metadata/index.ts
new file mode 100644
index 000000000..3b3e05d76
--- /dev/null
+++ b/packages/pipeline/src/parsers/token_metadata/index.ts
@@ -0,0 +1,47 @@
+import { BigNumber } from '@0x/utils';
+import * as R from 'ramda';
+
+import { MetamaskTrustedTokenMeta, ZeroExTrustedTokenMeta } from '../../data_sources/trusted_tokens';
+import { TokenMetadata } from '../../entities';
+import {} from '../../utils';
+
+/**
+ * Parses Metamask's trusted tokens list.
+ * @param rawResp raw response from the metamask json file.
+ */
+export function parseMetamaskTrustedTokens(rawResp: Map<string, MetamaskTrustedTokenMeta>): TokenMetadata[] {
+ const parsedAsObject = R.mapObjIndexed(parseMetamaskTrustedToken, rawResp);
+ return R.values(parsedAsObject);
+}
+
+/**
+ * Parses 0x's trusted tokens list.
+ * @param rawResp raw response from the 0x trusted tokens file.
+ */
+export function parseZeroExTrustedTokens(rawResp: ZeroExTrustedTokenMeta[]): TokenMetadata[] {
+ return R.map(parseZeroExTrustedToken, rawResp);
+}
+
+function parseMetamaskTrustedToken(resp: MetamaskTrustedTokenMeta, address: string): TokenMetadata {
+ const trustedToken = new TokenMetadata();
+
+ trustedToken.address = address;
+ trustedToken.decimals = new BigNumber(resp.decimals);
+ trustedToken.symbol = resp.symbol;
+ trustedToken.name = resp.name;
+ trustedToken.authority = 'metamask';
+
+ return trustedToken;
+}
+
+function parseZeroExTrustedToken(resp: ZeroExTrustedTokenMeta): TokenMetadata {
+ const trustedToken = new TokenMetadata();
+
+ trustedToken.address = resp.address;
+ trustedToken.decimals = new BigNumber(resp.decimals);
+ trustedToken.symbol = resp.symbol;
+ trustedToken.name = resp.name;
+ trustedToken.authority = '0x';
+
+ return trustedToken;
+}
diff --git a/packages/pipeline/src/parsers/web3/index.ts b/packages/pipeline/src/parsers/web3/index.ts
new file mode 100644
index 000000000..f986efc59
--- /dev/null
+++ b/packages/pipeline/src/parsers/web3/index.ts
@@ -0,0 +1,49 @@
+import { BigNumber } from '@0x/utils';
+import { BlockWithoutTransactionData, Transaction as EthTransaction } from 'ethereum-types';
+
+import { Block, Transaction } from '../../entities';
+
+const MILLISECONDS_PER_SECOND = 1000;
+
+/**
+ * Parses a raw block and returns a Block entity.
+ * @param rawBlock a raw block (e.g. returned from web3-wrapper).
+ */
+export function parseBlock(rawBlock: BlockWithoutTransactionData): Block {
+ if (rawBlock.hash == null) {
+ throw new Error('Tried to parse raw block but hash was null');
+ }
+ if (rawBlock.number == null) {
+ throw new Error('Tried to parse raw block but number was null');
+ }
+
+ const block = new Block();
+ block.hash = rawBlock.hash;
+ block.number = rawBlock.number;
+ // Block timestamps are in seconds, but we use milliseconds everywhere else.
+ block.timestamp = rawBlock.timestamp * MILLISECONDS_PER_SECOND;
+ return block;
+}
+
+/**
+ * Parses a raw transaction and returns a Transaction entity.
+ * @param rawBlock a raw transaction (e.g. returned from web3-wrapper).
+ */
+export function parseTransaction(rawTransaction: EthTransaction): Transaction {
+ if (rawTransaction.blockHash == null) {
+ throw new Error('Tried to parse raw transaction but blockHash was null');
+ }
+ if (rawTransaction.blockNumber == null) {
+ throw new Error('Tried to parse raw transaction but blockNumber was null');
+ }
+
+ const tx = new Transaction();
+ tx.transactionHash = rawTransaction.hash;
+ tx.blockHash = rawTransaction.blockHash;
+ tx.blockNumber = rawTransaction.blockNumber;
+
+ tx.gasUsed = new BigNumber(rawTransaction.gas);
+ tx.gasPrice = rawTransaction.gasPrice;
+
+ return tx;
+}
diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
new file mode 100644
index 000000000..4e4c12dd0
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts
@@ -0,0 +1,51 @@
+// tslint:disable:no-console
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { BloxySource } from '../data_sources/bloxy';
+import { DexTrade } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseBloxyTrades } from '../parsers/bloxy';
+import { handleError } from '../utils';
+
+// Number of trades to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ await getAndSaveTrades();
+ process.exit(0);
+})().catch(handleError);
+
+async function getAndSaveTrades(): Promise<void> {
+ const apiKey = process.env.BLOXY_API_KEY;
+ if (apiKey === undefined) {
+ throw new Error('Missing required env var: BLOXY_API_KEY');
+ }
+ const bloxySource = new BloxySource(apiKey);
+ const tradesRepository = connection.getRepository(DexTrade);
+ const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository);
+ console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`);
+ console.log('Getting latest dex trades...');
+ const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp);
+ console.log(`Parsing ${rawTrades.length} trades...`);
+ const trades = parseBloxyTrades(rawTrades);
+ console.log(`Saving ${trades.length} trades...`);
+ await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) });
+ console.log('Done saving trades.');
+}
+
+async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> {
+ if ((await tradesRepository.count()) === 0) {
+ return 0;
+ }
+ const response = (await connection.query(
+ 'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1',
+ )) as Array<{ tx_timestamp: number }>;
+ if (response.length === 0) {
+ return 0;
+ }
+ return response[0].tx_timestamp;
+}
diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
new file mode 100644
index 000000000..7868e9c5a
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts
@@ -0,0 +1,55 @@
+import { logUtils } from '@0x/utils';
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseDdexOrders } from '../parsers/ddex_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+// Number of markets to retrieve orderbooks for at once.
+const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50;
+
+// Delay between market orderbook requests.
+const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const ddexSource = new DdexSource();
+ const markets = await ddexSource.getActiveMarketsAsync();
+ for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
+ await Promise.all(
+ marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)),
+ );
+ await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
+ }
+ process.exit(0);
+})().catch(handleError);
+
+/**
+ * Retrieve orderbook from Ddex API for a given market. Parse orders and insert
+ * them into our database.
+ * @param ddexSource Data source which can query Ddex API.
+ * @param market Object from Ddex API containing market data.
+ */
+async function getAndSaveMarketOrderbook(ddexSource: DdexSource, market: DdexMarket): Promise<void> {
+ const orderBook = await ddexSource.getMarketOrderbookAsync(market.id);
+ const observedTimestamp = Date.now();
+
+ logUtils.log(`${market.id}: Parsing orders.`);
+ const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE);
+
+ if (orders.length > 0) {
+ logUtils.log(`${market.id}: Saving ${orders.length} orders.`);
+ const TokenOrderRepository = connection.getRepository(TokenOrder);
+ await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+ } else {
+ logUtils.log(`${market.id}: 0 orders to save.`);
+ }
+}
diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts
new file mode 100644
index 000000000..b7bd51f08
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts
@@ -0,0 +1,80 @@
+// tslint:disable:no-console
+import { web3Factory } from '@0x/dev-utils';
+import * as Parallel from 'async-parallel';
+import R = require('ramda');
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { Web3Source } from '../data_sources/web3';
+import { Block } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseBlock } from '../parsers/web3';
+import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';
+
+// Number of blocks to save at once.
+const BATCH_SAVE_SIZE = 1000;
+// Maximum number of requests to send at once.
+const MAX_CONCURRENCY = 10;
+// Maximum number of blocks to query for at once. This is also the maximum
+// number of blocks we will hold in memory prior to being saved to the database.
+const MAX_BLOCKS_PER_QUERY = 1000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const provider = web3Factory.getRpcProvider({
+ rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`,
+ });
+ const web3Source = new Web3Source(provider);
+ await getAllMissingBlocks(web3Source);
+ process.exit(0);
+})().catch(handleError);
+
+interface MissingBlocksResponse {
+ block_number: string;
+}
+
+async function getAllMissingBlocks(web3Source: Web3Source): Promise<void> {
+ const blocksRepository = connection.getRepository(Block);
+ let fromBlock = EXCHANGE_START_BLOCK;
+ while (true) {
+ const blockNumbers = await getMissingBlockNumbers(fromBlock);
+ if (blockNumbers.length === 0) {
+ // There are no more missing blocks. We're done.
+ break;
+ }
+ await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers);
+ fromBlock = Math.max(...blockNumbers) + 1;
+ }
+ const totalBlocks = await blocksRepository.count();
+ console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`);
+}
+
+async function getMissingBlockNumbers(fromBlock: number): Promise<number[]> {
+ console.log(`Checking for missing blocks starting at ${fromBlock}...`);
+ const response = (await connection.query(
+ 'SELECT DISTINCT(block_number) FROM raw.exchange_fill_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) AND block_number >= $1 ORDER BY block_number ASC LIMIT $2',
+ [fromBlock, MAX_BLOCKS_PER_QUERY],
+ )) as MissingBlocksResponse[];
+ const blockNumberStrings = R.pluck('block_number', response);
+ const blockNumbers = R.map(parseInt, blockNumberStrings);
+ console.log(`Found ${blockNumbers.length} missing blocks in the given range.`);
+ return blockNumbers;
+}
+
+async function getAndSaveBlocks(
+ web3Source: Web3Source,
+ blocksRepository: Repository<Block>,
+ blockNumbers: number[],
+): Promise<void> {
+ console.log(`Getting block data for ${blockNumbers.length} blocks...`);
+ Parallel.setConcurrency(MAX_CONCURRENCY);
+ const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) =>
+ web3Source.getBlockInfoAsync(blockNumber),
+ );
+ console.log(`Parsing ${rawBlocks.length} blocks...`);
+ const blocks = R.map(parseBlock, rawBlocks);
+ console.log(`Saving ${blocks.length} blocks...`);
+ await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) });
+}
diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts
new file mode 100644
index 000000000..80abbb8b0
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_missing_events.ts
@@ -0,0 +1,136 @@
+// tslint:disable:no-console
+import { web3Factory } from '@0x/dev-utils';
+import R = require('ramda');
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events';
+import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events';
+import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils';
+
+const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events.
+const BATCH_SAVE_SIZE = 1000; // Number of events to save at once.
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const provider = web3Factory.getRpcProvider({
+ rpcUrl: INFURA_ROOT_URL,
+ });
+ const eventsSource = new ExchangeEventsSource(provider, 1);
+ await getFillEventsAsync(eventsSource);
+ await getCancelEventsAsync(eventsSource);
+ await getCancelUpToEventsAsync(eventsSource);
+ process.exit(0);
+})().catch(handleError);
+
+async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> {
+ console.log('Checking existing fill events...');
+ const repository = connection.getRepository(ExchangeFillEvent);
+ const startBlock = await getStartBlockAsync(repository);
+ console.log(`Getting fill events starting at ${startBlock}...`);
+ const eventLogs = await eventsSource.getFillEventsAsync(startBlock);
+ console.log('Parsing fill events...');
+ const events = parseExchangeFillEvents(eventLogs);
+ console.log(`Retrieved and parsed ${events.length} total fill events.`);
+ await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
+}
+
+async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> {
+ console.log('Checking existing cancel events...');
+ const repository = connection.getRepository(ExchangeCancelEvent);
+ const startBlock = await getStartBlockAsync(repository);
+ console.log(`Getting cancel events starting at ${startBlock}...`);
+ const eventLogs = await eventsSource.getCancelEventsAsync(startBlock);
+ console.log('Parsing cancel events...');
+ const events = parseExchangeCancelEvents(eventLogs);
+ console.log(`Retrieved and parsed ${events.length} total cancel events.`);
+ await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
+}
+
+async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> {
+ console.log('Checking existing CancelUpTo events...');
+ const repository = connection.getRepository(ExchangeCancelUpToEvent);
+ const startBlock = await getStartBlockAsync(repository);
+ console.log(`Getting CancelUpTo events starting at ${startBlock}...`);
+ const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock);
+ console.log('Parsing CancelUpTo events...');
+ const events = parseExchangeCancelUpToEvents(eventLogs);
+ console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`);
+ await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events);
+}
+
+const tableNameRegex = /^[a-zA-Z_]*$/;
+
+async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> {
+ const fillEventCount = await repository.count();
+ if (fillEventCount === 0) {
+ console.log(`No existing ${repository.metadata.name}s found.`);
+ return EXCHANGE_START_BLOCK;
+ }
+ const tableName = repository.metadata.tableName;
+ if (!tableNameRegex.test(tableName)) {
+ throw new Error(`Unexpected special character in table name: ${tableName}`);
+ }
+ const queryResult = await connection.query(
+ `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`,
+ );
+ const lastKnownBlock = queryResult[0].block_number;
+ return lastKnownBlock - START_BLOCK_OFFSET;
+}
+
+async function saveEventsAsync<T extends ExchangeEvent>(
+ isInitialPull: boolean,
+ repository: Repository<T>,
+ events: T[],
+): Promise<void> {
+ console.log(`Saving ${repository.metadata.name}s...`);
+ if (isInitialPull) {
+ // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE
+ // each.
+ for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) {
+ await repository.insert(eventsBatch);
+ }
+ } else {
+ // If we possibly have some overlap where we need to update some
+ // existing events, we need to use our workaround/fallback.
+ await saveIndividuallyWithFallbackAsync(repository, events);
+ }
+ const totalEvents = await repository.count();
+ console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`);
+}
+
+async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>(
+ repository: Repository<T>,
+ events: T[],
+): Promise<void> {
+ // Note(albrow): This is a temporary hack because `save` is not working as
+ // documented and is causing a foreign key constraint violation. Hopefully
+ // can remove later because this "poor man's upsert" implementation operates
+ // on one event at a time and is therefore much slower.
+ for (const event of events) {
+ try {
+ // First try an insert.
+ await repository.insert(event);
+ } catch {
+ // If it fails, assume it was a foreign key constraint error and try
+ // doing an update instead.
+ // Note(albrow): Unfortunately the `as any` hack here seems
+ // required. I can't figure out how to convince the type-checker
+ // that the criteria and the entity itself are the correct type for
+ // the given repository. If we can remove the `save` hack then this
+ // will probably no longer be necessary.
+ await repository.update(
+ {
+ contractAddress: event.contractAddress,
+ blockNumber: event.blockNumber,
+ logIndex: event.logIndex,
+ } as any,
+ event as any,
+ );
+ }
+ }
+}
diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
new file mode 100644
index 000000000..6979cd10e
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts
@@ -0,0 +1,101 @@
+// tslint:disable:no-console
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare';
+import { handleError } from '../utils';
+import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs';
+
+const SOURCE_NAME = 'CryptoCompare';
+const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_SECOND = 1000;
+
+const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
+const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare
+const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const repository = connection.getRepository(OHLCVExternal);
+ const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS);
+
+ const jobTime = new Date().getTime();
+ const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
+ console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`);
+
+ const fetchAndSavePromises = tradingPairs.map(async pair => {
+ const pairs = source.generateBackfillIntervals(pair);
+ return fetchAndSaveAsync(source, repository, jobTime, pairs);
+ });
+ await Promise.all(fetchAndSavePromises);
+ console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`);
+ process.exit(0);
+})().catch(handleError);
+
+async function fetchAndSaveAsync(
+ source: CryptoCompareOHLCVSource,
+ repository: Repository<OHLCVExternal>,
+ jobTime: number,
+ pairs: TradingPair[],
+): Promise<void> {
+ const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => {
+ if (a.latestSavedTime < b.latestSavedTime) {
+ return -1;
+ } else if (a.latestSavedTime > b.latestSavedTime) {
+ return 1;
+ } else {
+ return 0;
+ }
+ };
+ pairs.sort(sortAscTimestamp);
+
+ let i = 0;
+ while (i < pairs.length) {
+ const pair = pairs[i];
+ if (pair.latestSavedTime > TWO_HOURS_AGO) {
+ break;
+ }
+ const rawRecords = await source.getHourlyOHLCVAsync(pair);
+ const records = rawRecords.filter(rec => {
+ return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime;
+ }); // Crypto Compare can take ~30mins to finalise records
+ if (records.length === 0) {
+ console.log(`No more records, stopping task for ${JSON.stringify(pair)}`);
+ break;
+ }
+ const metadata: OHLCVMetadata = {
+ exchange: source.default_exchange,
+ fromSymbol: pair.fromSymbol,
+ toSymbol: pair.toSymbol,
+ source: SOURCE_NAME,
+ observedTimestamp: jobTime,
+ interval: source.intervalBetweenRecords,
+ };
+ const parsedRecords = parseRecords(records, metadata);
+ try {
+ await saveRecordsAsync(repository, parsedRecords);
+ i++;
+ } catch (err) {
+ console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
+ break;
+ }
+ }
+ return Promise.resolve();
+}
+
+async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> {
+ const metadata = [
+ records[0].fromSymbol,
+ records[0].toSymbol,
+ new Date(records[0].startTime),
+ new Date(records[records.length - 1].endTime),
+ ];
+
+ console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`);
+ await repository.save(records);
+}
diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
new file mode 100644
index 000000000..bae1fbede
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts
@@ -0,0 +1,87 @@
+import { logUtils } from '@0x/utils';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import {
+ PARADEX_SOURCE,
+ ParadexActiveMarketsResponse,
+ ParadexMarket,
+ ParadexSource,
+ ParadexTokenInfoResponse,
+} from '../data_sources/paradex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseParadexOrders } from '../parsers/paradex_orders';
+import { handleError } from '../utils';
+
+// Number of orders to save at once.
+const BATCH_SAVE_SIZE = 1000;
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY;
+ if (apiKey === undefined) {
+ throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY');
+ }
+ const paradexSource = new ParadexSource(apiKey);
+ const markets = await paradexSource.getActiveMarketsAsync();
+ const tokenInfoResponse = await paradexSource.getTokenInfoAsync();
+ const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse);
+ await Promise.all(
+ extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbook(paradexSource, market)),
+ );
+ process.exit(0);
+})().catch(handleError);
+
+/**
+ * Extend the default ParadexMarket objects with token addresses.
+ * @param markets An array of ParadexMarket objects.
+ * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses.
+ */
+function addTokenAddresses(
+ markets: ParadexActiveMarketsResponse,
+ tokenInfoResponse: ParadexTokenInfoResponse,
+): ParadexMarket[] {
+ const symbolAddressMapping = new Map<string, string>();
+ tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address));
+
+ markets.forEach((market: ParadexMarket) => {
+ if (symbolAddressMapping.has(market.baseToken)) {
+ market.baseTokenAddress = symbolAddressMapping.get(market.baseToken);
+ } else {
+ market.quoteTokenAddress = '';
+ logUtils.warn(`${market.baseToken}: No address found.`);
+ }
+
+ if (symbolAddressMapping.has(market.quoteToken)) {
+ market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken);
+ } else {
+ market.quoteTokenAddress = '';
+ logUtils.warn(`${market.quoteToken}: No address found.`);
+ }
+ });
+ return markets;
+}
+
+/**
+ * Retrieve orderbook from Paradex API for a given market. Parse orders and insert
+ * them into our database.
+ * @param paradexSource Data source which can query the Paradex API.
+ * @param market Object from the Paradex API with information about the market in question.
+ */
+async function getAndSaveMarketOrderbook(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> {
+ const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol);
+ const observedTimestamp = Date.now();
+
+ logUtils.log(`${market.symbol}: Parsing orders.`);
+ const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE);
+
+ if (orders.length > 0) {
+ logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`);
+ const tokenOrderRepository = connection.getRepository(TokenOrder);
+ await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
+ } else {
+ logUtils.log(`${market.symbol}: 0 orders to save.`);
+ }
+}
diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
new file mode 100644
index 000000000..6c18bcaef
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts
@@ -0,0 +1,53 @@
+// tslint:disable:no-console
+import { HttpClient } from '@0x/connect';
+import * as R from 'ramda';
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm';
+
+import { createObservedTimestampForOrder, SraOrder } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseSraOrders } from '../parsers/sra_orders';
+import { handleError } from '../utils';
+
+const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2';
+const ORDERS_PER_PAGE = 10000; // Number of orders to get per request.
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ await getOrderbookAsync();
+ process.exit(0);
+})().catch(handleError);
+
+async function getOrderbookAsync(): Promise<void> {
+ console.log('Getting all orders...');
+ const connectClient = new HttpClient(RADAR_RELAY_URL);
+ const rawOrders = await connectClient.getOrdersAsync({
+ perPage: ORDERS_PER_PAGE,
+ });
+ console.log(`Got ${rawOrders.records.length} orders.`);
+ console.log('Parsing orders...');
+ // Parse the sra orders, then add source url to each.
+ const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders);
+ // Save all the orders and update the observed time stamps in a single
+ // transaction.
+ console.log('Saving orders and updating timestamps...');
+ await connection.transaction(async (manager: EntityManager): Promise<void> => {
+ for (const order of orders) {
+ await manager.save(SraOrder, order);
+ const observedTimestamp = createObservedTimestampForOrder(order);
+ await manager.save(observedTimestamp);
+ }
+ });
+}
+
+const sourceUrlProp = R.lensProp('sourceUrl');
+
+/**
+ * Sets the source url for a single order. Returns a new order instead of
+ * mutating the given one.
+ */
+const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => {
+ return R.set(sourceUrlProp, sourceURL, order);
+});
diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
new file mode 100644
index 000000000..1befc4437
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts
@@ -0,0 +1,52 @@
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens';
+import { TokenMetadata } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/token_metadata';
+import { handleError } from '../utils';
+
+const METAMASK_TRUSTED_TOKENS_URL =
+ 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/d45916c533116510cc8e9e048a8b5fc3732a6b6d/contract-map.json';
+
+const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens';
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ await getMetamaskTrustedTokens();
+ await getZeroExTrustedTokens();
+ process.exit(0);
+})().catch(handleError);
+
+async function getMetamaskTrustedTokens(): Promise<void> {
+ // tslint:disable-next-line:no-console
+ console.log('Getting latest metamask trusted tokens list ...');
+ const trustedTokensRepository = connection.getRepository(TokenMetadata);
+ const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>(
+ METAMASK_TRUSTED_TOKENS_URL,
+ );
+ const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
+ const trustedTokens = parseMetamaskTrustedTokens(resp);
+ // tslint:disable-next-line:no-console
+ console.log('Saving metamask trusted tokens list');
+ await trustedTokensRepository.save(trustedTokens);
+ // tslint:disable-next-line:no-console
+ console.log('Done saving metamask trusted tokens.');
+}
+
+async function getZeroExTrustedTokens(): Promise<void> {
+ // tslint:disable-next-line:no-console
+ console.log('Getting latest 0x trusted tokens list ...');
+ const trustedTokensRepository = connection.getRepository(TokenMetadata);
+ const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL);
+ const resp = await trustedTokensSource.getTrustedTokenMetaAsync();
+ const trustedTokens = parseZeroExTrustedTokens(resp);
+ // tslint:disable-next-line:no-console
+ console.log('Saving metamask trusted tokens list');
+ await trustedTokensRepository.save(trustedTokens);
+ // tslint:disable-next-line:no-console
+ console.log('Done saving metamask trusted tokens.');
+}
diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts
new file mode 100644
index 000000000..f8918728d
--- /dev/null
+++ b/packages/pipeline/src/scripts/update_relayer_info.ts
@@ -0,0 +1,33 @@
+// tslint:disable:no-console
+import 'reflect-metadata';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { RelayerRegistrySource } from '../data_sources/relayer-registry';
+import { Relayer } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseRelayers } from '../parsers/relayer_registry';
+import { handleError } from '../utils';
+
+// NOTE(albrow): We need to manually update this URL for now. Fix this when we
+// have the relayer-registry behind semantic versioning.
+const RELAYER_REGISTRY_URL =
+ 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/4701c85677d161ea729a466aebbc1826c6aa2c0b/relayers.json';
+
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+ await getRelayers();
+ process.exit(0);
+})().catch(handleError);
+
+async function getRelayers(): Promise<void> {
+ console.log('Getting latest relayer info...');
+ const relayerRepository = connection.getRepository(Relayer);
+ const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL);
+ const relayersResp = await relayerSource.getRelayerInfoAsync();
+ const relayers = parseRelayers(relayersResp);
+ console.log('Saving relayer info...');
+ await relayerRepository.save(relayers);
+ console.log('Done saving relayer info.');
+}
diff --git a/packages/pipeline/src/types.ts b/packages/pipeline/src/types.ts
new file mode 100644
index 000000000..e02b42a40
--- /dev/null
+++ b/packages/pipeline/src/types.ts
@@ -0,0 +1,2 @@
+export type AssetType = 'erc20' | 'erc721';
+export type OrderType = 'bid' | 'ask';
diff --git a/packages/pipeline/src/utils/constants.ts b/packages/pipeline/src/utils/constants.ts
new file mode 100644
index 000000000..56f3e82d8
--- /dev/null
+++ b/packages/pipeline/src/utils/constants.ts
@@ -0,0 +1,3 @@
+// Block number when the Exchange contract was deployed to mainnet.
+export const EXCHANGE_START_BLOCK = 6271590;
+export const INFURA_ROOT_URL = 'https://mainnet.infura.io';
diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
new file mode 100644
index 000000000..9d3ef2fba
--- /dev/null
+++ b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts
@@ -0,0 +1,92 @@
+import { fetchAsync } from '@0x/utils';
+import * as R from 'ramda';
+import { Connection } from 'typeorm';
+
+export interface TradingPair {
+ fromSymbol: string;
+ toSymbol: string;
+ latestSavedTime: number;
+}
+
+const COINLIST_API = 'https://min-api.cryptocompare.com/data/all/coinlist?BuiltOn=7605';
+
+interface CryptoCompareCoinListResp {
+ Data: Map<string, CryptoCompareCoin>;
+}
+
+interface CryptoCompareCoin {
+ Symbol: string;
+ BuiltOn: string;
+ SmartContractAddress: string;
+}
+
+const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT'];
+const ETHEREUM_IDENTIFIER = '7605';
+const HTTP_OK_STATUS = 200;
+/**
+ * Get trading pairs with latest scraped time for OHLCV records
+ * @param conn a typeorm Connection to postgres
+ */
+export async function fetchOHLCVTradingPairsAsync(
+ conn: Connection,
+ source: string,
+ earliestBackfillTime: number,
+): Promise<TradingPair[]> {
+ // fetch existing ohlcv records
+ const latestTradingPairs: Array<{
+ from_symbol: string;
+ to_symbol: string;
+ latest: string;
+ }> = await conn.query(`SELECT
+ MAX(end_time) as latest,
+ from_symbol,
+ to_symbol
+ FROM raw.ohlcv_external
+ GROUP BY from_symbol, to_symbol;`);
+
+ const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {};
+ latestTradingPairs.forEach(pair => {
+ const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {};
+ latestIndex[pair.to_symbol] = parseInt(pair.latest, 10); // tslint:disable-line:custom-no-magic-numbers
+ latestTradingPairsIndex[pair.from_symbol] = latestIndex;
+ });
+
+ // get token symbols used by Crypto Compare
+ const allCoinsResp = await fetchAsync(COINLIST_API);
+ if (allCoinsResp.status !== HTTP_OK_STATUS) {
+ return [];
+ }
+ const allCoins: CryptoCompareCoinListResp = await allCoinsResp.json();
+ const erc20CoinsIndex: Map<string, string> = new Map();
+ Object.entries(allCoins.Data).forEach(pair => {
+ const [symbol, coinData] = pair;
+ if (coinData.BuiltOn === ETHEREUM_IDENTIFIER && coinData.SmartContractAddress !== 'N/A') {
+ erc20CoinsIndex.set(coinData.SmartContractAddress.toLowerCase(), symbol);
+ }
+ });
+
+ // fetch all tokens that are traded on 0x
+ const rawTokenAddresses: Array<{ tokenaddress: string }> = await conn.query(
+ `SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION
+ SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`,
+ );
+ const tokenAddresses = R.pluck('tokenaddress', rawTokenAddresses);
+
+ // join token addresses with CC symbols
+ const allTokenSymbols: string[] = tokenAddresses
+ .map(tokenAddress => erc20CoinsIndex.get(tokenAddress.toLowerCase()) || '')
+ .filter(x => x);
+
+ // generate list of all tokens with time of latest existing record OR default earliest time
+ const allTradingPairCombinations: TradingPair[] = R.chain(sym => {
+ return TO_CURRENCIES.map(fiat => {
+ return {
+ fromSymbol: sym,
+ toSymbol: fiat,
+ latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime,
+ };
+ });
+ }, allTokenSymbols);
+
+ return allTradingPairCombinations;
+}
diff --git a/packages/pipeline/src/utils/index.ts b/packages/pipeline/src/utils/index.ts
new file mode 100644
index 000000000..2096a0a39
--- /dev/null
+++ b/packages/pipeline/src/utils/index.ts
@@ -0,0 +1,38 @@
+import { BigNumber } from '@0x/utils';
+export * from './transformers';
+export * from './constants';
+
+/**
+ * If the given BigNumber is not null, returns the string representation of that
+ * number. Otherwise, returns null.
+ * @param n The number to convert.
+ */
+export function bigNumbertoStringOrNull(n: BigNumber): string | null {
+ if (n == null) {
+ return null;
+ }
+ return n.toString();
+}
+
+/**
+ * Logs an error by intelligently checking for `message` and `stack` properties.
+ * Intended for use with top-level immediately invoked asynchronous functions.
+ * @param e the error to log.
+ */
+export function handleError(e: any): void {
+ if (e.message != null) {
+ // tslint:disable-next-line:no-console
+ console.error(e.message);
+ } else {
+ // tslint:disable-next-line:no-console
+ console.error('Unknown error');
+ }
+ if (e.stack != null) {
+ // tslint:disable-next-line:no-console
+ console.error(e.stack);
+ } else {
+ // tslint:disable-next-line:no-console
+ console.error('(No stack trace)');
+ }
+ process.exit(1);
+}
diff --git a/packages/pipeline/src/utils/transformers/big_number.ts b/packages/pipeline/src/utils/transformers/big_number.ts
new file mode 100644
index 000000000..5f2e4d565
--- /dev/null
+++ b/packages/pipeline/src/utils/transformers/big_number.ts
@@ -0,0 +1,16 @@
+import { BigNumber } from '@0x/utils';
+import { ValueTransformer } from 'typeorm/decorator/options/ValueTransformer';
+
+export class BigNumberTransformer implements ValueTransformer {
+ // tslint:disable-next-line:prefer-function-over-method
+ public to(value: BigNumber | null): string | null {
+ return value === null ? null : value.toString();
+ }
+
+ // tslint:disable-next-line:prefer-function-over-method
+ public from(value: string | null): BigNumber | null {
+ return value === null ? null : new BigNumber(value);
+ }
+}
+
+export const bigNumberTransformer = new BigNumberTransformer();
diff --git a/packages/pipeline/src/utils/transformers/index.ts b/packages/pipeline/src/utils/transformers/index.ts
new file mode 100644
index 000000000..232c1c5de
--- /dev/null
+++ b/packages/pipeline/src/utils/transformers/index.ts
@@ -0,0 +1,2 @@
+export * from './big_number';
+export * from './number_to_bigint';
diff --git a/packages/pipeline/src/utils/transformers/number_to_bigint.ts b/packages/pipeline/src/utils/transformers/number_to_bigint.ts
new file mode 100644
index 000000000..85560c1f0
--- /dev/null
+++ b/packages/pipeline/src/utils/transformers/number_to_bigint.ts
@@ -0,0 +1,27 @@
+import { BigNumber } from '@0x/utils';
+import { ValueTransformer } from 'typeorm/decorator/options/ValueTransformer';
+
+const decimalRadix = 10;
+
+// Can be used to convert a JavaScript number type to a Postgres bigint type and
+// vice versa. By default TypeORM will silently convert number types to string
+// if the corresponding Postgres type is bigint. See
+// https://github.com/typeorm/typeorm/issues/2400 for more information.
+export class NumberToBigIntTransformer implements ValueTransformer {
+ // tslint:disable-next-line:prefer-function-over-method
+ public to(value: number): string {
+ return value.toString();
+ }
+
+ // tslint:disable-next-line:prefer-function-over-method
+ public from(value: string): number {
+ if (new BigNumber(value).greaterThan(Number.MAX_SAFE_INTEGER)) {
+ throw new Error(
+ `Attempted to convert PostgreSQL bigint value (${value}) to JavaScript number type but it is too big to safely convert`,
+ );
+ }
+ return Number.parseInt(value, decimalRadix);
+ }
+}
+
+export const numberToBigIntTransformer = new NumberToBigIntTransformer();
diff --git a/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts b/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts
new file mode 100644
index 000000000..cb374bbb1
--- /dev/null
+++ b/packages/pipeline/test/data_sources/ohlcv_external/crypto_compare_test.ts
@@ -0,0 +1,47 @@
+import * as chai from 'chai';
+import 'mocha';
+import * as R from 'ramda';
+
+import { CryptoCompareOHLCVSource } from '../../../src/data_sources/ohlcv_external/crypto_compare';
+import { TradingPair } from '../../../src/utils/get_ohlcv_trading_pairs';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+// tslint:disable:custom-no-magic-numbers
+describe('ohlcv_external data source (Crypto Compare)', () => {
+ describe('generateBackfillIntervals', () => {
+ it('generates pairs with intervals to query', () => {
+ const source = new CryptoCompareOHLCVSource();
+ const pair: TradingPair = {
+ fromSymbol: 'ETH',
+ toSymbol: 'ZRX',
+ latestSavedTime: new Date().getTime() - source.interval * 2,
+ };
+
+ const expected = [
+ pair,
+ R.merge(pair, { latestSavedTime: pair.latestSavedTime + source.interval }),
+ R.merge(pair, { latestSavedTime: pair.latestSavedTime + source.interval * 2 }),
+ ];
+
+ const actual = source.generateBackfillIntervals(pair);
+ expect(actual).deep.equal(expected);
+ });
+
+ it('returns single pair if no backfill is needed', () => {
+ const source = new CryptoCompareOHLCVSource();
+ const pair: TradingPair = {
+ fromSymbol: 'ETH',
+ toSymbol: 'ZRX',
+ latestSavedTime: new Date().getTime() - source.interval + 5000,
+ };
+
+ const expected = [pair];
+
+ const actual = source.generateBackfillIntervals(pair);
+ expect(actual).deep.equal(expected);
+ });
+ });
+});
diff --git a/packages/pipeline/test/db_global_hooks.ts b/packages/pipeline/test/db_global_hooks.ts
new file mode 100644
index 000000000..dfee02c45
--- /dev/null
+++ b/packages/pipeline/test/db_global_hooks.ts
@@ -0,0 +1,9 @@
+import { setUpDbAsync, tearDownDbAsync } from './db_setup';
+
+before('set up database', async () => {
+ await setUpDbAsync();
+});
+
+after('tear down database', async () => {
+ await tearDownDbAsync();
+});
diff --git a/packages/pipeline/test/db_setup.ts b/packages/pipeline/test/db_setup.ts
new file mode 100644
index 000000000..bf31d15b6
--- /dev/null
+++ b/packages/pipeline/test/db_setup.ts
@@ -0,0 +1,174 @@
+import * as Docker from 'dockerode';
+import * as fs from 'fs';
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import * as ormConfig from '../src/ormconfig';
+
+// The name of the image to pull and use for the container. This also affects
+// which version of Postgres we use.
+const DOCKER_IMAGE_NAME = 'postgres:11-alpine';
+// The name to use for the Docker container which will run Postgres.
+const DOCKER_CONTAINER_NAME = '0x_pipeline_postgres_test';
+// The port which will be exposed on the Docker container.
+const POSTGRES_HOST_PORT = '15432';
+// Number of milliseconds to wait for postgres to finish initializing after
+// starting the docker container.
+const POSTGRES_SETUP_DELAY_MS = 5000;
+
+/**
+ * Sets up the database for testing purposes. If the
+ * ZEROEX_DATA_PIPELINE_TEST_DB_URL env var is specified, it will create a
+ * connection using that url. Otherwise it will spin up a new Docker container
+ * with a Postgres database and then create a connection to that database.
+ */
+export async function setUpDbAsync(): Promise<void> {
+ const connection = await createDbConnectionOnceAsync();
+ await connection.runMigrations({ transaction: true });
+}
+
+/**
+ * Tears down the database used for testing. This completely destroys any data.
+ * If a docker container was created, it destroys that container too.
+ */
+export async function tearDownDbAsync(): Promise<void> {
+ const connection = await createDbConnectionOnceAsync();
+ for (const _ of connection.migrations) {
+ await connection.undoLastMigration({ transaction: true });
+ }
+ if (needsDocker()) {
+ const docker = initDockerOnce();
+ const postgresContainer = docker.getContainer(DOCKER_CONTAINER_NAME);
+ await postgresContainer.kill();
+ await postgresContainer.remove();
+ }
+}
+
+let savedConnection: Connection;
+
+/**
+ * The first time this is run, it creates and returns a new TypeORM connection.
+ * Each subsequent time, it returns the existing connection. This is helpful
+ * because only one TypeORM connection can be active at a time.
+ */
+export async function createDbConnectionOnceAsync(): Promise<Connection> {
+ if (savedConnection !== undefined) {
+ return savedConnection;
+ }
+
+ if (needsDocker()) {
+ await initContainerAsync();
+ }
+ const testDbUrl =
+ process.env.ZEROEX_DATA_PIPELINE_TEST_DB_URL ||
+ `postgresql://postgres@localhost:${POSTGRES_HOST_PORT}/postgres`;
+ const testOrmConfig = R.merge(ormConfig, { url: testDbUrl }) as ConnectionOptions;
+
+ savedConnection = await createConnection(testOrmConfig);
+ return savedConnection;
+}
+
+async function sleepAsync(ms: number): Promise<{}> {
+ return new Promise<{}>(resolve => setTimeout(resolve, ms));
+}
+
+let savedDocker: Docker;
+
+function initDockerOnce(): Docker {
+ if (savedDocker !== undefined) {
+ return savedDocker;
+ }
+
+ // Note(albrow): Code for determining the right socket path is partially
+ // based on https://github.com/apocas/dockerode/blob/8f3aa85311fab64d58eca08fef49aa1da5b5f60b/test/spec_helper.js
+ const isWin = require('os').type() === 'Windows_NT';
+ const socketPath = process.env.DOCKER_SOCKET || (isWin ? '//./pipe/docker_engine' : '/var/run/docker.sock');
+ const isSocket = fs.existsSync(socketPath) ? fs.statSync(socketPath).isSocket() : false;
+ if (!isSocket) {
+ throw new Error(`Failed to connect to Docker using socket path: "${socketPath}".
+
+The database integration tests need to be able to connect to a Postgres database. Make sure that Docker is running and accessible at the expected socket path. If Docker isn't working you have two options:
+
+ 1) Set the DOCKER_SOCKET environment variable to a socket path that can be used to connect to Docker or
+ 2) Set the ZEROEX_DATA_PIPELINE_TEST_DB_URL environment variable to connect directly to an existing Postgres database instead of trying to start Postgres via Docker
+`);
+ }
+ savedDocker = new Docker({
+ socketPath,
+ });
+ return savedDocker;
+}
+
+// Creates the container, waits for it to initialize, and returns it.
+async function initContainerAsync(): Promise<Docker.Container> {
+ const docker = initDockerOnce();
+
+ // Tear down any existing containers with the same name.
+ await tearDownExistingContainerIfAnyAsync();
+
+ // Pull the image we need.
+ await pullImageAsync(docker, DOCKER_IMAGE_NAME);
+
+ // Create the container.
+ const postgresContainer = await docker.createContainer({
+ name: DOCKER_CONTAINER_NAME,
+ Image: DOCKER_IMAGE_NAME,
+ ExposedPorts: {
+ '5432': {},
+ },
+ HostConfig: {
+ PortBindings: {
+ '5432': [
+ {
+ HostPort: POSTGRES_HOST_PORT,
+ },
+ ],
+ },
+ },
+ });
+ await postgresContainer.start();
+ await sleepAsync(POSTGRES_SETUP_DELAY_MS);
+ return postgresContainer;
+}
+
+async function tearDownExistingContainerIfAnyAsync(): Promise<void> {
+ const docker = initDockerOnce();
+
+ // Check if a container with the desired name already exists. If so, this
+ // probably means we didn't clean up properly on the last test run.
+ const existingContainer = docker.getContainer(DOCKER_CONTAINER_NAME);
+ if (existingContainer != null) {
+ try {
+ await existingContainer.kill();
+ } catch {
+ // If this fails, it's fine. The container was probably already
+ // killed.
+ }
+ try {
+ await existingContainer.remove();
+ } catch {
+ // If this fails, it's fine. The container was probably already
+ // removed.
+ }
+ }
+}
+
+function needsDocker(): boolean {
+ return process.env.ZEROEX_DATA_PIPELINE_TEST_DB_URL === undefined;
+}
+
+// Note(albrow): This is partially based on
+// https://stackoverflow.com/questions/38258263/how-do-i-wait-for-a-pull
+async function pullImageAsync(docker: Docker, imageName: string): Promise<void> {
+ return new Promise<void>((resolve, reject) => {
+ docker.pull(imageName, {}, (err, stream) => {
+ if (err != null) {
+ reject(err);
+ return;
+ }
+ docker.modem.followProgress(stream, () => {
+ resolve();
+ });
+ });
+ });
+}
diff --git a/packages/pipeline/test/entities/block_test.ts b/packages/pipeline/test/entities/block_test.ts
new file mode 100644
index 000000000..503f284f0
--- /dev/null
+++ b/packages/pipeline/test/entities/block_test.ts
@@ -0,0 +1,23 @@
+import 'mocha';
+import 'reflect-metadata';
+
+import { Block } from '../../src/entities';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+// tslint:disable:custom-no-magic-numbers
+describe('Block entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const block = new Block();
+ block.hash = '0x12345';
+ block.number = 1234567;
+ block.timestamp = 5432154321;
+ const blocksRepository = connection.getRepository(Block);
+ await testSaveAndFindEntityAsync(blocksRepository, block);
+ });
+});
diff --git a/packages/pipeline/test/entities/dex_trades_test.ts b/packages/pipeline/test/entities/dex_trades_test.ts
new file mode 100644
index 000000000..83aaeec8f
--- /dev/null
+++ b/packages/pipeline/test/entities/dex_trades_test.ts
@@ -0,0 +1,60 @@
+import { BigNumber } from '@0x/utils';
+import 'mocha';
+import * as R from 'ramda';
+import 'reflect-metadata';
+
+import { DexTrade } from '../../src/entities';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+const baseTrade = {
+ sourceUrl: 'https://bloxy.info/api/dex/trades',
+ txTimestamp: 1543447585938,
+ txDate: '2018-11-21',
+ txSender: '0x00923b9a074762b93650716333b3e1473a15048e',
+ smartContractId: 7091917,
+ smartContractAddress: '0x818e6fecd516ecc3849daf6845e3ec868087b755',
+ contractType: 'DEX/Kyber Network Proxy',
+ maker: '0xbf2179859fc6d5bee9bf9158632dc51678a4100c',
+ taker: '0xbf2179859fc6d5bee9bf9158632dc51678a4100d',
+ amountBuy: new BigNumber('1.011943163078103'),
+ makerFeeAmount: new BigNumber(0),
+ buyCurrencyId: 1,
+ buySymbol: 'ETH',
+ amountSell: new BigNumber('941.4997928436911'),
+ takerFeeAmount: new BigNumber(0),
+ sellCurrencyId: 16610,
+ sellSymbol: 'ELF',
+ makerAnnotation: '',
+ takerAnnotation: '',
+ protocol: 'Kyber Network Proxy',
+ sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e',
+};
+
+const tradeWithNullAddresses: DexTrade = R.merge(baseTrade, {
+ txHash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0bf',
+ buyAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e',
+ sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100f',
+});
+
+const tradeWithNonNullAddresses: DexTrade = R.merge(baseTrade, {
+ txHash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0be',
+ buyAddress: null,
+ sellAddress: null,
+});
+
+// tslint:disable:custom-no-magic-numbers
+describe('DexTrade entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const trades = [tradeWithNullAddresses, tradeWithNonNullAddresses];
+ const tradesRepository = connection.getRepository(DexTrade);
+ for (const trade of trades) {
+ await testSaveAndFindEntityAsync(tradesRepository, trade);
+ }
+ });
+});
diff --git a/packages/pipeline/test/entities/exchange_cancel_event_test.ts b/packages/pipeline/test/entities/exchange_cancel_event_test.ts
new file mode 100644
index 000000000..f3b306d69
--- /dev/null
+++ b/packages/pipeline/test/entities/exchange_cancel_event_test.ts
@@ -0,0 +1,57 @@
+import 'mocha';
+import * as R from 'ramda';
+import 'reflect-metadata';
+
+import { ExchangeCancelEvent } from '../../src/entities';
+import { AssetType } from '../../src/types';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+const baseCancelEvent = {
+ contractAddress: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b',
+ logIndex: 1234,
+ blockNumber: 6276262,
+ rawData: '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428',
+ transactionHash: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe',
+ makerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+ takerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+ feeRecipientAddress: '0xc370d2a5920344aa6b7d8d11250e3e861434cbdd',
+ senderAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+ orderHash: '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a',
+ rawMakerAssetData: '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+ makerAssetProxyId: '0xf47261b0',
+ makerTokenAddress: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+ rawTakerAssetData: '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498',
+ takerAssetProxyId: '0xf47261b0',
+ takerTokenAddress: '0xe41d2489571d322189246dafa5ebde1f4699f498',
+};
+
+const erc20CancelEvent = R.merge(baseCancelEvent, {
+ makerAssetType: 'erc20' as AssetType,
+ makerTokenId: null,
+ takerAssetType: 'erc20' as AssetType,
+ takerTokenId: null,
+});
+
+const erc721CancelEvent = R.merge(baseCancelEvent, {
+ makerAssetType: 'erc721' as AssetType,
+ makerTokenId: '19378573',
+ takerAssetType: 'erc721' as AssetType,
+ takerTokenId: '63885673888',
+});
+
+// tslint:disable:custom-no-magic-numbers
+describe('ExchangeCancelEvent entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const events = [erc20CancelEvent, erc721CancelEvent];
+ const cancelEventRepository = connection.getRepository(ExchangeCancelEvent);
+ for (const event of events) {
+ await testSaveAndFindEntityAsync(cancelEventRepository, event);
+ }
+ });
+});
diff --git a/packages/pipeline/test/entities/exchange_cancel_up_to_event_test.ts b/packages/pipeline/test/entities/exchange_cancel_up_to_event_test.ts
new file mode 100644
index 000000000..aa34f8c1c
--- /dev/null
+++ b/packages/pipeline/test/entities/exchange_cancel_up_to_event_test.ts
@@ -0,0 +1,29 @@
+import { BigNumber } from '@0x/utils';
+import 'mocha';
+import 'reflect-metadata';
+
+import { ExchangeCancelUpToEvent } from '../../src/entities';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+// tslint:disable:custom-no-magic-numbers
+describe('ExchangeCancelUpToEvent entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const cancelUpToEventRepository = connection.getRepository(ExchangeCancelUpToEvent);
+ const cancelUpToEvent = new ExchangeCancelUpToEvent();
+ cancelUpToEvent.blockNumber = 6276262;
+ cancelUpToEvent.contractAddress = '0x4f833a24e1f95d70f028921e27040ca56e09ab0b';
+ cancelUpToEvent.logIndex = 42;
+ cancelUpToEvent.makerAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
+ cancelUpToEvent.orderEpoch = new BigNumber('123456789123456789');
+ cancelUpToEvent.rawData = '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428';
+ cancelUpToEvent.senderAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
+ cancelUpToEvent.transactionHash = '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe';
+ await testSaveAndFindEntityAsync(cancelUpToEventRepository, cancelUpToEvent);
+ });
+});
diff --git a/packages/pipeline/test/entities/exchange_fill_event_test.ts b/packages/pipeline/test/entities/exchange_fill_event_test.ts
new file mode 100644
index 000000000..b2cb8c5e0
--- /dev/null
+++ b/packages/pipeline/test/entities/exchange_fill_event_test.ts
@@ -0,0 +1,62 @@
+import { BigNumber } from '@0x/utils';
+import 'mocha';
+import * as R from 'ramda';
+import 'reflect-metadata';
+
+import { ExchangeFillEvent } from '../../src/entities';
+import { AssetType } from '../../src/types';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+const baseFillEvent = {
+ contractAddress: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b',
+ blockNumber: 6276262,
+ logIndex: 102,
+ rawData: '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428',
+ transactionHash: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe',
+ makerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+ takerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+ feeRecipientAddress: '0xc370d2a5920344aa6b7d8d11250e3e861434cbdd',
+ senderAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+ makerAssetFilledAmount: new BigNumber('10000000000000000'),
+ takerAssetFilledAmount: new BigNumber('100000000000000000'),
+ makerFeePaid: new BigNumber('0'),
+ takerFeePaid: new BigNumber('12345'),
+ orderHash: '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a',
+ rawMakerAssetData: '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+ makerAssetProxyId: '0xf47261b0',
+ makerTokenAddress: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+ rawTakerAssetData: '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498',
+ takerAssetProxyId: '0xf47261b0',
+ takerTokenAddress: '0xe41d2489571d322189246dafa5ebde1f4699f498',
+};
+
+const erc20FillEvent = R.merge(baseFillEvent, {
+ makerAssetType: 'erc20' as AssetType,
+ makerTokenId: null,
+ takerAssetType: 'erc20' as AssetType,
+ takerTokenId: null,
+});
+
+const erc721FillEvent = R.merge(baseFillEvent, {
+ makerAssetType: 'erc721' as AssetType,
+ makerTokenId: '19378573',
+ takerAssetType: 'erc721' as AssetType,
+ takerTokenId: '63885673888',
+});
+
+// tslint:disable:custom-no-magic-numbers
+describe('ExchangeFillEvent entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const events = [erc20FillEvent, erc721FillEvent];
+ const fillEventsRepository = connection.getRepository(ExchangeFillEvent);
+ for (const event of events) {
+ await testSaveAndFindEntityAsync(fillEventsRepository, event);
+ }
+ });
+});
diff --git a/packages/pipeline/test/entities/ohlcv_external_test.ts b/packages/pipeline/test/entities/ohlcv_external_test.ts
new file mode 100644
index 000000000..8b995db50
--- /dev/null
+++ b/packages/pipeline/test/entities/ohlcv_external_test.ts
@@ -0,0 +1,35 @@
+import 'mocha';
+import 'reflect-metadata';
+
+import { OHLCVExternal } from '../../src/entities';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+const ohlcvExternal: OHLCVExternal = {
+ exchange: 'CCCAGG',
+ fromSymbol: 'ETH',
+ toSymbol: 'ZRX',
+ startTime: 1543352400000,
+ endTime: 1543356000000,
+ open: 307.41,
+ close: 310.08,
+ low: 304.6,
+ high: 310.27,
+ volumeFrom: 904.6,
+ volumeTo: 278238.5,
+ source: 'Crypto Compare',
+ observedTimestamp: 1543442338074,
+};
+
+// tslint:disable:custom-no-magic-numbers
+describe('OHLCVExternal entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const repository = connection.getRepository(OHLCVExternal);
+ await testSaveAndFindEntityAsync(repository, ohlcvExternal);
+ });
+});
diff --git a/packages/pipeline/test/entities/relayer_test.ts b/packages/pipeline/test/entities/relayer_test.ts
new file mode 100644
index 000000000..760ffb6f9
--- /dev/null
+++ b/packages/pipeline/test/entities/relayer_test.ts
@@ -0,0 +1,55 @@
+import 'mocha';
+import * as R from 'ramda';
+import 'reflect-metadata';
+
+import { Relayer } from '../../src/entities';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+const baseRelayer = {
+ uuid: 'e8d27d8d-ddf6-48b1-9663-60b0a3ddc716',
+ name: 'Radar Relay',
+ homepageUrl: 'https://radarrelay.com',
+ appUrl: null,
+ sraHttpEndpoint: null,
+ sraWsEndpoint: null,
+ feeRecipientAddresses: [],
+ takerAddresses: [],
+};
+
+const relayerWithUrls = R.merge(baseRelayer, {
+ uuid: 'e8d27d8d-ddf6-48b1-9663-60b0a3ddc717',
+ appUrl: 'https://app.radarrelay.com',
+ sraHttpEndpoint: 'https://api.radarrelay.com/0x/v2/',
+ sraWsEndpoint: 'wss://ws.radarrelay.com/0x/v2',
+});
+
+const relayerWithAddresses = R.merge(baseRelayer, {
+ uuid: 'e8d27d8d-ddf6-48b1-9663-60b0a3ddc718',
+ feeRecipientAddresses: [
+ '0xa258b39954cef5cb142fd567a46cddb31a670124',
+ '0xa258b39954cef5cb142fd567a46cddb31a670125',
+ '0xa258b39954cef5cb142fd567a46cddb31a670126',
+ ],
+ takerAddresses: [
+ '0xa258b39954cef5cb142fd567a46cddb31a670127',
+ '0xa258b39954cef5cb142fd567a46cddb31a670128',
+ '0xa258b39954cef5cb142fd567a46cddb31a670129',
+ ],
+});
+
+// tslint:disable:custom-no-magic-numbers
+describe('Relayer entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const relayers = [baseRelayer, relayerWithUrls, relayerWithAddresses];
+ const relayerRepository = connection.getRepository(Relayer);
+ for (const relayer of relayers) {
+ await testSaveAndFindEntityAsync(relayerRepository, relayer);
+ }
+ });
+});
diff --git a/packages/pipeline/test/entities/sra_order_test.ts b/packages/pipeline/test/entities/sra_order_test.ts
new file mode 100644
index 000000000..c43de8ce8
--- /dev/null
+++ b/packages/pipeline/test/entities/sra_order_test.ts
@@ -0,0 +1,84 @@
+import { BigNumber } from '@0x/utils';
+import 'mocha';
+import * as R from 'ramda';
+import 'reflect-metadata';
+import { Repository } from 'typeorm';
+
+import { SraOrder, SraOrdersObservedTimeStamp } from '../../src/entities';
+import { AssetType } from '../../src/types';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+const baseOrder = {
+ sourceUrl: 'https://api.radarrelay.com/0x/v2',
+ exchangeAddress: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b',
+ makerAddress: '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81',
+ takerAddress: '0x0000000000000000000000000000000000000000',
+ feeRecipientAddress: '0xa258b39954cef5cb142fd567a46cddb31a670124',
+ senderAddress: '0x0000000000000000000000000000000000000000',
+ makerAssetAmount: new BigNumber('1619310371000000000'),
+ takerAssetAmount: new BigNumber('8178335207070707070707'),
+ makerFee: new BigNumber('100'),
+ takerFee: new BigNumber('200'),
+ expirationTimeSeconds: new BigNumber('1538529488'),
+ salt: new BigNumber('1537924688891'),
+ signature: '0x1b5a5d672b0d647b5797387ccbb89d8',
+ rawMakerAssetData: '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+ makerAssetProxyId: '0xf47261b0',
+ makerTokenAddress: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+ rawTakerAssetData: '0xf47261b000000000000000000000000042d6622dece394b54999fbd73d108123806f6a18',
+ takerAssetProxyId: '0xf47261b0',
+ takerTokenAddress: '0x42d6622dece394b54999fbd73d108123806f6a18',
+ metadataJson: '{"isThisArbitraryData":true,"powerLevel":9001}',
+};
+
+const erc20Order = R.merge(baseOrder, {
+ orderHashHex: '0x1bdbeb0d088a33da28b9ee6d94e8771452f90f4a69107da2fa75195d61b9a1c9',
+ makerAssetType: 'erc20' as AssetType,
+ makerTokenId: null,
+ takerAssetType: 'erc20' as AssetType,
+ takerTokenId: null,
+});
+
+const erc721Order = R.merge(baseOrder, {
+ orderHashHex: '0x1bdbeb0d088a33da28b9ee6d94e8771452f90f4a69107da2fa75195d61b9a1d0',
+ makerAssetType: 'erc721' as AssetType,
+ makerTokenId: '19378573',
+ takerAssetType: 'erc721' as AssetType,
+ takerTokenId: '63885673888',
+});
+
+// tslint:disable:custom-no-magic-numbers
+describe('SraOrder and SraOrdersObservedTimeStamp entities', () => {
+ // Note(albrow): SraOrder and SraOrdersObservedTimeStamp are tightly coupled
+ // and timestamps have a foreign key constraint such that they have to point
+ // to an existing SraOrder. For these reasons, we are testing them together
+ // in the same test.
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const orderRepository = connection.getRepository(SraOrder);
+ const timestampRepository = connection.getRepository(SraOrdersObservedTimeStamp);
+ const orders = [erc20Order, erc721Order];
+ for (const order of orders) {
+ await testOrderWithTimestampAsync(orderRepository, timestampRepository, order);
+ }
+ });
+});
+
+async function testOrderWithTimestampAsync(
+ orderRepository: Repository<SraOrder>,
+ timestampRepository: Repository<SraOrdersObservedTimeStamp>,
+ order: SraOrder,
+): Promise<void> {
+ await testSaveAndFindEntityAsync(orderRepository, order);
+ const timestamp = new SraOrdersObservedTimeStamp();
+ timestamp.exchangeAddress = order.exchangeAddress;
+ timestamp.orderHashHex = order.orderHashHex;
+ timestamp.sourceUrl = order.sourceUrl;
+ timestamp.observedTimestamp = 1543377376153;
+ await testSaveAndFindEntityAsync(timestampRepository, timestamp);
+}
diff --git a/packages/pipeline/test/entities/token_metadata_test.ts b/packages/pipeline/test/entities/token_metadata_test.ts
new file mode 100644
index 000000000..48e656644
--- /dev/null
+++ b/packages/pipeline/test/entities/token_metadata_test.ts
@@ -0,0 +1,39 @@
+import { BigNumber } from '@0x/utils';
+import 'mocha';
+import 'reflect-metadata';
+
+import { TokenMetadata } from '../../src/entities';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+const metadataWithoutNullFields: TokenMetadata = {
+ address: '0xe41d2489571d322189246dafa5ebde1f4699f498',
+ authority: 'https://website-api.0xproject.com/tokens',
+ decimals: new BigNumber(18),
+ symbol: 'ZRX',
+ name: '0x',
+};
+
+const metadataWithNullFields: TokenMetadata = {
+ address: '0xe41d2489571d322189246dafa5ebde1f4699f499',
+ authority: 'https://website-api.0xproject.com/tokens',
+ decimals: null,
+ symbol: null,
+ name: null,
+};
+
+// tslint:disable:custom-no-magic-numbers
+describe('TokenMetadata entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const tokenMetadata = [metadataWithoutNullFields, metadataWithNullFields];
+ const tokenMetadataRepository = connection.getRepository(TokenMetadata);
+ for (const tokenMetadatum of tokenMetadata) {
+ await testSaveAndFindEntityAsync(tokenMetadataRepository, tokenMetadatum);
+ }
+ });
+});
diff --git a/packages/pipeline/test/entities/token_order_test.ts b/packages/pipeline/test/entities/token_order_test.ts
new file mode 100644
index 000000000..c6057f5aa
--- /dev/null
+++ b/packages/pipeline/test/entities/token_order_test.ts
@@ -0,0 +1,31 @@
+import { BigNumber } from '@0x/utils';
+import 'mocha';
+
+import { TokenOrderbookSnapshot } from '../../src/entities';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+const tokenOrderbookSnapshot: TokenOrderbookSnapshot = {
+ source: 'ddextest',
+ observedTimestamp: Date.now(),
+ orderType: 'bid',
+ price: new BigNumber(10.1),
+ baseAssetSymbol: 'ETH',
+ baseAssetAddress: '0x818e6fecd516ecc3849daf6845e3ec868087b755',
+ baseVolume: new BigNumber(143),
+ quoteAssetSymbol: 'ABC',
+ quoteAssetAddress: '0x00923b9a074762b93650716333b3e1473a15048e',
+ quoteVolume: new BigNumber(12.3234234),
+};
+
+describe('TokenOrderbookSnapshot entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const tokenOrderbookSnapshotRepository = connection.getRepository(TokenOrderbookSnapshot);
+ await testSaveAndFindEntityAsync(tokenOrderbookSnapshotRepository, tokenOrderbookSnapshot);
+ });
+});
diff --git a/packages/pipeline/test/entities/transaction_test.ts b/packages/pipeline/test/entities/transaction_test.ts
new file mode 100644
index 000000000..634844544
--- /dev/null
+++ b/packages/pipeline/test/entities/transaction_test.ts
@@ -0,0 +1,26 @@
+import { BigNumber } from '@0x/utils';
+import 'mocha';
+import 'reflect-metadata';
+
+import { Transaction } from '../../src/entities';
+import { createDbConnectionOnceAsync } from '../db_setup';
+import { chaiSetup } from '../utils/chai_setup';
+
+import { testSaveAndFindEntityAsync } from './util';
+
+chaiSetup.configure();
+
+// tslint:disable:custom-no-magic-numbers
+describe('Transaction entity', () => {
+ it('save/find', async () => {
+ const connection = await createDbConnectionOnceAsync();
+ const transactionRepository = connection.getRepository(Transaction);
+ const transaction = new Transaction();
+ transaction.blockHash = '0x6ff106d00b6c3746072fc06bae140fb2549036ba7bcf9184ae19a42fd33657fd';
+ transaction.blockNumber = 6276262;
+ transaction.gasPrice = new BigNumber(3000000);
+ transaction.gasUsed = new BigNumber(125000);
+ transaction.transactionHash = '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe';
+ await testSaveAndFindEntityAsync(transactionRepository, transaction);
+ });
+});
diff --git a/packages/pipeline/test/entities/util.ts b/packages/pipeline/test/entities/util.ts
new file mode 100644
index 000000000..043a3b15d
--- /dev/null
+++ b/packages/pipeline/test/entities/util.ts
@@ -0,0 +1,25 @@
+import * as chai from 'chai';
+import 'mocha';
+
+import { Repository } from 'typeorm';
+
+const expect = chai.expect;
+
+/**
+ * First saves the given entity to the database, then finds it and makes sure
+ * that the found entity is exactly equal to the original one. This is a bare
+ * minimum basic test to make sure that the entity type definition and our
+ * database schema are aligned and that it is possible to save and find the
+ * entity.
+ * @param repository A TypeORM repository corresponding with the type of the entity.
+ * @param entity An instance of a TypeORM entity which will be saved/retrieved from the database.
+ */
+export async function testSaveAndFindEntityAsync<T>(repository: Repository<T>, entity: T): Promise<void> {
+ // Note(albrow): We are forced to use an 'as any' hack here because
+ // TypeScript complains about stack depth when checking the types.
+ await repository.save(entity as any);
+ const gotEntity = await repository.findOneOrFail({
+ where: entity,
+ });
+ expect(gotEntity).deep.equal(entity);
+}
diff --git a/packages/pipeline/test/parsers/bloxy/index_test.ts b/packages/pipeline/test/parsers/bloxy/index_test.ts
new file mode 100644
index 000000000..2b8d68f98
--- /dev/null
+++ b/packages/pipeline/test/parsers/bloxy/index_test.ts
@@ -0,0 +1,99 @@
+// tslint:disable:custom-no-magic-numbers
+import { BigNumber } from '@0x/utils';
+import * as chai from 'chai';
+import 'mocha';
+import * as R from 'ramda';
+
+import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../../src/data_sources/bloxy';
+import { DexTrade } from '../../../src/entities';
+import { _parseBloxyTrade } from '../../../src/parsers/bloxy';
+import { _convertToExchangeFillEvent } from '../../../src/parsers/events';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+const baseInput: BloxyTrade = {
+ tx_hash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0bf',
+ tx_time: '2018-11-21T09:06:28.000+00:00',
+ tx_date: '2018-11-21',
+ tx_sender: '0x00923b9a074762b93650716333b3e1473a15048e',
+ smart_contract_id: 7091917,
+ smart_contract_address: '0x818e6fecd516ecc3849daf6845e3ec868087b755',
+ contract_type: 'DEX/Kyber Network Proxy',
+ maker: '0x0000000000000000000000000000000000000001',
+ taker: '0x0000000000000000000000000000000000000002',
+ amountBuy: 1.011943163078103,
+ makerFee: 38.912083,
+ buyCurrencyId: 1,
+ buySymbol: 'ETH',
+ amountSell: 941.4997928436911,
+ takerFee: 100.39,
+ sellCurrencyId: 16610,
+ sellSymbol: 'ELF',
+ maker_annotation: 'random annotation',
+ taker_annotation: 'random other annotation',
+ protocol: 'Kyber Network Proxy',
+ buyAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100d',
+ sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e',
+};
+
+const baseExpected: DexTrade = {
+ sourceUrl: BLOXY_DEX_TRADES_URL,
+ txHash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0bf',
+ txTimestamp: 1542791188000,
+ txDate: '2018-11-21',
+ txSender: '0x00923b9a074762b93650716333b3e1473a15048e',
+ smartContractId: 7091917,
+ smartContractAddress: '0x818e6fecd516ecc3849daf6845e3ec868087b755',
+ contractType: 'DEX/Kyber Network Proxy',
+ maker: '0x0000000000000000000000000000000000000001',
+ taker: '0x0000000000000000000000000000000000000002',
+ amountBuy: new BigNumber('1.011943163078103'),
+ makerFeeAmount: new BigNumber('38.912083'),
+ buyCurrencyId: 1,
+ buySymbol: 'ETH',
+ amountSell: new BigNumber('941.4997928436911'),
+ takerFeeAmount: new BigNumber('100.39'),
+ sellCurrencyId: 16610,
+ sellSymbol: 'ELF',
+ makerAnnotation: 'random annotation',
+ takerAnnotation: 'random other annotation',
+ protocol: 'Kyber Network Proxy',
+ buyAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100d',
+ sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e',
+};
+
+interface TestCase {
+ input: BloxyTrade;
+ expected: DexTrade;
+}
+
+const testCases: TestCase[] = [
+ {
+ input: baseInput,
+ expected: baseExpected,
+ },
+ {
+ input: R.merge(baseInput, { buyAddress: null, sellAddress: null }),
+ expected: R.merge(baseExpected, { buyAddress: null, sellAddress: null }),
+ },
+ {
+ input: R.merge(baseInput, {
+ buySymbol:
+ 'RING\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000',
+ }),
+ expected: R.merge(baseExpected, { buySymbol: 'RING' }),
+ },
+];
+
+describe('bloxy', () => {
+ describe('_parseBloxyTrade', () => {
+ for (const [i, testCase] of testCases.entries()) {
+ it(`converts BloxyTrade to DexTrade entity (${i + 1}/${testCases.length})`, () => {
+ const actual = _parseBloxyTrade(testCase.input);
+ expect(actual).deep.equal(testCase.expected);
+ });
+ }
+ });
+});
diff --git a/packages/pipeline/test/parsers/ddex_orders/index_test.ts b/packages/pipeline/test/parsers/ddex_orders/index_test.ts
new file mode 100644
index 000000000..213100f44
--- /dev/null
+++ b/packages/pipeline/test/parsers/ddex_orders/index_test.ts
@@ -0,0 +1,66 @@
+import { BigNumber } from '@0x/utils';
+import * as chai from 'chai';
+import 'mocha';
+
+import { DdexMarket } from '../../../src/data_sources/ddex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../../../src/entities';
+import { aggregateOrders, parseDdexOrder } from '../../../src/parsers/ddex_orders';
+import { OrderType } from '../../../src/types';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+// tslint:disable:custom-no-magic-numbers
+describe('ddex_orders', () => {
+ describe('aggregateOrders', () => {
+ it('aggregates orders by price point', () => {
+ const input = [
+ { price: '1', amount: '20', orderId: 'testtest' },
+ { price: '1', amount: '30', orderId: 'testone' },
+ { price: '2', amount: '100', orderId: 'testtwo' },
+ ];
+ const expected = [['1', new BigNumber(50)], ['2', new BigNumber(100)]];
+ const actual = aggregateOrders(input);
+ expect(actual).deep.equal(expected);
+ });
+ });
+
+ describe('parseDdexOrder', () => {
+ it('converts ddexOrder to TokenOrder entity', () => {
+ const ddexOrder: [string, BigNumber] = ['0.5', new BigNumber(10)];
+ const ddexMarket: DdexMarket = {
+ id: 'ABC-DEF',
+ quoteToken: 'ABC',
+ quoteTokenDecimals: 5,
+ quoteTokenAddress: '0x0000000000000000000000000000000000000000',
+ baseToken: 'DEF',
+ baseTokenDecimals: 2,
+ baseTokenAddress: '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81',
+ minOrderSize: '0.1',
+ maxOrderSize: '1000',
+ pricePrecision: 1,
+ priceDecimals: 1,
+ amountDecimals: 0,
+ };
+ const observedTimestamp: number = Date.now();
+ const orderType: OrderType = 'bid';
+ const source: string = 'ddex';
+
+ const expected = new TokenOrder();
+ expected.source = 'ddex';
+ expected.observedTimestamp = observedTimestamp;
+ expected.orderType = 'bid';
+ expected.price = new BigNumber(0.5);
+ expected.baseAssetSymbol = 'DEF';
+ expected.baseAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81';
+ expected.baseVolume = new BigNumber(5);
+ expected.quoteAssetSymbol = 'ABC';
+ expected.quoteAssetAddress = '0x0000000000000000000000000000000000000000';
+ expected.quoteVolume = new BigNumber(10);
+
+ const actual = parseDdexOrder(ddexMarket, observedTimestamp, orderType, source, ddexOrder);
+ expect(actual).deep.equal(expected);
+ });
+ });
+});
diff --git a/packages/pipeline/test/parsers/events/index_test.ts b/packages/pipeline/test/parsers/events/index_test.ts
new file mode 100644
index 000000000..7e439ce39
--- /dev/null
+++ b/packages/pipeline/test/parsers/events/index_test.ts
@@ -0,0 +1,78 @@
+import { ExchangeFillEventArgs } from '@0x/contract-wrappers';
+import { BigNumber } from '@0x/utils';
+import * as chai from 'chai';
+import { LogWithDecodedArgs } from 'ethereum-types';
+import 'mocha';
+
+import { ExchangeFillEvent } from '../../../src/entities';
+import { _convertToExchangeFillEvent } from '../../../src/parsers/events';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+// tslint:disable:custom-no-magic-numbers
+describe('exchange_events', () => {
+ describe('_convertToExchangeFillEvent', () => {
+ it('converts LogWithDecodedArgs to ExchangeFillEvent entity', () => {
+ const input: LogWithDecodedArgs<ExchangeFillEventArgs> = {
+ logIndex: 102,
+ transactionIndex: 38,
+ transactionHash: '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe',
+ blockHash: '',
+ blockNumber: 6276262,
+ address: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b',
+ data:
+ '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000000000000000000000000000002386f26fc10000000000000000000000000000000000000000000000000000016345785d8a000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f49800000000000000000000000000000000000000000000000000000000',
+ topics: [
+ '0x0bcc4c97732e47d9946f229edb95f5b6323f601300e4690de719993f3c371129',
+ '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428',
+ '0x000000000000000000000000c370d2a5920344aa6b7d8d11250e3e861434cbdd',
+ '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a',
+ ],
+ event: 'Fill',
+ args: {
+ makerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+ feeRecipientAddress: '0xc370d2a5920344aa6b7d8d11250e3e861434cbdd',
+ takerAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+ senderAddress: '0xf6da68519f78b0d0bc93c701e86affcb75c92428',
+ makerAssetFilledAmount: new BigNumber('10000000000000000'),
+ takerAssetFilledAmount: new BigNumber('100000000000000000'),
+ makerFeePaid: new BigNumber('0'),
+ takerFeePaid: new BigNumber('12345'),
+ orderHash: '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a',
+ makerAssetData: '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+ takerAssetData: '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498',
+ },
+ };
+ const expected = new ExchangeFillEvent();
+ expected.contractAddress = '0x4f833a24e1f95d70f028921e27040ca56e09ab0b';
+ expected.blockNumber = 6276262;
+ expected.logIndex = 102;
+ expected.rawData =
+ '0x000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000f6da68519f78b0d0bc93c701e86affcb75c92428000000000000000000000000000000000000000000000000002386f26fc10000000000000000000000000000000000000000000000000000016345785d8a000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001600000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000024f47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f49800000000000000000000000000000000000000000000000000000000';
+ expected.transactionHash = '0x6dd106d002873746072fc5e496dd0fb2541b68c77bcf9184ae19a42fd33657fe';
+ expected.makerAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
+ expected.takerAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
+ expected.feeRecipientAddress = '0xc370d2a5920344aa6b7d8d11250e3e861434cbdd';
+ expected.senderAddress = '0xf6da68519f78b0d0bc93c701e86affcb75c92428';
+ expected.makerAssetFilledAmount = new BigNumber('10000000000000000');
+ expected.takerAssetFilledAmount = new BigNumber('100000000000000000');
+ expected.makerFeePaid = new BigNumber('0');
+ expected.takerFeePaid = new BigNumber('12345');
+ expected.orderHash = '0xab12ed2cbaa5615ab690b9da75a46e53ddfcf3f1a68655b5fe0d94c75a1aac4a';
+ expected.rawMakerAssetData = '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
+ expected.makerAssetType = 'erc20';
+ expected.makerAssetProxyId = '0xf47261b0';
+ expected.makerTokenAddress = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
+ expected.makerTokenId = null;
+ expected.rawTakerAssetData = '0xf47261b0000000000000000000000000e41d2489571d322189246dafa5ebde1f4699f498';
+ expected.takerAssetType = 'erc20';
+ expected.takerAssetProxyId = '0xf47261b0';
+ expected.takerTokenAddress = '0xe41d2489571d322189246dafa5ebde1f4699f498';
+ expected.takerTokenId = null;
+ const actual = _convertToExchangeFillEvent(input);
+ expect(actual).deep.equal(expected);
+ });
+ });
+});
diff --git a/packages/pipeline/test/parsers/ohlcv_external/crypto_compare_test.ts b/packages/pipeline/test/parsers/ohlcv_external/crypto_compare_test.ts
new file mode 100644
index 000000000..118cafc5e
--- /dev/null
+++ b/packages/pipeline/test/parsers/ohlcv_external/crypto_compare_test.ts
@@ -0,0 +1,62 @@
+import * as chai from 'chai';
+import 'mocha';
+import * as R from 'ramda';
+
+import { CryptoCompareOHLCVRecord } from '../../../src/data_sources/ohlcv_external/crypto_compare';
+import { OHLCVExternal } from '../../../src/entities';
+import { OHLCVMetadata, parseRecords } from '../../../src/parsers/ohlcv_external/crypto_compare';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+// tslint:disable:custom-no-magic-numbers
+describe('ohlcv_external parser (Crypto Compare)', () => {
+ describe('parseRecords', () => {
+ const record: CryptoCompareOHLCVRecord = {
+ time: 200,
+ close: 100,
+ high: 101,
+ low: 99,
+ open: 98,
+ volumefrom: 1234,
+ volumeto: 4321,
+ };
+
+ const metadata: OHLCVMetadata = {
+ fromSymbol: 'ETH',
+ toSymbol: 'ZRX',
+ exchange: 'CCCAGG',
+ source: 'CryptoCompare',
+ observedTimestamp: new Date().getTime(),
+ interval: 100000,
+ };
+
+ const entity = new OHLCVExternal();
+ entity.exchange = metadata.exchange;
+ entity.fromSymbol = metadata.fromSymbol;
+ entity.toSymbol = metadata.toSymbol;
+ entity.startTime = 100000;
+ entity.endTime = 200000;
+ entity.open = record.open;
+ entity.close = record.close;
+ entity.low = record.low;
+ entity.high = record.high;
+ entity.volumeFrom = record.volumefrom;
+ entity.volumeTo = record.volumeto;
+ entity.source = metadata.source;
+ entity.observedTimestamp = metadata.observedTimestamp;
+
+ it('converts Crypto Compare OHLCV records to OHLCVExternal entity', () => {
+ const input = [record, R.merge(record, { time: 300 }), R.merge(record, { time: 400 })];
+ const expected = [
+ entity,
+ R.merge(entity, { startTime: 200000, endTime: 300000 }),
+ R.merge(entity, { startTime: 300000, endTime: 400000 }),
+ ];
+
+ const actual = parseRecords(input, metadata);
+ expect(actual).deep.equal(expected);
+ });
+ });
+});
diff --git a/packages/pipeline/test/parsers/paradex_orders/index_test.ts b/packages/pipeline/test/parsers/paradex_orders/index_test.ts
new file mode 100644
index 000000000..1522806bf
--- /dev/null
+++ b/packages/pipeline/test/parsers/paradex_orders/index_test.ts
@@ -0,0 +1,54 @@
+import { BigNumber } from '@0x/utils';
+import * as chai from 'chai';
+import 'mocha';
+
+import { ParadexMarket, ParadexOrder } from '../../../src/data_sources/paradex';
+import { TokenOrderbookSnapshot as TokenOrder } from '../../../src/entities';
+import { parseParadexOrder } from '../../../src/parsers/paradex_orders';
+import { OrderType } from '../../../src/types';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+// tslint:disable:custom-no-magic-numbers
+describe('paradex_orders', () => {
+ describe('parseParadexOrder', () => {
+ it('converts ParadexOrder to TokenOrder entity', () => {
+ const paradexOrder: ParadexOrder = {
+ amount: '412',
+ price: '0.1245',
+ };
+ const paradexMarket: ParadexMarket = {
+ id: '2',
+ symbol: 'ABC/DEF',
+ baseToken: 'DEF',
+ quoteToken: 'ABC',
+ minOrderSize: '0.1',
+ maxOrderSize: '1000',
+ priceMaxDecimals: 5,
+ amountMaxDecimals: 5,
+ baseTokenAddress: '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81',
+ quoteTokenAddress: '0x0000000000000000000000000000000000000000',
+ };
+ const observedTimestamp: number = Date.now();
+ const orderType: OrderType = 'bid';
+ const source: string = 'paradex';
+
+ const expected = new TokenOrder();
+ expected.source = 'paradex';
+ expected.observedTimestamp = observedTimestamp;
+ expected.orderType = 'bid';
+ expected.price = new BigNumber(0.1245);
+ expected.baseAssetSymbol = 'DEF';
+ expected.baseAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81';
+ expected.baseVolume = new BigNumber(412 * 0.1245);
+ expected.quoteAssetSymbol = 'ABC';
+ expected.quoteAssetAddress = '0x0000000000000000000000000000000000000000';
+ expected.quoteVolume = new BigNumber(412);
+
+ const actual = parseParadexOrder(paradexMarket, observedTimestamp, orderType, source, paradexOrder);
+ expect(actual).deep.equal(expected);
+ });
+ });
+});
diff --git a/packages/pipeline/test/parsers/sra_orders/index_test.ts b/packages/pipeline/test/parsers/sra_orders/index_test.ts
new file mode 100644
index 000000000..ee2842ef3
--- /dev/null
+++ b/packages/pipeline/test/parsers/sra_orders/index_test.ts
@@ -0,0 +1,68 @@
+import { APIOrder } from '@0x/types';
+import { BigNumber } from '@0x/utils';
+import * as chai from 'chai';
+import 'mocha';
+
+import { SraOrder } from '../../../src/entities';
+import { _convertToEntity } from '../../../src/parsers/sra_orders';
+import { chaiSetup } from '../../utils/chai_setup';
+
+chaiSetup.configure();
+const expect = chai.expect;
+
+// tslint:disable:custom-no-magic-numbers
+describe('sra_orders', () => {
+ describe('_convertToEntity', () => {
+ it('converts ApiOrder to SraOrder entity', () => {
+ const input: APIOrder = {
+ order: {
+ makerAddress: '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81',
+ takerAddress: '0x0000000000000000000000000000000000000000',
+ feeRecipientAddress: '0xa258b39954cef5cb142fd567a46cddb31a670124',
+ senderAddress: '0x0000000000000000000000000000000000000000',
+ makerAssetAmount: new BigNumber('1619310371000000000'),
+ takerAssetAmount: new BigNumber('8178335207070707070707'),
+ makerFee: new BigNumber('0'),
+ takerFee: new BigNumber('0'),
+ exchangeAddress: '0x4f833a24e1f95d70f028921e27040ca56e09ab0b',
+ expirationTimeSeconds: new BigNumber('1538529488'),
+ signature:
+ '0x1b5a5d672b0d647b5797387ccbb89d822d5d2e873346b014f4ff816ff0783f2a7a0d2824d2d7042ec8ea375bc7f870963e1cb8248f1db03ddf125e27b5963aa11f03',
+ salt: new BigNumber('1537924688891'),
+ makerAssetData: '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
+ takerAssetData: '0xf47261b000000000000000000000000042d6622dece394b54999fbd73d108123806f6a18',
+ },
+ metaData: { isThisArbitraryData: true, powerLevel: 9001 },
+ };
+ const expected = new SraOrder();
+ expected.exchangeAddress = '0x4f833a24e1f95d70f028921e27040ca56e09ab0b';
+ expected.orderHashHex = '0x1bdbeb0d088a33da28b9ee6d94e8771452f90f4a69107da2fa75195d61b9a1c9';
+ expected.makerAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81';
+ expected.takerAddress = '0x0000000000000000000000000000000000000000';
+ expected.feeRecipientAddress = '0xa258b39954cef5cb142fd567a46cddb31a670124';
+ expected.senderAddress = '0x0000000000000000000000000000000000000000';
+ expected.makerAssetAmount = new BigNumber('1619310371000000000');
+ expected.takerAssetAmount = new BigNumber('8178335207070707070707');
+ expected.makerFee = new BigNumber('0');
+ expected.takerFee = new BigNumber('0');
+ expected.expirationTimeSeconds = new BigNumber('1538529488');
+ expected.salt = new BigNumber('1537924688891');
+ expected.signature =
+ '0x1b5a5d672b0d647b5797387ccbb89d822d5d2e873346b014f4ff816ff0783f2a7a0d2824d2d7042ec8ea375bc7f870963e1cb8248f1db03ddf125e27b5963aa11f03';
+ expected.rawMakerAssetData = '0xf47261b0000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
+ expected.makerAssetType = 'erc20';
+ expected.makerAssetProxyId = '0xf47261b0';
+ expected.makerTokenAddress = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
+ expected.makerTokenId = null;
+ expected.rawTakerAssetData = '0xf47261b000000000000000000000000042d6622dece394b54999fbd73d108123806f6a18';
+ expected.takerAssetType = 'erc20';
+ expected.takerAssetProxyId = '0xf47261b0';
+ expected.takerTokenAddress = '0x42d6622dece394b54999fbd73d108123806f6a18';
+ expected.takerTokenId = null;
+ expected.metadataJson = '{"isThisArbitraryData":true,"powerLevel":9001}';
+
+ const actual = _convertToEntity(input);
+ expect(actual).deep.equal(expected);
+ });
+ });
+});
diff --git a/packages/pipeline/test/utils/chai_setup.ts b/packages/pipeline/test/utils/chai_setup.ts
new file mode 100644
index 000000000..1a8733093
--- /dev/null
+++ b/packages/pipeline/test/utils/chai_setup.ts
@@ -0,0 +1,13 @@
+import * as chai from 'chai';
+import chaiAsPromised = require('chai-as-promised');
+import ChaiBigNumber = require('chai-bignumber');
+import * as dirtyChai from 'dirty-chai';
+
+export const chaiSetup = {
+ configure(): void {
+ chai.config.includeStack = true;
+ chai.use(ChaiBigNumber());
+ chai.use(dirtyChai);
+ chai.use(chaiAsPromised);
+ },
+};
diff --git a/packages/pipeline/tsconfig.json b/packages/pipeline/tsconfig.json
new file mode 100644
index 000000000..6f138f260
--- /dev/null
+++ b/packages/pipeline/tsconfig.json
@@ -0,0 +1,10 @@
+{
+ "extends": "../../tsconfig",
+ "compilerOptions": {
+ "outDir": "lib",
+ "rootDir": ".",
+ "emitDecoratorMetadata": true,
+ "experimentalDecorators": true
+ },
+ "include": ["./src/**/*", "./test/**/*", "./migrations/**/*"]
+}
diff --git a/packages/pipeline/tslint.json b/packages/pipeline/tslint.json
new file mode 100644
index 000000000..dd9053357
--- /dev/null
+++ b/packages/pipeline/tslint.json
@@ -0,0 +1,3 @@
+{
+ "extends": ["@0x/tslint-config"]
+}
diff --git a/packages/pipeline/typedoc-tsconfig.json b/packages/pipeline/typedoc-tsconfig.json
new file mode 100644
index 000000000..8b0ff51c1
--- /dev/null
+++ b/packages/pipeline/typedoc-tsconfig.json
@@ -0,0 +1,10 @@
+{
+ "extends": "../../typedoc-tsconfig",
+ "compilerOptions": {
+ "outDir": "lib",
+ "rootDir": ".",
+ "emitDecoratorMetadata": true,
+ "experimentalDecorators": true
+ },
+ "include": ["./src/**/*", "./test/**/*", "./migrations/**/*"]
+}