195 lines
7.6 KiB
Python
Raw Normal View History

2014-05-20 11:57:23 +02:00
# Copyright (C) 2014 SUSE Linux Products GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
2014-05-23 09:36:21 +02:00
2014-05-20 11:57:23 +02:00
from datetime import datetime
import re
from xml.etree import cElementTree as ET
from osc.core import http_DELETE
from osc.core import http_GET
from osc.core import http_POST
from osc.core import makeurl
class CommentAPI(object):
    """Small client for reading, writing and deleting comments in OBS.

    Comments can be attached to a request, a project or a package. Each
    comment is handled as a plain dictionary (see `_comment_as_dict`).
    """

    # Hidden HTML-comment marker that bots prepend to their comments, e.g.
    # ``<!-- BotName key=value other=value -->``. ``bot`` captures the bot
    # name and ``info`` the (possibly empty) run of `` key=value`` pairs.
    COMMENT_MARKER_REGEX = re.compile(r'<!-- (?P<bot>[^ ]+)(?P<info>(?: [^= ]+=[^ ]+)*) -->')

    def __init__(self, apiurl):
        """Store the API URL used to build every comment URL.

        :param apiurl: Base URL of the API, passed to ``makeurl``.
        """
        self.apiurl = apiurl

    def _prepare_url(self, request_id=None, project_name=None,
                     package_name=None, query=None):
        """Prepare the URL to get/put comments in OBS.

        The first matching target wins: request, then package (needs both
        project and package name), then project.

        :param request_id: Request where to refer the comment.
        :param project_name: Project name where to refer comment.
        :param package_name: Package name where to refer the comment.
        :param query: Optional query forwarded unchanged to ``makeurl``.
        :returns: Formatted URL for the request.
        :raises ValueError: If neither a request nor a project is given.
        """
        if request_id:
            return makeurl(self.apiurl, ['comments', 'request', request_id], query)
        if project_name and package_name:
            return makeurl(self.apiurl, ['comments', 'package', project_name,
                                         package_name], query)
        if project_name:
            return makeurl(self.apiurl, ['comments', 'project', project_name], query)
        raise ValueError('Please, set request_id, project_name or / and package_name to add a comment.')

    def _comment_as_dict(self, comment_element):
        """Convert an XML element comment into a dictionary.

        :param comment_element: XML element that stores a comment.
        :returns: A dictionary with the keys ``who``, ``when`` (a
            ``datetime`` parsed from the ``when`` attribute), ``id``,
            ``parent`` (``None`` for top-level comments) and ``comment``
            (the element text, ``None`` when the element is empty).
        """
        return {
            'who': comment_element.get('who'),
            'when': datetime.strptime(comment_element.get('when'), '%Y-%m-%d %H:%M:%S %Z'),
            'id': comment_element.get('id'),
            'parent': comment_element.get('parent', None),
            'comment': comment_element.text,
        }

    def get_comments(self, request_id=None, project_name=None,
                     package_name=None):
        """Get the comments of an object in OBS.

        :param request_id: Request where to get comments.
        :param project_name: Project name where to get comments.
        :param package_name: Package name where to get comments.
        :returns: A dictionary mapping comment id to comment dictionary.
        """
        url = self._prepare_url(request_id, project_name, package_name)
        root = ET.parse(http_GET(url)).getroot()
        comments = {}
        for element in root.findall('comment'):
            comment = self._comment_as_dict(element)
            comments[comment['id']] = comment
        return comments

    def comment_find(self, comments, bot, info_match=None):
        """Return the first previous bot comment that matches the criteria.

        :param comments: Dictionary of id -> comment dictionary.
        :param bot: Bot name to look for in the comment marker.
        :param info_match: Optional dictionary of marker info that must be
            present with the same value; a value of ``None`` matches
            anything (including a missing key).
        :returns: Tuple ``(comment, info)`` where ``info`` is the parsed
            marker info, or ``(None, None)`` if nothing matches.
        """
        # Case-insensitive for backwards compatibility.
        bot = bot.lower()
        for c in comments.values():
            # An empty comment element has ``None`` as text; treat it as an
            # empty string so the regex does not raise TypeError.
            m = self.COMMENT_MARKER_REGEX.match(c['comment'] or '')
            if not m or bot != m.group('bot').lower():
                continue

            info = {}
            # Python base regex does not support repeated subgroup capture
            # so parse the optional info using string split.
            stripped = m.group('info').strip()
            if stripped:
                for pair in stripped.split(' '):
                    # Split on the first '=' only: the marker regex allows
                    # values that themselves contain '='.
                    key, value = pair.split('=', 1)
                    info[key] = value

            # Skip if info does not match.
            if info_match and not all(
                    value is None or (key in info and info[key] == value)
                    for key, value in info_match.items()):
                continue

            return c, info

        return None, None

    def add_marker(self, comment, bot, info=None):
        """Add bot marker to comment that can be used to find comment.

        :param comment: Text of the comment.
        :param bot: Bot name stored in the marker.
        :param info: Optional dictionary rendered as ``key=value`` pairs.
        :returns: The comment prefixed with the marker and a blank line.
        """
        infos = []
        if info:
            for key, value in info.items():
                infos.append('='.join((str(key), str(value))))

        marker = '<!-- {}{} -->'.format(bot, ' ' + ' '.join(infos) if infos else '')
        return marker + '\n\n' + comment

    def add_comment(self, request_id=None, project_name=None,
                    package_name=None, comment=None):
        """Add a comment in an object in OBS.

        :param request_id: Request where to write a comment.
        :param project_name: Project name where to write a comment.
        :param package_name: Package name where to write a comment.
        :param comment: Comment to be published.
        :returns: Whatever ``http_POST`` returns for the request.
        :raises ValueError: If the comment is empty.
        """
        if not comment:
            raise ValueError('Empty comment.')

        url = self._prepare_url(request_id, project_name, package_name)
        return http_POST(url, data=comment)

    def delete(self, comment_id):
        """Remove a comment object.

        :param comment_id: Id of the comment object.
        :returns: Whatever ``http_DELETE`` returns for the request.
        """
        url = makeurl(self.apiurl, ['comment', comment_id])
        return http_DELETE(url)

    def delete_children(self, comments):
        """Delete the comments that have no children (the leaves).

        :param comments: Dictionary of id -> comment dictionary; it is
            modified in place.
        :returns: The same dictionary without the deleted comments.
        """
        # Ids that are referenced as parent by at least one other comment.
        parents = set(c['parent'] for c in comments.values() if c['parent'])

        # Iterate over a snapshot of the keys: entries are removed from the
        # dictionary inside the loop, which is an error on a live view.
        for id_ in list(comments):
            if id_ not in parents:
                self.delete(id_)
                del comments[id_]

        return comments

    def delete_from(self, request_id=None, project_name=None,
                    package_name=None):
        """Remove the comments related with a request, project or package.

        Comments are removed leaves first, one level per pass, until no
        comment is left.

        :param request_id: Request where to remove comments.
        :param project_name: Project name where to remove comments.
        :param package_name: Package name where to remove comments.
        :returns: Always ``True``.
        """
        comments = self.get_comments(request_id, project_name, package_name)
        while comments:
            comments = self.delete_children(comments)
        return True

    def delete_from_where_user(self, user, request_id=None, project_name=None,
                               package_name=None):
        """Remove the comments written by a given user.

        This method is used to remove notifications when a request is
        removed or moved to another project.

        :param user: User name whose comments (``who`` field) are removed.
        :param request_id: Request where to remove comments.
        :param project_name: Project name where to remove comments.
        :param package_name: Package name where to remove comments.
        :returns: ``None``.
        """
        for comment in self.get_comments(request_id, project_name, package_name).values():
            if comment['who'] == user:
                self.delete(comment['id'])