275 lines
11 KiB
Diff
275 lines
11 KiB
Diff
|
From 0cf1a655aa9353b22ae011e492a33aa52d780f83 Mon Sep 17 00:00:00 2001
|
||
|
From: Jochen Breuer <jbreuer@suse.de>
|
||
|
Date: Tue, 10 Mar 2020 14:02:17 +0100
|
||
|
Subject: [PATCH] Changed imports to vendored Tornado
|
||
|
|
||
|
---
|
||
|
salt/cli/batch_async.py | 26 ++++++++++++------------
|
||
|
salt/master.py | 2 +-
|
||
|
salt/transport/ipc.py | 4 ++--
|
||
|
tests/unit/cli/test_batch_async.py | 32 +++++++++++++++---------------
|
||
|
4 files changed, 32 insertions(+), 32 deletions(-)
|
||
|
|
||
|
diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
|
||
|
index b8f272ed67..08eeb34f1c 100644
|
||
|
--- a/salt/cli/batch_async.py
|
||
|
+++ b/salt/cli/batch_async.py
|
||
|
@@ -6,7 +6,7 @@ Execute a job on the targeted minions by using a moving window of fixed size `ba
|
||
|
# Import python libs
|
||
|
from __future__ import absolute_import, print_function, unicode_literals
|
||
|
import gc
|
||
|
-import tornado
|
||
|
+import salt.ext.tornado
|
||
|
|
||
|
# Import salt libs
|
||
|
import salt.client
|
||
|
@@ -50,7 +50,7 @@ class BatchAsync(object):
|
||
|
}
|
||
|
'''
|
||
|
def __init__(self, parent_opts, jid_gen, clear_load):
|
||
|
- ioloop = tornado.ioloop.IOLoop.current()
|
||
|
+ ioloop = salt.ext.tornado.ioloop.IOLoop.current()
|
||
|
self.local = salt.client.get_local_client(parent_opts['conf_file'], io_loop=ioloop)
|
||
|
if 'gather_job_timeout' in clear_load['kwargs']:
|
||
|
clear_load['gather_job_timeout'] = clear_load['kwargs'].pop('gather_job_timeout')
|
||
|
@@ -152,7 +152,7 @@ class BatchAsync(object):
|
||
|
self.find_job_returned = self.find_job_returned.difference(running)
|
||
|
self.event.io_loop.spawn_callback(self.find_job, running)
|
||
|
|
||
|
- @tornado.gen.coroutine
|
||
|
+ @salt.ext.tornado.gen.coroutine
|
||
|
def find_job(self, minions):
|
||
|
if self.event:
|
||
|
not_done = minions.difference(self.done_minions).difference(self.timedout_minions)
|
||
|
@@ -170,7 +170,7 @@ class BatchAsync(object):
|
||
|
gather_job_timeout=self.opts['gather_job_timeout'],
|
||
|
jid=jid,
|
||
|
**self.eauth)
|
||
|
- yield tornado.gen.sleep(self.opts['gather_job_timeout'])
|
||
|
+ yield salt.ext.tornado.gen.sleep(self.opts['gather_job_timeout'])
|
||
|
if self.event:
|
||
|
self.event.io_loop.spawn_callback(
|
||
|
self.check_find_job,
|
||
|
@@ -180,7 +180,7 @@ class BatchAsync(object):
|
||
|
log.error("Exception occured handling batch async: {}. Aborting execution.".format(ex))
|
||
|
self.close_safe()
|
||
|
|
||
|
- @tornado.gen.coroutine
|
||
|
+ @salt.ext.tornado.gen.coroutine
|
||
|
def start(self):
|
||
|
if self.event:
|
||
|
self.__set_event_handler()
|
||
|
@@ -198,11 +198,11 @@ class BatchAsync(object):
|
||
|
**self.eauth)
|
||
|
self.targeted_minions = set(ping_return['minions'])
|
||
|
#start batching even if not all minions respond to ping
|
||
|
- yield tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout'])
|
||
|
+ yield salt.ext.tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout'])
|
||
|
if self.event:
|
||
|
self.event.io_loop.spawn_callback(self.start_batch)
|
||
|
|
||
|
- @tornado.gen.coroutine
|
||
|
+ @salt.ext.tornado.gen.coroutine
|
||
|
def start_batch(self):
|
||
|
if not self.initialized:
|
||
|
self.batch_size = get_bnum(self.opts, self.minions, True)
|
||
|
@@ -216,7 +216,7 @@ class BatchAsync(object):
|
||
|
if self.event:
|
||
|
self.event.io_loop.spawn_callback(self.run_next)
|
||
|
|
||
|
- @tornado.gen.coroutine
|
||
|
+ @salt.ext.tornado.gen.coroutine
|
||
|
def end_batch(self):
|
||
|
left = self.minions.symmetric_difference(self.done_minions.union(self.timedout_minions))
|
||
|
if not left and not self.ended:
|
||
|
@@ -232,7 +232,7 @@ class BatchAsync(object):
|
||
|
|
||
|
# release to the IOLoop to allow the event to be published
|
||
|
# before closing batch async execution
|
||
|
- yield tornado.gen.sleep(1)
|
||
|
+ yield salt.ext.tornado.gen.sleep(1)
|
||
|
self.close_safe()
|
||
|
|
||
|
def close_safe(self):
|
||
|
@@ -245,16 +245,16 @@ class BatchAsync(object):
|
||
|
del self
|
||
|
gc.collect()
|
||
|
|
||
|
- @tornado.gen.coroutine
|
||
|
+ @salt.ext.tornado.gen.coroutine
|
||
|
def schedule_next(self):
|
||
|
if not self.scheduled:
|
||
|
self.scheduled = True
|
||
|
# call later so that we maybe gather more returns
|
||
|
- yield tornado.gen.sleep(self.batch_delay)
|
||
|
+ yield salt.ext.tornado.gen.sleep(self.batch_delay)
|
||
|
if self.event:
|
||
|
self.event.io_loop.spawn_callback(self.run_next)
|
||
|
|
||
|
- @tornado.gen.coroutine
|
||
|
+ @salt.ext.tornado.gen.coroutine
|
||
|
def run_next(self):
|
||
|
self.scheduled = False
|
||
|
next_batch = self._get_next()
|
||
|
@@ -272,7 +272,7 @@ class BatchAsync(object):
|
||
|
jid=self.batch_jid,
|
||
|
metadata=self.metadata)
|
||
|
|
||
|
- yield tornado.gen.sleep(self.opts['timeout'])
|
||
|
+ yield salt.ext.tornado.gen.sleep(self.opts['timeout'])
|
||
|
|
||
|
# The batch can be done already at this point, which means no self.event
|
||
|
if self.event:
|
||
|
diff --git a/salt/master.py b/salt/master.py
|
||
|
index 3abf7ae60b..3a9d12999d 100644
|
||
|
--- a/salt/master.py
|
||
|
+++ b/salt/master.py
|
||
|
@@ -2049,7 +2049,7 @@ class ClearFuncs(object):
|
||
|
functools.partial(self._prep_jid, clear_load, {}),
|
||
|
batch_load
|
||
|
)
|
||
|
- ioloop = tornado.ioloop.IOLoop.current()
|
||
|
+ ioloop = salt.ext.tornado.ioloop.IOLoop.current()
|
||
|
ioloop.add_callback(batch.start)
|
||
|
|
||
|
return {
|
||
|
diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py
|
||
|
index d2b295a633..33ee3d4182 100644
|
||
|
--- a/salt/transport/ipc.py
|
||
|
+++ b/salt/transport/ipc.py
|
||
|
@@ -697,7 +697,7 @@ class IPCMessageSubscriber(IPCClient):
|
||
|
for callback in self.callbacks:
|
||
|
self.io_loop.spawn_callback(callback, raw)
|
||
|
|
||
|
- @tornado.gen.coroutine
|
||
|
+ @salt.ext.tornado.gen.coroutine
|
||
|
def read_async(self):
|
||
|
'''
|
||
|
Asynchronously read messages and invoke a callback when they are ready.
|
||
|
@@ -712,7 +712,7 @@ class IPCMessageSubscriber(IPCClient):
|
||
|
yield salt.ext.tornado.gen.sleep(1)
|
||
|
except Exception as exc: # pylint: disable=broad-except
|
||
|
log.error('Exception occurred while Subscriber connecting: %s', exc)
|
||
|
- yield tornado.gen.sleep(1)
|
||
|
+ yield salt.ext.tornado.gen.sleep(1)
|
||
|
yield self._read(None, self.__run_callbacks)
|
||
|
|
||
|
def close(self):
|
||
|
diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py
|
||
|
index e1ce60859b..635dc689a8 100644
|
||
|
--- a/tests/unit/cli/test_batch_async.py
|
||
|
+++ b/tests/unit/cli/test_batch_async.py
|
||
|
@@ -5,8 +5,8 @@ from __future__ import absolute_import
|
||
|
# Import Salt Libs
|
||
|
from salt.cli.batch_async import BatchAsync
|
||
|
|
||
|
-import tornado
|
||
|
-from tornado.testing import AsyncTestCase
|
||
|
+import salt.ext.tornado
|
||
|
+from salt.ext.tornado.testing import AsyncTestCase
|
||
|
from tests.support.unit import skipIf, TestCase
|
||
|
from tests.support.mock import MagicMock, patch, NO_MOCK, NO_MOCK_REASON
|
||
|
|
||
|
@@ -59,10 +59,10 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase):
|
||
|
self.batch.start_batch()
|
||
|
self.assertEqual(self.batch.batch_size, 2)
|
||
|
|
||
|
- @tornado.testing.gen_test
|
||
|
+ @salt.ext.tornado.testing.gen_test
|
||
|
def test_batch_start_on_batch_presence_ping_timeout(self):
|
||
|
self.batch.event = MagicMock()
|
||
|
- future = tornado.gen.Future()
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
future.set_result({'minions': ['foo', 'bar']})
|
||
|
self.batch.local.run_job_async.return_value = future
|
||
|
ret = self.batch.start()
|
||
|
@@ -78,10 +78,10 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase):
|
||
|
# assert targeted_minions == all minions matched by tgt
|
||
|
self.assertEqual(self.batch.targeted_minions, set(['foo', 'bar']))
|
||
|
|
||
|
- @tornado.testing.gen_test
|
||
|
+ @salt.ext.tornado.testing.gen_test
|
||
|
def test_batch_start_on_gather_job_timeout(self):
|
||
|
self.batch.event = MagicMock()
|
||
|
- future = tornado.gen.Future()
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
future.set_result({'minions': ['foo', 'bar']})
|
||
|
self.batch.local.run_job_async.return_value = future
|
||
|
self.batch.batch_presence_ping_timeout = None
|
||
|
@@ -109,7 +109,7 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase):
|
||
|
)
|
||
|
)
|
||
|
|
||
|
- @tornado.testing.gen_test
|
||
|
+ @salt.ext.tornado.testing.gen_test
|
||
|
def test_start_batch_calls_next(self):
|
||
|
self.batch.run_next = MagicMock(return_value=MagicMock())
|
||
|
self.batch.event = MagicMock()
|
||
|
@@ -165,14 +165,14 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase):
|
||
|
self.assertEqual(
|
||
|
len(event.remove_event_handler.mock_calls), 1)
|
||
|
|
||
|
- @tornado.testing.gen_test
|
||
|
+ @salt.ext.tornado.testing.gen_test
|
||
|
def test_batch_next(self):
|
||
|
self.batch.event = MagicMock()
|
||
|
self.batch.opts['fun'] = 'my.fun'
|
||
|
self.batch.opts['arg'] = []
|
||
|
self.batch._get_next = MagicMock(return_value={'foo', 'bar'})
|
||
|
self.batch.batch_size = 2
|
||
|
- future = tornado.gen.Future()
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
future.set_result({'minions': ['foo', 'bar']})
|
||
|
self.batch.local.run_job_async.return_value = future
|
||
|
self.batch.run_next()
|
||
|
@@ -284,38 +284,38 @@ class AsyncBatchTestCase(AsyncTestCase, TestCase):
|
||
|
self.batch._BatchAsync__event_handler(MagicMock())
|
||
|
self.assertEqual(self.batch.find_job_returned, {'foo'})
|
||
|
|
||
|
- @tornado.testing.gen_test
|
||
|
+ @salt.ext.tornado.testing.gen_test
|
||
|
def test_batch_run_next_end_batch_when_no_next(self):
|
||
|
self.batch.end_batch = MagicMock()
|
||
|
self.batch._get_next = MagicMock(return_value={})
|
||
|
self.batch.run_next()
|
||
|
self.assertEqual(len(self.batch.end_batch.mock_calls), 1)
|
||
|
|
||
|
- @tornado.testing.gen_test
|
||
|
+ @salt.ext.tornado.testing.gen_test
|
||
|
def test_batch_find_job(self):
|
||
|
self.batch.event = MagicMock()
|
||
|
- future = tornado.gen.Future()
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
future.set_result({})
|
||
|
self.batch.local.run_job_async.return_value = future
|
||
|
self.batch.minions = set(['foo', 'bar'])
|
||
|
self.batch.jid_gen = MagicMock(return_value="1234")
|
||
|
- tornado.gen.sleep = MagicMock(return_value=future)
|
||
|
+ salt.ext.tornado.gen.sleep = MagicMock(return_value=future)
|
||
|
self.batch.find_job({'foo', 'bar'})
|
||
|
self.assertEqual(
|
||
|
self.batch.event.io_loop.spawn_callback.call_args[0],
|
||
|
(self.batch.check_find_job, {'foo', 'bar'}, "1234")
|
||
|
)
|
||
|
|
||
|
- @tornado.testing.gen_test
|
||
|
+ @salt.ext.tornado.testing.gen_test
|
||
|
def test_batch_find_job_with_done_minions(self):
|
||
|
self.batch.done_minions = {'bar'}
|
||
|
self.batch.event = MagicMock()
|
||
|
- future = tornado.gen.Future()
|
||
|
+ future = salt.ext.tornado.gen.Future()
|
||
|
future.set_result({})
|
||
|
self.batch.local.run_job_async.return_value = future
|
||
|
self.batch.minions = set(['foo', 'bar'])
|
||
|
self.batch.jid_gen = MagicMock(return_value="1234")
|
||
|
- tornado.gen.sleep = MagicMock(return_value=future)
|
||
|
+ salt.ext.tornado.gen.sleep = MagicMock(return_value=future)
|
||
|
self.batch.find_job({'foo', 'bar'})
|
||
|
self.assertEqual(
|
||
|
self.batch.event.io_loop.spawn_callback.call_args[0],
|
||
|
--
|
||
|
2.23.0
|
||
|
|
||
|
|