1273 lines
48 KiB
Diff
1273 lines
48 KiB
Diff
|
From d57472b4fa2213ec551197ee2e147aef364fdcfe Mon Sep 17 00:00:00 2001
|
||
|
From: Victor Zhestkov <vzhestkov@suse.com>
|
||
|
Date: Wed, 15 May 2024 11:47:35 +0200
|
||
|
Subject: [PATCH] Prevent OOM with high amount of batch async calls
|
||
|
(bsc#1216063)
|
||
|
|
||
|
* Refactor batch_async implementation
|
||
|
|
||
|
* Fix batch_async tests after refactoring
|
||
|
---
|
||
|
salt/cli/batch_async.py | 584 ++++++++++++++-------
|
||
|
salt/master.py | 9 +-
|
||
|
tests/pytests/unit/cli/test_batch_async.py | 360 +++++++------
|
||
|
3 files changed, 597 insertions(+), 356 deletions(-)
|
||
|
|
||
|
diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
|
||
|
index 1012ce37cc..5d49993faa 100644
|
||
|
--- a/salt/cli/batch_async.py
|
||
|
+++ b/salt/cli/batch_async.py
|
||
|
@@ -2,18 +2,193 @@
|
||
|
Execute a job on the targeted minions by using a moving window of fixed size `batch`.
|
||
|
"""
|
||
|
|
||
|
-import gc
|
||
|
-
|
||
|
-# pylint: enable=import-error,no-name-in-module,redefined-builtin
|
||
|
import logging
|
||
|
+import re
|
||
|
|
||
|
import salt.client
|
||
|
import salt.ext.tornado
|
||
|
+import salt.utils.event
|
||
|
from salt.cli.batch import batch_get_eauth, batch_get_opts, get_bnum
|
||
|
+from salt.ext.tornado.iostream import StreamClosedError
|
||
|
|
||
|
log = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
+__SHARED_EVENTS_CHANNEL = None
|
||
|
+
|
||
|
+
|
||
|
+def _get_shared_events_channel(opts, io_loop):
+    """
+    Return the module-wide ``SharedEventsChannel``, creating it on first use.
+
+    ``opts`` and ``io_loop`` are only consumed when the channel has to be
+    created; an already existing channel is returned as is.
+    """
+    global __SHARED_EVENTS_CHANNEL
+    channel = __SHARED_EVENTS_CHANNEL
+    if channel is None:
+        channel = SharedEventsChannel(opts, io_loop)
+        __SHARED_EVENTS_CHANNEL = channel
+    return channel
|
||
|
+
|
||
|
+
|
||
|
+def _destroy_unused_shared_events_channel():
+    """
+    Tear down the shared events channel if nothing is using it anymore.
+
+    The module-level reference is cleared only when ``destroy_unused()``
+    reports that the channel was really destroyed.
+    """
+    global __SHARED_EVENTS_CHANNEL
+    channel = __SHARED_EVENTS_CHANNEL
+    if channel is None:
+        return
+    if channel.destroy_unused():
+        __SHARED_EVENTS_CHANNEL = None
|
||
|
+
|
||
|
+
|
||
|
+def batch_async_required(opts, minions, extra):
+    """
+    Check opts to identify if batch async is required for the operation.
+
+    :param opts: options dict; the optional ``batch_async`` dict may carry a
+        ``threshold`` key (``1`` is used when it is missing or when
+        ``batch_async`` is not a dict)
+    :param minions: list of the targeted minion IDs
+    :param extra: passed to ``get_bnum`` to compute the batch size when the
+        threshold is ``-1``
+    :return: ``True`` if the number of minions reaches the threshold
+
+    Threshold values:
+      * ``-1``: the batch size itself is used as the threshold
+      * ``> 0``: the value is the minimal number of minions
+      * anything else: batch async is never required
+    """
+    if not isinstance(minions, list):
+        # Nothing to count without a list of minions. The bare ``False``
+        # expression used here before had no effect and fell through.
+        return False
+    batch_async_opts = opts.get("batch_async", {})
+    batch_async_threshold = (
+        batch_async_opts.get("threshold", 1)
+        if isinstance(batch_async_opts, dict)
+        else 1
+    )
+    if batch_async_threshold == -1:
+        batch_size = get_bnum(extra, minions, True)
+        return len(minions) >= batch_size
+    elif batch_async_threshold > 0:
+        return len(minions) >= batch_async_threshold
+    return False
|
||
|
+
|
||
|
+
|
||
|
+class SharedEventsChannel:
+    """
+    One master event subscription shared between several subscribers.
+
+    Job return events (tags matching ``salt/job/<jid>/ret/...``) are
+    dispatched to the handlers registered for that ``jid`` with
+    ``subscribe()``. Users register with ``use()``/``unuse()`` so that
+    ``destroy_unused()`` knows when the channel can be torn down.
+    """
+
+    def __init__(self, opts, io_loop):
+        """
+        Create the local client and the master event listener.
+
+        :param opts: options dict; ``conf_file`` is required, the optional
+            ``batch_async`` dict may set ``subscriber_reconnect_tries``
+            (default ``5``) and ``subscriber_reconnect_interval``
+            (default ``1.0``)
+        :param io_loop: IO loop handed to the local client and event listener
+        """
+        self.io_loop = io_loop
+        self.local_client = salt.client.get_local_client(
+            opts["conf_file"], io_loop=self.io_loop
+        )
+        self.master_event = salt.utils.event.get_event(
+            "master",
+            sock_dir=self.local_client.opts["sock_dir"],
+            opts=self.local_client.opts,
+            listen=True,
+            io_loop=self.io_loop,
+            keep_loop=True,
+        )
+        self.master_event.set_event_handler(self.__handle_event)
+        if self.master_event.subscriber.stream:
+            self.master_event.subscriber.stream.set_close_callback(self.__handle_close)
+        self._re_tag_ret_event = re.compile(r"salt\/job\/(\d+)\/ret\/.*")
+        # subscriber_id -> set of jids the subscriber listens to
+        self._subscribers = {}
+        # jid -> list of (op, subscriber_id, handler) tuples
+        self._subscriptions = {}
+        # ids registered with use() and not yet released with unuse()
+        self._used_by = set()
+        batch_async_opts = opts.get("batch_async", {})
+        if not isinstance(batch_async_opts, dict):
+            batch_async_opts = {}
+        self._subscriber_reconnect_tries = batch_async_opts.get(
+            "subscriber_reconnect_tries", 5
+        )
+        self._subscriber_reconnect_interval = batch_async_opts.get(
+            "subscriber_reconnect_interval", 1.0
+        )
+        self._reconnecting_subscriber = False
+
+    def subscribe(self, jid, op, subscriber_id, handler):
+        """
+        Register ``handler`` for the return events of the job ``jid``.
+
+        The handler is later called as ``handler(tag, data, op)``. Registering
+        the same ``(op, subscriber_id, handler)`` twice for a jid is a no-op.
+        """
+        if subscriber_id not in self._subscribers:
+            self._subscribers[subscriber_id] = set()
+        if jid not in self._subscriptions:
+            self._subscriptions[jid] = []
+        self._subscribers[subscriber_id].add(jid)
+        if (op, subscriber_id, handler) not in self._subscriptions[jid]:
+            self._subscriptions[jid].append((op, subscriber_id, handler))
+        if not self.master_event.subscriber.connected():
+            # The result of the reconnect coroutine is not waited for here
+            self.__reconnect_subscriber()
+
+    def unsubscribe(self, jid, op, subscriber_id):
+        """
+        Remove subscriptions of ``subscriber_id``.
+
+        :param jid: the job to unsubscribe from, or ``None`` for all the jobs
+            of the subscriber
+        :param op: the operation to remove, or ``None`` for any operation
+        :param subscriber_id: the subscriber whose subscriptions are removed
+        """
+        if subscriber_id not in self._subscribers:
+            return
+        if jid is not None:
+            # A one-element set: ``set(jid)`` would split a string jid
+            # into its characters and never match the real subscription.
+            jids = {jid}
+        else:
+            jids = self._subscribers[subscriber_id].copy()
+        for i_jid in jids:
+            self._subscriptions[i_jid] = [
+                sub
+                for sub in self._subscriptions.get(i_jid, [])
+                if not (op in (sub[0], None) and sub[1] == subscriber_id)
+            ]
+            self._subscribers[subscriber_id].discard(i_jid)
+        # Drop the jids left without any subscription
+        self._subscriptions = {
+            s_jid: subs for s_jid, subs in self._subscriptions.items() if subs
+        }
+        if not self._subscribers[subscriber_id]:
+            del self._subscribers[subscriber_id]
+
+    @salt.ext.tornado.gen.coroutine
+    def __handle_close(self):
+        """
+        Close callback of the subscriber stream: reconnect if still needed.
+        """
+        if not self._subscriptions:
+            return
+        log.warning("Master Event Subscriber was closed. Trying to reconnect...")
+        yield self.__reconnect_subscriber()
+
+    @salt.ext.tornado.gen.coroutine
+    def __handle_event(self, raw):
+        """
+        Unpack a raw event and pass job returns to the subscribed handlers.
+        """
+        if self.master_event is None:
+            return
+        # Bound before ``try`` so the error log works if unpacking fails
+        tag = None
+        try:
+            tag, data = self.master_event.unpack(raw)
+            tag_match = self._re_tag_ret_event.match(tag)
+            if tag_match:
+                jid = tag_match.group(1)
+                if jid in self._subscriptions:
+                    for op, _, handler in self._subscriptions[jid]:
+                        yield handler(tag, data, op)
+        except Exception as ex:  # pylint: disable=W0703
+            log.error(
+                "Exception occured while processing event: %s: %s",
+                tag,
+                ex,
+                exc_info=True,
+            )
+
+    @salt.ext.tornado.gen.coroutine
+    def __reconnect_subscriber(self):
+        """
+        Try to reconnect the event subscriber, at most
+        ``subscriber_reconnect_tries`` times (at least once), sleeping
+        ``subscriber_reconnect_interval`` between the tries.
+        """
+        if self.master_event.subscriber.connected() or self._reconnecting_subscriber:
+            return
+        self._reconnecting_subscriber = True
+        try:
+            max_tries = max(1, int(self._subscriber_reconnect_tries))
+            for _try in range(1, max_tries + 1):
+                log.info(
+                    "Trying to reconnect to event publisher (try %d of %d) ...",
+                    _try,
+                    max_tries,
+                )
+                try:
+                    yield self.master_event.subscriber.connect()
+                except StreamClosedError:
+                    log.warning(
+                        "Unable to reconnect to event publisher (try %d of %d)",
+                        _try,
+                        max_tries,
+                    )
+                if self.master_event.subscriber.connected():
+                    self.master_event.subscriber.stream.set_close_callback(
+                        self.__handle_close
+                    )
+                    log.info("Event publisher connection restored")
+                    return
+                if _try < max_tries:
+                    yield salt.ext.tornado.gen.sleep(
+                        self._subscriber_reconnect_interval
+                    )
+        finally:
+            # Always release the guard, even on an unexpected exception,
+            # otherwise no further reconnect could ever be attempted.
+            self._reconnecting_subscriber = False
+
+    def use(self, subscriber_id):
+        """
+        Mark the channel as used by ``subscriber_id`` and return the channel.
+        """
+        self._used_by.add(subscriber_id)
+        return self
+
+    def unuse(self, subscriber_id):
+        """
+        Mark the channel as no longer used by ``subscriber_id``.
+        """
+        self._used_by.discard(subscriber_id)
+
+    def destroy_unused(self):
+        """
+        Destroy the event listener and the local client if nobody uses them.
+
+        :return: ``False`` if the channel is still in use, ``True`` after the
+            resources have been destroyed
+        """
+        if self._used_by:
+            return False
+        self.master_event.remove_event_handler(self.__handle_event)
+        self.master_event.destroy()
+        self.master_event = None
+        self.local_client.destroy()
+        self.local_client = None
+        return True
|
||
|
+
|
||
|
+
|
||
|
class BatchAsync:
|
||
|
"""
|
||
|
Run a job on the targeted minions by using a moving window of fixed size `batch`.
|
||
|
@@ -28,14 +203,14 @@ class BatchAsync:
|
||
|
- gather_job_timeout: `find_job` timeout
|
||
|
- timeout: time to wait before firing a `find_job`
|
||
|
|
||
|
- When the batch stars, a `start` event is fired:
|
||
|
+ When the batch starts, a `start` event is fired:
|
||
|
- tag: salt/batch/<batch-jid>/start
|
||
|
- data: {
|
||
|
"available_minions": self.minions,
|
||
|
"down_minions": targeted_minions - presence_ping_minions
|
||
|
}
|
||
|
|
||
|
- When the batch ends, an `done` event is fired:
|
||
|
+ When the batch ends, a `done` event is fired:
|
||
|
- tag: salt/batch/<batch-jid>/done
|
||
|
- data: {
|
||
|
"available_minions": self.minions,
|
||
|
@@ -45,17 +220,26 @@ class BatchAsync:
|
||
|
}
|
||
|
"""
|
||
|
|
||
|
- def __init__(self, parent_opts, jid_gen, clear_load):
|
||
|
- ioloop = salt.ext.tornado.ioloop.IOLoop.current()
|
||
|
- self.local = salt.client.get_local_client(
|
||
|
- parent_opts["conf_file"], io_loop=ioloop
|
||
|
+ def __init__(self, opts, jid_gen, clear_load):
|
||
|
+ self.extra_job_kwargs = {}
|
||
|
+ kwargs = clear_load.get("kwargs", {})
|
||
|
+ for kwarg in ("module_executors", "executor_opts"):
|
||
|
+ if kwarg in kwargs:
|
||
|
+ self.extra_job_kwargs[kwarg] = kwargs[kwarg]
|
||
|
+ elif kwarg in opts:
|
||
|
+ self.extra_job_kwargs[kwarg] = opts[kwarg]
|
||
|
+ self.io_loop = salt.ext.tornado.ioloop.IOLoop.current()
|
||
|
+ self.events_channel = _get_shared_events_channel(opts, self.io_loop).use(
|
||
|
+ id(self)
|
||
|
)
|
||
|
if "gather_job_timeout" in clear_load["kwargs"]:
|
||
|
clear_load["gather_job_timeout"] = clear_load["kwargs"].pop(
|
||
|
"gather_job_timeout"
|
||
|
)
|
||
|
else:
|
||
|
- clear_load["gather_job_timeout"] = self.local.opts["gather_job_timeout"]
|
||
|
+ clear_load["gather_job_timeout"] = self.events_channel.local_client.opts[
|
||
|
+ "gather_job_timeout"
|
||
|
+ ]
|
||
|
self.batch_presence_ping_timeout = clear_load["kwargs"].get(
|
||
|
"batch_presence_ping_timeout", None
|
||
|
)
|
||
|
@@ -64,8 +248,8 @@ class BatchAsync:
|
||
|
clear_load.pop("tgt"),
|
||
|
clear_load.pop("fun"),
|
||
|
clear_load["kwargs"].pop("batch"),
|
||
|
- self.local.opts,
|
||
|
- **clear_load
|
||
|
+ self.events_channel.local_client.opts,
|
||
|
+ **clear_load,
|
||
|
)
|
||
|
self.eauth = batch_get_eauth(clear_load["kwargs"])
|
||
|
self.metadata = clear_load["kwargs"].get("metadata", {})
|
||
|
@@ -78,54 +262,45 @@ class BatchAsync:
|
||
|
self.jid_gen = jid_gen
|
||
|
self.ping_jid = jid_gen()
|
||
|
self.batch_jid = jid_gen()
|
||
|
- self.find_job_jid = jid_gen()
|
||
|
self.find_job_returned = set()
|
||
|
+ self.metadata.update({"batch_jid": self.batch_jid, "ping_jid": self.ping_jid})
|
||
|
self.ended = False
|
||
|
- self.event = salt.utils.event.get_event(
|
||
|
- "master",
|
||
|
- self.opts["sock_dir"],
|
||
|
- self.opts["transport"],
|
||
|
- opts=self.opts,
|
||
|
- listen=True,
|
||
|
- io_loop=ioloop,
|
||
|
- keep_loop=True,
|
||
|
- )
|
||
|
+ self.event = self.events_channel.master_event
|
||
|
self.scheduled = False
|
||
|
- self.patterns = set()
|
||
|
|
||
|
def __set_event_handler(self):
|
||
|
- ping_return_pattern = "salt/job/{}/ret/*".format(self.ping_jid)
|
||
|
- batch_return_pattern = "salt/job/{}/ret/*".format(self.batch_jid)
|
||
|
- self.event.subscribe(ping_return_pattern, match_type="glob")
|
||
|
- self.event.subscribe(batch_return_pattern, match_type="glob")
|
||
|
- self.patterns = {
|
||
|
- (ping_return_pattern, "ping_return"),
|
||
|
- (batch_return_pattern, "batch_run"),
|
||
|
- }
|
||
|
- self.event.set_event_handler(self.__event_handler)
|
||
|
+ self.events_channel.subscribe(
|
||
|
+ self.ping_jid, "ping_return", id(self), self.__event_handler
|
||
|
+ )
|
||
|
+ self.events_channel.subscribe(
|
||
|
+ self.batch_jid, "batch_run", id(self), self.__event_handler
|
||
|
+ )
|
||
|
|
||
|
- def __event_handler(self, raw):
|
||
|
+ @salt.ext.tornado.gen.coroutine
|
||
|
+ def __event_handler(self, tag, data, op):
|
||
|
if not self.event:
|
||
|
return
|
||
|
try:
|
||
|
- mtag, data = self.event.unpack(raw)
|
||
|
- for (pattern, op) in self.patterns:
|
||
|
- if mtag.startswith(pattern[:-1]):
|
||
|
- minion = data["id"]
|
||
|
- if op == "ping_return":
|
||
|
- self.minions.add(minion)
|
||
|
- if self.targeted_minions == self.minions:
|
||
|
- self.event.io_loop.spawn_callback(self.start_batch)
|
||
|
- elif op == "find_job_return":
|
||
|
- if data.get("return", None):
|
||
|
- self.find_job_returned.add(minion)
|
||
|
- elif op == "batch_run":
|
||
|
- if minion in self.active:
|
||
|
- self.active.remove(minion)
|
||
|
- self.done_minions.add(minion)
|
||
|
- self.event.io_loop.spawn_callback(self.schedule_next)
|
||
|
- except Exception as ex:
|
||
|
- log.error("Exception occured while processing event: {}".format(ex))
|
||
|
+ minion = data["id"]
|
||
|
+ if op == "ping_return":
|
||
|
+ self.minions.add(minion)
|
||
|
+ if self.targeted_minions == self.minions:
|
||
|
+ yield self.start_batch()
|
||
|
+ elif op == "find_job_return":
|
||
|
+ if data.get("return", None):
|
||
|
+ self.find_job_returned.add(minion)
|
||
|
+ elif op == "batch_run":
|
||
|
+ if minion in self.active:
|
||
|
+ self.active.remove(minion)
|
||
|
+ self.done_minions.add(minion)
|
||
|
+ yield self.schedule_next()
|
||
|
+ except Exception as ex: # pylint: disable=W0703
|
||
|
+ log.error(
|
||
|
+ "Exception occured while processing event: %s: %s",
|
||
|
+ tag,
|
||
|
+ ex,
|
||
|
+ exc_info=True,
|
||
|
+ )
|
||
|
|
||
|
def _get_next(self):
|
||
|
to_run = (
|
||
|
@@ -139,176 +314,203 @@ class BatchAsync:
|
||
|
)
|
||
|
return set(list(to_run)[:next_batch_size])
|
||
|
|
||
|
+ @salt.ext.tornado.gen.coroutine
|
||
|
def check_find_job(self, batch_minions, jid):
|
||
|
- if self.event:
|
||
|
- find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
|
||
|
- self.event.unsubscribe(find_job_return_pattern, match_type="glob")
|
||
|
- self.patterns.remove((find_job_return_pattern, "find_job_return"))
|
||
|
-
|
||
|
- timedout_minions = batch_minions.difference(
|
||
|
- self.find_job_returned
|
||
|
- ).difference(self.done_minions)
|
||
|
- self.timedout_minions = self.timedout_minions.union(timedout_minions)
|
||
|
- self.active = self.active.difference(self.timedout_minions)
|
||
|
- running = batch_minions.difference(self.done_minions).difference(
|
||
|
- self.timedout_minions
|
||
|
- )
|
||
|
+ """
|
||
|
+ Check if the job with specified ``jid`` was finished on the minions
|
||
|
+ """
|
||
|
+ if not self.event:
|
||
|
+ return
|
||
|
+ self.events_channel.unsubscribe(jid, "find_job_return", id(self))
|
||
|
|
||
|
- if timedout_minions:
|
||
|
- self.schedule_next()
|
||
|
+ timedout_minions = batch_minions.difference(self.find_job_returned).difference(
|
||
|
+ self.done_minions
|
||
|
+ )
|
||
|
+ self.timedout_minions = self.timedout_minions.union(timedout_minions)
|
||
|
+ self.active = self.active.difference(self.timedout_minions)
|
||
|
+ running = batch_minions.difference(self.done_minions).difference(
|
||
|
+ self.timedout_minions
|
||
|
+ )
|
||
|
|
||
|
- if self.event and running:
|
||
|
- self.find_job_returned = self.find_job_returned.difference(running)
|
||
|
- self.event.io_loop.spawn_callback(self.find_job, running)
|
||
|
+ if timedout_minions:
|
||
|
+ yield self.schedule_next()
|
||
|
+
|
||
|
+ if self.event and running:
|
||
|
+ self.find_job_returned = self.find_job_returned.difference(running)
|
||
|
+ yield self.find_job(running)
|
||
|
|
||
|
@salt.ext.tornado.gen.coroutine
|
||
|
def find_job(self, minions):
|
||
|
- if self.event:
|
||
|
- not_done = minions.difference(self.done_minions).difference(
|
||
|
- self.timedout_minions
|
||
|
+ """
|
||
|
+ Find if the job was finished on the minions
|
||
|
+ """
|
||
|
+ if not self.event:
|
||
|
+ return
|
||
|
+ not_done = minions.difference(self.done_minions).difference(
|
||
|
+ self.timedout_minions
|
||
|
+ )
|
||
|
+ if not not_done:
|
||
|
+ return
|
||
|
+ try:
|
||
|
+ jid = self.jid_gen()
|
||
|
+ self.events_channel.subscribe(
|
||
|
+ jid, "find_job_return", id(self), self.__event_handler
|
||
|
)
|
||
|
- try:
|
||
|
- if not_done:
|
||
|
- jid = self.jid_gen()
|
||
|
- find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
|
||
|
- self.patterns.add((find_job_return_pattern, "find_job_return"))
|
||
|
- self.event.subscribe(find_job_return_pattern, match_type="glob")
|
||
|
- ret = yield self.local.run_job_async(
|
||
|
- not_done,
|
||
|
- "saltutil.find_job",
|
||
|
- [self.batch_jid],
|
||
|
- "list",
|
||
|
- gather_job_timeout=self.opts["gather_job_timeout"],
|
||
|
- jid=jid,
|
||
|
- **self.eauth
|
||
|
- )
|
||
|
- yield salt.ext.tornado.gen.sleep(self.opts["gather_job_timeout"])
|
||
|
- if self.event:
|
||
|
- self.event.io_loop.spawn_callback(
|
||
|
- self.check_find_job, not_done, jid
|
||
|
- )
|
||
|
- except Exception as ex:
|
||
|
- log.error(
|
||
|
- "Exception occured handling batch async: {}. Aborting execution.".format(
|
||
|
- ex
|
||
|
- )
|
||
|
- )
|
||
|
- self.close_safe()
|
||
|
+ ret = yield self.events_channel.local_client.run_job_async(
|
||
|
+ not_done,
|
||
|
+ "saltutil.find_job",
|
||
|
+ [self.batch_jid],
|
||
|
+ "list",
|
||
|
+ gather_job_timeout=self.opts["gather_job_timeout"],
|
||
|
+ jid=jid,
|
||
|
+ io_loop=self.io_loop,
|
||
|
+ listen=False,
|
||
|
+ **self.eauth,
|
||
|
+ )
|
||
|
+ yield salt.ext.tornado.gen.sleep(self.opts["gather_job_timeout"])
|
||
|
+ if self.event:
|
||
|
+ yield self.check_find_job(not_done, jid)
|
||
|
+ except Exception as ex: # pylint: disable=W0703
|
||
|
+ log.error(
|
||
|
+ "Exception occured handling batch async: %s. Aborting execution.",
|
||
|
+ ex,
|
||
|
+ exc_info=True,
|
||
|
+ )
|
||
|
+ self.close_safe()
|
||
|
|
||
|
@salt.ext.tornado.gen.coroutine
|
||
|
def start(self):
|
||
|
+ """
|
||
|
+ Start the batch execution
|
||
|
+ """
|
||
|
+ if not self.event:
|
||
|
+ return
|
||
|
+ self.__set_event_handler()
|
||
|
+ ping_return = yield self.events_channel.local_client.run_job_async(
|
||
|
+ self.opts["tgt"],
|
||
|
+ "test.ping",
|
||
|
+ [],
|
||
|
+ self.opts.get("selected_target_option", self.opts.get("tgt_type", "glob")),
|
||
|
+ gather_job_timeout=self.opts["gather_job_timeout"],
|
||
|
+ jid=self.ping_jid,
|
||
|
+ metadata=self.metadata,
|
||
|
+ io_loop=self.io_loop,
|
||
|
+ listen=False,
|
||
|
+ **self.eauth,
|
||
|
+ )
|
||
|
+ self.targeted_minions = set(ping_return["minions"])
|
||
|
+ # start batching even if not all minions respond to ping
|
||
|
+ yield salt.ext.tornado.gen.sleep(
|
||
|
+ self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
|
||
|
+ )
|
||
|
if self.event:
|
||
|
- self.__set_event_handler()
|
||
|
- ping_return = yield self.local.run_job_async(
|
||
|
- self.opts["tgt"],
|
||
|
- "test.ping",
|
||
|
- [],
|
||
|
- self.opts.get(
|
||
|
- "selected_target_option", self.opts.get("tgt_type", "glob")
|
||
|
- ),
|
||
|
- gather_job_timeout=self.opts["gather_job_timeout"],
|
||
|
- jid=self.ping_jid,
|
||
|
- metadata=self.metadata,
|
||
|
- **self.eauth
|
||
|
- )
|
||
|
- self.targeted_minions = set(ping_return["minions"])
|
||
|
- # start batching even if not all minions respond to ping
|
||
|
- yield salt.ext.tornado.gen.sleep(
|
||
|
- self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
|
||
|
- )
|
||
|
- if self.event:
|
||
|
- self.event.io_loop.spawn_callback(self.start_batch)
|
||
|
+ yield self.start_batch()
|
||
|
|
||
|
@salt.ext.tornado.gen.coroutine
|
||
|
def start_batch(self):
|
||
|
- if not self.initialized:
|
||
|
- self.batch_size = get_bnum(self.opts, self.minions, True)
|
||
|
- self.initialized = True
|
||
|
- data = {
|
||
|
- "available_minions": self.minions,
|
||
|
- "down_minions": self.targeted_minions.difference(self.minions),
|
||
|
- "metadata": self.metadata,
|
||
|
- }
|
||
|
- ret = self.event.fire_event(
|
||
|
- data, "salt/batch/{}/start".format(self.batch_jid)
|
||
|
- )
|
||
|
- if self.event:
|
||
|
- self.event.io_loop.spawn_callback(self.run_next)
|
||
|
+ """
|
||
|
+ Fire `salt/batch/*/start` and continue batch with `run_next`
|
||
|
+ """
|
||
|
+ if self.initialized:
|
||
|
+ return
|
||
|
+ self.batch_size = get_bnum(self.opts, self.minions, True)
|
||
|
+ self.initialized = True
|
||
|
+ data = {
|
||
|
+ "available_minions": self.minions,
|
||
|
+ "down_minions": self.targeted_minions.difference(self.minions),
|
||
|
+ "metadata": self.metadata,
|
||
|
+ }
|
||
|
+ yield self.events_channel.master_event.fire_event_async(
|
||
|
+ data, f"salt/batch/{self.batch_jid}/start"
|
||
|
+ )
|
||
|
+ if self.event:
|
||
|
+ yield self.run_next()
|
||
|
|
||
|
@salt.ext.tornado.gen.coroutine
|
||
|
def end_batch(self):
|
||
|
+ """
|
||
|
+ End the batch and call safe closing
|
||
|
+ """
|
||
|
left = self.minions.symmetric_difference(
|
||
|
self.done_minions.union(self.timedout_minions)
|
||
|
)
|
||
|
- if not left and not self.ended:
|
||
|
- self.ended = True
|
||
|
- data = {
|
||
|
- "available_minions": self.minions,
|
||
|
- "down_minions": self.targeted_minions.difference(self.minions),
|
||
|
- "done_minions": self.done_minions,
|
||
|
- "timedout_minions": self.timedout_minions,
|
||
|
- "metadata": self.metadata,
|
||
|
- }
|
||
|
- self.event.fire_event(data, "salt/batch/{}/done".format(self.batch_jid))
|
||
|
-
|
||
|
- # release to the IOLoop to allow the event to be published
|
||
|
- # before closing batch async execution
|
||
|
- yield salt.ext.tornado.gen.sleep(1)
|
||
|
- self.close_safe()
|
||
|
+ # Send salt/batch/*/done only if there is nothing to do
|
||
|
+ # and the event hasn't been sent already
|
||
|
+ if left or self.ended:
|
||
|
+ return
|
||
|
+ self.ended = True
|
||
|
+ data = {
|
||
|
+ "available_minions": self.minions,
|
||
|
+ "down_minions": self.targeted_minions.difference(self.minions),
|
||
|
+ "done_minions": self.done_minions,
|
||
|
+ "timedout_minions": self.timedout_minions,
|
||
|
+ "metadata": self.metadata,
|
||
|
+ }
|
||
|
+ yield self.events_channel.master_event.fire_event_async(
|
||
|
+ data, f"salt/batch/{self.batch_jid}/done"
|
||
|
+ )
|
||
|
+
|
||
|
+ # release to the IOLoop to allow the event to be published
|
||
|
+ # before closing batch async execution
|
||
|
+ yield salt.ext.tornado.gen.sleep(1)
|
||
|
+ self.close_safe()
|
||
|
|
||
|
def close_safe(self):
|
||
|
- for (pattern, label) in self.patterns:
|
||
|
- self.event.unsubscribe(pattern, match_type="glob")
|
||
|
- self.event.remove_event_handler(self.__event_handler)
|
||
|
+ if self.events_channel is not None:
|
||
|
+ self.events_channel.unsubscribe(None, None, id(self))
|
||
|
+ self.events_channel.unuse(id(self))
|
||
|
+ self.events_channel = None
|
||
|
+ _destroy_unused_shared_events_channel()
|
||
|
self.event = None
|
||
|
- self.local = None
|
||
|
- self.ioloop = None
|
||
|
- del self
|
||
|
- gc.collect()
|
||
|
|
||
|
@salt.ext.tornado.gen.coroutine
|
||
|
def schedule_next(self):
|
||
|
- if not self.scheduled:
|
||
|
- self.scheduled = True
|
||
|
- # call later so that we maybe gather more returns
|
||
|
- yield salt.ext.tornado.gen.sleep(self.batch_delay)
|
||
|
- if self.event:
|
||
|
- self.event.io_loop.spawn_callback(self.run_next)
|
||
|
+ if self.scheduled:
|
||
|
+ return
|
||
|
+ self.scheduled = True
|
||
|
+ # call later so that we maybe gather more returns
|
||
|
+ yield salt.ext.tornado.gen.sleep(self.batch_delay)
|
||
|
+ if self.event:
|
||
|
+ yield self.run_next()
|
||
|
|
||
|
@salt.ext.tornado.gen.coroutine
|
||
|
def run_next(self):
|
||
|
+ """
|
||
|
+ Continue batch execution with the next targets
|
||
|
+ """
|
||
|
self.scheduled = False
|
||
|
next_batch = self._get_next()
|
||
|
- if next_batch:
|
||
|
- self.active = self.active.union(next_batch)
|
||
|
- try:
|
||
|
- ret = yield self.local.run_job_async(
|
||
|
- next_batch,
|
||
|
- self.opts["fun"],
|
||
|
- self.opts["arg"],
|
||
|
- "list",
|
||
|
- raw=self.opts.get("raw", False),
|
||
|
- ret=self.opts.get("return", ""),
|
||
|
- gather_job_timeout=self.opts["gather_job_timeout"],
|
||
|
- jid=self.batch_jid,
|
||
|
- metadata=self.metadata,
|
||
|
- )
|
||
|
-
|
||
|
- yield salt.ext.tornado.gen.sleep(self.opts["timeout"])
|
||
|
-
|
||
|
- # The batch can be done already at this point, which means no self.event
|
||
|
- if self.event:
|
||
|
- self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
|
||
|
- except Exception as ex:
|
||
|
- log.error("Error in scheduling next batch: %s. Aborting execution", ex)
|
||
|
- self.active = self.active.difference(next_batch)
|
||
|
- self.close_safe()
|
||
|
- else:
|
||
|
+ if not next_batch:
|
||
|
yield self.end_batch()
|
||
|
- gc.collect()
|
||
|
+ return
|
||
|
+ self.active = self.active.union(next_batch)
|
||
|
+ try:
|
||
|
+ ret = yield self.events_channel.local_client.run_job_async(
|
||
|
+ next_batch,
|
||
|
+ self.opts["fun"],
|
||
|
+ self.opts["arg"],
|
||
|
+ "list",
|
||
|
+ raw=self.opts.get("raw", False),
|
||
|
+ ret=self.opts.get("return", ""),
|
||
|
+ gather_job_timeout=self.opts["gather_job_timeout"],
|
||
|
+ jid=self.batch_jid,
|
||
|
+ metadata=self.metadata,
|
||
|
+ io_loop=self.io_loop,
|
||
|
+ listen=False,
|
||
|
+ **self.eauth,
|
||
|
+ **self.extra_job_kwargs,
|
||
|
+ )
|
||
|
|
||
|
- def __del__(self):
|
||
|
- self.local = None
|
||
|
- self.event = None
|
||
|
- self.ioloop = None
|
||
|
- gc.collect()
|
||
|
+ yield salt.ext.tornado.gen.sleep(self.opts["timeout"])
|
||
|
+
|
||
|
+ # The batch can be done already at this point, which means no self.event
|
||
|
+ if self.event:
|
||
|
+ yield self.find_job(set(next_batch))
|
||
|
+ except Exception as ex: # pylint: disable=W0703
|
||
|
+ log.error(
|
||
|
+ "Error in scheduling next batch: %s. Aborting execution",
|
||
|
+ ex,
|
||
|
+ exc_info=True,
|
||
|
+ )
|
||
|
+ self.active = self.active.difference(next_batch)
|
||
|
+ self.close_safe()
|
||
|
diff --git a/salt/master.py b/salt/master.py
|
||
|
index 425b412148..d7182d10b5 100644
|
||
|
--- a/salt/master.py
|
||
|
+++ b/salt/master.py
|
||
|
@@ -2,6 +2,7 @@
|
||
|
This module contains all of the routines needed to set up a master server, this
|
||
|
involves preparing the three listeners and the workers needed by the master.
|
||
|
"""
|
||
|
+
|
||
|
import collections
|
||
|
import copy
|
||
|
import ctypes
|
||
|
@@ -19,7 +20,6 @@ import time
|
||
|
import salt.acl
|
||
|
import salt.auth
|
||
|
import salt.channel.server
|
||
|
-import salt.cli.batch_async
|
||
|
import salt.client
|
||
|
import salt.client.ssh.client
|
||
|
import salt.crypt
|
||
|
@@ -55,6 +55,7 @@ import salt.utils.user
|
||
|
import salt.utils.verify
|
||
|
import salt.utils.zeromq
|
||
|
import salt.wheel
|
||
|
+from salt.cli.batch_async import BatchAsync, batch_async_required
|
||
|
from salt.config import DEFAULT_INTERVAL
|
||
|
from salt.defaults import DEFAULT_TARGET_DELIM
|
||
|
from salt.ext.tornado.stack_context import StackContext
|
||
|
@@ -2174,9 +2175,9 @@ class ClearFuncs(TransportMethods):
|
||
|
def publish_batch(self, clear_load, minions, missing):
|
||
|
batch_load = {}
|
||
|
batch_load.update(clear_load)
|
||
|
- batch = salt.cli.batch_async.BatchAsync(
|
||
|
+ batch = BatchAsync(
|
||
|
self.local.opts,
|
||
|
- functools.partial(self._prep_jid, clear_load, {}),
|
||
|
+ lambda: self._prep_jid(clear_load, {}),
|
||
|
batch_load,
|
||
|
)
|
||
|
ioloop = salt.ext.tornado.ioloop.IOLoop.current()
|
||
|
@@ -2331,7 +2332,7 @@ class ClearFuncs(TransportMethods):
|
||
|
),
|
||
|
},
|
||
|
}
|
||
|
- if extra.get("batch", None):
|
||
|
+ if extra.get("batch", None) and batch_async_required(self.opts, minions, extra):
|
||
|
return self.publish_batch(clear_load, minions, missing)
|
||
|
|
||
|
jid = self._prep_jid(clear_load, extra)
|
||
|
diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py
|
||
|
index e0774ffff3..bc871aba54 100644
|
||
|
--- a/tests/pytests/unit/cli/test_batch_async.py
|
||
|
+++ b/tests/pytests/unit/cli/test_batch_async.py
|
||
|
@@ -1,7 +1,7 @@
|
||
|
import pytest
|
||
|
|
||
|
import salt.ext.tornado
|
||
|
-from salt.cli.batch_async import BatchAsync
|
||
|
+from salt.cli.batch_async import BatchAsync, batch_async_required
|
||
|
from tests.support.mock import MagicMock, patch
|
||
|
|
||
|
|
||
|
@@ -22,16 +22,44 @@ def batch(temp_salt_master):
|
||
|
with patch("salt.cli.batch_async.batch_get_opts", MagicMock(return_value=opts)):
|
||
|
batch = BatchAsync(
|
||
|
opts,
|
||
|
- MagicMock(side_effect=["1234", "1235", "1236"]),
|
||
|
+ MagicMock(side_effect=["1234", "1235"]),
|
||
|
{
|
||
|
"tgt": "",
|
||
|
"fun": "",
|
||
|
- "kwargs": {"batch": "", "batch_presence_ping_timeout": 1},
|
||
|
+ "kwargs": {
|
||
|
+ "batch": "",
|
||
|
+ "batch_presence_ping_timeout": 1,
|
||
|
+ "metadata": {"mykey": "myvalue"},
|
||
|
+ },
|
||
|
},
|
||
|
)
|
||
|
yield batch
|
||
|
|
||
|
|
||
|
+@pytest.mark.parametrize(
+    "threshold,minions,batch,expected",
+    [
+        (1, 2, 200, True),
+        (1, 500, 200, True),
+        (0, 2, 200, False),
+        (0, 500, 200, False),
+        (-1, 2, 200, False),
+        (-1, 500, 200, True),
+        (-1, 9, 10, False),
+        (-1, 11, 10, True),
+        (10, 9, 8, False),
+        (10, 9, 10, False),
+        (10, 11, 8, True),
+        (10, 11, 10, True),
+    ],
+)
+def test_batch_async_required(threshold, minions, batch, expected):
+    """
+    Check the ``batch_async_required`` decision for the given threshold,
+    number of minions and batch size
+    """
+    opts = {"batch_async": {"threshold": threshold}}
+    targets = [f"minion{idx}.example.org" for idx in range(minions)]
+    assert batch_async_required(opts, targets, {"batch": batch}) == expected
|
||
|
+
|
||
|
+
|
||
|
def test_ping_jid(batch):
|
||
|
assert batch.ping_jid == "1234"
|
||
|
|
||
|
@@ -40,10 +68,6 @@ def test_batch_jid(batch):
|
||
|
assert batch.batch_jid == "1235"
|
||
|
|
||
|
|
||
|
-def test_find_job_jid(batch):
|
||
|
- assert batch.find_job_jid == "1236"
|
||
|
-
|
||
|
-
|
||
|
def test_batch_size(batch):
|
||
|
"""
|
||
|
Tests passing batch value as a number
|
||
|
@@ -55,58 +79,74 @@ def test_batch_size(batch):
|
||
|
|
||
|
|
||
|
def test_batch_start_on_batch_presence_ping_timeout(batch):
|
||
|
- # batch_async = BatchAsyncMock();
|
||
|
- batch.event = MagicMock()
|
||
|
+ future_ret = salt.ext.tornado.gen.Future()
|
||
|
+ future_ret.set_result({"minions": ["foo", "bar"]})
|
||
|
future = salt.ext.tornado.gen.Future()
|
||
|
- future.set_result({"minions": ["foo", "bar"]})
|
||
|
- batch.local.run_job_async.return_value = future
|
||
|
- with patch("salt.ext.tornado.gen.sleep", return_value=future):
|
||
|
- # ret = batch_async.start(batch)
|
||
|
+ future.set_result({})
|
||
|
+ with patch.object(batch, "events_channel", MagicMock()), patch(
|
||
|
+ "salt.ext.tornado.gen.sleep", return_value=future
|
||
|
+ ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
|
||
|
+ batch.events_channel.local_client.run_job_async.return_value = future_ret
|
||
|
ret = batch.start()
|
||
|
- # assert start_batch is called later with batch_presence_ping_timeout as param
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.start_batch,)
|
||
|
+ # assert start_batch is called
|
||
|
+ start_batch_mock.assert_called_once()
|
||
|
# assert test.ping called
|
||
|
- assert batch.local.run_job_async.call_args[0] == ("*", "test.ping", [], "glob")
|
||
|
+ assert batch.events_channel.local_client.run_job_async.call_args[0] == (
|
||
|
+ "*",
|
||
|
+ "test.ping",
|
||
|
+ [],
|
||
|
+ "glob",
|
||
|
+ )
|
||
|
# assert targeted_minions == all minions matched by tgt
|
||
|
assert batch.targeted_minions == {"foo", "bar"}
|
||
|
|
||
|
|
||
|
def test_batch_start_on_gather_job_timeout(batch):
|
||
|
- # batch_async = BatchAsyncMock();
|
||
|
- batch.event = MagicMock()
|
||
|
future = salt.ext.tornado.gen.Future()
|
||
|
- future.set_result({"minions": ["foo", "bar"]})
|
||
|
- batch.local.run_job_async.return_value = future
|
||
|
+ future.set_result({})
|
||
|
+ future_ret = salt.ext.tornado.gen.Future()
|
||
|
+ future_ret.set_result({"minions": ["foo", "bar"]})
|
||
|
batch.batch_presence_ping_timeout = None
|
||
|
- with patch("salt.ext.tornado.gen.sleep", return_value=future):
|
||
|
+ with patch.object(batch, "events_channel", MagicMock()), patch(
|
||
|
+ "salt.ext.tornado.gen.sleep", return_value=future
|
||
|
+ ), patch.object(
|
||
|
+ batch, "start_batch", return_value=future
|
||
|
+ ) as start_batch_mock, patch.object(
|
||
|
+ batch, "batch_presence_ping_timeout", None
|
||
|
+ ):
|
||
|
+ batch.events_channel.local_client.run_job_async.return_value = future_ret
|
||
|
# ret = batch_async.start(batch)
|
||
|
ret = batch.start()
|
||
|
- # assert start_batch is called later with gather_job_timeout as param
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.start_batch,)
|
||
|
+ # assert start_batch is called
|
||
|
+ start_batch_mock.assert_called_once()
|
||
|
|
||
|
|
||
|
def test_batch_fire_start_event(batch):
|
||
|
batch.minions = {"foo", "bar"}
|
||
|
batch.opts = {"batch": "2", "timeout": 5}
|
||
|
- batch.event = MagicMock()
|
||
|
- batch.metadata = {"mykey": "myvalue"}
|
||
|
- batch.start_batch()
|
||
|
- assert batch.event.fire_event.call_args[0] == (
|
||
|
- {
|
||
|
- "available_minions": {"foo", "bar"},
|
||
|
- "down_minions": set(),
|
||
|
- "metadata": batch.metadata,
|
||
|
- },
|
||
|
- "salt/batch/1235/start",
|
||
|
- )
|
||
|
+ with patch.object(batch, "events_channel", MagicMock()):
|
||
|
+ batch.start_batch()
|
||
|
+ assert batch.events_channel.master_event.fire_event_async.call_args[0] == (
|
||
|
+ {
|
||
|
+ "available_minions": {"foo", "bar"},
|
||
|
+ "down_minions": set(),
|
||
|
+ "metadata": batch.metadata,
|
||
|
+ },
|
||
|
+ "salt/batch/1235/start",
|
||
|
+ )
|
||
|
|
||
|
|
||
|
def test_start_batch_calls_next(batch):
|
||
|
- batch.run_next = MagicMock(return_value=MagicMock())
|
||
|
- batch.event = MagicMock()
|
||
|
- batch.start_batch()
|
||
|
- assert batch.initialized
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.run_next,)
|
||
|
+ batch.initialized = False
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
+ future.set_result({})
|
||
|
+ with patch.object(batch, "event", MagicMock()), patch.object(
|
||
|
+ batch, "events_channel", MagicMock()
|
||
|
+ ), patch.object(batch, "run_next", return_value=future) as run_next_mock:
|
||
|
+ batch.events_channel.master_event.fire_event_async.return_value = future
|
||
|
+ batch.start_batch()
|
||
|
+ assert batch.initialized
|
||
|
+ run_next_mock.assert_called_once()
|
||
|
|
||
|
|
||
|
def test_batch_fire_done_event(batch):
|
||
|
@@ -114,69 +154,52 @@ def test_batch_fire_done_event(batch):
|
||
|
batch.minions = {"foo", "bar"}
|
||
|
batch.done_minions = {"foo"}
|
||
|
batch.timedout_minions = {"bar"}
|
||
|
- batch.event = MagicMock()
|
||
|
- batch.metadata = {"mykey": "myvalue"}
|
||
|
- old_event = batch.event
|
||
|
- batch.end_batch()
|
||
|
- assert old_event.fire_event.call_args[0] == (
|
||
|
- {
|
||
|
- "available_minions": {"foo", "bar"},
|
||
|
- "done_minions": batch.done_minions,
|
||
|
- "down_minions": {"baz"},
|
||
|
- "timedout_minions": batch.timedout_minions,
|
||
|
- "metadata": batch.metadata,
|
||
|
- },
|
||
|
- "salt/batch/1235/done",
|
||
|
- )
|
||
|
-
|
||
|
-
|
||
|
-def test_batch__del__(batch):
|
||
|
- batch = BatchAsync(MagicMock(), MagicMock(), MagicMock())
|
||
|
- event = MagicMock()
|
||
|
- batch.event = event
|
||
|
- batch.__del__()
|
||
|
- assert batch.local is None
|
||
|
- assert batch.event is None
|
||
|
- assert batch.ioloop is None
|
||
|
+ with patch.object(batch, "events_channel", MagicMock()):
|
||
|
+ batch.end_batch()
|
||
|
+ assert batch.events_channel.master_event.fire_event_async.call_args[0] == (
|
||
|
+ {
|
||
|
+ "available_minions": {"foo", "bar"},
|
||
|
+ "done_minions": batch.done_minions,
|
||
|
+ "down_minions": {"baz"},
|
||
|
+ "timedout_minions": batch.timedout_minions,
|
||
|
+ "metadata": batch.metadata,
|
||
|
+ },
|
||
|
+ "salt/batch/1235/done",
|
||
|
+ )
|
||
|
|
||
|
|
||
|
def test_batch_close_safe(batch):
|
||
|
- batch = BatchAsync(MagicMock(), MagicMock(), MagicMock())
|
||
|
- event = MagicMock()
|
||
|
- batch.event = event
|
||
|
- batch.patterns = {
|
||
|
- ("salt/job/1234/ret/*", "find_job_return"),
|
||
|
- ("salt/job/4321/ret/*", "find_job_return"),
|
||
|
- }
|
||
|
- batch.close_safe()
|
||
|
- assert batch.local is None
|
||
|
- assert batch.event is None
|
||
|
- assert batch.ioloop is None
|
||
|
- assert len(event.unsubscribe.mock_calls) == 2
|
||
|
- assert len(event.remove_event_handler.mock_calls) == 1
|
||
|
+ with patch.object(
|
||
|
+ batch, "events_channel", MagicMock()
|
||
|
+ ) as events_channel_mock, patch.object(batch, "event", MagicMock()):
|
||
|
+ batch.close_safe()
|
||
|
+ batch.close_safe()
|
||
|
+ assert batch.events_channel is None
|
||
|
+ assert batch.event is None
|
||
|
+ events_channel_mock.unsubscribe.assert_called_once()
|
||
|
+ events_channel_mock.unuse.assert_called_once()
|
||
|
|
||
|
|
||
|
def test_batch_next(batch):
|
||
|
- batch.event = MagicMock()
|
||
|
batch.opts["fun"] = "my.fun"
|
||
|
batch.opts["arg"] = []
|
||
|
- batch._get_next = MagicMock(return_value={"foo", "bar"})
|
||
|
batch.batch_size = 2
|
||
|
future = salt.ext.tornado.gen.Future()
|
||
|
- future.set_result({"minions": ["foo", "bar"]})
|
||
|
- batch.local.run_job_async.return_value = future
|
||
|
- with patch("salt.ext.tornado.gen.sleep", return_value=future):
|
||
|
+ future.set_result({})
|
||
|
+ with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
|
||
|
+ batch, "events_channel", MagicMock()
|
||
|
+ ), patch.object(batch, "_get_next", return_value={"foo", "bar"}), patch.object(
|
||
|
+ batch, "find_job", return_value=future
|
||
|
+ ) as find_job_mock:
|
||
|
+ batch.events_channel.local_client.run_job_async.return_value = future
|
||
|
batch.run_next()
|
||
|
- assert batch.local.run_job_async.call_args[0] == (
|
||
|
+ assert batch.events_channel.local_client.run_job_async.call_args[0] == (
|
||
|
{"foo", "bar"},
|
||
|
"my.fun",
|
||
|
[],
|
||
|
"list",
|
||
|
)
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (
|
||
|
- batch.find_job,
|
||
|
- {"foo", "bar"},
|
||
|
- )
|
||
|
+ assert find_job_mock.call_args[0] == ({"foo", "bar"},)
|
||
|
assert batch.active == {"bar", "foo"}
|
||
|
|
||
|
|
||
|
@@ -239,124 +262,132 @@ def test_next_batch_all_timedout(batch):
|
||
|
|
||
|
def test_batch__event_handler_ping_return(batch):
|
||
|
batch.targeted_minions = {"foo"}
|
||
|
- batch.event = MagicMock(
|
||
|
- unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"}))
|
||
|
- )
|
||
|
batch.start()
|
||
|
assert batch.minions == set()
|
||
|
- batch._BatchAsync__event_handler(MagicMock())
|
||
|
+ batch._BatchAsync__event_handler(
|
||
|
+ "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
|
||
|
+ )
|
||
|
assert batch.minions == {"foo"}
|
||
|
assert batch.done_minions == set()
|
||
|
|
||
|
|
||
|
def test_batch__event_handler_call_start_batch_when_all_pings_return(batch):
|
||
|
batch.targeted_minions = {"foo"}
|
||
|
- batch.event = MagicMock(
|
||
|
- unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"}))
|
||
|
- )
|
||
|
- batch.start()
|
||
|
- batch._BatchAsync__event_handler(MagicMock())
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.start_batch,)
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
+ future.set_result({})
|
||
|
+ with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
|
||
|
+ batch.start()
|
||
|
+ batch._BatchAsync__event_handler(
|
||
|
+ "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
|
||
|
+ )
|
||
|
+ start_batch_mock.assert_called_once()
|
||
|
|
||
|
|
||
|
def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch):
|
||
|
batch.targeted_minions = {"foo", "bar"}
|
||
|
- batch.event = MagicMock(
|
||
|
- unpack=MagicMock(return_value=("salt/job/1234/ret/foo", {"id": "foo"}))
|
||
|
- )
|
||
|
- batch.start()
|
||
|
- batch._BatchAsync__event_handler(MagicMock())
|
||
|
- assert len(batch.event.io_loop.spawn_callback.mock_calls) == 0
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
+ future.set_result({})
|
||
|
+ with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
|
||
|
+ batch.start()
|
||
|
+ batch._BatchAsync__event_handler(
|
||
|
+ "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
|
||
|
+ )
|
||
|
+ start_batch_mock.assert_not_called()
|
||
|
|
||
|
|
||
|
def test_batch__event_handler_batch_run_return(batch):
|
||
|
- batch.event = MagicMock(
|
||
|
- unpack=MagicMock(return_value=("salt/job/1235/ret/foo", {"id": "foo"}))
|
||
|
- )
|
||
|
- batch.start()
|
||
|
- batch.active = {"foo"}
|
||
|
- batch._BatchAsync__event_handler(MagicMock())
|
||
|
- assert batch.active == set()
|
||
|
- assert batch.done_minions == {"foo"}
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.schedule_next,)
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
+ future.set_result({})
|
||
|
+ with patch.object(
|
||
|
+ batch, "schedule_next", return_value=future
|
||
|
+ ) as schedule_next_mock:
|
||
|
+ batch.start()
|
||
|
+ batch.active = {"foo"}
|
||
|
+ batch._BatchAsync__event_handler(
|
||
|
+ "salt/job/1235/ret/foo", {"id": "foo"}, "batch_run"
|
||
|
+ )
|
||
|
+ assert batch.active == set()
|
||
|
+ assert batch.done_minions == {"foo"}
|
||
|
+ schedule_next_mock.assert_called_once()
|
||
|
|
||
|
|
||
|
def test_batch__event_handler_find_job_return(batch):
|
||
|
- batch.event = MagicMock(
|
||
|
- unpack=MagicMock(
|
||
|
- return_value=(
|
||
|
- "salt/job/1236/ret/foo",
|
||
|
- {"id": "foo", "return": "deadbeaf"},
|
||
|
- )
|
||
|
- )
|
||
|
- )
|
||
|
batch.start()
|
||
|
- batch.patterns.add(("salt/job/1236/ret/*", "find_job_return"))
|
||
|
- batch._BatchAsync__event_handler(MagicMock())
|
||
|
+ batch._BatchAsync__event_handler(
|
||
|
+ "salt/job/1236/ret/foo", {"id": "foo", "return": "deadbeaf"}, "find_job_return"
|
||
|
+ )
|
||
|
assert batch.find_job_returned == {"foo"}
|
||
|
|
||
|
|
||
|
def test_batch_run_next_end_batch_when_no_next(batch):
|
||
|
- batch.end_batch = MagicMock()
|
||
|
- batch._get_next = MagicMock(return_value={})
|
||
|
- batch.run_next()
|
||
|
- assert len(batch.end_batch.mock_calls) == 1
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
+ future.set_result({})
|
||
|
+ with patch.object(
|
||
|
+ batch, "_get_next", return_value={}
|
||
|
+ ), patch.object(
|
||
|
+ batch, "end_batch", return_value=future
|
||
|
+ ) as end_batch_mock:
|
||
|
+ batch.run_next()
|
||
|
+ end_batch_mock.assert_called_once()
|
||
|
|
||
|
|
||
|
def test_batch_find_job(batch):
|
||
|
- batch.event = MagicMock()
|
||
|
future = salt.ext.tornado.gen.Future()
|
||
|
future.set_result({})
|
||
|
- batch.local.run_job_async.return_value = future
|
||
|
batch.minions = {"foo", "bar"}
|
||
|
- batch.jid_gen = MagicMock(return_value="1234")
|
||
|
- with patch("salt.ext.tornado.gen.sleep", return_value=future):
|
||
|
+ with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
|
||
|
+ batch, "check_find_job", return_value=future
|
||
|
+ ) as check_find_job_mock, patch.object(
|
||
|
+ batch, "jid_gen", return_value="1236"
|
||
|
+ ):
|
||
|
+ batch.events_channel.local_client.run_job_async.return_value = future
|
||
|
batch.find_job({"foo", "bar"})
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (
|
||
|
- batch.check_find_job,
|
||
|
+ assert check_find_job_mock.call_args[0] == (
|
||
|
{"foo", "bar"},
|
||
|
- "1234",
|
||
|
+ "1236",
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_batch_find_job_with_done_minions(batch):
|
||
|
batch.done_minions = {"bar"}
|
||
|
- batch.event = MagicMock()
|
||
|
future = salt.ext.tornado.gen.Future()
|
||
|
future.set_result({})
|
||
|
- batch.local.run_job_async.return_value = future
|
||
|
batch.minions = {"foo", "bar"}
|
||
|
- batch.jid_gen = MagicMock(return_value="1234")
|
||
|
- with patch("salt.ext.tornado.gen.sleep", return_value=future):
|
||
|
+ with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
|
||
|
+ batch, "check_find_job", return_value=future
|
||
|
+ ) as check_find_job_mock, patch.object(
|
||
|
+ batch, "jid_gen", return_value="1236"
|
||
|
+ ):
|
||
|
+ batch.events_channel.local_client.run_job_async.return_value = future
|
||
|
batch.find_job({"foo", "bar"})
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (
|
||
|
- batch.check_find_job,
|
||
|
+ assert check_find_job_mock.call_args[0] == (
|
||
|
{"foo"},
|
||
|
- "1234",
|
||
|
+ "1236",
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_batch_check_find_job_did_not_return(batch):
|
||
|
- batch.event = MagicMock()
|
||
|
batch.active = {"foo"}
|
||
|
batch.find_job_returned = set()
|
||
|
- batch.patterns = {("salt/job/1234/ret/*", "find_job_return")}
|
||
|
- batch.check_find_job({"foo"}, jid="1234")
|
||
|
- assert batch.find_job_returned == set()
|
||
|
- assert batch.active == set()
|
||
|
- assert len(batch.event.io_loop.add_callback.mock_calls) == 0
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
+ future.set_result({})
|
||
|
+ with patch.object(batch, "find_job", return_value=future) as find_job_mock:
|
||
|
+ batch.check_find_job({"foo"}, jid="1234")
|
||
|
+ assert batch.find_job_returned == set()
|
||
|
+ assert batch.active == set()
|
||
|
+ find_job_mock.assert_not_called()
|
||
|
|
||
|
|
||
|
def test_batch_check_find_job_did_return(batch):
|
||
|
- batch.event = MagicMock()
|
||
|
batch.find_job_returned = {"foo"}
|
||
|
- batch.patterns = {("salt/job/1234/ret/*", "find_job_return")}
|
||
|
- batch.check_find_job({"foo"}, jid="1234")
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.find_job, {"foo"})
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
+ future.set_result({})
|
||
|
+ with patch.object(batch, "find_job", return_value=future) as find_job_mock:
|
||
|
+ batch.check_find_job({"foo"}, jid="1234")
|
||
|
+ find_job_mock.assert_called_once_with({"foo"})
|
||
|
|
||
|
|
||
|
def test_batch_check_find_job_multiple_states(batch):
|
||
|
- batch.event = MagicMock()
|
||
|
# currently running minions
|
||
|
batch.active = {"foo", "bar"}
|
||
|
|
||
|
@@ -372,21 +403,28 @@ def test_batch_check_find_job_multiple_states(batch):
|
||
|
# both not yet done but only 'foo' responded to find_job
|
||
|
not_done = {"foo", "bar"}
|
||
|
|
||
|
- batch.patterns = {("salt/job/1234/ret/*", "find_job_return")}
|
||
|
- batch.check_find_job(not_done, jid="1234")
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
+ future.set_result({})
|
||
|
|
||
|
- # assert 'bar' removed from active
|
||
|
- assert batch.active == {"foo"}
|
||
|
+ with patch.object(batch, "schedule_next", return_value=future), patch.object(
|
||
|
+ batch, "find_job", return_value=future
|
||
|
+ ) as find_job_mock:
|
||
|
+ batch.check_find_job(not_done, jid="1234")
|
||
|
|
||
|
- # assert 'bar' added to timedout_minions
|
||
|
- assert batch.timedout_minions == {"bar", "faz"}
|
||
|
+ # assert 'bar' removed from active
|
||
|
+ assert batch.active == {"foo"}
|
||
|
|
||
|
- # assert 'find_job' schedueled again only for 'foo'
|
||
|
- assert batch.event.io_loop.spawn_callback.call_args[0] == (batch.find_job, {"foo"})
|
||
|
+ # assert 'bar' added to timedout_minions
|
||
|
+ assert batch.timedout_minions == {"bar", "faz"}
|
||
|
+
|
||
|
+ # assert 'find_job' scheduled again only for 'foo'
|
||
|
+ find_job_mock.assert_called_once_with({"foo"})
|
||
|
|
||
|
|
||
|
def test_only_on_run_next_is_scheduled(batch):
|
||
|
- batch.event = MagicMock()
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
+ future.set_result({})
|
||
|
batch.scheduled = True
|
||
|
- batch.schedule_next()
|
||
|
- assert len(batch.event.io_loop.spawn_callback.mock_calls) == 0
|
||
|
+ with patch.object(batch, "run_next", return_value=future) as run_next_mock:
|
||
|
+ batch.schedule_next()
|
||
|
+ run_next_mock.assert_not_called()
|
||
|
--
|
||
|
2.45.0
|
||
|
|