From 0bcc1773781264f7fae146dbabb1305af1da44f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 13 Sep 2018 13:10:36 +0300 Subject: [PATCH] Add writev() / writev_all() API to GOutputStream and GPollableOutputStream This comes with default implementations around the normal write functions and async variants. Fixes https://gitlab.gnome.org/GNOME/glib/issues/1431 --- docs/reference/gio/gio-sections.txt | 8 + gio/gioprivate.h | 1 + gio/goutputstream.c | 793 +++++++++++++++++++++++++++- gio/goutputstream.h | 68 ++- gio/gpollableoutputstream.c | 172 +++++- gio/gpollableoutputstream.h | 13 + 6 files changed, 1043 insertions(+), 12 deletions(-) diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt index 3f2dd9a18..e61001f68 100644 --- a/docs/reference/gio/gio-sections.txt +++ b/docs/reference/gio/gio-sections.txt @@ -843,6 +843,12 @@ g_output_stream_write g_output_stream_write_all g_output_stream_write_all_async g_output_stream_write_all_finish +g_output_stream_writev +g_output_stream_writev_all +g_output_stream_writev_async +g_output_stream_writev_finish +g_output_stream_writev_all_async +g_output_stream_writev_all_finish g_output_stream_splice g_output_stream_flush g_output_stream_close @@ -2124,6 +2130,7 @@ g_socket_receive_with_blocking g_socket_send g_socket_send_to g_socket_send_message +g_socket_send_message_with_timeout g_socket_send_messages g_socket_send_with_blocking g_socket_close @@ -3648,6 +3655,7 @@ g_pollable_output_stream_can_poll g_pollable_output_stream_is_writable g_pollable_output_stream_create_source g_pollable_output_stream_write_nonblocking +g_pollable_output_stream_writev_nonblocking G_POLLABLE_OUTPUT_STREAM G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE diff --git a/gio/gioprivate.h b/gio/gioprivate.h index b79192566..a917510b9 100644 --- a/gio/gioprivate.h +++ b/gio/gioprivate.h @@ -29,6 +29,7 @@ G_BEGIN_DECLS gboolean g_input_stream_async_read_is_via_threads (GInputStream *stream); gboolean g_input_stream_async_close_is_via_threads (GInputStream *stream); gboolean g_output_stream_async_write_is_via_threads (GOutputStream *stream); +gboolean g_output_stream_async_writev_is_via_threads (GOutputStream *stream); gboolean g_output_stream_async_close_is_via_threads (GOutputStream *stream); void g_socket_connection_set_cached_remote_address (GSocketConnection *connection, diff --git a/gio/goutputstream.c b/gio/goutputstream.c index 3e658e88a..f80cc0a2a 100644 --- a/gio/goutputstream.c +++ b/gio/goutputstream.c @@ -72,6 +72,23 @@ static void g_output_stream_real_write_async (GOutputStream *s static gssize g_output_stream_real_write_finish (GOutputStream *stream, GAsyncResult *result, GError **error); +static gboolean g_output_stream_real_writev (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GCancellable *cancellable, + GError **error); +static void g_output_stream_real_writev_async (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean g_output_stream_real_writev_finish (GOutputStream *stream, + GAsyncResult *result, + gsize *bytes_written, + GError **error); static void g_output_stream_real_splice_async (GOutputStream *stream, GInputStream *source, GOutputStreamSpliceFlags flags, @@ -134,6 +151,9 @@ g_output_stream_class_init (GOutputStreamClass *klass) klass->write_async = g_output_stream_real_write_async; klass->write_finish = g_output_stream_real_write_finish; + klass->writev_fn = g_output_stream_real_writev; + klass->writev_async = g_output_stream_real_writev_async; + klass->writev_finish = g_output_stream_real_writev_finish; klass->splice_async = g_output_stream_real_splice_async; klass->splice_finish = g_output_stream_real_splice_finish; klass->flush_async = g_output_stream_real_flush_async; @@ -299,6 +319,204 @@ g_output_stream_write_all (GOutputStream *stream, return TRUE; } +/** + * g_output_stream_writev: + * @stream: a #GOutputStream. + * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write. + * @n_vectors: the number of vectors to write + * @bytes_written: (out) (optional): location to store the number of bytes that were + * written to the stream + * @cancellable: (nullable): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore + * + * Tries to write the bytes contained in the @n_vectors @vectors into the + * stream. Will block during the operation. + * + * If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and + * does nothing. + * + * On success, the number of bytes written to the stream is returned. + * It is not an error if this is not the same as the requested size, as it + * can happen e.g. on a partial I/O error, or if there is not enough + * storage in the stream. All writes block until at least one byte + * is written or an error occurs; 0 is never returned (unless + * @n_vectors is 0 or the sum of all bytes in @vectors is 0). + * + * If @cancellable is not %NULL, then the operation can be cancelled by + * triggering the cancellable object from another thread. If the operation + * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an + * operation was partially finished when the operation was cancelled the + * partial result will be returned, without an error. + * + * Some implementations of g_output_stream_writev() may have limitations on the + * aggregate buffer size, and will return %G_IO_ERROR_INVALID_ARGUMENT if these + * are exceeded. For example, when writing to a local file on UNIX platforms, + * the aggregate buffer size must not exceed %G_MAXSSIZE bytes. + * + * Virtual: writev_fn + * + * Returns: %TRUE on success, %FALSE if there was an error + * + * Since: 2.60 + */ +gboolean +g_output_stream_writev (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GCancellable *cancellable, + GError **error) +{ + GOutputStreamClass *class; + gboolean res; + gsize _bytes_written = 0; + + if (bytes_written) + *bytes_written = 0; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE); + g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE); + g_return_val_if_fail (error == NULL || *error == NULL, FALSE); + + if (n_vectors == 0) + return TRUE; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + g_return_val_if_fail (class->writev_fn != NULL, FALSE); + + if (!g_output_stream_set_pending (stream, error)) + return FALSE; + + if (cancellable) + g_cancellable_push_current (cancellable); + + res = class->writev_fn (stream, vectors, n_vectors, &_bytes_written, cancellable, error); + + g_warn_if_fail (res || _bytes_written == 0); + g_warn_if_fail (res || (error == NULL || *error != NULL)); + + if (cancellable) + g_cancellable_pop_current (cancellable); + + g_output_stream_clear_pending (stream); + + if (bytes_written) + *bytes_written = _bytes_written; + + return res; +} + +/** + * g_output_stream_writev_all: + * @stream: a #GOutputStream. + * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write. + * @n_vectors: the number of vectors to write + * @bytes_written: (out) (optional): location to store the number of bytes that were + * written to the stream + * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore + * + * Tries to write the bytes contained in the @n_vectors @vectors into the + * stream. Will block during the operation. + * + * This function is similar to g_output_stream_writev(), except it tries to + * write as many bytes as requested, only stopping on an error. + * + * On a successful write of all @n_vectors vectors, %TRUE is returned, and + * @bytes_written is set to the sum of all the sizes of @vectors. + * + * If there is an error during the operation %FALSE is returned and @error + * is set to indicate the error status. + * + * As a special exception to the normal conventions for functions that + * use #GError, if this function returns %FALSE (and sets @error) then + * @bytes_written will be set to the number of bytes that were + * successfully written before the error was encountered. This + * functionality is only available from C. If you need it from another + * language then you must write your own loop around + * g_output_stream_write(). + * + * The content of the individual elements of @vectors might be changed by this + * function. + * + * Returns: %TRUE on success, %FALSE if there was an error + * + * Since: 2.60 + */ +gboolean +g_output_stream_writev_all (GOutputStream *stream, + GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GCancellable *cancellable, + GError **error) +{ + gsize _bytes_written = 0; + gsize i, to_be_written = 0; + + if (bytes_written) + *bytes_written = 0; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE); + g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE); + g_return_val_if_fail (error == NULL || *error == NULL, FALSE); + + /* We can't write more than G_MAXSIZE bytes overall, otherwise we + * would overflow the bytes_written counter */ + for (i = 0; i < n_vectors; i++) + { + if (to_be_written > G_MAXSIZE - vectors[i].size) + { + g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + _("Sum of vectors passed to %s too large"), G_STRFUNC); + return FALSE; + } + to_be_written += vectors[i].size; + } + + _bytes_written = 0; + while (n_vectors > 0 && to_be_written > 0) + { + gsize n_written = 0; + gboolean res; + + res = g_output_stream_writev (stream, vectors, n_vectors, &n_written, cancellable, error); + + if (!res) + { + if (bytes_written) + *bytes_written = _bytes_written; + return FALSE; + } + + if (n_written == 0) + g_warning ("Write returned zero without error"); + _bytes_written += n_written; + + /* skip vectors that have been written in full */ + while (n_vectors > 0 && n_written >= vectors[0].size) + { + n_written -= vectors[0].size; + ++vectors; + --n_vectors; + } + /* skip partially written vector data */ + if (n_written > 0 && n_vectors > 0) + { + vectors[0].size -= n_written; + vectors[0].buffer = ((guint8 *) vectors[0].buffer) + n_written; + } + } + + if (bytes_written) + *bytes_written = _bytes_written; + + return TRUE; +} + /** * g_output_stream_printf: * @stream: a #GOutputStream. @@ -923,7 +1141,6 @@ write_all_callback (GObject *stream, g_task_return_boolean (task, TRUE); g_object_unref (task); } - else g_output_stream_write_async (G_OUTPUT_STREAM (stream), data->buffer + data->bytes_written, @@ -1060,6 +1277,329 @@ g_output_stream_write_all_finish (GOutputStream *stream, return g_task_propagate_boolean (task, error); } +/** + * g_output_stream_writev_async: + * @stream: A #GOutputStream. + * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write. + * @n_vectors: the number of vectors to write + * @io_priority: the I/O priority of the request. + * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function + * + * Request an asynchronous write of the bytes contained in @n_vectors @vectors into + * the stream. When the operation is finished @callback will be called. + * You can then call g_output_stream_writev_finish() to get the result of the + * operation. + * + * During an async request no other sync and async calls are allowed, + * and will result in %G_IO_ERROR_PENDING errors. + * + * On success, the number of bytes written will be passed to the + * @callback. It is not an error if this is not the same as the + * requested size, as it can happen e.g. on a partial I/O error, + * but generally we try to write as many bytes as requested. + * + * You are guaranteed that this method will never fail with + * %G_IO_ERROR_WOULD_BLOCK — if @stream can't accept more data, the + * method will just wait until this changes. + * + * Any outstanding I/O request with higher priority (lower numerical + * value) will be executed before an outstanding request with lower + * priority. Default priority is %G_PRIORITY_DEFAULT. + * + * The asynchronous methods have a default fallback that uses threads + * to implement asynchronicity, so they are optional for inheriting + * classes. However, if you override one you must override all. + * + * For the synchronous, blocking version of this function, see + * g_output_stream_writev(). + * + * Note that no copy of @vectors will be made, so it must stay valid + * until @callback is called. + * + * Since: 2.60 + */ +void +g_output_stream_writev_async (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GOutputStreamClass *class; + + g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + g_return_if_fail (vectors != NULL || n_vectors == 0); + g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable)); + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + g_return_if_fail (class->writev_async != NULL); + + class->writev_async (stream, vectors, n_vectors, io_priority, cancellable, + callback, user_data); +} + +/** + * g_output_stream_writev_finish: + * @stream: a #GOutputStream. + * @result: a #GAsyncResult. + * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream + * @error: a #GError location to store the error occurring, or %NULL to + * ignore. + * + * Finishes a stream writev operation. + * + * Returns: %TRUE on success, %FALSE if there was an error + * + * Since: 2.60 + */ +gboolean +g_output_stream_writev_finish (GOutputStream *stream, + GAsyncResult *result, + gsize *bytes_written, + GError **error) +{ + GOutputStreamClass *class; + gboolean res; + gsize _bytes_written = 0; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); + g_return_val_if_fail (error == NULL || *error == NULL, FALSE); + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + g_return_val_if_fail (class->writev_finish != NULL, FALSE); + + res = class->writev_finish (stream, result, &_bytes_written, error); + + g_warn_if_fail (res || _bytes_written == 0); + g_warn_if_fail (res || (error == NULL || *error != NULL)); + + if (bytes_written) + *bytes_written = _bytes_written; + + return res; +} + +typedef struct +{ + GOutputVector *vectors; + gsize n_vectors; /* (unowned) */ + gsize bytes_written; +} AsyncWritevAll; + +static void +free_async_writev_all (gpointer data) +{ + g_slice_free (AsyncWritevAll, data); +} + +static void +writev_all_callback (GObject *stream, + GAsyncResult *result, + gpointer user_data) +{ + GTask *task = user_data; + AsyncWritevAll *data = g_task_get_task_data (task); + gint priority = g_task_get_priority (task); + GCancellable *cancellable = g_task_get_cancellable (task); + + if (result) + { + GError *error = NULL; + gboolean res; + gsize n_written = 0; + + res = g_output_stream_writev_finish (G_OUTPUT_STREAM (stream), result, &n_written, &error); + + if (!res) + { + g_task_return_error (task, g_steal_pointer (&error)); + g_object_unref (task); + return; + } + + g_warn_if_fail (n_written > 0); + data->bytes_written += n_written; + + /* skip vectors that have been written in full */ + while (data->n_vectors > 0 && n_written >= data->vectors[0].size) + { + n_written -= data->vectors[0].size; + ++data->vectors; + --data->n_vectors; + } + /* skip partially written vector data */ + if (n_written > 0 && data->n_vectors > 0) + { + data->vectors[0].size -= n_written; + data->vectors[0].buffer = ((guint8 *) data->vectors[0].buffer) + n_written; + } + } + + if (data->n_vectors == 0) + { + g_task_return_boolean (task, TRUE); + g_object_unref (task); + } + else + g_output_stream_writev_async (G_OUTPUT_STREAM (stream), + data->vectors, + data->n_vectors, + priority, + cancellable, + writev_all_callback, g_steal_pointer (&task)); +} + +static void +writev_all_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + AsyncWritevAll *data = task_data; + GError *error = NULL; + + if (g_output_stream_writev_all (stream, data->vectors, data->n_vectors, &data->bytes_written, + g_task_get_cancellable (task), &error)) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, g_steal_pointer (&error)); +} + +/** + * g_output_stream_writev_all_async: + * @stream: A #GOutputStream + * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write. + * @n_vectors: the number of vectors to write + * @io_priority: the I/O priority of the request + * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function + * + * Request an asynchronous write of the bytes contained in the @n_vectors @vectors into + * the stream. When the operation is finished @callback will be called. + * You can then call g_output_stream_writev_all_finish() to get the result of the + * operation. + * + * This is the asynchronous version of g_output_stream_writev_all(). + * + * Call g_output_stream_writev_all_finish() to collect the result. + * + * Any outstanding I/O request with higher priority (lower numerical + * value) will be executed before an outstanding request with lower + * priority. Default priority is %G_PRIORITY_DEFAULT. + * + * Note that no copy of @vectors will be made, so it must stay valid + * until @callback is called. The content of the individual elements + * of @vectors might be changed by this function. + * + * Since: 2.60 + */ +void +g_output_stream_writev_all_async (GOutputStream *stream, + GOutputVector *vectors, + gsize n_vectors, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + AsyncWritevAll *data; + GTask *task; + gsize i, to_be_written = 0; + + g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + g_return_if_fail (vectors != NULL || n_vectors == 0); + g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable)); + + task = g_task_new (stream, cancellable, callback, user_data); + data = g_slice_new0 (AsyncWritevAll); + data->vectors = vectors; + data->n_vectors = n_vectors; + + g_task_set_source_tag (task, g_output_stream_writev_all_async); + g_task_set_task_data (task, data, free_async_writev_all); + g_task_set_priority (task, io_priority); + + /* We can't write more than G_MAXSIZE bytes overall, otherwise we + * would overflow the bytes_written counter */ + for (i = 0; i < n_vectors; i++) + { + if (to_be_written > G_MAXSIZE - vectors[i].size) + { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + _("Sum of vectors passed to %s too large"), + G_STRFUNC); + g_object_unref (task); + return; + } + to_be_written += vectors[i].size; + } + + /* If async writes are going to be handled via the threadpool anyway + * then we may as well do it with a single dispatch instead of + * bouncing in and out. + */ + if (g_output_stream_async_writev_is_via_threads (stream)) + { + g_task_run_in_thread (task, writev_all_async_thread); + g_object_unref (task); + } + else + writev_all_callback (G_OBJECT (stream), NULL, g_steal_pointer (&task)); +} + +/** + * g_output_stream_writev_all_finish: + * @stream: a #GOutputStream + * @result: a #GAsyncResult + * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream + * @error: a #GError location to store the error occurring, or %NULL to ignore. + * + * Finishes an asynchronous stream write operation started with + * g_output_stream_writev_all_async(). + * + * As a special exception to the normal conventions for functions that + * use #GError, if this function returns %FALSE (and sets @error) then + * @bytes_written will be set to the number of bytes that were + * successfully written before the error was encountered. This + * functionality is only available from C. If you need it from another + * language then you must write your own loop around + * g_output_stream_writev_async(). + * + * Returns: %TRUE on success, %FALSE if there was an error + * + * Since: 2.60 + */ +gboolean +g_output_stream_writev_all_finish (GOutputStream *stream, + GAsyncResult *result, + gsize *bytes_written, + GError **error) +{ + GTask *task; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (error == NULL || *error == NULL, FALSE); + + task = G_TASK (result); + + if (bytes_written) + { + AsyncWritevAll *data = (AsyncWritevAll *)g_task_get_task_data (task); + + *bytes_written = data->bytes_written; + } + + return g_task_propagate_boolean (task, error); +} + static void write_bytes_callback (GObject *stream, GAsyncResult *result, @@ -1712,6 +2252,28 @@ g_output_stream_async_write_is_via_threads (GOutputStream *stream) g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))); } +/*< internal > + * g_output_stream_async_writev_is_via_threads: + * @stream: a #GOutputStream. + * + * Checks if an output stream's writev_async function uses threads. + * + * Returns: %TRUE if @stream's writev_async function uses threads. + **/ +gboolean +g_output_stream_async_writev_is_via_threads (GOutputStream *stream) +{ + GOutputStreamClass *class; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + return (class->writev_async == g_output_stream_real_writev_async && + !(G_IS_POLLABLE_OUTPUT_STREAM (stream) && + g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))); +} + /*< internal > * g_output_stream_async_close_is_via_threads: * @stream: output stream @@ -1732,6 +2294,69 @@ g_output_stream_async_close_is_via_threads (GOutputStream *stream) return class->close_async == g_output_stream_real_close_async; } +/******************************************** + * Default implementation of sync ops * + ********************************************/ +static gboolean +g_output_stream_real_writev (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GCancellable *cancellable, + GError **error) +{ + GOutputStreamClass *class; + gsize _bytes_written = 0; + gsize i; + GError *err = NULL; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + if (bytes_written) + *bytes_written = 0; + + for (i = 0; i < n_vectors; i++) + { + gssize res = 0; + + /* Would we overflow here? In that case simply return and let the caller + * handle this like a short write */ + if (_bytes_written > G_MAXSIZE - vectors[i].size) + break; + + res = class->write_fn (stream, vectors[i].buffer, vectors[i].size, cancellable, &err); + + if (res == -1) + { + /* If we already wrote something we handle this like a short write + * and assume that on the next call the same error happens again, or + * everything finishes successfully without data loss then + */ + if (_bytes_written > 0) + { + if (bytes_written) + *bytes_written = _bytes_written; + + g_clear_error (&err); + return TRUE; + } + + g_propagate_error (error, err); + return FALSE; + } + + _bytes_written += res; + /* if we had a short write break the loop here */ + if (res < vectors[i].size) + break; + } + + if (bytes_written) + *bytes_written = _bytes_written; + + return TRUE; +} + /******************************************** * Default implementation of async ops * ********************************************/ @@ -1852,6 +2477,172 @@ g_output_stream_real_write_finish (GOutputStream *stream, return g_task_propagate_int (G_TASK (result), error); } +typedef struct { + const GOutputVector *vectors; + gsize n_vectors; /* (unowned) */ + gsize bytes_written; +} WritevData; + +static void +free_writev_data (WritevData *op) +{ + g_slice_free (WritevData, op); +} + +static void +writev_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = source_object; + WritevData *op = task_data; + GOutputStreamClass *class; + GError *error = NULL; + gboolean res; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + res = class->writev_fn (stream, op->vectors, op->n_vectors, + &op->bytes_written, cancellable, &error); + + g_warn_if_fail (res || op->bytes_written == 0); + g_warn_if_fail (res || error != NULL); + + if (!res) + g_task_return_error (task, g_steal_pointer (&error)); + else + g_task_return_boolean (task, TRUE); +} + +static void writev_async_pollable (GPollableOutputStream *stream, + GTask *task); + +static gboolean +writev_async_pollable_ready (GPollableOutputStream *stream, + gpointer user_data) +{ + GTask *task = user_data; + + writev_async_pollable (stream, task); + return G_SOURCE_REMOVE; +} + +static void +writev_async_pollable (GPollableOutputStream *stream, + GTask *task) +{ + GError *error = NULL; + WritevData *op = g_task_get_task_data (task); + GPollableReturn res; + gsize bytes_written = 0; + + if (g_task_return_error_if_cancelled (task)) + return; + + res = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)-> + writev_nonblocking (stream, op->vectors, op->n_vectors, &bytes_written, &error); + + switch (res) + { + case G_POLLABLE_RETURN_WOULD_BLOCK: + { + GSource *source; + + g_warn_if_fail (error == NULL); + g_warn_if_fail (bytes_written == 0); + + source = g_pollable_output_stream_create_source (stream, + g_task_get_cancellable (task)); + g_task_attach_source (task, source, + (GSourceFunc) writev_async_pollable_ready); + g_source_unref (source); + } + break; + case G_POLLABLE_RETURN_OK: + g_warn_if_fail (error == NULL); + op->bytes_written = bytes_written; + g_task_return_boolean (task, TRUE); + break; + case G_POLLABLE_RETURN_FAILED: + g_warn_if_fail (bytes_written == 0); + g_warn_if_fail (error != NULL); + g_task_return_error (task, g_steal_pointer (&error)); + break; + default: + g_assert_not_reached (); + } +} + +static void +g_output_stream_real_writev_async (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + WritevData *op; + GError *error = NULL; + + op = g_slice_new0 (WritevData); + task = g_task_new (stream, cancellable, callback, user_data); + op->vectors = vectors; + op->n_vectors = n_vectors; + + g_task_set_check_cancellable (task, FALSE); + g_task_set_source_tag (task, g_output_stream_writev_async); + g_task_set_priority (task, io_priority); + g_task_set_task_data (task, op, (GDestroyNotify) free_writev_data); + + if (n_vectors == 0) + { + g_task_return_boolean (task, TRUE); + g_object_unref (task); + return; + } + + if (!g_output_stream_set_pending (stream, &error)) + { + g_task_return_error (task, g_steal_pointer (&error)); + g_object_unref (task); + return; + } + + if (!g_output_stream_async_writev_is_via_threads (stream)) + writev_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task); + else + g_task_run_in_thread (task, writev_async_thread); + + g_object_unref (task); +} + +static gboolean +g_output_stream_real_writev_finish (GOutputStream *stream, + GAsyncResult *result, + gsize *bytes_written, + GError **error) +{ + GTask *task; + + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_writev_async), FALSE); + + g_output_stream_clear_pending (stream); + + task = G_TASK (result); + + if (bytes_written) + { + WritevData *op = g_task_get_task_data (task); + + *bytes_written = op->bytes_written; + } + + return g_task_propagate_boolean (task, error); +} + typedef struct { GInputStream *source; GOutputStreamSpliceFlags flags; diff --git a/gio/goutputstream.h b/gio/goutputstream.h index fef1b8fdf..dc0f4925a 100644 --- a/gio/goutputstream.h +++ b/gio/goutputstream.h @@ -119,11 +119,28 @@ struct _GOutputStreamClass GAsyncResult *result, GError **error); + gboolean (* writev_fn) (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GCancellable *cancellable, + GError **error); + + void (* writev_async) (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + + gboolean (* writev_finish) (GOutputStream *stream, + GAsyncResult *result, + gsize *bytes_written, + GError **error); + /*< private >*/ /* Padding for future expansion */ - void (*_g_reserved1) (void); - void (*_g_reserved2) (void); - void (*_g_reserved3) (void); void (*_g_reserved4) (void); void (*_g_reserved5) (void); void (*_g_reserved6) (void); @@ -147,6 +164,22 @@ gboolean g_output_stream_write_all (GOutputStream *stream, gsize *bytes_written, GCancellable *cancellable, GError **error); + +GLIB_AVAILABLE_IN_2_60 +gboolean g_output_stream_writev (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GCancellable *cancellable, + GError **error); +GLIB_AVAILABLE_IN_2_60 +gboolean g_output_stream_writev_all (GOutputStream *stream, + GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GCancellable *cancellable, + GError **error); + GLIB_AVAILABLE_IN_2_40 gboolean g_output_stream_printf (GOutputStream *stream, gsize *bytes_written, @@ -208,6 +241,35 @@ gboolean g_output_stream_write_all_finish (GOutputStream *stream, gsize *bytes_written, GError **error); +GLIB_AVAILABLE_IN_2_60 +void g_output_stream_writev_async (GOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +GLIB_AVAILABLE_IN_2_60 +gboolean g_output_stream_writev_finish (GOutputStream *stream, + GAsyncResult *result, + gsize *bytes_written, + GError **error); + +GLIB_AVAILABLE_IN_2_60 +void g_output_stream_writev_all_async (GOutputStream *stream, + GOutputVector *vectors, + gsize n_vectors, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + +GLIB_AVAILABLE_IN_2_60 +gboolean g_output_stream_writev_all_finish (GOutputStream *stream, + GAsyncResult *result, + gsize *bytes_written, + GError **error); + GLIB_AVAILABLE_IN_2_34 void g_output_stream_write_bytes_async (GOutputStream *stream, GBytes *bytes, diff --git a/gio/gpollableoutputstream.c b/gio/gpollableoutputstream.c index 40c649f0d..c17cf9268 100644 --- a/gio/gpollableoutputstream.c +++ b/gio/gpollableoutputstream.c @@ -41,17 +41,23 @@ G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM) -static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream); -static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream, - const void *buffer, - gsize count, - GError **error); +static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream); +static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream, + const void *buffer, + gsize count, + GError **error); +static GPollableReturn g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GError **error); static void g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface) { - iface->can_poll = g_pollable_output_stream_default_can_poll; - iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking; + iface->can_poll = g_pollable_output_stream_default_can_poll; + iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking; + iface->writev_nonblocking = g_pollable_output_stream_default_writev_nonblocking; } static gboolean @@ -157,6 +163,67 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stre write_fn (G_OUTPUT_STREAM (stream), buffer, count, NULL, error); } +static GPollableReturn +g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GError **error) +{ + gsize _bytes_written = 0; + GPollableOutputStreamInterface *iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream); + gsize i; + GError *err = NULL; + + for (i = 0; i < n_vectors; i++) + { + gssize res; + + /* Would we overflow here? In that case simply return and let the caller + * handle this like a short write */ + if (_bytes_written > G_MAXSIZE - vectors[i].size) + break; + + res = iface->write_nonblocking (stream, vectors[i].buffer, vectors[i].size, &err); + if (res == -1) + { + if (bytes_written) + *bytes_written = _bytes_written; + + /* If something was written already we handle this like a short + * write and assume that the next call would either give the same + * error again or successfully finish writing without errors or data + * loss + */ + if (_bytes_written > 0) + { + g_clear_error (&err); + return G_POLLABLE_RETURN_OK; + } + else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + { + g_clear_error (&err); + return G_POLLABLE_RETURN_WOULD_BLOCK; + } + else + { + g_propagate_error (error, err); + return G_POLLABLE_RETURN_FAILED; + } + } + + _bytes_written += res; + /* if we had a short write break the loop here */ + if (res < vectors[i].size) + break; + } + + if (bytes_written) + *bytes_written = _bytes_written; + + return G_POLLABLE_RETURN_OK; +} + /** * g_pollable_output_stream_write_nonblocking: * @stream: a #GPollableOutputStream @@ -179,7 +246,8 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stre * to having been cancelled. * * Also note that if %G_IO_ERROR_WOULD_BLOCK is returned some underlying - * transports like D/TLS require that you send the same @buffer and @count. + * transports like D/TLS require that you re-send the same @buffer and + * @count in the next write call. * * Virtual: write_nonblocking * Returns: the number of bytes written, or -1 on error (including @@ -221,3 +289,91 @@ g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream, return res; } + +/** + * g_pollable_output_stream_writev_nonblocking: + * @stream: a #GPollableOutputStream + * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write. + * @n_vectors: the number of vectors to write + * @bytes_written: (out) (optional): location to store the number of bytes that were + * written to the stream + * @cancellable: (nullable): a #GCancellable, or %NULL + * @error: #GError for error reporting, or %NULL to ignore. + * + * Attempts to write the bytes contained in the @n_vectors @vectors to @stream, + * as with g_output_stream_writev(). If @stream is not currently writable, + * this will immediately return %@G_POLLABLE_RETURN_WOULD_BLOCK, and you can + * use g_pollable_output_stream_create_source() to create a #GSource + * that will be triggered when @stream is writable. @error will *not* be + * set in that case. + * + * Note that since this method never blocks, you cannot actually + * use @cancellable to cancel it. However, it will return an error + * if @cancellable has already been cancelled when you call, which + * may happen if you call this method after a source triggers due + * to having been cancelled. + * + * Also note that if %G_POLLABLE_RETURN_WOULD_BLOCK is returned some underlying + * transports like D/TLS require that you re-send the same @vectors and + * @n_vectors in the next write call. + * + * Virtual: writev_nonblocking + * + * Returns: %@G_POLLABLE_RETURN_OK on success, %G_POLLABLE_RETURN_WOULD_BLOCK + * if the stream is not currently writable (and @error is *not* set), or + * %G_POLLABLE_RETURN_FAILED if there was an error in which case @error will + * be set. + * + * Since: 2.60 + */ +GPollableReturn +g_pollable_output_stream_writev_nonblocking (GPollableOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GCancellable *cancellable, + GError **error) +{ + GPollableOutputStreamInterface *iface; + GPollableReturn res; + gsize _bytes_written = 0; + + if (bytes_written) + *bytes_written = 0; + + g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), G_POLLABLE_RETURN_FAILED); + g_return_val_if_fail (vectors != NULL || n_vectors == 0, G_POLLABLE_RETURN_FAILED); + g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), G_POLLABLE_RETURN_FAILED); + g_return_val_if_fail (error == NULL || *error == NULL, G_POLLABLE_RETURN_FAILED); + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return G_POLLABLE_RETURN_FAILED; + + if (n_vectors == 0) + return G_POLLABLE_RETURN_OK; + + iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream); + g_return_val_if_fail (iface->writev_nonblocking != NULL, G_POLLABLE_RETURN_FAILED); + + if (cancellable) + g_cancellable_push_current (cancellable); + + res = iface-> + writev_nonblocking (stream, vectors, n_vectors, &_bytes_written, error); + + if (cancellable) + g_cancellable_pop_current (cancellable); + + if (res == G_POLLABLE_RETURN_FAILED) + g_warn_if_fail (error == NULL || (*error != NULL && !g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))); + else if (res == G_POLLABLE_RETURN_WOULD_BLOCK) + g_warn_if_fail (error == NULL || *error == NULL); + + /* in case of not-OK nothing must've been written */ + g_warn_if_fail (res == G_POLLABLE_RETURN_OK || _bytes_written == 0); + + if (bytes_written) + *bytes_written = _bytes_written; + + return res; +} diff --git a/gio/gpollableoutputstream.h b/gio/gpollableoutputstream.h index bf13584d5..1ef830b57 100644 --- a/gio/gpollableoutputstream.h +++ b/gio/gpollableoutputstream.h @@ -77,6 +77,11 @@ struct _GPollableOutputStreamInterface const void *buffer, gsize count, GError **error); + GPollableReturn (*writev_nonblocking) (GPollableOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GError **error); }; GLIB_AVAILABLE_IN_ALL @@ -98,6 +103,14 @@ gssize g_pollable_output_stream_write_nonblocking (GPollableOutputStream *str GCancellable *cancellable, GError **error); +GLIB_AVAILABLE_IN_2_60 +GPollableReturn g_pollable_output_stream_writev_nonblocking (GPollableOutputStream *stream, + const GOutputVector *vectors, + gsize n_vectors, + gsize *bytes_written, + GCancellable *cancellable, + GError **error); + G_END_DECLS