Skip to content

Commit

Permalink
OpenSSL key loading implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
public committed May 3, 2014
1 parent 1c851f6 commit 0d070cf
Show file tree
Hide file tree
Showing 6 changed files with 429 additions and 1 deletion.
122 changes: 121 additions & 1 deletion cryptography/hazmat/backends/openssl/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
)
from cryptography.hazmat.backends.interfaces import (
CMACBackend, CipherBackend, DSABackend, HMACBackend, HashBackend,
PBKDF2HMACBackend, RSABackend
PBKDF2HMACBackend, RSABackend, TraditionalOpenSSLSerializationBackend
)
from cryptography.hazmat.bindings.openssl.binding import Binding
from cryptography.hazmat.primitives import hashes, interfaces
Expand All @@ -42,6 +42,7 @@
)


_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
_OpenSSLError = collections.namedtuple("_OpenSSLError",
["code", "lib", "func", "reason"])

Expand All @@ -53,6 +54,7 @@
@utils.register_interface(HMACBackend)
@utils.register_interface(PBKDF2HMACBackend)
@utils.register_interface(RSABackend)
@utils.register_interface(TraditionalOpenSSLSerializationBackend)
class Backend(object):
"""
OpenSSL API binding interfaces.
Expand Down Expand Up @@ -371,6 +373,32 @@ def _rsa_public_key_to_evp_pkey(self, public_key):

return evp_pkey

def _bytes_to_bio(self, data):
"""
Return a _MemoryBIO namedtuple of (BIO, char*).
The char* is the storage for the BIO and it must stay alive until the
BIO is finished with.
"""
data_char_p = backend._ffi.new("char[]", data)
bio = backend._lib.BIO_new_mem_buf(
data_char_p, len(data)
)
assert bio != self._ffi.NULL

return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_char_p)

def _evp_pkey_to_private_key(self, evp_pkey):
type = evp_pkey.type

if type == self._lib.EVP_PKEY_RSA:
rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
assert rsa_cdata != self._ffi.NULL
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
return self._rsa_cdata_to_private_key(rsa_cdata)
else:
raise UnsupportedAlgorithm("Unsupported key type.")

def _rsa_cdata_to_private_key(self, cdata):
return rsa.RSAPrivateKey(
p=self._bn_to_int(cdata.p),
Expand All @@ -383,6 +411,34 @@ def _rsa_cdata_to_private_key(self, cdata):
modulus=self._bn_to_int(cdata.n),
)

def _pem_password_cb(self, password):
"""
Generate a pem_password_cb that returns password
typedef int pem_password_cb(char *buf, int size,
int rwflag, void *userdata);
suitable for decrypting PKCS8 files and so on
"""

def pem_password_cb(buf, size, writing, userdata):
pem_password_cb.called += 1

if not password or len(password) >= size:
return 0
else:
pw_buf = self._ffi.buffer(buf, size)
pw_buf[:len(password)] = password
return len(password)

pem_password_cb.called = 0

return (
self._ffi.callback("int (char *, int, int, void *)",
pem_password_cb),
pem_password_cb
)

def _rsa_cdata_from_private_key(self, private_key):
# Does not GC the RSA cdata. You *must* make sure it's freed
# correctly yourself!
Expand Down Expand Up @@ -656,6 +712,70 @@ def cmac_algorithm_supported(self, algorithm):
def create_cmac_ctx(self, algorithm):
return _CMACContext(self, algorithm)

def load_traditional_openssl_pem_private_key(self, data, password):
mem_bio = self._bytes_to_bio(data)

password_callback, password_func = self._pem_password_cb(password)

evp_pkey = self._lib.PEM_read_bio_PrivateKey(
mem_bio.bio,
self._ffi.NULL,
password_callback,
self._ffi.NULL
)

if evp_pkey == self._ffi.NULL:
errors = self._consume_errors()
if not errors:
raise ValueError("Could not unserialize key data.")

if errors[0][1:] == (
self._lib.ERR_LIB_PEM,
self._lib.PEM_F_PEM_DO_HEADER,
self._lib.PEM_R_BAD_PASSWORD_READ
):
assert not password
raise TypeError(
"Password was not given but private key is encrypted.")

elif errors[0][1:] == (
self._lib.ERR_LIB_EVP,
self._lib.EVP_F_EVP_DECRYPTFINAL_EX,
self._lib.EVP_R_BAD_DECRYPT
):
raise ValueError(
"Bad decrypt. Incorrect password?"
)

elif errors[0][1:] == (
self._lib.ERR_LIB_PEM,
self._lib.PEM_F_PEM_GET_EVP_CIPHER_INFO,
self._lib.PEM_R_UNSUPPORTED_ENCRYPTION
):
raise UnsupportedAlgorithm(
"PEM data is encrypted with an unsupported cipher")

else:
assert errors[0][1] in (
self._lib.ERR_LIB_EVP,
self._lib.ERR_LIB_PEM,
self._lib.ERR_LIB_ASN1,
)
raise ValueError("Could not unserialize key data.")

evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)

if password is not None and password_func.called == 0:
raise TypeError(
"Password was given but private key is not encrypted.")

assert (
(password is not None and password_func.called == 1) or
password is None
)

return self._evp_pkey_to_private_key(evp_pkey)


class GetCipherByName(object):
def __init__(self, fmt):
Expand Down
20 changes: 20 additions & 0 deletions cryptography/hazmat/primitives/serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import, division, print_function


def load_pem_traditional_openssl_private_key(data, password, backend):
return backend.load_traditional_openssl_pem_private_key(
data, password
)
1 change: 1 addition & 0 deletions docs/hazmat/primitives/asymmetric/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ Asymmetric algorithms
dsa
rsa
padding
serialization
48 changes: 48 additions & 0 deletions docs/hazmat/primitives/asymmetric/serialization.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
.. hazmat::

Key Serialization
=================

.. currentmodule:: cryptography.hazmat.primitives.serialization

There are several common schemes for serializing asymmetric private and public
keys to bytes. They generally support encryption of private keys and additional
key metadata.


Traditional OpenSSL Format
~~~~~~~~~~~~~~~~~~~~~~~~~~

The "traditional" PKCS #1 based serialization format used by OpenSSL.
It supports password based symmetric key encryption. Commonly found in
OpenSSL based TLS applications. It is usually found in PEM format with a
header that mentions the type of the serialized key. e.g.
``-----BEGIN RSA PRIVATE KEY-----``.

.. function:: load_pem_traditional_openssl_private_key(data, password, backend)

.. versionadded:: 0.4

Deserialize a private key from PEM encoded data to one of the supported
asymmetric private key types.

:param bytes data: The PEM encoded key data.

:param bytes password: The password to use to decrypt the data. Should
be ``None`` if the private key is not encrypted.
:param backend: A
:class:`~cryptography.hazmat.backends.interfaces.TraditionalOpenSSLSerializationBackend`
provider.

:returns: A new instance of a private key.

:raises ValueError: If the PEM data could not be decrypted or if its
structure could not be decoded successfully.

:raises TypeError: If a ``password`` was given and the private key was
not encrypted. Or if the key was encrypted but no
password was supplied.

:raises UnsupportedAlgorithm: If the serialized key is of a type that
is not supported by the backend or if the key is encrypted with a
symmetric cipher that is not supported by the backend.
13 changes: 13 additions & 0 deletions tests/hazmat/backends/test_openssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from __future__ import absolute_import, division, print_function

import pretend

import pytest

from cryptography import utils
Expand Down Expand Up @@ -358,3 +360,14 @@ def __init__(self):

with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
backend.create_cmac_ctx(FakeAlgorithm())


class TestOpenSSLSerialisationWithOpenSSL(object):
def test_password_too_long(self):
ffi_cb, cb = backend._pem_password_cb(b"aa")
assert cb(None, 1, False, None) == 0

def test_unsupported_evp_pkey_type(self):
key = pretend.stub(type="unsupported")
with raises_unsupported_algorithm(None):
backend._evp_pkey_to_private_key(key)
Loading

0 comments on commit 0d070cf

Please sign in to comment.