GCancellable: Ensure it is always cancelled on connect callback

When a cancellable is cancelled when we call g_cancellable_connect we
used to immediately call the provided callback, while this is fine we
actually had race in case the cancellable was about to be reset or in
the middle of a cancellation.

In fact it could happen that when we released the mutex, another thread
could reset the cancellable just before the callback is actually called
and so leading to call it with g_cancellable_cancelled() == FALSE.

So to handle this, make disconnect and reset function to wait for
connection emission to finish, not to break their assumptions.

This can be tested using some "brute-force" tests where multiple threads
are racing to connect and disconnect while others are cancelling and
resetting a cancellable, ensuring that all works as we expect.
This commit is contained in:
Marco Trevisan (Treviño) 2022-06-22 01:30:57 +02:00
parent ded3099afc
commit cdda03a690
2 changed files with 92 additions and 6 deletions

View File

@ -50,6 +50,8 @@ struct _GCancellablePrivate
/* Access to fields below is protected by cancellable_mutex. */ /* Access to fields below is protected by cancellable_mutex. */
guint cancelled_running : 1; guint cancelled_running : 1;
guint cancelled_running_waiting : 1; guint cancelled_running_waiting : 1;
unsigned cancelled_emissions;
unsigned cancelled_emissions_waiting : 1;
guint fd_refcount; guint fd_refcount;
GWakeup *wakeup; GWakeup *wakeup;
@ -267,9 +269,14 @@ g_cancellable_reset (GCancellable *cancellable)
priv = cancellable->priv; priv = cancellable->priv;
while (priv->cancelled_running) while (priv->cancelled_running || priv->cancelled_emissions > 0)
{ {
if (priv->cancelled_running)
priv->cancelled_running_waiting = TRUE; priv->cancelled_running_waiting = TRUE;
if (priv->cancelled_emissions > 0)
priv->cancelled_emissions_waiting = TRUE;
g_cond_wait (&cancellable_cond, &cancellable_mutex); g_cond_wait (&cancellable_cond, &cancellable_mutex);
} }
@ -571,15 +578,26 @@ g_cancellable_connect (GCancellable *cancellable,
void (*_callback) (GCancellable *cancellable, void (*_callback) (GCancellable *cancellable,
gpointer user_data); gpointer user_data);
g_mutex_unlock (&cancellable_mutex);
_callback = (void *)callback; _callback = (void *)callback;
id = 0; id = 0;
cancellable->priv->cancelled_emissions++;
g_mutex_unlock (&cancellable_mutex);
_callback (cancellable, data); _callback (cancellable, data);
if (data_destroy_func) if (data_destroy_func)
data_destroy_func (data); data_destroy_func (data);
g_mutex_lock (&cancellable_mutex);
if (cancellable->priv->cancelled_emissions_waiting)
g_cond_broadcast (&cancellable_cond);
cancellable->priv->cancelled_emissions--;
g_mutex_unlock (&cancellable_mutex);
} }
else else
{ {
@ -630,9 +648,14 @@ g_cancellable_disconnect (GCancellable *cancellable,
priv = cancellable->priv; priv = cancellable->priv;
while (priv->cancelled_running) while (priv->cancelled_running || priv->cancelled_emissions)
{ {
if (priv->cancelled_running)
priv->cancelled_running_waiting = TRUE; priv->cancelled_running_waiting = TRUE;
if (priv->cancelled_emissions)
priv->cancelled_emissions_waiting = TRUE;
g_cond_wait (&cancellable_cond, &cancellable_mutex); g_cond_wait (&cancellable_cond, &cancellable_mutex);
} }

View File

@ -728,6 +728,68 @@ test_cancellable_cancel_reset_races (void)
g_object_unref (cancellable); g_object_unref (cancellable);
} }
static gpointer
repeatedly_connecting_thread (gpointer data)
{
GCancellable *cancellable = data;
const guint iterations = 10000;
gboolean callback_ever_called = FALSE;
for (guint i = 0; i < iterations; ++i)
{
gboolean callback_called = FALSE;
gboolean called;
gulong id = g_cancellable_connect (cancellable,
G_CALLBACK (on_racy_cancellable_cancelled),
&callback_called, NULL);
called = g_atomic_int_get (&callback_called);
callback_ever_called |= called;
if (g_test_verbose () && called)
g_test_message ("Reconnecting cancellation callback called");
g_cancellable_disconnect (cancellable, id);
}
if (!callback_ever_called)
g_test_incomplete ("We didn't really checked if callbacks is called properly");
return NULL;
}
static void
test_cancellable_cancel_reset_connect_races (void)
{
GCancellable *cancellable;
GThread *resetting_thread = NULL;
GThread *cancelling_thread = NULL;
GThread *connecting_thread = NULL;
gboolean callback_called = FALSE;
g_test_summary ("Tests threads racing for cancelling, connecting and disconnecting "
" and resetting a GCancellable");
cancellable = g_cancellable_new ();
g_cancellable_connect (cancellable, G_CALLBACK (on_racy_cancellable_cancelled),
&callback_called, NULL);
g_assert_false (callback_called);
resetting_thread = g_thread_new ("/cancel-reset-connect-races/resetting",
repeatedly_resetting_thread,
cancellable);
cancelling_thread = g_thread_new ("/cancel-reset-connect-races/cancelling",
repeatedly_cancelling_thread, cancellable);
connecting_thread = g_thread_new ("/cancel-reset-connect-races/connecting",
repeatedly_connecting_thread, cancellable);
g_thread_join (g_steal_pointer (&cancelling_thread));
g_thread_join (g_steal_pointer (&resetting_thread));
g_thread_join (g_steal_pointer (&connecting_thread));
g_assert_true (callback_called);
g_object_unref (cancellable);
}
int int
main (int argc, char *argv[]) main (int argc, char *argv[])
{ {
@ -741,6 +803,7 @@ main (int argc, char *argv[])
g_test_add_func ("/cancellable/poll-fd-cancelled", test_cancellable_cancelled_poll_fd); g_test_add_func ("/cancellable/poll-fd-cancelled", test_cancellable_cancelled_poll_fd);
g_test_add_func ("/cancellable/poll-fd-cancelled-threaded", test_cancellable_cancelled_poll_fd_threaded); g_test_add_func ("/cancellable/poll-fd-cancelled-threaded", test_cancellable_cancelled_poll_fd_threaded);
g_test_add_func ("/cancellable/cancel-reset-races", test_cancellable_cancel_reset_races); g_test_add_func ("/cancellable/cancel-reset-races", test_cancellable_cancel_reset_races);
g_test_add_func ("/cancellable/cancel-reset-connect-races", test_cancellable_cancel_reset_connect_races);
g_test_add_func ("/cancellable-source/threaded-dispose", test_cancellable_source_threaded_dispose); g_test_add_func ("/cancellable-source/threaded-dispose", test_cancellable_source_threaded_dispose);
return g_test_run (); return g_test_run ();