from datetime import datetime
import re

from dateutil.parser import parse as date_parse
from lxml import etree as ET
from osc.core import http_DELETE
from osc.core import http_GET
from osc.core import http_POST
from osc.core import makeurl


class CommentAPI(object):
    """Helper around the OBS comments API plus bot-marker/command parsing.

    Comments are handled as plain dictionaries with the keys ``who``,
    ``when``, ``id``, ``parent`` and ``comment``.
    """

    # Marker that add_marker() prepends to a comment: the bot name followed by
    # optional space-separated ``key=value`` pairs.
    # NOTE(review): the pattern literal was empty in the reviewed text (it had
    # no 'bot'/'info' groups although comment_find() reads both). The HTML
    # comment delimiters are reconstructed -- confirm against upstream.
    COMMENT_MARKER_REGEX = re.compile(r'<!-- (?P<bot>[^ ]+)(?P<info>(?: [^ ]+=[^ ]+)*) -->')

    def __init__(self, apiurl):
        """Remember the API URL that is handed to ``makeurl`` for each call."""
        self.apiurl = apiurl

    def _prepare_url(self, request_id=None, project_name=None,
                     package_name=None, query=None):
        """Prepare the URL to get/put comments in OBS.

        The target is picked by precedence: request first, then package (needs
        both project and package name), then project.

        :param request_id: Request where to refer the comment.
        :param project_name: Project name where to refer comment.
        :param package_name: Package name where to refer the comment.
        :param query: Optional query passed through to ``makeurl``.
        :returns: Formatted URL for the request.
        :raises ValueError: If neither request_id nor project_name is set.
        """
        if request_id:
            path = ['comments', 'request', request_id]
        elif project_name and package_name:
            path = ['comments', 'package', project_name, package_name]
        elif project_name:
            path = ['comments', 'project', project_name]
        else:
            raise ValueError('Please, set request_id, project_name or / and package_name to add a comment.')
        return makeurl(self.apiurl, path, query)

    def _comment_as_dict(self, comment_element):
        """Convert an XML element comment into a dictionary.

        :param comment_element: XML element that store a comment.
        :returns: A Python dictionary object.
        """
        return {
            'who': comment_element.get('who'),
            'when': datetime.strptime(comment_element.get('when'), '%Y-%m-%d %H:%M:%S %Z'),
            'id': comment_element.get('id'),
            'parent': comment_element.get('parent'),
            'comment': comment_element.text,
        }

    def request_as_comment_dict(self, request):
        """Present a request's description in the same shape as a comment.

        The result uses the fixed id ``'-1'`` and no parent; ``when`` is parsed
        from the first state history entry of the request.

        :param request: Request object providing ``creator``, ``statehistory``
            and ``description``.
        :returns: A comment dictionary.
        """
        return {
            'who': request.creator,
            'when': date_parse(request.statehistory[0].when),
            'id': '-1',
            'parent': None,
            'comment': request.description,
        }

    def get_comments(self, request_id=None, project_name=None,
                     package_name=None):
        """Get the comments of an object in OBS.

        :param request_id: Request where to get comments.
        :param project_name: Project name where to get comments.
        :param package_name: Package name where to get comments.
        :returns: A dictionary mapping comment id to comment dictionary.
        """
        url = self._prepare_url(request_id, project_name, package_name)
        root = ET.parse(http_GET(url)).getroot()
        comments = {}
        for element in root.findall('comment'):
            comment = self._comment_as_dict(element)
            comments[comment['id']] = comment
        return comments

    def comment_find(self, comments, bot, info_match=None):
        """Return previous bot comments that match criteria.

        :param comments: Dictionary of comment dictionaries (as returned by
            ``get_comments``).
        :param bot: Bot name to look for in the marker (case-insensitive).
        :param info_match: Optional dictionary of marker info that must match;
            a value of ``None`` accepts anything for that key.
        :returns: ``(comment, info)`` for the first match, otherwise
            ``(None, None)``.
        """
        # Case-insensitive for backwards compatibility.
        bot = bot.lower()
        for c in comments.values():
            # A comment may carry no text at all; treat it as empty.
            m = self.COMMENT_MARKER_REGEX.match(c['comment'] or '')
            if not m or bot != m.group('bot').lower():
                continue

            # Python base regex does not support repeated subgroup capture
            # so parse the optional info using string split.
            info = {}
            stripped = m.group('info').strip()
            if stripped:
                for pair in stripped.split(' '):
                    # Split on the first '=' only so values may contain '='.
                    key, value = pair.split('=', 1)
                    info[key] = value

            # Skip if info does not match.
            if info_match and not all(
                    value is None or (key in info and info[key] == value)
                    for key, value in info_match.items()):
                continue

            return c, info
        return None, None

    def command_find(self, comments, user, command=None, who_allowed=None):
        """
        Find comment commands with the optional conditions.

        Usage (in comment): @<user> <command> [args...]

        :param comments: Dictionary of comment dictionaries.
        :param user: Name that must follow the ``@`` for a command to count.
        :param command: If set, only yield commands whose first argument
            equals it.
        :param who_allowed: If set, only consider comments written by these
            users.
        :returns: Generator of ``(args, who)`` tuples, oldest comment first.
        """
        command_re = re.compile(r'^@(?P<user>[^ ,:]+)[,:]? (?P<args>.*)$', re.MULTILINE)

        # Search for commands in the order the comment was created.
        for comment in sorted(comments.values(), key=lambda c: c['when']):
            if who_allowed and comment['who'] not in who_allowed:
                continue

            # Handle stupid line endings returned in comments.
            text = (comment['comment'] or '').replace('\r', '')
            match = command_re.search(text)
            if not match:
                continue
            if match.group('user') != user:
                continue
            args = match.group('args').strip().split(' ')
            # An empty first argument counts as "no command given".
            if command and (args[0] or None) != command:
                continue
            yield args, comment['who']

    def add_marker(self, comment, bot, info=None):
        """Add bot marker to comment that can be used to find comment.

        :param comment: Comment text to prefix.
        :param bot: Bot name stored in the marker.
        :param info: Optional dictionary stored as ``key=value`` pairs.
        :returns: The marker, a blank line, then the comment.
        """
        suffix = ''
        if info:
            pairs = ['='.join((str(key), str(value))) for key, value in info.items()]
            suffix = ' ' + ' '.join(pairs)

        # NOTE(review): the marker literal was empty in the reviewed text, so
        # bot and info were dropped. The delimiters are reconstructed to match
        # COMMENT_MARKER_REGEX -- confirm against upstream.
        marker = '<!-- {}{} -->'.format(bot, suffix)
        return marker + '\n\n' + comment

    def remove_marker(self, comment):
        """Return the comment without a leading marker line.

        :param comment: Comment text, possibly produced by ``add_marker``.
        :returns: The comment with its first line dropped and surrounding
            whitespace stripped when it starts with a marker, otherwise the
            comment unchanged.
        """
        # NOTE(review): the reviewed text is cut off right after
        # ``comment.startswith('``. The prefix literal and the rest of this
        # body are reconstructed as the inverse of add_marker() -- confirm
        # against upstream.
        if comment.startswith('<!--'):
            comment = ''.join(comment.splitlines(True)[1:]).strip()
        return comment