Merge branch '978-dbus-signal-emission-race' into 'master'

Fix race between D-Bus signal emission and unsubscription

Closes #978

See merge request GNOME/glib!1332
This commit is contained in:
Philip Withnall 2020-01-21 10:43:39 +00:00
commit 5c7a88e972
4 changed files with 130 additions and 94 deletions

View File

@ -3248,18 +3248,9 @@ typedef struct
gchar *object_path; gchar *object_path;
gchar *arg0; gchar *arg0;
GDBusSignalFlags flags; GDBusSignalFlags flags;
GArray *subscribers; GPtrArray *subscribers; /* (owned) (element-type SignalSubscriber) */
} SignalData; } SignalData;
typedef struct
{
GDBusSignalCallback callback;
gpointer user_data;
GDestroyNotify user_data_free_func;
guint id;
GMainContext *context;
} SignalSubscriber;
static void static void
signal_data_free (SignalData *signal_data) signal_data_free (SignalData *signal_data)
{ {
@ -3270,10 +3261,46 @@ signal_data_free (SignalData *signal_data)
g_free (signal_data->member); g_free (signal_data->member);
g_free (signal_data->object_path); g_free (signal_data->object_path);
g_free (signal_data->arg0); g_free (signal_data->arg0);
g_array_free (signal_data->subscribers, TRUE); g_ptr_array_unref (signal_data->subscribers);
g_free (signal_data); g_free (signal_data);
} }
typedef struct
{
/* All fields are immutable after construction. */
gatomicrefcount ref_count;
GDBusSignalCallback callback;
gpointer user_data;
GDestroyNotify user_data_free_func;
guint id;
GMainContext *context;
} SignalSubscriber;
static SignalSubscriber *
signal_subscriber_ref (SignalSubscriber *subscriber)
{
g_atomic_ref_count_inc (&subscriber->ref_count);
return subscriber;
}
static void
signal_subscriber_unref (SignalSubscriber *subscriber)
{
if (g_atomic_ref_count_dec (&subscriber->ref_count))
{
/* Destroy the user data. It doesnt matter which thread
* signal_subscriber_unref() is called in (or whether its called with a
* lock held), as call_destroy_notify() always defers to the next
* #GMainContext iteration. */
call_destroy_notify (subscriber->context,
subscriber->user_data_free_func,
subscriber->user_data);
g_main_context_unref (subscriber->context);
g_free (subscriber);
}
}
static gchar * static gchar *
args_to_rule (const gchar *sender, args_to_rule (const gchar *sender,
const gchar *interface_name, const gchar *interface_name,
@ -3441,6 +3468,24 @@ is_signal_data_for_name_lost_or_acquired (SignalData *signal_data)
* signal is unsubscribed from, and may be called after @connection * signal is unsubscribed from, and may be called after @connection
* has been destroyed.) * has been destroyed.)
* *
* As @callback is potentially invoked in a different thread from where its
* emitted, its possible for this to happen after
* g_dbus_connection_signal_unsubscribe() has been called in another thread.
* Due to this, @user_data should have a strong reference which is freed with
* @user_data_free_func, rather than pointing to data whose lifecycle is tied
* to the signal subscription. For example, if a #GObject is used to store the
* subscription ID from g_dbus_connection_signal_subscribe(), a strong reference
* to that #GObject must be passed to @user_data, and g_object_unref() passed to
* @user_data_free_func. You are responsible for breaking the resulting
* reference count cycle by explicitly unsubscribing from the signal when
* dropping the last external reference to the #GObject. Alternatively, a weak
* reference may be used.
*
* It is guaranteed that if you unsubscribe from a signal using
* g_dbus_connection_signal_unsubscribe() from the same thread which made the
* corresponding g_dbus_connection_signal_subscribe() call, @callback will not
* be invoked after g_dbus_connection_signal_unsubscribe() returns.
*
* The returned subscription identifier is an opaque value which is guaranteed * The returned subscription identifier is an opaque value which is guaranteed
* to never be zero. * to never be zero.
* *
@ -3464,7 +3509,7 @@ g_dbus_connection_signal_subscribe (GDBusConnection *connection,
{ {
gchar *rule; gchar *rule;
SignalData *signal_data; SignalData *signal_data;
SignalSubscriber subscriber; SignalSubscriber *subscriber;
GPtrArray *signal_data_array; GPtrArray *signal_data_array;
const gchar *sender_unique_name; const gchar *sender_unique_name;
@ -3505,17 +3550,19 @@ g_dbus_connection_signal_subscribe (GDBusConnection *connection,
else else
sender_unique_name = ""; sender_unique_name = "";
subscriber.callback = callback; subscriber = g_new0 (SignalSubscriber, 1);
subscriber.user_data = user_data; subscriber->ref_count = 1;
subscriber.user_data_free_func = user_data_free_func; subscriber->callback = callback;
subscriber.id = (guint) g_atomic_int_add (&_global_subscriber_id, 1); /* TODO: overflow etc. */ subscriber->user_data = user_data;
subscriber.context = g_main_context_ref_thread_default (); subscriber->user_data_free_func = user_data_free_func;
subscriber->id = (guint) g_atomic_int_add (&_global_subscriber_id, 1); /* TODO: overflow etc. */
subscriber->context = g_main_context_ref_thread_default ();
/* see if we've already have this rule */ /* see if we've already have this rule */
signal_data = g_hash_table_lookup (connection->map_rule_to_signal_data, rule); signal_data = g_hash_table_lookup (connection->map_rule_to_signal_data, rule);
if (signal_data != NULL) if (signal_data != NULL)
{ {
g_array_append_val (signal_data->subscribers, subscriber); g_ptr_array_add (signal_data->subscribers, subscriber);
g_free (rule); g_free (rule);
goto out; goto out;
} }
@ -3529,8 +3576,8 @@ g_dbus_connection_signal_subscribe (GDBusConnection *connection,
signal_data->object_path = g_strdup (object_path); signal_data->object_path = g_strdup (object_path);
signal_data->arg0 = g_strdup (arg0); signal_data->arg0 = g_strdup (arg0);
signal_data->flags = flags; signal_data->flags = flags;
signal_data->subscribers = g_array_new (FALSE, FALSE, sizeof (SignalSubscriber)); signal_data->subscribers = g_ptr_array_new_with_free_func ((GDestroyNotify) signal_subscriber_unref);
g_array_append_val (signal_data->subscribers, subscriber); g_ptr_array_add (signal_data->subscribers, subscriber);
g_hash_table_insert (connection->map_rule_to_signal_data, g_hash_table_insert (connection->map_rule_to_signal_data,
signal_data->rule, signal_data->rule,
@ -3560,26 +3607,27 @@ g_dbus_connection_signal_subscribe (GDBusConnection *connection,
out: out:
g_hash_table_insert (connection->map_id_to_signal_data, g_hash_table_insert (connection->map_id_to_signal_data,
GUINT_TO_POINTER (subscriber.id), GUINT_TO_POINTER (subscriber->id),
signal_data); signal_data);
CONNECTION_UNLOCK (connection); CONNECTION_UNLOCK (connection);
return subscriber.id; return subscriber->id;
} }
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
/* called in any thread */ /* called in any thread */
/* must hold lock when calling this (except if connection->finalizing is TRUE) */ /* must hold lock when calling this (except if connection->finalizing is TRUE)
static void * returns the number of removed subscribers */
static guint
unsubscribe_id_internal (GDBusConnection *connection, unsubscribe_id_internal (GDBusConnection *connection,
guint subscription_id, guint subscription_id)
GArray *out_removed_subscribers)
{ {
SignalData *signal_data; SignalData *signal_data;
GPtrArray *signal_data_array; GPtrArray *signal_data_array;
guint n; guint n;
guint n_removed = 0;
signal_data = g_hash_table_lookup (connection->map_id_to_signal_data, signal_data = g_hash_table_lookup (connection->map_id_to_signal_data,
GUINT_TO_POINTER (subscription_id)); GUINT_TO_POINTER (subscription_id));
@ -3591,16 +3639,19 @@ unsubscribe_id_internal (GDBusConnection *connection,
for (n = 0; n < signal_data->subscribers->len; n++) for (n = 0; n < signal_data->subscribers->len; n++)
{ {
SignalSubscriber *subscriber; SignalSubscriber *subscriber = signal_data->subscribers->pdata[n];
subscriber = &(g_array_index (signal_data->subscribers, SignalSubscriber, n));
if (subscriber->id != subscription_id) if (subscriber->id != subscription_id)
continue; continue;
/* Its OK to rearrange the array order using the fast #GPtrArray
* removal functions, since were going to exit the loop below anyway we
* never move on to the next element. Secondly, subscription IDs are
* guaranteed to be unique. */
g_warn_if_fail (g_hash_table_remove (connection->map_id_to_signal_data, g_warn_if_fail (g_hash_table_remove (connection->map_id_to_signal_data,
GUINT_TO_POINTER (subscription_id))); GUINT_TO_POINTER (subscription_id)));
g_array_append_val (out_removed_subscribers, *subscriber); n_removed++;
g_array_remove_index (signal_data->subscribers, n); g_ptr_array_remove_index_fast (signal_data->subscribers, n);
if (signal_data->subscribers->len == 0) if (signal_data->subscribers->len == 0)
{ {
@ -3641,7 +3692,7 @@ unsubscribe_id_internal (GDBusConnection *connection,
g_assert_not_reached (); g_assert_not_reached ();
out: out:
; return n_removed;
} }
/** /**
@ -3658,44 +3709,24 @@ void
g_dbus_connection_signal_unsubscribe (GDBusConnection *connection, g_dbus_connection_signal_unsubscribe (GDBusConnection *connection,
guint subscription_id) guint subscription_id)
{ {
GArray *subscribers; guint n_subscribers_removed G_GNUC_UNUSED /* when compiling with G_DISABLE_ASSERT */;
guint n;
g_return_if_fail (G_IS_DBUS_CONNECTION (connection)); g_return_if_fail (G_IS_DBUS_CONNECTION (connection));
g_return_if_fail (check_initialized (connection)); g_return_if_fail (check_initialized (connection));
subscribers = g_array_new (FALSE, FALSE, sizeof (SignalSubscriber));
CONNECTION_LOCK (connection); CONNECTION_LOCK (connection);
unsubscribe_id_internal (connection, n_subscribers_removed = unsubscribe_id_internal (connection, subscription_id);
subscription_id,
subscribers);
CONNECTION_UNLOCK (connection); CONNECTION_UNLOCK (connection);
/* invariant */ /* invariant */
g_assert (subscribers->len == 0 || subscribers->len == 1); g_assert (n_subscribers_removed == 0 || n_subscribers_removed == 1);
/* call GDestroyNotify without lock held */
for (n = 0; n < subscribers->len; n++)
{
SignalSubscriber *subscriber;
subscriber = &(g_array_index (subscribers, SignalSubscriber, n));
call_destroy_notify (subscriber->context,
subscriber->user_data_free_func,
subscriber->user_data);
g_main_context_unref (subscriber->context);
}
g_array_free (subscribers, TRUE);
} }
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
typedef struct typedef struct
{ {
guint subscription_id; SignalSubscriber *subscriber; /* (owned) */
GDBusSignalCallback callback;
gpointer user_data;
GDBusMessage *message; GDBusMessage *message;
GDBusConnection *connection; GDBusConnection *connection;
const gchar *sender; const gchar *sender;
@ -3727,7 +3758,7 @@ emit_signal_instance_in_idle_cb (gpointer data)
#if 0 #if 0
g_print ("in emit_signal_instance_in_idle_cb (id=%d sender=%s path=%s interface=%s member=%s params=%s)\n", g_print ("in emit_signal_instance_in_idle_cb (id=%d sender=%s path=%s interface=%s member=%s params=%s)\n",
signal_instance->subscription_id, signal_instance->subscriber->id,
signal_instance->sender, signal_instance->sender,
signal_instance->path, signal_instance->path,
signal_instance->interface, signal_instance->interface,
@ -3739,18 +3770,18 @@ emit_signal_instance_in_idle_cb (gpointer data)
CONNECTION_LOCK (signal_instance->connection); CONNECTION_LOCK (signal_instance->connection);
has_subscription = FALSE; has_subscription = FALSE;
if (g_hash_table_lookup (signal_instance->connection->map_id_to_signal_data, if (g_hash_table_lookup (signal_instance->connection->map_id_to_signal_data,
GUINT_TO_POINTER (signal_instance->subscription_id)) != NULL) GUINT_TO_POINTER (signal_instance->subscriber->id)) != NULL)
has_subscription = TRUE; has_subscription = TRUE;
CONNECTION_UNLOCK (signal_instance->connection); CONNECTION_UNLOCK (signal_instance->connection);
if (has_subscription) if (has_subscription)
signal_instance->callback (signal_instance->connection, signal_instance->subscriber->callback (signal_instance->connection,
signal_instance->sender, signal_instance->sender,
signal_instance->path, signal_instance->path,
signal_instance->interface, signal_instance->interface,
signal_instance->member, signal_instance->member,
parameters, parameters,
signal_instance->user_data); signal_instance->subscriber->user_data);
g_variant_unref (parameters); g_variant_unref (parameters);
@ -3762,6 +3793,7 @@ signal_instance_free (SignalInstance *signal_instance)
{ {
g_object_unref (signal_instance->message); g_object_unref (signal_instance->message);
g_object_unref (signal_instance->connection); g_object_unref (signal_instance->connection);
signal_subscriber_unref (signal_instance->subscriber);
g_free (signal_instance); g_free (signal_instance);
} }
@ -3876,16 +3908,12 @@ schedule_callbacks (GDBusConnection *connection,
for (m = 0; m < signal_data->subscribers->len; m++) for (m = 0; m < signal_data->subscribers->len; m++)
{ {
SignalSubscriber *subscriber; SignalSubscriber *subscriber = signal_data->subscribers->pdata[m];
GSource *idle_source; GSource *idle_source;
SignalInstance *signal_instance; SignalInstance *signal_instance;
subscriber = &(g_array_index (signal_data->subscribers, SignalSubscriber, m));
signal_instance = g_new0 (SignalInstance, 1); signal_instance = g_new0 (SignalInstance, 1);
signal_instance->subscription_id = subscriber->id; signal_instance->subscriber = signal_subscriber_ref (subscriber);
signal_instance->callback = subscriber->callback;
signal_instance->user_data = subscriber->user_data;
signal_instance->message = g_object_ref (message); signal_instance->message = g_object_ref (message);
signal_instance->connection = g_object_ref (connection); signal_instance->connection = g_object_ref (connection);
signal_instance->sender = sender; signal_instance->sender = sender;
@ -3954,7 +3982,6 @@ purge_all_signal_subscriptions (GDBusConnection *connection)
GHashTableIter iter; GHashTableIter iter;
gpointer key; gpointer key;
GArray *ids; GArray *ids;
GArray *subscribers;
guint n; guint n;
ids = g_array_new (FALSE, FALSE, sizeof (guint)); ids = g_array_new (FALSE, FALSE, sizeof (guint));
@ -3965,28 +3992,12 @@ purge_all_signal_subscriptions (GDBusConnection *connection)
g_array_append_val (ids, subscription_id); g_array_append_val (ids, subscription_id);
} }
subscribers = g_array_new (FALSE, FALSE, sizeof (SignalSubscriber));
for (n = 0; n < ids->len; n++) for (n = 0; n < ids->len; n++)
{ {
guint subscription_id = g_array_index (ids, guint, n); guint subscription_id = g_array_index (ids, guint, n);
unsubscribe_id_internal (connection, unsubscribe_id_internal (connection, subscription_id);
subscription_id,
subscribers);
} }
g_array_free (ids, TRUE); g_array_free (ids, TRUE);
/* call GDestroyNotify without lock held */
for (n = 0; n < subscribers->len; n++)
{
SignalSubscriber *subscriber;
subscriber = &(g_array_index (subscribers, SignalSubscriber, n));
call_destroy_notify (subscriber->context,
subscriber->user_data_free_func,
subscriber->user_data);
g_main_context_unref (subscriber->context);
}
g_array_free (subscribers, TRUE);
} }
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */

View File

@ -366,7 +366,12 @@ request_name_cb (GObject *source_object,
connection = g_object_ref (client->connection); connection = g_object_ref (client->connection);
G_UNLOCK (lock); G_UNLOCK (lock);
/* start listening to NameLost and NameAcquired messages */ /* Start listening to NameLost and NameAcquired messages. We hold
* references to the Client in the signal closures, since its possible
* for a signal to be in-flight after unsubscribing the signal handler.
* This creates a reference count cycle, but thats explicitly broken by
* disconnecting the signal handlers before calling client_unref() in
* g_bus_unown_name(). */
if (connection != NULL) if (connection != NULL)
{ {
client->name_lost_subscription_id = client->name_lost_subscription_id =
@ -378,8 +383,8 @@ request_name_cb (GObject *source_object,
client->name, client->name,
G_DBUS_SIGNAL_FLAGS_NONE, G_DBUS_SIGNAL_FLAGS_NONE,
on_name_lost_or_acquired, on_name_lost_or_acquired,
client, client_ref (client),
NULL); (GDestroyNotify) client_unref);
client->name_acquired_subscription_id = client->name_acquired_subscription_id =
g_dbus_connection_signal_subscribe (connection, g_dbus_connection_signal_subscribe (connection,
"org.freedesktop.DBus", "org.freedesktop.DBus",
@ -389,8 +394,8 @@ request_name_cb (GObject *source_object,
client->name, client->name,
G_DBUS_SIGNAL_FLAGS_NONE, G_DBUS_SIGNAL_FLAGS_NONE,
on_name_lost_or_acquired, on_name_lost_or_acquired,
client, client_ref (client),
NULL); (GDestroyNotify) client_unref);
g_object_unref (connection); g_object_unref (connection);
} }
} }

View File

@ -189,6 +189,7 @@ test_bus_own_name (void)
* Stop owning the name - this should invoke our free func * Stop owning the name - this should invoke our free func
*/ */
g_bus_unown_name (id); g_bus_unown_name (id);
g_main_loop_run (loop);
g_assert_cmpint (data.num_free_func, ==, 2); g_assert_cmpint (data.num_free_func, ==, 2);
/* /*
@ -330,6 +331,7 @@ test_bus_own_name (void)
g_assert_cmpint (data2.num_acquired, ==, 0); g_assert_cmpint (data2.num_acquired, ==, 0);
g_assert_cmpint (data2.num_lost, ==, 1); g_assert_cmpint (data2.num_lost, ==, 1);
g_bus_unown_name (id2); g_bus_unown_name (id2);
g_main_loop_run (loop);
g_assert_cmpint (data2.num_bus_acquired, ==, 0); g_assert_cmpint (data2.num_bus_acquired, ==, 0);
g_assert_cmpint (data2.num_acquired, ==, 0); g_assert_cmpint (data2.num_acquired, ==, 0);
g_assert_cmpint (data2.num_lost, ==, 1); g_assert_cmpint (data2.num_lost, ==, 1);
@ -355,6 +357,7 @@ test_bus_own_name (void)
g_assert_cmpint (data2.num_acquired, ==, 0); g_assert_cmpint (data2.num_acquired, ==, 0);
g_assert_cmpint (data2.num_lost, ==, 1); g_assert_cmpint (data2.num_lost, ==, 1);
g_bus_unown_name (id2); g_bus_unown_name (id2);
g_main_loop_run (loop);
g_assert_cmpint (data2.num_bus_acquired, ==, 0); g_assert_cmpint (data2.num_bus_acquired, ==, 0);
g_assert_cmpint (data2.num_acquired, ==, 0); g_assert_cmpint (data2.num_acquired, ==, 0);
g_assert_cmpint (data2.num_lost, ==, 1); g_assert_cmpint (data2.num_lost, ==, 1);
@ -365,6 +368,7 @@ test_bus_own_name (void)
*/ */
data.expect_null_connection = FALSE; data.expect_null_connection = FALSE;
g_bus_unown_name (id); g_bus_unown_name (id);
g_main_loop_run (loop);
g_assert_cmpint (data.num_bus_acquired, ==, 1); g_assert_cmpint (data.num_bus_acquired, ==, 1);
g_assert_cmpint (data.num_acquired, ==, 1); g_assert_cmpint (data.num_acquired, ==, 1);
g_assert_cmpint (data.num_free_func, ==, 4); g_assert_cmpint (data.num_free_func, ==, 4);
@ -418,6 +422,7 @@ test_bus_own_name (void)
g_assert_cmpint (data2.num_acquired, ==, 0); g_assert_cmpint (data2.num_acquired, ==, 0);
g_assert_cmpint (data2.num_lost, ==, 1); g_assert_cmpint (data2.num_lost, ==, 1);
g_bus_unown_name (id2); g_bus_unown_name (id2);
g_main_loop_run (loop);
g_assert_cmpint (data2.num_bus_acquired, ==, 0); g_assert_cmpint (data2.num_bus_acquired, ==, 0);
g_assert_cmpint (data2.num_acquired, ==, 0); g_assert_cmpint (data2.num_acquired, ==, 0);
g_assert_cmpint (data2.num_lost, ==, 1); g_assert_cmpint (data2.num_lost, ==, 1);
@ -450,8 +455,9 @@ test_bus_own_name (void)
g_assert_cmpint (data2.num_bus_acquired, ==, 0); g_assert_cmpint (data2.num_bus_acquired, ==, 0);
/* ok, make owner2 release the name - then wait for owner to automagically reacquire it */ /* ok, make owner2 release the name - then wait for owner to automagically reacquire it */
g_bus_unown_name (id2); g_bus_unown_name (id2);
g_assert_cmpint (data2.num_free_func, ==, 1);
g_main_loop_run (loop); g_main_loop_run (loop);
g_main_loop_run (loop);
g_assert_cmpint (data2.num_free_func, ==, 1);
g_assert_cmpint (data.num_acquired, ==, 2); g_assert_cmpint (data.num_acquired, ==, 2);
g_assert_cmpint (data.num_lost, ==, 1); g_assert_cmpint (data.num_lost, ==, 1);
@ -466,6 +472,7 @@ test_bus_own_name (void)
g_assert_cmpint (data.num_acquired, ==, 2); g_assert_cmpint (data.num_acquired, ==, 2);
g_assert_cmpint (data.num_lost, ==, 2); g_assert_cmpint (data.num_lost, ==, 2);
g_bus_unown_name (id); g_bus_unown_name (id);
g_main_loop_run (loop);
g_assert_cmpint (data.num_free_func, ==, 5); g_assert_cmpint (data.num_free_func, ==, 5);
g_object_unref (c); g_object_unref (c);
@ -645,6 +652,7 @@ test_bus_watch_name (void)
/* unown the name */ /* unown the name */
g_bus_unown_name (owner_id); g_bus_unown_name (owner_id);
g_main_loop_run (loop);
g_assert_cmpint (data.num_acquired, ==, 1); g_assert_cmpint (data.num_acquired, ==, 1);
g_assert_cmpint (data.num_free_func, ==, 2); g_assert_cmpint (data.num_free_func, ==, 2);
@ -704,6 +712,7 @@ test_bus_watch_name (void)
g_assert_cmpint (data.num_free_func, ==, 1); g_assert_cmpint (data.num_free_func, ==, 1);
g_bus_unown_name (owner_id); g_bus_unown_name (owner_id);
g_main_loop_run (loop);
g_assert_cmpint (data.num_free_func, ==, 2); g_assert_cmpint (data.num_free_func, ==, 2);
session_bus_down (); session_bus_down ();

View File

@ -463,6 +463,17 @@
fun:g_system_thread_new fun:g_system_thread_new
} }
{
g-task-thread-pool-init
Memcheck:Leak
match-leak-kinds:possible,reachable,definite
fun:malloc
...
fun:g_thread_new
...
fun:g_task_thread_pool_init
}
{ {
g-io-module-default-proxy-resolver-gnome g-io-module-default-proxy-resolver-gnome
Memcheck:Leak Memcheck:Leak