diff options
Diffstat (limited to 'packages/order-watcher/src/order_watcher')
6 files changed, 0 insertions, 1234 deletions
diff --git a/packages/order-watcher/src/order_watcher/collision_resistant_abi_decoder.ts b/packages/order-watcher/src/order_watcher/collision_resistant_abi_decoder.ts deleted file mode 100644 index 2ea796947..000000000 --- a/packages/order-watcher/src/order_watcher/collision_resistant_abi_decoder.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { AbiDecoder } from '@0x/utils'; -import { ContractAbi, DecodedLogArgs, LogEntry, LogWithDecodedArgs, RawLog } from 'ethereum-types'; - -const TOKEN_TYPE_COLLISION = `Token can't be marked as ERC20 and ERC721 at the same time`; - -/** - * ERC20 and ERC721 have some events with different args but colliding signature. - * For exmaple: - * Transfer(_from address, _to address, _value uint256) - * Transfer(_from address, _to address, _tokenId uint256) - * Both have the signature: - * Transfer(address,address,uint256) - * - * In order to correctly decode those events we need to know the token type by address in advance. - * You can pass it by calling `this.addERC20Token(address)` or `this.addERC721Token(address)` - */ -export class CollisionResistanceAbiDecoder { - private readonly _erc20AbiDecoder: AbiDecoder; - private readonly _erc721AbiDecoder: AbiDecoder; - private readonly _restAbiDecoder: AbiDecoder; - private readonly _knownERC20Tokens = new Set(); - private readonly _knownERC721Tokens = new Set(); - constructor(erc20Abi: ContractAbi, erc721Abi: ContractAbi, abis: ContractAbi[]) { - this._erc20AbiDecoder = new AbiDecoder([erc20Abi]); - this._erc721AbiDecoder = new AbiDecoder([erc721Abi]); - this._restAbiDecoder = new AbiDecoder(abis); - } - public tryToDecodeLogOrNoop<ArgsType extends DecodedLogArgs>(log: LogEntry): LogWithDecodedArgs<ArgsType> | RawLog { - if (this._knownERC20Tokens.has(log.address)) { - const maybeDecodedERC20Log = this._erc20AbiDecoder.tryToDecodeLogOrNoop(log); - return maybeDecodedERC20Log; - } else if (this._knownERC721Tokens.has(log.address)) { - const maybeDecodedERC721Log = this._erc721AbiDecoder.tryToDecodeLogOrNoop(log); - return maybeDecodedERC721Log; - } else { - const maybeDecodedLog = this._restAbiDecoder.tryToDecodeLogOrNoop(log); - return maybeDecodedLog; - } - } - // Hints the ABI decoder that a particular token address is ERC20 and events from it should be decoded as ERC20 events - public addERC20Token(address: string): void { - if (this._knownERC721Tokens.has(address)) { - throw new Error(TOKEN_TYPE_COLLISION); - } - this._knownERC20Tokens.add(address); - } - // Hints the ABI decoder that a particular token address is ERC721 and events from it should be decoded as ERC721 events - public addERC721Token(address: string): void { - if (this._knownERC20Tokens.has(address)) { - throw new Error(TOKEN_TYPE_COLLISION); - } - this._knownERC721Tokens.add(address); - } -} diff --git a/packages/order-watcher/src/order_watcher/dependent_order_hashes_tracker.ts b/packages/order-watcher/src/order_watcher/dependent_order_hashes_tracker.ts deleted file mode 100644 index d1085014c..000000000 --- a/packages/order-watcher/src/order_watcher/dependent_order_hashes_tracker.ts +++ /dev/null @@ -1,243 +0,0 @@ -import { assetDataUtils, orderHashUtils } from '@0x/order-utils'; -import { AssetProxyId, SignedOrder } from '@0x/types'; -import { BigNumber } from '@0x/utils'; -import * as _ from 'lodash'; - -export interface OrderHashesByMakerAddress { - [makerAddress: string]: Set<string>; -} - -export interface OrderHashesByERC20ByMakerAddress { - [makerAddress: string]: { - [erc20TokenAddress: string]: Set<string>; - }; -} - -export interface OrderHashesByERC721AddressByTokenIdByMakerAddress { - [makerAddress: string]: { - [erc721TokenAddress: string]: { - // Ideally erc721TokenId should be a BigNumber, but it's not a valid index type so we just convert it to a string before using it as an index - [erc721TokenId: string]: Set<string>; - }; - }; -} - -/** - */ -export class DependentOrderHashesTracker { - private readonly _zrxTokenAddress: string; - // `_orderHashesByMakerAddress` is redundant and could be generated from - // `_orderHashesByERC20ByMakerAddress` and `_orderHashesByERC721AddressByTokenIdByMakerAddress` - // on the fly by merging all the entries together but it's more complex and computationally heavy. - // We might change that in future if we're move memory-constrained. - private readonly _orderHashesByMakerAddress: OrderHashesByMakerAddress = {}; - private readonly _orderHashesByERC20ByMakerAddress: OrderHashesByERC20ByMakerAddress = {}; - private readonly _orderHashesByERC721AddressByTokenIdByMakerAddress: OrderHashesByERC721AddressByTokenIdByMakerAddress = {}; - constructor(zrxTokenAddress: string) { - this._zrxTokenAddress = zrxTokenAddress; - } - public getDependentOrderHashesByERC721ByMaker(makerAddress: string, tokenAddress: string): string[] { - const orderHashSets = _.values( - this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress][tokenAddress], - ); - const orderHashList = _.reduce( - orderHashSets, - (accumulator, orderHashSet) => [...accumulator, ...orderHashSet], - [] as string[], - ); - const uniqueOrderHashList = _.uniq(orderHashList); - return uniqueOrderHashList; - } - public getDependentOrderHashesByMaker(makerAddress: string): string[] { - const dependentOrderHashes = Array.from(this._orderHashesByMakerAddress[makerAddress] || {}); - return dependentOrderHashes; - } - public getDependentOrderHashesByAssetDataByMaker(makerAddress: string, assetData: string): string[] { - const decodedAssetData = assetDataUtils.decodeAssetDataOrThrow(assetData); - const dependentOrderHashes = - decodedAssetData.assetProxyId === AssetProxyId.ERC20 - ? this._getDependentOrderHashesByERC20AssetData(makerAddress, assetData) - : this._getDependentOrderHashesByERC721AssetData(makerAddress, assetData); - return dependentOrderHashes; - } - public addToDependentOrderHashes(signedOrder: SignedOrder): void { - this._addAssetDataToDependentOrderHashes(signedOrder, signedOrder.makerAssetData); - this._addToERC20DependentOrderHashes(signedOrder, this._zrxTokenAddress); - this._addToMakerDependentOrderHashes(signedOrder); - } - public removeFromDependentOrderHashes(signedOrder: SignedOrder): void { - this._removeAssetDataFromDependentOrderHashes(signedOrder, signedOrder.makerAssetData); - // If makerToken === ZRX then we already removed it and we don't need to remove it again. - const decodedMakerAssetData = assetDataUtils.decodeAssetDataOrThrow(signedOrder.makerAssetData); - if ( - assetDataUtils.isERC20AssetData(decodedMakerAssetData) && - decodedMakerAssetData.tokenAddress !== this._zrxTokenAddress - ) { - this._removeFromERC20DependentOrderhashes(signedOrder, this._zrxTokenAddress); - } - this._removeFromMakerDependentOrderhashes(signedOrder); - } - private _getDependentOrderHashesByERC20AssetData(makerAddress: string, erc20AssetData: string): string[] { - const tokenAddress = assetDataUtils.decodeERC20AssetData(erc20AssetData).tokenAddress; - let dependentOrderHashes: string[] = []; - if ( - !_.isUndefined(this._orderHashesByERC20ByMakerAddress[makerAddress]) && - !_.isUndefined(this._orderHashesByERC20ByMakerAddress[makerAddress][tokenAddress]) - ) { - dependentOrderHashes = Array.from(this._orderHashesByERC20ByMakerAddress[makerAddress][tokenAddress]); - } - return dependentOrderHashes; - } - private _getDependentOrderHashesByERC721AssetData(makerAddress: string, erc721AssetData: string): string[] { - const tokenAddress = assetDataUtils.decodeERC721AssetData(erc721AssetData).tokenAddress; - const tokenId = assetDataUtils.decodeERC721AssetData(erc721AssetData).tokenId; - let dependentOrderHashes: string[] = []; - if ( - !_.isUndefined(this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress]) && - !_.isUndefined(this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress][tokenAddress]) && - !_.isUndefined( - this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress][tokenAddress][tokenId.toString()], - ) - ) { - dependentOrderHashes = Array.from( - this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress][tokenAddress][tokenId.toString()], - ); - } - return dependentOrderHashes; - } - private _addToERC20DependentOrderHashes(signedOrder: SignedOrder, erc20TokenAddress: string): void { - const orderHash = orderHashUtils.getOrderHashHex(signedOrder); - if (_.isUndefined(this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress])) { - this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress] = {}; - } - if (_.isUndefined(this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress])) { - this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress] = new Set(); - } - this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress].add(orderHash); - } - private _addToERC721DependentOrderHashes( - signedOrder: SignedOrder, - erc721TokenAddress: string, - tokenId: BigNumber, - ): void { - const orderHash = orderHashUtils.getOrderHashHex(signedOrder); - if (_.isUndefined(this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress])) { - this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress] = {}; - } - - if ( - _.isUndefined( - this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress], - ) - ) { - this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress] = {}; - } - - if ( - _.isUndefined( - this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][ - tokenId.toString() - ], - ) - ) { - this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][ - tokenId.toString() - ] = new Set(); - } - - this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][ - tokenId.toString() - ].add(orderHash); - } - private _addAssetDataToDependentOrderHashes(signedOrder: SignedOrder, assetData: string): void { - const decodedAssetData = assetDataUtils.decodeAssetDataOrThrow(assetData); - if (assetDataUtils.isERC20AssetData(decodedAssetData)) { - this._addToERC20DependentOrderHashes(signedOrder, decodedAssetData.tokenAddress); - } else if (assetDataUtils.isERC721AssetData(decodedAssetData)) { - this._addToERC721DependentOrderHashes(signedOrder, decodedAssetData.tokenAddress, decodedAssetData.tokenId); - } else if (assetDataUtils.isMultiAssetData(decodedAssetData)) { - _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement => - this._addAssetDataToDependentOrderHashes(signedOrder, nestedAssetDataElement), - ); - } - } - private _addToMakerDependentOrderHashes(signedOrder: SignedOrder): void { - const orderHash = orderHashUtils.getOrderHashHex(signedOrder); - if (_.isUndefined(this._orderHashesByMakerAddress[signedOrder.makerAddress])) { - this._orderHashesByMakerAddress[signedOrder.makerAddress] = new Set(); - } - this._orderHashesByMakerAddress[signedOrder.makerAddress].add(orderHash); - } - private _removeFromERC20DependentOrderhashes(signedOrder: SignedOrder, erc20TokenAddress: string): void { - const orderHash = orderHashUtils.getOrderHashHex(signedOrder); - this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress].delete(orderHash); - - if (_.isEmpty(this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress])) { - delete this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress]; - } - - if (_.isEmpty(this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress])) { - delete this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress]; - } - } - private _removeFromERC721DependentOrderhashes( - signedOrder: SignedOrder, - erc721TokenAddress: string, - tokenId: BigNumber, - ): void { - const orderHash = orderHashUtils.getOrderHashHex(signedOrder); - this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][ - tokenId.toString() - ].delete(orderHash); - - if ( - _.isEmpty( - this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][ - tokenId.toString() - ], - ) - ) { - delete this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][ - erc721TokenAddress - ][tokenId.toString()]; - } - - if ( - _.isEmpty( - this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress], - ) - ) { - delete this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][ - erc721TokenAddress - ]; - } - - if (_.isEmpty(this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress])) { - delete this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress]; - } - } - private _removeFromMakerDependentOrderhashes(signedOrder: SignedOrder): void { - const orderHash = orderHashUtils.getOrderHashHex(signedOrder); - this._orderHashesByMakerAddress[signedOrder.makerAddress].delete(orderHash); - - if (_.isEmpty(this._orderHashesByMakerAddress[signedOrder.makerAddress])) { - delete this._orderHashesByMakerAddress[signedOrder.makerAddress]; - } - } - private _removeAssetDataFromDependentOrderHashes(signedOrder: SignedOrder, assetData: string): void { - const decodedAssetData = assetDataUtils.decodeAssetDataOrThrow(assetData); - if (assetDataUtils.isERC20AssetData(decodedAssetData)) { - this._removeFromERC20DependentOrderhashes(signedOrder, decodedAssetData.tokenAddress); - } else if (assetDataUtils.isERC721AssetData(decodedAssetData)) { - this._removeFromERC721DependentOrderhashes( - signedOrder, - decodedAssetData.tokenAddress, - decodedAssetData.tokenId, - ); - } else if (assetDataUtils.isMultiAssetData(decodedAssetData)) { - _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement => - this._removeAssetDataFromDependentOrderHashes(signedOrder, nestedAssetDataElement), - ); - } - } -} diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts deleted file mode 100644 index 3149d858b..000000000 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ /dev/null @@ -1,158 +0,0 @@ -import { intervalUtils, logUtils } from '@0x/utils'; -import { marshaller, Web3Wrapper } from '@0x/web3-wrapper'; -import { BlockParamLiteral, FilterObject, LogEntry, Provider, RawLogEntry } from 'ethereum-types'; -import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream'; -import * as _ from 'lodash'; - -import { EventWatcherCallback, OrderWatcherError } from '../types'; -import { assert } from '../utils/assert'; - -const DEFAULT_EVENT_POLLING_INTERVAL_MS = 200; - -enum LogEventState { - Removed, - Added, -} - -/** - * The EventWatcher watches for blockchain events at the specified block confirmation - * depth. - */ -export class EventWatcher { - private readonly _web3Wrapper: Web3Wrapper; - private readonly _isVerbose: boolean; - private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined; - private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer; - private _onLogAddedSubscriptionToken: string | undefined; - private _onLogRemovedSubscriptionToken: string | undefined; - private readonly _pollingIntervalMs: number; - constructor( - provider: Provider, - pollingIntervalIfExistsMs: undefined | number, - stateLayer: BlockParamLiteral, - isVerbose: boolean, - ) { - this._isVerbose = isVerbose; - this._web3Wrapper = new Web3Wrapper(provider); - this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs) - ? DEFAULT_EVENT_POLLING_INTERVAL_MS - : pollingIntervalIfExistsMs; - this._blockAndLogStreamerIfExists = undefined; - this._blockAndLogStreamIntervalIfExists = undefined; - this._onLogAddedSubscriptionToken = undefined; - this._onLogRemovedSubscriptionToken = undefined; - } - public subscribe(callback: EventWatcherCallback): void { - assert.isFunction('callback', callback); - if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { - throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); - } - this._startBlockAndLogStream(callback); - } - public unsubscribe(): void { - if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { - throw new Error(OrderWatcherError.SubscriptionNotFound); - } - this._stopBlockAndLogStream(); - } - private _startBlockAndLogStream(callback: EventWatcherCallback): void { - if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { - throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); - } - this._blockAndLogStreamerIfExists = new BlockAndLogStreamer( - this._blockstreamGetBlockOrNullAsync.bind(this), - this._blockstreamGetLogsAsync.bind(this), - this._onBlockAndLogStreamerError.bind(this), - ); - const catchAllLogFilter = {}; - this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter); - this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval( - this._reconcileBlockAsync.bind(this), - this._pollingIntervalMs, - this._onBlockAndLogStreamerError.bind(this), - ); - let isRemoved = false; - this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded( - this._onLogStateChangedAsync.bind(this, callback, isRemoved), - ); - isRemoved = true; - this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved( - this._onLogStateChangedAsync.bind(this, callback, isRemoved), - ); - } - // This method only exists in order to comply with the expected interface of Blockstream's constructor - private async _blockstreamGetBlockOrNullAsync(hash: string): Promise<Block | null> { - const shouldIncludeTransactionData = false; - const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({ - method: 'eth_getBlockByHash', - params: [hash, shouldIncludeTransactionData], - }); - return blockOrNull; - } - // This method only exists in order to comply with the expected interface of Blockstream's constructor - private async _blockstreamGetLatestBlockOrNullAsync(): Promise<Block | null> { - const shouldIncludeTransactionData = false; - const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({ - method: 'eth_getBlockByNumber', - params: [BlockParamLiteral.Latest, shouldIncludeTransactionData], - }); - return blockOrNull; - } - // This method only exists in order to comply with the expected interface of Blockstream's constructor - private async _blockstreamGetLogsAsync(filterOptions: FilterObject): Promise<RawLogEntry[]> { - const logs = await this._web3Wrapper.sendRawPayloadAsync<RawLogEntry[]>({ - method: 'eth_getLogs', - params: [filterOptions], - }); - return logs as RawLogEntry[]; - } - private _stopBlockAndLogStream(): void { - if (_.isUndefined(this._blockAndLogStreamerIfExists)) { - throw new Error(OrderWatcherError.SubscriptionNotFound); - } - this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string); - this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string); - intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer); - delete this._blockAndLogStreamerIfExists; - delete this._blockAndLogStreamIntervalIfExists; - } - private async _onLogStateChangedAsync( - callback: EventWatcherCallback, - isRemoved: boolean, - rawLog: RawLogEntry, - ): Promise<void> { - const log: LogEntry = marshaller.unmarshalLog(rawLog); - await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback); - } - private async _reconcileBlockAsync(): Promise<void> { - const latestBlockOrNull = await this._blockstreamGetLatestBlockOrNullAsync(); - if (_.isNull(latestBlockOrNull)) { - return; // noop - } - // We need to coerce to Block type cause Web3.Block includes types for mempool blocks - if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { - // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined - await this._blockAndLogStreamerIfExists.reconcileNewBlock(latestBlockOrNull); - } - } - private async _emitDifferencesAsync( - log: LogEntry, - logEventState: LogEventState, - callback: EventWatcherCallback, - ): Promise<void> { - const logEvent = { - removed: logEventState === LogEventState.Removed, - ...log, - }; - if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { - callback(null, logEvent); - } - } - private _onBlockAndLogStreamerError(err: Error): void { - // Since Blockstream errors are all recoverable, we simply log them if the verbose - // config is passed in. - if (this._isVerbose) { - logUtils.warn(err); - } - } -} diff --git a/packages/order-watcher/src/order_watcher/expiration_watcher.ts b/packages/order-watcher/src/order_watcher/expiration_watcher.ts deleted file mode 100644 index 82590efde..000000000 --- a/packages/order-watcher/src/order_watcher/expiration_watcher.ts +++ /dev/null @@ -1,89 +0,0 @@ -import { BigNumber, intervalUtils } from '@0x/utils'; -import { RBTree } from 'bintrees'; -import * as _ from 'lodash'; - -import { OrderWatcherError } from '../types'; -import { utils } from '../utils/utils'; - -const DEFAULT_EXPIRATION_MARGIN_MS = 0; -const DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS = 50; - -/** - * This class includes the functionality to detect expired orders. - * It stores them in a min heap by expiration time and checks for expired ones every `orderExpirationCheckingIntervalMs` - */ -export class ExpirationWatcher { - private readonly _orderHashByExpirationRBTree: RBTree<string>; - private readonly _expiration: { [orderHash: string]: BigNumber } = {}; - private readonly _orderExpirationCheckingIntervalMs: number; - private readonly _expirationMarginMs: number; - private _orderExpirationCheckingIntervalIdIfExists?: NodeJS.Timer; - constructor(expirationMarginIfExistsMs?: number, orderExpirationCheckingIntervalIfExistsMs?: number) { - this._orderExpirationCheckingIntervalMs = - orderExpirationCheckingIntervalIfExistsMs || DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS; - this._expirationMarginMs = expirationMarginIfExistsMs || DEFAULT_EXPIRATION_MARGIN_MS; - this._orderExpirationCheckingIntervalMs = - expirationMarginIfExistsMs || DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS; - const comparator = (lhsOrderHash: string, rhsOrderHash: string) => { - const lhsExpiration = this._expiration[lhsOrderHash].toNumber(); - const rhsExpiration = this._expiration[rhsOrderHash].toNumber(); - if (lhsExpiration !== rhsExpiration) { - return lhsExpiration - rhsExpiration; - } else { - // HACK: If two orders have identical expirations, the order in which they are emitted by the - // ExpirationWatcher does not matter, so we emit them in alphabetical order by orderHash. - return lhsOrderHash.localeCompare(rhsOrderHash); - } - }; - this._orderHashByExpirationRBTree = new RBTree(comparator); - } - public subscribe(callback: (orderHash: string) => void): void { - if (!_.isUndefined(this._orderExpirationCheckingIntervalIdIfExists)) { - throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); - } - this._orderExpirationCheckingIntervalIdIfExists = intervalUtils.setInterval( - this._pruneExpiredOrders.bind(this, callback), - this._orderExpirationCheckingIntervalMs, - _.noop.bind(_), // _pruneExpiredOrders never throws - ); - } - public unsubscribe(): void { - if (_.isUndefined(this._orderExpirationCheckingIntervalIdIfExists)) { - throw new Error(OrderWatcherError.SubscriptionNotFound); - } - intervalUtils.clearInterval(this._orderExpirationCheckingIntervalIdIfExists); - delete this._orderExpirationCheckingIntervalIdIfExists; - } - public addOrder(orderHash: string, expirationUnixTimestampMs: BigNumber): void { - this._expiration[orderHash] = expirationUnixTimestampMs; - this._orderHashByExpirationRBTree.insert(orderHash); - } - public removeOrder(orderHash: string): void { - if (_.isUndefined(this._expiration[orderHash])) { - return; // noop since order already removed - } - this._orderHashByExpirationRBTree.remove(orderHash); - delete this._expiration[orderHash]; - } - private _pruneExpiredOrders(callback: (orderHash: string) => void): void { - const currentUnixTimestampMs = utils.getCurrentUnixTimestampMs(); - while (true) { - const hasNoTrackedOrders = this._orderHashByExpirationRBTree.size === 0; - if (hasNoTrackedOrders) { - break; - } - const nextOrderHashToExpire = this._orderHashByExpirationRBTree.min(); - const hasNoExpiredOrders = this._expiration[nextOrderHashToExpire].isGreaterThan( - currentUnixTimestampMs.plus(this._expirationMarginMs), - ); - const isSubscriptionActive = _.isUndefined(this._orderExpirationCheckingIntervalIdIfExists); - if (hasNoExpiredOrders || isSubscriptionActive) { - break; - } - const orderHash = this._orderHashByExpirationRBTree.min(); - this._orderHashByExpirationRBTree.remove(orderHash); - delete this._expiration[orderHash]; - callback(orderHash); - } - } -} diff --git a/packages/order-watcher/src/order_watcher/order_watcher.ts b/packages/order-watcher/src/order_watcher/order_watcher.ts deleted file mode 100644 index a06fd0cfe..000000000 --- a/packages/order-watcher/src/order_watcher/order_watcher.ts +++ /dev/null @@ -1,490 +0,0 @@ -// tslint:disable:no-unnecessary-type-assertion -import { ContractAddresses } from '@0x/contract-addresses'; -import * as artifacts from '@0x/contract-artifacts'; -import { - AssetBalanceAndProxyAllowanceFetcher, - ContractWrappers, - ERC20TokenApprovalEventArgs, - ERC20TokenEventArgs, - ERC20TokenEvents, - ERC20TokenTransferEventArgs, - ERC721TokenApprovalEventArgs, - ERC721TokenApprovalForAllEventArgs, - ERC721TokenEventArgs, - ERC721TokenEvents, - ERC721TokenTransferEventArgs, - ExchangeCancelEventArgs, - ExchangeCancelUpToEventArgs, - ExchangeEventArgs, - ExchangeEvents, - ExchangeFillEventArgs, - OrderFilledCancelledFetcher, - WETH9DepositEventArgs, - WETH9EventArgs, - WETH9Events, - WETH9WithdrawalEventArgs, -} from '@0x/contract-wrappers'; -import { schemas } from '@0x/json-schemas'; -import { - assetDataUtils, - BalanceAndProxyAllowanceLazyStore, - OrderFilledCancelledLazyStore, - orderHashUtils, - OrderStateUtils, -} from '@0x/order-utils'; -import { AssetProxyId, ExchangeContractErrs, OrderState, SignedOrder, Stats } from '@0x/types'; -import { errorUtils, intervalUtils } from '@0x/utils'; -import { BlockParamLiteral, LogEntryEvent, LogWithDecodedArgs, Provider } from 'ethereum-types'; -import * as _ from 'lodash'; - -import { orderWatcherPartialConfigSchema } from '../schemas/order_watcher_partial_config_schema'; -import { OnOrderStateChangeCallback, OrderWatcherConfig, OrderWatcherError } from '../types'; -import { assert } from '../utils/assert'; - -import { CollisionResistanceAbiDecoder } from './collision_resistant_abi_decoder'; -import { DependentOrderHashesTracker } from './dependent_order_hashes_tracker'; -import { EventWatcher } from './event_watcher'; -import { ExpirationWatcher } from './expiration_watcher'; - -const MILLISECONDS_IN_A_SECOND = 1000; - -type ContractEventArgs = WETH9EventArgs | ExchangeEventArgs | ERC20TokenEventArgs | ERC721TokenEventArgs; - -interface OrderByOrderHash { - [orderHash: string]: SignedOrder; -} - -interface OrderStateByOrderHash { - [orderHash: string]: OrderState; -} - -const DEFAULT_ORDER_WATCHER_CONFIG: OrderWatcherConfig = { - orderExpirationCheckingIntervalMs: 50, - eventPollingIntervalMs: 200, - expirationMarginMs: 0, - // tslint:disable-next-line:custom-no-magic-numbers - cleanupJobIntervalMs: 1000 * 60 * 60, // 1h - isVerbose: true, -}; -const STATE_LAYER = BlockParamLiteral.Latest; - -/** - * This class includes all the functionality related to watching a set of orders - * for potential changes in order validity/fillability. The orderWatcher notifies - * the subscriber of these changes so that a final decision can be made on whether - * the order should be deemed invalid. - */ -export class OrderWatcher { - private readonly _dependentOrderHashesTracker: DependentOrderHashesTracker; - private readonly _orderStateByOrderHashCache: OrderStateByOrderHash = {}; - private readonly _orderByOrderHash: OrderByOrderHash = {}; - private readonly _eventWatcher: EventWatcher; - private readonly _provider: Provider; - private readonly _collisionResistantAbiDecoder: CollisionResistanceAbiDecoder; - private readonly _expirationWatcher: ExpirationWatcher; - private readonly _orderStateUtils: OrderStateUtils; - private readonly _orderFilledCancelledLazyStore: OrderFilledCancelledLazyStore; - private readonly _balanceAndProxyAllowanceLazyStore: BalanceAndProxyAllowanceLazyStore; - private readonly _cleanupJobInterval: number; - private _cleanupJobIntervalIdIfExists?: NodeJS.Timer; - private _callbackIfExists?: OnOrderStateChangeCallback; - /** - * Instantiate a new OrderWatcher - * @param provider Web3 provider to use for JSON RPC calls - * @param networkId NetworkId to watch orders on - * @param contractAddresses Optional contract addresses. Defaults to known - * addresses based on networkId. - * @param partialConfig Optional configurations - */ - constructor( - provider: Provider, - networkId: number, - contractAddresses?: ContractAddresses, - partialConfig: Partial<OrderWatcherConfig> = DEFAULT_ORDER_WATCHER_CONFIG, - ) { - assert.isWeb3Provider('provider', provider); - assert.isNumber('networkId', networkId); - assert.doesConformToSchema('partialConfig', partialConfig, orderWatcherPartialConfigSchema); - const config = { - ...DEFAULT_ORDER_WATCHER_CONFIG, - ...partialConfig, - }; - - this._provider = provider; - this._collisionResistantAbiDecoder = new CollisionResistanceAbiDecoder( - artifacts.ERC20Token.compilerOutput.abi, - artifacts.ERC721Token.compilerOutput.abi, - [artifacts.WETH9.compilerOutput.abi, artifacts.Exchange.compilerOutput.abi], - ); - const contractWrappers = new ContractWrappers(provider, { - networkId, - // Note(albrow): We let the contract-wrappers package handle - // default values for contractAddresses. - contractAddresses, - }); - this._eventWatcher = new EventWatcher(provider, config.eventPollingIntervalMs, STATE_LAYER, config.isVerbose); - const balanceAndProxyAllowanceFetcher = new AssetBalanceAndProxyAllowanceFetcher( - contractWrappers.erc20Token, - contractWrappers.erc721Token, - STATE_LAYER, - ); - this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore( - balanceAndProxyAllowanceFetcher, - ); - const orderFilledCancelledFetcher = new OrderFilledCancelledFetcher(contractWrappers.exchange, STATE_LAYER); - this._orderFilledCancelledLazyStore = new OrderFilledCancelledLazyStore(orderFilledCancelledFetcher); - this._orderStateUtils = new OrderStateUtils(balanceAndProxyAllowanceFetcher, orderFilledCancelledFetcher); - const expirationMarginIfExistsMs = _.isUndefined(config) ? undefined : config.expirationMarginMs; - this._expirationWatcher = new ExpirationWatcher( - expirationMarginIfExistsMs, - config.orderExpirationCheckingIntervalMs, - ); - this._cleanupJobInterval = config.cleanupJobIntervalMs; - const zrxTokenAddress = assetDataUtils.decodeERC20AssetData(orderFilledCancelledFetcher.getZRXAssetData()) - .tokenAddress; - this._dependentOrderHashesTracker = new DependentOrderHashesTracker(zrxTokenAddress); - } - /** - * Add an order to the orderWatcher. Before the order is added, it's - * signature is verified. - * @param signedOrder The order you wish to start watching. - */ - public async addOrderAsync(signedOrder: SignedOrder): Promise<void> { - assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema); - const orderHash = orderHashUtils.getOrderHashHex(signedOrder); - await assert.isValidSignatureAsync(this._provider, orderHash, signedOrder.signature, signedOrder.makerAddress); - - const expirationUnixTimestampMs = signedOrder.expirationTimeSeconds.times(MILLISECONDS_IN_A_SECOND); - this._expirationWatcher.addOrder(orderHash, expirationUnixTimestampMs); - - this._orderByOrderHash[orderHash] = signedOrder; - this._dependentOrderHashesTracker.addToDependentOrderHashes(signedOrder); - - const orderAssetDatas = [signedOrder.makerAssetData, signedOrder.takerAssetData]; - _.each(orderAssetDatas, assetData => this._addAssetDataToAbiDecoder(assetData)); - } - /** - * Removes an order from the orderWatcher - * @param orderHash The orderHash of the order you wish to stop watching. - */ - public removeOrder(orderHash: string): void { - assert.doesConformToSchema('orderHash', orderHash, schemas.orderHashSchema); - const signedOrder = this._orderByOrderHash[orderHash]; - if (_.isUndefined(signedOrder)) { - return; // noop - } - this._dependentOrderHashesTracker.removeFromDependentOrderHashes(signedOrder); - delete this._orderByOrderHash[orderHash]; - this._expirationWatcher.removeOrder(orderHash); - delete this._orderStateByOrderHashCache[orderHash]; - } - /** - * Starts an orderWatcher subscription. The callback will be called every time a watched order's - * backing blockchain state has changed. This is a call-to-action for the caller to re-validate the order. - * @param callback Receives the orderHash of the order that should be re-validated, together - * with all the order-relevant blockchain state needed to re-validate the order. - */ - public subscribe(callback: OnOrderStateChangeCallback): void { - assert.isFunction('callback', callback); - if (!_.isUndefined(this._callbackIfExists)) { - throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); - } - this._callbackIfExists = callback; - this._eventWatcher.subscribe(this._onEventWatcherCallbackAsync.bind(this)); - this._expirationWatcher.subscribe(this._onOrderExpired.bind(this)); - this._cleanupJobIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - this._cleanupAsync.bind(this), - this._cleanupJobInterval, - (err: Error) => { - this.unsubscribe(); - callback(err); - }, - ); - } - /** - * Ends an orderWatcher subscription. - */ - public unsubscribe(): void { - if (_.isUndefined(this._callbackIfExists) || _.isUndefined(this._cleanupJobIntervalIdIfExists)) { - throw new Error(OrderWatcherError.SubscriptionNotFound); - } - this._balanceAndProxyAllowanceLazyStore.deleteAll(); - this._orderFilledCancelledLazyStore.deleteAll(); - delete this._callbackIfExists; - this._eventWatcher.unsubscribe(); - this._expirationWatcher.unsubscribe(); - intervalUtils.clearAsyncExcludingInterval(this._cleanupJobIntervalIdIfExists); - } - /** - * Gets statistics of the OrderWatcher Instance. - */ - public getStats(): Stats { - return { - orderCount: _.size(this._orderByOrderHash), - }; - } - private async _cleanupAsync(): Promise<void> { - for (const orderHash of _.keys(this._orderByOrderHash)) { - this._cleanupOrderRelatedState(orderHash); - await this._emitRevalidateOrdersAsync([orderHash]); - } - } - private _addAssetDataToAbiDecoder(assetData: string): void { - const decodedAssetData = assetDataUtils.decodeAssetDataOrThrow(assetData); - if (assetDataUtils.isERC20AssetData(decodedAssetData)) { - this._collisionResistantAbiDecoder.addERC20Token(decodedAssetData.tokenAddress); - } else if (assetDataUtils.isERC721AssetData(decodedAssetData)) { - this._collisionResistantAbiDecoder.addERC721Token(decodedAssetData.tokenAddress); - } else if (assetDataUtils.isMultiAssetData(decodedAssetData)) { - _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement => - this._addAssetDataToAbiDecoder(nestedAssetDataElement), - ); - } - } - private _deleteLazyStoreBalance(assetData: string, userAddress: string): void { - const assetProxyId = assetDataUtils.decodeAssetProxyId(assetData); - switch (assetProxyId) { - case AssetProxyId.ERC20: - case AssetProxyId.ERC721: - this._balanceAndProxyAllowanceLazyStore.deleteBalance(assetData, userAddress); - break; - case AssetProxyId.MultiAsset: - const decodedAssetData = assetDataUtils.decodeMultiAssetData(assetData); - _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement => - this._deleteLazyStoreBalance(nestedAssetDataElement, userAddress), - ); - break; - default: - break; - } - } - private _deleteLazyStoreProxyAllowance(assetData: string, userAddress: string): void { - const assetProxyId = assetDataUtils.decodeAssetProxyId(assetData); - switch (assetProxyId) { - case AssetProxyId.ERC20: - case AssetProxyId.ERC721: - this._balanceAndProxyAllowanceLazyStore.deleteProxyAllowance(assetData, userAddress); - break; - case AssetProxyId.MultiAsset: - const decodedAssetData = assetDataUtils.decodeMultiAssetData(assetData); - _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement => - this._deleteLazyStoreProxyAllowance(nestedAssetDataElement, userAddress), - ); - break; - default: - break; - } - } - private _cleanupOrderRelatedState(orderHash: string): void { - const signedOrder = this._orderByOrderHash[orderHash]; - - this._orderFilledCancelledLazyStore.deleteFilledTakerAmount(orderHash); - this._orderFilledCancelledLazyStore.deleteIsCancelled(orderHash); - - this._deleteLazyStoreBalance(signedOrder.makerAssetData, signedOrder.makerAddress); - this._deleteLazyStoreProxyAllowance(signedOrder.makerAssetData, signedOrder.makerAddress); - this._deleteLazyStoreBalance(signedOrder.takerAssetData, signedOrder.takerAddress); - this._deleteLazyStoreProxyAllowance(signedOrder.takerAssetData, signedOrder.takerAddress); - - const zrxAssetData = this._orderFilledCancelledLazyStore.getZRXAssetData(); - if (!signedOrder.makerFee.isZero()) { - this._deleteLazyStoreBalance(zrxAssetData, signedOrder.makerAddress); - this._deleteLazyStoreProxyAllowance(zrxAssetData, signedOrder.makerAddress); - } - if (!signedOrder.takerFee.isZero()) { - this._deleteLazyStoreBalance(zrxAssetData, signedOrder.takerAddress); - this._deleteLazyStoreProxyAllowance(zrxAssetData, signedOrder.takerAddress); - } - } - private _onOrderExpired(orderHash: string): void { - const orderState: OrderState = { - isValid: false, - orderHash, - error: ExchangeContractErrs.OrderFillExpired, - }; - if (!_.isUndefined(this._orderByOrderHash[orderHash])) { - this.removeOrder(orderHash); - if (!_.isUndefined(this._callbackIfExists)) { - this._callbackIfExists(null, orderState); - } - } - } - private async _onEventWatcherCallbackAsync(err: Error | null, logIfExists?: LogEntryEvent): Promise<void> { - if (!_.isNull(err)) { - if (!_.isUndefined(this._callbackIfExists)) { - this._callbackIfExists(err); - } - return; - } - const maybeDecodedLog = this._collisionResistantAbiDecoder.tryToDecodeLogOrNoop<ContractEventArgs>( - // At this moment we are sure that no error occured and log is defined. - logIfExists as LogEntryEvent, - ); - const isLogDecoded = !_.isUndefined(((maybeDecodedLog as any) as LogWithDecodedArgs<ContractEventArgs>).event); - if (!isLogDecoded) { - return; // noop - } - const decodedLog = (maybeDecodedLog as any) as LogWithDecodedArgs<ContractEventArgs>; - const transactionHash = decodedLog.transactionHash; - switch (decodedLog.event) { - case ERC20TokenEvents.Approval: - case ERC721TokenEvents.Approval: { - // ERC20 and ERC721 Transfer events have the same name so we need to distinguish them by args - if (!_.isUndefined(decodedLog.args._value)) { - // ERC20 - // Invalidate cache - const args = decodedLog.args as ERC20TokenApprovalEventArgs; - const tokenAssetData = assetDataUtils.encodeERC20AssetData(decodedLog.address); - this._deleteLazyStoreProxyAllowance(tokenAssetData, args._owner); - // Revalidate orders - const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker( - args._owner, - tokenAssetData, - ); - await this._emitRevalidateOrdersAsync(orderHashes, transactionHash); - break; - } else { - // ERC721 - // Invalidate cache - const args = decodedLog.args as ERC721TokenApprovalEventArgs; - const tokenAssetData = assetDataUtils.encodeERC721AssetData(decodedLog.address, args._tokenId); - this._deleteLazyStoreProxyAllowance(tokenAssetData, args._owner); - // Revalidate orders - const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker( - args._owner, - tokenAssetData, - ); - await this._emitRevalidateOrdersAsync(orderHashes, transactionHash); - break; - } - } - case ERC20TokenEvents.Transfer: - case ERC721TokenEvents.Transfer: { - // ERC20 and ERC721 Transfer events have the same name so we need to distinguish them by args - if (!_.isUndefined(decodedLog.args._value)) { - // ERC20 - // Invalidate cache - const args = decodedLog.args as ERC20TokenTransferEventArgs; - const tokenAssetData = assetDataUtils.encodeERC20AssetData(decodedLog.address); - this._deleteLazyStoreBalance(tokenAssetData, args._from); - this._deleteLazyStoreBalance(tokenAssetData, args._to); - // Revalidate orders - const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker( - args._from, - tokenAssetData, - ); - await this._emitRevalidateOrdersAsync(orderHashes, transactionHash); - break; - } else { - // ERC721 - // Invalidate cache - const args = decodedLog.args as ERC721TokenTransferEventArgs; - const tokenAssetData = assetDataUtils.encodeERC721AssetData(decodedLog.address, args._tokenId); - this._deleteLazyStoreBalance(tokenAssetData, args._from); - this._deleteLazyStoreBalance(tokenAssetData, args._to); - // Revalidate orders - const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker( - args._from, - tokenAssetData, - ); - await this._emitRevalidateOrdersAsync(orderHashes, transactionHash); - break; - } - } - case ERC721TokenEvents.ApprovalForAll: { - // Invalidate cache - const args = decodedLog.args as ERC721TokenApprovalForAllEventArgs; - const tokenAddress = decodedLog.address; - this._balanceAndProxyAllowanceLazyStore.deleteAllERC721ProxyAllowance(tokenAddress, args._owner); - // Revalidate orders - const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByERC721ByMaker( - args._owner, - tokenAddress, - ); - await this._emitRevalidateOrdersAsync(orderHashes, transactionHash); - break; - } - case WETH9Events.Deposit: { - // Invalidate cache - const args = decodedLog.args as WETH9DepositEventArgs; - const tokenAssetData = assetDataUtils.encodeERC20AssetData(decodedLog.address); - this._deleteLazyStoreBalance(tokenAssetData, args._owner); - // Revalidate orders - const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker( - args._owner, - tokenAssetData, - ); - await this._emitRevalidateOrdersAsync(orderHashes, transactionHash); - break; - } - case WETH9Events.Withdrawal: { - // Invalidate cache - const args = decodedLog.args as WETH9WithdrawalEventArgs; - const tokenAssetData = assetDataUtils.encodeERC20AssetData(decodedLog.address); - this._deleteLazyStoreBalance(tokenAssetData, args._owner); - // Revalidate orders - const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker( - args._owner, - tokenAssetData, - ); - await this._emitRevalidateOrdersAsync(orderHashes, transactionHash); - break; - } - case ExchangeEvents.Fill: { - // Invalidate cache - const args = decodedLog.args as ExchangeFillEventArgs; - this._orderFilledCancelledLazyStore.deleteFilledTakerAmount(args.orderHash); - // Revalidate orders - const orderHash = args.orderHash; - const isOrderWatched = !_.isUndefined(this._orderByOrderHash[orderHash]); - if (isOrderWatched) { - await this._emitRevalidateOrdersAsync([orderHash], transactionHash); - } - break; - } - case ExchangeEvents.Cancel: { - // Invalidate cache - const args = decodedLog.args as ExchangeCancelEventArgs; - this._orderFilledCancelledLazyStore.deleteIsCancelled(args.orderHash); - // Revalidate orders - const orderHash = args.orderHash; - const isOrderWatched = !_.isUndefined(this._orderByOrderHash[orderHash]); - if (isOrderWatched) { - await this._emitRevalidateOrdersAsync([orderHash], transactionHash); - } - break; - } - case ExchangeEvents.CancelUpTo: { - // TODO(logvinov): Do it smarter and actually look at the salt and order epoch - // Invalidate cache - const args = decodedLog.args as ExchangeCancelUpToEventArgs; - this._orderFilledCancelledLazyStore.deleteAllIsCancelled(); - // Revalidate orders - const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByMaker(args.makerAddress); - await this._emitRevalidateOrdersAsync(orderHashes, transactionHash); - break; - } - - default: - throw errorUtils.spawnSwitchErr('decodedLog.event', decodedLog.event); - } - } - private async _emitRevalidateOrdersAsync(orderHashes: string[], transactionHash?: string): Promise<void> { - for (const orderHash of orderHashes) { - const signedOrder = this._orderByOrderHash[orderHash]; - // Most of these calls will never reach the network because the data is fetched from stores - // and only updated when cache is invalidated - const orderState = await this._orderStateUtils.getOpenOrderStateAsync(signedOrder, transactionHash); - if (_.isUndefined(this._callbackIfExists)) { - break; // Unsubscribe was called - } - if (_.isEqual(orderState, this._orderStateByOrderHashCache[orderHash])) { - // Actual order state didn't change - continue; - } else { - this._orderStateByOrderHashCache[orderHash] = orderState; - } - this._callbackIfExists(null, orderState); - } - } -} diff --git a/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts deleted file mode 100644 index b75b07603..000000000 --- a/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts +++ /dev/null @@ -1,200 +0,0 @@ -import { ContractAddresses } from '@0x/contract-addresses'; -import { schemas } from '@0x/json-schemas'; -import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types'; -import { BigNumber, logUtils } from '@0x/utils'; -import { Provider } from 'ethereum-types'; -import * as http from 'http'; -import * as WebSocket from 'websocket'; - -import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types'; -import { assert } from '../utils/assert'; - -import { OrderWatcher } from './order_watcher'; - -const DEFAULT_HTTP_PORT = 8080; -const JSON_RPC_VERSION = '2.0'; - -// Wraps the OrderWatcher functionality in a WebSocket server. Motivations: -// 1) Users can watch orders via non-typescript programs. -// 2) Better encapsulation so that users can work -export class OrderWatcherWebSocketServer { - private readonly _orderWatcher: OrderWatcher; - private readonly _httpServer: http.Server; - private readonly _connectionStore: Set<WebSocket.connection>; - private readonly _wsServer: WebSocket.server; - private readonly _isVerbose: boolean; - /** - * Recover types lost when the payload is stringified. - */ - private static _parseSignedOrder(rawRequest: any): SignedOrder { - const bigNumberFields = [ - 'salt', - 'makerFee', - 'takerFee', - 'makerAssetAmount', - 'takerAssetAmount', - 'expirationTimeSeconds', - ]; - for (const field of bigNumberFields) { - rawRequest[field] = new BigNumber(rawRequest[field]); - } - return rawRequest; - } - - /** - * Instantiate a new WebSocket server which provides OrderWatcher functionality - * @param provider Web3 provider to use for JSON RPC calls. - * @param networkId NetworkId to watch orders on. - * @param contractAddresses Optional contract addresses. Defaults to known - * addresses based on networkId. - * @param orderWatcherConfig OrderWatcher configurations. isVerbose sets the verbosity for the WebSocket server aswell. - * @param isVerbose Whether to enable verbose logging. Defaults to true. - */ - constructor( - provider: Provider, - networkId: number, - contractAddresses?: ContractAddresses, - orderWatcherConfig?: Partial<OrderWatcherConfig>, - ) { - this._isVerbose = - orderWatcherConfig !== undefined && orderWatcherConfig.isVerbose !== undefined - ? orderWatcherConfig.isVerbose - : true; - this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, orderWatcherConfig); - this._connectionStore = new Set(); - this._httpServer = http.createServer(); - this._wsServer = new WebSocket.server({ - httpServer: this._httpServer, - // Avoid setting autoAcceptConnections to true as it defeats all - // standard cross-origin protection facilities built into the protocol - // and the browser. - // Source: https://www.npmjs.com/package/websocket#server-example - // Also ensures that a request event is emitted by - // the server whenever a new WebSocket request is made. - autoAcceptConnections: false, - }); - - this._wsServer.on('request', async (request: any) => { - // Designed for usage pattern where client and server are run on the same - // machine by the same user. As such, no security checks are in place. - const connection: WebSocket.connection = request.accept(null, request.origin); - this._log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`); - connection.on('message', this._onMessageCallbackAsync.bind(this, connection)); - connection.on('close', this._onCloseCallback.bind(this, connection)); - this._connectionStore.add(connection); - }); - } - - /** - * Activates the WebSocket server by subscribing to the OrderWatcher and - * starting the WebSocket's HTTP server - */ - public start(): void { - // Have the WebSocket server subscribe to the OrderWatcher to receive updates. - // These updates are then broadcast to clients in the _connectionStore. - this._orderWatcher.subscribe(this._broadcastCallback.bind(this)); - - const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT; - this._httpServer.listen(port, () => { - this._log(`${new Date()} [Server] Listening on port ${port}`); - }); - } - - /** - * Deactivates the WebSocket server by stopping the HTTP server from accepting - * new connections and unsubscribing from the OrderWatcher - */ - public stop(): void { - this._httpServer.close(); - this._orderWatcher.unsubscribe(); - } - - private _log(...args: any[]): void { - if (this._isVerbose) { - logUtils.log(...args); - } - } - - private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise<void> { - let response: WebSocketResponse; - let id: number | null = null; - try { - assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema); - const request: WebSocketRequest = JSON.parse(message.utf8Data); - id = request.id; - assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema); - assert.isString(request.jsonrpc, JSON_RPC_VERSION); - response = { - id, - jsonrpc: JSON_RPC_VERSION, - method: request.method, - result: await this._routeRequestAsync(request), - }; - } catch (err) { - response = { - id, - jsonrpc: JSON_RPC_VERSION, - method: null, - error: err.toString(), - }; - } - this._log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`); - connection.sendUTF(JSON.stringify(response)); - } - - private _onCloseCallback(connection: WebSocket.connection): void { - this._connectionStore.delete(connection); - this._log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`); - } - - private async _routeRequestAsync(request: WebSocketRequest): Promise<GetStatsResult | undefined> { - this._log(`${new Date()} [Server] Request received: ${request.method}`); - switch (request.method) { - case OrderWatcherMethod.AddOrder: { - const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder( - request.params.signedOrder, - ); - await this._orderWatcher.addOrderAsync(signedOrder); - break; - } - case OrderWatcherMethod.RemoveOrder: { - this._orderWatcher.removeOrder(request.params.orderHash || 'undefined'); - break; - } - case OrderWatcherMethod.GetStats: { - return this._orderWatcher.getStats(); - } - default: - // Should never reach here. Should be caught by JSON schema check. - throw new Error(`Unexpected default case hit for request.method`); - } - return undefined; - } - - /** - * Broadcasts OrderState changes to ALL connected clients. At the moment, - * we do not support clients subscribing to only a subset of orders. As such, - * Client B will be notified of changes to an order that Client A added. - */ - private _broadcastCallback(err: Error | null, orderState?: OrderStateValid | OrderStateInvalid | undefined): void { - const method = OrderWatcherMethod.Update; - const response = - err === null - ? { - jsonrpc: JSON_RPC_VERSION, - method, - result: orderState, - } - : { - jsonrpc: JSON_RPC_VERSION, - method, - error: { - code: -32000, - message: err.message, - }, - }; - this._connectionStore.forEach((connection: WebSocket.connection) => { - connection.sendUTF(JSON.stringify(response)); - }); - } -} |