--- a/celery/backends/async.py +++ /dev/null @@ -1,299 +0,0 @@ -"""Async I/O backend support utilities.""" -from __future__ import absolute_import, unicode_literals - -import socket -import threading -from collections import deque -from time import sleep -from weakref import WeakKeyDictionary - -from kombu.utils.compat import detect_environment -from kombu.utils.objects import cached_property - -from celery import states -from celery.exceptions import TimeoutError -from celery.five import Empty, monotonic -from celery.utils.threads import THREAD_TIMEOUT_MAX - -__all__ = ( - 'AsyncBackendMixin', 'BaseResultConsumer', 'Drainer', - 'register_drainer', -) - -drainers = {} - - -def register_drainer(name): - """Decorator used to register a new result drainer type.""" - def _inner(cls): - drainers[name] = cls - return cls - return _inner - - -@register_drainer('default') -class Drainer(object): - """Result draining service.""" - - def __init__(self, result_consumer): - self.result_consumer = result_consumer - - def start(self): - pass - - def stop(self): - pass - - def drain_events_until(self, p, timeout=None, on_interval=None, wait=None): - wait = wait or self.result_consumer.drain_events - time_start = monotonic() - - while 1: - # Total time spent may exceed a single call to wait() - if timeout and monotonic() - time_start >= timeout: - raise socket.timeout() - try: - yield self.wait_for(p, wait, timeout=1) - except socket.timeout: - pass - if on_interval: - on_interval() - if p.ready: # got event on the wanted channel. 
- break - - def wait_for(self, p, wait, timeout=None): - wait(timeout=timeout) - - -class greenletDrainer(Drainer): - spawn = None - _g = None - - def __init__(self, *args, **kwargs): - super(greenletDrainer, self).__init__(*args, **kwargs) - self._started = threading.Event() - self._stopped = threading.Event() - self._shutdown = threading.Event() - - def run(self): - self._started.set() - while not self._stopped.is_set(): - try: - self.result_consumer.drain_events(timeout=1) - except socket.timeout: - pass - self._shutdown.set() - - def start(self): - if not self._started.is_set(): - self._g = self.spawn(self.run) - self._started.wait() - - def stop(self): - self._stopped.set() - self._shutdown.wait(THREAD_TIMEOUT_MAX) - - def wait_for(self, p, wait, timeout=None): - self.start() - if not p.ready: - sleep(0) - - -@register_drainer('eventlet') -class eventletDrainer(greenletDrainer): - - @cached_property - def spawn(self): - from eventlet import spawn - return spawn - - -@register_drainer('gevent') -class geventDrainer(greenletDrainer): - - @cached_property - def spawn(self): - from gevent import spawn - return spawn - - -class AsyncBackendMixin(object): - """Mixin for backends that enables the async API.""" - - def _collect_into(self, result, bucket): - self.result_consumer.buckets[result] = bucket - - def iter_native(self, result, no_ack=True, **kwargs): - self._ensure_not_eager() - - results = result.results - if not results: - raise StopIteration() - - # we tell the result consumer to put consumed results - # into these buckets. 
- bucket = deque() - for node in results: - if node._cache: - bucket.append(node) - else: - self._collect_into(node, bucket) - - for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs): - while bucket: - node = bucket.popleft() - yield node.id, node._cache - while bucket: - node = bucket.popleft() - yield node.id, node._cache - - def add_pending_result(self, result, weak=False, start_drainer=True): - if start_drainer: - self.result_consumer.drainer.start() - try: - self._maybe_resolve_from_buffer(result) - except Empty: - self._add_pending_result(result.id, result, weak=weak) - return result - - def _maybe_resolve_from_buffer(self, result): - result._maybe_set_cache(self._pending_messages.take(result.id)) - - def _add_pending_result(self, task_id, result, weak=False): - concrete, weak_ = self._pending_results - if task_id not in weak_ and result.id not in concrete: - (weak_ if weak else concrete)[task_id] = result - self.result_consumer.consume_from(task_id) - - def add_pending_results(self, results, weak=False): - self.result_consumer.drainer.start() - return [self.add_pending_result(result, weak=weak, start_drainer=False) - for result in results] - - def remove_pending_result(self, result): - self._remove_pending_result(result.id) - self.on_result_fulfilled(result) - return result - - def _remove_pending_result(self, task_id): - for map in self._pending_results: - map.pop(task_id, None) - - def on_result_fulfilled(self, result): - self.result_consumer.cancel_for(result.id) - - def wait_for_pending(self, result, - callback=None, propagate=True, **kwargs): - self._ensure_not_eager() - for _ in self._wait_for_pending(result, **kwargs): - pass - return result.maybe_throw(callback=callback, propagate=propagate) - - def _wait_for_pending(self, result, - timeout=None, on_interval=None, on_message=None, - **kwargs): - return self.result_consumer._wait_for_pending( - result, timeout=timeout, - on_interval=on_interval, on_message=on_message, - ) - - @property - 
def is_async(self): - return True - - -class BaseResultConsumer(object): - """Manager responsible for consuming result messages.""" - - def __init__(self, backend, app, accept, - pending_results, pending_messages): - self.backend = backend - self.app = app - self.accept = accept - self._pending_results = pending_results - self._pending_messages = pending_messages - self.on_message = None - self.buckets = WeakKeyDictionary() - self.drainer = drainers[detect_environment()](self) - - def start(self, initial_task_id, **kwargs): - raise NotImplementedError() - - def stop(self): - pass - - def drain_events(self, timeout=None): - raise NotImplementedError() - - def consume_from(self, task_id): - raise NotImplementedError() - - def cancel_for(self, task_id): - raise NotImplementedError() - - def _after_fork(self): - self.buckets.clear() - self.buckets = WeakKeyDictionary() - self.on_message = None - self.on_after_fork() - - def on_after_fork(self): - pass - - def drain_events_until(self, p, timeout=None, on_interval=None): - return self.drainer.drain_events_until( - p, timeout=timeout, on_interval=on_interval) - - def _wait_for_pending(self, result, - timeout=None, on_interval=None, on_message=None, - **kwargs): - self.on_wait_for_pending(result, timeout=timeout, **kwargs) - prev_on_m, self.on_message = self.on_message, on_message - try: - for _ in self.drain_events_until( - result.on_ready, timeout=timeout, - on_interval=on_interval): - yield - sleep(0) - except socket.timeout: - raise TimeoutError('The operation timed out.') - finally: - self.on_message = prev_on_m - - def on_wait_for_pending(self, result, timeout=None, **kwargs): - pass - - def on_out_of_band_result(self, message): - self.on_state_change(message.payload, message) - - def _get_pending_result(self, task_id): - for mapping in self._pending_results: - try: - return mapping[task_id] - except KeyError: - pass - raise KeyError(task_id) - - def on_state_change(self, meta, message): - if self.on_message: - 
self.on_message(meta) - if meta['status'] in states.READY_STATES: - task_id = meta['task_id'] - try: - result = self._get_pending_result(task_id) - except KeyError: - # send to buffer in case we received this result - # before it was added to _pending_results. - self._pending_messages.put(task_id, meta) - else: - result._maybe_set_cache(meta) - buckets = self.buckets - try: - # remove bucket for this result, since it's fulfilled - bucket = buckets.pop(result) - except KeyError: - pass - else: - # send to waiter via bucket - bucket.append(result) - sleep(0) --- /dev/null +++ b/celery/backends/async_tools.py @@ -0,0 +1,299 @@ +"""Async I/O backend support utilities.""" +from __future__ import absolute_import, unicode_literals + +import socket +import threading +from collections import deque +from time import sleep +from weakref import WeakKeyDictionary + +from kombu.utils.compat import detect_environment +from kombu.utils.objects import cached_property + +from celery import states +from celery.exceptions import TimeoutError +from celery.five import Empty, monotonic +from celery.utils.threads import THREAD_TIMEOUT_MAX + +__all__ = ( + 'AsyncBackendMixin', 'BaseResultConsumer', 'Drainer', + 'register_drainer', +) + +drainers = {} + + +def register_drainer(name): + """Decorator used to register a new result drainer type.""" + def _inner(cls): + drainers[name] = cls + return cls + return _inner + + +@register_drainer('default') +class Drainer(object): + """Result draining service.""" + + def __init__(self, result_consumer): + self.result_consumer = result_consumer + + def start(self): + pass + + def stop(self): + pass + + def drain_events_until(self, p, timeout=None, on_interval=None, wait=None): + wait = wait or self.result_consumer.drain_events + time_start = monotonic() + + while 1: + # Total time spent may exceed a single call to wait() + if timeout and monotonic() - time_start >= timeout: + raise socket.timeout() + try: + yield self.wait_for(p, wait, timeout=1) + 
except socket.timeout: + pass + if on_interval: + on_interval() + if p.ready: # got event on the wanted channel. + break + + def wait_for(self, p, wait, timeout=None): + wait(timeout=timeout) + + +class greenletDrainer(Drainer): + spawn = None + _g = None + + def __init__(self, *args, **kwargs): + super(greenletDrainer, self).__init__(*args, **kwargs) + self._started = threading.Event() + self._stopped = threading.Event() + self._shutdown = threading.Event() + + def run(self): + self._started.set() + while not self._stopped.is_set(): + try: + self.result_consumer.drain_events(timeout=1) + except socket.timeout: + pass + self._shutdown.set() + + def start(self): + if not self._started.is_set(): + self._g = self.spawn(self.run) + self._started.wait() + + def stop(self): + self._stopped.set() + self._shutdown.wait(THREAD_TIMEOUT_MAX) + + def wait_for(self, p, wait, timeout=None): + self.start() + if not p.ready: + sleep(0) + + +@register_drainer('eventlet') +class eventletDrainer(greenletDrainer): + + @cached_property + def spawn(self): + from eventlet import spawn + return spawn + + +@register_drainer('gevent') +class geventDrainer(greenletDrainer): + + @cached_property + def spawn(self): + from gevent import spawn + return spawn + + +class AsyncBackendMixin(object): + """Mixin for backends that enables the async API.""" + + def _collect_into(self, result, bucket): + self.result_consumer.buckets[result] = bucket + + def iter_native(self, result, no_ack=True, **kwargs): + self._ensure_not_eager() + + results = result.results + if not results: + return # PEP 479: StopIteration must not escape a generator + + # we tell the result consumer to put consumed results + # into these buckets.
+ bucket = deque() + for node in results: + if node._cache: + bucket.append(node) + else: + self._collect_into(node, bucket) + + for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs): + while bucket: + node = bucket.popleft() + yield node.id, node._cache + while bucket: + node = bucket.popleft() + yield node.id, node._cache + + def add_pending_result(self, result, weak=False, start_drainer=True): + if start_drainer: + self.result_consumer.drainer.start() + try: + self._maybe_resolve_from_buffer(result) + except Empty: + self._add_pending_result(result.id, result, weak=weak) + return result + + def _maybe_resolve_from_buffer(self, result): + result._maybe_set_cache(self._pending_messages.take(result.id)) + + def _add_pending_result(self, task_id, result, weak=False): + concrete, weak_ = self._pending_results + if task_id not in weak_ and result.id not in concrete: + (weak_ if weak else concrete)[task_id] = result + self.result_consumer.consume_from(task_id) + + def add_pending_results(self, results, weak=False): + self.result_consumer.drainer.start() + return [self.add_pending_result(result, weak=weak, start_drainer=False) + for result in results] + + def remove_pending_result(self, result): + self._remove_pending_result(result.id) + self.on_result_fulfilled(result) + return result + + def _remove_pending_result(self, task_id): + for map in self._pending_results: + map.pop(task_id, None) + + def on_result_fulfilled(self, result): + self.result_consumer.cancel_for(result.id) + + def wait_for_pending(self, result, + callback=None, propagate=True, **kwargs): + self._ensure_not_eager() + for _ in self._wait_for_pending(result, **kwargs): + pass + return result.maybe_throw(callback=callback, propagate=propagate) + + def _wait_for_pending(self, result, + timeout=None, on_interval=None, on_message=None, + **kwargs): + return self.result_consumer._wait_for_pending( + result, timeout=timeout, + on_interval=on_interval, on_message=on_message, + ) + + @property + 
def is_async(self): + return True + + +class BaseResultConsumer(object): + """Manager responsible for consuming result messages.""" + + def __init__(self, backend, app, accept, + pending_results, pending_messages): + self.backend = backend + self.app = app + self.accept = accept + self._pending_results = pending_results + self._pending_messages = pending_messages + self.on_message = None + self.buckets = WeakKeyDictionary() + self.drainer = drainers[detect_environment()](self) + + def start(self, initial_task_id, **kwargs): + raise NotImplementedError() + + def stop(self): + pass + + def drain_events(self, timeout=None): + raise NotImplementedError() + + def consume_from(self, task_id): + raise NotImplementedError() + + def cancel_for(self, task_id): + raise NotImplementedError() + + def _after_fork(self): + self.buckets.clear() + self.buckets = WeakKeyDictionary() + self.on_message = None + self.on_after_fork() + + def on_after_fork(self): + pass + + def drain_events_until(self, p, timeout=None, on_interval=None): + return self.drainer.drain_events_until( + p, timeout=timeout, on_interval=on_interval) + + def _wait_for_pending(self, result, + timeout=None, on_interval=None, on_message=None, + **kwargs): + self.on_wait_for_pending(result, timeout=timeout, **kwargs) + prev_on_m, self.on_message = self.on_message, on_message + try: + for _ in self.drain_events_until( + result.on_ready, timeout=timeout, + on_interval=on_interval): + yield + sleep(0) + except socket.timeout: + raise TimeoutError('The operation timed out.') + finally: + self.on_message = prev_on_m + + def on_wait_for_pending(self, result, timeout=None, **kwargs): + pass + + def on_out_of_band_result(self, message): + self.on_state_change(message.payload, message) + + def _get_pending_result(self, task_id): + for mapping in self._pending_results: + try: + return mapping[task_id] + except KeyError: + pass + raise KeyError(task_id) + + def on_state_change(self, meta, message): + if self.on_message: + 
self.on_message(meta) + if meta['status'] in states.READY_STATES: + task_id = meta['task_id'] + try: + result = self._get_pending_result(task_id) + except KeyError: + # send to buffer in case we received this result + # before it was added to _pending_results. + self._pending_messages.put(task_id, meta) + else: + result._maybe_set_cache(meta) + buckets = self.buckets + try: + # remove bucket for this result, since it's fulfilled + bucket = buckets.pop(result) + except KeyError: + pass + else: + # send to waiter via bucket + bucket.append(result) + sleep(0) --- a/celery/backends/rpc.py +++ b/celery/backends/rpc.py @@ -17,7 +17,7 @@ from celery._state import current_task, from celery.five import items, range from . import base -from .async import AsyncBackendMixin, BaseResultConsumer +from .async_tools import AsyncBackendMixin, BaseResultConsumer __all__ = ('BacklogLimitExceeded', 'RPCBackend') --- a/celery/backends/redis.py +++ b/celery/backends/redis.py @@ -19,7 +19,7 @@ from celery.utils.functional import dict from celery.utils.log import get_logger from celery.utils.time import humanize_seconds -from . import async, base +from . 
import async_tools, base try: from urllib.parse import unquote @@ -74,7 +74,7 @@ E_LOST = 'Connection to Redis lost: Retr logger = get_logger(__name__) -class ResultConsumer(async.BaseResultConsumer): +class ResultConsumer(async_tools.BaseResultConsumer): _pubsub = None def __init__(self, *args, **kwargs): @@ -138,7 +138,7 @@ class ResultConsumer(async.BaseResultCon self._pubsub.unsubscribe(key) -class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin): +class RedisBackend(base.BaseKeyValueStoreBackend, async_tools.AsyncBackendMixin): """Redis task result store.""" ResultConsumer = ResultConsumer --- a/t/unit/backends/test_redis.py +++ b/t/unit/backends/test_redis.py @@ -146,7 +146,7 @@ class test_RedisResultConsumer: def get_consumer(self): return self.get_backend().result_consumer - @patch('celery.backends.async.BaseResultConsumer.on_after_fork') + @patch('celery.backends.async_tools.BaseResultConsumer.on_after_fork') def test_on_after_fork(self, parent_method): consumer = self.get_consumer() consumer.start('none') @@ -172,7 +172,7 @@ class test_RedisResultConsumer: parent_method.assert_called_once() @patch('celery.backends.redis.ResultConsumer.cancel_for') - @patch('celery.backends.async.BaseResultConsumer.on_state_change') + @patch('celery.backends.async_tools.BaseResultConsumer.on_state_change') def test_on_state_change(self, parent_method, cancel_for): consumer = self.get_consumer() meta = {'task_id': 'testing', 'status': states.SUCCESS} --- a/celery/app/routes.py +++ b/celery/app/routes.py @@ -20,6 +20,9 @@ from celery.utils.imports import symbol_ __all__ = ('MapRoute', 'Router', 'prepare') +if not hasattr(re, "_pattern_type"): + setattr(re, '_pattern_type', re.Pattern) + def glob_to_re(glob, quote=string.punctuation.replace('*', '')): glob = ''.join('\\' + c if c in quote else c for c in glob) return glob.replace('*', '.+?') --- a/celery/security/certificate.py +++ b/celery/security/certificate.py @@ -43,7 +43,7 @@ class 
Certificate(object): def verify(self, data, signature, digest): """Verify signature for string containing data.""" with reraise_errors('Bad signature: {0!r}'): - crypto.verify(self._cert, signature, data, digest) + crypto.verify(self._cert, signature, data.encode('utf-8'), digest) class CertStore(object): --- a/t/unit/worker/test_loops.py +++ b/t/unit/worker/test_loops.py @@ -372,8 +372,8 @@ class test_asynloop: x = X(self.app) def Gen(): - raise StopIteration() - yield + if 0: + yield gen = Gen() x.hub.add_writer(6, gen) x.hub.on_tick.add(x.close_then_error(Mock(name='tick'), 2))