SHA256
1
0
forked from pool/salt
salt/revert-usage-of-long-running-req-channel-bsc-1213960.patch

419 lines
16 KiB
Diff
Raw Normal View History

From 3cc2aee2290bd9a4fba9e0cebe3b65370aa76af6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?=
<psuarezhernandez@suse.com>
Date: Fri, 1 Sep 2023 08:22:44 +0100
Subject: [PATCH] Revert usage of long running REQ channel (bsc#1213960,
bsc#1213630, bsc#1213257)
* Revert usage of long running REQ channel (bsc#1213960, bsc#1213630, bsc#1213257)
This reverts commits:
https://github.com/saltstack/salt/commit/a99ffb557b8a1142225d4925aba4a5e493923d2f
https://github.com/saltstack/salt/commit/80ae5188807550e7592fa12d8661ee83c4313ec8
https://github.com/saltstack/salt/commit/3c7e1ec1f08abd7cd1ba78ad7880acb6ba6fdce7
https://github.com/saltstack/salt/commit/171926cc57618b51bf3fdc042b62212e681180fc
From this PR: https://github.com/saltstack/salt/pull/61468
See: https://github.com/saltstack/salt/issues/62959#issuecomment-1658335432
* Revert "Fix linter"
This reverts commit d09d2d3f31e06c554b4ed869b7dc4f8b8bce5dc9.
* Revert "add a regression test"
This reverts commit b2c32be0a80c92585a9063409c42895357bb0dbe.
* Fix failing tests after reverting commits
---
doc/topics/development/architecture.rst | 8 +-
salt/channel/client.py | 9 +--
salt/minion.py | 47 +++--------
salt/transport/ipc.py | 5 +-
salt/utils/asynchronous.py | 2 +-
.../transport/server/test_req_channel.py | 16 ++--
tests/pytests/unit/test_minion.py | 79 ++++---------------
7 files changed, 39 insertions(+), 127 deletions(-)
diff --git a/doc/topics/development/architecture.rst b/doc/topics/development/architecture.rst
index 17400db001..1c717092f8 100644
--- a/doc/topics/development/architecture.rst
+++ b/doc/topics/development/architecture.rst
@@ -220,15 +220,11 @@ the received message.
4) The new minion thread is created. The _thread_return() function starts up
and actually calls out to the requested function contained in the job.
5) The requested function runs and returns a result. [Still in thread.]
-6) The result of the function that's run is published on the minion's local event bus with event
-tag "__master_req_channel_payload" [Still in thread.]
+6) The result of the function that's run is encrypted and returned to the
+master's ReqServer (TCP 4506 on master). [Still in thread.]
7) Thread exits. Because the main thread was only blocked for the time that it
took to initialize the worker thread, many other requests could have been
received and processed during this time.
-8) Minion event handler gets the event with tag "__master_req_channel_payload"
-and sends the payload to master's ReqServer (TCP 4506 on master), via the long-running async request channel
-that was opened when minion first started up.
-
A Note on ClearFuncs vs. AESFuncs
diff --git a/salt/channel/client.py b/salt/channel/client.py
index e5b073ccdb..76d7a8e5b9 100644
--- a/salt/channel/client.py
+++ b/salt/channel/client.py
@@ -98,7 +98,6 @@ class AsyncReqChannel:
"_crypted_transfer",
"_uncrypted_transfer",
"send",
- "connect",
]
close_methods = [
"close",
@@ -128,7 +127,7 @@ class AsyncReqChannel:
else:
auth = None
- transport = salt.transport.request_client(opts, io_loop=io_loop)
+ transport = salt.transport.request_client(opts, io_loop)
return cls(opts, transport, auth)
def __init__(self, opts, transport, auth, **kwargs):
@@ -271,10 +270,6 @@ class AsyncReqChannel:
raise salt.ext.tornado.gen.Return(ret)
- @salt.ext.tornado.gen.coroutine
- def connect(self):
- yield self.transport.connect()
-
@salt.ext.tornado.gen.coroutine
def send(self, load, tries=3, timeout=60, raw=False):
"""
@@ -295,7 +290,7 @@ class AsyncReqChannel:
ret = yield self._crypted_transfer(load, timeout=timeout, raw=raw)
break
except Exception as exc: # pylint: disable=broad-except
- log.trace("Failed to send msg %r", exc)
+ log.error("Failed to send msg %r", exc)
if _try >= tries:
raise
else:
diff --git a/salt/minion.py b/salt/minion.py
index c3b65f16c3..9597d6e63a 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -1361,30 +1361,11 @@ class Minion(MinionBase):
"""
Return a future which will complete when you are connected to a master
"""
- # Consider refactoring so that eval_master does not have a subtle side-effect on the contents of the opts array
master, self.pub_channel = yield self.eval_master(
self.opts, self.timeout, self.safe, failed
)
-
- # a long-running req channel
- self.req_channel = salt.transport.client.AsyncReqChannel.factory(
- self.opts, io_loop=self.io_loop
- )
-
- if hasattr(
- self.req_channel, "connect"
- ): # TODO: consider generalizing this for all channels
- log.debug("Connecting minion's long-running req channel")
- yield self.req_channel.connect()
-
yield self._post_master_init(master)
- @salt.ext.tornado.gen.coroutine
- def handle_payload(self, payload, reply_func):
- self.payloads.append(payload)
- yield reply_func(payload)
- self.payload_ack.notify()
-
# TODO: better name...
@salt.ext.tornado.gen.coroutine
def _post_master_init(self, master):
@@ -1599,6 +1580,7 @@ class Minion(MinionBase):
return functions, returners, errors, executors
def _send_req_sync(self, load, timeout):
+
if self.opts["minion_sign_messages"]:
log.trace("Signing event to be published onto the bus.")
minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
@@ -1607,11 +1589,9 @@ class Minion(MinionBase):
)
load["sig"] = sig
- with salt.utils.event.get_event(
- "minion", opts=self.opts, listen=False
- ) as event:
- return event.fire_event(
- load, "__master_req_channel_payload", timeout=timeout
+ with salt.channel.client.ReqChannel.factory(self.opts) as channel:
+ return channel.send(
+ load, timeout=timeout, tries=self.opts["return_retry_tries"]
)
@salt.ext.tornado.gen.coroutine
@@ -1624,11 +1604,9 @@ class Minion(MinionBase):
)
load["sig"] = sig
- with salt.utils.event.get_event(
- "minion", opts=self.opts, listen=False
- ) as event:
- ret = yield event.fire_event_async(
- load, "__master_req_channel_payload", timeout=timeout
+ with salt.channel.client.AsyncReqChannel.factory(self.opts) as channel:
+ ret = yield channel.send(
+ load, timeout=timeout, tries=self.opts["return_retry_tries"]
)
raise salt.ext.tornado.gen.Return(ret)
@@ -2055,7 +2033,7 @@ class Minion(MinionBase):
minion_instance._return_pub(ret)
# Add default returners from minion config
- # Should have been converted to comma-delimited string already
+ # Should have been coverted to comma-delimited string already
if isinstance(opts.get("return"), str):
if data["ret"]:
data["ret"] = ",".join((data["ret"], opts["return"]))
@@ -2662,7 +2640,6 @@ class Minion(MinionBase):
"""
Send mine data to the master
"""
- # Consider using a long-running req channel to send mine data
with salt.channel.client.ReqChannel.factory(self.opts) as channel:
data["tok"] = self.tok
try:
@@ -2699,12 +2676,6 @@ class Minion(MinionBase):
force_refresh=data.get("force_refresh", False),
notify=data.get("notify", False),
)
- elif tag.startswith("__master_req_channel_payload"):
- yield _minion.req_channel.send(
- data,
- timeout=_minion._return_retry_timer(),
- tries=_minion.opts["return_retry_tries"],
- )
elif tag.startswith("pillar_refresh"):
yield _minion.pillar_refresh(
force_refresh=data.get("force_refresh", False),
@@ -3175,7 +3146,7 @@ class Minion(MinionBase):
if self._target_load(payload["load"]):
self._handle_decoded_payload(payload["load"])
elif self.opts["zmq_filtering"]:
- # In the filtering enabled case, we'd like to know when minion sees something it shouldn't
+ # In the filtering enabled case, we'd like to know when minion sees something it shouldnt
log.trace(
"Broadcast message received not for this minion, Load: %s",
payload["load"],
diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py
index 3a3f0c7a5f..cee100b086 100644
--- a/salt/transport/ipc.py
+++ b/salt/transport/ipc.py
@@ -208,10 +208,7 @@ class IPCServer:
log.error("Exception occurred while handling stream: %s", exc)
def handle_connection(self, connection, address):
- log.trace(
- "IPCServer: Handling connection to address: %s",
- address if address else connection,
- )
+ log.trace("IPCServer: Handling connection to address: %s", address)
try:
with salt.utils.asynchronous.current_ioloop(self.io_loop):
stream = IOStream(
diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py
index 0c645bbc3b..88596a4a20 100644
--- a/salt/utils/asynchronous.py
+++ b/salt/utils/asynchronous.py
@@ -34,7 +34,7 @@ class SyncWrapper:
This is uses as a simple wrapper, for example:
asynchronous = AsyncClass()
- # this method would regularly return a future
+ # this method would reguarly return a future
future = asynchronous.async_method()
sync = SyncWrapper(async_factory_method, (arg1, arg2), {'kwarg1': 'val'})
diff --git a/tests/pytests/functional/transport/server/test_req_channel.py b/tests/pytests/functional/transport/server/test_req_channel.py
index 4a74802a0d..555c040c1c 100644
--- a/tests/pytests/functional/transport/server/test_req_channel.py
+++ b/tests/pytests/functional/transport/server/test_req_channel.py
@@ -124,7 +124,7 @@ def req_channel_crypt(request):
@pytest.fixture
-def push_channel(req_server_channel, salt_minion, req_channel_crypt):
+def req_channel(req_server_channel, salt_minion, req_channel_crypt):
with salt.channel.client.ReqChannel.factory(
salt_minion.config, crypt=req_channel_crypt
) as _req_channel:
@@ -135,7 +135,7 @@ def push_channel(req_server_channel, salt_minion, req_channel_crypt):
_req_channel.obj._refcount = 0
-def test_basic(push_channel):
+def test_basic(req_channel):
"""
Test a variety of messages, make sure we get the expected responses
"""
@@ -145,11 +145,11 @@ def test_basic(push_channel):
{"baz": "qux", "list": [1, 2, 3]},
]
for msg in msgs:
- ret = push_channel.send(dict(msg), timeout=5, tries=1)
+ ret = req_channel.send(dict(msg), timeout=5, tries=1)
assert ret["load"] == msg
-def test_normalization(push_channel):
+def test_normalization(req_channel):
"""
Since we use msgpack, we need to test that list types are converted to lists
"""
@@ -160,21 +160,21 @@ def test_normalization(push_channel):
{"list": tuple([1, 2, 3])},
]
for msg in msgs:
- ret = push_channel.send(msg, timeout=5, tries=1)
+ ret = req_channel.send(msg, timeout=5, tries=1)
for key, value in ret["load"].items():
assert types[key] == type(value)
-def test_badload(push_channel, req_channel_crypt):
+def test_badload(req_channel, req_channel_crypt):
"""
Test a variety of bad requests, make sure that we get some sort of error
"""
msgs = ["", [], tuple()]
if req_channel_crypt == "clear":
for msg in msgs:
- ret = push_channel.send(msg, timeout=5, tries=1)
+ ret = req_channel.send(msg, timeout=5, tries=1)
assert ret == "payload and load must be a dict"
else:
for msg in msgs:
with pytest.raises(salt.exceptions.AuthenticationError):
- push_channel.send(msg, timeout=5, tries=1)
+ req_channel.send(msg, timeout=5, tries=1)
diff --git a/tests/pytests/unit/test_minion.py b/tests/pytests/unit/test_minion.py
index 1cee025a48..4508eaee95 100644
--- a/tests/pytests/unit/test_minion.py
+++ b/tests/pytests/unit/test_minion.py
@@ -55,27 +55,26 @@ def test_minion_load_grains_default():
@pytest.mark.parametrize(
- "event",
+ "req_channel",
[
(
- "fire_event",
- lambda data, tag, cb=None, timeout=60: True,
+ "salt.channel.client.AsyncReqChannel.factory",
+ lambda load, timeout, tries: salt.ext.tornado.gen.maybe_future(tries),
),
(
- "fire_event_async",
- lambda data, tag, cb=None, timeout=60: salt.ext.tornado.gen.maybe_future(
- True
- ),
+ "salt.channel.client.ReqChannel.factory",
+ lambda load, timeout, tries: tries,
),
],
)
-def test_send_req_fires_completion_event(event, minion_opts):
- event_enter = MagicMock()
- event_enter.send.side_effect = event[1]
- event = MagicMock()
- event.__enter__.return_value = event_enter
+def test_send_req_tries(req_channel, minion_opts):
+ channel_enter = MagicMock()
+ channel_enter.send.side_effect = req_channel[1]
+ channel = MagicMock()
+ channel.__enter__.return_value = channel_enter
- with patch("salt.utils.event.get_event", return_value=event):
+ with patch(req_channel[0], return_value=channel):
+ minion_opts = salt.config.DEFAULT_MINION_OPTS.copy()
minion_opts["random_startup_delay"] = 0
minion_opts["return_retry_tries"] = 30
minion_opts["grains"] = {}
@@ -85,62 +84,16 @@ def test_send_req_fires_completion_event(event, minion_opts):
load = {"load": "value"}
timeout = 60
- # XXX This is buggy because "async" in event[0] will never evaluate
- # to True and if it *did* evaluate to true the test would fail
- # because you Mock isn't a co-routine.
- if "async" in event[0]:
+ if "Async" in req_channel[0]:
rtn = minion._send_req_async(load, timeout).result()
else:
rtn = minion._send_req_sync(load, timeout)
- # get the
- for idx, call in enumerate(event.mock_calls, 1):
- if "fire_event" in call[0]:
- condition_event_tag = (
- len(call.args) > 1
- and call.args[1] == "__master_req_channel_payload"
- )
- condition_event_tag_error = "{} != {}; Call(number={}): {}".format(
- idx, call, call.args[1], "__master_req_channel_payload"
- )
- condition_timeout = (
- len(call.kwargs) == 1 and call.kwargs["timeout"] == timeout
- )
- condition_timeout_error = "{} != {}; Call(number={}): {}".format(
- idx, call, call.kwargs["timeout"], timeout
- )
-
- fire_event_called = True
- assert condition_event_tag, condition_event_tag_error
- assert condition_timeout, condition_timeout_error
-
- assert fire_event_called
- assert rtn
-
-
-async def test_send_req_async_regression_62453(minion_opts):
- event_enter = MagicMock()
- event_enter.send.side_effect = (
- lambda data, tag, cb=None, timeout=60: salt.ext.tornado.gen.maybe_future(True)
- )
- event = MagicMock()
- event.__enter__.return_value = event_enter
-
- minion_opts["random_startup_delay"] = 0
- minion_opts["return_retry_tries"] = 30
- minion_opts["grains"] = {}
- with patch("salt.loader.grains"):
- minion = salt.minion.Minion(minion_opts)
-
- load = {"load": "value"}
- timeout = 60
-
- # We are just validating no exception is raised
- rtn = await minion._send_req_async(load, timeout)
- assert rtn is False
+ assert rtn == 30
-def test_mine_send_tries():
+@patch("salt.channel.client.ReqChannel.factory")
+def test_mine_send_tries(req_channel_factory):
channel_enter = MagicMock()
channel_enter.send.side_effect = lambda load, timeout, tries: tries
channel = MagicMock()
--
2.41.0