#!/usr/bin/python3
"""Check that images used by packaged Helm charts come from authorized repos.

Every chart tarball is rendered with ``helm template``; the ``image`` values
found in the rendered workload objects must start with one of
``AUTHORIZED_REPOS`` or be listed under ``image_exceptions`` in the optional
``.checks_helm.yaml`` file. The script exits with status 1 when at least one
error was reported and 0 otherwise.
"""

import glob
import os
import pprint
import subprocess
import sys

import yaml

# An image reference must start with one of these prefixes to be accepted.
# main() may append the value of the ``img_repo`` rpm macro at run time.
AUTHORIZED_REPOS = [
    "registry.suse.com/suse/sles/",
    "registry.suse.com/rancher",
    "registry.rancher.com",
]

# Cached content of .checks_helm.yaml; None until get_extra_config() ran.
EXTRA_CONFIG = None

# Only rendered objects of these kinds have their images checked.
_WORKLOAD_KINDS = (
    "Pod",
    "Deployment",
    "StatefulSet",
    "DaemonSet",
    "ReplicaSet",
    "Job",
    "CronJob",
)


class CheckResult:
    """Print findings and keep a count per severity."""

    def __init__(self):
        self.hints = 0
        self.warnings = 0
        self.errors = 0

    def hint(self, msg):
        """Print *msg* as a hint and count it."""
        print(f"Hint: {msg}")
        self.hints += 1

    def warn(self, msg):
        """Print *msg* as a warning and count it."""
        print(f"Warning: {msg}")
        self.warnings += 1

    def error(self, msg):
        """Print *msg* as an error and count it."""
        print(f"Error: {msg}")
        self.errors += 1


def tarballs():
    """Return a list of chart tarballs (``*.tgz``) to check.

    Without ``BUILD_ROOT`` in the environment the current directory is
    searched; otherwise the ``HELM`` directory below the build root's top
    directory is searched.
    """
    if "BUILD_ROOT" not in os.environ:
        # Not running in an OBS build container
        return glob.glob("*.tgz")

    # Running in an OBS build container
    buildroot = os.environ["BUILD_ROOT"]
    build_packages = buildroot + "/.build.packages"
    topdir = "/usr/src/packages"
    if os.path.isdir(build_packages):
        topdir = "/.build.packages"
    # Checked last on purpose: a symlink takes precedence over the plain
    # directory test above, and its target is used as the top directory.
    if os.path.islink(build_packages):
        topdir = "/" + os.readlink(build_packages)
    return glob.glob(f"{buildroot}{topdir}/HELM/*.tgz")


def get_extra_config():
    """Return the parsed ``.checks_helm.yaml`` as a dict, loading it once.

    A missing or unreadable file, an empty file, and a file whose top-level
    YAML value is not a mapping all result in an empty dict. The result is
    cached in the module-level ``EXTRA_CONFIG``.
    """
    global EXTRA_CONFIG
    if EXTRA_CONFIG is not None:
        return EXTRA_CONFIG

    if "BUILD_ROOT" not in os.environ:
        file_path = "./.checks_helm.yaml"
    else:
        buildroot = os.environ["BUILD_ROOT"]
        # NOTE(review): unlike tarballs(), this does not honour
        # .build.packages -- confirm that this is intended.
        topdir = "/usr/src/packages"
        file_path = f"{buildroot}{topdir}/SOURCES/.checks_helm.yaml"

    try:
        with open(file_path, encoding="utf-8") as config_file:
            loaded = yaml.safe_load(config_file)
    except OSError:
        loaded = None

    if loaded is None:
        # No file, or no document in stream
        loaded = {}
    elif not isinstance(loaded, dict):
        # Callers use .get() on the result, so anything else is unusable.
        print(f"Warning: {file_path} does not contain a mapping, ignoring it")
        loaded = {}

    EXTRA_CONFIG = loaded
    return EXTRA_CONFIG


def get_extra_params():
    """Return extra ``helm template`` arguments taken from the config.

    Each entry of the ``extra_apis`` config list becomes a ``-a <api>`` pair.
    """
    # "or []" also covers a key that is present but set to null.
    extra_apis = get_extra_config().get("extra_apis") or []
    args = []
    for api in extra_apis:
        args.extend(["-a", api])
    return args


def _image_name(image):
    """Return *image* without its digest (``@...``) and tag (``:...``)."""
    name, _, _ = image.partition("@")
    head, sep, tail = name.rpartition(":")
    # A colon that is still followed by "/" belongs to a registry port
    # (host:5000/ns/img), not to a tag, so it must be kept.
    if sep and "/" not in tail:
        name = head
    return name


def is_exception(image):
    """Return True if *image* is listed under ``image_exceptions``.

    The image is compared without its tag/digest. The text before the first
    ":" is matched as well, which is how exceptions were matched before the
    tag/digest handling was added, so existing config entries keep working.
    """
    exceptions = get_extra_config().get("image_exceptions") or []
    legacy_name, _, _ = image.partition(":")
    return _image_name(image) in exceptions or legacy_name in exceptions


def get_template(tarball_path):
    """Render *tarball_path* with ``helm template`` and parse the output.

    Returns an iterator over the YAML documents of the rendered chart.
    ``subprocess.CalledProcessError`` propagates if helm exits non-zero.
    """
    command = ["helm", "template", tarball_path] + get_extra_params()
    raw_templates = subprocess.check_output(command).decode()
    return yaml.safe_load_all(raw_templates)


def extract_key(key, var):
    """Yield every value stored under *key* anywhere inside *var*.

    Dicts and lists are searched recursively; a matching value is yielded
    before its own content is searched. Other values yield nothing.
    """
    if isinstance(var, dict):
        for k, v in var.items():
            if k == key:
                yield v
            yield from extract_key(key, v)
    elif isinstance(var, list):
        for item in var:
            yield from extract_key(key, item)


def check_template(result, template):
    """Report unauthorized images of one rendered object to *result*.

    Objects that are not mappings or whose ``kind`` is not one of the
    workload kinds are ignored. Every ``image`` value must start with an
    entry of ``AUTHORIZED_REPOS`` or be a configured exception, otherwise an
    error is recorded.
    """
    if not isinstance(template, dict):
        return
    if template.get("kind") not in _WORKLOAD_KINDS:
        return

    authorized = tuple(AUTHORIZED_REPOS)
    for image in extract_key("image", template):
        if not isinstance(image, str):
            # extract_key matches any "image" key, including ones holding a
            # mapping or null; those cannot be compared against a prefix.
            result.warn(f"ignoring non-string image value {image!r}")
            continue
        if not image.startswith(authorized) and not is_exception(image):
            result.error(f"{image} is not from authorized source")


def main():
    """Check all tarballs and exit with 1 if any error was reported."""
    result = CheckResult()

    img_repo = (
        subprocess.check_output(
            [
                "rpm",
                "--macros=/root/.rpmmacros",
                "-E",
                "%{?img_repo}",
            ]
        )
        .strip()
        .decode()
    )
    if img_repo:
        result.hint(f"Adding '{img_repo}' to authorized repo")
        AUTHORIZED_REPOS.append(img_repo)
    else:
        result.warn("img_repo macro not defined, will not add extra authorized repo")

    for tarball in tarballs():
        print(f"Looking at {tarball}")
        for template in get_template(tarball):
            if template:  # Exclude empty templates
                check_template(result, template)

    ret = 0
    if result.errors > 0:
        print("Fatal errors found.")
        ret = 1
    sys.exit(ret)


if __name__ == "__main__":
    main()