aboutsummaryrefslogtreecommitdiffstats
path: root/src/contract_wrappers
diff options
context:
space:
mode:
authorLeonid Logvinov <logvinov.leon@gmail.com>2017-10-05 19:34:30 +0800
committerLeonid Logvinov <logvinov.leon@gmail.com>2017-10-05 20:35:37 +0800
commit7dd63523939822203d938511472c84b8ff418aaf (patch)
tree68e75aa486e673fc7faf2dc69e0527844b842e81 /src/contract_wrappers
parente37a3155cd52d35da3eef9a8dc450b9b3df0b888 (diff)
downloaddexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.gz
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.bz2
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.lz
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.xz
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.zst
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.zip
Implement subscriptions based on ethereumjs-blockstream
Diffstat (limited to 'src/contract_wrappers')
-rw-r--r--src/contract_wrappers/contract_wrapper.ts122
-rw-r--r--src/contract_wrappers/exchange_wrapper.ts82
-rw-r--r--src/contract_wrappers/token_wrapper.ts60
3 files changed, 141 insertions, 123 deletions
diff --git a/src/contract_wrappers/contract_wrapper.ts b/src/contract_wrappers/contract_wrapper.ts
index 927a09b52..6f22f5bdb 100644
--- a/src/contract_wrappers/contract_wrapper.ts
+++ b/src/contract_wrappers/contract_wrapper.ts
@@ -1,10 +1,10 @@
import * as _ from 'lodash';
import * as Web3 from 'web3';
-import * as ethUtil from 'ethereumjs-util';
-import {BlockAndLogStreamer} from 'ethereumjs-blockstream';
+import {BlockAndLogStreamer, Block} from 'ethereumjs-blockstream';
import {Web3Wrapper} from '../web3_wrapper';
import {AbiDecoder} from '../utils/abi_decoder';
import {
+ ZeroExError,
InternalZeroExError,
Artifact,
LogWithDecodedArgs,
@@ -12,38 +12,73 @@ import {
ContractEvents,
SubscriptionOpts,
IndexedFilterValues,
+ EventCallback,
} from '../types';
import {utils} from '../utils/utils';
-
-const TOPIC_LENGTH = 32;
+import {constants} from '../utils/constants';
+import {intervalUtils} from '../utils/interval_utils';
+import {filterUtils} from '../utils/filter_utils';
export class ContractWrapper {
protected _web3Wrapper: Web3Wrapper;
private _abiDecoder?: AbiDecoder;
private _blockAndLogStreamer: BlockAndLogStreamer;
+ private _blockAndLogStreamInterval: NodeJS.Timer;
+ private _activeFilters: number;
+ private _filters: {[filterToken: string]: Web3.FilterObject};
+ private _filterCallbacks: {[filterToken: string]: EventCallback};
+ private _onLogAddedSubscriptionToken: string|undefined;
+ private _onLogRemovedSubscriptionToken: string|undefined;
constructor(web3Wrapper: Web3Wrapper, abiDecoder?: AbiDecoder) {
this._web3Wrapper = web3Wrapper;
this._abiDecoder = abiDecoder;
- const getBlockAsync = async (hash: string) => this._web3Wrapper.getBlockAsync(hash);
- this._blockAndLogStreamer = new BlockAndLogStreamer(
- this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
- this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ this._activeFilters = 0;
+ this._filters = {};
+ this._filterCallbacks = {};
+ this._onLogAddedSubscriptionToken = undefined;
+ this._onLogRemovedSubscriptionToken = undefined;
+ }
+ protected _subscribe(address: string, eventName: ContractEvents,
+ indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi,
+ callback: EventCallback): string {
+ const filter = filterUtils.getFilter(
+ this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi,
);
+ if (_.isEmpty(this._filters)) {
+ this._startBlockAndLogStream();
+ let removed = false;
+ this._onLogAddedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogAdded(
+ this._onLogStateChanged.bind(this, removed),
+ );
+ removed = true;
+ this._onLogRemovedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogRemoved(
+ this._onLogStateChanged.bind(this, removed),
+ );
+ }
+ const filterToken = filterUtils.generateUUID();
+ this._filters[filterToken] = filter;
+ this._filterCallbacks[filterToken] = callback;
+ return filterToken;
+ }
+ protected _unsubscribe(filterToken: string): void {
+ if (_.isUndefined(this._filters[filterToken])) {
+ throw new Error(ZeroExError.SubscriptionNotFound);
+ }
+ delete this._filters[filterToken];
+ delete this._filterCallbacks[filterToken];
+ if (_.isEmpty(this._filters)) {
+ this._blockAndLogStreamer.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
+ this._blockAndLogStreamer.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
+ this._stopBlockAndLogStream();
+ }
}
protected async _getLogsAsync(address: string, eventName: ContractEvents, subscriptionOpts: SubscriptionOpts,
indexFilterValues: IndexedFilterValues,
abi: Web3.ContractAbi): Promise<LogWithDecodedArgs[]> {
- const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi;
- const eventSignature = this._getEventSignatureFromAbiByName(eventAbi, eventName);
- const topicForEventSignature = this._web3Wrapper.keccak256(eventSignature);
- const topicsForIndexedArgs = this._getTopicsForIndexedArgs(eventAbi, indexFilterValues);
- const topics = [topicForEventSignature, ...topicsForIndexedArgs];
- const filter = {
- fromBlock: subscriptionOpts.fromBlock,
- toBlock: subscriptionOpts.toBlock,
- address,
- topics,
- };
+ const filter = filterUtils.getFilter(
+ this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi,
+ subscriptionOpts,
+ );
const logs = await this._web3Wrapper.getLogsAsync(filter);
const logsWithDecodedArguments = _.map(logs, this._tryToDecodeLogOrNoop.bind(this));
return logsWithDecodedArguments;
@@ -62,27 +97,34 @@ export class ContractWrapper {
await this._web3Wrapper.getContractInstanceFromArtifactAsync<A>(artifact, addressIfExists);
return contractInstance;
}
- protected _getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string {
- const types = _.map(eventAbi.inputs, 'type');
- const signature = `${eventAbi.name}(${types.join(',')})`;
- return signature;
- }
- private _getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> {
- const topics: Array<string|null> = [];
- for (const eventInput of abi.inputs) {
- if (!eventInput.indexed) {
- continue;
+ private _onLogStateChanged(removed: boolean, log: Web3.LogEntry): void {
+ _.forEach(this._filters, (filter: Web3.FilterObject, filterToken: string) => {
+ if (filterUtils.matchesFilter(log, filter)) {
+ const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs;
+ const logEvent = {
+ ...decodedLog,
+ removed,
+ };
+ this._filterCallbacks[filterToken](logEvent);
}
- if (_.isUndefined(indexFilterValues[eventInput.name])) {
- topics.push(null);
- } else {
- const value = indexFilterValues[eventInput.name] as string;
- const buffer = ethUtil.toBuffer(value);
- const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH);
- const topic = ethUtil.bufferToHex(paddedBuffer);
- topics.push(topic);
- }
- }
- return topics;
+ });
+ }
+ private _startBlockAndLogStream(): void {
+ this._blockAndLogStreamer = new BlockAndLogStreamer(
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ );
+ this._blockAndLogStreamer.addLogFilter({});
+ this._blockAndLogStreamInterval = intervalUtils.setAsyncExcludingInterval(
+ this._reconcileBlockAsync.bind(this), constants.DEFAULT_BLOCK_POLLING_INTERVAL,
+ );
+ }
+ private _stopBlockAndLogStream(): void {
+ intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamInterval);
+ delete this._blockAndLogStreamer;
+ }
+ private async _reconcileBlockAsync(): Promise<void> {
+ const latestBlock = await this._web3Wrapper.getBlockAsync('latest');
+ this._blockAndLogStreamer.reconcileNewBlock(latestBlock as any as Block);
}
}
diff --git a/src/contract_wrappers/exchange_wrapper.ts b/src/contract_wrappers/exchange_wrapper.ts
index 32eaa590c..e5f190864 100644
--- a/src/contract_wrappers/exchange_wrapper.ts
+++ b/src/contract_wrappers/exchange_wrapper.ts
@@ -16,11 +16,8 @@ import {
SignedOrder,
ContractEvent,
ExchangeEvents,
- ContractEventEmitter,
SubscriptionOpts,
IndexedFilterValues,
- CreateContractEvent,
- ContractEventObj,
OrderCancellationRequest,
OrderFillRequest,
LogErrorContractEventArgs,
@@ -31,10 +28,10 @@ import {
ValidateOrderFillableOpts,
OrderTransactionOpts,
RawLog,
+ EventCallback,
} from '../types';
import {assert} from '../utils/assert';
import {utils} from '../utils/utils';
-import {eventUtils} from '../utils/event_utils';
import {OrderValidationUtils} from '../utils/order_validation_utils';
import {ContractWrapper} from './contract_wrapper';
import {constants} from '../utils/constants';
@@ -51,7 +48,7 @@ const SHOULD_VALIDATE_BY_DEFAULT = true;
*/
export class ExchangeWrapper extends ContractWrapper {
private _exchangeContractIfExists?: ExchangeContract;
- private _exchangeLogEventEmitters: ContractEventEmitter[];
+ private _activeSubscriptions: string[];
private _orderValidationUtils: OrderValidationUtils;
private _tokenWrapper: TokenWrapper;
private _exchangeContractErrCodesToMsg = {
@@ -86,7 +83,7 @@ export class ExchangeWrapper extends ContractWrapper {
super(web3Wrapper, abiDecoder);
this._tokenWrapper = tokenWrapper;
this._orderValidationUtils = new OrderValidationUtils(tokenWrapper, this);
- this._exchangeLogEventEmitters = [];
+ this._activeSubscriptions = [];
this._contractAddressIfExists = contractAddressIfExists;
}
/**
@@ -622,41 +619,32 @@ export class ExchangeWrapper extends ContractWrapper {
return txHash;
}
/**
- * Subscribe to an event type emitted by the Exchange smart contract
- * @param eventName The exchange contract event you would like to subscribe to.
- * @param subscriptionOpts Subscriptions options that let you configure the subscription.
- * @param indexFilterValues An object where the keys are indexed args returned by the event and
- * the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
- * @param exchangeContractAddress The hex encoded address of the Exchange contract to call.
- * @return ContractEventEmitter object
+ * Subscribe to an event type emitted by the Exchange contract.
+ * @param tokenAddress The hex encoded address where the ERC20 token is deployed.
+ * @param eventName The exchange contract event you would like to subscribe to.
+ * @param indexFilterValues An object where the keys are indexed args returned by the event and
+ * the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
+ * @param callback Callback that gets called when a log is added/removed
+ * @return ContractEventEmitter object
*/
- public async subscribeAsync(eventName: ExchangeEvents, subscriptionOpts: SubscriptionOpts,
- indexFilterValues: IndexedFilterValues, exchangeContractAddress: string):
- Promise<ContractEventEmitter> {
- assert.isETHAddressHex('exchangeContractAddress', exchangeContractAddress);
+ public async subscribeAsync(eventName: ExchangeEvents, indexFilterValues: IndexedFilterValues,
+ callback: EventCallback): Promise<string> {
assert.doesBelongToStringEnum('eventName', eventName, ExchangeEvents);
- assert.doesConformToSchema('subscriptionOpts', subscriptionOpts, schemas.subscriptionOptsSchema);
assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema);
- const exchangeContract = await this._getExchangeContractAsync();
- let createLogEvent: CreateContractEvent;
- switch (eventName) {
- case ExchangeEvents.LogFill:
- createLogEvent = exchangeContract.LogFill;
- break;
- case ExchangeEvents.LogError:
- createLogEvent = exchangeContract.LogError;
- break;
- case ExchangeEvents.LogCancel:
- createLogEvent = exchangeContract.LogCancel;
- break;
- default:
- throw utils.spawnSwitchErr('ExchangeEvents', eventName);
- }
-
- const logEventObj: ContractEventObj = createLogEvent(indexFilterValues, subscriptionOpts);
- const eventEmitter = eventUtils.wrapEventEmitter(logEventObj);
- this._exchangeLogEventEmitters.push(eventEmitter);
- return eventEmitter;
+ const exchangeContractAddress = await this.getContractAddressAsync();
+ const subscriptionToken = this._subscribe(
+ exchangeContractAddress, eventName, indexFilterValues, artifacts.ExchangeArtifact.abi, callback,
+ );
+ this._activeSubscriptions.push(subscriptionToken);
+ return subscriptionToken;
+ }
+ /**
+ * Cancel a subscription
+ * @param subscriptionToken Subscription token returned by `subscribe()`
+ */
+ public unsubscribe(subscriptionToken: string): void {
+ _.pull(this._activeSubscriptions, subscriptionToken);
+ this._unsubscribe(subscriptionToken);
}
/**
* Gets historical logs without creating a subscription
@@ -678,15 +666,6 @@ export class ExchangeWrapper extends ContractWrapper {
return logs;
}
/**
- * Stops watching for all exchange events
- */
- public async stopWatchingAllEventsAsync(): Promise<void> {
- const stopWatchingPromises = _.map(this._exchangeLogEventEmitters,
- logEventObj => logEventObj.stopWatchingAsync());
- await Promise.all(stopWatchingPromises);
- this._exchangeLogEventEmitters = [];
- }
- /**
* Retrieves the Ethereum address of the Exchange contract deployed on the network
* that the user-passed web3 provider is connected to.
* @returns The Ethereum address of the Exchange contract being used.
@@ -809,8 +788,15 @@ export class ExchangeWrapper extends ContractWrapper {
const ZRXtokenAddress = await exchangeInstance.ZRX_TOKEN_CONTRACT.callAsync();
return ZRXtokenAddress;
}
+ /**
+ * Cancels all existing subscriptions
+ */
+ public unsubscribeAll(): void {
+ _.forEach(this._activeSubscriptions, this._unsubscribe.bind(this));
+ this._activeSubscriptions = [];
+ }
private async _invalidateContractInstancesAsync(): Promise<void> {
- await this.stopWatchingAllEventsAsync();
+ this.unsubscribeAll();
delete this._exchangeContractIfExists;
}
private async _isValidSignatureUsingContractCallAsync(dataHex: string, ecSignature: ECSignature,
diff --git a/src/contract_wrappers/token_wrapper.ts b/src/contract_wrappers/token_wrapper.ts
index f988e6ece..d2b1ccf96 100644
--- a/src/contract_wrappers/token_wrapper.ts
+++ b/src/contract_wrappers/token_wrapper.ts
@@ -4,7 +4,6 @@ import {SchemaValidator, schemas} from '0x-json-schemas';
import {Web3Wrapper} from '../web3_wrapper';
import {assert} from '../utils/assert';
import {utils} from '../utils/utils';
-import {eventUtils} from '../utils/event_utils';
import {constants} from '../utils/constants';
import {ContractWrapper} from './contract_wrapper';
import {AbiDecoder} from '../utils/abi_decoder';
@@ -15,12 +14,10 @@ import {
TokenEvents,
IndexedFilterValues,
SubscriptionOpts,
- CreateContractEvent,
- ContractEventEmitter,
- ContractEventObj,
MethodOpts,
LogWithDecodedArgs,
RawLog,
+ EventCallback,
} from '../types';
const ALLOWANCE_TO_ZERO_GAS_AMOUNT = 47155;
@@ -33,13 +30,13 @@ const ALLOWANCE_TO_ZERO_GAS_AMOUNT = 47155;
export class TokenWrapper extends ContractWrapper {
public UNLIMITED_ALLOWANCE_IN_BASE_UNITS = constants.UNLIMITED_ALLOWANCE_IN_BASE_UNITS;
private _tokenContractsByAddress: {[address: string]: TokenContract};
- private _tokenLogEventEmitters: ContractEventEmitter[];
+ private _activeSubscriptions: string[];
private _tokenTransferProxyContractAddressFetcher: () => Promise<string>;
constructor(web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder,
tokenTransferProxyContractAddressFetcher: () => Promise<string>) {
super(web3Wrapper, abiDecoder);
this._tokenContractsByAddress = {};
- this._tokenLogEventEmitters = [];
+ this._activeSubscriptions = [];
this._tokenTransferProxyContractAddressFetcher = tokenTransferProxyContractAddressFetcher;
}
/**
@@ -251,34 +248,29 @@ export class TokenWrapper extends ContractWrapper {
* Subscribe to an event type emitted by the Token contract.
* @param tokenAddress The hex encoded address where the ERC20 token is deployed.
* @param eventName The token contract event you would like to subscribe to.
- * @param subscriptionOpts Subscriptions options that let you configure the subscription.
* @param indexFilterValues An object where the keys are indexed args returned by the event and
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
+ * @param callback Callback that gets called when a log is added/removed
* @return ContractEventEmitter object
*/
- public async subscribeAsync(tokenAddress: string, eventName: TokenEvents, subscriptionOpts: SubscriptionOpts,
- indexFilterValues: IndexedFilterValues): Promise<ContractEventEmitter> {
+ public subscribe(tokenAddress: string, eventName: TokenEvents, indexFilterValues: IndexedFilterValues,
+ callback: EventCallback): string {
assert.isETHAddressHex('tokenAddress', tokenAddress);
assert.doesBelongToStringEnum('eventName', eventName, TokenEvents);
- assert.doesConformToSchema('subscriptionOpts', subscriptionOpts, schemas.subscriptionOptsSchema);
assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema);
- const tokenContract = await this._getTokenContractAsync(tokenAddress);
- let createLogEvent: CreateContractEvent;
- switch (eventName) {
- case TokenEvents.Approval:
- createLogEvent = tokenContract.Approval;
- break;
- case TokenEvents.Transfer:
- createLogEvent = tokenContract.Transfer;
- break;
- default:
- throw utils.spawnSwitchErr('TokenEvents', eventName);
- }
-
- const logEventObj: ContractEventObj = createLogEvent(indexFilterValues, subscriptionOpts);
- const eventEmitter = eventUtils.wrapEventEmitter(logEventObj);
- this._tokenLogEventEmitters.push(eventEmitter);
- return eventEmitter;
+ const subscriptionToken = this._subscribe(
+ tokenAddress, eventName, indexFilterValues, artifacts.TokenArtifact.abi, callback,
+ );
+ this._activeSubscriptions.push(subscriptionToken);
+ return subscriptionToken;
+ }
+ /**
+ * Cancel a subscription
+ * @param subscriptionToken Subscription token returned by `subscribe()`
+ */
+ public unsubscribe(subscriptionToken: string): void {
+ _.pull(this._activeSubscriptions, subscriptionToken);
+ this._unsubscribe(subscriptionToken);
}
/**
* Gets historical logs without creating a subscription
@@ -301,16 +293,14 @@ export class TokenWrapper extends ContractWrapper {
return logs;
}
/**
- * Stops watching for all token events
+ * Cancels all existing subscriptions
*/
- public async stopWatchingAllEventsAsync(): Promise<void> {
- const stopWatchingPromises = _.map(this._tokenLogEventEmitters,
- logEventObj => logEventObj.stopWatchingAsync());
- await Promise.all(stopWatchingPromises);
- this._tokenLogEventEmitters = [];
+ public unsubscribeAll(): void {
+ _.forEach(this._activeSubscriptions, this._unsubscribe.bind(this));
+ this._activeSubscriptions = [];
}
- private async _invalidateContractInstancesAsync(): Promise<void> {
- await this.stopWatchingAllEventsAsync();
+ private _invalidateContractInstancesAsync(): void {
+ this.unsubscribeAll();
this._tokenContractsByAddress = {};
}
private async _getTokenContractAsync(tokenAddress: string): Promise<TokenContract> {