diff --git a/5709-avoid-deadlock-ActorFuture.patch b/5709-avoid-deadlock-ActorFuture.patch deleted file mode 100644 index 1eb1683..0000000 --- a/5709-avoid-deadlock-ActorFuture.patch +++ /dev/null @@ -1,468 +0,0 @@ -From 9f54f73be83fd0b3e897d0c077f3132000c57336 Mon Sep 17 00:00:00 2001 -From: Thomas Grainger -Date: Wed, 26 Jan 2022 15:30:16 +0000 -Subject: [PATCH 01/10] avoid deadlock in ActorFuture - -fixes #5708 -fixes #5350 ---- - distributed/__init__.py | 2 - distributed/actor.py | 161 ++++++++++++++++++++++++++++++---------- - distributed/client.py | 4 - distributed/tests/test_actor.py | 95 +++++++++++++++++++---- - docs/source/actors.rst | 8 - - 5 files changed, 209 insertions(+), 61 deletions(-) - ---- a/distributed/__init__.py -+++ b/distributed/__init__.py -@@ -4,7 +4,7 @@ import dask - from dask.config import config # type: ignore - - from ._version import get_versions --from .actor import Actor, ActorFuture -+from .actor import Actor, BaseActorFuture - from .client import ( - Client, - CompatibleExecutor, ---- a/distributed/actor.py -+++ b/distributed/actor.py -@@ -1,6 +1,15 @@ -+from __future__ import annotations -+ -+import abc - import asyncio - import functools -+import sys - import threading -+from dataclasses import dataclass -+from datetime import timedelta -+from typing import TYPE_CHECKING, Generic, NoReturn, TypeVar -+ -+from tornado.ioloop import IOLoop - - from .client import Future - from .protocol import to_serialize -@@ -8,13 +17,52 @@ from .utils import iscoroutinefunction, - from .utils_comm import WrappedKey - from .worker import get_client, get_worker - -+_T = TypeVar("_T") -+ -+if sys.version_info >= (3, 9): -+ from collections.abc import Awaitable, Generator -+else: -+ from typing import Awaitable, Generator -+ -+if sys.version_info >= (3, 8): -+ from typing import Literal -+elif TYPE_CHECKING: -+ from typing_extensions import Literal -+ -+if sys.version_info >= (3, 10): -+ from asyncio import Event as _LateLoopEvent -+else: -+ # In 
python 3.10 asyncio.Lock and other primitives no longer support -+ # passing a loop kwarg to bind to a loop running in another thread -+ # e.g. calling from Client(asynchronous=False). Instead the loop is bound -+ # as late as possible: when calling any methods that wait on or wake -+ # Future instances. See: https://bugs.python.org/issue42392 -+ class _LateLoopEvent: -+ def __init__(self) -> None: -+ self._event: asyncio.Event | None = None -+ -+ def set(self) -> None: -+ if self._event is None: -+ self._event = asyncio.Event() -+ -+ self._event.set() -+ -+ def is_set(self) -> bool: -+ return self._event is not None and self._event.is_set() -+ -+ async def wait(self) -> bool: -+ if self._event is None: -+ self._event = asyncio.Event() -+ -+ return await self._event.wait() -+ - - class Actor(WrappedKey): - """Controls an object on a remote worker - - An actor allows remote control of a stateful object living on a remote - worker. Method calls on this object trigger operations on the remote -- object and return ActorFutures on which we can block to get results. -+ object and return BaseActorFutures on which we can block to get results. - - Examples - -------- -@@ -36,7 +84,7 @@ class Actor(WrappedKey): - >>> counter - - -- Calling methods on this object immediately returns deferred ``ActorFuture`` -+ Calling methods on this object immediately returns deferred ``BaseActorFuture`` - objects. You can call ``.result()`` on these objects to block and get the - result of the function call. 
- -@@ -140,9 +188,7 @@ class Actor(WrappedKey): - return attr - - elif callable(attr): -- return lambda *args, **kwargs: ActorFuture( -- None, self._io_loop, result=attr(*args, **kwargs) -- ) -+ return lambda *args, **kwargs: EagerActorFuture(attr(*args, **kwargs)) - else: - return attr - -@@ -166,16 +212,17 @@ class Actor(WrappedKey): - return await run_actor_function_on_worker() - else: - raise OSError("Unable to contact Actor's worker") -- return result -+ if result["status"] == "OK": -+ return _OK(result["result"]) -+ return _Error(result["exception"]) - -- q = asyncio.Queue(loop=self._io_loop.asyncio_loop) -+ actor_future = ActorFuture(io_loop=self._io_loop) - - async def wait_then_add_to_queue(): -- x = await run_actor_function_on_worker() -- await q.put(x) -+ actor_future._set_result(await run_actor_function_on_worker()) - - self._io_loop.add_callback(wait_then_add_to_queue) -- return ActorFuture(q, self._io_loop) -+ return actor_future - - return func - -@@ -215,10 +262,10 @@ class ProxyRPC: - return func - - --class ActorFuture: -+class BaseActorFuture(abc.ABC, Awaitable[_T]): - """Future to an actor's method call - -- Whenever you call a method on an Actor you get an ActorFuture immediately -+ Whenever you call a method on an Actor you get a BaseActorFuture immediately - while the computation happens in the background. You can call ``.result`` - to block and collect the full result - -@@ -227,34 +274,72 @@ class ActorFuture: - Actor - """ - -- def __init__(self, q, io_loop, result=None): -- self.q = q -- self.io_loop = io_loop -- if result: -- self._cached_result = result -- self.status = "pending" -+ @abc.abstractmethod -+ def result(self, timeout: str | timedelta | float | None = None) -> _T: -+ ... -+ -+ @abc.abstractmethod -+ def done(self) -> bool: -+ ... 
- -- def __await__(self): -- return self._result().__await__() -+ def __repr__(self) -> Literal[""]: -+ return "" - -- def done(self): -- return self.status != "pending" - -- async def _result(self, raiseit=True): -- if not hasattr(self, "_cached_result"): -- out = await self.q.get() -- if out["status"] == "OK": -- self.status = "finished" -- self._cached_result = out["result"] -- else: -- self.status = "error" -- self._cached_result = out["exception"] -- if self.status == "error": -- raise self._cached_result -- return self._cached_result -+@dataclass(frozen=True, eq=False) -+class EagerActorFuture(BaseActorFuture[_T]): -+ """Future to an actor's method call when an actor calls another actor on the same worker""" - -- def result(self, timeout=None): -- return sync(self.io_loop, self._result, callback_timeout=timeout) -+ _result: _T - -- def __repr__(self): -- return "" -+ def __await__(self) -> Generator[object, None, _T]: -+ return self._result -+ yield -+ -+ def result(self, timeout: object = None) -> _T: -+ return self._result -+ -+ def done(self) -> Literal[True]: -+ return True -+ -+ -+@dataclass(frozen=True, eq=False) -+class _OK(Generic[_T]): -+ _v: _T -+ -+ def unwrap(self) -> _T: -+ return self._v -+ -+ -+@dataclass(frozen=True, eq=False) -+class _Error: -+ _e: Exception -+ -+ def unwrap(self) -> NoReturn: -+ raise self._e -+ -+ -+class ActorFuture(BaseActorFuture[_T]): -+ def __init__(self, io_loop: IOLoop): -+ self._io_loop = io_loop -+ self._event = _LateLoopEvent() -+ self._out: _Error | _OK[_T] | None = None -+ -+ def __await__(self) -> Generator[object, None, _T]: -+ return self._result().__await__() -+ -+ def done(self) -> bool: -+ return self._event.is_set() -+ -+ async def _result(self) -> _T: -+ await self._event.wait() -+ out = self._out -+ assert out is not None -+ return out.unwrap() -+ -+ def _set_result(self, out: _Error | _OK[_T]) -> None: -+ self._out = out -+ self._event.set() -+ -+ def result(self, timeout: str | timedelta | float | 
None = None) -> _T: -+ return sync(self._io_loop, self._result, callback_timeout=timeout) ---- a/distributed/client.py -+++ b/distributed/client.py -@@ -4943,11 +4943,11 @@ class as_completed: - """Add multiple futures to the collection. - - The added futures will emit from the iterator once they finish""" -- from .actor import ActorFuture -+ from .actor import BaseActorFuture - - with self.lock: - for f in futures: -- if not isinstance(f, (Future, ActorFuture)): -+ if not isinstance(f, (Future, BaseActorFuture)): - raise TypeError("Input must be a future, got %s" % f) - self.futures[f] += 1 - self.loop.add_callback(self._track_future, f) ---- a/distributed/tests/test_actor.py -+++ b/distributed/tests/test_actor.py -@@ -8,7 +8,7 @@ import dask - - from distributed import ( - Actor, -- ActorFuture, -+ BaseActorFuture, - Client, - Future, - Nanny, -@@ -16,6 +16,7 @@ from distributed import ( - get_client, - wait, - ) -+from distributed.actor import _LateLoopEvent - from distributed.metrics import time - from distributed.utils_test import cluster, gen_cluster - -@@ -39,16 +40,6 @@ class Counter: - return self.n - - --class UsesCounter: -- # An actor whose method argument is another actor -- -- def do_inc(self, ac): -- return ac.increment().result() -- -- async def ado_inc(self, ac): -- return await ac.ainc() -- -- - class List: - L: list = [] - -@@ -113,7 +104,7 @@ async def test_worker_actions(c, s, a, b - assert counter._address == a_address - - future = counter.increment(separate_thread=separate_thread) -- assert isinstance(future, ActorFuture) -+ assert isinstance(future, BaseActorFuture) - assert "Future" in type(future).__name__ - end = future.result(timeout=1) - assert end > start -@@ -266,6 +257,27 @@ def test_sync(client): - assert "distributed.actor" not in repr(future) - - -+def test_timeout(client): -+ class Waiter: -+ def __init__(self): -+ self.event = _LateLoopEvent() -+ -+ async def set(self): -+ self.event.set() -+ -+ async def wait(self): -+ return 
await self.event.wait() -+ -+ event = client.submit(Waiter, actor=True).result() -+ future = event.wait() -+ -+ with pytest.raises(asyncio.TimeoutError): -+ future.result(timeout="0.001s") -+ -+ event.set().result() -+ assert future.result() is True -+ -+ - @gen_cluster(client=True, config={"distributed.comm.timeouts.connect": "1s"}) - async def test_failed_worker(c, s, a, b): - future = c.submit(Counter, actor=True, workers=[a.address]) -@@ -538,11 +550,9 @@ async def test_actors_in_profile(c, s, a - - @gen_cluster(client=True) - async def test_waiter(c, s, a, b): -- from tornado.locks import Event -- - class Waiter: - def __init__(self): -- self.event = Event() -+ self.event = _LateLoopEvent() - - async def set(self): - self.event.set() -@@ -618,6 +628,42 @@ def test_worker_actor_handle_is_weakref_ - - - def test_one_thread_deadlock(): -+ class UsesCounter: -+ # An actor whose method argument is another actor -+ -+ def do_inc(self, ac): -+ return ac.increment().result() -+ -+ with cluster(nworkers=2) as (cl, w): -+ client = Client(cl["address"]) -+ ac = client.submit(Counter, actor=True).result() -+ ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result() -+ -+ assert ac2.do_inc(ac).result() == 1 -+ -+ -+def test_one_thread_deadlock_timeout(): -+ class UsesCounter: -+ # An actor whose method argument is another actor -+ -+ def do_inc(self, ac): -+ return ac.increment().result(timeout=1) -+ -+ with cluster(nworkers=2) as (cl, w): -+ client = Client(cl["address"]) -+ ac = client.submit(Counter, actor=True).result() -+ ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result() -+ -+ assert ac2.do_inc(ac).result() == 1 -+ -+ -+def test_one_thread_deadlock_sync_client(): -+ class UsesCounter: -+ # An actor whose method argument is another actor -+ -+ def do_inc(self, ac): -+ return get_client().sync(ac.increment) -+ - with cluster(nworkers=2) as (cl, w): - client = Client(cl["address"]) - ac = client.submit(Counter, 
actor=True).result() -@@ -628,6 +674,12 @@ def test_one_thread_deadlock(): - - @gen_cluster(client=True) - async def test_async_deadlock(client, s, a, b): -+ class UsesCounter: -+ # An actor whose method argument is another actor -+ -+ async def ado_inc(self, ac): -+ return await ac.ainc() -+ - ac = await client.submit(Counter, actor=True) - ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address]) - -@@ -698,7 +750,7 @@ async def test_actor_future_awaitable(cl - ac = await client.submit(Counter, actor=True) - futures = [ac.increment() for _ in range(10)] - -- assert all([isinstance(future, ActorFuture) for future in futures]) -+ assert all([isinstance(future, BaseActorFuture) for future in futures]) - - out = await asyncio.gather(*futures) - assert all([future.done() for future in futures]) -@@ -706,6 +758,17 @@ async def test_actor_future_awaitable(cl - - - @gen_cluster(client=True) -+async def test_actor_future_awaitable_deadlock(client, s, a, b): -+ ac = await client.submit(Counter, actor=True) -+ f = ac.increment() -+ -+ async def coro(): -+ return await f -+ -+ assert await asyncio.gather(coro(), coro()) == [1, 1] -+ -+ -+@gen_cluster(client=True) - async def test_serialize_with_pickle(c, s, a, b): - class Foo: - def __init__(self): ---- a/docs/source/actors.rst -+++ b/docs/source/actors.rst -@@ -115,15 +115,15 @@ However accessing an attribute or callin - to the remote worker, run the method on the remote worker in a separate thread - pool, and then communicate the result back to the calling side. For attribute - access these operations block and return when finished, for method calls they --return an ``ActorFuture`` immediately. -+return an ``BaseActorFuture`` immediately. - - .. 
code-block:: python - -- >>> future = counter.increment() # Immediately returns an ActorFuture -+ >>> future = counter.increment() # Immediately returns a BaseActorFuture - >>> future.result() # Block until finished and result arrives - 1 - --``ActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully -+``BaseActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully - featured. They curently *only* support the ``result`` method and nothing else. - They don't currently work with any other Dask functions that expect futures, - like ``as_completed``, ``wait``, or ``client.gather``. They can't be placed -@@ -167,7 +167,7 @@ workers have only a single thread for ac - future. - - The result is sent back immediately to the calling side, and is not stored on --the worker with the actor. It is cached on the ``ActorFuture`` object. -+the worker with the actor. It is cached on the ``BaseActorFuture`` object. - - - Calling from coroutines and async/await diff --git a/_multibuild b/_multibuild index 9bbe6ad..73ab5e8 100644 --- a/_multibuild +++ b/_multibuild @@ -1,5 +1,5 @@ test-py38 test-py39 - + test-py310 diff --git a/distributed-2022.01.1-gh.tar.gz b/distributed-2022.01.1-gh.tar.gz deleted file mode 100644 index 8d4a5d3..0000000 --- a/distributed-2022.01.1-gh.tar.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:c5a110cad409e77782fa62e7759819633856b21a2cfba98690bf62bd86ddaa57 -size 1576749 diff --git a/distributed-2022.02.1-gh.tar.gz b/distributed-2022.02.1-gh.tar.gz new file mode 100644 index 0000000..e21a4e7 --- /dev/null +++ b/distributed-2022.02.1-gh.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d015e781efad256fa32ae36b4278252dfbe20b1cfd7cb51bf0f44349cfcb816f +size 1606067 diff --git a/python-distributed.changes b/python-distributed.changes index d19978c..2086ff3 100644 --- a/python-distributed.changes +++ b/python-distributed.changes @@ -1,3 +1,81 @@ 
+------------------------------------------------------------------- +Tue Mar 8 07:46:52 UTC 2022 - Matej Cepl + +- Update to 2022.02.1: + Add the ability for Client to run preload code + Optionally use NumPy to allocate buffers + Add git hash to distributed-impl version + Immediately raise exception when trying to connect to a closed cluster + Lazily get dask version information + Remove the requirements to add comm to every handler + Raise on unclosed comms in check_instances + Constrained spill + Remove redundant str() conversions + Cluster dump now excludes run_spec by default + Dump more objects with dump_cluster_state + Do not connect to any sockets on import + Avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture + Drop Python 3.7 + Remove support for UCX < 1.11.1 + Document and test spill->target hysteresis cycle + Fix flaky test_remove_replicas_while_computing + Fix time based test_assert_worker_story_malformed_story parameterize + Remove xfail from test_no_unnecessary_imports_on_worker + Start building pre-releases with cythonized scheduler + Do not mark tests xfail if they don't come up in time + Use gen_cluster where possible in test_dask_worker.py + Generate junit report when pytest-timeout kills pytest + Decrease timeout-minutes for GHA jobs + Bump pre-release version to be greater than stable releases + Do not run schedule jobs on forks + Remove pillow<9 pin in CI + Show scheduled test runs in report + Add obvious exclusions with pragma statement + Add coverage exclusions for cli files + Add pragma statements + Remove pragma: no cover from distributed.cli.dask_ssh + Add pragma - worker.py, client.py, stealing.py + Relax distributed / dask-core dependencies for pre-releases + Remove test_ucx_config_w_env_var flaky condition +- Update to 2022.02.0: + Update client.scheduler_info in wait_for_workers + Increase robustness to TimeoutError during connect + Respect KeyboardInterrupt in sync + Add workflow / recipe to generate 
Dask/distributed pre-releases + Review Scheduler / Worker display repr + AMM: Graceful Worker Retirement + AMM: tentatively stabilize flaky tests around worker pause + AMM: speed up and stabilize test_memory + Defer pandas import on worker in P2P shuffle + Fix for distributed.worker.memory.target=False and spill=0.7 + Transition flight to missing if no who_has + Remove deprecated ncores + Deprecate registering plugins by class + Deprecate --nprocs option for dask-worker CLI + Fix imbalanced backticks + xfail test_worker_reconnects_mid_compute + Fix linting CI build + Update pre-commit versions + Reactivate pytest_resourceleaks + Set test assumption for test_client_timeout + Remove client timeout from test_ucx_config_w_env_var + Remove test_failed_worker_without_warning + Fix longitudinal report + Fix flaky test_robust_to_bad_sizeof_estimates + Revert "Pin coverage to 6.2" + Trigger test runs periodically to increase failure statistics + More fault tolerant test report + Pin pillow<9 to work around torch incompatibility + Overhaul check_process_leak + Fix flaky test_exit_callback test + Generate tests summary + Upload different architectured pre-releases separately + Ignore non-test directories + Bump gpuCI PYTHON_VER to 3.9 + Regression: threads noted down before they start +- Remove upstreamed patches: + - 5709-avoid-deadlock-ActorFuture.patch + ------------------------------------------------------------------- Mon Feb 14 15:12:44 UTC 2022 - Matej Cepl @@ -5,7 +83,6 @@ Mon Feb 14 15:12:44 UTC 2022 - Matej Cepl ActorFuture (gh#dask/distributed#5709).
------------------------------------------------------------------- - Sat Jan 29 17:34:23 UTC 2022 - Ben Greiner - Update to version 2022.1.1 diff --git a/python-distributed.spec b/python-distributed.spec index 94d3ca7..f8935ea 100644 --- a/python-distributed.spec +++ b/python-distributed.spec @@ -32,14 +32,14 @@ %endif %if "%{flavor}" == "test-py310" # add to _multibuild when enabling python310 (see below) -%define psuffix -test-py310" +%define psuffix -test-py310 %define skip_python38 1 %define skip_python39 1 %bcond_without test %endif %if "%{flavor}" == "" # https://github.com/dask/distributed/issues/5350 -- NOT fixed by https://github.com/dask/distributed/pull/5353 -%define skip_python310 1 +# %%define skip_python310 1 %bcond_with test %endif @@ -55,19 +55,15 @@ %{?!python_module:%define python_module() python3-%{**}} %define skip_python2 1 -%define ghversiontag 2022.01.1 Name: python-distributed%{psuffix} # Note: please always update together with python-dask -Version: 2022.1.1 +Version: 2022.02.1 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause URL: https://distributed.readthedocs.io/en/latest/ -Source: https://github.com/dask/distributed/archive/refs/tags//%{ghversiontag}.tar.gz#/distributed-%{ghversiontag}-gh.tar.gz +Source: https://github.com/dask/distributed/archive/refs/tags//%{version}.tar.gz#/distributed-%{version}-gh.tar.gz Source99: python-distributed-rpmlintrc -# PATCH-FIX-UPSTREAM 5709-avoid-deadlock-ActorFuture.patch gh#dask/distributed#5709 mcepl@suse.com -# avoid deadlock in ActorFuture -Patch0: 5709-avoid-deadlock-ActorFuture.patch BuildRequires: %{python_module base >= 3.7} BuildRequires: %{python_module setuptools} BuildRequires: fdupes @@ -125,7 +121,7 @@ extends both the concurrent.futures and dask APIs to moderate sized clusters. 
%prep -%autosetup -p1 -n distributed-%{ghversiontag} +%autosetup -p1 -n distributed-%{version} sed -i '/addopts/ {s/--durations=20//; s/--color=yes//}' setup.cfg @@ -176,7 +172,8 @@ fi %python_alternative %{_bindir}/dask-scheduler %python_alternative %{_bindir}/dask-worker %{python_sitearch}/distributed -%{python_sitearch}/distributed-%{version}*-info +%{python_sitearch}/distributed-%(echo %{version}|sed -e 's/\.0/./')*-info + %endif %changelog