Skip to content

Commit

Permalink
store certificate policies data in new Pydantic format
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Dec 29, 2023
1 parent ab85a64 commit f75e1fe
Show file tree
Hide file tree
Showing 5 changed files with 455 additions and 83 deletions.
20 changes: 20 additions & 0 deletions ca/django_ca/migrations/0038_auto_20231228_1932.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 5.0 on 2023-12-28 18:32

from django.db import migrations


def update_sign_certificates_schema(apps, schema_editor) -> None:
"""Migrate stored data to new Pydantic-based serialization."""
CertificateAuthority = apps.get_model("django_ca", "CertificateAuthority")
for ca in CertificateAuthority.objects.exclude(sign_certificate_policies=None):
ca.save()


class Migration(migrations.Migration):
dependencies = [
("django_ca", "0037_alter_certificateauthority_name_and_more"),
]

operations = [
migrations.RunPython(update_sign_certificates_schema, reverse_code=migrations.RunPython.noop)
]
152 changes: 129 additions & 23 deletions ca/django_ca/modelfields.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import abc
import typing
from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union

from pydantic import ValidationError as PydanticValidationError

from cryptography import x509
from cryptography.hazmat.primitives.serialization import Encoding
Expand All @@ -29,9 +31,16 @@
from django.utils.translation import gettext_lazy as _

from django_ca import constants
from django_ca.extensions import parse_extension, serialize_extension
from django_ca.fields import CertificateSigningRequestField as CertificateSigningRequestFormField
from django_ca.typehints import JSON, ExtensionTypeTypeVar, SerializedExtension
from django_ca.pydantic.extensions import CertificatePoliciesModel, ExtensionModelTypeVar
from django_ca.typehints import (
JSON,
ExtensionTypeTypeVar,
SerializedNoticeReference,
SerializedPolicyInformation,
SerializedPydanticExtension,
SerializedUserNotice,
)

DecodableCertificate = Union[str, bytes, x509.Certificate]
DecodableCertificateSigningRequest = Union[str, bytes, x509.CertificateSigningRequest]
Expand Down Expand Up @@ -269,7 +278,7 @@ class CertificateField(LazyBinaryField[DecodableCertificate, LazyCertificate]):
wrapper = LazyCertificate


class ExtensionField(models.JSONField, typing.Generic[ExtensionTypeTypeVar]):
class ExtensionField(models.JSONField, typing.Generic[ExtensionTypeTypeVar, ExtensionModelTypeVar]):
"""Base class for fields storing a `x509.Extension` class.
Since the docs are a bit confusing, here is how the methods are called in some scenarios
Expand All @@ -286,6 +295,7 @@ class ExtensionField(models.JSONField, typing.Generic[ExtensionTypeTypeVar]):
"""

extension_class: Type[ExtensionTypeTypeVar]
model_class: Type[ExtensionModelTypeVar]
default_error_messages = { # noqa: RUF012 # defined in base class, cannot be overwritten
"unparsable-extension": _("The value cannot be parsed to an extension."),
"invalid-type": _("%(value)s: Not a cryptography.x509.Extension class."),
Expand All @@ -302,14 +312,24 @@ def __get__( # type: ignore[override]
def __set__(
self,
instance: Any,
value: Optional[Union[x509.Extension[x509.ExtensionType], SerializedExtension]],
value: Optional[
Union[
x509.Extension[ExtensionTypeTypeVar], ExtensionModelTypeVar, SerializedPydanticExtension
]
],
) -> None:
...

@property
def extension_key(self) -> str:
"""The extension key for the handled extension."""
return constants.EXTENSION_KEYS[self.extension_class.oid]
def unparsable(self, value: JSON) -> ValidationError:
"""Raise a ValidationError for an unparsable value."""
return ValidationError(
self.error_messages["unparsable-extension"], code="unparsable-extension", params={"value": value}
)

# COVERAGE NOTE: Currently overwritten in the only implementing subclass
def parse_raw_extension(self, value: JSON) -> x509.Extension[ExtensionTypeTypeVar]: # pragma: no cover
"""Give implementing subclasses the opportunity to implement their own parsing."""
raise self.unparsable(value)

def from_db_value(
self, value: Any, expression: Any, connection: Any
Expand All @@ -321,31 +341,49 @@ def from_db_value(
# TYPE NOTE: django-stubs seems to not have the function in the super-class
parsed_json: JSON = super().from_db_value(value, expression, connection) # type: ignore[misc]

return parse_extension(self.extension_key, parsed_json) # type: ignore[return-value,arg-type]
if isinstance(parsed_json, dict) and "type" in parsed_json:
return self.model_class.model_validate(parsed_json, strict=True).cryptography

# The passed value looks like arbitrary data, so we give the implementing subclass the opportunity
# to parse the value. parse_raw_extension() just raises ValidationError in the base class.
return self.parse_raw_extension(parsed_json)

def to_python(self, value: Any) -> Optional[x509.Extension[ExtensionTypeTypeVar]]:
if isinstance(value, x509.Extension):
"""Convert the set value to the correct Python type.
This function is called during full_clean() to convert the value to the expected Python type:
>>> obj.certificate_policies = x509.Extension(...)
>>> obj.full_clean() # to_python() is called here
As such the method must handle *any* value gracefully (or raise ValidationError) and return a correct
x509.Extension instance.
"""
if isinstance(value, x509.Extension) and isinstance(value.value, self.extension_class):
return value
if isinstance(value, self.model_class):
return value.cryptography

# COVERAGE NOTE: Despite extensive tests, this method never seems to be called with `value=None`. The
# docs however strongly recommend that we handle this case, hence the block below.
if value is None: # pragma: no cover
return value

try:
return parse_extension(self.extension_key, value) # type: ignore
except Exception as ex:
raise ValidationError(
self.error_messages["unparsable-extension"],
code="unparsable-extension",
params={"value": value},
) from ex
if isinstance(value, dict) and "type" in value:
try:
return self.model_class.model_validate(value, strict=True).cryptography
except PydanticValidationError as ex:
raise self.unparsable(value) from ex

# The passed value looks like arbitrary data, so we give the implementing subclass the opportunity
# to parse the value. parse_raw_extension() just raises ValidationError in the base class.
return self.parse_raw_extension(value)

def get_prep_value(self, value: Any) -> Optional[SerializedExtension]:
def get_prep_value(self, value: Any) -> Optional[SerializedPydanticExtension]:
"""Prepare the value so that it can be stored in the database.
This function is invoked during ``save()``. `value` may be the cryptography extension value (in
particular, if ``full_clean()`` was called before) or the serialized extension.
particular, if ``full_clean()`` -> ``to_python()`` was called before) or the serialized extension.
"""
if value is None: # pragma: no cover # this happens during migrations
return value
Expand All @@ -356,6 +394,9 @@ def get_prep_value(self, value: Any) -> Optional[SerializedExtension]:

return value # type: ignore[return-value]

if isinstance(value, self.model_class):
return value.model_dump(mode="json")

if not isinstance(value, x509.Extension):
raise ValidationError(
self.error_messages["invalid-type"],
Expand All @@ -369,7 +410,7 @@ def get_prep_value(self, value: Any) -> Optional[SerializedExtension]:
params={"extension_class": self.extension_class.__name__},
)

return serialize_extension(value)
return self.model_class.model_validate(value).model_dump(mode="json")

def validate(self, value: x509.Extension[ExtensionTypeTypeVar], model_instance: Any) -> None:
"""Handle field-specific validation.
Expand All @@ -394,8 +435,73 @@ def validate(self, value: x509.Extension[ExtensionTypeTypeVar], model_instance:
)


class CertificatePoliciesField(ExtensionField[x509.CertificatePolicies]):
class CertificatePoliciesField(ExtensionField[x509.CertificatePolicies, CertificatePoliciesModel]):
"""Field storing a :py:class:`~cg:cryptography.x509.CertificatePolicies`-based extension."""

description = _("A Certificate Policies extension object.")
extension_class = x509.CertificatePolicies
model_class = CertificatePoliciesModel

def _parse_notice_reference(
self, value: Optional[SerializedNoticeReference]
) -> Optional[x509.NoticeReference]:
if not value:
return None

return x509.NoticeReference(
organization=value.get("organization"), notice_numbers=value["notice_numbers"]
)

def _parse_user_notice(self, value: SerializedUserNotice) -> x509.UserNotice:
notice_reference = self._parse_notice_reference(value.get("notice_reference"))
return x509.UserNotice(notice_reference=notice_reference, explicit_text=value.get("explicit_text"))

def _parse_policy_qualifiers(
self, value: Optional[List[Union[str, SerializedUserNotice]]]
) -> Optional[List[Union[str, x509.UserNotice]]]:
if value is None:
return None

qualifiers: List[Union[str, x509.UserNotice]] = []

for qual in value:
if isinstance(qual, str):
qualifiers.append(qual)
else:
qualifiers.append(self._parse_user_notice(qual))
return qualifiers

def _parse_certificate_policies(
self, value: List[SerializedPolicyInformation]
) -> x509.CertificatePolicies:
policies: List[x509.PolicyInformation] = []
for pol in value:
identifier = x509.ObjectIdentifier(pol["policy_identifier"])
qualifiers = self._parse_policy_qualifiers(pol.get("policy_qualifiers"))

policies.append(
x509.PolicyInformation(policy_identifier=identifier, policy_qualifiers=qualifiers)
)

return x509.CertificatePolicies(policies)

def parse_raw_extension(self, value: JSON) -> x509.Extension[x509.CertificatePolicies]:
oid = self.extension_class.oid
if not isinstance(value, dict):
raise self.unparsable(value)

critical = value.get("critical", constants.EXTENSION_DEFAULT_CRITICAL[oid])
if not isinstance(critical, bool):
raise self.unparsable(value)

serialized_certificate_policies: List[SerializedPolicyInformation] = typing.cast(
List[SerializedPolicyInformation], value.get("value")
)
if not isinstance(serialized_certificate_policies, list):
raise self.unparsable(value)

try:
parsed = self._parse_certificate_policies(serialized_certificate_policies)
except Exception as ex:
raise self.unparsable(value) from ex
return x509.Extension(oid=oid, critical=critical, value=parsed)
110 changes: 110 additions & 0 deletions ca/django_ca/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import coverage

from cryptography import x509
from cryptography.x509.oid import CertificatePoliciesOID, ExtensionOID

import pytest
from _pytest.config import Config as PytestConfig
Expand Down Expand Up @@ -202,3 +203,112 @@ def precertificate_signed_certificate_timestamps_pub(request: "SubRequest") -> I
name = request.param.replace("-", "_")

yield request.getfixturevalue(f"{name}_pub")


@pytest.fixture(
params=(
[x509.PolicyInformation(policy_identifier=CertificatePoliciesOID.ANY_POLICY, policy_qualifiers=None)],
[
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY, policy_qualifiers=["example"]
)
],
[
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY,
policy_qualifiers=[x509.UserNotice(notice_reference=None, explicit_text=None)],
)
],
[
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY,
policy_qualifiers=[x509.UserNotice(notice_reference=None, explicit_text="explicit text")],
)
],
[
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY,
policy_qualifiers=[
x509.UserNotice(
notice_reference=x509.NoticeReference(organization=None, notice_numbers=[]),
explicit_text="explicit",
)
],
)
],
[ # notice reference with org, but still empty notice numbers
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY,
policy_qualifiers=[
x509.UserNotice(
notice_reference=x509.NoticeReference(organization="MyOrg", notice_numbers=[]),
explicit_text="explicit",
)
],
)
],
[
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY,
policy_qualifiers=[
x509.UserNotice(
notice_reference=x509.NoticeReference(organization="MyOrg", notice_numbers=[1, 2, 3]),
explicit_text="explicit",
)
],
)
],
[ # test multiple qualifiers
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY,
policy_qualifiers=["simple qualifier 1", "simple_qualifier 2"],
)
],
[ # test multiple complex qualifiers
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY,
policy_qualifiers=[
"simple qualifier 1",
x509.UserNotice(
notice_reference=x509.NoticeReference(organization="MyOrg 2", notice_numbers=[2, 4]),
explicit_text="explicit 2",
),
"simple qualifier 3",
x509.UserNotice(
notice_reference=x509.NoticeReference(organization="MyOrg 4", notice_numbers=[]),
explicit_text="explicit 4",
),
],
)
],
[ # test multiple policy information
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY,
policy_qualifiers=["simple qualifier 1", "simple_qualifier 2"],
),
x509.PolicyInformation(
policy_identifier=CertificatePoliciesOID.ANY_POLICY,
policy_qualifiers=[
"simple qualifier 1",
x509.UserNotice(
notice_reference=x509.NoticeReference(organization="MyOrg 2", notice_numbers=[2, 4]),
explicit_text="explicit 2",
),
],
),
],
)
)
def certificate_policies_value(request: "SubRequest") -> Iterator[x509.CertificatePolicies]:
"""Parametrized fixture with many different x509.CertificatePolicies objects."""
yield x509.CertificatePolicies(policies=request.param)


@pytest.fixture(params=(True, False))
def certificate_policies(
request: "SubRequest", certificate_policies_value: x509.CertificatePolicies
) -> Iterator[x509.Extension[x509.CertificatePolicies]]:
"""Parametrized fixture yielding different x509.Extension[x509.CertificatePolicies] objects."""
yield x509.Extension(
critical=request.param, oid=ExtensionOID.CERTIFICATE_POLICIES, value=certificate_policies_value
)
2 changes: 0 additions & 2 deletions ca/django_ca/tests/pydantic/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import pytest

from django_ca.pydantic.base import CryptographyModel
from django_ca.pydantic.extensions import ExtensionModel

CryptographyModelTypeVar = TypeVar("CryptographyModelTypeVar", bound=CryptographyModel[Any])
ExtensionModelTypeVar = TypeVar("ExtensionModelTypeVar", bound=ExtensionModel[Any])
ExpectedErrors = List[Tuple[str, Tuple[str, ...], Union[str, "re.Pattern[str]"]]]


Expand Down
Loading

0 comments on commit f75e1fe

Please sign in to comment.