From 83079edafe62a3093af01426a5096eb33bc5433a Mon Sep 17 00:00:00 2001 From: Mathias Ertl Date: Sun, 29 Dec 2024 12:09:18 +0100 Subject: [PATCH] parse subject directly and handle in argparse --- ca/django_ca/api/auth.py | 3 +- ca/django_ca/management/actions.py | 11 +- ca/django_ca/management/base.py | 9 +- ca/django_ca/management/commands/init_ca.py | 9 +- .../management/commands/resign_cert.py | 10 +- ca/django_ca/management/commands/sign_cert.py | 13 +-- ca/django_ca/tests/commands/test_init_ca.py | 2 +- .../tests/commands/test_resign_cert.py | 2 +- ca/django_ca/tests/commands/test_sign_cert.py | 108 +++++++++--------- ca/django_ca/tests/test_verification.py | 3 +- 10 files changed, 75 insertions(+), 95 deletions(-) diff --git a/ca/django_ca/api/auth.py b/ca/django_ca/api/auth.py index 0cff43f19..fd0f8be55 100644 --- a/ca/django_ca/api/auth.py +++ b/ca/django_ca/api/auth.py @@ -59,7 +59,8 @@ async def authenticate( # pylint: disable=invalid-overridden-method # NOTE: ahas_perm() is introduced in Django 5.2. if hasattr(user, "ahasperm"): # pragma: only django>5.1 - has_perm = await user.ahas_perm(self.permission) + # TYPEHINT NOTE: mypy will complain on currently released django==5.1. + has_perm = await user.ahas_perm(self.permission) # type: ignore[attr-defined] else: # pragma: only django<5.2 has_perm = await sync_to_async(user.has_perm)(self.permission) diff --git a/ca/django_ca/management/actions.py b/ca/django_ca/management/actions.py index f4f5ee241..4183d923a 100644 --- a/ca/django_ca/management/actions.py +++ b/ca/django_ca/management/actions.py @@ -36,7 +36,7 @@ from django_ca.models import Certificate, CertificateAuthority from django_ca.pydantic.validators import is_power_two_validator from django_ca.typehints import AllowedHashTypes, AlternativeNameExtensionType, EllipticCurves -from django_ca.utils import parse_encoding, parse_general_name +from django_ca.utils import parse_encoding, parse_general_name, parse_name_rfc4514 ActionType = typing.TypeVar("ActionType") # pylint: disable=invalid-name ParseType = typing.TypeVar("ParseType") # pylint: disable=invalid-name @@ -479,7 +479,7 @@ def parse_value(self, value: str) -> ReasonFlags: return ReasonFlags[value] -class NameAction(SingleValueAction[str, str]): +class NameAction(SingleValueAction[str, x509.Name]): """Action to parse a string into a :py:class:`cg:~cryptography.x509.Name`. Note that this action does *not* take care of sorting the subject in any way. @@ -491,11 +491,10 @@ class NameAction(SingleValueAction[str, str]): Namespace(name=CN=example.com) """ - def parse_value(self, value: str) -> str: - # TODO: In django-ca 2.0, parse subject here directly using parse_name_rfc4514(). + def parse_value(self, value: str) -> x509.Name: try: - return value - except ValueError as ex: # pragma: no cover # pragma: only django-ca<2.0 + return parse_name_rfc4514(value) + except ValueError as ex: raise argparse.ArgumentError(self, str(ex)) from ex diff --git a/ca/django_ca/management/base.py b/ca/django_ca/management/base.py index 9b0ece63f..e4c96ff0b 100644 --- a/ca/django_ca/management/base.py +++ b/ca/django_ca/management/base.py @@ -49,7 +49,7 @@ ConfigurableExtension, ConfigurableExtensionType, ) -from django_ca.utils import add_colons, name_for_display, parse_name_rfc4514 +from django_ca.utils import add_colons, name_for_display if typing.TYPE_CHECKING: from django_stubs_ext import StrOrPromise @@ -412,13 +412,6 @@ def add_extension( x509.Extension(oid=value.oid, critical=critical, value=value) # type: ignore[arg-type] ) - def parse_x509_name(self, value: str) -> x509.Name: - """Parse a `name` in the given `format`.""" - try: - return parse_name_rfc4514(value) - except ValueError as ex: - raise CommandError(ex) from ex - def add_extended_key_usage_group(self, parser: CommandParser) -> None: """Add argument group for the Extended Key Usage extension.""" ext_name = constants.EXTENSION_NAMES[ExtensionOID.EXTENDED_KEY_USAGE] diff --git a/ca/django_ca/management/commands/init_ca.py b/ca/django_ca/management/commands/init_ca.py index c794137d3..48a677ea9 100644 --- a/ca/django_ca/management/commands/init_ca.py +++ b/ca/django_ca/management/commands/init_ca.py @@ -282,7 +282,7 @@ def add_extension( def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915 self, name: str, - subject: str, + subject: x509.Name, parent: Optional[CertificateAuthority], expires: timedelta, # private key storage options @@ -418,11 +418,8 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915 responder_value = format_general_name(ca_issuer.access_location) raise CommandError(f"{responder_value}: CA issuer cannot be added to root CAs.") - # Parse the subject - parsed_subject = self.parse_x509_name(subject) - # We require a valid common name - common_name = next((attr.value for attr in parsed_subject if attr.oid == NameOID.COMMON_NAME), False) + common_name = next((attr.value for attr in subject if attr.oid == NameOID.COMMON_NAME), False) if not common_name: raise CommandError("Subject must contain a common name (CN=...).") @@ -545,7 +542,7 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915 name=name, key_backend=key_backend, key_backend_options=key_backend_options, - subject=parsed_subject, + subject=subject, not_after=not_after_datetime, algorithm=algorithm, parent=parent, diff --git a/ca/django_ca/management/commands/resign_cert.py b/ca/django_ca/management/commands/resign_cert.py index 8e2566563..2dc390bbd 100644 --- a/ca/django_ca/management/commands/resign_cert.py +++ b/ca/django_ca/management/commands/resign_cert.py @@ -68,7 +68,7 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912, PLR0913 self, cert: Certificate, ca: Optional[CertificateAuthority], - subject: Optional[str], + subject: Optional[x509.Name], expires: Optional[timedelta], watch: list[str], profile: Optional[str], @@ -117,9 +117,7 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912, PLR0913 watchers = list(cert.watchers.all()) if subject is None: - parsed_subject = cert.subject - else: - parsed_subject = self.parse_x509_name(subject) + subject = cert.subject # Process any extensions given via the command-line extensions: list[ConfigurableExtension] = [] @@ -177,7 +175,7 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912, PLR0913 # NOTE: This can only happen here in two edge cases: # * Pass a subject without common name AND a certificate does *not* have a subject alternative name. # * An imported certificate that has neither Common Name nor subject alternative name. - common_names = parsed_subject.get_attributes_for_oid(NameOID.COMMON_NAME) + common_names = subject.get_attributes_for_oid(NameOID.COMMON_NAME) has_subject_alternative_name = next( (ext for ext in extensions if ext.oid == ExtensionOID.SUBJECT_ALTERNATIVE_NAME), None ) @@ -194,7 +192,7 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912, PLR0913 csr=cert.csr.loaded, profile=profile_obj, not_after=expires, - subject=parsed_subject, + subject=subject, algorithm=algorithm, extensions=extensions, ) diff --git a/ca/django_ca/management/commands/sign_cert.py b/ca/django_ca/management/commands/sign_cert.py index 3a702c9b7..535350176 100644 --- a/ca/django_ca/management/commands/sign_cert.py +++ b/ca/django_ca/management/commands/sign_cert.py @@ -70,7 +70,7 @@ def add_arguments(self, parser: CommandParser) -> None: def handle( # pylint: disable=too-many-locals # noqa: PLR0912, PLR0913 self, ca: CertificateAuthority, - subject: Optional[str], + subject: Optional[x509.Name], expires: Optional[timedelta], watch: list[str], csr_path: str, @@ -150,14 +150,9 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912, PLR0913 if tls_feature is not None: self.add_extension(extensions, tls_feature, tls_feature_critical) - # Parse the subject - parsed_subject = None - if subject is not None: - parsed_subject = self.parse_x509_name(subject) - cname = None - if parsed_subject is not None: - cname = parsed_subject.get_attributes_for_oid(NameOID.COMMON_NAME) + if subject is not None: + cname = subject.get_attributes_for_oid(NameOID.COMMON_NAME) if not cname and subject_alternative_name is None: raise CommandError( "Must give at least a Common Name in --subject or one or more " @@ -190,7 +185,7 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912, PLR0913 profile=profile_obj, not_after=expires, extensions=extensions, - subject=parsed_subject, + subject=subject, algorithm=algorithm, ) except Exception as ex: diff --git a/ca/django_ca/tests/commands/test_init_ca.py b/ca/django_ca/tests/commands/test_init_ca.py index ab6868225..f123bf1cc 100644 --- a/ca/django_ca/tests/commands/test_init_ca.py +++ b/ca/django_ca/tests/commands/test_init_ca.py @@ -1534,7 +1534,7 @@ def test_invalid_common_name(value: str, msg: str) -> None: def test_unparsable_subject(ca_name: str) -> None: """Test error when you pass an unparsable subject.""" - with assert_command_error(r"^/CN=example\.com: Could not parse name as RFC 4514 string\.$"): + with assert_command_error(r"/CN=example\.com: Could not parse name as RFC 4514 string\.$"): cmd("init_ca", ca_name, "/CN=example.com") diff --git a/ca/django_ca/tests/commands/test_resign_cert.py b/ca/django_ca/tests/commands/test_resign_cert.py index ca22fe220..1b83da5f6 100644 --- a/ca/django_ca/tests/commands/test_resign_cert.py +++ b/ca/django_ca/tests/commands/test_resign_cert.py @@ -635,7 +635,7 @@ def test_no_cn(self) -> None: r"--subject-alternative-name/--name arguments\.$" ) with assert_create_cert_signals(False, False), assert_command_error(msg): - cmd("resign_cert", cert, subject=subject.rfc4514_string()) + cmd("resign_cert", cert, subject=subject) @override_tmpcadir() def test_error(self) -> None: diff --git a/ca/django_ca/tests/commands/test_sign_cert.py b/ca/django_ca/tests/commands/test_sign_cert.py index 075471e1f..827ff73b2 100644 --- a/ca/django_ca/tests/commands/test_sign_cert.py +++ b/ca/django_ca/tests/commands/test_sign_cert.py @@ -48,6 +48,7 @@ certificate_policies, cmd, cmd_e2e, + cn, crl_distribution_points, distribution_point, dns, @@ -66,17 +67,17 @@ pytestmark = [pytest.mark.freeze_time(TIMESTAMPS["everything_valid"])] -def sign_cert(ca: CertificateAuthority, subject: str, **kwargs: Any) -> tuple[str, str]: +def sign_cert(ca: CertificateAuthority, subject: x509.Name, **kwargs: Any) -> tuple[str, str]: """Shortcut for the sign_cert command.""" return cmd("sign_cert", ca=ca, subject=subject, **kwargs) -def test_usable_cas(usable_ca: CertificateAuthority, subject: x509.Name, rfc4514_subject: str) -> None: +def test_usable_cas(usable_ca: CertificateAuthority, subject: x509.Name) -> None: """Test signing with all usable CAs.""" password = CERT_DATA[usable_ca.name].get("password") with assert_create_cert_signals() as (pre, post): - stdout, stderr = sign_cert(usable_ca, rfc4514_subject, password=password, stdin=csr) + stdout, stderr = sign_cert(usable_ca, subject, password=password, stdin=csr) assert stderr == "" cert = Certificate.objects.get(ca=usable_ca) @@ -95,20 +96,20 @@ def test_usable_cas(usable_ca: CertificateAuthority, subject: x509.Name, rfc4514 assert_authority_key_identifier(usable_ca, cert) -def test_with_bundle(usable_root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_with_bundle(usable_root: CertificateAuthority, subject: x509.Name) -> None: """Test outputting the whole certificate bundle.""" - stdout, stderr = sign_cert(usable_root, rfc4514_subject, bundle=True, stdin=csr) + stdout, stderr = sign_cert(usable_root, subject, bundle=True, stdin=csr) cert = Certificate.objects.get() assert stdout == f"Please paste the CSR:\n{cert.bundle_as_pem}" assert stderr == "" assert isinstance(cert.algorithm, hashes.SHA256) -def test_from_file(usable_root: CertificateAuthority, subject: x509.Name, rfc4514_subject: str) -> None: +def test_from_file(usable_root: CertificateAuthority, subject: x509.Name) -> None: """Test reading CSR from file.""" csr_path = FIXTURES_DIR / CERT_DATA["root-cert"]["csr_filename"] with assert_create_cert_signals() as (pre, post): - stdout, stderr = sign_cert(usable_root, rfc4514_subject, csr=csr_path) + stdout, stderr = sign_cert(usable_root, subject, csr=csr_path) assert stderr == "" cert = Certificate.objects.get() @@ -126,11 +127,11 @@ def test_from_file(usable_root: CertificateAuthority, subject: x509.Name, rfc451 assert ExtensionOID.SUBJECT_ALTERNATIVE_NAME not in actual -def test_to_file(tmp_path: Path, usable_root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_to_file(tmp_path: Path, usable_root: CertificateAuthority, subject: x509.Name) -> None: """Test writing PEM to file.""" out_path = os.path.join(tmp_path, "test.pem") with assert_create_cert_signals() as (pre, post): - stdout, stderr = sign_cert(usable_root, rfc4514_subject, out=out_path, stdin=csr) + stdout, stderr = sign_cert(usable_root, subject, out=out_path, stdin=csr) assert stdout == "Please paste the CSR:\n" assert stderr == "" @@ -144,10 +145,10 @@ def test_to_file(tmp_path: Path, usable_root: CertificateAuthority, rfc4514_subj assert cert.pub.pem == from_file -def test_with_rsa_with_algorithm(usable_root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_with_rsa_with_algorithm(usable_root: CertificateAuthority, subject: x509.Name) -> None: """Test creating a CA with a custom algorithm.""" assert isinstance(usable_root.algorithm, hashes.SHA256) # make sure that default is different - sign_cert(usable_root, rfc4514_subject, stdin=csr, algorithm=hashes.SHA3_256()) + sign_cert(usable_root, subject, stdin=csr, algorithm=hashes.SHA3_256()) cert = Certificate.objects.get() assert isinstance(cert.algorithm, hashes.SHA3_256) @@ -211,10 +212,10 @@ def test_subject_sort_with_no_common_name( ) -def test_no_san(usable_root: CertificateAuthority, subject: x509.Name, rfc4514_subject: str) -> None: +def test_no_san(usable_root: CertificateAuthority, subject: x509.Name) -> None: """Test signing without passing any SANs.""" with assert_create_cert_signals() as (pre, post): - stdout, stderr = sign_cert(usable_root, rfc4514_subject, stdin=csr) + stdout, stderr = sign_cert(usable_root, subject, stdin=csr) cert = Certificate.objects.get() assert cert.pub.loaded.subject == subject assert_post_issue_cert(post, cert) @@ -265,12 +266,7 @@ def test_profile_subject(settings: SettingsWrapper, usable_root: CertificateAuth ] ) with assert_create_cert_signals() as (pre, post): - sign_cert( - usable_root, - subject_alternative_name=san.value, - stdin=csr, - subject=subject.rfc4514_string(), - ) + sign_cert(usable_root, subject_alternative_name=san.value, stdin=csr, subject=subject) cert = Certificate.objects.get(cn="CommonName2") assert_post_issue_cert(post, cert) @@ -541,7 +537,7 @@ def test_no_subject(settings: SettingsWrapper, usable_root: CertificateAuthority @pytest.mark.usefixtures("tmpcadir") -def test_secondary_backend(pwd: CertificateAuthority, rfc4514_subject: str) -> None: +def test_secondary_backend(pwd: CertificateAuthority, subject: x509.Name) -> None: """Sign a certificate with a CA in the secondary backend.""" # Prepare root so that it is usable with the secondary backend. secondary_location = storages["secondary"].location # type: ignore[attr-defined] @@ -550,43 +546,43 @@ def test_secondary_backend(pwd: CertificateAuthority, rfc4514_subject: str) -> N pwd.save() with assert_create_cert_signals() as (pre, post): - sign_cert(pwd, rfc4514_subject, secondary_password=CERT_DATA["pwd"]["password"], stdin=csr) + sign_cert(pwd, subject, secondary_password=CERT_DATA["pwd"]["password"], stdin=csr) cert = Certificate.objects.get() assert_signature([pwd], cert) @pytest.mark.hsm -def test_hsm_backend(usable_hsm_ca: CertificateAuthority, rfc4514_subject: str) -> None: +def test_hsm_backend(usable_hsm_ca: CertificateAuthority, subject: x509.Name) -> None: """Test signing a certificate with a CA that is in a HSM.""" with assert_create_cert_signals() as (pre, post): - sign_cert(usable_hsm_ca, rfc4514_subject, stdin=csr) + sign_cert(usable_hsm_ca, subject, stdin=csr) cert = Certificate.objects.get() assert_signature([usable_hsm_ca], cert) def test_encrypted_ca_with_settings( - usable_pwd: CertificateAuthority, rfc4514_subject: str, settings: SettingsWrapper + usable_pwd: CertificateAuthority, subject: x509.Name, settings: SettingsWrapper ) -> None: """Sign a certificate with an encrypted CA, with the password in CA_PASSWORDS.""" settings.CA_PASSWORDS = {usable_pwd.serial: CERT_DATA[usable_pwd.name]["password"]} with assert_create_cert_signals(): - sign_cert(usable_pwd, rfc4514_subject, stdin=csr) + sign_cert(usable_pwd, subject, stdin=csr) cert = Certificate.objects.get() assert_signature([usable_pwd], cert) -def test_unencrypted_ca_with_password(usable_root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_unencrypted_ca_with_password(usable_root: CertificateAuthority, subject: x509.Name) -> None: """Test signing with a CA that is not protected with a password, but giving a password.""" with ( assert_command_error(r"^Password was given but private key is not encrypted\.$"), assert_create_cert_signals(False, False), ): - sign_cert(usable_root, rfc4514_subject, password=b"there-is-no-password", stdin=csr) + sign_cert(usable_root, subject, password=b"there-is-no-password", stdin=csr) assert Certificate.objects.exists() is False def test_encrypted_ca_with_no_password( - usable_pwd: CertificateAuthority, rfc4514_subject: str, settings: SettingsWrapper + usable_pwd: CertificateAuthority, subject: x509.Name, settings: SettingsWrapper ) -> None: """Test signing with a CA that is protected with a password, but not giving a password.""" settings.CA_PASSWORDS = {} @@ -594,21 +590,21 @@ def test_encrypted_ca_with_no_password( assert_command_error(r"^Password was not given but private key is encrypted$"), assert_create_cert_signals(False, False), ): - sign_cert(usable_pwd, rfc4514_subject, stdin=csr) + sign_cert(usable_pwd, subject, stdin=csr) assert Certificate.objects.exists() is False -def test_encrypted_ca_with_wrong_password(usable_pwd: CertificateAuthority, rfc4514_subject: str) -> None: +def test_encrypted_ca_with_wrong_password(usable_pwd: CertificateAuthority, subject: x509.Name) -> None: """Test that passing the wrong password raises an error.""" with ( assert_command_error(r"^Could not decrypt private key - bad password\?$"), assert_create_cert_signals(False, False), ): - sign_cert(usable_pwd, rfc4514_subject, stdin=csr, password=b"wrong") + sign_cert(usable_pwd, subject, stdin=csr, password=b"wrong") assert Certificate.objects.exists() is False -def test_unparsable_private_key(usable_root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_unparsable_private_key(usable_root: CertificateAuthority, subject: x509.Name) -> None: """Test creating a cert where the CA private key contains bogus data.""" path = storages["django-ca"].path(usable_root.key_backend_options["path"]) with open(path, "wb") as stream: @@ -618,7 +614,7 @@ def test_unparsable_private_key(usable_root: CertificateAuthority, rfc4514_subje assert_command_error(r"^Could not decrypt private key - bad password\?$"), assert_create_cert_signals(False, False), ): - sign_cert(usable_root, rfc4514_subject, stdin=csr) + sign_cert(usable_root, subject, stdin=csr) def test_unsortable_subject_with_no_profile_subject( @@ -632,12 +628,9 @@ def test_unsortable_subject_with_no_profile_subject( "correct" location from the SubjectAlternativeName extension). """ settings.CA_PROFILES = {model_settings.CA_DEFAULT_PROFILE: {"subject": False}} + subject = x509.Name([x509.NameAttribute(NameOID.INN, "weird"), cn(hostname)]) with assert_create_cert_signals() as (pre, post): - stdout, stderr = sign_cert( - usable_root, - subject=f"inn=weird,CN={hostname}", - stdin=csr, - ) + stdout, stderr = sign_cert(usable_root, subject=subject, stdin=csr) assert stderr == "" cert = Certificate.objects.get(cn=hostname) @@ -662,8 +655,11 @@ def test_unsortable_subject_with_profile_subject( """ settings.CA_PROFILES = {} settings.CA_DEFAULT_SUBJECT = ({"oid": "C", "value": "AT"},) - subject = f"inn=weird,CN={hostname}" - with assert_command_error(rf"^{subject}: Unsortable name$"), assert_create_cert_signals(False, False): + subject = x509.Name([x509.NameAttribute(NameOID.INN, "weird"), cn(hostname)]) + with ( + assert_command_error(rf"^inn=weird,CN={hostname}: Unsortable name$"), + assert_create_cert_signals(False, False), + ): sign_cert(usable_root, subject, stdin=csr) @@ -676,14 +672,14 @@ def test_unsortable_subject_with_no_common_name( """ settings.CA_PROFILES = {} settings.CA_DEFAULT_SUBJECT = None - subject = "inn=weird" + subject = x509.Name([x509.NameAttribute(NameOID.INN, "weird")]) san = subject_alternative_name(dns(hostname)).value - with assert_command_error(rf"^{subject}: Unsortable name$"), assert_create_cert_signals(False, False): + with assert_command_error(r"^inn=weird: Unsortable name$"), assert_create_cert_signals(False, False): # NOTE: pass SAN as otherwise check for missing common name or san would fire sign_cert(usable_root, subject, subject_alternative_name=san, stdin=csr) -def test_expiry_too_late(usable_root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_expiry_too_late(usable_root: CertificateAuthority, subject: x509.Name) -> None: """Test signing with an expiry after the CA expires.""" time_left = (usable_root.not_after - timezone.now()).days expires = timedelta(days=time_left + 3) @@ -694,10 +690,10 @@ def test_expiry_too_late(usable_root: CertificateAuthority, rfc4514_subject: str ), assert_create_cert_signals(False, False), ): - sign_cert(usable_root, rfc4514_subject, expires=expires, stdin=csr) + sign_cert(usable_root, subject, expires=expires, stdin=csr) -def test_revoked_ca(root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_revoked_ca(root: CertificateAuthority, subject: x509.Name) -> None: """Test signing with a revoked CA.""" root.revoke() @@ -705,13 +701,13 @@ def test_revoked_ca(root: CertificateAuthority, rfc4514_subject: str) -> None: assert_command_error(r"^Certificate authority is revoked\.$"), assert_create_cert_signals(False, False), ): - sign_cert(root, rfc4514_subject, stdin=csr) + sign_cert(root, subject, stdin=csr) -def test_invalid_algorithm(ed_ca: CertificateAuthority, rfc4514_subject: str) -> None: +def test_invalid_algorithm(ed_ca: CertificateAuthority, subject: x509.Name) -> None: """Test passing an invalid algorithm.""" with assert_command_error(r"^Ed(448|25519) keys do not allow an algorithm for signing\.$"): - sign_cert(ed_ca, rfc4514_subject, algorithm=hashes.SHA512()) + sign_cert(ed_ca, subject, algorithm=hashes.SHA512()) def test_no_cn_or_san(usable_root: CertificateAuthority, hostname: str) -> None: @@ -724,29 +720,29 @@ def test_no_cn_or_san(usable_root: CertificateAuthority, hostname: str) -> None: ), assert_create_cert_signals(False, False), ): - sign_cert(usable_root, subject.rfc4514_string()) + sign_cert(usable_root, subject) -def test_unusable_ca(root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_unusable_ca(root: CertificateAuthority, subject: x509.Name) -> None: """Test signing with an unusable CA.""" msg = r"root.key: Private key file not found\.$" with assert_command_error(msg), assert_create_cert_signals(False, False): - sign_cert(root, rfc4514_subject, stdin=csr) + sign_cert(root, subject, stdin=csr) @pytest.mark.freeze_time(TIMESTAMPS["everything_expired"]) -def test_expired_ca(root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_expired_ca(root: CertificateAuthority, subject: x509.Name) -> None: """Test signing with an expired CA.""" msg = r"^Certificate authority has expired\.$" with assert_command_error(msg), assert_create_cert_signals(False, False): - sign_cert(root, rfc4514_subject, stdin=csr) + sign_cert(root, subject, stdin=csr) def test_add_any_policy(root: CertificateAuthority) -> None: """Test adding the anyPolicy, which is an error for end-entity certificates.""" cmdline = [ "sign_cert", - "--subject=/CN=example.com", + "--subject=CN=example.com", f"--ca={root.serial}", "--policy-identifier=anyPolicy", ] @@ -760,11 +756,11 @@ def test_add_any_policy(root: CertificateAuthority) -> None: assert "anyPolicy is not allowed in this context." in actual_stderr.getvalue() -def test_model_validation_error(root: CertificateAuthority, rfc4514_subject: str) -> None: +def test_model_validation_error(root: CertificateAuthority, subject: x509.Name) -> None: """Test model validation is tested properly. NOTE: This example is contrived for the default backend, as the type of the password would already be checked by argparse. Other backends however might have other validation mechanisms. """ with assert_command_error(r"^password: Input should be a valid bytes$"): - sign_cert(root, rfc4514_subject, password=123) + sign_cert(root, subject, password=123) diff --git a/ca/django_ca/tests/test_verification.py b/ca/django_ca/tests/test_verification.py index 555effa09..20fa6b0a2 100644 --- a/ca/django_ca/tests/test_verification.py +++ b/ca/django_ca/tests/test_verification.py @@ -38,6 +38,7 @@ from django_ca.tests.base.constants import CERT_DATA from django_ca.tests.base.utils import ( cmd, + cn, crl_distribution_points, distribution_point, override_tmpcadir, @@ -119,7 +120,7 @@ def dumped(*certificates: X509CertMixin) -> Iterator[list[str]]: def sign_cert(ca: CertificateAuthority, hostname: str = "example.com", **kwargs: Any) -> Iterator[str]: """Create a signed certificate in a temporary directory.""" stdin = CERT_DATA["root-cert"]["csr"]["parsed"].public_bytes(Encoding.PEM) - subject = f"CN={hostname}" + subject = x509.Name([cn(hostname)]) with tempfile.TemporaryDirectory() as tempdir: out_path = os.path.join(tempdir, f"{hostname}.pem")