From 12a3a336fb146c19402c3aa8e3599ca65b602941 Mon Sep 17 00:00:00 2001 From: Mathias Ertl Date: Sun, 5 Nov 2023 20:18:48 +0100 Subject: [PATCH] add deprecation path for old openssl-style subject strings --- ca/django_ca/management/actions.py | 15 +-- ca/django_ca/management/base.py | 38 ++++++- ca/django_ca/management/commands/init_ca.py | 24 +++-- .../management/commands/resign_cert.py | 16 +-- ca/django_ca/management/commands/sign_cert.py | 17 ++- ca/django_ca/profiles.py | 16 ++- ca/django_ca/tests/commands/test_init_ca.py | 10 +- .../tests/commands/test_resign_cert.py | 5 +- ca/django_ca/tests/commands/test_sign_cert.py | 101 ++++++++++++++---- ca/django_ca/tests/test_management_actions.py | 44 ++++---- ca/django_ca/tests/test_profiles.py | 97 ++++++++--------- ca/django_ca/tests/test_utils.py | 31 +++--- ca/django_ca/tests/test_verification.py | 3 +- ca/django_ca/typehints.py | 7 +- ca/django_ca/utils.py | 48 ++++----- 15 files changed, 298 insertions(+), 174 deletions(-) diff --git a/ca/django_ca/management/actions.py b/ca/django_ca/management/actions.py index 3d74d9f53..f10c36bb9 100644 --- a/ca/django_ca/management/actions.py +++ b/ca/django_ca/management/actions.py @@ -31,7 +31,7 @@ from django_ca.constants import EXTENSION_DEFAULT_CRITICAL, KEY_USAGE_NAMES, ReasonFlags from django_ca.models import Certificate, CertificateAuthority from django_ca.typehints import AllowedHashTypes, AlternativeNameExtensionType -from django_ca.utils import is_power2, parse_encoding, parse_general_name, x509_name +from django_ca.utils import is_power2, parse_encoding, parse_general_name ActionType = typing.TypeVar("ActionType") # pylint: disable=invalid-name ParseType = typing.TypeVar("ParseType") # pylint: disable=invalid-name @@ -455,7 +455,7 @@ def parse_value(self, value: str) -> ReasonFlags: return ReasonFlags[value] -class NameAction(SingleValueAction[str, x509.Name]): +class NameAction(SingleValueAction[str, str]): """Action to parse a string into a :py:class:`cg:~cryptography.x509.Name`. Note that this action does *not* take care of sorting the subject in any way. @@ -463,14 +463,15 @@ class NameAction(SingleValueAction[str, x509.Name]): >>> parser = argparse.ArgumentParser() >>> parser.add_argument('--name', action=NameAction) # doctest: +ELLIPSIS NameAction(...) - >>> parser.parse_args(["--name", "/CN=example.com"]) - Namespace(name=) + >>> parser.parse_args(["--name", "CN=example.com"]) + Namespace(name=CN=example.com) """ - def parse_value(self, value: str) -> x509.Name: + def parse_value(self, value: str) -> str: + # TODO: In django-ca 2.0, parse subject here directly using parse_name_rfc4514(). try: - return x509_name(value) - except ValueError as e: + return value + except ValueError as e: # pragma: no cover # pragma: only django-ca<2.0 raise argparse.ArgumentError(self, str(e)) diff --git a/ca/django_ca/management/base.py b/ca/django_ca/management/base.py index 10e62afc8..f6cd64824 100644 --- a/ca/django_ca/management/base.py +++ b/ca/django_ca/management/base.py @@ -39,8 +39,14 @@ from django_ca.management import actions, mixins from django_ca.models import CertificateAuthority, X509CertMixin from django_ca.profiles import Profile -from django_ca.typehints import ActionsContainer, AllowedHashTypes, ArgumentGroup, ExtensionMapping -from django_ca.utils import add_colons, name_for_display +from django_ca.typehints import ( + ActionsContainer, + AllowedHashTypes, + ArgumentGroup, + ExtensionMapping, + SubjectFormats, +) +from django_ca.utils import add_colons, name_for_display, parse_name_rfc4514, x509_name class BinaryOutputWrapper(OutputWrapper): @@ -247,6 +253,31 @@ def _add_extension(self, extensions: ExtensionMapping, value: x509.ExtensionType """Add an extension to the passed extension dictionary.""" extensions[value.oid] = x509.Extension(oid=value.oid, critical=critical, value=value) + def add_subject_format_option(self, parser: ActionsContainer) -> None: + """Add the --subject-format option.""" + parser.add_argument( + "--subject-format", + choices=("openssl", "rfc4514"), + default="openssl", + help='Format for parsing the subject. Use "openssl" (the default before django-ca 2.0) to pass ' + 'slash-separated subjects (e.g. "/C=AT/O=Org/CN=example.com") and "rfc4514" to pass RFC 4514 ' + 'conforming strings (e.g. "C=AT,O=Org,CN=example.com"). The default is %(default)s, but will ' + "switch to rfc4514 in django-ca 2.0. Support for openssl-style strings will be removed in " + "django-ca 2.2.", + ) + + def parse_x509_name(self, value: str, name_format: SubjectFormats) -> x509.Name: + """Parse a `name` in the given `format`.""" + if name_format == "openssl": + return x509_name(value) + if name_format == "rfc4514": + try: + return parse_name_rfc4514(value) + except ValueError as ex: + raise CommandError(str(ex)) from ex + # COVERAGE NOTE: Already covered by argparse + raise ValueError(f"{name_format}: Unknown subject format.") # pragma: no cover + def add_authority_information_access_group( self, parser: CommandParser, @@ -463,6 +494,9 @@ def add_subject_group(self, parser: CommandParser) -> None: """Add argument for a subject.""" group = parser.add_argument_group("Certificate subject", self.subject_help) + # Add the --subject-format option + self.add_subject_format_option(group) + # NOTE: Don't set the default value here because it would mask the user not setting anything at all. self.add_subject( group, diff --git a/ca/django_ca/management/commands/init_ca.py b/ca/django_ca/management/commands/init_ca.py index 0873d9d62..febdaf310 100644 --- a/ca/django_ca/management/commands/init_ca.py +++ b/ca/django_ca/management/commands/init_ca.py @@ -34,7 +34,13 @@ from django_ca.management.mixins import CertificateAuthorityDetailMixin from django_ca.models import CertificateAuthority from django_ca.tasks import cache_crl, generate_ocsp_key, run_task -from django_ca.typehints import AllowedHashTypes, ArgumentGroup, ExtensionMapping, ParsableKeyType +from django_ca.typehints import ( + AllowedHashTypes, + ArgumentGroup, + ExtensionMapping, + ParsableKeyType, + SubjectFormats, +) from django_ca.utils import ( format_general_name, parse_general_name, @@ -148,6 +154,7 @@ def add_arguments(self, parser: CommandParser) -> None: default=timedelta(365 * 10), help="CA certificate expires in DAYS days (default: %(default)s).", ) + self.add_subject_format_option(general_group) self.add_algorithm( general_group, default_text=f"{default} for RSA/EC keys, {dsa_default} for DSA keys" ) @@ -247,7 +254,7 @@ def add_arguments(self, parser: CommandParser) -> None: def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements self, name: str, - subject: x509.Name, + subject: str, parent: Optional[CertificateAuthority], expires: timedelta, key_size: Optional[int], @@ -299,6 +306,8 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branc # OCSP responder configuration ocsp_responder_key_validity: Optional[int], ocsp_response_validity: Optional[int], + # subject_format will be removed in django-ca 2.2 + subject_format: SubjectFormats, **options: Any, ) -> None: if not os.path.exists(ca_settings.CA_DIR): # pragma: no cover @@ -356,16 +365,19 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branc responder_value = format_general_name(ca_issuer.access_location) raise CommandError(f"{responder_value}: CA issuer cannot be added to root CAs.") + # Parse the subject + parsed_subject = self.parse_x509_name(subject, subject_format) + # We require a valid common name - common_name = next((attr.value for attr in subject if attr.oid == NameOID.COMMON_NAME), False) + common_name = next((attr.value for attr in parsed_subject if attr.oid == NameOID.COMMON_NAME), False) if not common_name: - raise CommandError("Subject must contain a common name (/CN=...).") + raise CommandError("Subject must contain a common name (CN=...).") # See if we can work with the private key if parent: self.test_private_key(parent, parent_password) - subject = sort_name(subject) + parsed_subject = sort_name(parsed_subject) extensions: ExtensionMapping = { ExtensionOID.KEY_USAGE: x509.Extension( oid=ExtensionOID.KEY_USAGE, critical=key_usage_critical, value=key_usage @@ -464,7 +476,7 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branc try: ca = CertificateAuthority.objects.init( name=name, - subject=subject, + subject=parsed_subject, expires=expires_datetime, algorithm=algorithm, parent=parent, diff --git a/ca/django_ca/management/commands/resign_cert.py b/ca/django_ca/management/commands/resign_cert.py index 5a4d9ff7c..398286eeb 100644 --- a/ca/django_ca/management/commands/resign_cert.py +++ b/ca/django_ca/management/commands/resign_cert.py @@ -29,7 +29,7 @@ from django_ca.management.base import BaseSignCertCommand from django_ca.models import Certificate, CertificateAuthority, Watcher from django_ca.profiles import Profile, profiles -from django_ca.typehints import AllowedHashTypes, ExtensionMapping +from django_ca.typehints import AllowedHashTypes, ExtensionMapping, SubjectFormats class Command(BaseSignCertCommand): # pylint: disable=missing-class-docstring @@ -61,11 +61,11 @@ def get_profile(self, profile: Optional[str], cert: Certificate) -> Profile: f'Profile "{cert.profile}" for original certificate is no longer defined, please set one via the command line.' # NOQA: E501 ) - def handle( # pylint: disable=too-many-arguments,too-many-locals + def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches self, cert: Certificate, ca: Optional[CertificateAuthority], - subject: Optional[x509.Name], + subject: Optional[str], expires: Optional[timedelta], watch: List[str], password: Optional[bytes], @@ -96,6 +96,8 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals # TLSFeature extension tls_feature: Optional[x509.TLSFeature], tls_feature_critical: bool, + # subject_format will be removed in django-ca 2.2 + subject_format: SubjectFormats, **options: Any, ) -> None: if ca is None: @@ -114,7 +116,9 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals watchers = list(cert.watchers.all()) if subject is None: - subject = cert.subject + parsed_subject = cert.subject + else: + parsed_subject = self.parse_x509_name(subject, subject_format) # Process any extensions given via the command-line extensions: ExtensionMapping = {} @@ -169,7 +173,7 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals # NOTE: This can only happen here in two edge cases: # * Pass a subject without common name AND a certificate does *not* have a subject alternative name. # * An imported certificate that has neither Common Name nor subject alternative name. - common_names = subject.get_attributes_for_oid(NameOID.COMMON_NAME) + common_names = parsed_subject.get_attributes_for_oid(NameOID.COMMON_NAME) if not common_names and ExtensionOID.SUBJECT_ALTERNATIVE_NAME not in extensions: raise CommandError( "Must give at least a Common Name in --subject or one or more " @@ -182,7 +186,7 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals csr=cert.csr.loaded, profile=profile_obj, expires=expires, - subject=subject, + subject=parsed_subject, algorithm=algorithm, extensions=extensions.values(), password=password, diff --git a/ca/django_ca/management/commands/sign_cert.py b/ca/django_ca/management/commands/sign_cert.py index c29038c2c..aef0e47ed 100644 --- a/ca/django_ca/management/commands/sign_cert.py +++ b/ca/django_ca/management/commands/sign_cert.py @@ -30,7 +30,7 @@ from django_ca.management.base import BaseSignCertCommand from django_ca.models import Certificate, CertificateAuthority, Watcher from django_ca.profiles import profiles -from django_ca.typehints import AllowedHashTypes, ExtensionMapping +from django_ca.typehints import AllowedHashTypes, ExtensionMapping, SubjectFormats class Command(BaseSignCertCommand): # pylint: disable=missing-class-docstring @@ -100,7 +100,7 @@ def add_arguments(self, parser: CommandParser) -> None: def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements self, ca: CertificateAuthority, - subject: Optional[x509.Name], + subject: Optional[str], expires: Optional[timedelta], watch: List[str], password: Optional[bytes], @@ -135,6 +135,8 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branc # TLSFeature extension tls_feature: Optional[x509.TLSFeature], tls_feature_critical: bool, + # subject_format will be removed in django-ca 2.2 + subject_format: SubjectFormats, **options: Any, ) -> None: # Validate parameters early so that we can return better feedback to the user. @@ -187,9 +189,14 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branc if tls_feature is not None: self._add_extension(extensions, tls_feature, tls_feature_critical) - cname = None + # Parse the subject + parsed_subject = None if subject is not None: - cname = subject.get_attributes_for_oid(NameOID.COMMON_NAME) + parsed_subject = self.parse_x509_name(subject, subject_format) + + cname = None + if parsed_subject is not None: + cname = parsed_subject.get_attributes_for_oid(NameOID.COMMON_NAME) if not cname and ExtensionOID.SUBJECT_ALTERNATIVE_NAME not in extensions: raise CommandError( "Must give at least a Common Name in --subject or one or more " @@ -223,7 +230,7 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branc expires=expires, extensions=extensions.values(), password=password, - subject=subject, + subject=parsed_subject, algorithm=algorithm, ) except Exception as ex: diff --git a/ca/django_ca/profiles.py b/ca/django_ca/profiles.py index 938239b94..f2995c8c9 100644 --- a/ca/django_ca/profiles.py +++ b/ca/django_ca/profiles.py @@ -17,7 +17,7 @@ import warnings from datetime import datetime, timedelta from threading import local -from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, Iterator, List, Optional, Union from cryptography import x509 from cryptography.x509.oid import AuthorityInformationAccessOID, ExtensionOID, NameOID @@ -101,12 +101,18 @@ def __init__( extensions = {} if subject is None: - self.subject: Optional[Union[typing.Literal[False], x509.Name]] = ca_settings.CA_DEFAULT_SUBJECT + self.subject: Optional[x509.Name] = ca_settings.CA_DEFAULT_SUBJECT elif subject is False: - self.subject = False + self.subject = None elif isinstance(subject, x509.Name): self.subject = subject else: + warnings.warn( + f"{subject}: Support for passing a value of type {subject.__class__} is deprecated and will " + "be removed in django-ca 1.28.0.", + RemovedInDjangoCA128Warning, + stacklevel=2, + ) self.subject = x509_name(subject) if algorithm is not None: @@ -291,7 +297,7 @@ def create_cert( # pylint: disable=too-many-arguments add_issuer_alternative_name=add_issuer_alternative_name, ) - if self.subject is not False and self.subject is not None: + if self.subject is not None: if subject is not None: subject = merge_x509_names(self.subject, subject) else: @@ -385,7 +391,7 @@ def serialize(self) -> SerializedProfile: extensions[EXTENSION_KEYS[key]] = serialize_extension(extension) serialized_name = None - if self.subject is not None and self.subject is not False: + if self.subject is not None: serialized_name = serialize_name(self.subject) data: SerializedProfile = { diff --git a/ca/django_ca/tests/commands/test_init_ca.py b/ca/django_ca/tests/commands/test_init_ca.py index 1e3724d7a..e9a0fcc1f 100644 --- a/ca/django_ca/tests/commands/test_init_ca.py +++ b/ca/django_ca/tests/commands/test_init_ca.py @@ -772,15 +772,15 @@ def test_empty_subject_fields(self) -> None: def test_no_cn(self) -> None: """Test creating a CA with no CommonName.""" name = "test_no_cn" - subject = "/ST=/L=/O=/OU=smth" - error = r"^Subject must contain a common name \(/CN=...\)\.$" + subject = "C=AT,ST=Vienna,L=Vienna,O=Org,OU=OrgUnit" + error = r"^Subject must contain a common name \(CN=\.\.\.\)\.$" with self.assertCreateCASignals(False, False), self.assertCommandError(error): - self.cmd("init_ca", name, subject) + self.cmd("init_ca", name, subject, subject_format="rfc4514") error = r"CommonName must not be an empty value" - subject = "/ST=/L=/O=/OU=smth/CN=" + subject = "C=AT,ST=Vienna,L=Vienna,O=Org,OU=OrgUnit,CN=" with self.assertCreateCASignals(False, False), self.assertCommandError(error): - self.cmd("init_ca", name, subject) + self.cmd("init_ca", name, subject, subject_format="rfc4514") @override_tmpcadir(CA_MIN_KEY_SIZE=1024) def test_parent(self) -> None: diff --git a/ca/django_ca/tests/commands/test_resign_cert.py b/ca/django_ca/tests/commands/test_resign_cert.py index 1fdfd5625..3251fcd03 100644 --- a/ca/django_ca/tests/commands/test_resign_cert.py +++ b/ca/django_ca/tests/commands/test_resign_cert.py @@ -540,8 +540,9 @@ def test_overwrite(self) -> None: "--tls-feature", "status_request_v2", "--tls-feature-critical", + "--subject-format=rfc4514", "--subject", - f"/CN={cname}", + f"CN={cname}", "--watch", watcher, "--subject-alternative-name", @@ -640,7 +641,7 @@ def test_no_cn(self) -> None: r"--subject-alternative-name/--name arguments\.$" ) with self.assertCreateCertSignals(False, False), self.assertCommandError(msg): - self.cmd("resign_cert", cert, subject=subject) + self.cmd("resign_cert", cert, subject=subject.rfc4514_string()) @override_tmpcadir() def test_error(self) -> None: diff --git a/ca/django_ca/tests/commands/test_sign_cert.py b/ca/django_ca/tests/commands/test_sign_cert.py index 23b6900d7..7bd93021d 100644 --- a/ca/django_ca/tests/commands/test_sign_cert.py +++ b/ca/django_ca/tests/commands/test_sign_cert.py @@ -71,7 +71,13 @@ def test_from_stdin(self) -> None: """Test reading CSR from stdin.""" stdin = self.csr_pem.encode() with self.assertCreateCertSignals() as (pre, post): - stdout, stderr = self.cmd("sign_cert", ca=self.ca, subject=self.subject, stdin=stdin) + stdout, stderr = self.cmd( + "sign_cert", + ca=self.ca, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", + stdin=stdin, + ) self.assertEqual(stderr, "") cert = Certificate.objects.get(cn=self.hostname) @@ -98,7 +104,14 @@ def test_from_stdin(self) -> None: def test_with_bundle(self) -> None: """Test outputting the whole certificate bundle.""" stdin = self.csr_pem.encode() - stdout, stderr = self.cmd("sign_cert", bundle=True, ca=self.ca, subject=self.subject, stdin=stdin) + stdout, stderr = self.cmd( + "sign_cert", + bundle=True, + ca=self.ca, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", + stdin=stdin, + ) cert = Certificate.objects.get() self.assertEqual(stdout, f"Please paste the CSR:\n{cert.bundle_as_pem}") self.assertEqual(stderr, "") @@ -113,7 +126,12 @@ def test_usable_cas(self) -> None: with self.assertCreateCertSignals() as (pre, post): stdout, stderr = self.cmd( - "sign_cert", ca=ca, subject=self.subject, password=password, stdin=stdin + "sign_cert", + ca=ca, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", + password=password, + stdin=stdin, ) self.assertEqual(stderr, "") @@ -147,7 +165,13 @@ def test_from_file(self) -> None: csr_stream.write(self.csr_pem) with self.assertCreateCertSignals() as (pre, post): - stdout, stderr = self.cmd("sign_cert", ca=self.ca, subject=self.subject, csr=csr_path) + stdout, stderr = self.cmd( + "sign_cert", + ca=self.ca, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", + csr=csr_path, + ) self.assertEqual(stderr, "") cert = Certificate.objects.get() @@ -177,7 +201,12 @@ def test_to_file(self) -> None: try: with self.assertCreateCertSignals() as (pre, post): stdout, stderr = self.cmd( - "sign_cert", ca=self.ca, subject=self.subject, out=out_path, stdin=stdin + "sign_cert", + ca=self.ca, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", + out=out_path, + stdin=stdin, ) cert = Certificate.objects.get() @@ -201,7 +230,14 @@ def test_to_file(self) -> None: def test_with_rsa_with_algorithm(self) -> None: """Test creating a CA with a custom algorithm.""" stdin = self.csr_pem.encode() - self.cmd("sign_cert", ca=self.ca, subject=self.subject, stdin=stdin, algorithm=hashes.SHA256()) + self.cmd( + "sign_cert", + ca=self.ca, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", + stdin=stdin, + algorithm=hashes.SHA256(), + ) cert = Certificate.objects.get() self.assertIsInstance(cert.algorithm, hashes.SHA256) @@ -209,11 +245,12 @@ def test_with_rsa_with_algorithm(self) -> None: def test_subject_sort(self) -> None: """Test that subject is sorted on the command line.""" cname = "subject-sort.example.com" - subject = f"/CN={cname}/C=AT" + subject = f"CN={cname},C=AT" stdin = self.csr_pem.encode() cmdline = [ "sign_cert", f"--subject={subject}", + "--subject-format=rfc4514", f"--ca={self.ca.serial}", ] @@ -244,10 +281,17 @@ def test_no_dns_cn(self) -> None: stdin = self.csr_pem.encode() cname = "foo bar" msg = rf"^{cname}: Could not parse CommonName as subjectAlternativeName\.$" - subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cname)]) + subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cname)]).rfc4514_string() with self.assertCommandError(msg), self.assertCreateCertSignals(False, False): - self.cmd("sign_cert", ca=self.ca, subject=subject, cn_in_san=True, stdin=stdin) + self.cmd( + "sign_cert", + ca=self.ca, + subject=subject, + subject_format="rfc4514", + cn_in_san=True, + stdin=stdin, + ) @override_tmpcadir() def test_cn_not_in_san(self) -> None: @@ -257,7 +301,8 @@ def test_cn_not_in_san(self) -> None: stdout, stderr = self.cmd( "sign_cert", ca=self.ca, - subject=self.subject, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", cn_in_san=False, subject_alternative_name=subject_alternative_name(dns("example.com")).value, stdin=stdin, @@ -284,7 +329,8 @@ def test_no_san(self) -> None: stdout, stderr = self.cmd( "sign_cert", ca=self.ca, - subject=self.subject, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", cn_in_san=False, stdin=stdin, ) @@ -351,7 +397,8 @@ def test_profile_subject(self) -> None: cn_in_san=False, subject_alternative_name=subject_alternative_name(dns(self.hostname)).value, stdin=stdin, - subject=subject, + subject=subject.rfc4514_string(), + subject_format="rfc4514", ) cert = Certificate.objects.get(cn="CommonName2") @@ -368,7 +415,8 @@ def test_extensions(self) -> None: stdin = self.csr_pem.encode() cmdline = [ "sign_cert", - f"--subject=/CN={self.hostname}", + f"--subject=CN={self.hostname}", + "--subject-format=rfc4514", f"--ca={self.ca.serial}", # Authority Information Access extension "--ocsp-responder=http://ocsp.example.com/1", @@ -478,7 +526,8 @@ def test_extensions_with_non_default_critical(self) -> None: stdin = self.csr_pem.encode() cmdline = [ "sign_cert", - f"--subject=/CN={self.hostname}", + f"--subject=CN={self.hostname}", + "--subject-format=rfc4514", f"--ca={self.ca.serial}", # Certificate Policies extension "--policy-identifier=1.2.3", @@ -567,7 +616,8 @@ def test_add_extensions_with_formatting(self) -> None: stdin = self.csr_pem.encode() cmdline = [ "sign_cert", - f"--subject=/CN={self.hostname}", + f"--subject=CN={self.hostname}", + "--subject-format=rfc4514", f"--ca={self.ca.serial}", "--ocsp-responder=https://example.com/ocsp/{OCSP_PATH}", "--ca-issuer=https://example.com/ca-issuer/{CA_ISSUER_PATH}", @@ -615,7 +665,8 @@ def test_multiple_sans(self) -> None: stdin = self.csr_pem.encode() cmdline = [ "sign_cert", - f"--subject=/CN={self.hostname}", + f"--subject=CN={self.hostname}", + "--subject-format=rfc4514", f"--ca={self.ca.serial}", "--subject-alternative-name=URI:https://example.net", "--subject-alternative-name=DNS:example.org", @@ -713,7 +764,13 @@ def test_der_csr(self) -> None: csr_stream.write(CERT_DATA["child-cert"]["csr"]["parsed"].public_bytes(Encoding.DER)) with self.assertCreateCertSignals() as (pre, post): - stdout, stderr = self.cmd("sign_cert", ca=self.ca, subject=self.subject, csr=csr_path) + stdout, stderr = self.cmd( + "sign_cert", + ca=self.ca, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", + csr=csr_path, + ) self.assertEqual(stderr, "") cert = Certificate.objects.get() @@ -770,7 +827,7 @@ def test_no_cn_or_san(self) -> None: r"^Must give at least a Common Name in --subject or one or more " r"--subject-alternative-name/--name arguments\.$" ), self.assertCreateCertSignals(False, False): - self.cmd("sign_cert", ca=self.ca, subject=subject) + self.cmd("sign_cert", ca=self.ca, subject=subject.rfc4514_string(), subject_format="rfc4514") @override_tmpcadir() def test_unusable_ca(self) -> None: @@ -781,7 +838,13 @@ def test_unusable_ca(self) -> None: stdin = io.StringIO(self.csr_pem) with self.assertCommandError(msg), self.assertCreateCertSignals(False, False): - self.cmd("sign_cert", ca=self.ca, subject=self.subject, stdin=stdin) + self.cmd( + "sign_cert", + ca=self.ca, + subject=self.subject.rfc4514_string(), + subject_format="rfc4514", + stdin=stdin, + ) @override_tmpcadir() @freeze_time(TIMESTAMPS["everything_expired"]) diff --git a/ca/django_ca/tests/test_management_actions.py b/ca/django_ca/tests/test_management_actions.py index 9399d2a4c..4d3a6823b 100644 --- a/ca/django_ca/tests/test_management_actions.py +++ b/ca/django_ca/tests/test_management_actions.py @@ -25,7 +25,7 @@ from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.serialization import Encoding -from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID +from cryptography.x509.oid import ExtendedKeyUsageOID from django.test import TestCase, override_settings @@ -366,26 +366,28 @@ def test_error(self) -> None: ) -class NameActionTestCase(ParserTestCaseMixin, TestCase): - """Test NameAction.""" - - def setUp(self) -> None: - super().setUp() - self.parser = argparse.ArgumentParser() - self.parser.add_argument("--name", action=actions.NameAction) - - def test_basic(self) -> None: - """Test basic functionality of action.""" - namespace = self.parser.parse_args(["--name=/CN=example.com"]) - self.assertEqual(namespace.name, x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "example.com")])) - - def test_error(self) -> None: - """Test false option values.""" - self.assertParserError( - ["--name=/WRONG=foobar"], - "usage: {script} [-h] [--name NAME]\n" - "{script}: error: argument --name: Unknown x509 name field: WRONG\n", - ) +# Class is not tested until django-ca 2.0, where RFC 4514 parsing is enabled. +# class NameActionTestCase(ParserTestCaseMixin, TestCase): +# """Test NameAction.""" +# +# def setUp(self) -> None: +# super().setUp() +# self.parser = argparse.ArgumentParser() +# self.parser.add_argument("--name", action=actions.NameAction) +# +# def test_basic(self) -> None: +# """Test basic functionality of action.""" +# namespace = self.parser.parse_args(["--name=/CN=example.com"]) +# expected = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "example.com")]) +# self.assertEqual(namespace.name, expected) +# +# def test_error(self) -> None: +# """Test false option values.""" +# self.assertParserError( +# ["--name=/WRONG=foobar"], +# "usage: {script} [-h] [--name NAME]\n" +# "{script}: error: argument --name: Unknown x509 name field: WRONG\n", +# ) class TLSFeatureActionTestCase(ParserTestCaseMixin, TestCase): diff --git a/ca/django_ca/tests/test_profiles.py b/ca/django_ca/tests/test_profiles.py index 7740b7463..3b1c00d61 100644 --- a/ca/django_ca/tests/test_profiles.py +++ b/ca/django_ca/tests/test_profiles.py @@ -23,8 +23,11 @@ from django.test import TestCase, override_settings +import pytest + from django_ca import ca_settings from django_ca.constants import EXTENSION_DEFAULT_CRITICAL, EXTENSION_KEYS +from django_ca.deprecation import RemovedInDjangoCA128Warning from django_ca.models import Certificate, CertificateAuthority from django_ca.profiles import Profile, get_profile, profile, profiles from django_ca.signals import pre_sign_cert @@ -102,21 +105,9 @@ def test_eq(self) -> None: self.assertNotEqual(profile, None) def test_init_django_ca_values(self) -> None: - """Test django-ca extensions as extensions.""" - prof1 = Profile( - "test", - subject=[("C", "AT"), ("CN", self.hostname)], - extensions={ - "ocsp_no_check": {}, - }, - ) - prof2 = Profile( - "test", - subject=[("C", "AT"), ("CN", self.hostname)], - extensions={ - "ocsp_no_check": ocsp_no_check(), - }, - ) + """Test passing serialized extensions leads to equal profiles.""" + prof1 = Profile("test", subject=self.subject, extensions={"ocsp_no_check": {}}) + prof2 = Profile("test", subject=self.subject, extensions={"ocsp_no_check": ocsp_no_check()}) self.assertEqual(prof1, prof2) def test_init_none_extension(self) -> None: @@ -161,7 +152,7 @@ def test_serialize(self) -> None: "test", cn_in_san=True, description=desc, - subject=[("CN", self.hostname)], + subject=x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.hostname)]), extensions={ EXTENSION_KEYS[ExtensionOID.KEY_USAGE]: {"value": key_usage}, }, @@ -191,7 +182,7 @@ def test_create_cert_minimal(self) -> None: ca = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] - prof = Profile("example", subject=[]) + prof = Profile("example") with self.mockSignal(pre_sign_cert) as pre: cert = self.create_cert( prof, @@ -201,7 +192,6 @@ def test_create_cert_minimal(self) -> None: add_issuer_alternative_name=False, ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, self.subject) self.assertExtensions( cert, [ca.get_authority_key_identifier_extension(), subject_alternative_name(dns(self.hostname))], @@ -217,7 +207,7 @@ def test_alternative_values(self) -> None: country_name = x509.NameAttribute(NameOID.COUNTRY_NAME, "AT") subject = x509.Name([country_name, x509.NameAttribute(NameOID.COMMON_NAME, self.hostname)]) - prof = Profile("example", subject=[]) + prof = Profile("example", subject=False) with self.mockSignal(pre_sign_cert) as pre: cert = self.create_cert( @@ -251,7 +241,7 @@ def test_overrides(self) -> None: prof = Profile( "example", - subject=[("C", "AT")], + subject=x509.Name([x509.NameAttribute(NameOID.COUNTRY_NAME, "AT")]), add_crl_url=False, add_ocsp_url=False, add_issuer_url=False, @@ -302,7 +292,7 @@ def test_none_extension(self) -> None: """Test passing an extension that is removed by the profile.""" ca = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] - prof = Profile("example", subject=[("C", "AT")], extensions={"ocsp_no_check": None}) + prof = Profile("example", extensions={"ocsp_no_check": None}) with self.mockSignal(pre_sign_cert) as pre: cert = self.create_cert(prof, ca, csr, subject=self.subject, extensions=[ocsp_no_check()]) @@ -314,25 +304,19 @@ def test_cn_in_san(self) -> None: """Test writing the common name into the SAN.""" ca = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] - subject = x509.Name( - [ - x509.NameAttribute(NameOID.COUNTRY_NAME, "AT"), - x509.NameAttribute(NameOID.COMMON_NAME, self.hostname), - ] - ) - prof = Profile("example", subject=[("C", "AT")], add_issuer_alternative_name=False, cn_in_san=False) + prof = Profile("example", subject=False, add_issuer_alternative_name=False, cn_in_san=False) with self.mockSignal(pre_sign_cert) as pre: cert = self.create_cert(prof, ca, csr, subject=self.subject) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, subject) + self.assertEqual(cert.subject, self.subject) self.assertExtensions(cert, [ca.get_authority_key_identifier_extension()]) # Create the same cert, but pass cn_in_san=True to create_cert with self.mockSignal(pre_sign_cert) as pre: cert = self.create_cert(prof, ca, csr, subject=self.subject, cn_in_san=True) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, subject) + self.assertEqual(cert.subject, self.subject) self.assertExtensions( cert, [ca.get_authority_key_identifier_extension(), subject_alternative_name(dns(self.hostname))], @@ -349,7 +333,7 @@ def test_cn_in_san(self) -> None: extensions=[subject_alternative_name(dns(self.hostname))], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, subject) + self.assertEqual(cert.subject, self.subject) self.assertExtensions( cert, [ca.get_authority_key_identifier_extension(), subject_alternative_name(dns(self.hostname))], @@ -366,7 +350,7 @@ def test_cn_in_san(self) -> None: extensions=[subject_alternative_name(dns(self.hostname + ".added"))], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, subject) + self.assertEqual(cert.subject, self.subject) self.assertExtensions( cert, [ @@ -381,7 +365,7 @@ def test_cn_in_san(self) -> None: prof, ca, csr, cn_in_san=True, extensions=[subject_alternative_name(dns(self.hostname))] ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, subject) + self.assertEqual(cert.subject, self.subject) self.assertExtensions( cert, [ca.get_authority_key_identifier_extension(), subject_alternative_name(dns(self.hostname))], @@ -398,7 +382,7 @@ def test_override_ski(self) -> None: value=x509.SubjectKeyIdentifier(b"custom value"), ) - prof = Profile("example", subject=[]) + prof = Profile("example") with self.mockSignal(pre_sign_cert) as pre: cert = self.create_cert( prof, @@ -412,7 +396,6 @@ def test_override_ski(self) -> None: extensions=[ski], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, self.subject) self.assertExtensions( cert, [ @@ -427,7 +410,7 @@ def test_override_ski(self) -> None: @override_tmpcadir() def test_add_distribution_point_with_ca_crldp(self) -> None: """Pass a custom distribution point when creating the cert, which matches ca.crl_url.""" - prof = Profile("example", subject=[]) + prof = Profile("example") ca = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] @@ -450,7 +433,6 @@ def test_add_distribution_point_with_ca_crldp(self) -> None: extensions=[added_crldp], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, self.subject) ski = x509.SubjectKeyIdentifier.from_public_key(cert.pub.loaded.public_key()) self.assertExtensions( @@ -471,7 +453,7 @@ def test_with_algorithm(self) -> None: root = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] - prof = Profile("example", subject=[], algorithm="SHA-512") + prof = Profile("example", algorithm="SHA-512") # Make sure that algorithm does not match what is the default profile above, so that we can test it self.assertIsInstance(root.algorithm, hashes.SHA256) @@ -493,7 +475,7 @@ def test_with_algorithm(self) -> None: @override_tmpcadir() def test_issuer_alternative_name_override(self) -> None: """Pass a custom Issuer Alternative Name which overwrites the CA value.""" - prof = Profile("example", subject=[]) + prof = Profile("example") ca = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] @@ -516,7 +498,6 @@ def test_issuer_alternative_name_override(self) -> None: extensions=[issuer_alternative_name(added_ian_uri)], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, self.subject) ski = x509.SubjectKeyIdentifier.from_public_key(cert.pub.loaded.public_key()) self.assertExtensions( @@ -534,7 +515,7 @@ def test_issuer_alternative_name_override(self) -> None: @override_tmpcadir() def test_merge_authority_information_access_existing_values(self) -> None: """Pass a custom distribution point when creating the cert, which matches ca.crl_url.""" - prof = Profile("example", subject=[]) + prof = Profile("example") ca = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] @@ -562,7 +543,6 @@ def test_merge_authority_information_access_existing_values(self) -> None: extensions=[added_aia], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, self.subject) ski = x509.SubjectKeyIdentifier.from_public_key(cert.pub.loaded.public_key()) @@ -587,7 +567,7 @@ def test_extension_as_cryptography(self) -> None: ca = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] - prof = Profile("example", subject=[], extensions={EXTENSION_KEYS[ExtensionOID.OCSP_NO_CHECK]: {}}) + prof = Profile("example", extensions={EXTENSION_KEYS[ExtensionOID.OCSP_NO_CHECK]: {}}) with self.mockSignal(pre_sign_cert) as pre: cert = self.create_cert( prof, @@ -598,7 +578,6 @@ def test_extension_as_cryptography(self) -> None: extensions=[ocsp_no_check()], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, self.subject) self.assertExtensions( cert, [ @@ -615,7 +594,6 @@ def test_extension_overrides(self) -> None: # Profile with extensions (will be overwritten by the command line). prof = Profile( "example", - subject=[], extensions={ EXTENSION_KEYS[ExtensionOID.AUTHORITY_INFORMATION_ACCESS]: authority_information_access( ocsp=[uri("http://ocsp.example.com/profile")], @@ -648,7 +626,6 @@ def test_extension_overrides(self) -> None: extensions=[expected_authority_information_access], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, self.subject) extensions = cert.x509_extensions self.assertEqual( @@ -660,7 +637,6 @@ def test_partial_authority_information_access_override(self) -> None: """Test partial overwriting of the Authority Information Access extension.""" prof = Profile( "example", - subject=[], extensions={ EXTENSION_KEYS[ExtensionOID.AUTHORITY_INFORMATION_ACCESS]: authority_information_access( ocsp=[uri("http://ocsp.example.com/profile")], @@ -693,7 +669,6 @@ def test_partial_authority_information_access_override(self) -> None: ], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, self.subject) extensions = cert.x509_extensions self.assertEqual( @@ -721,7 +696,6 @@ def test_partial_authority_information_access_override(self) -> None: ], ) self.assertEqual(pre.call_count, 1) - self.assertEqual(cert.subject, self.subject) extensions = cert.x509_extensions self.assertEqual( @@ -738,7 +712,7 @@ def test_no_cn_no_san(self) -> None: ca = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] - prof = Profile("example", subject=[("C", "AT")]) + prof = Profile("example") msg = r"^Must name at least a CN or a subjectAlternativeName\.$" with self.mockSignal(pre_sign_cert) as pre, self.assertRaisesRegex(ValueError, msg): self.create_cert(prof, ca, csr, subject=None) @@ -749,7 +723,7 @@ def test_no_valid_cn_in_san(self) -> None: """Test what happens when the SAN has nothing usable as CN.""" ca = self.load_ca(name="root", parsed=CERT_DATA["root"]["pub"]["parsed"]) csr = CERT_DATA["child-cert"]["csr"]["parsed"] - prof = Profile("example", subject=[], extensions={EXTENSION_KEYS[ExtensionOID.OCSP_NO_CHECK]: {}}) + prof = Profile("example", extensions={EXTENSION_KEYS[ExtensionOID.OCSP_NO_CHECK]: {}}) san = subject_alternative_name(x509.RegisteredID(ExtensionOID.OCSP_NO_CHECK)) with self.mockSignal(pre_sign_cert) as pre: @@ -763,7 +737,7 @@ def test_unparsable_cn(self) -> None: csr = CERT_DATA["child-cert"]["csr"]["parsed"] cname = "foo bar" - prof = Profile("example", subject=[("C", "AT"), ("CN", cname)]) + prof = Profile("example", subject=x509.Name([x509.NameAttribute(x509.NameOID.COMMON_NAME, cname)])) msg = rf"^{cname}: Could not parse CommonName as subjectAlternativeName\.$" with self.mockSignal(pre_sign_cert) as pre, self.assertRaisesRegex(ValueError, msg): self.create_cert(prof, ca, csr) @@ -794,6 +768,25 @@ def test_repr(self) -> None: self.assertEqual(repr(profiles[name]), f"") +def test_deprecated_subject_value() -> None: + """Test deprecated subject values.""" + value = "/C=AT/L=Vienna/ST=Vienna" + msg = ( + rf"^{value}: Support for passing a value of type .* is deprecated and will be removed in " + "django-ca 1.28.0.$" + ) + with pytest.warns(RemovedInDjangoCA128Warning, match=msg): + prof = Profile("test", value) # type: ignore[arg-type] # what we test + + assert prof.subject == x509.Name( + [ + x509.NameAttribute(NameOID.COUNTRY_NAME, "AT"), + x509.NameAttribute(NameOID.LOCALITY_NAME, "Vienna"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Vienna"), + ] + ) + + class GetProfileTestCase(TestCase): """Test the get_profile function.""" diff --git a/ca/django_ca/tests/test_utils.py b/ca/django_ca/tests/test_utils.py index ca431ceb5..110f63fe4 100644 --- a/ca/django_ca/tests/test_utils.py +++ b/ca/django_ca/tests/test_utils.py @@ -300,7 +300,8 @@ def test_aliases(self) -> None: def test_unknown(self) -> None: """Test unknown field.""" field = "ABC" - with self.assertRaisesRegex(ValueError, "^Unknown x509 name field: ABC$") as e: + + with self.assertRaisesRegex(ValueError, rf"^Unknown x509 name field: {field}$") as e: parse_name_x509(f"/{field}=example.com") self.assertEqual(e.exception.args, (f"Unknown x509 name field: {field}",)) @@ -461,17 +462,7 @@ def test_wildcard_domain(self) -> None: def test_dirname(self) -> None: """Test parsing a dirname.""" self.assertEqual( - parse_general_name("/CN=example.com"), - x509.DirectoryName( - x509.Name( - [ - x509.NameAttribute(NameOID.COMMON_NAME, "example.com"), - ] - ) - ), - ) - self.assertEqual( - parse_general_name("dirname:/CN=example.com"), + parse_general_name("dirname:CN=example.com"), x509.DirectoryName( x509.Name( [ @@ -481,7 +472,7 @@ def test_dirname(self) -> None: ), ) self.assertEqual( - parse_general_name("dirname:/C=AT/CN=example.com"), + parse_general_name("dirname:C=AT,CN=example.com"), x509.DirectoryName( x509.Name( [ @@ -1049,15 +1040,23 @@ class X509NameTestCase(TestCase): def test_str(self) -> None: """Test passing a string.""" - subject = "/C=AT/ST=Vienna/L=Vienna/O=O/OU=OU/CN=example.com/emailAddress=user@example.com" + subject = [ + ("C", "AT"), + ("ST", "Vienna"), + ("L", "Vienna"), + ("O", "O"), + ("OU", "OU"), + ("CN", "example.com"), + ("emailAddress", "user@example.com"), + ] self.assertEqual(x509_name(subject), self.name) def test_multiple_other(self) -> None: """Test multiple other tokens (only OUs work).""" with self.assertRaisesRegex(ValueError, '^Subject contains multiple "countryName" fields$'): - x509_name("/C=AT/C=DE") + x509_name([("C", "AT"), ("C", "DE")]) with self.assertRaisesRegex(ValueError, '^Subject contains multiple "commonName" fields$'): - x509_name("/CN=AT/CN=FOO") + x509_name([("CN", "AT"), ("CN", "FOO")]) class MergeX509NamesTestCase(TestCase): diff --git a/ca/django_ca/tests/test_verification.py b/ca/django_ca/tests/test_verification.py index a3a22c494..a1e4cec9e 100644 --- a/ca/django_ca/tests/test_verification.py +++ b/ca/django_ca/tests/test_verification.py @@ -110,7 +110,8 @@ def sign_cert( ) -> Iterator[str]: """Create a signed certificate in a temporary directory.""" stdin = self.csr_pem.encode() - subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)]) + # subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)]) + subject = f"CN={hostname}" with tempfile.TemporaryDirectory() as tempdir: out_path = os.path.join(tempdir, f"{hostname}.pem") diff --git a/ca/django_ca/typehints.py b/ca/django_ca/typehints.py index 48259b234..c58c450ce 100644 --- a/ca/django_ca/typehints.py +++ b/ca/django_ca/typehints.py @@ -16,7 +16,7 @@ import argparse import typing from datetime import datetime, timedelta -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union from cryptography import x509 from cryptography.hazmat.primitives import hashes @@ -199,6 +199,11 @@ def __lt__(self, __other: Any) -> bool: # pragma: nocover SerializedNullExtension = typing.TypedDict("SerializedNullExtension", {"critical": bool}) +############ +# Literals # +############ +SubjectFormats = Literal["openssl", "rfc4514"] # pragma: only django-ca<=2.2 # will be removed in 2.2 + ################ # Type aliases # ################ diff --git a/ca/django_ca/utils.py b/ca/django_ca/utils.py index 916b3045b..faceff4d0 100644 --- a/ca/django_ca/utils.py +++ b/ca/django_ca/utils.py @@ -40,7 +40,7 @@ from django_ca import ca_settings, constants from django_ca.constants import NAME_OID_DISPLAY_NAMES -from django_ca.deprecation import RemovedInDjangoCA128Warning, RemovedInDjangoCA129Warning +from django_ca.deprecation import RemovedInDjangoCA129Warning from django_ca.typehints import ( AllowedHashTypes, Expires, @@ -156,6 +156,21 @@ def _format_value(val: str) -> str: return f"/{values}" +def parse_name_rfc4514(value: str) -> x509.Name: + """Parse an RFC 4514 formatted string into a :py:class:`~cg:cryptography.x509.Name`. + + This function is intended to be the inverse of :py:func:`~django_ca.utils.format_name_rfc4514`, and will + also parse the name in the order as given in the string and understands the same OID mappings. + + >>> parse_name_rfc4514("CN=example.com") + + >>> parse_name_rfc4514("C=AT,O=MyOrg,OU=MyOrgUnit,CN=example.com") + + """ + name = x509.Name.from_rfc4514_string(value, {v: k for k, v in constants.RFC4514_NAME_OVERRIDES.items()}) + return check_name(x509.Name(reversed(list(name)))) + + def format_name_rfc4514(subject: Union[x509.Name, x509.RelativeDistinguishedName]) -> str: """Format the given (relative distinguished) name as RFC4514 compatible string. @@ -398,27 +413,15 @@ def parse_name_x509(name: ParsableName) -> Tuple[x509.NameAttribute, ...]: on deviations from the format, object identifiers are case-insensitive, whitespace at the start and end is stripped and the subject does not have to start with a slash (``/``). - >>> parse_name_x509('/CN=example.com') + >>> parse_name_x509([("CN", "example.com")]) (, value='example.com')>,) - >>> parse_name_x509('c=AT/l= Vienna/o="quoting/works"/CN=example.com') # doctest: +NORMALIZE_WHITESPACE + >>> parse_name_x509( + ... [("c", "AT"), ("l", "Vienna"), ("o", "quoting/works"), ("CN", "example.com")] + ... ) # doctest: +NORMALIZE_WHITESPACE (, value='AT')>, , value='Vienna')>, , value='quoting/works')>, , value='example.com')>) - - The function also handles whitespace, quoting and slashes correctly: - - >>> parse_name_x509('L="Vienna / District"/CN=example.com') # doctest: +NORMALIZE_WHITESPACE - (, value='Vienna / District')>, - , value='example.com')>) - - Examples of where this string is used are: - - .. code-block:: console - - # openssl req -new -key priv.key -out csr -utf8 -batch -sha256 -subj '/C=AT/CN=example.com' - # openssl x509 -in cert.pem -noout -subject -nameopt compat - /C=AT/L=Vienna/CN=example.com """ if isinstance(name, str): # TYPE NOTE: mypy detects t.split() as Tuple[str, ...] and does not recognize the maxsplit parameter @@ -436,8 +439,6 @@ def parse_name_x509(name: ParsableName) -> Tuple[x509.NameAttribute, ...]: def x509_name(name: ParsableName) -> x509.Name: """Parses a string or iterable of two-tuples into a :py:class:`x509.Name `. - >>> x509_name('/C=AT/CN=example.com') - >>> x509_name([('C', 'AT'), ('CN', 'example.com')]) """ @@ -779,8 +780,6 @@ def parse_general_name(name: ParsableGeneralName) -> x509.GeneralName: >>> parse_general_name('fd00::1') - >>> parse_general_name('/CN=example.com') - )> The default fallback is to assume a :py:class:`~cg:cryptography.x509.DNSName`. If this doesn't work, an exception will be raised: @@ -796,7 +795,7 @@ def parse_general_name(name: ParsableGeneralName) -> x509.GeneralName: >>> parse_general_name('URI:https://example.com') - >>> parse_general_name('dirname:/CN=example.com') + >>> parse_general_name('dirname:CN=example.com') )> Some more exotic values can only be generated by using this prefix: @@ -843,9 +842,6 @@ def parse_general_name(name: ParsableGeneralName) -> x509.GeneralName: except ValueError: pass - if name.strip().startswith("/"): # maybe it's a dirname? - return x509.DirectoryName(x509_name(name)) - # Try to parse this as IPAddress/Network try: return x509.IPAddress(ip_address(name)) @@ -930,7 +926,7 @@ def parse_general_name(name: ParsableGeneralName) -> x509.GeneralName: return x509.OtherName(oid, der_value) elif typ == "dirname": - return x509.DirectoryName(x509_name(name)) + return x509.DirectoryName(parse_name_rfc4514(name)) else: try: return x509.DNSName(encode_dns(name))