GCancellable: Use per-instance mutex logic instead of global critical sections

GCancellable is meant to be used in multi-thread operations but all the
cancellable instances were sharing a single mutex to synchronize them
which can be less optimal when many instances are in place.
Especially when we're doing a lock/unlock dances that may leave another
thread to take the control of a critical section in an unexpected way.

This in fact was leading to some races in GCancellableSources causing
leaks because we were assuming that the "cancelled" callback was always
called before our dispose implementation.

As per this, use per-instance mutexes.

The lock is also now used only to protect the calls that may interact
with cancelled state or that depends on that, as per this we can just
reduce it to the cancel and reset case, other than to the connect one to
prevent the race that we could have when connecting to a cancellable
that is reset from another thread.

We don't really need to release the locks during callbacks now as they
are per instance, and there's really no function that we allowed to call
during a ::cancelled signal callback that may require an unlocked state.
This could been done in case with a recursive lock, that is easy enough
to implement but not really needed for this case.

Fixes: #2309, #2313
This commit is contained in:
Marco Trevisan (Treviño) 2022-06-21 05:58:00 +02:00
parent 0a8cb10f22
commit 3a07b2abd4
3 changed files with 102 additions and 129 deletions

View File

@ -1,6 +1,7 @@
/* GIO - GLib Input, Output and Streaming Library /* GIO - GLib Input, Output and Streaming Library
* *
* Copyright (C) 2006-2007 Red Hat, Inc. * Copyright (C) 2006-2007 Red Hat, Inc.
* Copyright (C) 2022-2024 Canonical, Ltd.
* *
* SPDX-License-Identifier: LGPL-2.1-or-later * SPDX-License-Identifier: LGPL-2.1-or-later
* *
@ -18,6 +19,7 @@
* Public License along with this library; if not, see <http://www.gnu.org/licenses/>. * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
* *
* Author: Alexander Larsson <alexl@redhat.com> * Author: Alexander Larsson <alexl@redhat.com>
* Author: Marco Trevisan <marco.trevisan@canonical.com>
*/ */
#include "config.h" #include "config.h"
@ -45,14 +47,12 @@ enum {
struct _GCancellablePrivate struct _GCancellablePrivate
{ {
/* Atomic so that g_cancellable_is_cancelled does not require holding the mutex. */ /* Atomic so that we don't require holding global mutexes for independent ops. */
gboolean cancelled; gboolean cancelled;
/* Access to fields below is protected by cancellable_mutex. */ int cancelled_running;
guint cancelled_running : 1;
guint cancelled_running_waiting : 1;
unsigned cancelled_emissions;
unsigned cancelled_emissions_waiting : 1;
/* Access to fields below is protected by cancellable's mutex. */
GMutex mutex;
guint fd_refcount; guint fd_refcount;
GWakeup *wakeup; GWakeup *wakeup;
}; };
@ -62,7 +62,6 @@ static guint signals[LAST_SIGNAL] = { 0 };
G_DEFINE_TYPE_WITH_PRIVATE (GCancellable, g_cancellable, G_TYPE_OBJECT) G_DEFINE_TYPE_WITH_PRIVATE (GCancellable, g_cancellable, G_TYPE_OBJECT)
static GPrivate current_cancellable; static GPrivate current_cancellable;
static GMutex cancellable_mutex;
static GCond cancellable_cond; static GCond cancellable_cond;
static void static void
@ -70,9 +69,15 @@ g_cancellable_finalize (GObject *object)
{ {
GCancellable *cancellable = G_CANCELLABLE (object); GCancellable *cancellable = G_CANCELLABLE (object);
/* We're at finalization phase, so only one thread can be here.
* Thus there's no need to lock. In case something is locking us, then we've
* a bug, and g_mutex_clear() will make this clear aborting.
*/
if (cancellable->priv->wakeup) if (cancellable->priv->wakeup)
GLIB_PRIVATE_CALL (g_wakeup_free) (cancellable->priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_free) (cancellable->priv->wakeup);
g_mutex_clear (&cancellable->priv->mutex);
G_OBJECT_CLASS (g_cancellable_parent_class)->finalize (object); G_OBJECT_CLASS (g_cancellable_parent_class)->finalize (object);
} }
@ -154,6 +159,8 @@ static void
g_cancellable_init (GCancellable *cancellable) g_cancellable_init (GCancellable *cancellable)
{ {
cancellable->priv = g_cancellable_get_instance_private (cancellable); cancellable->priv = g_cancellable_get_instance_private (cancellable);
g_mutex_init (&cancellable->priv->mutex);
} }
/** /**
@ -265,28 +272,17 @@ g_cancellable_reset (GCancellable *cancellable)
g_return_if_fail (G_IS_CANCELLABLE (cancellable)); g_return_if_fail (G_IS_CANCELLABLE (cancellable));
g_mutex_lock (&cancellable_mutex);
priv = cancellable->priv; priv = cancellable->priv;
while (priv->cancelled_running || priv->cancelled_emissions > 0) g_mutex_lock (&priv->mutex);
{
if (priv->cancelled_running)
priv->cancelled_running_waiting = TRUE;
if (priv->cancelled_emissions > 0) if (g_atomic_int_compare_and_exchange (&priv->cancelled, TRUE, FALSE))
priv->cancelled_emissions_waiting = TRUE;
g_cond_wait (&cancellable_cond, &cancellable_mutex);
}
if (g_atomic_int_exchange (&priv->cancelled, FALSE))
{ {
if (priv->wakeup) if (priv->wakeup)
GLIB_PRIVATE_CALL (g_wakeup_acknowledge) (priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_acknowledge) (priv->wakeup);
} }
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&priv->mutex);
} }
/** /**
@ -404,26 +400,29 @@ g_cancellable_get_fd (GCancellable *cancellable)
gboolean gboolean
g_cancellable_make_pollfd (GCancellable *cancellable, GPollFD *pollfd) g_cancellable_make_pollfd (GCancellable *cancellable, GPollFD *pollfd)
{ {
GCancellablePrivate *priv;
g_return_val_if_fail (pollfd != NULL, FALSE); g_return_val_if_fail (pollfd != NULL, FALSE);
if (cancellable == NULL) if (cancellable == NULL)
return FALSE; return FALSE;
g_return_val_if_fail (G_IS_CANCELLABLE (cancellable), FALSE); g_return_val_if_fail (G_IS_CANCELLABLE (cancellable), FALSE);
g_mutex_lock (&cancellable_mutex); priv = cancellable->priv;
cancellable->priv->fd_refcount++; g_mutex_lock (&priv->mutex);
if (cancellable->priv->wakeup == NULL) if ((priv->fd_refcount++) == 0)
{ {
cancellable->priv->wakeup = GLIB_PRIVATE_CALL (g_wakeup_new) (); priv->wakeup = GLIB_PRIVATE_CALL (g_wakeup_new) ();
if (g_atomic_int_get (&cancellable->priv->cancelled)) if (g_atomic_int_get (&priv->cancelled))
GLIB_PRIVATE_CALL (g_wakeup_signal) (cancellable->priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_signal) (priv->wakeup);
} }
GLIB_PRIVATE_CALL (g_wakeup_get_pollfd) (cancellable->priv->wakeup, pollfd); g_assert (priv->wakeup);
GLIB_PRIVATE_CALL (g_wakeup_get_pollfd) (priv->wakeup, pollfd);
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&priv->mutex);
return TRUE; return TRUE;
} }
@ -447,26 +446,22 @@ g_cancellable_make_pollfd (GCancellable *cancellable, GPollFD *pollfd)
void void
g_cancellable_release_fd (GCancellable *cancellable) g_cancellable_release_fd (GCancellable *cancellable)
{ {
GCancellablePrivate *priv;
if (cancellable == NULL) if (cancellable == NULL)
return; return;
g_return_if_fail (G_IS_CANCELLABLE (cancellable)); g_return_if_fail (G_IS_CANCELLABLE (cancellable));
priv = cancellable->priv; g_mutex_lock (&cancellable->priv->mutex);
g_mutex_lock (&cancellable_mutex); g_assert (cancellable->priv->fd_refcount > 0);
g_assert (priv->fd_refcount > 0);
priv->fd_refcount--; if ((cancellable->priv->fd_refcount--) == 1)
if (priv->fd_refcount == 0)
{ {
GLIB_PRIVATE_CALL (g_wakeup_free) (priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_free) (cancellable->priv->wakeup);
priv->wakeup = NULL; cancellable->priv->wakeup = NULL;
} }
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&cancellable->priv->mutex);
} }
/** /**
@ -495,37 +490,31 @@ g_cancellable_cancel (GCancellable *cancellable)
{ {
GCancellablePrivate *priv; GCancellablePrivate *priv;
if (cancellable == NULL || g_cancellable_is_cancelled (cancellable)) if (cancellable == NULL || g_atomic_int_get (&cancellable->priv->cancelled))
return; return;
priv = cancellable->priv; priv = cancellable->priv;
g_mutex_lock (&cancellable_mutex); g_mutex_lock (&priv->mutex);
if (g_atomic_int_exchange (&priv->cancelled, TRUE)) if (!g_atomic_int_compare_and_exchange (&priv->cancelled, FALSE, TRUE))
{ {
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&priv->mutex);
return; return;
} }
priv->cancelled_running = TRUE; g_atomic_int_inc (&priv->cancelled_running);
if (priv->wakeup) if (priv->wakeup)
GLIB_PRIVATE_CALL (g_wakeup_signal) (priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_signal) (priv->wakeup);
g_mutex_unlock (&cancellable_mutex);
g_object_ref (cancellable); g_object_ref (cancellable);
g_signal_emit (cancellable, signals[CANCELLED], 0); g_signal_emit (cancellable, signals[CANCELLED], 0);
g_mutex_lock (&cancellable_mutex); if (g_atomic_int_dec_and_test (&priv->cancelled_running))
priv->cancelled_running = FALSE;
if (priv->cancelled_running_waiting)
g_cond_broadcast (&cancellable_cond); g_cond_broadcast (&cancellable_cond);
priv->cancelled_running_waiting = FALSE;
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&priv->mutex);
g_object_unref (cancellable); g_object_unref (cancellable);
} }
@ -573,7 +562,7 @@ g_cancellable_connect (GCancellable *cancellable,
g_return_val_if_fail (G_IS_CANCELLABLE (cancellable), 0); g_return_val_if_fail (G_IS_CANCELLABLE (cancellable), 0);
g_mutex_lock (&cancellable_mutex); g_mutex_lock (&cancellable->priv->mutex);
if (g_atomic_int_get (&cancellable->priv->cancelled)) if (g_atomic_int_get (&cancellable->priv->cancelled))
{ {
@ -583,23 +572,10 @@ g_cancellable_connect (GCancellable *cancellable,
_callback = (void *)callback; _callback = (void *)callback;
id = 0; id = 0;
cancellable->priv->cancelled_emissions++;
g_mutex_unlock (&cancellable_mutex);
_callback (cancellable, data); _callback (cancellable, data);
if (data_destroy_func) if (data_destroy_func)
data_destroy_func (data); data_destroy_func (data);
g_mutex_lock (&cancellable_mutex);
if (cancellable->priv->cancelled_emissions_waiting)
g_cond_broadcast (&cancellable_cond);
cancellable->priv->cancelled_emissions--;
g_mutex_unlock (&cancellable_mutex);
} }
else else
{ {
@ -607,10 +583,9 @@ g_cancellable_connect (GCancellable *cancellable,
callback, data, callback, data,
(GClosureNotify) data_destroy_func, (GClosureNotify) data_destroy_func,
G_CONNECT_DEFAULT); G_CONNECT_DEFAULT);
g_mutex_unlock (&cancellable_mutex);
} }
g_mutex_unlock (&cancellable->priv->mutex);
return id; return id;
} }
@ -646,33 +621,26 @@ g_cancellable_disconnect (GCancellable *cancellable,
if (handler_id == 0 || cancellable == NULL) if (handler_id == 0 || cancellable == NULL)
return; return;
g_mutex_lock (&cancellable_mutex);
priv = cancellable->priv; priv = cancellable->priv;
while (priv->cancelled_running || priv->cancelled_emissions) g_mutex_lock (&priv->mutex);
{
if (priv->cancelled_running)
priv->cancelled_running_waiting = TRUE;
if (priv->cancelled_emissions) while (g_atomic_int_get (&priv->cancelled_running) != 0)
priv->cancelled_emissions_waiting = TRUE; g_cond_wait (&cancellable_cond, &priv->mutex);
g_cond_wait (&cancellable_cond, &cancellable_mutex); g_mutex_unlock (&priv->mutex);
}
g_signal_handler_disconnect (cancellable, handler_id); g_signal_handler_disconnect (cancellable, handler_id);
g_mutex_unlock (&cancellable_mutex);
} }
typedef struct { typedef struct {
GSource source; GSource source;
/* Atomic: */
GCancellable *cancellable; GCancellable *cancellable;
gulong cancelled_handler; gulong cancelled_handler;
/* Protected by cancellable_mutex: */ /* Atomic: */
gboolean resurrected_during_cancellation; gboolean cancelled_callback_called;
} GCancellableSource; } GCancellableSource;
/* /*
@ -692,27 +660,35 @@ cancellable_source_cancelled (GCancellable *cancellable,
{ {
GSource *source = user_data; GSource *source = user_data;
GCancellableSource *cancellable_source = (GCancellableSource *) source; GCancellableSource *cancellable_source = (GCancellableSource *) source;
gboolean callback_was_not_called;
g_mutex_lock (&cancellable_mutex);
/* Drop the reference added in cancellable_source_dispose(); see the comment there.
* The reference must be dropped after unlocking @cancellable_mutex since
* it could be the final reference, and the dispose function takes
* @cancellable_mutex. */
if (cancellable_source->resurrected_during_cancellation)
{
cancellable_source->resurrected_during_cancellation = FALSE;
g_mutex_unlock (&cancellable_mutex);
g_source_unref (source);
return;
}
g_source_ref (source); g_source_ref (source);
g_mutex_unlock (&cancellable_mutex);
g_source_set_ready_time (source, 0); g_source_set_ready_time (source, 0);
callback_was_not_called = g_atomic_int_compare_and_exchange (
&cancellable_source->cancelled_callback_called, FALSE, TRUE);
g_assert (callback_was_not_called);
g_source_unref (source); g_source_unref (source);
} }
static gboolean
cancellable_source_prepare (GSource *source,
gint *timeout)
{
GCancellableSource *cancellable_source = (GCancellableSource *) source;
GCancellable *cancellable;
if (timeout)
*timeout = -1;
cancellable = g_atomic_pointer_get (&cancellable_source->cancellable);
if (cancellable && !g_atomic_int_get (&cancellable->priv->cancelled_running))
g_atomic_int_set (&cancellable_source->cancelled_callback_called, FALSE);
return FALSE;
}
static gboolean static gboolean
cancellable_source_dispatch (GSource *source, cancellable_source_dispatch (GSource *source,
GSourceFunc callback, GSourceFunc callback,
@ -729,12 +705,13 @@ static void
cancellable_source_dispose (GSource *source) cancellable_source_dispose (GSource *source)
{ {
GCancellableSource *cancellable_source = (GCancellableSource *)source; GCancellableSource *cancellable_source = (GCancellableSource *)source;
GCancellable *cancellable;
g_mutex_lock (&cancellable_mutex); cancellable = g_atomic_pointer_exchange (&cancellable_source->cancellable, NULL);
if (cancellable_source->cancellable) if (cancellable)
{ {
if (cancellable_source->cancellable->priv->cancelled_running) if (g_atomic_int_get (&cancellable->priv->cancelled_running))
{ {
/* There can be a race here: if thread A has called /* There can be a race here: if thread A has called
* g_cancellable_cancel() and has got as far as committing to call * g_cancellable_cancel() and has got as far as committing to call
@ -746,21 +723,21 @@ cancellable_source_dispose (GSource *source)
* will then be left in a state where its committed to using a * will then be left in a state where its committed to using a
* dangling GCancellableSource pointer. * dangling GCancellableSource pointer.
* *
* Eliminate that race by resurrecting the #GSource temporarily, and * Eliminate that race by waiting to ensure that our cancelled
* then dropping that reference in cancellable_source_cancelled(), * callback has been called, keeping a temporary ref, so that
* which should be guaranteed to fire because were inside a * there's no risk that we're unreffing something that is still
* @cancelled_running block. * going to be used.
*/ */
g_source_ref (source); g_source_ref (source);
cancellable_source->resurrected_during_cancellation = TRUE; while (!g_atomic_int_get (&cancellable_source->cancelled_callback_called))
;
g_source_unref (source);
} }
g_clear_signal_handler (&cancellable_source->cancelled_handler, g_clear_signal_handler (&cancellable_source->cancelled_handler, cancellable);
cancellable_source->cancellable); g_object_unref (cancellable);
g_clear_object (&cancellable_source->cancellable);
} }
g_mutex_unlock (&cancellable_mutex);
} }
static gboolean static gboolean
@ -789,7 +766,7 @@ cancellable_source_closure_callback (GCancellable *cancellable,
static GSourceFuncs cancellable_source_funcs = static GSourceFuncs cancellable_source_funcs =
{ {
NULL, cancellable_source_prepare,
NULL, NULL,
cancellable_source_dispatch, cancellable_source_dispatch,
NULL, NULL,

View File

@ -262,11 +262,6 @@ threaded_dispose_thread_cb (gpointer user_data)
static void static void
test_cancellable_source_threaded_dispose (void) test_cancellable_source_threaded_dispose (void)
{ {
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_incomplete ("FIXME: Leaks lots of GCancellableSource objects, see glib#2309");
(void) cancelled_cb;
(void) threaded_dispose_thread_cb;
#else
ThreadedDisposeData data; ThreadedDisposeData data;
GThread *thread = NULL; GThread *thread = NULL;
guint i; guint i;
@ -276,6 +271,10 @@ test_cancellable_source_threaded_dispose (void)
"(in one thread) and cancelling the GCancellable it refers " "(in one thread) and cancelling the GCancellable it refers "
"to (in another thread)"); "to (in another thread)");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1841"); g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1841");
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_message ("We also ensure that no GCancellableSource are leaked");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/2309");
#endif
#ifdef ENABLE_VALGRIND #ifdef ENABLE_VALGRIND
if (RUNNING_ON_VALGRIND) if (RUNNING_ON_VALGRIND)
@ -341,7 +340,6 @@ test_cancellable_source_threaded_dispose (void)
g_cond_clear (&data.cond); g_cond_clear (&data.cond);
g_ptr_array_unref (cancellables_pending_unref); g_ptr_array_unref (cancellables_pending_unref);
#endif
} }
static void static void

View File

@ -1007,17 +1007,16 @@ test_dbus_roundtrip (void)
static void static void
test_dbus_peer_roundtrip (void) test_dbus_peer_roundtrip (void)
{ {
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_incomplete ("FIXME: Leaks a GCancellableSource, see glib#2313");
(void) peer_connection_up;
(void) peer_connection_down;
#else
PeerConnection peer; PeerConnection peer;
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_message ("Ensure that no GCancellableSource are leaked");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/2313");
#endif
peer_connection_up (&peer); peer_connection_up (&peer);
do_roundtrip (peer.server_connection, peer.client_connection); do_roundtrip (peer.server_connection, peer.client_connection);
peer_connection_down (&peer); peer_connection_down (&peer);
#endif
} }
static gint items_changed_count; static gint items_changed_count;
@ -1146,17 +1145,16 @@ test_dbus_subscriptions (void)
static void static void
test_dbus_peer_subscriptions (void) test_dbus_peer_subscriptions (void)
{ {
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_incomplete ("FIXME: Leaks a GCancellableSource, see glib#2313");
(void) peer_connection_up;
(void) peer_connection_down;
#else
PeerConnection peer; PeerConnection peer;
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_message ("Ensure that no GCancellableSource are leaked");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/2313");
#endif
peer_connection_up (&peer); peer_connection_up (&peer);
do_subscriptions (peer.server_connection, peer.client_connection); do_subscriptions (peer.server_connection, peer.client_connection);
peer_connection_down (&peer); peer_connection_down (&peer);
#endif
} }
static void static void