1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-17 19:06:15 +01:00
github.com_openSUSE_osc/tests/common.py

213 lines
7.6 KiB
Python
Raw Normal View History

import unittest
import osc.core
import shutil
import tempfile
import os
import sys
from xml.etree import cElementTree as ET
EXPECTED_REQUESTS = []
if sys.version_info[0:2] in ((2, 6), (2, 7)):
bytes = lambda x, *args: x
2013-04-09 14:22:45 +02:00
try:
#python 2.x
2013-04-09 14:22:45 +02:00
from cStringIO import StringIO
from urllib2 import HTTPHandler, addinfourl, build_opener
from urlparse import urlparse, parse_qs
2013-04-09 14:22:45 +02:00
except ImportError:
from io import StringIO
from urllib.request import HTTPHandler, addinfourl, build_opener
from urllib.parse import urlparse, parse_qs
from io import BytesIO
def urlcompare(url, *args):
"""compare all components of url except query string - it is converted to
dict, therefor different ordering does not makes url's different, as well
as quoting of a query string"""
components = urlparse(url)
query_args = parse_qs(components.query)
components = components._replace(query=None)
if not args:
return False
for url in args:
components2 = urlparse(url)
query_args2 = parse_qs(components2.query)
components2 = components2._replace(query=None)
if components != components2 or \
query_args != query_args2:
return False
return True
2013-04-09 14:22:45 +02:00
class RequestWrongOrder(Exception):
"""raised if an unexpected request is issued to urllib2"""
def __init__(self, url, exp_url, method, exp_method):
Exception.__init__(self)
self.url = url
self.exp_url = exp_url
self.method = method
self.exp_method = exp_method
def __str__(self):
return '%s, %s, %s, %s' % (self.url, self.exp_url, self.method, self.exp_method)
class RequestDataMismatch(Exception):
"""raised if POSTed or PUTed data doesn't match with the expected data"""
def __init__(self, url, got, exp):
self.url = url
self.got = got
self.exp = exp
def __str__(self):
return '%s, %s, %s' % (self.url, self.got, self.exp)
2013-04-09 14:22:45 +02:00
class MyHTTPHandler(HTTPHandler):
def __init__(self, exp_requests, fixtures_dir):
2013-04-09 14:22:45 +02:00
HTTPHandler.__init__(self)
self.__exp_requests = exp_requests
self.__fixtures_dir = fixtures_dir
def http_open(self, req):
r = self.__exp_requests.pop(0)
if not urlcompare(req.get_full_url(), r[1]) or req.get_method() != r[0]:
raise RequestWrongOrder(req.get_full_url(), r[1], req.get_method(), r[0])
if req.get_method() in ('GET', 'DELETE'):
return self.__mock_GET(r[1], **r[2])
elif req.get_method() in ('PUT', 'POST'):
return self.__mock_PUT(req, **r[2])
def __mock_GET(self, fullurl, **kwargs):
return self.__get_response(fullurl, **kwargs)
def __mock_PUT(self, req, **kwargs):
exp = kwargs.get('exp', None)
2013-04-09 14:22:45 +02:00
if exp is not None and 'expfile' in kwargs:
raise RuntimeError('either specify exp or expfile')
2013-04-09 14:22:45 +02:00
elif 'expfile' in kwargs:
exp = open(os.path.join(self.__fixtures_dir, kwargs['expfile']), 'r').read()
elif exp is None:
raise RuntimeError('exp or expfile required')
if exp is not None:
# use req.data instead of req.get_data() for python3 compatiblity
data = req.data
if hasattr(data, 'read'):
data = data.read()
if data != bytes(exp, "utf-8"):
raise RequestDataMismatch(req.get_full_url(), repr(data), repr(exp))
return self.__get_response(req.get_full_url(), **kwargs)
def __get_response(self, url, **kwargs):
f = None
2013-04-09 14:22:45 +02:00
if 'exception' in kwargs:
raise kwargs['exception']
2013-04-09 14:22:45 +02:00
if 'text' not in kwargs and 'file' in kwargs:
f = BytesIO(open(os.path.join(self.__fixtures_dir, kwargs['file']), 'rb').read())
2013-04-09 14:22:45 +02:00
elif 'text' in kwargs and 'file' not in kwargs:
f = BytesIO(bytes(kwargs['text'], 'utf-8'))
else:
raise RuntimeError('either specify text or file')
2013-04-09 14:22:45 +02:00
resp = addinfourl(f, {}, url)
resp.code = kwargs.get('code', 200)
resp.msg = ''
return resp
def urldecorator(method, fullurl, **kwargs):
def decorate(test_method):
def wrapped_test_method(*args):
addExpectedRequest(method, fullurl, **kwargs)
test_method(*args)
# "rename" method otherwise we cannot specify a TestCaseClass.testName
# cmdline arg when using unittest.main()
wrapped_test_method.__name__ = test_method.__name__
return wrapped_test_method
return decorate
def GET(fullurl, **kwargs):
return urldecorator('GET', fullurl, **kwargs)
def PUT(fullurl, **kwargs):
return urldecorator('PUT', fullurl, **kwargs)
def POST(fullurl, **kwargs):
return urldecorator('POST', fullurl, **kwargs)
def DELETE(fullurl, **kwargs):
return urldecorator('DELETE', fullurl, **kwargs)
def addExpectedRequest(method, url, **kwargs):
global EXPECTED_REQUESTS
EXPECTED_REQUESTS.append((method, url, kwargs))
class OscTestCase(unittest.TestCase):
def setUp(self, copytree=True):
2013-01-18 22:58:53 +01:00
oscrc = os.path.join(self._get_fixtures_dir(), 'oscrc')
osc.core.conf.get_config(override_conffile=oscrc,
override_no_keyring=True, override_no_gnome_keyring=True)
2013-01-18 22:58:53 +01:00
os.environ['OSC_CONFIG'] = oscrc
self.tmpdir = tempfile.mkdtemp(prefix='osc_test')
if copytree:
shutil.copytree(os.path.join(self._get_fixtures_dir(), 'osctest'), os.path.join(self.tmpdir, 'osctest'))
global EXPECTED_REQUESTS
EXPECTED_REQUESTS = []
2013-04-09 14:22:45 +02:00
osc.core.conf._build_opener = lambda u: build_opener(MyHTTPHandler(EXPECTED_REQUESTS, self._get_fixtures_dir()))
self.stdout = sys.stdout
2013-04-09 14:22:45 +02:00
sys.stdout = StringIO()
def tearDown(self):
self.assertTrue(len(EXPECTED_REQUESTS) == 0)
sys.stdout = self.stdout
try:
shutil.rmtree(self.tmpdir)
except:
pass
def _get_fixtures_dir(self):
raise NotImplementedError('subclasses should implement this method')
def _change_to_pkg(self, name):
os.chdir(os.path.join(self.tmpdir, 'osctest', name))
def _check_list(self, fname, exp):
fname = os.path.join('.osc', fname)
self.assertTrue(os.path.exists(fname))
self.assertEqual(open(fname, 'r').read(), exp)
def _check_addlist(self, exp):
self._check_list('_to_be_added', exp)
def _check_deletelist(self, exp):
self._check_list('_to_be_deleted', exp)
def _check_conflictlist(self, exp):
self._check_list('_in_conflict', exp)
def _check_status(self, p, fname, exp):
self.assertEqual(p.status(fname), exp)
def _check_digests(self, fname, *skipfiles):
fname = os.path.join(self._get_fixtures_dir(), fname)
self.assertEqual(open(os.path.join('.osc', '_files'), 'r').read(), open(fname, 'r').read())
root = ET.parse(fname).getroot()
for i in root.findall('entry'):
if i.get('name') in skipfiles:
continue
self.assertTrue(os.path.exists(os.path.join('.osc', i.get('name'))))
self.assertEqual(osc.core.dgst(os.path.join('.osc', i.get('name'))), i.get('md5'))
def assertEqualMultiline(self, got, exp):
if (got + exp).find('\n') == -1:
self.assertEqual(got, exp)
else:
start_delim = "\n" + (" 8< ".join(["-----"] * 8)) + "\n"
end_delim = "\n" + (" >8 ".join(["-----"] * 8)) + "\n\n"
self.assertEqual(got, exp,
"got:" + start_delim + got + end_delim +
"expected:" + start_delim + exp + end_delim)