218a0bb719
- Several fixes for tests to avoid errors and failures on some OSes - Speed up salt.matcher.confirm_top by using __context__ - Do not run the async wrapper calls in a separate thread - Prevent OOM with a high number of batch async calls (bsc#1216063) - Add missing contextvars dependency in salt.version - Skip tests for unsupported algorithms on old OpenSSL versions - Remove redundant `_file_find` call to the master - Prevent possible exception in tornado.concurrent.Future._set_done - Make the reactor engine block the EventPublisher less - Make salt-master self-recoverable when the EventPublisher is killed - Improve catching and reporting of broken events - Make logging calls lighter - Remove unused import causing delays when starting salt-master - Added: * improve-broken-events-catching-and-reporting.patch * add-missing-contextvars-dependency-in-salt.version.patch * prevent-oom-with-high-amount-of-batch-async-calls-bs.patch * speed-up-salt.matcher.confirm_top-by-using-__context.patch * remove-redundant-_file_find-call-to-the-master.patch * make-logging-calls-lighter.patch * make-salt-master-self-recoverable-on-killing-eventpu.patch * skip-tests-for-unsupported-algorithm-on-old-openssl-.patch * remove-unused-import-causing-delays-on-starting-salt.patch * do-not-call-the-async-wrapper-calls-with-the-separat.patch * prevent-possible-exception-in-tornado.concurrent.fut.patch * several-fixes-for-tests-to-avoid-errors-and-failures.patch * make-reactor-engine-less-blocking-the-eventpublisher.patch OBS-URL: https://build.opensuse.org/request/show/1177104 OBS-URL: https://build.opensuse.org/package/show/systemsmanagement:saltstack/salt?expand=0&rev=243
105 lines
3.6 KiB
Diff
105 lines
3.6 KiB
Diff
From 0d35f09288700f5c961567442c3fcc25838b8de4 Mon Sep 17 00:00:00 2001
|
|
From: Victor Zhestkov <vzhestkov@suse.com>
|
|
Date: Wed, 15 May 2024 09:44:21 +0200
|
|
Subject: [PATCH] Make reactor engine less blocking the EventPublisher
|
|
|
|
---
|
|
salt/utils/reactor.py | 45 +++++++++++++++++++++++++++----------------
|
|
1 file changed, 28 insertions(+), 17 deletions(-)
|
|
|
|
diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py
|
|
index 19420a51cf..78adad34da 100644
|
|
--- a/salt/utils/reactor.py
|
|
+++ b/salt/utils/reactor.py
|
|
@@ -1,10 +1,12 @@
|
|
"""
|
|
Functions which implement running reactor jobs
|
|
"""
|
|
+
|
|
import fnmatch
|
|
import glob
|
|
import logging
|
|
import os
|
|
+from threading import Lock
|
|
|
|
import salt.client
|
|
import salt.defaults.exitcodes
|
|
@@ -194,13 +196,6 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
|
|
self.resolve_aliases(chunks)
|
|
return chunks
|
|
|
|
- def call_reactions(self, chunks):
|
|
- """
|
|
- Execute the reaction state
|
|
- """
|
|
- for chunk in chunks:
|
|
- self.wrap.run(chunk)
|
|
-
|
|
def run(self):
|
|
"""
|
|
Enter into the server loop
|
|
@@ -218,7 +213,7 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
|
|
) as event:
|
|
self.wrap = ReactWrap(self.opts)
|
|
|
|
- for data in event.iter_events(full=True):
|
|
+ for data in event.iter_events(full=True, auto_reconnect=True):
|
|
# skip all events fired by ourselves
|
|
if data["data"].get("user") == self.wrap.event_user:
|
|
continue
|
|
@@ -268,15 +263,9 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
|
|
if not self.is_leader:
|
|
continue
|
|
else:
|
|
- reactors = self.list_reactors(data["tag"])
|
|
- if not reactors:
|
|
- continue
|
|
- chunks = self.reactions(data["tag"], data["data"], reactors)
|
|
- if chunks:
|
|
- try:
|
|
- self.call_reactions(chunks)
|
|
- except SystemExit:
|
|
- log.warning("Exit ignored by reactor")
|
|
+ self.wrap.call_reactions(
|
|
+ data, self.list_reactors, self.reactions
|
|
+ )
|
|
|
|
|
|
class ReactWrap:
|
|
@@ -297,6 +286,7 @@ class ReactWrap:
|
|
|
|
def __init__(self, opts):
|
|
self.opts = opts
|
|
+ self._run_lock = Lock()
|
|
if ReactWrap.client_cache is None:
|
|
ReactWrap.client_cache = salt.utils.cache.CacheDict(
|
|
opts["reactor_refresh_interval"]
|
|
@@ -480,3 +470,24 @@ class ReactWrap:
|
|
Wrap LocalCaller to execute remote exec functions locally on the Minion
|
|
"""
|
|
self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
|
|
+
|
|
+ def _call_reactions(self, data, list_reactors, get_reactions):
|
|
+ reactors = list_reactors(data["tag"])
|
|
+ if not reactors:
|
|
+ return
|
|
+ chunks = get_reactions(data["tag"], data["data"], reactors)
|
|
+ if not chunks:
|
|
+ return
|
|
+ with self._run_lock:
|
|
+ try:
|
|
+ for chunk in chunks:
|
|
+ self.run(chunk)
|
|
+ except Exception as exc: # pylint: disable=broad-except
|
|
+ log.error(
|
|
+ "Exception while calling the reactions: %s", exc, exc_info=True
|
|
+ )
|
|
+
|
|
+ def call_reactions(self, data, list_reactors, get_reactions):
|
|
+ return self.pool.fire_async(
|
|
+ self._call_reactions, args=(data, list_reactors, get_reactions)
|
|
+ )
|
|
--
|
|
2.45.0
|
|
|