From c00801d2f9807e49769d0e0d848ec12be555dbc1 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov
Date: Wed, 25 Sep 2024 14:07:05 +0300
Subject: [PATCH] Revert the change making reactor less blocking (bsc#1230322)

This reverts commit 0d35f09288700f5c961567442c3fcc25838b8de4.
---
 salt/utils/reactor.py | 45 ++++++++++++++++---------------------------
 1 file changed, 17 insertions(+), 28 deletions(-)

diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py
index 78adad34da..19420a51cf 100644
--- a/salt/utils/reactor.py
+++ b/salt/utils/reactor.py
@@ -1,12 +1,10 @@
 """
 Functions which implement running reactor jobs
 """
-
 import fnmatch
 import glob
 import logging
 import os
-from threading import Lock
 
 import salt.client
 import salt.defaults.exitcodes
@@ -196,6 +194,13 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
         self.resolve_aliases(chunks)
         return chunks
 
+    def call_reactions(self, chunks):
+        """
+        Execute the reaction state
+        """
+        for chunk in chunks:
+            self.wrap.run(chunk)
+
     def run(self):
         """
         Enter into the server loop
@@ -213,7 +218,7 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
         ) as event:
             self.wrap = ReactWrap(self.opts)
 
-            for data in event.iter_events(full=True, auto_reconnect=True):
+            for data in event.iter_events(full=True):
                 # skip all events fired by ourselves
                 if data["data"].get("user") == self.wrap.event_user:
                     continue
@@ -263,9 +268,15 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
                     if not self.is_leader:
                         continue
                     else:
-                        self.wrap.call_reactions(
-                            data, self.list_reactors, self.reactions
-                        )
+                        reactors = self.list_reactors(data["tag"])
+                        if not reactors:
+                            continue
+                        chunks = self.reactions(data["tag"], data["data"], reactors)
+                        if chunks:
+                            try:
+                                self.call_reactions(chunks)
+                            except SystemExit:
+                                log.warning("Exit ignored by reactor")
 
 
 class ReactWrap:
@@ -286,7 +297,6 @@ class ReactWrap:
 
     def __init__(self, opts):
         self.opts = opts
-        self._run_lock = Lock()
         if ReactWrap.client_cache is None:
             ReactWrap.client_cache = salt.utils.cache.CacheDict(
                 opts["reactor_refresh_interval"]
@@ -470,24 +480,3 @@ class ReactWrap:
         Wrap LocalCaller to execute remote exec functions locally on the Minion
         """
         self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
-
-    def _call_reactions(self, data, list_reactors, get_reactions):
-        reactors = list_reactors(data["tag"])
-        if not reactors:
-            return
-        chunks = get_reactions(data["tag"], data["data"], reactors)
-        if not chunks:
-            return
-        with self._run_lock:
-            try:
-                for chunk in chunks:
-                    self.run(chunk)
-            except Exception as exc:  # pylint: disable=broad-except
-                log.error(
-                    "Exception while calling the reactions: %s", exc, exc_info=True
-                )
-
-    def call_reactions(self, data, list_reactors, get_reactions):
-        return self.pool.fire_async(
-            self._call_reactions, args=(data, list_reactors, get_reactions)
-        )
-- 
2.46.1