1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-01 04:36:13 +01:00

Merge pull request #1495 from dmach/xmlmodel-migrate-from-show_project_meta-show_package_meta

Migrate more code from show_project_meta() and show_package_meta() to xml models
This commit is contained in:
Daniel Mach 2024-02-26 09:39:15 +01:00 committed by GitHub
commit 88f2faf181
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 199 additions and 281 deletions

View File

@ -1,154 +0,0 @@
from . import api
from .api import ET
from .. import core as osc_core
from .. import oscerr
class APIXMLBase:
def __init__(self, xml_root, apiurl=None):
self.root = xml_root
self.apiurl = apiurl
def to_bytes(self):
api.xml_indent(self.root)
return ET.tostring(self.root, encoding="utf-8")
def to_string(self):
return self.to_bytes().decode("utf-8")
class ProjectMeta(APIXMLBase):
@classmethod
def from_api(cls, apiurl, project):
url_path = ["source", project, "_meta"]
root = api.get(apiurl, url_path)
obj = cls(root, apiurl=apiurl)
return obj
def to_api(self, apiurl, project):
url_path = ["source", project, "_meta"]
api.put(apiurl, url_path, data=self.to_bytes())
def repository_list(self):
result = []
repo_nodes = api.find_nodes(self.root, "project", "repository")
for repo_node in repo_nodes:
arch_nodes = api.find_nodes(repo_node, "repository", "arch")
path_nodes = api.find_nodes(repo_node, "repository", "path")
repo = {
"name": repo_node.attrib["name"],
"archs": [i.text.strip() for i in arch_nodes],
"paths": [i.attrib.copy() for i in path_nodes],
}
result.append(repo)
return result
def repository_add(self, name, arches, paths):
node = api.find_node(self.root, "project")
existing = api.find_node(self.root, "project", "repository", {"name": name})
if existing:
raise oscerr.OscValueError(f"Repository '{name}' already exists in project meta")
repo_node = ET.SubElement(node, "repository", attrib={"name": name})
for path_data in paths:
ET.SubElement(repo_node, "path", attrib={
"project": path_data["project"],
"repository": path_data["repository"],
})
for arch in arches:
arch_node = ET.SubElement(repo_node, "arch")
arch_node.text = arch
api.group_child_nodes(repo_node)
api.group_child_nodes(node)
def repository_remove(self, name):
repo_node = api.find_node(self.root, "project", "repository", {"name": name})
if repo_node is None:
return
self.root.remove(repo_node)
def publish_add_disable_repository(self, name: str):
publish_node = api.find_node(self.root, "project", "publish")
if publish_node is None:
project_node = api.find_node(self.root, "project")
publish_node = ET.SubElement(project_node, "publish")
else:
disable_node = api.find_node(publish_node, "publish", "disable", {"repository": name})
if disable_node is not None:
return
ET.SubElement(publish_node, "disable", attrib={"repository": name})
api.group_child_nodes(publish_node)
def publish_remove_disable_repository(self, name: str):
publish_node = api.find_node(self.root, "project", "publish")
if publish_node is None:
return
disable_node = api.find_node(publish_node, "publish", "disable", {"repository": name})
if disable_node is not None:
publish_node.remove(disable_node)
if len(publish_node) == 0:
self.root.remove(publish_node)
REPOSITORY_FLAGS_TEMPLATE = {
"build": None,
"debuginfo": None,
"publish": None,
"useforbuild": None,
}
def _update_repository_flags(self, repository_flags, xml_root):
"""
Update `repository_flags` with data from the `xml_root`.
"""
for flag in self.REPOSITORY_FLAGS_TEMPLATE:
flag_node = xml_root.find(flag)
if flag_node is None:
continue
for node in flag_node:
action = node.tag
repo = node.get("repository")
arch = node.get("arch")
for (entry_repo, entry_arch), entry_data in repository_flags.items():
match = False
if (repo, arch) == (entry_repo, entry_arch):
# apply to matching repository and architecture
match = True
elif repo == entry_repo and not arch:
# apply to all matching repositories
match = True
elif not repo and arch == entry_arch:
# apply to all matching architectures
match = True
elif not repo and not arch:
# apply to everything
match = True
if match:
entry_data[flag] = True if action == "enable" else False
def resolve_repository_flags(self, package=None):
"""
Resolve the `build`, `debuginfo`, `publish` and `useforbuild` flags
and return their values for each repository and build arch.
:returns: {(repo_name, repo_buildarch): {flag_name: bool} for all available repos
"""
result = {}
# TODO: avoid calling get_repos_of_project(), use self.root instead
for repo in osc_core.get_repos_of_project(self.apiurl, self.root.attrib["name"]):
result[(repo.name, repo.arch)] = self.REPOSITORY_FLAGS_TEMPLATE.copy()
self._update_repository_flags(result, self.root)
if package:
m = osc_core.show_package_meta(self.apiurl, self.root.attrib["name"], package)
root = ET.fromstring(b''.join(m))
self._update_repository_flags(result, root)
return result

View File

@ -9048,7 +9048,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
delPerson(apiurl, prj, pac, opts.delete, role) delPerson(apiurl, prj, pac, opts.delete, role)
elif opts.devel_project: elif opts.devel_project:
# XXX: does it really belong to this command? # XXX: does it really belong to this command?
setDevelProject(apiurl, prj, pac, opts.devel_project) set_devel_project(apiurl, prj, pac, opts.devel_project)
else: else:
if pac: if pac:
m = show_package_meta(apiurl, prj, pac) m = show_package_meta(apiurl, prj, pac)

View File

@ -5,6 +5,7 @@ class RepoCommand(osc.commandline.OscCommand):
""" """
Manage repositories in project meta Manage repositories in project meta
""" """
name = "repo" name = "repo"
def run(self, args): def run(self, args):

View File

@ -1,9 +1,9 @@
import difflib import difflib
import osc.commandline import osc.commandline
from .. import obs_api
from .. import oscerr from .. import oscerr
from .._private.project import ProjectMeta from ..output import get_user_input
from ..core import raw_input
class RepoAddCommand(osc.commandline.OscCommand): class RepoAddCommand(osc.commandline.OscCommand):
@ -61,22 +61,50 @@ class RepoAddCommand(osc.commandline.OscCommand):
project, repo = path.split("/") project, repo = path.split("/")
paths.append({"project": project, "repository": repo}) paths.append({"project": project, "repository": repo})
meta = ProjectMeta.from_api(args.apiurl, args.project) project_obj = obs_api.Project.from_api(args.apiurl, args.project)
old_meta = meta.to_string().splitlines() old = project_obj.to_string()
matching_repos = [i for i in project_obj.repository_list or [] if i.name == args.repo]
if matching_repos:
raise oscerr.OscValueError(f"Repository '{args.repo}' already exists in project meta")
project_obj.repository_list.append(
{
"name": args.repo,
"arch_list": args.arches,
"path_list": paths,
}
)
meta.repository_add(args.repo, args.arches, paths)
if args.disable_publish: if args.disable_publish:
meta.publish_add_disable_repository(args.repo) matching_publish_disable_repos = [
i for i in project_obj.publish_list or [] if i.flag == "disable" and i.repository == args.repo
new_meta = meta.to_string().splitlines() ]
diff = difflib.unified_diff(old_meta, new_meta, fromfile="old", tofile="new") if not matching_publish_disable_repos:
print("\n".join(diff)) if project_obj.publish_list is None:
project_obj.publish_list = []
project_obj.publish_list.append(
{
"flag": "disable",
"repository": args.repo,
}
)
if not args.yes: if not args.yes:
new = project_obj.to_string()
diff = difflib.unified_diff(old.splitlines(), new.splitlines(), fromfile="old", tofile="new")
print("\n".join(diff))
print() print()
print(f"You're changing meta of project '{args.project}'")
reply = raw_input("Do you want to apply the changes? [y/N] ").lower() reply = get_user_input(
if reply != "y": f"""
You're changing meta of project '{args.project}'
Do you want to apply the changes?
""",
answers={"y": "yes", "n": "no"},
)
if reply == "n":
raise oscerr.UserAbort() raise oscerr.UserAbort()
meta.to_api(args.apiurl, args.project) project_obj.to_api(args.apiurl)

View File

@ -1,6 +1,6 @@
import osc.commandline import osc.commandline
from .. import obs_api
from ..output import KeyValueTable from ..output import KeyValueTable
from .._private.project import ProjectMeta
class RepoListCommand(osc.commandline.OscCommand): class RepoListCommand(osc.commandline.OscCommand):
@ -19,9 +19,9 @@ class RepoListCommand(osc.commandline.OscCommand):
) )
def run(self, args): def run(self, args):
meta = ProjectMeta.from_api(args.apiurl, args.project) project_obj = obs_api.Project.from_api(args.apiurl, args.project)
repo_flags = project_obj.resolve_repository_flags()
repo_flags = meta.resolve_repository_flags()
flag_map = {} flag_map = {}
for (repo_name, arch), data in repo_flags.items(): for (repo_name, arch), data in repo_flags.items():
for flag_name, flag_value in data.items(): for flag_name, flag_value in data.items():
@ -31,18 +31,18 @@ class RepoListCommand(osc.commandline.OscCommand):
flag_map.setdefault(repo_name, {}).setdefault(flag_name, {}).setdefault(action, []).append(arch) flag_map.setdefault(repo_name, {}).setdefault(flag_name, {}).setdefault(action, []).append(arch)
table = KeyValueTable() table = KeyValueTable()
for repo in meta.repository_list(): for repo in project_obj.repository_list or []:
table.add("Repository", repo["name"], color="bold") table.add("Repository", repo.name, color="bold")
table.add("Architectures", ", ".join(repo["archs"])) table.add("Architectures", ", ".join(repo.arch_list))
if repo["paths"]: if repo.path_list:
paths = [f"{path['project']}/{path['repository']}" for path in repo["paths"]] paths = [f"{path.project}/{path.repository}" for path in repo.path_list]
table.add("Paths", paths) table.add("Paths", paths)
if repo["name"] in flag_map: if repo.name in flag_map:
table.add("Flags", None) table.add("Flags", None)
for flag_name in flag_map[repo["name"]]: for flag_name in flag_map[repo.name]:
lines = [] lines = []
for action, archs in flag_map[repo["name"]][flag_name].items(): for action, archs in flag_map[repo.name][flag_name].items():
lines.append(f"{action + ':':<8s} {', '.join(archs)}") lines.append(f"{action + ':':<8s} {', '.join(archs)}")
lines.sort() lines.sort()
table.add(flag_name, lines, indent=4) table.add(flag_name, lines, indent=4)

View File

@ -1,9 +1,9 @@
import difflib import difflib
import osc.commandline import osc.commandline
from .. import obs_api
from .. import oscerr from .. import oscerr
from .._private.project import ProjectMeta from ..output import get_user_input
from ..core import raw_input
class RepoRemoveCommand(osc.commandline.OscCommand): class RepoRemoveCommand(osc.commandline.OscCommand):
@ -34,22 +34,35 @@ class RepoRemoveCommand(osc.commandline.OscCommand):
) )
def run(self, args): def run(self, args):
meta = ProjectMeta.from_api(args.apiurl, args.project) project_obj = obs_api.Project.from_api(args.apiurl, args.project)
old_meta = meta.to_string().splitlines() old = project_obj.to_string()
for repo in args.repo: for repo in args.repo:
meta.repository_remove(repo) if project_obj.repository_list is not None:
meta.publish_remove_disable_repository(repo) project_obj.repository_list = [i for i in project_obj.repository_list if i.name != repo]
if project_obj.publish_list is not None:
project_obj.publish_list = [
i for i in project_obj.publish_list if i.flag != "disable" or i.repository != repo
]
new_meta = meta.to_string().splitlines() if not project_obj.has_changed():
diff = difflib.unified_diff(old_meta, new_meta, fromfile="old", tofile="new") return
print("\n".join(diff))
if not args.yes: if not args.yes:
new = project_obj.to_string()
diff = difflib.unified_diff(old.splitlines(), new.splitlines(), fromfile="old", tofile="new")
print("\n".join(diff))
print() print()
print(f"You're changing meta of project '{args.project}'")
reply = raw_input("Do you want to apply the changes? [y/N] ").lower() reply = get_user_input(
if reply != "y": f"""
You're changing meta of project '{args.project}'
Do you want to apply the changes?
""",
answers={"y": "yes", "n": "no"},
)
if reply == "n":
raise oscerr.UserAbort() raise oscerr.UserAbort()
meta.to_api(args.apiurl, args.project) project_obj.to_api(args.apiurl)

View File

@ -2335,39 +2335,42 @@ rev: %s
for the updatepacmetafromspec subcommand for the updatepacmetafromspec subcommand
argument force supress the confirm question argument force supress the confirm question
""" """
from . import obs_api
from .output import get_user_input
m = b''.join(show_package_meta(self.apiurl, self.prjname, self.name)) package_obj = obs_api.Package.from_api(self.apiurl, self.prjname, self.name)
old = package_obj.to_string()
package_obj.title = self.summary.strip()
package_obj.description = "".join(self.descr).strip()
package_obj.url = self.url.strip()
new = package_obj.to_string()
root = ET.fromstring(m) if not package_obj.has_changed():
root.find('title').text = self.summary return
root.find('description').text = ''.join(self.descr)
url = root.find('url')
if url is None:
url = ET.SubElement(root, 'url')
url.text = self.url
def delegate(force=False): return make_meta_url('pkg', if force:
(self.prjname, self.name), reply = "y"
self.apiurl, force=force)
url_factory = metafile._URLFactory(delegate)
mf = metafile(url_factory, ET.tostring(root, encoding=ET_ENCODING))
if not force:
print('*' * 36, 'old', '*' * 36)
print(decode_it(m))
print('*' * 36, 'new', '*' * 36)
print(ET.tostring(root, encoding=ET_ENCODING))
print('*' * 72)
repl = raw_input('Write? (y/N/e) ')
else: else:
repl = 'y' while True:
print("\n".join(difflib.unified_diff(old.splitlines(), new.splitlines(), fromfile="old", tofile="new")))
print()
if repl == 'y': reply = get_user_input(
mf.sync() "Write?",
elif repl == 'e': answers={"y": "yes", "n": "no", "e": "edit"},
mf.edit() )
if reply == "y":
break
if reply == "n":
break
if reply == "e":
_, _, edited_obj = package_obj.do_edit()
package_obj.do_update(edited_obj)
new = package_obj.to_string()
continue
mf.discard() if reply == "y":
package_obj.to_api(self.apiurl)
def mark_frozen(self): def mark_frozen(self):
store_write_string(self.absdir, '_frozenlink', '') store_write_string(self.absdir, '_frozenlink', '')
@ -3853,15 +3856,14 @@ def download_assets(directory):
def show_scmsync(apiurl, prj, pac=None): def show_scmsync(apiurl, prj, pac=None):
from . import obs_api
if pac: if pac:
m = show_package_meta(apiurl, prj, pac) package_obj = obs_api.Package.from_api(apiurl, prj, pac)
else: return package_obj.scmsync
m = show_project_meta(apiurl, prj)
node = ET.fromstring(b''.join(m)).find('scmsync') project_obj = obs_api.Project.from_api(apiurl, prj)
if node is None: return project_obj.scmsync
return None
else:
return node.text
def show_devel_project(apiurl, prj, pac): def show_devel_project(apiurl, prj, pac):
@ -3908,20 +3910,15 @@ def set_devel_project(apiurl, prj, pac, devprj=None, devpac=None, print_to="debu
def show_package_disabled_repos(apiurl: str, prj: str, pac: str): def show_package_disabled_repos(apiurl: str, prj: str, pac: str):
m = show_package_meta(apiurl, prj, pac) from . import obs_api
# FIXME: don't work if all repos of a project are disabled and only some are enabled since <disable/> is empty # FIXME: don't work if all repos of a project are disabled and only some are enabled since <disable/> is empty
try: package_obj = obs_api.Package.from_api(apiurl, prj, pac)
root = ET.fromstring(''.join(m)) result = []
elm = root.find('build') for i in package_obj.build_list or []:
r = [] if i.flag == "disable":
for node in elm.findall('disable'): result.append({"repo": i.repository, "arch": i.arch})
repo = node.get('repository') return result
arch = node.get('arch')
dis_r = {'repo': repo, 'arch': arch}
r.append(dis_r)
return r
except:
return None
def show_pattern_metalist(apiurl: str, prj: str): def show_pattern_metalist(apiurl: str, prj: str):
@ -6446,11 +6443,10 @@ def get_platforms_of_project(apiurl: str, prj: str):
def get_repositories_of_project(apiurl: str, prj: str): def get_repositories_of_project(apiurl: str, prj: str):
f = show_project_meta(apiurl, prj) from . import obs_api
root = ET.fromstring(b''.join(f))
r = [node.get('name') for node in root.findall('repository')] project_obj = obs_api.Project.from_api(apiurl, prj)
return r return [i.name for i in project_obj.repository_list or []]
class Repo: class Repo:
@ -6488,13 +6484,13 @@ class Repo:
f.write(f'{repo.name} {repo.arch}\n') f.write(f'{repo.name} {repo.arch}\n')
def get_repos_of_project(apiurl, prj): def get_repos_of_project(apiurl: str, prj: str):
f = show_project_meta(apiurl, prj) from . import obs_api
root = ET.fromstring(b''.join(f))
for node in root.findall('repository'): project_obj = obs_api.Project.from_api(apiurl, prj)
for node2 in node.findall('arch'): for repo in project_obj.repository_list or []:
yield Repo(node.get('name'), node2.text) for arch in repo.arch_list:
yield Repo(repo.name, arch)
def get_binarylist( def get_binarylist(
@ -7981,36 +7977,6 @@ def setBugowner(apiurl: str, prj: str, pac: str, user=None, group=None):
data=ET.tostring(root, encoding=ET_ENCODING)) data=ET.tostring(root, encoding=ET_ENCODING))
def setDevelProject(apiurl, prj, pac, dprj, dpkg=None):
""" set the <devel project="..."> element to package metadata"""
path = (prj, pac)
data = meta_exists(metatype='pkg',
path_args=path,
template_args=None,
create_new=False)
if data and show_project_meta(apiurl, dprj) is not None:
root = ET.fromstring(parse_meta_to_string(data))
if not root.find('devel') is not None:
ET.SubElement(root, 'devel')
elem = root.find('devel')
if dprj:
elem.set('project', dprj)
else:
if 'project' in elem.keys():
del elem.attrib['project']
if dpkg:
elem.set('package', dpkg)
else:
if 'package' in elem.keys():
del elem.attrib['package']
edit_meta(metatype='pkg',
path_args=path,
data=ET.tostring(root, encoding=ET_ENCODING))
else:
print("osc: an error occured")
def createPackageDir(pathname, prj_obj=None): def createPackageDir(pathname, prj_obj=None):
""" """
create and initialize a new package dir in the given project. create and initialize a new package dir in the given project.

View File

@ -8,6 +8,7 @@ from .project_link import ProjectLink
from .project_maintenance_maintains import ProjectMaintenanceMaintains from .project_maintenance_maintains import ProjectMaintenanceMaintains
from .repository import Repository from .repository import Repository
from .simple_flag import SimpleFlag from .simple_flag import SimpleFlag
from .status import Status
class Project(XmlModel): class Project(XmlModel):
@ -112,3 +113,39 @@ class Project(XmlModel):
url_query = {} url_query = {}
response = cls.xml_request("GET", apiurl, url_path, url_query) response = cls.xml_request("GET", apiurl, url_path, url_query)
return cls.from_file(response) return cls.from_file(response)
def to_api(self, apiurl, *, project=None):
project = project or self.name
url_path = ["source", project, "_meta"]
url_query = {}
response = self.xml_request("PUT", apiurl, url_path, url_query, data=self.to_string())
return Status.from_file(response)
def resolve_repository_flags(self, package_obj=None):
"""
Resolve the `build`, `debuginfo`, `publish` and `useforbuild` flags
and return their values for each repository and build arch.
:returns: {(repo_name, repo_buildarch): {flag_name: bool} for all available repos
"""
result = {}
flag_names = ("build", "debuginfo", "publish", "useforbuild")
# populate the result matrix: {(repo, arch): {"build": None, "debuginfo": None, "publish": None, "useforbuild": None}}
for repo_obj in self.repository_list or []:
for arch in repo_obj.arch_list or []:
result[(repo_obj.name, arch)] = dict([(flag_name, None) for flag_name in flag_names])
for flag_name in flag_names:
flag_objects = getattr(self, f"{flag_name}_list") or []
if package_obj is not None:
flag_objects += getattr(package_obj, f"{flag_name}_list") or []
for flag_obj in flag_objects:
# look up entries matching the current flag and change their values according to the flag's tag
for (entry_repo, entry_arch), entry_data in result.items():
match = flag_obj.repository in (entry_repo, None) and flag_obj.arch in (entry_arch, None)
if match:
entry_data[flag_name] = True if flag_obj.flag == "enable" else False
return result

View File

@ -249,14 +249,28 @@ class Field(property):
def get(self, obj): def get(self, obj):
try: try:
result = obj._values[self.name] result = obj._values[self.name]
# convert dictionaries into objects
# we can't do it earlier because list is a standalone object that is not under our control
if result is not None and self.is_model_list:
for num, i in enumerate(result):
if isinstance(i, dict):
klass = self.inner_type
result[num] = klass(**i)
if self.get_callback is not None: if self.get_callback is not None:
result = self.get_callback(obj, result) result = self.get_callback(obj, result)
return result return result
except KeyError: except KeyError:
pass pass
try: try:
result = obj._defaults[self.name] result = obj._defaults[self.name]
if isinstance(result, (dict, list)):
# make a deepcopy to avoid problems with mutable defaults
result = copy.deepcopy(result)
obj._values[self.name] = result
if self.get_callback is not None: if self.get_callback is not None:
result = self.get_callback(obj, result) result = self.get_callback(obj, result)
return result return result

View File

@ -339,6 +339,19 @@ class Test(unittest.TestCase):
m.field = [{"text": "one"}, {"text": "two"}] m.field = [{"text": "one"}, {"text": "two"}]
self.assertFalse(m.has_changed()) self.assertFalse(m.has_changed())
def test_append_dict(self):
class TestSubmodel(BaseModel):
text: str = Field(default="default")
class TestModel(BaseModel):
field: Optional[List[TestSubmodel]] = Field(default=[])
m = TestModel()
m.field.append({"text": "value"})
# dict is converted to object next time the field is retrieved
self.assertIsInstance(m.field[0], BaseModel)
self.assertEqual(m.field[0].text, "value")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()