Add some type hints

This commit is contained in:
Dan Čermák 2023-06-21 14:11:18 +02:00
parent 3b612be2da
commit e842fd26b1
No known key found for this signature in database
GPG Key ID: 96B29BCAD2AE5A44
7 changed files with 182 additions and 63 deletions

View File

@ -4,7 +4,7 @@ import os
import sys
import re
import logging
from typing import List
from typing import Generator, List, Optional, Tuple, Union
import cmdln
from collections import namedtuple
from collections import OrderedDict
@ -219,7 +219,7 @@ class ReviewBot(object):
return return_value
@memoize(session=True)
def request_override_check_users(self, project):
def request_override_check_users(self, project: str) -> List[str]:
"""Determine users allowed to override review in a comment command."""
config = Config.get(self.apiurl, project)
@ -235,7 +235,7 @@ class ReviewBot(object):
return users
def request_override_check(self, force=False):
def request_override_check(self, force: bool = False) -> Optional[bool]:
"""Check for a comment command requesting review override."""
if not force and not self.override_allow:
return None
@ -251,8 +251,8 @@ class ReviewBot(object):
self.review_messages['declined'] = message
return False
def request_commands(self, command, who_allowed=None, request=None, action=None,
include_description=True):
def request_commands(self, command: str, who_allowed=None, request=None, action=None,
include_description=True) -> Generator[Tuple[List[str], Optional[str]], None, None]:
if not request:
request = self.request
if not action:
@ -545,7 +545,7 @@ class ReviewBot(object):
self.review_messages['accepted'] += ': ' + message
return self.request_default_return
def check_source_submission(self, src_project, src_package, src_rev, target_project, target_package):
def check_source_submission(self, src_project: str, src_package: str, src_rev: str, target_project: str, target_package: str) -> None:
""" default implemention does nothing """
self.logger.info("%s/%s@%s -> %s/%s" % (src_project, src_package, src_rev, target_project, target_package))
return None
@ -812,7 +812,12 @@ class ReviewBot(object):
return False
def request_age_wait(self, age_min=None, request=None, target_project=None):
def request_age_wait(
self,
age_min: Optional[Union[str, int, float]] = None,
request=None,
target_project: Optional[str] = None
) -> bool:
if not request:
request = self.request

View File

@ -9,6 +9,8 @@ import shutil
import subprocess
import sys
import tempfile
from typing import Optional, Set
from cmdln import CmdlnOptionParser
from lxml import etree as ET
@ -42,12 +44,12 @@ class CheckSource(ReviewBot.ReviewBot):
self.skip_add_reviews = False
def target_project_config(self, project):
def target_project_config(self, project: str) -> None:
# Load project config and allow for remote entries.
config = Config.get(self.apiurl, project)
self.single_action_require = str2bool(config.get('check-source-single-action-require', 'False'))
self.ignore_devel = not str2bool(config.get('devel-project-enforce', 'False'))
self.ignore_devel: bool = not str2bool(config.get('devel-project-enforce', 'False'))
self.in_air_rename_allow = str2bool(config.get('check-source-in-air-rename-allow', 'False'))
self.add_review_team = str2bool(config.get('check-source-add-review-team', 'True'))
self.review_team = config.get('review-team')
@ -57,11 +59,11 @@ class CheckSource(ReviewBot.ReviewBot):
self.devel_whitelist = config.get('devel-whitelist', '').split()
self.skip_add_reviews = False
self.ensure_source_exist_in_baseproject = str2bool(config.get('check-source-ensure-source-exist-in-baseproject', 'False'))
self.devel_baseproject = config.get('check-source-devel-baseproject', '')
self.devel_baseproject: str = config.get('check-source-devel-baseproject', '')
self.allow_source_in_sle = str2bool(config.get('check-source-allow-source-in-sle', 'True'))
self.sle_project_to_check = config.get('check-source-sle-project', '')
self.allow_valid_source_origin = str2bool(config.get('check-source-allow-valid-source-origin', 'False'))
self.valid_source_origins = set(config.get('check-source-valid-source-origins', '').split(' '))
self.valid_source_origins: Set[str] = set(config.get('check-source-valid-source-origins', '').split(' '))
self.add_devel_project_review = str2bool(config.get('check-source-add-devel-project-review', 'False'))
self.allowed_scm_submission_sources = config.get('allowed-scm-submission-sources', '').split()
@ -78,7 +80,7 @@ class CheckSource(ReviewBot.ReviewBot):
# It might make sense to supersede maintbot, but for now.
self.skip_add_reviews = True
def is_good_name(self, package, target_package):
def is_good_name(self, package: Optional[str], target_package: Optional[str]) -> bool:
self.logger.debug(f"is_good_name {package} <-> {target_package}")
if target_package is None:
# if the name doesn't matter, existance is all
@ -114,7 +116,14 @@ class CheckSource(ReviewBot.ReviewBot):
return ret
def check_source_submission(self, source_project, source_package, source_revision, target_project, target_package):
def check_source_submission(
self,
source_project: str,
source_package: str,
source_revision: str,
target_project: str,
target_package: str
) -> bool:
super(CheckSource, self).check_source_submission(source_project,
source_package, source_revision, target_project, target_package)
self.target_project_config(target_project)
@ -762,7 +771,7 @@ class CommandLineInterface(ReviewBot.CommandLineInterface):
ReviewBot.CommandLineInterface.__init__(self, args, kwargs)
self.clazz = CheckSource
def get_optparser(self):
def get_optparser(self) -> CmdlnOptionParser:
parser = ReviewBot.CommandLineInterface.get_optparser(self)
parser.add_option('--skip-add-reviews', action='store_true', default=False,

View File

@ -1,5 +1,6 @@
#!/usr/bin/python3
from typing import Literal, Optional, Tuple, Union
from osclib.core import devel_project_get
from osclib.core import package_source_hash
from osclib.core import package_kind
@ -77,7 +78,7 @@ class OriginManager(ReviewBot.ReviewBot):
return True
def check_source_submission(self, src_project, src_package, src_rev, tgt_project, tgt_package):
def check_source_submission(self, src_project, src_package, src_rev, tgt_project, tgt_package) -> Optional[bool]:
kind = package_kind(self.apiurl, tgt_project, tgt_package)
if not (kind is None or kind == 'source'):
self.review_messages['accepted'] = 'skipping {} package since not source'.format(kind)
@ -118,7 +119,16 @@ class OriginManager(ReviewBot.ReviewBot):
source_hash_new, source_hash_old)
return self.policy_result_handle(tgt_project, tgt_package, origin_info_new, origin_info_old, result)
def config_validate(self, target_project):
def config_validate(self, target_project: str) -> Tuple[bool, bool]:
"""Check the settings ``OSRT:OriginConfig`` of the target project and
return a tuple of booleans. The first boolean indicates whether the
action should proceed and the second whether the config is valid.
This function checks whether the value
``OSRT:OriginConfig.fallback-group`` is present and whether
``OSRT:OriginConfig.review-user`` matches the configured review_user.
"""
config = config_load(self.apiurl, target_project)
if not config:
# No perfect solution for lack of a config. For normal projects a
@ -138,7 +148,11 @@ class OriginManager(ReviewBot.ReviewBot):
return True, True
def devel_project_simulate_check(self, source_project, target_project):
def devel_project_simulate_check(
self,
source_project: str,
target_project: str
) -> Union[Tuple[str, str], Tuple[Literal[False], Literal[None]]]:
config = config_load(self.apiurl, target_project)
origin_list = config_origin_list(config, self.apiurl, target_project, skip_workarounds=True)
@ -157,7 +171,11 @@ class OriginManager(ReviewBot.ReviewBot):
return False, None
def devel_project_simulate_check_command(self, source_project, target_project):
def devel_project_simulate_check_command(
self,
source_project: str,
target_project: str
) -> Union[Tuple[str, Optional[str]], Tuple[Literal[False], Literal[None]]]:
who_allowed = self.request_override_check_users(target_project)
if self.request.creator not in who_allowed:
who_allowed.append(self.request.creator)
@ -168,7 +186,7 @@ class OriginManager(ReviewBot.ReviewBot):
return False, None
def policy_result_handle(self, project, package, origin_info_new, origin_info_old, result):
def policy_result_handle(self, project, package, origin_info_new, origin_info_old, result: PolicyResult) -> Optional[bool]:
if result.wait and not result.accept:
result.comments.append(f'Decision may be overridden via `@{self.review_user} override`.')

View File

@ -1,28 +1,49 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Literal, Optional, Tuple, TypedDict, Union
from dateutil.parser import parse as date_parse
import re
if TYPE_CHECKING:
import xml.etree.ElementTree as ET
else:
from lxml import etree as ET
from osc.core import http_DELETE
from osc.core import http_GET
from osc.core import http_POST
from osc.connection import http_DELETE
from osc.connection import http_GET
from osc.connection import http_POST
from osc.core import makeurl
class _BaseComment(TypedDict):
who: Optional[str]
when: datetime
parent: Optional[Any]
comment: Optional[str]
class Comment(_BaseComment):
id: Optional[str]
class RequestAsComment(_BaseComment):
id: Literal['-1']
class CommentAPI(object):
COMMENT_MARKER_REGEX = re.compile(r'<!-- (?P<bot>[^ ]+)(?P<info>(?: [^= ]+=[^ ]+)*) -->')
def __init__(self, apiurl):
self.apiurl = apiurl
def _prepare_url(self, request_id=None, project_name=None,
package_name=None, query=None):
def _prepare_url(self, request_id=None, project_name: Optional[str] = None,
package_name: Optional[str] = None,
query: Optional[Union[List[str], Dict[str, str]]] = None
) -> str:
"""Prepare the URL to get/put comments in OBS.
:param request_id: Request where to refer the comment.
:param project_name: Project name where to refer comment.
:param package_name: Package name where to refer the comment.
:returns: Formated URL for the request.
:returns: Formatted URL for the request.
"""
url = None
if request_id:
@ -36,21 +57,20 @@ class CommentAPI(object):
raise ValueError('Please, set request_id, project_name or / and package_name to add a comment.')
return url
def _comment_as_dict(self, comment_element):
def _comment_as_dict(self, comment_element: ET.Element) -> Comment:
"""Convert an XML element comment into a dictionary.
:param comment_element: XML element that store a comment.
:returns: A Python dictionary object.
"""
comment = {
return {
'who': comment_element.get('who'),
'when': datetime.strptime(comment_element.get('when'), '%Y-%m-%d %H:%M:%S %Z'),
'when': datetime.strptime(comment_element.get('when', ''), '%Y-%m-%d %H:%M:%S %Z'),
'id': comment_element.get('id'),
'parent': comment_element.get('parent', None),
'comment': comment_element.text,
}
return comment
def request_as_comment_dict(self, request):
def request_as_comment_dict(self, request) -> RequestAsComment:
return {
'who': request.creator,
'when': date_parse(request.statehistory[0].when),
@ -60,7 +80,7 @@ class CommentAPI(object):
}
def get_comments(self, request_id=None, project_name=None,
package_name=None):
package_name=None) -> Dict[str, Comment]:
"""Get the list of comments of an object in OBS.
:param request_id: Request where to get comments.
@ -106,7 +126,13 @@ class CommentAPI(object):
return c, info
return None, None
def command_find(self, comments, user, command=None, who_allowed=None):
def command_find(
self,
comments: Dict[str, Comment],
user: str,
command: Optional[str] = None,
who_allowed=None
) -> Generator[Tuple[List[str], Optional[str]], None, None]:
"""
Find comment commands with the optional conditions.

View File

@ -1,4 +1,4 @@
from __future__ import print_function
from typing import Dict, Optional
from osc import OscConfigParser
from collections import OrderedDict
@ -172,14 +172,17 @@ DEFAULT = {
#
def str2bool(v):
def str2bool(v: Optional[str]) -> bool:
return (v is not None and v.lower() in ("yes", "true", "t", "1"))
class Config(object):
"""Helper class to configuration file."""
"""Helper class for reading the osc configuration file and fetching the
remote config from the ``OSRT:config`` attribute in the target project.
def __init__(self, apiurl, project):
"""
def __init__(self, apiurl: str, project: str) -> None:
self.project = project
self.remote_values = self.fetch_remote(apiurl)
@ -191,7 +194,7 @@ class Config(object):
@staticmethod
@memoize(session=True) # Allow reset by memoize_session_reset() for ReviewBot.
def get(apiurl, project):
def get(apiurl: str, project: str):
"""Cached version for directly accessing project config."""
# Properly handle loading the config for interconnect projects.
from osclib.core import project_remote_apiurl
@ -204,7 +207,7 @@ class Config(object):
def conf(self):
return conf
def populate_conf(self):
def populate_conf(self) -> None:
"""Add sane default into the configuration and layer (defaults, remote, ~/.oscrc)."""
defaults = {}
default_ordered = OrderedDict(sorted(DEFAULT.items(), key=lambda i: int(i[1].get('_priority', 99))))
@ -244,12 +247,16 @@ class Config(object):
else:
return defaults
def fetch_remote(self, apiurl):
def fetch_remote(self, apiurl: str) -> Optional[Dict[str, str]]:
"""Fetch the configuration from the ``OSRT`` attribute namespace for the
current project from the OBS instance with the given apiurl.
"""
from osclib.core import attribute_value_load
config = attribute_value_load(apiurl, self.project, 'Config')
if config:
cp = OscConfigParser.OscConfigParser()
config = u'[remote]\n' + config
config = u'[remote]\n' + str(config)
cp.read_string(config)
return dict(cp.items('remote'))

View File

@ -4,18 +4,18 @@ from dateutil.parser import parse as date_parse
import re
import socket
import logging
from typing import List, Literal, Optional, Tuple, Union
from lxml import etree as ET
from urllib.error import HTTPError
from typing import Optional
from osc.core import create_submit_request
from osc.core import ReviewState, create_submit_request
from osc.core import get_binarylist
from osc.core import get_commitlog
from osc.core import get_dependson
from osc.core import http_DELETE
from osc.core import http_GET
from osc.core import http_POST
from osc.core import http_PUT
from osc.connection import http_DELETE
from osc.connection import http_GET
from osc.connection import http_POST
from osc.connection import http_PUT
from osc.core import makeurl
from osc.core import owner
from osc.core import search as osc_core_search
@ -279,7 +279,13 @@ def binary_src_debug(binary):
@memoize(session=True)
def devel_project_get(apiurl: str, target_project: str, target_package: str):
def devel_project_get(apiurl: str, target_project: str, target_package: str) -> Union[Tuple[str, str], Tuple[Literal[None], Literal[None]]]:
"""Fetch the devel project & package name of the supplied ``target_project``
and ``target_package`` and return the devel project and devel package name
as a tuple. If no devel project has been defined, then return ``None,
None``.
"""
try:
meta = ET.fromstringlist(show_package_meta(apiurl, target_project, target_package))
node = meta.find('devel')
@ -468,7 +474,22 @@ def package_list_kind_filtered(apiurl, project, kinds_allowed=['source']):
yield package
def attribute_value_load(apiurl: str, project: str, name: str, namespace='OSRT', package: Optional[str] = None):
def attribute_value_load(
apiurl: str,
project: str,
name: str,
namespace: str = 'OSRT',
package: Optional[str] = None) -> Optional[Union[str, Literal[True]]]:
"""Fetch an attribute from the OBS instance with the given ``apiurl`` for
the given ``project`` (and optionally for a package if a package name is
provided). The attribute is expected to have the name ``$namespace:$name``,
with the namespace defaulting to ``OSRT``.
Attributes are stored as xml and can be either strings or are interpreted as
``True`` if only the element is present but has no other children or
entries.
"""
path = list(filter(None, ['source', project, package, '_attribute', namespace + ':' + name]))
url = makeurl(apiurl, path)
@ -506,7 +527,7 @@ def attribute_value_save(
value: str,
namespace='OSRT',
package: Optional[str] = None
):
) -> None:
root = ET.Element('attributes')
attribute = ET.SubElement(root, 'attribute')
@ -525,7 +546,7 @@ def attribute_value_save(
raise e
def attribute_value_delete(apiurl: str, project: str, name: str, namespace='OSRT', package: Optional[str] = None):
def attribute_value_delete(apiurl: str, project: str, name: str, namespace='OSRT', package: Optional[str] = None) -> None:
http_DELETE(makeurl(
apiurl, list(filter(None, ['source', project, package, '_attribute', namespace + ':' + name]))))
@ -661,7 +682,12 @@ def package_source_age(apiurl, project, package):
return datetime.utcnow() - package_source_changed(apiurl, project, package)
def entity_exists(apiurl, project, package=None):
def entity_exists(apiurl: str, project: str, package: Optional[str] = None) -> bool:
"""Check whether the project or the project/package exists on the OBS
instance behind the supplied apiurl and return True if it exists or False
otherwise.
"""
try:
http_GET(makeurl(apiurl, list(filter(None, ['source', project, package])) + ['_meta']))
except HTTPError as e:
@ -673,7 +699,9 @@ def entity_exists(apiurl, project, package=None):
return True
def package_kind(apiurl, project, package):
def package_kind(
apiurl, project, package
) -> Optional[Literal['meta', 'multibuild_subpackage', 'patchinfo', 'maintenance_update', 'multispec_subpackage', 'source']]:
if package.startswith('00') or package.startswith('_'):
return 'meta'
@ -900,7 +928,7 @@ def review_find_last(request, user, states=['all']):
return None
def reviews_remaining(request, incident_psuedo=False):
def reviews_remaining(request: Request, incident_psuedo=False) -> List[ReviewState]:
reviews = []
for review in request.reviews:
if review.state != 'accepted':
@ -939,7 +967,7 @@ def issue_trackers(apiurl):
return trackers
def issue_tracker_by_url(apiurl, tracker_url):
def issue_tracker_by_url(apiurl: str, tracker_url: str) -> Optional[str]:
url = makeurl(apiurl, ['issue_trackers'])
root = ET.parse(http_GET(url)).getroot()
if not tracker_url.endswith('/'):
@ -952,7 +980,7 @@ def issue_tracker_label_apply(tracker, identifier):
return tracker.find('label').text.replace('@@@', identifier)
def request_remote_identifier(apiurl, apiurl_remote, request_id):
def request_remote_identifier(apiurl: str, apiurl_remote: str, request_id: str) -> str:
if apiurl_remote == apiurl:
return 'request#{}'.format(request_id)
@ -1007,7 +1035,7 @@ def action_is_patchinfo(action):
action.src_package == 'patchinfo' or action.src_package.startswith('patchinfo.')))
def request_action_key(action):
def request_action_key(action: Action) -> str:
identifier = []
if action.type in ['add_role', 'change_devel', 'maintenance_release', 'set_bugowner', 'submit']:

View File

@ -1,6 +1,7 @@
from copy import deepcopy
from collections import namedtuple
import logging
from typing import Any, Dict, Generator, List, Literal, NamedTuple, Optional, Tuple, TypedDict, Union
from osclib.conf import Config
from osclib.conf import str2bool
from osclib.core import attribute_value_load
@ -38,7 +39,26 @@ DEFAULTS = {
'fallback-group': '<config:origin-manager-fallback-group>',
'fallback-workaround': {},
}
POLICY_DEFAULTS = {
class Policy(TypedDict):
additional_reviews: List[Any]
automatic_updates: bool
automatic_updates_initial: bool
automatic_updates_supersede: bool
automatic_updates_delay: int
automatic_updates_frequency: int
maintainer_review_always: bool
maintainer_review_initial: bool
pending_submission_allow: bool
pending_submission_consider: bool
pending_submission_allowed_reviews: List[str]
# Submit pending requests with a set of allowed reviews, but still wait for
# the above reviews before being accepted.
pending_submission_allowed_reviews_update: List[str]
POLICY_DEFAULTS: Policy = {
'additional_reviews': [],
'automatic_updates': True,
'automatic_updates_initial': False,
@ -84,7 +104,13 @@ def config_load(apiurl, project):
return config_resolve(apiurl, project, yaml.safe_load(config))
def config_origin_generator(origins, apiurl=None, project=None, package=None, skip_workarounds=False):
def config_origin_generator(
origins,
apiurl: Optional[str] = None,
project: Optional[str] = None,
package: Optional[str] = None,
skip_workarounds=False
) -> Generator[Tuple[str, Any], None, None]:
for origin_item in origins:
for origin, values in origin_item.items():
is_workaround = origin_workaround_check(origin)
@ -103,7 +129,7 @@ def config_origin_generator(origins, apiurl=None, project=None, package=None, sk
break # Only support single value inside list item.
def config_resolve(apiurl, project, config):
def config_resolve(apiurl: str, project: str, config):
defaults = POLICY_DEFAULTS.copy()
defaults_workarounds = POLICY_DEFAULTS.copy()
@ -222,17 +248,17 @@ def config_resolve_apply(config, values_apply, key=None, workaround=False, until
values.update(values_apply)
def origin_workaround_check(origin):
def origin_workaround_check(origin: str) -> bool:
return origin.endswith('~')
def origin_workaround_ensure(origin):
def origin_workaround_ensure(origin: str) -> str:
if not origin_workaround_check(origin):
return origin + '~'
return origin
def origin_workaround_strip(origin):
def origin_workaround_strip(origin: str) -> str:
if origin_workaround_check(origin):
return origin[:-1]
return origin