15
0
forked from pool/python-celery

Accepting request 677998 from home:jayvdb:branches:devel:languages:python

- Replace no-async.patch with three Python 3.7 patches merged upstream
  python37-1.patch, python37-2.patch & python37-3.patch
- Replace sed invocation with unpin-pytest.patch for clarity

OBS-URL: https://build.opensuse.org/request/show/677998
OBS-URL: https://build.opensuse.org/package/show/devel:languages:python/python-celery?expand=0&rev=114
This commit is contained in:
Tomáš Chvátal
2019-02-22 10:39:57 +00:00
committed by Git OBS Bridge
parent 27b13e8829
commit 270b7c6b0e
7 changed files with 224 additions and 702 deletions

View File

@@ -1,700 +0,0 @@
--- a/celery/backends/async.py
+++ /dev/null
@@ -1,299 +0,0 @@
-"""Async I/O backend support utilities."""
-from __future__ import absolute_import, unicode_literals
-
-import socket
-import threading
-from collections import deque
-from time import sleep
-from weakref import WeakKeyDictionary
-
-from kombu.utils.compat import detect_environment
-from kombu.utils.objects import cached_property
-
-from celery import states
-from celery.exceptions import TimeoutError
-from celery.five import Empty, monotonic
-from celery.utils.threads import THREAD_TIMEOUT_MAX
-
-__all__ = (
- 'AsyncBackendMixin', 'BaseResultConsumer', 'Drainer',
- 'register_drainer',
-)
-
-drainers = {}
-
-
-def register_drainer(name):
- """Decorator used to register a new result drainer type."""
- def _inner(cls):
- drainers[name] = cls
- return cls
- return _inner
-
-
-@register_drainer('default')
-class Drainer(object):
- """Result draining service."""
-
- def __init__(self, result_consumer):
- self.result_consumer = result_consumer
-
- def start(self):
- pass
-
- def stop(self):
- pass
-
- def drain_events_until(self, p, timeout=None, on_interval=None, wait=None):
- wait = wait or self.result_consumer.drain_events
- time_start = monotonic()
-
- while 1:
- # Total time spent may exceed a single call to wait()
- if timeout and monotonic() - time_start >= timeout:
- raise socket.timeout()
- try:
- yield self.wait_for(p, wait, timeout=1)
- except socket.timeout:
- pass
- if on_interval:
- on_interval()
- if p.ready: # got event on the wanted channel.
- break
-
- def wait_for(self, p, wait, timeout=None):
- wait(timeout=timeout)
-
-
-class greenletDrainer(Drainer):
- spawn = None
- _g = None
-
- def __init__(self, *args, **kwargs):
- super(greenletDrainer, self).__init__(*args, **kwargs)
- self._started = threading.Event()
- self._stopped = threading.Event()
- self._shutdown = threading.Event()
-
- def run(self):
- self._started.set()
- while not self._stopped.is_set():
- try:
- self.result_consumer.drain_events(timeout=1)
- except socket.timeout:
- pass
- self._shutdown.set()
-
- def start(self):
- if not self._started.is_set():
- self._g = self.spawn(self.run)
- self._started.wait()
-
- def stop(self):
- self._stopped.set()
- self._shutdown.wait(THREAD_TIMEOUT_MAX)
-
- def wait_for(self, p, wait, timeout=None):
- self.start()
- if not p.ready:
- sleep(0)
-
-
-@register_drainer('eventlet')
-class eventletDrainer(greenletDrainer):
-
- @cached_property
- def spawn(self):
- from eventlet import spawn
- return spawn
-
-
-@register_drainer('gevent')
-class geventDrainer(greenletDrainer):
-
- @cached_property
- def spawn(self):
- from gevent import spawn
- return spawn
-
-
-class AsyncBackendMixin(object):
- """Mixin for backends that enables the async API."""
-
- def _collect_into(self, result, bucket):
- self.result_consumer.buckets[result] = bucket
-
- def iter_native(self, result, no_ack=True, **kwargs):
- self._ensure_not_eager()
-
- results = result.results
- if not results:
- raise StopIteration()
-
- # we tell the result consumer to put consumed results
- # into these buckets.
- bucket = deque()
- for node in results:
- if node._cache:
- bucket.append(node)
- else:
- self._collect_into(node, bucket)
-
- for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
- while bucket:
- node = bucket.popleft()
- yield node.id, node._cache
- while bucket:
- node = bucket.popleft()
- yield node.id, node._cache
-
- def add_pending_result(self, result, weak=False, start_drainer=True):
- if start_drainer:
- self.result_consumer.drainer.start()
- try:
- self._maybe_resolve_from_buffer(result)
- except Empty:
- self._add_pending_result(result.id, result, weak=weak)
- return result
-
- def _maybe_resolve_from_buffer(self, result):
- result._maybe_set_cache(self._pending_messages.take(result.id))
-
- def _add_pending_result(self, task_id, result, weak=False):
- concrete, weak_ = self._pending_results
- if task_id not in weak_ and result.id not in concrete:
- (weak_ if weak else concrete)[task_id] = result
- self.result_consumer.consume_from(task_id)
-
- def add_pending_results(self, results, weak=False):
- self.result_consumer.drainer.start()
- return [self.add_pending_result(result, weak=weak, start_drainer=False)
- for result in results]
-
- def remove_pending_result(self, result):
- self._remove_pending_result(result.id)
- self.on_result_fulfilled(result)
- return result
-
- def _remove_pending_result(self, task_id):
- for map in self._pending_results:
- map.pop(task_id, None)
-
- def on_result_fulfilled(self, result):
- self.result_consumer.cancel_for(result.id)
-
- def wait_for_pending(self, result,
- callback=None, propagate=True, **kwargs):
- self._ensure_not_eager()
- for _ in self._wait_for_pending(result, **kwargs):
- pass
- return result.maybe_throw(callback=callback, propagate=propagate)
-
- def _wait_for_pending(self, result,
- timeout=None, on_interval=None, on_message=None,
- **kwargs):
- return self.result_consumer._wait_for_pending(
- result, timeout=timeout,
- on_interval=on_interval, on_message=on_message,
- )
-
- @property
- def is_async(self):
- return True
-
-
-class BaseResultConsumer(object):
- """Manager responsible for consuming result messages."""
-
- def __init__(self, backend, app, accept,
- pending_results, pending_messages):
- self.backend = backend
- self.app = app
- self.accept = accept
- self._pending_results = pending_results
- self._pending_messages = pending_messages
- self.on_message = None
- self.buckets = WeakKeyDictionary()
- self.drainer = drainers[detect_environment()](self)
-
- def start(self, initial_task_id, **kwargs):
- raise NotImplementedError()
-
- def stop(self):
- pass
-
- def drain_events(self, timeout=None):
- raise NotImplementedError()
-
- def consume_from(self, task_id):
- raise NotImplementedError()
-
- def cancel_for(self, task_id):
- raise NotImplementedError()
-
- def _after_fork(self):
- self.buckets.clear()
- self.buckets = WeakKeyDictionary()
- self.on_message = None
- self.on_after_fork()
-
- def on_after_fork(self):
- pass
-
- def drain_events_until(self, p, timeout=None, on_interval=None):
- return self.drainer.drain_events_until(
- p, timeout=timeout, on_interval=on_interval)
-
- def _wait_for_pending(self, result,
- timeout=None, on_interval=None, on_message=None,
- **kwargs):
- self.on_wait_for_pending(result, timeout=timeout, **kwargs)
- prev_on_m, self.on_message = self.on_message, on_message
- try:
- for _ in self.drain_events_until(
- result.on_ready, timeout=timeout,
- on_interval=on_interval):
- yield
- sleep(0)
- except socket.timeout:
- raise TimeoutError('The operation timed out.')
- finally:
- self.on_message = prev_on_m
-
- def on_wait_for_pending(self, result, timeout=None, **kwargs):
- pass
-
- def on_out_of_band_result(self, message):
- self.on_state_change(message.payload, message)
-
- def _get_pending_result(self, task_id):
- for mapping in self._pending_results:
- try:
- return mapping[task_id]
- except KeyError:
- pass
- raise KeyError(task_id)
-
- def on_state_change(self, meta, message):
- if self.on_message:
- self.on_message(meta)
- if meta['status'] in states.READY_STATES:
- task_id = meta['task_id']
- try:
- result = self._get_pending_result(task_id)
- except KeyError:
- # send to buffer in case we received this result
- # before it was added to _pending_results.
- self._pending_messages.put(task_id, meta)
- else:
- result._maybe_set_cache(meta)
- buckets = self.buckets
- try:
- # remove bucket for this result, since it's fulfilled
- bucket = buckets.pop(result)
- except KeyError:
- pass
- else:
- # send to waiter via bucket
- bucket.append(result)
- sleep(0)
--- /dev/null
+++ b/celery/backends/async_tools.py
@@ -0,0 +1,299 @@
+"""Async I/O backend support utilities."""
+from __future__ import absolute_import, unicode_literals
+
+import socket
+import threading
+from collections import deque
+from time import sleep
+from weakref import WeakKeyDictionary
+
+from kombu.utils.compat import detect_environment
+from kombu.utils.objects import cached_property
+
+from celery import states
+from celery.exceptions import TimeoutError
+from celery.five import Empty, monotonic
+from celery.utils.threads import THREAD_TIMEOUT_MAX
+
+__all__ = (
+ 'AsyncBackendMixin', 'BaseResultConsumer', 'Drainer',
+ 'register_drainer',
+)
+
+drainers = {}
+
+
+def register_drainer(name):
+ """Decorator used to register a new result drainer type."""
+ def _inner(cls):
+ drainers[name] = cls
+ return cls
+ return _inner
+
+
+@register_drainer('default')
+class Drainer(object):
+ """Result draining service."""
+
+ def __init__(self, result_consumer):
+ self.result_consumer = result_consumer
+
+ def start(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def drain_events_until(self, p, timeout=None, on_interval=None, wait=None):
+ wait = wait or self.result_consumer.drain_events
+ time_start = monotonic()
+
+ while 1:
+ # Total time spent may exceed a single call to wait()
+ if timeout and monotonic() - time_start >= timeout:
+ raise socket.timeout()
+ try:
+ yield self.wait_for(p, wait, timeout=1)
+ except socket.timeout:
+ pass
+ if on_interval:
+ on_interval()
+ if p.ready: # got event on the wanted channel.
+ break
+
+ def wait_for(self, p, wait, timeout=None):
+ wait(timeout=timeout)
+
+
+class greenletDrainer(Drainer):
+ spawn = None
+ _g = None
+
+ def __init__(self, *args, **kwargs):
+ super(greenletDrainer, self).__init__(*args, **kwargs)
+ self._started = threading.Event()
+ self._stopped = threading.Event()
+ self._shutdown = threading.Event()
+
+ def run(self):
+ self._started.set()
+ while not self._stopped.is_set():
+ try:
+ self.result_consumer.drain_events(timeout=1)
+ except socket.timeout:
+ pass
+ self._shutdown.set()
+
+ def start(self):
+ if not self._started.is_set():
+ self._g = self.spawn(self.run)
+ self._started.wait()
+
+ def stop(self):
+ self._stopped.set()
+ self._shutdown.wait(THREAD_TIMEOUT_MAX)
+
+ def wait_for(self, p, wait, timeout=None):
+ self.start()
+ if not p.ready:
+ sleep(0)
+
+
+@register_drainer('eventlet')
+class eventletDrainer(greenletDrainer):
+
+ @cached_property
+ def spawn(self):
+ from eventlet import spawn
+ return spawn
+
+
+@register_drainer('gevent')
+class geventDrainer(greenletDrainer):
+
+ @cached_property
+ def spawn(self):
+ from gevent import spawn
+ return spawn
+
+
+class AsyncBackendMixin(object):
+ """Mixin for backends that enables the async API."""
+
+ def _collect_into(self, result, bucket):
+ self.result_consumer.buckets[result] = bucket
+
+ def iter_native(self, result, no_ack=True, **kwargs):
+ self._ensure_not_eager()
+
+ results = result.results
+ if not results:
+ raise StopIteration()
+
+ # we tell the result consumer to put consumed results
+ # into these buckets.
+ bucket = deque()
+ for node in results:
+ if node._cache:
+ bucket.append(node)
+ else:
+ self._collect_into(node, bucket)
+
+ for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
+ while bucket:
+ node = bucket.popleft()
+ yield node.id, node._cache
+ while bucket:
+ node = bucket.popleft()
+ yield node.id, node._cache
+
+ def add_pending_result(self, result, weak=False, start_drainer=True):
+ if start_drainer:
+ self.result_consumer.drainer.start()
+ try:
+ self._maybe_resolve_from_buffer(result)
+ except Empty:
+ self._add_pending_result(result.id, result, weak=weak)
+ return result
+
+ def _maybe_resolve_from_buffer(self, result):
+ result._maybe_set_cache(self._pending_messages.take(result.id))
+
+ def _add_pending_result(self, task_id, result, weak=False):
+ concrete, weak_ = self._pending_results
+ if task_id not in weak_ and result.id not in concrete:
+ (weak_ if weak else concrete)[task_id] = result
+ self.result_consumer.consume_from(task_id)
+
+ def add_pending_results(self, results, weak=False):
+ self.result_consumer.drainer.start()
+ return [self.add_pending_result(result, weak=weak, start_drainer=False)
+ for result in results]
+
+ def remove_pending_result(self, result):
+ self._remove_pending_result(result.id)
+ self.on_result_fulfilled(result)
+ return result
+
+ def _remove_pending_result(self, task_id):
+ for map in self._pending_results:
+ map.pop(task_id, None)
+
+ def on_result_fulfilled(self, result):
+ self.result_consumer.cancel_for(result.id)
+
+ def wait_for_pending(self, result,
+ callback=None, propagate=True, **kwargs):
+ self._ensure_not_eager()
+ for _ in self._wait_for_pending(result, **kwargs):
+ pass
+ return result.maybe_throw(callback=callback, propagate=propagate)
+
+ def _wait_for_pending(self, result,
+ timeout=None, on_interval=None, on_message=None,
+ **kwargs):
+ return self.result_consumer._wait_for_pending(
+ result, timeout=timeout,
+ on_interval=on_interval, on_message=on_message,
+ )
+
+ @property
+ def is_async(self):
+ return True
+
+
+class BaseResultConsumer(object):
+ """Manager responsible for consuming result messages."""
+
+ def __init__(self, backend, app, accept,
+ pending_results, pending_messages):
+ self.backend = backend
+ self.app = app
+ self.accept = accept
+ self._pending_results = pending_results
+ self._pending_messages = pending_messages
+ self.on_message = None
+ self.buckets = WeakKeyDictionary()
+ self.drainer = drainers[detect_environment()](self)
+
+ def start(self, initial_task_id, **kwargs):
+ raise NotImplementedError()
+
+ def stop(self):
+ pass
+
+ def drain_events(self, timeout=None):
+ raise NotImplementedError()
+
+ def consume_from(self, task_id):
+ raise NotImplementedError()
+
+ def cancel_for(self, task_id):
+ raise NotImplementedError()
+
+ def _after_fork(self):
+ self.buckets.clear()
+ self.buckets = WeakKeyDictionary()
+ self.on_message = None
+ self.on_after_fork()
+
+ def on_after_fork(self):
+ pass
+
+ def drain_events_until(self, p, timeout=None, on_interval=None):
+ return self.drainer.drain_events_until(
+ p, timeout=timeout, on_interval=on_interval)
+
+ def _wait_for_pending(self, result,
+ timeout=None, on_interval=None, on_message=None,
+ **kwargs):
+ self.on_wait_for_pending(result, timeout=timeout, **kwargs)
+ prev_on_m, self.on_message = self.on_message, on_message
+ try:
+ for _ in self.drain_events_until(
+ result.on_ready, timeout=timeout,
+ on_interval=on_interval):
+ yield
+ sleep(0)
+ except socket.timeout:
+ raise TimeoutError('The operation timed out.')
+ finally:
+ self.on_message = prev_on_m
+
+ def on_wait_for_pending(self, result, timeout=None, **kwargs):
+ pass
+
+ def on_out_of_band_result(self, message):
+ self.on_state_change(message.payload, message)
+
+ def _get_pending_result(self, task_id):
+ for mapping in self._pending_results:
+ try:
+ return mapping[task_id]
+ except KeyError:
+ pass
+ raise KeyError(task_id)
+
+ def on_state_change(self, meta, message):
+ if self.on_message:
+ self.on_message(meta)
+ if meta['status'] in states.READY_STATES:
+ task_id = meta['task_id']
+ try:
+ result = self._get_pending_result(task_id)
+ except KeyError:
+ # send to buffer in case we received this result
+ # before it was added to _pending_results.
+ self._pending_messages.put(task_id, meta)
+ else:
+ result._maybe_set_cache(meta)
+ buckets = self.buckets
+ try:
+ # remove bucket for this result, since it's fulfilled
+ bucket = buckets.pop(result)
+ except KeyError:
+ pass
+ else:
+ # send to waiter via bucket
+ bucket.append(result)
+ sleep(0)
--- a/celery/backends/rpc.py
+++ b/celery/backends/rpc.py
@@ -17,7 +17,7 @@ from celery._state import current_task,
from celery.five import items, range
from . import base
-from .async import AsyncBackendMixin, BaseResultConsumer
+from .async_tools import AsyncBackendMixin, BaseResultConsumer
__all__ = ('BacklogLimitExceeded', 'RPCBackend')
--- a/celery/backends/redis.py
+++ b/celery/backends/redis.py
@@ -19,7 +19,7 @@ from celery.utils.functional import dict
from celery.utils.log import get_logger
from celery.utils.time import humanize_seconds
-from . import async, base
+from . import async_tools, base
try:
from urllib.parse import unquote
@@ -74,7 +74,7 @@ E_LOST = 'Connection to Redis lost: Retr
logger = get_logger(__name__)
-class ResultConsumer(async.BaseResultConsumer):
+class ResultConsumer(async_tools.BaseResultConsumer):
_pubsub = None
def __init__(self, *args, **kwargs):
@@ -138,7 +138,7 @@ class ResultConsumer(async.BaseResultCon
self._pubsub.unsubscribe(key)
-class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
+class RedisBackend(base.BaseKeyValueStoreBackend, async_tools.AsyncBackendMixin):
"""Redis task result store."""
ResultConsumer = ResultConsumer
--- a/t/unit/backends/test_redis.py
+++ b/t/unit/backends/test_redis.py
@@ -146,7 +146,7 @@ class test_RedisResultConsumer:
def get_consumer(self):
return self.get_backend().result_consumer
- @patch('celery.backends.async.BaseResultConsumer.on_after_fork')
+ @patch('celery.backends.async_tools.BaseResultConsumer.on_after_fork')
def test_on_after_fork(self, parent_method):
consumer = self.get_consumer()
consumer.start('none')
@@ -172,7 +172,7 @@ class test_RedisResultConsumer:
parent_method.assert_called_once()
@patch('celery.backends.redis.ResultConsumer.cancel_for')
- @patch('celery.backends.async.BaseResultConsumer.on_state_change')
+ @patch('celery.backends.async_tools.BaseResultConsumer.on_state_change')
def test_on_state_change(self, parent_method, cancel_for):
consumer = self.get_consumer()
meta = {'task_id': 'testing', 'status': states.SUCCESS}
--- a/celery/app/routes.py
+++ b/celery/app/routes.py
@@ -20,6 +20,9 @@ from celery.utils.imports import symbol_
__all__ = ('MapRoute', 'Router', 'prepare')
+if not hasattr(re, "_pattern_type"):
+ setattr(re, '_pattern_type', re.Pattern)
+
def glob_to_re(glob, quote=string.punctuation.replace('*', '')):
glob = ''.join('\\' + c if c in quote else c for c in glob)
return glob.replace('*', '.+?')
--- a/celery/security/certificate.py
+++ b/celery/security/certificate.py
@@ -43,7 +43,7 @@ class Certificate(object):
def verify(self, data, signature, digest):
"""Verify signature for string containing data."""
with reraise_errors('Bad signature: {0!r}'):
- crypto.verify(self._cert, signature, data, digest)
+ crypto.verify(self._cert, signature, data.encode('utf-8'), digest)
class CertStore(object):
--- a/t/unit/worker/test_loops.py
+++ b/t/unit/worker/test_loops.py
@@ -372,8 +372,8 @@ class test_asynloop:
x = X(self.app)
def Gen():
- raise StopIteration()
- yield
+ if 0:
+ yield
gen = Gen()
x.hub.add_writer(6, gen)
x.hub.on_tick.add(x.close_then_error(Mock(name='tick'), 2))

View File

@@ -1,3 +1,10 @@
-------------------------------------------------------------------
Thu Feb 21 10:48:26 UTC 2019 - John Vandenberg <jayvdb@gmail.com>
- Replace no-async.patch with three Python 3.7 patches merged upstream
python37-1.patch, python37-2.patch & python37-3.patch
- Replace sed invocation with unpin-pytest.patch for clarity
-------------------------------------------------------------------
Thu Feb 21 09:44:59 UTC 2019 - John Vandenberg <jayvdb@gmail.com>

View File

@@ -27,8 +27,12 @@ URL: http://celeryproject.org
Source: https://files.pythonhosted.org/packages/source/c/celery/celery-%{version}.tar.gz
Patch0: disable-pytest-log-capturing.patch
Patch1: celery-no-redis.patch
Patch2: no-async.patch
Patch2: unpin-pytest.patch
Patch3: relax-billiard-pin.patch
# Upstream patches for Python 3.7 support
Patch4: python37-1.patch
Patch5: python37-2.patch
Patch6: python37-3.patch
BuildRequires: %{python_module SQLAlchemy}
BuildRequires: %{python_module billiard >= 3.5.0.2}
BuildRequires: %{python_module case >= 1.3.1}
@@ -67,7 +71,6 @@ scheduling as well.
%prep
%setup -q -n celery-%{version}
%autopatch -p1
sed -i -e 's:,<3.3::g' requirements/test.txt
%build
%python_build

120
python37-1.patch Normal file
View File

@@ -0,0 +1,120 @@
From e7002769211f7340f38df80b3112706a8e07cafb Mon Sep 17 00:00:00 2001
From: Asif Saifuddin Auvi <auvipy@gmail.com>
Date: Mon, 9 Jul 2018 11:33:36 +0600
Subject: [PATCH] Python 3.7 compat issues (#4852)
* renamed banckend.async to asynchronous
* adjust redis imports of async
* adjust imports of async
* import style adjust
* renamed doc from async to asynchronous
* renamed doc contents from async to asynchronous
---
celery/backends/{async.py => asynchronous.py} | 0
celery/backends/redis.py | 7 ++++---
celery/backends/rpc.py | 2 +-
...backends.async.rst => celery.backends.asynchronous.rst} | 6 +++---
t/unit/backends/test_redis.py | 4 ++--
5 files changed, 10 insertions(+), 9 deletions(-)
rename celery/backends/{async.py => asynchronous.py} (100%)
rename docs/internals/reference/{celery.backends.async.rst => celery.backends.asynchronous.rst} (52%)
diff --git a/celery/backends/async.py b/celery/backends/asynchronous.py
similarity index 100%
rename from celery/backends/async.py
rename to celery/backends/asynchronous.py
diff --git a/celery/backends/redis.py b/celery/backends/redis.py
index 012db0f36e..6c311d8273 100644
--- a/celery/backends/redis.py
+++ b/celery/backends/redis.py
@@ -19,7 +19,8 @@
from celery.utils.log import get_logger
from celery.utils.time import humanize_seconds
-from . import async, base
+from .asynchronous import AsyncBackendMixin, BaseResultConsumer
+from .base import BaseKeyValueStoreBackend
try:
from urllib.parse import unquote
@@ -74,7 +75,7 @@
logger = get_logger(__name__)
-class ResultConsumer(async.BaseResultConsumer):
+class ResultConsumer(BaseResultConsumer):
_pubsub = None
def __init__(self, *args, **kwargs):
@@ -138,7 +139,7 @@ def cancel_for(self, task_id):
self._pubsub.unsubscribe(key)
-class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
+class RedisBackend(BaseKeyValueStoreBackend, AsyncBackendMixin):
"""Redis task result store."""
ResultConsumer = ResultConsumer
diff --git a/celery/backends/rpc.py b/celery/backends/rpc.py
index 6e31cef75e..5e6e407ce6 100644
--- a/celery/backends/rpc.py
+++ b/celery/backends/rpc.py
@@ -17,7 +17,7 @@
from celery.five import items, range
from . import base
-from .async import AsyncBackendMixin, BaseResultConsumer
+from .asynchronous import AsyncBackendMixin, BaseResultConsumer
__all__ = ('BacklogLimitExceeded', 'RPCBackend')
diff --git a/docs/internals/reference/celery.backends.async.rst b/docs/internals/reference/celery.backends.asynchronous.rst
similarity index 52%
rename from docs/internals/reference/celery.backends.async.rst
rename to docs/internals/reference/celery.backends.asynchronous.rst
index 03d10feb33..fef524294e 100644
--- a/docs/internals/reference/celery.backends.async.rst
+++ b/docs/internals/reference/celery.backends.asynchronous.rst
@@ -1,12 +1,12 @@
=====================================
- ``celery.backends.async``
+ ``celery.backends.asynchronous``
=====================================
.. contents::
:local:
-.. currentmodule:: celery.backends.async
+.. currentmodule:: celery.backends.asynchronous
-.. automodule:: celery.backends.async
+.. automodule:: celery.backends.asynchronous
:members:
:undoc-members:
diff --git a/t/unit/backends/test_redis.py b/t/unit/backends/test_redis.py
index 166aa0dc34..6a7dbbd501 100644
--- a/t/unit/backends/test_redis.py
+++ b/t/unit/backends/test_redis.py
@@ -146,7 +146,7 @@ class _RedisBackend(RedisBackend):
def get_consumer(self):
return self.get_backend().result_consumer
- @patch('celery.backends.async.BaseResultConsumer.on_after_fork')
+ @patch('celery.backends.asynchronous.BaseResultConsumer.on_after_fork')
def test_on_after_fork(self, parent_method):
consumer = self.get_consumer()
consumer.start('none')
@@ -172,7 +172,7 @@ def test_on_after_fork(self, parent_method):
parent_method.assert_called_once()
@patch('celery.backends.redis.ResultConsumer.cancel_for')
- @patch('celery.backends.async.BaseResultConsumer.on_state_change')
+ @patch('celery.backends.asynchronous.BaseResultConsumer.on_state_change')
def test_on_state_change(self, parent_method, cancel_for):
consumer = self.get_consumer()
meta = {'task_id': 'testing', 'status': states.SUCCESS}

58
python37-2.patch Normal file
View File

@@ -0,0 +1,58 @@
From 1c3a15938d0b9dde674d4666689d6a6c733d64e4 Mon Sep 17 00:00:00 2001
From: kidoz <ckidoz@gmail.com>
Date: Thu, 12 Jul 2018 20:02:10 +0300
Subject: [PATCH] Added compatibility with python 3.7 (#4902)
---
celery/app/routes.py | 8 +++++++-
t/unit/app/test_routes.py | 5 +++++
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/celery/app/routes.py b/celery/app/routes.py
index 9957a4feae..dc06eb988e 100644
--- a/celery/app/routes.py
+++ b/celery/app/routes.py
@@ -17,6 +17,12 @@
from celery.utils.functional import maybe_evaluate, mlazy
from celery.utils.imports import symbol_by_name
+try:
+ Pattern = re._pattern_type
+except AttributeError: # pragma: no cover
+ # for support Python 3.7
+ Pattern = re.Pattern
+
__all__ = ('MapRoute', 'Router', 'prepare')
@@ -33,7 +39,7 @@ def __init__(self, map):
self.map = {}
self.patterns = OrderedDict()
for k, v in map:
- if isinstance(k, re._pattern_type):
+ if isinstance(k, Pattern):
self.patterns[k] = v
elif '*' in k:
self.patterns[re.compile(glob_to_re(k))] = v
diff --git a/t/unit/app/test_routes.py b/t/unit/app/test_routes.py
index 8d3eac0417..5ed8c53b1c 100644
--- a/t/unit/app/test_routes.py
+++ b/t/unit/app/test_routes.py
@@ -78,12 +78,17 @@ def test_route_for_task(self):
assert route('celery.awesome') is None
def test_route_for_task__glob(self):
+ from re import compile
+
route = routes.MapRoute([
('proj.tasks.*', 'routeA'),
('demoapp.tasks.bar.*', {'exchange': 'routeB'}),
+ (compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
])
assert route('proj.tasks.foo') == {'queue': 'routeA'}
assert route('demoapp.tasks.bar.moo') == {'exchange': 'routeB'}
+ assert route('video.tasks.foo') == {'queue': 'media'}
+ assert route('image.tasks.foo') == {'queue': 'media'}
assert route('demoapp.foo.bar.moo') is None
def test_expand_route_not_found(self):

28
python37-3.patch Normal file
View File

@@ -0,0 +1,28 @@
From 6ab775eb6b5643311af21557fa84d10ce605eb17 Mon Sep 17 00:00:00 2001
From: Omer Katz <omer.drow@gmail.com>
Date: Fri, 4 Jan 2019 07:12:10 +0200
Subject: [PATCH] Avoid raising StopIterator in generators. (#5263)
According to [PEP-479](https://www.python.org/dev/peps/pep-0479/) StopIteration should not be used any more to indicate the termination of a generator.
Starting from Python 3.7 this behaviour is always enforced and a RuntimeError is raised instead.
Instead of raising a StopIterator exception, we now never execute our yield statement.
Since it is present Gen is still a generator but it will never yield any value.
---
t/unit/worker/test_loops.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/t/unit/worker/test_loops.py b/t/unit/worker/test_loops.py
index f86f730f16..d57b416e47 100644
--- a/t/unit/worker/test_loops.py
+++ b/t/unit/worker/test_loops.py
@@ -383,8 +383,8 @@ def test_poll_write_generator_stopped(self):
x = X(self.app)
def Gen():
- raise StopIteration()
- yield
+ if 0:
+ yield
gen = Gen()
x.hub.add_writer(6, gen)
x.hub.on_tick.add(x.close_then_error(Mock(name='tick'), 2))

6
unpin-pytest.patch Normal file
View File

@@ -0,0 +1,6 @@
--- celery-4.2.1/requirements/test.txt.orig 2019-02-21 17:43:53.252577134 +0700
+++ celery-4.2.1/requirements/test.txt 2019-02-21 17:44:02.860644766 +0700
@@ -1,2 +1,2 @@
case>=1.3.1
-pytest>=3.0,<3.3
+pytest>=3.0