aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLeonid Logvinov <logvinov.leon@gmail.com>2017-10-26 17:50:02 +0800
committerFabio Berger <me@fabioberger.com>2017-10-31 00:49:16 +0800
commitf53472e7170798f56ea4837c310cfd4188326af8 (patch)
tree64f4151571e0fc91df67ee07e2ce286a298e0aba
parent7fa5d34c45f58ebdf729e9f7a500627ab72fb5c8 (diff)
downloaddexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar
dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.gz
dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.bz2
dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.lz
dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.xz
dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.tar.zst
dexon-0x-contracts-f53472e7170798f56ea4837c310cfd4188326af8.zip
Add initial mempool watching implememtation
-rw-r--r--package.json4
-rw-r--r--src/0x.ts10
-rw-r--r--src/index.ts2
-rw-r--r--src/mempool/mempool_watcher.ts64
-rw-r--r--src/types.ts24
-rw-r--r--test/exchange_wrapper_test.ts24
-rw-r--r--test/token_wrapper_test.ts11
-rw-r--r--yarn.lock8
8 files changed, 115 insertions, 32 deletions
diff --git a/package.json b/package.json
index 8e1cfd502..e615a8a3a 100644
--- a/package.json
+++ b/package.json
@@ -78,14 +78,14 @@
"sinon": "^4.0.0",
"source-map-support": "^0.5.0",
"truffle-hdwallet-provider": "^0.0.3",
- "tslint": "^5.3.2",
+ "tslint": "~5.5.0",
"tslint-config-0xproject": "^0.0.2",
"typedoc": "~0.8.0",
"types-bn": "^0.0.1",
"types-ethereumjs-util": "0xProject/types-ethereumjs-util",
"typescript": "^2.4.1",
"web3-provider-engine": "^13.0.1",
- "web3-typescript-typings": "^0.6.2",
+ "web3-typescript-typings": "^0.7.0",
"webpack": "^3.1.0"
},
"dependencies": {
diff --git a/src/0x.ts b/src/0x.ts
index bc753434c..10db7e158 100644
--- a/src/0x.ts
+++ b/src/0x.ts
@@ -11,6 +11,7 @@ import {assert} from './utils/assert';
import {AbiDecoder} from './utils/abi_decoder';
import {intervalUtils} from './utils/interval_utils';
import {artifacts} from './artifacts';
+import {MempoolWatcher} from './mempool/mempool_watcher';
import {ExchangeWrapper} from './contract_wrappers/exchange_wrapper';
import {TokenRegistryWrapper} from './contract_wrappers/token_registry_wrapper';
import {EtherTokenWrapper} from './contract_wrappers/ether_token_wrapper';
@@ -65,6 +66,10 @@ export class ZeroEx {
* tokenTransferProxy smart contract.
*/
public proxy: TokenTransferProxyWrapper;
+ /**
+ * An instance of the MempoolWatcher class containing methods for watching pending events.
+ */
+ public mempool: MempoolWatcher;
private _web3Wrapper: Web3Wrapper;
private _abiDecoder: AbiDecoder;
/**
@@ -191,6 +196,11 @@ export class ZeroEx {
gasPrice,
};
this._web3Wrapper = new Web3Wrapper(provider, defaults);
+ const mempoolPollingIntervalMs = _.isUndefined(config) ? undefined : config.mempoolPollingIntervalMs;
+ this.mempool = new MempoolWatcher(
+ this._web3Wrapper,
+ mempoolPollingIntervalMs,
+ );
this.token = new TokenWrapper(
this._web3Wrapper,
this._abiDecoder,
diff --git a/src/index.ts b/src/index.ts
index 249c20519..7a9b8aa63 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -35,4 +35,6 @@ export {
OrderTransactionOpts,
FilterObject,
LogEvent,
+ DecodedLogEvent,
+ MempoolEventCallback,
} from './types';
diff --git a/src/mempool/mempool_watcher.ts b/src/mempool/mempool_watcher.ts
new file mode 100644
index 000000000..be598c28f
--- /dev/null
+++ b/src/mempool/mempool_watcher.ts
@@ -0,0 +1,64 @@
+import * as Web3 from 'web3';
+import * as _ from 'lodash';
+import {Web3Wrapper} from '../web3_wrapper';
+import {BlockParamLiteral, EventCallback, MempoolEventCallback} from '../types';
+import {AbiDecoder} from '../utils/abi_decoder';
+import {intervalUtils} from '../utils/interval_utils';
+
+const DEFAULT_MEMPOOL_POLLING_INTERVAL = 200;
+
+export class MempoolWatcher {
+ private _web3Wrapper: Web3Wrapper;
+ private _pollingIntervalMs: number;
+ private _intervalId: NodeJS.Timer;
+ private _lastMempoolEvents: Web3.LogEntry[] = [];
+ private _callback?: MempoolEventCallback;
+ constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) {
+ this._web3Wrapper = web3Wrapper;
+ this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ?
+ DEFAULT_MEMPOOL_POLLING_INTERVAL :
+ pollingIntervalMs;
+ }
+ public subscribe(callback: MempoolEventCallback): void {
+ this._callback = callback;
+ this._intervalId = intervalUtils.setAsyncExcludingInterval(
+ this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs);
+ }
+ public unsubscribe(): void {
+ delete this._callback;
+ intervalUtils.clearAsyncExcludingInterval(this._intervalId);
+ }
+ private async _pollForMempoolEventsAsync(): Promise<void> {
+ const pendingEvents = await this._getMempoolEventsAsync();
+ if (pendingEvents.length === 0) {
+ // HACK: Sometimes when node rebuilds the pending block we get back the empty result.
+ // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds,
+ // that's why we just ignore those cases.
+ return;
+ }
+ const removedEvents = _.differenceBy(this._lastMempoolEvents, pendingEvents, _.isEqual);
+ const newEvents = _.differenceBy(pendingEvents, this._lastMempoolEvents, _.isEqual);
+ let isRemoved = true;
+ this._emitDifferences(removedEvents, isRemoved);
+ isRemoved = false;
+ this._emitDifferences(newEvents, isRemoved);
+ this._lastMempoolEvents = pendingEvents;
+ }
+ private async _getMempoolEventsAsync(): Promise<Web3.LogEntry[]> {
+ const mempoolFilter = {
+ fromBlock: BlockParamLiteral.Pending,
+ toBlock: BlockParamLiteral.Pending,
+ };
+ const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter);
+ return pendingEvents;
+ }
+ private _emitDifferences(logs: Web3.LogEntry[], isRemoved: boolean): void {
+ _.forEach(logs, log => {
+ const logWithDecodedArgsEvent = {
+ removed: isRemoved,
+ ...log,
+ };
+ (this._callback as MempoolEventCallback)(logWithDecodedArgsEvent);
+ });
+ }
+}
diff --git a/src/types.ts b/src/types.ts
index 9ac726ef8..1b32ccdf9 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -37,12 +37,17 @@ export type OrderAddresses = [string, string, string, string, string];
export type OrderValues = [BigNumber, BigNumber, BigNumber,
BigNumber, BigNumber, BigNumber];
-export interface LogEvent<ArgsType> extends LogWithDecodedArgs<ArgsType> {
- removed: boolean;
-}
-export type EventCallbackAsync<ArgsType> = (log: LogEvent<ArgsType>) => Promise<void>;
-export type EventCallbackSync<ArgsType> = (log: LogEvent<ArgsType>) => void;
+export type LogEvent = Web3.LogEntryEvent;
+export type DecodedLogEvent<ArgsType> = Web3.DecodedLogEntryEvent<ArgsType>;
+
+export type EventCallbackAsync<ArgsType> = (log: DecodedLogEvent<ArgsType>) => Promise<void>;
+export type EventCallbackSync<ArgsType> = (log: DecodedLogEvent<ArgsType>) => void;
export type EventCallback<ArgsType> = EventCallbackSync<ArgsType>|EventCallbackAsync<ArgsType>;
+
+export type MempoolEventCallbackSync = (log: LogEvent) => void;
+export type MempoolEventCallbackAsync = (log: LogEvent) => Promise<void>;
+export type MempoolEventCallback = MempoolEventCallbackSync|MempoolEventCallbackAsync;
+
export interface ExchangeContract extends Web3.ContractInstance {
isValidSignature: {
callAsync: (signerAddressHex: string, dataHex: string, v: number, r: string, s: string,
@@ -394,12 +399,14 @@ export interface JSONRPCPayload {
* exchangeContractAddress: The address of an exchange contract to use
* tokenRegistryContractAddress: The address of a token registry contract to use
* etherTokenContractAddress: The address of an ether token contract to use
+ * mempoolPollingIntervalMs: How often to check for new mempool events
*/
export interface ZeroExConfig {
gasPrice?: BigNumber; // Gas price to use with every transaction
exchangeContractAddress?: string;
tokenRegistryContractAddress?: string;
etherTokenContractAddress?: string;
+ mempoolPollingIntervalMs?: number;
}
export type TransactionReceipt = Web3.TransactionReceipt;
@@ -415,12 +422,7 @@ export interface DecodedLogArgs {
[argName: string]: ContractEventArg;
}
-export interface DecodedArgs<ArgsType> {
- args: ArgsType;
- event: string;
-}
-
-export interface LogWithDecodedArgs<ArgsType> extends Web3.LogEntry, DecodedArgs<ArgsType> {}
+export interface LogWithDecodedArgs<ArgsType> extends Web3.DecodedLogEntry<ArgsType> {}
export interface TransactionReceiptWithDecodedLogs extends Web3.TransactionReceipt {
logs: Array<LogWithDecodedArgs<DecodedLogArgs>|Web3.LogEntry>;
diff --git a/test/exchange_wrapper_test.ts b/test/exchange_wrapper_test.ts
index 7c76499d5..15d1cb3e4 100644
--- a/test/exchange_wrapper_test.ts
+++ b/test/exchange_wrapper_test.ts
@@ -18,6 +18,7 @@ import {
LogFillContractEventArgs,
LogCancelContractEventArgs,
LogEvent,
+ DecodedLogEvent,
} from '../src';
import {DoneCallback, BlockParamLiteral} from '../src/types';
import {FillScenarios} from './utils/fill_scenarios';
@@ -304,11 +305,11 @@ describe('ExchangeWrapper', () => {
orderFillBatch = [
{
signedOrder,
- takerTokenFillAmount: takerTokenFillAmount,
+ takerTokenFillAmount,
},
{
signedOrder: anotherSignedOrder,
- takerTokenFillAmount: takerTokenFillAmount,
+ takerTokenFillAmount,
},
];
});
@@ -647,7 +648,7 @@ describe('ExchangeWrapper', () => {
// Source: https://github.com/mochajs/mocha/issues/2407
it('Should receive the LogFill event when an order is filled', (done: DoneCallback) => {
(async () => {
- const callback = (logEvent: LogEvent<LogFillContractEventArgs>) => {
+ const callback = (logEvent: DecodedLogEvent<LogFillContractEventArgs>) => {
expect(logEvent.event).to.be.equal(ExchangeEvents.LogFill);
done();
};
@@ -655,13 +656,14 @@ describe('ExchangeWrapper', () => {
ExchangeEvents.LogFill, indexFilterValues, callback,
);
await zeroEx.exchange.fillOrderAsync(
- signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, takerAddress,
+ signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance,
+ takerAddress,
);
})().catch(done);
});
it('Should receive the LogCancel event when an order is cancelled', (done: DoneCallback) => {
(async () => {
- const callback = (logEvent: LogEvent<LogCancelContractEventArgs>) => {
+ const callback = (logEvent: DecodedLogEvent<LogCancelContractEventArgs>) => {
expect(logEvent.event).to.be.equal(ExchangeEvents.LogCancel);
done();
};
@@ -673,7 +675,7 @@ describe('ExchangeWrapper', () => {
});
it('Outstanding subscriptions are cancelled when zeroEx.setProviderAsync called', (done: DoneCallback) => {
(async () => {
- const callbackNeverToBeCalled = (logEvent: LogEvent<LogFillContractEventArgs>) => {
+ const callbackNeverToBeCalled = (logEvent: DecodedLogEvent<LogFillContractEventArgs>) => {
done(new Error('Expected this subscription to have been cancelled'));
};
await zeroEx.exchange.subscribeAsync(
@@ -683,7 +685,7 @@ describe('ExchangeWrapper', () => {
const newProvider = web3Factory.getRpcProvider();
await zeroEx.setProviderAsync(newProvider);
- const callback = (logEvent: LogEvent<LogFillContractEventArgs>) => {
+ const callback = (logEvent: DecodedLogEvent<LogFillContractEventArgs>) => {
expect(logEvent.event).to.be.equal(ExchangeEvents.LogFill);
done();
};
@@ -691,13 +693,14 @@ describe('ExchangeWrapper', () => {
ExchangeEvents.LogFill, indexFilterValues, callback,
);
await zeroEx.exchange.fillOrderAsync(
- signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, takerAddress,
+ signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance,
+ takerAddress,
);
})().catch(done);
});
it('Should cancel subscription when unsubscribe called', (done: DoneCallback) => {
(async () => {
- const callbackNeverToBeCalled = (logEvent: LogEvent<LogFillContractEventArgs>) => {
+ const callbackNeverToBeCalled = (logEvent: DecodedLogEvent<LogFillContractEventArgs>) => {
done(new Error('Expected this subscription to have been cancelled'));
};
const subscriptionToken = await zeroEx.exchange.subscribeAsync(
@@ -705,7 +708,8 @@ describe('ExchangeWrapper', () => {
);
zeroEx.exchange.unsubscribe(subscriptionToken);
await zeroEx.exchange.fillOrderAsync(
- signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, takerAddress,
+ signedOrder, takerTokenFillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance,
+ takerAddress,
);
done();
})().catch(done);
diff --git a/test/token_wrapper_test.ts b/test/token_wrapper_test.ts
index b35fa43f9..2f6f126c1 100644
--- a/test/token_wrapper_test.ts
+++ b/test/token_wrapper_test.ts
@@ -17,6 +17,7 @@ import {
TokenContractEventArgs,
LogWithDecodedArgs,
LogEvent,
+ DecodedLogEvent,
} from '../src';
import {BlockchainLifecycle} from './utils/blockchain_lifecycle';
import {TokenUtils} from './utils/token_utils';
@@ -358,7 +359,7 @@ describe('TokenWrapper', () => {
// Source: https://github.com/mochajs/mocha/issues/2407
it('Should receive the Transfer event when tokens are transfered', (done: DoneCallback) => {
(async () => {
- const callback = (logEvent: LogEvent<TransferContractEventArgs>) => {
+ const callback = (logEvent: DecodedLogEvent<TransferContractEventArgs>) => {
expect(logEvent).to.not.be.undefined();
const args = logEvent.args;
expect(args._from).to.be.equal(coinbase);
@@ -373,7 +374,7 @@ describe('TokenWrapper', () => {
});
it('Should receive the Approval event when allowance is being set', (done: DoneCallback) => {
(async () => {
- const callback = (logEvent: LogEvent<ApprovalContractEventArgs>) => {
+ const callback = (logEvent: DecodedLogEvent<ApprovalContractEventArgs>) => {
expect(logEvent).to.not.be.undefined();
const args = logEvent.args;
expect(args._owner).to.be.equal(coinbase);
@@ -388,13 +389,13 @@ describe('TokenWrapper', () => {
});
it('Outstanding subscriptions are cancelled when zeroEx.setProviderAsync called', (done: DoneCallback) => {
(async () => {
- const callbackNeverToBeCalled = (logEvent: LogEvent<TransferContractEventArgs>) => {
+ const callbackNeverToBeCalled = (logEvent: DecodedLogEvent<TransferContractEventArgs>) => {
done(new Error('Expected this subscription to have been cancelled'));
};
zeroEx.token.subscribe(
tokenAddress, TokenEvents.Transfer, indexFilterValues, callbackNeverToBeCalled,
);
- const callbackToBeCalled = (logEvent: LogEvent<TransferContractEventArgs>) => {
+ const callbackToBeCalled = (logEvent: DecodedLogEvent<TransferContractEventArgs>) => {
done();
};
const newProvider = web3Factory.getRpcProvider();
@@ -407,7 +408,7 @@ describe('TokenWrapper', () => {
});
it('Should cancel subscription when unsubscribe called', (done: DoneCallback) => {
(async () => {
- const callbackNeverToBeCalled = (logEvent: LogEvent<TokenContractEventArgs>) => {
+ const callbackNeverToBeCalled = (logEvent: DecodedLogEvent<TokenContractEventArgs>) => {
done(new Error('Expected this subscription to have been cancelled'));
};
const subscriptionToken = zeroEx.token.subscribe(
diff --git a/yarn.lock b/yarn.lock
index eefd9a429..65bfa7476 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -4685,7 +4685,7 @@ tslint-react@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/tslint-react/-/tslint-react-3.0.0.tgz#00c48ab7f22e91533b62bdef2c162b49447af00a"
-tslint@^5.3.2:
+tslint@~5.5.0:
version "5.5.0"
resolved "https://registry.yarnpkg.com/tslint/-/tslint-5.5.0.tgz#10e8dab3e3061fa61e9442e8cee3982acf20a6aa"
dependencies:
@@ -4940,9 +4940,9 @@ web3-provider-engine@^8.4.0:
xhr "^2.2.0"
xtend "^4.0.1"
-web3-typescript-typings@^0.6.2:
- version "0.6.2"
- resolved "https://registry.yarnpkg.com/web3-typescript-typings/-/web3-typescript-typings-0.6.2.tgz#5dd9bf4dcd1d6dd6897c87d055d1f5cc8f98dfbd"
+web3-typescript-typings@^0.7.0:
+ version "0.7.0"
+ resolved "https://registry.yarnpkg.com/web3-typescript-typings/-/web3-typescript-typings-0.7.0.tgz#a8adcfaa5f4933eddd53d9e592bace3edfffa050"
dependencies:
bignumber.js "^4.0.2"