aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--packages/order-watcher/package.json1
-rw-r--r--packages/order-watcher/src/order_watcher/event_watcher.ts150
-rw-r--r--packages/order-watcher/test/event_watcher_test.ts126
3 files changed, 104 insertions, 173 deletions
diff --git a/packages/order-watcher/package.json b/packages/order-watcher/package.json
index 40613ea96..047854c4d 100644
--- a/packages/order-watcher/package.json
+++ b/packages/order-watcher/package.json
@@ -87,6 +87,7 @@
"@0xproject/typescript-typings": "^0.4.1",
"@0xproject/utils": "^0.7.2",
"@0xproject/web3-wrapper": "^0.7.1",
+ "ethereumjs-blockstream": "5.0.0",
"ethereum-types": "^0.0.2",
"bintrees": "^1.0.2",
"ethers": "3.0.22",
diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts
index f39d3bf0e..e14280421 100644
--- a/packages/order-watcher/src/order_watcher/event_watcher.ts
+++ b/packages/order-watcher/src/order_watcher/event_watcher.ts
@@ -1,6 +1,7 @@
import { BlockParamLiteral, LogEntry } from '@0xproject/types';
import { intervalUtils } from '@0xproject/utils';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
+import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
import * as _ from 'lodash';
import { EventWatcherCallback, OrderWatcherError } from '../types';
@@ -19,10 +20,17 @@ enum LogEventState {
*/
export class EventWatcher {
private _web3Wrapper: Web3Wrapper;
+ private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined;
+ private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer;
+ private _onLogAddedSubscriptionToken: string | undefined;
+ private _onLogRemovedSubscriptionToken: string | undefined;
private _pollingIntervalMs: number;
- private _intervalIdIfExists?: NodeJS.Timer;
- private _lastEvents: LogEntry[] = [];
private _stateLayer: BlockParamLiteral;
+ private static _onBlockAndLogStreamerError(callback: EventWatcherCallback, err: Error): void {
+ // Propogate all Blockstream subscriber errors to
+ // top-level subscription
+ callback(err);
+ }
constructor(
web3Wrapper: Web3Wrapper,
pollingIntervalIfExistsMs: undefined | number,
@@ -33,67 +41,115 @@ export class EventWatcher {
this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs)
? DEFAULT_EVENT_POLLING_INTERVAL_MS
: pollingIntervalIfExistsMs;
+ this._blockAndLogStreamerIfExists = undefined;
+ this._blockAndLogStreamIntervalIfExists = undefined;
+ this._onLogAddedSubscriptionToken = undefined;
+ this._onLogRemovedSubscriptionToken = undefined;
}
public subscribe(callback: EventWatcherCallback): void {
assert.isFunction('callback', callback);
- if (!_.isUndefined(this._intervalIdIfExists)) {
+ if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
- this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
- this._pollForBlockchainEventsAsync.bind(this, callback),
- this._pollingIntervalMs,
- (err: Error) => {
- this.unsubscribe();
- callback(err);
- },
- );
+ this._startBlockAndLogStream(callback);
+ // TODO: IS the above the correct refactor of this?
+ // this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
+ // this._pollForBlockchainEventsAsync.bind(this, callback),
+ // this._pollingIntervalMs,
+ // (err: Error) => {
+ // this.unsubscribe();
+ // callback(err);
+ // },
+ // );
}
public unsubscribe(): void {
- this._lastEvents = [];
- if (!_.isUndefined(this._intervalIdIfExists)) {
- intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists);
- delete this._intervalIdIfExists;
+ if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
+ intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists);
+ delete this._blockAndLogStreamIntervalIfExists;
+ delete this._blockAndLogStreamerIfExists;
}
}
- private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> {
- const pendingEvents = await this._getEventsAsync();
- if (_.isUndefined(pendingEvents)) {
- // HACK: This should never happen, but happens frequently on CI due to a ganache bug
- return;
- }
- if (pendingEvents.length === 0) {
- // HACK: Sometimes when node rebuilds the pending block we get back the empty result.
- // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds,
- // that's why we just ignore those cases.
- return;
+ private _startBlockAndLogStream(callback: EventWatcherCallback): void {
+ if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
- const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify);
- const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify);
- await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback);
- await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback);
- this._lastEvents = pendingEvents;
+ this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ EventWatcher._onBlockAndLogStreamerError.bind(this, callback),
+ );
+ const catchAllLogFilter = {};
+ this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter);
+ this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval(
+ this._reconcileBlockAsync.bind(this),
+ this._pollingIntervalMs,
+ this._onReconcileBlockError.bind(this, callback),
+ );
+ let isRemoved = false;
+ this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
+ this._onLogStateChangedAsync.bind(this, callback, isRemoved),
+ );
+ isRemoved = true;
+ this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved(
+ this._onLogStateChangedAsync.bind(this, callback, isRemoved),
+ );
}
- private async _getEventsAsync(): Promise<LogEntry[]> {
- const eventFilter = {
- fromBlock: this._stateLayer,
- toBlock: this._stateLayer,
- };
- const events = await this._web3Wrapper.getLogsAsync(eventFilter);
- return events;
+ private _onReconcileBlockError(callback: EventWatcherCallback, err: Error): void {
+ this.unsubscribe();
+ callback(err);
+ }
+ private async _onLogStateChangedAsync(
+ callback: EventWatcherCallback,
+ isRemoved: boolean,
+ log: LogEntry,
+ ): Promise<void> {
+ await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback);
+ }
+ private async _reconcileBlockAsync(): Promise<void> {
+ const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest);
+ // We need to coerce to Block type cause Web3.Block includes types for mempool blocks
+ if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
+ await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block);
+ }
}
+ // private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> {
+ // const pendingEvents = await this._getEventsAsync();
+ // if (_.isUndefined(pendingEvents)) {
+ // // HACK: This should never happen, but happens frequently on CI due to a ganache bug
+ // return;
+ // }
+ // if (pendingEvents.length === 0) {
+ // // HACK: Sometimes when node rebuilds the pending block we get back the empty result.
+ // // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds,
+ // // that's why we just ignore those cases.
+ // return;
+ // }
+ // const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify);
+ // const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify);
+ // await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback);
+ // await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback);
+ // this._lastEvents = pendingEvents;
+ // }
+ // private async _getEventsAsync(): Promise<LogEntry[]> {
+ // const eventFilter = {
+ // fromBlock: this._stateLayer,
+ // toBlock: this._stateLayer,
+ // };
+ // const events = await this._web3Wrapper.getLogsAsync(eventFilter);
+ // return events;
+ // }
private async _emitDifferencesAsync(
- logs: LogEntry[],
+ log: LogEntry,
logEventState: LogEventState,
callback: EventWatcherCallback,
): Promise<void> {
- for (const log of logs) {
- const logEvent = {
- removed: logEventState === LogEventState.Removed,
- ...log,
- };
- if (!_.isUndefined(this._intervalIdIfExists)) {
- callback(null, logEvent);
- }
+ const logEvent = {
+ removed: logEventState === LogEventState.Removed,
+ ...log,
+ };
+ if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
+ callback(null, logEvent);
}
}
}
diff --git a/packages/order-watcher/test/event_watcher_test.ts b/packages/order-watcher/test/event_watcher_test.ts
deleted file mode 100644
index 9f4ac053f..000000000
--- a/packages/order-watcher/test/event_watcher_test.ts
+++ /dev/null
@@ -1,126 +0,0 @@
-import { callbackErrorReporter } from '@0xproject/dev-utils';
-import { DoneCallback, LogEntry, LogEntryEvent } from '@0xproject/types';
-import { Web3Wrapper } from '@0xproject/web3-wrapper';
-import * as chai from 'chai';
-import * as _ from 'lodash';
-import 'mocha';
-import * as Sinon from 'sinon';
-
-import { EventWatcher } from '../src/order_watcher/event_watcher';
-
-import { chaiSetup } from './utils/chai_setup';
-import { provider } from './utils/web3_wrapper';
-
-chaiSetup.configure();
-const expect = chai.expect;
-
-describe('EventWatcher', () => {
- let stubs: Sinon.SinonStub[] = [];
- let eventWatcher: EventWatcher;
- let web3Wrapper: Web3Wrapper;
- const logA: LogEntry = {
- address: '0x71d271f8b14adef568f8f28f1587ce7271ac4ca5',
- blockHash: null,
- blockNumber: null,
- data: '',
- logIndex: null,
- topics: [],
- transactionHash: '0x004881d38cd4a8f72f1a0d68c8b9b8124504706041ff37019c1d1ed6bfda8e17',
- transactionIndex: 0,
- };
- const logB: LogEntry = {
- address: '0x8d12a197cb00d4747a1fe03395095ce2a5cc6819',
- blockHash: null,
- blockNumber: null,
- data: '',
- logIndex: null,
- topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'],
- transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25',
- transactionIndex: 0,
- };
- const logC: LogEntry = {
- address: '0x1d271f8b174adef58f1587ce68f8f27271ac4ca5',
- blockHash: null,
- blockNumber: null,
- data: '',
- logIndex: null,
- topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'],
- transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25',
- transactionIndex: 0,
- };
- before(async () => {
- const pollingIntervalMs = 10;
- web3Wrapper = new Web3Wrapper(provider);
- eventWatcher = new EventWatcher(web3Wrapper, pollingIntervalMs);
- });
- afterEach(() => {
- // clean up any stubs after the test has completed
- _.each(stubs, s => s.restore());
- stubs = [];
- eventWatcher.unsubscribe();
- });
- it('correctly emits initial log events', (done: DoneCallback) => {
- const logs: LogEntry[] = [logA, logB];
- const expectedLogEvents = [
- {
- removed: false,
- ...logA,
- },
- {
- removed: false,
- ...logB,
- },
- ];
- const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync');
- getLogsStub.onCall(0).returns(logs);
- stubs.push(getLogsStub);
- const expectedToBeCalledOnce = false;
- const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)(
- (event: LogEntryEvent) => {
- const expectedLogEvent = expectedLogEvents.shift();
- expect(event).to.be.deep.equal(expectedLogEvent);
- if (_.isEmpty(expectedLogEvents)) {
- done();
- }
- },
- );
- eventWatcher.subscribe(callback);
- });
- it('correctly computes the difference and emits only changes', (done: DoneCallback) => {
- const initialLogs: LogEntry[] = [logA, logB];
- const changedLogs: LogEntry[] = [logA, logC];
- const expectedLogEvents = [
- {
- removed: false,
- ...logA,
- },
- {
- removed: false,
- ...logB,
- },
- {
- removed: true,
- ...logB,
- },
- {
- removed: false,
- ...logC,
- },
- ];
- const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync');
- getLogsStub.onCall(0).returns(initialLogs);
- getLogsStub.onCall(1).returns(changedLogs);
- stubs.push(getLogsStub);
- const expectedToBeCalledOnce = false;
- const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)(
- (event: LogEntryEvent) => {
- const expectedLogEvent = expectedLogEvents.shift();
- expect(event).to.be.deep.equal(expectedLogEvent);
- if (_.isEmpty(expectedLogEvents)) {
- done();
- }
- },
- );
- eventWatcher.subscribe(callback);
- });
-});