From 65e33acaf10fdd838c0cdf34ec93df3a2ed1f0d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?= Date: Thu, 26 Sep 2019 10:41:06 +0100 Subject: [PATCH] Improve batch_async to release consumed memory (bsc#1140912) --- salt/cli/batch_async.py | 73 ++++++++++++++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 28 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 8a67331102..2bb50459c8 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -5,6 +5,7 @@ Execute a job on the targeted minions by using a moving window of fixed size `ba # Import python libs from __future__ import absolute_import, print_function, unicode_literals +import gc import tornado # Import salt libs @@ -77,6 +78,7 @@ class BatchAsync(object): self.batch_jid = jid_gen() self.find_job_jid = jid_gen() self.find_job_returned = set() + self.ended = False self.event = salt.utils.event.get_event( 'master', self.opts['sock_dir'], @@ -86,6 +88,7 @@ class BatchAsync(object): io_loop=ioloop, keep_loop=True) self.scheduled = False + self.patterns = {} def __set_event_handler(self): ping_return_pattern = 'salt/job/{0}/ret/*'.format(self.ping_jid) @@ -116,7 +119,7 @@ class BatchAsync(object): if minion in self.active: self.active.remove(minion) self.done_minions.add(minion) - self.schedule_next() + self.event.io_loop.spawn_callback(self.schedule_next) def _get_next(self): to_run = self.minions.difference( @@ -129,23 +132,23 @@ class BatchAsync(object): ) return set(list(to_run)[:next_batch_size]) - @tornado.gen.coroutine def check_find_job(self, batch_minions, jid): - find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid) - self.event.unsubscribe(find_job_return_pattern, match_type='glob') - self.patterns.remove((find_job_return_pattern, "find_job_return")) + if self.event: + find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid) + self.event.unsubscribe(find_job_return_pattern, match_type='glob') + self.patterns.remove((find_job_return_pattern, "find_job_return")) - timedout_minions = batch_minions.difference(self.find_job_returned).difference(self.done_minions) - self.timedout_minions = self.timedout_minions.union(timedout_minions) - self.active = self.active.difference(self.timedout_minions) - running = batch_minions.difference(self.done_minions).difference(self.timedout_minions) + timedout_minions = batch_minions.difference(self.find_job_returned).difference(self.done_minions) + self.timedout_minions = self.timedout_minions.union(timedout_minions) + self.active = self.active.difference(self.timedout_minions) + running = batch_minions.difference(self.done_minions).difference(self.timedout_minions) - if timedout_minions: - self.schedule_next() + if timedout_minions: + self.schedule_next() - if running: - self.find_job_returned = self.find_job_returned.difference(running) - self.event.io_loop.add_callback(self.find_job, running) + if running: + self.find_job_returned = self.find_job_returned.difference(running) + self.event.io_loop.spawn_callback(self.find_job, running) @tornado.gen.coroutine def find_job(self, minions): @@ -165,8 +168,8 @@ class BatchAsync(object): gather_job_timeout=self.opts['gather_job_timeout'], jid=jid, **self.eauth) - self.event.io_loop.call_later( - self.opts['gather_job_timeout'], + yield tornado.gen.sleep(self.opts['gather_job_timeout']) + self.event.io_loop.spawn_callback( self.check_find_job, not_done, jid) @@ -174,10 +177,6 @@ class BatchAsync(object): @tornado.gen.coroutine def start(self): self.__set_event_handler() - #start batching even if not all minions respond to ping - self.event.io_loop.call_later( - self.batch_presence_ping_timeout or self.opts['gather_job_timeout'], - self.start_batch) ping_return = yield self.local.run_job_async( self.opts['tgt'], 'test.ping', @@ -191,6 +190,10 @@ class BatchAsync(object): metadata=self.metadata, **self.eauth) self.targeted_minions = set(ping_return['minions']) + #start batching even if not all minions respond to ping + yield tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout']) + self.event.io_loop.spawn_callback(self.start_batch) + @tornado.gen.coroutine def start_batch(self): @@ -202,12 +205,14 @@ class BatchAsync(object): "down_minions": self.targeted_minions.difference(self.minions), "metadata": self.metadata } - self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid)) - yield self.run_next() + ret = self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid)) + self.event.io_loop.spawn_callback(self.run_next) + @tornado.gen.coroutine def end_batch(self): left = self.minions.symmetric_difference(self.done_minions.union(self.timedout_minions)) - if not left: + if not left and not self.ended: + self.ended = True data = { "available_minions": self.minions, "down_minions": self.targeted_minions.difference(self.minions), @@ -220,20 +225,26 @@ class BatchAsync(object): for (pattern, label) in self.patterns: if label in ["ping_return", "batch_run"]: self.event.unsubscribe(pattern, match_type='glob') + del self + gc.collect() + yield + @tornado.gen.coroutine def schedule_next(self): if not self.scheduled: self.scheduled = True # call later so that we maybe gather more returns - self.event.io_loop.call_later(self.batch_delay, self.run_next) + yield tornado.gen.sleep(self.batch_delay) + self.event.io_loop.spawn_callback(self.run_next) @tornado.gen.coroutine def run_next(self): + self.scheduled = False next_batch = self._get_next() if next_batch: self.active = self.active.union(next_batch) try: - yield self.local.run_job_async( + ret = yield self.local.run_job_async( next_batch, self.opts['fun'], self.opts['arg'], @@ -244,11 +255,17 @@ class BatchAsync(object): jid=self.batch_jid, metadata=self.metadata) - self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch)) + yield tornado.gen.sleep(self.opts['timeout']) + self.event.io_loop.spawn_callback(self.find_job, set(next_batch)) except Exception as ex: log.error("Error in scheduling next batch: %s", ex) self.active = self.active.difference(next_batch) else: - self.end_batch() - self.scheduled = False + yield self.end_batch() + gc.collect() yield + + def __del__(self): + self.event = None + self.ioloop = None + gc.collect() -- 2.16.4