aboutsummaryrefslogtreecommitdiffstats
path: root/packages/order-watcher/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/order-watcher/src')
-rw-r--r--packages/order-watcher/src/order_watcher/event_watcher.ts125
-rw-r--r--packages/order-watcher/src/order_watcher/order_watcher.ts4
-rw-r--r--packages/order-watcher/src/types.ts3
3 files changed, 84 insertions, 48 deletions
diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts
index f39d3bf0e..08ecf81cb 100644
--- a/packages/order-watcher/src/order_watcher/event_watcher.ts
+++ b/packages/order-watcher/src/order_watcher/event_watcher.ts
@@ -1,6 +1,7 @@
import { BlockParamLiteral, LogEntry } from '@0xproject/types';
-import { intervalUtils } from '@0xproject/utils';
+import { intervalUtils, logUtils } from '@0xproject/utils';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
+import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
import * as _ from 'lodash';
import { EventWatcherCallback, OrderWatcherError } from '../types';
@@ -19,81 +20,115 @@ enum LogEventState {
*/
export class EventWatcher {
private _web3Wrapper: Web3Wrapper;
+ private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined;
+ private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer;
+ private _onLogAddedSubscriptionToken: string | undefined;
+ private _onLogRemovedSubscriptionToken: string | undefined;
private _pollingIntervalMs: number;
- private _intervalIdIfExists?: NodeJS.Timer;
- private _lastEvents: LogEntry[] = [];
private _stateLayer: BlockParamLiteral;
+ private _isVerbose: boolean;
constructor(
web3Wrapper: Web3Wrapper,
pollingIntervalIfExistsMs: undefined | number,
stateLayer: BlockParamLiteral = BlockParamLiteral.Latest,
+ isVerbose: boolean,
) {
+ this._isVerbose = isVerbose;
this._web3Wrapper = web3Wrapper;
this._stateLayer = stateLayer;
this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs)
? DEFAULT_EVENT_POLLING_INTERVAL_MS
: pollingIntervalIfExistsMs;
+ this._blockAndLogStreamerIfExists = undefined;
+ this._blockAndLogStreamIntervalIfExists = undefined;
+ this._onLogAddedSubscriptionToken = undefined;
+ this._onLogRemovedSubscriptionToken = undefined;
}
public subscribe(callback: EventWatcherCallback): void {
assert.isFunction('callback', callback);
- if (!_.isUndefined(this._intervalIdIfExists)) {
+ if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
- this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
- this._pollForBlockchainEventsAsync.bind(this, callback),
- this._pollingIntervalMs,
- (err: Error) => {
- this.unsubscribe();
- callback(err);
- },
- );
+ this._startBlockAndLogStream(callback);
}
public unsubscribe(): void {
- this._lastEvents = [];
- if (!_.isUndefined(this._intervalIdIfExists)) {
- intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists);
- delete this._intervalIdIfExists;
+ if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionNotFound);
}
+ this._stopBlockAndLogStream();
}
- private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> {
- const pendingEvents = await this._getEventsAsync();
- if (_.isUndefined(pendingEvents)) {
- // HACK: This should never happen, but happens frequently on CI due to a ganache bug
- return;
- }
- if (pendingEvents.length === 0) {
- // HACK: Sometimes when node rebuilds the pending block we get back the empty result.
- // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds,
- // that's why we just ignore those cases.
- return;
+ private _startBlockAndLogStream(callback: EventWatcherCallback): void {
+ if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
- const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify);
- const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify);
- await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback);
- await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback);
- this._lastEvents = pendingEvents;
- }
- private async _getEventsAsync(): Promise<LogEntry[]> {
const eventFilter = {
fromBlock: this._stateLayer,
toBlock: this._stateLayer,
};
- const events = await this._web3Wrapper.getLogsAsync(eventFilter);
- return events;
+ this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter),
+ this._onBlockAndLogStreamerError.bind(this),
+ );
+ const catchAllLogFilter = {};
+ this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter);
+ this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval(
+ this._reconcileBlockAsync.bind(this),
+ this._pollingIntervalMs,
+ this._onBlockAndLogStreamerError.bind(this),
+ );
+ let isRemoved = false;
+ this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
+ this._onLogStateChangedAsync.bind(this, callback, isRemoved),
+ );
+ isRemoved = true;
+ this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved(
+ this._onLogStateChangedAsync.bind(this, callback, isRemoved),
+ );
+ }
+ private _stopBlockAndLogStream(): void {
+ if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionNotFound);
+ }
+ this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
+ this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
+ intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer);
+ delete this._blockAndLogStreamerIfExists;
+ delete this._blockAndLogStreamIntervalIfExists;
+ }
+ private async _onLogStateChangedAsync(
+ callback: EventWatcherCallback,
+ isRemoved: boolean,
+ log: LogEntry,
+ ): Promise<void> {
+ await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback);
+ }
+ private async _reconcileBlockAsync(): Promise<void> {
+ const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest);
+ // We need to coerce to Block type cause Web3.Block includes types for mempool blocks
+ if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
+ await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block);
+ }
}
private async _emitDifferencesAsync(
- logs: LogEntry[],
+ log: LogEntry,
logEventState: LogEventState,
callback: EventWatcherCallback,
): Promise<void> {
- for (const log of logs) {
- const logEvent = {
- removed: logEventState === LogEventState.Removed,
- ...log,
- };
- if (!_.isUndefined(this._intervalIdIfExists)) {
- callback(null, logEvent);
- }
+ const logEvent = {
+ removed: logEventState === LogEventState.Removed,
+ ...log,
+ };
+ if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
+ callback(null, logEvent);
+ }
+ }
+ private _onBlockAndLogStreamerError(err: Error): void {
+ // Since Blockstream errors are all recoverable, we simply log them if the verbose
+ // config is passed in.
+ if (this._isVerbose) {
+ logUtils.warn(err);
}
}
}
diff --git a/packages/order-watcher/src/order_watcher/order_watcher.ts b/packages/order-watcher/src/order_watcher/order_watcher.ts
index cac3a0923..999c1c79e 100644
--- a/packages/order-watcher/src/order_watcher/order_watcher.ts
+++ b/packages/order-watcher/src/order_watcher/order_watcher.ts
@@ -93,7 +93,8 @@ export class OrderWatcher {
const pollingIntervalIfExistsMs = _.isUndefined(config) ? undefined : config.eventPollingIntervalMs;
const stateLayer =
_.isUndefined(config) || _.isUndefined(config.stateLayer) ? BlockParamLiteral.Latest : config.stateLayer;
- this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer);
+ const isVerbose = !_.isUndefined(config) && !_.isUndefined(config.isVerbose) ? config.isVerbose : false;
+ this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer, isVerbose);
this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore(
this._contractWrappers.token,
stateLayer,
@@ -236,7 +237,6 @@ export class OrderWatcher {
if (!_.isNull(err)) {
if (!_.isUndefined(this._callbackIfExists)) {
this._callbackIfExists(err);
- this.unsubscribe();
}
return;
}
diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts
index f5b189c5a..63e4e7848 100644
--- a/packages/order-watcher/src/types.ts
+++ b/packages/order-watcher/src/types.ts
@@ -16,11 +16,12 @@ export type EventWatcherCallback = (err: null | Error, log?: LogEntryEvent) => v
* stateLayer: Optional blockchain state layer OrderWatcher will monitor for new events. Default=latest.
*/
export interface OrderWatcherConfig {
+ stateLayer: BlockParamLiteral;
orderExpirationCheckingIntervalMs?: number;
eventPollingIntervalMs?: number;
expirationMarginMs?: number;
cleanupJobIntervalMs?: number;
- stateLayer: BlockParamLiteral;
+ isVerbose?: boolean;
}
export type OnOrderStateChangeCallback = (err: Error | null, orderState?: OrderState) => void;