Files

658 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#
# osc-batch-submit
# Copyright (C) 2025 Antonio Larrosa <alarrosa@suse.com>
#
import xml.etree.ElementTree as ET
import subprocess
import json
import re
import os
import glob
from osc_batch_submit.progress import Progress
# project_with_packages_to_copy = '(((SUSE:SLE-15-SP4:GA SUSE:SLE-15-SP3:Update SUSE:SLE-15-SP3:GA SUSE:SLE-15-SP2:Update SUSE:SLE-15-SP2:GA) ∩ openSUSE.org:KDE:Frameworks5) home:alarrosa:branches:SUSE:SLE-15-SP4:GA:kf5) SUSE:SLE-15-SP4:GA:Staging:B' # noqa
# srcProjects = ['openSUSE.org:KDE:Frameworks5']
#
# tgtProjects = ['SUSE:SLE-15-SP4:GA',
# 'SUSE:SLE-15-SP3:Update',
# 'SUSE:SLE-15-SP3:GA',
# 'SUSE:SLE-15-SP2:Update',
# 'SUSE:SLE-15-SP2:GA',
# 'SUSE:SLE-15-SP1:GA',
# 'SUSE:SLE-15:GA']
# tgtProject = 'SUSE:SLE-15-SP4:GA'
def find_matching_parenthesis(string, start):
# start is the position of the opening parenthesis
# returns the position of the matching closing parenthesis
level = 1
for i, ch in enumerate(string[start + 1:]):
if ch == '(' or ch == '{':
level += 1
elif ch == ')' or ch == '}':
level -= 1
if level == 0: # Found the matching parenthesis
return i + start + 1
return len(string)
class OSC:
def __init__(self, use_cache=True, ibs=False):
self.debug = False
self.use_cache = use_cache
self.cache_dirpath = None
# The directory path where cached files are stored.
self.projects = []
self.src_projects = []
self.tgt_project = ''
self.projects_inherited_from_tgt = []
self.packagesIn = {}
self.packageVersions = {}
self.versions_in_src = {}
self.versions_in_tgt = {}
self.devel_projects = {}
# Contains [project][package] -> 'devel_project'
self.projects_bugowners = {}
self.projects_maintainers = {}
self.command = ''
if ibs:
self.osc = ['osc', '-A', 'https://api.suse.de']
else:
self.osc = ['osc']
def log(self, *args):
if self.debug:
print(*args)
def osc_command(self):
return " ".join(self.osc)
def get_all_projects_from_projects_set_expression(self, prj_set_expr):
tmp = re.sub('{[^}]*}', '', prj_set_expr)
return {x.strip()
for x in re.split(r'[∩∪\*()]', tmp)
if x.strip()}
def get_packages_from_projects_in_set_expression(self, prj_set_expr):
prj_set_expr = prj_set_expr.strip()
self.log('get_packages_from_projects_in_set_expr: ', prj_set_expr)
if not re.findall(r'[∩∪\*()\{}]', prj_set_expr):
return self.packagesIn[prj_set_expr]
tok = next(re.finditer(r'[∩∪\*(\{]', prj_set_expr))
self.log(tok.group(0))
if tok.group(0) == '(':
end_parenthesis = find_matching_parenthesis(prj_set_expr,
tok.start())
if tok.start() == 0 and end_parenthesis == len(prj_set_expr) - 1:
return self.get_packages_from_projects_in_set_expression(
prj_set_expr[1:-1])
left_op = prj_set_expr[tok.start() + 1: end_parenthesis].strip()
tok = next(re.finditer(r'[∩∪\*]',
prj_set_expr[end_parenthesis + 1:]))
right_op = prj_set_expr[end_parenthesis + tok.start() + 2:].strip()
elif tok.group(0) == '{':
end_parenthesis = find_matching_parenthesis(prj_set_expr,
tok.start())
if tok.start() == 0 and end_parenthesis == len(prj_set_expr) - 1:
package_list_expr = prj_set_expr[tok.start() + 1:
end_parenthesis].strip()
self.log('package_list_expr', package_list_expr)
self.log(package_list_expr.split(','))
return set([x.strip() for x in package_list_expr.split(',')])
left_op = prj_set_expr[tok.start():
end_parenthesis + 1] # Includes the {}
tok = next(re.finditer(r'[∩∪\*]',
prj_set_expr[end_parenthesis + 1:]))
right_op = prj_set_expr[end_parenthesis + tok.start() + 2:].strip()
else:
left_op = prj_set_expr[0:tok.start()-1].strip()
right_op = prj_set_expr[tok.start()+1:].strip()
op = tok.group(0)
self.log(f'left op: {left_op}')
self.log(f'op: {op}')
self.log(f'right op: {right_op}')
left_set = self.get_packages_from_projects_in_set_expression(left_op)
right_set = self.get_packages_from_projects_in_set_expression(right_op)
if op == '':
return left_set.union(right_set)
elif op == '':
self.log('left op pkgs:', left_set)
self.log('right op pkgs:', right_set)
return left_set.intersection(right_set)
elif op == '':
if isinstance(right_set, list):
right_set = set(right_set)
self.log(f'1 {left_set}')
self.log(f'2 {right_set}')
self.log(f'3 {left_set - right_set}')
return left_set - right_set
elif op == '*':
r = set()
for pkg in left_set:
for pattern in right_set:
if re.match(pattern, pkg):
r.add(pkg)
return r
else:
return self.packagesIn[prj_set_expr]
def cache_path_for(self, cache_type, project, create_directory=True):
if not self.cache_dirpath:
cache_path = os.getenv('XDG_CACHE_HOME')
if not cache_path:
cache_path = os.path.expanduser('~/.cache')
self.cache_dirpath = f'{cache_path}/osc-batch-submit'
if create_directory:
try:
os.mkdir(self.cache_dirpath)
except FileExistsError:
pass
return f'{self.cache_dirpath}/{cache_type}{"-" if cache_type and project else ""}{project}'
def get_project_packages(self, project):
filename = self.cache_path_for('packages-in', project)
if self.use_cache:
try:
return set([line.strip('\n')
for line in open(filename, 'rt').readlines()])
except FileNotFoundError:
pass
cmd = self.osc + ['ls', project]
output = subprocess.check_output(cmd)
contents = output.decode('utf-8', errors='replace').split('\n')
packages = [line for line in contents
if line and line != '00Meta\n' and ':' not in line]
open(filename, 'wt').write('\n'.join(packages))
return set(packages)
def readAllPackages(self):
for proj in self.projects:
if proj.startswith('{'):
continue
self.packagesIn[proj] = self.get_project_packages(proj)
try:
self.packageVersions[proj] = \
self.read_all_package_versions(proj)
except FileNotFoundError:
self.packageVersions[proj] = {}
def get_project_meta_paths(self, project, repository_name='standard'):
meta_contents = None
meta_root = None
filename = self.cache_path_for('meta-prj', project)
if self.use_cache:
try:
meta_contents = open(filename, 'rt').read()
except FileNotFoundError:
pass
else:
meta_root = ET.fromstring(meta_contents)
if not meta_root:
cmd = self.osc + ['meta', 'prj', project]
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when '
f'getting the metaprj for {project}:\n' +
proc.stderr.decode('utf-8',
errors='replace'))
open(filename, 'wt').write(proc.stdout.decode('utf-8',
errors='replace'))
try:
meta_root = ET.fromstring(proc.stdout)
except ET.ParseError:
print(f'Obtaining metaprj for {project} got {proc.stdout}')
raise
try:
repository = meta_root.find("repository[@name='standard']")
except AttributeError:
error = meta_root.find('error')
msg = f'Error obtaining repository meta for {project}'
if error is not None:
msg += f': {error.text}'
print(msg)
return None
try:
paths = [(p.attrib['project'], p.attrib['repository'])
for p in repository.findall("path")]
except AttributeError:
error = meta_root.find('error')
msg = f'Error obtaining repository paths for {project}'
if error is not None:
msg += f': {error.text}'
print(msg)
return None
return paths
def get_all_projects_inherited_from_project(self, project,
repository_name='standard'):
r = [project]
paths = self.get_project_meta_paths(project, repository_name)
visited = {(project, repository_name)}
while paths:
prj, repo = paths.pop(0)
try:
while (prj, repo) in visited:
prj, repo = paths.pop(0)
except IndexError:
break
if prj not in r:
r.append(prj)
more_paths = self.get_project_meta_paths(prj, repo)
visited.add((prj, repo))
if more_paths:
paths.extend(more_paths)
return r
def get_package_version(self, project, package, retry=True):
cmd = self.osc + ['api',
f'/source/{project}/{package}?view=info&parse=1']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
stderr = proc.stderr.decode('utf-8', errors='replace')
if (proc.returncode == 1 and
'HTTP Error 400: download in progress' in stderr):
return 'unknown'
raise RuntimeError('osc command returned an error when getting the'
f' version of {project}/{package}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
try:
version = root.find('version').text
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining package version for ' +
f'{project}/{package}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_package_version(project, package,
retry=False)
else:
print('AttributeError obtaining package version for ' +
f'{project}/{package}')
return None
return version
def get_request_state(self, requestid, retry=True):
cmd = self.osc + ['api',
f'/request/{requestid}']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f' state of request {requestid}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
try:
state = root.find('state').attrib['name']
state_msg = '\n'.join([x.strip(' \r\n')
for x in root.find('state').itertext()
if x.strip(' \r\n')])
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining request state for ' +
f'{requestid}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_request_state(requestid, retry=False)
else:
print('AttributeError obtaining request state for '
f'{requestid}')
return None
return (state, state_msg)
def get_request_comments(self, requestid, only_last=False, retry=True):
cmd = self.osc + ['api',
f'/comments/request/{requestid}']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f' comments of request {requestid}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
try:
comments = root.findall('comment')
comments_texts = [f"{x.attrib['who']}: {x.text}" for x in comments]
if only_last:
return comments_texts[-1] if comments_texts else ''
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining request state for ' +
f'{requestid}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_request_comments(requestid, retry=False)
else:
print('AttributeError obtaining request comments '
f'for {requestid}')
return None
return comments_texts
def get_project_bugowner_maintainer(self, project, retry=True):
if project in self.projects_bugowners:
return (self.projects_bugowners[project],
self.projects_maintainers[project])
cmd = self.osc + ['api',
f'/source/{project}/_meta']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f' bugowner/maintiner of project {project}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
import pdb
pdb.set_trace()
try:
b_elements = root.findall("*[@role='bugowner']")
m_elements = root.findall("*[@role='maintainer']")
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining project bugowner for ' +
f'{project}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_project_bugowner(project, retry=False)
else:
print('AttributeError obtaining project bugowner for ' +
f'{project}')
return None
rb = []
rm = []
for element in b_elements:
rb.append((element.tag, element.attrib[f'{element.tag}id']))
for element in m_elements:
rm.append((element.tag, element.attrib[f'{element.tag}id']))
self.projects_bugowners[project] = rb
self.projects_maintainers[project] = rm
return rb, rm
def get_package_bugowner_maintainer(self, project, package, retry=True):
cmd = self.osc + ['api',
f'/source/{project}/{package}/_meta']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
' bugowner/maintiner of package '
f'{project}/{package}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
try:
b_elements = root.findall("*[@role='bugowner']")
m_elements = root.findall("*[@role='maintainer']")
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining package bugowner for ' +
f'{project}/{package}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_package_bugowner(project, package,
retry=False)
else:
print('AttributeError obtaining package bugowner for ' +
f'{project}/{package}')
return None
rb = []
rm = []
for element in b_elements:
rb.append((element.tag, element.attrib[f'{element.tag}id']))
for element in m_elements:
rm.append((element.tag, element.attrib[f'{element.tag}id']))
if not rb or not rm:
prj_b, prj_m = self.get_project_bugowner_maintainer(project)
if not rb:
rb = prj_b
if not rm:
rm = prj_m
return rb, rm
def get_all_package_versions(self, project, packages, filename):
versions = {}
for package in packages:
version = self.get_package_version(project, package)
versions[package] = version
self.packageVersions[project] = versions
self.write_cache_package_versions(project)
return versions
def write_cache_package_versions(self, project):
filename = self.cache_path_for('versions-in', project)
with open(filename, 'w') as fh:
json.dump(self.packageVersions[project], fh)
def read_all_package_versions(self, project):
versions = {}
if self.use_cache:
filename = self.cache_path_for('versions-in', project)
try:
with open(filename, 'r') as fh:
versions = json.load(fh)
except json.decoder.JSONDecodeError:
print(f'JSON error in {filename}. Skipping...')
return versions
def set_params(self, source_projects, target_project,
packages_expression):
self.source_projects = source_projects
self.target_project = target_project
self.packages_expression = packages_expression
self.projects_inherited_from_tgt = \
self.get_all_projects_inherited_from_project(
target_project, repository_name='standard')
tmp_prjs = self.get_all_projects_from_projects_set_expression(
packages_expression)
self.log('Projects inherited from tgt')
self.log(self.projects_inherited_from_tgt)
self.projects = list(tmp_prjs.union(source_projects,
self.projects_inherited_from_tgt,
{target_project}))
self.readAllPackages()
self.packages_to_copy = \
self.get_packages_from_projects_in_set_expression(
packages_expression)
def add_packages_to_packages_to_copy_list(self, packages):
self.packages_to_copy.extend(p for p in packages
if p not in self.packages_to_copy)
def restrict_packages_in_packages_to_copy_list(self, packages):
packages_to_copy = self.packages_to_copy
self.packages_to_copy = [p for p in packages
if p in packages_to_copy]
def get_package_devel_project(self, project, package):
'''Returns the devel project for a package.
Returns 'KDE:Applications' for project 'openSUSE:Factory',
package 'kcron'
'''
if project not in self.devel_projects:
self.read_cache_devel_projects(project)
try:
return self.devel_projects[project][package]
except KeyError:
pass
cmd = self.osc + ['develproject', project, package]
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f' devel project of {project}/{package}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
self.devel_projects[project][package] = \
proc.stdout.decode('utf-8',
errors='replace').strip('\n').split('/')[0]
self.write_cache_devel_projects(project)
return self.devel_projects[project][package]
def package_is_devel_package_for_project(
self, devel_project, devel_package, project):
'''Checks the devel project for a package.
Returns if devel_project is the devel project of devel_package in
project. For example, returns True for ('KDE:Applications',
'kcron', 'openSUSE:Factory').
'''
real_devel_project = self.get_package_devel_project(project,
devel_package)
return real_devel_project == devel_project
def write_cache_devel_projects(self, project):
filename = self.cache_path_for('devel-projects', project)
with open(filename, 'w') as fh:
json.dump(self.devel_projects[project], fh)
def read_cache_devel_projects(self, project):
self.devel_projects[project] = {}
if not self.use_cache:
return
filename = self.cache_path_for('devel-projects', project)
try:
with open(filename, 'r') as fh:
self.devel_projects[project] = json.load(fh)
except FileNotFoundError:
pass
def get_package_versions_from_source_projects_helper(self,
get_versions=True,
print_progress=False):
self.versions_in_src = {}
self.packageSource = {}
progress = None
if print_progress:
progress = Progress()
progress.max_value = len(self.packages_to_copy) - 1
for idx, package in enumerate(sorted(self.packages_to_copy)):
if progress:
progress.set_value(idx)
for prj in self.source_projects:
if package in self.packagesIn[prj]:
self.packageSource[package] = prj
if not get_versions:
continue
try:
self.versions_in_src[package] = \
self.packageVersions[prj][package]
except KeyError:
# We need to request the version to obs
try:
srcVersion = self.get_package_version(prj, package)
except ET.ParseError:
self.log(f'{prj} / {package} doesn\'t have a '
'version set (probably still building?)')
self.versions_in_src[package] = None
break
self.log(f'Requested src version to obs: {prj}, ' +
f'{package}, {srcVersion}')
if srcVersion:
self.versions_in_src[package] = srcVersion
self.packageVersions[prj][package] = srcVersion
self.write_cache_package_versions(prj)
break
if progress:
print('')
return (self.versions_in_src, self.packageSource)
def get_package_source_from_source_projects(self, print_progress=False):
self.get_package_versions_from_source_projects_helper(
get_versions=False, print_progress=print_progress)
return self.packageSource
def get_package_versions_from_source_projects(self, print_progress=False):
return self.get_package_versions_from_source_projects_helper(
get_versions=True, print_progress=print_progress)
def get_package_versions_from_target_projects(self, print_progress=False):
progress = None
lf = ''
if print_progress:
progress = Progress()
progress.max_value = len(self.packages_to_copy) - 1
lf = '\n'
for idx, package in enumerate(sorted(self.packages_to_copy)):
if progress:
progress.set_value(idx)
try:
tgtVersion = self.versions_in_tgt[package]
except KeyError:
# Obtaining version of {package} from
# {self.projects_inherited_from_tgt}
for prj in self.projects_inherited_from_tgt:
if package not in self.packagesIn[prj]:
# print(f'{package} not available in {prj}')
continue
# Let's see if we already read the package version
# print(f'{package} is in {prj}')
try:
tgtVersion = self.packageVersions[prj][package]
except KeyError:
# We need to request the version to obs
# print(f'Obtainining version of {package} from {prj}')
tgtVersion = self.get_package_version(prj, package)
self.log(f'Requested tgt version to obs: {prj}, ' +
f'{package}, {tgtVersion}')
if tgtVersion:
self.versions_in_tgt[package] = tgtVersion
self.packageVersions[prj][package] = tgtVersion
self.write_cache_package_versions(prj)
break
else:
print(f'{lf}# {package} not found in ' +
f'{self.projects_inherited_from_tgt}')
tgtVersion = None
if progress:
print('')
return self.versions_in_tgt
def get_packages_diff(self, srcPrj, srcPackage, tgtPrj, tgtPackage):
cmd = self.osc + ['rdiff', srcPrj, srcPackage, tgtPrj, tgtPackage]
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f' rdiff of {srcPrj}/{srcPackage} and '
f'{tgtPrj}/{tgtPackage}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
return proc.stdout.decode('utf-8', errors='replace').strip('\n')
def remove_cache_files(self):
dirpath = self.cache_path_for('', '')
cache_types = ['packages-in', 'meta-prj', 'versions-in', 'devel-projects']
for cache_type in cache_types:
path = f'{dirpath}/{cache_type}-*'
for filename in glob.glob(path):
os.unlink(filename)