- Make _auth calls visible with master stats - Repair mount.fstab_present always returning pending changes - Set virtual grain in Podman systemd container - Fix crash due to wrong client reference on `SaltMakoTemplateLookup` - Enhance batch async and fix some detected issues - Added: * repair-virt_query-outputter-655.patch * make-_auth-calls-visible-with-master-stats-696.patch * repair-fstab_present-test-mode-702.patch * set-virtual-grain-in-podman-systemd-container-703.patch * fixed-file-client-private-attribute-reference-on-sal.patch * backport-batch-async-fixes-and-improvements-701.patch OBS-URL: https://build.opensuse.org/package/show/systemsmanagement:saltstack/salt?expand=0&rev=273
337 lines
14 KiB
Diff
337 lines
14 KiB
Diff
From 4fe7231fa99de8edc848367386f1a6a5192a0f58 Mon Sep 17 00:00:00 2001
|
|
From: Victor Zhestkov <vzhestkov@suse.com>
|
|
Date: Fri, 21 Feb 2025 11:15:42 +0100
|
|
Subject: [PATCH] Backport batch async fixes and improvements (#701)
|
|
MIME-Version: 1.0
|
|
Content-Type: text/plain; charset=UTF-8
|
|
Content-Transfer-Encoding: 8bit
|
|
|
|
* Backport batch async fixes and improvements
|
|
|
|
Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
|
|
|
|
* Align batch_async tests
|
|
|
|
---------
|
|
|
|
Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
|
|
---
|
|
salt/cli/batch_async.py | 60 ++++++++++++++++-----
|
|
tests/pytests/unit/cli/test_batch_async.py | 63 ++++++++--------------
|
|
2 files changed, 69 insertions(+), 54 deletions(-)
|
|
|
|
diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
|
|
index 5d49993faa..92215d0e04 100644
|
|
--- a/salt/cli/batch_async.py
|
|
+++ b/salt/cli/batch_async.py
|
|
@@ -35,7 +35,7 @@ def batch_async_required(opts, minions, extra):
|
|
Check opts to identify if batch async is required for the operation.
|
|
"""
|
|
if not isinstance(minions, list):
|
|
- False
|
|
+ return False
|
|
batch_async_opts = opts.get("batch_async", {})
|
|
batch_async_threshold = (
|
|
batch_async_opts.get("threshold", 1)
|
|
@@ -179,6 +179,7 @@ class SharedEventsChannel:
|
|
self._used_by.discard(subscriber_id)
|
|
|
|
def destroy_unused(self):
|
|
+ log.trace("SharedEventsChannel.destroy_unused called")
|
|
if self._used_by:
|
|
return False
|
|
self.master_event.remove_event_handler(self.__handle_event)
|
|
@@ -267,6 +268,7 @@ class BatchAsync:
|
|
self.ended = False
|
|
self.event = self.events_channel.master_event
|
|
self.scheduled = False
|
|
+ self._start_batch_on_timeout = None
|
|
|
|
def __set_event_handler(self):
|
|
self.events_channel.subscribe(
|
|
@@ -278,6 +280,8 @@ class BatchAsync:
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def __event_handler(self, tag, data, op):
|
|
+ # IMPORTANT: This function must run fast and not wait for any other task,
|
|
+ # otherwise it would cause events to be stuck.
|
|
if not self.event:
|
|
return
|
|
try:
|
|
@@ -285,7 +289,9 @@ class BatchAsync:
|
|
if op == "ping_return":
|
|
self.minions.add(minion)
|
|
if self.targeted_minions == self.minions:
|
|
- yield self.start_batch()
|
|
+ # call start_batch and do not wait for timeout as we received
|
|
+ # the responses from all the targets
|
|
+ self.io_loop.add_callback(self.start_batch)
|
|
elif op == "find_job_return":
|
|
if data.get("return", None):
|
|
self.find_job_returned.add(minion)
|
|
@@ -293,7 +299,8 @@ class BatchAsync:
|
|
if minion in self.active:
|
|
self.active.remove(minion)
|
|
self.done_minions.add(minion)
|
|
- yield self.schedule_next()
|
|
+ if not self.active:
|
|
+ self.io_loop.add_callback(self.schedule_next)
|
|
except Exception as ex: # pylint: disable=W0703
|
|
log.error(
|
|
"Exception occured while processing event: %s: %s",
|
|
@@ -333,7 +340,7 @@ class BatchAsync:
|
|
)
|
|
|
|
if timedout_minions:
|
|
- yield self.schedule_next()
|
|
+ self.io_loop.add_callback(self.schedule_next)
|
|
|
|
if self.event and running:
|
|
self.find_job_returned = self.find_job_returned.difference(running)
|
|
@@ -344,6 +351,9 @@ class BatchAsync:
|
|
"""
|
|
Find if the job was finished on the minions
|
|
"""
|
|
+ log.trace(
|
|
+ "[%s] BatchAsync.find_job called for minions: %s", self.batch_jid, minions
|
|
+ )
|
|
if not self.event:
|
|
return
|
|
not_done = minions.difference(self.done_minions).difference(
|
|
@@ -386,6 +396,7 @@ class BatchAsync:
|
|
if not self.event:
|
|
return
|
|
self.__set_event_handler()
|
|
+ # call test.ping for all the targets in async way
|
|
ping_return = yield self.events_channel.local_client.run_job_async(
|
|
self.opts["tgt"],
|
|
"test.ping",
|
|
@@ -398,19 +409,24 @@ class BatchAsync:
|
|
listen=False,
|
|
**self.eauth,
|
|
)
|
|
+ # ping_return contains actual targeted minions and no actual responses
|
|
+ # from the minions as it's async and intended to populate targeted_minions set
|
|
self.targeted_minions = set(ping_return["minions"])
|
|
- # start batching even if not all minions respond to ping
|
|
- yield salt.ext.tornado.gen.sleep(
|
|
- self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
|
|
+ # schedule start_batch to perform even if not all the minions responded
|
|
+ # self.__event_handler can push start_batch in case if all targets responded
|
|
+ self._start_batch_on_timeout = self.io_loop.call_later(
|
|
+ self.batch_presence_ping_timeout or self.opts["gather_job_timeout"],
|
|
+ self.start_batch,
|
|
)
|
|
- if self.event:
|
|
- yield self.start_batch()
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def start_batch(self):
|
|
"""
|
|
Fire `salt/batch/*/start` and continue batch with `run_next`
|
|
"""
|
|
+ if self._start_batch_on_timeout is not None:
|
|
+ self.io_loop.remove_timeout(self._start_batch_on_timeout)
|
|
+ self._start_batch_on_timeout = None
|
|
if self.initialized:
|
|
return
|
|
self.batch_size = get_bnum(self.opts, self.minions, True)
|
|
@@ -431,6 +447,7 @@ class BatchAsync:
|
|
"""
|
|
End the batch and call safe closing
|
|
"""
|
|
+ log.trace("[%s] BatchAsync.end_batch called", self.batch_jid)
|
|
left = self.minions.symmetric_difference(
|
|
self.done_minions.union(self.timedout_minions)
|
|
)
|
|
@@ -452,10 +469,11 @@ class BatchAsync:
|
|
|
|
# release to the IOLoop to allow the event to be published
|
|
# before closing batch async execution
|
|
- yield salt.ext.tornado.gen.sleep(1)
|
|
+ yield salt.ext.tornado.gen.sleep(0.03)
|
|
self.close_safe()
|
|
|
|
def close_safe(self):
|
|
+ log.trace("[%s] BatchAsync.close_safe called", self.batch_jid)
|
|
if self.events_channel is not None:
|
|
self.events_channel.unsubscribe(None, None, id(self))
|
|
self.events_channel.unuse(id(self))
|
|
@@ -465,11 +483,22 @@ class BatchAsync:
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def schedule_next(self):
|
|
+ log.trace("[%s] BatchAsync.schedule_next called", self.batch_jid)
|
|
if self.scheduled:
|
|
+ log.trace(
|
|
+ "[%s] BatchAsync.schedule_next -> Batch already scheduled, nothing to do.",
|
|
+ self.batch_jid,
|
|
+ )
|
|
return
|
|
self.scheduled = True
|
|
- # call later so that we maybe gather more returns
|
|
- yield salt.ext.tornado.gen.sleep(self.batch_delay)
|
|
+ if self._get_next():
|
|
+ # call later so that we maybe gather more returns
|
|
+ log.trace(
|
|
+ "[%s] BatchAsync.schedule_next delaying batch %s second(s).",
|
|
+ self.batch_jid,
|
|
+ self.batch_delay,
|
|
+ )
|
|
+ yield salt.ext.tornado.gen.sleep(self.batch_delay)
|
|
if self.event:
|
|
yield self.run_next()
|
|
|
|
@@ -480,6 +509,11 @@ class BatchAsync:
|
|
"""
|
|
self.scheduled = False
|
|
next_batch = self._get_next()
|
|
+ log.trace(
|
|
+ "[%s] BatchAsync.run_next called. Next Batch -> %s",
|
|
+ self.batch_jid,
|
|
+ next_batch,
|
|
+ )
|
|
if not next_batch:
|
|
yield self.end_batch()
|
|
return
|
|
@@ -504,7 +538,7 @@ class BatchAsync:
|
|
yield salt.ext.tornado.gen.sleep(self.opts["timeout"])
|
|
|
|
# The batch can be done already at this point, which means no self.event
|
|
- if self.event:
|
|
+ if self.event and self.active.intersection(next_batch):
|
|
yield self.find_job(set(next_batch))
|
|
except Exception as ex: # pylint: disable=W0703
|
|
log.error(
|
|
diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py
|
|
index bc871aba54..be8de692e6 100644
|
|
--- a/tests/pytests/unit/cli/test_batch_async.py
|
|
+++ b/tests/pytests/unit/cli/test_batch_async.py
|
|
@@ -85,11 +85,17 @@ def test_batch_start_on_batch_presence_ping_timeout(batch):
|
|
future.set_result({})
|
|
with patch.object(batch, "events_channel", MagicMock()), patch(
|
|
"salt.ext.tornado.gen.sleep", return_value=future
|
|
- ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
|
|
+ ), patch.object(batch, "io_loop", MagicMock()), patch.object(
|
|
+ batch, "start_batch", return_value=future
|
|
+ ) as start_batch_mock:
|
|
batch.events_channel.local_client.run_job_async.return_value = future_ret
|
|
ret = batch.start()
|
|
- # assert start_batch is called
|
|
- start_batch_mock.assert_called_once()
|
|
+ # start_batch is scheduled to be called later
|
|
+ assert batch.io_loop.call_later.call_args[0] == (
|
|
+ batch.batch_presence_ping_timeout,
|
|
+ batch.start_batch,
|
|
+ )
|
|
+ assert batch._start_batch_on_timeout is not None
|
|
# assert test.ping called
|
|
assert batch.events_channel.local_client.run_job_async.call_args[0] == (
|
|
"*",
|
|
@@ -109,16 +115,21 @@ def test_batch_start_on_gather_job_timeout(batch):
|
|
batch.batch_presence_ping_timeout = None
|
|
with patch.object(batch, "events_channel", MagicMock()), patch(
|
|
"salt.ext.tornado.gen.sleep", return_value=future
|
|
+ ), patch.object(batch, "io_loop", MagicMock()), patch.object(
|
|
+ batch, "start_batch", return_value=future
|
|
), patch.object(
|
|
batch, "start_batch", return_value=future
|
|
) as start_batch_mock, patch.object(
|
|
batch, "batch_presence_ping_timeout", None
|
|
):
|
|
batch.events_channel.local_client.run_job_async.return_value = future_ret
|
|
- # ret = batch_async.start(batch)
|
|
ret = batch.start()
|
|
- # assert start_batch is called
|
|
- start_batch_mock.assert_called_once()
|
|
+ # start_batch is scheduled to be called later
|
|
+ assert batch.io_loop.call_later.call_args[0] == (
|
|
+ batch.opts["gather_job_timeout"],
|
|
+ batch.start_batch,
|
|
+ )
|
|
+ assert batch._start_batch_on_timeout is not None
|
|
|
|
|
|
def test_batch_fire_start_event(batch):
|
|
@@ -271,34 +282,10 @@ def test_batch__event_handler_ping_return(batch):
|
|
assert batch.done_minions == set()
|
|
|
|
|
|
-def test_batch__event_handler_call_start_batch_when_all_pings_return(batch):
|
|
- batch.targeted_minions = {"foo"}
|
|
- future = salt.ext.tornado.gen.Future()
|
|
- future.set_result({})
|
|
- with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
|
|
- batch.start()
|
|
- batch._BatchAsync__event_handler(
|
|
- "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
|
|
- )
|
|
- start_batch_mock.assert_called_once()
|
|
-
|
|
-
|
|
-def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch):
|
|
- batch.targeted_minions = {"foo", "bar"}
|
|
- future = salt.ext.tornado.gen.Future()
|
|
- future.set_result({})
|
|
- with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
|
|
- batch.start()
|
|
- batch._BatchAsync__event_handler(
|
|
- "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
|
|
- )
|
|
- start_batch_mock.assert_not_called()
|
|
-
|
|
-
|
|
def test_batch__event_handler_batch_run_return(batch):
|
|
future = salt.ext.tornado.gen.Future()
|
|
future.set_result({})
|
|
- with patch.object(
|
|
+ with patch.object(batch, "io_loop", MagicMock()), patch.object(
|
|
batch, "schedule_next", return_value=future
|
|
) as schedule_next_mock:
|
|
batch.start()
|
|
@@ -308,7 +295,7 @@ def test_batch__event_handler_batch_run_return(batch):
|
|
)
|
|
assert batch.active == set()
|
|
assert batch.done_minions == {"foo"}
|
|
- schedule_next_mock.assert_called_once()
|
|
+ batch.io_loop.add_callback.call_args[0] == (batch.schedule_next)
|
|
|
|
|
|
def test_batch__event_handler_find_job_return(batch):
|
|
@@ -322,9 +309,7 @@ def test_batch__event_handler_find_job_return(batch):
|
|
def test_batch_run_next_end_batch_when_no_next(batch):
|
|
future = salt.ext.tornado.gen.Future()
|
|
future.set_result({})
|
|
- with patch.object(
|
|
- batch, "_get_next", return_value={}
|
|
- ), patch.object(
|
|
+ with patch.object(batch, "_get_next", return_value={}), patch.object(
|
|
batch, "end_batch", return_value=future
|
|
) as end_batch_mock:
|
|
batch.run_next()
|
|
@@ -337,9 +322,7 @@ def test_batch_find_job(batch):
|
|
batch.minions = {"foo", "bar"}
|
|
with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
|
|
batch, "check_find_job", return_value=future
|
|
- ) as check_find_job_mock, patch.object(
|
|
- batch, "jid_gen", return_value="1236"
|
|
- ):
|
|
+ ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"):
|
|
batch.events_channel.local_client.run_job_async.return_value = future
|
|
batch.find_job({"foo", "bar"})
|
|
assert check_find_job_mock.call_args[0] == (
|
|
@@ -355,9 +338,7 @@ def test_batch_find_job_with_done_minions(batch):
|
|
batch.minions = {"foo", "bar"}
|
|
with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
|
|
batch, "check_find_job", return_value=future
|
|
- ) as check_find_job_mock, patch.object(
|
|
- batch, "jid_gen", return_value="1236"
|
|
- ):
|
|
+ ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"):
|
|
batch.events_channel.local_client.run_job_async.return_value = future
|
|
batch.find_job({"foo", "bar"})
|
|
assert check_find_job_mock.call_args[0] == (
|
|
--
|
|
2.48.1
|
|
|