1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-11-15 00:36:11 +01:00
github.com_openSUSE_osc/osc/_private/project.py

155 lines
5.6 KiB
Python

from . import api
from .api import ET
from .. import core as osc_core
from .. import oscerr
class APIXMLBase:
def __init__(self, xml_root, apiurl=None):
self.root = xml_root
self.apiurl = apiurl
def to_bytes(self):
api.xml_indent(self.root)
return ET.tostring(self.root, encoding="utf-8")
def to_string(self):
return self.to_bytes().decode("utf-8")
class ProjectMeta(APIXMLBase):
@classmethod
def from_api(cls, apiurl, project):
url_path = ["source", project, "_meta"]
root = api.get(apiurl, url_path)
obj = cls(root, apiurl=apiurl)
return obj
def to_api(self, apiurl, project):
url_path = ["source", project, "_meta"]
api.put(apiurl, url_path, data=self.to_bytes())
def repository_list(self):
result = []
repo_nodes = api.find_nodes(self.root, "project", "repository")
for repo_node in repo_nodes:
arch_nodes = api.find_nodes(repo_node, "repository", "arch")
path_nodes = api.find_nodes(repo_node, "repository", "path")
repo = {
"name": repo_node.attrib["name"],
"archs": [i.text.strip() for i in arch_nodes],
"paths": [i.attrib.copy() for i in path_nodes],
}
result.append(repo)
return result
def repository_add(self, name, arches, paths):
node = api.find_node(self.root, "project")
existing = api.find_node(self.root, "project", "repository", {"name": name})
if existing:
raise oscerr.OscValueError(f"Repository '{name}' already exists in project meta")
repo_node = ET.SubElement(node, "repository", attrib={"name": name})
for path_data in paths:
ET.SubElement(repo_node, "path", attrib={
"project": path_data["project"],
"repository": path_data["repository"],
})
for arch in arches:
arch_node = ET.SubElement(repo_node, "arch")
arch_node.text = arch
api.group_child_nodes(repo_node)
api.group_child_nodes(node)
def repository_remove(self, name):
repo_node = api.find_node(self.root, "project", "repository", {"name": name})
if repo_node is None:
return
self.root.remove(repo_node)
def publish_add_disable_repository(self, name: str):
publish_node = api.find_node(self.root, "project", "publish")
if publish_node is None:
project_node = api.find_node(self.root, "project")
publish_node = ET.SubElement(project_node, "publish")
else:
disable_node = api.find_node(publish_node, "publish", "disable", {"repository": name})
if disable_node is not None:
return
ET.SubElement(publish_node, "disable", attrib={"repository": name})
api.group_child_nodes(publish_node)
def publish_remove_disable_repository(self, name: str):
publish_node = api.find_node(self.root, "project", "publish")
if publish_node is None:
return
disable_node = api.find_node(publish_node, "publish", "disable", {"repository": name})
if disable_node is not None:
publish_node.remove(disable_node)
if len(publish_node) == 0:
self.root.remove(publish_node)
REPOSITORY_FLAGS_TEMPLATE = {
"build": None,
"debuginfo": None,
"publish": None,
"useforbuild": None,
}
def _update_repository_flags(self, repository_flags, xml_root):
"""
Update `repository_flags` with data from the `xml_root`.
"""
for flag in self.REPOSITORY_FLAGS_TEMPLATE:
flag_node = xml_root.find(flag)
if flag_node is None:
continue
for node in flag_node:
action = node.tag
repo = node.get("repository")
arch = node.get("arch")
for (entry_repo, entry_arch), entry_data in repository_flags.items():
match = False
if (repo, arch) == (entry_repo, entry_arch):
# apply to matching repository and architecture
match = True
elif repo == entry_repo and not arch:
# apply to all matching repositories
match = True
elif not repo and arch == entry_arch:
# apply to all matching architectures
match = True
elif not repo and not arch:
# apply to everything
match = True
if match:
entry_data[flag] = True if action == "enable" else False
def resolve_repository_flags(self, package=None):
"""
Resolve the `build`, `debuginfo`, `publish` and `useforbuild` flags
and return their values for each repository and build arch.
:returns: {(repo_name, repo_buildarch): {flag_name: bool} for all available repos
"""
result = {}
# TODO: avoid calling get_repos_of_project(), use self.root instead
for repo in osc_core.get_repos_of_project(self.apiurl, self.root.attrib["name"]):
result[(repo.name, repo.arch)] = self.REPOSITORY_FLAGS_TEMPLATE.copy()
self._update_repository_flags(result, self.root)
if package:
m = osc_core.show_package_meta(self.apiurl, self.root.attrib["name"], package)
root = ET.fromstring(b''.join(m))
self._update_repository_flags(result, root)
return result