From 4e9e7d0cba53a711bd650e9a5e28452b93f0d849 Mon Sep 17 00:00:00 2001 From: Mike Ruprecht Date: Mon, 18 Feb 2013 08:12:50 -0600 Subject: [PATCH] GOutputStream: Use async read/write of streams in splice_async() There are some corner cases where using the sync version of read/write in a thread could cause thread-safety issues. In these cases it's possible to override the output stream's splice_async() function, but for input streams one would need to do some acrobatics to stay thread-safe. Alternatively, some implementations may not even override their sync read/write functions. This patch refactors the default splice_async() implementation to call the sync read and write functions in a thread only when both async versions are thread-based. When one or both are non-threaded, it calls the virtual write_async() and read_async() functions of the involved streams within the same thread. https://bugzilla.gnome.org/show_bug.cgi?id=691581 --- gio/goutputstream.c | 180 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 175 insertions(+), 5 deletions(-) diff --git a/gio/goutputstream.c b/gio/goutputstream.c index 1b85e9e05..7e03df4b1 100644 --- a/gio/goutputstream.c +++ b/gio/goutputstream.c @@ -1559,15 +1559,176 @@ g_output_stream_real_write_finish (GOutputStream *stream, typedef struct { GInputStream *source; GOutputStreamSpliceFlags flags; + gssize n_read; + gssize n_written; + gsize bytes_copied; + GError *error; + guint8 *buffer; } SpliceData; static void free_splice_data (SpliceData *op) { + g_clear_pointer (&op->buffer, g_free); g_object_unref (op->source); + g_clear_error (&op->error); g_free (op); } +static void +real_splice_async_complete_cb (GTask *task) +{ + SpliceData *op = g_task_get_task_data (task); + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE && + !g_input_stream_is_closed (op->source)) + return; + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET && + !g_output_stream_is_closed (g_task_get_source_object (task))) + return; + + if (op->error != NULL) + { + g_task_return_error (task, op->error); + op->error = NULL; + } + else + { + g_task_return_int (task, op->bytes_copied); + } + + g_object_unref (task); +} + +static void +real_splice_async_close_input_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = user_data; + + g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL); + + real_splice_async_complete_cb (task); +} + +static void +real_splice_async_close_output_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + GError **error = (op->error == NULL) ? &op->error : NULL; + + g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error); + + real_splice_async_complete_cb (task); +} + +static void +real_splice_async_complete (GTask *task) +{ + SpliceData *op = g_task_get_task_data (task); + gboolean done = TRUE; + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE) + { + done = FALSE; + g_input_stream_close_async (op->source, g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_close_input_cb, task); + } + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET) + { + done = FALSE; + g_output_stream_internal_close_async (g_task_get_source_object (task), + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_close_output_cb, + task); + } + + if (done) + real_splice_async_complete_cb (task); +} + +static void real_splice_async_read_cb (GObject *source, + GAsyncResult *res, + gpointer user_data); + +static void +real_splice_async_write_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + gssize ret; + + class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task)); + + ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error); + + if (ret == -1) + { + real_splice_async_complete (task); + return; + } + + op->n_written += ret; + op->bytes_copied += ret; + if (op->bytes_copied > G_MAXSSIZE) + op->bytes_copied = G_MAXSSIZE; + + if (op->n_written < op->n_read) + { + class->write_async (g_task_get_source_object (task), + op->buffer + op->n_written, + op->n_read - op->n_written, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_write_cb, task); + return; + } + + g_input_stream_read_async (op->source, op->buffer, 8192, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_read_cb, task); +} + +static void +real_splice_async_read_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + gssize ret; + + class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task)); + + ret = g_input_stream_read_finish (op->source, res, &op->error); + if (ret == -1 || ret == 0) + { + real_splice_async_complete (task); + return; + } + + op->n_read = ret; + op->n_written = 0; + + class->write_async (g_task_get_source_object (task), op->buffer, + op->n_read, g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_write_cb, task); +} + static void splice_async_thread (GTask *task, gpointer source_object, @@ -1611,11 +1772,20 @@ g_output_stream_real_splice_async (GOutputStream *stream, op->flags = flags; op->source = g_object_ref (source); - /* TODO: In the case where both source and destintion have - non-threadbased async calls we can use a true async copy here */ - - g_task_run_in_thread (task, splice_async_thread); - g_object_unref (task); + if (g_input_stream_async_read_is_via_threads (source) && + g_output_stream_async_write_is_via_threads (stream)) + { + g_task_run_in_thread (task, splice_async_thread); + g_object_unref (task); + } + else + { + op->buffer = g_malloc (8192); + g_input_stream_read_async (op->source, op->buffer, 8192, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_read_cb, task); + } } static gssize