import os
import unittest
import logging
import httpretty
import osc
import re
from osclib.cache import Cache
from . import OBSLocal
from urllib.parse import urlparse, parse_qs
from check_source_in_factory import FactorySourceChecker
# Directory holding test fixtures, resolved against the current working
# directory at import time.
# NOTE(review): presumably the tests are launched from the repository root -- confirm.
FIXTURES = os.path.join(os.getcwd(), 'tests/fixtures')

# Base URL of the fake OBS API instance; every mocked endpoint below is
# registered underneath it.
APIURL = 'http://testhost.example.com'
class TestFactorySourceAccept(OBSLocal.TestCase):
    """Exercise FactorySourceChecker against HTTP endpoints mocked with httpretty.

    Every test registers the URIs it expects the checker to hit and records
    the review state change posted back to the request endpoint.
    """

    def tearDown(self):
        """Switch off HTTP interception and drop all registered URIs."""
        # NOTE(review): super().tearDown() is not invoked here although setUp()
        # chains to the parent -- confirm OBSLocal.TestCase needs no teardown.
        httpretty.disable()
        httpretty.reset()

    def setUp(self):
        """
        Initialize the configuration

        Runs the parent setUp, seeds Cache.last_updated for APIURL, enables
        httpretty with real network connections disallowed and creates the
        FactorySourceChecker under test as ``self.checker``.
        """
        super().setUp()

        Cache.last_updated[APIURL] = {'__oldest': '2016-12-18T11:49:37Z'}
        httpretty.reset()
        httpretty.enable(allow_net_connect=False)

        logging.basicConfig()
        self.logger = logging.getLogger(__file__)
        self.logger.setLevel(logging.DEBUG)

        self.checker = FactorySourceChecker(
            apiurl=APIURL,
            user='factory-source',
            logger=self.logger,
        )
        self.checker.override_allow = False  # Test setup cannot handle.

    def test_accept_request(self):
        """Check that the review is accepted only on the second check pass."""
        httpretty.register_uri(
            httpretty.GET,
            APIURL + '/source/openSUSE:Factory/00Meta/lookup.yml',
            status=404)

        httpretty.register_uri(
            httpretty.GET,
            APIURL + "/request/770001",
            body="""
...
...
""")

        httpretty.register_uri(
            httpretty.GET,
            APIURL + "/source/Base:System/timezone?view=info&rev=481ecbe0dfc63ece3a1f1b5598f7d96c",
            match_querystring=True,
            body="""
timezone.spec
""")

        httpretty.register_uri(
            httpretty.GET,
            APIURL + "/source/openSUSE:Factory/timezone/_meta",
            body="""
timezone
""")

        httpretty.register_uri(
            httpretty.GET,
            APIURL + "/source/Base:System/timezone/_meta",
            body="""
timezone
""")

        httpretty.register_uri(
            httpretty.GET,
            APIURL + "/source/openSUSE:Factory/timezone?view=info",
            match_querystring=True,
            body="""
timezone.spec
""")

        httpretty.register_uri(
            httpretty.GET,
            APIURL + "/source/openSUSE:Factory/timezone/_history?limit=5",
            match_querystring=True,
            body="""
timezone.spec
""")

        # Two responses are registered for the same URI, one per
        # check_requests() pass below.
        httpretty.register_uri(
            httpretty.GET,
            APIURL + '/search/request',
            responses=[
                httpretty.Response(body="""
...
...
"""),
                httpretty.Response(body="""
...
...
"""),
            ])

        result = {'status': None}

        def change_request(request, uri, headers):
            """Record whether the POST carries the expected 'accepted' review change."""
            query = parse_qs(urlparse(uri).query)
            expected = {
                'by_user': ['factory-source'],
                'cmd': ['changereviewstate'],
                'newstate': ['accepted'],
            }
            if query == expected:
                result['status'] = True
            else:
                result['status'] = 'ERROR'
            return (200, headers, '')

        httpretty.register_uri(
            httpretty.POST,
            APIURL + "/request/770001",
            body=change_request)

        # first time request is in review
        self.checker.set_request_ids(['770001'])
        self.checker.check_requests()

        self.assertIsNone(result['status'])

        # second time request is in state new so we can accept
        self.checker.set_request_ids(['770001'])
        self.checker.check_requests()

        # Compare against True itself: the handler stores the truthy string
        # 'ERROR' for an unexpected query, which assertTrue would let through.
        self.assertIs(result['status'], True)

    def test_source_not_in_factory(self):
        """Check that the review is declined when openSUSE:Factory/plan/_meta is a 404."""
        httpretty.register_uri(
            httpretty.GET,
            APIURL + '/search/request?match=state%2F%40name%3D%27review%27+and+review%5B%40by_user%3D%27factory-source%27+and+%40state%3D%27new%27%5D&withfullhistory=1',
            match_querystring=True,
            body="""
accepted
Review got accepted
accepted
Request created
test update
Request got a new review request
test update
""")

        httpretty.register_uri(
            httpretty.GET,
            APIURL + "/request/261411",
            body="""
accepted
Review got accepted
accepted
Request created
test update
Request got a new review request
test update
""")

        httpretty.register_uri(
            httpretty.GET,
            APIURL + "/source/home:lnussel:branches:openSUSE:Backports:SLE-12/plan",
            body="""
""")

        httpretty.register_uri(
            httpretty.GET,
            APIURL + "/source/openSUSE:Factory/plan/_meta",
            status=404,
            body="""
openSUSE:Factory/plan
""")

        httpretty.register_uri(
            httpretty.GET,
            APIURL + '/source/openSUSE:Factory/00Meta/lookup.yml',
            status=404)

        httpretty.register_uri(
            httpretty.GET,
            APIURL + '/search/request',
            body="""
""")

        result = {'factory_source_declined': None}

        def change_request(request, uri, headers):
            """Flag the result once the expected 'declined' review change is posted."""
            query = parse_qs(urlparse(uri).query)
            expected = {
                'by_user': ['factory-source'],
                'cmd': ['changereviewstate'],
                'newstate': ['declined'],
            }
            if query == expected:
                result['factory_source_declined'] = True
            return (200, headers, '')

        httpretty.register_uri(
            httpretty.POST,
            APIURL + "/request/261411",
            body=change_request)

        self.checker.requests = []
        self.checker.set_request_ids_search_review()
        self.checker.check_requests()

        self.assertIs(result['factory_source_declined'], True)
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()