From d7e55ab03c1b444e9d33e3c458779cec0ed0eef84247ba21e073332212545c47 Mon Sep 17 00:00:00 2001 From: Matej Cepl Date: Mon, 9 Aug 2021 13:04:37 +0000 Subject: [PATCH] Accepting request 910741 from home:bnavigator:branches:devel:languages:python:numeric - Update to version 2021.7.2 * Fix a deadlock connected to task stealing and task deserialization * Include maximum shard size in second to_frames method * Minor dashboard style updates * Cap maximum shard size at the size of an integer * Document automatic MALLOC_TRIM_THRESHOLD_ environment variable * Mark ucx-py tests for GPU * Update individual profile plot sizing * Handle NVMLError_Unknown in NVML diagnostics * Unit tests to use a random port for the dashboard * Ensure worker reconnect registers existing tasks properly * Halve CI runtime! * Add NannyPlugins * Add WorkerNetworkBandwidth chart to dashboard * Set nanny environment variables in config * Read smaller frames to workaround OpenSSL bug * Move UCX/RMM config variables to Distributed namespace * Allow ws(s) messages greater than 10Mb * Short-circuit root-ish check for many deps -Release 2021.07.1 * Remove experimental feature warning from actors docs * Keep dependents in worker dependency if TS is still known * Add Scheduler.set_restrictions * Make Actor futures awaitable and work with as_completed * Simplify test_secede_balances * Computation class * Some light dashboard cleanup * Don't package tests * Add pytest marker for GPU tests * Actor: don't hold key references on workers * Collapse nav to hamburger sooner * Verify that actors survive pickling * Reenable UCX-Py tests that used to segfault * Better support ProcessPoolExecutors * Simplify test_worker_heartbeat_after_cancel * Avoid property validation in Bokeh * Reduce default websocket frame size and make configurable * Disable pytest-timeout SIGALARM on MacOS * rebalance() resilience to computations * Improve CI stability * Ensure heartbeats after cancelation do not raise KeyError s * Add more useful exception message on TLS cert mismatch * Add bokeh mode parameter to performance reports - Use the GitHub tarball because the PyPI sdist does to provide the tests anymore * Remove extra conftest.py source - Drop distributed-pr5022-improve_ci.patch merged upstream OBS-URL: https://build.opensuse.org/request/show/910741 OBS-URL: https://build.opensuse.org/package/show/devel:languages:python:numeric/python-distributed?expand=0&rev=99 --- conftest.py | 37 - distributed-2021.07.2-gh.tar.gz | 3 + distributed-2021.7.0.tar.gz | 3 - distributed-pr5022-improve_ci.patch | 2621 --------------------------- python-distributed.changes | 52 + python-distributed.spec | 18 +- 6 files changed, 60 insertions(+), 2674 deletions(-) delete mode 100644 conftest.py create mode 100644 distributed-2021.07.2-gh.tar.gz delete mode 100644 distributed-2021.7.0.tar.gz delete mode 100644 distributed-pr5022-improve_ci.patch diff --git a/conftest.py b/conftest.py deleted file mode 100644 index cd7623a..0000000 --- a/conftest.py +++ /dev/null @@ -1,37 +0,0 @@ -# https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option -import pytest - -# Uncomment to enable more logging and checks -# (https://docs.python.org/3/library/asyncio-dev.html) -# Note this makes things slower and might consume much memory. -# os.environ["PYTHONASYNCIODEBUG"] = "1" - -try: - import faulthandler -except ImportError: - pass -else: - try: - faulthandler.enable() - except Exception: - pass - -# Make all fixtures available -from distributed.utils_test import * # noqa - - -def pytest_addoption(parser): - parser.addoption("--runslow", action="store_true", help="run slow tests") - - -def pytest_collection_modifyitems(config, items): - if config.getoption("--runslow"): - # --runslow given in cli: do not skip slow tests - return - skip_slow = pytest.mark.skip(reason="need --runslow option to run") - for item in items: - if "slow" in item.keywords: - item.add_marker(skip_slow) - - -pytest_plugins = ["distributed.pytest_resourceleaks"] diff --git a/distributed-2021.07.2-gh.tar.gz b/distributed-2021.07.2-gh.tar.gz new file mode 100644 index 0000000..933fa19 --- /dev/null +++ b/distributed-2021.07.2-gh.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:2b985191330b4dfb5cb2ed431ac66cfee184ac2032f35a26fae0e8f4036c23de +size 1460430 diff --git a/distributed-2021.7.0.tar.gz b/distributed-2021.7.0.tar.gz deleted file mode 100644 index ab2ce66..0000000 --- a/distributed-2021.7.0.tar.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:6257f399ea564bfdcd80dcc9df6cdaf703dbadb94b7c068d103f8366dc7f8d1f -size 1065464 diff --git a/distributed-pr5022-improve_ci.patch b/distributed-pr5022-improve_ci.patch deleted file mode 100644 index fd7c71b..0000000 --- a/distributed-pr5022-improve_ci.patch +++ /dev/null @@ -1,2621 +0,0 @@ -Index: distributed-2021.7.0/continuous_integration/condarc -=================================================================== ---- /dev/null -+++ distributed-2021.7.0/continuous_integration/condarc -@@ -0,0 +1,9 @@ -+channels: -+ - conda-forge -+ - defaults -+channel_priority: true -+auto_activate_base: false -+remote_backoff_factor: 20 -+remote_connect_timeout_secs: 20.0 -+remote_max_retries: 10 -+remote_read_timeout_secs: 60.0 -Index: distributed-2021.7.0/distributed/cli/tests/test_dask_scheduler.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/cli/tests/test_dask_scheduler.py -+++ distributed-2021.7.0/distributed/cli/tests/test_dask_scheduler.py -@@ -1,3 +1,4 @@ -+import psutil - import pytest - - pytest.importorskip("requests") -@@ -14,6 +15,7 @@ from click.testing import CliRunner - import distributed - import distributed.cli.dask_scheduler - from distributed import Client, Scheduler -+from distributed.compatibility import LINUX - from distributed.metrics import time - from distributed.utils import get_ip, get_ip_interface, tmpfile - from distributed.utils_test import ( -@@ -118,9 +120,7 @@ def test_dashboard_non_standard_ports(lo - requests.get("http://localhost:4832/status/") - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - def test_dashboard_whitelist(loop): - pytest.importorskip("bokeh") - with pytest.raises(Exception): -@@ -144,7 +144,6 @@ def test_dashboard_whitelist(loop): - - - def test_interface(loop): -- psutil = pytest.importorskip("psutil") - if_names = sorted(psutil.net_if_addrs()) - for if_name in if_names: - try: -@@ -168,25 +167,24 @@ def test_interface(loop): - start = time() - while not len(c.nthreads()): - sleep(0.1) -- assert time() - start < 5 -+ assert time() - start < 30 - info = c.scheduler_info() - assert "tcp://127.0.0.1" in info["address"] - assert all("127.0.0.1" == d["host"] for d in info["workers"].values()) - - --@pytest.mark.flaky(reruns=10, reruns_delay=5) - def test_pid_file(loop): - def check_pidfile(proc, pidfile): - start = time() - while not os.path.exists(pidfile): - sleep(0.01) -- assert time() < start + 5 -+ assert time() < start + 30 - - text = False - start = time() - while not text: - sleep(0.01) -- assert time() < start + 5 -+ assert time() < start + 30 - with open(pidfile) as f: - text = f.read() - pid = int(text) -Index: distributed-2021.7.0/distributed/cli/tests/test_dask_worker.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/cli/tests/test_dask_worker.py -+++ distributed-2021.7.0/distributed/cli/tests/test_dask_worker.py -@@ -6,7 +6,6 @@ from click.testing import CliRunner - pytest.importorskip("requests") - - import os --import sys - from multiprocessing import cpu_count - from time import sleep - -@@ -14,6 +13,7 @@ import requests - - import distributed.cli.dask_worker - from distributed import Client, Scheduler -+from distributed.compatibility import LINUX - from distributed.deploy.utils import nprocesses_nthreads - from distributed.metrics import time - from distributed.utils import parse_ports, sync, tmpfile -@@ -275,9 +275,7 @@ def test_nprocs_expands_name(loop): - assert len(set(names)) == 4 - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) - @pytest.mark.parametrize( - "listen_address", ["tcp://0.0.0.0:39837", "tcp://127.0.0.2:39837"] -@@ -311,9 +309,7 @@ def test_contact_listen_address(loop, na - assert client.run(func) == {"tcp://127.0.0.2:39837": listen_address} - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) - @pytest.mark.parametrize("host", ["127.0.0.2", "0.0.0.0"]) - def test_respect_host_listen_address(loop, nanny, host): -Index: distributed-2021.7.0/distributed/comm/tests/test_comms.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/comm/tests/test_comms.py -+++ distributed-2021.7.0/distributed/comm/tests/test_comms.py -@@ -29,7 +29,6 @@ from distributed.comm import ( - ) - from distributed.comm.registry import backends, get_backend - from distributed.comm.tcp import TCP, TCPBackend, TCPConnector --from distributed.compatibility import WINDOWS - from distributed.metrics import time - from distributed.protocol import Serialized, deserialize, serialize, to_serialize - from distributed.utils import get_ip, get_ipv6 -@@ -110,7 +109,7 @@ async def debug_loop(): - while True: - loop = ioloop.IOLoop.current() - print(".", loop, loop._handlers) -- await asyncio.sleep(0.50) -+ await asyncio.sleep(0.5) - - - # -@@ -1107,7 +1106,6 @@ async def check_deserialize(addr): - await check_connector_deserialize(addr, True, msg, partial(check_out, True)) - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS) - @pytest.mark.asyncio - async def test_tcp_deserialize(): - await check_deserialize("tcp://") -Index: distributed-2021.7.0/distributed/comm/tests/test_ucx.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/comm/tests/test_ucx.py -+++ distributed-2021.7.0/distributed/comm/tests/test_ucx.py -@@ -276,7 +276,7 @@ async def test_ucx_localcluster(processe - ) as cluster: - async with Client(cluster, asynchronous=True) as client: - x = client.submit(inc, 1) -- await x.result() -+ await x - assert x.key in cluster.scheduler.tasks - if not processes: - assert any(w.data == {x.key: 2} for w in cluster.workers.values()) -Index: distributed-2021.7.0/distributed/comm/tests/test_ws.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/comm/tests/test_ws.py -+++ distributed-2021.7.0/distributed/comm/tests/test_ws.py -@@ -122,9 +122,9 @@ async def test_collections(cleanup): - async def test_large_transfer(cleanup): - np = pytest.importorskip("numpy") - async with Scheduler(protocol="ws://") as s: -- async with Worker(s.address, protocol="ws://") as w: -+ async with Worker(s.address, protocol="ws://"): - async with Client(s.address, asynchronous=True) as c: -- future = await c.scatter(np.random.random(1000000)) -+ await c.scatter(np.random.random(1_000_000)) - - - @pytest.mark.asyncio -Index: distributed-2021.7.0/distributed/compatibility.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/compatibility.py -+++ distributed-2021.7.0/distributed/compatibility.py -@@ -8,6 +8,7 @@ logging_names = logging._levelToName.cop - logging_names.update(logging._nameToLevel) - - PYPY = platform.python_implementation().lower() == "pypy" -+LINUX = sys.platform == "linux" - MACOS = sys.platform == "darwin" - WINDOWS = sys.platform.startswith("win") - TORNADO6 = tornado.version_info[0] >= 6 -Index: distributed-2021.7.0/distributed/dashboard/tests/test_scheduler_bokeh.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/dashboard/tests/test_scheduler_bokeh.py -+++ distributed-2021.7.0/distributed/dashboard/tests/test_scheduler_bokeh.py -@@ -17,7 +17,6 @@ from dask.core import flatten - from dask.utils import stringify - - from distributed.client import wait --from distributed.compatibility import MACOS - from distributed.dashboard import scheduler - from distributed.dashboard.components.scheduler import ( - AggregateAction, -@@ -93,10 +92,8 @@ async def test_counters(c, s, a, b): - await asyncio.sleep(0.1) - ss.update() - -- start = time() - while not len(ss.digest_sources["tick-duration"][0].data["x"]): -- await asyncio.sleep(1) -- assert time() < start + 5 -+ await asyncio.sleep(0.01) - - - @gen_cluster(client=True) -@@ -184,7 +181,7 @@ async def test_task_stream_clear_interva - - await wait(c.map(inc, range(10))) - ts.update() -- await asyncio.sleep(0.010) -+ await asyncio.sleep(0.01) - await wait(c.map(dec, range(10))) - ts.update() - -@@ -192,7 +189,7 @@ async def test_task_stream_clear_interva - assert ts.source.data["name"].count("inc") == 10 - assert ts.source.data["name"].count("dec") == 10 - -- await asyncio.sleep(0.300) -+ await asyncio.sleep(0.3) - await wait(c.map(inc, range(10, 20))) - ts.update() - -@@ -848,7 +845,6 @@ async def test_aggregate_action(c, s, a, - assert ("compute") in mbk.action_source.data["names"] - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) - @gen_cluster(client=True, scheduler_kwargs={"dashboard": True}) - async def test_compute_per_key(c, s, a, b): - mbk = ComputePerKey(s) -Index: distributed-2021.7.0/distributed/deploy/local.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/deploy/local.py -+++ distributed-2021.7.0/distributed/deploy/local.py -@@ -136,8 +136,8 @@ class LocalCluster(SpecCluster): - - if threads_per_worker == 0: - warnings.warn( -- "Setting `threads_per_worker` to 0 is discouraged. " -- "Please set to None or to a specific int to get best behavior." -+ "Setting `threads_per_worker` to 0 has been deprecated. " -+ "Please set to None or to a specific int." - ) - threads_per_worker = None - -Index: distributed-2021.7.0/distributed/deploy/old_ssh.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/deploy/old_ssh.py -+++ distributed-2021.7.0/distributed/deploy/old_ssh.py -@@ -51,8 +51,8 @@ def async_ssh(cmd_dict): - port=cmd_dict["ssh_port"], - key_filename=cmd_dict["ssh_private_key"], - compress=True, -- timeout=20, -- banner_timeout=20, -+ timeout=30, -+ banner_timeout=30, - ) # Helps prevent timeouts when many concurrent ssh connections are opened. - # Connection successful, break out of while loop - break -Index: distributed-2021.7.0/distributed/deploy/tests/test_adaptive.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/deploy/tests/test_adaptive.py -+++ distributed-2021.7.0/distributed/deploy/tests/test_adaptive.py -@@ -8,6 +8,7 @@ import pytest - import dask - - from distributed import Adaptive, Client, LocalCluster, SpecCluster, Worker, wait -+from distributed.compatibility import WINDOWS - from distributed.metrics import time - from distributed.utils_test import async_wait_for, clean, gen_test, slowinc - -@@ -40,8 +41,8 @@ def test_adaptive_local_cluster(loop): - assert not c.nthreads() - - --@pytest.mark.asyncio --async def test_adaptive_local_cluster_multi_workers(cleanup): -+@gen_test() -+async def test_adaptive_local_cluster_multi_workers(): - async with LocalCluster( - n_workers=0, - scheduler_port=0, -@@ -56,19 +57,14 @@ async def test_adaptive_local_cluster_mu - async with Client(cluster, asynchronous=True) as c: - futures = c.map(slowinc, range(100), delay=0.01) - -- start = time() - while not cluster.scheduler.workers: - await asyncio.sleep(0.01) -- assert time() < start + 15, adapt.log - - await c.gather(futures) - del futures - -- start = time() -- # while cluster.workers: - while cluster.scheduler.workers: - await asyncio.sleep(0.01) -- assert time() < start + 15, adapt.log - - # no workers for a while - for i in range(10): -@@ -242,7 +238,7 @@ async def test_adapt_quickly(): - await cluster.close() - - --@gen_test(timeout=None) -+@gen_test() - async def test_adapt_down(): - """Ensure that redefining adapt with a lower maximum removes workers""" - async with LocalCluster( -@@ -302,14 +298,15 @@ def test_basic_no_loop(loop): - loop.add_callback(loop.stop) - - --@pytest.mark.asyncio -+@pytest.mark.flaky(condition=not WINDOWS, reruns=10, reruns_delay=5) -+@pytest.mark.xfail(condition=WINDOWS, reason="extremely flaky") -+@gen_test() - async def test_target_duration(): -- """Ensure that redefining adapt with a lower maximum removes workers""" - with dask.config.set( - {"distributed.scheduler.default-task-durations": {"slowinc": 1}} - ): - async with LocalCluster( -- 0, -+ n_workers=0, - asynchronous=True, - processes=False, - scheduler_port=0, -@@ -318,16 +315,12 @@ async def test_target_duration(): - ) as cluster: - adapt = cluster.adapt(interval="20ms", minimum=2, target_duration="5s") - async with Client(cluster, asynchronous=True) as client: -- while len(cluster.scheduler.workers) < 2: -- await asyncio.sleep(0.01) -- -+ await client.wait_for_workers(2) - futures = client.map(slowinc, range(100), delay=0.3) -+ await wait(futures) - -- while len(adapt.log) < 2: -- await asyncio.sleep(0.01) -- -- assert adapt.log[0][1] == {"status": "up", "n": 2} -- assert adapt.log[1][1] == {"status": "up", "n": 20} -+ assert adapt.log[0][1] == {"status": "up", "n": 2} -+ assert adapt.log[1][1] == {"status": "up", "n": 20} - - - @pytest.mark.asyncio -Index: distributed-2021.7.0/distributed/deploy/tests/test_adaptive_core.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/deploy/tests/test_adaptive_core.py -+++ distributed-2021.7.0/distributed/deploy/tests/test_adaptive_core.py -@@ -85,10 +85,10 @@ async def test_interval(): - assert time() < start + 2 - - adapt.stop() -- await asyncio.sleep(0.050) -+ await asyncio.sleep(0.05) - - adapt._target = 10 -- await asyncio.sleep(0.020) -+ await asyncio.sleep(0.02) - assert len(adapt.plan) == 1 # last value from before, unchanged - - -Index: distributed-2021.7.0/distributed/deploy/tests/test_local.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/deploy/tests/test_local.py -+++ distributed-2021.7.0/distributed/deploy/tests/test_local.py -@@ -17,6 +17,7 @@ from tornado.ioloop import IOLoop - from dask.system import CPU_COUNT - - from distributed import Client, Nanny, Worker, get_client -+from distributed.compatibility import LINUX - from distributed.core import Status - from distributed.deploy.local import LocalCluster - from distributed.deploy.utils_test import ClusterTest -@@ -693,13 +694,12 @@ def test_adapt_then_manual(loop): - processes=False, - n_workers=8, - ) as cluster: -- sleep(0.1) - cluster.adapt(minimum=0, maximum=4, interval="10ms") - - start = time() - while cluster.scheduler.workers or cluster.workers: -- sleep(0.1) -- assert time() < start + 5 -+ sleep(0.01) -+ assert time() < start + 30 - - assert not cluster.workers - -@@ -715,8 +715,8 @@ def test_adapt_then_manual(loop): - - start = time() - while len(cluster.scheduler.workers) != 2: -- sleep(0.1) -- assert time() < start + 5 -+ sleep(0.01) -+ assert time() < start + 30 - - - @pytest.mark.parametrize("temporary", [True, False]) -@@ -843,9 +843,7 @@ def test_protocol_tcp(loop): - assert cluster.scheduler.address.startswith("tcp://") - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - def test_protocol_ip(loop): - with LocalCluster( - host="tcp://127.0.0.2", loop=loop, n_workers=0, processes=False -@@ -990,7 +988,7 @@ async def test_repr(cleanup): - @pytest.mark.asyncio - async def test_threads_per_worker_set_to_0(cleanup): - with pytest.warns( -- Warning, match="Setting `threads_per_worker` to 0 is discouraged." -+ Warning, match="Setting `threads_per_worker` to 0 has been deprecated." - ): - async with LocalCluster( - n_workers=2, processes=False, threads_per_worker=0, asynchronous=True -Index: distributed-2021.7.0/distributed/deploy/tests/test_spec_cluster.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/deploy/tests/test_spec_cluster.py -+++ distributed-2021.7.0/distributed/deploy/tests/test_spec_cluster.py -@@ -14,6 +14,7 @@ from distributed.core import Status - from distributed.deploy.spec import ProcessInterface, close_clusters, run_spec - from distributed.metrics import time - from distributed.utils import is_valid_xml -+from distributed.utils_test import gen_test - - - class MyWorker(Worker): -@@ -138,8 +139,8 @@ async def test_scale(cleanup): - - - @pytest.mark.slow --@pytest.mark.asyncio --async def test_adaptive_killed_worker(cleanup): -+@gen_test() -+async def test_adaptive_killed_worker(): - with dask.config.set({"distributed.deploy.lost-worker-timeout": 0.1}): - - async with SpecCluster( -@@ -147,13 +148,10 @@ async def test_adaptive_killed_worker(cl - worker={"cls": Nanny, "options": {"nthreads": 1}}, - scheduler={"cls": Scheduler, "options": {"port": 0}}, - ) as cluster: -- - async with Client(cluster, asynchronous=True) as client: -- -- cluster.adapt(minimum=1, maximum=1) -- - # Scale up a cluster with 1 worker. -- while len(cluster.workers) != 1: -+ cluster.adapt(minimum=1, maximum=1) -+ while not cluster.workers: - await asyncio.sleep(0.01) - - future = client.submit(sleep, 0.1) -@@ -163,11 +161,11 @@ async def test_adaptive_killed_worker(cl - await cluster.workers[worker_id].kill() - - # Wait for the worker to re-spawn and finish sleeping. -- await future.result(timeout=5) -+ await future - - --@pytest.mark.asyncio --async def test_unexpected_closed_worker(cleanup): -+@gen_test() -+async def test_unexpected_closed_worker(): - worker = {"cls": Worker, "options": {"nthreads": 1}} - with dask.config.set({"distributed.deploy.lost-worker-timeout": "10ms"}): - async with SpecCluster( -@@ -197,25 +195,20 @@ async def test_unexpected_closed_worker( - assert len(cluster.workers) == 2 - - --@pytest.mark.flaky(reruns=10, reruns_delay=5) --@pytest.mark.slow --@pytest.mark.asyncio --async def test_restart(cleanup): -- # Regression test for https://github.com/dask/distributed/issues/3062 -+@gen_test(timeout=60) -+async def test_restart(): -+ """Regression test for https://github.com/dask/distributed/issues/3062""" - worker = {"cls": Nanny, "options": {"nthreads": 1}} -- with dask.config.set({"distributed.deploy.lost-worker-timeout": "2s"}): -- async with SpecCluster( -- asynchronous=True, scheduler=scheduler, worker=worker -- ) as cluster: -- async with Client(cluster, asynchronous=True) as client: -- cluster.scale(2) -- await cluster -- assert len(cluster.workers) == 2 -- await client.restart() -- start = time() -- while len(cluster.workers) < 2: -- await asyncio.sleep(0.5) -- assert time() < start + 60 -+ async with SpecCluster( -+ asynchronous=True, scheduler=scheduler, worker=worker -+ ) as cluster: -+ async with Client(cluster, asynchronous=True) as client: -+ cluster.scale(2) -+ await cluster -+ assert len(cluster.workers) == 2 -+ await client.restart() -+ while len(cluster.workers) < 2: -+ await asyncio.sleep(0.01) - - - @pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out") -Index: distributed-2021.7.0/distributed/diagnostics/tests/test_progress.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/diagnostics/tests/test_progress.py -+++ distributed-2021.7.0/distributed/diagnostics/tests/test_progress.py -@@ -4,6 +4,7 @@ import pytest - - from distributed import Nanny - from distributed.client import wait -+from distributed.compatibility import LINUX - from distributed.diagnostics.progress import ( - AllProgress, - GroupProgress, -@@ -95,8 +96,9 @@ def check_bar_completed(capsys, width=40 - assert percent == "100% Completed" - - -+@pytest.mark.flaky(condition=not COMPILED and LINUX, reruns=10, reruns_delay=5) - @pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler") --@gen_cluster(client=True, Worker=Nanny, timeout=None) -+@gen_cluster(client=True, Worker=Nanny) - async def test_AllProgress(c, s, a, b): - x, y, z = c.map(inc, [1, 2, 3]) - xx, yy, zz = c.map(dec, [x, y, z]) -@@ -178,8 +180,9 @@ async def test_AllProgress(c, s, a, b): - assert all(set(d) == {"div"} for d in p.state.values()) - - -+@pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5) - @gen_cluster(client=True, Worker=Nanny) --async def test_AllProgress_lost_key(c, s, a, b, timeout=None): -+async def test_AllProgress_lost_key(c, s, a, b): - p = AllProgress(s) - futures = c.map(inc, range(5)) - await wait(futures) -@@ -188,10 +191,8 @@ async def test_AllProgress_lost_key(c, s - await a.close() - await b.close() - -- start = time() - while len(p.state["memory"]["inc"]) > 0: -- await asyncio.sleep(0.1) -- assert time() < start + 5 -+ await asyncio.sleep(0.01) - - - @gen_cluster(client=True) -Index: distributed-2021.7.0/distributed/diagnostics/tests/test_scheduler_plugin.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/diagnostics/tests/test_scheduler_plugin.py -+++ distributed-2021.7.0/distributed/diagnostics/tests/test_scheduler_plugin.py -@@ -33,7 +33,7 @@ async def test_simple(c, s, a, b): - assert counter not in s.plugins - - --@gen_cluster(nthreads=[], client=False) -+@gen_cluster(nthreads=[]) - async def test_add_remove_worker(s): - events = [] - -@@ -71,7 +71,7 @@ async def test_add_remove_worker(s): - assert events == [] - - --@gen_cluster(nthreads=[], client=False) -+@gen_cluster(nthreads=[]) - async def test_async_add_remove_worker(s): - events = [] - -Index: distributed-2021.7.0/distributed/diagnostics/tests/test_widgets.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/diagnostics/tests/test_widgets.py -+++ distributed-2021.7.0/distributed/diagnostics/tests/test_widgets.py -@@ -5,8 +5,6 @@ pytest.importorskip("ipywidgets") - from ipykernel.comm import Comm - from ipywidgets import Widget - --from distributed.compatibility import WINDOWS -- - ################# - # Utility stuff # - ################# -@@ -145,7 +143,6 @@ async def test_multi_progressbar_widget( - assert sorted(capacities, reverse=True) == capacities - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS) - @gen_cluster() - async def test_multi_progressbar_widget_after_close(s, a, b): - s.update_graph( -Index: distributed-2021.7.0/distributed/distributed.yaml -=================================================================== ---- distributed-2021.7.0.orig/distributed/distributed.yaml -+++ distributed-2021.7.0/distributed/distributed.yaml -@@ -176,7 +176,7 @@ distributed: - threads: 0 # Threads to use. 0 for single-threaded, -1 to infer from cpu count. - - timeouts: -- connect: 10s # time before connecting fails -+ connect: 30s # time before connecting fails - tcp: 30s # time before calling an unresponsive connection dead - - require-encryption: null # Whether to require encryption on non-local comms -Index: distributed-2021.7.0/distributed/nanny.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/nanny.py -+++ distributed-2021.7.0/distributed/nanny.py -@@ -384,7 +384,7 @@ class Nanny(ServerNode): - raise - return result - -- async def restart(self, comm=None, timeout=2, executor_wait=True): -+ async def restart(self, comm=None, timeout=30, executor_wait=True): - async def _(): - if self.process is not None: - await self.kill() -@@ -393,7 +393,9 @@ class Nanny(ServerNode): - try: - await asyncio.wait_for(_(), timeout) - except TimeoutError: -- logger.error("Restart timed out, returning before finished") -+ logger.error( -+ f"Restart timed out after {timeout}s; returning before finished" -+ ) - return "timed out" - else: - return "OK" -Index: distributed-2021.7.0/distributed/protocol/tests/test_pickle.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/protocol/tests/test_pickle.py -+++ distributed-2021.7.0/distributed/protocol/tests/test_pickle.py -@@ -128,7 +128,6 @@ def test_pickle_numpy(): - assert (deserialize(h, f) == x).all() - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=sys.version_info[:2] == (3, 8)) - def test_pickle_functions(): - def make_closure(): - value = 1 -Index: distributed-2021.7.0/distributed/scheduler.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/scheduler.py -+++ distributed-2021.7.0/distributed/scheduler.py -@@ -3440,19 +3440,18 @@ class Scheduler(SchedulerState, ServerNo - - http_server_modules = dask.config.get("distributed.scheduler.http.routes") - show_dashboard = dashboard or (dashboard is None and dashboard_address) -- missing_bokeh = False - # install vanilla route if show_dashboard but bokeh is not installed - if show_dashboard: - try: - import distributed.dashboard.scheduler - except ImportError: -- missing_bokeh = True -+ show_dashboard = False - http_server_modules.append("distributed.http.scheduler.missing_bokeh") - routes = get_handlers( - server=self, modules=http_server_modules, prefix=http_prefix - ) - self.start_http_server(routes, dashboard_address, default_port=8787) -- if show_dashboard and not missing_bokeh: -+ if show_dashboard: - distributed.dashboard.scheduler.connect( - self.http_application, self.http_server, self, prefix=http_prefix - ) -@@ -5467,8 +5466,8 @@ class Scheduler(SchedulerState, ServerNo - for collection in self._task_state_collections: - collection.clear() - -- async def restart(self, client=None, timeout=3): -- """Restart all workers. Reset local state.""" -+ async def restart(self, client=None, timeout=30): -+ """Restart all workers. Reset local state.""" - parent: SchedulerState = cast(SchedulerState, self) - with log_errors(): - -Index: distributed-2021.7.0/distributed/tests/test_actor.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_actor.py -+++ distributed-2021.7.0/distributed/tests/test_actor.py -@@ -485,10 +485,8 @@ async def test_compute(c, s, a, b): - result = await c.compute(final, actors=counter) - assert result == 0 + 1 + 2 + 3 + 4 - -- start = time() - while a.data or b.data: - await asyncio.sleep(0.01) -- assert time() < start + 30 - - - def test_compute_sync(client): -Index: distributed-2021.7.0/distributed/tests/test_as_completed.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_as_completed.py -+++ distributed-2021.7.0/distributed/tests/test_as_completed.py -@@ -14,7 +14,7 @@ from distributed.utils_test import gen_c - - - @gen_cluster(client=True) --async def test__as_completed(c, s, a, b): -+async def test_as_completed_async(c, s, a, b): - x = c.submit(inc, 1) - y = c.submit(inc, 1) - z = c.submit(inc, 2) -@@ -29,7 +29,7 @@ async def test__as_completed(c, s, a, b) - assert result in [x, y, z] - - --def test_as_completed(client): -+def test_as_completed_sync(client): - x = client.submit(inc, 1) - y = client.submit(inc, 2) - z = client.submit(inc, 1) -@@ -201,7 +201,6 @@ async def test_as_completed_with_results - assert str(exc.value) == "hello!" - - --@pytest.mark.flaky(reruns=10, reruns_delay=5) - def test_as_completed_with_results_no_raise(client): - x = client.submit(throws, 1) - y = client.submit(inc, 5) -@@ -260,7 +259,7 @@ async def test_as_completed_with_results - assert dd[z][0] == 2 - - --@gen_cluster(client=True, timeout=None) -+@gen_cluster(client=True) - async def test_clear(c, s, a, b): - futures = c.map(inc, range(3)) - ac = as_completed(futures) -Index: distributed-2021.7.0/distributed/tests/test_asyncprocess.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_asyncprocess.py -+++ distributed-2021.7.0/distributed/tests/test_asyncprocess.py -@@ -8,6 +8,7 @@ import weakref - from datetime import timedelta - from time import sleep - -+import psutil - import pytest - from tornado import gen - from tornado.locks import Event -@@ -280,13 +281,9 @@ async def test_child_main_thread(): - q._writer.close() - - --@pytest.mark.skipif( -- sys.platform.startswith("win"), reason="num_fds not supported on windows" --) -+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") - @gen_test() - async def test_num_fds(): -- psutil = pytest.importorskip("psutil") -- - # Warm up - proc = AsyncProcess(target=exit_now) - proc.daemon = True -@@ -303,11 +300,8 @@ async def test_num_fds(): - assert not proc.is_alive() - assert proc.exitcode == 0 - -- start = time() - while p.num_fds() > before: -- await asyncio.sleep(0.1) -- print("fds:", before, p.num_fds()) -- assert time() < start + 10 -+ await asyncio.sleep(0.01) - - - @gen_test() -@@ -407,7 +401,7 @@ def test_asyncprocess_child_teardown_on_ - try: - readable = children_alive.poll(short_timeout) - except BrokenPipeError: -- assert sys.platform.startswith("win"), "should only raise on windows" -+ assert WINDOWS, "should only raise on windows" - # Broken pipe implies closed, which is readable. - readable = True - -@@ -422,7 +416,7 @@ def test_asyncprocess_child_teardown_on_ - except EOFError: - pass # Test passes. - except BrokenPipeError: -- assert sys.platform.startswith("win"), "should only raise on windows" -+ assert WINDOWS, "should only raise on windows" - # Test passes. - else: - # Oops, children_alive read something. It should be closed. If -Index: distributed-2021.7.0/distributed/tests/test_client.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_client.py -+++ distributed-2021.7.0/distributed/tests/test_client.py -@@ -55,7 +55,7 @@ from distributed.client import ( - wait, - ) - from distributed.comm import CommClosedError --from distributed.compatibility import MACOS, WINDOWS -+from distributed.compatibility import LINUX, WINDOWS - from distributed.core import Status - from distributed.metrics import time - from distributed.objects import HasWhat, WhoHas -@@ -95,7 +95,7 @@ from distributed.utils_test import ( - ) - - --@gen_cluster(client=True, timeout=None) -+@gen_cluster(client=True) - async def test_submit(c, s, a, b): - x = c.submit(inc, 10) - assert not x.done() -@@ -624,7 +624,7 @@ async def test_limit_concurrent_gatherin - assert len(a.outgoing_transfer_log) + len(b.outgoing_transfer_log) < 100 - - --@gen_cluster(client=True, timeout=None) -+@gen_cluster(client=True) - async def test_get(c, s, a, b): - future = c.get({"x": (inc, 1)}, "x", sync=False) - assert isinstance(future, Future) -@@ -717,7 +717,7 @@ async def test_wait_first_completed(c, s - assert y.status == "pending" - - --@gen_cluster(client=True, timeout=2) -+@gen_cluster(client=True) - async def test_wait_timeout(c, s, a, b): - future = c.submit(sleep, 0.3) - with pytest.raises(TimeoutError): -@@ -794,7 +794,7 @@ async def test_garbage_collection_with_s - await asyncio.sleep(0.1) - - --@gen_cluster(timeout=1000, client=True) -+@gen_cluster(client=True) - async def test_recompute_released_key(c, s, a, b): - x = c.submit(inc, 100) - result1 = await x -@@ -907,9 +907,7 @@ async def test_tokenize_on_futures(c, s, - assert tok == tokenize(y) - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) - async def test_restrictions_submit(c, s, a, b): - x = c.submit(inc, 1, workers={a.ip}) -@@ -936,9 +934,7 @@ async def test_restrictions_ip_port(c, s - assert y.key in b.data - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) - async def test_restrictions_map(c, s, a, b): - L = c.map(inc, range(5), workers={a.ip}) -@@ -950,9 +946,7 @@ async def test_restrictions_map(c, s, a, - assert s.host_restrictions[x.key] == {a.ip} - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) - async def test_restrictions_get(c, s, a, b): - dsk = {"x": 1, "y": (inc, "x"), "z": (inc, "y")} -@@ -991,7 +985,7 @@ async def dont_test_bad_restrictions_rai - assert z.key in str(e) - - --@gen_cluster(client=True, timeout=None) -+@gen_cluster(client=True) - async def test_remove_worker(c, s, a, b): - L = c.map(inc, range(20)) - await wait(L) -@@ -1280,9 +1274,7 @@ async def test_get_nbytes(c, s, a, b): - assert s.get_nbytes(summary=False) == {x.key: sizeof(1), y.key: sizeof(2)} - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) - async def test_nbytes_determines_worker(c, s, a, b): - x = c.submit(identity, 1, workers=[a.ip]) -@@ -1455,7 +1447,7 @@ async def test_scatter_direct_empty(c, s - await c.scatter(123, direct=True, timeout=0.1) - - --@gen_cluster(client=True, timeout=None, nthreads=[("127.0.0.1", 1)] * 5) -+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 5) - async def test_scatter_direct_spread_evenly(c, s, *workers): - futures = [] - for i in range(10): -@@ -1799,7 +1791,7 @@ async def test_remote_scatter_gather(c, - assert (xx, yy, zz) == (1, 2, 3) - - --@gen_cluster(timeout=1000, client=True) -+@gen_cluster(client=True) - async def test_remote_submit_on_Future(c, s, a, b): - x = c.submit(lambda x: x + 1, 1) - y = c.submit(lambda x: x + 1, x) -@@ -1836,9 +1828,7 @@ async def test_client_with_scheduler(c, - assert result == 12 - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) - async def test_allow_restrictions(c, s, a, b): - aws = s.workers[a.address] -@@ -1971,7 +1961,7 @@ async def test_badly_serialized_input(c, - assert "hello!" in str(info.value) - - --@pytest.mark.skipif("True", reason="") -+@pytest.mark.skip - async def test_badly_serialized_input_stderr(capsys, c): - o = BadlySerializedObject() - future = c.submit(inc, o) -@@ -2853,7 +2843,6 @@ async def test_persist_get(c, s, a, b): - - @pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") - def test_client_num_fds(loop): -- psutil = pytest.importorskip("psutil") - with cluster() as (s, [a, b]): - proc = psutil.Process() - with Client(s["address"], loop=loop) as c: # first client to start loop -@@ -2864,7 +2853,7 @@ def test_client_num_fds(loop): - start = time() - while proc.num_fds() > before: - sleep(0.01) -- assert time() < start + 4 -+ assert time() < start + 10, (before, proc.num_fds()) - - - @gen_cluster() -@@ -3035,9 +3024,7 @@ async def test_receive_lost_key(c, s, a, - await asyncio.sleep(0.01) - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) - async def test_unrunnable_task_runs(c, s, a, b): - x = c.submit(inc, 1, workers=[a.ip]) -@@ -3073,9 +3060,7 @@ async def test_add_worker_after_tasks(c, - await n.close() - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) - async def test_workers_register_indirect_data(c, s, a, b): - [x] = await c.scatter([1], workers=a.address) -@@ -3205,13 +3190,10 @@ async def test_client_replicate(c, s, *w - assert len(s.tasks[y.key].who_has) == 10 - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster( - client=True, - nthreads=[("127.0.0.1", 1), ("127.0.0.2", 1), ("127.0.0.2", 1)], -- timeout=None, - ) - async def test_client_replicate_host(client, s, a, b, c): - aws = s.workers[a.address] -@@ -3565,7 +3547,7 @@ def test_get_returns_early(c): - - - @pytest.mark.slow --@gen_cluster(Worker=Nanny, client=True) -+@gen_cluster(Worker=Nanny, client=True, timeout=60) - async def test_Client_clears_references_after_restart(c, s, a, b): - x = c.submit(inc, 1) - assert x.key in c.refcount -@@ -3789,7 +3771,6 @@ async def test_reconnect_timeout(c, s): - @pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") - @pytest.mark.parametrize("worker,count,repeat", [(Worker, 100, 5), (Nanny, 10, 20)]) - def test_open_close_many_workers(loop, worker, count, repeat): -- psutil = pytest.importorskip("psutil") - proc = psutil.Process() - - with cluster(nworkers=0, active_rpc_timeout=2) as (s, _): -@@ -3857,7 +3838,7 @@ def test_open_close_many_workers(loop, w - raise ValueError("File descriptors did not clean up") - - --@gen_cluster(client=False, timeout=None) -+@gen_cluster() - async def test_idempotence(s, a, b): - c = await Client(s.address, asynchronous=True) - f = await Client(s.address, asynchronous=True) -@@ -4070,7 +4051,7 @@ async def test_scatter_compute_store_los - assert z.status == "cancelled" - - --@gen_cluster(client=False) -+@gen_cluster() - async def test_serialize_future(s, a, b): - c1 = await Client(s.address, asynchronous=True) - c2 = await Client(s.address, asynchronous=True) -@@ -4091,7 +4072,7 @@ async def test_serialize_future(s, a, b) - await c2.close() - - --@gen_cluster(client=False) -+@gen_cluster() - async def test_temp_default_client(s, a, b): - c1 = await Client(s.address, asynchronous=True) - c2 = await Client(s.address, asynchronous=True) -@@ -4172,7 +4153,7 @@ def test_as_current_is_thread_local(s): - t2.join() - - --@gen_cluster(client=False) -+@gen_cluster() - async def test_as_current_is_task_local(s, a, b): - l1 = asyncio.Lock() - l2 = asyncio.Lock() -@@ -4557,7 +4538,7 @@ def assert_no_data_loss(scheduler): - assert not (k == key and v == "waiting") - - --@gen_cluster(client=True, timeout=None) -+@gen_cluster(client=True) - async def test_interleave_computations(c, s, a, b): - import distributed - -@@ -4592,7 +4573,7 @@ async def test_interleave_computations(c - - - @pytest.mark.skip(reason="Now prefer first-in-first-out") --@gen_cluster(client=True, timeout=None) -+@gen_cluster(client=True) - async def test_interleave_computations_map(c, s, a, b): - xs = c.map(slowinc, range(30), delay=0.02) - ys = c.map(slowdec, xs, delay=0.02) -@@ -4621,7 +4602,6 @@ async def test_scatter_dict_workers(c, s - assert "a" in a.data or "a" in b.data - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) - @pytest.mark.slow - @gen_test() - async def test_client_timeout(): -@@ -4629,18 +4609,17 @@ async def test_client_timeout(): - - s = Scheduler(loop=c.loop, port=57484) - await asyncio.sleep(4) -+ - try: - await s - except OSError: # port in use - await c.close() - return - -- start = time() -- await c - try: -- assert time() < start + 2 -- finally: -+ await c - await c.close() -+ finally: - await s.close() - - -@@ -5167,7 +5146,7 @@ async def test_serialize_collections(c, - assert result == sum(range(10)) - - --@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 1, timeout=100) -+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 1) - async def test_secede_simple(c, s, a): - def f(): - client = get_client() -@@ -5178,7 +5157,6 @@ async def test_secede_simple(c, s, a): - assert result == 2 - - --@pytest.mark.flaky(reruns=10, reruns_delay=5) - @pytest.mark.slow - @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, timeout=60) - async def test_secede_balances(c, s, a, b): -@@ -5276,18 +5254,15 @@ def _dynamic_workload(x, delay=0.01): - return total.result() - - --def _test_dynamic_workloads_sync(c, delay): -- future = c.submit(_dynamic_workload, 0, delay=delay) -- assert future.result(timeout=40) == 52 -- -- - def test_dynamic_workloads_sync(c): -- _test_dynamic_workloads_sync(c, delay=0.02) -+ future = c.submit(_dynamic_workload, 0, delay=0.02) -+ assert future.result(timeout=20) == 52 - - - @pytest.mark.slow - def test_dynamic_workloads_sync_random(c): -- _test_dynamic_workloads_sync(c, delay="random") -+ future = c.submit(_dynamic_workload, 0, delay="random") -+ assert future.result(timeout=20) == 52 - - - @pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler") -@@ -5424,6 +5399,7 @@ async def test_call_stack_collections_al - assert result - - -+@pytest.mark.flaky(condition=WINDOWS, reruns=10, reruns_delay=5) - @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"}) - async def test_profile(c, s, a, b): - futures = c.map(slowinc, range(10), delay=0.05, workers=a.address) -@@ -5843,9 +5819,15 @@ async def test_scatter_error_cancel(c, s - assert y.status == "error" # not cancelled - - -+@pytest.mark.xfail(reason="GH#5409 Dask-Default-Threads are frequently detected") - def test_no_threads_lingering(): -+ if threading.active_count() < 40: -+ return - active = dict(threading._active) -- assert threading.active_count() < 40, list(active.values()) -+ print(f"==== Found {len(active)} active threads: ====") -+ for t in active.values(): -+ print(t) -+ assert False - - - @gen_cluster() -@@ -6071,18 +6053,18 @@ async def test_file_descriptors_dont_lea - df = dask.datasets.timeseries(freq="10s", dtypes={"x": int, "y": float}) - - proc = psutil.Process() -- start = proc.num_fds() -+ before = proc.num_fds() - async with Scheduler(port=0, dashboard_address=":0") as s: -- async with Worker(s.address, nthreads=2) as a, Worker( -- s.address, nthreads=2 -- ) as b: -- async with Client(s.address, asynchronous=True) as c: -- await df.sum().persist() -+ async with Worker(s.address), Worker(s.address), Client( -+ s.address, asynchronous=True -+ ): -+ assert proc.num_fds() > before -+ await df.sum().persist() - -- begin = time() -- while proc.num_fds() > begin: -+ start = time() -+ while proc.num_fds() > before: - await asyncio.sleep(0.01) -- assert time() < begin + 5, (start, proc.num_fds()) -+ assert time() < start + 10, (before, proc.num_fds()) - - - @pytest.mark.asyncio -Index: distributed-2021.7.0/distributed/tests/test_client_executor.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_client_executor.py -+++ distributed-2021.7.0/distributed/tests/test_client_executor.py -@@ -13,6 +13,7 @@ import pytest - from tlz import take - - from distributed import Client -+from distributed.compatibility import MACOS - from distributed.utils import CancelledError - from distributed.utils_test import ( - cluster, -@@ -93,13 +94,12 @@ def test_wait(client): - assert "hello" in str(errors[0]) - - --@pytest.mark.flaky(reruns=10, reruns_delay=5) - def test_cancellation(client): - with client.get_executor(pure=False) as e: - fut = e.submit(time.sleep, 2.0) - start = time.time() - while number_of_processing_tasks(client) == 0: -- assert time.time() < start + 1 -+ assert time.time() < start + 10 - time.sleep(0.01) - assert not fut.done() - -@@ -107,7 +107,7 @@ def test_cancellation(client): - assert fut.cancelled() - start = time.time() - while number_of_processing_tasks(client) != 0: -- assert time.time() < start + 1 -+ assert time.time() < start + 10 - time.sleep(0.01) - - with pytest.raises(CancelledError): -@@ -118,7 +118,7 @@ def test_cancellation(client): - N = 10 - fs = [e.submit(slowinc, i, delay=0.02) for i in range(N)] - fs[3].cancel() -- res = wait(fs, return_when=FIRST_COMPLETED) -+ res = wait(fs, return_when=FIRST_COMPLETED, timeout=30) - assert len(res.not_done) > 0 - assert len(res.done) >= 1 - -@@ -132,10 +132,11 @@ def test_cancellation(client): - fs[3].cancel() - fs[8].cancel() - -- n_cancelled = sum(f.cancelled() for f in as_completed(fs)) -+ n_cancelled = sum(f.cancelled() for f in as_completed(fs, timeout=30)) - assert n_cancelled == 2 - - -+@pytest.mark.flaky(condition=MACOS, reruns=10, reruns_delay=5) - def test_map(client): - with client.get_executor() as e: - N = 10 -Index: distributed-2021.7.0/distributed/tests/test_collections.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_collections.py -+++ distributed-2021.7.0/distributed/tests/test_collections.py -@@ -40,7 +40,7 @@ def assert_equal(a, b): - assert a == b - - --@gen_cluster(timeout=240, client=True) -+@gen_cluster(client=True) - async def test_dataframes(c, s, a, b): - df = pd.DataFrame( - {"x": np.random.random(1000), "y": np.random.random(1000)}, -Index: distributed-2021.7.0/distributed/tests/test_diskutils.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_diskutils.py -+++ distributed-2021.7.0/distributed/tests/test_diskutils.py -@@ -12,7 +12,7 @@ import pytest - - import dask - --from distributed.compatibility import MACOS, WINDOWS -+from distributed.compatibility import MACOS - from distributed.diskutils import WorkSpace - from distributed.metrics import time - from distributed.utils import mp_context -@@ -273,16 +273,14 @@ def _test_workspace_concurrency(tmpdir, - return n_created, n_purged - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) - @pytest.mark.slow -+@pytest.mark.xfail(condition=MACOS, reason="extremely flaky") - def test_workspace_concurrency(tmpdir): -- if WINDOWS: -- raise pytest.xfail.Exception("TODO: unknown failure on windows") - _test_workspace_concurrency(tmpdir, 5.0, 6) - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) - @pytest.mark.slow -+@pytest.mark.xfail(condition=MACOS, reason="extremely flaky") - def test_workspace_concurrency_intense(tmpdir): - n_created, n_purged = _test_workspace_concurrency(tmpdir, 8.0, 16) - assert n_created >= 100 -Index: distributed-2021.7.0/distributed/tests/test_failed_workers.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_failed_workers.py -+++ distributed-2021.7.0/distributed/tests/test_failed_workers.py -@@ -75,12 +75,7 @@ def test_gather_after_failed_worker(loop - assert result == list(map(inc, range(10))) - - --@gen_cluster( -- client=True, -- Worker=Nanny, -- nthreads=[("127.0.0.1", 1)] * 4, -- config={"distributed.comm.timeouts.connect": "1s"}, --) -+@gen_cluster(client=True, Worker=Nanny, nthreads=[("127.0.0.1", 1)] * 4) - async def test_gather_then_submit_after_failed_workers(c, s, w, x, y, z): - L = c.map(inc, range(20)) - await wait(L) -@@ -88,7 +83,7 @@ async def test_gather_then_submit_after_ - w.process.process._process.terminate() - total = c.submit(sum, L) - -- for i in range(3): -+ for _ in range(3): - await wait(total) - addr = first(s.tasks[total.key].who_has).address - for worker in [x, y, z]: -@@ -101,7 +96,7 @@ async def test_gather_then_submit_after_ - - - @pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler") --@gen_cluster(Worker=Nanny, timeout=60, client=True) -+@gen_cluster(Worker=Nanny, client=True, timeout=60) - async def test_failed_worker_without_warning(c, s, a, b): - L = c.map(inc, range(10)) - await wait(L) -@@ -298,18 +293,20 @@ async def test_multiple_clients_restart( - await c2.close() - - --@pytest.mark.flaky(reruns=10, reruns_timeout=5, condition=MACOS) - @gen_cluster(Worker=Nanny, timeout=60) - async def test_restart_scheduler(s, a, b): -- import gc -+ assert len(s.nthreads) == 2 -+ pids = (a.pid, b.pid) -+ assert pids[0] -+ assert pids[1] - -- gc.collect() -- addrs = (a.worker_address, b.worker_address) - await s.restart() -- assert len(s.nthreads) == 2 -- addrs2 = (a.worker_address, b.worker_address) - -- assert addrs != addrs2 -+ assert len(s.nthreads) == 2 -+ pids2 = (a.pid, b.pid) -+ assert pids2[0] -+ assert pids2[1] -+ assert pids != pids2 - - - @gen_cluster(Worker=Nanny, client=True, timeout=60) -@@ -325,6 +322,8 @@ async def test_forgotten_futures_dont_cl - await y - - -+@pytest.mark.slow -+@pytest.mark.flaky(condition=MACOS, reruns=10, reruns_delay=5) - @gen_cluster(client=True, timeout=60, active_rpc_timeout=10) - async def test_broken_worker_during_computation(c, s, a, b): - s.allowed_failures = 100 -@@ -404,14 +403,13 @@ class SlowTransmitData: - return parse_bytes(dask.config.get("distributed.comm.offload")) + 1 - - -+@pytest.mark.flaky(reruns=10, reruns_delay=5) - @gen_cluster(client=True) - async def test_worker_who_has_clears_after_failed_connection(c, s, a, b): - n = await Nanny(s.address, nthreads=2, loop=s.loop) - -- start = time() - while len(s.nthreads) < 3: - await asyncio.sleep(0.01) -- assert time() < start + 5 - - def slow_ser(x, delay): - return SlowTransmitData(x, delay=delay) -@@ -497,7 +495,7 @@ async def test_restart_timeout_on_long_r - with captured_logger("distributed.scheduler") as sio: - future = c.submit(sleep, 3600) - await asyncio.sleep(0.1) -- await c.restart(timeout=20) -+ await c.restart() - - text = sio.getvalue() - assert "timeout" not in text.lower() -@@ -547,7 +545,7 @@ class SlowDeserialize: - return parse_bytes(dask.config.get("distributed.comm.offload")) + 1 - - --@gen_cluster(client=True, timeout=None) -+@gen_cluster(client=True) - async def test_handle_superfluous_data(c, s, a, b): - """ - See https://github.com/dask/distributed/pull/4784#discussion_r649210094 -Index: distributed-2021.7.0/distributed/tests/test_nanny.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_nanny.py -+++ distributed-2021.7.0/distributed/tests/test_nanny.py -@@ -4,9 +4,10 @@ import logging - import multiprocessing as mp - import os - import random --import sys - from contextlib import suppress -+from time import sleep - -+import psutil - import pytest - from tlz import first, valmap - from tornado.ioloop import IOLoop -@@ -14,6 +15,7 @@ from tornado.ioloop import IOLoop - import dask - - from distributed import Client, Nanny, Scheduler, Worker, rpc, wait, worker -+from distributed.compatibility import LINUX, WINDOWS - from distributed.core import CommClosedError, Status - from distributed.diagnostics import SchedulerPlugin - from distributed.metrics import time -@@ -72,12 +74,11 @@ async def test_str(s, a, b): - - @gen_cluster(nthreads=[], client=True) - async def test_nanny_process_failure(c, s): -- n = await Nanny(s.address, nthreads=2, loop=s.loop) -+ n = await Nanny(s.address, nthreads=2) - first_dir = n.worker_dir - - assert os.path.exists(first_dir) - -- original_address = n.worker_address - ww = rpc(n.worker_address) - await ww.update_data(data=valmap(dumps, {"x": 1, "y": 2})) - pid = n.pid -@@ -85,23 +86,17 @@ async def test_nanny_process_failure(c, - with suppress(CommClosedError): - await c.run(os._exit, 0, workers=[n.worker_address]) - -- start = time() - while n.pid == pid: # wait while process dies and comes back - await asyncio.sleep(0.01) -- assert time() - start < 5 - -- start = time() - await asyncio.sleep(1) - while not n.is_alive(): # wait while process comes back - await asyncio.sleep(0.01) -- assert time() - start < 5 - - # assert n.worker_address != original_address # most likely - -- start = time() - while n.worker_address not in s.nthreads or n.worker_dir is None: - await asyncio.sleep(0.01) -- assert time() - start < 5 - - second_dir = n.worker_dir - -@@ -115,7 +110,6 @@ async def test_nanny_process_failure(c, - - @gen_cluster(nthreads=[]) - async def test_run(s): -- pytest.importorskip("psutil") - n = await Nanny(s.address, nthreads=2, loop=s.loop) - - with rpc(n.address) as nn: -@@ -173,7 +167,7 @@ async def test_nanny_alt_worker_class(c, - - - @pytest.mark.slow --@gen_cluster(client=False, nthreads=[]) -+@gen_cluster(nthreads=[]) - async def test_nanny_death_timeout(s): - await s.close() - w = Nanny(s.address, death_timeout=1) -@@ -198,12 +192,9 @@ async def test_random_seed(c, s, a, b): - await check_func(lambda a, b: np.random.randint(a, b)) - - --@pytest.mark.skipif( -- sys.platform.startswith("win"), reason="num_fds not supported on windows" --) --@gen_cluster(client=False, nthreads=[]) -+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") -+@gen_cluster(nthreads=[]) - async def test_num_fds(s): -- psutil = pytest.importorskip("psutil") - proc = psutil.Process() - - # Warm up -@@ -219,16 +210,12 @@ async def test_num_fds(s): - await asyncio.sleep(0.1) - await w.close() - -- start = time() - while proc.num_fds() > before: - print("fds:", before, proc.num_fds()) - await asyncio.sleep(0.1) -- assert time() < start + 10 - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster(client=True, nthreads=[]) - async def test_worker_uses_same_host_as_nanny(c, s): - for host in ["tcp://0.0.0.0", "tcp://127.0.0.2"]: -@@ -273,29 +260,24 @@ async def test_nanny_timeout(c, s, a): - nthreads=[("127.0.0.1", 1)], - client=True, - Worker=Nanny, -- worker_kwargs={"memory_limit": 1e8}, -- timeout=20, -- clean_kwargs={"threads": False}, -+ worker_kwargs={"memory_limit": "400 MiB"}, - ) - async def test_nanny_terminate(c, s, a): -- from time import sleep -- - def leak(): - L = [] - while True: -- L.append(b"0" * 5000000) -+ L.append(b"0" * 5_000_000) - sleep(0.01) - -- proc = a.process.pid -+ before = a.process.pid - with captured_logger(logging.getLogger("distributed.nanny")) as logger: - future = c.submit(leak) -- start = time() -- while a.process.pid == proc: -- await asyncio.sleep(0.1) -- assert time() < start + 10 -- out = logger.getvalue() -- assert "restart" in out.lower() -- assert "memory" in out.lower() -+ while a.process.pid == before: -+ await asyncio.sleep(0.01) -+ -+ out = logger.getvalue() -+ assert "restart" in out.lower() -+ assert "memory" in out.lower() - - - @gen_cluster( -Index: distributed-2021.7.0/distributed/tests/test_publish.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_publish.py -+++ distributed-2021.7.0/distributed/tests/test_publish.py -@@ -11,7 +11,7 @@ from distributed.protocol import Seriali - from distributed.utils_test import gen_cluster, inc - - --@gen_cluster(client=False) -+@gen_cluster() - async def test_publish_simple(s, a, b): - c = Client(s.address, asynchronous=True) - f = Client(s.address, asynchronous=True) -@@ -37,7 +37,7 @@ async def test_publish_simple(s, a, b): - await asyncio.gather(c.close(), f.close()) - - --@gen_cluster(client=False) -+@gen_cluster() - async def test_publish_non_string_key(s, a, b): - async with Client(s.address, asynchronous=True) as c: - for name in [("a", "b"), 9.0, 8]: -@@ -52,7 +52,7 @@ async def test_publish_non_string_key(s, - assert name in datasets - - --@gen_cluster(client=False) -+@gen_cluster() - async def test_publish_roundtrip(s, a, b): - c = await Client(s.address, asynchronous=True) - f = await Client(s.address, asynchronous=True) -@@ -147,7 +147,7 @@ def test_unpublish_multiple_datasets_syn - assert "y" in str(exc_info.value) - - --@gen_cluster(client=False) -+@gen_cluster() - async def test_publish_bag(s, a, b): - db = pytest.importorskip("dask.bag") - c = await Client(s.address, asynchronous=True) -Index: distributed-2021.7.0/distributed/tests/test_pubsub.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_pubsub.py -+++ distributed-2021.7.0/distributed/tests/test_pubsub.py -@@ -10,7 +10,7 @@ from distributed.metrics import time - from distributed.utils_test import gen_cluster - - --@gen_cluster(client=True, timeout=None) -+@gen_cluster(client=True) - async def test_speed(c, s, a, b): - """ - This tests how quickly we can move messages back and forth -Index: distributed-2021.7.0/distributed/tests/test_queues.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_queues.py -+++ distributed-2021.7.0/distributed/tests/test_queues.py -@@ -110,7 +110,7 @@ def test_picklability_sync(client): - - - @pytest.mark.slow --@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny, timeout=None) -+@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny) - async def test_race(c, s, *workers): - def f(i): - with worker_client() as c: -Index: distributed-2021.7.0/distributed/tests/test_resources.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_resources.py -+++ distributed-2021.7.0/distributed/tests/test_resources.py -@@ -9,7 +9,6 @@ from dask.utils import stringify - - from distributed import Worker - from distributed.client import wait --from distributed.compatibility import WINDOWS - from distributed.utils_test import gen_cluster, inc, slowadd, slowinc - - -@@ -378,9 +377,7 @@ async def test_full_collections(c, s, a, - reason="don't track resources through optimization" - ), - ), -- pytest.param( -- False, marks=pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS) -- ), -+ False, - ], - ) - def test_collections_get(client, optimize_graph, s, a, b): -Index: distributed-2021.7.0/distributed/tests/test_scheduler.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_scheduler.py -+++ distributed-2021.7.0/distributed/tests/test_scheduler.py -@@ -12,6 +12,7 @@ from time import sleep - from unittest import mock - - import cloudpickle -+import psutil - import pytest - from tlz import concat, first, frequencies, merge, valmap - -@@ -21,7 +22,7 @@ from dask.utils import apply, parse_time - - from distributed import Client, Nanny, Worker, fire_and_forget, wait - from distributed.comm import Comm --from distributed.compatibility import MACOS, WINDOWS -+from distributed.compatibility import LINUX, WINDOWS - from distributed.core import ConnectionPool, Status, connect, rpc - from distributed.metrics import time - from distributed.protocol.pickle import dumps -@@ -618,7 +619,7 @@ async def test_ready_remove_worker(s, a, - assert all(len(w.processing) > w.nthreads for w in s.workers.values()) - - --@gen_cluster(client=True, Worker=Nanny) -+@gen_cluster(client=True, Worker=Nanny, timeout=60) - async def test_restart(c, s, a, b): - futures = c.map(inc, range(20)) - await wait(futures) -@@ -742,24 +743,17 @@ async def test_config_stealing(cleanup): - assert "stealing" not in s.extensions - - --@pytest.mark.skipif( -- sys.platform.startswith("win"), reason="file descriptors not really a thing" --) -+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") - @gen_cluster(nthreads=[]) - async def test_file_descriptors_dont_leak(s): -- psutil = pytest.importorskip("psutil") - proc = psutil.Process() - before = proc.num_fds() - -- w = await Worker(s.address) -- await w.close() -+ async with Worker(s.address): -+ assert proc.num_fds() > before - -- during = proc.num_fds() -- -- start = time() - while proc.num_fds() > before: - await asyncio.sleep(0.01) -- assert time() < start + 5 - - - @gen_cluster() -@@ -827,7 +821,7 @@ async def test_scheduler_sees_memory_lim - await w.close() - - --@gen_cluster(client=True, timeout=1000) -+@gen_cluster(client=True) - async def test_retire_workers(c, s, a, b): - [x] = await c.scatter([1], workers=a.address) - [y] = await c.scatter([list(range(1000))], workers=b.address) -@@ -929,13 +923,10 @@ async def test_retire_workers_no_suspici - - - @pytest.mark.slow --@pytest.mark.skipif( -- sys.platform.startswith("win"), reason="file descriptors not really a thing" --) --@gen_cluster(client=True, nthreads=[], timeout=240) -+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") -+@gen_cluster(client=True, nthreads=[], timeout=60) - async def test_file_descriptors(c, s): - await asyncio.sleep(0.1) -- psutil = pytest.importorskip("psutil") - da = pytest.importorskip("dask.array") - proc = psutil.Process() - num_fds_1 = proc.num_fds() -@@ -980,10 +971,8 @@ async def test_file_descriptors(c, s): - assert comm.closed() or comm.peer_address != s.address, comm - assert not s.stream_comms - -- start = time() - while proc.num_fds() > num_fds_1 + N: - await asyncio.sleep(0.01) -- assert time() < start + 3 - - - @pytest.mark.slow -@@ -1325,36 +1314,30 @@ async def test_non_existent_worker(c, s) - async def test_correct_bad_time_estimate(c, s, *workers): - future = c.submit(slowinc, 1, delay=0) - await wait(future) -- - futures = [c.submit(slowinc, future, delay=0.1, pure=False) for i in range(20)] -- - await asyncio.sleep(0.5) -- - await wait(futures) -- - assert all(w.data for w in workers), [sorted(w.data) for w in workers] - - --@gen_test() --async def test_service_hosts(): -- port = 0 -- for url, expected in [ -- ("tcp://0.0.0.0", ("::", "0.0.0.0")), -- ("tcp://127.0.0.1", ("::", "0.0.0.0")), -- ("tcp://127.0.0.1:38275", ("::", "0.0.0.0")), -- ]: -- async with Scheduler(host=url) as s: -- sock = first(s.http_server._sockets.values()) -- if isinstance(expected, tuple): -- assert sock.getsockname()[0] in expected -- else: -- assert sock.getsockname()[0] == expected -- -- port = ("127.0.0.1", 0) -- for url in ["tcp://0.0.0.0", "tcp://127.0.0.1", "tcp://127.0.0.1:38275"]: -- async with Scheduler(dashboard_address="127.0.0.1:0", host=url) as s: -- sock = first(s.http_server._sockets.values()) -- assert sock.getsockname()[0] == "127.0.0.1" -+@pytest.mark.parametrize( -+ "host", ["tcp://0.0.0.0", "tcp://127.0.0.1", "tcp://127.0.0.1:38275"] -+) -+@pytest.mark.parametrize( -+ "dashboard_address,expect", -+ [ -+ (None, ("::", "0.0.0.0")), -+ ("127.0.0.1:0", ("127.0.0.1",)), -+ ], -+) -+@pytest.mark.asyncio -+async def test_dashboard_host(host, dashboard_address, expect): -+ """Dashboard is accessible from any host by default, but it can be also bound to -+ localhost. -+ """ -+ async with Scheduler(host=host, dashboard_address=dashboard_address) as s: -+ sock = first(s.http_server._sockets.values()) -+ assert sock.getsockname()[0] in expect - - - @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"}) -@@ -1543,9 +1526,6 @@ async def test_retries(c, s, a, b): - exc_info.match("one") - - --@pytest.mark.flaky( -- reruns=10, reruns_delay=5, reason="second worker also errant for some reason" --) - @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) - async def test_missing_data_errant_worker(c, s, w1, w2, w3): - with dask.config.set({"distributed.comm.timeouts.connect": "1s"}): -@@ -1779,7 +1759,7 @@ async def test_bandwidth(c, s, a, b): - assert not s.bandwidth_workers - - --@gen_cluster(client=True, Worker=Nanny) -+@gen_cluster(client=True, Worker=Nanny, timeout=60) - async def test_bandwidth_clear(c, s, a, b): - np = pytest.importorskip("numpy") - x = c.submit(np.arange, 1000000, workers=[a.worker_address], pure=False) -@@ -1821,9 +1801,7 @@ async def test_close_workers(s, a, b): - assert b.status == Status.closed - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_test() - async def test_host_address(): - s = await Scheduler(host="127.0.0.2", port=0) -@@ -2361,7 +2339,7 @@ async def test_unknown_task_duration_con - assert s.idle_since == s.time_started - - --@gen_cluster(client=True, timeout=None) -+@gen_cluster(client=True) - async def test_retire_state_change(c, s, a, b): - np = pytest.importorskip("numpy") - y = c.map(lambda x: x ** 2, range(10)) -@@ -2482,8 +2460,8 @@ async def assert_memory(scheduler_or_wor - t0 = time() - while True: - minfo = scheduler_or_workerstate.memory -- nbytes = getattr(minfo, attr) -- if min_ * 2 ** 20 <= nbytes <= max_ * 2 ** 20: -+ nmib = getattr(minfo, attr) / 2 ** 20 -+ if min_ <= nmib <= max_: - return - if time() - t0 > timeout: - raise TimeoutError( -@@ -2492,13 +2470,12 @@ async def assert_memory(scheduler_or_wor - await asyncio.sleep(0.1) - - --# This test is heavily influenced by hard-to-control factors such as memory management --# by the Python interpreter and the OS, so it occasionally glitches --@pytest.mark.flaky(reruns=3, reruns_delay=5) --# ~33s runtime, or distributed.memory.recent-to-old-time + 3s -+# ~31s runtime, or distributed.worker.memory.recent-to-old-time + 1s. -+# On Windows, it can take ~65s due to worker memory needing to stabilize first. - @pytest.mark.slow -+@pytest.mark.flaky(condition=LINUX, reason="see comments", reruns=10, reruns_delay=5) - @gen_cluster( -- client=True, Worker=Nanny, worker_kwargs={"memory_limit": "500 MiB"}, timeout=60 -+ client=True, Worker=Nanny, worker_kwargs={"memory_limit": "500 MiB"}, timeout=120 - ) - async def test_memory(c, s, *_): - pytest.importorskip("zict") -@@ -2511,14 +2488,25 @@ async def test_memory(c, s, *_): - assert s_m0.managed == 0 - assert a.memory.managed == 0 - assert b.memory.managed == 0 -- # When a worker first goes online, its RAM is immediately counted as -- # unmanaged_old -- await assert_memory(s, "unmanaged_recent", 0, 40, timeout=0) -- await assert_memory(a, "unmanaged_recent", 0, 20, timeout=0) -- await assert_memory(b, "unmanaged_recent", 0, 20, timeout=0) - -- f1 = c.submit(leaking, 100, 50, 5, pure=False, workers=[a.name]) -- f2 = c.submit(leaking, 100, 50, 5, pure=False, workers=[b.name]) -+ # When a worker first goes online, its RAM is immediately counted as unmanaged_old. -+ # On Windows, however, there is somehow enough time between the worker start and -+ # this line for 2 heartbeats and the memory keeps growing substantially for a while. -+ # Sometimes there is a single heartbeat but on the consecutive test we observe -+ # a large unexplained increase in unmanaged_recent memory. -+ # Wait for the situation to stabilize. -+ if WINDOWS: -+ await asyncio.sleep(10) -+ initial_timeout = 40 -+ else: -+ initial_timeout = 0 -+ -+ await assert_memory(s, "unmanaged_recent", 0, 40, timeout=initial_timeout) -+ await assert_memory(a, "unmanaged_recent", 0, 20, timeout=initial_timeout) -+ await assert_memory(b, "unmanaged_recent", 0, 20, timeout=initial_timeout) -+ -+ f1 = c.submit(leaking, 100, 50, 10, pure=False, workers=[a.name]) -+ f2 = c.submit(leaking, 100, 50, 10, pure=False, workers=[b.name]) - await assert_memory(s, "unmanaged_recent", 300, 380) - await assert_memory(a, "unmanaged_recent", 150, 190) - await assert_memory(b, "unmanaged_recent", 150, 190) -@@ -2533,19 +2521,17 @@ async def test_memory(c, s, *_): - await assert_memory(b, "unmanaged_recent", 50, 90) - - # Force the output of f1 and f2 to spill to disk. -- # With target=0.6 and memory_limit=500 MiB, we'll start spilling at 300 MiB -- # process memory per worker, or roughly after 3~7 rounds of the below depending -- # on how much RAM the interpreter is using. -+ # With spill=0.7 and memory_limit=500 MiB, we'll start spilling at 350 MiB process -+ # memory per worker, or up to 20 iterations of the below depending on how much RAM -+ # the interpreter is using. - more_futs = [] -- for _ in range(8): -- if s.memory.managed_spilled > 0: -- break -- more_futs += [ -- c.submit(leaking, 20, 0, 0, pure=False, workers=[a.name]), -- c.submit(leaking, 20, 0, 0, pure=False, workers=[b.name]), -- ] -- await asyncio.sleep(2) -- await assert_memory(s, "managed_spilled", 1, 999) -+ while not s.memory.managed_spilled: -+ if a.memory.process < 0.7 * 500 * 2 ** 20: -+ more_futs.append(c.submit(leaking, 10, 0, 0, pure=False, workers=[a.name])) -+ if b.memory.process < 0.7 * 500 * 2 ** 20: -+ more_futs.append(c.submit(leaking, 10, 0, 0, pure=False, workers=[b.name])) -+ await wait(more_futs) -+ await asyncio.sleep(1) - - # Wait for the spilling to finish. Note that this does not make the test take - # longer as we're waiting for recent-to-old-time anyway. -@@ -2568,24 +2554,23 @@ async def test_memory(c, s, *_): - # transition into unmanaged_old - await c.run(gc.collect) - await assert_memory(s, "unmanaged_recent", 0, 90, timeout=40) -- await assert_memory( -- s, -- "unmanaged_old", -- orig_old + 90, -- # On MacOS, the process memory of the Python interpreter does not shrink as -- # fast as on Linux/Windows -- 9999 if MACOS else orig_old + 190, -- timeout=40, -- ) -- -- # When the leaked memory is cleared, unmanaged and unmanaged_old drop -- # On MacOS, the process memory of the Python interpreter does not shrink as fast -- # as on Linux/Windows -- if not MACOS: -- await c.run(clear_leak) -- await assert_memory(s, "unmanaged", 0, orig_unmanaged + 95) -- await assert_memory(s, "unmanaged_old", 0, orig_old + 95) -- await assert_memory(s, "unmanaged_recent", 0, 90) -+ await assert_memory(s, "unmanaged_old", orig_old + 90, 9999, timeout=40) -+ -+ # When the leaked memory is cleared, unmanaged and unmanaged_old drop. -+ # On MacOS and Windows, the process memory of the Python interpreter does not shrink -+ # as fast as on Linux. Note that this behaviour is heavily impacted by OS tweaks, -+ # meaning that what you observe on your local host may behave differently on CI. -+ # Even on Linux, this occasionally glitches - hence why there is a flaky marker on -+ # this test. -+ if not LINUX: -+ return -+ -+ orig_unmanaged = s.memory.unmanaged / 2 ** 20 -+ orig_old = s.memory.unmanaged_old / 2 ** 20 -+ await c.run(clear_leak) -+ await assert_memory(s, "unmanaged", 0, orig_unmanaged - 60) -+ await assert_memory(s, "unmanaged_old", 0, orig_old - 60) -+ await assert_memory(s, "unmanaged_recent", 0, 90) - - - @gen_cluster(client=True, worker_kwargs={"memory_limit": 0}) -@@ -2825,23 +2810,20 @@ async def test_rebalance_no_limit(c, s, - worker_kwargs={"memory_limit": "1000 MiB"}, - config={ - "distributed.worker.memory.rebalance.measure": "managed", -- "distributed.worker.memory.rebalance.recipient-max": 0.4, -+ "distributed.worker.memory.rebalance.sender-min": 0.2, -+ "distributed.worker.memory.rebalance.recipient-max": 0.1, - }, - ) - async def test_rebalance_no_recipients(c, s, *_): - """There are sender workers, but no recipient workers""" - a, b = s.workers -- futures = [ -- c.submit(lambda: "x" * (400 * 2 ** 20), pure=False, workers=[a]), # 40% -- c.submit(lambda: "x" * (400 * 2 ** 20), pure=False, workers=[b]), # 40% -- ] + c.map( -- lambda _: "x" * (2 ** 21), range(100), workers=[a] -- ) # 20% -- await wait(futures) -- await assert_memory(s, "managed", 1000, 1001) -- await assert_ndata(c, {a: 101, b: 1}) -+ fut_a = c.map(lambda _: "x" * (2 ** 20), range(250), workers=[a]) # 25% -+ fut_b = c.map(lambda _: "x" * (2 ** 20), range(100), workers=[b]) # 10% -+ await wait(fut_a + fut_b) -+ await assert_memory(s, "managed", 350, 351) -+ await assert_ndata(c, {a: 250, b: 100}) - await s.rebalance() -- await assert_ndata(c, {a: 101, b: 1}) -+ await assert_ndata(c, {a: 250, b: 100}) - s.validate_state() - - -Index: distributed-2021.7.0/distributed/tests/test_semaphore.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_semaphore.py -+++ distributed-2021.7.0/distributed/tests/test_semaphore.py -@@ -11,7 +11,6 @@ from dask.distributed import Client - - from distributed import Semaphore, fire_and_forget - from distributed.comm import Comm --from distributed.compatibility import WINDOWS - from distributed.core import ConnectionPool - from distributed.metrics import time - from distributed.utils_test import captured_logger, cluster, gen_cluster, slowidentity -@@ -91,21 +90,17 @@ def test_timeout_sync(client): - - - @gen_cluster( -- client=True, -- timeout=20, - config={ - "distributed.scheduler.locks.lease-validation-interval": "200ms", - "distributed.scheduler.locks.lease-timeout": "200ms", - }, - ) --async def test_release_semaphore_after_timeout(c, s, a, b): -+async def test_release_semaphore_after_timeout(s, a, b): - sem = await Semaphore(name="x", max_leases=2) - await sem.acquire() # leases: 2 - 1 = 1 - - semB = await Semaphore(name="x", max_leases=2) -- - assert await semB.acquire() # leases: 1 - 1 = 0 -- - assert not (await sem.acquire(timeout=0.01)) - assert not (await semB.acquire(timeout=0.01)) - -@@ -114,9 +109,7 @@ async def test_release_semaphore_after_t - - semB.refresh_callback.stop() - del semB -- - assert await sem.acquire(timeout=1) -- - assert not (await sem.acquire(timeout=0.1)) - - -@@ -564,7 +557,6 @@ async def test_release_retry(c, s, a, b) - assert await semaphore.release() is True - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS) - @gen_cluster( - client=True, - config={ -Index: distributed-2021.7.0/distributed/tests/test_steal.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_steal.py -+++ distributed-2021.7.0/distributed/tests/test_steal.py -@@ -2,7 +2,6 @@ import asyncio - import itertools - import logging - import random --import sys - import weakref - from operator import mul - from time import sleep -@@ -13,6 +12,7 @@ from tlz import concat, sliding_window - import dask - - from distributed import Nanny, Worker, wait, worker_client -+from distributed.compatibility import LINUX - from distributed.config import config - from distributed.metrics import time - from distributed.scheduler import key_split -@@ -33,9 +33,7 @@ setup_module = nodebug_setup_module - teardown_module = nodebug_teardown_module - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster(client=True, nthreads=[("127.0.0.1", 2), ("127.0.0.2", 2)]) - async def test_work_stealing(c, s, a, b): - [x] = await c._scatter([1], workers=a.address) -@@ -144,7 +142,7 @@ async def test_steal_related_tasks(e, s, - assert nearby > 10 - - --@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, timeout=1000) -+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10) - async def test_dont_steal_fast_tasks_compute_time(c, s, *workers): - def do_nothing(x, y=None): - pass -@@ -285,9 +283,7 @@ async def test_steal_worker_restrictions - assert len(wc.tasks) == 0 - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.2", 1)]) - async def test_dont_steal_host_restrictions(c, s, a, b): - future = c.submit(slowinc, 1, delay=0.10, workers=a.address) -@@ -306,9 +302,7 @@ async def test_dont_steal_host_restricti - assert len(b.tasks) == 0 - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.2", 2)]) - async def test_steal_host_restrictions(c, s, wa, wb): - future = c.submit(slowinc, 1, delay=0.10, workers=wa.address) -@@ -354,9 +348,7 @@ async def test_dont_steal_resource_restr - assert len(b.tasks) == 0 - - --@gen_cluster( -- client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}})], timeout=3 --) -+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}})]) - async def test_steal_resource_restrictions(c, s, a): - future = c.submit(slowinc, 1, delay=0.10, workers=a.address) - await future -@@ -601,7 +593,7 @@ def test_balance(inp, expected): - test() - - --@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, Worker=Nanny) -+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, Worker=Nanny, timeout=60) - async def test_restart(c, s, a, b): - futures = c.map( - slowinc, range(100), delay=0.1, workers=a.address, allow_other_workers=True -@@ -613,7 +605,7 @@ async def test_restart(c, s, a, b): - assert any(st for st in steal.stealable_all) - assert any(x for L in steal.stealable.values() for x in L) - -- await c.restart(timeout=10) -+ await c.restart() - - assert not any(x for x in steal.stealable_all) - assert not any(x for L in steal.stealable.values() for x in L) -@@ -740,7 +732,6 @@ async def test_dont_steal_long_running_t - ) <= 1 - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=sys.version_info[:2] == (3, 8)) - @gen_cluster(client=True, nthreads=[("127.0.0.1", 5)] * 2) - async def test_cleanup_repeated_tasks(c, s, a, b): - class Foo: -Index: distributed-2021.7.0/distributed/tests/test_stress.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_stress.py -+++ distributed-2021.7.0/distributed/tests/test_stress.py -@@ -1,6 +1,5 @@ - import asyncio - import random --import sys - from contextlib import suppress - from operator import add - from time import sleep -@@ -11,9 +10,10 @@ from tlz import concat, sliding_window - from dask import delayed - - from distributed import Client, Nanny, wait -+from distributed.compatibility import WINDOWS - from distributed.config import config - from distributed.metrics import time --from distributed.utils import All, CancelledError -+from distributed.utils import CancelledError - from distributed.utils_test import ( - bump_rlimit, - cluster, -@@ -43,6 +43,7 @@ async def test_stress_1(c, s, a, b): - assert result == sum(map(inc, range(n))) - - -+@pytest.mark.slow - @pytest.mark.parametrize(("func", "n"), [(slowinc, 100), (inc, 1000)]) - def test_stress_gc(loop, func, n): - with cluster() as (s, [a, b]): -@@ -54,10 +55,8 @@ def test_stress_gc(loop, func, n): - assert x.result() == n + 2 - - --@pytest.mark.skipif( -- sys.platform.startswith("win"), reason="test can leave dangling RPC objects" --) --@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 8, timeout=None) -+@pytest.mark.skipif(WINDOWS, reason="test can leave dangling RPC objects") -+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 8) - async def test_cancel_stress(c, s, *workers): - da = pytest.importorskip("dask.array") - x = da.random.random((50, 50), chunks=(2, 2)) -@@ -86,28 +85,33 @@ def test_cancel_stress_sync(loop): - c.cancel(f) - - --@gen_cluster(nthreads=[], client=True, timeout=None) -+@pytest.mark.slow -+@gen_cluster( -+ nthreads=[], -+ client=True, -+ timeout=120, -+ scheduler_kwargs={"allowed_failures": 100_000}, -+) - async def test_stress_creation_and_deletion(c, s): - # Assertions are handled by the validate mechanism in the scheduler -- s.allowed_failures = 100000 - da = pytest.importorskip("dask.array") - -- x = da.random.random(size=(2000, 2000), chunks=(100, 100)) -- y = (x + 1).T + (x * 2) - x.mean(axis=1) -- -+ rng = da.random.RandomState(0) -+ x = rng.random(size=(2000, 2000), chunks=(100, 100)) -+ y = ((x + 1).T + (x * 2) - x.mean(axis=1)).sum().round(2) - z = c.persist(y) - - async def create_and_destroy_worker(delay): - start = time() - while time() < start + 5: -- n = await Nanny(s.address, nthreads=2, loop=s.loop) -- await asyncio.sleep(delay) -- await n.close() -+ async with Nanny(s.address, nthreads=2): -+ await asyncio.sleep(delay) - print("Killed nanny") - -- await asyncio.wait_for( -- All([create_and_destroy_worker(0.1 * i) for i in range(20)]), 60 -- ) -+ await asyncio.gather(*(create_and_destroy_worker(0.1 * i) for i in range(20))) -+ -+ async with Nanny(s.address, nthreads=2): -+ assert await c.compute(z) == 8000884.93 - - - @gen_cluster(nthreads=[("127.0.0.1", 1)] * 10, client=True, timeout=60) -@@ -169,6 +173,7 @@ def vsum(*args): - - @pytest.mark.avoid_ci - @pytest.mark.slow -+@pytest.mark.timeout(1100) # Override timeout from setup.cfg - @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 80, timeout=1000) - async def test_stress_communication(c, s, *workers): - s.validate = False # very slow otherwise -Index: distributed-2021.7.0/distributed/tests/test_tls_functional.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_tls_functional.py -+++ distributed-2021.7.0/distributed/tests/test_tls_functional.py -@@ -42,7 +42,7 @@ async def test_Queue(c, s, a, b): - assert future.key == future2.key - - --@gen_tls_cluster(client=True, timeout=None) -+@gen_tls_cluster(client=True) - async def test_client_submit(c, s, a, b): - assert s.address.startswith("tls://") - -Index: distributed-2021.7.0/distributed/tests/test_utils.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_utils.py -+++ distributed-2021.7.0/distributed/tests/test_utils.py -@@ -4,7 +4,6 @@ import io - import os - import queue - import socket --import sys - import traceback - from time import sleep - -@@ -13,6 +12,7 @@ from tornado.ioloop import IOLoop - - import dask - -+from distributed.compatibility import MACOS, WINDOWS - from distributed.metrics import time - from distributed.utils import ( - LRU, -@@ -140,23 +140,12 @@ def test_ensure_ip(): - assert ensure_ip("::1") == "::1" - - -+@pytest.mark.skipif(WINDOWS, reason="TODO") - def test_get_ip_interface(): -- if sys.platform == "darwin": -- assert get_ip_interface("lo0") == "127.0.0.1" -- elif sys.platform.startswith("linux"): -- assert get_ip_interface("lo") == "127.0.0.1" -- else: -- pytest.skip(f"test needs to be enhanced for platform {sys.platform!r}") -- -- non_existent_interface = "__non-existent-interface" -- expected_error_message = f"{non_existent_interface!r}.+network interface.+" -- -- if sys.platform == "darwin": -- expected_error_message += "'lo0'" -- elif sys.platform.startswith("linux"): -- expected_error_message += "'lo'" -- with pytest.raises(ValueError, match=expected_error_message): -- get_ip_interface(non_existent_interface) -+ iface = "lo0" if MACOS else "lo" -+ assert get_ip_interface(iface) == "127.0.0.1" -+ with pytest.raises(ValueError, match=f"'__notexist'.+network interface.+'{iface}'"): -+ get_ip_interface("__notexist") - - - def test_truncate_exception(): -Index: distributed-2021.7.0/distributed/tests/test_utils_test.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_utils_test.py -+++ distributed-2021.7.0/distributed/tests/test_utils_test.py -@@ -141,7 +141,7 @@ def test_gen_cluster_cleans_up_client(lo - assert not dask.config.get("get", None) - - --@gen_cluster(client=False) -+@gen_cluster() - async def test_gen_cluster_without_client(s, a, b): - assert isinstance(s, Scheduler) - for w in [a, b]: -Index: distributed-2021.7.0/distributed/tests/test_variable.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_variable.py -+++ distributed-2021.7.0/distributed/tests/test_variable.py -@@ -191,7 +191,7 @@ async def test_timeout_get(c, s, a, b): - - - @pytest.mark.slow --@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny, timeout=None) -+@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny) - async def test_race(c, s, *workers): - NITERS = 50 - -@@ -216,10 +216,8 @@ async def test_race(c, s, *workers): - results = await c.gather(futures) - assert all(r > NITERS * 0.8 for r in results) - -- start = time() - while len(s.wants_what["variable-x"]) != 1: - await asyncio.sleep(0.01) -- assert time() - start < 2 - - - @gen_cluster(client=True) -Index: distributed-2021.7.0/distributed/tests/test_worker.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/tests/test_worker.py -+++ distributed-2021.7.0/distributed/tests/test_worker.py -@@ -18,7 +18,6 @@ from tlz import first, pluck, sliding_wi - import dask - from dask import delayed - from dask.system import CPU_COUNT --from dask.utils import format_bytes - - from distributed import ( - Client, -@@ -31,7 +30,7 @@ from distributed import ( - ) - from distributed.comm.registry import backends - from distributed.comm.tcp import TCPBackend --from distributed.compatibility import MACOS, WINDOWS -+from distributed.compatibility import LINUX, MACOS, WINDOWS - from distributed.core import CommClosedError, Status, rpc - from distributed.diagnostics.plugin import PipInstall - from distributed.metrics import time -@@ -780,7 +779,7 @@ async def test_hold_onto_dependents(c, s - - - @pytest.mark.slow --@gen_cluster(client=False, nthreads=[]) -+@gen_cluster(nthreads=[]) - async def test_worker_death_timeout(s): - with dask.config.set({"distributed.comm.timeouts.connect": "1s"}): - await s.close() -@@ -1005,27 +1004,17 @@ async def test_global_workers(s, a, b): - assert w is a or w is b - - --@pytest.mark.skipif(WINDOWS, reason="file descriptors") -+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") - @gen_cluster(nthreads=[]) - async def test_worker_fds(s): -- psutil = pytest.importorskip("psutil") -- await asyncio.sleep(0.05) -- start = psutil.Process().num_fds() -- -- worker = await Worker(s.address, loop=s.loop) -- await asyncio.sleep(0.1) -- middle = psutil.Process().num_fds() -- start = time() -- while middle > start: -- await asyncio.sleep(0.01) -- assert time() < start + 1 -+ proc = psutil.Process() -+ before = psutil.Process().num_fds() - -- await worker.close() -+ async with Worker(s.address, loop=s.loop): -+ assert proc.num_fds() > before - -- start = time() -- while psutil.Process().num_fds() > start: -+ while proc.num_fds() > before: - await asyncio.sleep(0.01) -- assert time() < start + 0.5 - - - @gen_cluster(nthreads=[]) -@@ -1064,13 +1053,13 @@ async def test_scheduler_file(): - @gen_cluster(client=True) - async def test_scheduler_delay(c, s, a, b): - old = a.scheduler_delay -- assert abs(a.scheduler_delay) < 0.3 -- assert abs(b.scheduler_delay) < 0.3 -- await asyncio.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.3) -+ assert abs(a.scheduler_delay) < 0.6 -+ assert abs(b.scheduler_delay) < 0.6 -+ await asyncio.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.6) - assert a.scheduler_delay != old - - --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) -+@pytest.mark.flaky(reruns=10, reruns_delay=5) - @gen_cluster(client=True) - async def test_statistical_profiling(c, s, a, b): - futures = c.map(slowinc, range(10), delay=0.1) -@@ -1134,7 +1123,6 @@ async def test_robust_to_bad_sizeof_esti - - - @pytest.mark.slow --@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=sys.version_info[:2] == (3, 8)) - @gen_cluster( - nthreads=[("127.0.0.1", 2)], - client=True, -@@ -1144,7 +1132,6 @@ async def test_robust_to_bad_sizeof_esti - "memory_target_fraction": False, - "memory_pause_fraction": 0.5, - }, -- timeout=20, - ) - async def test_pause_executor(c, s, a): - memory = psutil.Process().memory_info().rss -@@ -1159,14 +1146,9 @@ async def test_pause_executor(c, s, a): - future = c.submit(f) - futures = c.map(slowinc, range(30), delay=0.1) - -- start = time() - while not a.paused: - await asyncio.sleep(0.01) -- assert time() < start + 4, ( -- format_bytes(psutil.Process().memory_info().rss), -- format_bytes(a.memory_limit), -- len(a.data), -- ) -+ - out = logger.getvalue() - assert "memory" in out.lower() - assert "pausing" in out.lower() -@@ -1312,9 +1294,7 @@ async def test_wait_for_outgoing(c, s, a - assert 1 / 3 < ratio < 3 - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster( - nthreads=[("127.0.0.1", 1), ("127.0.0.1", 1), ("127.0.0.2", 1)], client=True - ) -@@ -1467,9 +1447,7 @@ async def test_local_directory_make_new_ - assert "dask-worker-space" in w.local_directory - - --@pytest.mark.skipif( -- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" --) -+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") - @gen_cluster(nthreads=[], client=True) - async def test_host_address(c, s): - w = await Worker(s.address, host="127.0.0.2") -@@ -2058,7 +2036,7 @@ async def test_worker_state_error_releas - res = c.submit(raise_exc, f, g, workers=[a.address]) - - with pytest.raises(RuntimeError): -- await res.result() -+ await res - - # Nothing bad happened on B, therefore B should hold on to G - assert len(b.tasks) == 1 -@@ -2124,7 +2102,7 @@ async def test_worker_state_error_releas - res = c.submit(raise_exc, f, g, workers=[a.address]) - - with pytest.raises(RuntimeError): -- await res.result() -+ await res - - # Nothing bad happened on B, therefore B should hold on to G - assert len(b.tasks) == 1 -@@ -2190,7 +2168,7 @@ async def test_worker_state_error_releas - res = c.submit(raise_exc, f, g, workers=[a.address]) - - with pytest.raises(RuntimeError): -- await res.result() -+ await res - - # Nothing bad happened on B, therefore B should hold on to G - assert len(b.tasks) == 1 -@@ -2251,7 +2229,7 @@ async def test_worker_state_error_long_c - ) - - with pytest.raises(RuntimeError): -- await res.result() -+ await res - - expected_states_A = { - f.key: "memory", -@@ -2319,7 +2297,7 @@ async def test_worker_state_error_long_c - await asyncio.sleep(0.01) - - --@gen_cluster(client=True, nthreads=[("127.0.0.1", x) for x in range(4)], timeout=None) -+@gen_cluster(client=True, nthreads=[("127.0.0.1", x) for x in range(4)]) - async def test_hold_on_to_replicas(c, s, *workers): - f1 = c.submit(inc, 1, workers=[workers[0].address], key="f1") - f2 = c.submit(inc, 2, workers=[workers[1].address], key="f2") -Index: distributed-2021.7.0/distributed/utils_test.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/utils_test.py -+++ distributed-2021.7.0/distributed/utils_test.py -@@ -764,6 +764,10 @@ def gen_test(timeout=_TEST_TIMEOUT): - async def test_foo(): - await ... # use tornado coroutines - """ -+ assert timeout, ( -+ "timeout should always be set and it should be smaller than the global one from" -+ "pytest-timeout" -+ ) - - def _(func): - def test_func(): -@@ -809,8 +813,6 @@ async def start_cluster( - ) - for i, ncore in enumerate(nthreads) - ] -- # for w in workers: -- # w.rpc = workers[0].rpc - - await asyncio.gather(*workers) - -@@ -875,12 +877,16 @@ def gen_cluster( - start - end - """ -+ assert timeout, ( -+ "timeout should always be set and it should be smaller than the global one from" -+ "pytest-timeout" -+ ) - if ncores is not None: - warnings.warn("ncores= has moved to nthreads=", stacklevel=2) - nthreads = ncores - - worker_kwargs = merge( -- {"memory_limit": system.MEMORY_LIMIT, "death_timeout": 10}, worker_kwargs -+ {"memory_limit": system.MEMORY_LIMIT, "death_timeout": 15}, worker_kwargs - ) - - def _(func): -@@ -930,8 +936,7 @@ def gen_cluster( - args = [c] + args - try: - future = func(*args, *outer_args, **kwargs) -- if timeout: -- future = asyncio.wait_for(future, timeout) -+ future = asyncio.wait_for(future, timeout) - result = await future - if s.validate: - s.validate_state() -Index: distributed-2021.7.0/distributed/worker.py -=================================================================== ---- distributed-2021.7.0.orig/distributed/worker.py -+++ distributed-2021.7.0/distributed/worker.py -@@ -1217,7 +1217,7 @@ class Worker(ServerNode): - return self.close(*args, **kwargs) - - async def close( -- self, report=True, timeout=10, nanny=True, executor_wait=True, safe=False -+ self, report=True, timeout=30, nanny=True, executor_wait=True, safe=False - ): - with log_errors(): - if self.status in (Status.closed, Status.closing): -Index: distributed-2021.7.0/setup.cfg -=================================================================== ---- distributed-2021.7.0.orig/setup.cfg -+++ distributed-2021.7.0/setup.cfg -@@ -38,10 +38,9 @@ markers = - slow: marks tests as slow (deselect with '-m "not slow"') - avoid_ci: marks tests as flaky on CI on all OSs - ipython: marks tests as exercising IPython --timeout_method = thread -+timeout_method = signal - timeout = 300 - - [egg_info] - tag_build = - tag_date = 0 -- diff --git a/python-distributed.changes b/python-distributed.changes index 579452a..1227adb 100644 --- a/python-distributed.changes +++ b/python-distributed.changes @@ -1,3 +1,55 @@ +------------------------------------------------------------------- +Sun Aug 8 14:36:34 UTC 2021 - Ben Greiner + +- Update to version 2021.7.2 + * Fix a deadlock connected to task stealing and task + deserialization + * Include maximum shard size in second to_frames method + * Minor dashboard style updates + * Cap maximum shard size at the size of an integer + * Document automatic MALLOC_TRIM_THRESHOLD_ environment variable + * Mark ucx-py tests for GPU + * Update individual profile plot sizing + * Handle NVMLError_Unknown in NVML diagnostics + * Unit tests to use a random port for the dashboard + * Ensure worker reconnect registers existing tasks properly + * Halve CI runtime! + * Add NannyPlugins + * Add WorkerNetworkBandwidth chart to dashboard + * Set nanny environment variables in config + * Read smaller frames to workaround OpenSSL bug + * Move UCX/RMM config variables to Distributed namespace + * Allow ws(s) messages greater than 10Mb + * Short-circuit root-ish check for many deps +-Release 2021.07.1 + * Remove experimental feature warning from actors docs + * Keep dependents in worker dependency if TS is still known + * Add Scheduler.set_restrictions + * Make Actor futures awaitable and work with as_completed + * Simplify test_secede_balances + * Computation class + * Some light dashboard cleanup + * Don't package tests + * Add pytest marker for GPU tests + * Actor: don't hold key references on workers + * Collapse nav to hamburger sooner + * Verify that actors survive pickling + * Reenable UCX-Py tests that used to segfault + * Better support ProcessPoolExecutors + * Simplify test_worker_heartbeat_after_cancel + * Avoid property validation in Bokeh + * Reduce default websocket frame size and make configurable + * Disable pytest-timeout SIGALARM on MacOS + * rebalance() resilience to computations + * Improve CI stability + * Ensure heartbeats after cancelation do not raise KeyError s + * Add more useful exception message on TLS cert mismatch + * Add bokeh mode parameter to performance reports +- Use the GitHub tarball because the PyPI sdist does to provide the + tests anymore + * Remove extra conftest.py source +- Drop distributed-pr5022-improve_ci.patch merged upstream + ------------------------------------------------------------------- Fri Jul 16 09:31:13 UTC 2021 - Ben Greiner diff --git a/python-distributed.spec b/python-distributed.spec index f0b81cc..db9aaae 100644 --- a/python-distributed.spec +++ b/python-distributed.spec @@ -36,20 +36,16 @@ %{?!python_module:%define python_module() python3-%{**}} %define skip_python2 1 %define skip_python36 1 -%define ghversiontag 2021.07.0 +%define ghversiontag 2021.07.2 Name: python-distributed%{psuffix} # Note: please always update together with python-dask -Version: 2021.7.0 +Version: 2021.7.2 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause URL: https://distributed.readthedocs.io/en/latest/ -Source: https://files.pythonhosted.org/packages/source/d/distributed/distributed-%{version}.tar.gz -# Missing in the PyPI package but needed for pytest fixtures. Note: One of the next releases will miss all of the tests. (gh#dask/distributed#5054) -Source1: https://github.com/dask/distributed/raw/%{ghversiontag}/conftest.py +Source: https://github.com/dask/distributed/archive/refs/tags//%{ghversiontag}.tar.gz#/distributed-%{ghversiontag}-gh.tar.gz Source99: python-distributed-rpmlintrc -# PATCH-FIX-UPSTREAM distributed-pr5022-improve_ci.patch -- gh#dask/distributed#5022 -Patch0: distributed-pr5022-improve_ci.patch BuildRequires: %{python_module base >= 3.7} BuildRequires: %{python_module setuptools} BuildRequires: fdupes @@ -104,8 +100,7 @@ extends both the concurrent.futures and dask APIs to moderate sized clusters. %prep -%autosetup -p1 -n distributed-%{version} -cp %SOURCE1 . +%autosetup -p1 -n distributed-%{ghversiontag} %build %if ! %{with test} @@ -118,10 +113,7 @@ cp %SOURCE1 . %python_clone -a %{buildroot}%{_bindir}/dask-ssh %python_clone -a %{buildroot}%{_bindir}/dask-scheduler %python_clone -a %{buildroot}%{_bindir}/dask-worker -%{python_expand # -chmod -x %{buildroot}%{$python_sitearch}/distributed/tests/test_utils_test.py -%fdupes %{buildroot}%{$python_sitearch} -} +%python_expand %fdupes %{buildroot}%{$python_sitearch} %endif %if %{with test}