Files

658 lines
27 KiB
Python
Raw Permalink Normal View History

#
# osc-batch-submit
# Copyright (C) 2025 Antonio Larrosa <alarrosa@suse.com>
#
import xml.etree.ElementTree as ET
import subprocess
import json
import re
import os
2025-05-21 17:35:55 +02:00
import glob
from osc_batch_submit.progress import Progress
# project_with_packages_to_copy = '(((SUSE:SLE-15-SP4:GA SUSE:SLE-15-SP3:Update SUSE:SLE-15-SP3:GA SUSE:SLE-15-SP2:Update SUSE:SLE-15-SP2:GA) ∩ openSUSE.org:KDE:Frameworks5) home:alarrosa:branches:SUSE:SLE-15-SP4:GA:kf5) SUSE:SLE-15-SP4:GA:Staging:B' # noqa
# srcProjects = ['openSUSE.org:KDE:Frameworks5']
#
# tgtProjects = ['SUSE:SLE-15-SP4:GA',
# 'SUSE:SLE-15-SP3:Update',
# 'SUSE:SLE-15-SP3:GA',
# 'SUSE:SLE-15-SP2:Update',
# 'SUSE:SLE-15-SP2:GA',
# 'SUSE:SLE-15-SP1:GA',
# 'SUSE:SLE-15:GA']
# tgtProject = 'SUSE:SLE-15-SP4:GA'
def find_matching_parenthesis(string, start):
    """Return the index of the bracket closing the one opened at ``start``.

    ``start`` is the position of the opening bracket.  Round and curly
    brackets are counted together as a single nesting level.  When no
    matching closing bracket exists, ``len(string)`` is returned.
    """
    depth = 1
    remainder = string[start + 1:]
    # Enumerate with an absolute offset so the index can be returned as is.
    for position, char in enumerate(remainder, start + 1):
        if char in ('(', '{'):
            depth += 1
            continue
        if char in (')', '}'):
            depth -= 1
            if depth == 0:  # Found the matching bracket
                return position
    return len(string)
class OSC:
    """Wrapper around the ``osc`` command line client with on-disk caching.

    The object keeps track of a set of projects, the packages each of them
    contains and the versions of those packages, and evaluates "project set
    expressions" that combine projects with set operators.
    """

    # Operators understood in project set expressions.
    _UNION = '∪'
    _INTERSECTION = '∩'
    # NOTE(review): the literal used for the difference operator was lost in
    # the source this class was reconstructed from; U+2216 SET MINUS is
    # assumed here -- confirm against the project's documentation.
    _DIFFERENCE = '∖'
    _FILTER = '*'

    # Any character that makes an expression more than a bare project name.
    _SYNTAX_RE = re.compile(r'[∩∪∖\*()\{}]')
    # First significant token: an operator or an opening bracket.
    _TOKEN_RE = re.compile(r'[∩∪∖\*(\{]')
    # Binary operators only.
    _OPERATOR_RE = re.compile(r'[∩∪∖\*]')
    # Separators between project names (``{...}`` groups are removed first).
    _PROJECT_SPLIT_RE = re.compile(r'[∩∪∖\*()]')

    def __init__(self, use_cache=True, ibs=False):
        """Initialise an empty state.

        use_cache: read previously cached data instead of querying osc.
        ibs: when true, ``osc`` is pointed at ``https://api.suse.de``.
        """
        self.debug = False
        self.use_cache = use_cache
        self.cache_dirpath = None
        # The directory path where cached files are stored.
        self.projects = []
        self.src_projects = []
        self.tgt_project = ''
        # Attributes assigned by set_params(); initialised here so that the
        # other methods do not fail with AttributeError before it is called.
        self.source_projects = []
        self.target_project = ''
        self.packages_expression = ''
        self.packages_to_copy = []
        self.packageSource = {}
        self.projects_inherited_from_tgt = []
        self.packagesIn = {}
        self.packageVersions = {}
        self.versions_in_src = {}
        self.versions_in_tgt = {}
        self.devel_projects = {}
        # Contains [project][package] -> 'devel_project'
        self.projects_bugowners = {}
        self.projects_maintainers = {}
        self.command = ''
        if ibs:
            self.osc = ['osc', '-A', 'https://api.suse.de']
        else:
            self.osc = ['osc']

    def log(self, *args):
        """Print ``args`` when ``self.debug`` is set."""
        if self.debug:
            print(*args)

    def osc_command(self):
        """Return the osc command line prefix as a single string."""
        return " ".join(self.osc)

    def _run_osc(self, args, description):
        """Run ``osc`` with ``args`` and return the completed process.

        Raises RuntimeError, mentioning ``description`` and the captured
        stderr, when osc exits with a non-zero status.
        """
        proc = subprocess.run(self.osc + list(args), capture_output=True)
        if proc.returncode != 0:
            raise RuntimeError('osc command returned an error when getting the'
                               f' {description}:\n' +
                               proc.stderr.decode('utf-8', errors='replace'))
        return proc

    @staticmethod
    def _elements_with_role(root, role):
        """Return ``(tag, id)`` tuples for children of ``root`` with ``role``.

        The id is read from the attribute named ``<tag>id``.
        """
        return [(element.tag, element.attrib[f'{element.tag}id'])
                for element in root.findall(f"*[@role='{role}']")]

    def get_all_projects_from_projects_set_expression(self, prj_set_expr):
        """Return the set of project names mentioned in ``prj_set_expr``.

        ``{...}`` groups are removed before the expression is split on
        operators and parentheses.
        """
        tmp = re.sub('{[^}]*}', '', prj_set_expr)
        return {x.strip()
                for x in self._PROJECT_SPLIT_RE.split(tmp)
                if x.strip()}

    def get_packages_from_projects_in_set_expression(self, prj_set_expr):
        """Evaluate a project set expression and return the resulting packages.

        A bare project name evaluates to ``self.packagesIn[name]``, a
        ``{a, b}`` group to the set of the listed names, and parentheses
        group sub-expressions.  Binary operators are evaluated with the
        whole remainder of the expression as right operand: union,
        intersection, difference and ``*``, which keeps the packages of the
        left operand matching (``re.match``) any pattern of the right one.

        Raises KeyError for an unknown project and ValueError when an
        operator is missing after a bracketed operand.
        """
        prj_set_expr = prj_set_expr.strip()
        self.log('get_packages_from_projects_in_set_expr: ', prj_set_expr)

        if not self._SYNTAX_RE.search(prj_set_expr):
            return self.packagesIn[prj_set_expr]

        tok = self._TOKEN_RE.search(prj_set_expr)
        if tok is None:
            raise ValueError('Malformed project set expression: '
                             f'{prj_set_expr!r}')
        self.log(tok.group(0))

        if tok.group(0) in ('(', '{'):
            closing = find_matching_parenthesis(prj_set_expr, tok.start())
            spans_everything = (tok.start() == 0 and
                                closing == len(prj_set_expr) - 1)
            if tok.group(0) == '(':
                if spans_everything:
                    return self.get_packages_from_projects_in_set_expression(
                        prj_set_expr[1:-1])
                left_op = prj_set_expr[tok.start() + 1: closing].strip()
            else:
                if spans_everything:
                    package_list_expr = prj_set_expr[tok.start() + 1:
                                                     closing].strip()
                    self.log('package_list_expr', package_list_expr)
                    self.log(package_list_expr.split(','))
                    return {x.strip() for x in package_list_expr.split(',')}
                # Keep the braces so the operand is parsed as a group again
                left_op = prj_set_expr[tok.start(): closing + 1]

            remainder = prj_set_expr[closing + 1:]
            tok = self._OPERATOR_RE.search(remainder)
            if tok is None:
                raise ValueError('Operator expected after '
                                 f'{prj_set_expr[:closing + 1]!r} in '
                                 f'{prj_set_expr!r}')
            right_op = remainder[tok.start() + 1:].strip()
        else:
            # Slice right up to the operator: strip() removes the optional
            # space, and nothing is lost when there is no space at all.
            left_op = prj_set_expr[:tok.start()].strip()
            right_op = prj_set_expr[tok.start() + 1:].strip()

        op = tok.group(0)
        self.log(f'left op: {left_op}')
        self.log(f'op: {op}')
        self.log(f'right op: {right_op}')

        left_set = self.get_packages_from_projects_in_set_expression(left_op)
        right_set = self.get_packages_from_projects_in_set_expression(right_op)

        if op == self._UNION:
            return left_set.union(right_set)
        elif op == self._INTERSECTION:
            self.log('left op pkgs:', left_set)
            self.log('right op pkgs:', right_set)
            return left_set.intersection(right_set)
        elif op == self._DIFFERENCE:
            if isinstance(right_set, list):
                right_set = set(right_set)
            self.log(f'1 {left_set}')
            self.log(f'2 {right_set}')
            self.log(f'3 {left_set - right_set}')
            return left_set - right_set
        elif op == self._FILTER:
            patterns = [re.compile(pattern) for pattern in right_set]
            return {pkg for pkg in left_set
                    if any(pattern.match(pkg) for pattern in patterns)}
        else:
            return self.packagesIn[prj_set_expr]

    def cache_path_for(self, cache_type, project, create_directory=True):
        """Return the path of the cache file for ``cache_type``/``project``.

        The cache directory is ``osc-batch-submit`` below ``$XDG_CACHE_HOME``
        (or ``~/.cache`` when that variable is unset or empty) unless
        ``self.cache_dirpath`` was already set.  When ``create_directory`` is
        true the directory, including missing parents, is created.
        """
        if not self.cache_dirpath:
            cache_path = os.getenv('XDG_CACHE_HOME')
            if not cache_path:
                cache_path = os.path.expanduser('~/.cache')
            self.cache_dirpath = f'{cache_path}/osc-batch-submit'

        if create_directory:
            # makedirs also copes with a missing parent such as ~/.cache
            os.makedirs(self.cache_dirpath, exist_ok=True)

        separator = '-' if cache_type and project else ''
        return f'{self.cache_dirpath}/{cache_type}{separator}{project}'

    def get_project_packages(self, project):
        """Return the set of package names in ``project``.

        The list is read from the cache when ``self.use_cache`` is set and
        a cache file exists; otherwise ``osc ls`` is run and its output is
        written to the cache.  ``00Meta`` and names containing ``:`` are
        left out.
        """
        filename = self.cache_path_for('packages-in', project)
        if self.use_cache:
            try:
                with open(filename, 'rt') as fh:
                    return {line.strip('\n') for line in fh}
            except FileNotFoundError:
                pass

        output = subprocess.check_output(self.osc + ['ls', project])
        contents = output.decode('utf-8', errors='replace').split('\n')
        # The lines carry no trailing newline after split('\n')
        packages = [line for line in contents
                    if line and line != '00Meta' and ':' not in line]
        with open(filename, 'wt') as fh:
            fh.write('\n'.join(packages))
        return set(packages)

    def readAllPackages(self):
        """Fill ``packagesIn`` and ``packageVersions`` for ``self.projects``.

        Entries starting with ``{`` are skipped.
        """
        for proj in self.projects:
            if proj.startswith('{'):
                continue
            self.packagesIn[proj] = self.get_project_packages(proj)
            self.packageVersions[proj] = self.read_all_package_versions(proj)

    def get_project_meta_paths(self, project, repository_name='standard'):
        """Return the ``(project, repository)`` paths of a repository.

        The project meta is taken from the cache when allowed and readable,
        otherwise obtained with ``osc meta prj`` and cached.  Returns None,
        after printing a message, when the project has no repository called
        ``repository_name``.

        Raises RuntimeError when osc fails and ET.ParseError when its output
        is not valid XML.
        """
        meta_root = None
        filename = self.cache_path_for('meta-prj', project)
        if self.use_cache:
            try:
                with open(filename, 'rt') as fh:
                    meta_root = ET.fromstring(fh.read())
            except FileNotFoundError:
                pass
            except ET.ParseError:
                # A corrupt cache file is ignored and fetched again
                meta_root = None

        # Compare with None: an Element without children is falsy
        if meta_root is None:
            proc = self._run_osc(['meta', 'prj', project],
                                 f'metaprj for {project}')
            try:
                meta_root = ET.fromstring(proc.stdout)
            except ET.ParseError:
                print(f'Obtaining metaprj for {project} got {proc.stdout}')
                raise
            # Only cache output that was parsed successfully
            with open(filename, 'wt') as fh:
                fh.write(proc.stdout.decode('utf-8', errors='replace'))

        repository = next((repo for repo in meta_root.findall('repository')
                           if repo.get('name') == repository_name), None)
        if repository is None:
            error = meta_root.find('error')
            msg = f'Error obtaining repository paths for {project}'
            if error is not None:
                msg += f': {error.text}'
            print(msg)
            return None

        return [(p.attrib['project'], p.attrib['repository'])
                for p in repository.findall("path")]

    def get_all_projects_inherited_from_project(self, project,
                                                repository_name='standard'):
        """Return ``project`` followed by every project its paths lead to.

        The repository paths are followed breadth first and each
        ``(project, repository)`` pair is visited once.
        """
        r = [project]
        visited = {(project, repository_name)}
        pending = list(self.get_project_meta_paths(project,
                                                   repository_name) or [])
        while pending:
            prj, repo = pending.pop(0)
            if (prj, repo) in visited:
                continue
            visited.add((prj, repo))
            if prj not in r:
                r.append(prj)
            more_paths = self.get_project_meta_paths(prj, repo)
            if more_paths:
                pending.extend(more_paths)
        return r

    def get_package_version(self, project, package, retry=True):
        """Return the version of ``project``/``package`` reported by OBS.

        Returns ``'unknown'`` when osc fails with "download in progress" and
        None when the answer has no ``version`` element.  When the answer
        carries a "download in progress" error the request is repeated once
        if ``retry`` is true.

        Raises RuntimeError on any other osc failure and ET.ParseError when
        the answer is not valid XML.
        """
        cmd = self.osc + ['api',
                          f'/source/{project}/{package}?view=info&parse=1']
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            stderr = proc.stderr.decode('utf-8', errors='replace')
            if (proc.returncode == 1 and
                    'HTTP Error 400: download in progress' in stderr):
                return 'unknown'
            raise RuntimeError('osc command returned an error when getting the'
                               f' version of {project}/{package}:\n' +
                               stderr)

        root = ET.fromstring(proc.stdout)
        version_element = root.find('version')
        if version_element is not None:
            return version_element.text

        error = root.find('error')
        if error is not None:
            print('Error obtaining package version for ' +
                  f'{project}/{package}: {error.text}')
            if error.text == 'download in progress' and retry:
                return self.get_package_version(project, package,
                                                retry=False)
        else:
            print('AttributeError obtaining package version for ' +
                  f'{project}/{package}')
        return None

    def get_request_state(self, requestid, retry=True):
        """Return ``(state name, state text)`` for a request.

        The text is made of the non-empty text fragments inside the
        ``state`` element, one per line.  Returns None when the answer has
        no ``state`` element; a "download in progress" error is retried
        once if ``retry`` is true.

        Raises RuntimeError when osc fails.
        """
        proc = self._run_osc(['api', f'/request/{requestid}'],
                             f'state of request {requestid}')
        root = ET.fromstring(proc.stdout)
        state_element = root.find('state')
        if state_element is not None:
            state = state_element.attrib['name']
            fragments = (x.strip(' \r\n') for x in state_element.itertext())
            state_msg = '\n'.join(x for x in fragments if x)
            return (state, state_msg)

        error = root.find('error')
        if error is not None:
            print('Error obtaining request state for ' +
                  f'{requestid}: {error.text}')
            if error.text == 'download in progress' and retry:
                return self.get_request_state(requestid, retry=False)
        else:
            print('AttributeError obtaining request state for '
                  f'{requestid}')
        return None

    def get_request_comments(self, requestid, only_last=False, retry=True):
        """Return the comments of a request as ``'who: text'`` strings.

        With ``only_last`` only the last comment is returned, or an empty
        string when there is none.  ``retry`` is accepted for interface
        compatibility and is not used.

        Raises RuntimeError when osc fails.
        """
        proc = self._run_osc(['api', f'/comments/request/{requestid}'],
                             f'comments of request {requestid}')
        root = ET.fromstring(proc.stdout)
        comments_texts = [f"{x.attrib['who']}: {x.text}"
                          for x in root.findall('comment')]
        if only_last:
            return comments_texts[-1] if comments_texts else ''
        return comments_texts

    def get_project_bugowner_maintainer(self, project, retry=True):
        """Return ``(bugowners, maintainers)`` of ``project``.

        Both are lists of ``(tag, id)`` tuples built from the elements of
        the project meta with the matching ``role``.  Results are memoised
        per project.  ``retry`` is accepted for interface compatibility and
        is not used.

        Raises RuntimeError when osc fails.
        """
        if project in self.projects_bugowners:
            return (self.projects_bugowners[project],
                    self.projects_maintainers[project])

        proc = self._run_osc(['api', f'/source/{project}/_meta'],
                             f'bugowner/maintainer of project {project}')
        root = ET.fromstring(proc.stdout)
        rb = self._elements_with_role(root, 'bugowner')
        rm = self._elements_with_role(root, 'maintainer')
        self.projects_bugowners[project] = rb
        self.projects_maintainers[project] = rm
        return rb, rm

    def get_package_bugowner_maintainer(self, project, package, retry=True):
        """Return ``(bugowners, maintainers)`` of ``project``/``package``.

        Whichever list is empty in the package meta is taken from the
        project instead.  ``retry`` is accepted for interface compatibility
        and is not used.

        Raises RuntimeError when osc fails.
        """
        proc = self._run_osc(
            ['api', f'/source/{project}/{package}/_meta'],
            f'bugowner/maintainer of package {project}/{package}')
        root = ET.fromstring(proc.stdout)
        rb = self._elements_with_role(root, 'bugowner')
        rm = self._elements_with_role(root, 'maintainer')

        if not rb or not rm:
            prj_b, prj_m = self.get_project_bugowner_maintainer(project)
            if not rb:
                rb = prj_b
            if not rm:
                rm = prj_m
        return rb, rm

    def get_all_package_versions(self, project, packages, filename):
        """Request the version of every package and cache the result.

        ``filename`` is not used; the cache path is derived from
        ``project``.  Returns the ``{package: version}`` dictionary.
        """
        versions = {package: self.get_package_version(project, package)
                    for package in packages}
        self.packageVersions[project] = versions
        self.write_cache_package_versions(project)
        return versions

    def write_cache_package_versions(self, project):
        """Write ``self.packageVersions[project]`` to the cache as JSON."""
        filename = self.cache_path_for('versions-in', project)
        with open(filename, 'w') as fh:
            json.dump(self.packageVersions[project], fh)

    def read_all_package_versions(self, project):
        """Return the cached ``{package: version}`` dictionary of ``project``.

        An empty dictionary is returned when the cache is disabled, the
        cache file does not exist or it does not contain valid JSON.
        """
        versions = {}
        if not self.use_cache:
            return versions

        filename = self.cache_path_for('versions-in', project)
        try:
            with open(filename, 'r') as fh:
                versions = json.load(fh)
        except FileNotFoundError:
            pass
        except json.decoder.JSONDecodeError:
            print(f'JSON error in {filename}. Skipping...')
        return versions

    def set_params(self, source_projects, target_project,
                   packages_expression):
        """Store the parameters and load everything derived from them.

        This resolves the projects the target inherits from, reads the
        packages (and cached versions) of all involved projects and
        evaluates ``packages_expression`` into ``self.packages_to_copy``.
        """
        self.source_projects = source_projects
        self.target_project = target_project
        self.packages_expression = packages_expression

        self.projects_inherited_from_tgt = \
            self.get_all_projects_inherited_from_project(
                target_project, repository_name='standard')
        tmp_prjs = self.get_all_projects_from_projects_set_expression(
            packages_expression)
        self.log('Projects inherited from tgt')
        self.log(self.projects_inherited_from_tgt)
        self.projects = list(tmp_prjs.union(source_projects,
                                            self.projects_inherited_from_tgt,
                                            {target_project}))
        self.readAllPackages()
        self.packages_to_copy = \
            self.get_packages_from_projects_in_set_expression(
                packages_expression)

    def add_packages_to_packages_to_copy_list(self, packages):
        """Add the ``packages`` not yet present to ``self.packages_to_copy``.

        ``self.packages_to_copy`` may be a set (as stored by set_params) or
        a list (as stored by restrict_packages_in_packages_to_copy_list).
        """
        current = self.packages_to_copy
        if isinstance(current, (set, frozenset)):
            # Build a new set: the current one may be the very object kept
            # in self.packagesIn, which must not be modified.
            self.packages_to_copy = set(current).union(packages)
            return
        for package in packages:
            if package not in current:
                current.append(package)

    def restrict_packages_in_packages_to_copy_list(self, packages):
        """Keep only the ``packages`` already in ``self.packages_to_copy``.

        The result is a list in the order of ``packages``.
        """
        packages_to_copy = self.packages_to_copy
        self.packages_to_copy = [p for p in packages
                                 if p in packages_to_copy]

    def get_package_devel_project(self, project, package):
        '''Returns the devel project for a package.

        Returns 'KDE:Applications' for project 'openSUSE:Factory',
        package 'kcron'.  The answer is looked up in the cached values
        first; otherwise ``osc develproject`` is run and the result cached.

        Raises RuntimeError when osc fails.
        '''
        if project not in self.devel_projects:
            self.read_cache_devel_projects(project)
        try:
            return self.devel_projects[project][package]
        except KeyError:
            pass

        proc = self._run_osc(['develproject', project, package],
                             f'devel project of {project}/{package}')
        output = proc.stdout.decode('utf-8', errors='replace')
        # Only the part before the first '/' is kept
        self.devel_projects[project][package] = \
            output.strip('\n').split('/')[0]
        self.write_cache_devel_projects(project)
        return self.devel_projects[project][package]

    def package_is_devel_package_for_project(
            self, devel_project, devel_package, project):
        '''Checks the devel project for a package.

        Returns if devel_project is the devel project of devel_package in
        project. For example, returns True for ('KDE:Applications',
        'kcron', 'openSUSE:Factory').
        '''
        real_devel_project = self.get_package_devel_project(project,
                                                            devel_package)
        return real_devel_project == devel_project

    def write_cache_devel_projects(self, project):
        """Write ``self.devel_projects[project]`` to the cache as JSON."""
        filename = self.cache_path_for('devel-projects', project)
        with open(filename, 'w') as fh:
            json.dump(self.devel_projects[project], fh)

    def read_cache_devel_projects(self, project):
        """Load the cached devel projects of ``project``.

        ``self.devel_projects[project]`` is left as an empty dictionary when
        the cache is disabled, missing or not valid JSON.
        """
        self.devel_projects[project] = {}
        if not self.use_cache:
            return

        filename = self.cache_path_for('devel-projects', project)
        try:
            with open(filename, 'r') as fh:
                self.devel_projects[project] = json.load(fh)
        except FileNotFoundError:
            pass
        except json.decoder.JSONDecodeError:
            print(f'JSON error in {filename}. Skipping...')

    def get_package_versions_from_source_projects_helper(self,
                                                         get_versions=True,
                                                         print_progress=False):
        """Find the source project (and version) of each package to copy.

        Returns ``(self.versions_in_src, self.packageSource)``.  Versions
        are only looked up when ``get_versions`` is true; those not cached
        yet are requested and written to the cache.
        """
        self.versions_in_src = {}
        self.packageSource = {}
        progress = None
        if print_progress:
            progress = Progress()
            progress.max_value = len(self.packages_to_copy) - 1

        for idx, package in enumerate(sorted(self.packages_to_copy)):
            if progress:
                progress.set_value(idx)
            for prj in self.source_projects:
                if package not in self.packagesIn[prj]:
                    continue
                self.packageSource[package] = prj
                if not get_versions:
                    # NOTE(review): the remaining source projects are still
                    # checked here, so the last one containing the package
                    # is recorded, unlike in the get_versions case below.
                    continue
                try:
                    self.versions_in_src[package] = \
                        self.packageVersions[prj][package]
                except KeyError:
                    # We need to request the version to obs
                    try:
                        srcVersion = self.get_package_version(prj, package)
                    except ET.ParseError:
                        self.log(f'{prj} / {package} doesn\'t have a '
                                 'version set (probably still building?)')
                        self.versions_in_src[package] = None
                        break
                    self.log(f'Requested src version to obs: {prj}, ' +
                             f'{package}, {srcVersion}')
                    if srcVersion:
                        self.versions_in_src[package] = srcVersion
                        self.packageVersions.setdefault(
                            prj, {})[package] = srcVersion
                        self.write_cache_package_versions(prj)
                # NOTE(review): nesting reconstructed -- the first source
                # project containing the package ends the search.
                break

        if progress:
            print('')
        return (self.versions_in_src, self.packageSource)

    def get_package_source_from_source_projects(self, print_progress=False):
        """Return ``{package: source project}`` without reading versions."""
        self.get_package_versions_from_source_projects_helper(
            get_versions=False, print_progress=print_progress)
        return self.packageSource

    def get_package_versions_from_source_projects(self, print_progress=False):
        """Return ``(versions_in_src, packageSource)`` including versions."""
        return self.get_package_versions_from_source_projects_helper(
            get_versions=True, print_progress=print_progress)

    def get_package_versions_from_target_projects(self, print_progress=False):
        """Return ``{package: version}`` as found in the target projects.

        For each package to copy that has no known target version yet, the
        projects inherited from the target are searched in order and the
        first version found is stored and cached.  A message is printed for
        packages for which no version is found.
        """
        progress = None
        lf = ''
        if print_progress:
            progress = Progress()
            progress.max_value = len(self.packages_to_copy) - 1
            lf = '\n'

        for idx, package in enumerate(sorted(self.packages_to_copy)):
            if progress:
                progress.set_value(idx)
            if package in self.versions_in_tgt:
                continue

            for prj in self.projects_inherited_from_tgt:
                if package not in self.packagesIn[prj]:
                    continue
                # Let's see if we already read the package version
                try:
                    tgtVersion = self.packageVersions[prj][package]
                except KeyError:
                    # We need to request the version to obs
                    tgtVersion = self.get_package_version(prj, package)
                    self.log(f'Requested tgt version to obs: {prj}, ' +
                             f'{package}, {tgtVersion}')
                if tgtVersion:
                    self.versions_in_tgt[package] = tgtVersion
                    self.packageVersions.setdefault(
                        prj, {})[package] = tgtVersion
                    self.write_cache_package_versions(prj)
                    break
            else:
                print(f'{lf}# {package} not found in ' +
                      f'{self.projects_inherited_from_tgt}')

        if progress:
            print('')
        return self.versions_in_tgt

    def get_packages_diff(self, srcPrj, srcPackage, tgtPrj, tgtPackage):
        """Return the output of ``osc rdiff`` between two packages.

        Raises RuntimeError when osc fails.
        """
        proc = self._run_osc(
            ['rdiff', srcPrj, srcPackage, tgtPrj, tgtPackage],
            f'rdiff of {srcPrj}/{srcPackage} and {tgtPrj}/{tgtPackage}')
        return proc.stdout.decode('utf-8', errors='replace').strip('\n')

    def remove_cache_files(self):
        """Delete every cache file of the known cache types."""
        dirpath = self.cache_path_for('', '')
        cache_types = ['packages-in', 'meta-prj', 'versions-in',
                       'devel-projects']
        for cache_type in cache_types:
            # Escape the directory so glob characters in it are literal
            path = f'{glob.escape(dirpath)}/{cache_type}-*'
            for filename in glob.glob(path):
                os.unlink(filename)