aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLeonid <logvinov.leon@gmail.com>2017-10-06 20:24:42 +0800
committerGitHub <noreply@github.com>2017-10-06 20:24:42 +0800
commitf38d2f80a6e3af4ff7e2454d42abdf2a389e4303 (patch)
tree9e2f94e7d8236f7612628288d7eab3333d240b6b /src
parentcd5327bc31618b4ea268de1902a74cf862987ee6 (diff)
parent0c112a2a1c1811e1fc7c4b03881f960b952c788c (diff)
downloaddexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar
dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.gz
dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.bz2
dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.lz
dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.xz
dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.zst
dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.zip
Merge pull request #182 from 0xProject/feature/ethereumjs-blockstream
Rewrite subscriptions
Diffstat (limited to 'src')
-rw-r--r--src/0x.ts9
-rw-r--r--src/contract_wrappers/contract_wrapper.ts120
-rw-r--r--src/contract_wrappers/exchange_wrapper.ts86
-rw-r--r--src/contract_wrappers/token_wrapper.ts67
-rw-r--r--src/index.ts2
-rw-r--r--src/subproviders/empty_wallet_subprovider.ts1
-rw-r--r--src/types.ts32
-rw-r--r--src/utils/abi_decoder.ts2
-rw-r--r--src/utils/assert.ts3
-rw-r--r--src/utils/constants.ts1
-rw-r--r--src/utils/event_utils.ts41
-rw-r--r--src/utils/filter_utils.ts82
-rw-r--r--src/web3_wrapper.ts12
13 files changed, 255 insertions, 203 deletions
diff --git a/src/0x.ts b/src/0x.ts
index 3180c52f6..9f955c807 100644
--- a/src/0x.ts
+++ b/src/0x.ts
@@ -1,12 +1,8 @@
import * as _ from 'lodash';
import * as BigNumber from 'bignumber.js';
-import * as Web3 from 'web3';
-import * as abiDecoder from 'abi-decoder';
import {SchemaValidator, schemas} from '0x-json-schemas';
import {bigNumberConfigs} from './bignumber_config';
import * as ethUtil from 'ethereumjs-util';
-import findVersions = require('find-versions');
-import compareVersions = require('compare-versions');
import {Web3Wrapper} from './web3_wrapper';
import {constants} from './utils/constants';
import {utils} from './utils/utils';
@@ -27,12 +23,7 @@ import {
SignedOrder,
Web3Provider,
ZeroExConfig,
- TransactionReceipt,
- DecodedLogArgs,
TransactionReceiptWithDecodedLogs,
- LogWithDecodedArgs,
- FilterObject,
- RawLog,
} from './types';
import {zeroExConfigSchema} from './schemas/zero_ex_config_schema';
diff --git a/src/contract_wrappers/contract_wrapper.ts b/src/contract_wrappers/contract_wrapper.ts
index 743dfc9b2..f6ccfdee4 100644
--- a/src/contract_wrappers/contract_wrapper.ts
+++ b/src/contract_wrappers/contract_wrapper.ts
@@ -1,9 +1,10 @@
import * as _ from 'lodash';
import * as Web3 from 'web3';
-import * as ethUtil from 'ethereumjs-util';
+import {BlockAndLogStreamer, Block} from 'ethereumjs-blockstream';
import {Web3Wrapper} from '../web3_wrapper';
import {AbiDecoder} from '../utils/abi_decoder';
import {
+ ZeroExError,
InternalZeroExError,
Artifact,
LogWithDecodedArgs,
@@ -11,32 +12,57 @@ import {
ContractEvents,
SubscriptionOpts,
IndexedFilterValues,
+ EventCallback,
+ BlockParamLiteral,
} from '../types';
-import {utils} from '../utils/utils';
-
-const TOPIC_LENGTH = 32;
+import {constants} from '../utils/constants';
+import {intervalUtils} from '../utils/interval_utils';
+import {filterUtils} from '../utils/filter_utils';
export class ContractWrapper {
protected _web3Wrapper: Web3Wrapper;
private _abiDecoder?: AbiDecoder;
+ private _blockAndLogStreamer: BlockAndLogStreamer|undefined;
+ private _blockAndLogStreamInterval: NodeJS.Timer;
+ private _filters: {[filterToken: string]: Web3.FilterObject};
+ private _filterCallbacks: {[filterToken: string]: EventCallback};
+ private _onLogAddedSubscriptionToken: string|undefined;
+ private _onLogRemovedSubscriptionToken: string|undefined;
constructor(web3Wrapper: Web3Wrapper, abiDecoder?: AbiDecoder) {
this._web3Wrapper = web3Wrapper;
this._abiDecoder = abiDecoder;
+ this._filters = {};
+ this._filterCallbacks = {};
+ this._blockAndLogStreamer = undefined;
+ this._onLogAddedSubscriptionToken = undefined;
+ this._onLogRemovedSubscriptionToken = undefined;
+ }
+ protected _subscribe(address: string, eventName: ContractEvents,
+ indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi,
+ callback: EventCallback): string {
+ const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi);
+ if (_.isUndefined(this._blockAndLogStreamer)) {
+ this._startBlockAndLogStream();
+ }
+ const filterToken = filterUtils.generateUUID();
+ this._filters[filterToken] = filter;
+ this._filterCallbacks[filterToken] = callback;
+ return filterToken;
+ }
+ protected _unsubscribe(filterToken: string): void {
+ if (_.isUndefined(this._filters[filterToken])) {
+ throw new Error(ZeroExError.SubscriptionNotFound);
+ }
+ delete this._filters[filterToken];
+ delete this._filterCallbacks[filterToken];
+ if (_.isEmpty(this._filters)) {
+ this._stopBlockAndLogStream();
+ }
}
protected async _getLogsAsync(address: string, eventName: ContractEvents, subscriptionOpts: SubscriptionOpts,
indexFilterValues: IndexedFilterValues,
abi: Web3.ContractAbi): Promise<LogWithDecodedArgs[]> {
- const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi;
- const eventSignature = this._getEventSignatureFromAbiByName(eventAbi, eventName);
- const topicForEventSignature = this._web3Wrapper.keccak256(eventSignature);
- const topicsForIndexedArgs = this._getTopicsForIndexedArgs(eventAbi, indexFilterValues);
- const topics = [topicForEventSignature, ...topicsForIndexedArgs];
- const filter = {
- fromBlock: subscriptionOpts.fromBlock,
- toBlock: subscriptionOpts.toBlock,
- address,
- topics,
- };
+ const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi, subscriptionOpts);
const logs = await this._web3Wrapper.getLogsAsync(filter);
const logsWithDecodedArguments = _.map(logs, this._tryToDecodeLogOrNoop.bind(this));
return logsWithDecodedArguments;
@@ -55,27 +81,51 @@ export class ContractWrapper {
await this._web3Wrapper.getContractInstanceFromArtifactAsync<A>(artifact, addressIfExists);
return contractInstance;
}
- protected _getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string {
- const types = _.map(eventAbi.inputs, 'type');
- const signature = `${eventAbi.name}(${types.join(',')})`;
- return signature;
- }
- private _getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> {
- const topics: Array<string|null> = [];
- for (const eventInput of abi.inputs) {
- if (!eventInput.indexed) {
- continue;
- }
- if (_.isUndefined(indexFilterValues[eventInput.name])) {
- topics.push(null);
- } else {
- const value = indexFilterValues[eventInput.name] as string;
- const buffer = ethUtil.toBuffer(value);
- const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH);
- const topic = ethUtil.bufferToHex(paddedBuffer);
- topics.push(topic);
+ private _onLogStateChanged(removed: boolean, log: Web3.LogEntry): void {
+ _.forEach(this._filters, (filter: Web3.FilterObject, filterToken: string) => {
+ if (filterUtils.matchesFilter(log, filter)) {
+ const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs;
+ const logEvent = {
+ ...decodedLog,
+ removed,
+ };
+ this._filterCallbacks[filterToken](logEvent);
}
+ });
+ }
+ private _startBlockAndLogStream(): void {
+ this._blockAndLogStreamer = new BlockAndLogStreamer(
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ );
+ const catchAllLogFilter = {};
+ this._blockAndLogStreamer.addLogFilter(catchAllLogFilter);
+ this._blockAndLogStreamInterval = intervalUtils.setAsyncExcludingInterval(
+ this._reconcileBlockAsync.bind(this), constants.DEFAULT_BLOCK_POLLING_INTERVAL,
+ );
+ let removed = false;
+ this._onLogAddedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogAdded(
+ this._onLogStateChanged.bind(this, removed),
+ );
+ removed = true;
+ this._onLogRemovedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogRemoved(
+ this._onLogStateChanged.bind(this, removed),
+ );
+ }
+ private _stopBlockAndLogStream(): void {
+ (this._blockAndLogStreamer as BlockAndLogStreamer).unsubscribeFromOnLogAdded(
+ this._onLogAddedSubscriptionToken as string);
+ (this._blockAndLogStreamer as BlockAndLogStreamer).unsubscribeFromOnLogRemoved(
+ this._onLogRemovedSubscriptionToken as string);
+ intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamInterval);
+ delete this._blockAndLogStreamer;
+ }
+ private async _reconcileBlockAsync(): Promise<void> {
+ const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest);
+ // We need to coerce to Block type cause Web3.Block includes types for mempool blocks
+ if (!_.isUndefined(this._blockAndLogStreamer)) {
+ // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
+ this._blockAndLogStreamer.reconcileNewBlock(latestBlock as any as Block);
}
- return topics;
}
}
diff --git a/src/contract_wrappers/exchange_wrapper.ts b/src/contract_wrappers/exchange_wrapper.ts
index 32eaa590c..5f02903ce 100644
--- a/src/contract_wrappers/exchange_wrapper.ts
+++ b/src/contract_wrappers/exchange_wrapper.ts
@@ -1,7 +1,6 @@
import * as _ from 'lodash';
import * as BigNumber from 'bignumber.js';
-import {SchemaValidator, schemas} from '0x-json-schemas';
-import promisify = require('es6-promisify');
+import {schemas} from '0x-json-schemas';
import {Web3Wrapper} from '../web3_wrapper';
import {
ECSignature,
@@ -16,11 +15,8 @@ import {
SignedOrder,
ContractEvent,
ExchangeEvents,
- ContractEventEmitter,
SubscriptionOpts,
IndexedFilterValues,
- CreateContractEvent,
- ContractEventObj,
OrderCancellationRequest,
OrderFillRequest,
LogErrorContractEventArgs,
@@ -31,13 +27,12 @@ import {
ValidateOrderFillableOpts,
OrderTransactionOpts,
RawLog,
+ EventCallback,
} from '../types';
import {assert} from '../utils/assert';
import {utils} from '../utils/utils';
-import {eventUtils} from '../utils/event_utils';
import {OrderValidationUtils} from '../utils/order_validation_utils';
import {ContractWrapper} from './contract_wrapper';
-import {constants} from '../utils/constants';
import {TokenWrapper} from './token_wrapper';
import {decorators} from '../utils/decorators';
import {AbiDecoder} from '../utils/abi_decoder';
@@ -51,7 +46,7 @@ const SHOULD_VALIDATE_BY_DEFAULT = true;
*/
export class ExchangeWrapper extends ContractWrapper {
private _exchangeContractIfExists?: ExchangeContract;
- private _exchangeLogEventEmitters: ContractEventEmitter[];
+ private _activeSubscriptions: string[];
private _orderValidationUtils: OrderValidationUtils;
private _tokenWrapper: TokenWrapper;
private _exchangeContractErrCodesToMsg = {
@@ -86,7 +81,7 @@ export class ExchangeWrapper extends ContractWrapper {
super(web3Wrapper, abiDecoder);
this._tokenWrapper = tokenWrapper;
this._orderValidationUtils = new OrderValidationUtils(tokenWrapper, this);
- this._exchangeLogEventEmitters = [];
+ this._activeSubscriptions = [];
this._contractAddressIfExists = contractAddressIfExists;
}
/**
@@ -622,41 +617,32 @@ export class ExchangeWrapper extends ContractWrapper {
return txHash;
}
/**
- * Subscribe to an event type emitted by the Exchange smart contract
- * @param eventName The exchange contract event you would like to subscribe to.
- * @param subscriptionOpts Subscriptions options that let you configure the subscription.
- * @param indexFilterValues An object where the keys are indexed args returned by the event and
- * the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
- * @param exchangeContractAddress The hex encoded address of the Exchange contract to call.
- * @return ContractEventEmitter object
+ * Subscribe to an event type emitted by the Exchange contract.
+ * @param eventName The exchange contract event you would like to subscribe to.
+ * @param indexFilterValues An object where the keys are indexed args returned by the event and
+ * the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
+ * @param callback Callback that gets called when a log is added/removed
+ * @return Subscription token used later to unsubscribe
*/
- public async subscribeAsync(eventName: ExchangeEvents, subscriptionOpts: SubscriptionOpts,
- indexFilterValues: IndexedFilterValues, exchangeContractAddress: string):
- Promise<ContractEventEmitter> {
- assert.isETHAddressHex('exchangeContractAddress', exchangeContractAddress);
+ public async subscribeAsync(eventName: ExchangeEvents, indexFilterValues: IndexedFilterValues,
+ callback: EventCallback): Promise<string> {
assert.doesBelongToStringEnum('eventName', eventName, ExchangeEvents);
- assert.doesConformToSchema('subscriptionOpts', subscriptionOpts, schemas.subscriptionOptsSchema);
assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema);
- const exchangeContract = await this._getExchangeContractAsync();
- let createLogEvent: CreateContractEvent;
- switch (eventName) {
- case ExchangeEvents.LogFill:
- createLogEvent = exchangeContract.LogFill;
- break;
- case ExchangeEvents.LogError:
- createLogEvent = exchangeContract.LogError;
- break;
- case ExchangeEvents.LogCancel:
- createLogEvent = exchangeContract.LogCancel;
- break;
- default:
- throw utils.spawnSwitchErr('ExchangeEvents', eventName);
- }
-
- const logEventObj: ContractEventObj = createLogEvent(indexFilterValues, subscriptionOpts);
- const eventEmitter = eventUtils.wrapEventEmitter(logEventObj);
- this._exchangeLogEventEmitters.push(eventEmitter);
- return eventEmitter;
+ assert.isFunction('callback', callback);
+ const exchangeContractAddress = await this.getContractAddressAsync();
+ const subscriptionToken = this._subscribe(
+ exchangeContractAddress, eventName, indexFilterValues, artifacts.ExchangeArtifact.abi, callback,
+ );
+ this._activeSubscriptions.push(subscriptionToken);
+ return subscriptionToken;
+ }
+ /**
+ * Cancel a subscription
+ * @param subscriptionToken Subscription token returned by `subscribe()`
+ */
+ public unsubscribe(subscriptionToken: string): void {
+ _.pull(this._activeSubscriptions, subscriptionToken);
+ this._unsubscribe(subscriptionToken);
}
/**
* Gets historical logs without creating a subscription
@@ -678,15 +664,6 @@ export class ExchangeWrapper extends ContractWrapper {
return logs;
}
/**
- * Stops watching for all exchange events
- */
- public async stopWatchingAllEventsAsync(): Promise<void> {
- const stopWatchingPromises = _.map(this._exchangeLogEventEmitters,
- logEventObj => logEventObj.stopWatchingAsync());
- await Promise.all(stopWatchingPromises);
- this._exchangeLogEventEmitters = [];
- }
- /**
* Retrieves the Ethereum address of the Exchange contract deployed on the network
* that the user-passed web3 provider is connected to.
* @returns The Ethereum address of the Exchange contract being used.
@@ -809,8 +786,15 @@ export class ExchangeWrapper extends ContractWrapper {
const ZRXtokenAddress = await exchangeInstance.ZRX_TOKEN_CONTRACT.callAsync();
return ZRXtokenAddress;
}
+ /**
+ * Cancels all existing subscriptions
+ */
+ public unsubscribeAll(): void {
+ _.forEach(this._activeSubscriptions, this._unsubscribe.bind(this));
+ this._activeSubscriptions = [];
+ }
private async _invalidateContractInstancesAsync(): Promise<void> {
- await this.stopWatchingAllEventsAsync();
+ this.unsubscribeAll();
delete this._exchangeContractIfExists;
}
private async _isValidSignatureUsingContractCallAsync(dataHex: string, ecSignature: ECSignature,
diff --git a/src/contract_wrappers/token_wrapper.ts b/src/contract_wrappers/token_wrapper.ts
index f988e6ece..abd090f7e 100644
--- a/src/contract_wrappers/token_wrapper.ts
+++ b/src/contract_wrappers/token_wrapper.ts
@@ -1,10 +1,8 @@
import * as _ from 'lodash';
import * as BigNumber from 'bignumber.js';
-import {SchemaValidator, schemas} from '0x-json-schemas';
+import {schemas} from '0x-json-schemas';
import {Web3Wrapper} from '../web3_wrapper';
import {assert} from '../utils/assert';
-import {utils} from '../utils/utils';
-import {eventUtils} from '../utils/event_utils';
import {constants} from '../utils/constants';
import {ContractWrapper} from './contract_wrapper';
import {AbiDecoder} from '../utils/abi_decoder';
@@ -15,12 +13,9 @@ import {
TokenEvents,
IndexedFilterValues,
SubscriptionOpts,
- CreateContractEvent,
- ContractEventEmitter,
- ContractEventObj,
MethodOpts,
LogWithDecodedArgs,
- RawLog,
+ EventCallback,
} from '../types';
const ALLOWANCE_TO_ZERO_GAS_AMOUNT = 47155;
@@ -33,13 +28,13 @@ const ALLOWANCE_TO_ZERO_GAS_AMOUNT = 47155;
export class TokenWrapper extends ContractWrapper {
public UNLIMITED_ALLOWANCE_IN_BASE_UNITS = constants.UNLIMITED_ALLOWANCE_IN_BASE_UNITS;
private _tokenContractsByAddress: {[address: string]: TokenContract};
- private _tokenLogEventEmitters: ContractEventEmitter[];
+ private _activeSubscriptions: string[];
private _tokenTransferProxyContractAddressFetcher: () => Promise<string>;
constructor(web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder,
tokenTransferProxyContractAddressFetcher: () => Promise<string>) {
super(web3Wrapper, abiDecoder);
this._tokenContractsByAddress = {};
- this._tokenLogEventEmitters = [];
+ this._activeSubscriptions = [];
this._tokenTransferProxyContractAddressFetcher = tokenTransferProxyContractAddressFetcher;
}
/**
@@ -251,34 +246,30 @@ export class TokenWrapper extends ContractWrapper {
* Subscribe to an event type emitted by the Token contract.
* @param tokenAddress The hex encoded address where the ERC20 token is deployed.
* @param eventName The token contract event you would like to subscribe to.
- * @param subscriptionOpts Subscriptions options that let you configure the subscription.
* @param indexFilterValues An object where the keys are indexed args returned by the event and
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
- * @return ContractEventEmitter object
+ * @param callback Callback that gets called when a log is added/removed
+ * @return Subscription token used later to unsubscribe
*/
- public async subscribeAsync(tokenAddress: string, eventName: TokenEvents, subscriptionOpts: SubscriptionOpts,
- indexFilterValues: IndexedFilterValues): Promise<ContractEventEmitter> {
+ public subscribe(tokenAddress: string, eventName: TokenEvents, indexFilterValues: IndexedFilterValues,
+ callback: EventCallback): string {
assert.isETHAddressHex('tokenAddress', tokenAddress);
assert.doesBelongToStringEnum('eventName', eventName, TokenEvents);
- assert.doesConformToSchema('subscriptionOpts', subscriptionOpts, schemas.subscriptionOptsSchema);
assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema);
- const tokenContract = await this._getTokenContractAsync(tokenAddress);
- let createLogEvent: CreateContractEvent;
- switch (eventName) {
- case TokenEvents.Approval:
- createLogEvent = tokenContract.Approval;
- break;
- case TokenEvents.Transfer:
- createLogEvent = tokenContract.Transfer;
- break;
- default:
- throw utils.spawnSwitchErr('TokenEvents', eventName);
- }
-
- const logEventObj: ContractEventObj = createLogEvent(indexFilterValues, subscriptionOpts);
- const eventEmitter = eventUtils.wrapEventEmitter(logEventObj);
- this._tokenLogEventEmitters.push(eventEmitter);
- return eventEmitter;
+ assert.isFunction('callback', callback);
+ const subscriptionToken = this._subscribe(
+ tokenAddress, eventName, indexFilterValues, artifacts.TokenArtifact.abi, callback,
+ );
+ this._activeSubscriptions.push(subscriptionToken);
+ return subscriptionToken;
+ }
+ /**
+ * Cancel a subscription
+ * @param subscriptionToken Subscription token returned by `subscribe()`
+ */
+ public unsubscribe(subscriptionToken: string): void {
+ _.pull(this._activeSubscriptions, subscriptionToken);
+ this._unsubscribe(subscriptionToken);
}
/**
* Gets historical logs without creating a subscription
@@ -301,16 +292,14 @@ export class TokenWrapper extends ContractWrapper {
return logs;
}
/**
- * Stops watching for all token events
+ * Cancels all existing subscriptions
*/
- public async stopWatchingAllEventsAsync(): Promise<void> {
- const stopWatchingPromises = _.map(this._tokenLogEventEmitters,
- logEventObj => logEventObj.stopWatchingAsync());
- await Promise.all(stopWatchingPromises);
- this._tokenLogEventEmitters = [];
+ public unsubscribeAll(): void {
+ _.forEach(this._activeSubscriptions, this._unsubscribe.bind(this));
+ this._activeSubscriptions = [];
}
- private async _invalidateContractInstancesAsync(): Promise<void> {
- await this.stopWatchingAllEventsAsync();
+ private _invalidateContractInstancesAsync(): void {
+ this.unsubscribeAll();
this._tokenContractsByAddress = {};
}
private async _getTokenContractAsync(tokenAddress: string): Promise<TokenContract> {
diff --git a/src/index.ts b/src/index.ts
index 3359743e9..97ab084b7 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -19,7 +19,6 @@ export {
OrderFillOrKillRequest,
OrderCancellationRequest,
OrderFillRequest,
- ContractEventEmitter,
LogErrorContractEventArgs,
LogCancelContractEventArgs,
LogFillContractEventArgs,
@@ -36,4 +35,5 @@ export {
MethodOpts,
OrderTransactionOpts,
FilterObject,
+ LogEvent,
} from './types';
diff --git a/src/subproviders/empty_wallet_subprovider.ts b/src/subproviders/empty_wallet_subprovider.ts
index 0d037042d..2f260217c 100644
--- a/src/subproviders/empty_wallet_subprovider.ts
+++ b/src/subproviders/empty_wallet_subprovider.ts
@@ -1,4 +1,3 @@
-import * as Web3 from 'web3';
import {JSONRPCPayload} from '../types';
/*
diff --git a/src/types.ts b/src/types.ts
index 35bb6af78..44094f442 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -14,6 +14,7 @@ export enum ZeroExError {
InvalidJump = 'INVALID_JUMP',
OutOfGas = 'OUT_OF_GAS',
NoNetworkId = 'NO_NETWORK_ID',
+ SubscriptionNotFound = 'SUBSCRIPTION_NOT_FOUND',
}
export enum InternalZeroExError {
@@ -35,23 +36,17 @@ export type OrderAddresses = [string, string, string, string, string];
export type OrderValues = [BigNumber.BigNumber, BigNumber.BigNumber, BigNumber.BigNumber,
BigNumber.BigNumber, BigNumber.BigNumber, BigNumber.BigNumber];
-export type EventCallbackAsync = (err: Error, event: ContractEvent) => Promise<void>;
-export type EventCallbackSync = (err: Error, event: ContractEvent) => void;
-export type EventCallback = EventCallbackSync|EventCallbackAsync;
-export interface ContractEventObj {
- watch: (eventWatch: EventCallback) => void;
- stopWatching: () => void;
+export interface LogEvent extends LogWithDecodedArgs {
+ removed: boolean;
}
-export type CreateContractEvent = (indexFilterValues: IndexedFilterValues,
- subscriptionOpts: SubscriptionOpts) => ContractEventObj;
+export type EventCallbackAsync = (log: LogEvent) => Promise<void>;
+export type EventCallbackSync = (log: LogEvent) => void;
+export type EventCallback = EventCallbackSync|EventCallbackAsync;
export interface ExchangeContract extends Web3.ContractInstance {
isValidSignature: {
callAsync: (signerAddressHex: string, dataHex: string, v: number, r: string, s: string,
txOpts?: TxOpts) => Promise<boolean>;
};
- LogFill: CreateContractEvent;
- LogCancel: CreateContractEvent;
- LogError: CreateContractEvent;
ZRX_TOKEN_CONTRACT: {
callAsync: () => Promise<string>;
};
@@ -137,8 +132,6 @@ export interface ExchangeContract extends Web3.ContractInstance {
}
export interface TokenContract extends Web3.ContractInstance {
- Transfer: CreateContractEvent;
- Approval: CreateContractEvent;
balanceOf: {
callAsync: (address: string, defaultBlock?: Web3.BlockParam) => Promise<BigNumber.BigNumber>;
};
@@ -352,7 +345,13 @@ export interface IndexedFilterValues {
[index: string]: ContractEventArg;
}
-export type BlockParam = 'latest'|'earliest'|'pending'|number;
+export enum BlockParamLiteral {
+ Latest = 'latest',
+ Earliest = 'earliest',
+ Pending = 'pending',
+}
+
+export type BlockParam = BlockParamLiteral|number;
export interface SubscriptionOpts {
fromBlock: BlockParam;
@@ -378,11 +377,6 @@ export interface OrderFillRequest {
export type AsyncMethod = (...args: any[]) => Promise<any>;
-export interface ContractEventEmitter {
- watch: (eventCallback: EventCallback) => void;
- stopWatchingAsync: () => Promise<void>;
-}
-
/**
* We re-export the `Web3.Provider` type specified in the Web3 Typescript typings
* since it is the type of the `provider` argument to the `ZeroEx` constructor.
diff --git a/src/utils/abi_decoder.ts b/src/utils/abi_decoder.ts
index 542591251..52b114c12 100644
--- a/src/utils/abi_decoder.ts
+++ b/src/utils/abi_decoder.ts
@@ -1,7 +1,7 @@
import * as Web3 from 'web3';
import * as _ from 'lodash';
import * as BigNumber from 'bignumber.js';
-import {AbiType, DecodedLogArgs, DecodedArgs, LogWithDecodedArgs, RawLog, SolidityTypes} from '../types';
+import {AbiType, DecodedLogArgs, LogWithDecodedArgs, RawLog, SolidityTypes} from '../types';
import * as SolidityCoder from 'web3/lib/solidity/coder';
export class AbiDecoder {
diff --git a/src/utils/assert.ts b/src/utils/assert.ts
index eb084129c..0b7a11939 100644
--- a/src/utils/assert.ts
+++ b/src/utils/assert.ts
@@ -17,6 +17,9 @@ export const assert = {
isString(variableName: string, value: string): void {
this.assert(_.isString(value), this.typeAssertionMessage(variableName, 'string', value));
},
+ isFunction(variableName: string, value: any): void {
+ this.assert(_.isFunction(value), this.typeAssertionMessage(variableName, 'function', value));
+ },
isHexString(variableName: string, value: string): void {
this.assert(_.isString(value) && HEX_REGEX.test(value),
this.typeAssertionMessage(variableName, 'HexString', value));
diff --git a/src/utils/constants.ts b/src/utils/constants.ts
index 1b11d7055..a066fe869 100644
--- a/src/utils/constants.ts
+++ b/src/utils/constants.ts
@@ -7,4 +7,5 @@ export const constants = {
INVALID_JUMP_PATTERN: 'invalid JUMP at',
OUT_OF_GAS_PATTERN: 'out of gas',
UNLIMITED_ALLOWANCE_IN_BASE_UNITS: new BigNumber(2).pow(256).minus(1),
+ DEFAULT_BLOCK_POLLING_INTERVAL: 1000,
};
diff --git a/src/utils/event_utils.ts b/src/utils/event_utils.ts
deleted file mode 100644
index e8f30e1a8..000000000
--- a/src/utils/event_utils.ts
+++ /dev/null
@@ -1,41 +0,0 @@
-import * as _ from 'lodash';
-import {EventCallback, ContractEventArg, ContractEvent, ContractEventObj, ContractEventEmitter} from '../types';
-import * as BigNumber from 'bignumber.js';
-import promisify = require('es6-promisify');
-
-export const eventUtils = {
- wrapEventEmitter(event: ContractEventObj): ContractEventEmitter {
- const watch = (eventCallback: EventCallback) => {
- const bignumberWrappingEventCallback = eventUtils._getBigNumberWrappingEventCallback(eventCallback);
- event.watch(bignumberWrappingEventCallback);
- };
- const zeroExEvent = {
- watch,
- stopWatchingAsync: async () => {
- await promisify(event.stopWatching, event)();
- },
- };
- return zeroExEvent;
- },
- /**
- * Wraps eventCallback function so that all the BigNumber arguments are wrapped in a newer version of BigNumber.
- * @param eventCallback Event callback function to be wrapped
- * @return Wrapped event callback function
- */
- _getBigNumberWrappingEventCallback(eventCallback: EventCallback): EventCallback {
- const bignumberWrappingEventCallback = (err: Error, event: ContractEvent) => {
- if (_.isNull(err)) {
- const wrapIfBigNumber = (value: ContractEventArg): ContractEventArg => {
- // HACK: The old version of BigNumber used by Web3@0.19.0 does not support the `isBigNumber`
- // and checking for a BigNumber instance using `instanceof` does not work either. We therefore
- // check if the value constructor is a bignumber constructor.
- const isWeb3BigNumber = _.startsWith(value.constructor.toString(), 'function BigNumber(');
- return isWeb3BigNumber ? new BigNumber(value) : value;
- };
- event.args = _.mapValues(event.args, wrapIfBigNumber);
- }
- eventCallback(err, event);
- };
- return bignumberWrappingEventCallback;
- },
-};
diff --git a/src/utils/filter_utils.ts b/src/utils/filter_utils.ts
new file mode 100644
index 000000000..e09a95a6e
--- /dev/null
+++ b/src/utils/filter_utils.ts
@@ -0,0 +1,82 @@
+import * as _ from 'lodash';
+import * as Web3 from 'web3';
+import * as uuid from 'uuid/v4';
+import * as ethUtil from 'ethereumjs-util';
+import * as jsSHA3 from 'js-sha3';
+import {ContractEvents, IndexedFilterValues, SubscriptionOpts} from '../types';
+
+const TOPIC_LENGTH = 32;
+
+export const filterUtils = {
+ generateUUID(): string {
+ return uuid();
+ },
+ getFilter(address: string, eventName: ContractEvents,
+ indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi,
+ subscriptionOpts?: SubscriptionOpts): Web3.FilterObject {
+ const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi;
+ const eventSignature = filterUtils.getEventSignatureFromAbiByName(eventAbi, eventName);
+ const topicForEventSignature = ethUtil.addHexPrefix(jsSHA3.keccak256(eventSignature));
+ const topicsForIndexedArgs = filterUtils.getTopicsForIndexedArgs(eventAbi, indexFilterValues);
+ const topics = [topicForEventSignature, ...topicsForIndexedArgs];
+ let filter: Web3.FilterObject = {
+ address,
+ topics,
+ };
+ if (!_.isUndefined(subscriptionOpts)) {
+ filter = {
+ ...subscriptionOpts,
+ ...filter,
+ };
+ }
+ return filter;
+ },
+ getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string {
+ const types = _.map(eventAbi.inputs, 'type');
+ const signature = `${eventAbi.name}(${types.join(',')})`;
+ return signature;
+ },
+ getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> {
+ const topics: Array<string|null> = [];
+ for (const eventInput of abi.inputs) {
+ if (!eventInput.indexed) {
+ continue;
+ }
+ if (_.isUndefined(indexFilterValues[eventInput.name])) {
+ // Null is a wildcard topic in a JSON-RPC call
+ topics.push(null);
+ } else {
+ const value = indexFilterValues[eventInput.name] as string;
+ const buffer = ethUtil.toBuffer(value);
+ const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH);
+ const topic = ethUtil.bufferToHex(paddedBuffer);
+ topics.push(topic);
+ }
+ }
+ return topics;
+ },
+ matchesFilter(log: Web3.LogEntry, filter: Web3.FilterObject): boolean {
+ if (!_.isUndefined(filter.address) && log.address !== filter.address) {
+ return false;
+ }
+ if (!_.isUndefined(filter.topics)) {
+ return filterUtils.matchesTopics(log.topics, filter.topics);
+ }
+ return true;
+ },
+ matchesTopics(logTopics: string[], filterTopics: Array<string[]|string|null>): boolean {
+ const matchesTopic = _.zipWith(logTopics, filterTopics, filterUtils.matchesTopic.bind(filterUtils));
+ const matchesTopics = _.every(matchesTopic);
+ return matchesTopics;
+ },
+ matchesTopic(logTopic: string, filterTopic: string[]|string|null): boolean {
+ if (_.isArray(filterTopic)) {
+ return _.includes(filterTopic, logTopic);
+ }
+ if (_.isString(filterTopic)) {
+ return filterTopic === logTopic;
+ }
+ // null topic is a wildcard
+ return true;
+ },
+};
diff --git a/src/web3_wrapper.ts b/src/web3_wrapper.ts
index 7576f3d40..9de75c809 100644
--- a/src/web3_wrapper.ts
+++ b/src/web3_wrapper.ts
@@ -94,8 +94,12 @@ export class Web3Wrapper {
const signData = await promisify(this.web3.eth.sign)(address, message);
return signData;
}
- public async getBlockTimestampAsync(blockHash: string): Promise<number> {
- const {timestamp} = await promisify(this.web3.eth.getBlock)(blockHash);
+ public async getBlockAsync(blockParam: string|Web3.BlockParam): Promise<Web3.BlockWithoutTransactionData> {
+ const block = await promisify(this.web3.eth.getBlock)(blockParam);
+ return block;
+ }
+ public async getBlockTimestampAsync(blockParam: string|Web3.BlockParam): Promise<number> {
+ const {timestamp} = await this.getBlockAsync(blockParam);
return timestamp;
}
public async getAvailableAddressesAsync(): Promise<string[]> {
@@ -112,10 +116,6 @@ export class Web3Wrapper {
const logs = await this.sendRawPayloadAsync(payload);
return logs;
}
- public keccak256(data: string): string {
- const hash = this.web3.sha3(data);
- return hash;
- }
private getContractInstance<A extends Web3.ContractInstance>(abi: Web3.ContractAbi, address: string): A {
const web3ContractInstance = this.web3.eth.contract(abi).at(address);
const contractInstance = new Contract(web3ContractInstance, this.defaults) as any as A;