diff options
Diffstat (limited to 'packages/order-watcher')
-rw-r--r-- | packages/order-watcher/CHANGELOG.json | 7 | ||||
-rw-r--r-- | packages/order-watcher/package.json | 1 | ||||
-rw-r--r-- | packages/order-watcher/src/order_watcher/event_watcher.ts | 125 | ||||
-rw-r--r-- | packages/order-watcher/src/order_watcher/order_watcher.ts | 4 | ||||
-rw-r--r-- | packages/order-watcher/src/types.ts | 3 | ||||
-rw-r--r-- | packages/order-watcher/test/event_watcher_test.ts | 126 | ||||
-rw-r--r-- | packages/order-watcher/test/order_watcher_test.ts | 11 |
7 files changed, 96 insertions, 181 deletions
diff --git a/packages/order-watcher/CHANGELOG.json b/packages/order-watcher/CHANGELOG.json index 00f6ad1b9..6104fc81c 100644 --- a/packages/order-watcher/CHANGELOG.json +++ b/packages/order-watcher/CHANGELOG.json @@ -3,7 +3,12 @@ "version": "0.0.7", "changes": [ { - "note": "Dependencies updated" + "note": "Switch out simple getLogs polling with ethereumjs-blockstream", + "pr": 825 + }, + { + "note": "Do not stop subscription if error is encountered", + "pr": 825 } ] }, diff --git a/packages/order-watcher/package.json b/packages/order-watcher/package.json index 19ddf4358..44748ddf9 100644 --- a/packages/order-watcher/package.json +++ b/packages/order-watcher/package.json @@ -87,6 +87,7 @@ "@0xproject/typescript-typings": "^0.4.1", "@0xproject/utils": "^0.7.2", "@0xproject/web3-wrapper": "^0.7.1", + "ethereumjs-blockstream": "5.0.0", "ethereum-types": "^0.0.2", "bintrees": "^1.0.2", "ethers": "3.0.22", diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts index f39d3bf0e..08ecf81cb 100644 --- a/packages/order-watcher/src/order_watcher/event_watcher.ts +++ b/packages/order-watcher/src/order_watcher/event_watcher.ts @@ -1,6 +1,7 @@ import { BlockParamLiteral, LogEntry } from '@0xproject/types'; -import { intervalUtils } from '@0xproject/utils'; +import { intervalUtils, logUtils } from '@0xproject/utils'; import { Web3Wrapper } from '@0xproject/web3-wrapper'; +import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream'; import * as _ from 'lodash'; import { EventWatcherCallback, OrderWatcherError } from '../types'; @@ -19,81 +20,115 @@ enum LogEventState { */ export class EventWatcher { private _web3Wrapper: Web3Wrapper; + private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined; + private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer; + private _onLogAddedSubscriptionToken: string | undefined; + private _onLogRemovedSubscriptionToken: string | undefined; private _pollingIntervalMs: number; - private _intervalIdIfExists?: NodeJS.Timer; - private _lastEvents: LogEntry[] = []; private _stateLayer: BlockParamLiteral; + private _isVerbose: boolean; constructor( web3Wrapper: Web3Wrapper, pollingIntervalIfExistsMs: undefined | number, stateLayer: BlockParamLiteral = BlockParamLiteral.Latest, + isVerbose: boolean, ) { + this._isVerbose = isVerbose; this._web3Wrapper = web3Wrapper; this._stateLayer = stateLayer; this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs) ? DEFAULT_EVENT_POLLING_INTERVAL_MS : pollingIntervalIfExistsMs; + this._blockAndLogStreamerIfExists = undefined; + this._blockAndLogStreamIntervalIfExists = undefined; + this._onLogAddedSubscriptionToken = undefined; + this._onLogRemovedSubscriptionToken = undefined; } public subscribe(callback: EventWatcherCallback): void { assert.isFunction('callback', callback); - if (!_.isUndefined(this._intervalIdIfExists)) { + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - this._pollForBlockchainEventsAsync.bind(this, callback), - this._pollingIntervalMs, - (err: Error) => { - this.unsubscribe(); - callback(err); - }, - ); + this._startBlockAndLogStream(callback); } public unsubscribe(): void { - this._lastEvents = []; - if (!_.isUndefined(this._intervalIdIfExists)) { - intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists); - delete this._intervalIdIfExists; + if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + throw new Error(OrderWatcherError.SubscriptionNotFound); } + this._stopBlockAndLogStream(); } - private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> { - const pendingEvents = await this._getEventsAsync(); - if (_.isUndefined(pendingEvents)) { - // HACK: This should never happen, but happens frequently on CI due to a ganache bug - return; - } - if (pendingEvents.length === 0) { - // HACK: Sometimes when node rebuilds the pending block we get back the empty result. - // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, - // that's why we just ignore those cases. - return; + private _startBlockAndLogStream(callback: EventWatcherCallback): void { + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + throw new Error(OrderWatcherError.SubscriptionAlreadyPresent); } - const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); - const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); - await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); - await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); - this._lastEvents = pendingEvents; - } - private async _getEventsAsync(): Promise<LogEntry[]> { const eventFilter = { fromBlock: this._stateLayer, toBlock: this._stateLayer, }; - const events = await this._web3Wrapper.getLogsAsync(eventFilter); - return events; + this._blockAndLogStreamerIfExists = new BlockAndLogStreamer( + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter), + this._onBlockAndLogStreamerError.bind(this), + ); + const catchAllLogFilter = {}; + this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter); + this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval( + this._reconcileBlockAsync.bind(this), + this._pollingIntervalMs, + this._onBlockAndLogStreamerError.bind(this), + ); + let isRemoved = false; + this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); + isRemoved = true; + this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved( + this._onLogStateChangedAsync.bind(this, callback, isRemoved), + ); + } + private _stopBlockAndLogStream(): void { + if (_.isUndefined(this._blockAndLogStreamerIfExists)) { + throw new Error(OrderWatcherError.SubscriptionNotFound); + } + this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string); + this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string); + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer); + delete this._blockAndLogStreamerIfExists; + delete this._blockAndLogStreamIntervalIfExists; + } + private async _onLogStateChangedAsync( + callback: EventWatcherCallback, + isRemoved: boolean, + log: LogEntry, + ): Promise<void> { + await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback); + } + private async _reconcileBlockAsync(): Promise<void> { + const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest); + // We need to coerce to Block type cause Web3.Block includes types for mempool blocks + if (!_.isUndefined(this._blockAndLogStreamerIfExists)) { + // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined + await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block); + } } private async _emitDifferencesAsync( - logs: LogEntry[], + log: LogEntry, logEventState: LogEventState, callback: EventWatcherCallback, ): Promise<void> { - for (const log of logs) { - const logEvent = { - removed: logEventState === LogEventState.Removed, - ...log, - }; - if (!_.isUndefined(this._intervalIdIfExists)) { - callback(null, logEvent); - } + const logEvent = { + removed: logEventState === LogEventState.Removed, + ...log, + }; + if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) { + callback(null, logEvent); + } + } + private _onBlockAndLogStreamerError(err: Error): void { + // Since Blockstream errors are all recoverable, we simply log them if the verbose + // config is passed in. + if (this._isVerbose) { + logUtils.warn(err); } } } diff --git a/packages/order-watcher/src/order_watcher/order_watcher.ts b/packages/order-watcher/src/order_watcher/order_watcher.ts index cac3a0923..999c1c79e 100644 --- a/packages/order-watcher/src/order_watcher/order_watcher.ts +++ b/packages/order-watcher/src/order_watcher/order_watcher.ts @@ -93,7 +93,8 @@ export class OrderWatcher { const pollingIntervalIfExistsMs = _.isUndefined(config) ? undefined : config.eventPollingIntervalMs; const stateLayer = _.isUndefined(config) || _.isUndefined(config.stateLayer) ? BlockParamLiteral.Latest : config.stateLayer; - this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer); + const isVerbose = !_.isUndefined(config) && !_.isUndefined(config.isVerbose) ? config.isVerbose : false; + this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer, isVerbose); this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore( this._contractWrappers.token, stateLayer, @@ -236,7 +237,6 @@ export class OrderWatcher { if (!_.isNull(err)) { if (!_.isUndefined(this._callbackIfExists)) { this._callbackIfExists(err); - this.unsubscribe(); } return; } diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts index f5b189c5a..63e4e7848 100644 --- a/packages/order-watcher/src/types.ts +++ b/packages/order-watcher/src/types.ts @@ -16,11 +16,12 @@ export type EventWatcherCallback = (err: null | Error, log?: LogEntryEvent) => v * stateLayer: Optional blockchain state layer OrderWatcher will monitor for new events. Default=latest. */ export interface OrderWatcherConfig { + stateLayer: BlockParamLiteral; orderExpirationCheckingIntervalMs?: number; eventPollingIntervalMs?: number; expirationMarginMs?: number; cleanupJobIntervalMs?: number; - stateLayer: BlockParamLiteral; + isVerbose?: boolean; } export type OnOrderStateChangeCallback = (err: Error | null, orderState?: OrderState) => void; diff --git a/packages/order-watcher/test/event_watcher_test.ts b/packages/order-watcher/test/event_watcher_test.ts deleted file mode 100644 index 9f4ac053f..000000000 --- a/packages/order-watcher/test/event_watcher_test.ts +++ /dev/null @@ -1,126 +0,0 @@ -import { callbackErrorReporter } from '@0xproject/dev-utils'; -import { DoneCallback, LogEntry, LogEntryEvent } from '@0xproject/types'; -import { Web3Wrapper } from '@0xproject/web3-wrapper'; -import * as chai from 'chai'; -import * as _ from 'lodash'; -import 'mocha'; -import * as Sinon from 'sinon'; - -import { EventWatcher } from '../src/order_watcher/event_watcher'; - -import { chaiSetup } from './utils/chai_setup'; -import { provider } from './utils/web3_wrapper'; - -chaiSetup.configure(); -const expect = chai.expect; - -describe('EventWatcher', () => { - let stubs: Sinon.SinonStub[] = []; - let eventWatcher: EventWatcher; - let web3Wrapper: Web3Wrapper; - const logA: LogEntry = { - address: '0x71d271f8b14adef568f8f28f1587ce7271ac4ca5', - blockHash: null, - blockNumber: null, - data: '', - logIndex: null, - topics: [], - transactionHash: '0x004881d38cd4a8f72f1a0d68c8b9b8124504706041ff37019c1d1ed6bfda8e17', - transactionIndex: 0, - }; - const logB: LogEntry = { - address: '0x8d12a197cb00d4747a1fe03395095ce2a5cc6819', - blockHash: null, - blockNumber: null, - data: '', - logIndex: null, - topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'], - transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25', - transactionIndex: 0, - }; - const logC: LogEntry = { - address: '0x1d271f8b174adef58f1587ce68f8f27271ac4ca5', - blockHash: null, - blockNumber: null, - data: '', - logIndex: null, - topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'], - transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25', - transactionIndex: 0, - }; - before(async () => { - const pollingIntervalMs = 10; - web3Wrapper = new Web3Wrapper(provider); - eventWatcher = new EventWatcher(web3Wrapper, pollingIntervalMs); - }); - afterEach(() => { - // clean up any stubs after the test has completed - _.each(stubs, s => s.restore()); - stubs = []; - eventWatcher.unsubscribe(); - }); - it('correctly emits initial log events', (done: DoneCallback) => { - const logs: LogEntry[] = [logA, logB]; - const expectedLogEvents = [ - { - removed: false, - ...logA, - }, - { - removed: false, - ...logB, - }, - ]; - const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync'); - getLogsStub.onCall(0).returns(logs); - stubs.push(getLogsStub); - const expectedToBeCalledOnce = false; - const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)( - (event: LogEntryEvent) => { - const expectedLogEvent = expectedLogEvents.shift(); - expect(event).to.be.deep.equal(expectedLogEvent); - if (_.isEmpty(expectedLogEvents)) { - done(); - } - }, - ); - eventWatcher.subscribe(callback); - }); - it('correctly computes the difference and emits only changes', (done: DoneCallback) => { - const initialLogs: LogEntry[] = [logA, logB]; - const changedLogs: LogEntry[] = [logA, logC]; - const expectedLogEvents = [ - { - removed: false, - ...logA, - }, - { - removed: false, - ...logB, - }, - { - removed: true, - ...logB, - }, - { - removed: false, - ...logC, - }, - ]; - const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync'); - getLogsStub.onCall(0).returns(initialLogs); - getLogsStub.onCall(1).returns(changedLogs); - stubs.push(getLogsStub); - const expectedToBeCalledOnce = false; - const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)( - (event: LogEntryEvent) => { - const expectedLogEvent = expectedLogEvents.shift(); - expect(event).to.be.deep.equal(expectedLogEvent); - if (_.isEmpty(expectedLogEvents)) { - done(); - } - }, - ); - eventWatcher.subscribe(callback); - }); -}); diff --git a/packages/order-watcher/test/order_watcher_test.ts b/packages/order-watcher/test/order_watcher_test.ts index f8175e103..2889051bc 100644 --- a/packages/order-watcher/test/order_watcher_test.ts +++ b/packages/order-watcher/test/order_watcher_test.ts @@ -46,16 +46,15 @@ describe('OrderWatcher', () => { let taker: string; let signedOrder: SignedOrder; let orderWatcher: OrderWatcher; - const config = { - networkId: constants.TESTRPC_NETWORK_ID, - }; const decimals = constants.ZRX_DECIMALS; const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals); before(async () => { - contractWrappers = new ContractWrappers(provider, config); - // tslint:disable-next-line:no-unused-variable const networkId = await web3Wrapper.getNetworkIdAsync(); - orderWatcher = new OrderWatcher(provider, constants.TESTRPC_NETWORK_ID); + const config = { + networkId, + }; + contractWrappers = new ContractWrappers(provider, config); + orderWatcher = new OrderWatcher(provider, networkId); exchangeContractAddress = contractWrappers.exchange.getContractAddress(); userAddresses = await web3Wrapper.getAvailableAddressesAsync(); [, maker, taker] = userAddresses; |