From e53d50ce5fabf67eeb5344f7be9cccbb09d0179b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?= Date: Thu, 26 Sep 2019 10:41:06 +0100 Subject: [PATCH] Improve batch_async to release consumed memory (bsc#1140912) --- salt/cli/batch_async.py | 89 ++++++++++++++++++++++++----------------- 1 file changed, 52 insertions(+), 37 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 388b709416..0a0b8f5f83 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -2,7 +2,7 @@ Execute a job on the targeted minions by using a moving window of fixed size `batch`. """ -import fnmatch +import gc # pylint: enable=import-error,no-name-in-module,redefined-builtin import logging @@ -78,6 +78,7 @@ class BatchAsync: self.batch_jid = jid_gen() self.find_job_jid = jid_gen() self.find_job_returned = set() + self.ended = False self.event = salt.utils.event.get_event( "master", self.opts["sock_dir"], @@ -88,6 +89,7 @@ class BatchAsync: keep_loop=True, ) self.scheduled = False + self.patterns = {} def __set_event_handler(self): ping_return_pattern = "salt/job/{}/ret/*".format(self.ping_jid) @@ -118,7 +120,7 @@ class BatchAsync: if minion in self.active: self.active.remove(minion) self.done_minions.add(minion) - self.schedule_next() + self.event.io_loop.spawn_callback(self.schedule_next) def _get_next(self): to_run = ( @@ -132,27 +134,27 @@ class BatchAsync: ) return set(list(to_run)[:next_batch_size]) - @tornado.gen.coroutine def check_find_job(self, batch_minions, jid): - find_job_return_pattern = "salt/job/{}/ret/*".format(jid) - self.event.unsubscribe(find_job_return_pattern, match_type="glob") - self.patterns.remove((find_job_return_pattern, "find_job_return")) + if self.event: + find_job_return_pattern = "salt/job/{}/ret/*".format(jid) + self.event.unsubscribe(find_job_return_pattern, match_type="glob") + self.patterns.remove((find_job_return_pattern, "find_job_return")) - timedout_minions = batch_minions.difference(self.find_job_returned).difference( - self.done_minions - ) - self.timedout_minions = self.timedout_minions.union(timedout_minions) - self.active = self.active.difference(self.timedout_minions) - running = batch_minions.difference(self.done_minions).difference( - self.timedout_minions - ) + timedout_minions = batch_minions.difference( + self.find_job_returned + ).difference(self.done_minions) + self.timedout_minions = self.timedout_minions.union(timedout_minions) + self.active = self.active.difference(self.timedout_minions) + running = batch_minions.difference(self.done_minions).difference( + self.timedout_minions + ) - if timedout_minions: - self.schedule_next() + if timedout_minions: + self.schedule_next() - if running: - self.find_job_returned = self.find_job_returned.difference(running) - self.event.io_loop.add_callback(self.find_job, running) + if running: + self.find_job_returned = self.find_job_returned.difference(running) + self.event.io_loop.spawn_callback(self.find_job, running) @tornado.gen.coroutine def find_job(self, minions): @@ -175,18 +177,12 @@ class BatchAsync: jid=jid, **self.eauth ) - self.event.io_loop.call_later( - self.opts["gather_job_timeout"], self.check_find_job, not_done, jid - ) + yield tornado.gen.sleep(self.opts["gather_job_timeout"]) + self.event.io_loop.spawn_callback(self.check_find_job, not_done, jid) @tornado.gen.coroutine def start(self): self.__set_event_handler() - # start batching even if not all minions respond to ping - self.event.io_loop.call_later( - self.batch_presence_ping_timeout or self.opts["gather_job_timeout"], - self.start_batch, - ) ping_return = yield self.local.run_job_async( self.opts["tgt"], "test.ping", @@ -198,6 +194,11 @@ class BatchAsync: **self.eauth ) self.targeted_minions = set(ping_return["minions"]) + # start batching even if not all minions respond to ping + yield tornado.gen.sleep( + self.batch_presence_ping_timeout or self.opts["gather_job_timeout"] + ) + self.event.io_loop.spawn_callback(self.start_batch) @tornado.gen.coroutine def start_batch(self): @@ -209,14 +210,18 @@ class BatchAsync: "down_minions": self.targeted_minions.difference(self.minions), "metadata": self.metadata, } - self.event.fire_event(data, "salt/batch/{}/start".format(self.batch_jid)) - yield self.run_next() + ret = self.event.fire_event( + data, "salt/batch/{}/start".format(self.batch_jid) + ) + self.event.io_loop.spawn_callback(self.run_next) + @tornado.gen.coroutine def end_batch(self): left = self.minions.symmetric_difference( self.done_minions.union(self.timedout_minions) ) - if not left: + if not left and not self.ended: + self.ended = True data = { "available_minions": self.minions, "down_minions": self.targeted_minions.difference(self.minions), @@ -229,20 +234,26 @@ class BatchAsync: for (pattern, label) in self.patterns: if label in ["ping_return", "batch_run"]: self.event.unsubscribe(pattern, match_type="glob") + del self + gc.collect() + yield + @tornado.gen.coroutine def schedule_next(self): if not self.scheduled: self.scheduled = True # call later so that we maybe gather more returns - self.event.io_loop.call_later(self.batch_delay, self.run_next) + yield tornado.gen.sleep(self.batch_delay) + self.event.io_loop.spawn_callback(self.run_next) @tornado.gen.coroutine def run_next(self): + self.scheduled = False next_batch = self._get_next() if next_batch: self.active = self.active.union(next_batch) try: - yield self.local.run_job_async( + ret = yield self.local.run_job_async( next_batch, self.opts["fun"], self.opts["arg"], @@ -254,13 +265,17 @@ class BatchAsync: metadata=self.metadata, ) - self.event.io_loop.call_later( - self.opts["timeout"], self.find_job, set(next_batch) - ) + yield tornado.gen.sleep(self.opts["timeout"]) + self.event.io_loop.spawn_callback(self.find_job, set(next_batch)) except Exception as ex: log.error("Error in scheduling next batch: %s", ex) self.active = self.active.difference(next_batch) else: - self.end_batch() - self.scheduled = False + yield self.end_batch() + gc.collect() yield + + def __del__(self): + self.event = None + self.ioloop = None + gc.collect() -- 2.29.2