aboutsummaryrefslogtreecommitdiffstats
path: root/src/mempool
diff options
context:
space:
mode:
authorLeonid Logvinov <logvinov.leon@gmail.com>2017-10-31 00:38:10 +0800
committerFabio Berger <me@fabioberger.com>2017-10-31 00:49:16 +0800
commita896904ae7c453f51b1f46de2be3a28416db72d1 (patch)
treec50c3638b0b9bee982816bb48a4c935de798476a /src/mempool
parent6bfcd253f8e9689ce787899a42f80914b067a4f1 (diff)
downloaddexon-sol-tools-a896904ae7c453f51b1f46de2be3a28416db72d1.tar
dexon-sol-tools-a896904ae7c453f51b1f46de2be3a28416db72d1.tar.gz
dexon-sol-tools-a896904ae7c453f51b1f46de2be3a28416db72d1.tar.bz2
dexon-sol-tools-a896904ae7c453f51b1f46de2be3a28416db72d1.tar.lz
dexon-sol-tools-a896904ae7c453f51b1f46de2be3a28416db72d1.tar.xz
dexon-sol-tools-a896904ae7c453f51b1f46de2be3a28416db72d1.tar.zst
dexon-sol-tools-a896904ae7c453f51b1f46de2be3a28416db72d1.zip
Add naive order state watcher implementation
Revalidate all orders upon event received and emit order states even if not changed
Diffstat (limited to 'src/mempool')
-rw-r--r--src/mempool/event_watcher.ts20
-rw-r--r--src/mempool/order_state_watcher.ts55
2 files changed, 41 insertions, 34 deletions
diff --git a/src/mempool/event_watcher.ts b/src/mempool/event_watcher.ts
index 1ad30b790..27f0c8207 100644
--- a/src/mempool/event_watcher.ts
+++ b/src/mempool/event_watcher.ts
@@ -12,7 +12,7 @@ export class EventWatcher {
private _pollingIntervalMs: number;
private _intervalId: NodeJS.Timer;
private _lastMempoolEvents: Web3.LogEntry[] = [];
- private _callback?: MempoolEventCallback;
+ private _callbackAsync?: MempoolEventCallback;
constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) {
this._web3Wrapper = web3Wrapper;
this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ?
@@ -20,12 +20,12 @@ export class EventWatcher {
pollingIntervalMs;
}
public subscribe(callback: MempoolEventCallback): void {
- this._callback = callback;
+ this._callbackAsync = callback;
this._intervalId = intervalUtils.setAsyncExcludingInterval(
this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs);
}
public unsubscribe(): void {
- delete this._callback;
+ delete this._callbackAsync;
this._lastMempoolEvents = [];
intervalUtils.clearAsyncExcludingInterval(this._intervalId);
}
@@ -40,9 +40,9 @@ export class EventWatcher {
const removedEvents = _.differenceBy(this._lastMempoolEvents, pendingEvents, JSON.stringify);
const newEvents = _.differenceBy(pendingEvents, this._lastMempoolEvents, JSON.stringify);
let isRemoved = true;
- this._emitDifferences(removedEvents, isRemoved);
+ await this._emitDifferencesAsync(removedEvents, isRemoved);
isRemoved = false;
- this._emitDifferences(newEvents, isRemoved);
+ await this._emitDifferencesAsync(newEvents, isRemoved);
this._lastMempoolEvents = pendingEvents;
}
private async _getMempoolEventsAsync(): Promise<Web3.LogEntry[]> {
@@ -53,15 +53,15 @@ export class EventWatcher {
const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter);
return pendingEvents;
}
- private _emitDifferences(logs: Web3.LogEntry[], isRemoved: boolean): void {
- _.forEach(logs, log => {
+ private async _emitDifferencesAsync(logs: Web3.LogEntry[], isRemoved: boolean): Promise<void> {
+ for (const log of logs) {
const logEvent = {
removed: isRemoved,
...log,
};
- if (!_.isUndefined(this._callback)) {
- this._callback(logEvent);
+ if (!_.isUndefined(this._callbackAsync)) {
+ await this._callbackAsync(logEvent);
}
- });
+ }
}
}
diff --git a/src/mempool/order_state_watcher.ts b/src/mempool/order_state_watcher.ts
index 89f84647d..3da48005d 100644
--- a/src/mempool/order_state_watcher.ts
+++ b/src/mempool/order_state_watcher.ts
@@ -5,13 +5,14 @@ import {EventWatcher} from './event_watcher';
import {assert} from '../utils/assert';
import {artifacts} from '../artifacts';
import {AbiDecoder} from '../utils/abi_decoder';
-import {orderWatcherConfigSchema} from '../schemas/order_watcher_config_schema';
+import {OrderStateUtils} from '../utils/order_state_utils';
import {
LogEvent,
+ OrderState,
SignedOrder,
Web3Provider,
+ BlockParamLiteral,
LogWithDecodedArgs,
- OrderWatcherConfig,
OnOrderStateChangeCallback,
} from '../types';
import {Web3Wrapper} from '../web3_wrapper';
@@ -19,20 +20,19 @@ import {Web3Wrapper} from '../web3_wrapper';
export class OrderStateWatcher {
private _orders = new Map<string, SignedOrder>();
private _web3Wrapper: Web3Wrapper;
- private _config: OrderWatcherConfig;
- private _callback?: OnOrderStateChangeCallback;
- private _eventWatcher?: EventWatcher;
+ private _callbackAsync?: OnOrderStateChangeCallback;
+ private _eventWatcher: EventWatcher;
private _abiDecoder: AbiDecoder;
- constructor(provider: Web3Provider, config?: OrderWatcherConfig) {
- assert.isWeb3Provider('provider', provider);
- if (!_.isUndefined(config)) {
- assert.doesConformToSchema('config', config, orderWatcherConfigSchema);
- }
- this._web3Wrapper = new Web3Wrapper(provider);
- this._config = config || {};
- const artifactJSONs = _.values(artifacts);
- const abiArrays = _.map(artifactJSONs, artifact => artifact.abi);
- this._abiDecoder = new AbiDecoder(abiArrays);
+ private _orderStateUtils: OrderStateUtils;
+ constructor(
+ web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder, orderStateUtils: OrderStateUtils,
+ mempoolPollingIntervalMs?: number) {
+ this._web3Wrapper = web3Wrapper;
+ this._eventWatcher = new EventWatcher(
+ this._web3Wrapper, mempoolPollingIntervalMs,
+ );
+ this._abiDecoder = abiDecoder;
+ this._orderStateUtils = orderStateUtils;
}
public addOrder(signedOrder: SignedOrder): void {
assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema);
@@ -46,17 +46,12 @@ export class OrderStateWatcher {
}
public subscribe(callback: OnOrderStateChangeCallback): void {
assert.isFunction('callback', callback);
- this._callback = callback;
- this._eventWatcher = new EventWatcher(
- this._web3Wrapper, this._config.mempoolPollingIntervalMs,
- );
+ this._callbackAsync = callback;
this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this));
}
public unsubscribe(): void {
- delete this._callback;
- if (!_.isUndefined(this._eventWatcher)) {
- this._eventWatcher.unsubscribe();
- }
+ delete this._callbackAsync;
+ this._eventWatcher.unsubscribe();
}
private async _onMempoolEventCallbackAsync(log: LogEvent): Promise<void> {
const maybeDecodedLog = this._abiDecoder.tryToDecodeLogOrNoop(log);
@@ -65,6 +60,18 @@ export class OrderStateWatcher {
}
}
private async _revalidateOrdersAsync(): Promise<void> {
- _.noop();
+ const methodOpts = {
+ defaultBlock: BlockParamLiteral.Pending,
+ };
+ const orderHashes = Array.from(this._orders.keys());
+ for (const orderHash of orderHashes) {
+ const signedOrder = this._orders.get(orderHash) as SignedOrder;
+ const orderState = await this._orderStateUtils.getOrderStateAsync(signedOrder, methodOpts);
+ if (!_.isUndefined(this._callbackAsync)) {
+ await this._callbackAsync(orderState);
+ } else {
+ break; // Unsubscribe was called
+ }
+ }
}
}