aboutsummaryrefslogtreecommitdiffstats
path: root/src/contract_wrappers/contract_wrapper.ts
diff options
context:
space:
mode:
authorLeonid Logvinov <logvinov.leon@gmail.com>2017-10-05 19:34:30 +0800
committerLeonid Logvinov <logvinov.leon@gmail.com>2017-10-05 20:35:37 +0800
commit7dd63523939822203d938511472c84b8ff418aaf (patch)
tree68e75aa486e673fc7faf2dc69e0527844b842e81 /src/contract_wrappers/contract_wrapper.ts
parente37a3155cd52d35da3eef9a8dc450b9b3df0b888 (diff)
downloaddexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.gz
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.bz2
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.lz
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.xz
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.zst
dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.zip
Implement subscriptions based on ethereumjs-blockstream
Diffstat (limited to 'src/contract_wrappers/contract_wrapper.ts')
-rw-r--r--src/contract_wrappers/contract_wrapper.ts122
1 files changed, 82 insertions, 40 deletions
diff --git a/src/contract_wrappers/contract_wrapper.ts b/src/contract_wrappers/contract_wrapper.ts
index 927a09b52..6f22f5bdb 100644
--- a/src/contract_wrappers/contract_wrapper.ts
+++ b/src/contract_wrappers/contract_wrapper.ts
@@ -1,10 +1,10 @@
import * as _ from 'lodash';
import * as Web3 from 'web3';
-import * as ethUtil from 'ethereumjs-util';
-import {BlockAndLogStreamer} from 'ethereumjs-blockstream';
+import {BlockAndLogStreamer, Block} from 'ethereumjs-blockstream';
import {Web3Wrapper} from '../web3_wrapper';
import {AbiDecoder} from '../utils/abi_decoder';
import {
+ ZeroExError,
InternalZeroExError,
Artifact,
LogWithDecodedArgs,
@@ -12,38 +12,73 @@ import {
ContractEvents,
SubscriptionOpts,
IndexedFilterValues,
+ EventCallback,
} from '../types';
import {utils} from '../utils/utils';
-
-const TOPIC_LENGTH = 32;
+import {constants} from '../utils/constants';
+import {intervalUtils} from '../utils/interval_utils';
+import {filterUtils} from '../utils/filter_utils';
export class ContractWrapper {
protected _web3Wrapper: Web3Wrapper;
private _abiDecoder?: AbiDecoder;
private _blockAndLogStreamer: BlockAndLogStreamer;
+ private _blockAndLogStreamInterval: NodeJS.Timer;
+ private _activeFilters: number;
+ private _filters: {[filterToken: string]: Web3.FilterObject};
+ private _filterCallbacks: {[filterToken: string]: EventCallback};
+ private _onLogAddedSubscriptionToken: string|undefined;
+ private _onLogRemovedSubscriptionToken: string|undefined;
constructor(web3Wrapper: Web3Wrapper, abiDecoder?: AbiDecoder) {
this._web3Wrapper = web3Wrapper;
this._abiDecoder = abiDecoder;
- const getBlockAsync = async (hash: string) => this._web3Wrapper.getBlockAsync(hash);
- this._blockAndLogStreamer = new BlockAndLogStreamer(
- this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
- this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ this._activeFilters = 0;
+ this._filters = {};
+ this._filterCallbacks = {};
+ this._onLogAddedSubscriptionToken = undefined;
+ this._onLogRemovedSubscriptionToken = undefined;
+ }
+ protected _subscribe(address: string, eventName: ContractEvents,
+ indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi,
+ callback: EventCallback): string {
+ const filter = filterUtils.getFilter(
+ this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi,
);
+ if (_.isEmpty(this._filters)) {
+ this._startBlockAndLogStream();
+ let removed = false;
+ this._onLogAddedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogAdded(
+ this._onLogStateChanged.bind(this, removed),
+ );
+ removed = true;
+ this._onLogRemovedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogRemoved(
+ this._onLogStateChanged.bind(this, removed),
+ );
+ }
+ const filterToken = filterUtils.generateUUID();
+ this._filters[filterToken] = filter;
+ this._filterCallbacks[filterToken] = callback;
+ return filterToken;
+ }
+ protected _unsubscribe(filterToken: string): void {
+ if (_.isUndefined(this._filters[filterToken])) {
+ throw new Error(ZeroExError.SubscriptionNotFound);
+ }
+ delete this._filters[filterToken];
+ delete this._filterCallbacks[filterToken];
+ if (_.isEmpty(this._filters)) {
+ this._blockAndLogStreamer.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
+ this._blockAndLogStreamer.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
+ this._stopBlockAndLogStream();
+ }
}
protected async _getLogsAsync(address: string, eventName: ContractEvents, subscriptionOpts: SubscriptionOpts,
indexFilterValues: IndexedFilterValues,
abi: Web3.ContractAbi): Promise<LogWithDecodedArgs[]> {
- const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi;
- const eventSignature = this._getEventSignatureFromAbiByName(eventAbi, eventName);
- const topicForEventSignature = this._web3Wrapper.keccak256(eventSignature);
- const topicsForIndexedArgs = this._getTopicsForIndexedArgs(eventAbi, indexFilterValues);
- const topics = [topicForEventSignature, ...topicsForIndexedArgs];
- const filter = {
- fromBlock: subscriptionOpts.fromBlock,
- toBlock: subscriptionOpts.toBlock,
- address,
- topics,
- };
+ const filter = filterUtils.getFilter(
+ this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi,
+ subscriptionOpts,
+ );
const logs = await this._web3Wrapper.getLogsAsync(filter);
const logsWithDecodedArguments = _.map(logs, this._tryToDecodeLogOrNoop.bind(this));
return logsWithDecodedArguments;
@@ -62,27 +97,34 @@ export class ContractWrapper {
await this._web3Wrapper.getContractInstanceFromArtifactAsync<A>(artifact, addressIfExists);
return contractInstance;
}
- protected _getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string {
- const types = _.map(eventAbi.inputs, 'type');
- const signature = `${eventAbi.name}(${types.join(',')})`;
- return signature;
- }
- private _getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> {
- const topics: Array<string|null> = [];
- for (const eventInput of abi.inputs) {
- if (!eventInput.indexed) {
- continue;
+ private _onLogStateChanged(removed: boolean, log: Web3.LogEntry): void {
+ _.forEach(this._filters, (filter: Web3.FilterObject, filterToken: string) => {
+ if (filterUtils.matchesFilter(log, filter)) {
+ const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs;
+ const logEvent = {
+ ...decodedLog,
+ removed,
+ };
+ this._filterCallbacks[filterToken](logEvent);
}
- if (_.isUndefined(indexFilterValues[eventInput.name])) {
- topics.push(null);
- } else {
- const value = indexFilterValues[eventInput.name] as string;
- const buffer = ethUtil.toBuffer(value);
- const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH);
- const topic = ethUtil.bufferToHex(paddedBuffer);
- topics.push(topic);
- }
- }
- return topics;
+ });
+ }
+ private _startBlockAndLogStream(): void {
+ this._blockAndLogStreamer = new BlockAndLogStreamer(
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ );
+ this._blockAndLogStreamer.addLogFilter({});
+ this._blockAndLogStreamInterval = intervalUtils.setAsyncExcludingInterval(
+ this._reconcileBlockAsync.bind(this), constants.DEFAULT_BLOCK_POLLING_INTERVAL,
+ );
+ }
+ private _stopBlockAndLogStream(): void {
+ intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamInterval);
+ delete this._blockAndLogStreamer;
+ }
+ private async _reconcileBlockAsync(): Promise<void> {
+ const latestBlock = await this._web3Wrapper.getBlockAsync('latest');
+ this._blockAndLogStreamer.reconcileNewBlock(latestBlock as any as Block);
}
}