aboutsummaryrefslogtreecommitdiffstats
path: root/packages/order-watcher/src/order_watcher
diff options
context:
space:
mode:
Diffstat (limited to 'packages/order-watcher/src/order_watcher')
-rw-r--r--packages/order-watcher/src/order_watcher/collision_resistant_abi_decoder.ts54
-rw-r--r--packages/order-watcher/src/order_watcher/dependent_order_hashes_tracker.ts243
-rw-r--r--packages/order-watcher/src/order_watcher/event_watcher.ts158
-rw-r--r--packages/order-watcher/src/order_watcher/expiration_watcher.ts89
-rw-r--r--packages/order-watcher/src/order_watcher/order_watcher.ts490
-rw-r--r--packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts200
6 files changed, 0 insertions, 1234 deletions
diff --git a/packages/order-watcher/src/order_watcher/collision_resistant_abi_decoder.ts b/packages/order-watcher/src/order_watcher/collision_resistant_abi_decoder.ts
deleted file mode 100644
index 2ea796947..000000000
--- a/packages/order-watcher/src/order_watcher/collision_resistant_abi_decoder.ts
+++ /dev/null
@@ -1,54 +0,0 @@
-import { AbiDecoder } from '@0x/utils';
-import { ContractAbi, DecodedLogArgs, LogEntry, LogWithDecodedArgs, RawLog } from 'ethereum-types';
-
-const TOKEN_TYPE_COLLISION = `Token can't be marked as ERC20 and ERC721 at the same time`;
-
-/**
- * ERC20 and ERC721 have some events with different args but colliding signature.
- * For exmaple:
- * Transfer(_from address, _to address, _value uint256)
- * Transfer(_from address, _to address, _tokenId uint256)
- * Both have the signature:
- * Transfer(address,address,uint256)
- *
- * In order to correctly decode those events we need to know the token type by address in advance.
- * You can pass it by calling `this.addERC20Token(address)` or `this.addERC721Token(address)`
- */
-export class CollisionResistanceAbiDecoder {
- private readonly _erc20AbiDecoder: AbiDecoder;
- private readonly _erc721AbiDecoder: AbiDecoder;
- private readonly _restAbiDecoder: AbiDecoder;
- private readonly _knownERC20Tokens = new Set();
- private readonly _knownERC721Tokens = new Set();
- constructor(erc20Abi: ContractAbi, erc721Abi: ContractAbi, abis: ContractAbi[]) {
- this._erc20AbiDecoder = new AbiDecoder([erc20Abi]);
- this._erc721AbiDecoder = new AbiDecoder([erc721Abi]);
- this._restAbiDecoder = new AbiDecoder(abis);
- }
- public tryToDecodeLogOrNoop<ArgsType extends DecodedLogArgs>(log: LogEntry): LogWithDecodedArgs<ArgsType> | RawLog {
- if (this._knownERC20Tokens.has(log.address)) {
- const maybeDecodedERC20Log = this._erc20AbiDecoder.tryToDecodeLogOrNoop(log);
- return maybeDecodedERC20Log;
- } else if (this._knownERC721Tokens.has(log.address)) {
- const maybeDecodedERC721Log = this._erc721AbiDecoder.tryToDecodeLogOrNoop(log);
- return maybeDecodedERC721Log;
- } else {
- const maybeDecodedLog = this._restAbiDecoder.tryToDecodeLogOrNoop(log);
- return maybeDecodedLog;
- }
- }
- // Hints the ABI decoder that a particular token address is ERC20 and events from it should be decoded as ERC20 events
- public addERC20Token(address: string): void {
- if (this._knownERC721Tokens.has(address)) {
- throw new Error(TOKEN_TYPE_COLLISION);
- }
- this._knownERC20Tokens.add(address);
- }
- // Hints the ABI decoder that a particular token address is ERC721 and events from it should be decoded as ERC721 events
- public addERC721Token(address: string): void {
- if (this._knownERC20Tokens.has(address)) {
- throw new Error(TOKEN_TYPE_COLLISION);
- }
- this._knownERC721Tokens.add(address);
- }
-}
diff --git a/packages/order-watcher/src/order_watcher/dependent_order_hashes_tracker.ts b/packages/order-watcher/src/order_watcher/dependent_order_hashes_tracker.ts
deleted file mode 100644
index d1085014c..000000000
--- a/packages/order-watcher/src/order_watcher/dependent_order_hashes_tracker.ts
+++ /dev/null
@@ -1,243 +0,0 @@
-import { assetDataUtils, orderHashUtils } from '@0x/order-utils';
-import { AssetProxyId, SignedOrder } from '@0x/types';
-import { BigNumber } from '@0x/utils';
-import * as _ from 'lodash';
-
-export interface OrderHashesByMakerAddress {
- [makerAddress: string]: Set<string>;
-}
-
-export interface OrderHashesByERC20ByMakerAddress {
- [makerAddress: string]: {
- [erc20TokenAddress: string]: Set<string>;
- };
-}
-
-export interface OrderHashesByERC721AddressByTokenIdByMakerAddress {
- [makerAddress: string]: {
- [erc721TokenAddress: string]: {
- // Ideally erc721TokenId should be a BigNumber, but it's not a valid index type so we just convert it to a string before using it as an index
- [erc721TokenId: string]: Set<string>;
- };
- };
-}
-
-/**
- */
-export class DependentOrderHashesTracker {
- private readonly _zrxTokenAddress: string;
- // `_orderHashesByMakerAddress` is redundant and could be generated from
- // `_orderHashesByERC20ByMakerAddress` and `_orderHashesByERC721AddressByTokenIdByMakerAddress`
- // on the fly by merging all the entries together but it's more complex and computationally heavy.
- // We might change that in future if we're move memory-constrained.
- private readonly _orderHashesByMakerAddress: OrderHashesByMakerAddress = {};
- private readonly _orderHashesByERC20ByMakerAddress: OrderHashesByERC20ByMakerAddress = {};
- private readonly _orderHashesByERC721AddressByTokenIdByMakerAddress: OrderHashesByERC721AddressByTokenIdByMakerAddress = {};
- constructor(zrxTokenAddress: string) {
- this._zrxTokenAddress = zrxTokenAddress;
- }
- public getDependentOrderHashesByERC721ByMaker(makerAddress: string, tokenAddress: string): string[] {
- const orderHashSets = _.values(
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress][tokenAddress],
- );
- const orderHashList = _.reduce(
- orderHashSets,
- (accumulator, orderHashSet) => [...accumulator, ...orderHashSet],
- [] as string[],
- );
- const uniqueOrderHashList = _.uniq(orderHashList);
- return uniqueOrderHashList;
- }
- public getDependentOrderHashesByMaker(makerAddress: string): string[] {
- const dependentOrderHashes = Array.from(this._orderHashesByMakerAddress[makerAddress] || {});
- return dependentOrderHashes;
- }
- public getDependentOrderHashesByAssetDataByMaker(makerAddress: string, assetData: string): string[] {
- const decodedAssetData = assetDataUtils.decodeAssetDataOrThrow(assetData);
- const dependentOrderHashes =
- decodedAssetData.assetProxyId === AssetProxyId.ERC20
- ? this._getDependentOrderHashesByERC20AssetData(makerAddress, assetData)
- : this._getDependentOrderHashesByERC721AssetData(makerAddress, assetData);
- return dependentOrderHashes;
- }
- public addToDependentOrderHashes(signedOrder: SignedOrder): void {
- this._addAssetDataToDependentOrderHashes(signedOrder, signedOrder.makerAssetData);
- this._addToERC20DependentOrderHashes(signedOrder, this._zrxTokenAddress);
- this._addToMakerDependentOrderHashes(signedOrder);
- }
- public removeFromDependentOrderHashes(signedOrder: SignedOrder): void {
- this._removeAssetDataFromDependentOrderHashes(signedOrder, signedOrder.makerAssetData);
- // If makerToken === ZRX then we already removed it and we don't need to remove it again.
- const decodedMakerAssetData = assetDataUtils.decodeAssetDataOrThrow(signedOrder.makerAssetData);
- if (
- assetDataUtils.isERC20AssetData(decodedMakerAssetData) &&
- decodedMakerAssetData.tokenAddress !== this._zrxTokenAddress
- ) {
- this._removeFromERC20DependentOrderhashes(signedOrder, this._zrxTokenAddress);
- }
- this._removeFromMakerDependentOrderhashes(signedOrder);
- }
- private _getDependentOrderHashesByERC20AssetData(makerAddress: string, erc20AssetData: string): string[] {
- const tokenAddress = assetDataUtils.decodeERC20AssetData(erc20AssetData).tokenAddress;
- let dependentOrderHashes: string[] = [];
- if (
- !_.isUndefined(this._orderHashesByERC20ByMakerAddress[makerAddress]) &&
- !_.isUndefined(this._orderHashesByERC20ByMakerAddress[makerAddress][tokenAddress])
- ) {
- dependentOrderHashes = Array.from(this._orderHashesByERC20ByMakerAddress[makerAddress][tokenAddress]);
- }
- return dependentOrderHashes;
- }
- private _getDependentOrderHashesByERC721AssetData(makerAddress: string, erc721AssetData: string): string[] {
- const tokenAddress = assetDataUtils.decodeERC721AssetData(erc721AssetData).tokenAddress;
- const tokenId = assetDataUtils.decodeERC721AssetData(erc721AssetData).tokenId;
- let dependentOrderHashes: string[] = [];
- if (
- !_.isUndefined(this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress]) &&
- !_.isUndefined(this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress][tokenAddress]) &&
- !_.isUndefined(
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress][tokenAddress][tokenId.toString()],
- )
- ) {
- dependentOrderHashes = Array.from(
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[makerAddress][tokenAddress][tokenId.toString()],
- );
- }
- return dependentOrderHashes;
- }
- private _addToERC20DependentOrderHashes(signedOrder: SignedOrder, erc20TokenAddress: string): void {
- const orderHash = orderHashUtils.getOrderHashHex(signedOrder);
- if (_.isUndefined(this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress])) {
- this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress] = {};
- }
- if (_.isUndefined(this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress])) {
- this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress] = new Set();
- }
- this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress].add(orderHash);
- }
- private _addToERC721DependentOrderHashes(
- signedOrder: SignedOrder,
- erc721TokenAddress: string,
- tokenId: BigNumber,
- ): void {
- const orderHash = orderHashUtils.getOrderHashHex(signedOrder);
- if (_.isUndefined(this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress])) {
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress] = {};
- }
-
- if (
- _.isUndefined(
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress],
- )
- ) {
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress] = {};
- }
-
- if (
- _.isUndefined(
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][
- tokenId.toString()
- ],
- )
- ) {
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][
- tokenId.toString()
- ] = new Set();
- }
-
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][
- tokenId.toString()
- ].add(orderHash);
- }
- private _addAssetDataToDependentOrderHashes(signedOrder: SignedOrder, assetData: string): void {
- const decodedAssetData = assetDataUtils.decodeAssetDataOrThrow(assetData);
- if (assetDataUtils.isERC20AssetData(decodedAssetData)) {
- this._addToERC20DependentOrderHashes(signedOrder, decodedAssetData.tokenAddress);
- } else if (assetDataUtils.isERC721AssetData(decodedAssetData)) {
- this._addToERC721DependentOrderHashes(signedOrder, decodedAssetData.tokenAddress, decodedAssetData.tokenId);
- } else if (assetDataUtils.isMultiAssetData(decodedAssetData)) {
- _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement =>
- this._addAssetDataToDependentOrderHashes(signedOrder, nestedAssetDataElement),
- );
- }
- }
- private _addToMakerDependentOrderHashes(signedOrder: SignedOrder): void {
- const orderHash = orderHashUtils.getOrderHashHex(signedOrder);
- if (_.isUndefined(this._orderHashesByMakerAddress[signedOrder.makerAddress])) {
- this._orderHashesByMakerAddress[signedOrder.makerAddress] = new Set();
- }
- this._orderHashesByMakerAddress[signedOrder.makerAddress].add(orderHash);
- }
- private _removeFromERC20DependentOrderhashes(signedOrder: SignedOrder, erc20TokenAddress: string): void {
- const orderHash = orderHashUtils.getOrderHashHex(signedOrder);
- this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress].delete(orderHash);
-
- if (_.isEmpty(this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress])) {
- delete this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress][erc20TokenAddress];
- }
-
- if (_.isEmpty(this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress])) {
- delete this._orderHashesByERC20ByMakerAddress[signedOrder.makerAddress];
- }
- }
- private _removeFromERC721DependentOrderhashes(
- signedOrder: SignedOrder,
- erc721TokenAddress: string,
- tokenId: BigNumber,
- ): void {
- const orderHash = orderHashUtils.getOrderHashHex(signedOrder);
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][
- tokenId.toString()
- ].delete(orderHash);
-
- if (
- _.isEmpty(
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress][
- tokenId.toString()
- ],
- )
- ) {
- delete this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][
- erc721TokenAddress
- ][tokenId.toString()];
- }
-
- if (
- _.isEmpty(
- this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][erc721TokenAddress],
- )
- ) {
- delete this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress][
- erc721TokenAddress
- ];
- }
-
- if (_.isEmpty(this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress])) {
- delete this._orderHashesByERC721AddressByTokenIdByMakerAddress[signedOrder.makerAddress];
- }
- }
- private _removeFromMakerDependentOrderhashes(signedOrder: SignedOrder): void {
- const orderHash = orderHashUtils.getOrderHashHex(signedOrder);
- this._orderHashesByMakerAddress[signedOrder.makerAddress].delete(orderHash);
-
- if (_.isEmpty(this._orderHashesByMakerAddress[signedOrder.makerAddress])) {
- delete this._orderHashesByMakerAddress[signedOrder.makerAddress];
- }
- }
- private _removeAssetDataFromDependentOrderHashes(signedOrder: SignedOrder, assetData: string): void {
- const decodedAssetData = assetDataUtils.decodeAssetDataOrThrow(assetData);
- if (assetDataUtils.isERC20AssetData(decodedAssetData)) {
- this._removeFromERC20DependentOrderhashes(signedOrder, decodedAssetData.tokenAddress);
- } else if (assetDataUtils.isERC721AssetData(decodedAssetData)) {
- this._removeFromERC721DependentOrderhashes(
- signedOrder,
- decodedAssetData.tokenAddress,
- decodedAssetData.tokenId,
- );
- } else if (assetDataUtils.isMultiAssetData(decodedAssetData)) {
- _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement =>
- this._removeAssetDataFromDependentOrderHashes(signedOrder, nestedAssetDataElement),
- );
- }
- }
-}
diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts
deleted file mode 100644
index 3149d858b..000000000
--- a/packages/order-watcher/src/order_watcher/event_watcher.ts
+++ /dev/null
@@ -1,158 +0,0 @@
-import { intervalUtils, logUtils } from '@0x/utils';
-import { marshaller, Web3Wrapper } from '@0x/web3-wrapper';
-import { BlockParamLiteral, FilterObject, LogEntry, Provider, RawLogEntry } from 'ethereum-types';
-import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
-import * as _ from 'lodash';
-
-import { EventWatcherCallback, OrderWatcherError } from '../types';
-import { assert } from '../utils/assert';
-
-const DEFAULT_EVENT_POLLING_INTERVAL_MS = 200;
-
-enum LogEventState {
- Removed,
- Added,
-}
-
-/**
- * The EventWatcher watches for blockchain events at the specified block confirmation
- * depth.
- */
-export class EventWatcher {
- private readonly _web3Wrapper: Web3Wrapper;
- private readonly _isVerbose: boolean;
- private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined;
- private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer;
- private _onLogAddedSubscriptionToken: string | undefined;
- private _onLogRemovedSubscriptionToken: string | undefined;
- private readonly _pollingIntervalMs: number;
- constructor(
- provider: Provider,
- pollingIntervalIfExistsMs: undefined | number,
- stateLayer: BlockParamLiteral,
- isVerbose: boolean,
- ) {
- this._isVerbose = isVerbose;
- this._web3Wrapper = new Web3Wrapper(provider);
- this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs)
- ? DEFAULT_EVENT_POLLING_INTERVAL_MS
- : pollingIntervalIfExistsMs;
- this._blockAndLogStreamerIfExists = undefined;
- this._blockAndLogStreamIntervalIfExists = undefined;
- this._onLogAddedSubscriptionToken = undefined;
- this._onLogRemovedSubscriptionToken = undefined;
- }
- public subscribe(callback: EventWatcherCallback): void {
- assert.isFunction('callback', callback);
- if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
- throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
- }
- this._startBlockAndLogStream(callback);
- }
- public unsubscribe(): void {
- if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
- throw new Error(OrderWatcherError.SubscriptionNotFound);
- }
- this._stopBlockAndLogStream();
- }
- private _startBlockAndLogStream(callback: EventWatcherCallback): void {
- if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
- throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
- }
- this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
- this._blockstreamGetBlockOrNullAsync.bind(this),
- this._blockstreamGetLogsAsync.bind(this),
- this._onBlockAndLogStreamerError.bind(this),
- );
- const catchAllLogFilter = {};
- this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter);
- this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval(
- this._reconcileBlockAsync.bind(this),
- this._pollingIntervalMs,
- this._onBlockAndLogStreamerError.bind(this),
- );
- let isRemoved = false;
- this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
- this._onLogStateChangedAsync.bind(this, callback, isRemoved),
- );
- isRemoved = true;
- this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved(
- this._onLogStateChangedAsync.bind(this, callback, isRemoved),
- );
- }
- // This method only exists in order to comply with the expected interface of Blockstream's constructor
- private async _blockstreamGetBlockOrNullAsync(hash: string): Promise<Block | null> {
- const shouldIncludeTransactionData = false;
- const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
- method: 'eth_getBlockByHash',
- params: [hash, shouldIncludeTransactionData],
- });
- return blockOrNull;
- }
- // This method only exists in order to comply with the expected interface of Blockstream's constructor
- private async _blockstreamGetLatestBlockOrNullAsync(): Promise<Block | null> {
- const shouldIncludeTransactionData = false;
- const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
- method: 'eth_getBlockByNumber',
- params: [BlockParamLiteral.Latest, shouldIncludeTransactionData],
- });
- return blockOrNull;
- }
- // This method only exists in order to comply with the expected interface of Blockstream's constructor
- private async _blockstreamGetLogsAsync(filterOptions: FilterObject): Promise<RawLogEntry[]> {
- const logs = await this._web3Wrapper.sendRawPayloadAsync<RawLogEntry[]>({
- method: 'eth_getLogs',
- params: [filterOptions],
- });
- return logs as RawLogEntry[];
- }
- private _stopBlockAndLogStream(): void {
- if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
- throw new Error(OrderWatcherError.SubscriptionNotFound);
- }
- this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
- this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
- intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer);
- delete this._blockAndLogStreamerIfExists;
- delete this._blockAndLogStreamIntervalIfExists;
- }
- private async _onLogStateChangedAsync(
- callback: EventWatcherCallback,
- isRemoved: boolean,
- rawLog: RawLogEntry,
- ): Promise<void> {
- const log: LogEntry = marshaller.unmarshalLog(rawLog);
- await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback);
- }
- private async _reconcileBlockAsync(): Promise<void> {
- const latestBlockOrNull = await this._blockstreamGetLatestBlockOrNullAsync();
- if (_.isNull(latestBlockOrNull)) {
- return; // noop
- }
- // We need to coerce to Block type cause Web3.Block includes types for mempool blocks
- if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
- // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
- await this._blockAndLogStreamerIfExists.reconcileNewBlock(latestBlockOrNull);
- }
- }
- private async _emitDifferencesAsync(
- log: LogEntry,
- logEventState: LogEventState,
- callback: EventWatcherCallback,
- ): Promise<void> {
- const logEvent = {
- removed: logEventState === LogEventState.Removed,
- ...log,
- };
- if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
- callback(null, logEvent);
- }
- }
- private _onBlockAndLogStreamerError(err: Error): void {
- // Since Blockstream errors are all recoverable, we simply log them if the verbose
- // config is passed in.
- if (this._isVerbose) {
- logUtils.warn(err);
- }
- }
-}
diff --git a/packages/order-watcher/src/order_watcher/expiration_watcher.ts b/packages/order-watcher/src/order_watcher/expiration_watcher.ts
deleted file mode 100644
index 82590efde..000000000
--- a/packages/order-watcher/src/order_watcher/expiration_watcher.ts
+++ /dev/null
@@ -1,89 +0,0 @@
-import { BigNumber, intervalUtils } from '@0x/utils';
-import { RBTree } from 'bintrees';
-import * as _ from 'lodash';
-
-import { OrderWatcherError } from '../types';
-import { utils } from '../utils/utils';
-
-const DEFAULT_EXPIRATION_MARGIN_MS = 0;
-const DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS = 50;
-
-/**
- * This class includes the functionality to detect expired orders.
- * It stores them in a min heap by expiration time and checks for expired ones every `orderExpirationCheckingIntervalMs`
- */
-export class ExpirationWatcher {
- private readonly _orderHashByExpirationRBTree: RBTree<string>;
- private readonly _expiration: { [orderHash: string]: BigNumber } = {};
- private readonly _orderExpirationCheckingIntervalMs: number;
- private readonly _expirationMarginMs: number;
- private _orderExpirationCheckingIntervalIdIfExists?: NodeJS.Timer;
- constructor(expirationMarginIfExistsMs?: number, orderExpirationCheckingIntervalIfExistsMs?: number) {
- this._orderExpirationCheckingIntervalMs =
- orderExpirationCheckingIntervalIfExistsMs || DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS;
- this._expirationMarginMs = expirationMarginIfExistsMs || DEFAULT_EXPIRATION_MARGIN_MS;
- this._orderExpirationCheckingIntervalMs =
- expirationMarginIfExistsMs || DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS;
- const comparator = (lhsOrderHash: string, rhsOrderHash: string) => {
- const lhsExpiration = this._expiration[lhsOrderHash].toNumber();
- const rhsExpiration = this._expiration[rhsOrderHash].toNumber();
- if (lhsExpiration !== rhsExpiration) {
- return lhsExpiration - rhsExpiration;
- } else {
- // HACK: If two orders have identical expirations, the order in which they are emitted by the
- // ExpirationWatcher does not matter, so we emit them in alphabetical order by orderHash.
- return lhsOrderHash.localeCompare(rhsOrderHash);
- }
- };
- this._orderHashByExpirationRBTree = new RBTree(comparator);
- }
- public subscribe(callback: (orderHash: string) => void): void {
- if (!_.isUndefined(this._orderExpirationCheckingIntervalIdIfExists)) {
- throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
- }
- this._orderExpirationCheckingIntervalIdIfExists = intervalUtils.setInterval(
- this._pruneExpiredOrders.bind(this, callback),
- this._orderExpirationCheckingIntervalMs,
- _.noop.bind(_), // _pruneExpiredOrders never throws
- );
- }
- public unsubscribe(): void {
- if (_.isUndefined(this._orderExpirationCheckingIntervalIdIfExists)) {
- throw new Error(OrderWatcherError.SubscriptionNotFound);
- }
- intervalUtils.clearInterval(this._orderExpirationCheckingIntervalIdIfExists);
- delete this._orderExpirationCheckingIntervalIdIfExists;
- }
- public addOrder(orderHash: string, expirationUnixTimestampMs: BigNumber): void {
- this._expiration[orderHash] = expirationUnixTimestampMs;
- this._orderHashByExpirationRBTree.insert(orderHash);
- }
- public removeOrder(orderHash: string): void {
- if (_.isUndefined(this._expiration[orderHash])) {
- return; // noop since order already removed
- }
- this._orderHashByExpirationRBTree.remove(orderHash);
- delete this._expiration[orderHash];
- }
- private _pruneExpiredOrders(callback: (orderHash: string) => void): void {
- const currentUnixTimestampMs = utils.getCurrentUnixTimestampMs();
- while (true) {
- const hasNoTrackedOrders = this._orderHashByExpirationRBTree.size === 0;
- if (hasNoTrackedOrders) {
- break;
- }
- const nextOrderHashToExpire = this._orderHashByExpirationRBTree.min();
- const hasNoExpiredOrders = this._expiration[nextOrderHashToExpire].isGreaterThan(
- currentUnixTimestampMs.plus(this._expirationMarginMs),
- );
- const isSubscriptionActive = _.isUndefined(this._orderExpirationCheckingIntervalIdIfExists);
- if (hasNoExpiredOrders || isSubscriptionActive) {
- break;
- }
- const orderHash = this._orderHashByExpirationRBTree.min();
- this._orderHashByExpirationRBTree.remove(orderHash);
- delete this._expiration[orderHash];
- callback(orderHash);
- }
- }
-}
diff --git a/packages/order-watcher/src/order_watcher/order_watcher.ts b/packages/order-watcher/src/order_watcher/order_watcher.ts
deleted file mode 100644
index a06fd0cfe..000000000
--- a/packages/order-watcher/src/order_watcher/order_watcher.ts
+++ /dev/null
@@ -1,490 +0,0 @@
-// tslint:disable:no-unnecessary-type-assertion
-import { ContractAddresses } from '@0x/contract-addresses';
-import * as artifacts from '@0x/contract-artifacts';
-import {
- AssetBalanceAndProxyAllowanceFetcher,
- ContractWrappers,
- ERC20TokenApprovalEventArgs,
- ERC20TokenEventArgs,
- ERC20TokenEvents,
- ERC20TokenTransferEventArgs,
- ERC721TokenApprovalEventArgs,
- ERC721TokenApprovalForAllEventArgs,
- ERC721TokenEventArgs,
- ERC721TokenEvents,
- ERC721TokenTransferEventArgs,
- ExchangeCancelEventArgs,
- ExchangeCancelUpToEventArgs,
- ExchangeEventArgs,
- ExchangeEvents,
- ExchangeFillEventArgs,
- OrderFilledCancelledFetcher,
- WETH9DepositEventArgs,
- WETH9EventArgs,
- WETH9Events,
- WETH9WithdrawalEventArgs,
-} from '@0x/contract-wrappers';
-import { schemas } from '@0x/json-schemas';
-import {
- assetDataUtils,
- BalanceAndProxyAllowanceLazyStore,
- OrderFilledCancelledLazyStore,
- orderHashUtils,
- OrderStateUtils,
-} from '@0x/order-utils';
-import { AssetProxyId, ExchangeContractErrs, OrderState, SignedOrder, Stats } from '@0x/types';
-import { errorUtils, intervalUtils } from '@0x/utils';
-import { BlockParamLiteral, LogEntryEvent, LogWithDecodedArgs, Provider } from 'ethereum-types';
-import * as _ from 'lodash';
-
-import { orderWatcherPartialConfigSchema } from '../schemas/order_watcher_partial_config_schema';
-import { OnOrderStateChangeCallback, OrderWatcherConfig, OrderWatcherError } from '../types';
-import { assert } from '../utils/assert';
-
-import { CollisionResistanceAbiDecoder } from './collision_resistant_abi_decoder';
-import { DependentOrderHashesTracker } from './dependent_order_hashes_tracker';
-import { EventWatcher } from './event_watcher';
-import { ExpirationWatcher } from './expiration_watcher';
-
-const MILLISECONDS_IN_A_SECOND = 1000;
-
-type ContractEventArgs = WETH9EventArgs | ExchangeEventArgs | ERC20TokenEventArgs | ERC721TokenEventArgs;
-
-interface OrderByOrderHash {
- [orderHash: string]: SignedOrder;
-}
-
-interface OrderStateByOrderHash {
- [orderHash: string]: OrderState;
-}
-
-const DEFAULT_ORDER_WATCHER_CONFIG: OrderWatcherConfig = {
- orderExpirationCheckingIntervalMs: 50,
- eventPollingIntervalMs: 200,
- expirationMarginMs: 0,
- // tslint:disable-next-line:custom-no-magic-numbers
- cleanupJobIntervalMs: 1000 * 60 * 60, // 1h
- isVerbose: true,
-};
-const STATE_LAYER = BlockParamLiteral.Latest;
-
-/**
- * This class includes all the functionality related to watching a set of orders
- * for potential changes in order validity/fillability. The orderWatcher notifies
- * the subscriber of these changes so that a final decision can be made on whether
- * the order should be deemed invalid.
- */
-export class OrderWatcher {
- private readonly _dependentOrderHashesTracker: DependentOrderHashesTracker;
- private readonly _orderStateByOrderHashCache: OrderStateByOrderHash = {};
- private readonly _orderByOrderHash: OrderByOrderHash = {};
- private readonly _eventWatcher: EventWatcher;
- private readonly _provider: Provider;
- private readonly _collisionResistantAbiDecoder: CollisionResistanceAbiDecoder;
- private readonly _expirationWatcher: ExpirationWatcher;
- private readonly _orderStateUtils: OrderStateUtils;
- private readonly _orderFilledCancelledLazyStore: OrderFilledCancelledLazyStore;
- private readonly _balanceAndProxyAllowanceLazyStore: BalanceAndProxyAllowanceLazyStore;
- private readonly _cleanupJobInterval: number;
- private _cleanupJobIntervalIdIfExists?: NodeJS.Timer;
- private _callbackIfExists?: OnOrderStateChangeCallback;
- /**
- * Instantiate a new OrderWatcher
- * @param provider Web3 provider to use for JSON RPC calls
- * @param networkId NetworkId to watch orders on
- * @param contractAddresses Optional contract addresses. Defaults to known
- * addresses based on networkId.
- * @param partialConfig Optional configurations
- */
- constructor(
- provider: Provider,
- networkId: number,
- contractAddresses?: ContractAddresses,
- partialConfig: Partial<OrderWatcherConfig> = DEFAULT_ORDER_WATCHER_CONFIG,
- ) {
- assert.isWeb3Provider('provider', provider);
- assert.isNumber('networkId', networkId);
- assert.doesConformToSchema('partialConfig', partialConfig, orderWatcherPartialConfigSchema);
- const config = {
- ...DEFAULT_ORDER_WATCHER_CONFIG,
- ...partialConfig,
- };
-
- this._provider = provider;
- this._collisionResistantAbiDecoder = new CollisionResistanceAbiDecoder(
- artifacts.ERC20Token.compilerOutput.abi,
- artifacts.ERC721Token.compilerOutput.abi,
- [artifacts.WETH9.compilerOutput.abi, artifacts.Exchange.compilerOutput.abi],
- );
- const contractWrappers = new ContractWrappers(provider, {
- networkId,
- // Note(albrow): We let the contract-wrappers package handle
- // default values for contractAddresses.
- contractAddresses,
- });
- this._eventWatcher = new EventWatcher(provider, config.eventPollingIntervalMs, STATE_LAYER, config.isVerbose);
- const balanceAndProxyAllowanceFetcher = new AssetBalanceAndProxyAllowanceFetcher(
- contractWrappers.erc20Token,
- contractWrappers.erc721Token,
- STATE_LAYER,
- );
- this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore(
- balanceAndProxyAllowanceFetcher,
- );
- const orderFilledCancelledFetcher = new OrderFilledCancelledFetcher(contractWrappers.exchange, STATE_LAYER);
- this._orderFilledCancelledLazyStore = new OrderFilledCancelledLazyStore(orderFilledCancelledFetcher);
- this._orderStateUtils = new OrderStateUtils(balanceAndProxyAllowanceFetcher, orderFilledCancelledFetcher);
- const expirationMarginIfExistsMs = _.isUndefined(config) ? undefined : config.expirationMarginMs;
- this._expirationWatcher = new ExpirationWatcher(
- expirationMarginIfExistsMs,
- config.orderExpirationCheckingIntervalMs,
- );
- this._cleanupJobInterval = config.cleanupJobIntervalMs;
- const zrxTokenAddress = assetDataUtils.decodeERC20AssetData(orderFilledCancelledFetcher.getZRXAssetData())
- .tokenAddress;
- this._dependentOrderHashesTracker = new DependentOrderHashesTracker(zrxTokenAddress);
- }
- /**
- * Add an order to the orderWatcher. Before the order is added, it's
- * signature is verified.
- * @param signedOrder The order you wish to start watching.
- */
- public async addOrderAsync(signedOrder: SignedOrder): Promise<void> {
- assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema);
- const orderHash = orderHashUtils.getOrderHashHex(signedOrder);
- await assert.isValidSignatureAsync(this._provider, orderHash, signedOrder.signature, signedOrder.makerAddress);
-
- const expirationUnixTimestampMs = signedOrder.expirationTimeSeconds.times(MILLISECONDS_IN_A_SECOND);
- this._expirationWatcher.addOrder(orderHash, expirationUnixTimestampMs);
-
- this._orderByOrderHash[orderHash] = signedOrder;
- this._dependentOrderHashesTracker.addToDependentOrderHashes(signedOrder);
-
- const orderAssetDatas = [signedOrder.makerAssetData, signedOrder.takerAssetData];
- _.each(orderAssetDatas, assetData => this._addAssetDataToAbiDecoder(assetData));
- }
- /**
- * Removes an order from the orderWatcher
- * @param orderHash The orderHash of the order you wish to stop watching.
- */
- public removeOrder(orderHash: string): void {
- assert.doesConformToSchema('orderHash', orderHash, schemas.orderHashSchema);
- const signedOrder = this._orderByOrderHash[orderHash];
- if (_.isUndefined(signedOrder)) {
- return; // noop
- }
- this._dependentOrderHashesTracker.removeFromDependentOrderHashes(signedOrder);
- delete this._orderByOrderHash[orderHash];
- this._expirationWatcher.removeOrder(orderHash);
- delete this._orderStateByOrderHashCache[orderHash];
- }
- /**
- * Starts an orderWatcher subscription. The callback will be called every time a watched order's
- * backing blockchain state has changed. This is a call-to-action for the caller to re-validate the order.
- * @param callback Receives the orderHash of the order that should be re-validated, together
- * with all the order-relevant blockchain state needed to re-validate the order.
- */
- public subscribe(callback: OnOrderStateChangeCallback): void {
- assert.isFunction('callback', callback);
- if (!_.isUndefined(this._callbackIfExists)) {
- throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
- }
- this._callbackIfExists = callback;
- this._eventWatcher.subscribe(this._onEventWatcherCallbackAsync.bind(this));
- this._expirationWatcher.subscribe(this._onOrderExpired.bind(this));
- this._cleanupJobIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
- this._cleanupAsync.bind(this),
- this._cleanupJobInterval,
- (err: Error) => {
- this.unsubscribe();
- callback(err);
- },
- );
- }
- /**
- * Ends an orderWatcher subscription.
- */
- public unsubscribe(): void {
- if (_.isUndefined(this._callbackIfExists) || _.isUndefined(this._cleanupJobIntervalIdIfExists)) {
- throw new Error(OrderWatcherError.SubscriptionNotFound);
- }
- this._balanceAndProxyAllowanceLazyStore.deleteAll();
- this._orderFilledCancelledLazyStore.deleteAll();
- delete this._callbackIfExists;
- this._eventWatcher.unsubscribe();
- this._expirationWatcher.unsubscribe();
- intervalUtils.clearAsyncExcludingInterval(this._cleanupJobIntervalIdIfExists);
- }
- /**
- * Gets statistics of the OrderWatcher Instance.
- */
- public getStats(): Stats {
- return {
- orderCount: _.size(this._orderByOrderHash),
- };
- }
- private async _cleanupAsync(): Promise<void> {
- for (const orderHash of _.keys(this._orderByOrderHash)) {
- this._cleanupOrderRelatedState(orderHash);
- await this._emitRevalidateOrdersAsync([orderHash]);
- }
- }
- private _addAssetDataToAbiDecoder(assetData: string): void {
- const decodedAssetData = assetDataUtils.decodeAssetDataOrThrow(assetData);
- if (assetDataUtils.isERC20AssetData(decodedAssetData)) {
- this._collisionResistantAbiDecoder.addERC20Token(decodedAssetData.tokenAddress);
- } else if (assetDataUtils.isERC721AssetData(decodedAssetData)) {
- this._collisionResistantAbiDecoder.addERC721Token(decodedAssetData.tokenAddress);
- } else if (assetDataUtils.isMultiAssetData(decodedAssetData)) {
- _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement =>
- this._addAssetDataToAbiDecoder(nestedAssetDataElement),
- );
- }
- }
- private _deleteLazyStoreBalance(assetData: string, userAddress: string): void {
- const assetProxyId = assetDataUtils.decodeAssetProxyId(assetData);
- switch (assetProxyId) {
- case AssetProxyId.ERC20:
- case AssetProxyId.ERC721:
- this._balanceAndProxyAllowanceLazyStore.deleteBalance(assetData, userAddress);
- break;
- case AssetProxyId.MultiAsset:
- const decodedAssetData = assetDataUtils.decodeMultiAssetData(assetData);
- _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement =>
- this._deleteLazyStoreBalance(nestedAssetDataElement, userAddress),
- );
- break;
- default:
- break;
- }
- }
- private _deleteLazyStoreProxyAllowance(assetData: string, userAddress: string): void {
- const assetProxyId = assetDataUtils.decodeAssetProxyId(assetData);
- switch (assetProxyId) {
- case AssetProxyId.ERC20:
- case AssetProxyId.ERC721:
- this._balanceAndProxyAllowanceLazyStore.deleteProxyAllowance(assetData, userAddress);
- break;
- case AssetProxyId.MultiAsset:
- const decodedAssetData = assetDataUtils.decodeMultiAssetData(assetData);
- _.each(decodedAssetData.nestedAssetData, nestedAssetDataElement =>
- this._deleteLazyStoreProxyAllowance(nestedAssetDataElement, userAddress),
- );
- break;
- default:
- break;
- }
- }
- private _cleanupOrderRelatedState(orderHash: string): void {
- const signedOrder = this._orderByOrderHash[orderHash];
-
- this._orderFilledCancelledLazyStore.deleteFilledTakerAmount(orderHash);
- this._orderFilledCancelledLazyStore.deleteIsCancelled(orderHash);
-
- this._deleteLazyStoreBalance(signedOrder.makerAssetData, signedOrder.makerAddress);
- this._deleteLazyStoreProxyAllowance(signedOrder.makerAssetData, signedOrder.makerAddress);
- this._deleteLazyStoreBalance(signedOrder.takerAssetData, signedOrder.takerAddress);
- this._deleteLazyStoreProxyAllowance(signedOrder.takerAssetData, signedOrder.takerAddress);
-
- const zrxAssetData = this._orderFilledCancelledLazyStore.getZRXAssetData();
- if (!signedOrder.makerFee.isZero()) {
- this._deleteLazyStoreBalance(zrxAssetData, signedOrder.makerAddress);
- this._deleteLazyStoreProxyAllowance(zrxAssetData, signedOrder.makerAddress);
- }
- if (!signedOrder.takerFee.isZero()) {
- this._deleteLazyStoreBalance(zrxAssetData, signedOrder.takerAddress);
- this._deleteLazyStoreProxyAllowance(zrxAssetData, signedOrder.takerAddress);
- }
- }
- private _onOrderExpired(orderHash: string): void {
- const orderState: OrderState = {
- isValid: false,
- orderHash,
- error: ExchangeContractErrs.OrderFillExpired,
- };
- if (!_.isUndefined(this._orderByOrderHash[orderHash])) {
- this.removeOrder(orderHash);
- if (!_.isUndefined(this._callbackIfExists)) {
- this._callbackIfExists(null, orderState);
- }
- }
- }
- private async _onEventWatcherCallbackAsync(err: Error | null, logIfExists?: LogEntryEvent): Promise<void> {
- if (!_.isNull(err)) {
- if (!_.isUndefined(this._callbackIfExists)) {
- this._callbackIfExists(err);
- }
- return;
- }
- const maybeDecodedLog = this._collisionResistantAbiDecoder.tryToDecodeLogOrNoop<ContractEventArgs>(
- // At this moment we are sure that no error occured and log is defined.
- logIfExists as LogEntryEvent,
- );
- const isLogDecoded = !_.isUndefined(((maybeDecodedLog as any) as LogWithDecodedArgs<ContractEventArgs>).event);
- if (!isLogDecoded) {
- return; // noop
- }
- const decodedLog = (maybeDecodedLog as any) as LogWithDecodedArgs<ContractEventArgs>;
- const transactionHash = decodedLog.transactionHash;
- switch (decodedLog.event) {
- case ERC20TokenEvents.Approval:
- case ERC721TokenEvents.Approval: {
- // ERC20 and ERC721 Transfer events have the same name so we need to distinguish them by args
- if (!_.isUndefined(decodedLog.args._value)) {
- // ERC20
- // Invalidate cache
- const args = decodedLog.args as ERC20TokenApprovalEventArgs;
- const tokenAssetData = assetDataUtils.encodeERC20AssetData(decodedLog.address);
- this._deleteLazyStoreProxyAllowance(tokenAssetData, args._owner);
- // Revalidate orders
- const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker(
- args._owner,
- tokenAssetData,
- );
- await this._emitRevalidateOrdersAsync(orderHashes, transactionHash);
- break;
- } else {
- // ERC721
- // Invalidate cache
- const args = decodedLog.args as ERC721TokenApprovalEventArgs;
- const tokenAssetData = assetDataUtils.encodeERC721AssetData(decodedLog.address, args._tokenId);
- this._deleteLazyStoreProxyAllowance(tokenAssetData, args._owner);
- // Revalidate orders
- const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker(
- args._owner,
- tokenAssetData,
- );
- await this._emitRevalidateOrdersAsync(orderHashes, transactionHash);
- break;
- }
- }
- case ERC20TokenEvents.Transfer:
- case ERC721TokenEvents.Transfer: {
- // ERC20 and ERC721 Transfer events have the same name so we need to distinguish them by args
- if (!_.isUndefined(decodedLog.args._value)) {
- // ERC20
- // Invalidate cache
- const args = decodedLog.args as ERC20TokenTransferEventArgs;
- const tokenAssetData = assetDataUtils.encodeERC20AssetData(decodedLog.address);
- this._deleteLazyStoreBalance(tokenAssetData, args._from);
- this._deleteLazyStoreBalance(tokenAssetData, args._to);
- // Revalidate orders
- const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker(
- args._from,
- tokenAssetData,
- );
- await this._emitRevalidateOrdersAsync(orderHashes, transactionHash);
- break;
- } else {
- // ERC721
- // Invalidate cache
- const args = decodedLog.args as ERC721TokenTransferEventArgs;
- const tokenAssetData = assetDataUtils.encodeERC721AssetData(decodedLog.address, args._tokenId);
- this._deleteLazyStoreBalance(tokenAssetData, args._from);
- this._deleteLazyStoreBalance(tokenAssetData, args._to);
- // Revalidate orders
- const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker(
- args._from,
- tokenAssetData,
- );
- await this._emitRevalidateOrdersAsync(orderHashes, transactionHash);
- break;
- }
- }
- case ERC721TokenEvents.ApprovalForAll: {
- // Invalidate cache
- const args = decodedLog.args as ERC721TokenApprovalForAllEventArgs;
- const tokenAddress = decodedLog.address;
- this._balanceAndProxyAllowanceLazyStore.deleteAllERC721ProxyAllowance(tokenAddress, args._owner);
- // Revalidate orders
- const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByERC721ByMaker(
- args._owner,
- tokenAddress,
- );
- await this._emitRevalidateOrdersAsync(orderHashes, transactionHash);
- break;
- }
- case WETH9Events.Deposit: {
- // Invalidate cache
- const args = decodedLog.args as WETH9DepositEventArgs;
- const tokenAssetData = assetDataUtils.encodeERC20AssetData(decodedLog.address);
- this._deleteLazyStoreBalance(tokenAssetData, args._owner);
- // Revalidate orders
- const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker(
- args._owner,
- tokenAssetData,
- );
- await this._emitRevalidateOrdersAsync(orderHashes, transactionHash);
- break;
- }
- case WETH9Events.Withdrawal: {
- // Invalidate cache
- const args = decodedLog.args as WETH9WithdrawalEventArgs;
- const tokenAssetData = assetDataUtils.encodeERC20AssetData(decodedLog.address);
- this._deleteLazyStoreBalance(tokenAssetData, args._owner);
- // Revalidate orders
- const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByAssetDataByMaker(
- args._owner,
- tokenAssetData,
- );
- await this._emitRevalidateOrdersAsync(orderHashes, transactionHash);
- break;
- }
- case ExchangeEvents.Fill: {
- // Invalidate cache
- const args = decodedLog.args as ExchangeFillEventArgs;
- this._orderFilledCancelledLazyStore.deleteFilledTakerAmount(args.orderHash);
- // Revalidate orders
- const orderHash = args.orderHash;
- const isOrderWatched = !_.isUndefined(this._orderByOrderHash[orderHash]);
- if (isOrderWatched) {
- await this._emitRevalidateOrdersAsync([orderHash], transactionHash);
- }
- break;
- }
- case ExchangeEvents.Cancel: {
- // Invalidate cache
- const args = decodedLog.args as ExchangeCancelEventArgs;
- this._orderFilledCancelledLazyStore.deleteIsCancelled(args.orderHash);
- // Revalidate orders
- const orderHash = args.orderHash;
- const isOrderWatched = !_.isUndefined(this._orderByOrderHash[orderHash]);
- if (isOrderWatched) {
- await this._emitRevalidateOrdersAsync([orderHash], transactionHash);
- }
- break;
- }
- case ExchangeEvents.CancelUpTo: {
- // TODO(logvinov): Do it smarter and actually look at the salt and order epoch
- // Invalidate cache
- const args = decodedLog.args as ExchangeCancelUpToEventArgs;
- this._orderFilledCancelledLazyStore.deleteAllIsCancelled();
- // Revalidate orders
- const orderHashes = this._dependentOrderHashesTracker.getDependentOrderHashesByMaker(args.makerAddress);
- await this._emitRevalidateOrdersAsync(orderHashes, transactionHash);
- break;
- }
-
- default:
- throw errorUtils.spawnSwitchErr('decodedLog.event', decodedLog.event);
- }
- }
- private async _emitRevalidateOrdersAsync(orderHashes: string[], transactionHash?: string): Promise<void> {
- for (const orderHash of orderHashes) {
- const signedOrder = this._orderByOrderHash[orderHash];
- // Most of these calls will never reach the network because the data is fetched from stores
- // and only updated when cache is invalidated
- const orderState = await this._orderStateUtils.getOpenOrderStateAsync(signedOrder, transactionHash);
- if (_.isUndefined(this._callbackIfExists)) {
- break; // Unsubscribe was called
- }
- if (_.isEqual(orderState, this._orderStateByOrderHashCache[orderHash])) {
- // Actual order state didn't change
- continue;
- } else {
- this._orderStateByOrderHashCache[orderHash] = orderState;
- }
- this._callbackIfExists(null, orderState);
- }
- }
-}
diff --git a/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts b/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts
deleted file mode 100644
index b75b07603..000000000
--- a/packages/order-watcher/src/order_watcher/order_watcher_web_socket_server.ts
+++ /dev/null
@@ -1,200 +0,0 @@
-import { ContractAddresses } from '@0x/contract-addresses';
-import { schemas } from '@0x/json-schemas';
-import { OrderStateInvalid, OrderStateValid, SignedOrder } from '@0x/types';
-import { BigNumber, logUtils } from '@0x/utils';
-import { Provider } from 'ethereum-types';
-import * as http from 'http';
-import * as WebSocket from 'websocket';
-
-import { GetStatsResult, OrderWatcherConfig, OrderWatcherMethod, WebSocketRequest, WebSocketResponse } from '../types';
-import { assert } from '../utils/assert';
-
-import { OrderWatcher } from './order_watcher';
-
-const DEFAULT_HTTP_PORT = 8080;
-const JSON_RPC_VERSION = '2.0';
-
-// Wraps the OrderWatcher functionality in a WebSocket server. Motivations:
-// 1) Users can watch orders via non-typescript programs.
-// 2) Better encapsulation so that users can work
-export class OrderWatcherWebSocketServer {
- private readonly _orderWatcher: OrderWatcher;
- private readonly _httpServer: http.Server;
- private readonly _connectionStore: Set<WebSocket.connection>;
- private readonly _wsServer: WebSocket.server;
- private readonly _isVerbose: boolean;
- /**
- * Recover types lost when the payload is stringified.
- */
- private static _parseSignedOrder(rawRequest: any): SignedOrder {
- const bigNumberFields = [
- 'salt',
- 'makerFee',
- 'takerFee',
- 'makerAssetAmount',
- 'takerAssetAmount',
- 'expirationTimeSeconds',
- ];
- for (const field of bigNumberFields) {
- rawRequest[field] = new BigNumber(rawRequest[field]);
- }
- return rawRequest;
- }
-
- /**
- * Instantiate a new WebSocket server which provides OrderWatcher functionality
- * @param provider Web3 provider to use for JSON RPC calls.
- * @param networkId NetworkId to watch orders on.
- * @param contractAddresses Optional contract addresses. Defaults to known
- * addresses based on networkId.
- * @param orderWatcherConfig OrderWatcher configurations. isVerbose sets the verbosity for the WebSocket server aswell.
- * @param isVerbose Whether to enable verbose logging. Defaults to true.
- */
- constructor(
- provider: Provider,
- networkId: number,
- contractAddresses?: ContractAddresses,
- orderWatcherConfig?: Partial<OrderWatcherConfig>,
- ) {
- this._isVerbose =
- orderWatcherConfig !== undefined && orderWatcherConfig.isVerbose !== undefined
- ? orderWatcherConfig.isVerbose
- : true;
- this._orderWatcher = new OrderWatcher(provider, networkId, contractAddresses, orderWatcherConfig);
- this._connectionStore = new Set();
- this._httpServer = http.createServer();
- this._wsServer = new WebSocket.server({
- httpServer: this._httpServer,
- // Avoid setting autoAcceptConnections to true as it defeats all
- // standard cross-origin protection facilities built into the protocol
- // and the browser.
- // Source: https://www.npmjs.com/package/websocket#server-example
- // Also ensures that a request event is emitted by
- // the server whenever a new WebSocket request is made.
- autoAcceptConnections: false,
- });
-
- this._wsServer.on('request', async (request: any) => {
- // Designed for usage pattern where client and server are run on the same
- // machine by the same user. As such, no security checks are in place.
- const connection: WebSocket.connection = request.accept(null, request.origin);
- this._log(`${new Date()} [Server] Accepted connection from origin ${request.origin}.`);
- connection.on('message', this._onMessageCallbackAsync.bind(this, connection));
- connection.on('close', this._onCloseCallback.bind(this, connection));
- this._connectionStore.add(connection);
- });
- }
-
- /**
- * Activates the WebSocket server by subscribing to the OrderWatcher and
- * starting the WebSocket's HTTP server
- */
- public start(): void {
- // Have the WebSocket server subscribe to the OrderWatcher to receive updates.
- // These updates are then broadcast to clients in the _connectionStore.
- this._orderWatcher.subscribe(this._broadcastCallback.bind(this));
-
- const port = process.env.ORDER_WATCHER_HTTP_PORT || DEFAULT_HTTP_PORT;
- this._httpServer.listen(port, () => {
- this._log(`${new Date()} [Server] Listening on port ${port}`);
- });
- }
-
- /**
- * Deactivates the WebSocket server by stopping the HTTP server from accepting
- * new connections and unsubscribing from the OrderWatcher
- */
- public stop(): void {
- this._httpServer.close();
- this._orderWatcher.unsubscribe();
- }
-
- private _log(...args: any[]): void {
- if (this._isVerbose) {
- logUtils.log(...args);
- }
- }
-
- private async _onMessageCallbackAsync(connection: WebSocket.connection, message: any): Promise<void> {
- let response: WebSocketResponse;
- let id: number | null = null;
- try {
- assert.doesConformToSchema('message', message, schemas.orderWatcherWebSocketUtf8MessageSchema);
- const request: WebSocketRequest = JSON.parse(message.utf8Data);
- id = request.id;
- assert.doesConformToSchema('request', request, schemas.orderWatcherWebSocketRequestSchema);
- assert.isString(request.jsonrpc, JSON_RPC_VERSION);
- response = {
- id,
- jsonrpc: JSON_RPC_VERSION,
- method: request.method,
- result: await this._routeRequestAsync(request),
- };
- } catch (err) {
- response = {
- id,
- jsonrpc: JSON_RPC_VERSION,
- method: null,
- error: err.toString(),
- };
- }
- this._log(`${new Date()} [Server] OrderWatcher output: ${JSON.stringify(response)}`);
- connection.sendUTF(JSON.stringify(response));
- }
-
- private _onCloseCallback(connection: WebSocket.connection): void {
- this._connectionStore.delete(connection);
- this._log(`${new Date()} [Server] Client ${connection.remoteAddress} disconnected.`);
- }
-
- private async _routeRequestAsync(request: WebSocketRequest): Promise<GetStatsResult | undefined> {
- this._log(`${new Date()} [Server] Request received: ${request.method}`);
- switch (request.method) {
- case OrderWatcherMethod.AddOrder: {
- const signedOrder: SignedOrder = OrderWatcherWebSocketServer._parseSignedOrder(
- request.params.signedOrder,
- );
- await this._orderWatcher.addOrderAsync(signedOrder);
- break;
- }
- case OrderWatcherMethod.RemoveOrder: {
- this._orderWatcher.removeOrder(request.params.orderHash || 'undefined');
- break;
- }
- case OrderWatcherMethod.GetStats: {
- return this._orderWatcher.getStats();
- }
- default:
- // Should never reach here. Should be caught by JSON schema check.
- throw new Error(`Unexpected default case hit for request.method`);
- }
- return undefined;
- }
-
- /**
- * Broadcasts OrderState changes to ALL connected clients. At the moment,
- * we do not support clients subscribing to only a subset of orders. As such,
- * Client B will be notified of changes to an order that Client A added.
- */
- private _broadcastCallback(err: Error | null, orderState?: OrderStateValid | OrderStateInvalid | undefined): void {
- const method = OrderWatcherMethod.Update;
- const response =
- err === null
- ? {
- jsonrpc: JSON_RPC_VERSION,
- method,
- result: orderState,
- }
- : {
- jsonrpc: JSON_RPC_VERSION,
- method,
- error: {
- code: -32000,
- message: err.message,
- },
- };
- this._connectionStore.forEach((connection: WebSocket.connection) => {
- connection.sendUTF(JSON.stringify(response));
- });
- }
-}