Files

436 lines
16 KiB
Python

# -*- coding: utf-8 -*-
#
# osc-batch-submit
# Copyright (C) 2025 Antonio Larrosa <alarrosa@suse.com>
#
import sys
import re
import os.path
from osc_batch_submit.osc import OSC
from osc_batch_submit.progress import Progress
from packaging import version
from argparse import ArgumentParser, RawTextHelpFormatter
from importlib.util import spec_from_loader, module_from_spec
from importlib.machinery import SourceFileLoader
__version__ = '0.1'
class Output:
def __init__(self, filename, verbose=False):
self.filename = filename
self.fh = open(filename, 'a', buffering=1)
self.verbose = verbose
def write(self, txt):
if self.verbose:
print(txt)
self.fh.write(txt+'\n')
class OSCBatchSubmit:
def __init__(self):
# Default values
self.use_cache = False
self.ibs = False
self.packages = ''
self.source_projects = []
self.target_project = ''
self.only_packages_developed_in_project = None
self.reference = None # (like a jira or bugzilla refence)
self.previous_logfile = None # To be able to supersede SRs
self.set_bugowner = False
self.force_write_message_file = False
self.config_file = None
self.verbose = False
self.print_progress = False
def parse_command_line(self):
main_parser = ArgumentParser(
description='Manage Batch Submissions',
formatter_class=RawTextHelpFormatter)
main_parser.add_argument('--version', action='version',
version='osc-batch-submit ' + __version__)
sps = main_parser.add_subparsers(
dest='command', metavar='command',
help='''The following commands are available:
sr
copypac
list-packages
list-bugowners-in-src
list-bugowners-in-tgt
remove-cache
''')
parser = sps.add_parser('submitrequest', aliases=['sr'],
description='create submit requests')
parser.add_argument('config_file')
parser.add_argument('--verbose', '-v', action='store_true')
parser.add_argument('--cache', action='store_true')
parser.add_argument('--progress', action='store_true')
parser = sps.add_parser('copypac', description='copy packages')
parser.add_argument('config_file')
parser.add_argument('--verbose', '-v', action='store_true')
parser.add_argument('--cache', action='store_true')
parser.add_argument('--progress', action='store_true')
parser = sps.add_parser('list-packages',
description='Just list packages that '
'would be copied/submitted')
parser.add_argument('config_file')
parser = sps.add_parser('list-bugowners-in-src',
description='List bugowner/maintainers in '
'source projects')
parser.add_argument('config_file')
parser = sps.add_parser('list-bugowners-in-tgt',
description='List bugowner/maintainers in '
'target projects')
parser.add_argument('config_file')
sps.add_parser('remove-cache',
description='Remove cache contents')
options = main_parser.parse_args()
if not options.command:
main_parser.print_help()
sys.exit(1)
self.command = options.command
if getattr(options, 'config_file', None):
if not options.config_file.endswith('.conf'):
print(f'Please rename {options.config_file} to end in .conf')
sys.exit(1)
self.config_file = options.config_file
self.base_name = self.config_file.removesuffix('.conf')
self.load_data_from_config_file()
if getattr(options, 'verbose', None):
self.verbose = options.verbose
if getattr(options, 'progress', None):
self.print_progress = options.progress
if getattr(options, 'cache', None):
self.use_cache = options.cache
def load_data_from_config_file(self):
spec = spec_from_loader(self.base_name,
SourceFileLoader(self.base_name,
self.config_file))
data = module_from_spec(spec)
spec.loader.exec_module(data)
for var in [x for x in dir(data) if not x.startswith("__")]:
setattr(self, var, getattr(data, var))
def run(self):
osc = OSC(self.use_cache, ibs=self.ibs)
if self.command == 'remove-cache':
osc.remove_cache_files()
return
print(f'Package list expression: {self.packages}')
print(f'From: {self.source_projects}')
print(f'To: {self.target_project}')
commands_file = self.base_name + '.sh'
idx = 1
while os.path.exists(commands_file):
idx += 1
commands_file = self.base_name + f'-{idx}.sh'
new_logfile = f'logs/{self.base_name}.log'
idx = 1
while os.path.exists(new_logfile):
idx += 1
new_logfile = f'logs/{self.base_name}-{idx}.log'
print('Obtaining the list of packages...')
osc.set_params(self.source_projects, self.target_project,
self.packages)
packagesToCopy = osc.packages_to_copy
if self.ibs:
factoryProject = 'openSUSE.org:openSUSE:Factory'
else:
factoryProject = 'openSUSE:Factory'
if self.verbose:
print('Package list: ', ' '.join(packagesToCopy))
if self.only_packages_developed_in_project:
packagesToCopy = [package for package in packagesToCopy
if osc.get_package_devel_project(
factoryProject, package) in
[self.only_packages_developed_in_project]]
print('Package list: ', ' '.join(packagesToCopy))
if self.command == 'list-packages':
print(' '.join(sorted(packagesToCopy)))
sys.exit(0)
if self.command in ['list-bugowners-in-src', 'list-bugowners-in-tgt']:
prj = self.target_project
if self.command == 'list-bugowners-in-src':
src_project_for_packages = \
osc.get_package_source_from_source_projects()
for package in sorted(packagesToCopy):
if self.command == 'list-bugowners-in-src':
prj = src_project_for_packages[package]
rb, rm = osc.get_package_bugowner_maintainer(prj, package)
print(f'{prj}/{package}')
if rb:
print(' bugowners: ', ' '.join(rb))
else:
print(' bugowners: -')
if rm:
print(' maintainers: ', ' '.join(rm))
else:
print(' maintainers: -')
sys.exit(0)
print('Getting packages versions from source projects...')
versions_in_source, src_project_for_packages = \
osc.get_package_versions_from_source_projects(
print_progress=self.print_progress)
print('Getting packages versions from target projects...')
versions_in_target = osc.get_package_versions_from_target_projects(
print_progress=self.print_progress)
srcPrj_has_to_match = False
prev_SRs = {}
if self.previous_logfile:
contents = open(self.previous_logfile, 'r').read()
sections = contents.split('#-# ')
for x, y in [x.split('\n', 1) for x in sections if x]:
sr_id = re.search(r'created request id ([0-9]*)', y)
if sr_id:
if not srcPrj_has_to_match:
x = x[x.find(' ')+1:]
prev_SRs[x] = sr_id.group(1)
print('Obtaining package diffs and generating output file...')
output = Output(commands_file, verbose=self.verbose)
output.write('#!/bin/sh')
os.chmod(commands_file, 0o764)
output.write(f'logfile="{new_logfile}"')
output.write(f'mkdir -p "{os.path.dirname(new_logfile)}"')
msg_log_files = []
progress = None
if not self.verbose and self.print_progress:
progress = Progress()
progress.max_value = len(packagesToCopy) - 1
for idx, package in enumerate(sorted(packagesToCopy)):
if progress:
progress.set_value(idx)
srcPrj = src_project_for_packages[package]
key = f'{srcPrj} {package} {self.target_project}'
output.write(f'echo "#-# {key}" | tee -a "$logfile"')
if not srcPrj_has_to_match:
key = f'{package} {self.target_project}'
try:
supersede_SR = f'-s {prev_SRs[key]}'
# Just in case the user wants to keep the last SR in the log
# and comment out the superseding of a SR
output.write(f'#echo "created request id {prev_SRs[key]}" '
'| tee -a "$logfile"')
except KeyError:
supersede_SR = ''
try:
srcVersion = versions_in_source[package]
except KeyError:
output.write(f'# Error getting source version of {package} '
'(maybe the package is broken or never built?). '
'Skipping...')
srcVersion = None
continue
try:
tgtVersion = versions_in_target[package]
except KeyError:
tgtVersion = None
# check that the versions can be parsed
try:
version.parse(srcVersion)
version.parse(tgtVersion)
versions_can_be_parsed = True
except version.InvalidVersion:
output.write(f'# {srcVersion} and/or {tgtVersion} '
'can\'t be parsed')
versions_can_be_parsed = False
except TypeError:
output.write(f'# {srcVersion} and/or {tgtVersion} '
'can\'t be parsed')
versions_can_be_parsed = False
diff_log = None
if tgtVersion and srcVersion == tgtVersion:
diff_log = osc.get_packages_diff(self.target_project,
package, srcPrj, package)
if not diff_log:
output.write(f'# {package} is the same version and exactly'
f' the same in {srcPrj} and '
f'{self.target_project} ({srcVersion})')
continue
elif (srcVersion and tgtVersion and versions_can_be_parsed and
version.parse(srcVersion) < version.parse(tgtVersion)):
output.write(f'# {package} is older in {srcPrj} than in '
f'{self.target_project} !! ({srcVersion} -> '
f'{tgtVersion})')
continue
elif (srcVersion and tgtVersion and not versions_can_be_parsed and
srcVersion < tgtVersion):
output.write(f'# {package} is older in {srcPrj} than in '
f'{self.target_project} !! ({srcVersion} -> '
f'{tgtVersion})')
continue
elif not srcVersion or not tgtVersion:
output.write(f'# srcVersion ({srcVersion}) or tgtVersion '
f'({tgtVersion}) couldn\'t be obtained')
if diff_log is None:
diff_log = osc.get_packages_diff(self.target_project, package,
srcPrj, package)
# Check for removed patches
diff_lines = diff_log.split('\n')
removed_patches = [re.sub(r'-Patch[0-9]+: *', '', x)
for x in diff_lines if x.startswith('-Patch')]
for line in diff_lines:
if (line.startswith('--- ') or line.startswith('+++ ') or
not line.startswith('+')):
continue
for patch in removed_patches[:]:
if patch in line:
removed_patches.remove(patch)
write_message_file = bool(removed_patches)
write_diff = True
try:
if self.reference:
reference_text = f' ({self.reference})'
else:
reference_text = ''
except NameError:
reference_text = ''
if tgtVersion:
if srcVersion == tgtVersion:
msg = (f'Update package {package} {tgtVersion} with '
f'changes but same version{reference_text}')
else:
msg = (f'Update package {package} from {tgtVersion} to '
f'{srcVersion}{reference_text}')
else:
msg = f'New package {package} {srcVersion}{reference_text}'
if self.set_bugowner:
if isinstance(self.set_bugowner, str):
msg += f'\nbugowner: {self.set_bugowner}\n'
else:
print('Please set "set_bugowner" to the user or group '
'that should be the bugowner')
sys.exit(1)
write_diff = False
if write_message_file or self.force_write_message_file:
filename = (f'log-messages/{srcPrj}__{self.target_project}'
f'__{package}.txt')
separator = '\n' + '-' * 72 + '\n'
if os.path.exists(filename):
contents = open(filename, 'r').read()
previous_contents = contents[:contents.find(separator)]
else:
previous_contents = ''
with open(filename, 'w') as fh:
txt = msg + '\n'
for patch in removed_patches:
txt += f'* {patch}\n'
if previous_contents:
if txt.strip(' \n') == previous_contents.strip(' \n'):
fh.write(previous_contents)
else:
fh.write(txt + '\n' + previous_contents)
output.write('# DO NOT FORGET TO REVIEW '
f'{filename} FOR CHANGES!')
else:
fh.write(txt)
if write_diff:
fh.write(separator + diff_log)
msg_param = f'-F {filename}'
msg_log_files.append(filename)
else:
msg_param = f'-m "{msg}"'
output.write(f'{osc.osc_command()} {self.command} {supersede_SR} '
f'{msg_param} {srcPrj} {package} '
f'{self.target_project} | tee -a "$logfile"')
if msg_log_files:
output.write('')
output.write('# Check the following message log files:')
output.write(f'{" ".join(msg_log_files)}')
output.write('')
output.write('echo "Log file written to: $logfile"')
if (not commands_file.startswith('./') and
not commands_file.startswith('/')):
commands_file = './' + commands_file
print(f'Commands written to: {commands_file}')
print('Done')
def main():
# try:
# config_file = sys.argv[1]
# except (IndexError, FileNotFoundError):
# print('Please pass a config file as parameter')
# sys.exit(1)
#
# command = None
# try:
# if sys.argv[2] == '--list-packages':
# command = 'list-packages'
# except IndexError:
# pass
#
# try:
# if sys.argv[2] == '--list-bugowners-in-src':
# command = 'list-bugowners-in-src'
# elif sys.argv[2] == '--list-bugowners-in-tgt':
# command = 'list-bugowners-in-tgt'
# except IndexError:
# pass
app = OSCBatchSubmit()
app.parse_command_line()
app.run()
if __name__ == "__main__":
main()