gio: port basic I/O classes from GSimpleAsyncResult to GTask

https://bugzilla.gnome.org/show_bug.cgi?id=661767
This commit is contained in:
Dan Winship 2012-08-02 15:49:59 -04:00
parent 586adb9790
commit 669505e354
10 changed files with 770 additions and 1149 deletions

View File

@ -26,7 +26,7 @@
#include "ginputstream.h"
#include "gcancellable.h"
#include "gasyncresult.h"
#include "gsimpleasyncresult.h"
#include "gtask.h"
#include "gseekable.h"
#include "gioerror.h"
#include <string.h>
@ -483,39 +483,36 @@ g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
gpointer user_data)
{
GBufferedInputStreamClass *class;
GSimpleAsyncResult *simple;
GError *error = NULL;
g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
if (count == 0)
{
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_buffered_input_stream_fill_async);
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
GTask *task;
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_source_tag (task, g_buffered_input_stream_fill_async);
g_task_return_int (task, 0);
g_object_unref (task);
return;
}
if (count < -1)
{
g_simple_async_report_error_in_idle (G_OBJECT (stream),
callback,
user_data,
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Too large count value passed to %s"),
G_STRFUNC);
g_task_report_new_error (stream, callback, user_data,
g_buffered_input_stream_fill_async,
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Too large count value passed to %s"),
G_STRFUNC);
return;
}
if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
{
g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
callback,
user_data,
error);
g_task_report_error (stream, callback, user_data,
g_buffered_input_stream_fill_async,
error);
return;
}
@ -550,10 +547,7 @@ g_buffered_input_stream_fill_finish (GBufferedInputStream *stream,
if (g_async_result_legacy_propagate_error (result, error))
return -1;
else if (g_async_result_is_tagged (result, g_buffered_input_stream_fill_async))
{
/* Special case read of 0 bytes */
return 0;
}
return g_task_propagate_int (G_TASK (result), error);
class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
return class->fill_finish (stream, result, error);
@ -1041,38 +1035,28 @@ fill_async_callback (GObject *source_object,
{
GError *error;
gssize res;
GSimpleAsyncResult *simple;
simple = user_data;
GTask *task = user_data;
error = NULL;
res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
result, &error);
g_simple_async_result_set_op_res_gssize (simple, res);
if (res == -1)
{
g_simple_async_result_take_error (simple, error);
}
g_task_return_error (task, error);
else
{
GBufferedInputStream *stream;
GBufferedInputStreamPrivate *priv;
GObject *object;
object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
priv = G_BUFFERED_INPUT_STREAM (object)->priv;
stream = g_task_get_source_object (task);
priv = G_BUFFERED_INPUT_STREAM (stream)->priv;
g_assert_cmpint (priv->end + res, <=, priv->len);
priv->end += res;
g_object_unref (object);
g_task_return_int (task, res);
}
/* Complete immediately, not in idle, since we're already
* in a mainloop callout
*/
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_object_unref (task);
}
static void
@ -1085,7 +1069,7 @@ g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
{
GBufferedInputStreamPrivate *priv;
GInputStream *base_stream;
GSimpleAsyncResult *simple;
GTask *task;
gsize in_buffer;
priv = stream->priv;
@ -1102,9 +1086,7 @@ g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
if (priv->len - priv->end < count)
compact_buffer (stream);
simple = g_simple_async_result_new (G_OBJECT (stream),
callback, user_data,
g_buffered_input_stream_real_fill_async);
task = g_task_new (stream, cancellable, callback, user_data);
base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
g_input_stream_read_async (base_stream,
@ -1113,7 +1095,7 @@ g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
io_priority,
cancellable,
fill_async_callback,
simple);
task);
}
static gssize
@ -1121,17 +1103,9 @@ g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
gssize nread;
g_return_val_if_fail (g_task_is_valid (result, stream), -1);
simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
if (g_simple_async_result_propagate_error (simple, error))
return -1;
nread = g_simple_async_result_get_op_res_gssize (simple);
return nread;
return g_task_propagate_int (G_TASK (result), error);
}
typedef struct
@ -1152,12 +1126,12 @@ large_skip_callback (GObject *source_object,
GAsyncResult *result,
gpointer user_data)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
GTask *task = G_TASK (user_data);
SkipAsyncData *data;
GError *error;
gssize nread;
data = g_simple_async_result_get_op_res_gpointer (simple);
data = g_task_get_task_data (task);
error = NULL;
nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
@ -1165,18 +1139,19 @@ large_skip_callback (GObject *source_object,
/* Only report the error if we've not already read some data */
if (nread < 0 && data->bytes_skipped == 0)
g_simple_async_result_take_error (simple, error);
else if (error)
g_error_free (error);
g_task_return_error (task, error);
else
{
if (error)
g_error_free (error);
if (nread > 0)
data->bytes_skipped += nread;
if (nread > 0)
data->bytes_skipped += nread;
/* Complete immediately, not in idle, since we're already
* in a mainloop callout
*/
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_task_return_int (task, data->bytes_skipped);
}
g_object_unref (task);
}
static void
@ -1184,7 +1159,7 @@ skip_fill_buffer_callback (GObject *source_object,
GAsyncResult *result,
gpointer user_data)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
GTask *task = G_TASK (user_data);
GBufferedInputStream *bstream;
GBufferedInputStreamPrivate *priv;
SkipAsyncData *data;
@ -1195,31 +1170,32 @@ skip_fill_buffer_callback (GObject *source_object,
bstream = G_BUFFERED_INPUT_STREAM (source_object);
priv = bstream->priv;
data = g_simple_async_result_get_op_res_gpointer (simple);
data = g_task_get_task_data (task);
error = NULL;
nread = g_buffered_input_stream_fill_finish (bstream,
result, &error);
if (nread < 0 && data->bytes_skipped == 0)
g_simple_async_result_take_error (simple, error);
else if (error)
g_error_free (error);
if (nread > 0)
g_task_return_error (task, error);
else
{
available = priv->end - priv->pos;
data->count = MIN (data->count, available);
if (error)
g_error_free (error);
data->bytes_skipped += data->count;
priv->pos += data->count;
if (nread > 0)
{
available = priv->end - priv->pos;
data->count = MIN (data->count, available);
data->bytes_skipped += data->count;
priv->pos += data->count;
}
g_task_return_int (task, data->bytes_skipped);
}
/* Complete immediately, not in idle, since we're already
* in a mainloop callout
*/
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_object_unref (task);
}
static void
@ -1235,7 +1211,7 @@ g_buffered_input_stream_skip_async (GInputStream *stream,
GBufferedInputStreamClass *class;
GInputStream *base_stream;
gsize available;
GSimpleAsyncResult *simple;
GTask *task;
SkipAsyncData *data;
bstream = G_BUFFERED_INPUT_STREAM (stream);
@ -1243,20 +1219,17 @@ g_buffered_input_stream_skip_async (GInputStream *stream,
data = g_slice_new (SkipAsyncData);
data->bytes_skipped = 0;
simple = g_simple_async_result_new (G_OBJECT (stream),
callback, user_data,
g_buffered_input_stream_skip_async);
g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_task_data (task, data, free_skip_async_data);
available = priv->end - priv->pos;
if (count <= available)
{
priv->pos += count;
data->bytes_skipped = count;
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
g_task_return_int (task, count);
g_object_unref (task);
return;
}
@ -1282,13 +1255,13 @@ g_buffered_input_stream_skip_async (GInputStream *stream,
count,
io_priority, cancellable,
large_skip_callback,
simple);
task);
}
else
{
class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
class->fill_async (bstream, priv->len, io_priority, cancellable,
skip_fill_buffer_callback, simple);
skip_fill_buffer_callback, task);
}
}
@ -1297,14 +1270,7 @@ g_buffered_input_stream_skip_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
SkipAsyncData *data;
g_return_val_if_fail (g_task_is_valid (result, stream), -1);
simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
data = g_simple_async_result_get_op_res_gpointer (simple);
return data->bytes_skipped;
return g_task_propagate_int (G_TASK (result), error);
}

View File

@ -24,7 +24,7 @@
#include "gbufferedoutputstream.h"
#include "goutputstream.h"
#include "gseekable.h"
#include "gsimpleasyncresult.h"
#include "gtask.h"
#include "string.h"
#include "gioerror.h"
#include "glibintl.h"
@ -652,9 +652,10 @@ free_flush_data (gpointer data)
* and so closing and writing is just a special
* case of flushing + some addition stuff */
static void
flush_buffer_thread (GSimpleAsyncResult *result,
GObject *object,
GCancellable *cancellable)
flush_buffer_thread (GTask *task,
gpointer object,
gpointer task_data,
GCancellable *cancellable)
{
GBufferedOutputStream *stream;
GOutputStream *base_stream;
@ -663,7 +664,7 @@ flush_buffer_thread (GSimpleAsyncResult *result,
GError *error = NULL;
stream = G_BUFFERED_OUTPUT_STREAM (object);
fdata = g_simple_async_result_get_op_res_gpointer (result);
fdata = task_data;
base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
res = flush_buffer (stream, cancellable, &error);
@ -689,7 +690,9 @@ flush_buffer_thread (GSimpleAsyncResult *result,
}
if (res == FALSE)
g_simple_async_result_take_error (result, error);
g_task_return_error (task, error);
else
g_task_return_boolean (task, TRUE);
}
static void
@ -699,25 +702,19 @@ g_buffered_output_stream_flush_async (GOutputStream *stream,
GAsyncReadyCallback callback,
gpointer data)
{
GSimpleAsyncResult *res;
FlushData *fdata;
GTask *task;
FlushData *fdata;
fdata = g_slice_new (FlushData);
fdata->flush_stream = TRUE;
fdata->close_stream = FALSE;
res = g_simple_async_result_new (G_OBJECT (stream),
callback,
data,
g_buffered_output_stream_flush_async);
task = g_task_new (stream, cancellable, callback, data);
g_task_set_task_data (task, fdata, free_flush_data);
g_task_set_priority (task, io_priority);
g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
g_simple_async_result_run_in_thread (res,
flush_buffer_thread,
io_priority,
cancellable);
g_object_unref (res);
g_task_run_in_thread (task, flush_buffer_thread);
g_object_unref (task);
}
static gboolean
@ -725,14 +722,9 @@ g_buffered_output_stream_flush_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
g_buffered_output_stream_flush_async);
return TRUE;
return g_task_propagate_boolean (G_TASK (result), error);
}
static void
@ -742,24 +734,18 @@ g_buffered_output_stream_close_async (GOutputStream *stream,
GAsyncReadyCallback callback,
gpointer data)
{
GSimpleAsyncResult *res;
FlushData *fdata;
GTask *task;
FlushData *fdata;
fdata = g_slice_new (FlushData);
fdata->close_stream = TRUE;
res = g_simple_async_result_new (G_OBJECT (stream),
callback,
data,
g_buffered_output_stream_close_async);
task = g_task_new (stream, cancellable, callback, data);
g_task_set_task_data (task, fdata, free_flush_data);
g_task_set_priority (task, io_priority);
g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
g_simple_async_result_run_in_thread (res,
flush_buffer_thread,
io_priority,
cancellable);
g_object_unref (res);
g_task_run_in_thread (task, flush_buffer_thread);
g_object_unref (task);
}
static gboolean
@ -767,12 +753,7 @@ g_buffered_output_stream_close_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
g_buffered_output_stream_close_async);
return TRUE;
return g_task_propagate_boolean (G_TASK (result), error);
}

View File

@ -24,7 +24,7 @@
#include "config.h"
#include "gdatainputstream.h"
#include "gsimpleasyncresult.h"
#include "gtask.h"
#include "gcancellable.h"
#include "gioenumtypes.h"
#include "gioerror.h"
@ -952,49 +952,41 @@ g_data_input_stream_read_until (GDataInputStream *stream,
typedef struct
{
GDataInputStream *stream;
GSimpleAsyncResult *simple;
gboolean last_saw_cr;
gsize checked;
gint io_priority;
GCancellable *cancellable;
gchar *stop_chars;
gssize stop_chars_len;
gchar *line;
gsize length;
} GDataInputStreamReadData;
static void
g_data_input_stream_read_complete (GDataInputStreamReadData *data,
gsize read_length,
gsize skip_length,
gboolean need_idle_dispatch)
g_data_input_stream_read_complete (GTask *task,
gsize read_length,
gsize skip_length)
{
GDataInputStreamReadData *data = g_task_get_task_data (task);
GInputStream *stream = g_task_get_source_object (task);
char *line = NULL;
if (read_length || skip_length)
{
gssize bytes;
data->length = read_length;
data->line = g_malloc (read_length + 1);
data->line[read_length] = '\0';
line = g_malloc (read_length + 1);
line[read_length] = '\0';
/* we already checked the buffer. this shouldn't fail. */
bytes = g_input_stream_read (G_INPUT_STREAM (data->stream),
data->line, read_length, NULL, NULL);
bytes = g_input_stream_read (stream, line, read_length, NULL, NULL);
g_assert_cmpint (bytes, ==, read_length);
bytes = g_input_stream_skip (G_INPUT_STREAM (data->stream),
skip_length, NULL, NULL);
bytes = g_input_stream_skip (stream, skip_length, NULL, NULL);
g_assert_cmpint (bytes, ==, skip_length);
}
if (need_idle_dispatch)
g_simple_async_result_complete_in_idle (data->simple);
else
g_simple_async_result_complete (data->simple);
g_object_unref (data->simple);
g_task_return_pointer (task, line, g_free);
g_object_unref (task);
}
static void
@ -1002,14 +994,15 @@ g_data_input_stream_read_line_ready (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GDataInputStreamReadData *data = user_data;
GTask *task = user_data;
GDataInputStreamReadData *data = g_task_get_task_data (task);
GBufferedInputStream *buffer = g_task_get_source_object (task);
gssize found_pos;
gint newline_len;
if (result)
/* this is a callback. finish the async call. */
{
GBufferedInputStream *buffer = G_BUFFERED_INPUT_STREAM (data->stream);
GError *error = NULL;
gssize bytes;
@ -1020,11 +1013,12 @@ g_data_input_stream_read_line_ready (GObject *object,
if (bytes < 0)
/* stream error. */
{
g_simple_async_result_take_error (data->simple, error);
data->checked = 0;
g_task_return_error (task, error);
g_object_unref (task);
return;
}
g_data_input_stream_read_complete (data, data->checked, 0, FALSE);
g_data_input_stream_read_complete (task, data->checked, 0);
return;
}
@ -1033,20 +1027,19 @@ g_data_input_stream_read_line_ready (GObject *object,
if (data->stop_chars)
{
found_pos = scan_for_chars (data->stream,
found_pos = scan_for_chars (G_DATA_INPUT_STREAM (buffer),
&data->checked,
data->stop_chars,
data->stop_chars_len);
newline_len = 0;
}
else
found_pos = scan_for_newline (data->stream, &data->checked,
found_pos = scan_for_newline (G_DATA_INPUT_STREAM (buffer), &data->checked,
&data->last_saw_cr, &newline_len);
if (found_pos == -1)
/* didn't find a full line; need to buffer some more bytes */
{
GBufferedInputStream *buffer = G_BUFFERED_INPUT_STREAM (data->stream);
gsize size;
size = g_buffered_input_stream_get_buffer_size (buffer);
@ -1056,16 +1049,16 @@ g_data_input_stream_read_line_ready (GObject *object,
g_buffered_input_stream_set_buffer_size (buffer, size * 2);
/* try again */
g_buffered_input_stream_fill_async (buffer, -1, data->io_priority,
data->cancellable,
g_buffered_input_stream_fill_async (buffer, -1,
g_task_get_priority (task),
g_task_get_cancellable (task),
g_data_input_stream_read_line_ready,
user_data);
}
else
{
/* read the line and the EOL. no error is possible. */
g_data_input_stream_read_complete (data, found_pos,
newline_len, result == NULL);
g_data_input_stream_read_complete (task, found_pos, newline_len);
}
}
@ -1074,14 +1067,7 @@ g_data_input_stream_read_data_free (gpointer user_data)
{
GDataInputStreamReadData *data = user_data;
/* we don't hold a ref to ->simple because it keeps a ref to us.
* we are called because it is being finalized.
*/
g_free (data->stop_chars);
if (data->cancellable)
g_object_unref (data->cancellable);
g_free (data->line);
g_slice_free (GDataInputStreamReadData, data);
}
@ -1092,30 +1078,23 @@ g_data_input_stream_read_async (GDataInputStream *stream,
gint io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data,
gpointer source_tag)
gpointer user_data)
{
GDataInputStreamReadData *data;
GTask *task;
data = g_slice_new (GDataInputStreamReadData);
data->stream = stream;
if (cancellable)
g_object_ref (cancellable);
data->cancellable = cancellable;
data = g_slice_new0 (GDataInputStreamReadData);
if (stop_chars_len == -1)
stop_chars_len = strlen (stop_chars);
data->stop_chars = g_memdup (stop_chars, stop_chars_len);
data->stop_chars_len = stop_chars_len;
data->io_priority = io_priority;
data->last_saw_cr = FALSE;
data->checked = 0;
data->line = NULL;
data->simple = g_simple_async_result_new (G_OBJECT (stream), callback,
user_data, source_tag);
g_simple_async_result_set_op_res_gpointer (data->simple, data,
g_data_input_stream_read_data_free);
g_data_input_stream_read_line_ready (NULL, NULL, data);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_task_data (task, data, g_data_input_stream_read_data_free);
g_task_set_priority (task, io_priority);
g_data_input_stream_read_line_ready (NULL, NULL, task);
}
static gchar *
@ -1124,22 +1103,17 @@ g_data_input_stream_read_finish (GDataInputStream *stream,
gsize *length,
GError **error)
{
GDataInputStreamReadData *data;
GSimpleAsyncResult *simple;
GTask *task = G_TASK (result);
gchar *line;
simple = G_SIMPLE_ASYNC_RESULT (result);
if (g_simple_async_result_propagate_error (simple, error))
return NULL;
data = g_simple_async_result_get_op_res_gpointer (simple);
line = data->line;
data->line = NULL;
line = g_task_propagate_pointer (task, error);
if (length && line)
*length = data->length;
{
GDataInputStreamReadData *data = g_task_get_task_data (task);
*length = data->length;
}
return line;
}
@ -1173,8 +1147,7 @@ g_data_input_stream_read_line_async (GDataInputStream *stream,
g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
g_data_input_stream_read_async (stream, NULL, 0, io_priority,
cancellable, callback, user_data,
g_data_input_stream_read_line_async);
cancellable, callback, user_data);
}
/**
@ -1218,8 +1191,7 @@ g_data_input_stream_read_until_async (GDataInputStream *stream,
g_return_if_fail (stop_chars != NULL);
g_data_input_stream_read_async (stream, stop_chars, -1, io_priority,
cancellable, callback, user_data,
g_data_input_stream_read_until_async);
cancellable, callback, user_data);
}
/**
@ -1249,9 +1221,7 @@ g_data_input_stream_read_line_finish (GDataInputStream *stream,
gsize *length,
GError **error)
{
g_return_val_if_fail (
g_simple_async_result_is_valid (result, G_OBJECT (stream),
g_data_input_stream_read_line_async), NULL);
g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
return g_data_input_stream_read_finish (stream, result, length, error);
}
@ -1321,9 +1291,7 @@ g_data_input_stream_read_until_finish (GDataInputStream *stream,
gsize *length,
GError **error)
{
g_return_val_if_fail (
g_simple_async_result_is_valid (result, G_OBJECT (stream),
g_data_input_stream_read_until_async), NULL);
g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
return g_data_input_stream_read_finish (stream, result, length, error);
}
@ -1463,8 +1431,7 @@ g_data_input_stream_read_upto_async (GDataInputStream *stream,
g_return_if_fail (stop_chars != NULL);
g_data_input_stream_read_async (stream, stop_chars, stop_chars_len, io_priority,
cancellable, callback, user_data,
g_data_input_stream_read_upto_async);
cancellable, callback, user_data);
}
/**
@ -1494,9 +1461,7 @@ g_data_input_stream_read_upto_finish (GDataInputStream *stream,
gsize *length,
GError **error)
{
g_return_val_if_fail (
g_simple_async_result_is_valid (result, G_OBJECT (stream),
g_data_input_stream_read_upto_async), NULL);
g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
return g_data_input_stream_read_finish (stream, result, length, error);
}

View File

@ -28,7 +28,6 @@
#include "gseekable.h"
#include "gcancellable.h"
#include "gasyncresult.h"
#include "gsimpleasyncresult.h"
#include "gioerror.h"
#include "gpollableinputstream.h"
@ -589,7 +588,6 @@ g_input_stream_read_async (GInputStream *stream,
gpointer user_data)
{
GInputStreamClass *class;
GSimpleAsyncResult *simple;
GError *error = NULL;
g_return_if_fail (G_IS_INPUT_STREAM (stream));
@ -597,32 +595,30 @@ g_input_stream_read_async (GInputStream *stream,
if (count == 0)
{
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_input_stream_read_async);
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
GTask *task;
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_source_tag (task, g_input_stream_read_async);
g_task_return_int (task, 0);
g_object_unref (task);
return;
}
if (((gssize) count) < 0)
{
g_simple_async_report_error_in_idle (G_OBJECT (stream),
callback,
user_data,
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Too large count value passed to %s"),
G_STRFUNC);
g_task_report_new_error (stream, callback, user_data,
g_input_stream_read_async,
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Too large count value passed to %s"),
G_STRFUNC);
return;
}
if (!g_input_stream_set_pending (stream, &error))
{
g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
callback,
user_data,
error);
g_task_report_error (stream, callback, user_data,
g_input_stream_read_async,
error);
return;
}
@ -657,10 +653,7 @@ g_input_stream_read_finish (GInputStream *stream,
if (g_async_result_legacy_propagate_error (result, error))
return -1;
else if (g_async_result_is_tagged (result, g_input_stream_read_async))
{
/* Special case read of 0 bytes */
return 0;
}
return g_task_propagate_int (G_TASK (result), error);
class = G_INPUT_STREAM_GET_CLASS (stream);
return class->read_finish (stream, result, error);
@ -671,8 +664,8 @@ read_bytes_callback (GObject *stream,
GAsyncResult *result,
gpointer user_data)
{
GSimpleAsyncResult *simple = user_data;
guchar *buf = g_simple_async_result_get_op_res_gpointer (simple);
GTask *task = user_data;
guchar *buf = g_task_get_task_data (task);
GError *error = NULL;
gssize nread;
GBytes *bytes = NULL;
@ -682,7 +675,7 @@ read_bytes_callback (GObject *stream,
if (nread == -1)
{
g_free (buf);
g_simple_async_result_take_error (simple, error);
g_task_return_error (task, error);
}
else if (nread == 0)
{
@ -693,12 +686,9 @@ read_bytes_callback (GObject *stream,
bytes = g_bytes_new_take (buf, nread);
if (bytes)
{
g_simple_async_result_set_op_res_gpointer (simple, bytes,
(GDestroyNotify)g_bytes_unref);
}
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
g_object_unref (task);
}
/**
@ -740,18 +730,16 @@ g_input_stream_read_bytes_async (GInputStream *stream,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *simple;
GTask *task;
guchar *buf;
simple = g_simple_async_result_new (G_OBJECT (stream),
callback, user_data,
g_input_stream_read_bytes_async);
task = g_task_new (stream, cancellable, callback, user_data);
buf = g_malloc (count);
g_simple_async_result_set_op_res_gpointer (simple, buf, NULL);
g_task_set_task_data (task, buf, NULL);
g_input_stream_read_async (stream, buf, count,
io_priority, cancellable,
read_bytes_callback, simple);
io_priority, cancellable,
read_bytes_callback, task);
}
/**
@ -770,15 +758,10 @@ g_input_stream_read_bytes_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (stream), g_input_stream_read_bytes_async), NULL);
g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
simple = G_SIMPLE_ASYNC_RESULT (result);
if (g_simple_async_result_propagate_error (simple, error))
return NULL;
return g_bytes_ref (g_simple_async_result_get_op_res_gpointer (simple));
return g_task_propagate_pointer (G_TASK (result), error);
}
/**
@ -824,40 +807,36 @@ g_input_stream_skip_async (GInputStream *stream,
gpointer user_data)
{
GInputStreamClass *class;
GSimpleAsyncResult *simple;
GError *error = NULL;
g_return_if_fail (G_IS_INPUT_STREAM (stream));
if (count == 0)
{
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_input_stream_skip_async);
GTask *task;
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_source_tag (task, g_input_stream_skip_async);
g_task_return_int (task, 0);
g_object_unref (task);
return;
}
if (((gssize) count) < 0)
{
g_simple_async_report_error_in_idle (G_OBJECT (stream),
callback,
user_data,
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Too large count value passed to %s"),
G_STRFUNC);
g_task_report_new_error (stream, callback, user_data,
g_input_stream_skip_async,
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Too large count value passed to %s"),
G_STRFUNC);
return;
}
if (!g_input_stream_set_pending (stream, &error))
{
g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
callback,
user_data,
error);
g_task_report_error (stream, callback, user_data,
g_input_stream_skip_async,
error);
return;
}
@ -892,10 +871,7 @@ g_input_stream_skip_finish (GInputStream *stream,
if (g_async_result_legacy_propagate_error (result, error))
return -1;
else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
{
/* Special case skip of 0 bytes */
return 0;
}
return g_task_propagate_int (G_TASK (result), error);
class = G_INPUT_STREAM_GET_CLASS (stream);
return class->skip_finish (stream, result, error);
@ -929,29 +905,26 @@ g_input_stream_close_async (GInputStream *stream,
gpointer user_data)
{
GInputStreamClass *class;
GSimpleAsyncResult *simple;
GError *error = NULL;
g_return_if_fail (G_IS_INPUT_STREAM (stream));
if (stream->priv->closed)
{
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_input_stream_close_async);
GTask *task;
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_source_tag (task, g_input_stream_close_async);
g_task_return_boolean (task, TRUE);
g_object_unref (task);
return;
}
if (!g_input_stream_set_pending (stream, &error))
{
g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
callback,
user_data,
error);
g_task_report_error (stream, callback, user_data,
g_input_stream_close_async,
error);
return;
}
@ -986,10 +959,7 @@ g_input_stream_close_finish (GInputStream *stream,
if (g_async_result_legacy_propagate_error (result, error))
return FALSE;
else if (g_async_result_is_tagged (result, g_input_stream_close_async))
{
/* Special case already closed */
return TRUE;
}
return g_task_propagate_boolean (G_TASK (result), error);
class = G_INPUT_STREAM_GET_CLASS (stream);
return class->close_finish (stream, result, error);
@ -1084,95 +1054,86 @@ g_input_stream_clear_pending (GInputStream *stream)
********************************************/
typedef struct {
void *buffer;
gsize count_requested;
gssize count_read;
GCancellable *cancellable;
gint io_priority;
gboolean need_idle;
void *buffer;
gsize count;
} ReadData;
static void
free_read_data (ReadData *op)
{
if (op->cancellable)
g_object_unref (op->cancellable);
g_slice_free (ReadData, op);
}
static void
read_async_thread (GSimpleAsyncResult *res,
GObject *object,
GCancellable *cancellable)
read_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
ReadData *op;
GInputStream *stream = source_object;
ReadData *op = task_data;
GInputStreamClass *class;
GError *error = NULL;
gssize nread;
op = g_simple_async_result_get_op_res_gpointer (res);
class = G_INPUT_STREAM_GET_CLASS (stream);
class = G_INPUT_STREAM_GET_CLASS (object);
op->count_read = class->read_fn (G_INPUT_STREAM (object),
op->buffer, op->count_requested,
cancellable, &error);
if (op->count_read == -1)
g_simple_async_result_take_error (res, error);
nread = class->read_fn (stream,
op->buffer, op->count,
g_task_get_cancellable (task),
&error);
if (nread == -1)
g_task_return_error (task, error);
else
g_task_return_int (task, nread);
}
static void read_async_pollable (GPollableInputStream *stream,
GSimpleAsyncResult *result);
GTask *task);
static gboolean
read_async_pollable_ready (GPollableInputStream *stream,
gpointer user_data)
{
GSimpleAsyncResult *result = user_data;
GTask *task = user_data;
read_async_pollable (stream, result);
read_async_pollable (stream, task);
return FALSE;
}
static void
read_async_pollable (GPollableInputStream *stream,
GSimpleAsyncResult *result)
GTask *task)
{
ReadData *op = g_task_get_task_data (task);
GError *error = NULL;
ReadData *op = g_simple_async_result_get_op_res_gpointer (result);
gssize nread;
if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
op->count_read = -1;
else
{
op->count_read = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
read_nonblocking (stream, op->buffer, op->count_requested, &error);
}
if (g_task_return_error_if_cancelled (task))
return;
nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
read_nonblocking (stream, op->buffer, op->count, &error);
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
GSource *source;
g_error_free (error);
op->need_idle = FALSE;
source = g_pollable_input_stream_create_source (stream, op->cancellable);
g_source_set_callback (source,
(GSourceFunc) read_async_pollable_ready,
g_object_ref (result), g_object_unref);
g_source_set_priority (source, op->io_priority);
g_source_attach (source, g_main_context_get_thread_default ());
source = g_pollable_input_stream_create_source (stream,
g_task_get_cancellable (task));
g_task_attach_source (task, source,
(GSourceFunc) read_async_pollable_ready);
g_source_unref (source);
return;
}
if (op->count_read == -1)
g_simple_async_result_take_error (result, error);
if (op->need_idle)
g_simple_async_result_complete_in_idle (result);
if (nread == -1)
g_task_return_error (task, error);
else
g_simple_async_result_complete (result);
g_task_return_int (task, nread);
/* g_input_stream_real_read_async() unrefs task */
}
static void
@ -1184,24 +1145,22 @@ g_input_stream_real_read_async (GInputStream *stream,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *res;
GTask *task;
ReadData *op;
op = g_slice_new0 (ReadData);
res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_read_async);
g_simple_async_result_set_op_res_gpointer (res, op, (GDestroyNotify) free_read_data);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
g_task_set_priority (task, io_priority);
op->buffer = buffer;
op->count_requested = count;
op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
op->io_priority = io_priority;
op->need_idle = TRUE;
op->count = count;
if (G_IS_POLLABLE_INPUT_STREAM (stream) &&
g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), res);
read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
else
g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
g_object_unref (res);
g_task_run_in_thread (task, read_async_thread);
g_object_unref (task);
}
static gssize
@ -1209,50 +1168,38 @@ g_input_stream_real_read_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
ReadData *op;
g_return_val_if_fail (g_task_is_valid (result, stream), -1);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
g_input_stream_real_read_async);
if (g_simple_async_result_propagate_error (simple, error))
return -1;
op = g_simple_async_result_get_op_res_gpointer (simple);
return op->count_read;
return g_task_propagate_int (G_TASK (result), error);
}
typedef struct {
gsize count_requested;
gssize count_skipped;
} SkipData;
static void
skip_async_thread (GSimpleAsyncResult *res,
GObject *object,
GCancellable *cancellable)
skip_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
SkipData *op;
GInputStream *stream = source_object;
gsize count = GPOINTER_TO_SIZE (task_data);
GInputStreamClass *class;
GError *error = NULL;
gssize ret;
class = G_INPUT_STREAM_GET_CLASS (object);
op = g_simple_async_result_get_op_res_gpointer (res);
op->count_skipped = class->skip (G_INPUT_STREAM (object),
op->count_requested,
cancellable, &error);
if (op->count_skipped == -1)
g_simple_async_result_take_error (res, error);
class = G_INPUT_STREAM_GET_CLASS (stream);
ret = class->skip (stream, count,
g_task_get_cancellable (task),
&error);
if (ret == -1)
g_task_return_error (task, error);
else
g_task_return_int (task, ret);
}
typedef struct {
char buffer[8192];
gsize count;
gsize count_skipped;
int io_prio;
GCancellable *cancellable;
gpointer user_data;
GAsyncReadyCallback callback;
} SkipFallbackAsyncData;
@ -1263,9 +1210,8 @@ skip_callback_wrapper (GObject *source_object,
gpointer user_data)
{
GInputStreamClass *class;
SkipFallbackAsyncData *data = user_data;
SkipData *op;
GSimpleAsyncResult *simple;
GTask *task = user_data;
SkipFallbackAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
gssize ret;
@ -1279,35 +1225,28 @@ skip_callback_wrapper (GObject *source_object,
if (data->count > 0)
{
class = G_INPUT_STREAM_GET_CLASS (source_object);
class->read_async (G_INPUT_STREAM (source_object), data->buffer, MIN (8192, data->count), data->io_prio, data->cancellable,
skip_callback_wrapper, data);
class->read_async (G_INPUT_STREAM (source_object),
data->buffer, MIN (8192, data->count),
g_task_get_priority (task),
g_task_get_cancellable (task),
skip_callback_wrapper, data);
return;
}
}
op = g_new0 (SkipData, 1);
op->count_skipped = data->count_skipped;
simple = g_simple_async_result_new (source_object,
data->callback, data->user_data,
g_input_stream_real_skip_async);
g_simple_async_result_set_op_res_gpointer (simple, op, g_free);
if (ret == -1)
if (ret == -1 &&
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
data->count_skipped)
{
if (data->count_skipped &&
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
/* No error, return partial read */
g_error_free (error);
else
g_simple_async_result_take_error (simple, error);
/* No error, return partial read */
g_clear_error (&error);
}
/* Complete immediately, not in idle, since we're already in a mainloop callout */
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_free (data);
if (error)
g_task_return_error (task, error);
else
g_task_return_int (task, data->count_skipped);
g_object_unref (task);
}
static void
@ -1319,28 +1258,23 @@ g_input_stream_real_skip_async (GInputStream *stream,
gpointer user_data)
{
GInputStreamClass *class;
SkipData *op;
SkipFallbackAsyncData *data;
GSimpleAsyncResult *res;
GTask *task;
class = G_INPUT_STREAM_GET_CLASS (stream);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_priority (task, io_priority);
if (class->read_async == g_input_stream_real_read_async)
{
/* Read is thread-using async fallback.
* Make skip use threads too, so that we can use a possible sync skip
* implementation. */
op = g_new0 (SkipData, 1);
g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
g_input_stream_real_skip_async);
g_simple_async_result_set_op_res_gpointer (res, op, g_free);
op->count_requested = count;
g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable);
g_object_unref (res);
g_task_run_in_thread (task, skip_async_thread);
g_object_unref (task);
}
else
{
@ -1350,10 +1284,10 @@ g_input_stream_real_skip_async (GInputStream *stream,
data = g_new (SkipFallbackAsyncData, 1);
data->count = count;
data->count_skipped = 0;
data->io_prio = io_priority;
data->cancellable = cancellable;
data->callback = callback;
data->user_data = user_data;
g_task_set_task_data (task, data, g_free);
g_task_set_check_cancellable (task, FALSE);
class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
skip_callback_wrapper, data);
}
@ -1365,39 +1299,36 @@ g_input_stream_real_skip_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
SkipData *op;
g_return_val_if_fail (g_task_is_valid (result, stream), -1);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async);
if (g_simple_async_result_propagate_error (simple, error))
return -1;
op = g_simple_async_result_get_op_res_gpointer (simple);
return op->count_skipped;
return g_task_propagate_int (G_TASK (result), error);
}
static void
close_async_thread (GSimpleAsyncResult *res,
GObject *object,
GCancellable *cancellable)
close_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
GInputStream *stream = source_object;
GInputStreamClass *class;
GError *error = NULL;
gboolean result;
/* Auto handling of cancelation disabled, and ignore
cancellation, since we want to close things anyway, although
possibly in a quick-n-dirty way. At least we never want to leak
open handles */
class = G_INPUT_STREAM_GET_CLASS (object);
class = G_INPUT_STREAM_GET_CLASS (stream);
if (class->close_fn)
{
result = class->close_fn (G_INPUT_STREAM (object), cancellable, &error);
result = class->close_fn (stream,
g_task_get_cancellable (task),
&error);
if (!result)
g_simple_async_result_take_error (res, error);
{
g_task_return_error (task, error);
return;
}
}
g_task_return_boolean (task, TRUE);
}
static void
@ -1407,20 +1338,14 @@ g_input_stream_real_close_async (GInputStream *stream,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *res;
GTask *task;
res = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_input_stream_real_close_async);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_check_cancellable (task, FALSE);
g_task_set_priority (task, io_priority);
g_simple_async_result_set_handle_cancellation (res, FALSE);
g_simple_async_result_run_in_thread (res,
close_async_thread,
io_priority,
cancellable);
g_object_unref (res);
g_task_run_in_thread (task, close_async_thread);
g_object_unref (task);
}
static gboolean
@ -1428,12 +1353,7 @@ g_input_stream_real_close_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_close_async);
if (g_simple_async_result_propagate_error (simple, error))
return FALSE;
return TRUE;
return g_task_propagate_boolean (G_TASK (result), error);
}

View File

@ -27,9 +27,8 @@
#include "glibintl.h"
#include "giostream.h"
#include <gio/gsimpleasyncresult.h>
#include <gio/gasyncresult.h>
#include "gasyncresult.h"
#include "gtask.h"
G_DEFINE_ABSTRACT_TYPE (GIOStream, g_io_stream, G_TYPE_OBJECT);
@ -465,28 +464,26 @@ g_io_stream_close_async (GIOStream *stream,
gpointer user_data)
{
GIOStreamClass *class;
GSimpleAsyncResult *simple;
GError *error = NULL;
g_return_if_fail (G_IS_IO_STREAM (stream));
if (stream->priv->closed)
{
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_io_stream_close_async);
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
GTask *task;
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_source_tag (task, g_io_stream_close_async);
g_task_return_boolean (task, TRUE);
g_object_unref (task);
return;
}
if (!g_io_stream_set_pending (stream, &error))
{
g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
callback,
user_data,
error);
g_task_report_error (stream, callback, user_data,
g_io_stream_close_async,
error);
return;
}
@ -523,10 +520,7 @@ g_io_stream_close_finish (GIOStream *stream,
if (g_async_result_legacy_propagate_error (result, error))
return FALSE;
else if (g_async_result_is_tagged (result, g_io_stream_close_async))
{
/* Special case already closed */
return TRUE;
}
return g_task_propagate_boolean (G_TASK (result), error);
class = G_IO_STREAM_GET_CLASS (stream);
return class->close_finish (stream, result, error);
@ -534,25 +528,30 @@ g_io_stream_close_finish (GIOStream *stream,
static void
close_async_thread (GSimpleAsyncResult *res,
GObject *object,
GCancellable *cancellable)
close_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
GIOStream *stream = source_object;
GIOStreamClass *class;
GError *error = NULL;
gboolean result;
/* Auto handling of cancelation disabled, and ignore cancellation,
* since we want to close things anyway, although possibly in a
* quick-n-dirty way. At least we never want to leak open handles
*/
class = G_IO_STREAM_GET_CLASS (object);
class = G_IO_STREAM_GET_CLASS (stream);
if (class->close_fn)
{
result = class->close_fn (G_IO_STREAM (object), cancellable, &error);
result = class->close_fn (stream,
g_task_get_cancellable (task),
&error);
if (!result)
g_simple_async_result_take_error (res, error);
{
g_task_return_error (task, error);
return;
}
}
g_task_return_boolean (task, TRUE);
}
static void
@ -562,20 +561,14 @@ g_io_stream_real_close_async (GIOStream *stream,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *res;
GTask *task;
res = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_io_stream_real_close_async);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_check_cancellable (task, FALSE);
g_task_set_priority (task, io_priority);
g_simple_async_result_set_handle_cancellation (res, FALSE);
g_simple_async_result_run_in_thread (res,
close_async_thread,
io_priority,
cancellable);
g_object_unref (res);
g_task_run_in_thread (task, close_async_thread);
g_object_unref (task);
}
static gboolean
@ -583,15 +576,9 @@ g_io_stream_real_close_finish (GIOStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
g_io_stream_real_close_async);
if (g_simple_async_result_propagate_error (simple, error))
return FALSE;
return TRUE;
return g_task_propagate_boolean (G_TASK (result), error);
}
typedef struct
@ -622,16 +609,20 @@ splice_context_free (SpliceContext *ctx)
}
static void
splice_complete (GSimpleAsyncResult *simple,
SpliceContext *ctx)
splice_complete (GTask *task,
SpliceContext *ctx)
{
if (ctx->cancelled_id != 0)
g_cancellable_disconnect (ctx->cancellable, ctx->cancelled_id);
ctx->cancelled_id = 0;
if (ctx->error != NULL)
g_simple_async_result_set_from_error (simple, ctx->error);
g_simple_async_result_complete (simple);
{
g_task_return_error (task, ctx->error);
ctx->error = NULL;
}
else
g_task_return_boolean (task, TRUE);
}
static void
@ -639,13 +630,12 @@ splice_close_cb (GObject *iostream,
GAsyncResult *res,
gpointer user_data)
{
GSimpleAsyncResult *simple = user_data;
SpliceContext *ctx;
GTask *task = user_data;
SpliceContext *ctx = g_task_get_task_data (task);
GError *error = NULL;
g_io_stream_close_finish (G_IO_STREAM (iostream), res, &error);
ctx = g_simple_async_result_get_op_res_gpointer (simple);
ctx->completed++;
/* Keep the first error that occurred */
@ -656,9 +646,9 @@ splice_close_cb (GObject *iostream,
/* If all operations are done, complete now */
if (ctx->completed == 4)
splice_complete (simple, ctx);
splice_complete (task, ctx);
g_object_unref (simple);
g_object_unref (task);
}
static void
@ -666,13 +656,12 @@ splice_cb (GObject *ostream,
GAsyncResult *res,
gpointer user_data)
{
GSimpleAsyncResult *simple = user_data;
SpliceContext *ctx;
GTask *task = user_data;
SpliceContext *ctx = g_task_get_task_data (task);
GError *error = NULL;
g_output_stream_splice_finish (G_OUTPUT_STREAM (ostream), res, &error);
ctx = g_simple_async_result_get_op_res_gpointer (simple);
ctx->completed++;
/* ignore cancellation error if it was not requested by the user */
@ -706,32 +695,40 @@ splice_cb (GObject *ostream,
/* Close the IO streams if needed */
if ((ctx->flags & G_IO_STREAM_SPLICE_CLOSE_STREAM1) != 0)
g_io_stream_close_async (ctx->stream1, ctx->io_priority,
ctx->op1_cancellable, splice_close_cb, g_object_ref (simple));
{
g_io_stream_close_async (ctx->stream1,
g_task_get_priority (task),
ctx->op1_cancellable,
splice_close_cb, g_object_ref (task));
}
else
ctx->completed++;
if ((ctx->flags & G_IO_STREAM_SPLICE_CLOSE_STREAM2) != 0)
g_io_stream_close_async (ctx->stream2, ctx->io_priority,
ctx->op2_cancellable, splice_close_cb, g_object_ref (simple));
{
g_io_stream_close_async (ctx->stream2,
g_task_get_priority (task),
ctx->op2_cancellable,
splice_close_cb, g_object_ref (task));
}
else
ctx->completed++;
/* If all operations are done, complete now */
if (ctx->completed == 4)
splice_complete (simple, ctx);
splice_complete (task, ctx);
}
g_object_unref (simple);
g_object_unref (task);
}
static void
splice_cancelled_cb (GCancellable *cancellable,
GSimpleAsyncResult *simple)
splice_cancelled_cb (GCancellable *cancellable,
GTask *task)
{
SpliceContext *ctx;
ctx = g_simple_async_result_get_op_res_gpointer (simple);
ctx = g_task_get_task_data (task);
g_cancellable_cancel (ctx->op1_cancellable);
g_cancellable_cancel (ctx->op2_cancellable);
}
@ -765,16 +762,17 @@ g_io_stream_splice_async (GIOStream *stream1,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *simple;
GTask *task;
SpliceContext *ctx;
GInputStream *istream;
GOutputStream *ostream;
if (cancellable != NULL && g_cancellable_is_cancelled (cancellable))
{
g_simple_async_report_error_in_idle (NULL, callback,
user_data, G_IO_ERROR, G_IO_ERROR_CANCELLED,
"Operation has been cancelled");
g_task_report_new_error (NULL, callback, user_data,
g_io_stream_splice_async,
G_IO_ERROR, G_IO_ERROR_CANCELLED,
"Operation has been cancelled");
return;
}
@ -782,21 +780,18 @@ g_io_stream_splice_async (GIOStream *stream1,
ctx->stream1 = g_object_ref (stream1);
ctx->stream2 = g_object_ref (stream2);
ctx->flags = flags;
ctx->io_priority = io_priority;
ctx->op1_cancellable = g_cancellable_new ();
ctx->op2_cancellable = g_cancellable_new ();
ctx->completed = 0;
simple = g_simple_async_result_new (NULL, callback, user_data,
g_io_stream_splice_finish);
g_simple_async_result_set_op_res_gpointer (simple, ctx,
(GDestroyNotify) splice_context_free);
task = g_task_new (NULL, cancellable, callback, user_data);
g_task_set_task_data (task, ctx, (GDestroyNotify) splice_context_free);
if (cancellable != NULL)
{
ctx->cancellable = g_object_ref (cancellable);
ctx->cancelled_id = g_cancellable_connect (cancellable,
G_CALLBACK (splice_cancelled_cb), g_object_ref (simple),
G_CALLBACK (splice_cancelled_cb), g_object_ref (task),
g_object_unref);
}
@ -804,15 +799,15 @@ g_io_stream_splice_async (GIOStream *stream1,
ostream = g_io_stream_get_output_stream (stream2);
g_output_stream_splice_async (ostream, istream, G_OUTPUT_STREAM_SPLICE_NONE,
io_priority, ctx->op1_cancellable, splice_cb,
g_object_ref (simple));
g_object_ref (task));
istream = g_io_stream_get_input_stream (stream2);
ostream = g_io_stream_get_output_stream (stream1);
g_output_stream_splice_async (ostream, istream, G_OUTPUT_STREAM_SPLICE_NONE,
io_priority, ctx->op2_cancellable, splice_cb,
g_object_ref (simple));
g_object_ref (task));
g_object_unref (simple);
g_object_unref (task);
}
/**
@ -831,17 +826,7 @@ gboolean
g_io_stream_splice_finish (GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
g_return_val_if_fail (g_task_is_valid (result, NULL), FALSE);
g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE);
simple = G_SIMPLE_ASYNC_RESULT (result);
if (g_simple_async_result_propagate_error (simple, error))
return FALSE;
g_return_val_if_fail (g_simple_async_result_is_valid (result, NULL,
g_io_stream_splice_finish), FALSE);
return TRUE;
return g_task_propagate_boolean (G_TASK (result), error);
}

View File

@ -26,7 +26,7 @@
#include "ginputstream.h"
#include "gseekable.h"
#include "string.h"
#include "gsimpleasyncresult.h"
#include "gtask.h"
#include "gioerror.h"
#include "glibintl.h"
@ -377,17 +377,13 @@ g_memory_input_stream_skip_async (GInputStream *stream,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *simple;
GTask *task;
gssize nskipped;
nskipped = g_input_stream_skip (stream, count, cancellable, NULL);
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_memory_input_stream_skip_async);
g_simple_async_result_set_op_res_gssize (simple, nskipped);
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_return_int (task, nskipped);
g_object_unref (task);
}
static gssize
@ -395,14 +391,9 @@ g_memory_input_stream_skip_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
gssize nskipped;
g_return_val_if_fail (g_task_is_valid (result, stream), -1);
simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_memory_input_stream_skip_async);
nskipped = g_simple_async_result_get_op_res_gssize (simple);
return nskipped;
return g_task_propagate_int (G_TASK (result), error);
}
static void
@ -412,14 +403,11 @@ g_memory_input_stream_close_async (GInputStream *stream,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *simple;
GTask *task;
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
user_data,
g_memory_input_stream_close_async);
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
static gboolean

View File

@ -27,7 +27,7 @@
#include "goutputstream.h"
#include "gpollableoutputstream.h"
#include "gseekable.h"
#include "gsimpleasyncresult.h"
#include "gtask.h"
#include "gioerror.h"
#include "string.h"
#include "glibintl.h"
@ -665,19 +665,15 @@ g_memory_output_stream_close_async (GOutputStream *stream,
GAsyncReadyCallback callback,
gpointer data)
{
GSimpleAsyncResult *simple;
simple = g_simple_async_result_new (G_OBJECT (stream),
callback,
data,
g_memory_output_stream_close_async);
GTask *task;
task = g_task_new (stream, cancellable, callback, data);
/* will always return TRUE */
g_memory_output_stream_close (stream, cancellable, NULL);
g_simple_async_result_complete_in_idle (simple);
g_object_unref (simple);
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
static gboolean
@ -685,14 +681,9 @@ g_memory_output_stream_close_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple;
g_return_val_if_fail (g_task_is_valid (result, stream), -1);
simple = G_SIMPLE_ASYNC_RESULT (result);
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
g_memory_output_stream_close_async);
return TRUE;
return g_task_propagate_boolean (G_TASK (result), error);
}
static goffset

File diff suppressed because it is too large Load Diff

View File

@ -425,20 +425,16 @@ g_unix_input_stream_close (GInputStream *stream,
if (!unix_stream->priv->close_fd)
return TRUE;
while (1)
/* This might block during the close. Doesn't seem to be a way to avoid it though. */
res = close (unix_stream->priv->fd);
if (res == -1)
{
/* This might block during the close. Doesn't seem to be a way to avoid it though. */
res = close (unix_stream->priv->fd);
if (res == -1)
{
int errsv = errno;
int errsv = errno;
g_set_error (error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error closing file descriptor: %s"),
g_strerror (errsv));
}
break;
g_set_error (error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error closing file descriptor: %s"),
g_strerror (errsv));
}
return res != -1;
@ -466,71 +462,6 @@ g_unix_input_stream_skip_finish (GInputStream *stream,
/* TODO: Not implemented */
}
typedef struct {
GInputStream *stream;
GAsyncReadyCallback callback;
gpointer user_data;
} CloseAsyncData;
static void
close_async_data_free (gpointer _data)
{
CloseAsyncData *data = _data;
g_free (data);
}
static gboolean
close_async_cb (CloseAsyncData *data)
{
GUnixInputStream *unix_stream;
GSimpleAsyncResult *simple;
GError *error = NULL;
gboolean result;
int res;
unix_stream = G_UNIX_INPUT_STREAM (data->stream);
if (!unix_stream->priv->close_fd)
{
result = TRUE;
goto out;
}
while (1)
{
res = close (unix_stream->priv->fd);
if (res == -1)
{
int errsv = errno;
g_set_error (&error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error closing file descriptor: %s"),
g_strerror (errsv));
}
break;
}
result = res != -1;
out:
simple = g_simple_async_result_new (G_OBJECT (data->stream),
data->callback,
data->user_data,
g_unix_input_stream_close_async);
if (!result)
g_simple_async_result_take_error (simple, error);
/* Complete immediately, not in idle, since we're already in a mainloop callout */
g_simple_async_result_complete (simple);
g_object_unref (simple);
return FALSE;
}
static void
g_unix_input_stream_close_async (GInputStream *stream,
int io_priority,
@ -538,19 +469,17 @@ g_unix_input_stream_close_async (GInputStream *stream,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSource *idle;
CloseAsyncData *data;
GTask *task;
GError *error = NULL;
data = g_new0 (CloseAsyncData, 1);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_priority (task, io_priority);
data->stream = stream;
data->callback = callback;
data->user_data = user_data;
idle = g_idle_source_new ();
g_source_set_callback (idle, (GSourceFunc)close_async_cb, data, close_async_data_free);
g_source_attach (idle, g_main_context_get_thread_default ());
g_source_unref (idle);
if (g_unix_input_stream_close (stream, cancellable, &error))
g_task_return_boolean (task, TRUE);
else
g_task_return_error (task, error);
g_object_unref (task);
}
static gboolean
@ -558,8 +487,9 @@ g_unix_input_stream_close_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
/* Failures handled in generic close_finish code */
return TRUE;
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
return g_task_propagate_boolean (G_TASK (result), error);
}
static gboolean

View File

@ -411,101 +411,39 @@ g_unix_output_stream_close (GOutputStream *stream,
if (!unix_stream->priv->close_fd)
return TRUE;
while (1)
/* This might block during the close. Doesn't seem to be a way to avoid it though. */
res = close (unix_stream->priv->fd);
if (res == -1)
{
/* This might block during the close. Doesn't seem to be a way to avoid it though. */
res = close (unix_stream->priv->fd);
if (res == -1)
{
int errsv = errno;
int errsv = errno;
g_set_error (error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error closing file descriptor: %s"),
g_strerror (errsv));
}
break;
g_set_error (error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error closing file descriptor: %s"),
g_strerror (errsv));
}
return res != -1;
}
typedef struct {
GOutputStream *stream;
GAsyncReadyCallback callback;
gpointer user_data;
} CloseAsyncData;
static gboolean
close_async_cb (CloseAsyncData *data)
{
GUnixOutputStream *unix_stream;
GSimpleAsyncResult *simple;
GError *error = NULL;
gboolean result;
int res;
unix_stream = G_UNIX_OUTPUT_STREAM (data->stream);
if (!unix_stream->priv->close_fd)
{
result = TRUE;
goto out;
}
while (1)
{
res = close (unix_stream->priv->fd);
if (res == -1)
{
int errsv = errno;
g_set_error (&error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error closing file descriptor: %s"),
g_strerror (errsv));
}
break;
}
result = res != -1;
out:
simple = g_simple_async_result_new (G_OBJECT (data->stream),
data->callback,
data->user_data,
g_unix_output_stream_close_async);
if (!result)
g_simple_async_result_take_error (simple, error);
/* Complete immediately, not in idle, since we're already in a mainloop callout */
g_simple_async_result_complete (simple);
g_object_unref (simple);
return FALSE;
}
static void
g_unix_output_stream_close_async (GOutputStream *stream,
g_unix_output_stream_close_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSource *idle;
CloseAsyncData *data;
GTask *task;
GError *error = NULL;
data = g_new0 (CloseAsyncData, 1);
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_priority (task, io_priority);
data->stream = stream;
data->callback = callback;
data->user_data = user_data;
idle = g_idle_source_new ();
g_source_set_callback (idle, (GSourceFunc)close_async_cb, data, g_free);
g_source_attach (idle, g_main_context_get_thread_default ());
g_source_unref (idle);
if (g_unix_output_stream_close (stream, cancellable, &error))
g_task_return_boolean (task, TRUE);
else
g_task_return_error (task, error);
g_object_unref (task);
}
static gboolean
@ -513,8 +451,9 @@ g_unix_output_stream_close_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error)
{
/* Failures handled in generic close_finish code */
return TRUE;
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
return g_task_propagate_boolean (G_TASK (result), error);
}
static gboolean