From 00ee06e6a332d1415baf5533e34f05a83d64cb02 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sat, 4 Feb 2012 16:46:29 -0500 Subject: [PATCH] gio: use GPollable* to implement fallback read_async/write_async If a GInputStream does not provide a read_async() implementation, but does implement GPollableInputStream, then instead of doing read-synchronously-in-a-thread, just use g_pollable_input_stream_read_nonblocking() and g_pollable_input_stream_create_source() to implement an async read in the same thread. Similarly for GOutputStream. Remove a bunch of existing read_async()/write_async() implementations that are basically equivalent to the new fallback method. https://bugzilla.gnome.org/show_bug.cgi?id=673997 --- gio/gbufferedinputstream.c | 195 ------------------------------------ gio/gbufferedoutputstream.c | 108 -------------------- gio/ginputstream.c | 71 ++++++++++++- gio/gmemoryinputstream.c | 57 ----------- gio/gmemoryoutputstream.c | 62 ------------ gio/goutputstream.c | 66 +++++++++++- gio/gsocketinputstream.c | 91 ----------------- gio/gsocketoutputstream.c | 117 +++------------------- gio/gunixinputstream.c | 143 ++------------------------ gio/gunixoutputstream.c | 135 ------------------------- 10 files changed, 153 insertions(+), 892 deletions(-) diff --git a/gio/gbufferedinputstream.c b/gio/gbufferedinputstream.c index dbe96c770..e62a3dedf 100644 --- a/gio/gbufferedinputstream.c +++ b/gio/gbufferedinputstream.c @@ -100,16 +100,6 @@ static gssize g_buffered_input_stream_read (GInputStream *s gsize count, GCancellable *cancellable, GError **error); -static void g_buffered_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); -static gssize g_buffered_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream, gssize count, GCancellable *cancellable, @@ -150,8 +140,6 @@ g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass) istream_class->skip_async = g_buffered_input_stream_skip_async; istream_class->skip_finish = g_buffered_input_stream_skip_finish; istream_class->read_fn = g_buffered_input_stream_read; - istream_class->read_async = g_buffered_input_stream_read_async; - istream_class->read_finish = g_buffered_input_stream_read_finish; bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass); bstream_class->fill = g_buffered_input_stream_real_fill; @@ -1017,189 +1005,6 @@ g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream, return nread; } -typedef struct -{ - gssize bytes_read; - gssize count; - void *buffer; -} ReadAsyncData; - -static void -free_read_async_data (gpointer _data) -{ - ReadAsyncData *data = _data; - g_slice_free (ReadAsyncData, data); -} - -static void -large_read_callback (GObject *source_object, - GAsyncResult *result, - gpointer user_data) -{ - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); - ReadAsyncData *data; - GError *error; - gssize nread; - - data = g_simple_async_result_get_op_res_gpointer (simple); - - error = NULL; - nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object), - result, &error); - - /* Only report the error if we've not already read some data */ - if (nread < 0 && data->bytes_read == 0) - g_simple_async_result_take_error (simple, error); - else if (error) - g_error_free (error); - - if (nread > 0) - data->bytes_read += nread; - - /* Complete immediately, not in idle, since we're already - * in a mainloop callout - */ - g_simple_async_result_complete (simple); - g_object_unref (simple); -} - -static void -read_fill_buffer_callback (GObject *source_object, - GAsyncResult *result, - gpointer user_data) -{ - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); - GBufferedInputStream *bstream; - GBufferedInputStreamPrivate *priv; - ReadAsyncData *data; - GError *error; - gssize nread; - gsize available; - - bstream = G_BUFFERED_INPUT_STREAM (source_object); - priv = bstream->priv; - - data = g_simple_async_result_get_op_res_gpointer (simple); - - error = NULL; - nread = g_buffered_input_stream_fill_finish (bstream, - result, &error); - - if (nread < 0 && data->bytes_read == 0) - g_simple_async_result_take_error (simple, error); - else if (error) - g_error_free (error); - - if (nread > 0) - { - available = priv->end - priv->pos; - data->count = MIN (data->count, available); - - memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count); - data->bytes_read += data->count; - priv->pos += data->count; - } - - /* Complete immediately, not in idle, since we're already - * in a mainloop callout - */ - g_simple_async_result_complete (simple); - g_object_unref (simple); -} - -static void -g_buffered_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - GBufferedInputStream *bstream; - GBufferedInputStreamPrivate *priv; - GBufferedInputStreamClass *class; - GInputStream *base_stream; - gsize available; - GSimpleAsyncResult *simple; - ReadAsyncData *data; - - bstream = G_BUFFERED_INPUT_STREAM (stream); - priv = bstream->priv; - - data = g_slice_new (ReadAsyncData); - data->buffer = buffer; - data->bytes_read = 0; - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, user_data, - g_buffered_input_stream_read_async); - g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data); - - available = priv->end - priv->pos; - - if (count <= available) - { - memcpy (buffer, priv->buffer + priv->pos, count); - priv->pos += count; - data->bytes_read = count; - - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); - return; - } - - - /* Full request not available, read all currently available - * and request refill for more - */ - - memcpy (buffer, priv->buffer + priv->pos, available); - priv->pos = 0; - priv->end = 0; - - count -= available; - - data->bytes_read = available; - data->count = count; - - if (count > priv->len) - { - /* Large request, shortcut buffer */ - - base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; - - g_input_stream_read_async (base_stream, - (char *)buffer + data->bytes_read, - count, - io_priority, cancellable, - large_read_callback, - simple); - } - else - { - class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); - class->fill_async (bstream, priv->len, io_priority, cancellable, - read_fill_buffer_callback, simple); - } -} - -static gssize -g_buffered_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error) -{ - GSimpleAsyncResult *simple; - ReadAsyncData *data; - - simple = G_SIMPLE_ASYNC_RESULT (result); - - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async); - - data = g_simple_async_result_get_op_res_gpointer (simple); - - return data->bytes_read; -} - typedef struct { gssize bytes_skipped; diff --git a/gio/gbufferedoutputstream.c b/gio/gbufferedoutputstream.c index df8178e05..f624d2521 100644 --- a/gio/gbufferedoutputstream.c +++ b/gio/gbufferedoutputstream.c @@ -88,16 +88,6 @@ static gboolean g_buffered_output_stream_close (GOutputStream *stream, GCancellable *cancellable, GError **error); -static void g_buffered_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gssize g_buffered_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error); static void g_buffered_output_stream_flush_async (GOutputStream *stream, int io_priority, GCancellable *cancellable, @@ -137,8 +127,6 @@ g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass) ostream_class->write_fn = g_buffered_output_stream_write; ostream_class->flush = g_buffered_output_stream_flush; ostream_class->close_fn = g_buffered_output_stream_close; - ostream_class->write_async = g_buffered_output_stream_write_async; - ostream_class->write_finish = g_buffered_output_stream_write_finish; ostream_class->flush_async = g_buffered_output_stream_flush_async; ostream_class->flush_finish = g_buffered_output_stream_flush_finish; ostream_class->close_async = g_buffered_output_stream_close_async; @@ -578,102 +566,6 @@ flush_buffer_thread (GSimpleAsyncResult *result, g_simple_async_result_take_error (result, error); } -typedef struct { - - FlushData fdata; - - gsize count; - const void *buffer; - -} WriteData; - -static void -free_write_data (gpointer data) -{ - g_slice_free (WriteData, data); -} - -static void -g_buffered_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data) -{ - GBufferedOutputStream *buffered_stream; - GBufferedOutputStreamPrivate *priv; - GSimpleAsyncResult *res; - WriteData *wdata; - - buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); - priv = buffered_stream->priv; - - wdata = g_slice_new (WriteData); - wdata->count = count; - wdata->buffer = buffer; - - res = g_simple_async_result_new (G_OBJECT (stream), - callback, - data, - g_buffered_output_stream_write_async); - - g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data); - - /* if we have space left directly call the - * callback (from idle) otherwise schedule a buffer - * flush in the thread. In both cases the actual - * copying of the data to the buffer will be done in - * the write_finish () func since that should - * be fast enough */ - if (priv->len - priv->pos > 0) - { - g_simple_async_result_complete_in_idle (res); - } - else - { - wdata->fdata.flush_stream = FALSE; - wdata->fdata.close_stream = FALSE; - g_simple_async_result_run_in_thread (res, - flush_buffer_thread, - io_priority, - cancellable); - } - g_object_unref (res); -} - -static gssize -g_buffered_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error) -{ - GBufferedOutputStreamPrivate *priv; - GBufferedOutputStream *buffered_stream; - GSimpleAsyncResult *simple; - WriteData *wdata; - gssize count; - - simple = G_SIMPLE_ASYNC_RESULT (result); - buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); - priv = buffered_stream->priv; - - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == - g_buffered_output_stream_write_async); - - wdata = g_simple_async_result_get_op_res_gpointer (simple); - - /* Now do the real copying of data to the buffer */ - count = priv->len - priv->pos; - count = MIN (wdata->count, count); - - memcpy (priv->buffer + priv->pos, wdata->buffer, count); - - priv->pos += count; - - return count; -} - static void g_buffered_output_stream_flush_async (GOutputStream *stream, int io_priority, diff --git a/gio/ginputstream.c b/gio/ginputstream.c index 7160c232f..c410d525e 100644 --- a/gio/ginputstream.c +++ b/gio/ginputstream.c @@ -30,7 +30,7 @@ #include "gasyncresult.h" #include "gsimpleasyncresult.h" #include "gioerror.h" - +#include "gpollableinputstream.h" /** * SECTION:ginputstream @@ -925,6 +925,10 @@ typedef struct { void *buffer; gsize count_requested; gssize count_read; + + GCancellable *cancellable; + gint io_priority; + gboolean need_idle; } ReadData; static void @@ -947,6 +951,60 @@ read_async_thread (GSimpleAsyncResult *res, g_simple_async_result_take_error (res, error); } +static void read_async_pollable (GPollableInputStream *stream, + GSimpleAsyncResult *result); + +static gboolean +read_async_pollable_ready (GPollableInputStream *stream, + gpointer user_data) +{ + GSimpleAsyncResult *result = user_data; + + read_async_pollable (stream, result); + return FALSE; +} + +static void +read_async_pollable (GPollableInputStream *stream, + GSimpleAsyncResult *result) +{ + GError *error = NULL; + ReadData *op = g_simple_async_result_get_op_res_gpointer (result); + + if (g_cancellable_set_error_if_cancelled (op->cancellable, &error)) + op->count_read = -1; + else + { + op->count_read = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)-> + read_nonblocking (stream, op->buffer, op->count_requested, &error); + } + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + { + GSource *source; + + g_error_free (error); + op->need_idle = FALSE; + + source = g_pollable_input_stream_create_source (stream, op->cancellable); + g_source_set_callback (source, + (GSourceFunc) read_async_pollable_ready, + g_object_ref (result), g_object_unref); + g_source_set_priority (source, op->io_priority); + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); + return; + } + + if (op->count_read == -1) + g_simple_async_result_take_error (result, error); + + if (op->need_idle) + g_simple_async_result_complete_in_idle (result); + else + g_simple_async_result_complete (result); +} + static void g_input_stream_real_read_async (GInputStream *stream, void *buffer, @@ -964,8 +1022,15 @@ g_input_stream_real_read_async (GInputStream *stream, g_simple_async_result_set_op_res_gpointer (res, op, g_free); op->buffer = buffer; op->count_requested = count; - - g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable); + op->cancellable = cancellable ? g_object_ref (cancellable) : NULL; + op->io_priority = io_priority; + op->need_idle = TRUE; + + if (G_IS_POLLABLE_INPUT_STREAM (stream) && + g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))) + read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), res); + else + g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable); g_object_unref (res); } diff --git a/gio/gmemoryinputstream.c b/gio/gmemoryinputstream.c index e657d5b0d..dac0ac166 100644 --- a/gio/gmemoryinputstream.c +++ b/gio/gmemoryinputstream.c @@ -70,16 +70,6 @@ static gssize g_memory_input_stream_skip (GInputStream *stream static gboolean g_memory_input_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error); -static void g_memory_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); -static gssize g_memory_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); static void g_memory_input_stream_skip_async (GInputStream *stream, gsize count, int io_priority, @@ -143,8 +133,6 @@ g_memory_input_stream_class_init (GMemoryInputStreamClass *klass) istream_class->skip = g_memory_input_stream_skip; istream_class->close_fn = g_memory_input_stream_close; - istream_class->read_async = g_memory_input_stream_read_async; - istream_class->read_finish = g_memory_input_stream_read_finish; istream_class->skip_async = g_memory_input_stream_skip_async; istream_class->skip_finish = g_memory_input_stream_skip_finish; istream_class->close_async = g_memory_input_stream_close_async; @@ -352,51 +340,6 @@ g_memory_input_stream_close (GInputStream *stream, return TRUE; } -static void -g_memory_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - GSimpleAsyncResult *simple; - GError *error = NULL; - gssize nread; - - nread = G_INPUT_STREAM_GET_CLASS (stream)->read_fn (stream, - buffer, - count, - cancellable, - &error); - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, - user_data, - g_memory_input_stream_read_async); - if (error) - g_simple_async_result_take_error (simple, error); - else - g_simple_async_result_set_op_res_gssize (simple, nread); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); -} - -static gssize -g_memory_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error) -{ - GSimpleAsyncResult *simple; - gssize nread; - - simple = G_SIMPLE_ASYNC_RESULT (result); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_memory_input_stream_read_async); - - nread = g_simple_async_result_get_op_res_gssize (simple); - return nread; -} - static void g_memory_input_stream_skip_async (GInputStream *stream, gsize count, diff --git a/gio/gmemoryoutputstream.c b/gio/gmemoryoutputstream.c index b1da60d1d..5a62fbbcd 100644 --- a/gio/gmemoryoutputstream.c +++ b/gio/gmemoryoutputstream.c @@ -89,16 +89,6 @@ static gboolean g_memory_output_stream_close (GOutputStream *stream, GCancellable *cancellable, GError **error); -static void g_memory_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gssize g_memory_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error); static void g_memory_output_stream_close_async (GOutputStream *stream, int io_priority, GCancellable *cancellable, @@ -152,8 +142,6 @@ g_memory_output_stream_class_init (GMemoryOutputStreamClass *klass) ostream_class->write_fn = g_memory_output_stream_write; ostream_class->close_fn = g_memory_output_stream_close; - ostream_class->write_async = g_memory_output_stream_write_async; - ostream_class->write_finish = g_memory_output_stream_write_finish; ostream_class->close_async = g_memory_output_stream_close_async; ostream_class->close_finish = g_memory_output_stream_close_finish; @@ -628,56 +616,6 @@ g_memory_output_stream_close (GOutputStream *stream, return TRUE; } -static void -g_memory_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data) -{ - GSimpleAsyncResult *simple; - GError *error = NULL; - gssize nwritten; - - nwritten = G_OUTPUT_STREAM_GET_CLASS (stream)->write_fn (stream, - buffer, - count, - cancellable, - &error); - - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, - data, - g_memory_output_stream_write_async); - - if (error) - g_simple_async_result_take_error (simple, error); - else - g_simple_async_result_set_op_res_gssize (simple, nwritten); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); -} - -static gssize -g_memory_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error) -{ - GSimpleAsyncResult *simple; - gssize nwritten; - - simple = G_SIMPLE_ASYNC_RESULT (result); - - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == - g_memory_output_stream_write_async); - - nwritten = g_simple_async_result_get_op_res_gssize (simple); - - return nwritten; -} - static void g_memory_output_stream_close_async (GOutputStream *stream, int io_priority, diff --git a/gio/goutputstream.c b/gio/goutputstream.c index d9710467f..9d3815db5 100644 --- a/gio/goutputstream.c +++ b/gio/goutputstream.c @@ -28,7 +28,7 @@ #include "ginputstream.h" #include "gioerror.h" #include "glibintl.h" - +#include "gpollableoutputstream.h" /** * SECTION:goutputstream @@ -1266,6 +1266,10 @@ typedef struct { const void *buffer; gsize count_requested; gssize count_written; + + GCancellable *cancellable; + gint io_priority; + gboolean need_idle; } WriteData; static void @@ -1285,6 +1289,60 @@ write_async_thread (GSimpleAsyncResult *res, g_simple_async_result_take_error (res, error); } +static void write_async_pollable (GPollableOutputStream *stream, + GSimpleAsyncResult *result); + +static gboolean +write_async_pollable_ready (GPollableOutputStream *stream, + gpointer user_data) +{ + GSimpleAsyncResult *result = user_data; + + write_async_pollable (stream, result); + return FALSE; +} + +static void +write_async_pollable (GPollableOutputStream *stream, + GSimpleAsyncResult *result) +{ + GError *error = NULL; + WriteData *op = g_simple_async_result_get_op_res_gpointer (result); + + if (g_cancellable_set_error_if_cancelled (op->cancellable, &error)) + op->count_written = -1; + else + { + op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)-> + write_nonblocking (stream, op->buffer, op->count_requested, &error); + } + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + { + GSource *source; + + g_error_free (error); + op->need_idle = FALSE; + + source = g_pollable_output_stream_create_source (stream, op->cancellable); + g_source_set_callback (source, + (GSourceFunc) write_async_pollable_ready, + g_object_ref (result), g_object_unref); + g_source_set_priority (source, op->io_priority); + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); + return; + } + + if (op->count_written == -1) + g_simple_async_result_take_error (result, error); + + if (op->need_idle) + g_simple_async_result_complete_in_idle (result); + else + g_simple_async_result_complete (result); +} + static void g_output_stream_real_write_async (GOutputStream *stream, const void *buffer, @@ -1303,7 +1361,11 @@ g_output_stream_real_write_async (GOutputStream *stream, op->buffer = buffer; op->count_requested = count; - g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable); + if (G_IS_POLLABLE_OUTPUT_STREAM (stream) && + g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))) + write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res); + else + g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable); g_object_unref (res); } diff --git a/gio/gsocketinputstream.c b/gio/gsocketinputstream.c index 37366169f..89e8a84ba 100644 --- a/gio/gsocketinputstream.c +++ b/gio/gsocketinputstream.c @@ -132,95 +132,6 @@ g_socket_input_stream_read (GInputStream *stream, cancellable, error); } -static gboolean -g_socket_input_stream_read_ready (GSocket *socket, - GIOCondition condition, - GSocketInputStream *stream) -{ - GSimpleAsyncResult *simple; - GError *error = NULL; - gssize result; - - result = g_socket_receive_with_blocking (stream->priv->socket, - stream->priv->buffer, - stream->priv->count, - FALSE, - stream->priv->cancellable, - &error); - - if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) - return TRUE; - - simple = stream->priv->result; - stream->priv->result = NULL; - - if (result >= 0) - g_simple_async_result_set_op_res_gssize (simple, result); - - if (error) - g_simple_async_result_take_error (simple, error); - - if (stream->priv->cancellable) - g_object_unref (stream->priv->cancellable); - - g_simple_async_result_complete (simple); - g_object_unref (simple); - - return FALSE; -} - -static void -g_socket_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - gint io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream); - GSource *source; - - g_assert (input_stream->priv->result == NULL); - - input_stream->priv->result = - g_simple_async_result_new (G_OBJECT (stream), callback, user_data, - g_socket_input_stream_read_async); - if (cancellable) - g_object_ref (cancellable); - input_stream->priv->cancellable = cancellable; - input_stream->priv->buffer = buffer; - input_stream->priv->count = count; - - source = g_socket_create_source (input_stream->priv->socket, - G_IO_IN | G_IO_HUP | G_IO_ERR, - cancellable); - g_source_set_callback (source, - (GSourceFunc) g_socket_input_stream_read_ready, - g_object_ref (input_stream), g_object_unref); - g_source_attach (source, g_main_context_get_thread_default ()); - g_source_unref (source); -} - -static gssize -g_socket_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error) -{ - GSimpleAsyncResult *simple; - gssize count; - - g_return_val_if_fail (G_IS_SOCKET_INPUT_STREAM (stream), -1); - - simple = G_SIMPLE_ASYNC_RESULT (result); - - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_input_stream_read_async); - - count = g_simple_async_result_get_op_res_gssize (simple); - - return count; -} - static gboolean g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable) { @@ -282,8 +193,6 @@ g_socket_input_stream_class_init (GSocketInputStreamClass *klass) gobject_class->set_property = g_socket_input_stream_set_property; ginputstream_class->read_fn = g_socket_input_stream_read; - ginputstream_class->read_async = g_socket_input_stream_read_async; - ginputstream_class->read_finish = g_socket_input_stream_read_finish; g_object_class_install_property (gobject_class, PROP_SOCKET, g_param_spec_object ("socket", diff --git a/gio/gsocketoutputstream.c b/gio/gsocketoutputstream.c index 2320b173c..145f00986 100644 --- a/gio/gsocketoutputstream.c +++ b/gio/gsocketoutputstream.c @@ -136,95 +136,6 @@ g_socket_output_stream_write (GOutputStream *stream, cancellable, error); } -static gboolean -g_socket_output_stream_write_ready (GSocket *socket, - GIOCondition condition, - GSocketOutputStream *stream) -{ - GSimpleAsyncResult *simple; - GError *error = NULL; - gssize result; - - result = g_socket_send_with_blocking (stream->priv->socket, - stream->priv->buffer, - stream->priv->count, - FALSE, - stream->priv->cancellable, - &error); - - if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) - return TRUE; - - simple = stream->priv->result; - stream->priv->result = NULL; - - if (result >= 0) - g_simple_async_result_set_op_res_gssize (simple, result); - - if (error) - g_simple_async_result_take_error (simple, error); - - if (stream->priv->cancellable) - g_object_unref (stream->priv->cancellable); - - g_simple_async_result_complete (simple); - g_object_unref (simple); - - return FALSE; -} - -static void -g_socket_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - gint io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (stream); - GSource *source; - - g_assert (output_stream->priv->result == NULL); - - output_stream->priv->result = - g_simple_async_result_new (G_OBJECT (stream), callback, user_data, - g_socket_output_stream_write_async); - if (cancellable) - g_object_ref (cancellable); - output_stream->priv->cancellable = cancellable; - output_stream->priv->buffer = buffer; - output_stream->priv->count = count; - - source = g_socket_create_source (output_stream->priv->socket, - G_IO_OUT | G_IO_HUP | G_IO_ERR, - cancellable); - g_source_set_callback (source, - (GSourceFunc) g_socket_output_stream_write_ready, - g_object_ref (output_stream), g_object_unref); - g_source_attach (source, g_main_context_get_thread_default ()); - g_source_unref (source); -} - -static gssize -g_socket_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error) -{ - GSimpleAsyncResult *simple; - gssize count; - - g_return_val_if_fail (G_IS_SOCKET_OUTPUT_STREAM (stream), -1); - - simple = G_SIMPLE_ASYNC_RESULT (result); - - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_output_stream_write_async); - - count = g_simple_async_result_get_op_res_gssize (simple); - - return count; -} - static gboolean g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable) { @@ -233,6 +144,19 @@ g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable) return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT); } +static gssize +g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable, + const void *buffer, + gsize size, + GError **error) +{ + GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable); + + return g_socket_send_with_blocking (output_stream->priv->socket, + buffer, size, FALSE, + NULL, error); +} + static GSource * g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable, GCancellable *cancellable) @@ -250,19 +174,6 @@ g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable, return pollable_source; } -static gssize -g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable, - const void *buffer, - gsize size, - GError **error) -{ - GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable); - - return g_socket_send_with_blocking (output_stream->priv->socket, - buffer, size, FALSE, - NULL, error); -} - #ifdef G_OS_UNIX static int g_socket_output_stream_get_fd (GFileDescriptorBased *fd_based) @@ -286,8 +197,6 @@ g_socket_output_stream_class_init (GSocketOutputStreamClass *klass) gobject_class->set_property = g_socket_output_stream_set_property; goutputstream_class->write_fn = g_socket_output_stream_write; - goutputstream_class->write_async = g_socket_output_stream_write_async; - goutputstream_class->write_finish = g_socket_output_stream_write_finish; g_object_class_install_property (gobject_class, PROP_SOCKET, g_param_spec_object ("socket", diff --git a/gio/gunixinputstream.c b/gio/gunixinputstream.c index 65a0dbdca..a1639f6df 100644 --- a/gio/gunixinputstream.c +++ b/gio/gunixinputstream.c @@ -95,16 +95,6 @@ static gssize g_unix_input_stream_read (GInputStream *stream, static gboolean g_unix_input_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error); -static void g_unix_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gssize g_unix_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); static void g_unix_input_stream_skip_async (GInputStream *stream, gsize count, int io_priority, @@ -123,6 +113,7 @@ static gboolean g_unix_input_stream_close_finish (GInputStream *stream, GAsyncResult *result, GError **error); +static gboolean g_unix_input_stream_pollable_can_poll (GPollableInputStream *stream); static gboolean g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream); static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream, GCancellable *cancellable); @@ -147,8 +138,6 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass) stream_class->read_fn = g_unix_input_stream_read; stream_class->close_fn = g_unix_input_stream_close; - stream_class->read_async = g_unix_input_stream_read_async; - stream_class->read_finish = g_unix_input_stream_read_finish; if (0) { /* TODO: Implement instead of using fallbacks */ @@ -192,6 +181,7 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass) static void g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface) { + iface->can_poll = g_unix_input_stream_pollable_can_poll; iface->is_readable = g_unix_input_stream_pollable_is_readable; iface->create_source = g_unix_input_stream_pollable_create_source; } @@ -454,129 +444,6 @@ g_unix_input_stream_close (GInputStream *stream, return res != -1; } -typedef struct { - gsize count; - void *buffer; - GAsyncReadyCallback callback; - gpointer user_data; - GCancellable *cancellable; - GUnixInputStream *stream; -} ReadAsyncData; - -static gboolean -read_async_cb (int fd, - GIOCondition condition, - ReadAsyncData *data) -{ - GSimpleAsyncResult *simple; - GError *error = NULL; - gssize count_read; - - /* We know that we can read from fd once without blocking */ - while (1) - { - if (g_cancellable_set_error_if_cancelled (data->cancellable, &error)) - { - count_read = -1; - break; - } - count_read = read (data->stream->priv->fd, data->buffer, data->count); - if (count_read == -1) - { - int errsv = errno; - - if (errsv == EINTR || errsv == EAGAIN) - return TRUE; - - g_set_error (&error, G_IO_ERROR, - g_io_error_from_errno (errsv), - _("Error reading from file descriptor: %s"), - g_strerror (errsv)); - } - break; - } - - simple = g_simple_async_result_new (G_OBJECT (data->stream), - data->callback, - data->user_data, - g_unix_input_stream_read_async); - - g_simple_async_result_set_op_res_gssize (simple, count_read); - - if (count_read == -1) - g_simple_async_result_take_error (simple, error); - - /* Complete immediately, not in idle, since we're already in a mainloop callout */ - g_simple_async_result_complete (simple); - g_object_unref (simple); - - return FALSE; -} - -static void -g_unix_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - GSource *source; - GUnixInputStream *unix_stream; - ReadAsyncData *data; - - unix_stream = G_UNIX_INPUT_STREAM (stream); - - if (!unix_stream->priv->is_pipe_or_socket) - { - G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)-> - read_async (stream, buffer, count, io_priority, - cancellable, callback, user_data); - return; - } - - data = g_new0 (ReadAsyncData, 1); - data->count = count; - data->buffer = buffer; - data->callback = callback; - data->user_data = user_data; - data->cancellable = cancellable; - data->stream = unix_stream; - - source = _g_fd_source_new (unix_stream->priv->fd, - G_IO_IN, - cancellable); - g_source_set_name (source, "GUnixInputStream"); - - g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free); - g_source_attach (source, g_main_context_get_thread_default ()); - - g_source_unref (source); -} - -static gssize -g_unix_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error) -{ - GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream); - GSimpleAsyncResult *simple; - gssize nread; - - if (!unix_stream->priv->is_pipe_or_socket) - { - return G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)-> - read_finish (stream, result, error); - } - - simple = G_SIMPLE_ASYNC_RESULT (result); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_input_stream_read_async); - - nread = g_simple_async_result_get_op_res_gssize (simple); - return nread; -} - static void g_unix_input_stream_skip_async (GInputStream *stream, gsize count, @@ -695,6 +562,12 @@ g_unix_input_stream_close_finish (GInputStream *stream, return TRUE; } +static gboolean +g_unix_input_stream_pollable_can_poll (GPollableInputStream *stream) +{ + return G_UNIX_INPUT_STREAM (stream)->priv->is_pipe_or_socket; +} + static gboolean g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream) { diff --git a/gio/gunixoutputstream.c b/gio/gunixoutputstream.c index 86f527777..abd14a4aa 100644 --- a/gio/gunixoutputstream.c +++ b/gio/gunixoutputstream.c @@ -95,16 +95,6 @@ static gssize g_unix_output_stream_write (GOutputStream *stream, static gboolean g_unix_output_stream_close (GOutputStream *stream, GCancellable *cancellable, GError **error); -static void g_unix_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gssize g_unix_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error); static void g_unix_output_stream_close_async (GOutputStream *stream, int io_priority, GCancellable *cancellable, @@ -138,8 +128,6 @@ g_unix_output_stream_class_init (GUnixOutputStreamClass *klass) stream_class->write_fn = g_unix_output_stream_write; stream_class->close_fn = g_unix_output_stream_close; - stream_class->write_async = g_unix_output_stream_write_async; - stream_class->write_finish = g_unix_output_stream_write_finish; stream_class->close_async = g_unix_output_stream_close_async; stream_class->close_finish = g_unix_output_stream_close_finish; @@ -440,129 +428,6 @@ g_unix_output_stream_close (GOutputStream *stream, return res != -1; } -typedef struct { - gsize count; - const void *buffer; - GAsyncReadyCallback callback; - gpointer user_data; - GCancellable *cancellable; - GUnixOutputStream *stream; -} WriteAsyncData; - -static gboolean -write_async_cb (int fd, - GIOCondition condition, - WriteAsyncData *data) -{ - GSimpleAsyncResult *simple; - GError *error = NULL; - gssize count_written; - - while (1) - { - if (g_cancellable_set_error_if_cancelled (data->cancellable, &error)) - { - count_written = -1; - break; - } - - count_written = write (data->stream->priv->fd, data->buffer, data->count); - if (count_written == -1) - { - int errsv = errno; - - if (errsv == EINTR || errsv == EAGAIN) - return TRUE; - - g_set_error (&error, G_IO_ERROR, - g_io_error_from_errno (errsv), - _("Error writing to file descriptor: %s"), - g_strerror (errsv)); - } - break; - } - - simple = g_simple_async_result_new (G_OBJECT (data->stream), - data->callback, - data->user_data, - g_unix_output_stream_write_async); - - g_simple_async_result_set_op_res_gssize (simple, count_written); - - if (count_written == -1) - g_simple_async_result_take_error (simple, error); - - /* Complete immediately, not in idle, since we're already in a mainloop callout */ - g_simple_async_result_complete (simple); - g_object_unref (simple); - - return FALSE; -} - -static void -g_unix_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - GSource *source; - GUnixOutputStream *unix_stream; - WriteAsyncData *data; - - unix_stream = G_UNIX_OUTPUT_STREAM (stream); - - if (!unix_stream->priv->is_pipe_or_socket) - { - G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)-> - write_async (stream, buffer, count, io_priority, - cancellable, callback, user_data); - return; - } - - data = g_new0 (WriteAsyncData, 1); - data->count = count; - data->buffer = buffer; - data->callback = callback; - data->user_data = user_data; - data->cancellable = cancellable; - data->stream = unix_stream; - - source = _g_fd_source_new (unix_stream->priv->fd, - G_IO_OUT, - cancellable); - g_source_set_name (source, "GUnixOutputStream"); - - g_source_set_callback (source, (GSourceFunc)write_async_cb, data, g_free); - g_source_attach (source, g_main_context_get_thread_default ()); - - g_source_unref (source); -} - -static gssize -g_unix_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error) -{ - GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream); - GSimpleAsyncResult *simple; - gssize nwritten; - - if (!unix_stream->priv->is_pipe_or_socket) - { - return G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)-> - write_finish (stream, result, error); - } - - simple = G_SIMPLE_ASYNC_RESULT (result); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_output_stream_write_async); - - nwritten = g_simple_async_result_get_op_res_gssize (simple); - return nwritten; -} - typedef struct { GOutputStream *stream; GAsyncReadyCallback callback;