From 82ec4dcaed8107d436f76c45ec30645715b6dbef Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 6 Feb 2012 15:08:08 -0500 Subject: [PATCH] gio: implement GPollableInput/OutputStream in more stream types Implement GPollableInputStream in GMemoryInputStream and GConverterInputStream, and likewise implement GPollableOutputStream in the corresponding output streams. https://bugzilla.gnome.org/show_bug.cgi?id=673997 --- gio/gconverterinputstream.c | 130 ++++++++++++++++++--- gio/gconverteroutputstream.c | 119 +++++++++++++++---- gio/gmemoryinputstream.c | 41 ++++++- gio/gmemoryoutputstream.c | 40 ++++++- gio/tests/converter-stream.c | 214 +++++++++++++++++++++++++++++++++++ 5 files changed, 508 insertions(+), 36 deletions(-) diff --git a/gio/gconverterinputstream.c b/gio/gconverterinputstream.c index 2fbf94db6..1acf9a948 100644 --- a/gio/gconverterinputstream.c +++ b/gio/gconverterinputstream.c @@ -25,6 +25,7 @@ #include #include "gconverterinputstream.h" +#include "gpollableinputstream.h" #include "gsimpleasyncresult.h" #include "gcancellable.h" #include "gioenumtypes.h" @@ -41,6 +42,8 @@ * Converter input stream implements #GInputStream and allows * conversion of data of various types during reading. * + * As of GLib 2.34, #GConverterInputStream implements + * #GPollableInputStream. **/ #define INITIAL_BUFFER_SIZE 4096 @@ -55,6 +58,7 @@ typedef struct { struct _GConverterInputStreamPrivate { gboolean at_input_end; gboolean finished; + gboolean need_input; GConverter *converter; Buffer input_buffer; Buffer converted_buffer; @@ -80,9 +84,24 @@ static gssize g_converter_input_stream_read (GInputStream *stream, GCancellable *cancellable, GError **error); -G_DEFINE_TYPE (GConverterInputStream, - g_converter_input_stream, - G_TYPE_FILTER_INPUT_STREAM) +static gboolean g_converter_input_stream_can_poll (GPollableInputStream *stream); +static gboolean g_converter_input_stream_is_readable (GPollableInputStream *stream); +static gssize g_converter_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize size, + GError **error); + +static GSource *g_converter_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable); + +static void g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface); + +G_DEFINE_TYPE_WITH_CODE (GConverterInputStream, + g_converter_input_stream, + G_TYPE_FILTER_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + g_converter_input_stream_pollable_iface_init); + ) static void g_converter_input_stream_class_init (GConverterInputStreamClass *klass) @@ -112,6 +131,15 @@ g_converter_input_stream_class_init (GConverterInputStreamClass *klass) } +static void +g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface) +{ + iface->can_poll = g_converter_input_stream_can_poll; + iface->is_readable = g_converter_input_stream_is_readable; + iface->read_nonblocking = g_converter_input_stream_read_nonblocking; + iface->create_source = g_converter_input_stream_create_source; +} + static void g_converter_input_stream_finalize (GObject *object) { @@ -320,6 +348,7 @@ buffer_ensure_space (Buffer *buffer, static gssize fill_input_buffer (GConverterInputStream *stream, gsize at_least_size, + gboolean blocking, GCancellable *cancellable, GError **error) { @@ -332,25 +361,30 @@ fill_input_buffer (GConverterInputStream *stream, buffer_ensure_space (&priv->input_buffer, at_least_size); base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; - nread = g_input_stream_read (base_stream, - priv->input_buffer.data + priv->input_buffer.end, - buffer_tailspace (&priv->input_buffer), - cancellable, - error); + nread = g_pollable_stream_read (base_stream, + priv->input_buffer.data + priv->input_buffer.end, + buffer_tailspace (&priv->input_buffer), + blocking, + cancellable, + error); if (nread > 0) - priv->input_buffer.end += nread; + { + priv->input_buffer.end += nread; + priv->need_input = FALSE; + } return nread; } static gssize -g_converter_input_stream_read (GInputStream *stream, - void *buffer, - gsize count, - GCancellable *cancellable, - GError **error) +read_internal (GInputStream *stream, + void *buffer, + gsize count, + gboolean blocking, + GCancellable *cancellable, + GError **error) { GConverterInputStream *cstream; GConverterInputStreamPrivate *priv; @@ -389,7 +423,7 @@ g_converter_input_stream_read (GInputStream *stream, total_bytes_read == 0 && !priv->at_input_end) { - nread = fill_input_buffer (cstream, count, cancellable, error); + nread = fill_input_buffer (cstream, count, blocking, cancellable, error); if (nread < 0) return -1; if (nread == 0) @@ -497,6 +531,7 @@ g_converter_input_stream_read (GInputStream *stream, my_error2 = NULL; nread = fill_input_buffer (cstream, buffer_data_size (&priv->input_buffer) + 4096, + blocking, cancellable, &my_error2); if (nread < 0) @@ -504,6 +539,7 @@ g_converter_input_stream_read (GInputStream *stream, /* Can't read any more data, return that error */ g_error_free (my_error); g_propagate_error (error, my_error2); + priv->need_input = TRUE; return -1; } else if (nread == 0) @@ -536,6 +572,70 @@ g_converter_input_stream_read (GInputStream *stream, g_assert_not_reached (); } +static gssize +g_converter_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + return read_internal (stream, buffer, count, TRUE, cancellable, error); +} + +static gboolean +g_converter_input_stream_can_poll (GPollableInputStream *stream) +{ + GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; + + return (G_IS_POLLABLE_INPUT_STREAM (base_stream) && + g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (base_stream))); +} + +static gboolean +g_converter_input_stream_is_readable (GPollableInputStream *stream) +{ + GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; + GConverterInputStream *cstream = G_CONVERTER_INPUT_STREAM (stream); + + if (buffer_data_size (&cstream->priv->converted_buffer)) + return TRUE; + else if (buffer_data_size (&cstream->priv->input_buffer) && + !cstream->priv->need_input) + return TRUE; + else + return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (base_stream)); +} + +static gssize +g_converter_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize count, + GError **error) +{ + return read_internal (G_INPUT_STREAM (stream), buffer, count, + FALSE, NULL, error); +} + +static GSource * +g_converter_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; + GSource *base_source, *pollable_source; + + if (g_pollable_input_stream_is_readable (stream)) + base_source = g_timeout_source_new (0); + else + base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (base_stream), NULL); + + pollable_source = g_pollable_source_new_full (stream, base_source, + cancellable); + g_source_unref (base_source); + + return pollable_source; +} + + /** * g_converter_input_stream_get_converter: * @converter_stream: a #GConverterInputStream diff --git a/gio/gconverteroutputstream.c b/gio/gconverteroutputstream.c index 5b1cbec7e..91990108b 100644 --- a/gio/gconverteroutputstream.c +++ b/gio/gconverteroutputstream.c @@ -25,6 +25,7 @@ #include #include "gconverteroutputstream.h" +#include "gpollableoutputstream.h" #include "gsimpleasyncresult.h" #include "gcancellable.h" #include "gioenumtypes.h" @@ -41,6 +42,8 @@ * Converter output stream implements #GOutputStream and allows * conversion of data of various types during reading. * + * As of GLib 2.34, #GConverterOutputStream implements + * #GPollableOutputStream. **/ #define INITIAL_BUFFER_SIZE 4096 @@ -96,9 +99,24 @@ static gboolean g_converter_output_stream_flush (GOutputStream *stream, GCancellable *cancellable, GError **error); -G_DEFINE_TYPE (GConverterOutputStream, - g_converter_output_stream, - G_TYPE_FILTER_OUTPUT_STREAM) +static gboolean g_converter_output_stream_can_poll (GPollableOutputStream *stream); +static gboolean g_converter_output_stream_is_writable (GPollableOutputStream *stream); +static gssize g_converter_output_stream_write_nonblocking (GPollableOutputStream *stream, + const void *buffer, + gsize size, + GError **error); + +static GSource *g_converter_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable); + +static void g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface); + +G_DEFINE_TYPE_WITH_CODE (GConverterOutputStream, + g_converter_output_stream, + G_TYPE_FILTER_OUTPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, + g_converter_output_stream_pollable_iface_init); + ) static void g_converter_output_stream_class_init (GConverterOutputStreamClass *klass) @@ -129,6 +147,15 @@ g_converter_output_stream_class_init (GConverterOutputStreamClass *klass) } +static void +g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface) +{ + iface->can_poll = g_converter_output_stream_can_poll; + iface->is_writable = g_converter_output_stream_is_writable; + iface->write_nonblocking = g_converter_output_stream_write_nonblocking; + iface->create_source = g_converter_output_stream_create_source; +} + static void g_converter_output_stream_finalize (GObject *object) { @@ -339,7 +366,7 @@ buffer_append (Buffer *buffer, static gboolean flush_buffer (GConverterOutputStream *stream, - Buffer *buffer, + gboolean blocking, GCancellable *cancellable, GError **error) { @@ -356,12 +383,13 @@ flush_buffer (GConverterOutputStream *stream, available = buffer_data_size (&priv->converted_buffer); if (available > 0) { - res = g_output_stream_write_all (base_stream, - buffer_data (&priv->converted_buffer), - available, - &nwritten, - cancellable, - error); + res = g_pollable_stream_write_all (base_stream, + buffer_data (&priv->converted_buffer), + available, + blocking, + &nwritten, + cancellable, + error); buffer_consumed (&priv->converted_buffer, nwritten); return res; } @@ -370,11 +398,12 @@ flush_buffer (GConverterOutputStream *stream, static gssize -g_converter_output_stream_write (GOutputStream *stream, - const void *buffer, - gsize count, - GCancellable *cancellable, - GError **error) +write_internal (GOutputStream *stream, + const void *buffer, + gsize count, + gboolean blocking, + GCancellable *cancellable, + GError **error) { GConverterOutputStream *cstream; GConverterOutputStreamPrivate *priv; @@ -392,7 +421,7 @@ g_converter_output_stream_write (GOutputStream *stream, /* Write out all available pre-converted data and fail if not possible */ - if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error)) + if (!flush_buffer (cstream, blocking, cancellable, error)) return -1; if (priv->finished) @@ -499,11 +528,21 @@ g_converter_output_stream_write (GOutputStream *stream, even if writing this to the base stream fails. If it does we'll just stop early and report this error when we try again on the next write call. */ - flush_buffer (cstream, &priv->converted_buffer, cancellable, NULL); + flush_buffer (cstream, blocking, cancellable, NULL); return retval; } +static gssize +g_converter_output_stream_write (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + return write_internal (stream, buffer, count, TRUE, cancellable, error); +} + static gboolean g_converter_output_stream_flush (GOutputStream *stream, GCancellable *cancellable, @@ -525,7 +564,7 @@ g_converter_output_stream_flush (GOutputStream *stream, /* Write out all available pre-converted data and fail if not possible */ - if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error)) + if (!flush_buffer (cstream, TRUE, cancellable, error)) return FALSE; /* Ensure we have *some* initial target space */ @@ -590,12 +629,54 @@ g_converter_output_stream_flush (GOutputStream *stream, } /* Now write all converted data to base stream */ - if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error)) + if (!flush_buffer (cstream, TRUE, cancellable, error)) return FALSE; return TRUE; } +static gboolean +g_converter_output_stream_can_poll (GPollableOutputStream *stream) +{ + GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; + + return (G_IS_POLLABLE_OUTPUT_STREAM (base_stream) && + g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (base_stream))); +} + +static gboolean +g_converter_output_stream_is_writable (GPollableOutputStream *stream) +{ + GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; + + return g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (base_stream)); +} + +static gssize +g_converter_output_stream_write_nonblocking (GPollableOutputStream *stream, + const void *buffer, + gsize count, + GError **error) +{ + return write_internal (G_OUTPUT_STREAM (stream), buffer, count, FALSE, + NULL, error); +} + +static GSource * +g_converter_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable) +{ + GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; + GSource *base_source, *pollable_source; + + base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (base_stream), NULL); + pollable_source = g_pollable_source_new_full (stream, base_source, + cancellable); + g_source_unref (base_source); + + return pollable_source; +} + /** * g_converter_output_stream_get_converter: * @converter_stream: a #GConverterOutputStream diff --git a/gio/gmemoryinputstream.c b/gio/gmemoryinputstream.c index 1fed058fe..e657d5b0d 100644 --- a/gio/gmemoryinputstream.c +++ b/gio/gmemoryinputstream.c @@ -22,6 +22,7 @@ #include "config.h" #include "gmemoryinputstream.h" +#include "gpollableinputstream.h" #include "ginputstream.h" #include "gseekable.h" #include "string.h" @@ -39,6 +40,8 @@ * #GMemoryInputStream is a class for using arbitrary * memory chunks as input for GIO streaming input operations. * + * As of GLib 2.34, #GMemoryInputStream implements + * #GPollableInputStream. */ typedef struct _Chunk Chunk; @@ -108,11 +111,20 @@ static gboolean g_memory_input_stream_truncate (GSeekable *seek goffset offset, GCancellable *cancellable, GError **error); + +static void g_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface); +static gboolean g_memory_input_stream_is_readable (GPollableInputStream *stream); +static GSource *g_memory_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable); + static void g_memory_input_stream_finalize (GObject *object); G_DEFINE_TYPE_WITH_CODE (GMemoryInputStream, g_memory_input_stream, G_TYPE_INPUT_STREAM, G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE, - g_memory_input_stream_seekable_iface_init)) + g_memory_input_stream_seekable_iface_init); + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + g_memory_input_stream_pollable_iface_init); + ) static void @@ -174,6 +186,13 @@ g_memory_input_stream_seekable_iface_init (GSeekableIface *iface) iface->truncate_fn = g_memory_input_stream_truncate; } +static void +g_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface) +{ + iface->is_readable = g_memory_input_stream_is_readable; + iface->create_source = g_memory_input_stream_create_source; +} + static void g_memory_input_stream_init (GMemoryInputStream *stream) { @@ -526,3 +545,23 @@ g_memory_input_stream_truncate (GSeekable *seekable, _("Cannot truncate GMemoryInputStream")); return FALSE; } + +static gboolean +g_memory_input_stream_is_readable (GPollableInputStream *stream) +{ + return TRUE; +} + +static GSource * +g_memory_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + GSource *base_source, *pollable_source; + + base_source = g_timeout_source_new (0); + pollable_source = g_pollable_source_new_full (stream, base_source, + cancellable); + g_source_unref (base_source); + + return pollable_source; +} diff --git a/gio/gmemoryoutputstream.c b/gio/gmemoryoutputstream.c index 08b4fba6b..b1da60d1d 100644 --- a/gio/gmemoryoutputstream.c +++ b/gio/gmemoryoutputstream.c @@ -25,6 +25,7 @@ #include "config.h" #include "gmemoryoutputstream.h" #include "goutputstream.h" +#include "gpollableoutputstream.h" #include "gseekable.h" #include "gsimpleasyncresult.h" #include "gioerror.h" @@ -41,6 +42,8 @@ * #GMemoryOutputStream is a class for using arbitrary * memory chunks as output for GIO streaming output operations. * + * As of GLib 2.34, #GMemoryOutputStream implements + * #GPollableOutputStream. */ #define MIN_ARRAY_SIZE 16 @@ -119,9 +122,17 @@ static gboolean g_memory_output_stream_truncate (GSeekable *see GCancellable *cancellable, GError **error); +static gboolean g_memory_output_stream_is_writable (GPollableOutputStream *stream); +static GSource *g_memory_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable); + +static void g_memory_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface); + G_DEFINE_TYPE_WITH_CODE (GMemoryOutputStream, g_memory_output_stream, G_TYPE_OUTPUT_STREAM, G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE, - g_memory_output_stream_seekable_iface_init)) + g_memory_output_stream_seekable_iface_init); + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, + g_memory_output_stream_pollable_iface_init)) static void @@ -224,6 +235,13 @@ g_memory_output_stream_class_init (GMemoryOutputStreamClass *klass) G_PARAM_STATIC_STRINGS)); } +static void +g_memory_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface) +{ + iface->is_writable = g_memory_output_stream_is_writable; + iface->create_source = g_memory_output_stream_create_source; +} + static void g_memory_output_stream_set_property (GObject *object, guint prop_id, @@ -800,3 +818,23 @@ g_memory_output_stream_truncate (GSeekable *seekable, return TRUE; } + +static gboolean +g_memory_output_stream_is_writable (GPollableOutputStream *stream) +{ + return TRUE; +} + +static GSource * +g_memory_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable) +{ + GSource *base_source, *pollable_source; + + base_source = g_timeout_source_new (0); + pollable_source = g_pollable_source_new_full (stream, base_source, + cancellable); + g_source_unref (base_source); + + return pollable_source; +} diff --git a/gio/tests/converter-stream.c b/gio/tests/converter-stream.c index 8017015eb..ae1bdfb5f 100644 --- a/gio/tests/converter-stream.c +++ b/gio/tests/converter-stream.c @@ -724,6 +724,219 @@ test_charset (gconstpointer data) g_object_unref (conv); } + +static void +client_connected (GObject *source, + GAsyncResult *result, + gpointer user_data) +{ + GSocketClient *client = G_SOCKET_CLIENT (source); + GSocketConnection **conn = user_data; + GError *error = NULL; + + *conn = g_socket_client_connect_finish (client, result, &error); + g_assert_no_error (error); +} + +static void +server_connected (GObject *source, + GAsyncResult *result, + gpointer user_data) +{ + GSocketListener *listener = G_SOCKET_LISTENER (source); + GSocketConnection **conn = user_data; + GError *error = NULL; + + *conn = g_socket_listener_accept_finish (listener, result, NULL, &error); + g_assert_no_error (error); +} + +static void +make_socketpair (GIOStream **left, + GIOStream **right) +{ + GInetAddress *iaddr; + GSocketAddress *saddr, *effective_address; + GSocketListener *listener; + GSocketClient *client; + GError *error = NULL; + GSocketConnection *client_conn = NULL, *server_conn = NULL; + + iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4); + saddr = g_inet_socket_address_new (iaddr, 0); + g_object_unref (iaddr); + + listener = g_socket_listener_new (); + g_socket_listener_add_address (listener, saddr, + G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_TCP, + NULL, + &effective_address, + &error); + g_assert_no_error (error); + g_object_unref (saddr); + + client = g_socket_client_new (); + + g_socket_client_connect_async (client, + G_SOCKET_CONNECTABLE (effective_address), + NULL, client_connected, &client_conn); + g_socket_listener_accept_async (listener, NULL, + server_connected, &server_conn); + + while (!client_conn || !server_conn) + g_main_context_iteration (NULL, TRUE); + + g_object_unref (client); + g_object_unref (listener); + g_object_unref (effective_address); + + *left = G_IO_STREAM (client_conn); + *right = G_IO_STREAM (server_conn); +} + +static void +test_converter_pollable (void) +{ + GIOStream *left, *right; + guint8 *converted, *inptr; + guint8 *expanded, *outptr, *expanded_end; + gsize n_read, expanded_size; + gsize total_read; + gssize res; + gboolean is_readable; + GConverterResult cres; + GInputStream *cstream; + GPollableInputStream *pollable_in; + GOutputStream *socket_out, *mem_out, *cstream_out; + GPollableOutputStream *pollable_out; + GConverter *expander, *compressor; + GError *error; + int i; + + expander = g_expander_converter_new (); + expanded = g_malloc (100*1000); /* Large enough */ + cres = g_converter_convert (expander, + unexpanded_data, sizeof(unexpanded_data), + expanded, 100*1000, + G_CONVERTER_INPUT_AT_END, + &n_read, &expanded_size, NULL); + g_assert (cres == G_CONVERTER_FINISHED); + g_assert (n_read == 11); + g_assert (expanded_size == 41030); + expanded_end = expanded + expanded_size; + + make_socketpair (&left, &right); + + compressor = g_compressor_converter_new (); + + converted = g_malloc (100*1000); /* Large enough */ + + cstream = g_converter_input_stream_new (g_io_stream_get_input_stream (left), + compressor); + pollable_in = G_POLLABLE_INPUT_STREAM (cstream); + g_assert (g_pollable_input_stream_can_poll (pollable_in)); + + socket_out = g_io_stream_get_output_stream (right); + + total_read = 0; + outptr = expanded; + inptr = converted; + while (TRUE) + { + error = NULL; + + if (outptr < expanded_end) + { + res = g_output_stream_write (socket_out, + outptr, + MIN (1000, (expanded_end - outptr)), + NULL, &error); + g_assert_cmpint (res, >, 0); + outptr += res; + } + else if (socket_out) + { + g_object_unref (right); + socket_out = NULL; + } + + is_readable = g_pollable_input_stream_is_readable (pollable_in); + res = g_pollable_input_stream_read_nonblocking (pollable_in, + inptr, 1, + NULL, &error); + + /* is_readable can be a false positive, but not a false negative */ + if (!is_readable) + g_assert_cmpint (res, ==, -1); + + /* After closing the write end, we can't get WOULD_BLOCK any more */ + if (!socket_out) + g_assert_cmpint (res, !=, -1); + + if (res == -1) + { + g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK); + g_error_free (error); + + continue; + } + + if (res == 0) + break; + inptr += res; + total_read += res; + } + + g_assert (total_read == n_read - 1); /* Last 2 zeros are combined */ + g_assert (memcmp (converted, unexpanded_data, total_read) == 0); + + g_object_unref (cstream); + g_object_unref (left); + + g_converter_reset (compressor); + + /* This doesn't actually test the behavior on + * G_IO_ERROR_WOULD_BLOCK; to do that we'd need to implement a + * custom GOutputStream that we could control blocking on. + */ + + mem_out = g_memory_output_stream_new (NULL, 0, g_realloc, g_free); + cstream_out = g_converter_output_stream_new (mem_out, compressor); + g_object_unref (mem_out); + pollable_out = G_POLLABLE_OUTPUT_STREAM (cstream_out); + + for (i = 0; i < expanded_size; i++) + { + error = NULL; + res = g_pollable_output_stream_write_nonblocking (pollable_out, + expanded + i, 1, + NULL, &error); + g_assert (res != -1); + if (res == 0) + { + g_assert (i == expanded_size -1); + break; + } + g_assert (res == 1); + } + + g_output_stream_close (cstream_out, NULL, NULL); + + g_assert (g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (mem_out)) == n_read - 1); /* Last 2 zeros are combined */ + g_assert (memcmp (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (mem_out)), + unexpanded_data, + g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (mem_out))) == 0); + + g_object_unref (cstream_out); + + g_free (expanded); + g_free (converted); + g_object_unref (expander); + g_object_unref (compressor); +} + + int main (int argc, char *argv[]) @@ -759,6 +972,7 @@ main (int argc, for (i = 0; i < G_N_ELEMENTS (charset_tests); i++) g_test_add_data_func (charset_tests[i].path, &charset_tests[i], test_charset); + g_test_add_func ("/converter-stream/pollable", test_converter_pollable); return g_test_run(); }