gtask: don't deadlock when tasks block on other tasks

If tasks block waiting for other tasks to complete then the system can
end up starved for threads. Avoid this by bumping up max-threads in
that case.

This also reverts 7b1f8c58 and reverts max-threads for GTask's
GThreadPool back to 10.

https://bugzilla.gnome.org/show_bug.cgi?id=687223
This commit is contained in:
Dan Winship 2012-12-15 11:44:59 -05:00
parent 2149b29468
commit 07bb8097e5
2 changed files with 114 additions and 10 deletions

View File

@ -584,6 +584,7 @@ struct _GTask {
gboolean thread_cancelled; gboolean thread_cancelled;
gboolean synchronous; gboolean synchronous;
gboolean thread_complete; gboolean thread_complete;
gboolean blocking_other_task;
GError *error; GError *error;
union { union {
@ -613,6 +614,8 @@ G_DEFINE_TYPE_WITH_CODE (GTask, g_task, G_TYPE_OBJECT,
g_task_thread_pool_init ();) g_task_thread_pool_init ();)
static GThreadPool *task_pool; static GThreadPool *task_pool;
static GMutex task_pool_mutex;
static GPrivate task_private = G_PRIVATE_INIT (NULL);
static void static void
g_task_init (GTask *task) g_task_init (GTask *task)
@ -1208,6 +1211,15 @@ g_task_thread_complete (GTask *task)
} }
task->thread_complete = TRUE; task->thread_complete = TRUE;
if (task->blocking_other_task)
{
g_mutex_lock (&task_pool_mutex);
g_thread_pool_set_max_threads (task_pool,
g_thread_pool_get_max_threads (task_pool) - 1,
NULL);
g_mutex_unlock (&task_pool_mutex);
}
g_mutex_unlock (&task->lock); g_mutex_unlock (&task->lock);
if (task->cancellable) if (task->cancellable)
@ -1225,9 +1237,13 @@ g_task_thread_pool_thread (gpointer thread_data,
{ {
GTask *task = thread_data; GTask *task = thread_data;
g_private_set (&task_private, task);
task->task_func (task, task->source_object, task->task_data, task->task_func (task, task->source_object, task->task_data,
task->cancellable); task->cancellable);
g_task_thread_complete (task); g_task_thread_complete (task);
g_private_set (&task_private, NULL);
g_object_unref (task); g_object_unref (task);
} }
@ -1294,6 +1310,18 @@ g_task_start_task_thread (GTask *task,
g_thread_pool_push (task_pool, g_object_ref (task), &task->error); g_thread_pool_push (task_pool, g_object_ref (task), &task->error);
if (task->error) if (task->error)
task->thread_complete = TRUE; task->thread_complete = TRUE;
else if (g_private_get (&task_private))
{
/* This thread is being spawned from another GTask thread, so
* bump up max-threads so we don't starve.
*/
g_mutex_lock (&task_pool_mutex);
if (g_thread_pool_set_max_threads (task_pool,
g_thread_pool_get_max_threads (task_pool) + 1,
NULL))
task->blocking_other_task = TRUE;
g_mutex_unlock (&task_pool_mutex);
}
} }
/** /**
@ -1747,12 +1775,19 @@ g_task_compare_priority (gconstpointer a,
const GTask *tb = b; const GTask *tb = b;
gboolean a_cancelled, b_cancelled; gboolean a_cancelled, b_cancelled;
/* Tasks that are causing other tasks to block have higher
* priority.
*/
if (ta->blocking_other_task && !tb->blocking_other_task)
return -1;
else if (tb->blocking_other_task && !ta->blocking_other_task)
return 1;
/* Let already-cancelled tasks finish right away */
a_cancelled = (ta->check_cancellable && a_cancelled = (ta->check_cancellable &&
g_cancellable_is_cancelled (ta->cancellable)); g_cancellable_is_cancelled (ta->cancellable));
b_cancelled = (tb->check_cancellable && b_cancelled = (tb->check_cancellable &&
g_cancellable_is_cancelled (tb->cancellable)); g_cancellable_is_cancelled (tb->cancellable));
/* Let already-cancelled tasks finish right away */
if (a_cancelled && !b_cancelled) if (a_cancelled && !b_cancelled)
return -1; return -1;
else if (b_cancelled && !a_cancelled) else if (b_cancelled && !a_cancelled)
@ -1766,7 +1801,7 @@ static void
g_task_thread_pool_init (void) g_task_thread_pool_init (void)
{ {
task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL, task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL,
100, FALSE, NULL); 10, FALSE, NULL);
g_assert (task_pool != NULL); g_assert (task_pool != NULL);
g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL); g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);

View File

@ -824,29 +824,37 @@ fake_task_thread (GTask *task,
g_task_return_boolean (task, TRUE); g_task_return_boolean (task, TRUE);
} }
#define G_TASK_THREAD_POOL_SIZE 100 #define G_TASK_THREAD_POOL_SIZE 10
static int fake_tasks_running;
static void static void
test_run_in_thread_priority (void) fake_task_callback (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
if (--fake_tasks_running == 0)
g_main_loop_quit (loop);
}
static void
clog_up_thread_pool (void)
{ {
GTask *task; GTask *task;
GCancellable *cancellable;
int seq_a, seq_b, seq_c, seq_d;
int i; int i;
/* Flush the thread pool, then clog it up with junk tasks */
g_thread_pool_stop_unused_threads (); g_thread_pool_stop_unused_threads ();
g_mutex_lock (&fake_task_mutex); g_mutex_lock (&fake_task_mutex);
for (i = 0; i < G_TASK_THREAD_POOL_SIZE - 1; i++) for (i = 0; i < G_TASK_THREAD_POOL_SIZE - 1; i++)
{ {
task = g_task_new (NULL, NULL, NULL, NULL); task = g_task_new (NULL, NULL, fake_task_callback, NULL);
g_task_set_task_data (task, &fake_task_mutex, NULL); g_task_set_task_data (task, &fake_task_mutex, NULL);
g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_DEFAULT); g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_DEFAULT);
g_task_set_priority (task, G_PRIORITY_HIGH * 2); g_task_set_priority (task, G_PRIORITY_HIGH * 2);
g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_HIGH * 2); g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_HIGH * 2);
g_task_run_in_thread (task, fake_task_thread); g_task_run_in_thread (task, fake_task_thread);
g_object_unref (task); g_object_unref (task);
fake_tasks_running++;
} }
g_mutex_lock (&last_fake_task_mutex); g_mutex_lock (&last_fake_task_mutex);
@ -855,6 +863,23 @@ test_run_in_thread_priority (void)
g_task_set_priority (task, G_PRIORITY_HIGH * 2); g_task_set_priority (task, G_PRIORITY_HIGH * 2);
g_task_run_in_thread (task, fake_task_thread); g_task_run_in_thread (task, fake_task_thread);
g_object_unref (task); g_object_unref (task);
}
static void
unclog_thread_pool (void)
{
g_mutex_unlock (&fake_task_mutex);
g_main_loop_run (loop);
}
static void
test_run_in_thread_priority (void)
{
GTask *task;
GCancellable *cancellable;
int seq_a, seq_b, seq_c, seq_d;
clog_up_thread_pool ();
/* Queue three more tasks that we'll arrange to have run serially */ /* Queue three more tasks that we'll arrange to have run serially */
task = g_task_new (NULL, NULL, NULL, NULL); task = g_task_new (NULL, NULL, NULL, NULL);
@ -894,7 +919,50 @@ test_run_in_thread_priority (void)
g_assert_cmpint (seq_a, ==, 3); g_assert_cmpint (seq_a, ==, 3);
g_assert_cmpint (seq_b, ==, 4); g_assert_cmpint (seq_b, ==, 4);
g_mutex_unlock (&fake_task_mutex); unclog_thread_pool ();
}
/* test_run_in_thread_nested: task threads that block waiting on
* other task threads will not cause the thread pool to starve.
*/
static void
run_nested_task_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
GTask *nested;
int *nested_tasks_left = task_data;
if ((*nested_tasks_left)--)
{
nested = g_task_new (NULL, NULL, NULL, NULL);
g_task_set_task_data (nested, nested_tasks_left, NULL);
g_task_run_in_thread_sync (nested, run_nested_task_thread);
g_object_unref (nested);
}
g_task_return_boolean (task, TRUE);
}
static void
test_run_in_thread_nested (void)
{
GTask *task;
int nested_tasks_left = 2;
clog_up_thread_pool ();
task = g_task_new (NULL, NULL, quit_main_loop_callback, NULL);
g_task_set_task_data (task, &nested_tasks_left, NULL);
g_task_run_in_thread (task, run_nested_task_thread);
g_object_unref (task);
g_mutex_unlock (&last_fake_task_mutex);
g_main_loop_run (loop);
unclog_thread_pool ();
} }
/* test_return_on_cancel */ /* test_return_on_cancel */
@ -1652,6 +1720,7 @@ main (int argc, char **argv)
g_test_add_func ("/gtask/run-in-thread", test_run_in_thread); g_test_add_func ("/gtask/run-in-thread", test_run_in_thread);
g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync); g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync);
g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority); g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel); g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync); g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic); g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);