From 4fe7231fa99de8edc848367386f1a6a5192a0f58 Mon Sep 17 00:00:00 2001 From: Victor Zhestkov Date: Fri, 21 Feb 2025 11:15:42 +0100 Subject: [PATCH] Backport batch async fixes and improvements (#701) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Backport batch async fixes and improvements Co-authored-by: Pablo Suárez Hernández * Align batch_async tests --------- Co-authored-by: Pablo Suárez Hernández --- salt/cli/batch_async.py | 60 ++++++++++++++++----- tests/pytests/unit/cli/test_batch_async.py | 63 ++++++++-------------- 2 files changed, 69 insertions(+), 54 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 5d49993faa..92215d0e04 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -35,7 +35,7 @@ def batch_async_required(opts, minions, extra): Check opts to identify if batch async is required for the operation. """ if not isinstance(minions, list): - False + return False batch_async_opts = opts.get("batch_async", {}) batch_async_threshold = ( batch_async_opts.get("threshold", 1) @@ -179,6 +179,7 @@ class SharedEventsChannel: self._used_by.discard(subscriber_id) def destroy_unused(self): + log.trace("SharedEventsChannel.destroy_unused called") if self._used_by: return False self.master_event.remove_event_handler(self.__handle_event) @@ -267,6 +268,7 @@ class BatchAsync: self.ended = False self.event = self.events_channel.master_event self.scheduled = False + self._start_batch_on_timeout = None def __set_event_handler(self): self.events_channel.subscribe( @@ -278,6 +280,8 @@ class BatchAsync: @salt.ext.tornado.gen.coroutine def __event_handler(self, tag, data, op): + # IMPORTANT: This function must run fast and not wait for any other task, + # otherwise it would cause events to be stuck. 
if not self.event: return try: @@ -285,7 +289,9 @@ class BatchAsync: if op == "ping_return": self.minions.add(minion) if self.targeted_minions == self.minions: - yield self.start_batch() + # call start_batch and do not wait for timeout as we received + # the responses from all the targets + self.io_loop.add_callback(self.start_batch) elif op == "find_job_return": if data.get("return", None): self.find_job_returned.add(minion) @@ -293,7 +299,8 @@ class BatchAsync: if minion in self.active: self.active.remove(minion) self.done_minions.add(minion) - yield self.schedule_next() + if not self.active: + self.io_loop.add_callback(self.schedule_next) except Exception as ex: # pylint: disable=W0703 log.error( "Exception occured while processing event: %s: %s", @@ -333,7 +340,7 @@ class BatchAsync: ) if timedout_minions: - yield self.schedule_next() + self.io_loop.add_callback(self.schedule_next) if self.event and running: self.find_job_returned = self.find_job_returned.difference(running) @@ -344,6 +351,9 @@ class BatchAsync: """ Find if the job was finished on the minions """ + log.trace( + "[%s] BatchAsync.find_job called for minions: %s", self.batch_jid, minions + ) if not self.event: return not_done = minions.difference(self.done_minions).difference( @@ -386,6 +396,7 @@ class BatchAsync: if not self.event: return self.__set_event_handler() + # call test.ping for all the targets in async way ping_return = yield self.events_channel.local_client.run_job_async( self.opts["tgt"], "test.ping", @@ -398,19 +409,24 @@ class BatchAsync: listen=False, **self.eauth, ) + # ping_return contains actual targeted minions and no actual responses + # from the minions as it's async and intended to populate targeted_minions set self.targeted_minions = set(ping_return["minions"]) - # start batching even if not all minions respond to ping - yield salt.ext.tornado.gen.sleep( - self.batch_presence_ping_timeout or self.opts["gather_job_timeout"] + # schedule start_batch to perform even if not all 
the minions responded + # self.__event_handler can push start_batch in case if all targets responded + self._start_batch_on_timeout = self.io_loop.call_later( + self.batch_presence_ping_timeout or self.opts["gather_job_timeout"], + self.start_batch, ) - if self.event: - yield self.start_batch() @salt.ext.tornado.gen.coroutine def start_batch(self): """ Fire `salt/batch/*/start` and continue batch with `run_next` """ + if self._start_batch_on_timeout is not None: + self.io_loop.remove_timeout(self._start_batch_on_timeout) + self._start_batch_on_timeout = None if self.initialized: return self.batch_size = get_bnum(self.opts, self.minions, True) @@ -431,6 +447,7 @@ class BatchAsync: """ End the batch and call safe closing """ + log.trace("[%s] BatchAsync.end_batch called", self.batch_jid) left = self.minions.symmetric_difference( self.done_minions.union(self.timedout_minions) ) @@ -452,10 +469,11 @@ class BatchAsync: # release to the IOLoop to allow the event to be published # before closing batch async execution - yield salt.ext.tornado.gen.sleep(1) + yield salt.ext.tornado.gen.sleep(0.03) self.close_safe() def close_safe(self): + log.trace("[%s] BatchAsync.close_safe called", self.batch_jid) if self.events_channel is not None: self.events_channel.unsubscribe(None, None, id(self)) self.events_channel.unuse(id(self)) @@ -465,11 +483,22 @@ class BatchAsync: @salt.ext.tornado.gen.coroutine def schedule_next(self): + log.trace("[%s] BatchAsync.schedule_next called", self.batch_jid) if self.scheduled: + log.trace( + "[%s] BatchAsync.schedule_next -> Batch already scheduled, nothing to do.", + self.batch_jid, + ) return self.scheduled = True - # call later so that we maybe gather more returns - yield salt.ext.tornado.gen.sleep(self.batch_delay) + if self._get_next(): + # call later so that we maybe gather more returns + log.trace( + "[%s] BatchAsync.schedule_next delaying batch %s second(s).", + self.batch_jid, + self.batch_delay, + ) + yield 
salt.ext.tornado.gen.sleep(self.batch_delay) if self.event: yield self.run_next() @@ -480,6 +509,11 @@ class BatchAsync: """ self.scheduled = False next_batch = self._get_next() + log.trace( + "[%s] BatchAsync.run_next called. Next Batch -> %s", + self.batch_jid, + next_batch, + ) if not next_batch: yield self.end_batch() return @@ -504,7 +538,7 @@ class BatchAsync: yield salt.ext.tornado.gen.sleep(self.opts["timeout"]) # The batch can be done already at this point, which means no self.event - if self.event: + if self.event and self.active.intersection(next_batch): yield self.find_job(set(next_batch)) except Exception as ex: # pylint: disable=W0703 log.error( diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py index bc871aba54..be8de692e6 100644 --- a/tests/pytests/unit/cli/test_batch_async.py +++ b/tests/pytests/unit/cli/test_batch_async.py @@ -85,11 +85,17 @@ def test_batch_start_on_batch_presence_ping_timeout(batch): future.set_result({}) with patch.object(batch, "events_channel", MagicMock()), patch( "salt.ext.tornado.gen.sleep", return_value=future - ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock: + ), patch.object(batch, "io_loop", MagicMock()), patch.object( + batch, "start_batch", return_value=future + ) as start_batch_mock: batch.events_channel.local_client.run_job_async.return_value = future_ret ret = batch.start() - # assert start_batch is called - start_batch_mock.assert_called_once() + # start_batch is scheduled to be called later + assert batch.io_loop.call_later.call_args[0] == ( + batch.batch_presence_ping_timeout, + batch.start_batch, + ) + assert batch._start_batch_on_timeout is not None # assert test.ping called assert batch.events_channel.local_client.run_job_async.call_args[0] == ( "*", @@ -109,16 +115,21 @@ def test_batch_start_on_gather_job_timeout(batch): batch.batch_presence_ping_timeout = None with patch.object(batch, "events_channel", MagicMock()), patch( 
"salt.ext.tornado.gen.sleep", return_value=future + ), patch.object(batch, "io_loop", MagicMock()), patch.object( + batch, "start_batch", return_value=future ), patch.object( batch, "start_batch", return_value=future ) as start_batch_mock, patch.object( batch, "batch_presence_ping_timeout", None ): batch.events_channel.local_client.run_job_async.return_value = future_ret - # ret = batch_async.start(batch) ret = batch.start() - # assert start_batch is called - start_batch_mock.assert_called_once() + # start_batch is scheduled to be called later + assert batch.io_loop.call_later.call_args[0] == ( + batch.opts["gather_job_timeout"], + batch.start_batch, + ) + assert batch._start_batch_on_timeout is not None def test_batch_fire_start_event(batch): @@ -271,34 +282,10 @@ def test_batch__event_handler_ping_return(batch): assert batch.done_minions == set() -def test_batch__event_handler_call_start_batch_when_all_pings_return(batch): - batch.targeted_minions = {"foo"} - future = salt.ext.tornado.gen.Future() - future.set_result({}) - with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: - batch.start() - batch._BatchAsync__event_handler( - "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" - ) - start_batch_mock.assert_called_once() - - -def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch): - batch.targeted_minions = {"foo", "bar"} - future = salt.ext.tornado.gen.Future() - future.set_result({}) - with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: - batch.start() - batch._BatchAsync__event_handler( - "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" - ) - start_batch_mock.assert_not_called() - - def test_batch__event_handler_batch_run_return(batch): future = salt.ext.tornado.gen.Future() future.set_result({}) - with patch.object( + with patch.object(batch, "io_loop", MagicMock()), patch.object( batch, "schedule_next", return_value=future ) as schedule_next_mock: batch.start() @@ 
-308,7 +295,7 @@ def test_batch__event_handler_batch_run_return(batch): ) assert batch.active == set() assert batch.done_minions == {"foo"} - schedule_next_mock.assert_called_once() + assert batch.io_loop.add_callback.call_args[0] == (batch.schedule_next,) def test_batch__event_handler_find_job_return(batch): @@ -322,9 +309,7 @@ def test_batch__event_handler_find_job_return(batch): def test_batch_run_next_end_batch_when_no_next(batch): future = salt.ext.tornado.gen.Future() future.set_result({}) - with patch.object( - batch, "_get_next", return_value={} - ), patch.object( + with patch.object(batch, "_get_next", return_value={}), patch.object( batch, "end_batch", return_value=future ) as end_batch_mock: batch.run_next() @@ -337,9 +322,7 @@ def test_batch_find_job(batch): batch.minions = {"foo", "bar"} with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( batch, "check_find_job", return_value=future - ) as check_find_job_mock, patch.object( - batch, "jid_gen", return_value="1236" - ): + ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"): batch.events_channel.local_client.run_job_async.return_value = future batch.find_job({"foo", "bar"}) assert check_find_job_mock.call_args[0] == ( @@ -355,9 +338,7 @@ def test_batch_find_job_with_done_minions(batch): batch.minions = {"foo", "bar"} with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( batch, "check_find_job", return_value=future - ) as check_find_job_mock, patch.object( - batch, "jid_gen", return_value="1236" - ): + ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"): batch.events_channel.local_client.run_job_async.return_value = future batch.find_job({"foo", "bar"}) assert check_find_job_mock.call_args[0] == ( -- 2.48.1