gio/tests: add some more async stream tests

Add read_async() and skip_async() tests to buffered-input-stream.

Fix and re-enable filter-streams's existing close_async() tests, and
add read_async(), skip_async(), and write_async() tests as well. Also,
redo the tests to use dummy GFilterInputStream and GFilterOutputStream
subclasses rather than GBufferedInput/OutputStream, so that we're
testing the base filter stream implementations of everything (since
the buffered stream overrides are already getting tested in the
buffered-input-stream and buffered-output-stream tests anyway).

Add a skip_async() test to unix-streams. (This one would crash without
the bugfix in the previous commit.)
This commit is contained in:
Dan Winship
2013-01-16 10:09:10 -05:00
parent acfa6e2337
commit 578b657f95
3 changed files with 394 additions and 48 deletions

View File

@@ -120,8 +120,9 @@ reader_thread (gpointer user_data)
char main_buf[sizeof (DATA)];
gssize main_len, main_offset;
static void readable (GObject *source, GAsyncResult *res, gpointer user_data);
static void writable (GObject *source, GAsyncResult *res, gpointer user_data);
static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
static void
do_main_cancel (GOutputStream *out)
@@ -131,13 +132,14 @@ do_main_cancel (GOutputStream *out)
}
static void
readable (GObject *source, GAsyncResult *res, gpointer user_data)
main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
{
GInputStream *in = G_INPUT_STREAM (source);
GOutputStream *out = user_data;
GError *err = NULL;
gssize nskipped;
main_len = g_input_stream_read_finish (in, res, &err);
nskipped = g_input_stream_skip_finish (in, res, &err);
if (g_cancellable_is_cancelled (main_cancel))
{
@@ -145,16 +147,62 @@ readable (GObject *source, GAsyncResult *res, gpointer user_data)
return;
}
g_assert (err == NULL);
g_assert_no_error (err);
main_offset = 0;
g_output_stream_write_async (out, main_buf, main_len,
G_PRIORITY_DEFAULT, main_cancel,
writable, in);
main_offset += nskipped;
if (main_offset == main_len)
{
main_offset = 0;
g_output_stream_write_async (out, main_buf, main_len,
G_PRIORITY_DEFAULT, main_cancel,
main_thread_wrote, in);
}
else
{
g_input_stream_skip_async (in, main_len - main_offset,
G_PRIORITY_DEFAULT, main_cancel,
main_thread_skipped, out);
}
}
static void
writable (GObject *source, GAsyncResult *res, gpointer user_data)
main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
{
GInputStream *in = G_INPUT_STREAM (source);
GOutputStream *out = user_data;
GError *err = NULL;
gssize nread;
nread = g_input_stream_read_finish (in, res, &err);
if (g_cancellable_is_cancelled (main_cancel))
{
do_main_cancel (out);
return;
}
g_assert_no_error (err);
main_offset += nread;
if (main_offset == sizeof (DATA))
{
main_len = main_offset;
main_offset = 0;
/* Now skip the same amount */
g_input_stream_skip_async (in, main_len,
G_PRIORITY_DEFAULT, main_cancel,
main_thread_skipped, out);
}
else
{
g_input_stream_read_async (in, main_buf, sizeof (main_buf),
G_PRIORITY_DEFAULT, main_cancel,
main_thread_read, out);
}
}
static void
main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
{
GOutputStream *out = G_OUTPUT_STREAM (source);
GInputStream *in = user_data;
@@ -169,22 +217,23 @@ writable (GObject *source, GAsyncResult *res, gpointer user_data)
return;
}
g_assert (err == NULL);
g_assert_no_error (err);
g_assert_cmpint (nwrote, <=, main_len - main_offset);
main_offset += nwrote;
if (main_offset == main_len)
{
main_offset = 0;
g_input_stream_read_async (in, main_buf, sizeof (main_buf),
G_PRIORITY_DEFAULT, main_cancel,
readable, out);
main_thread_read, out);
}
else
{
g_output_stream_write_async (out, main_buf + main_offset,
main_len - main_offset,
G_PRIORITY_DEFAULT, main_cancel,
writable, in);
main_thread_wrote, in);
}
}
@@ -204,12 +253,13 @@ test_pipe_io (gconstpointer nonblocking)
/* Split off two (additional) threads, a reader and a writer. From
* the writer thread, write data synchronously in small chunks,
* which gets read asynchronously by the main thread and then
* written asynchronously to the reader thread, which reads it
* synchronously. Eventually a timeout in the main thread will cause
* it to cancel the writer thread, which will in turn cancel the
* read op in the main thread, which will then close the pipe to
* the reader thread, causing the read op to fail.
* which gets alternately read and skipped asynchronously by the
* main thread and then (if not skipped) written asynchronously to
* the reader thread, which reads it synchronously. Eventually a
* timeout in the main thread will cause it to cancel the writer
* thread, which will in turn cancel the read op in the main thread,
* which will then close the pipe to the reader thread, causing the
* read op to fail.
*/
g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
@@ -240,7 +290,7 @@ test_pipe_io (gconstpointer nonblocking)
g_input_stream_read_async (in, main_buf, sizeof (main_buf),
G_PRIORITY_DEFAULT, main_cancel,
readable, out);
main_thread_read, out);
g_timeout_add (500, timeout, writer_cancel);