Initial release of osc-batch-submit

This is still work in progress from a script I made for my own
and with part of it (the progress file) copied and slightly adapted
from Bard.

osc-batch-submit help submitting packages in batch at once
for large projects like KDE, Qt or gstreamer.
This commit is contained in:
2025-03-14 13:42:09 +01:00
parent 92596d5b63
commit abedc3bdfc
11 changed files with 1254 additions and 0 deletions

1
MANIFEST.in Normal file
View File

@@ -0,0 +1 @@
recursive-include osc_batch_submit/examples *.conf

View File

@@ -0,0 +1,19 @@
ibs = True
use_cache = False
packages_to_copy_expr = '{ffmpeg-7, geany, glfw, libdovi, libplacebo, librist, python-glad2}'
source_projects = ['openSUSE.org:multimedia:libs', 'openSUSE.org:games', 'openSUSE.org:openSUSE:Factory']
target_project = 'SUSE:SLFO:Main'
only_packages_developed_in_project = None
reference = 'dependency of ffmpeg-7'
previous_logfile = ''
set_bugowner = 'group:gnome-maintainers'
force_recheck_bugowner = False

View File

@@ -0,0 +1,15 @@
ibs = True
use_cache = False
packages_to_copy_expr = '((openSUSE.org:multimedia:libs * {.*gst.*}) ∩ SUSE:SLFO:Main)'
source_projects = ['openSUSE.org:multimedia:libs']
target_project = 'SUSE:SLFO:Main'
only_packages_developed_in_project = None
set_bugowner = False
force_recheck_bugowner = False

View File

@@ -0,0 +1,9 @@
ibs = True
use_cache = False
packages_to_copy_expr = '((openSUSE.org:multimedia:libs * {.*gst.*}) ∩ SUSE:SLFO:Main) {pipewire, wireplumber}'
source_projects = ['openSUSE.org:multimedia:libs']
target_project = 'home:alarrosa:branches:SUSE:SLE-15-SP6:Update:gstreamer-1.24.12'

View File

@@ -0,0 +1,19 @@
ibs = False
use_cache = True
packages_to_copy_expr = 'openSUSE:Factory ∩ KDE:Frameworks'
source_projects = ['KDE:Frameworks']
target_project = 'openSUSE:Leap:16.0'
only_packages_developed_in_project = 'KDE:Frameworks'
reference = ''
previous_logfile = ''
set_bugowner = False
force_recheck_bugowner = False

View File

@@ -0,0 +1,19 @@
ibs = False
use_cache = True
packages_to_copy_expr = 'openSUSE:Factory ∩ (KDE:Frameworks KDE:Applications KDE:Extra)'
source_projects = ['KDE:Frameworks', 'KDE:Applications', 'KDE:Extra']
target_project = 'openSUSE:Leap:16.0'
only_packages_developed_in_project = None
reference = ''
previous_logfile = ''
set_bugowner = False
force_recheck_bugowner = False

View File

@@ -0,0 +1,15 @@
ibs = False
use_cache = True
packages_to_copy_expr = '(openSUSE:Backports:SLE-15-SP6 openSUSE:Leap:15.6) ∩ (KDE:Frameworks5)'
source_projects = ['KDE:Frameworks5']
target_project = 'openSUSE:Backports:SLE-15-SP6'
only_packages_developed_in_project = 'openSUSE.org:KDE:Frameworks5'
reference = 'boo#1221200'
previous_logfile = ''

417
osc_batch_submit/main.py Normal file
View File

@@ -0,0 +1,417 @@
# -*- coding: utf-8 -*-
#
# osc-batch-submit
# Copyright (C) 2025 Antonio Larrosa <alarrosa@suse.com>
#
import sys
import re
import os.path
from osc_batch_submit.osc import OSC
from osc_batch_submit.progress import Progress
from packaging import version
from argparse import ArgumentParser, RawTextHelpFormatter
from importlib.util import spec_from_loader, module_from_spec
from importlib.machinery import SourceFileLoader
__version__ = '0.1'
class Output:
def __init__(self, filename, verbose=False):
self.filename = filename
self.fh = open(filename, 'a', buffering=1)
self.verbose = verbose
def write(self, txt):
if self.verbose:
print(txt)
self.fh.write(txt+'\n')
class OSCBatchSubmit:
def __init__(self):
# Default values
self.use_cache = False
self.ibs = False
self.packages_to_copy_expr = ''
self.source_projects = []
self.target_project = ''
self.only_packages_developed_in_project = None
self.reference = None # (like a jira or bugzilla refence)
self.previous_logfile = None # To be able to supersede SRs
self.set_bugowner = False
self.force_recheck_bugowner = False
self.config_file = None
self.verbose = False
self.print_progress = False
def parse_command_line(self):
main_parser = ArgumentParser(
description='Manage your music collection',
formatter_class=RawTextHelpFormatter)
main_parser.add_argument('--version', action='version',
version='osc-batch-submit ' + __version__)
sps = main_parser.add_subparsers(
dest='command', metavar='command',
help='''The following commands are available:
sr
copypac
list-packages
list-bugowners-in-src
list-bugowners-in-tgt
''')
parser = sps.add_parser('submitrequest', aliases=['sr'],
description='create submit requests')
parser.add_argument('config_file')
parser.add_argument('--verbose', '-v', action='store_true')
parser.add_argument('--progress', action='store_true')
parser = sps.add_parser('copypac', description='copy packages')
parser.add_argument('config_file')
parser.add_argument('--verbose', '-v', action='store_true')
parser.add_argument('--progress', action='store_true')
parser = sps.add_parser('list-packages',
description='Just list packages that '
'would be copied/submitted')
parser.add_argument('config_file')
parser = sps.add_parser('list-bugowners-in-src',
description='List bugowner/maintainers in '
'source projects')
parser.add_argument('config_file')
parser = sps.add_parser('list-bugowners-in-tgt',
description='List bugowner/maintainers in '
'target projects')
parser.add_argument('config_file')
options = main_parser.parse_args()
self.command = options.command
if getattr(options, 'config_file'):
if not options.config_file.endswith('.conf'):
print(f'Please rename {options.config_file} to end in .conf')
sys.exit(1)
self.config_file = options.config_file
self.base_name = self.config_file.removesuffix('.conf')
self.load_data_from_config_file()
if getattr(options, 'verbose'):
self.verbose = options.verbose
if getattr(options, 'progress'):
self.print_progress = options.progress
def load_data_from_config_file(self):
spec = spec_from_loader(self.base_name,
SourceFileLoader(self.base_name,
self.config_file))
data = module_from_spec(spec)
spec.loader.exec_module(data)
for var in [x for x in dir(data) if not x.startswith("__")]:
setattr(self, var, getattr(data, var))
def run(self):
print(f'Package list expression: {self.packages_to_copy_expr}')
print(f'From: {self.source_projects}')
print(f'To: {self.target_project}')
commands_file = self.base_name + '.sh'
idx = 1
while os.path.exists(commands_file):
idx += 1
commands_file = self.base_name + f'-{idx}.sh'
new_logfile = f'logs/{self.base_name}.log'
idx = 1
while os.path.exists(new_logfile):
idx += 1
new_logfile = f'logs/{self.base_name}-{idx}.log'
osc = OSC(self.use_cache, ibs=self.ibs)
print('Obtaining the list of packages...')
osc.set_params(self.source_projects, self.target_project,
self.packages_to_copy_expr)
packagesToCopy = osc.packages_to_copy
if self.ibs:
factoryProject = 'openSUSE.org:openSUSE:Factory'
else:
factoryProject = 'openSUSE:Factory'
if self.verbose:
print('Package list: ', ' '.join(packagesToCopy))
if self.only_packages_developed_in_project:
packagesToCopy = [package for package in packagesToCopy
if osc.get_package_devel_project(
factoryProject, package) in
[self.only_packages_developed_in_project]]
print('Package list: ', ' '.join(packagesToCopy))
if self.command == 'list-packages':
print(' '.join(sorted(packagesToCopy)))
sys.exit(0)
if self.command in ['list-bugowners-in-src', 'list-bugowners-in-tgt']:
prj = self.target_project
if self.command == 'list-bugowners-in-src':
src_project_for_packages = \
osc.get_package_source_from_source_projects()
for package in sorted(packagesToCopy):
if self.command == 'list-bugowners-in-src':
prj = src_project_for_packages[package]
rb, rm = osc.get_package_bugowner_maintainer(prj, package)
print(f'{prj}/{package}')
if rb:
print(' bugowners: ', ' '.join(rb))
else:
print(' bugowners: -')
if rm:
print(' maintainers: ', ' '.join(rm))
else:
print(' maintainers: -')
sys.exit(0)
print('Getting packages versions from source projects...')
versions_in_source, src_project_for_packages = \
osc.get_package_versions_from_source_projects(
print_progress=self.print_progress)
print('Getting packages versions from target projects...')
versions_in_target = osc.get_package_versions_from_target_projects(
print_progress=self.print_progress)
srcPrj_has_to_match = False
prev_SRs = {}
if self.previous_logfile:
contents = open(self.previous_logfile, 'r').read()
sections = contents.split('#-# ')
for x, y in [x.split('\n', 1) for x in sections if x]:
sr_id = re.search(r'created request id ([0-9]*)', y)
if sr_id:
if not srcPrj_has_to_match:
x = x[x.find(' ')+1:]
prev_SRs[x] = sr_id.group(1)
print('Obtaining package diffs and generating output file...')
output = Output(commands_file, verbose=self.verbose)
output.write('#!/bin/sh')
os.chmod(commands_file, 0o764)
output.write(f'logfile="{new_logfile}"')
msg_log_files = []
progress = None
if not self.verbose and self.print_progress:
progress = Progress()
progress.max_value = len(packagesToCopy) - 1
for idx, package in enumerate(sorted(packagesToCopy)):
if progress:
progress.set_value(idx)
srcPrj = src_project_for_packages[package]
key = f'{srcPrj} {package} {self.target_project}'
output.write(f'echo "#-# {key}" | tee -a "$logfile"')
if not srcPrj_has_to_match:
key = f'{package} {self.target_project}'
try:
supersede_SR = f'-s {prev_SRs[key]}'
# Just in case the user wants to keep the last SR in the log
# and comment out the superseding of a SR
output.write(f'#echo "created request id {prev_SRs[key]}" '
'| tee -a "$logfile"')
except KeyError:
supersede_SR = ''
try:
srcVersion = versions_in_source[package]
except KeyError:
output.write(f'# Error getting source version of {package} '
'(maybe the package is broken or never built?). '
'Skipping...')
srcVersion = None
continue
try:
tgtVersion = versions_in_target[package]
except KeyError:
tgtVersion = None
# check that the versions can be parsed
try:
version.parse(srcVersion)
version.parse(tgtVersion)
versions_can_be_parsed = True
except version.InvalidVersion:
output.write(f'# {srcVersion} and/or {tgtVersion} '
'can\'t be parsed')
versions_can_be_parsed = False
except TypeError:
output.write(f'# {srcVersion} and/or {tgtVersion} '
'can\'t be parsed')
versions_can_be_parsed = False
diff_log = None
if tgtVersion and srcVersion == tgtVersion:
diff_log = osc.get_packages_diff(self.target_project,
package, srcPrj, package)
if not diff_log:
output.write(f'# {package} is the same version and exactly'
f'the same in {srcPrj} and '
f'{self.target_project} ({srcVersion})')
continue
elif (srcVersion and tgtVersion and versions_can_be_parsed and
version.parse(srcVersion) < version.parse(tgtVersion)):
output.write(f'# {package} is older in {srcPrj} than in '
f'{self.target_project} !! ({srcVersion} -> '
f'{tgtVersion})')
continue
elif (srcVersion and tgtVersion and not versions_can_be_parsed and
srcVersion < tgtVersion):
output.write(f'# {package} is older in {srcPrj} than in '
f'{self.target_project} !! ({srcVersion} -> '
f'{tgtVersion})')
continue
elif not srcVersion or not tgtVersion:
output.write(f'# srcVersion ({srcVersion}) or tgtVersion '
f'({tgtVersion}) couldn\'t be obtained')
if diff_log is None:
diff_log = osc.get_packages_diff(self.target_project, package,
srcPrj, package)
# Check for removed patches
diff_lines = diff_log.split('\n')
removed_patches = [re.sub(r'-Patch[0-9]+: *', '', x)
for x in diff_lines if x.startswith('-Patch')]
for line in diff_lines:
if (line.startswith('--- ') or line.startswith('+++ ') or
not line.startswith('+')):
continue
for patch in removed_patches[:]:
if patch in line:
removed_patches.remove(patch)
write_message_file = bool(removed_patches)
write_diff = True
try:
if self.reference:
reference_text = f' ({self.reference})'
else:
reference_text = ''
except NameError:
reference_text = ''
if tgtVersion:
if srcVersion == tgtVersion:
msg = (f'Update package {package} {tgtVersion} with '
f'changes but same version{reference_text}')
else:
msg = (f'Update package {package} from {tgtVersion} to '
f'{srcVersion}{reference_text}')
else:
msg = f'New package {package} {srcVersion}{reference_text}'
if self.set_bugowner:
if isinstance(self.set_bugowner, str):
msg += f'\nbugowner: {self.set_bugowner}\n'
else:
print('Please set "set_bugowner" to the user or group '
'that should be the bugowner')
sys.exit(1)
write_message_file = self.force_recheck_bugowner
write_diff = False
if write_message_file:
filename = (f'log-messages/{srcPrj}__{self.target_project}'
f'__{package}.txt')
separator = '\n' + '-' * 72 + '\n'
if os.path.exists(filename):
contents = open(filename, 'r').read()
previous_contents = contents[:contents.find(separator)]
else:
previous_contents = ''
with open(filename, 'w') as fh:
txt = msg + '\n'
for patch in removed_patches:
txt += f'* {patch}\n'
if previous_contents:
if txt.strip(' \n') == previous_contents.strip(' \n'):
fh.write(previous_contents)
else:
fh.write(txt + '\n' + previous_contents)
output.write('# DO NOT FORGET TO REVIEW '
f'{filename} FOR CHANGES!')
else:
fh.write(txt)
if write_diff:
fh.write(separator + diff_log)
msg_param = f'-F {filename}'
msg_log_files.append(filename)
else:
msg_param = f'-m "{msg}"'
output.write(f'{osc.osc_command()} {self.command} {supersede_SR} '
f'{msg_param} {srcPrj} {package} '
f'{self.target_project} | tee -a "$logfile"')
if msg_log_files:
output.write('')
output.write('# Check following msg log files:')
output.write(f'{" ".join(msg_log_files)}')
output.write('')
output.write('echo "Log file written to: $logfile"')
if (not commands_file.startswith('./') and
not commands_file.startswith('/')):
commands_file = './' + commands_file
print(f'Commands written to: {commands_file}')
print('Done')
def main():
# try:
# config_file = sys.argv[1]
# except (IndexError, FileNotFoundError):
# print('Please pass a config file as parameter')
# sys.exit(1)
#
# command = None
# try:
# if sys.argv[2] == '--list-packages':
# command = 'list-packages'
# except IndexError:
# pass
#
# try:
# if sys.argv[2] == '--list-bugowners-in-src':
# command = 'list-bugowners-in-src'
# elif sys.argv[2] == '--list-bugowners-in-tgt':
# command = 'list-bugowners-in-tgt'
# except IndexError:
# pass
app = OSCBatchSubmit()
app.parse_command_line()
app.run()
if __name__ == "__main__":
main()

643
osc_batch_submit/osc.py Normal file
View File

@@ -0,0 +1,643 @@
#
# osc-batch-submit
# Copyright (C) 2025 Antonio Larrosa <alarrosa@suse.com>
#
import xml.etree.ElementTree as ET
import subprocess
import json
import re
import os
from osc_batch_submit.progress import Progress
# project_with_packages_to_copy = '(((SUSE:SLE-15-SP4:GA SUSE:SLE-15-SP3:Update SUSE:SLE-15-SP3:GA SUSE:SLE-15-SP2:Update SUSE:SLE-15-SP2:GA) ∩ openSUSE.org:KDE:Frameworks5) home:alarrosa:branches:SUSE:SLE-15-SP4:GA:kf5) SUSE:SLE-15-SP4:GA:Staging:B' # noqa
# srcProjects = ['openSUSE.org:KDE:Frameworks5']
#
# tgtProjects = ['SUSE:SLE-15-SP4:GA',
# 'SUSE:SLE-15-SP3:Update',
# 'SUSE:SLE-15-SP3:GA',
# 'SUSE:SLE-15-SP2:Update',
# 'SUSE:SLE-15-SP2:GA',
# 'SUSE:SLE-15-SP1:GA',
# 'SUSE:SLE-15:GA']
# tgtProject = 'SUSE:SLE-15-SP4:GA'
def find_matching_parenthesis(string, start):
# start is the position of the opening parenthesis
# returns the position of the matching closing parenthesis
level = 1
for i, ch in enumerate(string[start + 1:]):
if ch == '(' or ch == '{':
level += 1
elif ch == ')' or ch == '}':
level -= 1
if level == 0: # Found the matching parenthesis
return i + start + 1
return len(string)
class OSC:
def __init__(self, use_cache=True, ibs=False):
self.debug = False
self.use_cache = use_cache
self.cache_dirpath = None
# The directory path where cached files are stored.
self.projects = []
self.src_projects = []
self.tgt_project = ''
self.projects_inherited_from_tgt = []
self.packagesIn = {}
self.packageVersions = {}
self.versions_in_src = {}
self.versions_in_tgt = {}
self.devel_projects = {}
# Contains [project][package] -> 'devel_project'
self.projects_bugowners = {}
self.projects_maintainers = {}
self.command = ''
if ibs:
self.osc = ['osc', '-A', 'https://api.suse.de']
else:
self.osc = ['osc']
def log(self, *args):
if self.debug:
print(*args)
def osc_command(self):
return " ".join(self.osc)
def get_all_projects_from_projects_set_expression(self, prj_set_expr):
tmp = re.sub('{[^}]*}', '', prj_set_expr)
return {x.strip()
for x in re.split(r'[∩∪\*()]', tmp)
if x.strip()}
def get_packages_from_projects_in_set_expression(self, prj_set_expr):
prj_set_expr = prj_set_expr.strip()
self.log('get_packages_from_projects_in_set_expr: ', prj_set_expr)
if not re.findall(r'[∩∪\*()\{}]', prj_set_expr):
return self.packagesIn[prj_set_expr]
tok = next(re.finditer(r'[∩∪\*(\{]', prj_set_expr))
self.log(tok.group(0))
if tok.group(0) == '(':
end_parenthesis = find_matching_parenthesis(prj_set_expr,
tok.start())
if tok.start() == 0 and end_parenthesis == len(prj_set_expr) - 1:
return self.get_packages_from_projects_in_set_expression(
prj_set_expr[1:-1])
left_op = prj_set_expr[tok.start() + 1: end_parenthesis].strip()
tok = next(re.finditer(r'[∩∪\*]',
prj_set_expr[end_parenthesis + 1:]))
right_op = prj_set_expr[end_parenthesis + tok.start() + 2:].strip()
elif tok.group(0) == '{':
end_parenthesis = find_matching_parenthesis(prj_set_expr,
tok.start())
if tok.start() == 0 and end_parenthesis == len(prj_set_expr) - 1:
package_list_expr = prj_set_expr[tok.start() + 1:
end_parenthesis].strip()
self.log('package_list_expr', package_list_expr)
self.log(package_list_expr.split(','))
return set([x.strip() for x in package_list_expr.split(',')])
left_op = prj_set_expr[tok.start():
end_parenthesis + 1] # Includes the {}
tok = next(re.finditer(r'[∩∪\*]',
prj_set_expr[end_parenthesis + 1:]))
right_op = prj_set_expr[end_parenthesis + tok.start() + 2:].strip()
else:
left_op = prj_set_expr[0:tok.start()-1].strip()
right_op = prj_set_expr[tok.start()+1:].strip()
op = tok.group(0)
self.log(f'left op: {left_op}')
self.log(f'op: {op}')
self.log(f'right op: {right_op}')
left_set = self.get_packages_from_projects_in_set_expression(left_op)
right_set = self.get_packages_from_projects_in_set_expression(right_op)
if op == '':
return left_set.union(right_set)
elif op == '':
self.log('left op pkgs:', left_set)
self.log('right op pkgs:', right_set)
return left_set.intersection(right_set)
elif op == '':
if isinstance(right_set, list):
right_set = set(right_set)
self.log(f'1 {left_set}')
self.log(f'2 {right_set}')
self.log(f'3 {left_set - right_set}')
return left_set - right_set
elif op == '*':
r = set()
for pkg in left_set:
for pattern in right_set:
if re.match(pattern, pkg):
r.add(pkg)
return r
else:
return self.packagesIn[prj_set_expr]
def cache_path_for(self, cache_type, project):
if not self.cache_dirpath:
cache_path = os.getenv('XDG_CACHE_HOME')
if not cache_path:
cache_path = os.path.expanduser('~/.cache')
self.cache_dirpath = f'{cache_path}/osc_batch_tool'
try:
os.mkdir(self.cache_dirpath)
except FileExistsError:
pass
return f'{self.cache_dirpath}/{cache_type}-{project}'
def get_project_packages(self, project):
filename = self.cache_path_for('packages-in', project)
if self.use_cache:
try:
return set([line.strip('\n')
for line in open(filename, 'rt').readlines()])
except FileNotFoundError:
pass
cmd = self.osc + ['ls', project]
output = subprocess.check_output(cmd)
contents = output.decode('utf-8', errors='replace').split('\n')
packages = [line for line in contents
if line and line != '00Meta\n' and ':' not in line]
open(filename, 'wt').write('\n'.join(packages))
return set(packages)
def readAllPackages(self):
for proj in self.projects:
if proj.startswith('{'):
continue
self.packagesIn[proj] = self.get_project_packages(proj)
try:
self.packageVersions[proj] = \
self.read_all_package_versions(proj)
except FileNotFoundError:
self.packageVersions[proj] = {}
def get_project_meta_paths(self, project, repository_name='standard'):
meta_contents = None
meta_root = None
filename = self.cache_path_for('meta-prj', project)
if self.use_cache:
try:
meta_contents = open(filename, 'rt').read()
except FileNotFoundError:
pass
else:
meta_root = ET.fromstring(meta_contents)
if not meta_root:
cmd = self.osc + ['meta', 'prj', project]
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when '
f'getting the metaprj for {project}:\n' +
proc.stderr.decode('utf-8',
errors='replace'))
open(filename, 'wt').write(proc.stdout.decode('utf-8',
errors='replace'))
try:
meta_root = ET.fromstring(proc.stdout)
except ET.ParseError:
print(f'Obtaining metaprj for {project} got {proc.stdout}')
raise
try:
repository = meta_root.find("repository[@name='standard']")
except AttributeError:
error = meta_root.find('error')
msg = f'Error obtaining repository meta for {project}'
if error is not None:
msg += f': {error.text}'
print(msg)
return None
try:
paths = [(p.attrib['project'], p.attrib['repository'])
for p in repository.findall("path")]
except AttributeError:
error = meta_root.find('error')
msg = f'Error obtaining repository paths for {project}'
if error is not None:
msg += f': {error.text}'
print(msg)
return None
return paths
def get_all_projects_inherited_from_project(self, project,
repository_name='standard'):
r = [project]
paths = self.get_project_meta_paths(project, repository_name)
visited = {(project, repository_name)}
while paths:
prj, repo = paths.pop(0)
try:
while (prj, repo) in visited:
prj, repo = paths.pop(0)
except IndexError:
break
if prj not in r:
r.append(prj)
more_paths = self.get_project_meta_paths(prj, repo)
visited.add((prj, repo))
if more_paths:
paths.extend(more_paths)
return r
def get_package_version(self, project, package, retry=True):
cmd = self.osc + ['api',
f'/source/{project}/{package}?view=info&parse=1']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f'version of {project}/{package}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
try:
version = root.find('version').text
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining package version for ' +
f'{project}/{package}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_package_version(project, package,
retry=False)
else:
print('AttributeError obtaining package version for ' +
f'{project}/{package}')
return None
return version
def get_request_state(self, requestid, retry=True):
cmd = self.osc + ['api',
f'/request/{requestid}']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f'state of request {requestid}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
try:
state = root.find('state').attrib['name']
state_msg = '\n'.join([x.strip(' \r\n')
for x in root.find('state').itertext()
if x.strip(' \r\n')])
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining request state for ' +
f'{requestid}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_request_state(requestid, retry=False)
else:
print('AttributeError obtaining request state for '
f'{requestid}')
return None
return (state, state_msg)
def get_request_comments(self, requestid, only_last=False, retry=True):
cmd = self.osc + ['api',
f'/comments/request/{requestid}']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f' comments of request {requestid}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
try:
comments = root.findall('comment')
comments_texts = [f"{x.attrib['who']}: {x.text}" for x in comments]
if only_last:
return comments_texts[-1] if comments_texts else ''
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining request state for ' +
f'{requestid}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_request_comments(requestid, retry=False)
else:
print('AttributeError obtaining request comments '
f'for {requestid}')
return None
return comments_texts
def get_project_bugowner_maintainer(self, project, retry=True):
if project in self.projects_bugowners:
return (self.projects_bugowners[project],
self.projects_maintainers[project])
cmd = self.osc + ['api',
f'/source/{project}/_meta']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f' bugowner/maintiner of project {project}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
import pdb
pdb.set_trace()
try:
b_elements = root.findall("*[@role='bugowner']")
m_elements = root.findall("*[@role='maintainer']")
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining project bugowner for ' +
f'{project}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_project_bugowner(project, retry=False)
else:
print('AttributeError obtaining project bugowner for ' +
f'{project}')
return None
rb = []
rm = []
for element in b_elements:
rb.append((element.tag, element.attrib[f'{element.tag}id']))
for element in m_elements:
rm.append((element.tag, element.attrib[f'{element.tag}id']))
self.projects_bugowners[project] = rb
self.projects_maintainers[project] = rm
return rb, rm
def get_package_bugowner_maintainer(self, project, package, retry=True):
cmd = self.osc + ['api',
f'/source/{project}/{package}/_meta']
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
' bugowner/maintiner of package '
f'{project}/{package}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
root = ET.fromstring(proc.stdout)
try:
b_elements = root.findall("*[@role='bugowner']")
m_elements = root.findall("*[@role='maintainer']")
except AttributeError:
error = root.find('error')
if error is not None:
print('Error obtaining package bugowner for ' +
f'{project}/{package}: {error.text}')
if error.text == 'download in progress' and retry:
return self.get_package_bugowner(project, package,
retry=False)
else:
print('AttributeError obtaining package bugowner for ' +
f'{project}/{package}')
return None
rb = []
rm = []
for element in b_elements:
rb.append((element.tag, element.attrib[f'{element.tag}id']))
for element in m_elements:
rm.append((element.tag, element.attrib[f'{element.tag}id']))
if not rb or not rm:
prj_b, prj_m = self.get_project_bugowner_maintainer(project)
if not rb:
rb = prj_b
if not rm:
rm = prj_m
return rb, rm
def get_all_package_versions(self, project, packages, filename):
versions = {}
for package in packages:
version = self.get_package_version(project, package)
versions[package] = version
self.packageVersions[project] = versions
self.write_cache_package_versions(project)
return versions
def write_cache_package_versions(self, project):
filename = self.cache_path_for('versions-in', project)
with open(filename, 'w') as fh:
json.dump(self.packageVersions[project], fh)
def read_all_package_versions(self, project):
versions = {}
if self.use_cache:
filename = self.cache_path_for('versions-in', project)
try:
with open(filename, 'r') as fh:
versions = json.load(fh)
except json.decoder.JSONDecodeError:
print(f'JSON error in {filename}. Skipping...')
return versions
def set_params(self, source_projects, target_project,
packages_to_copy_expression):
self.source_projects = source_projects
self.target_project = target_project
self.packages_to_copy_expression = packages_to_copy_expression
self.projects_inherited_from_tgt = \
self.get_all_projects_inherited_from_project(
target_project, repository_name='standard')
tmp_prjs = self.get_all_projects_from_projects_set_expression(
packages_to_copy_expression)
self.log('Projects inherited from tgt')
self.log(self.projects_inherited_from_tgt)
self.projects = list(tmp_prjs.union(source_projects,
self.projects_inherited_from_tgt,
{target_project}))
self.readAllPackages()
self.packages_to_copy = \
self.get_packages_from_projects_in_set_expression(
packages_to_copy_expression)
def add_packages_to_packages_to_copy_list(self, packages):
self.packages_to_copy.extend(p for p in packages
if p not in self.packages_to_copy)
def restrict_packages_in_packages_to_copy_list(self, packages):
packages_to_copy = self.packages_to_copy
self.packages_to_copy = [p for p in packages
if p in packages_to_copy]
def get_package_devel_project(self, project, package):
'''Returns the devel project for a package.
Returns 'KDE:Applications' for project 'openSUSE:Factory',
package 'kcron'
'''
if project not in self.devel_projects:
self.read_cache_devel_projects(project)
try:
return self.devel_projects[project][package]
except KeyError:
pass
cmd = self.osc + ['develproject', project, package]
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f' devel project of {project}/{package}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
self.devel_projects[project][package] = \
proc.stdout.decode('utf-8',
errors='replace').strip('\n').split('/')[0]
self.write_cache_devel_projects(project)
return self.devel_projects[project][package]
def package_is_devel_package_for_project(
self, devel_project, devel_package, project):
'''Checks the devel project for a package.
Returns if devel_project is the devel project of devel_package in
project. For example, returns True for ('KDE:Applications',
'kcron', 'openSUSE:Factory').
'''
real_devel_project = self.get_package_devel_project(project,
devel_package)
return real_devel_project == devel_project
def write_cache_devel_projects(self, project):
filename = self.cache_path_for('devel-projects', project)
with open(filename, 'w') as fh:
json.dump(self.devel_projects[project], fh)
def read_cache_devel_projects(self, project):
self.devel_projects[project] = {}
if not self.use_cache:
return
filename = self.cache_path_for('devel-projects', project)
try:
with open(filename, 'r') as fh:
self.devel_projects[project] = json.load(fh)
except FileNotFoundError:
pass
def get_package_versions_from_source_projects_helper(self,
get_versions=True,
print_progress=False):
self.versions_in_src = {}
self.packageSource = {}
progress = None
if print_progress:
progress = Progress()
progress.max_value = len(self.packages_to_copy) - 1
for idx, package in enumerate(sorted(self.packages_to_copy)):
if progress:
progress.set_value(idx)
for prj in self.source_projects:
if package in self.packagesIn[prj]:
self.packageSource[package] = prj
if not get_versions:
continue
try:
self.versions_in_src[package] = \
self.packageVersions[prj][package]
except KeyError:
# We need to request the version to obs
try:
srcVersion = self.get_package_version(prj, package)
except ET.ParseError:
self.log(f'{prj} / {package} doesn\'t have a '
'version set (probably still building?)')
self.versions_in_src[package] = None
break
self.log(f'Requested src version to obs: {prj}, ' +
f'{package}, {srcVersion}')
if srcVersion:
self.versions_in_src[package] = srcVersion
self.packageVersions[prj][package] = srcVersion
self.write_cache_package_versions(prj)
break
if progress:
print('')
return (self.versions_in_src, self.packageSource)
def get_package_source_from_source_projects(self, print_progress=False):
self.get_package_versions_from_source_projects_helper(
get_versions=False, print_progress=print_progress)
return self.packageSource
def get_package_versions_from_source_projects(self, print_progress=False):
return self.get_package_versions_from_source_projects_helper(
get_versions=True, print_progress=print_progress)
def get_package_versions_from_target_projects(self, print_progress=False):
progress = None
lf = ''
if print_progress:
progress = Progress()
progress.max_value = len(self.packages_to_copy) - 1
lf = '\n'
for idx, package in enumerate(sorted(self.packages_to_copy)):
if progress:
progress.set_value(idx)
try:
tgtVersion = self.versions_in_tgt[package]
except KeyError:
# Obtaining version of {package} from
# {self.projects_inherited_from_tgt}
for prj in self.projects_inherited_from_tgt:
if package not in self.packagesIn[prj]:
# print(f'{package} not available in {prj}')
continue
# Let's see if we already read the package version
# print(f'{package} is in {prj}')
try:
tgtVersion = self.packageVersions[prj][package]
except KeyError:
# We need to request the version to obs
# print(f'Obtainining version of {package} from {prj}')
tgtVersion = self.get_package_version(prj, package)
self.log(f'Requested tgt version to obs: {prj}, ' +
f'{package}, {tgtVersion}')
if tgtVersion:
self.versions_in_tgt[package] = tgtVersion
self.packageVersions[prj][package] = tgtVersion
self.write_cache_package_versions(prj)
break
else:
print(f'{lf}# {package} not found in ' +
f'{self.projects_inherited_from_tgt}')
tgtVersion = None
if progress:
print('')
return self.versions_in_tgt
def get_packages_diff(self, srcPrj, srcPackage, tgtPrj, tgtPackage):
cmd = self.osc + ['rdiff', srcPrj, srcPackage, tgtPrj, tgtPackage]
proc = subprocess.run(cmd, capture_output=True)
if proc.returncode != 0:
raise RuntimeError('osc command returned an error when getting the'
f' rdiff of {srcPrj}/{srcPackage} and '
f'{tgtPrj}/{tgtPackage}:\n' +
proc.stderr.decode('utf-8', errors='replace'))
return proc.stdout.decode('utf-8', errors='replace').strip('\n')

View File

@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
import os
import sys
import re
class TerminalColors:
Red = '\033[91m'
ENDC = '\033[0m'
def printableLen(text):
"""Return length of printable characters in string."""
strip_ANSI_pat = re.compile(r"""\x1b\[[;\d]*[A-Za-z]""", re.VERBOSE)
try:
return len(strip_ANSI_pat.sub("", text))
except TypeError:
print(text, type(text))
raise
class Progress:
def __init__(self, precision=0, prefix=''):
"""Create a Progress object."""
self.max_value = 100
self.min_value = 0
self.value = 0
self.last_text = ''
self.printed = False
self.isatty = sys.stdout.isatty()
if not self.isatty:
print(prefix, end='')
self.prefix = prefix
self.format = '%%.%df' % precision
def set_value(self, value, print_=True):
self.value = value
if print_:
self.print_percentage()
def remove_print(self):
if not self.printed or not self.isatty:
return
backspaces = '\b' * len(self.last_text)
print(backspaces, end='', flush=True)
self.printed = False
def print_percentage(self):
max_width = os.get_terminal_size().columns
percent = ((self.value - self.min_value) * 100.0 /
(self.max_value - self.min_value))
if not self.isatty:
if percent == 100:
print('100%')
return
tmp = (self.prefix + self.format % percent) + '%'
if max_width > 30:
max_width -= 10 + printableLen(tmp)
bar_len = int(max_width * percent // 100)
if percent < 100:
tmp += (' ' + TerminalColors.Red + ('' * bar_len) +
TerminalColors.ENDC + '' * int(max_width - bar_len))
else:
tmp += ' ' * (max_width + 1)
if tmp != self.last_text:
self.remove_print()
print(tmp, end='', flush=True)
self.last_text = tmp
self.printed = True

25
pyproject.toml Normal file
View File

@@ -0,0 +1,25 @@
[project]
name = "osc-batch-submit"
description = "Tool to create batch commands for osc"
requires-python = ">=3.7"
version = "0.1"
authors = [
{name = "Antonio Larrosa", email = "alarrosa@suse.com"},
]
classifiers = [
"Development Status :: 4 - Beta",
"Environment :: Console",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
]
license = {text = "GPL-3.0"}
dependencies = [
"packaging",
]
[build-system]
requires = ["setuptools>=62.4.0"]
build-backend = "setuptools.build_meta"
[project.scripts]
osc-batch-submit = "osc_batch_submit.main:main"