From 9f54f73be83fd0b3e897d0c077f3132000c57336 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 26 Jan 2022 15:30:16 +0000 Subject: [PATCH 01/10] avoid deadlock in ActorFuture fixes #5708 fixes #5350 --- distributed/__init__.py | 2 distributed/actor.py | 161 ++++++++++++++++++++++++++++++---------- distributed/client.py | 4 distributed/tests/test_actor.py | 95 +++++++++++++++++++---- docs/source/actors.rst | 8 - 5 files changed, 209 insertions(+), 61 deletions(-) --- a/distributed/__init__.py +++ b/distributed/__init__.py @@ -4,7 +4,7 @@ import dask from dask.config import config # type: ignore from ._version import get_versions -from .actor import Actor, ActorFuture +from .actor import Actor, BaseActorFuture from .client import ( Client, CompatibleExecutor, --- a/distributed/actor.py +++ b/distributed/actor.py @@ -1,6 +1,15 @@ +from __future__ import annotations + +import abc import asyncio import functools +import sys import threading +from dataclasses import dataclass +from datetime import timedelta +from typing import TYPE_CHECKING, Generic, NoReturn, TypeVar + +from tornado.ioloop import IOLoop from .client import Future from .protocol import to_serialize @@ -8,13 +17,52 @@ from .utils import iscoroutinefunction, from .utils_comm import WrappedKey from .worker import get_client, get_worker +_T = TypeVar("_T") + +if sys.version_info >= (3, 9): + from collections.abc import Awaitable, Generator +else: + from typing import Awaitable, Generator + +if sys.version_info >= (3, 8): + from typing import Literal +elif TYPE_CHECKING: + from typing_extensions import Literal + +if sys.version_info >= (3, 10): + from asyncio import Event as _LateLoopEvent +else: + # In python 3.10 asyncio.Lock and other primitives no longer support + # passing a loop kwarg to bind to a loop running in another thread + # e.g. calling from Client(asynchronous=False). Instead the loop is bound + # as late as possible: when calling any methods that wait on or wake + # Future instances. 
See: https://bugs.python.org/issue42392 + class _LateLoopEvent: + def __init__(self) -> None: + self._event: asyncio.Event | None = None + + def set(self) -> None: + if self._event is None: + self._event = asyncio.Event() + + self._event.set() + + def is_set(self) -> bool: + return self._event is not None and self._event.is_set() + + async def wait(self) -> bool: + if self._event is None: + self._event = asyncio.Event() + + return await self._event.wait() + class Actor(WrappedKey): """Controls an object on a remote worker An actor allows remote control of a stateful object living on a remote worker. Method calls on this object trigger operations on the remote - object and return ActorFutures on which we can block to get results. + object and return BaseActorFutures on which we can block to get results. Examples -------- @@ -36,7 +84,7 @@ class Actor(WrappedKey): >>> counter - Calling methods on this object immediately returns deferred ``ActorFuture`` + Calling methods on this object immediately returns deferred ``BaseActorFuture`` objects. You can call ``.result()`` on these objects to block and get the result of the function call. 
@@ -140,9 +188,7 @@ class Actor(WrappedKey): return attr elif callable(attr): - return lambda *args, **kwargs: ActorFuture( - None, self._io_loop, result=attr(*args, **kwargs) - ) + return lambda *args, **kwargs: EagerActorFuture(attr(*args, **kwargs)) else: return attr @@ -166,16 +212,17 @@ class Actor(WrappedKey): return await run_actor_function_on_worker() else: raise OSError("Unable to contact Actor's worker") - return result + if result["status"] == "OK": + return _OK(result["result"]) + return _Error(result["exception"]) - q = asyncio.Queue(loop=self._io_loop.asyncio_loop) + actor_future = ActorFuture(io_loop=self._io_loop) async def wait_then_add_to_queue(): - x = await run_actor_function_on_worker() - await q.put(x) + actor_future._set_result(await run_actor_function_on_worker()) self._io_loop.add_callback(wait_then_add_to_queue) - return ActorFuture(q, self._io_loop) + return actor_future return func @@ -215,10 +262,10 @@ class ProxyRPC: return func -class ActorFuture: +class BaseActorFuture(abc.ABC, Awaitable[_T]): """Future to an actor's method call - Whenever you call a method on an Actor you get an ActorFuture immediately + Whenever you call a method on an Actor you get a BaseActorFuture immediately while the computation happens in the background. You can call ``.result`` to block and collect the full result @@ -227,34 +274,72 @@ class ActorFuture: Actor """ - def __init__(self, q, io_loop, result=None): - self.q = q - self.io_loop = io_loop - if result: - self._cached_result = result - self.status = "pending" + @abc.abstractmethod + def result(self, timeout: str | timedelta | float | None = None) -> _T: + ... + + @abc.abstractmethod + def done(self) -> bool: + ... 
- def __await__(self): - return self._result().__await__() + def __repr__(self) -> Literal[""]: + return "" - def done(self): - return self.status != "pending" - async def _result(self, raiseit=True): - if not hasattr(self, "_cached_result"): - out = await self.q.get() - if out["status"] == "OK": - self.status = "finished" - self._cached_result = out["result"] - else: - self.status = "error" - self._cached_result = out["exception"] - if self.status == "error": - raise self._cached_result - return self._cached_result +@dataclass(frozen=True, eq=False) +class EagerActorFuture(BaseActorFuture[_T]): + """Future to an actor's method call when an actor calls another actor on the same worker""" - def result(self, timeout=None): - return sync(self.io_loop, self._result, callback_timeout=timeout) + _result: _T - def __repr__(self): - return "" + def __await__(self) -> Generator[object, None, _T]: + return self._result + yield + + def result(self, timeout: object = None) -> _T: + return self._result + + def done(self) -> Literal[True]: + return True + + +@dataclass(frozen=True, eq=False) +class _OK(Generic[_T]): + _v: _T + + def unwrap(self) -> _T: + return self._v + + +@dataclass(frozen=True, eq=False) +class _Error: + _e: Exception + + def unwrap(self) -> NoReturn: + raise self._e + + +class ActorFuture(BaseActorFuture[_T]): + def __init__(self, io_loop: IOLoop): + self._io_loop = io_loop + self._event = _LateLoopEvent() + self._out: _Error | _OK[_T] | None = None + + def __await__(self) -> Generator[object, None, _T]: + return self._result().__await__() + + def done(self) -> bool: + return self._event.is_set() + + async def _result(self) -> _T: + await self._event.wait() + out = self._out + assert out is not None + return out.unwrap() + + def _set_result(self, out: _Error | _OK[_T]) -> None: + self._out = out + self._event.set() + + def result(self, timeout: str | timedelta | float | None = None) -> _T: + return sync(self._io_loop, self._result, callback_timeout=timeout) 
--- a/distributed/client.py +++ b/distributed/client.py @@ -4943,11 +4943,11 @@ class as_completed: """Add multiple futures to the collection. The added futures will emit from the iterator once they finish""" - from .actor import ActorFuture + from .actor import BaseActorFuture with self.lock: for f in futures: - if not isinstance(f, (Future, ActorFuture)): + if not isinstance(f, (Future, BaseActorFuture)): raise TypeError("Input must be a future, got %s" % f) self.futures[f] += 1 self.loop.add_callback(self._track_future, f) --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -8,7 +8,7 @@ import dask from distributed import ( Actor, - ActorFuture, + BaseActorFuture, Client, Future, Nanny, @@ -16,6 +16,7 @@ from distributed import ( get_client, wait, ) +from distributed.actor import _LateLoopEvent from distributed.metrics import time from distributed.utils_test import cluster, gen_cluster @@ -39,16 +40,6 @@ class Counter: return self.n -class UsesCounter: - # An actor whose method argument is another actor - - def do_inc(self, ac): - return ac.increment().result() - - async def ado_inc(self, ac): - return await ac.ainc() - - class List: L: list = [] @@ -113,7 +104,7 @@ async def test_worker_actions(c, s, a, b assert counter._address == a_address future = counter.increment(separate_thread=separate_thread) - assert isinstance(future, ActorFuture) + assert isinstance(future, BaseActorFuture) assert "Future" in type(future).__name__ end = future.result(timeout=1) assert end > start @@ -266,6 +257,27 @@ def test_sync(client): assert "distributed.actor" not in repr(future) +def test_timeout(client): + class Waiter: + def __init__(self): + self.event = _LateLoopEvent() + + async def set(self): + self.event.set() + + async def wait(self): + return await self.event.wait() + + event = client.submit(Waiter, actor=True).result() + future = event.wait() + + with pytest.raises(asyncio.TimeoutError): + future.result(timeout="0.001s") + + 
event.set().result() + assert future.result() is True + + @gen_cluster(client=True, config={"distributed.comm.timeouts.connect": "1s"}) async def test_failed_worker(c, s, a, b): future = c.submit(Counter, actor=True, workers=[a.address]) @@ -538,11 +550,9 @@ async def test_actors_in_profile(c, s, a @gen_cluster(client=True) async def test_waiter(c, s, a, b): - from tornado.locks import Event - class Waiter: def __init__(self): - self.event = Event() + self.event = _LateLoopEvent() async def set(self): self.event.set() @@ -618,6 +628,42 @@ def test_worker_actor_handle_is_weakref_ def test_one_thread_deadlock(): + class UsesCounter: + # An actor whose method argument is another actor + + def do_inc(self, ac): + return ac.increment().result() + + with cluster(nworkers=2) as (cl, w): + client = Client(cl["address"]) + ac = client.submit(Counter, actor=True).result() + ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result() + + assert ac2.do_inc(ac).result() == 1 + + +def test_one_thread_deadlock_timeout(): + class UsesCounter: + # An actor whose method argument is another actor + + def do_inc(self, ac): + return ac.increment().result(timeout=1) + + with cluster(nworkers=2) as (cl, w): + client = Client(cl["address"]) + ac = client.submit(Counter, actor=True).result() + ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result() + + assert ac2.do_inc(ac).result() == 1 + + +def test_one_thread_deadlock_sync_client(): + class UsesCounter: + # An actor whose method argument is another actor + + def do_inc(self, ac): + return get_client().sync(ac.increment) + with cluster(nworkers=2) as (cl, w): client = Client(cl["address"]) ac = client.submit(Counter, actor=True).result() @@ -628,6 +674,12 @@ def test_one_thread_deadlock(): @gen_cluster(client=True) async def test_async_deadlock(client, s, a, b): + class UsesCounter: + # An actor whose method argument is another actor + + async def ado_inc(self, ac): + return await ac.ainc() + ac = 
await client.submit(Counter, actor=True) ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address]) @@ -698,7 +750,7 @@ async def test_actor_future_awaitable(cl ac = await client.submit(Counter, actor=True) futures = [ac.increment() for _ in range(10)] - assert all([isinstance(future, ActorFuture) for future in futures]) + assert all([isinstance(future, BaseActorFuture) for future in futures]) out = await asyncio.gather(*futures) assert all([future.done() for future in futures]) @@ -706,6 +758,17 @@ async def test_actor_future_awaitable(cl @gen_cluster(client=True) +async def test_actor_future_awaitable_deadlock(client, s, a, b): + ac = await client.submit(Counter, actor=True) + f = ac.increment() + + async def coro(): + return await f + + assert await asyncio.gather(coro(), coro()) == [1, 1] + + +@gen_cluster(client=True) async def test_serialize_with_pickle(c, s, a, b): class Foo: def __init__(self): --- a/docs/source/actors.rst +++ b/docs/source/actors.rst @@ -115,15 +115,15 @@ However accessing an attribute or callin to the remote worker, run the method on the remote worker in a separate thread pool, and then communicate the result back to the calling side. For attribute access these operations block and return when finished, for method calls they -return an ``ActorFuture`` immediately. +return a ``BaseActorFuture`` immediately. .. code-block:: python - >>> future = counter.increment() # Immediately returns an ActorFuture + >>> future = counter.increment() # Immediately returns a BaseActorFuture >>> future.result() # Block until finished and result arrives 1 -``ActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully +``BaseActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully featured. They curently *only* support the ``result`` method and nothing else. They don't currently work with any other Dask functions that expect futures, like ``as_completed``, ``wait``, or ``client.gather``. 
They can't be placed @@ -167,7 +167,7 @@ workers have only a single thread for ac future. The result is sent back immediately to the calling side, and is not stored on -the worker with the actor. It is cached on the ``ActorFuture`` object. +the worker with the actor. It is cached on the ``BaseActorFuture`` object. Calling from coroutines and async/await