# Copyright (C) 2015 SUSE Linux GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import unittest
import logging
import httpretty
import osc
import re
import urlparse
from osclib.cache import Cache
from check_source_in_factory import FactorySourceChecker
APIURL = 'https://testhost.example.com'
FIXTURES = os.path.join(os.getcwd(), 'tests/fixtures')
def rr(s):
return re.compile(re.escape(APIURL + s))
class TestFactorySourceAccept(unittest.TestCase):
def setUp(self):
"""
Initialize the configuration
"""
Cache.last_updated[APIURL] = {'__oldest': '2016-12-18T11:49:37Z'}
httpretty.reset()
httpretty.enable()
oscrc = os.path.join(FIXTURES, 'oscrc')
osc.core.conf.get_config(override_conffile=oscrc,
override_no_keyring=True,
override_no_gnome_keyring=True)
#osc.conf.config['debug'] = 1
logging.basicConfig()
self.logger = logging.getLogger(__file__)
self.logger.setLevel(logging.DEBUG)
self.checker = FactorySourceChecker(apiurl = APIURL, \
user = 'factory-source', \
logger = self.logger)
def test_accept_request(self):
httpretty.register_uri(httpretty.GET,
APIURL + "/request/770001",
body = """
......
""")
httpretty.register_uri(httpretty.GET,
rr("/source/Base:System/timezone?rev=481ecbe0dfc63ece3a1f1b5598f7d96c&view=info"),
match_querystring = True,
body = """
timezone.spec
""")
httpretty.register_uri(httpretty.GET,
rr("/source/openSUSE:Factory/timezone?view=info"),
match_querystring = True,
body = """
timezone.spec
""")
httpretty.register_uri(httpretty.GET,
rr("/source/openSUSE:Factory/timezone/_history?limit=5"),
match_querystring = True,
body = """
timezone.spec
""")
httpretty.register_uri(httpretty.GET,
rr("/search/request?match=%28state%2F%40name%3D%27new%27+or+state%2F%40name%3D%27review%27%29+and+%28action%2Ftarget%2F%40project%3D%27openSUSE%3AFactory%27+or+submit%2Ftarget%2F%40project%3D%27openSUSE%3AFactory%27+or+action%2Fsource%2F%40project%3D%27openSUSE%3AFactory%27+or+submit%2Fsource%2F%40project%3D%27openSUSE%3AFactory%27%29+and+%28action%2Ftarget%2F%40package%3D%27timezone%27+or+submit%2Ftarget%2F%40package%3D%27timezone%27+or+action%2Fsource%2F%40package%3D%27timezone%27+or+submit%2Fsource%2F%40package%3D%27timezone%27%29+and+action%2F%40type%3D%27submit%27"),
match_querystring = True,
responses = [
httpretty.Response( body = """
... ...
"""),
httpretty.Response( body = """
... ...
""")
])
result = { 'status' : None }
def change_request(result, method, uri, headers):
u = urlparse.urlparse(uri)
if u.query == 'newstate=accepted&cmd=changereviewstate&by_user=factory-source':
result['status'] = True
else:
result['status'] = 'ERROR'
return (200, headers, '')
httpretty.register_uri(httpretty.POST,
APIURL + "/request/770001",
body = lambda method, uri, headers: change_request(result, method, uri, headers))
# first time request is in in review
self.checker.set_request_ids(['770001'])
self.checker.check_requests()
self.assertEqual(result['status'], None)
# second time request is in state new so we can accept
self.checker.set_request_ids(['770001'])
self.checker.check_requests()
self.assertTrue(result['status'])
def test_source_not_in_factory(self):
httpretty.register_uri(httpretty.GET,
rr("/search/request?withfullhistory=1&match=state%2F%40name%3D%27review%27+and+review%5B%40by_user%3D%27factory-source%27+and+%40state%3D%27new%27%5D"),
match_querystring = True,
body = """
acceptedReview got acceptedacceptedRequest createdtest updateRequest got a new review requesttest update
""")
httpretty.register_uri(httpretty.GET,
APIURL + "/request/261411",
body = """
acceptedReview got acceptedacceptedRequest createdtest updateRequest got a new review requesttest update
""")
httpretty.register_uri(httpretty.GET,
APIURL + "/source/home:lnussel:branches:openSUSE:Backports:SLE-12/plan",
body = """
""")
httpretty.register_uri(httpretty.GET,
rr("/source/openSUSE:Factory/plan?view=info"),
match_querystring = True,
status = 404,
body = """
openSUSE:Factory/plan
""")
httpretty.register_uri(httpretty.GET,
rr("/search/request?match=%28state%2F%40name%3D%27new%27+or+state%2F%40name%3D%27review%27%29+and+%28action%2Ftarget%2F%40project%3D%27openSUSE%3AFactory%27+or+submit%2Ftarget%2F%40project%3D%27openSUSE%3AFactory%27+or+action%2Fsource%2F%40project%3D%27openSUSE%3AFactory%27+or+submit%2Fsource%2F%40project%3D%27openSUSE%3AFactory%27%29+and+%28action%2Ftarget%2F%40package%3D%27plan%27+or+submit%2Ftarget%2F%40package%3D%27plan%27+or+action%2Fsource%2F%40package%3D%27plan%27+or+submit%2Fsource%2F%40package%3D%27plan%27%29+and+action%2F%40type%3D%27submit%27"),
match_querystring = True,
body = """
""")
result = { 'factory_source_declined' : None }
def change_request(result, method, uri, headers):
u = urlparse.urlparse(uri)
if u.query == 'newstate=declined&cmd=changereviewstate&by_user=factory-source':
result['factory_source_declined'] = True
return (200, headers, '')
httpretty.register_uri(httpretty.POST,
APIURL + "/request/261411",
body = lambda method, uri, headers: change_request(result, method, uri, headers))
self.checker.requests = []
self.checker.set_request_ids_search_review()
self.checker.check_requests()
self.assertTrue(result['factory_source_declined'])
if __name__ == '__main__':
unittest.main()