diff --git a/osclib/origin.py b/osclib/origin.py index b14a0378..131fe3f7 100644 --- a/osclib/origin.py +++ b/osclib/origin.py @@ -1,7 +1,8 @@ from copy import deepcopy -from collections import namedtuple import logging from typing import Any, Dict, Generator, List, Literal, NamedTuple, Optional, Tuple, TypedDict, Union + +from osc.core import ReviewState from osclib.conf import Config from osclib.conf import str2bool from osclib.core import attribute_value_load @@ -83,9 +84,22 @@ POLICY_DEFAULTS: Policy = { ], } -OriginInfo = namedtuple('OriginInfo', ['project', 'pending']) -PendingRequestInfo = namedtuple('PendingRequestInfo', ['identifier', 'reviews_remaining']) -PolicyResult = namedtuple('PolicyResult', ['wait', 'accept', 'reviews', 'comments']) + +class PendingRequestInfo(NamedTuple): + identifier: str + reviews_remaining: List[ReviewState] + + +class OriginInfo(NamedTuple): + project: str + pending: PendingRequestInfo + + +class PolicyResult(NamedTuple): + wait: bool + accept: bool + reviews: Dict[str, str] + comments: List[str] def origin_info_str(self): @@ -266,7 +280,7 @@ def origin_workaround_strip(origin: str) -> str: @memoize(session=True) def origin_find(apiurl, target_project, package, source_hash=None, current=False, - pending_allow=True, fallback=True): + pending_allow=True, fallback=True) -> Optional[OriginInfo]: config = config_load(apiurl, target_project) if not source_hash: @@ -316,7 +330,7 @@ def project_source_contain(apiurl, project, package, source_hash): return False -def project_source_pending(apiurl, project, package, source_hash): +def project_source_pending(apiurl, project, package, source_hash) -> Union[PendingRequestInfo, Literal[False]]: apiurl_remote, project_remote = project_remote_apiurl(apiurl, project) request_actions = request_action_list_source(apiurl_remote, project_remote, package, states=['new', 'review'], include_release=True) @@ -435,7 +449,7 @@ def origin_find_highest(apiurl, project, package): def policy_evaluate(apiurl, project, package, origin_info_new, origin_info_old, - source_hash_new, source_hash_old): + source_hash_new, source_hash_old) -> PolicyResult: if origin_info_new is None: config = config_load(apiurl, project) origins = config_origin_list(config, apiurl, project, package, True) @@ -536,7 +550,7 @@ def policy_input_calculate(apiurl, project, package, return inputs -def policy_input_evaluate(policy, inputs): +def policy_input_evaluate(policy, inputs) -> PolicyResult: result = PolicyResult(False, True, {}, []) if inputs['new_package']: