diff --git a/docs/reference/gio/overview.xml b/docs/reference/gio/overview.xml index 5b7a570fc..a2954b64b 100644 --- a/docs/reference/gio/overview.xml +++ b/docs/reference/gio/overview.xml @@ -339,6 +339,10 @@ cause GLib to print out different types of debugging information when using the D-Bus routines. + + transport + Show IO activity (e.g. reads and writes) + message Show all sent and received D-Bus messages diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index d718a39a4..60f9bbfdd 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -40,6 +40,7 @@ #include "giostream.h" #include "gsocketcontrolmessage.h" #include "gsocketconnection.h" +#include "gsocketoutputstream.h" #ifdef G_OS_UNIX #include "gunixfdmessage.h" @@ -386,16 +387,19 @@ struct GDBusWorker /* used for writing */ GMutex *write_lock; GQueue *write_queue; - gboolean write_is_pending; + gint num_writes_pending; guint64 write_num_messages_written; GList *write_pending_flushes; }; +/* ---------------------------------------------------------------------------------------------------- */ + typedef struct { GMutex *mutex; GCond *cond; guint64 number_to_wait_for; + GError *error; } FlushData; struct _MessageToWriteData ; @@ -403,6 +407,14 @@ typedef struct _MessageToWriteData MessageToWriteData; static void message_to_write_data_free (MessageToWriteData *data); +static void read_message_print_transport_debug (gssize bytes_read, + GDBusWorker *worker); + +static void write_message_print_transport_debug (gssize bytes_written, + MessageToWriteData *data); + +/* ---------------------------------------------------------------------------------------------------- */ + static GDBusWorker * _g_dbus_worker_ref (GDBusWorker *worker) { @@ -646,6 +658,8 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, goto out; } + read_message_print_transport_debug (bytes_read, worker); + worker->read_buffer_cur_size += bytes_read; if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size) { @@ -803,14 +817,20 @@ _g_dbus_worker_do_read (GDBusWorker *worker) struct _MessageToWriteData { + GDBusWorker *worker; GDBusMessage *message; gchar *blob; gsize blob_size; + + gsize total_written; + GSimpleAsyncResult *simple; + }; static void message_to_write_data_free (MessageToWriteData *data) { + _g_dbus_worker_unref (data->worker); g_object_unref (data->message); g_free (data->blob); g_free (data); @@ -818,115 +838,316 @@ message_to_write_data_free (MessageToWriteData *data) /* ---------------------------------------------------------------------------------------------------- */ +static void write_message_continue_writing (MessageToWriteData *data); + +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static void +write_message_async_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + MessageToWriteData *data = user_data; + GSimpleAsyncResult *simple; + gssize bytes_written; + GError *error; + + /* Note: we can't access data->simple after calling g_async_result_complete () because the + * callback can free @data and we're not completing in idle. So use a copy of the pointer. + */ + simple = data->simple; + + error = NULL; + bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object), + res, + &error); + if (bytes_written == -1) + { + g_simple_async_result_set_from_error (simple, error); + g_error_free (error); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } + g_assert (bytes_written > 0); /* zero is never returned */ + + write_message_print_transport_debug (bytes_written, data); + + data->total_written += bytes_written; + g_assert (data->total_written <= data->blob_size); + if (data->total_written == data->blob_size) + { + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } + + write_message_continue_writing (data); + + out: + ; +} + /* called in private thread shared by all GDBusConnection instances (without write-lock held) */ static gboolean -write_message (GDBusWorker *worker, - MessageToWriteData *data, - GError **error) +on_socket_ready (GSocket *socket, + GIOCondition condition, + gpointer user_data) { - gboolean ret; - GList *l; - GList *ll; + MessageToWriteData *data = user_data; + write_message_continue_writing (data); + return FALSE; /* remove source */ +} - g_return_val_if_fail (data->blob_size > 16, FALSE); +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static void +write_message_continue_writing (MessageToWriteData *data) +{ + GOutputStream *ostream; + GSimpleAsyncResult *simple; +#ifdef G_OS_UNIX + GUnixFDList *fd_list; +#endif - ret = FALSE; - - /* First, the initial 16 bytes - special case UNIX sockets here - * since it may involve writing an ancillary message with file - * descriptors + /* Note: we can't access data->simple after calling g_async_result_complete () because the + * callback can free @data and we're not completing in idle. So use a copy of the pointer. */ + simple = data->simple; + + ostream = g_io_stream_get_output_stream (data->worker->stream); +#ifdef G_OS_UNIX + fd_list = g_dbus_message_get_unix_fd_list (data->message); +#endif + + g_assert (!g_output_stream_has_pending (ostream)); + g_assert_cmpint (data->total_written, <, data->blob_size); + if (FALSE) { } #ifdef G_OS_UNIX - else if (worker->socket != NULL) + else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0) { GOutputVector vector; - GSocketControlMessage *message; - GUnixFDList *fd_list; + GSocketControlMessage *control_message; gssize bytes_written; - - fd_list = g_dbus_message_get_unix_fd_list (data->message); - - message = NULL; - if (fd_list != NULL) - { - if (!G_IS_UNIX_CONNECTION (worker->stream)) - { - g_set_error (error, - G_IO_ERROR, - G_IO_ERROR_INVALID_ARGUMENT, - "Tried sending a file descriptor on unsupported stream of type %s", - g_type_name (G_TYPE_FROM_INSTANCE (worker->stream))); - goto out; - } - else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) - { - g_set_error_literal (error, - G_IO_ERROR, - G_IO_ERROR_INVALID_ARGUMENT, - "Tried sending a file descriptor but remote peer does not support this capability"); - goto out; - } - message = g_unix_fd_message_new_with_fd_list (fd_list); - } + GError *error; vector.buffer = data->blob; - vector.size = 16; + vector.size = data->blob_size; - bytes_written = g_socket_send_message (worker->socket, + control_message = NULL; + if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0) + { + if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) + { + g_simple_async_result_set_error (simple, + G_IO_ERROR, + G_IO_ERROR_FAILED, + "Tried sending a file descriptor but remote peer does not support this capability"); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } + control_message = g_unix_fd_message_new_with_fd_list (fd_list); + } + + error = NULL; + bytes_written = g_socket_send_message (data->worker->socket, NULL, /* address */ &vector, 1, - message != NULL ? &message : NULL, - message != NULL ? 1 : 0, + control_message != NULL ? &control_message : NULL, + control_message != NULL ? 1 : 0, G_SOCKET_MSG_NONE, - worker->cancellable, - error); + data->worker->cancellable, + &error); + if (control_message != NULL) + g_object_unref (control_message); + if (bytes_written == -1) { - g_prefix_error (error, _("Error writing first 16 bytes of message to socket: ")); - if (message != NULL) - g_object_unref (message); + /* Handle WOULD_BLOCK by waiting until there's room in the buffer */ + if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_WOULD_BLOCK) + { + GSource *source; + source = g_socket_create_source (data->worker->socket, + G_IO_OUT | G_IO_HUP | G_IO_ERR, + data->worker->cancellable); + g_source_set_callback (source, + (GSourceFunc) on_socket_ready, + data, + NULL); /* GDestroyNotify */ + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); + goto out; + } + g_simple_async_result_set_from_error (simple, error); + g_error_free (error); + g_simple_async_result_complete (simple); + g_object_unref (simple); goto out; } - if (message != NULL) - g_object_unref (message); + g_assert (bytes_written > 0); /* zero is never returned */ - if (bytes_written < 16) + write_message_print_transport_debug (bytes_written, data); + + data->total_written += bytes_written; + g_assert (data->total_written <= data->blob_size); + if (data->total_written == data->blob_size) { - /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary - * messages are sent? - */ - g_assert_not_reached (); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; } + + write_message_continue_writing (data); } -#endif /* #ifdef G_OS_UNIX */ +#endif else { - /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */ - if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), - (const gchar *) data->blob, - 16, - NULL, /* bytes_written */ - worker->cancellable, /* cancellable */ - error)) - goto out; + if (fd_list != NULL) + { + g_simple_async_result_set_error (simple, + G_IO_ERROR, + G_IO_ERROR_FAILED, + "Tried sending a file descriptor on unsupported stream of type %s", + g_type_name (G_TYPE_FROM_INSTANCE (ostream))); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } + + g_output_stream_write_async (ostream, + (const gchar *) data->blob + data->total_written, + data->blob_size - data->total_written, + G_PRIORITY_DEFAULT, + data->worker->cancellable, + write_message_async_cb, + data); + } + out: + ; +} + +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static void +write_message_async (GDBusWorker *worker, + MessageToWriteData *data, + GAsyncReadyCallback callback, + gpointer user_data) +{ + data->simple = g_simple_async_result_new (NULL, + callback, + user_data, + write_message_async); + data->total_written = 0; + write_message_continue_writing (data); +} + +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static gboolean +write_message_finish (GAsyncResult *res, + GError **error) +{ + g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async); + if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error)) + return FALSE; + else + return TRUE; +} +/* ---------------------------------------------------------------------------------------------------- */ + +static void maybe_write_next_message (GDBusWorker *worker); + +typedef struct +{ + GDBusWorker *worker; + GList *flushers; +} FlushAsyncData; + +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static void +ostream_flush_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + FlushAsyncData *data = user_data; + GError *error; + GList *l; + + error = NULL; + g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object), + res, + &error); + + if (error == NULL) + { + if (G_UNLIKELY (_g_dbus_debug_transport ())) + { + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Transport:\n" + " ---- FLUSHED stream of type %s\n", + g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream)))); + _g_dbus_debug_print_unlock (); + } } - /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */ - if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), - (const gchar *) data->blob + 16, - data->blob_size - 16, - NULL, /* bytes_written */ - worker->cancellable, /* cancellable */ - error)) - goto out; + g_assert (data->flushers != NULL); + for (l = data->flushers; l != NULL; l = l->next) + { + FlushData *f = l->data; - ret = TRUE; + f->error = error != NULL ? g_error_copy (error) : NULL; - /* wake up pending flushes */ + g_mutex_lock (f->mutex); + g_cond_signal (f->cond); + g_mutex_unlock (f->mutex); + } + g_list_free (data->flushers); + + if (error != NULL) + g_error_free (error); + + /* OK, cool, finally kick off the next write */ + maybe_write_next_message (data->worker); + + _g_dbus_worker_unref (data->worker); + g_free (data); +} + +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static void +message_written (GDBusWorker *worker, + MessageToWriteData *message_data) +{ + GList *l; + GList *ll; + GList *flushers; + + /* first log the fact that we wrote a message */ + if (G_UNLIKELY (_g_dbus_debug_message ())) + { + gchar *s; + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Message:\n" + " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", + message_data->blob_size); + s = g_dbus_message_print (message_data->message, 2); + g_print ("%s", s); + g_free (s); + if (G_UNLIKELY (_g_dbus_debug_payload ())) + { + s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2); + g_print ("%s\n", s); + g_free (s); + } + _g_dbus_debug_print_unlock (); + } + + /* then first wake up pending flushes and, if needed, flush the stream */ + flushers = NULL; g_mutex_lock (worker->write_lock); worker->write_num_messages_written += 1; for (l = worker->write_pending_flushes; l != NULL; l = ll) @@ -936,81 +1157,114 @@ write_message (GDBusWorker *worker, if (f->number_to_wait_for == worker->write_num_messages_written) { - g_mutex_lock (f->mutex); - g_cond_signal (f->cond); - g_mutex_unlock (f->mutex); + flushers = g_list_append (flushers, f); worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l); } } g_mutex_unlock (worker->write_lock); - if (G_UNLIKELY (_g_dbus_debug_message ())) + if (flushers != NULL) { - gchar *s; - _g_dbus_debug_print_lock (); - g_print ("========================================================================\n" - "GDBus-debug:Message:\n" - " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", - data->blob_size); - s = g_dbus_message_print (data->message, 2); - g_print ("%s", s); - g_free (s); - if (G_UNLIKELY (_g_dbus_debug_payload ())) - { - s = _g_dbus_hexdump (data->blob, data->blob_size, 2); - g_print ("%s\n", s); - g_free (s); - } - _g_dbus_debug_print_unlock (); + FlushAsyncData *data; + data = g_new0 (FlushAsyncData, 1); + data->worker = _g_dbus_worker_ref (worker); + data->flushers = flushers; + /* flush the stream before writing the next message */ + g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream), + G_PRIORITY_DEFAULT, + worker->cancellable, + ostream_flush_cb, + data); + } + else + { + /* kick off the next write! */ + maybe_write_next_message (worker); } - - out: - return ret; } -/* ---------------------------------------------------------------------------------------------------- */ +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static void +write_message_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + MessageToWriteData *data = user_data; + GError *error; + + g_mutex_lock (data->worker->write_lock); + data->worker->num_writes_pending -= 1; + g_mutex_unlock (data->worker->write_lock); + + error = NULL; + if (!write_message_finish (res, &error)) + { + /* TODO: handle */ + _g_dbus_worker_emit_disconnected (data->worker, TRUE, error); + g_error_free (error); + } + + /* this function will also kick of the next write (it might need to + * flush so writing the next message might happen much later + * e.g. async) + */ + message_written (data->worker, data); + + message_to_write_data_free (data); +} /* called in private thread shared by all GDBusConnection instances (without write-lock held) */ -static gboolean -write_message_in_idle_cb (gpointer user_data) +static void +maybe_write_next_message (GDBusWorker *worker) { - GDBusWorker *worker = user_data; - gboolean more_writes_are_pending; MessageToWriteData *data; - gboolean message_was_dropped; - GError *error; + + write_next: g_mutex_lock (worker->write_lock); data = g_queue_pop_head (worker->write_queue); - g_assert (data != NULL); - more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0); - worker->write_is_pending = more_writes_are_pending; + if (data != NULL) + worker->num_writes_pending += 1; g_mutex_unlock (worker->write_lock); /* Note that write_lock is only used for protecting the @write_queue - * and @write_is_pending fields of the GDBusWorker struct ... which we + * and @num_writes_pending fields of the GDBusWorker struct ... which we * need to modify from arbitrary threads in _g_dbus_worker_send_message(). * * Therefore, it's fine to drop it here when calling back into user * code and then writing the message out onto the GIOStream since this * function only runs on the worker thread. */ - message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message); - if (G_LIKELY (!message_was_dropped)) + if (data != NULL) { - error = NULL; - if (!write_message (worker, - data, - &error)) + gboolean message_was_dropped; + message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message); + if (G_UNLIKELY (message_was_dropped)) { - /* TODO: handle */ - _g_dbus_worker_emit_disconnected (worker, TRUE, error); - g_error_free (error); + g_mutex_lock (worker->write_lock); + worker->num_writes_pending -= 1; + g_mutex_unlock (worker->write_lock); + message_to_write_data_free (data); + goto write_next; + } + else + { + write_message_async (worker, + data, + write_message_cb, + data); } } - message_to_write_data_free (data); +} - return more_writes_are_pending; +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static gboolean +write_message_in_idle_cb (gpointer user_data) +{ + GDBusWorker *worker = user_data; + if (worker->num_writes_pending == 0) + maybe_write_next_message (worker); + return FALSE; } /* ---------------------------------------------------------------------------------------------------- */ @@ -1029,18 +1283,16 @@ _g_dbus_worker_send_message (GDBusWorker *worker, g_return_if_fail (blob_len > 16); data = g_new0 (MessageToWriteData, 1); + data->worker = _g_dbus_worker_ref (worker); data->message = g_object_ref (message); data->blob = blob; /* steal! */ data->blob_size = blob_len; g_mutex_lock (worker->write_lock); g_queue_push_tail (worker->write_queue, data); - if (!worker->write_is_pending) + if (worker->num_writes_pending == 0) { GSource *idle_source; - - worker->write_is_pending = TRUE; - idle_source = g_idle_source_new (); g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); g_source_set_callback (idle_source, @@ -1145,6 +1397,7 @@ _g_dbus_worker_flush_sync (GDBusWorker *worker, FlushData *data; data = NULL; + ret = TRUE; /* if the queue is empty, there's nothing to wait for */ g_mutex_lock (worker->write_lock); @@ -1164,29 +1417,32 @@ _g_dbus_worker_flush_sync (GDBusWorker *worker, g_cond_wait (data->cond, data->mutex); g_mutex_unlock (data->mutex); - /* note:the element is removed from worker->write_pending_flushes in write_message() */ + /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */ g_cond_free (data->cond); g_mutex_free (data->mutex); + if (data->error != NULL) + { + ret = FALSE; + g_propagate_error (error, data->error); + } g_free (data); } - ret = g_output_stream_flush (g_io_stream_get_output_stream (worker->stream), - cancellable, - error); return ret; } /* ---------------------------------------------------------------------------------------------------- */ #define G_DBUS_DEBUG_AUTHENTICATION (1<<0) -#define G_DBUS_DEBUG_MESSAGE (1<<1) -#define G_DBUS_DEBUG_PAYLOAD (1<<2) -#define G_DBUS_DEBUG_CALL (1<<3) -#define G_DBUS_DEBUG_SIGNAL (1<<4) -#define G_DBUS_DEBUG_INCOMING (1<<5) -#define G_DBUS_DEBUG_RETURN (1<<6) -#define G_DBUS_DEBUG_EMISSION (1<<7) -#define G_DBUS_DEBUG_ADDRESS (1<<8) +#define G_DBUS_DEBUG_TRANSPORT (1<<1) +#define G_DBUS_DEBUG_MESSAGE (1<<2) +#define G_DBUS_DEBUG_PAYLOAD (1<<3) +#define G_DBUS_DEBUG_CALL (1<<4) +#define G_DBUS_DEBUG_SIGNAL (1<<5) +#define G_DBUS_DEBUG_INCOMING (1<<6) +#define G_DBUS_DEBUG_RETURN (1<<7) +#define G_DBUS_DEBUG_EMISSION (1<<8) +#define G_DBUS_DEBUG_ADDRESS (1<<9) static gint _gdbus_debug_flags = 0; @@ -1197,6 +1453,13 @@ _g_dbus_debug_authentication (void) return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0; } +gboolean +_g_dbus_debug_transport (void) +{ + _g_dbus_initialize (); + return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0; +} + gboolean _g_dbus_debug_message (void) { @@ -1292,6 +1555,7 @@ _g_dbus_initialize (void) { const GDebugKey keys[] = { { "authentication", G_DBUS_DEBUG_AUTHENTICATION }, + { "transport", G_DBUS_DEBUG_TRANSPORT }, { "message", G_DBUS_DEBUG_MESSAGE }, { "payload", G_DBUS_DEBUG_PAYLOAD }, { "call", G_DBUS_DEBUG_CALL }, @@ -1448,3 +1712,76 @@ _g_dbus_enum_to_string (GType enum_type, gint value) } /* ---------------------------------------------------------------------------------------------------- */ + +static void +write_message_print_transport_debug (gssize bytes_written, + MessageToWriteData *data) +{ + if (G_LIKELY (!_g_dbus_debug_transport ())) + goto out; + + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Transport:\n" + " >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n" + " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n", + bytes_written, + g_dbus_message_get_serial (data->message), + data->blob_size, + data->total_written, + g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream)))); + _g_dbus_debug_print_unlock (); + out: + ; +} + +/* ---------------------------------------------------------------------------------------------------- */ + +static void +read_message_print_transport_debug (gssize bytes_read, + GDBusWorker *worker) +{ + gsize size; + gint32 serial; + gint32 message_length; + + if (G_LIKELY (!_g_dbus_debug_transport ())) + goto out; + + size = bytes_read + worker->read_buffer_cur_size; + serial = 0; + message_length = 0; + if (size >= 16) + message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL); + if (size >= 1) + { + switch (worker->read_buffer[0]) + { + case 'l': + if (size >= 12) + serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]); + break; + case 'B': + if (size >= 12) + serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]); + break; + default: + /* an error will be set elsewhere if this happens */ + goto out; + } + } + + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Transport:\n" + " <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n" + " size %d to offset %" G_GSIZE_FORMAT " from a %s\n", + bytes_read, + serial, + message_length, + worker->read_buffer_cur_size, + g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream)))); + _g_dbus_debug_print_unlock (); + out: + ; +} diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h index a226623f6..ae8d416cd 100644 --- a/gio/gdbusprivate.h +++ b/gio/gdbusprivate.h @@ -80,6 +80,7 @@ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker, void _g_dbus_initialize (void); gboolean _g_dbus_debug_authentication (void); +gboolean _g_dbus_debug_transport (void); gboolean _g_dbus_debug_message (void); gboolean _g_dbus_debug_payload (void); gboolean _g_dbus_debug_call (void); diff --git a/gio/tests/gdbus-connection.c b/gio/tests/gdbus-connection.c index 6d3c48367..fc0274628 100644 --- a/gio/tests/gdbus-connection.c +++ b/gio/tests/gdbus-connection.c @@ -917,6 +917,85 @@ test_connection_basic (void) /* ---------------------------------------------------------------------------------------------------- */ +/* Message size > 20MiB ... should be enough to make sure the message + * is fragmented when shoved across any transport + */ +#define LARGE_MESSAGE_STRING_LENGTH (20*1024*1024) + +static void +large_message_on_name_appeared (GDBusConnection *connection, + const gchar *name, + const gchar *name_owner, + gpointer user_data) +{ + GError *error; + gchar *request; + const gchar *reply; + GVariant *result; + guint n; + + request = g_new (gchar, LARGE_MESSAGE_STRING_LENGTH + 1); + for (n = 0; n < LARGE_MESSAGE_STRING_LENGTH; n++) + request[n] = '0' + (n%10); + request[n] = '\0'; + + error = NULL; + result = g_dbus_connection_call_sync (connection, + "com.example.TestService", /* bus name */ + "/com/example/TestObject", /* object path */ + "com.example.Frob", /* interface name */ + "HelloWorld", /* method name */ + g_variant_new ("(s)", request), /* parameters */ + G_VARIANT_TYPE ("(s)"), /* return type */ + G_DBUS_CALL_FLAGS_NONE, + -1, + NULL, + &error); + g_assert_no_error (error); + g_assert (result != NULL); + g_variant_get (result, "(&s)", &reply); + g_assert_cmpint (strlen (reply), >, LARGE_MESSAGE_STRING_LENGTH); + g_assert (g_str_has_prefix (reply, "You greeted me with '01234567890123456789012")); + g_assert (g_str_has_suffix (reply, "6789'. Thanks!")); + g_variant_unref (result); + + g_free (request); + + g_main_loop_quit (loop); +} + +static void +large_message_on_name_vanished (GDBusConnection *connection, + const gchar *name, + gpointer user_data) +{ +} + +static void +test_connection_large_message (void) +{ + guint watcher_id; + + session_bus_up (); + + /* this is safe; testserver will exit once the bus goes away */ + g_assert (g_spawn_command_line_async (SRCDIR "/gdbus-testserver.py", NULL)); + + watcher_id = g_bus_watch_name (G_BUS_TYPE_SESSION, + "com.example.TestService", + G_BUS_NAME_WATCHER_FLAGS_NONE, + large_message_on_name_appeared, + large_message_on_name_vanished, + NULL, /* user_data */ + NULL); /* GDestroyNotify */ + g_main_loop_run (loop); + g_bus_unwatch_name (watcher_id); + + session_bus_down (); +} + +/* ---------------------------------------------------------------------------------------------------- */ + int main (int argc, char *argv[]) @@ -939,5 +1018,6 @@ main (int argc, g_test_add_func ("/gdbus/connection/signals", test_connection_signals); g_test_add_func ("/gdbus/connection/filter", test_connection_filter); g_test_add_func ("/gdbus/connection/flush", test_connection_flush); + g_test_add_func ("/gdbus/connection/large_message", test_connection_large_message); return g_test_run(); } diff --git a/gio/tests/gdbus-peer.c b/gio/tests/gdbus-peer.c index 662fc3e6a..a904c7a68 100644 --- a/gio/tests/gdbus-peer.c +++ b/gio/tests/gdbus-peer.c @@ -1250,30 +1250,42 @@ test_credentials (void) /* ---------------------------------------------------------------------------------------------------- */ -#if 0 /* def G_OS_UNIX disabled while it fails */ +#ifdef G_OS_UNIX + +/* Chosen to be big enough to overflow the socket buffer */ +#define OVERFLOW_NUM_SIGNALS 5000 +#define OVERFLOW_TIMEOUT_SEC 10 + static gboolean -signal_count_cb (GDBusConnection *connection, - GDBusMessage *message, - gboolean incoming, - gpointer user_data) +overflow_filter_func (GDBusConnection *connection, + GDBusMessage *message, + gboolean incoming, + gpointer user_data) { - volatile int *p = user_data; - (*p)++; - return TRUE; + volatile gint *counter = user_data; + *counter += 1; + return FALSE; /* don't drop the message */ +} + +static gboolean +overflow_on_500ms_later_func (gpointer user_data) +{ + g_main_loop_quit (loop); + return FALSE; /* don't keep the idle */ } static void test_overflow (void) { - gint sv[2], i; + gint sv[2]; + gint n; GSocket *socket; GSocketConnection *socket_connection; GDBusConnection *producer, *consumer; GError *error; - gchar *guid; - pid_t child; GTimer *timer; - volatile int counter = 0; + volatile gint n_messages_received; + volatile gint n_messages_sent; g_assert_cmpint (socketpair (AF_UNIX, SOCK_STREAM, 0, sv), ==, 0); @@ -1287,14 +1299,16 @@ test_overflow (void) NULL, /* guid */ G_DBUS_CONNECTION_FLAGS_NONE, NULL, /* GDBusAuthObserver */ - NULL, + NULL, /* GCancellable */ &error); g_dbus_connection_set_exit_on_close (producer, TRUE); g_assert_no_error (error); g_object_unref (socket_connection); + n_messages_sent = 0; + g_dbus_connection_add_filter (producer, overflow_filter_func, (gpointer) &n_messages_sent, NULL); /* send enough data that we get an EAGAIN */ - for (i = 0; i < 1000; i++) + for (n = 0; n < OVERFLOW_NUM_SIGNALS; n++) { error = NULL; g_dbus_connection_emit_signal (producer, @@ -1304,41 +1318,46 @@ test_overflow (void) "Member", g_variant_new ("(s)", "a string"), &error); - /* run the main event loop - otherwise GDBusConnection::closed won't be fired */ - g_main_context_iteration (NULL, FALSE); g_assert_no_error (error); - static gint count = 0; - g_print ("%d ", count++); } + /* sleep for 0.5 sec (to allow the GDBus IO thread to fill up the + * kernel buffers) and verify that n_messages_sent < + * OVERFLOW_NUM_SIGNALS + * + * This is to verify that not all the submitted messages have been + * sent to the underlying transport. + */ + g_timeout_add (500, overflow_on_500ms_later_func, NULL); + g_main_loop_run (loop); + g_assert_cmpint (n_messages_sent, <, OVERFLOW_NUM_SIGNALS); + /* now suck it all out as a client, and add it up */ socket = g_socket_new_from_fd (sv[1], &error); g_assert_no_error (error); socket_connection = g_socket_connection_factory_create_connection (socket); g_assert (socket_connection != NULL); g_object_unref (socket); - guid = g_dbus_generate_guid (); consumer = g_dbus_connection_new_sync (G_IO_STREAM (socket_connection), - guid, + NULL, /* guid */ G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING, NULL, /* GDBusAuthObserver */ - NULL, + NULL, /* GCancellable */ &error); - g_dbus_connection_add_filter (consumer, signal_count_cb, &counter, NULL); - g_dbus_connection_start_message_processing (consumer); - - g_free (guid); g_assert_no_error (error); g_object_unref (socket_connection); + n_messages_received = 0; + g_dbus_connection_add_filter (consumer, overflow_filter_func, (gpointer) &n_messages_received, NULL); + g_dbus_connection_start_message_processing (consumer); timer = g_timer_new (); g_timer_start (timer); - while (counter < 1000 && - g_timer_elapsed (timer, NULL) < 5.0) + while (n_messages_received < OVERFLOW_NUM_SIGNALS && g_timer_elapsed (timer, NULL) < OVERFLOW_TIMEOUT_SEC) g_main_context_iteration (NULL, FALSE); - g_assert (counter == 1000); + g_assert_cmpint (n_messages_sent, ==, OVERFLOW_NUM_SIGNALS); + g_assert_cmpint (n_messages_received, ==, OVERFLOW_NUM_SIGNALS); g_timer_destroy (timer); g_object_unref (consumer); @@ -1348,6 +1367,7 @@ test_overflow (void) static void test_overflow (void) { + /* TODO: test this with e.g. GWin32InputStream/GWin32OutputStream */ } #endif