From 32ad34d2241c8e23002a4a7fc267a8a2e92fb304 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 5 Jul 2018 23:18:55 +0200 Subject: properly stop blockstream and pass stateLayer into blockstream --- .../src/order_watcher/event_watcher.ts | 25 ++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index 0e27cd64b..d439d9e5b 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -54,19 +54,22 @@ export class EventWatcher { this._startBlockAndLogStream(callback); } public unsubscribe(): void { - if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { - intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists); - delete this._blockAndLogStreamIntervalIfExists; - delete this._blockAndLogStreamerIfExists; + if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + throw new Error(OrderWatcherError.SubscriptionNotFound); } + this._stopBlockAndLogStream(); } private _startBlockAndLogStream(callback: EventWatcherCallback): void { if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } + const eventFilter = { + fromBlock: this._stateLayer, + toBlock: this._stateLayer, + }; this._blockAndLogStreamerIfExists = new BlockAndLogStreamer( - this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper), - this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper), + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter), EventWatcher._onBlockAndLogStreamerError.bind(this, callback), ); const catchAllLogFilter = {}; @@ -85,6 +88,16 @@ export class EventWatcher { this._onLogStateChangedAsync.bind(this, callback, isRemoved), ); } + private _stopBlockAndLogStream(): void { + if (_.isUndefined(this._blockAndLogStreamerIfExists)) { + throw new Error(OrderWatcherError.SubscriptionNotFound); + } + this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string); + this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string); + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer); + delete this._blockAndLogStreamerIfExists; + delete this._blockAndLogStreamIntervalIfExists; + } private async _onLogStateChangedAsync( callback: EventWatcherCallback, isRemoved: boolean, -- cgit v1.2.3