mirror of
https://gitlab.gnome.org/GNOME/glib.git
synced 2025-01-13 07:56:17 +01:00
Bug 621945 – Filter outgoing messages in GDBusConnection
This patch breaks some rarely-used public API (only known user is dconf). This patch is based on work from Peng Huang <shawn.p.huang@gmail.com>. See https://bugzilla.gnome.org/show_bug.cgi?id=621945 Signed-off-by: David Zeuthen <davidz@redhat.com>
This commit is contained in:
parent
a4cd39e741
commit
45411ccbe3
@ -1624,6 +1624,7 @@ on_worker_message_received (GDBusWorker *worker,
|
|||||||
{
|
{
|
||||||
consumed_by_filter = filters[n].func (connection,
|
consumed_by_filter = filters[n].func (connection,
|
||||||
message,
|
message,
|
||||||
|
TRUE,
|
||||||
filters[n].user_data);
|
filters[n].user_data);
|
||||||
if (consumed_by_filter)
|
if (consumed_by_filter)
|
||||||
break;
|
break;
|
||||||
@ -1673,6 +1674,52 @@ on_worker_message_received (GDBusWorker *worker,
|
|||||||
g_free (filters);
|
g_free (filters);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Called in worker's thread */
|
||||||
|
static gboolean
|
||||||
|
on_worker_message_about_to_be_sent (GDBusWorker *worker,
|
||||||
|
GDBusMessage *message,
|
||||||
|
gpointer user_data)
|
||||||
|
{
|
||||||
|
GDBusConnection *connection = G_DBUS_CONNECTION (user_data);
|
||||||
|
FilterCallback *filters;
|
||||||
|
gboolean consumed_by_filter;
|
||||||
|
guint num_filters;
|
||||||
|
guint n;
|
||||||
|
|
||||||
|
//g_debug ("in on_worker_message_about_to_be_sent");
|
||||||
|
|
||||||
|
g_object_ref (connection);
|
||||||
|
|
||||||
|
/* First collect the set of callback functions */
|
||||||
|
CONNECTION_LOCK (connection);
|
||||||
|
num_filters = connection->priv->filters->len;
|
||||||
|
filters = g_new0 (FilterCallback, num_filters);
|
||||||
|
for (n = 0; n < num_filters; n++)
|
||||||
|
{
|
||||||
|
FilterData *data = connection->priv->filters->pdata[n];
|
||||||
|
filters[n].func = data->filter_function;
|
||||||
|
filters[n].user_data = data->user_data;
|
||||||
|
}
|
||||||
|
CONNECTION_UNLOCK (connection);
|
||||||
|
|
||||||
|
/* the call the filters in order (without holding the lock) */
|
||||||
|
consumed_by_filter = FALSE;
|
||||||
|
for (n = 0; n < num_filters; n++)
|
||||||
|
{
|
||||||
|
consumed_by_filter = filters[n].func (connection,
|
||||||
|
message,
|
||||||
|
FALSE,
|
||||||
|
filters[n].user_data);
|
||||||
|
if (consumed_by_filter)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
g_object_unref (connection);
|
||||||
|
g_free (filters);
|
||||||
|
|
||||||
|
return consumed_by_filter;
|
||||||
|
}
|
||||||
|
|
||||||
/* Called in worker's thread - we must not block */
|
/* Called in worker's thread - we must not block */
|
||||||
static void
|
static void
|
||||||
on_worker_closed (GDBusWorker *worker,
|
on_worker_closed (GDBusWorker *worker,
|
||||||
@ -1831,6 +1878,7 @@ initable_init (GInitable *initable,
|
|||||||
connection->priv->worker = _g_dbus_worker_new (connection->priv->stream,
|
connection->priv->worker = _g_dbus_worker_new (connection->priv->stream,
|
||||||
connection->priv->capabilities,
|
connection->priv->capabilities,
|
||||||
on_worker_message_received,
|
on_worker_message_received,
|
||||||
|
on_worker_message_about_to_be_sent,
|
||||||
on_worker_closed,
|
on_worker_closed,
|
||||||
connection);
|
connection);
|
||||||
|
|
||||||
@ -2266,11 +2314,11 @@ static guint _global_filter_id = 1;
|
|||||||
* is removed or %NULL.
|
* is removed or %NULL.
|
||||||
*
|
*
|
||||||
* Adds a message filter. Filters are handlers that are run on all
|
* Adds a message filter. Filters are handlers that are run on all
|
||||||
* incoming messages, prior to standard dispatch. Filters are run in
|
* incoming and outgoing messages, prior to standard dispatch. Filters
|
||||||
* the order that they were added. The same handler can be added as a
|
* are run in the order that they were added. The same handler can be
|
||||||
* filter more than once, in which case it will be run more than once.
|
* added as a filter more than once, in which case it will be run more
|
||||||
* Filters added during a filter callback won't be run on the message
|
* than once. Filters added during a filter callback won't be run on
|
||||||
* being processed.
|
* the message being processed.
|
||||||
*
|
*
|
||||||
* Note that filters are run in a dedicated message handling thread so
|
* Note that filters are run in a dedicated message handling thread so
|
||||||
* they can't block and, generally, can't do anything but signal a
|
* they can't block and, generally, can't do anything but signal a
|
||||||
@ -2279,6 +2327,14 @@ static guint _global_filter_id = 1;
|
|||||||
* g_dbus_connection_signal_subscribe() or
|
* g_dbus_connection_signal_subscribe() or
|
||||||
* g_dbus_connection_call() instead.
|
* g_dbus_connection_call() instead.
|
||||||
*
|
*
|
||||||
|
* If a filter consumes an incoming message (by returning %TRUE), the
|
||||||
|
* message is not dispatched anywhere else - not even the standard
|
||||||
|
* dispatch machinery (that API such as
|
||||||
|
* g_dbus_connection_signal_subscribe() and
|
||||||
|
* g_dbus_connection_send_message_with_reply() relies on) will see the
|
||||||
|
* message. Similary, if a filter consumes an outgoing message, the
|
||||||
|
* message will not be sent to the other peer.
|
||||||
|
*
|
||||||
* Returns: A filter identifier that can be used with
|
* Returns: A filter identifier that can be used with
|
||||||
* g_dbus_connection_remove_filter().
|
* g_dbus_connection_remove_filter().
|
||||||
*
|
*
|
||||||
|
@ -469,6 +469,8 @@ void g_dbus_connection_signal_unsubscribe (GDBusConnection
|
|||||||
* GDBusMessageFilterFunction:
|
* GDBusMessageFilterFunction:
|
||||||
* @connection: A #GDBusConnection.
|
* @connection: A #GDBusConnection.
|
||||||
* @message: A #GDBusMessage.
|
* @message: A #GDBusMessage.
|
||||||
|
* @incoming: %TRUE if it is a message received from the other peer, %FALSE if it is
|
||||||
|
* a message to be sent to the other peer.
|
||||||
* @user_data: User data passed when adding the filter.
|
* @user_data: User data passed when adding the filter.
|
||||||
*
|
*
|
||||||
* Signature for function used in g_dbus_connection_add_filter().
|
* Signature for function used in g_dbus_connection_add_filter().
|
||||||
@ -480,6 +482,7 @@ void g_dbus_connection_signal_unsubscribe (GDBusConnection
|
|||||||
*/
|
*/
|
||||||
typedef gboolean (*GDBusMessageFilterFunction) (GDBusConnection *connection,
|
typedef gboolean (*GDBusMessageFilterFunction) (GDBusConnection *connection,
|
||||||
GDBusMessage *message,
|
GDBusMessage *message,
|
||||||
|
gboolean incoming,
|
||||||
gpointer user_data);
|
gpointer user_data);
|
||||||
|
|
||||||
guint g_dbus_connection_add_filter (GDBusConnection *connection,
|
guint g_dbus_connection_add_filter (GDBusConnection *connection,
|
||||||
|
@ -355,6 +355,7 @@ struct GDBusWorker
|
|||||||
GDBusCapabilityFlags capabilities;
|
GDBusCapabilityFlags capabilities;
|
||||||
GCancellable *cancellable;
|
GCancellable *cancellable;
|
||||||
GDBusWorkerMessageReceivedCallback message_received_callback;
|
GDBusWorkerMessageReceivedCallback message_received_callback;
|
||||||
|
GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
|
||||||
GDBusWorkerDisconnectedCallback disconnected_callback;
|
GDBusWorkerDisconnectedCallback disconnected_callback;
|
||||||
gpointer user_data;
|
gpointer user_data;
|
||||||
|
|
||||||
@ -424,13 +425,24 @@ _g_dbus_worker_emit_disconnected (GDBusWorker *worker,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
_g_dbus_worker_emit_message (GDBusWorker *worker,
|
_g_dbus_worker_emit_message_received (GDBusWorker *worker,
|
||||||
GDBusMessage *message)
|
GDBusMessage *message)
|
||||||
{
|
{
|
||||||
if (!worker->stopped)
|
if (!worker->stopped)
|
||||||
worker->message_received_callback (worker, message, worker->user_data);
|
worker->message_received_callback (worker, message, worker->user_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
_g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker,
|
||||||
|
GDBusMessage *message)
|
||||||
|
{
|
||||||
|
gboolean ret;
|
||||||
|
ret = FALSE;
|
||||||
|
if (!worker->stopped)
|
||||||
|
ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
|
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
|
||||||
|
|
||||||
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
|
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
|
||||||
@ -627,7 +639,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* yay, got a message, go deliver it */
|
/* yay, got a message, go deliver it */
|
||||||
_g_dbus_worker_emit_message (worker, message);
|
_g_dbus_worker_emit_message_received (worker, message);
|
||||||
g_object_unref (message);
|
g_object_unref (message);
|
||||||
|
|
||||||
/* start reading another message! */
|
/* start reading another message! */
|
||||||
@ -720,7 +732,7 @@ message_to_write_data_free (MessageToWriteData *data)
|
|||||||
|
|
||||||
/* ---------------------------------------------------------------------------------------------------- */
|
/* ---------------------------------------------------------------------------------------------------- */
|
||||||
|
|
||||||
/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
|
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
|
||||||
static gboolean
|
static gboolean
|
||||||
write_message (GDBusWorker *worker,
|
write_message (GDBusWorker *worker,
|
||||||
MessageToWriteData *data,
|
MessageToWriteData *data,
|
||||||
@ -848,29 +860,39 @@ write_message_in_idle_cb (gpointer user_data)
|
|||||||
GDBusWorker *worker = user_data;
|
GDBusWorker *worker = user_data;
|
||||||
gboolean more_writes_are_pending;
|
gboolean more_writes_are_pending;
|
||||||
MessageToWriteData *data;
|
MessageToWriteData *data;
|
||||||
|
gboolean message_was_dropped;
|
||||||
GError *error;
|
GError *error;
|
||||||
|
|
||||||
g_mutex_lock (worker->write_lock);
|
g_mutex_lock (worker->write_lock);
|
||||||
|
|
||||||
data = g_queue_pop_head (worker->write_queue);
|
data = g_queue_pop_head (worker->write_queue);
|
||||||
g_assert (data != NULL);
|
g_assert (data != NULL);
|
||||||
|
|
||||||
error = NULL;
|
|
||||||
if (!write_message (worker,
|
|
||||||
data,
|
|
||||||
&error))
|
|
||||||
{
|
|
||||||
/* TODO: handle */
|
|
||||||
_g_dbus_worker_emit_disconnected (worker, TRUE, error);
|
|
||||||
g_error_free (error);
|
|
||||||
}
|
|
||||||
message_to_write_data_free (data);
|
|
||||||
|
|
||||||
more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
|
more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
|
||||||
|
|
||||||
worker->write_is_pending = more_writes_are_pending;
|
worker->write_is_pending = more_writes_are_pending;
|
||||||
g_mutex_unlock (worker->write_lock);
|
g_mutex_unlock (worker->write_lock);
|
||||||
|
|
||||||
|
/* Note that write_lock is only used for protecting the @write_queue
|
||||||
|
* and @write_is_pending fields of the GDBusWorker struct ... which we
|
||||||
|
* need to modify from arbitrary threads in _g_dbus_worker_send_message().
|
||||||
|
*
|
||||||
|
* Therefore, it's fine to drop it here when calling back into user
|
||||||
|
* code and then writing the message out onto the GIOStream since this
|
||||||
|
* function only runs on the worker thread.
|
||||||
|
*/
|
||||||
|
message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
|
||||||
|
if (G_LIKELY (!message_was_dropped))
|
||||||
|
{
|
||||||
|
error = NULL;
|
||||||
|
if (!write_message (worker,
|
||||||
|
data,
|
||||||
|
&error))
|
||||||
|
{
|
||||||
|
/* TODO: handle */
|
||||||
|
_g_dbus_worker_emit_disconnected (worker, TRUE, error);
|
||||||
|
g_error_free (error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
message_to_write_data_free (data);
|
||||||
|
|
||||||
return more_writes_are_pending;
|
return more_writes_are_pending;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -928,16 +950,18 @@ _g_dbus_worker_thread_begin_func (gpointer user_data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
GDBusWorker *
|
GDBusWorker *
|
||||||
_g_dbus_worker_new (GIOStream *stream,
|
_g_dbus_worker_new (GIOStream *stream,
|
||||||
GDBusCapabilityFlags capabilities,
|
GDBusCapabilityFlags capabilities,
|
||||||
GDBusWorkerMessageReceivedCallback message_received_callback,
|
GDBusWorkerMessageReceivedCallback message_received_callback,
|
||||||
GDBusWorkerDisconnectedCallback disconnected_callback,
|
GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
|
||||||
gpointer user_data)
|
GDBusWorkerDisconnectedCallback disconnected_callback,
|
||||||
|
gpointer user_data)
|
||||||
{
|
{
|
||||||
GDBusWorker *worker;
|
GDBusWorker *worker;
|
||||||
|
|
||||||
g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
|
g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
|
||||||
g_return_val_if_fail (message_received_callback != NULL, NULL);
|
g_return_val_if_fail (message_received_callback != NULL, NULL);
|
||||||
|
g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
|
||||||
g_return_val_if_fail (disconnected_callback != NULL, NULL);
|
g_return_val_if_fail (disconnected_callback != NULL, NULL);
|
||||||
|
|
||||||
worker = g_new0 (GDBusWorker, 1);
|
worker = g_new0 (GDBusWorker, 1);
|
||||||
@ -945,6 +969,7 @@ _g_dbus_worker_new (GIOStream *stream,
|
|||||||
|
|
||||||
worker->read_lock = g_mutex_new ();
|
worker->read_lock = g_mutex_new ();
|
||||||
worker->message_received_callback = message_received_callback;
|
worker->message_received_callback = message_received_callback;
|
||||||
|
worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
|
||||||
worker->disconnected_callback = disconnected_callback;
|
worker->disconnected_callback = disconnected_callback;
|
||||||
worker->user_data = user_data;
|
worker->user_data = user_data;
|
||||||
worker->stream = g_object_ref (stream);
|
worker->stream = g_object_ref (stream);
|
||||||
|
@ -39,6 +39,10 @@ typedef void (*GDBusWorkerMessageReceivedCallback) (GDBusWorker *worker,
|
|||||||
GDBusMessage *message,
|
GDBusMessage *message,
|
||||||
gpointer user_data);
|
gpointer user_data);
|
||||||
|
|
||||||
|
typedef gboolean (*GDBusWorkerMessageAboutToBeSentCallback) (GDBusWorker *worker,
|
||||||
|
GDBusMessage *message,
|
||||||
|
gpointer user_data);
|
||||||
|
|
||||||
typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker,
|
typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker,
|
||||||
gboolean remote_peer_vanished,
|
gboolean remote_peer_vanished,
|
||||||
GError *error,
|
GError *error,
|
||||||
@ -50,6 +54,7 @@ typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker,
|
|||||||
GDBusWorker *_g_dbus_worker_new (GIOStream *stream,
|
GDBusWorker *_g_dbus_worker_new (GIOStream *stream,
|
||||||
GDBusCapabilityFlags capabilities,
|
GDBusCapabilityFlags capabilities,
|
||||||
GDBusWorkerMessageReceivedCallback message_received_callback,
|
GDBusWorkerMessageReceivedCallback message_received_callback,
|
||||||
|
GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
|
||||||
GDBusWorkerDisconnectedCallback disconnected_callback,
|
GDBusWorkerDisconnectedCallback disconnected_callback,
|
||||||
gpointer user_data);
|
gpointer user_data);
|
||||||
|
|
||||||
|
@ -555,20 +555,29 @@ test_connection_signals (void)
|
|||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
guint num_handled;
|
guint num_handled;
|
||||||
|
guint num_outgoing;
|
||||||
guint32 serial;
|
guint32 serial;
|
||||||
} FilterData;
|
} FilterData;
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
filter_func (GDBusConnection *connection,
|
filter_func (GDBusConnection *connection,
|
||||||
GDBusMessage *message,
|
GDBusMessage *message,
|
||||||
|
gboolean incoming,
|
||||||
gpointer user_data)
|
gpointer user_data)
|
||||||
{
|
{
|
||||||
FilterData *data = user_data;
|
FilterData *data = user_data;
|
||||||
guint32 reply_serial;
|
guint32 reply_serial;
|
||||||
|
|
||||||
reply_serial = g_dbus_message_get_reply_serial (message);
|
if (incoming)
|
||||||
if (reply_serial == data->serial)
|
{
|
||||||
data->num_handled += 1;
|
reply_serial = g_dbus_message_get_reply_serial (message);
|
||||||
|
if (reply_serial == data->serial)
|
||||||
|
data->num_handled += 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
data->num_outgoing += 1;
|
||||||
|
}
|
||||||
|
|
||||||
return FALSE;
|
return FALSE;
|
||||||
}
|
}
|
||||||
@ -638,6 +647,7 @@ test_connection_filter (void)
|
|||||||
g_assert (r != NULL);
|
g_assert (r != NULL);
|
||||||
g_object_unref (r);
|
g_object_unref (r);
|
||||||
g_assert_cmpint (data.num_handled, ==, 3);
|
g_assert_cmpint (data.num_handled, ==, 3);
|
||||||
|
g_assert_cmpint (data.num_outgoing, ==, 3);
|
||||||
|
|
||||||
_g_object_wait_for_single_ref (c);
|
_g_object_wait_for_single_ref (c);
|
||||||
g_object_unref (c);
|
g_object_unref (c);
|
||||||
|
Loading…
Reference in New Issue
Block a user