gthreadedresolver: Switch to using a separate thread pool

Rather than running lookups in the global shared thread pool belonging
to `GTask`, run them in a private thread pool.

This is needed because the global shared thread pool is constrained to
only 14 threads. If there are 14 ongoing calls to
`g_task_run_in_thread()` from any library/code in the process, and then
one of them asks to do a DNS lookup, the lookup will block forever.

Under certain circumstances, particularly where there are a couple of
deep chains of dependent tasks running with `g_task_run_in_thread()`,
this can livelock the program.

Since `GResolver` is likely to be called as a frequent leaf call in
certain workloads, and in particular there are likely to be several
lookups requested at the same time, it makes sense to move resolver
lookups to a private thread pool.

Signed-off-by: Philip Withnall <pwithnall@endlessos.org>
This commit is contained in:
Philip Withnall 2023-04-25 15:58:46 +01:00
parent 84074ce757
commit 7b18e6205a

View File

@ -41,18 +41,39 @@
struct _GThreadedResolver
{
GResolver parent_instance;
GThreadPool *thread_pool; /* (owned) */
};
G_DEFINE_TYPE (GThreadedResolver, g_threaded_resolver, G_TYPE_RESOLVER)
static void threaded_resolver_worker_cb (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable);
static void run_task_in_thread_pool_async (GThreadedResolver *self,
GTask *task);
static void run_task_in_thread_pool_sync (GThreadedResolver *self,
GTask *task);
static void threaded_resolver_worker_cb (gpointer task_data,
gpointer user_data);
static void
g_threaded_resolver_init (GThreadedResolver *gtr)
g_threaded_resolver_init (GThreadedResolver *self)
{
self->thread_pool = g_thread_pool_new_full (threaded_resolver_worker_cb,
self,
(GDestroyNotify) g_object_unref,
20,
FALSE,
NULL);
}
static void
g_threaded_resolver_finalize (GObject *object)
{
GThreadedResolver *self = G_THREADED_RESOLVER (object);
g_thread_pool_free (self->thread_pool, TRUE, FALSE);
self->thread_pool = NULL;
G_OBJECT_CLASS (g_threaded_resolver_parent_class)->finalize (object);
}
static GResolverError
@ -95,6 +116,23 @@ typedef struct {
GResolverRecordType record_type;
} lookup_records;
};
GCond cond; /* used for signalling completion of the task when running it sync */
GMutex lock;
/* This enum indicates that a particular code path has claimed the
* task and is shortly about to call g_task_return_*() on it.
* This must be accessed with GThreadedResolver.lock held. */
enum
{
NOT_YET,
COMPLETED, /* libc lookup call has completed successfully or errored */
} will_return;
/* Whether the thread pool thread executing this lookup has finished executing
* it and g_task_return_*() has been called on it already.
* This must be accessed with GThreadedResolver.lock held. */
gboolean has_returned;
} LookupData;
static LookupData *
@ -103,6 +141,8 @@ lookup_data_new_by_name (const char *hostname,
{
LookupData *data = g_new0 (LookupData, 1);
data->lookup_type = LOOKUP_BY_NAME;
g_cond_init (&data->cond);
g_mutex_init (&data->lock);
data->lookup_by_name.hostname = g_strdup (hostname);
data->lookup_by_name.address_family = address_family;
return g_steal_pointer (&data);
@ -113,6 +153,8 @@ lookup_data_new_by_address (GInetAddress *address)
{
LookupData *data = g_new0 (LookupData, 1);
data->lookup_type = LOOKUP_BY_ADDRESS;
g_cond_init (&data->cond);
g_mutex_init (&data->lock);
data->lookup_by_address.address = g_object_ref (address);
return g_steal_pointer (&data);
}
@ -123,6 +165,8 @@ lookup_data_new_records (const gchar *rrname,
{
LookupData *data = g_new0 (LookupData, 1);
data->lookup_type = LOOKUP_RECORDS;
g_cond_init (&data->cond);
g_mutex_init (&data->lock);
data->lookup_records.rrname = g_strdup (rrname);
data->lookup_records.record_type = record_type;
return g_steal_pointer (&data);
@ -145,6 +189,9 @@ lookup_data_free (LookupData *data)
g_assert_not_reached ();
}
g_mutex_clear (&data->lock);
g_cond_clear (&data->cond);
g_free (data);
}
@ -243,6 +290,7 @@ lookup_by_name (GResolver *resolver,
GCancellable *cancellable,
GError **error)
{
GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
GList *addresses;
LookupData *data;
@ -252,8 +300,9 @@ lookup_by_name (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_name);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
g_task_set_return_on_cancel (task, TRUE);
g_task_run_in_thread_sync (task, threaded_resolver_worker_cb);
run_task_in_thread_pool_sync (self, task);
addresses = g_task_propagate_pointer (task, error);
g_object_unref (task);
@ -285,6 +334,7 @@ lookup_by_name_with_flags (GResolver *resolver,
GCancellable *cancellable,
GError **error)
{
GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
GList *addresses;
LookupData *data;
@ -294,8 +344,9 @@ lookup_by_name_with_flags (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_name_with_flags);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
g_task_set_return_on_cancel (task, TRUE);
g_task_run_in_thread_sync (task, threaded_resolver_worker_cb);
run_task_in_thread_pool_sync (self, task);
addresses = g_task_propagate_pointer (task, error);
g_object_unref (task);
@ -310,6 +361,7 @@ lookup_by_name_with_flags_async (GResolver *resolver,
GAsyncReadyCallback callback,
gpointer user_data)
{
GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
LookupData *data;
@ -322,8 +374,9 @@ lookup_by_name_with_flags_async (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_name_with_flags_async);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
g_task_set_return_on_cancel (task, TRUE);
g_task_run_in_thread (task, threaded_resolver_worker_cb);
run_task_in_thread_pool_async (self, task);
g_object_unref (task);
}
@ -415,6 +468,7 @@ lookup_by_address (GResolver *resolver,
GCancellable *cancellable,
GError **error)
{
GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
LookupData *data = NULL;
GTask *task;
gchar *name;
@ -424,8 +478,9 @@ lookup_by_address (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_address);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
g_task_set_return_on_cancel (task, TRUE);
g_task_run_in_thread_sync (task, threaded_resolver_worker_cb);
run_task_in_thread_pool_sync (self, task);
name = g_task_propagate_pointer (task, error);
g_object_unref (task);
@ -439,6 +494,7 @@ lookup_by_address_async (GResolver *resolver,
GAsyncReadyCallback callback,
gpointer user_data)
{
GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
LookupData *data = NULL;
GTask *task;
@ -447,8 +503,9 @@ lookup_by_address_async (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_address_async);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
g_task_set_return_on_cancel (task, TRUE);
g_task_run_in_thread (task, threaded_resolver_worker_cb);
run_task_in_thread_pool_async (self, task);
g_object_unref (task);
}
@ -1247,6 +1304,7 @@ lookup_records (GResolver *resolver,
GCancellable *cancellable,
GError **error)
{
GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
GList *records;
LookupData *data = NULL;
@ -1258,8 +1316,8 @@ lookup_records (GResolver *resolver,
data = lookup_data_new_records (rrname, record_type);
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
g_task_set_return_on_cancel (task, TRUE);
g_task_run_in_thread_sync (task, threaded_resolver_worker_cb);
run_task_in_thread_pool_sync (self, task);
records = g_task_propagate_pointer (task, error);
g_object_unref (task);
@ -1274,6 +1332,7 @@ lookup_records_async (GResolver *resolver,
GAsyncReadyCallback callback,
gpointer user_data)
{
GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
LookupData *data = NULL;
@ -1284,8 +1343,8 @@ lookup_records_async (GResolver *resolver,
data = lookup_data_new_records (rrname, record_type);
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
g_task_set_return_on_cancel (task, TRUE);
g_task_run_in_thread (task, threaded_resolver_worker_cb);
run_task_in_thread_pool_async (self, task);
g_object_unref (task);
}
@ -1300,13 +1359,41 @@ lookup_records_finish (GResolver *resolver,
}
static void
threaded_resolver_worker_cb (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
run_task_in_thread_pool_async (GThreadedResolver *self,
GTask *task)
{
LookupData *data = task_data;
LookupData *data = g_task_get_task_data (task);
g_mutex_lock (&data->lock);
g_thread_pool_push (self->thread_pool, g_object_ref (task), NULL);
g_mutex_unlock (&data->lock);
}
static void
run_task_in_thread_pool_sync (GThreadedResolver *self,
GTask *task)
{
LookupData *data = g_task_get_task_data (task);
run_task_in_thread_pool_async (self, task);
g_mutex_lock (&data->lock);
while (!data->has_returned)
g_cond_wait (&data->cond, &data->lock);
g_mutex_unlock (&data->lock);
}
static void
threaded_resolver_worker_cb (gpointer task_data,
gpointer user_data)
{
GTask *task = G_TASK (g_steal_pointer (&task_data));
LookupData *data = g_task_get_task_data (task);
GCancellable *cancellable = g_task_get_cancellable (task);
GError *local_error = NULL;
gboolean should_return;
switch (data->lookup_type) {
case LOOKUP_BY_NAME:
@ -1316,10 +1403,19 @@ threaded_resolver_worker_cb (GTask *task,
cancellable,
&local_error);
if (addresses != NULL)
g_task_return_pointer (task, g_steal_pointer (&addresses), (GDestroyNotify) g_resolver_free_addresses);
else
g_task_return_error (task, g_steal_pointer (&local_error));
g_mutex_lock (&data->lock);
should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED);
g_mutex_unlock (&data->lock);
if (should_return)
{
if (addresses != NULL)
g_task_return_pointer (task, g_steal_pointer (&addresses), (GDestroyNotify) g_resolver_free_addresses);
else
g_task_return_error (task, g_steal_pointer (&local_error));
}
g_clear_pointer (&addresses, g_resolver_free_addresses);
}
break;
case LOOKUP_BY_ADDRESS:
@ -1328,10 +1424,19 @@ threaded_resolver_worker_cb (GTask *task,
cancellable,
&local_error);
if (name != NULL)
g_task_return_pointer (task, g_steal_pointer (&name), g_free);
else
g_task_return_error (task, g_steal_pointer (&local_error));
g_mutex_lock (&data->lock);
should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED);
g_mutex_unlock (&data->lock);
if (should_return)
{
if (name != NULL)
g_task_return_pointer (task, g_steal_pointer (&name), g_free);
else
g_task_return_error (task, g_steal_pointer (&local_error));
}
g_clear_pointer (&name, g_free);
}
break;
case LOOKUP_RECORDS:
@ -1341,22 +1446,43 @@ threaded_resolver_worker_cb (GTask *task,
cancellable,
&local_error);
if (records != NULL)
g_task_return_pointer (task, g_steal_pointer (&records), (GDestroyNotify) free_records);
else
g_task_return_error (task, g_steal_pointer (&local_error));
g_mutex_lock (&data->lock);
should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED);
g_mutex_unlock (&data->lock);
if (should_return)
{
if (records != NULL)
g_task_return_pointer (task, g_steal_pointer (&records), (GDestroyNotify) free_records);
else
g_task_return_error (task, g_steal_pointer (&local_error));
}
g_clear_pointer (&records, free_records);
}
break;
default:
g_assert_not_reached ();
}
/* Signal completion of a task. */
g_mutex_lock (&data->lock);
g_assert (!data->has_returned);
data->has_returned = TRUE;
g_cond_broadcast (&data->cond);
g_mutex_unlock (&data->lock);
g_object_unref (task);
}
static void
g_threaded_resolver_class_init (GThreadedResolverClass *threaded_class)
{
GObjectClass *object_class = G_OBJECT_CLASS (threaded_class);
GResolverClass *resolver_class = G_RESOLVER_CLASS (threaded_class);
object_class->finalize = g_threaded_resolver_finalize;
resolver_class->lookup_by_name = lookup_by_name;
resolver_class->lookup_by_name_async = lookup_by_name_async;
resolver_class->lookup_by_name_finish = lookup_by_name_finish;