GDBusConnection: Avoid callbacks on finalized connection

Turns out that GDBusWorker will issue callbacks (in its own thread)
even after g_dbus_worker_stop() has been called. This would rarely
happen (and unreffing a connection is even rarer) so only saw this bug
occasionally when running the gdbus-connection test case in a loop.

Fix up this issue by maintaining a set of GDBusConnection objects that
are currently "alive" and do nothing in the callbacks if the passed
user_data pointer is not in this set.

Also attempted to fix up a race condition with
_g_object_wait_for_single_ref_do() and its use of GObject toggle
references - for now, just resort to busy waiting, thereby
sidestepping the toggle reference mess altogether.

Signed-off-by: David Zeuthen <davidz@redhat.com>
This commit is contained in:
David Zeuthen 2010-09-23 15:10:50 -04:00
parent c3c0e4d11d
commit f0b04acfd3
4 changed files with 115 additions and 22 deletions

View File

@ -375,13 +375,14 @@ G_DEFINE_TYPE_WITH_CODE (GDBusConnection, g_dbus_connection, G_TYPE_OBJECT,
G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE, async_initable_iface_init) G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE, async_initable_iface_init)
); );
static GHashTable *alive_connections = NULL;
static void static void
g_dbus_connection_dispose (GObject *object) g_dbus_connection_dispose (GObject *object)
{ {
GDBusConnection *connection = G_DBUS_CONNECTION (object); GDBusConnection *connection = G_DBUS_CONNECTION (object);
G_LOCK (message_bus_lock); G_LOCK (message_bus_lock);
//g_debug ("disposing %p", connection);
if (connection == the_session_bus) if (connection == the_session_bus)
{ {
the_session_bus = NULL; the_session_bus = NULL;
@ -390,11 +391,20 @@ g_dbus_connection_dispose (GObject *object)
{ {
the_system_bus = NULL; the_system_bus = NULL;
} }
CONNECTION_LOCK (connection);
if (connection->worker != NULL) if (connection->worker != NULL)
{ {
_g_dbus_worker_stop (connection->worker); _g_dbus_worker_stop (connection->worker);
connection->worker = NULL; connection->worker = NULL;
if (alive_connections != NULL)
g_warn_if_fail (g_hash_table_remove (alive_connections, connection));
} }
else
{
if (alive_connections != NULL)
g_warn_if_fail (g_hash_table_lookup (alive_connections, connection) == NULL);
}
CONNECTION_UNLOCK (connection);
G_UNLOCK (message_bus_lock); G_UNLOCK (message_bus_lock);
if (G_OBJECT_CLASS (g_dbus_connection_parent_class)->dispose != NULL) if (G_OBJECT_CLASS (g_dbus_connection_parent_class)->dispose != NULL)
@ -1957,19 +1967,31 @@ on_worker_message_received (GDBusWorker *worker,
GDBusMessage *message, GDBusMessage *message,
gpointer user_data) gpointer user_data)
{ {
GDBusConnection *connection = G_DBUS_CONNECTION (user_data); GDBusConnection *connection;
FilterCallback *filters; FilterCallback *filters;
gboolean consumed_by_filter; gboolean consumed_by_filter;
gboolean altered_by_filter; gboolean altered_by_filter;
guint num_filters; guint num_filters;
guint n; guint n;
gboolean alive;
G_LOCK (message_bus_lock);
alive = (g_hash_table_lookup (alive_connections, user_data) != NULL);
if (!alive)
{
G_UNLOCK (message_bus_lock);
return;
}
connection = G_DBUS_CONNECTION (user_data);
g_object_ref (connection);
G_UNLOCK (message_bus_lock);
//g_debug ("in on_worker_message_received"); //g_debug ("in on_worker_message_received");
g_object_ref (message); g_object_ref (message);
g_dbus_message_lock (message); g_dbus_message_lock (message);
g_object_ref (connection); //g_debug ("boo ref_count = %d %p %p", G_OBJECT (connection)->ref_count, connection, connection->worker);
/* First collect the set of callback functions */ /* First collect the set of callback functions */
CONNECTION_LOCK (connection); CONNECTION_LOCK (connection);
@ -2051,13 +2073,24 @@ on_worker_message_about_to_be_sent (GDBusWorker *worker,
GDBusMessage *message, GDBusMessage *message,
gpointer user_data) gpointer user_data)
{ {
GDBusConnection *connection = G_DBUS_CONNECTION (user_data); GDBusConnection *connection;
FilterCallback *filters; FilterCallback *filters;
guint num_filters; guint num_filters;
guint n; guint n;
gboolean alive;
G_LOCK (message_bus_lock);
alive = (g_hash_table_lookup (alive_connections, user_data) != NULL);
if (!alive)
{
G_UNLOCK (message_bus_lock);
return message;
}
connection = G_DBUS_CONNECTION (user_data);
g_object_ref (connection);
G_UNLOCK (message_bus_lock);
//g_debug ("in on_worker_message_about_to_be_sent"); //g_debug ("in on_worker_message_about_to_be_sent");
g_object_ref (connection);
/* First collect the set of callback functions */ /* First collect the set of callback functions */
CONNECTION_LOCK (connection); CONNECTION_LOCK (connection);
@ -2096,7 +2129,19 @@ on_worker_closed (GDBusWorker *worker,
GError *error, GError *error,
gpointer user_data) gpointer user_data)
{ {
GDBusConnection *connection = G_DBUS_CONNECTION (user_data); GDBusConnection *connection;
gboolean alive;
G_LOCK (message_bus_lock);
alive = (g_hash_table_lookup (alive_connections, user_data) != NULL);
if (!alive)
{
G_UNLOCK (message_bus_lock);
return;
}
connection = G_DBUS_CONNECTION (user_data);
g_object_ref (connection);
G_UNLOCK (message_bus_lock);
//g_debug ("in on_worker_closed: %s", error->message); //g_debug ("in on_worker_closed: %s", error->message);
@ -2104,6 +2149,8 @@ on_worker_closed (GDBusWorker *worker,
if (!connection->closed) if (!connection->closed)
set_closed_unlocked (connection, remote_peer_vanished, error); set_closed_unlocked (connection, remote_peer_vanished, error);
CONNECTION_UNLOCK (connection); CONNECTION_UNLOCK (connection);
g_object_unref (connection);
} }
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
@ -2244,6 +2291,12 @@ initable_init (GInitable *initable,
} }
#endif #endif
G_LOCK (message_bus_lock);
if (alive_connections == NULL)
alive_connections = g_hash_table_new (g_direct_hash, g_direct_equal);
g_hash_table_insert (alive_connections, connection, connection);
G_UNLOCK (message_bus_lock);
connection->worker = _g_dbus_worker_new (connection->stream, connection->worker = _g_dbus_worker_new (connection->stream,
connection->capabilities, connection->capabilities,
(connection->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING), (connection->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING),

View File

@ -1443,22 +1443,14 @@ _g_dbus_worker_new (GIOStream *stream,
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
/* This can be called from any thread - frees worker - guarantees no callbacks /* This can be called from any thread - frees worker. Note that
* will ever be issued again * callbacks might still happen if called from another thread than the
* worker - use your own synchronization primitive in the callbacks.
*/ */
void void
_g_dbus_worker_stop (GDBusWorker *worker) _g_dbus_worker_stop (GDBusWorker *worker)
{ {
/* If we're called in the worker thread it means we are called from
* a worker callback. This is fine, we just can't lock in that case since
* we're already holding the lock...
*/
if (g_thread_self () != worker->thread)
g_mutex_lock (worker->read_lock);
worker->stopped = TRUE; worker->stopped = TRUE;
if (g_thread_self () != worker->thread)
g_mutex_unlock (worker->read_lock);
g_cancellable_cancel (worker->cancellable); g_cancellable_cancel (worker->cancellable);
_g_dbus_worker_unref (worker); _g_dbus_worker_unref (worker);
} }

View File

@ -116,13 +116,13 @@ test_connection_life_cycle (void)
g_assert (!g_dbus_error_is_remote_error (error)); g_assert (!g_dbus_error_is_remote_error (error));
g_assert (c == NULL); g_assert (c == NULL);
g_error_free (error); g_error_free (error);
error = NULL;
/* /*
* Check for correct behavior when a bus is present * Check for correct behavior when a bus is present
*/ */
session_bus_up (); session_bus_up ();
/* case 1 */ /* case 1 */
error = NULL;
c = g_bus_get_sync (G_BUS_TYPE_SESSION, NULL, &error); c = g_bus_get_sync (G_BUS_TYPE_SESSION, NULL, &error);
g_assert_no_error (error); g_assert_no_error (error);
g_assert (c != NULL); g_assert (c != NULL);
@ -131,6 +131,7 @@ test_connection_life_cycle (void)
/* /*
* Check that singleton handling work * Check that singleton handling work
*/ */
error = NULL;
c2 = g_bus_get_sync (G_BUS_TYPE_SESSION, NULL, &error); c2 = g_bus_get_sync (G_BUS_TYPE_SESSION, NULL, &error);
g_assert_no_error (error); g_assert_no_error (error);
g_assert (c2 != NULL); g_assert (c2 != NULL);
@ -1070,12 +1071,13 @@ test_connection_basic (void)
g_assert (exit_on_close); g_assert (exit_on_close);
g_assert (flags == G_DBUS_CAPABILITY_FLAGS_NONE || g_assert (flags == G_DBUS_CAPABILITY_FLAGS_NONE ||
flags == G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING); flags == G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING);
g_object_unref (stream); g_object_unref (stream);
g_object_unref (connection);
g_free (name); g_free (name);
g_free (guid); g_free (guid);
_g_object_wait_for_single_ref (connection);
g_object_unref (connection);
session_bus_down (); session_bus_down ();
} }

View File

@ -158,6 +158,40 @@ _g_bus_get_priv (GBusType bus_type,
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
#if 1
/* toggle refs are not easy to use (maybe not even safe) when multiple
* threads are involved so implement this by busy-waiting for now
*/
gboolean
_g_object_wait_for_single_ref_do (gpointer object)
{
guint num_ms_elapsed;
gboolean timed_out;
timed_out = FALSE;
num_ms_elapsed = 0;
while (TRUE)
{
if (G_OBJECT (object)->ref_count == 1)
goto out;
if (num_ms_elapsed > 5000)
{
timed_out = TRUE;
goto out;
}
usleep (10 * 1000);
num_ms_elapsed += 10;
}
out:
return timed_out;
}
#else
typedef struct typedef struct
{ {
GMainLoop *loop; GMainLoop *loop;
@ -201,19 +235,31 @@ _g_object_wait_for_single_ref_do (gpointer object)
g_object_add_toggle_ref (G_OBJECT (object), g_object_add_toggle_ref (G_OBJECT (object),
on_wait_for_single_ref_toggled, on_wait_for_single_ref_toggled,
&data); &data);
/* the reference could have been removed between us checking the
* ref_count and the toggle ref being added
*/
if (G_OBJECT (object)->ref_count == 2)
goto single_ref_already;
g_object_unref (object); g_object_unref (object);
g_main_loop_run (data.loop); g_main_loop_run (data.loop);
g_object_ref (object); g_object_ref (object);
single_ref_already:
g_object_remove_toggle_ref (object, g_object_remove_toggle_ref (object,
on_wait_for_single_ref_toggled, on_wait_for_single_ref_toggled,
&data); &data);
g_source_remove (timeout_id); g_source_remove (timeout_id);
g_main_loop_unref (data.loop); g_main_loop_unref (data.loop);
out: out:
if (data.timed_out)
{
g_printerr ("b ref_count is %d\n", G_OBJECT (object)->ref_count);
}
return data.timed_out; return data.timed_out;
} }
#endif
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */