Bug 626748 – Use async methods for writing and handle EAGAIN

If sending a lot of data and/or the other peer is not reading it, then
socket buffers can overflow. This is communicated from the kernel by
returning EAGAIN. In GIO, it is modelled by g_output_stream_write()
and g_socket_send_message() returning G_IO_ERROR_WOULD_BLOCK.

It is also problematic that that we're using synchronous IO in the
shared GDBus IO thread. It means that one GDBusConnection can lock up
others.

It turns out that by porting from g_output_stream_write() to
g_output_stream_write_async() we fix the EAGAIN issue. For GSocket, we
still need to handle things manually (by creating a GSource) as
g_socket_send_message() is used.

We check the new behavior in Michael's producer/consumer test case (at
/gdbus/overflow in gdbus-peer.c) added in the last commit.

Also add a test case that sends and receives a 20 MiB message.

Also add a new `transport' G_DBUS_DEBUG option so it is easy to
inspect partial writes:

 $ G_DBUS_DEBUG=transport ./gdbus-connection -p /gdbus/connection/large_message
 [...]
 ========================================================================
 GDBus-debug:Transport:
   >>>> WROTE 128000 bytes of message with serial 4 and
        size 20971669 from offset 0 on a GSocketOutputStream
 ========================================================================
 GDBus-debug:Transport:
   >>>> WROTE 128000 bytes of message with serial 4 and
        size 20971669 from offset 128000 on a GSocketOutputStream
 ========================================================================
 GDBus-debug:Transport:
   >>>> WROTE 128000 bytes of message with serial 4 and
        size 20971669 from offset 256000 on a GSocketOutputStream
 [...]
 ========================================================================
 GDBus-debug:Transport:
   >>>> WROTE 43669 bytes of message with serial 4 and
        size 20971669 from offset 20928000 on a GSocketOutputStream
 [...]
 ========================================================================
 GDBus-debug:Transport:
   <<<< READ 16 bytes of message with serial 3 and
        size 20971620 to offset 0 from a GSocketInputStream
 ========================================================================
 GDBus-debug:Transport:
   <<<< READ 15984 bytes of message with serial 3 and
        size 20971620 to offset 16 from a GSocketInputStream
 ========================================================================
 GDBus-debug:Transport:
   <<<< READ 16000 bytes of message with serial 3 and
        size 20971620 to offset 16000 from a GSocketInputStream
 [...]
 ========================================================================
 GDBus-debug:Transport:
   <<<< READ 144000 bytes of message with serial 3 and
        size 20971620 to offset 20720000 from a GSocketInputStream
 ========================================================================
 GDBus-debug:Transport:
   <<<< READ 107620 bytes of message with serial 3 and
        size 20971620 to offset 20864000 from a GSocketInputStream
 OK

https://bugzilla.gnome.org/show_bug.cgi?id=626748

Signed-off-by: David Zeuthen <davidz@redhat.com>
This commit is contained in:
David Zeuthen 2010-08-16 13:43:35 -04:00
parent a6264a3a19
commit 8a3a4596e2
5 changed files with 607 additions and 165 deletions

View File

@ -339,6 +339,10 @@
cause GLib to print out different types of debugging cause GLib to print out different types of debugging
information when using the D-Bus routines. information when using the D-Bus routines.
<variablelist> <variablelist>
<varlistentry>
<term>transport</term>
<listitem><para>Show IO activity (e.g. reads and writes)</para></listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term>message</term> <term>message</term>
<listitem><para>Show all sent and received D-Bus messages</para></listitem> <listitem><para>Show all sent and received D-Bus messages</para></listitem>

View File

@ -40,6 +40,7 @@
#include "giostream.h" #include "giostream.h"
#include "gsocketcontrolmessage.h" #include "gsocketcontrolmessage.h"
#include "gsocketconnection.h" #include "gsocketconnection.h"
#include "gsocketoutputstream.h"
#ifdef G_OS_UNIX #ifdef G_OS_UNIX
#include "gunixfdmessage.h" #include "gunixfdmessage.h"
@ -386,16 +387,19 @@ struct GDBusWorker
/* used for writing */ /* used for writing */
GMutex *write_lock; GMutex *write_lock;
GQueue *write_queue; GQueue *write_queue;
gboolean write_is_pending; gint num_writes_pending;
guint64 write_num_messages_written; guint64 write_num_messages_written;
GList *write_pending_flushes; GList *write_pending_flushes;
}; };
/* ---------------------------------------------------------------------------------------------------- */
typedef struct typedef struct
{ {
GMutex *mutex; GMutex *mutex;
GCond *cond; GCond *cond;
guint64 number_to_wait_for; guint64 number_to_wait_for;
GError *error;
} FlushData; } FlushData;
struct _MessageToWriteData ; struct _MessageToWriteData ;
@ -403,6 +407,14 @@ typedef struct _MessageToWriteData MessageToWriteData;
static void message_to_write_data_free (MessageToWriteData *data); static void message_to_write_data_free (MessageToWriteData *data);
static void read_message_print_transport_debug (gssize bytes_read,
GDBusWorker *worker);
static void write_message_print_transport_debug (gssize bytes_written,
MessageToWriteData *data);
/* ---------------------------------------------------------------------------------------------------- */
static GDBusWorker * static GDBusWorker *
_g_dbus_worker_ref (GDBusWorker *worker) _g_dbus_worker_ref (GDBusWorker *worker)
{ {
@ -646,6 +658,8 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
goto out; goto out;
} }
read_message_print_transport_debug (bytes_read, worker);
worker->read_buffer_cur_size += bytes_read; worker->read_buffer_cur_size += bytes_read;
if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size) if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
{ {
@ -803,14 +817,20 @@ _g_dbus_worker_do_read (GDBusWorker *worker)
struct _MessageToWriteData struct _MessageToWriteData
{ {
GDBusWorker *worker;
GDBusMessage *message; GDBusMessage *message;
gchar *blob; gchar *blob;
gsize blob_size; gsize blob_size;
gsize total_written;
GSimpleAsyncResult *simple;
}; };
static void static void
message_to_write_data_free (MessageToWriteData *data) message_to_write_data_free (MessageToWriteData *data)
{ {
_g_dbus_worker_unref (data->worker);
g_object_unref (data->message); g_object_unref (data->message);
g_free (data->blob); g_free (data->blob);
g_free (data); g_free (data);
@ -818,115 +838,316 @@ message_to_write_data_free (MessageToWriteData *data)
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
static void write_message_continue_writing (MessageToWriteData *data);
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void
write_message_async_cb (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
MessageToWriteData *data = user_data;
GSimpleAsyncResult *simple;
gssize bytes_written;
GError *error;
/* Note: we can't access data->simple after calling g_async_result_complete () because the
* callback can free @data and we're not completing in idle. So use a copy of the pointer.
*/
simple = data->simple;
error = NULL;
bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
res,
&error);
if (bytes_written == -1)
{
g_simple_async_result_set_from_error (simple, error);
g_error_free (error);
g_simple_async_result_complete (simple);
g_object_unref (simple);
goto out;
}
g_assert (bytes_written > 0); /* zero is never returned */
write_message_print_transport_debug (bytes_written, data);
data->total_written += bytes_written;
g_assert (data->total_written <= data->blob_size);
if (data->total_written == data->blob_size)
{
g_simple_async_result_complete (simple);
g_object_unref (simple);
goto out;
}
write_message_continue_writing (data);
out:
;
}
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static gboolean static gboolean
write_message (GDBusWorker *worker, on_socket_ready (GSocket *socket,
MessageToWriteData *data, GIOCondition condition,
GError **error) gpointer user_data)
{ {
gboolean ret; MessageToWriteData *data = user_data;
GList *l; write_message_continue_writing (data);
GList *ll; return FALSE; /* remove source */
}
g_return_val_if_fail (data->blob_size > 16, FALSE); /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void
write_message_continue_writing (MessageToWriteData *data)
{
GOutputStream *ostream;
GSimpleAsyncResult *simple;
#ifdef G_OS_UNIX
GUnixFDList *fd_list;
#endif
ret = FALSE; /* Note: we can't access data->simple after calling g_async_result_complete () because the
* callback can free @data and we're not completing in idle. So use a copy of the pointer.
/* First, the initial 16 bytes - special case UNIX sockets here
* since it may involve writing an ancillary message with file
* descriptors
*/ */
simple = data->simple;
ostream = g_io_stream_get_output_stream (data->worker->stream);
#ifdef G_OS_UNIX
fd_list = g_dbus_message_get_unix_fd_list (data->message);
#endif
g_assert (!g_output_stream_has_pending (ostream));
g_assert_cmpint (data->total_written, <, data->blob_size);
if (FALSE) if (FALSE)
{ {
} }
#ifdef G_OS_UNIX #ifdef G_OS_UNIX
else if (worker->socket != NULL) else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
{ {
GOutputVector vector; GOutputVector vector;
GSocketControlMessage *message; GSocketControlMessage *control_message;
GUnixFDList *fd_list;
gssize bytes_written; gssize bytes_written;
GError *error;
fd_list = g_dbus_message_get_unix_fd_list (data->message);
message = NULL;
if (fd_list != NULL)
{
if (!G_IS_UNIX_CONNECTION (worker->stream))
{
g_set_error (error,
G_IO_ERROR,
G_IO_ERROR_INVALID_ARGUMENT,
"Tried sending a file descriptor on unsupported stream of type %s",
g_type_name (G_TYPE_FROM_INSTANCE (worker->stream)));
goto out;
}
else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
{
g_set_error_literal (error,
G_IO_ERROR,
G_IO_ERROR_INVALID_ARGUMENT,
"Tried sending a file descriptor but remote peer does not support this capability");
goto out;
}
message = g_unix_fd_message_new_with_fd_list (fd_list);
}
vector.buffer = data->blob; vector.buffer = data->blob;
vector.size = 16; vector.size = data->blob_size;
bytes_written = g_socket_send_message (worker->socket, control_message = NULL;
if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
{
if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
{
g_simple_async_result_set_error (simple,
G_IO_ERROR,
G_IO_ERROR_FAILED,
"Tried sending a file descriptor but remote peer does not support this capability");
g_simple_async_result_complete (simple);
g_object_unref (simple);
goto out;
}
control_message = g_unix_fd_message_new_with_fd_list (fd_list);
}
error = NULL;
bytes_written = g_socket_send_message (data->worker->socket,
NULL, /* address */ NULL, /* address */
&vector, &vector,
1, 1,
message != NULL ? &message : NULL, control_message != NULL ? &control_message : NULL,
message != NULL ? 1 : 0, control_message != NULL ? 1 : 0,
G_SOCKET_MSG_NONE, G_SOCKET_MSG_NONE,
worker->cancellable, data->worker->cancellable,
error); &error);
if (control_message != NULL)
g_object_unref (control_message);
if (bytes_written == -1) if (bytes_written == -1)
{ {
g_prefix_error (error, _("Error writing first 16 bytes of message to socket: ")); /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
if (message != NULL) if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_WOULD_BLOCK)
g_object_unref (message); {
GSource *source;
source = g_socket_create_source (data->worker->socket,
G_IO_OUT | G_IO_HUP | G_IO_ERR,
data->worker->cancellable);
g_source_set_callback (source,
(GSourceFunc) on_socket_ready,
data,
NULL); /* GDestroyNotify */
g_source_attach (source, g_main_context_get_thread_default ());
g_source_unref (source);
goto out; goto out;
} }
if (message != NULL) g_simple_async_result_set_from_error (simple, error);
g_object_unref (message); g_error_free (error);
g_simple_async_result_complete (simple);
g_object_unref (simple);
goto out;
}
g_assert (bytes_written > 0); /* zero is never returned */
if (bytes_written < 16) write_message_print_transport_debug (bytes_written, data);
data->total_written += bytes_written;
g_assert (data->total_written <= data->blob_size);
if (data->total_written == data->blob_size)
{ {
/* TODO: I think this needs to be handled ... are we guaranteed that the ancillary g_simple_async_result_complete (simple);
* messages are sent? g_object_unref (simple);
*/ goto out;
g_assert_not_reached ();
} }
write_message_continue_writing (data);
} }
#endif /* #ifdef G_OS_UNIX */ #endif
else else
{ {
/* write the first 16 bytes (guaranteed to return an error if everything can't be written) */ if (fd_list != NULL)
if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), {
(const gchar *) data->blob, g_simple_async_result_set_error (simple,
16, G_IO_ERROR,
NULL, /* bytes_written */ G_IO_ERROR_FAILED,
worker->cancellable, /* cancellable */ "Tried sending a file descriptor on unsupported stream of type %s",
error)) g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
g_simple_async_result_complete (simple);
g_object_unref (simple);
goto out; goto out;
} }
/* Then write the rest of the message (guaranteed to return an error if everything can't be written) */ g_output_stream_write_async (ostream,
if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), (const gchar *) data->blob + data->total_written,
(const gchar *) data->blob + 16, data->blob_size - data->total_written,
data->blob_size - 16, G_PRIORITY_DEFAULT,
NULL, /* bytes_written */ data->worker->cancellable,
worker->cancellable, /* cancellable */ write_message_async_cb,
error)) data);
goto out; }
out:
;
}
ret = TRUE; /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void
write_message_async (GDBusWorker *worker,
MessageToWriteData *data,
GAsyncReadyCallback callback,
gpointer user_data)
{
data->simple = g_simple_async_result_new (NULL,
callback,
user_data,
write_message_async);
data->total_written = 0;
write_message_continue_writing (data);
}
/* wake up pending flushes */ /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static gboolean
write_message_finish (GAsyncResult *res,
GError **error)
{
g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
return FALSE;
else
return TRUE;
}
/* ---------------------------------------------------------------------------------------------------- */
static void maybe_write_next_message (GDBusWorker *worker);
typedef struct
{
GDBusWorker *worker;
GList *flushers;
} FlushAsyncData;
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void
ostream_flush_cb (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{
FlushAsyncData *data = user_data;
GError *error;
GList *l;
error = NULL;
g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
res,
&error);
if (error == NULL)
{
if (G_UNLIKELY (_g_dbus_debug_transport ()))
{
_g_dbus_debug_print_lock ();
g_print ("========================================================================\n"
"GDBus-debug:Transport:\n"
" ---- FLUSHED stream of type %s\n",
g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
_g_dbus_debug_print_unlock ();
}
}
g_assert (data->flushers != NULL);
for (l = data->flushers; l != NULL; l = l->next)
{
FlushData *f = l->data;
f->error = error != NULL ? g_error_copy (error) : NULL;
g_mutex_lock (f->mutex);
g_cond_signal (f->cond);
g_mutex_unlock (f->mutex);
}
g_list_free (data->flushers);
if (error != NULL)
g_error_free (error);
/* OK, cool, finally kick off the next write */
maybe_write_next_message (data->worker);
_g_dbus_worker_unref (data->worker);
g_free (data);
}
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void
message_written (GDBusWorker *worker,
MessageToWriteData *message_data)
{
GList *l;
GList *ll;
GList *flushers;
/* first log the fact that we wrote a message */
if (G_UNLIKELY (_g_dbus_debug_message ()))
{
gchar *s;
_g_dbus_debug_print_lock ();
g_print ("========================================================================\n"
"GDBus-debug:Message:\n"
" >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
message_data->blob_size);
s = g_dbus_message_print (message_data->message, 2);
g_print ("%s", s);
g_free (s);
if (G_UNLIKELY (_g_dbus_debug_payload ()))
{
s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
g_print ("%s\n", s);
g_free (s);
}
_g_dbus_debug_print_unlock ();
}
/* then first wake up pending flushes and, if needed, flush the stream */
flushers = NULL;
g_mutex_lock (worker->write_lock); g_mutex_lock (worker->write_lock);
worker->write_num_messages_written += 1; worker->write_num_messages_written += 1;
for (l = worker->write_pending_flushes; l != NULL; l = ll) for (l = worker->write_pending_flushes; l != NULL; l = ll)
@ -936,81 +1157,114 @@ write_message (GDBusWorker *worker,
if (f->number_to_wait_for == worker->write_num_messages_written) if (f->number_to_wait_for == worker->write_num_messages_written)
{ {
g_mutex_lock (f->mutex); flushers = g_list_append (flushers, f);
g_cond_signal (f->cond);
g_mutex_unlock (f->mutex);
worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l); worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
} }
} }
g_mutex_unlock (worker->write_lock); g_mutex_unlock (worker->write_lock);
if (G_UNLIKELY (_g_dbus_debug_message ())) if (flushers != NULL)
{ {
gchar *s; FlushAsyncData *data;
_g_dbus_debug_print_lock (); data = g_new0 (FlushAsyncData, 1);
g_print ("========================================================================\n" data->worker = _g_dbus_worker_ref (worker);
"GDBus-debug:Message:\n" data->flushers = flushers;
" >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", /* flush the stream before writing the next message */
data->blob_size); g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
s = g_dbus_message_print (data->message, 2); G_PRIORITY_DEFAULT,
g_print ("%s", s); worker->cancellable,
g_free (s); ostream_flush_cb,
if (G_UNLIKELY (_g_dbus_debug_payload ())) data);
}
else
{ {
s = _g_dbus_hexdump (data->blob, data->blob_size, 2); /* kick off the next write! */
g_print ("%s\n", s); maybe_write_next_message (worker);
g_free (s);
} }
_g_dbus_debug_print_unlock ();
} }
out:
return ret;
}
/* ---------------------------------------------------------------------------------------------------- */
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static gboolean static void
write_message_in_idle_cb (gpointer user_data) write_message_cb (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
{ {
GDBusWorker *worker = user_data; MessageToWriteData *data = user_data;
gboolean more_writes_are_pending;
MessageToWriteData *data;
gboolean message_was_dropped;
GError *error; GError *error;
g_mutex_lock (data->worker->write_lock);
data->worker->num_writes_pending -= 1;
g_mutex_unlock (data->worker->write_lock);
error = NULL;
if (!write_message_finish (res, &error))
{
/* TODO: handle */
_g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
g_error_free (error);
}
/* this function will also kick of the next write (it might need to
* flush so writing the next message might happen much later
* e.g. async)
*/
message_written (data->worker, data);
message_to_write_data_free (data);
}
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void
maybe_write_next_message (GDBusWorker *worker)
{
MessageToWriteData *data;
write_next:
g_mutex_lock (worker->write_lock); g_mutex_lock (worker->write_lock);
data = g_queue_pop_head (worker->write_queue); data = g_queue_pop_head (worker->write_queue);
g_assert (data != NULL); if (data != NULL)
more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0); worker->num_writes_pending += 1;
worker->write_is_pending = more_writes_are_pending;
g_mutex_unlock (worker->write_lock); g_mutex_unlock (worker->write_lock);
/* Note that write_lock is only used for protecting the @write_queue /* Note that write_lock is only used for protecting the @write_queue
* and @write_is_pending fields of the GDBusWorker struct ... which we * and @num_writes_pending fields of the GDBusWorker struct ... which we
* need to modify from arbitrary threads in _g_dbus_worker_send_message(). * need to modify from arbitrary threads in _g_dbus_worker_send_message().
* *
* Therefore, it's fine to drop it here when calling back into user * Therefore, it's fine to drop it here when calling back into user
* code and then writing the message out onto the GIOStream since this * code and then writing the message out onto the GIOStream since this
* function only runs on the worker thread. * function only runs on the worker thread.
*/ */
if (data != NULL)
{
gboolean message_was_dropped;
message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message); message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
if (G_LIKELY (!message_was_dropped)) if (G_UNLIKELY (message_was_dropped))
{ {
error = NULL; g_mutex_lock (worker->write_lock);
if (!write_message (worker, worker->num_writes_pending -= 1;
data, g_mutex_unlock (worker->write_lock);
&error))
{
/* TODO: handle */
_g_dbus_worker_emit_disconnected (worker, TRUE, error);
g_error_free (error);
}
}
message_to_write_data_free (data); message_to_write_data_free (data);
goto write_next;
}
else
{
write_message_async (worker,
data,
write_message_cb,
data);
}
}
}
return more_writes_are_pending; /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static gboolean
write_message_in_idle_cb (gpointer user_data)
{
GDBusWorker *worker = user_data;
if (worker->num_writes_pending == 0)
maybe_write_next_message (worker);
return FALSE;
} }
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
@ -1029,18 +1283,16 @@ _g_dbus_worker_send_message (GDBusWorker *worker,
g_return_if_fail (blob_len > 16); g_return_if_fail (blob_len > 16);
data = g_new0 (MessageToWriteData, 1); data = g_new0 (MessageToWriteData, 1);
data->worker = _g_dbus_worker_ref (worker);
data->message = g_object_ref (message); data->message = g_object_ref (message);
data->blob = blob; /* steal! */ data->blob = blob; /* steal! */
data->blob_size = blob_len; data->blob_size = blob_len;
g_mutex_lock (worker->write_lock); g_mutex_lock (worker->write_lock);
g_queue_push_tail (worker->write_queue, data); g_queue_push_tail (worker->write_queue, data);
if (!worker->write_is_pending) if (worker->num_writes_pending == 0)
{ {
GSource *idle_source; GSource *idle_source;
worker->write_is_pending = TRUE;
idle_source = g_idle_source_new (); idle_source = g_idle_source_new ();
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source, g_source_set_callback (idle_source,
@ -1145,6 +1397,7 @@ _g_dbus_worker_flush_sync (GDBusWorker *worker,
FlushData *data; FlushData *data;
data = NULL; data = NULL;
ret = TRUE;
/* if the queue is empty, there's nothing to wait for */ /* if the queue is empty, there's nothing to wait for */
g_mutex_lock (worker->write_lock); g_mutex_lock (worker->write_lock);
@ -1164,29 +1417,32 @@ _g_dbus_worker_flush_sync (GDBusWorker *worker,
g_cond_wait (data->cond, data->mutex); g_cond_wait (data->cond, data->mutex);
g_mutex_unlock (data->mutex); g_mutex_unlock (data->mutex);
/* note:the element is removed from worker->write_pending_flushes in write_message() */ /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
g_cond_free (data->cond); g_cond_free (data->cond);
g_mutex_free (data->mutex); g_mutex_free (data->mutex);
if (data->error != NULL)
{
ret = FALSE;
g_propagate_error (error, data->error);
}
g_free (data); g_free (data);
} }
ret = g_output_stream_flush (g_io_stream_get_output_stream (worker->stream),
cancellable,
error);
return ret; return ret;
} }
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
#define G_DBUS_DEBUG_AUTHENTICATION (1<<0) #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
#define G_DBUS_DEBUG_MESSAGE (1<<1) #define G_DBUS_DEBUG_TRANSPORT (1<<1)
#define G_DBUS_DEBUG_PAYLOAD (1<<2) #define G_DBUS_DEBUG_MESSAGE (1<<2)
#define G_DBUS_DEBUG_CALL (1<<3) #define G_DBUS_DEBUG_PAYLOAD (1<<3)
#define G_DBUS_DEBUG_SIGNAL (1<<4) #define G_DBUS_DEBUG_CALL (1<<4)
#define G_DBUS_DEBUG_INCOMING (1<<5) #define G_DBUS_DEBUG_SIGNAL (1<<5)
#define G_DBUS_DEBUG_RETURN (1<<6) #define G_DBUS_DEBUG_INCOMING (1<<6)
#define G_DBUS_DEBUG_EMISSION (1<<7) #define G_DBUS_DEBUG_RETURN (1<<7)
#define G_DBUS_DEBUG_ADDRESS (1<<8) #define G_DBUS_DEBUG_EMISSION (1<<8)
#define G_DBUS_DEBUG_ADDRESS (1<<9)
static gint _gdbus_debug_flags = 0; static gint _gdbus_debug_flags = 0;
@ -1197,6 +1453,13 @@ _g_dbus_debug_authentication (void)
return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0; return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
} }
gboolean
_g_dbus_debug_transport (void)
{
_g_dbus_initialize ();
return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
}
gboolean gboolean
_g_dbus_debug_message (void) _g_dbus_debug_message (void)
{ {
@ -1292,6 +1555,7 @@ _g_dbus_initialize (void)
{ {
const GDebugKey keys[] = { const GDebugKey keys[] = {
{ "authentication", G_DBUS_DEBUG_AUTHENTICATION }, { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
{ "transport", G_DBUS_DEBUG_TRANSPORT },
{ "message", G_DBUS_DEBUG_MESSAGE }, { "message", G_DBUS_DEBUG_MESSAGE },
{ "payload", G_DBUS_DEBUG_PAYLOAD }, { "payload", G_DBUS_DEBUG_PAYLOAD },
{ "call", G_DBUS_DEBUG_CALL }, { "call", G_DBUS_DEBUG_CALL },
@ -1448,3 +1712,76 @@ _g_dbus_enum_to_string (GType enum_type, gint value)
} }
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
static void
write_message_print_transport_debug (gssize bytes_written,
MessageToWriteData *data)
{
if (G_LIKELY (!_g_dbus_debug_transport ()))
goto out;
_g_dbus_debug_print_lock ();
g_print ("========================================================================\n"
"GDBus-debug:Transport:\n"
" >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
" size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
bytes_written,
g_dbus_message_get_serial (data->message),
data->blob_size,
data->total_written,
g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
_g_dbus_debug_print_unlock ();
out:
;
}
/* ---------------------------------------------------------------------------------------------------- */
static void
read_message_print_transport_debug (gssize bytes_read,
GDBusWorker *worker)
{
gsize size;
gint32 serial;
gint32 message_length;
if (G_LIKELY (!_g_dbus_debug_transport ()))
goto out;
size = bytes_read + worker->read_buffer_cur_size;
serial = 0;
message_length = 0;
if (size >= 16)
message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
if (size >= 1)
{
switch (worker->read_buffer[0])
{
case 'l':
if (size >= 12)
serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
break;
case 'B':
if (size >= 12)
serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
break;
default:
/* an error will be set elsewhere if this happens */
goto out;
}
}
_g_dbus_debug_print_lock ();
g_print ("========================================================================\n"
"GDBus-debug:Transport:\n"
" <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
" size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
bytes_read,
serial,
message_length,
worker->read_buffer_cur_size,
g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
_g_dbus_debug_print_unlock ();
out:
;
}

View File

@ -80,6 +80,7 @@ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker,
void _g_dbus_initialize (void); void _g_dbus_initialize (void);
gboolean _g_dbus_debug_authentication (void); gboolean _g_dbus_debug_authentication (void);
gboolean _g_dbus_debug_transport (void);
gboolean _g_dbus_debug_message (void); gboolean _g_dbus_debug_message (void);
gboolean _g_dbus_debug_payload (void); gboolean _g_dbus_debug_payload (void);
gboolean _g_dbus_debug_call (void); gboolean _g_dbus_debug_call (void);

View File

@ -917,6 +917,85 @@ test_connection_basic (void)
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
/* Message size > 20MiB ... should be enough to make sure the message
* is fragmented when shoved across any transport
*/
#define LARGE_MESSAGE_STRING_LENGTH (20*1024*1024)
static void
large_message_on_name_appeared (GDBusConnection *connection,
const gchar *name,
const gchar *name_owner,
gpointer user_data)
{
GError *error;
gchar *request;
const gchar *reply;
GVariant *result;
guint n;
request = g_new (gchar, LARGE_MESSAGE_STRING_LENGTH + 1);
for (n = 0; n < LARGE_MESSAGE_STRING_LENGTH; n++)
request[n] = '0' + (n%10);
request[n] = '\0';
error = NULL;
result = g_dbus_connection_call_sync (connection,
"com.example.TestService", /* bus name */
"/com/example/TestObject", /* object path */
"com.example.Frob", /* interface name */
"HelloWorld", /* method name */
g_variant_new ("(s)", request), /* parameters */
G_VARIANT_TYPE ("(s)"), /* return type */
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL,
&error);
g_assert_no_error (error);
g_assert (result != NULL);
g_variant_get (result, "(&s)", &reply);
g_assert_cmpint (strlen (reply), >, LARGE_MESSAGE_STRING_LENGTH);
g_assert (g_str_has_prefix (reply, "You greeted me with '01234567890123456789012"));
g_assert (g_str_has_suffix (reply, "6789'. Thanks!"));
g_variant_unref (result);
g_free (request);
g_main_loop_quit (loop);
}
static void
large_message_on_name_vanished (GDBusConnection *connection,
const gchar *name,
gpointer user_data)
{
}
static void
test_connection_large_message (void)
{
guint watcher_id;
session_bus_up ();
/* this is safe; testserver will exit once the bus goes away */
g_assert (g_spawn_command_line_async (SRCDIR "/gdbus-testserver.py", NULL));
watcher_id = g_bus_watch_name (G_BUS_TYPE_SESSION,
"com.example.TestService",
G_BUS_NAME_WATCHER_FLAGS_NONE,
large_message_on_name_appeared,
large_message_on_name_vanished,
NULL, /* user_data */
NULL); /* GDestroyNotify */
g_main_loop_run (loop);
g_bus_unwatch_name (watcher_id);
session_bus_down ();
}
/* ---------------------------------------------------------------------------------------------------- */
int int
main (int argc, main (int argc,
char *argv[]) char *argv[])
@ -939,5 +1018,6 @@ main (int argc,
g_test_add_func ("/gdbus/connection/signals", test_connection_signals); g_test_add_func ("/gdbus/connection/signals", test_connection_signals);
g_test_add_func ("/gdbus/connection/filter", test_connection_filter); g_test_add_func ("/gdbus/connection/filter", test_connection_filter);
g_test_add_func ("/gdbus/connection/flush", test_connection_flush); g_test_add_func ("/gdbus/connection/flush", test_connection_flush);
g_test_add_func ("/gdbus/connection/large_message", test_connection_large_message);
return g_test_run(); return g_test_run();
} }

View File

@ -1250,30 +1250,42 @@ test_credentials (void)
/* ---------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------- */
#if 0 /* def G_OS_UNIX disabled while it fails */ #ifdef G_OS_UNIX
/* Chosen to be big enough to overflow the socket buffer */
#define OVERFLOW_NUM_SIGNALS 5000
#define OVERFLOW_TIMEOUT_SEC 10
static gboolean static gboolean
signal_count_cb (GDBusConnection *connection, overflow_filter_func (GDBusConnection *connection,
GDBusMessage *message, GDBusMessage *message,
gboolean incoming, gboolean incoming,
gpointer user_data) gpointer user_data)
{ {
volatile int *p = user_data; volatile gint *counter = user_data;
(*p)++; *counter += 1;
return TRUE; return FALSE; /* don't drop the message */
}
static gboolean
overflow_on_500ms_later_func (gpointer user_data)
{
g_main_loop_quit (loop);
return FALSE; /* don't keep the idle */
} }
static void static void
test_overflow (void) test_overflow (void)
{ {
gint sv[2], i; gint sv[2];
gint n;
GSocket *socket; GSocket *socket;
GSocketConnection *socket_connection; GSocketConnection *socket_connection;
GDBusConnection *producer, *consumer; GDBusConnection *producer, *consumer;
GError *error; GError *error;
gchar *guid;
pid_t child;
GTimer *timer; GTimer *timer;
volatile int counter = 0; volatile gint n_messages_received;
volatile gint n_messages_sent;
g_assert_cmpint (socketpair (AF_UNIX, SOCK_STREAM, 0, sv), ==, 0); g_assert_cmpint (socketpair (AF_UNIX, SOCK_STREAM, 0, sv), ==, 0);
@ -1287,14 +1299,16 @@ test_overflow (void)
NULL, /* guid */ NULL, /* guid */
G_DBUS_CONNECTION_FLAGS_NONE, G_DBUS_CONNECTION_FLAGS_NONE,
NULL, /* GDBusAuthObserver */ NULL, /* GDBusAuthObserver */
NULL, NULL, /* GCancellable */
&error); &error);
g_dbus_connection_set_exit_on_close (producer, TRUE); g_dbus_connection_set_exit_on_close (producer, TRUE);
g_assert_no_error (error); g_assert_no_error (error);
g_object_unref (socket_connection); g_object_unref (socket_connection);
n_messages_sent = 0;
g_dbus_connection_add_filter (producer, overflow_filter_func, (gpointer) &n_messages_sent, NULL);
/* send enough data that we get an EAGAIN */ /* send enough data that we get an EAGAIN */
for (i = 0; i < 1000; i++) for (n = 0; n < OVERFLOW_NUM_SIGNALS; n++)
{ {
error = NULL; error = NULL;
g_dbus_connection_emit_signal (producer, g_dbus_connection_emit_signal (producer,
@ -1304,41 +1318,46 @@ test_overflow (void)
"Member", "Member",
g_variant_new ("(s)", "a string"), g_variant_new ("(s)", "a string"),
&error); &error);
/* run the main event loop - otherwise GDBusConnection::closed won't be fired */
g_main_context_iteration (NULL, FALSE);
g_assert_no_error (error); g_assert_no_error (error);
static gint count = 0;
g_print ("%d ", count++);
} }
/* sleep for 0.5 sec (to allow the GDBus IO thread to fill up the
* kernel buffers) and verify that n_messages_sent <
* OVERFLOW_NUM_SIGNALS
*
* This is to verify that not all the submitted messages have been
* sent to the underlying transport.
*/
g_timeout_add (500, overflow_on_500ms_later_func, NULL);
g_main_loop_run (loop);
g_assert_cmpint (n_messages_sent, <, OVERFLOW_NUM_SIGNALS);
/* now suck it all out as a client, and add it up */ /* now suck it all out as a client, and add it up */
socket = g_socket_new_from_fd (sv[1], &error); socket = g_socket_new_from_fd (sv[1], &error);
g_assert_no_error (error); g_assert_no_error (error);
socket_connection = g_socket_connection_factory_create_connection (socket); socket_connection = g_socket_connection_factory_create_connection (socket);
g_assert (socket_connection != NULL); g_assert (socket_connection != NULL);
g_object_unref (socket); g_object_unref (socket);
guid = g_dbus_generate_guid ();
consumer = g_dbus_connection_new_sync (G_IO_STREAM (socket_connection), consumer = g_dbus_connection_new_sync (G_IO_STREAM (socket_connection),
guid, NULL, /* guid */
G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING, G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING,
NULL, /* GDBusAuthObserver */ NULL, /* GDBusAuthObserver */
NULL, NULL, /* GCancellable */
&error); &error);
g_dbus_connection_add_filter (consumer, signal_count_cb, &counter, NULL);
g_dbus_connection_start_message_processing (consumer);
g_free (guid);
g_assert_no_error (error); g_assert_no_error (error);
g_object_unref (socket_connection); g_object_unref (socket_connection);
n_messages_received = 0;
g_dbus_connection_add_filter (consumer, overflow_filter_func, (gpointer) &n_messages_received, NULL);
g_dbus_connection_start_message_processing (consumer);
timer = g_timer_new (); timer = g_timer_new ();
g_timer_start (timer); g_timer_start (timer);
while (counter < 1000 && while (n_messages_received < OVERFLOW_NUM_SIGNALS && g_timer_elapsed (timer, NULL) < OVERFLOW_TIMEOUT_SEC)
g_timer_elapsed (timer, NULL) < 5.0)
g_main_context_iteration (NULL, FALSE); g_main_context_iteration (NULL, FALSE);
g_assert (counter == 1000); g_assert_cmpint (n_messages_sent, ==, OVERFLOW_NUM_SIGNALS);
g_assert_cmpint (n_messages_received, ==, OVERFLOW_NUM_SIGNALS);
g_timer_destroy (timer); g_timer_destroy (timer);
g_object_unref (consumer); g_object_unref (consumer);
@ -1348,6 +1367,7 @@ test_overflow (void)
static void static void
test_overflow (void) test_overflow (void)
{ {
/* TODO: test this with e.g. GWin32InputStream/GWin32OutputStream */
} }
#endif #endif