15
0
forked from pool/python-celery

- Update to 5.3.6:

* Update task.py get_custom_headers missing 'compression' key
  * Basic ElasticSearch / ElasticClient 8.x Support
  * Fix eager tasks does not populate name field
  * Fix: serialization error when gossip working
  * Stamping bugfix with group/chord header errback linking
  * Fixed issue: Flags broker_connection_retry_on_startup &
    broker_connection_retry aren’t reliable
  * Use string value for CELERY_SKIP_CHECKS envvar
  * Added initial support for python 3.11
  * Fixed a small float value of retry_backoff
  * Update CELERY_TASK_EAGER setting in user guide
  * Fixed bug where retrying a task loses its stamps
  * Warn when an unnamed periodic task override another one.
  * Fix Task.handle_ignore not wrapping exceptions properly
  * Align revoke and revoke_by_stamped_headers return values (terminate=True)
  * Added signal triggered before fork
  * Deprecate pytz and use zoneinfo
  * recreate_module: set spec to the new module
  * Do not recommend using easy_install anymore
  * importlib_metadata remove deprecated entry point interfaces
  * New control command revoke_by_stamped_headers
  * Remove reference to old Python version
  * Stamping a task left the task properties dirty
  * Bugfix for nested group stamping on task replace
  * Add broker_channel_error_retry option
  * async chords should pass it's kwargs to the group/body.
  * Fix honor Django's TIME_ZONE setting.
  * Remove Python 3.4 compatibility code.
  * Use importlib instead of deprecated pkg_resources.

OBS-URL: https://build.opensuse.org/package/show/devel:languages:python/python-celery?expand=0&rev=156
This commit is contained in:
2024-02-09 03:10:11 +00:00
committed by Git OBS Bridge
parent b71e079312
commit c893ced983
10 changed files with 137 additions and 500 deletions

View File

@@ -1,141 +0,0 @@
From 879af6341974c3778077d8212d78f093b2d77a4f Mon Sep 17 00:00:00 2001
From: Tomer Nosrati <tomer.nosrati@kcg.tech>
Date: Tue, 4 Oct 2022 02:06:50 +0300
Subject: [PATCH] Fixed error handling bugs due to upgrade to a newer version
of billiard
---
celery/app/task.py | 4 +++-
celery/worker/request.py | 19 ++++++++++++++-----
t/unit/utils/test_collections.py | 4 ++--
t/unit/worker/test_request.py | 8 ++++----
4 files changed, 23 insertions(+), 12 deletions(-)
diff --git a/celery/app/task.py b/celery/app/task.py
index 212bc772e01..d6108fbef8c 100644
--- a/celery/app/task.py
+++ b/celery/app/task.py
@@ -1,7 +1,7 @@
"""Task implementation: request context and the task base class."""
import sys
-from billiard.einfo import ExceptionInfo
+from billiard.einfo import ExceptionInfo, ExceptionWithTraceback
from kombu import serialization
from kombu.exceptions import OperationalError
from kombu.utils.uuid import uuid
@@ -813,6 +813,8 @@ def apply(self, args=None, kwargs=None,
retval = ret.retval
if isinstance(retval, ExceptionInfo):
retval, tb = retval.exception, retval.traceback
+ if isinstance(retval, ExceptionWithTraceback):
+ retval = retval.exc
if isinstance(retval, Retry) and retval.sig is not None:
return retval.sig.apply(retries=retries + 1)
state = states.SUCCESS if ret.info is None else ret.info.state
diff --git a/celery/worker/request.py b/celery/worker/request.py
index d89971468c6..d0004a19ccc 100644
--- a/celery/worker/request.py
+++ b/celery/worker/request.py
@@ -10,6 +10,7 @@
from weakref import ref
from billiard.common import TERM_SIGNAME
+from billiard.einfo import ExceptionWithTraceback
from kombu.utils.encoding import safe_repr, safe_str
from kombu.utils.objects import cached_property
@@ -511,8 +512,11 @@ def on_success(self, failed__retval__runtime, **kwargs):
"""Handler called if the task was successfully processed."""
failed, retval, runtime = failed__retval__runtime
if failed:
- if isinstance(retval.exception, (SystemExit, KeyboardInterrupt)):
- raise retval.exception
+ exc = retval.exception
+ if isinstance(exc, ExceptionWithTraceback):
+ exc = exc.exc
+ if isinstance(exc, (SystemExit, KeyboardInterrupt)):
+ raise exc
return self.on_failure(retval, return_ok=True)
task_ready(self, successful=True)
@@ -535,6 +539,9 @@ def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
task_ready(self)
exc = exc_info.exception
+ if isinstance(exc, ExceptionWithTraceback):
+ exc = exc.exc
+
is_terminated = isinstance(exc, Terminated)
if is_terminated:
# If the task was terminated and the task was not cancelled due
@@ -735,9 +742,11 @@ def execute_using_pool(self, pool, **kwargs):
def on_success(self, failed__retval__runtime, **kwargs):
failed, retval, runtime = failed__retval__runtime
if failed:
- if isinstance(retval.exception, (
- SystemExit, KeyboardInterrupt)):
- raise retval.exception
+ exc = retval.exception
+ if isinstance(exc, ExceptionWithTraceback):
+ exc = exc.exc
+ if isinstance(exc, (SystemExit, KeyboardInterrupt)):
+ raise exc
return self.on_failure(retval, return_ok=True)
task_ready(self)
diff --git a/t/unit/utils/test_collections.py b/t/unit/utils/test_collections.py
index ce776cebf1a..aae685ebc7c 100644
--- a/t/unit/utils/test_collections.py
+++ b/t/unit/utils/test_collections.py
@@ -145,8 +145,8 @@ def test_exception_info(self):
except Exception:
einfo = ExceptionInfo()
assert str(einfo) == einfo.traceback
- assert isinstance(einfo.exception, LookupError)
- assert einfo.exception.args == ('The quick brown fox jumps...',)
+ assert isinstance(einfo.exception.exc, LookupError)
+ assert einfo.exception.exc.args == ('The quick brown fox jumps...',)
assert einfo.traceback
assert repr(einfo)
diff --git a/t/unit/worker/test_request.py b/t/unit/worker/test_request.py
index a34f70dc80d..b818f2837cc 100644
--- a/t/unit/worker/test_request.py
+++ b/t/unit/worker/test_request.py
@@ -155,7 +155,7 @@ def test_execute_jail_failure(self):
self.app, uuid(), self.mytask_raising.name, {}, [4], {},
)
assert isinstance(ret, ExceptionInfo)
- assert ret.exception.args == (4,)
+ assert ret.exception.exc.args == (4,)
def test_execute_task_ignore_result(self):
@self.app.task(shared=False, ignore_result=True)
@@ -385,7 +385,7 @@ def test_on_failure_WorkerLostError_redelivered_True(self):
task_failure,
sender=req.task,
task_id=req.id,
- exception=einfo.exception,
+ exception=einfo.exception.exc,
args=req.args,
kwargs=req.kwargs,
traceback=einfo.traceback,
@@ -394,7 +394,7 @@ def test_on_failure_WorkerLostError_redelivered_True(self):
req.on_failure(einfo)
req.task.backend.mark_as_failure.assert_called_once_with(req.id,
- einfo.exception,
+ einfo.exception.exc,
request=req._context,
store_result=True)
@@ -807,7 +807,7 @@ def test_from_message_invalid_kwargs(self):
m = self.TaskMessage(self.mytask.name, args=(), kwargs='foo')
req = Request(m, app=self.app)
with pytest.raises(InvalidTaskError):
- raise req.execute().exception
+ raise req.execute().exception.exc
def test_on_hard_timeout_acks_late(self, patching):
error = patching('celery.worker.request.error')

View File

@@ -1,34 +0,0 @@
From b260860988469ef8ad74f2d4225839c2fa91d590 Mon Sep 17 00:00:00 2001
From: Omer Katz <omer.katz@kcg.tech>
Date: Sat, 9 Apr 2022 13:27:58 +0300
Subject: [PATCH] Avoid importing buf_t from billiard's compat module as it was
removed.
buf_t was a compatibility layer for 2.7, it's no longer needed so it was removed from billiard.
We should adjust the code in Celery as well.
---
celery/concurrency/asynpool.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/celery/concurrency/asynpool.py b/celery/concurrency/asynpool.py
index b9f2875a261..489336936c1 100644
--- a/celery/concurrency/asynpool.py
+++ b/celery/concurrency/asynpool.py
@@ -26,7 +26,7 @@
from weakref import WeakValueDictionary, ref
from billiard import pool as _pool
-from billiard.compat import buf_t, isblocking, setblocking
+from billiard.compat import isblocking, setblocking
from billiard.pool import ACK, NACK, RUN, TERMINATE, WorkersJoined
from billiard.queues import _SimpleQueue
from kombu.asynchronous import ERR, WRITE
@@ -868,7 +868,7 @@ def send_job(tup):
header = pack('>I', body_size)
# index 1,0 is the job ID.
job = get_job(tup[1][0])
- job._payload = buf_t(header), buf_t(body), body_size
+ job._payload = memoryview(header), memoryview(body), body_size
put_message(job)
self._quick_put = send_job

View File

@@ -1,3 +0,0 @@
version https://git-lfs.github.com/spec/v1
oid sha256:fafbd82934d30f8a004f81e8f7a062e31413a23d444be8ee3326553915958c6d
size 1474243

3
celery-5.3.6.tar.gz Normal file
View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:870cc71d737c0200c397290d730344cc991d13a057534353d124c9380267aab9
size 1544498

View File

@@ -1,7 +1,7 @@
Index: celery-5.2.6/celery/contrib/pytest.py
Index: celery-5.3.6/celery/contrib/pytest.py
===================================================================
--- celery-5.2.6.orig/celery/contrib/pytest.py
+++ celery-5.2.6/celery/contrib/pytest.py
--- celery-5.3.6.orig/celery/contrib/pytest.py
+++ celery-5.3.6/celery/contrib/pytest.py
@@ -19,16 +19,6 @@ NO_WORKER = os.environ.get('NO_WORKER')
# Well, they're called fixtures....
@@ -19,11 +19,11 @@ Index: celery-5.2.6/celery/contrib/pytest.py
@contextmanager
def _create_app(enable_logging=False,
use_trap=False,
Index: celery-5.2.6/t/unit/conftest.py
Index: celery-5.3.6/t/unit/conftest.py
===================================================================
--- celery-5.2.6.orig/t/unit/conftest.py
+++ celery-5.2.6/t/unit/conftest.py
@@ -64,6 +64,16 @@ class WhateverIO(io.StringIO):
--- celery-5.3.6.orig/t/unit/conftest.py
+++ celery-5.3.6/t/unit/conftest.py
@@ -56,6 +56,16 @@ class WhateverIO(io.StringIO):
_SIO_write(self, data.decode() if isinstance(data, bytes) else data)

View File

@@ -1,3 +1,47 @@
-------------------------------------------------------------------
Fri Feb 9 03:09:55 UTC 2024 - Steve Kowalik <steven.kowalik@suse.com>
- Update to 5.3.6:
* Update task.py get_custom_headers missing 'compression' key
* Basic ElasticSearch / ElasticClient 8.x Support
* Fix eager tasks does not populate name field
* Fix: serialization error when gossip working
* Stamping bugfix with group/chord header errback linking
* Fixed issue: Flags broker_connection_retry_on_startup &
broker_connection_retry arent reliable
* Use string value for CELERY_SKIP_CHECKS envvar
* Added initial support for python 3.11
* Fixed a small float value of retry_backoff
* Update CELERY_TASK_EAGER setting in user guide
* Fixed bug where retrying a task loses its stamps
* Warn when an unnamed periodic task override another one.
* Fix Task.handle_ignore not wrapping exceptions properly
* Align revoke and revoke_by_stamped_headers return values (terminate=True)
* Added signal triggered before fork
* Deprecate pytz and use zoneinfo
* recreate_module: set spec to the new module
* Do not recommend using easy_install anymore
* importlib_metadata remove deprecated entry point interfaces
* New control command revoke_by_stamped_headers
* Remove reference to old Python version
* Stamping a task left the task properties dirty
* Bugfix for nested group stamping on task replace
* Add broker_channel_error_retry option
* async chords should pass it's kwargs to the group/body.
* Fix honor Django's TIME_ZONE setting.
* Remove Python 3.4 compatibility code.
* Use importlib instead of deprecated pkg_resources.
* load_extension_class_names - correct module_name
* Include dont_autoretry_for option in tasks.
- Drop patches, included upstream:
* 879af6341974c3778077d8212d78f093b2d77a4f.patch
* b260860988469ef8ad74f2d4225839c2fa91d590.patch
* sqlalchemy-2.0.patch
* tests.patch
- Refreshed move-pytest-configuration-to-conftest.patch
- Add patch support-moto-5.patch:
* Support changes in moto 5.
-------------------------------------------------------------------
Sun Jun 11 13:26:40 UTC 2023 - ecsos <ecsos@opensuse.org>

View File

@@ -1,7 +1,7 @@
#
# spec file
# spec file for package python-celery
#
# Copyright (c) 2023 SUSE LLC
# Copyright (c) 2024 SUSE LLC
#
# All modifications and additions to the file contributed by third parties
# remain the property of their copyright owners, unless otherwise agreed
@@ -28,19 +28,15 @@
%bcond_with ringdisabled
%{?sle15_python_module_pythons}
Name: python-celery%{psuffix}
Version: 5.2.7
Version: 5.3.6
Release: 0
Summary: Distributed Task Queue module for Python
License: BSD-3-Clause
URL: http://celeryproject.org
Source: https://files.pythonhosted.org/packages/source/c/celery/celery-%{version}.tar.gz
Patch0: move-pytest-configuration-to-conftest.patch
Patch1: tests.patch
# PATCH-FIX-UPSTREAM compatibility with newer billiard
Patch2: https://github.com/celery/celery/commit/b260860988469ef8ad74f2d4225839c2fa91d590.patch
Patch3: https://github.com/celery/celery/commit/879af6341974c3778077d8212d78f093b2d77a4f.patch
# PATCH-FIX-UPSTREAM sqlalchemy-2.0.patch -- gh#celery/celery#8271
Patch4: sqlalchemy-2.0.patch
# PATCH-FIX-UPSTREAM gh#celery/celery#8838
Patch1: support-moto-5.patch
BuildRequires: %{python_module setuptools}
BuildRequires: fdupes
BuildRequires: netcfg
@@ -51,10 +47,11 @@ Requires: python-click-didyoumean >= 0.0.3
Requires: python-click-plugins >= 1.1.1
Requires: python-click-repl >= 0.2.0
Requires: python-kombu >= 5.2.3
Requires: python-pytz >= 2021.3
Requires: python-python-dateutil
Requires: python-tzdata
Requires: python-vine >= 5.0.0
Requires(post): update-alternatives
Requires(postun):update-alternatives
Requires(postun): update-alternatives
Recommends: python-cryptography
Recommends: python-curses
Suggests: python-eventlet
@@ -70,13 +67,16 @@ BuildRequires: %{python_module boto3 >= 1.9.178}
BuildRequires: %{python_module case >= 1.3.1}
BuildRequires: %{python_module celery = %{version}}
BuildRequires: %{python_module cryptography >= 36.0.2}
BuildRequires: %{python_module dbm}
BuildRequires: %{python_module eventlet >= 0.32.0}
BuildRequires: %{python_module gevent}
BuildRequires: %{python_module moto >= 2.2.6}
BuildRequires: %{python_module msgpack}
BuildRequires: %{python_module pymongo >= 4.0.2}
BuildRequires: %{python_module pytest >= 4.5.0}
BuildRequires: %{python_module pytest-click}
BuildRequires: %{python_module pytest-subtests}
BuildRequires: %{python_module tzdata}
%if %{with ringdisabled}
ExclusiveArch: do-not-build
%endif

View File

@@ -1,19 +0,0 @@
Index: celery-5.2.7/t/unit/backends/test_database.py
===================================================================
--- celery-5.2.7.orig/t/unit/backends/test_database.py
+++ celery-5.2.7/t/unit/backends/test_database.py
@@ -410,7 +410,13 @@ class test_SessionManager:
from sqlalchemy.dialects.sqlite import dialect
from sqlalchemy.exc import DatabaseError
- sqlite = dialect.dbapi()
+ if hasattr(dialect, 'dbapi'):
+ # Method name in SQLAlchemy < 2.0
+ sqlite = dialect.dbapi()
+ else:
+ # Newer method name in SQLAlchemy 2.0
+ sqlite = dialect.import_dbapi()
+
manager = SessionManager()
engine = manager.get_engine('dburi')

72
support-moto-5.patch Normal file
View File

@@ -0,0 +1,72 @@
commit 8ebab3d94de46dc7074ffade0aec50d739d36e26
Author: Steve Kowalik <steven@wedontsleep.org>
Date: Fri Feb 9 11:58:01 2024 +1100
Support moto 5.0
moto 5.0 has been released, and the major change is to pull all of the
seperate mock calls into one -- mock_aws. Continue to support moto 4,
since it's easy to do so.
Index: celery-5.3.6/t/unit/backends/test_s3.py
===================================================================
--- celery-5.3.6.orig/t/unit/backends/test_s3.py
+++ celery-5.3.6/t/unit/backends/test_s3.py
@@ -3,7 +3,11 @@ from unittest.mock import patch
import boto3
import pytest
from botocore.exceptions import ClientError
-from moto import mock_s3
+
+try:
+ from moto import mock_aws
+except ImportError:
+ from moto import mock_s3 as mock_aws
from celery import states
from celery.backends.s3 import S3Backend
@@ -84,7 +88,7 @@ class test_S3Backend:
's3', endpoint_url=endpoint_url)
@pytest.mark.parametrize("key", ['uuid', b'uuid'])
- @mock_s3
+ @mock_aws
def test_set_and_get_a_key(self, key):
self._mock_s3_resource()
@@ -97,7 +101,7 @@ class test_S3Backend:
assert s3_backend.get(key) == 'another_status'
- @mock_s3
+ @mock_aws
def test_set_and_get_a_result(self):
self._mock_s3_resource()
@@ -111,7 +115,7 @@ class test_S3Backend:
value = s3_backend.get_result('foo')
assert value == 'baar'
- @mock_s3
+ @mock_aws
def test_get_a_missing_key(self):
self._mock_s3_resource()
@@ -141,7 +145,7 @@ class test_S3Backend:
s3_backend.get('uuidddd')
@pytest.mark.parametrize("key", ['uuid', b'uuid'])
- @mock_s3
+ @mock_aws
def test_delete_a_key(self, key):
self._mock_s3_resource()
@@ -157,7 +161,7 @@ class test_S3Backend:
assert s3_backend.get(key) is None
- @mock_s3
+ @mock_aws
def test_with_a_non_existing_bucket(self):
self._mock_s3_resource()

View File

@@ -1,285 +0,0 @@
From 9e324caaa6b175d8e51d3582378b78757e66a12d Mon Sep 17 00:00:00 2001
From: dobosevych <dobosevych@users.noreply.github.com>
Date: Thu, 14 Apr 2022 18:22:33 +0300
Subject: [PATCH] Integration test fix (#7460)
* Integration debugging
* Integration debugging
* Integration debugging
* Commented tasks that aren't working
* Fixed test_inspect.py
* Fixed serialization test_canvas.py
* Request fixes
* Setup full pipeline
* Setup full pipeline
* Setup full pipeline
* Setup python-package.yml
* Setup python-package.yml
* Added 3.10 to integration tests
* test_task.py fixed
* test_generator fixed
* Added parametrization to test_generation
* fixed test_generator
* Reverted encoding in test_canvas.py
* Rollback codecov
* Retries now respect additional options.
Previously, expires and other options were not merged with
the current task's options. This commit fixes the issue.
Co-authored-by: Omer Katz <omer.katz@kcg.tech>
---
celery/app/task.py | 2 +-
celery/canvas.py | 13 +++++---
celery/contrib/pytest.py | 2 +-
celery/worker/request.py | 2 +-
requirements/test-integration.txt | 1 +
t/integration/tasks.py | 7 +++--
t/integration/test_canvas.py | 19 ++++++------
t/integration/test_tasks.py | 11 +++++--
9 files changed, 79 insertions(+), 24 deletions(-)
Index: celery-5.2.6/celery/app/task.py
===================================================================
--- celery-5.2.6.orig/celery/app/task.py
+++ celery-5.2.6/celery/app/task.py
@@ -605,7 +605,7 @@ class Task:
request = self.request if request is None else request
args = request.args if args is None else args
kwargs = request.kwargs if kwargs is None else kwargs
- options = request.as_execution_options()
+ options = {**request.as_execution_options(), **extra_options}
delivery_info = request.delivery_info or {}
priority = delivery_info.get('priority')
if priority is not None:
Index: celery-5.2.6/celery/canvas.py
===================================================================
--- celery-5.2.6.orig/celery/canvas.py
+++ celery-5.2.6/celery/canvas.py
@@ -26,7 +26,7 @@ from celery.utils import abstract
from celery.utils.collections import ChainMap
from celery.utils.functional import _regen
from celery.utils.functional import chunks as _chunks
-from celery.utils.functional import (is_list, lookahead, maybe_list, regen,
+from celery.utils.functional import (is_list, maybe_list, regen,
seq_concat_item, seq_concat_seq)
from celery.utils.objects import getitem_property
from celery.utils.text import remove_repeating_from_task, truncate
@@ -1184,9 +1184,11 @@ class group(Signature):
# next_task is None. This enables us to set the chord size
# without burning through the entire generator. See #3021.
chord_size = 0
- for task_index, (current_task, next_task) in enumerate(
- lookahead(tasks)
- ):
+ tasks_shifted, tasks = itertools.tee(tasks)
+ next(tasks_shifted, None)
+ next_task = next(tasks_shifted, None)
+
+ for task_index, current_task in enumerate(tasks):
# We expect that each task must be part of the same group which
# seems sensible enough. If that's somehow not the case we'll
# end up messing up chord counts and there are all sorts of
@@ -1212,6 +1214,7 @@ class group(Signature):
if p and not p.cancelled and not p.ready:
p.size += 1
res.then(p, weak=True)
+ next_task = next(tasks_shifted, None)
yield res # <-- r.parent, etc set in the frozen result.
def _freeze_gid(self, options):
@@ -1249,7 +1252,7 @@ class group(Signature):
# we freeze all tasks in the clone tasks1, and then zip the results
# with the IDs of tasks in the second clone, tasks2. and then, we build
# a generator that takes only the task IDs from tasks2.
- self.tasks = regen(x[0] for x in zip(tasks2, results))
+ self.tasks = regen(tasks2)
else:
new_tasks = []
# Need to unroll subgroups early so that chord gets the
Index: celery-5.2.6/celery/contrib/pytest.py
===================================================================
--- celery-5.2.6.orig/celery/contrib/pytest.py
+++ celery-5.2.6/celery/contrib/pytest.py
@@ -88,7 +88,7 @@ def celery_session_worker(
for module in celery_includes:
celery_session_app.loader.import_task_module(module)
for class_task in celery_class_tasks:
- celery_session_app.tasks.register(class_task)
+ celery_session_app.register_task(class_task)
with worker.start_worker(celery_session_app,
pool=celery_worker_pool,
**celery_worker_parameters) as w:
Index: celery-5.2.6/celery/worker/request.py
===================================================================
--- celery-5.2.6.orig/celery/worker/request.py
+++ celery-5.2.6/celery/worker/request.py
@@ -155,7 +155,7 @@ class Request:
'exchange': delivery_info.get('exchange'),
'routing_key': delivery_info.get('routing_key'),
'priority': properties.get('priority'),
- 'redelivered': delivery_info.get('redelivered'),
+ 'redelivered': delivery_info.get('redelivered', False),
}
self._request_dict.update({
'properties': properties,
Index: celery-5.2.6/requirements/test-integration.txt
===================================================================
--- celery-5.2.6.orig/requirements/test-integration.txt
+++ celery-5.2.6/requirements/test-integration.txt
@@ -3,3 +3,4 @@
-r extras/auth.txt
-r extras/memcache.txt
pytest-rerunfailures>=6.0
+git+https://github.com/celery/kombu.git
Index: celery-5.2.6/t/integration/tasks.py
===================================================================
--- celery-5.2.6.orig/t/integration/tasks.py
+++ celery-5.2.6/t/integration/tasks.py
@@ -197,16 +197,17 @@ def retry(self, return_value=None):
raise self.retry(exc=ExpectedException(), countdown=5)
-@shared_task(bind=True, expires=60.0, max_retries=1)
-def retry_once(self, *args, expires=60.0, max_retries=1, countdown=0.1):
+@shared_task(bind=True, expires=120.0, max_retries=1)
+def retry_once(self, *args, expires=None, max_retries=1, countdown=0.1):
"""Task that fails and is retried. Returns the number of retries."""
if self.request.retries:
return self.request.retries
raise self.retry(countdown=countdown,
+ expires=expires,
max_retries=max_retries)
-@shared_task(bind=True, expires=60.0, max_retries=1)
+@shared_task(bind=True, max_retries=1)
def retry_once_priority(self, *args, expires=60.0, max_retries=1,
countdown=0.1):
"""Task that fails and is retried. Returns the priority."""
Index: celery-5.2.6/t/integration/test_canvas.py
===================================================================
--- celery-5.2.6.orig/t/integration/test_canvas.py
+++ celery-5.2.6/t/integration/test_canvas.py
@@ -124,7 +124,7 @@ class test_link_error:
)
assert result.get(timeout=TIMEOUT, propagate=False) == exception
- @flaky
+ @pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout instead of returning exception")
def test_link_error_callback_retries(self):
exception = ExpectedException("Task expected to fail", "test")
result = fail.apply_async(
@@ -144,7 +144,7 @@ class test_link_error:
assert (fail.apply().get(timeout=TIMEOUT, propagate=False), True) == (
exception, True)
- @flaky
+ @pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout instead of returning exception")
def test_link_error_using_signature(self):
fail = signature('t.integration.tasks.fail', args=("test",))
retrun_exception = signature('t.integration.tasks.return_exception')
@@ -179,7 +179,7 @@ class test_chain:
res = c()
assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]
- @flaky
+ @pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout")
def test_group_results_in_chain(self, manager):
# This adds in an explicit test for the special case added in commit
# 1e3fcaa969de6ad32b52a3ed8e74281e5e5360e6
@@ -477,7 +477,7 @@ class test_chain:
res = c()
assert res.get(timeout=TIMEOUT) == [8, 8]
- @flaky
+ @pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout")
def test_nested_chain_group_lone(self, manager):
"""
Test that a lone group in a chain completes.
@@ -1233,7 +1233,7 @@ class test_chord:
result = c()
assert result.get(timeout=TIMEOUT) == 4
- @flaky
+ @pytest.mark.xfail(reason="async_results aren't performed in async way")
def test_redis_subscribed_channels_leak(self, manager):
if not manager.app.conf.result_backend.startswith('redis'):
raise pytest.skip('Requires redis result backend.')
@@ -1566,11 +1566,12 @@ class test_chord:
) == 1
@flaky
- def test_generator(self, manager):
+ @pytest.mark.parametrize('size', [3, 4, 5, 6, 7, 8, 9])
+ def test_generator(self, manager, size):
def assert_generator(file_name):
- for i in range(3):
+ for i in range(size):
sleep(1)
- if i == 2:
+ if i == size - 1:
with open(file_name) as file_handle:
# ensures chord header generators tasks are processed incrementally #3021
assert file_handle.readline() == '0\n', "Chord header was unrolled too early"
@@ -1579,7 +1580,7 @@ class test_chord:
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_file:
file_name = tmp_file.name
c = chord(assert_generator(file_name), tsum.s())
- assert c().get(timeout=TIMEOUT) == 3
+ assert c().get(timeout=TIMEOUT) == size * (size - 1) // 2
@flaky
def test_parallel_chords(self, manager):
Index: celery-5.2.6/t/integration/test_tasks.py
===================================================================
--- celery-5.2.6.orig/t/integration/test_tasks.py
+++ celery-5.2.6/t/integration/test_tasks.py
@@ -29,7 +29,7 @@ class test_class_based_tasks:
def test_class_based_task_retried(self, celery_session_app,
celery_session_worker):
task = ClassBasedAutoRetryTask()
- celery_session_app.tasks.register(task)
+ celery_session_app.register_task(task)
res = task.delay()
assert res.get(timeout=TIMEOUT) == 1
@@ -255,12 +255,17 @@ class test_tasks:
manager.assert_accepted([r1.id])
@flaky
- def test_task_retried(self):
+ def test_task_retried_once(self, manager):
res = retry_once.delay()
assert res.get(timeout=TIMEOUT) == 1 # retried once
@flaky
- def test_task_retried_priority(self):
+ def test_task_retried_once_with_expires(self, manager):
+ res = retry_once.delay(expires=60)
+ assert res.get(timeout=TIMEOUT) == 1 # retried once
+
+ @flaky
+ def test_task_retried_priority(self, manager):
res = retry_once_priority.apply_async(priority=7)
assert res.get(timeout=TIMEOUT) == 7 # retried once with priority 7