Adrian Schröter cf36ceb5b8
Add support for git project handling
Automatically switches over from OBS api calls to git
when specifing a git repository on CLI
2024-05-07 16:22:21 +02:00

407 lines
16 KiB
Python

import logging
import re
import time
from lxml import etree as ET
import solv
class Group(object):
def __init__(self, name, pkglist):
self.name = name
self.safe_name = re.sub(r'\W', '_', name.lower())
self.pkglist = pkglist
self.architectures = pkglist.all_architectures
self.conditional = None
self.flavors = dict()
self.packages = dict()
self.locked = set()
self.solved_packages = None
self.solved = False
self.not_found = dict()
self.unresolvable = dict()
self.default_support_status = None
self.ignore_broken = False
for a in self.architectures:
self.packages[a] = []
self.unresolvable[a] = dict()
self.comment = ' ### AUTOMATICALLY GENERATED, DO NOT EDIT ### '
self.srcpkgs = None
self.develpkgs = dict()
self.silents = set()
self.required = set()
self.ignored = set()
# special feature for SLE. Patterns are marked for expansion
# of recommended packages, all others aren't. Only works
# with recommends on actual package names, not virtual
# provides.
self.expand_recommended = set()
# special feature for Tumbleweed. Just like the above but for
# suggested (recommends are default)
self.expand_suggested = set()
pkglist.groups[self.safe_name] = self
self.logger = logging.getLogger(__name__)
def _add_to_packages(self, package, arch=None):
archs = self.architectures
if arch:
archs = [arch]
for a in archs:
# we use groups.yml for powerpc through a branch,
# so ignore inapplicable architectures
if a not in self.packages:
continue
self.packages[a].append([package, self.name])
def parse_yml(self, packages):
# package less group is a rare exception
if packages is None:
return
for package in packages:
if not isinstance(package, dict):
self._add_to_packages(package)
continue
name = list(package)[0]
for rel in package[name]:
arch = None
if rel == 'locked':
self.locked.add(name)
continue
elif rel == 'silent':
self.silents.add(name)
elif rel == 'required':
self.required.add(name)
elif rel == 'recommended':
self.expand_recommended.add(name)
elif rel == 'suggested':
self.expand_suggested.add(name)
self.expand_recommended.add(name)
else:
arch = rel
self._add_to_packages(name, arch)
def _verify_solved(self):
if not self.solved:
raise Exception('group {} not solved'.format(self.name))
def inherit(self, group):
for arch in self.architectures:
self.packages[arch] += group.packages[arch]
self.locked.update(group.locked)
self.silents.update(group.silents)
self.required.update(group.required)
self.expand_recommended.update(group.expand_recommended)
self.expand_suggested.update(group.expand_suggested)
# do not repeat packages
def ignore(self, without):
for arch in ['*'] + self.pkglist.filtered_architectures:
s = set(without.solved_packages[arch])
s |= set(without.solved_packages['*'])
for p in s:
self.solved_packages[arch].pop(p, None)
for p in without.not_found.keys():
if p not in self.not_found:
continue
self.not_found[p] -= without.not_found[p]
if not self.not_found[p]:
self.not_found.pop(p)
for g in without.ignored:
self.ignore(g)
self.ignored.add(without)
def solve(self, use_recommends=False):
""" base: list of base groups or None """
solved = dict()
for arch in self.pkglist.filtered_architectures:
solved[arch] = dict()
self.srcpkgs = dict()
self.recommends = dict()
self.suggested = dict()
for arch in self.pkglist.filtered_architectures:
pool = self.pkglist.prepare_pool(arch, False)
solver = pool.Solver()
solver.set_flag(solver.SOLVER_FLAG_IGNORE_RECOMMENDED, not use_recommends)
solver.set_flag(solver.SOLVER_FLAG_ADD_ALREADY_RECOMMENDED, use_recommends)
# pool.set_debuglevel(10)
suggested = dict()
# packages resulting from explicit recommended expansion
extra = []
def solve_one_package(n, group):
jobs = list(self.pkglist.lockjobs[arch])
sel = pool.select(str(n), solv.Selection.SELECTION_NAME)
if sel.isempty():
self.logger.debug('{}.{}: package {} not found'.format(self.name, arch, n))
self.not_found.setdefault(n, set()).add(arch)
return
else:
if n in self.expand_recommended:
for s in sel.solvables():
for dep in s.lookup_deparray(solv.SOLVABLE_RECOMMENDS):
# only add recommends that exist as packages
rec = pool.select(dep.str(), solv.Selection.SELECTION_NAME)
if not rec.isempty():
extra.append([dep.str(), group + ':recommended:' + n])
jobs += sel.jobs(solv.Job.SOLVER_INSTALL)
locked = self.locked | self.pkglist.unwanted
for lock in locked:
sel = pool.select(str(lock), solv.Selection.SELECTION_NAME)
# if we can't find it, it probably is not as important
if not sel.isempty():
jobs += sel.jobs(solv.Job.SOLVER_LOCK)
for s in self.silents:
sel = pool.select(str(s), solv.Selection.SELECTION_NAME | solv.Selection.SELECTION_FLAT)
if sel.isempty():
self.logger.warning('{}.{}: silent package {} not found'.format(self.name, arch, s))
else:
jobs += sel.jobs(solv.Job.SOLVER_INSTALL)
problems = solver.solve(jobs)
if problems:
for problem in problems:
msg = 'unresolvable: {}:{}.{}: {}'.format(self.name, n, arch, problem)
self.logger.debug(msg)
self.unresolvable[arch][n] = str(problem)
return
for s in solver.get_recommended():
if s.name in locked:
continue
self.recommends.setdefault(s.name, group + ':' + n)
if n in self.expand_suggested:
for s in solver.get_suggested():
suggested[s.name] = group + ':suggested:' + n
self.suggested.setdefault(s.name, suggested[s.name])
trans = solver.transaction()
if trans.isempty():
self.logger.error('%s.%s: nothing to do', self.name, arch)
return
for s in trans.newsolvables():
solved[arch].setdefault(s.name, group + ':' + n)
if None:
reason, rule = solver.describe_decision(s)
print(self.name, s.name, reason, rule.info().problemstr())
# don't ask me why, but that's how it seems to work
if s.lookup_void(solv.SOLVABLE_SOURCENAME):
src = s.name
else:
src = s.lookup_str(solv.SOLVABLE_SOURCENAME)
self.srcpkgs[src] = group + ':' + s.name
start = time.time()
for n, group in self.packages[arch]:
solve_one_package(n, group)
# resetup the pool with ignored conflicts to get supplements from the list
pool = self.pkglist.prepare_pool(arch, True)
solver = pool.Solver()
solver.set_flag(solver.SOLVER_FLAG_IGNORE_RECOMMENDED, not use_recommends)
solver.set_flag(solver.SOLVER_FLAG_ADD_ALREADY_RECOMMENDED, use_recommends)
jobs = list(self.pkglist.lockjobs[arch])
locked = self.locked | self.pkglist.unwanted
for lock in locked:
sel = pool.select(str(lock), solv.Selection.SELECTION_NAME)
# if we can't find it, it probably is not as important
if not sel.isempty():
jobs += sel.jobs(solv.Job.SOLVER_LOCK)
for n in list(solved[arch]) + list(suggested):
if n in locked:
continue
sel = pool.select(str(n), solv.Selection.SELECTION_NAME)
jobs += sel.jobs(solv.Job.SOLVER_INSTALL)
solver.solve(jobs)
trans = solver.transaction()
for s in trans.newsolvables():
solved[arch].setdefault(s.name, group + ':expansion')
end = time.time()
self.logger.info('%s - solving took %f', self.name, end - start)
common = None
# compute common packages across all architectures
for arch in self.pkglist.filtered_architectures:
if common is None:
common = set(solved[arch])
continue
common &= set(solved[arch])
if common is None:
common = set()
# reduce arch specific set by common ones
solved['*'] = dict()
for arch in self.pkglist.filtered_architectures:
for p in common:
solved['*'][p] = solved[arch].pop(p)
self.solved_packages = solved
self.solved = True
def check_dups(self, modules, overlap):
if not overlap:
return
packages = set(self.solved_packages['*'])
for arch in self.pkglist.filtered_architectures:
packages.update(self.solved_packages[arch])
for m in modules:
# do not check with ourselves and only once for the rest
if m.name <= self.name:
continue
if self.name in m.conflicts or m.name in self.conflicts:
continue
mp = set(m.solved_packages['*'])
for arch in self.pkglist.filtered_architectures:
mp.update(m.solved_packages[arch])
if len(packages & mp):
overlap.comment += '\n overlapping between ' + self.name + ' and ' + m.name + '\n'
for p in sorted(packages & mp):
for arch in list(m.solved_packages):
if m.solved_packages[arch].get(p, None):
overlap.comment += ' # ' + m.name + '.' + arch + ': ' + m.solved_packages[arch][p] + '\n'
if self.solved_packages[arch].get(p, None):
overlap.comment += ' # ' + self.name + '.' + \
arch + ': ' + self.solved_packages[arch][p] + '\n'
overlap.comment += ' - ' + p + '\n'
overlap._add_to_packages(p)
def collect_devel_packages(self):
for arch in self.pkglist.filtered_architectures:
pool = self.pkglist.prepare_pool(arch, False)
pool.Selection()
for s in pool.solvables_iter():
if s.name.endswith('-devel'):
# don't ask me why, but that's how it seems to work
if s.lookup_void(solv.SOLVABLE_SOURCENAME):
src = s.name
else:
src = s.lookup_str(solv.SOLVABLE_SOURCENAME)
if src in self.srcpkgs:
self.develpkgs[s.name] = self.srcpkgs[src]
def _filter_already_selected(self, modules, pkgdict):
# erase our own - so we don't filter our own
for p in list(pkgdict):
already_present = False
for m in modules:
for arch in ['*'] + self.pkglist.filtered_architectures:
already_present = already_present or (p in m.solved_packages[arch])
if already_present:
del pkgdict[p]
def filter_already_selected(self, modules):
self._filter_already_selected(modules, self.recommends)
def tocompose(self, prefix, arch, ignore_broken=False, comment=None):
packages = self.solved_packages.get(arch, dict())
name = self.name
missing = dict()
if arch == '*':
missing = self.not_found
content = ''
unresolvable = self.unresolvable.get(arch, dict())
for name in sorted(list(packages) + list(missing) + list(unresolvable)):
if name in self.silents:
continue
if name in missing:
if ignore_broken and name not in self.required:
msg = ' {} not found on {}'.format(name, ','.join(sorted(missing[name])))
content += prefix + "#- " + msg + "\n"
self.logger.error(msg)
continue
if name in unresolvable:
if ignore_broken and name not in self.required:
msg = ' {} uninstallable: {}'.format(name, unresolvable[name])
content += prefix + "#- " + msg + "\n"
self.logger.error(msg)
continue
content += prefix + "- " + name
if comment:
content += " # " + comment
content += "\n"
content += "\n"
return content
def toxml(self, arch, ignore_broken=False, comment=None):
packages = self.solved_packages.get(arch, dict())
name = self.name
if arch != '*':
name += '.' + arch
root = ET.Element('group', {'name': name})
if comment:
c = ET.Comment(comment)
root.append(c)
if arch != '*':
ET.SubElement(root, 'conditional', {'name': 'only_{}'.format(arch)})
packagelist = ET.SubElement(root, 'packagelist', {'relationship': 'recommends'})
missing = dict()
if arch == '*':
missing = self.not_found
unresolvable = self.unresolvable.get(arch, dict())
for name in sorted(list(packages) + list(missing) + list(unresolvable)):
if name in self.silents:
continue
if name in missing:
msg = ' {} not found on {}'.format(name, ','.join(sorted(missing[name])))
if ignore_broken and name not in self.required:
c = ET.Comment(msg)
packagelist.append(c)
continue
name = msg
if name in unresolvable:
msg = ' {} uninstallable: {}'.format(name, unresolvable[name])
if ignore_broken and name not in self.required:
c = ET.Comment(msg)
packagelist.append(c)
continue
else:
self.logger.error(msg)
name = msg
status = self.pkglist.supportstatus(name) or self.default_support_status
attrs = {'name': name}
if status is not None:
attrs['supportstatus'] = status
ET.SubElement(packagelist, 'package', attrs)
if name in packages and packages[name]:
c = ET.Comment(' reason: {} '.format(packages[name]))
packagelist.append(c)
return root
# just list all packages in it as an array - to be output as one yml
def summary(self):
ret = set()
for arch in ['*'] + self.pkglist.filtered_architectures:
ret |= set(self.solved_packages[arch])
return ret