diff options
author | Fabio Berger <me@fabioberger.com> | 2018-12-19 22:16:35 +0800 |
---|---|---|
committer | Fabio Berger <me@fabioberger.com> | 2018-12-19 22:16:35 +0800 |
commit | 293dadc22aedcaf540f2dc17c8c38087e7ace037 (patch) | |
tree | 79f624ab03071a28da83c7bf542acfe0dd7af8cb /packages/pipeline | |
parent | ddf3bb7c0446f2d85b6fa55cbe0b00b227f08af7 (diff) | |
parent | 040b402b6d558d13f2f4e032297b6723cdf2aafe (diff) | |
download | dexon-sol-tools-293dadc22aedcaf540f2dc17c8c38087e7ace037.tar dexon-sol-tools-293dadc22aedcaf540f2dc17c8c38087e7ace037.tar.gz dexon-sol-tools-293dadc22aedcaf540f2dc17c8c38087e7ace037.tar.bz2 dexon-sol-tools-293dadc22aedcaf540f2dc17c8c38087e7ace037.tar.lz dexon-sol-tools-293dadc22aedcaf540f2dc17c8c38087e7ace037.tar.xz dexon-sol-tools-293dadc22aedcaf540f2dc17c8c38087e7ace037.tar.zst dexon-sol-tools-293dadc22aedcaf540f2dc17c8c38087e7ace037.zip |
Merge branch 'development' into website/addPySRA
* development: (141 commits)
Add missing CHANGELOG entry for OrderWatcher WS interface
Bump up stale to close to 30 days
Move onMessageAsync outside of tests and add comments
Fix WS tests to remove race-condition and be more specific about the message expected
Add temporary console.log to test failing on CI
Make @0x/contracts-test-utils a dependency instead of a devDependency
Fix test-publish failure in contracts packages
Fixed solhint errors
Added documentation to `LibAddressArray.append` and switched `if` to `require` smt
Updated changelogs for new contracts
Added `gas` field so tests pass on Geth;
Added Changelog for new Extensions
Updated comment `Execute fillOrder` -> `Execute exchange function`
Explicit returns
Prettier / Linter on contracts + TS
Refactoring balance threshold filter
Moved exchange calldata functions to separate mixin
Less Assembly. More Solidity. Less Efficiency. More Readability.
Run all tests for extensions
Cleaned up tests for balance threshold filter
...
Diffstat (limited to 'packages/pipeline')
10 files changed, 122 insertions, 68 deletions
diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json index f8adf9055..a40f3d21c 100644 --- a/packages/pipeline/package.json +++ b/packages/pipeline/package.json @@ -1,6 +1,6 @@ { "name": "@0x/pipeline", - "version": "1.0.1", + "version": "1.0.2", "private": true, "description": "Data pipeline for offline analysis", "scripts": { @@ -27,7 +27,7 @@ }, "license": "Apache-2.0", "devDependencies": { - "@0x/tslint-config": "^1.0.9", + "@0x/tslint-config": "^2.0.0", "@types/axios": "^0.14.0", "@types/ramda": "^0.25.38", "chai": "^4.1.2", @@ -39,23 +39,23 @@ "typescript": "3.0.1" }, "dependencies": { - "@0x/connect": "^3.0.9", + "@0x/connect": "^3.0.10", "@0x/contract-addresses": "^2.0.0", "@0x/contract-artifacts": "^1.0.1", "@0x/contract-wrappers": "^3.0.0", - "@0x/dev-utils": "^1.0.20", + "@0x/dev-utils": "^1.0.21", "@0x/order-utils": "^2.0.0", - "@0x/subproviders": "^2.1.7", - "@0x/types": "^1.4.0", - "@0x/utils": "^2.0.7", - "@0x/web3-wrapper": "^3.2.0", + "@0x/subproviders": "^2.1.8", + "@0x/types": "^1.4.1", + "@0x/utils": "^2.0.8", + "@0x/web3-wrapper": "^3.2.1", "@types/dockerode": "^2.5.9", "@types/p-limit": "^2.0.0", "async-parallel": "^1.2.3", "axios": "^0.18.0", "bottleneck": "^2.13.2", "dockerode": "^2.5.7", - "ethereum-types": "^1.1.3", + "ethereum-types": "^1.1.4", "pg": "^7.5.0", "prettier": "^1.15.3", "ramda": "^0.25.0", diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts index 52a998f9f..d7b97efbe 100644 --- a/packages/pipeline/src/parsers/ddex_orders/index.ts +++ b/packages/pipeline/src/parsers/ddex_orders/index.ts @@ -54,12 +54,14 @@ export function parseDdexOrder( tokenOrder.orderType = orderType; tokenOrder.price = price; - tokenOrder.baseAssetSymbol = ddexMarket.baseToken; - tokenOrder.baseAssetAddress = ddexMarket.baseTokenAddress; - tokenOrder.baseVolume = price.times(amount); + // ddex currently confuses quote and base assets. + // We switch them here to maintain our internal consistency. + tokenOrder.baseAssetSymbol = ddexMarket.quoteToken; + tokenOrder.baseAssetAddress = ddexMarket.quoteTokenAddress; + tokenOrder.baseVolume = amount; - tokenOrder.quoteAssetSymbol = ddexMarket.quoteToken; - tokenOrder.quoteAssetAddress = ddexMarket.quoteTokenAddress; - tokenOrder.quoteVolume = amount; + tokenOrder.quoteAssetSymbol = ddexMarket.baseToken; + tokenOrder.quoteAssetAddress = ddexMarket.baseTokenAddress; + tokenOrder.quoteVolume = price.times(amount); return tokenOrder; } diff --git a/packages/pipeline/src/parsers/oasis_orders/index.ts b/packages/pipeline/src/parsers/oasis_orders/index.ts index 7aafbf460..13997f31b 100644 --- a/packages/pipeline/src/parsers/oasis_orders/index.ts +++ b/packages/pipeline/src/parsers/oasis_orders/index.ts @@ -62,10 +62,10 @@ export function parseOasisOrder( tokenOrder.baseAssetSymbol = oasisMarket.base; tokenOrder.baseAssetAddress = null; // Oasis doesn't provide address information - tokenOrder.baseVolume = price.times(amount); + tokenOrder.baseVolume = amount; tokenOrder.quoteAssetSymbol = oasisMarket.quote; tokenOrder.quoteAssetAddress = null; // Oasis doesn't provide address information - tokenOrder.quoteVolume = amount; + tokenOrder.quoteVolume = price.times(amount); return tokenOrder; } diff --git a/packages/pipeline/src/parsers/paradex_orders/index.ts b/packages/pipeline/src/parsers/paradex_orders/index.ts index 7966658a7..5ceeb64a4 100644 --- a/packages/pipeline/src/parsers/paradex_orders/index.ts +++ b/packages/pipeline/src/parsers/paradex_orders/index.ts @@ -57,10 +57,10 @@ export function parseParadexOrder( tokenOrder.baseAssetSymbol = paradexMarket.baseToken; tokenOrder.baseAssetAddress = paradexMarket.baseTokenAddress as string; - tokenOrder.baseVolume = price.times(amount); + tokenOrder.baseVolume = amount; tokenOrder.quoteAssetSymbol = paradexMarket.quoteToken; tokenOrder.quoteAssetAddress = paradexMarket.quoteTokenAddress as string; - tokenOrder.quoteVolume = amount; + tokenOrder.quoteVolume = price.times(amount); return tokenOrder; } diff --git a/packages/pipeline/src/scripts/pull_erc20_events.ts b/packages/pipeline/src/scripts/pull_erc20_events.ts index 0ad12c97a..bd520c610 100644 --- a/packages/pipeline/src/scripts/pull_erc20_events.ts +++ b/packages/pipeline/src/scripts/pull_erc20_events.ts @@ -1,10 +1,10 @@ -// tslint:disable:no-console import { getContractAddressesForNetworkOrThrow } from '@0x/contract-addresses'; import { web3Factory } from '@0x/dev-utils'; import { Web3ProviderEngine } from '@0x/subproviders'; +import { logUtils } from '@0x/utils'; import { Web3Wrapper } from '@0x/web3-wrapper'; import 'reflect-metadata'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; import { ERC20EventsSource } from '../data_sources/contract-wrappers/erc20_events'; import { ERC20ApprovalEvent } from '../entities'; @@ -16,33 +16,63 @@ const NETWORK_ID = 1; const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default endBlock. -const WETH_START_BLOCK = 4719568; // Block number when the WETH contract was deployed. let connection: Connection; +interface Token { + // name is used for logging only. + name: string; + address: string; + defaultStartBlock: number; +} + +const tokensToGetApprovalEvents: Token[] = [ + { + name: 'WETH', + address: getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken, + defaultStartBlock: 4719568, // Block when the WETH contract was deployed. + }, + { + name: 'ZRX', + address: getContractAddressesForNetworkOrThrow(NETWORK_ID).zrxToken, + defaultStartBlock: 4145415, // Block when the ZRX contract was deployed. + }, + { + name: 'DAI', + address: '0x89d24a6b4ccb1b6faa2625fe562bdd9a23260359', + defaultStartBlock: 4752008, // Block when the DAI contract was deployed. + }, +]; + (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ rpcUrl: INFURA_ROOT_URL, }); const endBlock = await calculateEndBlockAsync(provider); - await getAndSaveWETHApprovalEventsAsync(provider, endBlock); + for (const token of tokensToGetApprovalEvents) { + await getAndSaveApprovalEventsAsync(provider, token, endBlock); + } process.exit(0); })().catch(handleError); -async function getAndSaveWETHApprovalEventsAsync(provider: Web3ProviderEngine, endBlock: number): Promise<void> { - console.log('Checking existing approval events...'); +async function getAndSaveApprovalEventsAsync( + provider: Web3ProviderEngine, + token: Token, + endBlock: number, +): Promise<void> { + logUtils.log(`Getting approval events for ${token.name}...`); + logUtils.log('Checking existing approval events...'); const repository = connection.getRepository(ERC20ApprovalEvent); - const startBlock = (await getStartBlockAsync(repository)) || WETH_START_BLOCK; + const startBlock = (await getStartBlockAsync(token)) || token.defaultStartBlock; - console.log(`Getting WETH approval events starting at ${startBlock}...`); - const wethTokenAddress = getContractAddressesForNetworkOrThrow(NETWORK_ID).etherToken; - const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, wethTokenAddress); + logUtils.log(`Getting approval events starting at ${startBlock}...`); + const eventsSource = new ERC20EventsSource(provider, NETWORK_ID, token.address); const eventLogs = await eventsSource.getApprovalEventsAsync(startBlock, endBlock); - console.log(`Parsing ${eventLogs.length} WETH approval events...`); + logUtils.log(`Parsing ${eventLogs.length} approval events...`); const events = parseERC20ApprovalEvents(eventLogs); - console.log(`Retrieved and parsed ${events.length} total WETH approval events.`); + logUtils.log(`Retrieved and parsed ${events.length} total approval events.`); await repository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); } @@ -52,15 +82,15 @@ async function calculateEndBlockAsync(provider: Web3ProviderEngine): Promise<num return currentBlock - BLOCK_FINALITY_THRESHOLD; } -async function getStartBlockAsync(repository: Repository<ERC20ApprovalEvent>): Promise<number | null> { - const fillEventCount = await repository.count(); - if (fillEventCount === 0) { - console.log(`No existing approval events found.`); - return null; - } +async function getStartBlockAsync(token: Token): Promise<number | null> { const queryResult = await connection.query( - `SELECT block_number FROM raw.erc20_approval_events ORDER BY block_number DESC LIMIT 1`, + `SELECT block_number FROM raw.erc20_approval_events WHERE token_address = $1 ORDER BY block_number DESC LIMIT 1`, + [token.address], ); + if (queryResult.length === 0) { + logUtils.log(`No existing approval events found for ${token.name}.`); + return null; + } const lastKnownBlock = queryResult[0].block_number; return lastKnownBlock - START_BLOCK_OFFSET; } diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index a5203824c..bb5385126 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -9,7 +9,7 @@ import { Web3Source } from '../data_sources/web3'; import { Block } from '../entities'; import * as ormConfig from '../ormconfig'; import { parseBlock } from '../parsers/web3'; -import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; +import { handleError, INFURA_ROOT_URL } from '../utils'; // Number of blocks to save at once. const BATCH_SAVE_SIZE = 1000; @@ -37,22 +37,19 @@ interface MissingBlocksResponse { async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise<void> { const blocksRepository = connection.getRepository(Block); - let fromBlock = EXCHANGE_START_BLOCK; while (true) { - const blockNumbers = await getMissingBlockNumbersAsync(fromBlock); + const blockNumbers = await getMissingBlockNumbersAsync(); if (blockNumbers.length === 0) { // There are no more missing blocks. We're done. break; } await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers); - fromBlock = Math.max(...blockNumbers) + 1; } const totalBlocks = await blocksRepository.count(); console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`); } -async function getMissingBlockNumbersAsync(fromBlock: number): Promise<number[]> { - console.log(`Checking for missing blocks starting at ${fromBlock}...`); +async function getMissingBlockNumbersAsync(): Promise<number[]> { // Note(albrow): The easiest way to get all the blocks we need is to // consider all the events tables together in a single query. If this query // gets too slow, we should consider re-architecting so that we can work on @@ -66,13 +63,12 @@ async function getMissingBlockNumbersAsync(fromBlock: number): Promise<number[]> ) SELECT DISTINCT(block_number) FROM all_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) - AND block_number >= $1 - ORDER BY block_number ASC LIMIT $2`, - [fromBlock, MAX_BLOCKS_PER_QUERY], + ORDER BY block_number ASC LIMIT $1`, + [MAX_BLOCKS_PER_QUERY], )) as MissingBlocksResponse[]; const blockNumberStrings = R.pluck('block_number', response); const blockNumbers = R.map(parseInt, blockNumberStrings); - console.log(`Found ${blockNumbers.length} missing blocks in the given range.`); + console.log(`Found ${blockNumbers.length} missing blocks.`); return blockNumbers; } diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts index 9d3ef2fba..19f81344e 100644 --- a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts +++ b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts @@ -23,6 +23,18 @@ interface CryptoCompareCoin { const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT']; const ETHEREUM_IDENTIFIER = '7605'; const HTTP_OK_STATUS = 200; + +interface StaticPair { + fromSymbol: string; + toSymbol: string; +} +const SPECIAL_CASES: StaticPair[] = [ + { + fromSymbol: 'ETH', + toSymbol: 'USD', + }, +]; + /** * Get trading pairs with latest scraped time for OHLCV records * @param conn a typeorm Connection to postgres @@ -44,6 +56,7 @@ export async function fetchOHLCVTradingPairsAsync( FROM raw.ohlcv_external GROUP BY from_symbol, to_symbol;`); + // build addressable index: { fromsym: { tosym: time }} const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {}; latestTradingPairs.forEach(pair => { const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {}; @@ -51,6 +64,13 @@ export async function fetchOHLCVTradingPairsAsync( latestTradingPairsIndex[pair.from_symbol] = latestIndex; }); + // match time to special cases + const specialCases: TradingPair[] = SPECIAL_CASES.map(pair => { + const latestSavedTime = + R.path<number>([pair.fromSymbol, pair.toSymbol], latestTradingPairsIndex) || earliestBackfillTime; + return R.assoc('latestSavedTime', latestSavedTime, pair); + }); + // get token symbols used by Crypto Compare const allCoinsResp = await fetchAsync(COINLIST_API); if (allCoinsResp.status !== HTTP_OK_STATUS) { @@ -66,27 +86,31 @@ export async function fetchOHLCVTradingPairsAsync( }); // fetch all tokens that are traded on 0x - const rawTokenAddresses: Array<{ tokenaddress: string }> = await conn.query( + const rawEventTokenAddresses: Array<{ tokenaddress: string }> = await conn.query( `SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`, ); - const tokenAddresses = R.pluck('tokenaddress', rawTokenAddresses); + + // tslint:disable-next-line:no-unbound-method + const eventTokenAddresses = R.pluck('tokenaddress', rawEventTokenAddresses).map(R.toLower); // join token addresses with CC symbols - const allTokenSymbols: string[] = tokenAddresses - .map(tokenAddress => erc20CoinsIndex.get(tokenAddress.toLowerCase()) || '') - .filter(x => x); + const eventTokenSymbols: string[] = eventTokenAddresses + .filter(tokenAddress => erc20CoinsIndex.has(tokenAddress)) + .map(tokenAddress => erc20CoinsIndex.get(tokenAddress) as string); - // generate list of all tokens with time of latest existing record OR default earliest time - const allTradingPairCombinations: TradingPair[] = R.chain(sym => { + // join traded tokens with fiat and latest backfill time + const eventTradingPairs: TradingPair[] = R.chain(sym => { return TO_CURRENCIES.map(fiat => { - return { + const pair = { fromSymbol: sym, toSymbol: fiat, latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime, }; + return pair; }); - }, allTokenSymbols); + }, eventTokenSymbols); - return allTradingPairCombinations; + // join with special cases + return R.concat(eventTradingPairs, specialCases); } diff --git a/packages/pipeline/test/parsers/ddex_orders/index_test.ts b/packages/pipeline/test/parsers/ddex_orders/index_test.ts index 9f4bfe7e3..4a4a86bf8 100644 --- a/packages/pipeline/test/parsers/ddex_orders/index_test.ts +++ b/packages/pipeline/test/parsers/ddex_orders/index_test.ts @@ -39,12 +39,14 @@ describe('ddex_orders', () => { expected.observedTimestamp = observedTimestamp; expected.orderType = 'bid'; expected.price = new BigNumber(0.5); - expected.baseAssetSymbol = 'DEF'; - expected.baseAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81'; - expected.baseVolume = new BigNumber(5); - expected.quoteAssetSymbol = 'ABC'; - expected.quoteAssetAddress = '0x0000000000000000000000000000000000000000'; - expected.quoteVolume = new BigNumber(10); + // ddex currently confuses base and quote assets. + // Switch them to maintain our internal consistency. + expected.baseAssetSymbol = 'ABC'; + expected.baseAssetAddress = '0x0000000000000000000000000000000000000000'; + expected.baseVolume = new BigNumber(10); + expected.quoteAssetSymbol = 'DEF'; + expected.quoteAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81'; + expected.quoteVolume = new BigNumber(5); const actual = parseDdexOrder(ddexMarket, observedTimestamp, orderType, source, ddexOrder); expect(actual).deep.equal(expected); diff --git a/packages/pipeline/test/parsers/oasis_orders/index_test.ts b/packages/pipeline/test/parsers/oasis_orders/index_test.ts index 9e8ba9a40..433bfb665 100644 --- a/packages/pipeline/test/parsers/oasis_orders/index_test.ts +++ b/packages/pipeline/test/parsers/oasis_orders/index_test.ts @@ -37,10 +37,10 @@ describe('oasis_orders', () => { expected.price = new BigNumber(0.5); expected.baseAssetSymbol = 'DEF'; expected.baseAssetAddress = null; - expected.baseVolume = new BigNumber(5); + expected.baseVolume = new BigNumber(10); expected.quoteAssetSymbol = 'ABC'; expected.quoteAssetAddress = null; - expected.quoteVolume = new BigNumber(10); + expected.quoteVolume = new BigNumber(5); const actual = parseOasisOrder(oasisMarket, observedTimestamp, orderType, source, oasisOrder); expect(actual).deep.equal(expected); diff --git a/packages/pipeline/test/parsers/paradex_orders/index_test.ts b/packages/pipeline/test/parsers/paradex_orders/index_test.ts index 1522806bf..6b811b90d 100644 --- a/packages/pipeline/test/parsers/paradex_orders/index_test.ts +++ b/packages/pipeline/test/parsers/paradex_orders/index_test.ts @@ -42,10 +42,10 @@ describe('paradex_orders', () => { expected.price = new BigNumber(0.1245); expected.baseAssetSymbol = 'DEF'; expected.baseAssetAddress = '0xb45df06e38540a675fdb5b598abf2c0dbe9d6b81'; - expected.baseVolume = new BigNumber(412 * 0.1245); + expected.baseVolume = new BigNumber(412); expected.quoteAssetSymbol = 'ABC'; expected.quoteAssetAddress = '0x0000000000000000000000000000000000000000'; - expected.quoteVolume = new BigNumber(412); + expected.quoteVolume = new BigNumber(412 * 0.1245); const actual = parseParadexOrder(paradexMarket, observedTimestamp, orderType, source, paradexOrder); expect(actual).deep.equal(expected); |