import logging
import re
import time

from lxml import etree as ET

import solv


class Group:
    """A named package group whose content is resolved per architecture.

    Package entries are collected with :meth:`parse_yml` and
    :meth:`inherit`, resolved against the solver pools handed out by
    ``pkglist`` in :meth:`solve`, and rendered with :meth:`tocompose` or
    :meth:`toxml`.  Every new instance registers itself in
    ``pkglist.groups`` under :attr:`safe_name`.
    """

    def __init__(self, name, pkglist):
        """Create an empty group and register it with *pkglist*.

        :param name: group name; ``safe_name`` is its lower-cased form with
            every non-word character replaced by ``_``.
        :param pkglist: owning object.  This class reads its
            ``all_architectures``, ``filtered_architectures``, ``lockjobs``,
            ``unwanted``, ``prepare_pool()`` and ``supportstatus()`` members
            and writes to its ``groups`` mapping.
        """
        self.name = name
        self.safe_name = re.sub(r'\W', '_', name.lower())
        self.pkglist = pkglist
        self.architectures = pkglist.all_architectures
        self.conditional = None
        self.flavors = {}
        # arch -> list of [package name, name of the group that listed it]
        self.packages = {arch: [] for arch in self.architectures}
        self.locked = set()
        # arch (or '*' for "all architectures") -> {package: reason};
        # stays None until solve() has run
        self.solved_packages = None
        self.solved = False
        # package name -> set of architectures it could not be found on
        self.not_found = {}
        # arch -> {package name: solver problem text}
        self.unresolvable = {arch: {} for arch in self.architectures}
        self.default_support_status = None
        self.ignore_broken = False

        self.comment = ' ### AUTOMATICALLY GENERATED, DO NOT EDIT ### '
        self.srcpkgs = None
        self.develpkgs = {}
        self.silents = set()
        self.required = set()
        self.ignored = set()
        # special feature for SLE. Patterns are marked for expansion
        # of recommended packages, all others aren't. Only works
        # with recommends on actual package names, not virtual
        # provides.
        self.expand_recommended = set()
        # special feature for Tumbleweed. Just like the above but for
        # suggested (recommends are default)
        self.expand_suggested = set()

        pkglist.groups[self.safe_name] = self
        self.logger = logging.getLogger(__name__)

    def _add_to_packages(self, package, arch=None):
        """Queue *package* for *arch*, or for every architecture if unset."""
        archs = [arch] if arch else self.architectures
        for a in archs:
            # we use groups.yml for powerpc through a branch,
            # so ignore inapplicable architectures
            if a in self.packages:
                self.packages[a].append([package, self.name])

    def parse_yml(self, packages):
        """Load the package entries of this group.

        :param packages: ``None`` or an iterable whose items are either a
            plain package name or a one-key dict ``{name: [rel, ...]}``.
            Each ``rel`` is one of ``locked``, ``silent``, ``required``,
            ``recommended``, ``suggested``; any other value is taken as an
            architecture name.

        A ``locked`` rel only records the lock.  Every other rel also queues
        the package: for the named architecture if the rel is one, otherwise
        for all architectures.
        """
        # package less group is a rare exception
        if packages is None:
            return

        # rel keyword -> sets that record the package name
        flag_sets = {
            'silent': (self.silents,),
            'required': (self.required,),
            'recommended': (self.expand_recommended,),
            'suggested': (self.expand_suggested, self.expand_recommended),
        }

        for package in packages:
            if not isinstance(package, dict):
                self._add_to_packages(package)
                continue
            name = list(package)[0]
            for rel in package[name]:
                if rel == 'locked':
                    self.locked.add(name)
                    continue
                arch = None
                if rel in flag_sets:
                    for target in flag_sets[rel]:
                        target.add(name)
                else:
                    arch = rel
                self._add_to_packages(name, arch)

    def _verify_solved(self):
        """Raise ``Exception`` unless :meth:`solve` has completed."""
        if not self.solved:
            raise Exception(f'group {self.name} not solved')

    def inherit(self, group):
        """Merge the package entries and per-package flags of *group*."""
        for arch in self.architectures:
            self.packages[arch] += group.packages[arch]

        self.locked.update(group.locked)
        self.silents.update(group.silents)
        self.required.update(group.required)
        self.expand_recommended.update(group.expand_recommended)
        self.expand_suggested.update(group.expand_suggested)

    # do not repeat packages
    def ignore(self, without):
        """Drop everything already covered by the solved group *without*.

        Removes the solved packages of *without* from this group, subtracts
        its ``not_found`` architectures, repeats this for every group that
        *without* itself ignored, and records *without* in ``self.ignored``.
        """
        for arch in ['*'] + self.pkglist.filtered_architectures:
            covered = set(without.solved_packages[arch])
            covered |= set(without.solved_packages['*'])
            for p in covered:
                self.solved_packages[arch].pop(p, None)
        for p, archs in without.not_found.items():
            if p not in self.not_found:
                continue
            self.not_found[p] -= archs
            if not self.not_found[p]:
                self.not_found.pop(p)
        for g in without.ignored:
            self.ignore(g)
        self.ignored.add(without)

    @staticmethod
    def _new_solver(pool, use_recommends):
        """Return a solver for *pool* with the recommends flags applied."""
        solver = pool.Solver()
        solver.set_flag(solver.SOLVER_FLAG_IGNORE_RECOMMENDED, not use_recommends)
        solver.set_flag(solver.SOLVER_FLAG_ADD_ALREADY_RECOMMENDED, use_recommends)
        return solver

    def _lock_jobs(self, pool):
        """Return ``(jobs, names)`` locking this group's and pkglist's unwanted packages."""
        locked = self.locked | self.pkglist.unwanted
        jobs = []
        for lock in locked:
            sel = pool.select(str(lock), solv.Selection.SELECTION_NAME)
            # if we can't find it, it probably is not as important
            if not sel.isempty():
                jobs += sel.jobs(solv.Job.SOLVER_LOCK)
        return jobs, locked

    @staticmethod
    def _source_name(solvable):
        """Return the source package name recorded for *solvable*."""
        # don't ask me why, but that's how it seems to work
        if solvable.lookup_void(solv.SOLVABLE_SOURCENAME):
            return solvable.name
        return solvable.lookup_str(solv.SOLVABLE_SOURCENAME)

    def solve(self, use_recommends=False):
        """Resolve the queued packages on every filtered architecture.

        :param use_recommends: when true the solver does not ignore
            recommended packages and also adds already recommended ones.

        Each queued package is solved on its own.  Packages queued through
        the ``recommended`` expansion are solved afterwards, and a final
        pass on a pool prepared with the second ``prepare_pool`` argument
        set to ``True`` adds whatever installing the whole result pulls in.

        Resets ``srcpkgs``, ``recommends`` and ``suggested``, updates
        ``not_found`` and ``unresolvable``, and stores the result in
        ``solved_packages``: packages present on every architecture are
        moved to the ``'*'`` key.  Sets ``solved`` to ``True``.
        """
        archs = self.pkglist.filtered_architectures
        solved = {arch: {} for arch in archs}

        self.srcpkgs = {}
        self.recommends = {}
        self.suggested = {}

        # Group named in the reason of packages added by the final pass.
        # It follows the last regular entry that was solved; the default
        # keeps the name defined when no entry was queued at all.
        expansion_group = self.name

        for arch in archs:
            pool = self.pkglist.prepare_pool(arch, False)
            solver = self._new_solver(pool, use_recommends)
            # the lock jobs only depend on the pool, so build them once
            lock_jobs, locked = self._lock_jobs(pool)

            # pool.set_debuglevel(10)
            suggested = {}

            # packages resulting from explicit recommended expansion
            extra = []

            # Only called within this loop iteration, so the closure always
            # sees the arch, pool and solver of the current architecture.
            def solve_one_package(n, group):
                jobs = list(self.pkglist.lockjobs[arch])
                sel = pool.select(str(n), solv.Selection.SELECTION_NAME)
                if sel.isempty():
                    self.logger.debug('%s.%s: package %s not found', self.name, arch, n)
                    self.not_found.setdefault(n, set()).add(arch)
                    return

                if n in self.expand_recommended:
                    for s in sel.solvables():
                        for dep in s.lookup_deparray(solv.SOLVABLE_RECOMMENDS):
                            # only add recommends that exist as packages
                            rec = pool.select(dep.str(), solv.Selection.SELECTION_NAME)
                            if not rec.isempty():
                                extra.append([dep.str(), f"{group}:recommended:{n}"])

                jobs += sel.jobs(solv.Job.SOLVER_INSTALL)
                jobs += lock_jobs

                for s in self.silents:
                    sel = pool.select(str(s), solv.Selection.SELECTION_NAME | solv.Selection.SELECTION_FLAT)
                    if sel.isempty():
                        self.logger.warning('%s.%s: silent package %s not found', self.name, arch, s)
                    else:
                        jobs += sel.jobs(solv.Job.SOLVER_INSTALL)

                problems = solver.solve(jobs)
                if problems:
                    for problem in problems:
                        self.logger.debug('unresolvable: %s:%s.%s: %s', self.name, n, arch, problem)
                        self.unresolvable[arch][n] = str(problem)
                    return

                for s in solver.get_recommended():
                    if s.name in locked:
                        continue
                    self.recommends.setdefault(s.name, f"{group}:{n}")
                if n in self.expand_suggested:
                    for s in solver.get_suggested():
                        suggested[s.name] = f"{group}:suggested:{n}"
                        self.suggested.setdefault(s.name, suggested[s.name])

                trans = solver.transaction()
                if trans.isempty():
                    self.logger.error('%s.%s: nothing to do', self.name, arch)
                    return

                for s in trans.newsolvables():
                    solved[arch].setdefault(s.name, f"{group}:{n}")
                    self.srcpkgs[self._source_name(s)] = f"{group}:{s.name}"

            start = time.time()
            for n, group in self.packages[arch]:
                solve_one_package(n, group)
                expansion_group = group

            # Solve what the recommended expansion queued.  Solving an extra
            # can queue further extras, so work through the list until it is
            # empty and skip names already handled to guarantee termination.
            expanded = set()
            while extra:
                n, group = extra.pop(0)
                if n in expanded:
                    continue
                expanded.add(n)
                solve_one_package(n, group)

            # resetup the pool with ignored conflicts to get supplements from the list
            pool = self.pkglist.prepare_pool(arch, True)
            solver = self._new_solver(pool, use_recommends)

            jobs = list(self.pkglist.lockjobs[arch])
            lock_jobs, locked = self._lock_jobs(pool)
            jobs += lock_jobs

            for n in list(solved[arch]) + list(suggested):
                if n in locked:
                    continue
                sel = pool.select(str(n), solv.Selection.SELECTION_NAME)
                jobs += sel.jobs(solv.Job.SOLVER_INSTALL)

            solver.solve(jobs)
            trans = solver.transaction()
            for s in trans.newsolvables():
                solved[arch].setdefault(s.name, f"{expansion_group}:expansion")

            end = time.time()
            self.logger.info('%s - solving took %f', self.name, end - start)

        # compute common packages across all architectures
        common = None
        for arch in archs:
            if common is None:
                common = set(solved[arch])
            else:
                common &= set(solved[arch])
        if common is None:
            common = set()

        # reduce arch specific set by common ones
        solved['*'] = {}
        for arch in archs:
            for p in common:
                solved['*'][p] = solved[arch].pop(p)

        self.solved_packages = solved
        self.solved = True

    def check_dups(self, modules, overlap):
        """Record packages this group shares with later-named *modules*.

        :param modules: solved groups to compare against.  Only those whose
            name sorts after this group's name are checked, so each pair is
            reported once.
        :param overlap: group that collects the result.  Nothing is done if
            it is falsy; otherwise its ``comment`` is extended and every
            shared package is queued on it.

        Pairs where one side lists the other in a ``conflicts`` attribute
        are skipped.  The attribute is not set by this class, so a missing
        one is treated as empty.
        """
        if not overlap:
            return
        archs = self.pkglist.filtered_architectures
        packages = set(self.solved_packages['*'])
        for arch in archs:
            packages.update(self.solved_packages[arch])
        own_conflicts = getattr(self, 'conflicts', ())
        for m in modules:
            # do not check with ourselves and only once for the rest
            if m.name <= self.name:
                continue
            if self.name in getattr(m, 'conflicts', ()) or m.name in own_conflicts:
                continue
            mp = set(m.solved_packages['*'])
            for arch in archs:
                mp.update(m.solved_packages[arch])
            shared = packages & mp
            if not shared:
                continue
            overlap.comment += f"\n overlapping between {self.name} and {m.name}\n"
            for p in sorted(shared):
                for arch in list(m.solved_packages):
                    theirs = m.solved_packages[arch].get(p, None)
                    if theirs:
                        overlap.comment += f" # {m.name}.{arch}: {theirs}\n"
                    # the other module may carry a key this group lacks
                    ours = self.solved_packages.get(arch, {}).get(p, None)
                    if ours:
                        overlap.comment += f" # {self.name}.{arch}: {ours}\n"
                overlap.comment += f" - {p}\n"
                overlap._add_to_packages(p)

    def collect_devel_packages(self):
        """Fill ``develpkgs`` with ``-devel`` packages of solved sources.

        A package whose name ends in ``-devel`` is recorded with the reason
        stored in ``srcpkgs`` for its source package, if there is one.
        """
        for arch in self.pkglist.filtered_architectures:
            pool = self.pkglist.prepare_pool(arch, False)
            # NOTE(review): the result is unused; the call is kept in case
            # it has an effect on the pool - confirm before removing.
            pool.Selection()
            for s in pool.solvables_iter():
                if not s.name.endswith('-devel'):
                    continue
                src = self._source_name(s)
                if src in self.srcpkgs:
                    self.develpkgs[s.name] = self.srcpkgs[src]

    def _filter_already_selected(self, modules, pkgdict):
        """Delete from *pkgdict* every key that any of *modules* has solved.

        NOTE(review): the original comment said "erase our own - so we don't
        filter our own", but nothing here excludes this group from
        *modules*; presumably the caller does - verify.
        """
        archs = ['*'] + self.pkglist.filtered_architectures
        for p in list(pkgdict):
            if any(p in m.solved_packages[arch] for m in modules for arch in archs):
                del pkgdict[p]

    def filter_already_selected(self, modules):
        """Drop recommended packages that *modules* already contain."""
        self._filter_already_selected(modules, self.recommends)

    def tocompose(self, prefix, arch, ignore_broken=False, comment=None):
        """Render the packages of *arch* as a text list.

        :param prefix: text put in front of every emitted line.
        :param arch: architecture key, or ``'*'`` for the common packages;
            only ``'*'`` also lists the packages that were not found.
        :param ignore_broken: when true, missing or uninstallable packages
            that are not in ``required`` are written as ``#-`` comment
            lines and logged instead of being listed.
        :param comment: optional text appended to every package line.
        :returns: one line per package, followed by an empty line.
        """
        packages = self.solved_packages.get(arch, {})
        missing = self.not_found if arch == '*' else {}
        unresolvable = self.unresolvable.get(arch, {})

        lines = []
        for name in sorted(list(packages) + list(missing) + list(unresolvable)):
            if name in self.silents:
                continue
            skippable = ignore_broken and name not in self.required
            if skippable and name in missing:
                msg = f" {name} not found on {','.join(sorted(missing[name]))}"
                lines.append(f"{prefix} #- {msg}\n")
                self.logger.error(msg)
                continue
            if skippable and name in unresolvable:
                msg = f" {name} uninstallable: {unresolvable[name]}"
                lines.append(f"{prefix} #- {msg}\n")
                self.logger.error(msg)
                continue
            line = f"{prefix} - {name}"
            if packages.get(name):
                line += f" # reason: {packages[name]}"
            if comment:
                line += f" # {comment}"
            lines.append(line + "\n")

        return "".join(lines) + "\n"

    def toxml(self, arch, ignore_broken=False, comment=None):
        """Render the packages of *arch* as a ``<group>`` element.

        :param arch: architecture key, or ``'*'`` for the common packages.
            Any other value is appended to the group name and adds a
            ``conditional`` child named ``only_<arch>``.
        :param ignore_broken: when true, missing or uninstallable packages
            that are not in ``required`` become XML comments.  Otherwise the
            problem text is used as the package name.
        :param comment: optional text added as a leading XML comment.
        :returns: the ``group`` element.
        """
        packages = self.solved_packages.get(arch, {})

        name = self.name
        if arch != '*':
            name += f".{arch}"

        root = ET.Element('group', {'name': name})
        if comment:
            root.append(ET.Comment(comment))

        if arch != '*':
            ET.SubElement(root, 'conditional', {'name': f'only_{arch}'})
        packagelist = ET.SubElement(root, 'packagelist', {'relationship': 'recommends'})

        missing = self.not_found if arch == '*' else {}
        unresolvable = self.unresolvable.get(arch, {})
        for name in sorted(list(packages) + list(missing) + list(unresolvable)):
            if name in self.silents:
                continue
            if name in missing:
                msg = f" {name} not found on {','.join(sorted(missing[name]))}"
                if ignore_broken and name not in self.required:
                    packagelist.append(ET.Comment(msg))
                    continue
                name = msg
            if name in unresolvable:
                msg = f' {name} uninstallable: {unresolvable[name]}'
                if ignore_broken and name not in self.required:
                    packagelist.append(ET.Comment(msg))
                    continue
                self.logger.error(msg)
                name = msg
            status = self.pkglist.supportstatus(name) or self.default_support_status
            attrs = {'name': name}
            if status is not None:
                attrs['supportstatus'] = status
            ET.SubElement(packagelist, 'package', attrs)
            if packages.get(name):
                packagelist.append(ET.Comment(f' reason: {packages[name]} '))

        return root

    # just list all packages in it as an array - to be output as one yml
    def summary(self):
        """Return the set of all solved package names of this group."""
        ret = set()
        for arch in ['*'] + self.pkglist.filtered_architectures:
            ret |= set(self.solved_packages[arch])
        return ret