aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/mempool/event_watcher.ts24
-rw-r--r--src/mempool/order_state_watcher.ts25
-rw-r--r--src/web3_wrapper.ts4
-rw-r--r--test/order_state_watcher_test.ts9
4 files changed, 48 insertions, 14 deletions
diff --git a/src/mempool/event_watcher.ts b/src/mempool/event_watcher.ts
index cb8921cfd..3f40606e7 100644
--- a/src/mempool/event_watcher.ts
+++ b/src/mempool/event_watcher.ts
@@ -19,10 +19,10 @@ export class EventWatcher {
DEFAULT_MEMPOOL_POLLING_INTERVAL :
pollingIntervalMs;
}
- public subscribe(callback: MempoolEventCallback): void {
+ public subscribe(callback: MempoolEventCallback, numConfirmations: number): void {
this._callbackAsync = callback;
this._intervalId = intervalUtils.setAsyncExcludingInterval(
- this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs,
+ this._pollForMempoolEventsAsync.bind(this, numConfirmations), this._pollingIntervalMs,
);
}
public unsubscribe(): void {
@@ -30,8 +30,8 @@ export class EventWatcher {
this._lastMempoolEvents = [];
intervalUtils.clearAsyncExcludingInterval(this._intervalId);
}
- private async _pollForMempoolEventsAsync(): Promise<void> {
- const pendingEvents = await this._getMempoolEventsAsync();
+ private async _pollForMempoolEventsAsync(numConfirmations: number): Promise<void> {
+ const pendingEvents = await this._getMempoolEventsAsync(numConfirmations);
if (pendingEvents.length === 0) {
// HACK: Sometimes when node rebuilds the pending block we get back the empty result.
// We don't want to emit a lot of removal events and bring them back after a couple of miliseconds,
@@ -46,11 +46,19 @@ export class EventWatcher {
await this._emitDifferencesAsync(newEvents, isRemoved);
this._lastMempoolEvents = pendingEvents;
}
- private async _getMempoolEventsAsync(): Promise<Web3.LogEntry[]> {
- // TODO: Allow users to listen to any number of confirmations deep, not just mempool
+ private async _getMempoolEventsAsync(numConfirmations: number): Promise<Web3.LogEntry[]> {
+ let fromBlock: BlockParamLiteral|number;
+ let toBlock: BlockParamLiteral|number;
+ if (numConfirmations === 0) {
+ fromBlock = BlockParamLiteral.Pending;
+ toBlock = BlockParamLiteral.Pending;
+ } else {
+ toBlock = await this._web3Wrapper.getBlockNumberAsync();
+ fromBlock = toBlock - numConfirmations;
+ }
const mempoolFilter = {
- fromBlock: BlockParamLiteral.Pending,
- toBlock: BlockParamLiteral.Pending,
+ fromBlock,
+ toBlock,
};
const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter);
return pendingEvents;
diff --git a/src/mempool/order_state_watcher.ts b/src/mempool/order_state_watcher.ts
index 528b8ceff..63e812054 100644
--- a/src/mempool/order_state_watcher.ts
+++ b/src/mempool/order_state_watcher.ts
@@ -50,12 +50,20 @@ export class OrderStateWatcher {
this._abiDecoder = abiDecoder;
this._orderStateUtils = orderStateUtils;
}
+ /**
+ * Add an order to the orderStateWatcher
+ * @param signedOrder The order you wish to start watching.
+ */
public addOrder(signedOrder: SignedOrder): void {
assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema);
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
this._orders[orderHash] = signedOrder;
this.addToDependentOrderHashes(signedOrder, orderHash);
}
+ /**
+ * Removes an order from the orderStateWatcher
+ * @param orderHash The orderHash of the order you wish to stop watching.
+ */
public removeOrder(orderHash: string): void {
assert.doesConformToSchema('orderHash', orderHash, schemas.orderHashSchema);
const signedOrder = this._orders[orderHash];
@@ -66,11 +74,24 @@ export class OrderStateWatcher {
this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress].delete(orderHash);
// We currently do not remove the maker/makerToken keys from the mapping when all orderHashes removed
}
- public subscribe(callback: OnOrderStateChangeCallback): void {
+ /**
+ * Starts an orderStateWatcher subscription. The callback will be notified every time a watched order's
+ * backing blockchain state has changed. This is a call-to-action for the caller to re-validate the order
+ * @param callback Receives the orderHash of the order that should be re-validated, together.
+ * with all the order-relevant blockchain state needed to re-validate the order
+ * @param numConfirmations Number of confirmed blocks deeps you want to run the orderWatcher from. Passing
+ * is 0 will watch the backing node's mempool, 3 will emit events when blockchain
+ * state relevant to a watched order changed 3 blocks ago.
+ */
+ public subscribe(callback: OnOrderStateChangeCallback, numConfirmations: number): void {
assert.isFunction('callback', callback);
this._callbackAsync = callback;
- this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this));
+ this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this), numConfirmations);
}
+ /**
+ * Ends an orderStateWatcher subscription.
+ * @param signedOrder The order you wish to stop watching.
+ */
public unsubscribe(): void {
delete this._callbackAsync;
this._eventWatcher.unsubscribe();
diff --git a/src/web3_wrapper.ts b/src/web3_wrapper.ts
index 01d572654..d03c102b2 100644
--- a/src/web3_wrapper.ts
+++ b/src/web3_wrapper.ts
@@ -100,6 +100,10 @@ export class Web3Wrapper {
const signData = await promisify(this.web3.eth.sign)(address, message);
return signData;
}
+ public async getBlockNumberAsync(): Promise<number> {
+ const blockNumber = await promisify(this.web3.eth.getBlockNumber)();
+ return blockNumber;
+ }
public async getBlockAsync(blockParam: string|Web3.BlockParam): Promise<Web3.BlockWithoutTransactionData> {
const block = await promisify(this.web3.eth.getBlock)(blockParam);
return block;
diff --git a/test/order_state_watcher_test.ts b/test/order_state_watcher_test.ts
index 966eec2f7..5569d2354 100644
--- a/test/order_state_watcher_test.ts
+++ b/test/order_state_watcher_test.ts
@@ -41,6 +41,7 @@ describe('OrderStateWatcher', () => {
let web3Wrapper: Web3Wrapper;
let signedOrder: SignedOrder;
const fillableAmount = new BigNumber(5);
+ const numConfirmations = 0;
before(async () => {
web3 = web3Factory.create();
zeroEx = new ZeroEx(web3.currentProvider);
@@ -103,7 +104,7 @@ describe('OrderStateWatcher', () => {
expect(invalidOrderState.error).to.be.equal(ExchangeContractErrs.InsufficientMakerAllowance);
done();
};
- zeroEx.orderStateWatcher.subscribe(callback);
+ zeroEx.orderStateWatcher.subscribe(callback, numConfirmations);
await zeroEx.token.setProxyAllowanceAsync(makerToken.address, maker, new BigNumber(0));
})().catch(done);
});
@@ -121,7 +122,7 @@ describe('OrderStateWatcher', () => {
expect(invalidOrderState.error).to.be.equal(ExchangeContractErrs.InsufficientMakerBalance);
done();
};
- zeroEx.orderStateWatcher.subscribe(callback);
+ zeroEx.orderStateWatcher.subscribe(callback, numConfirmations);
const anyRecipient = taker;
const makerBalance = await zeroEx.token.getBalanceAsync(makerToken.address, maker);
await zeroEx.token.transferAsync(makerToken.address, maker, anyRecipient, makerBalance);
@@ -146,7 +147,7 @@ describe('OrderStateWatcher', () => {
done();
}
};
- zeroEx.orderStateWatcher.subscribe(callback);
+ zeroEx.orderStateWatcher.subscribe(callback, numConfirmations);
const shouldThrowOnInsufficientBalanceOrAllowance = true;
await zeroEx.exchange.fillOrderAsync(
@@ -180,7 +181,7 @@ describe('OrderStateWatcher', () => {
done();
}
};
- zeroEx.orderStateWatcher.subscribe(callback);
+ zeroEx.orderStateWatcher.subscribe(callback, numConfirmations);
const shouldThrowOnInsufficientBalanceOrAllowance = true;
await zeroEx.exchange.fillOrderAsync(
signedOrder, fillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, taker,