244 lines
9.0 KiB
Diff
244 lines
9.0 KiB
Diff
From 794b5d1aa7b8e880e9a21940183d241c6cbde9c9 Mon Sep 17 00:00:00 2001
|
|
From: Victor Zhestkov <vzhestkov@suse.com>
|
|
Date: Wed, 15 May 2024 09:42:23 +0200
|
|
Subject: [PATCH] Make salt-master self recoverable on killing
|
|
EventPublisher
|
|
|
|
* Implement timeout and tries to transport.ipc.IPCClient.send
|
|
|
|
* Make timeout and tries configurable for fire_event
|
|
|
|
* Add test of timeout and tries
|
|
|
|
* Prevent exceptions from tornado Future on closing the IPC connection
|
|
---
|
|
salt/transport/ipc.py | 73 +++++++++++++++++---
|
|
salt/utils/event.py | 21 +++++-
|
|
tests/pytests/unit/utils/event/test_event.py | 43 ++++++++++++
|
|
3 files changed, 125 insertions(+), 12 deletions(-)
|
|
|
|
diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py
|
|
index cee100b086..6631781c5c 100644
|
|
--- a/salt/transport/ipc.py
|
|
+++ b/salt/transport/ipc.py
|
|
@@ -2,7 +2,6 @@
|
|
IPC transport classes
|
|
"""
|
|
|
|
-
|
|
import errno
|
|
import logging
|
|
import socket
|
|
@@ -340,7 +339,8 @@ class IPCClient:
|
|
try:
|
|
log.trace("IPCClient: Connecting to socket: %s", self.socket_path)
|
|
yield self.stream.connect(sock_addr)
|
|
- self._connecting_future.set_result(True)
|
|
+ if self._connecting_future is not None:
|
|
+ self._connecting_future.set_result(True)
|
|
break
|
|
except Exception as e: # pylint: disable=broad-except
|
|
if self.stream.closed():
|
|
@@ -350,7 +350,8 @@ class IPCClient:
|
|
if self.stream is not None:
|
|
self.stream.close()
|
|
self.stream = None
|
|
- self._connecting_future.set_exception(e)
|
|
+ if self._connecting_future is not None:
|
|
+ self._connecting_future.set_exception(e)
|
|
break
|
|
|
|
yield salt.ext.tornado.gen.sleep(1)
|
|
@@ -365,7 +366,13 @@ class IPCClient:
|
|
return
|
|
|
|
self._closing = True
|
|
- self._connecting_future = None
|
|
+ if self._connecting_future is not None:
|
|
+ try:
|
|
+ self._connecting_future.set_result(True)
|
|
+ self._connecting_future.exception() # pylint: disable=E0203
|
|
+ except Exception as e: # pylint: disable=broad-except
|
|
+ log.warning("Unhandled connecting exception: %s", e, exc_info=True)
|
|
+ self._connecting_future = None
|
|
|
|
log.debug("Closing %s instance", self.__class__.__name__)
|
|
|
|
@@ -435,8 +442,6 @@ class IPCMessageClient(IPCClient):
|
|
"close",
|
|
]
|
|
|
|
- # FIXME timeout unimplemented
|
|
- # FIXME tries unimplemented
|
|
@salt.ext.tornado.gen.coroutine
|
|
def send(self, msg, timeout=None, tries=None):
|
|
"""
|
|
@@ -445,12 +450,60 @@ class IPCMessageClient(IPCClient):
|
|
If the socket is not currently connected, a connection will be established.
|
|
|
|
:param dict msg: The message to be sent
|
|
- :param int timeout: Timeout when sending message (Currently unimplemented)
|
|
+ :param int timeout: Timeout when sending message
|
|
+ :param int tries: Maximum numer of tries to send message
|
|
"""
|
|
- if not self.connected():
|
|
- yield self.connect()
|
|
+ if tries is None or tries < 1:
|
|
+ tries = 1
|
|
+ due_time = None
|
|
+ if timeout is not None:
|
|
+ due_time = time.time() + timeout
|
|
+ _try = 1
|
|
+ exc_count = 0
|
|
pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
|
|
- yield self.stream.write(pack)
|
|
+ while _try <= tries:
|
|
+ if not self.connected():
|
|
+ self.close()
|
|
+ self.stream = None
|
|
+ self._closing = False
|
|
+ try:
|
|
+ yield self.connect(
|
|
+ timeout=(
|
|
+ None if due_time is None else max(due_time - time.time(), 1)
|
|
+ )
|
|
+ )
|
|
+ except StreamClosedError:
|
|
+ log.warning(
|
|
+ "IPCMessageClient: Unable to reconnect IPC stream on sending message with ID: 0x%016x%s",
|
|
+ id(msg),
|
|
+ f", retry {_try} of {tries}" if tries > 1 else "",
|
|
+ )
|
|
+ exc_count += 1
|
|
+ if self.connected():
|
|
+ try:
|
|
+ yield self.stream.write(pack)
|
|
+ return
|
|
+ except StreamClosedError:
|
|
+ if self._closing:
|
|
+ break
|
|
+ log.warning(
|
|
+ "IPCMessageClient: Stream was closed on sending message with ID: 0x%016x",
|
|
+ id(msg),
|
|
+ )
|
|
+ exc_count += 1
|
|
+ if exc_count == 1:
|
|
+ # Give one more chance in case if stream was detected as closed
|
|
+ # on the first write attempt
|
|
+ continue
|
|
+ cur_time = time.time()
|
|
+ _try += 1
|
|
+ if _try > tries or (due_time is not None and cur_time > due_time):
|
|
+ return
|
|
+ yield salt.ext.tornado.gen.sleep(
|
|
+ 1
|
|
+ if due_time is None
|
|
+ else (due_time - cur_time) / max(tries - _try + 1, 1)
|
|
+ )
|
|
|
|
|
|
class IPCMessageServer(IPCServer):
|
|
diff --git a/salt/utils/event.py b/salt/utils/event.py
|
|
index ef048335ae..36b530d1af 100644
|
|
--- a/salt/utils/event.py
|
|
+++ b/salt/utils/event.py
|
|
@@ -270,6 +270,10 @@ class SaltEvent:
|
|
# and don't read out events from the buffer on an on-going basis,
|
|
# the buffer will grow resulting in big memory usage.
|
|
self.connect_pub()
|
|
+ self.pusher_send_timeout = self.opts.get(
|
|
+ "pusher_send_timeout", self.opts.get("timeout")
|
|
+ )
|
|
+ self.pusher_send_tries = self.opts.get("pusher_send_tries", 3)
|
|
|
|
@classmethod
|
|
def __load_cache_regex(cls):
|
|
@@ -839,10 +843,18 @@ class SaltEvent:
|
|
]
|
|
)
|
|
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
|
|
+ if timeout is None:
|
|
+ timeout_s = self.pusher_send_timeout
|
|
+ else:
|
|
+ timeout_s = float(timeout) / 1000
|
|
if self._run_io_loop_sync:
|
|
with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
|
try:
|
|
- self.pusher.send(msg)
|
|
+ self.pusher.send(
|
|
+ msg,
|
|
+ timeout=timeout_s,
|
|
+ tries=self.pusher_send_tries,
|
|
+ )
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
log.debug(
|
|
"Publisher send failed with exception: %s",
|
|
@@ -851,7 +863,12 @@ class SaltEvent:
|
|
)
|
|
raise
|
|
else:
|
|
- self.io_loop.spawn_callback(self.pusher.send, msg)
|
|
+ self.io_loop.spawn_callback(
|
|
+ self.pusher.send,
|
|
+ msg,
|
|
+ timeout=timeout_s,
|
|
+ tries=self.pusher_send_tries,
|
|
+ )
|
|
return True
|
|
|
|
def fire_master(self, data, tag, timeout=1000):
|
|
diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py
|
|
index 3eadfaf6ba..fa9e420a93 100644
|
|
--- a/tests/pytests/unit/utils/event/test_event.py
|
|
+++ b/tests/pytests/unit/utils/event/test_event.py
|
|
@@ -447,3 +447,46 @@ def test_event_fire_ret_load():
|
|
)
|
|
assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org"
|
|
assert mock_log_error.mock_calls[0].args[2] == "".join(test_traceback)
|
|
+
|
|
+
|
|
+@pytest.mark.slow_test
|
|
+def test_event_single_timeout_tries(sock_dir):
|
|
+ """Test an event is sent with timout and tries"""
|
|
+
|
|
+ write_calls_count = 0
|
|
+ real_stream_write = None
|
|
+
|
|
+ @salt.ext.tornado.gen.coroutine
|
|
+ def write_mock(pack):
|
|
+ nonlocal write_calls_count
|
|
+ nonlocal real_stream_write
|
|
+ write_calls_count += 1
|
|
+ if write_calls_count > 3:
|
|
+ yield real_stream_write(pack)
|
|
+ else:
|
|
+ raise salt.ext.tornado.iostream.StreamClosedError()
|
|
+
|
|
+ with eventpublisher_process(str(sock_dir)), salt.utils.event.MasterEvent(
|
|
+ str(sock_dir), listen=True
|
|
+ ) as me:
|
|
+ me.fire_event({"data": "foo1"}, "evt1")
|
|
+ evt1 = me.get_event(tag="evt1")
|
|
+ _assert_got_event(evt1, {"data": "foo1"})
|
|
+ real_stream_write = me.pusher.stream.write
|
|
+ with patch.object(
|
|
+ me.pusher,
|
|
+ "connected",
|
|
+ side_effect=[True, True, False, False, True, True],
|
|
+ ), patch.object(
|
|
+ me.pusher,
|
|
+ "connect",
|
|
+ side_effect=salt.ext.tornado.iostream.StreamClosedError,
|
|
+ ), patch.object(
|
|
+ me.pusher.stream,
|
|
+ "write",
|
|
+ write_mock,
|
|
+ ):
|
|
+ me.fire_event({"data": "bar2"}, "evt2", timeout=5000)
|
|
+ evt2 = me.get_event(tag="evt2")
|
|
+ _assert_got_event(evt2, {"data": "bar2"})
|
|
+ assert write_calls_count == 4
|
|
--
|
|
2.45.0
|
|
|