Files
python-distributed/distributed-pr5022-improve_ci.patch
Matej Cepl 66e1d35321 Accepting request 907354 from home:bnavigator:branches:devel:languages:python:numeric
- Update to version 2021.7.0
  * Fix Nbytes jitter - less expensive
  * Use native GH actions cancel feature
  * Don't require workers to report to scheduler if scheduler
    shuts down
  * Add pandas to the list of checked packages for
    client.get_versions()
  * Move worker preload before scheduler address is set
  * Fix flaky test_oversubscribing_leases
  * Update scheduling policy docs for #4967
  * Add echo handler to Server class
  * Also include pngs when bundling package
  * Remove duplicated dashboard panes
  * Fix worker memory dashboard flickering
  * Tabs on bottom left corner on dashboard
  * Rename nbytes widgets
  * Co-assign root-ish tasks
  * OSError tweaks
  * Update imports to cudf.testing._utils
  * Ensure shuffle split default durations uses proper prefix
  * Follow up pyupgrade formatting
  * Rename plot dropdown
  * Pyupgrade
  * Misc Sphinx tweaks
  * No longer hold dependencies of erred tasks in memory
  * Add maximum shard size to config
  * Ensure shuffle split operations are blacklisted from work
    stealing
  * Add dropdown menu to access individual plots
  * Edited the path to scheduler.py
  * Task Group Graph Visualization
  * Remove more internal references to deprecated utilities
  * Restructure nbytes hover
  * Except more errors in pynvml.nvmlInit()
  * Add occupancy as individual plot
  * Deprecate utilities which have moved to dask
  * Ensure connectionpool does not leave comms if closed mid
    connect
  * Add support for registering scheduler plugins from Client
    (sketched after this changelog)
  * Stealing dashboard fixes
  * Allow requirements verification to be ignored when loading
    backends from entrypoints
  * Add Log and Logs to API docs
  * Support fixtures and pytest.mark.parametrize with gen_cluster
    (also sketched after this changelog)
- Release 2021.06.2
  * Revert refactor to utils.Log[s] and Cluster.get_logs
  * Use deprecation utility from Dask
  * Add transition counter to Scheduler
  * Remove nbytes_in_memory
- Release 2021.06.1
  * Fix deadlock in handle_missing_dep if additional replicas are
    available
  * Add configuration to enable/disable NVML diagnostics
  * Add scheduler log tab to performance reports
  * Add HTML repr to scheduler_info and incorporate into client
    and cluster reprs
  * Fix error state typo
  * Allow actor exceptions to propagate
  * Remove importing apply from dask.compatibility
  * Use more informative default name for WorkerPlugins
  * Removed unused utility functions
  * Locally rerun successfully completed futures
  * Forget erred tasks and fix deadlocks on worker
  * Handle HTTPClientError in websocket connector
  * Update dask_cuda usage in SSHCluster docstring
  * Remove tests for process_time and thread_time
  * Flake8 config cleanup
  * Don't strip scheduler protocol when determining host
  * Add more documentation on memory management
  * Add range_query tests to NVML test suite
  * No longer cancel result future in async process when using
    timeouts
- Release 2021.06.0
  * Multiple worker executors
  * Ensure PyNVML works correctly when installed with no GPUs
  * Show more in test summary
  * Move SystemMonitor's GPU initialization back to constructor
  * Mark test_server_comms_mark_active_handlers with pytest.mark.asyncio
  * who_has/has_what HTML reprs v2
  * O(1) rebalance
  * Ensure repr and eq for cluster always works
- Release 2021.05.1
  * Drop usage of WhoHas & WhatHas from Client
  * Ensure adaptive scaling is properly awaited and closed
  * Fix WhoHas/HasWhat async usage
  * Add HTML reprs for Client.who_has and Client.has_what
  * Prevent accidentally starting multiple Workers in the same
    process
  * Add system tab to performance reports
  * Let servers close faster if there are no active handlers
  * Fix UCX scrub config logging
  * Ensure worker clients are closed
  * Fix warning for attribute error when deleting a client
  * Ensure exceptions are raised if workers are incorrectly started
  * Update handling of UCX exceptions on endpoint closing
  * Ensure busy workloads properly look up who_has
  * Check distributed.scheduler.pickle in Scheduler.run_function
  * Add performance_report to API docs
  * Use dict _workers_dv in unordered use cases
  * Bump pre-commit hook versions
  * Do not mindlessly spawn workers when no memory limit is set
  * test_memory to use gen_cluster
  * Increase timeout of gen_test to 30s
- Work on the very flaky testsuite:
  * Add missing conftest.py not packaged on PyPI
  * Add distributed-pr5022-improve_ci.patch in the hope for better
    stability -- gh#dask/distributed#5022
  * Do not use pytest-xdist
- Add Cython as runtime dep because the scheduler checks for its
  presence
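
A minimal sketch (not part of the patch below) of the "registering scheduler
plugins from Client" feature noted above; only Client.register_scheduler_plugin
and the SchedulerPlugin base class come from distributed, the plugin class and
its counter are illustrative:

    from distributed import Client
    from distributed.diagnostics.plugin import SchedulerPlugin

    class TransitionCounter(SchedulerPlugin):
        """Illustrative plugin: count task transitions on the scheduler."""

        def __init__(self):
            self.count = 0

        def transition(self, key, start, finish, *args, **kwargs):
            self.count += 1

    client = Client()  # assumes a default LocalCluster can be started
    client.register_scheduler_plugin(TransitionCounter())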
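And a hedged sketch of combining pytest.mark.parametrize with gen_cluster, per
the "Support fixtures and pytest.mark.parametrize with gen_cluster" entry; the
parameter n and the test body are made up for illustration:

    import pytest
    from distributed.utils_test import gen_cluster, inc

    @pytest.mark.parametrize("n", [1, 2])
    @gen_cluster(client=True)
    async def test_parametrized_inc(c, s, a, b, n):
        # c = client, s = scheduler, a and b = workers; n comes from parametrize
        assert await c.submit(inc, n) == n + 1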

OBS-URL: https://build.opensuse.org/request/show/907354
OBS-URL: https://build.opensuse.org/package/show/devel:languages:python:numeric/python-distributed?expand=0&rev=98
2021-07-21 09:16:05 +00:00

Diff

Index: distributed-2021.7.0/continuous_integration/condarc
===================================================================
--- /dev/null
+++ distributed-2021.7.0/continuous_integration/condarc
@@ -0,0 +1,9 @@
+channels:
+ - conda-forge
+ - defaults
+channel_priority: true
+auto_activate_base: false
+remote_backoff_factor: 20
+remote_connect_timeout_secs: 20.0
+remote_max_retries: 10
+remote_read_timeout_secs: 60.0
Index: distributed-2021.7.0/distributed/cli/tests/test_dask_scheduler.py
===================================================================
--- distributed-2021.7.0.orig/distributed/cli/tests/test_dask_scheduler.py
+++ distributed-2021.7.0/distributed/cli/tests/test_dask_scheduler.py
@@ -1,3 +1,4 @@
+import psutil
import pytest
pytest.importorskip("requests")
@@ -14,6 +15,7 @@ from click.testing import CliRunner
import distributed
import distributed.cli.dask_scheduler
from distributed import Client, Scheduler
+from distributed.compatibility import LINUX
from distributed.metrics import time
from distributed.utils import get_ip, get_ip_interface, tmpfile
from distributed.utils_test import (
@@ -118,9 +120,7 @@ def test_dashboard_non_standard_ports(lo
requests.get("http://localhost:4832/status/")
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
def test_dashboard_whitelist(loop):
pytest.importorskip("bokeh")
with pytest.raises(Exception):
@@ -144,7 +144,6 @@ def test_dashboard_whitelist(loop):
def test_interface(loop):
- psutil = pytest.importorskip("psutil")
if_names = sorted(psutil.net_if_addrs())
for if_name in if_names:
try:
@@ -168,25 +167,24 @@ def test_interface(loop):
start = time()
while not len(c.nthreads()):
sleep(0.1)
- assert time() - start < 5
+ assert time() - start < 30
info = c.scheduler_info()
assert "tcp://127.0.0.1" in info["address"]
assert all("127.0.0.1" == d["host"] for d in info["workers"].values())
-@pytest.mark.flaky(reruns=10, reruns_delay=5)
def test_pid_file(loop):
def check_pidfile(proc, pidfile):
start = time()
while not os.path.exists(pidfile):
sleep(0.01)
- assert time() < start + 5
+ assert time() < start + 30
text = False
start = time()
while not text:
sleep(0.01)
- assert time() < start + 5
+ assert time() < start + 30
with open(pidfile) as f:
text = f.read()
pid = int(text)
Index: distributed-2021.7.0/distributed/cli/tests/test_dask_worker.py
===================================================================
--- distributed-2021.7.0.orig/distributed/cli/tests/test_dask_worker.py
+++ distributed-2021.7.0/distributed/cli/tests/test_dask_worker.py
@@ -6,7 +6,6 @@ from click.testing import CliRunner
pytest.importorskip("requests")
import os
-import sys
from multiprocessing import cpu_count
from time import sleep
@@ -14,6 +13,7 @@ import requests
import distributed.cli.dask_worker
from distributed import Client, Scheduler
+from distributed.compatibility import LINUX
from distributed.deploy.utils import nprocesses_nthreads
from distributed.metrics import time
from distributed.utils import parse_ports, sync, tmpfile
@@ -275,9 +275,7 @@ def test_nprocs_expands_name(loop):
assert len(set(names)) == 4
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
@pytest.mark.parametrize(
"listen_address", ["tcp://0.0.0.0:39837", "tcp://127.0.0.2:39837"]
@@ -311,9 +309,7 @@ def test_contact_listen_address(loop, na
assert client.run(func) == {"tcp://127.0.0.2:39837": listen_address}
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
@pytest.mark.parametrize("host", ["127.0.0.2", "0.0.0.0"])
def test_respect_host_listen_address(loop, nanny, host):
Index: distributed-2021.7.0/distributed/comm/tests/test_comms.py
===================================================================
--- distributed-2021.7.0.orig/distributed/comm/tests/test_comms.py
+++ distributed-2021.7.0/distributed/comm/tests/test_comms.py
@@ -29,7 +29,6 @@ from distributed.comm import (
)
from distributed.comm.registry import backends, get_backend
from distributed.comm.tcp import TCP, TCPBackend, TCPConnector
-from distributed.compatibility import WINDOWS
from distributed.metrics import time
from distributed.protocol import Serialized, deserialize, serialize, to_serialize
from distributed.utils import get_ip, get_ipv6
@@ -110,7 +109,7 @@ async def debug_loop():
while True:
loop = ioloop.IOLoop.current()
print(".", loop, loop._handlers)
- await asyncio.sleep(0.50)
+ await asyncio.sleep(0.5)
#
@@ -1107,7 +1106,6 @@ async def check_deserialize(addr):
await check_connector_deserialize(addr, True, msg, partial(check_out, True))
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS)
@pytest.mark.asyncio
async def test_tcp_deserialize():
await check_deserialize("tcp://")
Index: distributed-2021.7.0/distributed/comm/tests/test_ucx.py
===================================================================
--- distributed-2021.7.0.orig/distributed/comm/tests/test_ucx.py
+++ distributed-2021.7.0/distributed/comm/tests/test_ucx.py
@@ -276,7 +276,7 @@ async def test_ucx_localcluster(processe
) as cluster:
async with Client(cluster, asynchronous=True) as client:
x = client.submit(inc, 1)
- await x.result()
+ await x
assert x.key in cluster.scheduler.tasks
if not processes:
assert any(w.data == {x.key: 2} for w in cluster.workers.values())
Index: distributed-2021.7.0/distributed/comm/tests/test_ws.py
===================================================================
--- distributed-2021.7.0.orig/distributed/comm/tests/test_ws.py
+++ distributed-2021.7.0/distributed/comm/tests/test_ws.py
@@ -122,9 +122,9 @@ async def test_collections(cleanup):
async def test_large_transfer(cleanup):
np = pytest.importorskip("numpy")
async with Scheduler(protocol="ws://") as s:
- async with Worker(s.address, protocol="ws://") as w:
+ async with Worker(s.address, protocol="ws://"):
async with Client(s.address, asynchronous=True) as c:
- future = await c.scatter(np.random.random(1000000))
+ await c.scatter(np.random.random(1_000_000))
@pytest.mark.asyncio
Index: distributed-2021.7.0/distributed/compatibility.py
===================================================================
--- distributed-2021.7.0.orig/distributed/compatibility.py
+++ distributed-2021.7.0/distributed/compatibility.py
@@ -8,6 +8,7 @@ logging_names = logging._levelToName.cop
logging_names.update(logging._nameToLevel)
PYPY = platform.python_implementation().lower() == "pypy"
+LINUX = sys.platform == "linux"
MACOS = sys.platform == "darwin"
WINDOWS = sys.platform.startswith("win")
TORNADO6 = tornado.version_info[0] >= 6
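
The LINUX constant added here lets the repeated sys.platform checks scattered
through the test files collapse into one import; a before/after sketch drawn
from the hunks in this patch (the test names are illustrative):

    import sys
    import pytest
    from distributed.compatibility import LINUX

    # before this patch:
    @pytest.mark.skipif(
        not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
    )
    def test_old_style():
        ...

    # after this patch:
    @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
    def test_new_style():
        ...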
Index: distributed-2021.7.0/distributed/dashboard/tests/test_scheduler_bokeh.py
===================================================================
--- distributed-2021.7.0.orig/distributed/dashboard/tests/test_scheduler_bokeh.py
+++ distributed-2021.7.0/distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -17,7 +17,6 @@ from dask.core import flatten
from dask.utils import stringify
from distributed.client import wait
-from distributed.compatibility import MACOS
from distributed.dashboard import scheduler
from distributed.dashboard.components.scheduler import (
AggregateAction,
@@ -93,10 +92,8 @@ async def test_counters(c, s, a, b):
await asyncio.sleep(0.1)
ss.update()
- start = time()
while not len(ss.digest_sources["tick-duration"][0].data["x"]):
- await asyncio.sleep(1)
- assert time() < start + 5
+ await asyncio.sleep(0.01)
@gen_cluster(client=True)
@@ -184,7 +181,7 @@ async def test_task_stream_clear_interva
await wait(c.map(inc, range(10)))
ts.update()
- await asyncio.sleep(0.010)
+ await asyncio.sleep(0.01)
await wait(c.map(dec, range(10)))
ts.update()
@@ -192,7 +189,7 @@ async def test_task_stream_clear_interva
assert ts.source.data["name"].count("inc") == 10
assert ts.source.data["name"].count("dec") == 10
- await asyncio.sleep(0.300)
+ await asyncio.sleep(0.3)
await wait(c.map(inc, range(10, 20)))
ts.update()
@@ -848,7 +845,6 @@ async def test_aggregate_action(c, s, a,
assert ("compute") in mbk.action_source.data["names"]
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS)
@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
async def test_compute_per_key(c, s, a, b):
mbk = ComputePerKey(s)
Index: distributed-2021.7.0/distributed/deploy/local.py
===================================================================
--- distributed-2021.7.0.orig/distributed/deploy/local.py
+++ distributed-2021.7.0/distributed/deploy/local.py
@@ -136,8 +136,8 @@ class LocalCluster(SpecCluster):
if threads_per_worker == 0:
warnings.warn(
- "Setting `threads_per_worker` to 0 is discouraged. "
- "Please set to None or to a specific int to get best behavior."
+ "Setting `threads_per_worker` to 0 has been deprecated. "
+ "Please set to None or to a specific int."
)
threads_per_worker = None
Index: distributed-2021.7.0/distributed/deploy/old_ssh.py
===================================================================
--- distributed-2021.7.0.orig/distributed/deploy/old_ssh.py
+++ distributed-2021.7.0/distributed/deploy/old_ssh.py
@@ -51,8 +51,8 @@ def async_ssh(cmd_dict):
port=cmd_dict["ssh_port"],
key_filename=cmd_dict["ssh_private_key"],
compress=True,
- timeout=20,
- banner_timeout=20,
+ timeout=30,
+ banner_timeout=30,
) # Helps prevent timeouts when many concurrent ssh connections are opened.
# Connection successful, break out of while loop
break
Index: distributed-2021.7.0/distributed/deploy/tests/test_adaptive.py
===================================================================
--- distributed-2021.7.0.orig/distributed/deploy/tests/test_adaptive.py
+++ distributed-2021.7.0/distributed/deploy/tests/test_adaptive.py
@@ -8,6 +8,7 @@ import pytest
import dask
from distributed import Adaptive, Client, LocalCluster, SpecCluster, Worker, wait
+from distributed.compatibility import WINDOWS
from distributed.metrics import time
from distributed.utils_test import async_wait_for, clean, gen_test, slowinc
@@ -40,8 +41,8 @@ def test_adaptive_local_cluster(loop):
assert not c.nthreads()
-@pytest.mark.asyncio
-async def test_adaptive_local_cluster_multi_workers(cleanup):
+@gen_test()
+async def test_adaptive_local_cluster_multi_workers():
async with LocalCluster(
n_workers=0,
scheduler_port=0,
@@ -56,19 +57,14 @@ async def test_adaptive_local_cluster_mu
async with Client(cluster, asynchronous=True) as c:
futures = c.map(slowinc, range(100), delay=0.01)
- start = time()
while not cluster.scheduler.workers:
await asyncio.sleep(0.01)
- assert time() < start + 15, adapt.log
await c.gather(futures)
del futures
- start = time()
- # while cluster.workers:
while cluster.scheduler.workers:
await asyncio.sleep(0.01)
- assert time() < start + 15, adapt.log
# no workers for a while
for i in range(10):
@@ -242,7 +238,7 @@ async def test_adapt_quickly():
await cluster.close()
-@gen_test(timeout=None)
+@gen_test()
async def test_adapt_down():
"""Ensure that redefining adapt with a lower maximum removes workers"""
async with LocalCluster(
@@ -302,14 +298,15 @@ def test_basic_no_loop(loop):
loop.add_callback(loop.stop)
-@pytest.mark.asyncio
+@pytest.mark.flaky(condition=not WINDOWS, reruns=10, reruns_delay=5)
+@pytest.mark.xfail(condition=WINDOWS, reason="extremely flaky")
+@gen_test()
async def test_target_duration():
- """Ensure that redefining adapt with a lower maximum removes workers"""
with dask.config.set(
{"distributed.scheduler.default-task-durations": {"slowinc": 1}}
):
async with LocalCluster(
- 0,
+ n_workers=0,
asynchronous=True,
processes=False,
scheduler_port=0,
@@ -318,16 +315,12 @@ async def test_target_duration():
) as cluster:
adapt = cluster.adapt(interval="20ms", minimum=2, target_duration="5s")
async with Client(cluster, asynchronous=True) as client:
- while len(cluster.scheduler.workers) < 2:
- await asyncio.sleep(0.01)
-
+ await client.wait_for_workers(2)
futures = client.map(slowinc, range(100), delay=0.3)
+ await wait(futures)
- while len(adapt.log) < 2:
- await asyncio.sleep(0.01)
-
- assert adapt.log[0][1] == {"status": "up", "n": 2}
- assert adapt.log[1][1] == {"status": "up", "n": 20}
+ assert adapt.log[0][1] == {"status": "up", "n": 2}
+ assert adapt.log[1][1] == {"status": "up", "n": 20}
@pytest.mark.asyncio
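
Several tests above move from a bare @pytest.mark.asyncio to gen_test(). Per
the "Increase timeout of gen_test to 30s" changelog entry, gen_test runs the
coroutine under a default 30-second timeout so a hung test fails instead of
stalling CI; a minimal sketch (the test body is illustrative):

    from distributed.utils_test import gen_test

    @gen_test()  # default 30s timeout; gen_test(timeout=60) extends it, as in test_restart below
    async def test_something_async():
        ...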
Index: distributed-2021.7.0/distributed/deploy/tests/test_adaptive_core.py
===================================================================
--- distributed-2021.7.0.orig/distributed/deploy/tests/test_adaptive_core.py
+++ distributed-2021.7.0/distributed/deploy/tests/test_adaptive_core.py
@@ -85,10 +85,10 @@ async def test_interval():
assert time() < start + 2
adapt.stop()
- await asyncio.sleep(0.050)
+ await asyncio.sleep(0.05)
adapt._target = 10
- await asyncio.sleep(0.020)
+ await asyncio.sleep(0.02)
assert len(adapt.plan) == 1 # last value from before, unchanged
Index: distributed-2021.7.0/distributed/deploy/tests/test_local.py
===================================================================
--- distributed-2021.7.0.orig/distributed/deploy/tests/test_local.py
+++ distributed-2021.7.0/distributed/deploy/tests/test_local.py
@@ -17,6 +17,7 @@ from tornado.ioloop import IOLoop
from dask.system import CPU_COUNT
from distributed import Client, Nanny, Worker, get_client
+from distributed.compatibility import LINUX
from distributed.core import Status
from distributed.deploy.local import LocalCluster
from distributed.deploy.utils_test import ClusterTest
@@ -693,13 +694,12 @@ def test_adapt_then_manual(loop):
processes=False,
n_workers=8,
) as cluster:
- sleep(0.1)
cluster.adapt(minimum=0, maximum=4, interval="10ms")
start = time()
while cluster.scheduler.workers or cluster.workers:
- sleep(0.1)
- assert time() < start + 5
+ sleep(0.01)
+ assert time() < start + 30
assert not cluster.workers
@@ -715,8 +715,8 @@ def test_adapt_then_manual(loop):
start = time()
while len(cluster.scheduler.workers) != 2:
- sleep(0.1)
- assert time() < start + 5
+ sleep(0.01)
+ assert time() < start + 30
@pytest.mark.parametrize("temporary", [True, False])
@@ -843,9 +843,7 @@ def test_protocol_tcp(loop):
assert cluster.scheduler.address.startswith("tcp://")
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
def test_protocol_ip(loop):
with LocalCluster(
host="tcp://127.0.0.2", loop=loop, n_workers=0, processes=False
@@ -990,7 +988,7 @@ async def test_repr(cleanup):
@pytest.mark.asyncio
async def test_threads_per_worker_set_to_0(cleanup):
with pytest.warns(
- Warning, match="Setting `threads_per_worker` to 0 is discouraged."
+ Warning, match="Setting `threads_per_worker` to 0 has been deprecated."
):
async with LocalCluster(
n_workers=2, processes=False, threads_per_worker=0, asynchronous=True
Index: distributed-2021.7.0/distributed/deploy/tests/test_spec_cluster.py
===================================================================
--- distributed-2021.7.0.orig/distributed/deploy/tests/test_spec_cluster.py
+++ distributed-2021.7.0/distributed/deploy/tests/test_spec_cluster.py
@@ -14,6 +14,7 @@ from distributed.core import Status
from distributed.deploy.spec import ProcessInterface, close_clusters, run_spec
from distributed.metrics import time
from distributed.utils import is_valid_xml
+from distributed.utils_test import gen_test
class MyWorker(Worker):
@@ -138,8 +139,8 @@ async def test_scale(cleanup):
@pytest.mark.slow
-@pytest.mark.asyncio
-async def test_adaptive_killed_worker(cleanup):
+@gen_test()
+async def test_adaptive_killed_worker():
with dask.config.set({"distributed.deploy.lost-worker-timeout": 0.1}):
async with SpecCluster(
@@ -147,13 +148,10 @@ async def test_adaptive_killed_worker(cl
worker={"cls": Nanny, "options": {"nthreads": 1}},
scheduler={"cls": Scheduler, "options": {"port": 0}},
) as cluster:
-
async with Client(cluster, asynchronous=True) as client:
-
- cluster.adapt(minimum=1, maximum=1)
-
# Scale up a cluster with 1 worker.
- while len(cluster.workers) != 1:
+ cluster.adapt(minimum=1, maximum=1)
+ while not cluster.workers:
await asyncio.sleep(0.01)
future = client.submit(sleep, 0.1)
@@ -163,11 +161,11 @@ async def test_adaptive_killed_worker(cl
await cluster.workers[worker_id].kill()
# Wait for the worker to re-spawn and finish sleeping.
- await future.result(timeout=5)
+ await future
-@pytest.mark.asyncio
-async def test_unexpected_closed_worker(cleanup):
+@gen_test()
+async def test_unexpected_closed_worker():
worker = {"cls": Worker, "options": {"nthreads": 1}}
with dask.config.set({"distributed.deploy.lost-worker-timeout": "10ms"}):
async with SpecCluster(
@@ -197,25 +195,20 @@ async def test_unexpected_closed_worker(
assert len(cluster.workers) == 2
-@pytest.mark.flaky(reruns=10, reruns_delay=5)
-@pytest.mark.slow
-@pytest.mark.asyncio
-async def test_restart(cleanup):
- # Regression test for https://github.com/dask/distributed/issues/3062
+@gen_test(timeout=60)
+async def test_restart():
+ """Regression test for https://github.com/dask/distributed/issues/3062"""
worker = {"cls": Nanny, "options": {"nthreads": 1}}
- with dask.config.set({"distributed.deploy.lost-worker-timeout": "2s"}):
- async with SpecCluster(
- asynchronous=True, scheduler=scheduler, worker=worker
- ) as cluster:
- async with Client(cluster, asynchronous=True) as client:
- cluster.scale(2)
- await cluster
- assert len(cluster.workers) == 2
- await client.restart()
- start = time()
- while len(cluster.workers) < 2:
- await asyncio.sleep(0.5)
- assert time() < start + 60
+ async with SpecCluster(
+ asynchronous=True, scheduler=scheduler, worker=worker
+ ) as cluster:
+ async with Client(cluster, asynchronous=True) as client:
+ cluster.scale(2)
+ await cluster
+ assert len(cluster.workers) == 2
+ await client.restart()
+ while len(cluster.workers) < 2:
+ await asyncio.sleep(0.01)
@pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out")
Index: distributed-2021.7.0/distributed/diagnostics/tests/test_progress.py
===================================================================
--- distributed-2021.7.0.orig/distributed/diagnostics/tests/test_progress.py
+++ distributed-2021.7.0/distributed/diagnostics/tests/test_progress.py
@@ -4,6 +4,7 @@ import pytest
from distributed import Nanny
from distributed.client import wait
+from distributed.compatibility import LINUX
from distributed.diagnostics.progress import (
AllProgress,
GroupProgress,
@@ -95,8 +96,9 @@ def check_bar_completed(capsys, width=40
assert percent == "100% Completed"
+@pytest.mark.flaky(condition=not COMPILED and LINUX, reruns=10, reruns_delay=5)
@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
-@gen_cluster(client=True, Worker=Nanny, timeout=None)
+@gen_cluster(client=True, Worker=Nanny)
async def test_AllProgress(c, s, a, b):
x, y, z = c.map(inc, [1, 2, 3])
xx, yy, zz = c.map(dec, [x, y, z])
@@ -178,8 +180,9 @@ async def test_AllProgress(c, s, a, b):
assert all(set(d) == {"div"} for d in p.state.values())
+@pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5)
@gen_cluster(client=True, Worker=Nanny)
-async def test_AllProgress_lost_key(c, s, a, b, timeout=None):
+async def test_AllProgress_lost_key(c, s, a, b):
p = AllProgress(s)
futures = c.map(inc, range(5))
await wait(futures)
@@ -188,10 +191,8 @@ async def test_AllProgress_lost_key(c, s
await a.close()
await b.close()
- start = time()
while len(p.state["memory"]["inc"]) > 0:
- await asyncio.sleep(0.1)
- assert time() < start + 5
+ await asyncio.sleep(0.01)
@gen_cluster(client=True)
Index: distributed-2021.7.0/distributed/diagnostics/tests/test_scheduler_plugin.py
===================================================================
--- distributed-2021.7.0.orig/distributed/diagnostics/tests/test_scheduler_plugin.py
+++ distributed-2021.7.0/distributed/diagnostics/tests/test_scheduler_plugin.py
@@ -33,7 +33,7 @@ async def test_simple(c, s, a, b):
assert counter not in s.plugins
-@gen_cluster(nthreads=[], client=False)
+@gen_cluster(nthreads=[])
async def test_add_remove_worker(s):
events = []
@@ -71,7 +71,7 @@ async def test_add_remove_worker(s):
assert events == []
-@gen_cluster(nthreads=[], client=False)
+@gen_cluster(nthreads=[])
async def test_async_add_remove_worker(s):
events = []
Index: distributed-2021.7.0/distributed/diagnostics/tests/test_widgets.py
===================================================================
--- distributed-2021.7.0.orig/distributed/diagnostics/tests/test_widgets.py
+++ distributed-2021.7.0/distributed/diagnostics/tests/test_widgets.py
@@ -5,8 +5,6 @@ pytest.importorskip("ipywidgets")
from ipykernel.comm import Comm
from ipywidgets import Widget
-from distributed.compatibility import WINDOWS
-
#################
# Utility stuff #
#################
@@ -145,7 +143,6 @@ async def test_multi_progressbar_widget(
assert sorted(capacities, reverse=True) == capacities
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS)
@gen_cluster()
async def test_multi_progressbar_widget_after_close(s, a, b):
s.update_graph(
Index: distributed-2021.7.0/distributed/distributed.yaml
===================================================================
--- distributed-2021.7.0.orig/distributed/distributed.yaml
+++ distributed-2021.7.0/distributed/distributed.yaml
@@ -176,7 +176,7 @@ distributed:
threads: 0 # Threads to use. 0 for single-threaded, -1 to infer from cpu count.
timeouts:
- connect: 10s # time before connecting fails
+ connect: 30s # time before connecting fails
tcp: 30s # time before calling an unresponsive connection dead
require-encryption: null # Whether to require encryption on non-local comms
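
This hunk raises the default comm connect timeout from 10s to 30s. The value
can be inspected or overridden through dask's config system; a sketch (the
60s override is illustrative):

    import dask

    # reads "30s" once this patch is applied
    print(dask.config.get("distributed.comm.timeouts.connect"))

    with dask.config.set({"distributed.comm.timeouts.connect": "60s"}):
        ...  # connection attempts made in this block get the longer timeout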
Index: distributed-2021.7.0/distributed/nanny.py
===================================================================
--- distributed-2021.7.0.orig/distributed/nanny.py
+++ distributed-2021.7.0/distributed/nanny.py
@@ -384,7 +384,7 @@ class Nanny(ServerNode):
raise
return result
- async def restart(self, comm=None, timeout=2, executor_wait=True):
+ async def restart(self, comm=None, timeout=30, executor_wait=True):
async def _():
if self.process is not None:
await self.kill()
@@ -393,7 +393,9 @@ class Nanny(ServerNode):
try:
await asyncio.wait_for(_(), timeout)
except TimeoutError:
- logger.error("Restart timed out, returning before finished")
+ logger.error(
+ f"Restart timed out after {timeout}s; returning before finished"
+ )
return "timed out"
else:
return "OK"
Index: distributed-2021.7.0/distributed/protocol/tests/test_pickle.py
===================================================================
--- distributed-2021.7.0.orig/distributed/protocol/tests/test_pickle.py
+++ distributed-2021.7.0/distributed/protocol/tests/test_pickle.py
@@ -128,7 +128,6 @@ def test_pickle_numpy():
assert (deserialize(h, f) == x).all()
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=sys.version_info[:2] == (3, 8))
def test_pickle_functions():
def make_closure():
value = 1
Index: distributed-2021.7.0/distributed/scheduler.py
===================================================================
--- distributed-2021.7.0.orig/distributed/scheduler.py
+++ distributed-2021.7.0/distributed/scheduler.py
@@ -3440,19 +3440,18 @@ class Scheduler(SchedulerState, ServerNo
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
- missing_bokeh = False
# install vanilla route if show_dashboard but bokeh is not installed
if show_dashboard:
try:
import distributed.dashboard.scheduler
except ImportError:
- missing_bokeh = True
+ show_dashboard = False
http_server_modules.append("distributed.http.scheduler.missing_bokeh")
routes = get_handlers(
server=self, modules=http_server_modules, prefix=http_prefix
)
self.start_http_server(routes, dashboard_address, default_port=8787)
- if show_dashboard and not missing_bokeh:
+ if show_dashboard:
distributed.dashboard.scheduler.connect(
self.http_application, self.http_server, self, prefix=http_prefix
)
@@ -5467,8 +5466,8 @@ class Scheduler(SchedulerState, ServerNo
for collection in self._task_state_collections:
collection.clear()
- async def restart(self, client=None, timeout=3):
- """Restart all workers. Reset local state."""
+ async def restart(self, client=None, timeout=30):
+ """Restart all workers. Reset local state."""
parent: SchedulerState = cast(SchedulerState, self)
with log_errors():
Index: distributed-2021.7.0/distributed/tests/test_actor.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_actor.py
+++ distributed-2021.7.0/distributed/tests/test_actor.py
@@ -485,10 +485,8 @@ async def test_compute(c, s, a, b):
result = await c.compute(final, actors=counter)
assert result == 0 + 1 + 2 + 3 + 4
- start = time()
while a.data or b.data:
await asyncio.sleep(0.01)
- assert time() < start + 30
def test_compute_sync(client):
Index: distributed-2021.7.0/distributed/tests/test_as_completed.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_as_completed.py
+++ distributed-2021.7.0/distributed/tests/test_as_completed.py
@@ -14,7 +14,7 @@ from distributed.utils_test import gen_c
@gen_cluster(client=True)
-async def test__as_completed(c, s, a, b):
+async def test_as_completed_async(c, s, a, b):
x = c.submit(inc, 1)
y = c.submit(inc, 1)
z = c.submit(inc, 2)
@@ -29,7 +29,7 @@ async def test__as_completed(c, s, a, b)
assert result in [x, y, z]
-def test_as_completed(client):
+def test_as_completed_sync(client):
x = client.submit(inc, 1)
y = client.submit(inc, 2)
z = client.submit(inc, 1)
@@ -201,7 +201,6 @@ async def test_as_completed_with_results
assert str(exc.value) == "hello!"
-@pytest.mark.flaky(reruns=10, reruns_delay=5)
def test_as_completed_with_results_no_raise(client):
x = client.submit(throws, 1)
y = client.submit(inc, 5)
@@ -260,7 +259,7 @@ async def test_as_completed_with_results
assert dd[z][0] == 2
-@gen_cluster(client=True, timeout=None)
+@gen_cluster(client=True)
async def test_clear(c, s, a, b):
futures = c.map(inc, range(3))
ac = as_completed(futures)
Index: distributed-2021.7.0/distributed/tests/test_asyncprocess.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_asyncprocess.py
+++ distributed-2021.7.0/distributed/tests/test_asyncprocess.py
@@ -8,6 +8,7 @@ import weakref
from datetime import timedelta
from time import sleep
+import psutil
import pytest
from tornado import gen
from tornado.locks import Event
@@ -280,13 +281,9 @@ async def test_child_main_thread():
q._writer.close()
-@pytest.mark.skipif(
- sys.platform.startswith("win"), reason="num_fds not supported on windows"
-)
+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
@gen_test()
async def test_num_fds():
- psutil = pytest.importorskip("psutil")
-
# Warm up
proc = AsyncProcess(target=exit_now)
proc.daemon = True
@@ -303,11 +300,8 @@ async def test_num_fds():
assert not proc.is_alive()
assert proc.exitcode == 0
- start = time()
while p.num_fds() > before:
- await asyncio.sleep(0.1)
- print("fds:", before, p.num_fds())
- assert time() < start + 10
+ await asyncio.sleep(0.01)
@gen_test()
@@ -407,7 +401,7 @@ def test_asyncprocess_child_teardown_on_
try:
readable = children_alive.poll(short_timeout)
except BrokenPipeError:
- assert sys.platform.startswith("win"), "should only raise on windows"
+ assert WINDOWS, "should only raise on windows"
# Broken pipe implies closed, which is readable.
readable = True
@@ -422,7 +416,7 @@ def test_asyncprocess_child_teardown_on_
except EOFError:
pass # Test passes.
except BrokenPipeError:
- assert sys.platform.startswith("win"), "should only raise on windows"
+ assert WINDOWS, "should only raise on windows"
# Test passes.
else:
# Oops, children_alive read something. It should be closed. If
Index: distributed-2021.7.0/distributed/tests/test_client.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_client.py
+++ distributed-2021.7.0/distributed/tests/test_client.py
@@ -55,7 +55,7 @@ from distributed.client import (
wait,
)
from distributed.comm import CommClosedError
-from distributed.compatibility import MACOS, WINDOWS
+from distributed.compatibility import LINUX, WINDOWS
from distributed.core import Status
from distributed.metrics import time
from distributed.objects import HasWhat, WhoHas
@@ -95,7 +95,7 @@ from distributed.utils_test import (
)
-@gen_cluster(client=True, timeout=None)
+@gen_cluster(client=True)
async def test_submit(c, s, a, b):
x = c.submit(inc, 10)
assert not x.done()
@@ -624,7 +624,7 @@ async def test_limit_concurrent_gatherin
assert len(a.outgoing_transfer_log) + len(b.outgoing_transfer_log) < 100
-@gen_cluster(client=True, timeout=None)
+@gen_cluster(client=True)
async def test_get(c, s, a, b):
future = c.get({"x": (inc, 1)}, "x", sync=False)
assert isinstance(future, Future)
@@ -717,7 +717,7 @@ async def test_wait_first_completed(c, s
assert y.status == "pending"
-@gen_cluster(client=True, timeout=2)
+@gen_cluster(client=True)
async def test_wait_timeout(c, s, a, b):
future = c.submit(sleep, 0.3)
with pytest.raises(TimeoutError):
@@ -794,7 +794,7 @@ async def test_garbage_collection_with_s
await asyncio.sleep(0.1)
-@gen_cluster(timeout=1000, client=True)
+@gen_cluster(client=True)
async def test_recompute_released_key(c, s, a, b):
x = c.submit(inc, 100)
result1 = await x
@@ -907,9 +907,7 @@ async def test_tokenize_on_futures(c, s,
assert tok == tokenize(y)
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
async def test_restrictions_submit(c, s, a, b):
x = c.submit(inc, 1, workers={a.ip})
@@ -936,9 +934,7 @@ async def test_restrictions_ip_port(c, s
assert y.key in b.data
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
async def test_restrictions_map(c, s, a, b):
L = c.map(inc, range(5), workers={a.ip})
@@ -950,9 +946,7 @@ async def test_restrictions_map(c, s, a,
assert s.host_restrictions[x.key] == {a.ip}
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
async def test_restrictions_get(c, s, a, b):
dsk = {"x": 1, "y": (inc, "x"), "z": (inc, "y")}
@@ -991,7 +985,7 @@ async def dont_test_bad_restrictions_rai
assert z.key in str(e)
-@gen_cluster(client=True, timeout=None)
+@gen_cluster(client=True)
async def test_remove_worker(c, s, a, b):
L = c.map(inc, range(20))
await wait(L)
@@ -1280,9 +1274,7 @@ async def test_get_nbytes(c, s, a, b):
assert s.get_nbytes(summary=False) == {x.key: sizeof(1), y.key: sizeof(2)}
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
async def test_nbytes_determines_worker(c, s, a, b):
x = c.submit(identity, 1, workers=[a.ip])
@@ -1455,7 +1447,7 @@ async def test_scatter_direct_empty(c, s
await c.scatter(123, direct=True, timeout=0.1)
-@gen_cluster(client=True, timeout=None, nthreads=[("127.0.0.1", 1)] * 5)
+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 5)
async def test_scatter_direct_spread_evenly(c, s, *workers):
futures = []
for i in range(10):
@@ -1799,7 +1791,7 @@ async def test_remote_scatter_gather(c,
assert (xx, yy, zz) == (1, 2, 3)
-@gen_cluster(timeout=1000, client=True)
+@gen_cluster(client=True)
async def test_remote_submit_on_Future(c, s, a, b):
x = c.submit(lambda x: x + 1, 1)
y = c.submit(lambda x: x + 1, x)
@@ -1836,9 +1828,7 @@ async def test_client_with_scheduler(c,
assert result == 12
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
async def test_allow_restrictions(c, s, a, b):
aws = s.workers[a.address]
@@ -1971,7 +1961,7 @@ async def test_badly_serialized_input(c,
assert "hello!" in str(info.value)
-@pytest.mark.skipif("True", reason="")
+@pytest.mark.skip
async def test_badly_serialized_input_stderr(capsys, c):
o = BadlySerializedObject()
future = c.submit(inc, o)
@@ -2853,7 +2843,6 @@ async def test_persist_get(c, s, a, b):
@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
def test_client_num_fds(loop):
- psutil = pytest.importorskip("psutil")
with cluster() as (s, [a, b]):
proc = psutil.Process()
with Client(s["address"], loop=loop) as c: # first client to start loop
@@ -2864,7 +2853,7 @@ def test_client_num_fds(loop):
start = time()
while proc.num_fds() > before:
sleep(0.01)
- assert time() < start + 4
+ assert time() < start + 10, (before, proc.num_fds())
@gen_cluster()
@@ -3035,9 +3024,7 @@ async def test_receive_lost_key(c, s, a,
await asyncio.sleep(0.01)
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
async def test_unrunnable_task_runs(c, s, a, b):
x = c.submit(inc, 1, workers=[a.ip])
@@ -3073,9 +3060,7 @@ async def test_add_worker_after_tasks(c,
await n.close()
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
async def test_workers_register_indirect_data(c, s, a, b):
[x] = await c.scatter([1], workers=a.address)
@@ -3205,13 +3190,10 @@ async def test_client_replicate(c, s, *w
assert len(s.tasks[y.key].who_has) == 10
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1), ("127.0.0.2", 1), ("127.0.0.2", 1)],
- timeout=None,
)
async def test_client_replicate_host(client, s, a, b, c):
aws = s.workers[a.address]
@@ -3565,7 +3547,7 @@ def test_get_returns_early(c):
@pytest.mark.slow
-@gen_cluster(Worker=Nanny, client=True)
+@gen_cluster(Worker=Nanny, client=True, timeout=60)
async def test_Client_clears_references_after_restart(c, s, a, b):
x = c.submit(inc, 1)
assert x.key in c.refcount
@@ -3789,7 +3771,6 @@ async def test_reconnect_timeout(c, s):
@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
@pytest.mark.parametrize("worker,count,repeat", [(Worker, 100, 5), (Nanny, 10, 20)])
def test_open_close_many_workers(loop, worker, count, repeat):
- psutil = pytest.importorskip("psutil")
proc = psutil.Process()
with cluster(nworkers=0, active_rpc_timeout=2) as (s, _):
@@ -3857,7 +3838,7 @@ def test_open_close_many_workers(loop, w
raise ValueError("File descriptors did not clean up")
-@gen_cluster(client=False, timeout=None)
+@gen_cluster()
async def test_idempotence(s, a, b):
c = await Client(s.address, asynchronous=True)
f = await Client(s.address, asynchronous=True)
@@ -4070,7 +4051,7 @@ async def test_scatter_compute_store_los
assert z.status == "cancelled"
-@gen_cluster(client=False)
+@gen_cluster()
async def test_serialize_future(s, a, b):
c1 = await Client(s.address, asynchronous=True)
c2 = await Client(s.address, asynchronous=True)
@@ -4091,7 +4072,7 @@ async def test_serialize_future(s, a, b)
await c2.close()
-@gen_cluster(client=False)
+@gen_cluster()
async def test_temp_default_client(s, a, b):
c1 = await Client(s.address, asynchronous=True)
c2 = await Client(s.address, asynchronous=True)
@@ -4172,7 +4153,7 @@ def test_as_current_is_thread_local(s):
t2.join()
-@gen_cluster(client=False)
+@gen_cluster()
async def test_as_current_is_task_local(s, a, b):
l1 = asyncio.Lock()
l2 = asyncio.Lock()
@@ -4557,7 +4538,7 @@ def assert_no_data_loss(scheduler):
assert not (k == key and v == "waiting")
-@gen_cluster(client=True, timeout=None)
+@gen_cluster(client=True)
async def test_interleave_computations(c, s, a, b):
import distributed
@@ -4592,7 +4573,7 @@ async def test_interleave_computations(c
@pytest.mark.skip(reason="Now prefer first-in-first-out")
-@gen_cluster(client=True, timeout=None)
+@gen_cluster(client=True)
async def test_interleave_computations_map(c, s, a, b):
xs = c.map(slowinc, range(30), delay=0.02)
ys = c.map(slowdec, xs, delay=0.02)
@@ -4621,7 +4602,6 @@ async def test_scatter_dict_workers(c, s
assert "a" in a.data or "a" in b.data
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS)
@pytest.mark.slow
@gen_test()
async def test_client_timeout():
@@ -4629,18 +4609,17 @@ async def test_client_timeout():
s = Scheduler(loop=c.loop, port=57484)
await asyncio.sleep(4)
+
try:
await s
except OSError: # port in use
await c.close()
return
- start = time()
- await c
try:
- assert time() < start + 2
- finally:
+ await c
await c.close()
+ finally:
await s.close()
@@ -5167,7 +5146,7 @@ async def test_serialize_collections(c,
assert result == sum(range(10))
-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 1, timeout=100)
+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 1)
async def test_secede_simple(c, s, a):
def f():
client = get_client()
@@ -5178,7 +5157,6 @@ async def test_secede_simple(c, s, a):
assert result == 2
-@pytest.mark.flaky(reruns=10, reruns_delay=5)
@pytest.mark.slow
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, timeout=60)
async def test_secede_balances(c, s, a, b):
@@ -5276,18 +5254,15 @@ def _dynamic_workload(x, delay=0.01):
return total.result()
-def _test_dynamic_workloads_sync(c, delay):
- future = c.submit(_dynamic_workload, 0, delay=delay)
- assert future.result(timeout=40) == 52
-
-
def test_dynamic_workloads_sync(c):
- _test_dynamic_workloads_sync(c, delay=0.02)
+ future = c.submit(_dynamic_workload, 0, delay=0.02)
+ assert future.result(timeout=20) == 52
@pytest.mark.slow
def test_dynamic_workloads_sync_random(c):
- _test_dynamic_workloads_sync(c, delay="random")
+ future = c.submit(_dynamic_workload, 0, delay="random")
+ assert future.result(timeout=20) == 52
@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@@ -5424,6 +5399,7 @@ async def test_call_stack_collections_al
assert result
+@pytest.mark.flaky(condition=WINDOWS, reruns=10, reruns_delay=5)
@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
async def test_profile(c, s, a, b):
futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
@@ -5843,9 +5819,15 @@ async def test_scatter_error_cancel(c, s
assert y.status == "error" # not cancelled
+@pytest.mark.xfail(reason="GH#5409 Dask-Default-Threads are frequently detected")
def test_no_threads_lingering():
+ if threading.active_count() < 40:
+ return
active = dict(threading._active)
- assert threading.active_count() < 40, list(active.values())
+ print(f"==== Found {len(active)} active threads: ====")
+ for t in active.values():
+ print(t)
+ assert False
@gen_cluster()
@@ -6071,18 +6053,18 @@ async def test_file_descriptors_dont_lea
df = dask.datasets.timeseries(freq="10s", dtypes={"x": int, "y": float})
proc = psutil.Process()
- start = proc.num_fds()
+ before = proc.num_fds()
async with Scheduler(port=0, dashboard_address=":0") as s:
- async with Worker(s.address, nthreads=2) as a, Worker(
- s.address, nthreads=2
- ) as b:
- async with Client(s.address, asynchronous=True) as c:
- await df.sum().persist()
+ async with Worker(s.address), Worker(s.address), Client(
+ s.address, asynchronous=True
+ ):
+ assert proc.num_fds() > before
+ await df.sum().persist()
- begin = time()
- while proc.num_fds() > begin:
+ start = time()
+ while proc.num_fds() > before:
await asyncio.sleep(0.01)
- assert time() < begin + 5, (start, proc.num_fds())
+ assert time() < start + 10, (before, proc.num_fds())
@pytest.mark.asyncio
Index: distributed-2021.7.0/distributed/tests/test_client_executor.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_client_executor.py
+++ distributed-2021.7.0/distributed/tests/test_client_executor.py
@@ -13,6 +13,7 @@ import pytest
from tlz import take
from distributed import Client
+from distributed.compatibility import MACOS
from distributed.utils import CancelledError
from distributed.utils_test import (
cluster,
@@ -93,13 +94,12 @@ def test_wait(client):
assert "hello" in str(errors[0])
-@pytest.mark.flaky(reruns=10, reruns_delay=5)
def test_cancellation(client):
with client.get_executor(pure=False) as e:
fut = e.submit(time.sleep, 2.0)
start = time.time()
while number_of_processing_tasks(client) == 0:
- assert time.time() < start + 1
+ assert time.time() < start + 10
time.sleep(0.01)
assert not fut.done()
@@ -107,7 +107,7 @@ def test_cancellation(client):
assert fut.cancelled()
start = time.time()
while number_of_processing_tasks(client) != 0:
- assert time.time() < start + 1
+ assert time.time() < start + 10
time.sleep(0.01)
with pytest.raises(CancelledError):
@@ -118,7 +118,7 @@ def test_cancellation(client):
N = 10
fs = [e.submit(slowinc, i, delay=0.02) for i in range(N)]
fs[3].cancel()
- res = wait(fs, return_when=FIRST_COMPLETED)
+ res = wait(fs, return_when=FIRST_COMPLETED, timeout=30)
assert len(res.not_done) > 0
assert len(res.done) >= 1
@@ -132,10 +132,11 @@ def test_cancellation(client):
fs[3].cancel()
fs[8].cancel()
- n_cancelled = sum(f.cancelled() for f in as_completed(fs))
+ n_cancelled = sum(f.cancelled() for f in as_completed(fs, timeout=30))
assert n_cancelled == 2
+@pytest.mark.flaky(condition=MACOS, reruns=10, reruns_delay=5)
def test_map(client):
with client.get_executor() as e:
N = 10
Index: distributed-2021.7.0/distributed/tests/test_collections.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_collections.py
+++ distributed-2021.7.0/distributed/tests/test_collections.py
@@ -40,7 +40,7 @@ def assert_equal(a, b):
assert a == b
-@gen_cluster(timeout=240, client=True)
+@gen_cluster(client=True)
async def test_dataframes(c, s, a, b):
df = pd.DataFrame(
{"x": np.random.random(1000), "y": np.random.random(1000)},
Index: distributed-2021.7.0/distributed/tests/test_diskutils.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_diskutils.py
+++ distributed-2021.7.0/distributed/tests/test_diskutils.py
@@ -12,7 +12,7 @@ import pytest
import dask
-from distributed.compatibility import MACOS, WINDOWS
+from distributed.compatibility import MACOS
from distributed.diskutils import WorkSpace
from distributed.metrics import time
from distributed.utils import mp_context
@@ -273,16 +273,14 @@ def _test_workspace_concurrency(tmpdir,
return n_created, n_purged
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS)
@pytest.mark.slow
+@pytest.mark.xfail(condition=MACOS, reason="extremely flaky")
def test_workspace_concurrency(tmpdir):
- if WINDOWS:
- raise pytest.xfail.Exception("TODO: unknown failure on windows")
_test_workspace_concurrency(tmpdir, 5.0, 6)
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS)
@pytest.mark.slow
+@pytest.mark.xfail(condition=MACOS, reason="extremely flaky")
def test_workspace_concurrency_intense(tmpdir):
n_created, n_purged = _test_workspace_concurrency(tmpdir, 8.0, 16)
assert n_created >= 100
Index: distributed-2021.7.0/distributed/tests/test_failed_workers.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_failed_workers.py
+++ distributed-2021.7.0/distributed/tests/test_failed_workers.py
@@ -75,12 +75,7 @@ def test_gather_after_failed_worker(loop
assert result == list(map(inc, range(10)))
-@gen_cluster(
- client=True,
- Worker=Nanny,
- nthreads=[("127.0.0.1", 1)] * 4,
- config={"distributed.comm.timeouts.connect": "1s"},
-)
+@gen_cluster(client=True, Worker=Nanny, nthreads=[("127.0.0.1", 1)] * 4)
async def test_gather_then_submit_after_failed_workers(c, s, w, x, y, z):
L = c.map(inc, range(20))
await wait(L)
@@ -88,7 +83,7 @@ async def test_gather_then_submit_after_
w.process.process._process.terminate()
total = c.submit(sum, L)
- for i in range(3):
+ for _ in range(3):
await wait(total)
addr = first(s.tasks[total.key].who_has).address
for worker in [x, y, z]:
@@ -101,7 +96,7 @@ async def test_gather_then_submit_after_
@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
-@gen_cluster(Worker=Nanny, timeout=60, client=True)
+@gen_cluster(Worker=Nanny, client=True, timeout=60)
async def test_failed_worker_without_warning(c, s, a, b):
L = c.map(inc, range(10))
await wait(L)
@@ -298,18 +293,20 @@ async def test_multiple_clients_restart(
await c2.close()
-@pytest.mark.flaky(reruns=10, reruns_timeout=5, condition=MACOS)
@gen_cluster(Worker=Nanny, timeout=60)
async def test_restart_scheduler(s, a, b):
- import gc
+ assert len(s.nthreads) == 2
+ pids = (a.pid, b.pid)
+ assert pids[0]
+ assert pids[1]
- gc.collect()
- addrs = (a.worker_address, b.worker_address)
await s.restart()
- assert len(s.nthreads) == 2
- addrs2 = (a.worker_address, b.worker_address)
- assert addrs != addrs2
+ assert len(s.nthreads) == 2
+ pids2 = (a.pid, b.pid)
+ assert pids2[0]
+ assert pids2[1]
+ assert pids != pids2
@gen_cluster(Worker=Nanny, client=True, timeout=60)
@@ -325,6 +322,8 @@ async def test_forgotten_futures_dont_cl
await y
+@pytest.mark.slow
+@pytest.mark.flaky(condition=MACOS, reruns=10, reruns_delay=5)
@gen_cluster(client=True, timeout=60, active_rpc_timeout=10)
async def test_broken_worker_during_computation(c, s, a, b):
s.allowed_failures = 100
@@ -404,14 +403,13 @@ class SlowTransmitData:
return parse_bytes(dask.config.get("distributed.comm.offload")) + 1
+@pytest.mark.flaky(reruns=10, reruns_delay=5)
@gen_cluster(client=True)
async def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
n = await Nanny(s.address, nthreads=2, loop=s.loop)
- start = time()
while len(s.nthreads) < 3:
await asyncio.sleep(0.01)
- assert time() < start + 5
def slow_ser(x, delay):
return SlowTransmitData(x, delay=delay)
@@ -497,7 +495,7 @@ async def test_restart_timeout_on_long_r
with captured_logger("distributed.scheduler") as sio:
future = c.submit(sleep, 3600)
await asyncio.sleep(0.1)
- await c.restart(timeout=20)
+ await c.restart()
text = sio.getvalue()
assert "timeout" not in text.lower()
@@ -547,7 +545,7 @@ class SlowDeserialize:
return parse_bytes(dask.config.get("distributed.comm.offload")) + 1
-@gen_cluster(client=True, timeout=None)
+@gen_cluster(client=True)
async def test_handle_superfluous_data(c, s, a, b):
"""
See https://github.com/dask/distributed/pull/4784#discussion_r649210094
Index: distributed-2021.7.0/distributed/tests/test_nanny.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_nanny.py
+++ distributed-2021.7.0/distributed/tests/test_nanny.py
@@ -4,9 +4,10 @@ import logging
import multiprocessing as mp
import os
import random
-import sys
from contextlib import suppress
+from time import sleep
+import psutil
import pytest
from tlz import first, valmap
from tornado.ioloop import IOLoop
@@ -14,6 +15,7 @@ from tornado.ioloop import IOLoop
import dask
from distributed import Client, Nanny, Scheduler, Worker, rpc, wait, worker
+from distributed.compatibility import LINUX, WINDOWS
from distributed.core import CommClosedError, Status
from distributed.diagnostics import SchedulerPlugin
from distributed.metrics import time
@@ -72,12 +74,11 @@ async def test_str(s, a, b):
@gen_cluster(nthreads=[], client=True)
async def test_nanny_process_failure(c, s):
- n = await Nanny(s.address, nthreads=2, loop=s.loop)
+ n = await Nanny(s.address, nthreads=2)
first_dir = n.worker_dir
assert os.path.exists(first_dir)
- original_address = n.worker_address
ww = rpc(n.worker_address)
await ww.update_data(data=valmap(dumps, {"x": 1, "y": 2}))
pid = n.pid
@@ -85,23 +86,17 @@ async def test_nanny_process_failure(c,
with suppress(CommClosedError):
await c.run(os._exit, 0, workers=[n.worker_address])
- start = time()
while n.pid == pid: # wait while process dies and comes back
await asyncio.sleep(0.01)
- assert time() - start < 5
- start = time()
await asyncio.sleep(1)
while not n.is_alive(): # wait while process comes back
await asyncio.sleep(0.01)
- assert time() - start < 5
# assert n.worker_address != original_address # most likely
- start = time()
while n.worker_address not in s.nthreads or n.worker_dir is None:
await asyncio.sleep(0.01)
- assert time() - start < 5
second_dir = n.worker_dir
@@ -115,7 +110,6 @@ async def test_nanny_process_failure(c,
@gen_cluster(nthreads=[])
async def test_run(s):
- pytest.importorskip("psutil")
n = await Nanny(s.address, nthreads=2, loop=s.loop)
with rpc(n.address) as nn:
@@ -173,7 +167,7 @@ async def test_nanny_alt_worker_class(c,
@pytest.mark.slow
-@gen_cluster(client=False, nthreads=[])
+@gen_cluster(nthreads=[])
async def test_nanny_death_timeout(s):
await s.close()
w = Nanny(s.address, death_timeout=1)
@@ -198,12 +192,9 @@ async def test_random_seed(c, s, a, b):
await check_func(lambda a, b: np.random.randint(a, b))
-@pytest.mark.skipif(
- sys.platform.startswith("win"), reason="num_fds not supported on windows"
-)
-@gen_cluster(client=False, nthreads=[])
+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
+@gen_cluster(nthreads=[])
async def test_num_fds(s):
- psutil = pytest.importorskip("psutil")
proc = psutil.Process()
# Warm up
@@ -219,16 +210,12 @@ async def test_num_fds(s):
await asyncio.sleep(0.1)
await w.close()
- start = time()
while proc.num_fds() > before:
print("fds:", before, proc.num_fds())
await asyncio.sleep(0.1)
- assert time() < start + 10
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster(client=True, nthreads=[])
async def test_worker_uses_same_host_as_nanny(c, s):
for host in ["tcp://0.0.0.0", "tcp://127.0.0.2"]:
@@ -273,29 +260,24 @@ async def test_nanny_timeout(c, s, a):
nthreads=[("127.0.0.1", 1)],
client=True,
Worker=Nanny,
- worker_kwargs={"memory_limit": 1e8},
- timeout=20,
- clean_kwargs={"threads": False},
+ worker_kwargs={"memory_limit": "400 MiB"},
)
async def test_nanny_terminate(c, s, a):
- from time import sleep
-
def leak():
L = []
while True:
- L.append(b"0" * 5000000)
+ L.append(b"0" * 5_000_000)
sleep(0.01)
- proc = a.process.pid
+ before = a.process.pid
with captured_logger(logging.getLogger("distributed.nanny")) as logger:
future = c.submit(leak)
- start = time()
- while a.process.pid == proc:
- await asyncio.sleep(0.1)
- assert time() < start + 10
- out = logger.getvalue()
- assert "restart" in out.lower()
- assert "memory" in out.lower()
+ while a.process.pid == before:
+ await asyncio.sleep(0.01)
+
+ out = logger.getvalue()
+ assert "restart" in out.lower()
+ assert "memory" in out.lower()
@gen_cluster(
Index: distributed-2021.7.0/distributed/tests/test_publish.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_publish.py
+++ distributed-2021.7.0/distributed/tests/test_publish.py
@@ -11,7 +11,7 @@ from distributed.protocol import Seriali
from distributed.utils_test import gen_cluster, inc
-@gen_cluster(client=False)
+@gen_cluster()
async def test_publish_simple(s, a, b):
c = Client(s.address, asynchronous=True)
f = Client(s.address, asynchronous=True)
@@ -37,7 +37,7 @@ async def test_publish_simple(s, a, b):
await asyncio.gather(c.close(), f.close())
-@gen_cluster(client=False)
+@gen_cluster()
async def test_publish_non_string_key(s, a, b):
async with Client(s.address, asynchronous=True) as c:
for name in [("a", "b"), 9.0, 8]:
@@ -52,7 +52,7 @@ async def test_publish_non_string_key(s,
assert name in datasets
-@gen_cluster(client=False)
+@gen_cluster()
async def test_publish_roundtrip(s, a, b):
c = await Client(s.address, asynchronous=True)
f = await Client(s.address, asynchronous=True)
@@ -147,7 +147,7 @@ def test_unpublish_multiple_datasets_syn
assert "y" in str(exc_info.value)
-@gen_cluster(client=False)
+@gen_cluster()
async def test_publish_bag(s, a, b):
db = pytest.importorskip("dask.bag")
c = await Client(s.address, asynchronous=True)
Index: distributed-2021.7.0/distributed/tests/test_pubsub.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_pubsub.py
+++ distributed-2021.7.0/distributed/tests/test_pubsub.py
@@ -10,7 +10,7 @@ from distributed.metrics import time
from distributed.utils_test import gen_cluster
-@gen_cluster(client=True, timeout=None)
+@gen_cluster(client=True)
async def test_speed(c, s, a, b):
"""
This tests how quickly we can move messages back and forth
Index: distributed-2021.7.0/distributed/tests/test_queues.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_queues.py
+++ distributed-2021.7.0/distributed/tests/test_queues.py
@@ -110,7 +110,7 @@ def test_picklability_sync(client):
@pytest.mark.slow
-@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny, timeout=None)
+@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny)
async def test_race(c, s, *workers):
def f(i):
with worker_client() as c:
Index: distributed-2021.7.0/distributed/tests/test_resources.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_resources.py
+++ distributed-2021.7.0/distributed/tests/test_resources.py
@@ -9,7 +9,6 @@ from dask.utils import stringify
from distributed import Worker
from distributed.client import wait
-from distributed.compatibility import WINDOWS
from distributed.utils_test import gen_cluster, inc, slowadd, slowinc
@@ -378,9 +377,7 @@ async def test_full_collections(c, s, a,
reason="don't track resources through optimization"
),
),
- pytest.param(
- False, marks=pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS)
- ),
+ False,
],
)
def test_collections_get(client, optimize_graph, s, a, b):
Index: distributed-2021.7.0/distributed/tests/test_scheduler.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_scheduler.py
+++ distributed-2021.7.0/distributed/tests/test_scheduler.py
@@ -12,6 +12,7 @@ from time import sleep
from unittest import mock
import cloudpickle
+import psutil
import pytest
from tlz import concat, first, frequencies, merge, valmap
@@ -21,7 +22,7 @@ from dask.utils import apply, parse_time
from distributed import Client, Nanny, Worker, fire_and_forget, wait
from distributed.comm import Comm
-from distributed.compatibility import MACOS, WINDOWS
+from distributed.compatibility import LINUX, WINDOWS
from distributed.core import ConnectionPool, Status, connect, rpc
from distributed.metrics import time
from distributed.protocol.pickle import dumps
@@ -618,7 +619,7 @@ async def test_ready_remove_worker(s, a,
assert all(len(w.processing) > w.nthreads for w in s.workers.values())
-@gen_cluster(client=True, Worker=Nanny)
+@gen_cluster(client=True, Worker=Nanny, timeout=60)
async def test_restart(c, s, a, b):
futures = c.map(inc, range(20))
await wait(futures)
@@ -742,24 +743,17 @@ async def test_config_stealing(cleanup):
assert "stealing" not in s.extensions
-@pytest.mark.skipif(
- sys.platform.startswith("win"), reason="file descriptors not really a thing"
-)
+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
@gen_cluster(nthreads=[])
async def test_file_descriptors_dont_leak(s):
- psutil = pytest.importorskip("psutil")
proc = psutil.Process()
before = proc.num_fds()
- w = await Worker(s.address)
- await w.close()
+ async with Worker(s.address):
+ assert proc.num_fds() > before
- during = proc.num_fds()
-
- start = time()
while proc.num_fds() > before:
await asyncio.sleep(0.01)
- assert time() < start + 5
@gen_cluster()
@@ -827,7 +821,7 @@ async def test_scheduler_sees_memory_lim
await w.close()
-@gen_cluster(client=True, timeout=1000)
+@gen_cluster(client=True)
async def test_retire_workers(c, s, a, b):
[x] = await c.scatter([1], workers=a.address)
[y] = await c.scatter([list(range(1000))], workers=b.address)
@@ -929,13 +923,10 @@ async def test_retire_workers_no_suspici
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.platform.startswith("win"), reason="file descriptors not really a thing"
-)
-@gen_cluster(client=True, nthreads=[], timeout=240)
+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
+@gen_cluster(client=True, nthreads=[], timeout=60)
async def test_file_descriptors(c, s):
await asyncio.sleep(0.1)
- psutil = pytest.importorskip("psutil")
da = pytest.importorskip("dask.array")
proc = psutil.Process()
num_fds_1 = proc.num_fds()
@@ -980,10 +971,8 @@ async def test_file_descriptors(c, s):
assert comm.closed() or comm.peer_address != s.address, comm
assert not s.stream_comms
- start = time()
while proc.num_fds() > num_fds_1 + N:
await asyncio.sleep(0.01)
- assert time() < start + 3
@pytest.mark.slow
@@ -1325,36 +1314,30 @@ async def test_non_existent_worker(c, s)
async def test_correct_bad_time_estimate(c, s, *workers):
future = c.submit(slowinc, 1, delay=0)
await wait(future)
-
futures = [c.submit(slowinc, future, delay=0.1, pure=False) for i in range(20)]
-
await asyncio.sleep(0.5)
-
await wait(futures)
-
assert all(w.data for w in workers), [sorted(w.data) for w in workers]
-@gen_test()
-async def test_service_hosts():
- port = 0
- for url, expected in [
- ("tcp://0.0.0.0", ("::", "0.0.0.0")),
- ("tcp://127.0.0.1", ("::", "0.0.0.0")),
- ("tcp://127.0.0.1:38275", ("::", "0.0.0.0")),
- ]:
- async with Scheduler(host=url) as s:
- sock = first(s.http_server._sockets.values())
- if isinstance(expected, tuple):
- assert sock.getsockname()[0] in expected
- else:
- assert sock.getsockname()[0] == expected
-
- port = ("127.0.0.1", 0)
- for url in ["tcp://0.0.0.0", "tcp://127.0.0.1", "tcp://127.0.0.1:38275"]:
- async with Scheduler(dashboard_address="127.0.0.1:0", host=url) as s:
- sock = first(s.http_server._sockets.values())
- assert sock.getsockname()[0] == "127.0.0.1"
+@pytest.mark.parametrize(
+ "host", ["tcp://0.0.0.0", "tcp://127.0.0.1", "tcp://127.0.0.1:38275"]
+)
+@pytest.mark.parametrize(
+ "dashboard_address,expect",
+ [
+ (None, ("::", "0.0.0.0")),
+ ("127.0.0.1:0", ("127.0.0.1",)),
+ ],
+)
+@pytest.mark.asyncio
+async def test_dashboard_host(host, dashboard_address, expect):
+ """Dashboard is accessible from any host by default, but it can be also bound to
+ localhost.
+ """
+ async with Scheduler(host=host, dashboard_address=dashboard_address) as s:
+ sock = first(s.http_server._sockets.values())
+ assert sock.getsockname()[0] in expect
@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
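The parametrization above encodes the binding rule: with dashboard_address=None the dashboard listens on all interfaces ("::" or "0.0.0.0"), while an explicit "127.0.0.1:0" restricts it to localhost. A minimal standalone sketch of the same check, not part of the patch; it assumes a local distributed installation and reuses the first helper from tlz:

    import asyncio
    from tlz import first
    from distributed import Scheduler

    async def main():
        # "127.0.0.1:0" restricts the dashboard to localhost on a random port;
        # dashboard_address=None would bind it to all interfaces instead.
        async with Scheduler(host="tcp://127.0.0.1", dashboard_address="127.0.0.1:0") as s:
            sock = first(s.http_server._sockets.values())
            print(sock.getsockname())  # e.g. ("127.0.0.1", 43210)

    asyncio.run(main())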
@@ -1543,9 +1526,6 @@ async def test_retries(c, s, a, b):
exc_info.match("one")
-@pytest.mark.flaky(
- reruns=10, reruns_delay=5, reason="second worker also errant for some reason"
-)
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_missing_data_errant_worker(c, s, w1, w2, w3):
with dask.config.set({"distributed.comm.timeouts.connect": "1s"}):
@@ -1779,7 +1759,7 @@ async def test_bandwidth(c, s, a, b):
assert not s.bandwidth_workers
-@gen_cluster(client=True, Worker=Nanny)
+@gen_cluster(client=True, Worker=Nanny, timeout=60)
async def test_bandwidth_clear(c, s, a, b):
np = pytest.importorskip("numpy")
x = c.submit(np.arange, 1000000, workers=[a.worker_address], pure=False)
@@ -1821,9 +1801,7 @@ async def test_close_workers(s, a, b):
assert b.status == Status.closed
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_test()
async def test_host_address():
s = await Scheduler(host="127.0.0.2", port=0)
@@ -2361,7 +2339,7 @@ async def test_unknown_task_duration_con
assert s.idle_since == s.time_started
-@gen_cluster(client=True, timeout=None)
+@gen_cluster(client=True)
async def test_retire_state_change(c, s, a, b):
np = pytest.importorskip("numpy")
y = c.map(lambda x: x ** 2, range(10))
@@ -2482,8 +2460,8 @@ async def assert_memory(scheduler_or_wor
t0 = time()
while True:
minfo = scheduler_or_workerstate.memory
- nbytes = getattr(minfo, attr)
- if min_ * 2 ** 20 <= nbytes <= max_ * 2 ** 20:
+ nmib = getattr(minfo, attr) / 2 ** 20
+ if min_ <= nmib <= max_:
return
if time() - t0 > timeout:
raise TimeoutError(
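The hunk above switches assert_memory to compare MiB rather than raw bytes, so callers pass human-scale bounds. For illustration only, a self-contained sketch of the same poll-until-in-range idea; read_mib is a hypothetical callable, not part of distributed:

    import asyncio
    from time import monotonic

    async def wait_for_range(read_mib, min_, max_, timeout=10.0):
        # Poll a MiB-valued measurement until it falls inside [min_, max_],
        # raising if the deadline passes first (mirrors assert_memory above).
        t0 = monotonic()
        while True:
            nmib = read_mib()
            if min_ <= nmib <= max_:
                return
            if monotonic() - t0 > timeout:
                raise TimeoutError(f"{nmib} MiB not in [{min_}, {max_}] after {timeout}s")
            await asyncio.sleep(0.1)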
@@ -2492,13 +2470,12 @@ async def assert_memory(scheduler_or_wor
await asyncio.sleep(0.1)
-# This test is heavily influenced by hard-to-control factors such as memory management
-# by the Python interpreter and the OS, so it occasionally glitches
-@pytest.mark.flaky(reruns=3, reruns_delay=5)
-# ~33s runtime, or distributed.memory.recent-to-old-time + 3s
+# ~31s runtime, or distributed.worker.memory.recent-to-old-time + 1s.
+# On Windows, it can take ~65s due to worker memory needing to stabilize first.
@pytest.mark.slow
+@pytest.mark.flaky(condition=LINUX, reason="see comments", reruns=10, reruns_delay=5)
@gen_cluster(
- client=True, Worker=Nanny, worker_kwargs={"memory_limit": "500 MiB"}, timeout=60
+ client=True, Worker=Nanny, worker_kwargs={"memory_limit": "500 MiB"}, timeout=120
)
async def test_memory(c, s, *_):
pytest.importorskip("zict")
@@ -2511,14 +2488,25 @@ async def test_memory(c, s, *_):
assert s_m0.managed == 0
assert a.memory.managed == 0
assert b.memory.managed == 0
- # When a worker first goes online, its RAM is immediately counted as
- # unmanaged_old
- await assert_memory(s, "unmanaged_recent", 0, 40, timeout=0)
- await assert_memory(a, "unmanaged_recent", 0, 20, timeout=0)
- await assert_memory(b, "unmanaged_recent", 0, 20, timeout=0)
- f1 = c.submit(leaking, 100, 50, 5, pure=False, workers=[a.name])
- f2 = c.submit(leaking, 100, 50, 5, pure=False, workers=[b.name])
+ # When a worker first goes online, its RAM is immediately counted as unmanaged_old.
+ # On Windows, however, there is somehow enough time between the worker start and
+ # this line for 2 heartbeats to fire, and the memory keeps growing substantially
+ # for a while. Sometimes there is a single heartbeat, but in the next test we then
+ # observe a large, unexplained increase in unmanaged_recent memory.
+ # Wait for the situation to stabilize.
+ if WINDOWS:
+ await asyncio.sleep(10)
+ initial_timeout = 40
+ else:
+ initial_timeout = 0
+
+ await assert_memory(s, "unmanaged_recent", 0, 40, timeout=initial_timeout)
+ await assert_memory(a, "unmanaged_recent", 0, 20, timeout=initial_timeout)
+ await assert_memory(b, "unmanaged_recent", 0, 20, timeout=initial_timeout)
+
+ f1 = c.submit(leaking, 100, 50, 10, pure=False, workers=[a.name])
+ f2 = c.submit(leaking, 100, 50, 10, pure=False, workers=[b.name])
await assert_memory(s, "unmanaged_recent", 300, 380)
await assert_memory(a, "unmanaged_recent", 150, 190)
await assert_memory(b, "unmanaged_recent", 150, 190)
@@ -2533,19 +2521,17 @@ async def test_memory(c, s, *_):
await assert_memory(b, "unmanaged_recent", 50, 90)
# Force the output of f1 and f2 to spill to disk.
- # With target=0.6 and memory_limit=500 MiB, we'll start spilling at 300 MiB
- # process memory per worker, or roughly after 3~7 rounds of the below depending
- # on how much RAM the interpreter is using.
+ # With spill=0.7 and memory_limit=500 MiB, we'll start spilling at 350 MiB of
+ # process memory per worker, which takes up to 20 iterations of the loop below
+ # depending on how much RAM the interpreter is using.
more_futs = []
- for _ in range(8):
- if s.memory.managed_spilled > 0:
- break
- more_futs += [
- c.submit(leaking, 20, 0, 0, pure=False, workers=[a.name]),
- c.submit(leaking, 20, 0, 0, pure=False, workers=[b.name]),
- ]
- await asyncio.sleep(2)
- await assert_memory(s, "managed_spilled", 1, 999)
+ while not s.memory.managed_spilled:
+ if a.memory.process < 0.7 * 500 * 2 ** 20:
+ more_futs.append(c.submit(leaking, 10, 0, 0, pure=False, workers=[a.name]))
+ if b.memory.process < 0.7 * 500 * 2 ** 20:
+ more_futs.append(c.submit(leaking, 10, 0, 0, pure=False, workers=[b.name]))
+ await wait(more_futs)
+ await asyncio.sleep(1)
# Wait for the spilling to finish. Note that this does not make the test take
# longer as we're waiting for recent-to-old-time anyway.
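To make the arithmetic in the comment above explicit, a tiny sketch; the 0.7 spill fraction and the 500 MiB limit are taken from the test configuration above:

    memory_limit = 500 * 2**20  # 500 MiB, per worker_kwargs above
    spill_fraction = 0.7        # per the comment in the test
    threshold_bytes = spill_fraction * memory_limit
    print(threshold_bytes / 2**20)  # 350.0 -> spilling starts at ~350 MiB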
@@ -2568,24 +2554,23 @@ async def test_memory(c, s, *_):
# transition into unmanaged_old
await c.run(gc.collect)
await assert_memory(s, "unmanaged_recent", 0, 90, timeout=40)
- await assert_memory(
- s,
- "unmanaged_old",
- orig_old + 90,
- # On MacOS, the process memory of the Python interpreter does not shrink as
- # fast as on Linux/Windows
- 9999 if MACOS else orig_old + 190,
- timeout=40,
- )
-
- # When the leaked memory is cleared, unmanaged and unmanaged_old drop
- # On MacOS, the process memory of the Python interpreter does not shrink as fast
- # as on Linux/Windows
- if not MACOS:
- await c.run(clear_leak)
- await assert_memory(s, "unmanaged", 0, orig_unmanaged + 95)
- await assert_memory(s, "unmanaged_old", 0, orig_old + 95)
- await assert_memory(s, "unmanaged_recent", 0, 90)
+ await assert_memory(s, "unmanaged_old", orig_old + 90, 9999, timeout=40)
+
+ # When the leaked memory is cleared, unmanaged and unmanaged_old drop.
+ # On MacOS and Windows, the process memory of the Python interpreter does not shrink
+ # as fast as on Linux. Note that this behaviour is heavily impacted by OS tweaks,
+ # meaning that what you observe on your local host may differ on CI.
+ # Even on Linux, this occasionally glitches; hence the flaky marker on
+ # this test.
+ if not LINUX:
+ return
+
+ orig_unmanaged = s.memory.unmanaged / 2 ** 20
+ orig_old = s.memory.unmanaged_old / 2 ** 20
+ await c.run(clear_leak)
+ await assert_memory(s, "unmanaged", 0, orig_unmanaged - 60)
+ await assert_memory(s, "unmanaged_old", 0, orig_old - 60)
+ await assert_memory(s, "unmanaged_recent", 0, 90)
@gen_cluster(client=True, worker_kwargs={"memory_limit": 0})
@@ -2825,23 +2810,20 @@ async def test_rebalance_no_limit(c, s,
worker_kwargs={"memory_limit": "1000 MiB"},
config={
"distributed.worker.memory.rebalance.measure": "managed",
- "distributed.worker.memory.rebalance.recipient-max": 0.4,
+ "distributed.worker.memory.rebalance.sender-min": 0.2,
+ "distributed.worker.memory.rebalance.recipient-max": 0.1,
},
)
async def test_rebalance_no_recipients(c, s, *_):
"""There are sender workers, but no recipient workers"""
a, b = s.workers
- futures = [
- c.submit(lambda: "x" * (400 * 2 ** 20), pure=False, workers=[a]), # 40%
- c.submit(lambda: "x" * (400 * 2 ** 20), pure=False, workers=[b]), # 40%
- ] + c.map(
- lambda _: "x" * (2 ** 21), range(100), workers=[a]
- ) # 20%
- await wait(futures)
- await assert_memory(s, "managed", 1000, 1001)
- await assert_ndata(c, {a: 101, b: 1})
+ fut_a = c.map(lambda _: "x" * (2 ** 20), range(250), workers=[a]) # 25%
+ fut_b = c.map(lambda _: "x" * (2 ** 20), range(100), workers=[b]) # 10%
+ await wait(fut_a + fut_b)
+ await assert_memory(s, "managed", 350, 351)
+ await assert_ndata(c, {a: 250, b: 100})
await s.rebalance()
- await assert_ndata(c, {a: 101, b: 1})
+ await assert_ndata(c, {a: 250, b: 100})
s.validate_state()
Index: distributed-2021.7.0/distributed/tests/test_semaphore.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_semaphore.py
+++ distributed-2021.7.0/distributed/tests/test_semaphore.py
@@ -11,7 +11,6 @@ from dask.distributed import Client
from distributed import Semaphore, fire_and_forget
from distributed.comm import Comm
-from distributed.compatibility import WINDOWS
from distributed.core import ConnectionPool
from distributed.metrics import time
from distributed.utils_test import captured_logger, cluster, gen_cluster, slowidentity
@@ -91,21 +90,17 @@ def test_timeout_sync(client):
@gen_cluster(
- client=True,
- timeout=20,
config={
"distributed.scheduler.locks.lease-validation-interval": "200ms",
"distributed.scheduler.locks.lease-timeout": "200ms",
},
)
-async def test_release_semaphore_after_timeout(c, s, a, b):
+async def test_release_semaphore_after_timeout(s, a, b):
sem = await Semaphore(name="x", max_leases=2)
await sem.acquire() # leases: 2 - 1 = 1
semB = await Semaphore(name="x", max_leases=2)
-
assert await semB.acquire() # leases: 1 - 1 = 0
-
assert not (await sem.acquire(timeout=0.01))
assert not (await semB.acquire(timeout=0.01))
@@ -114,9 +109,7 @@ async def test_release_semaphore_after_t
semB.refresh_callback.stop()
del semB
-
assert await sem.acquire(timeout=1)
-
assert not (await sem.acquire(timeout=0.1))
@@ -564,7 +557,6 @@ async def test_release_retry(c, s, a, b)
assert await semaphore.release() is True
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=WINDOWS)
@gen_cluster(
client=True,
config={
Index: distributed-2021.7.0/distributed/tests/test_steal.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_steal.py
+++ distributed-2021.7.0/distributed/tests/test_steal.py
@@ -2,7 +2,6 @@ import asyncio
import itertools
import logging
import random
-import sys
import weakref
from operator import mul
from time import sleep
@@ -13,6 +12,7 @@ from tlz import concat, sliding_window
import dask
from distributed import Nanny, Worker, wait, worker_client
+from distributed.compatibility import LINUX
from distributed.config import config
from distributed.metrics import time
from distributed.scheduler import key_split
@@ -33,9 +33,7 @@ setup_module = nodebug_setup_module
teardown_module = nodebug_teardown_module
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster(client=True, nthreads=[("127.0.0.1", 2), ("127.0.0.2", 2)])
async def test_work_stealing(c, s, a, b):
[x] = await c._scatter([1], workers=a.address)
@@ -144,7 +142,7 @@ async def test_steal_related_tasks(e, s,
assert nearby > 10
-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, timeout=1000)
+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10)
async def test_dont_steal_fast_tasks_compute_time(c, s, *workers):
def do_nothing(x, y=None):
pass
@@ -285,9 +283,7 @@ async def test_steal_worker_restrictions
assert len(wc.tasks) == 0
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.2", 1)])
async def test_dont_steal_host_restrictions(c, s, a, b):
future = c.submit(slowinc, 1, delay=0.10, workers=a.address)
@@ -306,9 +302,7 @@ async def test_dont_steal_host_restricti
assert len(b.tasks) == 0
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.2", 2)])
async def test_steal_host_restrictions(c, s, wa, wb):
future = c.submit(slowinc, 1, delay=0.10, workers=wa.address)
@@ -354,9 +348,7 @@ async def test_dont_steal_resource_restr
assert len(b.tasks) == 0
-@gen_cluster(
- client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}})], timeout=3
-)
+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}})])
async def test_steal_resource_restrictions(c, s, a):
future = c.submit(slowinc, 1, delay=0.10, workers=a.address)
await future
@@ -601,7 +593,7 @@ def test_balance(inp, expected):
test()
-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, Worker=Nanny)
+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, Worker=Nanny, timeout=60)
async def test_restart(c, s, a, b):
futures = c.map(
slowinc, range(100), delay=0.1, workers=a.address, allow_other_workers=True
@@ -613,7 +605,7 @@ async def test_restart(c, s, a, b):
assert any(st for st in steal.stealable_all)
assert any(x for L in steal.stealable.values() for x in L)
- await c.restart(timeout=10)
+ await c.restart()
assert not any(x for x in steal.stealable_all)
assert not any(x for L in steal.stealable.values() for x in L)
@@ -740,7 +732,6 @@ async def test_dont_steal_long_running_t
) <= 1
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=sys.version_info[:2] == (3, 8))
@gen_cluster(client=True, nthreads=[("127.0.0.1", 5)] * 2)
async def test_cleanup_repeated_tasks(c, s, a, b):
class Foo:
Index: distributed-2021.7.0/distributed/tests/test_stress.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_stress.py
+++ distributed-2021.7.0/distributed/tests/test_stress.py
@@ -1,6 +1,5 @@
import asyncio
import random
-import sys
from contextlib import suppress
from operator import add
from time import sleep
@@ -11,9 +10,10 @@ from tlz import concat, sliding_window
from dask import delayed
from distributed import Client, Nanny, wait
+from distributed.compatibility import WINDOWS
from distributed.config import config
from distributed.metrics import time
-from distributed.utils import All, CancelledError
+from distributed.utils import CancelledError
from distributed.utils_test import (
bump_rlimit,
cluster,
@@ -43,6 +43,7 @@ async def test_stress_1(c, s, a, b):
assert result == sum(map(inc, range(n)))
+@pytest.mark.slow
@pytest.mark.parametrize(("func", "n"), [(slowinc, 100), (inc, 1000)])
def test_stress_gc(loop, func, n):
with cluster() as (s, [a, b]):
@@ -54,10 +55,8 @@ def test_stress_gc(loop, func, n):
assert x.result() == n + 2
-@pytest.mark.skipif(
- sys.platform.startswith("win"), reason="test can leave dangling RPC objects"
-)
-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 8, timeout=None)
+@pytest.mark.skipif(WINDOWS, reason="test can leave dangling RPC objects")
+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 8)
async def test_cancel_stress(c, s, *workers):
da = pytest.importorskip("dask.array")
x = da.random.random((50, 50), chunks=(2, 2))
@@ -86,28 +85,33 @@ def test_cancel_stress_sync(loop):
c.cancel(f)
-@gen_cluster(nthreads=[], client=True, timeout=None)
+@pytest.mark.slow
+@gen_cluster(
+ nthreads=[],
+ client=True,
+ timeout=120,
+ scheduler_kwargs={"allowed_failures": 100_000},
+)
async def test_stress_creation_and_deletion(c, s):
# Assertions are handled by the validate mechanism in the scheduler
- s.allowed_failures = 100000
da = pytest.importorskip("dask.array")
- x = da.random.random(size=(2000, 2000), chunks=(100, 100))
- y = (x + 1).T + (x * 2) - x.mean(axis=1)
-
+ rng = da.random.RandomState(0)
+ x = rng.random(size=(2000, 2000), chunks=(100, 100))
+ y = ((x + 1).T + (x * 2) - x.mean(axis=1)).sum().round(2)
z = c.persist(y)
async def create_and_destroy_worker(delay):
start = time()
while time() < start + 5:
- n = await Nanny(s.address, nthreads=2, loop=s.loop)
- await asyncio.sleep(delay)
- await n.close()
+ async with Nanny(s.address, nthreads=2):
+ await asyncio.sleep(delay)
print("Killed nanny")
- await asyncio.wait_for(
- All([create_and_destroy_worker(0.1 * i) for i in range(20)]), 60
- )
+ await asyncio.gather(*(create_and_destroy_worker(0.1 * i) for i in range(20)))
+
+ async with Nanny(s.address, nthreads=2):
+ assert await c.compute(z) == 8000884.93
@gen_cluster(nthreads=[("127.0.0.1", 1)] * 10, client=True, timeout=60)
@@ -169,6 +173,7 @@ def vsum(*args):
@pytest.mark.avoid_ci
@pytest.mark.slow
+@pytest.mark.timeout(1100) # Override timeout from setup.cfg
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 80, timeout=1000)
async def test_stress_communication(c, s, *workers):
s.validate = False # very slow otherwise
Index: distributed-2021.7.0/distributed/tests/test_tls_functional.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_tls_functional.py
+++ distributed-2021.7.0/distributed/tests/test_tls_functional.py
@@ -42,7 +42,7 @@ async def test_Queue(c, s, a, b):
assert future.key == future2.key
-@gen_tls_cluster(client=True, timeout=None)
+@gen_tls_cluster(client=True)
async def test_client_submit(c, s, a, b):
assert s.address.startswith("tls://")
Index: distributed-2021.7.0/distributed/tests/test_utils.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_utils.py
+++ distributed-2021.7.0/distributed/tests/test_utils.py
@@ -4,7 +4,6 @@ import io
import os
import queue
import socket
-import sys
import traceback
from time import sleep
@@ -13,6 +12,7 @@ from tornado.ioloop import IOLoop
import dask
+from distributed.compatibility import MACOS, WINDOWS
from distributed.metrics import time
from distributed.utils import (
LRU,
@@ -140,23 +140,12 @@ def test_ensure_ip():
assert ensure_ip("::1") == "::1"
+@pytest.mark.skipif(WINDOWS, reason="TODO")
def test_get_ip_interface():
- if sys.platform == "darwin":
- assert get_ip_interface("lo0") == "127.0.0.1"
- elif sys.platform.startswith("linux"):
- assert get_ip_interface("lo") == "127.0.0.1"
- else:
- pytest.skip(f"test needs to be enhanced for platform {sys.platform!r}")
-
- non_existent_interface = "__non-existent-interface"
- expected_error_message = f"{non_existent_interface!r}.+network interface.+"
-
- if sys.platform == "darwin":
- expected_error_message += "'lo0'"
- elif sys.platform.startswith("linux"):
- expected_error_message += "'lo'"
- with pytest.raises(ValueError, match=expected_error_message):
- get_ip_interface(non_existent_interface)
+ iface = "lo0" if MACOS else "lo"
+ assert get_ip_interface(iface) == "127.0.0.1"
+ with pytest.raises(ValueError, match=f"'__notexist'.+network interface.+'{iface}'"):
+ get_ip_interface("__notexist")
def test_truncate_exception():
Index: distributed-2021.7.0/distributed/tests/test_utils_test.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_utils_test.py
+++ distributed-2021.7.0/distributed/tests/test_utils_test.py
@@ -141,7 +141,7 @@ def test_gen_cluster_cleans_up_client(lo
assert not dask.config.get("get", None)
-@gen_cluster(client=False)
+@gen_cluster()
async def test_gen_cluster_without_client(s, a, b):
assert isinstance(s, Scheduler)
for w in [a, b]:
Index: distributed-2021.7.0/distributed/tests/test_variable.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_variable.py
+++ distributed-2021.7.0/distributed/tests/test_variable.py
@@ -191,7 +191,7 @@ async def test_timeout_get(c, s, a, b):
@pytest.mark.slow
-@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny, timeout=None)
+@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny)
async def test_race(c, s, *workers):
NITERS = 50
@@ -216,10 +216,8 @@ async def test_race(c, s, *workers):
results = await c.gather(futures)
assert all(r > NITERS * 0.8 for r in results)
- start = time()
while len(s.wants_what["variable-x"]) != 1:
await asyncio.sleep(0.01)
- assert time() - start < 2
@gen_cluster(client=True)
Index: distributed-2021.7.0/distributed/tests/test_worker.py
===================================================================
--- distributed-2021.7.0.orig/distributed/tests/test_worker.py
+++ distributed-2021.7.0/distributed/tests/test_worker.py
@@ -18,7 +18,6 @@ from tlz import first, pluck, sliding_wi
import dask
from dask import delayed
from dask.system import CPU_COUNT
-from dask.utils import format_bytes
from distributed import (
Client,
@@ -31,7 +30,7 @@ from distributed import (
)
from distributed.comm.registry import backends
from distributed.comm.tcp import TCPBackend
-from distributed.compatibility import MACOS, WINDOWS
+from distributed.compatibility import LINUX, MACOS, WINDOWS
from distributed.core import CommClosedError, Status, rpc
from distributed.diagnostics.plugin import PipInstall
from distributed.metrics import time
@@ -780,7 +779,7 @@ async def test_hold_onto_dependents(c, s
@pytest.mark.slow
-@gen_cluster(client=False, nthreads=[])
+@gen_cluster(nthreads=[])
async def test_worker_death_timeout(s):
with dask.config.set({"distributed.comm.timeouts.connect": "1s"}):
await s.close()
@@ -1005,27 +1004,17 @@ async def test_global_workers(s, a, b):
assert w is a or w is b
-@pytest.mark.skipif(WINDOWS, reason="file descriptors")
+@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
@gen_cluster(nthreads=[])
async def test_worker_fds(s):
- psutil = pytest.importorskip("psutil")
- await asyncio.sleep(0.05)
- start = psutil.Process().num_fds()
-
- worker = await Worker(s.address, loop=s.loop)
- await asyncio.sleep(0.1)
- middle = psutil.Process().num_fds()
- start = time()
- while middle > start:
- await asyncio.sleep(0.01)
- assert time() < start + 1
+ proc = psutil.Process()
+ before = proc.num_fds()
- await worker.close()
+ async with Worker(s.address, loop=s.loop):
+ assert proc.num_fds() > before
- start = time()
- while psutil.Process().num_fds() > start:
+ while proc.num_fds() > before:
await asyncio.sleep(0.01)
- assert time() < start + 0.5
@gen_cluster(nthreads=[])
@@ -1064,13 +1053,13 @@ async def test_scheduler_file():
@gen_cluster(client=True)
async def test_scheduler_delay(c, s, a, b):
old = a.scheduler_delay
- assert abs(a.scheduler_delay) < 0.3
- assert abs(b.scheduler_delay) < 0.3
- await asyncio.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.3)
+ assert abs(a.scheduler_delay) < 0.6
+ assert abs(b.scheduler_delay) < 0.6
+ await asyncio.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.6)
assert a.scheduler_delay != old
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS)
+@pytest.mark.flaky(reruns=10, reruns_delay=5)
@gen_cluster(client=True)
async def test_statistical_profiling(c, s, a, b):
futures = c.map(slowinc, range(10), delay=0.1)
@@ -1134,7 +1123,6 @@ async def test_robust_to_bad_sizeof_esti
@pytest.mark.slow
-@pytest.mark.flaky(reruns=10, reruns_delay=5, condition=sys.version_info[:2] == (3, 8))
@gen_cluster(
nthreads=[("127.0.0.1", 2)],
client=True,
@@ -1144,7 +1132,6 @@ async def test_robust_to_bad_sizeof_esti
"memory_target_fraction": False,
"memory_pause_fraction": 0.5,
},
- timeout=20,
)
async def test_pause_executor(c, s, a):
memory = psutil.Process().memory_info().rss
@@ -1159,14 +1146,9 @@ async def test_pause_executor(c, s, a):
future = c.submit(f)
futures = c.map(slowinc, range(30), delay=0.1)
- start = time()
while not a.paused:
await asyncio.sleep(0.01)
- assert time() < start + 4, (
- format_bytes(psutil.Process().memory_info().rss),
- format_bytes(a.memory_limit),
- len(a.data),
- )
+
out = logger.getvalue()
assert "memory" in out.lower()
assert "pausing" in out.lower()
@@ -1312,9 +1294,7 @@ async def test_wait_for_outgoing(c, s, a
assert 1 / 3 < ratio < 3
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster(
nthreads=[("127.0.0.1", 1), ("127.0.0.1", 1), ("127.0.0.2", 1)], client=True
)
@@ -1467,9 +1447,7 @@ async def test_local_directory_make_new_
assert "dask-worker-space" in w.local_directory
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
+@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
@gen_cluster(nthreads=[], client=True)
async def test_host_address(c, s):
w = await Worker(s.address, host="127.0.0.2")
@@ -2058,7 +2036,7 @@ async def test_worker_state_error_releas
res = c.submit(raise_exc, f, g, workers=[a.address])
with pytest.raises(RuntimeError):
- await res.result()
+ await res
# Nothing bad happened on B, therefore B should hold on to G
assert len(b.tasks) == 1
@@ -2124,7 +2102,7 @@ async def test_worker_state_error_releas
res = c.submit(raise_exc, f, g, workers=[a.address])
with pytest.raises(RuntimeError):
- await res.result()
+ await res
# Nothing bad happened on B, therefore B should hold on to G
assert len(b.tasks) == 1
@@ -2190,7 +2168,7 @@ async def test_worker_state_error_releas
res = c.submit(raise_exc, f, g, workers=[a.address])
with pytest.raises(RuntimeError):
- await res.result()
+ await res
# Nothing bad happened on B, therefore B should hold on to G
assert len(b.tasks) == 1
@@ -2251,7 +2229,7 @@ async def test_worker_state_error_long_c
)
with pytest.raises(RuntimeError):
- await res.result()
+ await res
expected_states_A = {
f.key: "memory",
@@ -2319,7 +2297,7 @@ async def test_worker_state_error_long_c
await asyncio.sleep(0.01)
-@gen_cluster(client=True, nthreads=[("127.0.0.1", x) for x in range(4)], timeout=None)
+@gen_cluster(client=True, nthreads=[("127.0.0.1", x) for x in range(4)])
async def test_hold_on_to_replicas(c, s, *workers):
f1 = c.submit(inc, 1, workers=[workers[0].address], key="f1")
f2 = c.submit(inc, 2, workers=[workers[1].address], key="f2")
Index: distributed-2021.7.0/distributed/utils_test.py
===================================================================
--- distributed-2021.7.0.orig/distributed/utils_test.py
+++ distributed-2021.7.0/distributed/utils_test.py
@@ -764,6 +764,10 @@ def gen_test(timeout=_TEST_TIMEOUT):
async def test_foo():
await ... # use tornado coroutines
"""
+ assert timeout, (
+ "timeout should always be set and it should be smaller than the global one from"
+ "pytest-timeout"
+ )
def _(func):
def test_func():
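The assertion added above pins down a timeout hierarchy: each test's own timeout is enforced inside the event loop via asyncio.wait_for (see the gen_cluster hunk below), and it must stay under the global 300s pytest-timeout from setup.cfg so that the asyncio cancellation, with its readable traceback, fires first. A hedged sketch of that layering with a hypothetical test coroutine:

    import asyncio

    async def hung_test():  # hypothetical stand-in for a stuck test body
        await asyncio.sleep(3600)

    async def runner():
        try:
            # The inner timeout must beat the outer pytest-timeout (300s),
            # so the failure surfaces as a clean asyncio cancellation.
            await asyncio.wait_for(hung_test(), timeout=30)
        except asyncio.TimeoutError:
            print("cancelled by asyncio before pytest-timeout kills the process")

    asyncio.run(runner())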
@@ -809,8 +813,6 @@ async def start_cluster(
)
for i, ncore in enumerate(nthreads)
]
- # for w in workers:
- # w.rpc = workers[0].rpc
await asyncio.gather(*workers)
@@ -875,12 +877,16 @@ def gen_cluster(
start
end
"""
+ assert timeout, (
+ "timeout should always be set and it should be smaller than the global one from"
+ "pytest-timeout"
+ )
if ncores is not None:
warnings.warn("ncores= has moved to nthreads=", stacklevel=2)
nthreads = ncores
worker_kwargs = merge(
- {"memory_limit": system.MEMORY_LIMIT, "death_timeout": 10}, worker_kwargs
+ {"memory_limit": system.MEMORY_LIMIT, "death_timeout": 15}, worker_kwargs
)
def _(func):
@@ -930,8 +936,7 @@ def gen_cluster(
args = [c] + args
try:
future = func(*args, *outer_args, **kwargs)
- if timeout:
- future = asyncio.wait_for(future, timeout)
+ future = asyncio.wait_for(future, timeout)
result = await future
if s.validate:
s.validate_state()
Index: distributed-2021.7.0/distributed/worker.py
===================================================================
--- distributed-2021.7.0.orig/distributed/worker.py
+++ distributed-2021.7.0/distributed/worker.py
@@ -1217,7 +1217,7 @@ class Worker(ServerNode):
return self.close(*args, **kwargs)
async def close(
- self, report=True, timeout=10, nanny=True, executor_wait=True, safe=False
+ self, report=True, timeout=30, nanny=True, executor_wait=True, safe=False
):
with log_errors():
if self.status in (Status.closed, Status.closing):
Index: distributed-2021.7.0/setup.cfg
===================================================================
--- distributed-2021.7.0.orig/setup.cfg
+++ distributed-2021.7.0/setup.cfg
@@ -38,10 +38,9 @@ markers =
slow: marks tests as slow (deselect with '-m "not slow"')
avoid_ci: marks tests as flaky on CI on all OSs
ipython: marks tests as exercising IPython
-timeout_method = thread
+timeout_method = signal
timeout = 300
[egg_info]
tag_build =
tag_date = 0
-