# -*- coding: utf-8 -*- # # osc-batch-submit # Copyright (C) 2025 Antonio Larrosa # import sys import re import os.path from osc_batch_submit.osc import OSC from osc_batch_submit.progress import Progress from packaging import version from argparse import ArgumentParser, RawTextHelpFormatter from importlib.util import spec_from_loader, module_from_spec from importlib.machinery import SourceFileLoader __version__ = '0.1' class Output: def __init__(self, filename, verbose=False): self.filename = filename self.fh = open(filename, 'a', buffering=1) self.verbose = verbose def write(self, txt): if self.verbose: print(txt) self.fh.write(txt+'\n') class OSCBatchSubmit: def __init__(self): # Default values self.use_cache = False self.ibs = False self.packages = '' self.source_projects = [] self.target_project = '' self.only_packages_developed_in_project = None self.reference = None # (like a jira or bugzilla refence) self.previous_logfile = None # To be able to supersede SRs self.set_bugowner = False self.force_write_message_file = False self.config_file = None self.verbose = False self.print_progress = False def parse_command_line(self): main_parser = ArgumentParser( description='Manage Batch Submissions', formatter_class=RawTextHelpFormatter) main_parser.add_argument('--version', action='version', version='osc-batch-submit ' + __version__) sps = main_parser.add_subparsers( dest='command', metavar='command', help='''The following commands are available: sr copypac list-packages list-bugowners-in-src list-bugowners-in-tgt remove-cache ''') parser = sps.add_parser('submitrequest', aliases=['sr'], description='create submit requests') parser.add_argument('config_file') parser.add_argument('--verbose', '-v', action='store_true') parser.add_argument('--cache', action='store_true') parser.add_argument('--progress', action='store_true') parser = sps.add_parser('copypac', description='copy packages') parser.add_argument('config_file') parser.add_argument('--verbose', '-v', action='store_true') parser.add_argument('--cache', action='store_true') parser.add_argument('--progress', action='store_true') parser = sps.add_parser('list-packages', description='Just list packages that ' 'would be copied/submitted') parser.add_argument('config_file') parser = sps.add_parser('list-bugowners-in-src', description='List bugowner/maintainers in ' 'source projects') parser.add_argument('config_file') parser = sps.add_parser('list-bugowners-in-tgt', description='List bugowner/maintainers in ' 'target projects') parser.add_argument('config_file') sps.add_parser('remove-cache', description='Remove cache contents') options = main_parser.parse_args() if not options.command: main_parser.print_help() sys.exit(1) self.command = options.command if getattr(options, 'config_file', None): if not options.config_file.endswith('.conf'): print(f'Please rename {options.config_file} to end in .conf') sys.exit(1) self.config_file = options.config_file self.base_name = self.config_file.removesuffix('.conf') self.load_data_from_config_file() if getattr(options, 'verbose', None): self.verbose = options.verbose if getattr(options, 'progress', None): self.print_progress = options.progress if getattr(options, 'cache', None): self.use_cache = options.cache def load_data_from_config_file(self): spec = spec_from_loader(self.base_name, SourceFileLoader(self.base_name, self.config_file)) data = module_from_spec(spec) spec.loader.exec_module(data) for var in [x for x in dir(data) if not x.startswith("__")]: setattr(self, var, getattr(data, var)) def run(self): osc = OSC(self.use_cache, ibs=self.ibs) if self.command == 'remove-cache': osc.remove_cache_files() return print(f'Package list expression: {self.packages}') print(f'From: {self.source_projects}') print(f'To: {self.target_project}') commands_file = self.base_name + '.sh' idx = 1 while os.path.exists(commands_file): idx += 1 commands_file = self.base_name + f'-{idx}.sh' new_logfile = f'logs/{self.base_name}.log' idx = 1 while os.path.exists(new_logfile): idx += 1 new_logfile = f'logs/{self.base_name}-{idx}.log' print('Obtaining the list of packages...') osc.set_params(self.source_projects, self.target_project, self.packages) packagesToCopy = osc.packages_to_copy if self.ibs: factoryProject = 'openSUSE.org:openSUSE:Factory' else: factoryProject = 'openSUSE:Factory' if self.verbose: print('Package list: ', ' '.join(packagesToCopy)) if self.only_packages_developed_in_project: packagesToCopy = [package for package in packagesToCopy if osc.get_package_devel_project( factoryProject, package) in [self.only_packages_developed_in_project]] print('Package list: ', ' '.join(packagesToCopy)) if self.command == 'list-packages': print(' '.join(sorted(packagesToCopy))) sys.exit(0) if self.command in ['list-bugowners-in-src', 'list-bugowners-in-tgt']: prj = self.target_project if self.command == 'list-bugowners-in-src': src_project_for_packages = \ osc.get_package_source_from_source_projects() for package in sorted(packagesToCopy): if self.command == 'list-bugowners-in-src': prj = src_project_for_packages[package] rb, rm = osc.get_package_bugowner_maintainer(prj, package) print(f'{prj}/{package}') if rb: print(' bugowners: ', ' '.join(rb)) else: print(' bugowners: -') if rm: print(' maintainers: ', ' '.join(rm)) else: print(' maintainers: -') sys.exit(0) print('Getting packages versions from source projects...') versions_in_source, src_project_for_packages = \ osc.get_package_versions_from_source_projects( print_progress=self.print_progress) print('Getting packages versions from target projects...') versions_in_target = osc.get_package_versions_from_target_projects( print_progress=self.print_progress) srcPrj_has_to_match = False prev_SRs = {} if self.previous_logfile: contents = open(self.previous_logfile, 'r').read() sections = contents.split('#-# ') for x, y in [x.split('\n', 1) for x in sections if x]: sr_id = re.search(r'created request id ([0-9]*)', y) if sr_id: if not srcPrj_has_to_match: x = x[x.find(' ')+1:] prev_SRs[x] = sr_id.group(1) print('Obtaining package diffs and generating output file...') output = Output(commands_file, verbose=self.verbose) output.write('#!/bin/sh') os.chmod(commands_file, 0o764) output.write(f'logfile="{new_logfile}"') output.write(f'mkdir -p "{os.path.dirname(new_logfile)}"') msg_log_files = [] progress = None if not self.verbose and self.print_progress: progress = Progress() progress.max_value = len(packagesToCopy) - 1 for idx, package in enumerate(sorted(packagesToCopy)): if progress: progress.set_value(idx) srcPrj = src_project_for_packages[package] key = f'{srcPrj} {package} {self.target_project}' output.write(f'echo "#-# {key}" | tee -a "$logfile"') if not srcPrj_has_to_match: key = f'{package} {self.target_project}' try: supersede_SR = f'-s {prev_SRs[key]}' # Just in case the user wants to keep the last SR in the log # and comment out the superseding of a SR output.write(f'#echo "created request id {prev_SRs[key]}" ' '| tee -a "$logfile"') except KeyError: supersede_SR = '' try: srcVersion = versions_in_source[package] except KeyError: output.write(f'# Error getting source version of {package} ' '(maybe the package is broken or never built?). ' 'Skipping...') srcVersion = None continue try: tgtVersion = versions_in_target[package] except KeyError: tgtVersion = None # check that the versions can be parsed try: version.parse(srcVersion) version.parse(tgtVersion) versions_can_be_parsed = True except version.InvalidVersion: output.write(f'# {srcVersion} and/or {tgtVersion} ' 'can\'t be parsed') versions_can_be_parsed = False except TypeError: output.write(f'# {srcVersion} and/or {tgtVersion} ' 'can\'t be parsed') versions_can_be_parsed = False diff_log = None if tgtVersion and srcVersion == tgtVersion: diff_log = osc.get_packages_diff(self.target_project, package, srcPrj, package) if not diff_log: output.write(f'# {package} is the same version and exactly' f' the same in {srcPrj} and ' f'{self.target_project} ({srcVersion})') continue elif (srcVersion and tgtVersion and versions_can_be_parsed and version.parse(srcVersion) < version.parse(tgtVersion)): output.write(f'# {package} is older in {srcPrj} than in ' f'{self.target_project} !! ({srcVersion} -> ' f'{tgtVersion})') continue elif (srcVersion and tgtVersion and not versions_can_be_parsed and srcVersion < tgtVersion): output.write(f'# {package} is older in {srcPrj} than in ' f'{self.target_project} !! ({srcVersion} -> ' f'{tgtVersion})') continue elif not srcVersion or not tgtVersion: output.write(f'# srcVersion ({srcVersion}) or tgtVersion ' f'({tgtVersion}) couldn\'t be obtained') if diff_log is None: diff_log = osc.get_packages_diff(self.target_project, package, srcPrj, package) # Check for removed patches diff_lines = diff_log.split('\n') removed_patches = [re.sub(r'-Patch[0-9]+: *', '', x) for x in diff_lines if x.startswith('-Patch')] for line in diff_lines: if (line.startswith('--- ') or line.startswith('+++ ') or not line.startswith('+')): continue for patch in removed_patches[:]: if patch in line: removed_patches.remove(patch) write_message_file = bool(removed_patches) write_diff = True try: if self.reference: reference_text = f' ({self.reference})' else: reference_text = '' except NameError: reference_text = '' if tgtVersion: if srcVersion == tgtVersion: msg = (f'Update package {package} {tgtVersion} with ' f'changes but same version{reference_text}') else: msg = (f'Update package {package} from {tgtVersion} to ' f'{srcVersion}{reference_text}') else: msg = f'New package {package} {srcVersion}{reference_text}' if self.set_bugowner: if isinstance(self.set_bugowner, str): msg += f'\nbugowner: {self.set_bugowner}\n' else: print('Please set "set_bugowner" to the user or group ' 'that should be the bugowner') sys.exit(1) write_diff = False if write_message_file or self.force_write_message_file: filename = (f'log-messages/{srcPrj}__{self.target_project}' f'__{package}.txt') separator = '\n' + '-' * 72 + '\n' if os.path.exists(filename): contents = open(filename, 'r').read() previous_contents = contents[:contents.find(separator)] else: previous_contents = '' with open(filename, 'w') as fh: txt = msg + '\n' for patch in removed_patches: txt += f'* {patch}\n' if previous_contents: if txt.strip(' \n') == previous_contents.strip(' \n'): fh.write(previous_contents) else: fh.write(txt + '\n' + previous_contents) output.write('# DO NOT FORGET TO REVIEW ' f'{filename} FOR CHANGES!') else: fh.write(txt) if write_diff: fh.write(separator + diff_log) msg_param = f'-F {filename}' msg_log_files.append(filename) else: msg_param = f'-m "{msg}"' output.write(f'{osc.osc_command()} {self.command} {supersede_SR} ' f'{msg_param} {srcPrj} {package} ' f'{self.target_project} | tee -a "$logfile"') if msg_log_files: output.write('') output.write('# Check the following message log files:') output.write(f'{" ".join(msg_log_files)}') output.write('') output.write('echo "Log file written to: $logfile"') if (not commands_file.startswith('./') and not commands_file.startswith('/')): commands_file = './' + commands_file print(f'Commands written to: {commands_file}') print('Done') def main(): # try: # config_file = sys.argv[1] # except (IndexError, FileNotFoundError): # print('Please pass a config file as parameter') # sys.exit(1) # # command = None # try: # if sys.argv[2] == '--list-packages': # command = 'list-packages' # except IndexError: # pass # # try: # if sys.argv[2] == '--list-bugowners-in-src': # command = 'list-bugowners-in-src' # elif sys.argv[2] == '--list-bugowners-in-tgt': # command = 'list-bugowners-in-tgt' # except IndexError: # pass app = OSCBatchSubmit() app.parse_command_line() app.run() if __name__ == "__main__": main()