aboutsummaryrefslogtreecommitdiffstats
path: root/packages/order-watcher
diff options
context:
space:
mode:
Diffstat (limited to 'packages/order-watcher')
-rw-r--r--packages/order-watcher/src/order_watcher/event_watcher.ts25
1 files changed, 19 insertions, 6 deletions
diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts
index 0e27cd64b..d439d9e5b 100644
--- a/packages/order-watcher/src/order_watcher/event_watcher.ts
+++ b/packages/order-watcher/src/order_watcher/event_watcher.ts
@@ -54,19 +54,22 @@ export class EventWatcher {
this._startBlockAndLogStream(callback);
}
public unsubscribe(): void {
- if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
- intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists);
- delete this._blockAndLogStreamIntervalIfExists;
- delete this._blockAndLogStreamerIfExists;
+ if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionNotFound);
}
+ this._stopBlockAndLogStream();
}
private _startBlockAndLogStream(callback: EventWatcherCallback): void {
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
+ const eventFilter = {
+ fromBlock: this._stateLayer,
+ toBlock: this._stateLayer,
+ };
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
- this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
- this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter),
EventWatcher._onBlockAndLogStreamerError.bind(this, callback),
);
const catchAllLogFilter = {};
@@ -85,6 +88,16 @@ export class EventWatcher {
this._onLogStateChangedAsync.bind(this, callback, isRemoved),
);
}
+ private _stopBlockAndLogStream(): void {
+ if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionNotFound);
+ }
+ this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
+ this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
+ intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer);
+ delete this._blockAndLogStreamerIfExists;
+ delete this._blockAndLogStreamIntervalIfExists;
+ }
private async _onLogStateChangedAsync(
callback: EventWatcherCallback,
isRemoved: boolean,