From 6bcd9adb9e41ccc45ef5e117df243526bbd281ec Mon Sep 17 00:00:00 2001 From: Leonid Logvinov Date: Fri, 10 Nov 2017 16:32:23 -0500 Subject: Make subscribe function async and make blockStore operational --- src/order_watcher/order_state_watcher.ts | 51 +++++++++++++++++++++----------- src/stores/block_store.ts | 29 +++++++++++++++--- 2 files changed, 59 insertions(+), 21 deletions(-) diff --git a/src/order_watcher/order_state_watcher.ts b/src/order_watcher/order_state_watcher.ts index 8e0b46eb0..affb350a3 100644 --- a/src/order_watcher/order_state_watcher.ts +++ b/src/order_watcher/order_state_watcher.ts @@ -50,11 +50,15 @@ export class OrderStateWatcher { private _dependentOrderHashes: DependentOrderHashes = {}; private _callbackIfExistsAsync?: OnOrderStateChangeCallback; private _eventWatcher: EventWatcher; + private _web3Wrapper: Web3Wrapper; + private _token: TokenWrapper; + private _exchange: ExchangeWrapper; private _abiDecoder: AbiDecoder; - private _orderStateUtils: OrderStateUtils; + private _orderStateUtils?: OrderStateUtils; + private _blockStore?: BlockStore; private _numConfirmations: number; - private _orderFilledCancelledLazyStore: OrderFilledCancelledLazyStore; - private _balanceAndProxyAllowanceLazyStore: BalanceAndProxyAllowanceLazyStore; + private _orderFilledCancelledLazyStore?: OrderFilledCancelledLazyStore; + private _balanceAndProxyAllowanceLazyStore?: BalanceAndProxyAllowanceLazyStore; constructor( web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder, token: TokenWrapper, exchange: ExchangeWrapper, config?: OrderStateWatcherConfig, @@ -67,16 +71,8 @@ export class OrderStateWatcher { web3Wrapper, eventPollingIntervalMs, this._numConfirmations, ); this._abiDecoder = abiDecoder; - const blockStore = new BlockStore(web3Wrapper); - this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore( - token, blockStore, this._numConfirmations, - ); - this._orderFilledCancelledLazyStore = new OrderFilledCancelledLazyStore( - exchange, blockStore, this._numConfirmations, - ); - this._orderStateUtils = new OrderStateUtils( - this._balanceAndProxyAllowanceLazyStore, this._orderFilledCancelledLazyStore, - ); + this._token = token; + this._exchange = exchange; } /** * Add an order to the orderStateWatcher. Before the order is added, it's @@ -109,11 +105,22 @@ export class OrderStateWatcher { * @param callback Receives the orderHash of the order that should be re-validated, together * with all the order-relevant blockchain state needed to re-validate the order. */ - public subscribe(callback: OnOrderStateChangeCallback): void { + public async subscribeAsync(callback: OnOrderStateChangeCallback): Promise { assert.isFunction('callback', callback); if (!_.isUndefined(this._callbackIfExistsAsync)) { throw new Error(ZeroExError.SubscriptionAlreadyPresent); } + this._blockStore = new BlockStore(this._web3Wrapper); + await this._blockStore.startAsync(); + this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore( + this._token, this._blockStore, this._numConfirmations, + ); + this._orderFilledCancelledLazyStore = new OrderFilledCancelledLazyStore( + this._exchange, this._blockStore, this._numConfirmations, + ); + this._orderStateUtils = new OrderStateUtils( + this._balanceAndProxyAllowanceLazyStore, this._orderFilledCancelledLazyStore, + ); this._callbackIfExistsAsync = callback; this._eventWatcher.subscribe(this._onEventWatcherCallbackAsync.bind(this)); } @@ -121,7 +128,15 @@ export class OrderStateWatcher { * Ends an orderStateWatcher subscription. */ public unsubscribe(): void { + if (_.isUndefined(this._blockStore)) { + throw new Error(ZeroExError.SubscriptionNotFound); + } + this._blockStore.stop(); + delete this._blockStore; delete this._callbackIfExistsAsync; + delete this._balanceAndProxyAllowanceLazyStore; + delete this._orderFilledCancelledLazyStore; + delete this._orderStateUtils; this._eventWatcher.unsubscribe(); } private async _onEventWatcherCallbackAsync(log: LogEvent): Promise { @@ -174,12 +189,14 @@ export class OrderStateWatcher { private async _emitRevalidateOrdersAsync(orderHashes: string[]): Promise { for (const orderHash of orderHashes) { const signedOrder = this._orderByOrderHash[orderHash] as SignedOrder; + if (_.isUndefined(this._orderStateUtils)) { + break; // Unsubscribe was called + } const orderState = await this._orderStateUtils.getOrderStateAsync(signedOrder); - if (!_.isUndefined(this._callbackIfExistsAsync)) { - await this._callbackIfExistsAsync(orderState); - } else { + if (_.isUndefined(this._callbackIfExistsAsync)) { break; // Unsubscribe was called } + await this._callbackIfExistsAsync(orderState); } } private addToDependentOrderHashes(signedOrder: SignedOrder, orderHash: string) { diff --git a/src/stores/block_store.ts b/src/stores/block_store.ts index d1b4d3c54..70798a999 100644 --- a/src/stores/block_store.ts +++ b/src/stores/block_store.ts @@ -1,8 +1,11 @@ import * as _ from 'lodash'; import * as Web3 from 'web3'; import {BigNumber} from 'bignumber.js'; -import {BlockParamLiteral, InternalZeroExError} from '../types'; +import {BlockParamLiteral, InternalZeroExError, ZeroExError} from '../types'; import {Web3Wrapper} from '../web3_wrapper'; +import {intervalUtils} from '../utils/interval_utils'; + +const POLLING_INTERVAL_MS = 500; /** * Store for a current latest block number @@ -10,9 +13,9 @@ import {Web3Wrapper} from '../web3_wrapper'; export class BlockStore { private web3Wrapper?: Web3Wrapper; private latestBlockNumber?: number; + private intervalId?: NodeJS.Timer; constructor(web3Wrapper?: Web3Wrapper) { this.web3Wrapper = web3Wrapper; - // TODO start a subscription } public getBlockNumberWithNConfirmations(numConfirmations: number): Web3.BlockParam { let blockNumber; @@ -32,7 +35,25 @@ export class BlockStore { } return blockNumber; } - public setLatestBlock(latestBlockNumber: number): void { - this.latestBlockNumber = latestBlockNumber; + public async startAsync(): Promise { + await this.updateLatestBlockAsync(); + this.intervalId = intervalUtils.setAsyncExcludingInterval( + this.updateLatestBlockAsync.bind(this), POLLING_INTERVAL_MS, + ); + } + public stop(): void { + if (!_.isUndefined(this.intervalId)) { + intervalUtils.clearAsyncExcludingInterval(this.intervalId); + } + } + private async updateLatestBlockAsync(): Promise { + if (_.isUndefined(this.web3Wrapper)) { + throw new Error(InternalZeroExError.Web3WrapperRequiredToStartBlockStore); + } + const block = await this.web3Wrapper.getBlockAsync(BlockParamLiteral.Latest); + if (_.isNull(block.number)) { + throw new Error(ZeroExError.FailedToFetchLatestBlock); + } + this.latestBlockNumber = block.number; } } -- cgit v1.2.3