Files
Factory/edge-build-checks/20-helm-images
Denislav Prodanov a7d128b8c4
All checks were successful
Build PR in OBS / Build PR in OBS (pull_request_target) Successful in 16s
updated sriov images
2025-05-15 14:06:32 +02:00

160 lines
4.1 KiB
Python

#!/usr/bin/python3
import os
import glob
import subprocess
import yaml
import sys
import pprint
# Image references must start with one of these prefixes to pass the check.
# main() appends one more entry when the img_repo rpm macro is defined.
# NOTE(review): check_template() matches with a plain str.startswith(), so
# the entries without a trailing "/" also match longer names that merely
# begin with the same text - confirm that this is intended.
AUTHORIZED_REPOS = [
    "registry.suse.com/suse/sles/",
    "registry.suse.com/rancher",
    "registry.rancher.com",
]

# Parsed content of .checks_helm.yaml, filled in on first use by
# get_extra_config(); None means the file has not been looked at yet.
EXTRA_CONFIG = None
class CheckResult:
    """Print check findings and keep a running tally per severity.

    Attributes:
        hints: number of messages reported through hint().
        warnings: number of messages reported through warn().
        errors: number of messages reported through error().
    """

    def __init__(self):
        self.hints = self.warnings = self.errors = 0

    def hint(self, msg):
        """Print *msg* as a hint and count it."""
        print(f"Hint: {msg}")
        self.hints = self.hints + 1

    def warn(self, msg):
        """Print *msg* as a warning and count it."""
        print(f"Warning: {msg}")
        self.warnings = self.warnings + 1

    def error(self, msg):
        """Print *msg* as an error and count it."""
        print(f"Error: {msg}")
        self.errors = self.errors + 1
def tarballs():
    """Return the paths of the Helm chart tarballs (``*.tgz``) to check.

    When ``BUILD_ROOT`` is not set the current directory is searched.
    Otherwise the ``HELM`` directory below the build's top directory is
    searched; the top directory is derived from ``.build.packages`` inside
    the build root when that entry exists.
    """
    buildroot = os.environ.get("BUILD_ROOT")
    if buildroot is None:
        # Not running in an OBS build container
        return glob.glob("*.tgz")

    # Running in an OBS build container
    packages_entry = buildroot + "/.build.packages"
    if os.path.islink(packages_entry):
        # A symlink takes precedence: follow it relative to the build root.
        topdir = "/" + os.readlink(packages_entry)
    elif os.path.isdir(packages_entry):
        topdir = "/.build.packages"
    else:
        topdir = "/usr/src/packages"
    return glob.glob(f"{buildroot}{topdir}/HELM/*.tgz")
def get_extra_config():
    """Return the optional extra check configuration, loading it only once.

    The configuration is read from ``.checks_helm.yaml``: in the current
    directory when ``BUILD_ROOT`` is unset, otherwise in
    ``$BUILD_ROOT/usr/src/packages/SOURCES``.  The parsed result is cached
    in the module-level ``EXTRA_CONFIG``.  A file that cannot be opened or
    read (``OSError``) or that contains no YAML document yields ``{}``.
    """
    global EXTRA_CONFIG
    if EXTRA_CONFIG is None:
        buildroot = os.environ.get("BUILD_ROOT")
        if buildroot is None:
            file_path = "./.checks_helm.yaml"
        else:
            topdir = "/usr/src/packages"
            file_path = f"{buildroot}{topdir}/SOURCES/.checks_helm.yaml"

        try:
            with open(file_path) as config_file:
                loaded = yaml.safe_load(config_file)
        except OSError:
            loaded = None
        # None here means "no file" or "no document in stream".
        EXTRA_CONFIG = {} if loaded is None else loaded
    return EXTRA_CONFIG
def get_extra_params():
    """Build the extra ``helm template`` arguments requested by the config.

    Every entry of the ``extra_apis`` key of the extra configuration is
    turned into a ``-a <api>`` argument pair.

    Returns:
        A flat argument list such as ``["-a", "x/v1", "-a", "y/v1"]``;
        empty when ``extra_apis`` is missing or empty.
    """
    config = get_extra_config()
    # An ``extra_apis:`` key without a value parses as None, not as [].
    apis = config.get('extra_apis') or []
    if isinstance(apis, str):
        # A lone scalar would otherwise be iterated character by character.
        apis = [apis]
    args = []
    for api in apis:
        # subprocess only accepts string arguments.
        args.extend(['-a', str(api)])
    return args
def is_exception(image):
    """Tell whether *image* is listed under ``image_exceptions`` in the config.

    The digest and tag are removed from the reference before the lookup, so
    exception entries are bare repository names, optionally including a
    registry host and port (``registry:5000/namespace/name``).

    Args:
        image: image reference string as found in a rendered manifest.

    Returns:
        True when the repository part of *image* is an exception entry.
    """
    config = get_extra_config()
    # An ``image_exceptions:`` key without a value parses as None, not as [].
    exceptions = config.get('image_exceptions') or []

    # Drop the digest first, then the tag.  A ':' only introduces a tag when
    # it comes after the last '/'; before that it belongs to a registry port.
    repository = image.partition('@')[0]
    last_component = repository.rpartition('/')[2]
    if ':' in last_component:
        repository = repository.rpartition(':')[0]

    # Earlier versions cut the reference at its first ':' whatever it meant.
    # That name is still honoured so existing exception entries keep matching.
    legacy_name = image.partition(':')[0]
    return repository in exceptions or legacy_name in exceptions
def get_template(tarball_path):
    """Render the chart at *tarball_path* and return its parsed documents.

    Runs ``helm template <tarball_path>`` followed by the arguments from
    get_extra_params(), decodes the captured output and hands it to
    ``yaml.safe_load_all``, whose result (one item per YAML document) is
    returned as is.

    Raises:
        subprocess.CalledProcessError: if ``helm`` exits with a non-zero
            status.
    """
    command = ["helm", "template", tarball_path]
    command += get_extra_params()
    rendered = subprocess.check_output(command)
    return yaml.safe_load_all(rendered.decode())
def extract_key(key, var):
    """Yield every value stored under *key* anywhere inside *var*.

    *var* is walked depth-first: mappings (anything with ``items()``) are
    searched for *key* and their values are descended into; lists and
    tuples are descended into element by element, at any nesting level.
    Any other object yields nothing.

    Args:
        key: the mapping key to look for.
        var: arbitrarily nested structure of mappings, sequences and scalars.

    Yields:
        Each value found under *key*, in traversal order.  A matched value
        is itself searched afterwards, so nested occurrences are reported
        as well.
    """
    if hasattr(var, "items"):
        for k, v in var.items():
            if k == key:
                yield v
            # Keep descending even after a match: the value may contain
            # further occurrences of the key.
            yield from extract_key(key, v)
    elif isinstance(var, (list, tuple)):
        # Handling sequences here (rather than only as mapping values) also
        # covers a top-level list and lists nested directly inside lists.
        for element in var:
            yield from extract_key(key, element)
def check_template(result, template):
    """Report images of a workload manifest that are not from allowed sources.

    Only documents whose ``kind`` is one of the workload kinds listed below
    are inspected; anything else, including documents that are not mappings
    or have no ``kind``, is ignored.  Every value found under an ``image``
    key is accepted when it starts with an ``AUTHORIZED_REPOS`` prefix or
    when is_exception() allows it.

    Args:
        result: CheckResult-like object; ``error()`` is called once per
            offending image, ``warn()`` once per ``image`` value that is not
            a string and therefore cannot be checked.
        template: one parsed document of the rendered chart.
    """
    # A rendered document is not guaranteed to be a mapping with a "kind".
    try:
        kind = template["kind"]
    except (KeyError, IndexError, TypeError):
        return
    if kind not in [
        "Pod",
        "Deployment",
        "StatefulSet",
        "DaemonSet",
        "ReplicaSet",
        "Job",
        "CronJob",
    ]:
        return

    # The list does not change while one document is scanned.
    allowed_prefixes = tuple(AUTHORIZED_REPOS)
    for image in extract_key("image", template):
        if not isinstance(image, str):
            # An "image" key may hold a mapping or be empty; calling
            # startswith() on it would crash the whole check.
            result.warn(f"Cannot check non-string image value {image!r}")
            continue
        if not image.startswith(allowed_prefixes) and not is_exception(image):
            result.error(f"{image} is not from authorized source")
def main():
    """Check every chart tarball and exit with the overall status.

    The ``img_repo`` rpm macro, when it expands to something, is added to
    ``AUTHORIZED_REPOS`` first.  Each document rendered from each tarball
    returned by tarballs() is then passed to check_template().  The process
    exits with status 1 if any error was recorded and 0 otherwise; hints
    and warnings do not affect the exit status.
    """
    result = CheckResult()

    rpm_command = [
        "rpm",
        "--macros=/root/.rpmmacros",
        "-E",
        "%{?img_repo}",
    ]
    img_repo = subprocess.check_output(rpm_command).strip()
    if not img_repo:
        result.warn("img_repo macro not defined, will not add extra authorized repo")
    else:
        result.hint(f"Adding '{img_repo.decode()}' to authorized repo")
        AUTHORIZED_REPOS.append(img_repo.decode())

    for tarball in tarballs():
        print(f"Looking at {tarball}")
        for template in get_template(tarball):
            if not template:
                # Exclude empty templates
                continue
            check_template(result, template)

    if result.errors > 0:
        print("Fatal errors found.")
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()