Merge pull request #2877 from SchoolGuy/add_typing_hints

Add typing hints
This commit is contained in:
Stephan Kulow 2022-10-13 15:03:54 +02:00 committed by GitHub
commit 5e2a171600
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 150 additions and 82 deletions

View File

@ -9,7 +9,7 @@ The generated release spec files are split into 000release-packages to avoid nee
## Input ## Input
The package list generator reads several files. The most important are group*.yml (tradionally only groups.yml) within 000package-groups. The package list generator reads several files. The most important are group*.yml (traditionally only groups.yml) within 000package-groups.
### supportstatus.txt ### supportstatus.txt
The file lists the packages and their support level. It's only necessary to list packages here that have a different level than the default level specificied in the groups. The format is plain text: <package name> <level> - the level is handed over 1:1 to KIWI file. Currently used values are: unsupported, l2 and l3 The file lists the packages and their support level. It's only necessary to list packages here that have a different level than the default level specificied in the groups. The format is plain text: <package name> <level> - the level is handed over 1:1 to KIWI file. Currently used values are: unsupported, l2 and l3
@ -19,7 +19,7 @@ The file is a list of package lists and the special hash 'OUTPUT'. OUTPUT contai
We currently support: We currently support:
* default-support * default-support
Sets the support level in case there is no explicitly entry in [supportstatus.txt](#supportstatus.txt), defaults to 'unsupported' Sets the support level in case there is no explicitly entry in [supportstatus.txt](#supportstatustxt), defaults to 'unsupported'
* recommends * recommends
If the solver should take recommends into account when solving the package list, defaults to false. If the solver should take recommends into account when solving the package list, defaults to false.
* includes * includes
@ -87,7 +87,7 @@ list2:
- pkg6: [recommended] - pkg6: [recommended]
``` ```
## Overlap calculcation ## Overlap calculation
TODO TODO
## Handling in staging workflow ## Handling in staging workflow

View File

@ -6,6 +6,7 @@ import socket
import logging import logging
from lxml import etree as ET from lxml import etree as ET
from urllib.error import HTTPError from urllib.error import HTTPError
from typing import Optional
from osc.core import create_submit_request from osc.core import create_submit_request
from osc.core import get_binarylist from osc.core import get_binarylist
@ -413,7 +414,7 @@ def package_list_kind_filtered(apiurl, project, kinds_allowed=['source']):
yield package yield package
def attribute_value_load(apiurl, project, name, namespace='OSRT', package=None): def attribute_value_load(apiurl: str, project: str, name: str, namespace='OSRT', package: Optional[str] = None):
path = list(filter(None, ['source', project, package, '_attribute', namespace + ':' + name])) path = list(filter(None, ['source', project, package, '_attribute', namespace + ':' + name]))
url = makeurl(apiurl, path) url = makeurl(apiurl, path)
@ -444,7 +445,14 @@ def attribute_value_load(apiurl, project, name, namespace='OSRT', package=None):
# Remember to create for both OBS and IBS as necessary. # Remember to create for both OBS and IBS as necessary.
def attribute_value_save(apiurl, project, name, value, namespace='OSRT', package=None): def attribute_value_save(
apiurl: str,
project: str,
name: str,
value: str,
namespace='OSRT',
package: Optional[str] = None
):
root = ET.Element('attributes') root = ET.Element('attributes')
attribute = ET.SubElement(root, 'attribute') attribute = ET.SubElement(root, 'attribute')
@ -463,13 +471,13 @@ def attribute_value_save(apiurl, project, name, value, namespace='OSRT', package
raise e raise e
def attribute_value_delete(apiurl, project, name, namespace='OSRT', package=None): def attribute_value_delete(apiurl: str, project: str, name: str, namespace='OSRT', package: Optional[str] = None):
http_DELETE(makeurl( http_DELETE(makeurl(
apiurl, list(filter(None, ['source', project, package, '_attribute', namespace + ':' + name])))) apiurl, list(filter(None, ['source', project, package, '_attribute', namespace + ':' + name]))))
@memoize(session=True) @memoize(session=True)
def repository_path_expand(apiurl, project, repo, visited_repos=None): def repository_path_expand(apiurl: str, project: str, repo: str, visited_repos: Optional[set] = None):
"""Recursively list underlying projects.""" """Recursively list underlying projects."""
if visited_repos is None: if visited_repos is None:
visited_repos = set() visited_repos = set()

View File

@ -1,27 +1,58 @@
import datetime import datetime
import textwrap import enum
import re
import tempfile
import logging import logging
import os import os
import re
import sys import sys
import tempfile
import textwrap
from typing import Dict, List, Optional
from lxml import etree as ET from lxml import etree as ET
from osc.core import Package, checkout_package, http_GET, makeurl
from osclib.comments import CommentAPI from osclib.comments import CommentAPI
from osc.core import checkout_package, http_GET, makeurl
from osc.core import Package
MARKER = 'PackageListDiff' MARKER = 'PackageListDiff'
class PkglistComments(object): class PkglistSectionCommend(enum.Enum):
ADD = "add"
REMOVE = "remove"
MOVE = "move"
class PkglistSection:
def __init__(
self,
command: PkglistSectionCommend,
pkgs: Optional[List[str]] = None,
to_module: Optional[List[str]] = None,
from_module: Optional[List[str]] = None
):
self.command = command
if pkgs is None:
self.pkgs = []
else:
self.pkgs = pkgs
if pkgs is None:
self.to_module = []
else:
self.to_module = to_module
if pkgs is None:
self.from_module = []
else:
self.from_module = from_module
class PkglistComments:
"""Handling staging comments of diffs""" """Handling staging comments of diffs"""
def __init__(self, apiurl): def __init__(self, apiurl: str):
self.apiurl = apiurl self.apiurl = apiurl
self.comment = CommentAPI(apiurl) self.comment = CommentAPI(apiurl)
def read_summary_file(self, file): def read_summary_file(self, file: str) -> Dict[str, List[str]]:
ret = dict() ret = dict()
with open(file, 'r') as f: with open(file, 'r') as f:
for line in f: for line in f:
@ -30,7 +61,7 @@ class PkglistComments(object):
ret[pkg].append(group) ret[pkg].append(group)
return ret return ret
def write_summary_file(self, file, content): def write_summary_file(self, file: str, content: dict):
output = [] output = []
for pkg in sorted(content): for pkg in sorted(content):
for group in sorted(content[pkg]): for group in sorted(content[pkg]):
@ -40,7 +71,7 @@ class PkglistComments(object):
for line in sorted(output): for line in sorted(output):
f.write(line + '\n') f.write(line + '\n')
def calculcate_package_diff(self, old_file, new_file): def calculcate_package_diff(self, old_file: str, new_file: str):
old_file = self.read_summary_file(old_file) old_file = self.read_summary_file(old_file)
new_file = self.read_summary_file(new_file) new_file = self.read_summary_file(new_file)
@ -102,7 +133,7 @@ class PkglistComments(object):
return report.strip() return report.strip()
def handle_package_diff(self, project, old_file, new_file): def handle_package_diff(self, project: str, old_file: str, new_file: str):
comments = self.comment.get_comments(project_name=project) comments = self.comment.get_comments(project_name=project)
comment, _ = self.comment.comment_find(comments, MARKER) comment, _ = self.comment.comment_find(comments, MARKER)
@ -134,7 +165,7 @@ class PkglistComments(object):
return 1 return 1
def is_approved(self, comment, comments): def is_approved(self, comment, comments: dict) -> str | None:
if not comment: if not comment:
return None return None
@ -145,19 +176,28 @@ class PkglistComments(object):
return c['who'] return c['who']
return None return None
def parse_title(self, line): def parse_title(self, line: str) -> Optional[PkglistSection]:
m = re.match(r'\*\*Add to (.*)\*\*', line) m = re.match(r'\*\*Add to (.*)\*\*', line)
if m: if m:
return {'cmd': 'add', 'to': m.group(1).split(','), 'pkgs': []} return PkglistSection(PkglistSectionCommend.ADD, pkgs=[], to_module=m.group(1).split(','))
m = re.match(r'\*\*Move from (.*) to (.*)\*\*', line) m = re.match(r'\*\*Move from (.*) to (.*)\*\*', line)
if m: if m:
return {'cmd': 'move', 'from': m.group(1).split(','), 'to': m.group(2).split(','), 'pkgs': []} return PkglistSection(
PkglistSectionCommend.MOVE,
pkgs=[],
from_module=m.group(1).split(','),
to_module=m.group(2).split(','),
)
m = re.match(r'\*\*Remove from (.*)\*\*', line) m = re.match(r'\*\*Remove from (.*)\*\*', line)
if m: if m:
return {'cmd': 'remove', 'from': m.group(1).split(','), 'pkgs': []} return PkglistSection(
PkglistSectionCommend.REMOVE,
pkgs=[],
from_module=m.group(1).split(','),
)
return None return None
def parse_sections(self, comment): def parse_sections(self, comment: str) -> List[PkglistSection]:
current_section = None current_section = None
sections = [] sections = []
in_quote = False in_quote = False
@ -174,33 +214,33 @@ class PkglistComments(object):
for pkg in line.split(','): for pkg in line.split(','):
pkg = pkg.strip() pkg = pkg.strip()
if pkg: if pkg:
current_section['pkgs'].append(pkg) current_section.pkgs.append(pkg)
if current_section: if current_section:
sections.append(current_section) sections.append(current_section)
return sections return sections
def apply_move(self, content, section): def apply_move(self, content: Dict[str, List[str]], section: PkglistSection):
for pkg in section['pkgs']: for pkg in section.pkgs:
pkg_content = content[pkg] pkg_content = content[pkg]
for group in section['from']: for group in section.from_module:
try: try:
pkg_content.remove(group) pkg_content.remove(group)
except ValueError: except ValueError:
logging.error(f"Can't remove {pkg} from {group}, not there. Mismatch.") logging.error(f"Can't remove {pkg} from {group}, not there. Mismatch.")
sys.exit(1) sys.exit(1)
for group in section['to']: for group in section.to_module:
pkg_content.append(group) pkg_content.append(group)
content[pkg] = pkg_content content[pkg] = pkg_content
def apply_add(self, content, section): def apply_add(self, content: Dict[str, List[str]], section: PkglistSection):
for pkg in section['pkgs']: for pkg in section.pkgs:
content.setdefault(pkg, []) content.setdefault(pkg, [])
content[pkg] += section['to'] content[pkg] += section.to_module
def apply_remove(self, content, section): def apply_remove(self, content: Dict[str, List[str]], section: PkglistSection):
for pkg in section['pkgs']: for pkg in section.pkgs:
pkg_content = content[pkg] pkg_content = content[pkg]
for group in section['from']: for group in section.from_module:
try: try:
pkg_content.remove(group) pkg_content.remove(group)
except ValueError: except ValueError:
@ -208,38 +248,38 @@ class PkglistComments(object):
sys.exit(1) sys.exit(1)
content[pkg] = pkg_content content[pkg] = pkg_content
def apply_commands(self, filename, sections): def apply_commands(self, filename: str, sections: List[PkglistSection]):
content = self.read_summary_file(filename) content = self.read_summary_file(filename)
for section in sections: for section in sections:
if section['cmd'] == 'move': if section.command == PkglistSectionCommend.MOVE:
self.apply_move(content, section) self.apply_move(content, section)
elif section['cmd'] == 'add': elif section.command == PkglistSectionCommend.ADD:
self.apply_add(content, section) self.apply_add(content, section)
elif section['cmd'] == 'remove': elif section.command == PkglistSectionCommend.REMOVE:
self.apply_remove(content, section) self.apply_remove(content, section)
self.write_summary_file(filename, content) self.write_summary_file(filename, content)
def format_pkgs(self, pkgs): def format_pkgs(self, pkgs: List[str]):
text = ', '.join(pkgs) text = ', '.join(pkgs)
return " " + "\n ".join(textwrap.wrap(text, width=68, break_long_words=False, break_on_hyphens=False)) + "\n\n" return " " + "\n ".join(textwrap.wrap(text, width=68, break_long_words=False, break_on_hyphens=False)) + "\n\n"
def format_move(self, section): def format_move(self, section: PkglistSection):
gfrom = ','.join(section['from']) gfrom = ','.join(section.from_module)
gto = ','.join(section['to']) gto = ','.join(section.to_module)
text = f" * Move from {gfrom} to {gto}:\n" text = f" * Move from {gfrom} to {gto}:\n"
return text + self.format_pkgs(section['pkgs']) return text + self.format_pkgs(section.pkgs)
def format_add(self, section): def format_add(self, section: PkglistSection):
gto = ','.join(section['to']) gto = ','.join(section.to_module)
text = f" * Add to {gto}:\n" text = f" * Add to {gto}:\n"
return text + self.format_pkgs(section['pkgs']) return text + self.format_pkgs(section.pkgs)
def format_remove(self, section): def format_remove(self, section: PkglistSection):
gfrom = ','.join(section['from']) gfrom = ','.join(section.from_module)
text = f" * Remove from {gfrom}:\n" text = f" * Remove from {gfrom}:\n"
return text + self.format_pkgs(section['pkgs']) return text + self.format_pkgs(section.pkgs)
def apply_changes(self, filename, sections, approver): def apply_changes(self, filename: str, sections: List[PkglistSection], approver: str):
text = "-------------------------------------------------------------------\n" text = "-------------------------------------------------------------------\n"
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
date = now.strftime("%a %b %d %H:%M:%S UTC %Y") date = now.strftime("%a %b %d %H:%M:%S UTC %Y")
@ -249,11 +289,11 @@ class PkglistComments(object):
email = root.find('email').text email = root.find('email').text
text += f"{date} - {realname} <{email}>\n\n- Approved changes to summary-staging.txt\n" text += f"{date} - {realname} <{email}>\n\n- Approved changes to summary-staging.txt\n"
for section in sections: for section in sections:
if section['cmd'] == 'move': if section.command == PkglistSectionCommend.MOVE:
text += self.format_move(section) text += self.format_move(section)
elif section['cmd'] == 'add': elif section.command == PkglistSectionCommend.ADD:
text += self.format_add(section) text += self.format_add(section)
elif section['cmd'] == 'remove': elif section.command == PkglistSectionCommend.REMOVE:
text += self.format_remove(section) text += self.format_remove(section)
with open(filename + '.new', 'w') as writer: with open(filename + '.new', 'w') as writer:
writer.write(text) writer.write(text)
@ -262,7 +302,7 @@ class PkglistComments(object):
writer.write(line) writer.write(line)
os.rename(filename + '.new', filename) os.rename(filename + '.new', filename)
def check_staging_accept(self, project, target): def check_staging_accept(self, project: str, target: str):
comments = self.comment.get_comments(project_name=project) comments = self.comment.get_comments(project_name=project)
comment, _ = self.comment.comment_find(comments, MARKER) comment, _ = self.comment.comment_find(comments, MARKER)
approver = self.is_approved(comment, comments) approver = self.is_approved(comment, comments)

View File

@ -1,5 +1,6 @@
from io import StringIO from io import StringIO
from datetime import datetime from datetime import datetime
from typing import List
import dateutil.parser import dateutil.parser
import logging import logging
import textwrap import textwrap
@ -52,7 +53,7 @@ class StagingAPI(object):
Class containing various api calls to work with staging projects. Class containing various api calls to work with staging projects.
""" """
def __init__(self, apiurl, project): def __init__(self, apiurl: str, project: str):
"""Initialize instance variables.""" """Initialize instance variables."""
self.apiurl = apiurl self.apiurl = apiurl
@ -149,7 +150,7 @@ class StagingAPI(object):
xpath = f'repository[@name="{repo_name}"]' xpath = f'repository[@name="{repo_name}"]'
return len(meta.xpath(xpath)) > 0 return len(meta.xpath(xpath)) > 0
def makeurl(self, paths, query=None): def makeurl(self, paths: List[str], query=None) -> str:
""" """
Wrapper around osc's makeurl passing our apiurl Wrapper around osc's makeurl passing our apiurl
:return url made for l and query :return url made for l and query

View File

@ -86,7 +86,7 @@ class CommandLineInterface(ToolBase.CommandLineInterface):
if apiurl.find('opensuse.org') > 0: if apiurl.find('opensuse.org') > 0:
os.environ['OBS_NAME'] = 'build.opensuse.org' os.environ['OBS_NAME'] = 'build.opensuse.org'
def solve_project(project, scope): def solve_project(project, scope: str):
try: try:
self.tool.reset() self.tool.reset()
self.tool.dry_run = self.options.dry self.tool.dry_run = self.options.dry

View File

@ -8,6 +8,8 @@ import shutil
import subprocess import subprocess
import yaml import yaml
from typing import Any, Mapping, Optional
from lxml import etree as ET from lxml import etree as ET
from osc.core import checkout_package from osc.core import checkout_package
@ -175,7 +177,7 @@ class PkgListGen(ToolBase.ToolBase):
if package[0] not in g.solved_packages['*']: if package[0] not in g.solved_packages['*']:
self.logger.error(f'Missing {package[0]} in {groupname} for {arch}') self.logger.error(f'Missing {package[0]} in {groupname} for {arch}')
def expand_repos(self, project, repo='standard'): def expand_repos(self, project: str, repo='standard'):
return repository_path_expand(self.apiurl, project, repo) return repository_path_expand(self.apiurl, project, repo)
def _check_supplements(self): def _check_supplements(self):
@ -506,7 +508,13 @@ class PkgListGen(ToolBase.ToolBase):
print('%endif', file=output) print('%endif', file=output)
output.flush() output.flush()
def solve_project(self, ignore_unresolvable=False, ignore_recommended=False, locale=None, locales_from=None): def solve_project(
self,
ignore_unresolvable=False,
ignore_recommended=False,
locale: Optional[str] = None,
locales_from: Optional[str] = None
):
self.load_all_groups() self.load_all_groups()
if not self.output: if not self.output:
self.logger.error('OUTPUT not defined') self.logger.error('OUTPUT not defined')
@ -601,9 +609,19 @@ class PkgListGen(ToolBase.ToolBase):
new_lines.append(line.replace('<version></version>', product_version)) new_lines.append(line.replace('<version></version>', product_version))
open(product_file, 'w').write(''.join(new_lines)) open(product_file, 'w').write(''.join(new_lines))
def update_and_solve_target(self, api, target_project, target_config, main_repo, def update_and_solve_target(
project, scope, force, no_checkout, self,
only_release_packages, stop_after_solve): api,
target_project: str,
target_config: Mapping[str, Any],
main_repo: str,
project: str,
scope: str,
force: bool,
no_checkout: bool,
only_release_packages: bool,
stop_after_solve: bool
):
self.all_architectures = target_config.get('pkglistgen-archs').split(' ') self.all_architectures = target_config.get('pkglistgen-archs').split(' ')
self.use_newest_version = str2bool(target_config.get('pkglistgen-use-newest-version', 'False')) self.use_newest_version = str2bool(target_config.get('pkglistgen-use-newest-version', 'False'))
self.repos = self.expand_repos(project, main_repo) self.repos = self.expand_repos(project, main_repo)
@ -682,11 +700,12 @@ class PkgListGen(ToolBase.ToolBase):
self.load_all_groups() self.load_all_groups()
self.write_group_stubs() self.write_group_stubs()
else: else:
summary = self.solve_project(ignore_unresolvable=str2bool(target_config.get('pkglistgen-ignore-unresolvable')), summary = self.solve_project(
ignore_recommended=str2bool( ignore_unresolvable=str2bool(target_config.get('pkglistgen-ignore-unresolvable')),
target_config.get('pkglistgen-ignore-recommended')), ignore_recommended=str2bool(target_config.get('pkglistgen-ignore-recommended')),
locale=target_config.get('pkglistgen-locale'), locale=target_config.get('pkglistgen-locale'),
locales_from=target_config.get('pkglistgen-locales-from')) locales_from=target_config.get('pkglistgen-locales-from')
)
if stop_after_solve: if stop_after_solve:
return return