From 37eab408ed9977a26b0dd5c670786f7c47e0da1b Mon Sep 17 00:00:00 2001 From: Michal Hrusecky Date: Fri, 7 Mar 2014 13:58:33 +0100 Subject: [PATCH] Try to implement more package handling --- tests/obs.py | 156 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 112 insertions(+), 44 deletions(-) diff --git a/tests/obs.py b/tests/obs.py index 787488b7..35acce72 100644 --- a/tests/obs.py +++ b/tests/obs.py @@ -9,6 +9,7 @@ import sys import httpretty import xml.etree.ElementTree as ET import time +import types from string import Template import oscs @@ -81,11 +82,21 @@ class OBS(object): 'prj': 'openSUSE:Factory:Staging:B', 'pkg': 'wine', 'devprj': 'home:Admin' } } + self.meta_data = { + } self.pkg_data = { 'home:Admin/gcc': {'rev': '1', 'vrev': '1', 'name': 'gcc', 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, 'home:Admin/wine': {'rev': '1', 'vrev': '1', 'name': 'wine', - 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'} + 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, + 'openSUSE:Factory/gcc': {'rev': '1', 'vrev': '1', 'name': 'gcc', + 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, + 'openSUSE:Factory/wine': {'rev': '1', 'vrev': '1', 'name': 'wine', + 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, + 'openSUSE:Factory:Rings:0-Bootstrap/elem-ring0': {'rev': '1', 'vrev': '1', 'name': 'elem-ring0', + 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, + 'openSUSE:Factory/binutils': {'rev': '1', 'vrev': '1', 'name': 'wine', + 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'} } def _clear_responses(self): @@ -106,6 +117,10 @@ class OBS(object): self._pkg_sources() # Build results self._build_results() + # List factory + self._factory_list() + # Project freeze + self._project_freeze() # Workaround self._ugly_hack() @@ -132,38 +147,43 @@ class OBS(object): reply = self.responses[request.method][path] # We have something to reply with if reply: - # It's a dict, therefore there is return code as well - if isinstance(reply, dict): - ret_code = reply['status'] - reply = reply['reply'] - else: - ret_code = 200 - # It's a list, so take the first - if isinstance(reply, list): - reply = reply.pop(0) - # It's string - if isinstance(reply, string_types): - # It's XML - if reply.startswith('<'): - return (ret_code, headers, reply) - # It's fixture - else: - return (ret_code, headers, self._get_fixture_content(reply)) - # All is left is callback function - else: - return (ret_code, headers, reply(self.responses, request, uri)) - # No possible response found - else: - if len(path) == 0: - path = uri - if len(path) > 1: - ret = self._pretty_callback(request, 'https://localhost' + posixpath.dirname(path), headers, False) - if ret: - return ret - if exception: - raise BaseException("No response for {} on {} provided".format(request.method, uri)) - else: + def get_reply(reply): + # It's a dict, therefore there is return code as well + if isinstance(reply, dict): + ret_code = reply['status'] + reply = get_reply(reply['reply']) + if isinstance(reply, list): + reply[0] = ret_code + return reply + # It's a list, so take the first + if isinstance(reply, list): + return get_reply(reply.pop(0)) + # It's string + if isinstance(reply, string_types): + # It's XML + if reply.startswith('<'): + return (200, headers, reply) + # It's fixture + else: + return (200, headers, self._get_fixture_content(reply)) + # All is left is callback function + if callable(reply): + return get_reply(reply(self.responses, request, uri)) return None + reply = get_reply(reply) + if reply: + return reply + # No possible response found + if len(path) == 0: + path = uri + if len(path) > 1: + ret = self._pretty_callback(request, 'https://localhost' + posixpath.dirname(path), headers, False) + if ret: + return ret + if exception: + raise BaseException("No response for {} on {} provided".format(request.method, uri)) + else: + return None def _ugly_hack(self): """ @@ -213,6 +233,26 @@ class OBS(object): self.responses['GET']['/build/openSUSE:Factory:Staging:A/_result'] = build_results self.responses['GET']['/build/openSUSE:Factory:Staging:B/_result'] = build_results + def _factory_list(self): + def factory_list(responses, request, uri): + if 'nofilename' in request.querystring and '1' in request.querystring['nofilename'] and 'view' in request.querystring and 'info' in request.querystring['view']: + ret = '\n' + for pkg in self.pkg_data: + if re.match(r'openSUSE:Factory/', pkg): + ret += ' '.format(self.pkg_data[pkg]['name'], self.pkg_data[pkg]['rev'], self.pkg_data[pkg]['vrev'], self.pkg_data[pkg]['srcmd5']) + ret += '\n' + return ret + else: + raise BaseException("No response for {}".format(uri)) + self.responses['GET']['/source/openSUSE:Factory'] = factory_list + + def _project_freeze(self): + # FIXME: Actually do what is supposed to happen + def project_freeze(responses, request, uri): + return "Ok" + for pr in self.st_project_data: + self.responses['PUT']['/source/openSUSE:Factory:Staging:' + pr + '/_project/_frozenlinks'] = project_freeze + def _project_meta(self): # Load template tmpl = Template(self._get_fixture_content('staging-project-meta.xml')) @@ -267,15 +307,43 @@ class OBS(object): def _pkg_sources(self): def pkg_source(responses, request, uri): - key = re.match(r'.*/source/([^?]+)(\?.*)?', uri).group(1) - return ''.format( - self.pkg_data[key]['name'], - self.pkg_data[key]['rev'], - self.pkg_data[key]['vrev'], - self.pkg_data[key]['srcmd5'] - ) - for pkg in self.pkg_data: - self.responses['GET']['/source/' + pkg] = pkg_source + match = re.match(r'.*/source/([^/]+)/([^?/]+)([/?].*)?', request.path) + if not match: + return { 'status': 404, 'reply': 'Not found' } + key = match.group(1) + '/' + match.group(2) + if match.group(3) == '/_meta': + return self.meta_data[key] + if key in self.pkg_data: + if not self.pkg_data[key]: + return { 'status': 404, 'reply': 'Not found' } + return ''.format( + self.pkg_data[key]['name'], + self.pkg_data[key]['rev'], + self.pkg_data[key]['vrev'], + self.pkg_data[key]['srcmd5'] + ) + return { 'status': 404, 'reply': 'Not found' } + + def pkg_change(responses, request, uri): + match = re.match(r'.*/source/([^/]+)/([^?/]+)([/?].*)?', request.path) + key = match.group(1) + '/' + match.group(2) + if match.group(3) == '/_meta': + self.meta_data[key] = request.body + return request.body + if match.group(3) == '/_aggregate': + xml = ET.fromstring(str(request.body)) + element = xml.findall('aggregate')[0] + dev_prj = element.get('project') + # FIXME get data from linked project + self.pkg_data[key] = { + 'rev': '1', 'vrev': '1', 'name': match.group(2), + 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb' + } + return request.body + return "Ok" + + self.responses['GET']['/source'] = pkg_source + self.responses['PUT']['/source'] = pkg_change def _link_sources(self): # Load template @@ -285,12 +353,12 @@ class OBS(object): key = re.match(r'.*/source/([^?]+)(\?.*)?', uri).group(1) del self.responses['GET']['/source/' + str(key)] del self.links_data[str(key)] - return "Ok" + return "Ok" def create_empty(responses, request, uri): key = re.match(r'.*/source/(.+)/_meta', uri).group(1) self.links_data[str(key)] = {} - return "Ok" + return "Ok" def create_link(responses, request, uri): tmpl = Template(self._get_fixture_content('linksource.xml')) @@ -304,7 +372,7 @@ class OBS(object): } self.responses['GET']['/source/' + key] = tmpl.substitute(self.links_data[key]) self.responses['DELETE']['/source/' + key] = delete_link - return "Ok" + return "Ok" # Register methods for requests for link in self.links_data: