From 5741cbcbc5c2a75c2552326018ee97b8fe5f257f Mon Sep 17 00:00:00 2001 From: John Vandenberg Date: Fri, 22 Mar 2019 08:28:34 +0700 Subject: [PATCH] Backport run_in_executor --- flower/api/tasks.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) Index: flower-1.0.0/flower/api/tasks.py =================================================================== --- flower-1.0.0.orig/flower/api/tasks.py +++ flower-1.0.0/flower/api/tasks.py @@ -83,6 +83,24 @@ class BaseTaskHandler(BaseHandler): return result +def inline_run_in_executor(func, *args): + from tornado.concurrent import Future, chain_future + + io_loop = IOLoop.current() + if not hasattr(io_loop, "_executor"): + import concurrent.futures + from tornado.process import cpu_count + + io_loop._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=(cpu_count() * 5) + ) + executor = io_loop._executor + c_future = executor.submit(func, *args) + t_future = Future() + io_loop.add_future(c_future, lambda f: chain_future(f, t_future)) + return t_future + + class TaskApply(BaseTaskHandler): @web.authenticated @gen.coroutine @@ -143,8 +161,7 @@ Execute a task by name and wait results result = task.apply_async(args=args, kwargs=kwargs, **options) response = {'task-id': result.task_id} - response = yield IOLoop.current().run_in_executor( - None, self.wait_results, result, response) + response = yield inline_run_in_executor(self.wait_results, result, response) self.write(response) def wait_results(self, result, response):