diff options
-rw-r--r-- | packages/order-watcher/package.json | 1 | ||||
-rw-r--r-- | packages/order-watcher/src/order_watcher/event_watcher.ts | 150 | ||||
-rw-r--r-- | packages/order-watcher/test/event_watcher_test.ts | 126 |
3 files changed, 104 insertions, 173 deletions
diff --git a/packages/order-watcher/package.json b/packages/order-watcher/package.json index 40613ea96..047854c4d 100644 --- a/packages/order-watcher/package.json +++ b/packages/order-watcher/package.json @@ -87,6 +87,7 @@ "@0xproject/typescript-typings": "^0.4.1", "@0xproject/utils": "^0.7.2", "@0xproject/web3-wrapper": "^0.7.1", + "ethereumjs-blockstream": "5.0.0", "ethereum-types": "^0.0.2", "bintrees": "^1.0.2", "ethers": "3.0.22", diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index f39d3bf0e..e14280421 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -1,6 +1,7 @@ import { BlockParamLiteral, LogEntry } from '@0xproject/types'; import { intervalUtils } from '@0xproject/utils'; import { Web3Wrapper } from '@0xproject/web3-wrapper'; +import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream'; import * as _ from 'lodash'; import { EventWatcherCallback, OrderWatcherError } from '../types'; @@ -19,10 +20,17 @@ enum LogEventState { */ export class EventWatcher { private _web3Wrapper: Web3Wrapper; + private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined; + private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer; + private _onLogAddedSubscriptionToken: string | undefined; + private _onLogRemovedSubscriptionToken: string | undefined; private _pollingIntervalMs: number; - private _intervalIdIfExists?: NodeJS.Timer; - private _lastEvents: LogEntry[] = []; private _stateLayer: BlockParamLiteral; + private static _onBlockAndLogStreamerError(callback: EventWatcherCallback, err: Error): void { + // Propogate all Blockstream subscriber errors to + // top-level subscription + callback(err); + } constructor( web3Wrapper: Web3Wrapper, pollingIntervalIfExistsMs: undefined | number, @@ -33,67 +41,115 @@ export class EventWatcher { this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs) ? DEFAULT_EVENT_POLLING_INTERVAL_MS : pollingIntervalIfExistsMs; + this._blockAndLogStreamerIfExists = undefined; + this._blockAndLogStreamIntervalIfExists = undefined; + this._onLogAddedSubscriptionToken = undefined; + this._onLogRemovedSubscriptionToken = undefined; } public subscribe(callback: EventWatcherCallback): void { assert.isFunction('callback', callback); - if (!_.isUndefined(this._intervalIdIfExists)) { + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - this._pollForBlockchainEventsAsync.bind(this, callback), - this._pollingIntervalMs, - (err: Error) => { - this.unsubscribe(); - callback(err); - }, - ); + this._startBlockAndLogStream(callback); + // TODO: IS the above the correct refactor of this? + // this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( + // this._pollForBlockchainEventsAsync.bind(this, callback), + // this._pollingIntervalMs, + // (err: Error) => { + // this.unsubscribe(); + // callback(err); + // }, + // ); } public unsubscribe(): void { - this._lastEvents = []; - if (!_.isUndefined(this._intervalIdIfExists)) { - intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists); - delete this._intervalIdIfExists; + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists); + delete this._blockAndLogStreamIntervalIfExists; + delete this._blockAndLogStreamerIfExists; } } - private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> { - const pendingEvents = await this._getEventsAsync(); - if (_.isUndefined(pendingEvents)) { - // HACK: This should never happen, but happens frequently on CI due to a ganache bug - return; - } - if (pendingEvents.length === 0) { - // HACK: Sometimes when node rebuilds the pending block we get back the empty result. - // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, - // that's why we just ignore those cases. - return; + private _startBlockAndLogStream(callback: EventWatcherCallback): void { + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); - const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); - await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); - await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); - this._lastEvents = pendingEvents; + this._blockAndLogStreamerIfExists = new BlockAndLogStreamer( + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper), + EventWatcher._onBlockAndLogStreamerError.bind(this, callback), + ); + const catchAllLogFilter = {}; + this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter); + this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval( + this._reconcileBlockAsync.bind(this), + this._pollingIntervalMs, + this._onReconcileBlockError.bind(this, callback), + ); + let isRemoved = false; + this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); + isRemoved = true; + this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); } - private async _getEventsAsync(): Promise<LogEntry[]> { - const eventFilter = { - fromBlock: this._stateLayer, - toBlock: this._stateLayer, - }; - const events = await this._web3Wrapper.getLogsAsync(eventFilter); - return events; + private _onReconcileBlockError(callback: EventWatcherCallback, err: Error): void { + this.unsubscribe(); + callback(err); + } + private async _onLogStateChangedAsync( + callback: EventWatcherCallback, + isRemoved: boolean, + log: LogEntry, + ): Promise<void> { + await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback); + } + private async _reconcileBlockAsync(): Promise<void> { + const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest); + // We need to coerce to Block type cause Web3.Block includes types for mempool blocks + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined + await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block); + } } + // private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> { + // const pendingEvents = await this._getEventsAsync(); + // if (_.isUndefined(pendingEvents)) { + // // HACK: This should never happen, but happens frequently on CI due to a ganache bug + // return; + // } + // if (pendingEvents.length === 0) { + // // HACK: Sometimes when node rebuilds the pending block we get back the empty result. + // // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, + // // that's why we just ignore those cases. + // return; + // } + // const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); + // const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); + // await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); + // await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); + // this._lastEvents = pendingEvents; + // } + // private async _getEventsAsync(): Promise<LogEntry[]> { + // const eventFilter = { + // fromBlock: this._stateLayer, + // toBlock: this._stateLayer, + // }; + // const events = await this._web3Wrapper.getLogsAsync(eventFilter); + // return events; + // } private async _emitDifferencesAsync( - logs: LogEntry[], + log: LogEntry, logEventState: LogEventState, callback: EventWatcherCallback, ): Promise<void> { - for (const log of logs) { - const logEvent = { - removed: logEventState === LogEventState.Removed, - ...log, - }; - if (!_.isUndefined(this._intervalIdIfExists)) { - callback(null, logEvent); - } + const logEvent = { + removed: logEventState === LogEventState.Removed, + ...log, + }; + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + callback(null, logEvent); } } } diff --git a/packages/order-watcher/test/event_watcher_test.ts b/packages/order-watcher/test/event_watcher_test.ts deleted file mode 100644 index 9f4ac053f..000000000 --- a/packages/order-watcher/test/event_watcher_test.ts +++ /dev/null @@ -1,126 +0,0 @@ -import { callbackErrorReporter } from '@0xproject/dev-utils'; -import { DoneCallback, LogEntry, LogEntryEvent } from '@0xproject/types'; -import { Web3Wrapper } from '@0xproject/web3-wrapper'; -import * as chai from 'chai'; -import * as _ from 'lodash'; -import 'mocha'; -import * as Sinon from 'sinon'; - -import { EventWatcher } from '../src/order_watcher/event_watcher'; - -import { chaiSetup } from './utils/chai_setup'; -import { provider } from './utils/web3_wrapper'; - -chaiSetup.configure(); -const expect = chai.expect; - -describe('EventWatcher', () => { - let stubs: Sinon.SinonStub[] = []; - let eventWatcher: EventWatcher; - let web3Wrapper: Web3Wrapper; - const logA: LogEntry = { - address: '0x71d271f8b14adef568f8f28f1587ce7271ac4ca5', - blockHash: null, - blockNumber: null, - data: '', - logIndex: null, - topics: [], - transactionHash: '0x004881d38cd4a8f72f1a0d68c8b9b8124504706041ff37019c1d1ed6bfda8e17', - transactionIndex: 0, - }; - const logB: LogEntry = { - address: '0x8d12a197cb00d4747a1fe03395095ce2a5cc6819', - blockHash: null, - blockNumber: null, - data: '', - logIndex: null, - topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'], - transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25', - transactionIndex: 0, - }; - const logC: LogEntry = { - address: '0x1d271f8b174adef58f1587ce68f8f27271ac4ca5', - blockHash: null, - blockNumber: null, - data: '', - logIndex: null, - topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'], - transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25', - transactionIndex: 0, - }; - before(async () => { - const pollingIntervalMs = 10; - web3Wrapper = new Web3Wrapper(provider); - eventWatcher = new EventWatcher(web3Wrapper, pollingIntervalMs); - }); - afterEach(() => { - // clean up any stubs after the test has completed - _.each(stubs, s => s.restore()); - stubs = []; - eventWatcher.unsubscribe(); - }); - it('correctly emits initial log events', (done: DoneCallback) => { - const logs: LogEntry[] = [logA, logB]; - const expectedLogEvents = [ - { - removed: false, - ...logA, - }, - { - removed: false, - ...logB, - }, - ]; - const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync'); - getLogsStub.onCall(0).returns(logs); - stubs.push(getLogsStub); - const expectedToBeCalledOnce = false; - const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)( - (event: LogEntryEvent) => { - const expectedLogEvent = expectedLogEvents.shift(); - expect(event).to.be.deep.equal(expectedLogEvent); - if (_.isEmpty(expectedLogEvents)) { - done(); - } - }, - ); - eventWatcher.subscribe(callback); - }); - it('correctly computes the difference and emits only changes', (done: DoneCallback) => { - const initialLogs: LogEntry[] = [logA, logB]; - const changedLogs: LogEntry[] = [logA, logC]; - const expectedLogEvents = [ - { - removed: false, - ...logA, - }, - { - removed: false, - ...logB, - }, - { - removed: true, - ...logB, - }, - { - removed: false, - ...logC, - }, - ]; - const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync'); - getLogsStub.onCall(0).returns(initialLogs); - getLogsStub.onCall(1).returns(changedLogs); - stubs.push(getLogsStub); - const expectedToBeCalledOnce = false; - const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)( - (event: LogEntryEvent) => { - const expectedLogEvent = expectedLogEvents.shift(); - expect(event).to.be.deep.equal(expectedLogEvent); - if (_.isEmpty(expectedLogEvents)) { - done(); - } - }, - ); - eventWatcher.subscribe(callback); - }); -}); |