from datetime import datetime from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Union try: from typing import Literal, TypedDict except ImportError: from typing_extensions import Literal, TypedDict from dateutil.parser import parse as date_parse import re if TYPE_CHECKING: import xml.etree.ElementTree as ET else: from lxml import etree as ET from osc.connection import http_DELETE from osc.connection import http_GET from osc.connection import http_POST from osc.core import makeurl class _BaseComment(TypedDict): who: Optional[str] when: datetime parent: Optional[Any] comment: Optional[str] class Comment(_BaseComment): id: Optional[str] class RequestAsComment(_BaseComment): id: Literal['-1'] class CommentAPI(object): COMMENT_MARKER_REGEX = re.compile(r'') def __init__(self, apiurl): self.apiurl = apiurl def _prepare_url(self, request_id=None, project_name: Optional[str] = None, package_name: Optional[str] = None, query: Optional[Union[List[str], Dict[str, str]]] = None ) -> str: """Prepare the URL to get/put comments in OBS. :param request_id: Request where to refer the comment. :param project_name: Project name where to refer comment. :param package_name: Package name where to refer the comment. :returns: Formatted URL for the request. """ url = None if request_id: url = makeurl(self.apiurl, ['comments', 'request', request_id], query) elif project_name and package_name: url = makeurl(self.apiurl, ['comments', 'package', project_name, package_name], query) elif project_name: url = makeurl(self.apiurl, ['comments', 'project', project_name], query) else: raise ValueError('Please, set request_id, project_name or / and package_name to add a comment.') return url def _comment_as_dict(self, comment_element: ET.Element) -> Comment: """Convert an XML element comment into a dictionary. :param comment_element: XML element that store a comment. :returns: A Python dictionary object. """ return { 'who': comment_element.get('who'), 'when': datetime.strptime(comment_element.get('when', ''), '%Y-%m-%d %H:%M:%S %Z'), 'id': comment_element.get('id'), 'parent': comment_element.get('parent', None), 'comment': comment_element.text, } def request_as_comment_dict(self, request) -> RequestAsComment: return { 'who': request.creator, 'when': date_parse(request.statehistory[0].when), 'id': '-1', 'parent': None, 'comment': request.description, } def get_comments(self, request_id=None, project_name=None, package_name=None) -> Dict[str, Comment]: """Get the list of comments of an object in OBS. :param request_id: Request where to get comments. :param project_name: Project name where to get comments. :param package_name: Package name where to get comments. :returns: A list of comments (as a dictionary). """ url = self._prepare_url(request_id, project_name, package_name) root = root = ET.parse(http_GET(url)).getroot() comments = {} for c in root.findall('comment'): c = self._comment_as_dict(c) comments[c['id']] = c return comments def comment_find(self, comments, bot, info_match=None): """Return previous bot comments that match criteria.""" # Case-insensitive for backwards compatibility. bot = bot.lower() for c in comments.values(): m = self.COMMENT_MARKER_REGEX.match(c['comment']) if m and bot == m.group('bot').lower(): info = {} # Python base regex does not support repeated subgroup capture # so parse the optional info using string split. stripped = m.group('info').strip() if stripped: for pair in stripped.split(' '): key, value = pair.split('=') info[key] = value # Skip if info does not match. if info_match: match = True for key, value in info_match.items(): if not (value is None or (key in info and info[key] == value)): match = False break if not match: continue return c, info return None, None def command_find( self, comments: Dict[str, Comment], user: str, command: Optional[str] = None, who_allowed=None ) -> Generator[Tuple[List[str], Optional[str]], None, None]: """ Find comment commands with the optional conditions. Usage (in comment): @ [args...] """ command_re = re.compile(r'^@(?P[^ ,:]+)[,:]? (?P.*)$', re.MULTILINE) # Search for commands in the order the comment was created. for comment in sorted(comments.values(), key=lambda c: c['when']): if who_allowed and comment['who'] not in who_allowed: continue # Handle stupid line endings returned in comments. match = command_re.search(comment['comment'].replace('\r', '')) if not match: continue if match.group('user') != user: continue args = match.group('args').strip().split(' ') if command and (args[0] or None) != command: continue yield args, comment['who'] def add_marker(self, comment, bot, info=None): """Add bot marker to comment that can be used to find comment.""" if info: infos = [] for key, value in info.items(): infos.append('='.join((str(key), str(value)))) marker = f"" return marker + '\n\n' + comment def remove_marker(self, comment): if comment.startswith('