From d7688e02268c58dd7e7979b8cb8c6b70b55622dd491f0b743b0de04d6ea50a79 Mon Sep 17 00:00:00 2001 From: Matej Cepl Date: Tue, 8 Mar 2022 10:24:12 +0000 Subject: [PATCH 1/4] Accepting request 960175 from home:mcepl:branches:devel:languages:python:numeric - Update to 2022.02.1: Add the ability for Client to run preload code Optionally use NumPy to allocate buffers Add git hash to distributed-impl version Immediately raise exception when trying to connect to a closed cluster Lazily get dask version information Remove the requirements to add comm to every handler Raise on unclosed comms in check_instances Constrained spill Remove redundant str() conversions Cluster dump now excludes run_spec by default Dump more objects with dump_cluster_state Do not connect to any sockets on import Avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture Drop Python 3.7 Remove support for UCX < 1.11.1 Document and test spill->target hysteresis cycle Fix flaky test_remove_replicas_while_computing Fix time based test_assert_worker_story_malformed_story parameterize Remove xfail from test_no_unnecessary_imports_on_worker Start building pre-releases with cythonized scheduler Do not mark tests xfail if they don't come up in time Use gen_cluster where possible in test_dask_worker.py Generate junit report when pytest-timeout kills pytest Decrease timeout-minutes for GHA jobs Bump pre-release version to be greater than stable releases Do not run schedule jobs on forks Remove pillow<9 pin in CI Show scheduled test runs in report Add obvious exclusions with pragma statement OBS-URL: https://build.opensuse.org/request/show/960175 OBS-URL: https://build.opensuse.org/package/show/devel:languages:python:numeric/python-distributed?expand=0&rev=106 --- 5709-avoid-deadlock-ActorFuture.patch | 468 -------------------------- _multibuild | 2 +- distributed-2022.01.1-gh.tar.gz | 3 - distributed-2022.02.1-gh.tar.gz | 3 + python-distributed.changes | 79 ++++- python-distributed.spec | 17 +- 6 files changed, 89 insertions(+), 483 deletions(-) delete mode 100644 5709-avoid-deadlock-ActorFuture.patch delete mode 100644 distributed-2022.01.1-gh.tar.gz create mode 100644 distributed-2022.02.1-gh.tar.gz diff --git a/5709-avoid-deadlock-ActorFuture.patch b/5709-avoid-deadlock-ActorFuture.patch deleted file mode 100644 index 1eb1683..0000000 --- a/5709-avoid-deadlock-ActorFuture.patch +++ /dev/null @@ -1,468 +0,0 @@ -From 9f54f73be83fd0b3e897d0c077f3132000c57336 Mon Sep 17 00:00:00 2001 -From: Thomas Grainger -Date: Wed, 26 Jan 2022 15:30:16 +0000 -Subject: [PATCH 01/10] avoid deadlock in ActorFuture - -fixes #5708 -fixes #5350 ---- - distributed/__init__.py | 2 - distributed/actor.py | 161 ++++++++++++++++++++++++++++++---------- - distributed/client.py | 4 - distributed/tests/test_actor.py | 95 +++++++++++++++++++---- - docs/source/actors.rst | 8 - - 5 files changed, 209 insertions(+), 61 deletions(-) - ---- a/distributed/__init__.py -+++ b/distributed/__init__.py -@@ -4,7 +4,7 @@ import dask - from dask.config import config # type: ignore - - from ._version import get_versions --from .actor import Actor, ActorFuture -+from .actor import Actor, BaseActorFuture - from .client import ( - Client, - CompatibleExecutor, ---- a/distributed/actor.py -+++ b/distributed/actor.py -@@ -1,6 +1,15 @@ -+from __future__ import annotations -+ -+import abc - import asyncio - import functools -+import sys - import threading -+from dataclasses import dataclass -+from datetime import timedelta -+from typing import TYPE_CHECKING, Generic, NoReturn, TypeVar -+ -+from tornado.ioloop import IOLoop - - from .client import Future - from .protocol import to_serialize -@@ -8,13 +17,52 @@ from .utils import iscoroutinefunction, - from .utils_comm import WrappedKey - from .worker import get_client, get_worker - -+_T = TypeVar("_T") -+ -+if sys.version_info >= (3, 9): -+ from collections.abc import Awaitable, Generator -+else: -+ from typing import Awaitable, Generator -+ -+if sys.version_info >= (3, 8): -+ from typing import Literal -+elif TYPE_CHECKING: -+ from typing_extensions import Literal -+ -+if sys.version_info >= (3, 10): -+ from asyncio import Event as _LateLoopEvent -+else: -+ # In python 3.10 asyncio.Lock and other primitives no longer support -+ # passing a loop kwarg to bind to a loop running in another thread -+ # e.g. calling from Client(asynchronous=False). Instead the loop is bound -+ # as late as possible: when calling any methods that wait on or wake -+ # Future instances. See: https://bugs.python.org/issue42392 -+ class _LateLoopEvent: -+ def __init__(self) -> None: -+ self._event: asyncio.Event | None = None -+ -+ def set(self) -> None: -+ if self._event is None: -+ self._event = asyncio.Event() -+ -+ self._event.set() -+ -+ def is_set(self) -> bool: -+ return self._event is not None and self._event.is_set() -+ -+ async def wait(self) -> bool: -+ if self._event is None: -+ self._event = asyncio.Event() -+ -+ return await self._event.wait() -+ - - class Actor(WrappedKey): - """Controls an object on a remote worker - - An actor allows remote control of a stateful object living on a remote - worker. Method calls on this object trigger operations on the remote -- object and return ActorFutures on which we can block to get results. -+ object and return BaseActorFutures on which we can block to get results. - - Examples - -------- -@@ -36,7 +84,7 @@ class Actor(WrappedKey): - >>> counter - - -- Calling methods on this object immediately returns deferred ``ActorFuture`` -+ Calling methods on this object immediately returns deferred ``BaseActorFuture`` - objects. You can call ``.result()`` on these objects to block and get the - result of the function call. - -@@ -140,9 +188,7 @@ class Actor(WrappedKey): - return attr - - elif callable(attr): -- return lambda *args, **kwargs: ActorFuture( -- None, self._io_loop, result=attr(*args, **kwargs) -- ) -+ return lambda *args, **kwargs: EagerActorFuture(attr(*args, **kwargs)) - else: - return attr - -@@ -166,16 +212,17 @@ class Actor(WrappedKey): - return await run_actor_function_on_worker() - else: - raise OSError("Unable to contact Actor's worker") -- return result -+ if result["status"] == "OK": -+ return _OK(result["result"]) -+ return _Error(result["exception"]) - -- q = asyncio.Queue(loop=self._io_loop.asyncio_loop) -+ actor_future = ActorFuture(io_loop=self._io_loop) - - async def wait_then_add_to_queue(): -- x = await run_actor_function_on_worker() -- await q.put(x) -+ actor_future._set_result(await run_actor_function_on_worker()) - - self._io_loop.add_callback(wait_then_add_to_queue) -- return ActorFuture(q, self._io_loop) -+ return actor_future - - return func - -@@ -215,10 +262,10 @@ class ProxyRPC: - return func - - --class ActorFuture: -+class BaseActorFuture(abc.ABC, Awaitable[_T]): - """Future to an actor's method call - -- Whenever you call a method on an Actor you get an ActorFuture immediately -+ Whenever you call a method on an Actor you get a BaseActorFuture immediately - while the computation happens in the background. You can call ``.result`` - to block and collect the full result - -@@ -227,34 +274,72 @@ class ActorFuture: - Actor - """ - -- def __init__(self, q, io_loop, result=None): -- self.q = q -- self.io_loop = io_loop -- if result: -- self._cached_result = result -- self.status = "pending" -+ @abc.abstractmethod -+ def result(self, timeout: str | timedelta | float | None = None) -> _T: -+ ... -+ -+ @abc.abstractmethod -+ def done(self) -> bool: -+ ... - -- def __await__(self): -- return self._result().__await__() -+ def __repr__(self) -> Literal[""]: -+ return "" - -- def done(self): -- return self.status != "pending" - -- async def _result(self, raiseit=True): -- if not hasattr(self, "_cached_result"): -- out = await self.q.get() -- if out["status"] == "OK": -- self.status = "finished" -- self._cached_result = out["result"] -- else: -- self.status = "error" -- self._cached_result = out["exception"] -- if self.status == "error": -- raise self._cached_result -- return self._cached_result -+@dataclass(frozen=True, eq=False) -+class EagerActorFuture(BaseActorFuture[_T]): -+ """Future to an actor's method call when an actor calls another actor on the same worker""" - -- def result(self, timeout=None): -- return sync(self.io_loop, self._result, callback_timeout=timeout) -+ _result: _T - -- def __repr__(self): -- return "" -+ def __await__(self) -> Generator[object, None, _T]: -+ return self._result -+ yield -+ -+ def result(self, timeout: object = None) -> _T: -+ return self._result -+ -+ def done(self) -> Literal[True]: -+ return True -+ -+ -+@dataclass(frozen=True, eq=False) -+class _OK(Generic[_T]): -+ _v: _T -+ -+ def unwrap(self) -> _T: -+ return self._v -+ -+ -+@dataclass(frozen=True, eq=False) -+class _Error: -+ _e: Exception -+ -+ def unwrap(self) -> NoReturn: -+ raise self._e -+ -+ -+class ActorFuture(BaseActorFuture[_T]): -+ def __init__(self, io_loop: IOLoop): -+ self._io_loop = io_loop -+ self._event = _LateLoopEvent() -+ self._out: _Error | _OK[_T] | None = None -+ -+ def __await__(self) -> Generator[object, None, _T]: -+ return self._result().__await__() -+ -+ def done(self) -> bool: -+ return self._event.is_set() -+ -+ async def _result(self) -> _T: -+ await self._event.wait() -+ out = self._out -+ assert out is not None -+ return out.unwrap() -+ -+ def _set_result(self, out: _Error | _OK[_T]) -> None: -+ self._out = out -+ self._event.set() -+ -+ def result(self, timeout: str | timedelta | float | None = None) -> _T: -+ return sync(self._io_loop, self._result, callback_timeout=timeout) ---- a/distributed/client.py -+++ b/distributed/client.py -@@ -4943,11 +4943,11 @@ class as_completed: - """Add multiple futures to the collection. - - The added futures will emit from the iterator once they finish""" -- from .actor import ActorFuture -+ from .actor import BaseActorFuture - - with self.lock: - for f in futures: -- if not isinstance(f, (Future, ActorFuture)): -+ if not isinstance(f, (Future, BaseActorFuture)): - raise TypeError("Input must be a future, got %s" % f) - self.futures[f] += 1 - self.loop.add_callback(self._track_future, f) ---- a/distributed/tests/test_actor.py -+++ b/distributed/tests/test_actor.py -@@ -8,7 +8,7 @@ import dask - - from distributed import ( - Actor, -- ActorFuture, -+ BaseActorFuture, - Client, - Future, - Nanny, -@@ -16,6 +16,7 @@ from distributed import ( - get_client, - wait, - ) -+from distributed.actor import _LateLoopEvent - from distributed.metrics import time - from distributed.utils_test import cluster, gen_cluster - -@@ -39,16 +40,6 @@ class Counter: - return self.n - - --class UsesCounter: -- # An actor whose method argument is another actor -- -- def do_inc(self, ac): -- return ac.increment().result() -- -- async def ado_inc(self, ac): -- return await ac.ainc() -- -- - class List: - L: list = [] - -@@ -113,7 +104,7 @@ async def test_worker_actions(c, s, a, b - assert counter._address == a_address - - future = counter.increment(separate_thread=separate_thread) -- assert isinstance(future, ActorFuture) -+ assert isinstance(future, BaseActorFuture) - assert "Future" in type(future).__name__ - end = future.result(timeout=1) - assert end > start -@@ -266,6 +257,27 @@ def test_sync(client): - assert "distributed.actor" not in repr(future) - - -+def test_timeout(client): -+ class Waiter: -+ def __init__(self): -+ self.event = _LateLoopEvent() -+ -+ async def set(self): -+ self.event.set() -+ -+ async def wait(self): -+ return await self.event.wait() -+ -+ event = client.submit(Waiter, actor=True).result() -+ future = event.wait() -+ -+ with pytest.raises(asyncio.TimeoutError): -+ future.result(timeout="0.001s") -+ -+ event.set().result() -+ assert future.result() is True -+ -+ - @gen_cluster(client=True, config={"distributed.comm.timeouts.connect": "1s"}) - async def test_failed_worker(c, s, a, b): - future = c.submit(Counter, actor=True, workers=[a.address]) -@@ -538,11 +550,9 @@ async def test_actors_in_profile(c, s, a - - @gen_cluster(client=True) - async def test_waiter(c, s, a, b): -- from tornado.locks import Event -- - class Waiter: - def __init__(self): -- self.event = Event() -+ self.event = _LateLoopEvent() - - async def set(self): - self.event.set() -@@ -618,6 +628,42 @@ def test_worker_actor_handle_is_weakref_ - - - def test_one_thread_deadlock(): -+ class UsesCounter: -+ # An actor whose method argument is another actor -+ -+ def do_inc(self, ac): -+ return ac.increment().result() -+ -+ with cluster(nworkers=2) as (cl, w): -+ client = Client(cl["address"]) -+ ac = client.submit(Counter, actor=True).result() -+ ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result() -+ -+ assert ac2.do_inc(ac).result() == 1 -+ -+ -+def test_one_thread_deadlock_timeout(): -+ class UsesCounter: -+ # An actor whose method argument is another actor -+ -+ def do_inc(self, ac): -+ return ac.increment().result(timeout=1) -+ -+ with cluster(nworkers=2) as (cl, w): -+ client = Client(cl["address"]) -+ ac = client.submit(Counter, actor=True).result() -+ ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result() -+ -+ assert ac2.do_inc(ac).result() == 1 -+ -+ -+def test_one_thread_deadlock_sync_client(): -+ class UsesCounter: -+ # An actor whose method argument is another actor -+ -+ def do_inc(self, ac): -+ return get_client().sync(ac.increment) -+ - with cluster(nworkers=2) as (cl, w): - client = Client(cl["address"]) - ac = client.submit(Counter, actor=True).result() -@@ -628,6 +674,12 @@ def test_one_thread_deadlock(): - - @gen_cluster(client=True) - async def test_async_deadlock(client, s, a, b): -+ class UsesCounter: -+ # An actor whose method argument is another actor -+ -+ async def ado_inc(self, ac): -+ return await ac.ainc() -+ - ac = await client.submit(Counter, actor=True) - ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address]) - -@@ -698,7 +750,7 @@ async def test_actor_future_awaitable(cl - ac = await client.submit(Counter, actor=True) - futures = [ac.increment() for _ in range(10)] - -- assert all([isinstance(future, ActorFuture) for future in futures]) -+ assert all([isinstance(future, BaseActorFuture) for future in futures]) - - out = await asyncio.gather(*futures) - assert all([future.done() for future in futures]) -@@ -706,6 +758,17 @@ async def test_actor_future_awaitable(cl - - - @gen_cluster(client=True) -+async def test_actor_future_awaitable_deadlock(client, s, a, b): -+ ac = await client.submit(Counter, actor=True) -+ f = ac.increment() -+ -+ async def coro(): -+ return await f -+ -+ assert await asyncio.gather(coro(), coro()) == [1, 1] -+ -+ -+@gen_cluster(client=True) - async def test_serialize_with_pickle(c, s, a, b): - class Foo: - def __init__(self): ---- a/docs/source/actors.rst -+++ b/docs/source/actors.rst -@@ -115,15 +115,15 @@ However accessing an attribute or callin - to the remote worker, run the method on the remote worker in a separate thread - pool, and then communicate the result back to the calling side. For attribute - access these operations block and return when finished, for method calls they --return an ``ActorFuture`` immediately. -+return an ``BaseActorFuture`` immediately. - - .. code-block:: python - -- >>> future = counter.increment() # Immediately returns an ActorFuture -+ >>> future = counter.increment() # Immediately returns a BaseActorFuture - >>> future.result() # Block until finished and result arrives - 1 - --``ActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully -+``BaseActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully - featured. They curently *only* support the ``result`` method and nothing else. - They don't currently work with any other Dask functions that expect futures, - like ``as_completed``, ``wait``, or ``client.gather``. They can't be placed -@@ -167,7 +167,7 @@ workers have only a single thread for ac - future. - - The result is sent back immediately to the calling side, and is not stored on --the worker with the actor. It is cached on the ``ActorFuture`` object. -+the worker with the actor. It is cached on the ``BaseActorFuture`` object. - - - Calling from coroutines and async/await diff --git a/_multibuild b/_multibuild index 9bbe6ad..73ab5e8 100644 --- a/_multibuild +++ b/_multibuild @@ -1,5 +1,5 @@ test-py38 test-py39 - + test-py310 diff --git a/distributed-2022.01.1-gh.tar.gz b/distributed-2022.01.1-gh.tar.gz deleted file mode 100644 index 8d4a5d3..0000000 --- a/distributed-2022.01.1-gh.tar.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:c5a110cad409e77782fa62e7759819633856b21a2cfba98690bf62bd86ddaa57 -size 1576749 diff --git a/distributed-2022.02.1-gh.tar.gz b/distributed-2022.02.1-gh.tar.gz new file mode 100644 index 0000000..e21a4e7 --- /dev/null +++ b/distributed-2022.02.1-gh.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d015e781efad256fa32ae36b4278252dfbe20b1cfd7cb51bf0f44349cfcb816f +size 1606067 diff --git a/python-distributed.changes b/python-distributed.changes index d19978c..2086ff3 100644 --- a/python-distributed.changes +++ b/python-distributed.changes @@ -1,3 +1,81 @@ +------------------------------------------------------------------- +Tue Mar 8 07:46:52 UTC 2022 - Matej Cepl + +- Update to 2022.02.1: + Add the ability for Client to run preload code + Optionally use NumPy to allocate buffers + Add git hash to distributed-impl version + Immediately raise exception when trying to connect to a closed cluster + Lazily get dask version information + Remove the requirements to add comm to every handler + Raise on unclosed comms in check_instances + Constrained spill + Remove redundant str() conversions + Cluster dump now excludes run_spec by default + Dump more objects with dump_cluster_state + Do not connect to any sockets on import + Avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture + Drop Python 3.7 + Remove support for UCX < 1.11.1 + Document and test spill->target hysteresis cycle + Fix flaky test_remove_replicas_while_computing + Fix time based test_assert_worker_story_malformed_story parameterize + Remove xfail from test_no_unnecessary_imports_on_worker + Start building pre-releases with cythonized scheduler + Do not mark tests xfail if they don't come up in time + Use gen_cluster where possible in test_dask_worker.py + Generate junit report when pytest-timeout kills pytest + Decrease timeout-minutes for GHA jobs + Bump pre-release version to be greater than stable releases + Do not run schedule jobs on forks + Remove pillow<9 pin in CI + Show scheduled test runs in report + Add obvious exclusions with pragma statement + Add coverage exclusions for cli files + Add pragma statements + Remove pragma: no cover from distributed.cli.dask_ssh + Add pragma - worker.py, client.py, stealing.py + Relax distributed / dask-core dependencies for pre-releases + Remove test_ucx_config_w_env_var flaky condition +- Update to 2022.02.0: + Update client.scheduler_info in wait_for_workers + Increase robustness to TimeoutError during connect + Respect KeyboardInterrupt in sync + Add workflow / recipe to generate Dask/distributed pre-releases + Review Scheduler / Worker display repr + AMM: Graceful Worker Retirement + AMM: tentatively stabilize flaky tests around worker pause + AMM: speed up and stabilize test_memory + Defer pandas import on worker in P2P shuffle + Fix for distributed.worker.memory.target=False and spill=0.7 + Transition flight to missing if no who_has + Remove deprecated ncores + Deprecate registering plugins by class + Deprecate --nprocs option for dask-worker CLI + Fix imbalanced backticks + xfail test_worker_reconnects_mid_compute + Fix linting CI build + Update pre-commit versions + Reactivate pytest_resourceleaks + Set test assumption for test_client_timeout + Remove client timeout from test_ucx_config_w_env_var + Remove test_failed_worker_without_warning + Fix longitudinal report + Fix flaky test_robust_to_bad_sizeof_estimates + Revert "Pin coverage to 6.2 + Trigger test runs periodically to increases failure statistics + More fault tolerant test report + Pin pillow<9 to work around torch incompatability + Overhaul check_process_leak + Fix flaky test_exit_callback test + Generate tests summary + Upload different architectured pre-releases separately + Ignore non-test directories + Bump gpuCI PYTHON_VER to 3.9 + Regression: threads noted down before they start +- Remove upstreamed patches: + - 5709-avoid-deadlock-ActorFuture.patch + ------------------------------------------------------------------- Mon Feb 14 15:12:44 UTC 2022 - Matej Cepl @@ -5,7 +83,6 @@ Mon Feb 14 15:12:44 UTC 2022 - Matej Cepl ActorFuture (gh#dask/distributed#5709). ------------------------------------------------------------------- - Sat Jan 29 17:34:23 UTC 2022 - Ben Greiner - Update to version 2022.1.1 diff --git a/python-distributed.spec b/python-distributed.spec index 94d3ca7..f8935ea 100644 --- a/python-distributed.spec +++ b/python-distributed.spec @@ -32,14 +32,14 @@ %endif %if "%{flavor}" == "test-py310" # add to _multibuild when enabling python310 (see below) -%define psuffix -test-py310" +%define psuffix -test-py310 %define skip_python38 1 %define skip_python39 1 %bcond_without test %endif %if "%{flavor}" == "" # https://github.com/dask/distributed/issues/5350 -- NOT fixed by https://github.com/dask/distributed/pull/5353 -%define skip_python310 1 +# %%define skip_python310 1 %bcond_with test %endif @@ -55,19 +55,15 @@ %{?!python_module:%define python_module() python3-%{**}} %define skip_python2 1 -%define ghversiontag 2022.01.1 Name: python-distributed%{psuffix} # Note: please always update together with python-dask -Version: 2022.1.1 +Version: 2022.02.1 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause URL: https://distributed.readthedocs.io/en/latest/ -Source: https://github.com/dask/distributed/archive/refs/tags//%{ghversiontag}.tar.gz#/distributed-%{ghversiontag}-gh.tar.gz +Source: https://github.com/dask/distributed/archive/refs/tags//%{version}.tar.gz#/distributed-%{version}-gh.tar.gz Source99: python-distributed-rpmlintrc -# PATCH-FIX-UPSTREAM 5709-avoid-deadlock-ActorFuture.patch gh#dask/distributed#5709 mcepl@suse.com -# avoid deadlock in ActorFuture -Patch0: 5709-avoid-deadlock-ActorFuture.patch BuildRequires: %{python_module base >= 3.7} BuildRequires: %{python_module setuptools} BuildRequires: fdupes @@ -125,7 +121,7 @@ extends both the concurrent.futures and dask APIs to moderate sized clusters. %prep -%autosetup -p1 -n distributed-%{ghversiontag} +%autosetup -p1 -n distributed-%{version} sed -i '/addopts/ {s/--durations=20//; s/--color=yes//}' setup.cfg @@ -176,7 +172,8 @@ fi %python_alternative %{_bindir}/dask-scheduler %python_alternative %{_bindir}/dask-worker %{python_sitearch}/distributed -%{python_sitearch}/distributed-%{version}*-info +%{python_sitearch}/distributed-%(echo %{version}|sed -e 's/\.0/./')*-info + %endif %changelog From 67233a984718027804d5a18c6a0f1323bc338aebdf9a4ecd9c0a63c6a4e145c9 Mon Sep 17 00:00:00 2001 From: Matej Cepl Date: Mon, 14 Mar 2022 08:09:33 +0000 Subject: [PATCH 2/4] Make cycle OBS-URL: https://build.opensuse.org/package/show/devel:languages:python:numeric/python-distributed?expand=0&rev=107 --- python-distributed.spec | 1 - 1 file changed, 1 deletion(-) diff --git a/python-distributed.spec b/python-distributed.spec index f8935ea..4368a3f 100644 --- a/python-distributed.spec +++ b/python-distributed.spec @@ -95,7 +95,6 @@ BuildRequires: %{python_module certifi} BuildRequires: %{python_module click >= 6.6} BuildRequires: %{python_module cloudpickle >= 1.5.0} BuildRequires: %{python_module dask-all = %{version}} -BuildRequires: %{python_module distributed = %{version}} BuildRequires: %{python_module ipykernel} BuildRequires: %{python_module ipython} BuildRequires: %{python_module jupyter_client} From 739e3b195f752b72f811566d6403ac0820f8bf30d0b82c7f5e7c81d2b0d10e06 Mon Sep 17 00:00:00 2001 From: Matej Cepl Date: Mon, 14 Mar 2022 08:10:40 +0000 Subject: [PATCH 3/4] Make cycle OBS-URL: https://build.opensuse.org/package/show/devel:languages:python:numeric/python-distributed?expand=0&rev=108 --- python-distributed.spec | 1 + 1 file changed, 1 insertion(+) diff --git a/python-distributed.spec b/python-distributed.spec index 4368a3f..f8935ea 100644 --- a/python-distributed.spec +++ b/python-distributed.spec @@ -95,6 +95,7 @@ BuildRequires: %{python_module certifi} BuildRequires: %{python_module click >= 6.6} BuildRequires: %{python_module cloudpickle >= 1.5.0} BuildRequires: %{python_module dask-all = %{version}} +BuildRequires: %{python_module distributed = %{version}} BuildRequires: %{python_module ipykernel} BuildRequires: %{python_module ipython} BuildRequires: %{python_module jupyter_client} From e65a57ca6a78fd10603811416a211fbb78f63f50428f566f864031cca75ca277 Mon Sep 17 00:00:00 2001 From: Matej Cepl Date: Sun, 27 Mar 2022 18:57:25 +0000 Subject: [PATCH 4/4] OBS-URL: https://build.opensuse.org/package/show/devel:languages:python:numeric/python-distributed?expand=0&rev=109 --- distributed-2022.02.1-gh.tar.gz | 3 - distributed-2022.03.0-gh.tar.gz | 3 + distributed-ignore-thread-leaks.patch | 13 + distributed-pr5952-py310.patch | 375 ++++++++++++++++++++++++++ python-distributed.changes | 24 ++ python-distributed.spec | 77 ++++-- 6 files changed, 470 insertions(+), 25 deletions(-) delete mode 100644 distributed-2022.02.1-gh.tar.gz create mode 100644 distributed-2022.03.0-gh.tar.gz create mode 100644 distributed-ignore-thread-leaks.patch create mode 100644 distributed-pr5952-py310.patch diff --git a/distributed-2022.02.1-gh.tar.gz b/distributed-2022.02.1-gh.tar.gz deleted file mode 100644 index e21a4e7..0000000 --- a/distributed-2022.02.1-gh.tar.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:d015e781efad256fa32ae36b4278252dfbe20b1cfd7cb51bf0f44349cfcb816f -size 1606067 diff --git a/distributed-2022.03.0-gh.tar.gz b/distributed-2022.03.0-gh.tar.gz new file mode 100644 index 0000000..39b196a --- /dev/null +++ b/distributed-2022.03.0-gh.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:615df296e593bc636ed584c6b13ce2f05f29af8aac74d398993da2e81fd164b7 +size 1615328 diff --git a/distributed-ignore-thread-leaks.patch b/distributed-ignore-thread-leaks.patch new file mode 100644 index 0000000..58d8836 --- /dev/null +++ b/distributed-ignore-thread-leaks.patch @@ -0,0 +1,13 @@ +Index: distributed-2022.03.0/distributed/utils_test.py +=================================================================== +--- distributed-2022.03.0.orig/distributed/utils_test.py ++++ distributed-2022.03.0/distributed/utils_test.py +@@ -1612,7 +1612,7 @@ def check_thread_leak(): + yield + + start = time() +- while True: ++ while False: + bad_threads = [ + thread + for thread in threading.enumerate() diff --git a/distributed-pr5952-py310.patch b/distributed-pr5952-py310.patch new file mode 100644 index 0000000..f106a0c --- /dev/null +++ b/distributed-pr5952-py310.patch @@ -0,0 +1,375 @@ +From 9c6a4c905c75c5e64ca460ea17bb2bdf0f2782fa Mon Sep 17 00:00:00 2001 +From: James Bourbeau +Date: Thu, 3 Feb 2022 12:58:32 -0600 +Subject: [PATCH 01/12] Add Python 3.10 build to CI + +--- + .github/workflows/tests.yaml | 2 +- + continuous_integration/environment-3.10.yaml | 56 ++++++++++++++++++++ + 2 files changed, 57 insertions(+), 1 deletion(-) + create mode 100644 continuous_integration/environment-3.10.yaml + +Index: distributed-2022.03.0/.github/workflows/tests.yaml +=================================================================== +--- distributed-2022.03.0.orig/.github/workflows/tests.yaml ++++ distributed-2022.03.0/.github/workflows/tests.yaml +@@ -23,7 +23,7 @@ jobs: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] +- python-version: ["3.8", "3.9"] ++ python-version: ["3.8", "3.9", "3.10"] + # Cherry-pick test modules to split the overall runtime roughly in half + partition: [ci1, not ci1] + include: +@@ -65,12 +65,6 @@ jobs: + shell: bash -l {0} + run: conda config --show + +- - name: Install stacktrace +- shell: bash -l {0} +- # stacktrace for Python 3.8 has not been released at the moment of writing +- if: ${{ matrix.os == 'ubuntu-latest' && matrix.python-version < '3.8' }} +- run: mamba install -c conda-forge -c defaults -c numba libunwind stacktrace +- + - name: Hack around https://github.com/ipython/ipython/issues/12197 + # This upstream issue causes an interpreter crash when running + # distributed/protocol/tests/test_serialize.py::test_profile_nested_sizeof +Index: distributed-2022.03.0/continuous_integration/environment-3.10.yaml +=================================================================== +--- /dev/null ++++ distributed-2022.03.0/continuous_integration/environment-3.10.yaml +@@ -0,0 +1,56 @@ ++name: dask-distributed ++channels: ++ - conda-forge ++ - defaults ++dependencies: ++ - python=3.10 ++ - packaging ++ - pip ++ - asyncssh ++ - bokeh ++ - click ++ - cloudpickle ++ - coverage<6.3 # https://github.com/nedbat/coveragepy/issues/1310 ++ - dask # overridden by git tip below ++ - filesystem-spec # overridden by git tip below ++ - h5py ++ - ipykernel ++ - ipywidgets ++ - jinja2 ++ - joblib # overridden by git tip below ++ - jupyter_client ++ - lz4 # Only tested here ++ - msgpack-python ++ - netcdf4 ++ - paramiko ++ - pre-commit ++ - prometheus_client ++ - psutil ++ - pynvml # Only tested here ++ - pytest ++ - pytest-cov ++ - pytest-faulthandler ++ - pytest-repeat ++ - pytest-rerunfailures ++ - pytest-timeout ++ - python-blosc # Only tested here ++ - python-snappy # Only tested here ++ - requests ++ - s3fs # overridden by git tip below ++ - scikit-learn ++ - scipy ++ - sortedcollections ++ - tblib ++ - toolz ++ - tornado=6 ++ - zict # overridden by git tip below ++ - zstandard ++ - pip: ++ - git+https://github.com/dask/dask ++ - git+https://github.com/dask/s3fs ++ - git+https://github.com/dask/zict ++ # FIXME https://github.com/dask/distributed/issues/5345 ++ # - git+https://github.com/intake/filesystem_spec ++ - git+https://github.com/joblib/joblib ++ - keras ++ - pytest-asyncio<0.14.0 # `pytest-asyncio<0.14.0` isn't available on conda-forge for Python 3.10 +Index: distributed-2022.03.0/distributed/tests/test_client.py +=================================================================== +--- distributed-2022.03.0.orig/distributed/tests/test_client.py ++++ distributed-2022.03.0/distributed/tests/test_client.py +@@ -6461,6 +6461,10 @@ async def test_performance_report(c, s, + assert "cdn.bokeh.org" in data + + ++@pytest.mark.skipif( ++ sys.version_info >= (3, 10), ++ reason="On Py3.10+ semaphore._loop is not bound until .acquire() blocks", ++) + @gen_cluster(nthreads=[]) + async def test_client_gather_semaphore_loop(s): + async with Client(s.address, asynchronous=True) as c: +@@ -6471,9 +6475,16 @@ async def test_client_gather_semaphore_l + async def test_as_completed_condition_loop(c, s, a, b): + seq = c.map(inc, range(5)) + ac = as_completed(seq) ++ # consume the ac so that the ac.condition is bound to the loop on py3.10+ ++ async for _ in ac: ++ pass + assert ac.condition._loop == c.loop.asyncio_loop + + ++@pytest.mark.skipif( ++ sys.version_info >= (3, 10), ++ reason="On Py3.10+ semaphore._loop is not bound until .acquire() blocks", ++) + def test_client_connectionpool_semaphore_loop(s, a, b): + with Client(s["address"]) as c: + assert c.rpc.semaphore._loop is c.loop.asyncio_loop +Index: distributed-2022.03.0/distributed/node.py +=================================================================== +--- distributed-2022.03.0.orig/distributed/node.py ++++ distributed-2022.03.0/distributed/node.py +@@ -131,12 +131,9 @@ class ServerNode(Server): + import ssl + + ssl_options = ssl.create_default_context( +- cafile=tls_ca_file, purpose=ssl.Purpose.SERVER_AUTH ++ cafile=tls_ca_file, purpose=ssl.Purpose.CLIENT_AUTH + ) + ssl_options.load_cert_chain(tls_cert, keyfile=tls_key) +- # We don't care about auth here, just encryption +- ssl_options.check_hostname = False +- ssl_options.verify_mode = ssl.CERT_NONE + + self.http_server = HTTPServer(self.http_application, ssl_options=ssl_options) + +Index: distributed-2022.03.0/distributed/profile.py +=================================================================== +--- distributed-2022.03.0.orig/distributed/profile.py ++++ distributed-2022.03.0/distributed/profile.py +@@ -27,6 +27,7 @@ We represent this tree as a nested dicti + from __future__ import annotations + + import bisect ++import dis + import linecache + import sys + import threading +@@ -59,21 +60,41 @@ def identifier(frame): + ) + + ++# work around some frames lacking an f_lineo eg: https://bugs.python.org/issue47085 ++def _f_lineno(frame): ++ f_lineno = frame.f_lineno ++ if f_lineno is not None: ++ return f_lineno ++ ++ f_lasti = frame.f_lasti ++ code = frame.f_code ++ prev_line = code.co_firstlineno ++ ++ for start, next_line in dis.findlinestarts(code): ++ if f_lasti < start: ++ return prev_line ++ prev_line = next_line ++ ++ return prev_line ++ ++ + def repr_frame(frame): + """Render a frame as a line for inclusion into a text traceback""" + co = frame.f_code +- text = f' File "{co.co_filename}", line {frame.f_lineno}, in {co.co_name}' +- line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip() ++ f_lineno = _f_lineno(frame) ++ text = f' File "{co.co_filename}", line {f_lineno}, in {co.co_name}' ++ line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip() + return text + "\n\t" + line + + + def info_frame(frame): + co = frame.f_code +- line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip() ++ f_lineno = _f_lineno(frame) ++ line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip() + return { + "filename": co.co_filename, + "name": co.co_name, +- "line_number": frame.f_lineno, ++ "line_number": f_lineno, + "line": line, + } + +Index: distributed-2022.03.0/distributed/tests/test_profile.py +=================================================================== +--- distributed-2022.03.0.orig/distributed/tests/test_profile.py ++++ distributed-2022.03.0/distributed/tests/test_profile.py +@@ -1,5 +1,9 @@ ++from __future__ import annotations ++ ++import dataclasses + import sys + import threading ++from collections.abc import Iterator, Sequence + from time import sleep + + import pytest +@@ -11,6 +15,7 @@ from distributed.profile import ( + call_stack, + create, + identifier, ++ info_frame, + ll_get_stack, + llprocess, + merge, +@@ -200,3 +205,102 @@ def test_watch(): + while threading.active_count() > start_threads: + assert time() < start + 2 + sleep(0.01) ++ ++ ++@dataclasses.dataclass(frozen=True) ++class FakeCode: ++ co_filename: str ++ co_name: str ++ co_firstlineno: int ++ co_lnotab: bytes ++ co_lines_seq: Sequence[tuple[int, int, int | None]] ++ co_code: bytes ++ ++ def co_lines(self) -> Iterator[tuple[int, int, int | None]]: ++ yield from self.co_lines_seq ++ ++ ++FAKE_CODE = FakeCode( ++ co_filename="", ++ co_name="example", ++ co_firstlineno=1, ++ # https://github.com/python/cpython/blob/b68431fadb3150134ac6ccbf501cdfeaf4c75678/Objects/lnotab_notes.txt#L84 ++ # generated from: ++ # def example(): ++ # for i in range(1): ++ # if i >= 0: ++ # pass ++ # example.__code__.co_lnotab ++ co_lnotab=b"\x00\x01\x0c\x01\x08\x01\x04\xfe", ++ # generated with list(example.__code__.co_lines()) ++ co_lines_seq=[ ++ (0, 12, 2), ++ (12, 20, 3), ++ (20, 22, 4), ++ (22, 24, None), ++ (24, 28, 2), ++ ], ++ # used in dis.findlinestarts as bytecode_len = len(code.co_code) ++ # https://github.com/python/cpython/blob/6f345d363308e3e6ecf0ad518ea0fcc30afde2a8/Lib/dis.py#L457 ++ co_code=bytes(28), ++) ++ ++ ++@dataclasses.dataclass(frozen=True) ++class FakeFrame: ++ f_lasti: int ++ f_code: FakeCode ++ f_lineno: int | None = None ++ f_back: FakeFrame | None = None ++ f_globals: dict[str, object] = dataclasses.field(default_factory=dict) ++ ++ ++@pytest.mark.parametrize( ++ "f_lasti,f_lineno", ++ [ ++ (-1, 1), ++ (0, 2), ++ (1, 2), ++ (11, 2), ++ (12, 3), ++ (21, 4), ++ (22, 4), ++ (23, 4), ++ (24, 2), ++ (25, 2), ++ (26, 2), ++ (27, 2), ++ (100, 2), ++ ], ++) ++def test_info_frame_f_lineno(f_lasti: int, f_lineno: int) -> None: ++ assert info_frame(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == { ++ "filename": "", ++ "name": "example", ++ "line_number": f_lineno, ++ "line": "", ++ } ++ ++ ++@pytest.mark.parametrize( ++ "f_lasti,f_lineno", ++ [ ++ (-1, 1), ++ (0, 2), ++ (1, 2), ++ (11, 2), ++ (12, 3), ++ (21, 4), ++ (22, 4), ++ (23, 4), ++ (24, 2), ++ (25, 2), ++ (26, 2), ++ (27, 2), ++ (100, 2), ++ ], ++) ++def test_call_stack_f_lineno(f_lasti: int, f_lineno: int) -> None: ++ assert call_stack(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == [ ++ f' File "", line {f_lineno}, in example\n\t' ++ ] +Index: distributed-2022.03.0/distributed/utils_test.py +=================================================================== +--- distributed-2022.03.0.orig/distributed/utils_test.py ++++ distributed-2022.03.0/distributed/utils_test.py +@@ -706,13 +706,16 @@ def cluster( + except KeyError: + rpc_kwargs = {} + +- with rpc(saddr, **rpc_kwargs) as s: +- while True: +- nthreads = loop.run_sync(s.ncores) +- if len(nthreads) == nworkers: +- break +- if time() - start > 5: +- raise Exception("Timeout on cluster creation") ++ async def wait_for_workers(): ++ async with rpc(saddr, **rpc_kwargs) as s: ++ while True: ++ nthreads = await s.ncores() ++ if len(nthreads) == nworkers: ++ break ++ if time() - start > 5: ++ raise Exception("Timeout on cluster creation") ++ ++ loop.run_sync(wait_for_workers) + + # avoid sending processes down to function + yield {"address": saddr}, [ +Index: distributed-2022.03.0/setup.py +=================================================================== +--- distributed-2022.03.0.orig/setup.py ++++ distributed-2022.03.0/setup.py +@@ -98,8 +98,11 @@ setup( + "License :: OSI Approved :: BSD License", + "Operating System :: OS Independent", + "Programming Language :: Python", ++ "Programming Language :: Python :: 3", ++ "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", ++ "Programming Language :: Python :: 3.10", + "Topic :: Scientific/Engineering", + "Topic :: System :: Distributed Computing", + ], diff --git a/python-distributed.changes b/python-distributed.changes index 2086ff3..fa2ca9f 100644 --- a/python-distributed.changes +++ b/python-distributed.changes @@ -1,3 +1,27 @@ +------------------------------------------------------------------- +Fri Mar 25 19:18:11 UTC 2022 - Ben Greiner + +- Update to 2022.03.0 + * Support dumping cluster state to URL (GH#5863) Gabe Joseph + * Prevent data duplication on unspill (GH#5936) crusaderky + * Encapsulate spill buffer and memory_monitor (GH#5904) + crusaderky + * Drop pkg_resources in favour of importlib.metadata (GH#5923) + Thomas Grainger + * Worker State Machine refactor: redesign TaskState and scheduler + messages (GH#5922) crusaderky + * Tidying of OpenSSL 1.0.2/Python 3.9 (and earlier) handling + (GH#5854) jakirkham + * zict type annotations (GH#5905) crusaderky + * Add key to compute failed message (GH#5928) Florian Jetter + * Change default log format to include timestamp (GH#5897) + Florian Jetter + * Improve type annotations in worker.py (GH#5814) crusaderky +- Add distributed-pr5952-py310.patch -- gh#dask/distributed#5952 +- Add distributed-ignore-thread-leaks.patch +- Make the distributed/dask update sync requirement even more + obvious. + ------------------------------------------------------------------- Tue Mar 8 07:46:52 UTC 2022 - Matej Cepl diff --git a/python-distributed.spec b/python-distributed.spec index f8935ea..af140b1 100644 --- a/python-distributed.spec +++ b/python-distributed.spec @@ -31,15 +31,12 @@ %bcond_without test %endif %if "%{flavor}" == "test-py310" -# add to _multibuild when enabling python310 (see below) %define psuffix -test-py310 %define skip_python38 1 %define skip_python39 1 %bcond_without test %endif %if "%{flavor}" == "" -# https://github.com/dask/distributed/issues/5350 -- NOT fixed by https://github.com/dask/distributed/pull/5353 -# %%define skip_python310 1 %bcond_with test %endif @@ -53,18 +50,27 @@ %define cythonize --with-cython %endif +# use this to run tests with xdist in parallel, unfortunately fails server side +%bcond_with paralleltests + %{?!python_module:%define python_module() python3-%{**}} %define skip_python2 1 +# ===> Note: python-dask MUST be updated in sync with python-distributed! <=== +%define ghversiontag 2022.03.0 Name: python-distributed%{psuffix} -# Note: please always update together with python-dask -Version: 2022.02.1 +# ===> Note: python-dask MUST be updated in sync with python-distributed! <=== +Version: 2022.3.0 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause -URL: https://distributed.readthedocs.io/en/latest/ -Source: https://github.com/dask/distributed/archive/refs/tags//%{version}.tar.gz#/distributed-%{version}-gh.tar.gz +URL: https://distributed.dask.org +Source: https://github.com/dask/distributed/archive/refs/tags/%{ghversiontag}.tar.gz#/distributed-%{ghversiontag}-gh.tar.gz Source99: python-distributed-rpmlintrc -BuildRequires: %{python_module base >= 3.7} +# PATCH-FIX-UPSTREAM distributed-pr5952-py310.patch -- gh#dask/distributed#5952 +Patch1: distributed-pr5952-py310.patch +# PATCH-FIX-OPENSUSE distributed-ignore-thread-leaks.patch -- ignore leaking threads on obs, code@bnavigator.de +Patch2: distributed-ignore-thread-leaks.patch +BuildRequires: %{python_module base >= 3.8} BuildRequires: %{python_module setuptools} BuildRequires: fdupes BuildRequires: python-rpm-macros @@ -112,6 +118,9 @@ BuildRequires: %{python_module tblib} BuildRequires: %{python_module toolz >= 0.8.2} BuildRequires: %{python_module tornado >= 6.0.3} BuildRequires: %{python_module zict >= 0.1.3} +%if %{with paralleltests} +BuildRequires: %{python_module pytest-xdist} +%endif %endif %python_subpackages @@ -121,9 +130,10 @@ extends both the concurrent.futures and dask APIs to moderate sized clusters. %prep -%autosetup -p1 -n distributed-%{version} +%autosetup -p1 -n distributed-%{ghversiontag} -sed -i '/addopts/ {s/--durations=20//; s/--color=yes//}' setup.cfg +sed -i -e '/addopts/ {s/--durations=20//; s/--color=yes//}' \ + -e 's/timeout_method = thread/timeout_method = signal/' setup.cfg %build %if ! %{with test} @@ -141,21 +151,44 @@ sed -i '/addopts/ {s/--durations=20//; s/--color=yes//}' setup.cfg %if %{with test} %check -# randomly fail server-side -- too slow for obs (?) +# we obviously don't test a git repo +donttest="test_git_revision" +# logger error +donttest+=" or test_version_warning_in_cluster" + +# Some tests randomly fail server-side -- too slow for obs (?) +# see also https://github.com/dask/distributed/issues/5818 donttest+=" or (test_asyncprocess and test_exit_callback)" -donttest+=" or (test_nanny and test_throttle_outgoing_connections)" -donttest+=" or (test_scheduler and test_rebalance)" -donttest+=" or (test_tls_functional and test_rebalance)" -donttest+=" or (test_worker and test_fail_write_to_disk)" -donttest+=" or (test_worker and test_multiple_transfers)" -donttest+=" or (test_worker and test_remove_replicas_while_computing)" +donttest+=" or (test_client and test_repr)" +donttest+=" or (test_priorities and test_compute)" +donttest+=" or (test_resources and test_prefer_constrained)" +donttest+=" or (test_steal and test_steal_twice)" +donttest+=" or (test_worker and test_gather_dep_one_worker_always_busy)" donttest+=" or (test_worker and test_worker_reconnects_mid_compute)" + +# Exception messages not caught -- https://github.com/dask/distributed/issues/5460#issuecomment-1079432890 +python310_donttest+=" or test_exception_text" +python310_donttest+=" or test_worker_bad_args" + if [[ $(getconf LONG_BIT) -eq 32 ]]; then - # OverflowError - donttest+=" or (test_ensure_spilled_immediately)" - donttest+=" or (test_value_raises_during_spilling)" + # OverflowError -- https://github.com/dask/distributed/issues/5252 + donttest+=" or test_ensure_spilled_immediately" + donttest+=" or test_value_raises_during_spilling" + donttest+=" or test_fail_to_pickle_target_1" fi -%pytest_arch distributed/tests -r sfER -m "not avoid_ci" -k "not (${donttest:4})" --reruns 3 --reruns-delay 3 + +%if %{with paralleltests} +# not fully supported parallel test suite: https://github.com/dask/distributed/issues/5186 +# works locally, but fails with too many tests server-side +notparallel="rebalance or memory or upload" +notparallel+=" or test_open_close_many_workers" +notparallel+=" or test_recreate_error_array" +notparallel+=" or (test_preload and test_web_preload_worker)" +%pytest_arch distributed/tests -m "not avoid_ci" -n auto -k "not ($notparallel or $donttest ${$python_donttest})" +%pytest_arch distributed/tests -m "not avoid_ci" -k "($notparallel) and (not ($donttest ${$python_donttest}))" +%else +%pytest_arch distributed/tests -m "not avoid_ci" -k "not ($donttest ${$python_donttest})" --reruns 3 --reruns-delay 3 +%endif %endif %if ! %{with test} @@ -172,7 +205,7 @@ fi %python_alternative %{_bindir}/dask-scheduler %python_alternative %{_bindir}/dask-worker %{python_sitearch}/distributed -%{python_sitearch}/distributed-%(echo %{version}|sed -e 's/\.0/./')*-info +%{python_sitearch}/distributed-%{version}*-info %endif