From f41178c6c79a00a1fb3a99bc3ae7c88a85c833fc Mon Sep 17 00:00:00 2001 From: Simon McVittie Date: Mon, 21 Nov 2011 17:18:01 +0000 Subject: [PATCH] GDBusWorker: move flush async op into continue_writing() This makes it easier to schedule a flush, by putting it on the same code path as writing and closing. Also change message_written to expect the lock to be held, since all that's left in that function either wants to hold the lock or doesn't care, and it's silly to release the lock immediately before calling message_written, which just takes it again. Bug: https://bugzilla.gnome.org/show_bug.cgi?id=662395 Signed-off-by: Simon McVittie Reviewed-by: Cosimo Alfarano --- gio/gdbusprivate.c | 101 ++++++++++++++++++++++++++++++--------------- 1 file changed, 68 insertions(+), 33 deletions(-) diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index 6257b35dc..48606c498 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -1128,7 +1128,7 @@ write_message_async (GDBusWorker *worker, write_message_continue_writing (data); } -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances (with write-lock held) */ static gboolean write_message_finish (GAsyncResult *res, GError **error) @@ -1222,17 +1222,27 @@ ostream_flush_cb (GObject *source_object, /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry + * output_pending is PENDING_FLUSH on entry + */ +static void +start_flush (FlushAsyncData *data) +{ + g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream), + G_PRIORITY_DEFAULT, + data->worker->cancellable, + ostream_flush_cb, + data); +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is held on entry * output_pending is PENDING_NONE on entry */ static void -message_written (GDBusWorker *worker, - MessageToWriteData *message_data) +message_written_unlocked (GDBusWorker *worker, + MessageToWriteData *message_data) { - GList *l; - GList *ll; - GList *flushers; - - /* first log the fact that we wrote a message */ if (G_UNLIKELY (_g_dbus_debug_message ())) { gchar *s; @@ -1253,10 +1263,24 @@ message_written (GDBusWorker *worker, _g_dbus_debug_print_unlock (); } - /* then first wake up pending flushes and, if needed, flush the stream */ - flushers = NULL; - g_mutex_lock (&worker->write_lock); worker->write_num_messages_written += 1; +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is held on entry + * output_pending is PENDING_NONE on entry + * + * Returns: non-%NULL, setting @output_pending, if we need to flush now + */ +static FlushAsyncData * +prepare_flush_unlocked (GDBusWorker *worker) +{ + GList *l; + GList *ll; + GList *flushers; + + flushers = NULL; for (l = worker->write_pending_flushes; l != NULL; l = ll) { FlushData *f = l->data; @@ -1273,26 +1297,18 @@ message_written (GDBusWorker *worker, g_assert (worker->output_pending == PENDING_NONE); worker->output_pending = PENDING_FLUSH; } - g_mutex_unlock (&worker->write_lock); if (flushers != NULL) { FlushAsyncData *data; + data = g_new0 (FlushAsyncData, 1); data->worker = _g_dbus_worker_ref (worker); data->flushers = flushers; - /* flush the stream before writing the next message */ - g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream), - G_PRIORITY_DEFAULT, - worker->cancellable, - ostream_flush_cb, - data); - } - else - { - /* kick off the next write! */ - continue_writing (worker); + return data; } + + return NULL; } /* called in private thread shared by all GDBusConnection instances @@ -1311,21 +1327,24 @@ write_message_cb (GObject *source_object, g_mutex_lock (&data->worker->write_lock); g_assert (data->worker->output_pending == PENDING_WRITE); data->worker->output_pending = PENDING_NONE; - g_mutex_unlock (&data->worker->write_lock); error = NULL; if (!write_message_finish (res, &error)) { + g_mutex_unlock (&data->worker->write_lock); + /* TODO: handle */ _g_dbus_worker_emit_disconnected (data->worker, TRUE, error); g_error_free (error); + + g_mutex_lock (&data->worker->write_lock); } - /* this function will also kick of the next write (it might need to - * flush so writing the next message might happen much later - * e.g. async) - */ - message_written (data->worker, data); + message_written_unlocked (data->worker, data); + + g_mutex_unlock (&data->worker->write_lock); + + continue_writing (data->worker); message_to_write_data_free (data); } @@ -1409,6 +1428,7 @@ static void continue_writing (GDBusWorker *worker) { MessageToWriteData *data; + FlushAsyncData *flush_async_data; write_next: /* we mustn't try to write two things at once */ @@ -1429,10 +1449,19 @@ continue_writing (GDBusWorker *worker) } else { - data = g_queue_pop_head (worker->write_queue); + flush_async_data = prepare_flush_unlocked (worker); - if (data != NULL) - worker->output_pending = PENDING_WRITE; + if (flush_async_data == NULL) + { + data = g_queue_pop_head (worker->write_queue); + + if (data != NULL) + worker->output_pending = PENDING_WRITE; + } + else + { + data = NULL; + } } g_mutex_unlock (&worker->write_lock); @@ -1445,7 +1474,13 @@ continue_writing (GDBusWorker *worker) * code and then writing the message out onto the GIOStream since this * function only runs on the worker thread. */ - if (data != NULL) + + if (flush_async_data != NULL) + { + start_flush (flush_async_data); + g_assert (data == NULL); + } + else if (data != NULL) { GDBusMessage *old_message; guchar *new_blob;