From 09790d33f79cc0be82a26109c924596d6f06e7a1 Mon Sep 17 00:00:00 2001 From: Erik Larsson Date: Sun, 17 Dec 2023 01:25:40 +0100 Subject: [PATCH] cryptography: add module for using TPM keys with the cryptography module This implements the necessary interfaces to use RSA and ECC keys with the cryptography modules. Enabling for example creating a CA or CSR with TPM-backed keys. Signed-off-by: Erik Larsson --- docs/api.rst | 1 + docs/cryptography.rst | 6 + src/tpm2_pytss/cryptography.py | 430 +++++++++++++++++++++++++ src/tpm2_pytss/internal/crypto.py | 64 ++++ test/test_cryptography.py | 504 ++++++++++++++++++++++++++++++ 5 files changed, 1005 insertions(+) create mode 100644 docs/cryptography.rst create mode 100644 src/tpm2_pytss/cryptography.py create mode 100644 test/test_cryptography.py diff --git a/docs/api.rst b/docs/api.rst index 5ecb2e28..f4bce0d8 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -14,3 +14,4 @@ The API documentation for the tpm2-pytss project. Utility Routines TSS PEM Key (OpenSSL) TSS2_Exception + cryptography diff --git a/docs/cryptography.rst b/docs/cryptography.rst new file mode 100644 index 00000000..48e9d097 --- /dev/null +++ b/docs/cryptography.rst @@ -0,0 +1,6 @@ +cryptography +============ + +.. automodule:: tpm2_pytss.cryptography + :members: + :undoc-members: diff --git a/src/tpm2_pytss/cryptography.py b/src/tpm2_pytss/cryptography.py new file mode 100644 index 00000000..dd75623a --- /dev/null +++ b/src/tpm2_pytss/cryptography.py @@ -0,0 +1,430 @@ +# SPDX-License-Identifier: BSD-2 + +from .ESAPI import ESAPI +from .constants import ESYS_TR, TPM2_ALG, TPMA_OBJECT, TPM2_ST, TPM2_RH +from .types import ( + TPMT_RSA_DECRYPT, + TPM2B_DATA, + TPMT_SIG_SCHEME, + TPMT_TK_HASHCHECK, + TPM2B_ECC_POINT, + TPMT_ASYM_SCHEME, + TPMT_ECC_SCHEME, + TPMU_SIG_SCHEME, +) +from .internal.crypto import ( + public_to_key, + _get_curve, + _rsa_decrypt_padding_to_scheme, + _rsa_sign_padding_to_scheme, + _int_to_buffer, + _ecc_sign_algorithm_to_scheme, + _get_digest, +) +from typing import Union +from cryptography.hazmat.primitives.asymmetric import rsa, ec, padding +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric.utils import Prehashed +from cryptography.hazmat.primitives.serialization import ( + Encoding, + PrivateFormat, + KeySerializationEncryption, +) + + +def _compare_schemes( + in_scheme: Union[TPMT_RSA_DECRYPT, TPMT_SIG_SCHEME], key_scheme: TPMT_SIG_SCHEME +) -> None: + """Compare a keys scheme and any scheme passed to sign/decrypt functions. + + Raises: + ValueError: On any scheme mismatch. + """ + if key_scheme.scheme == TPM2_ALG.NULL: + return + if in_scheme.scheme != key_scheme.scheme: + raise ValueError( + f"invalid scheme, scheme has {in_scheme.scheme} but key requires {key_scheme.scheme}" + ) + if in_scheme.scheme == TPM2_ALG.RSAES: + return + if isinstance(in_scheme.details, TPMU_SIG_SCHEME): + halg = in_scheme.details.any.hashAlg + else: + halg = in_scheme.details.anySig.hashAlg + if halg != key_scheme.details.anySig.hashAlg: + raise ValueError( + f"digest algorithm mismatch, scheme has {halg} but key requires {key_scheme.details.anySig.hashAlg}" + ) + + +class tpm_rsa_private_key(rsa.RSAPrivateKey): + """Interface to a TPM RSA key for use with the cryptography module. + + Args: + ectx (ESAPI): The ESAPI instance to use. + handle (ESYS_TR): The key handle. + session (ESYS_TR): The session to authorize usage of the key, default is ESYS_TR.PASSWORD + + Notes: + It is recommended to use the :func:`get_digest_algorithm`, :func:`get_decryption_padding` and :func:`get_signature_padding` methods for highest compatibility. + + Raises: + ValueError: If the key has the restricted bit set or if the handle doesn't reference an RSA key. + """ + + def __init__( + self, ectx: ESAPI, handle: ESYS_TR, session: ESYS_TR = ESYS_TR.PASSWORD + ): + self._handle = handle + self._session = session + self._ectx = ectx + public, _, _ = ectx.read_public(handle) + self._public = public.publicArea + if self._public.type != TPM2_ALG.RSA: + raise ValueError( + f"invalid key type, expected {TPM2_ALG.RSA}, got {self._public.type}" + ) + if self._public.objectAttributes & TPMA_OBJECT.RESTRICTED: + raise ValueError( + "TPM key does not allow generic signing and/or decryption (object attribute restricted is set)" + ) + + def decrypt(self, ciphertext: bytes, padding: padding) -> bytes: + """Implements the decrypt interface. + + See :py:meth:`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey.decrypt` for documentation. + + Notes: + If a non-empty label is used with OAEP padding, this will fail. + + Raises: + ValueError: if the requested padding isn't supported by the key. + """ + if not self._public.objectAttributes & TPMA_OBJECT.DECRYPT: + raise ValueError( + "TPM key does not allow decryption (object attribute decrypt is not set)" + ) + scheme = TPMT_RSA_DECRYPT() + _rsa_decrypt_padding_to_scheme(padding, scheme) + _compare_schemes(scheme, self._public.parameters.rsaDetail.scheme) + data2b = self._ectx.rsa_decrypt( + self._handle, ciphertext, scheme, TPM2B_DATA(), session1=self._session + ) + return bytes(data2b) + + def public_key(self) -> rsa.RSAPublicKey: + """Get the public key. + + Returns: the public part of the RSA key as a :py:class:`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`. + """ + return public_to_key(self._public) + + @property + def key_size(self) -> int: + """The RSA key size""" + return self._public.parameters.rsaDetail.keyBits + + def get_digest_algorithm(self) -> hashes.HashAlgorithm: + """Get an usable digest algorithm for use with the key. + + If any scheme with a specified digest algorithm is specified return that algorithm. + Otherwise the name digest algorithm is returned. + + The returned digest algorithm can be used with different cryptography functions. + + Returns: + The digest algorithm as a :py:class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm` subclass. + + Raises: + ValueError: If the digest algorithm is not supported. + """ + if self._public.parameters.rsaDetail.scheme.scheme in ( + TPM2_ALG.RSASSA, + TPM2_ALG.RSAPSS, + TPM2_ALG.OAEP, + ): + tpm_alg = self._public.parameters.rsaDetail.scheme.details.anySig.hashAlg + else: + tpm_alg = self._public.nameAlg + halg = _get_digest(tpm_alg) + if halg is None: + raise ValueError(f"unsupported digest algorithm {tpm_alg}") + return halg + + def get_decryption_padding(self) -> padding.AsymmetricPadding: + """Get a padding configuration for use with the decrypt method. + + If the key has a scheme specified, use that scheme. + Otherwise, use OAEP as the default. + + Returns: + An instance of :py:class:`~cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding`. + + Raises: + ValueError: If the either the scheme or digest algorithm is unsupported. + """ + if self._public.parameters.asymDetail.scheme.scheme == TPM2_ALG.NULL: + scheme = TPMT_ASYM_SCHEME(scheme=TPM2_ALG.OAEP) + scheme.details.anySig.hashAlg = self._public.nameAlg + else: + scheme = self._public.parameters.asymDetail.scheme + if scheme.scheme == TPM2_ALG.OAEP: + algorithm = self.get_digest_algorithm() + decrypt_padding = padding.OAEP( + mgf=padding.MGF1(algorithm=algorithm()), + algorithm=algorithm(), + label=b"", + ) + elif scheme.scheme == TPM2_ALG.RSAES: + decrypt_padding = padding.PKCS1v15() + else: + raise ValueError(f"unsupported decryption scheme {scheme.scheme}") + return decrypt_padding + + def get_signature_padding(self) -> padding.AsymmetricPadding: + """Get a padding configuration for use with the sign method. + + If the key has a scheme specified, use that scheme. + Otherwise, use PSS as the default. + + Returns: + An instance of :py:class:`~cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding`. + + Raises: ValueError if the either the scheme or digest algorithm is unsupported. + """ + if self._public.parameters.asymDetail.scheme.scheme == TPM2_ALG.NULL: + scheme = TPMT_ASYM_SCHEME(scheme=TPM2_ALG.RSAPSS) + scheme.details.anySig.hashAlg = self._public.nameAlg + else: + scheme = self._public.parameters.asymDetail.scheme + if scheme.scheme == TPM2_ALG.RSAPSS: + algorithm = self.get_digest_algorithm() + sign_padding = padding.PSS( + mgf=padding.MGF1(algorithm=algorithm()), + salt_length=padding.PSS.DIGEST_LENGTH, + ) + elif scheme.scheme == TPM2_ALG.RSASSA: + sign_padding = padding.PKCS1v15() + else: + raise ValueError(f"unsupported signature scheme {scheme.scheme}") + return sign_padding + + def sign( + self, + data: bytes, + padding: padding, + algorithm: Union[hashes.HashAlgorithm, Prehashed], + ) -> bytes: + """Implements the sign interface. + + See :py:meth:`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey.sign` for documentationen. + + Notes: + For PSS padding, the salt length should be set to the length of the digest as that is the only setup the TPM uses. + + Raises: + ValueError: If the requested padding isn't supported by the key or the sign_encrypt bit isn't set. + """ + if not self._public.objectAttributes & TPMA_OBJECT.SIGN_ENCRYPT: + raise ValueError( + "TPM key does not allow signing (object attribute sign_encrypt is not set)" + ) + if isinstance(algorithm, Prehashed): + raise ValueError("Prehashed data is not supported") + scheme = TPMT_SIG_SCHEME() + _rsa_sign_padding_to_scheme(padding, type(algorithm), scheme) + _compare_schemes(scheme, self._public.parameters.rsaDetail.scheme) + h = hashes.Hash(algorithm) + h.update(data) + digest = h.finalize() + validation = TPMT_TK_HASHCHECK(tag=TPM2_ST.HASHCHECK, hierarchy=TPM2_RH.NULL) + tpm_sig = self._ectx.sign( + self._handle, digest, scheme, validation, session1=self._session + ) + return bytes(tpm_sig) + + def private_numbers(self) -> None: + """Always raises a NotImplementedError.""" + raise NotImplementedError() + + def private_bytes( + self, + encoding: Encoding, + format: PrivateFormat, + encryption_algorithm: KeySerializationEncryption, + ) -> None: + """Always raises a NotImplementedError.""" + raise NotImplementedError() + + +class tpm_ecc_private_key(ec.EllipticCurvePrivateKey): + """Interface to a TPM ECC key for use with the cryptography module. + + Args: + ectx (ESAPI): The ESAPI instance to use. + handle (ESYS_TR): The key handle. + session (ESYS_TR): The session to authorize usage of the key, default is ESYS_TR.PASSWORD + + Notes: + It is recommended to use the :func:`get_digest_algorithm` and :func:`get_signature_algorithm` methods for highest compatibility. + + Raises: + ValueError: If the key has the restricted bit set, the curve isn't supported or if the handle doesn't reference an ECC key. + """ + + def __init__( + self, ectx: ESAPI, handle: ESYS_TR, session: ESYS_TR = ESYS_TR.PASSWORD + ): + self._handle = handle + self._session = session + self._ectx = ectx + public, _, _ = ectx.read_public(handle) + self._public = public.publicArea + if self._public.type != TPM2_ALG.ECC: + raise ValueError( + f"invalid key type, expected {TPM2_ALG.ECC}, got {self._public.type}" + ) + if self._public.objectAttributes & TPMA_OBJECT.RESTRICTED: + raise ValueError( + "TPM key does not allow generic signing and/or decryption (object attribute restricted is set)" + ) + cid = _get_curve(self._public.parameters.eccDetail.curveID) + if cid is None: + raise ValueError( + f"unsupported curve {self._public.parameters.eccDetail.curveID}" + ) + self._curve = cid + + def exchange( + self, algorithm: ec.ECDH, peer_public_key: ec.EllipticCurvePublicKey + ) -> bytes: + """Implements the exchange interface. + + See :py:meth:`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey.exchange` for documentationen. + + Raises: + ValueError: If the curves does not match or the decrypt bit isn't set. + """ + if not self._public.objectAttributes & TPMA_OBJECT.DECRYPT: + raise ValueError( + "TPM key does not allow ECDH key exchange (object attribute decrypt is not set)" + ) + if type(peer_public_key.curve) != type(self.curve): + raise ValueError( + f"curve mismatch for peer key, got {peer_public_key.curve.name}, expected {self.curve.name}" + ) + scheme = TPMT_SIG_SCHEME(scheme=TPM2_ALG.ECDH) + _compare_schemes(scheme, self._public.parameters.eccDetail.scheme) + in_point = TPM2B_ECC_POINT() + nums = peer_public_key.public_numbers() + _int_to_buffer(nums.x, in_point.point.x) + _int_to_buffer(nums.y, in_point.point.y) + + out_point = self._ectx.ecdh_zgen(self._handle, in_point, session1=self._session) + return bytes(out_point.point.x) + + def public_key(self) -> ec.EllipticCurvePublicKey: + """Get the public key. + + Returns: the public part of the ECC key as a :py:class:`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey` + """ + return public_to_key(self._public) + + def get_digest_algorithm(self) -> hashes.HashAlgorithm: + """Get an usable digest algorithm for use with the key. + + If any scheme with a specified digest algorithm is specified return that algorithm. + Otherwise the name digest algorithm is returned. + + The returned digest algorithm can be used with different cryptography functions. + + Returns: + The digest algorithm as a :py:class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm` subclass. + + Raises: + ValueError: If the digest algorithm is not supported. + """ + if self._public.parameters.eccDetail.scheme.scheme == TPM2_ALG.ECDSA: + tpm_alg = self._public.parameters.eccDetail.scheme.details.anySig.hashAlg + else: + tpm_alg = self._public.nameAlg + halg = _get_digest(tpm_alg) + if halg is None: + raise ValueError(f"unsupported digest algorithm {tpm_alg}") + return halg + + def get_signature_algorithm(self) -> ec.EllipticCurveSignatureAlgorithm: + """Get a padding configuration for use with the sign method. + + If the key has a scheme specified, use that scheme. + Otherwise, use ECDSA as the default + + Returns: an instance of :py:class:`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurveSignatureAlgorithm` + + Raises: + ValueError: If the either the scheme or digest algorithm is unsupported. + """ + if self._public.parameters.eccDetail.scheme.scheme == TPM2_ALG.NULL: + scheme = TPMT_ECC_SCHEME(scheme=TPM2_ALG.ECDSA) + scheme.details.anySig.hashAlg = self._public.nameAlg + else: + scheme = self._public.parameters.eccDetail.scheme + if scheme.scheme == TPM2_ALG.ECDSA: + algorithm = self.get_digest_algorithm() + sig_alg = ec.ECDSA(algorithm()) + else: + raise ValueError(f"unsupported signature scheme {scheme.scheme}") + return sig_alg + + def sign( + self, data: bytes, signature_algorithm: ec.EllipticCurveSignatureAlgorithm + ) -> bytes: + """Implements the sign interface. + + See :py:meth:`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey.sign`: for documentation. + + Raises: + ValueError: if the requested signature algorithm isn't supported by the key or the sign_encrypt bit isn't set. + """ + if not self._public.objectAttributes & TPMA_OBJECT.SIGN_ENCRYPT: + raise ValueError( + "TPM key does not allow signing (object attribute sign_encrypt is not set)" + ) + algorithm = signature_algorithm.algorithm + if isinstance(algorithm, Prehashed): + raise ValueError("Prehashed data is not supported") + scheme = TPMT_SIG_SCHEME() + _ecc_sign_algorithm_to_scheme(signature_algorithm, scheme) + _compare_schemes(scheme, self._public.parameters.eccDetail.scheme) + h = hashes.Hash(algorithm) + h.update(data) + digest = h.finalize() + validation = TPMT_TK_HASHCHECK(tag=TPM2_ST.HASHCHECK, hierarchy=TPM2_RH.NULL) + tpm_sig = self._ectx.sign( + self._handle, digest, scheme, validation, session1=self._session + ) + return bytes(tpm_sig) + + @property + def curve(self) -> ec.EllipticCurve: + """The ECC curve.""" + return self._curve() + + @property + def key_size(self) -> int: + """The ECC key size.""" + return self.public_key().key_size + + def private_numbers(self) -> None: + """Always raises a NotImplementedError.""" + raise NotImplementedError() + + def private_bytes( + self, + encoding: Encoding, + format: PrivateFormat, + encryption_algorithm: KeySerializationEncryption, + ) -> None: + """Always raises a NotImplementedError.""" + raise NotImplementedError() diff --git a/src/tpm2_pytss/internal/crypto.py b/src/tpm2_pytss/internal/crypto.py index 93e51813..600f1510 100644 --- a/src/tpm2_pytss/internal/crypto.py +++ b/src/tpm2_pytss/internal/crypto.py @@ -84,6 +84,13 @@ def _get_digest(digestid): return None +def _get_pyca_digest(digest_type): + for (algid, d) in _digesttable: + if issubclass(digest_type, d): + return algid + return None + + def _get_alg(alg): for (algid, a) in _algtable: if algid == alg: @@ -644,3 +651,60 @@ def _decrypt( decr = ciph.decryptor() plaintextdata = decr.update(data) + decr.finalize() return plaintextdata + + +def _rsa_decrypt_padding_to_scheme( + decrypt_padding: padding.AsymmetricPadding, scheme: "TPMT_RSA_DECRYPT" +): + if isinstance(decrypt_padding, padding.OAEP): + if hasattr(decrypt_padding, "algorithm"): + alg = decrypt_padding.algorithm + elif hasattr(decrypt_padding, "_algorithm"): + # This is an ugly hack, but until cryptography 42 is released it's needed. + alg = type(decrypt_padding._algorithm) + else: + raise ValueError("unable to get hash algorithm from OAEP padding") + scheme.scheme = TPM2_ALG.OAEP + halg = _get_pyca_digest(alg) + if halg is None: + raise ValueError(f"unsupported digest algorithm {alg}") + scheme.details.oaep.hashAlg = halg + elif isinstance(decrypt_padding, padding.PKCS1v15): + scheme.scheme = TPM2_ALG.RSAES + else: + raise ValueError(f"unsupported RSA decryption scheme: {decrypt_padding}") + return + + +def _rsa_sign_padding_to_scheme( + sign_padding: padding.AsymmetricPadding, + algorithm: hashes.HashAlgorithm, + scheme: "TPMT_SIG_SCHEME", +): + if isinstance(sign_padding, padding.PSS): + scheme.scheme = TPM2_ALG.RSAPSS + + elif isinstance(sign_padding, padding.PKCS1v15): + scheme.scheme = TPM2_ALG.RSASSA + else: + raise ValueError(f"unsupported RSA signature scheme: {sign_padding}") + halg = _get_pyca_digest(algorithm) + if halg is None: + raise ValueError(f"unsupported digest algorithm {algorithm}") + scheme.details.any.hashAlg = halg + return + + +def _ecc_sign_algorithm_to_scheme( + sign_alg: ec.EllipticCurveSignatureAlgorithm, scheme: "TPMT_SIG_SCHEME" +): + if isinstance(sign_alg, ec.ECDSA): + scheme.scheme = TPM2_ALG.ECDSA + algorithm = sign_alg.algorithm + else: + raise ValueError(f"unsupported ECC signature scheme: {sign_alg}") + halg = _get_pyca_digest(type(algorithm)) + if halg is None: + raise ValueError(f"unsupported digest algorithm {algorithm}") + scheme.details.any.hashAlg = halg + return diff --git a/test/test_cryptography.py b/test/test_cryptography.py new file mode 100644 index 00000000..ed3dd6f0 --- /dev/null +++ b/test/test_cryptography.py @@ -0,0 +1,504 @@ +#!/usr/bin/python3 -u +# SPDX-License-Identifier: BSD-2 + +from .TSS2_BaseTest import TSS2_EsapiTest +from tpm2_pytss.constants import TPMA_OBJECT, TPM2_ECC, TPM2_ALG +from tpm2_pytss.types import TPM2B_PUBLIC +from tpm2_pytss.cryptography import tpm_rsa_private_key, tpm_ecc_private_key +from cryptography.hazmat.primitives.asymmetric.padding import OAEP, MGF1, PKCS1v15, PSS +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric.utils import Prehashed +from cryptography import x509 +import datetime + + +rsa_template = TPM2B_PUBLIC.parse( + "rsa2048", + objectAttributes=TPMA_OBJECT.DECRYPT + | TPMA_OBJECT.SIGN_ENCRYPT + | TPMA_OBJECT.FIXEDTPM + | TPMA_OBJECT.FIXEDPARENT + | TPMA_OBJECT.SENSITIVEDATAORIGIN + | TPMA_OBJECT.USERWITHAUTH, +) + +ecc_template = TPM2B_PUBLIC.parse( + "ecc256", + objectAttributes=TPMA_OBJECT.DECRYPT + | TPMA_OBJECT.SIGN_ENCRYPT + | TPMA_OBJECT.FIXEDTPM + | TPMA_OBJECT.FIXEDPARENT + | TPMA_OBJECT.SENSITIVEDATAORIGIN + | TPMA_OBJECT.USERWITHAUTH, +) + + +class TestCryptography(TSS2_EsapiTest): + def test_rsa_key(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_template + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + self.assertEqual(privkey.key_size, 2048) + + with self.assertRaises(NotImplementedError) as e: + privkey.private_numbers() + + with self.assertRaises(NotImplementedError) as e: + privkey.private_bytes(encoding=None, format=None, encryption_algorithm=None) + + def test_rsa_decrypt_oaep(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_template + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + pubkey = privkey.public_key() + + padding = privkey.get_decryption_padding() + encrypted_data = pubkey.encrypt(b"falafel", padding) + + decrypted_data = privkey.decrypt(encrypted_data, padding) + self.assertEqual(decrypted_data, b"falafel") + + def test_rsa_decrypt_pkcs1v15(self): + rsaes = TPM2B_PUBLIC(rsa_template) + rsaes.publicArea.parameters.rsaDetail.scheme.scheme = TPM2_ALG.RSAES + rsaes.publicArea.objectAttributes ^= TPMA_OBJECT.SIGN_ENCRYPT + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsaes + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + pubkey = privkey.public_key() + + padding = privkey.get_decryption_padding() + encrypted_data = pubkey.encrypt(b"falafel", padding) + + decrypted_data = privkey.decrypt(encrypted_data, padding) + self.assertEqual(decrypted_data, b"falafel") + + def test_rsa_key_bad_type(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_template + ) + with self.assertRaises(ValueError) as e: + tpm_rsa_private_key(self.ectx, handle) + self.assertEqual(str(e.exception), "invalid key type, expected rsa, got ecc") + + def test_rsa_key_restricted(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public="rsa2048" + ) + with self.assertRaises(ValueError) as e: + tpm_rsa_private_key(self.ectx, handle) + self.assertEqual( + str(e.exception), + "TPM key does not allow generic signing and/or decryption (object attribute restricted is set)", + ) + + def test_rsa_sign_pss(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_template + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + pubkey = privkey.public_key() + + padding = privkey.get_signature_padding() + halg = privkey.get_digest_algorithm() + + sig = privkey.sign(b"falafel", padding, halg()) + pubkey.verify(sig, b"falafel", padding, halg()) + + def test_rsa_sign_pkcs1v15(self): + rsassa = TPM2B_PUBLIC(rsa_template) + rsassa.publicArea.parameters.rsaDetail.scheme.scheme = TPM2_ALG.RSASSA + rsassa.publicArea.parameters.rsaDetail.scheme.details.anySig.hashAlg = ( + TPM2_ALG.SHA384 + ) + rsassa.publicArea.objectAttributes ^= TPMA_OBJECT.DECRYPT + + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsassa + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + pubkey = privkey.public_key() + + padding = privkey.get_signature_padding() + halg = privkey.get_digest_algorithm() + + sig = privkey.sign(b"falafel", padding, halg()) + pubkey.verify(sig, b"falafel", padding, halg()) + + def test_rsa_no_decrypt(self): + rsa_no_decrypt = TPM2B_PUBLIC(rsa_template) + rsa_no_decrypt.publicArea.objectAttributes ^= TPMA_OBJECT.DECRYPT + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_no_decrypt + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + + padding = PKCS1v15() + with self.assertRaises(ValueError) as e: + privkey.decrypt(b"falafel", padding) + self.assertEqual( + str(e.exception), + "TPM key does not allow decryption (object attribute decrypt is not set)", + ) + + def test_rsa_no_sign(self): + rsa_no_sign = TPM2B_PUBLIC(rsa_template) + rsa_no_sign.publicArea.objectAttributes ^= TPMA_OBJECT.SIGN_ENCRYPT + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_no_sign + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + + padding = PKCS1v15() + halg = privkey.get_digest_algorithm() + with self.assertRaises(ValueError) as e: + privkey.sign(b"falafel", padding, halg()) + self.assertEqual( + str(e.exception), + "TPM key does not allow signing (object attribute sign_encrypt is not set)", + ) + + def test_rsa_prehashed(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_template + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + + padding = PKCS1v15() + halg = privkey.get_digest_algorithm() + with self.assertRaises(ValueError) as e: + privkey.sign(b"falafel", padding, Prehashed(halg())) + self.assertEqual(str(e.exception), "Prehashed data is not supported") + + def test_rsa_unsupported_sig_scheme(self): + rsaes = TPM2B_PUBLIC(rsa_template) + rsaes.publicArea.parameters.rsaDetail.scheme.scheme = TPM2_ALG.RSAES + rsaes.publicArea.objectAttributes ^= TPMA_OBJECT.SIGN_ENCRYPT + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsaes + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + + with self.assertRaises(ValueError) as e: + privkey.get_signature_padding() + self.assertEqual(str(e.exception), "unsupported signature scheme rsaes") + + def test_rsa_unsupported_decrypt_scheme(self): + rsassa = TPM2B_PUBLIC(rsa_template) + rsassa.publicArea.parameters.rsaDetail.scheme.scheme = TPM2_ALG.RSASSA + rsassa.publicArea.objectAttributes ^= TPMA_OBJECT.DECRYPT + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsassa + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + + with self.assertRaises(ValueError) as e: + privkey.get_decryption_padding() + self.assertEqual(str(e.exception), "unsupported decryption scheme rsassa") + + def test_ecc_key(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_template + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + self.assertEqual(privkey.key_size, 256) + self.assertIsInstance(privkey.curve, ec.SECP256R1) + + with self.assertRaises(NotImplementedError) as e: + privkey.private_numbers() + + with self.assertRaises(NotImplementedError) as e: + privkey.private_bytes(encoding=None, format=None, encryption_algorithm=None) + + def test_ecc_key_bad_type(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_template + ) + with self.assertRaises(ValueError) as e: + tpm_ecc_private_key(self.ectx, handle) + self.assertEqual(str(e.exception), "invalid key type, expected ecc, got rsa") + + def test_ecc_key_restricted(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public="ecc256" + ) + with self.assertRaises(ValueError) as e: + tpm_ecc_private_key(self.ectx, handle) + self.assertEqual( + str(e.exception), + "TPM key does not allow generic signing and/or decryption (object attribute restricted is set)", + ) + + def test_ecc_exchange(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_template + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + + peer_key = ec.generate_private_key(privkey.curve) + peer_public_key = peer_key.public_key() + + tpm_shared_key = privkey.exchange(ec.ECDH(), peer_public_key) + pyca_shared_key = peer_key.exchange(ec.ECDH(), privkey.public_key()) + self.assertEqual(tpm_shared_key, pyca_shared_key) + + def test_ecc_sign(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_template + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + pubkey = privkey.public_key() + + sigalg = privkey.get_signature_algorithm() + sig = privkey.sign(b"falafel", sigalg) + + pubkey.verify(sig, b"falafel", sigalg) + + def test_ecc_sign_with_scheme(self): + ecc_ecdsa = TPM2B_PUBLIC(ecc_template) + ecc_ecdsa.publicArea.parameters.eccDetail.scheme.scheme = TPM2_ALG.ECDSA + ecc_ecdsa.publicArea.objectAttributes ^= TPMA_OBJECT.DECRYPT + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_ecdsa + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + pubkey = privkey.public_key() + + sigalg = privkey.get_signature_algorithm() + sig = privkey.sign(b"falafel", sigalg) + + pubkey.verify(sig, b"falafel", sigalg) + + def test_ecc_no_decrypt(self): + ecc_no_decrypt = TPM2B_PUBLIC(ecc_template) + ecc_no_decrypt.publicArea.objectAttributes ^= TPMA_OBJECT.DECRYPT + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_no_decrypt + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + + peer_key = ec.generate_private_key(privkey.curve) + peer_public_key = peer_key.public_key() + + with self.assertRaises(ValueError) as e: + privkey.exchange(ec.ECDH(), peer_public_key) + self.assertEqual( + str(e.exception), + "TPM key does not allow ECDH key exchange (object attribute decrypt is not set)", + ) + + def test_ecc_different_curves(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_template + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + + peer_key = ec.generate_private_key(ec.SECP192R1()) + peer_public_key = peer_key.public_key() + + with self.assertRaises(ValueError) as e: + privkey.exchange(ec.ECDH(), peer_public_key) + self.assertEqual( + str(e.exception), + "curve mismatch for peer key, got secp192r1, expected secp256r1", + ) + + def test_ecc_no_sign(self): + ecc_no_sign = TPM2B_PUBLIC(ecc_template) + ecc_no_sign.publicArea.objectAttributes ^= TPMA_OBJECT.SIGN_ENCRYPT + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_no_sign + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + + halg = privkey.get_digest_algorithm() + sigalg = ec.ECDSA(halg()) + with self.assertRaises(ValueError) as e: + privkey.sign(b"falafel", sigalg) + self.assertEqual( + str(e.exception), + "TPM key does not allow signing (object attribute sign_encrypt is not set)", + ) + + def test_ecc_prehashed(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_template + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + + halg = privkey.get_digest_algorithm() + sigalg = ec.ECDSA(Prehashed(halg())) + with self.assertRaises(ValueError) as e: + privkey.sign(b"falafel", sigalg) + self.assertEqual(str(e.exception), "Prehashed data is not supported") + + def test_ecc_unsupported_curve(self): + ecc_brainpool = TPM2B_PUBLIC(ecc_template) + ecc_brainpool.publicArea.parameters.eccDetail.curveID = TPM2_ECC.BN_P256 + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_brainpool + ) + + with self.assertRaises(ValueError) as e: + tpm_ecc_private_key(self.ectx, handle) + self.assertEqual(str(e.exception), "unsupported curve bn_p256") + + def test_ecc_unsupported_scheme(self): + ecc_ecdaa = TPM2B_PUBLIC(ecc_template) + ecc_ecdaa.publicArea.parameters.eccDetail.scheme.scheme = TPM2_ALG.ECDAA + ecc_ecdaa.publicArea.objectAttributes ^= TPMA_OBJECT.DECRYPT + + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_ecdaa + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + + with self.assertRaises(ValueError) as e: + privkey.get_signature_algorithm() + self.assertEqual(str(e.exception), "unsupported signature scheme ecdaa") + + def test_scheme_mismatch(self): + rsassa = TPM2B_PUBLIC(rsa_template) + rsassa.publicArea.parameters.rsaDetail.scheme.scheme = TPM2_ALG.RSASSA + rsassa.publicArea.objectAttributes ^= TPMA_OBJECT.DECRYPT + + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsassa + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + + padding = PSS( + mgf=MGF1(algorithm=hashes.SHA256()), salt_length=PSS.DIGEST_LENGTH + ) + + with self.assertRaises(ValueError) as e: + privkey.sign(b"falafel", padding, hashes.SHA256()) + self.assertEqual( + str(e.exception), + "invalid scheme, scheme has rsapss but key requires rsassa", + ) + + def test_scheme_digest_mismatch(self): + rsassa = TPM2B_PUBLIC(rsa_template) + rsassa.publicArea.parameters.rsaDetail.scheme.scheme = TPM2_ALG.RSASSA + rsassa.publicArea.parameters.rsaDetail.scheme.details.anySig.hashAlg = ( + TPM2_ALG.SHA1 + ) + rsassa.publicArea.objectAttributes ^= TPMA_OBJECT.DECRYPT + + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsassa + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + + padding = PKCS1v15() + + with self.assertRaises(ValueError) as e: + privkey.sign(b"falafel", padding, hashes.SHA256()) + self.assertEqual( + str(e.exception), + "digest algorithm mismatch, scheme has sha256 but key requires sha", + ) + + def test_scheme_digest_mismatch_oaep(self): + rsa_oaep = TPM2B_PUBLIC(rsa_template) + rsa_oaep.publicArea.parameters.rsaDetail.scheme.scheme = TPM2_ALG.OAEP + rsa_oaep.publicArea.parameters.rsaDetail.scheme.details.anySig.hashAlg = ( + TPM2_ALG.SHA256 + ) + rsa_oaep.publicArea.objectAttributes ^= TPMA_OBJECT.SIGN_ENCRYPT + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_oaep + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + + padding = OAEP( + mgf=MGF1(algorithm=hashes.SHA512()), algorithm=hashes.SHA384(), label=b"" + ) + + with self.assertRaises(ValueError) as e: + privkey.decrypt(b"falafel", padding) + self.assertEqual( + str(e.exception), + "digest algorithm mismatch, scheme has sha384 but key requires sha256", + ) + + def test_cert_builder_rsa(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_template + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + pubkey = privkey.public_key() + + builder = x509.CertificateBuilder() + builder = builder.subject_name( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "falafel"),]) + ) + builder = builder.issuer_name( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "falafel"),]) + ) + builder = builder.serial_number(x509.random_serial_number()) + one_day = datetime.timedelta(1, 0, 0) + builder = builder.not_valid_before(datetime.datetime.today() - one_day) + builder = builder.not_valid_after(datetime.datetime.today() + one_day) + builder = builder.public_key(pubkey) + + halg = privkey.get_digest_algorithm() + cert = builder.sign(privkey, algorithm=halg()) + cert.verify_directly_issued_by(cert) + + def test_csr_builder_rsa(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=rsa_template + ) + privkey = tpm_rsa_private_key(self.ectx, handle) + + builder = x509.CertificateSigningRequestBuilder() + builder = builder.subject_name( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "falafel"),]) + ) + halg = privkey.get_digest_algorithm() + csr = builder.sign(privkey, algorithm=halg()) + self.assertEqual(csr.is_signature_valid, True) + + def test_cert_builder_ecc(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_template + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + pubkey = privkey.public_key() + + builder = x509.CertificateBuilder() + builder = builder.subject_name( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "falafel"),]) + ) + builder = builder.issuer_name( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "falafel"),]) + ) + builder = builder.serial_number(x509.random_serial_number()) + one_day = datetime.timedelta(1, 0, 0) + builder = builder.not_valid_before(datetime.datetime.today() - one_day) + builder = builder.not_valid_after(datetime.datetime.today() + one_day) + builder = builder.public_key(pubkey) + + halg = privkey.get_digest_algorithm() + cert = builder.sign(privkey, algorithm=halg()) + cert.verify_directly_issued_by(cert) + + def test_csr_builder_ecc(self): + handle, _, _, _, _ = self.ectx.create_primary( + in_sensitive=None, in_public=ecc_template + ) + privkey = tpm_ecc_private_key(self.ectx, handle) + + builder = x509.CertificateSigningRequestBuilder() + builder = builder.subject_name( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "falafel"),]) + ) + halg = privkey.get_digest_algorithm() + csr = builder.sign(privkey, algorithm=halg()) + self.assertEqual(csr.is_signature_valid, True)