Add writev() / writev_all() API to GOutputStream and GPollableOutputStream

This comes with default implementations around the normal write
functions and async variants.

Fixes https://gitlab.gnome.org/GNOME/glib/issues/1431
This commit is contained in:
Sebastian Dröge 2018-09-13 13:10:36 +03:00
parent 042b8dc40d
commit 0bcc177378
6 changed files with 1043 additions and 12 deletions

View File

@ -843,6 +843,12 @@ g_output_stream_write
g_output_stream_write_all
g_output_stream_write_all_async
g_output_stream_write_all_finish
g_output_stream_writev
g_output_stream_writev_all
g_output_stream_writev_async
g_output_stream_writev_finish
g_output_stream_writev_all_async
g_output_stream_writev_all_finish
g_output_stream_splice
g_output_stream_flush
g_output_stream_close
@ -2124,6 +2130,7 @@ g_socket_receive_with_blocking
g_socket_send
g_socket_send_to
g_socket_send_message
g_socket_send_message_with_timeout
g_socket_send_messages
g_socket_send_with_blocking
g_socket_close
@ -3648,6 +3655,7 @@ g_pollable_output_stream_can_poll
g_pollable_output_stream_is_writable
g_pollable_output_stream_create_source
g_pollable_output_stream_write_nonblocking
g_pollable_output_stream_writev_nonblocking
<SUBSECTION Standard>
G_POLLABLE_OUTPUT_STREAM
G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE

View File

@ -29,6 +29,7 @@ G_BEGIN_DECLS
gboolean g_input_stream_async_read_is_via_threads (GInputStream *stream);
gboolean g_input_stream_async_close_is_via_threads (GInputStream *stream);
gboolean g_output_stream_async_write_is_via_threads (GOutputStream *stream);
gboolean g_output_stream_async_writev_is_via_threads (GOutputStream *stream);
gboolean g_output_stream_async_close_is_via_threads (GOutputStream *stream);
void g_socket_connection_set_cached_remote_address (GSocketConnection *connection,

View File

@ -72,6 +72,23 @@ static void g_output_stream_real_write_async (GOutputStream *s
static gssize g_output_stream_real_write_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error);
static gboolean g_output_stream_real_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
static void g_output_stream_real_writev_async (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer data);
static gboolean g_output_stream_real_writev_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error);
static void g_output_stream_real_splice_async (GOutputStream *stream,
GInputStream *source,
GOutputStreamSpliceFlags flags,
@ -134,6 +151,9 @@ g_output_stream_class_init (GOutputStreamClass *klass)
klass->write_async = g_output_stream_real_write_async;
klass->write_finish = g_output_stream_real_write_finish;
klass->writev_fn = g_output_stream_real_writev;
klass->writev_async = g_output_stream_real_writev_async;
klass->writev_finish = g_output_stream_real_writev_finish;
klass->splice_async = g_output_stream_real_splice_async;
klass->splice_finish = g_output_stream_real_splice_finish;
klass->flush_async = g_output_stream_real_flush_async;
@ -299,6 +319,204 @@ g_output_stream_write_all (GOutputStream *stream,
return TRUE;
}
/**
* g_output_stream_writev:
* @stream: a #GOutputStream.
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @bytes_written: (out) (optional): location to store the number of bytes that were
* written to the stream
* @cancellable: (nullable): optional cancellable object
* @error: location to store the error occurring, or %NULL to ignore
*
* Tries to write the bytes contained in the @n_vectors @vectors into the
* stream. Will block during the operation.
*
* If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
* does nothing.
*
* On success, the number of bytes written to the stream is returned.
* It is not an error if this is not the same as the requested size, as it
* can happen e.g. on a partial I/O error, or if there is not enough
* storage in the stream. All writes block until at least one byte
* is written or an error occurs; 0 is never returned (unless
* @n_vectors is 0 or the sum of all bytes in @vectors is 0).
*
* If @cancellable is not %NULL, then the operation can be cancelled by
* triggering the cancellable object from another thread. If the operation
* was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
* operation was partially finished when the operation was cancelled the
* partial result will be returned, without an error.
*
* Some implementations of g_output_stream_writev() may have limitations on the
* aggregate buffer size, and will return %G_IO_ERROR_INVALID_ARGUMENT if these
* are exceeded. For example, when writing to a local file on UNIX platforms,
* the aggregate buffer size must not exceed %G_MAXSSIZE bytes.
*
* Virtual: writev_fn
*
* Returns: %TRUE on success, %FALSE if there was an error
*
* Since: 2.60
*/
gboolean
g_output_stream_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
GOutputStreamClass *class;
gboolean res;
gsize _bytes_written = 0;
if (bytes_written)
*bytes_written = 0;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
if (n_vectors == 0)
return TRUE;
class = G_OUTPUT_STREAM_GET_CLASS (stream);
g_return_val_if_fail (class->writev_fn != NULL, FALSE);
if (!g_output_stream_set_pending (stream, error))
return FALSE;
if (cancellable)
g_cancellable_push_current (cancellable);
res = class->writev_fn (stream, vectors, n_vectors, &_bytes_written, cancellable, error);
g_warn_if_fail (res || _bytes_written == 0);
g_warn_if_fail (res || (error == NULL || *error != NULL));
if (cancellable)
g_cancellable_pop_current (cancellable);
g_output_stream_clear_pending (stream);
if (bytes_written)
*bytes_written = _bytes_written;
return res;
}
/**
* g_output_stream_writev_all:
* @stream: a #GOutputStream.
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @bytes_written: (out) (optional): location to store the number of bytes that were
* written to the stream
* @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
* @error: location to store the error occurring, or %NULL to ignore
*
* Tries to write the bytes contained in the @n_vectors @vectors into the
* stream. Will block during the operation.
*
* This function is similar to g_output_stream_writev(), except it tries to
* write as many bytes as requested, only stopping on an error.
*
* On a successful write of all @n_vectors vectors, %TRUE is returned, and
* @bytes_written is set to the sum of all the sizes of @vectors.
*
* If there is an error during the operation %FALSE is returned and @error
* is set to indicate the error status.
*
* As a special exception to the normal conventions for functions that
* use #GError, if this function returns %FALSE (and sets @error) then
* @bytes_written will be set to the number of bytes that were
* successfully written before the error was encountered. This
* functionality is only available from C. If you need it from another
* language then you must write your own loop around
* g_output_stream_write().
*
* The content of the individual elements of @vectors might be changed by this
* function.
*
* Returns: %TRUE on success, %FALSE if there was an error
*
* Since: 2.60
*/
gboolean
g_output_stream_writev_all (GOutputStream *stream,
GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
gsize _bytes_written = 0;
gsize i, to_be_written = 0;
if (bytes_written)
*bytes_written = 0;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
/* We can't write more than G_MAXSIZE bytes overall, otherwise we
* would overflow the bytes_written counter */
for (i = 0; i < n_vectors; i++)
{
if (to_be_written > G_MAXSIZE - vectors[i].size)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Sum of vectors passed to %s too large"), G_STRFUNC);
return FALSE;
}
to_be_written += vectors[i].size;
}
_bytes_written = 0;
while (n_vectors > 0 && to_be_written > 0)
{
gsize n_written = 0;
gboolean res;
res = g_output_stream_writev (stream, vectors, n_vectors, &n_written, cancellable, error);
if (!res)
{
if (bytes_written)
*bytes_written = _bytes_written;
return FALSE;
}
if (n_written == 0)
g_warning ("Write returned zero without error");
_bytes_written += n_written;
/* skip vectors that have been written in full */
while (n_vectors > 0 && n_written >= vectors[0].size)
{
n_written -= vectors[0].size;
++vectors;
--n_vectors;
}
/* skip partially written vector data */
if (n_written > 0 && n_vectors > 0)
{
vectors[0].size -= n_written;
vectors[0].buffer = ((guint8 *) vectors[0].buffer) + n_written;
}
}
if (bytes_written)
*bytes_written = _bytes_written;
return TRUE;
}
/**
* g_output_stream_printf:
* @stream: a #GOutputStream.
@ -923,7 +1141,6 @@ write_all_callback (GObject *stream,
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
else
g_output_stream_write_async (G_OUTPUT_STREAM (stream),
data->buffer + data->bytes_written,
@ -1060,6 +1277,329 @@ g_output_stream_write_all_finish (GOutputStream *stream,
return g_task_propagate_boolean (task, error);
}
/**
* g_output_stream_writev_async:
* @stream: A #GOutputStream.
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @io_priority: the I/O priority of the request.
* @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
* @callback: (scope async): callback to call when the request is satisfied
* @user_data: (closure): the data to pass to callback function
*
* Request an asynchronous write of the bytes contained in @n_vectors @vectors into
* the stream. When the operation is finished @callback will be called.
* You can then call g_output_stream_writev_finish() to get the result of the
* operation.
*
* During an async request no other sync and async calls are allowed,
* and will result in %G_IO_ERROR_PENDING errors.
*
* On success, the number of bytes written will be passed to the
* @callback. It is not an error if this is not the same as the
* requested size, as it can happen e.g. on a partial I/O error,
* but generally we try to write as many bytes as requested.
*
* You are guaranteed that this method will never fail with
* %G_IO_ERROR_WOULD_BLOCK if @stream can't accept more data, the
* method will just wait until this changes.
*
* Any outstanding I/O request with higher priority (lower numerical
* value) will be executed before an outstanding request with lower
* priority. Default priority is %G_PRIORITY_DEFAULT.
*
* The asynchronous methods have a default fallback that uses threads
* to implement asynchronicity, so they are optional for inheriting
* classes. However, if you override one you must override all.
*
* For the synchronous, blocking version of this function, see
* g_output_stream_writev().
*
* Note that no copy of @vectors will be made, so it must stay valid
* until @callback is called.
*
* Since: 2.60
*/
void
g_output_stream_writev_async (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
GOutputStreamClass *class;
g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
g_return_if_fail (vectors != NULL || n_vectors == 0);
g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
class = G_OUTPUT_STREAM_GET_CLASS (stream);
g_return_if_fail (class->writev_async != NULL);
class->writev_async (stream, vectors, n_vectors, io_priority, cancellable,
callback, user_data);
}
/**
* g_output_stream_writev_finish:
* @stream: a #GOutputStream.
* @result: a #GAsyncResult.
* @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
* @error: a #GError location to store the error occurring, or %NULL to
* ignore.
*
* Finishes a stream writev operation.
*
* Returns: %TRUE on success, %FALSE if there was an error
*
* Since: 2.60
*/
gboolean
g_output_stream_writev_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error)
{
GOutputStreamClass *class;
gboolean res;
gsize _bytes_written = 0;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
class = G_OUTPUT_STREAM_GET_CLASS (stream);
g_return_val_if_fail (class->writev_finish != NULL, FALSE);
res = class->writev_finish (stream, result, &_bytes_written, error);
g_warn_if_fail (res || _bytes_written == 0);
g_warn_if_fail (res || (error == NULL || *error != NULL));
if (bytes_written)
*bytes_written = _bytes_written;
return res;
}
typedef struct
{
GOutputVector *vectors;
gsize n_vectors; /* (unowned) */
gsize bytes_written;
} AsyncWritevAll;
static void
free_async_writev_all (gpointer data)
{
g_slice_free (AsyncWritevAll, data);
}
static void
writev_all_callback (GObject *stream,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = user_data;
AsyncWritevAll *data = g_task_get_task_data (task);
gint priority = g_task_get_priority (task);
GCancellable *cancellable = g_task_get_cancellable (task);
if (result)
{
GError *error = NULL;
gboolean res;
gsize n_written = 0;
res = g_output_stream_writev_finish (G_OUTPUT_STREAM (stream), result, &n_written, &error);
if (!res)
{
g_task_return_error (task, g_steal_pointer (&error));
g_object_unref (task);
return;
}
g_warn_if_fail (n_written > 0);
data->bytes_written += n_written;
/* skip vectors that have been written in full */
while (data->n_vectors > 0 && n_written >= data->vectors[0].size)
{
n_written -= data->vectors[0].size;
++data->vectors;
--data->n_vectors;
}
/* skip partially written vector data */
if (n_written > 0 && data->n_vectors > 0)
{
data->vectors[0].size -= n_written;
data->vectors[0].buffer = ((guint8 *) data->vectors[0].buffer) + n_written;
}
}
if (data->n_vectors == 0)
{
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
else
g_output_stream_writev_async (G_OUTPUT_STREAM (stream),
data->vectors,
data->n_vectors,
priority,
cancellable,
writev_all_callback, g_steal_pointer (&task));
}
static void
writev_all_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
GOutputStream *stream = G_OUTPUT_STREAM (source_object);
AsyncWritevAll *data = task_data;
GError *error = NULL;
if (g_output_stream_writev_all (stream, data->vectors, data->n_vectors, &data->bytes_written,
g_task_get_cancellable (task), &error))
g_task_return_boolean (task, TRUE);
else
g_task_return_error (task, g_steal_pointer (&error));
}
/**
* g_output_stream_writev_all_async:
* @stream: A #GOutputStream
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @io_priority: the I/O priority of the request
* @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
* @callback: (scope async): callback to call when the request is satisfied
* @user_data: (closure): the data to pass to callback function
*
* Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
* the stream. When the operation is finished @callback will be called.
* You can then call g_output_stream_writev_all_finish() to get the result of the
* operation.
*
* This is the asynchronous version of g_output_stream_writev_all().
*
* Call g_output_stream_writev_all_finish() to collect the result.
*
* Any outstanding I/O request with higher priority (lower numerical
* value) will be executed before an outstanding request with lower
* priority. Default priority is %G_PRIORITY_DEFAULT.
*
* Note that no copy of @vectors will be made, so it must stay valid
* until @callback is called. The content of the individual elements
* of @vectors might be changed by this function.
*
* Since: 2.60
*/
void
g_output_stream_writev_all_async (GOutputStream *stream,
GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
AsyncWritevAll *data;
GTask *task;
gsize i, to_be_written = 0;
g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
g_return_if_fail (vectors != NULL || n_vectors == 0);
g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
task = g_task_new (stream, cancellable, callback, user_data);
data = g_slice_new0 (AsyncWritevAll);
data->vectors = vectors;
data->n_vectors = n_vectors;
g_task_set_source_tag (task, g_output_stream_writev_all_async);
g_task_set_task_data (task, data, free_async_writev_all);
g_task_set_priority (task, io_priority);
/* We can't write more than G_MAXSIZE bytes overall, otherwise we
* would overflow the bytes_written counter */
for (i = 0; i < n_vectors; i++)
{
if (to_be_written > G_MAXSIZE - vectors[i].size)
{
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Sum of vectors passed to %s too large"),
G_STRFUNC);
g_object_unref (task);
return;
}
to_be_written += vectors[i].size;
}
/* If async writes are going to be handled via the threadpool anyway
* then we may as well do it with a single dispatch instead of
* bouncing in and out.
*/
if (g_output_stream_async_writev_is_via_threads (stream))
{
g_task_run_in_thread (task, writev_all_async_thread);
g_object_unref (task);
}
else
writev_all_callback (G_OBJECT (stream), NULL, g_steal_pointer (&task));
}
/**
* g_output_stream_writev_all_finish:
* @stream: a #GOutputStream
* @result: a #GAsyncResult
* @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
* @error: a #GError location to store the error occurring, or %NULL to ignore.
*
* Finishes an asynchronous stream write operation started with
* g_output_stream_writev_all_async().
*
* As a special exception to the normal conventions for functions that
* use #GError, if this function returns %FALSE (and sets @error) then
* @bytes_written will be set to the number of bytes that were
* successfully written before the error was encountered. This
* functionality is only available from C. If you need it from another
* language then you must write your own loop around
* g_output_stream_writev_async().
*
* Returns: %TRUE on success, %FALSE if there was an error
*
* Since: 2.60
*/
gboolean
g_output_stream_writev_all_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error)
{
GTask *task;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
task = G_TASK (result);
if (bytes_written)
{
AsyncWritevAll *data = (AsyncWritevAll *)g_task_get_task_data (task);
*bytes_written = data->bytes_written;
}
return g_task_propagate_boolean (task, error);
}
static void
write_bytes_callback (GObject *stream,
GAsyncResult *result,
@ -1712,6 +2252,28 @@ g_output_stream_async_write_is_via_threads (GOutputStream *stream)
g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
}
/*< internal >
* g_output_stream_async_writev_is_via_threads:
* @stream: a #GOutputStream.
*
* Checks if an output stream's writev_async function uses threads.
*
* Returns: %TRUE if @stream's writev_async function uses threads.
**/
gboolean
g_output_stream_async_writev_is_via_threads (GOutputStream *stream)
{
GOutputStreamClass *class;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
class = G_OUTPUT_STREAM_GET_CLASS (stream);
return (class->writev_async == g_output_stream_real_writev_async &&
!(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
}
/*< internal >
* g_output_stream_async_close_is_via_threads:
* @stream: output stream
@ -1732,6 +2294,69 @@ g_output_stream_async_close_is_via_threads (GOutputStream *stream)
return class->close_async == g_output_stream_real_close_async;
}
/********************************************
* Default implementation of sync ops *
********************************************/
static gboolean
g_output_stream_real_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
GOutputStreamClass *class;
gsize _bytes_written = 0;
gsize i;
GError *err = NULL;
class = G_OUTPUT_STREAM_GET_CLASS (stream);
if (bytes_written)
*bytes_written = 0;
for (i = 0; i < n_vectors; i++)
{
gssize res = 0;
/* Would we overflow here? In that case simply return and let the caller
* handle this like a short write */
if (_bytes_written > G_MAXSIZE - vectors[i].size)
break;
res = class->write_fn (stream, vectors[i].buffer, vectors[i].size, cancellable, &err);
if (res == -1)
{
/* If we already wrote something we handle this like a short write
* and assume that on the next call the same error happens again, or
* everything finishes successfully without data loss then
*/
if (_bytes_written > 0)
{
if (bytes_written)
*bytes_written = _bytes_written;
g_clear_error (&err);
return TRUE;
}
g_propagate_error (error, err);
return FALSE;
}
_bytes_written += res;
/* if we had a short write break the loop here */
if (res < vectors[i].size)
break;
}
if (bytes_written)
*bytes_written = _bytes_written;
return TRUE;
}
/********************************************
* Default implementation of async ops *
********************************************/
@ -1852,6 +2477,172 @@ g_output_stream_real_write_finish (GOutputStream *stream,
return g_task_propagate_int (G_TASK (result), error);
}
typedef struct {
const GOutputVector *vectors;
gsize n_vectors; /* (unowned) */
gsize bytes_written;
} WritevData;
static void
free_writev_data (WritevData *op)
{
g_slice_free (WritevData, op);
}
static void
writev_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
GOutputStream *stream = source_object;
WritevData *op = task_data;
GOutputStreamClass *class;
GError *error = NULL;
gboolean res;
class = G_OUTPUT_STREAM_GET_CLASS (stream);
res = class->writev_fn (stream, op->vectors, op->n_vectors,
&op->bytes_written, cancellable, &error);
g_warn_if_fail (res || op->bytes_written == 0);
g_warn_if_fail (res || error != NULL);
if (!res)
g_task_return_error (task, g_steal_pointer (&error));
else
g_task_return_boolean (task, TRUE);
}
static void writev_async_pollable (GPollableOutputStream *stream,
GTask *task);
static gboolean
writev_async_pollable_ready (GPollableOutputStream *stream,
gpointer user_data)
{
GTask *task = user_data;
writev_async_pollable (stream, task);
return G_SOURCE_REMOVE;
}
static void
writev_async_pollable (GPollableOutputStream *stream,
GTask *task)
{
GError *error = NULL;
WritevData *op = g_task_get_task_data (task);
GPollableReturn res;
gsize bytes_written = 0;
if (g_task_return_error_if_cancelled (task))
return;
res = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
writev_nonblocking (stream, op->vectors, op->n_vectors, &bytes_written, &error);
switch (res)
{
case G_POLLABLE_RETURN_WOULD_BLOCK:
{
GSource *source;
g_warn_if_fail (error == NULL);
g_warn_if_fail (bytes_written == 0);
source = g_pollable_output_stream_create_source (stream,
g_task_get_cancellable (task));
g_task_attach_source (task, source,
(GSourceFunc) writev_async_pollable_ready);
g_source_unref (source);
}
break;
case G_POLLABLE_RETURN_OK:
g_warn_if_fail (error == NULL);
op->bytes_written = bytes_written;
g_task_return_boolean (task, TRUE);
break;
case G_POLLABLE_RETURN_FAILED:
g_warn_if_fail (bytes_written == 0);
g_warn_if_fail (error != NULL);
g_task_return_error (task, g_steal_pointer (&error));
break;
default:
g_assert_not_reached ();
}
}
static void
g_output_stream_real_writev_async (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
GTask *task;
WritevData *op;
GError *error = NULL;
op = g_slice_new0 (WritevData);
task = g_task_new (stream, cancellable, callback, user_data);
op->vectors = vectors;
op->n_vectors = n_vectors;
g_task_set_check_cancellable (task, FALSE);
g_task_set_source_tag (task, g_output_stream_writev_async);
g_task_set_priority (task, io_priority);
g_task_set_task_data (task, op, (GDestroyNotify) free_writev_data);
if (n_vectors == 0)
{
g_task_return_boolean (task, TRUE);
g_object_unref (task);
return;
}
if (!g_output_stream_set_pending (stream, &error))
{
g_task_return_error (task, g_steal_pointer (&error));
g_object_unref (task);
return;
}
if (!g_output_stream_async_writev_is_via_threads (stream))
writev_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
else
g_task_run_in_thread (task, writev_async_thread);
g_object_unref (task);
}
static gboolean
g_output_stream_real_writev_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error)
{
GTask *task;
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_writev_async), FALSE);
g_output_stream_clear_pending (stream);
task = G_TASK (result);
if (bytes_written)
{
WritevData *op = g_task_get_task_data (task);
*bytes_written = op->bytes_written;
}
return g_task_propagate_boolean (task, error);
}
typedef struct {
GInputStream *source;
GOutputStreamSpliceFlags flags;

View File

@ -119,11 +119,28 @@ struct _GOutputStreamClass
GAsyncResult *result,
GError **error);
gboolean (* writev_fn) (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
void (* writev_async) (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
gboolean (* writev_finish) (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error);
/*< private >*/
/* Padding for future expansion */
void (*_g_reserved1) (void);
void (*_g_reserved2) (void);
void (*_g_reserved3) (void);
void (*_g_reserved4) (void);
void (*_g_reserved5) (void);
void (*_g_reserved6) (void);
@ -147,6 +164,22 @@ gboolean g_output_stream_write_all (GOutputStream *stream,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_60
gboolean g_output_stream_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_60
gboolean g_output_stream_writev_all (GOutputStream *stream,
GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_40
gboolean g_output_stream_printf (GOutputStream *stream,
gsize *bytes_written,
@ -208,6 +241,35 @@ gboolean g_output_stream_write_all_finish (GOutputStream *stream,
gsize *bytes_written,
GError **error);
GLIB_AVAILABLE_IN_2_60
void g_output_stream_writev_async (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
GLIB_AVAILABLE_IN_2_60
gboolean g_output_stream_writev_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error);
GLIB_AVAILABLE_IN_2_60
void g_output_stream_writev_all_async (GOutputStream *stream,
GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
GLIB_AVAILABLE_IN_2_60
gboolean g_output_stream_writev_all_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error);
GLIB_AVAILABLE_IN_2_34
void g_output_stream_write_bytes_async (GOutputStream *stream,
GBytes *bytes,

View File

@ -41,17 +41,23 @@
G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM)
static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream);
static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize count,
GError **error);
static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream);
static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize count,
GError **error);
static GPollableReturn g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GError **error);
static void
g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface)
{
iface->can_poll = g_pollable_output_stream_default_can_poll;
iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
iface->can_poll = g_pollable_output_stream_default_can_poll;
iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
iface->writev_nonblocking = g_pollable_output_stream_default_writev_nonblocking;
}
static gboolean
@ -157,6 +163,67 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stre
write_fn (G_OUTPUT_STREAM (stream), buffer, count, NULL, error);
}
static GPollableReturn
g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GError **error)
{
gsize _bytes_written = 0;
GPollableOutputStreamInterface *iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream);
gsize i;
GError *err = NULL;
for (i = 0; i < n_vectors; i++)
{
gssize res;
/* Would we overflow here? In that case simply return and let the caller
* handle this like a short write */
if (_bytes_written > G_MAXSIZE - vectors[i].size)
break;
res = iface->write_nonblocking (stream, vectors[i].buffer, vectors[i].size, &err);
if (res == -1)
{
if (bytes_written)
*bytes_written = _bytes_written;
/* If something was written already we handle this like a short
* write and assume that the next call would either give the same
* error again or successfully finish writing without errors or data
* loss
*/
if (_bytes_written > 0)
{
g_clear_error (&err);
return G_POLLABLE_RETURN_OK;
}
else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
g_clear_error (&err);
return G_POLLABLE_RETURN_WOULD_BLOCK;
}
else
{
g_propagate_error (error, err);
return G_POLLABLE_RETURN_FAILED;
}
}
_bytes_written += res;
/* if we had a short write break the loop here */
if (res < vectors[i].size)
break;
}
if (bytes_written)
*bytes_written = _bytes_written;
return G_POLLABLE_RETURN_OK;
}
/**
* g_pollable_output_stream_write_nonblocking:
* @stream: a #GPollableOutputStream
@ -179,7 +246,8 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stre
* to having been cancelled.
*
* Also note that if %G_IO_ERROR_WOULD_BLOCK is returned some underlying
* transports like D/TLS require that you send the same @buffer and @count.
* transports like D/TLS require that you re-send the same @buffer and
* @count in the next write call.
*
* Virtual: write_nonblocking
* Returns: the number of bytes written, or -1 on error (including
@ -221,3 +289,91 @@ g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream,
return res;
}
/**
* g_pollable_output_stream_writev_nonblocking:
* @stream: a #GPollableOutputStream
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @bytes_written: (out) (optional): location to store the number of bytes that were
* written to the stream
* @cancellable: (nullable): a #GCancellable, or %NULL
* @error: #GError for error reporting, or %NULL to ignore.
*
* Attempts to write the bytes contained in the @n_vectors @vectors to @stream,
* as with g_output_stream_writev(). If @stream is not currently writable,
* this will immediately return %@G_POLLABLE_RETURN_WOULD_BLOCK, and you can
* use g_pollable_output_stream_create_source() to create a #GSource
* that will be triggered when @stream is writable. @error will *not* be
* set in that case.
*
* Note that since this method never blocks, you cannot actually
* use @cancellable to cancel it. However, it will return an error
* if @cancellable has already been cancelled when you call, which
* may happen if you call this method after a source triggers due
* to having been cancelled.
*
* Also note that if %G_POLLABLE_RETURN_WOULD_BLOCK is returned some underlying
* transports like D/TLS require that you re-send the same @vectors and
* @n_vectors in the next write call.
*
* Virtual: writev_nonblocking
*
* Returns: %@G_POLLABLE_RETURN_OK on success, %G_POLLABLE_RETURN_WOULD_BLOCK
* if the stream is not currently writable (and @error is *not* set), or
* %G_POLLABLE_RETURN_FAILED if there was an error in which case @error will
* be set.
*
* Since: 2.60
*/
GPollableReturn
g_pollable_output_stream_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
GPollableOutputStreamInterface *iface;
GPollableReturn res;
gsize _bytes_written = 0;
if (bytes_written)
*bytes_written = 0;
g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (vectors != NULL || n_vectors == 0, G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (error == NULL || *error == NULL, G_POLLABLE_RETURN_FAILED);
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return G_POLLABLE_RETURN_FAILED;
if (n_vectors == 0)
return G_POLLABLE_RETURN_OK;
iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream);
g_return_val_if_fail (iface->writev_nonblocking != NULL, G_POLLABLE_RETURN_FAILED);
if (cancellable)
g_cancellable_push_current (cancellable);
res = iface->
writev_nonblocking (stream, vectors, n_vectors, &_bytes_written, error);
if (cancellable)
g_cancellable_pop_current (cancellable);
if (res == G_POLLABLE_RETURN_FAILED)
g_warn_if_fail (error == NULL || (*error != NULL && !g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)));
else if (res == G_POLLABLE_RETURN_WOULD_BLOCK)
g_warn_if_fail (error == NULL || *error == NULL);
/* in case of not-OK nothing must've been written */
g_warn_if_fail (res == G_POLLABLE_RETURN_OK || _bytes_written == 0);
if (bytes_written)
*bytes_written = _bytes_written;
return res;
}

View File

@ -77,6 +77,11 @@ struct _GPollableOutputStreamInterface
const void *buffer,
gsize count,
GError **error);
GPollableReturn (*writev_nonblocking) (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GError **error);
};
GLIB_AVAILABLE_IN_ALL
@ -98,6 +103,14 @@ gssize g_pollable_output_stream_write_nonblocking (GPollableOutputStream *str
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_60
GPollableReturn g_pollable_output_stream_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
G_END_DECLS