65598582f5
OBS-URL: https://build.opensuse.org/package/show/systemsmanagement:saltstack/salt?expand=0&rev=179
From e53d50ce5fabf67eeb5344f7be9cccbb09d0179b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?=
 <psuarezhernandez@suse.com>
Date: Thu, 26 Sep 2019 10:41:06 +0100
Subject: [PATCH] Improve batch_async to release consumed memory
 (bsc#1140912)

---
 salt/cli/batch_async.py | 89 ++++++++++++++++++++++++-----------------
 1 file changed, 52 insertions(+), 37 deletions(-)

diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index 388b709416..0a0b8f5f83 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -2,7 +2,7 @@
 Execute a job on the targeted minions by using a moving window of fixed size `batch`.
 """
 
-import fnmatch
+import gc
 
 # pylint: enable=import-error,no-name-in-module,redefined-builtin
 import logging
@@ -78,6 +78,7 @@ class BatchAsync:
         self.batch_jid = jid_gen()
         self.find_job_jid = jid_gen()
         self.find_job_returned = set()
+        self.ended = False
         self.event = salt.utils.event.get_event(
             "master",
             self.opts["sock_dir"],
@@ -88,6 +89,7 @@ class BatchAsync:
             keep_loop=True,
         )
         self.scheduled = False
+        self.patterns = {}
 
     def __set_event_handler(self):
         ping_return_pattern = "salt/job/{}/ret/*".format(self.ping_jid)
@@ -118,7 +120,7 @@ class BatchAsync:
                     if minion in self.active:
                         self.active.remove(minion)
                         self.done_minions.add(minion)
-                        self.schedule_next()
+                        self.event.io_loop.spawn_callback(self.schedule_next)
 
     def _get_next(self):
         to_run = (
@@ -132,27 +134,27 @@ class BatchAsync:
         )
         return set(list(to_run)[:next_batch_size])
 
-    @tornado.gen.coroutine
     def check_find_job(self, batch_minions, jid):
-        find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
-        self.event.unsubscribe(find_job_return_pattern, match_type="glob")
-        self.patterns.remove((find_job_return_pattern, "find_job_return"))
+        if self.event:
+            find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
+            self.event.unsubscribe(find_job_return_pattern, match_type="glob")
+            self.patterns.remove((find_job_return_pattern, "find_job_return"))
 
-        timedout_minions = batch_minions.difference(self.find_job_returned).difference(
-            self.done_minions
-        )
-        self.timedout_minions = self.timedout_minions.union(timedout_minions)
-        self.active = self.active.difference(self.timedout_minions)
-        running = batch_minions.difference(self.done_minions).difference(
-            self.timedout_minions
-        )
+            timedout_minions = batch_minions.difference(
+                self.find_job_returned
+            ).difference(self.done_minions)
+            self.timedout_minions = self.timedout_minions.union(timedout_minions)
+            self.active = self.active.difference(self.timedout_minions)
+            running = batch_minions.difference(self.done_minions).difference(
+                self.timedout_minions
+            )
 
-        if timedout_minions:
-            self.schedule_next()
+            if timedout_minions:
+                self.schedule_next()
 
-        if running:
-            self.find_job_returned = self.find_job_returned.difference(running)
-            self.event.io_loop.add_callback(self.find_job, running)
+            if running:
+                self.find_job_returned = self.find_job_returned.difference(running)
+                self.event.io_loop.spawn_callback(self.find_job, running)
 
     @tornado.gen.coroutine
     def find_job(self, minions):
@@ -175,18 +177,12 @@ class BatchAsync:
                 jid=jid,
                 **self.eauth
             )
-            self.event.io_loop.call_later(
-                self.opts["gather_job_timeout"], self.check_find_job, not_done, jid
-            )
+            yield tornado.gen.sleep(self.opts["gather_job_timeout"])
+            self.event.io_loop.spawn_callback(self.check_find_job, not_done, jid)
 
     @tornado.gen.coroutine
     def start(self):
         self.__set_event_handler()
-        # start batching even if not all minions respond to ping
-        self.event.io_loop.call_later(
-            self.batch_presence_ping_timeout or self.opts["gather_job_timeout"],
-            self.start_batch,
-        )
         ping_return = yield self.local.run_job_async(
             self.opts["tgt"],
             "test.ping",
@@ -198,6 +194,11 @@ class BatchAsync:
             **self.eauth
         )
         self.targeted_minions = set(ping_return["minions"])
+        # start batching even if not all minions respond to ping
+        yield tornado.gen.sleep(
+            self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
+        )
+        self.event.io_loop.spawn_callback(self.start_batch)
 
     @tornado.gen.coroutine
     def start_batch(self):
@@ -209,14 +210,18 @@ class BatchAsync:
                 "down_minions": self.targeted_minions.difference(self.minions),
                 "metadata": self.metadata,
             }
-            self.event.fire_event(data, "salt/batch/{}/start".format(self.batch_jid))
-            yield self.run_next()
+            ret = self.event.fire_event(
+                data, "salt/batch/{}/start".format(self.batch_jid)
+            )
+            self.event.io_loop.spawn_callback(self.run_next)
 
+    @tornado.gen.coroutine
     def end_batch(self):
         left = self.minions.symmetric_difference(
             self.done_minions.union(self.timedout_minions)
         )
-        if not left:
+        if not left and not self.ended:
+            self.ended = True
             data = {
                 "available_minions": self.minions,
                 "down_minions": self.targeted_minions.difference(self.minions),
@@ -229,20 +234,26 @@ class BatchAsync:
             for (pattern, label) in self.patterns:
                 if label in ["ping_return", "batch_run"]:
                     self.event.unsubscribe(pattern, match_type="glob")
+            del self
+            gc.collect()
+        yield
 
+    @tornado.gen.coroutine
     def schedule_next(self):
         if not self.scheduled:
             self.scheduled = True
             # call later so that we maybe gather more returns
-            self.event.io_loop.call_later(self.batch_delay, self.run_next)
+            yield tornado.gen.sleep(self.batch_delay)
+            self.event.io_loop.spawn_callback(self.run_next)
 
     @tornado.gen.coroutine
     def run_next(self):
+        self.scheduled = False
         next_batch = self._get_next()
         if next_batch:
             self.active = self.active.union(next_batch)
             try:
-                yield self.local.run_job_async(
+                ret = yield self.local.run_job_async(
                     next_batch,
                     self.opts["fun"],
                     self.opts["arg"],
@@ -254,13 +265,17 @@ class BatchAsync:
                     metadata=self.metadata,
                 )
 
-                self.event.io_loop.call_later(
-                    self.opts["timeout"], self.find_job, set(next_batch)
-                )
+                yield tornado.gen.sleep(self.opts["timeout"])
+                self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
             except Exception as ex:
                 log.error("Error in scheduling next batch: %s", ex)
                 self.active = self.active.difference(next_batch)
         else:
-            self.end_batch()
-            self.scheduled = False
+            yield self.end_batch()
+        gc.collect()
         yield
+
+    def __del__(self):
+        self.event = None
+        self.ioloop = None
+        gc.collect()
--
2.29.2
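
The recurring change in this patch replaces io_loop.call_later(delay, cb, ...) with a coroutine that yields tornado.gen.sleep(delay) and then hands the callback to io_loop.spawn_callback(...), alongside explicit gc.collect() calls and a __del__ that drops the event reference once a batch has ended. The sketch below only illustrates that scheduling pattern with plain Tornado; it is not code from Salt, and the DelayedRunner class and job() function are invented names for the demonstration.

# Standalone illustration (not part of the patch) of the two scheduling styles.
import tornado.gen
import tornado.ioloop


class DelayedRunner:
    def __init__(self, io_loop):
        self.io_loop = io_loop

    def schedule_with_call_later(self, delay, callback, *args):
        # Old style (removed by the patch): a timer entry on the IOLoop
        # references the callback until the delay expires.
        self.io_loop.call_later(delay, callback, *args)

    @tornado.gen.coroutine
    def schedule_with_sleep(self, delay, callback, *args):
        # New style (introduced by the patch): wait inside a coroutine,
        # then hand the callback over to the IOLoop and return.
        yield tornado.gen.sleep(delay)
        self.io_loop.spawn_callback(callback, *args)


def job(label):
    print("running", label)


@tornado.gen.coroutine
def main():
    runner = DelayedRunner(tornado.ioloop.IOLoop.current())
    runner.schedule_with_call_later(0.1, job, "via call_later")
    yield runner.schedule_with_sleep(0.1, job, "via gen.sleep + spawn_callback")
    yield tornado.gen.sleep(0.3)  # give the call_later timer time to fire


if __name__ == "__main__":
    tornado.ioloop.IOLoop.current().run_sync(main)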