From 2c4ea84af38aca525e516e0391ea831d1fe6f611 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Mon, 23 Jul 2018 10:51:41 +0200 Subject: [PATCH] Add support for Python 3.7 Rename module to full wording Fix imports Fix docstring typo Fix CLI config Fix comments Fix docstrings Rename async function to asynchronous Change internal function signatures to avoid reserved word Remove internal variables/properties with the reserved words Fix local opts from CLI Fix log error/info/warning and exception messages Cleanup docstrings at module level Fix function signatures in Cassandra module Lintfix: PEP8 requires two empty lines Deprecate 'async' parameter in Mandrill API Revert api call: it is about "functionname_async" suffix. Add 'async' backward compatibility Update docstring Use kwargs instead of directly named parameters Support original API Fix nag-message Keep runner API unchanged fix unicode literals Remove async keyword, moving it into the kwargs. Fix configuration setting --- salt/client/__init__.py | 2 +- salt/client/api.py | 6 ++--- salt/client/mixins.py | 4 ++-- salt/cloud/clouds/msazure.py | 2 +- salt/cloud/clouds/profitbricks.py | 2 +- salt/cloud/clouds/xen.py | 2 +- salt/daemons/masterapi.py | 6 ++--- salt/engines/slack.py | 4 ++-- salt/master.py | 6 ++--- salt/minion.py | 4 ++-- salt/modules/cassandra_cql.py | 22 ++++++++++--------- salt/modules/mandrill.py | 21 ++++++++++++------ salt/modules/saltutil.py | 6 +++-- salt/netapi/rest_cherrypy/app.py | 4 ++-- salt/netapi/rest_cherrypy/event_processor.py | 2 +- salt/netapi/rest_tornado/event_processor.py | 2 +- salt/netapi/rest_tornado/saltnado.py | 8 +++---- .../rest_tornado/saltnado_websockets.py | 2 +- salt/returners/cassandra_cql_return.py | 8 +++---- salt/runner.py | 10 ++++----- salt/thorium/runner.py | 6 ++--- salt/thorium/wheel.py | 4 ++-- salt/transport/client.py | 2 +- salt/transport/ipc.py | 10 ++++----- salt/transport/server.py | 2 +- salt/transport/tcp.py | 16 +++++++------- salt/utils/{async.py => 
asynchronous.py} | 22 +++++++++---------- salt/utils/event.py | 18 +++++++-------- salt/utils/process.py | 4 ++-- salt/utils/thin.py | 2 +- salt/wheel/__init__.py | 4 ++-- .../files/engines/runtests_engine.py | 4 ++-- .../netapi/rest_tornado/test_app.py | 2 +- tests/support/case.py | 10 +++++---- tests/unit/utils/test_async.py | 20 ++++++++--------- 35 files changed, 131 insertions(+), 118 deletions(-) rename salt/utils/{async.py => asynchronous.py} (81%) diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 9bf8e32491..dcbc1473e1 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -284,7 +284,7 @@ class LocalClient(object): 'No command was sent, no jid was assigned.') return {} - # don't install event subscription listeners when the request is async + # don't install event subscription listeners when the request is asynchronous # and doesn't care. this is important as it will create event leaks otherwise if not listen: return pub_data diff --git a/salt/client/api.py b/salt/client/api.py index ac6f6de24a..b2aab460fa 100644 --- a/salt/client/api.py +++ b/salt/client/api.py @@ -93,7 +93,7 @@ class APIClient(object): The cmd dict items are as follows: - mode: either 'sync' or 'async'. Defaults to 'async' if missing + mode: either 'sync' or 'async'. Defaults to 'async' if missing fun: required. If the function is to be run on the master using either a wheel or runner client then the fun: includes either 'wheel.' or 'runner.' as a prefix and has three parts separated by '.'. 
@@ -120,7 +120,7 @@ class APIClient(object): ''' cmd = dict(cmd) # make copy client = 'minion' # default to local minion client - mode = cmd.get('mode', 'async') # default to 'async' + mode = cmd.get('mode', 'async') # check for wheel or runner prefix to fun name to use wheel or runner client funparts = cmd.get('fun', '').split('.') @@ -162,7 +162,7 @@ class APIClient(object): ''' return self.runnerClient.master_call(**kwargs) - runner_sync = runner_async # always runner async, so works in either mode + runner_sync = runner_async # always runner asynchronous, so works in either mode def wheel_sync(self, **kwargs): ''' diff --git a/salt/client/mixins.py b/salt/client/mixins.py index 29b6077661..4182fa5b81 100644 --- a/salt/client/mixins.py +++ b/salt/client/mixins.py @@ -458,7 +458,7 @@ class SyncClientMixin(object): class AsyncClientMixin(object): ''' - A mixin for *Client interfaces to enable easy async function execution + A mixin for *Client interfaces to enable easy asynchronous function execution ''' client = None tag_prefix = None @@ -510,7 +510,7 @@ class AsyncClientMixin(object): tag = salt.utils.event.tagify(jid, prefix=self.tag_prefix) return {'tag': tag, 'jid': jid} - def async(self, fun, low, user='UNKNOWN', pub=None): + def asynchronous(self, fun, low, user='UNKNOWN', pub=None): ''' Execute the function in a multiprocess and return the event tag to use to watch for the return diff --git a/salt/cloud/clouds/msazure.py b/salt/cloud/clouds/msazure.py index aa5cd14255..4a95c3af96 100644 --- a/salt/cloud/clouds/msazure.py +++ b/salt/cloud/clouds/msazure.py @@ -888,7 +888,7 @@ def _wait_for_async(conn, request_id): while result.status == 'InProgress': count = count + 1 if count > 120: - raise ValueError('Timed out waiting for async operation to complete.') + raise ValueError('Timed out waiting for asynchronous operation to complete.') time.sleep(5) result = conn.get_operation_status(request_id) diff --git a/salt/cloud/clouds/profitbricks.py 
b/salt/cloud/clouds/profitbricks.py index 1ce0a162f0..8d13bf7b70 100644 --- a/salt/cloud/clouds/profitbricks.py +++ b/salt/cloud/clouds/profitbricks.py @@ -1098,7 +1098,7 @@ def _wait_for_completion(conn, promise, wait_timeout, msg): ) raise Exception( - 'Timed out waiting for async operation {0} "{1}" to complete.'.format( + 'Timed out waiting for asynchronous operation {0} "{1}" to complete.'.format( msg, six.text_type(promise['requestId']) ) ) diff --git a/salt/cloud/clouds/xen.py b/salt/cloud/clouds/xen.py index 0b79d4dfb9..6f23b813a7 100644 --- a/salt/cloud/clouds/xen.py +++ b/salt/cloud/clouds/xen.py @@ -719,7 +719,7 @@ def _wait_for_ip(name, session): def _run_async_task(task=None, session=None): ''' - Run XenAPI task in async mode to prevent timeouts + Run XenAPI task in asynchronous mode to prevent timeouts ''' if task is None or session is None: return None diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 84537fab3b..62dd0cd1ea 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -1068,9 +1068,9 @@ class LocalFuncs(object): try: fun = load.pop('fun') runner_client = salt.runner.RunnerClient(self.opts) - return runner_client.async(fun, - load.get('kwarg', {}), - username) + return runner_client.asynchronous(fun, + load.get('kwarg', {}), + username) except Exception as exc: log.exception('Exception occurred while introspecting %s') return {'error': {'name': exc.__class__.__name__, diff --git a/salt/engines/slack.py b/salt/engines/slack.py index e664bbee03..c35435e42e 100644 --- a/salt/engines/slack.py +++ b/salt/engines/slack.py @@ -740,7 +740,7 @@ class SlackClient(object): :param interval: time to wait between ending a loop and beginning the next ''' - log.debug('Going to run a command async') + log.debug('Going to run a command asynchronous') runner_functions = sorted(salt.runner.Runner(__opts__).functions) # Parse args and kwargs cmd = msg['cmdline'][0] @@ -762,7 +762,7 @@ class SlackClient(object): 
log.debug('Command %s will run via runner_functions', cmd) # pylint is tripping # pylint: disable=missing-whitespace-after-comma - job_id_dict = runner.async(cmd, {'args': args, 'kwargs': kwargs}) + job_id_dict = runner.asynchronous(cmd, {'args': args, 'kwargs': kwargs}) job_id = job_id_dict['jid'] # Default to trying to run as a client module. diff --git a/salt/master.py b/salt/master.py index e400054d72..86b639dd5b 100644 --- a/salt/master.py +++ b/salt/master.py @@ -1878,9 +1878,9 @@ class ClearFuncs(object): try: fun = clear_load.pop('fun') runner_client = salt.runner.RunnerClient(self.opts) - return runner_client.async(fun, - clear_load.get('kwarg', {}), - username) + return runner_client.asynchronous(fun, + clear_load.get('kwarg', {}), + username) except Exception as exc: log.error('Exception occurred while introspecting %s: %s', fun, exc) return {'error': {'name': exc.__class__.__name__, diff --git a/salt/minion.py b/salt/minion.py index 0a6771dccd..17e11c0ebe 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -926,7 +926,7 @@ class MinionManager(MinionBase): install_zmq() self.io_loop = ZMQDefaultLoop.current() self.process_manager = ProcessManager(name='MultiMinionProcessManager') - self.io_loop.spawn_callback(self.process_manager.run, async=True) + self.io_loop.spawn_callback(self.process_manager.run, **{'async': True}) # Tornado backward compat def __del__(self): self.destroy() @@ -1123,7 +1123,7 @@ class Minion(MinionBase): time.sleep(sleep_time) self.process_manager = ProcessManager(name='MinionProcessManager') - self.io_loop.spawn_callback(self.process_manager.run, async=True) + self.io_loop.spawn_callback(self.process_manager.run, **{'async': True}) # We don't have the proxy setup yet, so we can't start engines # Engines need to be able to access __proxy__ if not salt.utils.platform.is_proxy(): diff --git a/salt/modules/cassandra_cql.py b/salt/modules/cassandra_cql.py index 82b211bddf..30db93dccc 100644 --- a/salt/modules/cassandra_cql.py +++ 
b/salt/modules/cassandra_cql.py @@ -93,6 +93,7 @@ from salt.exceptions import CommandExecutionError # Import 3rd-party libs from salt.ext import six from salt.ext.six.moves import range +import salt.utils.versions SSL_VERSION = 'ssl_version' @@ -128,7 +129,7 @@ def __virtual__(): def _async_log_errors(errors): - log.error('Cassandra_cql async call returned: %s', errors) + log.error('Cassandra_cql asynchronous call returned: %s', errors) def _load_properties(property_name, config_option, set_default=False, default=None): @@ -361,9 +362,8 @@ def cql_query(query, contact_points=None, port=None, cql_user=None, cql_pass=Non return ret -def cql_query_with_prepare(query, statement_name, statement_arguments, async=False, - callback_errors=None, - contact_points=None, port=None, cql_user=None, cql_pass=None): +def cql_query_with_prepare(query, statement_name, statement_arguments, callback_errors=None, contact_points=None, + port=None, cql_user=None, cql_pass=None, **kwargs): ''' Run a query on a Cassandra cluster and return a dictionary. @@ -377,8 +377,8 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal :type statement_name: str :param statement_arguments: Bind parameters for the SQL statement :type statement_arguments: list[str] - :param async: Run this query in asynchronous mode - :type async: bool + :param async: Run this query in asynchronous mode + :type async: bool :param callback_errors: Function to call after query runs if there is an error :type callback_errors: Function callable :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. 
@@ -401,12 +401,14 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal # Insert data asynchronously salt this-node cassandra_cql.cql_query_with_prepare "name_insert" "INSERT INTO USERS (first_name, last_name) VALUES (?, ?)" \ - statement_arguments=['John','Doe'], async=True + statement_arguments=['John','Doe'], asynchronous=True # Select data, should not be asynchronous because there is not currently a facility to return data from a future salt this-node cassandra_cql.cql_query_with_prepare "name_select" "SELECT * FROM USERS WHERE first_name=?" \ statement_arguments=['John'] ''' + # "async" is a reserved word in Python 3.7: accept "asynchronous" and fall back to the legacy "async" keyword + asynchronous = kwargs.get('asynchronous', kwargs.get('async', False)) try: cluster, session = _connect(contact_points=contact_points, port=port, cql_user=cql_user, cql_pass=cql_pass) @@ -431,7 +433,7 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal ret = [] try: - if async: + if asynchronous: future_results = session.execute_async(bound_statement.bind(statement_arguments)) # future_results.add_callbacks(_async_log_errors) else: @@ -441,7 +443,7 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal msg = "ERROR: Cassandra query failed: {0} reason: {1}".format(query, e) raise CommandExecutionError(msg) - if not async and results: + if not asynchronous and results: for result in results: values = {} for key, value in six.iteritems(result): @@ -456,7 +458,7 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal # If this was a synchronous call, then we either have an empty list # because there was no return, or we have a return - # If this was an async call we only return the empty list + # If this was an asynchronous call we only return the empty list return ret diff --git a/salt/modules/mandrill.py b/salt/modules/mandrill.py index 248939d09c..7044060154 100644 --- a/salt/modules/mandrill.py +++ b/salt/modules/mandrill.py @@ 
-24,6 +24,7 @@ import logging # Import Salt libs import salt.utils.json +import salt.utils.versions # import third party try: @@ -137,12 +138,13 @@ def _http_request(url, def send(message, - async=False, + asynchronous=False, ip_pool=None, send_at=None, api_url=None, api_version=None, - api_key=None): + api_key=None, + **kwargs): ''' Send out the email using the details from the ``message`` argument. @@ -151,14 +153,14 @@ def send(message, sent as dictionary with at fields as specified in the Mandrill API documentation. - async: ``False`` + asynchronous: ``False`` Enable a background sending mode that is optimized for bulk sending. - In async mode, messages/send will immediately return a status of - "queued" for every recipient. To handle rejections when sending in async + In asynchronous mode, messages/send will immediately return a status of + "queued" for every recipient. To handle rejections when sending in asynchronous mode, set up a webhook for the 'reject' event. Defaults to false for messages with no more than 10 recipients; messages with more than 10 recipients are always sent asynchronously, regardless of the value of - async. + asynchronous. 
ip_pool The name of the dedicated ip pool that should be used to send the @@ -229,6 +231,11 @@ def send(message, result: True ''' + if 'async' in kwargs: # Remove this in Sodium + salt.utils.versions.warn_until('Sodium', 'Parameter "async" is renamed to "asynchronous" ' + 'and will be removed in version {version}.') + asynchronous = bool(kwargs['async']) + params = _get_api_params(api_url=api_url, api_version=api_version, api_key=api_key) @@ -238,7 +245,7 @@ def send(message, data = { 'key': params['api_key'], 'message': message, - 'async': async, + 'async': asynchronous, 'ip_pool': ip_pool, 'send_at': send_at } diff --git a/salt/modules/saltutil.py b/salt/modules/saltutil.py index 9cb27858d1..2c152e3ff1 100644 --- a/salt/modules/saltutil.py +++ b/salt/modules/saltutil.py @@ -947,10 +947,11 @@ def refresh_pillar(): ret = False # Effectively a no-op, since we can't really return without an event system return ret + pillar_refresh = salt.utils.functools.alias_function(refresh_pillar, 'pillar_refresh') -def refresh_modules(async=True): +def refresh_modules(**kwargs): ''' Signal the minion to refresh the module and grain data @@ -964,8 +965,9 @@ def refresh_modules(async=True): salt '*' saltutil.refresh_modules ''' + asynchronous = bool(kwargs.get('async', True)) try: - if async: + if asynchronous: # If we're going to block, first setup a listener ret = __salt__['event.fire']({}, 'module_refresh') else: diff --git a/salt/netapi/rest_cherrypy/app.py b/salt/netapi/rest_cherrypy/app.py index 077ccce0be..78ea3c3fef 100644 --- a/salt/netapi/rest_cherrypy/app.py +++ b/salt/netapi/rest_cherrypy/app.py @@ -529,7 +529,7 @@ described above, the most effective and most scalable way to use both Salt and salt-api is to run commands asynchronously using the ``local_async``, ``runner_async``, and ``wheel_async`` clients. 
-Running async jobs results in being able to process 3x more commands per second +Running asynchronous jobs results in being able to process 3x more commands per second for ``LocalClient`` and 17x more commands per second for ``RunnerClient``, in addition to much less network traffic and memory requirements. Job returns can be fetched from Salt's job cache via the ``/jobs/`` endpoint, or they can @@ -2534,7 +2534,7 @@ class WebsocketEndpoint(object): parent_pipe, child_pipe = Pipe() handler.pipe = parent_pipe handler.opts = self.opts - # Process to handle async push to a client. + # Process to handle asynchronous push to a client. # Each GET request causes a process to be kicked off. proc = Process(target=event_stream, args=(handler, child_pipe)) proc.start() diff --git a/salt/netapi/rest_cherrypy/event_processor.py b/salt/netapi/rest_cherrypy/event_processor.py index e409a00180..f0cf6d361a 100644 --- a/salt/netapi/rest_cherrypy/event_processor.py +++ b/salt/netapi/rest_cherrypy/event_processor.py @@ -180,7 +180,7 @@ class SaltInfo(object): 'expr_type': 'list', 'mode': 'client', 'client': 'local', - 'async': 'local_async', + 'asynchronous': 'local_async', 'token': token, }) diff --git a/salt/netapi/rest_tornado/event_processor.py b/salt/netapi/rest_tornado/event_processor.py index d8c338836e..70a379e2c5 100644 --- a/salt/netapi/rest_tornado/event_processor.py +++ b/salt/netapi/rest_tornado/event_processor.py @@ -194,7 +194,7 @@ class SaltInfo(object): 'expr_type': 'list', 'mode': 'client', 'client': 'local', - 'async': 'local_async', + 'asynchronous': 'local_async', 'token': token, }) diff --git a/salt/netapi/rest_tornado/saltnado.py b/salt/netapi/rest_tornado/saltnado.py index 2da44960c8..7942033c59 100644 --- a/salt/netapi/rest_tornado/saltnado.py +++ b/salt/netapi/rest_tornado/saltnado.py @@ -244,7 +244,7 @@ def _json_dumps(obj, **kwargs): # # master side # - "runner" (done) -# - "wheel" (need async api...) +# - "wheel" (need asynchronous api...) 
AUTH_TOKEN_HEADER = 'X-Auth-Token' @@ -273,7 +273,7 @@ class Any(Future): class EventListener(object): ''' Class responsible for listening to the salt master event bus and updating - futures. This is the core of what makes this async, this allows us to do + futures. This is the core of what makes this asynchronous, this allows us to do non-blocking work in the main processes and "wait" for an event to happen ''' @@ -336,7 +336,7 @@ class EventListener(object): timeout=None ): ''' - Get an event (async of course) return a future that will get it later + Get an event (asynchronous of course) return a future that will get it later ''' # if the request finished, no reason to allow event fetching, since we # can't send back to the client @@ -653,7 +653,7 @@ class SaltAuthHandler(BaseSaltAPIHandler): # pylint: disable=W0223 self.write(self.serialize(ret)) - # TODO: make async? Underlying library isn't... and we ARE making disk calls :( + # TODO: make asynchronous? Underlying library isn't... and we ARE making disk calls :( def post(self): ''' :ref:`Authenticate ` against Salt's eauth system diff --git a/salt/netapi/rest_tornado/saltnado_websockets.py b/salt/netapi/rest_tornado/saltnado_websockets.py index 89cdfd039a..cf6d51852f 100644 --- a/salt/netapi/rest_tornado/saltnado_websockets.py +++ b/salt/netapi/rest_tornado/saltnado_websockets.py @@ -411,7 +411,7 @@ class FormattedEventsHandler(AllEventsHandler): # pylint: disable=W0223,W0232 'tgt': '*', 'token': self.token, 'mode': 'client', - 'async': 'local_async', + 'asynchronous': 'local_async', 'client': 'local' }) while True: diff --git a/salt/returners/cassandra_cql_return.py b/salt/returners/cassandra_cql_return.py index 8e92e32147..0ec8c2db27 100644 --- a/salt/returners/cassandra_cql_return.py +++ b/salt/returners/cassandra_cql_return.py @@ -204,7 +204,7 @@ def returner(ret): __salt__['cassandra_cql.cql_query_with_prepare'](query, 'returner_return', tuple(statement_arguments), - async=True) + asynchronous=True) 
except CommandExecutionError: log.critical('Could not insert into salt_returns with Cassandra returner.') raise @@ -228,7 +228,7 @@ def returner(ret): __salt__['cassandra_cql.cql_query_with_prepare'](query, 'returner_minion', tuple(statement_arguments), - async=True) + asynchronous=True) except CommandExecutionError: log.critical('Could not store minion ID with Cassandra returner.') raise @@ -270,7 +270,7 @@ def event_return(events): try: __salt__['cassandra_cql.cql_query_with_prepare'](query, 'salt_events', statement_arguments, - async=True) + asynchronous=True) except CommandExecutionError: log.critical('Could not store events with Cassandra returner.') raise @@ -300,7 +300,7 @@ def save_load(jid, load, minions=None): try: __salt__['cassandra_cql.cql_query_with_prepare'](query, 'save_load', statement_arguments, - async=True) + asynchronous=True) except CommandExecutionError: log.critical('Could not save load in jids table.') raise diff --git a/salt/runner.py b/salt/runner.py index 188064665b..ec389a45b0 100644 --- a/salt/runner.py +++ b/salt/runner.py @@ -240,13 +240,13 @@ class Runner(RunnerClient): if self.opts.get('eauth'): async_pub = self.cmd_async(low) else: - async_pub = self.async(self.opts['fun'], - low, - user=user, - pub=async_pub) + async_pub = self.asynchronous(self.opts['fun'], + low, + user=user, + pub=async_pub) # by default: info will be not enougth to be printed out ! log.warning( - 'Running in async mode. Results of this execution may ' + 'Running in asynchronous mode. Results of this execution may ' 'be collected by attaching to the master event bus or ' 'by examing the master job cache, if configured. 
' 'This execution is running under tag %s', async_pub['tag'] diff --git a/salt/thorium/runner.py b/salt/thorium/runner.py index d6235d40e7..9545eac35c 100644 --- a/salt/thorium/runner.py +++ b/salt/thorium/runner.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- ''' -React by calling async runners +React by calling asynchronous runners ''' # Import python libs from __future__ import absolute_import, print_function, unicode_literals @@ -14,7 +14,7 @@ def cmd( arg=(), **kwargs): ''' - Execute a runner async: + Execute a runner asynchronous: USAGE: @@ -42,7 +42,7 @@ def cmd( func = name local_opts = {} local_opts.update(__opts__) - local_opts['async'] = True # ensure this will be run async + local_opts['async'] = True # ensure this will be run asynchronous local_opts.update({ 'fun': func, 'arg': arg, diff --git a/salt/thorium/wheel.py b/salt/thorium/wheel.py index 7c98eff4bd..e3c4bf1701 100644 --- a/salt/thorium/wheel.py +++ b/salt/thorium/wheel.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- ''' -React by calling async runners +React by calling asynchronous runners ''' # Import python libs from __future__ import absolute_import, print_function, unicode_literals @@ -14,7 +14,7 @@ def cmd( arg=(), **kwargs): ''' - Execute a runner async: + Execute a runner asynchronous: USAGE: diff --git a/salt/transport/client.py b/salt/transport/client.py index 86c4962f94..ca83ac9376 100644 --- a/salt/transport/client.py +++ b/salt/transport/client.py @@ -10,7 +10,7 @@ from __future__ import absolute_import, print_function, unicode_literals import logging # Import Salt Libs -from salt.utils.async import SyncWrapper +from salt.utils.asynchronous import SyncWrapper log = logging.getLogger(__name__) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 108e62da1f..a6e46e8eed 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -130,7 +130,7 @@ class IPCServer(object): else: self.sock = tornado.netutil.bind_unix_socket(self.socket_path) - with 
salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): tornado.netutil.add_accept_handler( self.sock, self.handle_connection, @@ -196,7 +196,7 @@ class IPCServer(object): log.trace('IPCServer: Handling connection ' 'to address: %s', address) try: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): stream = IOStream( connection, ) @@ -329,7 +329,7 @@ class IPCClient(object): break if self.stream is None: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): self.stream = IOStream( socket.socket(sock_type, socket.SOCK_STREAM), ) @@ -510,7 +510,7 @@ class IPCMessagePublisher(object): else: self.sock = tornado.netutil.bind_unix_socket(self.socket_path) - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): tornado.netutil.add_accept_handler( self.sock, self.handle_connection, @@ -549,7 +549,7 @@ class IPCMessagePublisher(object): if self.opts['ipc_write_buffer'] > 0: kwargs['max_write_buffer_size'] = self.opts['ipc_write_buffer'] log.trace('Setting IPC connection write buffer: %s', (self.opts['ipc_write_buffer'])) - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): stream = IOStream( connection, **kwargs diff --git a/salt/transport/server.py b/salt/transport/server.py index 46c14bdb39..1d67dc98af 100644 --- a/salt/transport/server.py +++ b/salt/transport/server.py @@ -55,7 +55,7 @@ class ReqServerChannel(object): ''' Do anything you need post-fork. This should handle all incoming payloads and call payload_handler. 
You will also be passed io_loop, for all of your - async needs + asynchronous needs ''' pass diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 4b9f14768a..d9c15773a9 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -19,7 +19,7 @@ import traceback # Import Salt Libs import salt.crypt -import salt.utils.async +import salt.utils.asynchronous import salt.utils.event import salt.utils.files import salt.utils.platform @@ -476,7 +476,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran 'tok': self.tok, 'data': data, 'tag': tag} - req_channel = salt.utils.async.SyncWrapper( + req_channel = salt.utils.asynchronous.SyncWrapper( AsyncTCPReqChannel, (self.opts,) ) try: @@ -603,7 +603,7 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra self.payload_handler = payload_handler self.io_loop = io_loop self.serial = salt.payload.Serial(self.opts) - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): if USE_LOAD_BALANCER: self.req_server = LoadBalancerWorker(self.socket_queue, self.handle_message, @@ -869,7 +869,7 @@ class SaltMessageClient(object): self.io_loop = io_loop or tornado.ioloop.IOLoop.current() - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver) self._mid = 1 @@ -895,7 +895,7 @@ class SaltMessageClient(object): if hasattr(self, '_stream') and not self._stream.closed(): # If _stream_return() hasn't completed, it means the IO # Loop is stopped (such as when using - # 'salt.utils.async.SyncWrapper'). Ensure that + # 'salt.utils.asynchronous.SyncWrapper'). Ensure that # _stream_return() completes by restarting the IO Loop. # This will prevent potential errors on shutdown. 
try: @@ -969,7 +969,7 @@ class SaltMessageClient(object): 'source_port': self.source_port} else: log.warning('If you need a certain source IP/port, consider upgrading Tornado >= 4.5') - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): self._stream = yield self._tcp_client.connect(self.host, self.port, ssl_options=self.opts.get('ssl'), @@ -1441,9 +1441,9 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel): pull_uri = int(self.opts.get('tcp_master_publish_pull', 4514)) else: pull_uri = os.path.join(self.opts['sock_dir'], 'publish_pull.ipc') - # TODO: switch to the actual async interface + # TODO: switch to the actual asynchronous interface #pub_sock = salt.transport.ipc.IPCMessageClient(self.opts, io_loop=self.io_loop) - pub_sock = salt.utils.async.SyncWrapper( + pub_sock = salt.utils.asynchronous.SyncWrapper( salt.transport.ipc.IPCMessageClient, (pull_uri,) ) diff --git a/salt/utils/async.py b/salt/utils/asynchronous.py similarity index 81% rename from salt/utils/async.py rename to salt/utils/asynchronous.py index 55d21d0ccc..16a7088360 100644 --- a/salt/utils/async.py +++ b/salt/utils/asynchronous.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- ''' -Helpers/utils for working with tornado async stuff +Helpers/utils for working with tornado asynchronous stuff ''' from __future__ import absolute_import, print_function, unicode_literals @@ -30,9 +30,9 @@ class SyncWrapper(object): This is uses as a simple wrapper, for example: - async = AsyncClass() + asynchronous = AsyncClass() # this method would reguarly return a future - future = async.async_method() + future = asynchronous.async_method() sync = SyncWrapper(async_factory_method, (arg1, arg2), {'kwarg1': 'val'}) # the sync wrapper will automatically wait on the future @@ -46,15 +46,15 @@ class SyncWrapper(object): kwargs['io_loop'] = self.io_loop with current_ioloop(self.io_loop): - self.async = method(*args, **kwargs) + self.asynchronous = 
method(*args, **kwargs) def __getattribute__(self, key): try: return object.__getattribute__(self, key) except AttributeError as ex: - if key == 'async': + if key == 'asynchronous': raise ex - attr = getattr(self.async, key) + attr = getattr(self.asynchronous, key) if hasattr(attr, '__call__'): def wrap(*args, **kwargs): # Overload the ioloop for the func call-- since it might call .current() @@ -75,15 +75,15 @@ class SyncWrapper(object): def __del__(self): ''' - On deletion of the async wrapper, make sure to clean up the async stuff + On deletion of the asynchronous wrapper, make sure to clean up the asynchronous stuff ''' - if hasattr(self, 'async'): - if hasattr(self.async, 'close'): + if hasattr(self, 'asynchronous'): + if hasattr(self.asynchronous, 'close'): # Certain things such as streams should be closed before # their associated io_loop is closed to allow for proper # cleanup. - self.async.close() - del self.async + self.asynchronous.close() + del self.asynchronous self.io_loop.close() del self.io_loop elif hasattr(self, 'io_loop'): diff --git a/salt/utils/event.py b/salt/utils/event.py index 9a62b6c353..a2390730fe 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -72,7 +72,7 @@ import tornado.iostream # Import salt libs import salt.config import salt.payload -import salt.utils.async +import salt.utils.asynchronous import salt.utils.cache import salt.utils.dicttrim import salt.utils.files @@ -228,7 +228,7 @@ class SaltEvent(object): :param Bool keep_loop: Pass a boolean to determine if we want to keep the io loop or destroy it when the event handle is destroyed. 
This is useful when using event - loops from within third party async code + loops from within third party asynchronous code ''' self.serial = salt.payload.Serial({'serial': 'msgpack'}) self.keep_loop = keep_loop @@ -364,7 +364,7 @@ class SaltEvent(object): return True if self._run_io_loop_sync: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.subscriber is None: self.subscriber = salt.transport.ipc.IPCMessageSubscriber( self.puburi, @@ -383,7 +383,7 @@ class SaltEvent(object): io_loop=self.io_loop ) - # For the async case, the connect will be defered to when + # For the asynchronous case, the connect will be defered to when # set_event_handler() is invoked. self.cpub = True return self.cpub @@ -409,7 +409,7 @@ class SaltEvent(object): return True if self._run_io_loop_sync: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.pusher is None: self.pusher = salt.transport.ipc.IPCMessageClient( self.pulluri, @@ -427,7 +427,7 @@ class SaltEvent(object): self.pulluri, io_loop=self.io_loop ) - # For the async case, the connect will be deferred to when + # For the asynchronous case, the connect will be deferred to when # fire_event() is invoked. 
self.cpush = True return self.cpush @@ -632,7 +632,7 @@ class SaltEvent(object): ret = self._check_pending(tag, match_func) if ret is None: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): if auto_reconnect: raise_errors = self.raise_errors self.raise_errors = True @@ -743,7 +743,7 @@ class SaltEvent(object): serialized_data]) msg = salt.utils.stringutils.to_bytes(event, 'utf-8') if self._run_io_loop_sync: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): try: self.io_loop.run_sync(lambda: self.pusher.send(msg)) except Exception as ex: @@ -1083,7 +1083,7 @@ class EventPublisher(salt.utils.process.SignalHandlingMultiprocessingProcess): ''' salt.utils.process.appendproctitle(self.__class__.__name__) self.io_loop = tornado.ioloop.IOLoop() - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.opts['ipc_mode'] == 'tcp': epub_uri = int(self.opts['tcp_master_pub_port']) epull_uri = int(self.opts['tcp_master_pull_port']) diff --git a/salt/utils/process.py b/salt/utils/process.py index 20f7feee8a..95c2288da3 100644 --- a/salt/utils/process.py +++ b/salt/utils/process.py @@ -472,7 +472,7 @@ class ProcessManager(object): del self._process_map[pid] @gen.coroutine - def run(self, async=False): + def run(self, asynchronous=False): ''' Load and start all available api modules ''' @@ -495,7 +495,7 @@ class ProcessManager(object): # The event-based subprocesses management code was removed from here # because os.wait() conflicts with the subprocesses management logic # implemented in `multiprocessing` package. See #35480 for details. 
- if async: + if asynchronous: yield gen.sleep(10) else: time.sleep(10) diff --git a/salt/utils/thin.py b/salt/utils/thin.py index b99e407583..9a74b8d7d6 100644 --- a/salt/utils/thin.py +++ b/salt/utils/thin.py @@ -701,7 +701,7 @@ def gen_min(cachedir, extra_mods='', overwrite=False, so_mods='', 'salt/utils/openstack', 'salt/utils/openstack/__init__.py', 'salt/utils/openstack/swift.py', - 'salt/utils/async.py', + 'salt/utils/asynchronous.py', 'salt/utils/process.py', 'salt/utils/jinja.py', 'salt/utils/rsax931.py', diff --git a/salt/wheel/__init__.py b/salt/wheel/__init__.py index abfd776342..65092ef974 100644 --- a/salt/wheel/__init__.py +++ b/salt/wheel/__init__.py @@ -57,7 +57,7 @@ class WheelClient(salt.client.mixins.SyncClientMixin, return self.low(fun, kwargs, print_event=kwargs.get('print_event', True), full_return=kwargs.get('full_return', False)) # TODO: Inconsistent with runner client-- the runner client's master_call gives - # an async return, unlike this + # an asynchronous return, unlike this def master_call(self, **kwargs): ''' Execute a wheel function through the master network interface (eauth). 
@@ -120,7 +120,7 @@ class WheelClient(salt.client.mixins.SyncClientMixin, {'jid': '20131219224744416681', 'tag': 'salt/wheel/20131219224744416681'} ''' fun = low.pop('fun') - return self.async(fun, low) + return self.asynchronous(fun, low) def cmd(self, fun, arg=None, pub_data=None, kwarg=None, print_event=True, full_return=False): ''' diff --git a/tests/integration/files/engines/runtests_engine.py b/tests/integration/files/engines/runtests_engine.py index ddb52d5c7f..426ab2a5b2 100644 --- a/tests/integration/files/engines/runtests_engine.py +++ b/tests/integration/files/engines/runtests_engine.py @@ -21,7 +21,7 @@ import logging # Import salt libs import salt.utils.event -import salt.utils.async +import salt.utils.asynchronous # Import 3rd-party libs from tornado import gen @@ -70,7 +70,7 @@ class PyTestEngine(object): self.sock.bind(('localhost', port)) # become a server socket self.sock.listen(5) - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): netutil.add_accept_handler( self.sock, self.handle_connection, diff --git a/tests/integration/netapi/rest_tornado/test_app.py b/tests/integration/netapi/rest_tornado/test_app.py index 2efd2e9f3d..beb085db1e 100644 --- a/tests/integration/netapi/rest_tornado/test_app.py +++ b/tests/integration/netapi/rest_tornado/test_app.py @@ -398,7 +398,7 @@ class TestMinionSaltAPIHandler(_SaltnadoIntegrationTestCase): def test_post_with_incorrect_client(self): ''' - The /minions endpoint is async only, so if you try something else + The /minions endpoint is asynchronous only, so if you try something else make sure you get an error ''' # get a token for this test diff --git a/tests/support/case.py b/tests/support/case.py index 9de6b81fb7..87aeb13bf6 100644 --- a/tests/support/case.py +++ b/tests/support/case.py @@ -13,7 +13,7 @@ # pylint: disable=repr-flag-used-in-string # Import python libs -from __future__ import absolute_import +from __future__ import absolute_import, 
unicode_literals import os import re import sys @@ -143,17 +143,19 @@ class ShellTestCase(TestCase, AdaptedConfigurationTestCaseMixin): arg_str, with_retcode=False, catch_stderr=False, - async=False, + asynchronous=False, timeout=60, - config_dir=None): + config_dir=None, + **kwargs): ''' Execute salt-run ''' + asynchronous = kwargs.get('async', asynchronous) arg_str = '-c {0}{async_flag} -t {timeout} {1}'.format( config_dir or self.get_config_dir(), arg_str, timeout=timeout, - async_flag=' --async' if async else '') + async_flag=' --async' if asynchronous else '') return self.run_script('salt-run', arg_str, with_retcode=with_retcode, diff --git a/tests/unit/utils/test_async.py b/tests/unit/utils/test_async.py index c93538f0dd..694a7aebfe 100644 --- a/tests/unit/utils/test_async.py +++ b/tests/unit/utils/test_async.py @@ -8,7 +8,7 @@ import tornado.testing import tornado.gen from tornado.testing import AsyncTestCase -import salt.utils.async as async +import salt.utils.asynchronous as asynchronous class HelperA(object): @@ -24,7 +24,7 @@ class HelperA(object): class HelperB(object): def __init__(self, a=None, io_loop=None): if a is None: - a = async.SyncWrapper(HelperA) + a = asynchronous.SyncWrapper(HelperA) self.a = a @tornado.gen.coroutine @@ -38,7 +38,7 @@ class TestSyncWrapper(AsyncTestCase): @tornado.testing.gen_test def test_helpers(self): ''' - Test that the helper classes do what we expect within a regular async env + Test that the helper classes do what we expect within a regular asynchronous env ''' ha = HelperA() ret = yield ha.sleep() @@ -50,29 +50,29 @@ class TestSyncWrapper(AsyncTestCase): def test_basic_wrap(self): ''' - Test that we can wrap an async caller. + Test that we can wrap an asynchronous caller. 
''' - sync = async.SyncWrapper(HelperA) + sync = asynchronous.SyncWrapper(HelperA) ret = sync.sleep() self.assertTrue(ret) def test_double(self): ''' - Test when the async wrapper object itself creates a wrap of another thing + Test when the asynchronous wrapper object itself creates a wrap of another thing This works fine since the second wrap is based on the first's IOLoop so we don't have to worry about complex start/stop mechanics ''' - sync = async.SyncWrapper(HelperB) + sync = asynchronous.SyncWrapper(HelperB) ret = sync.sleep() self.assertFalse(ret) def test_double_sameloop(self): ''' - Test async wrappers initiated from the same IOLoop, to ensure that + Test asynchronous wrappers initiated from the same IOLoop, to ensure that we don't wire up both to the same IOLoop (since it causes MANY problems). ''' - a = async.SyncWrapper(HelperA) - sync = async.SyncWrapper(HelperB, (a,)) + a = asynchronous.SyncWrapper(HelperA) + sync = asynchronous.SyncWrapper(HelperB, (a,)) ret = sync.sleep() self.assertFalse(ret) -- 2.17.1