From 85b8666b138cab170327f0217c799277371b2e80 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 7 May 2019 12:24:35 +0200 Subject: [PATCH] Fix async-batch multiple done events --- salt/cli/batch_async.py | 19 ++++++++++++------- tests/unit/cli/test_batch_async.py | 20 +++++++++++++------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index b0ab9d9f47..7225491228 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -86,6 +86,7 @@ class BatchAsync: io_loop=ioloop, keep_loop=True, ) + self.scheduled = False def __set_event_handler(self): ping_return_pattern = "salt/job/{}/ret/*".format(self.ping_jid) @@ -118,10 +119,7 @@ class BatchAsync: if minion in self.active: self.active.remove(minion) self.done_minions.add(minion) - # call later so that we maybe gather more returns - self.event.io_loop.call_later( - self.batch_delay, self.schedule_next - ) + self.schedule_next() def _get_next(self): to_run = ( @@ -146,7 +144,7 @@ class BatchAsync: self.timedout_minions ) if timedout_minions: - self.event.io_loop.call_later(self.batch_delay, self.schedule_next) + self.schedule_next() if running: self.event.io_loop.add_callback(self.find_job, running) @@ -197,7 +195,7 @@ class BatchAsync: "metadata": self.metadata, } self.event.fire_event(data, "salt/batch/{}/start".format(self.batch_jid)) - yield self.schedule_next() + yield self.run_next() def end_batch(self): left = self.minions.symmetric_difference( @@ -214,8 +212,14 @@ class BatchAsync: self.event.fire_event(data, "salt/batch/{}/done".format(self.batch_jid)) self.event.remove_event_handler(self.__event_handler) - @tornado.gen.coroutine def schedule_next(self): + if not self.scheduled: + self.scheduled = True + # call later so that we maybe gather more returns + self.event.io_loop.call_later(self.batch_delay, self.run_next) + + @tornado.gen.coroutine + def run_next(self): next_batch = self._get_next() if next_batch: self.active = self.active.union(next_batch) @@ -238,3 +242,4 @@ class BatchAsync: self.active = self.active.difference(next_batch) else: self.end_batch() + self.scheduled = False diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py index d6a4bfcf60..66332a548a 100644 --- a/tests/unit/cli/test_batch_async.py +++ b/tests/unit/cli/test_batch_async.py @@ -105,14 +105,14 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): @tornado.testing.gen_test def test_start_batch_calls_next(self): - self.batch.schedule_next = MagicMock(return_value=MagicMock()) + self.batch.run_next = MagicMock(return_value=MagicMock()) self.batch.event = MagicMock() future = tornado.gen.Future() future.set_result(None) - self.batch.schedule_next = MagicMock(return_value=future) + self.batch.run_next = MagicMock(return_value=future) self.batch.start_batch() self.assertEqual(self.batch.initialized, True) - self.assertEqual(len(self.batch.schedule_next.mock_calls), 1) + self.assertEqual(len(self.batch.run_next.mock_calls), 1) def test_batch_fire_done_event(self): self.batch.targeted_minions = {"foo", "baz", "bar"} @@ -147,7 +147,7 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): future = tornado.gen.Future() future.set_result({"minions": ["foo", "bar"]}) self.batch.local.run_job_async.return_value = future - ret = self.batch.schedule_next().result() + ret = self.batch.run_next().result() self.assertEqual( self.batch.local.run_job_async.call_args[0], ({"foo", "bar"}, "my.fun", [], "list"), @@ -250,7 +250,7 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): self.assertEqual(self.batch.done_minions, {"foo"}) self.assertEqual( self.batch.event.io_loop.call_later.call_args[0], - (self.batch.batch_delay, self.batch.schedule_next), + (self.batch.batch_delay, self.batch.run_next), ) def test_batch__event_handler_find_job_return(self): @@ -262,10 +262,10 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): self.assertEqual(self.batch.find_job_returned, {"foo"}) @tornado.testing.gen_test - def test_batch_schedule_next_end_batch_when_no_next(self): + def test_batch_run_next_end_batch_when_no_next(self): self.batch.end_batch = MagicMock() self.batch._get_next = MagicMock(return_value={}) - self.batch.schedule_next() + self.batch.run_next() self.assertEqual(len(self.batch.end_batch.mock_calls), 1) @tornado.testing.gen_test @@ -345,3 +345,9 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): self.batch.event.io_loop.add_callback.call_args[0], (self.batch.find_job, {"foo"}), ) + + def test_only_on_run_next_is_scheduled(self): + self.batch.event = MagicMock() + self.batch.scheduled = True + self.batch.schedule_next() + self.assertEqual(len(self.batch.event.io_loop.call_later.mock_calls), 0) -- 2.29.2