diff options
-rw-r--r-- | src/mempool/event_watcher.ts | 24 | ||||
-rw-r--r-- | src/mempool/order_state_watcher.ts | 25 | ||||
-rw-r--r-- | src/web3_wrapper.ts | 4 | ||||
-rw-r--r-- | test/order_state_watcher_test.ts | 9 |
4 files changed, 48 insertions, 14 deletions
diff --git a/src/mempool/event_watcher.ts b/src/mempool/event_watcher.ts index cb8921cfd..3f40606e7 100644 --- a/src/mempool/event_watcher.ts +++ b/src/mempool/event_watcher.ts @@ -19,10 +19,10 @@ export class EventWatcher { DEFAULT_MEMPOOL_POLLING_INTERVAL : pollingIntervalMs; } - public subscribe(callback: MempoolEventCallback): void { + public subscribe(callback: MempoolEventCallback, numConfirmations: number): void { this._callbackAsync = callback; this._intervalId = intervalUtils.setAsyncExcludingInterval( - this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs, + this._pollForMempoolEventsAsync.bind(this, numConfirmations), this._pollingIntervalMs, ); } public unsubscribe(): void { @@ -30,8 +30,8 @@ export class EventWatcher { this._lastMempoolEvents = []; intervalUtils.clearAsyncExcludingInterval(this._intervalId); } - private async _pollForMempoolEventsAsync(): Promise<void> { - const pendingEvents = await this._getMempoolEventsAsync(); + private async _pollForMempoolEventsAsync(numConfirmations: number): Promise<void> { + const pendingEvents = await this._getMempoolEventsAsync(numConfirmations); if (pendingEvents.length === 0) { // HACK: Sometimes when node rebuilds the pending block we get back the empty result. // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, @@ -46,11 +46,19 @@ export class EventWatcher { await this._emitDifferencesAsync(newEvents, isRemoved); this._lastMempoolEvents = pendingEvents; } - private async _getMempoolEventsAsync(): Promise<Web3.LogEntry[]> { - // TODO: Allow users to listen to any number of confirmations deep, not just mempool + private async _getMempoolEventsAsync(numConfirmations: number): Promise<Web3.LogEntry[]> { + let fromBlock: BlockParamLiteral|number; + let toBlock: BlockParamLiteral|number; + if (numConfirmations === 0) { + fromBlock = BlockParamLiteral.Pending; + toBlock = BlockParamLiteral.Pending; + } else { + toBlock = await this._web3Wrapper.getBlockNumberAsync(); + fromBlock = toBlock - numConfirmations; + } const mempoolFilter = { - fromBlock: BlockParamLiteral.Pending, - toBlock: BlockParamLiteral.Pending, + fromBlock, + toBlock, }; const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter); return pendingEvents; diff --git a/src/mempool/order_state_watcher.ts b/src/mempool/order_state_watcher.ts index 528b8ceff..63e812054 100644 --- a/src/mempool/order_state_watcher.ts +++ b/src/mempool/order_state_watcher.ts @@ -50,12 +50,20 @@ export class OrderStateWatcher { this._abiDecoder = abiDecoder; this._orderStateUtils = orderStateUtils; } + /** + * Add an order to the orderStateWatcher + * @param signedOrder The order you wish to start watching. + */ public addOrder(signedOrder: SignedOrder): void { assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema); const orderHash = ZeroEx.getOrderHashHex(signedOrder); this._orders[orderHash] = signedOrder; this.addToDependentOrderHashes(signedOrder, orderHash); } + /** + * Removes an order from the orderStateWatcher + * @param orderHash The orderHash of the order you wish to stop watching. + */ public removeOrder(orderHash: string): void { assert.doesConformToSchema('orderHash', orderHash, schemas.orderHashSchema); const signedOrder = this._orders[orderHash]; @@ -66,11 +74,24 @@ export class OrderStateWatcher { this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress].delete(orderHash); // We currently do not remove the maker/makerToken keys from the mapping when all orderHashes removed } - public subscribe(callback: OnOrderStateChangeCallback): void { + /** + * Starts an orderStateWatcher subscription. The callback will be notified every time a watched order's + * backing blockchain state has changed. This is a call-to-action for the caller to re-validate the order + * @param callback Receives the orderHash of the order that should be re-validated, together. + * with all the order-relevant blockchain state needed to re-validate the order + * @param numConfirmations Number of confirmed blocks deeps you want to run the orderWatcher from. Passing + * is 0 will watch the backing node's mempool, 3 will emit events when blockchain + * state relevant to a watched order changed 3 blocks ago. + */ + public subscribe(callback: OnOrderStateChangeCallback, numConfirmations: number): void { assert.isFunction('callback', callback); this._callbackAsync = callback; - this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this)); + this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this), numConfirmations); } + /** + * Ends an orderStateWatcher subscription. + * @param signedOrder The order you wish to stop watching. + */ public unsubscribe(): void { delete this._callbackAsync; this._eventWatcher.unsubscribe(); diff --git a/src/web3_wrapper.ts b/src/web3_wrapper.ts index 01d572654..d03c102b2 100644 --- a/src/web3_wrapper.ts +++ b/src/web3_wrapper.ts @@ -100,6 +100,10 @@ export class Web3Wrapper { const signData = await promisify(this.web3.eth.sign)(address, message); return signData; } + public async getBlockNumberAsync(): Promise<number> { + const blockNumber = await promisify(this.web3.eth.getBlockNumber)(); + return blockNumber; + } public async getBlockAsync(blockParam: string|Web3.BlockParam): Promise<Web3.BlockWithoutTransactionData> { const block = await promisify(this.web3.eth.getBlock)(blockParam); return block; diff --git a/test/order_state_watcher_test.ts b/test/order_state_watcher_test.ts index 966eec2f7..5569d2354 100644 --- a/test/order_state_watcher_test.ts +++ b/test/order_state_watcher_test.ts @@ -41,6 +41,7 @@ describe('OrderStateWatcher', () => { let web3Wrapper: Web3Wrapper; let signedOrder: SignedOrder; const fillableAmount = new BigNumber(5); + const numConfirmations = 0; before(async () => { web3 = web3Factory.create(); zeroEx = new ZeroEx(web3.currentProvider); @@ -103,7 +104,7 @@ describe('OrderStateWatcher', () => { expect(invalidOrderState.error).to.be.equal(ExchangeContractErrs.InsufficientMakerAllowance); done(); }; - zeroEx.orderStateWatcher.subscribe(callback); + zeroEx.orderStateWatcher.subscribe(callback, numConfirmations); await zeroEx.token.setProxyAllowanceAsync(makerToken.address, maker, new BigNumber(0)); })().catch(done); }); @@ -121,7 +122,7 @@ describe('OrderStateWatcher', () => { expect(invalidOrderState.error).to.be.equal(ExchangeContractErrs.InsufficientMakerBalance); done(); }; - zeroEx.orderStateWatcher.subscribe(callback); + zeroEx.orderStateWatcher.subscribe(callback, numConfirmations); const anyRecipient = taker; const makerBalance = await zeroEx.token.getBalanceAsync(makerToken.address, maker); await zeroEx.token.transferAsync(makerToken.address, maker, anyRecipient, makerBalance); @@ -146,7 +147,7 @@ describe('OrderStateWatcher', () => { done(); } }; - zeroEx.orderStateWatcher.subscribe(callback); + zeroEx.orderStateWatcher.subscribe(callback, numConfirmations); const shouldThrowOnInsufficientBalanceOrAllowance = true; await zeroEx.exchange.fillOrderAsync( @@ -180,7 +181,7 @@ describe('OrderStateWatcher', () => { done(); } }; - zeroEx.orderStateWatcher.subscribe(callback); + zeroEx.orderStateWatcher.subscribe(callback, numConfirmations); const shouldThrowOnInsufficientBalanceOrAllowance = true; await zeroEx.exchange.fillOrderAsync( signedOrder, fillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, taker, |