aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFabio Berger <me@fabioberger.com>2018-07-17 19:47:23 +0800
committerGitHub <noreply@github.com>2018-07-17 19:47:23 +0800
commit15e92958d6e908fde7dabb92aa596adfaeb0eece (patch)
tree82be007e54f5ab46026ca07dfb74e8610f16313d
parenta9038f2afc974cecf567a9aef50267d29a995e02 (diff)
parentf56a7d0cb2bbe8ba9b6c4fb18151ff068165793b (diff)
downloaddexon-sol-tools-15e92958d6e908fde7dabb92aa596adfaeb0eece.tar
dexon-sol-tools-15e92958d6e908fde7dabb92aa596adfaeb0eece.tar.gz
dexon-sol-tools-15e92958d6e908fde7dabb92aa596adfaeb0eece.tar.bz2
dexon-sol-tools-15e92958d6e908fde7dabb92aa596adfaeb0eece.tar.lz
dexon-sol-tools-15e92958d6e908fde7dabb92aa596adfaeb0eece.tar.xz
dexon-sol-tools-15e92958d6e908fde7dabb92aa596adfaeb0eece.tar.zst
dexon-sol-tools-15e92958d6e908fde7dabb92aa596adfaeb0eece.zip
Merge pull request #875 from 0xProject/fix-order-watcher
OrderWatcher Fixes
-rw-r--r--packages/0x.js/CHANGELOG.json4
-rw-r--r--packages/contract-wrappers/src/contract_wrappers/contract_wrapper.ts36
-rw-r--r--packages/contract-wrappers/src/contract_wrappers/erc20_token_wrapper.ts3
-rw-r--r--packages/contract-wrappers/src/contract_wrappers/erc721_token_wrapper.ts3
-rw-r--r--packages/contract-wrappers/src/contract_wrappers/ether_token_wrapper.ts3
-rw-r--r--packages/contract-wrappers/src/contract_wrappers/exchange_wrapper.ts3
-rw-r--r--packages/contract-wrappers/test/subscription_test.ts38
-rw-r--r--packages/order-watcher/CHANGELOG.json8
-rw-r--r--packages/order-watcher/src/order_watcher/event_watcher.ts12
-rw-r--r--packages/order-watcher/src/order_watcher/order_watcher.ts9
-rw-r--r--packages/order-watcher/src/types.ts4
11 files changed, 50 insertions, 73 deletions
diff --git a/packages/0x.js/CHANGELOG.json b/packages/0x.js/CHANGELOG.json
index 3ec09cd7e..9419311dd 100644
--- a/packages/0x.js/CHANGELOG.json
+++ b/packages/0x.js/CHANGELOG.json
@@ -32,6 +32,10 @@
{
"note": "0x.js exports renamed contract events and event arg types",
"pr": 863
+ },
+ {
+ "note": "Remove stateLayer config from OrderWatcher. It now always operates on the latest block",
+ "pr": 875
}
]
},
diff --git a/packages/contract-wrappers/src/contract_wrappers/contract_wrapper.ts b/packages/contract-wrappers/src/contract_wrappers/contract_wrapper.ts
index b8ca4d29b..daf70253a 100644
--- a/packages/contract-wrappers/src/contract_wrappers/contract_wrapper.ts
+++ b/packages/contract-wrappers/src/contract_wrappers/contract_wrapper.ts
@@ -1,5 +1,5 @@
import { ContractArtifact } from '@0xproject/sol-compiler';
-import { AbiDecoder, intervalUtils } from '@0xproject/utils';
+import { AbiDecoder, intervalUtils, logUtils } from '@0xproject/utils';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
import { BlockParamLiteral, ContractAbi, FilterObject, LogEntry, LogWithDecodedArgs, RawLog } from 'ethereum-types';
import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
@@ -41,6 +41,13 @@ export abstract class ContractWrapper {
};
private _onLogAddedSubscriptionToken: string | undefined;
private _onLogRemovedSubscriptionToken: string | undefined;
+ private static _onBlockAndLogStreamerError(isVerbose: boolean, err: Error): void {
+ // Since Blockstream errors are all recoverable, we simply log them if the verbose
+ // config is passed in.
+ if (isVerbose) {
+ logUtils.warn(err);
+ }
+ }
constructor(web3Wrapper: Web3Wrapper, networkId: number, blockPollingIntervalMs?: number) {
this._web3Wrapper = web3Wrapper;
this._networkId = networkId;
@@ -79,10 +86,11 @@ export abstract class ContractWrapper {
indexFilterValues: IndexedFilterValues,
abi: ContractAbi,
callback: EventCallback<ArgsType>,
+ isVerbose: boolean = false,
): string {
const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi);
if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
- this._startBlockAndLogStream();
+ this._startBlockAndLogStream(isVerbose);
}
const filterToken = filterUtils.generateUUID();
this._filters[filterToken] = filter;
@@ -151,21 +159,21 @@ export abstract class ContractWrapper {
}
});
}
- private _startBlockAndLogStream(): void {
+ private _startBlockAndLogStream(isVerbose: boolean): void {
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
throw new Error(ContractWrappersError.SubscriptionAlreadyPresent);
}
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
- this._onBlockAndLogStreamerError.bind(this),
+ ContractWrapper._onBlockAndLogStreamerError.bind(this, isVerbose),
);
const catchAllLogFilter = {};
this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter);
this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval(
this._reconcileBlockAsync.bind(this),
this._blockPollingIntervalMs,
- this._onReconcileBlockError.bind(this),
+ ContractWrapper._onBlockAndLogStreamerError.bind(this, isVerbose),
);
let isRemoved = false;
this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
@@ -176,20 +184,10 @@ export abstract class ContractWrapper {
this._onLogStateChanged.bind(this, isRemoved),
);
}
- private _onBlockAndLogStreamerError(err: Error): void {
- // Propogate all Blockstream subscriber errors to all
- // top-level subscriptions
- const filterCallbacks = _.values(this._filterCallbacks);
- _.each(filterCallbacks, filterCallback => {
- filterCallback(err);
- });
- }
- private _onReconcileBlockError(err: Error): void {
- const filterTokens = _.keys(this._filterCallbacks);
- _.each(filterTokens, filterToken => {
- this._unsubscribe(filterToken, err);
- });
- }
+ // HACK: This should be a package-scoped method (which doesn't exist in TS)
+ // We don't want this method available in the public interface for all classes
+ // who inherit from ContractWrapper, and it is only used by the internal implementation
+ // of those higher classes.
// tslint:disable-next-line:no-unused-variable
private _setNetworkId(networkId: number): void {
this._networkId = networkId;
diff --git a/packages/contract-wrappers/src/contract_wrappers/erc20_token_wrapper.ts b/packages/contract-wrappers/src/contract_wrappers/erc20_token_wrapper.ts
index 29c63564e..17bda5085 100644
--- a/packages/contract-wrappers/src/contract_wrappers/erc20_token_wrapper.ts
+++ b/packages/contract-wrappers/src/contract_wrappers/erc20_token_wrapper.ts
@@ -347,6 +347,7 @@ export class ERC20TokenWrapper extends ContractWrapper {
* @param indexFilterValues An object where the keys are indexed args returned by the event and
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
* @param callback Callback that gets called when a log is added/removed
+ * @param isVerbose Enable verbose subscription warnings (e.g recoverable network issues encountered)
* @return Subscription token used later to unsubscribe
*/
public subscribe<ArgsType extends ERC20TokenEventArgs>(
@@ -354,6 +355,7 @@ export class ERC20TokenWrapper extends ContractWrapper {
eventName: ERC20TokenEvents,
indexFilterValues: IndexedFilterValues,
callback: EventCallback<ArgsType>,
+ isVerbose: boolean = false,
): string {
assert.isETHAddressHex('tokenAddress', tokenAddress);
assert.doesBelongToStringEnum('eventName', eventName, ERC20TokenEvents);
@@ -366,6 +368,7 @@ export class ERC20TokenWrapper extends ContractWrapper {
indexFilterValues,
artifacts.ERC20Token.compilerOutput.abi,
callback,
+ isVerbose,
);
return subscriptionToken;
}
diff --git a/packages/contract-wrappers/src/contract_wrappers/erc721_token_wrapper.ts b/packages/contract-wrappers/src/contract_wrappers/erc721_token_wrapper.ts
index b7e5519c4..3d24702b3 100644
--- a/packages/contract-wrappers/src/contract_wrappers/erc721_token_wrapper.ts
+++ b/packages/contract-wrappers/src/contract_wrappers/erc721_token_wrapper.ts
@@ -372,6 +372,7 @@ export class ERC721TokenWrapper extends ContractWrapper {
* @param indexFilterValues An object where the keys are indexed args returned by the event and
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
* @param callback Callback that gets called when a log is added/removed
+ * @param isVerbose Enable verbose subscription warnings (e.g recoverable network issues encountered)
* @return Subscription token used later to unsubscribe
*/
public subscribe<ArgsType extends ERC721TokenEventArgs>(
@@ -379,6 +380,7 @@ export class ERC721TokenWrapper extends ContractWrapper {
eventName: ERC721TokenEvents,
indexFilterValues: IndexedFilterValues,
callback: EventCallback<ArgsType>,
+ isVerbose: boolean = false,
): string {
assert.isETHAddressHex('tokenAddress', tokenAddress);
assert.doesBelongToStringEnum('eventName', eventName, ERC721TokenEvents);
@@ -391,6 +393,7 @@ export class ERC721TokenWrapper extends ContractWrapper {
indexFilterValues,
artifacts.ERC721Token.compilerOutput.abi,
callback,
+ isVerbose,
);
return subscriptionToken;
}
diff --git a/packages/contract-wrappers/src/contract_wrappers/ether_token_wrapper.ts b/packages/contract-wrappers/src/contract_wrappers/ether_token_wrapper.ts
index 01440a5e1..5046d3667 100644
--- a/packages/contract-wrappers/src/contract_wrappers/ether_token_wrapper.ts
+++ b/packages/contract-wrappers/src/contract_wrappers/ether_token_wrapper.ts
@@ -146,6 +146,7 @@ export class EtherTokenWrapper extends ContractWrapper {
* @param indexFilterValues An object where the keys are indexed args returned by the event and
* the value is the value you are interested in. E.g `{_owner: aUserAddressHex}`
* @param callback Callback that gets called when a log is added/removed
+ * @param isVerbose Enable verbose subscription warnings (e.g recoverable network issues encountered)
* @return Subscription token used later to unsubscribe
*/
public subscribe<ArgsType extends WETH9EventArgs>(
@@ -153,6 +154,7 @@ export class EtherTokenWrapper extends ContractWrapper {
eventName: WETH9Events,
indexFilterValues: IndexedFilterValues,
callback: EventCallback<ArgsType>,
+ isVerbose: boolean = false,
): string {
assert.isETHAddressHex('etherTokenAddress', etherTokenAddress);
const normalizedEtherTokenAddress = etherTokenAddress.toLowerCase();
@@ -165,6 +167,7 @@ export class EtherTokenWrapper extends ContractWrapper {
indexFilterValues,
artifacts.EtherToken.compilerOutput.abi,
callback,
+ isVerbose,
);
return subscriptionToken;
}
diff --git a/packages/contract-wrappers/src/contract_wrappers/exchange_wrapper.ts b/packages/contract-wrappers/src/contract_wrappers/exchange_wrapper.ts
index 8b7148aed..0791c62f5 100644
--- a/packages/contract-wrappers/src/contract_wrappers/exchange_wrapper.ts
+++ b/packages/contract-wrappers/src/contract_wrappers/exchange_wrapper.ts
@@ -988,12 +988,14 @@ export class ExchangeWrapper extends ContractWrapper {
* @param indexFilterValues An object where the keys are indexed args returned by the event and
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
* @param callback Callback that gets called when a log is added/removed
+ * @param isVerbose Enable verbose subscription warnings (e.g recoverable network issues encountered)
* @return Subscription token used later to unsubscribe
*/
public subscribe<ArgsType extends ExchangeEventArgs>(
eventName: ExchangeEvents,
indexFilterValues: IndexedFilterValues,
callback: EventCallback<ArgsType>,
+ isVerbose: boolean = false,
): string {
assert.doesBelongToStringEnum('eventName', eventName, ExchangeEvents);
assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema);
@@ -1005,6 +1007,7 @@ export class ExchangeWrapper extends ContractWrapper {
indexFilterValues,
artifacts.Exchange.compilerOutput.abi,
callback,
+ isVerbose,
);
return subscriptionToken;
}
diff --git a/packages/contract-wrappers/test/subscription_test.ts b/packages/contract-wrappers/test/subscription_test.ts
index adda4ab78..80d17576f 100644
--- a/packages/contract-wrappers/test/subscription_test.ts
+++ b/packages/contract-wrappers/test/subscription_test.ts
@@ -49,44 +49,6 @@ describe('SubscriptionTest', () => {
_.each(stubs, s => s.restore());
stubs = [];
});
- it('Should receive the Error when an error occurs while fetching the block', (done: DoneCallback) => {
- (async () => {
- const errMsg = 'Error fetching block';
- const callback = callbackErrorReporter.assertNodeCallbackError(done, errMsg);
- stubs = [Sinon.stub((contractWrappers as any)._web3Wrapper, 'getBlockAsync').throws(new Error(errMsg))];
- contractWrappers.erc20Token.subscribe(
- tokenAddress,
- ERC20TokenEvents.Approval,
- indexFilterValues,
- callback,
- );
- await contractWrappers.erc20Token.setAllowanceAsync(
- tokenAddress,
- coinbase,
- addressWithoutFunds,
- allowanceAmount,
- );
- })().catch(done);
- });
- it('Should receive the Error when an error occurs while reconciling the new block', (done: DoneCallback) => {
- (async () => {
- const errMsg = 'Error fetching logs';
- const callback = callbackErrorReporter.assertNodeCallbackError(done, errMsg);
- stubs = [Sinon.stub((contractWrappers as any)._web3Wrapper, 'getLogsAsync').throws(new Error(errMsg))];
- contractWrappers.erc20Token.subscribe(
- tokenAddress,
- ERC20TokenEvents.Approval,
- indexFilterValues,
- callback,
- );
- await contractWrappers.erc20Token.setAllowanceAsync(
- tokenAddress,
- coinbase,
- addressWithoutFunds,
- allowanceAmount,
- );
- })().catch(done);
- });
it('Should allow unsubscribeAll to be called successfully after an error', (done: DoneCallback) => {
(async () => {
const callback = (err: Error | null, _logEvent?: DecodedLogEvent<ERC20TokenApprovalEventArgs>) =>
diff --git a/packages/order-watcher/CHANGELOG.json b/packages/order-watcher/CHANGELOG.json
index a66db6eec..e747a2129 100644
--- a/packages/order-watcher/CHANGELOG.json
+++ b/packages/order-watcher/CHANGELOG.json
@@ -9,6 +9,14 @@
{
"note": "Do not stop subscription if error is encountered",
"pr": 825
+ },
+ {
+ "note": "Fixed a bug that caused the incorrect block to be fetched via JSON-RPC within Blockstream",
+ "pr": 875
+ },
+ {
+ "note": "Remove stateLayer config from OrderWatcher. It now always operates on the latest block",
+ "pr": 875
}
],
"timestamp": 1531149657
diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts
index 08ecf81cb..60a3c6297 100644
--- a/packages/order-watcher/src/order_watcher/event_watcher.ts
+++ b/packages/order-watcher/src/order_watcher/event_watcher.ts
@@ -30,7 +30,7 @@ export class EventWatcher {
constructor(
web3Wrapper: Web3Wrapper,
pollingIntervalIfExistsMs: undefined | number,
- stateLayer: BlockParamLiteral = BlockParamLiteral.Latest,
+ stateLayer: BlockParamLiteral,
isVerbose: boolean,
) {
this._isVerbose = isVerbose;
@@ -61,13 +61,9 @@ export class EventWatcher {
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
- const eventFilter = {
- fromBlock: this._stateLayer,
- toBlock: this._stateLayer,
- };
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
- this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer),
- this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter),
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
this._onBlockAndLogStreamerError.bind(this),
);
const catchAllLogFilter = {};
@@ -104,7 +100,7 @@ export class EventWatcher {
await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback);
}
private async _reconcileBlockAsync(): Promise<void> {
- const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest);
+ const latestBlock = await this._web3Wrapper.getBlockAsync(this._stateLayer);
// We need to coerce to Block type cause Web3.Block includes types for mempool blocks
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
// If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
diff --git a/packages/order-watcher/src/order_watcher/order_watcher.ts b/packages/order-watcher/src/order_watcher/order_watcher.ts
index b09ba8d9d..95671684f 100644
--- a/packages/order-watcher/src/order_watcher/order_watcher.ts
+++ b/packages/order-watcher/src/order_watcher/order_watcher.ts
@@ -61,6 +61,7 @@ interface OrderStateByOrderHash {
// tslint:disable-next-line:custom-no-magic-numbers
const DEFAULT_CLEANUP_JOB_INTERVAL_MS = 1000 * 60 * 60; // 1h
+const STATE_LAYER = BlockParamLiteral.Latest;
/**
* This class includes all the functionality related to watching a set of orders
@@ -91,17 +92,15 @@ export class OrderWatcher {
});
this._contractWrappers = new ContractWrappers(provider, { networkId });
const pollingIntervalIfExistsMs = _.isUndefined(config) ? undefined : config.eventPollingIntervalMs;
- const stateLayer =
- _.isUndefined(config) || _.isUndefined(config.stateLayer) ? BlockParamLiteral.Latest : config.stateLayer;
const isVerbose = !_.isUndefined(config) && !_.isUndefined(config.isVerbose) ? config.isVerbose : false;
- this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer, isVerbose);
+ this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, STATE_LAYER, isVerbose);
this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore(
this._contractWrappers.token,
- stateLayer,
+ STATE_LAYER,
);
this._orderFilledCancelledLazyStore = new OrderFilledCancelledLazyStore(
this._contractWrappers.exchange,
- stateLayer,
+ STATE_LAYER,
);
this._orderStateUtils = new OrderStateUtils(
this._balanceAndProxyAllowanceLazyStore,
diff --git a/packages/order-watcher/src/types.ts b/packages/order-watcher/src/types.ts
index 63e4e7848..fd71267a2 100644
--- a/packages/order-watcher/src/types.ts
+++ b/packages/order-watcher/src/types.ts
@@ -1,4 +1,4 @@
-import { BlockParamLiteral, LogEntryEvent, OrderState } from '@0xproject/types';
+import { LogEntryEvent, OrderState } from '@0xproject/types';
export enum OrderWatcherError {
SubscriptionAlreadyPresent = 'SUBSCRIPTION_ALREADY_PRESENT',
@@ -13,10 +13,8 @@ export type EventWatcherCallback = (err: null | Error, log?: LogEntryEvent) => v
* expirationMarginMs: Amount of time before order expiry that you'd like to be notified
* of an orders expiration. Default=0.
* cleanupJobIntervalMs: How often to run a cleanup job which revalidates all the orders. Default=1hr.
- * stateLayer: Optional blockchain state layer OrderWatcher will monitor for new events. Default=latest.
*/
export interface OrderWatcherConfig {
- stateLayer: BlockParamLiteral;
orderExpirationCheckingIntervalMs?: number;
eventPollingIntervalMs?: number;
expirationMarginMs?: number;