GOutputStream: Use async read/write of streams in splice_async()

There are some corner cases where using the sync version of read/write
in a thread could cause thread-safety issues. In these cases it's
possible to override the output stream's splice_async() function,
but for input streams one would need to do some acrobatics to
stay thread-safe. Alternatively, some implementations may not even
override their sync read/write functions.

This patch refactors the default splice_async() implementation to
call the sync read and write functions in a thread only when both
async versions are thread-based. When one or both are non-threaded,
it calls the virtual write_async() and read_async() functions of the
involved streams within the same thread.

https://bugzilla.gnome.org/show_bug.cgi?id=691581
This commit is contained in:
Mike Ruprecht 2013-02-18 08:12:50 -06:00 committed by Dan Winship
parent 87e5617a65
commit 4e9e7d0cba

View File

@ -1559,15 +1559,176 @@ g_output_stream_real_write_finish (GOutputStream *stream,
typedef struct {
GInputStream *source;
GOutputStreamSpliceFlags flags;
gssize n_read;
gssize n_written;
gsize bytes_copied;
GError *error;
guint8 *buffer;
} SpliceData;
static void
free_splice_data (SpliceData *op)
{
g_clear_pointer (&op->buffer, g_free);
g_object_unref (op->source);
g_clear_error (&op->error);
g_free (op);
}
static void
real_splice_async_complete_cb (GTask *task)
{
SpliceData *op = g_task_get_task_data (task);
if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
!g_input_stream_is_closed (op->source))
return;
if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
!g_output_stream_is_closed (g_task_get_source_object (task)))
return;
if (op->error != NULL)
{
g_task_return_error (task, op->error);
op->error = NULL;
}
else
{
g_task_return_int (task, op->bytes_copied);
}
g_object_unref (task);
}
static void
real_splice_async_close_input_cb (GObject *source,
GAsyncResult *res,
gpointer user_data)
{
GTask *task = user_data;
g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
real_splice_async_complete_cb (task);
}
static void
real_splice_async_close_output_cb (GObject *source,
GAsyncResult *res,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
SpliceData *op = g_task_get_task_data (task);
GError **error = (op->error == NULL) ? &op->error : NULL;
g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
real_splice_async_complete_cb (task);
}
static void
real_splice_async_complete (GTask *task)
{
SpliceData *op = g_task_get_task_data (task);
gboolean done = TRUE;
if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
{
done = FALSE;
g_input_stream_close_async (op->source, g_task_get_priority (task),
g_task_get_cancellable (task),
real_splice_async_close_input_cb, task);
}
if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
{
done = FALSE;
g_output_stream_internal_close_async (g_task_get_source_object (task),
g_task_get_priority (task),
g_task_get_cancellable (task),
real_splice_async_close_output_cb,
task);
}
if (done)
real_splice_async_complete_cb (task);
}
static void real_splice_async_read_cb (GObject *source,
GAsyncResult *res,
gpointer user_data);
static void
real_splice_async_write_cb (GObject *source,
GAsyncResult *res,
gpointer user_data)
{
GOutputStreamClass *class;
GTask *task = G_TASK (user_data);
SpliceData *op = g_task_get_task_data (task);
gssize ret;
class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
if (ret == -1)
{
real_splice_async_complete (task);
return;
}
op->n_written += ret;
op->bytes_copied += ret;
if (op->bytes_copied > G_MAXSSIZE)
op->bytes_copied = G_MAXSSIZE;
if (op->n_written < op->n_read)
{
class->write_async (g_task_get_source_object (task),
op->buffer + op->n_written,
op->n_read - op->n_written,
g_task_get_priority (task),
g_task_get_cancellable (task),
real_splice_async_write_cb, task);
return;
}
g_input_stream_read_async (op->source, op->buffer, 8192,
g_task_get_priority (task),
g_task_get_cancellable (task),
real_splice_async_read_cb, task);
}
static void
real_splice_async_read_cb (GObject *source,
GAsyncResult *res,
gpointer user_data)
{
GOutputStreamClass *class;
GTask *task = G_TASK (user_data);
SpliceData *op = g_task_get_task_data (task);
gssize ret;
class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
ret = g_input_stream_read_finish (op->source, res, &op->error);
if (ret == -1 || ret == 0)
{
real_splice_async_complete (task);
return;
}
op->n_read = ret;
op->n_written = 0;
class->write_async (g_task_get_source_object (task), op->buffer,
op->n_read, g_task_get_priority (task),
g_task_get_cancellable (task),
real_splice_async_write_cb, task);
}
static void
splice_async_thread (GTask *task,
gpointer source_object,
@ -1611,11 +1772,20 @@ g_output_stream_real_splice_async (GOutputStream *stream,
op->flags = flags;
op->source = g_object_ref (source);
/* TODO: In the case where both source and destintion have
non-threadbased async calls we can use a true async copy here */
g_task_run_in_thread (task, splice_async_thread);
g_object_unref (task);
if (g_input_stream_async_read_is_via_threads (source) &&
g_output_stream_async_write_is_via_threads (stream))
{
g_task_run_in_thread (task, splice_async_thread);
g_object_unref (task);
}
else
{
op->buffer = g_malloc (8192);
g_input_stream_read_async (op->source, op->buffer, 8192,
g_task_get_priority (task),
g_task_get_cancellable (task),
real_splice_async_read_cb, task);
}
}
static gssize