salt/implementation-of-held-unheld-functions-for-state-pk.patch

814 lines
28 KiB
Diff
Raw Normal View History

From 8e5295ef9047a9afdd2323508c633ab0356ef603 Mon Sep 17 00:00:00 2001
From: Alexander Graul <agraul@suse.com>
Date: Wed, 19 Jan 2022 15:34:24 +0100
Subject: [PATCH] Implementation of held/unheld functions for state pkg
(#387)
* Implementation of held/unheld functions for state pkg
---
salt/modules/zypperpkg.py | 119 ++++++-
salt/states/pkg.py | 310 +++++++++++++++++++
tests/pytests/unit/modules/test_zypperpkg.py | 133 ++++++++
tests/pytests/unit/states/test_pkg.py | 137 ++++++++
4 files changed, 686 insertions(+), 13 deletions(-)
diff --git a/salt/modules/zypperpkg.py b/salt/modules/zypperpkg.py
index 4fc045c313..ac6c36a09f 100644
--- a/salt/modules/zypperpkg.py
+++ b/salt/modules/zypperpkg.py
@@ -2103,6 +2103,76 @@ def purge(
return _uninstall(inclusion_detection, name=name, pkgs=pkgs, root=root)
+def list_holds(pattern=None, full=True, root=None, **kwargs):
+ """
+ List information on locked packages.
+
+ .. note::
+ This function returns the computed output of ``list_locks``
+ to show exact locked packages.
+
+ pattern
+ Regular expression used to match the package name
+
+ full : True
+ Show the full hold definition including version and epoch. Set to
+ ``False`` to return just the name of the package(s) being held.
+
+ root
+ Operate on a different root directory.
+
+
+ CLI Example:
+
+ .. code-block:: bash
+
+ salt '*' pkg.list_holds
+ salt '*' pkg.list_holds full=False
+ """
+ locks = list_locks(root=root)
+ ret = []
+ inst_pkgs = {}
+ for solv_name, lock in locks.items():
+ if lock.get("type", "package") != "package":
+ continue
+ try:
+ found_pkgs = search(
+ solv_name,
+ root=root,
+ match=None if "*" in solv_name else "exact",
+ case_sensitive=(lock.get("case_sensitive", "on") == "on"),
+ installed_only=True,
+ details=True,
+ all_versions=True,
+ ignore_no_matching_item=True,
+ )
+ except CommandExecutionError:
+ continue
+ if found_pkgs:
+ for pkg in found_pkgs:
+ if pkg not in inst_pkgs:
+ inst_pkgs.update(
+ info_installed(
+ pkg, root=root, attr="edition,epoch", all_versions=True
+ )
+ )
+
+ ptrn_re = re.compile(r"{}-\S+".format(pattern)) if pattern else None
+ for pkg_name, pkg_editions in inst_pkgs.items():
+ for pkg_info in pkg_editions:
+ pkg_ret = (
+ "{}-{}:{}.*".format(
+ pkg_name, pkg_info.get("epoch", 0), pkg_info.get("edition")
+ )
+ if full
+ else pkg_name
+ )
+ if pkg_ret not in ret and (not ptrn_re or ptrn_re.match(pkg_ret)):
+ ret.append(pkg_ret)
+
+ return ret
+
+
def list_locks(root=None):
"""
List current package locks.
@@ -2173,7 +2243,7 @@ def clean_locks(root=None):
return out
-def unhold(name=None, pkgs=None, **kwargs):
+def unhold(name=None, pkgs=None, root=None, **kwargs):
"""
.. versionadded:: 3003
@@ -2187,6 +2257,9 @@ def unhold(name=None, pkgs=None, **kwargs):
A list of packages to unhold. The ``name`` parameter will be ignored if
this option is passed.
+ root
+ Operate on a different root directory.
+
CLI Example:
.. code-block:: bash
@@ -2201,24 +2274,38 @@ def unhold(name=None, pkgs=None, **kwargs):
targets = []
if pkgs:
- for pkg in salt.utils.data.repack_dictlist(pkgs):
- targets.append(pkg)
+ targets.extend(pkgs)
else:
targets.append(name)
locks = list_locks()
removed = []
- missing = []
for target in targets:
+ version = None
+ if isinstance(target, dict):
+ (target, version) = next(iter(target.items()))
ret[target] = {"name": target, "changes": {}, "result": True, "comment": ""}
if locks.get(target):
- removed.append(target)
- ret[target]["changes"]["new"] = ""
- ret[target]["changes"]["old"] = "hold"
- ret[target]["comment"] = "Package {} is no longer held.".format(target)
+ lock_ver = None
+ if "version" in locks.get(target):
+ lock_ver = locks.get(target)["version"]
+ lock_ver = lock_ver.lstrip("= ")
+ if version and lock_ver != version:
+ ret[target]["result"] = False
+ ret[target][
+ "comment"
+ ] = "Unable to unhold package {} as it is held with the other version.".format(
+ target
+ )
+ else:
+ removed.append(
+ target if not lock_ver else "{}={}".format(target, lock_ver)
+ )
+ ret[target]["changes"]["new"] = ""
+ ret[target]["changes"]["old"] = "hold"
+ ret[target]["comment"] = "Package {} is no longer held.".format(target)
else:
- missing.append(target)
ret[target]["comment"] = "Package {} was already unheld.".format(target)
if removed:
@@ -2271,7 +2358,7 @@ def remove_lock(name, root=None, **kwargs):
return {"removed": len(removed), "not_found": missing}
-def hold(name=None, pkgs=None, **kwargs):
+def hold(name=None, pkgs=None, root=None, **kwargs):
"""
.. versionadded:: 3003
@@ -2285,6 +2372,10 @@ def hold(name=None, pkgs=None, **kwargs):
A list of packages to hold. The ``name`` parameter will be ignored if
this option is passed.
+ root
+ Operate on a different root directory.
+
+
CLI Example:
.. code-block:: bash
@@ -2299,8 +2390,7 @@ def hold(name=None, pkgs=None, **kwargs):
targets = []
if pkgs:
- for pkg in salt.utils.data.repack_dictlist(pkgs):
- targets.append(pkg)
+ targets.extend(pkgs)
else:
targets.append(name)
@@ -2308,9 +2398,12 @@ def hold(name=None, pkgs=None, **kwargs):
added = []
for target in targets:
+ version = None
+ if isinstance(target, dict):
+ (target, version) = next(iter(target.items()))
ret[target] = {"name": target, "changes": {}, "result": True, "comment": ""}
if not locks.get(target):
- added.append(target)
+ added.append(target if not version else "{}={}".format(target, version))
ret[target]["changes"]["new"] = "hold"
ret[target]["changes"]["old"] = ""
ret[target]["comment"] = "Package {} is now being held.".format(target)
diff --git a/salt/states/pkg.py b/salt/states/pkg.py
index f71f61e720..0d601e1aaf 100644
--- a/salt/states/pkg.py
+++ b/salt/states/pkg.py
@@ -3644,3 +3644,313 @@ def mod_beacon(name, **kwargs):
),
"result": False,
}
+
+
+def held(name, version=None, pkgs=None, replace=False, **kwargs):
+ """
+ Set package in 'hold' state, meaning it will not be changed.
+
+ :param str name:
+ The name of the package to be held. This parameter is ignored
+ if ``pkgs`` is used.
+
+ :param str version:
+ Hold a specific version of a package.
+ Full description of this parameter is in `installed` function.
+
+ .. note::
+
+ This parameter make sense for Zypper-based systems.
+ Ignored for YUM/DNF and APT
+
+ :param list pkgs:
+ A list of packages to be held. All packages listed under ``pkgs``
+ will be held.
+
+ .. code-block:: yaml
+
+ mypkgs:
+ pkg.held:
+ - pkgs:
+ - foo
+ - bar: 1.2.3-4
+ - baz
+
+ .. note::
+
+ For Zypper-based systems the package could be held for
+ the version specified. YUM/DNF and APT ingore it.
+
+ :param bool replace:
+ Force replacement of existings holds with specified.
+ By default, this parameter is set to ``False``.
+ """
+
+ if isinstance(pkgs, list) and len(pkgs) == 0 and not replace:
+ return {
+ "name": name,
+ "changes": {},
+ "result": True,
+ "comment": "No packages to be held provided",
+ }
+
+ # If just a name (and optionally a version) is passed, just pack them into
+ # the pkgs argument.
+ if name and pkgs is None:
+ if version:
+ pkgs = [{name: version}]
+ version = None
+ else:
+ pkgs = [name]
+
+ locks = {}
+ vr_lock = False
+ if "pkg.list_locks" in __salt__:
+ locks = __salt__["pkg.list_locks"]()
+ vr_lock = True
+ elif "pkg.list_holds" in __salt__:
+ _locks = __salt__["pkg.list_holds"](full=True)
+ lock_re = re.compile(r"^(.+)-(\d+):(.*)\.\*")
+ for lock in _locks:
+ match = lock_re.match(lock)
+ if match:
+ epoch = match.group(2)
+ if epoch == "0":
+ epoch = ""
+ else:
+ epoch = "{}:".format(epoch)
+ locks.update(
+ {match.group(1): {"version": "{}{}".format(epoch, match.group(3))}}
+ )
+ else:
+ locks.update({lock: {}})
+ elif "pkg.get_selections" in __salt__:
+ _locks = __salt__["pkg.get_selections"](state="hold")
+ for lock in _locks.get("hold", []):
+ locks.update({lock: {}})
+ else:
+ return {
+ "name": name,
+ "changes": {},
+ "result": False,
+ "comment": "No any function to get the list of held packages available.\n"
+ "Check if the package manager supports package locking.",
+ }
+
+ if "pkg.hold" not in __salt__:
+ return {
+ "name": name,
+ "changes": {},
+ "result": False,
+ "comment": "`hold` function is not implemented for the package manager.",
+ }
+
+ ret = {"name": name, "changes": {}, "result": True, "comment": ""}
+ comments = []
+
+ held_pkgs = set()
+ for pkg in pkgs:
+ if isinstance(pkg, dict):
+ (pkg_name, pkg_ver) = next(iter(pkg.items()))
+ else:
+ pkg_name = pkg
+ pkg_ver = None
+ lock_ver = None
+ if pkg_name in locks and "version" in locks[pkg_name]:
+ lock_ver = locks[pkg_name]["version"]
+ lock_ver = lock_ver.lstrip("= ")
+ held_pkgs.add(pkg_name)
+ if pkg_name not in locks or (vr_lock and lock_ver != pkg_ver):
+ if __opts__["test"]:
+ if pkg_name in locks:
+ comments.append(
+ "The following package's hold rule would be updated: {}{}".format(
+ pkg_name,
+ "" if not pkg_ver else " (version = {})".format(pkg_ver),
+ )
+ )
+ else:
+ comments.append(
+ "The following package would be held: {}{}".format(
+ pkg_name,
+ "" if not pkg_ver else " (version = {})".format(pkg_ver),
+ )
+ )
+ else:
+ unhold_ret = None
+ if pkg_name in locks:
+ unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name])
+ hold_ret = __salt__["pkg.hold"](name=name, pkgs=[pkg])
+ if not hold_ret.get(pkg_name, {}).get("result", False):
+ ret["result"] = False
+ if (
+ unhold_ret
+ and unhold_ret.get(pkg_name, {}).get("result", False)
+ and hold_ret
+ and hold_ret.get(pkg_name, {}).get("result", False)
+ ):
+ comments.append(
+ "Package {} was updated with hold rule".format(pkg_name)
+ )
+ elif hold_ret and hold_ret.get(pkg_name, {}).get("result", False):
+ comments.append("Package {} is now being held".format(pkg_name))
+ else:
+ comments.append("Package {} was not held".format(pkg_name))
+ ret["changes"].update(hold_ret)
+
+ if replace:
+ for pkg_name in locks:
+ if locks[pkg_name].get("type", "package") != "package":
+ continue
+ if __opts__["test"]:
+ if pkg_name not in held_pkgs:
+ comments.append(
+ "The following package would be unheld: {}".format(pkg_name)
+ )
+ else:
+ if pkg_name not in held_pkgs:
+ unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name])
+ if not unhold_ret.get(pkg_name, {}).get("result", False):
+ ret["result"] = False
+ if unhold_ret and unhold_ret.get(pkg_name, {}).get("comment"):
+ comments.append(unhold_ret.get(pkg_name).get("comment"))
+ ret["changes"].update(unhold_ret)
+
+ ret["comment"] = "\n".join(comments)
+ if not (ret["changes"] or ret["comment"]):
+ ret["comment"] = "No changes made"
+
+ return ret
+
+
+def unheld(name, version=None, pkgs=None, all=False, **kwargs):
+ """
+ Unset package from 'hold' state, to allow operations with the package.
+
+ :param str name:
+ The name of the package to be unheld. This parameter is ignored if "pkgs"
+ is used.
+
+ :param str version:
+ Unhold a specific version of a package.
+ Full description of this parameter is in `installed` function.
+
+ .. note::
+
+ This parameter make sense for Zypper-based systems.
+ Ignored for YUM/DNF and APT.
+
+ :param list pkgs:
+ A list of packages to be unheld. All packages listed under ``pkgs``
+ will be unheld.
+
+ .. code-block:: yaml
+
+ mypkgs:
+ pkg.unheld:
+ - pkgs:
+ - foo
+ - bar: 1.2.3-4
+ - baz
+
+ .. note::
+
+ For Zypper-based systems the package could be held for
+ the version specified. YUM/DNF and APT ingore it.
+ For ``unheld`` there is no need to specify the exact version
+ to be unheld.
+
+ :param bool all:
+ Force removing of all existings locks.
+ By default, this parameter is set to ``False``.
+ """
+
+ if isinstance(pkgs, list) and len(pkgs) == 0 and not all:
+ return {
+ "name": name,
+ "changes": {},
+ "result": True,
+ "comment": "No packages to be unheld provided",
+ }
+
+ # If just a name (and optionally a version) is passed, just pack them into
+ # the pkgs argument.
+ if name and pkgs is None:
+ pkgs = [{name: version}]
+ version = None
+
+ locks = {}
+ vr_lock = False
+ if "pkg.list_locks" in __salt__:
+ locks = __salt__["pkg.list_locks"]()
+ vr_lock = True
+ elif "pkg.list_holds" in __salt__:
+ _locks = __salt__["pkg.list_holds"](full=True)
+ lock_re = re.compile(r"^(.+)-(\d+):(.*)\.\*")
+ for lock in _locks:
+ match = lock_re.match(lock)
+ if match:
+ epoch = match.group(2)
+ if epoch == "0":
+ epoch = ""
+ else:
+ epoch = "{}:".format(epoch)
+ locks.update(
+ {match.group(1): {"version": "{}{}".format(epoch, match.group(3))}}
+ )
+ else:
+ locks.update({lock: {}})
+ elif "pkg.get_selections" in __salt__:
+ _locks = __salt__["pkg.get_selections"](state="hold")
+ for lock in _locks.get("hold", []):
+ locks.update({lock: {}})
+ else:
+ return {
+ "name": name,
+ "changes": {},
+ "result": False,
+ "comment": "No any function to get the list of held packages available.\n"
+ "Check if the package manager supports package locking.",
+ }
+
+ dpkgs = {}
+ for pkg in pkgs:
+ if isinstance(pkg, dict):
+ (pkg_name, pkg_ver) = next(iter(pkg.items()))
+ dpkgs.update({pkg_name: pkg_ver})
+ else:
+ dpkgs.update({pkg: None})
+
+ ret = {"name": name, "changes": {}, "result": True, "comment": ""}
+ comments = []
+
+ for pkg_name in locks:
+ if locks[pkg_name].get("type", "package") != "package":
+ continue
+ lock_ver = None
+ if vr_lock and "version" in locks[pkg_name]:
+ lock_ver = locks[pkg_name]["version"]
+ lock_ver = lock_ver.lstrip("= ")
+ if all or (pkg_name in dpkgs and (not lock_ver or lock_ver == dpkgs[pkg_name])):
+ if __opts__["test"]:
+ comments.append(
+ "The following package would be unheld: {}{}".format(
+ pkg_name,
+ ""
+ if not dpkgs.get(pkg_name)
+ else " (version = {})".format(lock_ver),
+ )
+ )
+ else:
+ unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name])
+ if not unhold_ret.get(pkg_name, {}).get("result", False):
+ ret["result"] = False
+ if unhold_ret and unhold_ret.get(pkg_name, {}).get("comment"):
+ comments.append(unhold_ret.get(pkg_name).get("comment"))
+ ret["changes"].update(unhold_ret)
+
+ ret["comment"] = "\n".join(comments)
+ if not (ret["changes"] or ret["comment"]):
+ ret["comment"] = "No changes made"
+
+ return ret
diff --git a/tests/pytests/unit/modules/test_zypperpkg.py b/tests/pytests/unit/modules/test_zypperpkg.py
index eb1e63f6d7..bfc1558c9a 100644
--- a/tests/pytests/unit/modules/test_zypperpkg.py
+++ b/tests/pytests/unit/modules/test_zypperpkg.py
@@ -121,3 +121,136 @@ def test_del_repo_key():
with patch.dict(zypper.__salt__, salt_mock):
assert zypper.del_repo_key(keyid="keyid", root="/mnt")
salt_mock["lowpkg.remove_gpg_key"].assert_called_once_with("keyid", "/mnt")
+
+
+def test_pkg_hold():
+ """
+ Tests holding packages with Zypper
+ """
+
+ # Test openSUSE 15.3
+ list_locks_mock = {
+ "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+ "minimal_base": {
+ "type": "pattern",
+ "match_type": "glob",
+ "case_sensitive": "on",
+ },
+ "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+ }
+
+ cmd = MagicMock(
+ return_value={
+ "pid": 1234,
+ "retcode": 0,
+ "stdout": "Specified lock has been successfully added.",
+ "stderr": "",
+ }
+ )
+ with patch.object(
+ zypper, "list_locks", MagicMock(return_value=list_locks_mock)
+ ), patch.dict(zypper.__salt__, {"cmd.run_all": cmd}):
+ ret = zypper.hold("foo")
+ assert ret["foo"]["changes"]["old"] == ""
+ assert ret["foo"]["changes"]["new"] == "hold"
+ assert ret["foo"]["comment"] == "Package foo is now being held."
+ cmd.assert_called_once_with(
+ ["zypper", "--non-interactive", "--no-refresh", "al", "foo"],
+ env={},
+ output_loglevel="trace",
+ python_shell=False,
+ )
+ cmd.reset_mock()
+ ret = zypper.hold(pkgs=["foo", "bar"])
+ assert ret["foo"]["changes"]["old"] == ""
+ assert ret["foo"]["changes"]["new"] == "hold"
+ assert ret["foo"]["comment"] == "Package foo is now being held."
+ assert ret["bar"]["changes"] == {}
+ assert ret["bar"]["comment"] == "Package bar is already set to be held."
+ cmd.assert_called_once_with(
+ ["zypper", "--non-interactive", "--no-refresh", "al", "foo"],
+ env={},
+ output_loglevel="trace",
+ python_shell=False,
+ )
+
+
+def test_pkg_unhold():
+ """
+ Tests unholding packages with Zypper
+ """
+
+ # Test openSUSE 15.3
+ list_locks_mock = {
+ "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+ "minimal_base": {
+ "type": "pattern",
+ "match_type": "glob",
+ "case_sensitive": "on",
+ },
+ "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+ }
+
+ cmd = MagicMock(
+ return_value={
+ "pid": 1234,
+ "retcode": 0,
+ "stdout": "1 lock has been successfully removed.",
+ "stderr": "",
+ }
+ )
+ with patch.object(
+ zypper, "list_locks", MagicMock(return_value=list_locks_mock)
+ ), patch.dict(zypper.__salt__, {"cmd.run_all": cmd}):
+ ret = zypper.unhold("foo")
+ assert ret["foo"]["comment"] == "Package foo was already unheld."
+ cmd.assert_not_called()
+ cmd.reset_mock()
+ ret = zypper.unhold(pkgs=["foo", "bar"])
+ assert ret["foo"]["changes"] == {}
+ assert ret["foo"]["comment"] == "Package foo was already unheld."
+ assert ret["bar"]["changes"]["old"] == "hold"
+ assert ret["bar"]["changes"]["new"] == ""
+ assert ret["bar"]["comment"] == "Package bar is no longer held."
+ cmd.assert_called_once_with(
+ ["zypper", "--non-interactive", "--no-refresh", "rl", "bar"],
+ env={},
+ output_loglevel="trace",
+ python_shell=False,
+ )
+
+
+def test_pkg_list_holds():
+ """
+ Tests listing of calculated held packages with Zypper
+ """
+
+ # Test openSUSE 15.3
+ list_locks_mock = {
+ "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+ "minimal_base": {
+ "type": "pattern",
+ "match_type": "glob",
+ "case_sensitive": "on",
+ },
+ "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+ }
+ installed_pkgs = {
+ "foo": [{"edition": "1.2.3-1.1"}],
+ "bar": [{"edition": "2.3.4-2.1", "epoch": "2"}],
+ }
+
+ def zypper_search_mock(name, *_args, **_kwargs):
+ if name in installed_pkgs:
+ return {name: installed_pkgs.get(name)}
+
+ with patch.object(
+ zypper, "list_locks", MagicMock(return_value=list_locks_mock)
+ ), patch.object(
+ zypper, "search", MagicMock(side_effect=zypper_search_mock)
+ ), patch.object(
+ zypper, "info_installed", MagicMock(side_effect=zypper_search_mock)
+ ):
+ ret = zypper.list_holds()
+ assert len(ret) == 1
+ assert "bar-2:2.3.4-2.1.*" in ret
diff --git a/tests/pytests/unit/states/test_pkg.py b/tests/pytests/unit/states/test_pkg.py
index 7e667d36fd..17b91bcb39 100644
--- a/tests/pytests/unit/states/test_pkg.py
+++ b/tests/pytests/unit/states/test_pkg.py
@@ -578,3 +578,140 @@ def test_removed_purged_with_changes_test_true(list_pkgs, action):
ret = pkg_actions[action]("pkga", test=True)
assert ret["result"] is None
assert ret["changes"] == expected
+
+
+@pytest.mark.parametrize(
+ "package_manager", [("Zypper"), ("YUM/DNF"), ("APT")],
+)
+def test_held_unheld(package_manager):
+ """
+ Test pkg.held and pkg.unheld with Zypper, YUM/DNF and APT
+ """
+
+ if package_manager == "Zypper":
+ list_holds_func = "pkg.list_locks"
+ list_holds_mock = MagicMock(
+ return_value={
+ "bar": {
+ "type": "package",
+ "match_type": "glob",
+ "case_sensitive": "on",
+ },
+ "minimal_base": {
+ "type": "pattern",
+ "match_type": "glob",
+ "case_sensitive": "on",
+ },
+ "baz": {
+ "type": "package",
+ "match_type": "glob",
+ "case_sensitive": "on",
+ },
+ }
+ )
+ elif package_manager == "YUM/DNF":
+ list_holds_func = "pkg.list_holds"
+ list_holds_mock = MagicMock(
+ return_value=["bar-0:1.2.3-1.1.*", "baz-0:2.3.4-2.1.*"]
+ )
+ elif package_manager == "APT":
+ list_holds_func = "pkg.get_selections"
+ list_holds_mock = MagicMock(return_value={"hold": ["bar", "baz"]})
+
+ def pkg_hold(name, pkgs=None, *_args, **__kwargs):
+ if name and pkgs is None:
+ pkgs = [name]
+ ret = {}
+ for pkg in pkgs:
+ ret.update(
+ {
+ pkg: {
+ "name": pkg,
+ "changes": {"new": "hold", "old": ""},
+ "result": True,
+ "comment": "Package {} is now being held.".format(pkg),
+ }
+ }
+ )
+ return ret
+
+ def pkg_unhold(name, pkgs=None, *_args, **__kwargs):
+ if name and pkgs is None:
+ pkgs = [name]
+ ret = {}
+ for pkg in pkgs:
+ ret.update(
+ {
+ pkg: {
+ "name": pkg,
+ "changes": {"new": "", "old": "hold"},
+ "result": True,
+ "comment": "Package {} is no longer held.".format(pkg),
+ }
+ }
+ )
+ return ret
+
+ hold_mock = MagicMock(side_effect=pkg_hold)
+ unhold_mock = MagicMock(side_effect=pkg_unhold)
+
+ # Testing with Zypper
+ with patch.dict(
+ pkg.__salt__,
+ {
+ list_holds_func: list_holds_mock,
+ "pkg.hold": hold_mock,
+ "pkg.unhold": unhold_mock,
+ },
+ ):
+ # Holding one of two packages
+ ret = pkg.held("held-test", pkgs=["foo", "bar"])
+ assert "foo" in ret["changes"]
+ assert len(ret["changes"]) == 1
+ hold_mock.assert_called_once_with(name="held-test", pkgs=["foo"])
+ unhold_mock.assert_not_called()
+
+ hold_mock.reset_mock()
+ unhold_mock.reset_mock()
+
+ # Holding one of two packages and replacing all the rest held packages
+ ret = pkg.held("held-test", pkgs=["foo", "bar"], replace=True)
+ assert "foo" in ret["changes"]
+ assert "baz" in ret["changes"]
+ assert len(ret["changes"]) == 2
+ hold_mock.assert_called_once_with(name="held-test", pkgs=["foo"])
+ unhold_mock.assert_called_once_with(name="held-test", pkgs=["baz"])
+
+ hold_mock.reset_mock()
+ unhold_mock.reset_mock()
+
+ # Remove all holds
+ ret = pkg.held("held-test", pkgs=[], replace=True)
+ assert "bar" in ret["changes"]
+ assert "baz" in ret["changes"]
+ assert len(ret["changes"]) == 2
+ hold_mock.assert_not_called()
+ unhold_mock.assert_any_call(name="held-test", pkgs=["baz"])
+ unhold_mock.assert_any_call(name="held-test", pkgs=["bar"])
+
+ hold_mock.reset_mock()
+ unhold_mock.reset_mock()
+
+ # Unolding one of two packages
+ ret = pkg.unheld("held-test", pkgs=["foo", "bar"])
+ assert "bar" in ret["changes"]
+ assert len(ret["changes"]) == 1
+ unhold_mock.assert_called_once_with(name="held-test", pkgs=["bar"])
+ hold_mock.assert_not_called()
+
+ hold_mock.reset_mock()
+ unhold_mock.reset_mock()
+
+ # Remove all holds
+ ret = pkg.unheld("held-test", all=True)
+ assert "bar" in ret["changes"]
+ assert "baz" in ret["changes"]
+ assert len(ret["changes"]) == 2
+ hold_mock.assert_not_called()
+ unhold_mock.assert_any_call(name="held-test", pkgs=["baz"])
+ unhold_mock.assert_any_call(name="held-test", pkgs=["bar"])
--
2.34.1