path: root/python-packages/order_utils/src/zero_ex
diff options
Diffstat (limited to 'python-packages/order_utils/src/zero_ex')
10 files changed, 407 insertions, 0 deletions
diff --git a/python-packages/order_utils/src/zero_ex/__init__.py b/python-packages/order_utils/src/zero_ex/__init__.py
new file mode 100644
index 000000000..e90d833db
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/__init__.py
@@ -0,0 +1,2 @@
+"""0x Python API."""
diff --git a/python-packages/order_utils/src/zero_ex/contract_artifacts/__init__.py b/python-packages/order_utils/src/zero_ex/contract_artifacts/__init__.py
new file mode 100644
index 000000000..ed45d2c8e
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/contract_artifacts/__init__.py
@@ -0,0 +1 @@
+"""Solc-generated artifacts for 0x smart contracts."""
diff --git a/python-packages/order_utils/src/zero_ex/contract_artifacts/artifacts b/python-packages/order_utils/src/zero_ex/contract_artifacts/artifacts
new file mode 120000
index 000000000..82d28ba87
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/contract_artifacts/artifacts
@@ -0,0 +1 @@
+../../../../../packages/contract-artifacts/artifacts \ No newline at end of file
diff --git a/python-packages/order_utils/src/zero_ex/dev_utils/__init__.py b/python-packages/order_utils/src/zero_ex/dev_utils/__init__.py
new file mode 100644
index 000000000..b6a224d2c
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/dev_utils/__init__.py
@@ -0,0 +1 @@
+"""Dev utils to be shared across 0x projects and packages."""
diff --git a/python-packages/order_utils/src/zero_ex/dev_utils/abi_utils.py b/python-packages/order_utils/src/zero_ex/dev_utils/abi_utils.py
new file mode 100644
index 000000000..71b6128ca
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/dev_utils/abi_utils.py
@@ -0,0 +1,102 @@
+"""Ethereum ABI utilities.
+Builds on the eth-abi package, adding some convenience methods like those found
+in npmjs.com/package/ethereumjs-abi. Ideally, all of this code should be
+pushed upstream into eth-abi.
+import re
+from typing import Any, List
+from mypy_extensions import TypedDict
+from eth_abi import encode_abi
+from web3 import Web3
+from .type_assertions import assert_is_string, assert_is_list
+class MethodSignature(TypedDict, total=False):
+ """Object interface to an ABI method signature."""
+ method: str
+ args: List[str]
+def parse_signature(signature: str) -> MethodSignature:
+ """Parse a method signature into its constituent parts.
+ >>> parse_signature("ERC20Token(address)")
+ {'method': 'ERC20Token', 'args': ['address']}
+ """
+ assert_is_string(signature, "signature")
+ matches = re.match(r"^(\w+)\((.+)\)$", signature)
+ if matches is None:
+ raise ValueError(f"Invalid method signature {signature}")
+ return {"method": matches[1], "args": matches[2].split(",")}
+def elementary_name(name: str) -> str:
+ """Convert from short to canonical names; barely implemented.
+ Modeled after ethereumjs-abi's ABI.elementaryName(), but only implemented
+ to support our particular use case and a few other simple ones.
+ >>> elementary_name("address")
+ 'address'
+ >>> elementary_name("uint")
+ 'uint256'
+ """
+ assert_is_string(name, "name")
+ return {
+ "int": "int256",
+ "uint": "uint256",
+ "fixed": "fixed128x128",
+ "ufixed": "ufixed128x128",
+ }.get(name, name)
+def event_id(name: str, types: List[str]) -> str:
+ """Return the Keccak-256 hash of the given method.
+ >>> event_id("ERC20Token", ["address"])
+ '0xf47261b06eedbfce68afd46d0f3c27c60b03faad319eaf33103611cf8f6456ad'
+ """
+ assert_is_string(name, "name")
+ assert_is_list(types, "types")
+ signature = f"{name}({','.join(list(map(elementary_name, types)))})"
+ return Web3.sha3(text=signature).hex()
+def method_id(name: str, types: List[str]) -> str:
+ """Return the 4-byte method identifier.
+ >>> method_id("ERC20Token", ["address"])
+ '0xf47261b0'
+ """
+ assert_is_string(name, "name")
+ assert_is_list(types, "types")
+ return event_id(name, types)[0:10]
+def simple_encode(method: str, *args: Any) -> bytes:
+ # docstring considered all one line by pylint: disable=line-too-long
+ r"""Encode a method ABI.
+ >>> simple_encode("ERC20Token(address)", "0x1dc4c1cefef38a777b15aa20260a54e584b16c48")
+ b'\xf4ra\xb0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1d\xc4\xc1\xce\xfe\xf3\x8aw{\x15\xaa &\nT\xe5\x84\xb1lH'
+ """ # noqa: E501 (line too long)
+ assert_is_string(method, "method")
+ signature: MethodSignature = parse_signature(method)
+ return bytes.fromhex(
+ (
+ method_id(signature["method"], signature["args"])
+ + encode_abi(signature["args"], args).hex()
+ )[2:]
+ )
diff --git a/python-packages/order_utils/src/zero_ex/dev_utils/type_assertions.py b/python-packages/order_utils/src/zero_ex/dev_utils/type_assertions.py
new file mode 100644
index 000000000..08c1b0ea5
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/dev_utils/type_assertions.py
@@ -0,0 +1,58 @@
+"""Assertions for runtime type checking of function arguments."""
+from typing import Any
+def assert_is_string(value: Any, name: str) -> None:
+ """If :param value: isn't of type str, raise a TypeError.
+ >>> try: assert_is_string(123, 'var')
+ ... except TypeError as type_error: print(str(type_error))
+ ...
+ expected variable 'var', with value 123, to have type 'str', not 'int'
+ """
+ if not isinstance(value, str):
+ raise TypeError(
+ f"expected variable '{name}', with value {str(value)}, to have"
+ + f" type 'str', not '{type(value).__name__}'"
+ )
+def assert_is_list(value: Any, name: str) -> None:
+ """If :param value: isn't of type list, raise a TypeError.
+ >>> try: assert_is_list(123, 'var')
+ ... except TypeError as type_error: print(str(type_error))
+ ...
+ expected variable 'var', with value 123, to have type 'list', not 'int'
+ """
+ if not isinstance(value, list):
+ raise TypeError(
+ f"expected variable '{name}', with value {str(value)}, to have"
+ + f" type 'list', not '{type(value).__name__}'"
+ )
+def assert_is_int(value: Any, name: str) -> None:
+ """If :param value: isn't of type int, raise a TypeError.
+ >>> try: assert_is_int('asdf', 'var')
+ ... except TypeError as type_error: print(str(type_error))
+ ...
+ expected variable 'var', with value asdf, to have type 'int', not 'str'
+ """
+ if not isinstance(value, int):
+ raise TypeError(
+ f"expected variable '{name}', with value {str(value)}, to have"
+ + f" type 'int', not '{type(value).__name__}'"
+ )
+def assert_is_hex_string(value: Any, name: str) -> None:
+ """Assert that :param value: is a string of hex chars.
+ If :param value: isn't a str, raise a TypeError. If it is a string but
+ contains non-hex characters ("0x" prefix permitted), raise a ValueError.
+ """
+ assert_is_string(value, name)
+ int(value, 16) # raises a ValueError if value isn't a base-16 str
diff --git a/python-packages/order_utils/src/zero_ex/order_utils/__init__.py b/python-packages/order_utils/src/zero_ex/order_utils/__init__.py
new file mode 100644
index 000000000..80445cb6e
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/order_utils/__init__.py
@@ -0,0 +1,11 @@
+"""Order utilities for 0x applications.
+Some methods require the caller to pass in a `Web3.HTTPProvider` object. For
+local testing one may construct such a provider pointing at an instance of
+`ganache-cli <https://www.npmjs.com/package/ganache-cli>`_ which has the 0x
+contracts deployed on it. For convenience, a docker container is provided for
+just this purpose. To start it: ``docker run -d -p 8545:8545 0xorg/ganache-cli
+--gasLimit 10000000 --db /snapshot --noVMErrorsOnRPCResponse -p 8545
+--networkId 50 -m "concert load couple harbor equip island argue ramp clarify
+fence smart topic"``.
diff --git a/python-packages/order_utils/src/zero_ex/order_utils/asset_data_utils.py b/python-packages/order_utils/src/zero_ex/order_utils/asset_data_utils.py
new file mode 100644
index 000000000..e6f9a07c1
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/order_utils/asset_data_utils.py
@@ -0,0 +1,143 @@
+"""Asset data encoding and decoding utilities."""
+from mypy_extensions import TypedDict
+import eth_abi
+from zero_ex.dev_utils import abi_utils
+from zero_ex.dev_utils.type_assertions import assert_is_string, assert_is_int
+class ERC20AssetData(TypedDict):
+ """Object interface to ERC20 asset data."""
+ asset_proxy_id: str
+ token_address: str
+class ERC721AssetData(TypedDict):
+ """Object interface to ERC721 asset data."""
+ asset_proxy_id: str
+ token_address: str
+ token_id: int
+def encode_erc20_asset_data(token_address: str) -> str:
+ """Encode an ERC20 token address into an asset data string.
+ :param token_address: the ERC20 token's contract address.
+ :rtype: hex encoded asset data string, usable in the makerAssetData or
+ takerAssetData fields in a 0x order.
+ >>> encode_erc20_asset_data('0x1dc4c1cefef38a777b15aa20260a54e584b16c48')
+ '0xf47261b00000000000000000000000001dc4c1cefef38a777b15aa20260a54e584b16c48'
+ """
+ assert_is_string(token_address, "token_address")
+ return (
+ "0x"
+ + abi_utils.simple_encode("ERC20Token(address)", token_address).hex()
+ )
+def decode_erc20_asset_data(asset_data: str) -> ERC20AssetData:
+ # docstring considered all one line by pylint: disable=line-too-long
+ """Decode an ERC20 asset data hex string.
+ :param asset_data: String produced by prior call to encode_erc20_asset_data()
+ >>> decode_erc20_asset_data("0xf47261b00000000000000000000000001dc4c1cefef38a777b15aa20260a54e584b16c48")
+ {'asset_proxy_id': '0xf47261b0', 'token_address': '0x1dc4c1cefef38a777b15aa20260a54e584b16c48'}
+ """ # noqa: E501 (line too long)
+ assert_is_string(asset_data, "asset_data")
+ if len(asset_data) < ERC20_ASSET_DATA_BYTE_LENGTH:
+ raise ValueError(
+ "Could not decode ERC20 Proxy Data. Expected length of encoded"
+ + f" data to be at least {str(ERC20_ASSET_DATA_BYTE_LENGTH)}."
+ + f" Got {str(len(asset_data))}."
+ )
+ asset_proxy_id: str = asset_data[0:SELECTOR_LENGTH]
+ if asset_proxy_id != abi_utils.method_id("ERC20Token", ["address"]):
+ raise ValueError(
+ "Could not decode ERC20 Proxy Data. Expected Asset Proxy Id to be"
+ + f" ERC20 ({abi_utils.method_id('ERC20Token', ['address'])})"
+ + f" but got {asset_proxy_id}."
+ )
+ # workaround for https://github.com/PyCQA/pylint/issues/1498
+ # pylint: disable=unsubscriptable-object
+ token_address = eth_abi.decode_abi(
+ ["address"], bytes.fromhex(asset_data[SELECTOR_LENGTH:])
+ )[0]
+ return {"asset_proxy_id": asset_proxy_id, "token_address": token_address}
+def encode_erc721_asset_data(token_address: str, token_id: int) -> str:
+ # docstring considered all one line by pylint: disable=line-too-long
+ """Encode an ERC721 asset data hex string.
+ :param token_address: the ERC721 token's contract address.
+ :param token_id: the identifier of the asset's instance of the token.
+ :rtype: hex encoded asset data string, usable in the makerAssetData or
+ takerAssetData fields in a 0x order.
+ >>> encode_erc721_asset_data('0x1dc4c1cefef38a777b15aa20260a54e584b16c48', 1)
+ '0x025717920000000000000000000000001dc4c1cefef38a777b15aa20260a54e584b16c480000000000000000000000000000000000000000000000000000000000000001'
+ """ # noqa: E501 (line too long)
+ assert_is_string(token_address, "token_address")
+ assert_is_int(token_id, "token_id")
+ return (
+ "0x"
+ + abi_utils.simple_encode(
+ "ERC721Token(address,uint256)", token_address, token_id
+ ).hex()
+ )
+def decode_erc721_asset_data(asset_data: str) -> ERC721AssetData:
+ # docstring considered all one line by pylint: disable=line-too-long
+ """Decode an ERC721 asset data hex string.
+ >>> decode_erc721_asset_data('0x025717920000000000000000000000001dc4c1cefef38a777b15aa20260a54e584b16c480000000000000000000000000000000000000000000000000000000000000001')
+ {'asset_proxy_id': '0x02571792', 'token_address': '0x1dc4c1cefef38a777b15aa20260a54e584b16c48', 'token_id': 1}
+ """ # noqa: E501 (line too long)
+ assert_is_string(asset_data, "asset_data")
+ if len(asset_data) < ERC721_ASSET_DATA_MINIMUM_BYTE_LENGTH:
+ raise ValueError(
+ "Could not decode ERC721 Asset Data. Expected length of encoded"
+ + f"data to be at least {ERC721_ASSET_DATA_MINIMUM_BYTE_LENGTH}. "
+ + f"Got {len(asset_data)}."
+ )
+ asset_proxy_id: str = asset_data[0:SELECTOR_LENGTH]
+ # prefer `black` formatting. pylint: disable=C0330
+ if asset_proxy_id != abi_utils.method_id(
+ "ERC721Token", ["address", "uint256"]
+ ):
+ raise ValueError(
+ "Could not decode ERC721 Asset Data. Expected Asset Proxy Id to be"
+ + f" ERC721 ("
+ + f"{abi_utils.method_id('ERC721Token', ['address', 'uint256'])}"
+ + f"), but got {asset_proxy_id}"
+ )
+ (token_address, token_id) = eth_abi.decode_abi(
+ ["address", "uint256"], bytes.fromhex(asset_data[SELECTOR_LENGTH:])
+ )
+ return {
+ "asset_proxy_id": asset_proxy_id,
+ "token_address": token_address,
+ "token_id": token_id,
+ }
diff --git a/python-packages/order_utils/src/zero_ex/order_utils/py.typed b/python-packages/order_utils/src/zero_ex/order_utils/py.typed
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/order_utils/py.typed
diff --git a/python-packages/order_utils/src/zero_ex/order_utils/signature_utils.py b/python-packages/order_utils/src/zero_ex/order_utils/signature_utils.py
new file mode 100644
index 000000000..12525ba88
--- /dev/null
+++ b/python-packages/order_utils/src/zero_ex/order_utils/signature_utils.py
@@ -0,0 +1,88 @@
+"""Signature utilities."""
+from typing import Dict, Tuple
+import json
+from pkg_resources import resource_string
+from eth_utils import is_address, to_checksum_address
+from web3 import Web3
+import web3.exceptions
+from web3.utils import datatypes
+from zero_ex.dev_utils.type_assertions import assert_is_hex_string
+# prefer `black` formatting. pylint: disable=C0330
+EXCHANGE_ABI = json.loads(
+ resource_string("zero_ex.contract_artifacts", "artifacts/Exchange.json")
+network_to_exchange_addr: Dict[str, str] = {
+ "1": "0x4f833a24e1f95d70f028921e27040ca56e09ab0b",
+ "3": "0x4530c0483a1633c7a1c97d2c53721caff2caaaaf",
+ "42": "0x35dd2932454449b14cee11a94d3674a936d5d7b2",
+ "50": "0x48bacb9266a570d521063ef5dd96e61686dbe788",
+# prefer `black` formatting. pylint: disable=C0330
+def is_valid_signature(
+ provider: Web3.HTTPProvider, data: str, signature: str, signer_address: str
+) -> Tuple[bool, str]:
+ # docstring considered all one line by pylint: disable=line-too-long
+ """Check the validity of the supplied signature.
+ Check if the supplied ``signature`` corresponds to signing ``data`` with
+ the private key corresponding to ``signer_address``.
+ :param provider: A Web3 provider able to access the 0x Exchange contract.
+ :param data: The hex encoded data signed by the supplied signature.
+ :param signature: The hex encoded signature.
+ :param signer_address: The hex encoded address that signed the data to
+ produce the supplied signature.
+ :rtype: Boolean indicating whether the given signature is valid.
+ >>> is_valid_signature(
+ ... Web3.HTTPProvider(""),
+ ... '0x6927e990021d23b1eb7b8789f6a6feaf98fe104bb0cf8259421b79f9a34222b0',
+ ... '0x1B61a3ed31b43c8780e905a260a35faefcc527be7516aa11c0256729b5b351bc3340349190569279751135161d22529dc25add4f6069af05be04cacbda2ace225403',
+ ... '0x5409ed021d9299bf6814279a6a1411a7e866a631',
+ ... )
+ (True, '')
+ """ # noqa: E501 (line too long)
+ # TODO: make this provider check more flexible. pylint: disable=fixme
+ # https://app.asana.com/0/684263176955174/901300863045491/f
+ if not isinstance(provider, Web3.HTTPProvider):
+ raise TypeError("provider is not a Web3.HTTPProvider")
+ assert_is_hex_string(data, "data")
+ assert_is_hex_string(signature, "signature")
+ assert_is_hex_string(signer_address, "signer_address")
+ if not is_address(signer_address):
+ raise ValueError("signer_address is not a valid address")
+ web3_instance = Web3(provider)
+ # false positive from pylint: disable=no-member
+ network_id = web3_instance.net.version
+ contract_address = network_to_exchange_addr[network_id]
+ # false positive from pylint: disable=no-member
+ contract: datatypes.Contract = web3_instance.eth.contract(
+ address=to_checksum_address(contract_address), abi=EXCHANGE_ABI
+ )
+ try:
+ return (
+ contract.call().isValidSignature(
+ data, to_checksum_address(signer_address), signature
+ ),
+ "",
+ )
+ except web3.exceptions.BadFunctionCallOutput as exception:
+ known_revert_reasons = [
+ ]
+ for known_revert_reason in known_revert_reasons:
+ if known_revert_reason in str(exception):
+ return (False, known_revert_reason)
+ return (False, f"Unknown: {exception}")