107 lines
3.7 KiB
Diff
107 lines
3.7 KiB
Diff
|
From c00801d2f9807e49769d0e0d848ec12be555dbc1 Mon Sep 17 00:00:00 2001
|
||
|
From: Victor Zhestkov <vzhestkov@suse.com>
|
||
|
Date: Wed, 25 Sep 2024 14:07:05 +0300
|
||
|
Subject: [PATCH] Revert the change making reactor less blocking
|
||
|
(bsc#1230322)
|
||
|
|
||
|
This reverts commit 0d35f09288700f5c961567442c3fcc25838b8de4.
|
||
|
---
|
||
|
salt/utils/reactor.py | 45 ++++++++++++++++---------------------------
|
||
|
1 file changed, 17 insertions(+), 28 deletions(-)
|
||
|
|
||
|
diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py
|
||
|
index 78adad34da..19420a51cf 100644
|
||
|
--- a/salt/utils/reactor.py
|
||
|
+++ b/salt/utils/reactor.py
|
||
|
@@ -1,12 +1,10 @@
|
||
|
"""
|
||
|
Functions which implement running reactor jobs
|
||
|
"""
|
||
|
-
|
||
|
import fnmatch
|
||
|
import glob
|
||
|
import logging
|
||
|
import os
|
||
|
-from threading import Lock
|
||
|
|
||
|
import salt.client
|
||
|
import salt.defaults.exitcodes
|
||
|
@@ -196,6 +194,13 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
|
||
|
self.resolve_aliases(chunks)
|
||
|
return chunks
|
||
|
|
||
|
+ def call_reactions(self, chunks):
|
||
|
+ """
|
||
|
+ Execute the reaction state
|
||
|
+ """
|
||
|
+ for chunk in chunks:
|
||
|
+ self.wrap.run(chunk)
|
||
|
+
|
||
|
def run(self):
|
||
|
"""
|
||
|
Enter into the server loop
|
||
|
@@ -213,7 +218,7 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
|
||
|
) as event:
|
||
|
self.wrap = ReactWrap(self.opts)
|
||
|
|
||
|
- for data in event.iter_events(full=True, auto_reconnect=True):
|
||
|
+ for data in event.iter_events(full=True):
|
||
|
# skip all events fired by ourselves
|
||
|
if data["data"].get("user") == self.wrap.event_user:
|
||
|
continue
|
||
|
@@ -263,9 +268,15 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
|
||
|
if not self.is_leader:
|
||
|
continue
|
||
|
else:
|
||
|
- self.wrap.call_reactions(
|
||
|
- data, self.list_reactors, self.reactions
|
||
|
- )
|
||
|
+ reactors = self.list_reactors(data["tag"])
|
||
|
+ if not reactors:
|
||
|
+ continue
|
||
|
+ chunks = self.reactions(data["tag"], data["data"], reactors)
|
||
|
+ if chunks:
|
||
|
+ try:
|
||
|
+ self.call_reactions(chunks)
|
||
|
+ except SystemExit:
|
||
|
+ log.warning("Exit ignored by reactor")
|
||
|
|
||
|
|
||
|
class ReactWrap:
|
||
|
@@ -286,7 +297,6 @@ class ReactWrap:
|
||
|
|
||
|
def __init__(self, opts):
|
||
|
self.opts = opts
|
||
|
- self._run_lock = Lock()
|
||
|
if ReactWrap.client_cache is None:
|
||
|
ReactWrap.client_cache = salt.utils.cache.CacheDict(
|
||
|
opts["reactor_refresh_interval"]
|
||
|
@@ -470,24 +480,3 @@ class ReactWrap:
|
||
|
Wrap LocalCaller to execute remote exec functions locally on the Minion
|
||
|
"""
|
||
|
self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
|
||
|
-
|
||
|
- def _call_reactions(self, data, list_reactors, get_reactions):
|
||
|
- reactors = list_reactors(data["tag"])
|
||
|
- if not reactors:
|
||
|
- return
|
||
|
- chunks = get_reactions(data["tag"], data["data"], reactors)
|
||
|
- if not chunks:
|
||
|
- return
|
||
|
- with self._run_lock:
|
||
|
- try:
|
||
|
- for chunk in chunks:
|
||
|
- self.run(chunk)
|
||
|
- except Exception as exc: # pylint: disable=broad-except
|
||
|
- log.error(
|
||
|
- "Exception while calling the reactions: %s", exc, exc_info=True
|
||
|
- )
|
||
|
-
|
||
|
- def call_reactions(self, data, list_reactors, get_reactions):
|
||
|
- return self.pool.fire_async(
|
||
|
- self._call_reactions, args=(data, list_reactors, get_reactions)
|
||
|
- )
|
||
|
--
|
||
|
2.46.1
|
||
|
|