diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt index a9efdd37a..57329a525 100644 --- a/docs/reference/gio/gio-sections.txt +++ b/docs/reference/gio/gio-sections.txt @@ -895,6 +895,8 @@ GUnixOutputStreamPrivate GIOStream g_io_stream_get_input_stream g_io_stream_get_output_stream +g_io_stream_splice_async +g_io_stream_splice_finish g_io_stream_close g_io_stream_close_async g_io_stream_close_finish diff --git a/gio/gioenums.h b/gio/gioenums.h index 7aaba24e6..c481f5ef9 100644 --- a/gio/gioenums.h +++ b/gio/gioenums.h @@ -571,6 +571,25 @@ typedef enum { } GOutputStreamSpliceFlags; +/** + * GIOStreamSpliceFlags: + * @G_IO_STREAM_SPLICE_NONE: Do not close either stream. + * @G_IO_STREAM_SPLICE_CLOSE_STREAM1: Close the first stream after + * the splice. + * @G_IO_STREAM_SPLICE_CLOSE_STREAM2: Close the second stream after + * the splice. + * @G_IO_STREAM_SPLICE_WAIT_FOR_BOTH: Wait for both splice operations to finish + * before calling the callback. + * + * GIOStreamSpliceFlags determine how streams should be spliced. + **/ +typedef enum { + G_IO_STREAM_SPLICE_NONE = 0, + G_IO_STREAM_SPLICE_CLOSE_STREAM1 = (1 << 0), + G_IO_STREAM_SPLICE_CLOSE_STREAM2 = (1 << 1), + G_IO_STREAM_SPLICE_WAIT_FOR_BOTH = (1 << 2), +} GIOStreamSpliceFlags; + /** * GEmblemOrigin: * @G_EMBLEM_ORIGIN_UNKNOWN: Emblem of unknown origin diff --git a/gio/giostream.c b/gio/giostream.c index 7c5bdf02e..137c97628 100644 --- a/gio/giostream.c +++ b/gio/giostream.c @@ -606,3 +606,251 @@ g_io_stream_real_close_finish (GIOStream *stream, g_io_stream_real_close_async); return TRUE; } + +typedef struct +{ + GIOStream *stream1; + GIOStream *stream2; + GIOStreamSpliceFlags flags; + gint io_priority; + GCancellable *cancellable; + gulong cancelled_id; + GCancellable *op1_cancellable; + GCancellable *op2_cancellable; + guint completed; + GError *error; +} SpliceContext; + +static void +splice_context_free (SpliceContext *ctx) +{ + g_object_unref (ctx->stream1); + g_object_unref (ctx->stream2); + if (ctx->cancellable != NULL) + g_object_unref (ctx->cancellable); + g_object_unref (ctx->op1_cancellable); + g_object_unref (ctx->op2_cancellable); + g_clear_error (&ctx->error); + g_slice_free (SpliceContext, ctx); +} + +static void +splice_complete (GSimpleAsyncResult *simple, + SpliceContext *ctx) +{ + if (ctx->cancelled_id != 0) + g_cancellable_disconnect (ctx->cancellable, ctx->cancelled_id); + ctx->cancelled_id = 0; + + if (ctx->error != NULL) + g_simple_async_result_set_from_error (simple, ctx->error); + g_simple_async_result_complete (simple); +} + +static void +splice_close_cb (GObject *iostream, + GAsyncResult *res, + gpointer user_data) +{ + GSimpleAsyncResult *simple = user_data; + SpliceContext *ctx; + GError *error = NULL; + + g_io_stream_close_finish (G_IO_STREAM (iostream), res, &error); + + ctx = g_simple_async_result_get_op_res_gpointer (simple); + ctx->completed++; + + /* Keep the first error that occured */ + if (error != NULL && ctx->error == NULL) + ctx->error = error; + else + g_clear_error (&error); + + /* If all operations are done, complete now */ + if (ctx->completed == 4) + splice_complete (simple, ctx); + + g_object_unref (simple); +} + +static void +splice_cb (GObject *ostream, + GAsyncResult *res, + gpointer user_data) +{ + GSimpleAsyncResult *simple = user_data; + SpliceContext *ctx; + GError *error = NULL; + + g_output_stream_splice_finish (G_OUTPUT_STREAM (ostream), res, &error); + + ctx = g_simple_async_result_get_op_res_gpointer (simple); + ctx->completed++; + + /* ignore cancellation error if it was not requested by the user */ + if (error != NULL && + g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) && + (ctx->cancellable == NULL || + !g_cancellable_is_cancelled (ctx->cancellable))) + g_clear_error (&error); + + /* Keep the first error that occured */ + if (error != NULL && ctx->error == NULL) + ctx->error = error; + else + g_clear_error (&error); + + if (ctx->completed == 1 && + (ctx->flags & G_IO_STREAM_SPLICE_WAIT_FOR_BOTH) == 0) + { + /* We don't want to wait for the 2nd operation to finish, cancel it */ + g_cancellable_cancel (ctx->op1_cancellable); + g_cancellable_cancel (ctx->op2_cancellable); + } + else if (ctx->completed == 2) + { + if (ctx->cancellable == NULL || + !g_cancellable_is_cancelled (ctx->cancellable)) + { + g_cancellable_reset (ctx->op1_cancellable); + g_cancellable_reset (ctx->op2_cancellable); + } + + /* Close the IO streams if needed */ + if ((ctx->flags & G_IO_STREAM_SPLICE_CLOSE_STREAM1) != 0) + g_io_stream_close_async (ctx->stream1, ctx->io_priority, + ctx->op1_cancellable, splice_close_cb, g_object_ref (simple)); + else + ctx->completed++; + + if ((ctx->flags & G_IO_STREAM_SPLICE_CLOSE_STREAM2) != 0) + g_io_stream_close_async (ctx->stream2, ctx->io_priority, + ctx->op2_cancellable, splice_close_cb, g_object_ref (simple)); + else + ctx->completed++; + + /* If all operations are done, complete now */ + if (ctx->completed == 4) + splice_complete (simple, ctx); + } + + g_object_unref (simple); +} + +static void +splice_cancelled_cb (GCancellable *cancellable, + GSimpleAsyncResult *simple) +{ + SpliceContext *ctx; + + ctx = g_simple_async_result_get_op_res_gpointer (simple); + g_cancellable_cancel (ctx->op1_cancellable); + g_cancellable_cancel (ctx->op2_cancellable); +} + +/** + * g_io_stream_splice_async: + * @stream1: a #GIOStream. + * @stream2: a #GIOStream. + * @flags: a set of #GIOStreamSpliceFlags. + * @io_priority: the io priority of the request. + * @cancellable: optional #GCancellable object, %NULL to ignore. + * @callback: a #GAsyncReadyCallback. + * @user_data: user data passed to @callback. + * + * Asyncronously splice the output stream of @stream1 to the input stream of + * @stream2, and splice the output stream of @stream2 to the input stream of + * @stream1. + * + * When the operation is finished @callback will be called. + * You can then call g_io_stream_splice_finish() to get the + * result of the operation. + **/ +void +g_io_stream_splice_async (GIOStream *stream1, + GIOStream *stream2, + GIOStreamSpliceFlags flags, + gint io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GSimpleAsyncResult *simple; + SpliceContext *ctx; + GInputStream *istream; + GOutputStream *ostream; + + if (cancellable != NULL && g_cancellable_is_cancelled (cancellable)) + { + g_simple_async_report_error_in_idle (NULL, callback, + user_data, G_IO_ERROR, G_IO_ERROR_CANCELLED, + "Operation has been cancelled"); + return; + } + + ctx = g_slice_new0 (SpliceContext); + ctx->stream1 = g_object_ref (stream1); + ctx->stream2 = g_object_ref (stream2); + ctx->flags = flags; + ctx->io_priority = io_priority; + ctx->op1_cancellable = g_cancellable_new (); + ctx->op2_cancellable = g_cancellable_new (); + ctx->completed = 0; + + simple = g_simple_async_result_new (NULL, callback, user_data, + g_io_stream_splice_finish); + g_simple_async_result_set_op_res_gpointer (simple, ctx, + (GDestroyNotify) splice_context_free); + + if (cancellable != NULL) + { + ctx->cancellable = g_object_ref (cancellable); + ctx->cancelled_id = g_cancellable_connect (cancellable, + G_CALLBACK (splice_cancelled_cb), g_object_ref (simple), + g_object_unref); + } + + istream = g_io_stream_get_input_stream (stream1); + ostream = g_io_stream_get_output_stream (stream2); + g_output_stream_splice_async (ostream, istream, G_OUTPUT_STREAM_SPLICE_NONE, + io_priority, ctx->op1_cancellable, splice_cb, + g_object_ref (simple)); + + istream = g_io_stream_get_input_stream (stream2); + ostream = g_io_stream_get_output_stream (stream1); + g_output_stream_splice_async (ostream, istream, G_OUTPUT_STREAM_SPLICE_NONE, + io_priority, ctx->op2_cancellable, splice_cb, + g_object_ref (simple)); + + g_object_unref (simple); +} + +/** + * g_io_stream_splice_finish: + * @result: a #GAsyncResult. + * @error: a #GError location to store the error occuring, or %NULL to + * ignore. + * + * Finishes an asynchronous io stream splice operation. + * + * Returns: %TRUE on success, %FALSE otherwise. + **/ +gboolean +g_io_stream_splice_finish (GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + + g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE); + + simple = G_SIMPLE_ASYNC_RESULT (result); + + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; + + g_return_val_if_fail (g_simple_async_result_is_valid (result, NULL, + g_io_stream_splice_finish), FALSE); + + return TRUE; +} diff --git a/gio/giostream.h b/gio/giostream.h index 048fd4f10..90283ddbd 100644 --- a/gio/giostream.h +++ b/gio/giostream.h @@ -88,6 +88,17 @@ GType g_io_stream_get_type (void) G_GNUC_CONST; GInputStream * g_io_stream_get_input_stream (GIOStream *stream); GOutputStream *g_io_stream_get_output_stream (GIOStream *stream); +void g_io_stream_splice_async (GIOStream *stream1, + GIOStream *stream2, + GIOStreamSpliceFlags flags, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + +gboolean g_io_stream_splice_finish (GAsyncResult *result, + GError **error); + gboolean g_io_stream_close (GIOStream *stream, GCancellable *cancellable, GError **error); diff --git a/gio/tests/Makefile.am b/gio/tests/Makefile.am index def7392a8..5f49e6cec 100644 --- a/gio/tests/Makefile.am +++ b/gio/tests/Makefile.am @@ -18,6 +18,7 @@ progs_ldadd = \ $(top_builddir)/gio/libgio-2.0.la TEST_PROGS += \ + io-stream \ actions \ memory-input-stream \ memory-output-stream \ @@ -102,6 +103,9 @@ if OS_WIN32 TEST_PROGS += win32-streams endif +io_stream_SOURCES = io-stream.c +io_stream_LDADD = $(progs_ldadd) + actions_LDADD = $(progs_ldadd) memory_input_stream_SOURCES = memory-input-stream.c diff --git a/gio/tests/io-stream.c b/gio/tests/io-stream.c new file mode 100644 index 000000000..dbac08bfd --- /dev/null +++ b/gio/tests/io-stream.c @@ -0,0 +1,185 @@ +/* GLib testing framework examples and tests + * Copyright (C) 2010 Collabora Ltd. + * Authors: Xavier Claessens + * + * This work is provided "as is"; redistribution and modification + * in whole or in part, in any medium, physical or electronic is + * permitted without restriction. + * + * This work is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * In no event shall the authors or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + */ + +#include +#include +#include +#include + +typedef struct +{ + GIOStream parent; + GInputStream *input_stream; + GOutputStream *output_stream; +} GTestIOStream; + +typedef struct +{ + GIOStreamClass parent_class; +} GTestIOStreamClass; + +G_DEFINE_TYPE (GTestIOStream, g_test_io_stream, G_TYPE_IO_STREAM); + + +static GInputStream * +get_input_stream (GIOStream *io_stream) +{ + GTestIOStream *self = (GTestIOStream *) io_stream; + + return self->input_stream; +} + +static GOutputStream * +get_output_stream (GIOStream *io_stream) +{ + GTestIOStream *self = (GTestIOStream *) io_stream; + + return self->output_stream; +} + +static void +finalize (GObject *object) +{ + GTestIOStream *self = (GTestIOStream *) object; + + if (self->input_stream != NULL) + g_object_unref (self->input_stream); + + if (self->output_stream != NULL) + g_object_unref (self->output_stream); + + G_OBJECT_CLASS (g_test_io_stream_parent_class)->finalize (object); +} + +static void +g_test_io_stream_class_init (GTestIOStreamClass *klass) +{ + GObjectClass *object_class = G_OBJECT_CLASS (klass); + GIOStreamClass *io_class = G_IO_STREAM_CLASS (klass); + + object_class->finalize = finalize; + + io_class->get_input_stream = get_input_stream; + io_class->get_output_stream = get_output_stream; +} + +static void +g_test_io_stream_init (GTestIOStream *self) +{ +} + +static GIOStream * +g_test_io_stream_new (GInputStream *input, GOutputStream *output) +{ + GTestIOStream *self; + + self = g_object_new (g_test_io_stream_get_type (), NULL); + self->input_stream = g_object_ref (input); + self->output_stream = g_object_ref (output); + + return G_IO_STREAM (self); +} + +typedef struct +{ + GMainLoop *main_loop; + const gchar *data1; + const gchar *data2; + GIOStream *iostream1; + GIOStream *iostream2; +} TestCopyChunksData; + +static void +test_copy_chunks_splice_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + TestCopyChunksData *data = user_data; + GMemoryOutputStream *ostream; + gchar *received_data; + GError *error = NULL; + + g_io_stream_splice_finish (res, &error); + g_assert_no_error (error); + + ostream = G_MEMORY_OUTPUT_STREAM (((GTestIOStream *) data->iostream1)->output_stream); + received_data = g_memory_output_stream_get_data (ostream); + g_assert_cmpstr (received_data, ==, data->data2); + + ostream = G_MEMORY_OUTPUT_STREAM (((GTestIOStream *) data->iostream2)->output_stream); + received_data = g_memory_output_stream_get_data (ostream); + g_assert_cmpstr (received_data, ==, data->data1); + + g_assert (g_io_stream_is_closed (data->iostream1)); + g_assert (g_io_stream_is_closed (data->iostream2)); + + g_main_loop_quit (data->main_loop); +} + +static void +test_copy_chunks (void) +{ + TestCopyChunksData data; + GInputStream *istream; + GOutputStream *ostream; + + data.main_loop = g_main_loop_new (NULL, FALSE); + data.data1 = "abcdefghijklmnopqrstuvwxyz"; + data.data2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + istream = g_memory_input_stream_new_from_data (data.data1, -1, NULL); + ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free); + data.iostream1 = g_test_io_stream_new (istream, ostream); + g_object_unref (istream); + g_object_unref (ostream); + + istream = g_memory_input_stream_new_from_data (data.data2, -1, NULL); + ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free); + data.iostream2 = g_test_io_stream_new (istream, ostream); + g_object_unref (istream); + g_object_unref (ostream); + + g_io_stream_splice_async (data.iostream1, data.iostream2, + G_IO_STREAM_SPLICE_CLOSE_STREAM1 | G_IO_STREAM_SPLICE_CLOSE_STREAM2 | + G_IO_STREAM_SPLICE_WAIT_FOR_BOTH, G_PRIORITY_DEFAULT, + NULL, test_copy_chunks_splice_cb, &data); + + /* We do not hold a ref in data struct, this is to make sure the operation + * keeps the iostream objects alive until it finishes */ + g_object_unref (data.iostream1); + g_object_unref (data.iostream2); + + g_main_loop_run (data.main_loop); + g_main_loop_unref (data.main_loop); +} + +int +main (int argc, + char *argv[]) +{ + g_type_init (); + g_test_init (&argc, &argv, NULL); + + g_test_add_func ("/io-stream/copy-chunks", test_copy_chunks); + + return g_test_run(); +}