From fd54a6a3ad91a6aaff0e2a81f4fd9856b02ff320 Mon Sep 17 00:00:00 2001 From: Leonid Logvinov Date: Mon, 30 Oct 2017 10:53:59 +0200 Subject: Rename MempoolWatcher to EventWatcher and remove from public interface --- src/mempool/event_watcher.ts | 66 ++++++++++++++++++++++++++++++++++++++++++ src/mempool/mempool_watcher.ts | 66 ------------------------------------------ 2 files changed, 66 insertions(+), 66 deletions(-) create mode 100644 src/mempool/event_watcher.ts delete mode 100644 src/mempool/mempool_watcher.ts (limited to 'src/mempool') diff --git a/src/mempool/event_watcher.ts b/src/mempool/event_watcher.ts new file mode 100644 index 000000000..e28219682 --- /dev/null +++ b/src/mempool/event_watcher.ts @@ -0,0 +1,66 @@ +import * as Web3 from 'web3'; +import * as _ from 'lodash'; +import {Web3Wrapper} from '../web3_wrapper'; +import {BlockParamLiteral, EventCallback, MempoolEventCallback} from '../types'; +import {AbiDecoder} from '../utils/abi_decoder'; +import {intervalUtils} from '../utils/interval_utils'; + +const DEFAULT_MEMPOOL_POLLING_INTERVAL = 200; + +export class EventWatcher { + private _web3Wrapper: Web3Wrapper; + private _pollingIntervalMs: number; + private _intervalId: NodeJS.Timer; + private _lastMempoolEvents: Web3.LogEntry[] = []; + private _callback?: MempoolEventCallback; + constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { + this._web3Wrapper = web3Wrapper; + this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? + DEFAULT_MEMPOOL_POLLING_INTERVAL : + pollingIntervalMs; + } + public subscribe(callback: MempoolEventCallback): void { + this._callback = callback; + this._intervalId = intervalUtils.setAsyncExcludingInterval( + this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs); + } + public unsubscribe(): void { + delete this._callback; + intervalUtils.clearAsyncExcludingInterval(this._intervalId); + } + private async _pollForMempoolEventsAsync(): Promise { + const pendingEvents = await this._getMempoolEventsAsync(); + if (pendingEvents.length === 0) { + // HACK: Sometimes when node rebuilds the pending block we get back the empty result. + // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, + // that's why we just ignore those cases. + return; + } + const removedEvents = _.differenceBy(this._lastMempoolEvents, pendingEvents, JSON.stringify); + const newEvents = _.differenceBy(pendingEvents, this._lastMempoolEvents, JSON.stringify); + let isRemoved = true; + this._emitDifferences(removedEvents, isRemoved); + isRemoved = false; + this._emitDifferences(newEvents, isRemoved); + this._lastMempoolEvents = pendingEvents; + } + private async _getMempoolEventsAsync(): Promise { + const mempoolFilter = { + fromBlock: BlockParamLiteral.Pending, + toBlock: BlockParamLiteral.Pending, + }; + const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter); + return pendingEvents; + } + private _emitDifferences(logs: Web3.LogEntry[], isRemoved: boolean): void { + _.forEach(logs, log => { + const logEvent = { + removed: isRemoved, + ...log, + }; + if (!_.isUndefined(this._callback)) { + this._callback(logEvent); + } + }); + } +} diff --git a/src/mempool/mempool_watcher.ts b/src/mempool/mempool_watcher.ts deleted file mode 100644 index 70d263fcb..000000000 --- a/src/mempool/mempool_watcher.ts +++ /dev/null @@ -1,66 +0,0 @@ -import * as Web3 from 'web3'; -import * as _ from 'lodash'; -import {Web3Wrapper} from '../web3_wrapper'; -import {BlockParamLiteral, EventCallback, MempoolEventCallback} from '../types'; -import {AbiDecoder} from '../utils/abi_decoder'; -import {intervalUtils} from '../utils/interval_utils'; - -const DEFAULT_MEMPOOL_POLLING_INTERVAL = 200; - -export class MempoolWatcher { - private _web3Wrapper: Web3Wrapper; - private _pollingIntervalMs: number; - private _intervalId: NodeJS.Timer; - private _lastMempoolEvents: Web3.LogEntry[] = []; - private _callback?: MempoolEventCallback; - constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { - this._web3Wrapper = web3Wrapper; - this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? - DEFAULT_MEMPOOL_POLLING_INTERVAL : - pollingIntervalMs; - } - public subscribe(callback: MempoolEventCallback): void { - this._callback = callback; - this._intervalId = intervalUtils.setAsyncExcludingInterval( - this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs); - } - public unsubscribe(): void { - delete this._callback; - intervalUtils.clearAsyncExcludingInterval(this._intervalId); - } - private async _pollForMempoolEventsAsync(): Promise { - const pendingEvents = await this._getMempoolEventsAsync(); - if (pendingEvents.length === 0) { - // HACK: Sometimes when node rebuilds the pending block we get back the empty result. - // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, - // that's why we just ignore those cases. - return; - } - const removedEvents = _.differenceBy(this._lastMempoolEvents, pendingEvents, JSON.stringify); - const newEvents = _.differenceBy(pendingEvents, this._lastMempoolEvents, JSON.stringify); - let isRemoved = true; - this._emitDifferences(removedEvents, isRemoved); - isRemoved = false; - this._emitDifferences(newEvents, isRemoved); - this._lastMempoolEvents = pendingEvents; - } - private async _getMempoolEventsAsync(): Promise { - const mempoolFilter = { - fromBlock: BlockParamLiteral.Pending, - toBlock: BlockParamLiteral.Pending, - }; - const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter); - return pendingEvents; - } - private _emitDifferences(logs: Web3.LogEntry[], isRemoved: boolean): void { - _.forEach(logs, log => { - const logEvent = { - removed: isRemoved, - ...log, - }; - if (!_.isUndefined(this._callback)) { - this._callback(logEvent); - } - }); - } -} -- cgit v1.2.3