"""Wait for an OBS project to build a given commit, then summarise the results.

The script polls ``osc pr --xml <project>`` until the reported SCM revision
matches the expected commit and every ``<result>`` is published and not dirty,
prints per-repository/architecture status counts, and exits non-zero when any
package failed.

Environment variables read by :func:`main`:
    OBS_PROJECT: project name passed to ``osc pr``.
    GITEA_SHA:   commit the project's ``<scminfo>`` must report.
"""
import os
import subprocess
import sys
import time
import xml.etree.ElementTree as ET
from collections import Counter

# Status codes that are skipped entirely (neither counted nor reported).
_IGNORED_CODES = ("excluded", "disabled")
# Status codes that make the overall run fail.
_FAILURE_CODES = ("failed", "unresolvable", "broken")
# Seconds to sleep before each poll in do_wait().
_POLL_INTERVAL = 5


def get_buildstatus(project: str, retries: int = 5) -> ET.Element:
    """Return the parsed XML output of ``osc pr --xml`` for *project*.

    The command is attempted up to *retries* times; a non-zero exit status
    triggers another attempt.

    Raises:
        RuntimeError: if every attempt exited with a non-zero status (the
            last ``CalledProcessError`` is attached as ``__cause__``).
            Previously the function silently returned ``None`` in this case,
            which crashed callers with an unrelated ``AttributeError``.
    """
    last_error = None
    for _ in range(retries):
        try:
            output = subprocess.check_output(["osc", "pr", "--xml", project])
        except subprocess.CalledProcessError as err:
            last_error = err
            continue
        return ET.fromstring(output)
    raise RuntimeError(
        f"'osc pr --xml {project}' failed after {retries} attempt(s)"
    ) from last_error


def do_wait(project: str, commit: str) -> ET.Element:
    """Block until *project* has finished building *commit*; return its status.

    Polls :func:`get_buildstatus` every few seconds. A poll is only examined
    when the root element's ``state`` attribute differs from the previous
    poll, so progress messages are printed once per state change.

    Returns:
        The status element of the first poll in which the only ``<scminfo>``
        value equals *commit* and every ``<result>`` has ``state="published"``
        with no ``dirty`` attribute.
    """
    last_state = None
    while True:
        time.sleep(_POLL_INTERVAL)
        status = get_buildstatus(project)

        state = status.get("state")
        if state == last_state:
            # Nothing changed since the last poll; avoid repeating messages.
            continue
        last_state = state

        # Every <scminfo> in the document must agree on exactly this commit.
        scminfo = {e.text for e in status.findall(".//scminfo")}
        if scminfo != {commit}:
            print("Waiting for OBS to sync with SCM")
            continue

        finished = all(
            e.get("state") == "published" and e.get("dirty") is None
            for e in status.findall("./result")
        )
        if not finished:
            print("Waiting for OBS to finish building")
            continue

        return status


def print_results(status: ET.Element) -> bool:
    """Print a per-repository summary of *status*; return True on failures.

    For each repository (and, except for the ``charts`` repository, each
    architecture) the number of packages per status code is printed.
    Packages with code ``excluded`` or ``disabled`` are skipped. Packages
    with code ``failed``, ``unresolvable`` or ``broken`` are collected and
    listed, sorted, at the end together with their ``<details>`` text when
    present.

    Returns:
        True if at least one package is in a failure state, else False.
    """
    # repository name -> {arch -> <result> element}, in document order.
    results = {}
    for result in status.findall("./result"):
        results.setdefault(result.get("repository"), {})[result.get("arch")] = result

    failed = []
    for repo, arches in results.items():
        print(f"{repo}:")
        depth = 1
        for arch, result in arches.items():
            counts = Counter()
            if repo != "charts":
                # "charts" output is flattened: no per-arch heading.
                print(f"\t{arch}:")
                depth = 2
            for package in result.findall("./status"):
                code = package.get("code")
                if code in _IGNORED_CODES:
                    continue
                if code in _FAILURE_CODES:
                    entry = f"{package.get('package')} ({arch})"
                    details = package.findtext("details")
                    if details:
                        entry = f"{entry}: {details}"
                    failed.append(entry)
                counts[code] += 1
            for code, count in counts.items():
                # Two positional args on purpose: keeps the existing
                # "<tabs><space>code: n" output format.
                print("\t" * depth, f"{code}: {count}")

    failed.sort()
    if failed:
        print("\nPackages failing: ")
        for fail in failed:
            print("\t", fail)
    return bool(failed)


def main() -> int:
    """Wait for the build and report it; return the process exit status.

    Returns:
        0 when no package failed, 1 when at least one failed, 2 when the
        required environment variables are missing.
    """
    project = os.environ.get("OBS_PROJECT")
    sha = os.environ.get("GITEA_SHA")
    if not project or not sha:
        # Fail fast instead of passing None into the osc command line.
        print("OBS_PROJECT and GITEA_SHA must be set", file=sys.stderr)
        return 2

    status = do_wait(project, sha)
    if print_results(status):
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s result so failures yield a non-zero exit code.
    sys.exit(main())