1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-01 04:36:13 +01:00

Merge pull request #1349 from dmach/xmlmodel

Object wrappers for OBS XML
This commit is contained in:
Daniel Mach 2024-02-23 10:15:05 +01:00 committed by GitHub
commit af24f3c75a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1243 additions and 42 deletions

View File

@ -15,8 +15,6 @@ Scenario: Run `osc setdevelproject <devel_project>`
And stdout is And stdout is
""" """
Setting devel project of package 'test:factory/test-pkgA' to package 'test:devel/test-pkgA' Setting devel project of package 'test:factory/test-pkgA' to package 'test:devel/test-pkgA'
Sending meta data...
Done.
""" """
@ -27,8 +25,6 @@ Scenario: Run `osc setdevelproject <devel_project> <devel_package>`
And stdout is And stdout is
""" """
Setting devel project of package 'test:factory/test-pkgA' to package 'test:devel/test-pkgA' Setting devel project of package 'test:factory/test-pkgA' to package 'test:devel/test-pkgA'
Sending meta data...
Done.
""" """
@ -39,8 +35,6 @@ Scenario: Run `osc setdevelproject <devel_project>/<devel_package>`
And stdout is And stdout is
""" """
Setting devel project of package 'test:factory/test-pkgA' to package 'test:devel/test-pkgA' Setting devel project of package 'test:factory/test-pkgA' to package 'test:devel/test-pkgA'
Sending meta data...
Done.
""" """
@ -52,6 +46,4 @@ Scenario: Run `osc setdevelproject --unset`
And stdout is And stdout is
""" """
Unsetting devel project from package 'test:factory/test-pkgA' Unsetting devel project from package 'test:factory/test-pkgA'
Sending meta data...
Done.
""" """

View File

@ -13,8 +13,6 @@ Scenario: Run `osc setdevelproject <project> <package> <devel_project>`
And stdout is And stdout is
""" """
Setting devel project of package 'test:factory/test-pkgA' to package 'test:devel/test-pkgA' Setting devel project of package 'test:factory/test-pkgA' to package 'test:devel/test-pkgA'
Sending meta data...
Done.
""" """
@ -25,8 +23,6 @@ Scenario: Run `osc setdevelproject <project> <package> <devel_project> <devel_pa
And stdout is And stdout is
""" """
Setting devel project of package 'test:factory/test-pkgB' to package 'test:devel/test-pkgA' Setting devel project of package 'test:factory/test-pkgB' to package 'test:devel/test-pkgA'
Sending meta data...
Done.
""" """
@ -37,8 +33,6 @@ Scenario: Run `osc setdevelproject <project>/<package> <devel_project>/<devel_p
And stdout is And stdout is
""" """
Setting devel project of package 'test:factory/test-pkgB' to package 'test:devel/test-pkgA' Setting devel project of package 'test:factory/test-pkgB' to package 'test:devel/test-pkgA'
Sending meta data...
Done.
""" """
@ -49,6 +43,4 @@ Scenario: Run `osc setdevelproject <project> <package> --unset`
And stdout is And stdout is
""" """
Unsetting devel project from package 'test:factory/test-pkgA' Unsetting devel project from package 'test:factory/test-pkgA'
Sending meta data...
Done.
""" """

View File

@ -3865,15 +3865,20 @@ def show_scmsync(apiurl, prj, pac=None):
def show_devel_project(apiurl, prj, pac): def show_devel_project(apiurl, prj, pac):
m = show_package_meta(apiurl, prj, pac) from . import obs_api
node = ET.fromstring(b''.join(m)).find('devel')
if node is None: package_obj = obs_api.Package.from_api(apiurl, prj, pac)
if package_obj.devel is None:
return None, None return None, None
else:
return node.get('project'), node.get('package', None) # mute a false-positive: Instance of 'dict' has no 'project' member (no-member)
# pylint: disable=no-member
return package_obj.devel.project, package_obj.devel.package
def set_devel_project(apiurl, prj, pac, devprj=None, devpac=None, print_to="debug"): def set_devel_project(apiurl, prj, pac, devprj=None, devpac=None, print_to="debug"):
from . import obs_api
if devprj: if devprj:
msg = "Setting devel project of" msg = "Setting devel project of"
else: else:
@ -3888,26 +3893,18 @@ def set_devel_project(apiurl, prj, pac, devprj=None, devpac=None, print_to="debu
) )
_private.print_msg(msg, print_to=print_to) _private.print_msg(msg, print_to=print_to)
meta = show_package_meta(apiurl, prj, pac) package_obj = obs_api.Package.from_api(apiurl, prj, pac)
root = ET.fromstring(b''.join(meta))
node = root.find('devel') if devprj is None:
if node is None: package_obj.devel = None
if devprj is None:
return
node = ET.Element('devel')
root.append(node)
else: else:
if devprj is None: package_obj.devel = {"project": devprj, "package": devpac}
root.remove(node)
else: if package_obj.has_changed():
node.clear() return package_obj.to_api(apiurl)
if devprj:
node.set('project', devprj) # TODO: debug log that we have skipped the API call
if devpac: return None
node.set('package', devpac)
url = makeurl(apiurl, ['source', prj, pac, '_meta'])
mf = metafile(url, ET.tostring(root, encoding=ET_ENCODING))
mf.sync()
def show_package_disabled_repos(apiurl: str, prj: str, pac: str): def show_package_disabled_repos(apiurl: str, prj: str, pac: str):

2
osc/obs_api/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from .package import Package
from .project import Project

79
osc/obs_api/enums.py Normal file
View File

@ -0,0 +1,79 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class BlockModes(str, Enum):
ALL = "all"
LOCAL = "local"
NEVER = "never"
class BuildArch(str, Enum):
NOARCH = "noarch"
AARCH64 = "aarch64"
AARCH64_ILP32 = "aarch64_ilp32"
ARMV4L = "armv4l"
ARMV5L = "armv5l"
ARMV6L = "armv6l"
ARMV7L = "armv7l"
ARMV5EL = "armv5el"
ARMV6EL = "armv6el"
ARMV7EL = "armv7el"
ARMV7HL = "armv7hl"
ARMV8EL = "armv8el"
HPPA = "hppa"
M68K = "m68k"
I386 = "i386"
I486 = "i486"
I586 = "i586"
I686 = "i686"
ATHLON = "athlon"
IA64 = "ia64"
K1OM = "k1om"
MIPS = "mips"
MIPSEL = "mipsel"
MIPS32 = "mips32"
MIPS64 = "mips64"
MIPS64EL = "mips64el"
PPC = "ppc"
PPC64 = "ppc64"
PPC64P7 = "ppc64p7"
PPC64LE = "ppc64le"
RISCV64 = "riscv64"
S390 = "s390"
S390X = "s390x"
SH4 = "sh4"
SPARC = "sparc"
SPARC64 = "sparc64"
SPARC64V = "sparc64v"
SPARCV8 = "sparcv8"
SPARCV9 = "sparcv9"
SPARCV9V = "sparcv9v"
X86_64 = "x86_64"
LOCAL = "local"
class LinkedbuildModes(str, Enum):
OFF = "off"
LOCALDEP = "localdep"
ALLDIRECT = "alldirect"
ALL = "all"
class LocalRole(str, Enum):
MAINTAINER = "maintainer"
BUGOWNER = "bugowner"
REVIEWER = "reviewer"
DOWNLOADER = "downloader"
READER = "reader"
class RebuildModes(str, Enum):
TRANSITIVE = "transitive"
DIRECT = "direct"
LOCAL = "local"
class ReleaseTriggers(str, Enum):
MANUAL = "manual"
MAINTENANCE = "maintenance"
OBSGENDIFF = "obsgendiff"

24
osc/obs_api/flag.py Normal file
View File

@ -0,0 +1,24 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class Flag(XmlModel):
XML_TAG = None
def __init__(self, flag, **kwargs):
super().__init__(flag=flag, **kwargs)
class FlagChoices(Enum):
ENABLE = "enable"
DISABLE = "disable"
flag: FlagChoices = Field(
xml_set_tag=True,
)
arch: Optional[str] = Field(
xml_attribute=True,
)
repository: Optional[str] = Field(
xml_attribute=True,
)

15
osc/obs_api/group_role.py Normal file
View File

@ -0,0 +1,15 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .enums import LocalRole
class GroupRole(XmlModel):
XML_TAG = "group"
groupid: str = Field(
xml_attribute=True,
)
role: LocalRole = Field(
xml_attribute=True,
)

127
osc/obs_api/package.py Normal file
View File

@ -0,0 +1,127 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .flag import Flag
from .group_role import GroupRole
from .package_devel import PackageDevel
from .person_role import PersonRole
from .simple_flag import SimpleFlag
from .status import Status
class Package(XmlModel):
XML_TAG = "package"
name: str = Field(
xml_attribute=True,
)
project: str = Field(
xml_attribute=True,
)
title: str = Field()
description: str = Field()
devel: Optional[PackageDevel] = Field()
releasename: Optional[str] = Field()
person_list: Optional[List[PersonRole]] = Field(
xml_name="person",
)
group_list: Optional[List[GroupRole]] = Field(
xml_name="group",
)
lock: Optional[SimpleFlag] = Field()
build_list: Optional[List[Flag]] = Field(
xml_name="build",
xml_wrapped=True,
)
publish_list: Optional[List[Flag]] = Field(
xml_name="publish",
xml_wrapped=True,
)
useforbuild_list: Optional[List[Flag]] = Field(
xml_name="useforbuild",
xml_wrapped=True,
)
debuginfo_list: Optional[List[Flag]] = Field(
xml_name="debuginfo",
xml_wrapped=True,
)
binarydownload: Optional[SimpleFlag] = Field()
sourceaccess: Optional[SimpleFlag] = Field()
url: Optional[str] = Field()
scmsync: Optional[str] = Field()
bcntsynctag: Optional[str] = Field()
@classmethod
def from_api(cls, apiurl, project, package, *, rev=None):
url_path = ["source", project, package, "_meta"]
url_query = {
"rev": rev,
}
response = cls.xml_request("GET", apiurl, url_path, url_query)
return cls.from_file(response)
def to_api(self, apiurl, *, project=None, package=None):
project = project or self.project
package = package or self.name
url_path = ["source", project, package, "_meta"]
url_query = {}
response = self.xml_request("PUT", apiurl, url_path, url_query, data=self.to_string())
return Status.from_file(response)
@classmethod
def cmd_release(
cls,
apiurl: str,
project: str,
package: str,
*,
repository: Optional[str] = None,
arch: Optional[str] = None,
target_project: Optional[str] = None,
target_repository: Optional[str] = None,
setrelease: Optional[str] = None,
nodelay: Optional[bool] = None,
):
"""
POST /source/{project}/{package}?cmd=release
Release sources and binaries of a specified package.
:param apiurl: Full apiurl or its alias.
:param project: Project name.
:param package: Package name.
:param repository: Limit the release to the given repository.
:param arch: Limit the release to the given architecture.
:param target_project: The name of the release target project.
:param target_repository: The name of the release target repository.
:param setrelease: Tag the release with the given value.
:param nodelay: Do not delay the relase. If not set, the release will be delayed to be done later.
"""
url_path = ["source", project, package]
url_query = {
"cmd": "release",
"repository": repository,
"arch": arch,
"target_project": target_project,
"target_repository": target_repository,
"setrelease": setrelease,
"nodelay": nodelay,
}
response = cls.xml_request("POST", apiurl, url_path, url_query)
return Status.from_string(response.read())

View File

@ -0,0 +1,13 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class PackageDevel(XmlModel):
XML_TAG = "devel"
project: str = Field(
xml_attribute=True,
)
package: Optional[str] = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,15 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .enums import LocalRole
class PersonRole(XmlModel):
XML_TAG = "person"
userid: str = Field(
xml_attribute=True,
)
role: LocalRole = Field(
xml_attribute=True,
)

114
osc/obs_api/project.py Normal file
View File

@ -0,0 +1,114 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .flag import Flag
from .group_role import GroupRole
from .person_role import PersonRole
from .project_devel import ProjectDevel
from .project_link import ProjectLink
from .project_maintenance_maintains import ProjectMaintenanceMaintains
from .repository import Repository
from .simple_flag import SimpleFlag
class Project(XmlModel):
XML_TAG = "project"
name: str = Field(
xml_attribute=True,
)
class KindEnum(str, Enum):
STANDARD = "standard"
MAINTENANCE = "maintenance"
MAINTENANCE_INCIDENT = "maintenance_incident"
MAINTENANCE_RELEASE = "maintenance_release"
kind: Optional[KindEnum] = Field(
xml_attribute=True,
)
title: str = Field(
)
description: str = Field(
)
url: Optional[str] = Field(
)
link_list: Optional[List[ProjectLink]] = Field(
xml_name="link",
)
mountproject: Optional[str] = Field(
)
remoteurl: Optional[str] = Field(
)
scmsync: Optional[str] = Field(
)
devel: Optional[ProjectDevel] = Field(
)
person_list: Optional[List[PersonRole]] = Field(
xml_name="person",
)
group_list: Optional[List[GroupRole]] = Field(
xml_name="group",
)
lock: Optional[SimpleFlag] = Field(
xml_wrapped=True,
)
build_list: Optional[List[Flag]] = Field(
xml_name="build",
xml_wrapped=True,
)
publish_list: Optional[List[Flag]] = Field(
xml_name="publish",
xml_wrapped=True,
)
useforbuild_list: Optional[List[Flag]] = Field(
xml_name="useforbuild",
xml_wrapped=True,
)
debuginfo_list: Optional[List[Flag]] = Field(
xml_name="debuginfo",
xml_wrapped=True,
)
binarydownload_list: Optional[List[Flag]] = Field(
xml_name="binarydownload",
xml_wrapped=True,
)
sourceaccess: Optional[SimpleFlag] = Field(
xml_wrapped=True,
)
access: Optional[SimpleFlag] = Field(
xml_wrapped=True,
)
maintenance_list: Optional[List[ProjectMaintenanceMaintains]] = Field(
xml_name="maintenance",
xml_wrapped=True,
)
repository_list: Optional[List[Repository]] = Field(
xml_name="repository",
)
@classmethod
def from_api(cls, apiurl, project):
url_path = ["source", project, "_meta"]
url_query = {}
response = cls.xml_request("GET", apiurl, url_path, url_query)
return cls.from_file(response)

View File

@ -0,0 +1,9 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class ProjectDevel(XmlModel):
XML_TAG = "devel"
project: str = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,17 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class ProjectLink(XmlModel):
XML_TAG = "link"
project: str = Field(
xml_attribute=True,
)
class VrevmodeEnum(str, Enum):
UNEXTEND = "unextend"
EXTEND = "extend"
vrevmode: Optional[VrevmodeEnum] = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,9 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class ProjectMaintenanceMaintains(XmlModel):
XML_TAG = "maintains"
project: str = Field(
xml_attribute=True,
)

50
osc/obs_api/repository.py Normal file
View File

@ -0,0 +1,50 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .enums import BlockModes
from .enums import BuildArch
from .enums import LinkedbuildModes
from .enums import RebuildModes
from .repository_download import RepositoryDownload
from .repository_hostsystem import RepositoryHostsystem
from .repository_path import RepositoryPath
from .repository_releasetarget import RepositoryReleasetarget
class Repository(XmlModel):
XML_TAG = "repository"
name: str = Field(
xml_attribute=True,
)
rebuild: Optional[RebuildModes] = Field(
xml_attribute=True,
)
block: Optional[BlockModes] = Field(
xml_attribute=True,
)
linkedbuild: Optional[LinkedbuildModes] = Field(
xml_attribute=True,
)
download_list: Optional[List[RepositoryDownload]] = Field(
xml_name="download",
)
releasetarget_list: Optional[List[RepositoryReleasetarget]] = Field(
xml_name="releasetarget",
)
hostsystem_list: Optional[List[RepositoryHostsystem]] = Field(
xml_name="hostsystem",
)
path_list: Optional[List[RepositoryPath]] = Field(
xml_name="path",
)
arch_list: Optional[List[BuildArch]] = Field(
xml_name="arch",
)

View File

@ -0,0 +1,36 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .repository_download_master import RepositoryDownloadMaster
class RepositoryDownload(XmlModel):
XML_TAG = "download"
arch: str = Field(
xml_attribute=True,
)
url: str = Field(
xml_attribute=True,
)
class RepotypeEnum(str, Enum):
RPMMD = "rpmmd"
SUSETAGS = "susetags"
DEB = "deb"
ARCH = "arch"
MDK = "mdk"
REGISTRY = "registry"
repotype: RepotypeEnum = Field(
xml_attribute=True,
)
archfilter: Optional[str] = Field(
)
master: Optional[RepositoryDownloadMaster] = Field(
)
pubkey: Optional[str] = Field(
)

View File

@ -0,0 +1,13 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class RepositoryDownloadMaster(XmlModel):
XML_TAG = "master"
url: str = Field(
xml_attribute=True,
)
sslfingerprint: Optional[str] = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,13 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class RepositoryHostsystem(XmlModel):
XML_TAG = "hostsystem"
repository: str = Field(
xml_attribute=True,
)
project: str = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,13 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class RepositoryPath(XmlModel):
XML_TAG = "path"
project: str = Field(
xml_attribute=True,
)
repository: str = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,19 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .enums import ReleaseTriggers
class RepositoryReleasetarget(XmlModel):
XML_TAG = "releasetarget"
project: str = Field(
xml_attribute=True,
)
repository: str = Field(
xml_attribute=True,
)
trigger: Optional[ReleaseTriggers] = Field(
xml_attribute=True,
)

View File

@ -0,0 +1,24 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class SimpleFlag(XmlModel):
XML_TAG = None
XML_TAG_FIELD = "flag"
def __init__(self, flag):
super().__init__(flag=flag)
class SimpleFlagChoices(Enum):
ENABLE = "enable"
DISABLE = "disable"
flag: SimpleFlagChoices = Field(
xml_wrapped=True,
xml_set_tag=True,
)
def __eq__(self, other):
if hasattr(other, "flag"):
return self.flag == other.flag
# allow comparing with a string
return self.flag == other

42
osc/obs_api/status.py Normal file
View File

@ -0,0 +1,42 @@
import textwrap
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .status_data import StatusData
class Status(XmlModel):
XML_TAG = "status"
code: str = Field(
xml_attribute=True,
description=textwrap.dedent(
"""
Status code returned by the server.
"""
),
)
summary: Optional[str] = Field(
description=textwrap.dedent(
"""
Human readable summary.
"""
),
)
details: Optional[str] = Field(
description=textwrap.dedent(
"""
Detailed, human readable information.
"""
),
)
data_list: Optional[List[StatusData]] = Field(
xml_name="data",
description=textwrap.dedent(
"""
Additional machine readable data.
"""
),
)

View File

@ -0,0 +1,31 @@
import textwrap
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class StatusData(XmlModel):
XML_TAG = "data"
class NameEnum(str, Enum):
SOURCEPROJECT = "sourceproject"
SOURCEPACKAGE = "sourcepackage"
TARGETPROJECT = "targetproject"
TARGETPACKAGE = "targetpackage"
name: NameEnum = Field(
xml_attribute=True,
description=textwrap.dedent(
"""
Key.
"""
),
)
value: str = Field(
xml_set_text=True,
description=textwrap.dedent(
"""
Value.
"""
),
)

View File

@ -9,9 +9,12 @@ This module IS NOT a supported API, it is meant for osc internal use only.
import copy import copy
import inspect import inspect
import sys import sys
import tempfile
import types import types
import typing
from typing import Callable from typing import Callable
from typing import get_type_hints from typing import get_type_hints
from xml.etree import ElementTree as ET
# supported types # supported types
from enum import Enum from enum import Enum
@ -23,7 +26,6 @@ from typing import Optional
from typing import Tuple from typing import Tuple
from typing import Union from typing import Union
if sys.version_info < (3, 8): if sys.version_info < (3, 8):
def get_origin(typ): def get_origin(typ):
@ -36,9 +38,14 @@ if sys.version_info < (3, 8):
else: else:
from typing import get_origin from typing import get_origin
import urllib3.response
from . import xml
__all__ = ( __all__ = (
"BaseModel", "BaseModel",
"XmlModel",
"Field", "Field",
"NotSet", "NotSet",
"FromParent", "FromParent",
@ -297,6 +304,10 @@ class Field(property):
else: else:
new_value.append(i) new_value.append(i)
value = new_value value = new_value
elif self.is_model and isinstance(value, str) and hasattr(self.origin_type, "XML_TAG_FIELD"):
klass = self.origin_type
key = getattr(self.origin_type, "XML_TAG_FIELD")
value = klass(**{key: value})
self.validate_type(value) self.validate_type(value)
obj._values[self.name] = value obj._values[self.name] = value
@ -374,8 +385,16 @@ class BaseModel(metaclass=ModelMeta):
for name, field in self.__fields__.items(): for name, field in self.__fields__.items():
field.validate_type(getattr(self, name)) field.validate_type(getattr(self, name))
self._snapshot = {} # copy of ``self.dict()`` so we can determine if the object has changed later on
self.do_snapshot()
self._allow_new_attributes = False self._allow_new_attributes = False
def __eq__(self, other):
if type(self) != type(other):
return False
return self.dict() == other.dict()
def dict(self): def dict(self):
result = {} result = {}
for name, field in self.__fields__.items(): for name, field in self.__fields__.items():
@ -390,3 +409,362 @@ class BaseModel(metaclass=ModelMeta):
result[name] = value result[name] = value
return result return result
def do_snapshot(self):
"""
Save ``self.dict()`` result as a new starting point for detecting changes in the object data.
"""
self._snapshot = self.dict()
def has_changed(self):
"""
Determine if the object data has changed since its creation or the last snapshot.
"""
return self.dict() != self._snapshot
class XmlModel(BaseModel):
XML_TAG = None
def to_xml(self) -> ET.Element:
xml_tag = None
# check if there's a special field that sets the tag
for field_name, field in self.__fields__.items():
xml_set_tag = field.extra.get("xml_set_tag", False)
if xml_set_tag:
value = getattr(self, field_name)
xml_tag = value
break
# use the value from the class
if xml_tag is None:
xml_tag = self.XML_TAG
assert xml_tag is not None
root = ET.Element(xml_tag)
for field_name, field in self.__fields__.items():
xml_attribute = field.extra.get("xml_attribute", False)
xml_set_tag = field.extra.get("xml_set_tag", False)
xml_set_text = field.extra.get("xml_set_text", False)
xml_name = field.extra.get("xml_name", field_name)
xml_wrapped = field.extra.get("xml_wrapped", False)
xml_item_name = field.extra.get("xml_item_name", xml_name)
if xml_set_tag:
# a special case when the field determines the top-level tag name
continue
value = getattr(self, field_name)
if value is None:
# skip fields that are not set
continue
# if value is wrapped into an external element, create it
if xml_wrapped:
wrapper_node = ET.SubElement(root, xml_name)
else:
wrapper_node = root
if xml_set_text:
wrapper_node.text = str(value)
continue
if field.origin_type == list:
for entry in value:
if isinstance(entry, dict):
klass = field.inner_type
obj = klass(**entry)
node = obj.to_xml()
wrapper_node.append(node)
elif field.inner_type and issubclass(field.inner_type, XmlModel):
wrapper_node.append(entry.to_xml())
else:
node = ET.SubElement(wrapper_node, xml_item_name)
if xml_attribute:
node.attrib[xml_attribute] = entry
else:
node.text = entry
elif issubclass(field.origin_type, XmlModel):
wrapper_node.append(value.to_xml())
elif xml_attribute:
wrapper_node.attrib[xml_name] = str(value)
else:
node = ET.SubElement(wrapper_node, xml_name)
node.text = str(value)
return root
@classmethod
def from_string(cls, string: str) -> "XmlModel":
"""
Instantiate model from string.
"""
root = ET.fromstring(string)
return cls.from_xml(root)
@classmethod
def from_file(cls, file: Union[str, typing.IO]) -> "XmlModel":
"""
Instantiate model from file.
"""
root = ET.parse(file).getroot()
return cls.from_xml(root)
def to_bytes(self) -> bytes:
"""
Serialize the object as XML and return it as utf-8 encoded bytes.
"""
root = self.to_xml()
xml.xml_indent(root)
return ET.tostring(root, encoding="utf-8")
def to_string(self) -> str:
"""
Serialize the object as XML and return it as a string.
"""
return self.to_bytes().decode("utf-8")
def to_file(self, file: Union[str, typing.IO]) -> None:
"""
Serialize the object as XML and save it to an utf-8 encoded file.
"""
root = self.to_xml()
xml.xml_indent(root)
return ET.ElementTree(root).write(file, encoding="utf-8")
@staticmethod
def value_from_string(field, value):
"""
Convert field value from string to the actual type of the field.
"""
if field.origin_type is bool:
if value.lower() in ["1", "yes", "true", "on"]:
value = True
return value
if value.lower() in ["0", "no", "false", "off"]:
value = False
return value
if field.origin_type is int:
value = int(value)
return value
return value
@classmethod
def _remove_processed_node(cls, parent, node):
"""
Remove a node that has been fully processed and is now empty.
"""
if len(node) != 0:
raise RuntimeError(f"Node {node} contains unprocessed child elements {list(node)}")
if node.attrib:
raise RuntimeError(f"Node {node} contains unprocessed attributes {node.attrib}")
if node.text is not None and node.text.strip():
raise RuntimeError(f"Node {node} contains unprocessed text {node.text}")
if parent is not None:
parent.remove(node)
@classmethod
def from_xml(cls, root: ET.Element):
"""
Instantiate model from a XML root.
"""
# We need to make sure we parse all data
# and that's why we remove processed elements and attributes and check that nothing remains.
# Otherwise we'd be sending partial XML back and that would lead to data loss.
#
# Let's make a copy of the xml tree because we'll destroy it during the process.
orig_root = root
root = copy.deepcopy(root)
kwargs = {}
for field_name, field in cls.__fields__.items():
xml_attribute = field.extra.get("xml_attribute", False)
xml_set_tag = field.extra.get("xml_set_tag", False)
xml_set_text = field.extra.get("xml_set_text", False)
xml_name = field.extra.get("xml_name", field_name)
xml_wrapped = field.extra.get("xml_wrapped", False)
xml_item_name = field.extra.get("xml_item_name", xml_name)
value: Any
node: Optional[ET.Element]
if xml_set_tag:
# field contains name of the ``root`` tag
if xml_wrapped:
# the last node wins (overrides the previous nodes)
for node in root[:]:
value = node.tag
cls._remove_processed_node(root, node)
else:
value = root.tag
kwargs[field_name] = value
continue
if xml_set_text:
# field contains the value (text) of the element
if xml_wrapped:
# the last node wins (overrides the previous nodes)
for node in root[:]:
value = node.text
node.text = None
cls._remove_processed_node(root, node)
else:
value = root.text
root.text = None
value = value.strip()
kwargs[field_name] = value
continue
if xml_attribute:
# field is an attribute that contains a scalar
if xml_name not in root.attrib:
continue
value = cls.value_from_string(field, root.attrib.pop(xml_name))
kwargs[field_name] = value
continue
if field.origin_type is list:
if xml_wrapped:
wrapper_node = root.find(xml_name)
# we'll consider all nodes inside the wrapper node
nodes = wrapper_node[:] if wrapper_node is not None else None
else:
wrapper_node = None
# we'll consider only nodes with matching name
nodes = root.findall(xml_item_name)
if not nodes:
if wrapper_node is not None:
cls._remove_processed_node(root, wrapper_node)
continue
values = []
for node in nodes:
if field.is_model_list:
klass = field.inner_type
entry = klass.from_xml(node)
# clear node as it was checked in from_xml() already
node.text = None
node.attrib = {}
node[:] = []
else:
entry = cls.value_from_string(field, node.text)
node.text = None
values.append(entry)
if xml_wrapped:
cls._remove_processed_node(wrapper_node, node)
else:
cls._remove_processed_node(root, node)
if xml_wrapped:
cls._remove_processed_node(root, wrapper_node)
kwargs[field_name] = values
continue
if field.is_model:
# field contains an instance of XmlModel
assert xml_name is not None
node = root.find(xml_name)
if node is None:
continue
klass = field.origin_type
kwargs[field_name] = klass.from_xml(node)
# clear node as it was checked in from_xml() already
node.text = None
node.attrib = {}
node[:] = []
cls._remove_processed_node(root, node)
continue
# field contains a scalar
node = root.find(xml_name)
if node is None:
continue
value = cls.value_from_string(field, node.text)
node.text = None
if value is None:
if field.is_optional:
continue
value = ""
kwargs[field_name] = value
cls._remove_processed_node(root, node)
cls._remove_processed_node(None, root)
obj = cls(**kwargs)
obj.__dict__["_root"] = orig_root
return obj
@classmethod
def xml_request(cls, method: str, apiurl: str, path: List[str], query: Optional[dict] = None, data: Optional[str] = None) -> urllib3.response.HTTPResponse:
from ..connection import http_request
from ..core import makeurl
url = makeurl(apiurl, path, query)
# TODO: catch HTTPError and return the wrapped response as XmlModel instance
return http_request(method, url, data=data, retry_on_400=False)
def do_update(self, other: "XmlModel") -> None:
"""
Update values of the fields in the current model instance from another.
"""
self._values = copy.deepcopy(other._values)
def do_edit(self) -> Tuple[str, str, "XmlModel"]:
"""
Serialize model as XML and open it in an editor for editing.
Return a tuple with:
* a string with original data
* a string with edited data
* an instance of the class with edited data loaded
IMPORTANT: This method is always interactive.
"""
from ..core import run_editor
from ..output import get_user_input
def write_file(f, data):
f.seek(0)
f.write(data)
f.truncate()
f.flush()
with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", prefix="obs_xml_", suffix=".xml") as f:
original_data = self.to_string()
write_file(f, original_data)
while True:
run_editor(f.name)
try:
edited_obj = self.__class__.from_file(f.name)
f.seek(0)
edited_data = f.read()
break
except Exception as e:
reply = get_user_input(
f"""
The edited data is not valid.
{e}
""",
answers={"a": "abort", "e": "edit", "u": "undo changes and edit"},
)
if reply == "a":
from .. import oscerr
raise oscerr.UserAbort()
elif reply == "e":
continue
elif reply == "u":
write_file(f, original_data)
continue
return original_data, edited_data, edited_obj

View File

@ -517,7 +517,7 @@ class TestConf(unittest.TestCase):
conf1 = osc.conf.Options() conf1 = osc.conf.Options()
conf2 = osc.conf.Options() conf2 = osc.conf.Options()
self.assertNotEqual(conf1, conf2) self.assertEqual(conf1, conf2) # models are compared by their contents now
self.assertNotEqual(id(conf1), id(conf2)) self.assertNotEqual(id(conf1), id(conf2))
self.assertNotEqual(id(conf1.api_host_options), id(conf2.api_host_options)) self.assertNotEqual(id(conf1.api_host_options), id(conf2.api_host_options))

View File

@ -316,6 +316,29 @@ class Test(unittest.TestCase):
self.assertEqual(m.quiet, False) self.assertEqual(m.quiet, False)
self.assertEqual(m.verbose, True) self.assertEqual(m.verbose, True)
def test_has_changed(self):
class TestSubmodel(BaseModel):
text: str = Field(default="default")
class TestModel(BaseModel):
field: Optional[List[TestSubmodel]] = Field(default=[])
m = TestModel()
self.assertFalse(m.has_changed())
# a new instance of empty list
m.field = []
self.assertFalse(m.has_changed())
m.field = [{"text": "one"}, {"text": "two"}]
self.assertTrue(m.has_changed())
m.do_snapshot()
# a new instance of list with new instances of objects with the same data
m.field = [{"text": "one"}, {"text": "two"}]
self.assertFalse(m.has_changed())
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -0,0 +1,154 @@
import textwrap
import unittest
from osc.util.models import *
class TestXmlModel(unittest.TestCase):
def test_attribute(self):
class TestModel(XmlModel):
XML_TAG = "tag"
value: str = Field(xml_attribute=True)
m = TestModel(value="FOO")
self.assertEqual(m.dict(), {"value": "FOO"})
expected = """<tag value="FOO" />"""
self.assertEqual(m.to_string(), expected)
# verify that we can also load the serialized data
m = TestModel.from_string(expected)
self.assertEqual(m.to_string(), expected)
def test_element(self):
class TestModel(XmlModel):
XML_TAG = "tag"
value: str = Field()
m = TestModel(value="FOO")
self.assertEqual(m.dict(), {"value": "FOO"})
expected = textwrap.dedent(
"""
<tag>
<value>FOO</value>
</tag>
"""
).strip()
self.assertEqual(m.to_string(), expected)
# verify that we can also load the serialized data
m = TestModel.from_string(expected)
self.assertEqual(m.to_string(), expected)
def test_element_list(self):
class TestModel(XmlModel):
XML_TAG = "tag"
value_list: List[str] = Field(xml_name="value")
m = TestModel(value_list=["FOO", "BAR"])
self.assertEqual(m.dict(), {"value_list": ["FOO", "BAR"]})
expected = textwrap.dedent(
"""
<tag>
<value>FOO</value>
<value>BAR</value>
</tag>
"""
).strip()
self.assertEqual(m.to_string(), expected)
# verify that we can also load the serialized data
m = TestModel.from_string(expected)
self.assertEqual(m.to_string(), expected)
def test_child_model(self):
class ChildModel(XmlModel):
XML_TAG = "child"
value: str = Field()
class ParentModel(XmlModel):
XML_TAG = "parent"
text: str = Field()
child: ChildModel = Field()
m = ParentModel(text="TEXT", child={"value": "FOO"})
expected = textwrap.dedent(
"""
<parent>
<text>TEXT</text>
<child>
<value>FOO</value>
</child>
</parent>
"""
).strip()
self.assertEqual(m.to_string(), expected)
# verify that we can also load the serialized data
m = ParentModel.from_string(expected)
self.assertEqual(m.to_string(), expected)
def test_child_model_list(self):
class ChildModel(XmlModel):
XML_TAG = "child"
value: str = Field()
class ParentModel(XmlModel):
XML_TAG = "parent"
text: str = Field()
child: List[ChildModel] = Field()
m = ParentModel(text="TEXT", child=[{"value": "FOO"}, {"value": "BAR"}])
expected = textwrap.dedent(
"""
<parent>
<text>TEXT</text>
<child>
<value>FOO</value>
</child>
<child>
<value>BAR</value>
</child>
</parent>
"""
).strip()
self.assertEqual(m.to_string(), expected)
# verify that we can also load the serialized data
m = ParentModel.from_string(expected)
self.assertEqual(m.to_string(), expected)
def test_child_model_list_wrapped(self):
class ChildModel(XmlModel):
XML_TAG = "child"
value: str = Field()
class ParentModel(XmlModel):
XML_TAG = "parent"
text: str = Field()
child: List[ChildModel] = Field(xml_wrapped=True, xml_name="children")
m = ParentModel(text="TEXT", child=[{"value": "FOO"}, {"value": "BAR"}])
expected = textwrap.dedent(
"""
<parent>
<text>TEXT</text>
<children>
<child>
<value>FOO</value>
</child>
<child>
<value>BAR</value>
</child>
</children>
</parent>
"""
).strip()
self.assertEqual(m.to_string(), expected)
# verify that we can also load the serialized data
m = ParentModel.from_string(expected)
self.assertEqual(m.to_string(), expected)
if __name__ == "__main__":
unittest.main()