Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reissue by serial numbers #4960

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions lemur/certificates/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import arrow
import sys
import click
from time import sleep

import arrow
import click
from flask import current_app
from flask_principal import Identity, identity_changed
from sentry_sdk import capture_exception
from sqlalchemy import or_
from sqlalchemy.orm.exc import MultipleResultsFound
from tabulate import tabulate
from time import sleep
from sentry_sdk import capture_exception

from lemur import database
from lemur.authorities.models import Authority
Expand All @@ -35,7 +36,7 @@
list_recent_valid_certs_issued_by_authority,
get_certificates_with_same_cn_with_rotate_on,
identify_and_persist_expiring_deployed_certificates,
send_certificate_expiration_metrics
send_certificate_expiration_metrics, get_by_serial
)
from lemur.certificates.verify import verify_string
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS, CRLReason
Expand All @@ -46,7 +47,6 @@
from lemur.notifications.messaging import send_rotation_notification, send_reissue_no_endpoints_notification, \
send_reissue_failed_notification
from lemur.plugins.base import plugins
from sqlalchemy.orm.exc import MultipleResultsFound


@click.group(name="certificates", help="Handles all certificate related tasks.")
Expand Down Expand Up @@ -96,6 +96,25 @@ def validate_certificate(certificate_name):
return cert


def validate_certificates_by_serial_numbers(serial_numbers):
"""
Ensuring that the specified certificate(s) exist.
:param serial_numbers:
:return:
"""
if serial_numbers:
all_certs = []
for serial_number in serial_numbers:
certs = get_by_serial(serial_number)

if not certs or len(certs) == 0:
click.echo(f"[-] No certificate found with serial number: {serial_number}")
sys.exit(1)
all_certs.extend(certs)

return all_certs


def validate_endpoint(endpoint_name):
"""
Ensuring that the specified endpoint exists.
Expand Down Expand Up @@ -534,6 +553,13 @@ def rotate_region(endpoint_name, new_certificate_name, old_certificate_name, mes
"old_certificate_name",
help="Name of the certificate you wish to reissue.",
)
@click.option(
"-s",
"--serial-numbers",
"serial_numbers",
multiple=True,
help="Serial number(s) of the certificate(s) you wish to reissue.",
)
@click.option(
"-a",
"--notify",
Expand All @@ -550,15 +576,18 @@ def rotate_region(endpoint_name, new_certificate_name, old_certificate_name, mes
default=False,
help="Persist changes.",
)
def reissue_command(old_certificate_name, notify, commit):
reissue(old_certificate_name, notify, commit)
def reissue_command(old_certificate_name, serial_numbers, notify, commit):
jtschladen marked this conversation as resolved.
Show resolved Hide resolved
reissue(old_certificate_name, serial_numbers, notify, commit)


def reissue(old_certificate_name, notify, commit):
def reissue(old_certificate_name, notify, commit, serial_numbers):
"""
Reissues certificate with the same parameters as it was originally issued with.
If not time period is provided, reissues certificate as valid from today to
If no time period is provided, reissues certificate as valid from today to
today + length of original.
Accepts both name and a list of serial numbers; if both are provided, both will be used.
If neither name nor serial numbers are provided, all certs pending reissue will be reissued.
Note that serial numbers must be in decimal format (not hex).
"""
if commit:
click.echo("[!] Running in COMMIT mode.")
Expand All @@ -568,12 +597,24 @@ def reissue(old_certificate_name, notify, commit):
status = FAILURE_METRIC_STATUS

try:
old_cert = validate_certificate(old_certificate_name)
certs_to_reissue = []

if not old_cert:
for certificate in get_all_pending_reissue():
request_reissue(certificate, notify, commit)
else:
old_cert_by_name = validate_certificate(old_certificate_name)
if old_cert_by_name:
certs_to_reissue.append(old_cert_by_name)
click.echo("[+] Reissuing 1 certificate by name")

old_certs_by_serial_numbers = validate_certificates_by_serial_numbers(serial_numbers)
if old_certs_by_serial_numbers:
certs_to_reissue.extend(old_certs_by_serial_numbers)
click.echo(f"[+] Reissuing {len(old_certs_by_serial_numbers)} certificates by serial number")

# if neither name nor serial numbers were specified, reissue all pending reissues
if not certs_to_reissue:
certs_to_reissue = get_all_pending_reissue()
click.echo(f"[+] Reissuing all {len(certs_to_reissue)} pending certificates.")

for old_cert in certs_to_reissue:
request_reissue(old_cert, notify, commit)

status = SUCCESS_METRIC_STATUS
Expand Down
5 changes: 3 additions & 2 deletions lemur/common/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import copy
import sys
import time
from datetime import datetime, timezone, timedelta

from celery import Celery
from celery.app.task import Context
from celery.exceptions import SoftTimeLimitExceeded
from celery.signals import task_failure, task_received, task_revoked, task_success
from datetime import datetime, timezone, timedelta
from flask import current_app
from sentry_sdk import capture_exception

Expand Down Expand Up @@ -590,7 +591,7 @@ def certificate_reissue():
current_app.logger.debug(log_data)
try:
notify = current_app.config.get("ENABLE_REISSUE_NOTIFICATION", None)
cli_certificate.reissue(None, notify, True)
cli_certificate.reissue(None, notify, True, None)
except SoftTimeLimitExceeded:
log_data["message"] = "Certificate reissue: Time limit exceeded."
current_app.logger.error(log_data)
Expand Down
1 change: 1 addition & 0 deletions lemur/tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class CertificateFactory(BaseFactory):
"""Certificate factory."""

name = Sequence(lambda n: f"certificate{n}")
serial = Sequence(lambda n: f"{n}")
chain = INTERMEDIATE_CERT_STR
body = SAN_CERT_STR
private_key = SAN_CERT_KEY
Expand Down
67 changes: 67 additions & 0 deletions lemur/tests/test_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,73 @@ def test_reissue_certificate(
assert new_cert.organization == certificate.organization


def test_reissue_command_by_name(
issuer_plugin, crypto_authority, logged_in_user
):
from lemur.certificates.cli import reissue
from lemur.tests.conf import LEMUR_DEFAULT_ORGANIZATION
from lemur.tests.factories import CertificateFactory

certificate = CertificateFactory(name="to_be_reissued_cert", authority=crypto_authority)

reissue(certificate.name, False, True, None)

new_cert = certificate.replaced[0]
assert new_cert
assert new_cert.key_type == "RSA2048"
assert new_cert.organization != certificate.organization
# Check for default value since authority does not have cab_compliant option set
assert new_cert.organization == LEMUR_DEFAULT_ORGANIZATION
assert new_cert.description.startswith(f"Reissued by Lemur for cert ID {certificate.id}")


def test_reissue_command_by_serial_numbers(
issuer_plugin, crypto_authority, logged_in_user
):
from lemur.certificates.cli import reissue
from lemur.tests.conf import LEMUR_DEFAULT_ORGANIZATION
from lemur.tests.factories import CertificateFactory

cert1 = CertificateFactory(name="to_be_reissued_cert_1", authority=crypto_authority)
cert2 = CertificateFactory(name="to_be_reissued_cert_2", authority=crypto_authority)
cert3 = CertificateFactory(name="to_be_reissued_cert_3", authority=crypto_authority)

reissue(None, False, True, [cert1.serial, cert2.serial, cert3.serial])

for cert in [cert1, cert2, cert3]:
new_cert = cert.replaced[0]
assert new_cert
assert new_cert.key_type == "RSA2048"
assert new_cert.organization != cert.organization
# Check for default value since authority does not have cab_compliant option set
assert new_cert.organization == LEMUR_DEFAULT_ORGANIZATION
assert new_cert.description.startswith(f"Reissued by Lemur for cert ID {cert.id}")


def test_reissue_command_by_name_and_serial_numbers(
issuer_plugin, crypto_authority, logged_in_user
):
from lemur.certificates.cli import reissue
from lemur.tests.conf import LEMUR_DEFAULT_ORGANIZATION
from lemur.tests.factories import CertificateFactory

cert1 = CertificateFactory(name="to_be_reissued_cert_1", authority=crypto_authority)
cert2 = CertificateFactory(name="to_be_reissued_cert_2", authority=crypto_authority)
cert3 = CertificateFactory(name="to_be_reissued_cert_3", authority=crypto_authority)
cert4 = CertificateFactory(name="to_be_reissued_cert_4", authority=crypto_authority)

reissue(cert1.name, False, True, [cert2.serial, cert3.serial, cert4.serial])

for cert in [cert1, cert2, cert3, cert4]:
new_cert = cert.replaced[0]
assert new_cert
assert new_cert.key_type == "RSA2048"
assert new_cert.organization != cert.organization
# Check for default value since authority does not have cab_compliant option set
assert new_cert.organization == LEMUR_DEFAULT_ORGANIZATION
assert new_cert.description.startswith(f"Reissued by Lemur for cert ID {cert.id}")


def test_create_csr():
csr, private_key = create_csr(
owner="joe@example.com",
Expand Down