gtask: remove hardcoded GTask thread-pool size

GTask used a 10-thread thread pool for g_task_run_in_thread() /
g_task_run_in_thread_sync(), but this ran into problems when task
threads blocked waiting for another g_task_run_in_thread_sync()
operation to complete. Previously there was a workaround for this, by
bumping up the thread limit when that case was detected, but deadlocks
could still happen if there were non-GTask threads involved. (Eg, task
A sends a message to thread X and waits for a response, but thread X
needs to complete task B in a thread before returning the response to
task A.)

So, allow GTask's thread pool to be expanded dynamically, by watching
it from the glib worker thread, and growing it (at an
exponentially-decreasing rate) if too much time passes without any
tasks completing. This should solve the deadlocking problems without
causing sudden breakage in apps that assume they can queue huge
numbers of tasks at once without consequences.

https://bugzilla.gnome.org/show_bug.cgi?id=687223
This commit is contained in:
Dan Winship
2015-03-09 16:33:16 -04:00
parent b2734d762f
commit 86866a2a6d
3 changed files with 212 additions and 35 deletions

View File

@@ -22,6 +22,7 @@
#include "gasyncresult.h"
#include "gcancellable.h"
#include "glib-private.h"
#include "glibintl.h"
@@ -558,7 +559,6 @@ struct _GTask {
gboolean thread_cancelled;
gboolean synchronous;
gboolean thread_complete;
gboolean blocking_other_task;
GError *error;
union {
@@ -594,7 +594,25 @@ G_DEFINE_TYPE_WITH_CODE (GTask, g_task, G_TYPE_OBJECT,
static GThreadPool *task_pool;
static GMutex task_pool_mutex;
static GPrivate task_private = G_PRIVATE_INIT (NULL);
static GSource *task_pool_manager;
static guint64 task_wait_time;
static gint tasks_running;
/* When the task pool fills up and blocks, and the program keeps
* queueing more tasks, we will slowly add more threads to the pool
* (in case the existing tasks are trying to queue subtasks of their
* own) until tasks start completing again. These "overflow" threads
* will only run one task apiece, and then exit, so the pool will
* eventually get back down to its base size.
*
* The base and multiplier below gives us 10 extra threads after about
* a second of blocking, 30 after 5 seconds, 100 after a minute, and
* 200 after 20 minutes.
*/
#define G_TASK_POOL_SIZE 10
#define G_TASK_WAIT_TIME_BASE 100000
#define G_TASK_WAIT_TIME_MULTIPLIER 1.03
#define G_TASK_WAIT_TIME_MAX (30 * 60 * 1000000)
static void
g_task_init (GTask *task)
@@ -1200,15 +1218,6 @@ g_task_thread_complete (GTask *task)
}
task->thread_complete = TRUE;
if (task->blocking_other_task)
{
g_mutex_lock (&task_pool_mutex);
g_thread_pool_set_max_threads (task_pool,
g_thread_pool_get_max_threads (task_pool) - 1,
NULL);
g_mutex_unlock (&task_pool_mutex);
}
g_mutex_unlock (&task->lock);
if (task->cancellable)
@@ -1220,20 +1229,65 @@ g_task_thread_complete (GTask *task)
g_task_return (task, G_TASK_RETURN_FROM_THREAD);
}
static gboolean
task_pool_manager_timeout (gpointer user_data)
{
g_mutex_lock (&task_pool_mutex);
g_thread_pool_set_max_threads (task_pool, tasks_running + 1, NULL);
g_source_set_ready_time (task_pool_manager, -1);
g_mutex_unlock (&task_pool_mutex);
return TRUE;
}
static void
g_task_thread_setup (void)
{
g_mutex_lock (&task_pool_mutex);
tasks_running++;
if (tasks_running == G_TASK_POOL_SIZE)
task_wait_time = G_TASK_WAIT_TIME_BASE;
else if (tasks_running > G_TASK_POOL_SIZE && task_wait_time < G_TASK_WAIT_TIME_MAX)
task_wait_time *= G_TASK_WAIT_TIME_MULTIPLIER;
if (tasks_running >= G_TASK_POOL_SIZE)
g_source_set_ready_time (task_pool_manager, g_get_monotonic_time () + task_wait_time);
g_mutex_unlock (&task_pool_mutex);
}
static void
g_task_thread_cleanup (void)
{
gint tasks_pending;
g_mutex_lock (&task_pool_mutex);
tasks_pending = g_thread_pool_unprocessed (task_pool);
if (tasks_running > G_TASK_POOL_SIZE)
g_thread_pool_set_max_threads (task_pool, tasks_running - 1, NULL);
else if (tasks_running + tasks_pending < G_TASK_POOL_SIZE)
g_source_set_ready_time (task_pool_manager, -1);
tasks_running--;
g_mutex_unlock (&task_pool_mutex);
}
static void
g_task_thread_pool_thread (gpointer thread_data,
gpointer pool_data)
{
GTask *task = thread_data;
g_private_set (&task_private, task);
g_task_thread_setup ();
task->task_func (task, task->source_object, task->task_data,
task->cancellable);
g_task_thread_complete (task);
g_private_set (&task_private, NULL);
g_object_unref (task);
g_task_thread_cleanup ();
}
static void
@@ -1305,18 +1359,6 @@ g_task_start_task_thread (GTask *task,
}
g_thread_pool_push (task_pool, g_object_ref (task), NULL);
if (g_private_get (&task_private))
{
/* This thread is being spawned from another GTask thread, so
* bump up max-threads so we don't starve.
*/
g_mutex_lock (&task_pool_mutex);
if (g_thread_pool_set_max_threads (task_pool,
g_thread_pool_get_max_threads (task_pool) + 1,
NULL))
task->blocking_other_task = TRUE;
g_mutex_unlock (&task_pool_mutex);
}
}
/**
@@ -1331,6 +1373,12 @@ g_task_start_task_thread (GTask *task,
*
* See #GTaskThreadFunc for more details about how @task_func is handled.
*
* Although GLib currently rate-limits the tasks queued via
* g_task_run_in_thread(), you should not assume that it will always
* do this. If you have a very large number of tasks to run, but don't
* want them to all run at once, you should only queue a limited
* number of them at a time.
*
* Since: 2.36
*/
void
@@ -1372,6 +1420,12 @@ g_task_run_in_thread (GTask *task,
* have a callback, it will not be invoked when @task_func returns.
* #GTask:completed will be set to %TRUE just before this function returns.
*
* Although GLib currently rate-limits the tasks queued via
* g_task_run_in_thread_sync(), you should not assume that it will
* always do this. If you have a very large number of tasks to run,
* but don't want them to all run at once, you should only queue a
* limited number of them at a time.
*
* Since: 2.36
*/
void
@@ -1794,14 +1848,6 @@ g_task_compare_priority (gconstpointer a,
const GTask *tb = b;
gboolean a_cancelled, b_cancelled;
/* Tasks that are causing other tasks to block have higher
* priority.
*/
if (ta->blocking_other_task && !tb->blocking_other_task)
return -1;
else if (tb->blocking_other_task && !ta->blocking_other_task)
return 1;
/* Let already-cancelled tasks finish right away */
a_cancelled = (ta->check_cancellable &&
g_cancellable_is_cancelled (ta->cancellable));
@@ -1816,14 +1862,36 @@ g_task_compare_priority (gconstpointer a,
return ta->priority - tb->priority;
}
static gboolean
trivial_source_dispatch (GSource *source,
GSourceFunc callback,
gpointer user_data)
{
return callback (user_data);
}
GSourceFuncs trivial_source_funcs = {
NULL, /* prepare */
NULL, /* check */
trivial_source_dispatch,
NULL
};
static void
g_task_thread_pool_init (void)
{
task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL,
10, FALSE, NULL);
G_TASK_POOL_SIZE, FALSE, NULL);
g_assert (task_pool != NULL);
g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
task_pool_manager = g_source_new (&trivial_source_funcs, sizeof (GSource));
g_source_set_callback (task_pool_manager, task_pool_manager_timeout, NULL, NULL);
g_source_set_ready_time (task_pool_manager, -1);
g_source_attach (task_pool_manager,
GLIB_PRIVATE_CALL (g_get_worker_context ()));
g_source_unref (task_pool_manager);
}
static void