Bug 623142 – Ensure ::new-connection runs before processing D-Bus messages

Without this guarantee, peer-to-peer connections are not very
useful. However, with this guarantee it's possible to export objects
in a handler for the GDBusServer::new-connection signal.

There are two caveats with this patch

 - it won't work on message bus connections
 - we don't queue up messages to be written

that can be addresses later if needed.

https://bugzilla.gnome.org/show_bug.cgi?id=623142

Signed-off-by: David Zeuthen <davidz@redhat.com>
This commit is contained in:
David Zeuthen 2010-06-30 11:43:42 -04:00
parent 137ae2413c
commit 038d03cd08
9 changed files with 316 additions and 8 deletions

View File

@ -2390,6 +2390,7 @@ g_dbus_connection_new_sync
g_dbus_connection_new_for_address
g_dbus_connection_new_for_address_finish
g_dbus_connection_new_for_address_sync
g_dbus_connection_start_message_processing
GDBusCapabilityFlags
g_dbus_connection_close
g_dbus_connection_is_closed

View File

@ -830,6 +830,23 @@ g_dbus_connection_get_stream (GDBusConnection *connection)
return connection->priv->stream;
}
/**
* g_dbus_connection_start_message_processing:
* @connection: A #GDBusConnection.
*
* If @connection was created with
* %G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING, this method
* starts processing messages. Does nothing on if @connection wasn't
* created with this flag or if the method has already been called.
*
* Since: 2.26
*/
void
g_dbus_connection_start_message_processing (GDBusConnection *connection)
{
g_return_if_fail (G_IS_DBUS_CONNECTION (connection));
_g_dbus_worker_unfreeze (connection->priv->worker);
}
/**
* g_dbus_connection_is_closed:
@ -1877,16 +1894,27 @@ initable_init (GInitable *initable,
connection->priv->worker = _g_dbus_worker_new (connection->priv->stream,
connection->priv->capabilities,
(connection->priv->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING),
on_worker_message_received,
on_worker_message_about_to_be_sent,
on_worker_closed,
connection);
/* if a bus connection, invoke org.freedesktop.DBus.Hello - this is how we're getting a name */
/* if a bus connection, call org.freedesktop.DBus.Hello - this is how we're getting a name */
if (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION)
{
GVariant *hello_result;
/* we could lift this restriction by adding code in gdbusprivate.c */
if (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING)
{
g_set_error_literal (&connection->priv->initialization_error,
G_IO_ERROR,
G_IO_ERROR_FAILED,
"Cannot use DELAY_MESSAGE_PROCESSING with MESSAGE_BUS_CONNECTION");
goto out;
}
hello_result = g_dbus_connection_call_sync (connection,
"org.freedesktop.DBus", /* name */
"/org/freedesktop/DBus", /* path */

View File

@ -128,6 +128,7 @@ GDBusConnection *g_dbus_connection_new_for_address_sync (const gchar
/* ---------------------------------------------------------------------------------------------------- */
void g_dbus_connection_start_message_processing (GDBusConnection *connection);
gboolean g_dbus_connection_is_closed (GDBusConnection *connection);
void g_dbus_connection_close (GDBusConnection *connection);
GIOStream *g_dbus_connection_get_stream (GDBusConnection *connection);

View File

@ -350,7 +350,16 @@ _g_dbus_shared_thread_unref (void)
struct GDBusWorker
{
volatile gint ref_count;
gboolean stopped;
/* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
* only affects messages received from the other peer (since GDBusServer is the
* only user) - we might want it to affect messages sent to the other peer too?
*/
gboolean frozen;
GQueue *received_messages_while_frozen;
GIOStream *stream;
GDBusCapabilityFlags capabilities;
GCancellable *cancellable;
@ -406,11 +415,13 @@ _g_dbus_worker_unref (GDBusWorker *worker)
if (worker->read_fd_list != NULL)
g_object_unref (worker->read_fd_list);
g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
g_queue_free (worker->received_messages_while_frozen);
g_mutex_free (worker->write_lock);
g_queue_foreach (worker->write_queue,
(GFunc) message_to_write_data_free,
NULL);
g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
g_queue_free (worker->write_queue);
g_free (worker);
}
}
@ -443,6 +454,66 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker,
return ret;
}
/* can only be called from private thread with read-lock held - takes ownership of @message */
static void
_g_dbus_worker_queue_or_deliver_received_message (GDBusWorker *worker,
GDBusMessage *message)
{
if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
{
/* queue up */
g_queue_push_tail (worker->received_messages_while_frozen, message);
}
else
{
/* not frozen, nor anything in queue */
_g_dbus_worker_emit_message_received (worker, message);
g_object_unref (message);
}
}
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
static gboolean
unfreeze_in_idle_cb (gpointer user_data)
{
GDBusWorker *worker = user_data;
GDBusMessage *message;
g_mutex_lock (worker->read_lock);
if (worker->frozen)
{
while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
{
_g_dbus_worker_emit_message_received (worker, message);
g_object_unref (message);
}
worker->frozen = FALSE;
}
else
{
g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
}
g_mutex_unlock (worker->read_lock);
return FALSE;
}
/* can be called from any thread */
void
_g_dbus_worker_unfreeze (GDBusWorker *worker)
{
GSource *idle_source;
idle_source = g_idle_source_new ();
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source,
unfreeze_in_idle_cb,
_g_dbus_worker_ref (worker),
(GDestroyNotify) _g_dbus_worker_unref);
g_source_attach (idle_source, shared_thread_data->context);
g_source_unref (idle_source);
}
/* ---------------------------------------------------------------------------------------------------- */
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
@ -639,8 +710,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
}
/* yay, got a message, go deliver it */
_g_dbus_worker_emit_message_received (worker, message);
g_object_unref (message);
_g_dbus_worker_queue_or_deliver_received_message (worker, message);
/* start reading another message! */
worker->read_buffer_bytes_wanted = 0;
@ -952,6 +1022,7 @@ _g_dbus_worker_thread_begin_func (gpointer user_data)
GDBusWorker *
_g_dbus_worker_new (GIOStream *stream,
GDBusCapabilityFlags capabilities,
gboolean initially_frozen,
GDBusWorkerMessageReceivedCallback message_received_callback,
GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
GDBusWorkerDisconnectedCallback disconnected_callback,
@ -976,6 +1047,9 @@ _g_dbus_worker_new (GIOStream *stream,
worker->capabilities = capabilities;
worker->cancellable = g_cancellable_new ();
worker->frozen = initially_frozen;
worker->received_messages_while_frozen = g_queue_new ();
worker->write_lock = g_mutex_new ();
worker->write_queue = g_queue_new ();

View File

@ -53,6 +53,7 @@ typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker,
*/
GDBusWorker *_g_dbus_worker_new (GIOStream *stream,
GDBusCapabilityFlags capabilities,
gboolean initially_frozen,
GDBusWorkerMessageReceivedCallback message_received_callback,
GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
GDBusWorkerDisconnectedCallback disconnected_callback,
@ -67,6 +68,9 @@ void _g_dbus_worker_send_message (GDBusWorker *worker,
/* can be called from any thread */
void _g_dbus_worker_stop (GDBusWorker *worker);
/* can be called from any thread */
void _g_dbus_worker_unfreeze (GDBusWorker *worker);
/* ---------------------------------------------------------------------------------------------------- */
void _g_dbus_initialize (void);

View File

@ -367,6 +367,11 @@ g_dbus_server_class_init (GDBusServerClass *klass)
* linkend="g-main-context-push-thread-default">thread-default main
* loop</link> of the thread that @server was constructed in.
*
* You are guaranteed that signal handlers for this signal runs
* before incoming messages on @connection are processed. This means
* that it's suitable to call g_dbus_connection_register_object() or
* similar from the signal handler.
*
* Since: 2.26
*/
_signals[NEW_CONNECTION_SIGNAL] = g_signal_new ("new-connection",
@ -889,6 +894,7 @@ emit_new_connection_in_idle (gpointer user_data)
_signals[NEW_CONNECTION_SIGNAL],
0,
data->connection);
g_dbus_connection_start_message_processing (data->connection);
g_object_unref (data->connection);
return FALSE;
@ -925,7 +931,9 @@ on_run (GSocketService *service,
goto out;
}
connection_flags = G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER;
connection_flags =
G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER |
G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING;
if (server->priv->flags & G_DBUS_SERVER_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS)
connection_flags |= G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS;
@ -944,6 +952,7 @@ on_run (GSocketService *service,
_signals[NEW_CONNECTION_SIGNAL],
0,
connection);
g_dbus_connection_start_message_processing (connection);
g_object_unref (connection);
}
else

View File

@ -1529,6 +1529,7 @@ g_dbus_connection_new_for_address
g_dbus_connection_new_for_address_finish
g_dbus_connection_new_for_address_sync
g_dbus_connection_new_sync
g_dbus_connection_start_message_processing
g_dbus_connection_get_capabilities
g_dbus_connection_get_exit_on_close
g_dbus_connection_get_guid

View File

@ -969,6 +969,8 @@ typedef enum
* method.
* @G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION: Pass this flag if connecting to a peer that is a
* message bus. This means that the Hello() method will be invoked as part of the connection setup.
* @G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING: If set, processing of D-Bus messages is
* delayed until g_dbus_connection_start_message_processing() is called.
*
* Flags used when creating a new #GDBusConnection.
*
@ -979,7 +981,8 @@ typedef enum {
G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT = (1<<0),
G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER = (1<<1),
G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS = (1<<2),
G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION = (1<<3)
G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION = (1<<3),
G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING = (1<<4)
} GDBusConnectionFlags;
/**

View File

@ -732,6 +732,192 @@ test_peer (void)
/* ---------------------------------------------------------------------------------------------------- */
typedef struct
{
GDBusServer *server;
GMainContext *context;
GMainLoop *loop;
GList *connections;
} DmpData;
static void
dmp_data_free (DmpData *data)
{
g_main_loop_unref (data->loop);
g_main_context_unref (data->context);
g_object_unref (data->server);
g_list_foreach (data->connections, (GFunc) g_object_unref, NULL);
g_list_free (data->connections);
g_free (data);
}
static void
dmp_on_method_call (GDBusConnection *connection,
const gchar *sender,
const gchar *object_path,
const gchar *interface_name,
const gchar *method_name,
GVariant *parameters,
GDBusMethodInvocation *invocation,
gpointer user_data)
{
//DmpData *data = user_data;
gint32 first;
gint32 second;
g_variant_get (parameters,
"(ii)",
&first,
&second);
g_dbus_method_invocation_return_value (invocation,
g_variant_new ("(i)", first + second));
}
static const GDBusInterfaceVTable dmp_interface_vtable =
{
dmp_on_method_call,
NULL, /* get_property */
NULL /* set_property */
};
/* Runs in thread we created GDBusServer in (since we didn't pass G_DBUS_SERVER_FLAGS_RUN_IN_THREAD) */
static void
dmp_on_new_connection (GDBusServer *server,
GDBusConnection *connection,
gpointer user_data)
{
DmpData *data = user_data;
GDBusNodeInfo *node;
GError *error;
/* accept the connection */
data->connections = g_list_prepend (data->connections, g_object_ref (connection));
error = NULL;
node = g_dbus_node_info_new_for_xml ("<node>"
" <interface name='org.gtk.GDBus.DmpInterface'>"
" <method name='AddPair'>"
" <arg type='i' name='first' direction='in'/>"
" <arg type='i' name='second' direction='in'/>"
" <arg type='i' name='sum' direction='out'/>"
" </method>"
" </interface>"
"</node>",
&error);
g_assert_no_error (error);
/* sleep 100ms before exporting an object - this is to test that
* G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING really works
* (GDBusServer uses this feature).
*/
usleep (100 * 1000);
/* export an object */
error = NULL;
g_dbus_connection_register_object (connection,
"/dmp/test",
node->interfaces[0],
&dmp_interface_vtable,
data,
NULL,
&error);
//g_dbus_node_info_unref (node);
}
static gpointer
dmp_thread_func (gpointer user_data)
{
DmpData *data = user_data;
GError *error;
gchar *guid;
data->context = g_main_context_new ();
g_main_context_push_thread_default (data->context);
error = NULL;
guid = g_dbus_generate_guid ();
data->server = g_dbus_server_new_sync ("nonce-tcp:",
G_DBUS_SERVER_FLAGS_NONE,
guid,
NULL, /* GDBusAuthObserver */
NULL, /* GCancellable */
&error);
g_assert_no_error (error);
g_signal_connect (data->server,
"new-connection",
G_CALLBACK (dmp_on_new_connection),
data);
g_dbus_server_start (data->server);
data->loop = g_main_loop_new (data->context, FALSE);
g_main_loop_run (data->loop);
g_main_context_pop_thread_default (data->context);
g_free (guid);
return NULL;
}
static void
delayed_message_processing (void)
{
GError *error;
DmpData *data;
GThread *service_thread;
guint n;
data = g_new0 (DmpData, 1);
error = NULL;
service_thread = g_thread_create (dmp_thread_func,
data,
TRUE,
&error);
while (data->server == NULL || !g_dbus_server_is_active (data->server))
g_thread_yield ();
for (n = 0; n < 5; n++)
{
GDBusConnection *c;
GVariant *res;
gint32 val;
error = NULL;
c = g_dbus_connection_new_for_address_sync (g_dbus_server_get_client_address (data->server),
G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT,
NULL, /* GDBusAuthObserver */
NULL, /* GCancellable */
&error);
g_assert_no_error (error);
error = NULL;
res = g_dbus_connection_call_sync (c,
NULL, /* bus name */
"/dmp/test",
"org.gtk.GDBus.DmpInterface",
"AddPair",
g_variant_new ("(ii)", 2, n),
G_VARIANT_TYPE ("(i)"),
G_DBUS_CALL_FLAGS_NONE,
-1, /* timeout_msec */
NULL, /* GCancellable */
&error);
g_assert_no_error (error);
g_variant_get (res, "(i)", &val);
g_assert_cmpint (val, ==, 2 + n);
g_variant_unref (res);
g_object_unref (c);
}
g_main_loop_quit (data->loop);
g_thread_join (service_thread);
dmp_data_free (data);
}
/* ---------------------------------------------------------------------------------------------------- */
int
main (int argc,
char *argv[])
@ -753,6 +939,7 @@ main (int argc,
loop = g_main_loop_new (NULL, FALSE);
g_test_add_func ("/gdbus/peer-to-peer", test_peer);
g_test_add_func ("/gdbus/delayed-message-processing", delayed_message_processing);
ret = g_test_run();