salt/make-reactor-engine-less-blocking-the-eventpublisher.patch

105 lines
3.6 KiB
Diff

From 0d35f09288700f5c961567442c3fcc25838b8de4 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Wed, 15 May 2024 09:44:21 +0200
Subject: [PATCH] Make reactor engine less blocking the EventPublisher
---
salt/utils/reactor.py | 45 +++++++++++++++++++++++++++----------------
1 file changed, 28 insertions(+), 17 deletions(-)
diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py
index 19420a51cf..78adad34da 100644
--- a/salt/utils/reactor.py
+++ b/salt/utils/reactor.py
@@ -1,10 +1,12 @@
"""
Functions which implement running reactor jobs
"""
+
import fnmatch
import glob
import logging
import os
+from threading import Lock
import salt.client
import salt.defaults.exitcodes
@@ -194,13 +196,6 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
self.resolve_aliases(chunks)
return chunks
- def call_reactions(self, chunks):
- """
- Execute the reaction state
- """
- for chunk in chunks:
- self.wrap.run(chunk)
-
def run(self):
"""
Enter into the server loop
@@ -218,7 +213,7 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
) as event:
self.wrap = ReactWrap(self.opts)
- for data in event.iter_events(full=True):
+ for data in event.iter_events(full=True, auto_reconnect=True):
# skip all events fired by ourselves
if data["data"].get("user") == self.wrap.event_user:
continue
@@ -268,15 +263,9 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
if not self.is_leader:
continue
else:
- reactors = self.list_reactors(data["tag"])
- if not reactors:
- continue
- chunks = self.reactions(data["tag"], data["data"], reactors)
- if chunks:
- try:
- self.call_reactions(chunks)
- except SystemExit:
- log.warning("Exit ignored by reactor")
+ self.wrap.call_reactions(
+ data, self.list_reactors, self.reactions
+ )
class ReactWrap:
@@ -297,6 +286,7 @@ class ReactWrap:
def __init__(self, opts):
self.opts = opts
+ self._run_lock = Lock()
if ReactWrap.client_cache is None:
ReactWrap.client_cache = salt.utils.cache.CacheDict(
opts["reactor_refresh_interval"]
@@ -480,3 +470,24 @@ class ReactWrap:
Wrap LocalCaller to execute remote exec functions locally on the Minion
"""
self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
+
+ def _call_reactions(self, data, list_reactors, get_reactions):
+ reactors = list_reactors(data["tag"])
+ if not reactors:
+ return
+ chunks = get_reactions(data["tag"], data["data"], reactors)
+ if not chunks:
+ return
+ with self._run_lock:
+ try:
+ for chunk in chunks:
+ self.run(chunk)
+ except Exception as exc: # pylint: disable=broad-except
+ log.error(
+ "Exception while calling the reactions: %s", exc, exc_info=True
+ )
+
+ def call_reactions(self, data, list_reactors, get_reactions):
+ return self.pool.fire_async(
+ self._call_reactions, args=(data, list_reactors, get_reactions)
+ )
--
2.45.0