From 45411ccbe3c9d1b08332942d1e7b594330688126 Mon Sep 17 00:00:00 2001 From: David Zeuthen Date: Mon, 21 Jun 2010 16:08:53 -0400 Subject: [PATCH] =?UTF-8?q?Bug=20621945=20=E2=80=93=20Filter=20outgoing=20?= =?UTF-8?q?messages=20in=20GDBusConnection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch breaks some rarely-used public API (only known user is dconf). This patch is based on work from Peng Huang . See https://bugzilla.gnome.org/show_bug.cgi?id=621945 Signed-off-by: David Zeuthen --- gio/gdbusconnection.c | 66 ++++++++++++++++++++++++++++++--- gio/gdbusconnection.h | 3 ++ gio/gdbusprivate.c | 71 ++++++++++++++++++++++++------------ gio/gdbusprivate.h | 5 +++ gio/tests/gdbus-connection.c | 16 ++++++-- 5 files changed, 130 insertions(+), 31 deletions(-) diff --git a/gio/gdbusconnection.c b/gio/gdbusconnection.c index a41ee856d..d2ab2d4bd 100644 --- a/gio/gdbusconnection.c +++ b/gio/gdbusconnection.c @@ -1624,6 +1624,7 @@ on_worker_message_received (GDBusWorker *worker, { consumed_by_filter = filters[n].func (connection, message, + TRUE, filters[n].user_data); if (consumed_by_filter) break; @@ -1673,6 +1674,52 @@ on_worker_message_received (GDBusWorker *worker, g_free (filters); } +/* Called in worker's thread */ +static gboolean +on_worker_message_about_to_be_sent (GDBusWorker *worker, + GDBusMessage *message, + gpointer user_data) +{ + GDBusConnection *connection = G_DBUS_CONNECTION (user_data); + FilterCallback *filters; + gboolean consumed_by_filter; + guint num_filters; + guint n; + + //g_debug ("in on_worker_message_about_to_be_sent"); + + g_object_ref (connection); + + /* First collect the set of callback functions */ + CONNECTION_LOCK (connection); + num_filters = connection->priv->filters->len; + filters = g_new0 (FilterCallback, num_filters); + for (n = 0; n < num_filters; n++) + { + FilterData *data = connection->priv->filters->pdata[n]; + filters[n].func = data->filter_function; + filters[n].user_data = data->user_data; + } + CONNECTION_UNLOCK (connection); + + /* the call the filters in order (without holding the lock) */ + consumed_by_filter = FALSE; + for (n = 0; n < num_filters; n++) + { + consumed_by_filter = filters[n].func (connection, + message, + FALSE, + filters[n].user_data); + if (consumed_by_filter) + break; + } + + g_object_unref (connection); + g_free (filters); + + return consumed_by_filter; +} + /* Called in worker's thread - we must not block */ static void on_worker_closed (GDBusWorker *worker, @@ -1831,6 +1878,7 @@ initable_init (GInitable *initable, connection->priv->worker = _g_dbus_worker_new (connection->priv->stream, connection->priv->capabilities, on_worker_message_received, + on_worker_message_about_to_be_sent, on_worker_closed, connection); @@ -2266,11 +2314,11 @@ static guint _global_filter_id = 1; * is removed or %NULL. * * Adds a message filter. Filters are handlers that are run on all - * incoming messages, prior to standard dispatch. Filters are run in - * the order that they were added. The same handler can be added as a - * filter more than once, in which case it will be run more than once. - * Filters added during a filter callback won't be run on the message - * being processed. + * incoming and outgoing messages, prior to standard dispatch. Filters + * are run in the order that they were added. The same handler can be + * added as a filter more than once, in which case it will be run more + * than once. Filters added during a filter callback won't be run on + * the message being processed. * * Note that filters are run in a dedicated message handling thread so * they can't block and, generally, can't do anything but signal a @@ -2279,6 +2327,14 @@ static guint _global_filter_id = 1; * g_dbus_connection_signal_subscribe() or * g_dbus_connection_call() instead. * + * If a filter consumes an incoming message (by returning %TRUE), the + * message is not dispatched anywhere else - not even the standard + * dispatch machinery (that API such as + * g_dbus_connection_signal_subscribe() and + * g_dbus_connection_send_message_with_reply() relies on) will see the + * message. Similary, if a filter consumes an outgoing message, the + * message will not be sent to the other peer. + * * Returns: A filter identifier that can be used with * g_dbus_connection_remove_filter(). * diff --git a/gio/gdbusconnection.h b/gio/gdbusconnection.h index b203196a2..7390dc561 100644 --- a/gio/gdbusconnection.h +++ b/gio/gdbusconnection.h @@ -469,6 +469,8 @@ void g_dbus_connection_signal_unsubscribe (GDBusConnection * GDBusMessageFilterFunction: * @connection: A #GDBusConnection. * @message: A #GDBusMessage. + * @incoming: %TRUE if it is a message received from the other peer, %FALSE if it is + * a message to be sent to the other peer. * @user_data: User data passed when adding the filter. * * Signature for function used in g_dbus_connection_add_filter(). @@ -480,6 +482,7 @@ void g_dbus_connection_signal_unsubscribe (GDBusConnection */ typedef gboolean (*GDBusMessageFilterFunction) (GDBusConnection *connection, GDBusMessage *message, + gboolean incoming, gpointer user_data); guint g_dbus_connection_add_filter (GDBusConnection *connection, diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index 76809eedc..3a48d1d62 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -355,6 +355,7 @@ struct GDBusWorker GDBusCapabilityFlags capabilities; GCancellable *cancellable; GDBusWorkerMessageReceivedCallback message_received_callback; + GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback; GDBusWorkerDisconnectedCallback disconnected_callback; gpointer user_data; @@ -424,13 +425,24 @@ _g_dbus_worker_emit_disconnected (GDBusWorker *worker, } static void -_g_dbus_worker_emit_message (GDBusWorker *worker, - GDBusMessage *message) +_g_dbus_worker_emit_message_received (GDBusWorker *worker, + GDBusMessage *message) { if (!worker->stopped) worker->message_received_callback (worker, message, worker->user_data); } +static gboolean +_g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker, + GDBusMessage *message) +{ + gboolean ret; + ret = FALSE; + if (!worker->stopped) + ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data); + return ret; +} + static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker); /* called in private thread shared by all GDBusConnection instances (without read-lock held) */ @@ -627,7 +639,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, } /* yay, got a message, go deliver it */ - _g_dbus_worker_emit_message (worker, message); + _g_dbus_worker_emit_message_received (worker, message); g_object_unref (message); /* start reading another message! */ @@ -720,7 +732,7 @@ message_to_write_data_free (MessageToWriteData *data) /* ---------------------------------------------------------------------------------------------------- */ -/* called in private thread shared by all GDBusConnection instances (with write-lock held) */ +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ static gboolean write_message (GDBusWorker *worker, MessageToWriteData *data, @@ -848,29 +860,39 @@ write_message_in_idle_cb (gpointer user_data) GDBusWorker *worker = user_data; gboolean more_writes_are_pending; MessageToWriteData *data; + gboolean message_was_dropped; GError *error; g_mutex_lock (worker->write_lock); - data = g_queue_pop_head (worker->write_queue); g_assert (data != NULL); - - error = NULL; - if (!write_message (worker, - data, - &error)) - { - /* TODO: handle */ - _g_dbus_worker_emit_disconnected (worker, TRUE, error); - g_error_free (error); - } - message_to_write_data_free (data); - more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0); - worker->write_is_pending = more_writes_are_pending; g_mutex_unlock (worker->write_lock); + /* Note that write_lock is only used for protecting the @write_queue + * and @write_is_pending fields of the GDBusWorker struct ... which we + * need to modify from arbitrary threads in _g_dbus_worker_send_message(). + * + * Therefore, it's fine to drop it here when calling back into user + * code and then writing the message out onto the GIOStream since this + * function only runs on the worker thread. + */ + message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message); + if (G_LIKELY (!message_was_dropped)) + { + error = NULL; + if (!write_message (worker, + data, + &error)) + { + /* TODO: handle */ + _g_dbus_worker_emit_disconnected (worker, TRUE, error); + g_error_free (error); + } + } + message_to_write_data_free (data); + return more_writes_are_pending; } @@ -928,16 +950,18 @@ _g_dbus_worker_thread_begin_func (gpointer user_data) } GDBusWorker * -_g_dbus_worker_new (GIOStream *stream, - GDBusCapabilityFlags capabilities, - GDBusWorkerMessageReceivedCallback message_received_callback, - GDBusWorkerDisconnectedCallback disconnected_callback, - gpointer user_data) +_g_dbus_worker_new (GIOStream *stream, + GDBusCapabilityFlags capabilities, + GDBusWorkerMessageReceivedCallback message_received_callback, + GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback, + GDBusWorkerDisconnectedCallback disconnected_callback, + gpointer user_data) { GDBusWorker *worker; g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL); g_return_val_if_fail (message_received_callback != NULL, NULL); + g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL); g_return_val_if_fail (disconnected_callback != NULL, NULL); worker = g_new0 (GDBusWorker, 1); @@ -945,6 +969,7 @@ _g_dbus_worker_new (GIOStream *stream, worker->read_lock = g_mutex_new (); worker->message_received_callback = message_received_callback; + worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback; worker->disconnected_callback = disconnected_callback; worker->user_data = user_data; worker->stream = g_object_ref (stream); diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h index 337c2786b..0d9cb610e 100644 --- a/gio/gdbusprivate.h +++ b/gio/gdbusprivate.h @@ -39,6 +39,10 @@ typedef void (*GDBusWorkerMessageReceivedCallback) (GDBusWorker *worker, GDBusMessage *message, gpointer user_data); +typedef gboolean (*GDBusWorkerMessageAboutToBeSentCallback) (GDBusWorker *worker, + GDBusMessage *message, + gpointer user_data); + typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker, gboolean remote_peer_vanished, GError *error, @@ -50,6 +54,7 @@ typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker, GDBusWorker *_g_dbus_worker_new (GIOStream *stream, GDBusCapabilityFlags capabilities, GDBusWorkerMessageReceivedCallback message_received_callback, + GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback, GDBusWorkerDisconnectedCallback disconnected_callback, gpointer user_data); diff --git a/gio/tests/gdbus-connection.c b/gio/tests/gdbus-connection.c index 15fca70e4..01113ee72 100644 --- a/gio/tests/gdbus-connection.c +++ b/gio/tests/gdbus-connection.c @@ -555,20 +555,29 @@ test_connection_signals (void) typedef struct { guint num_handled; + guint num_outgoing; guint32 serial; } FilterData; static gboolean filter_func (GDBusConnection *connection, GDBusMessage *message, + gboolean incoming, gpointer user_data) { FilterData *data = user_data; guint32 reply_serial; - reply_serial = g_dbus_message_get_reply_serial (message); - if (reply_serial == data->serial) - data->num_handled += 1; + if (incoming) + { + reply_serial = g_dbus_message_get_reply_serial (message); + if (reply_serial == data->serial) + data->num_handled += 1; + } + else + { + data->num_outgoing += 1; + } return FALSE; } @@ -638,6 +647,7 @@ test_connection_filter (void) g_assert (r != NULL); g_object_unref (r); g_assert_cmpint (data.num_handled, ==, 3); + g_assert_cmpint (data.num_outgoing, ==, 3); _g_object_wait_for_single_ref (c); g_object_unref (c);