From 0d070cfa49210cb9f01a0d101db313c2fac0f212 Mon Sep 17 00:00:00 2001 From: Alex Stapleton Date: Wed, 23 Apr 2014 21:51:47 +0100 Subject: [PATCH] OpenSSL key loading implementation --- .../hazmat/backends/openssl/backend.py | 122 +++++++++- .../hazmat/primitives/serialization.py | 20 ++ docs/hazmat/primitives/asymmetric/index.rst | 1 + .../primitives/asymmetric/serialization.rst | 48 ++++ tests/hazmat/backends/test_openssl.py | 13 + tests/hazmat/primitives/test_serialization.py | 226 ++++++++++++++++++ 6 files changed, 429 insertions(+), 1 deletion(-) create mode 100644 cryptography/hazmat/primitives/serialization.py create mode 100644 docs/hazmat/primitives/asymmetric/serialization.rst create mode 100644 tests/hazmat/primitives/test_serialization.py diff --git a/cryptography/hazmat/backends/openssl/backend.py b/cryptography/hazmat/backends/openssl/backend.py index e00be92f934c..aa3a93507b4e 100644 --- a/cryptography/hazmat/backends/openssl/backend.py +++ b/cryptography/hazmat/backends/openssl/backend.py @@ -26,7 +26,7 @@ ) from cryptography.hazmat.backends.interfaces import ( CMACBackend, CipherBackend, DSABackend, HMACBackend, HashBackend, - PBKDF2HMACBackend, RSABackend + PBKDF2HMACBackend, RSABackend, TraditionalOpenSSLSerializationBackend ) from cryptography.hazmat.bindings.openssl.binding import Binding from cryptography.hazmat.primitives import hashes, interfaces @@ -42,6 +42,7 @@ ) +_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) _OpenSSLError = collections.namedtuple("_OpenSSLError", ["code", "lib", "func", "reason"]) @@ -53,6 +54,7 @@ @utils.register_interface(HMACBackend) @utils.register_interface(PBKDF2HMACBackend) @utils.register_interface(RSABackend) +@utils.register_interface(TraditionalOpenSSLSerializationBackend) class Backend(object): """ OpenSSL API binding interfaces. @@ -371,6 +373,32 @@ def _rsa_public_key_to_evp_pkey(self, public_key): return evp_pkey + def _bytes_to_bio(self, data): + """ + Return a _MemoryBIO namedtuple of (BIO, char*). + + The char* is the storage for the BIO and it must stay alive until the + BIO is finished with. + """ + data_char_p = backend._ffi.new("char[]", data) + bio = backend._lib.BIO_new_mem_buf( + data_char_p, len(data) + ) + assert bio != self._ffi.NULL + + return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_char_p) + + def _evp_pkey_to_private_key(self, evp_pkey): + type = evp_pkey.type + + if type == self._lib.EVP_PKEY_RSA: + rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) + assert rsa_cdata != self._ffi.NULL + rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) + return self._rsa_cdata_to_private_key(rsa_cdata) + else: + raise UnsupportedAlgorithm("Unsupported key type.") + def _rsa_cdata_to_private_key(self, cdata): return rsa.RSAPrivateKey( p=self._bn_to_int(cdata.p), @@ -383,6 +411,34 @@ def _rsa_cdata_to_private_key(self, cdata): modulus=self._bn_to_int(cdata.n), ) + def _pem_password_cb(self, password): + """ + Generate a pem_password_cb that returns password + + typedef int pem_password_cb(char *buf, int size, + int rwflag, void *userdata); + + suitable for decrypting PKCS8 files and so on + """ + + def pem_password_cb(buf, size, writing, userdata): + pem_password_cb.called += 1 + + if not password or len(password) >= size: + return 0 + else: + pw_buf = self._ffi.buffer(buf, size) + pw_buf[:len(password)] = password + return len(password) + + pem_password_cb.called = 0 + + return ( + self._ffi.callback("int (char *, int, int, void *)", + pem_password_cb), + pem_password_cb + ) + def _rsa_cdata_from_private_key(self, private_key): # Does not GC the RSA cdata. You *must* make sure it's freed # correctly yourself! @@ -656,6 +712,70 @@ def cmac_algorithm_supported(self, algorithm): def create_cmac_ctx(self, algorithm): return _CMACContext(self, algorithm) + def load_traditional_openssl_pem_private_key(self, data, password): + mem_bio = self._bytes_to_bio(data) + + password_callback, password_func = self._pem_password_cb(password) + + evp_pkey = self._lib.PEM_read_bio_PrivateKey( + mem_bio.bio, + self._ffi.NULL, + password_callback, + self._ffi.NULL + ) + + if evp_pkey == self._ffi.NULL: + errors = self._consume_errors() + if not errors: + raise ValueError("Could not unserialize key data.") + + if errors[0][1:] == ( + self._lib.ERR_LIB_PEM, + self._lib.PEM_F_PEM_DO_HEADER, + self._lib.PEM_R_BAD_PASSWORD_READ + ): + assert not password + raise TypeError( + "Password was not given but private key is encrypted.") + + elif errors[0][1:] == ( + self._lib.ERR_LIB_EVP, + self._lib.EVP_F_EVP_DECRYPTFINAL_EX, + self._lib.EVP_R_BAD_DECRYPT + ): + raise ValueError( + "Bad decrypt. Incorrect password?" + ) + + elif errors[0][1:] == ( + self._lib.ERR_LIB_PEM, + self._lib.PEM_F_PEM_GET_EVP_CIPHER_INFO, + self._lib.PEM_R_UNSUPPORTED_ENCRYPTION + ): + raise UnsupportedAlgorithm( + "PEM data is encrypted with an unsupported cipher") + + else: + assert errors[0][1] in ( + self._lib.ERR_LIB_EVP, + self._lib.ERR_LIB_PEM, + self._lib.ERR_LIB_ASN1, + ) + raise ValueError("Could not unserialize key data.") + + evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) + + if password is not None and password_func.called == 0: + raise TypeError( + "Password was given but private key is not encrypted.") + + assert ( + (password is not None and password_func.called == 1) or + password is None + ) + + return self._evp_pkey_to_private_key(evp_pkey) + class GetCipherByName(object): def __init__(self, fmt): diff --git a/cryptography/hazmat/primitives/serialization.py b/cryptography/hazmat/primitives/serialization.py new file mode 100644 index 000000000000..389375085dda --- /dev/null +++ b/cryptography/hazmat/primitives/serialization.py @@ -0,0 +1,20 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + + +def load_pem_traditional_openssl_private_key(data, password, backend): + return backend.load_traditional_openssl_pem_private_key( + data, password + ) diff --git a/docs/hazmat/primitives/asymmetric/index.rst b/docs/hazmat/primitives/asymmetric/index.rst index ca048d118ca6..047f9cb92d01 100644 --- a/docs/hazmat/primitives/asymmetric/index.rst +++ b/docs/hazmat/primitives/asymmetric/index.rst @@ -9,3 +9,4 @@ Asymmetric algorithms dsa rsa padding + serialization diff --git a/docs/hazmat/primitives/asymmetric/serialization.rst b/docs/hazmat/primitives/asymmetric/serialization.rst new file mode 100644 index 000000000000..4d079b5a723a --- /dev/null +++ b/docs/hazmat/primitives/asymmetric/serialization.rst @@ -0,0 +1,48 @@ +.. hazmat:: + +Key Serialization +================= + +.. currentmodule:: cryptography.hazmat.primitives.serialization + +There are several common schemes for serializing asymmetric private and public +keys to bytes. They generally support encryption of private keys and additional +key metadata. + + +Traditional OpenSSL Format +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The "traditional" PKCS #1 based serialization format used by OpenSSL. +It supports password based symmetric key encryption. Commonly found in +OpenSSL based TLS applications. It is usually found in PEM format with a +header that mentions the type of the serialized key. e.g. +``-----BEGIN RSA PRIVATE KEY-----``. + +.. function:: load_pem_traditional_openssl_private_key(data, password, backend) + + .. versionadded:: 0.4 + + Deserialize a private key from PEM encoded data to one of the supported + asymmetric private key types. + + :param bytes data: The PEM encoded key data. + + :param bytes password: The password to use to decrypt the data. Should + be ``None`` if the private key is not encrypted. + :param backend: A + :class:`~cryptography.hazmat.backends.interfaces.TraditionalOpenSSLSerializationBackend` + provider. + + :returns: A new instance of a private key. + + :raises ValueError: If the PEM data could not be decrypted or if its + structure could not be decoded successfully. + + :raises TypeError: If a ``password`` was given and the private key was + not encrypted. Or if the key was encrypted but no + password was supplied. + + :raises UnsupportedAlgorithm: If the serialized key is of a type that + is not supported by the backend or if the key is encrypted with a + symmetric cipher that is not supported by the backend. diff --git a/tests/hazmat/backends/test_openssl.py b/tests/hazmat/backends/test_openssl.py index bba7d758b55f..37347bc84cf9 100644 --- a/tests/hazmat/backends/test_openssl.py +++ b/tests/hazmat/backends/test_openssl.py @@ -13,6 +13,8 @@ from __future__ import absolute_import, division, print_function +import pretend + import pytest from cryptography import utils @@ -358,3 +360,14 @@ def __init__(self): with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER): backend.create_cmac_ctx(FakeAlgorithm()) + + +class TestOpenSSLSerialisationWithOpenSSL(object): + def test_password_too_long(self): + ffi_cb, cb = backend._pem_password_cb(b"aa") + assert cb(None, 1, False, None) == 0 + + def test_unsupported_evp_pkey_type(self): + key = pretend.stub(type="unsupported") + with raises_unsupported_algorithm(None): + backend._evp_pkey_to_private_key(key) diff --git a/tests/hazmat/primitives/test_serialization.py b/tests/hazmat/primitives/test_serialization.py new file mode 100644 index 000000000000..7e6987c484c1 --- /dev/null +++ b/tests/hazmat/primitives/test_serialization.py @@ -0,0 +1,226 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import absolute_import, division, print_function + +import os +import textwrap + +import pytest + +from cryptography.hazmat.primitives.serialization import ( + load_pem_traditional_openssl_private_key +) + +from .utils import _check_rsa_private_key, load_vectors_from_file +from ...utils import raises_unsupported_algorithm + + +@pytest.mark.traditional_openssl_serialization +class TestTraditionalOpenSSLSerialisation(object): + @pytest.mark.parametrize( + ("key_file", "password"), + [ + ("key1.pem", b"123456"), + ("key2.pem", b"a123456"), + ("testrsa.pem", None), + ("testrsa-encrypted.pem", b"password"), + ] + ) + def test_load_pem_rsa_private_key(self, key_file, password, backend): + key = load_vectors_from_file( + os.path.join( + "asymmetric", "Traditional_OpenSSL_Serialization", key_file), + lambda pemfile: load_pem_traditional_openssl_private_key( + pemfile.read().encode(), password, backend + ) + ) + + assert key + _check_rsa_private_key(key) + + def test_key1_pem_encrypted_values(self, backend): + pkey = load_vectors_from_file( + os.path.join( + "asymmetric", "Traditional_OpenSSL_Serialization", "key1.pem"), + lambda pemfile: load_pem_traditional_openssl_private_key( + pemfile.read().encode(), b"123456", backend + ) + ) + assert pkey + + assert pkey.p == int( + "fb7d316fc51531b36d93adaefaf52db6ad5beb793d37c4cf9dfc1ddd17cfbafb", + 16 + ) + assert pkey.q == int( + "df98264e646de9a0fbeab094e31caad5bc7adceaaae3c800ca0275dd4bb307f5", + 16 + ) + assert pkey.private_exponent == int( + "db4848c36f478dd5d38f35ae519643b6b810d404bcb76c00e44015e56ca1cab0" + "7bb7ae91f6b4b43fcfc82a47d7ed55b8c575152116994c2ce5325ec24313b911", + 16 + ) + assert pkey.dmp1 == int( + "ce997f967192c2bcc3853186f1559fd355c190c58ddc15cbf5de9b6df954c727", + 16 + ) + assert pkey.dmq1 == int( + "b018a57ab20ffaa3862435445d863369b852cf70a67c55058213e3fe10e3848d", + 16 + ) + assert pkey.iqmp == int( + "6a8d830616924f5cf2d1bc1973f97fde6b63e052222ac7be06aa2532d10bac76", + 16 + ) + assert pkey.public_exponent == 65537 + assert pkey.modulus == int( + "dba786074f2f0350ce1d99f5aed5b520cfe0deb5429ec8f2a88563763f566e77" + "9814b7c310e5326edae31198eed439b845dd2db99eaa60f5c16a43f4be6bcf37", + 16 + ) + + def test_unused_password(self, backend): + key_file = os.path.join( + "asymmetric", "Traditional_OpenSSL_Serialization", "testrsa.pem") + password = b"this password will not be used" + + with pytest.raises(TypeError): + load_vectors_from_file( + key_file, + lambda pemfile: load_pem_traditional_openssl_private_key( + pemfile.read().encode(), password, backend + ) + ) + + def test_wrong_password(self, backend): + key_file = os.path.join( + "asymmetric", + "Traditional_OpenSSL_Serialization", + "testrsa-encrypted.pem" + ) + password = b"this password is wrong" + + with pytest.raises(ValueError): + load_vectors_from_file( + key_file, + lambda pemfile: load_pem_traditional_openssl_private_key( + pemfile.read().encode(), password, backend + ) + ) + + @pytest.mark.parametrize("password", [None, b""]) + def test_missing_password(self, backend, password): + key_file = os.path.join( + "asymmetric", + "Traditional_OpenSSL_Serialization", + "testrsa-encrypted.pem" + ) + + with pytest.raises(TypeError): + load_vectors_from_file( + key_file, + lambda pemfile: load_pem_traditional_openssl_private_key( + pemfile.read().encode(), password, backend + ) + ) + + def test_wrong_format(self, backend): + key_data = b"---- NOT A KEY ----\n" + + with pytest.raises(ValueError): + load_pem_traditional_openssl_private_key( + key_data, None, backend + ) + + with pytest.raises(ValueError): + load_pem_traditional_openssl_private_key( + key_data, b"this password will not be used", backend + ) + + def test_corrupt_format(self, backend): + # privkey.pem with a bunch of data missing. + key_data = textwrap.dedent("""\ + -----BEGIN RSA PRIVATE KEY----- + MIIBPAIBAAJBAKrbeqkuRk8VcRmWFmtP+LviMB3+6dizWW3DwaffznyHGAFwUJ/I + Tv0XtbsCyl3QoyKGhrOAy3RvPK5M38iuXT0CAwEAAQJAZ3cnzaHXM/bxGaR5CR1R + rD1qFBAVfoQFiOH9uPJgMaoAuoQEisPHVcZDKcOv4wEg6/TInAIXBnEigtqvRzuy + mvcpHZwQJdmdHHkGKAs37Dfxi67HbkUCIQCeZGliHXFa071Fp06ZeWlR2ADonTZz + rJBhdTe0v5pCeQIhAIZfkiGgGBX4cIuuckzEm43g9WMUjxP/0GlK39vIyihxAiEA + mymehFRT0MvqW5xAKAx7Pgkt8HVKwVhc2LwGKHE0DZM= + -----END RSA PRIVATE KEY----- + """).encode() + + with pytest.raises(ValueError): + load_pem_traditional_openssl_private_key( + key_data, None, backend + ) + + with pytest.raises(ValueError): + load_pem_traditional_openssl_private_key( + key_data, b"this password will not be used", backend + ) + + def test_encrypted_corrupt_format(self, backend): + # privkey.pem with a single bit flipped + key_data = textwrap.dedent("""\ + -----BEGIN RSA PRIVATE KEY----- + Proc-Type: <,ENCRYPTED + DEK-Info: AES-128-CBC,5E22A2BD85A653FB7A3ED20DE84F54CD + + hAqtb5ZkTMGcs4BBDQ1SKZzdQThWRDzEDxM3qBfjvYa35KxZ54aic013mW/lwj2I + v5bbpOjrHYHNAiZYZ7RNb+ztbF6F/g5PA5g7mFwEq+LFBY0InIplYBSv9QtE+lot + Dy4AlZa/+NzJwgdKDb+JVfk5SddyD4ywnyeORnMPy4xXKvjXwmW+iLibZVKsjIgw + H8hSxcD+FhWyJm9h9uLtmpuqhQo0jTUYpnTezZx2xeVPB53Ev7YCxR9Nsgj5GsVf + 9Z/hqLB7IFgM3pa0z3PQeUIZF/cEf72fISWIOBwwkzVrPUkXWfbuWeJXQXSs3amE + 5A295jD9BQp9CY0nNFSsy+qiXWToq2xT3y5zVNEStmN0SCGNaIlUnJzL9IHW+oMI + kPmXZMnAYBWeeCF1gf3J3aE5lZInegHNfEI0+J0LazC2aNU5Dg/BNqrmRqKWEIo/ + -----END RSA PRIVATE KEY----- + """).encode() + + password = b"this password is wrong" + + with pytest.raises(ValueError): + load_pem_traditional_openssl_private_key( + key_data, None, backend + ) + + with pytest.raises(ValueError): + load_pem_traditional_openssl_private_key( + key_data, password, backend + ) + + def test_unsupported_key_encryption(self, backend): + key_data = textwrap.dedent("""\ + -----BEGIN RSA PRIVATE KEY----- + Proc-Type: 4,ENCRYPTED + DEK-Info: FAKE-123,5E22A2BD85A653FB7A3ED20DE84F54CD + + hAqtb5ZkTMGcs4BBDQ1SKZzdQThWRDzEDxM3qBfjvYa35KxZ54aic013mW/lwj2I + v5bbpOjrHYHNAiZYZ7RNb+ztbF6F/g5PA5g7mFwEq+LFBY0InIplYBSv9QtE+lot + Dy4AlZa/+NzJwgdKDb+JVfk5SddyD4ywnyeORnMPy4xXKvjXwmW+iLibZVKsjIgw + H8hSxcD+FhWyJm9h9uLtmpuqhQo0jTUYpnTezZx2xeVPB53Ev7YCxR9Nsgj5GsVf + 9Z/hqLB7IFgM3pa0z3PQeUIZF/cEf72fISWIOBwwkzVrPUkXWfbuWeJXQXSs3amE + 5A295jD9BQp9CY0nNFSsy+qiXWToq2xT3y5zVNEStmN0SCGNaIlUnJzL9IHW+oMI + kPmXZMnAYBWeeCF1gf3J3aE5lZInegHNfEI0+J0LazC2aNU5Dg/BNqrmRqKWEIo/ + -----END RSA PRIVATE KEY----- + """).encode() + + password = b"password" + + with raises_unsupported_algorithm(None): + load_pem_traditional_openssl_private_key( + key_data, password, backend + )