Merge branch 'atomic-gcancellable' into 'main'

GCancellable: Use per-instance mutex logic instead of global critical sections

Closes #2309, #2313 e #774

See merge request GNOME/glib!2765
This commit is contained in:
Marco Trevisan 2024-07-24 07:29:19 +00:00
commit 7855fc9fb6
3 changed files with 176 additions and 133 deletions

View File

@ -1,6 +1,7 @@
/* GIO - GLib Input, Output and Streaming Library /* GIO - GLib Input, Output and Streaming Library
* *
* Copyright (C) 2006-2007 Red Hat, Inc. * Copyright (C) 2006-2007 Red Hat, Inc.
* Copyright (C) 2022-2024 Canonical, Ltd.
* *
* SPDX-License-Identifier: LGPL-2.1-or-later * SPDX-License-Identifier: LGPL-2.1-or-later
* *
@ -18,6 +19,7 @@
* Public License along with this library; if not, see <http://www.gnu.org/licenses/>. * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
* *
* Author: Alexander Larsson <alexl@redhat.com> * Author: Alexander Larsson <alexl@redhat.com>
* Author: Marco Trevisan <marco.trevisan@canonical.com>
*/ */
#include "config.h" #include "config.h"
@ -45,14 +47,12 @@ enum {
struct _GCancellablePrivate struct _GCancellablePrivate
{ {
/* Atomic so that g_cancellable_is_cancelled does not require holding the mutex. */ /* Atomic so that we don't require holding global mutexes for independent ops. */
gboolean cancelled; gboolean cancelled;
/* Access to fields below is protected by cancellable_mutex. */ int cancelled_running;
guint cancelled_running : 1;
guint cancelled_running_waiting : 1;
unsigned cancelled_emissions;
unsigned cancelled_emissions_waiting : 1;
/* Access to fields below is protected by cancellable's mutex. */
GMutex mutex;
guint fd_refcount; guint fd_refcount;
GWakeup *wakeup; GWakeup *wakeup;
}; };
@ -62,7 +62,6 @@ static guint signals[LAST_SIGNAL] = { 0 };
G_DEFINE_TYPE_WITH_PRIVATE (GCancellable, g_cancellable, G_TYPE_OBJECT) G_DEFINE_TYPE_WITH_PRIVATE (GCancellable, g_cancellable, G_TYPE_OBJECT)
static GPrivate current_cancellable; static GPrivate current_cancellable;
static GMutex cancellable_mutex;
static GCond cancellable_cond; static GCond cancellable_cond;
static void static void
@ -70,9 +69,15 @@ g_cancellable_finalize (GObject *object)
{ {
GCancellable *cancellable = G_CANCELLABLE (object); GCancellable *cancellable = G_CANCELLABLE (object);
/* We're at finalization phase, so only one thread can be here.
* Thus there's no need to lock. In case something is locking us, then we've
* a bug, and g_mutex_clear() will make this clear aborting.
*/
if (cancellable->priv->wakeup) if (cancellable->priv->wakeup)
GLIB_PRIVATE_CALL (g_wakeup_free) (cancellable->priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_free) (cancellable->priv->wakeup);
g_mutex_clear (&cancellable->priv->mutex);
G_OBJECT_CLASS (g_cancellable_parent_class)->finalize (object); G_OBJECT_CLASS (g_cancellable_parent_class)->finalize (object);
} }
@ -154,6 +159,8 @@ static void
g_cancellable_init (GCancellable *cancellable) g_cancellable_init (GCancellable *cancellable)
{ {
cancellable->priv = g_cancellable_get_instance_private (cancellable); cancellable->priv = g_cancellable_get_instance_private (cancellable);
g_mutex_init (&cancellable->priv->mutex);
} }
/** /**
@ -265,28 +272,17 @@ g_cancellable_reset (GCancellable *cancellable)
g_return_if_fail (G_IS_CANCELLABLE (cancellable)); g_return_if_fail (G_IS_CANCELLABLE (cancellable));
g_mutex_lock (&cancellable_mutex);
priv = cancellable->priv; priv = cancellable->priv;
while (priv->cancelled_running || priv->cancelled_emissions > 0) g_mutex_lock (&priv->mutex);
{
if (priv->cancelled_running)
priv->cancelled_running_waiting = TRUE;
if (priv->cancelled_emissions > 0) if (g_atomic_int_compare_and_exchange (&priv->cancelled, TRUE, FALSE))
priv->cancelled_emissions_waiting = TRUE;
g_cond_wait (&cancellable_cond, &cancellable_mutex);
}
if (g_atomic_int_exchange (&priv->cancelled, FALSE))
{ {
if (priv->wakeup) if (priv->wakeup)
GLIB_PRIVATE_CALL (g_wakeup_acknowledge) (priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_acknowledge) (priv->wakeup);
} }
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&priv->mutex);
} }
/** /**
@ -404,26 +400,29 @@ g_cancellable_get_fd (GCancellable *cancellable)
gboolean gboolean
g_cancellable_make_pollfd (GCancellable *cancellable, GPollFD *pollfd) g_cancellable_make_pollfd (GCancellable *cancellable, GPollFD *pollfd)
{ {
GCancellablePrivate *priv;
g_return_val_if_fail (pollfd != NULL, FALSE); g_return_val_if_fail (pollfd != NULL, FALSE);
if (cancellable == NULL) if (cancellable == NULL)
return FALSE; return FALSE;
g_return_val_if_fail (G_IS_CANCELLABLE (cancellable), FALSE); g_return_val_if_fail (G_IS_CANCELLABLE (cancellable), FALSE);
g_mutex_lock (&cancellable_mutex); priv = cancellable->priv;
cancellable->priv->fd_refcount++; g_mutex_lock (&priv->mutex);
if (cancellable->priv->wakeup == NULL) if ((priv->fd_refcount++) == 0)
{ {
cancellable->priv->wakeup = GLIB_PRIVATE_CALL (g_wakeup_new) (); priv->wakeup = GLIB_PRIVATE_CALL (g_wakeup_new) ();
if (g_atomic_int_get (&cancellable->priv->cancelled)) if (g_atomic_int_get (&priv->cancelled))
GLIB_PRIVATE_CALL (g_wakeup_signal) (cancellable->priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_signal) (priv->wakeup);
} }
GLIB_PRIVATE_CALL (g_wakeup_get_pollfd) (cancellable->priv->wakeup, pollfd); g_assert (priv->wakeup);
GLIB_PRIVATE_CALL (g_wakeup_get_pollfd) (priv->wakeup, pollfd);
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&priv->mutex);
return TRUE; return TRUE;
} }
@ -447,26 +446,22 @@ g_cancellable_make_pollfd (GCancellable *cancellable, GPollFD *pollfd)
void void
g_cancellable_release_fd (GCancellable *cancellable) g_cancellable_release_fd (GCancellable *cancellable)
{ {
GCancellablePrivate *priv;
if (cancellable == NULL) if (cancellable == NULL)
return; return;
g_return_if_fail (G_IS_CANCELLABLE (cancellable)); g_return_if_fail (G_IS_CANCELLABLE (cancellable));
priv = cancellable->priv; g_mutex_lock (&cancellable->priv->mutex);
g_mutex_lock (&cancellable_mutex); g_assert (cancellable->priv->fd_refcount > 0);
g_assert (priv->fd_refcount > 0);
priv->fd_refcount--; if ((cancellable->priv->fd_refcount--) == 1)
if (priv->fd_refcount == 0)
{ {
GLIB_PRIVATE_CALL (g_wakeup_free) (priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_free) (cancellable->priv->wakeup);
priv->wakeup = NULL; cancellable->priv->wakeup = NULL;
} }
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&cancellable->priv->mutex);
} }
/** /**
@ -495,37 +490,35 @@ g_cancellable_cancel (GCancellable *cancellable)
{ {
GCancellablePrivate *priv; GCancellablePrivate *priv;
if (cancellable == NULL || g_cancellable_is_cancelled (cancellable)) if (cancellable == NULL || g_atomic_int_get (&cancellable->priv->cancelled))
return; return;
priv = cancellable->priv; priv = cancellable->priv;
g_mutex_lock (&cancellable_mutex); /* We add a reference before locking, to avoid that potential toggle
* notifications on the object might happen while we're locked.
*/
g_object_ref (cancellable);
g_mutex_lock (&priv->mutex);
if (g_atomic_int_exchange (&priv->cancelled, TRUE)) if (!g_atomic_int_compare_and_exchange (&priv->cancelled, FALSE, TRUE))
{ {
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&priv->mutex);
g_object_unref (cancellable);
return; return;
} }
priv->cancelled_running = TRUE; g_atomic_int_inc (&priv->cancelled_running);
if (priv->wakeup) if (priv->wakeup)
GLIB_PRIVATE_CALL (g_wakeup_signal) (priv->wakeup); GLIB_PRIVATE_CALL (g_wakeup_signal) (priv->wakeup);
g_mutex_unlock (&cancellable_mutex);
g_object_ref (cancellable);
g_signal_emit (cancellable, signals[CANCELLED], 0); g_signal_emit (cancellable, signals[CANCELLED], 0);
g_mutex_lock (&cancellable_mutex); if (g_atomic_int_dec_and_test (&priv->cancelled_running))
priv->cancelled_running = FALSE;
if (priv->cancelled_running_waiting)
g_cond_broadcast (&cancellable_cond); g_cond_broadcast (&cancellable_cond);
priv->cancelled_running_waiting = FALSE;
g_mutex_unlock (&cancellable_mutex); g_mutex_unlock (&priv->mutex);
g_object_unref (cancellable); g_object_unref (cancellable);
} }
@ -541,9 +534,11 @@ g_cancellable_cancel (GCancellable *cancellable)
* signal. Also handles the race condition that may happen * signal. Also handles the race condition that may happen
* if the cancellable is cancelled right before connecting. * if the cancellable is cancelled right before connecting.
* *
* @callback is called at most once, either directly at the * @callback is called exactly once each time @cancellable is cancelled,
* time of the connect if @cancellable is already cancelled, * either directly at the time of the connect if @cancellable is already
* or when @cancellable is cancelled in some thread. * cancelled, or when @cancellable is cancelled in some thread.
* In case the cancellable is reset via [method@Gio.Cancellable.reset]
* then the callback can be called again if the @cancellable is cancelled.
* *
* @data_destroy_func will be called when the handler is * @data_destroy_func will be called when the handler is
* disconnected, or immediately if the cancellable is already * disconnected, or immediately if the cancellable is already
@ -571,7 +566,7 @@ g_cancellable_connect (GCancellable *cancellable,
g_return_val_if_fail (G_IS_CANCELLABLE (cancellable), 0); g_return_val_if_fail (G_IS_CANCELLABLE (cancellable), 0);
g_mutex_lock (&cancellable_mutex); g_mutex_lock (&cancellable->priv->mutex);
if (g_atomic_int_get (&cancellable->priv->cancelled)) if (g_atomic_int_get (&cancellable->priv->cancelled))
{ {
@ -581,23 +576,10 @@ g_cancellable_connect (GCancellable *cancellable,
_callback = (void *)callback; _callback = (void *)callback;
id = 0; id = 0;
cancellable->priv->cancelled_emissions++;
g_mutex_unlock (&cancellable_mutex);
_callback (cancellable, data); _callback (cancellable, data);
if (data_destroy_func) if (data_destroy_func)
data_destroy_func (data); data_destroy_func (data);
g_mutex_lock (&cancellable_mutex);
if (cancellable->priv->cancelled_emissions_waiting)
g_cond_broadcast (&cancellable_cond);
cancellable->priv->cancelled_emissions--;
g_mutex_unlock (&cancellable_mutex);
} }
else else
{ {
@ -605,10 +587,9 @@ g_cancellable_connect (GCancellable *cancellable,
callback, data, callback, data,
(GClosureNotify) data_destroy_func, (GClosureNotify) data_destroy_func,
G_CONNECT_DEFAULT); G_CONNECT_DEFAULT);
g_mutex_unlock (&cancellable_mutex);
} }
g_mutex_unlock (&cancellable->priv->mutex);
return id; return id;
} }
@ -644,33 +625,26 @@ g_cancellable_disconnect (GCancellable *cancellable,
if (handler_id == 0 || cancellable == NULL) if (handler_id == 0 || cancellable == NULL)
return; return;
g_mutex_lock (&cancellable_mutex);
priv = cancellable->priv; priv = cancellable->priv;
while (priv->cancelled_running || priv->cancelled_emissions) g_mutex_lock (&priv->mutex);
{
if (priv->cancelled_running)
priv->cancelled_running_waiting = TRUE;
if (priv->cancelled_emissions) while (g_atomic_int_get (&priv->cancelled_running) != 0)
priv->cancelled_emissions_waiting = TRUE; g_cond_wait (&cancellable_cond, &priv->mutex);
g_cond_wait (&cancellable_cond, &cancellable_mutex); g_mutex_unlock (&priv->mutex);
}
g_signal_handler_disconnect (cancellable, handler_id); g_signal_handler_disconnect (cancellable, handler_id);
g_mutex_unlock (&cancellable_mutex);
} }
typedef struct { typedef struct {
GSource source; GSource source;
/* Atomic: */
GCancellable *cancellable; GCancellable *cancellable;
gulong cancelled_handler; gulong cancelled_handler;
/* Protected by cancellable_mutex: */ /* Atomic: */
gboolean resurrected_during_cancellation; gboolean cancelled_callback_called;
} GCancellableSource; } GCancellableSource;
/* /*
@ -690,27 +664,35 @@ cancellable_source_cancelled (GCancellable *cancellable,
{ {
GSource *source = user_data; GSource *source = user_data;
GCancellableSource *cancellable_source = (GCancellableSource *) source; GCancellableSource *cancellable_source = (GCancellableSource *) source;
gboolean callback_was_not_called;
g_mutex_lock (&cancellable_mutex);
/* Drop the reference added in cancellable_source_dispose(); see the comment there.
* The reference must be dropped after unlocking @cancellable_mutex since
* it could be the final reference, and the dispose function takes
* @cancellable_mutex. */
if (cancellable_source->resurrected_during_cancellation)
{
cancellable_source->resurrected_during_cancellation = FALSE;
g_mutex_unlock (&cancellable_mutex);
g_source_unref (source);
return;
}
g_source_ref (source); g_source_ref (source);
g_mutex_unlock (&cancellable_mutex);
g_source_set_ready_time (source, 0); g_source_set_ready_time (source, 0);
callback_was_not_called = g_atomic_int_compare_and_exchange (
&cancellable_source->cancelled_callback_called, FALSE, TRUE);
g_assert (callback_was_not_called);
g_source_unref (source); g_source_unref (source);
} }
static gboolean
cancellable_source_prepare (GSource *source,
gint *timeout)
{
GCancellableSource *cancellable_source = (GCancellableSource *) source;
GCancellable *cancellable;
if (timeout)
*timeout = -1;
cancellable = g_atomic_pointer_get (&cancellable_source->cancellable);
if (cancellable && !g_atomic_int_get (&cancellable->priv->cancelled_running))
g_atomic_int_set (&cancellable_source->cancelled_callback_called, FALSE);
return FALSE;
}
static gboolean static gboolean
cancellable_source_dispatch (GSource *source, cancellable_source_dispatch (GSource *source,
GSourceFunc callback, GSourceFunc callback,
@ -727,12 +709,13 @@ static void
cancellable_source_dispose (GSource *source) cancellable_source_dispose (GSource *source)
{ {
GCancellableSource *cancellable_source = (GCancellableSource *)source; GCancellableSource *cancellable_source = (GCancellableSource *)source;
GCancellable *cancellable;
g_mutex_lock (&cancellable_mutex); cancellable = g_atomic_pointer_exchange (&cancellable_source->cancellable, NULL);
if (cancellable_source->cancellable) if (cancellable)
{ {
if (cancellable_source->cancellable->priv->cancelled_running) if (g_atomic_int_get (&cancellable->priv->cancelled_running))
{ {
/* There can be a race here: if thread A has called /* There can be a race here: if thread A has called
* g_cancellable_cancel() and has got as far as committing to call * g_cancellable_cancel() and has got as far as committing to call
@ -744,21 +727,21 @@ cancellable_source_dispose (GSource *source)
* will then be left in a state where its committed to using a * will then be left in a state where its committed to using a
* dangling GCancellableSource pointer. * dangling GCancellableSource pointer.
* *
* Eliminate that race by resurrecting the #GSource temporarily, and * Eliminate that race by waiting to ensure that our cancelled
* then dropping that reference in cancellable_source_cancelled(), * callback has been called, keeping a temporary ref, so that
* which should be guaranteed to fire because were inside a * there's no risk that we're unreffing something that is still
* @cancelled_running block. * going to be used.
*/ */
g_source_ref (source); g_source_ref (source);
cancellable_source->resurrected_during_cancellation = TRUE; while (!g_atomic_int_get (&cancellable_source->cancelled_callback_called))
;
g_source_unref (source);
} }
g_clear_signal_handler (&cancellable_source->cancelled_handler, g_clear_signal_handler (&cancellable_source->cancelled_handler, cancellable);
cancellable_source->cancellable); g_object_unref (cancellable);
g_clear_object (&cancellable_source->cancellable);
} }
g_mutex_unlock (&cancellable_mutex);
} }
static gboolean static gboolean
@ -787,7 +770,7 @@ cancellable_source_closure_callback (GCancellable *cancellable,
static GSourceFuncs cancellable_source_funcs = static GSourceFuncs cancellable_source_funcs =
{ {
NULL, cancellable_source_prepare,
NULL, NULL,
cancellable_source_dispatch, cancellable_source_dispatch,
NULL, NULL,

View File

@ -262,11 +262,6 @@ threaded_dispose_thread_cb (gpointer user_data)
static void static void
test_cancellable_source_threaded_dispose (void) test_cancellable_source_threaded_dispose (void)
{ {
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_incomplete ("FIXME: Leaks lots of GCancellableSource objects, see glib#2309");
(void) cancelled_cb;
(void) threaded_dispose_thread_cb;
#else
ThreadedDisposeData data; ThreadedDisposeData data;
GThread *thread = NULL; GThread *thread = NULL;
guint i; guint i;
@ -276,6 +271,10 @@ test_cancellable_source_threaded_dispose (void)
"(in one thread) and cancelling the GCancellable it refers " "(in one thread) and cancelling the GCancellable it refers "
"to (in another thread)"); "to (in another thread)");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1841"); g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1841");
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_message ("We also ensure that no GCancellableSource are leaked");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/2309");
#endif
#ifdef ENABLE_VALGRIND #ifdef ENABLE_VALGRIND
if (RUNNING_ON_VALGRIND) if (RUNNING_ON_VALGRIND)
@ -341,7 +340,6 @@ test_cancellable_source_threaded_dispose (void)
g_cond_clear (&data.cond); g_cond_clear (&data.cond);
g_ptr_array_unref (cancellables_pending_unref); g_ptr_array_unref (cancellables_pending_unref);
#endif
} }
static void static void
@ -796,6 +794,69 @@ test_cancellable_cancel_reset_connect_races (void)
g_object_unref (cancellable); g_object_unref (cancellable);
} }
static gboolean
source_cancelled_counter_cb (GCancellable *cancellable,
gpointer user_data)
{
guint *n_calls = user_data;
*n_calls = *n_calls + 1;
return G_SOURCE_CONTINUE;
}
static void
do_nothing (G_GNUC_UNUSED void *user_data)
{
/* An empty timeout/idle once callback function */
}
static void
test_cancellable_source_can_be_fired_multiple_times (void)
{
GCancellable *cancellable;
GSource *source;
guint n_calls = 0;
g_test_summary ("Test a cancellable source callback can be called multiple times");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/774");
cancellable = g_cancellable_new ();
source = g_cancellable_source_new (cancellable);
g_source_set_callback (source, G_SOURCE_FUNC (source_cancelled_counter_cb),
&n_calls, NULL);
g_source_attach (source, NULL);
g_cancellable_cancel (cancellable);
g_assert_cmpuint (n_calls, ==, 0);
while (g_main_context_pending (NULL))
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (n_calls, ==, 1);
g_cancellable_cancel (cancellable);
g_timeout_add_once (100, do_nothing, NULL);
while (g_main_context_pending (NULL))
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (n_calls, ==, 1);
g_cancellable_reset (cancellable);
g_cancellable_cancel (cancellable);
g_assert_cmpuint (n_calls, ==, 1);
while (g_main_context_pending (NULL))
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (n_calls, ==, 2);
g_source_unref (source);
g_object_unref (cancellable);
}
int int
main (int argc, char *argv[]) main (int argc, char *argv[])
{ {
@ -811,6 +872,7 @@ main (int argc, char *argv[])
g_test_add_func ("/cancellable/cancel-reset-races", test_cancellable_cancel_reset_races); g_test_add_func ("/cancellable/cancel-reset-races", test_cancellable_cancel_reset_races);
g_test_add_func ("/cancellable/cancel-reset-connect-races", test_cancellable_cancel_reset_connect_races); g_test_add_func ("/cancellable/cancel-reset-connect-races", test_cancellable_cancel_reset_connect_races);
g_test_add_func ("/cancellable-source/threaded-dispose", test_cancellable_source_threaded_dispose); g_test_add_func ("/cancellable-source/threaded-dispose", test_cancellable_source_threaded_dispose);
g_test_add_func ("/cancellable-source/can-be-fired-multiple-times", test_cancellable_source_can_be_fired_multiple_times);
return g_test_run (); return g_test_run ();
} }

View File

@ -1007,17 +1007,16 @@ test_dbus_roundtrip (void)
static void static void
test_dbus_peer_roundtrip (void) test_dbus_peer_roundtrip (void)
{ {
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_incomplete ("FIXME: Leaks a GCancellableSource, see glib#2313");
(void) peer_connection_up;
(void) peer_connection_down;
#else
PeerConnection peer; PeerConnection peer;
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_message ("Ensure that no GCancellableSource are leaked");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/2313");
#endif
peer_connection_up (&peer); peer_connection_up (&peer);
do_roundtrip (peer.server_connection, peer.client_connection); do_roundtrip (peer.server_connection, peer.client_connection);
peer_connection_down (&peer); peer_connection_down (&peer);
#endif
} }
static gint items_changed_count; static gint items_changed_count;
@ -1146,17 +1145,16 @@ test_dbus_subscriptions (void)
static void static void
test_dbus_peer_subscriptions (void) test_dbus_peer_subscriptions (void)
{ {
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_incomplete ("FIXME: Leaks a GCancellableSource, see glib#2313");
(void) peer_connection_up;
(void) peer_connection_down;
#else
PeerConnection peer; PeerConnection peer;
#ifdef _GLIB_ADDRESS_SANITIZER
g_test_message ("Ensure that no GCancellableSource are leaked");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/2313");
#endif
peer_connection_up (&peer); peer_connection_up (&peer);
do_subscriptions (peer.server_connection, peer.client_connection); do_subscriptions (peer.server_connection, peer.client_connection);
peer_connection_down (&peer); peer_connection_down (&peer);
#endif
} }
static void static void