From 66e1d3532150a6ae4a6f7ce743a76f8621d307af392e4e42ce3050e346f8930d Mon Sep 17 00:00:00 2001
From: Matej Cepl
Date: Wed, 21 Jul 2021 09:16:05 +0000
Subject: [PATCH] Accepting request 907354 from home:bnavigator:branches:devel:languages:python:numeric

- Update to version 2021.7.0
  * Fix Nbytes jitter - less expensive
  * Use native GH actions cancel feature
  * Don't require workers to report to scheduler if scheduler shuts down
  * Add pandas to the list of checked packages for client.get_versions()
  * Move worker preload before scheduler address is set
  * Fix flaky test_oversubscribing_leases
  * Update scheduling policy docs for #4967
  * Add echo handler to Server class
  * Also include pngs when bundling package
  * Remove duplicated dashboard panes
  * Fix worker memory dashboard flickering
  * Tabs on bottom left corner on dashboard
  * Rename nbytes widgets
  * Co-assign root-ish tasks
  * OSError tweaks
  * Update imports to cudf.testing._utils
  * Ensure shuffle split default durations use proper prefix
  * Follow up pyupgrade formatting
  * Rename plot dropdown
  * Pyupgrade
  * Misc Sphinx tweaks
  * No longer hold dependencies of erred tasks in memory
  * Add maximum shard size to config
  * Ensure shuffle split operations are blacklisted from work stealing
  * Add dropdown menu to access individual plots
  * Edited the path to scheduler.py
  * Task Group Graph Visualization
  * Remove more internal references to deprecated utilities
  * Restructure nbytes hover
  * Except more errors in pynvml.nvmlInit()
  * Add occupancy as individual plot
  * Deprecate utilities which have moved to dask
  * Ensure connectionpool does not leave comms if closed mid connect
  * Add support for registering scheduler plugins from Client
  * Stealing dashboard fixes
  * Allow requirements verification to be ignored when loading backends
    from entrypoints
  * Add Log and Logs to API docs
  * Support fixtures and pytest.mark.parametrize with gen_cluster
- Release 2021.06.2
  * Revert refactor to utils.Log[s] and Cluster.get_logs
  * Use deprecation utility from Dask
  * Add transition counter to Scheduler
  * Remove nbytes_in_memory
- Release 2021.06.1
  * Fix deadlock in handle_missing_dep if additional replicas are available
  * Add configuration to enable/disable NVML diagnostics
  * Add scheduler log tab to performance reports
  * Add HTML repr to scheduler_info and incorporate into client and
    cluster reprs
  * Fix error state typo
  * Allow actor exceptions to propagate
  * Remove importing apply from dask.compatibility
  * Use more informative default name for WorkerPlugins
  * Removed unused utility functions
  * Locally rerun successfully completed futures
  * Forget erred tasks and fix deadlocks on worker
  * Handle HTTPClientError in websocket connector
  * Update dask_cuda usage in SSHCluster docstring
  * Remove tests for process_time and thread_time
  * Flake8 config cleanup
  * Don't strip scheduler protocol when determining host
  * Add more documentation on memory management
  * Add range_query tests to NVML test suite
  * No longer cancel result future in async process when using timeouts
- Release 2021.06.0
  * Multiple worker executors
  * Ensure PyNVML works correctly when installed with no GPUs
  * Show more in test summary
  * Move SystemMonitor's GPU initialization back to constructor
  * Mark test_server_comms_mark_active_handlers with pytest.mark.asyncio
  * Who has has what html reprs v2
  * O(1) rebalance
  * Ensure repr and eq for cluster always work
- Release 2021.05.1
  * Drop usage of WhoHas & WhatHas from Client
  * Ensure adaptive scaling is properly awaited and closed
  * Fix WhoHas/HasWhat async usage
  * Add HTML reprs for Client.who_has and Client.has_what
  * Prevent accidentally starting multiple Workers in the same process
  * Add system tab to performance reports
  * Let servers close faster if there are no active handlers
  * Fix UCX scrub config logging
  * Ensure worker clients are closed
  * Fix warning for attribute error when deleting a client
  * Ensure exceptions are raised if workers are incorrectly started
  * Update handling of UCX exceptions on endpoint closing
  * Ensure busy workloads properly look up who_has
  * Check distributed.scheduler.pickle in Scheduler.run_function
  * Add performance_report to API docs
  * Use dict _workers_dv in unordered use cases
  * Bump pre-commit hook versions
  * Do not mindlessly spawn workers when no memory limit is set
  * test_memory to use gen_cluster
  * Increase timeout of gen_test to 30s
- Work on the very flaky testsuite:
  * Add missing conftest.py not packaged on PyPI
  * Add distributed-pr5022-improve_ci.patch in the hope of better
    stability -- gh#dask/distributed#5022
  * Do not use pytest-xdist
- Add Cython as runtime dep because the scheduler checks for its presence

OBS-URL: https://build.opensuse.org/request/show/907354
OBS-URL: https://build.opensuse.org/package/show/devel:languages:python:numeric/python-distributed?expand=0&rev=98
---
 conftest.py                         |   37 +
 distributed-2021.5.0.tar.gz         |    3 -
 distributed-2021.7.0.tar.gz         |    3 +
 distributed-pr5022-improve_ci.patch | 2621 +++++++++++++++++++++++++++
 python-distributed.changes          |  114 ++
 python-distributed.spec             |  162 +-
 6 files changed, 2804 insertions(+), 136 deletions(-)
 create mode 100644 conftest.py
 delete mode 100644 distributed-2021.5.0.tar.gz
 create mode 100644 distributed-2021.7.0.tar.gz
 create mode 100644 distributed-pr5022-improve_ci.patch

diff --git a/conftest.py b/conftest.py
new file mode 100644
index 0000000..cd7623a
--- /dev/null
+++ b/conftest.py
@@ -0,0 +1,37 @@
+# https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option
+import pytest
+
+# Uncomment to enable more logging and checks
+# (https://docs.python.org/3/library/asyncio-dev.html)
+# Note this makes things slower and might consume much memory.
+# os.environ["PYTHONASYNCIODEBUG"] = "1" + +try: + import faulthandler +except ImportError: + pass +else: + try: + faulthandler.enable() + except Exception: + pass + +# Make all fixtures available +from distributed.utils_test import * # noqa + + +def pytest_addoption(parser): + parser.addoption("--runslow", action="store_true", help="run slow tests") + + +def pytest_collection_modifyitems(config, items): + if config.getoption("--runslow"): + # --runslow given in cli: do not skip slow tests + return + skip_slow = pytest.mark.skip(reason="need --runslow option to run") + for item in items: + if "slow" in item.keywords: + item.add_marker(skip_slow) + + +pytest_plugins = ["distributed.pytest_resourceleaks"] diff --git a/distributed-2021.5.0.tar.gz b/distributed-2021.5.0.tar.gz deleted file mode 100644 index c78cf6e..0000000 --- a/distributed-2021.5.0.tar.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:4da09db7972120db0a8e7fc2b2f91bb4d952ce21b185ccc1356603613060a0b3 -size 717781 diff --git a/distributed-2021.7.0.tar.gz b/distributed-2021.7.0.tar.gz new file mode 100644 index 0000000..ab2ce66 --- /dev/null +++ b/distributed-2021.7.0.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6257f399ea564bfdcd80dcc9df6cdaf703dbadb94b7c068d103f8366dc7f8d1f +size 1065464 diff --git a/distributed-pr5022-improve_ci.patch b/distributed-pr5022-improve_ci.patch new file mode 100644 index 0000000..fd7c71b --- /dev/null +++ b/distributed-pr5022-improve_ci.patch @@ -0,0 +1,2621 @@ +Index: distributed-2021.7.0/continuous_integration/condarc +=================================================================== +--- /dev/null ++++ distributed-2021.7.0/continuous_integration/condarc +@@ -0,0 +1,9 @@ ++channels: ++ - conda-forge ++ - defaults ++channel_priority: true ++auto_activate_base: false ++remote_backoff_factor: 20 ++remote_connect_timeout_secs: 20.0 ++remote_max_retries: 10 ++remote_read_timeout_secs: 60.0 +Index: distributed-2021.7.0/distributed/cli/tests/test_dask_scheduler.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/cli/tests/test_dask_scheduler.py ++++ distributed-2021.7.0/distributed/cli/tests/test_dask_scheduler.py +@@ -1,3 +1,4 @@ ++import psutil + import pytest + + pytest.importorskip("requests") +@@ -14,6 +15,7 @@ from click.testing import CliRunner + import distributed + import distributed.cli.dask_scheduler + from distributed import Client, Scheduler ++from distributed.compatibility import LINUX + from distributed.metrics import time + from distributed.utils import get_ip, get_ip_interface, tmpfile + from distributed.utils_test import ( +@@ -118,9 +120,7 @@ def test_dashboard_non_standard_ports(lo + requests.get("http://localhost:4832/status/") + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + def test_dashboard_whitelist(loop): + pytest.importorskip("bokeh") + with pytest.raises(Exception): +@@ -144,7 +144,6 @@ def test_dashboard_whitelist(loop): + + + def test_interface(loop): +- psutil = pytest.importorskip("psutil") + if_names = sorted(psutil.net_if_addrs()) + for if_name in if_names: + try: +@@ -168,25 +167,24 @@ def test_interface(loop): + start = time() + while not len(c.nthreads()): + sleep(0.1) +- assert time() - start < 5 ++ assert time() - start < 30 + info = c.scheduler_info() + assert "tcp://127.0.0.1" in 
info["address"] + assert all("127.0.0.1" == d["host"] for d in info["workers"].values()) + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5) + def test_pid_file(loop): + def check_pidfile(proc, pidfile): + start = time() + while not os.path.exists(pidfile): + sleep(0.01) +- assert time() < start + 5 ++ assert time() < start + 30 + + text = False + start = time() + while not text: + sleep(0.01) +- assert time() < start + 5 ++ assert time() < start + 30 + with open(pidfile) as f: + text = f.read() + pid = int(text) +Index: distributed-2021.7.0/distributed/cli/tests/test_dask_worker.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/cli/tests/test_dask_worker.py ++++ distributed-2021.7.0/distributed/cli/tests/test_dask_worker.py +@@ -6,7 +6,6 @@ from click.testing import CliRunner + pytest.importorskip("requests") + + import os +-import sys + from multiprocessing import cpu_count + from time import sleep + +@@ -14,6 +13,7 @@ import requests + + import distributed.cli.dask_worker + from distributed import Client, Scheduler ++from distributed.compatibility import LINUX + from distributed.deploy.utils import nprocesses_nthreads + from distributed.metrics import time + from distributed.utils import parse_ports, sync, tmpfile +@@ -275,9 +275,7 @@ def test_nprocs_expands_name(loop): + assert len(set(names)) == 4 + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) + @pytest.mark.parametrize( + "listen_address", ["tcp://0.0.0.0:39837", "tcp://127.0.0.2:39837"] +@@ -311,9 +309,7 @@ def test_contact_listen_address(loop, na + assert client.run(func) == {"tcp://127.0.0.2:39837": listen_address} + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) + @pytest.mark.parametrize("host", ["127.0.0.2", "0.0.0.0"]) + def test_respect_host_listen_address(loop, nanny, host): +Index: distributed-2021.7.0/distributed/comm/tests/test_comms.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/comm/tests/test_comms.py ++++ distributed-2021.7.0/distributed/comm/tests/test_comms.py +@@ -29,7 +29,6 @@ from distributed.comm import ( + ) + from distributed.comm.registry import backends, get_backend + from distributed.comm.tcp import TCP, TCPBackend, TCPConnector +-from distributed.compatibility import WINDOWS + from distributed.metrics import time + from distributed.protocol import Serialized, deserialize, serialize, to_serialize + from distributed.utils import get_ip, get_ipv6 +@@ -110,7 +109,7 @@ async def debug_loop(): + while True: + loop = ioloop.IOLoop.current() + print(".", loop, loop._handlers) +- await asyncio.sleep(0.50) ++ await asyncio.sleep(0.5) + + + # +@@ -1107,7 +1106,6 @@ async def check_deserialize(addr): + await check_connector_deserialize(addr, True, msg, partial(check_out, True)) + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS) + @pytest.mark.asyncio + async def test_tcp_deserialize(): + await check_deserialize("tcp://") +Index: distributed-2021.7.0/distributed/comm/tests/test_ucx.py +=================================================================== +--- 
distributed-2021.7.0.orig/distributed/comm/tests/test_ucx.py ++++ distributed-2021.7.0/distributed/comm/tests/test_ucx.py +@@ -276,7 +276,7 @@ async def test_ucx_localcluster(processe + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + x = client.submit(inc, 1) +- await x.result() ++ await x + assert x.key in cluster.scheduler.tasks + if not processes: + assert any(w.data == {x.key: 2} for w in cluster.workers.values()) +Index: distributed-2021.7.0/distributed/comm/tests/test_ws.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/comm/tests/test_ws.py ++++ distributed-2021.7.0/distributed/comm/tests/test_ws.py +@@ -122,9 +122,9 @@ async def test_collections(cleanup): + async def test_large_transfer(cleanup): + np = pytest.importorskip("numpy") + async with Scheduler(protocol="ws://") as s: +- async with Worker(s.address, protocol="ws://") as w: ++ async with Worker(s.address, protocol="ws://"): + async with Client(s.address, asynchronous=True) as c: +- future = await c.scatter(np.random.random(1000000)) ++ await c.scatter(np.random.random(1_000_000)) + + + @pytest.mark.asyncio +Index: distributed-2021.7.0/distributed/compatibility.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/compatibility.py ++++ distributed-2021.7.0/distributed/compatibility.py +@@ -8,6 +8,7 @@ logging_names = logging._levelToName.cop + logging_names.update(logging._nameToLevel) + + PYPY = platform.python_implementation().lower() == "pypy" ++LINUX = sys.platform == "linux" + MACOS = sys.platform == "darwin" + WINDOWS = sys.platform.startswith("win") + TORNADO6 = tornado.version_info[0] >= 6 +Index: distributed-2021.7.0/distributed/dashboard/tests/test_scheduler_bokeh.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/dashboard/tests/test_scheduler_bokeh.py ++++ distributed-2021.7.0/distributed/dashboard/tests/test_scheduler_bokeh.py +@@ -17,7 +17,6 @@ from dask.core import flatten + from dask.utils import stringify + + from distributed.client import wait +-from distributed.compatibility import MACOS + from distributed.dashboard import scheduler + from distributed.dashboard.components.scheduler import ( + AggregateAction, +@@ -93,10 +92,8 @@ async def test_counters(c, s, a, b): + await asyncio.sleep(0.1) + ss.update() + +- start = time() + while not len(ss.digest_sources["tick-duration"][0].data["x"]): +- await asyncio.sleep(1) +- assert time() < start + 5 ++ await asyncio.sleep(0.01) + + + @gen_cluster(client=True) +@@ -184,7 +181,7 @@ async def test_task_stream_clear_interva + + await wait(c.map(inc, range(10))) + ts.update() +- await asyncio.sleep(0.010) ++ await asyncio.sleep(0.01) + await wait(c.map(dec, range(10))) + ts.update() + +@@ -192,7 +189,7 @@ async def test_task_stream_clear_interva + assert ts.source.data["name"].count("inc") == 10 + assert ts.source.data["name"].count("dec") == 10 + +- await asyncio.sleep(0.300) ++ await asyncio.sleep(0.3) + await wait(c.map(inc, range(10, 20))) + ts.update() + +@@ -848,7 +845,6 @@ async def test_aggregate_action(c, s, a, + assert ("compute") in mbk.action_source.data["names"] + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) + @gen_cluster(client=True, scheduler_kwargs={"dashboard": True}) + async def test_compute_per_key(c, s, a, b): + mbk = ComputePerKey(s) +Index: distributed-2021.7.0/distributed/deploy/local.py 
+=================================================================== +--- distributed-2021.7.0.orig/distributed/deploy/local.py ++++ distributed-2021.7.0/distributed/deploy/local.py +@@ -136,8 +136,8 @@ class LocalCluster(SpecCluster): + + if threads_per_worker == 0: + warnings.warn( +- "Setting `threads_per_worker` to 0 is discouraged. " +- "Please set to None or to a specific int to get best behavior." ++ "Setting `threads_per_worker` to 0 has been deprecated. " ++ "Please set to None or to a specific int." + ) + threads_per_worker = None + +Index: distributed-2021.7.0/distributed/deploy/old_ssh.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/deploy/old_ssh.py ++++ distributed-2021.7.0/distributed/deploy/old_ssh.py +@@ -51,8 +51,8 @@ def async_ssh(cmd_dict): + port=cmd_dict["ssh_port"], + key_filename=cmd_dict["ssh_private_key"], + compress=True, +- timeout=20, +- banner_timeout=20, ++ timeout=30, ++ banner_timeout=30, + ) # Helps prevent timeouts when many concurrent ssh connections are opened. + # Connection successful, break out of while loop + break +Index: distributed-2021.7.0/distributed/deploy/tests/test_adaptive.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/deploy/tests/test_adaptive.py ++++ distributed-2021.7.0/distributed/deploy/tests/test_adaptive.py +@@ -8,6 +8,7 @@ import pytest + import dask + + from distributed import Adaptive, Client, LocalCluster, SpecCluster, Worker, wait ++from distributed.compatibility import WINDOWS + from distributed.metrics import time + from distributed.utils_test import async_wait_for, clean, gen_test, slowinc + +@@ -40,8 +41,8 @@ def test_adaptive_local_cluster(loop): + assert not c.nthreads() + + +-@pytest.mark.asyncio +-async def test_adaptive_local_cluster_multi_workers(cleanup): ++@gen_test() ++async def test_adaptive_local_cluster_multi_workers(): + async with LocalCluster( + n_workers=0, + scheduler_port=0, +@@ -56,19 +57,14 @@ async def test_adaptive_local_cluster_mu + async with Client(cluster, asynchronous=True) as c: + futures = c.map(slowinc, range(100), delay=0.01) + +- start = time() + while not cluster.scheduler.workers: + await asyncio.sleep(0.01) +- assert time() < start + 15, adapt.log + + await c.gather(futures) + del futures + +- start = time() +- # while cluster.workers: + while cluster.scheduler.workers: + await asyncio.sleep(0.01) +- assert time() < start + 15, adapt.log + + # no workers for a while + for i in range(10): +@@ -242,7 +238,7 @@ async def test_adapt_quickly(): + await cluster.close() + + +-@gen_test(timeout=None) ++@gen_test() + async def test_adapt_down(): + """Ensure that redefining adapt with a lower maximum removes workers""" + async with LocalCluster( +@@ -302,14 +298,15 @@ def test_basic_no_loop(loop): + loop.add_callback(loop.stop) + + +-@pytest.mark.asyncio ++@pytest.mark.flaky(condition=not WINDOWS, reruns=10, reruns_delay=5) ++@pytest.mark.xfail(condition=WINDOWS, reason="extremely flaky") ++@gen_test() + async def test_target_duration(): +- """Ensure that redefining adapt with a lower maximum removes workers""" + with dask.config.set( + {"distributed.scheduler.default-task-durations": {"slowinc": 1}} + ): + async with LocalCluster( +- 0, ++ n_workers=0, + asynchronous=True, + processes=False, + scheduler_port=0, +@@ -318,16 +315,12 @@ async def test_target_duration(): + ) as cluster: + adapt = cluster.adapt(interval="20ms", minimum=2, target_duration="5s") + async 
with Client(cluster, asynchronous=True) as client: +- while len(cluster.scheduler.workers) < 2: +- await asyncio.sleep(0.01) +- ++ await client.wait_for_workers(2) + futures = client.map(slowinc, range(100), delay=0.3) ++ await wait(futures) + +- while len(adapt.log) < 2: +- await asyncio.sleep(0.01) +- +- assert adapt.log[0][1] == {"status": "up", "n": 2} +- assert adapt.log[1][1] == {"status": "up", "n": 20} ++ assert adapt.log[0][1] == {"status": "up", "n": 2} ++ assert adapt.log[1][1] == {"status": "up", "n": 20} + + + @pytest.mark.asyncio +Index: distributed-2021.7.0/distributed/deploy/tests/test_adaptive_core.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/deploy/tests/test_adaptive_core.py ++++ distributed-2021.7.0/distributed/deploy/tests/test_adaptive_core.py +@@ -85,10 +85,10 @@ async def test_interval(): + assert time() < start + 2 + + adapt.stop() +- await asyncio.sleep(0.050) ++ await asyncio.sleep(0.05) + + adapt._target = 10 +- await asyncio.sleep(0.020) ++ await asyncio.sleep(0.02) + assert len(adapt.plan) == 1 # last value from before, unchanged + + +Index: distributed-2021.7.0/distributed/deploy/tests/test_local.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/deploy/tests/test_local.py ++++ distributed-2021.7.0/distributed/deploy/tests/test_local.py +@@ -17,6 +17,7 @@ from tornado.ioloop import IOLoop + from dask.system import CPU_COUNT + + from distributed import Client, Nanny, Worker, get_client ++from distributed.compatibility import LINUX + from distributed.core import Status + from distributed.deploy.local import LocalCluster + from distributed.deploy.utils_test import ClusterTest +@@ -693,13 +694,12 @@ def test_adapt_then_manual(loop): + processes=False, + n_workers=8, + ) as cluster: +- sleep(0.1) + cluster.adapt(minimum=0, maximum=4, interval="10ms") + + start = time() + while cluster.scheduler.workers or cluster.workers: +- sleep(0.1) +- assert time() < start + 5 ++ sleep(0.01) ++ assert time() < start + 30 + + assert not cluster.workers + +@@ -715,8 +715,8 @@ def test_adapt_then_manual(loop): + + start = time() + while len(cluster.scheduler.workers) != 2: +- sleep(0.1) +- assert time() < start + 5 ++ sleep(0.01) ++ assert time() < start + 30 + + + @pytest.mark.parametrize("temporary", [True, False]) +@@ -843,9 +843,7 @@ def test_protocol_tcp(loop): + assert cluster.scheduler.address.startswith("tcp://") + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + def test_protocol_ip(loop): + with LocalCluster( + host="tcp://127.0.0.2", loop=loop, n_workers=0, processes=False +@@ -990,7 +988,7 @@ async def test_repr(cleanup): + @pytest.mark.asyncio + async def test_threads_per_worker_set_to_0(cleanup): + with pytest.warns( +- Warning, match="Setting `threads_per_worker` to 0 is discouraged." ++ Warning, match="Setting `threads_per_worker` to 0 has been deprecated." 
+ ): + async with LocalCluster( + n_workers=2, processes=False, threads_per_worker=0, asynchronous=True +Index: distributed-2021.7.0/distributed/deploy/tests/test_spec_cluster.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/deploy/tests/test_spec_cluster.py ++++ distributed-2021.7.0/distributed/deploy/tests/test_spec_cluster.py +@@ -14,6 +14,7 @@ from distributed.core import Status + from distributed.deploy.spec import ProcessInterface, close_clusters, run_spec + from distributed.metrics import time + from distributed.utils import is_valid_xml ++from distributed.utils_test import gen_test + + + class MyWorker(Worker): +@@ -138,8 +139,8 @@ async def test_scale(cleanup): + + + @pytest.mark.slow +-@pytest.mark.asyncio +-async def test_adaptive_killed_worker(cleanup): ++@gen_test() ++async def test_adaptive_killed_worker(): + with dask.config.set({"distributed.deploy.lost-worker-timeout": 0.1}): + + async with SpecCluster( +@@ -147,13 +148,10 @@ async def test_adaptive_killed_worker(cl + worker={"cls": Nanny, "options": {"nthreads": 1}}, + scheduler={"cls": Scheduler, "options": {"port": 0}}, + ) as cluster: +- + async with Client(cluster, asynchronous=True) as client: +- +- cluster.adapt(minimum=1, maximum=1) +- + # Scale up a cluster with 1 worker. +- while len(cluster.workers) != 1: ++ cluster.adapt(minimum=1, maximum=1) ++ while not cluster.workers: + await asyncio.sleep(0.01) + + future = client.submit(sleep, 0.1) +@@ -163,11 +161,11 @@ async def test_adaptive_killed_worker(cl + await cluster.workers[worker_id].kill() + + # Wait for the worker to re-spawn and finish sleeping. +- await future.result(timeout=5) ++ await future + + +-@pytest.mark.asyncio +-async def test_unexpected_closed_worker(cleanup): ++@gen_test() ++async def test_unexpected_closed_worker(): + worker = {"cls": Worker, "options": {"nthreads": 1}} + with dask.config.set({"distributed.deploy.lost-worker-timeout": "10ms"}): + async with SpecCluster( +@@ -197,25 +195,20 @@ async def test_unexpected_closed_worker( + assert len(cluster.workers) == 2 + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5) +-@pytest.mark.slow +-@pytest.mark.asyncio +-async def test_restart(cleanup): +- # Regression test for https://github.com/dask/distributed/issues/3062 ++@gen_test(timeout=60) ++async def test_restart(): ++ """Regression test for https://github.com/dask/distributed/issues/3062""" + worker = {"cls": Nanny, "options": {"nthreads": 1}} +- with dask.config.set({"distributed.deploy.lost-worker-timeout": "2s"}): +- async with SpecCluster( +- asynchronous=True, scheduler=scheduler, worker=worker +- ) as cluster: +- async with Client(cluster, asynchronous=True) as client: +- cluster.scale(2) +- await cluster +- assert len(cluster.workers) == 2 +- await client.restart() +- start = time() +- while len(cluster.workers) < 2: +- await asyncio.sleep(0.5) +- assert time() < start + 60 ++ async with SpecCluster( ++ asynchronous=True, scheduler=scheduler, worker=worker ++ ) as cluster: ++ async with Client(cluster, asynchronous=True) as client: ++ cluster.scale(2) ++ await cluster ++ assert len(cluster.workers) == 2 ++ await client.restart() ++ while len(cluster.workers) < 2: ++ await asyncio.sleep(0.01) + + + @pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out") +Index: distributed-2021.7.0/distributed/diagnostics/tests/test_progress.py +=================================================================== +--- 
distributed-2021.7.0.orig/distributed/diagnostics/tests/test_progress.py ++++ distributed-2021.7.0/distributed/diagnostics/tests/test_progress.py +@@ -4,6 +4,7 @@ import pytest + + from distributed import Nanny + from distributed.client import wait ++from distributed.compatibility import LINUX + from distributed.diagnostics.progress import ( + AllProgress, + GroupProgress, +@@ -95,8 +96,9 @@ def check_bar_completed(capsys, width=40 + assert percent == "100% Completed" + + ++@pytest.mark.flaky(condition=not COMPILED and LINUX, reruns=10, reruns_delay=5) + @pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler") +-@gen_cluster(client=True, Worker=Nanny, timeout=None) ++@gen_cluster(client=True, Worker=Nanny) + async def test_AllProgress(c, s, a, b): + x, y, z = c.map(inc, [1, 2, 3]) + xx, yy, zz = c.map(dec, [x, y, z]) +@@ -178,8 +180,9 @@ async def test_AllProgress(c, s, a, b): + assert all(set(d) == {"div"} for d in p.state.values()) + + ++@pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5) + @gen_cluster(client=True, Worker=Nanny) +-async def test_AllProgress_lost_key(c, s, a, b, timeout=None): ++async def test_AllProgress_lost_key(c, s, a, b): + p = AllProgress(s) + futures = c.map(inc, range(5)) + await wait(futures) +@@ -188,10 +191,8 @@ async def test_AllProgress_lost_key(c, s + await a.close() + await b.close() + +- start = time() + while len(p.state["memory"]["inc"]) > 0: +- await asyncio.sleep(0.1) +- assert time() < start + 5 ++ await asyncio.sleep(0.01) + + + @gen_cluster(client=True) +Index: distributed-2021.7.0/distributed/diagnostics/tests/test_scheduler_plugin.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/diagnostics/tests/test_scheduler_plugin.py ++++ distributed-2021.7.0/distributed/diagnostics/tests/test_scheduler_plugin.py +@@ -33,7 +33,7 @@ async def test_simple(c, s, a, b): + assert counter not in s.plugins + + +-@gen_cluster(nthreads=[], client=False) ++@gen_cluster(nthreads=[]) + async def test_add_remove_worker(s): + events = [] + +@@ -71,7 +71,7 @@ async def test_add_remove_worker(s): + assert events == [] + + +-@gen_cluster(nthreads=[], client=False) ++@gen_cluster(nthreads=[]) + async def test_async_add_remove_worker(s): + events = [] + +Index: distributed-2021.7.0/distributed/diagnostics/tests/test_widgets.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/diagnostics/tests/test_widgets.py ++++ distributed-2021.7.0/distributed/diagnostics/tests/test_widgets.py +@@ -5,8 +5,6 @@ pytest.importorskip("ipywidgets") + from ipykernel.comm import Comm + from ipywidgets import Widget + +-from distributed.compatibility import WINDOWS +- + ################# + # Utility stuff # + ################# +@@ -145,7 +143,6 @@ async def test_multi_progressbar_widget( + assert sorted(capacities, reverse=True) == capacities + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS) + @gen_cluster() + async def test_multi_progressbar_widget_after_close(s, a, b): + s.update_graph( +Index: distributed-2021.7.0/distributed/distributed.yaml +=================================================================== +--- distributed-2021.7.0.orig/distributed/distributed.yaml ++++ distributed-2021.7.0/distributed/distributed.yaml +@@ -176,7 +176,7 @@ distributed: + threads: 0 # Threads to use. 0 for single-threaded, -1 to infer from cpu count. 
+ + timeouts: +- connect: 10s # time before connecting fails ++ connect: 30s # time before connecting fails + tcp: 30s # time before calling an unresponsive connection dead + + require-encryption: null # Whether to require encryption on non-local comms +Index: distributed-2021.7.0/distributed/nanny.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/nanny.py ++++ distributed-2021.7.0/distributed/nanny.py +@@ -384,7 +384,7 @@ class Nanny(ServerNode): + raise + return result + +- async def restart(self, comm=None, timeout=2, executor_wait=True): ++ async def restart(self, comm=None, timeout=30, executor_wait=True): + async def _(): + if self.process is not None: + await self.kill() +@@ -393,7 +393,9 @@ class Nanny(ServerNode): + try: + await asyncio.wait_for(_(), timeout) + except TimeoutError: +- logger.error("Restart timed out, returning before finished") ++ logger.error( ++ f"Restart timed out after {timeout}s; returning before finished" ++ ) + return "timed out" + else: + return "OK" +Index: distributed-2021.7.0/distributed/protocol/tests/test_pickle.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/protocol/tests/test_pickle.py ++++ distributed-2021.7.0/distributed/protocol/tests/test_pickle.py +@@ -128,7 +128,6 @@ def test_pickle_numpy(): + assert (deserialize(h, f) == x).all() + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=sys.version_info[:2] == (3, 8)) + def test_pickle_functions(): + def make_closure(): + value = 1 +Index: distributed-2021.7.0/distributed/scheduler.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/scheduler.py ++++ distributed-2021.7.0/distributed/scheduler.py +@@ -3440,19 +3440,18 @@ class Scheduler(SchedulerState, ServerNo + + http_server_modules = dask.config.get("distributed.scheduler.http.routes") + show_dashboard = dashboard or (dashboard is None and dashboard_address) +- missing_bokeh = False + # install vanilla route if show_dashboard but bokeh is not installed + if show_dashboard: + try: + import distributed.dashboard.scheduler + except ImportError: +- missing_bokeh = True ++ show_dashboard = False + http_server_modules.append("distributed.http.scheduler.missing_bokeh") + routes = get_handlers( + server=self, modules=http_server_modules, prefix=http_prefix + ) + self.start_http_server(routes, dashboard_address, default_port=8787) +- if show_dashboard and not missing_bokeh: ++ if show_dashboard: + distributed.dashboard.scheduler.connect( + self.http_application, self.http_server, self, prefix=http_prefix + ) +@@ -5467,8 +5466,8 @@ class Scheduler(SchedulerState, ServerNo + for collection in self._task_state_collections: + collection.clear() + +- async def restart(self, client=None, timeout=3): +- """Restart all workers. Reset local state.""" ++ async def restart(self, client=None, timeout=30): ++ """Restart all workers. 
Reset local state.""" + parent: SchedulerState = cast(SchedulerState, self) + with log_errors(): + +Index: distributed-2021.7.0/distributed/tests/test_actor.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_actor.py ++++ distributed-2021.7.0/distributed/tests/test_actor.py +@@ -485,10 +485,8 @@ async def test_compute(c, s, a, b): + result = await c.compute(final, actors=counter) + assert result == 0 + 1 + 2 + 3 + 4 + +- start = time() + while a.data or b.data: + await asyncio.sleep(0.01) +- assert time() < start + 30 + + + def test_compute_sync(client): +Index: distributed-2021.7.0/distributed/tests/test_as_completed.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_as_completed.py ++++ distributed-2021.7.0/distributed/tests/test_as_completed.py +@@ -14,7 +14,7 @@ from distributed.utils_test import gen_c + + + @gen_cluster(client=True) +-async def test__as_completed(c, s, a, b): ++async def test_as_completed_async(c, s, a, b): + x = c.submit(inc, 1) + y = c.submit(inc, 1) + z = c.submit(inc, 2) +@@ -29,7 +29,7 @@ async def test__as_completed(c, s, a, b) + assert result in [x, y, z] + + +-def test_as_completed(client): ++def test_as_completed_sync(client): + x = client.submit(inc, 1) + y = client.submit(inc, 2) + z = client.submit(inc, 1) +@@ -201,7 +201,6 @@ async def test_as_completed_with_results + assert str(exc.value) == "hello!" + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5) + def test_as_completed_with_results_no_raise(client): + x = client.submit(throws, 1) + y = client.submit(inc, 5) +@@ -260,7 +259,7 @@ async def test_as_completed_with_results + assert dd[z][0] == 2 + + +-@gen_cluster(client=True, timeout=None) ++@gen_cluster(client=True) + async def test_clear(c, s, a, b): + futures = c.map(inc, range(3)) + ac = as_completed(futures) +Index: distributed-2021.7.0/distributed/tests/test_asyncprocess.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_asyncprocess.py ++++ distributed-2021.7.0/distributed/tests/test_asyncprocess.py +@@ -8,6 +8,7 @@ import weakref + from datetime import timedelta + from time import sleep + ++import psutil + import pytest + from tornado import gen + from tornado.locks import Event +@@ -280,13 +281,9 @@ async def test_child_main_thread(): + q._writer.close() + + +-@pytest.mark.skipif( +- sys.platform.startswith("win"), reason="num_fds not supported on windows" +-) ++@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") + @gen_test() + async def test_num_fds(): +- psutil = pytest.importorskip("psutil") +- + # Warm up + proc = AsyncProcess(target=exit_now) + proc.daemon = True +@@ -303,11 +300,8 @@ async def test_num_fds(): + assert not proc.is_alive() + assert proc.exitcode == 0 + +- start = time() + while p.num_fds() > before: +- await asyncio.sleep(0.1) +- print("fds:", before, p.num_fds()) +- assert time() < start + 10 ++ await asyncio.sleep(0.01) + + + @gen_test() +@@ -407,7 +401,7 @@ def test_asyncprocess_child_teardown_on_ + try: + readable = children_alive.poll(short_timeout) + except BrokenPipeError: +- assert sys.platform.startswith("win"), "should only raise on windows" ++ assert WINDOWS, "should only raise on windows" + # Broken pipe implies closed, which is readable. + readable = True + +@@ -422,7 +416,7 @@ def test_asyncprocess_child_teardown_on_ + except EOFError: + pass # Test passes. 
+ except BrokenPipeError: +- assert sys.platform.startswith("win"), "should only raise on windows" ++ assert WINDOWS, "should only raise on windows" + # Test passes. + else: + # Oops, children_alive read something. It should be closed. If +Index: distributed-2021.7.0/distributed/tests/test_client.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_client.py ++++ distributed-2021.7.0/distributed/tests/test_client.py +@@ -55,7 +55,7 @@ from distributed.client import ( + wait, + ) + from distributed.comm import CommClosedError +-from distributed.compatibility import MACOS, WINDOWS ++from distributed.compatibility import LINUX, WINDOWS + from distributed.core import Status + from distributed.metrics import time + from distributed.objects import HasWhat, WhoHas +@@ -95,7 +95,7 @@ from distributed.utils_test import ( + ) + + +-@gen_cluster(client=True, timeout=None) ++@gen_cluster(client=True) + async def test_submit(c, s, a, b): + x = c.submit(inc, 10) + assert not x.done() +@@ -624,7 +624,7 @@ async def test_limit_concurrent_gatherin + assert len(a.outgoing_transfer_log) + len(b.outgoing_transfer_log) < 100 + + +-@gen_cluster(client=True, timeout=None) ++@gen_cluster(client=True) + async def test_get(c, s, a, b): + future = c.get({"x": (inc, 1)}, "x", sync=False) + assert isinstance(future, Future) +@@ -717,7 +717,7 @@ async def test_wait_first_completed(c, s + assert y.status == "pending" + + +-@gen_cluster(client=True, timeout=2) ++@gen_cluster(client=True) + async def test_wait_timeout(c, s, a, b): + future = c.submit(sleep, 0.3) + with pytest.raises(TimeoutError): +@@ -794,7 +794,7 @@ async def test_garbage_collection_with_s + await asyncio.sleep(0.1) + + +-@gen_cluster(timeout=1000, client=True) ++@gen_cluster(client=True) + async def test_recompute_released_key(c, s, a, b): + x = c.submit(inc, 100) + result1 = await x +@@ -907,9 +907,7 @@ async def test_tokenize_on_futures(c, s, + assert tok == tokenize(y) + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) + async def test_restrictions_submit(c, s, a, b): + x = c.submit(inc, 1, workers={a.ip}) +@@ -936,9 +934,7 @@ async def test_restrictions_ip_port(c, s + assert y.key in b.data + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) + async def test_restrictions_map(c, s, a, b): + L = c.map(inc, range(5), workers={a.ip}) +@@ -950,9 +946,7 @@ async def test_restrictions_map(c, s, a, + assert s.host_restrictions[x.key] == {a.ip} + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) + async def test_restrictions_get(c, s, a, b): + dsk = {"x": 1, "y": (inc, "x"), "z": (inc, "y")} +@@ -991,7 +985,7 @@ async def dont_test_bad_restrictions_rai + assert z.key in str(e) + + +-@gen_cluster(client=True, timeout=None) ++@gen_cluster(client=True) + async def test_remove_worker(c, s, a, b): + L = c.map(inc, range(20)) + await wait(L) +@@ -1280,9 +1274,7 @@ async def 
test_get_nbytes(c, s, a, b): + assert s.get_nbytes(summary=False) == {x.key: sizeof(1), y.key: sizeof(2)} + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) + async def test_nbytes_determines_worker(c, s, a, b): + x = c.submit(identity, 1, workers=[a.ip]) +@@ -1455,7 +1447,7 @@ async def test_scatter_direct_empty(c, s + await c.scatter(123, direct=True, timeout=0.1) + + +-@gen_cluster(client=True, timeout=None, nthreads=[("127.0.0.1", 1)] * 5) ++@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 5) + async def test_scatter_direct_spread_evenly(c, s, *workers): + futures = [] + for i in range(10): +@@ -1799,7 +1791,7 @@ async def test_remote_scatter_gather(c, + assert (xx, yy, zz) == (1, 2, 3) + + +-@gen_cluster(timeout=1000, client=True) ++@gen_cluster(client=True) + async def test_remote_submit_on_Future(c, s, a, b): + x = c.submit(lambda x: x + 1, 1) + y = c.submit(lambda x: x + 1, x) +@@ -1836,9 +1828,7 @@ async def test_client_with_scheduler(c, + assert result == 12 + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) + async def test_allow_restrictions(c, s, a, b): + aws = s.workers[a.address] +@@ -1971,7 +1961,7 @@ async def test_badly_serialized_input(c, + assert "hello!" in str(info.value) + + +-@pytest.mark.skipif("True", reason="") ++@pytest.mark.skip + async def test_badly_serialized_input_stderr(capsys, c): + o = BadlySerializedObject() + future = c.submit(inc, o) +@@ -2853,7 +2843,6 @@ async def test_persist_get(c, s, a, b): + + @pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") + def test_client_num_fds(loop): +- psutil = pytest.importorskip("psutil") + with cluster() as (s, [a, b]): + proc = psutil.Process() + with Client(s["address"], loop=loop) as c: # first client to start loop +@@ -2864,7 +2853,7 @@ def test_client_num_fds(loop): + start = time() + while proc.num_fds() > before: + sleep(0.01) +- assert time() < start + 4 ++ assert time() < start + 10, (before, proc.num_fds()) + + + @gen_cluster() +@@ -3035,9 +3024,7 @@ async def test_receive_lost_key(c, s, a, + await asyncio.sleep(0.01) + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) + async def test_unrunnable_task_runs(c, s, a, b): + x = c.submit(inc, 1, workers=[a.ip]) +@@ -3073,9 +3060,7 @@ async def test_add_worker_after_tasks(c, + await n.close() + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True) + async def test_workers_register_indirect_data(c, s, a, b): + [x] = await c.scatter([1], workers=a.address) +@@ -3205,13 +3190,10 @@ async def test_client_replicate(c, s, *w + assert len(s.tasks[y.key].who_has) == 10 + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean 
localhost") + @gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1), ("127.0.0.2", 1), ("127.0.0.2", 1)], +- timeout=None, + ) + async def test_client_replicate_host(client, s, a, b, c): + aws = s.workers[a.address] +@@ -3565,7 +3547,7 @@ def test_get_returns_early(c): + + + @pytest.mark.slow +-@gen_cluster(Worker=Nanny, client=True) ++@gen_cluster(Worker=Nanny, client=True, timeout=60) + async def test_Client_clears_references_after_restart(c, s, a, b): + x = c.submit(inc, 1) + assert x.key in c.refcount +@@ -3789,7 +3771,6 @@ async def test_reconnect_timeout(c, s): + @pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") + @pytest.mark.parametrize("worker,count,repeat", [(Worker, 100, 5), (Nanny, 10, 20)]) + def test_open_close_many_workers(loop, worker, count, repeat): +- psutil = pytest.importorskip("psutil") + proc = psutil.Process() + + with cluster(nworkers=0, active_rpc_timeout=2) as (s, _): +@@ -3857,7 +3838,7 @@ def test_open_close_many_workers(loop, w + raise ValueError("File descriptors did not clean up") + + +-@gen_cluster(client=False, timeout=None) ++@gen_cluster() + async def test_idempotence(s, a, b): + c = await Client(s.address, asynchronous=True) + f = await Client(s.address, asynchronous=True) +@@ -4070,7 +4051,7 @@ async def test_scatter_compute_store_los + assert z.status == "cancelled" + + +-@gen_cluster(client=False) ++@gen_cluster() + async def test_serialize_future(s, a, b): + c1 = await Client(s.address, asynchronous=True) + c2 = await Client(s.address, asynchronous=True) +@@ -4091,7 +4072,7 @@ async def test_serialize_future(s, a, b) + await c2.close() + + +-@gen_cluster(client=False) ++@gen_cluster() + async def test_temp_default_client(s, a, b): + c1 = await Client(s.address, asynchronous=True) + c2 = await Client(s.address, asynchronous=True) +@@ -4172,7 +4153,7 @@ def test_as_current_is_thread_local(s): + t2.join() + + +-@gen_cluster(client=False) ++@gen_cluster() + async def test_as_current_is_task_local(s, a, b): + l1 = asyncio.Lock() + l2 = asyncio.Lock() +@@ -4557,7 +4538,7 @@ def assert_no_data_loss(scheduler): + assert not (k == key and v == "waiting") + + +-@gen_cluster(client=True, timeout=None) ++@gen_cluster(client=True) + async def test_interleave_computations(c, s, a, b): + import distributed + +@@ -4592,7 +4573,7 @@ async def test_interleave_computations(c + + + @pytest.mark.skip(reason="Now prefer first-in-first-out") +-@gen_cluster(client=True, timeout=None) ++@gen_cluster(client=True) + async def test_interleave_computations_map(c, s, a, b): + xs = c.map(slowinc, range(30), delay=0.02) + ys = c.map(slowdec, xs, delay=0.02) +@@ -4621,7 +4602,6 @@ async def test_scatter_dict_workers(c, s + assert "a" in a.data or "a" in b.data + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) + @pytest.mark.slow + @gen_test() + async def test_client_timeout(): +@@ -4629,18 +4609,17 @@ async def test_client_timeout(): + + s = Scheduler(loop=c.loop, port=57484) + await asyncio.sleep(4) ++ + try: + await s + except OSError: # port in use + await c.close() + return + +- start = time() +- await c + try: +- assert time() < start + 2 +- finally: ++ await c + await c.close() ++ finally: + await s.close() + + +@@ -5167,7 +5146,7 @@ async def test_serialize_collections(c, + assert result == sum(range(10)) + + +-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 1, timeout=100) ++@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 1) + async def test_secede_simple(c, s, a): + def f(): + client = get_client() +@@ -5178,7 
+5157,6 @@ async def test_secede_simple(c, s, a): + assert result == 2 + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5) + @pytest.mark.slow + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, timeout=60) + async def test_secede_balances(c, s, a, b): +@@ -5276,18 +5254,15 @@ def _dynamic_workload(x, delay=0.01): + return total.result() + + +-def _test_dynamic_workloads_sync(c, delay): +- future = c.submit(_dynamic_workload, 0, delay=delay) +- assert future.result(timeout=40) == 52 +- +- + def test_dynamic_workloads_sync(c): +- _test_dynamic_workloads_sync(c, delay=0.02) ++ future = c.submit(_dynamic_workload, 0, delay=0.02) ++ assert future.result(timeout=20) == 52 + + + @pytest.mark.slow + def test_dynamic_workloads_sync_random(c): +- _test_dynamic_workloads_sync(c, delay="random") ++ future = c.submit(_dynamic_workload, 0, delay="random") ++ assert future.result(timeout=20) == 52 + + + @pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler") +@@ -5424,6 +5399,7 @@ async def test_call_stack_collections_al + assert result + + ++@pytest.mark.flaky(condition=WINDOWS, reruns=10, reruns_delay=5) + @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"}) + async def test_profile(c, s, a, b): + futures = c.map(slowinc, range(10), delay=0.05, workers=a.address) +@@ -5843,9 +5819,15 @@ async def test_scatter_error_cancel(c, s + assert y.status == "error" # not cancelled + + ++@pytest.mark.xfail(reason="GH#5409 Dask-Default-Threads are frequently detected") + def test_no_threads_lingering(): ++ if threading.active_count() < 40: ++ return + active = dict(threading._active) +- assert threading.active_count() < 40, list(active.values()) ++ print(f"==== Found {len(active)} active threads: ====") ++ for t in active.values(): ++ print(t) ++ assert False + + + @gen_cluster() +@@ -6071,18 +6053,18 @@ async def test_file_descriptors_dont_lea + df = dask.datasets.timeseries(freq="10s", dtypes={"x": int, "y": float}) + + proc = psutil.Process() +- start = proc.num_fds() ++ before = proc.num_fds() + async with Scheduler(port=0, dashboard_address=":0") as s: +- async with Worker(s.address, nthreads=2) as a, Worker( +- s.address, nthreads=2 +- ) as b: +- async with Client(s.address, asynchronous=True) as c: +- await df.sum().persist() ++ async with Worker(s.address), Worker(s.address), Client( ++ s.address, asynchronous=True ++ ): ++ assert proc.num_fds() > before ++ await df.sum().persist() + +- begin = time() +- while proc.num_fds() > begin: ++ start = time() ++ while proc.num_fds() > before: + await asyncio.sleep(0.01) +- assert time() < begin + 5, (start, proc.num_fds()) ++ assert time() < start + 10, (before, proc.num_fds()) + + + @pytest.mark.asyncio +Index: distributed-2021.7.0/distributed/tests/test_client_executor.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_client_executor.py ++++ distributed-2021.7.0/distributed/tests/test_client_executor.py +@@ -13,6 +13,7 @@ import pytest + from tlz import take + + from distributed import Client ++from distributed.compatibility import MACOS + from distributed.utils import CancelledError + from distributed.utils_test import ( + cluster, +@@ -93,13 +94,12 @@ def test_wait(client): + assert "hello" in str(errors[0]) + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5) + def test_cancellation(client): + with client.get_executor(pure=False) as e: + fut = e.submit(time.sleep, 2.0) + start = time.time() + while number_of_processing_tasks(client) == 
0: +- assert time.time() < start + 1 ++ assert time.time() < start + 10 + time.sleep(0.01) + assert not fut.done() + +@@ -107,7 +107,7 @@ def test_cancellation(client): + assert fut.cancelled() + start = time.time() + while number_of_processing_tasks(client) != 0: +- assert time.time() < start + 1 ++ assert time.time() < start + 10 + time.sleep(0.01) + + with pytest.raises(CancelledError): +@@ -118,7 +118,7 @@ def test_cancellation(client): + N = 10 + fs = [e.submit(slowinc, i, delay=0.02) for i in range(N)] + fs[3].cancel() +- res = wait(fs, return_when=FIRST_COMPLETED) ++ res = wait(fs, return_when=FIRST_COMPLETED, timeout=30) + assert len(res.not_done) > 0 + assert len(res.done) >= 1 + +@@ -132,10 +132,11 @@ def test_cancellation(client): + fs[3].cancel() + fs[8].cancel() + +- n_cancelled = sum(f.cancelled() for f in as_completed(fs)) ++ n_cancelled = sum(f.cancelled() for f in as_completed(fs, timeout=30)) + assert n_cancelled == 2 + + ++@pytest.mark.flaky(condition=MACOS, reruns=10, reruns_delay=5) + def test_map(client): + with client.get_executor() as e: + N = 10 +Index: distributed-2021.7.0/distributed/tests/test_collections.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_collections.py ++++ distributed-2021.7.0/distributed/tests/test_collections.py +@@ -40,7 +40,7 @@ def assert_equal(a, b): + assert a == b + + +-@gen_cluster(timeout=240, client=True) ++@gen_cluster(client=True) + async def test_dataframes(c, s, a, b): + df = pd.DataFrame( + {"x": np.random.random(1000), "y": np.random.random(1000)}, +Index: distributed-2021.7.0/distributed/tests/test_diskutils.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_diskutils.py ++++ distributed-2021.7.0/distributed/tests/test_diskutils.py +@@ -12,7 +12,7 @@ import pytest + + import dask + +-from distributed.compatibility import MACOS, WINDOWS ++from distributed.compatibility import MACOS + from distributed.diskutils import WorkSpace + from distributed.metrics import time + from distributed.utils import mp_context +@@ -273,16 +273,14 @@ def _test_workspace_concurrency(tmpdir, + return n_created, n_purged + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) + @pytest.mark.slow ++@pytest.mark.xfail(condition=MACOS, reason="extremely flaky") + def test_workspace_concurrency(tmpdir): +- if WINDOWS: +- raise pytest.xfail.Exception("TODO: unknown failure on windows") + _test_workspace_concurrency(tmpdir, 5.0, 6) + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) + @pytest.mark.slow ++@pytest.mark.xfail(condition=MACOS, reason="extremely flaky") + def test_workspace_concurrency_intense(tmpdir): + n_created, n_purged = _test_workspace_concurrency(tmpdir, 8.0, 16) + assert n_created >= 100 +Index: distributed-2021.7.0/distributed/tests/test_failed_workers.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_failed_workers.py ++++ distributed-2021.7.0/distributed/tests/test_failed_workers.py +@@ -75,12 +75,7 @@ def test_gather_after_failed_worker(loop + assert result == list(map(inc, range(10))) + + +-@gen_cluster( +- client=True, +- Worker=Nanny, +- nthreads=[("127.0.0.1", 1)] * 4, +- config={"distributed.comm.timeouts.connect": "1s"}, +-) ++@gen_cluster(client=True, Worker=Nanny, nthreads=[("127.0.0.1", 1)] * 4) + async def test_gather_then_submit_after_failed_workers(c, s, w, x, y, z): + 
L = c.map(inc, range(20)) + await wait(L) +@@ -88,7 +83,7 @@ async def test_gather_then_submit_after_ + w.process.process._process.terminate() + total = c.submit(sum, L) + +- for i in range(3): ++ for _ in range(3): + await wait(total) + addr = first(s.tasks[total.key].who_has).address + for worker in [x, y, z]: +@@ -101,7 +96,7 @@ async def test_gather_then_submit_after_ + + + @pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler") +-@gen_cluster(Worker=Nanny, timeout=60, client=True) ++@gen_cluster(Worker=Nanny, client=True, timeout=60) + async def test_failed_worker_without_warning(c, s, a, b): + L = c.map(inc, range(10)) + await wait(L) +@@ -298,18 +293,20 @@ async def test_multiple_clients_restart( + await c2.close() + + +-@pytest.mark.flaky(reruns=10, reruns_timeout=5, condition=MACOS) + @gen_cluster(Worker=Nanny, timeout=60) + async def test_restart_scheduler(s, a, b): +- import gc ++ assert len(s.nthreads) == 2 ++ pids = (a.pid, b.pid) ++ assert pids[0] ++ assert pids[1] + +- gc.collect() +- addrs = (a.worker_address, b.worker_address) + await s.restart() +- assert len(s.nthreads) == 2 +- addrs2 = (a.worker_address, b.worker_address) + +- assert addrs != addrs2 ++ assert len(s.nthreads) == 2 ++ pids2 = (a.pid, b.pid) ++ assert pids2[0] ++ assert pids2[1] ++ assert pids != pids2 + + + @gen_cluster(Worker=Nanny, client=True, timeout=60) +@@ -325,6 +322,8 @@ async def test_forgotten_futures_dont_cl + await y + + ++@pytest.mark.slow ++@pytest.mark.flaky(condition=MACOS, reruns=10, reruns_delay=5) + @gen_cluster(client=True, timeout=60, active_rpc_timeout=10) + async def test_broken_worker_during_computation(c, s, a, b): + s.allowed_failures = 100 +@@ -404,14 +403,13 @@ class SlowTransmitData: + return parse_bytes(dask.config.get("distributed.comm.offload")) + 1 + + ++@pytest.mark.flaky(reruns=10, reruns_delay=5) + @gen_cluster(client=True) + async def test_worker_who_has_clears_after_failed_connection(c, s, a, b): + n = await Nanny(s.address, nthreads=2, loop=s.loop) + +- start = time() + while len(s.nthreads) < 3: + await asyncio.sleep(0.01) +- assert time() < start + 5 + + def slow_ser(x, delay): + return SlowTransmitData(x, delay=delay) +@@ -497,7 +495,7 @@ async def test_restart_timeout_on_long_r + with captured_logger("distributed.scheduler") as sio: + future = c.submit(sleep, 3600) + await asyncio.sleep(0.1) +- await c.restart(timeout=20) ++ await c.restart() + + text = sio.getvalue() + assert "timeout" not in text.lower() +@@ -547,7 +545,7 @@ class SlowDeserialize: + return parse_bytes(dask.config.get("distributed.comm.offload")) + 1 + + +-@gen_cluster(client=True, timeout=None) ++@gen_cluster(client=True) + async def test_handle_superfluous_data(c, s, a, b): + """ + See https://github.com/dask/distributed/pull/4784#discussion_r649210094 +Index: distributed-2021.7.0/distributed/tests/test_nanny.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_nanny.py ++++ distributed-2021.7.0/distributed/tests/test_nanny.py +@@ -4,9 +4,10 @@ import logging + import multiprocessing as mp + import os + import random +-import sys + from contextlib import suppress ++from time import sleep + ++import psutil + import pytest + from tlz import first, valmap + from tornado.ioloop import IOLoop +@@ -14,6 +15,7 @@ from tornado.ioloop import IOLoop + import dask + + from distributed import Client, Nanny, Scheduler, Worker, rpc, wait, worker ++from distributed.compatibility import LINUX, WINDOWS + from 
distributed.core import CommClosedError, Status + from distributed.diagnostics import SchedulerPlugin + from distributed.metrics import time +@@ -72,12 +74,11 @@ async def test_str(s, a, b): + + @gen_cluster(nthreads=[], client=True) + async def test_nanny_process_failure(c, s): +- n = await Nanny(s.address, nthreads=2, loop=s.loop) ++ n = await Nanny(s.address, nthreads=2) + first_dir = n.worker_dir + + assert os.path.exists(first_dir) + +- original_address = n.worker_address + ww = rpc(n.worker_address) + await ww.update_data(data=valmap(dumps, {"x": 1, "y": 2})) + pid = n.pid +@@ -85,23 +86,17 @@ async def test_nanny_process_failure(c, + with suppress(CommClosedError): + await c.run(os._exit, 0, workers=[n.worker_address]) + +- start = time() + while n.pid == pid: # wait while process dies and comes back + await asyncio.sleep(0.01) +- assert time() - start < 5 + +- start = time() + await asyncio.sleep(1) + while not n.is_alive(): # wait while process comes back + await asyncio.sleep(0.01) +- assert time() - start < 5 + + # assert n.worker_address != original_address # most likely + +- start = time() + while n.worker_address not in s.nthreads or n.worker_dir is None: + await asyncio.sleep(0.01) +- assert time() - start < 5 + + second_dir = n.worker_dir + +@@ -115,7 +110,6 @@ async def test_nanny_process_failure(c, + + @gen_cluster(nthreads=[]) + async def test_run(s): +- pytest.importorskip("psutil") + n = await Nanny(s.address, nthreads=2, loop=s.loop) + + with rpc(n.address) as nn: +@@ -173,7 +167,7 @@ async def test_nanny_alt_worker_class(c, + + + @pytest.mark.slow +-@gen_cluster(client=False, nthreads=[]) ++@gen_cluster(nthreads=[]) + async def test_nanny_death_timeout(s): + await s.close() + w = Nanny(s.address, death_timeout=1) +@@ -198,12 +192,9 @@ async def test_random_seed(c, s, a, b): + await check_func(lambda a, b: np.random.randint(a, b)) + + +-@pytest.mark.skipif( +- sys.platform.startswith("win"), reason="num_fds not supported on windows" +-) +-@gen_cluster(client=False, nthreads=[]) ++@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") ++@gen_cluster(nthreads=[]) + async def test_num_fds(s): +- psutil = pytest.importorskip("psutil") + proc = psutil.Process() + + # Warm up +@@ -219,16 +210,12 @@ async def test_num_fds(s): + await asyncio.sleep(0.1) + await w.close() + +- start = time() + while proc.num_fds() > before: + print("fds:", before, proc.num_fds()) + await asyncio.sleep(0.1) +- assert time() < start + 10 + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster(client=True, nthreads=[]) + async def test_worker_uses_same_host_as_nanny(c, s): + for host in ["tcp://0.0.0.0", "tcp://127.0.0.2"]: +@@ -273,29 +260,24 @@ async def test_nanny_timeout(c, s, a): + nthreads=[("127.0.0.1", 1)], + client=True, + Worker=Nanny, +- worker_kwargs={"memory_limit": 1e8}, +- timeout=20, +- clean_kwargs={"threads": False}, ++ worker_kwargs={"memory_limit": "400 MiB"}, + ) + async def test_nanny_terminate(c, s, a): +- from time import sleep +- + def leak(): + L = [] + while True: +- L.append(b"0" * 5000000) ++ L.append(b"0" * 5_000_000) + sleep(0.01) + +- proc = a.process.pid ++ before = a.process.pid + with captured_logger(logging.getLogger("distributed.nanny")) as logger: + future = c.submit(leak) +- start = time() +- while a.process.pid == proc: +- await asyncio.sleep(0.1) +- assert time() < start + 10 +- out = 
logger.getvalue() +- assert "restart" in out.lower() +- assert "memory" in out.lower() ++ while a.process.pid == before: ++ await asyncio.sleep(0.01) ++ ++ out = logger.getvalue() ++ assert "restart" in out.lower() ++ assert "memory" in out.lower() + + + @gen_cluster( +Index: distributed-2021.7.0/distributed/tests/test_publish.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_publish.py ++++ distributed-2021.7.0/distributed/tests/test_publish.py +@@ -11,7 +11,7 @@ from distributed.protocol import Seriali + from distributed.utils_test import gen_cluster, inc + + +-@gen_cluster(client=False) ++@gen_cluster() + async def test_publish_simple(s, a, b): + c = Client(s.address, asynchronous=True) + f = Client(s.address, asynchronous=True) +@@ -37,7 +37,7 @@ async def test_publish_simple(s, a, b): + await asyncio.gather(c.close(), f.close()) + + +-@gen_cluster(client=False) ++@gen_cluster() + async def test_publish_non_string_key(s, a, b): + async with Client(s.address, asynchronous=True) as c: + for name in [("a", "b"), 9.0, 8]: +@@ -52,7 +52,7 @@ async def test_publish_non_string_key(s, + assert name in datasets + + +-@gen_cluster(client=False) ++@gen_cluster() + async def test_publish_roundtrip(s, a, b): + c = await Client(s.address, asynchronous=True) + f = await Client(s.address, asynchronous=True) +@@ -147,7 +147,7 @@ def test_unpublish_multiple_datasets_syn + assert "y" in str(exc_info.value) + + +-@gen_cluster(client=False) ++@gen_cluster() + async def test_publish_bag(s, a, b): + db = pytest.importorskip("dask.bag") + c = await Client(s.address, asynchronous=True) +Index: distributed-2021.7.0/distributed/tests/test_pubsub.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_pubsub.py ++++ distributed-2021.7.0/distributed/tests/test_pubsub.py +@@ -10,7 +10,7 @@ from distributed.metrics import time + from distributed.utils_test import gen_cluster + + +-@gen_cluster(client=True, timeout=None) ++@gen_cluster(client=True) + async def test_speed(c, s, a, b): + """ + This tests how quickly we can move messages back and forth +Index: distributed-2021.7.0/distributed/tests/test_queues.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_queues.py ++++ distributed-2021.7.0/distributed/tests/test_queues.py +@@ -110,7 +110,7 @@ def test_picklability_sync(client): + + + @pytest.mark.slow +-@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny, timeout=None) ++@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny) + async def test_race(c, s, *workers): + def f(i): + with worker_client() as c: +Index: distributed-2021.7.0/distributed/tests/test_resources.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_resources.py ++++ distributed-2021.7.0/distributed/tests/test_resources.py +@@ -9,7 +9,6 @@ from dask.utils import stringify + + from distributed import Worker + from distributed.client import wait +-from distributed.compatibility import WINDOWS + from distributed.utils_test import gen_cluster, inc, slowadd, slowinc + + +@@ -378,9 +377,7 @@ async def test_full_collections(c, s, a, + reason="don't track resources through optimization" + ), + ), +- pytest.param( +- False, marks=pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS) +- ), ++ False, + ], + ) + def 
test_collections_get(client, optimize_graph, s, a, b): +Index: distributed-2021.7.0/distributed/tests/test_scheduler.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_scheduler.py ++++ distributed-2021.7.0/distributed/tests/test_scheduler.py +@@ -12,6 +12,7 @@ from time import sleep + from unittest import mock + + import cloudpickle ++import psutil + import pytest + from tlz import concat, first, frequencies, merge, valmap + +@@ -21,7 +22,7 @@ from dask.utils import apply, parse_time + + from distributed import Client, Nanny, Worker, fire_and_forget, wait + from distributed.comm import Comm +-from distributed.compatibility import MACOS, WINDOWS ++from distributed.compatibility import LINUX, WINDOWS + from distributed.core import ConnectionPool, Status, connect, rpc + from distributed.metrics import time + from distributed.protocol.pickle import dumps +@@ -618,7 +619,7 @@ async def test_ready_remove_worker(s, a, + assert all(len(w.processing) > w.nthreads for w in s.workers.values()) + + +-@gen_cluster(client=True, Worker=Nanny) ++@gen_cluster(client=True, Worker=Nanny, timeout=60) + async def test_restart(c, s, a, b): + futures = c.map(inc, range(20)) + await wait(futures) +@@ -742,24 +743,17 @@ async def test_config_stealing(cleanup): + assert "stealing" not in s.extensions + + +-@pytest.mark.skipif( +- sys.platform.startswith("win"), reason="file descriptors not really a thing" +-) ++@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") + @gen_cluster(nthreads=[]) + async def test_file_descriptors_dont_leak(s): +- psutil = pytest.importorskip("psutil") + proc = psutil.Process() + before = proc.num_fds() + +- w = await Worker(s.address) +- await w.close() ++ async with Worker(s.address): ++ assert proc.num_fds() > before + +- during = proc.num_fds() +- +- start = time() + while proc.num_fds() > before: + await asyncio.sleep(0.01) +- assert time() < start + 5 + + + @gen_cluster() +@@ -827,7 +821,7 @@ async def test_scheduler_sees_memory_lim + await w.close() + + +-@gen_cluster(client=True, timeout=1000) ++@gen_cluster(client=True) + async def test_retire_workers(c, s, a, b): + [x] = await c.scatter([1], workers=a.address) + [y] = await c.scatter([list(range(1000))], workers=b.address) +@@ -929,13 +923,10 @@ async def test_retire_workers_no_suspici + + + @pytest.mark.slow +-@pytest.mark.skipif( +- sys.platform.startswith("win"), reason="file descriptors not really a thing" +-) +-@gen_cluster(client=True, nthreads=[], timeout=240) ++@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") ++@gen_cluster(client=True, nthreads=[], timeout=60) + async def test_file_descriptors(c, s): + await asyncio.sleep(0.1) +- psutil = pytest.importorskip("psutil") + da = pytest.importorskip("dask.array") + proc = psutil.Process() + num_fds_1 = proc.num_fds() +@@ -980,10 +971,8 @@ async def test_file_descriptors(c, s): + assert comm.closed() or comm.peer_address != s.address, comm + assert not s.stream_comms + +- start = time() + while proc.num_fds() > num_fds_1 + N: + await asyncio.sleep(0.01) +- assert time() < start + 3 + + + @pytest.mark.slow +@@ -1325,36 +1314,30 @@ async def test_non_existent_worker(c, s) + async def test_correct_bad_time_estimate(c, s, *workers): + future = c.submit(slowinc, 1, delay=0) + await wait(future) +- + futures = [c.submit(slowinc, future, delay=0.1, pure=False) for i in range(20)] +- + await asyncio.sleep(0.5) +- + await wait(futures) +- + assert all(w.data for w in 
workers), [sorted(w.data) for w in workers] + + +-@gen_test() +-async def test_service_hosts(): +- port = 0 +- for url, expected in [ +- ("tcp://0.0.0.0", ("::", "0.0.0.0")), +- ("tcp://127.0.0.1", ("::", "0.0.0.0")), +- ("tcp://127.0.0.1:38275", ("::", "0.0.0.0")), +- ]: +- async with Scheduler(host=url) as s: +- sock = first(s.http_server._sockets.values()) +- if isinstance(expected, tuple): +- assert sock.getsockname()[0] in expected +- else: +- assert sock.getsockname()[0] == expected +- +- port = ("127.0.0.1", 0) +- for url in ["tcp://0.0.0.0", "tcp://127.0.0.1", "tcp://127.0.0.1:38275"]: +- async with Scheduler(dashboard_address="127.0.0.1:0", host=url) as s: +- sock = first(s.http_server._sockets.values()) +- assert sock.getsockname()[0] == "127.0.0.1" ++@pytest.mark.parametrize( ++ "host", ["tcp://0.0.0.0", "tcp://127.0.0.1", "tcp://127.0.0.1:38275"] ++) ++@pytest.mark.parametrize( ++ "dashboard_address,expect", ++ [ ++ (None, ("::", "0.0.0.0")), ++ ("127.0.0.1:0", ("127.0.0.1",)), ++ ], ++) ++@pytest.mark.asyncio ++async def test_dashboard_host(host, dashboard_address, expect): ++ """Dashboard is accessible from any host by default, but it can be also bound to ++ localhost. ++ """ ++ async with Scheduler(host=host, dashboard_address=dashboard_address) as s: ++ sock = first(s.http_server._sockets.values()) ++ assert sock.getsockname()[0] in expect + + + @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"}) +@@ -1543,9 +1526,6 @@ async def test_retries(c, s, a, b): + exc_info.match("one") + + +-@pytest.mark.flaky( +- reruns=10, reruns_delay=5, reason="second worker also errant for some reason" +-) + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) + async def test_missing_data_errant_worker(c, s, w1, w2, w3): + with dask.config.set({"distributed.comm.timeouts.connect": "1s"}): +@@ -1779,7 +1759,7 @@ async def test_bandwidth(c, s, a, b): + assert not s.bandwidth_workers + + +-@gen_cluster(client=True, Worker=Nanny) ++@gen_cluster(client=True, Worker=Nanny, timeout=60) + async def test_bandwidth_clear(c, s, a, b): + np = pytest.importorskip("numpy") + x = c.submit(np.arange, 1000000, workers=[a.worker_address], pure=False) +@@ -1821,9 +1801,7 @@ async def test_close_workers(s, a, b): + assert b.status == Status.closed + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_test() + async def test_host_address(): + s = await Scheduler(host="127.0.0.2", port=0) +@@ -2361,7 +2339,7 @@ async def test_unknown_task_duration_con + assert s.idle_since == s.time_started + + +-@gen_cluster(client=True, timeout=None) ++@gen_cluster(client=True) + async def test_retire_state_change(c, s, a, b): + np = pytest.importorskip("numpy") + y = c.map(lambda x: x ** 2, range(10)) +@@ -2482,8 +2460,8 @@ async def assert_memory(scheduler_or_wor + t0 = time() + while True: + minfo = scheduler_or_workerstate.memory +- nbytes = getattr(minfo, attr) +- if min_ * 2 ** 20 <= nbytes <= max_ * 2 ** 20: ++ nmib = getattr(minfo, attr) / 2 ** 20 ++ if min_ <= nmib <= max_: + return + if time() - t0 > timeout: + raise TimeoutError( +@@ -2492,13 +2470,12 @@ async def assert_memory(scheduler_or_wor + await asyncio.sleep(0.1) + + +-# This test is heavily influenced by hard-to-control factors such as memory management +-# by the Python interpreter and the OS, so it occasionally glitches +-@pytest.mark.flaky(reruns=3, reruns_delay=5) +-# ~33s runtime, or 
distributed.memory.recent-to-old-time + 3s ++# ~31s runtime, or distributed.worker.memory.recent-to-old-time + 1s. ++# On Windows, it can take ~65s due to worker memory needing to stabilize first. + @pytest.mark.slow ++@pytest.mark.flaky(condition=LINUX, reason="see comments", reruns=10, reruns_delay=5) + @gen_cluster( +- client=True, Worker=Nanny, worker_kwargs={"memory_limit": "500 MiB"}, timeout=60 ++ client=True, Worker=Nanny, worker_kwargs={"memory_limit": "500 MiB"}, timeout=120 + ) + async def test_memory(c, s, *_): + pytest.importorskip("zict") +@@ -2511,14 +2488,25 @@ async def test_memory(c, s, *_): + assert s_m0.managed == 0 + assert a.memory.managed == 0 + assert b.memory.managed == 0 +- # When a worker first goes online, its RAM is immediately counted as +- # unmanaged_old +- await assert_memory(s, "unmanaged_recent", 0, 40, timeout=0) +- await assert_memory(a, "unmanaged_recent", 0, 20, timeout=0) +- await assert_memory(b, "unmanaged_recent", 0, 20, timeout=0) + +- f1 = c.submit(leaking, 100, 50, 5, pure=False, workers=[a.name]) +- f2 = c.submit(leaking, 100, 50, 5, pure=False, workers=[b.name]) ++ # When a worker first goes online, its RAM is immediately counted as unmanaged_old. ++ # On Windows, however, there is somehow enough time between the worker start and ++ # this line for 2 heartbeats and the memory keeps growing substantially for a while. ++ # Sometimes there is a single heartbeat but on the consecutive test we observe ++ # a large unexplained increase in unmanaged_recent memory. ++ # Wait for the situation to stabilize. ++ if WINDOWS: ++ await asyncio.sleep(10) ++ initial_timeout = 40 ++ else: ++ initial_timeout = 0 ++ ++ await assert_memory(s, "unmanaged_recent", 0, 40, timeout=initial_timeout) ++ await assert_memory(a, "unmanaged_recent", 0, 20, timeout=initial_timeout) ++ await assert_memory(b, "unmanaged_recent", 0, 20, timeout=initial_timeout) ++ ++ f1 = c.submit(leaking, 100, 50, 10, pure=False, workers=[a.name]) ++ f2 = c.submit(leaking, 100, 50, 10, pure=False, workers=[b.name]) + await assert_memory(s, "unmanaged_recent", 300, 380) + await assert_memory(a, "unmanaged_recent", 150, 190) + await assert_memory(b, "unmanaged_recent", 150, 190) +@@ -2533,19 +2521,17 @@ async def test_memory(c, s, *_): + await assert_memory(b, "unmanaged_recent", 50, 90) + + # Force the output of f1 and f2 to spill to disk. +- # With target=0.6 and memory_limit=500 MiB, we'll start spilling at 300 MiB +- # process memory per worker, or roughly after 3~7 rounds of the below depending +- # on how much RAM the interpreter is using. ++ # With spill=0.7 and memory_limit=500 MiB, we'll start spilling at 350 MiB process ++ # memory per worker, or up to 20 iterations of the below depending on how much RAM ++ # the interpreter is using. + more_futs = [] +- for _ in range(8): +- if s.memory.managed_spilled > 0: +- break +- more_futs += [ +- c.submit(leaking, 20, 0, 0, pure=False, workers=[a.name]), +- c.submit(leaking, 20, 0, 0, pure=False, workers=[b.name]), +- ] +- await asyncio.sleep(2) +- await assert_memory(s, "managed_spilled", 1, 999) ++ while not s.memory.managed_spilled: ++ if a.memory.process < 0.7 * 500 * 2 ** 20: ++ more_futs.append(c.submit(leaking, 10, 0, 0, pure=False, workers=[a.name])) ++ if b.memory.process < 0.7 * 500 * 2 ** 20: ++ more_futs.append(c.submit(leaking, 10, 0, 0, pure=False, workers=[b.name])) ++ await wait(more_futs) ++ await asyncio.sleep(1) + + # Wait for the spilling to finish. 
Note that this does not make the test take + # longer as we're waiting for recent-to-old-time anyway. +@@ -2568,24 +2554,23 @@ async def test_memory(c, s, *_): + # transition into unmanaged_old + await c.run(gc.collect) + await assert_memory(s, "unmanaged_recent", 0, 90, timeout=40) +- await assert_memory( +- s, +- "unmanaged_old", +- orig_old + 90, +- # On MacOS, the process memory of the Python interpreter does not shrink as +- # fast as on Linux/Windows +- 9999 if MACOS else orig_old + 190, +- timeout=40, +- ) +- +- # When the leaked memory is cleared, unmanaged and unmanaged_old drop +- # On MacOS, the process memory of the Python interpreter does not shrink as fast +- # as on Linux/Windows +- if not MACOS: +- await c.run(clear_leak) +- await assert_memory(s, "unmanaged", 0, orig_unmanaged + 95) +- await assert_memory(s, "unmanaged_old", 0, orig_old + 95) +- await assert_memory(s, "unmanaged_recent", 0, 90) ++ await assert_memory(s, "unmanaged_old", orig_old + 90, 9999, timeout=40) ++ ++ # When the leaked memory is cleared, unmanaged and unmanaged_old drop. ++ # On MacOS and Windows, the process memory of the Python interpreter does not shrink ++ # as fast as on Linux. Note that this behaviour is heavily impacted by OS tweaks, ++ # meaning that what you observe on your local host may behave differently on CI. ++ # Even on Linux, this occasionally glitches - hence why there is a flaky marker on ++ # this test. ++ if not LINUX: ++ return ++ ++ orig_unmanaged = s.memory.unmanaged / 2 ** 20 ++ orig_old = s.memory.unmanaged_old / 2 ** 20 ++ await c.run(clear_leak) ++ await assert_memory(s, "unmanaged", 0, orig_unmanaged - 60) ++ await assert_memory(s, "unmanaged_old", 0, orig_old - 60) ++ await assert_memory(s, "unmanaged_recent", 0, 90) + + + @gen_cluster(client=True, worker_kwargs={"memory_limit": 0}) +@@ -2825,23 +2810,20 @@ async def test_rebalance_no_limit(c, s, + worker_kwargs={"memory_limit": "1000 MiB"}, + config={ + "distributed.worker.memory.rebalance.measure": "managed", +- "distributed.worker.memory.rebalance.recipient-max": 0.4, ++ "distributed.worker.memory.rebalance.sender-min": 0.2, ++ "distributed.worker.memory.rebalance.recipient-max": 0.1, + }, + ) + async def test_rebalance_no_recipients(c, s, *_): + """There are sender workers, but no recipient workers""" + a, b = s.workers +- futures = [ +- c.submit(lambda: "x" * (400 * 2 ** 20), pure=False, workers=[a]), # 40% +- c.submit(lambda: "x" * (400 * 2 ** 20), pure=False, workers=[b]), # 40% +- ] + c.map( +- lambda _: "x" * (2 ** 21), range(100), workers=[a] +- ) # 20% +- await wait(futures) +- await assert_memory(s, "managed", 1000, 1001) +- await assert_ndata(c, {a: 101, b: 1}) ++ fut_a = c.map(lambda _: "x" * (2 ** 20), range(250), workers=[a]) # 25% ++ fut_b = c.map(lambda _: "x" * (2 ** 20), range(100), workers=[b]) # 10% ++ await wait(fut_a + fut_b) ++ await assert_memory(s, "managed", 350, 351) ++ await assert_ndata(c, {a: 250, b: 100}) + await s.rebalance() +- await assert_ndata(c, {a: 101, b: 1}) ++ await assert_ndata(c, {a: 250, b: 100}) + s.validate_state() + + +Index: distributed-2021.7.0/distributed/tests/test_semaphore.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_semaphore.py ++++ distributed-2021.7.0/distributed/tests/test_semaphore.py +@@ -11,7 +11,6 @@ from dask.distributed import Client + + from distributed import Semaphore, fire_and_forget + from distributed.comm import Comm +-from distributed.compatibility import WINDOWS + 
from distributed.core import ConnectionPool + from distributed.metrics import time + from distributed.utils_test import captured_logger, cluster, gen_cluster, slowidentity +@@ -91,21 +90,17 @@ def test_timeout_sync(client): + + + @gen_cluster( +- client=True, +- timeout=20, + config={ + "distributed.scheduler.locks.lease-validation-interval": "200ms", + "distributed.scheduler.locks.lease-timeout": "200ms", + }, + ) +-async def test_release_semaphore_after_timeout(c, s, a, b): ++async def test_release_semaphore_after_timeout(s, a, b): + sem = await Semaphore(name="x", max_leases=2) + await sem.acquire() # leases: 2 - 1 = 1 + + semB = await Semaphore(name="x", max_leases=2) +- + assert await semB.acquire() # leases: 1 - 1 = 0 +- + assert not (await sem.acquire(timeout=0.01)) + assert not (await semB.acquire(timeout=0.01)) + +@@ -114,9 +109,7 @@ async def test_release_semaphore_after_t + + semB.refresh_callback.stop() + del semB +- + assert await sem.acquire(timeout=1) +- + assert not (await sem.acquire(timeout=0.1)) + + +@@ -564,7 +557,6 @@ async def test_release_retry(c, s, a, b) + assert await semaphore.release() is True + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS) + @gen_cluster( + client=True, + config={ +Index: distributed-2021.7.0/distributed/tests/test_steal.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_steal.py ++++ distributed-2021.7.0/distributed/tests/test_steal.py +@@ -2,7 +2,6 @@ import asyncio + import itertools + import logging + import random +-import sys + import weakref + from operator import mul + from time import sleep +@@ -13,6 +12,7 @@ from tlz import concat, sliding_window + import dask + + from distributed import Nanny, Worker, wait, worker_client ++from distributed.compatibility import LINUX + from distributed.config import config + from distributed.metrics import time + from distributed.scheduler import key_split +@@ -33,9 +33,7 @@ setup_module = nodebug_setup_module + teardown_module = nodebug_teardown_module + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster(client=True, nthreads=[("127.0.0.1", 2), ("127.0.0.2", 2)]) + async def test_work_stealing(c, s, a, b): + [x] = await c._scatter([1], workers=a.address) +@@ -144,7 +142,7 @@ async def test_steal_related_tasks(e, s, + assert nearby > 10 + + +-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, timeout=1000) ++@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10) + async def test_dont_steal_fast_tasks_compute_time(c, s, *workers): + def do_nothing(x, y=None): + pass +@@ -285,9 +283,7 @@ async def test_steal_worker_restrictions + assert len(wc.tasks) == 0 + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.2", 1)]) + async def test_dont_steal_host_restrictions(c, s, a, b): + future = c.submit(slowinc, 1, delay=0.10, workers=a.address) +@@ -306,9 +302,7 @@ async def test_dont_steal_host_restricti + assert len(b.tasks) == 0 + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster(client=True, 
nthreads=[("127.0.0.1", 1), ("127.0.0.2", 2)]) + async def test_steal_host_restrictions(c, s, wa, wb): + future = c.submit(slowinc, 1, delay=0.10, workers=wa.address) +@@ -354,9 +348,7 @@ async def test_dont_steal_resource_restr + assert len(b.tasks) == 0 + + +-@gen_cluster( +- client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}})], timeout=3 +-) ++@gen_cluster(client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}})]) + async def test_steal_resource_restrictions(c, s, a): + future = c.submit(slowinc, 1, delay=0.10, workers=a.address) + await future +@@ -601,7 +593,7 @@ def test_balance(inp, expected): + test() + + +-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, Worker=Nanny) ++@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, Worker=Nanny, timeout=60) + async def test_restart(c, s, a, b): + futures = c.map( + slowinc, range(100), delay=0.1, workers=a.address, allow_other_workers=True +@@ -613,7 +605,7 @@ async def test_restart(c, s, a, b): + assert any(st for st in steal.stealable_all) + assert any(x for L in steal.stealable.values() for x in L) + +- await c.restart(timeout=10) ++ await c.restart() + + assert not any(x for x in steal.stealable_all) + assert not any(x for L in steal.stealable.values() for x in L) +@@ -740,7 +732,6 @@ async def test_dont_steal_long_running_t + ) <= 1 + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=sys.version_info[:2] == (3, 8)) + @gen_cluster(client=True, nthreads=[("127.0.0.1", 5)] * 2) + async def test_cleanup_repeated_tasks(c, s, a, b): + class Foo: +Index: distributed-2021.7.0/distributed/tests/test_stress.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_stress.py ++++ distributed-2021.7.0/distributed/tests/test_stress.py +@@ -1,6 +1,5 @@ + import asyncio + import random +-import sys + from contextlib import suppress + from operator import add + from time import sleep +@@ -11,9 +10,10 @@ from tlz import concat, sliding_window + from dask import delayed + + from distributed import Client, Nanny, wait ++from distributed.compatibility import WINDOWS + from distributed.config import config + from distributed.metrics import time +-from distributed.utils import All, CancelledError ++from distributed.utils import CancelledError + from distributed.utils_test import ( + bump_rlimit, + cluster, +@@ -43,6 +43,7 @@ async def test_stress_1(c, s, a, b): + assert result == sum(map(inc, range(n))) + + ++@pytest.mark.slow + @pytest.mark.parametrize(("func", "n"), [(slowinc, 100), (inc, 1000)]) + def test_stress_gc(loop, func, n): + with cluster() as (s, [a, b]): +@@ -54,10 +55,8 @@ def test_stress_gc(loop, func, n): + assert x.result() == n + 2 + + +-@pytest.mark.skipif( +- sys.platform.startswith("win"), reason="test can leave dangling RPC objects" +-) +-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 8, timeout=None) ++@pytest.mark.skipif(WINDOWS, reason="test can leave dangling RPC objects") ++@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 8) + async def test_cancel_stress(c, s, *workers): + da = pytest.importorskip("dask.array") + x = da.random.random((50, 50), chunks=(2, 2)) +@@ -86,28 +85,33 @@ def test_cancel_stress_sync(loop): + c.cancel(f) + + +-@gen_cluster(nthreads=[], client=True, timeout=None) ++@pytest.mark.slow ++@gen_cluster( ++ nthreads=[], ++ client=True, ++ timeout=120, ++ scheduler_kwargs={"allowed_failures": 100_000}, ++) + async def test_stress_creation_and_deletion(c, s): + # Assertions are handled 
by the validate mechanism in the scheduler +- s.allowed_failures = 100000 + da = pytest.importorskip("dask.array") + +- x = da.random.random(size=(2000, 2000), chunks=(100, 100)) +- y = (x + 1).T + (x * 2) - x.mean(axis=1) +- ++ rng = da.random.RandomState(0) ++ x = rng.random(size=(2000, 2000), chunks=(100, 100)) ++ y = ((x + 1).T + (x * 2) - x.mean(axis=1)).sum().round(2) + z = c.persist(y) + + async def create_and_destroy_worker(delay): + start = time() + while time() < start + 5: +- n = await Nanny(s.address, nthreads=2, loop=s.loop) +- await asyncio.sleep(delay) +- await n.close() ++ async with Nanny(s.address, nthreads=2): ++ await asyncio.sleep(delay) + print("Killed nanny") + +- await asyncio.wait_for( +- All([create_and_destroy_worker(0.1 * i) for i in range(20)]), 60 +- ) ++ await asyncio.gather(*(create_and_destroy_worker(0.1 * i) for i in range(20))) ++ ++ async with Nanny(s.address, nthreads=2): ++ assert await c.compute(z) == 8000884.93 + + + @gen_cluster(nthreads=[("127.0.0.1", 1)] * 10, client=True, timeout=60) +@@ -169,6 +173,7 @@ def vsum(*args): + + @pytest.mark.avoid_ci + @pytest.mark.slow ++@pytest.mark.timeout(1100) # Override timeout from setup.cfg + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 80, timeout=1000) + async def test_stress_communication(c, s, *workers): + s.validate = False # very slow otherwise +Index: distributed-2021.7.0/distributed/tests/test_tls_functional.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_tls_functional.py ++++ distributed-2021.7.0/distributed/tests/test_tls_functional.py +@@ -42,7 +42,7 @@ async def test_Queue(c, s, a, b): + assert future.key == future2.key + + +-@gen_tls_cluster(client=True, timeout=None) ++@gen_tls_cluster(client=True) + async def test_client_submit(c, s, a, b): + assert s.address.startswith("tls://") + +Index: distributed-2021.7.0/distributed/tests/test_utils.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_utils.py ++++ distributed-2021.7.0/distributed/tests/test_utils.py +@@ -4,7 +4,6 @@ import io + import os + import queue + import socket +-import sys + import traceback + from time import sleep + +@@ -13,6 +12,7 @@ from tornado.ioloop import IOLoop + + import dask + ++from distributed.compatibility import MACOS, WINDOWS + from distributed.metrics import time + from distributed.utils import ( + LRU, +@@ -140,23 +140,12 @@ def test_ensure_ip(): + assert ensure_ip("::1") == "::1" + + ++@pytest.mark.skipif(WINDOWS, reason="TODO") + def test_get_ip_interface(): +- if sys.platform == "darwin": +- assert get_ip_interface("lo0") == "127.0.0.1" +- elif sys.platform.startswith("linux"): +- assert get_ip_interface("lo") == "127.0.0.1" +- else: +- pytest.skip(f"test needs to be enhanced for platform {sys.platform!r}") +- +- non_existent_interface = "__non-existent-interface" +- expected_error_message = f"{non_existent_interface!r}.+network interface.+" +- +- if sys.platform == "darwin": +- expected_error_message += "'lo0'" +- elif sys.platform.startswith("linux"): +- expected_error_message += "'lo'" +- with pytest.raises(ValueError, match=expected_error_message): +- get_ip_interface(non_existent_interface) ++ iface = "lo0" if MACOS else "lo" ++ assert get_ip_interface(iface) == "127.0.0.1" ++ with pytest.raises(ValueError, match=f"'__notexist'.+network interface.+'{iface}'"): ++ get_ip_interface("__notexist") + + + def test_truncate_exception(): +Index: 
distributed-2021.7.0/distributed/tests/test_utils_test.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_utils_test.py ++++ distributed-2021.7.0/distributed/tests/test_utils_test.py +@@ -141,7 +141,7 @@ def test_gen_cluster_cleans_up_client(lo + assert not dask.config.get("get", None) + + +-@gen_cluster(client=False) ++@gen_cluster() + async def test_gen_cluster_without_client(s, a, b): + assert isinstance(s, Scheduler) + for w in [a, b]: +Index: distributed-2021.7.0/distributed/tests/test_variable.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_variable.py ++++ distributed-2021.7.0/distributed/tests/test_variable.py +@@ -191,7 +191,7 @@ async def test_timeout_get(c, s, a, b): + + + @pytest.mark.slow +-@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny, timeout=None) ++@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny) + async def test_race(c, s, *workers): + NITERS = 50 + +@@ -216,10 +216,8 @@ async def test_race(c, s, *workers): + results = await c.gather(futures) + assert all(r > NITERS * 0.8 for r in results) + +- start = time() + while len(s.wants_what["variable-x"]) != 1: + await asyncio.sleep(0.01) +- assert time() - start < 2 + + + @gen_cluster(client=True) +Index: distributed-2021.7.0/distributed/tests/test_worker.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/tests/test_worker.py ++++ distributed-2021.7.0/distributed/tests/test_worker.py +@@ -18,7 +18,6 @@ from tlz import first, pluck, sliding_wi + import dask + from dask import delayed + from dask.system import CPU_COUNT +-from dask.utils import format_bytes + + from distributed import ( + Client, +@@ -31,7 +30,7 @@ from distributed import ( + ) + from distributed.comm.registry import backends + from distributed.comm.tcp import TCPBackend +-from distributed.compatibility import MACOS, WINDOWS ++from distributed.compatibility import LINUX, MACOS, WINDOWS + from distributed.core import CommClosedError, Status, rpc + from distributed.diagnostics.plugin import PipInstall + from distributed.metrics import time +@@ -780,7 +779,7 @@ async def test_hold_onto_dependents(c, s + + + @pytest.mark.slow +-@gen_cluster(client=False, nthreads=[]) ++@gen_cluster(nthreads=[]) + async def test_worker_death_timeout(s): + with dask.config.set({"distributed.comm.timeouts.connect": "1s"}): + await s.close() +@@ -1005,27 +1004,17 @@ async def test_global_workers(s, a, b): + assert w is a or w is b + + +-@pytest.mark.skipif(WINDOWS, reason="file descriptors") ++@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") + @gen_cluster(nthreads=[]) + async def test_worker_fds(s): +- psutil = pytest.importorskip("psutil") +- await asyncio.sleep(0.05) +- start = psutil.Process().num_fds() +- +- worker = await Worker(s.address, loop=s.loop) +- await asyncio.sleep(0.1) +- middle = psutil.Process().num_fds() +- start = time() +- while middle > start: +- await asyncio.sleep(0.01) +- assert time() < start + 1 ++ proc = psutil.Process() ++ before = psutil.Process().num_fds() + +- await worker.close() ++ async with Worker(s.address, loop=s.loop): ++ assert proc.num_fds() > before + +- start = time() +- while psutil.Process().num_fds() > start: ++ while proc.num_fds() > before: + await asyncio.sleep(0.01) +- assert time() < start + 0.5 + + + @gen_cluster(nthreads=[]) +@@ -1064,13 +1053,13 @@ 
async def test_scheduler_file(): + @gen_cluster(client=True) + async def test_scheduler_delay(c, s, a, b): + old = a.scheduler_delay +- assert abs(a.scheduler_delay) < 0.3 +- assert abs(b.scheduler_delay) < 0.3 +- await asyncio.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.3) ++ assert abs(a.scheduler_delay) < 0.6 ++ assert abs(b.scheduler_delay) < 0.6 ++ await asyncio.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.6) + assert a.scheduler_delay != old + + +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS) ++@pytest.mark.flaky(reruns=10, reruns_delay=5) + @gen_cluster(client=True) + async def test_statistical_profiling(c, s, a, b): + futures = c.map(slowinc, range(10), delay=0.1) +@@ -1134,7 +1123,6 @@ async def test_robust_to_bad_sizeof_esti + + + @pytest.mark.slow +-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=sys.version_info[:2] == (3, 8)) + @gen_cluster( + nthreads=[("127.0.0.1", 2)], + client=True, +@@ -1144,7 +1132,6 @@ async def test_robust_to_bad_sizeof_esti + "memory_target_fraction": False, + "memory_pause_fraction": 0.5, + }, +- timeout=20, + ) + async def test_pause_executor(c, s, a): + memory = psutil.Process().memory_info().rss +@@ -1159,14 +1146,9 @@ async def test_pause_executor(c, s, a): + future = c.submit(f) + futures = c.map(slowinc, range(30), delay=0.1) + +- start = time() + while not a.paused: + await asyncio.sleep(0.01) +- assert time() < start + 4, ( +- format_bytes(psutil.Process().memory_info().rss), +- format_bytes(a.memory_limit), +- len(a.data), +- ) ++ + out = logger.getvalue() + assert "memory" in out.lower() + assert "pausing" in out.lower() +@@ -1312,9 +1294,7 @@ async def test_wait_for_outgoing(c, s, a + assert 1 / 3 < ratio < 3 + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster( + nthreads=[("127.0.0.1", 1), ("127.0.0.1", 1), ("127.0.0.2", 1)], client=True + ) +@@ -1467,9 +1447,7 @@ async def test_local_directory_make_new_ + assert "dask-worker-space" in w.local_directory + + +-@pytest.mark.skipif( +- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +-) ++@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") + @gen_cluster(nthreads=[], client=True) + async def test_host_address(c, s): + w = await Worker(s.address, host="127.0.0.2") +@@ -2058,7 +2036,7 @@ async def test_worker_state_error_releas + res = c.submit(raise_exc, f, g, workers=[a.address]) + + with pytest.raises(RuntimeError): +- await res.result() ++ await res + + # Nothing bad happened on B, therefore B should hold on to G + assert len(b.tasks) == 1 +@@ -2124,7 +2102,7 @@ async def test_worker_state_error_releas + res = c.submit(raise_exc, f, g, workers=[a.address]) + + with pytest.raises(RuntimeError): +- await res.result() ++ await res + + # Nothing bad happened on B, therefore B should hold on to G + assert len(b.tasks) == 1 +@@ -2190,7 +2168,7 @@ async def test_worker_state_error_releas + res = c.submit(raise_exc, f, g, workers=[a.address]) + + with pytest.raises(RuntimeError): +- await res.result() ++ await res + + # Nothing bad happened on B, therefore B should hold on to G + assert len(b.tasks) == 1 +@@ -2251,7 +2229,7 @@ async def test_worker_state_error_long_c + ) + + with pytest.raises(RuntimeError): +- await res.result() ++ await res + + expected_states_A = { + f.key: "memory", +@@ -2319,7 +2297,7 @@ async def 
test_worker_state_error_long_c + await asyncio.sleep(0.01) + + +-@gen_cluster(client=True, nthreads=[("127.0.0.1", x) for x in range(4)], timeout=None) ++@gen_cluster(client=True, nthreads=[("127.0.0.1", x) for x in range(4)]) + async def test_hold_on_to_replicas(c, s, *workers): + f1 = c.submit(inc, 1, workers=[workers[0].address], key="f1") + f2 = c.submit(inc, 2, workers=[workers[1].address], key="f2") +Index: distributed-2021.7.0/distributed/utils_test.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/utils_test.py ++++ distributed-2021.7.0/distributed/utils_test.py +@@ -764,6 +764,10 @@ def gen_test(timeout=_TEST_TIMEOUT): + async def test_foo(): + await ... # use tornado coroutines + """ ++ assert timeout, ( ++ "timeout should always be set and it should be smaller than the global one from " ++ "pytest-timeout" ++ ) + + def _(func): + def test_func(): +@@ -809,8 +813,6 @@ async def start_cluster( + ) + for i, ncore in enumerate(nthreads) + ] +- # for w in workers: +- # w.rpc = workers[0].rpc + + await asyncio.gather(*workers) + +@@ -875,12 +877,16 @@ def gen_cluster( + start + end + """ ++ assert timeout, ( ++ "timeout should always be set and it should be smaller than the global one from " ++ "pytest-timeout" ++ ) + if ncores is not None: + warnings.warn("ncores= has moved to nthreads=", stacklevel=2) + nthreads = ncores + + worker_kwargs = merge( +- {"memory_limit": system.MEMORY_LIMIT, "death_timeout": 10}, worker_kwargs ++ {"memory_limit": system.MEMORY_LIMIT, "death_timeout": 15}, worker_kwargs + ) + + def _(func): +@@ -930,8 +936,7 @@ def gen_cluster( + args = [c] + args + try: + future = func(*args, *outer_args, **kwargs) +- if timeout: +- future = asyncio.wait_for(future, timeout) ++ future = asyncio.wait_for(future, timeout) + result = await future + if s.validate: + s.validate_state() +Index: distributed-2021.7.0/distributed/worker.py +=================================================================== +--- distributed-2021.7.0.orig/distributed/worker.py ++++ distributed-2021.7.0/distributed/worker.py +@@ -1217,7 +1217,7 @@ class Worker(ServerNode): + return self.close(*args, **kwargs) + + async def close( +- self, report=True, timeout=10, nanny=True, executor_wait=True, safe=False ++ self, report=True, timeout=30, nanny=True, executor_wait=True, safe=False + ): + with log_errors(): + if self.status in (Status.closed, Status.closing): +Index: distributed-2021.7.0/setup.cfg +=================================================================== +--- distributed-2021.7.0.orig/setup.cfg ++++ distributed-2021.7.0/setup.cfg +@@ -38,10 +38,9 @@ markers = + slow: marks tests as slow (deselect with '-m "not slow"') + avoid_ci: marks tests as flaky on CI on all OSs + ipython: marks tests as exercising IPython +-timeout_method = thread ++timeout_method = signal + timeout = 300 + + [egg_info] + tag_build = + tag_date = 0 +- diff --git a/python-distributed.changes b/python-distributed.changes index b9853c3..579452a 100644 --- a/python-distributed.changes +++ b/python-distributed.changes @@ -1,3 +1,117 @@ +------------------------------------------------------------------- +Fri Jul 16 09:31:13 UTC 2021 - Ben Greiner <code@bnavigator.de> + +- Update to version 2021.7.0 + * Fix Nbytes jitter - less expensive + * Use native GH actions cancel feature + * Don't require workers to report to scheduler if scheduler + shuts down + * Add pandas to the list of checked packages for client.
+ get_versions() + * Move worker preload before scheduler address is set + * Fix flaky test_oversubscribing_leases + * Update scheduling policy docs for #4967 + * Add echo handler to Server class + * Also include pngs when bundling package + * Remove duplicated dashboard panes + * Fix worker memory dashboard flickering + * Tabs on bottom left corner on dashboard + * Rename nbytes widgets + * Co-assign root-ish tasks + * OSError tweaks + * Update imports to cudf.testing._utils + * Ensure shuffle split default durations uses proper prefix + * Follow up pyupgrade formatting + * Rename plot dropdown + * Pyupgrade + * Misc Sphinx tweaks + * No longer hold dependencies of erred tasks in memory + * Add maximum shard size to config + * Ensure shuffle split operations are blacklisted from work + stealing + * Add dropdown menu to access individual plots + * Edited the path to scheduler.py + * Task Group Graph Visualization + * Remove more internal references to deprecated utilities + * Restructure nbytes hover + * Except more errors in pynvml.nvmlInit() + * Add occupancy as individual plot + * Deprecate utilities which have moved to dask + * Ensure connectionpool does not leave comms if closed mid + connect + * Add support for registering scheduler plugins from Client + * Stealing dashboard fixes + * Allow requirements verification to be ignored when loading + backends from entrypoints + * Add Log and Logs to API docs + * Support fixtures and pytest.mark.parametrize with gen_cluster +- Release 2021.06.2 + * Revert refactor to utils.Log[s] and Cluster.get_logs + * Use deprecation utility from Dask + * Add transition counter to Scheduler + * Remove nbytes_in_memory +- Release 2021.06.1 + * Fix deadlock in handle_missing_dep if additional replicas are + available + * Add configuration to enable/disable NVML diagnostics + * Add scheduler log tab to performance reports + * Add HTML repr to scheduler_info and incorporate into client + and cluster reprs + * Fix error state typo + * Allow actor exceptions to propagate + * Remove importing apply from dask.compatibility + * Use more informative default name for WorkerPlugin s + * Removed unused utility functions + * Locally rerun successfully completed futures + * Forget erred tasks and fix deadlocks on worker + * Handle HTTPClientError in websocket connector + * Update dask_cuda usage in SSHCluster docstring + * Remove tests for process_time and thread_time + * Flake8 config cleanup + * Don't strip scheduler protocol when determining host + * Add more documentation on memory management + * Add range_query tests to NVML test suite + * No longer cancel result future in async process when using + timeouts +- Release 2021.06.0 + * Multiple worker executors + * Ensure PyNVML works correctly when installed with no GPUs + * Show more in test summary + * Move SystemMonitor s GPU initialization back to constructor + * Mark test_server_comms_mark_active_handlers with pytest.mark.asyncio + * Who has has what html reprs v2 + * O(1) rebalance + * Ensure repr and eq for cluster always works +- Release 2021.05.1 + * Drop usage of WhoHas & WhatHas from Client + * Ensure adaptive scaling is properly awaited and closed + * Fix WhoHas/ HasWhat async usage + * Add HTML reprs for Client.who_has and Client.has_what + * Prevent accidentally starting multiple Worker s in the same + process + * Add system tab to performance reports + * Let servers close faster if there are no active handlers + * Fix UCX scrub config logging + * Ensure worker clients are closed + * Fix warning for 
attribute error when deleting a client + * Ensure exceptions are raised if workers are incorrectly started + * Update handling of UCX exceptions on endpoint closing + * Ensure busy workloads properly look up who_has + * Check distributed.scheduler.pickle in Scheduler.run_function + * Add performance_report to API docs + * Use dict _workers_dv in unordered use cases + * Bump pre-commit hook versions + * Do not mindlessly spawn workers when no memory limit is set + * test_memory to use gen_cluster + * Increase timeout of gen_test to 30s +- Work on the very flaky testsuite: + * Add missing conftest.py not packaged on PyPI + * Add distributed-pr5022-improve_ci.patch in the hope for better + stability -- gh#dask/distributed#5022 + * Do not use pytest-xdist +- Add Cython as runtime dep because the scheduler checks the + presence + ------------------------------------------------------------------- Tue May 18 10:12:19 UTC 2021 - Ben Greiner <code@bnavigator.de> diff --git a/python-distributed.spec b/python-distributed.spec index 4a19a3d..f0b81cc 100644 --- a/python-distributed.spec +++ b/python-distributed.spec @@ -1,5 +1,5 @@ # -# spec file for package python-distributed-test +# spec file # # Copyright (c) 2021 SUSE LLC # @@ -24,23 +24,32 @@ %define psuffix %{nil} %bcond_with test %endif +%ifarch %{ix86} %{arm} +# cython optimizations not supported on 32-bit: https://github.com/dask/dask/issues/7489 +%bcond_with cythonize +%else +%bcond_without cythonize +%endif +%if %{with cythonize} +%define cythonize --with-cython +%endif %{?!python_module:%define python_module() python3-%{**}} %define skip_python2 1 %define skip_python36 1 -%ifnarch %{ix86} %{arm} -%define cythonize --with-cython -# cython optimizations not supported on 32-bit: https://github.com/dask/dask/issues/7489 -%endif +%define ghversiontag 2021.07.0 Name: python-distributed%{psuffix} # Note: please always update together with python-dask -Version: 2021.5.0 +Version: 2021.7.0 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause URL: https://distributed.readthedocs.io/en/latest/ Source: https://files.pythonhosted.org/packages/source/d/distributed/distributed-%{version}.tar.gz +# Missing in the PyPI package but needed for pytest fixtures. Note: one of the next releases will stop shipping the tests on PyPI altogether
(gh#dask/distributed#5054) +Source1: https://github.com/dask/distributed/raw/%{ghversiontag}/conftest.py Source99: python-distributed-rpmlintrc -BuildRequires: %{python_module Cython} +# PATCH-FIX-UPSTREAM distributed-pr5022-improve_ci.patch -- gh#dask/distributed#5022 +Patch0: distributed-pr5022-improve_ci.patch BuildRequires: %{python_module base >= 3.7} BuildRequires: %{python_module setuptools} BuildRequires: fdupes @@ -57,6 +66,11 @@ Requires: python-tblib Requires: python-toolz >= 0.8.2 Requires: python-tornado >= 6.0.3 Requires: python-zict >= 0.1.3 +%if %{with cythonize} +BuildRequires: %{python_module Cython} +# the cythonized scheduler needs Cython also as runtime dep for some checks +Requires: python-Cython +%endif %if %{with test} BuildRequires: %{python_module PyYAML} BuildRequires: %{python_module bokeh} @@ -64,7 +78,6 @@ BuildRequires: %{python_module certifi} BuildRequires: %{python_module click >= 6.6} BuildRequires: %{python_module cloudpickle >= 1.5.0} BuildRequires: %{python_module dask-all = %{version}} -# need built extension BuildRequires: %{python_module distributed = %{version}} BuildRequires: %{python_module ipykernel} BuildRequires: %{python_module ipython} @@ -74,7 +87,6 @@ BuildRequires: %{python_module psutil} BuildRequires: %{python_module pytest-asyncio} BuildRequires: %{python_module pytest-rerunfailures} BuildRequires: %{python_module pytest-timeout} -BuildRequires: %{python_module pytest-xdist} BuildRequires: %{python_module pytest} BuildRequires: %{python_module requests} BuildRequires: %{python_module sortedcontainers} @@ -93,6 +105,7 @@ clusters. %prep %autosetup -p1 -n distributed-%{version} +cp %SOURCE1 . %build %if ! %{with test} @@ -115,142 +128,25 @@ chmod -x %{buildroot}%{$python_sitearch}/distributed/tests/test_utils_test.py %check # many tests from multiple files are broken by new pytest-asyncio # (see https://github.com/dask/distributed/pull/4212 and https://github.com/pytest-dev/pytest-asyncio/issues/168) -# as a proof build it with old pytest-asyncio and see these tests pass %if %{pkg_vcmp python3-pytest-asyncio >= 0.14} -donttest+=" or (test_asyncprocess and test_child_main_thread)" -donttest+=" or (test_asyncprocess and test_close)" -donttest+=" or (test_asyncprocess and test_exitcode)" -donttest+=" or (test_asyncprocess and test_num_fds)" -donttest+=" or (test_asyncprocess and test_performance_report)" -donttest+=" or (test_asyncprocess and test_signal)" -donttest+=" or (test_client and test_add_worker_after_task)" -donttest+=" or (test_client and test_bad_tasks_fail)" -donttest+=" or (test_client and test_futures_in_subgraphs)" -donttest+=" or (test_client and test_get_client)" -donttest+=" or (test_client and test_logs)" -donttest+=" or (test_client and test_lose_scattered_data)" -donttest+=" or (test_client and test_performance_report)" -donttest+=" or (test_client and test_quiet_client_close)" -donttest+=" or (test_client and test_repr_async)" -donttest+=" or (test_client and test_secede_balances)" -donttest+=" or (test_client and test_secede_simple)" -donttest+=" or (test_client and test_serialize_collections)" -donttest+=" or (test_client_executor and test_cancellation)" -donttest+=" or (test_client_loop and test_close_loop_sync)" -donttest+=" or (test_collections and test_sparse_arrays)" -donttest+=" or (test_events and test_event_on_workers)" -donttest+=" or (test_events and test_set_not_set_many_events)" -donttest+=" or (test_events and test_two_events_on_workers)" -donttest+=" or (test_failed_workers and 
test_broken_worker_during_computation)" -donttest+=" or (test_failed_workers and test_gather_then_submit_after_failed_workers)" -donttest+=" or (test_failed_workers and test_restart)" -donttest+=" or (test_failed_workers and test_worker_time_to_live)" -donttest+=" or (test_failed_workers and test_worker_who_has_clears_after_failed_connection)" -donttest+=" or (test_locks and test_lock)" -donttest+=" or (test_locks and test_serializable)" -donttest+=" or (test_nanny and test_mp_pool_worker_no_daemon)" -donttest+=" or (test_nanny and test_mp_process_worker_no_daemon)" -donttest+=" or (test_nanny and test_nanny)" -donttest+=" or (test_nanny and test_num_fds)" +donttest+=" or (test_client and test_get_client_functions_spawn_clusters)" donttest+=" or (test_preload and test_web_preload)" -donttest+=" or (test_profile and test_watch)" -donttest+=" or (test_publish and test_publish_simple)" -donttest+=" or (test_queues and test_2220)" -donttest+=" or (test_resources and test_prefer_constrained)" -donttest+=" or (test_scheduler and test_balance_many_workers)" -donttest+=" or (test_scheduler and test_bandwidth_clear)" -donttest+=" or (test_scheduler and test_dashboard_address)" -donttest+=" or (test_scheduler and test_dont_recompute_if_persisted)" -donttest+=" or (test_scheduler and test_file_descriptors)" -donttest+=" or (test_scheduler and test_gather_allow_worker_reconnect)" -donttest+=" or (test_scheduler and test_idle_timeout)" -donttest+=" or (test_scheduler and test_include_communication_in_occupancy)" -donttest+=" or (test_scheduler and test_log_tasks_during_restart)" -donttest+=" or (test_scheduler and test_restart)" -donttest+=" or (test_scheduler and test_scheduler_init_pulls_blocked_handlers_from_config)" -donttest+=" or (test_scheduler and test_service_hosts)" -donttest+=" or (test_scheduler and test_steal_when_more_tasks)" -donttest+=" or (test_scheduler and test_task_groups)" -donttest+=" or (test_semaphor and test_getvalue)" donttest+=" or (test_semaphore and test_access_semaphore_by_name)" donttest+=" or (test_semaphore and test_close_async)" donttest+=" or (test_semaphore and test_oversubscribing_leases)" -donttest+=" or (test_semaphore and test_release_failure)" donttest+=" or (test_semaphore and test_release_once_too_many_resilience)" -donttest+=" or (test_semaphore and test_release_semaphore_after_timeout)" donttest+=" or (test_semaphore and test_release_simple)" donttest+=" or (test_semaphore and test_threadpoolworkers_pick_correct_ioloop)" -donttest+=" or (test_sparse_arrays and concurrent)" -donttest+=" or (test_spec and test_address_default_none)" -donttest+=" or (test_spec and test_child_address_persists)" -donttest+=" or (test_steal and test_balance)" -donttest+=" or (test_steal and test_dont_steal_already_released)" -donttest+=" or (test_steal and test_dont_steal_few_saturated_tasks_many_workers)" -donttest+=" or (test_steal and test_dont_steal_unknown_functions)" -donttest+=" or (test_steal and test_eventually_steal_unknown_functions)" -donttest+=" or (test_steal and test_restart)" -donttest+=" or (test_steal and test_steal_more_attractive_tasks)" -donttest+=" or (test_steal and test_steal_twice)" -donttest+=" or (test_steal and test_steal_when_more_tasks)" -donttest+=" or (test_steal and test_worksteal_many_thieves)" -donttest+=" or (test_stress and test_cancel_stress)" -donttest+=" or (test_tls_functional and test_retire_workers)" -donttest+=" or (test_tls_functional and test_worker_client)" -donttest+=" or (test_utils and test_sync_closed_loop)" -donttest+=" or 
(test_worker and test_dont_overlap_communications_to_same_worker)" -donttest+=" or (test_worker and test_gather_many_small)" -donttest+=" or (test_worker and test_get_client)" -donttest+=" or (test_worker and test_lifetime)" -donttest+=" or (test_worker and test_robust_to_bad_sizeof_estimates)" -donttest+=" or (test_worker and test_share_communication)" -donttest+=" or (test_worker and test_statistical_profiling_2)" -donttest+=" or (test_worker and test_stop_doing_unnecessary_work)" -donttest+=" or (test_worker and test_wait_for_outgoing)" -donttest+=" or (test_worker and test_workerstate_executing)" -donttest+=" or (test_worker_client and test_async)" -donttest+=" or (test_worker_client and test_client_executor)" -donttest+=" or (test_worker_client and test_compute_within_worker_client)" -donttest+=" or (test_worker_client and test_gather_multi_machine)" -donttest+=" or (test_worker_client and test_local_client_warning)" -donttest+=" or (test_worker_client and test_scatter_from_worker)" -donttest+=" or (test_worker_client and test_scatter_singleton)" -donttest+=" or (test_worker_client and test_secede_without_stealing_issue_1262)" -donttest+=" or (test_worker_client and test_submit_different_names)" -donttest+=" or (test_worker_client and test_submit_from_worker)" +donttest+=" or (test_worker and test_worker_client_closes_if_created_on_worker_last_worker_alive)" +donttest+=" or (test_worker and test_worker_client_closes_if_created_on_worker_one_worker)" %endif -# false version mismatch -donttest+=" or test_version_warning_in_cluster" -# ambiguous order in returned message -donttest+=" or (test_client and test_as_completed_async_for_cancel)" -# too many open files -donttest+=" or (test_stress and test_stress_communication)" # randomly fail even with old asyncio -- too slow for obs (?) 
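# How the deselection expression below is assembled: every fragment appended
# to $donttest deliberately starts with " or ", and the bash substring
# expansion ${donttest:4} in the %pytest_arch call strips those four leading
# characters, leaving a well-formed boolean expression for pytest -k.
# A minimal illustration with hypothetical test names, not part of the build:
#   donttest=" or (test_a and test_b)"
#   donttest+=" or test_c"
#   echo "not (${donttest:4})"   # -> not ((test_a and test_b) or test_c)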
donttest+=" or (test_asyncprocess and test_exit_callback)" -donttest+=" or (test_client and test_cleanup_after_broken_client_connection)" -donttest+=" or (test_client and test_open_close_many_workers)" -donttest+=" or (test_client and test_profile)" -donttest+=" or (test_client and test_quiet_quit_when_cluster_leaves)" -donttest+=" or (test_client and test_reconnect)" -donttest+=" or (test_client and test_sub_submit_priority)" -donttest+=" or (test_client and test_upload_file_exception_sync)" -donttest+=" or (test_client and test_upload_file_sync)" -donttest+=" or (test_diskutils and test_workspace_concurrency)" -donttest+=" or (test_failed_workers and test_fast_kill)" -donttest+=" or (test_metrics and time)" -donttest+=" or (test_queues and test_race)" -donttest+=" or (test_scheduler and test_gather_failing_cnn_recover)" -donttest+=" or (test_steal and test_dont_steal_fast_tasks_compute_time)" -donttest+=" or (test_stress and test_close_connections)" donttest+=" or (test_worker and test_fail_write_to_disk)" -donttest+=" or test_queue_in_task or test_variable_in_task" -# https://github.com/dask/distributed/pull/4719: "This test is heavily influenced by hard-to-control factors such as memory management" -# probably influenced by OBS scheduling -donttest+=" or (test_scheduler and test_memory)" -# likely related to the above (https://github.com/dask/distributed/pull/4651) -donttest+=" or (test_worker and test_spill_to_disk)" -# flaky on i586 -donttest+=" or (test_client_executor and test_map)" -%pytest_arch -rfE -n auto distributed/tests/ -k "not (${donttest:4})" -m "not avoid_travis" --timeout 180 +# rebalance fails on the server, but not when building locally +donttest+=" or (test_scheduler and test_rebalance)" +donttest+=" or (test_tls_functional and test_rebalance)" +%pytest_arch distributed/tests -r sfER -m "not avoid_ci" -k "not (${donttest:4})" --reruns 3 --reruns-delay 3 %endif %if ! %{with test}