17
0
Files
python-distributed/distributed-fix-python310.patch

61 lines
2.2 KiB
Diff

From f78696ebe8d1629fd946a4606be306bd0dbadb15 Mon Sep 17 00:00:00 2001
From: Elliott Sales de Andrade <quantum.analyst@gmail.com>
Date: Sun, 26 Sep 2021 19:30:38 -0400
Subject: [PATCH] Remove explicit loop from asyncio Queue
Since Python 3.5.3 and 3.6, `asyncio.get_event_loop()` always returns the
expected loop, which made the explicit `loop` parameter redundant; that
parameter was deprecated in Python 3.8 and removed in Python 3.10.
---
distributed/actor.py | 7 ++++++-
distributed/tests/test_client.py | 18 ------------------
2 files changed, 6 insertions(+), 19 deletions(-)
diff --git a/distributed/actor.py b/distributed/actor.py
index e99cac66b1..1dc16e1a58 100644
--- a/distributed/actor.py
+++ b/distributed/actor.py
@@ -168,7 +168,12 @@ async def run_actor_function_on_worker():
raise OSError("Unable to contact Actor's worker")
return result
- q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
+ loop = asyncio.get_event_loop()
+ try:
+ asyncio.set_event_loop(self._io_loop.asyncio_loop)
+ q = asyncio.Queue()
+ finally:
+ asyncio.set_event_loop(loop)
async def wait_then_add_to_queue():
x = await run_actor_function_on_worker()
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index 456485f0ca..88bf18b60f 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -6301,24 +6301,6 @@ async def f(stacklevel, mode=None):
assert "cdn.bokeh.org" in data
-@gen_cluster(nthreads=[])
-async def test_client_gather_semaphore_loop(s):
- async with Client(s.address, asynchronous=True) as c:
- assert c._gather_semaphore._loop is c.loop.asyncio_loop
-
-
-@gen_cluster(client=True)
-async def test_as_completed_condition_loop(c, s, a, b):
- seq = c.map(inc, range(5))
- ac = as_completed(seq)
- assert ac.condition._loop == c.loop.asyncio_loop
-
-
-def test_client_connectionpool_semaphore_loop(s, a, b):
- with Client(s["address"]) as c:
- assert c.rpc.semaphore._loop is c.loop.asyncio_loop
-
-
@pytest.mark.slow
@gen_cluster(nthreads=[], timeout=60)
async def test_mixed_compression(s):