From 4b3badeb52a9de10d6085ee3cc7598a827d1e68f Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Thu, 11 Apr 2019 15:57:59 +0200 Subject: [PATCH] Fix async batch race conditions Close batching when there is no next batch --- salt/cli/batch_async.py | 96 +++++++++++++++--------------- tests/unit/cli/test_batch_async.py | 38 +++++------- 2 files changed, 62 insertions(+), 72 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 1557e5105b..b0ab9d9f47 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -32,14 +32,14 @@ class BatchAsync: - tag: salt/batch//start - data: { "available_minions": self.minions, - "down_minions": self.down_minions + "down_minions": targeted_minions - presence_ping_minions } When the batch ends, an `done` event is fired: - tag: salt/batch//done - data: { "available_minions": self.minions, - "down_minions": self.down_minions, + "down_minions": targeted_minions - presence_ping_minions "done_minions": self.done_minions, "timedout_minions": self.timedout_minions } @@ -68,7 +68,7 @@ class BatchAsync: self.eauth = batch_get_eauth(clear_load["kwargs"]) self.metadata = clear_load["kwargs"].get("metadata", {}) self.minions = set() - self.down_minions = set() + self.targeted_minions = set() self.timedout_minions = set() self.done_minions = set() self.active = set() @@ -110,8 +110,7 @@ class BatchAsync: minion = data["id"] if op == "ping_return": self.minions.add(minion) - self.down_minions.remove(minion) - if not self.down_minions: + if self.targeted_minions == self.minions: self.event.io_loop.spawn_callback(self.start_batch) elif op == "find_job_return": self.find_job_returned.add(minion) @@ -124,11 +123,6 @@ class BatchAsync: self.batch_delay, self.schedule_next ) - if self.initialized and self.done_minions == self.minions.difference( - self.timedout_minions - ): - self.end_batch() - def _get_next(self): to_run = ( self.minions.difference(self.done_minions) @@ -142,20 +136,17 @@ class BatchAsync: return set(list(to_run)[:next_batch_size]) @tornado.gen.coroutine - def check_find_job(self, minions): - did_not_return = minions.difference(self.find_job_returned) - if did_not_return: - for minion in did_not_return: - if minion in self.find_job_returned: - self.find_job_returned.remove(minion) - if minion in self.active: - self.active.remove(minion) - self.timedout_minions.add(minion) - running = ( - minions.difference(did_not_return) - .difference(self.done_minions) - .difference(self.timedout_minions) + def check_find_job(self, batch_minions): + timedout_minions = batch_minions.difference(self.find_job_returned).difference( + self.done_minions ) + self.timedout_minions = self.timedout_minions.union(timedout_minions) + self.active = self.active.difference(self.timedout_minions) + running = batch_minions.difference(self.done_minions).difference( + self.timedout_minions + ) + if timedout_minions: + self.event.io_loop.call_later(self.batch_delay, self.schedule_next) if running: self.event.io_loop.add_callback(self.find_job, running) @@ -193,7 +184,7 @@ class BatchAsync: metadata=self.metadata, **self.eauth ) - self.down_minions = set(ping_return["minions"]) + self.targeted_minions = set(ping_return["minions"]) @tornado.gen.coroutine def start_batch(self): @@ -202,39 +193,48 @@ class BatchAsync: self.initialized = True data = { "available_minions": self.minions, - "down_minions": self.down_minions, + "down_minions": self.targeted_minions.difference(self.minions), "metadata": self.metadata, } self.event.fire_event(data, "salt/batch/{}/start".format(self.batch_jid)) yield self.schedule_next() def end_batch(self): - data = { - "available_minions": self.minions, - "down_minions": self.down_minions, - "done_minions": self.done_minions, - "timedout_minions": self.timedout_minions, - "metadata": self.metadata, - } - self.event.fire_event(data, "salt/batch/{}/done".format(self.batch_jid)) - self.event.remove_event_handler(self.__event_handler) + left = self.minions.symmetric_difference( + self.done_minions.union(self.timedout_minions) + ) + if not left: + data = { + "available_minions": self.minions, + "down_minions": self.targeted_minions.difference(self.minions), + "done_minions": self.done_minions, + "timedout_minions": self.timedout_minions, + "metadata": self.metadata, + } + self.event.fire_event(data, "salt/batch/{}/done".format(self.batch_jid)) + self.event.remove_event_handler(self.__event_handler) @tornado.gen.coroutine def schedule_next(self): next_batch = self._get_next() if next_batch: - yield self.local.run_job_async( - next_batch, - self.opts["fun"], - self.opts["arg"], - "list", - raw=self.opts.get("raw", False), - ret=self.opts.get("return", ""), - gather_job_timeout=self.opts["gather_job_timeout"], - jid=self.batch_jid, - metadata=self.metadata, - ) - self.event.io_loop.call_later( - self.opts["timeout"], self.find_job, set(next_batch) - ) self.active = self.active.union(next_batch) + try: + yield self.local.run_job_async( + next_batch, + self.opts["fun"], + self.opts["arg"], + "list", + raw=self.opts.get("raw", False), + ret=self.opts.get("return", ""), + gather_job_timeout=self.opts["gather_job_timeout"], + jid=self.batch_jid, + metadata=self.metadata, + ) + self.event.io_loop.call_later( + self.opts["timeout"], self.find_job, set(next_batch) + ) + except Exception as ex: + self.active = self.active.difference(next_batch) + else: + self.end_batch() diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py index 3f8626a2dd..d6a4bfcf60 100644 --- a/tests/unit/cli/test_batch_async.py +++ b/tests/unit/cli/test_batch_async.py @@ -68,8 +68,8 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): self.assertEqual( self.batch.local.run_job_async.call_args[0], ("*", "test.ping", [], "glob") ) - # assert down_minions == all minions matched by tgt - self.assertEqual(self.batch.down_minions, {"foo", "bar"}) + # assert targeted_minions == all minions matched by tgt + self.assertEqual(self.batch.targeted_minions, {"foo", "bar"}) @tornado.testing.gen_test def test_batch_start_on_gather_job_timeout(self): @@ -115,7 +115,10 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): self.assertEqual(len(self.batch.schedule_next.mock_calls), 1) def test_batch_fire_done_event(self): + self.batch.targeted_minions = {"foo", "baz", "bar"} self.batch.minions = {"foo", "bar"} + self.batch.done_minions = {"foo"} + self.batch.timedout_minions = {"bar"} self.batch.event = MagicMock() self.batch.metadata = {"mykey": "myvalue"} self.batch.end_batch() @@ -124,9 +127,9 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): ( { "available_minions": {"foo", "bar"}, - "done_minions": set(), - "down_minions": set(), - "timedout_minions": set(), + "done_minions": self.batch.done_minions, + "down_minions": {"baz"}, + "timedout_minions": self.batch.timedout_minions, "metadata": self.batch.metadata, }, "salt/batch/1235/done", @@ -205,7 +208,7 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): self.assertEqual(self.batch._get_next(), set()) def test_batch__event_handler_ping_return(self): - self.batch.down_minions = {"foo"} + self.batch.targeted_minions = {"foo"} self.batch.event = MagicMock( unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"})) ) @@ -216,7 +219,7 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): self.assertEqual(self.batch.done_minions, set()) def test_batch__event_handler_call_start_batch_when_all_pings_return(self): - self.batch.down_minions = {"foo"} + self.batch.targeted_minions = {"foo"} self.batch.event = MagicMock( unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"})) ) @@ -228,7 +231,7 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): ) def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(self): - self.batch.down_minions = {"foo", "bar"} + self.batch.targeted_minions = {"foo", "bar"} self.batch.event = MagicMock( unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"})) ) @@ -259,23 +262,10 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase): self.assertEqual(self.batch.find_job_returned, {"foo"}) @tornado.testing.gen_test - def test_batch__event_handler_end_batch(self): - self.batch.event = MagicMock( - unpack=MagicMock( - return_value=("salt/job/not-my-jid/ret/foo", {"id": "foo"}) - ) - ) - future = tornado.gen.Future() - future.set_result({"minions": ["foo", "bar", "baz"]}) - self.batch.local.run_job_async.return_value = future - self.batch.start() - self.batch.initialized = True - self.assertEqual(self.batch.down_minions, {"foo", "bar", "baz"}) + def test_batch_schedule_next_end_batch_when_no_next(self): self.batch.end_batch = MagicMock() - self.batch.minions = {"foo", "bar", "baz"} - self.batch.done_minions = {"foo", "bar"} - self.batch.timedout_minions = {"baz"} - self.batch._BatchAsync__event_handler(MagicMock()) + self.batch._get_next = MagicMock(return_value={}) + self.batch.schedule_next() self.assertEqual(len(self.batch.end_batch.mock_calls), 1) @tornado.testing.gen_test -- 2.29.2