import os import unittest import logging import httpretty from osclib.cache import Cache from . import OBSLocal from urllib.parse import urlparse, parse_qs from check_source_in_factory import FactorySourceChecker APIURL = 'http://testhost.example.com' FIXTURES = os.path.join(os.getcwd(), 'tests/fixtures') class TestFactorySourceAccept(OBSLocal.TestCase): def tearDown(self): httpretty.disable() httpretty.reset() def setUp(self): """ Initialize the configuration """ super().setUp() Cache.last_updated[APIURL] = {'__oldest': '2016-12-18T11:49:37Z'} httpretty.reset() httpretty.enable(allow_net_connect=False) logging.basicConfig() self.logger = logging.getLogger(__file__) self.logger.setLevel(logging.DEBUG) self.checker = FactorySourceChecker(apiurl=APIURL, user='factory-source', logger=self.logger) self.checker.override_allow = False # Test setup cannot handle. def test_accept_request(self): httpretty.register_uri(httpretty.GET, APIURL + '/source/openSUSE:Factory/00Meta/lookup.yml', status=404) httpretty.register_uri(httpretty.GET, APIURL + "/request/770001", body=""" ... ... """) httpretty.register_uri(httpretty.GET, APIURL + "/source/Base:System/timezone?view=info&rev=481ecbe0dfc63ece3a1f1b5598f7d96c", match_querystring=True, body=""" timezone.spec """) httpretty.register_uri(httpretty.GET, APIURL + "/source/openSUSE:Factory/timezone/_meta", body=""" timezone """) httpretty.register_uri(httpretty.GET, APIURL + "/source/Base:System/timezone/_meta", body=""" timezone """) httpretty.register_uri(httpretty.GET, APIURL + "/source/openSUSE:Factory/timezone?view=info", match_querystring=True, body=""" timezone.spec """) httpretty.register_uri(httpretty.GET, APIURL + "/source/openSUSE:Factory/timezone/_history?limit=5", match_querystring=True, body=""" timezone.spec """) httpretty.register_uri(httpretty.GET, APIURL + '/search/request', responses=[ httpretty.Response(body=""" ... ... """), httpretty.Response(body=""" ... ... """) ]) result = {'status': None} def change_request(result, method, uri, headers): query = parse_qs(urlparse(uri).query) if query == {'by_user': ['factory-source'], 'cmd': ['changereviewstate'], 'newstate': ['accepted']}: result['status'] = True else: result['status'] = 'ERROR' return (200, headers, '') httpretty.register_uri(httpretty.POST, APIURL + "/request/770001", body=lambda method, uri, headers: change_request(result, method, uri, headers)) # first time request is in in review self.checker.set_request_ids(['770001']) self.checker.check_requests() self.assertEqual(result['status'], None) # second time request is in state new so we can accept self.checker.set_request_ids(['770001']) self.checker.check_requests() self.assertTrue(result['status']) def test_source_not_in_factory(self): url = '/search/request?match=state%2F%40name%3D%27review%27+and+review%5B%40by_user%3D' url += '%27factory-source%27+and+%40state%3D%27new%27%5D&withfullhistory=1' httpretty.register_uri(httpretty.GET, APIURL + url, match_querystring=True, body=""" accepted Review got accepted accepted Request created test update Request got a new review request test update """) httpretty.register_uri(httpretty.GET, APIURL + "/request/261411", body=""" accepted Review got accepted accepted Request created test update Request got a new review request test update """) httpretty.register_uri(httpretty.GET, APIURL + "/source/home:lnussel:branches:openSUSE:Backports:SLE-12/plan", body=""" """) httpretty.register_uri(httpretty.GET, APIURL + "/source/openSUSE:Factory/plan/_meta", status=404, body=""" openSUSE:Factory/plan """) httpretty.register_uri(httpretty.GET, APIURL + '/source/openSUSE:Factory/00Meta/lookup.yml', status=404) httpretty.register_uri(httpretty.GET, APIURL + '/search/request', body=""" """) result = {'factory_source_declined': None} def change_request(result, method, uri, headers): query = parse_qs(urlparse(uri).query) if query == {'by_user': ['factory-source'], 'cmd': ['changereviewstate'], 'newstate': ['declined']}: result['factory_source_declined'] = True return (200, headers, '') httpretty.register_uri(httpretty.POST, APIURL + "/request/261411", body=lambda method, uri, headers: change_request(result, method, uri, headers)) self.checker.requests = [] self.checker.set_request_ids_search_review() self.checker.check_requests() self.assertTrue(result['factory_source_declined']) if __name__ == '__main__': unittest.main()