From e842fd26b1c826e98bccb82bd711bb258529e73e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20=C4=8Cerm=C3=A1k?= Date: Wed, 21 Jun 2023 14:11:18 +0200 Subject: [PATCH 1/5] Add some type hints --- ReviewBot.py | 19 +++++++++------ check_source.py | 23 ++++++++++++------ origin-manager.py | 28 ++++++++++++++++++---- osclib/comments.py | 54 ++++++++++++++++++++++++++++++----------- osclib/conf.py | 23 +++++++++++------- osclib/core.py | 60 +++++++++++++++++++++++++++++++++------------- osclib/origin.py | 38 ++++++++++++++++++++++++----- 7 files changed, 182 insertions(+), 63 deletions(-) diff --git a/ReviewBot.py b/ReviewBot.py index 14cf40f3..0a3b962a 100644 --- a/ReviewBot.py +++ b/ReviewBot.py @@ -4,7 +4,7 @@ import os import sys import re import logging -from typing import List +from typing import Generator, List, Optional, Tuple, Union import cmdln from collections import namedtuple from collections import OrderedDict @@ -219,7 +219,7 @@ class ReviewBot(object): return return_value @memoize(session=True) - def request_override_check_users(self, project): + def request_override_check_users(self, project: str) -> List[str]: """Determine users allowed to override review in a comment command.""" config = Config.get(self.apiurl, project) @@ -235,7 +235,7 @@ class ReviewBot(object): return users - def request_override_check(self, force=False): + def request_override_check(self, force: bool = False) -> Optional[bool]: """Check for a comment command requesting review override.""" if not force and not self.override_allow: return None @@ -251,8 +251,8 @@ class ReviewBot(object): self.review_messages['declined'] = message return False - def request_commands(self, command, who_allowed=None, request=None, action=None, - include_description=True): + def request_commands(self, command: str, who_allowed=None, request=None, action=None, + include_description=True) -> Generator[Tuple[List[str], Optional[str]], None, None]: if not request: request = self.request if not action: @@ -545,7 +545,7 @@ class ReviewBot(object): self.review_messages['accepted'] += ': ' + message return self.request_default_return - def check_source_submission(self, src_project, src_package, src_rev, target_project, target_package): + def check_source_submission(self, src_project: str, src_package: str, src_rev: str, target_project: str, target_package: str) -> None: """ default implemention does nothing """ self.logger.info("%s/%s@%s -> %s/%s" % (src_project, src_package, src_rev, target_project, target_package)) return None @@ -812,7 +812,12 @@ class ReviewBot(object): return False - def request_age_wait(self, age_min=None, request=None, target_project=None): + def request_age_wait( + self, + age_min: Optional[Union[str, int, float]] = None, + request=None, + target_project: Optional[str] = None + ) -> bool: if not request: request = self.request diff --git a/check_source.py b/check_source.py index fb2f93f7..a2df03a0 100755 --- a/check_source.py +++ b/check_source.py @@ -9,6 +9,8 @@ import shutil import subprocess import sys import tempfile +from typing import Optional, Set +from cmdln import CmdlnOptionParser from lxml import etree as ET @@ -42,12 +44,12 @@ class CheckSource(ReviewBot.ReviewBot): self.skip_add_reviews = False - def target_project_config(self, project): + def target_project_config(self, project: str) -> None: # Load project config and allow for remote entries. config = Config.get(self.apiurl, project) self.single_action_require = str2bool(config.get('check-source-single-action-require', 'False')) - self.ignore_devel = not str2bool(config.get('devel-project-enforce', 'False')) + self.ignore_devel: bool = not str2bool(config.get('devel-project-enforce', 'False')) self.in_air_rename_allow = str2bool(config.get('check-source-in-air-rename-allow', 'False')) self.add_review_team = str2bool(config.get('check-source-add-review-team', 'True')) self.review_team = config.get('review-team') @@ -57,11 +59,11 @@ class CheckSource(ReviewBot.ReviewBot): self.devel_whitelist = config.get('devel-whitelist', '').split() self.skip_add_reviews = False self.ensure_source_exist_in_baseproject = str2bool(config.get('check-source-ensure-source-exist-in-baseproject', 'False')) - self.devel_baseproject = config.get('check-source-devel-baseproject', '') + self.devel_baseproject: str = config.get('check-source-devel-baseproject', '') self.allow_source_in_sle = str2bool(config.get('check-source-allow-source-in-sle', 'True')) self.sle_project_to_check = config.get('check-source-sle-project', '') self.allow_valid_source_origin = str2bool(config.get('check-source-allow-valid-source-origin', 'False')) - self.valid_source_origins = set(config.get('check-source-valid-source-origins', '').split(' ')) + self.valid_source_origins: Set[str] = set(config.get('check-source-valid-source-origins', '').split(' ')) self.add_devel_project_review = str2bool(config.get('check-source-add-devel-project-review', 'False')) self.allowed_scm_submission_sources = config.get('allowed-scm-submission-sources', '').split() @@ -78,7 +80,7 @@ class CheckSource(ReviewBot.ReviewBot): # It might make sense to supersede maintbot, but for now. self.skip_add_reviews = True - def is_good_name(self, package, target_package): + def is_good_name(self, package: Optional[str], target_package: Optional[str]) -> bool: self.logger.debug(f"is_good_name {package} <-> {target_package}") if target_package is None: # if the name doesn't matter, existance is all @@ -114,7 +116,14 @@ class CheckSource(ReviewBot.ReviewBot): return ret - def check_source_submission(self, source_project, source_package, source_revision, target_project, target_package): + def check_source_submission( + self, + source_project: str, + source_package: str, + source_revision: str, + target_project: str, + target_package: str + ) -> bool: super(CheckSource, self).check_source_submission(source_project, source_package, source_revision, target_project, target_package) self.target_project_config(target_project) @@ -762,7 +771,7 @@ class CommandLineInterface(ReviewBot.CommandLineInterface): ReviewBot.CommandLineInterface.__init__(self, args, kwargs) self.clazz = CheckSource - def get_optparser(self): + def get_optparser(self) -> CmdlnOptionParser: parser = ReviewBot.CommandLineInterface.get_optparser(self) parser.add_option('--skip-add-reviews', action='store_true', default=False, diff --git a/origin-manager.py b/origin-manager.py index d23ddfbb..904a735b 100755 --- a/origin-manager.py +++ b/origin-manager.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 +from typing import Literal, Optional, Tuple, Union from osclib.core import devel_project_get from osclib.core import package_source_hash from osclib.core import package_kind @@ -77,7 +78,7 @@ class OriginManager(ReviewBot.ReviewBot): return True - def check_source_submission(self, src_project, src_package, src_rev, tgt_project, tgt_package): + def check_source_submission(self, src_project, src_package, src_rev, tgt_project, tgt_package) -> Optional[bool]: kind = package_kind(self.apiurl, tgt_project, tgt_package) if not (kind is None or kind == 'source'): self.review_messages['accepted'] = 'skipping {} package since not source'.format(kind) @@ -118,7 +119,16 @@ class OriginManager(ReviewBot.ReviewBot): source_hash_new, source_hash_old) return self.policy_result_handle(tgt_project, tgt_package, origin_info_new, origin_info_old, result) - def config_validate(self, target_project): + def config_validate(self, target_project: str) -> Tuple[bool, bool]: + """Check the settings ``OSRT:OriginConfig`` of the target project and + return a tuple of booleans. The first boolean indicates whether the + action should proceed and the second whether the config is valid. + + This function checks whether the value + ``OSRT:OriginConfig.fallback-group`` is present and whether + ``OSRT:OriginConfig.review-user`` matches the configured review_user. + + """ config = config_load(self.apiurl, target_project) if not config: # No perfect solution for lack of a config. For normal projects a @@ -138,7 +148,11 @@ class OriginManager(ReviewBot.ReviewBot): return True, True - def devel_project_simulate_check(self, source_project, target_project): + def devel_project_simulate_check( + self, + source_project: str, + target_project: str + ) -> Union[Tuple[str, str], Tuple[Literal[False], Literal[None]]]: config = config_load(self.apiurl, target_project) origin_list = config_origin_list(config, self.apiurl, target_project, skip_workarounds=True) @@ -157,7 +171,11 @@ class OriginManager(ReviewBot.ReviewBot): return False, None - def devel_project_simulate_check_command(self, source_project, target_project): + def devel_project_simulate_check_command( + self, + source_project: str, + target_project: str + ) -> Union[Tuple[str, Optional[str]], Tuple[Literal[False], Literal[None]]]: who_allowed = self.request_override_check_users(target_project) if self.request.creator not in who_allowed: who_allowed.append(self.request.creator) @@ -168,7 +186,7 @@ class OriginManager(ReviewBot.ReviewBot): return False, None - def policy_result_handle(self, project, package, origin_info_new, origin_info_old, result): + def policy_result_handle(self, project, package, origin_info_new, origin_info_old, result: PolicyResult) -> Optional[bool]: if result.wait and not result.accept: result.comments.append(f'Decision may be overridden via `@{self.review_user} override`.') diff --git a/osclib/comments.py b/osclib/comments.py index 9d3bbff6..c20e8131 100644 --- a/osclib/comments.py +++ b/osclib/comments.py @@ -1,28 +1,49 @@ from datetime import datetime +from typing import TYPE_CHECKING, Any, Dict, Generator, List, Literal, Optional, Tuple, TypedDict, Union from dateutil.parser import parse as date_parse import re -from lxml import etree as ET +if TYPE_CHECKING: + import xml.etree.ElementTree as ET +else: + from lxml import etree as ET -from osc.core import http_DELETE -from osc.core import http_GET -from osc.core import http_POST +from osc.connection import http_DELETE +from osc.connection import http_GET +from osc.connection import http_POST from osc.core import makeurl +class _BaseComment(TypedDict): + who: Optional[str] + when: datetime + parent: Optional[Any] + comment: Optional[str] + + +class Comment(_BaseComment): + id: Optional[str] + + +class RequestAsComment(_BaseComment): + id: Literal['-1'] + + class CommentAPI(object): COMMENT_MARKER_REGEX = re.compile(r'') def __init__(self, apiurl): self.apiurl = apiurl - def _prepare_url(self, request_id=None, project_name=None, - package_name=None, query=None): + def _prepare_url(self, request_id=None, project_name: Optional[str] = None, + package_name: Optional[str] = None, + query: Optional[Union[List[str], Dict[str, str]]] = None + ) -> str: """Prepare the URL to get/put comments in OBS. :param request_id: Request where to refer the comment. :param project_name: Project name where to refer comment. :param package_name: Package name where to refer the comment. - :returns: Formated URL for the request. + :returns: Formatted URL for the request. """ url = None if request_id: @@ -36,21 +57,20 @@ class CommentAPI(object): raise ValueError('Please, set request_id, project_name or / and package_name to add a comment.') return url - def _comment_as_dict(self, comment_element): + def _comment_as_dict(self, comment_element: ET.Element) -> Comment: """Convert an XML element comment into a dictionary. :param comment_element: XML element that store a comment. :returns: A Python dictionary object. """ - comment = { + return { 'who': comment_element.get('who'), - 'when': datetime.strptime(comment_element.get('when'), '%Y-%m-%d %H:%M:%S %Z'), + 'when': datetime.strptime(comment_element.get('when', ''), '%Y-%m-%d %H:%M:%S %Z'), 'id': comment_element.get('id'), 'parent': comment_element.get('parent', None), 'comment': comment_element.text, } - return comment - def request_as_comment_dict(self, request): + def request_as_comment_dict(self, request) -> RequestAsComment: return { 'who': request.creator, 'when': date_parse(request.statehistory[0].when), @@ -60,7 +80,7 @@ class CommentAPI(object): } def get_comments(self, request_id=None, project_name=None, - package_name=None): + package_name=None) -> Dict[str, Comment]: """Get the list of comments of an object in OBS. :param request_id: Request where to get comments. @@ -106,7 +126,13 @@ class CommentAPI(object): return c, info return None, None - def command_find(self, comments, user, command=None, who_allowed=None): + def command_find( + self, + comments: Dict[str, Comment], + user: str, + command: Optional[str] = None, + who_allowed=None + ) -> Generator[Tuple[List[str], Optional[str]], None, None]: """ Find comment commands with the optional conditions. diff --git a/osclib/conf.py b/osclib/conf.py index e4261628..f0bdff7c 100644 --- a/osclib/conf.py +++ b/osclib/conf.py @@ -1,4 +1,4 @@ -from __future__ import print_function +from typing import Dict, Optional from osc import OscConfigParser from collections import OrderedDict @@ -172,14 +172,17 @@ DEFAULT = { # -def str2bool(v): +def str2bool(v: Optional[str]) -> bool: return (v is not None and v.lower() in ("yes", "true", "t", "1")) class Config(object): - """Helper class to configuration file.""" + """Helper class for reading the osc configuration file and fetching the + remote config from the ``OSRT:config`` attribute in the target project. - def __init__(self, apiurl, project): + """ + + def __init__(self, apiurl: str, project: str) -> None: self.project = project self.remote_values = self.fetch_remote(apiurl) @@ -191,7 +194,7 @@ class Config(object): @staticmethod @memoize(session=True) # Allow reset by memoize_session_reset() for ReviewBot. - def get(apiurl, project): + def get(apiurl: str, project: str): """Cached version for directly accessing project config.""" # Properly handle loading the config for interconnect projects. from osclib.core import project_remote_apiurl @@ -204,7 +207,7 @@ class Config(object): def conf(self): return conf - def populate_conf(self): + def populate_conf(self) -> None: """Add sane default into the configuration and layer (defaults, remote, ~/.oscrc).""" defaults = {} default_ordered = OrderedDict(sorted(DEFAULT.items(), key=lambda i: int(i[1].get('_priority', 99)))) @@ -244,12 +247,16 @@ class Config(object): else: return defaults - def fetch_remote(self, apiurl): + def fetch_remote(self, apiurl: str) -> Optional[Dict[str, str]]: + """Fetch the configuration from the ``OSRT`` attribute namespace for the + current project from the OBS instance with the given apiurl. + + """ from osclib.core import attribute_value_load config = attribute_value_load(apiurl, self.project, 'Config') if config: cp = OscConfigParser.OscConfigParser() - config = u'[remote]\n' + config + config = u'[remote]\n' + str(config) cp.read_string(config) return dict(cp.items('remote')) diff --git a/osclib/core.py b/osclib/core.py index a86b1499..37755703 100644 --- a/osclib/core.py +++ b/osclib/core.py @@ -4,18 +4,18 @@ from dateutil.parser import parse as date_parse import re import socket import logging +from typing import List, Literal, Optional, Tuple, Union from lxml import etree as ET from urllib.error import HTTPError -from typing import Optional -from osc.core import create_submit_request +from osc.core import ReviewState, create_submit_request from osc.core import get_binarylist from osc.core import get_commitlog from osc.core import get_dependson -from osc.core import http_DELETE -from osc.core import http_GET -from osc.core import http_POST -from osc.core import http_PUT +from osc.connection import http_DELETE +from osc.connection import http_GET +from osc.connection import http_POST +from osc.connection import http_PUT from osc.core import makeurl from osc.core import owner from osc.core import search as osc_core_search @@ -279,7 +279,13 @@ def binary_src_debug(binary): @memoize(session=True) -def devel_project_get(apiurl: str, target_project: str, target_package: str): +def devel_project_get(apiurl: str, target_project: str, target_package: str) -> Union[Tuple[str, str], Tuple[Literal[None], Literal[None]]]: + """Fetch the devel project & package name of the supplied ``target_project`` + and ``target_package`` and return the devel project and devel package name + as a tuple. If no devel project has been defined, then return ``None, + None``. + + """ try: meta = ET.fromstringlist(show_package_meta(apiurl, target_project, target_package)) node = meta.find('devel') @@ -468,7 +474,22 @@ def package_list_kind_filtered(apiurl, project, kinds_allowed=['source']): yield package -def attribute_value_load(apiurl: str, project: str, name: str, namespace='OSRT', package: Optional[str] = None): +def attribute_value_load( + apiurl: str, + project: str, + name: str, + namespace: str = 'OSRT', + package: Optional[str] = None) -> Optional[Union[str, Literal[True]]]: + """Fetch an attribute from the OBS instance with the given ``apiurl`` for + the given ``project`` (and optionally for a package if a package name is + provided). The attribute is expected to have the name ``$namespace:$name``, + with the namespace defaulting to ``OSRT``. + + Attributes are stored as xml and can be either strings or are interpreted as + ``True`` if only the element is present but has no other children or + entries. + + """ path = list(filter(None, ['source', project, package, '_attribute', namespace + ':' + name])) url = makeurl(apiurl, path) @@ -506,7 +527,7 @@ def attribute_value_save( value: str, namespace='OSRT', package: Optional[str] = None -): +) -> None: root = ET.Element('attributes') attribute = ET.SubElement(root, 'attribute') @@ -525,7 +546,7 @@ def attribute_value_save( raise e -def attribute_value_delete(apiurl: str, project: str, name: str, namespace='OSRT', package: Optional[str] = None): +def attribute_value_delete(apiurl: str, project: str, name: str, namespace='OSRT', package: Optional[str] = None) -> None: http_DELETE(makeurl( apiurl, list(filter(None, ['source', project, package, '_attribute', namespace + ':' + name])))) @@ -661,7 +682,12 @@ def package_source_age(apiurl, project, package): return datetime.utcnow() - package_source_changed(apiurl, project, package) -def entity_exists(apiurl, project, package=None): +def entity_exists(apiurl: str, project: str, package: Optional[str] = None) -> bool: + """Check whether the project or the project/package exists on the OBS + instance behind the supplied apiurl and return True if it exists or False + otherwise. + + """ try: http_GET(makeurl(apiurl, list(filter(None, ['source', project, package])) + ['_meta'])) except HTTPError as e: @@ -673,7 +699,9 @@ def entity_exists(apiurl, project, package=None): return True -def package_kind(apiurl, project, package): +def package_kind( + apiurl, project, package +) -> Optional[Literal['meta', 'multibuild_subpackage', 'patchinfo', 'maintenance_update', 'multispec_subpackage', 'source']]: if package.startswith('00') or package.startswith('_'): return 'meta' @@ -900,7 +928,7 @@ def review_find_last(request, user, states=['all']): return None -def reviews_remaining(request, incident_psuedo=False): +def reviews_remaining(request: Request, incident_psuedo=False) -> List[ReviewState]: reviews = [] for review in request.reviews: if review.state != 'accepted': @@ -939,7 +967,7 @@ def issue_trackers(apiurl): return trackers -def issue_tracker_by_url(apiurl, tracker_url): +def issue_tracker_by_url(apiurl: str, tracker_url: str) -> Optional[str]: url = makeurl(apiurl, ['issue_trackers']) root = ET.parse(http_GET(url)).getroot() if not tracker_url.endswith('/'): @@ -952,7 +980,7 @@ def issue_tracker_label_apply(tracker, identifier): return tracker.find('label').text.replace('@@@', identifier) -def request_remote_identifier(apiurl, apiurl_remote, request_id): +def request_remote_identifier(apiurl: str, apiurl_remote: str, request_id: str) -> str: if apiurl_remote == apiurl: return 'request#{}'.format(request_id) @@ -1007,7 +1035,7 @@ def action_is_patchinfo(action): action.src_package == 'patchinfo' or action.src_package.startswith('patchinfo.'))) -def request_action_key(action): +def request_action_key(action: Action) -> str: identifier = [] if action.type in ['add_role', 'change_devel', 'maintenance_release', 'set_bugowner', 'submit']: diff --git a/osclib/origin.py b/osclib/origin.py index 5fce0c92..b14a0378 100644 --- a/osclib/origin.py +++ b/osclib/origin.py @@ -1,6 +1,7 @@ from copy import deepcopy from collections import namedtuple import logging +from typing import Any, Dict, Generator, List, Literal, NamedTuple, Optional, Tuple, TypedDict, Union from osclib.conf import Config from osclib.conf import str2bool from osclib.core import attribute_value_load @@ -38,7 +39,26 @@ DEFAULTS = { 'fallback-group': '', 'fallback-workaround': {}, } -POLICY_DEFAULTS = { + + +class Policy(TypedDict): + additional_reviews: List[Any] + automatic_updates: bool + automatic_updates_initial: bool + automatic_updates_supersede: bool + automatic_updates_delay: int + automatic_updates_frequency: int + maintainer_review_always: bool + maintainer_review_initial: bool + pending_submission_allow: bool + pending_submission_consider: bool + pending_submission_allowed_reviews: List[str] + # Submit pending requests with a set of allowed reviews, but still wait for + # the above reviews before being accepted. + pending_submission_allowed_reviews_update: List[str] + + +POLICY_DEFAULTS: Policy = { 'additional_reviews': [], 'automatic_updates': True, 'automatic_updates_initial': False, @@ -84,7 +104,13 @@ def config_load(apiurl, project): return config_resolve(apiurl, project, yaml.safe_load(config)) -def config_origin_generator(origins, apiurl=None, project=None, package=None, skip_workarounds=False): +def config_origin_generator( + origins, + apiurl: Optional[str] = None, + project: Optional[str] = None, + package: Optional[str] = None, + skip_workarounds=False +) -> Generator[Tuple[str, Any], None, None]: for origin_item in origins: for origin, values in origin_item.items(): is_workaround = origin_workaround_check(origin) @@ -103,7 +129,7 @@ def config_origin_generator(origins, apiurl=None, project=None, package=None, sk break # Only support single value inside list item. -def config_resolve(apiurl, project, config): +def config_resolve(apiurl: str, project: str, config): defaults = POLICY_DEFAULTS.copy() defaults_workarounds = POLICY_DEFAULTS.copy() @@ -222,17 +248,17 @@ def config_resolve_apply(config, values_apply, key=None, workaround=False, until values.update(values_apply) -def origin_workaround_check(origin): +def origin_workaround_check(origin: str) -> bool: return origin.endswith('~') -def origin_workaround_ensure(origin): +def origin_workaround_ensure(origin: str) -> str: if not origin_workaround_check(origin): return origin + '~' return origin -def origin_workaround_strip(origin): +def origin_workaround_strip(origin: str) -> str: if origin_workaround_check(origin): return origin[:-1] return origin From 510d91c681783218fa479b3b6f02a3e3215f0812 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20=C4=8Cerm=C3=A1k?= Date: Wed, 21 Jun 2023 14:11:18 +0200 Subject: [PATCH 2/5] ReviewBot: convert REVIEW_CHOICES into an enum --- ReviewBot.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/ReviewBot.py b/ReviewBot.py index 0a3b962a..825295aa 100644 --- a/ReviewBot.py +++ b/ReviewBot.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 +from enum import Enum, unique import os import sys import re @@ -66,6 +67,16 @@ class PackageLookup(object): return None +@unique +class ReviewChoices(Enum): + NORMAL = 'normal' + NO = 'no' + ACCEPT = 'accept' + ACCEPT_ONPASS = 'accept-onpass' + FALLBACK_ONFAIL = 'fallback-onfail' + FALLBACK_ALWAYS = 'fallback-always' + + class ReviewBot(object): """ A generic obs request reviewer @@ -76,7 +87,10 @@ class ReviewBot(object): """ DEFAULT_REVIEW_MESSAGES = {'accepted': 'ok', 'declined': 'review failed'} - REVIEW_CHOICES = ('normal', 'no', 'accept', 'accept-onpass', 'fallback-onfail', 'fallback-always') + REVIEW_CHOICES: Tuple[ReviewChoices, ...] = ( + ReviewChoices.NORMAL, ReviewChoices.NO, ReviewChoices.ACCEPT, + ReviewChoices.ACCEPT_ONPASS, ReviewChoices.FALLBACK_ONFAIL, ReviewChoices.FALLBACK_ALWAYS + ) COMMENT_MARKER_REGEX = re.compile(r'') @@ -98,7 +112,7 @@ class ReviewBot(object): self.review_group = group self.requests: List[osc.core.Request] = [] self.review_messages = ReviewBot.DEFAULT_REVIEW_MESSAGES - self._review_mode = 'normal' + self._review_mode: ReviewChoices = ReviewChoices.NORMAL self.fallback_user = None self.fallback_group = None self.comment_api = CommentAPI(self.apiurl) @@ -151,11 +165,11 @@ class ReviewBot(object): return self.staging_apis[project] @property - def review_mode(self): + def review_mode(self) -> ReviewChoices: return self._review_mode @review_mode.setter - def review_mode(self, value): + def review_mode(self, value: ReviewChoices): if value not in self.REVIEW_CHOICES: raise Exception("invalid review option: %s" % value) self._review_mode = value From 1003e6df14ceaf40c2ff96158672e225ccddf031 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20=C4=8Cerm=C3=A1k?= Date: Wed, 21 Jun 2023 14:11:18 +0200 Subject: [PATCH 3/5] Raise a concrete exception type, not the generic exception base class --- ReviewBot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ReviewBot.py b/ReviewBot.py index 825295aa..804d48fe 100644 --- a/ReviewBot.py +++ b/ReviewBot.py @@ -171,7 +171,7 @@ class ReviewBot(object): @review_mode.setter def review_mode(self, value: ReviewChoices): if value not in self.REVIEW_CHOICES: - raise Exception("invalid review option: %s" % value) + raise ValueError("invalid review option: %s" % value) self._review_mode = value def set_request_ids(self, ids): From 05711eeccd279f47257e7be41a715b5b4a0a0c2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20=C4=8Cerm=C3=A1k?= Date: Wed, 21 Jun 2023 14:11:18 +0200 Subject: [PATCH 4/5] Convert namedtuples into a typed NamedTuples --- osclib/origin.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/osclib/origin.py b/osclib/origin.py index b14a0378..131fe3f7 100644 --- a/osclib/origin.py +++ b/osclib/origin.py @@ -1,7 +1,8 @@ from copy import deepcopy -from collections import namedtuple import logging from typing import Any, Dict, Generator, List, Literal, NamedTuple, Optional, Tuple, TypedDict, Union + +from osc.core import ReviewState from osclib.conf import Config from osclib.conf import str2bool from osclib.core import attribute_value_load @@ -83,9 +84,22 @@ POLICY_DEFAULTS: Policy = { ], } -OriginInfo = namedtuple('OriginInfo', ['project', 'pending']) -PendingRequestInfo = namedtuple('PendingRequestInfo', ['identifier', 'reviews_remaining']) -PolicyResult = namedtuple('PolicyResult', ['wait', 'accept', 'reviews', 'comments']) + +class PendingRequestInfo(NamedTuple): + identifier: str + reviews_remaining: List[ReviewState] + + +class OriginInfo(NamedTuple): + project: str + pending: PendingRequestInfo + + +class PolicyResult(NamedTuple): + wait: bool + accept: bool + reviews: Dict[str, str] + comments: List[str] def origin_info_str(self): @@ -266,7 +280,7 @@ def origin_workaround_strip(origin: str) -> str: @memoize(session=True) def origin_find(apiurl, target_project, package, source_hash=None, current=False, - pending_allow=True, fallback=True): + pending_allow=True, fallback=True) -> Optional[OriginInfo]: config = config_load(apiurl, target_project) if not source_hash: @@ -316,7 +330,7 @@ def project_source_contain(apiurl, project, package, source_hash): return False -def project_source_pending(apiurl, project, package, source_hash): +def project_source_pending(apiurl, project, package, source_hash) -> Union[PendingRequestInfo, Literal[False]]: apiurl_remote, project_remote = project_remote_apiurl(apiurl, project) request_actions = request_action_list_source(apiurl_remote, project_remote, package, states=['new', 'review'], include_release=True) @@ -435,7 +449,7 @@ def origin_find_highest(apiurl, project, package): def policy_evaluate(apiurl, project, package, origin_info_new, origin_info_old, - source_hash_new, source_hash_old): + source_hash_new, source_hash_old) -> PolicyResult: if origin_info_new is None: config = config_load(apiurl, project) origins = config_origin_list(config, apiurl, project, package, True) @@ -536,7 +550,7 @@ def policy_input_calculate(apiurl, project, package, return inputs -def policy_input_evaluate(policy, inputs): +def policy_input_evaluate(policy, inputs) -> PolicyResult: result = PolicyResult(False, True, {}, []) if inputs['new_package']: From 1703348adb1a3126d47efa38a83de0e79b1680c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20=C4=8Cerm=C3=A1k?= Date: Thu, 22 Jun 2023 17:39:54 +0200 Subject: [PATCH 5/5] Remove unused variable --- osclib/origin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osclib/origin.py b/osclib/origin.py index 131fe3f7..0d6f957b 100644 --- a/osclib/origin.py +++ b/osclib/origin.py @@ -132,7 +132,7 @@ def config_origin_generator( break if (origin == '' or origin == '~') and apiurl and project and package: - devel_project, devel_package = origin_devel_project(apiurl, project, package) + devel_project, _ = origin_devel_project(apiurl, project, package) if not devel_project: break origin = devel_project