Remove fake OBS in favor of using generated test data
This commit is contained in:
parent
5f3dcca267
commit
8a7b65d505
@ -10,6 +10,11 @@ from osc.core import makeurl
|
|||||||
from osc.core import http_GET
|
from osc.core import http_GET
|
||||||
from osc.core import http_POST
|
from osc.core import http_POST
|
||||||
|
|
||||||
|
try:
|
||||||
|
from urllib.error import HTTPError
|
||||||
|
except ImportError:
|
||||||
|
# python 2.x
|
||||||
|
from urllib2 import HTTPError
|
||||||
|
|
||||||
class OBSLock(object):
|
class OBSLock(object):
|
||||||
"""Implement a distributed lock using a shared OBS resource."""
|
"""Implement a distributed lock using a shared OBS resource."""
|
||||||
@ -51,7 +56,12 @@ class OBSLock(object):
|
|||||||
|
|
||||||
def _read(self):
|
def _read(self):
|
||||||
url = makeurl(self.apiurl, ['source', self.lock, '_attribute', '%s:LockedBy' % self.ns])
|
url = makeurl(self.apiurl, ['source', self.lock, '_attribute', '%s:LockedBy' % self.ns])
|
||||||
root = ET.parse(http_GET(url)).getroot()
|
try:
|
||||||
|
root = ET.parse(http_GET(url)).getroot()
|
||||||
|
except HTTPError as e:
|
||||||
|
if e.code == 404:
|
||||||
|
return None
|
||||||
|
raise e
|
||||||
signature = None
|
signature = None
|
||||||
try:
|
try:
|
||||||
signature = root.find('.//value').text
|
signature = root.find('.//value').text
|
||||||
|
@ -245,36 +245,6 @@ class StagingAPI(object):
|
|||||||
|
|
||||||
return packages_staged
|
return packages_staged
|
||||||
|
|
||||||
def get_package_information(self, project, pkgname, rev=None):
|
|
||||||
"""
|
|
||||||
Get the revision packagename and source project to copy from
|
|
||||||
based on content provided
|
|
||||||
:param project: the project we are having the package in
|
|
||||||
:param pkgname: name of the package we want to identify
|
|
||||||
:return dict ( project, package, revision, md5sum )
|
|
||||||
"""
|
|
||||||
|
|
||||||
package_info = {}
|
|
||||||
|
|
||||||
query = {
|
|
||||||
'rev': rev
|
|
||||||
}
|
|
||||||
if rev:
|
|
||||||
url = self.makeurl(['source', project, pkgname], query=query)
|
|
||||||
else:
|
|
||||||
url = self.makeurl(['source', project, pkgname])
|
|
||||||
content = http_GET(url)
|
|
||||||
root = ET.parse(content).getroot()
|
|
||||||
package_info['dir_srcmd5'] = root.attrib['srcmd5']
|
|
||||||
|
|
||||||
linkinfo = root.find('linkinfo')
|
|
||||||
package_info['srcmd5'] = linkinfo.attrib['srcmd5']
|
|
||||||
package_info['rev'] = linkinfo.attrib.get('rev', None)
|
|
||||||
package_info['project'] = linkinfo.attrib['project']
|
|
||||||
package_info['package'] = linkinfo.attrib['package']
|
|
||||||
|
|
||||||
return package_info
|
|
||||||
|
|
||||||
def extract_specfile_short(self, filelist):
|
def extract_specfile_short(self, filelist):
|
||||||
packages = [spec[:-5] for spec in filelist if re.search(r'\.spec$', spec)]
|
packages = [spec[:-5] for spec in filelist if re.search(r'\.spec$', spec)]
|
||||||
|
|
||||||
@ -1042,14 +1012,18 @@ class StagingAPI(object):
|
|||||||
"""Determine if staging project is both active and no longer pending."""
|
"""Determine if staging project is both active and no longer pending."""
|
||||||
return status['overall_state'] in ['acceptable', 'review', 'failed']
|
return status['overall_state'] in ['acceptable', 'review', 'failed']
|
||||||
|
|
||||||
|
# we use a private function to mock it - httpretty and vcr don't mix well
|
||||||
|
def _fetch_project_meta(self, project):
|
||||||
|
url = self.makeurl(['source', project, '_project'], {'meta': '1'})
|
||||||
|
return http_GET(url).read()
|
||||||
|
|
||||||
def days_since_last_freeze(self, project):
|
def days_since_last_freeze(self, project):
|
||||||
"""
|
"""
|
||||||
Checks the last update for the frozen links
|
Checks the last update for the frozen links
|
||||||
:param project: project to check
|
:param project: project to check
|
||||||
:return age in days(float) of the last update
|
:return age in days(float) of the last update
|
||||||
"""
|
"""
|
||||||
url = self.makeurl(['source', project, '_project'], {'meta': '1'})
|
root = ET.fromstring(self._fetch_project_meta(project))
|
||||||
root = ET.parse(http_GET(url)).getroot()
|
|
||||||
for entry in root.findall('entry'):
|
for entry in root.findall('entry'):
|
||||||
if entry.get('name') == '_frozenlinks':
|
if entry.get('name') == '_frozenlinks':
|
||||||
return (time.time() - float(entry.get('mtime')))/3600/24
|
return (time.time() - float(entry.get('mtime')))/3600/24
|
||||||
|
@ -12,4 +12,3 @@ influxdb
|
|||||||
# Dependencies for testing
|
# Dependencies for testing
|
||||||
httpretty<0.9.6
|
httpretty<0.9.6
|
||||||
mock
|
mock
|
||||||
|
|
||||||
|
@ -1,38 +1,52 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from . import obs
|
|
||||||
from osclib.accept_command import AcceptCommand
|
from osclib.accept_command import AcceptCommand
|
||||||
|
from osclib.select_command import SelectCommand
|
||||||
from osclib.conf import Config
|
from osclib.conf import Config
|
||||||
from osclib.comments import CommentAPI
|
from osclib.comments import CommentAPI
|
||||||
from osclib.stagingapi import StagingAPI
|
from osclib.stagingapi import StagingAPI
|
||||||
|
|
||||||
|
from mock import MagicMock
|
||||||
|
from . import vcrhelpers
|
||||||
|
|
||||||
class TestAccept(unittest.TestCase):
|
class TestAccept(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setup_vcr(self):
|
||||||
"""
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
Initialize the configuration
|
wf.setup_rings()
|
||||||
"""
|
|
||||||
self.obs = obs.OBS()
|
self.c_api = CommentAPI(wf.api.apiurl)
|
||||||
Config(obs.APIURL, 'openSUSE:Factory')
|
|
||||||
self.api = StagingAPI(obs.APIURL, 'openSUSE:Factory')
|
staging_b = wf.create_staging('B', freeze=True)
|
||||||
|
self.prj = staging_b.name
|
||||||
|
|
||||||
|
self.winerq = wf.create_submit_request('devel:wine', 'wine', text='Hallo World')
|
||||||
|
self.assertEqual(True, SelectCommand(wf.api, self.prj).perform(['wine']))
|
||||||
|
self.comments = self.c_api.get_comments(project_name=self.prj)
|
||||||
|
self.assertGreater(len(self.comments), 0)
|
||||||
|
return wf
|
||||||
|
|
||||||
def test_accept_comments(self):
|
def test_accept_comments(self):
|
||||||
c_api = CommentAPI(self.api.apiurl)
|
wf = self.setup_vcr()
|
||||||
staging_c = 'openSUSE:Factory:Staging:C'
|
|
||||||
comments = c_api.get_comments(project_name=staging_c)
|
|
||||||
|
|
||||||
# Accept staging C (containing apparmor and mariadb)
|
self.assertEqual(True, AcceptCommand(wf.api).perform(self.prj))
|
||||||
self.assertEqual(True, AcceptCommand(self.api).perform(staging_c))
|
|
||||||
|
|
||||||
# Comments are cleared up
|
# Comments are cleared up
|
||||||
accepted_comments = c_api.get_comments(project_name=staging_c)
|
accepted_comments = self.c_api.get_comments(project_name=self.prj)
|
||||||
self.assertNotEqual(len(comments), 0)
|
|
||||||
self.assertEqual(len(accepted_comments), 0)
|
self.assertEqual(len(accepted_comments), 0)
|
||||||
|
|
||||||
# But the comment was written at some point
|
def test_accept_final_comment(self):
|
||||||
self.assertEqual(len(self.obs.comment_bodies), 1)
|
wf = self.setup_vcr()
|
||||||
comment = self.obs.comment_bodies[0]
|
|
||||||
self.assertTrue('The following packages have been submitted to openSUSE:Factory' in comment)
|
# snipe out cleanup to see the comments before the final countdown
|
||||||
self.assertTrue('apparmor' in comment)
|
wf.api.staging_deactivate = MagicMock(return_value=True)
|
||||||
self.assertTrue('mariadb' in comment)
|
|
||||||
|
self.assertEqual(True, AcceptCommand(wf.api).perform(self.prj))
|
||||||
|
|
||||||
|
comments = self.c_api.get_comments(project_name=self.prj)
|
||||||
|
self.assertGreater(len(comments), len(self.comments))
|
||||||
|
|
||||||
|
# check which id was added
|
||||||
|
new_id = (set(comments.keys()) - set(self.comments.keys())).pop()
|
||||||
|
comment = comments[new_id]['comment']
|
||||||
|
self.assertEqual('Project "{}" accepted. The following packages have been submitted to openSUSE:Factory: wine.'.format(self.prj), comment)
|
||||||
|
@ -1,86 +1,72 @@
|
|||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
import unittest
|
import unittest
|
||||||
import httpretty
|
import httpretty
|
||||||
|
import re
|
||||||
|
|
||||||
from . import obs
|
|
||||||
|
import osc.core
|
||||||
from osclib.conf import Config
|
from osclib.conf import Config
|
||||||
from osclib.stagingapi import StagingAPI
|
from osclib.stagingapi import StagingAPI
|
||||||
|
from xml.etree import cElementTree as ET
|
||||||
|
from mock import MagicMock
|
||||||
|
from . import vcrhelpers
|
||||||
|
|
||||||
class TestApiCalls(unittest.TestCase):
|
class TestApiCalls(unittest.TestCase):
|
||||||
"""
|
"""
|
||||||
Tests for various api calls to ensure we return expected content
|
Tests for various api calls to ensure we return expected content
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
"""
|
|
||||||
Initialize the configuration
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.obs = obs.OBS()
|
|
||||||
Config(obs.APIURL, 'openSUSE:Factory')
|
|
||||||
self.api = StagingAPI(obs.APIURL, 'openSUSE:Factory')
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
"""Clean internal cache"""
|
|
||||||
if hasattr(self.api, '_invalidate_all'):
|
|
||||||
self.api._invalidate_all()
|
|
||||||
|
|
||||||
def test_ring_packages(self):
|
def test_ring_packages(self):
|
||||||
"""
|
"""
|
||||||
Validate the creation of the rings.
|
Validate the creation of the rings.
|
||||||
"""
|
"""
|
||||||
# our content in the XML files
|
|
||||||
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
wf.setup_rings()
|
||||||
|
|
||||||
|
curl = wf.create_package('target', 'curl')
|
||||||
|
curl.create_file('curl.spec')
|
||||||
|
curl.create_file('curl-mini.spec')
|
||||||
|
cmini = wf.create_link(curl, target_project=wf.projects['target'], target_package='curl-mini')
|
||||||
|
cring1 = wf.create_link(curl, target_project=wf.projects['ring1'], target_package='curl')
|
||||||
|
cring0 = wf.create_link(cring1, target_project=wf.projects['ring0'], target_package='curl-mini')
|
||||||
|
|
||||||
# test content for listonly ie. list command
|
# test content for listonly ie. list command
|
||||||
ring_packages = {
|
ring_packages = {
|
||||||
'apparmor': 'openSUSE:Factory:Rings:1-MinimalX',
|
'curl': 'openSUSE:Factory:Rings:0-Bootstrap',
|
||||||
'elem-ring-0': 'openSUSE:Factory:Rings:0-Bootstrap',
|
'curl-mini': 'openSUSE:Factory:Rings:0-Bootstrap',
|
||||||
'elem-ring-1': 'openSUSE:Factory:Rings:0-Bootstrap',
|
|
||||||
'elem-ring-mini': 'openSUSE:Factory:Rings:0-Bootstrap',
|
|
||||||
'wine': 'openSUSE:Factory:Rings:1-MinimalX',
|
'wine': 'openSUSE:Factory:Rings:1-MinimalX',
|
||||||
}
|
}
|
||||||
self.assertEqual(ring_packages, self.api.ring_packages_for_links)
|
self.assertEqual(ring_packages, wf.api.ring_packages_for_links)
|
||||||
|
|
||||||
# test content for real usage
|
# test content for real usage
|
||||||
ring_packages = {
|
ring_packages = {
|
||||||
'apparmor': 'openSUSE:Factory:Rings:1-MinimalX',
|
'curl': 'openSUSE:Factory:Rings:1-MinimalX',
|
||||||
'elem-ring-0': 'openSUSE:Factory:Rings:0-Bootstrap',
|
'curl-mini': 'openSUSE:Factory:Rings:0-Bootstrap',
|
||||||
'elem-ring-1': 'openSUSE:Factory:Rings:1-MinimalX',
|
|
||||||
'elem-ring-mini': 'openSUSE:Factory:Rings:0-Bootstrap',
|
|
||||||
'wine': 'openSUSE:Factory:Rings:1-MinimalX',
|
'wine': 'openSUSE:Factory:Rings:1-MinimalX',
|
||||||
}
|
}
|
||||||
self.assertEqual(ring_packages, self.api.ring_packages)
|
self.assertEqual(ring_packages, wf.api.ring_packages)
|
||||||
|
|
||||||
@unittest.skip("no longer approving non-ring packages")
|
|
||||||
def test_dispatch_open_requests(self):
|
|
||||||
"""
|
|
||||||
Test dispatching and closure of non-ring packages
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Get rid of open requests
|
|
||||||
self.api.dispatch_open_requests()
|
|
||||||
# Check that we tried to close it
|
|
||||||
self.assertEqual(httpretty.last_request().method, 'POST')
|
|
||||||
self.assertEqual(httpretty.last_request().querystring[u'cmd'], [u'changereviewstate'])
|
|
||||||
# Try it again
|
|
||||||
self.api.dispatch_open_requests()
|
|
||||||
# This time there should be nothing to close
|
|
||||||
self.assertEqual(httpretty.last_request().method, 'GET')
|
|
||||||
|
|
||||||
def test_pseudometa_get_prj(self):
|
def test_pseudometa_get_prj(self):
|
||||||
"""
|
"""
|
||||||
Test getting project metadata from YAML in project description
|
Test getting project metadata from YAML in project description
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
wf = self.setup_vcr()
|
||||||
|
wf.create_staging('A')
|
||||||
|
|
||||||
# Try to get data from project that has no metadata
|
# Try to get data from project that has no metadata
|
||||||
data = self.api.get_prj_pseudometa('openSUSE:Factory:Staging:A')
|
data = wf.api.get_prj_pseudometa('openSUSE:Factory:Staging:A')
|
||||||
# Should be empty, but contain structure to work with
|
# Should be empty, but contain structure to work with
|
||||||
self.assertEqual(data, {'requests': []})
|
self.assertEqual(data, {'requests': []})
|
||||||
# Add some sample data
|
# Add some sample data
|
||||||
rq = {'id': '123', 'package': 'test-package'}
|
rq = {'id': '123', 'package': 'test-package'}
|
||||||
data['requests'].append(rq)
|
data['requests'].append(rq)
|
||||||
# Save them and read them back
|
# Save them and read them back
|
||||||
self.api.set_prj_pseudometa('openSUSE:Factory:Staging:A', data)
|
wf.api.set_prj_pseudometa('openSUSE:Factory:Staging:A', data)
|
||||||
test_data = self.api.get_prj_pseudometa('openSUSE:Factory:Staging:A')
|
test_data = wf.api.get_prj_pseudometa('openSUSE:Factory:Staging:A')
|
||||||
# Verify that we got back the same data
|
# Verify that we got back the same data
|
||||||
self.assertEqual(data, test_data)
|
self.assertEqual(data, test_data)
|
||||||
|
|
||||||
@ -89,186 +75,207 @@ class TestApiCalls(unittest.TestCase):
|
|||||||
List projects and their content
|
List projects and their content
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
wf = self.setup_vcr()
|
||||||
|
staging_a = wf.create_staging('A')
|
||||||
|
|
||||||
# Prepare expected results
|
# Prepare expected results
|
||||||
data = []
|
data = [staging_a.name, self.staging_b.name]
|
||||||
for prj in self.obs.staging_project:
|
|
||||||
data.append('openSUSE:Factory:Staging:' + prj)
|
|
||||||
|
|
||||||
# Compare the results
|
# Compare the results
|
||||||
self.assertEqual(data, self.api.get_staging_projects())
|
self.assertEqual(data, wf.api.get_staging_projects())
|
||||||
|
|
||||||
def test_open_requests(self):
|
def test_open_requests(self):
|
||||||
"""
|
"""
|
||||||
Test searching for open requests
|
Test searching for open requests
|
||||||
"""
|
"""
|
||||||
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
requests = []
|
wf.create_submit_request('devel:wine', 'wine')
|
||||||
|
|
||||||
# get the open requests
|
# get the open requests
|
||||||
requests = self.api.get_open_requests()
|
requests = wf.api.get_open_requests()
|
||||||
|
|
||||||
# Compare the results, we only care now that we got 1 of them not the content
|
# Compare the results, we only care now that we got 1 of them not the content
|
||||||
self.assertEqual(1, len(requests))
|
self.assertEqual(1, len(requests))
|
||||||
|
|
||||||
def test_get_package_information(self):
|
|
||||||
"""
|
|
||||||
Test if we get proper project, name and revision from the staging informations
|
|
||||||
"""
|
|
||||||
|
|
||||||
package_info = {
|
|
||||||
'dir_srcmd5': '751efeae52d6c99de48164088a33d855',
|
|
||||||
'project': 'home:Admin',
|
|
||||||
'rev': '7b98ac01b8071d63a402fa99dc79331c',
|
|
||||||
'srcmd5': '7b98ac01b8071d63a402fa99dc79331c',
|
|
||||||
'package': 'wine'
|
|
||||||
}
|
|
||||||
|
|
||||||
# Compare the results, we only care now that we got 2 of them not the content
|
|
||||||
self.assertEqual(
|
|
||||||
package_info,
|
|
||||||
self.api.get_package_information('openSUSE:Factory:Staging:B', 'wine'))
|
|
||||||
|
|
||||||
def test_request_id_package_mapping(self):
|
def test_request_id_package_mapping(self):
|
||||||
"""
|
"""
|
||||||
Test whether we can get correct id for sr in staging project
|
Test whether we can get correct id for sr in staging project
|
||||||
"""
|
"""
|
||||||
|
|
||||||
prj = 'openSUSE:Factory:Staging:B'
|
wf = self.setup_vcr()
|
||||||
|
prj = self.staging_b.name
|
||||||
|
|
||||||
# Get rq
|
# Get rq
|
||||||
num = self.api.get_request_id_for_package(prj, 'wine')
|
num = wf.api.get_request_id_for_package(prj, 'wine')
|
||||||
self.assertEqual(333, num)
|
self.assertEqual(str(num), self.winerq.reqid)
|
||||||
# Get package name
|
# Get package name
|
||||||
self.assertEqual('wine', self.api.get_package_for_request_id(prj, num))
|
self.assertEqual('wine', wf.api.get_package_for_request_id(prj, num))
|
||||||
|
|
||||||
|
def setup_vcr(self):
|
||||||
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
wf.setup_rings()
|
||||||
|
self.staging_b = wf.create_staging('B')
|
||||||
|
prj = self.staging_b.name
|
||||||
|
|
||||||
|
self.winerq = wf.create_submit_request('devel:wine', 'wine', text='Hallo World')
|
||||||
|
wf.api.rq_to_prj(self.winerq.reqid, prj)
|
||||||
|
|
||||||
|
# Get rq number
|
||||||
|
num = wf.api.get_request_id_for_package(prj, 'wine')
|
||||||
|
self.assertEqual(str(num), self.winerq.reqid)
|
||||||
|
self.assertIsNotNone(num)
|
||||||
|
self.assertTrue(wf.api.item_exists(prj, 'wine'))
|
||||||
|
|
||||||
|
return wf
|
||||||
|
|
||||||
def test_rm_from_prj(self):
|
def test_rm_from_prj(self):
|
||||||
prj = 'openSUSE:Factory:Staging:B'
|
wf = self.setup_vcr()
|
||||||
pkg = 'wine'
|
|
||||||
|
|
||||||
full_name = prj + '/' + pkg
|
|
||||||
|
|
||||||
# Verify package is there
|
|
||||||
self.assertTrue(full_name in self.obs.links)
|
|
||||||
|
|
||||||
# Get rq number
|
|
||||||
num = self.api.get_request_id_for_package(prj, pkg)
|
|
||||||
|
|
||||||
# Delete the package
|
# Delete the package
|
||||||
self.api.rm_from_prj(prj, package='wine')
|
wf.api.rm_from_prj(self.staging_b.name, package='wine')
|
||||||
|
self.verify_wine_is_gone(wf)
|
||||||
# Verify package is not there
|
|
||||||
self.assertTrue(full_name not in self.obs.links)
|
|
||||||
|
|
||||||
# RQ is gone
|
|
||||||
self.assertEqual(None, self.api.get_request_id_for_package(prj, pkg))
|
|
||||||
self.assertEqual(None, self.api.get_package_for_request_id(prj, num))
|
|
||||||
|
|
||||||
# Verify that review is closed
|
|
||||||
self.assertEqual('accepted', self.obs.requests[str(num)]['review'])
|
|
||||||
self.assertEqual('new', self.obs.requests[str(num)]['request'])
|
|
||||||
|
|
||||||
def test_rm_from_prj_2(self):
|
def test_rm_from_prj_2(self):
|
||||||
|
wf = self.setup_vcr()
|
||||||
|
|
||||||
# Try the same with request number
|
# Try the same with request number
|
||||||
prj = 'openSUSE:Factory:Staging:B'
|
wf.api.rm_from_prj(self.staging_b.name, request_id=self.winerq.reqid)
|
||||||
|
self.verify_wine_is_gone(wf)
|
||||||
|
|
||||||
|
def verify_wine_is_gone(self, wf):
|
||||||
|
prj = self.staging_b.name
|
||||||
pkg = 'wine'
|
pkg = 'wine'
|
||||||
|
num = self.winerq.reqid
|
||||||
full_name = prj + '/' + pkg
|
|
||||||
|
|
||||||
# Get rq number
|
|
||||||
num = self.api.get_request_id_for_package(prj, pkg)
|
|
||||||
|
|
||||||
# Delete the package
|
|
||||||
self.api.rm_from_prj(prj, request_id=num)
|
|
||||||
|
|
||||||
# Verify package is not there
|
# Verify package is not there
|
||||||
self.assertTrue(full_name not in self.obs.links)
|
self.assertFalse(wf.api.item_exists(prj, pkg))
|
||||||
|
|
||||||
# RQ is gone
|
# RQ is gone
|
||||||
self.assertEqual(None, self.api.get_request_id_for_package(prj, pkg))
|
self.assertIsNone(wf.api.get_request_id_for_package(prj, pkg))
|
||||||
self.assertEqual(None, self.api.get_package_for_request_id(prj, num))
|
self.assertIsNone(wf.api.get_package_for_request_id(prj, num))
|
||||||
|
|
||||||
# Verify that review is closed
|
# Verify that review is closed
|
||||||
self.assertEqual('accepted', self.obs.requests[str(num)]['review'])
|
rq = self.winerq.xml()
|
||||||
self.assertEqual('new', self.obs.requests[str(num)]['request'])
|
self.assertIsNotNone(rq.find('.//state[@name="new"]'))
|
||||||
|
|
||||||
def test_add_sr(self):
|
def test_add_sr(self):
|
||||||
prj = 'openSUSE:Factory:Staging:A'
|
wf = self.setup_vcr()
|
||||||
rq = '123'
|
|
||||||
|
prj = self.staging_b.name
|
||||||
|
pkg = 'wine'
|
||||||
|
num = self.winerq.reqid
|
||||||
|
|
||||||
# Running it twice shouldn't change anything
|
# Running it twice shouldn't change anything
|
||||||
for i in range(2):
|
for i in range(2):
|
||||||
# Add rq to the project
|
# Add rq to the project
|
||||||
self.api.rq_to_prj(rq, prj)
|
wf.api.rq_to_prj(num, prj)
|
||||||
# Verify that review is there
|
# Verify that review is there
|
||||||
self.assertEqual('new', self.obs.requests[rq]['review'])
|
reviews = [{'by_group': 'factory-staging', 'state': 'accepted'},
|
||||||
self.assertEqual('review', self.obs.requests[rq]['request'])
|
{'by_project': 'openSUSE:Factory:Staging:B', 'state': 'new'}]
|
||||||
self.assertEqual(self.api.get_prj_pseudometa('openSUSE:Factory:Staging:A'),
|
self.assertEqual(self.winerq.reviews(), reviews)
|
||||||
{'requests': [{'id': 123, 'package': 'gcc', 'author': 'Admin', 'type': 'submit'}]})
|
self.assertEqual(wf.api.get_prj_pseudometa(prj),
|
||||||
|
{'requests': [{'id': int(num), 'package': 'wine', 'author': 'Admin', 'type': 'submit'}]})
|
||||||
|
|
||||||
def test_create_package_container(self):
|
def test_create_package_container(self):
|
||||||
"""Test if the uploaded _meta is correct."""
|
"""Test if the uploaded _meta is correct."""
|
||||||
|
|
||||||
self.api.create_package_container('openSUSE:Factory:Staging:B', 'wine')
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
self.assertEqual(httpretty.last_request().method, 'PUT')
|
wf.create_staging('B')
|
||||||
self.assertEqual(httpretty.last_request().body, '<package name="wine"><title/><description/></package>')
|
wf.api.create_package_container('openSUSE:Factory:Staging:B', 'wine')
|
||||||
self.assertEqual(httpretty.last_request().path, '/source/openSUSE:Factory:Staging:B/wine/_meta')
|
|
||||||
|
|
||||||
self.api.create_package_container('openSUSE:Factory:Staging:B', 'wine', disable_build=True)
|
url = wf.api.makeurl(['source', 'openSUSE:Factory:Staging:B', 'wine', '_meta'])
|
||||||
self.assertEqual(httpretty.last_request().method, 'PUT')
|
self.assertEqual(osc.core.http_GET(url).read(), '<package name="wine" project="openSUSE:Factory:Staging:B">\n <title/>\n <description/>\n</package>\n')
|
||||||
self.assertEqual(httpretty.last_request().body, '<package name="wine"><title/><description/><build><disable/></build></package>')
|
|
||||||
self.assertEqual(httpretty.last_request().path, '/source/openSUSE:Factory:Staging:B/wine/_meta')
|
wf.api.create_package_container('openSUSE:Factory:Staging:B', 'wine', disable_build=True)
|
||||||
|
m = '<package name="wine" project="openSUSE:Factory:Staging:B">\n <title/>\n <description/>\n <build>\n <disable/>\n </build>\n</package>\n'
|
||||||
|
self.assertEqual(osc.core.http_GET(url).read(), m)
|
||||||
|
|
||||||
def test_review_handling(self):
|
def test_review_handling(self):
|
||||||
"""Test whether accepting/creating reviews behaves correctly."""
|
"""Test whether accepting/creating reviews behaves correctly."""
|
||||||
|
|
||||||
|
wf = self.setup_vcr()
|
||||||
|
|
||||||
|
request = wf.create_submit_request('devel:wine', 'winetools')
|
||||||
|
reviews = [{'state': 'new', 'by_group': 'factory-staging'}]
|
||||||
|
self.assertEqual(request.reviews(), reviews)
|
||||||
|
num = request.reqid
|
||||||
|
|
||||||
# Add review
|
# Add review
|
||||||
self.api.add_review('123', by_project='openSUSE:Factory:Staging:A')
|
wf.api.add_review(num, by_project='openSUSE:Factory:Staging:B')
|
||||||
self.assertEqual(httpretty.last_request().method, 'POST')
|
reviews.append({'by_project': 'openSUSE:Factory:Staging:B', 'state': 'new'})
|
||||||
self.assertEqual(httpretty.last_request().querystring[u'cmd'], [u'addreview'])
|
self.assertEqual(request.reviews(), reviews)
|
||||||
# Try to readd, should do anything
|
|
||||||
self.api.add_review('123', by_project='openSUSE:Factory:Staging:A')
|
# Try to readd, should not do anything
|
||||||
self.assertEqual(httpretty.last_request().method, 'GET')
|
wf.api.add_review(num, by_project='openSUSE:Factory:Staging:B')
|
||||||
|
self.assertEqual(request.reviews(), reviews)
|
||||||
|
|
||||||
# Accept review
|
# Accept review
|
||||||
self.api.set_review('123', 'openSUSE:Factory:Staging:A')
|
wf.api.set_review(num, 'openSUSE:Factory:Staging:B')
|
||||||
self.assertEqual(httpretty.last_request().method, 'POST')
|
reviews[1]['state'] = 'accepted'
|
||||||
self.assertEqual(httpretty.last_request().querystring[u'cmd'], [u'changereviewstate'])
|
self.assertEqual(request.reviews(), reviews)
|
||||||
|
|
||||||
# Try to accept it again should do anything
|
# Try to accept it again should do anything
|
||||||
self.api.set_review('123', 'openSUSE:Factory:Staging:A')
|
wf.api.set_review(num, 'openSUSE:Factory:Staging:B')
|
||||||
self.assertEqual(httpretty.last_request().method, 'GET')
|
self.assertEqual(request.reviews(), reviews)
|
||||||
|
|
||||||
# But we should be able to reopen it
|
# But we should be able to reopen it
|
||||||
self.api.add_review('123', by_project='openSUSE:Factory:Staging:A')
|
wf.api.add_review(num, by_project='openSUSE:Factory:Staging:B')
|
||||||
self.assertEqual(httpretty.last_request().method, 'POST')
|
reviews.append({'by_project': 'openSUSE:Factory:Staging:B', 'state': 'new'})
|
||||||
self.assertEqual(httpretty.last_request().querystring[u'cmd'], [u'addreview'])
|
self.assertEqual(request.reviews(), reviews)
|
||||||
|
|
||||||
def test_prj_from_letter(self):
|
def test_prj_from_letter(self):
|
||||||
|
|
||||||
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
# Verify it works
|
# Verify it works
|
||||||
self.assertEqual(self.api.prj_from_letter('openSUSE:Factory'), 'openSUSE:Factory')
|
self.assertEqual(wf.api.prj_from_letter('openSUSE:Factory'), 'openSUSE:Factory')
|
||||||
self.assertEqual(self.api.prj_from_letter('A'), 'openSUSE:Factory:Staging:A')
|
self.assertEqual(wf.api.prj_from_letter('A'), 'openSUSE:Factory:Staging:A')
|
||||||
|
|
||||||
def test_frozen_mtime(self):
|
def test_frozen_mtime(self):
|
||||||
"""Test frozen mtime."""
|
"""Test frozen mtime."""
|
||||||
|
|
||||||
# Testing frozen mtime
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
self.assertTrue(self.api.days_since_last_freeze('openSUSE:Factory:Staging:A') > 8)
|
wf.create_staging('A')
|
||||||
self.assertTrue(self.api.days_since_last_freeze('openSUSE:Factory:Staging:B') < 1)
|
|
||||||
|
|
||||||
# U == unfrozen
|
# unfrozen
|
||||||
self.assertTrue(self.api.days_since_last_freeze('openSUSE:Factory:Staging:U') > 1000)
|
self.assertTrue(wf.api.days_since_last_freeze('openSUSE:Factory:Staging:A') > 1000)
|
||||||
|
|
||||||
|
# Testing frozen mtime
|
||||||
|
wf.create_staging('B', freeze=True)
|
||||||
|
self.assertLess(wf.api.days_since_last_freeze('openSUSE:Factory:Staging:B'), 1)
|
||||||
|
self.mock_project_meta(wf)
|
||||||
|
self.assertGreater(wf.api.days_since_last_freeze('openSUSE:Factory:Staging:B'), 8)
|
||||||
|
|
||||||
def test_frozen_enough(self):
|
def test_frozen_enough(self):
|
||||||
"""Test frozen enough."""
|
"""Test frozen enough."""
|
||||||
|
|
||||||
# Testing frozen mtime
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
self.assertEqual(self.api.prj_frozen_enough('openSUSE:Factory:Staging:B'), True)
|
wf.create_staging('A')
|
||||||
self.assertEqual(self.api.prj_frozen_enough('openSUSE:Factory:Staging:A'), False)
|
|
||||||
|
|
||||||
# U == unfrozen
|
# Unfrozen
|
||||||
self.assertEqual(self.api.prj_frozen_enough('openSUSE:Factory:Staging:U'), False)
|
self.assertEqual(wf.api.prj_frozen_enough('openSUSE:Factory:Staging:A'), False)
|
||||||
|
|
||||||
|
wf.create_staging('B', freeze=True)
|
||||||
|
self.assertEqual(wf.api.prj_frozen_enough('openSUSE:Factory:Staging:B'), True)
|
||||||
|
|
||||||
|
self.mock_project_meta(wf)
|
||||||
|
self.assertEqual(wf.api.prj_frozen_enough('openSUSE:Factory:Staging:B'), False)
|
||||||
|
|
||||||
|
def mock_project_meta(self, wf):
|
||||||
|
body = """<directory name="_project" rev="3" vrev="" srcmd5="9dd1ec5b77a9e953662eb32955e9066a">
|
||||||
|
<entry name="_frozenlinks" md5="64127b7a5dabbca0ec2bf04cd04c9195" size="16" mtime="1555000000"/>
|
||||||
|
<entry name="_meta" md5="cf6fb1eac1a676d6c3707303ae2571ad" size="162" mtime="1555945413"/>
|
||||||
|
</directory>"""
|
||||||
|
|
||||||
|
wf.api._fetch_project_meta = MagicMock(return_value=body)
|
||||||
|
|
||||||
def test_move(self):
|
def test_move(self):
|
||||||
"""Test package movement."""
|
"""Test package movement."""
|
||||||
|
|
||||||
init_data = self.api.get_package_information('openSUSE:Factory:Staging:B', 'wine')
|
wf = self.setup_vcr()
|
||||||
self.api.move_between_project('openSUSE:Factory:Staging:B', 333, 'openSUSE:Factory:Staging:A')
|
staging_a = wf.create_staging('A')
|
||||||
test_data = self.api.get_package_information('openSUSE:Factory:Staging:A', 'wine')
|
|
||||||
self.assertEqual(init_data, test_data)
|
self.assertTrue(wf.api.item_exists('openSUSE:Factory:Staging:B', 'wine'))
|
||||||
|
self.assertFalse(wf.api.item_exists('openSUSE:Factory:Staging:A', 'wine'))
|
||||||
|
wf.api.move_between_project('openSUSE:Factory:Staging:B', self.winerq.reqid, 'openSUSE:Factory:Staging:A')
|
||||||
|
self.assertTrue(wf.api.item_exists('openSUSE:Factory:Staging:A', 'wine'))
|
||||||
|
self.assertFalse(wf.api.item_exists('openSUSE:Factory:Staging:B', 'wine'))
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
import json
|
||||||
|
|
||||||
from . import obs
|
|
||||||
from osclib.conf import Config
|
from osclib.conf import Config
|
||||||
from osclib.check_command import CheckCommand
|
from osclib.check_command import CheckCommand
|
||||||
from osclib.stagingapi import StagingAPI
|
from osclib.stagingapi import StagingAPI
|
||||||
|
|
||||||
|
from mock import MagicMock
|
||||||
|
from . import vcrhelpers
|
||||||
|
|
||||||
FULL_REPORT = """
|
FULL_REPORT = """
|
||||||
-- BUILDING Project openSUSE:Factory:Staging:A still needs attention
|
-- BUILDING Project openSUSE:Factory:Staging:A still needs attention
|
||||||
- pcre: Missing reviews: repo-checker
|
- pcre: Missing reviews: repo-checker
|
||||||
@ -109,21 +112,25 @@ H_REPORT = """
|
|||||||
class TestCheckCommand(unittest.TestCase):
|
class TestCheckCommand(unittest.TestCase):
|
||||||
"""Tests CheckCommand."""
|
"""Tests CheckCommand."""
|
||||||
|
|
||||||
def setUp(self):
|
def setup_vcr(self):
|
||||||
"""Initialize the configuration."""
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
wf.create_staging('H')
|
||||||
self.obs = obs.OBS()
|
self.checkcommand = CheckCommand(wf.api)
|
||||||
Config(obs.APIURL, 'openSUSE:Factory')
|
return wf
|
||||||
self.stagingapi = StagingAPI(obs.APIURL, 'openSUSE:Factory')
|
|
||||||
self.checkcommand = CheckCommand(self.stagingapi)
|
|
||||||
|
|
||||||
def test_check_command_all(self):
|
def test_check_command_all(self):
|
||||||
"""Validate json conversion for all projects."""
|
"""Validate json conversion for all projects."""
|
||||||
|
wf = self.setup_vcr()
|
||||||
|
with open('tests/fixtures/project/staging_projects/openSUSE:Factory.json') as f:
|
||||||
|
wf.api.project_status = MagicMock(return_value=json.load(f))
|
||||||
report = self.checkcommand._check_project()
|
report = self.checkcommand._check_project()
|
||||||
self.maxDiff = 20000
|
self.maxDiff = 20000
|
||||||
self.assertMultiLineEqual('\n'.join(report).strip(), FULL_REPORT.strip())
|
self.assertMultiLineEqual('\n'.join(report).strip(), FULL_REPORT.strip())
|
||||||
|
|
||||||
def test_check_command_single(self):
|
def test_check_command_single(self):
|
||||||
"""Validate json conversion for a single project."""
|
"""Validate json conversion for a single project."""
|
||||||
|
wf = self.setup_vcr()
|
||||||
|
with open('tests/fixtures/project/staging_projects/openSUSE:Factory/H.json') as f:
|
||||||
|
wf.api.project_status = MagicMock(return_value=json.load(f))
|
||||||
report = self.checkcommand._check_project('H')
|
report = self.checkcommand._check_project('H')
|
||||||
self.assertMultiLineEqual('\n'.join(report).strip(), H_REPORT.strip())
|
self.assertMultiLineEqual('\n'.join(report).strip(), H_REPORT.strip())
|
||||||
|
@ -5,41 +5,38 @@ from osclib.conf import Config
|
|||||||
from osclib.core import attribute_value_save
|
from osclib.core import attribute_value_save
|
||||||
from osclib.memoize import memoize_session_reset
|
from osclib.memoize import memoize_session_reset
|
||||||
from osclib.stagingapi import StagingAPI
|
from osclib.stagingapi import StagingAPI
|
||||||
|
from . import vcrhelpers
|
||||||
from . import obs
|
|
||||||
|
|
||||||
|
|
||||||
class TestConfig(unittest.TestCase):
|
class TestConfig(unittest.TestCase):
|
||||||
def setUp(self):
|
def setup_vcr(self):
|
||||||
self.obs = obs.OBS()
|
return vcrhelpers.StagingWorkflow()
|
||||||
self.load_config()
|
|
||||||
self.api = StagingAPI(obs.APIURL, obs.PROJECT)
|
|
||||||
|
|
||||||
def load_config(self, project=obs.PROJECT):
|
|
||||||
self.config = Config(obs.APIURL, project)
|
|
||||||
|
|
||||||
def test_basic(self):
|
def test_basic(self):
|
||||||
self.assertEqual('openSUSE', conf.config[obs.PROJECT]['lock-ns'])
|
wf = self.setup_vcr()
|
||||||
|
self.assertEqual('openSUSE', conf.config[wf.project]['lock-ns'])
|
||||||
|
|
||||||
def test_remote(self):
|
def test_remote(self):
|
||||||
|
wf = self.setup_vcr()
|
||||||
# Initial config present in fixtures/oscrc and obs.py attribute default.
|
# Initial config present in fixtures/oscrc and obs.py attribute default.
|
||||||
# Local config fixture contains overridden-by-local and should win over
|
# Local config fixture contains overridden-by-local and should win over
|
||||||
# the remote config value.
|
# the remote config value.
|
||||||
self.assertEqual('local', conf.config[obs.PROJECT]['overridden-by-local'])
|
self.assertEqual('local', conf.config[wf.project]['overridden-by-local'])
|
||||||
self.assertEqual('remote-indeed', conf.config[obs.PROJECT]['remote-only'])
|
self.assertEqual('remote-indeed', conf.config[wf.project]['remote-only'])
|
||||||
|
|
||||||
# Change remote value.
|
# Change remote value.
|
||||||
attribute_value_save(obs.APIURL, obs.PROJECT, 'Config', 'remote-only = new value\n')
|
attribute_value_save(wf.apiurl, wf.project, 'Config', 'remote-only = new value\n')
|
||||||
self.load_config()
|
wf.load_config()
|
||||||
|
|
||||||
self.assertEqual('local', conf.config[obs.PROJECT]['overridden-by-local'])
|
self.assertEqual('local', conf.config[wf.project]['overridden-by-local'])
|
||||||
self.assertEqual('new value', conf.config[obs.PROJECT]['remote-only'])
|
self.assertEqual('new value', conf.config[wf.project]['remote-only'])
|
||||||
|
|
||||||
def test_remote_none(self):
|
def test_remote_none(self):
|
||||||
self.load_config('not_real_project')
|
wf = self.setup_vcr()
|
||||||
|
wf.load_config('not_real_project')
|
||||||
self.assertTrue(True) # Did not crash!
|
self.assertTrue(True) # Did not crash!
|
||||||
|
|
||||||
def test_pattern_order(self):
|
def test_pattern_order(self):
|
||||||
|
wf = self.setup_vcr()
|
||||||
# Add pattern to defaults in order to identify which was matched.
|
# Add pattern to defaults in order to identify which was matched.
|
||||||
for pattern in DEFAULT:
|
for pattern in DEFAULT:
|
||||||
DEFAULT[pattern]['pattern'] = pattern
|
DEFAULT[pattern]['pattern'] = pattern
|
||||||
@ -61,16 +58,17 @@ class TestConfig(unittest.TestCase):
|
|||||||
# Ensure each pattern is match instead of catch-all pattern.
|
# Ensure each pattern is match instead of catch-all pattern.
|
||||||
patterns = set()
|
patterns = set()
|
||||||
for project in projects:
|
for project in projects:
|
||||||
config = Config(obs.APIURL, project)
|
config = Config(wf.apiurl, project)
|
||||||
patterns.add(conf.config[project]['pattern'])
|
patterns.add(conf.config[project]['pattern'])
|
||||||
|
|
||||||
self.assertEqual(len(patterns), len(DEFAULT))
|
self.assertEqual(len(patterns), len(DEFAULT))
|
||||||
|
|
||||||
def test_get_memoize_reset(self):
|
def test_get_memoize_reset(self):
|
||||||
"""Ensure memoize_session_reset() properly forces re-fetch of config."""
|
"""Ensure memoize_session_reset() properly forces re-fetch of config."""
|
||||||
self.assertEqual('remote-indeed', Config.get(obs.APIURL, obs.PROJECT)['remote-only'])
|
wf = self.setup_vcr()
|
||||||
|
self.assertEqual('remote-indeed', Config.get(wf.apiurl, wf.project)['remote-only'])
|
||||||
|
|
||||||
attribute_value_save(obs.APIURL, obs.PROJECT, 'Config', 'remote-only = new value\n')
|
attribute_value_save(wf.apiurl, wf.project, 'Config', 'remote-only = new value\n')
|
||||||
memoize_session_reset()
|
memoize_session_reset()
|
||||||
|
|
||||||
self.assertEqual('new value', Config.get(obs.APIURL, obs.PROJECT)['remote-only'])
|
self.assertEqual('new value', Config.get(wf.apiurl, wf.project)['remote-only'])
|
||||||
|
@ -4,21 +4,12 @@ import difflib
|
|||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
from . import obs
|
|
||||||
|
|
||||||
from osclib.conf import Config
|
from osclib.conf import Config
|
||||||
from osclib.freeze_command import FreezeCommand
|
from osclib.freeze_command import FreezeCommand
|
||||||
from osclib.stagingapi import StagingAPI
|
from osclib.stagingapi import StagingAPI
|
||||||
|
from . import vcrhelpers
|
||||||
|
|
||||||
class TestFreeze(unittest.TestCase):
|
class TestFreeze(unittest.TestCase):
|
||||||
def setUp(self):
|
|
||||||
"""
|
|
||||||
Initialize the configuration
|
|
||||||
"""
|
|
||||||
self.obs = obs.OBS()
|
|
||||||
Config(obs.APIURL, 'openSUSE:Factory')
|
|
||||||
self.api = StagingAPI(obs.APIURL, 'openSUSE:Factory')
|
|
||||||
|
|
||||||
def _get_fixture_path(self, filename):
|
def _get_fixture_path(self, filename):
|
||||||
"""
|
"""
|
||||||
@ -33,8 +24,9 @@ class TestFreeze(unittest.TestCase):
|
|||||||
return os.path.join(os.getcwd(), 'tests/fixtures')
|
return os.path.join(os.getcwd(), 'tests/fixtures')
|
||||||
|
|
||||||
def test_bootstrap_copy(self):
|
def test_bootstrap_copy(self):
|
||||||
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
|
||||||
fc = FreezeCommand(self.api)
|
fc = FreezeCommand(wf.api)
|
||||||
|
|
||||||
fp = self._get_fixture_path('staging-meta-for-bootstrap-copy.xml')
|
fp = self._get_fixture_path('staging-meta-for-bootstrap-copy.xml')
|
||||||
fixture = subprocess.check_output('/usr/bin/xmllint --format %s' % fp, shell=True)
|
fixture = subprocess.check_output('/usr/bin/xmllint --format %s' % fp, shell=True)
|
||||||
|
1032
tests/obs.py
1032
tests/obs.py
File diff suppressed because it is too large
Load Diff
@ -2,16 +2,12 @@ from datetime import datetime
|
|||||||
import unittest
|
import unittest
|
||||||
from osclib.conf import Config
|
from osclib.conf import Config
|
||||||
from osclib.obslock import OBSLock
|
from osclib.obslock import OBSLock
|
||||||
|
from . import vcrhelpers
|
||||||
from . import obs
|
|
||||||
|
|
||||||
class TestOBSLock(unittest.TestCase):
|
class TestOBSLock(unittest.TestCase):
|
||||||
def setUp(self):
|
|
||||||
self.obs = obs.OBS()
|
|
||||||
Config(obs.APIURL, obs.PROJECT)
|
|
||||||
|
|
||||||
def obs_lock(self, reason='list'):
|
def obs_lock(self, wf, reason='list'):
|
||||||
return OBSLock(obs.APIURL, obs.PROJECT, reason=reason)
|
return OBSLock(wf.apiurl, wf.project, reason=reason)
|
||||||
|
|
||||||
def assertLockFail(self, lock):
|
def assertLockFail(self, lock):
|
||||||
with self.assertRaises(SystemExit):
|
with self.assertRaises(SystemExit):
|
||||||
@ -19,7 +15,8 @@ class TestOBSLock(unittest.TestCase):
|
|||||||
self.assertFalse(lock.locked)
|
self.assertFalse(lock.locked)
|
||||||
|
|
||||||
def test_lock(self):
|
def test_lock(self):
|
||||||
lock = self.obs_lock()
|
wf = self.setup_vcr()
|
||||||
|
lock = self.obs_lock(wf)
|
||||||
self.assertFalse(lock.locked)
|
self.assertFalse(lock.locked)
|
||||||
|
|
||||||
with lock:
|
with lock:
|
||||||
@ -33,9 +30,13 @@ class TestOBSLock(unittest.TestCase):
|
|||||||
|
|
||||||
self.assertFalse(lock.locked)
|
self.assertFalse(lock.locked)
|
||||||
|
|
||||||
def test_locked_self(self, hold=False):
|
def test_locked_self(self):
|
||||||
lock1 = self.obs_lock()
|
wf = self.setup_vcr()
|
||||||
lock2 = self.obs_lock()
|
self.locked_self(wf, hold=False)
|
||||||
|
|
||||||
|
def locked_self(self, wf, hold=False):
|
||||||
|
lock1 = self.obs_lock(wf)
|
||||||
|
lock2 = self.obs_lock(wf)
|
||||||
|
|
||||||
self.assertFalse(lock1.locked)
|
self.assertFalse(lock1.locked)
|
||||||
self.assertFalse(lock2.locked)
|
self.assertFalse(lock2.locked)
|
||||||
@ -47,10 +48,12 @@ class TestOBSLock(unittest.TestCase):
|
|||||||
if not hold:
|
if not hold:
|
||||||
# A hold will remain locked.
|
# A hold will remain locked.
|
||||||
self.assertFalse(lock1.locked)
|
self.assertFalse(lock1.locked)
|
||||||
|
|
||||||
self.assertFalse(lock2.locked)
|
self.assertFalse(lock2.locked)
|
||||||
|
|
||||||
def test_hold(self):
|
def test_hold(self):
|
||||||
lock = self.obs_lock('lock')
|
wf = self.setup_vcr()
|
||||||
|
lock = self.obs_lock(wf, 'lock')
|
||||||
self.assertFalse(lock.locked)
|
self.assertFalse(lock.locked)
|
||||||
|
|
||||||
with lock:
|
with lock:
|
||||||
@ -60,7 +63,7 @@ class TestOBSLock(unittest.TestCase):
|
|||||||
self.assertTrue(lock.locked)
|
self.assertTrue(lock.locked)
|
||||||
|
|
||||||
# Same constraints should apply since same user against hold.
|
# Same constraints should apply since same user against hold.
|
||||||
self.test_locked_self(hold=True)
|
self.locked_self(wf, hold=True)
|
||||||
|
|
||||||
# Hold should remain after subcommands are executed.
|
# Hold should remain after subcommands are executed.
|
||||||
user, reason, reason_sub, ts = lock._parse(lock._read())
|
user, reason, reason_sub, ts = lock._parse(lock._read())
|
||||||
@ -70,7 +73,7 @@ class TestOBSLock(unittest.TestCase):
|
|||||||
self.assertIsInstance(ts, datetime)
|
self.assertIsInstance(ts, datetime)
|
||||||
|
|
||||||
# Other users should not bypass hold.
|
# Other users should not bypass hold.
|
||||||
lock_user2 = self.obs_lock()
|
lock_user2 = self.obs_lock(wf)
|
||||||
lock_user2.user = 'other'
|
lock_user2.user = 'other'
|
||||||
self.assertLockFail(lock_user2)
|
self.assertLockFail(lock_user2)
|
||||||
|
|
||||||
@ -79,8 +82,9 @@ class TestOBSLock(unittest.TestCase):
|
|||||||
self.assertFalse(lock.locked)
|
self.assertFalse(lock.locked)
|
||||||
|
|
||||||
def test_expire(self):
|
def test_expire(self):
|
||||||
lock1 = self.obs_lock()
|
wf = self.setup_vcr()
|
||||||
lock2 = self.obs_lock()
|
lock1 = self.obs_lock(wf)
|
||||||
|
lock2 = self.obs_lock(wf)
|
||||||
lock2.ttl = 0
|
lock2.ttl = 0
|
||||||
lock2.user = 'user2'
|
lock2.user = 'user2'
|
||||||
|
|
||||||
@ -95,8 +99,9 @@ class TestOBSLock(unittest.TestCase):
|
|||||||
self.assertEqual(user, lock2.user)
|
self.assertEqual(user, lock2.user)
|
||||||
|
|
||||||
def test_expire_hold(self):
|
def test_expire_hold(self):
|
||||||
lock1 = self.obs_lock('lock')
|
wf = self.setup_vcr()
|
||||||
lock2 = self.obs_lock('override')
|
lock1 = self.obs_lock(wf, 'lock')
|
||||||
|
lock2 = self.obs_lock(wf, 'override')
|
||||||
lock2.ttl = 0
|
lock2.ttl = 0
|
||||||
lock2.user = 'user2'
|
lock2.user = 'user2'
|
||||||
|
|
||||||
@ -113,16 +118,27 @@ class TestOBSLock(unittest.TestCase):
|
|||||||
self.assertEqual(reason, 'override')
|
self.assertEqual(reason, 'override')
|
||||||
self.assertEqual(reason_sub, None, 'does not inherit hold')
|
self.assertEqual(reason_sub, None, 'does not inherit hold')
|
||||||
|
|
||||||
|
def setup_vcr(self):
|
||||||
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
wf.create_target()
|
||||||
|
# we should most likely create this as part of create_target, but
|
||||||
|
# it just slows down all other tests
|
||||||
|
wf.create_attribute_type('openSUSE', 'LockedBy')
|
||||||
|
wf.create_project(wf.project + ':Staging')
|
||||||
|
return wf
|
||||||
|
|
||||||
def test_reserved_characters(self):
|
def test_reserved_characters(self):
|
||||||
lock = self.obs_lock('some reason @ #night')
|
wf = self.setup_vcr()
|
||||||
|
lock = self.obs_lock(wf, 'some reason @ #night')
|
||||||
|
|
||||||
with lock:
|
with lock:
|
||||||
_, reason, _, _ = lock._parse(lock._read())
|
_, reason, _, _ = lock._parse(lock._read())
|
||||||
self.assertEqual(reason, 'some reason at hashnight')
|
self.assertEqual(reason, 'some reason at hashnight')
|
||||||
|
|
||||||
def test_needed(self):
|
def test_needed(self):
|
||||||
lock1 = self.obs_lock()
|
wf = self.setup_vcr()
|
||||||
lock2 = self.obs_lock('unlock')
|
lock1 = self.obs_lock(wf)
|
||||||
|
lock2 = self.obs_lock(wf, 'unlock')
|
||||||
lock2.user = 'user2'
|
lock2.user = 'user2'
|
||||||
lock2.needed = False
|
lock2.needed = False
|
||||||
|
|
||||||
|
@ -1,63 +1,81 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
import os.path
|
||||||
from . import obs
|
|
||||||
from osc import oscerr
|
from osc import oscerr
|
||||||
|
import osc.conf
|
||||||
|
from osclib.cache import Cache
|
||||||
|
from osclib.cache_manager import CacheManager
|
||||||
from osclib.comments import CommentAPI
|
from osclib.comments import CommentAPI
|
||||||
from osclib.conf import Config
|
from osclib.conf import Config
|
||||||
from osclib.select_command import SelectCommand
|
from osclib.select_command import SelectCommand
|
||||||
from osclib.stagingapi import StagingAPI
|
from osclib.stagingapi import StagingAPI
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from mock import MagicMock
|
||||||
|
from . import vcrhelpers
|
||||||
|
|
||||||
class TestSelect(unittest.TestCase):
|
class TestSelect(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
"""
|
|
||||||
Initialize the configuration
|
|
||||||
"""
|
|
||||||
self.obs = obs.OBS()
|
|
||||||
Config(obs.APIURL, 'openSUSE:Factory')
|
|
||||||
self.api = StagingAPI(obs.APIURL, 'openSUSE:Factory')
|
|
||||||
|
|
||||||
def test_old_frozen(self):
|
def test_old_frozen(self):
|
||||||
self.assertEqual(self.api.prj_frozen_enough('openSUSE:Factory:Staging:A'), False)
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
wf.api.prj_frozen_enough = MagicMock(return_value=False)
|
||||||
|
|
||||||
# check it won't allow selecting
|
# check it won't allow selecting
|
||||||
self.assertEqual(False, SelectCommand(self.api, 'openSUSE:Factory:Staging:A').perform(['gcc']))
|
staging = wf.create_staging('Old')
|
||||||
|
self.assertEqual(False, SelectCommand(wf.api, staging.name).perform(['gcc']))
|
||||||
|
|
||||||
def test_select_comments(self):
|
def test_select_comments(self):
|
||||||
c_api = CommentAPI(self.api.apiurl)
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
staging_b = 'openSUSE:Factory:Staging:B'
|
wf.setup_rings()
|
||||||
comments = c_api.get_comments(project_name=staging_b)
|
|
||||||
|
staging_b = wf.create_staging('B', freeze=True)
|
||||||
|
|
||||||
|
c_api = CommentAPI(wf.api.apiurl)
|
||||||
|
comments = c_api.get_comments(project_name=staging_b.name)
|
||||||
|
|
||||||
|
r1 = wf.create_submit_request('devel:wine', 'wine')
|
||||||
|
r2 = wf.create_submit_request('devel:gcc', 'gcc')
|
||||||
|
|
||||||
# First select
|
# First select
|
||||||
self.assertEqual(True, SelectCommand(self.api, staging_b).perform(['gcc', 'wine']))
|
self.assertEqual(True, SelectCommand(wf.api, staging_b.name).perform(['gcc', 'wine']))
|
||||||
first_select_comments = c_api.get_comments(project_name=staging_b)
|
first_select_comments = c_api.get_comments(project_name=staging_b.name)
|
||||||
last_id = sorted(first_select_comments.keys())[-1]
|
last_id = sorted(first_select_comments.keys())[-1]
|
||||||
first_select_comment = first_select_comments[last_id]
|
first_select_comment = first_select_comments[last_id]
|
||||||
# Only one comment is added
|
# Only one comment is added
|
||||||
self.assertEqual(len(first_select_comments), len(comments) + 1)
|
self.assertEqual(len(first_select_comments), len(comments) + 1)
|
||||||
# With the right content
|
# With the right content
|
||||||
self.assertTrue('request#123 for package gcc submitted by Admin' in first_select_comment['comment'])
|
expected = 'request#{} for package gcc submitted by Admin'.format(r2.reqid)
|
||||||
|
self.assertTrue(expected in first_select_comment['comment'])
|
||||||
|
|
||||||
# Second select
|
# Second select
|
||||||
self.assertEqual(True, SelectCommand(self.api, staging_b).perform(['puppet']))
|
r3 = wf.create_submit_request('devel:gcc', 'gcc8')
|
||||||
second_select_comments = c_api.get_comments(project_name=staging_b)
|
self.assertEqual(True, SelectCommand(wf.api, staging_b.name).perform(['gcc8']))
|
||||||
|
second_select_comments = c_api.get_comments(project_name=staging_b.name)
|
||||||
last_id = sorted(second_select_comments.keys())[-1]
|
last_id = sorted(second_select_comments.keys())[-1]
|
||||||
second_select_comment = second_select_comments[last_id]
|
second_select_comment = second_select_comments[last_id]
|
||||||
# The number of comments increased by one
|
# The number of comments increased by one
|
||||||
self.assertEqual(len(second_select_comments) - 1, len(first_select_comments))
|
self.assertEqual(len(second_select_comments) - 1, len(first_select_comments))
|
||||||
self.assertNotEqual(second_select_comment['comment'], first_select_comment['comment'])
|
self.assertNotEqual(second_select_comment['comment'], first_select_comment['comment'])
|
||||||
# The new comments contains new, but not old
|
# The new comments contains new, but not old
|
||||||
self.assertFalse('request#123 for package gcc submitted by Admin' in second_select_comment['comment'])
|
self.assertFalse('request#{} for package gcz submitted by Admin'.format(r2.reqid) in second_select_comment['comment'])
|
||||||
self.assertTrue('added request#321 for package puppet submitted by Admin' in second_select_comment['comment'])
|
self.assertTrue('added request#{} for package gcc8 submitted by Admin'.format(r3.reqid) in second_select_comment['comment'])
|
||||||
|
|
||||||
def test_no_matches(self):
|
def test_no_matches(self):
|
||||||
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
|
||||||
|
staging = wf.create_staging('N', freeze=True)
|
||||||
|
|
||||||
# search for requests
|
# search for requests
|
||||||
with self.assertRaises(oscerr.WrongArgs) as cm:
|
with self.assertRaises(oscerr.WrongArgs) as cm:
|
||||||
SelectCommand(self.api, 'openSUSE:Factory:Staging:B').perform(['bash'])
|
SelectCommand(wf.api, staging.name).perform(['bash'])
|
||||||
self.assertEqual(str(cm.exception), "No SR# found for: bash")
|
self.assertEqual(str(cm.exception), "No SR# found for: bash")
|
||||||
|
|
||||||
def test_selected(self):
|
def test_selected(self):
|
||||||
# make sure the project is frozen recently for other tests
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
|
||||||
ret = SelectCommand(self.api, 'openSUSE:Factory:Staging:B').perform(['wine'])
|
wf.setup_rings()
|
||||||
|
staging = wf.create_staging('S', freeze=True)
|
||||||
|
|
||||||
|
request = wf.create_submit_request('devel:wine', 'wine')
|
||||||
|
|
||||||
|
ret = SelectCommand(wf.api, staging.name).perform(['wine'])
|
||||||
self.assertEqual(True, ret)
|
self.assertEqual(True, ret)
|
||||||
|
10
tests/test.oscrc
Normal file
10
tests/test.oscrc
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
[general]
|
||||||
|
apiurl = http://localhost:3737
|
||||||
|
|
||||||
|
[http://localhost:3737]
|
||||||
|
user=Admin
|
||||||
|
pass=opensuse
|
||||||
|
|
||||||
|
[openSUSE:Factory]
|
||||||
|
overridden-by-local = local
|
||||||
|
|
@ -1,18 +1,30 @@
|
|||||||
import unittest
|
import unittest
|
||||||
from osclib.conf import Config
|
from osclib.conf import Config
|
||||||
from osclib.stagingapi import StagingAPI
|
from osclib.stagingapi import StagingAPI
|
||||||
|
from osclib.select_command import SelectCommand
|
||||||
from osclib.unselect_command import UnselectCommand
|
from osclib.unselect_command import UnselectCommand
|
||||||
|
from osclib.core import package_list_without_links
|
||||||
from . import obs
|
from . import vcrhelpers
|
||||||
|
|
||||||
class TestUnselect(unittest.TestCase):
|
class TestUnselect(unittest.TestCase):
|
||||||
def setUp(self):
|
|
||||||
self.obs = obs.OBS()
|
|
||||||
Config(obs.APIURL, obs.PROJECT)
|
|
||||||
self.api = StagingAPI(obs.APIURL, obs.PROJECT)
|
|
||||||
|
|
||||||
def test_cleanup_filter(self):
|
def test_cleanup_filter(self):
|
||||||
UnselectCommand.config_init(self.api)
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
UnselectCommand.config_init(wf.api)
|
||||||
UnselectCommand.cleanup_days = 1
|
UnselectCommand.cleanup_days = 1
|
||||||
obsolete = self.api.project_status_requests('obsolete', UnselectCommand.filter_obsolete)
|
obsolete = wf.api.project_status_requests('obsolete', UnselectCommand.filter_obsolete)
|
||||||
self.assertSequenceEqual(['627445', '642126', '646560', '645723', '646823'], obsolete)
|
self.assertSequenceEqual([], obsolete)
|
||||||
|
|
||||||
|
def test_free_staging(self):
|
||||||
|
wf = vcrhelpers.StagingWorkflow()
|
||||||
|
wf.setup_rings()
|
||||||
|
|
||||||
|
staging_a = wf.create_staging('A', freeze=True)
|
||||||
|
winerq = wf.create_submit_request('devel:wine', 'wine')
|
||||||
|
self.assertEqual(True, SelectCommand(wf.api, staging_a.name).perform(['wine']))
|
||||||
|
self.assertEqual(['wine'], package_list_without_links(wf.apiurl, staging_a.name))
|
||||||
|
|
||||||
|
uc = UnselectCommand(wf.api)
|
||||||
|
self.assertIsNone(uc.perform(['wine']))
|
||||||
|
|
||||||
|
self.assertEqual([], package_list_without_links(wf.apiurl, staging_a.name))
|
||||||
|
283
tests/vcrhelpers.py
Normal file
283
tests/vcrhelpers.py
Normal file
@ -0,0 +1,283 @@
|
|||||||
|
from osc import oscerr
|
||||||
|
from osclib.cache import Cache
|
||||||
|
from osclib.cache_manager import CacheManager
|
||||||
|
from osclib.conf import Config
|
||||||
|
from osclib.freeze_command import FreezeCommand
|
||||||
|
from osclib.stagingapi import StagingAPI
|
||||||
|
from osclib.core import attribute_value_save
|
||||||
|
from osclib.memoize import memoize_session_reset
|
||||||
|
import logging
|
||||||
|
import os.path
|
||||||
|
import osc.conf
|
||||||
|
import osc.core
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
from xml.etree import cElementTree as ET
|
||||||
|
|
||||||
|
try:
|
||||||
|
from urllib.error import HTTPError, URLError
|
||||||
|
except ImportError:
|
||||||
|
#python 2.x
|
||||||
|
from urllib2 import HTTPError, URLError
|
||||||
|
|
||||||
|
APIURL = 'http://localhost:3737'
|
||||||
|
PROJECT = 'openSUSE:Factory'
|
||||||
|
|
||||||
|
class StagingWorkflow(object):
|
||||||
|
def __init__(self, project=PROJECT):
|
||||||
|
"""
|
||||||
|
Initialize the configuration
|
||||||
|
"""
|
||||||
|
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
oscrc = os.path.join(THIS_DIR, 'test.oscrc')
|
||||||
|
|
||||||
|
self.apiurl = APIURL
|
||||||
|
logging.basicConfig()
|
||||||
|
|
||||||
|
# clear cache from other tests - otherwise the VCR is replayed depending
|
||||||
|
# on test order, which can be harmful
|
||||||
|
memoize_session_reset()
|
||||||
|
|
||||||
|
osc.core.conf.get_config(override_conffile=oscrc,
|
||||||
|
override_no_keyring=True,
|
||||||
|
override_no_gnome_keyring=True)
|
||||||
|
if os.environ.get('OSC_DEBUG'):
|
||||||
|
osc.core.conf.config['debug'] = 1
|
||||||
|
self.project = project
|
||||||
|
self.projects = {}
|
||||||
|
self.requests = []
|
||||||
|
self.groups = []
|
||||||
|
CacheManager.test = True
|
||||||
|
# disable caching, the TTLs break any reproduciblity
|
||||||
|
Cache.CACHE_DIR = None
|
||||||
|
Cache.PATTERNS = {}
|
||||||
|
Cache.init()
|
||||||
|
self.setup_remote_config()
|
||||||
|
self.load_config()
|
||||||
|
self.api = StagingAPI(APIURL, project)
|
||||||
|
|
||||||
|
def load_config(self, project=None):
|
||||||
|
if project is None:
|
||||||
|
project = self.project
|
||||||
|
self.config = Config(APIURL, project)
|
||||||
|
|
||||||
|
def create_attribute_type(self, namespace, name, values=None):
|
||||||
|
meta="""
|
||||||
|
<namespace name='{}'>
|
||||||
|
<modifiable_by user='Admin'/>
|
||||||
|
</namespace>""".format(namespace)
|
||||||
|
url = osc.core.makeurl(APIURL, ['attribute', namespace, '_meta'])
|
||||||
|
osc.core.http_PUT(url, data=meta)
|
||||||
|
|
||||||
|
meta="<definition name='{}' namespace='{}'><description/>".format(name, namespace)
|
||||||
|
if values:
|
||||||
|
meta += "<count>{}</count>".format(values)
|
||||||
|
meta += "<modifiable_by role='maintainer'/></definition>"
|
||||||
|
url = osc.core.makeurl(APIURL, ['attribute', namespace, name, '_meta'])
|
||||||
|
osc.core.http_PUT(url, data=meta)
|
||||||
|
|
||||||
|
def setup_remote_config(self):
|
||||||
|
self.create_target()
|
||||||
|
self.create_attribute_type('OSRT', 'Config', 1)
|
||||||
|
attribute_value_save(APIURL, self.project, 'Config', 'overridden-by-local = remote-nope\n'
|
||||||
|
'remote-only = remote-indeed\n')
|
||||||
|
|
||||||
|
def create_group(self, name):
|
||||||
|
if name in self.groups: return
|
||||||
|
meta = """
|
||||||
|
<group>
|
||||||
|
<title>{}</title>
|
||||||
|
</group>
|
||||||
|
""".format(name)
|
||||||
|
self.groups.append(name)
|
||||||
|
url = osc.core.makeurl(APIURL, ['group', name])
|
||||||
|
osc.core.http_PUT(url, data=meta)
|
||||||
|
|
||||||
|
def create_target(self):
|
||||||
|
if self.projects.get('target'): return
|
||||||
|
self.create_group('factory-staging')
|
||||||
|
self.projects['target'] = Project(name=self.project, reviewer={'groups': ['factory-staging']})
|
||||||
|
|
||||||
|
def setup_rings(self):
|
||||||
|
self.create_target()
|
||||||
|
self.projects['ring0'] = Project(name=self.project + ':Rings:0-Bootstrap')
|
||||||
|
self.projects['ring1'] = Project(name=self.project + ':Rings:1-MinimalX')
|
||||||
|
target_wine = Package(name='wine', project=self.projects['target'])
|
||||||
|
self.create_link(target_wine, self.projects['ring1'])
|
||||||
|
|
||||||
|
def create_package(self, project, package):
|
||||||
|
project = self.create_project(project)
|
||||||
|
return Package(name=package, project=project)
|
||||||
|
|
||||||
|
def create_link(self, source_package, target_project, target_package=None):
|
||||||
|
if not target_package:
|
||||||
|
target_package = source_package.name
|
||||||
|
target_package = Package(name=target_package, project=target_project)
|
||||||
|
url = self.api.makeurl(['source', target_project.name, target_package.name, '_link'])
|
||||||
|
osc.core.http_PUT(url, data='<link project="{}" package="{}"/>'.format(source_package.project.name,
|
||||||
|
source_package.name))
|
||||||
|
return target_package
|
||||||
|
|
||||||
|
def create_project(self, name):
|
||||||
|
if isinstance(name, Project):
|
||||||
|
return name
|
||||||
|
if name in self.projects:
|
||||||
|
return self.projects[name]
|
||||||
|
self.projects[name] = Project(name)
|
||||||
|
return self.projects[name]
|
||||||
|
|
||||||
|
def create_submit_request(self, project, package, text=None):
|
||||||
|
project = self.create_project(project)
|
||||||
|
package = Package(name=package, project=project)
|
||||||
|
package.create_commit(text)
|
||||||
|
request = Request(source_package=package, target_project=self.project)
|
||||||
|
self.requests.append(request)
|
||||||
|
return request
|
||||||
|
|
||||||
|
def create_staging(self, suffix, freeze=False):
|
||||||
|
staging = Project(self.project + ':Staging:' + suffix)
|
||||||
|
if freeze:
|
||||||
|
FreezeCommand(self.api).perform(staging.name)
|
||||||
|
self.projects['staging:{}'.format(suffix)] = staging
|
||||||
|
return staging
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
print('deleting staging workflow')
|
||||||
|
for project in self.projects.values():
|
||||||
|
project.remove()
|
||||||
|
for request in self.requests:
|
||||||
|
request.revoke()
|
||||||
|
for group in self.groups:
|
||||||
|
url = osc.core.makeurl(APIURL, ['group', group])
|
||||||
|
try:
|
||||||
|
osc.core.http_DELETE(url)
|
||||||
|
except HTTPError:
|
||||||
|
pass
|
||||||
|
print('done')
|
||||||
|
if hasattr(self.api, '_invalidate_all'):
|
||||||
|
self.api._invalidate_all()
|
||||||
|
|
||||||
|
class Project(object):
|
||||||
|
def __init__(self, name, reviewer={}):
|
||||||
|
self.name = name
|
||||||
|
|
||||||
|
meta = """
|
||||||
|
<project name="{0}">
|
||||||
|
<title></title>
|
||||||
|
<description></description>
|
||||||
|
</project>""".format(self.name)
|
||||||
|
|
||||||
|
root = ET.fromstring(meta)
|
||||||
|
for group in reviewer.get('groups', []):
|
||||||
|
ET.SubElement(root, 'group', { 'groupid': group, 'role': 'reviewer'} )
|
||||||
|
for group in reviewer.get('users', []):
|
||||||
|
ET.SubElement(root, 'person', { 'userid': group, 'role': 'reviewer'} )
|
||||||
|
|
||||||
|
url = osc.core.make_meta_url('prj', self.name, APIURL)
|
||||||
|
osc.core.http_PUT(url, data=ET.tostring(root))
|
||||||
|
|
||||||
|
self.packages = []
|
||||||
|
|
||||||
|
def add_package(self, package):
|
||||||
|
self.packages.append(package)
|
||||||
|
|
||||||
|
def remove(self):
|
||||||
|
if not self.name:
|
||||||
|
return
|
||||||
|
print('deleting project', self.name)
|
||||||
|
for package in self.packages:
|
||||||
|
package.remove()
|
||||||
|
|
||||||
|
url = osc.core.makeurl(APIURL, ['source', self.name])
|
||||||
|
try:
|
||||||
|
osc.core.http_DELETE(url)
|
||||||
|
except HTTPError:
|
||||||
|
pass
|
||||||
|
self.name = None
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.remove()
|
||||||
|
|
||||||
|
class Package(object):
|
||||||
|
def __init__(self, name, project):
|
||||||
|
self.name = name
|
||||||
|
self.project = project
|
||||||
|
|
||||||
|
meta = """
|
||||||
|
<package project="{1}" name="{0}">
|
||||||
|
<title></title>
|
||||||
|
<description></description>
|
||||||
|
</package>""".format(self.name, self.project.name)
|
||||||
|
|
||||||
|
url = osc.core.make_meta_url('pkg', (self.project.name, self.name), APIURL)
|
||||||
|
osc.core.http_PUT(url, data=meta)
|
||||||
|
print('created {}/{}'.format(self.project.name, self.name))
|
||||||
|
self.project.add_package(self)
|
||||||
|
|
||||||
|
# delete from instance
|
||||||
|
def __del__(self):
|
||||||
|
self.remove()
|
||||||
|
|
||||||
|
def create_file(self, filename, data=''):
|
||||||
|
url = osc.core.makeurl(APIURL, ['source', self.project.name, self.name, filename])
|
||||||
|
osc.core.http_PUT(url, data=data)
|
||||||
|
|
||||||
|
def remove(self):
|
||||||
|
if not self.project:
|
||||||
|
return
|
||||||
|
print('deleting package', self.project.name, self.name)
|
||||||
|
url = osc.core.makeurl(APIURL, ['source', self.project.name, self.name])
|
||||||
|
try:
|
||||||
|
osc.core.http_DELETE(url)
|
||||||
|
except HTTPError:
|
||||||
|
# only cleanup
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_commit(self, text=None):
|
||||||
|
url = osc.core.makeurl(APIURL, ['source', self.project.name, self.name, 'README'])
|
||||||
|
if not text:
|
||||||
|
text = ''.join([random.choice(string.letters) for i in range(40)])
|
||||||
|
osc.core.http_PUT(url, data=text)
|
||||||
|
|
||||||
|
class Request(object):
|
||||||
|
def __init__(self, source_package, target_project):
|
||||||
|
self.source_package = source_package
|
||||||
|
self.target_project = target_project
|
||||||
|
|
||||||
|
self.reqid = osc.core.create_submit_request(APIURL,
|
||||||
|
src_project=self.source_package.project.name,
|
||||||
|
src_package=self.source_package.name,
|
||||||
|
dst_project=self.target_project)
|
||||||
|
self.revoked = False
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.revoke()
|
||||||
|
|
||||||
|
def revoke(self):
|
||||||
|
if self.revoked: return
|
||||||
|
self.revoked = True
|
||||||
|
url = osc.core.makeurl(APIURL, ['request', self.reqid], { 'newstate': 'revoked',
|
||||||
|
'cmd': 'changestate' })
|
||||||
|
try:
|
||||||
|
osc.core.http_POST(url)
|
||||||
|
except HTTPError:
|
||||||
|
# may fail if already accepted/declined in tests
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _translate_review(self, review):
|
||||||
|
ret = {'state': review.get('state')}
|
||||||
|
for type in ['by_project', 'by_package', 'by_user', 'by_group']:
|
||||||
|
if not review.get(type):
|
||||||
|
continue
|
||||||
|
ret[type] = review.get(type)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def reviews(self):
|
||||||
|
ret = []
|
||||||
|
for review in self.xml().findall('.//review'):
|
||||||
|
ret.append(self._translate_review(review))
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def xml(self):
|
||||||
|
url = osc.core.makeurl(APIURL, ['request', self.reqid])
|
||||||
|
return ET.parse(osc.core.http_GET(url))
|
Loading…
x
Reference in New Issue
Block a user