diff options
author | Leonid <logvinov.leon@gmail.com> | 2017-10-06 20:24:42 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-06 20:24:42 +0800 |
commit | f38d2f80a6e3af4ff7e2454d42abdf2a389e4303 (patch) | |
tree | 9e2f94e7d8236f7612628288d7eab3333d240b6b /src/contract_wrappers/contract_wrapper.ts | |
parent | cd5327bc31618b4ea268de1902a74cf862987ee6 (diff) | |
parent | 0c112a2a1c1811e1fc7c4b03881f960b952c788c (diff) | |
download | dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.gz dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.bz2 dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.lz dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.xz dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.zst dexon-sol-tools-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.zip |
Merge pull request #182 from 0xProject/feature/ethereumjs-blockstream
Rewrite subscriptions
Diffstat (limited to 'src/contract_wrappers/contract_wrapper.ts')
-rw-r--r-- | src/contract_wrappers/contract_wrapper.ts | 120 |
1 files changed, 85 insertions, 35 deletions
diff --git a/src/contract_wrappers/contract_wrapper.ts b/src/contract_wrappers/contract_wrapper.ts index 743dfc9b2..f6ccfdee4 100644 --- a/src/contract_wrappers/contract_wrapper.ts +++ b/src/contract_wrappers/contract_wrapper.ts @@ -1,9 +1,10 @@ import * as _ from 'lodash'; import * as Web3 from 'web3'; -import * as ethUtil from 'ethereumjs-util'; +import {BlockAndLogStreamer, Block} from 'ethereumjs-blockstream'; import {Web3Wrapper} from '../web3_wrapper'; import {AbiDecoder} from '../utils/abi_decoder'; import { + ZeroExError, InternalZeroExError, Artifact, LogWithDecodedArgs, @@ -11,32 +12,57 @@ import { ContractEvents, SubscriptionOpts, IndexedFilterValues, + EventCallback, + BlockParamLiteral, } from '../types'; -import {utils} from '../utils/utils'; - -const TOPIC_LENGTH = 32; +import {constants} from '../utils/constants'; +import {intervalUtils} from '../utils/interval_utils'; +import {filterUtils} from '../utils/filter_utils'; export class ContractWrapper { protected _web3Wrapper: Web3Wrapper; private _abiDecoder?: AbiDecoder; + private _blockAndLogStreamer: BlockAndLogStreamer|undefined; + private _blockAndLogStreamInterval: NodeJS.Timer; + private _filters: {[filterToken: string]: Web3.FilterObject}; + private _filterCallbacks: {[filterToken: string]: EventCallback}; + private _onLogAddedSubscriptionToken: string|undefined; + private _onLogRemovedSubscriptionToken: string|undefined; constructor(web3Wrapper: Web3Wrapper, abiDecoder?: AbiDecoder) { this._web3Wrapper = web3Wrapper; this._abiDecoder = abiDecoder; + this._filters = {}; + this._filterCallbacks = {}; + this._blockAndLogStreamer = undefined; + this._onLogAddedSubscriptionToken = undefined; + this._onLogRemovedSubscriptionToken = undefined; + } + protected _subscribe(address: string, eventName: ContractEvents, + indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi, + callback: EventCallback): string { + const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi); + if (_.isUndefined(this._blockAndLogStreamer)) { + this._startBlockAndLogStream(); + } + const filterToken = filterUtils.generateUUID(); + this._filters[filterToken] = filter; + this._filterCallbacks[filterToken] = callback; + return filterToken; + } + protected _unsubscribe(filterToken: string): void { + if (_.isUndefined(this._filters[filterToken])) { + throw new Error(ZeroExError.SubscriptionNotFound); + } + delete this._filters[filterToken]; + delete this._filterCallbacks[filterToken]; + if (_.isEmpty(this._filters)) { + this._stopBlockAndLogStream(); + } } protected async _getLogsAsync(address: string, eventName: ContractEvents, subscriptionOpts: SubscriptionOpts, indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi): Promise<LogWithDecodedArgs[]> { - const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi; - const eventSignature = this._getEventSignatureFromAbiByName(eventAbi, eventName); - const topicForEventSignature = this._web3Wrapper.keccak256(eventSignature); - const topicsForIndexedArgs = this._getTopicsForIndexedArgs(eventAbi, indexFilterValues); - const topics = [topicForEventSignature, ...topicsForIndexedArgs]; - const filter = { - fromBlock: subscriptionOpts.fromBlock, - toBlock: subscriptionOpts.toBlock, - address, - topics, - }; + const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi, subscriptionOpts); const logs = await this._web3Wrapper.getLogsAsync(filter); const logsWithDecodedArguments = _.map(logs, this._tryToDecodeLogOrNoop.bind(this)); return logsWithDecodedArguments; @@ -55,27 +81,51 @@ export class ContractWrapper { await this._web3Wrapper.getContractInstanceFromArtifactAsync<A>(artifact, addressIfExists); return contractInstance; } - protected _getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string { - const types = _.map(eventAbi.inputs, 'type'); - const signature = `${eventAbi.name}(${types.join(',')})`; - return signature; - } - private _getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> { - const topics: Array<string|null> = []; - for (const eventInput of abi.inputs) { - if (!eventInput.indexed) { - continue; - } - if (_.isUndefined(indexFilterValues[eventInput.name])) { - topics.push(null); - } else { - const value = indexFilterValues[eventInput.name] as string; - const buffer = ethUtil.toBuffer(value); - const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH); - const topic = ethUtil.bufferToHex(paddedBuffer); - topics.push(topic); + private _onLogStateChanged(removed: boolean, log: Web3.LogEntry): void { + _.forEach(this._filters, (filter: Web3.FilterObject, filterToken: string) => { + if (filterUtils.matchesFilter(log, filter)) { + const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs; + const logEvent = { + ...decodedLog, + removed, + }; + this._filterCallbacks[filterToken](logEvent); } + }); + } + private _startBlockAndLogStream(): void { + this._blockAndLogStreamer = new BlockAndLogStreamer( + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper), + ); + const catchAllLogFilter = {}; + this._blockAndLogStreamer.addLogFilter(catchAllLogFilter); + this._blockAndLogStreamInterval = intervalUtils.setAsyncExcludingInterval( + this._reconcileBlockAsync.bind(this), constants.DEFAULT_BLOCK_POLLING_INTERVAL, + ); + let removed = false; + this._onLogAddedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogAdded( + this._onLogStateChanged.bind(this, removed), + ); + removed = true; + this._onLogRemovedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogRemoved( + this._onLogStateChanged.bind(this, removed), + ); + } + private _stopBlockAndLogStream(): void { + (this._blockAndLogStreamer as BlockAndLogStreamer).unsubscribeFromOnLogAdded( + this._onLogAddedSubscriptionToken as string); + (this._blockAndLogStreamer as BlockAndLogStreamer).unsubscribeFromOnLogRemoved( + this._onLogRemovedSubscriptionToken as string); + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamInterval); + delete this._blockAndLogStreamer; + } + private async _reconcileBlockAsync(): Promise<void> { + const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest); + // We need to coerce to Block type cause Web3.Block includes types for mempool blocks + if (!_.isUndefined(this._blockAndLogStreamer)) { + // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined + this._blockAndLogStreamer.reconcileNewBlock(latestBlock as any as Block); } - return topics; } } |