Bug 568575 – _async functions for GDataInputStream

2009-01-28  Ryan Lortie  <desrt@desrt.ca>

        Bug 568575 – _async functions for GDataInputStream

        * gdatainputstream.h:
        * gdatainputstream.c: add _async versions of read_line and read_until.
        * gio.symbols:
        * ../docs/reference/gio/gio-sections.txt: add new functions
        * tests/sleepy-stream.c: new test case for async read line
        * tests/Makefile.am: add new test


svn path=/trunk/; revision=7835
This commit is contained in:
Ryan Lortie 2009-01-28 16:39:39 +00:00 committed by Ryan Lortie
parent 17b9b43a7a
commit 129e86cf2c
7 changed files with 667 additions and 39 deletions

View File

@ -593,7 +593,11 @@ g_data_input_stream_read_uint32
g_data_input_stream_read_int64
g_data_input_stream_read_uint64
g_data_input_stream_read_line
g_data_input_stream_read_line_async
g_data_input_stream_read_line_finish
g_data_input_stream_read_until
g_data_input_stream_read_until_async
g_data_input_stream_read_until_finish
<SUBSECTION Standard>
GDataInputStreamClass
G_DATA_INPUT_STREAM

View File

@ -1,3 +1,14 @@
2009-01-28 Ryan Lortie <desrt@desrt.ca>
Bug 568575 _async functions for GDataInputStream
* gdatainputstream.h:
* gdatainputstream.c: add _async versions of read_line and read_until.
* gio.symbols:
* ../docs/reference/gio/gio-sections.txt: add new functions
* tests/sleepy-stream.c: new test case for async read line
* tests/Makefile.am: add new test
2009-01-22 Ryan Lortie <desrt@desrt.ca>
Bug 568723 g_buffered_input_stream_fill_async doesn't take count == -1

View File

@ -2,6 +2,7 @@
*
* Copyright (C) 2006-2007 Red Hat, Inc.
* Copyright (C) 2007 Jürg Billeter
* Copyright © 2009 Codethink Limited
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@ -23,6 +24,8 @@
#include "config.h"
#include "gdatainputstream.h"
#include "gsimpleasyncresult.h"
#include "gcancellable.h"
#include "gioenumtypes.h"
#include "gioerror.h"
#include "glibintl.h"
@ -808,7 +811,6 @@ g_data_input_stream_read_line (GDataInputStream *stream,
return line;
}
static gssize
scan_for_chars (GDataInputStream *stream,
gsize *checked_out,
@ -928,5 +930,297 @@ g_data_input_stream_read_until (GDataInputStream *stream,
return data_until;
}
typedef struct
{
GDataInputStream *stream;
GSimpleAsyncResult *simple;
gboolean last_saw_cr;
gsize checked;
gint io_priority;
GCancellable *cancellable;
gchar *stop_chars;
gchar *line;
gsize length;
} GDataInputStreamReadData;
static void
g_data_input_stream_read_complete (GDataInputStreamReadData *data,
gsize read_length,
gsize skip_length,
gboolean need_idle_dispatch)
{
if (read_length || skip_length)
{
gssize bytes;
data->length = read_length;
data->line = g_malloc (read_length + 1);
data->line[read_length] = '\0';
/* we already checked the buffer. this shouldn't fail. */
bytes = g_input_stream_read (G_INPUT_STREAM (data->stream),
data->line, read_length, NULL, NULL);
g_assert_cmpint (bytes, ==, read_length);
bytes = g_input_stream_skip (G_INPUT_STREAM (data->stream),
skip_length, NULL, NULL);
g_assert_cmpint (bytes, ==, skip_length);
}
if (need_idle_dispatch)
g_simple_async_result_complete_in_idle (data->simple);
else
g_simple_async_result_complete (data->simple);
g_object_unref (data->simple);
}
static void
g_data_input_stream_read_line_ready (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GDataInputStreamReadData *data = user_data;
gssize found_pos;
gint newline_len;
if (result)
/* this is a callback. finish the async call. */
{
GBufferedInputStream *buffer = G_BUFFERED_INPUT_STREAM (data->stream);
GError *error = NULL;
gssize bytes;
bytes = g_buffered_input_stream_fill_finish (buffer, result, &error);
if (bytes <= 0)
{
if (bytes < 0)
/* stream error. */
{
g_simple_async_result_set_from_error (data->simple, error);
g_error_free (error);
data->checked = 0;
}
g_data_input_stream_read_complete (data, data->checked, 0, FALSE);
return;
}
/* only proceed if we got more bytes... */
}
if (data->stop_chars)
{
found_pos = scan_for_chars (data->stream,
&data->checked,
data->stop_chars);
newline_len = 0;
}
else
found_pos = scan_for_newline (data->stream, &data->checked,
&data->last_saw_cr, &newline_len);
if (found_pos == -1)
/* didn't find a full line; need to buffer some more bytes */
{
GBufferedInputStream *buffer = G_BUFFERED_INPUT_STREAM (data->stream);
gsize size;
size = g_buffered_input_stream_get_buffer_size (buffer);
if (g_buffered_input_stream_get_available (buffer) == size)
/* need to grow the buffer */
g_buffered_input_stream_set_buffer_size (buffer, size * 2);
/* try again */
g_buffered_input_stream_fill_async (buffer, -1, data->io_priority,
data->cancellable,
g_data_input_stream_read_line_ready,
user_data);
}
else
{
/* read the line and the EOL. no error is possible. */
g_data_input_stream_read_complete (data, found_pos,
newline_len, result == NULL);
}
}
static void
g_data_input_stream_read_data_free (gpointer user_data)
{
GDataInputStreamReadData *data = user_data;
/* we don't hold a ref to ->simple because it keeps a ref to us.
* we are called because it is being finalized.
*/
g_free (data->stop_chars);
if (data->cancellable)
g_object_unref (data->cancellable);
g_free (data->line);
g_slice_free (GDataInputStreamReadData, data);
}
static void
g_data_input_stream_read_async (GDataInputStream *stream,
const gchar *stop_chars,
gint io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data,
gpointer source_tag)
{
GDataInputStreamReadData *data;
data = g_slice_new (GDataInputStreamReadData);
data->stream = stream;
if (cancellable)
g_object_ref (cancellable);
data->cancellable = cancellable;
data->stop_chars = g_strdup (stop_chars);
data->io_priority = io_priority;
data->last_saw_cr = FALSE;
data->checked = 0;
data->line = NULL;
data->simple = g_simple_async_result_new (G_OBJECT (stream), callback,
user_data, source_tag);
g_simple_async_result_set_op_res_gpointer (data->simple, data,
g_data_input_stream_read_data_free);
g_data_input_stream_read_line_ready (NULL, NULL, data);
}
static gchar *
g_data_input_stream_read_finish (GDataInputStream *stream,
GAsyncResult *result,
gsize *length,
GError **error)
{
GDataInputStreamReadData *data;
GSimpleAsyncResult *simple;
gchar *line;
simple = G_SIMPLE_ASYNC_RESULT (result);
if (g_simple_async_result_propagate_error (simple, error))
return NULL;
data = g_simple_async_result_get_op_res_gpointer (simple);
line = data->line;
data->line = NULL;
if (length && line)
*length = data->length;
return line;
}
/**
* g_data_input_stream_read_line_async:
* @stream: a given #GDataInputStream.
* @io_priority: the <link linkend="io-priority">I/O priority</link>
* of the request.
* @cancellable: optional #GCancellable object, %NULL to ignore.
* @callback: callback to call when the request is satisfied.
* @user_data: the data to pass to callback function.
*
* The asynchronous version of g_data_input_stream_read_line(). It is
* an error to have two outstanding calls to this function.
**/
void
g_data_input_stream_read_line_async (GDataInputStream *stream,
gint io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
g_return_if_fail (G_IS_DATA_INPUT_STREAM (stream));
g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
g_data_input_stream_read_async (stream, NULL, io_priority,
cancellable, callback, user_data,
g_data_input_stream_read_line_async);
}
/**
* g_data_input_stream_read_until_async:
* @stream: a given #GDataInputStream.
* @stop_chars: characters to terminate the read.
* @io_priority: the <link linkend="io-priority">I/O priority</link>
* of the request.
* @cancellable: optional #GCancellable object, %NULL to ignore.
* @callback: callback to call when the request is satisfied.
* @user_data: the data to pass to callback function.
*
* The asynchronous version of g_data_input_stream_read_until(). It is
* an error to have two outstanding calls to this function.
**/
void
g_data_input_stream_read_until_async (GDataInputStream *stream,
const gchar *stop_chars,
gint io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
g_return_if_fail (G_IS_DATA_INPUT_STREAM (stream));
g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
g_return_if_fail (stop_chars != NULL);
g_data_input_stream_read_async (stream, stop_chars, io_priority,
cancellable, callback, user_data,
g_data_input_stream_read_until_async);
}
/**
* g_data_input_stream_read_line_finish:
* @stream: a given #GDataInputStream.
* @result: the #GAsyncResult that was provided to the callback.
* @length: a #gsize to get the length of the data read in.
* @error: #GError for error reporting.
*
* Finish an asynchronous call started by
* g_data_input_stream_read_line_async().
**/
gchar *
g_data_input_stream_read_line_finish (GDataInputStream *stream,
GAsyncResult *result,
gsize *length,
GError **error)
{
g_return_val_if_fail (
g_simple_async_result_is_valid (result, G_OBJECT (stream),
g_data_input_stream_read_line_async), NULL);
return g_data_input_stream_read_finish (stream, result, length, error);
}
/**
* g_data_input_stream_read_until_finish:
* @stream: a given #GDataInputStream.
* @result: the #GAsyncResult that was provided to the callback.
* @length: a #gsize to get the length of the data read in.
* @error: #GError for error reporting.
*
* Finish an asynchronous call started by
* g_data_input_stream_read_until_async().
**/
gchar *
g_data_input_stream_read_until_finish (GDataInputStream *stream,
GAsyncResult *result,
gsize *length,
GError **error)
{
g_return_val_if_fail (
g_simple_async_result_is_valid (result, G_OBJECT (stream),
g_data_input_stream_read_until_async), NULL);
return g_data_input_stream_read_finish (stream, result, length, error);
}
#define __G_DATA_INPUT_STREAM_C__
#include "gioaliasdef.c"

View File

@ -69,45 +69,64 @@ struct _GDataInputStreamClass
void (*_g_reserved5) (void);
};
GType g_data_input_stream_get_type (void) G_GNUC_CONST;
GDataInputStream * g_data_input_stream_new (GInputStream *base_stream);
GType g_data_input_stream_get_type (void) G_GNUC_CONST;
GDataInputStream * g_data_input_stream_new (GInputStream *base_stream);
void g_data_input_stream_set_byte_order (GDataInputStream *stream,
GDataStreamByteOrder order);
GDataStreamByteOrder g_data_input_stream_get_byte_order (GDataInputStream *stream);
void g_data_input_stream_set_newline_type (GDataInputStream *stream,
GDataStreamNewlineType type);
GDataStreamNewlineType g_data_input_stream_get_newline_type (GDataInputStream *stream);
guchar g_data_input_stream_read_byte (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
gint16 g_data_input_stream_read_int16 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
guint16 g_data_input_stream_read_uint16 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
gint32 g_data_input_stream_read_int32 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
guint32 g_data_input_stream_read_uint32 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
gint64 g_data_input_stream_read_int64 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
guint64 g_data_input_stream_read_uint64 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
char * g_data_input_stream_read_line (GDataInputStream *stream,
gsize *length,
GCancellable *cancellable,
GError **error);
char * g_data_input_stream_read_until (GDataInputStream *stream,
const gchar *stop_chars,
gsize *length,
GCancellable *cancellable,
GError **error);
void g_data_input_stream_set_byte_order (GDataInputStream *stream,
GDataStreamByteOrder order);
GDataStreamByteOrder g_data_input_stream_get_byte_order (GDataInputStream *stream);
void g_data_input_stream_set_newline_type (GDataInputStream *stream,
GDataStreamNewlineType type);
GDataStreamNewlineType g_data_input_stream_get_newline_type (GDataInputStream *stream);
guchar g_data_input_stream_read_byte (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
gint16 g_data_input_stream_read_int16 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
guint16 g_data_input_stream_read_uint16 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
gint32 g_data_input_stream_read_int32 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
guint32 g_data_input_stream_read_uint32 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
gint64 g_data_input_stream_read_int64 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
guint64 g_data_input_stream_read_uint64 (GDataInputStream *stream,
GCancellable *cancellable,
GError **error);
char * g_data_input_stream_read_line (GDataInputStream *stream,
gsize *length,
GCancellable *cancellable,
GError **error);
void g_data_input_stream_read_line_async (GDataInputStream *stream,
gint io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
char * g_data_input_stream_read_line_finish (GDataInputStream *stream,
GAsyncResult *result,
gsize *length,
GError **error);
char * g_data_input_stream_read_until (GDataInputStream *stream,
const gchar *stop_chars,
gsize *length,
GCancellable *cancellable,
GError **error);
void g_data_input_stream_read_until_async (GDataInputStream *stream,
const gchar *stop_chars,
gint io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
char * g_data_input_stream_read_until_finish (GDataInputStream *stream,
GAsyncResult *result,
gsize *length,
GError **error);
G_END_DECLS

View File

@ -169,6 +169,8 @@ g_data_input_stream_read_uint32
g_data_input_stream_read_int64
g_data_input_stream_read_uint64
g_data_input_stream_read_line
g_data_input_stream_read_line_async
g_data_input_stream_read_line_finish
g_data_input_stream_read_until
#endif
#endif

View File

@ -25,6 +25,7 @@ TEST_PROGS += \
data-output-stream \
g-icon \
buffered-input-stream \
sleepy-stream \
filter-streams \
simple-async-result
@ -69,6 +70,9 @@ unix_streams_LDADD = $(progs_ldadd) \
simple_async_result_SOURCES = simple-async-result.c
simple_async_result_LDADD = $(progs_ldadd)
sleepy_stream_SOURCES = sleepy-stream.c
sleepy_stream_LDADD = $(progs_ldadd)
filter_streams_SOURCES = filter-streams.c
filter_streams_LDADD = $(progs_ldadd)

294
gio/tests/sleepy-stream.c Normal file
View File

@ -0,0 +1,294 @@
/*
* Copyright © 2009 Codethink Limited
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; either version 2 of the licence or (at
* your option) any later version.
*
* See the included COPYING file for more information.
*
* Author: Ryan Lortie <desrt@desrt.ca>
*/
#include <gio/gio.h>
#include <string.h>
#define MAX_PIECE_SIZE 100
#define MAX_PIECES 60
static gchar *
cook_piece (void)
{
char buffer[MAX_PIECE_SIZE * 2];
gint symbols, index = 0;
symbols = g_test_rand_int_range (1, MAX_PIECE_SIZE + 1);
while (symbols--)
{
gint c = g_test_rand_int_range (0, 30);
switch (c)
{
case 26:
buffer[index++] = '\n';
case 27:
buffer[index++] = '\r';
break;
case 28:
buffer[index++] = '\r';
case 29:
buffer[index++] = '\n';
break;
default:
buffer[index++] = c + 'a';
break;
}
g_assert_cmpint (index, <=, sizeof buffer);
}
return g_strndup (buffer, index);
}
static gchar **
cook_pieces (void)
{
gchar **array;
gint pieces;
pieces = g_test_rand_int_range (0, MAX_PIECES + 1);
array = g_new (char *, pieces + 1);
array[pieces] = NULL;
while (pieces--)
array[pieces] = cook_piece ();
return array;
}
typedef struct
{
GInputStream parent_instance;
gboolean built_to_fail;
gchar **pieces;
gint index;
const gchar *current;
} SleepyStream;
typedef GInputStreamClass SleepyStreamClass;
G_DEFINE_TYPE (SleepyStream, sleepy_stream, G_TYPE_INPUT_STREAM)
static gssize
sleepy_stream_read (GInputStream *stream,
void *buffer,
gsize length,
GCancellable *cancellable,
GError **error)
{
SleepyStream *sleepy = (SleepyStream *) stream;
if (sleepy->pieces[sleepy->index] == NULL)
{
if (sleepy->built_to_fail)
{
g_set_error (error, 0, 0, "fail");
return -1;
}
else
return 0;
}
else
{
if (!sleepy->current)
sleepy->current = sleepy->pieces[sleepy->index++];
length = MIN (strlen (sleepy->current), length);
memcpy (buffer, sleepy->current, length);
sleepy->current += length;
if (*sleepy->current == '\0')
sleepy->current = NULL;
return length;
}
}
static void
sleepy_stream_init (SleepyStream *sleepy)
{
sleepy->pieces = cook_pieces ();
sleepy->built_to_fail = FALSE;
sleepy->index = 0;
}
static void
sleepy_stream_finalize (GObject *object)
{
SleepyStream *sleepy = (SleepyStream *) object;
g_strfreev (sleepy->pieces);
G_OBJECT_CLASS (sleepy_stream_parent_class)
->finalize (object);
}
static void
sleepy_stream_class_init (SleepyStreamClass *class)
{
G_OBJECT_CLASS (class)->finalize = sleepy_stream_finalize;
class->read_fn = sleepy_stream_read;
/* no read_async implementation.
* main thread will sleep while read runs in a worker.
*/
}
SleepyStream *
sleepy_stream_new (void)
{
return g_object_new (sleepy_stream_get_type (), NULL);
}
static gboolean
read_line (GDataInputStream *stream,
GString *string,
const gchar *eol,
GError **error)
{
gsize length;
int eol_len;
char *str;
eol_len = 1 + (eol[1] != '\0');
str = g_data_input_stream_read_line (stream, &length, NULL, error);
if (str == NULL)
return FALSE;
g_assert (strstr (str, eol) == NULL);
g_assert (strlen (str) == length);
g_string_append (string, str);
g_string_append (string, eol);
g_free (str);
return TRUE;
}
static void
build_comparison (GString *str,
SleepyStream *stream)
{
/* build this for comparison */
gint i;
for (i = 0; stream->pieces[i]; i++)
g_string_append (str, stream->pieces[i]);
if (str->len && str->str[str->len - 1] != '\n')
g_string_append_c (str, '\n');
}
void
test (void)
{
SleepyStream *stream = sleepy_stream_new ();
GDataInputStream *data;
GError *error = NULL;
GString *one;
GString *two;
one = g_string_new (NULL);
two = g_string_new (NULL);
data = g_data_input_stream_new (G_INPUT_STREAM (stream));
g_data_input_stream_set_newline_type (data, G_DATA_STREAM_NEWLINE_TYPE_LF);
build_comparison (one, stream);
while (read_line (data, two, "\n", &error));
g_assert_cmpstr (one->str, ==, two->str);
g_string_free (one, TRUE);
g_string_free (two, TRUE);
g_object_unref (stream);
g_object_unref (data);
}
static GDataInputStream *data;
static GString *one, *two;
static GMainLoop *loop;
static const gchar *eol;
static void
asynch_ready (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GError *error = NULL;
gsize length;
gchar *str;
g_assert (data == G_DATA_INPUT_STREAM (object));
str = g_data_input_stream_read_line_finish (data, result, &length, &error);
if (str == NULL)
{
g_main_loop_quit (loop);
if (error)
g_error_free (error);
}
else
{
g_assert (length == strlen (str));
g_string_append (two, str);
g_string_append (two, eol);
g_free (str);
/* MOAR!! */
g_data_input_stream_read_line_async (data, 0, NULL, asynch_ready, NULL);
}
}
void
asynch (void)
{
SleepyStream *sleepy = sleepy_stream_new ();
data = g_data_input_stream_new (G_INPUT_STREAM (sleepy));
one = g_string_new (NULL);
two = g_string_new (NULL);
eol = "\n";
build_comparison (one, sleepy);
g_data_input_stream_read_line_async (data, 0, NULL, asynch_ready, NULL);
g_main_loop_run (loop = g_main_loop_new (NULL, FALSE));
g_assert_cmpstr (one->str, ==, two->str);
g_string_free (one, TRUE);
g_string_free (two, TRUE);
g_object_unref (sleepy);
g_object_unref (data);
}
int
main (int argc, char **argv)
{
g_test_init (&argc, &argv, NULL);
g_test_bug_base ("http://bugzilla.gnome.org/");
g_type_init ();
g_test_add_func ("/filter-stream/input", test);
g_test_add_func ("/filter-stream/async", asynch);
return g_test_run();
}