import unittest import osc.core import shutil import tempfile import os import sys try: # Works up to Python 3.8, needed for Python < 3.3 (inc 2.7) from xml.etree import cElementTree as ET except ImportError: # will import a fast implementation from 3.3 onwards, needed # for 3.9+ from xml.etree import ElementTree as ET EXPECTED_REQUESTS = [] try: #python 2.x from cStringIO import StringIO from urllib2 import HTTPHandler, addinfourl, build_opener from urlparse import urlparse, parse_qs except ImportError: from io import StringIO from urllib.request import HTTPHandler, addinfourl, build_opener from urllib.parse import urlparse, parse_qs from io import BytesIO def urlcompare(url, *args): """compare all components of url except query string - it is converted to dict, therefor different ordering does not makes url's different, as well as quoting of a query string""" components = urlparse(url) query_args = parse_qs(components.query) components = components._replace(query=None) if not args: return False for url in args: components2 = urlparse(url) query_args2 = parse_qs(components2.query) components2 = components2._replace(query=None) if components != components2 or \ query_args != query_args2: return False return True def xml_equal(actual, exp): try: actual_xml = ET.fromstring(actual) exp_xml = ET.fromstring(exp) except ET.ParseError: return False todo = [(actual_xml, exp_xml)] while todo: actual_xml, exp_xml = todo.pop(0) if actual_xml.tag != exp_xml.tag: return False if actual_xml.attrib != exp_xml.attrib: return False if actual_xml.text != exp_xml.text: return False if actual_xml.tail != exp_xml.tail: return False if len(actual_xml) != len(exp_xml): return False todo.extend(list(zip(actual_xml, exp_xml))) return True class RequestWrongOrder(Exception): """raised if an unexpected request is issued to urllib2""" def __init__(self, url, exp_url, method, exp_method): Exception.__init__(self) self.url = url self.exp_url = exp_url self.method = method self.exp_method = exp_method def __str__(self): return '%s, %s, %s, %s' % (self.url, self.exp_url, self.method, self.exp_method) class RequestDataMismatch(Exception): """raised if POSTed or PUTed data doesn't match with the expected data""" def __init__(self, url, got, exp): self.url = url self.got = got self.exp = exp def __str__(self): return '%s, %s, %s' % (self.url, self.got, self.exp) class MyHTTPHandler(HTTPHandler): def __init__(self, exp_requests, fixtures_dir): HTTPHandler.__init__(self) self.__exp_requests = exp_requests self.__fixtures_dir = fixtures_dir def http_open(self, req): r = self.__exp_requests.pop(0) if not urlcompare(req.get_full_url(), r[1]) or req.get_method() != r[0]: raise RequestWrongOrder(req.get_full_url(), r[1], req.get_method(), r[0]) if req.get_method() in ('GET', 'DELETE'): return self.__mock_GET(r[1], **r[2]) elif req.get_method() in ('PUT', 'POST'): return self.__mock_PUT(req, **r[2]) def __mock_GET(self, fullurl, **kwargs): return self.__get_response(fullurl, **kwargs) def __mock_PUT(self, req, **kwargs): exp = kwargs.get('exp', None) if exp is not None and 'expfile' in kwargs: raise RuntimeError('either specify exp or expfile') elif 'expfile' in kwargs: exp = open(os.path.join(self.__fixtures_dir, kwargs['expfile']), 'rb').read() elif exp is None: raise RuntimeError('exp or expfile required') else: # for now, assume exp is a str exp = exp.encode('utf-8') # use req.data instead of req.get_data() for python3 compatiblity data = req.data if hasattr(data, 'read'): data = data.read() if data != exp: # We do not have a notion to explicitly mark xml content. In case # of xml, we do not care about the exact xml representation (for # now). Hence, if both, data and exp, are xml and are "equal", # everything is fine (for now); otherwise, error out # (of course, this is problematic if we want to ensure that XML # documents are bit identical...) if not xml_equal(data, exp): raise RequestDataMismatch(req.get_full_url(), repr(data), repr(exp)) return self.__get_response(req.get_full_url(), **kwargs) def __get_response(self, url, **kwargs): f = None if 'exception' in kwargs: raise kwargs['exception'] if 'text' not in kwargs and 'file' in kwargs: f = BytesIO(open(os.path.join(self.__fixtures_dir, kwargs['file']), 'rb').read()) elif 'text' in kwargs and 'file' not in kwargs: f = BytesIO(kwargs['text'].encode('utf-8')) else: raise RuntimeError('either specify text or file') resp = addinfourl(f, {}, url) resp.code = kwargs.get('code', 200) resp.msg = '' return resp def urldecorator(method, fullurl, **kwargs): def decorate(test_method): def wrapped_test_method(*args): addExpectedRequest(method, fullurl, **kwargs) test_method(*args) # "rename" method otherwise we cannot specify a TestCaseClass.testName # cmdline arg when using unittest.main() wrapped_test_method.__name__ = test_method.__name__ return wrapped_test_method return decorate def GET(fullurl, **kwargs): return urldecorator('GET', fullurl, **kwargs) def PUT(fullurl, **kwargs): return urldecorator('PUT', fullurl, **kwargs) def POST(fullurl, **kwargs): return urldecorator('POST', fullurl, **kwargs) def DELETE(fullurl, **kwargs): return urldecorator('DELETE', fullurl, **kwargs) def addExpectedRequest(method, url, **kwargs): global EXPECTED_REQUESTS EXPECTED_REQUESTS.append((method, url, kwargs)) class OscTestCase(unittest.TestCase): def setUp(self, copytree=True): oscrc = os.path.join(self._get_fixtures_dir(), 'oscrc') osc.core.conf.get_config(override_conffile=oscrc, override_no_keyring=True, override_no_gnome_keyring=True) os.environ['OSC_CONFIG'] = oscrc self.tmpdir = tempfile.mkdtemp(prefix='osc_test') if copytree: shutil.copytree(os.path.join(self._get_fixtures_dir(), 'osctest'), os.path.join(self.tmpdir, 'osctest')) global EXPECTED_REQUESTS EXPECTED_REQUESTS = [] osc.core.conf._build_opener = lambda u: build_opener(MyHTTPHandler(EXPECTED_REQUESTS, self._get_fixtures_dir())) self.stdout = sys.stdout sys.stdout = StringIO() def tearDown(self): self.assertTrue(len(EXPECTED_REQUESTS) == 0) sys.stdout = self.stdout try: shutil.rmtree(self.tmpdir) except: pass def _get_fixtures_dir(self): raise NotImplementedError('subclasses should implement this method') def _change_to_pkg(self, name): os.chdir(os.path.join(self.tmpdir, 'osctest', name)) def _check_list(self, fname, exp): fname = os.path.join('.osc', fname) self.assertTrue(os.path.exists(fname)) self.assertEqual(open(fname, 'r').read(), exp) def _check_addlist(self, exp): self._check_list('_to_be_added', exp) def _check_deletelist(self, exp): self._check_list('_to_be_deleted', exp) def _check_conflictlist(self, exp): self._check_list('_in_conflict', exp) def _check_status(self, p, fname, exp): self.assertEqual(p.status(fname), exp) def _check_digests(self, fname, *skipfiles): fname = os.path.join(self._get_fixtures_dir(), fname) with open(os.path.join('.osc', '_files'), 'r') as f: files_act = f.read() with open(fname, 'r') as f: files_exp = f.read() self.assertXMLEqual(files_act, files_exp) root = ET.fromstring(files_act) for i in root.findall('entry'): if i.get('name') in skipfiles: continue self.assertTrue(os.path.exists(os.path.join('.osc', i.get('name')))) self.assertEqual(osc.core.dgst(os.path.join('.osc', i.get('name'))), i.get('md5')) def assertXMLEqual(self, act, exp): if xml_equal(act, exp): return # ok, xmls are different, hence, assertEqual is expected to fail # (we just use it in order to get a "nice" error message) self.assertEqual(act, exp) # not reached (unless assertEqual is overridden in an incompatible way) raise RuntimeError('assertEqual assumptions violated') def assertEqualMultiline(self, got, exp): if (got + exp).find('\n') == -1: self.assertEqual(got, exp) else: start_delim = "\n" + (" 8< ".join(["-----"] * 8)) + "\n" end_delim = "\n" + (" >8 ".join(["-----"] * 8)) + "\n\n" self.assertEqual(got, exp, "got:" + start_delim + got + end_delim + "expected:" + start_delim + exp + end_delim)