1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-02-03 18:16:17 +01:00

Merge pull request #1324 from dmach/repo-commands-for-managing-repositories-in-meta

Add 'repo' command and subcommands for managing repositories in project meta
This commit is contained in:
Daniel Mach 2023-07-14 10:57:40 +02:00 committed by GitHub
commit ba2488a702
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 669 additions and 0 deletions

View File

@ -0,0 +1,50 @@
Feature: `osc repo` command
Scenario: Run `osc repo` with no arguments
When I execute osc with args "repo"
Then stdout is
"""
"""
Scenario: Run `osc repo list` on a project
When I execute osc with args "repo list test:factory"
Then stdout is
"""
Repository : standard
Architectures : x86_64, i586
Paths : openSUSE.org:openSUSE:Tumbleweed/standard
Flags
build : disable: x86_64, i586
"""
@destructive
Scenario: Run `osc repo add` on a project
When I execute osc with args "repo add --yes test:factory --repo=new-repo --arch=x86_64 --arch=aarch64 --path=test:factory/standard --path=test:devel/standard"
And I execute osc with args "repo list test:factory"
Then stdout is
"""
Repository : standard
Architectures : x86_64, i586
Paths : openSUSE.org:openSUSE:Tumbleweed/standard
Flags
build : disable: x86_64, i586
Repository : new-repo
Architectures : x86_64, aarch64
Paths : test:factory/standard
test:devel/standard
Flags
build : disable: x86_64, aarch64
"""
@destructive
Scenario: Run `osc repo remove` on a project
When I execute osc with args "repo remove --yes test:factory --repo=standard --repo=does-not-exist"
And I execute osc with args "repo list test:factory"
Then stdout is
"""
"""

View File

@ -64,6 +64,34 @@ def post(apiurl, path, query=None):
return root
def put(apiurl, path, query=None, data=None):
"""
Send a PUT request to OBS.
:param apiurl: OBS apiurl.
:type apiurl: str
:param path: URL path segments.
:type path: list(str)
:param query: URL query values.
:type query: dict(str, str)
:returns: Parsed XML root.
:rtype: xml.etree.ElementTree.Element
"""
from osc import connection as osc_connection
from osc import core as osc_core
assert apiurl
assert path
if not isinstance(path, (list, tuple)):
raise TypeError("Argument `path` expects a list of strings")
url = osc_core.makeurl(apiurl, path, query)
with osc_connection.http_PUT(url, data=data) as f:
root = osc_core.ET.parse(f).getroot()
return root
def _to_xpath(*args):
"""
Convert strings and dictionaries to xpath:
@ -139,6 +167,30 @@ def find_node(root, root_name, *args):
return root.find(_to_xpath(*args))
def group_child_nodes(node):
nodes = node[:]
result = []
while nodes:
# look at the tag of the first node
tag = nodes[0].tag
# collect all nodes with the same tag and append them to the result
# then repeat the step for the next tag(s)
matches = []
others = []
for i in nodes:
if i.tag == tag:
matches.append(i)
else:
others.append(i)
result += matches
nodes = others
node[:] = result
def write_xml_node_to_file(node, path, indent=True):
"""
Write a XML node to a file.

154
osc/_private/project.py Normal file
View File

@ -0,0 +1,154 @@
from . import api
from .api import ET
from .. import core as osc_core
from .. import oscerr
class APIXMLBase:
def __init__(self, xml_root, apiurl=None):
self.root = xml_root
self.apiurl = apiurl
def to_bytes(self):
ET.indent(self.root, space=" ", level=0)
return ET.tostring(self.root, encoding="utf-8")
def to_string(self):
return self.to_bytes().decode("utf-8")
class ProjectMeta(APIXMLBase):
@classmethod
def from_api(cls, apiurl, project):
url_path = ["source", project, "_meta"]
root = api.get(apiurl, url_path)
obj = cls(root, apiurl=apiurl)
return obj
def to_api(self, apiurl, project):
url_path = ["source", project, "_meta"]
api.put(apiurl, url_path, data=self.to_bytes())
def repository_list(self):
result = []
repo_nodes = api.find_nodes(self.root, "project", "repository")
for repo_node in repo_nodes:
arch_nodes = api.find_nodes(repo_node, "repository", "arch")
path_nodes = api.find_nodes(repo_node, "repository", "path")
repo = {
"name": repo_node.attrib["name"],
"archs": [i.text.strip() for i in arch_nodes],
"paths": [i.attrib.copy() for i in path_nodes],
}
result.append(repo)
return result
def repository_add(self, name, arches, paths):
node = api.find_node(self.root, "project")
existing = api.find_node(self.root, "project", "repository", {"name": name})
if existing:
raise oscerr.OscValueError(f"Repository '{name}' already exists in project meta")
repo_node = ET.SubElement(node, "repository", attrib={"name": name})
for path_data in paths:
ET.SubElement(repo_node, "path", attrib={
"project": path_data["project"],
"repository": path_data["repository"],
})
for arch in arches:
arch_node = ET.SubElement(repo_node, "arch")
arch_node.text = arch
api.group_child_nodes(repo_node)
api.group_child_nodes(node)
def repository_remove(self, name):
repo_node = api.find_node(self.root, "project", "repository", {"name": name})
if repo_node is None:
return
self.root.remove(repo_node)
def publish_add_disable_repository(self, name: str):
publish_node = api.find_node(self.root, "project", "publish")
if publish_node is None:
project_node = api.find_node(self.root, "project")
publish_node = ET.SubElement(project_node, "publish")
else:
disable_node = api.find_node(publish_node, "publish", "disable", {"repository": name})
if disable_node is not None:
return
ET.SubElement(publish_node, "disable", attrib={"repository": name})
api.group_child_nodes(publish_node)
def publish_remove_disable_repository(self, name: str):
publish_node = api.find_node(self.root, "project", "publish")
if publish_node is None:
return
disable_node = api.find_node(publish_node, "publish", "disable", {"repository": name})
if disable_node is not None:
publish_node.remove(disable_node)
if len(publish_node) == 0:
self.root.remove(publish_node)
REPOSITORY_FLAGS_TEMPLATE = {
"build": None,
"debuginfo": None,
"publish": None,
"useforbuild": None,
}
def _update_repository_flags(self, repository_flags, xml_root):
"""
Update `repository_flags` with data from the `xml_root`.
"""
for flag in self.REPOSITORY_FLAGS_TEMPLATE:
flag_node = xml_root.find(flag)
if flag_node is None:
continue
for node in flag_node:
action = node.tag
repo = node.get("repository")
arch = node.get("arch")
for (entry_repo, entry_arch), entry_data in repository_flags.items():
match = False
if (repo, arch) == (entry_repo, entry_arch):
# apply to matching repository and architecture
match = True
elif repo == entry_repo and not arch:
# apply to all matching repositories
match = True
elif not repo and arch == entry_arch:
# apply to all matching architectures
match = True
elif not repo and not arch:
# apply to everything
match = True
if match:
entry_data[flag] = True if action == "enable" else False
def resolve_repository_flags(self, package=None):
"""
Resolve the `build`, `debuginfo`, `publish` and `useforbuild` flags
and return their values for each repository and build arch.
:returns: {(repo_name, repo_buildarch): {flag_name: bool} for all available repos
"""
result = {}
# TODO: avoid calling get_repos_of_project(), use self.root instead
for repo in osc_core.get_repos_of_project(self.apiurl, self.root.attrib["name"]):
result[(repo.name, repo.arch)] = self.REPOSITORY_FLAGS_TEMPLATE.copy()
self._update_repository_flags(result, self.root)
if package:
m = osc_core.show_package_meta(self.apiurl, self.root.attrib["name"], package)
root = ET.fromstring(b''.join(m))
self._update_repository_flags(result, root)
return result

11
osc/commands/repo.py Normal file
View File

@ -0,0 +1,11 @@
import osc.commandline
class RepoCommand(osc.commandline.OscCommand):
"""
Manage repositories in project meta
"""
name = "repo"
def run(self, args):
pass

82
osc/commands/repo_add.py Normal file
View File

@ -0,0 +1,82 @@
import difflib
import osc.commandline
from .. import oscerr
from .._private.project import ProjectMeta
from ..core import raw_input
class RepoAddCommand(osc.commandline.OscCommand):
"""
Add a repository to project meta
"""
name = "add"
parent = "RepoCommand"
def init_arguments(self):
self.add_argument(
"project",
help="Name of the project",
)
self.add_argument(
"--repo",
metavar="NAME",
required=True,
help="Name of the repository we're adding",
)
self.add_argument(
"--arch",
dest="arches",
metavar="[ARCH]",
action="append",
required=True,
help="Architecture of the repository. Can be specified multiple times.",
)
self.add_argument(
"--path",
dest="paths",
metavar="[PROJECT/REPO]",
action="append",
required=True,
help="Path associated to the repository. Format is PROJECT/REPO. Can be specified multiple times.",
)
self.add_argument(
"--disable-publish",
action="store_true",
default=False,
help="Disable publishing the added repository",
)
self.add_argument(
"--yes",
action="store_true",
help="Proceed without asking",
)
def run(self, args):
paths = []
for path in args.paths:
if "/" not in path:
self.parser.error(f"Invalid path (expected format is PROJECT/REPO): {path}")
project, repo = path.split("/")
paths.append({"project": project, "repository": repo})
meta = ProjectMeta.from_api(args.apiurl, args.project)
old_meta = meta.to_string().splitlines()
meta.repository_add(args.repo, args.arches, paths)
if args.disable_publish:
meta.publish_add_disable_repository(args.repo)
new_meta = meta.to_string().splitlines()
diff = difflib.unified_diff(old_meta, new_meta, fromfile="old", tofile="new")
print("\n".join(diff))
if not args.yes:
print()
print(f"You're changing meta of project '{args.project}'")
reply = raw_input("Do you want to apply the changes? [y/N] ").lower()
if reply != "y":
raise oscerr.UserAbort()
meta.to_api(args.apiurl, args.project)

51
osc/commands/repo_list.py Normal file
View File

@ -0,0 +1,51 @@
import osc.commandline
from ..output import KeyValueTable
from .._private.project import ProjectMeta
class RepoListCommand(osc.commandline.OscCommand):
"""
List repositories in project meta
"""
name = "list"
aliases = ["ls"]
parent = "RepoCommand"
def init_arguments(self):
self.add_argument(
"project",
help="Name of the project",
)
def run(self, args):
meta = ProjectMeta.from_api(args.apiurl, args.project)
repo_flags = meta.resolve_repository_flags()
flag_map = {}
for (repo_name, arch), data in repo_flags.items():
for flag_name, flag_value in data.items():
if flag_value is None:
continue
action = "enable" if flag_value else "disable"
flag_map.setdefault(repo_name, {}).setdefault(flag_name, {}).setdefault(action, []).append(arch)
table = KeyValueTable()
for repo in meta.repository_list():
table.add("Repository", repo["name"], color="bold")
table.add("Architectures", ", ".join(repo["archs"]))
if repo["paths"]:
paths = [f"{path['project']}/{path['repository']}" for path in repo["paths"]]
table.add("Paths", paths)
if repo["name"] in flag_map:
table.add("Flags", None)
for flag_name in flag_map[repo["name"]]:
lines = []
for action, archs in flag_map[repo["name"]][flag_name].items():
lines.append(f"{action + ':':<8s} {', '.join(archs)}")
lines.sort()
table.add(flag_name, lines, indent=4)
table.newline()
print(str(table))

View File

@ -0,0 +1,55 @@
import difflib
import osc.commandline
from .. import oscerr
from .._private.project import ProjectMeta
from ..core import raw_input
class RepoRemoveCommand(osc.commandline.OscCommand):
"""
Remove repositories from project meta
"""
name = "remove"
aliases = ["rm"]
parent = "RepoCommand"
def init_arguments(self):
self.add_argument(
"project",
help="Name of the project",
)
self.add_argument(
"--repo",
metavar="[NAME]",
action="append",
required=True,
help="Name of the repository we're removing. Can be specified multiple times.",
)
self.add_argument(
"--yes",
action="store_true",
help="Proceed without asking",
)
def run(self, args):
meta = ProjectMeta.from_api(args.apiurl, args.project)
old_meta = meta.to_string().splitlines()
for repo in args.repo:
meta.repository_remove(repo)
meta.publish_remove_disable_repository(repo)
new_meta = meta.to_string().splitlines()
diff = difflib.unified_diff(old_meta, new_meta, fromfile="old", tofile="new")
print("\n".join(diff))
if not args.yes:
print()
print(f"You're changing meta of project '{args.project}'")
reply = raw_input("Do you want to apply the changes? [y/N] ").lower()
if reply != "y":
raise oscerr.UserAbort()
meta.to_api(args.apiurl, args.project)

4
osc/output/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .key_value_table import KeyValueTable
from .tty import colorize
from .widechar import wc_ljust
from .widechar import wc_width

View File

@ -0,0 +1,78 @@
from . import tty
from . import widechar
class KeyValueTable:
class NewLine:
pass
def __init__(self):
self.rows = []
def add(self, key, value, color=None, key_color=None, indent=0):
if value is None:
lines = []
elif isinstance(value, (list, tuple)):
lines = value[:]
else:
lines = value.splitlines()
if not lines:
lines = [""]
# add the first line with the key
self.rows.append((key, lines[0], color, key_color, indent))
# then add the continuation lines without the key
for line in lines[1:]:
self.rows.append(("", line, color, key_color, 0))
def newline(self):
self.rows.append((self.NewLine, None, None, None, 0))
def __str__(self):
if not self.rows:
return ""
col1_width = max([widechar.wc_width(key) + indent for key, _, _, _, indent in self.rows if key != self.NewLine])
result = []
skip = False
for row_num in range(len(self.rows)):
if skip:
skip = False
continue
key, value, color, key_color, indent = self.rows[row_num]
if key == self.NewLine:
result.append("")
continue
next_indent = 0 # fake value
if not value and row_num < len(self.rows) - 1:
# let's peek if there's a continuation line we could merge instead of the blank value
next_key, next_value, next_color, next_key_color, next_indent = self.rows[row_num + 1]
if not next_key:
value = next_value
color = next_color
key_color = next_key_color
row_num += 1
skip = True
line = indent * " "
if not value and next_indent > 0:
# no value, the key represents a section followed by indented keys -> skip ljust() and " : " separator
line += tty.colorize(key, key_color)
else:
line += tty.colorize(widechar.wc_ljust(key, col1_width - indent), key_color)
if not key:
# continuation line without a key -> skip " : " separator
line += " "
else:
line += " : "
line += tty.colorize(value, color)
result.append(line)
return "\n".join(result)

38
osc/output/tty.py Normal file
View File

@ -0,0 +1,38 @@
import os
import sys
IS_INTERACTIVE = os.isatty(sys.stdout.fileno())
ESCAPE_CODES = {
"reset": "\033[0m",
"bold": "\033[1m",
"underline": "\033[4m",
"black": "\033[30m",
"red": "\033[31m",
"green": "\033[32m",
"yellow": "\033[33m",
"blue": "\033[34m",
"magenta": "\033[35m",
"cyan": "\033[36m",
"white": "\033[37m",
}
def colorize(text, color):
"""
Colorize `text` if the `color` is specified and we're running in an interactive terminal.
"""
if not IS_INTERACTIVE:
return text
if not color:
return text
result = ""
for i in color.split(","):
result += ESCAPE_CODES[i]
result += text
result += ESCAPE_CODES["reset"]
return result

22
osc/output/widechar.py Normal file
View File

@ -0,0 +1,22 @@
import unicodedata
def wc_width(text):
result = 0
for char in text:
if unicodedata.east_asian_width(char) in ("F", "W"):
result += 2
else:
result += 1
return result
def wc_ljust(text, width, fillchar=" "):
text_width = wc_width(text)
fill_width = wc_width(fillchar)
while text_width + fill_width <= width:
text += fillchar
text_width += fill_width
return text

View File

@ -35,6 +35,7 @@ packages =
osc
osc._private
osc.commands
osc.output
osc.util
install_requires =
cryptography

71
tests/test_output.py Normal file
View File

@ -0,0 +1,71 @@
import unittest
from osc.output import KeyValueTable
class TestKeyValueTable(unittest.TestCase):
def test_empty(self):
t = KeyValueTable()
self.assertEqual(str(t), "")
def test_simple(self):
t = KeyValueTable()
t.add("Key", "Value")
t.add("FooBar", "Text")
expected = """
Key : Value
FooBar : Text
""".strip()
self.assertEqual(str(t), expected)
def test_newline(self):
t = KeyValueTable()
t.add("Key", "Value")
t.newline()
t.add("FooBar", "Text")
expected = """
Key : Value
FooBar : Text
""".strip()
self.assertEqual(str(t), expected)
def test_continuation(self):
t = KeyValueTable()
t.add("Key", ["Value1", "Value2"])
expected = """
Key : Value1
Value2
""".strip()
self.assertEqual(str(t), expected)
def test_section(self):
t = KeyValueTable()
t.add("Section", None)
t.add("Key", "Value", indent=4)
t.add("FooBar", "Text", indent=4)
expected = """
Section
Key : Value
FooBar : Text
""".strip()
self.assertEqual(str(t), expected)
def test_wide_chars(self):
t = KeyValueTable()
t.add("Key", "Value")
t.add("🚀🚀🚀", "Value")
expected = """
Key : Value
🚀🚀🚀 : Value
""".strip()
self.assertEqual(str(t), expected)
if __name__ == "__main__":
unittest.main()