658 lines
27 KiB
Python
658 lines
27 KiB
Python
#
|
||
# osc-batch-submit
|
||
# Copyright (C) 2025 Antonio Larrosa <alarrosa@suse.com>
|
||
#
|
||
import xml.etree.ElementTree as ET
|
||
import subprocess
|
||
import json
|
||
import re
|
||
import os
|
||
import glob
|
||
from osc_batch_submit.progress import Progress
|
||
|
||
# project_with_packages_to_copy = '(((SUSE:SLE-15-SP4:GA ∪ SUSE:SLE-15-SP3:Update ∪ SUSE:SLE-15-SP3:GA ∪ SUSE:SLE-15-SP2:Update ∪ SUSE:SLE-15-SP2:GA) ∩ openSUSE.org:KDE:Frameworks5) ∪ home:alarrosa:branches:SUSE:SLE-15-SP4:GA:kf5) ∖ SUSE:SLE-15-SP4:GA:Staging:B' # noqa
|
||
# srcProjects = ['openSUSE.org:KDE:Frameworks5']
|
||
#
|
||
# tgtProjects = ['SUSE:SLE-15-SP4:GA',
|
||
# 'SUSE:SLE-15-SP3:Update',
|
||
# 'SUSE:SLE-15-SP3:GA',
|
||
# 'SUSE:SLE-15-SP2:Update',
|
||
# 'SUSE:SLE-15-SP2:GA',
|
||
# 'SUSE:SLE-15-SP1:GA',
|
||
# 'SUSE:SLE-15:GA']
|
||
# tgtProject = 'SUSE:SLE-15-SP4:GA'
|
||
|
||
|
||
def find_matching_parenthesis(string, start):
|
||
# start is the position of the opening parenthesis
|
||
# returns the position of the matching closing parenthesis
|
||
|
||
level = 1
|
||
for i, ch in enumerate(string[start + 1:]):
|
||
if ch == '(' or ch == '{':
|
||
level += 1
|
||
elif ch == ')' or ch == '}':
|
||
level -= 1
|
||
if level == 0: # Found the matching parenthesis
|
||
return i + start + 1
|
||
return len(string)
|
||
|
||
|
||
class OSC:
|
||
def __init__(self, use_cache=True, ibs=False):
|
||
self.debug = False
|
||
self.use_cache = use_cache
|
||
self.cache_dirpath = None
|
||
# The directory path where cached files are stored.
|
||
|
||
self.projects = []
|
||
self.src_projects = []
|
||
self.tgt_project = ''
|
||
self.projects_inherited_from_tgt = []
|
||
self.packagesIn = {}
|
||
self.packageVersions = {}
|
||
self.versions_in_src = {}
|
||
self.versions_in_tgt = {}
|
||
self.devel_projects = {}
|
||
# Contains [project][package] -> 'devel_project'
|
||
|
||
self.projects_bugowners = {}
|
||
self.projects_maintainers = {}
|
||
|
||
self.command = ''
|
||
if ibs:
|
||
self.osc = ['osc', '-A', 'https://api.suse.de']
|
||
else:
|
||
self.osc = ['osc']
|
||
|
||
def log(self, *args):
|
||
if self.debug:
|
||
print(*args)
|
||
|
||
def osc_command(self):
|
||
return " ".join(self.osc)
|
||
|
||
def get_all_projects_from_projects_set_expression(self, prj_set_expr):
|
||
tmp = re.sub('{[^}]*}', '', prj_set_expr)
|
||
return {x.strip()
|
||
for x in re.split(r'[∩∪\*∖()]', tmp)
|
||
if x.strip()}
|
||
|
||
def get_packages_from_projects_in_set_expression(self, prj_set_expr):
|
||
prj_set_expr = prj_set_expr.strip()
|
||
self.log('get_packages_from_projects_in_set_expr: ', prj_set_expr)
|
||
if not re.findall(r'[∩∪\*∖()\{}]', prj_set_expr):
|
||
return self.packagesIn[prj_set_expr]
|
||
|
||
tok = next(re.finditer(r'[∩∪\*∖(\{]', prj_set_expr))
|
||
self.log(tok.group(0))
|
||
if tok.group(0) == '(':
|
||
end_parenthesis = find_matching_parenthesis(prj_set_expr,
|
||
tok.start())
|
||
if tok.start() == 0 and end_parenthesis == len(prj_set_expr) - 1:
|
||
return self.get_packages_from_projects_in_set_expression(
|
||
prj_set_expr[1:-1])
|
||
left_op = prj_set_expr[tok.start() + 1: end_parenthesis].strip()
|
||
tok = next(re.finditer(r'[∩∪\*∖]',
|
||
prj_set_expr[end_parenthesis + 1:]))
|
||
right_op = prj_set_expr[end_parenthesis + tok.start() + 2:].strip()
|
||
elif tok.group(0) == '{':
|
||
end_parenthesis = find_matching_parenthesis(prj_set_expr,
|
||
tok.start())
|
||
if tok.start() == 0 and end_parenthesis == len(prj_set_expr) - 1:
|
||
package_list_expr = prj_set_expr[tok.start() + 1:
|
||
end_parenthesis].strip()
|
||
self.log('package_list_expr', package_list_expr)
|
||
self.log(package_list_expr.split(','))
|
||
return set([x.strip() for x in package_list_expr.split(',')])
|
||
left_op = prj_set_expr[tok.start():
|
||
end_parenthesis + 1] # Includes the {}
|
||
tok = next(re.finditer(r'[∩∪\*∖]',
|
||
prj_set_expr[end_parenthesis + 1:]))
|
||
right_op = prj_set_expr[end_parenthesis + tok.start() + 2:].strip()
|
||
else:
|
||
left_op = prj_set_expr[0:tok.start()-1].strip()
|
||
right_op = prj_set_expr[tok.start()+1:].strip()
|
||
op = tok.group(0)
|
||
self.log(f'left op: {left_op}')
|
||
self.log(f'op: {op}')
|
||
self.log(f'right op: {right_op}')
|
||
left_set = self.get_packages_from_projects_in_set_expression(left_op)
|
||
right_set = self.get_packages_from_projects_in_set_expression(right_op)
|
||
|
||
if op == '∪':
|
||
return left_set.union(right_set)
|
||
elif op == '∩':
|
||
self.log('left op pkgs:', left_set)
|
||
self.log('right op pkgs:', right_set)
|
||
return left_set.intersection(right_set)
|
||
elif op == '∖':
|
||
if isinstance(right_set, list):
|
||
right_set = set(right_set)
|
||
self.log(f'1 {left_set}')
|
||
self.log(f'2 {right_set}')
|
||
self.log(f'3 {left_set - right_set}')
|
||
return left_set - right_set
|
||
elif op == '*':
|
||
r = set()
|
||
for pkg in left_set:
|
||
for pattern in right_set:
|
||
if re.match(pattern, pkg):
|
||
r.add(pkg)
|
||
return r
|
||
else:
|
||
return self.packagesIn[prj_set_expr]
|
||
|
||
def cache_path_for(self, cache_type, project, create_directory=True):
|
||
if not self.cache_dirpath:
|
||
cache_path = os.getenv('XDG_CACHE_HOME')
|
||
if not cache_path:
|
||
cache_path = os.path.expanduser('~/.cache')
|
||
self.cache_dirpath = f'{cache_path}/osc-batch-submit'
|
||
if create_directory:
|
||
try:
|
||
os.mkdir(self.cache_dirpath)
|
||
except FileExistsError:
|
||
pass
|
||
|
||
return f'{self.cache_dirpath}/{cache_type}{"-" if cache_type and project else ""}{project}'
|
||
|
||
def get_project_packages(self, project):
|
||
filename = self.cache_path_for('packages-in', project)
|
||
if self.use_cache:
|
||
try:
|
||
return set([line.strip('\n')
|
||
for line in open(filename, 'rt').readlines()])
|
||
except FileNotFoundError:
|
||
pass
|
||
|
||
cmd = self.osc + ['ls', project]
|
||
output = subprocess.check_output(cmd)
|
||
contents = output.decode('utf-8', errors='replace').split('\n')
|
||
packages = [line for line in contents
|
||
if line and line != '00Meta\n' and ':' not in line]
|
||
open(filename, 'wt').write('\n'.join(packages))
|
||
return set(packages)
|
||
|
||
def readAllPackages(self):
|
||
for proj in self.projects:
|
||
if proj.startswith('{'):
|
||
continue
|
||
self.packagesIn[proj] = self.get_project_packages(proj)
|
||
try:
|
||
self.packageVersions[proj] = \
|
||
self.read_all_package_versions(proj)
|
||
except FileNotFoundError:
|
||
self.packageVersions[proj] = {}
|
||
|
||
def get_project_meta_paths(self, project, repository_name='standard'):
|
||
meta_contents = None
|
||
meta_root = None
|
||
|
||
filename = self.cache_path_for('meta-prj', project)
|
||
if self.use_cache:
|
||
try:
|
||
meta_contents = open(filename, 'rt').read()
|
||
except FileNotFoundError:
|
||
pass
|
||
else:
|
||
meta_root = ET.fromstring(meta_contents)
|
||
|
||
if not meta_root:
|
||
cmd = self.osc + ['meta', 'prj', project]
|
||
proc = subprocess.run(cmd, capture_output=True)
|
||
if proc.returncode != 0:
|
||
raise RuntimeError('osc command returned an error when '
|
||
f'getting the metaprj for {project}:\n' +
|
||
proc.stderr.decode('utf-8',
|
||
errors='replace'))
|
||
|
||
open(filename, 'wt').write(proc.stdout.decode('utf-8',
|
||
errors='replace'))
|
||
try:
|
||
meta_root = ET.fromstring(proc.stdout)
|
||
except ET.ParseError:
|
||
print(f'Obtaining metaprj for {project} got {proc.stdout}')
|
||
raise
|
||
|
||
try:
|
||
repository = meta_root.find("repository[@name='standard']")
|
||
except AttributeError:
|
||
error = meta_root.find('error')
|
||
msg = f'Error obtaining repository meta for {project}'
|
||
if error is not None:
|
||
msg += f': {error.text}'
|
||
print(msg)
|
||
return None
|
||
try:
|
||
paths = [(p.attrib['project'], p.attrib['repository'])
|
||
for p in repository.findall("path")]
|
||
except AttributeError:
|
||
error = meta_root.find('error')
|
||
msg = f'Error obtaining repository paths for {project}'
|
||
if error is not None:
|
||
msg += f': {error.text}'
|
||
print(msg)
|
||
return None
|
||
return paths
|
||
|
||
def get_all_projects_inherited_from_project(self, project,
|
||
repository_name='standard'):
|
||
r = [project]
|
||
paths = self.get_project_meta_paths(project, repository_name)
|
||
visited = {(project, repository_name)}
|
||
while paths:
|
||
prj, repo = paths.pop(0)
|
||
try:
|
||
while (prj, repo) in visited:
|
||
prj, repo = paths.pop(0)
|
||
except IndexError:
|
||
break
|
||
if prj not in r:
|
||
r.append(prj)
|
||
more_paths = self.get_project_meta_paths(prj, repo)
|
||
visited.add((prj, repo))
|
||
if more_paths:
|
||
paths.extend(more_paths)
|
||
return r
|
||
|
||
def get_package_version(self, project, package, retry=True):
|
||
cmd = self.osc + ['api',
|
||
f'/source/{project}/{package}?view=info&parse=1']
|
||
proc = subprocess.run(cmd, capture_output=True)
|
||
if proc.returncode != 0:
|
||
stderr = proc.stderr.decode('utf-8', errors='replace')
|
||
if (proc.returncode == 1 and
|
||
'HTTP Error 400: download in progress' in stderr):
|
||
return 'unknown'
|
||
raise RuntimeError('osc command returned an error when getting the'
|
||
f' version of {project}/{package}:\n' +
|
||
proc.stderr.decode('utf-8', errors='replace'))
|
||
root = ET.fromstring(proc.stdout)
|
||
try:
|
||
version = root.find('version').text
|
||
except AttributeError:
|
||
error = root.find('error')
|
||
if error is not None:
|
||
print('Error obtaining package version for ' +
|
||
f'{project}/{package}: {error.text}')
|
||
if error.text == 'download in progress' and retry:
|
||
return self.get_package_version(project, package,
|
||
retry=False)
|
||
else:
|
||
print('AttributeError obtaining package version for ' +
|
||
f'{project}/{package}')
|
||
|
||
return None
|
||
return version
|
||
|
||
def get_request_state(self, requestid, retry=True):
|
||
cmd = self.osc + ['api',
|
||
f'/request/{requestid}']
|
||
proc = subprocess.run(cmd, capture_output=True)
|
||
if proc.returncode != 0:
|
||
raise RuntimeError('osc command returned an error when getting the'
|
||
f' state of request {requestid}:\n' +
|
||
proc.stderr.decode('utf-8', errors='replace'))
|
||
root = ET.fromstring(proc.stdout)
|
||
try:
|
||
state = root.find('state').attrib['name']
|
||
state_msg = '\n'.join([x.strip(' \r\n')
|
||
for x in root.find('state').itertext()
|
||
if x.strip(' \r\n')])
|
||
except AttributeError:
|
||
error = root.find('error')
|
||
if error is not None:
|
||
print('Error obtaining request state for ' +
|
||
f'{requestid}: {error.text}')
|
||
if error.text == 'download in progress' and retry:
|
||
return self.get_request_state(requestid, retry=False)
|
||
else:
|
||
print('AttributeError obtaining request state for '
|
||
f'{requestid}')
|
||
|
||
return None
|
||
return (state, state_msg)
|
||
|
||
def get_request_comments(self, requestid, only_last=False, retry=True):
|
||
cmd = self.osc + ['api',
|
||
f'/comments/request/{requestid}']
|
||
proc = subprocess.run(cmd, capture_output=True)
|
||
if proc.returncode != 0:
|
||
raise RuntimeError('osc command returned an error when getting the'
|
||
f' comments of request {requestid}:\n' +
|
||
proc.stderr.decode('utf-8', errors='replace'))
|
||
root = ET.fromstring(proc.stdout)
|
||
try:
|
||
comments = root.findall('comment')
|
||
comments_texts = [f"{x.attrib['who']}: {x.text}" for x in comments]
|
||
if only_last:
|
||
return comments_texts[-1] if comments_texts else ''
|
||
except AttributeError:
|
||
error = root.find('error')
|
||
if error is not None:
|
||
print('Error obtaining request state for ' +
|
||
f'{requestid}: {error.text}')
|
||
if error.text == 'download in progress' and retry:
|
||
return self.get_request_comments(requestid, retry=False)
|
||
else:
|
||
print('AttributeError obtaining request comments '
|
||
f'for {requestid}')
|
||
|
||
return None
|
||
return comments_texts
|
||
|
||
def get_project_bugowner_maintainer(self, project, retry=True):
|
||
if project in self.projects_bugowners:
|
||
return (self.projects_bugowners[project],
|
||
self.projects_maintainers[project])
|
||
|
||
cmd = self.osc + ['api',
|
||
f'/source/{project}/_meta']
|
||
proc = subprocess.run(cmd, capture_output=True)
|
||
if proc.returncode != 0:
|
||
raise RuntimeError('osc command returned an error when getting the'
|
||
f' bugowner/maintiner of project {project}:\n' +
|
||
proc.stderr.decode('utf-8', errors='replace'))
|
||
root = ET.fromstring(proc.stdout)
|
||
import pdb
|
||
pdb.set_trace()
|
||
try:
|
||
b_elements = root.findall("*[@role='bugowner']")
|
||
m_elements = root.findall("*[@role='maintainer']")
|
||
except AttributeError:
|
||
error = root.find('error')
|
||
if error is not None:
|
||
print('Error obtaining project bugowner for ' +
|
||
f'{project}: {error.text}')
|
||
if error.text == 'download in progress' and retry:
|
||
return self.get_project_bugowner(project, retry=False)
|
||
else:
|
||
print('AttributeError obtaining project bugowner for ' +
|
||
f'{project}')
|
||
|
||
return None
|
||
|
||
rb = []
|
||
rm = []
|
||
for element in b_elements:
|
||
rb.append((element.tag, element.attrib[f'{element.tag}id']))
|
||
for element in m_elements:
|
||
rm.append((element.tag, element.attrib[f'{element.tag}id']))
|
||
self.projects_bugowners[project] = rb
|
||
self.projects_maintainers[project] = rm
|
||
return rb, rm
|
||
|
||
def get_package_bugowner_maintainer(self, project, package, retry=True):
|
||
cmd = self.osc + ['api',
|
||
f'/source/{project}/{package}/_meta']
|
||
proc = subprocess.run(cmd, capture_output=True)
|
||
if proc.returncode != 0:
|
||
raise RuntimeError('osc command returned an error when getting the'
|
||
' bugowner/maintiner of package '
|
||
f'{project}/{package}:\n' +
|
||
proc.stderr.decode('utf-8', errors='replace'))
|
||
root = ET.fromstring(proc.stdout)
|
||
try:
|
||
b_elements = root.findall("*[@role='bugowner']")
|
||
m_elements = root.findall("*[@role='maintainer']")
|
||
except AttributeError:
|
||
error = root.find('error')
|
||
if error is not None:
|
||
print('Error obtaining package bugowner for ' +
|
||
f'{project}/{package}: {error.text}')
|
||
if error.text == 'download in progress' and retry:
|
||
return self.get_package_bugowner(project, package,
|
||
retry=False)
|
||
else:
|
||
print('AttributeError obtaining package bugowner for ' +
|
||
f'{project}/{package}')
|
||
|
||
return None
|
||
|
||
rb = []
|
||
rm = []
|
||
for element in b_elements:
|
||
rb.append((element.tag, element.attrib[f'{element.tag}id']))
|
||
for element in m_elements:
|
||
rm.append((element.tag, element.attrib[f'{element.tag}id']))
|
||
if not rb or not rm:
|
||
prj_b, prj_m = self.get_project_bugowner_maintainer(project)
|
||
if not rb:
|
||
rb = prj_b
|
||
if not rm:
|
||
rm = prj_m
|
||
|
||
return rb, rm
|
||
|
||
def get_all_package_versions(self, project, packages, filename):
|
||
versions = {}
|
||
for package in packages:
|
||
version = self.get_package_version(project, package)
|
||
versions[package] = version
|
||
self.packageVersions[project] = versions
|
||
self.write_cache_package_versions(project)
|
||
return versions
|
||
|
||
def write_cache_package_versions(self, project):
|
||
filename = self.cache_path_for('versions-in', project)
|
||
with open(filename, 'w') as fh:
|
||
json.dump(self.packageVersions[project], fh)
|
||
|
||
def read_all_package_versions(self, project):
|
||
versions = {}
|
||
if self.use_cache:
|
||
filename = self.cache_path_for('versions-in', project)
|
||
try:
|
||
with open(filename, 'r') as fh:
|
||
versions = json.load(fh)
|
||
except json.decoder.JSONDecodeError:
|
||
print(f'JSON error in {filename}. Skipping...')
|
||
return versions
|
||
|
||
def set_params(self, source_projects, target_project,
|
||
packages_expression):
|
||
self.source_projects = source_projects
|
||
self.target_project = target_project
|
||
self.packages_expression = packages_expression
|
||
self.projects_inherited_from_tgt = \
|
||
self.get_all_projects_inherited_from_project(
|
||
target_project, repository_name='standard')
|
||
|
||
tmp_prjs = self.get_all_projects_from_projects_set_expression(
|
||
packages_expression)
|
||
|
||
self.log('Projects inherited from tgt')
|
||
self.log(self.projects_inherited_from_tgt)
|
||
self.projects = list(tmp_prjs.union(source_projects,
|
||
self.projects_inherited_from_tgt,
|
||
{target_project}))
|
||
self.readAllPackages()
|
||
|
||
self.packages_to_copy = \
|
||
self.get_packages_from_projects_in_set_expression(
|
||
packages_expression)
|
||
|
||
def add_packages_to_packages_to_copy_list(self, packages):
|
||
self.packages_to_copy.extend(p for p in packages
|
||
if p not in self.packages_to_copy)
|
||
|
||
def restrict_packages_in_packages_to_copy_list(self, packages):
|
||
packages_to_copy = self.packages_to_copy
|
||
|
||
self.packages_to_copy = [p for p in packages
|
||
if p in packages_to_copy]
|
||
|
||
def get_package_devel_project(self, project, package):
|
||
'''Returns the devel project for a package.
|
||
|
||
Returns 'KDE:Applications' for project 'openSUSE:Factory',
|
||
package 'kcron'
|
||
'''
|
||
|
||
if project not in self.devel_projects:
|
||
self.read_cache_devel_projects(project)
|
||
|
||
try:
|
||
return self.devel_projects[project][package]
|
||
except KeyError:
|
||
pass
|
||
|
||
cmd = self.osc + ['develproject', project, package]
|
||
proc = subprocess.run(cmd, capture_output=True)
|
||
if proc.returncode != 0:
|
||
raise RuntimeError('osc command returned an error when getting the'
|
||
f' devel project of {project}/{package}:\n' +
|
||
proc.stderr.decode('utf-8', errors='replace'))
|
||
|
||
self.devel_projects[project][package] = \
|
||
proc.stdout.decode('utf-8',
|
||
errors='replace').strip('\n').split('/')[0]
|
||
self.write_cache_devel_projects(project)
|
||
|
||
return self.devel_projects[project][package]
|
||
|
||
def package_is_devel_package_for_project(
|
||
self, devel_project, devel_package, project):
|
||
'''Checks the devel project for a package.
|
||
|
||
Returns if devel_project is the devel project of devel_package in
|
||
project. For example, returns True for ('KDE:Applications',
|
||
'kcron', 'openSUSE:Factory').
|
||
'''
|
||
|
||
real_devel_project = self.get_package_devel_project(project,
|
||
devel_package)
|
||
return real_devel_project == devel_project
|
||
|
||
def write_cache_devel_projects(self, project):
|
||
filename = self.cache_path_for('devel-projects', project)
|
||
with open(filename, 'w') as fh:
|
||
json.dump(self.devel_projects[project], fh)
|
||
|
||
def read_cache_devel_projects(self, project):
|
||
self.devel_projects[project] = {}
|
||
if not self.use_cache:
|
||
return
|
||
|
||
filename = self.cache_path_for('devel-projects', project)
|
||
try:
|
||
with open(filename, 'r') as fh:
|
||
self.devel_projects[project] = json.load(fh)
|
||
except FileNotFoundError:
|
||
pass
|
||
|
||
def get_package_versions_from_source_projects_helper(self,
|
||
get_versions=True,
|
||
print_progress=False):
|
||
self.versions_in_src = {}
|
||
self.packageSource = {}
|
||
progress = None
|
||
if print_progress:
|
||
progress = Progress()
|
||
progress.max_value = len(self.packages_to_copy) - 1
|
||
|
||
for idx, package in enumerate(sorted(self.packages_to_copy)):
|
||
if progress:
|
||
progress.set_value(idx)
|
||
for prj in self.source_projects:
|
||
if package in self.packagesIn[prj]:
|
||
self.packageSource[package] = prj
|
||
if not get_versions:
|
||
continue
|
||
try:
|
||
self.versions_in_src[package] = \
|
||
self.packageVersions[prj][package]
|
||
except KeyError:
|
||
# We need to request the version to obs
|
||
try:
|
||
srcVersion = self.get_package_version(prj, package)
|
||
except ET.ParseError:
|
||
self.log(f'{prj} / {package} doesn\'t have a '
|
||
'version set (probably still building?)')
|
||
self.versions_in_src[package] = None
|
||
break
|
||
|
||
self.log(f'Requested src version to obs: {prj}, ' +
|
||
f'{package}, {srcVersion}')
|
||
if srcVersion:
|
||
self.versions_in_src[package] = srcVersion
|
||
self.packageVersions[prj][package] = srcVersion
|
||
self.write_cache_package_versions(prj)
|
||
|
||
break
|
||
if progress:
|
||
print('')
|
||
|
||
return (self.versions_in_src, self.packageSource)
|
||
|
||
def get_package_source_from_source_projects(self, print_progress=False):
|
||
self.get_package_versions_from_source_projects_helper(
|
||
get_versions=False, print_progress=print_progress)
|
||
return self.packageSource
|
||
|
||
def get_package_versions_from_source_projects(self, print_progress=False):
|
||
return self.get_package_versions_from_source_projects_helper(
|
||
get_versions=True, print_progress=print_progress)
|
||
|
||
def get_package_versions_from_target_projects(self, print_progress=False):
|
||
progress = None
|
||
lf = ''
|
||
if print_progress:
|
||
progress = Progress()
|
||
progress.max_value = len(self.packages_to_copy) - 1
|
||
lf = '\n'
|
||
for idx, package in enumerate(sorted(self.packages_to_copy)):
|
||
if progress:
|
||
progress.set_value(idx)
|
||
try:
|
||
tgtVersion = self.versions_in_tgt[package]
|
||
except KeyError:
|
||
# Obtaining version of {package} from
|
||
# {self.projects_inherited_from_tgt}
|
||
for prj in self.projects_inherited_from_tgt:
|
||
if package not in self.packagesIn[prj]:
|
||
# print(f'{package} not available in {prj}')
|
||
continue
|
||
# Let's see if we already read the package version
|
||
# print(f'{package} is in {prj}')
|
||
try:
|
||
tgtVersion = self.packageVersions[prj][package]
|
||
except KeyError:
|
||
# We need to request the version to obs
|
||
# print(f'Obtainining version of {package} from {prj}')
|
||
|
||
tgtVersion = self.get_package_version(prj, package)
|
||
self.log(f'Requested tgt version to obs: {prj}, ' +
|
||
f'{package}, {tgtVersion}')
|
||
if tgtVersion:
|
||
self.versions_in_tgt[package] = tgtVersion
|
||
self.packageVersions[prj][package] = tgtVersion
|
||
self.write_cache_package_versions(prj)
|
||
break
|
||
else:
|
||
print(f'{lf}# {package} not found in ' +
|
||
f'{self.projects_inherited_from_tgt}')
|
||
tgtVersion = None
|
||
if progress:
|
||
print('')
|
||
return self.versions_in_tgt
|
||
|
||
def get_packages_diff(self, srcPrj, srcPackage, tgtPrj, tgtPackage):
|
||
cmd = self.osc + ['rdiff', srcPrj, srcPackage, tgtPrj, tgtPackage]
|
||
proc = subprocess.run(cmd, capture_output=True)
|
||
if proc.returncode != 0:
|
||
raise RuntimeError('osc command returned an error when getting the'
|
||
f' rdiff of {srcPrj}/{srcPackage} and '
|
||
f'{tgtPrj}/{tgtPackage}:\n' +
|
||
proc.stderr.decode('utf-8', errors='replace'))
|
||
return proc.stdout.decode('utf-8', errors='replace').strip('\n')
|
||
|
||
def remove_cache_files(self):
|
||
dirpath = self.cache_path_for('', '')
|
||
cache_types = ['packages-in', 'meta-prj', 'versions-in', 'devel-projects']
|
||
for cache_type in cache_types:
|
||
path = f'{dirpath}/{cache_type}-*'
|
||
for filename in glob.glob(path):
|
||
os.unlink(filename)
|