#
# osc-batch-submit
#
# Copyright (C) 2025 Antonio Larrosa
#
import xml.etree.ElementTree as ET
import subprocess
import json
import re
import os
import glob

from osc_batch_submit.progress import Progress

# Example of a package-set expression combining projects with the
# ∪ (union), ∩ (intersection), ∖ (difference) and * (pattern filter)
# operators:
#
# project_with_packages_to_copy = '(((SUSE:SLE-15-SP4:GA ∪ SUSE:SLE-15-SP3:Update ∪ SUSE:SLE-15-SP3:GA ∪ SUSE:SLE-15-SP2:Update ∪ SUSE:SLE-15-SP2:GA) ∩ openSUSE.org:KDE:Frameworks5) ∪ home:alarrosa:branches:SUSE:SLE-15-SP4:GA:kf5) ∖ SUSE:SLE-15-SP4:GA:Staging:B'  # noqa
#
# srcProjects = ['openSUSE.org:KDE:Frameworks5']
#
# tgtProjects = ['SUSE:SLE-15-SP4:GA',
#                'SUSE:SLE-15-SP3:Update',
#                'SUSE:SLE-15-SP3:GA',
#                'SUSE:SLE-15-SP2:Update',
#                'SUSE:SLE-15-SP2:GA',
#                'SUSE:SLE-15-SP1:GA',
#                'SUSE:SLE-15:GA']
# tgtProject = 'SUSE:SLE-15-SP4:GA'


def find_matching_parenthesis(string, start):
    """Return the index of the bracket that matches the one at ``start``.

    ``start`` is the position of an opening ``(`` or ``{``.  Both bracket
    kinds are counted at the same nesting level.  Returns ``len(string)``
    when no matching closing bracket exists.
    """
    level = 1
    for i, ch in enumerate(string[start + 1:]):
        if ch == '(' or ch == '{':
            level += 1
        elif ch == ')' or ch == '}':
            level -= 1
            if level == 0:
                # Found the matching parenthesis
                return i + start + 1
    return len(string)


class OSC:
    """Wrapper around the ``osc`` command line client.

    Evaluates package-set expressions over OBS projects, queries package
    versions, devel projects and bugowners/maintainers, and caches the
    results on disk under ``$XDG_CACHE_HOME/osc-batch-submit``.
    """

    def __init__(self, use_cache=True, ibs=False):
        """Initialize the wrapper.

        use_cache: when True, reuse results cached on disk by previous
            runs instead of querying the build service again.
        ibs: when True, talk to the internal SUSE build service
            (https://api.suse.de) instead of the default OBS instance.
        """
        self.debug = False
        self.use_cache = use_cache
        self.cache_dirpath = None  # Directory where cached files are stored
        self.projects = []
        self.src_projects = []
        self.tgt_project = ''
        self.projects_inherited_from_tgt = []
        self.packagesIn = {}  # project -> set of package names
        self.packageVersions = {}  # project -> {package: version}
        self.versions_in_src = {}
        self.versions_in_tgt = {}
        self.devel_projects = {}  # Contains [project][package] -> 'devel_project'
        self.projects_bugowners = {}
        self.projects_maintainers = {}
        self.command = ''
        if ibs:
            self.osc = ['osc', '-A', 'https://api.suse.de']
        else:
            self.osc = ['osc']

    def log(self, *args):
        """Print ``args`` only when debugging is enabled."""
        if self.debug:
            print(*args)

    def osc_command(self):
        """Return the osc base command as a single string."""
        return " ".join(self.osc)

    def get_all_projects_from_projects_set_expression(self, prj_set_expr):
        """Return the set of project names referenced by a set expression.

        Literal package lists in braces are ignored.
        """
        tmp = re.sub('{[^}]*}', '', prj_set_expr)
        return {x.strip() for x in re.split(r'[∩∪\*∖()]', tmp) if x.strip()}

    def get_packages_from_projects_in_set_expression(self, prj_set_expr):
        """Evaluate a set expression and return the selected packages.

        The expression language supports project names, literal package
        lists in braces (``{pkg1, pkg2}``), parentheses for grouping and
        the binary operators ∪ (union), ∩ (intersection), ∖ (difference)
        and * (keep only packages matching some regex of the right
        operand).  Requires readAllPackages() to have populated
        ``self.packagesIn``.
        """
        prj_set_expr = prj_set_expr.strip()
        self.log('get_packages_from_projects_in_set_expr: ', prj_set_expr)
        if not re.findall(r'[∩∪\*∖()\{}]', prj_set_expr):
            # A plain project name: return its package set directly.
            return self.packagesIn[prj_set_expr]
        tok = next(re.finditer(r'[∩∪\*∖(\{]', prj_set_expr))
        self.log(tok.group(0))
        if tok.group(0) == '(':
            end_parenthesis = find_matching_parenthesis(prj_set_expr,
                                                        tok.start())
            if tok.start() == 0 and end_parenthesis == len(prj_set_expr) - 1:
                # The whole expression is parenthesized: unwrap it.
                return self.get_packages_from_projects_in_set_expression(
                    prj_set_expr[1:-1])
            left_op = prj_set_expr[tok.start() + 1: end_parenthesis].strip()
            tok = next(re.finditer(r'[∩∪\*∖]',
                                   prj_set_expr[end_parenthesis + 1:]))
            right_op = prj_set_expr[end_parenthesis + tok.start() + 2:].strip()
        elif tok.group(0) == '{':
            end_parenthesis = find_matching_parenthesis(prj_set_expr,
                                                        tok.start())
            if tok.start() == 0 and end_parenthesis == len(prj_set_expr) - 1:
                # A literal, comma-separated package list.
                package_list_expr = prj_set_expr[tok.start() + 1:
                                                 end_parenthesis].strip()
                self.log('package_list_expr', package_list_expr)
                self.log(package_list_expr.split(','))
                return set([x.strip() for x in package_list_expr.split(',')])
            left_op = prj_set_expr[tok.start():
                                   end_parenthesis + 1]  # Includes the {}
            tok = next(re.finditer(r'[∩∪\*∖]',
                                   prj_set_expr[end_parenthesis + 1:]))
            right_op = prj_set_expr[end_parenthesis + tok.start() + 2:].strip()
        else:
            # Fix: the left operand ends right before the operator.  The
            # previous slice [0:tok.start()-1] silently dropped a character
            # whenever the operator was not preceded by a space.
            left_op = prj_set_expr[0:tok.start()].strip()
            right_op = prj_set_expr[tok.start() + 1:].strip()
        op = tok.group(0)
        self.log(f'left op: {left_op}')
        self.log(f'op: {op}')
        self.log(f'right op: {right_op}')
        left_set = self.get_packages_from_projects_in_set_expression(left_op)
        right_set = self.get_packages_from_projects_in_set_expression(right_op)
        if op == '∪':
            return left_set.union(right_set)
        elif op == '∩':
            self.log('left op pkgs:', left_set)
            self.log('right op pkgs:', right_set)
            return left_set.intersection(right_set)
        elif op == '∖':
            if isinstance(right_set, list):
                right_set = set(right_set)
            self.log(f'1 {left_set}')
            self.log(f'2 {right_set}')
            self.log(f'3 {left_set - right_set}')
            return left_set - right_set
        elif op == '*':
            # Keep only the packages that match at least one pattern of
            # the right operand.
            r = set()
            for pkg in left_set:
                for pattern in right_set:
                    if re.match(pattern, pkg):
                        r.add(pkg)
            return r
        else:
            return self.packagesIn[prj_set_expr]

    def cache_path_for(self, cache_type, project, create_directory=True):
        """Return the path of the cache file for (cache_type, project).

        Creates the cache directory (honoring ``$XDG_CACHE_HOME``) when
        ``create_directory`` is True.
        """
        if not self.cache_dirpath:
            cache_path = os.getenv('XDG_CACHE_HOME')
            if not cache_path:
                cache_path = os.path.expanduser('~/.cache')
            self.cache_dirpath = f'{cache_path}/osc-batch-submit'
        if create_directory:
            try:
                os.mkdir(self.cache_dirpath)
            except FileExistsError:
                pass
        return (f'{self.cache_dirpath}/{cache_type}'
                f'{"-" if cache_type and project else ""}{project}')

    def get_project_packages(self, project):
        """Return the set of package names in ``project`` (cached)."""
        filename = self.cache_path_for('packages-in', project)
        if self.use_cache:
            try:
                with open(filename, 'rt') as fh:
                    return set(line.strip('\n') for line in fh.readlines())
            except FileNotFoundError:
                pass
        cmd = self.osc + ['ls', project]
        output = subprocess.check_output(cmd)
        contents = output.decode('utf-8', errors='replace').split('\n')
        # Skip the OBS '00Meta' pseudo-package and multibuild flavors
        # (names containing ':').  Fix: the lines were already split on
        # '\n', so the old comparison with '00Meta\n' could never match.
        packages = [line for line in contents
                    if line and line != '00Meta' and ':' not in line]
        with open(filename, 'wt') as fh:
            fh.write('\n'.join(packages))
        return set(packages)

    def readAllPackages(self):
        """Fill ``packagesIn`` and ``packageVersions`` for every project
        in ``self.projects``, skipping literal ``{pkg, ...}`` lists."""
        for proj in self.projects:
            if proj.startswith('{'):
                continue
            self.packagesIn[proj] = self.get_project_packages(proj)
            try:
                self.packageVersions[proj] = \
                    self.read_all_package_versions(proj)
            except FileNotFoundError:
                self.packageVersions[proj] = {}

    def get_project_meta_paths(self, project, repository_name='standard'):
        """Return the (project, repository) path pairs that the given
        repository of ``project`` builds against, or None on error."""
        meta_root = None
        filename = self.cache_path_for('meta-prj', project)
        if self.use_cache:
            try:
                with open(filename, 'rt') as fh:
                    meta_root = ET.fromstring(fh.read())
            except FileNotFoundError:
                pass
        # Fix: compare with None.  An Element with no children is falsy,
        # so `if not meta_root:` could needlessly re-download the meta.
        if meta_root is None:
            cmd = self.osc + ['meta', 'prj', project]
            proc = subprocess.run(cmd, capture_output=True)
            if proc.returncode != 0:
                raise RuntimeError('osc command returned an error when '
                                   f'getting the metaprj for {project}:\n' +
                                   proc.stderr.decode('utf-8',
                                                      errors='replace'))
            with open(filename, 'wt') as fh:
                fh.write(proc.stdout.decode('utf-8', errors='replace'))
            try:
                meta_root = ET.fromstring(proc.stdout)
            except ET.ParseError:
                print(f'Obtaining metaprj for {project} got {proc.stdout}')
                raise
        # Fix: honor the repository_name parameter instead of always
        # looking up the 'standard' repository.
        repository = meta_root.find(f"repository[@name='{repository_name}']")
        if repository is None:
            error = meta_root.find('error')
            msg = f'Error obtaining repository paths for {project}'
            if error is not None:
                msg += f': {error.text}'
            print(msg)
            return None
        return [(p.attrib['project'], p.attrib['repository'])
                for p in repository.findall("path")]

    def get_all_projects_inherited_from_project(self, project,
                                                repository_name='standard'):
        """Return ``project`` plus every project its repository inherits
        from (transitively), in breadth-first order."""
        r = [project]
        paths = self.get_project_meta_paths(project, repository_name)
        visited = {(project, repository_name)}
        while paths:
            prj, repo = paths.pop(0)
            try:
                # Skip pairs we already expanded.
                while (prj, repo) in visited:
                    prj, repo = paths.pop(0)
            except IndexError:
                break
            if prj not in r:
                r.append(prj)
            more_paths = self.get_project_meta_paths(prj, repo)
            visited.add((prj, repo))
            if more_paths:
                paths.extend(more_paths)
        return r

    def get_package_version(self, project, package, retry=True):
        """Return the version of ``project``/``package``.

        Returns 'unknown' while OBS reports a download in progress and
        None when no version can be determined.
        """
        cmd = self.osc + ['api',
                          f'/source/{project}/{package}?view=info&parse=1']
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            stderr = proc.stderr.decode('utf-8', errors='replace')
            if (proc.returncode == 1 and
                    'HTTP Error 400: download in progress' in stderr):
                return 'unknown'
            raise RuntimeError('osc command returned an error when getting the'
                               f' version of {project}/{package}:\n' + stderr)
        root = ET.fromstring(proc.stdout)
        try:
            version = root.find('version').text
        except AttributeError:
            error = root.find('error')
            if error is not None:
                print('Error obtaining package version for ' +
                      f'{project}/{package}: {error.text}')
                if error.text == 'download in progress' and retry:
                    # Retry once; the download may have finished meanwhile.
                    return self.get_package_version(project, package,
                                                    retry=False)
            else:
                print('AttributeError obtaining package version for ' +
                      f'{project}/{package}')
            return None
        return version

    def get_request_state(self, requestid, retry=True):
        """Return (state_name, state_message) of a request, or None."""
        cmd = self.osc + ['api', f'/request/{requestid}']
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            raise RuntimeError('osc command returned an error when getting the'
                               f' state of request {requestid}:\n' +
                               proc.stderr.decode('utf-8', errors='replace'))
        root = ET.fromstring(proc.stdout)
        try:
            state = root.find('state').attrib['name']
            state_msg = '\n'.join([x.strip(' \r\n')
                                   for x in root.find('state').itertext()
                                   if x.strip(' \r\n')])
        except AttributeError:
            error = root.find('error')
            if error is not None:
                print('Error obtaining request state for ' +
                      f'{requestid}: {error.text}')
                if error.text == 'download in progress' and retry:
                    return self.get_request_state(requestid, retry=False)
            else:
                print('AttributeError obtaining request state for '
                      f'{requestid}')
            return None
        return (state, state_msg)

    def get_request_comments(self, requestid, only_last=False, retry=True):
        """Return the list of 'who: text' comments of a request.

        With ``only_last`` True, return only the most recent comment
        (or '' when there are none).  Returns None on error.
        """
        cmd = self.osc + ['api', f'/comments/request/{requestid}']
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            raise RuntimeError('osc command returned an error when getting the'
                               f' comments of request {requestid}:\n' +
                               proc.stderr.decode('utf-8', errors='replace'))
        root = ET.fromstring(proc.stdout)
        try:
            comments = root.findall('comment')
            comments_texts = [f"{x.attrib['who']}: {x.text}"
                              for x in comments]
            if only_last:
                return comments_texts[-1] if comments_texts else ''
        except AttributeError:
            error = root.find('error')
            if error is not None:
                print('Error obtaining request state for ' +
                      f'{requestid}: {error.text}')
                if error.text == 'download in progress' and retry:
                    # Fix: keep the only_last flag when retrying; it was
                    # silently dropped before.
                    return self.get_request_comments(requestid,
                                                     only_last=only_last,
                                                     retry=False)
            else:
                print('AttributeError obtaining request comments '
                      f'for {requestid}')
            return None
        return comments_texts

    def get_project_bugowner_maintainer(self, project, retry=True):
        """Return (bugowners, maintainers) of ``project``.

        Each entry is a (tag, id) tuple built from the meta elements with
        role='bugowner' / role='maintainer'.  Results are memoized in
        ``projects_bugowners`` / ``projects_maintainers``.
        """
        if project in self.projects_bugowners:
            return (self.projects_bugowners[project],
                    self.projects_maintainers[project])
        cmd = self.osc + ['api', f'/source/{project}/_meta']
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            raise RuntimeError('osc command returned an error when getting the'
                               f' bugowner/maintiner of project {project}:\n' +
                               proc.stderr.decode('utf-8', errors='replace'))
        root = ET.fromstring(proc.stdout)
        # Fix: removed a leftover `import pdb; pdb.set_trace()` debugging
        # breakpoint that halted every call to this method.
        try:
            b_elements = root.findall("*[@role='bugowner']")
            m_elements = root.findall("*[@role='maintainer']")
        except AttributeError:
            error = root.find('error')
            if error is not None:
                print('Error obtaining project bugowner for ' +
                      f'{project}: {error.text}')
                if error.text == 'download in progress' and retry:
                    # Fix: retry with the correct method name; there is no
                    # get_project_bugowner() method.
                    return self.get_project_bugowner_maintainer(project,
                                                                retry=False)
            else:
                print('AttributeError obtaining project bugowner for ' +
                      f'{project}')
            return None
        rb = [(element.tag, element.attrib[f'{element.tag}id'])
              for element in b_elements]
        rm = [(element.tag, element.attrib[f'{element.tag}id'])
              for element in m_elements]
        self.projects_bugowners[project] = rb
        self.projects_maintainers[project] = rm
        return rb, rm

    def get_package_bugowner_maintainer(self, project, package, retry=True):
        """Return (bugowners, maintainers) of ``project``/``package``,
        falling back to the project-level roles when the package meta
        defines none."""
        cmd = self.osc + ['api', f'/source/{project}/{package}/_meta']
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            raise RuntimeError('osc command returned an error when getting the'
                               ' bugowner/maintiner of package '
                               f'{project}/{package}:\n' +
                               proc.stderr.decode('utf-8', errors='replace'))
        root = ET.fromstring(proc.stdout)
        try:
            b_elements = root.findall("*[@role='bugowner']")
            m_elements = root.findall("*[@role='maintainer']")
        except AttributeError:
            error = root.find('error')
            if error is not None:
                print('Error obtaining package bugowner for ' +
                      f'{project}/{package}: {error.text}')
                if error.text == 'download in progress' and retry:
                    # Fix: retry with the correct method name; there is no
                    # get_package_bugowner() method.
                    return self.get_package_bugowner_maintainer(
                        project, package, retry=False)
            else:
                print('AttributeError obtaining package bugowner for ' +
                      f'{project}/{package}')
            return None
        rb = [(element.tag, element.attrib[f'{element.tag}id'])
              for element in b_elements]
        rm = [(element.tag, element.attrib[f'{element.tag}id'])
              for element in m_elements]
        if not rb or not rm:
            prj_b, prj_m = self.get_project_bugowner_maintainer(project)
            if not rb:
                rb = prj_b
            if not rm:
                rm = prj_m
        return rb, rm

    def get_all_package_versions(self, project, packages, filename):
        """Query and cache the version of every package in ``packages``.

        ``filename`` is unused; it is kept only for backward
        compatibility with existing callers.
        """
        versions = {}
        for package in packages:
            versions[package] = self.get_package_version(project, package)
        self.packageVersions[project] = versions
        self.write_cache_package_versions(project)
        return versions

    def write_cache_package_versions(self, project):
        """Persist the cached package versions of ``project`` as JSON."""
        filename = self.cache_path_for('versions-in', project)
        with open(filename, 'w') as fh:
            json.dump(self.packageVersions[project], fh)

    def read_all_package_versions(self, project):
        """Load cached package versions of ``project`` from disk.

        Raises FileNotFoundError when no cache file exists (the caller
        handles it); a corrupt cache file is reported and skipped.
        """
        versions = {}
        if self.use_cache:
            filename = self.cache_path_for('versions-in', project)
            try:
                with open(filename, 'r') as fh:
                    versions = json.load(fh)
            except json.decoder.JSONDecodeError:
                # Fix: report the actual file name; the f-string had no
                # placeholder before.
                print(f'JSON error in {filename}. Skipping...')
        return versions

    def set_params(self, source_projects, target_project,
                   packages_expression):
        """Configure source/target projects and evaluate the package set.

        Reads the package lists of every involved project and stores the
        selected packages in ``self.packages_to_copy``.
        """
        self.source_projects = source_projects
        self.target_project = target_project
        self.packages_expression = packages_expression
        self.projects_inherited_from_tgt = \
            self.get_all_projects_inherited_from_project(
                target_project, repository_name='standard')
        tmp_prjs = self.get_all_projects_from_projects_set_expression(
            packages_expression)
        self.log('Projects inherited from tgt')
        self.log(self.projects_inherited_from_tgt)
        self.projects = list(tmp_prjs.union(source_projects,
                                            self.projects_inherited_from_tgt,
                                            {target_project}))
        self.readAllPackages()
        # Fix: store a sorted list rather than the raw set returned by
        # the expression evaluator.  add_packages_to_packages_to_copy_list
        # calls .extend(), which sets do not have, and a list gives a
        # stable iteration order.
        self.packages_to_copy = sorted(
            self.get_packages_from_projects_in_set_expression(
                packages_expression))

    def add_packages_to_packages_to_copy_list(self, packages):
        """Append to packages_to_copy the given packages not already in it."""
        self.packages_to_copy = list(self.packages_to_copy)
        self.packages_to_copy.extend(p for p in packages
                                     if p not in self.packages_to_copy)

    def restrict_packages_in_packages_to_copy_list(self, packages):
        """Keep only the packages of ``packages`` already scheduled."""
        packages_to_copy = self.packages_to_copy
        self.packages_to_copy = [p for p in packages
                                 if p in packages_to_copy]

    def get_package_devel_project(self, project, package):
        '''Returns the devel project for a package.

        Returns 'KDE:Applications' for project 'openSUSE:Factory',
        package 'kcron'
        '''
        if project not in self.devel_projects:
            self.read_cache_devel_projects(project)
        try:
            return self.devel_projects[project][package]
        except KeyError:
            pass
        cmd = self.osc + ['develproject', project, package]
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            raise RuntimeError('osc command returned an error when getting the'
                               f' devel project of {project}/{package}:\n' +
                               proc.stderr.decode('utf-8', errors='replace'))
        # osc prints 'project/package'; keep only the project part.
        self.devel_projects[project][package] = \
            proc.stdout.decode('utf-8',
                               errors='replace').strip('\n').split('/')[0]
        self.write_cache_devel_projects(project)
        return self.devel_projects[project][package]

    def package_is_devel_package_for_project(
            self, devel_project, devel_package, project):
        '''Checks the devel project for a package.

        Returns if devel_project is the devel project of devel_package
        in project.  For example, returns True for
        ('KDE:Applications', 'kcron', 'openSUSE:Factory').
        '''
        real_devel_project = self.get_package_devel_project(project,
                                                            devel_package)
        return real_devel_project == devel_project

    def write_cache_devel_projects(self, project):
        """Persist the devel-project cache of ``project`` as JSON."""
        filename = self.cache_path_for('devel-projects', project)
        with open(filename, 'w') as fh:
            json.dump(self.devel_projects[project], fh)

    def read_cache_devel_projects(self, project):
        """Load the devel-project cache of ``project`` (empty when the
        cache is disabled or missing)."""
        self.devel_projects[project] = {}
        if not self.use_cache:
            return
        filename = self.cache_path_for('devel-projects', project)
        try:
            with open(filename, 'r') as fh:
                self.devel_projects[project] = json.load(fh)
        except FileNotFoundError:
            pass

    def get_package_versions_from_source_projects_helper(
            self, get_versions=True, print_progress=False):
        """Locate each package to copy in the source projects.

        Fills ``packageSource`` (package -> source project) and, when
        ``get_versions`` is True, ``versions_in_src``.  Returns the
        tuple (versions_in_src, packageSource).
        """
        self.versions_in_src = {}
        self.packageSource = {}
        progress = None
        if print_progress:
            progress = Progress()
            progress.max_value = len(self.packages_to_copy) - 1
        for idx, package in enumerate(sorted(self.packages_to_copy)):
            if progress:
                progress.set_value(idx)
            for prj in self.source_projects:
                if package in self.packagesIn[prj]:
                    self.packageSource[package] = prj
                    if not get_versions:
                        continue
                    try:
                        self.versions_in_src[package] = \
                            self.packageVersions[prj][package]
                    except KeyError:
                        # We need to request the version to obs
                        try:
                            srcVersion = self.get_package_version(prj,
                                                                  package)
                        except ET.ParseError:
                            self.log(f'{prj} / {package} doesn\'t have a '
                                     'version set (probably still building?)')
                            self.versions_in_src[package] = None
                            break
                        self.log(f'Requested src version to obs: {prj}, ' +
                                 f'{package}, {srcVersion}')
                        if srcVersion:
                            self.versions_in_src[package] = srcVersion
                            self.packageVersions[prj][package] = srcVersion
                            self.write_cache_package_versions(prj)
                    break
        if progress:
            print('')
        return (self.versions_in_src, self.packageSource)

    def get_package_source_from_source_projects(self, print_progress=False):
        """Return package -> source-project without requesting versions."""
        self.get_package_versions_from_source_projects_helper(
            get_versions=False, print_progress=print_progress)
        return self.packageSource

    def get_package_versions_from_source_projects(self,
                                                  print_progress=False):
        """Return (versions_in_src, packageSource) for packages_to_copy."""
        return self.get_package_versions_from_source_projects_helper(
            get_versions=True, print_progress=print_progress)

    def get_package_versions_from_target_projects(self,
                                                  print_progress=False):
        """Return package -> version as found in the target project or
        any project it inherits from."""
        progress = None
        lf = ''
        if print_progress:
            progress = Progress()
            progress.max_value = len(self.packages_to_copy) - 1
            lf = '\n'
        for idx, package in enumerate(sorted(self.packages_to_copy)):
            if progress:
                progress.set_value(idx)
            try:
                tgtVersion = self.versions_in_tgt[package]
            except KeyError:
                # Obtaining version of {package} from
                # {self.projects_inherited_from_tgt}
                for prj in self.projects_inherited_from_tgt:
                    if package not in self.packagesIn[prj]:
                        continue
                    # Let's see if we already read the package version
                    try:
                        tgtVersion = self.packageVersions[prj][package]
                        # Fix: record cache hits too; previously they were
                        # missing from the returned dictionary.
                        self.versions_in_tgt[package] = tgtVersion
                    except KeyError:
                        # We need to request the version to obs
                        tgtVersion = self.get_package_version(prj, package)
                        self.log(f'Requested tgt version to obs: {prj}, ' +
                                 f'{package}, {tgtVersion}')
                        if tgtVersion:
                            self.versions_in_tgt[package] = tgtVersion
                            self.packageVersions[prj][package] = tgtVersion
                            self.write_cache_package_versions(prj)
                    break
                else:
                    print(f'{lf}# {package} not found in ' +
                          f'{self.projects_inherited_from_tgt}')
                    tgtVersion = None
        if progress:
            print('')
        return self.versions_in_tgt

    def get_packages_diff(self, srcPrj, srcPackage, tgtPrj, tgtPackage):
        """Return the `osc rdiff` output between two package sources."""
        cmd = self.osc + ['rdiff', srcPrj, srcPackage, tgtPrj, tgtPackage]
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            raise RuntimeError('osc command returned an error when getting the'
                               f' rdiff of {srcPrj}/{srcPackage} and '
                               f'{tgtPrj}/{tgtPackage}:\n' +
                               proc.stderr.decode('utf-8', errors='replace'))
        return proc.stdout.decode('utf-8', errors='replace').strip('\n')

    def remove_cache_files(self):
        """Delete every cache file written by this class."""
        dirpath = self.cache_path_for('', '')
        cache_types = ['packages-in', 'meta-prj', 'versions-in',
                       'devel-projects']
        for cache_type in cache_types:
            path = f'{dirpath}/{cache_type}-*'
            for filename in glob.glob(path):
                os.unlink(filename)