diff --git a/gio/gtask.c b/gio/gtask.c index 0a80c24b8..bdef1f4f3 100644 --- a/gio/gtask.c +++ b/gio/gtask.c @@ -584,6 +584,7 @@ struct _GTask { gboolean thread_cancelled; gboolean synchronous; gboolean thread_complete; + gboolean blocking_other_task; GError *error; union { @@ -613,6 +614,8 @@ G_DEFINE_TYPE_WITH_CODE (GTask, g_task, G_TYPE_OBJECT, g_task_thread_pool_init ();) static GThreadPool *task_pool; +static GMutex task_pool_mutex; +static GPrivate task_private = G_PRIVATE_INIT (NULL); static void g_task_init (GTask *task) @@ -1208,6 +1211,15 @@ g_task_thread_complete (GTask *task) } task->thread_complete = TRUE; + + if (task->blocking_other_task) + { + g_mutex_lock (&task_pool_mutex); + g_thread_pool_set_max_threads (task_pool, + g_thread_pool_get_max_threads (task_pool) - 1, + NULL); + g_mutex_unlock (&task_pool_mutex); + } g_mutex_unlock (&task->lock); if (task->cancellable) @@ -1225,9 +1237,13 @@ g_task_thread_pool_thread (gpointer thread_data, { GTask *task = thread_data; + g_private_set (&task_private, task); + task->task_func (task, task->source_object, task->task_data, task->cancellable); g_task_thread_complete (task); + + g_private_set (&task_private, NULL); g_object_unref (task); } @@ -1294,6 +1310,18 @@ g_task_start_task_thread (GTask *task, g_thread_pool_push (task_pool, g_object_ref (task), &task->error); if (task->error) task->thread_complete = TRUE; + else if (g_private_get (&task_private)) + { + /* This thread is being spawned from another GTask thread, so + * bump up max-threads so we don't starve. + */ + g_mutex_lock (&task_pool_mutex); + if (g_thread_pool_set_max_threads (task_pool, + g_thread_pool_get_max_threads (task_pool) + 1, + NULL)) + task->blocking_other_task = TRUE; + g_mutex_unlock (&task_pool_mutex); + } } /** @@ -1747,12 +1775,19 @@ g_task_compare_priority (gconstpointer a, const GTask *tb = b; gboolean a_cancelled, b_cancelled; + /* Tasks that are causing other tasks to block have higher + * priority. + */ + if (ta->blocking_other_task && !tb->blocking_other_task) + return -1; + else if (tb->blocking_other_task && !ta->blocking_other_task) + return 1; + + /* Let already-cancelled tasks finish right away */ a_cancelled = (ta->check_cancellable && g_cancellable_is_cancelled (ta->cancellable)); b_cancelled = (tb->check_cancellable && g_cancellable_is_cancelled (tb->cancellable)); - - /* Let already-cancelled tasks finish right away */ if (a_cancelled && !b_cancelled) return -1; else if (b_cancelled && !a_cancelled) @@ -1766,7 +1801,7 @@ static void g_task_thread_pool_init (void) { task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL, - 100, FALSE, NULL); + 10, FALSE, NULL); g_assert (task_pool != NULL); g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL); diff --git a/gio/tests/task.c b/gio/tests/task.c index 8065807db..6dbff5c83 100644 --- a/gio/tests/task.c +++ b/gio/tests/task.c @@ -824,29 +824,37 @@ fake_task_thread (GTask *task, g_task_return_boolean (task, TRUE); } -#define G_TASK_THREAD_POOL_SIZE 100 +#define G_TASK_THREAD_POOL_SIZE 10 +static int fake_tasks_running; static void -test_run_in_thread_priority (void) +fake_task_callback (GObject *source, + GAsyncResult *result, + gpointer user_data) +{ + if (--fake_tasks_running == 0) + g_main_loop_quit (loop); +} + +static void +clog_up_thread_pool (void) { GTask *task; - GCancellable *cancellable; - int seq_a, seq_b, seq_c, seq_d; int i; - /* Flush the thread pool, then clog it up with junk tasks */ g_thread_pool_stop_unused_threads (); g_mutex_lock (&fake_task_mutex); for (i = 0; i < G_TASK_THREAD_POOL_SIZE - 1; i++) { - task = g_task_new (NULL, NULL, NULL, NULL); + task = g_task_new (NULL, NULL, fake_task_callback, NULL); g_task_set_task_data (task, &fake_task_mutex, NULL); g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_DEFAULT); g_task_set_priority (task, G_PRIORITY_HIGH * 2); g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_HIGH * 2); g_task_run_in_thread (task, fake_task_thread); g_object_unref (task); + fake_tasks_running++; } g_mutex_lock (&last_fake_task_mutex); @@ -855,6 +863,23 @@ test_run_in_thread_priority (void) g_task_set_priority (task, G_PRIORITY_HIGH * 2); g_task_run_in_thread (task, fake_task_thread); g_object_unref (task); +} + +static void +unclog_thread_pool (void) +{ + g_mutex_unlock (&fake_task_mutex); + g_main_loop_run (loop); +} + +static void +test_run_in_thread_priority (void) +{ + GTask *task; + GCancellable *cancellable; + int seq_a, seq_b, seq_c, seq_d; + + clog_up_thread_pool (); /* Queue three more tasks that we'll arrange to have run serially */ task = g_task_new (NULL, NULL, NULL, NULL); @@ -894,7 +919,50 @@ test_run_in_thread_priority (void) g_assert_cmpint (seq_a, ==, 3); g_assert_cmpint (seq_b, ==, 4); - g_mutex_unlock (&fake_task_mutex); + unclog_thread_pool (); +} + +/* test_run_in_thread_nested: task threads that block waiting on + * other task threads will not cause the thread pool to starve. + */ + +static void +run_nested_task_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GTask *nested; + int *nested_tasks_left = task_data; + + if ((*nested_tasks_left)--) + { + nested = g_task_new (NULL, NULL, NULL, NULL); + g_task_set_task_data (nested, nested_tasks_left, NULL); + g_task_run_in_thread_sync (nested, run_nested_task_thread); + g_object_unref (nested); + } + + g_task_return_boolean (task, TRUE); +} + +static void +test_run_in_thread_nested (void) +{ + GTask *task; + int nested_tasks_left = 2; + + clog_up_thread_pool (); + + task = g_task_new (NULL, NULL, quit_main_loop_callback, NULL); + g_task_set_task_data (task, &nested_tasks_left, NULL); + g_task_run_in_thread (task, run_nested_task_thread); + g_object_unref (task); + + g_mutex_unlock (&last_fake_task_mutex); + g_main_loop_run (loop); + + unclog_thread_pool (); } /* test_return_on_cancel */ @@ -1652,6 +1720,7 @@ main (int argc, char **argv) g_test_add_func ("/gtask/run-in-thread", test_run_in_thread); g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync); g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority); + g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested); g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel); g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync); g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);