Skip to content

Commit

Permalink
X509 fixes (#111)
Browse files Browse the repository at this point in the history
* Return proper content type for the x509 certificate

* Remove parenthesis

* Remove extra-variables during the import

* Comment fix

* Remove double returns

* Change log level from trace to debug

* Remove 'pass' and add logging instead

* Remove unnecessary wrapping

Remove wrapping

* PEP 8: line too long

PEP8: line too long

* PEP8: Redefine RSAError variable in except clause

* Do not return None if name was not found

* Do not return None if no matched minions found

* Fix unit tests

Fix for log checking in x509 test

We are logging in debug and not in trace mode here.
  • Loading branch information
agraul authored and meaksh committed Apr 19, 2023
1 parent ef6da7d commit 094b347
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 55 deletions.
2 changes: 2 additions & 0 deletions salt/modules/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ def _publish(
else:
return ret

return {}


def publish(
tgt, fun, arg=None, tgt_type="glob", returner="", timeout=5, via_master=None
Expand Down
93 changes: 45 additions & 48 deletions salt/modules/x509.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,13 @@

try:
import M2Crypto

HAS_M2 = True
except ImportError:
HAS_M2 = False
M2Crypto = None

try:
import OpenSSL

HAS_OPENSSL = True
except ImportError:
HAS_OPENSSL = False
OpenSSL = None

__virtualname__ = "x509"

Expand Down Expand Up @@ -94,15 +91,10 @@ def __virtual__():
# salt.features appears to not be setup when invoked via peer publishing
if __opts__.get("features", {}).get("x509_v2"):
return (False, "Superseded, using x509_v2")
if HAS_M2:
salt.utils.versions.warn_until(
"Potassium",
"The x509 modules are deprecated. Please migrate to the replacement "
"modules (x509_v2). They are the default from Salt 3008 (Argon) onwards.",
)
return __virtualname__
else:
return (False, "Could not load x509 module, m2crypto unavailable")
return (
__virtualname__ if M2Crypto is not None else False,
"Could not load x509 module, m2crypto unavailable",
)


class _Ctx(ctypes.Structure):
Expand Down Expand Up @@ -160,8 +152,8 @@ def _new_extension(name, value, critical=0, issuer=None, _pyfree=1):
x509_ext_ptr = M2Crypto.m2.x509v3_ext_conf(None, ctx, name, value)
lhash = None
except AttributeError:
lhash = M2Crypto.m2.x509v3_lhash()
ctx = M2Crypto.m2.x509v3_set_conf_lhash(lhash)
lhash = M2Crypto.m2.x509v3_lhash() # pylint: disable=no-member
ctx = M2Crypto.m2.x509v3_set_conf_lhash(lhash) # pylint: disable=no-member
# ctx not zeroed
_fix_ctx(ctx, issuer)
x509_ext_ptr = M2Crypto.m2.x509v3_ext_conf(lhash, ctx, name, value)
Expand Down Expand Up @@ -300,7 +292,7 @@ def _get_signing_policy(name):
signing_policy = policies.get(name)
if signing_policy:
return signing_policy
return __salt__["config.get"]("x509_signing_policies", {}).get(name)
return __salt__["config.get"]("x509_signing_policies", {}).get(name) or {}


def _pretty_hex(hex_str):
Expand Down Expand Up @@ -338,9 +330,11 @@ def _text_or_file(input_):
"""
if _isfile(input_):
with salt.utils.files.fopen(input_) as fp_:
return salt.utils.stringutils.to_str(fp_.read())
out = salt.utils.stringutils.to_str(fp_.read())
else:
return salt.utils.stringutils.to_str(input_)
out = salt.utils.stringutils.to_str(input_)

return out


def _parse_subject(subject):
Expand All @@ -359,7 +353,7 @@ def _parse_subject(subject):
ret_list.append((nid_num, nid_name, val))
nids.append(nid_num)
except TypeError as err:
log.trace("Missing attribute '%s'. Error: %s", nid_name, err)
log.debug("Missing attribute '%s'. Error: %s", nid_name, err)
for nid_num, nid_name, val in sorted(ret_list):
ret[nid_name] = val
return ret
Expand Down Expand Up @@ -557,8 +551,8 @@ def get_pem_entries(glob_path):
if os.path.isfile(path):
try:
ret[path] = get_pem_entry(text=path)
except ValueError:
pass
except ValueError as err:
log.debug("Unable to get PEM entries from %s: %s", path, err)

return ret

Expand Down Expand Up @@ -636,8 +630,8 @@ def read_certificates(glob_path):
if os.path.isfile(path):
try:
ret[path] = read_certificate(certificate=path)
except ValueError:
pass
except ValueError as err:
log.debug("Unable to read certificate %s: %s", path, err)

return ret

Expand Down Expand Up @@ -667,10 +661,9 @@ def read_csr(csr):
"Subject": _parse_subject(csr.get_subject()),
"Subject Hash": _dec2hex(csr.get_subject().as_hash()),
"Public Key Hash": hashlib.sha1(csr.get_pubkey().get_modulus()).hexdigest(),
"X509v3 Extensions": _get_csr_extensions(csr),
}

ret["X509v3 Extensions"] = _get_csr_extensions(csr)

return ret


Expand Down Expand Up @@ -980,7 +973,7 @@ def create_crl(
# pyOpenSSL Note due to current limitations in pyOpenSSL it is impossible
# to specify a digest For signing the CRL. This will hopefully be fixed
# soon: https://github.com/pyca/pyopenssl/pull/161
if not HAS_OPENSSL:
if OpenSSL is None:
raise salt.exceptions.SaltInvocationError(
"Could not load OpenSSL module, OpenSSL unavailable"
)
Expand Down Expand Up @@ -1131,6 +1124,7 @@ def get_signing_policy(signing_policy_name):
signing_policy = _get_signing_policy(signing_policy_name)
if not signing_policy:
return "Signing policy {} does not exist.".format(signing_policy_name)

if isinstance(signing_policy, list):
dict_ = {}
for item in signing_policy:
Expand All @@ -1147,7 +1141,7 @@ def get_signing_policy(signing_policy_name):
signing_policy["signing_cert"], "CERTIFICATE"
)
except KeyError:
pass
log.debug('Unable to get "certificate" PEM entry')

return signing_policy

Expand Down Expand Up @@ -1782,7 +1776,8 @@ def create_csr(path=None, text=False, **kwargs):
)
)

for entry in sorted(subject.nid):
# pylint: disable=unused-variable
for entry, num in subject.nid.items():
if entry in kwargs:
setattr(subject, entry, kwargs[entry])

Expand Down Expand Up @@ -1818,18 +1813,18 @@ def create_csr(path=None, text=False, **kwargs):
extstack.push(ext)

csr.add_extensions(extstack)

csr.sign(
_get_private_key_obj(
kwargs["private_key"], passphrase=kwargs["private_key_passphrase"]
),
kwargs["algorithm"],
)

if path:
return write_pem(text=csr.as_pem(), path=path, pem_type="CERTIFICATE REQUEST")
else:
return csr.as_pem()
return (
write_pem(text=csr.as_pem(), path=path, pem_type="CERTIFICATE REQUEST")
if path
else csr.as_pem()
)


def verify_private_key(private_key, public_key, passphrase=None):
Expand All @@ -1854,7 +1849,7 @@ def verify_private_key(private_key, public_key, passphrase=None):
salt '*' x509.verify_private_key private_key=/etc/pki/myca.key \\
public_key=/etc/pki/myca.crt
"""
return bool(get_public_key(private_key, passphrase) == get_public_key(public_key))
return get_public_key(private_key, passphrase) == get_public_key(public_key)


def verify_signature(
Expand Down Expand Up @@ -1910,7 +1905,10 @@ def verify_crl(crl, cert):
salt '*' x509.verify_crl crl=/etc/pki/myca.crl cert=/etc/pki/myca.crt
"""
if not salt.utils.path.which("openssl"):
raise salt.exceptions.SaltInvocationError("openssl binary not found in path")
raise salt.exceptions.SaltInvocationError(
'External command "openssl" not found'
)

crltext = _text_or_file(crl)
crltext = get_pem_entry(crltext, pem_type="X509 CRL")
crltempfile = tempfile.NamedTemporaryFile(delete=True)
Expand Down Expand Up @@ -1970,8 +1968,9 @@ def expired(certificate):
ret["expired"] = True
else:
ret["expired"] = False
except ValueError:
pass
except ValueError as err:
log.debug("Failed to get data of expired certificate: %s", err)
log.trace(err, exc_info=True)

return ret

Expand All @@ -1994,6 +1993,7 @@ def will_expire(certificate, days):
salt '*' x509.will_expire "/etc/pki/mycert.crt" days=30
"""
ts_pt = "%Y-%m-%d %H:%M:%S"
ret = {}

if os.path.isfile(certificate):
Expand All @@ -2007,14 +2007,11 @@ def will_expire(certificate, days):
_expiration_date = cert.get_not_after().get_datetime()

ret["cn"] = _parse_subject(cert.get_subject())["CN"]

if _expiration_date.strftime("%Y-%m-%d %H:%M:%S") <= _check_time.strftime(
"%Y-%m-%d %H:%M:%S"
):
ret["will_expire"] = True
else:
ret["will_expire"] = False
except ValueError:
pass
ret["will_expire"] = _expiration_date.strftime(
ts_pt
) <= _check_time.strftime(ts_pt)
except ValueError as err:
log.debug("Unable to return details of a sertificate expiration: %s", err)
log.trace(err, exc_info=True)

return ret
74 changes: 70 additions & 4 deletions salt/states/x509.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,12 @@
import salt.exceptions
import salt.utils.versions
from salt.features import features
import salt.utils.stringutils

try:
from M2Crypto.RSA import RSAError
except ImportError:
pass
RSAError = Exception("RSA Error")

log = logging.getLogger(__name__)

Expand All @@ -215,7 +216,7 @@ def __virtual__():
)
return "x509"
else:
return (False, "Could not load x509 state: m2crypto unavailable")
return False, "Could not load x509 state: the x509 is not available"


def _revoked_to_list(revs):
Expand Down Expand Up @@ -704,7 +705,70 @@ def certificate_managed(name, days_remaining=90, append_certs=None, **kwargs):
"Old": invalid_reason,
"New": "Certificate will be valid and up to date",
}
return ret
private_key_args.update(managed_private_key)
kwargs["public_key_passphrase"] = private_key_args["passphrase"]

if private_key_args["new"]:
rotate_private_key = True
private_key_args["new"] = False

if _check_private_key(
private_key_args["name"],
bits=private_key_args["bits"],
passphrase=private_key_args["passphrase"],
new=private_key_args["new"],
overwrite=private_key_args["overwrite"],
):
private_key = __salt__["x509.get_pem_entry"](
private_key_args["name"], pem_type="RSA PRIVATE KEY"
)
else:
new_private_key = True
private_key = __salt__["x509.create_private_key"](
text=True,
bits=private_key_args["bits"],
passphrase=private_key_args["passphrase"],
cipher=private_key_args["cipher"],
verbose=private_key_args["verbose"],
)

kwargs["public_key"] = private_key

current_days_remaining = 0
current_comp = {}

if os.path.isfile(name):
try:
current = __salt__["x509.read_certificate"](certificate=name)
current_comp = copy.deepcopy(current)
if "serial_number" not in kwargs:
current_comp.pop("Serial Number")
if "signing_cert" not in kwargs:
try:
current_comp["X509v3 Extensions"][
"authorityKeyIdentifier"
] = re.sub(
r"serial:([0-9A-F]{2}:)*[0-9A-F]{2}",
"serial:--",
current_comp["X509v3 Extensions"]["authorityKeyIdentifier"],
)
except KeyError:
pass
current_comp.pop("Not Before")
current_comp.pop("MD5 Finger Print")
current_comp.pop("SHA1 Finger Print")
current_comp.pop("SHA-256 Finger Print")
current_notafter = current_comp.pop("Not After")
current_days_remaining = (
datetime.datetime.strptime(current_notafter, "%Y-%m-%d %H:%M:%S")
- datetime.datetime.now()
).days
if days_remaining == 0:
days_remaining = current_days_remaining - 1
except salt.exceptions.SaltInvocationError:
current = "{} is not a valid Certificate.".format(name)
else:
current = "{} does not exist.".format(name)

contents = __salt__["x509.create_certificate"](text=True, **kwargs)
# Check the module actually returned a cert and not an error message as a string
Expand Down Expand Up @@ -900,6 +964,8 @@ def pem_managed(name, text, backup=False, **kwargs):
Any arguments supported by :py:func:`file.managed <salt.states.file.managed>` are supported.
"""
file_args, kwargs = _get_file_args(name, **kwargs)
file_args["contents"] = __salt__["x509.get_pem_entry"](text=text)
file_args["contents"] = salt.utils.stringutils.to_str(
__salt__["x509.get_pem_entry"](text=text)
)

return __states__["file.managed"](**file_args)
6 changes: 3 additions & 3 deletions tests/unit/modules/test_x509.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ def __getattr__(self, item):

subj = FakeSubject()
x509._parse_subject(subj)
assert x509.log.trace.call_args[0][0] == "Missing attribute '%s'. Error: %s"
assert x509.log.trace.call_args[0][1] == list(subj.nid.keys())[0]
assert isinstance(x509.log.trace.call_args[0][2], TypeError)
assert x509.log.debug.call_args[0][0] == "Missing attribute '%s'. Error: %s"
assert x509.log.debug.call_args[0][1] == list(subj.nid.keys())[0]
assert isinstance(x509.log.debug.call_args[0][2], TypeError)

@pytest.mark.skipif(
not HAS_M2CRYPTO, reason="Skipping, reason=M2Crypto is unavailable"
Expand Down

0 comments on commit 094b347

Please sign in to comment.