From 218a0bb719b1348265b98c346606374f4bbf2416957b91f346f1e75e198c045e Mon Sep 17 00:00:00 2001 From: Victor Zhestkov Date: Mon, 27 May 2024 11:14:47 +0000 Subject: [PATCH] Accepting request 1177104 from home:PSuarezHernandez:branches:systemsmanagement:saltstack - Several fixes for tests to avoid errors and failures in some OSes - Speed up salt.matcher.confirm_top by using __context__ - Do not call the async wrapper calls with the separate thread - Prevent OOM with high amount of batch async calls (bsc#1216063) - Add missing contextvars dependency in salt.version - Skip tests for unsupported algorithm on old OpenSSL version - Remove redundant `_file_find` call to the master - Prevent possible exception in tornado.concurrent.Future._set_done - Make reactor engine less blocking the EventPublisher - Make salt-master self recoverable on killing EventPublisher - Improve broken events catching and reporting - Make logging calls lighter - Remove unused import causing delays on starting salt-master - Added: * improve-broken-events-catching-and-reporting.patch * add-missing-contextvars-dependency-in-salt.version.patch * prevent-oom-with-high-amount-of-batch-async-calls-bs.patch * speed-up-salt.matcher.confirm_top-by-using-__context.patch * remove-redundant-_file_find-call-to-the-master.patch * make-logging-calls-lighter.patch * make-salt-master-self-recoverable-on-killing-eventpu.patch * skip-tests-for-unsupported-algorithm-on-old-openssl-.patch * remove-unused-import-causing-delays-on-starting-salt.patch * do-not-call-the-async-wrapper-calls-with-the-separat.patch * prevent-possible-exception-in-tornado.concurrent.fut.patch * several-fixes-for-tests-to-avoid-errors-and-failures.patch * make-reactor-engine-less-blocking-the-eventpublisher.patch OBS-URL: https://build.opensuse.org/request/show/1177104 OBS-URL: https://build.opensuse.org/package/show/systemsmanagement:saltstack/salt?expand=0&rev=243 --- _lastrevision | 2 +- ...ntextvars-dependency-in-salt.version.patch | 38 + ...async-wrapper-calls-with-the-separat.patch | 254 ++++ ...broken-events-catching-and-reporting.patch | 202 +++ make-logging-calls-lighter.patch | 233 +++ ...ine-less-blocking-the-eventpublisher.patch | 104 ++ ...-self-recoverable-on-killing-eventpu.patch | 243 ++++ ...-high-amount-of-batch-async-calls-bs.patch | 1272 +++++++++++++++++ ...-exception-in-tornado.concurrent.fut.patch | 37 + ...undant-_file_find-call-to-the-master.patch | 40 + ...port-causing-delays-on-starting-salt.patch | 25 + salt.changes | 32 + salt.spec | 28 +- ...r-tests-to-avoid-errors-and-failures.patch | 557 ++++++++ ...nsupported-algorithm-on-old-openssl-.patch | 117 ++ ...tcher.confirm_top-by-using-__context.patch | 64 + 16 files changed, 3246 insertions(+), 2 deletions(-) create mode 100644 add-missing-contextvars-dependency-in-salt.version.patch create mode 100644 do-not-call-the-async-wrapper-calls-with-the-separat.patch create mode 100644 improve-broken-events-catching-and-reporting.patch create mode 100644 make-logging-calls-lighter.patch create mode 100644 make-reactor-engine-less-blocking-the-eventpublisher.patch create mode 100644 make-salt-master-self-recoverable-on-killing-eventpu.patch create mode 100644 prevent-oom-with-high-amount-of-batch-async-calls-bs.patch create mode 100644 prevent-possible-exception-in-tornado.concurrent.fut.patch create mode 100644 remove-redundant-_file_find-call-to-the-master.patch create mode 100644 remove-unused-import-causing-delays-on-starting-salt.patch create mode 100644 several-fixes-for-tests-to-avoid-errors-and-failures.patch create mode 100644 skip-tests-for-unsupported-algorithm-on-old-openssl-.patch create mode 100644 speed-up-salt.matcher.confirm_top-by-using-__context.patch diff --git a/_lastrevision b/_lastrevision index 89b1a9f..2652c56 100644 --- a/_lastrevision +++ b/_lastrevision @@ -1 +1 @@ -2106e7b467fde79190f65e2de73559c3ba7e8888 \ No newline at end of file +365aa2dd170197cd849f08270e3bd2376cd79be9 \ No newline at end of file diff --git a/add-missing-contextvars-dependency-in-salt.version.patch b/add-missing-contextvars-dependency-in-salt.version.patch new file mode 100644 index 0000000..6c8a26a --- /dev/null +++ b/add-missing-contextvars-dependency-in-salt.version.patch @@ -0,0 +1,38 @@ +From 1a5716365e0c3b8d290759847f4046f28ee4b79f Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 09:53:20 +0200 +Subject: [PATCH] Add missing contextvars dependency in salt.version + +--- + salt/version.py | 1 + + tests/unit/states/test_pip_state.py | 2 +- + 2 files changed, 2 insertions(+), 1 deletion(-) + +diff --git a/salt/version.py b/salt/version.py +index 44372830b2..b2643550e9 100644 +--- a/salt/version.py ++++ b/salt/version.py +@@ -717,6 +717,7 @@ def dependency_information(include_salt_cloud=False): + ("docker-py", "docker", "__version__"), + ("packaging", "packaging", "__version__"), + ("looseversion", "looseversion", None), ++ ("contextvars", "contextvars", None), + ("relenv", "relenv", "__version__"), + ] + +diff --git a/tests/unit/states/test_pip_state.py b/tests/unit/states/test_pip_state.py +index d70b115000..fe5d171a15 100644 +--- a/tests/unit/states/test_pip_state.py ++++ b/tests/unit/states/test_pip_state.py +@@ -419,7 +419,7 @@ class PipStateInstallationErrorTest(TestCase): + def test_importable_installation_error(self): + extra_requirements = [] + for name, version in salt.version.dependency_information(): +- if name in ["PyYAML", "packaging", "looseversion"]: ++ if name in ["PyYAML", "packaging", "looseversion", "contextvars"]: + extra_requirements.append("{}=={}".format(name, version)) + failures = {} + pip_version_requirements = [ +-- +2.45.0 + diff --git a/do-not-call-the-async-wrapper-calls-with-the-separat.patch b/do-not-call-the-async-wrapper-calls-with-the-separat.patch new file mode 100644 index 0000000..1f4368b --- /dev/null +++ b/do-not-call-the-async-wrapper-calls-with-the-separat.patch @@ -0,0 +1,254 @@ +From 4021f938ed1b64acd47ccaefc111197a1118ee4f Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 11:48:46 +0200 +Subject: [PATCH] Do not call the async wrapper calls with the separate + thread + +* Do not run method with the distinct thread + +* Move test_asynchronous.py to pytests +--- + salt/utils/asynchronous.py | 25 +---- + tests/pytests/unit/utils/test_asynchronous.py | 92 +++++++++++++++++++ + tests/unit/utils/test_asynchronous.py | 81 ---------------- + 3 files changed, 94 insertions(+), 104 deletions(-) + create mode 100644 tests/pytests/unit/utils/test_asynchronous.py + delete mode 100644 tests/unit/utils/test_asynchronous.py + +diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py +index 88596a4a20..55a50cbcbf 100644 +--- a/salt/utils/asynchronous.py ++++ b/salt/utils/asynchronous.py +@@ -2,11 +2,8 @@ + Helpers/utils for working with tornado asynchronous stuff + """ + +- + import contextlib + import logging +-import sys +-import threading + + import salt.ext.tornado.concurrent + import salt.ext.tornado.ioloop +@@ -111,30 +108,12 @@ class SyncWrapper: + + def _wrap(self, key): + def wrap(*args, **kwargs): +- results = [] +- thread = threading.Thread( +- target=self._target, +- args=(key, args, kwargs, results, self.io_loop), ++ return self.io_loop.run_sync( ++ lambda: getattr(self.obj, key)(*args, **kwargs) + ) +- thread.start() +- thread.join() +- if results[0]: +- return results[1] +- else: +- exc_info = results[1] +- raise exc_info[1].with_traceback(exc_info[2]) + + return wrap + +- def _target(self, key, args, kwargs, results, io_loop): +- try: +- result = io_loop.run_sync(lambda: getattr(self.obj, key)(*args, **kwargs)) +- results.append(True) +- results.append(result) +- except Exception: # pylint: disable=broad-except +- results.append(False) +- results.append(sys.exc_info()) +- + def __enter__(self): + return self + +diff --git a/tests/pytests/unit/utils/test_asynchronous.py b/tests/pytests/unit/utils/test_asynchronous.py +new file mode 100644 +index 0000000000..2b5613e2bf +--- /dev/null ++++ b/tests/pytests/unit/utils/test_asynchronous.py +@@ -0,0 +1,92 @@ ++import tornado.gen ++import tornado.ioloop ++ ++import salt.utils.asynchronous as asynchronous ++ ++ ++class HelperA: ++ ++ async_methods = [ ++ "sleep", ++ ] ++ ++ def __init__(self, io_loop=None): ++ pass ++ ++ @tornado.gen.coroutine ++ def sleep(self): ++ yield tornado.gen.sleep(0.1) ++ raise tornado.gen.Return(True) ++ ++ ++class HelperB: ++ ++ async_methods = [ ++ "sleep", ++ ] ++ ++ def __init__(self, a=None, io_loop=None): ++ if a is None: ++ a = asynchronous.SyncWrapper(HelperA) ++ self.a = a ++ ++ @tornado.gen.coroutine ++ def sleep(self): ++ yield tornado.gen.sleep(0.1) ++ self.a.sleep() ++ raise tornado.gen.Return(False) ++ ++ ++def test_helpers(): ++ """ ++ Test that the helper classes do what we expect within a regular asynchronous env ++ """ ++ io_loop = tornado.ioloop.IOLoop(make_current=False) ++ ret = io_loop.run_sync(lambda: HelperA().sleep()) ++ assert ret is True ++ ++ ret = io_loop.run_sync(lambda: HelperB().sleep()) ++ assert ret is False ++ ++ ++def test_basic_wrap(): ++ """ ++ Test that we can wrap an asynchronous caller. ++ """ ++ sync = asynchronous.SyncWrapper(HelperA) ++ ret = sync.sleep() ++ assert ret is True ++ ++ ++def test_basic_wrap_series(): ++ """ ++ Test that we can wrap an asynchronous caller and call the method in series. ++ """ ++ sync = asynchronous.SyncWrapper(HelperA) ++ ret = sync.sleep() ++ assert ret is True ++ ret = sync.sleep() ++ assert ret is True ++ ++ ++def test_double(): ++ """ ++ Test when the asynchronous wrapper object itself creates a wrap of another thing ++ ++ This works fine since the second wrap is based on the first's IOLoop so we ++ don't have to worry about complex start/stop mechanics ++ """ ++ sync = asynchronous.SyncWrapper(HelperB) ++ ret = sync.sleep() ++ assert ret is False ++ ++ ++def test_double_sameloop(): ++ """ ++ Test asynchronous wrappers initiated from the same IOLoop, to ensure that ++ we don't wire up both to the same IOLoop (since it causes MANY problems). ++ """ ++ a = asynchronous.SyncWrapper(HelperA) ++ sync = asynchronous.SyncWrapper(HelperB, (a,)) ++ ret = sync.sleep() ++ assert ret is False +diff --git a/tests/unit/utils/test_asynchronous.py b/tests/unit/utils/test_asynchronous.py +deleted file mode 100644 +index e5bd974cb6..0000000000 +--- a/tests/unit/utils/test_asynchronous.py ++++ /dev/null +@@ -1,81 +0,0 @@ +-import salt.ext.tornado.gen +-import salt.ext.tornado.testing +-import salt.utils.asynchronous as asynchronous +-from salt.ext.tornado.testing import AsyncTestCase +- +- +-class HelperA: +- +- async_methods = [ +- "sleep", +- ] +- +- def __init__(self, io_loop=None): +- pass +- +- @salt.ext.tornado.gen.coroutine +- def sleep(self): +- yield salt.ext.tornado.gen.sleep(0.1) +- raise salt.ext.tornado.gen.Return(True) +- +- +-class HelperB: +- +- async_methods = [ +- "sleep", +- ] +- +- def __init__(self, a=None, io_loop=None): +- if a is None: +- a = asynchronous.SyncWrapper(HelperA) +- self.a = a +- +- @salt.ext.tornado.gen.coroutine +- def sleep(self): +- yield salt.ext.tornado.gen.sleep(0.1) +- self.a.sleep() +- raise salt.ext.tornado.gen.Return(False) +- +- +-class TestSyncWrapper(AsyncTestCase): +- @salt.ext.tornado.testing.gen_test +- def test_helpers(self): +- """ +- Test that the helper classes do what we expect within a regular asynchronous env +- """ +- ha = HelperA() +- ret = yield ha.sleep() +- self.assertTrue(ret) +- +- hb = HelperB() +- ret = yield hb.sleep() +- self.assertFalse(ret) +- +- def test_basic_wrap(self): +- """ +- Test that we can wrap an asynchronous caller. +- """ +- sync = asynchronous.SyncWrapper(HelperA) +- ret = sync.sleep() +- self.assertTrue(ret) +- +- def test_double(self): +- """ +- Test when the asynchronous wrapper object itself creates a wrap of another thing +- +- This works fine since the second wrap is based on the first's IOLoop so we +- don't have to worry about complex start/stop mechanics +- """ +- sync = asynchronous.SyncWrapper(HelperB) +- ret = sync.sleep() +- self.assertFalse(ret) +- +- def test_double_sameloop(self): +- """ +- Test asynchronous wrappers initiated from the same IOLoop, to ensure that +- we don't wire up both to the same IOLoop (since it causes MANY problems). +- """ +- a = asynchronous.SyncWrapper(HelperA) +- sync = asynchronous.SyncWrapper(HelperB, (a,)) +- ret = sync.sleep() +- self.assertFalse(ret) +-- +2.45.0 + diff --git a/improve-broken-events-catching-and-reporting.patch b/improve-broken-events-catching-and-reporting.patch new file mode 100644 index 0000000..a79544e --- /dev/null +++ b/improve-broken-events-catching-and-reporting.patch @@ -0,0 +1,202 @@ +From 88bd54971d39b34d9728f3fe5fcb493cec3ff2fd Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 09:22:11 +0200 +Subject: [PATCH] Improve broken events catching and reporting + +* Improve broken events catching and reporting + +* Add test of catching SaltDeserializationError on reading event + +* Add test for fire_ret_load +--- + salt/utils/event.py | 23 +++- + tests/pytests/unit/utils/event/test_event.py | 107 +++++++++++++++++++ + 2 files changed, 128 insertions(+), 2 deletions(-) + +diff --git a/salt/utils/event.py b/salt/utils/event.py +index e6d7b00520..ef048335ae 100644 +--- a/salt/utils/event.py ++++ b/salt/utils/event.py +@@ -75,6 +75,7 @@ import salt.utils.platform + import salt.utils.process + import salt.utils.stringutils + import salt.utils.zeromq ++from salt.exceptions import SaltDeserializationError + + log = logging.getLogger(__name__) + +@@ -461,7 +462,13 @@ class SaltEvent: + salt.utils.stringutils.to_bytes(TAGEND) + ) # split tag from data + mtag = salt.utils.stringutils.to_str(mtag) +- data = salt.payload.loads(mdata, encoding="utf-8") ++ try: ++ data = salt.payload.loads(mdata, encoding="utf-8") ++ except SaltDeserializationError: ++ log.warning( ++ "SaltDeserializationError on unpacking data, the payload could be incomplete" ++ ) ++ raise + return mtag, data + + def _get_match_func(self, match_type=None): +@@ -583,6 +590,9 @@ class SaltEvent: + raise + else: + return None ++ except SaltDeserializationError: ++ log.error("Unable to deserialize received event") ++ return None + except RuntimeError: + return None + +@@ -889,6 +899,14 @@ class SaltEvent: + ret = load.get("return", {}) + retcode = load["retcode"] + ++ if not isinstance(ret, dict): ++ log.error( ++ "Event with bad payload received from '%s': %s", ++ load.get("id", "UNKNOWN"), ++ "".join(ret) if isinstance(ret, list) else ret, ++ ) ++ return ++ + try: + for tag, data in ret.items(): + data["retcode"] = retcode +@@ -910,7 +928,8 @@ class SaltEvent: + ) + except Exception as exc: # pylint: disable=broad-except + log.error( +- "Event iteration failed with exception: %s", ++ "Event from '%s' iteration failed with exception: %s", ++ load.get("id", "UNKNOWN"), + exc, + exc_info_on_loglevel=logging.DEBUG, + ) +diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py +index f4b6c15999..3eadfaf6ba 100644 +--- a/tests/pytests/unit/utils/event/test_event.py ++++ b/tests/pytests/unit/utils/event/test_event.py +@@ -12,6 +12,7 @@ import salt.ext.tornado.ioloop + import salt.ext.tornado.iostream + import salt.utils.event + import salt.utils.stringutils ++from salt.exceptions import SaltDeserializationError + from salt.utils.event import SaltEvent + from tests.support.events import eventpublisher_process, eventsender_process + from tests.support.mock import patch +@@ -340,3 +341,109 @@ def test_master_pub_permissions(sock_dir): + assert bool(os.lstat(p).st_mode & stat.S_IRUSR) + assert not bool(os.lstat(p).st_mode & stat.S_IRGRP) + assert not bool(os.lstat(p).st_mode & stat.S_IROTH) ++ ++ ++def test_event_unpack_with_SaltDeserializationError(sock_dir): ++ with eventpublisher_process(str(sock_dir)), salt.utils.event.MasterEvent( ++ str(sock_dir), listen=True ++ ) as me, patch.object( ++ salt.utils.event.log, "warning", autospec=True ++ ) as mock_log_warning, patch.object( ++ salt.utils.event.log, "error", autospec=True ++ ) as mock_log_error: ++ me.fire_event({"data": "foo1"}, "evt1") ++ me.fire_event({"data": "foo2"}, "evt2") ++ evt2 = me.get_event(tag="") ++ with patch("salt.payload.loads", side_effect=SaltDeserializationError): ++ evt1 = me.get_event(tag="") ++ _assert_got_event(evt2, {"data": "foo2"}, expected_failure=True) ++ assert evt1 is None ++ assert ( ++ mock_log_warning.mock_calls[0].args[0] ++ == "SaltDeserializationError on unpacking data, the payload could be incomplete" ++ ) ++ assert ( ++ mock_log_error.mock_calls[0].args[0] ++ == "Unable to deserialize received event" ++ ) ++ ++ ++def test_event_fire_ret_load(): ++ event = SaltEvent(node=None) ++ test_load = { ++ "id": "minion_id.example.org", ++ "jid": "20240212095247760376", ++ "fun": "state.highstate", ++ "retcode": 254, ++ "return": { ++ "saltutil_|-sync_states_|-sync_states_|-sync_states": { ++ "result": True, ++ }, ++ "saltutil_|-sync_modules_|-sync_modules_|-sync_modules": { ++ "result": False, ++ }, ++ }, ++ } ++ test_fire_event_data = { ++ "result": False, ++ "retcode": 254, ++ "jid": "20240212095247760376", ++ "id": "minion_id.example.org", ++ "success": False, ++ "return": "Error: saltutil.sync_modules", ++ "fun": "state.highstate", ++ } ++ test_unhandled_exc = "Unhandled exception running state.highstate" ++ test_traceback = [ ++ "Traceback (most recent call last):\n", ++ " Just an example of possible return as a list\n", ++ ] ++ with patch.object( ++ event, "fire_event", side_effect=[None, None, Exception] ++ ) as mock_fire_event, patch.object( ++ salt.utils.event.log, "error", autospec=True ++ ) as mock_log_error: ++ event.fire_ret_load(test_load) ++ assert len(mock_fire_event.mock_calls) == 2 ++ assert mock_fire_event.mock_calls[0].args[0] == test_fire_event_data ++ assert mock_fire_event.mock_calls[0].args[1] == "saltutil.sync_modules" ++ assert mock_fire_event.mock_calls[1].args[0] == test_fire_event_data ++ assert ( ++ mock_fire_event.mock_calls[1].args[1] ++ == "salt/job/20240212095247760376/sub/minion_id.example.org/error/state.highstate" ++ ) ++ assert not mock_log_error.mock_calls ++ ++ mock_log_error.reset_mock() ++ ++ event.fire_ret_load(test_load) ++ assert ( ++ mock_log_error.mock_calls[0].args[0] ++ == "Event from '%s' iteration failed with exception: %s" ++ ) ++ assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org" ++ ++ mock_log_error.reset_mock() ++ test_load["return"] = test_unhandled_exc ++ ++ event.fire_ret_load(test_load) ++ assert ( ++ mock_log_error.mock_calls[0].args[0] ++ == "Event with bad payload received from '%s': %s" ++ ) ++ assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org" ++ assert ( ++ mock_log_error.mock_calls[0].args[2] ++ == "Unhandled exception running state.highstate" ++ ) ++ ++ mock_log_error.reset_mock() ++ test_load["return"] = test_traceback ++ ++ event.fire_ret_load(test_load) ++ assert ( ++ mock_log_error.mock_calls[0].args[0] ++ == "Event with bad payload received from '%s': %s" ++ ) ++ assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org" ++ assert mock_log_error.mock_calls[0].args[2] == "".join(test_traceback) +-- +2.45.0 + diff --git a/make-logging-calls-lighter.patch b/make-logging-calls-lighter.patch new file mode 100644 index 0000000..194da2b --- /dev/null +++ b/make-logging-calls-lighter.patch @@ -0,0 +1,233 @@ +From 48b6f57ece7eb9f58b8e6da40ec241b6df3f6d01 Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 09:20:18 +0200 +Subject: [PATCH] Make logging calls lighter + +* Call set_lowest_log_level_by_opts with set_logging_options_dict + +* Fix the _logging test with setting minimum logging level + +* Fix test_deferred_stream_handler test + +* Fix vt.Terminal failing test: test_log_sanitize + +Fixes failing test added in a09b4f445052be66f0ac53fd01fa02bfa5b82ea6 + +We can't assume tests are run at debug level, so this ensures the test +passes regardless of what logging level is currently set by capturing +the output in caplog at DEBUG which stream_stdout/stream_stderr uses by +default. + +Signed-off-by: Joe Groocock + +--------- + +Signed-off-by: Joe Groocock +Co-authored-by: Joe Groocock +--- + salt/_logging/impl.py | 1 + + .../integration/_logging/test_logging.py | 106 ++++++++++++++++++ + .../handlers/test_deferred_stream_handler.py | 9 +- + tests/pytests/unit/utils/test_vt.py | 6 +- + 4 files changed, 117 insertions(+), 5 deletions(-) + create mode 100644 tests/pytests/integration/_logging/test_logging.py + +diff --git a/salt/_logging/impl.py b/salt/_logging/impl.py +index 2d1a276cb8..1d71cb8be8 100644 +--- a/salt/_logging/impl.py ++++ b/salt/_logging/impl.py +@@ -426,6 +426,7 @@ def set_logging_options_dict(opts): + except AttributeError: + pass + set_logging_options_dict.__options_dict__ = opts ++ set_lowest_log_level_by_opts(opts) + + + def freeze_logging_options_dict(): +diff --git a/tests/pytests/integration/_logging/test_logging.py b/tests/pytests/integration/_logging/test_logging.py +new file mode 100644 +index 0000000000..8e38f55b38 +--- /dev/null ++++ b/tests/pytests/integration/_logging/test_logging.py +@@ -0,0 +1,106 @@ ++import logging ++import os ++ ++import pytest ++ ++import salt._logging.impl as log_impl ++from tests.support.mock import MagicMock, patch ++ ++pytestmark = [ ++ pytest.mark.skip_on_windows(reason="Temporarily skipped on the newer golden images") ++] ++ ++ ++log = logging.getLogger(__name__) ++ ++ ++@pytest.fixture ++def configure_loader_modules(): ++ return {log_impl: {}} ++ ++ ++def log_nameToLevel(name): ++ """ ++ Return the numeric representation of textual logging level ++ """ ++ # log level values ++ CRITICAL = 50 ++ FATAL = CRITICAL ++ ERROR = 40 ++ WARNING = 30 ++ WARN = WARNING ++ INFO = 20 ++ DEBUG = 10 ++ NOTSET = 0 ++ ++ _nameToLevel = { ++ "CRITICAL": CRITICAL, ++ "FATAL": FATAL, ++ "ERROR": ERROR, ++ "WARN": WARNING, ++ "WARNING": WARNING, ++ "INFO": INFO, ++ "DEBUG": DEBUG, ++ "NOTSET": NOTSET, ++ } ++ return _nameToLevel.get(name, None) ++ ++ ++def test_lowest_log_level(): ++ ret = log_impl.get_lowest_log_level() ++ assert ret is not None ++ ++ log_impl.set_lowest_log_level(log_nameToLevel("DEBUG")) ++ ret = log_impl.get_lowest_log_level() ++ assert ret is log_nameToLevel("DEBUG") ++ ++ log_impl.set_lowest_log_level(log_nameToLevel("WARNING")) ++ ret = log_impl.get_lowest_log_level() ++ assert ret is log_nameToLevel("WARNING") ++ ++ opts = {"log_level": "ERROR", "log_level_logfile": "INFO"} ++ log_impl.set_lowest_log_level_by_opts(opts) ++ ret = log_impl.get_lowest_log_level() ++ assert ret is log_nameToLevel("INFO") ++ ++ ++def test_get_logging_level_from_string(caplog): ++ ret = log_impl.get_logging_level_from_string(None) ++ assert ret is log_nameToLevel("WARNING") ++ ++ ret = log_impl.get_logging_level_from_string(log_nameToLevel("DEBUG")) ++ assert ret is log_nameToLevel("DEBUG") ++ ++ ret = log_impl.get_logging_level_from_string("CRITICAL") ++ assert ret is log_nameToLevel("CRITICAL") ++ ++ caplog.clear() ++ with caplog.at_level(logging.WARNING): ++ msg = "Could not translate the logging level string 'BADLEVEL' into an actual logging level integer. Returning 'logging.ERROR'." ++ ret = log_impl.get_logging_level_from_string("BADLEVEL") ++ assert ret is log_nameToLevel("ERROR") ++ assert msg in caplog.text ++ ++ ++def test_logfile_handler(caplog): ++ caplog.clear() ++ with caplog.at_level(logging.WARNING): ++ ret = log_impl.is_logfile_handler_configured() ++ assert ret is False ++ ++ msg = "log_path setting is set to `None`. Nothing else to do" ++ log_path = None ++ assert log_impl.setup_logfile_handler(log_path) is None ++ assert msg in caplog.text ++ ++ ++def test_in_mainprocess(): ++ ret = log_impl.in_mainprocess() ++ assert ret is True ++ ++ curr_pid = os.getpid() ++ with patch( ++ "os.getpid", MagicMock(side_effect=[AttributeError, curr_pid, curr_pid]) ++ ): ++ ret = log_impl.in_mainprocess() ++ assert ret is True +diff --git a/tests/pytests/unit/_logging/handlers/test_deferred_stream_handler.py b/tests/pytests/unit/_logging/handlers/test_deferred_stream_handler.py +index 76b0e88eca..62c0dff4be 100644 +--- a/tests/pytests/unit/_logging/handlers/test_deferred_stream_handler.py ++++ b/tests/pytests/unit/_logging/handlers/test_deferred_stream_handler.py +@@ -9,6 +9,7 @@ import pytest + from pytestshellutils.utils.processes import terminate_process + + from salt._logging.handlers import DeferredStreamHandler ++from salt._logging.impl import set_lowest_log_level + from salt.utils.nb_popen import NonBlockingPopen + from tests.support.helpers import CaptureOutput, dedent + from tests.support.runtests import RUNTIME_VARS +@@ -20,7 +21,7 @@ def _sync_with_handlers_proc_target(): + + with CaptureOutput() as stds: + handler = DeferredStreamHandler(sys.stderr) +- handler.setLevel(logging.DEBUG) ++ set_lowest_log_level(logging.DEBUG) + formatter = logging.Formatter("%(message)s") + handler.setFormatter(formatter) + logging.root.addHandler(handler) +@@ -45,7 +46,7 @@ def _deferred_write_on_flush_proc_target(): + + with CaptureOutput() as stds: + handler = DeferredStreamHandler(sys.stderr) +- handler.setLevel(logging.DEBUG) ++ set_lowest_log_level(logging.DEBUG) + formatter = logging.Formatter("%(message)s") + handler.setFormatter(formatter) + logging.root.addHandler(handler) +@@ -126,7 +127,7 @@ def test_deferred_write_on_atexit(tmp_path): + # Just loop consuming output + while True: + if time.time() > max_time: +- pytest.fail("Script didn't exit after {} second".format(execution_time)) ++ pytest.fail(f"Script didn't exit after {execution_time} second") + + time.sleep(0.125) + _out = proc.recv() +@@ -146,7 +147,7 @@ def test_deferred_write_on_atexit(tmp_path): + finally: + terminate_process(proc.pid, kill_children=True) + if b"Foo" not in err: +- pytest.fail("'Foo' should be in stderr and it's not: {}".format(err)) ++ pytest.fail(f"'Foo' should be in stderr and it's not: {err}") + + + @pytest.mark.skip_on_windows(reason="Windows does not support SIGINT") +diff --git a/tests/pytests/unit/utils/test_vt.py b/tests/pytests/unit/utils/test_vt.py +index 438a6eb09c..c31b25e623 100644 +--- a/tests/pytests/unit/utils/test_vt.py ++++ b/tests/pytests/unit/utils/test_vt.py +@@ -1,3 +1,4 @@ ++import logging + import os + import signal + +@@ -43,10 +44,13 @@ def test_log_sanitize(test_cmd, caplog): + cmd, + log_stdout=True, + log_stderr=True, ++ log_stdout_level="debug", ++ log_stderr_level="debug", + log_sanitize=password, + stream_stdout=False, + stream_stderr=False, + ) +- ret = term.recv() ++ with caplog.at_level(logging.DEBUG): ++ ret = term.recv() + assert password not in caplog.text + assert "******" in caplog.text +-- +2.45.0 + diff --git a/make-reactor-engine-less-blocking-the-eventpublisher.patch b/make-reactor-engine-less-blocking-the-eventpublisher.patch new file mode 100644 index 0000000..ba1e393 --- /dev/null +++ b/make-reactor-engine-less-blocking-the-eventpublisher.patch @@ -0,0 +1,104 @@ +From 0d35f09288700f5c961567442c3fcc25838b8de4 Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 09:44:21 +0200 +Subject: [PATCH] Make reactor engine less blocking the EventPublisher + +--- + salt/utils/reactor.py | 45 +++++++++++++++++++++++++++---------------- + 1 file changed, 28 insertions(+), 17 deletions(-) + +diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py +index 19420a51cf..78adad34da 100644 +--- a/salt/utils/reactor.py ++++ b/salt/utils/reactor.py +@@ -1,10 +1,12 @@ + """ + Functions which implement running reactor jobs + """ ++ + import fnmatch + import glob + import logging + import os ++from threading import Lock + + import salt.client + import salt.defaults.exitcodes +@@ -194,13 +196,6 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler): + self.resolve_aliases(chunks) + return chunks + +- def call_reactions(self, chunks): +- """ +- Execute the reaction state +- """ +- for chunk in chunks: +- self.wrap.run(chunk) +- + def run(self): + """ + Enter into the server loop +@@ -218,7 +213,7 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler): + ) as event: + self.wrap = ReactWrap(self.opts) + +- for data in event.iter_events(full=True): ++ for data in event.iter_events(full=True, auto_reconnect=True): + # skip all events fired by ourselves + if data["data"].get("user") == self.wrap.event_user: + continue +@@ -268,15 +263,9 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler): + if not self.is_leader: + continue + else: +- reactors = self.list_reactors(data["tag"]) +- if not reactors: +- continue +- chunks = self.reactions(data["tag"], data["data"], reactors) +- if chunks: +- try: +- self.call_reactions(chunks) +- except SystemExit: +- log.warning("Exit ignored by reactor") ++ self.wrap.call_reactions( ++ data, self.list_reactors, self.reactions ++ ) + + + class ReactWrap: +@@ -297,6 +286,7 @@ class ReactWrap: + + def __init__(self, opts): + self.opts = opts ++ self._run_lock = Lock() + if ReactWrap.client_cache is None: + ReactWrap.client_cache = salt.utils.cache.CacheDict( + opts["reactor_refresh_interval"] +@@ -480,3 +470,24 @@ class ReactWrap: + Wrap LocalCaller to execute remote exec functions locally on the Minion + """ + self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"]) ++ ++ def _call_reactions(self, data, list_reactors, get_reactions): ++ reactors = list_reactors(data["tag"]) ++ if not reactors: ++ return ++ chunks = get_reactions(data["tag"], data["data"], reactors) ++ if not chunks: ++ return ++ with self._run_lock: ++ try: ++ for chunk in chunks: ++ self.run(chunk) ++ except Exception as exc: # pylint: disable=broad-except ++ log.error( ++ "Exception while calling the reactions: %s", exc, exc_info=True ++ ) ++ ++ def call_reactions(self, data, list_reactors, get_reactions): ++ return self.pool.fire_async( ++ self._call_reactions, args=(data, list_reactors, get_reactions) ++ ) +-- +2.45.0 + diff --git a/make-salt-master-self-recoverable-on-killing-eventpu.patch b/make-salt-master-self-recoverable-on-killing-eventpu.patch new file mode 100644 index 0000000..c353e49 --- /dev/null +++ b/make-salt-master-self-recoverable-on-killing-eventpu.patch @@ -0,0 +1,243 @@ +From 794b5d1aa7b8e880e9a21940183d241c6cbde9c9 Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 09:42:23 +0200 +Subject: [PATCH] Make salt-master self recoverable on killing + EventPublisher + +* Implement timeout and tries to transport.ipc.IPCClient.send + +* Make timeout and tries configurable for fire_event + +* Add test of timeout and tries + +* Prevent exceptions from tornado Future on closing the IPC connection +--- + salt/transport/ipc.py | 73 +++++++++++++++++--- + salt/utils/event.py | 21 +++++- + tests/pytests/unit/utils/event/test_event.py | 43 ++++++++++++ + 3 files changed, 125 insertions(+), 12 deletions(-) + +diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py +index cee100b086..6631781c5c 100644 +--- a/salt/transport/ipc.py ++++ b/salt/transport/ipc.py +@@ -2,7 +2,6 @@ + IPC transport classes + """ + +- + import errno + import logging + import socket +@@ -340,7 +339,8 @@ class IPCClient: + try: + log.trace("IPCClient: Connecting to socket: %s", self.socket_path) + yield self.stream.connect(sock_addr) +- self._connecting_future.set_result(True) ++ if self._connecting_future is not None: ++ self._connecting_future.set_result(True) + break + except Exception as e: # pylint: disable=broad-except + if self.stream.closed(): +@@ -350,7 +350,8 @@ class IPCClient: + if self.stream is not None: + self.stream.close() + self.stream = None +- self._connecting_future.set_exception(e) ++ if self._connecting_future is not None: ++ self._connecting_future.set_exception(e) + break + + yield salt.ext.tornado.gen.sleep(1) +@@ -365,7 +366,13 @@ class IPCClient: + return + + self._closing = True +- self._connecting_future = None ++ if self._connecting_future is not None: ++ try: ++ self._connecting_future.set_result(True) ++ self._connecting_future.exception() # pylint: disable=E0203 ++ except Exception as e: # pylint: disable=broad-except ++ log.warning("Unhandled connecting exception: %s", e, exc_info=True) ++ self._connecting_future = None + + log.debug("Closing %s instance", self.__class__.__name__) + +@@ -435,8 +442,6 @@ class IPCMessageClient(IPCClient): + "close", + ] + +- # FIXME timeout unimplemented +- # FIXME tries unimplemented + @salt.ext.tornado.gen.coroutine + def send(self, msg, timeout=None, tries=None): + """ +@@ -445,12 +450,60 @@ class IPCMessageClient(IPCClient): + If the socket is not currently connected, a connection will be established. + + :param dict msg: The message to be sent +- :param int timeout: Timeout when sending message (Currently unimplemented) ++ :param int timeout: Timeout when sending message ++ :param int tries: Maximum numer of tries to send message + """ +- if not self.connected(): +- yield self.connect() ++ if tries is None or tries < 1: ++ tries = 1 ++ due_time = None ++ if timeout is not None: ++ due_time = time.time() + timeout ++ _try = 1 ++ exc_count = 0 + pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True) +- yield self.stream.write(pack) ++ while _try <= tries: ++ if not self.connected(): ++ self.close() ++ self.stream = None ++ self._closing = False ++ try: ++ yield self.connect( ++ timeout=( ++ None if due_time is None else max(due_time - time.time(), 1) ++ ) ++ ) ++ except StreamClosedError: ++ log.warning( ++ "IPCMessageClient: Unable to reconnect IPC stream on sending message with ID: 0x%016x%s", ++ id(msg), ++ f", retry {_try} of {tries}" if tries > 1 else "", ++ ) ++ exc_count += 1 ++ if self.connected(): ++ try: ++ yield self.stream.write(pack) ++ return ++ except StreamClosedError: ++ if self._closing: ++ break ++ log.warning( ++ "IPCMessageClient: Stream was closed on sending message with ID: 0x%016x", ++ id(msg), ++ ) ++ exc_count += 1 ++ if exc_count == 1: ++ # Give one more chance in case if stream was detected as closed ++ # on the first write attempt ++ continue ++ cur_time = time.time() ++ _try += 1 ++ if _try > tries or (due_time is not None and cur_time > due_time): ++ return ++ yield salt.ext.tornado.gen.sleep( ++ 1 ++ if due_time is None ++ else (due_time - cur_time) / max(tries - _try + 1, 1) ++ ) + + + class IPCMessageServer(IPCServer): +diff --git a/salt/utils/event.py b/salt/utils/event.py +index ef048335ae..36b530d1af 100644 +--- a/salt/utils/event.py ++++ b/salt/utils/event.py +@@ -270,6 +270,10 @@ class SaltEvent: + # and don't read out events from the buffer on an on-going basis, + # the buffer will grow resulting in big memory usage. + self.connect_pub() ++ self.pusher_send_timeout = self.opts.get( ++ "pusher_send_timeout", self.opts.get("timeout") ++ ) ++ self.pusher_send_tries = self.opts.get("pusher_send_tries", 3) + + @classmethod + def __load_cache_regex(cls): +@@ -839,10 +843,18 @@ class SaltEvent: + ] + ) + msg = salt.utils.stringutils.to_bytes(event, "utf-8") ++ if timeout is None: ++ timeout_s = self.pusher_send_timeout ++ else: ++ timeout_s = float(timeout) / 1000 + if self._run_io_loop_sync: + with salt.utils.asynchronous.current_ioloop(self.io_loop): + try: +- self.pusher.send(msg) ++ self.pusher.send( ++ msg, ++ timeout=timeout_s, ++ tries=self.pusher_send_tries, ++ ) + except Exception as exc: # pylint: disable=broad-except + log.debug( + "Publisher send failed with exception: %s", +@@ -851,7 +863,12 @@ class SaltEvent: + ) + raise + else: +- self.io_loop.spawn_callback(self.pusher.send, msg) ++ self.io_loop.spawn_callback( ++ self.pusher.send, ++ msg, ++ timeout=timeout_s, ++ tries=self.pusher_send_tries, ++ ) + return True + + def fire_master(self, data, tag, timeout=1000): +diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py +index 3eadfaf6ba..fa9e420a93 100644 +--- a/tests/pytests/unit/utils/event/test_event.py ++++ b/tests/pytests/unit/utils/event/test_event.py +@@ -447,3 +447,46 @@ def test_event_fire_ret_load(): + ) + assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org" + assert mock_log_error.mock_calls[0].args[2] == "".join(test_traceback) ++ ++ ++@pytest.mark.slow_test ++def test_event_single_timeout_tries(sock_dir): ++ """Test an event is sent with timout and tries""" ++ ++ write_calls_count = 0 ++ real_stream_write = None ++ ++ @salt.ext.tornado.gen.coroutine ++ def write_mock(pack): ++ nonlocal write_calls_count ++ nonlocal real_stream_write ++ write_calls_count += 1 ++ if write_calls_count > 3: ++ yield real_stream_write(pack) ++ else: ++ raise salt.ext.tornado.iostream.StreamClosedError() ++ ++ with eventpublisher_process(str(sock_dir)), salt.utils.event.MasterEvent( ++ str(sock_dir), listen=True ++ ) as me: ++ me.fire_event({"data": "foo1"}, "evt1") ++ evt1 = me.get_event(tag="evt1") ++ _assert_got_event(evt1, {"data": "foo1"}) ++ real_stream_write = me.pusher.stream.write ++ with patch.object( ++ me.pusher, ++ "connected", ++ side_effect=[True, True, False, False, True, True], ++ ), patch.object( ++ me.pusher, ++ "connect", ++ side_effect=salt.ext.tornado.iostream.StreamClosedError, ++ ), patch.object( ++ me.pusher.stream, ++ "write", ++ write_mock, ++ ): ++ me.fire_event({"data": "bar2"}, "evt2", timeout=5000) ++ evt2 = me.get_event(tag="evt2") ++ _assert_got_event(evt2, {"data": "bar2"}) ++ assert write_calls_count == 4 +-- +2.45.0 + diff --git a/prevent-oom-with-high-amount-of-batch-async-calls-bs.patch b/prevent-oom-with-high-amount-of-batch-async-calls-bs.patch new file mode 100644 index 0000000..c74e14e --- /dev/null +++ b/prevent-oom-with-high-amount-of-batch-async-calls-bs.patch @@ -0,0 +1,1272 @@ +From d57472b4fa2213ec551197ee2e147aef364fdcfe Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 11:47:35 +0200 +Subject: [PATCH] Prevent OOM with high amount of batch async calls + (bsc#1216063) + +* Refactor batch_async implementation + +* Fix batch_async tests after refactoring +--- + salt/cli/batch_async.py | 584 ++++++++++++++------- + salt/master.py | 9 +- + tests/pytests/unit/cli/test_batch_async.py | 360 +++++++------ + 3 files changed, 597 insertions(+), 356 deletions(-) + +diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py +index 1012ce37cc..5d49993faa 100644 +--- a/salt/cli/batch_async.py ++++ b/salt/cli/batch_async.py +@@ -2,18 +2,193 @@ + Execute a job on the targeted minions by using a moving window of fixed size `batch`. + """ + +-import gc +- +-# pylint: enable=import-error,no-name-in-module,redefined-builtin + import logging ++import re + + import salt.client + import salt.ext.tornado ++import salt.utils.event + from salt.cli.batch import batch_get_eauth, batch_get_opts, get_bnum ++from salt.ext.tornado.iostream import StreamClosedError + + log = logging.getLogger(__name__) + + ++__SHARED_EVENTS_CHANNEL = None ++ ++ ++def _get_shared_events_channel(opts, io_loop): ++ global __SHARED_EVENTS_CHANNEL ++ if __SHARED_EVENTS_CHANNEL is None: ++ __SHARED_EVENTS_CHANNEL = SharedEventsChannel(opts, io_loop) ++ return __SHARED_EVENTS_CHANNEL ++ ++ ++def _destroy_unused_shared_events_channel(): ++ global __SHARED_EVENTS_CHANNEL ++ if __SHARED_EVENTS_CHANNEL is not None and __SHARED_EVENTS_CHANNEL.destroy_unused(): ++ __SHARED_EVENTS_CHANNEL = None ++ ++ ++def batch_async_required(opts, minions, extra): ++ """ ++ Check opts to identify if batch async is required for the operation. ++ """ ++ if not isinstance(minions, list): ++ False ++ batch_async_opts = opts.get("batch_async", {}) ++ batch_async_threshold = ( ++ batch_async_opts.get("threshold", 1) ++ if isinstance(batch_async_opts, dict) ++ else 1 ++ ) ++ if batch_async_threshold == -1: ++ batch_size = get_bnum(extra, minions, True) ++ return len(minions) >= batch_size ++ elif batch_async_threshold > 0: ++ return len(minions) >= batch_async_threshold ++ return False ++ ++ ++class SharedEventsChannel: ++ def __init__(self, opts, io_loop): ++ self.io_loop = io_loop ++ self.local_client = salt.client.get_local_client( ++ opts["conf_file"], io_loop=self.io_loop ++ ) ++ self.master_event = salt.utils.event.get_event( ++ "master", ++ sock_dir=self.local_client.opts["sock_dir"], ++ opts=self.local_client.opts, ++ listen=True, ++ io_loop=self.io_loop, ++ keep_loop=True, ++ ) ++ self.master_event.set_event_handler(self.__handle_event) ++ if self.master_event.subscriber.stream: ++ self.master_event.subscriber.stream.set_close_callback(self.__handle_close) ++ self._re_tag_ret_event = re.compile(r"salt\/job\/(\d+)\/ret\/.*") ++ self._subscribers = {} ++ self._subscriptions = {} ++ self._used_by = set() ++ batch_async_opts = opts.get("batch_async", {}) ++ if not isinstance(batch_async_opts, dict): ++ batch_async_opts = {} ++ self._subscriber_reconnect_tries = batch_async_opts.get( ++ "subscriber_reconnect_tries", 5 ++ ) ++ self._subscriber_reconnect_interval = batch_async_opts.get( ++ "subscriber_reconnect_interval", 1.0 ++ ) ++ self._reconnecting_subscriber = False ++ ++ def subscribe(self, jid, op, subscriber_id, handler): ++ if subscriber_id not in self._subscribers: ++ self._subscribers[subscriber_id] = set() ++ if jid not in self._subscriptions: ++ self._subscriptions[jid] = [] ++ self._subscribers[subscriber_id].add(jid) ++ if (op, subscriber_id, handler) not in self._subscriptions[jid]: ++ self._subscriptions[jid].append((op, subscriber_id, handler)) ++ if not self.master_event.subscriber.connected(): ++ self.__reconnect_subscriber() ++ ++ def unsubscribe(self, jid, op, subscriber_id): ++ if subscriber_id not in self._subscribers: ++ return ++ jids = self._subscribers[subscriber_id].copy() ++ if jid is not None: ++ jids = set(jid) ++ for i_jid in jids: ++ self._subscriptions[i_jid] = list( ++ filter( ++ lambda x: not (op in (x[0], None) and x[1] == subscriber_id), ++ self._subscriptions.get(i_jid, []), ++ ) ++ ) ++ self._subscribers[subscriber_id].discard(i_jid) ++ self._subscriptions = dict(filter(lambda x: x[1], self._subscriptions.items())) ++ if not self._subscribers[subscriber_id]: ++ del self._subscribers[subscriber_id] ++ ++ @salt.ext.tornado.gen.coroutine ++ def __handle_close(self): ++ if not self._subscriptions: ++ return ++ log.warning("Master Event Subscriber was closed. Trying to reconnect...") ++ yield self.__reconnect_subscriber() ++ ++ @salt.ext.tornado.gen.coroutine ++ def __handle_event(self, raw): ++ if self.master_event is None: ++ return ++ try: ++ tag, data = self.master_event.unpack(raw) ++ tag_match = self._re_tag_ret_event.match(tag) ++ if tag_match: ++ jid = tag_match.group(1) ++ if jid in self._subscriptions: ++ for op, _, handler in self._subscriptions[jid]: ++ yield handler(tag, data, op) ++ except Exception as ex: # pylint: disable=W0703 ++ log.error( ++ "Exception occured while processing event: %s: %s", ++ tag, ++ ex, ++ exc_info=True, ++ ) ++ ++ @salt.ext.tornado.gen.coroutine ++ def __reconnect_subscriber(self): ++ if self.master_event.subscriber.connected() or self._reconnecting_subscriber: ++ return ++ self._reconnecting_subscriber = True ++ max_tries = max(1, int(self._subscriber_reconnect_tries)) ++ _try = 1 ++ while _try <= max_tries: ++ log.info( ++ "Trying to reconnect to event publisher (try %d of %d) ...", ++ _try, ++ max_tries, ++ ) ++ try: ++ yield self.master_event.subscriber.connect() ++ except StreamClosedError: ++ log.warning( ++ "Unable to reconnect to event publisher (try %d of %d)", ++ _try, ++ max_tries, ++ ) ++ if self.master_event.subscriber.connected(): ++ self.master_event.subscriber.stream.set_close_callback( ++ self.__handle_close ++ ) ++ log.info("Event publisher connection restored") ++ self._reconnecting_subscriber = False ++ return ++ if _try < max_tries: ++ yield salt.ext.tornado.gen.sleep(self._subscriber_reconnect_interval) ++ _try += 1 ++ self._reconnecting_subscriber = False ++ ++ def use(self, subscriber_id): ++ self._used_by.add(subscriber_id) ++ return self ++ ++ def unuse(self, subscriber_id): ++ self._used_by.discard(subscriber_id) ++ ++ def destroy_unused(self): ++ if self._used_by: ++ return False ++ self.master_event.remove_event_handler(self.__handle_event) ++ self.master_event.destroy() ++ self.master_event = None ++ self.local_client.destroy() ++ self.local_client = None ++ return True ++ ++ + class BatchAsync: + """ + Run a job on the targeted minions by using a moving window of fixed size `batch`. +@@ -28,14 +203,14 @@ class BatchAsync: + - gather_job_timeout: `find_job` timeout + - timeout: time to wait before firing a `find_job` + +- When the batch stars, a `start` event is fired: ++ When the batch starts, a `start` event is fired: + - tag: salt/batch//start + - data: { + "available_minions": self.minions, + "down_minions": targeted_minions - presence_ping_minions + } + +- When the batch ends, an `done` event is fired: ++ When the batch ends, a `done` event is fired: + - tag: salt/batch//done + - data: { + "available_minions": self.minions, +@@ -45,17 +220,26 @@ class BatchAsync: + } + """ + +- def __init__(self, parent_opts, jid_gen, clear_load): +- ioloop = salt.ext.tornado.ioloop.IOLoop.current() +- self.local = salt.client.get_local_client( +- parent_opts["conf_file"], io_loop=ioloop ++ def __init__(self, opts, jid_gen, clear_load): ++ self.extra_job_kwargs = {} ++ kwargs = clear_load.get("kwargs", {}) ++ for kwarg in ("module_executors", "executor_opts"): ++ if kwarg in kwargs: ++ self.extra_job_kwargs[kwarg] = kwargs[kwarg] ++ elif kwarg in opts: ++ self.extra_job_kwargs[kwarg] = opts[kwarg] ++ self.io_loop = salt.ext.tornado.ioloop.IOLoop.current() ++ self.events_channel = _get_shared_events_channel(opts, self.io_loop).use( ++ id(self) + ) + if "gather_job_timeout" in clear_load["kwargs"]: + clear_load["gather_job_timeout"] = clear_load["kwargs"].pop( + "gather_job_timeout" + ) + else: +- clear_load["gather_job_timeout"] = self.local.opts["gather_job_timeout"] ++ clear_load["gather_job_timeout"] = self.events_channel.local_client.opts[ ++ "gather_job_timeout" ++ ] + self.batch_presence_ping_timeout = clear_load["kwargs"].get( + "batch_presence_ping_timeout", None + ) +@@ -64,8 +248,8 @@ class BatchAsync: + clear_load.pop("tgt"), + clear_load.pop("fun"), + clear_load["kwargs"].pop("batch"), +- self.local.opts, +- **clear_load ++ self.events_channel.local_client.opts, ++ **clear_load, + ) + self.eauth = batch_get_eauth(clear_load["kwargs"]) + self.metadata = clear_load["kwargs"].get("metadata", {}) +@@ -78,54 +262,45 @@ class BatchAsync: + self.jid_gen = jid_gen + self.ping_jid = jid_gen() + self.batch_jid = jid_gen() +- self.find_job_jid = jid_gen() + self.find_job_returned = set() ++ self.metadata.update({"batch_jid": self.batch_jid, "ping_jid": self.ping_jid}) + self.ended = False +- self.event = salt.utils.event.get_event( +- "master", +- self.opts["sock_dir"], +- self.opts["transport"], +- opts=self.opts, +- listen=True, +- io_loop=ioloop, +- keep_loop=True, +- ) ++ self.event = self.events_channel.master_event + self.scheduled = False +- self.patterns = set() + + def __set_event_handler(self): +- ping_return_pattern = "salt/job/{}/ret/*".format(self.ping_jid) +- batch_return_pattern = "salt/job/{}/ret/*".format(self.batch_jid) +- self.event.subscribe(ping_return_pattern, match_type="glob") +- self.event.subscribe(batch_return_pattern, match_type="glob") +- self.patterns = { +- (ping_return_pattern, "ping_return"), +- (batch_return_pattern, "batch_run"), +- } +- self.event.set_event_handler(self.__event_handler) ++ self.events_channel.subscribe( ++ self.ping_jid, "ping_return", id(self), self.__event_handler ++ ) ++ self.events_channel.subscribe( ++ self.batch_jid, "batch_run", id(self), self.__event_handler ++ ) + +- def __event_handler(self, raw): ++ @salt.ext.tornado.gen.coroutine ++ def __event_handler(self, tag, data, op): + if not self.event: + return + try: +- mtag, data = self.event.unpack(raw) +- for (pattern, op) in self.patterns: +- if mtag.startswith(pattern[:-1]): +- minion = data["id"] +- if op == "ping_return": +- self.minions.add(minion) +- if self.targeted_minions == self.minions: +- self.event.io_loop.spawn_callback(self.start_batch) +- elif op == "find_job_return": +- if data.get("return", None): +- self.find_job_returned.add(minion) +- elif op == "batch_run": +- if minion in self.active: +- self.active.remove(minion) +- self.done_minions.add(minion) +- self.event.io_loop.spawn_callback(self.schedule_next) +- except Exception as ex: +- log.error("Exception occured while processing event: {}".format(ex)) ++ minion = data["id"] ++ if op == "ping_return": ++ self.minions.add(minion) ++ if self.targeted_minions == self.minions: ++ yield self.start_batch() ++ elif op == "find_job_return": ++ if data.get("return", None): ++ self.find_job_returned.add(minion) ++ elif op == "batch_run": ++ if minion in self.active: ++ self.active.remove(minion) ++ self.done_minions.add(minion) ++ yield self.schedule_next() ++ except Exception as ex: # pylint: disable=W0703 ++ log.error( ++ "Exception occured while processing event: %s: %s", ++ tag, ++ ex, ++ exc_info=True, ++ ) + + def _get_next(self): + to_run = ( +@@ -139,176 +314,203 @@ class BatchAsync: + ) + return set(list(to_run)[:next_batch_size]) + ++ @salt.ext.tornado.gen.coroutine + def check_find_job(self, batch_minions, jid): +- if self.event: +- find_job_return_pattern = "salt/job/{}/ret/*".format(jid) +- self.event.unsubscribe(find_job_return_pattern, match_type="glob") +- self.patterns.remove((find_job_return_pattern, "find_job_return")) +- +- timedout_minions = batch_minions.difference( +- self.find_job_returned +- ).difference(self.done_minions) +- self.timedout_minions = self.timedout_minions.union(timedout_minions) +- self.active = self.active.difference(self.timedout_minions) +- running = batch_minions.difference(self.done_minions).difference( +- self.timedout_minions +- ) ++ """ ++ Check if the job with specified ``jid`` was finished on the minions ++ """ ++ if not self.event: ++ return ++ self.events_channel.unsubscribe(jid, "find_job_return", id(self)) + +- if timedout_minions: +- self.schedule_next() ++ timedout_minions = batch_minions.difference(self.find_job_returned).difference( ++ self.done_minions ++ ) ++ self.timedout_minions = self.timedout_minions.union(timedout_minions) ++ self.active = self.active.difference(self.timedout_minions) ++ running = batch_minions.difference(self.done_minions).difference( ++ self.timedout_minions ++ ) + +- if self.event and running: +- self.find_job_returned = self.find_job_returned.difference(running) +- self.event.io_loop.spawn_callback(self.find_job, running) ++ if timedout_minions: ++ yield self.schedule_next() ++ ++ if self.event and running: ++ self.find_job_returned = self.find_job_returned.difference(running) ++ yield self.find_job(running) + + @salt.ext.tornado.gen.coroutine + def find_job(self, minions): +- if self.event: +- not_done = minions.difference(self.done_minions).difference( +- self.timedout_minions ++ """ ++ Find if the job was finished on the minions ++ """ ++ if not self.event: ++ return ++ not_done = minions.difference(self.done_minions).difference( ++ self.timedout_minions ++ ) ++ if not not_done: ++ return ++ try: ++ jid = self.jid_gen() ++ self.events_channel.subscribe( ++ jid, "find_job_return", id(self), self.__event_handler + ) +- try: +- if not_done: +- jid = self.jid_gen() +- find_job_return_pattern = "salt/job/{}/ret/*".format(jid) +- self.patterns.add((find_job_return_pattern, "find_job_return")) +- self.event.subscribe(find_job_return_pattern, match_type="glob") +- ret = yield self.local.run_job_async( +- not_done, +- "saltutil.find_job", +- [self.batch_jid], +- "list", +- gather_job_timeout=self.opts["gather_job_timeout"], +- jid=jid, +- **self.eauth +- ) +- yield salt.ext.tornado.gen.sleep(self.opts["gather_job_timeout"]) +- if self.event: +- self.event.io_loop.spawn_callback( +- self.check_find_job, not_done, jid +- ) +- except Exception as ex: +- log.error( +- "Exception occured handling batch async: {}. Aborting execution.".format( +- ex +- ) +- ) +- self.close_safe() ++ ret = yield self.events_channel.local_client.run_job_async( ++ not_done, ++ "saltutil.find_job", ++ [self.batch_jid], ++ "list", ++ gather_job_timeout=self.opts["gather_job_timeout"], ++ jid=jid, ++ io_loop=self.io_loop, ++ listen=False, ++ **self.eauth, ++ ) ++ yield salt.ext.tornado.gen.sleep(self.opts["gather_job_timeout"]) ++ if self.event: ++ yield self.check_find_job(not_done, jid) ++ except Exception as ex: # pylint: disable=W0703 ++ log.error( ++ "Exception occured handling batch async: %s. Aborting execution.", ++ ex, ++ exc_info=True, ++ ) ++ self.close_safe() + + @salt.ext.tornado.gen.coroutine + def start(self): ++ """ ++ Start the batch execution ++ """ ++ if not self.event: ++ return ++ self.__set_event_handler() ++ ping_return = yield self.events_channel.local_client.run_job_async( ++ self.opts["tgt"], ++ "test.ping", ++ [], ++ self.opts.get("selected_target_option", self.opts.get("tgt_type", "glob")), ++ gather_job_timeout=self.opts["gather_job_timeout"], ++ jid=self.ping_jid, ++ metadata=self.metadata, ++ io_loop=self.io_loop, ++ listen=False, ++ **self.eauth, ++ ) ++ self.targeted_minions = set(ping_return["minions"]) ++ # start batching even if not all minions respond to ping ++ yield salt.ext.tornado.gen.sleep( ++ self.batch_presence_ping_timeout or self.opts["gather_job_timeout"] ++ ) + if self.event: +- self.__set_event_handler() +- ping_return = yield self.local.run_job_async( +- self.opts["tgt"], +- "test.ping", +- [], +- self.opts.get( +- "selected_target_option", self.opts.get("tgt_type", "glob") +- ), +- gather_job_timeout=self.opts["gather_job_timeout"], +- jid=self.ping_jid, +- metadata=self.metadata, +- **self.eauth +- ) +- self.targeted_minions = set(ping_return["minions"]) +- # start batching even if not all minions respond to ping +- yield salt.ext.tornado.gen.sleep( +- self.batch_presence_ping_timeout or self.opts["gather_job_timeout"] +- ) +- if self.event: +- self.event.io_loop.spawn_callback(self.start_batch) ++ yield self.start_batch() + + @salt.ext.tornado.gen.coroutine + def start_batch(self): +- if not self.initialized: +- self.batch_size = get_bnum(self.opts, self.minions, True) +- self.initialized = True +- data = { +- "available_minions": self.minions, +- "down_minions": self.targeted_minions.difference(self.minions), +- "metadata": self.metadata, +- } +- ret = self.event.fire_event( +- data, "salt/batch/{}/start".format(self.batch_jid) +- ) +- if self.event: +- self.event.io_loop.spawn_callback(self.run_next) ++ """ ++ Fire `salt/batch/*/start` and continue batch with `run_next` ++ """ ++ if self.initialized: ++ return ++ self.batch_size = get_bnum(self.opts, self.minions, True) ++ self.initialized = True ++ data = { ++ "available_minions": self.minions, ++ "down_minions": self.targeted_minions.difference(self.minions), ++ "metadata": self.metadata, ++ } ++ yield self.events_channel.master_event.fire_event_async( ++ data, f"salt/batch/{self.batch_jid}/start" ++ ) ++ if self.event: ++ yield self.run_next() + + @salt.ext.tornado.gen.coroutine + def end_batch(self): ++ """ ++ End the batch and call safe closing ++ """ + left = self.minions.symmetric_difference( + self.done_minions.union(self.timedout_minions) + ) +- if not left and not self.ended: +- self.ended = True +- data = { +- "available_minions": self.minions, +- "down_minions": self.targeted_minions.difference(self.minions), +- "done_minions": self.done_minions, +- "timedout_minions": self.timedout_minions, +- "metadata": self.metadata, +- } +- self.event.fire_event(data, "salt/batch/{}/done".format(self.batch_jid)) +- +- # release to the IOLoop to allow the event to be published +- # before closing batch async execution +- yield salt.ext.tornado.gen.sleep(1) +- self.close_safe() ++ # Send salt/batch/*/done only if there is nothing to do ++ # and the event haven't been sent already ++ if left or self.ended: ++ return ++ self.ended = True ++ data = { ++ "available_minions": self.minions, ++ "down_minions": self.targeted_minions.difference(self.minions), ++ "done_minions": self.done_minions, ++ "timedout_minions": self.timedout_minions, ++ "metadata": self.metadata, ++ } ++ yield self.events_channel.master_event.fire_event_async( ++ data, f"salt/batch/{self.batch_jid}/done" ++ ) ++ ++ # release to the IOLoop to allow the event to be published ++ # before closing batch async execution ++ yield salt.ext.tornado.gen.sleep(1) ++ self.close_safe() + + def close_safe(self): +- for (pattern, label) in self.patterns: +- self.event.unsubscribe(pattern, match_type="glob") +- self.event.remove_event_handler(self.__event_handler) ++ if self.events_channel is not None: ++ self.events_channel.unsubscribe(None, None, id(self)) ++ self.events_channel.unuse(id(self)) ++ self.events_channel = None ++ _destroy_unused_shared_events_channel() + self.event = None +- self.local = None +- self.ioloop = None +- del self +- gc.collect() + + @salt.ext.tornado.gen.coroutine + def schedule_next(self): +- if not self.scheduled: +- self.scheduled = True +- # call later so that we maybe gather more returns +- yield salt.ext.tornado.gen.sleep(self.batch_delay) +- if self.event: +- self.event.io_loop.spawn_callback(self.run_next) ++ if self.scheduled: ++ return ++ self.scheduled = True ++ # call later so that we maybe gather more returns ++ yield salt.ext.tornado.gen.sleep(self.batch_delay) ++ if self.event: ++ yield self.run_next() + + @salt.ext.tornado.gen.coroutine + def run_next(self): ++ """ ++ Continue batch execution with the next targets ++ """ + self.scheduled = False + next_batch = self._get_next() +- if next_batch: +- self.active = self.active.union(next_batch) +- try: +- ret = yield self.local.run_job_async( +- next_batch, +- self.opts["fun"], +- self.opts["arg"], +- "list", +- raw=self.opts.get("raw", False), +- ret=self.opts.get("return", ""), +- gather_job_timeout=self.opts["gather_job_timeout"], +- jid=self.batch_jid, +- metadata=self.metadata, +- ) +- +- yield salt.ext.tornado.gen.sleep(self.opts["timeout"]) +- +- # The batch can be done already at this point, which means no self.event +- if self.event: +- self.event.io_loop.spawn_callback(self.find_job, set(next_batch)) +- except Exception as ex: +- log.error("Error in scheduling next batch: %s. Aborting execution", ex) +- self.active = self.active.difference(next_batch) +- self.close_safe() +- else: ++ if not next_batch: + yield self.end_batch() +- gc.collect() ++ return ++ self.active = self.active.union(next_batch) ++ try: ++ ret = yield self.events_channel.local_client.run_job_async( ++ next_batch, ++ self.opts["fun"], ++ self.opts["arg"], ++ "list", ++ raw=self.opts.get("raw", False), ++ ret=self.opts.get("return", ""), ++ gather_job_timeout=self.opts["gather_job_timeout"], ++ jid=self.batch_jid, ++ metadata=self.metadata, ++ io_loop=self.io_loop, ++ listen=False, ++ **self.eauth, ++ **self.extra_job_kwargs, ++ ) + +- def __del__(self): +- self.local = None +- self.event = None +- self.ioloop = None +- gc.collect() ++ yield salt.ext.tornado.gen.sleep(self.opts["timeout"]) ++ ++ # The batch can be done already at this point, which means no self.event ++ if self.event: ++ yield self.find_job(set(next_batch)) ++ except Exception as ex: # pylint: disable=W0703 ++ log.error( ++ "Error in scheduling next batch: %s. Aborting execution", ++ ex, ++ exc_info=True, ++ ) ++ self.active = self.active.difference(next_batch) ++ self.close_safe() +diff --git a/salt/master.py b/salt/master.py +index 425b412148..d7182d10b5 100644 +--- a/salt/master.py ++++ b/salt/master.py +@@ -2,6 +2,7 @@ + This module contains all of the routines needed to set up a master server, this + involves preparing the three listeners and the workers needed by the master. + """ ++ + import collections + import copy + import ctypes +@@ -19,7 +20,6 @@ import time + import salt.acl + import salt.auth + import salt.channel.server +-import salt.cli.batch_async + import salt.client + import salt.client.ssh.client + import salt.crypt +@@ -55,6 +55,7 @@ import salt.utils.user + import salt.utils.verify + import salt.utils.zeromq + import salt.wheel ++from salt.cli.batch_async import BatchAsync, batch_async_required + from salt.config import DEFAULT_INTERVAL + from salt.defaults import DEFAULT_TARGET_DELIM + from salt.ext.tornado.stack_context import StackContext +@@ -2174,9 +2175,9 @@ class ClearFuncs(TransportMethods): + def publish_batch(self, clear_load, minions, missing): + batch_load = {} + batch_load.update(clear_load) +- batch = salt.cli.batch_async.BatchAsync( ++ batch = BatchAsync( + self.local.opts, +- functools.partial(self._prep_jid, clear_load, {}), ++ lambda: self._prep_jid(clear_load, {}), + batch_load, + ) + ioloop = salt.ext.tornado.ioloop.IOLoop.current() +@@ -2331,7 +2332,7 @@ class ClearFuncs(TransportMethods): + ), + }, + } +- if extra.get("batch", None): ++ if extra.get("batch", None) and batch_async_required(self.opts, minions, extra): + return self.publish_batch(clear_load, minions, missing) + + jid = self._prep_jid(clear_load, extra) +diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py +index e0774ffff3..bc871aba54 100644 +--- a/tests/pytests/unit/cli/test_batch_async.py ++++ b/tests/pytests/unit/cli/test_batch_async.py +@@ -1,7 +1,7 @@ + import pytest + + import salt.ext.tornado +-from salt.cli.batch_async import BatchAsync ++from salt.cli.batch_async import BatchAsync, batch_async_required + from tests.support.mock import MagicMock, patch + + +@@ -22,16 +22,44 @@ def batch(temp_salt_master): + with patch("salt.cli.batch_async.batch_get_opts", MagicMock(return_value=opts)): + batch = BatchAsync( + opts, +- MagicMock(side_effect=["1234", "1235", "1236"]), ++ MagicMock(side_effect=["1234", "1235"]), + { + "tgt": "", + "fun": "", +- "kwargs": {"batch": "", "batch_presence_ping_timeout": 1}, ++ "kwargs": { ++ "batch": "", ++ "batch_presence_ping_timeout": 1, ++ "metadata": {"mykey": "myvalue"}, ++ }, + }, + ) + yield batch + + ++@pytest.mark.parametrize( ++ "threshold,minions,batch,expected", ++ [ ++ (1, 2, 200, True), ++ (1, 500, 200, True), ++ (0, 2, 200, False), ++ (0, 500, 200, False), ++ (-1, 2, 200, False), ++ (-1, 500, 200, True), ++ (-1, 9, 10, False), ++ (-1, 11, 10, True), ++ (10, 9, 8, False), ++ (10, 9, 10, False), ++ (10, 11, 8, True), ++ (10, 11, 10, True), ++ ], ++) ++def test_batch_async_required(threshold, minions, batch, expected): ++ minions_list = [f"minion{i}.example.org" for i in range(minions)] ++ batch_async_opts = {"batch_async": {"threshold": threshold}} ++ extra = {"batch": batch} ++ assert batch_async_required(batch_async_opts, minions_list, extra) == expected ++ ++ + def test_ping_jid(batch): + assert batch.ping_jid == "1234" + +@@ -40,10 +68,6 @@ def test_batch_jid(batch): + assert batch.batch_jid == "1235" + + +-def test_find_job_jid(batch): +- assert batch.find_job_jid == "1236" +- +- + def test_batch_size(batch): + """ + Tests passing batch value as a number +@@ -55,58 +79,74 @@ def test_batch_size(batch): + + + def test_batch_start_on_batch_presence_ping_timeout(batch): +- # batch_async = BatchAsyncMock(); +- batch.event = MagicMock() ++ future_ret = salt.ext.tornado.gen.Future() ++ future_ret.set_result({"minions": ["foo", "bar"]}) + future = salt.ext.tornado.gen.Future() +- future.set_result({"minions": ["foo", "bar"]}) +- batch.local.run_job_async.return_value = future +- with patch("salt.ext.tornado.gen.sleep", return_value=future): +- # ret = batch_async.start(batch) ++ future.set_result({}) ++ with patch.object(batch, "events_channel", MagicMock()), patch( ++ "salt.ext.tornado.gen.sleep", return_value=future ++ ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock: ++ batch.events_channel.local_client.run_job_async.return_value = future_ret + ret = batch.start() +- # assert start_batch is called later with batch_presence_ping_timeout as param +- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.start_batch,) ++ # assert start_batch is called ++ start_batch_mock.assert_called_once() + # assert test.ping called +- assert batch.local.run_job_async.call_args[0] == ("*", "test.ping", [], "glob") ++ assert batch.events_channel.local_client.run_job_async.call_args[0] == ( ++ "*", ++ "test.ping", ++ [], ++ "glob", ++ ) + # assert targeted_minions == all minions matched by tgt + assert batch.targeted_minions == {"foo", "bar"} + + + def test_batch_start_on_gather_job_timeout(batch): +- # batch_async = BatchAsyncMock(); +- batch.event = MagicMock() + future = salt.ext.tornado.gen.Future() +- future.set_result({"minions": ["foo", "bar"]}) +- batch.local.run_job_async.return_value = future ++ future.set_result({}) ++ future_ret = salt.ext.tornado.gen.Future() ++ future_ret.set_result({"minions": ["foo", "bar"]}) + batch.batch_presence_ping_timeout = None +- with patch("salt.ext.tornado.gen.sleep", return_value=future): ++ with patch.object(batch, "events_channel", MagicMock()), patch( ++ "salt.ext.tornado.gen.sleep", return_value=future ++ ), patch.object( ++ batch, "start_batch", return_value=future ++ ) as start_batch_mock, patch.object( ++ batch, "batch_presence_ping_timeout", None ++ ): ++ batch.events_channel.local_client.run_job_async.return_value = future_ret + # ret = batch_async.start(batch) + ret = batch.start() +- # assert start_batch is called later with gather_job_timeout as param +- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.start_batch,) ++ # assert start_batch is called ++ start_batch_mock.assert_called_once() + + + def test_batch_fire_start_event(batch): + batch.minions = {"foo", "bar"} + batch.opts = {"batch": "2", "timeout": 5} +- batch.event = MagicMock() +- batch.metadata = {"mykey": "myvalue"} +- batch.start_batch() +- assert batch.event.fire_event.call_args[0] == ( +- { +- "available_minions": {"foo", "bar"}, +- "down_minions": set(), +- "metadata": batch.metadata, +- }, +- "salt/batch/1235/start", +- ) ++ with patch.object(batch, "events_channel", MagicMock()): ++ batch.start_batch() ++ assert batch.events_channel.master_event.fire_event_async.call_args[0] == ( ++ { ++ "available_minions": {"foo", "bar"}, ++ "down_minions": set(), ++ "metadata": batch.metadata, ++ }, ++ "salt/batch/1235/start", ++ ) + + + def test_start_batch_calls_next(batch): +- batch.run_next = MagicMock(return_value=MagicMock()) +- batch.event = MagicMock() +- batch.start_batch() +- assert batch.initialized +- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.run_next,) ++ batch.initialized = False ++ future = salt.ext.tornado.gen.Future() ++ future.set_result({}) ++ with patch.object(batch, "event", MagicMock()), patch.object( ++ batch, "events_channel", MagicMock() ++ ), patch.object(batch, "run_next", return_value=future) as run_next_mock: ++ batch.events_channel.master_event.fire_event_async.return_value = future ++ batch.start_batch() ++ assert batch.initialized ++ run_next_mock.assert_called_once() + + + def test_batch_fire_done_event(batch): +@@ -114,69 +154,52 @@ def test_batch_fire_done_event(batch): + batch.minions = {"foo", "bar"} + batch.done_minions = {"foo"} + batch.timedout_minions = {"bar"} +- batch.event = MagicMock() +- batch.metadata = {"mykey": "myvalue"} +- old_event = batch.event +- batch.end_batch() +- assert old_event.fire_event.call_args[0] == ( +- { +- "available_minions": {"foo", "bar"}, +- "done_minions": batch.done_minions, +- "down_minions": {"baz"}, +- "timedout_minions": batch.timedout_minions, +- "metadata": batch.metadata, +- }, +- "salt/batch/1235/done", +- ) +- +- +-def test_batch__del__(batch): +- batch = BatchAsync(MagicMock(), MagicMock(), MagicMock()) +- event = MagicMock() +- batch.event = event +- batch.__del__() +- assert batch.local is None +- assert batch.event is None +- assert batch.ioloop is None ++ with patch.object(batch, "events_channel", MagicMock()): ++ batch.end_batch() ++ assert batch.events_channel.master_event.fire_event_async.call_args[0] == ( ++ { ++ "available_minions": {"foo", "bar"}, ++ "done_minions": batch.done_minions, ++ "down_minions": {"baz"}, ++ "timedout_minions": batch.timedout_minions, ++ "metadata": batch.metadata, ++ }, ++ "salt/batch/1235/done", ++ ) + + + def test_batch_close_safe(batch): +- batch = BatchAsync(MagicMock(), MagicMock(), MagicMock()) +- event = MagicMock() +- batch.event = event +- batch.patterns = { +- ("salt/job/1234/ret/*", "find_job_return"), +- ("salt/job/4321/ret/*", "find_job_return"), +- } +- batch.close_safe() +- assert batch.local is None +- assert batch.event is None +- assert batch.ioloop is None +- assert len(event.unsubscribe.mock_calls) == 2 +- assert len(event.remove_event_handler.mock_calls) == 1 ++ with patch.object( ++ batch, "events_channel", MagicMock() ++ ) as events_channel_mock, patch.object(batch, "event", MagicMock()): ++ batch.close_safe() ++ batch.close_safe() ++ assert batch.events_channel is None ++ assert batch.event is None ++ events_channel_mock.unsubscribe.assert_called_once() ++ events_channel_mock.unuse.assert_called_once() + + + def test_batch_next(batch): +- batch.event = MagicMock() + batch.opts["fun"] = "my.fun" + batch.opts["arg"] = [] +- batch._get_next = MagicMock(return_value={"foo", "bar"}) + batch.batch_size = 2 + future = salt.ext.tornado.gen.Future() +- future.set_result({"minions": ["foo", "bar"]}) +- batch.local.run_job_async.return_value = future +- with patch("salt.ext.tornado.gen.sleep", return_value=future): ++ future.set_result({}) ++ with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( ++ batch, "events_channel", MagicMock() ++ ), patch.object(batch, "_get_next", return_value={"foo", "bar"}), patch.object( ++ batch, "find_job", return_value=future ++ ) as find_job_mock: ++ batch.events_channel.local_client.run_job_async.return_value = future + batch.run_next() +- assert batch.local.run_job_async.call_args[0] == ( ++ assert batch.events_channel.local_client.run_job_async.call_args[0] == ( + {"foo", "bar"}, + "my.fun", + [], + "list", + ) +- assert batch.event.io_loop.spawn_callback.call_args[0] == ( +- batch.find_job, +- {"foo", "bar"}, +- ) ++ assert find_job_mock.call_args[0] == ({"foo", "bar"},) + assert batch.active == {"bar", "foo"} + + +@@ -239,124 +262,132 @@ def test_next_batch_all_timedout(batch): + + def test_batch__event_handler_ping_return(batch): + batch.targeted_minions = {"foo"} +- batch.event = MagicMock( +- unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"})) +- ) + batch.start() + assert batch.minions == set() +- batch._BatchAsync__event_handler(MagicMock()) ++ batch._BatchAsync__event_handler( ++ "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" ++ ) + assert batch.minions == {"foo"} + assert batch.done_minions == set() + + + def test_batch__event_handler_call_start_batch_when_all_pings_return(batch): + batch.targeted_minions = {"foo"} +- batch.event = MagicMock( +- unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"})) +- ) +- batch.start() +- batch._BatchAsync__event_handler(MagicMock()) +- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.start_batch,) ++ future = salt.ext.tornado.gen.Future() ++ future.set_result({}) ++ with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: ++ batch.start() ++ batch._BatchAsync__event_handler( ++ "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" ++ ) ++ start_batch_mock.assert_called_once() + + + def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch): + batch.targeted_minions = {"foo", "bar"} +- batch.event = MagicMock( +- unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"})) +- ) +- batch.start() +- batch._BatchAsync__event_handler(MagicMock()) +- assert len(batch.event.io_loop.spawn_callback.mock_calls) == 0 ++ future = salt.ext.tornado.gen.Future() ++ future.set_result({}) ++ with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: ++ batch.start() ++ batch._BatchAsync__event_handler( ++ "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" ++ ) ++ start_batch_mock.assert_not_called() + + + def test_batch__event_handler_batch_run_return(batch): +- batch.event = MagicMock( +- unpack=MagicMock(return_value=("salt/job/1235/ret/foo", {"id": "foo"})) +- ) +- batch.start() +- batch.active = {"foo"} +- batch._BatchAsync__event_handler(MagicMock()) +- assert batch.active == set() +- assert batch.done_minions == {"foo"} +- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.schedule_next,) ++ future = salt.ext.tornado.gen.Future() ++ future.set_result({}) ++ with patch.object( ++ batch, "schedule_next", return_value=future ++ ) as schedule_next_mock: ++ batch.start() ++ batch.active = {"foo"} ++ batch._BatchAsync__event_handler( ++ "salt/job/1235/ret/foo", {"id": "foo"}, "batch_run" ++ ) ++ assert batch.active == set() ++ assert batch.done_minions == {"foo"} ++ schedule_next_mock.assert_called_once() + + + def test_batch__event_handler_find_job_return(batch): +- batch.event = MagicMock( +- unpack=MagicMock( +- return_value=( +- "salt/job/1236/ret/foo", +- {"id": "foo", "return": "deadbeaf"}, +- ) +- ) +- ) + batch.start() +- batch.patterns.add(("salt/job/1236/ret/*", "find_job_return")) +- batch._BatchAsync__event_handler(MagicMock()) ++ batch._BatchAsync__event_handler( ++ "salt/job/1236/ret/foo", {"id": "foo", "return": "deadbeaf"}, "find_job_return" ++ ) + assert batch.find_job_returned == {"foo"} + + + def test_batch_run_next_end_batch_when_no_next(batch): +- batch.end_batch = MagicMock() +- batch._get_next = MagicMock(return_value={}) +- batch.run_next() +- assert len(batch.end_batch.mock_calls) == 1 ++ future = salt.ext.tornado.gen.Future() ++ future.set_result({}) ++ with patch.object( ++ batch, "_get_next", return_value={} ++ ), patch.object( ++ batch, "end_batch", return_value=future ++ ) as end_batch_mock: ++ batch.run_next() ++ end_batch_mock.assert_called_once() + + + def test_batch_find_job(batch): +- batch.event = MagicMock() + future = salt.ext.tornado.gen.Future() + future.set_result({}) +- batch.local.run_job_async.return_value = future + batch.minions = {"foo", "bar"} +- batch.jid_gen = MagicMock(return_value="1234") +- with patch("salt.ext.tornado.gen.sleep", return_value=future): ++ with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( ++ batch, "check_find_job", return_value=future ++ ) as check_find_job_mock, patch.object( ++ batch, "jid_gen", return_value="1236" ++ ): ++ batch.events_channel.local_client.run_job_async.return_value = future + batch.find_job({"foo", "bar"}) +- assert batch.event.io_loop.spawn_callback.call_args[0] == ( +- batch.check_find_job, ++ assert check_find_job_mock.call_args[0] == ( + {"foo", "bar"}, +- "1234", ++ "1236", + ) + + + def test_batch_find_job_with_done_minions(batch): + batch.done_minions = {"bar"} +- batch.event = MagicMock() + future = salt.ext.tornado.gen.Future() + future.set_result({}) +- batch.local.run_job_async.return_value = future + batch.minions = {"foo", "bar"} +- batch.jid_gen = MagicMock(return_value="1234") +- with patch("salt.ext.tornado.gen.sleep", return_value=future): ++ with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( ++ batch, "check_find_job", return_value=future ++ ) as check_find_job_mock, patch.object( ++ batch, "jid_gen", return_value="1236" ++ ): ++ batch.events_channel.local_client.run_job_async.return_value = future + batch.find_job({"foo", "bar"}) +- assert batch.event.io_loop.spawn_callback.call_args[0] == ( +- batch.check_find_job, ++ assert check_find_job_mock.call_args[0] == ( + {"foo"}, +- "1234", ++ "1236", + ) + + + def test_batch_check_find_job_did_not_return(batch): +- batch.event = MagicMock() + batch.active = {"foo"} + batch.find_job_returned = set() +- batch.patterns = {("salt/job/1234/ret/*", "find_job_return")} +- batch.check_find_job({"foo"}, jid="1234") +- assert batch.find_job_returned == set() +- assert batch.active == set() +- assert len(batch.event.io_loop.add_callback.mock_calls) == 0 ++ future = salt.ext.tornado.gen.Future() ++ future.set_result({}) ++ with patch.object(batch, "find_job", return_value=future) as find_job_mock: ++ batch.check_find_job({"foo"}, jid="1234") ++ assert batch.find_job_returned == set() ++ assert batch.active == set() ++ find_job_mock.assert_not_called() + + + def test_batch_check_find_job_did_return(batch): +- batch.event = MagicMock() + batch.find_job_returned = {"foo"} +- batch.patterns = {("salt/job/1234/ret/*", "find_job_return")} +- batch.check_find_job({"foo"}, jid="1234") +- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.find_job, {"foo"}) ++ future = salt.ext.tornado.gen.Future() ++ future.set_result({}) ++ with patch.object(batch, "find_job", return_value=future) as find_job_mock: ++ batch.check_find_job({"foo"}, jid="1234") ++ find_job_mock.assert_called_once_with({"foo"}) + + + def test_batch_check_find_job_multiple_states(batch): +- batch.event = MagicMock() + # currently running minions + batch.active = {"foo", "bar"} + +@@ -372,21 +403,28 @@ def test_batch_check_find_job_multiple_states(batch): + # both not yet done but only 'foo' responded to find_job + not_done = {"foo", "bar"} + +- batch.patterns = {("salt/job/1234/ret/*", "find_job_return")} +- batch.check_find_job(not_done, jid="1234") ++ future = salt.ext.tornado.gen.Future() ++ future.set_result({}) + +- # assert 'bar' removed from active +- assert batch.active == {"foo"} ++ with patch.object(batch, "schedule_next", return_value=future), patch.object( ++ batch, "find_job", return_value=future ++ ) as find_job_mock: ++ batch.check_find_job(not_done, jid="1234") + +- # assert 'bar' added to timedout_minions +- assert batch.timedout_minions == {"bar", "faz"} ++ # assert 'bar' removed from active ++ assert batch.active == {"foo"} + +- # assert 'find_job' schedueled again only for 'foo' +- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.find_job, {"foo"}) ++ # assert 'bar' added to timedout_minions ++ assert batch.timedout_minions == {"bar", "faz"} ++ ++ # assert 'find_job' schedueled again only for 'foo' ++ find_job_mock.assert_called_once_with({"foo"}) + + + def test_only_on_run_next_is_scheduled(batch): +- batch.event = MagicMock() ++ future = salt.ext.tornado.gen.Future() ++ future.set_result({}) + batch.scheduled = True +- batch.schedule_next() +- assert len(batch.event.io_loop.spawn_callback.mock_calls) == 0 ++ with patch.object(batch, "run_next", return_value=future) as run_next_mock: ++ batch.schedule_next() ++ run_next_mock.assert_not_called() +-- +2.45.0 + diff --git a/prevent-possible-exception-in-tornado.concurrent.fut.patch b/prevent-possible-exception-in-tornado.concurrent.fut.patch new file mode 100644 index 0000000..cdaf248 --- /dev/null +++ b/prevent-possible-exception-in-tornado.concurrent.fut.patch @@ -0,0 +1,37 @@ +From 859be3e8213d4b5814a18fa6e9f3f17bf65b3183 Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 09:45:26 +0200 +Subject: [PATCH] Prevent possible exception in + tornado.concurrent.Future._set_done + +--- + salt/ext/tornado/concurrent.py | 13 +++++++------ + 1 file changed, 7 insertions(+), 6 deletions(-) + +diff --git a/salt/ext/tornado/concurrent.py b/salt/ext/tornado/concurrent.py +index bea09ba125..011808ed27 100644 +--- a/salt/ext/tornado/concurrent.py ++++ b/salt/ext/tornado/concurrent.py +@@ -330,12 +330,13 @@ class Future(object): + + def _set_done(self): + self._done = True +- for cb in self._callbacks: +- try: +- cb(self) +- except Exception: +- app_log.exception("Exception in callback %r for %r", cb, self) +- self._callbacks = None ++ if self._callbacks: ++ for cb in self._callbacks: ++ try: ++ cb(self) ++ except Exception: ++ app_log.exception("Exception in callback %r for %r", cb, self) ++ self._callbacks = None + + # On Python 3.3 or older, objects with a destructor part of a reference + # cycle are never destroyed. It's no longer the case on Python 3.4 thanks to +-- +2.45.0 + diff --git a/remove-redundant-_file_find-call-to-the-master.patch b/remove-redundant-_file_find-call-to-the-master.patch new file mode 100644 index 0000000..3487c29 --- /dev/null +++ b/remove-redundant-_file_find-call-to-the-master.patch @@ -0,0 +1,40 @@ +From 768495df67725ae840b06d321bef8299eca2368a Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 09:47:40 +0200 +Subject: [PATCH] Remove redundant `_file_find` call to the master + +--- + salt/fileclient.py | 10 ++-------- + 1 file changed, 2 insertions(+), 8 deletions(-) + +diff --git a/salt/fileclient.py b/salt/fileclient.py +index f01a86dd0d..f4b8d76dbe 100644 +--- a/salt/fileclient.py ++++ b/salt/fileclient.py +@@ -1162,10 +1162,7 @@ class RemoteClient(Client): + if senv: + saltenv = senv + +- if not salt.utils.platform.is_windows(): +- hash_server, stat_server = self.hash_and_stat_file(path, saltenv) +- else: +- hash_server = self.hash_file(path, saltenv) ++ hash_server = self.hash_file(path, saltenv) + + # Check if file exists on server, before creating files and + # directories +@@ -1206,10 +1203,7 @@ class RemoteClient(Client): + ) + + if dest2check and os.path.isfile(dest2check): +- if not salt.utils.platform.is_windows(): +- hash_local, stat_local = self.hash_and_stat_file(dest2check, saltenv) +- else: +- hash_local = self.hash_file(dest2check, saltenv) ++ hash_local = self.hash_file(dest2check, saltenv) + + if hash_local == hash_server: + return dest2check +-- +2.45.0 + diff --git a/remove-unused-import-causing-delays-on-starting-salt.patch b/remove-unused-import-causing-delays-on-starting-salt.patch new file mode 100644 index 0000000..12e6843 --- /dev/null +++ b/remove-unused-import-causing-delays-on-starting-salt.patch @@ -0,0 +1,25 @@ +From 032a470b6a2dbffbdee52c4f6a2a7fc40e3b6f85 Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 09:18:42 +0200 +Subject: [PATCH] Remove unused import causing delays on starting + salt-master + +--- + salt/utils/minions.py | 1 - + 1 file changed, 1 deletion(-) + +diff --git a/salt/utils/minions.py b/salt/utils/minions.py +index 21d34b7edd..082b698522 100644 +--- a/salt/utils/minions.py ++++ b/salt/utils/minions.py +@@ -9,7 +9,6 @@ import logging + import os + import re + +-import salt.auth.ldap + import salt.cache + import salt.payload + import salt.roster +-- +2.45.0 + diff --git a/salt.changes b/salt.changes index ac201a8..89f0333 100644 --- a/salt.changes +++ b/salt.changes @@ -1,3 +1,35 @@ +------------------------------------------------------------------- +Mon May 27 11:07:26 UTC 2024 - Pablo Suárez Hernández + +- Several fixes for tests to avoid errors and failures in some OSes +- Speed up salt.matcher.confirm_top by using __context__ +- Do not call the async wrapper calls with the separate thread +- Prevent OOM with high amount of batch async calls (bsc#1216063) +- Add missing contextvars dependency in salt.version +- Skip tests for unsupported algorithm on old OpenSSL version +- Remove redundant `_file_find` call to the master +- Prevent possible exception in tornado.concurrent.Future._set_done +- Make reactor engine less blocking the EventPublisher +- Make salt-master self recoverable on killing EventPublisher +- Improve broken events catching and reporting +- Make logging calls lighter +- Remove unused import causing delays on starting salt-master + +- Added: + * improve-broken-events-catching-and-reporting.patch + * add-missing-contextvars-dependency-in-salt.version.patch + * prevent-oom-with-high-amount-of-batch-async-calls-bs.patch + * speed-up-salt.matcher.confirm_top-by-using-__context.patch + * remove-redundant-_file_find-call-to-the-master.patch + * make-logging-calls-lighter.patch + * make-salt-master-self-recoverable-on-killing-eventpu.patch + * skip-tests-for-unsupported-algorithm-on-old-openssl-.patch + * remove-unused-import-causing-delays-on-starting-salt.patch + * do-not-call-the-async-wrapper-calls-with-the-separat.patch + * prevent-possible-exception-in-tornado.concurrent.fut.patch + * several-fixes-for-tests-to-avoid-errors-and-failures.patch + * make-reactor-engine-less-blocking-the-eventpublisher.patch + ------------------------------------------------------------------- Mon May 13 15:26:19 UTC 2024 - Pablo Suárez Hernández diff --git a/salt.spec b/salt.spec index 6595c06..5ecdc12 100644 --- a/salt.spec +++ b/salt.spec @@ -356,7 +356,7 @@ Patch101: fix-problematic-tests-and-allow-smooth-tests-executi.patch # PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/628 Patch102: make-importing-seco.range-thread-safe-bsc-1211649.patch # PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66130 -PAtch103: fix-tests-failures-and-errors-when-detected-on-vm-ex.patch +Patch103: fix-tests-failures-and-errors-when-detected-on-vm-ex.patch # PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66234 (modified at Patch106) Patch104: decode-oscap-byte-stream-to-string-bsc-1219001.patch ### Commits to make Salt compatible with Python 3.11 (and 3.6) @@ -370,6 +370,32 @@ Patch104: decode-oscap-byte-stream-to-string-bsc-1219001.patch Patch105: fix-salt-warnings-and-testuite-for-python-3.11-635.patch # PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/639 Patch106: switch-oscap-encoding-to-utf-8-639.patch +# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/65982 +Patch107: remove-unused-import-causing-delays-on-starting-salt.patch +# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66024 +Patch108: make-logging-calls-lighter.patch +# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66034 +Patch109: improve-broken-events-catching-and-reporting.patch +# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/633 +Patch110: make-salt-master-self-recoverable-on-killing-eventpu.patch +# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66158 +Patch111: make-reactor-engine-less-blocking-the-eventpublisher.patch +# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/638 +Patch112: prevent-possible-exception-in-tornado.concurrent.fut.patch +# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66455 +Patch113: remove-redundant-_file_find-call-to-the-master.patch +# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/646 +Patch114: skip-tests-for-unsupported-algorithm-on-old-openssl-.patch +# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/652 +Patch115: add-missing-contextvars-dependency-in-salt.version.patch +# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/640 +Patch116: prevent-oom-with-high-amount-of-batch-async-calls-bs.patch +# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/65983 +Patch117: do-not-call-the-async-wrapper-calls-with-the-separat.patch +# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66494 +Patch118: speed-up-salt.matcher.confirm_top-by-using-__context.patch +# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66593 +Patch119: several-fixes-for-tests-to-avoid-errors-and-failures.patch ### IMPORTANT: The line below is used as a snippet marker. Do not touch it. ### SALT PATCHES LIST END diff --git a/several-fixes-for-tests-to-avoid-errors-and-failures.patch b/several-fixes-for-tests-to-avoid-errors-and-failures.patch new file mode 100644 index 0000000..2272f9c --- /dev/null +++ b/several-fixes-for-tests-to-avoid-errors-and-failures.patch @@ -0,0 +1,557 @@ +From 30fd274d606b565a0a63fbc7f2fd67aec3c495b1 Mon Sep 17 00:00:00 2001 +From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?= + +Date: Mon, 27 May 2024 12:01:53 +0100 +Subject: [PATCH] Several fixes for tests to avoid errors and failures + in some OSes + +* test_pip_state: skip tests which requires virtualenv CLI + +* test_syndic_eauth: skip when using an incompatible Docker version + +* test_pip: skip tests which requires virtualenv CLI + +* Some more extra fixes for tests + +* Enhance paths to look for 'sftp-server' + +* Do trigger errors on newer docker-py version + +* Make test_search_not_found to not fail on transactional systems + +* Add `@pytest.mark.flaky_jail` to `tests/pytests/integration/ssh/test_ssh_setup.py::test_setup` + +Signed-off-by: Pedro Algarvio + +* Prevent crashing if mountpoint does not exist anymore + +* Skip failing tests on transactional systems + +* test_consul.py: do not produce error if consul is not available + +* Redefine max retries for some flaky tests + +* test_virt.py: skip as CI containers are not compatible with Salt 3006 + +* test_schema.py: Adjust expectations to newer jsonschema versions + +* Apply suggestions from code review + +Co-authored-by: Pedro Algarvio + +--------- + +Signed-off-by: Pedro Algarvio +Co-authored-by: Pedro Algarvio +Co-authored-by: Pedro Algarvio +--- + salt/modules/status.py | 11 +++--- + tests/conftest.py | 2 + + tests/integration/modules/test_pip.py | 4 ++ + tests/pytests/functional/cache/test_consul.py | 4 ++ + tests/pytests/functional/modules/test_pip.py | 1 + + .../functional/states/test_pip_state.py | 11 ++++++ + tests/pytests/functional/states/test_pkg.py | 4 ++ + .../integration/cli/test_syndic_eauth.py | 4 +- + .../integration/daemons/test_memory_leak.py | 2 +- + .../integration/modules/test_cmdmod.py | 1 + + .../pytests/integration/modules/test_virt.py | 4 ++ + .../integration/ssh/test_pre_flight.py | 4 ++ + .../pytests/integration/ssh/test_ssh_setup.py | 1 + + tests/pytests/scenarios/setup/test_man.py | 6 +++ + .../unit/modules/dockermod/test_module.py | 20 +++++----- + tests/unit/modules/test_zypperpkg.py | 1 + + tests/unit/utils/test_schema.py | 37 +++++++++++++++---- + 17 files changed, 92 insertions(+), 25 deletions(-) + +diff --git a/salt/modules/status.py b/salt/modules/status.py +index 4b0a3b0d400..33e5d7b8df5 100644 +--- a/salt/modules/status.py ++++ b/salt/modules/status.py +@@ -1052,11 +1052,12 @@ def diskusage(*args): + # query the filesystems disk usage + ret = {} + for path in selected: +- fsstats = os.statvfs(path) +- blksz = fsstats.f_bsize +- available = fsstats.f_bavail * blksz +- total = fsstats.f_blocks * blksz +- ret[path] = {"available": available, "total": total} ++ if os.path.exists(path): ++ fsstats = os.statvfs(path) ++ blksz = fsstats.f_bsize ++ available = fsstats.f_bavail * blksz ++ total = fsstats.f_blocks * blksz ++ ret[path] = {"available": available, "total": total} + return ret + + +diff --git a/tests/conftest.py b/tests/conftest.py +index a7777c2cea6..ad57b4adef4 100644 +--- a/tests/conftest.py ++++ b/tests/conftest.py +@@ -1428,6 +1428,8 @@ def sshd_server(salt_factories, sshd_config_dir, salt_master, grains): + "/usr/libexec/openssh/sftp-server", + # Arch Linux + "/usr/lib/ssh/sftp-server", ++ # openSUSE Tumbleweed and SL Micro 6.0 ++ "/usr/libexec/ssh/sftp-server", + ] + sftp_server_path = None + for path in sftp_server_paths: +diff --git a/tests/integration/modules/test_pip.py b/tests/integration/modules/test_pip.py +index 83457b467c8..d57e9cd2aea 100644 +--- a/tests/integration/modules/test_pip.py ++++ b/tests/integration/modules/test_pip.py +@@ -557,6 +557,10 @@ class PipModuleTest(ModuleCase): + @pytest.mark.skip_initial_gh_actions_failure( + reason="This was skipped on older golden images and is failing on newer." + ) ++ @pytest.mark.skipif( ++ bool(salt.utils.path.which("transactional-update")), ++ reason="Skipping on transactional systems", ++ ) + def test_system_pip3(self): + + self.run_function( +diff --git a/tests/pytests/functional/cache/test_consul.py b/tests/pytests/functional/cache/test_consul.py +index 30dc6925f26..996a1e932b6 100644 +--- a/tests/pytests/functional/cache/test_consul.py ++++ b/tests/pytests/functional/cache/test_consul.py +@@ -11,6 +11,10 @@ import salt.loader + from salt.utils.versions import Version + from tests.pytests.functional.cache.helpers import run_common_cache_tests + ++pytest.importorskip( ++ "consul", ++ reason="Please install python-consul package to use consul data cache driver", ++) + docker = pytest.importorskip("docker") + + log = logging.getLogger(__name__) +diff --git a/tests/pytests/functional/modules/test_pip.py b/tests/pytests/functional/modules/test_pip.py +index 9b87735b68f..e04baa7c43f 100644 +--- a/tests/pytests/functional/modules/test_pip.py ++++ b/tests/pytests/functional/modules/test_pip.py +@@ -22,6 +22,7 @@ from tests.support.helpers import VirtualEnv + ) + @pytest.mark.requires_network + @pytest.mark.slow_test ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_list_available_packages(modules, pip_version, tmp_path): + with VirtualEnv(venv_dir=tmp_path, pip_requirement=pip_version) as virtualenv: + virtualenv.install("-U", pip_version) +diff --git a/tests/pytests/functional/states/test_pip_state.py b/tests/pytests/functional/states/test_pip_state.py +index 3fc6ac7a1df..1921751b5dc 100644 +--- a/tests/pytests/functional/states/test_pip_state.py ++++ b/tests/pytests/functional/states/test_pip_state.py +@@ -94,6 +94,7 @@ def test_pip_installed_removed(modules, states): + + + @pytest.mark.slow_test ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_pip_installed_removed_venv(tmp_path, create_virtualenv, states): + venv_dir = tmp_path / "pip_installed_removed" + create_virtualenv(str(venv_dir)) +@@ -105,6 +106,7 @@ def test_pip_installed_removed_venv(tmp_path, create_virtualenv, states): + + + @pytest.mark.slow_test ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_pip_installed_errors(tmp_path, modules, state_tree): + venv_dir = tmp_path / "pip-installed-errors" + # Since we don't have the virtualenv created, pip.installed will +@@ -141,6 +143,7 @@ pep8-pip: + assert state_return.result is True + + ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_pip_installed_name_test_mode(tmp_path, create_virtualenv, states): + """ + Test pip.installed state while test=true +@@ -154,6 +157,7 @@ def test_pip_installed_name_test_mode(tmp_path, create_virtualenv, states): + assert name in ret.comment + + ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_pip_installed_pkgs_test_mode(tmp_path, create_virtualenv, states): + """ + Test pip.installed state while test=true +@@ -168,6 +172,7 @@ def test_pip_installed_pkgs_test_mode(tmp_path, create_virtualenv, states): + + + @pytest.mark.slow_test ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_issue_2028_pip_installed_state( + tmp_path, modules, state_tree, get_python_executable + ): +@@ -226,6 +231,7 @@ pep8-pip: + + + @pytest.mark.slow_test ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_issue_2087_missing_pip(tmp_path, create_virtualenv, modules): + venv_dir = tmp_path / "issue-2087-missing-pip" + +@@ -271,6 +277,7 @@ pip.installed: + @pytest.mark.destructive_test + @pytest.mark.slow_test + @pytest.mark.skip_if_not_root ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_issue_6912_wrong_owner(tmp_path, create_virtualenv, modules, states): + # Setup virtual environment directory to be used throughout the test + venv_dir = tmp_path / "6912-wrong-owner" +@@ -338,6 +345,7 @@ def test_issue_6912_wrong_owner(tmp_path, create_virtualenv, modules, states): + @pytest.mark.skip_on_darwin(reason="Test is flaky on macosx") + @pytest.mark.slow_test + @pytest.mark.skip_if_not_root ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_issue_6912_wrong_owner_requirements_file( + tmp_path, create_virtualenv, state_tree, modules, states + ): +@@ -409,6 +417,7 @@ def test_issue_6912_wrong_owner_requirements_file( + + @pytest.mark.destructive_test + @pytest.mark.slow_test ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_issue_6833_pip_upgrade_pip(tmp_path, create_virtualenv, modules, states): + # Create the testing virtualenv + if sys.platform == "win32": +@@ -465,6 +474,7 @@ def test_issue_6833_pip_upgrade_pip(tmp_path, create_virtualenv, modules, states + + + @pytest.mark.slow_test ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_pip_installed_specific_env( + tmp_path, state_tree_prod, states, create_virtualenv + ): +@@ -514,6 +524,7 @@ def test_pip_installed_specific_env( + + + @pytest.mark.slow_test ++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary") + def test_22359_pip_installed_unless_does_not_trigger_warnings( + create_virtualenv, tmp_path, states + ): +diff --git a/tests/pytests/functional/states/test_pkg.py b/tests/pytests/functional/states/test_pkg.py +index 864c1d025f3..9e5a8350ad9 100644 +--- a/tests/pytests/functional/states/test_pkg.py ++++ b/tests/pytests/functional/states/test_pkg.py +@@ -19,6 +19,10 @@ pytestmark = [ + pytest.mark.slow_test, + pytest.mark.skip_if_not_root, + pytest.mark.destructive_test, ++ pytest.mark.skipif( ++ bool(salt.utils.path.which("transactional-update")), ++ reason="Skipping on transactional systems", ++ ), + ] + + +diff --git a/tests/pytests/integration/cli/test_syndic_eauth.py b/tests/pytests/integration/cli/test_syndic_eauth.py +index 218022b9e3c..8dcdd3fbd28 100644 +--- a/tests/pytests/integration/cli/test_syndic_eauth.py ++++ b/tests/pytests/integration/cli/test_syndic_eauth.py +@@ -6,7 +6,9 @@ import time + + import pytest + +-docker = pytest.importorskip("docker") ++from tests.conftest import CODE_DIR ++ ++docker = pytest.importorskip("docker", minversion="4.0.0") + + INSIDE_CONTAINER = os.getenv("HOSTNAME", "") == "salt-test-container" + +diff --git a/tests/pytests/integration/daemons/test_memory_leak.py b/tests/pytests/integration/daemons/test_memory_leak.py +index 8157091c44e..f2c5307f1a5 100644 +--- a/tests/pytests/integration/daemons/test_memory_leak.py ++++ b/tests/pytests/integration/daemons/test_memory_leak.py +@@ -49,7 +49,7 @@ def file_add_delete_sls(testfile_path, base_env_state_tree_root_dir): + + @pytest.mark.skip_on_darwin(reason="MacOS is a spawning platform, won't work") + @pytest.mark.skipif(GITHUB_ACTIONS, reason="Test is failing in GitHub Actions") +-@pytest.mark.flaky(max_runs=4) ++@pytest.mark.flaky(max_runs=10) + def test_memory_leak(salt_cli, salt_minion, file_add_delete_sls): + max_usg = None + +diff --git a/tests/pytests/integration/modules/test_cmdmod.py b/tests/pytests/integration/modules/test_cmdmod.py +index d9c326c3f0a..d0b993ddbcf 100644 +--- a/tests/pytests/integration/modules/test_cmdmod.py ++++ b/tests/pytests/integration/modules/test_cmdmod.py +@@ -63,6 +63,7 @@ def test_avoid_injecting_shell_code_as_root( + + + @pytest.mark.slow_test ++@pytest.mark.flaky(max_runs=4) + def test_blacklist_glob(salt_call_cli): + """ + cmd_blacklist_glob +diff --git a/tests/pytests/integration/modules/test_virt.py b/tests/pytests/integration/modules/test_virt.py +index 572923764bb..a1cd577fe76 100644 +--- a/tests/pytests/integration/modules/test_virt.py ++++ b/tests/pytests/integration/modules/test_virt.py +@@ -26,6 +26,10 @@ pytestmark = [ + Version(docker.__version__) < Version("4.0.0"), + reason="Test does not work in this version of docker-py", + ), ++ pytest.mark.skipif( ++ salt.version.__saltstack_version__.major <= 3006, ++ reason="CI containers are not compatible with this Salt version", ++ ), + ] + + +diff --git a/tests/pytests/integration/ssh/test_pre_flight.py b/tests/pytests/integration/ssh/test_pre_flight.py +index 09c65d29430..ac32b8d90fd 100644 +--- a/tests/pytests/integration/ssh/test_pre_flight.py ++++ b/tests/pytests/integration/ssh/test_pre_flight.py +@@ -235,6 +235,10 @@ def demote(user_uid, user_gid): + + + @pytest.mark.slow_test ++@pytest.mark.skipif( ++ bool(salt.utils.path.which("transactional-update")), ++ reason="Skipping on transactional systems", ++) + def test_ssh_pre_flight_perms(salt_ssh_cli, caplog, _create_roster, account): + """ + Test to ensure standard user cannot run pre flight script +diff --git a/tests/pytests/integration/ssh/test_ssh_setup.py b/tests/pytests/integration/ssh/test_ssh_setup.py +index 97494bed36b..3b4cede85f8 100644 +--- a/tests/pytests/integration/ssh/test_ssh_setup.py ++++ b/tests/pytests/integration/ssh/test_ssh_setup.py +@@ -161,6 +161,7 @@ def salt_ssh_cli( + ) + + ++@pytest.mark.flaky_jail + def test_setup(salt_ssh_cli, ssh_container_name, ssh_sub_container_name, ssh_password): + """ + Test salt-ssh setup works +diff --git a/tests/pytests/scenarios/setup/test_man.py b/tests/pytests/scenarios/setup/test_man.py +index 28f0d6285a3..f10fe711f2d 100644 +--- a/tests/pytests/scenarios/setup/test_man.py ++++ b/tests/pytests/scenarios/setup/test_man.py +@@ -9,6 +9,9 @@ import pytest + + import salt.utils.platform + from salt.modules.virtualenv_mod import KNOWN_BINARY_NAMES ++from tests.conftest import CODE_DIR ++ ++MISSING_SETUP_PY_FILE = not CODE_DIR.joinpath("setup.py").exists() + + pytestmark = [ + pytest.mark.core_test, +@@ -16,6 +19,9 @@ pytestmark = [ + pytest.mark.skip_on_aix, + pytest.mark.skip_initial_onedir_failure, + pytest.mark.skip_if_binaries_missing(*KNOWN_BINARY_NAMES, check_all=False), ++ pytest.mark.skipif( ++ MISSING_SETUP_PY_FILE, reason="This test only work if setup.py is available" ++ ), + ] + + +diff --git a/tests/pytests/unit/modules/dockermod/test_module.py b/tests/pytests/unit/modules/dockermod/test_module.py +index 1ac7dff52a5..a87d1add167 100644 +--- a/tests/pytests/unit/modules/dockermod/test_module.py ++++ b/tests/pytests/unit/modules/dockermod/test_module.py +@@ -357,7 +357,7 @@ def test_update_mine(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_list_networks(): +@@ -381,7 +381,7 @@ def test_list_networks(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_create_network(): +@@ -425,7 +425,7 @@ def test_create_network(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_remove_network(): +@@ -447,7 +447,7 @@ def test_remove_network(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_inspect_network(): +@@ -469,7 +469,7 @@ def test_inspect_network(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_connect_container_to_network(): +@@ -494,7 +494,7 @@ def test_connect_container_to_network(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_disconnect_container_from_network(): +@@ -516,7 +516,7 @@ def test_disconnect_container_from_network(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_list_volumes(): +@@ -542,7 +542,7 @@ def test_list_volumes(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_create_volume(): +@@ -572,7 +572,7 @@ def test_create_volume(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_remove_volume(): +@@ -594,7 +594,7 @@ def test_remove_volume(): + + + @pytest.mark.skipif( +- docker_mod.docker.version_info < (1, 5, 0), ++ docker_mod._get_docker_py_versioninfo() < (1, 5, 0), + reason="docker module must be installed to run this test or is too old. >=1.5.0", + ) + def test_inspect_volume(): +diff --git a/tests/unit/modules/test_zypperpkg.py b/tests/unit/modules/test_zypperpkg.py +index 6e5ca88895f..bd67b16745c 100644 +--- a/tests/unit/modules/test_zypperpkg.py ++++ b/tests/unit/modules/test_zypperpkg.py +@@ -2602,6 +2602,7 @@ pattern() = package-c""" + zypp_mock.assert_called_with(root=None, ignore_not_found=True) + xml_mock.nolock.noraise.xml.call.assert_called_with("search", "emacs") + ++ @patch("salt.utils.files.is_fcntl_available", MagicMock(return_value=False)) + def test_search_not_found(self): + """Test zypperpkg.search()""" + ret = { +diff --git a/tests/unit/utils/test_schema.py b/tests/unit/utils/test_schema.py +index 113c6836e07..a531dd93111 100644 +--- a/tests/unit/utils/test_schema.py ++++ b/tests/unit/utils/test_schema.py +@@ -531,7 +531,9 @@ class ConfigTestCase(TestCase): + jsonschema.validate( + {"personal_access_token": "foo"}, Requirements.serialize() + ) +- if JSONSCHEMA_VERSION >= Version("3.0.0"): ++ if JSONSCHEMA_VERSION >= Version("3.0.0") and JSONSCHEMA_VERSION < Version( ++ "4.8.0" ++ ): + self.assertIn( + "'ssh_key_file' is a required property", excinfo.exception.message + ) +@@ -899,13 +901,20 @@ class ConfigTestCase(TestCase): + except jsonschema.exceptions.ValidationError as exc: + self.fail("ValidationError raised: {}".format(exc)) + +- with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo: ++ if JSONSCHEMA_VERSION < Version("4.0.0"): ++ with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo: ++ jsonschema.validate( ++ {"item": "3"}, ++ TestConf.serialize(), ++ format_checker=jsonschema.FormatChecker(), ++ ) ++ self.assertIn("is not a", excinfo.exception.message) ++ else: + jsonschema.validate( + {"item": "3"}, + TestConf.serialize(), + format_checker=jsonschema.FormatChecker(), + ) +- self.assertIn("is not a", excinfo.exception.message) + + def test_datetime_config(self): + item = schema.DateTimeItem(title="Foo", description="Foo Item") +@@ -1878,7 +1887,9 @@ class ConfigTestCase(TestCase): + jsonschema.validate( + {"item": {"sides": "4", "color": "blue"}}, TestConf.serialize() + ) +- if JSONSCHEMA_VERSION >= Version("3.0.0"): ++ if JSONSCHEMA_VERSION >= Version("3.0.0") and JSONSCHEMA_VERSION < Version( ++ "4.8.0" ++ ): + self.assertIn("'4'", excinfo.exception.message) + self.assertIn("is not of type", excinfo.exception.message) + self.assertIn("'boolean'", excinfo.exception.message) +@@ -2003,7 +2014,9 @@ class ConfigTestCase(TestCase): + + with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo: + jsonschema.validate({"item": ["maybe"]}, TestConf.serialize()) +- if JSONSCHEMA_VERSION >= Version("3.0.0"): ++ if JSONSCHEMA_VERSION >= Version("3.0.0") and JSONSCHEMA_VERSION < Version( ++ "4.8.0" ++ ): + self.assertIn("'maybe'", excinfo.exception.message) + self.assertIn("is not one of", excinfo.exception.message) + self.assertIn("'yes'", excinfo.exception.message) +@@ -2067,7 +2080,9 @@ class ConfigTestCase(TestCase): + + with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo: + jsonschema.validate({"item": ["maybe"]}, TestConf.serialize()) +- if JSONSCHEMA_VERSION >= Version("3.0.0"): ++ if JSONSCHEMA_VERSION >= Version("3.0.0") and JSONSCHEMA_VERSION < Version( ++ "4.8.0" ++ ): + self.assertIn("'maybe'", excinfo.exception.message) + self.assertIn("is not one of", excinfo.exception.message) + self.assertIn("'yes'", excinfo.exception.message) +@@ -2154,11 +2169,17 @@ class ConfigTestCase(TestCase): + + with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo: + jsonschema.validate({"item": [True]}, TestConf.serialize()) +- self.assertIn("is not allowed for", excinfo.exception.message) ++ if JSONSCHEMA_VERSION >= Version("4.0.0"): ++ self.assertIn("should not be valid under", excinfo.exception.message) ++ else: ++ self.assertIn("is not allowed for", excinfo.exception.message) + + with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo: + jsonschema.validate({"item": [False]}, TestConf.serialize()) +- self.assertIn("is not allowed for", excinfo.exception.message) ++ if JSONSCHEMA_VERSION >= Version("4.0.0"): ++ self.assertIn("should not be valid under", excinfo.exception.message) ++ else: ++ self.assertIn("is not allowed for", excinfo.exception.message) + + def test_item_name_override_class_attrname(self): + class TestConf(schema.Schema): +-- +2.44.0 + + diff --git a/skip-tests-for-unsupported-algorithm-on-old-openssl-.patch b/skip-tests-for-unsupported-algorithm-on-old-openssl-.patch new file mode 100644 index 0000000..126c0df --- /dev/null +++ b/skip-tests-for-unsupported-algorithm-on-old-openssl-.patch @@ -0,0 +1,117 @@ +From d64311862c8cfdd7728aca504a22822df1b043c1 Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 09:48:39 +0200 +Subject: [PATCH] Skip tests for unsupported algorithm on old OpenSSL + version + +--- + .../functional/modules/test_x509_v2.py | 51 +++++++++++++------ + 1 file changed, 35 insertions(+), 16 deletions(-) + +diff --git a/tests/pytests/functional/modules/test_x509_v2.py b/tests/pytests/functional/modules/test_x509_v2.py +index 8da31bed9d..c060ad2971 100644 +--- a/tests/pytests/functional/modules/test_x509_v2.py ++++ b/tests/pytests/functional/modules/test_x509_v2.py +@@ -9,6 +9,7 @@ from salt.utils.odict import OrderedDict + try: + import cryptography + import cryptography.x509 as cx509 ++ from cryptography.exceptions import UnsupportedAlgorithm + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.serialization import ( + load_pem_private_key, +@@ -678,7 +679,10 @@ def crl_revoked(): + @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"]) + def test_create_certificate_self_signed(x509, algo, request): + privkey = request.getfixturevalue(f"{algo}_privkey") +- res = x509.create_certificate(signing_private_key=privkey, CN="success") ++ try: ++ res = x509.create_certificate(signing_private_key=privkey, CN="success") ++ except UnsupportedAlgorithm: ++ pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version") + assert res.startswith("-----BEGIN CERTIFICATE-----") + cert = _get_cert(res) + assert cert.subject.rfc4514_string() == "CN=success" +@@ -743,12 +747,15 @@ def test_create_certificate_raw(x509, rsa_privkey): + @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"]) + def test_create_certificate_from_privkey(x509, ca_key, ca_cert, algo, request): + privkey = request.getfixturevalue(f"{algo}_privkey") +- res = x509.create_certificate( +- signing_cert=ca_cert, +- signing_private_key=ca_key, +- private_key=privkey, +- CN="success", +- ) ++ try: ++ res = x509.create_certificate( ++ signing_cert=ca_cert, ++ signing_private_key=ca_key, ++ private_key=privkey, ++ CN="success", ++ ) ++ except UnsupportedAlgorithm: ++ pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version") + assert res.startswith("-----BEGIN CERTIFICATE-----") + cert = _get_cert(res) + assert cert.subject.rfc4514_string() == "CN=success" +@@ -788,12 +795,15 @@ def test_create_certificate_from_encrypted_privkey_with_encrypted_privkey( + @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"]) + def test_create_certificate_from_pubkey(x509, ca_key, ca_cert, algo, request): + pubkey = request.getfixturevalue(f"{algo}_pubkey") +- res = x509.create_certificate( +- signing_cert=ca_cert, +- signing_private_key=ca_key, +- public_key=pubkey, +- CN="success", +- ) ++ try: ++ res = x509.create_certificate( ++ signing_cert=ca_cert, ++ signing_private_key=ca_key, ++ public_key=pubkey, ++ CN="success", ++ ) ++ except UnsupportedAlgorithm: ++ pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version") + assert res.startswith("-----BEGIN CERTIFICATE-----") + cert = _get_cert(res) + assert cert.subject.rfc4514_string() == "CN=success" +@@ -1329,7 +1339,10 @@ def test_create_crl_raw(x509, crl_args): + @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"]) + def test_create_csr(x509, algo, request): + privkey = request.getfixturevalue(f"{algo}_privkey") +- res = x509.create_csr(private_key=privkey) ++ try: ++ res = x509.create_csr(private_key=privkey) ++ except UnsupportedAlgorithm: ++ pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version") + assert res.startswith("-----BEGIN CERTIFICATE REQUEST-----") + + +@@ -1444,7 +1457,10 @@ def test_create_private_key_raw(x509): + ) + def test_get_private_key_size(x509, algo, expected, request): + privkey = request.getfixturevalue(f"{algo}_privkey") +- res = x509.get_private_key_size(privkey) ++ try: ++ res = x509.get_private_key_size(privkey) ++ except UnsupportedAlgorithm: ++ pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version") + assert res == expected + + +@@ -1588,7 +1604,10 @@ def test_verify_private_key(x509, ca_key, ca_cert): + @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"]) + def test_verify_signature(x509, algo, request): + wrong_privkey = request.getfixturevalue(f"{algo}_privkey") +- privkey = x509.create_private_key(algo=algo) ++ try: ++ privkey = x509.create_private_key(algo=algo) ++ except UnsupportedAlgorithm: ++ pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version") + cert = x509.create_certificate(signing_private_key=privkey) + assert x509.verify_signature(cert, privkey) + assert not x509.verify_signature(cert, wrong_privkey) +-- +2.45.0 + diff --git a/speed-up-salt.matcher.confirm_top-by-using-__context.patch b/speed-up-salt.matcher.confirm_top-by-using-__context.patch new file mode 100644 index 0000000..b967b59 --- /dev/null +++ b/speed-up-salt.matcher.confirm_top-by-using-__context.patch @@ -0,0 +1,64 @@ +From a7e578b96d0e7ad8fdf4e5d62416ba6961b82315 Mon Sep 17 00:00:00 2001 +From: Victor Zhestkov +Date: Wed, 15 May 2024 11:50:52 +0200 +Subject: [PATCH] Speed up salt.matcher.confirm_top by using + __context__ + +* Speed up salt.matcher.confirm_top by using __context__ + +* Add test for getting matchers from __context__ in matchers.confirm_top +--- + salt/matchers/confirm_top.py | 6 +++++- + tests/pytests/unit/matchers/test_confirm_top.py | 15 +++++++++++++++ + 2 files changed, 20 insertions(+), 1 deletion(-) + +diff --git a/salt/matchers/confirm_top.py b/salt/matchers/confirm_top.py +index 7435f4ae94..d2edc99d8f 100644 +--- a/salt/matchers/confirm_top.py ++++ b/salt/matchers/confirm_top.py +@@ -21,7 +21,11 @@ def confirm_top(match, data, nodegroups=None): + if "match" in item: + matcher = item["match"] + +- matchers = salt.loader.matchers(__opts__) ++ if "matchers" in __context__: ++ matchers = __context__["matchers"] ++ else: ++ matchers = salt.loader.matchers(__opts__) ++ __context__["matchers"] = matchers + funcname = matcher + "_match.match" + if matcher == "nodegroup": + return matchers[funcname](match, nodegroups) +diff --git a/tests/pytests/unit/matchers/test_confirm_top.py b/tests/pytests/unit/matchers/test_confirm_top.py +index 514df323b6..f439fcf94a 100644 +--- a/tests/pytests/unit/matchers/test_confirm_top.py ++++ b/tests/pytests/unit/matchers/test_confirm_top.py +@@ -2,6 +2,7 @@ import pytest + + import salt.config + import salt.loader ++from tests.support.mock import patch + + + @pytest.fixture +@@ -12,3 +13,17 @@ def matchers(minion_opts): + def test_sanity(matchers): + match = matchers["confirm_top.confirm_top"] + assert match("*", []) is True ++ ++ ++@pytest.mark.parametrize("in_context", [False, True]) ++def test_matchers_from_context(matchers, in_context): ++ match = matchers["confirm_top.confirm_top"] ++ with patch.dict( ++ matchers.pack["__context__"], {"matchers": matchers} if in_context else {} ++ ), patch("salt.loader.matchers", return_value=matchers) as loader_matchers: ++ assert match("*", []) is True ++ assert id(matchers.pack["__context__"]["matchers"]) == id(matchers) ++ if in_context: ++ loader_matchers.assert_not_called() ++ else: ++ loader_matchers.assert_called_once() +-- +2.45.0 +