Accepting request 960175 from home:mcepl:branches:devel:languages:python:numeric

- Update to 2022.02.1:
    Add the ability for Client to run preload code
    Optionally use NumPy to allocate buffers
    Add git hash to distributed-impl version
    Immediately raise exception when trying to connect to a closed cluster
    Lazily get dask version information
    Remove the requirements to add comm to every handler
    Raise on unclosed comms in check_instances
    Constrained spill
    Remove redundant str() conversions
    Cluster dump now excludes run_spec by default
    Dump more objects with dump_cluster_state
    Do not connect to any sockets on import
    Avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture
    Drop Python 3.7
    Remove support for UCX < 1.11.1
    Document and test spill->target hysteresis cycle
    Fix flaky test_remove_replicas_while_computing
    Fix time based test_assert_worker_story_malformed_story parameterize
    Remove xfail from test_no_unnecessary_imports_on_worker
    Start building pre-releases with cythonized scheduler
    Do not mark tests xfail if they don't come up in time
    Use gen_cluster where possible in test_dask_worker.py
    Generate junit report when pytest-timeout kills pytest
    Decrease timeout-minutes for GHA jobs
    Bump pre-release version to be greater than stable releases
    Do not run schedule jobs on forks
    Remove pillow<9 pin in CI
    Show scheduled test runs in report
    Add obvious exclusions with pragma statement

OBS-URL: https://build.opensuse.org/request/show/960175
OBS-URL: https://build.opensuse.org/package/show/devel:languages:python:numeric/python-distributed?expand=0&rev=106
Matej Cepl 2022-03-08 10:24:12 +00:00 committed by Git OBS Bridge
parent b8e6adb4b0
commit d7688e0226
6 changed files with 89 additions and 483 deletions


@@ -1,468 +0,0 @@
From 9f54f73be83fd0b3e897d0c077f3132000c57336 Mon Sep 17 00:00:00 2001
From: Thomas Grainger <tagrain@gmail.com>
Date: Wed, 26 Jan 2022 15:30:16 +0000
Subject: [PATCH 01/10] avoid deadlock in ActorFuture
fixes #5708
fixes #5350
---
distributed/__init__.py | 2
distributed/actor.py | 161 ++++++++++++++++++++++++++++++----------
distributed/client.py | 4
distributed/tests/test_actor.py | 95 +++++++++++++++++++----
docs/source/actors.rst | 8 -
5 files changed, 209 insertions(+), 61 deletions(-)
--- a/distributed/__init__.py
+++ b/distributed/__init__.py
@@ -4,7 +4,7 @@ import dask
from dask.config import config # type: ignore
from ._version import get_versions
-from .actor import Actor, ActorFuture
+from .actor import Actor, BaseActorFuture
from .client import (
Client,
CompatibleExecutor,
--- a/distributed/actor.py
+++ b/distributed/actor.py
@@ -1,6 +1,15 @@
+from __future__ import annotations
+
+import abc
import asyncio
import functools
+import sys
import threading
+from dataclasses import dataclass
+from datetime import timedelta
+from typing import TYPE_CHECKING, Generic, NoReturn, TypeVar
+
+from tornado.ioloop import IOLoop
from .client import Future
from .protocol import to_serialize
@@ -8,13 +17,52 @@ from .utils import iscoroutinefunction,
from .utils_comm import WrappedKey
from .worker import get_client, get_worker
+_T = TypeVar("_T")
+
+if sys.version_info >= (3, 9):
+ from collections.abc import Awaitable, Generator
+else:
+ from typing import Awaitable, Generator
+
+if sys.version_info >= (3, 8):
+ from typing import Literal
+elif TYPE_CHECKING:
+ from typing_extensions import Literal
+
+if sys.version_info >= (3, 10):
+ from asyncio import Event as _LateLoopEvent
+else:
+ # In python 3.10 asyncio.Lock and other primitives no longer support
+ # passing a loop kwarg to bind to a loop running in another thread
+ # e.g. calling from Client(asynchronous=False). Instead the loop is bound
+ # as late as possible: when calling any methods that wait on or wake
+ # Future instances. See: https://bugs.python.org/issue42392
+ class _LateLoopEvent:
+ def __init__(self) -> None:
+ self._event: asyncio.Event | None = None
+
+ def set(self) -> None:
+ if self._event is None:
+ self._event = asyncio.Event()
+
+ self._event.set()
+
+ def is_set(self) -> bool:
+ return self._event is not None and self._event.is_set()
+
+ async def wait(self) -> bool:
+ if self._event is None:
+ self._event = asyncio.Event()
+
+ return await self._event.wait()
+
class Actor(WrappedKey):
"""Controls an object on a remote worker
An actor allows remote control of a stateful object living on a remote
worker. Method calls on this object trigger operations on the remote
- object and return ActorFutures on which we can block to get results.
+ object and return BaseActorFutures on which we can block to get results.
Examples
--------
@@ -36,7 +84,7 @@ class Actor(WrappedKey):
>>> counter
<Actor: Counter, key=Counter-1234abcd>
- Calling methods on this object immediately returns deferred ``ActorFuture``
+ Calling methods on this object immediately returns deferred ``BaseActorFuture``
objects. You can call ``.result()`` on these objects to block and get the
result of the function call.
@@ -140,9 +188,7 @@ class Actor(WrappedKey):
return attr
elif callable(attr):
- return lambda *args, **kwargs: ActorFuture(
- None, self._io_loop, result=attr(*args, **kwargs)
- )
+ return lambda *args, **kwargs: EagerActorFuture(attr(*args, **kwargs))
else:
return attr
@@ -166,16 +212,17 @@ class Actor(WrappedKey):
return await run_actor_function_on_worker()
else:
raise OSError("Unable to contact Actor's worker")
- return result
+ if result["status"] == "OK":
+ return _OK(result["result"])
+ return _Error(result["exception"])
- q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
+ actor_future = ActorFuture(io_loop=self._io_loop)
async def wait_then_add_to_queue():
- x = await run_actor_function_on_worker()
- await q.put(x)
+ actor_future._set_result(await run_actor_function_on_worker())
self._io_loop.add_callback(wait_then_add_to_queue)
- return ActorFuture(q, self._io_loop)
+ return actor_future
return func
@@ -215,10 +262,10 @@ class ProxyRPC:
return func
-class ActorFuture:
+class BaseActorFuture(abc.ABC, Awaitable[_T]):
"""Future to an actor's method call
- Whenever you call a method on an Actor you get an ActorFuture immediately
+ Whenever you call a method on an Actor you get a BaseActorFuture immediately
while the computation happens in the background. You can call ``.result``
to block and collect the full result
@@ -227,34 +274,72 @@ class ActorFuture:
Actor
"""
- def __init__(self, q, io_loop, result=None):
- self.q = q
- self.io_loop = io_loop
- if result:
- self._cached_result = result
- self.status = "pending"
+ @abc.abstractmethod
+ def result(self, timeout: str | timedelta | float | None = None) -> _T:
+ ...
+
+ @abc.abstractmethod
+ def done(self) -> bool:
+ ...
- def __await__(self):
- return self._result().__await__()
+ def __repr__(self) -> Literal["<ActorFuture>"]:
+ return "<ActorFuture>"
- def done(self):
- return self.status != "pending"
- async def _result(self, raiseit=True):
- if not hasattr(self, "_cached_result"):
- out = await self.q.get()
- if out["status"] == "OK":
- self.status = "finished"
- self._cached_result = out["result"]
- else:
- self.status = "error"
- self._cached_result = out["exception"]
- if self.status == "error":
- raise self._cached_result
- return self._cached_result
+@dataclass(frozen=True, eq=False)
+class EagerActorFuture(BaseActorFuture[_T]):
+ """Future to an actor's method call when an actor calls another actor on the same worker"""
- def result(self, timeout=None):
- return sync(self.io_loop, self._result, callback_timeout=timeout)
+ _result: _T
- def __repr__(self):
- return "<ActorFuture>"
+ def __await__(self) -> Generator[object, None, _T]:
+ return self._result
+ yield
+
+ def result(self, timeout: object = None) -> _T:
+ return self._result
+
+ def done(self) -> Literal[True]:
+ return True
+
+
+@dataclass(frozen=True, eq=False)
+class _OK(Generic[_T]):
+ _v: _T
+
+ def unwrap(self) -> _T:
+ return self._v
+
+
+@dataclass(frozen=True, eq=False)
+class _Error:
+ _e: Exception
+
+ def unwrap(self) -> NoReturn:
+ raise self._e
+
+
+class ActorFuture(BaseActorFuture[_T]):
+ def __init__(self, io_loop: IOLoop):
+ self._io_loop = io_loop
+ self._event = _LateLoopEvent()
+ self._out: _Error | _OK[_T] | None = None
+
+ def __await__(self) -> Generator[object, None, _T]:
+ return self._result().__await__()
+
+ def done(self) -> bool:
+ return self._event.is_set()
+
+ async def _result(self) -> _T:
+ await self._event.wait()
+ out = self._out
+ assert out is not None
+ return out.unwrap()
+
+ def _set_result(self, out: _Error | _OK[_T]) -> None:
+ self._out = out
+ self._event.set()
+
+ def result(self, timeout: str | timedelta | float | None = None) -> _T:
+ return sync(self._io_loop, self._result, callback_timeout=timeout)
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -4943,11 +4943,11 @@ class as_completed:
"""Add multiple futures to the collection.
The added futures will emit from the iterator once they finish"""
- from .actor import ActorFuture
+ from .actor import BaseActorFuture
with self.lock:
for f in futures:
- if not isinstance(f, (Future, ActorFuture)):
+ if not isinstance(f, (Future, BaseActorFuture)):
raise TypeError("Input must be a future, got %s" % f)
self.futures[f] += 1
self.loop.add_callback(self._track_future, f)
--- a/distributed/tests/test_actor.py
+++ b/distributed/tests/test_actor.py
@@ -8,7 +8,7 @@ import dask
from distributed import (
Actor,
- ActorFuture,
+ BaseActorFuture,
Client,
Future,
Nanny,
@@ -16,6 +16,7 @@ from distributed import (
get_client,
wait,
)
+from distributed.actor import _LateLoopEvent
from distributed.metrics import time
from distributed.utils_test import cluster, gen_cluster
@@ -39,16 +40,6 @@ class Counter:
return self.n
-class UsesCounter:
- # An actor whose method argument is another actor
-
- def do_inc(self, ac):
- return ac.increment().result()
-
- async def ado_inc(self, ac):
- return await ac.ainc()
-
-
class List:
L: list = []
@@ -113,7 +104,7 @@ async def test_worker_actions(c, s, a, b
assert counter._address == a_address
future = counter.increment(separate_thread=separate_thread)
- assert isinstance(future, ActorFuture)
+ assert isinstance(future, BaseActorFuture)
assert "Future" in type(future).__name__
end = future.result(timeout=1)
assert end > start
@@ -266,6 +257,27 @@ def test_sync(client):
assert "distributed.actor" not in repr(future)
+def test_timeout(client):
+ class Waiter:
+ def __init__(self):
+ self.event = _LateLoopEvent()
+
+ async def set(self):
+ self.event.set()
+
+ async def wait(self):
+ return await self.event.wait()
+
+ event = client.submit(Waiter, actor=True).result()
+ future = event.wait()
+
+ with pytest.raises(asyncio.TimeoutError):
+ future.result(timeout="0.001s")
+
+ event.set().result()
+ assert future.result() is True
+
+
@gen_cluster(client=True, config={"distributed.comm.timeouts.connect": "1s"})
async def test_failed_worker(c, s, a, b):
future = c.submit(Counter, actor=True, workers=[a.address])
@@ -538,11 +550,9 @@ async def test_actors_in_profile(c, s, a
@gen_cluster(client=True)
async def test_waiter(c, s, a, b):
- from tornado.locks import Event
-
class Waiter:
def __init__(self):
- self.event = Event()
+ self.event = _LateLoopEvent()
async def set(self):
self.event.set()
@@ -618,6 +628,42 @@ def test_worker_actor_handle_is_weakref_
def test_one_thread_deadlock():
+ class UsesCounter:
+ # An actor whose method argument is another actor
+
+ def do_inc(self, ac):
+ return ac.increment().result()
+
+ with cluster(nworkers=2) as (cl, w):
+ client = Client(cl["address"])
+ ac = client.submit(Counter, actor=True).result()
+ ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result()
+
+ assert ac2.do_inc(ac).result() == 1
+
+
+def test_one_thread_deadlock_timeout():
+ class UsesCounter:
+ # An actor whose method argument is another actor
+
+ def do_inc(self, ac):
+ return ac.increment().result(timeout=1)
+
+ with cluster(nworkers=2) as (cl, w):
+ client = Client(cl["address"])
+ ac = client.submit(Counter, actor=True).result()
+ ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result()
+
+ assert ac2.do_inc(ac).result() == 1
+
+
+def test_one_thread_deadlock_sync_client():
+ class UsesCounter:
+ # An actor whose method argument is another actor
+
+ def do_inc(self, ac):
+ return get_client().sync(ac.increment)
+
with cluster(nworkers=2) as (cl, w):
client = Client(cl["address"])
ac = client.submit(Counter, actor=True).result()
@@ -628,6 +674,12 @@ def test_one_thread_deadlock():
@gen_cluster(client=True)
async def test_async_deadlock(client, s, a, b):
+ class UsesCounter:
+ # An actor whose method argument is another actor
+
+ async def ado_inc(self, ac):
+ return await ac.ainc()
+
ac = await client.submit(Counter, actor=True)
ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address])
@@ -698,7 +750,7 @@ async def test_actor_future_awaitable(cl
ac = await client.submit(Counter, actor=True)
futures = [ac.increment() for _ in range(10)]
- assert all([isinstance(future, ActorFuture) for future in futures])
+ assert all([isinstance(future, BaseActorFuture) for future in futures])
out = await asyncio.gather(*futures)
assert all([future.done() for future in futures])
@@ -706,6 +758,17 @@ async def test_actor_future_awaitable(cl
@gen_cluster(client=True)
+async def test_actor_future_awaitable_deadlock(client, s, a, b):
+ ac = await client.submit(Counter, actor=True)
+ f = ac.increment()
+
+ async def coro():
+ return await f
+
+ assert await asyncio.gather(coro(), coro()) == [1, 1]
+
+
+@gen_cluster(client=True)
async def test_serialize_with_pickle(c, s, a, b):
class Foo:
def __init__(self):
--- a/docs/source/actors.rst
+++ b/docs/source/actors.rst
@@ -115,15 +115,15 @@ However accessing an attribute or callin
to the remote worker, run the method on the remote worker in a separate thread
pool, and then communicate the result back to the calling side. For attribute
access these operations block and return when finished, for method calls they
-return an ``ActorFuture`` immediately.
+return an ``BaseActorFuture`` immediately.
.. code-block:: python
- >>> future = counter.increment() # Immediately returns an ActorFuture
+ >>> future = counter.increment() # Immediately returns a BaseActorFuture
>>> future.result() # Block until finished and result arrives
1
-``ActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully
+``BaseActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully
featured. They curently *only* support the ``result`` method and nothing else.
They don't currently work with any other Dask functions that expect futures,
like ``as_completed``, ``wait``, or ``client.gather``. They can't be placed
@@ -167,7 +167,7 @@ workers have only a single thread for ac
future.
The result is sent back immediately to the calling side, and is not stored on
-the worker with the actor. It is cached on the ``ActorFuture`` object.
+the worker with the actor. It is cached on the ``BaseActorFuture`` object.
Calling from coroutines and async/await
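
The changelog entry "Avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture" corresponds to the patch above. As a rough illustration (not part of this changeset), the following sketch mirrors the test_actor_future_awaitable_deadlock test added by the patch; the in-process cluster (processes=False) and the toy Counter class are assumptions made for a self-contained example, not code from this commit:

    import asyncio

    from distributed import Client


    class Counter:
        # Toy actor, modelled on the Counter used in the patch's test suite.
        n = 0

        def increment(self):
            self.n += 1
            return self.n


    async def main():
        # processes=False keeps the workers in-process, so the locally defined
        # Counter class does not have to be importable by worker processes.
        async with Client(asynchronous=True, processes=False) as client:
            counter = await client.submit(Counter, actor=True)
            future = counter.increment()

            async def waiter():
                return await future

            # Before this patch, two tasks awaiting the same unresolved
            # ActorFuture could deadlock; with it, both resolve to 1.
            assert await asyncio.gather(waiter(), waiter()) == [1, 1]


    asyncio.run(main())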


@@ -1,5 +1,5 @@
<multibuild>
<package>test-py38</package>
<package>test-py39</package>
<!-- package>test-py310</package -->
<package>test-py310</package>
</multibuild>


@@ -1,3 +0,0 @@
version https://git-lfs.github.com/spec/v1
oid sha256:c5a110cad409e77782fa62e7759819633856b21a2cfba98690bf62bd86ddaa57
size 1576749


@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:d015e781efad256fa32ae36b4278252dfbe20b1cfd7cb51bf0f44349cfcb816f
size 1606067


@@ -1,3 +1,81 @@
-------------------------------------------------------------------
Tue Mar 8 07:46:52 UTC 2022 - Matej Cepl <mcepl@suse.com>
- Update to 2022.02.1:
Add the ability for Client to run preload code
Optionally use NumPy to allocate buffers
Add git hash to distributed-impl version
Immediately raise exception when trying to connect to a closed cluster
Lazily get dask version information
Remove the requirements to add comm to every handler
Raise on unclosed comms in check_instances
Constrained spill
Remove redundant str() conversions
Cluster dump now excludes run_spec by default
Dump more objects with dump_cluster_state
Do not connect to any sockets on import
Avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture
Drop Python 3.7
Remove support for UCX < 1.11.1
Document and test spill->target hysteresis cycle
Fix flaky test_remove_replicas_while_computing
Fix time based test_assert_worker_story_malformed_story parameterize
Remove xfail from test_no_unnecessary_imports_on_worker
Start building pre-releases with cythonized scheduler
Do not mark tests xfail if they don't come up in time
Use gen_cluster where possible in test_dask_worker.py
Generate junit report when pytest-timeout kills pytest
Decrease timeout-minutes for GHA jobs
Bump pre-release version to be greater than stable releases
Do not run schedule jobs on forks
Remove pillow<9 pin in CI
Show scheduled test runs in report
Add obvious exclusions with pragma statement
Add coverage exclusions for cli files
Add pragma statements
Remove pragma: no cover from distributed.cli.dask_ssh
Add pragma - worker.py, client.py, stealing.py
Relax distributed / dask-core dependencies for pre-releases
Remove test_ucx_config_w_env_var flaky condition
- Update to 2022.02.0:
Update client.scheduler_info in wait_for_workers
Increase robustness to TimeoutError during connect
Respect KeyboardInterrupt in sync
Add workflow / recipe to generate Dask/distributed pre-releases
Review Scheduler / Worker display repr
AMM: Graceful Worker Retirement
AMM: tentatively stabilize flaky tests around worker pause
AMM: speed up and stabilize test_memory
Defer pandas import on worker in P2P shuffle
Fix for distributed.worker.memory.target=False and spill=0.7
Transition flight to missing if no who_has
Remove deprecated ncores
Deprecate registering plugins by class
Deprecate --nprocs option for dask-worker CLI
Fix imbalanced backticks
xfail test_worker_reconnects_mid_compute
Fix linting CI build
Update pre-commit versions
Reactivate pytest_resourceleaks
Set test assumption for test_client_timeout
Remove client timeout from test_ucx_config_w_env_var
Remove test_failed_worker_without_warning
Fix longitudinal report
Fix flaky test_robust_to_bad_sizeof_estimates
Revert "Pin coverage to 6.2
Trigger test runs periodically to increases failure statistics
More fault tolerant test report
    Pin pillow<9 to work around torch incompatibility
Overhaul check_process_leak
Fix flaky test_exit_callback test
Generate tests summary
Upload different architectured pre-releases separately
Ignore non-test directories
Bump gpuCI PYTHON_VER to 3.9
Regression: threads noted down before they start
- Remove upstreamed patches:
  - 5709-avoid-deadlock-ActorFuture.patch
-------------------------------------------------------------------
Mon Feb 14 15:12:44 UTC 2022 - Matej Cepl <mcepl@suse.com>
@@ -5,7 +83,6 @@ Mon Feb 14 15:12:44 UTC 2022 - Matej Cepl <mcepl@suse.com>
ActorFuture (gh#dask/distributed#5709).
-------------------------------------------------------------------
Sat Jan 29 17:34:23 UTC 2022 - Ben Greiner <code@bnavigator.de>
- Update to version 2022.1.1


@@ -32,14 +32,14 @@
%endif
%if "%{flavor}" == "test-py310"
# add to _multibuild when enabling python310 (see below)
%define psuffix -test-py310"
%define psuffix -test-py310
%define skip_python38 1
%define skip_python39 1
%bcond_without test
%endif
%if "%{flavor}" == ""
# https://github.com/dask/distributed/issues/5350 -- NOT fixed by https://github.com/dask/distributed/pull/5353
%define skip_python310 1
# %%define skip_python310 1
%bcond_with test
%endif
@@ -55,19 +55,15 @@
%{?!python_module:%define python_module() python3-%{**}}
%define skip_python2 1
%define ghversiontag 2022.01.1
Name: python-distributed%{psuffix}
# Note: please always update together with python-dask
Version: 2022.1.1
Version: 2022.02.1
Release: 0
Summary: Library for distributed computing with Python
License: BSD-3-Clause
URL: https://distributed.readthedocs.io/en/latest/
Source: https://github.com/dask/distributed/archive/refs/tags//%{ghversiontag}.tar.gz#/distributed-%{ghversiontag}-gh.tar.gz
Source: https://github.com/dask/distributed/archive/refs/tags//%{version}.tar.gz#/distributed-%{version}-gh.tar.gz
Source99: python-distributed-rpmlintrc
# PATCH-FIX-UPSTREAM 5709-avoid-deadlock-ActorFuture.patch gh#dask/distributed#5709 mcepl@suse.com
# avoid deadlock in ActorFuture
Patch0: 5709-avoid-deadlock-ActorFuture.patch
BuildRequires: %{python_module base >= 3.7}
BuildRequires: %{python_module setuptools}
BuildRequires: fdupes
@@ -125,7 +121,7 @@ extends both the concurrent.futures and dask APIs to moderate sized
clusters.
%prep
%autosetup -p1 -n distributed-%{ghversiontag}
%autosetup -p1 -n distributed-%{version}
sed -i '/addopts/ {s/--durations=20//; s/--color=yes//}' setup.cfg
@@ -176,7 +172,8 @@ fi
%python_alternative %{_bindir}/dask-scheduler
%python_alternative %{_bindir}/dask-worker
%{python_sitearch}/distributed
%{python_sitearch}/distributed-%{version}*-info
%{python_sitearch}/distributed-%(echo %{version}|sed -e 's/\.0/./')*-info
%endif
%changelog