salt/backport-batch-async-fixes-and-improvements-701.patch
Victor Zhestkov c7249d56b9 - Fix virt_query outputter and add support for block devices
- Make _auth calls visible with master stats
- Repair mount.fstab_present always returning pending changes
- Set virtual grain in Podman systemd container
- Fix crash due to wrong client reference on `SaltMakoTemplateLookup`
- Enhance batch async and fix some detected issues
- Added:
  * repair-virt_query-outputter-655.patch
  * make-_auth-calls-visible-with-master-stats-696.patch
  * repair-fstab_present-test-mode-702.patch
  * set-virtual-grain-in-podman-systemd-container-703.patch
  * fixed-file-client-private-attribute-reference-on-sal.patch
  * backport-batch-async-fixes-and-improvements-701.patch

OBS-URL: https://build.opensuse.org/package/show/systemsmanagement:saltstack/salt?expand=0&rev=273
2025-02-21 13:06:26 +00:00

337 lines
14 KiB
Diff

From 4fe7231fa99de8edc848367386f1a6a5192a0f58 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Fri, 21 Feb 2025 11:15:42 +0100
Subject: [PATCH] Backport batch async fixes and improvements (#701)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* Backport batch async fixes and improvements
Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
* Align batch_async tests
---------
Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
---
salt/cli/batch_async.py | 60 ++++++++++++++++-----
tests/pytests/unit/cli/test_batch_async.py | 63 ++++++++--------------
2 files changed, 69 insertions(+), 54 deletions(-)
diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index 5d49993faa..92215d0e04 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -35,7 +35,7 @@ def batch_async_required(opts, minions, extra):
Check opts to identify if batch async is required for the operation.
"""
if not isinstance(minions, list):
- False
+ return False
batch_async_opts = opts.get("batch_async", {})
batch_async_threshold = (
batch_async_opts.get("threshold", 1)
@@ -179,6 +179,7 @@ class SharedEventsChannel:
self._used_by.discard(subscriber_id)
def destroy_unused(self):
+ log.trace("SharedEventsChannel.destroy_unused called")
if self._used_by:
return False
self.master_event.remove_event_handler(self.__handle_event)
@@ -267,6 +268,7 @@ class BatchAsync:
self.ended = False
self.event = self.events_channel.master_event
self.scheduled = False
+ self._start_batch_on_timeout = None
def __set_event_handler(self):
self.events_channel.subscribe(
@@ -278,6 +280,8 @@ class BatchAsync:
@salt.ext.tornado.gen.coroutine
def __event_handler(self, tag, data, op):
+ # IMPORTANT: This function must run fast and not wait for any other task,
+ # otherwise it would cause events to be stuck.
if not self.event:
return
try:
@@ -285,7 +289,9 @@ class BatchAsync:
if op == "ping_return":
self.minions.add(minion)
if self.targeted_minions == self.minions:
- yield self.start_batch()
+ # call start_batch and do not wait for timeout as we received
+ # the responses from all the targets
+ self.io_loop.add_callback(self.start_batch)
elif op == "find_job_return":
if data.get("return", None):
self.find_job_returned.add(minion)
@@ -293,7 +299,8 @@ class BatchAsync:
if minion in self.active:
self.active.remove(minion)
self.done_minions.add(minion)
- yield self.schedule_next()
+ if not self.active:
+ self.io_loop.add_callback(self.schedule_next)
except Exception as ex: # pylint: disable=W0703
log.error(
"Exception occured while processing event: %s: %s",
@@ -333,7 +340,7 @@ class BatchAsync:
)
if timedout_minions:
- yield self.schedule_next()
+ self.io_loop.add_callback(self.schedule_next)
if self.event and running:
self.find_job_returned = self.find_job_returned.difference(running)
@@ -344,6 +351,9 @@ class BatchAsync:
"""
Find if the job was finished on the minions
"""
+ log.trace(
+ "[%s] BatchAsync.find_job called for minions: %s", self.batch_jid, minions
+ )
if not self.event:
return
not_done = minions.difference(self.done_minions).difference(
@@ -386,6 +396,7 @@ class BatchAsync:
if not self.event:
return
self.__set_event_handler()
+ # call test.ping for all the targets in async way
ping_return = yield self.events_channel.local_client.run_job_async(
self.opts["tgt"],
"test.ping",
@@ -398,19 +409,24 @@ class BatchAsync:
listen=False,
**self.eauth,
)
+ # ping_return contains actual targeted minions and no actual responses
+ # from the minions as it's async and intended to populate targeted_minions set
self.targeted_minions = set(ping_return["minions"])
- # start batching even if not all minions respond to ping
- yield salt.ext.tornado.gen.sleep(
- self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
+ # schedule start_batch to perform even if not all the minions responded
+ # self.__event_handler can push start_batch in case if all targets responded
+ self._start_batch_on_timeout = self.io_loop.call_later(
+ self.batch_presence_ping_timeout or self.opts["gather_job_timeout"],
+ self.start_batch,
)
- if self.event:
- yield self.start_batch()
@salt.ext.tornado.gen.coroutine
def start_batch(self):
"""
Fire `salt/batch/*/start` and continue batch with `run_next`
"""
+ if self._start_batch_on_timeout is not None:
+ self.io_loop.remove_timeout(self._start_batch_on_timeout)
+ self._start_batch_on_timeout = None
if self.initialized:
return
self.batch_size = get_bnum(self.opts, self.minions, True)
@@ -431,6 +447,7 @@ class BatchAsync:
"""
End the batch and call safe closing
"""
+ log.trace("[%s] BatchAsync.end_batch called", self.batch_jid)
left = self.minions.symmetric_difference(
self.done_minions.union(self.timedout_minions)
)
@@ -452,10 +469,11 @@ class BatchAsync:
# release to the IOLoop to allow the event to be published
# before closing batch async execution
- yield salt.ext.tornado.gen.sleep(1)
+ yield salt.ext.tornado.gen.sleep(0.03)
self.close_safe()
def close_safe(self):
+ log.trace("[%s] BatchAsync.close_safe called", self.batch_jid)
if self.events_channel is not None:
self.events_channel.unsubscribe(None, None, id(self))
self.events_channel.unuse(id(self))
@@ -465,11 +483,22 @@ class BatchAsync:
@salt.ext.tornado.gen.coroutine
def schedule_next(self):
+ log.trace("[%s] BatchAsync.schedule_next called", self.batch_jid)
if self.scheduled:
+ log.trace(
+ "[%s] BatchAsync.schedule_next -> Batch already scheduled, nothing to do.",
+ self.batch_jid,
+ )
return
self.scheduled = True
- # call later so that we maybe gather more returns
- yield salt.ext.tornado.gen.sleep(self.batch_delay)
+ if self._get_next():
+ # call later so that we maybe gather more returns
+ log.trace(
+ "[%s] BatchAsync.schedule_next delaying batch %s second(s).",
+ self.batch_jid,
+ self.batch_delay,
+ )
+ yield salt.ext.tornado.gen.sleep(self.batch_delay)
if self.event:
yield self.run_next()
@@ -480,6 +509,11 @@ class BatchAsync:
"""
self.scheduled = False
next_batch = self._get_next()
+ log.trace(
+ "[%s] BatchAsync.run_next called. Next Batch -> %s",
+ self.batch_jid,
+ next_batch,
+ )
if not next_batch:
yield self.end_batch()
return
@@ -504,7 +538,7 @@ class BatchAsync:
yield salt.ext.tornado.gen.sleep(self.opts["timeout"])
# The batch can be done already at this point, which means no self.event
- if self.event:
+ if self.event and self.active.intersection(next_batch):
yield self.find_job(set(next_batch))
except Exception as ex: # pylint: disable=W0703
log.error(
diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py
index bc871aba54..be8de692e6 100644
--- a/tests/pytests/unit/cli/test_batch_async.py
+++ b/tests/pytests/unit/cli/test_batch_async.py
@@ -85,11 +85,17 @@ def test_batch_start_on_batch_presence_ping_timeout(batch):
future.set_result({})
with patch.object(batch, "events_channel", MagicMock()), patch(
"salt.ext.tornado.gen.sleep", return_value=future
- ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
+ ), patch.object(batch, "io_loop", MagicMock()), patch.object(
+ batch, "start_batch", return_value=future
+ ) as start_batch_mock:
batch.events_channel.local_client.run_job_async.return_value = future_ret
ret = batch.start()
- # assert start_batch is called
- start_batch_mock.assert_called_once()
+ # start_batch is scheduled to be called later
+ assert batch.io_loop.call_later.call_args[0] == (
+ batch.batch_presence_ping_timeout,
+ batch.start_batch,
+ )
+ assert batch._start_batch_on_timeout is not None
# assert test.ping called
assert batch.events_channel.local_client.run_job_async.call_args[0] == (
"*",
@@ -109,16 +115,21 @@ def test_batch_start_on_gather_job_timeout(batch):
batch.batch_presence_ping_timeout = None
with patch.object(batch, "events_channel", MagicMock()), patch(
"salt.ext.tornado.gen.sleep", return_value=future
+ ), patch.object(batch, "io_loop", MagicMock()), patch.object(
+ batch, "start_batch", return_value=future
), patch.object(
batch, "start_batch", return_value=future
) as start_batch_mock, patch.object(
batch, "batch_presence_ping_timeout", None
):
batch.events_channel.local_client.run_job_async.return_value = future_ret
- # ret = batch_async.start(batch)
ret = batch.start()
- # assert start_batch is called
- start_batch_mock.assert_called_once()
+ # start_batch is scheduled to be called later
+ assert batch.io_loop.call_later.call_args[0] == (
+ batch.opts["gather_job_timeout"],
+ batch.start_batch,
+ )
+ assert batch._start_batch_on_timeout is not None
def test_batch_fire_start_event(batch):
@@ -271,34 +282,10 @@ def test_batch__event_handler_ping_return(batch):
assert batch.done_minions == set()
-def test_batch__event_handler_call_start_batch_when_all_pings_return(batch):
- batch.targeted_minions = {"foo"}
- future = salt.ext.tornado.gen.Future()
- future.set_result({})
- with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
- batch.start()
- batch._BatchAsync__event_handler(
- "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
- )
- start_batch_mock.assert_called_once()
-
-
-def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch):
- batch.targeted_minions = {"foo", "bar"}
- future = salt.ext.tornado.gen.Future()
- future.set_result({})
- with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
- batch.start()
- batch._BatchAsync__event_handler(
- "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
- )
- start_batch_mock.assert_not_called()
-
-
def test_batch__event_handler_batch_run_return(batch):
future = salt.ext.tornado.gen.Future()
future.set_result({})
- with patch.object(
+ with patch.object(batch, "io_loop", MagicMock()), patch.object(
batch, "schedule_next", return_value=future
) as schedule_next_mock:
batch.start()
@@ -308,7 +295,7 @@ def test_batch__event_handler_batch_run_return(batch):
)
assert batch.active == set()
assert batch.done_minions == {"foo"}
- schedule_next_mock.assert_called_once()
+ batch.io_loop.add_callback.call_args[0] == (batch.schedule_next)
def test_batch__event_handler_find_job_return(batch):
@@ -322,9 +309,7 @@ def test_batch__event_handler_find_job_return(batch):
def test_batch_run_next_end_batch_when_no_next(batch):
future = salt.ext.tornado.gen.Future()
future.set_result({})
- with patch.object(
- batch, "_get_next", return_value={}
- ), patch.object(
+ with patch.object(batch, "_get_next", return_value={}), patch.object(
batch, "end_batch", return_value=future
) as end_batch_mock:
batch.run_next()
@@ -337,9 +322,7 @@ def test_batch_find_job(batch):
batch.minions = {"foo", "bar"}
with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
batch, "check_find_job", return_value=future
- ) as check_find_job_mock, patch.object(
- batch, "jid_gen", return_value="1236"
- ):
+ ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"):
batch.events_channel.local_client.run_job_async.return_value = future
batch.find_job({"foo", "bar"})
assert check_find_job_mock.call_args[0] == (
@@ -355,9 +338,7 @@ def test_batch_find_job_with_done_minions(batch):
batch.minions = {"foo", "bar"}
with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
batch, "check_find_job", return_value=future
- ) as check_find_job_mock, patch.object(
- batch, "jid_gen", return_value="1236"
- ):
+ ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"):
batch.events_channel.local_client.run_job_async.return_value = future
batch.find_job({"foo", "bar"})
assert check_find_job_mock.call_args[0] == (
--
2.48.1