diff options
Diffstat (limited to 'packages/order-watcher/src')
-rw-r--r-- | packages/order-watcher/src/order_watcher/event_watcher.ts | 150 |
1 files changed, 103 insertions, 47 deletions
diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index f39d3bf0e..e14280421 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -1,6 +1,7 @@ import { BlockParamLiteral, LogEntry } from '@0xproject/types'; import { intervalUtils } from '@0xproject/utils'; import { Web3Wrapper } from '@0xproject/web3-wrapper'; +import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream'; import * as _ from 'lodash'; import { EventWatcherCallback, OrderWatcherError } from '../types'; @@ -19,10 +20,17 @@ enum LogEventState { */ export class EventWatcher { private _web3Wrapper: Web3Wrapper; + private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined; + private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer; + private _onLogAddedSubscriptionToken: string | undefined; + private _onLogRemovedSubscriptionToken: string | undefined; private _pollingIntervalMs: number; - private _intervalIdIfExists?: NodeJS.Timer; - private _lastEvents: LogEntry[] = []; private _stateLayer: BlockParamLiteral; + private static _onBlockAndLogStreamerError(callback: EventWatcherCallback, err: Error): void { + // Propogate all Blockstream subscriber errors to + // top-level subscription + callback(err); + } constructor( web3Wrapper: Web3Wrapper, pollingIntervalIfExistsMs: undefined | number, @@ -33,67 +41,115 @@ export class EventWatcher { this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs) ? DEFAULT_EVENT_POLLING_INTERVAL_MS : pollingIntervalIfExistsMs; + this._blockAndLogStreamerIfExists = undefined; + this._blockAndLogStreamIntervalIfExists = undefined; + this._onLogAddedSubscriptionToken = undefined; + this._onLogRemovedSubscriptionToken = undefined; } public subscribe(callback: EventWatcherCallback): void { assert.isFunction('callback', callback); - if (!_.isUndefined(this._intervalIdIfExists)) { + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - this._pollForBlockchainEventsAsync.bind(this, callback), - this._pollingIntervalMs, - (err: Error) => { - this.unsubscribe(); - callback(err); - }, - ); + this._startBlockAndLogStream(callback); + // TODO: IS the above the correct refactor of this? + // this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( + // this._pollForBlockchainEventsAsync.bind(this, callback), + // this._pollingIntervalMs, + // (err: Error) => { + // this.unsubscribe(); + // callback(err); + // }, + // ); } public unsubscribe(): void { - this._lastEvents = []; - if (!_.isUndefined(this._intervalIdIfExists)) { - intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists); - delete this._intervalIdIfExists; + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists); + delete this._blockAndLogStreamIntervalIfExists; + delete this._blockAndLogStreamerIfExists; } } - private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> { - const pendingEvents = await this._getEventsAsync(); - if (_.isUndefined(pendingEvents)) { - // HACK: This should never happen, but happens frequently on CI due to a ganache bug - return; - } - if (pendingEvents.length === 0) { - // HACK: Sometimes when node rebuilds the pending block we get back the empty result. - // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, - // that's why we just ignore those cases. - return; + private _startBlockAndLogStream(callback: EventWatcherCallback): void { + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); - const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); - await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); - await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); - this._lastEvents = pendingEvents; + this._blockAndLogStreamerIfExists = new BlockAndLogStreamer( + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper), + EventWatcher._onBlockAndLogStreamerError.bind(this, callback), + ); + const catchAllLogFilter = {}; + this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter); + this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval( + this._reconcileBlockAsync.bind(this), + this._pollingIntervalMs, + this._onReconcileBlockError.bind(this, callback), + ); + let isRemoved = false; + this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); + isRemoved = true; + this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); } - private async _getEventsAsync(): Promise<LogEntry[]> { - const eventFilter = { - fromBlock: this._stateLayer, - toBlock: this._stateLayer, - }; - const events = await this._web3Wrapper.getLogsAsync(eventFilter); - return events; + private _onReconcileBlockError(callback: EventWatcherCallback, err: Error): void { + this.unsubscribe(); + callback(err); + } + private async _onLogStateChangedAsync( + callback: EventWatcherCallback, + isRemoved: boolean, + log: LogEntry, + ): Promise<void> { + await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback); + } + private async _reconcileBlockAsync(): Promise<void> { + const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest); + // We need to coerce to Block type cause Web3.Block includes types for mempool blocks + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined + await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block); + } } + // private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> { + // const pendingEvents = await this._getEventsAsync(); + // if (_.isUndefined(pendingEvents)) { + // // HACK: This should never happen, but happens frequently on CI due to a ganache bug + // return; + // } + // if (pendingEvents.length === 0) { + // // HACK: Sometimes when node rebuilds the pending block we get back the empty result. + // // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, + // // that's why we just ignore those cases. + // return; + // } + // const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); + // const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); + // await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); + // await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); + // this._lastEvents = pendingEvents; + // } + // private async _getEventsAsync(): Promise<LogEntry[]> { + // const eventFilter = { + // fromBlock: this._stateLayer, + // toBlock: this._stateLayer, + // }; + // const events = await this._web3Wrapper.getLogsAsync(eventFilter); + // return events; + // } private async _emitDifferencesAsync( - logs: LogEntry[], + log: LogEntry, logEventState: LogEventState, callback: EventWatcherCallback, ): Promise<void> { - for (const log of logs) { - const logEvent = { - removed: logEventState === LogEventState.Removed, - ...log, - }; - if (!_.isUndefined(this._intervalIdIfExists)) { - callback(null, logEvent); - } + const logEvent = { + removed: logEventState === LogEventState.Removed, + ...log, + }; + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + callback(null, logEvent); } } } |