#!/usr/bin/env python # -*- coding: utf-8 -*- # # (C) 2014 tchvatal@suse.cz, openSUSE.org # Distribute under GPLv2 or later import os import sys import httpretty import xml.etree.ElementTree as ET import time import types from string import Template import oscs import osc import re import pprint import posixpath PY3 = sys.version_info[0] == 3 if PY3: string_types = str, else: string_types = basestring, class OBS(object): """ Class trying to simulate a simple OBS """ def __init__(self): """ Initialize the configuration and create basic OBS instance """ # Make osc happy about config file oscrc = os.path.join(self._get_fixtures_dir(), 'oscrc') osc.core.conf.get_config(override_conffile=oscrc, override_no_keyring=True, override_no_gnome_keyring=True) os.environ['OSC_CONFIG'] = oscrc # (Re)set configuration self.reset_config() def reset_config(self): """ Resets whole OBS class """ # Initialize states self._set_init_data() # Setup callbacks self._clear_responses() def _set_init_data(self): """ Resets states """ # XXX TODO Write fixtures in an external file, or recreate # this data from other fixtures # Initial request data self.requests_data = { '123': {'request': 'new', 'review': 'accepted', 'who': 'Admin', 'by': 'group', 'id': '123', 'by_who': 'opensuse-review-team', 'package': 'gcc'}, '321': {'request': 'review', 'review': 'new', 'who': 'Admin', 'by': 'group', 'id': '321', 'by_who': 'factory-staging', 'package': 'puppet'}, '333': {'request': 'review', 'review': 'new', 'who': 'Admin', 'by': 'project', 'id': '333', 'by_who': 'openSUSE:Factory:Staging:B', 'package': 'wine'} } self.st_project_data = { 'A': {'project': 'openSUSE:Factory:Staging:A', 'title': '', 'description': ''}, 'U': {'project': 'openSUSE:Factory:Staging:U', 'title': 'Unfrozen', 'description': ''}, 'B': {'project': 'openSUSE:Factory:Staging:B', 'title': 'wine', 'description': 'requests:\n- {id: 333, package: wine}'} } self.links_data = { 'openSUSE:Factory:Staging:B/wine': { 'prj': 'openSUSE:Factory:Staging:B', 'pkg': 'wine', 'devprj': 'home:Admin' } } self.meta_data = { } self.pkg_data = { 'home:Admin/gcc': {'rev': '1', 'vrev': '1', 'name': 'gcc', 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, 'home:Admin/wine': {'rev': '1', 'vrev': '1', 'name': 'wine', 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, 'openSUSE:Factory/gcc': {'rev': '1', 'vrev': '1', 'name': 'gcc', 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, 'openSUSE:Factory/wine': {'rev': '1', 'vrev': '1', 'name': 'wine', 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, 'openSUSE:Factory:Rings:0-Bootstrap/elem-ring0': {'rev': '1', 'vrev': '1', 'name': 'elem-ring0', 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'}, 'openSUSE:Factory/binutils': {'rev': '1', 'vrev': '1', 'name': 'wine', 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb'} } def _clear_responses(self): """ Resets predefined responses """ self.responses = {'DELETE': {}, 'GET': {}, 'PUT': {}, 'POST': {}, 'ALL': {}} # Add methods to manipulate reviews self._request_review() # Add methods to search requests and projects self._search() # Add methods to work with project metadata self._project_meta() # Add linked packages self._link_sources() # Add packages self._pkg_sources() # Build results self._build_results() # List factory self._factory_list() # Project freeze self._project_freeze() # Comments self._comments() # Workaround self._ugly_hack() def _pretty_callback(self, request, uri, headers, exception=True): """ Custom callback for HTTPretty. It mocks requests and replaces calls with either xml, content of file, function call or first item in array of those. :param request: request as provided to callback function by HTTPretty :param uri: uri as provided to callback function by HTTPretty :param headers: headers as provided to callback function by HTTPretty """ # Get path path = re.match(r'.*localhost([^?]*)(\?.*)?', uri).group(1) reply = None # Try to find a fallback if path in self.responses['ALL']: reply = self.responses['ALL'][path] # Try to find a specific method if path in self.responses[request.method]: reply = self.responses[request.method][path] # We have something to reply with if reply: def get_reply(reply): # It's a dict, therefore there is return code as well if isinstance(reply, dict): ret_code = reply['status'] reply = get_reply(reply['reply']) if isinstance(reply, list): reply[0] = ret_code return reply # It's a list, so take the first if isinstance(reply, list): return get_reply(reply.pop(0)) # It's string if isinstance(reply, string_types): # It's XML if reply.startswith('<'): return (200, headers, reply) # It's fixture else: return (200, headers, self._get_fixture_content(reply)) # All is left is callback function if callable(reply): return get_reply(reply(self.responses, request, uri)) return None reply = get_reply(reply) if reply: return reply # No possible response found if len(path) == 0: path = uri if len(path) > 1: ret = self._pretty_callback(request, 'https://localhost' + posixpath.dirname(path), headers, False) if ret: return ret if exception: raise BaseException("No response for {} on {} provided".format(request.method, uri)) else: return None def _ugly_hack(self): """ Static fixtures we don't have a way of generating yet Whole point of this setup is to cleanup all tests and be able to move everything to new test-suite. """ # Build results verification, maybe not worth of dynamic processing self.responses['GET']['/build/red/_result'] = 'build-results-red.xml' self.responses['GET']['/build/green/_result'] = 'build-results-green.xml' # Testing of rings self.responses['GET']['/source/openSUSE:Factory:Rings:0-Bootstrap'] = 'ring-0-project.xml' self.responses['GET']['/source/openSUSE:Factory:Rings:1-MinimalX'] = 'ring-1-project.xml' # Testing of frozen packages tmpl = Template(self._get_fixture_content('project-f-metalist.xml')) self.responses['GET']['/source/openSUSE:Factory:Staging:B/_project'] = tmpl.substitute({'mtime': int(time.time()) - 100}) self.responses['GET']['/source/openSUSE:Factory:Staging:A/_project'] = tmpl.substitute({'mtime': int(time.time()) - 3600*24*356}) self.responses['GET']['/source/openSUSE:Factory:Staging:U/_project'] = 'project-u-metalist.xml' def _build_results(self): """ Mimic build results, B is broken, A works """ def build_results(responses, request, uri): ret_str = '\n' prj = re.match(r'.*/([^/]*)/_result', uri).group(1) if prj == 'openSUSE:Factory:Staging:B': states = ['failed', 'broken', 'building'] else: states = ['excluded', 'succeeded'] for st in states: if st == 'building': ret_str += ' \n'.format(prj, st, st) else: ret_str += ' \n'.format(prj, st, "published") for dt in self.links_data: if self.links_data[dt]['prj'] == prj: ret_str += ' \n'.format(self.links_data[dt]['pkg'], st) ret_str += ' \n' ret_str += '\n' return ret_str self.responses['GET']['/build/openSUSE:Factory:Staging:A/_result'] = build_results self.responses['GET']['/build/openSUSE:Factory:Staging:B/_result'] = build_results def _factory_list(self): def factory_list(responses, request, uri): if 'nofilename' in request.querystring and '1' in request.querystring['nofilename'] and 'view' in request.querystring and 'info' in request.querystring['view']: ret = '\n' for pkg in self.pkg_data: if re.match(r'openSUSE:Factory/', pkg): ret += ' '.format(self.pkg_data[pkg]['name'], self.pkg_data[pkg]['rev'], self.pkg_data[pkg]['vrev'], self.pkg_data[pkg]['srcmd5']) ret += '\n' return ret else: raise BaseException("No response for {}".format(uri)) self.responses['GET']['/source/openSUSE:Factory'] = factory_list def _project_freeze(self): # FIXME: Actually do what is supposed to happen def project_freeze(responses, request, uri): return "Ok" for pr in self.st_project_data: self.responses['PUT']['/source/openSUSE:Factory:Staging:' + pr + '/_project/_frozenlinks'] = project_freeze def find_request(responses, request, uri): if 'view' in request.querystring and request.querystring['view'][0] == u"collection": rqs = [] # Itereate through all requests for rq in self.requests_data: # Find the ones matching the condition if self.requests_data[rq]['request'] in ['review', 'new', 'declined'] and ( (not 'package' in request.querystring) or self.requests_data[rq]['package'] in request.querystring['package']): rqs.append(rq) # Create response ret_str = '' for rq in rqs: ret_str += responses['GET']['/request/' + rq] ret_str += '' return ret_str raise BaseException('No request search defined for ' + uri) self.responses['GET']['/request'] = find_request def _project_meta(self): # Load template tmpl = Template(self._get_fixture_content('staging-project-meta.xml')) def project_meta_change(responses, request, uri): path = re.match(r'.*localhost([^?]*)(\?.*)?', uri).group(1) self.responses['GET'][path] = request.body return self.responses['GET'][path] # Register methods for all requests for pr in self.st_project_data: # Static response for gets (just filling template from local data) self.responses['GET']['/source/openSUSE:Factory:Staging:' + pr + '/_meta'] = tmpl.substitute(self.st_project_data[pr]) # Interpret other requests self.responses['ALL']['/source/openSUSE:Factory:Staging:' + pr + '/_meta'] = project_meta_change def _request_review(self): """Register requests methods.""" # Load template tmpl = Template(self._get_fixture_content('request_review.xml')) # What happens when we try to change the review def review_change(responses, request, uri): rq_id = re.match(r'.*/([0-9]+)', uri).group(1) args = self.requests_data[rq_id] # Adding review if 'cmd' in request.querystring and 'addreview' in request.querystring['cmd']: self.requests_data[rq_id]['request'] = 'review' self.requests_data[rq_id]['review'] = 'new' # Changing review if 'cmd' in request.querystring and 'changereviewstate' in request.querystring[u'cmd']: self.requests_data[rq_id]['request'] = 'new' self.requests_data[rq_id]['review'] = str(request.querystring[u'newstate'][0]) # Project review if 'by_project' in request.querystring: self.requests_data[rq_id]['by'] = 'project' self.requests_data[rq_id]['by_who'] = str(request.querystring[u'by_project'][0]) # Group review if 'by_group' in request.querystring: self.requests_data[rq_id]['by'] = 'group' self.requests_data[rq_id]['by_who'] = str(request.querystring[u'by_group'][0]) responses['GET']['/request/' + rq_id] = tmpl.substitute(self.requests_data[rq_id]) return responses['GET']['/request/' + rq_id] # Register methods for all requests for rq in self.requests_data: # Static response for gets (just filling template from local data) self.responses['GET']['/request/' + rq] = tmpl.substitute(self.requests_data[rq]) # Interpret other requests self.responses['ALL']['/request/' + rq] = review_change def _pkg_sources(self): def pkg_source(responses, request, uri): match = re.match(r'.*/source/([^/]+)/([^?/]+)([/?].*)?', request.path) if not match: return { 'status': 404, 'reply': 'Not found' } key = match.group(1) + '/' + match.group(2) if match.group(3) == '/_meta': return self.meta_data[key] if key in self.pkg_data: if not self.pkg_data[key]: return { 'status': 404, 'reply': 'Not found' } return ''.format( self.pkg_data[key]['name'], self.pkg_data[key]['rev'], self.pkg_data[key]['vrev'], self.pkg_data[key]['srcmd5'] ) return { 'status': 404, 'reply': 'Not found' } def pkg_change(responses, request, uri): match = re.match(r'.*/source/([^/]+)/([^?/]+)([/?].*)?', request.path) key = match.group(1) + '/' + match.group(2) if match.group(3) == '/_meta': self.meta_data[key] = request.body return request.body if match.group(3) == '/_aggregate': xml = ET.fromstring(str(request.body)) element = xml.findall('aggregate')[0] dev_prj = element.get('project') # FIXME get data from linked project self.pkg_data[key] = { 'rev': '1', 'vrev': '1', 'name': match.group(2), 'srcmd5': 'de7a9f5e3bedb01980465f3be3d236cb' } return request.body return "Ok" self.responses['GET']['/source'] = pkg_source self.responses['PUT']['/source'] = pkg_change def _link_sources(self): # Load template tmpl = Template(self._get_fixture_content('linksource.xml')) def delete_link(responses, request, uri): key = re.match(r'.*/source/([^?]+)(\?.*)?', uri).group(1) del self.responses['GET']['/source/' + str(key)] del self.links_data[str(key)] return "Ok" def create_empty(responses, request, uri): key = re.match(r'.*/source/(.+)/_meta', uri).group(1) self.links_data[str(key)] = {} return "Ok" def create_link(responses, request, uri): tmpl = Template(self._get_fixture_content('linksource.xml')) key = re.match(r'.*/source/(.+)/_link', uri).group(1) match = re.match(r'(.+)/(.+)', key) xml = ET.fromstring(str(request.body)) self.links_data[str(key)] = { 'prj': match.group(1), 'pkg': match.group(2), 'devprj': xml.get('project') } self.responses['GET']['/source/' + key] = tmpl.substitute(self.links_data[key]) self.responses['DELETE']['/source/' + key] = delete_link return "Ok" # Register methods for requests for link in self.links_data: self.responses['GET']['/source/' + link] = tmpl.substitute(self.links_data[link]) self.responses['DELETE']['/source/' + link] = delete_link # Register method for package creation for pr in self.st_project_data: for rq in self.requests_data: self.responses['PUT']['/source/openSUSE:Factory:Staging:' + pr + '/' + self.requests_data[rq]['package'] + '/_meta'] = create_empty self.responses['PUT']['/source/openSUSE:Factory:Staging:' + pr + '/' + self.requests_data[rq]['package'] + '/_link'] = create_link def _search(self): """ Allows searching for requests """ def request_search(responses, request, uri): # Searching for requests that has open review for staging group if 'match' in request.querystring and request.querystring['match'][0] == u"state/@name='review' and review[@by_group='factory-staging' and @state='new']": rqs = [] # Itereate through all requests for rq in self.requests_data: # Find the ones matching the condition if self.requests_data[rq]['request'] == 'review' and self.requests_data[rq]['review'] == 'new' and self.requests_data[rq]['by'] == 'group' and self.requests_data[rq]['by_who'] == 'factory-staging': rqs.append(rq) # Create response ret_str = '' for rq in rqs: ret_str += responses['GET']['/request/' + rq] ret_str += '' return ret_str # Searching for requests that has open review for staging project if 'match' in request.querystring and re.match(r"state/@name='review' and review\[@by_project='([^']+)' and @state='new'\]", request.querystring[u'match'][0]): prj_match = re.match(r"state/@name='review' and review\[@by_project='([^']+)' and @state='new'\]", request.querystring[u'match'][0]) prj = str(prj_match.group(1)) rqs = [] # Itereate through all requests for rq in self.requests_data: # Find the ones matching the condition if self.requests_data[rq]['request'] == 'review' and self.requests_data[rq]['review'] == 'new' and self.requests_data[rq]['by'] == 'project' and self.requests_data[rq]['by_who'] == prj: rqs.append(rq) # Create response ret_str = '\n' for rq in rqs: ret_str += ' \n' ret_str += '' return ret_str # We are searching for something else, we don't know the answer raise BaseException("No search results defined for " + pprint.pformat(request.querystring)) def id_project_search(responses, request, uri): # Searching for project if 'match' in request.querystring and request.querystring['match'][0] == u"starts-with(@name,\'openSUSE:Factory:Staging:\')": ret_str = '\n' # Itereate through all requests for prj in self.st_project_data: ret_str += ' \n' ret_str += '' return ret_str # We are searching for something else, we don't know the answer raise BaseException("No search results defined for " + pprint.pformat(request.querystring)) self.responses['GET']['/search/request'] = request_search self.responses['GET']['/search/request/id'] = request_search self.responses['GET']['/search/project/id'] = id_project_search def _comments(self): def comment(responses, request, uri): return 'Ok' for st in self.st_project_data: self.responses['POST']['/comments/project/openSUSE:Factory:Staging:%s' % st] = comment def register_obs(self): """ Register custom callback for HTTPretty """ httpretty.register_uri(httpretty.DELETE, re.compile(r'/.*localhost.*/'), body=self._pretty_callback) httpretty.register_uri(httpretty.GET, re.compile(r'/.*localhost.*/'), body=self._pretty_callback) httpretty.register_uri(httpretty.PUT, re.compile(r'/.*localhost.*/'), body=self._pretty_callback) httpretty.register_uri(httpretty.POST, re.compile(r'/.*localhost.*/'), body=self._pretty_callback) self.reset_config() # Initiate the api with mocked rings self.api = oscs.StagingAPI('https://localhost') def _get_fixtures_dir(self): """ Return path for fixtures """ return os.path.join(os.getcwd(), 'tests/fixtures') def _get_fixture_path(self, filename): """ Return path for fixture """ return os.path.join(self._get_fixtures_dir(), filename) def _get_fixture_content(self, filename): """ Return content of fixture """ response = open(self._get_fixture_path(filename), 'r') content = response.read() response.close() return content