Add g_input_stream_read_all_async()

Add an asynchronous version of _read_all().

This API is not fully consistent with the normal expectations of a
non-asynchronous version.  Consistency between the sync and async version is
probably more important.

The API will still bind correctly, but access to all functionality will
not be available: specifically, in the case of an error, higher level
languages will be unable to determine how many bytes were successfully
read before the error.  Most users will probably not want to use this
information anyway, so this is OK -- and if they do need the
information, then they can just write the loop for themselves.

Heavily based on a patch from Ignacio Casal Quinteiro.

https://bugzilla.gnome.org/show_bug.cgi?id=737451
This commit is contained in:
Ryan Lortie 2014-09-29 11:40:46 -04:00
parent aabc3a41c3
commit 76b890d0f1
2 changed files with 194 additions and 0 deletions

View File

@ -655,6 +655,185 @@ g_input_stream_read_finish (GInputStream *stream,
return class->read_finish (stream, result, error); return class->read_finish (stream, result, error);
} }
typedef struct
{
gchar *buffer;
gsize to_read;
gsize bytes_read;
} AsyncReadAll;
static void
free_async_read_all (gpointer data)
{
g_slice_free (AsyncReadAll, data);
}
static void
read_all_callback (GObject *stream,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = user_data;
AsyncReadAll *data = g_task_get_task_data (task);
gboolean got_eof = FALSE;
if (result)
{
GError *error = NULL;
gssize nread;
nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
if (nread == -1)
{
g_task_return_error (task, error);
g_object_unref (task);
return;
}
g_assert_cmpint (nread, <=, data->to_read);
data->to_read -= nread;
data->bytes_read += nread;
got_eof = (nread == 0);
}
if (got_eof || data->to_read == 0)
{
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
else
g_input_stream_read_async (G_INPUT_STREAM (stream),
data->buffer + data->bytes_read,
data->to_read,
g_task_get_priority (task),
g_task_get_cancellable (task),
read_all_callback, task);
}
static void
read_all_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
GInputStream *stream = source_object;
AsyncReadAll *data = task_data;
GError *error = NULL;
if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
g_task_get_cancellable (task), &error))
g_task_return_boolean (task, TRUE);
else
g_task_return_error (task, error);
}
/**
* g_input_stream_read_all_async:
* @stream: A #GInputStream
* @buffer: (array length=count) (element-type guint8): a buffer to
* read data into (which should be at least count bytes long)
* @count: the number of bytes that will be read from the stream
* @io_priority: the [I/O priority][io-priority] of the request
* @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
* @callback: (scope async): callback to call when the request is satisfied
* @user_data: (closure): the data to pass to callback function
*
* Request an asynchronous read of @count bytes from the stream into the
* buffer starting at @buffer.
*
* This is the asynchronous equivalent of g_input_stream_read_all().
*
* Call g_input_stream_read_all_finish() to collect the result.
*
* Any outstanding I/O request with higher priority (lower numerical
* value) will be executed before an outstanding request with lower
* priority. Default priority is %G_PRIORITY_DEFAULT.
*
* Since: 2.44
**/
void
g_input_stream_read_all_async (GInputStream *stream,
void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
AsyncReadAll *data;
GTask *task;
g_return_if_fail (G_IS_INPUT_STREAM (stream));
g_return_if_fail (buffer != NULL || count == 0);
task = g_task_new (stream, cancellable, callback, user_data);
data = g_slice_new0 (AsyncReadAll);
data->buffer = buffer;
data->to_read = count;
g_task_set_task_data (task, data, free_async_read_all);
g_task_set_priority (task, io_priority);
/* If async reads are going to be handled via the threadpool anyway
* then we may as well do it with a single dispatch instead of
* bouncing in and out.
*/
if (g_input_stream_async_read_is_via_threads (stream))
{
g_task_run_in_thread (task, read_all_async_thread);
g_object_unref (task);
}
else
read_all_callback (G_OBJECT (stream), NULL, task);
}
/**
* g_input_stream_read_all_finish:
* @stream: a #GInputStream
* @result: a #GAsyncResult
* @bytes_read: (out): location to store the number of bytes that was read from the stream
* @error: a #GError location to store the error occurring, or %NULL to ignore
*
* Finishes an asynchronous stream read operation started with
* g_input_stream_read_all_async().
*
* As a special exception to the normal conventions for functions that
* use #GError, if this function returns %FALSE (and sets @error) then
* @bytes_read will be set to the number of bytes that were successfully
* read before the error was encountered. This functionality is only
* available from C. If you need it from another language then you must
* write your own loop around g_input_stream_read_async().
*
* Returns: %TRUE on success, %FALSE if there was an error
*
* Since: 2.44
**/
gboolean
g_input_stream_read_all_finish (GInputStream *stream,
GAsyncResult *result,
gsize *bytes_read,
GError **error)
{
GTask *task;
g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
task = G_TASK (result);
if (bytes_read)
{
AsyncReadAll *data = g_task_get_task_data (task);
*bytes_read = data->bytes_read;
}
return g_task_propagate_boolean (task, error);
}
static void static void
read_bytes_callback (GObject *stream, read_bytes_callback (GObject *stream,
GAsyncResult *result, GAsyncResult *result,

View File

@ -151,6 +151,21 @@ GLIB_AVAILABLE_IN_ALL
gssize g_input_stream_read_finish (GInputStream *stream, gssize g_input_stream_read_finish (GInputStream *stream,
GAsyncResult *result, GAsyncResult *result,
GError **error); GError **error);
GLIB_AVAILABLE_IN_2_44
void g_input_stream_read_all_async (GInputStream *stream,
void *buffer,
gsize count,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
GLIB_AVAILABLE_IN_2_44
gboolean g_input_stream_read_all_finish (GInputStream *stream,
GAsyncResult *result,
gsize *bytes_read,
GError **error);
GLIB_AVAILABLE_IN_2_34 GLIB_AVAILABLE_IN_2_34
void g_input_stream_read_bytes_async (GInputStream *stream, void g_input_stream_read_bytes_async (GInputStream *stream,
gsize count, gsize count,