salt/add-salt-ssh-support-with-venv-salt-minion-3004-493.patch

841 lines
31 KiB
Diff
Raw Normal View History

From 8d1aba4e450922ec7ae4ce5fcf13dc5f7d2b8b7e Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Thu, 24 Feb 2022 16:52:24 +0300
Subject: [PATCH] Add salt-ssh support with venv-salt-minion - 3004
(#493)
* Add salt-ssh support with venv-salt-minion
* Add some comments and drop the commented line
* Fix return in check_venv_hash_file
* Convert all script parameters to strings
* Reduce the size of minion response
Minion response contains SSH_PY_CODE wrapped to base64.
This fix reduces the size of the response in DEBUG logging
* Make VENV_HASH_FILE global
* Pass the context to roster modules
* Avoid race condition on loading roster modules
* Prevent simultaneous to salt-ssh minion
* Make ssh session grace time configurable
* Prevent possible segfault by GC
* Revert "Avoid race condition on loading roster modules"
This reverts commit 8ff822a162cc494d3528184aef983ad20e09f4e2.
* Prevent deadlocks with importlib on using LazyLoader
* Make logging on salt-ssh errors more informative
* Add comments about using salt.loader.LOAD_LOCK
* Fix test_loader test
* Prevent deadlocks on using logging
* Use collections.deque instead of list for salt-ssh
Suggested by @agraul
* Get proper exitstatus from salt.utils.vt.Terminal
to prevent empty event returns due to improperly detecting
the child process as failed
* Do not run pre flight script for raw_shell
---
salt/_logging/impl.py | 59 ++++++-----
salt/client/ssh/__init__.py | 174 ++++++++++++++++++++++++++++-----
salt/client/ssh/client.py | 7 +-
salt/client/ssh/shell.py | 8 ++
salt/client/ssh/ssh_py_shim.py | 108 +++++++++++---------
salt/loader/__init__.py | 31 +++++-
salt/netapi/__init__.py | 3 +-
salt/roster/__init__.py | 6 +-
tests/unit/test_loader.py | 2 +-
9 files changed, 292 insertions(+), 106 deletions(-)
diff --git a/salt/_logging/impl.py b/salt/_logging/impl.py
index 779316ce0b..953490b284 100644
--- a/salt/_logging/impl.py
+++ b/salt/_logging/impl.py
@@ -7,6 +7,7 @@
import logging
import re
import sys
+import threading
import types
# Let's define these custom logging levels before importing the salt._logging.mixins
@@ -89,6 +90,10 @@ SORTED_LEVEL_NAMES = [l[0] for l in sorted(LOG_LEVELS.items(), key=lambda x: x[1
MODNAME_PATTERN = re.compile(r"(?P<name>%%\(name\)(?:\-(?P<digits>[\d]+))?s)")
+# LOG_LOCK is used to prevent deadlocks on using logging
+# in combination with multiprocessing with salt-api
+LOG_LOCK = threading.Lock()
+
# ----- REMOVE ME ON REFACTOR COMPLETE ------------------------------------------------------------------------------>
class __NullLoggingHandler(TemporaryLoggingHandler):
@@ -283,31 +288,35 @@ class SaltLoggingClass(
else:
extra["exc_info_on_loglevel"] = exc_info_on_loglevel
- if sys.version_info < (3,):
- LOGGING_LOGGER_CLASS._log(
- self, level, msg, args, exc_info=exc_info, extra=extra
- )
- elif sys.version_info < (3, 8):
- LOGGING_LOGGER_CLASS._log(
- self,
- level,
- msg,
- args,
- exc_info=exc_info,
- extra=extra,
- stack_info=stack_info,
- )
- else:
- LOGGING_LOGGER_CLASS._log(
- self,
- level,
- msg,
- args,
- exc_info=exc_info,
- extra=extra,
- stack_info=stack_info,
- stacklevel=stacklevel,
- )
+ try:
+ LOG_LOCK.acquire()
+ if sys.version_info < (3,):
+ LOGGING_LOGGER_CLASS._log(
+ self, level, msg, args, exc_info=exc_info, extra=extra
+ )
+ elif sys.version_info < (3, 8):
+ LOGGING_LOGGER_CLASS._log(
+ self,
+ level,
+ msg,
+ args,
+ exc_info=exc_info,
+ extra=extra,
+ stack_info=stack_info,
+ )
+ else:
+ LOGGING_LOGGER_CLASS._log(
+ self,
+ level,
+ msg,
+ args,
+ exc_info=exc_info,
+ extra=extra,
+ stack_info=stack_info,
+ stacklevel=stacklevel,
+ )
+ finally:
+ LOG_LOCK.release()
def makeRecord(
self,
diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py
index 37faa869bc..0066f4597b 100644
--- a/salt/client/ssh/__init__.py
+++ b/salt/client/ssh/__init__.py
@@ -6,11 +6,13 @@ import base64
import binascii
import copy
import datetime
+import gc
import getpass
import hashlib
import logging
import multiprocessing
import os
+import psutil
import queue
import re
import subprocess
@@ -19,6 +21,7 @@ import tarfile
import tempfile
import time
import uuid
+from collections import deque
import salt.client.ssh.shell
import salt.client.ssh.wrapper
@@ -44,6 +47,7 @@ import salt.utils.stringutils
import salt.utils.thin
import salt.utils.url
import salt.utils.verify
+from salt._logging.impl import LOG_LOCK
from salt.template import compile_template
from salt.utils.platform import is_junos, is_windows
from salt.utils.process import Process
@@ -146,15 +150,26 @@ elif [ "$SUDO" ] && [ -n "$SUDO_USER" ]
then SUDO="sudo "
fi
EX_PYTHON_INVALID={EX_THIN_PYTHON_INVALID}
-PYTHON_CMDS="python3 /usr/libexec/platform-python python27 python2.7 python26 python2.6 python2 python"
+set +x
+SSH_PY_CODE='import base64;
+ exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))'
+if [ -n "$DEBUG" ]
+ then set -x
+fi
+PYTHON_CMDS="/var/tmp/venv-salt-minion/bin/python python3 /usr/libexec/platform-python python27 python2.7 python26 python2.6 python2 python"
for py_cmd in $PYTHON_CMDS
do
if command -v "$py_cmd" >/dev/null 2>&1 && "$py_cmd" -c "import sys; sys.exit(not (sys.version_info >= (2, 6)));"
then
py_cmd_path=`"$py_cmd" -c 'from __future__ import print_function;import sys; print(sys.executable);'`
cmdpath=`command -v $py_cmd 2>/dev/null || which $py_cmd 2>/dev/null`
+ cmdpath=`readlink -f $cmdpath`
if file $cmdpath | grep "shell script" > /dev/null
then
+ if echo $cmdpath | grep venv-salt-minion > /dev/null
+ then
+ exec $SUDO "$cmdpath" -c "$SSH_PY_CODE"
+ fi
ex_vars="'PATH', 'LD_LIBRARY_PATH', 'MANPATH', \
'XDG_DATA_DIRS', 'PKG_CONFIG_PATH'"
export `$py_cmd -c \
@@ -166,13 +181,9 @@ do
exec $SUDO PATH=$PATH LD_LIBRARY_PATH=$LD_LIBRARY_PATH \
MANPATH=$MANPATH XDG_DATA_DIRS=$XDG_DATA_DIRS \
PKG_CONFIG_PATH=$PKG_CONFIG_PATH \
- "$py_cmd_path" -c \
- 'import base64;
- exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))'
+ "$py_cmd_path" -c "$SSH_PY_CODE"
else
- exec $SUDO "$py_cmd_path" -c \
- 'import base64;
- exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))'
+ exec $SUDO "$py_cmd_path" -c "$SSH_PY_CODE"
fi
exit 0
else
@@ -189,6 +200,9 @@ EOF'''.format(
]
)
+# The file on a salt-ssh minion used to identify if Salt Bundle was deployed
+VENV_HASH_FILE = "/var/tmp/venv-salt-minion/venv-hash.txt"
+
if not is_windows() and not is_junos():
shim_file = os.path.join(os.path.dirname(__file__), "ssh_py_shim.py")
if not os.path.exists(shim_file):
@@ -209,7 +223,7 @@ class SSH:
ROSTER_UPDATE_FLAG = "#__needs_update"
- def __init__(self, opts):
+ def __init__(self, opts, context=None):
self.__parsed_rosters = {SSH.ROSTER_UPDATE_FLAG: True}
pull_sock = os.path.join(opts["sock_dir"], "master_event_pull.ipc")
if os.path.exists(pull_sock) and zmq:
@@ -236,7 +250,9 @@ class SSH:
else "glob"
)
self._expand_target()
- self.roster = salt.roster.Roster(self.opts, self.opts.get("roster", "flat"))
+ self.roster = salt.roster.Roster(
+ self.opts, self.opts.get("roster", "flat"), context=context
+ )
self.targets = self.roster.targets(self.opts["tgt"], self.tgt_type)
if not self.targets:
self._update_targets()
@@ -317,6 +333,14 @@ class SSH:
)
self.mods = mod_data(self.fsclient)
+ self.cache = salt.cache.Cache(self.opts)
+ self.master_id = self.opts["id"]
+ self.max_pid_wait = int(self.opts.get("ssh_max_pid_wait", 600))
+ self.session_flock_file = os.path.join(
+ self.opts["cachedir"], "salt-ssh.session.lock"
+ )
+ self.ssh_session_grace_time = int(self.opts.get("ssh_session_grace_time", 3))
+
@property
def parse_tgt(self):
"""
@@ -531,6 +555,8 @@ class SSH:
"""
Run the routine in a "Thread", put a dict on the queue
"""
+ LOG_LOCK.release()
+ salt.loader.LOAD_LOCK.release()
opts = copy.deepcopy(opts)
single = Single(
opts,
@@ -570,7 +596,7 @@ class SSH:
"""
que = multiprocessing.Queue()
running = {}
- target_iter = self.targets.__iter__()
+ targets_queue = deque(self.targets.keys())
returned = set()
rets = set()
init = False
@@ -579,11 +605,43 @@ class SSH:
log.error("No matching targets found in roster.")
break
if len(running) < self.opts.get("ssh_max_procs", 25) and not init:
- try:
- host = next(target_iter)
- except StopIteration:
+ if targets_queue:
+ host = targets_queue.popleft()
+ else:
init = True
continue
+ with salt.utils.files.flopen(self.session_flock_file, "w"):
+ cached_session = self.cache.fetch("salt-ssh/session", host)
+ if cached_session is not None and "ts" in cached_session:
+ prev_session_running = time.time() - cached_session["ts"]
+ if (
+ "pid" in cached_session
+ and cached_session.get("master_id", self.master_id)
+ == self.master_id
+ ):
+ pid_running = (
+ False
+ if cached_session["pid"] == 0
+ else psutil.pid_exists(cached_session["pid"])
+ )
+ if (
+ pid_running and prev_session_running < self.max_pid_wait
+ ) or (
+ not pid_running
+ and prev_session_running < self.ssh_session_grace_time
+ ):
+ targets_queue.append(host)
+ time.sleep(0.3)
+ continue
+ self.cache.store(
+ "salt-ssh/session",
+ host,
+ {
+ "pid": 0,
+ "master_id": self.master_id,
+ "ts": time.time(),
+ },
+ )
for default in self.defaults:
if default not in self.targets[host]:
self.targets[host][default] = self.defaults[default]
@@ -615,8 +673,38 @@ class SSH:
mine,
)
routine = Process(target=self.handle_routine, args=args)
- routine.start()
+ # Explicitly call garbage collector to prevent possible segfault
+ # in salt-api child process. (bsc#1188607)
+ gc.collect()
+ try:
+ # salt.loader.LOAD_LOCK is used to prevent deadlock
+ # with importlib in combination with using multiprocessing (bsc#1182851)
+ # If the salt-api child process is creating while LazyLoader instance
+ # is loading module, new child process gets the lock for this module acquired.
+ # Touching this module with importlib inside child process leads to deadlock.
+ #
+ # salt.loader.LOAD_LOCK is used to prevent salt-api child process creation
+ # while creating new instance of LazyLoader
+ # salt.loader.LOAD_LOCK must be released explicitly in self.handle_routine
+ salt.loader.LOAD_LOCK.acquire()
+ # The same solution applied to fix logging deadlock
+ # LOG_LOCK must be released explicitly in self.handle_routine
+ LOG_LOCK.acquire()
+ routine.start()
+ finally:
+ LOG_LOCK.release()
+ salt.loader.LOAD_LOCK.release()
running[host] = {"thread": routine}
+ with salt.utils.files.flopen(self.session_flock_file, "w"):
+ self.cache.store(
+ "salt-ssh/session",
+ host,
+ {
+ "pid": routine.pid,
+ "master_id": self.master_id,
+ "ts": time.time(),
+ },
+ )
continue
ret = {}
try:
@@ -647,12 +735,27 @@ class SSH:
)
ret = {"id": host, "ret": error}
log.error(error)
+ log.error(
+ "PID %s did not return any data for host '%s'",
+ running[host]["thread"].pid,
+ host,
+ )
yield {ret["id"]: ret["ret"]}
running[host]["thread"].join()
rets.add(host)
for host in rets:
if host in running:
running.pop(host)
+ with salt.utils.files.flopen(self.session_flock_file, "w"):
+ self.cache.store(
+ "salt-ssh/session",
+ host,
+ {
+ "pid": 0,
+ "master_id": self.master_id,
+ "ts": time.time(),
+ },
+ )
if len(rets) >= len(self.targets):
break
# Sleep when limit or all threads started
@@ -916,6 +1019,7 @@ class Single:
self.context = {"master_opts": self.opts, "fileclient": self.fsclient}
self.ssh_pre_flight = kwargs.get("ssh_pre_flight", None)
+ self.ssh_pre_flight_args = kwargs.get("ssh_pre_flight_args", None)
if self.ssh_pre_flight:
self.ssh_pre_file = os.path.basename(self.ssh_pre_flight)
@@ -1007,7 +1111,7 @@ class Single:
self.shell.send(self.ssh_pre_flight, script)
- return self.execute_script(script)
+ return self.execute_script(script, script_args=self.ssh_pre_flight_args)
def check_thin_dir(self):
"""
@@ -1020,14 +1124,24 @@ class Single:
return False
return True
+ def check_venv_hash_file(self):
+ """
+ check if the venv exists on the remote machine
+ """
+ stdout, stderr, retcode = self.shell.exec_cmd(
+ "test -f {}".format(VENV_HASH_FILE)
+ )
+ return retcode == 0
+
def deploy(self):
"""
Deploy salt-thin
"""
- self.shell.send(
- self.thin,
- os.path.join(self.thin_dir, "salt-thin.tgz"),
- )
+ if not self.check_venv_hash_file():
+ self.shell.send(
+ self.thin,
+ os.path.join(self.thin_dir, "salt-thin.tgz"),
+ )
self.deploy_ext()
return True
@@ -1055,8 +1169,9 @@ class Single:
Returns tuple of (stdout, stderr, retcode)
"""
stdout = stderr = retcode = None
+ raw_shell = self.opts.get("raw_shell", False)
- if self.ssh_pre_flight:
+ if self.ssh_pre_flight and not raw_shell:
if not self.opts.get("ssh_run_pre_flight", False) and self.check_thin_dir():
log.info(
"%s thin dir already exists. Not running ssh_pre_flight script",
@@ -1070,14 +1185,16 @@ class Single:
stdout, stderr, retcode = self.run_ssh_pre_flight()
if retcode != 0:
log.error(
- "Error running ssh_pre_flight script %s", self.ssh_pre_file
+ "Error running ssh_pre_flight script %s for host '%s'",
+ self.ssh_pre_file,
+ self.target["host"],
)
return stdout, stderr, retcode
log.info(
"Successfully ran the ssh_pre_flight script: %s", self.ssh_pre_file
)
- if self.opts.get("raw_shell", False):
+ if raw_shell:
cmd_str = " ".join([self._escape_arg(arg) for arg in self.argv])
stdout, stderr, retcode = self.shell.exec_cmd(cmd_str)
@@ -1335,15 +1452,24 @@ ARGS = {arguments}\n'''.format(
return cmd
- def execute_script(self, script, extension="py", pre_dir=""):
+ def execute_script(self, script, extension="py", pre_dir="", script_args=None):
"""
execute a script on the minion then delete
"""
+ args = ""
+ if script_args:
+ args = " {}".format(
+ " ".join([str(el) for el in script_args])
+ if isinstance(script_args, (list, tuple))
+ else script_args
+ )
if extension == "ps1":
ret = self.shell.exec_cmd('"powershell {}"'.format(script))
else:
if not self.winrm:
- ret = self.shell.exec_cmd("/bin/sh '{}{}'".format(pre_dir, script))
+ ret = self.shell.exec_cmd(
+ "/bin/sh '{}{}'{}".format(pre_dir, script, args)
+ )
else:
ret = saltwinshell.call_python(self, script)
diff --git a/salt/client/ssh/client.py b/salt/client/ssh/client.py
index 245e1529c6..a45deeb325 100644
--- a/salt/client/ssh/client.py
+++ b/salt/client/ssh/client.py
@@ -107,7 +107,7 @@ class SSHClient:
return sane_kwargs
def _prep_ssh(
- self, tgt, fun, arg=(), timeout=None, tgt_type="glob", kwarg=None, **kwargs
+ self, tgt, fun, arg=(), timeout=None, tgt_type="glob", kwarg=None, context=None, **kwargs
):
"""
Prepare the arguments
@@ -122,7 +122,7 @@ class SSHClient:
opts["selected_target_option"] = tgt_type
opts["tgt"] = tgt
opts["arg"] = arg
- return salt.client.ssh.SSH(opts)
+ return salt.client.ssh.SSH(opts, context=context)
def cmd_iter(
self,
@@ -159,7 +159,7 @@ class SSHClient:
final.update(ret)
return final
- def cmd_sync(self, low):
+ def cmd_sync(self, low, context=None):
"""
Execute a salt-ssh call synchronously.
@@ -192,6 +192,7 @@ class SSHClient:
low.get("timeout"),
low.get("tgt_type"),
low.get("kwarg"),
+ context=context,
**kwargs
)
diff --git a/salt/client/ssh/shell.py b/salt/client/ssh/shell.py
index 7461618a2e..6b54a20abd 100644
--- a/salt/client/ssh/shell.py
+++ b/salt/client/ssh/shell.py
@@ -442,6 +442,14 @@ class Shell:
if stdout:
old_stdout = stdout
time.sleep(0.01)
+ if term.exitstatus is None:
+ try:
+ term.wait()
+ except: # pylint: disable=broad-except
+ # It's safe to put the broad exception handling here
+ # as we just need to ensure the child process in term finished
+ # to get proper term.exitstatus instead of None
+ pass
return ret_stdout, ret_stderr, term.exitstatus
finally:
term.close(terminate=True, kill=True)
diff --git a/salt/client/ssh/ssh_py_shim.py b/salt/client/ssh/ssh_py_shim.py
index b77749f495..293ea1b7fa 100644
--- a/salt/client/ssh/ssh_py_shim.py
+++ b/salt/client/ssh/ssh_py_shim.py
@@ -279,56 +279,72 @@ def main(argv): # pylint: disable=W0613
"""
Main program body
"""
- thin_path = os.path.join(OPTIONS.saltdir, THIN_ARCHIVE)
- if os.path.isfile(thin_path):
- if OPTIONS.checksum != get_hash(thin_path, OPTIONS.hashfunc):
- need_deployment()
- unpack_thin(thin_path)
- # Salt thin now is available to use
- else:
- if not sys.platform.startswith("win"):
- scpstat = subprocess.Popen(["/bin/sh", "-c", "command -v scp"]).wait()
- if scpstat != 0:
- sys.exit(EX_SCP_NOT_FOUND)
-
- if os.path.exists(OPTIONS.saltdir) and not os.path.isdir(OPTIONS.saltdir):
- sys.stderr.write(
- 'ERROR: salt path "{0}" exists but is not a directory\n'.format(
- OPTIONS.saltdir
+
+ virt_env = os.getenv("VIRTUAL_ENV", None)
+ # VIRTUAL_ENV environment variable is defined by venv-salt-minion wrapper
+ # it's used to check if the shim is running under this wrapper
+ venv_salt_call = None
+ if virt_env and "venv-salt-minion" in virt_env:
+ venv_salt_call = os.path.join(virt_env, "bin", "salt-call")
+ if not os.path.exists(venv_salt_call):
+ venv_salt_call = None
+ elif not os.path.exists(OPTIONS.saltdir):
+ os.makedirs(OPTIONS.saltdir)
+ cache_dir = os.path.join(OPTIONS.saltdir, "running_data", "var", "cache")
+ os.makedirs(os.path.join(cache_dir, "salt"))
+ os.symlink("salt", os.path.relpath(os.path.join(cache_dir, "venv-salt-minion")))
+
+ if venv_salt_call is None:
+ # Use Salt thin only if Salt Bundle (venv-salt-minion) is not available
+ thin_path = os.path.join(OPTIONS.saltdir, THIN_ARCHIVE)
+ if os.path.isfile(thin_path):
+ if OPTIONS.checksum != get_hash(thin_path, OPTIONS.hashfunc):
+ need_deployment()
+ unpack_thin(thin_path)
+ # Salt thin now is available to use
+ else:
+ if not sys.platform.startswith("win"):
+ scpstat = subprocess.Popen(["/bin/sh", "-c", "command -v scp"]).wait()
+ if scpstat != 0:
+ sys.exit(EX_SCP_NOT_FOUND)
+
+ if os.path.exists(OPTIONS.saltdir) and not os.path.isdir(OPTIONS.saltdir):
+ sys.stderr.write(
+ 'ERROR: salt path "{0}" exists but is'
+ " not a directory\n".format(OPTIONS.saltdir)
)
- )
- sys.exit(EX_CANTCREAT)
+ sys.exit(EX_CANTCREAT)
- if not os.path.exists(OPTIONS.saltdir):
- need_deployment()
+ if not os.path.exists(OPTIONS.saltdir):
+ need_deployment()
- code_checksum_path = os.path.normpath(
- os.path.join(OPTIONS.saltdir, "code-checksum")
- )
- if not os.path.exists(code_checksum_path) or not os.path.isfile(
- code_checksum_path
- ):
- sys.stderr.write(
- "WARNING: Unable to locate current code checksum: {0}.\n".format(
- code_checksum_path
- )
+ code_checksum_path = os.path.normpath(
+ os.path.join(OPTIONS.saltdir, "code-checksum")
)
- need_deployment()
- with open(code_checksum_path, "r") as vpo:
- cur_code_cs = vpo.readline().strip()
- if cur_code_cs != OPTIONS.code_checksum:
- sys.stderr.write(
- "WARNING: current code checksum {0} is different to {1}.\n".format(
- cur_code_cs, OPTIONS.code_checksum
+ if not os.path.exists(code_checksum_path) or not os.path.isfile(
+ code_checksum_path
+ ):
+ sys.stderr.write(
+ "WARNING: Unable to locate current code checksum: {0}.\n".format(
+ code_checksum_path
+ )
)
- )
- need_deployment()
- # Salt thin exists and is up-to-date - fall through and use it
+ need_deployment()
+ with open(code_checksum_path, "r") as vpo:
+ cur_code_cs = vpo.readline().strip()
+ if cur_code_cs != OPTIONS.code_checksum:
+ sys.stderr.write(
+ "WARNING: current code checksum {0} is different to {1}.\n".format(
+ cur_code_cs, OPTIONS.code_checksum
+ )
+ )
+ need_deployment()
+ # Salt thin exists and is up-to-date - fall through and use it
- salt_call_path = os.path.join(OPTIONS.saltdir, "salt-call")
- if not os.path.isfile(salt_call_path):
- sys.stderr.write('ERROR: thin is missing "{0}"\n'.format(salt_call_path))
- need_deployment()
+ salt_call_path = os.path.join(OPTIONS.saltdir, "salt-call")
+ if not os.path.isfile(salt_call_path):
+ sys.stderr.write('ERROR: thin is missing "{0}"\n'.format(salt_call_path))
+ need_deployment()
with open(os.path.join(OPTIONS.saltdir, "minion"), "w") as config:
config.write(OPTIONS.config + "\n")
@@ -351,8 +367,8 @@ def main(argv): # pylint: disable=W0613
argv_prepared = ARGS
salt_argv = [
- get_executable(),
- salt_call_path,
+ sys.executable if venv_salt_call is not None else get_executable(),
+ venv_salt_call if venv_salt_call is not None else salt_call_path,
"--retcode-passthrough",
"--local",
"--metadata",
diff --git a/salt/loader/__init__.py b/salt/loader/__init__.py
index f7815acc03..a0f2220476 100644
--- a/salt/loader/__init__.py
+++ b/salt/loader/__init__.py
@@ -8,6 +8,7 @@ import contextlib
import logging
import os
import re
+import threading
import time
import types
@@ -31,7 +32,7 @@ from salt.exceptions import LoaderError
from salt.template import check_render_pipe_str
from salt.utils import entrypoints
-from .lazy import SALT_BASE_PATH, FilterDictWrapper, LazyLoader
+from .lazy import SALT_BASE_PATH, FilterDictWrapper, LazyLoader as _LazyLoader
log = logging.getLogger(__name__)
@@ -81,6 +82,18 @@ SALT_INTERNAL_LOADERS_PATHS = (
str(SALT_BASE_PATH / "wheel"),
)
+LOAD_LOCK = threading.Lock()
+
+
+def LazyLoader(*args, **kwargs):
+ # This wrapper is used to prevent deadlocks with importlib (bsc#1182851)
+ # LOAD_LOCK is also used directly in salt.client.ssh.SSH
+ try:
+ LOAD_LOCK.acquire()
+ return _LazyLoader(*args, **kwargs)
+ finally:
+ LOAD_LOCK.release()
+
def static_loader(
opts,
@@ -597,16 +610,19 @@ def fileserver(opts, backends):
)
-def roster(opts, runner=None, utils=None, whitelist=None):
+def roster(opts, runner=None, utils=None, whitelist=None, context=None):
"""
Returns the roster modules
"""
+ if context is None:
+ context = {}
+
return LazyLoader(
_module_dirs(opts, "roster"),
opts,
tag="roster",
whitelist=whitelist,
- pack={"__runner__": runner, "__utils__": utils},
+ pack={"__runner__": runner, "__utils__": utils, "__context__": context},
extra_module_dirs=utils.module_dirs if utils else None,
)
@@ -744,7 +760,14 @@ def render(opts, functions, states=None, proxy=None, context=None):
)
rend = FilterDictWrapper(ret, ".render")
- if not check_render_pipe_str(
+ def _check_render_pipe_str(pipestr, renderers, blacklist, whitelist):
+ try:
+ LOAD_LOCK.acquire()
+ return check_render_pipe_str(pipestr, renderers, blacklist, whitelist)
+ finally:
+ LOAD_LOCK.release()
+
+ if not _check_render_pipe_str(
opts["renderer"], rend, opts["renderer_blacklist"], opts["renderer_whitelist"]
):
err = (
diff --git a/salt/netapi/__init__.py b/salt/netapi/__init__.py
index 81954acb96..5d2ff994a6 100644
--- a/salt/netapi/__init__.py
+++ b/salt/netapi/__init__.py
@@ -46,6 +46,7 @@ class NetapiClient:
self.loadauth = salt.auth.LoadAuth(apiopts)
self.key = salt.daemons.masterapi.access_keys(apiopts)
self.ckminions = salt.utils.minions.CkMinions(apiopts)
+ self.context = {}
def _is_master_running(self):
"""
@@ -205,7 +206,7 @@ class NetapiClient:
with salt.client.ssh.client.SSHClient(
mopts=self.opts, disable_custom_roster=True
) as client:
- return client.cmd_sync(kwargs)
+ return client.cmd_sync(kwargs, context=self.context)
def runner(self, fun, timeout=None, full_return=False, **kwargs):
"""
diff --git a/salt/roster/__init__.py b/salt/roster/__init__.py
index b45afffd24..4b6182b2dd 100644
--- a/salt/roster/__init__.py
+++ b/salt/roster/__init__.py
@@ -59,7 +59,7 @@ class Roster:
minion aware
"""
- def __init__(self, opts, backends="flat"):
+ def __init__(self, opts, backends="flat", context=None):
self.opts = opts
if isinstance(backends, list):
self.backends = backends
@@ -71,7 +71,9 @@ class Roster:
self.backends = ["flat"]
utils = salt.loader.utils(self.opts)
runner = salt.loader.runner(self.opts, utils=utils)
- self.rosters = salt.loader.roster(self.opts, runner=runner, utils=utils)
+ self.rosters = salt.loader.roster(
+ self.opts, runner=runner, utils=utils, context=context
+ )
def _gen_back(self):
"""
diff --git a/tests/unit/test_loader.py b/tests/unit/test_loader.py
index 2319f815d3..e83f86cd01 100644
--- a/tests/unit/test_loader.py
+++ b/tests/unit/test_loader.py
@@ -1696,7 +1696,7 @@ class LazyLoaderRefreshFileMappingTest(TestCase):
cls.funcs = salt.loader.minion_mods(cls.opts, utils=cls.utils, proxy=cls.proxy)
def setUp(self):
- class LazyLoaderMock(salt.loader.LazyLoader):
+ class LazyLoaderMock(salt.loader._LazyLoader):
pass
self.LOADER_CLASS = LazyLoaderMock
--
2.35.1