GDBusWorker: move flush async op into continue_writing()

This makes it easier to schedule a flush, by putting it on the same code
path as writing and closing.

Also change message_written to expect the lock to be held, since all
that's left in that function either wants to hold the lock or doesn't
care, and it's silly to release the lock immediately before calling
message_written, which just takes it again.

Bug: https://bugzilla.gnome.org/show_bug.cgi?id=662395
Signed-off-by: Simon McVittie <simon.mcvittie@collabora.co.uk>
Reviewed-by: Cosimo Alfarano <cosimo.alfarano@collabora.co.uk>
This commit is contained in:
Simon McVittie 2011-11-21 17:18:01 +00:00
parent 301332168b
commit f41178c6c7

View File

@ -1128,7 +1128,7 @@ write_message_async (GDBusWorker *worker,
write_message_continue_writing (data);
}
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
static gboolean
write_message_finish (GAsyncResult *res,
GError **error)
@ -1222,17 +1222,27 @@ ostream_flush_cb (GObject *source_object,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is PENDING_FLUSH on entry
*/
static void
start_flush (FlushAsyncData *data)
{
g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
G_PRIORITY_DEFAULT,
data->worker->cancellable,
ostream_flush_cb,
data);
}
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is held on entry
* output_pending is PENDING_NONE on entry
*/
static void
message_written (GDBusWorker *worker,
MessageToWriteData *message_data)
message_written_unlocked (GDBusWorker *worker,
MessageToWriteData *message_data)
{
GList *l;
GList *ll;
GList *flushers;
/* first log the fact that we wrote a message */
if (G_UNLIKELY (_g_dbus_debug_message ()))
{
gchar *s;
@ -1253,10 +1263,24 @@ message_written (GDBusWorker *worker,
_g_dbus_debug_print_unlock ();
}
/* then first wake up pending flushes and, if needed, flush the stream */
flushers = NULL;
g_mutex_lock (&worker->write_lock);
worker->write_num_messages_written += 1;
}
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is held on entry
* output_pending is PENDING_NONE on entry
*
* Returns: non-%NULL, setting @output_pending, if we need to flush now
*/
static FlushAsyncData *
prepare_flush_unlocked (GDBusWorker *worker)
{
GList *l;
GList *ll;
GList *flushers;
flushers = NULL;
for (l = worker->write_pending_flushes; l != NULL; l = ll)
{
FlushData *f = l->data;
@ -1273,26 +1297,18 @@ message_written (GDBusWorker *worker,
g_assert (worker->output_pending == PENDING_NONE);
worker->output_pending = PENDING_FLUSH;
}
g_mutex_unlock (&worker->write_lock);
if (flushers != NULL)
{
FlushAsyncData *data;
data = g_new0 (FlushAsyncData, 1);
data->worker = _g_dbus_worker_ref (worker);
data->flushers = flushers;
/* flush the stream before writing the next message */
g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
G_PRIORITY_DEFAULT,
worker->cancellable,
ostream_flush_cb,
data);
}
else
{
/* kick off the next write! */
continue_writing (worker);
return data;
}
return NULL;
}
/* called in private thread shared by all GDBusConnection instances
@ -1311,21 +1327,24 @@ write_message_cb (GObject *source_object,
g_mutex_lock (&data->worker->write_lock);
g_assert (data->worker->output_pending == PENDING_WRITE);
data->worker->output_pending = PENDING_NONE;
g_mutex_unlock (&data->worker->write_lock);
error = NULL;
if (!write_message_finish (res, &error))
{
g_mutex_unlock (&data->worker->write_lock);
/* TODO: handle */
_g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
g_error_free (error);
g_mutex_lock (&data->worker->write_lock);
}
/* this function will also kick of the next write (it might need to
* flush so writing the next message might happen much later
* e.g. async)
*/
message_written (data->worker, data);
message_written_unlocked (data->worker, data);
g_mutex_unlock (&data->worker->write_lock);
continue_writing (data->worker);
message_to_write_data_free (data);
}
@ -1409,6 +1428,7 @@ static void
continue_writing (GDBusWorker *worker)
{
MessageToWriteData *data;
FlushAsyncData *flush_async_data;
write_next:
/* we mustn't try to write two things at once */
@ -1429,10 +1449,19 @@ continue_writing (GDBusWorker *worker)
}
else
{
data = g_queue_pop_head (worker->write_queue);
flush_async_data = prepare_flush_unlocked (worker);
if (data != NULL)
worker->output_pending = PENDING_WRITE;
if (flush_async_data == NULL)
{
data = g_queue_pop_head (worker->write_queue);
if (data != NULL)
worker->output_pending = PENDING_WRITE;
}
else
{
data = NULL;
}
}
g_mutex_unlock (&worker->write_lock);
@ -1445,7 +1474,13 @@ continue_writing (GDBusWorker *worker)
* code and then writing the message out onto the GIOStream since this
* function only runs on the worker thread.
*/
if (data != NULL)
if (flush_async_data != NULL)
{
start_flush (flush_async_data);
g_assert (data == NULL);
}
else if (data != NULL)
{
GDBusMessage *old_message;
guchar *new_blob;