diff options
author | Leonid Logvinov <logvinov.leon@gmail.com> | 2017-10-26 17:50:02 +0800 |
---|---|---|
committer | Fabio Berger <me@fabioberger.com> | 2017-10-31 00:49:16 +0800 |
commit | f53472e7170798f56ea4837c310cfd4188326af8 (patch) | |
tree | 64f4151571e0fc91df67ee07e2ce286a298e0aba | |
parent | 7fa5d34c45f58ebdf729e9f7a500627ab72fb5c8 (diff) | |
download | dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.gz dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.bz2 dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.lz dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.xz dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.zst dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.zip |
Add initial mempool watching implememtation
-rw-r--r-- | package.json | 4 | ||||
-rw-r--r-- | src/0x.ts | 10 | ||||
-rw-r--r-- | src/index.ts | 2 | ||||
-rw-r--r-- | src/mempool/mempool_watcher.ts | 64 | ||||
-rw-r--r-- | src/types.ts | 24 | ||||
-rw-r--r-- | test/exchange_wrapper_test.ts | 24 | ||||
-rw-r--r-- | test/token_wrapper_test.ts | 11 | ||||
-rw-r--r-- | yarn.lock | 8 |
8 files changed, 115 insertions, 32 deletions
diff --git a/package.json b/package.json index 8e1cfd502..e615a8a3a 100644 --- a/package.json +++ b/package.json @@ -78,14 +78,14 @@ "sinon": "^4.0.0", "source-map-support": "^0.5.0", "truffle-hdwallet-provider": "^0.0.3", - "tslint": "^5.3.2", + "tslint": "~5.5.0", "tslint-config-0xproject": "^0.0.2", "typedoc": "~0.8.0", "types-bn": "^0.0.1", "types-ethereumjs-util": "0xProject/types-ethereumjs-util", "typescript": "^2.4.1", "web3-provider-engine": "^13.0.1", - "web3-typescript-typings": "^0.6.2", + "web3-typescript-typings": "^0.7.0", "webpack": "^3.1.0" }, "dependencies": { @@ -11,6 +11,7 @@ import {assert} from './utils/assert'; import {AbiDecoder} from './utils/abi_decoder'; import {intervalUtils} from './utils/interval_utils'; import {artifacts} from './artifacts'; +import {MempoolWatcher} from './mempool/mempool_watcher'; import {ExchangeWrapper} from './contract_wrappers/exchange_wrapper'; import {TokenRegistryWrapper} from './contract_wrappers/token_registry_wrapper'; import {EtherTokenWrapper} from './contract_wrappers/ether_token_wrapper'; @@ -65,6 +66,10 @@ export class ZeroEx { * tokenTransferProxy smart contract. */ public proxy: TokenTransferProxyWrapper; + /** + * An instance of the MempoolWatcher class containing methods for watching pending events. + */ + public mempool: MempoolWatcher; private _web3Wrapper: Web3Wrapper; private _abiDecoder: AbiDecoder; /** @@ -191,6 +196,11 @@ export class ZeroEx { gasPrice, }; this._web3Wrapper = new Web3Wrapper(provider, defaults); + const mempoolPollingIntervalMs = _.isUndefined(config) ? undefined : config.mempoolPollingIntervalMs; + this.mempool = new MempoolWatcher( + this._web3Wrapper, + mempoolPollingIntervalMs, + ); this.token = new TokenWrapper( this._web3Wrapper, this._abiDecoder, diff --git a/src/index.ts b/src/index.ts index 249c20519..7a9b8aa63 100644 --- a/src/index.ts +++ b/src/index.ts @@ -35,4 +35,6 @@ export { OrderTransactionOpts, FilterObject, LogEvent, + DecodedLogEvent, + MempoolEventCallback, } from './types'; diff --git a/src/mempool/mempool_watcher.ts b/src/mempool/mempool_watcher.ts new file mode 100644 index 000000000..be598c28f --- /dev/null +++ b/src/mempool/mempool_watcher.ts @@ -0,0 +1,64 @@ +import * as Web3 from 'web3'; +import * as _ from 'lodash'; +import {Web3Wrapper} from '../web3_wrapper'; +import {BlockParamLiteral, EventCallback, MempoolEventCallback} from '../types'; +import {AbiDecoder} from '../utils/abi_decoder'; +import {intervalUtils} from '../utils/interval_utils'; + +const DEFAULT_MEMPOOL_POLLING_INTERVAL = 200; + +export class MempoolWatcher { + private _web3Wrapper: Web3Wrapper; + private _pollingIntervalMs: number; + private _intervalId: NodeJS.Timer; + private _lastMempoolEvents: Web3.LogEntry[] = []; + private _callback?: MempoolEventCallback; + constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { + this._web3Wrapper = web3Wrapper; + this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? + DEFAULT_MEMPOOL_POLLING_INTERVAL : + pollingIntervalMs; + } + public subscribe(callback: MempoolEventCallback): void { + this._callback = callback; + this._intervalId = intervalUtils.setAsyncExcludingInterval( + this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs); + } + public unsubscribe(): void { + delete this._callback; + intervalUtils.clearAsyncExcludingInterval(this._intervalId); + } + private async _pollForMempoolEventsAsync(): Promise<void> { + const pendingEvents = await this._getMempoolEventsAsync(); + if (pendingEvents.length === 0) { + // HACK: Sometimes when node rebuilds the pending block we get back the empty result. + // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, + // that's why we just ignore those cases. + return; + } + const removedEvents = _.differenceBy(this._lastMempoolEvents, pendingEvents, _.isEqual); + const newEvents = _.differenceBy(pendingEvents, this._lastMempoolEvents, _.isEqual); + let isRemoved = true; + this._emitDifferences(removedEvents, isRemoved); + isRemoved = false; + this._emitDifferences(newEvents, isRemoved); + this._lastMempoolEvents = pendingEvents; + } + private async _getMempoolEventsAsync(): Promise<Web3.LogEntry[]> { + const mempoolFilter = { + fromBlock: BlockParamLiteral.Pending, + toBlock: BlockParamLiteral.Pending, + }; + const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter); + return pendingEvents; + } + private _emitDifferences(logs: Web3.LogEntry[], isRemoved: boolean): void { + _.forEach(logs, log => { + const logWithDecodedArgsEvent = { + removed: isRemoved, + ...log, + }; + (this._callback as MempoolEventCallback)(logWithDecodedArgsEvent); + }); + } +} diff --git a/src/types.ts b/src/types.ts index 9ac726ef8..1b32ccdf9 100644 --- a/src/types.ts +++ b/src/types.ts @@ -37,12 +37,17 @@ export type OrderAddresses = [string, string, string, string, string]; export type OrderValues = [BigNumber, BigNumber, BigNumber, BigNumber, BigNumber, BigNumber]; -export interface LogEvent<ArgsType> extends LogWithDecodedArgs<ArgsType> { - removed: boolean; -} -export type EventCallbackAsync<ArgsType> = (log: LogEvent<ArgsType>) => Promise<void>; -export type EventCallbackSync<ArgsType> = (log: LogEvent<ArgsType>) => void; +export type LogEvent = Web3.LogEntryEvent; +export type DecodedLogEvent<ArgsType> = Web3.DecodedLogEntryEvent<ArgsType>; + +export type EventCallbackAsync<ArgsType> = (log: DecodedLogEvent<ArgsType>) => Promise<void>; +export type EventCallbackSync<ArgsType> = (log: DecodedLogEvent<ArgsType>) => void; export type EventCallback<ArgsType> = EventCallbackSync<ArgsType>|EventCallbackAsync<ArgsType>; + +export type MempoolEventCallbackSync = (log: LogEvent) => void; +export type MempoolEventCallbackAsync = (log: LogEvent) => Promise<void>; +export type MempoolEventCallback = MempoolEventCallbackSync|MempoolEventCallbackAsync; + export interface ExchangeContract extends Web3.ContractInstance { isValidSignature: { callAsync: (signerAddressHex: string, dataHex: string, v: number, r: string, s: string, @@ -394,12 +399,14 @@ export interface JSONRPCPayload { * exchangeContractAddress: The address of an exchange contract to use * tokenRegistryContractAddress: The address of a token registry contract to use * etherTokenContractAddress: The address of an ether token contract to use + * mempoolPollingIntervalMs: How often to check for new mempool events */ export interface ZeroExConfig { gasPrice?: BigNumber; // Gas price to use with every transaction exchangeContractAddress?: string; tokenRegistryContractAddress?: string; etherTokenContractAddress?: string; + mempoolPollingIntervalMs?: number; } export type TransactionReceipt = Web3.TransactionReceipt; @@ -415,12 +422,7 @@ export interface DecodedLogArgs { [argName: string]: ContractEventArg; } -export interface DecodedArgs<ArgsType> { - args: ArgsType; - event: string; -} - -export interface LogWithDecodedArgs<ArgsType> extends Web3.LogEntry, DecodedArgs<ArgsType> {} +export interface LogWithDecodedArgs<ArgsType> extends Web3.DecodedLogEntry<ArgsType> {} export interface TransactionReceiptWithDecodedLogs extends Web3.TransactionReceipt { logs: Array<LogWithDecodedArgs<DecodedLogArgs>|Web3.LogEntry>; diff --git a/test/exchange_wrapper_test.ts b/test/exchange_wrapper_test.ts index 7c76499d5..15d1cb3e4 100644 --- a/test/exchange_wrapper_test.ts +++ b/test/exchange_wrapper_test.ts @@ -18,6 +18,7 @@ import { LogFillContractEventArgs, LogCancelContractEventArgs, LogEvent, + DecodedLogEvent, } from '../src'; import {DoneCallback, BlockParamLiteral} from '../src/types'; import {FillScenarios} from './utils/fill_scenarios'; @@ -304,11 +305,11 @@ describe('ExchangeWrapper', () => { orderFillBatch = [ { signedOrder, - takerTokenFillAmount: takerTokenFillAmount, + takerTokenFillAmount, }, { signedOrder: anotherSignedOrder, - takerTokenFillAmount: takerTokenFillAmount, + takerTokenFillAmount, }, ]; }); @@ -647,7 +648,7 @@ describe('ExchangeWrapper', () => { // Source: https://github.com/mochajs/mocha/issues/2407 it('Should receive the LogFill event when an order is filled', (done: DoneCallback) => { (async () => { - const callback = (logEvent: LogEvent<LogFillContractEventArgs>) => { + const callback = (logEvent: DecodedLogEvent<LogFillContractEventArgs>) => { expect(logEvent.event).to.be.equal(ExchangeEvents.LogFill); done(); }; @@ -655,13 +656,14 @@ describe('ExchangeWrapper', () => { ExchangeEvents.LogFill, indexFilterValues, callback, ); await zeroEx.exchange.fillOrderAsync( - signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, takerAddress, + signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, + takerAddress, ); })().catch(done); }); it('Should receive the LogCancel event when an order is cancelled', (done: DoneCallback) => { (async () => { - const callback = (logEvent: LogEvent<LogCancelContractEventArgs>) => { + const callback = (logEvent: DecodedLogEvent<LogCancelContractEventArgs>) => { expect(logEvent.event).to.be.equal(ExchangeEvents.LogCancel); done(); }; @@ -673,7 +675,7 @@ describe('ExchangeWrapper', () => { }); it('Outstanding subscriptions are cancelled when zeroEx.setProviderAsync called', (done: DoneCallback) => { (async () => { - const callbackNeverToBeCalled = (logEvent: LogEvent<LogFillContractEventArgs>) => { + const callbackNeverToBeCalled = (logEvent: DecodedLogEvent<LogFillContractEventArgs>) => { done(new Error('Expected this subscription to have been cancelled')); }; await zeroEx.exchange.subscribeAsync( @@ -683,7 +685,7 @@ describe('ExchangeWrapper', () => { const newProvider = web3Factory.getRpcProvider(); await zeroEx.setProviderAsync(newProvider); - const callback = (logEvent: LogEvent<LogFillContractEventArgs>) => { + const callback = (logEvent: DecodedLogEvent<LogFillContractEventArgs>) => { expect(logEvent.event).to.be.equal(ExchangeEvents.LogFill); done(); }; @@ -691,13 +693,14 @@ describe('ExchangeWrapper', () => { ExchangeEvents.LogFill, indexFilterValues, callback, ); await zeroEx.exchange.fillOrderAsync( - signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, takerAddress, + signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, + takerAddress, ); })().catch(done); }); it('Should cancel subscription when unsubscribe called', (done: DoneCallback) => { (async () => { - const callbackNeverToBeCalled = (logEvent: LogEvent<LogFillContractEventArgs>) => { + const callbackNeverToBeCalled = (logEvent: DecodedLogEvent<LogFillContractEventArgs>) => { done(new Error('Expected this subscription to have been cancelled')); }; const subscriptionToken = await zeroEx.exchange.subscribeAsync( @@ -705,7 +708,8 @@ describe('ExchangeWrapper', () => { ); zeroEx.exchange.unsubscribe(subscriptionToken); await zeroEx.exchange.fillOrderAsync( - signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, takerAddress, + signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, + takerAddress, ); done(); })().catch(done); diff --git a/test/token_wrapper_test.ts b/test/token_wrapper_test.ts index b35fa43f9..2f6f126c1 100644 --- a/test/token_wrapper_test.ts +++ b/test/token_wrapper_test.ts @@ -17,6 +17,7 @@ import { TokenContractEventArgs, LogWithDecodedArgs, LogEvent, + DecodedLogEvent, } from '../src'; import {BlockchainLifecycle} from './utils/blockchain_lifecycle'; import {TokenUtils} from './utils/token_utils'; @@ -358,7 +359,7 @@ describe('TokenWrapper', () => { // Source: https://github.com/mochajs/mocha/issues/2407 it('Should receive the Transfer event when tokens are transfered', (done: DoneCallback) => { (async () => { - const callback = (logEvent: LogEvent<TransferContractEventArgs>) => { + const callback = (logEvent: DecodedLogEvent<TransferContractEventArgs>) => { expect(logEvent).to.not.be.undefined(); const args = logEvent.args; expect(args._from).to.be.equal(coinbase); @@ -373,7 +374,7 @@ describe('TokenWrapper', () => { }); it('Should receive the Approval event when allowance is being set', (done: DoneCallback) => { (async () => { - const callback = (logEvent: LogEvent<ApprovalContractEventArgs>) => { + const callback = (logEvent: DecodedLogEvent<ApprovalContractEventArgs>) => { expect(logEvent).to.not.be.undefined(); const args = logEvent.args; expect(args._owner).to.be.equal(coinbase); @@ -388,13 +389,13 @@ describe('TokenWrapper', () => { }); it('Outstanding subscriptions are cancelled when zeroEx.setProviderAsync called', (done: DoneCallback) => { (async () => { - const callbackNeverToBeCalled = (logEvent: LogEvent<TransferContractEventArgs>) => { + const callbackNeverToBeCalled = (logEvent: DecodedLogEvent<TransferContractEventArgs>) => { done(new Error('Expected this subscription to have been cancelled')); }; zeroEx.token.subscribe( tokenAddress, TokenEvents.Transfer, indexFilterValues, callbackNeverToBeCalled, ); - const callbackToBeCalled = (logEvent: LogEvent<TransferContractEventArgs>) => { + const callbackToBeCalled = (logEvent: DecodedLogEvent<TransferContractEventArgs>) => { done(); }; const newProvider = web3Factory.getRpcProvider(); @@ -407,7 +408,7 @@ describe('TokenWrapper', () => { }); it('Should cancel subscription when unsubscribe called', (done: DoneCallback) => { (async () => { - const callbackNeverToBeCalled = (logEvent: LogEvent<TokenContractEventArgs>) => { + const callbackNeverToBeCalled = (logEvent: DecodedLogEvent<TokenContractEventArgs>) => { done(new Error('Expected this subscription to have been cancelled')); }; const subscriptionToken = zeroEx.token.subscribe( @@ -4685,7 +4685,7 @@ tslint-react@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/tslint-react/-/tslint-react-3.0.0.tgz#00c48ab7f22e91533b62bdef2c162b49447af00a" -tslint@^5.3.2: +tslint@~5.5.0: version "5.5.0" resolved "https://registry.yarnpkg.com/tslint/-/tslint-5.5.0.tgz#10e8dab3e3061fa61e9442e8cee3982acf20a6aa" dependencies: @@ -4940,9 +4940,9 @@ web3-provider-engine@^8.4.0: xhr "^2.2.0" xtend "^4.0.1" -web3-typescript-typings@^0.6.2: - version "0.6.2" - resolved "https://registry.yarnpkg.com/web3-typescript-typings/-/web3-typescript-typings-0.6.2.tgz#5dd9bf4dcd1d6dd6897c87d055d1f5cc8f98dfbd" +web3-typescript-typings@^0.7.0: + version "0.7.0" + resolved "https://registry.yarnpkg.com/web3-typescript-typings/-/web3-typescript-typings-0.7.0.tgz#a8adcfaa5f4933eddd53d9e592bace3edfffa050" dependencies: bignumber.js "^4.0.2" |