From cb472e1f0fc18a554e0de9e3fe6bbe16557957ee Mon Sep 17 00:00:00 2001 From: Silvio Moioli Date: Wed, 20 Sep 2017 14:33:33 +0200 Subject: [PATCH 4/6] Introduce process_count_max minion configuration parameter This allows users to limit the number of processes or threads a minion will start in response to published messages, prevents resource exhaustion in case a high number of concurrent jobs is scheduled in a short time. process_count_max: add defaults and documentation process_count_max: adapt existing unit tests process_count_max: add unit test process_count_max: disable by default --- conf/minion | 6 +++++ doc/ref/configuration/minion.rst | 17 +++++++++++++ salt/config/__init__.py | 4 +++ salt/minion.py | 10 ++++++++ tests/unit/test_minion.py | 53 +++++++++++++++++++++++++++++++++++++--- 5 files changed, 87 insertions(+), 3 deletions(-) diff --git a/conf/minion b/conf/minion index 6cae043295..4a3cddcbd1 100644 --- a/conf/minion +++ b/conf/minion @@ -689,6 +689,12 @@ # for a full explanation. #multiprocessing: True +# Limit the maximum amount of processes or threads created by salt-minion. +# This is useful to avoid resource exhaustion in case the minion receives more +# publications than it is able to handle, as it limits the number of spawned +# processes or threads. -1 is the default and disables the limit. +#process_count_max: -1 + ##### Logging settings ##### ########################################## diff --git a/doc/ref/configuration/minion.rst b/doc/ref/configuration/minion.rst index 5c92b932ab..19e9026dd8 100644 --- a/doc/ref/configuration/minion.rst +++ b/doc/ref/configuration/minion.rst @@ -2352,6 +2352,23 @@ executed in a thread. multiprocessing: True +.. conf_minion:: process_count_max + +``process_count_max`` +------- + +.. versionadded:: Oxygen + +Default: ``-1`` + +Limit the maximum amount of processes or threads created by ``salt-minion``. +This is useful to avoid resource exhaustion in case the minion receives more +publications than it is able to handle, as it limits the number of spawned +processes or threads. ``-1`` is the default and disables the limit. + +.. code-block:: yaml + + process_count_max: -1 .. _minion-logging-settings: diff --git a/salt/config/__init__.py b/salt/config/__init__.py index 0f06f9ccca..668051a789 100644 --- a/salt/config/__init__.py +++ b/salt/config/__init__.py @@ -328,6 +328,9 @@ VALID_OPTS = { # Whether or not processes should be forked when needed. The alternative is to use threading. 'multiprocessing': bool, + # Maximum number of concurrently active processes at any given point in time + 'process_count_max': int, + # Whether or not the salt minion should run scheduled mine updates 'mine_enabled': bool, @@ -1193,6 +1196,7 @@ DEFAULT_MINION_OPTS = { 'auto_accept': True, 'autosign_timeout': 120, 'multiprocessing': True, + 'process_count_max': -1, 'mine_enabled': True, 'mine_return_job': False, 'mine_interval': 60, diff --git a/salt/minion.py b/salt/minion.py index 394b11a2e8..33cbb8fa0a 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1290,6 +1290,7 @@ class Minion(MinionBase): self._send_req_async(load, timeout, callback=lambda f: None) # pylint: disable=unexpected-keyword-arg return True + @tornado.gen.coroutine def _handle_decoded_payload(self, data): ''' Override this method if you wish to handle the decoded data @@ -1321,6 +1322,15 @@ class Minion(MinionBase): self.functions, self.returners, self.function_errors, self.executors = self._load_modules() self.schedule.functions = self.functions self.schedule.returners = self.returners + + process_count_max = self.opts.get('process_count_max') + if process_count_max > 0: + process_count = len(salt.utils.minion.running(self.opts)) + while process_count >= process_count_max: + log.warn("Maximum number of processes reached while executing jid {0}, waiting...".format(data['jid'])) + yield tornado.gen.sleep(10) + process_count = len(salt.utils.minion.running(self.opts)) + # We stash an instance references to allow for the socket # communication in Windows. You can't pickle functions, and thus # python needs to be able to reconstruct the reference on the other diff --git a/tests/unit/test_minion.py b/tests/unit/test_minion.py index 535dfeedfc..6c9dca13cd 100644 --- a/tests/unit/test_minion.py +++ b/tests/unit/test_minion.py @@ -18,6 +18,7 @@ from salt.utils import event from salt.exceptions import SaltSystemExit import salt.syspaths import tornado +from salt.ext.six.moves import range __opts__ = {} @@ -69,7 +70,7 @@ class MinionTestCase(TestCase): mock_jid_queue = [123] try: minion = salt.minion.Minion(mock_opts, jid_queue=copy.copy(mock_jid_queue), io_loop=tornado.ioloop.IOLoop()) - ret = minion._handle_decoded_payload(mock_data) + ret = minion._handle_decoded_payload(mock_data).result() self.assertEqual(minion.jid_queue, mock_jid_queue) self.assertIsNone(ret) finally: @@ -98,7 +99,7 @@ class MinionTestCase(TestCase): # Call the _handle_decoded_payload function and update the mock_jid_queue to include the new # mock_jid. The mock_jid should have been added to the jid_queue since the mock_jid wasn't # previously included. The minion's jid_queue attribute and the mock_jid_queue should be equal. - minion._handle_decoded_payload(mock_data) + minion._handle_decoded_payload(mock_data).result() mock_jid_queue.append(mock_jid) self.assertEqual(minion.jid_queue, mock_jid_queue) finally: @@ -126,8 +127,54 @@ class MinionTestCase(TestCase): # Call the _handle_decoded_payload function and check that the queue is smaller by one item # and contains the new jid - minion._handle_decoded_payload(mock_data) + minion._handle_decoded_payload(mock_data).result() self.assertEqual(len(minion.jid_queue), 2) self.assertEqual(minion.jid_queue, [456, 789]) finally: minion.destroy() + + def test_process_count_max(self): + ''' + Tests that the _handle_decoded_payload function does not spawn more than the configured amount of processes, + as per process_count_max. + ''' + with patch('salt.minion.Minion.ctx', MagicMock(return_value={})), \ + patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \ + patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)), \ + patch('salt.utils.minion.running', MagicMock(return_value=[])), \ + patch('tornado.gen.sleep', MagicMock(return_value=tornado.concurrent.Future())): + process_count_max = 10 + mock_opts = salt.config.DEFAULT_MINION_OPTS + mock_opts['minion_jid_queue_hwm'] = 100 + mock_opts["process_count_max"] = process_count_max + + try: + io_loop = tornado.ioloop.IOLoop() + minion = salt.minion.Minion(mock_opts, jid_queue=[], io_loop=io_loop) + + # mock gen.sleep to throw a special Exception when called, so that we detect it + class SleepCalledEception(Exception): + """Thrown when sleep is called""" + pass + tornado.gen.sleep.return_value.set_exception(SleepCalledEception()) + + # up until process_count_max: gen.sleep does not get called, processes are started normally + for i in range(process_count_max): + mock_data = {'fun': 'foo.bar', + 'jid': i} + io_loop.run_sync(lambda data=mock_data: minion._handle_decoded_payload(data)) + self.assertEqual(salt.utils.process.SignalHandlingMultiprocessingProcess.start.call_count, i + 1) + self.assertEqual(len(minion.jid_queue), i + 1) + salt.utils.minion.running.return_value += [i] + + # above process_count_max: gen.sleep does get called, JIDs are created but no new processes are started + mock_data = {'fun': 'foo.bar', + 'jid': process_count_max + 1} + + self.assertRaises(SleepCalledEception, + lambda: io_loop.run_sync(lambda: minion._handle_decoded_payload(mock_data))) + self.assertEqual(salt.utils.process.SignalHandlingMultiprocessingProcess.start.call_count, + process_count_max) + self.assertEqual(len(minion.jid_queue), process_count_max + 1) + finally: + minion.destroy() -- 2.14.2