aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLeonid Logvinov <logvinov.leon@gmail.com>2017-10-05 19:34:30 +0800
committerLeonid Logvinov <logvinov.leon@gmail.com>2017-10-05 20:35:37 +0800
commit7dd63523939822203d938511472c84b8ff418aaf (patch)
tree68e75aa486e673fc7faf2dc69e0527844b842e81 /src
parente37a3155cd52d35da3eef9a8dc450b9b3df0b888 (diff)
downloaddexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.gz
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.bz2
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.lz
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.xz
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.zst
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.zip
Implement subscriptions based on ethereumjs-blockstream
Diffstat (limited to 'src')
-rw-r--r--src/contract_wrappers/contract_wrapper.ts122
-rw-r--r--src/contract_wrappers/exchange_wrapper.ts82
-rw-r--r--src/contract_wrappers/token_wrapper.ts60
-rw-r--r--src/index.ts2
-rw-r--r--src/types.ts24
-rw-r--r--src/utils/constants.ts1
-rw-r--r--src/utils/event_utils.ts41
-rw-r--r--src/utils/filter_utils.ts80
8 files changed, 229 insertions, 183 deletions
diff --git a/src/contract_wrappers/contract_wrapper.ts b/src/contract_wrappers/contract_wrapper.ts
index 927a09b52..6f22f5bdb 100644
--- a/src/contract_wrappers/contract_wrapper.ts
+++ b/src/contract_wrappers/contract_wrapper.ts
@@ -1,10 +1,10 @@
import * as _ from 'lodash';
import * as Web3 from 'web3';
-import * as ethUtil from 'ethereumjs-util';
-import {BlockAndLogStreamer} from 'ethereumjs-blockstream';
+import {BlockAndLogStreamer, Block} from 'ethereumjs-blockstream';
import {Web3Wrapper} from '../web3_wrapper';
import {AbiDecoder} from '../utils/abi_decoder';
import {
+ ZeroExError,
InternalZeroExError,
Artifact,
LogWithDecodedArgs,
@@ -12,38 +12,73 @@ import {
ContractEvents,
SubscriptionOpts,
IndexedFilterValues,
+ EventCallback,
} from '../types';
import {utils} from '../utils/utils';
-
-const TOPIC_LENGTH = 32;
+import {constants} from '../utils/constants';
+import {intervalUtils} from '../utils/interval_utils';
+import {filterUtils} from '../utils/filter_utils';
export class ContractWrapper {
protected _web3Wrapper: Web3Wrapper;
private _abiDecoder?: AbiDecoder;
private _blockAndLogStreamer: BlockAndLogStreamer;
+ private _blockAndLogStreamInterval: NodeJS.Timer;
+ private _activeFilters: number;
+ private _filters: {[filterToken: string]: Web3.FilterObject};
+ private _filterCallbacks: {[filterToken: string]: EventCallback};
+ private _onLogAddedSubscriptionToken: string|undefined;
+ private _onLogRemovedSubscriptionToken: string|undefined;
constructor(web3Wrapper: Web3Wrapper, abiDecoder?: AbiDecoder) {
this._web3Wrapper = web3Wrapper;
this._abiDecoder = abiDecoder;
- const getBlockAsync = async (hash: string) => this._web3Wrapper.getBlockAsync(hash);
- this._blockAndLogStreamer = new BlockAndLogStreamer(
- this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
- this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ this._activeFilters = 0;
+ this._filters = {};
+ this._filterCallbacks = {};
+ this._onLogAddedSubscriptionToken = undefined;
+ this._onLogRemovedSubscriptionToken = undefined;
+ }
+ protected _subscribe(address: string, eventName: ContractEvents,
+ indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi,
+ callback: EventCallback): string {
+ const filter = filterUtils.getFilter(
+ this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi,
);
+ if (_.isEmpty(this._filters)) {
+ this._startBlockAndLogStream();
+ let removed = false;
+ this._onLogAddedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogAdded(
+ this._onLogStateChanged.bind(this, removed),
+ );
+ removed = true;
+ this._onLogRemovedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogRemoved(
+ this._onLogStateChanged.bind(this, removed),
+ );
+ }
+ const filterToken = filterUtils.generateUUID();
+ this._filters[filterToken] = filter;
+ this._filterCallbacks[filterToken] = callback;
+ return filterToken;
+ }
+ protected _unsubscribe(filterToken: string): void {
+ if (_.isUndefined(this._filters[filterToken])) {
+ throw new Error(ZeroExError.SubscriptionNotFound);
+ }
+ delete this._filters[filterToken];
+ delete this._filterCallbacks[filterToken];
+ if (_.isEmpty(this._filters)) {
+ this._blockAndLogStreamer.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
+ this._blockAndLogStreamer.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
+ this._stopBlockAndLogStream();
+ }
}
protected async _getLogsAsync(address: string, eventName: ContractEvents, subscriptionOpts: SubscriptionOpts,
indexFilterValues: IndexedFilterValues,
abi: Web3.ContractAbi): Promise<LogWithDecodedArgs[]> {
- const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi;
- const eventSignature = this._getEventSignatureFromAbiByName(eventAbi, eventName);
- const topicForEventSignature = this._web3Wrapper.keccak256(eventSignature);
- const topicsForIndexedArgs = this._getTopicsForIndexedArgs(eventAbi, indexFilterValues);
- const topics = [topicForEventSignature, ...topicsForIndexedArgs];
- const filter = {
- fromBlock: subscriptionOpts.fromBlock,
- toBlock: subscriptionOpts.toBlock,
- address,
- topics,
- };
+ const filter = filterUtils.getFilter(
+ this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi,
+ subscriptionOpts,
+ );
const logs = await this._web3Wrapper.getLogsAsync(filter);
const logsWithDecodedArguments = _.map(logs, this._tryToDecodeLogOrNoop.bind(this));
return logsWithDecodedArguments;
@@ -62,27 +97,34 @@ export class ContractWrapper {
await this._web3Wrapper.getContractInstanceFromArtifactAsync<A>(artifact, addressIfExists);
return contractInstance;
}
- protected _getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string {
- const types = _.map(eventAbi.inputs, 'type');
- const signature = `${eventAbi.name}(${types.join(',')})`;
- return signature;
- }
- private _getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> {
- const topics: Array<string|null> = [];
- for (const eventInput of abi.inputs) {
- if (!eventInput.indexed) {
- continue;
+ private _onLogStateChanged(removed: boolean, log: Web3.LogEntry): void {
+ _.forEach(this._filters, (filter: Web3.FilterObject, filterToken: string) => {
+ if (filterUtils.matchesFilter(log, filter)) {
+ const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs;
+ const logEvent = {
+ ...decodedLog,
+ removed,
+ };
+ this._filterCallbacks[filterToken](logEvent);
}
- if (_.isUndefined(indexFilterValues[eventInput.name])) {
- topics.push(null);
- } else {
- const value = indexFilterValues[eventInput.name] as string;
- const buffer = ethUtil.toBuffer(value);
- const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH);
- const topic = ethUtil.bufferToHex(paddedBuffer);
- topics.push(topic);
- }
- }
- return topics;
+ });
+ }
+ private _startBlockAndLogStream(): void {
+ this._blockAndLogStreamer = new BlockAndLogStreamer(
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ );
+ this._blockAndLogStreamer.addLogFilter({});
+ this._blockAndLogStreamInterval = intervalUtils.setAsyncExcludingInterval(
+ this._reconcileBlockAsync.bind(this), constants.DEFAULT_BLOCK_POLLING_INTERVAL,
+ );
+ }
+ private _stopBlockAndLogStream(): void {
+ intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamInterval);
+ delete this._blockAndLogStreamer;
+ }
+ private async _reconcileBlockAsync(): Promise<void> {
+ const latestBlock = await this._web3Wrapper.getBlockAsync('latest');
+ this._blockAndLogStreamer.reconcileNewBlock(latestBlock as any as Block);
}
}
diff --git a/src/contract_wrappers/exchange_wrapper.ts b/src/contract_wrappers/exchange_wrapper.ts
index 32eaa590c..e5f190864 100644
--- a/src/contract_wrappers/exchange_wrapper.ts
+++ b/src/contract_wrappers/exchange_wrapper.ts
@@ -16,11 +16,8 @@ import {
SignedOrder,
ContractEvent,
ExchangeEvents,
- ContractEventEmitter,
SubscriptionOpts,
IndexedFilterValues,
- CreateContractEvent,
- ContractEventObj,
OrderCancellationRequest,
OrderFillRequest,
LogErrorContractEventArgs,
@@ -31,10 +28,10 @@ import {
ValidateOrderFillableOpts,
OrderTransactionOpts,
RawLog,
+ EventCallback,
} from '../types';
import {assert} from '../utils/assert';
import {utils} from '../utils/utils';
-import {eventUtils} from '../utils/event_utils';
import {OrderValidationUtils} from '../utils/order_validation_utils';
import {ContractWrapper} from './contract_wrapper';
import {constants} from '../utils/constants';
@@ -51,7 +48,7 @@ const SHOULD_VALIDATE_BY_DEFAULT = true;
*/
export class ExchangeWrapper extends ContractWrapper {
private _exchangeContractIfExists?: ExchangeContract;
- private _exchangeLogEventEmitters: ContractEventEmitter[];
+ private _activeSubscriptions: string[];
private _orderValidationUtils: OrderValidationUtils;
private _tokenWrapper: TokenWrapper;
private _exchangeContractErrCodesToMsg = {
@@ -86,7 +83,7 @@ export class ExchangeWrapper extends ContractWrapper {
super(web3Wrapper, abiDecoder);
this._tokenWrapper = tokenWrapper;
this._orderValidationUtils = new OrderValidationUtils(tokenWrapper, this);
- this._exchangeLogEventEmitters = [];
+ this._activeSubscriptions = [];
this._contractAddressIfExists = contractAddressIfExists;
}
/**
@@ -622,41 +619,32 @@ export class ExchangeWrapper extends ContractWrapper {
return txHash;
}
/**
- * Subscribe to an event type emitted by the Exchange smart contract
- * @param eventName The exchange contract event you would like to subscribe to.
- * @param subscriptionOpts Subscriptions options that let you configure the subscription.
- * @param indexFilterValues An object where the keys are indexed args returned by the event and
- * the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
- * @param exchangeContractAddress The hex encoded address of the Exchange contract to call.
- * @return ContractEventEmitter object
+ * Subscribe to an event type emitted by the Exchange contract.
+ * @param tokenAddress The hex encoded address where the ERC20 token is deployed.
+ * @param eventName The exchange contract event you would like to subscribe to.
+ * @param indexFilterValues An object where the keys are indexed args returned by the event and
+ * the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
+ * @param callback Callback that gets called when a log is added/removed
+ * @return ContractEventEmitter object
*/
- public async subscribeAsync(eventName: ExchangeEvents, subscriptionOpts: SubscriptionOpts,
- indexFilterValues: IndexedFilterValues, exchangeContractAddress: string):
- Promise<ContractEventEmitter> {
- assert.isETHAddressHex('exchangeContractAddress', exchangeContractAddress);
+ public async subscribeAsync(eventName: ExchangeEvents, indexFilterValues: IndexedFilterValues,
+ callback: EventCallback): Promise<string> {
assert.doesBelongToStringEnum('eventName', eventName, ExchangeEvents);
- assert.doesConformToSchema('subscriptionOpts', subscriptionOpts, schemas.subscriptionOptsSchema);
assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema);
- const exchangeContract = await this._getExchangeContractAsync();
- let createLogEvent: CreateContractEvent;
- switch (eventName) {
- case ExchangeEvents.LogFill:
- createLogEvent = exchangeContract.LogFill;
- break;
- case ExchangeEvents.LogError:
- createLogEvent = exchangeContract.LogError;
- break;
- case ExchangeEvents.LogCancel:
- createLogEvent = exchangeContract.LogCancel;
- break;
- default:
- throw utils.spawnSwitchErr('ExchangeEvents', eventName);
- }
-
- const logEventObj: ContractEventObj = createLogEvent(indexFilterValues, subscriptionOpts);
- const eventEmitter = eventUtils.wrapEventEmitter(logEventObj);
- this._exchangeLogEventEmitters.push(eventEmitter);
- return eventEmitter;
+ const exchangeContractAddress = await this.getContractAddressAsync();
+ const subscriptionToken = this._subscribe(
+ exchangeContractAddress, eventName, indexFilterValues, artifacts.ExchangeArtifact.abi, callback,
+ );
+ this._activeSubscriptions.push(subscriptionToken);
+ return subscriptionToken;
+ }
+ /**
+ * Cancel a subscription
+ * @param subscriptionToken Subscription token returned by `subscribe()`
+ */
+ public unsubscribe(subscriptionToken: string): void {
+ _.pull(this._activeSubscriptions, subscriptionToken);
+ this._unsubscribe(subscriptionToken);
}
/**
* Gets historical logs without creating a subscription
@@ -678,15 +666,6 @@ export class ExchangeWrapper extends ContractWrapper {
return logs;
}
/**
- * Stops watching for all exchange events
- */
- public async stopWatchingAllEventsAsync(): Promise<void> {
- const stopWatchingPromises = _.map(this._exchangeLogEventEmitters,
- logEventObj => logEventObj.stopWatchingAsync());
- await Promise.all(stopWatchingPromises);
- this._exchangeLogEventEmitters = [];
- }
- /**
* Retrieves the Ethereum address of the Exchange contract deployed on the network
* that the user-passed web3 provider is connected to.
* @returns The Ethereum address of the Exchange contract being used.
@@ -809,8 +788,15 @@ export class ExchangeWrapper extends ContractWrapper {
const ZRXtokenAddress = await exchangeInstance.ZRX_TOKEN_CONTRACT.callAsync();
return ZRXtokenAddress;
}
+ /**
+ * Cancels all existing subscriptions
+ */
+ public unsubscribeAll(): void {
+ _.forEach(this._activeSubscriptions, this._unsubscribe.bind(this));
+ this._activeSubscriptions = [];
+ }
private async _invalidateContractInstancesAsync(): Promise<void> {
- await this.stopWatchingAllEventsAsync();
+ this.unsubscribeAll();
delete this._exchangeContractIfExists;
}
private async _isValidSignatureUsingContractCallAsync(dataHex: string, ecSignature: ECSignature,
diff --git a/src/contract_wrappers/token_wrapper.ts b/src/contract_wrappers/token_wrapper.ts
index f988e6ece..d2b1ccf96 100644
--- a/src/contract_wrappers/token_wrapper.ts
+++ b/src/contract_wrappers/token_wrapper.ts
@@ -4,7 +4,6 @@ import {SchemaValidator, schemas} from '0x-json-schemas';
import {Web3Wrapper} from '../web3_wrapper';
import {assert} from '../utils/assert';
import {utils} from '../utils/utils';
-import {eventUtils} from '../utils/event_utils';
import {constants} from '../utils/constants';
import {ContractWrapper} from './contract_wrapper';
import {AbiDecoder} from '../utils/abi_decoder';
@@ -15,12 +14,10 @@ import {
TokenEvents,
IndexedFilterValues,
SubscriptionOpts,
- CreateContractEvent,
- ContractEventEmitter,
- ContractEventObj,
MethodOpts,
LogWithDecodedArgs,
RawLog,
+ EventCallback,
} from '../types';
const ALLOWANCE_TO_ZERO_GAS_AMOUNT = 47155;
@@ -33,13 +30,13 @@ const ALLOWANCE_TO_ZERO_GAS_AMOUNT = 47155;
export class TokenWrapper extends ContractWrapper {
public UNLIMITED_ALLOWANCE_IN_BASE_UNITS = constants.UNLIMITED_ALLOWANCE_IN_BASE_UNITS;
private _tokenContractsByAddress: {[address: string]: TokenContract};
- private _tokenLogEventEmitters: ContractEventEmitter[];
+ private _activeSubscriptions: string[];
private _tokenTransferProxyContractAddressFetcher: () => Promise<string>;
constructor(web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder,
tokenTransferProxyContractAddressFetcher: () => Promise<string>) {
super(web3Wrapper, abiDecoder);
this._tokenContractsByAddress = {};
- this._tokenLogEventEmitters = [];
+ this._activeSubscriptions = [];
this._tokenTransferProxyContractAddressFetcher = tokenTransferProxyContractAddressFetcher;
}
/**
@@ -251,34 +248,29 @@ export class TokenWrapper extends ContractWrapper {
* Subscribe to an event type emitted by the Token contract.
* @param tokenAddress The hex encoded address where the ERC20 token is deployed.
* @param eventName The token contract event you would like to subscribe to.
- * @param subscriptionOpts Subscriptions options that let you configure the subscription.
* @param indexFilterValues An object where the keys are indexed args returned by the event and
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
+ * @param callback Callback that gets called when a log is added/removed
* @return ContractEventEmitter object
*/
- public async subscribeAsync(tokenAddress: string, eventName: TokenEvents, subscriptionOpts: SubscriptionOpts,
- indexFilterValues: IndexedFilterValues): Promise<ContractEventEmitter> {
+ public subscribe(tokenAddress: string, eventName: TokenEvents, indexFilterValues: IndexedFilterValues,
+ callback: EventCallback): string {
assert.isETHAddressHex('tokenAddress', tokenAddress);
assert.doesBelongToStringEnum('eventName', eventName, TokenEvents);
- assert.doesConformToSchema('subscriptionOpts', subscriptionOpts, schemas.subscriptionOptsSchema);
assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema);
- const tokenContract = await this._getTokenContractAsync(tokenAddress);
- let createLogEvent: CreateContractEvent;
- switch (eventName) {
- case TokenEvents.Approval:
- createLogEvent = tokenContract.Approval;
- break;
- case TokenEvents.Transfer:
- createLogEvent = tokenContract.Transfer;
- break;
- default:
- throw utils.spawnSwitchErr('TokenEvents', eventName);
- }
-
- const logEventObj: ContractEventObj = createLogEvent(indexFilterValues, subscriptionOpts);
- const eventEmitter = eventUtils.wrapEventEmitter(logEventObj);
- this._tokenLogEventEmitters.push(eventEmitter);
- return eventEmitter;
+ const subscriptionToken = this._subscribe(
+ tokenAddress, eventName, indexFilterValues, artifacts.TokenArtifact.abi, callback,
+ );
+ this._activeSubscriptions.push(subscriptionToken);
+ return subscriptionToken;
+ }
+ /**
+ * Cancel a subscription
+ * @param subscriptionToken Subscription token returned by `subscribe()`
+ */
+ public unsubscribe(subscriptionToken: string): void {
+ _.pull(this._activeSubscriptions, subscriptionToken);
+ this._unsubscribe(subscriptionToken);
}
/**
* Gets historical logs without creating a subscription
@@ -301,16 +293,14 @@ export class TokenWrapper extends ContractWrapper {
return logs;
}
/**
- * Stops watching for all token events
+ * Cancels all existing subscriptions
*/
- public async stopWatchingAllEventsAsync(): Promise<void> {
- const stopWatchingPromises = _.map(this._tokenLogEventEmitters,
- logEventObj => logEventObj.stopWatchingAsync());
- await Promise.all(stopWatchingPromises);
- this._tokenLogEventEmitters = [];
+ public unsubscribeAll(): void {
+ _.forEach(this._activeSubscriptions, this._unsubscribe.bind(this));
+ this._activeSubscriptions = [];
}
- private async _invalidateContractInstancesAsync(): Promise<void> {
- await this.stopWatchingAllEventsAsync();
+ private _invalidateContractInstancesAsync(): void {
+ this.unsubscribeAll();
this._tokenContractsByAddress = {};
}
private async _getTokenContractAsync(tokenAddress: string): Promise<TokenContract> {
diff --git a/src/index.ts b/src/index.ts
index 3359743e9..97ab084b7 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -19,7 +19,6 @@ export {
OrderFillOrKillRequest,
OrderCancellationRequest,
OrderFillRequest,
- ContractEventEmitter,
LogErrorContractEventArgs,
LogCancelContractEventArgs,
LogFillContractEventArgs,
@@ -36,4 +35,5 @@ export {
MethodOpts,
OrderTransactionOpts,
FilterObject,
+ LogEvent,
} from './types';
diff --git a/src/types.ts b/src/types.ts
index 35bb6af78..f0f37bfca 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -14,6 +14,7 @@ export enum ZeroExError {
InvalidJump = 'INVALID_JUMP',
OutOfGas = 'OUT_OF_GAS',
NoNetworkId = 'NO_NETWORK_ID',
+ SubscriptionNotFound = 'SUBSCRIPTION_NOT_FOUND',
}
export enum InternalZeroExError {
@@ -35,23 +36,17 @@ export type OrderAddresses = [string, string, string, string, string];
export type OrderValues = [BigNumber.BigNumber, BigNumber.BigNumber, BigNumber.BigNumber,
BigNumber.BigNumber, BigNumber.BigNumber, BigNumber.BigNumber];
-export type EventCallbackAsync = (err: Error, event: ContractEvent) => Promise<void>;
-export type EventCallbackSync = (err: Error, event: ContractEvent) => void;
-export type EventCallback = EventCallbackSync|EventCallbackAsync;
-export interface ContractEventObj {
- watch: (eventWatch: EventCallback) => void;
- stopWatching: () => void;
+export interface LogEvent extends LogWithDecodedArgs {
+ removed: boolean;
}
-export type CreateContractEvent = (indexFilterValues: IndexedFilterValues,
- subscriptionOpts: SubscriptionOpts) => ContractEventObj;
+export type EventCallbackAsync = (log: LogEvent) => Promise<void>;
+export type EventCallbackSync = (log: LogEvent) => void;
+export type EventCallback = EventCallbackSync|EventCallbackAsync;
export interface ExchangeContract extends Web3.ContractInstance {
isValidSignature: {
callAsync: (signerAddressHex: string, dataHex: string, v: number, r: string, s: string,
txOpts?: TxOpts) => Promise<boolean>;
};
- LogFill: CreateContractEvent;
- LogCancel: CreateContractEvent;
- LogError: CreateContractEvent;
ZRX_TOKEN_CONTRACT: {
callAsync: () => Promise<string>;
};
@@ -137,8 +132,6 @@ export interface ExchangeContract extends Web3.ContractInstance {
}
export interface TokenContract extends Web3.ContractInstance {
- Transfer: CreateContractEvent;
- Approval: CreateContractEvent;
balanceOf: {
callAsync: (address: string, defaultBlock?: Web3.BlockParam) => Promise<BigNumber.BigNumber>;
};
@@ -378,11 +371,6 @@ export interface OrderFillRequest {
export type AsyncMethod = (...args: any[]) => Promise<any>;
-export interface ContractEventEmitter {
- watch: (eventCallback: EventCallback) => void;
- stopWatchingAsync: () => Promise<void>;
-}
-
/**
* We re-export the `Web3.Provider` type specified in the Web3 Typescript typings
* since it is the type of the `provider` argument to the `ZeroEx` constructor.
diff --git a/src/utils/constants.ts b/src/utils/constants.ts
index 1b11d7055..a066fe869 100644
--- a/src/utils/constants.ts
+++ b/src/utils/constants.ts
@@ -7,4 +7,5 @@ export const constants = {
INVALID_JUMP_PATTERN: 'invalid JUMP at',
OUT_OF_GAS_PATTERN: 'out of gas',
UNLIMITED_ALLOWANCE_IN_BASE_UNITS: new BigNumber(2).pow(256).minus(1),
+ DEFAULT_BLOCK_POLLING_INTERVAL: 1000,
};
diff --git a/src/utils/event_utils.ts b/src/utils/event_utils.ts
deleted file mode 100644
index e8f30e1a8..000000000
--- a/src/utils/event_utils.ts
+++ /dev/null
@@ -1,41 +0,0 @@
-import * as _ from 'lodash';
-import {EventCallback, ContractEventArg, ContractEvent, ContractEventObj, ContractEventEmitter} from '../types';
-import * as BigNumber from 'bignumber.js';
-import promisify = require('es6-promisify');
-
-export const eventUtils = {
- wrapEventEmitter(event: ContractEventObj): ContractEventEmitter {
- const watch = (eventCallback: EventCallback) => {
- const bignumberWrappingEventCallback = eventUtils._getBigNumberWrappingEventCallback(eventCallback);
- event.watch(bignumberWrappingEventCallback);
- };
- const zeroExEvent = {
- watch,
- stopWatchingAsync: async () => {
- await promisify(event.stopWatching, event)();
- },
- };
- return zeroExEvent;
- },
- /**
- * Wraps eventCallback function so that all the BigNumber arguments are wrapped in a newer version of BigNumber.
- * @param eventCallback Event callback function to be wrapped
- * @return Wrapped event callback function
- */
- _getBigNumberWrappingEventCallback(eventCallback: EventCallback): EventCallback {
- const bignumberWrappingEventCallback = (err: Error, event: ContractEvent) => {
- if (_.isNull(err)) {
- const wrapIfBigNumber = (value: ContractEventArg): ContractEventArg => {
- // HACK: The old version of BigNumber used by Web3@0.19.0 does not support the `isBigNumber`
- // and checking for a BigNumber instance using `instanceof` does not work either. We therefore
- // check if the value constructor is a bignumber constructor.
- const isWeb3BigNumber = _.startsWith(value.constructor.toString(), 'function BigNumber(');
- return isWeb3BigNumber ? new BigNumber(value) : value;
- };
- event.args = _.mapValues(event.args, wrapIfBigNumber);
- }
- eventCallback(err, event);
- };
- return bignumberWrappingEventCallback;
- },
-};
diff --git a/src/utils/filter_utils.ts b/src/utils/filter_utils.ts
new file mode 100644
index 000000000..ee39b6836
--- /dev/null
+++ b/src/utils/filter_utils.ts
@@ -0,0 +1,80 @@
+import * as _ from 'lodash';
+import * as Web3 from 'web3';
+import * as uuid from 'uuid/v4';
+import * as ethUtil from 'ethereumjs-util';
+import {ContractEvents, IndexedFilterValues, SubscriptionOpts} from '../types';
+
+const TOPIC_LENGTH = 32;
+
+export const filterUtils = {
+ generateUUID(): string {
+ return uuid();
+ },
+ getFilter(keccak256: (data: string) => string, address: string, eventName: ContractEvents,
+ indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi,
+ subscriptionOpts?: SubscriptionOpts): Web3.FilterObject {
+ const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi;
+ const eventSignature = filterUtils.getEventSignatureFromAbiByName(eventAbi, eventName);
+ const topicForEventSignature = keccak256(eventSignature);
+ const topicsForIndexedArgs = filterUtils.getTopicsForIndexedArgs(eventAbi, indexFilterValues);
+ const topics = [topicForEventSignature, ...topicsForIndexedArgs];
+ let filter: Web3.FilterObject = {
+ address,
+ topics,
+ };
+ if (!_.isUndefined(subscriptionOpts)) {
+ filter = {
+ ...subscriptionOpts,
+ ...filter,
+ };
+ }
+ return filter;
+ },
+ getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string {
+ const types = _.map(eventAbi.inputs, 'type');
+ const signature = `${eventAbi.name}(${types.join(',')})`;
+ return signature;
+ },
+ getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> {
+ const topics: Array<string|null> = [];
+ for (const eventInput of abi.inputs) {
+ if (!eventInput.indexed) {
+ continue;
+ }
+ if (_.isUndefined(indexFilterValues[eventInput.name])) {
+ topics.push(null);
+ } else {
+ const value = indexFilterValues[eventInput.name] as string;
+ const buffer = ethUtil.toBuffer(value);
+ const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH);
+ const topic = ethUtil.bufferToHex(paddedBuffer);
+ topics.push(topic);
+ }
+ }
+ return topics;
+ },
+ matchesFilter(log: Web3.LogEntry, filter: Web3.FilterObject): boolean {
+ if (!_.isUndefined(filter.address) && log.address !== filter.address) {
+ return false;
+ }
+ if (!_.isUndefined(filter.topics)) {
+ return filterUtils.matchesTopics(log.topics, filter.topics);
+ }
+ return true;
+ },
+ matchesTopics(logTopics: string[], filterTopics: Array<string[]|string|null>): boolean {
+ const matchesTopic = _.zipWith(logTopics, filterTopics, filterUtils.matchesTopic.bind(filterUtils));
+ const matchesTopics = _.every(matchesTopic);
+ return matchesTopics;
+ },
+ matchesTopic(logTopic: string, filterTopic: string[]|string|null): boolean {
+ if (_.isArray(filterTopic)) {
+ return _.includes(filterTopic, logTopic);
+ }
+ if (_.isString(filterTopic)) {
+ return filterTopic === logTopic;
+ }
+ // null topic is a wildcard
+ return true;
+ },
+};