From d9e308e53af3ab84fb12554a1b75ca50bcee2946 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 5 Jul 2018 16:39:53 +0200 Subject: Refactor order-watcher to use Blockstream under-the-hood --- packages/order-watcher/package.json | 1 + .../src/order_watcher/event_watcher.ts | 150 ++++++++++++++------- packages/order-watcher/test/event_watcher_test.ts | 126 ----------------- 3 files changed, 104 insertions(+), 173 deletions(-) delete mode 100644 packages/order-watcher/test/event_watcher_test.ts diff --git a/packages/order-watcher/package.json b/packages/order-watcher/package.json index 40613ea96..047854c4d 100644 --- a/packages/order-watcher/package.json +++ b/packages/order-watcher/package.json @@ -87,6 +87,7 @@ "@0xproject/typescript-typings": "^0.4.1", "@0xproject/utils": "^0.7.2", "@0xproject/web3-wrapper": "^0.7.1", + "ethereumjs-blockstream": "5.0.0", "ethereum-types": "^0.0.2", "bintrees": "^1.0.2", "ethers": "3.0.22", diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index f39d3bf0e..e14280421 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -1,6 +1,7 @@ import { BlockParamLiteral, LogEntry } from '@0xproject/types'; import { intervalUtils } from '@0xproject/utils'; import { Web3Wrapper } from '@0xproject/web3-wrapper'; +import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream'; import * as _ from 'lodash'; import { EventWatcherCallback, OrderWatcherError } from '../types'; @@ -19,10 +20,17 @@ enum LogEventState { */ export class EventWatcher { private _web3Wrapper: Web3Wrapper; + private _blockAndLogStreamerIfExists: BlockAndLogStreamer | undefined; + private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer; + private _onLogAddedSubscriptionToken: string | undefined; + private _onLogRemovedSubscriptionToken: string | undefined; private _pollingIntervalMs: number; - private _intervalIdIfExists?: NodeJS.Timer; - private _lastEvents: LogEntry[] = []; private _stateLayer: BlockParamLiteral; + private static _onBlockAndLogStreamerError(callback: EventWatcherCallback, err: Error): void { + // Propogate all Blockstream subscriber errors to + // top-level subscription + callback(err); + } constructor( web3Wrapper: Web3Wrapper, pollingIntervalIfExistsMs: undefined | number, @@ -33,67 +41,115 @@ export class EventWatcher { this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs) ? DEFAULT_EVENT_POLLING_INTERVAL_MS : pollingIntervalIfExistsMs; + this._blockAndLogStreamerIfExists = undefined; + this._blockAndLogStreamIntervalIfExists = undefined; + this._onLogAddedSubscriptionToken = undefined; + this._onLogRemovedSubscriptionToken = undefined; } public subscribe(callback: EventWatcherCallback): void { assert.isFunction('callback', callback); - if (!_.isUndefined(this._intervalIdIfExists)) { + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - this._pollForBlockchainEventsAsync.bind(this, callback), - this._pollingIntervalMs, - (err: Error) => { - this.unsubscribe(); - callback(err); - }, - ); + this._startBlockAndLogStream(callback); + // TODO: IS the above the correct refactor of this? + // this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( + // this._pollForBlockchainEventsAsync.bind(this, callback), + // this._pollingIntervalMs, + // (err: Error) => { + // this.unsubscribe(); + // callback(err); + // }, + // ); } public unsubscribe(): void { - this._lastEvents = []; - if (!_.isUndefined(this._intervalIdIfExists)) { - intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists); - delete this._intervalIdIfExists; + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists); + delete this._blockAndLogStreamIntervalIfExists; + delete this._blockAndLogStreamerIfExists; } } - private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise { - const pendingEvents = await this._getEventsAsync(); - if (_.isUndefined(pendingEvents)) { - // HACK: This should never happen, but happens frequently on CI due to a ganache bug - return; - } - if (pendingEvents.length === 0) { - // HACK: Sometimes when node rebuilds the pending block we get back the empty result. - // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, - // that's why we just ignore those cases. - return; + private _startBlockAndLogStream(callback: EventWatcherCallback): void { + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); - const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); - await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); - await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); - this._lastEvents = pendingEvents; + this._blockAndLogStreamerIfExists = new BlockAndLogStreamer( + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper), + EventWatcher._onBlockAndLogStreamerError.bind(this, callback), + ); + const catchAllLogFilter = {}; + this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter); + this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval( + this._reconcileBlockAsync.bind(this), + this._pollingIntervalMs, + this._onReconcileBlockError.bind(this, callback), + ); + let isRemoved = false; + this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); + isRemoved = true; + this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); } - private async _getEventsAsync(): Promise { - const eventFilter = { - fromBlock: this._stateLayer, - toBlock: this._stateLayer, - }; - const events = await this._web3Wrapper.getLogsAsync(eventFilter); - return events; + private _onReconcileBlockError(callback: EventWatcherCallback, err: Error): void { + this.unsubscribe(); + callback(err); + } + private async _onLogStateChangedAsync( + callback: EventWatcherCallback, + isRemoved: boolean, + log: LogEntry, + ): Promise { + await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback); + } + private async _reconcileBlockAsync(): Promise { + const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest); + // We need to coerce to Block type cause Web3.Block includes types for mempool blocks + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined + await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block); + } } + // private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise { + // const pendingEvents = await this._getEventsAsync(); + // if (_.isUndefined(pendingEvents)) { + // // HACK: This should never happen, but happens frequently on CI due to a ganache bug + // return; + // } + // if (pendingEvents.length === 0) { + // // HACK: Sometimes when node rebuilds the pending block we get back the empty result. + // // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, + // // that's why we just ignore those cases. + // return; + // } + // const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); + // const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); + // await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); + // await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); + // this._lastEvents = pendingEvents; + // } + // private async _getEventsAsync(): Promise { + // const eventFilter = { + // fromBlock: this._stateLayer, + // toBlock: this._stateLayer, + // }; + // const events = await this._web3Wrapper.getLogsAsync(eventFilter); + // return events; + // } private async _emitDifferencesAsync( - logs: LogEntry[], + log: LogEntry, logEventState: LogEventState, callback: EventWatcherCallback, ): Promise { - for (const log of logs) { - const logEvent = { - removed: logEventState === LogEventState.Removed, - ...log, - }; - if (!_.isUndefined(this._intervalIdIfExists)) { - callback(null, logEvent); - } + const logEvent = { + removed: logEventState === LogEventState.Removed, + ...log, + }; + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + callback(null, logEvent); } } } diff --git a/packages/order-watcher/test/event_watcher_test.ts b/packages/order-watcher/test/event_watcher_test.ts deleted file mode 100644 index 9f4ac053f..000000000 --- a/packages/order-watcher/test/event_watcher_test.ts +++ /dev/null @@ -1,126 +0,0 @@ -import { callbackErrorReporter } from '@0xproject/dev-utils'; -import { DoneCallback, LogEntry, LogEntryEvent } from '@0xproject/types'; -import { Web3Wrapper } from '@0xproject/web3-wrapper'; -import * as chai from 'chai'; -import * as _ from 'lodash'; -import 'mocha'; -import * as Sinon from 'sinon'; - -import { EventWatcher } from '../src/order_watcher/event_watcher'; - -import { chaiSetup } from './utils/chai_setup'; -import { provider } from './utils/web3_wrapper'; - -chaiSetup.configure(); -const expect = chai.expect; - -describe('EventWatcher', () => { - let stubs: Sinon.SinonStub[] = []; - let eventWatcher: EventWatcher; - let web3Wrapper: Web3Wrapper; - const logA: LogEntry = { - address: '0x71d271f8b14adef568f8f28f1587ce7271ac4ca5', - blockHash: null, - blockNumber: null, - data: '', - logIndex: null, - topics: [], - transactionHash: '0x004881d38cd4a8f72f1a0d68c8b9b8124504706041ff37019c1d1ed6bfda8e17', - transactionIndex: 0, - }; - const logB: LogEntry = { - address: '0x8d12a197cb00d4747a1fe03395095ce2a5cc6819', - blockHash: null, - blockNumber: null, - data: '', - logIndex: null, - topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'], - transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25', - transactionIndex: 0, - }; - const logC: LogEntry = { - address: '0x1d271f8b174adef58f1587ce68f8f27271ac4ca5', - blockHash: null, - blockNumber: null, - data: '', - logIndex: null, - topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'], - transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25', - transactionIndex: 0, - }; - before(async () => { - const pollingIntervalMs = 10; - web3Wrapper = new Web3Wrapper(provider); - eventWatcher = new EventWatcher(web3Wrapper, pollingIntervalMs); - }); - afterEach(() => { - // clean up any stubs after the test has completed - _.each(stubs, s => s.restore()); - stubs = []; - eventWatcher.unsubscribe(); - }); - it('correctly emits initial log events', (done: DoneCallback) => { - const logs: LogEntry[] = [logA, logB]; - const expectedLogEvents = [ - { - removed: false, - ...logA, - }, - { - removed: false, - ...logB, - }, - ]; - const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync'); - getLogsStub.onCall(0).returns(logs); - stubs.push(getLogsStub); - const expectedToBeCalledOnce = false; - const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)( - (event: LogEntryEvent) => { - const expectedLogEvent = expectedLogEvents.shift(); - expect(event).to.be.deep.equal(expectedLogEvent); - if (_.isEmpty(expectedLogEvents)) { - done(); - } - }, - ); - eventWatcher.subscribe(callback); - }); - it('correctly computes the difference and emits only changes', (done: DoneCallback) => { - const initialLogs: LogEntry[] = [logA, logB]; - const changedLogs: LogEntry[] = [logA, logC]; - const expectedLogEvents = [ - { - removed: false, - ...logA, - }, - { - removed: false, - ...logB, - }, - { - removed: true, - ...logB, - }, - { - removed: false, - ...logC, - }, - ]; - const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync'); - getLogsStub.onCall(0).returns(initialLogs); - getLogsStub.onCall(1).returns(changedLogs); - stubs.push(getLogsStub); - const expectedToBeCalledOnce = false; - const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)( - (event: LogEntryEvent) => { - const expectedLogEvent = expectedLogEvents.shift(); - expect(event).to.be.deep.equal(expectedLogEvent); - if (_.isEmpty(expectedLogEvents)) { - done(); - } - }, - ); - eventWatcher.subscribe(callback); - }); -}); -- cgit v1.2.3 From a874cd2424b5a42aed7bcf287dbd4314237e3522 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 5 Jul 2018 16:42:01 +0200 Subject: Remove legacy logic --- .../src/order_watcher/event_watcher.ts | 35 ---------------------- 1 file changed, 35 deletions(-) diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index e14280421..9a4089384 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -52,15 +52,6 @@ export class EventWatcher { throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } this._startBlockAndLogStream(callback); - // TODO: IS the above the correct refactor of this? - // this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - // this._pollForBlockchainEventsAsync.bind(this, callback), - // this._pollingIntervalMs, - // (err: Error) => { - // this.unsubscribe(); - // callback(err); - // }, - // ); } public unsubscribe(): void { if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { @@ -113,32 +104,6 @@ export class EventWatcher { await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block); } } - // private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise { - // const pendingEvents = await this._getEventsAsync(); - // if (_.isUndefined(pendingEvents)) { - // // HACK: This should never happen, but happens frequently on CI due to a ganache bug - // return; - // } - // if (pendingEvents.length === 0) { - // // HACK: Sometimes when node rebuilds the pending block we get back the empty result. - // // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, - // // that's why we just ignore those cases. - // return; - // } - // const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); - // const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); - // await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); - // await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); - // this._lastEvents = pendingEvents; - // } - // private async _getEventsAsync(): Promise { - // const eventFilter = { - // fromBlock: this._stateLayer, - // toBlock: this._stateLayer, - // }; - // const events = await this._web3Wrapper.getLogsAsync(eventFilter); - // return events; - // } private async _emitDifferencesAsync( log: LogEntry, logEventState: LogEventState, -- cgit v1.2.3 From 0df36471b7f6dd53c4aa3c1d34b88e904b15c35f Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 5 Jul 2018 22:24:21 +0200 Subject: Pass actual networkId --- packages/order-watcher/test/order_watcher_test.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/order-watcher/test/order_watcher_test.ts b/packages/order-watcher/test/order_watcher_test.ts index f8175e103..2889051bc 100644 --- a/packages/order-watcher/test/order_watcher_test.ts +++ b/packages/order-watcher/test/order_watcher_test.ts @@ -46,16 +46,15 @@ describe('OrderWatcher', () => { let taker: string; let signedOrder: SignedOrder; let orderWatcher: OrderWatcher; - const config = { - networkId: constants.TESTRPC_NETWORK_ID, - }; const decimals = constants.ZRX_DECIMALS; const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); before(async () => { - contractWrappers = new ContractWrappers(provider, config); - // tslint:disable-next-line:no-unused-variable const networkId = await web3Wrapper.getNetworkIdAsync(); - orderWatcher = new OrderWatcher(provider, constants.TESTRPC_NETWORK_ID); + const config = { + networkId, + }; + contractWrappers = new ContractWrappers(provider, config); + orderWatcher = new OrderWatcher(provider, networkId); exchangeContractAddress = contractWrappers.exchange.getContractAddress(); userAddresses = await web3Wrapper.getAvailableAddressesAsync(); [, maker, taker] = userAddresses; -- cgit v1.2.3 From abb38e1bc05889a145ab91177609a11ab469c187 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 5 Jul 2018 22:25:47 +0200 Subject: Don't unsubscribe on Blockstream errors --- packages/order-watcher/src/order_watcher/event_watcher.ts | 6 +----- packages/order-watcher/src/order_watcher/order_watcher.ts | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index 9a4089384..0e27cd64b 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -74,7 +74,7 @@ export class EventWatcher { this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval( this._reconcileBlockAsync.bind(this), this._pollingIntervalMs, - this._onReconcileBlockError.bind(this, callback), + EventWatcher._onBlockAndLogStreamerError.bind(this, callback), ); let isRemoved = false; this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded( @@ -85,10 +85,6 @@ export class EventWatcher { this._onLogStateChangedAsync.bind(this, callback, isRemoved), ); } - private _onReconcileBlockError(callback: EventWatcherCallback, err: Error): void { - this.unsubscribe(); - callback(err); - } private async _onLogStateChangedAsync( callback: EventWatcherCallback, isRemoved: boolean, diff --git a/packages/order-watcher/src/order_watcher/order_watcher.ts b/packages/order-watcher/src/order_watcher/order_watcher.ts index 0ee56592e..b28e9bc37 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher.ts @@ -233,7 +233,6 @@ export class OrderWatcher { if (!_.isNull(err)) { if (!_.isUndefined(this._callbackIfExists)) { this._callbackIfExists(err); - this.unsubscribe(); } return; } -- cgit v1.2.3 From ade2f96ca34b1e625495ff042d3dc53d321cc1aa Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 5 Jul 2018 22:32:25 +0200 Subject: Add changelog notes --- packages/order-watcher/CHANGELOG.json | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/order-watcher/CHANGELOG.json b/packages/order-watcher/CHANGELOG.json index 00f6ad1b9..7a4b753a0 100644 --- a/packages/order-watcher/CHANGELOG.json +++ b/packages/order-watcher/CHANGELOG.json @@ -3,7 +3,10 @@ "version": "0.0.7", "changes": [ { - "note": "Dependencies updated" + "note": "Switch out simple getLogs polling with ethereumjs-blockstream" + }, + { + "note": "Do not stop subscription if error is encountered" } ] }, -- cgit v1.2.3 From cfbb1c440ee6659fc4790b71929770bc7e54ea69 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 5 Jul 2018 22:33:11 +0200 Subject: Add pr number --- packages/order-watcher/CHANGELOG.json | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/order-watcher/CHANGELOG.json b/packages/order-watcher/CHANGELOG.json index 7a4b753a0..6104fc81c 100644 --- a/packages/order-watcher/CHANGELOG.json +++ b/packages/order-watcher/CHANGELOG.json @@ -3,10 +3,12 @@ "version": "0.0.7", "changes": [ { - "note": "Switch out simple getLogs polling with ethereumjs-blockstream" + "note": "Switch out simple getLogs polling with ethereumjs-blockstream", + "pr": 825 }, { - "note": "Do not stop subscription if error is encountered" + "note": "Do not stop subscription if error is encountered", + "pr": 825 } ] }, -- cgit v1.2.3 From 32ad34d2241c8e23002a4a7fc267a8a2e92fb304 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 5 Jul 2018 23:18:55 +0200 Subject: properly stop blockstream and pass stateLayer into blockstream --- .../src/order_watcher/event_watcher.ts | 25 ++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index 0e27cd64b..d439d9e5b 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -54,19 +54,22 @@ export class EventWatcher { this._startBlockAndLogStream(callback); } public unsubscribe(): void { - if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { - intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists); - delete this._blockAndLogStreamIntervalIfExists; - delete this._blockAndLogStreamerIfExists; + if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + throw new Error(OrderWatcherError.SubscriptionNotFound); } + this._stopBlockAndLogStream(); } private _startBlockAndLogStream(callback: EventWatcherCallback): void { if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } + const eventFilter = { + fromBlock: this._stateLayer, + toBlock: this._stateLayer, + }; this._blockAndLogStreamerIfExists = new BlockAndLogStreamer( - this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper), - this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper), + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter), EventWatcher._onBlockAndLogStreamerError.bind(this, callback), ); const catchAllLogFilter = {}; @@ -85,6 +88,16 @@ export class EventWatcher { this._onLogStateChangedAsync.bind(this, callback, isRemoved), ); } + private _stopBlockAndLogStream(): void { + if (_.isUndefined(this._blockAndLogStreamerIfExists)) { + throw new Error(OrderWatcherError.SubscriptionNotFound); + } + this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string); + this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string); + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer); + delete this._blockAndLogStreamerIfExists; + delete this._blockAndLogStreamIntervalIfExists; + } private async _onLogStateChangedAsync( callback: EventWatcherCallback, isRemoved: boolean, -- cgit v1.2.3 From 1e0fa776c1b3a1c471c3ce12ced4a29f6d6a40dd Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Fri, 6 Jul 2018 12:34:03 +0200 Subject: Add isVerbose flag and log blockstream recoverable errors rather then bubbling them up --- .../src/order_watcher/event_watcher.ts | 21 +++++++++++++-------- .../src/order_watcher/order_watcher.ts | 3 ++- packages/order-watcher/src/types.ts | 3 ++- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index d439d9e5b..08ecf81cb 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -1,5 +1,5 @@ import { BlockParamLiteral, LogEntry } from '@0xproject/types'; -import { intervalUtils } from '@0xproject/utils'; +import { intervalUtils, logUtils } from '@0xproject/utils'; import { Web3Wrapper } from '@0xproject/web3-wrapper'; import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream'; import * as _ from 'lodash'; @@ -26,16 +26,14 @@ export class EventWatcher { private _onLogRemovedSubscriptionToken: string | undefined; private _pollingIntervalMs: number; private _stateLayer: BlockParamLiteral; - private static _onBlockAndLogStreamerError(callback: EventWatcherCallback, err: Error): void { - // Propogate all Blockstream subscriber errors to - // top-level subscription - callback(err); - } + private _isVerbose: boolean; constructor( web3Wrapper: Web3Wrapper, pollingIntervalIfExistsMs: undefined | number, stateLayer: BlockParamLiteral = BlockParamLiteral.Latest, + isVerbose: boolean, ) { + this._isVerbose = isVerbose; this._web3Wrapper = web3Wrapper; this._stateLayer = stateLayer; this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs) @@ -70,14 +68,14 @@ export class EventWatcher { this._blockAndLogStreamerIfExists = new BlockAndLogStreamer( this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer), this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter), - EventWatcher._onBlockAndLogStreamerError.bind(this, callback), + this._onBlockAndLogStreamerError.bind(this), ); const catchAllLogFilter = {}; this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter); this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval( this._reconcileBlockAsync.bind(this), this._pollingIntervalMs, - EventWatcher._onBlockAndLogStreamerError.bind(this, callback), + this._onBlockAndLogStreamerError.bind(this), ); let isRemoved = false; this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded( @@ -126,4 +124,11 @@ export class EventWatcher { callback(null, logEvent); } } + private _onBlockAndLogStreamerError(err: Error): void { + // Since Blockstream errors are all recoverable, we simply log them if the verbose + // config is passed in. + if (this._isVerbose) { + logUtils.warn(err); + } + } } diff --git a/packages/order-watcher/src/order_watcher/order_watcher.ts b/packages/order-watcher/src/order_watcher/order_watcher.ts index b28e9bc37..a822e9f56 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher.ts @@ -90,7 +90,8 @@ export class OrderWatcher { const pollingIntervalIfExistsMs = _.isUndefined(config) ? undefined : config.eventPollingIntervalMs; const stateLayer = _.isUndefined(config) || _.isUndefined(config.stateLayer) ? BlockParamLiteral.Latest : config.stateLayer; - this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer); + const isVerbose = !_.isUndefined(config) && !_.isUndefined(config.isVerbose) ? config.isVerbose : false; + this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer, isVerbose); this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore( this._contractWrappers.token, stateLayer, diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index f5b189c5a..63e4e7848 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -16,11 +16,12 @@ export type EventWatcherCallback = (err: null | Error, log?: LogEntryEvent) => v * stateLayer: Optional blockchain state layer OrderWatcher will monitor for new events. Default=latest. */ export interface OrderWatcherConfig { + stateLayer: BlockParamLiteral; orderExpirationCheckingIntervalMs?: number; eventPollingIntervalMs?: number; expirationMarginMs?: number; cleanupJobIntervalMs?: number; - stateLayer: BlockParamLiteral; + isVerbose?: boolean; } export type OnOrderStateChangeCallback = (err: Error | null, orderState?: OrderState) => void; -- cgit v1.2.3