From 1b78ceabaf3885e7d90d13a041685e7dda960ac9 Mon Sep 17 00:00:00 2001 From: Alexander Graul Date: Tue, 18 Jan 2022 16:38:17 +0100 Subject: [PATCH] X509 fixes (#111) * Return proper content type for the x509 certificate * Remove parenthesis * Remove extra-variables during the import * Comment fix * Remove double returns * Change log level from trace to debug * Remove 'pass' and add logging instead * Remove unnecessary wrapping Remove wrapping * PEP 8: line too long PEP8: line too long * PEP8: Redefine RSAError variable in except clause * Do not return None if name was not found * Do not return None if no matched minions found * Fix unit tests Fix for log checking in x509 test We are logging in debug and not in trace mode here. --- salt/modules/publish.py | 2 + salt/modules/x509.py | 93 ++++++++++++++++----------------- salt/states/x509.py | 74 ++++++++++++++++++++++++-- tests/unit/modules/test_x509.py | 6 +-- 4 files changed, 121 insertions(+), 54 deletions(-) diff --git a/salt/modules/publish.py b/salt/modules/publish.py index c2dd62b913..45ee41ef19 100644 --- a/salt/modules/publish.py +++ b/salt/modules/publish.py @@ -199,6 +199,8 @@ def _publish( else: return ret + return {} + def publish( tgt, fun, arg=None, tgt_type="glob", returner="", timeout=5, via_master=None diff --git a/salt/modules/x509.py b/salt/modules/x509.py index 2f9cc5cc44..194116a85c 100644 --- a/salt/modules/x509.py +++ b/salt/modules/x509.py @@ -30,16 +30,13 @@ from salt.utils.odict import OrderedDict try: import M2Crypto - - HAS_M2 = True except ImportError: - HAS_M2 = False + M2Crypto = None + try: import OpenSSL - - HAS_OPENSSL = True except ImportError: - HAS_OPENSSL = False + OpenSSL = None __virtualname__ = "x509" @@ -79,10 +76,10 @@ def __virtual__(): """ only load this module if m2crypto is available """ - if HAS_M2: - return __virtualname__ - else: - return (False, "Could not load x509 module, m2crypto unavailable") + return ( + __virtualname__ if M2Crypto is not None else False, + "Could not load x509 module, m2crypto unavailable", + ) class _Ctx(ctypes.Structure): @@ -140,8 +137,8 @@ def _new_extension(name, value, critical=0, issuer=None, _pyfree=1): x509_ext_ptr = M2Crypto.m2.x509v3_ext_conf(None, ctx, name, value) lhash = None except AttributeError: - lhash = M2Crypto.m2.x509v3_lhash() - ctx = M2Crypto.m2.x509v3_set_conf_lhash(lhash) + lhash = M2Crypto.m2.x509v3_lhash() # pylint: disable=no-member + ctx = M2Crypto.m2.x509v3_set_conf_lhash(lhash) # pylint: disable=no-member # ctx not zeroed _fix_ctx(ctx, issuer) x509_ext_ptr = M2Crypto.m2.x509v3_ext_conf(lhash, ctx, name, value) @@ -280,7 +277,7 @@ def _get_signing_policy(name): signing_policy = policies.get(name) if signing_policy: return signing_policy - return __salt__["config.get"]("x509_signing_policies", {}).get(name) + return __salt__["config.get"]("x509_signing_policies", {}).get(name) or {} def _pretty_hex(hex_str): @@ -318,9 +315,11 @@ def _text_or_file(input_): """ if _isfile(input_): with salt.utils.files.fopen(input_) as fp_: - return salt.utils.stringutils.to_str(fp_.read()) + out = salt.utils.stringutils.to_str(fp_.read()) else: - return salt.utils.stringutils.to_str(input_) + out = salt.utils.stringutils.to_str(input_) + + return out def _parse_subject(subject): @@ -339,7 +338,7 @@ def _parse_subject(subject): ret_list.append((nid_num, nid_name, val)) nids.append(nid_num) except TypeError as err: - log.trace("Missing attribute '%s'. Error: %s", nid_name, err) + log.debug("Missing attribute '%s'. Error: %s", nid_name, err) for nid_num, nid_name, val in sorted(ret_list): ret[nid_name] = val return ret @@ -537,8 +536,8 @@ def get_pem_entries(glob_path): if os.path.isfile(path): try: ret[path] = get_pem_entry(text=path) - except ValueError: - pass + except ValueError as err: + log.debug("Unable to get PEM entries from %s: %s", path, err) return ret @@ -616,8 +615,8 @@ def read_certificates(glob_path): if os.path.isfile(path): try: ret[path] = read_certificate(certificate=path) - except ValueError: - pass + except ValueError as err: + log.debug("Unable to read certificate %s: %s", path, err) return ret @@ -647,10 +646,9 @@ def read_csr(csr): "Subject": _parse_subject(csr.get_subject()), "Subject Hash": _dec2hex(csr.get_subject().as_hash()), "Public Key Hash": hashlib.sha1(csr.get_pubkey().get_modulus()).hexdigest(), + "X509v3 Extensions": _get_csr_extensions(csr), } - ret["X509v3 Extensions"] = _get_csr_extensions(csr) - return ret @@ -960,7 +958,7 @@ def create_crl( # pyOpenSSL Note due to current limitations in pyOpenSSL it is impossible # to specify a digest For signing the CRL. This will hopefully be fixed # soon: https://github.com/pyca/pyopenssl/pull/161 - if not HAS_OPENSSL: + if OpenSSL is None: raise salt.exceptions.SaltInvocationError( "Could not load OpenSSL module, OpenSSL unavailable" ) @@ -1111,6 +1109,7 @@ def get_signing_policy(signing_policy_name): signing_policy = _get_signing_policy(signing_policy_name) if not signing_policy: return "Signing policy {} does not exist.".format(signing_policy_name) + if isinstance(signing_policy, list): dict_ = {} for item in signing_policy: @@ -1127,7 +1126,7 @@ def get_signing_policy(signing_policy_name): signing_policy["signing_cert"], "CERTIFICATE" ) except KeyError: - pass + log.debug('Unable to get "certificate" PEM entry') return signing_policy @@ -1762,7 +1761,8 @@ def create_csr(path=None, text=False, **kwargs): ) ) - for entry in sorted(subject.nid): + # pylint: disable=unused-variable + for entry, num in subject.nid.items(): if entry in kwargs: setattr(subject, entry, kwargs[entry]) @@ -1798,7 +1798,6 @@ def create_csr(path=None, text=False, **kwargs): extstack.push(ext) csr.add_extensions(extstack) - csr.sign( _get_private_key_obj( kwargs["private_key"], passphrase=kwargs["private_key_passphrase"] @@ -1806,10 +1805,11 @@ def create_csr(path=None, text=False, **kwargs): kwargs["algorithm"], ) - if path: - return write_pem(text=csr.as_pem(), path=path, pem_type="CERTIFICATE REQUEST") - else: - return csr.as_pem() + return ( + write_pem(text=csr.as_pem(), path=path, pem_type="CERTIFICATE REQUEST") + if path + else csr.as_pem() + ) def verify_private_key(private_key, public_key, passphrase=None): @@ -1834,7 +1834,7 @@ def verify_private_key(private_key, public_key, passphrase=None): salt '*' x509.verify_private_key private_key=/etc/pki/myca.key \\ public_key=/etc/pki/myca.crt """ - return bool(get_public_key(private_key, passphrase) == get_public_key(public_key)) + return get_public_key(private_key, passphrase) == get_public_key(public_key) def verify_signature( @@ -1890,7 +1890,10 @@ def verify_crl(crl, cert): salt '*' x509.verify_crl crl=/etc/pki/myca.crl cert=/etc/pki/myca.crt """ if not salt.utils.path.which("openssl"): - raise salt.exceptions.SaltInvocationError("openssl binary not found in path") + raise salt.exceptions.SaltInvocationError( + 'External command "openssl" not found' + ) + crltext = _text_or_file(crl) crltext = get_pem_entry(crltext, pem_type="X509 CRL") crltempfile = tempfile.NamedTemporaryFile(delete=True) @@ -1912,10 +1915,7 @@ def verify_crl(crl, cert): crltempfile.close() certtempfile.close() - if "verify OK" in output: - return True - else: - return False + return "verify OK" in output def expired(certificate): @@ -1953,8 +1953,9 @@ def expired(certificate): ret["expired"] = True else: ret["expired"] = False - except ValueError: - pass + except ValueError as err: + log.debug("Failed to get data of expired certificate: %s", err) + log.trace(err, exc_info=True) return ret @@ -1977,6 +1978,7 @@ def will_expire(certificate, days): salt '*' x509.will_expire "/etc/pki/mycert.crt" days=30 """ + ts_pt = "%Y-%m-%d %H:%M:%S" ret = {} if os.path.isfile(certificate): @@ -1990,14 +1992,11 @@ def will_expire(certificate, days): _expiration_date = cert.get_not_after().get_datetime() ret["cn"] = _parse_subject(cert.get_subject())["CN"] - - if _expiration_date.strftime("%Y-%m-%d %H:%M:%S") <= _check_time.strftime( - "%Y-%m-%d %H:%M:%S" - ): - ret["will_expire"] = True - else: - ret["will_expire"] = False - except ValueError: - pass + ret["will_expire"] = _expiration_date.strftime( + ts_pt + ) <= _check_time.strftime(ts_pt) + except ValueError as err: + log.debug("Unable to return details of a sertificate expiration: %s", err) + log.trace(err, exc_info=True) return ret diff --git a/salt/states/x509.py b/salt/states/x509.py index b3d2f978bd..16811bcfb8 100644 --- a/salt/states/x509.py +++ b/salt/states/x509.py @@ -177,11 +177,12 @@ import os import re import salt.exceptions +import salt.utils.stringutils try: from M2Crypto.RSA import RSAError except ImportError: - pass + RSAError = Exception("RSA Error") log = logging.getLogger(__name__) @@ -193,7 +194,7 @@ def __virtual__(): if "x509.get_pem_entry" in __salt__: return "x509" else: - return (False, "Could not load x509 state: m2crypto unavailable") + return False, "Could not load x509 state: the x509 is not available" def _revoked_to_list(revs): @@ -682,7 +683,70 @@ def certificate_managed(name, days_remaining=90, append_certs=None, **kwargs): "Old": invalid_reason, "New": "Certificate will be valid and up to date", } - return ret + private_key_args.update(managed_private_key) + kwargs["public_key_passphrase"] = private_key_args["passphrase"] + + if private_key_args["new"]: + rotate_private_key = True + private_key_args["new"] = False + + if _check_private_key( + private_key_args["name"], + bits=private_key_args["bits"], + passphrase=private_key_args["passphrase"], + new=private_key_args["new"], + overwrite=private_key_args["overwrite"], + ): + private_key = __salt__["x509.get_pem_entry"]( + private_key_args["name"], pem_type="RSA PRIVATE KEY" + ) + else: + new_private_key = True + private_key = __salt__["x509.create_private_key"]( + text=True, + bits=private_key_args["bits"], + passphrase=private_key_args["passphrase"], + cipher=private_key_args["cipher"], + verbose=private_key_args["verbose"], + ) + + kwargs["public_key"] = private_key + + current_days_remaining = 0 + current_comp = {} + + if os.path.isfile(name): + try: + current = __salt__["x509.read_certificate"](certificate=name) + current_comp = copy.deepcopy(current) + if "serial_number" not in kwargs: + current_comp.pop("Serial Number") + if "signing_cert" not in kwargs: + try: + current_comp["X509v3 Extensions"][ + "authorityKeyIdentifier" + ] = re.sub( + r"serial:([0-9A-F]{2}:)*[0-9A-F]{2}", + "serial:--", + current_comp["X509v3 Extensions"]["authorityKeyIdentifier"], + ) + except KeyError: + pass + current_comp.pop("Not Before") + current_comp.pop("MD5 Finger Print") + current_comp.pop("SHA1 Finger Print") + current_comp.pop("SHA-256 Finger Print") + current_notafter = current_comp.pop("Not After") + current_days_remaining = ( + datetime.datetime.strptime(current_notafter, "%Y-%m-%d %H:%M:%S") + - datetime.datetime.now() + ).days + if days_remaining == 0: + days_remaining = current_days_remaining - 1 + except salt.exceptions.SaltInvocationError: + current = "{} is not a valid Certificate.".format(name) + else: + current = "{} does not exist.".format(name) contents = __salt__["x509.create_certificate"](text=True, **kwargs) # Check the module actually returned a cert and not an error message as a string @@ -878,6 +942,8 @@ def pem_managed(name, text, backup=False, **kwargs): Any arguments supported by :py:func:`file.managed ` are supported. """ file_args, kwargs = _get_file_args(name, **kwargs) - file_args["contents"] = __salt__["x509.get_pem_entry"](text=text) + file_args["contents"] = salt.utils.stringutils.to_str( + __salt__["x509.get_pem_entry"](text=text) + ) return __states__["file.managed"](**file_args) diff --git a/tests/unit/modules/test_x509.py b/tests/unit/modules/test_x509.py index 8f4c433b1a..3105290a2c 100644 --- a/tests/unit/modules/test_x509.py +++ b/tests/unit/modules/test_x509.py @@ -118,9 +118,9 @@ class X509TestCase(TestCase, LoaderModuleMockMixin): subj = FakeSubject() x509._parse_subject(subj) - assert x509.log.trace.call_args[0][0] == "Missing attribute '%s'. Error: %s" - assert x509.log.trace.call_args[0][1] == list(subj.nid.keys())[0] - assert isinstance(x509.log.trace.call_args[0][2], TypeError) + assert x509.log.debug.call_args[0][0] == "Missing attribute '%s'. Error: %s" + assert x509.log.debug.call_args[0][1] == list(subj.nid.keys())[0] + assert isinstance(x509.log.debug.call_args[0][2], TypeError) @skipIf(not HAS_M2CRYPTO, "Skipping, M2Crypto is unavailable") def test_get_pem_entry(self): -- 2.37.3