diff options
-rw-r--r-- | packages/0x.js/src/order_watcher/expiration_watcher.ts | 58 | ||||
-rw-r--r-- | packages/0x.js/src/order_watcher/order_state_watcher.ts | 25 | ||||
-rw-r--r-- | packages/0x.js/src/types.ts | 2 | ||||
-rw-r--r-- | packages/0x.js/src/utils/heap.ts | 90 |
4 files changed, 173 insertions, 2 deletions
diff --git a/packages/0x.js/src/order_watcher/expiration_watcher.ts b/packages/0x.js/src/order_watcher/expiration_watcher.ts new file mode 100644 index 000000000..1a9e9a418 --- /dev/null +++ b/packages/0x.js/src/order_watcher/expiration_watcher.ts @@ -0,0 +1,58 @@ +import * as _ from 'lodash'; +import {BigNumber} from 'bignumber.js'; +import {utils} from '../utils/utils'; +import {intervalUtils} from '../utils/interval_utils'; +import {SignedOrder} from '../types'; +import {Heap} from '../utils/heap'; +import {ZeroEx} from '../0x'; + +// Order prunning is very fast +const DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS = 50; + +/** + * This class includes all the functionality related to prunning expired orders + */ +export class ExpirationWatcher { + private orderHashHeapByExpiration: Heap<string>; + private expiration: {[orderHash: string]: BigNumber}; + private callbackIfExists?: (orderHash: string) => void; + private orderExpirationCheckingIntervalMs: number; + private orderExpirationCheckingIntervalIdIfExists?: NodeJS.Timer; + constructor(orderExpirationCheckingIntervalMs?: number) { + this.orderExpirationCheckingIntervalMs = orderExpirationCheckingIntervalMs || + DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS; + this.expiration = {}; + const scoreFunction = ((orderHash: string) => { + return this.expiration[orderHash].toNumber(); + }).bind(this); + this.orderHashHeapByExpiration = new Heap(scoreFunction); + } + public subscribe(callback: (orderHash: string) => void): void { + this.callbackIfExists = callback; + this.orderExpirationCheckingIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval( + this.pruneExpiredOrders.bind(this), this.orderExpirationCheckingIntervalMs, + ); + } + public unsubscribe(): void { + intervalUtils.clearAsyncExcludingInterval(this.orderExpirationCheckingIntervalIdIfExists as NodeJS.Timer); + delete this.callbackIfExists; + } + public addOrder(orderHash: string, expirationUnixTimestampSec: BigNumber): void { + this.expiration[orderHash] = expirationUnixTimestampSec; + // We don't remove hashes on order remove because it's slow (linear). + // We just skip them later if the order was already removed from the order watcher. + this.orderHashHeapByExpiration.push(orderHash); + } + private pruneExpiredOrders(): void { + const currentUnixTimestampSec = utils.getCurrentUnixTimestamp(); + while ( + this.orderHashHeapByExpiration.size() !== 0 && + this.expiration[this.orderHashHeapByExpiration.head()].lessThan(currentUnixTimestampSec) && + !_.isUndefined(this.callbackIfExists) + ) { + const orderHash = this.orderHashHeapByExpiration.pop(); + delete this.expiration[orderHash]; + this.callbackIfExists(orderHash); + } + } +} diff --git a/packages/0x.js/src/order_watcher/order_state_watcher.ts b/packages/0x.js/src/order_watcher/order_state_watcher.ts index 2b9d7997e..f21ad0bbe 100644 --- a/packages/0x.js/src/order_watcher/order_state_watcher.ts +++ b/packages/0x.js/src/order_watcher/order_state_watcher.ts @@ -6,6 +6,7 @@ import {assert} from '../utils/assert'; import {utils} from '../utils/utils'; import {artifacts} from '../artifacts'; import {AbiDecoder} from '../utils/abi_decoder'; +import {intervalUtils} from '../utils/interval_utils'; import {OrderStateUtils} from '../utils/order_state_utils'; import { LogEvent, @@ -24,14 +25,14 @@ import { ExchangeEvents, TokenEvents, ZeroExError, + ExchangeContractErrs, } from '../types'; import {Web3Wrapper} from '../web3_wrapper'; import {TokenWrapper} from '../contract_wrappers/token_wrapper'; import {ExchangeWrapper} from '../contract_wrappers/exchange_wrapper'; import {OrderFilledCancelledLazyStore} from '../stores/order_filled_cancelled_lazy_store'; import {BalanceAndProxyAllowanceLazyStore} from '../stores/balance_proxy_allowance_lazy_store'; - -const DEFAULT_NUM_CONFIRMATIONS = 0; +import {ExpirationWatcher} from './expiration_watcher'; interface DependentOrderHashes { [makerAddress: string]: { @@ -56,6 +57,7 @@ export class OrderStateWatcher { private _eventWatcher: EventWatcher; private _web3Wrapper: Web3Wrapper; private _abiDecoder: AbiDecoder; + private _expirationWatcher: ExpirationWatcher; private _orderStateUtils: OrderStateUtils; private _orderFilledCancelledLazyStore: OrderFilledCancelledLazyStore; private _balanceAndProxyAllowanceLazyStore: BalanceAndProxyAllowanceLazyStore; @@ -72,6 +74,10 @@ export class OrderStateWatcher { this._orderStateUtils = new OrderStateUtils( this._balanceAndProxyAllowanceLazyStore, this._orderFilledCancelledLazyStore, ); + const orderExpirationCheckingIntervalMs = _.isUndefined(config) ? + undefined : + config.orderExpirationCheckingIntervalMs; + this._expirationWatcher = new ExpirationWatcher(orderExpirationCheckingIntervalMs); } /** * Add an order to the orderStateWatcher. Before the order is added, it's @@ -84,6 +90,8 @@ export class OrderStateWatcher { assert.isValidSignature(orderHash, signedOrder.ecSignature, signedOrder.maker); this._orderByOrderHash[orderHash] = signedOrder; this.addToDependentOrderHashes(signedOrder, orderHash); + // We don't remove orders from expirationWatcher because heap removal is linear. We just skip it later + this._expirationWatcher.addOrder(orderHash, signedOrder.expirationUnixTimestampSec); } /** * Removes an order from the orderStateWatcher @@ -111,6 +119,7 @@ export class OrderStateWatcher { } this._callbackIfExistsAsync = callback; this._eventWatcher.subscribe(this._onEventWatcherCallbackAsync.bind(this)); + this._expirationWatcher.subscribe(this._onOrderExpired.bind(this)); } /** * Ends an orderStateWatcher subscription. @@ -123,6 +132,18 @@ export class OrderStateWatcher { this._orderFilledCancelledLazyStore.deleteAll(); delete this._callbackIfExistsAsync; this._eventWatcher.unsubscribe(); + this._expirationWatcher.unsubscribe(); + } + private _onOrderExpired(orderHash: string): void { + const orderState: OrderState = { + isValid: false, + orderHash, + error: ExchangeContractErrs.OrderFillExpired, + }; + if (!_.isUndefined(this._orderByOrderHash[orderHash])) { + // We need this check because we never remove the orders from expiration watcher + (this._callbackIfExistsAsync as OnOrderStateChangeCallback)(orderState); + } } private async _onEventWatcherCallbackAsync(log: LogEvent): Promise<void> { const maybeDecodedLog = this._abiDecoder.tryToDecodeLogOrNoop(log); diff --git a/packages/0x.js/src/types.ts b/packages/0x.js/src/types.ts index 11b5d8569..45b49e148 100644 --- a/packages/0x.js/src/types.ts +++ b/packages/0x.js/src/types.ts @@ -397,9 +397,11 @@ export interface JSONRPCPayload { } /* + * orderExpirationCheckingIntervalMs: How often to check for expired orders * eventPollingIntervalMs: How often to poll the Ethereum node for new events */ export interface OrderStateWatcherConfig { + orderExpirationCheckingIntervalMs?: number; eventPollingIntervalMs?: number; } diff --git a/packages/0x.js/src/utils/heap.ts b/packages/0x.js/src/utils/heap.ts new file mode 100644 index 000000000..aaa17e719 --- /dev/null +++ b/packages/0x.js/src/utils/heap.ts @@ -0,0 +1,90 @@ +// Based on Original JavaScript Code from Marijn Haverbeke (http://eloquentjavascript.net/1st_edition/appendix2.html) +export class Heap<T> { + private content: T[]; + private scoreFunction: (x: T) => number; + constructor(scoreFunction: (x: T) => number) { + this.content = []; + this.scoreFunction = scoreFunction; + } + public push(element: T) { + this.content.push(element); + this.bubbleUp(this.content.length - 1); + } + public size(): number { + const size = this.content.length; + return size; + } + public head(): T { + const head = this.content[0]; + return head; + } + public pop(): T { + const head = this.content[0]; + const end = this.content.pop(); + if (this.content.length > 0) { + this.content[0] = end as T; + this.sinkDown(0); + } + return head; + } + private bubbleUp(n: number) { + // Fetch the element that has to be moved. + const element = this.content[n]; + const score = this.scoreFunction(element); + // When at 0, an element can not go up any further. + while (n > 0) { + // Compute the parent element's index, and fetch it. + const parentN = Math.floor((n + 1) / 2) - 1; + const parent = this.content[parentN]; + // If the parent has a lesser score, things are in order and we + // are done. + if (score >= this.scoreFunction(parent)) { + break; + } + + // Otherwise, swap the parent with the current element and + // continue. + this.content[parentN] = element; + this.content[n] = parent; + n = parentN; + } + } + + private sinkDown(n: number) { + // Look up the target element and its score. + const length = this.content.length; + const element = this.content[n]; + const elemScore = this.scoreFunction(element); + + while (true) { + // Compute the indices of the child elements. + const child2N = (n + 1) * 2; + const child1N = child2N - 1; + // This is used to store the new position of the element, if any. + let swap = n; + let child1Score; + let child2Score; + // If the first child exists (is inside the array)... + if (child1N < length) { + // Look it up and compute its score. + const child1 = this.content[child1N]; + child1Score = this.scoreFunction(child1); + // If the score is less than our element's, we need to swap. + if (child1Score < elemScore) { + swap = child1N; + } + // Do the same checks for the other child. + if (child2N < length) { + const child2 = this.content[child2N]; + child2Score = this.scoreFunction(child2); + if (child2Score < (swap == null ? elemScore : child1Score)) { + swap = child2N; + } + } + } + this.content[n] = this.content[swap]; + this.content[swap] = element; + n = swap; + } + } +} |