diff options
author | Fabio Berger <me@fabioberger.com> | 2017-11-13 11:17:18 +0800 |
---|---|---|
committer | Fabio Berger <me@fabioberger.com> | 2017-11-13 11:17:18 +0800 |
commit | c4ee2d73865a1444c079b9e2836b7630a0adf03e (patch) | |
tree | b9c7794e7022fb189675d914f5fe58dcabd67dec /src/order_watcher | |
parent | a74ec0effa818a86233fe64cb0dad2c61bbb4bb6 (diff) | |
download | dexon-sol-tools-c4ee2d73865a1444c079b9e2836b7630a0adf03e.tar dexon-sol-tools-c4ee2d73865a1444c079b9e2836b7630a0adf03e.tar.gz dexon-sol-tools-c4ee2d73865a1444c079b9e2836b7630a0adf03e.tar.bz2 dexon-sol-tools-c4ee2d73865a1444c079b9e2836b7630a0adf03e.tar.lz dexon-sol-tools-c4ee2d73865a1444c079b9e2836b7630a0adf03e.tar.xz dexon-sol-tools-c4ee2d73865a1444c079b9e2836b7630a0adf03e.tar.zst dexon-sol-tools-c4ee2d73865a1444c079b9e2836b7630a0adf03e.zip |
Switch over to Lerna + Yarn Workspaces setup for a mono-repo approach
Diffstat (limited to 'src/order_watcher')
-rw-r--r-- | src/order_watcher/event_watcher.ts | 88 | ||||
-rw-r--r-- | src/order_watcher/order_state_watcher.ts | 232 |
2 files changed, 0 insertions, 320 deletions
diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts deleted file mode 100644 index 81529a98c..000000000 --- a/src/order_watcher/event_watcher.ts +++ /dev/null @@ -1,88 +0,0 @@ -import * as Web3 from 'web3'; -import * as _ from 'lodash'; -import {Web3Wrapper} from '../web3_wrapper'; -import { - BlockParamLiteral, - EventCallback, - EventWatcherCallback, - ZeroExError, -} from '../types'; -import {AbiDecoder} from '../utils/abi_decoder'; -import {intervalUtils} from '../utils/interval_utils'; -import {assert} from '../utils/assert'; -import {utils} from '../utils/utils'; - -const DEFAULT_EVENT_POLLING_INTERVAL = 200; - -enum LogEventState { - Removed, - Added, -} - -/* - * The EventWatcher watches for blockchain events at the specified block confirmation - * depth. - */ -export class EventWatcher { - private _web3Wrapper: Web3Wrapper; - private _pollingIntervalMs: number; - private _intervalIdIfExists?: NodeJS.Timer; - private _lastEvents: Web3.LogEntry[] = []; - constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { - this._web3Wrapper = web3Wrapper; - this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? - DEFAULT_EVENT_POLLING_INTERVAL : - pollingIntervalMs; - } - public subscribe(callback: EventWatcherCallback): void { - assert.isFunction('callback', callback); - if (!_.isUndefined(this._intervalIdIfExists)) { - throw new Error(ZeroExError.SubscriptionAlreadyPresent); - } - this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - this._pollForBlockchainEventsAsync.bind(this, callback), this._pollingIntervalMs, - ); - } - public unsubscribe(): void { - this._lastEvents = []; - if (!_.isUndefined(this._intervalIdIfExists)) { - intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists); - delete this._intervalIdIfExists; - } - } - private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> { - const pendingEvents = await this._getEventsAsync(); - if (pendingEvents.length === 0) { - // HACK: Sometimes when node rebuilds the pending block we get back the empty result. - // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, - // that's why we just ignore those cases. - return; - } - const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); - const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); - await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); - await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); - this._lastEvents = pendingEvents; - } - private async _getEventsAsync(): Promise<Web3.LogEntry[]> { - const eventFilter = { - fromBlock: BlockParamLiteral.Pending, - toBlock: BlockParamLiteral.Pending, - }; - const events = await this._web3Wrapper.getLogsAsync(eventFilter); - return events; - } - private async _emitDifferencesAsync( - logs: Web3.LogEntry[], logEventState: LogEventState, callback: EventWatcherCallback, - ): Promise<void> { - for (const log of logs) { - const logEvent = { - removed: logEventState === LogEventState.Removed, - ...log, - }; - if (!_.isUndefined(this._intervalIdIfExists)) { - await callback(logEvent); - } - } - } -} diff --git a/src/order_watcher/order_state_watcher.ts b/src/order_watcher/order_state_watcher.ts deleted file mode 100644 index 139f13fdf..000000000 --- a/src/order_watcher/order_state_watcher.ts +++ /dev/null @@ -1,232 +0,0 @@ -import * as _ from 'lodash'; -import {schemas} from '0x-json-schemas'; -import {ZeroEx} from '../0x'; -import {EventWatcher} from './event_watcher'; -import {assert} from '../utils/assert'; -import {utils} from '../utils/utils'; -import {artifacts} from '../artifacts'; -import {AbiDecoder} from '../utils/abi_decoder'; -import {OrderStateUtils} from '../utils/order_state_utils'; -import { - LogEvent, - OrderState, - SignedOrder, - Web3Provider, - BlockParamLiteral, - LogWithDecodedArgs, - ContractEventArgs, - OnOrderStateChangeCallback, - OrderStateWatcherConfig, - ApprovalContractEventArgs, - TransferContractEventArgs, - LogFillContractEventArgs, - LogCancelContractEventArgs, - ExchangeEvents, - TokenEvents, - ZeroExError, -} from '../types'; -import {Web3Wrapper} from '../web3_wrapper'; -import {TokenWrapper} from '../contract_wrappers/token_wrapper'; -import {ExchangeWrapper} from '../contract_wrappers/exchange_wrapper'; -import {OrderFilledCancelledLazyStore} from '../stores/order_filled_cancelled_lazy_store'; -import {BalanceAndProxyAllowanceLazyStore} from '../stores/balance_proxy_allowance_lazy_store'; - -const DEFAULT_NUM_CONFIRMATIONS = 0; - -interface DependentOrderHashes { - [makerAddress: string]: { - [makerToken: string]: Set<string>, - }; -} - -interface OrderByOrderHash { - [orderHash: string]: SignedOrder; -} - -/** - * This class includes all the functionality related to watching a set of orders - * for potential changes in order validity/fillability. The orderWatcher notifies - * the subscriber of these changes so that a final decison can be made on whether - * the order should be deemed invalid. - */ -export class OrderStateWatcher { - private _orderByOrderHash: OrderByOrderHash = {}; - private _dependentOrderHashes: DependentOrderHashes = {}; - private _callbackIfExistsAsync?: OnOrderStateChangeCallback; - private _eventWatcher: EventWatcher; - private _web3Wrapper: Web3Wrapper; - private _abiDecoder: AbiDecoder; - private _orderStateUtils: OrderStateUtils; - private _orderFilledCancelledLazyStore: OrderFilledCancelledLazyStore; - private _balanceAndProxyAllowanceLazyStore: BalanceAndProxyAllowanceLazyStore; - constructor( - web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder, token: TokenWrapper, exchange: ExchangeWrapper, - config?: OrderStateWatcherConfig, - ) { - this._abiDecoder = abiDecoder; - this._web3Wrapper = web3Wrapper; - const eventPollingIntervalMs = _.isUndefined(config) ? undefined : config.eventPollingIntervalMs; - this._eventWatcher = new EventWatcher(web3Wrapper, eventPollingIntervalMs); - this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore(token); - this._orderFilledCancelledLazyStore = new OrderFilledCancelledLazyStore(exchange); - this._orderStateUtils = new OrderStateUtils( - this._balanceAndProxyAllowanceLazyStore, this._orderFilledCancelledLazyStore, - ); - } - /** - * Add an order to the orderStateWatcher. Before the order is added, it's - * signature is verified. - * @param signedOrder The order you wish to start watching. - */ - public addOrder(signedOrder: SignedOrder): void { - assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema); - const orderHash = ZeroEx.getOrderHashHex(signedOrder); - assert.isValidSignature(orderHash, signedOrder.ecSignature, signedOrder.maker); - this._orderByOrderHash[orderHash] = signedOrder; - this.addToDependentOrderHashes(signedOrder, orderHash); - } - /** - * Removes an order from the orderStateWatcher - * @param orderHash The orderHash of the order you wish to stop watching. - */ - public removeOrder(orderHash: string): void { - assert.doesConformToSchema('orderHash', orderHash, schemas.orderHashSchema); - const signedOrder = this._orderByOrderHash[orderHash]; - if (_.isUndefined(signedOrder)) { - return; // noop - } - delete this._orderByOrderHash[orderHash]; - this.removeFromDependentOrderHashes(signedOrder.maker, signedOrder.makerTokenAddress, orderHash); - } - /** - * Starts an orderStateWatcher subscription. The callback will be called every time a watched order's - * backing blockchain state has changed. This is a call-to-action for the caller to re-validate the order. - * @param callback Receives the orderHash of the order that should be re-validated, together - * with all the order-relevant blockchain state needed to re-validate the order. - */ - public subscribe(callback: OnOrderStateChangeCallback): void { - assert.isFunction('callback', callback); - if (!_.isUndefined(this._callbackIfExistsAsync)) { - throw new Error(ZeroExError.SubscriptionAlreadyPresent); - } - this._callbackIfExistsAsync = callback; - this._eventWatcher.subscribe(this._onEventWatcherCallbackAsync.bind(this)); - } - /** - * Ends an orderStateWatcher subscription. - */ - public unsubscribe(): void { - if (_.isUndefined(this._callbackIfExistsAsync)) { - throw new Error(ZeroExError.SubscriptionNotFound); - } - this._balanceAndProxyAllowanceLazyStore.deleteAll(); - this._orderFilledCancelledLazyStore.deleteAll(); - delete this._callbackIfExistsAsync; - this._eventWatcher.unsubscribe(); - } - private async _onEventWatcherCallbackAsync(log: LogEvent): Promise<void> { - const maybeDecodedLog = this._abiDecoder.tryToDecodeLogOrNoop(log); - const isLogDecoded = !_.isUndefined((maybeDecodedLog as LogWithDecodedArgs<any>).event); - if (!isLogDecoded) { - return; // noop - } - const decodedLog = maybeDecodedLog as LogWithDecodedArgs<ContractEventArgs>; - let makerToken: string; - let makerAddress: string; - let orderHashesSet: Set<string>; - switch (decodedLog.event) { - case TokenEvents.Approval: - { - // Invalidate cache - const args = decodedLog.args as ApprovalContractEventArgs; - this._balanceAndProxyAllowanceLazyStore.deleteProxyAllowance(decodedLog.address, args._owner); - // Revalidate orders - makerToken = decodedLog.address; - makerAddress = args._owner; - orderHashesSet = _.get(this._dependentOrderHashes, [makerAddress, makerToken]); - if (!_.isUndefined(orderHashesSet)) { - const orderHashes = Array.from(orderHashesSet); - await this._emitRevalidateOrdersAsync(orderHashes); - } - break; - } - case TokenEvents.Transfer: - { - // Invalidate cache - const args = decodedLog.args as TransferContractEventArgs; - this._balanceAndProxyAllowanceLazyStore.deleteBalance(decodedLog.address, args._from); - this._balanceAndProxyAllowanceLazyStore.deleteBalance(decodedLog.address, args._to); - // Revalidate orders - makerToken = decodedLog.address; - makerAddress = args._from; - orderHashesSet = _.get(this._dependentOrderHashes, [makerAddress, makerToken]); - if (!_.isUndefined(orderHashesSet)) { - const orderHashes = Array.from(orderHashesSet); - await this._emitRevalidateOrdersAsync(orderHashes); - } - break; - } - case ExchangeEvents.LogFill: - { - // Invalidate cache - const args = decodedLog.args as LogFillContractEventArgs; - this._orderFilledCancelledLazyStore.deleteFilledTakerAmount(args.orderHash); - // Revalidate orders - const orderHash = args.orderHash; - const isOrderWatched = !_.isUndefined(this._orderByOrderHash[orderHash]); - if (isOrderWatched) { - await this._emitRevalidateOrdersAsync([orderHash]); - } - break; - } - case ExchangeEvents.LogCancel: - { - // Invalidate cache - const args = decodedLog.args as LogCancelContractEventArgs; - this._orderFilledCancelledLazyStore.deleteCancelledTakerAmount(args.orderHash); - // Revalidate orders - const orderHash = args.orderHash; - const isOrderWatched = !_.isUndefined(this._orderByOrderHash[orderHash]); - if (isOrderWatched) { - await this._emitRevalidateOrdersAsync([orderHash]); - } - break; - } - case ExchangeEvents.LogError: - return; // noop - - default: - throw utils.spawnSwitchErr('decodedLog.event', decodedLog.event); - } - } - private async _emitRevalidateOrdersAsync(orderHashes: string[]): Promise<void> { - for (const orderHash of orderHashes) { - const signedOrder = this._orderByOrderHash[orderHash] as SignedOrder; - // Most of these calls will never reach the network because the data is fetched from stores - // and only updated when cache is invalidated - const orderState = await this._orderStateUtils.getOrderStateAsync(signedOrder); - if (_.isUndefined(this._callbackIfExistsAsync)) { - break; // Unsubscribe was called - } - await this._callbackIfExistsAsync(orderState); - } - } - private addToDependentOrderHashes(signedOrder: SignedOrder, orderHash: string) { - if (_.isUndefined(this._dependentOrderHashes[signedOrder.maker])) { - this._dependentOrderHashes[signedOrder.maker] = {}; - } - if (_.isUndefined(this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress])) { - this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress] = new Set(); - } - this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress].add(orderHash); - } - private removeFromDependentOrderHashes(makerAddress: string, makerTokenAddress: string, orderHash: string) { - this._dependentOrderHashes[makerAddress][makerTokenAddress].delete(orderHash); - if (this._dependentOrderHashes[makerAddress][makerTokenAddress].size === 0) { - delete this._dependentOrderHashes[makerAddress][makerTokenAddress]; - } - if (_.isEmpty(this._dependentOrderHashes[makerAddress])) { - delete this._dependentOrderHashes[makerAddress]; - } - } -} |