diff options
author | Leonid Logvinov <logvinov.leon@gmail.com> | 2017-10-05 19:34:30 +0800 |
---|---|---|
committer | Leonid Logvinov <logvinov.leon@gmail.com> | 2017-10-05 20:35:37 +0800 |
commit | 7dd63523939822203d938511472c84b8ff418aaf (patch) | |
tree | 68e75aa486e673fc7faf2dc69e0527844b842e81 /src/contract_wrappers/contract_wrapper.ts | |
parent | e37a3155cd52d35da3eef9a8dc450b9b3df0b888 (diff) | |
download | dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.gz dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.bz2 dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.lz dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.xz dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.tar.zst dexon-sol-tools-7dd63523939822203d938511472c84b8ff418aaf.zip |
Implement subscriptions based on ethereumjs-blockstream
Diffstat (limited to 'src/contract_wrappers/contract_wrapper.ts')
-rw-r--r-- | src/contract_wrappers/contract_wrapper.ts | 122 |
1 files changed, 82 insertions, 40 deletions
diff --git a/src/contract_wrappers/contract_wrapper.ts b/src/contract_wrappers/contract_wrapper.ts index 927a09b52..6f22f5bdb 100644 --- a/src/contract_wrappers/contract_wrapper.ts +++ b/src/contract_wrappers/contract_wrapper.ts @@ -1,10 +1,10 @@ import * as _ from 'lodash'; import * as Web3 from 'web3'; -import * as ethUtil from 'ethereumjs-util'; -import {BlockAndLogStreamer} from 'ethereumjs-blockstream'; +import {BlockAndLogStreamer, Block} from 'ethereumjs-blockstream'; import {Web3Wrapper} from '../web3_wrapper'; import {AbiDecoder} from '../utils/abi_decoder'; import { + ZeroExError, InternalZeroExError, Artifact, LogWithDecodedArgs, @@ -12,38 +12,73 @@ import { ContractEvents, SubscriptionOpts, IndexedFilterValues, + EventCallback, } from '../types'; import {utils} from '../utils/utils'; - -const TOPIC_LENGTH = 32; +import {constants} from '../utils/constants'; +import {intervalUtils} from '../utils/interval_utils'; +import {filterUtils} from '../utils/filter_utils'; export class ContractWrapper { protected _web3Wrapper: Web3Wrapper; private _abiDecoder?: AbiDecoder; private _blockAndLogStreamer: BlockAndLogStreamer; + private _blockAndLogStreamInterval: NodeJS.Timer; + private _activeFilters: number; + private _filters: {[filterToken: string]: Web3.FilterObject}; + private _filterCallbacks: {[filterToken: string]: EventCallback}; + private _onLogAddedSubscriptionToken: string|undefined; + private _onLogRemovedSubscriptionToken: string|undefined; constructor(web3Wrapper: Web3Wrapper, abiDecoder?: AbiDecoder) { this._web3Wrapper = web3Wrapper; this._abiDecoder = abiDecoder; - const getBlockAsync = async (hash: string) => this._web3Wrapper.getBlockAsync(hash); - this._blockAndLogStreamer = new BlockAndLogStreamer( - this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper), - this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper), + this._activeFilters = 0; + this._filters = {}; + this._filterCallbacks = {}; + this._onLogAddedSubscriptionToken = undefined; + this._onLogRemovedSubscriptionToken = undefined; + } + protected _subscribe(address: string, eventName: ContractEvents, + indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi, + callback: EventCallback): string { + const filter = filterUtils.getFilter( + this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi, ); + if (_.isEmpty(this._filters)) { + this._startBlockAndLogStream(); + let removed = false; + this._onLogAddedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogAdded( + this._onLogStateChanged.bind(this, removed), + ); + removed = true; + this._onLogRemovedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogRemoved( + this._onLogStateChanged.bind(this, removed), + ); + } + const filterToken = filterUtils.generateUUID(); + this._filters[filterToken] = filter; + this._filterCallbacks[filterToken] = callback; + return filterToken; + } + protected _unsubscribe(filterToken: string): void { + if (_.isUndefined(this._filters[filterToken])) { + throw new Error(ZeroExError.SubscriptionNotFound); + } + delete this._filters[filterToken]; + delete this._filterCallbacks[filterToken]; + if (_.isEmpty(this._filters)) { + this._blockAndLogStreamer.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string); + this._blockAndLogStreamer.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string); + this._stopBlockAndLogStream(); + } } protected async _getLogsAsync(address: string, eventName: ContractEvents, subscriptionOpts: SubscriptionOpts, indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi): Promise<LogWithDecodedArgs[]> { - const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi; - const eventSignature = this._getEventSignatureFromAbiByName(eventAbi, eventName); - const topicForEventSignature = this._web3Wrapper.keccak256(eventSignature); - const topicsForIndexedArgs = this._getTopicsForIndexedArgs(eventAbi, indexFilterValues); - const topics = [topicForEventSignature, ...topicsForIndexedArgs]; - const filter = { - fromBlock: subscriptionOpts.fromBlock, - toBlock: subscriptionOpts.toBlock, - address, - topics, - }; + const filter = filterUtils.getFilter( + this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi, + subscriptionOpts, + ); const logs = await this._web3Wrapper.getLogsAsync(filter); const logsWithDecodedArguments = _.map(logs, this._tryToDecodeLogOrNoop.bind(this)); return logsWithDecodedArguments; @@ -62,27 +97,34 @@ export class ContractWrapper { await this._web3Wrapper.getContractInstanceFromArtifactAsync<A>(artifact, addressIfExists); return contractInstance; } - protected _getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string { - const types = _.map(eventAbi.inputs, 'type'); - const signature = `${eventAbi.name}(${types.join(',')})`; - return signature; - } - private _getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> { - const topics: Array<string|null> = []; - for (const eventInput of abi.inputs) { - if (!eventInput.indexed) { - continue; + private _onLogStateChanged(removed: boolean, log: Web3.LogEntry): void { + _.forEach(this._filters, (filter: Web3.FilterObject, filterToken: string) => { + if (filterUtils.matchesFilter(log, filter)) { + const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs; + const logEvent = { + ...decodedLog, + removed, + }; + this._filterCallbacks[filterToken](logEvent); } - if (_.isUndefined(indexFilterValues[eventInput.name])) { - topics.push(null); - } else { - const value = indexFilterValues[eventInput.name] as string; - const buffer = ethUtil.toBuffer(value); - const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH); - const topic = ethUtil.bufferToHex(paddedBuffer); - topics.push(topic); - } - } - return topics; + }); + } + private _startBlockAndLogStream(): void { + this._blockAndLogStreamer = new BlockAndLogStreamer( + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper), + ); + this._blockAndLogStreamer.addLogFilter({}); + this._blockAndLogStreamInterval = intervalUtils.setAsyncExcludingInterval( + this._reconcileBlockAsync.bind(this), constants.DEFAULT_BLOCK_POLLING_INTERVAL, + ); + } + private _stopBlockAndLogStream(): void { + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamInterval); + delete this._blockAndLogStreamer; + } + private async _reconcileBlockAsync(): Promise<void> { + const latestBlock = await this._web3Wrapper.getBlockAsync('latest'); + this._blockAndLogStreamer.reconcileNewBlock(latestBlock as any as Block); } } |