Skip to content

Commit

Permalink
Add ability to use the API on an individual basis
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Oct 28, 2023
1 parent dc928a5 commit da9f35a
Show file tree
Hide file tree
Showing 27 changed files with 374 additions and 94 deletions.
8 changes: 7 additions & 1 deletion ca/django_ca/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,15 +425,18 @@ def get_fieldsets( # type: ignore[override]
if obj is None: # pragma: no cover # we never add certificate authorities, so it's never None
return fieldsets

# Mark certificate policies as read-only if the configured extension is to complex for the widget.
sign_certificate_policies = obj.sign_certificate_policies
if sign_certificate_policies and not certificate_policies_is_simple(sign_certificate_policies.value):
detail_fields = fieldsets[1][1]["fields"]
sign_certificate_policies_index = detail_fields.index("sign_certificate_policies")
detail_fields[sign_certificate_policies_index] = "sign_certificate_policies_readonly"

api_index = 1
if ca_settings.CA_ENABLE_ACME:
api_index = 2
fieldsets.insert(
2,
1,
(
_("ACME"),
{
Expand All @@ -447,6 +450,9 @@ def get_fieldsets( # type: ignore[override]
),
)

if ca_settings.CA_ENABLE_REST_API:
fieldsets.insert(api_index, (_("API"), {"fields": ["api_enabled"]}))

return fieldsets

def get_readonly_fields( # type: ignore[override]
Expand Down
6 changes: 4 additions & 2 deletions ca/django_ca/api/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def list_certificate_authorities(
request: WSGIRequest, filters: CertificateAuthorityFilterSchema = Query(...)
) -> CertificateAuthorityQuerySet:
"""Retrieve a list of currently usable certificate authorities."""
qs = CertificateAuthority.objects.enabled()
qs = CertificateAuthority.objects.enabled().exclude(api_enabled=False)
if filters.expired is False:
qs = qs.valid()
return qs
Expand Down Expand Up @@ -148,7 +148,9 @@ def sign_certificate(request: WSGIRequest, serial: str, data: SignCertificateSch
)
def get_certificate_order(request: WSGIRequest, serial: str, slug: str) -> CertificateOrder:
"""Retrieve information about the certificate order identified by `slug`."""
return CertificateOrder.objects.get(certificate_authority__serial=serial, slug=slug)
return CertificateOrder.objects.get(
certificate_authority__serial=serial, certificate_authority__api_enabled=True, slug=slug
)


@api.get(
Expand Down
2 changes: 1 addition & 1 deletion ca/django_ca/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

def get_certificate_authority(serial: str, expired: bool = False) -> CertificateAuthority:
"""Get a certificate authority from the given serial."""
qs = CertificateAuthority.objects.enabled()
qs = CertificateAuthority.objects.enabled().exclude(api_enabled=False)
if expired is False:
qs = qs.valid()

Expand Down
7 changes: 6 additions & 1 deletion ca/django_ca/management/commands/edit_ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def add_arguments(self, parser: CommandParser) -> None:
self.add_ca(parser, "ca", allow_disabled=True)
self.add_acme_group(parser)
self.add_ocsp_group(parser)
self.add_rest_api_group(parser)
self.add_ca_args(parser)

group = parser.add_mutually_exclusive_group()
Expand Down Expand Up @@ -98,7 +99,7 @@ def handle( # pylint: disable=too-many-arguments
ca.terms_of_service = options["tos"]

# Set ACME options
if ca_settings.CA_ENABLE_ACME: # pragma: no branch; never False because parser throws error already
if ca_settings.CA_ENABLE_ACME: # pragma: no branch; never False b/c parser throws error already
for param in ["acme_enabled", "acme_registration", "acme_requires_contact"]:
if options[param] is not None:
setattr(ca, param, options[param])
Expand All @@ -108,6 +109,10 @@ def handle( # pylint: disable=too-many-arguments
raise CommandError(f"{acme_profile}: Profile is not defined.")
ca.acme_profile = acme_profile

if ca_settings.CA_ENABLE_REST_API: # pragma: no branch; never False b/c parser throws error already
if (api_enabled := options.get("api_enabled")) is not None:
ca.api_enabled = api_enabled

# Set OCSP responder options
if ocsp_responder_key_validity is not None:
ca.ocsp_responder_key_validity = ocsp_responder_key_validity
Expand Down
6 changes: 6 additions & 0 deletions ca/django_ca/management/commands/import_ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def add_arguments(self, parser: CommandParser) -> None:

self.add_acme_group(parser)
self.add_ocsp_group(parser)
self.add_rest_api_group(parser)
self.add_ca_args(parser)

parser.add_argument("name", help="Human-readable name of the CA")
Expand Down Expand Up @@ -150,6 +151,11 @@ def handle( # pylint: disable=too-many-locals,too-many-arguments
if acme_profile := options["acme_profile"]:
ca.acme_profile = acme_profile

# Set API options
if ca_settings.CA_ENABLE_REST_API: # pragma: no branch; never False b/c parser throws error already
if (api_enabled := options.get("api_enabled")) is not None:
ca.api_enabled = api_enabled

# load public key
try:
pem_loaded = x509.load_pem_x509_certificate(pem_data)
Expand Down
7 changes: 6 additions & 1 deletion ca/django_ca/management/commands/init_ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def add_arguments(self, parser: CommandParser) -> None:

self.add_acme_group(parser)
self.add_ocsp_group(parser)
self.add_rest_api_group(parser)

self.add_authority_information_access_group(parser, ("--ca-ocsp-url",), ("--ca-issuer-url",))
self.add_basic_constraints_group(parser)
Expand Down Expand Up @@ -445,7 +446,7 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branc
if options[opt] is not None:
kwargs[opt] = options[opt]

if ca_settings.CA_ENABLE_ACME: # pragma: no branch; never False because parser throws error already
if ca_settings.CA_ENABLE_ACME: # pragma: no branch; never False b/c parser throws error already
# These settings are only there if ACME is enabled
for opt in ["acme_enabled", "acme_registration", "acme_requires_contact"]:
if options[opt] is not None:
Expand All @@ -456,6 +457,10 @@ def handle( # pylint: disable=too-many-arguments,too-many-locals,too-many-branc
raise CommandError(f"{acme_profile}: Profile is not defined.")
kwargs["acme_profile"] = acme_profile

if ca_settings.CA_ENABLE_REST_API: # pragma: no branch; never False b/c parser throws error already
if (api_enabled := options.get("api_enabled")) is not None:
kwargs["api_enabled"] = api_enabled

try:
ca = CertificateAuthority.objects.init(
name=name,
Expand Down
20 changes: 19 additions & 1 deletion ca/django_ca/management/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def add_general_args(self, parser: CommandParser, default: Optional[str] = "") -
return group

def add_acme_group(self, parser: CommandParser) -> None:
"""Add arguments for ACMEv2."""
"""Add arguments for ACMEv2 (if enabled)."""
if not ca_settings.CA_ENABLE_ACME:
return

Expand Down Expand Up @@ -279,6 +279,24 @@ def add_ocsp_group(self, parser: CommandParser) -> None:
help="How long (*in seconds*) OCSP responses are valid (default: 86400).",
)

def add_rest_api_group(self, parser: CommandParser) -> None:
"""Add arguments for the REST API (if enabled)."""
if not ca_settings.CA_ENABLE_REST_API:
return

group = parser.add_argument_group("API Access")
enable_group = group.add_mutually_exclusive_group()
enable_group.add_argument(
"--api-enable",
dest="api_enabled",
action="store_true",
default=None,
help="Enable API support.",
)
enable_group.add_argument(
"--api-disable", dest="api_enabled", action="store_false", help="Disable API support."
)

def add_ca_args(self, parser: ActionsContainer) -> None:
"""Add CA arguments."""
group = parser.add_argument_group(
Expand Down
6 changes: 6 additions & 0 deletions ca/django_ca/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ def init(
sign_certificate_policies: Optional[x509.Extension[x509.CertificatePolicies]] = None,
ocsp_responder_key_validity: Optional[int] = None,
ocsp_response_validity: Optional[int] = None,
api_enabled: Optional[bool] = None,
) -> "CertificateAuthority":
"""Create a new certificate authority.
Expand Down Expand Up @@ -340,6 +341,8 @@ def init(
How long (in days) OCSP responder keys should be valid.
ocsp_response_validity : int, optional
How long (in seconds) OCSP responses should be valid.
api_enabled : bool, optional
If the REST API shall be enabled.
Raises
------
Expand Down Expand Up @@ -456,6 +459,7 @@ def init(
sign_certificate_policies=sign_certificate_policies,
ocsp_responder_key_validity=ocsp_responder_key_validity,
ocsp_response_validity=ocsp_response_validity,
api_enabled=api_enabled,
)

private_key = generate_private_key(key_size, key_type, elliptic_curve)
Expand Down Expand Up @@ -516,6 +520,8 @@ def init(
ca.ocsp_responder_key_validity = ocsp_responder_key_validity
if ocsp_response_validity is not None:
ca.ocsp_response_validity = ocsp_response_validity
if api_enabled is not None:
ca.api_enabled = api_enabled

ca.update_certificate(certificate)

Expand Down
21 changes: 21 additions & 0 deletions ca/django_ca/migrations/0036_certificateauthority_api_enabled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Generated by Django 4.2.6 on 2023-10-28 08:15

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("django_ca", "0035_certificateorder"),
]

operations = [
migrations.AddField(
model_name="certificateauthority",
name="api_enabled",
field=models.BooleanField(
default=False,
help_text="Whether it is possible to use the API for this CA.",
verbose_name="Enable API",
),
),
]
7 changes: 7 additions & 0 deletions ca/django_ca/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,13 @@ class CertificateAuthority(X509CertMixin):
)
# CAA record and website are general fields

# API fields
api_enabled = models.BooleanField(
default=False,
verbose_name=_("Enable API"),
help_text=_("Whether it is possible to use the API for this CA."),
)

_key = None

def key(self, password: Optional[Union[str, bytes]] = None) -> CertificateIssuerPrivateKeyTypes:
Expand Down
7 changes: 6 additions & 1 deletion ca/django_ca/tests/admin/test_admin_ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@ class CertificateAuthorityAdminViewTestCase(StandardAdminViewTestCaseMixin[Certi
)

@override_settings(CA_ENABLE_ACME=False)
def test_change_view_with_acme(self) -> None:
def test_change_view_without_acme(self) -> None:
"""Basic tests but with ACME support disabled."""
self.test_change_view()

@override_settings(CA_ENABLE_REST_API=False)
def test_change_view_without_api(self) -> None:
"""Basic tests but with API support disabled."""
self.test_change_view()

def test_complex_sign_certificate_policies(self) -> None:
"""Test that complex Certificate Policy extensions are read-only."""
ca = self.cas["root"]
Expand Down
32 changes: 30 additions & 2 deletions ca/django_ca/tests/api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
ListResponse = List[DetailResponse]


@pytest.fixture()
@pytest.fixture
def api_user(user: User, api_permission: Tuple[Type[Model], str]) -> User:
"""Extend user fixture to add required permission."""
content_type = ContentType.objects.get_for_model(api_permission[0])
Expand All @@ -42,14 +42,22 @@ def api_user(user: User, api_permission: Tuple[Type[Model], str]) -> User:
return user


@pytest.fixture()
@pytest.fixture
def api_client(client: Client, api_user: User) -> Client:
"""HTTP client with HTTP basic authentication for the user."""
credentials = base64.b64encode(api_user.username.encode("utf-8") + b":password").decode()
client.defaults["HTTP_AUTHORIZATION"] = "Basic " + credentials
return client


@pytest.fixture
def root(root: CertificateAuthority) -> CertificateAuthority:
"""Extend root fixture to enable API access."""
root.api_enabled = True
root.save()
return root


@pytest.fixture
def root_response(root: CertificateAuthority) -> DetailResponse:
"""Fixture for the expected response schema for the root CA."""
Expand Down Expand Up @@ -104,6 +112,8 @@ class APIPermissionTestBase:
"""Base class for testing permission handling in API views."""

path: str
expected_disabled_status_code = HTTPStatus.NOT_FOUND
expected_disabled_response: Any = {"detail": "Not Found"}

def request(self, client: Client) -> HttpResponse:
"""Make a default request to the view under test (non-GET requests must override this)."""
Expand Down Expand Up @@ -131,3 +141,21 @@ def test_user_with_no_permissions(self, user: User, api_client: Client) -> None:
response = self.request(api_client)
assert response.status_code == HTTPStatus.FORBIDDEN, response.content
assert response.json() == {"detail": "Forbidden"}, response.json()

def test_disabled_ca(self, api_client: Client, root: CertificateAuthority) -> None:
"""Test that disabling the API access for the CA really disables it."""
root.enabled = False
root.save()

response = self.request(api_client)
assert response.status_code == self.expected_disabled_status_code, response.content
assert response.json() == self.expected_disabled_response

def test_disabled_api_access(self, api_client: Client, root: CertificateAuthority) -> None:
"""Test that disabling the API access for the CA really disables it."""
root.api_enabled = False
root.save()

response = self.request(api_client)
assert response.status_code == self.expected_disabled_status_code, response.content
assert response.json() == self.expected_disabled_response
16 changes: 5 additions & 11 deletions ca/django_ca/tests/api/test_list_cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ def test_list_view(api_client: Client, expected_response: ListResponse) -> None:
assert response.json() == expected_response, response.json()


@pytest.mark.usefixtures("root")
@freeze_time(TIMESTAMPS["everything_expired"])
def test_expired_certificate_authorities_are_excluded(api_client: Client) -> None:
"""Test that expired CAs are excluded by default."""
response = request(api_client)
assert CertificateAuthority.objects.count() > 0 # just to be sure that there would be some
assert response.status_code == HTTPStatus.OK, response.content
assert response.json() == [], response.json()

Expand All @@ -81,18 +83,10 @@ def test_expired_filter(api_client: Client, expected_response: ListResponse) ->
assert response.json() == expected_response, response.json()


@freeze_time(TIMESTAMPS["everything_valid"])
def test_disabled_ca(api_client: Client, root: CertificateAuthority) -> None:
"""Test that a disabled CA is *not* included."""
root.enabled = False
root.save()

response = request(api_client)
assert response.status_code == HTTPStatus.OK, response.content
assert response.json() == [], response.json()


class TestPermissions(APIPermissionTestBase):
"""Test permissions for this view."""

path = path

expected_disabled_status_code = HTTPStatus.OK
expected_disabled_response = []
12 changes: 0 additions & 12 deletions ca/django_ca/tests/api/test_sign_cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,18 +507,6 @@ def test_expired_ca(api_client: Client) -> None:
assert response.json() == {"detail": "Not Found"}, response.json()


@pytest.mark.usefixtures("tmpcadir")
@freeze_time(TIMESTAMPS["everything_valid"])
def test_disabled_ca(api_client: Client, root: CertificateAuthority) -> None:
"""Test that you cannot sign a certificate for a disabled CA."""
root.enabled = False
root.save()

response = request(api_client, {"csr": CERT_DATA["root-cert"]["csr"]["pem"], "subject": default_subject})
assert response.status_code == HTTPStatus.NOT_FOUND, response.content
assert response.json() == {"detail": "Not Found"}, response.json()


class TestPermissions(APIPermissionTestBase):
"""Test permissions for this view."""

Expand Down
Loading

0 comments on commit da9f35a

Please sign in to comment.