diff options
author | Fabio Berger <me@fabioberger.com> | 2018-07-06 19:06:12 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-07-06 19:06:12 +0800 |
commit | 20d349cea71dd98001da18a1cf81e1c5a1844aaa (patch) | |
tree | a1d2f0131f0c86bd3cf996a939e5f77f389b7667 /packages/order-watcher/src | |
parent | faf5c84082fb96a2f17240ffc6e022fa8811a76a (diff) | |
parent | 1e0fa776c1b3a1c471c3ce12ced4a29f6d6a40dd (diff) | |
download | dexon-sol-tools-20d349cea71dd98001da18a1cf81e1c5a1844aaa.tar dexon-sol-tools-20d349cea71dd98001da18a1cf81e1c5a1844aaa.tar.gz dexon-sol-tools-20d349cea71dd98001da18a1cf81e1c5a1844aaa.tar.bz2 dexon-sol-tools-20d349cea71dd98001da18a1cf81e1c5a1844aaa.tar.lz dexon-sol-tools-20d349cea71dd98001da18a1cf81e1c5a1844aaa.tar.xz dexon-sol-tools-20d349cea71dd98001da18a1cf81e1c5a1844aaa.tar.zst dexon-sol-tools-20d349cea71dd98001da18a1cf81e1c5a1844aaa.zip |
Merge pull request #825 from 0xProject/fix-order-watcher
OrderWatcher Fixes
Diffstat (limited to 'packages/order-watcher/src')
-rw-r--r-- | packages/order-watcher/src/order_watcher/event_watcher.ts | 125 | ||||
-rw-r--r-- | packages/order-watcher/src/order_watcher/order_watcher.ts | 4 | ||||
-rw-r--r-- | packages/order-watcher/src/types.ts | 3 |
3 files changed, 84 insertions, 48 deletions
diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index f39d3bf0e..08ecf81cb 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -1,6 +1,7 @@ import { BlockParamLiteral, LogEntry } from '@0xproject/types'; -import { intervalUtils } from '@0xproject/utils'; +import { intervalUtils, logUtils } from '@0xproject/utils'; import { Web3Wrapper } from '@0xproject/web3-wrapper'; +import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream'; import * as _ from 'lodash'; import { EventWatcherCallback, OrderWatcherError } from '../types'; @@ -19,81 +20,115 @@ enum LogEventState { */ export class EventWatcher { private _web3Wrapper: Web3Wrapper; + private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined; + private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer; + private _onLogAddedSubscriptionToken: string | undefined; + private _onLogRemovedSubscriptionToken: string | undefined; private _pollingIntervalMs: number; - private _intervalIdIfExists?: NodeJS.Timer; - private _lastEvents: LogEntry[] = []; private _stateLayer: BlockParamLiteral; + private _isVerbose: boolean; constructor( web3Wrapper: Web3Wrapper, pollingIntervalIfExistsMs: undefined | number, stateLayer: BlockParamLiteral = BlockParamLiteral.Latest, + isVerbose: boolean, ) { + this._isVerbose = isVerbose; this._web3Wrapper = web3Wrapper; this._stateLayer = stateLayer; this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs) ? DEFAULT_EVENT_POLLING_INTERVAL_MS : pollingIntervalIfExistsMs; + this._blockAndLogStreamerIfExists = undefined; + this._blockAndLogStreamIntervalIfExists = undefined; + this._onLogAddedSubscriptionToken = undefined; + this._onLogRemovedSubscriptionToken = undefined; } public subscribe(callback: EventWatcherCallback): void { assert.isFunction('callback', callback); - if (!_.isUndefined(this._intervalIdIfExists)) { + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - this._pollForBlockchainEventsAsync.bind(this, callback), - this._pollingIntervalMs, - (err: Error) => { - this.unsubscribe(); - callback(err); - }, - ); + this._startBlockAndLogStream(callback); } public unsubscribe(): void { - this._lastEvents = []; - if (!_.isUndefined(this._intervalIdIfExists)) { - intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists); - delete this._intervalIdIfExists; + if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + throw new Error(OrderWatcherError.SubscriptionNotFound); } + this._stopBlockAndLogStream(); } - private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> { - const pendingEvents = await this._getEventsAsync(); - if (_.isUndefined(pendingEvents)) { - // HACK: This should never happen, but happens frequently on CI due to a ganache bug - return; - } - if (pendingEvents.length === 0) { - // HACK: Sometimes when node rebuilds the pending block we get back the empty result. - // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, - // that's why we just ignore those cases. - return; + private _startBlockAndLogStream(callback: EventWatcherCallback): void { + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); - const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); - await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); - await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); - this._lastEvents = pendingEvents; - } - private async _getEventsAsync(): Promise<LogEntry[]> { const eventFilter = { fromBlock: this._stateLayer, toBlock: this._stateLayer, }; - const events = await this._web3Wrapper.getLogsAsync(eventFilter); - return events; + this._blockAndLogStreamerIfExists = new BlockAndLogStreamer( + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter), + this._onBlockAndLogStreamerError.bind(this), + ); + const catchAllLogFilter = {}; + this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter); + this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval( + this._reconcileBlockAsync.bind(this), + this._pollingIntervalMs, + this._onBlockAndLogStreamerError.bind(this), + ); + let isRemoved = false; + this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); + isRemoved = true; + this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); + } + private _stopBlockAndLogStream(): void { + if (_.isUndefined(this._blockAndLogStreamerIfExists)) { + throw new Error(OrderWatcherError.SubscriptionNotFound); + } + this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string); + this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string); + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer); + delete this._blockAndLogStreamerIfExists; + delete this._blockAndLogStreamIntervalIfExists; + } + private async _onLogStateChangedAsync( + callback: EventWatcherCallback, + isRemoved: boolean, + log: LogEntry, + ): Promise<void> { + await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback); + } + private async _reconcileBlockAsync(): Promise<void> { + const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest); + // We need to coerce to Block type cause Web3.Block includes types for mempool blocks + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined + await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block); + } } private async _emitDifferencesAsync( - logs: LogEntry[], + log: LogEntry, logEventState: LogEventState, callback: EventWatcherCallback, ): Promise<void> { - for (const log of logs) { - const logEvent = { - removed: logEventState === LogEventState.Removed, - ...log, - }; - if (!_.isUndefined(this._intervalIdIfExists)) { - callback(null, logEvent); - } + const logEvent = { + removed: logEventState === LogEventState.Removed, + ...log, + }; + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + callback(null, logEvent); + } + } + private _onBlockAndLogStreamerError(err: Error): void { + // Since Blockstream errors are all recoverable, we simply log them if the verbose + // config is passed in. + if (this._isVerbose) { + logUtils.warn(err); } } } diff --git a/packages/order-watcher/src/order_watcher/order_watcher.ts b/packages/order-watcher/src/order_watcher/order_watcher.ts index cac3a0923..999c1c79e 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher.ts @@ -93,7 +93,8 @@ export class OrderWatcher { const pollingIntervalIfExistsMs = _.isUndefined(config) ? undefined : config.eventPollingIntervalMs; const stateLayer = _.isUndefined(config) || _.isUndefined(config.stateLayer) ? BlockParamLiteral.Latest : config.stateLayer; - this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer); + const isVerbose = !_.isUndefined(config) && !_.isUndefined(config.isVerbose) ? config.isVerbose : false; + this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer, isVerbose); this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore( this._contractWrappers.token, stateLayer, @@ -236,7 +237,6 @@ export class OrderWatcher { if (!_.isNull(err)) { if (!_.isUndefined(this._callbackIfExists)) { this._callbackIfExists(err); - this.unsubscribe(); } return; } diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index f5b189c5a..63e4e7848 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -16,11 +16,12 @@ export type EventWatcherCallback = (err: null | Error, log?: LogEntryEvent) => v * stateLayer: Optional blockchain state layer OrderWatcher will monitor for new events. Default=latest. */ export interface OrderWatcherConfig { + stateLayer: BlockParamLiteral; orderExpirationCheckingIntervalMs?: number; eventPollingIntervalMs?: number; expirationMarginMs?: number; cleanupJobIntervalMs?: number; - stateLayer: BlockParamLiteral; + isVerbose?: boolean; } export type OnOrderStateChangeCallback = (err: Error | null, orderState?: OrderState) => void; |