From 8d1aba4e450922ec7ae4ce5fcf13dc5f7d2b8b7e Mon Sep 17 00:00:00 2001 From: Victor Zhestkov Date: Thu, 24 Feb 2022 16:52:24 +0300 Subject: [PATCH] Add salt-ssh support with venv-salt-minion - 3004 (#493) * Add salt-ssh support with venv-salt-minion * Add some comments and drop the commented line * Fix return in check_venv_hash_file * Convert all script parameters to strings * Reduce the size of minion response Minion response contains SSH_PY_CODE wrapped to base64. This fix reduces the size of the response in DEBUG logging * Make VENV_HASH_FILE global * Pass the context to roster modules * Avoid race condition on loading roster modules * Prevent simultaneous sessions to salt-ssh minion * Make ssh session grace time configurable * Prevent possible segfault by GC * Revert "Avoid race condition on loading roster modules" This reverts commit 8ff822a162cc494d3528184aef983ad20e09f4e2. * Prevent deadlocks with importlib on using LazyLoader * Make logging on salt-ssh errors more informative * Add comments about using salt.loader.LOAD_LOCK * Fix test_loader test * Prevent deadlocks on using logging * Use collections.deque instead of list for salt-ssh Suggested by @agraul * Get proper exitstatus from salt.utils.vt.Terminal to prevent empty event returns due to improperly detecting the child process as failed * Do not run pre flight script for raw_shell --- salt/_logging/impl.py | 59 ++++++----- salt/client/ssh/__init__.py | 174 ++++++++++++++++++++++++++++----- salt/client/ssh/client.py | 7 +- salt/client/ssh/shell.py | 8 ++ salt/client/ssh/ssh_py_shim.py | 108 +++++++++++--------- salt/loader/__init__.py | 31 +++++- salt/netapi/__init__.py | 3 +- salt/roster/__init__.py | 6 +- tests/unit/test_loader.py | 2 +- 9 files changed, 292 insertions(+), 106 deletions(-) diff --git a/salt/_logging/impl.py b/salt/_logging/impl.py index 779316ce0b..953490b284 100644 --- a/salt/_logging/impl.py +++ b/salt/_logging/impl.py @@ -7,6 +7,7 @@ import logging import re import sys +import threading 
import types # Let's define these custom logging levels before importing the salt._logging.mixins @@ -89,6 +90,10 @@ SORTED_LEVEL_NAMES = [l[0] for l in sorted(LOG_LEVELS.items(), key=lambda x: x[1 MODNAME_PATTERN = re.compile(r"(?P<name>%%\(name\)(?:\-(?P<digits>[\d]+))?s)") +# LOG_LOCK is used to prevent deadlocks on using logging +# in combination with multiprocessing with salt-api +LOG_LOCK = threading.Lock() + # ----- REMOVE ME ON REFACTOR COMPLETE ------------------------------------------------------------------------------> class __NullLoggingHandler(TemporaryLoggingHandler): @@ -283,31 +288,35 @@ class SaltLoggingClass( else: extra["exc_info_on_loglevel"] = exc_info_on_loglevel - if sys.version_info < (3,): - LOGGING_LOGGER_CLASS._log( - self, level, msg, args, exc_info=exc_info, extra=extra - ) - elif sys.version_info < (3, 8): - LOGGING_LOGGER_CLASS._log( - self, - level, - msg, - args, - exc_info=exc_info, - extra=extra, - stack_info=stack_info, - ) - else: - LOGGING_LOGGER_CLASS._log( - self, - level, - msg, - args, - exc_info=exc_info, - extra=extra, - stack_info=stack_info, - stacklevel=stacklevel, - ) + try: + LOG_LOCK.acquire() + if sys.version_info < (3,): + LOGGING_LOGGER_CLASS._log( + self, level, msg, args, exc_info=exc_info, extra=extra + ) + elif sys.version_info < (3, 8): + LOGGING_LOGGER_CLASS._log( + self, + level, + msg, + args, + exc_info=exc_info, + extra=extra, + stack_info=stack_info, + ) + else: + LOGGING_LOGGER_CLASS._log( + self, + level, + msg, + args, + exc_info=exc_info, + extra=extra, + stack_info=stack_info, + stacklevel=stacklevel, + ) + finally: + LOG_LOCK.release() def makeRecord( self, diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py index 37faa869bc..0066f4597b 100644 --- a/salt/client/ssh/__init__.py +++ b/salt/client/ssh/__init__.py @@ -6,11 +6,13 @@ import base64 import binascii import copy import datetime +import gc import getpass import hashlib import logging import multiprocessing import os +import psutil 
import queue import re import subprocess @@ -19,6 +21,7 @@ import tarfile import tempfile import time import uuid +from collections import deque import salt.client.ssh.shell import salt.client.ssh.wrapper @@ -44,6 +47,7 @@ import salt.utils.stringutils import salt.utils.thin import salt.utils.url import salt.utils.verify +from salt._logging.impl import LOG_LOCK from salt.template import compile_template from salt.utils.platform import is_junos, is_windows from salt.utils.process import Process @@ -146,15 +150,26 @@ elif [ "$SUDO" ] && [ -n "$SUDO_USER" ] then SUDO="sudo " fi EX_PYTHON_INVALID={EX_THIN_PYTHON_INVALID} -PYTHON_CMDS="python3 /usr/libexec/platform-python python27 python2.7 python26 python2.6 python2 python" +set +x +SSH_PY_CODE='import base64; + exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))' +if [ -n "$DEBUG" ] + then set -x +fi +PYTHON_CMDS="/var/tmp/venv-salt-minion/bin/python python3 /usr/libexec/platform-python python27 python2.7 python26 python2.6 python2 python" for py_cmd in $PYTHON_CMDS do if command -v "$py_cmd" >/dev/null 2>&1 && "$py_cmd" -c "import sys; sys.exit(not (sys.version_info >= (2, 6)));" then py_cmd_path=`"$py_cmd" -c 'from __future__ import print_function;import sys; print(sys.executable);'` cmdpath=`command -v $py_cmd 2>/dev/null || which $py_cmd 2>/dev/null` + cmdpath=`readlink -f $cmdpath` if file $cmdpath | grep "shell script" > /dev/null then + if echo $cmdpath | grep venv-salt-minion > /dev/null + then + exec $SUDO "$cmdpath" -c "$SSH_PY_CODE" + fi ex_vars="'PATH', 'LD_LIBRARY_PATH', 'MANPATH', \ 'XDG_DATA_DIRS', 'PKG_CONFIG_PATH'" export `$py_cmd -c \ @@ -166,13 +181,9 @@ do exec $SUDO PATH=$PATH LD_LIBRARY_PATH=$LD_LIBRARY_PATH \ MANPATH=$MANPATH XDG_DATA_DIRS=$XDG_DATA_DIRS \ PKG_CONFIG_PATH=$PKG_CONFIG_PATH \ - "$py_cmd_path" -c \ - 'import base64; - exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))' + "$py_cmd_path" -c "$SSH_PY_CODE" else - exec $SUDO "$py_cmd_path" -c \ - 'import base64; - 
exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))' + exec $SUDO "$py_cmd_path" -c "$SSH_PY_CODE" fi exit 0 else @@ -189,6 +200,9 @@ EOF'''.format( ] ) +# The file on a salt-ssh minion used to identify if Salt Bundle was deployed +VENV_HASH_FILE = "/var/tmp/venv-salt-minion/venv-hash.txt" + if not is_windows() and not is_junos(): shim_file = os.path.join(os.path.dirname(__file__), "ssh_py_shim.py") if not os.path.exists(shim_file): @@ -209,7 +223,7 @@ class SSH: ROSTER_UPDATE_FLAG = "#__needs_update" - def __init__(self, opts): + def __init__(self, opts, context=None): self.__parsed_rosters = {SSH.ROSTER_UPDATE_FLAG: True} pull_sock = os.path.join(opts["sock_dir"], "master_event_pull.ipc") if os.path.exists(pull_sock) and zmq: @@ -236,7 +250,9 @@ class SSH: else "glob" ) self._expand_target() - self.roster = salt.roster.Roster(self.opts, self.opts.get("roster", "flat")) + self.roster = salt.roster.Roster( + self.opts, self.opts.get("roster", "flat"), context=context + ) self.targets = self.roster.targets(self.opts["tgt"], self.tgt_type) if not self.targets: self._update_targets() @@ -317,6 +333,14 @@ class SSH: ) self.mods = mod_data(self.fsclient) + self.cache = salt.cache.Cache(self.opts) + self.master_id = self.opts["id"] + self.max_pid_wait = int(self.opts.get("ssh_max_pid_wait", 600)) + self.session_flock_file = os.path.join( + self.opts["cachedir"], "salt-ssh.session.lock" + ) + self.ssh_session_grace_time = int(self.opts.get("ssh_session_grace_time", 3)) + @property def parse_tgt(self): """ @@ -531,6 +555,8 @@ class SSH: """ Run the routine in a "Thread", put a dict on the queue """ + LOG_LOCK.release() + salt.loader.LOAD_LOCK.release() opts = copy.deepcopy(opts) single = Single( opts, @@ -570,7 +596,7 @@ class SSH: """ que = multiprocessing.Queue() running = {} - target_iter = self.targets.__iter__() + targets_queue = deque(self.targets.keys()) returned = set() rets = set() init = False @@ -579,11 +605,43 @@ class SSH: log.error("No matching 
targets found in roster.") break if len(running) < self.opts.get("ssh_max_procs", 25) and not init: - try: - host = next(target_iter) - except StopIteration: + if targets_queue: + host = targets_queue.popleft() + else: init = True continue + with salt.utils.files.flopen(self.session_flock_file, "w"): + cached_session = self.cache.fetch("salt-ssh/session", host) + if cached_session is not None and "ts" in cached_session: + prev_session_running = time.time() - cached_session["ts"] + if ( + "pid" in cached_session + and cached_session.get("master_id", self.master_id) + == self.master_id + ): + pid_running = ( + False + if cached_session["pid"] == 0 + else psutil.pid_exists(cached_session["pid"]) + ) + if ( + pid_running and prev_session_running < self.max_pid_wait + ) or ( + not pid_running + and prev_session_running < self.ssh_session_grace_time + ): + targets_queue.append(host) + time.sleep(0.3) + continue + self.cache.store( + "salt-ssh/session", + host, + { + "pid": 0, + "master_id": self.master_id, + "ts": time.time(), + }, + ) for default in self.defaults: if default not in self.targets[host]: self.targets[host][default] = self.defaults[default] @@ -615,8 +673,38 @@ class SSH: mine, ) routine = Process(target=self.handle_routine, args=args) - routine.start() + # Explicitly call garbage collector to prevent possible segfault + # in salt-api child process. (bsc#1188607) + gc.collect() + try: + # salt.loader.LOAD_LOCK is used to prevent deadlock + # with importlib in combination with using multiprocessing (bsc#1182851) + # If the salt-api child process is created while a LazyLoader instance + # is loading a module, the new child process gets the lock for this module already acquired. + # Touching this module with importlib inside the child process leads to a deadlock. 
+ # + # salt.loader.LOAD_LOCK is used to prevent salt-api child process creation + # while creating new instance of LazyLoader + # salt.loader.LOAD_LOCK must be released explicitly in self.handle_routine + salt.loader.LOAD_LOCK.acquire() + # The same solution applied to fix logging deadlock + # LOG_LOCK must be released explicitly in self.handle_routine + LOG_LOCK.acquire() + routine.start() + finally: + LOG_LOCK.release() + salt.loader.LOAD_LOCK.release() running[host] = {"thread": routine} + with salt.utils.files.flopen(self.session_flock_file, "w"): + self.cache.store( + "salt-ssh/session", + host, + { + "pid": routine.pid, + "master_id": self.master_id, + "ts": time.time(), + }, + ) continue ret = {} try: @@ -647,12 +735,27 @@ class SSH: ) ret = {"id": host, "ret": error} log.error(error) + log.error( + "PID %s did not return any data for host '%s'", + running[host]["thread"].pid, + host, + ) yield {ret["id"]: ret["ret"]} running[host]["thread"].join() rets.add(host) for host in rets: if host in running: running.pop(host) + with salt.utils.files.flopen(self.session_flock_file, "w"): + self.cache.store( + "salt-ssh/session", + host, + { + "pid": 0, + "master_id": self.master_id, + "ts": time.time(), + }, + ) if len(rets) >= len(self.targets): break # Sleep when limit or all threads started @@ -916,6 +1019,7 @@ class Single: self.context = {"master_opts": self.opts, "fileclient": self.fsclient} self.ssh_pre_flight = kwargs.get("ssh_pre_flight", None) + self.ssh_pre_flight_args = kwargs.get("ssh_pre_flight_args", None) if self.ssh_pre_flight: self.ssh_pre_file = os.path.basename(self.ssh_pre_flight) @@ -1007,7 +1111,7 @@ class Single: self.shell.send(self.ssh_pre_flight, script) - return self.execute_script(script) + return self.execute_script(script, script_args=self.ssh_pre_flight_args) def check_thin_dir(self): """ @@ -1020,14 +1124,24 @@ class Single: return False return True + def check_venv_hash_file(self): + """ + check if the venv exists on the remote 
machine + """ + stdout, stderr, retcode = self.shell.exec_cmd( + "test -f {}".format(VENV_HASH_FILE) + ) + return retcode == 0 + def deploy(self): """ Deploy salt-thin """ - self.shell.send( - self.thin, - os.path.join(self.thin_dir, "salt-thin.tgz"), - ) + if not self.check_venv_hash_file(): + self.shell.send( + self.thin, + os.path.join(self.thin_dir, "salt-thin.tgz"), + ) self.deploy_ext() return True @@ -1055,8 +1169,9 @@ class Single: Returns tuple of (stdout, stderr, retcode) """ stdout = stderr = retcode = None + raw_shell = self.opts.get("raw_shell", False) - if self.ssh_pre_flight: + if self.ssh_pre_flight and not raw_shell: if not self.opts.get("ssh_run_pre_flight", False) and self.check_thin_dir(): log.info( "%s thin dir already exists. Not running ssh_pre_flight script", @@ -1070,14 +1185,16 @@ class Single: stdout, stderr, retcode = self.run_ssh_pre_flight() if retcode != 0: log.error( - "Error running ssh_pre_flight script %s", self.ssh_pre_file + "Error running ssh_pre_flight script %s for host '%s'", + self.ssh_pre_file, + self.target["host"], ) return stdout, stderr, retcode log.info( "Successfully ran the ssh_pre_flight script: %s", self.ssh_pre_file ) - if self.opts.get("raw_shell", False): + if raw_shell: cmd_str = " ".join([self._escape_arg(arg) for arg in self.argv]) stdout, stderr, retcode = self.shell.exec_cmd(cmd_str) @@ -1335,15 +1452,24 @@ ARGS = {arguments}\n'''.format( return cmd - def execute_script(self, script, extension="py", pre_dir=""): + def execute_script(self, script, extension="py", pre_dir="", script_args=None): """ execute a script on the minion then delete """ + args = "" + if script_args: + args = " {}".format( + " ".join([str(el) for el in script_args]) + if isinstance(script_args, (list, tuple)) + else script_args + ) if extension == "ps1": ret = self.shell.exec_cmd('"powershell {}"'.format(script)) else: if not self.winrm: - ret = self.shell.exec_cmd("/bin/sh '{}{}'".format(pre_dir, script)) + ret = self.shell.exec_cmd( 
+ "/bin/sh '{}{}'{}".format(pre_dir, script, args) + ) else: ret = saltwinshell.call_python(self, script) diff --git a/salt/client/ssh/client.py b/salt/client/ssh/client.py index 245e1529c6..a45deeb325 100644 --- a/salt/client/ssh/client.py +++ b/salt/client/ssh/client.py @@ -107,7 +107,7 @@ class SSHClient: return sane_kwargs def _prep_ssh( - self, tgt, fun, arg=(), timeout=None, tgt_type="glob", kwarg=None, **kwargs + self, tgt, fun, arg=(), timeout=None, tgt_type="glob", kwarg=None, context=None, **kwargs ): """ Prepare the arguments @@ -122,7 +122,7 @@ class SSHClient: opts["selected_target_option"] = tgt_type opts["tgt"] = tgt opts["arg"] = arg - return salt.client.ssh.SSH(opts) + return salt.client.ssh.SSH(opts, context=context) def cmd_iter( self, @@ -159,7 +159,7 @@ class SSHClient: final.update(ret) return final - def cmd_sync(self, low): + def cmd_sync(self, low, context=None): """ Execute a salt-ssh call synchronously. @@ -192,6 +192,7 @@ class SSHClient: low.get("timeout"), low.get("tgt_type"), low.get("kwarg"), + context=context, **kwargs ) diff --git a/salt/client/ssh/shell.py b/salt/client/ssh/shell.py index 7461618a2e..6b54a20abd 100644 --- a/salt/client/ssh/shell.py +++ b/salt/client/ssh/shell.py @@ -442,6 +442,14 @@ class Shell: if stdout: old_stdout = stdout time.sleep(0.01) + if term.exitstatus is None: + try: + term.wait() + except: # pylint: disable=broad-except + # It's safe to put the broad exception handling here + # as we just need to ensure the child process in term finished + # to get proper term.exitstatus instead of None + pass return ret_stdout, ret_stderr, term.exitstatus finally: term.close(terminate=True, kill=True) diff --git a/salt/client/ssh/ssh_py_shim.py b/salt/client/ssh/ssh_py_shim.py index b77749f495..293ea1b7fa 100644 --- a/salt/client/ssh/ssh_py_shim.py +++ b/salt/client/ssh/ssh_py_shim.py @@ -279,56 +279,72 @@ def main(argv): # pylint: disable=W0613 """ Main program body """ - thin_path = os.path.join(OPTIONS.saltdir, 
THIN_ARCHIVE) - if os.path.isfile(thin_path): - if OPTIONS.checksum != get_hash(thin_path, OPTIONS.hashfunc): - need_deployment() - unpack_thin(thin_path) - # Salt thin now is available to use - else: - if not sys.platform.startswith("win"): - scpstat = subprocess.Popen(["/bin/sh", "-c", "command -v scp"]).wait() - if scpstat != 0: - sys.exit(EX_SCP_NOT_FOUND) - - if os.path.exists(OPTIONS.saltdir) and not os.path.isdir(OPTIONS.saltdir): - sys.stderr.write( - 'ERROR: salt path "{0}" exists but is not a directory\n'.format( - OPTIONS.saltdir + + virt_env = os.getenv("VIRTUAL_ENV", None) + # VIRTUAL_ENV environment variable is defined by venv-salt-minion wrapper + # it's used to check if the shim is running under this wrapper + venv_salt_call = None + if virt_env and "venv-salt-minion" in virt_env: + venv_salt_call = os.path.join(virt_env, "bin", "salt-call") + if not os.path.exists(venv_salt_call): + venv_salt_call = None + elif not os.path.exists(OPTIONS.saltdir): + os.makedirs(OPTIONS.saltdir) + cache_dir = os.path.join(OPTIONS.saltdir, "running_data", "var", "cache") + os.makedirs(os.path.join(cache_dir, "salt")) + os.symlink("salt", os.path.relpath(os.path.join(cache_dir, "venv-salt-minion"))) + + if venv_salt_call is None: + # Use Salt thin only if Salt Bundle (venv-salt-minion) is not available + thin_path = os.path.join(OPTIONS.saltdir, THIN_ARCHIVE) + if os.path.isfile(thin_path): + if OPTIONS.checksum != get_hash(thin_path, OPTIONS.hashfunc): + need_deployment() + unpack_thin(thin_path) + # Salt thin now is available to use + else: + if not sys.platform.startswith("win"): + scpstat = subprocess.Popen(["/bin/sh", "-c", "command -v scp"]).wait() + if scpstat != 0: + sys.exit(EX_SCP_NOT_FOUND) + + if os.path.exists(OPTIONS.saltdir) and not os.path.isdir(OPTIONS.saltdir): + sys.stderr.write( + 'ERROR: salt path "{0}" exists but is' + " not a directory\n".format(OPTIONS.saltdir) ) - ) - sys.exit(EX_CANTCREAT) + sys.exit(EX_CANTCREAT) - if not 
os.path.exists(OPTIONS.saltdir): - need_deployment() + if not os.path.exists(OPTIONS.saltdir): + need_deployment() - code_checksum_path = os.path.normpath( - os.path.join(OPTIONS.saltdir, "code-checksum") - ) - if not os.path.exists(code_checksum_path) or not os.path.isfile( - code_checksum_path - ): - sys.stderr.write( - "WARNING: Unable to locate current code checksum: {0}.\n".format( - code_checksum_path - ) + code_checksum_path = os.path.normpath( + os.path.join(OPTIONS.saltdir, "code-checksum") ) - need_deployment() - with open(code_checksum_path, "r") as vpo: - cur_code_cs = vpo.readline().strip() - if cur_code_cs != OPTIONS.code_checksum: - sys.stderr.write( - "WARNING: current code checksum {0} is different to {1}.\n".format( - cur_code_cs, OPTIONS.code_checksum + if not os.path.exists(code_checksum_path) or not os.path.isfile( + code_checksum_path + ): + sys.stderr.write( + "WARNING: Unable to locate current code checksum: {0}.\n".format( + code_checksum_path + ) ) - ) - need_deployment() - # Salt thin exists and is up-to-date - fall through and use it + need_deployment() + with open(code_checksum_path, "r") as vpo: + cur_code_cs = vpo.readline().strip() + if cur_code_cs != OPTIONS.code_checksum: + sys.stderr.write( + "WARNING: current code checksum {0} is different to {1}.\n".format( + cur_code_cs, OPTIONS.code_checksum + ) + ) + need_deployment() + # Salt thin exists and is up-to-date - fall through and use it - salt_call_path = os.path.join(OPTIONS.saltdir, "salt-call") - if not os.path.isfile(salt_call_path): - sys.stderr.write('ERROR: thin is missing "{0}"\n'.format(salt_call_path)) - need_deployment() + salt_call_path = os.path.join(OPTIONS.saltdir, "salt-call") + if not os.path.isfile(salt_call_path): + sys.stderr.write('ERROR: thin is missing "{0}"\n'.format(salt_call_path)) + need_deployment() with open(os.path.join(OPTIONS.saltdir, "minion"), "w") as config: config.write(OPTIONS.config + "\n") @@ -351,8 +367,8 @@ def main(argv): # pylint: 
disable=W0613 argv_prepared = ARGS salt_argv = [ - get_executable(), - salt_call_path, + sys.executable if venv_salt_call is not None else get_executable(), + venv_salt_call if venv_salt_call is not None else salt_call_path, "--retcode-passthrough", "--local", "--metadata", diff --git a/salt/loader/__init__.py b/salt/loader/__init__.py index f7815acc03..a0f2220476 100644 --- a/salt/loader/__init__.py +++ b/salt/loader/__init__.py @@ -8,6 +8,7 @@ import contextlib import logging import os import re +import threading import time import types @@ -31,7 +32,7 @@ from salt.exceptions import LoaderError from salt.template import check_render_pipe_str from salt.utils import entrypoints -from .lazy import SALT_BASE_PATH, FilterDictWrapper, LazyLoader +from .lazy import SALT_BASE_PATH, FilterDictWrapper, LazyLoader as _LazyLoader log = logging.getLogger(__name__) @@ -81,6 +82,18 @@ SALT_INTERNAL_LOADERS_PATHS = ( str(SALT_BASE_PATH / "wheel"), ) +LOAD_LOCK = threading.Lock() + + +def LazyLoader(*args, **kwargs): + # This wrapper is used to prevent deadlocks with importlib (bsc#1182851) + # LOAD_LOCK is also used directly in salt.client.ssh.SSH + try: + LOAD_LOCK.acquire() + return _LazyLoader(*args, **kwargs) + finally: + LOAD_LOCK.release() + def static_loader( opts, @@ -597,16 +610,19 @@ def fileserver(opts, backends): ) -def roster(opts, runner=None, utils=None, whitelist=None): +def roster(opts, runner=None, utils=None, whitelist=None, context=None): """ Returns the roster modules """ + if context is None: + context = {} + return LazyLoader( _module_dirs(opts, "roster"), opts, tag="roster", whitelist=whitelist, - pack={"__runner__": runner, "__utils__": utils}, + pack={"__runner__": runner, "__utils__": utils, "__context__": context}, extra_module_dirs=utils.module_dirs if utils else None, ) @@ -744,7 +760,14 @@ def render(opts, functions, states=None, proxy=None, context=None): ) rend = FilterDictWrapper(ret, ".render") - if not check_render_pipe_str( + def 
_check_render_pipe_str(pipestr, renderers, blacklist, whitelist): + try: + LOAD_LOCK.acquire() + return check_render_pipe_str(pipestr, renderers, blacklist, whitelist) + finally: + LOAD_LOCK.release() + + if not _check_render_pipe_str( opts["renderer"], rend, opts["renderer_blacklist"], opts["renderer_whitelist"] ): err = ( diff --git a/salt/netapi/__init__.py b/salt/netapi/__init__.py index 81954acb96..5d2ff994a6 100644 --- a/salt/netapi/__init__.py +++ b/salt/netapi/__init__.py @@ -46,6 +46,7 @@ class NetapiClient: self.loadauth = salt.auth.LoadAuth(apiopts) self.key = salt.daemons.masterapi.access_keys(apiopts) self.ckminions = salt.utils.minions.CkMinions(apiopts) + self.context = {} def _is_master_running(self): """ @@ -205,7 +206,7 @@ class NetapiClient: with salt.client.ssh.client.SSHClient( mopts=self.opts, disable_custom_roster=True ) as client: - return client.cmd_sync(kwargs) + return client.cmd_sync(kwargs, context=self.context) def runner(self, fun, timeout=None, full_return=False, **kwargs): """ diff --git a/salt/roster/__init__.py b/salt/roster/__init__.py index b45afffd24..4b6182b2dd 100644 --- a/salt/roster/__init__.py +++ b/salt/roster/__init__.py @@ -59,7 +59,7 @@ class Roster: minion aware """ - def __init__(self, opts, backends="flat"): + def __init__(self, opts, backends="flat", context=None): self.opts = opts if isinstance(backends, list): self.backends = backends @@ -71,7 +71,9 @@ class Roster: self.backends = ["flat"] utils = salt.loader.utils(self.opts) runner = salt.loader.runner(self.opts, utils=utils) - self.rosters = salt.loader.roster(self.opts, runner=runner, utils=utils) + self.rosters = salt.loader.roster( + self.opts, runner=runner, utils=utils, context=context + ) def _gen_back(self): """ diff --git a/tests/unit/test_loader.py b/tests/unit/test_loader.py index 2319f815d3..e83f86cd01 100644 --- a/tests/unit/test_loader.py +++ b/tests/unit/test_loader.py @@ -1696,7 +1696,7 @@ class LazyLoaderRefreshFileMappingTest(TestCase): 
cls.funcs = salt.loader.minion_mods(cls.opts, utils=cls.utils, proxy=cls.proxy) def setUp(self): - class LazyLoaderMock(salt.loader.LazyLoader): + class LazyLoaderMock(salt.loader._LazyLoader): pass self.LOADER_CLASS = LazyLoaderMock -- 2.35.1