From 218a0bb719b1348265b98c346606374f4bbf2416957b91f346f1e75e198c045e Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Mon, 27 May 2024 11:14:47 +0000
Subject: [PATCH] Accepting request 1177104 from
 home:PSuarezHernandez:branches:systemsmanagement:saltstack

- Several fixes for tests to avoid errors and failures in some OSes
- Speed up salt.matcher.confirm_top by using __context__
- Do not call the async wrapper calls with the separate thread
- Prevent OOM with high amount of batch async calls (bsc#1216063)
- Add missing contextvars dependency in salt.version
- Skip tests for unsupported algorithm on old OpenSSL version
- Remove redundant `_file_find` call to the master
- Prevent possible exception in tornado.concurrent.Future._set_done
- Make reactor engine less blocking the EventPublisher
- Make salt-master self recoverable on killing EventPublisher
- Improve broken events catching and reporting
- Make logging calls lighter
- Remove unused import causing delays on starting salt-master
- Added:
  * improve-broken-events-catching-and-reporting.patch
  * add-missing-contextvars-dependency-in-salt.version.patch
  * prevent-oom-with-high-amount-of-batch-async-calls-bs.patch
  * speed-up-salt.matcher.confirm_top-by-using-__context.patch
  * remove-redundant-_file_find-call-to-the-master.patch
  * make-logging-calls-lighter.patch
  * make-salt-master-self-recoverable-on-killing-eventpu.patch
  * skip-tests-for-unsupported-algorithm-on-old-openssl-.patch
  * remove-unused-import-causing-delays-on-starting-salt.patch
  * do-not-call-the-async-wrapper-calls-with-the-separat.patch
  * prevent-possible-exception-in-tornado.concurrent.fut.patch
  * several-fixes-for-tests-to-avoid-errors-and-failures.patch
  * make-reactor-engine-less-blocking-the-eventpublisher.patch

OBS-URL: https://build.opensuse.org/request/show/1177104
OBS-URL: https://build.opensuse.org/package/show/systemsmanagement:saltstack/salt?expand=0&rev=243
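
The common thread in the bsc#1216063 fix below is replacing the per-batch
event subscriber with one refcounted, process-wide channel shared by all
concurrent batch jobs. A minimal self-contained sketch of that pattern
(illustrative names only, not the actual salt API):

    _SHARED = None

    class SharedChannel:
        """Stand-in for the SharedEventsChannel introduced by the patch."""

        def __init__(self):
            self._used_by = set()

        def use(self, user_id):
            # Each batch registers itself; the channel stays alive while
            # any batch still references it.
            self._used_by.add(user_id)
            return self

        def unuse(self, user_id):
            self._used_by.discard(user_id)

        def destroy_unused(self):
            # Tear down only after the last user has unregistered.
            return not self._used_by

    def get_shared_channel():
        global _SHARED
        if _SHARED is None:
            _SHARED = SharedChannel()
        return _SHARED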
---
 _lastrevision                                 |    2 +-
 ...ntextvars-dependency-in-salt.version.patch |   38 +
 ...async-wrapper-calls-with-the-separat.patch |  254 ++++
 ...broken-events-catching-and-reporting.patch |  202 +++
 make-logging-calls-lighter.patch              |  233 +++
 ...ine-less-blocking-the-eventpublisher.patch |  104 ++
 ...-self-recoverable-on-killing-eventpu.patch |  243 ++++
 ...-high-amount-of-batch-async-calls-bs.patch | 1272 +++++++++++++++++
 ...-exception-in-tornado.concurrent.fut.patch |   37 +
 ...undant-_file_find-call-to-the-master.patch |   40 +
 ...port-causing-delays-on-starting-salt.patch |   25 +
 salt.changes                                  |   32 +
 salt.spec                                     |   28 +-
 ...r-tests-to-avoid-errors-and-failures.patch |  557 ++++++++
 ...nsupported-algorithm-on-old-openssl-.patch |  117 ++
 ...tcher.confirm_top-by-using-__context.patch |   64 +
 16 files changed, 3246 insertions(+), 2 deletions(-)
 create mode 100644 add-missing-contextvars-dependency-in-salt.version.patch
 create mode 100644 do-not-call-the-async-wrapper-calls-with-the-separat.patch
 create mode 100644 improve-broken-events-catching-and-reporting.patch
 create mode 100644 make-logging-calls-lighter.patch
 create mode 100644 make-reactor-engine-less-blocking-the-eventpublisher.patch
 create mode 100644 make-salt-master-self-recoverable-on-killing-eventpu.patch
 create mode 100644 prevent-oom-with-high-amount-of-batch-async-calls-bs.patch
 create mode 100644 prevent-possible-exception-in-tornado.concurrent.fut.patch
 create mode 100644 remove-redundant-_file_find-call-to-the-master.patch
 create mode 100644 remove-unused-import-causing-delays-on-starting-salt.patch
 create mode 100644 several-fixes-for-tests-to-avoid-errors-and-failures.patch
 create mode 100644 skip-tests-for-unsupported-algorithm-on-old-openssl-.patch
 create mode 100644 speed-up-salt.matcher.confirm_top-by-using-__context.patch

diff --git a/_lastrevision b/_lastrevision
index 89b1a9f..2652c56 100644
--- a/_lastrevision
+++ b/_lastrevision
@@ -1 +1 @@
-2106e7b467fde79190f65e2de73559c3ba7e8888
\ No newline at end of file
+365aa2dd170197cd849f08270e3bd2376cd79be9
\ No newline at end of file
diff --git a/add-missing-contextvars-dependency-in-salt.version.patch b/add-missing-contextvars-dependency-in-salt.version.patch
new file mode 100644
index 0000000..6c8a26a
--- /dev/null
+++ b/add-missing-contextvars-dependency-in-salt.version.patch
@@ -0,0 +1,38 @@
+From 1a5716365e0c3b8d290759847f4046f28ee4b79f Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 09:53:20 +0200
+Subject: [PATCH] Add missing contextvars dependency in salt.version
+
+---
+ salt/version.py                     | 1 +
+ tests/unit/states/test_pip_state.py | 2 +-
+ 2 files changed, 2 insertions(+), 1 deletion(-)
+
+diff --git a/salt/version.py b/salt/version.py
+index 44372830b2..b2643550e9 100644
+--- a/salt/version.py
++++ b/salt/version.py
+@@ -717,6 +717,7 @@ def dependency_information(include_salt_cloud=False):
+         ("docker-py", "docker", "__version__"),
+         ("packaging", "packaging", "__version__"),
+         ("looseversion", "looseversion", None),
++        ("contextvars", "contextvars", None),
+         ("relenv", "relenv", "__version__"),
+     ]
+ 
+diff --git a/tests/unit/states/test_pip_state.py b/tests/unit/states/test_pip_state.py
+index d70b115000..fe5d171a15 100644
+--- a/tests/unit/states/test_pip_state.py
++++ b/tests/unit/states/test_pip_state.py
+@@ -419,7 +419,7 @@ class PipStateInstallationErrorTest(TestCase):
+     def test_importable_installation_error(self):
+         extra_requirements = []
+         for name, version in salt.version.dependency_information():
+-            if name in ["PyYAML", "packaging", "looseversion"]:
++            if name in ["PyYAML", "packaging", "looseversion", "contextvars"]:
+                 extra_requirements.append("{}=={}".format(name, version))
+         failures = {}
+         pip_version_requirements = [
+-- 
+2.45.0
+
diff --git a/do-not-call-the-async-wrapper-calls-with-the-separat.patch b/do-not-call-the-async-wrapper-calls-with-the-separat.patch
new file mode 100644
index 0000000..1f4368b
--- /dev/null
+++ b/do-not-call-the-async-wrapper-calls-with-the-separat.patch
@@ -0,0 +1,254 @@
+From 4021f938ed1b64acd47ccaefc111197a1118ee4f Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 11:48:46 +0200
+Subject: [PATCH] Do not call the async wrapper calls with the separate
+ thread
+
+* Do not run method with the distinct thread
+
+* Move test_asynchronous.py to pytests
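+
+A minimal usage sketch (illustration only; HelperA is the test helper from
+the tests below): a wrapped call now runs directly on the wrapper's IOLoop
+via run_sync() instead of through a short-lived helper thread:
+
+    import salt.utils.asynchronous as asynchronous
+
+    sync = asynchronous.SyncWrapper(HelperA)
+    ret = sync.sleep()  # driven by io_loop.run_sync() under the hood
+    assert ret is True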
+---
+ salt/utils/asynchronous.py                    | 25 +----
+ tests/pytests/unit/utils/test_asynchronous.py | 92 +++++++++++++++++++
+ tests/unit/utils/test_asynchronous.py         | 81 ----------------
+ 3 files changed, 94 insertions(+), 104 deletions(-)
+ create mode 100644 tests/pytests/unit/utils/test_asynchronous.py
+ delete mode 100644 tests/unit/utils/test_asynchronous.py
+
+diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py
+index 88596a4a20..55a50cbcbf 100644
+--- a/salt/utils/asynchronous.py
++++ b/salt/utils/asynchronous.py
+@@ -2,11 +2,8 @@
+ Helpers/utils for working with tornado asynchronous stuff
+ """
+ 
+-
+ import contextlib
+ import logging
+-import sys
+-import threading
+ 
+ import salt.ext.tornado.concurrent
+ import salt.ext.tornado.ioloop
+@@ -111,30 +108,12 @@ class SyncWrapper:
+ 
+     def _wrap(self, key):
+         def wrap(*args, **kwargs):
+-            results = []
+-            thread = threading.Thread(
+-                target=self._target,
+-                args=(key, args, kwargs, results, self.io_loop),
++            return self.io_loop.run_sync(
++                lambda: getattr(self.obj, key)(*args, **kwargs)
+             )
+-            thread.start()
+-            thread.join()
+-            if results[0]:
+-                return results[1]
+-            else:
+-                exc_info = results[1]
+-                raise exc_info[1].with_traceback(exc_info[2])
+ 
+         return wrap
+ 
+-    def _target(self, key, args, kwargs, results, io_loop):
+-        try:
+-            result = io_loop.run_sync(lambda: getattr(self.obj, key)(*args, **kwargs))
+-            results.append(True)
+-            results.append(result)
+-        except Exception:  # pylint: disable=broad-except
+-            results.append(False)
+-            results.append(sys.exc_info())
+-
+     def __enter__(self):
+         return self
+ 
+diff --git a/tests/pytests/unit/utils/test_asynchronous.py b/tests/pytests/unit/utils/test_asynchronous.py
+new file mode 100644
+index 0000000000..2b5613e2bf
+--- /dev/null
++++ b/tests/pytests/unit/utils/test_asynchronous.py
+@@ -0,0 +1,92 @@
++import tornado.gen
++import tornado.ioloop
++
++import salt.utils.asynchronous as asynchronous
++
++
++class HelperA:
++
++    async_methods = [
++        "sleep",
++    ]
++
++    def __init__(self, io_loop=None):
++        pass
++
++    @tornado.gen.coroutine
++    def sleep(self):
++        yield tornado.gen.sleep(0.1)
++        raise tornado.gen.Return(True)
++
++
++class HelperB:
++
++    async_methods = [
++        "sleep",
++    ]
++
++    def __init__(self, a=None, io_loop=None):
++        if a is None:
++            a = asynchronous.SyncWrapper(HelperA)
++        self.a = a
++
++    @tornado.gen.coroutine
++    def sleep(self):
++        yield tornado.gen.sleep(0.1)
++        self.a.sleep()
++        raise tornado.gen.Return(False)
++
++
++def test_helpers():
++    """
++    Test that the helper classes do what we expect within a regular asynchronous env
++    """
++    io_loop = tornado.ioloop.IOLoop(make_current=False)
++    ret = io_loop.run_sync(lambda: HelperA().sleep())
++    assert ret is True
++
++    ret = io_loop.run_sync(lambda: HelperB().sleep())
++    assert ret is False
++
++
++def test_basic_wrap():
++    """
++    Test that we can wrap an asynchronous caller.
++    """
++    sync = asynchronous.SyncWrapper(HelperA)
++    ret = sync.sleep()
++    assert ret is True
++
++
++def test_basic_wrap_series():
++    """
++    Test that we can wrap an asynchronous caller and call the method in series.
++    """
++    sync = asynchronous.SyncWrapper(HelperA)
++    ret = sync.sleep()
++    assert ret is True
++    ret = sync.sleep()
++    assert ret is True
++
++
++def test_double():
++    """
++    Test when the asynchronous wrapper object itself creates a wrap of another thing
++
++    This works fine since the second wrap is based on the first's IOLoop so we
++    don't have to worry about complex start/stop mechanics
++    """
++    sync = asynchronous.SyncWrapper(HelperB)
++    ret = sync.sleep()
++    assert ret is False
++
++
++def test_double_sameloop():
++    """
++    Test asynchronous wrappers initiated from the same IOLoop, to ensure that
++    we don't wire up both to the same IOLoop (since it causes MANY problems).
++    """
++    a = asynchronous.SyncWrapper(HelperA)
++    sync = asynchronous.SyncWrapper(HelperB, (a,))
++    ret = sync.sleep()
++    assert ret is False
+diff --git a/tests/unit/utils/test_asynchronous.py b/tests/unit/utils/test_asynchronous.py
+deleted file mode 100644
+index e5bd974cb6..0000000000
+--- a/tests/unit/utils/test_asynchronous.py
++++ /dev/null
+@@ -1,81 +0,0 @@
+-import salt.ext.tornado.gen
+-import salt.ext.tornado.testing
+-import salt.utils.asynchronous as asynchronous
+-from salt.ext.tornado.testing import AsyncTestCase
+-
+-
+-class HelperA:
+-
+-    async_methods = [
+-        "sleep",
+-    ]
+-
+-    def __init__(self, io_loop=None):
+-        pass
+-
+-    @salt.ext.tornado.gen.coroutine
+-    def sleep(self):
+-        yield salt.ext.tornado.gen.sleep(0.1)
+-        raise salt.ext.tornado.gen.Return(True)
+-
+-
+-class HelperB:
+-
+-    async_methods = [
+-        "sleep",
+-    ]
+-
+-    def __init__(self, a=None, io_loop=None):
+-        if a is None:
+-            a = asynchronous.SyncWrapper(HelperA)
+-        self.a = a
+-
+-    @salt.ext.tornado.gen.coroutine
+-    def sleep(self):
+-        yield salt.ext.tornado.gen.sleep(0.1)
+-        self.a.sleep()
+-        raise salt.ext.tornado.gen.Return(False)
+-
+-
+-class TestSyncWrapper(AsyncTestCase):
+-    @salt.ext.tornado.testing.gen_test
+-    def test_helpers(self):
+-        """
+-        Test that the helper classes do what we expect within a regular asynchronous env
+-        """
+-        ha = HelperA()
+-        ret = yield ha.sleep()
+-        self.assertTrue(ret)
+-
+-        hb = HelperB()
+-        ret = yield hb.sleep()
+-        self.assertFalse(ret)
+-
+-    def test_basic_wrap(self):
+-        """
+-        Test that we can wrap an asynchronous caller.
+-        """
+-        sync = asynchronous.SyncWrapper(HelperA)
+-        ret = sync.sleep()
+-        self.assertTrue(ret)
+-
+-    def test_double(self):
+-        """
+-        Test when the asynchronous wrapper object itself creates a wrap of another thing
+-
+-        This works fine since the second wrap is based on the first's IOLoop so we
+-        don't have to worry about complex start/stop mechanics
+-        """
+-        sync = asynchronous.SyncWrapper(HelperB)
+-        ret = sync.sleep()
+-        self.assertFalse(ret)
+-
+-    def test_double_sameloop(self):
+-        """
+-        Test asynchronous wrappers initiated from the same IOLoop, to ensure that
+-        we don't wire up both to the same IOLoop (since it causes MANY problems).
+-        """
+-        a = asynchronous.SyncWrapper(HelperA)
+-        sync = asynchronous.SyncWrapper(HelperB, (a,))
+-        ret = sync.sleep()
+-        self.assertFalse(ret)
+-- 
+2.45.0
+
diff --git a/improve-broken-events-catching-and-reporting.patch b/improve-broken-events-catching-and-reporting.patch
new file mode 100644
index 0000000..a79544e
--- /dev/null
+++ b/improve-broken-events-catching-and-reporting.patch
@@ -0,0 +1,202 @@
+From 88bd54971d39b34d9728f3fe5fcb493cec3ff2fd Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 09:22:11 +0200
+Subject: [PATCH] Improve broken events catching and reporting
+
+* Improve broken events catching and reporting
+
+* Add test of catching SaltDeserializationError on reading event
+
+* Add test for fire_ret_load
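+
+A sketch of the failure mode the new guard covers (illustration only; the
+values mirror the fire_ret_load test below): a minion may return a bare
+string or a traceback list instead of a dict, which previously raised
+inside the ret.items() loop:
+
+    load = {"id": "minion_id.example.org", "retcode": 254,
+            "return": "Unhandled exception running state.highstate"}
+    # fire_ret_load(load) now logs "Event with bad payload received ..."
+    # and returns early instead of raising.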
+---
+ salt/utils/event.py                          |  23 +++-
+ tests/pytests/unit/utils/event/test_event.py | 107 +++++++++++++++++++
+ 2 files changed, 128 insertions(+), 2 deletions(-)
+
+diff --git a/salt/utils/event.py b/salt/utils/event.py
+index e6d7b00520..ef048335ae 100644
+--- a/salt/utils/event.py
++++ b/salt/utils/event.py
+@@ -75,6 +75,7 @@ import salt.utils.platform
+ import salt.utils.process
+ import salt.utils.stringutils
+ import salt.utils.zeromq
++from salt.exceptions import SaltDeserializationError
+ 
+ log = logging.getLogger(__name__)
+ 
+@@ -461,7 +462,13 @@ class SaltEvent:
+             salt.utils.stringutils.to_bytes(TAGEND)
+         )  # split tag from data
+         mtag = salt.utils.stringutils.to_str(mtag)
+-        data = salt.payload.loads(mdata, encoding="utf-8")
++        try:
++            data = salt.payload.loads(mdata, encoding="utf-8")
++        except SaltDeserializationError:
++            log.warning(
++                "SaltDeserializationError on unpacking data, the payload could be incomplete"
++            )
++            raise
+         return mtag, data
+ 
+     def _get_match_func(self, match_type=None):
+@@ -583,6 +590,9 @@ class SaltEvent:
+                     raise
+                 else:
+                     return None
++            except SaltDeserializationError:
++                log.error("Unable to deserialize received event")
++                return None
+             except RuntimeError:
+                 return None
+ 
+@@ -889,6 +899,14 @@ class SaltEvent:
+             ret = load.get("return", {})
+             retcode = load["retcode"]
+ 
++        if not isinstance(ret, dict):
++            log.error(
++                "Event with bad payload received from '%s': %s",
++                load.get("id", "UNKNOWN"),
++                "".join(ret) if isinstance(ret, list) else ret,
++            )
++            return
++
+         try:
+             for tag, data in ret.items():
+                 data["retcode"] = retcode
+@@ -910,7 +928,8 @@ class SaltEvent:
+                     )
+         except Exception as exc:  # pylint: disable=broad-except
+             log.error(
+-                "Event iteration failed with exception: %s",
++                "Event from '%s' iteration failed with exception: %s",
++                load.get("id", "UNKNOWN"),
+                 exc,
+                 exc_info_on_loglevel=logging.DEBUG,
+             )
+diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py
+index f4b6c15999..3eadfaf6ba 100644
+--- a/tests/pytests/unit/utils/event/test_event.py
++++ b/tests/pytests/unit/utils/event/test_event.py
+@@ -12,6 +12,7 @@ import salt.ext.tornado.ioloop
+ import salt.ext.tornado.iostream
+ import salt.utils.event
+ import salt.utils.stringutils
++from salt.exceptions import SaltDeserializationError
+ from salt.utils.event import SaltEvent
+ from tests.support.events import eventpublisher_process, eventsender_process
+ from tests.support.mock import patch
+@@ -340,3 +341,109 @@ def test_master_pub_permissions(sock_dir):
+         assert bool(os.lstat(p).st_mode & stat.S_IRUSR)
+         assert not bool(os.lstat(p).st_mode & stat.S_IRGRP)
+         assert not bool(os.lstat(p).st_mode & stat.S_IROTH)
++
++
++def test_event_unpack_with_SaltDeserializationError(sock_dir):
++    with eventpublisher_process(str(sock_dir)), salt.utils.event.MasterEvent(
++        str(sock_dir), listen=True
++    ) as me, patch.object(
++        salt.utils.event.log, "warning", autospec=True
++    ) as mock_log_warning, patch.object(
++        salt.utils.event.log, "error", autospec=True
++    ) as mock_log_error:
++        me.fire_event({"data": "foo1"}, "evt1")
++        me.fire_event({"data": "foo2"}, "evt2")
++        evt2 = me.get_event(tag="")
++        with patch("salt.payload.loads", side_effect=SaltDeserializationError):
++            evt1 = me.get_event(tag="")
++        _assert_got_event(evt2, {"data": "foo2"}, expected_failure=True)
++        assert evt1 is None
++        assert (
++            mock_log_warning.mock_calls[0].args[0]
++            == "SaltDeserializationError on unpacking data, the payload could be incomplete"
++        )
++        assert (
++            mock_log_error.mock_calls[0].args[0]
++            == "Unable to deserialize received event"
++        )
++
++
++def test_event_fire_ret_load():
++    event = SaltEvent(node=None)
++    test_load = {
++        "id": "minion_id.example.org",
++        "jid": "20240212095247760376",
++        "fun": "state.highstate",
++        "retcode": 254,
++        "return": {
++            "saltutil_|-sync_states_|-sync_states_|-sync_states": {
++                "result": True,
++            },
++            "saltutil_|-sync_modules_|-sync_modules_|-sync_modules": {
++                "result": False,
++            },
++        },
++    }
++    test_fire_event_data = {
++        "result": False,
++        "retcode": 254,
++        "jid": "20240212095247760376",
++        "id": "minion_id.example.org",
++        "success": False,
++        "return": "Error: saltutil.sync_modules",
++        "fun": "state.highstate",
++    }
++    test_unhandled_exc = "Unhandled exception running state.highstate"
++    test_traceback = [
++        "Traceback (most recent call last):\n",
++        "    Just an example of possible return as a list\n",
++    ]
++    with patch.object(
++        event, "fire_event", side_effect=[None, None, Exception]
++    ) as mock_fire_event, patch.object(
++        salt.utils.event.log, "error", autospec=True
++    ) as mock_log_error:
++        event.fire_ret_load(test_load)
++        assert len(mock_fire_event.mock_calls) == 2
++        assert mock_fire_event.mock_calls[0].args[0] == test_fire_event_data
++        assert mock_fire_event.mock_calls[0].args[1] == "saltutil.sync_modules"
++        assert mock_fire_event.mock_calls[1].args[0] == test_fire_event_data
++        assert (
++            mock_fire_event.mock_calls[1].args[1]
++            == "salt/job/20240212095247760376/sub/minion_id.example.org/error/state.highstate"
++        )
++        assert not mock_log_error.mock_calls
++
++        mock_log_error.reset_mock()
++
++        event.fire_ret_load(test_load)
++        assert (
++            mock_log_error.mock_calls[0].args[0]
++            == "Event from '%s' iteration failed with exception: %s"
++        )
++        assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org"
++
++        mock_log_error.reset_mock()
++        test_load["return"] = test_unhandled_exc
++
++        event.fire_ret_load(test_load)
++        assert (
++            mock_log_error.mock_calls[0].args[0]
++            == "Event with bad payload received from '%s': %s"
++        )
++        assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org"
++        assert (
++            mock_log_error.mock_calls[0].args[2]
++            == "Unhandled exception running state.highstate"
++        )
++
++        mock_log_error.reset_mock()
++        test_load["return"] = test_traceback
++
++        event.fire_ret_load(test_load)
++        assert (
++            mock_log_error.mock_calls[0].args[0]
++            == "Event with bad payload received from '%s': %s"
++        )
++        assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org"
++        assert mock_log_error.mock_calls[0].args[2] == "".join(test_traceback)
+-- 
+2.45.0
+
diff --git a/make-logging-calls-lighter.patch b/make-logging-calls-lighter.patch
new file mode 100644
index 0000000..194da2b
--- /dev/null
+++ b/make-logging-calls-lighter.patch
@@ -0,0 +1,233 @@
+From 48b6f57ece7eb9f58b8e6da40ec241b6df3f6d01 Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 09:20:18 +0200
+Subject: [PATCH] Make logging calls lighter
+
+* Call set_lowest_log_level_by_opts with set_logging_options_dict
+
+* Fix the _logging test with setting minimum logging level
+
+* Fix test_deferred_stream_handler test
+
+* Fix vt.Terminal failing test: test_log_sanitize
+
+Fixes failing test added in a09b4f445052be66f0ac53fd01fa02bfa5b82ea6
+
+We can't assume tests are run at debug level, so this ensures the test
+passes regardless of what logging level is currently set by capturing
+the output in caplog at DEBUG which stream_stdout/stream_stderr uses by
+default.
+
+Signed-off-by: Joe Groocock <jgroocock@cloudflare.com>
+
+---------
+
+Signed-off-by: Joe Groocock <jgroocock@cloudflare.com>
+Co-authored-by: Joe Groocock <jgroocock@cloudflare.com>
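+
+A pytest sketch of that capture pattern (illustration only; term and
+password are local to the test_log_sanitize test below):
+
+    with caplog.at_level(logging.DEBUG):
+        ret = term.recv()
+    assert password not in caplog.text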
+---
+ salt/_logging/impl.py                         |   1 +
+ .../integration/_logging/test_logging.py      | 106 ++++++++++++++++++
+ .../handlers/test_deferred_stream_handler.py  |   9 +-
+ tests/pytests/unit/utils/test_vt.py           |   6 +-
+ 4 files changed, 117 insertions(+), 5 deletions(-)
+ create mode 100644 tests/pytests/integration/_logging/test_logging.py
+
+diff --git a/salt/_logging/impl.py b/salt/_logging/impl.py
+index 2d1a276cb8..1d71cb8be8 100644
+--- a/salt/_logging/impl.py
++++ b/salt/_logging/impl.py
+@@ -426,6 +426,7 @@ def set_logging_options_dict(opts):
+     except AttributeError:
+         pass
+     set_logging_options_dict.__options_dict__ = opts
++    set_lowest_log_level_by_opts(opts)
+ 
+ 
+ def freeze_logging_options_dict():
+diff --git a/tests/pytests/integration/_logging/test_logging.py b/tests/pytests/integration/_logging/test_logging.py
+new file mode 100644
+index 0000000000..8e38f55b38
+--- /dev/null
++++ b/tests/pytests/integration/_logging/test_logging.py
+@@ -0,0 +1,106 @@
++import logging
++import os
++
++import pytest
++
++import salt._logging.impl as log_impl
++from tests.support.mock import MagicMock, patch
++
++pytestmark = [
++    pytest.mark.skip_on_windows(reason="Temporarily skipped on the newer golden images")
++]
++
++
++log = logging.getLogger(__name__)
++
++
++@pytest.fixture
++def configure_loader_modules():
++    return {log_impl: {}}
++
++
++def log_nameToLevel(name):
++    """
++    Return the numeric representation of textual logging level
++    """
++    # log level values
++    CRITICAL = 50
++    FATAL = CRITICAL
++    ERROR = 40
++    WARNING = 30
++    WARN = WARNING
++    INFO = 20
++    DEBUG = 10
++    NOTSET = 0
++
++    _nameToLevel = {
++        "CRITICAL": CRITICAL,
++        "FATAL": FATAL,
++        "ERROR": ERROR,
++        "WARN": WARNING,
++        "WARNING": WARNING,
++        "INFO": INFO,
++        "DEBUG": DEBUG,
++        "NOTSET": NOTSET,
++    }
++    return _nameToLevel.get(name, None)
++
++
++def test_lowest_log_level():
++    ret = log_impl.get_lowest_log_level()
++    assert ret is not None
++
++    log_impl.set_lowest_log_level(log_nameToLevel("DEBUG"))
++    ret = log_impl.get_lowest_log_level()
++    assert ret is log_nameToLevel("DEBUG")
++
++    log_impl.set_lowest_log_level(log_nameToLevel("WARNING"))
++    ret = log_impl.get_lowest_log_level()
++    assert ret is log_nameToLevel("WARNING")
++
++    opts = {"log_level": "ERROR", "log_level_logfile": "INFO"}
++    log_impl.set_lowest_log_level_by_opts(opts)
++    ret = log_impl.get_lowest_log_level()
++    assert ret is log_nameToLevel("INFO")
++
++
++def test_get_logging_level_from_string(caplog):
++    ret = log_impl.get_logging_level_from_string(None)
++    assert ret is log_nameToLevel("WARNING")
++
++    ret = log_impl.get_logging_level_from_string(log_nameToLevel("DEBUG"))
++    assert ret is log_nameToLevel("DEBUG")
++
++    ret = log_impl.get_logging_level_from_string("CRITICAL")
++    assert ret is log_nameToLevel("CRITICAL")
++
++    caplog.clear()
++    with caplog.at_level(logging.WARNING):
++        msg = "Could not translate the logging level string 'BADLEVEL' into an actual logging level integer. Returning 'logging.ERROR'."
++        ret = log_impl.get_logging_level_from_string("BADLEVEL")
++        assert ret is log_nameToLevel("ERROR")
++        assert msg in caplog.text
++
++
++def test_logfile_handler(caplog):
++    caplog.clear()
++    with caplog.at_level(logging.WARNING):
++        ret = log_impl.is_logfile_handler_configured()
++        assert ret is False
++
++        msg = "log_path setting is set to `None`. Nothing else to do"
++        log_path = None
++        assert log_impl.setup_logfile_handler(log_path) is None
++        assert msg in caplog.text
++
++
++def test_in_mainprocess():
++    ret = log_impl.in_mainprocess()
++    assert ret is True
++
++    curr_pid = os.getpid()
++    with patch(
++        "os.getpid", MagicMock(side_effect=[AttributeError, curr_pid, curr_pid])
++    ):
++        ret = log_impl.in_mainprocess()
++        assert ret is True
+diff --git a/tests/pytests/unit/_logging/handlers/test_deferred_stream_handler.py b/tests/pytests/unit/_logging/handlers/test_deferred_stream_handler.py
+index 76b0e88eca..62c0dff4be 100644
+--- a/tests/pytests/unit/_logging/handlers/test_deferred_stream_handler.py
++++ b/tests/pytests/unit/_logging/handlers/test_deferred_stream_handler.py
+@@ -9,6 +9,7 @@ import pytest
+ from pytestshellutils.utils.processes import terminate_process
+ 
+ from salt._logging.handlers import DeferredStreamHandler
++from salt._logging.impl import set_lowest_log_level
+ from salt.utils.nb_popen import NonBlockingPopen
+ from tests.support.helpers import CaptureOutput, dedent
+ from tests.support.runtests import RUNTIME_VARS
+@@ -20,7 +21,7 @@ def _sync_with_handlers_proc_target():
+ 
+     with CaptureOutput() as stds:
+         handler = DeferredStreamHandler(sys.stderr)
+-        handler.setLevel(logging.DEBUG)
++        set_lowest_log_level(logging.DEBUG)
+         formatter = logging.Formatter("%(message)s")
+         handler.setFormatter(formatter)
+         logging.root.addHandler(handler)
+@@ -45,7 +46,7 @@ def _deferred_write_on_flush_proc_target():
+ 
+     with CaptureOutput() as stds:
+         handler = DeferredStreamHandler(sys.stderr)
+-        handler.setLevel(logging.DEBUG)
++        set_lowest_log_level(logging.DEBUG)
+         formatter = logging.Formatter("%(message)s")
+         handler.setFormatter(formatter)
+         logging.root.addHandler(handler)
+@@ -126,7 +127,7 @@ def test_deferred_write_on_atexit(tmp_path):
+         # Just loop consuming output
+         while True:
+             if time.time() > max_time:
+-                pytest.fail("Script didn't exit after {} second".format(execution_time))
++                pytest.fail(f"Script didn't exit after {execution_time} second")
+ 
+             time.sleep(0.125)
+             _out = proc.recv()
+@@ -146,7 +147,7 @@ def test_deferred_write_on_atexit(tmp_path):
+     finally:
+         terminate_process(proc.pid, kill_children=True)
+     if b"Foo" not in err:
+-        pytest.fail("'Foo' should be in stderr and it's not: {}".format(err))
++        pytest.fail(f"'Foo' should be in stderr and it's not: {err}")
+ 
+ 
+ @pytest.mark.skip_on_windows(reason="Windows does not support SIGINT")
+diff --git a/tests/pytests/unit/utils/test_vt.py b/tests/pytests/unit/utils/test_vt.py
+index 438a6eb09c..c31b25e623 100644
+--- a/tests/pytests/unit/utils/test_vt.py
++++ b/tests/pytests/unit/utils/test_vt.py
+@@ -1,3 +1,4 @@
++import logging
+ import os
+ import signal
+ 
+@@ -43,10 +44,13 @@ def test_log_sanitize(test_cmd, caplog):
+         cmd,
+         log_stdout=True,
+         log_stderr=True,
++        log_stdout_level="debug",
++        log_stderr_level="debug",
+         log_sanitize=password,
+         stream_stdout=False,
+         stream_stderr=False,
+     )
+-    ret = term.recv()
++    with caplog.at_level(logging.DEBUG):
++        ret = term.recv()
+     assert password not in caplog.text
+     assert "******" in caplog.text
+-- 
+2.45.0
+
diff --git a/make-reactor-engine-less-blocking-the-eventpublisher.patch b/make-reactor-engine-less-blocking-the-eventpublisher.patch
new file mode 100644
index 0000000..ba1e393
--- /dev/null
+++ b/make-reactor-engine-less-blocking-the-eventpublisher.patch
@@ -0,0 +1,104 @@
+From 0d35f09288700f5c961567442c3fcc25838b8de4 Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 09:44:21 +0200
+Subject: [PATCH] Make reactor engine less blocking the EventPublisher
+
+---
+ salt/utils/reactor.py | 45 +++++++++++++++++++++++++++----------------
+ 1 file changed, 28 insertions(+), 17 deletions(-)
+
+diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py
+index 19420a51cf..78adad34da 100644
+--- a/salt/utils/reactor.py
++++ b/salt/utils/reactor.py
+@@ -1,10 +1,12 @@
+ """
+ Functions which implement running reactor jobs
+ """
++
+ import fnmatch
+ import glob
+ import logging
+ import os
++from threading import Lock
+ 
+ import salt.client
+ import salt.defaults.exitcodes
+@@ -194,13 +196,6 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
+         self.resolve_aliases(chunks)
+         return chunks
+ 
+-    def call_reactions(self, chunks):
+-        """
+-        Execute the reaction state
+-        """
+-        for chunk in chunks:
+-            self.wrap.run(chunk)
+-
+     def run(self):
+         """
+         Enter into the server loop
+@@ -218,7 +213,7 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
+         ) as event:
+             self.wrap = ReactWrap(self.opts)
+ 
+-            for data in event.iter_events(full=True):
++            for data in event.iter_events(full=True, auto_reconnect=True):
+                 # skip all events fired by ourselves
+                 if data["data"].get("user") == self.wrap.event_user:
+                     continue
+@@ -268,15 +263,9 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
+                     if not self.is_leader:
+                         continue
+                     else:
+-                        reactors = self.list_reactors(data["tag"])
+-                        if not reactors:
+-                            continue
+-                        chunks = self.reactions(data["tag"], data["data"], reactors)
+-                        if chunks:
+-                            try:
+-                                self.call_reactions(chunks)
+-                            except SystemExit:
+-                                log.warning("Exit ignored by reactor")
++                        self.wrap.call_reactions(
++                            data, self.list_reactors, self.reactions
++                        )
+ 
+ 
+ class ReactWrap:
+@@ -297,6 +286,7 @@ class ReactWrap:
+ 
+     def __init__(self, opts):
+         self.opts = opts
++        self._run_lock = Lock()
+         if ReactWrap.client_cache is None:
+             ReactWrap.client_cache = salt.utils.cache.CacheDict(
+                 opts["reactor_refresh_interval"]
+@@ -480,3 +470,24 @@ class ReactWrap:
+         Wrap LocalCaller to execute remote exec functions locally on the Minion
+         """
+         self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
++
++    def _call_reactions(self, data, list_reactors, get_reactions):
++        reactors = list_reactors(data["tag"])
++        if not reactors:
++            return
++        chunks = get_reactions(data["tag"], data["data"], reactors)
++        if not chunks:
++            return
++        with self._run_lock:
++            try:
++                for chunk in chunks:
++                    self.run(chunk)
++            except Exception as exc:  # pylint: disable=broad-except
++                log.error(
++                    "Exception while calling the reactions: %s", exc, exc_info=True
++                )
++
++    def call_reactions(self, data, list_reactors, get_reactions):
++        return self.pool.fire_async(
++            self._call_reactions, args=(data, list_reactors, get_reactions)
++        )
+-- 
+2.45.0
+
diff --git a/make-salt-master-self-recoverable-on-killing-eventpu.patch b/make-salt-master-self-recoverable-on-killing-eventpu.patch
new file mode 100644
index 0000000..c353e49
--- /dev/null
+++ b/make-salt-master-self-recoverable-on-killing-eventpu.patch
@@ -0,0 +1,243 @@
+From 794b5d1aa7b8e880e9a21940183d241c6cbde9c9 Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 09:42:23 +0200
+Subject: [PATCH] Make salt-master self recoverable on killing
+ EventPublisher
+
+* Implement timeout and tries to transport.ipc.IPCClient.send
+
+* Make timeout and tries configurable for fire_event
+
+* Add test of timeout and tries
+
+* Prevent exceptions from tornado Future on closing the IPC connection
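+
+A usage sketch (illustration only; the 5000 ms value comes from the test
+below): fire_event timeouts are given in milliseconds, converted to
+seconds, and passed to IPCMessageClient.send along with the configured
+pusher_send_tries (default 3):
+
+    me.fire_event({"data": "bar2"}, "evt2", timeout=5000)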
+---
+ salt/transport/ipc.py                        | 73 +++++++++++++++++---
+ salt/utils/event.py                          | 21 +++++-
+ tests/pytests/unit/utils/event/test_event.py | 43 ++++++++++++
+ 3 files changed, 125 insertions(+), 12 deletions(-)
+
+diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py
+index cee100b086..6631781c5c 100644
+--- a/salt/transport/ipc.py
++++ b/salt/transport/ipc.py
+@@ -2,7 +2,6 @@
+ IPC transport classes
+ """
+ 
+-
+ import errno
+ import logging
+ import socket
+@@ -340,7 +339,8 @@ class IPCClient:
+             try:
+                 log.trace("IPCClient: Connecting to socket: %s", self.socket_path)
+                 yield self.stream.connect(sock_addr)
+-                self._connecting_future.set_result(True)
++                if self._connecting_future is not None:
++                    self._connecting_future.set_result(True)
+                 break
+             except Exception as e:  # pylint: disable=broad-except
+                 if self.stream.closed():
+@@ -350,7 +350,8 @@ class IPCClient:
+                     if self.stream is not None:
+                         self.stream.close()
+                         self.stream = None
+-                    self._connecting_future.set_exception(e)
++                    if self._connecting_future is not None:
++                        self._connecting_future.set_exception(e)
+                     break
+ 
+                 yield salt.ext.tornado.gen.sleep(1)
+@@ -365,7 +366,13 @@ class IPCClient:
+             return
+ 
+         self._closing = True
+-        self._connecting_future = None
++        if self._connecting_future is not None:
++            try:
++                self._connecting_future.set_result(True)
++                self._connecting_future.exception()  # pylint: disable=E0203
++            except Exception as e:  # pylint: disable=broad-except
++                log.warning("Unhandled connecting exception: %s", e, exc_info=True)
++            self._connecting_future = None
+ 
+         log.debug("Closing %s instance", self.__class__.__name__)
+ 
+@@ -435,8 +442,6 @@ class IPCMessageClient(IPCClient):
+         "close",
+     ]
+ 
+-    # FIXME timeout unimplemented
+-    # FIXME tries unimplemented
+     @salt.ext.tornado.gen.coroutine
+     def send(self, msg, timeout=None, tries=None):
+         """
+@@ -445,12 +450,60 @@ class IPCMessageClient(IPCClient):
+         If the socket is not currently connected, a connection will be established.
+ 
+         :param dict msg: The message to be sent
+-        :param int timeout: Timeout when sending message (Currently unimplemented)
++        :param int timeout: Timeout when sending message
++        :param int tries: Maximum number of tries to send message
+         """
+-        if not self.connected():
+-            yield self.connect()
++        if tries is None or tries < 1:
++            tries = 1
++        due_time = None
++        if timeout is not None:
++            due_time = time.time() + timeout
++        _try = 1
++        exc_count = 0
+         pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
+-        yield self.stream.write(pack)
++        while _try <= tries:
++            if not self.connected():
++                self.close()
++                self.stream = None
++                self._closing = False
++                try:
++                    yield self.connect(
++                        timeout=(
++                            None if due_time is None else max(due_time - time.time(), 1)
++                        )
++                    )
++                except StreamClosedError:
++                    log.warning(
++                        "IPCMessageClient: Unable to reconnect IPC stream on sending message with ID: 0x%016x%s",
++                        id(msg),
++                        f", retry {_try} of {tries}" if tries > 1 else "",
++                    )
++                    exc_count += 1
++            if self.connected():
++                try:
++                    yield self.stream.write(pack)
++                    return
++                except StreamClosedError:
++                    if self._closing:
++                        break
++                    log.warning(
++                        "IPCMessageClient: Stream was closed on sending message with ID: 0x%016x",
++                        id(msg),
++                    )
++                    exc_count += 1
++                    if exc_count == 1:
++                        # Give one more chance in case the stream was detected as closed
++                        # on the first write attempt
++                        continue
++            cur_time = time.time()
++            _try += 1
++            if _try > tries or (due_time is not None and cur_time > due_time):
++                return
++            yield salt.ext.tornado.gen.sleep(
++                1
++                if due_time is None
++                else (due_time - cur_time) / max(tries - _try + 1, 1)
++            )
+ 
+ 
+ class IPCMessageServer(IPCServer):
+diff --git a/salt/utils/event.py b/salt/utils/event.py
+index ef048335ae..36b530d1af 100644
+--- a/salt/utils/event.py
++++ b/salt/utils/event.py
+@@ -270,6 +270,10 @@ class SaltEvent:
+             # and don't read out events from the buffer on an on-going basis,
+             # the buffer will grow resulting in big memory usage.
+             self.connect_pub()
++        self.pusher_send_timeout = self.opts.get(
++            "pusher_send_timeout", self.opts.get("timeout")
++        )
++        self.pusher_send_tries = self.opts.get("pusher_send_tries", 3)
+ 
+     @classmethod
+     def __load_cache_regex(cls):
+@@ -839,10 +843,18 @@ class SaltEvent:
+             ]
+         )
+         msg = salt.utils.stringutils.to_bytes(event, "utf-8")
++        if timeout is None:
++            timeout_s = self.pusher_send_timeout
++        else:
++            timeout_s = float(timeout) / 1000
+         if self._run_io_loop_sync:
+             with salt.utils.asynchronous.current_ioloop(self.io_loop):
+                 try:
+-                    self.pusher.send(msg)
++                    self.pusher.send(
++                        msg,
++                        timeout=timeout_s,
++                        tries=self.pusher_send_tries,
++                    )
+                 except Exception as exc:  # pylint: disable=broad-except
+                     log.debug(
+                         "Publisher send failed with exception: %s",
+@@ -851,7 +863,12 @@ class SaltEvent:
+                     )
+                     raise
+         else:
+-            self.io_loop.spawn_callback(self.pusher.send, msg)
++            self.io_loop.spawn_callback(
++                self.pusher.send,
++                msg,
++                timeout=timeout_s,
++                tries=self.pusher_send_tries,
++            )
+         return True
+ 
+     def fire_master(self, data, tag, timeout=1000):
+diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py
+index 3eadfaf6ba..fa9e420a93 100644
+--- a/tests/pytests/unit/utils/event/test_event.py
++++ b/tests/pytests/unit/utils/event/test_event.py
+@@ -447,3 +447,46 @@ def test_event_fire_ret_load():
+         )
+         assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org"
+         assert mock_log_error.mock_calls[0].args[2] == "".join(test_traceback)
++
++
++@pytest.mark.slow_test
++def test_event_single_timeout_tries(sock_dir):
++    """Test an event is sent with timout and tries"""
++
++    write_calls_count = 0
++    real_stream_write = None
++
++    @salt.ext.tornado.gen.coroutine
++    def write_mock(pack):
++        nonlocal write_calls_count
++        nonlocal real_stream_write
++        write_calls_count += 1
++        if write_calls_count > 3:
++            yield real_stream_write(pack)
++        else:
++            raise salt.ext.tornado.iostream.StreamClosedError()
++
++    with eventpublisher_process(str(sock_dir)), salt.utils.event.MasterEvent(
++        str(sock_dir), listen=True
++    ) as me:
++        me.fire_event({"data": "foo1"}, "evt1")
++        evt1 = me.get_event(tag="evt1")
++        _assert_got_event(evt1, {"data": "foo1"})
++        real_stream_write = me.pusher.stream.write
++        with patch.object(
++            me.pusher,
++            "connected",
++            side_effect=[True, True, False, False, True, True],
++        ), patch.object(
++            me.pusher,
++            "connect",
++            side_effect=salt.ext.tornado.iostream.StreamClosedError,
++        ), patch.object(
++            me.pusher.stream,
++            "write",
++            write_mock,
++        ):
++            me.fire_event({"data": "bar2"}, "evt2", timeout=5000)
++            evt2 = me.get_event(tag="evt2")
++            _assert_got_event(evt2, {"data": "bar2"})
++            assert write_calls_count == 4
+-- 
+2.45.0
+
diff --git a/prevent-oom-with-high-amount-of-batch-async-calls-bs.patch b/prevent-oom-with-high-amount-of-batch-async-calls-bs.patch
new file mode 100644
index 0000000..c74e14e
--- /dev/null
+++ b/prevent-oom-with-high-amount-of-batch-async-calls-bs.patch
@@ -0,0 +1,1272 @@
+From d57472b4fa2213ec551197ee2e147aef364fdcfe Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 11:47:35 +0200
+Subject: [PATCH] Prevent OOM with high amount of batch async calls
+ (bsc#1216063)
+
+* Refactor batch_async implementation
+
+* Fix batch_async tests after refactoring
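+
+A configuration sketch (illustration only; these keys and defaults are the
+ones read by batch_async_required and SharedEventsChannel below):
+
+    opts["batch_async"] = {
+        "threshold": 1,  # engage when len(minions) >= 1; -1 means compare
+                         # against the computed batch size instead
+        "subscriber_reconnect_tries": 5,
+        "subscriber_reconnect_interval": 1.0,
+    }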
+---
+ salt/cli/batch_async.py                    | 584 ++++++++++++++-------
+ salt/master.py                             |   9 +-
+ tests/pytests/unit/cli/test_batch_async.py | 360 +++++++------
+ 3 files changed, 597 insertions(+), 356 deletions(-)
+
+diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
+index 1012ce37cc..5d49993faa 100644
+--- a/salt/cli/batch_async.py
++++ b/salt/cli/batch_async.py
+@@ -2,18 +2,193 @@
+ Execute a job on the targeted minions by using a moving window of fixed size `batch`.
+ """
+ 
+-import gc
+-
+-# pylint: enable=import-error,no-name-in-module,redefined-builtin
+ import logging
++import re
+ 
+ import salt.client
+ import salt.ext.tornado
++import salt.utils.event
+ from salt.cli.batch import batch_get_eauth, batch_get_opts, get_bnum
++from salt.ext.tornado.iostream import StreamClosedError
+ 
+ log = logging.getLogger(__name__)
+ 
+ 
++__SHARED_EVENTS_CHANNEL = None
++
++
++def _get_shared_events_channel(opts, io_loop):
++    global __SHARED_EVENTS_CHANNEL
++    if __SHARED_EVENTS_CHANNEL is None:
++        __SHARED_EVENTS_CHANNEL = SharedEventsChannel(opts, io_loop)
++    return __SHARED_EVENTS_CHANNEL
++
++
++def _destroy_unused_shared_events_channel():
++    global __SHARED_EVENTS_CHANNEL
++    if __SHARED_EVENTS_CHANNEL is not None and __SHARED_EVENTS_CHANNEL.destroy_unused():
++        __SHARED_EVENTS_CHANNEL = None
++
++
++def batch_async_required(opts, minions, extra):
++    """
++    Check opts to identify if batch async is required for the operation.
++    """
++    if not isinstance(minions, list):
++        return False
++    batch_async_opts = opts.get("batch_async", {})
++    batch_async_threshold = (
++        batch_async_opts.get("threshold", 1)
++        if isinstance(batch_async_opts, dict)
++        else 1
++    )
++    if batch_async_threshold == -1:
++        batch_size = get_bnum(extra, minions, True)
++        return len(minions) >= batch_size
++    elif batch_async_threshold > 0:
++        return len(minions) >= batch_async_threshold
++    return False
++
++
++class SharedEventsChannel:
++    def __init__(self, opts, io_loop):
++        self.io_loop = io_loop
++        self.local_client = salt.client.get_local_client(
++            opts["conf_file"], io_loop=self.io_loop
++        )
++        self.master_event = salt.utils.event.get_event(
++            "master",
++            sock_dir=self.local_client.opts["sock_dir"],
++            opts=self.local_client.opts,
++            listen=True,
++            io_loop=self.io_loop,
++            keep_loop=True,
++        )
++        self.master_event.set_event_handler(self.__handle_event)
++        if self.master_event.subscriber.stream:
++            self.master_event.subscriber.stream.set_close_callback(self.__handle_close)
++        self._re_tag_ret_event = re.compile(r"salt\/job\/(\d+)\/ret\/.*")
++        self._subscribers = {}
++        self._subscriptions = {}
++        self._used_by = set()
++        batch_async_opts = opts.get("batch_async", {})
++        if not isinstance(batch_async_opts, dict):
++            batch_async_opts = {}
++        self._subscriber_reconnect_tries = batch_async_opts.get(
++            "subscriber_reconnect_tries", 5
++        )
++        self._subscriber_reconnect_interval = batch_async_opts.get(
++            "subscriber_reconnect_interval", 1.0
++        )
++        self._reconnecting_subscriber = False
++
++    def subscribe(self, jid, op, subscriber_id, handler):
++        if subscriber_id not in self._subscribers:
++            self._subscribers[subscriber_id] = set()
++        if jid not in self._subscriptions:
++            self._subscriptions[jid] = []
++        self._subscribers[subscriber_id].add(jid)
++        if (op, subscriber_id, handler) not in self._subscriptions[jid]:
++            self._subscriptions[jid].append((op, subscriber_id, handler))
++        if not self.master_event.subscriber.connected():
++            self.__reconnect_subscriber()
++
++    def unsubscribe(self, jid, op, subscriber_id):
++        if subscriber_id not in self._subscribers:
++            return
++        jids = self._subscribers[subscriber_id].copy()
++        if jid is not None:
++            jids = {jid}
++        for i_jid in jids:
++            self._subscriptions[i_jid] = list(
++                filter(
++                    lambda x: not (op in (x[0], None) and x[1] == subscriber_id),
++                    self._subscriptions.get(i_jid, []),
++                )
++            )
++            self._subscribers[subscriber_id].discard(i_jid)
++        self._subscriptions = dict(filter(lambda x: x[1], self._subscriptions.items()))
++        if not self._subscribers[subscriber_id]:
++            del self._subscribers[subscriber_id]
++
++    @salt.ext.tornado.gen.coroutine
++    def __handle_close(self):
++        if not self._subscriptions:
++            return
++        log.warning("Master Event Subscriber was closed. Trying to reconnect...")
++        yield self.__reconnect_subscriber()
++
++    @salt.ext.tornado.gen.coroutine
++    def __handle_event(self, raw):
++        if self.master_event is None:
++            return
++        try:
++            tag, data = self.master_event.unpack(raw)
++            tag_match = self._re_tag_ret_event.match(tag)
++            if tag_match:
++                jid = tag_match.group(1)
++                if jid in self._subscriptions:
++                    for op, _, handler in self._subscriptions[jid]:
++                        yield handler(tag, data, op)
++        except Exception as ex:  # pylint: disable=W0703
++            log.error(
++                "Exception occured while processing event: %s: %s",
++                tag,
++                ex,
++                exc_info=True,
++            )
++
++    @salt.ext.tornado.gen.coroutine
++    def __reconnect_subscriber(self):
++        if self.master_event.subscriber.connected() or self._reconnecting_subscriber:
++            return
++        self._reconnecting_subscriber = True
++        max_tries = max(1, int(self._subscriber_reconnect_tries))
++        _try = 1
++        while _try <= max_tries:
++            log.info(
++                "Trying to reconnect to event publisher (try %d of %d) ...",
++                _try,
++                max_tries,
++            )
++            try:
++                yield self.master_event.subscriber.connect()
++            except StreamClosedError:
++                log.warning(
++                    "Unable to reconnect to event publisher (try %d of %d)",
++                    _try,
++                    max_tries,
++                )
++            if self.master_event.subscriber.connected():
++                self.master_event.subscriber.stream.set_close_callback(
++                    self.__handle_close
++                )
++                log.info("Event publisher connection restored")
++                self._reconnecting_subscriber = False
++                return
++            if _try < max_tries:
++                yield salt.ext.tornado.gen.sleep(self._subscriber_reconnect_interval)
++            _try += 1
++        self._reconnecting_subscriber = False
++
++    def use(self, subscriber_id):
++        self._used_by.add(subscriber_id)
++        return self
++
++    def unuse(self, subscriber_id):
++        self._used_by.discard(subscriber_id)
++
++    def destroy_unused(self):
++        if self._used_by:
++            return False
++        self.master_event.remove_event_handler(self.__handle_event)
++        self.master_event.destroy()
++        self.master_event = None
++        self.local_client.destroy()
++        self.local_client = None
++        return True
++
++
+ class BatchAsync:
+     """
+     Run a job on the targeted minions by using a moving window of fixed size `batch`.
+@@ -28,14 +203,14 @@ class BatchAsync:
+         - gather_job_timeout: `find_job` timeout
+         - timeout: time to wait before firing a `find_job`
+ 
+-    When the batch stars, a `start` event is fired:
++    When the batch starts, a `start` event is fired:
+          - tag: salt/batch/<batch-jid>/start
+          - data: {
+              "available_minions": self.minions,
+              "down_minions": targeted_minions - presence_ping_minions
+            }
+ 
+-    When the batch ends, an `done` event is fired:
++    When the batch ends, a `done` event is fired:
+         - tag: salt/batch/<batch-jid>/done
+         - data: {
+              "available_minions": self.minions,
+@@ -45,17 +220,26 @@ class BatchAsync:
+          }
+     """
+ 
+-    def __init__(self, parent_opts, jid_gen, clear_load):
+-        ioloop = salt.ext.tornado.ioloop.IOLoop.current()
+-        self.local = salt.client.get_local_client(
+-            parent_opts["conf_file"], io_loop=ioloop
++    def __init__(self, opts, jid_gen, clear_load):
++        self.extra_job_kwargs = {}
++        kwargs = clear_load.get("kwargs", {})
++        for kwarg in ("module_executors", "executor_opts"):
++            if kwarg in kwargs:
++                self.extra_job_kwargs[kwarg] = kwargs[kwarg]
++            elif kwarg in opts:
++                self.extra_job_kwargs[kwarg] = opts[kwarg]
++        self.io_loop = salt.ext.tornado.ioloop.IOLoop.current()
++        self.events_channel = _get_shared_events_channel(opts, self.io_loop).use(
++            id(self)
+         )
+         if "gather_job_timeout" in clear_load["kwargs"]:
+             clear_load["gather_job_timeout"] = clear_load["kwargs"].pop(
+                 "gather_job_timeout"
+             )
+         else:
+-            clear_load["gather_job_timeout"] = self.local.opts["gather_job_timeout"]
++            clear_load["gather_job_timeout"] = self.events_channel.local_client.opts[
++                "gather_job_timeout"
++            ]
+         self.batch_presence_ping_timeout = clear_load["kwargs"].get(
+             "batch_presence_ping_timeout", None
+         )
+@@ -64,8 +248,8 @@ class BatchAsync:
+             clear_load.pop("tgt"),
+             clear_load.pop("fun"),
+             clear_load["kwargs"].pop("batch"),
+-            self.local.opts,
+-            **clear_load
++            self.events_channel.local_client.opts,
++            **clear_load,
+         )
+         self.eauth = batch_get_eauth(clear_load["kwargs"])
+         self.metadata = clear_load["kwargs"].get("metadata", {})
+@@ -78,54 +262,45 @@ class BatchAsync:
+         self.jid_gen = jid_gen
+         self.ping_jid = jid_gen()
+         self.batch_jid = jid_gen()
+-        self.find_job_jid = jid_gen()
+         self.find_job_returned = set()
++        self.metadata.update({"batch_jid": self.batch_jid, "ping_jid": self.ping_jid})
+         self.ended = False
+-        self.event = salt.utils.event.get_event(
+-            "master",
+-            self.opts["sock_dir"],
+-            self.opts["transport"],
+-            opts=self.opts,
+-            listen=True,
+-            io_loop=ioloop,
+-            keep_loop=True,
+-        )
++        self.event = self.events_channel.master_event
+         self.scheduled = False
+-        self.patterns = set()
+ 
+     def __set_event_handler(self):
+-        ping_return_pattern = "salt/job/{}/ret/*".format(self.ping_jid)
+-        batch_return_pattern = "salt/job/{}/ret/*".format(self.batch_jid)
+-        self.event.subscribe(ping_return_pattern, match_type="glob")
+-        self.event.subscribe(batch_return_pattern, match_type="glob")
+-        self.patterns = {
+-            (ping_return_pattern, "ping_return"),
+-            (batch_return_pattern, "batch_run"),
+-        }
+-        self.event.set_event_handler(self.__event_handler)
++        self.events_channel.subscribe(
++            self.ping_jid, "ping_return", id(self), self.__event_handler
++        )
++        self.events_channel.subscribe(
++            self.batch_jid, "batch_run", id(self), self.__event_handler
++        )
+ 
+-    def __event_handler(self, raw):
++    @salt.ext.tornado.gen.coroutine
++    def __event_handler(self, tag, data, op):
+         if not self.event:
+             return
+         try:
+-            mtag, data = self.event.unpack(raw)
+-            for (pattern, op) in self.patterns:
+-                if mtag.startswith(pattern[:-1]):
+-                    minion = data["id"]
+-                    if op == "ping_return":
+-                        self.minions.add(minion)
+-                        if self.targeted_minions == self.minions:
+-                            self.event.io_loop.spawn_callback(self.start_batch)
+-                    elif op == "find_job_return":
+-                        if data.get("return", None):
+-                            self.find_job_returned.add(minion)
+-                    elif op == "batch_run":
+-                        if minion in self.active:
+-                            self.active.remove(minion)
+-                            self.done_minions.add(minion)
+-                            self.event.io_loop.spawn_callback(self.schedule_next)
+-        except Exception as ex:
+-            log.error("Exception occured while processing event: {}".format(ex))
++            minion = data["id"]
++            if op == "ping_return":
++                self.minions.add(minion)
++                if self.targeted_minions == self.minions:
++                    yield self.start_batch()
++            elif op == "find_job_return":
++                if data.get("return", None):
++                    self.find_job_returned.add(minion)
++            elif op == "batch_run":
++                if minion in self.active:
++                    self.active.remove(minion)
++                    self.done_minions.add(minion)
++                    yield self.schedule_next()
++        except Exception as ex:  # pylint: disable=W0703
++            log.error(
++                "Exception occured while processing event: %s: %s",
++                tag,
++                ex,
++                exc_info=True,
++            )
+ 
+     def _get_next(self):
+         to_run = (
+@@ -139,176 +314,203 @@ class BatchAsync:
+         )
+         return set(list(to_run)[:next_batch_size])
+ 
++    @salt.ext.tornado.gen.coroutine
+     def check_find_job(self, batch_minions, jid):
+-        if self.event:
+-            find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
+-            self.event.unsubscribe(find_job_return_pattern, match_type="glob")
+-            self.patterns.remove((find_job_return_pattern, "find_job_return"))
+-
+-            timedout_minions = batch_minions.difference(
+-                self.find_job_returned
+-            ).difference(self.done_minions)
+-            self.timedout_minions = self.timedout_minions.union(timedout_minions)
+-            self.active = self.active.difference(self.timedout_minions)
+-            running = batch_minions.difference(self.done_minions).difference(
+-                self.timedout_minions
+-            )
++        """
++        Check if the job with specified ``jid`` was finished on the minions
++        """
++        if not self.event:
++            return
++        self.events_channel.unsubscribe(jid, "find_job_return", id(self))
+ 
+-            if timedout_minions:
+-                self.schedule_next()
++        timedout_minions = batch_minions.difference(self.find_job_returned).difference(
++            self.done_minions
++        )
++        self.timedout_minions = self.timedout_minions.union(timedout_minions)
++        self.active = self.active.difference(self.timedout_minions)
++        running = batch_minions.difference(self.done_minions).difference(
++            self.timedout_minions
++        )
+ 
+-            if self.event and running:
+-                self.find_job_returned = self.find_job_returned.difference(running)
+-                self.event.io_loop.spawn_callback(self.find_job, running)
++        if timedout_minions:
++            yield self.schedule_next()
++
++        if self.event and running:
++            self.find_job_returned = self.find_job_returned.difference(running)
++            yield self.find_job(running)
+ 
+     @salt.ext.tornado.gen.coroutine
+     def find_job(self, minions):
+-        if self.event:
+-            not_done = minions.difference(self.done_minions).difference(
+-                self.timedout_minions
++        """
++        Find if the job was finished on the minions
++        """
++        if not self.event:
++            return
++        not_done = minions.difference(self.done_minions).difference(
++            self.timedout_minions
++        )
++        if not not_done:
++            return
++        try:
++            jid = self.jid_gen()
++            self.events_channel.subscribe(
++                jid, "find_job_return", id(self), self.__event_handler
+             )
+-            try:
+-                if not_done:
+-                    jid = self.jid_gen()
+-                    find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
+-                    self.patterns.add((find_job_return_pattern, "find_job_return"))
+-                    self.event.subscribe(find_job_return_pattern, match_type="glob")
+-                    ret = yield self.local.run_job_async(
+-                        not_done,
+-                        "saltutil.find_job",
+-                        [self.batch_jid],
+-                        "list",
+-                        gather_job_timeout=self.opts["gather_job_timeout"],
+-                        jid=jid,
+-                        **self.eauth
+-                    )
+-                    yield salt.ext.tornado.gen.sleep(self.opts["gather_job_timeout"])
+-                    if self.event:
+-                        self.event.io_loop.spawn_callback(
+-                            self.check_find_job, not_done, jid
+-                        )
+-            except Exception as ex:
+-                log.error(
+-                    "Exception occured handling batch async: {}. Aborting execution.".format(
+-                        ex
+-                    )
+-                )
+-                self.close_safe()
++            ret = yield self.events_channel.local_client.run_job_async(
++                not_done,
++                "saltutil.find_job",
++                [self.batch_jid],
++                "list",
++                gather_job_timeout=self.opts["gather_job_timeout"],
++                jid=jid,
++                io_loop=self.io_loop,
++                listen=False,
++                **self.eauth,
++            )
++            yield salt.ext.tornado.gen.sleep(self.opts["gather_job_timeout"])
++            if self.event:
++                yield self.check_find_job(not_done, jid)
++        except Exception as ex:  # pylint: disable=W0703
++            log.error(
++                "Exception occured handling batch async: %s. Aborting execution.",
++                ex,
++                exc_info=True,
++            )
++            self.close_safe()
+ 
+     @salt.ext.tornado.gen.coroutine
+     def start(self):
++        """
++        Start the batch execution
++        """
++        if not self.event:
++            return
++        self.__set_event_handler()
++        ping_return = yield self.events_channel.local_client.run_job_async(
++            self.opts["tgt"],
++            "test.ping",
++            [],
++            self.opts.get("selected_target_option", self.opts.get("tgt_type", "glob")),
++            gather_job_timeout=self.opts["gather_job_timeout"],
++            jid=self.ping_jid,
++            metadata=self.metadata,
++            io_loop=self.io_loop,
++            listen=False,
++            **self.eauth,
++        )
++        self.targeted_minions = set(ping_return["minions"])
++        # start batching even if not all minions respond to ping
++        yield salt.ext.tornado.gen.sleep(
++            self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
++        )
+         if self.event:
+-            self.__set_event_handler()
+-            ping_return = yield self.local.run_job_async(
+-                self.opts["tgt"],
+-                "test.ping",
+-                [],
+-                self.opts.get(
+-                    "selected_target_option", self.opts.get("tgt_type", "glob")
+-                ),
+-                gather_job_timeout=self.opts["gather_job_timeout"],
+-                jid=self.ping_jid,
+-                metadata=self.metadata,
+-                **self.eauth
+-            )
+-            self.targeted_minions = set(ping_return["minions"])
+-            # start batching even if not all minions respond to ping
+-            yield salt.ext.tornado.gen.sleep(
+-                self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
+-            )
+-            if self.event:
+-                self.event.io_loop.spawn_callback(self.start_batch)
++            yield self.start_batch()
+ 
+     @salt.ext.tornado.gen.coroutine
+     def start_batch(self):
+-        if not self.initialized:
+-            self.batch_size = get_bnum(self.opts, self.minions, True)
+-            self.initialized = True
+-            data = {
+-                "available_minions": self.minions,
+-                "down_minions": self.targeted_minions.difference(self.minions),
+-                "metadata": self.metadata,
+-            }
+-            ret = self.event.fire_event(
+-                data, "salt/batch/{}/start".format(self.batch_jid)
+-            )
+-            if self.event:
+-                self.event.io_loop.spawn_callback(self.run_next)
++        """
++        Fire `salt/batch/*/start` and continue batch with `run_next`
++        """
++        if self.initialized:
++            return
++        self.batch_size = get_bnum(self.opts, self.minions, True)
++        self.initialized = True
++        data = {
++            "available_minions": self.minions,
++            "down_minions": self.targeted_minions.difference(self.minions),
++            "metadata": self.metadata,
++        }
++        yield self.events_channel.master_event.fire_event_async(
++            data, f"salt/batch/{self.batch_jid}/start"
++        )
++        if self.event:
++            yield self.run_next()
+ 
+     @salt.ext.tornado.gen.coroutine
+     def end_batch(self):
++        """
++        End the batch and call safe closing
++        """
+         left = self.minions.symmetric_difference(
+             self.done_minions.union(self.timedout_minions)
+         )
+-        if not left and not self.ended:
+-            self.ended = True
+-            data = {
+-                "available_minions": self.minions,
+-                "down_minions": self.targeted_minions.difference(self.minions),
+-                "done_minions": self.done_minions,
+-                "timedout_minions": self.timedout_minions,
+-                "metadata": self.metadata,
+-            }
+-            self.event.fire_event(data, "salt/batch/{}/done".format(self.batch_jid))
+-
+-            # release to the IOLoop to allow the event to be published
+-            # before closing batch async execution
+-            yield salt.ext.tornado.gen.sleep(1)
+-            self.close_safe()
++        # Send salt/batch/*/done only if there is nothing to do
++        # and the event hasn't been sent already
++        if left or self.ended:
++            return
++        self.ended = True
++        data = {
++            "available_minions": self.minions,
++            "down_minions": self.targeted_minions.difference(self.minions),
++            "done_minions": self.done_minions,
++            "timedout_minions": self.timedout_minions,
++            "metadata": self.metadata,
++        }
++        yield self.events_channel.master_event.fire_event_async(
++            data, f"salt/batch/{self.batch_jid}/done"
++        )
++
++        # release to the IOLoop to allow the event to be published
++        # before closing batch async execution
++        yield salt.ext.tornado.gen.sleep(1)
++        self.close_safe()
+ 
+     def close_safe(self):
+-        for (pattern, label) in self.patterns:
+-            self.event.unsubscribe(pattern, match_type="glob")
+-        self.event.remove_event_handler(self.__event_handler)
++        if self.events_channel is not None:
++            self.events_channel.unsubscribe(None, None, id(self))
++            self.events_channel.unuse(id(self))
++            self.events_channel = None
++            _destroy_unused_shared_events_channel()
+         self.event = None
+-        self.local = None
+-        self.ioloop = None
+-        del self
+-        gc.collect()
+ 
+     @salt.ext.tornado.gen.coroutine
+     def schedule_next(self):
+-        if not self.scheduled:
+-            self.scheduled = True
+-            # call later so that we maybe gather more returns
+-            yield salt.ext.tornado.gen.sleep(self.batch_delay)
+-            if self.event:
+-                self.event.io_loop.spawn_callback(self.run_next)
++        if self.scheduled:
++            return
++        self.scheduled = True
++        # call later so that we may still gather more returns
++        yield salt.ext.tornado.gen.sleep(self.batch_delay)
++        if self.event:
++            yield self.run_next()
+ 
+     @salt.ext.tornado.gen.coroutine
+     def run_next(self):
++        """
++        Continue batch execution with the next targets
++        """
+         self.scheduled = False
+         next_batch = self._get_next()
+-        if next_batch:
+-            self.active = self.active.union(next_batch)
+-            try:
+-                ret = yield self.local.run_job_async(
+-                    next_batch,
+-                    self.opts["fun"],
+-                    self.opts["arg"],
+-                    "list",
+-                    raw=self.opts.get("raw", False),
+-                    ret=self.opts.get("return", ""),
+-                    gather_job_timeout=self.opts["gather_job_timeout"],
+-                    jid=self.batch_jid,
+-                    metadata=self.metadata,
+-                )
+-
+-                yield salt.ext.tornado.gen.sleep(self.opts["timeout"])
+-
+-                # The batch can be done already at this point, which means no self.event
+-                if self.event:
+-                    self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
+-            except Exception as ex:
+-                log.error("Error in scheduling next batch: %s. Aborting execution", ex)
+-                self.active = self.active.difference(next_batch)
+-                self.close_safe()
+-        else:
++        if not next_batch:
+             yield self.end_batch()
+-        gc.collect()
++            return
++        self.active = self.active.union(next_batch)
++        try:
++            ret = yield self.events_channel.local_client.run_job_async(
++                next_batch,
++                self.opts["fun"],
++                self.opts["arg"],
++                "list",
++                raw=self.opts.get("raw", False),
++                ret=self.opts.get("return", ""),
++                gather_job_timeout=self.opts["gather_job_timeout"],
++                jid=self.batch_jid,
++                metadata=self.metadata,
++                io_loop=self.io_loop,
++                listen=False,
++                **self.eauth,
++                **self.extra_job_kwargs,
++            )
+ 
+-    def __del__(self):
+-        self.local = None
+-        self.event = None
+-        self.ioloop = None
+-        gc.collect()
++            yield salt.ext.tornado.gen.sleep(self.opts["timeout"])
++
++            # The batch can be done already at this point, which means no self.event
++            if self.event:
++                yield self.find_job(set(next_batch))
++        except Exception as ex:  # pylint: disable=W0703
++            log.error(
++                "Error in scheduling next batch: %s. Aborting execution",
++                ex,
++                exc_info=True,
++            )
++            self.active = self.active.difference(next_batch)
++            self.close_safe()
+diff --git a/salt/master.py b/salt/master.py
+index 425b412148..d7182d10b5 100644
+--- a/salt/master.py
++++ b/salt/master.py
+@@ -2,6 +2,7 @@
+ This module contains all of the routines needed to set up a master server, this
+ involves preparing the three listeners and the workers needed by the master.
+ """
++
+ import collections
+ import copy
+ import ctypes
+@@ -19,7 +20,6 @@ import time
+ import salt.acl
+ import salt.auth
+ import salt.channel.server
+-import salt.cli.batch_async
+ import salt.client
+ import salt.client.ssh.client
+ import salt.crypt
+@@ -55,6 +55,7 @@ import salt.utils.user
+ import salt.utils.verify
+ import salt.utils.zeromq
+ import salt.wheel
++from salt.cli.batch_async import BatchAsync, batch_async_required
+ from salt.config import DEFAULT_INTERVAL
+ from salt.defaults import DEFAULT_TARGET_DELIM
+ from salt.ext.tornado.stack_context import StackContext
+@@ -2174,9 +2175,9 @@ class ClearFuncs(TransportMethods):
+     def publish_batch(self, clear_load, minions, missing):
+         batch_load = {}
+         batch_load.update(clear_load)
+-        batch = salt.cli.batch_async.BatchAsync(
++        batch = BatchAsync(
+             self.local.opts,
+-            functools.partial(self._prep_jid, clear_load, {}),
++            lambda: self._prep_jid(clear_load, {}),
+             batch_load,
+         )
+         ioloop = salt.ext.tornado.ioloop.IOLoop.current()
+@@ -2331,7 +2332,7 @@ class ClearFuncs(TransportMethods):
+                         ),
+                     },
+                 }
+-        if extra.get("batch", None):
++        if extra.get("batch", None) and batch_async_required(self.opts, minions, extra):
+             return self.publish_batch(clear_load, minions, missing)
+ 
+         jid = self._prep_jid(clear_load, extra)
+diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py
+index e0774ffff3..bc871aba54 100644
+--- a/tests/pytests/unit/cli/test_batch_async.py
++++ b/tests/pytests/unit/cli/test_batch_async.py
+@@ -1,7 +1,7 @@
+ import pytest
+ 
+ import salt.ext.tornado
+-from salt.cli.batch_async import BatchAsync
++from salt.cli.batch_async import BatchAsync, batch_async_required
+ from tests.support.mock import MagicMock, patch
+ 
+ 
+@@ -22,16 +22,44 @@ def batch(temp_salt_master):
+         with patch("salt.cli.batch_async.batch_get_opts", MagicMock(return_value=opts)):
+             batch = BatchAsync(
+                 opts,
+-                MagicMock(side_effect=["1234", "1235", "1236"]),
++                MagicMock(side_effect=["1234", "1235"]),
+                 {
+                     "tgt": "",
+                     "fun": "",
+-                    "kwargs": {"batch": "", "batch_presence_ping_timeout": 1},
++                    "kwargs": {
++                        "batch": "",
++                        "batch_presence_ping_timeout": 1,
++                        "metadata": {"mykey": "myvalue"},
++                    },
+                 },
+             )
+             yield batch
+ 
+ 
++@pytest.mark.parametrize(
++    "threshold,minions,batch,expected",
++    [
++        (1, 2, 200, True),
++        (1, 500, 200, True),
++        (0, 2, 200, False),
++        (0, 500, 200, False),
++        (-1, 2, 200, False),
++        (-1, 500, 200, True),
++        (-1, 9, 10, False),
++        (-1, 11, 10, True),
++        (10, 9, 8, False),
++        (10, 9, 10, False),
++        (10, 11, 8, True),
++        (10, 11, 10, True),
++    ],
++)
++def test_batch_async_required(threshold, minions, batch, expected):
++    minions_list = [f"minion{i}.example.org" for i in range(minions)]
++    batch_async_opts = {"batch_async": {"threshold": threshold}}
++    extra = {"batch": batch}
++    assert batch_async_required(batch_async_opts, minions_list, extra) == expected
++
++
+ def test_ping_jid(batch):
+     assert batch.ping_jid == "1234"
+ 
+@@ -40,10 +68,6 @@ def test_batch_jid(batch):
+     assert batch.batch_jid == "1235"
+ 
+ 
+-def test_find_job_jid(batch):
+-    assert batch.find_job_jid == "1236"
+-
+-
+ def test_batch_size(batch):
+     """
+     Tests passing batch value as a number
+@@ -55,58 +79,74 @@ def test_batch_size(batch):
+ 
+ 
+ def test_batch_start_on_batch_presence_ping_timeout(batch):
+-    # batch_async = BatchAsyncMock();
+-    batch.event = MagicMock()
++    future_ret = salt.ext.tornado.gen.Future()
++    future_ret.set_result({"minions": ["foo", "bar"]})
+     future = salt.ext.tornado.gen.Future()
+-    future.set_result({"minions": ["foo", "bar"]})
+-    batch.local.run_job_async.return_value = future
+-    with patch("salt.ext.tornado.gen.sleep", return_value=future):
+-        # ret = batch_async.start(batch)
++    future.set_result({})
++    with patch.object(batch, "events_channel", MagicMock()), patch(
++        "salt.ext.tornado.gen.sleep", return_value=future
++    ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
++        batch.events_channel.local_client.run_job_async.return_value = future_ret
+         ret = batch.start()
+-        # assert start_batch is called later with batch_presence_ping_timeout as param
+-        assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.start_batch,)
++        # assert start_batch is called
++        start_batch_mock.assert_called_once()
+         # assert test.ping called
+-        assert batch.local.run_job_async.call_args[0] == ("*", "test.ping", [], "glob")
++        assert batch.events_channel.local_client.run_job_async.call_args[0] == (
++            "*",
++            "test.ping",
++            [],
++            "glob",
++        )
+         # assert targeted_minions == all minions matched by tgt
+         assert batch.targeted_minions == {"foo", "bar"}
+ 
+ 
+ def test_batch_start_on_gather_job_timeout(batch):
+-    # batch_async = BatchAsyncMock();
+-    batch.event = MagicMock()
+     future = salt.ext.tornado.gen.Future()
+-    future.set_result({"minions": ["foo", "bar"]})
+-    batch.local.run_job_async.return_value = future
++    future.set_result({})
++    future_ret = salt.ext.tornado.gen.Future()
++    future_ret.set_result({"minions": ["foo", "bar"]})
+     batch.batch_presence_ping_timeout = None
+-    with patch("salt.ext.tornado.gen.sleep", return_value=future):
++    with patch.object(batch, "events_channel", MagicMock()), patch(
++        "salt.ext.tornado.gen.sleep", return_value=future
++    ), patch.object(
++        batch, "start_batch", return_value=future
++    ) as start_batch_mock, patch.object(
++        batch, "batch_presence_ping_timeout", None
++    ):
++        batch.events_channel.local_client.run_job_async.return_value = future_ret
+         # ret = batch_async.start(batch)
+         ret = batch.start()
+-        # assert start_batch is called later with gather_job_timeout as param
+-        assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.start_batch,)
++        # assert start_batch is called
++        start_batch_mock.assert_called_once()
+ 
+ 
+ def test_batch_fire_start_event(batch):
+     batch.minions = {"foo", "bar"}
+     batch.opts = {"batch": "2", "timeout": 5}
+-    batch.event = MagicMock()
+-    batch.metadata = {"mykey": "myvalue"}
+-    batch.start_batch()
+-    assert batch.event.fire_event.call_args[0] == (
+-        {
+-            "available_minions": {"foo", "bar"},
+-            "down_minions": set(),
+-            "metadata": batch.metadata,
+-        },
+-        "salt/batch/1235/start",
+-    )
++    with patch.object(batch, "events_channel", MagicMock()):
++        batch.start_batch()
++        assert batch.events_channel.master_event.fire_event_async.call_args[0] == (
++            {
++                "available_minions": {"foo", "bar"},
++                "down_minions": set(),
++                "metadata": batch.metadata,
++            },
++            "salt/batch/1235/start",
++        )
+ 
+ 
+ def test_start_batch_calls_next(batch):
+-    batch.run_next = MagicMock(return_value=MagicMock())
+-    batch.event = MagicMock()
+-    batch.start_batch()
+-    assert batch.initialized
+-    assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.run_next,)
++    batch.initialized = False
++    future = salt.ext.tornado.gen.Future()
++    future.set_result({})
++    with patch.object(batch, "event", MagicMock()), patch.object(
++        batch, "events_channel", MagicMock()
++    ), patch.object(batch, "run_next", return_value=future) as run_next_mock:
++        batch.events_channel.master_event.fire_event_async.return_value = future
++        batch.start_batch()
++        assert batch.initialized
++        run_next_mock.assert_called_once()
+ 
+ 
+ def test_batch_fire_done_event(batch):
+@@ -114,69 +154,52 @@ def test_batch_fire_done_event(batch):
+     batch.minions = {"foo", "bar"}
+     batch.done_minions = {"foo"}
+     batch.timedout_minions = {"bar"}
+-    batch.event = MagicMock()
+-    batch.metadata = {"mykey": "myvalue"}
+-    old_event = batch.event
+-    batch.end_batch()
+-    assert old_event.fire_event.call_args[0] == (
+-        {
+-            "available_minions": {"foo", "bar"},
+-            "done_minions": batch.done_minions,
+-            "down_minions": {"baz"},
+-            "timedout_minions": batch.timedout_minions,
+-            "metadata": batch.metadata,
+-        },
+-        "salt/batch/1235/done",
+-    )
+-
+-
+-def test_batch__del__(batch):
+-    batch = BatchAsync(MagicMock(), MagicMock(), MagicMock())
+-    event = MagicMock()
+-    batch.event = event
+-    batch.__del__()
+-    assert batch.local is None
+-    assert batch.event is None
+-    assert batch.ioloop is None
++    with patch.object(batch, "events_channel", MagicMock()):
++        batch.end_batch()
++        assert batch.events_channel.master_event.fire_event_async.call_args[0] == (
++            {
++                "available_minions": {"foo", "bar"},
++                "done_minions": batch.done_minions,
++                "down_minions": {"baz"},
++                "timedout_minions": batch.timedout_minions,
++                "metadata": batch.metadata,
++            },
++            "salt/batch/1235/done",
++        )
+ 
+ 
+ def test_batch_close_safe(batch):
+-    batch = BatchAsync(MagicMock(), MagicMock(), MagicMock())
+-    event = MagicMock()
+-    batch.event = event
+-    batch.patterns = {
+-        ("salt/job/1234/ret/*", "find_job_return"),
+-        ("salt/job/4321/ret/*", "find_job_return"),
+-    }
+-    batch.close_safe()
+-    assert batch.local is None
+-    assert batch.event is None
+-    assert batch.ioloop is None
+-    assert len(event.unsubscribe.mock_calls) == 2
+-    assert len(event.remove_event_handler.mock_calls) == 1
++    with patch.object(
++        batch, "events_channel", MagicMock()
++    ) as events_channel_mock, patch.object(batch, "event", MagicMock()):
++        batch.close_safe()
++        batch.close_safe()
++        assert batch.events_channel is None
++        assert batch.event is None
++        events_channel_mock.unsubscribe.assert_called_once()
++        events_channel_mock.unuse.assert_called_once()
+ 
+ 
+ def test_batch_next(batch):
+-    batch.event = MagicMock()
+     batch.opts["fun"] = "my.fun"
+     batch.opts["arg"] = []
+-    batch._get_next = MagicMock(return_value={"foo", "bar"})
+     batch.batch_size = 2
+     future = salt.ext.tornado.gen.Future()
+-    future.set_result({"minions": ["foo", "bar"]})
+-    batch.local.run_job_async.return_value = future
+-    with patch("salt.ext.tornado.gen.sleep", return_value=future):
++    future.set_result({})
++    with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
++        batch, "events_channel", MagicMock()
++    ), patch.object(batch, "_get_next", return_value={"foo", "bar"}), patch.object(
++        batch, "find_job", return_value=future
++    ) as find_job_mock:
++        batch.events_channel.local_client.run_job_async.return_value = future
+         batch.run_next()
+-        assert batch.local.run_job_async.call_args[0] == (
++        assert batch.events_channel.local_client.run_job_async.call_args[0] == (
+             {"foo", "bar"},
+             "my.fun",
+             [],
+             "list",
+         )
+-        assert batch.event.io_loop.spawn_callback.call_args[0] == (
+-            batch.find_job,
+-            {"foo", "bar"},
+-        )
++        assert find_job_mock.call_args[0] == ({"foo", "bar"},)
+         assert batch.active == {"bar", "foo"}
+ 
+ 
+@@ -239,124 +262,132 @@ def test_next_batch_all_timedout(batch):
+ 
+ def test_batch__event_handler_ping_return(batch):
+     batch.targeted_minions = {"foo"}
+-    batch.event = MagicMock(
+-        unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"}))
+-    )
+     batch.start()
+     assert batch.minions == set()
+-    batch._BatchAsync__event_handler(MagicMock())
++    batch._BatchAsync__event_handler(
++        "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
++    )
+     assert batch.minions == {"foo"}
+     assert batch.done_minions == set()
+ 
+ 
+ def test_batch__event_handler_call_start_batch_when_all_pings_return(batch):
+     batch.targeted_minions = {"foo"}
+-    batch.event = MagicMock(
+-        unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"}))
+-    )
+-    batch.start()
+-    batch._BatchAsync__event_handler(MagicMock())
+-    assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.start_batch,)
++    future = salt.ext.tornado.gen.Future()
++    future.set_result({})
++    with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
++        batch.start()
++        batch._BatchAsync__event_handler(
++            "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
++        )
++        start_batch_mock.assert_called_once()
+ 
+ 
+ def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch):
+     batch.targeted_minions = {"foo", "bar"}
+-    batch.event = MagicMock(
+-        unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"}))
+-    )
+-    batch.start()
+-    batch._BatchAsync__event_handler(MagicMock())
+-    assert len(batch.event.io_loop.spawn_callback.mock_calls) == 0
++    future = salt.ext.tornado.gen.Future()
++    future.set_result({})
++    with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
++        batch.start()
++        batch._BatchAsync__event_handler(
++            "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
++        )
++        start_batch_mock.assert_not_called()
+ 
+ 
+ def test_batch__event_handler_batch_run_return(batch):
+-    batch.event = MagicMock(
+-        unpack=MagicMock(return_value=("salt/job/1235/ret/foo", {"id": "foo"}))
+-    )
+-    batch.start()
+-    batch.active = {"foo"}
+-    batch._BatchAsync__event_handler(MagicMock())
+-    assert batch.active == set()
+-    assert batch.done_minions == {"foo"}
+-    assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.schedule_next,)
++    future = salt.ext.tornado.gen.Future()
++    future.set_result({})
++    with patch.object(
++        batch, "schedule_next", return_value=future
++    ) as schedule_next_mock:
++        batch.start()
++        batch.active = {"foo"}
++        batch._BatchAsync__event_handler(
++            "salt/job/1235/ret/foo", {"id": "foo"}, "batch_run"
++        )
++        assert batch.active == set()
++        assert batch.done_minions == {"foo"}
++        schedule_next_mock.assert_called_once()
+ 
+ 
+ def test_batch__event_handler_find_job_return(batch):
+-    batch.event = MagicMock(
+-        unpack=MagicMock(
+-            return_value=(
+-                "salt/job/1236/ret/foo",
+-                {"id": "foo", "return": "deadbeaf"},
+-            )
+-        )
+-    )
+     batch.start()
+-    batch.patterns.add(("salt/job/1236/ret/*", "find_job_return"))
+-    batch._BatchAsync__event_handler(MagicMock())
++    batch._BatchAsync__event_handler(
++        "salt/job/1236/ret/foo", {"id": "foo", "return": "deadbeaf"}, "find_job_return"
++    )
+     assert batch.find_job_returned == {"foo"}
+ 
+ 
+ def test_batch_run_next_end_batch_when_no_next(batch):
+-    batch.end_batch = MagicMock()
+-    batch._get_next = MagicMock(return_value={})
+-    batch.run_next()
+-    assert len(batch.end_batch.mock_calls) == 1
++    future = salt.ext.tornado.gen.Future()
++    future.set_result({})
++    with patch.object(
++        batch, "_get_next", return_value={}
++    ), patch.object(
++        batch, "end_batch", return_value=future
++    ) as end_batch_mock:
++        batch.run_next()
++        end_batch_mock.assert_called_once()
+ 
+ 
+ def test_batch_find_job(batch):
+-    batch.event = MagicMock()
+     future = salt.ext.tornado.gen.Future()
+     future.set_result({})
+-    batch.local.run_job_async.return_value = future
+     batch.minions = {"foo", "bar"}
+-    batch.jid_gen = MagicMock(return_value="1234")
+-    with patch("salt.ext.tornado.gen.sleep", return_value=future):
++    with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
++        batch, "check_find_job", return_value=future
++    ) as check_find_job_mock, patch.object(
++        batch, "jid_gen", return_value="1236"
++    ):
++        batch.events_channel.local_client.run_job_async.return_value = future
+         batch.find_job({"foo", "bar"})
+-        assert batch.event.io_loop.spawn_callback.call_args[0] == (
+-            batch.check_find_job,
++        assert check_find_job_mock.call_args[0] == (
+             {"foo", "bar"},
+-            "1234",
++            "1236",
+         )
+ 
+ 
+ def test_batch_find_job_with_done_minions(batch):
+     batch.done_minions = {"bar"}
+-    batch.event = MagicMock()
+     future = salt.ext.tornado.gen.Future()
+     future.set_result({})
+-    batch.local.run_job_async.return_value = future
+     batch.minions = {"foo", "bar"}
+-    batch.jid_gen = MagicMock(return_value="1234")
+-    with patch("salt.ext.tornado.gen.sleep", return_value=future):
++    with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
++        batch, "check_find_job", return_value=future
++    ) as check_find_job_mock, patch.object(
++        batch, "jid_gen", return_value="1236"
++    ):
++        batch.events_channel.local_client.run_job_async.return_value = future
+         batch.find_job({"foo", "bar"})
+-        assert batch.event.io_loop.spawn_callback.call_args[0] == (
+-            batch.check_find_job,
++        assert check_find_job_mock.call_args[0] == (
+             {"foo"},
+-            "1234",
++            "1236",
+         )
+ 
+ 
+ def test_batch_check_find_job_did_not_return(batch):
+-    batch.event = MagicMock()
+     batch.active = {"foo"}
+     batch.find_job_returned = set()
+-    batch.patterns = {("salt/job/1234/ret/*", "find_job_return")}
+-    batch.check_find_job({"foo"}, jid="1234")
+-    assert batch.find_job_returned == set()
+-    assert batch.active == set()
+-    assert len(batch.event.io_loop.add_callback.mock_calls) == 0
++    future = salt.ext.tornado.gen.Future()
++    future.set_result({})
++    with patch.object(batch, "find_job", return_value=future) as find_job_mock:
++        batch.check_find_job({"foo"}, jid="1234")
++        assert batch.find_job_returned == set()
++        assert batch.active == set()
++        find_job_mock.assert_not_called()
+ 
+ 
+ def test_batch_check_find_job_did_return(batch):
+-    batch.event = MagicMock()
+     batch.find_job_returned = {"foo"}
+-    batch.patterns = {("salt/job/1234/ret/*", "find_job_return")}
+-    batch.check_find_job({"foo"}, jid="1234")
+-    assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.find_job, {"foo"})
++    future = salt.ext.tornado.gen.Future()
++    future.set_result({})
++    with patch.object(batch, "find_job", return_value=future) as find_job_mock:
++        batch.check_find_job({"foo"}, jid="1234")
++        find_job_mock.assert_called_once_with({"foo"})
+ 
+ 
+ def test_batch_check_find_job_multiple_states(batch):
+-    batch.event = MagicMock()
+     # currently running minions
+     batch.active = {"foo", "bar"}
+ 
+@@ -372,21 +403,28 @@ def test_batch_check_find_job_multiple_states(batch):
+     # both not yet done but only 'foo' responded to find_job
+     not_done = {"foo", "bar"}
+ 
+-    batch.patterns = {("salt/job/1234/ret/*", "find_job_return")}
+-    batch.check_find_job(not_done, jid="1234")
++    future = salt.ext.tornado.gen.Future()
++    future.set_result({})
+ 
+-    # assert 'bar' removed from active
+-    assert batch.active == {"foo"}
++    with patch.object(batch, "schedule_next", return_value=future), patch.object(
++        batch, "find_job", return_value=future
++    ) as find_job_mock:
++        batch.check_find_job(not_done, jid="1234")
+ 
+-    # assert 'bar' added to timedout_minions
+-    assert batch.timedout_minions == {"bar", "faz"}
++        # assert 'bar' removed from active
++        assert batch.active == {"foo"}
+ 
+-    # assert 'find_job' schedueled again only for 'foo'
+-    assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.find_job, {"foo"})
++        # assert 'bar' added to timedout_minions
++        assert batch.timedout_minions == {"bar", "faz"}
++
++        # assert 'find_job' is scheduled again only for 'foo'
++        find_job_mock.assert_called_once_with({"foo"})
+ 
+ 
+ def test_only_on_run_next_is_scheduled(batch):
+-    batch.event = MagicMock()
++    future = salt.ext.tornado.gen.Future()
++    future.set_result({})
+     batch.scheduled = True
+-    batch.schedule_next()
+-    assert len(batch.event.io_loop.spawn_callback.mock_calls) == 0
++    with patch.object(batch, "run_next", return_value=future) as run_next_mock:
++        batch.schedule_next()
++        run_next_mock.assert_not_called()
+-- 
+2.45.0
+
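The parametrized test_batch_async_required cases above pin down the threshold
semantics: a threshold of 0 disables batch async, a positive threshold requires
more targeted minions than the threshold, and -1 means auto mode, comparing the
minion count against the batch size itself. A minimal sketch consistent with
those cases, assuming an integer batch size (the real helper in
salt/cli/batch_async.py also has to resolve percentage batch sizes, which this
sketch skips):

    def batch_async_required(opts, minions, extra):
        # Sketch only: assumes extra["batch"] is already an integer and that
        # the default threshold is 1 (the default is an assumption; the tests
        # above always pass it explicitly).
        threshold = opts.get("batch_async", {}).get("threshold", 1)
        if threshold == -1:
            # Auto mode: go async only when the target set exceeds one batch.
            return len(minions) > extra["batch"]
        if threshold > 0:
            return len(minions) > threshold
        # A threshold of 0 (or any other non-positive value) disables it.
        return False
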
diff --git a/prevent-possible-exception-in-tornado.concurrent.fut.patch b/prevent-possible-exception-in-tornado.concurrent.fut.patch
new file mode 100644
index 0000000..cdaf248
--- /dev/null
+++ b/prevent-possible-exception-in-tornado.concurrent.fut.patch
@@ -0,0 +1,37 @@
+From 859be3e8213d4b5814a18fa6e9f3f17bf65b3183 Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 09:45:26 +0200
+Subject: [PATCH] Prevent possible exception in
+ tornado.concurrent.Future._set_done
+
+---
+ salt/ext/tornado/concurrent.py | 13 +++++++------
+ 1 file changed, 7 insertions(+), 6 deletions(-)
+
+diff --git a/salt/ext/tornado/concurrent.py b/salt/ext/tornado/concurrent.py
+index bea09ba125..011808ed27 100644
+--- a/salt/ext/tornado/concurrent.py
++++ b/salt/ext/tornado/concurrent.py
+@@ -330,12 +330,13 @@ class Future(object):
+ 
+     def _set_done(self):
+         self._done = True
+-        for cb in self._callbacks:
+-            try:
+-                cb(self)
+-            except Exception:
+-                app_log.exception("Exception in callback %r for %r", cb, self)
+-        self._callbacks = None
++        if self._callbacks:
++            for cb in self._callbacks:
++                try:
++                    cb(self)
++                except Exception:
++                    app_log.exception("Exception in callback %r for %r", cb, self)
++            self._callbacks = None
+ 
+     # On Python 3.3 or older, objects with a destructor part of a reference
+     # cycle are never destroyed. It's no longer the case on Python 3.4 thanks to
+-- 
+2.45.0
+
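The guard in the hunk above matters because Future._set_done can run when
self._callbacks has already been consumed and set to None (for example when a
future is completed again during teardown), and iterating over None raises a
TypeError. Reduced to a hedged standalone sketch (the attribute names mirror
the patched salt/ext/tornado/concurrent.py, but this function is an
illustration, not the real class):

    def _fire_callbacks(fut):
        # _callbacks may already be None: iterate and clear only when set.
        if fut._callbacks:
            for cb in fut._callbacks:
                try:
                    cb(fut)
                except Exception:
                    pass  # the real code logs via app_log.exception
            fut._callbacks = None
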
diff --git a/remove-redundant-_file_find-call-to-the-master.patch b/remove-redundant-_file_find-call-to-the-master.patch
new file mode 100644
index 0000000..3487c29
--- /dev/null
+++ b/remove-redundant-_file_find-call-to-the-master.patch
@@ -0,0 +1,40 @@
+From 768495df67725ae840b06d321bef8299eca2368a Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 09:47:40 +0200
+Subject: [PATCH] Remove redundant `_file_find` call to the master
+
+---
+ salt/fileclient.py | 10 ++--------
+ 1 file changed, 2 insertions(+), 8 deletions(-)
+
+diff --git a/salt/fileclient.py b/salt/fileclient.py
+index f01a86dd0d..f4b8d76dbe 100644
+--- a/salt/fileclient.py
++++ b/salt/fileclient.py
+@@ -1162,10 +1162,7 @@ class RemoteClient(Client):
+         if senv:
+             saltenv = senv
+ 
+-        if not salt.utils.platform.is_windows():
+-            hash_server, stat_server = self.hash_and_stat_file(path, saltenv)
+-        else:
+-            hash_server = self.hash_file(path, saltenv)
++        hash_server = self.hash_file(path, saltenv)
+ 
+         # Check if file exists on server, before creating files and
+         # directories
+@@ -1206,10 +1203,7 @@ class RemoteClient(Client):
+         )
+ 
+         if dest2check and os.path.isfile(dest2check):
+-            if not salt.utils.platform.is_windows():
+-                hash_local, stat_local = self.hash_and_stat_file(dest2check, saltenv)
+-            else:
+-                hash_local = self.hash_file(dest2check, saltenv)
++            hash_local = self.hash_file(dest2check, saltenv)
+ 
+             if hash_local == hash_server:
+                 return dest2check
+-- 
+2.45.0
+
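Both hunks above drop a hash_and_stat_file call whose stat result was assigned
but never consumed, and which, per the patch title, issued a redundant
_file_find request to the master for every transferred file. Calling hash_file
alone is behaviour-preserving at these call sites and saves one master round
trip per file. A hedged usage sketch, assuming a connected RemoteClient
instance named client:

    # Before (non-Windows only): hash plus an unused stat, two master calls
    # hash_server, stat_server = client.hash_and_stat_file(path, saltenv)
    # After, on all platforms: a single hash lookup
    hash_server = client.hash_file("salt://top.sls", "base")
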
diff --git a/remove-unused-import-causing-delays-on-starting-salt.patch b/remove-unused-import-causing-delays-on-starting-salt.patch
new file mode 100644
index 0000000..12e6843
--- /dev/null
+++ b/remove-unused-import-causing-delays-on-starting-salt.patch
@@ -0,0 +1,25 @@
+From 032a470b6a2dbffbdee52c4f6a2a7fc40e3b6f85 Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 09:18:42 +0200
+Subject: [PATCH] Remove unused import causing delays on starting
+ salt-master
+
+---
+ salt/utils/minions.py | 1 -
+ 1 file changed, 1 deletion(-)
+
+diff --git a/salt/utils/minions.py b/salt/utils/minions.py
+index 21d34b7edd..082b698522 100644
+--- a/salt/utils/minions.py
++++ b/salt/utils/minions.py
+@@ -9,7 +9,6 @@ import logging
+ import os
+ import re
+ 
+-import salt.auth.ldap
+ import salt.cache
+ import salt.payload
+ import salt.roster
+-- 
+2.45.0
+
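The commit title attributes salt-master startup delay to this single
module-level import: salt/utils/minions.py is imported on every master start,
while salt.auth.ldap (and the LDAP client stack behind it) is only useful when
LDAP eauth is configured. Where such a heavy module genuinely is needed, a
function-local import defers the cost to first use; a generic sketch (the
helper name is hypothetical, not from the patch):

    def _ldap_groups(username, **kwargs):
        # Deferred import: evaluated when LDAP auth is actually exercised,
        # not during salt-master startup.
        import salt.auth.ldap

        return salt.auth.ldap.groups(username, **kwargs)
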
diff --git a/salt.changes b/salt.changes
index ac201a8..89f0333 100644
--- a/salt.changes
+++ b/salt.changes
@@ -1,3 +1,35 @@
+-------------------------------------------------------------------
+Mon May 27 11:07:26 UTC 2024 - Pablo Suárez Hernández <pablo.suarezhernandez@suse.com>
+
+- Several fixes for tests to avoid errors and failures in some OSes
+- Speed up salt.matcher.confirm_top by using __context__
+- Do not call the async wrapper calls with the separate thread
+- Prevent OOM with high amount of batch async calls (bsc#1216063)
+- Add missing contextvars dependency in salt.version
+- Skip tests for unsupported algorithm on old OpenSSL version
+- Remove redundant `_file_find` call to the master
+- Prevent possible exception in tornado.concurrent.Future._set_done
+- Make reactor engine less blocking the EventPublisher
+- Make salt-master self recoverable on killing EventPublisher
+- Improve broken events catching and reporting
+- Make logging calls lighter
+- Remove unused import causing delays on starting salt-master
+
+- Added:
+  * improve-broken-events-catching-and-reporting.patch
+  * add-missing-contextvars-dependency-in-salt.version.patch
+  * prevent-oom-with-high-amount-of-batch-async-calls-bs.patch
+  * speed-up-salt.matcher.confirm_top-by-using-__context.patch
+  * remove-redundant-_file_find-call-to-the-master.patch
+  * make-logging-calls-lighter.patch
+  * make-salt-master-self-recoverable-on-killing-eventpu.patch
+  * skip-tests-for-unsupported-algorithm-on-old-openssl-.patch
+  * remove-unused-import-causing-delays-on-starting-salt.patch
+  * do-not-call-the-async-wrapper-calls-with-the-separat.patch
+  * prevent-possible-exception-in-tornado.concurrent.fut.patch
+  * several-fixes-for-tests-to-avoid-errors-and-failures.patch
+  * make-reactor-engine-less-blocking-the-eventpublisher.patch
+
 -------------------------------------------------------------------
 Mon May 13 15:26:19 UTC 2024 - Pablo Suárez Hernández <pablo.suarezhernandez@suse.com>
 
diff --git a/salt.spec b/salt.spec
index 6595c06..5ecdc12 100644
--- a/salt.spec
+++ b/salt.spec
@@ -356,7 +356,7 @@ Patch101:       fix-problematic-tests-and-allow-smooth-tests-executi.patch
 # PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/628
 Patch102:       make-importing-seco.range-thread-safe-bsc-1211649.patch
 # PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66130
-PAtch103:       fix-tests-failures-and-errors-when-detected-on-vm-ex.patch
+Patch103:       fix-tests-failures-and-errors-when-detected-on-vm-ex.patch
 # PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66234 (modified at Patch106)
 Patch104:       decode-oscap-byte-stream-to-string-bsc-1219001.patch
 ### Commits to make Salt compatible with Python 3.11 (and 3.6)
@@ -370,6 +370,32 @@ Patch104:       decode-oscap-byte-stream-to-string-bsc-1219001.patch
 Patch105:       fix-salt-warnings-and-testuite-for-python-3.11-635.patch
 # PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/639
 Patch106:       switch-oscap-encoding-to-utf-8-639.patch
+# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/65982
+Patch107:       remove-unused-import-causing-delays-on-starting-salt.patch
+# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66024
+Patch108:       make-logging-calls-lighter.patch
+# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66034
+Patch109:       improve-broken-events-catching-and-reporting.patch
+# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/633
+Patch110:       make-salt-master-self-recoverable-on-killing-eventpu.patch
+# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66158
+Patch111:       make-reactor-engine-less-blocking-the-eventpublisher.patch
+# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/638
+Patch112:       prevent-possible-exception-in-tornado.concurrent.fut.patch
+# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66455
+Patch113:       remove-redundant-_file_find-call-to-the-master.patch
+# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/646
+Patch114:       skip-tests-for-unsupported-algorithm-on-old-openssl-.patch
+# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/652
+Patch115:       add-missing-contextvars-dependency-in-salt.version.patch
+# PATCH-FIX_OPENSUSE: https://github.com/openSUSE/salt/pull/640
+Patch116:       prevent-oom-with-high-amount-of-batch-async-calls-bs.patch
+# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/65983
+Patch117:       do-not-call-the-async-wrapper-calls-with-the-separat.patch
+# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66494
+Patch118:       speed-up-salt.matcher.confirm_top-by-using-__context.patch
+# PATCH-FIX_UPSTREAM https://github.com/saltstack/salt/pull/66593
+Patch119:       several-fixes-for-tests-to-avoid-errors-and-failures.patch
 
 ### IMPORTANT: The line below is used as a snippet marker. Do not touch it.
 ### SALT PATCHES LIST END
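Each new entry above follows the spec's existing convention: a provenance
comment (PATCH-FIX_UPSTREAM pointing at a saltstack/salt pull request,
PATCH-FIX_OPENSUSE at an openSUSE/salt one) immediately followed by the
numbered PatchNNN: tag naming the file shipped alongside the spec. With
numbered patch tags like these, %prep commonly applies the whole series via
the standard RPM macro, for example (illustrative only; the actual %prep of
salt.spec is outside this diff):

    %prep
    %autosetup -p1 -n salt-%{version}
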
diff --git a/several-fixes-for-tests-to-avoid-errors-and-failures.patch b/several-fixes-for-tests-to-avoid-errors-and-failures.patch
new file mode 100644
index 0000000..2272f9c
--- /dev/null
+++ b/several-fixes-for-tests-to-avoid-errors-and-failures.patch
@@ -0,0 +1,557 @@
+From 30fd274d606b565a0a63fbc7f2fd67aec3c495b1 Mon Sep 17 00:00:00 2001
+From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?=
+ <psuarezhernandez@suse.com>
+Date: Mon, 27 May 2024 12:01:53 +0100
+Subject: [PATCH] Several fixes for tests to avoid errors and failures
+ in some OSes
+
+* test_pip_state: skip tests which requires virtualenv CLI
+
+* test_syndic_eauth: skip when using an incompatible Docker version
+
+* test_pip: skip tests which requires virtualenv CLI
+
+* Some more extra fixes for tests
+
+* Enhance paths to look for 'sftp-server'
+
+* Do trigger errors on newer docker-py version
+
+* Make test_search_not_found to not fail on transactional systems
+
+* Add `@pytest.mark.flaky_jail` to `tests/pytests/integration/ssh/test_ssh_setup.py::test_setup`
+
+Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
+
+* Prevent crashing if mountpoint does not exist anymore
+
+* Skip failing tests on transactional systems
+
+* test_consul.py: do not produce error if consul is not available
+
+* Redefine max retries for some flaky tests
+
+* test_virt.py: skip as CI containers are not compatible with Salt 3006
+
+* test_schema.py: Adjust expectations to newer jsonschema versions
+
+* Apply suggestions from code review
+
+Co-authored-by: Pedro Algarvio <pedro@algarvio.me>
+
+---------
+
+Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
+Co-authored-by: Pedro Algarvio <palgarvio@vmware.com>
+Co-authored-by: Pedro Algarvio <pedro@algarvio.me>
+---
+ salt/modules/status.py                        | 11 +++---
+ tests/conftest.py                             |  2 +
+ tests/integration/modules/test_pip.py         |  4 ++
+ tests/pytests/functional/cache/test_consul.py |  4 ++
+ tests/pytests/functional/modules/test_pip.py  |  1 +
+ .../functional/states/test_pip_state.py       | 11 ++++++
+ tests/pytests/functional/states/test_pkg.py   |  4 ++
+ .../integration/cli/test_syndic_eauth.py      |  4 +-
+ .../integration/daemons/test_memory_leak.py   |  2 +-
+ .../integration/modules/test_cmdmod.py        |  1 +
+ .../pytests/integration/modules/test_virt.py  |  4 ++
+ .../integration/ssh/test_pre_flight.py        |  4 ++
+ .../pytests/integration/ssh/test_ssh_setup.py |  1 +
+ tests/pytests/scenarios/setup/test_man.py     |  6 +++
+ .../unit/modules/dockermod/test_module.py     | 20 +++++-----
+ tests/unit/modules/test_zypperpkg.py          |  1 +
+ tests/unit/utils/test_schema.py               | 37 +++++++++++++++----
+ 17 files changed, 92 insertions(+), 25 deletions(-)
+
+diff --git a/salt/modules/status.py b/salt/modules/status.py
+index 4b0a3b0d400..33e5d7b8df5 100644
+--- a/salt/modules/status.py
++++ b/salt/modules/status.py
+@@ -1052,11 +1052,12 @@ def diskusage(*args):
+     # query the filesystems disk usage
+     ret = {}
+     for path in selected:
+-        fsstats = os.statvfs(path)
+-        blksz = fsstats.f_bsize
+-        available = fsstats.f_bavail * blksz
+-        total = fsstats.f_blocks * blksz
+-        ret[path] = {"available": available, "total": total}
++        if os.path.exists(path):
++            fsstats = os.statvfs(path)
++            blksz = fsstats.f_bsize
++            available = fsstats.f_bavail * blksz
++            total = fsstats.f_blocks * blksz
++            ret[path] = {"available": available, "total": total}
+     return ret
+ 
+ 
+diff --git a/tests/conftest.py b/tests/conftest.py
+index a7777c2cea6..ad57b4adef4 100644
+--- a/tests/conftest.py
++++ b/tests/conftest.py
+@@ -1428,6 +1428,8 @@ def sshd_server(salt_factories, sshd_config_dir, salt_master, grains):
+         "/usr/libexec/openssh/sftp-server",
+         # Arch Linux
+         "/usr/lib/ssh/sftp-server",
++        # openSUSE Tumbleweed and SL Micro 6.0
++        "/usr/libexec/ssh/sftp-server",
+     ]
+     sftp_server_path = None
+     for path in sftp_server_paths:
+diff --git a/tests/integration/modules/test_pip.py b/tests/integration/modules/test_pip.py
+index 83457b467c8..d57e9cd2aea 100644
+--- a/tests/integration/modules/test_pip.py
++++ b/tests/integration/modules/test_pip.py
+@@ -557,6 +557,10 @@ class PipModuleTest(ModuleCase):
+     @pytest.mark.skip_initial_gh_actions_failure(
+         reason="This was skipped on older golden images and is failing on newer."
+     )
++    @pytest.mark.skipif(
++        bool(salt.utils.path.which("transactional-update")),
++        reason="Skipping on transactional systems",
++    )
+     def test_system_pip3(self):
+ 
+         self.run_function(
+diff --git a/tests/pytests/functional/cache/test_consul.py b/tests/pytests/functional/cache/test_consul.py
+index 30dc6925f26..996a1e932b6 100644
+--- a/tests/pytests/functional/cache/test_consul.py
++++ b/tests/pytests/functional/cache/test_consul.py
+@@ -11,6 +11,10 @@ import salt.loader
+ from salt.utils.versions import Version
+ from tests.pytests.functional.cache.helpers import run_common_cache_tests
+ 
++pytest.importorskip(
++    "consul",
++    reason="Please install python-consul package to use consul data cache driver",
++)
+ docker = pytest.importorskip("docker")
+ 
+ log = logging.getLogger(__name__)
+diff --git a/tests/pytests/functional/modules/test_pip.py b/tests/pytests/functional/modules/test_pip.py
+index 9b87735b68f..e04baa7c43f 100644
+--- a/tests/pytests/functional/modules/test_pip.py
++++ b/tests/pytests/functional/modules/test_pip.py
+@@ -22,6 +22,7 @@ from tests.support.helpers import VirtualEnv
+ )
+ @pytest.mark.requires_network
+ @pytest.mark.slow_test
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_list_available_packages(modules, pip_version, tmp_path):
+     with VirtualEnv(venv_dir=tmp_path, pip_requirement=pip_version) as virtualenv:
+         virtualenv.install("-U", pip_version)
+diff --git a/tests/pytests/functional/states/test_pip_state.py b/tests/pytests/functional/states/test_pip_state.py
+index 3fc6ac7a1df..1921751b5dc 100644
+--- a/tests/pytests/functional/states/test_pip_state.py
++++ b/tests/pytests/functional/states/test_pip_state.py
+@@ -94,6 +94,7 @@ def test_pip_installed_removed(modules, states):
+ 
+ 
+ @pytest.mark.slow_test
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_pip_installed_removed_venv(tmp_path, create_virtualenv, states):
+     venv_dir = tmp_path / "pip_installed_removed"
+     create_virtualenv(str(venv_dir))
+@@ -105,6 +106,7 @@ def test_pip_installed_removed_venv(tmp_path, create_virtualenv, states):
+ 
+ 
+ @pytest.mark.slow_test
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_pip_installed_errors(tmp_path, modules, state_tree):
+     venv_dir = tmp_path / "pip-installed-errors"
+     # Since we don't have the virtualenv created, pip.installed will
+@@ -141,6 +143,7 @@ pep8-pip:
+                 assert state_return.result is True
+ 
+ 
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_pip_installed_name_test_mode(tmp_path, create_virtualenv, states):
+     """
+     Test pip.installed state while test=true
+@@ -154,6 +157,7 @@ def test_pip_installed_name_test_mode(tmp_path, create_virtualenv, states):
+     assert name in ret.comment
+ 
+ 
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_pip_installed_pkgs_test_mode(tmp_path, create_virtualenv, states):
+     """
+     Test pip.installed state while test=true
+@@ -168,6 +172,7 @@ def test_pip_installed_pkgs_test_mode(tmp_path, create_virtualenv, states):
+ 
+ 
+ @pytest.mark.slow_test
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_issue_2028_pip_installed_state(
+     tmp_path, modules, state_tree, get_python_executable
+ ):
+@@ -226,6 +231,7 @@ pep8-pip:
+ 
+ 
+ @pytest.mark.slow_test
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_issue_2087_missing_pip(tmp_path, create_virtualenv, modules):
+     venv_dir = tmp_path / "issue-2087-missing-pip"
+ 
+@@ -271,6 +277,7 @@ pip.installed:
+ @pytest.mark.destructive_test
+ @pytest.mark.slow_test
+ @pytest.mark.skip_if_not_root
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_issue_6912_wrong_owner(tmp_path, create_virtualenv, modules, states):
+     # Setup virtual environment directory to be used throughout the test
+     venv_dir = tmp_path / "6912-wrong-owner"
+@@ -338,6 +345,7 @@ def test_issue_6912_wrong_owner(tmp_path, create_virtualenv, modules, states):
+ @pytest.mark.skip_on_darwin(reason="Test is flaky on macosx")
+ @pytest.mark.slow_test
+ @pytest.mark.skip_if_not_root
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_issue_6912_wrong_owner_requirements_file(
+     tmp_path, create_virtualenv, state_tree, modules, states
+ ):
+@@ -409,6 +417,7 @@ def test_issue_6912_wrong_owner_requirements_file(
+ 
+ @pytest.mark.destructive_test
+ @pytest.mark.slow_test
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_issue_6833_pip_upgrade_pip(tmp_path, create_virtualenv, modules, states):
+     # Create the testing virtualenv
+     if sys.platform == "win32":
+@@ -465,6 +474,7 @@ def test_issue_6833_pip_upgrade_pip(tmp_path, create_virtualenv, modules, states
+ 
+ 
+ @pytest.mark.slow_test
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_pip_installed_specific_env(
+     tmp_path, state_tree_prod, states, create_virtualenv
+ ):
+@@ -514,6 +524,7 @@ def test_pip_installed_specific_env(
+ 
+ 
+ @pytest.mark.slow_test
++@pytest.mark.skip_if_binaries_missing("virtualenv", reason="Needs virtualenv binary")
+ def test_22359_pip_installed_unless_does_not_trigger_warnings(
+     create_virtualenv, tmp_path, states
+ ):
+diff --git a/tests/pytests/functional/states/test_pkg.py b/tests/pytests/functional/states/test_pkg.py
+index 864c1d025f3..9e5a8350ad9 100644
+--- a/tests/pytests/functional/states/test_pkg.py
++++ b/tests/pytests/functional/states/test_pkg.py
+@@ -19,6 +19,10 @@ pytestmark = [
+     pytest.mark.slow_test,
+     pytest.mark.skip_if_not_root,
+     pytest.mark.destructive_test,
++    pytest.mark.skipif(
++        bool(salt.utils.path.which("transactional-update")),
++        reason="Skipping on transactional systems",
++    ),
+ ]
+ 
+ 
+diff --git a/tests/pytests/integration/cli/test_syndic_eauth.py b/tests/pytests/integration/cli/test_syndic_eauth.py
+index 218022b9e3c..8dcdd3fbd28 100644
+--- a/tests/pytests/integration/cli/test_syndic_eauth.py
++++ b/tests/pytests/integration/cli/test_syndic_eauth.py
+@@ -6,7 +6,9 @@ import time
+ 
+ import pytest
+ 
+-docker = pytest.importorskip("docker")
++from tests.conftest import CODE_DIR
++
++docker = pytest.importorskip("docker", minversion="4.0.0")
+ 
+ INSIDE_CONTAINER = os.getenv("HOSTNAME", "") == "salt-test-container"
+ 
+diff --git a/tests/pytests/integration/daemons/test_memory_leak.py b/tests/pytests/integration/daemons/test_memory_leak.py
+index 8157091c44e..f2c5307f1a5 100644
+--- a/tests/pytests/integration/daemons/test_memory_leak.py
++++ b/tests/pytests/integration/daemons/test_memory_leak.py
+@@ -49,7 +49,7 @@ def file_add_delete_sls(testfile_path, base_env_state_tree_root_dir):
+ 
+ @pytest.mark.skip_on_darwin(reason="MacOS is a spawning platform, won't work")
+ @pytest.mark.skipif(GITHUB_ACTIONS, reason="Test is failing in GitHub Actions")
+-@pytest.mark.flaky(max_runs=4)
++@pytest.mark.flaky(max_runs=10)
+ def test_memory_leak(salt_cli, salt_minion, file_add_delete_sls):
+     max_usg = None
+ 
+diff --git a/tests/pytests/integration/modules/test_cmdmod.py b/tests/pytests/integration/modules/test_cmdmod.py
+index d9c326c3f0a..d0b993ddbcf 100644
+--- a/tests/pytests/integration/modules/test_cmdmod.py
++++ b/tests/pytests/integration/modules/test_cmdmod.py
+@@ -63,6 +63,7 @@ def test_avoid_injecting_shell_code_as_root(
+ 
+ 
+ @pytest.mark.slow_test
++@pytest.mark.flaky(max_runs=4)
+ def test_blacklist_glob(salt_call_cli):
+     """
+     cmd_blacklist_glob
+diff --git a/tests/pytests/integration/modules/test_virt.py b/tests/pytests/integration/modules/test_virt.py
+index 572923764bb..a1cd577fe76 100644
+--- a/tests/pytests/integration/modules/test_virt.py
++++ b/tests/pytests/integration/modules/test_virt.py
+@@ -26,6 +26,10 @@ pytestmark = [
+         Version(docker.__version__) < Version("4.0.0"),
+         reason="Test does not work in this version of docker-py",
+     ),
++    pytest.mark.skipif(
++        salt.version.__saltstack_version__.major <= 3006,
++        reason="CI containers are not compatible with this Salt version",
++    ),
+ ]
+ 
+ 
+diff --git a/tests/pytests/integration/ssh/test_pre_flight.py b/tests/pytests/integration/ssh/test_pre_flight.py
+index 09c65d29430..ac32b8d90fd 100644
+--- a/tests/pytests/integration/ssh/test_pre_flight.py
++++ b/tests/pytests/integration/ssh/test_pre_flight.py
+@@ -235,6 +235,10 @@ def demote(user_uid, user_gid):
+ 
+ 
+ @pytest.mark.slow_test
++@pytest.mark.skipif(
++    bool(salt.utils.path.which("transactional-update")),
++    reason="Skipping on transactional systems",
++)
+ def test_ssh_pre_flight_perms(salt_ssh_cli, caplog, _create_roster, account):
+     """
+     Test to ensure standard user cannot run pre flight script
+diff --git a/tests/pytests/integration/ssh/test_ssh_setup.py b/tests/pytests/integration/ssh/test_ssh_setup.py
+index 97494bed36b..3b4cede85f8 100644
+--- a/tests/pytests/integration/ssh/test_ssh_setup.py
++++ b/tests/pytests/integration/ssh/test_ssh_setup.py
+@@ -161,6 +161,7 @@ def salt_ssh_cli(
+     )
+ 
+ 
++@pytest.mark.flaky_jail
+ def test_setup(salt_ssh_cli, ssh_container_name, ssh_sub_container_name, ssh_password):
+     """
+     Test salt-ssh setup works
+diff --git a/tests/pytests/scenarios/setup/test_man.py b/tests/pytests/scenarios/setup/test_man.py
+index 28f0d6285a3..f10fe711f2d 100644
+--- a/tests/pytests/scenarios/setup/test_man.py
++++ b/tests/pytests/scenarios/setup/test_man.py
+@@ -9,6 +9,9 @@ import pytest
+ 
+ import salt.utils.platform
+ from salt.modules.virtualenv_mod import KNOWN_BINARY_NAMES
++from tests.conftest import CODE_DIR
++
++MISSING_SETUP_PY_FILE = not CODE_DIR.joinpath("setup.py").exists()
+ 
+ pytestmark = [
+     pytest.mark.core_test,
+@@ -16,6 +19,9 @@ pytestmark = [
+     pytest.mark.skip_on_aix,
+     pytest.mark.skip_initial_onedir_failure,
+     pytest.mark.skip_if_binaries_missing(*KNOWN_BINARY_NAMES, check_all=False),
++    pytest.mark.skipif(
++    MISSING_SETUP_PY_FILE, reason="This test only works if setup.py is available"
++    ),
+ ]
+ 
+ 
+diff --git a/tests/pytests/unit/modules/dockermod/test_module.py b/tests/pytests/unit/modules/dockermod/test_module.py
+index 1ac7dff52a5..a87d1add167 100644
+--- a/tests/pytests/unit/modules/dockermod/test_module.py
++++ b/tests/pytests/unit/modules/dockermod/test_module.py
+@@ -357,7 +357,7 @@ def test_update_mine():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_list_networks():
+@@ -381,7 +381,7 @@ def test_list_networks():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_create_network():
+@@ -425,7 +425,7 @@ def test_create_network():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_remove_network():
+@@ -447,7 +447,7 @@ def test_remove_network():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_inspect_network():
+@@ -469,7 +469,7 @@ def test_inspect_network():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_connect_container_to_network():
+@@ -494,7 +494,7 @@ def test_connect_container_to_network():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_disconnect_container_from_network():
+@@ -516,7 +516,7 @@ def test_disconnect_container_from_network():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_list_volumes():
+@@ -542,7 +542,7 @@ def test_list_volumes():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_create_volume():
+@@ -572,7 +572,7 @@ def test_create_volume():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_remove_volume():
+@@ -594,7 +594,7 @@ def test_remove_volume():
+ 
+ 
+ @pytest.mark.skipif(
+-    docker_mod.docker.version_info < (1, 5, 0),
++    docker_mod._get_docker_py_versioninfo() < (1, 5, 0),
+     reason="docker module must be installed to run this test or is too old. >=1.5.0",
+ )
+ def test_inspect_volume():
+diff --git a/tests/unit/modules/test_zypperpkg.py b/tests/unit/modules/test_zypperpkg.py
+index 6e5ca88895f..bd67b16745c 100644
+--- a/tests/unit/modules/test_zypperpkg.py
++++ b/tests/unit/modules/test_zypperpkg.py
+@@ -2602,6 +2602,7 @@ pattern() = package-c"""
+             zypp_mock.assert_called_with(root=None, ignore_not_found=True)
+             xml_mock.nolock.noraise.xml.call.assert_called_with("search", "emacs")
+ 
++    @patch("salt.utils.files.is_fcntl_available", MagicMock(return_value=False))
+     def test_search_not_found(self):
+         """Test zypperpkg.search()"""
+         ret = {
+diff --git a/tests/unit/utils/test_schema.py b/tests/unit/utils/test_schema.py
+index 113c6836e07..a531dd93111 100644
+--- a/tests/unit/utils/test_schema.py
++++ b/tests/unit/utils/test_schema.py
+@@ -531,7 +531,9 @@ class ConfigTestCase(TestCase):
+             jsonschema.validate(
+                 {"personal_access_token": "foo"}, Requirements.serialize()
+             )
+-        if JSONSCHEMA_VERSION >= Version("3.0.0"):
++        if JSONSCHEMA_VERSION >= Version("3.0.0") and JSONSCHEMA_VERSION < Version(
++            "4.8.0"
++        ):
+             self.assertIn(
+                 "'ssh_key_file' is a required property", excinfo.exception.message
+             )
+@@ -899,13 +901,20 @@ class ConfigTestCase(TestCase):
+         except jsonschema.exceptions.ValidationError as exc:
+             self.fail("ValidationError raised: {}".format(exc))
+ 
+-        with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo:
++        if JSONSCHEMA_VERSION < Version("4.0.0"):
++            with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo:
++                jsonschema.validate(
++                    {"item": "3"},
++                    TestConf.serialize(),
++                    format_checker=jsonschema.FormatChecker(),
++                )
++            self.assertIn("is not a", excinfo.exception.message)
++        else:
+             jsonschema.validate(
+                 {"item": "3"},
+                 TestConf.serialize(),
+                 format_checker=jsonschema.FormatChecker(),
+             )
+-        self.assertIn("is not a", excinfo.exception.message)
+ 
+     def test_datetime_config(self):
+         item = schema.DateTimeItem(title="Foo", description="Foo Item")
+@@ -1878,7 +1887,9 @@ class ConfigTestCase(TestCase):
+             jsonschema.validate(
+                 {"item": {"sides": "4", "color": "blue"}}, TestConf.serialize()
+             )
+-        if JSONSCHEMA_VERSION >= Version("3.0.0"):
++        if JSONSCHEMA_VERSION >= Version("3.0.0") and JSONSCHEMA_VERSION < Version(
++            "4.8.0"
++        ):
+             self.assertIn("'4'", excinfo.exception.message)
+             self.assertIn("is not of type", excinfo.exception.message)
+             self.assertIn("'boolean'", excinfo.exception.message)
+@@ -2003,7 +2014,9 @@ class ConfigTestCase(TestCase):
+ 
+         with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo:
+             jsonschema.validate({"item": ["maybe"]}, TestConf.serialize())
+-        if JSONSCHEMA_VERSION >= Version("3.0.0"):
++        if JSONSCHEMA_VERSION >= Version("3.0.0") and JSONSCHEMA_VERSION < Version(
++            "4.8.0"
++        ):
+             self.assertIn("'maybe'", excinfo.exception.message)
+             self.assertIn("is not one of", excinfo.exception.message)
+             self.assertIn("'yes'", excinfo.exception.message)
+@@ -2067,7 +2080,9 @@ class ConfigTestCase(TestCase):
+ 
+         with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo:
+             jsonschema.validate({"item": ["maybe"]}, TestConf.serialize())
+-        if JSONSCHEMA_VERSION >= Version("3.0.0"):
++        if JSONSCHEMA_VERSION >= Version("3.0.0") and JSONSCHEMA_VERSION < Version(
++            "4.8.0"
++        ):
+             self.assertIn("'maybe'", excinfo.exception.message)
+             self.assertIn("is not one of", excinfo.exception.message)
+             self.assertIn("'yes'", excinfo.exception.message)
+@@ -2154,11 +2169,17 @@ class ConfigTestCase(TestCase):
+ 
+         with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo:
+             jsonschema.validate({"item": [True]}, TestConf.serialize())
+-        self.assertIn("is not allowed for", excinfo.exception.message)
++        if JSONSCHEMA_VERSION >= Version("4.0.0"):
++            self.assertIn("should not be valid under", excinfo.exception.message)
++        else:
++            self.assertIn("is not allowed for", excinfo.exception.message)
+ 
+         with self.assertRaises(jsonschema.exceptions.ValidationError) as excinfo:
+             jsonschema.validate({"item": [False]}, TestConf.serialize())
+-        self.assertIn("is not allowed for", excinfo.exception.message)
++        if JSONSCHEMA_VERSION >= Version("4.0.0"):
++            self.assertIn("should not be valid under", excinfo.exception.message)
++        else:
++            self.assertIn("is not allowed for", excinfo.exception.message)
+ 
+     def test_item_name_override_class_attrname(self):
+         class TestConf(schema.Schema):
+-- 
+2.44.0
+
+
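Most hunks in the patch above apply one of two guard patterns: skip a test when the environment cannot support it (missing binary, transactional system), or branch an assertion on the installed library version instead of pinning a single error message. A minimal, self-contained sketch of both patterns follows; `pytest.mark.skipif` with `shutil.which` stands in for Salt's custom `skip_if_binaries_missing` marker and `salt.utils.path.which`, and the schema and message check are illustrative, not taken from the patched tests:

    import shutil

    import jsonschema
    import pytest
    from packaging.version import Version

    JSONSCHEMA_VERSION = Version(jsonschema.__version__)


    @pytest.mark.skipif(
        bool(shutil.which("transactional-update")),
        reason="Skipping on transactional systems",
    )
    def test_required_property_message():
        # jsonschema changed several validation messages over time, so the
        # exact wording is only asserted on versions known to emit it,
        # mirroring the 3.0.0 <= version < 4.8.0 gates in the patch above.
        schema = {"type": "object", "required": ["ssh_key_file"]}
        with pytest.raises(jsonschema.exceptions.ValidationError) as excinfo:
            jsonschema.validate({}, schema)
        if Version("3.0.0") <= JSONSCHEMA_VERSION < Version("4.8.0"):
            assert "'ssh_key_file' is a required property" in str(excinfo.value)

The point of the version gate is that the test keeps validating behavior (the exception is still raised) everywhere, while the brittle message comparison only runs where it is known to hold.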
diff --git a/skip-tests-for-unsupported-algorithm-on-old-openssl-.patch b/skip-tests-for-unsupported-algorithm-on-old-openssl-.patch
new file mode 100644
index 0000000..126c0df
--- /dev/null
+++ b/skip-tests-for-unsupported-algorithm-on-old-openssl-.patch
@@ -0,0 +1,117 @@
+From d64311862c8cfdd7728aca504a22822df1b043c1 Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 09:48:39 +0200
+Subject: [PATCH] Skip tests for unsupported algorithm on old OpenSSL
+ version
+
+---
+ .../functional/modules/test_x509_v2.py        | 51 +++++++++++++------
+ 1 file changed, 35 insertions(+), 16 deletions(-)
+
+diff --git a/tests/pytests/functional/modules/test_x509_v2.py b/tests/pytests/functional/modules/test_x509_v2.py
+index 8da31bed9d..c060ad2971 100644
+--- a/tests/pytests/functional/modules/test_x509_v2.py
++++ b/tests/pytests/functional/modules/test_x509_v2.py
+@@ -9,6 +9,7 @@ from salt.utils.odict import OrderedDict
+ try:
+     import cryptography
+     import cryptography.x509 as cx509
++    from cryptography.exceptions import UnsupportedAlgorithm
+     from cryptography.hazmat.primitives import hashes
+     from cryptography.hazmat.primitives.serialization import (
+         load_pem_private_key,
+@@ -678,7 +679,10 @@ def crl_revoked():
+ @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"])
+ def test_create_certificate_self_signed(x509, algo, request):
+     privkey = request.getfixturevalue(f"{algo}_privkey")
+-    res = x509.create_certificate(signing_private_key=privkey, CN="success")
++    try:
++        res = x509.create_certificate(signing_private_key=privkey, CN="success")
++    except UnsupportedAlgorithm:
++        pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version")
+     assert res.startswith("-----BEGIN CERTIFICATE-----")
+     cert = _get_cert(res)
+     assert cert.subject.rfc4514_string() == "CN=success"
+@@ -743,12 +747,15 @@ def test_create_certificate_raw(x509, rsa_privkey):
+ @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"])
+ def test_create_certificate_from_privkey(x509, ca_key, ca_cert, algo, request):
+     privkey = request.getfixturevalue(f"{algo}_privkey")
+-    res = x509.create_certificate(
+-        signing_cert=ca_cert,
+-        signing_private_key=ca_key,
+-        private_key=privkey,
+-        CN="success",
+-    )
++    try:
++        res = x509.create_certificate(
++            signing_cert=ca_cert,
++            signing_private_key=ca_key,
++            private_key=privkey,
++            CN="success",
++        )
++    except UnsupportedAlgorithm:
++        pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version")
+     assert res.startswith("-----BEGIN CERTIFICATE-----")
+     cert = _get_cert(res)
+     assert cert.subject.rfc4514_string() == "CN=success"
+@@ -788,12 +795,15 @@ def test_create_certificate_from_encrypted_privkey_with_encrypted_privkey(
+ @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"])
+ def test_create_certificate_from_pubkey(x509, ca_key, ca_cert, algo, request):
+     pubkey = request.getfixturevalue(f"{algo}_pubkey")
+-    res = x509.create_certificate(
+-        signing_cert=ca_cert,
+-        signing_private_key=ca_key,
+-        public_key=pubkey,
+-        CN="success",
+-    )
++    try:
++        res = x509.create_certificate(
++            signing_cert=ca_cert,
++            signing_private_key=ca_key,
++            public_key=pubkey,
++            CN="success",
++        )
++    except UnsupportedAlgorithm:
++        pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version")
+     assert res.startswith("-----BEGIN CERTIFICATE-----")
+     cert = _get_cert(res)
+     assert cert.subject.rfc4514_string() == "CN=success"
+@@ -1329,7 +1339,10 @@ def test_create_crl_raw(x509, crl_args):
+ @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"])
+ def test_create_csr(x509, algo, request):
+     privkey = request.getfixturevalue(f"{algo}_privkey")
+-    res = x509.create_csr(private_key=privkey)
++    try:
++        res = x509.create_csr(private_key=privkey)
++    except UnsupportedAlgorithm:
++        pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version")
+     assert res.startswith("-----BEGIN CERTIFICATE REQUEST-----")
+ 
+ 
+@@ -1444,7 +1457,10 @@ def test_create_private_key_raw(x509):
+ )
+ def test_get_private_key_size(x509, algo, expected, request):
+     privkey = request.getfixturevalue(f"{algo}_privkey")
+-    res = x509.get_private_key_size(privkey)
++    try:
++        res = x509.get_private_key_size(privkey)
++    except UnsupportedAlgorithm:
++        pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version")
+     assert res == expected
+ 
+ 
+@@ -1588,7 +1604,10 @@ def test_verify_private_key(x509, ca_key, ca_cert):
+ @pytest.mark.parametrize("algo", ["rsa", "ec", "ed25519", "ed448"])
+ def test_verify_signature(x509, algo, request):
+     wrong_privkey = request.getfixturevalue(f"{algo}_privkey")
+-    privkey = x509.create_private_key(algo=algo)
++    try:
++        privkey = x509.create_private_key(algo=algo)
++    except UnsupportedAlgorithm:
++        pytest.skip(f"Algorithm '{algo}' is not supported on this OpenSSL version")
+     cert = x509.create_certificate(signing_private_key=privkey)
+     assert x509.verify_signature(cert, privkey)
+     assert not x509.verify_signature(cert, wrong_privkey)
+-- 
+2.45.0
+
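Every hunk above is the same transformation: wrap the call that exercises an algorithm in try/except and turn UnsupportedAlgorithm into a skip rather than a failure, since old OpenSSL builds simply lack Ed25519/Ed448 support. A standalone sketch of the pattern against the cryptography library directly (Ed448 is used as the probe here; the signing assertions are illustrative):

    import pytest

    from cryptography.exceptions import UnsupportedAlgorithm
    from cryptography.hazmat.primitives.asymmetric import ed448


    def test_ed448_signing():
        try:
            privkey = ed448.Ed448PrivateKey.generate()
        except UnsupportedAlgorithm:
            pytest.skip("Algorithm 'ed448' is not supported on this OpenSSL version")
        # Past the guard, the algorithm is available and normal asserts apply.
        signature = privkey.sign(b"payload")
        privkey.public_key().verify(signature, b"payload")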
diff --git a/speed-up-salt.matcher.confirm_top-by-using-__context.patch b/speed-up-salt.matcher.confirm_top-by-using-__context.patch
new file mode 100644
index 0000000..b967b59
--- /dev/null
+++ b/speed-up-salt.matcher.confirm_top-by-using-__context.patch
@@ -0,0 +1,64 @@
+From a7e578b96d0e7ad8fdf4e5d62416ba6961b82315 Mon Sep 17 00:00:00 2001
+From: Victor Zhestkov <vzhestkov@suse.com>
+Date: Wed, 15 May 2024 11:50:52 +0200
+Subject: [PATCH] Speed up salt.matcher.confirm_top by using
+ __context__
+
+* Speed up salt.matcher.confirm_top by using __context__
+
+* Add test for getting matchers from __context__ in matchers.confirm_top
+---
+ salt/matchers/confirm_top.py                    |  6 +++++-
+ tests/pytests/unit/matchers/test_confirm_top.py | 15 +++++++++++++++
+ 2 files changed, 20 insertions(+), 1 deletion(-)
+
+diff --git a/salt/matchers/confirm_top.py b/salt/matchers/confirm_top.py
+index 7435f4ae94..d2edc99d8f 100644
+--- a/salt/matchers/confirm_top.py
++++ b/salt/matchers/confirm_top.py
+@@ -21,7 +21,11 @@ def confirm_top(match, data, nodegroups=None):
+             if "match" in item:
+                 matcher = item["match"]
+ 
+-    matchers = salt.loader.matchers(__opts__)
++    if "matchers" in __context__:
++        matchers = __context__["matchers"]
++    else:
++        matchers = salt.loader.matchers(__opts__)
++        __context__["matchers"] = matchers
+     funcname = matcher + "_match.match"
+     if matcher == "nodegroup":
+         return matchers[funcname](match, nodegroups)
+diff --git a/tests/pytests/unit/matchers/test_confirm_top.py b/tests/pytests/unit/matchers/test_confirm_top.py
+index 514df323b6..f439fcf94a 100644
+--- a/tests/pytests/unit/matchers/test_confirm_top.py
++++ b/tests/pytests/unit/matchers/test_confirm_top.py
+@@ -2,6 +2,7 @@ import pytest
+ 
+ import salt.config
+ import salt.loader
++from tests.support.mock import patch
+ 
+ 
+ @pytest.fixture
+@@ -12,3 +13,17 @@ def matchers(minion_opts):
+ def test_sanity(matchers):
+     match = matchers["confirm_top.confirm_top"]
+     assert match("*", []) is True
++
++
++@pytest.mark.parametrize("in_context", [False, True])
++def test_matchers_from_context(matchers, in_context):
++    match = matchers["confirm_top.confirm_top"]
++    with patch.dict(
++        matchers.pack["__context__"], {"matchers": matchers} if in_context else {}
++    ), patch("salt.loader.matchers", return_value=matchers) as loader_matchers:
++        assert match("*", []) is True
++        assert id(matchers.pack["__context__"]["matchers"]) == id(matchers)
++        if in_context:
++            loader_matchers.assert_not_called()
++        else:
++            loader_matchers.assert_called_once()
+-- 
+2.45.0
+
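The fix above is a straightforward memoization: the loaded matchers are parked in __context__, which Salt shares across calls within one minion process, so salt.loader.matchers runs once instead of on every top-file match. A dict-based sketch of the same idea, with a plain dict standing in for __context__ (the names load_matchers and fake_loader are illustrative, not Salt API):

    def load_matchers(context, opts, loader):
        # Reuse the cached loader result when present; build and cache otherwise.
        if "matchers" not in context:
            context["matchers"] = loader(opts)
        return context["matchers"]


    calls = []


    def fake_loader(opts):
        calls.append(opts)
        return {"glob_match.match": lambda tgt, minion_id: True}


    context = {}
    first = load_matchers(context, {"id": "minion1"}, fake_loader)
    second = load_matchers(context, {"id": "minion1"}, fake_loader)
    assert first is second
    assert len(calls) == 1  # the expensive loader ran exactly once

This is also exactly what the accompanying unit test checks: with "matchers" pre-seeded in __context__ the loader is never called, and without it the loader is called exactly once.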