aboutsummaryrefslogtreecommitdiffstats
path: root/packages/order-watcher
diff options
context:
space:
mode:
Diffstat (limited to 'packages/order-watcher')
-rw-r--r--packages/order-watcher/CHANGELOG.json7
-rw-r--r--packages/order-watcher/package.json1
-rw-r--r--packages/order-watcher/src/order_watcher/event_watcher.ts125
-rw-r--r--packages/order-watcher/src/order_watcher/order_watcher.ts4
-rw-r--r--packages/order-watcher/src/types.ts3
-rw-r--r--packages/order-watcher/test/event_watcher_test.ts126
-rw-r--r--packages/order-watcher/test/order_watcher_test.ts11
7 files changed, 96 insertions, 181 deletions
diff --git a/packages/order-watcher/CHANGELOG.json b/packages/order-watcher/CHANGELOG.json
index 00f6ad1b9..6104fc81c 100644
--- a/packages/order-watcher/CHANGELOG.json
+++ b/packages/order-watcher/CHANGELOG.json
@@ -3,7 +3,12 @@
"version": "0.0.7",
"changes": [
{
- "note": "Dependencies updated"
+ "note": "Switch out simple getLogs polling with ethereumjs-blockstream",
+ "pr": 825
+ },
+ {
+ "note": "Do not stop subscription if error is encountered",
+ "pr": 825
}
]
},
diff --git a/packages/order-watcher/package.json b/packages/order-watcher/package.json
index 19ddf4358..44748ddf9 100644
--- a/packages/order-watcher/package.json
+++ b/packages/order-watcher/package.json
@@ -87,6 +87,7 @@
"@0xproject/typescript-typings": "^0.4.1",
"@0xproject/utils": "^0.7.2",
"@0xproject/web3-wrapper": "^0.7.1",
+ "ethereumjs-blockstream": "5.0.0",
"ethereum-types": "^0.0.2",
"bintrees": "^1.0.2",
"ethers": "3.0.22",
diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts
index f39d3bf0e..08ecf81cb 100644
--- a/packages/order-watcher/src/order_watcher/event_watcher.ts
+++ b/packages/order-watcher/src/order_watcher/event_watcher.ts
@@ -1,6 +1,7 @@
import { BlockParamLiteral, LogEntry } from '@0xproject/types';
-import { intervalUtils } from '@0xproject/utils';
+import { intervalUtils, logUtils } from '@0xproject/utils';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
+import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
import * as _ from 'lodash';
import { EventWatcherCallback, OrderWatcherError } from '../types';
@@ -19,81 +20,115 @@ enum LogEventState {
*/
export class EventWatcher {
private _web3Wrapper: Web3Wrapper;
+ private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined;
+ private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer;
+ private _onLogAddedSubscriptionToken: string | undefined;
+ private _onLogRemovedSubscriptionToken: string | undefined;
private _pollingIntervalMs: number;
- private _intervalIdIfExists?: NodeJS.Timer;
- private _lastEvents: LogEntry[] = [];
private _stateLayer: BlockParamLiteral;
+ private _isVerbose: boolean;
constructor(
web3Wrapper: Web3Wrapper,
pollingIntervalIfExistsMs: undefined | number,
stateLayer: BlockParamLiteral = BlockParamLiteral.Latest,
+ isVerbose: boolean,
) {
+ this._isVerbose = isVerbose;
this._web3Wrapper = web3Wrapper;
this._stateLayer = stateLayer;
this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs)
? DEFAULT_EVENT_POLLING_INTERVAL_MS
: pollingIntervalIfExistsMs;
+ this._blockAndLogStreamerIfExists = undefined;
+ this._blockAndLogStreamIntervalIfExists = undefined;
+ this._onLogAddedSubscriptionToken = undefined;
+ this._onLogRemovedSubscriptionToken = undefined;
}
public subscribe(callback: EventWatcherCallback): void {
assert.isFunction('callback', callback);
- if (!_.isUndefined(this._intervalIdIfExists)) {
+ if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
- this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
- this._pollForBlockchainEventsAsync.bind(this, callback),
- this._pollingIntervalMs,
- (err: Error) => {
- this.unsubscribe();
- callback(err);
- },
- );
+ this._startBlockAndLogStream(callback);
}
public unsubscribe(): void {
- this._lastEvents = [];
- if (!_.isUndefined(this._intervalIdIfExists)) {
- intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists);
- delete this._intervalIdIfExists;
+ if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionNotFound);
}
+ this._stopBlockAndLogStream();
}
- private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> {
- const pendingEvents = await this._getEventsAsync();
- if (_.isUndefined(pendingEvents)) {
- // HACK: This should never happen, but happens frequently on CI due to a ganache bug
- return;
- }
- if (pendingEvents.length === 0) {
- // HACK: Sometimes when node rebuilds the pending block we get back the empty result.
- // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds,
- // that's why we just ignore those cases.
- return;
+ private _startBlockAndLogStream(callback: EventWatcherCallback): void {
+ if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
- const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify);
- const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify);
- await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback);
- await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback);
- this._lastEvents = pendingEvents;
- }
- private async _getEventsAsync(): Promise<LogEntry[]> {
const eventFilter = {
fromBlock: this._stateLayer,
toBlock: this._stateLayer,
};
- const events = await this._web3Wrapper.getLogsAsync(eventFilter);
- return events;
+ this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter),
+ this._onBlockAndLogStreamerError.bind(this),
+ );
+ const catchAllLogFilter = {};
+ this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter);
+ this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval(
+ this._reconcileBlockAsync.bind(this),
+ this._pollingIntervalMs,
+ this._onBlockAndLogStreamerError.bind(this),
+ );
+ let isRemoved = false;
+ this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
+ this._onLogStateChangedAsync.bind(this, callback, isRemoved),
+ );
+ isRemoved = true;
+ this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved(
+ this._onLogStateChangedAsync.bind(this, callback, isRemoved),
+ );
+ }
+ private _stopBlockAndLogStream(): void {
+ if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionNotFound);
+ }
+ this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
+ this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
+ intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer);
+ delete this._blockAndLogStreamerIfExists;
+ delete this._blockAndLogStreamIntervalIfExists;
+ }
+ private async _onLogStateChangedAsync(
+ callback: EventWatcherCallback,
+ isRemoved: boolean,
+ log: LogEntry,
+ ): Promise<void> {
+ await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback);
+ }
+ private async _reconcileBlockAsync(): Promise<void> {
+ const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest);
+ // We need to coerce to Block type cause Web3.Block includes types for mempool blocks
+ if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
+ await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block);
+ }
}
private async _emitDifferencesAsync(
- logs: LogEntry[],
+ log: LogEntry,
logEventState: LogEventState,
callback: EventWatcherCallback,
): Promise<void> {
- for (const log of logs) {
- const logEvent = {
- removed: logEventState === LogEventState.Removed,
- ...log,
- };
- if (!_.isUndefined(this._intervalIdIfExists)) {
- callback(null, logEvent);
- }
+ const logEvent = {
+ removed: logEventState === LogEventState.Removed,
+ ...log,
+ };
+ if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
+ callback(null, logEvent);
+ }
+ }
+ private _onBlockAndLogStreamerError(err: Error): void {
+ // Since Blockstream errors are all recoverable, we simply log them if the verbose
+ // config is passed in.
+ if (this._isVerbose) {
+ logUtils.warn(err);
}
}
}
diff --git a/packages/order-watcher/src/order_watcher/order_watcher.ts b/packages/order-watcher/src/order_watcher/order_watcher.ts
index cac3a0923..999c1c79e 100644
--- a/packages/order-watcher/src/order_watcher/order_watcher.ts
+++ b/packages/order-watcher/src/order_watcher/order_watcher.ts
@@ -93,7 +93,8 @@ export class OrderWatcher {
const pollingIntervalIfExistsMs = _.isUndefined(config) ? undefined : config.eventPollingIntervalMs;
const stateLayer =
_.isUndefined(config) || _.isUndefined(config.stateLayer) ? BlockParamLiteral.Latest : config.stateLayer;
- this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer);
+ const isVerbose = !_.isUndefined(config) && !_.isUndefined(config.isVerbose) ? config.isVerbose : false;
+ this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer, isVerbose);
this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore(
this._contractWrappers.token,
stateLayer,
@@ -236,7 +237,6 @@ export class OrderWatcher {
if (!_.isNull(err)) {
if (!_.isUndefined(this._callbackIfExists)) {
this._callbackIfExists(err);
- this.unsubscribe();
}
return;
}
diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts
index f5b189c5a..63e4e7848 100644
--- a/packages/order-watcher/src/types.ts
+++ b/packages/order-watcher/src/types.ts
@@ -16,11 +16,12 @@ export type EventWatcherCallback = (err: null | Error, log?: LogEntryEvent) => v
* stateLayer: Optional blockchain state layer OrderWatcher will monitor for new events. Default=latest.
*/
export interface OrderWatcherConfig {
+ stateLayer: BlockParamLiteral;
orderExpirationCheckingIntervalMs?: number;
eventPollingIntervalMs?: number;
expirationMarginMs?: number;
cleanupJobIntervalMs?: number;
- stateLayer: BlockParamLiteral;
+ isVerbose?: boolean;
}
export type OnOrderStateChangeCallback = (err: Error | null, orderState?: OrderState) => void;
diff --git a/packages/order-watcher/test/event_watcher_test.ts b/packages/order-watcher/test/event_watcher_test.ts
deleted file mode 100644
index 9f4ac053f..000000000
--- a/packages/order-watcher/test/event_watcher_test.ts
+++ /dev/null
@@ -1,126 +0,0 @@
-import { callbackErrorReporter } from '@0xproject/dev-utils';
-import { DoneCallback, LogEntry, LogEntryEvent } from '@0xproject/types';
-import { Web3Wrapper } from '@0xproject/web3-wrapper';
-import * as chai from 'chai';
-import * as _ from 'lodash';
-import 'mocha';
-import * as Sinon from 'sinon';
-
-import { EventWatcher } from '../src/order_watcher/event_watcher';
-
-import { chaiSetup } from './utils/chai_setup';
-import { provider } from './utils/web3_wrapper';
-
-chaiSetup.configure();
-const expect = chai.expect;
-
-describe('EventWatcher', () => {
- let stubs: Sinon.SinonStub[] = [];
- let eventWatcher: EventWatcher;
- let web3Wrapper: Web3Wrapper;
- const logA: LogEntry = {
- address: '0x71d271f8b14adef568f8f28f1587ce7271ac4ca5',
- blockHash: null,
- blockNumber: null,
- data: '',
- logIndex: null,
- topics: [],
- transactionHash: '0x004881d38cd4a8f72f1a0d68c8b9b8124504706041ff37019c1d1ed6bfda8e17',
- transactionIndex: 0,
- };
- const logB: LogEntry = {
- address: '0x8d12a197cb00d4747a1fe03395095ce2a5cc6819',
- blockHash: null,
- blockNumber: null,
- data: '',
- logIndex: null,
- topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'],
- transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25',
- transactionIndex: 0,
- };
- const logC: LogEntry = {
- address: '0x1d271f8b174adef58f1587ce68f8f27271ac4ca5',
- blockHash: null,
- blockNumber: null,
- data: '',
- logIndex: null,
- topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'],
- transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25',
- transactionIndex: 0,
- };
- before(async () => {
- const pollingIntervalMs = 10;
- web3Wrapper = new Web3Wrapper(provider);
- eventWatcher = new EventWatcher(web3Wrapper, pollingIntervalMs);
- });
- afterEach(() => {
- // clean up any stubs after the test has completed
- _.each(stubs, s => s.restore());
- stubs = [];
- eventWatcher.unsubscribe();
- });
- it('correctly emits initial log events', (done: DoneCallback) => {
- const logs: LogEntry[] = [logA, logB];
- const expectedLogEvents = [
- {
- removed: false,
- ...logA,
- },
- {
- removed: false,
- ...logB,
- },
- ];
- const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync');
- getLogsStub.onCall(0).returns(logs);
- stubs.push(getLogsStub);
- const expectedToBeCalledOnce = false;
- const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)(
- (event: LogEntryEvent) => {
- const expectedLogEvent = expectedLogEvents.shift();
- expect(event).to.be.deep.equal(expectedLogEvent);
- if (_.isEmpty(expectedLogEvents)) {
- done();
- }
- },
- );
- eventWatcher.subscribe(callback);
- });
- it('correctly computes the difference and emits only changes', (done: DoneCallback) => {
- const initialLogs: LogEntry[] = [logA, logB];
- const changedLogs: LogEntry[] = [logA, logC];
- const expectedLogEvents = [
- {
- removed: false,
- ...logA,
- },
- {
- removed: false,
- ...logB,
- },
- {
- removed: true,
- ...logB,
- },
- {
- removed: false,
- ...logC,
- },
- ];
- const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync');
- getLogsStub.onCall(0).returns(initialLogs);
- getLogsStub.onCall(1).returns(changedLogs);
- stubs.push(getLogsStub);
- const expectedToBeCalledOnce = false;
- const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)(
- (event: LogEntryEvent) => {
- const expectedLogEvent = expectedLogEvents.shift();
- expect(event).to.be.deep.equal(expectedLogEvent);
- if (_.isEmpty(expectedLogEvents)) {
- done();
- }
- },
- );
- eventWatcher.subscribe(callback);
- });
-});
diff --git a/packages/order-watcher/test/order_watcher_test.ts b/packages/order-watcher/test/order_watcher_test.ts
index f8175e103..2889051bc 100644
--- a/packages/order-watcher/test/order_watcher_test.ts
+++ b/packages/order-watcher/test/order_watcher_test.ts
@@ -46,16 +46,15 @@ describe('OrderWatcher', () => {
let taker: string;
let signedOrder: SignedOrder;
let orderWatcher: OrderWatcher;
- const config = {
- networkId: constants.TESTRPC_NETWORK_ID,
- };
const decimals = constants.ZRX_DECIMALS;
const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals);
before(async () => {
- contractWrappers = new ContractWrappers(provider, config);
- // tslint:disable-next-line:no-unused-variable
const networkId = await web3Wrapper.getNetworkIdAsync();
- orderWatcher = new OrderWatcher(provider, constants.TESTRPC_NETWORK_ID);
+ const config = {
+ networkId,
+ };
+ contractWrappers = new ContractWrappers(provider, config);
+ orderWatcher = new OrderWatcher(provider, networkId);
exchangeContractAddress = contractWrappers.exchange.getContractAddress();
userAddresses = await web3Wrapper.getAvailableAddressesAsync();
[, maker, taker] = userAddresses;