From f53472e7170798f56ea4837c310cfd4188326af8 Mon Sep 17 00:00:00 2001 From: Leonid Logvinov Date: Thu, 26 Oct 2017 12:50:02 +0300 Subject: Add initial mempool watching implememtation --- src/0x.ts | 10 +++++++ src/index.ts | 2 ++ src/mempool/mempool_watcher.ts | 64 ++++++++++++++++++++++++++++++++++++++++++ src/types.ts | 24 ++++++++-------- 4 files changed, 89 insertions(+), 11 deletions(-) create mode 100644 src/mempool/mempool_watcher.ts (limited to 'src') diff --git a/src/0x.ts b/src/0x.ts index bc753434c..10db7e158 100644 --- a/src/0x.ts +++ b/src/0x.ts @@ -11,6 +11,7 @@ import {assert} from './utils/assert'; import {AbiDecoder} from './utils/abi_decoder'; import {intervalUtils} from './utils/interval_utils'; import {artifacts} from './artifacts'; +import {MempoolWatcher} from './mempool/mempool_watcher'; import {ExchangeWrapper} from './contract_wrappers/exchange_wrapper'; import {TokenRegistryWrapper} from './contract_wrappers/token_registry_wrapper'; import {EtherTokenWrapper} from './contract_wrappers/ether_token_wrapper'; @@ -65,6 +66,10 @@ export class ZeroEx { * tokenTransferProxy smart contract. */ public proxy: TokenTransferProxyWrapper; + /** + * An instance of the MempoolWatcher class containing methods for watching pending events. + */ + public mempool: MempoolWatcher; private _web3Wrapper: Web3Wrapper; private _abiDecoder: AbiDecoder; /** @@ -191,6 +196,11 @@ export class ZeroEx { gasPrice, }; this._web3Wrapper = new Web3Wrapper(provider, defaults); + const mempoolPollingIntervalMs = _.isUndefined(config) ? undefined : config.mempoolPollingIntervalMs; + this.mempool = new MempoolWatcher( + this._web3Wrapper, + mempoolPollingIntervalMs, + ); this.token = new TokenWrapper( this._web3Wrapper, this._abiDecoder, diff --git a/src/index.ts b/src/index.ts index 249c20519..7a9b8aa63 100644 --- a/src/index.ts +++ b/src/index.ts @@ -35,4 +35,6 @@ export { OrderTransactionOpts, FilterObject, LogEvent, + DecodedLogEvent, + MempoolEventCallback, } from './types'; diff --git a/src/mempool/mempool_watcher.ts b/src/mempool/mempool_watcher.ts new file mode 100644 index 000000000..be598c28f --- /dev/null +++ b/src/mempool/mempool_watcher.ts @@ -0,0 +1,64 @@ +import * as Web3 from 'web3'; +import * as _ from 'lodash'; +import {Web3Wrapper} from '../web3_wrapper'; +import {BlockParamLiteral, EventCallback, MempoolEventCallback} from '../types'; +import {AbiDecoder} from '../utils/abi_decoder'; +import {intervalUtils} from '../utils/interval_utils'; + +const DEFAULT_MEMPOOL_POLLING_INTERVAL = 200; + +export class MempoolWatcher { + private _web3Wrapper: Web3Wrapper; + private _pollingIntervalMs: number; + private _intervalId: NodeJS.Timer; + private _lastMempoolEvents: Web3.LogEntry[] = []; + private _callback?: MempoolEventCallback; + constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { + this._web3Wrapper = web3Wrapper; + this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? + DEFAULT_MEMPOOL_POLLING_INTERVAL : + pollingIntervalMs; + } + public subscribe(callback: MempoolEventCallback): void { + this._callback = callback; + this._intervalId = intervalUtils.setAsyncExcludingInterval( + this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs); + } + public unsubscribe(): void { + delete this._callback; + intervalUtils.clearAsyncExcludingInterval(this._intervalId); + } + private async _pollForMempoolEventsAsync(): Promise { + const pendingEvents = await this._getMempoolEventsAsync(); + if (pendingEvents.length === 0) { + // HACK: Sometimes when node rebuilds the pending block we get back the empty result. + // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, + // that's why we just ignore those cases. + return; + } + const removedEvents = _.differenceBy(this._lastMempoolEvents, pendingEvents, _.isEqual); + const newEvents = _.differenceBy(pendingEvents, this._lastMempoolEvents, _.isEqual); + let isRemoved = true; + this._emitDifferences(removedEvents, isRemoved); + isRemoved = false; + this._emitDifferences(newEvents, isRemoved); + this._lastMempoolEvents = pendingEvents; + } + private async _getMempoolEventsAsync(): Promise { + const mempoolFilter = { + fromBlock: BlockParamLiteral.Pending, + toBlock: BlockParamLiteral.Pending, + }; + const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter); + return pendingEvents; + } + private _emitDifferences(logs: Web3.LogEntry[], isRemoved: boolean): void { + _.forEach(logs, log => { + const logWithDecodedArgsEvent = { + removed: isRemoved, + ...log, + }; + (this._callback as MempoolEventCallback)(logWithDecodedArgsEvent); + }); + } +} diff --git a/src/types.ts b/src/types.ts index 9ac726ef8..1b32ccdf9 100644 --- a/src/types.ts +++ b/src/types.ts @@ -37,12 +37,17 @@ export type OrderAddresses = [string, string, string, string, string]; export type OrderValues = [BigNumber, BigNumber, BigNumber, BigNumber, BigNumber, BigNumber]; -export interface LogEvent extends LogWithDecodedArgs { - removed: boolean; -} -export type EventCallbackAsync = (log: LogEvent) => Promise; -export type EventCallbackSync = (log: LogEvent) => void; +export type LogEvent = Web3.LogEntryEvent; +export type DecodedLogEvent = Web3.DecodedLogEntryEvent; + +export type EventCallbackAsync = (log: DecodedLogEvent) => Promise; +export type EventCallbackSync = (log: DecodedLogEvent) => void; export type EventCallback = EventCallbackSync|EventCallbackAsync; + +export type MempoolEventCallbackSync = (log: LogEvent) => void; +export type MempoolEventCallbackAsync = (log: LogEvent) => Promise; +export type MempoolEventCallback = MempoolEventCallbackSync|MempoolEventCallbackAsync; + export interface ExchangeContract extends Web3.ContractInstance { isValidSignature: { callAsync: (signerAddressHex: string, dataHex: string, v: number, r: string, s: string, @@ -394,12 +399,14 @@ export interface JSONRPCPayload { * exchangeContractAddress: The address of an exchange contract to use * tokenRegistryContractAddress: The address of a token registry contract to use * etherTokenContractAddress: The address of an ether token contract to use + * mempoolPollingIntervalMs: How often to check for new mempool events */ export interface ZeroExConfig { gasPrice?: BigNumber; // Gas price to use with every transaction exchangeContractAddress?: string; tokenRegistryContractAddress?: string; etherTokenContractAddress?: string; + mempoolPollingIntervalMs?: number; } export type TransactionReceipt = Web3.TransactionReceipt; @@ -415,12 +422,7 @@ export interface DecodedLogArgs { [argName: string]: ContractEventArg; } -export interface DecodedArgs { - args: ArgsType; - event: string; -} - -export interface LogWithDecodedArgs extends Web3.LogEntry, DecodedArgs {} +export interface LogWithDecodedArgs extends Web3.DecodedLogEntry {} export interface TransactionReceiptWithDecodedLogs extends Web3.TransactionReceipt { logs: Array|Web3.LogEntry>; -- cgit v1.2.3