diff --git a/osc/_private/project.py b/osc/_private/project.py
deleted file mode 100644
index 239f594b..00000000
--- a/osc/_private/project.py
+++ /dev/null
@@ -1,154 +0,0 @@
-from . import api
-from .api import ET
-from .. import core as osc_core
-from .. import oscerr
-
-
-class APIXMLBase:
- def __init__(self, xml_root, apiurl=None):
- self.root = xml_root
- self.apiurl = apiurl
-
- def to_bytes(self):
- api.xml_indent(self.root)
- return ET.tostring(self.root, encoding="utf-8")
-
- def to_string(self):
- return self.to_bytes().decode("utf-8")
-
-
-class ProjectMeta(APIXMLBase):
- @classmethod
- def from_api(cls, apiurl, project):
- url_path = ["source", project, "_meta"]
- root = api.get(apiurl, url_path)
- obj = cls(root, apiurl=apiurl)
- return obj
-
- def to_api(self, apiurl, project):
- url_path = ["source", project, "_meta"]
- api.put(apiurl, url_path, data=self.to_bytes())
-
- def repository_list(self):
- result = []
- repo_nodes = api.find_nodes(self.root, "project", "repository")
- for repo_node in repo_nodes:
- arch_nodes = api.find_nodes(repo_node, "repository", "arch")
- path_nodes = api.find_nodes(repo_node, "repository", "path")
- repo = {
- "name": repo_node.attrib["name"],
- "archs": [i.text.strip() for i in arch_nodes],
- "paths": [i.attrib.copy() for i in path_nodes],
- }
- result.append(repo)
- return result
-
- def repository_add(self, name, arches, paths):
- node = api.find_node(self.root, "project")
-
- existing = api.find_node(self.root, "project", "repository", {"name": name})
- if existing:
- raise oscerr.OscValueError(f"Repository '{name}' already exists in project meta")
-
- repo_node = ET.SubElement(node, "repository", attrib={"name": name})
-
- for path_data in paths:
- ET.SubElement(repo_node, "path", attrib={
- "project": path_data["project"],
- "repository": path_data["repository"],
- })
-
- for arch in arches:
- arch_node = ET.SubElement(repo_node, "arch")
- arch_node.text = arch
-
- api.group_child_nodes(repo_node)
- api.group_child_nodes(node)
-
- def repository_remove(self, name):
- repo_node = api.find_node(self.root, "project", "repository", {"name": name})
- if repo_node is None:
- return
- self.root.remove(repo_node)
-
- def publish_add_disable_repository(self, name: str):
- publish_node = api.find_node(self.root, "project", "publish")
- if publish_node is None:
- project_node = api.find_node(self.root, "project")
- publish_node = ET.SubElement(project_node, "publish")
- else:
- disable_node = api.find_node(publish_node, "publish", "disable", {"repository": name})
- if disable_node is not None:
- return
-
- ET.SubElement(publish_node, "disable", attrib={"repository": name})
- api.group_child_nodes(publish_node)
-
- def publish_remove_disable_repository(self, name: str):
- publish_node = api.find_node(self.root, "project", "publish")
- if publish_node is None:
- return
-
- disable_node = api.find_node(publish_node, "publish", "disable", {"repository": name})
- if disable_node is not None:
- publish_node.remove(disable_node)
-
- if len(publish_node) == 0:
- self.root.remove(publish_node)
-
- REPOSITORY_FLAGS_TEMPLATE = {
- "build": None,
- "debuginfo": None,
- "publish": None,
- "useforbuild": None,
- }
-
- def _update_repository_flags(self, repository_flags, xml_root):
- """
- Update `repository_flags` with data from the `xml_root`.
- """
- for flag in self.REPOSITORY_FLAGS_TEMPLATE:
- flag_node = xml_root.find(flag)
- if flag_node is None:
- continue
- for node in flag_node:
- action = node.tag
- repo = node.get("repository")
- arch = node.get("arch")
- for (entry_repo, entry_arch), entry_data in repository_flags.items():
- match = False
- if (repo, arch) == (entry_repo, entry_arch):
- # apply to matching repository and architecture
- match = True
- elif repo == entry_repo and not arch:
- # apply to all matching repositories
- match = True
- elif not repo and arch == entry_arch:
- # apply to all matching architectures
- match = True
- elif not repo and not arch:
- # apply to everything
- match = True
- if match:
- entry_data[flag] = True if action == "enable" else False
-
- def resolve_repository_flags(self, package=None):
- """
- Resolve the `build`, `debuginfo`, `publish` and `useforbuild` flags
- and return their values for each repository and build arch.
-
- :returns: {(repo_name, repo_buildarch): {flag_name: bool} for all available repos
- """
- result = {}
- # TODO: avoid calling get_repos_of_project(), use self.root instead
- for repo in osc_core.get_repos_of_project(self.apiurl, self.root.attrib["name"]):
- result[(repo.name, repo.arch)] = self.REPOSITORY_FLAGS_TEMPLATE.copy()
-
- self._update_repository_flags(result, self.root)
-
- if package:
- m = osc_core.show_package_meta(self.apiurl, self.root.attrib["name"], package)
- root = ET.fromstring(b''.join(m))
- self._update_repository_flags(result, root)
-
- return result
diff --git a/osc/commandline.py b/osc/commandline.py
index ee87b012..8db51fc2 100644
--- a/osc/commandline.py
+++ b/osc/commandline.py
@@ -9048,7 +9048,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
delPerson(apiurl, prj, pac, opts.delete, role)
elif opts.devel_project:
# XXX: does it really belong to this command?
- setDevelProject(apiurl, prj, pac, opts.devel_project)
+ set_devel_project(apiurl, prj, pac, opts.devel_project)
else:
if pac:
m = show_package_meta(apiurl, prj, pac)
diff --git a/osc/commands/repo.py b/osc/commands/repo.py
index 33c3c190..c745adc3 100644
--- a/osc/commands/repo.py
+++ b/osc/commands/repo.py
@@ -5,6 +5,7 @@ class RepoCommand(osc.commandline.OscCommand):
"""
Manage repositories in project meta
"""
+
name = "repo"
def run(self, args):
diff --git a/osc/commands/repo_add.py b/osc/commands/repo_add.py
index ec9ff5ca..9c8dadb9 100644
--- a/osc/commands/repo_add.py
+++ b/osc/commands/repo_add.py
@@ -1,9 +1,9 @@
import difflib
import osc.commandline
+from .. import obs_api
from .. import oscerr
-from .._private.project import ProjectMeta
-from ..core import raw_input
+from ..output import get_user_input
class RepoAddCommand(osc.commandline.OscCommand):
@@ -61,22 +61,50 @@ class RepoAddCommand(osc.commandline.OscCommand):
project, repo = path.split("/")
paths.append({"project": project, "repository": repo})
- meta = ProjectMeta.from_api(args.apiurl, args.project)
- old_meta = meta.to_string().splitlines()
+ project_obj = obs_api.Project.from_api(args.apiurl, args.project)
+ old = project_obj.to_string()
+
+ matching_repos = [i for i in project_obj.repository_list or [] if i.name == args.repo]
+ if matching_repos:
+ raise oscerr.OscValueError(f"Repository '{args.repo}' already exists in project meta")
+
+ project_obj.repository_list.append(
+ {
+ "name": args.repo,
+ "arch_list": args.arches,
+ "path_list": paths,
+ }
+ )
- meta.repository_add(args.repo, args.arches, paths)
if args.disable_publish:
- meta.publish_add_disable_repository(args.repo)
-
- new_meta = meta.to_string().splitlines()
- diff = difflib.unified_diff(old_meta, new_meta, fromfile="old", tofile="new")
- print("\n".join(diff))
+ matching_publish_disable_repos = [
+ i for i in project_obj.publish_list or [] if i.flag == "disable" and i.repository == args.repo
+ ]
+ if not matching_publish_disable_repos:
+ if project_obj.publish_list is None:
+ project_obj.publish_list = []
+ project_obj.publish_list.append(
+ {
+ "flag": "disable",
+ "repository": args.repo,
+ }
+ )
if not args.yes:
+ new = project_obj.to_string()
+ diff = difflib.unified_diff(old.splitlines(), new.splitlines(), fromfile="old", tofile="new")
+ print("\n".join(diff))
print()
- print(f"You're changing meta of project '{args.project}'")
- reply = raw_input("Do you want to apply the changes? [y/N] ").lower()
- if reply != "y":
+
+ reply = get_user_input(
+ f"""
+ You're changing meta of project '{args.project}'
+ Do you want to apply the changes?
+ """,
+ answers={"y": "yes", "n": "no"},
+ )
+
+ if reply == "n":
raise oscerr.UserAbort()
- meta.to_api(args.apiurl, args.project)
+ project_obj.to_api(args.apiurl)
diff --git a/osc/commands/repo_list.py b/osc/commands/repo_list.py
index a23ea0f2..111a7e21 100644
--- a/osc/commands/repo_list.py
+++ b/osc/commands/repo_list.py
@@ -1,6 +1,6 @@
import osc.commandline
+from .. import obs_api
from ..output import KeyValueTable
-from .._private.project import ProjectMeta
class RepoListCommand(osc.commandline.OscCommand):
@@ -19,9 +19,9 @@ class RepoListCommand(osc.commandline.OscCommand):
)
def run(self, args):
- meta = ProjectMeta.from_api(args.apiurl, args.project)
+ project_obj = obs_api.Project.from_api(args.apiurl, args.project)
+ repo_flags = project_obj.resolve_repository_flags()
- repo_flags = meta.resolve_repository_flags()
flag_map = {}
for (repo_name, arch), data in repo_flags.items():
for flag_name, flag_value in data.items():
@@ -31,18 +31,18 @@ class RepoListCommand(osc.commandline.OscCommand):
flag_map.setdefault(repo_name, {}).setdefault(flag_name, {}).setdefault(action, []).append(arch)
table = KeyValueTable()
- for repo in meta.repository_list():
- table.add("Repository", repo["name"], color="bold")
- table.add("Architectures", ", ".join(repo["archs"]))
- if repo["paths"]:
- paths = [f"{path['project']}/{path['repository']}" for path in repo["paths"]]
+ for repo in project_obj.repository_list or []:
+ table.add("Repository", repo.name, color="bold")
+ table.add("Architectures", ", ".join(repo.arch_list))
+ if repo.path_list:
+ paths = [f"{path.project}/{path.repository}" for path in repo.path_list]
table.add("Paths", paths)
- if repo["name"] in flag_map:
+ if repo.name in flag_map:
table.add("Flags", None)
- for flag_name in flag_map[repo["name"]]:
+ for flag_name in flag_map[repo.name]:
lines = []
- for action, archs in flag_map[repo["name"]][flag_name].items():
+ for action, archs in flag_map[repo.name][flag_name].items():
lines.append(f"{action + ':':<8s} {', '.join(archs)}")
lines.sort()
table.add(flag_name, lines, indent=4)
diff --git a/osc/commands/repo_remove.py b/osc/commands/repo_remove.py
index 859e919a..5832a438 100644
--- a/osc/commands/repo_remove.py
+++ b/osc/commands/repo_remove.py
@@ -1,9 +1,9 @@
import difflib
import osc.commandline
+from .. import obs_api
from .. import oscerr
-from .._private.project import ProjectMeta
-from ..core import raw_input
+from ..output import get_user_input
class RepoRemoveCommand(osc.commandline.OscCommand):
@@ -34,22 +34,35 @@ class RepoRemoveCommand(osc.commandline.OscCommand):
)
def run(self, args):
- meta = ProjectMeta.from_api(args.apiurl, args.project)
- old_meta = meta.to_string().splitlines()
+ project_obj = obs_api.Project.from_api(args.apiurl, args.project)
+ old = project_obj.to_string()
for repo in args.repo:
- meta.repository_remove(repo)
- meta.publish_remove_disable_repository(repo)
+ if project_obj.repository_list is not None:
+ project_obj.repository_list = [i for i in project_obj.repository_list if i.name != repo]
+ if project_obj.publish_list is not None:
+ project_obj.publish_list = [
+ i for i in project_obj.publish_list if i.flag != "disable" or i.repository != repo
+ ]
- new_meta = meta.to_string().splitlines()
- diff = difflib.unified_diff(old_meta, new_meta, fromfile="old", tofile="new")
- print("\n".join(diff))
+ if not project_obj.has_changed():
+ return
if not args.yes:
+ new = project_obj.to_string()
+ diff = difflib.unified_diff(old.splitlines(), new.splitlines(), fromfile="old", tofile="new")
+ print("\n".join(diff))
print()
- print(f"You're changing meta of project '{args.project}'")
- reply = raw_input("Do you want to apply the changes? [y/N] ").lower()
- if reply != "y":
+
+ reply = get_user_input(
+ f"""
+ You're changing meta of project '{args.project}'
+ Do you want to apply the changes?
+ """,
+ answers={"y": "yes", "n": "no"},
+ )
+
+ if reply == "n":
raise oscerr.UserAbort()
- meta.to_api(args.apiurl, args.project)
+ project_obj.to_api(args.apiurl)
diff --git a/osc/core.py b/osc/core.py
index c20544fc..27ba092b 100644
--- a/osc/core.py
+++ b/osc/core.py
@@ -2335,39 +2335,42 @@ rev: %s
for the updatepacmetafromspec subcommand
argument force supress the confirm question
"""
+ from . import obs_api
+ from .output import get_user_input
- m = b''.join(show_package_meta(self.apiurl, self.prjname, self.name))
+ package_obj = obs_api.Package.from_api(self.apiurl, self.prjname, self.name)
+ old = package_obj.to_string()
+ package_obj.title = self.summary.strip()
+ package_obj.description = "".join(self.descr).strip()
+ package_obj.url = self.url.strip()
+ new = package_obj.to_string()
- root = ET.fromstring(m)
- root.find('title').text = self.summary
- root.find('description').text = ''.join(self.descr)
- url = root.find('url')
- if url is None:
- url = ET.SubElement(root, 'url')
- url.text = self.url
+ if not package_obj.has_changed():
+ return
- def delegate(force=False): return make_meta_url('pkg',
- (self.prjname, self.name),
- self.apiurl, force=force)
- url_factory = metafile._URLFactory(delegate)
- mf = metafile(url_factory, ET.tostring(root, encoding=ET_ENCODING))
-
- if not force:
- print('*' * 36, 'old', '*' * 36)
- print(decode_it(m))
- print('*' * 36, 'new', '*' * 36)
- print(ET.tostring(root, encoding=ET_ENCODING))
- print('*' * 72)
- repl = raw_input('Write? (y/N/e) ')
+ if force:
+ reply = "y"
else:
- repl = 'y'
+ while True:
+ print("\n".join(difflib.unified_diff(old.splitlines(), new.splitlines(), fromfile="old", tofile="new")))
+ print()
- if repl == 'y':
- mf.sync()
- elif repl == 'e':
- mf.edit()
+ reply = get_user_input(
+ "Write?",
+ answers={"y": "yes", "n": "no", "e": "edit"},
+ )
+ if reply == "y":
+ break
+ if reply == "n":
+ break
+ if reply == "e":
+ _, _, edited_obj = package_obj.do_edit()
+ package_obj.do_update(edited_obj)
+ new = package_obj.to_string()
+ continue
- mf.discard()
+ if reply == "y":
+ package_obj.to_api(self.apiurl)
def mark_frozen(self):
store_write_string(self.absdir, '_frozenlink', '')
@@ -3853,15 +3856,14 @@ def download_assets(directory):
def show_scmsync(apiurl, prj, pac=None):
+ from . import obs_api
+
if pac:
- m = show_package_meta(apiurl, prj, pac)
- else:
- m = show_project_meta(apiurl, prj)
- node = ET.fromstring(b''.join(m)).find('scmsync')
- if node is None:
- return None
- else:
- return node.text
+ package_obj = obs_api.Package.from_api(apiurl, prj, pac)
+ return package_obj.scmsync
+
+ project_obj = obs_api.Project.from_api(apiurl, prj)
+ return project_obj.scmsync
def show_devel_project(apiurl, prj, pac):
@@ -3908,20 +3910,15 @@ def set_devel_project(apiurl, prj, pac, devprj=None, devpac=None, print_to="debu
def show_package_disabled_repos(apiurl: str, prj: str, pac: str):
- m = show_package_meta(apiurl, prj, pac)
+ from . import obs_api
+
# FIXME: don't work if all repos of a project are disabled and only some are enabled since is empty
- try:
- root = ET.fromstring(''.join(m))
- elm = root.find('build')
- r = []
- for node in elm.findall('disable'):
- repo = node.get('repository')
- arch = node.get('arch')
- dis_r = {'repo': repo, 'arch': arch}
- r.append(dis_r)
- return r
- except:
- return None
+ package_obj = obs_api.Package.from_api(apiurl, prj, pac)
+ result = []
+ for i in package_obj.build_list or []:
+ if i.flag == "disable":
+ result.append({"repo": i.repository, "arch": i.arch})
+ return result
def show_pattern_metalist(apiurl: str, prj: str):
@@ -6446,11 +6443,10 @@ def get_platforms_of_project(apiurl: str, prj: str):
def get_repositories_of_project(apiurl: str, prj: str):
- f = show_project_meta(apiurl, prj)
- root = ET.fromstring(b''.join(f))
+ from . import obs_api
- r = [node.get('name') for node in root.findall('repository')]
- return r
+ project_obj = obs_api.Project.from_api(apiurl, prj)
+ return [i.name for i in project_obj.repository_list or []]
class Repo:
@@ -6488,13 +6484,13 @@ class Repo:
f.write(f'{repo.name} {repo.arch}\n')
-def get_repos_of_project(apiurl, prj):
- f = show_project_meta(apiurl, prj)
- root = ET.fromstring(b''.join(f))
+def get_repos_of_project(apiurl: str, prj: str):
+ from . import obs_api
- for node in root.findall('repository'):
- for node2 in node.findall('arch'):
- yield Repo(node.get('name'), node2.text)
+ project_obj = obs_api.Project.from_api(apiurl, prj)
+ for repo in project_obj.repository_list or []:
+ for arch in repo.arch_list:
+ yield Repo(repo.name, arch)
def get_binarylist(
@@ -7981,36 +7977,6 @@ def setBugowner(apiurl: str, prj: str, pac: str, user=None, group=None):
data=ET.tostring(root, encoding=ET_ENCODING))
-def setDevelProject(apiurl, prj, pac, dprj, dpkg=None):
- """ set the element to package metadata"""
- path = (prj, pac)
- data = meta_exists(metatype='pkg',
- path_args=path,
- template_args=None,
- create_new=False)
-
- if data and show_project_meta(apiurl, dprj) is not None:
- root = ET.fromstring(parse_meta_to_string(data))
- if not root.find('devel') is not None:
- ET.SubElement(root, 'devel')
- elem = root.find('devel')
- if dprj:
- elem.set('project', dprj)
- else:
- if 'project' in elem.keys():
- del elem.attrib['project']
- if dpkg:
- elem.set('package', dpkg)
- else:
- if 'package' in elem.keys():
- del elem.attrib['package']
- edit_meta(metatype='pkg',
- path_args=path,
- data=ET.tostring(root, encoding=ET_ENCODING))
- else:
- print("osc: an error occured")
-
-
def createPackageDir(pathname, prj_obj=None):
"""
create and initialize a new package dir in the given project.
diff --git a/osc/obs_api/project.py b/osc/obs_api/project.py
index 9c6cf501..ecec9b50 100644
--- a/osc/obs_api/project.py
+++ b/osc/obs_api/project.py
@@ -8,6 +8,7 @@ from .project_link import ProjectLink
from .project_maintenance_maintains import ProjectMaintenanceMaintains
from .repository import Repository
from .simple_flag import SimpleFlag
+from .status import Status
class Project(XmlModel):
@@ -112,3 +113,39 @@ class Project(XmlModel):
url_query = {}
response = cls.xml_request("GET", apiurl, url_path, url_query)
return cls.from_file(response)
+
+ def to_api(self, apiurl, *, project=None):
+ project = project or self.name
+ url_path = ["source", project, "_meta"]
+ url_query = {}
+ response = self.xml_request("PUT", apiurl, url_path, url_query, data=self.to_string())
+ return Status.from_file(response)
+
+ def resolve_repository_flags(self, package_obj=None):
+ """
+ Resolve the `build`, `debuginfo`, `publish` and `useforbuild` flags
+ and return their values for each repository and build arch.
+
+ :returns: {(repo_name, repo_buildarch): {flag_name: bool} for all available repos
+ """
+ result = {}
+ flag_names = ("build", "debuginfo", "publish", "useforbuild")
+
+ # populate the result matrix: {(repo, arch): {"build": None, "debuginfo": None, "publish": None, "useforbuild": None}}
+ for repo_obj in self.repository_list or []:
+ for arch in repo_obj.arch_list or []:
+ result[(repo_obj.name, arch)] = dict([(flag_name, None) for flag_name in flag_names])
+
+ for flag_name in flag_names:
+ flag_objects = getattr(self, f"{flag_name}_list") or []
+ if package_obj is not None:
+ flag_objects += getattr(package_obj, f"{flag_name}_list") or []
+
+ for flag_obj in flag_objects:
+ # look up entries matching the current flag and change their values according to the flag's tag
+ for (entry_repo, entry_arch), entry_data in result.items():
+ match = flag_obj.repository in (entry_repo, None) and flag_obj.arch in (entry_arch, None)
+ if match:
+ entry_data[flag_name] = True if flag_obj.flag == "enable" else False
+
+ return result
diff --git a/osc/util/models.py b/osc/util/models.py
index b3ec4037..7f03bacc 100644
--- a/osc/util/models.py
+++ b/osc/util/models.py
@@ -249,14 +249,28 @@ class Field(property):
def get(self, obj):
try:
result = obj._values[self.name]
+
+ # convert dictionaries into objects
+ # we can't do it earlier because list is a standalone object that is not under our control
+ if result is not None and self.is_model_list:
+ for num, i in enumerate(result):
+ if isinstance(i, dict):
+ klass = self.inner_type
+ result[num] = klass(**i)
+
if self.get_callback is not None:
result = self.get_callback(obj, result)
+
return result
except KeyError:
pass
try:
result = obj._defaults[self.name]
+ if isinstance(result, (dict, list)):
+ # make a deepcopy to avoid problems with mutable defaults
+ result = copy.deepcopy(result)
+ obj._values[self.name] = result
if self.get_callback is not None:
result = self.get_callback(obj, result)
return result
diff --git a/tests/test_models.py b/tests/test_models.py
index 89f9d981..84ae542b 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -339,6 +339,19 @@ class Test(unittest.TestCase):
m.field = [{"text": "one"}, {"text": "two"}]
self.assertFalse(m.has_changed())
+ def test_append_dict(self):
+ class TestSubmodel(BaseModel):
+ text: str = Field(default="default")
+
+ class TestModel(BaseModel):
+ field: Optional[List[TestSubmodel]] = Field(default=[])
+
+ m = TestModel()
+ m.field.append({"text": "value"})
+ # dict is converted to object next time the field is retrieved
+ self.assertIsInstance(m.field[0], BaseModel)
+ self.assertEqual(m.field[0].text, "value")
+
if __name__ == "__main__":
unittest.main()