Merge branch 'writev' into 'master'

Add writev() API to GOutputStream and GPollableOutputStream

See merge request GNOME/glib!333
This commit is contained in:
Philip Withnall 2019-01-24 14:42:51 +00:00
commit 791819c79d
17 changed files with 3212 additions and 146 deletions

View File

@ -843,6 +843,12 @@ g_output_stream_write
g_output_stream_write_all
g_output_stream_write_all_async
g_output_stream_write_all_finish
g_output_stream_writev
g_output_stream_writev_all
g_output_stream_writev_async
g_output_stream_writev_finish
g_output_stream_writev_all_async
g_output_stream_writev_all_finish
g_output_stream_splice
g_output_stream_flush
g_output_stream_close
@ -2124,6 +2130,7 @@ g_socket_receive_with_blocking
g_socket_send
g_socket_send_to
g_socket_send_message
g_socket_send_message_with_timeout
g_socket_send_messages
g_socket_send_with_blocking
g_socket_close
@ -3648,6 +3655,7 @@ g_pollable_output_stream_can_poll
g_pollable_output_stream_is_writable
g_pollable_output_stream_create_source
g_pollable_output_stream_write_nonblocking
g_pollable_output_stream_writev_nonblocking
<SUBSECTION Standard>
G_POLLABLE_OUTPUT_STREAM
G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE
@ -3659,6 +3667,7 @@ g_pollable_output_stream_get_type
<SECTION>
<FILE>gpollableutils</FILE>
GPollableReturn
GPollableSourceFunc
g_pollable_source_new
g_pollable_source_new_full

View File

@ -512,6 +512,9 @@ typedef enum {
* ]|
* but should instead treat all unrecognized error codes the same as
* #G_IO_ERROR_FAILED.
*
* See also #GPollableReturn for a cheaper way of returning
* %G_IO_ERROR_WOULD_BLOCK to callers without allocating a #GError.
**/
typedef enum {
G_IO_ERROR_FAILED,
@ -1922,6 +1925,30 @@ typedef enum {
G_NETWORK_CONNECTIVITY_FULL = 4
} GNetworkConnectivity;
/**
* GPollableReturn:
* @G_POLLABLE_RETURN_FAILED: Generic error condition for when an operation fails.
* @G_POLLABLE_RETURN_OK: The operation was successfully finished.
* @G_POLLABLE_RETURN_WOULD_BLOCK: The operation would block.
*
* Return value for various IO operations that signal errors via the
* return value and not necessarily via a #GError.
*
* This enum exists to be able to return errors to callers without having to
* allocate a #GError. Allocating #GErrors can be quite expensive for
* regularly happening errors like %G_IO_ERROR_WOULD_BLOCK.
*
* In case of %G_POLLABLE_RETURN_FAILED a #GError should be set for the
* operation to give details about the error that happened.
*
* Since: 2.60
*/
typedef enum {
G_POLLABLE_RETURN_FAILED = 0,
G_POLLABLE_RETURN_OK = 1,
G_POLLABLE_RETURN_WOULD_BLOCK = -G_IO_ERROR_WOULD_BLOCK
} GPollableReturn;
G_END_DECLS
#endif /* __GIO_ENUMS_H__ */

View File

@ -29,6 +29,7 @@ G_BEGIN_DECLS
gboolean g_input_stream_async_read_is_via_threads (GInputStream *stream);
gboolean g_input_stream_async_close_is_via_threads (GInputStream *stream);
gboolean g_output_stream_async_write_is_via_threads (GOutputStream *stream);
gboolean g_output_stream_async_writev_is_via_threads (GOutputStream *stream);
gboolean g_output_stream_async_close_is_via_threads (GOutputStream *stream);
void g_socket_connection_set_cached_remote_address (GSocketConnection *connection,

View File

@ -38,6 +38,7 @@
#ifdef G_OS_UNIX
#include <unistd.h>
#include "gfiledescriptorbased.h"
#include <sys/uio.h>
#endif
#include "glib-private.h"
@ -93,6 +94,14 @@ static gssize g_local_file_output_stream_write (GOutputStream *s
gsize count,
GCancellable *cancellable,
GError **error);
#ifdef G_OS_UNIX
static gboolean g_local_file_output_stream_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
#endif
static gboolean g_local_file_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
@ -142,6 +151,9 @@ g_local_file_output_stream_class_init (GLocalFileOutputStreamClass *klass)
gobject_class->finalize = g_local_file_output_stream_finalize;
stream_class->write_fn = g_local_file_output_stream_write;
#ifdef G_OS_UNIX
stream_class->writev_fn = g_local_file_output_stream_writev;
#endif
stream_class->close_fn = g_local_file_output_stream_close;
file_stream_class->query_info = g_local_file_output_stream_query_info;
file_stream_class->get_etag = g_local_file_output_stream_get_etag;
@ -203,6 +215,89 @@ g_local_file_output_stream_write (GOutputStream *stream,
return res;
}
/* On Windows there is no equivalent API for files. The closest API to that is
* WriteFileGather() but it is useless in general: it requires, among other
* things, that each chunk is the size of a whole page and in memory aligned
* to a page. We can't possibly guarantee that in GLib.
*/
#ifdef G_OS_UNIX
/* Macro to check if struct iovec and GOutputVector have the same ABI */
#define G_OUTPUT_VECTOR_IS_IOVEC (sizeof (struct iovec) == sizeof (GOutputVector) && \
sizeof ((struct iovec *) 0)->iov_base == sizeof ((GOutputVector *) 0)->buffer && \
G_STRUCT_OFFSET (struct iovec, iov_base) == G_STRUCT_OFFSET (GOutputVector, buffer) && \
sizeof ((struct iovec *) 0)->iov_len == sizeof((GOutputVector *) 0)->size && \
G_STRUCT_OFFSET (struct iovec, iov_len) == G_STRUCT_OFFSET (GOutputVector, size))
static gboolean
g_local_file_output_stream_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
GLocalFileOutputStream *file;
gssize res;
struct iovec *iov;
if (bytes_written)
*bytes_written = 0;
/* Clamp to G_MAXINT as writev() takes an integer for the number of vectors.
* We handle this like a short write in this case
*/
if (n_vectors > G_MAXINT)
n_vectors = G_MAXINT;
file = G_LOCAL_FILE_OUTPUT_STREAM (stream);
if (G_OUTPUT_VECTOR_IS_IOVEC)
{
/* ABI is compatible */
iov = (struct iovec *) vectors;
}
else
{
gsize i;
/* ABI is incompatible */
iov = g_newa (struct iovec, n_vectors);
for (i = 0; i < n_vectors; i++)
{
iov[i].iov_base = (void *)vectors[i].buffer;
iov[i].iov_len = vectors[i].size;
}
}
while (1)
{
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
res = writev (file->priv->fd, iov, n_vectors);
if (res == -1)
{
int errsv = errno;
if (errsv == EINTR)
continue;
g_set_error (error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error writing to file: %s"),
g_strerror (errsv));
}
else if (bytes_written)
{
*bytes_written = res;
}
break;
}
return res != -1;
}
#endif
void
_g_local_file_output_stream_set_do_close (GLocalFileOutputStream *out,
gboolean do_close)

View File

@ -72,6 +72,23 @@ static void g_output_stream_real_write_async (GOutputStream *s
static gssize g_output_stream_real_write_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error);
static gboolean g_output_stream_real_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
static void g_output_stream_real_writev_async (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer data);
static gboolean g_output_stream_real_writev_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error);
static void g_output_stream_real_splice_async (GOutputStream *stream,
GInputStream *source,
GOutputStreamSpliceFlags flags,
@ -134,6 +151,9 @@ g_output_stream_class_init (GOutputStreamClass *klass)
klass->write_async = g_output_stream_real_write_async;
klass->write_finish = g_output_stream_real_write_finish;
klass->writev_fn = g_output_stream_real_writev;
klass->writev_async = g_output_stream_real_writev_async;
klass->writev_finish = g_output_stream_real_writev_finish;
klass->splice_async = g_output_stream_real_splice_async;
klass->splice_finish = g_output_stream_real_splice_finish;
klass->flush_async = g_output_stream_real_flush_async;
@ -286,9 +306,7 @@ g_output_stream_write_all (GOutputStream *stream,
*bytes_written = _bytes_written;
return FALSE;
}
if (res == 0)
g_warning ("Write returned zero without error");
g_return_val_if_fail (res > 0, FALSE);
_bytes_written += res;
}
@ -299,6 +317,203 @@ g_output_stream_write_all (GOutputStream *stream,
return TRUE;
}
/**
* g_output_stream_writev:
* @stream: a #GOutputStream.
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @bytes_written: (out) (optional): location to store the number of bytes that were
* written to the stream
* @cancellable: (nullable): optional cancellable object
* @error: location to store the error occurring, or %NULL to ignore
*
* Tries to write the bytes contained in the @n_vectors @vectors into the
* stream. Will block during the operation.
*
* If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
* does nothing.
*
* On success, the number of bytes written to the stream is returned.
* It is not an error if this is not the same as the requested size, as it
* can happen e.g. on a partial I/O error, or if there is not enough
* storage in the stream. All writes block until at least one byte
* is written or an error occurs; 0 is never returned (unless
* @n_vectors is 0 or the sum of all bytes in @vectors is 0).
*
* If @cancellable is not %NULL, then the operation can be cancelled by
* triggering the cancellable object from another thread. If the operation
* was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
* operation was partially finished when the operation was cancelled the
* partial result will be returned, without an error.
*
* Some implementations of g_output_stream_writev() may have limitations on the
* aggregate buffer size, and will return %G_IO_ERROR_INVALID_ARGUMENT if these
* are exceeded. For example, when writing to a local file on UNIX platforms,
* the aggregate buffer size must not exceed %G_MAXSSIZE bytes.
*
* Virtual: writev_fn
*
* Returns: %TRUE on success, %FALSE if there was an error
*
* Since: 2.60
*/
gboolean
g_output_stream_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
GOutputStreamClass *class;
gboolean res;
gsize _bytes_written = 0;
if (bytes_written)
*bytes_written = 0;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
if (n_vectors == 0)
return TRUE;
class = G_OUTPUT_STREAM_GET_CLASS (stream);
g_return_val_if_fail (class->writev_fn != NULL, FALSE);
if (!g_output_stream_set_pending (stream, error))
return FALSE;
if (cancellable)
g_cancellable_push_current (cancellable);
res = class->writev_fn (stream, vectors, n_vectors, &_bytes_written, cancellable, error);
g_warn_if_fail (res || _bytes_written == 0);
g_warn_if_fail (res || (error == NULL || *error != NULL));
if (cancellable)
g_cancellable_pop_current (cancellable);
g_output_stream_clear_pending (stream);
if (bytes_written)
*bytes_written = _bytes_written;
return res;
}
/**
* g_output_stream_writev_all:
* @stream: a #GOutputStream.
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @bytes_written: (out) (optional): location to store the number of bytes that were
* written to the stream
* @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
* @error: location to store the error occurring, or %NULL to ignore
*
* Tries to write the bytes contained in the @n_vectors @vectors into the
* stream. Will block during the operation.
*
* This function is similar to g_output_stream_writev(), except it tries to
* write as many bytes as requested, only stopping on an error.
*
* On a successful write of all @n_vectors vectors, %TRUE is returned, and
* @bytes_written is set to the sum of all the sizes of @vectors.
*
* If there is an error during the operation %FALSE is returned and @error
* is set to indicate the error status.
*
* As a special exception to the normal conventions for functions that
* use #GError, if this function returns %FALSE (and sets @error) then
* @bytes_written will be set to the number of bytes that were
* successfully written before the error was encountered. This
* functionality is only available from C. If you need it from another
* language then you must write your own loop around
* g_output_stream_write().
*
* The content of the individual elements of @vectors might be changed by this
* function.
*
* Returns: %TRUE on success, %FALSE if there was an error
*
* Since: 2.60
*/
gboolean
g_output_stream_writev_all (GOutputStream *stream,
GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
gsize _bytes_written = 0;
gsize i, to_be_written = 0;
if (bytes_written)
*bytes_written = 0;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
/* We can't write more than G_MAXSIZE bytes overall, otherwise we
* would overflow the bytes_written counter */
for (i = 0; i < n_vectors; i++)
{
if (to_be_written > G_MAXSIZE - vectors[i].size)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Sum of vectors passed to %s too large"), G_STRFUNC);
return FALSE;
}
to_be_written += vectors[i].size;
}
_bytes_written = 0;
while (n_vectors > 0 && to_be_written > 0)
{
gsize n_written = 0;
gboolean res;
res = g_output_stream_writev (stream, vectors, n_vectors, &n_written, cancellable, error);
if (!res)
{
if (bytes_written)
*bytes_written = _bytes_written;
return FALSE;
}
g_return_val_if_fail (n_written > 0, FALSE);
_bytes_written += n_written;
/* skip vectors that have been written in full */
while (n_vectors > 0 && n_written >= vectors[0].size)
{
n_written -= vectors[0].size;
++vectors;
--n_vectors;
}
/* skip partially written vector data */
if (n_written > 0 && n_vectors > 0)
{
vectors[0].size -= n_written;
vectors[0].buffer = ((guint8 *) vectors[0].buffer) + n_written;
}
}
if (bytes_written)
*bytes_written = _bytes_written;
return TRUE;
}
/**
* g_output_stream_printf:
* @stream: a #GOutputStream.
@ -923,7 +1138,6 @@ write_all_callback (GObject *stream,
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
else
g_output_stream_write_async (G_OUTPUT_STREAM (stream),
data->buffer + data->bytes_written,
@ -1060,6 +1274,329 @@ g_output_stream_write_all_finish (GOutputStream *stream,
return g_task_propagate_boolean (task, error);
}
/**
* g_output_stream_writev_async:
* @stream: A #GOutputStream.
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @io_priority: the I/O priority of the request.
* @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
* @callback: (scope async): callback to call when the request is satisfied
* @user_data: (closure): the data to pass to callback function
*
* Request an asynchronous write of the bytes contained in @n_vectors @vectors into
* the stream. When the operation is finished @callback will be called.
* You can then call g_output_stream_writev_finish() to get the result of the
* operation.
*
* During an async request no other sync and async calls are allowed,
* and will result in %G_IO_ERROR_PENDING errors.
*
* On success, the number of bytes written will be passed to the
* @callback. It is not an error if this is not the same as the
* requested size, as it can happen e.g. on a partial I/O error,
* but generally we try to write as many bytes as requested.
*
* You are guaranteed that this method will never fail with
* %G_IO_ERROR_WOULD_BLOCK if @stream can't accept more data, the
* method will just wait until this changes.
*
* Any outstanding I/O request with higher priority (lower numerical
* value) will be executed before an outstanding request with lower
* priority. Default priority is %G_PRIORITY_DEFAULT.
*
* The asynchronous methods have a default fallback that uses threads
* to implement asynchronicity, so they are optional for inheriting
* classes. However, if you override one you must override all.
*
* For the synchronous, blocking version of this function, see
* g_output_stream_writev().
*
* Note that no copy of @vectors will be made, so it must stay valid
* until @callback is called.
*
* Since: 2.60
*/
void
g_output_stream_writev_async (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
GOutputStreamClass *class;
g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
g_return_if_fail (vectors != NULL || n_vectors == 0);
g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
class = G_OUTPUT_STREAM_GET_CLASS (stream);
g_return_if_fail (class->writev_async != NULL);
class->writev_async (stream, vectors, n_vectors, io_priority, cancellable,
callback, user_data);
}
/**
* g_output_stream_writev_finish:
* @stream: a #GOutputStream.
* @result: a #GAsyncResult.
* @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
* @error: a #GError location to store the error occurring, or %NULL to
* ignore.
*
* Finishes a stream writev operation.
*
* Returns: %TRUE on success, %FALSE if there was an error
*
* Since: 2.60
*/
gboolean
g_output_stream_writev_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error)
{
GOutputStreamClass *class;
gboolean res;
gsize _bytes_written = 0;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
class = G_OUTPUT_STREAM_GET_CLASS (stream);
g_return_val_if_fail (class->writev_finish != NULL, FALSE);
res = class->writev_finish (stream, result, &_bytes_written, error);
g_warn_if_fail (res || _bytes_written == 0);
g_warn_if_fail (res || (error == NULL || *error != NULL));
if (bytes_written)
*bytes_written = _bytes_written;
return res;
}
typedef struct
{
GOutputVector *vectors;
gsize n_vectors; /* (unowned) */
gsize bytes_written;
} AsyncWritevAll;
static void
free_async_writev_all (gpointer data)
{
g_slice_free (AsyncWritevAll, data);
}
static void
writev_all_callback (GObject *stream,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = user_data;
AsyncWritevAll *data = g_task_get_task_data (task);
gint priority = g_task_get_priority (task);
GCancellable *cancellable = g_task_get_cancellable (task);
if (result)
{
GError *error = NULL;
gboolean res;
gsize n_written = 0;
res = g_output_stream_writev_finish (G_OUTPUT_STREAM (stream), result, &n_written, &error);
if (!res)
{
g_task_return_error (task, g_steal_pointer (&error));
g_object_unref (task);
return;
}
g_warn_if_fail (n_written > 0);
data->bytes_written += n_written;
/* skip vectors that have been written in full */
while (data->n_vectors > 0 && n_written >= data->vectors[0].size)
{
n_written -= data->vectors[0].size;
++data->vectors;
--data->n_vectors;
}
/* skip partially written vector data */
if (n_written > 0 && data->n_vectors > 0)
{
data->vectors[0].size -= n_written;
data->vectors[0].buffer = ((guint8 *) data->vectors[0].buffer) + n_written;
}
}
if (data->n_vectors == 0)
{
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
else
g_output_stream_writev_async (G_OUTPUT_STREAM (stream),
data->vectors,
data->n_vectors,
priority,
cancellable,
writev_all_callback, g_steal_pointer (&task));
}
static void
writev_all_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
GOutputStream *stream = G_OUTPUT_STREAM (source_object);
AsyncWritevAll *data = task_data;
GError *error = NULL;
if (g_output_stream_writev_all (stream, data->vectors, data->n_vectors, &data->bytes_written,
g_task_get_cancellable (task), &error))
g_task_return_boolean (task, TRUE);
else
g_task_return_error (task, g_steal_pointer (&error));
}
/**
* g_output_stream_writev_all_async:
* @stream: A #GOutputStream
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @io_priority: the I/O priority of the request
* @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
* @callback: (scope async): callback to call when the request is satisfied
* @user_data: (closure): the data to pass to callback function
*
* Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
* the stream. When the operation is finished @callback will be called.
* You can then call g_output_stream_writev_all_finish() to get the result of the
* operation.
*
* This is the asynchronous version of g_output_stream_writev_all().
*
* Call g_output_stream_writev_all_finish() to collect the result.
*
* Any outstanding I/O request with higher priority (lower numerical
* value) will be executed before an outstanding request with lower
* priority. Default priority is %G_PRIORITY_DEFAULT.
*
* Note that no copy of @vectors will be made, so it must stay valid
* until @callback is called. The content of the individual elements
* of @vectors might be changed by this function.
*
* Since: 2.60
*/
void
g_output_stream_writev_all_async (GOutputStream *stream,
GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
AsyncWritevAll *data;
GTask *task;
gsize i, to_be_written = 0;
g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
g_return_if_fail (vectors != NULL || n_vectors == 0);
g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
task = g_task_new (stream, cancellable, callback, user_data);
data = g_slice_new0 (AsyncWritevAll);
data->vectors = vectors;
data->n_vectors = n_vectors;
g_task_set_source_tag (task, g_output_stream_writev_all_async);
g_task_set_task_data (task, data, free_async_writev_all);
g_task_set_priority (task, io_priority);
/* We can't write more than G_MAXSIZE bytes overall, otherwise we
* would overflow the bytes_written counter */
for (i = 0; i < n_vectors; i++)
{
if (to_be_written > G_MAXSIZE - vectors[i].size)
{
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
_("Sum of vectors passed to %s too large"),
G_STRFUNC);
g_object_unref (task);
return;
}
to_be_written += vectors[i].size;
}
/* If async writes are going to be handled via the threadpool anyway
* then we may as well do it with a single dispatch instead of
* bouncing in and out.
*/
if (g_output_stream_async_writev_is_via_threads (stream))
{
g_task_run_in_thread (task, writev_all_async_thread);
g_object_unref (task);
}
else
writev_all_callback (G_OBJECT (stream), NULL, g_steal_pointer (&task));
}
/**
* g_output_stream_writev_all_finish:
* @stream: a #GOutputStream
* @result: a #GAsyncResult
* @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
* @error: a #GError location to store the error occurring, or %NULL to ignore.
*
* Finishes an asynchronous stream write operation started with
* g_output_stream_writev_all_async().
*
* As a special exception to the normal conventions for functions that
* use #GError, if this function returns %FALSE (and sets @error) then
* @bytes_written will be set to the number of bytes that were
* successfully written before the error was encountered. This
* functionality is only available from C. If you need it from another
* language then you must write your own loop around
* g_output_stream_writev_async().
*
* Returns: %TRUE on success, %FALSE if there was an error
*
* Since: 2.60
*/
gboolean
g_output_stream_writev_all_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error)
{
GTask *task;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
task = G_TASK (result);
if (bytes_written)
{
AsyncWritevAll *data = (AsyncWritevAll *)g_task_get_task_data (task);
*bytes_written = data->bytes_written;
}
return g_task_propagate_boolean (task, error);
}
static void
write_bytes_callback (GObject *stream,
GAsyncResult *result,
@ -1712,6 +2249,28 @@ g_output_stream_async_write_is_via_threads (GOutputStream *stream)
g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
}
/*< internal >
* g_output_stream_async_writev_is_via_threads:
* @stream: a #GOutputStream.
*
* Checks if an output stream's writev_async function uses threads.
*
* Returns: %TRUE if @stream's writev_async function uses threads.
**/
gboolean
g_output_stream_async_writev_is_via_threads (GOutputStream *stream)
{
GOutputStreamClass *class;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
class = G_OUTPUT_STREAM_GET_CLASS (stream);
return (class->writev_async == g_output_stream_real_writev_async &&
!(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
}
/*< internal >
* g_output_stream_async_close_is_via_threads:
* @stream: output stream
@ -1732,6 +2291,69 @@ g_output_stream_async_close_is_via_threads (GOutputStream *stream)
return class->close_async == g_output_stream_real_close_async;
}
/********************************************
* Default implementation of sync ops *
********************************************/
static gboolean
g_output_stream_real_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
GOutputStreamClass *class;
gsize _bytes_written = 0;
gsize i;
GError *err = NULL;
class = G_OUTPUT_STREAM_GET_CLASS (stream);
if (bytes_written)
*bytes_written = 0;
for (i = 0; i < n_vectors; i++)
{
gssize res = 0;
/* Would we overflow here? In that case simply return and let the caller
* handle this like a short write */
if (_bytes_written > G_MAXSIZE - vectors[i].size)
break;
res = class->write_fn (stream, vectors[i].buffer, vectors[i].size, cancellable, &err);
if (res == -1)
{
/* If we already wrote something we handle this like a short write
* and assume that on the next call the same error happens again, or
* everything finishes successfully without data loss then
*/
if (_bytes_written > 0)
{
if (bytes_written)
*bytes_written = _bytes_written;
g_clear_error (&err);
return TRUE;
}
g_propagate_error (error, err);
return FALSE;
}
_bytes_written += res;
/* if we had a short write break the loop here */
if (res < vectors[i].size)
break;
}
if (bytes_written)
*bytes_written = _bytes_written;
return TRUE;
}
/********************************************
* Default implementation of async ops *
********************************************/
@ -1852,6 +2474,172 @@ g_output_stream_real_write_finish (GOutputStream *stream,
return g_task_propagate_int (G_TASK (result), error);
}
typedef struct {
const GOutputVector *vectors;
gsize n_vectors; /* (unowned) */
gsize bytes_written;
} WritevData;
static void
free_writev_data (WritevData *op)
{
g_slice_free (WritevData, op);
}
static void
writev_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
GCancellable *cancellable)
{
GOutputStream *stream = source_object;
WritevData *op = task_data;
GOutputStreamClass *class;
GError *error = NULL;
gboolean res;
class = G_OUTPUT_STREAM_GET_CLASS (stream);
res = class->writev_fn (stream, op->vectors, op->n_vectors,
&op->bytes_written, cancellable, &error);
g_warn_if_fail (res || op->bytes_written == 0);
g_warn_if_fail (res || error != NULL);
if (!res)
g_task_return_error (task, g_steal_pointer (&error));
else
g_task_return_boolean (task, TRUE);
}
static void writev_async_pollable (GPollableOutputStream *stream,
GTask *task);
static gboolean
writev_async_pollable_ready (GPollableOutputStream *stream,
gpointer user_data)
{
GTask *task = user_data;
writev_async_pollable (stream, task);
return G_SOURCE_REMOVE;
}
static void
writev_async_pollable (GPollableOutputStream *stream,
GTask *task)
{
GError *error = NULL;
WritevData *op = g_task_get_task_data (task);
GPollableReturn res;
gsize bytes_written = 0;
if (g_task_return_error_if_cancelled (task))
return;
res = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
writev_nonblocking (stream, op->vectors, op->n_vectors, &bytes_written, &error);
switch (res)
{
case G_POLLABLE_RETURN_WOULD_BLOCK:
{
GSource *source;
g_warn_if_fail (error == NULL);
g_warn_if_fail (bytes_written == 0);
source = g_pollable_output_stream_create_source (stream,
g_task_get_cancellable (task));
g_task_attach_source (task, source,
(GSourceFunc) writev_async_pollable_ready);
g_source_unref (source);
}
break;
case G_POLLABLE_RETURN_OK:
g_warn_if_fail (error == NULL);
op->bytes_written = bytes_written;
g_task_return_boolean (task, TRUE);
break;
case G_POLLABLE_RETURN_FAILED:
g_warn_if_fail (bytes_written == 0);
g_warn_if_fail (error != NULL);
g_task_return_error (task, g_steal_pointer (&error));
break;
default:
g_assert_not_reached ();
}
}
static void
g_output_stream_real_writev_async (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
GTask *task;
WritevData *op;
GError *error = NULL;
op = g_slice_new0 (WritevData);
task = g_task_new (stream, cancellable, callback, user_data);
op->vectors = vectors;
op->n_vectors = n_vectors;
g_task_set_check_cancellable (task, FALSE);
g_task_set_source_tag (task, g_output_stream_writev_async);
g_task_set_priority (task, io_priority);
g_task_set_task_data (task, op, (GDestroyNotify) free_writev_data);
if (n_vectors == 0)
{
g_task_return_boolean (task, TRUE);
g_object_unref (task);
return;
}
if (!g_output_stream_set_pending (stream, &error))
{
g_task_return_error (task, g_steal_pointer (&error));
g_object_unref (task);
return;
}
if (!g_output_stream_async_writev_is_via_threads (stream))
writev_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
else
g_task_run_in_thread (task, writev_async_thread);
g_object_unref (task);
}
static gboolean
g_output_stream_real_writev_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error)
{
GTask *task;
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_writev_async), FALSE);
g_output_stream_clear_pending (stream);
task = G_TASK (result);
if (bytes_written)
{
WritevData *op = g_task_get_task_data (task);
*bytes_written = op->bytes_written;
}
return g_task_propagate_boolean (task, error);
}
typedef struct {
GInputStream *source;
GOutputStreamSpliceFlags flags;

View File

@ -119,11 +119,28 @@ struct _GOutputStreamClass
GAsyncResult *result,
GError **error);
gboolean (* writev_fn) (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
void (* writev_async) (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
gboolean (* writev_finish) (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error);
/*< private >*/
/* Padding for future expansion */
void (*_g_reserved1) (void);
void (*_g_reserved2) (void);
void (*_g_reserved3) (void);
void (*_g_reserved4) (void);
void (*_g_reserved5) (void);
void (*_g_reserved6) (void);
@ -147,6 +164,22 @@ gboolean g_output_stream_write_all (GOutputStream *stream,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_60
gboolean g_output_stream_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_60
gboolean g_output_stream_writev_all (GOutputStream *stream,
GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_40
gboolean g_output_stream_printf (GOutputStream *stream,
gsize *bytes_written,
@ -208,6 +241,35 @@ gboolean g_output_stream_write_all_finish (GOutputStream *stream,
gsize *bytes_written,
GError **error);
GLIB_AVAILABLE_IN_2_60
void g_output_stream_writev_async (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
GLIB_AVAILABLE_IN_2_60
gboolean g_output_stream_writev_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error);
GLIB_AVAILABLE_IN_2_60
void g_output_stream_writev_all_async (GOutputStream *stream,
GOutputVector *vectors,
gsize n_vectors,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
GLIB_AVAILABLE_IN_2_60
gboolean g_output_stream_writev_all_finish (GOutputStream *stream,
GAsyncResult *result,
gsize *bytes_written,
GError **error);
GLIB_AVAILABLE_IN_2_34
void g_output_stream_write_bytes_async (GOutputStream *stream,
GBytes *bytes,

View File

@ -41,17 +41,23 @@
G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM)
static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream);
static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize count,
GError **error);
static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream);
static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
const void *buffer,
gsize count,
GError **error);
static GPollableReturn g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GError **error);
static void
g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface)
{
iface->can_poll = g_pollable_output_stream_default_can_poll;
iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
iface->can_poll = g_pollable_output_stream_default_can_poll;
iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
iface->writev_nonblocking = g_pollable_output_stream_default_writev_nonblocking;
}
static gboolean
@ -157,6 +163,67 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stre
write_fn (G_OUTPUT_STREAM (stream), buffer, count, NULL, error);
}
static GPollableReturn
g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GError **error)
{
gsize _bytes_written = 0;
GPollableOutputStreamInterface *iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream);
gsize i;
GError *err = NULL;
for (i = 0; i < n_vectors; i++)
{
gssize res;
/* Would we overflow here? In that case simply return and let the caller
* handle this like a short write */
if (_bytes_written > G_MAXSIZE - vectors[i].size)
break;
res = iface->write_nonblocking (stream, vectors[i].buffer, vectors[i].size, &err);
if (res == -1)
{
if (bytes_written)
*bytes_written = _bytes_written;
/* If something was written already we handle this like a short
* write and assume that the next call would either give the same
* error again or successfully finish writing without errors or data
* loss
*/
if (_bytes_written > 0)
{
g_clear_error (&err);
return G_POLLABLE_RETURN_OK;
}
else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
g_clear_error (&err);
return G_POLLABLE_RETURN_WOULD_BLOCK;
}
else
{
g_propagate_error (error, err);
return G_POLLABLE_RETURN_FAILED;
}
}
_bytes_written += res;
/* if we had a short write break the loop here */
if (res < vectors[i].size)
break;
}
if (bytes_written)
*bytes_written = _bytes_written;
return G_POLLABLE_RETURN_OK;
}
/**
* g_pollable_output_stream_write_nonblocking:
* @stream: a #GPollableOutputStream
@ -179,7 +246,8 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stre
* to having been cancelled.
*
* Also note that if %G_IO_ERROR_WOULD_BLOCK is returned some underlying
* transports like D/TLS require that you send the same @buffer and @count.
* transports like D/TLS require that you re-send the same @buffer and
* @count in the next write call.
*
* Virtual: write_nonblocking
* Returns: the number of bytes written, or -1 on error (including
@ -221,3 +289,91 @@ g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream,
return res;
}
/**
* g_pollable_output_stream_writev_nonblocking:
* @stream: a #GPollableOutputStream
* @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
* @n_vectors: the number of vectors to write
* @bytes_written: (out) (optional): location to store the number of bytes that were
* written to the stream
* @cancellable: (nullable): a #GCancellable, or %NULL
* @error: #GError for error reporting, or %NULL to ignore.
*
* Attempts to write the bytes contained in the @n_vectors @vectors to @stream,
* as with g_output_stream_writev(). If @stream is not currently writable,
* this will immediately return %@G_POLLABLE_RETURN_WOULD_BLOCK, and you can
* use g_pollable_output_stream_create_source() to create a #GSource
* that will be triggered when @stream is writable. @error will *not* be
* set in that case.
*
* Note that since this method never blocks, you cannot actually
* use @cancellable to cancel it. However, it will return an error
* if @cancellable has already been cancelled when you call, which
* may happen if you call this method after a source triggers due
* to having been cancelled.
*
* Also note that if %G_POLLABLE_RETURN_WOULD_BLOCK is returned some underlying
* transports like D/TLS require that you re-send the same @vectors and
* @n_vectors in the next write call.
*
* Virtual: writev_nonblocking
*
* Returns: %@G_POLLABLE_RETURN_OK on success, %G_POLLABLE_RETURN_WOULD_BLOCK
* if the stream is not currently writable (and @error is *not* set), or
* %G_POLLABLE_RETURN_FAILED if there was an error in which case @error will
* be set.
*
* Since: 2.60
*/
GPollableReturn
g_pollable_output_stream_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
GPollableOutputStreamInterface *iface;
GPollableReturn res;
gsize _bytes_written = 0;
if (bytes_written)
*bytes_written = 0;
g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (vectors != NULL || n_vectors == 0, G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (error == NULL || *error == NULL, G_POLLABLE_RETURN_FAILED);
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return G_POLLABLE_RETURN_FAILED;
if (n_vectors == 0)
return G_POLLABLE_RETURN_OK;
iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream);
g_return_val_if_fail (iface->writev_nonblocking != NULL, G_POLLABLE_RETURN_FAILED);
if (cancellable)
g_cancellable_push_current (cancellable);
res = iface->
writev_nonblocking (stream, vectors, n_vectors, &_bytes_written, error);
if (cancellable)
g_cancellable_pop_current (cancellable);
if (res == G_POLLABLE_RETURN_FAILED)
g_warn_if_fail (error == NULL || (*error != NULL && !g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)));
else if (res == G_POLLABLE_RETURN_WOULD_BLOCK)
g_warn_if_fail (error == NULL || *error == NULL);
/* in case of not-OK nothing must've been written */
g_warn_if_fail (res == G_POLLABLE_RETURN_OK || _bytes_written == 0);
if (bytes_written)
*bytes_written = _bytes_written;
return res;
}

View File

@ -77,6 +77,11 @@ struct _GPollableOutputStreamInterface
const void *buffer,
gsize count,
GError **error);
GPollableReturn (*writev_nonblocking) (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GError **error);
};
GLIB_AVAILABLE_IN_ALL
@ -98,6 +103,14 @@ gssize g_pollable_output_stream_write_nonblocking (GPollableOutputStream *str
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_60
GPollableReturn g_pollable_output_stream_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
G_END_DECLS

View File

@ -151,14 +151,14 @@ static gint g_socket_datagram_based_receive_messages (GDatagramBased *self,
GInputMessage *messages,
guint num_messages,
gint flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error);
static gint g_socket_datagram_based_send_messages (GDatagramBased *self,
GOutputMessage *messages,
guint num_messages,
gint flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error);
static GSource *g_socket_datagram_based_create_source (GDatagramBased *self,
@ -168,7 +168,7 @@ static GIOCondition g_socket_datagram_based_condition_check (GDatagramBased
GIOCondition condition);
static gboolean g_socket_datagram_based_condition_wait (GDatagramBased *datagram_based,
GIOCondition condition,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error);
@ -183,7 +183,7 @@ g_socket_receive_message_with_timeout (GSocket *socket,
GSocketControlMessage ***messages,
gint *num_messages,
gint *flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error);
static gint
@ -191,26 +191,15 @@ g_socket_receive_messages_with_timeout (GSocket *socket,
GInputMessage *messages,
guint num_messages,
gint flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error);
static gssize
g_socket_send_message_with_timeout (GSocket *socket,
GSocketAddress *address,
GOutputVector *vectors,
gint num_vectors,
GSocketControlMessage **messages,
gint num_messages,
gint flags,
gint64 timeout,
GCancellable *cancellable,
GError **error);
static gint
g_socket_send_messages_with_timeout (GSocket *socket,
GOutputMessage *messages,
guint num_messages,
gint flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error);
@ -1157,7 +1146,7 @@ g_socket_datagram_based_receive_messages (GDatagramBased *self,
GInputMessage *messages,
guint num_messages,
gint flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error)
{
@ -1165,7 +1154,7 @@ g_socket_datagram_based_receive_messages (GDatagramBased *self,
return FALSE;
return g_socket_receive_messages_with_timeout (G_SOCKET (self), messages,
num_messages, flags, timeout,
num_messages, flags, timeout_us,
cancellable, error);
}
@ -1174,7 +1163,7 @@ g_socket_datagram_based_send_messages (GDatagramBased *self,
GOutputMessage *messages,
guint num_messages,
gint flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error)
{
@ -1182,7 +1171,7 @@ g_socket_datagram_based_send_messages (GDatagramBased *self,
return FALSE;
return g_socket_send_messages_with_timeout (G_SOCKET (self), messages,
num_messages, flags, timeout,
num_messages, flags, timeout_us,
cancellable, error);
}
@ -1210,7 +1199,7 @@ g_socket_datagram_based_condition_check (GDatagramBased *datagram_based,
static gboolean
g_socket_datagram_based_condition_wait (GDatagramBased *datagram_based,
GIOCondition condition,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error)
{
@ -1218,7 +1207,7 @@ g_socket_datagram_based_condition_wait (GDatagramBased *datagram_based,
return FALSE;
return g_socket_condition_timed_wait (G_SOCKET (datagram_based), condition,
timeout, cancellable, error);
timeout_us, cancellable, error);
}
/**
@ -3017,21 +3006,21 @@ g_socket_get_available_bytes (GSocket *socket)
static gboolean
block_on_timeout (GSocket *socket,
GIOCondition condition,
gint64 timeout,
gint64 timeout_us,
gint64 start_time,
GCancellable *cancellable,
GError **error)
{
gint64 wait_timeout = -1;
g_return_val_if_fail (timeout != 0, TRUE);
g_return_val_if_fail (timeout_us != 0, TRUE);
/* check if we've timed out or how much time to wait at most */
if (timeout >= 0)
if (timeout_us >= 0)
{
gint64 elapsed = g_get_monotonic_time () - start_time;
if (elapsed >= timeout)
if (elapsed >= timeout_us)
{
g_set_error_literal (error,
G_IO_ERROR, G_IO_ERROR_TIMED_OUT,
@ -3039,7 +3028,7 @@ block_on_timeout (GSocket *socket,
return FALSE;
}
wait_timeout = timeout - elapsed;
wait_timeout = timeout_us - elapsed;
}
return g_socket_condition_timed_wait (socket, condition, wait_timeout,
@ -3050,7 +3039,7 @@ static gssize
g_socket_receive_with_timeout (GSocket *socket,
guint8 *buffer,
gsize size,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error)
{
@ -3088,9 +3077,9 @@ g_socket_receive_with_timeout (GSocket *socket,
{
win32_unset_event_mask (socket, FD_READ);
if (timeout != 0)
if (timeout_us != 0)
{
if (!block_on_timeout (socket, G_IO_IN, timeout, start_time,
if (!block_on_timeout (socket, G_IO_IN, timeout_us, start_time,
cancellable, error))
return -1;
@ -3249,7 +3238,7 @@ static gssize
g_socket_send_with_timeout (GSocket *socket,
const guint8 *buffer,
gsize size,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error)
{
@ -3287,9 +3276,9 @@ g_socket_send_with_timeout (GSocket *socket,
{
win32_unset_event_mask (socket, FD_WRITE);
if (timeout != 0)
if (timeout_us != 0)
{
if (!block_on_timeout (socket, G_IO_OUT, timeout, start_time,
if (!block_on_timeout (socket, G_IO_OUT, timeout_us, start_time,
cancellable, error))
return -1;
@ -4126,25 +4115,25 @@ g_socket_condition_wait (GSocket *socket,
* g_socket_condition_timed_wait:
* @socket: a #GSocket
* @condition: a #GIOCondition mask to wait for
* @timeout: the maximum time (in microseconds) to wait, or -1
* @timeout_us: the maximum time (in microseconds) to wait, or -1
* @cancellable: (nullable): a #GCancellable, or %NULL
* @error: a #GError pointer, or %NULL
*
* Waits for up to @timeout microseconds for @condition to become true
* Waits for up to @timeout_us microseconds for @condition to become true
* on @socket. If the condition is met, %TRUE is returned.
*
* If @cancellable is cancelled before the condition is met, or if
* @timeout (or the socket's #GSocket:timeout) is reached before the
* @timeout_us (or the socket's #GSocket:timeout) is reached before the
* condition is met, then %FALSE is returned and @error, if non-%NULL,
* is set to the appropriate value (%G_IO_ERROR_CANCELLED or
* %G_IO_ERROR_TIMED_OUT).
*
* If you don't want a timeout, use g_socket_condition_wait().
* (Alternatively, you can pass -1 for @timeout.)
* (Alternatively, you can pass -1 for @timeout_us.)
*
* Note that although @timeout is in microseconds for consistency with
* Note that although @timeout_us is in microseconds for consistency with
* other GLib APIs, this function actually only has millisecond
* resolution, and the behavior is undefined if @timeout is not an
* resolution, and the behavior is undefined if @timeout_us is not an
* exact number of milliseconds.
*
* Returns: %TRUE if the condition was met, %FALSE otherwise
@ -4154,11 +4143,12 @@ g_socket_condition_wait (GSocket *socket,
gboolean
g_socket_condition_timed_wait (GSocket *socket,
GIOCondition condition,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error)
{
gint64 start_time;
gint64 timeout_ms;
g_return_val_if_fail (G_IS_SOCKET (socket), FALSE);
@ -4169,10 +4159,12 @@ g_socket_condition_timed_wait (GSocket *socket,
return FALSE;
if (socket->priv->timeout &&
(timeout < 0 || socket->priv->timeout < timeout / G_USEC_PER_SEC))
timeout = (gint64) socket->priv->timeout * 1000;
else if (timeout != -1)
timeout = timeout / 1000;
(timeout_us < 0 || socket->priv->timeout < timeout_us / G_USEC_PER_SEC))
timeout_ms = (gint64) socket->priv->timeout * 1000;
else if (timeout_us != -1)
timeout_ms = timeout_us / 1000;
else
timeout_ms = -1;
start_time = g_get_monotonic_time ();
@ -4195,8 +4187,8 @@ g_socket_condition_timed_wait (GSocket *socket,
if (g_cancellable_make_pollfd (cancellable, &cancel_fd))
events[num_events++] = (WSAEVENT)cancel_fd.fd;
if (timeout == -1)
timeout = WSA_INFINITE;
if (timeout_ms == -1)
timeout_ms = WSA_INFINITE;
g_mutex_lock (&socket->priv->win32_source_lock);
current_condition = update_condition_unlocked (socket);
@ -4208,7 +4200,7 @@ g_socket_condition_timed_wait (GSocket *socket,
socket->priv->waiting_result = 0;
g_mutex_unlock (&socket->priv->win32_source_lock);
res = WSAWaitForMultipleEvents (num_events, events, FALSE, timeout, FALSE);
res = WSAWaitForMultipleEvents (num_events, events, FALSE, timeout_ms, FALSE);
g_mutex_lock (&socket->priv->win32_source_lock);
socket->priv->waiting = FALSE;
@ -4217,9 +4209,9 @@ g_socket_condition_timed_wait (GSocket *socket,
}
else
{
if (timeout != WSA_INFINITE)
if (timeout_ms != WSA_INFINITE)
{
if (!g_cond_wait_until (&socket->priv->win32_source_cond, &socket->priv->win32_source_lock, timeout))
if (!g_cond_wait_until (&socket->priv->win32_source_cond, &socket->priv->win32_source_lock, timeout_ms))
{
res = WSA_WAIT_TIMEOUT;
break;
@ -4258,11 +4250,11 @@ g_socket_condition_timed_wait (GSocket *socket,
current_condition = update_condition_unlocked (socket);
if (timeout != WSA_INFINITE)
if (timeout_ms != WSA_INFINITE)
{
timeout -= (g_get_monotonic_time () - start_time) * 1000;
if (timeout < 0)
timeout = 0;
timeout_ms -= (g_get_monotonic_time () - start_time) * 1000;
if (timeout_ms < 0)
timeout_ms = 0;
}
}
g_mutex_unlock (&socket->priv->win32_source_lock);
@ -4288,16 +4280,16 @@ g_socket_condition_timed_wait (GSocket *socket,
while (TRUE)
{
int errsv;
result = g_poll (poll_fd, num, timeout);
result = g_poll (poll_fd, num, timeout_ms);
errsv = errno;
if (result != -1 || errsv != EINTR)
break;
if (timeout != -1)
if (timeout_ms != -1)
{
timeout -= (g_get_monotonic_time () - start_time) / 1000;
if (timeout < 0)
timeout = 0;
timeout_ms -= (g_get_monotonic_time () - start_time) / 1000;
if (timeout_ms < 0)
timeout_ms = 0;
}
}
@ -4548,7 +4540,7 @@ input_message_from_msghdr (const struct msghdr *msg,
* @messages: (array length=num_messages) (nullable): a pointer to an
* array of #GSocketControlMessages, or %NULL.
* @num_messages: number of elements in @messages, or -1.
* @flags: an int containing #GSocketMsgFlags flags
* @flags: (type GSocketMsgFlags): an int containing #GSocketMsgFlags flags
* @cancellable: (nullable): a %GCancellable or %NULL
* @error: #GError for error reporting, or %NULL to ignore.
*
@ -4606,22 +4598,61 @@ g_socket_send_message (GSocket *socket,
GCancellable *cancellable,
GError **error)
{
return g_socket_send_message_with_timeout (socket, address,
vectors, num_vectors,
messages, num_messages, flags,
socket->priv->blocking ? -1 : 0,
cancellable, error);
GPollableReturn res;
gsize bytes_written = 0;
res = g_socket_send_message_with_timeout (socket, address,
vectors, num_vectors,
messages, num_messages, flags,
socket->priv->blocking ? -1 : 0,
&bytes_written,
cancellable, error);
if (res == G_POLLABLE_RETURN_WOULD_BLOCK)
socket_set_error_lazy (error, EWOULDBLOCK, _("Error sending message: %s"));
return res == G_POLLABLE_RETURN_OK ? bytes_written : -1;
}
static gssize
/**
* g_socket_send_message_with_timeout:
* @socket: a #GSocket
* @address: (nullable): a #GSocketAddress, or %NULL
* @vectors: (array length=num_vectors): an array of #GOutputVector structs
* @num_vectors: the number of elements in @vectors, or -1
* @messages: (array length=num_messages) (nullable): a pointer to an
* array of #GSocketControlMessages, or %NULL.
* @num_messages: number of elements in @messages, or -1.
* @flags: (type GSocketMsgFlags): an int containing #GSocketMsgFlags flags
* @timeout_us: the maximum time (in microseconds) to wait, or -1
* @bytes_written: (out) (optional): location to store the number of bytes that were written to the socket
* @cancellable: (nullable): a %GCancellable or %NULL
* @error: #GError for error reporting, or %NULL to ignore.
*
* This behaves exactly the same as g_socket_send_message(), except that
* the choice of timeout behavior is determined by the @timeout_us argument
* rather than by @socket's properties.
*
* On error %G_POLLABLE_RETURN_FAILED is returned and @error is set accordingly, or
* if the socket is currently not writable %G_POLLABLE_RETURN_WOULD_BLOCK is
* returned. @bytes_written will contain 0 in both cases.
*
* Returns: %G_POLLABLE_RETURN_OK if all data was successfully written,
* %G_POLLABLE_RETURN_WOULD_BLOCK if the socket is currently not writable, or
* %G_POLLABLE_RETURN_FAILED if an error happened and @error is set.
*
* Since: 2.60
*/
GPollableReturn
g_socket_send_message_with_timeout (GSocket *socket,
GSocketAddress *address,
GOutputVector *vectors,
const GOutputVector *vectors,
gint num_vectors,
GSocketControlMessage **messages,
gint num_messages,
gint flags,
gint64 timeout,
gint64 timeout_us,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
@ -4629,23 +4660,26 @@ g_socket_send_message_with_timeout (GSocket *socket,
char zero;
gint64 start_time;
g_return_val_if_fail (G_IS_SOCKET (socket), -1);
g_return_val_if_fail (address == NULL || G_IS_SOCKET_ADDRESS (address), -1);
g_return_val_if_fail (num_vectors == 0 || vectors != NULL, -1);
g_return_val_if_fail (num_messages == 0 || messages != NULL, -1);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
g_return_val_if_fail (error == NULL || *error == NULL, -1);
if (bytes_written)
*bytes_written = 0;
g_return_val_if_fail (G_IS_SOCKET (socket), G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (address == NULL || G_IS_SOCKET_ADDRESS (address), G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (num_vectors == 0 || vectors != NULL, G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (num_messages == 0 || messages != NULL, G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), G_POLLABLE_RETURN_FAILED);
g_return_val_if_fail (error == NULL || *error == NULL, G_POLLABLE_RETURN_FAILED);
start_time = g_get_monotonic_time ();
if (!check_socket (socket, error))
return -1;
return G_POLLABLE_RETURN_FAILED;
if (!check_timeout (socket, error))
return -1;
return G_POLLABLE_RETURN_FAILED;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
return G_POLLABLE_RETURN_FAILED;
if (num_vectors == -1)
{
@ -4681,7 +4715,7 @@ g_socket_send_message_with_timeout (GSocket *socket,
GError *child_error = NULL;
output_message.address = address;
output_message.vectors = vectors;
output_message.vectors = (GOutputVector *) vectors;
output_message.num_vectors = num_vectors;
output_message.bytes_sent = 0;
output_message.control_messages = messages;
@ -4692,7 +4726,7 @@ g_socket_send_message_with_timeout (GSocket *socket,
if (child_error != NULL)
{
g_propagate_error (error, child_error);
return -1;
return G_POLLABLE_RETURN_FAILED;
}
while (1)
@ -4705,24 +4739,30 @@ g_socket_send_message_with_timeout (GSocket *socket,
if (errsv == EINTR)
continue;
if (timeout != 0 &&
(errsv == EWOULDBLOCK ||
errsv == EAGAIN))
if (errsv == EWOULDBLOCK || errsv == EAGAIN)
{
if (!block_on_timeout (socket, G_IO_OUT, timeout, start_time,
cancellable, error))
return -1;
if (timeout_us != 0)
{
if (!block_on_timeout (socket, G_IO_OUT, timeout_us, start_time,
cancellable, error))
return G_POLLABLE_RETURN_FAILED;
continue;
continue;
}
return G_POLLABLE_RETURN_WOULD_BLOCK;
}
socket_set_error_lazy (error, errsv, _("Error sending message: %s"));
return -1;
socket_set_error_lazy (error, errsv, _("Error sending message: %s"));
return G_POLLABLE_RETURN_FAILED;
}
break;
}
return result;
if (bytes_written)
*bytes_written = result;
return G_POLLABLE_RETURN_OK;
}
#else
{
@ -4741,7 +4781,7 @@ g_socket_send_message_with_timeout (GSocket *socket,
{
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
_("GSocketControlMessage not supported on Windows"));
return -1;
return G_POLLABLE_RETURN_FAILED;
}
/* iov */
@ -4758,7 +4798,7 @@ g_socket_send_message_with_timeout (GSocket *socket,
{
addrlen = g_socket_address_get_native_size (address);
if (!g_socket_address_to_native (address, &addr, sizeof addr, error))
return -1;
return G_POLLABLE_RETURN_FAILED;
}
while (1)
@ -4786,23 +4826,27 @@ g_socket_send_message_with_timeout (GSocket *socket,
{
win32_unset_event_mask (socket, FD_WRITE);
if (timeout != 0)
if (timeout_us != 0)
{
if (!block_on_timeout (socket, G_IO_OUT, timeout,
if (!block_on_timeout (socket, G_IO_OUT, timeout_us,
start_time, cancellable, error))
return -1;
return G_POLLABLE_RETURN_FAILED;
continue;
}
return G_POLLABLE_RETURN_WOULD_BLOCK;
}
socket_set_error_lazy (error, errsv, _("Error sending message: %s"));
return -1;
return G_POLLABLE_RETURN_FAILED;
}
break;
}
return bytes_sent;
if (bytes_written)
*bytes_written = bytes_sent;
return G_POLLABLE_RETURN_OK;
}
#endif
}
@ -4812,7 +4856,7 @@ g_socket_send_message_with_timeout (GSocket *socket,
* @socket: a #GSocket
* @messages: (array length=num_messages): an array of #GOutputMessage structs
* @num_messages: the number of elements in @messages
* @flags: an int containing #GSocketMsgFlags flags
* @flags: (type GSocketMsgFlags): an int containing #GSocketMsgFlags flags
* @cancellable: (nullable): a %GCancellable or %NULL
* @error: #GError for error reporting, or %NULL to ignore.
*
@ -4877,7 +4921,7 @@ g_socket_send_messages_with_timeout (GSocket *socket,
GOutputMessage *messages,
guint num_messages,
gint flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error)
{
@ -4951,11 +4995,11 @@ g_socket_send_messages_with_timeout (GSocket *socket,
if (errsv == EINTR)
continue;
if (timeout != 0 &&
if (timeout_us != 0 &&
(errsv == EWOULDBLOCK ||
errsv == EAGAIN))
{
if (!block_on_timeout (socket, G_IO_OUT, timeout, start_time,
if (!block_on_timeout (socket, G_IO_OUT, timeout_us, start_time,
cancellable, error))
{
if (num_sent > 0)
@ -4993,26 +5037,34 @@ g_socket_send_messages_with_timeout (GSocket *socket,
gint i;
gint64 wait_timeout;
wait_timeout = timeout;
wait_timeout = timeout_us;
for (i = 0; i < num_messages; ++i)
{
GOutputMessage *msg = &messages[i];
GError *msg_error = NULL;
GPollableReturn pollable_result;
gsize bytes_written = 0;
result = g_socket_send_message_with_timeout (socket, msg->address,
msg->vectors,
msg->num_vectors,
msg->control_messages,
msg->num_control_messages,
flags, wait_timeout,
cancellable, &msg_error);
pollable_result = g_socket_send_message_with_timeout (socket, msg->address,
msg->vectors,
msg->num_vectors,
msg->control_messages,
msg->num_control_messages,
flags, wait_timeout,
&bytes_written,
cancellable, &msg_error);
if (pollable_result == G_POLLABLE_RETURN_WOULD_BLOCK)
socket_set_error_lazy (&msg_error, EWOULDBLOCK, _("Error sending message: %s"));
result = pollable_result == G_POLLABLE_RETURN_OK ? bytes_written : -1;
/* check if we've timed out or how much time to wait at most */
if (timeout > 0)
if (timeout_us > 0)
{
gint64 elapsed = g_get_monotonic_time () - start_time;
wait_timeout = MAX (timeout - elapsed, 1);
wait_timeout = MAX (timeout_us - elapsed, 1);
}
if (result < 0)
@ -5101,7 +5153,7 @@ g_socket_receive_message_with_timeout (GSocket *socket,
GSocketControlMessage ***messages,
gint *num_messages,
gint *flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error)
{
@ -5182,11 +5234,11 @@ g_socket_receive_message_with_timeout (GSocket *socket,
if (errsv == EINTR)
continue;
if (timeout != 0 &&
if (timeout_us != 0 &&
(errsv == EWOULDBLOCK ||
errsv == EAGAIN))
{
if (!block_on_timeout (socket, G_IO_IN, timeout, start_time,
if (!block_on_timeout (socket, G_IO_IN, timeout_us, start_time,
cancellable, error))
return -1;
@ -5256,9 +5308,9 @@ g_socket_receive_message_with_timeout (GSocket *socket,
{
win32_unset_event_mask (socket, FD_READ);
if (timeout != 0)
if (timeout_us != 0)
{
if (!block_on_timeout (socket, G_IO_IN, timeout,
if (!block_on_timeout (socket, G_IO_IN, timeout_us,
start_time, cancellable, error))
return -1;
@ -5298,7 +5350,7 @@ g_socket_receive_message_with_timeout (GSocket *socket,
* @socket: a #GSocket
* @messages: (array length=num_messages): an array of #GInputMessage structs
* @num_messages: the number of elements in @messages
* @flags: an int containing #GSocketMsgFlags flags for the overall operation
* @flags: (type GSocketMsgFlags): an int containing #GSocketMsgFlags flags for the overall operation
* @cancellable: (nullable): a %GCancellable or %NULL
* @error: #GError for error reporting, or %NULL to ignore
*
@ -5382,7 +5434,7 @@ g_socket_receive_messages_with_timeout (GSocket *socket,
GInputMessage *messages,
guint num_messages,
gint flags,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error)
{
@ -5469,11 +5521,11 @@ g_socket_receive_messages_with_timeout (GSocket *socket,
if (errsv == EINTR)
continue;
if (timeout != 0 &&
if (timeout_us != 0 &&
(errsv == EWOULDBLOCK ||
errsv == EAGAIN))
{
if (!block_on_timeout (socket, G_IO_IN, timeout, start_time,
if (!block_on_timeout (socket, G_IO_IN, timeout_us, start_time,
cancellable, error))
{
if (num_received > 0)
@ -5519,7 +5571,7 @@ g_socket_receive_messages_with_timeout (GSocket *socket,
guint i;
gint64 wait_timeout;
wait_timeout = timeout;
wait_timeout = timeout_us;
for (i = 0; i < num_messages; i++)
{
@ -5541,10 +5593,10 @@ g_socket_receive_messages_with_timeout (GSocket *socket,
&msg_error);
/* check if we've timed out or how much time to wait at most */
if (timeout > 0)
if (timeout_us > 0)
{
gint64 elapsed = g_get_monotonic_time () - start_time;
wait_timeout = MAX (timeout - elapsed, 1);
wait_timeout = MAX (timeout_us - elapsed, 1);
}
if (len >= 0)
@ -5584,7 +5636,7 @@ g_socket_receive_messages_with_timeout (GSocket *socket,
* which may be filled with an array of #GSocketControlMessages, or %NULL
* @num_messages: (out): a pointer which will be filled with the number of
* elements in @messages, or %NULL
* @flags: (inout): a pointer to an int containing #GSocketMsgFlags flags
* @flags: (type GSocketMsgFlags): (inout): a pointer to an int containing #GSocketMsgFlags flags
* @cancellable: a %GCancellable or %NULL
* @error: a #GError pointer, or %NULL
*

View File

@ -192,7 +192,7 @@ gboolean g_socket_condition_wait (GSocket
GLIB_AVAILABLE_IN_2_32
gboolean g_socket_condition_timed_wait (GSocket *socket,
GIOCondition condition,
gint64 timeout,
gint64 timeout_us,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_ALL
@ -298,7 +298,18 @@ gssize g_socket_send_with_blocking (GSocket
gboolean blocking,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_60
GPollableReturn g_socket_send_message_with_timeout (GSocket *socket,
GSocketAddress *address,
const GOutputVector *vectors,
gint num_vectors,
GSocketControlMessage **messages,
gint num_messages,
gint flags,
gint64 timeout_us,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_2_36
gboolean g_socket_get_option (GSocket *socket,
gint level,

View File

@ -125,13 +125,42 @@ g_socket_output_stream_write (GOutputStream *stream,
GCancellable *cancellable,
GError **error)
{
GSocketOutputStream *onput_stream = G_SOCKET_OUTPUT_STREAM (stream);
GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (stream);
return g_socket_send_with_blocking (onput_stream->priv->socket,
return g_socket_send_with_blocking (output_stream->priv->socket,
buffer, count, TRUE,
cancellable, error);
}
static gboolean
g_socket_output_stream_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (stream);
GPollableReturn res;
/* Clamp the number of vectors if more given than we can write in one go.
* The caller has to handle short writes anyway.
*/
if (n_vectors > G_MAXINT)
n_vectors = G_MAXINT;
res = g_socket_send_message_with_timeout (output_stream->priv->socket, NULL,
vectors, n_vectors,
NULL, 0, G_SOCKET_MSG_NONE,
-1, bytes_written,
cancellable, error);
/* we have a non-zero timeout so this can't happen */
g_assert (res != G_POLLABLE_RETURN_WOULD_BLOCK);
return res == G_POLLABLE_RETURN_OK;
}
static gboolean
g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
{
@ -153,6 +182,27 @@ g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *polla
NULL, error);
}
static GPollableReturn
g_socket_output_stream_pollable_writev_nonblocking (GPollableOutputStream *pollable,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GError **error)
{
GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
/* Clamp the number of vectors if more given than we can write in one go.
* The caller has to handle short writes anyway.
*/
if (n_vectors > G_MAXINT)
n_vectors = G_MAXINT;
return g_socket_send_message_with_timeout (output_stream->priv->socket,
NULL, vectors, n_vectors,
NULL, 0, G_SOCKET_MSG_NONE, 0,
bytes_written, NULL, error);
}
static GSource *
g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable,
GCancellable *cancellable)
@ -191,6 +241,7 @@ g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
gobject_class->set_property = g_socket_output_stream_set_property;
goutputstream_class->write_fn = g_socket_output_stream_write;
goutputstream_class->writev_fn = g_socket_output_stream_writev;
g_object_class_install_property (gobject_class, PROP_SOCKET,
g_param_spec_object ("socket",
@ -214,6 +265,7 @@ g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *ifac
iface->is_writable = g_socket_output_stream_pollable_is_writable;
iface->create_source = g_socket_output_stream_pollable_create_source;
iface->write_nonblocking = g_socket_output_stream_pollable_write_nonblocking;
iface->writev_nonblocking = g_socket_output_stream_pollable_writev_nonblocking;
}
static void

View File

@ -26,6 +26,7 @@
#include <errno.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <glib.h>
#include <glib/gstdio.h>
@ -91,6 +92,12 @@ static gssize g_unix_output_stream_write (GOutputStream *stream,
gsize count,
GCancellable *cancellable,
GError **error);
static gboolean g_unix_output_stream_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
static gboolean g_unix_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
@ -107,6 +114,11 @@ static gboolean g_unix_output_stream_pollable_can_poll (GPollableOutputStre
static gboolean g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream);
static GSource *g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
GCancellable *cancellable);
static GPollableReturn g_unix_output_stream_pollable_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GError **error);
static void
g_unix_output_stream_class_init (GUnixOutputStreamClass *klass)
@ -118,6 +130,7 @@ g_unix_output_stream_class_init (GUnixOutputStreamClass *klass)
gobject_class->set_property = g_unix_output_stream_set_property;
stream_class->write_fn = g_unix_output_stream_write;
stream_class->writev_fn = g_unix_output_stream_writev;
stream_class->close_fn = g_unix_output_stream_close;
stream_class->close_async = g_unix_output_stream_close_async;
stream_class->close_finish = g_unix_output_stream_close_finish;
@ -159,6 +172,7 @@ g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
iface->can_poll = g_unix_output_stream_pollable_can_poll;
iface->is_writable = g_unix_output_stream_pollable_is_writable;
iface->create_source = g_unix_output_stream_pollable_create_source;
iface->writev_nonblocking = g_unix_output_stream_pollable_writev_nonblocking;
}
static void
@ -325,19 +339,18 @@ g_unix_output_stream_write (GOutputStream *stream,
GUnixOutputStream *unix_stream;
gssize res = -1;
GPollFD poll_fds[2];
int nfds;
int nfds = 0;
int poll_ret;
unix_stream = G_UNIX_OUTPUT_STREAM (stream);
poll_fds[0].fd = unix_stream->priv->fd;
poll_fds[0].events = G_IO_OUT;
nfds++;
if (unix_stream->priv->is_pipe_or_socket &&
g_cancellable_make_pollfd (cancellable, &poll_fds[1]))
nfds = 2;
else
nfds = 1;
nfds++;
while (1)
{
@ -387,6 +400,116 @@ g_unix_output_stream_write (GOutputStream *stream,
return res;
}
/* Macro to check if struct iovec and GOutputVector have the same ABI */
#define G_OUTPUT_VECTOR_IS_IOVEC (sizeof (struct iovec) == sizeof (GOutputVector) && \
sizeof ((struct iovec *) 0)->iov_base == sizeof ((GOutputVector *) 0)->buffer && \
G_STRUCT_OFFSET (struct iovec, iov_base) == G_STRUCT_OFFSET (GOutputVector, buffer) && \
sizeof ((struct iovec *) 0)->iov_len == sizeof((GOutputVector *) 0)->size && \
G_STRUCT_OFFSET (struct iovec, iov_len) == G_STRUCT_OFFSET (GOutputVector, size))
static gboolean
g_unix_output_stream_writev (GOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GCancellable *cancellable,
GError **error)
{
GUnixOutputStream *unix_stream;
gssize res = -1;
GPollFD poll_fds[2];
int nfds = 0;
int poll_ret;
struct iovec *iov;
if (bytes_written)
*bytes_written = 0;
/* Clamp to G_MAXINT as writev() takes an integer for the number of vectors.
* We handle this like a short write in this case
*/
if (n_vectors > G_MAXINT)
n_vectors = G_MAXINT;
unix_stream = G_UNIX_OUTPUT_STREAM (stream);
if (G_OUTPUT_VECTOR_IS_IOVEC)
{
/* ABI is compatible */
iov = (struct iovec *) vectors;
}
else
{
gsize i;
/* ABI is incompatible */
iov = g_newa (struct iovec, n_vectors);
for (i = 0; i < n_vectors; i++)
{
iov[i].iov_base = (void *)vectors[i].buffer;
iov[i].iov_len = vectors[i].size;
}
}
poll_fds[0].fd = unix_stream->priv->fd;
poll_fds[0].events = G_IO_OUT;
nfds++;
if (unix_stream->priv->is_pipe_or_socket &&
g_cancellable_make_pollfd (cancellable, &poll_fds[1]))
nfds++;
while (1)
{
int errsv;
poll_fds[0].revents = poll_fds[1].revents = 0;
do
{
poll_ret = g_poll (poll_fds, nfds, -1);
errsv = errno;
}
while (poll_ret == -1 && errsv == EINTR);
if (poll_ret == -1)
{
g_set_error (error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error writing to file descriptor: %s"),
g_strerror (errsv));
break;
}
if (g_cancellable_set_error_if_cancelled (cancellable, error))
break;
if (!poll_fds[0].revents)
continue;
res = writev (unix_stream->priv->fd, iov, n_vectors);
errsv = errno;
if (res == -1)
{
if (errsv == EINTR || errsv == EAGAIN)
continue;
g_set_error (error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error writing to file descriptor: %s"),
g_strerror (errsv));
}
if (bytes_written)
*bytes_written = res;
break;
}
if (nfds == 2)
g_cancellable_release_fd (cancellable);
return res != -1;
}
static gboolean
g_unix_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
@ -494,3 +617,70 @@ g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
return pollable_source;
}
static GPollableReturn
g_unix_output_stream_pollable_writev_nonblocking (GPollableOutputStream *stream,
const GOutputVector *vectors,
gsize n_vectors,
gsize *bytes_written,
GError **error)
{
GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
struct iovec *iov;
gssize res = -1;
if (!g_pollable_output_stream_is_writable (stream))
{
*bytes_written = 0;
return G_POLLABLE_RETURN_WOULD_BLOCK;
}
/* Clamp to G_MAXINT as writev() takes an integer for the number of vectors.
* We handle this like a short write in this case
*/
if (n_vectors > G_MAXINT)
n_vectors = G_MAXINT;
if (G_OUTPUT_VECTOR_IS_IOVEC)
{
/* ABI is compatible */
iov = (struct iovec *) vectors;
}
else
{
gsize i;
/* ABI is incompatible */
iov = g_newa (struct iovec, n_vectors);
for (i = 0; i < n_vectors; i++)
{
iov[i].iov_base = (void *)vectors[i].buffer;
iov[i].iov_len = vectors[i].size;
}
}
while (1)
{
int errsv;
res = writev (unix_stream->priv->fd, iov, n_vectors);
errsv = errno;
if (res == -1)
{
if (errsv == EINTR)
continue;
g_set_error (error, G_IO_ERROR,
g_io_error_from_errno (errsv),
_("Error writing to file descriptor: %s"),
g_strerror (errsv));
}
if (bytes_written)
*bytes_written = res;
break;
}
return res != -1 ? G_POLLABLE_RETURN_OK : G_POLLABLE_RETURN_FAILED;
}

View File

@ -1162,6 +1162,562 @@ test_load_bytes_async (void)
g_main_loop_unref (data.main_loop);
}
static void
test_writev_helper (GOutputVector *vectors,
gsize n_vectors,
gboolean use_bytes_written,
const guint8 *expected_contents,
gsize expected_length)
{
GFile *file;
GFileIOStream *iostream = NULL;
GOutputStream *ostream;
GError *error = NULL;
gsize bytes_written = 0;
gboolean res;
guint8 *contents;
gsize length;
file = g_file_new_tmp ("g_file_writev_XXXXXX",
&iostream, NULL);
g_assert_nonnull (file);
g_assert_nonnull (iostream);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (iostream));
res = g_output_stream_writev_all (ostream, vectors, n_vectors, use_bytes_written ? &bytes_written : NULL, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
if (use_bytes_written)
g_assert_cmpuint (bytes_written, ==, expected_length);
res = g_io_stream_close (G_IO_STREAM (iostream), NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_object_unref (iostream);
res = g_file_load_contents (file, NULL, (gchar **) &contents, &length, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpmem (contents, length, expected_contents, expected_length);
g_free (contents);
g_file_delete (file, NULL, NULL);
g_object_unref (file);
}
/* Test that writev() on local file output streams works on a non-empty vector */
static void
test_writev (void)
{
GOutputVector vectors[3];
const guint8 buffer[] = {1, 2, 3, 4, 5,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
1, 2, 3};
vectors[0].buffer = buffer;
vectors[0].size = 5;
vectors[1].buffer = buffer + 5;
vectors[1].size = 12;
vectors[2].buffer = buffer + 5 + 12;
vectors[2].size = 3;
test_writev_helper (vectors, G_N_ELEMENTS (vectors), TRUE, buffer, sizeof buffer);
}
/* Test that writev() on local file output streams works on a non-empty vector without returning bytes_written */
static void
test_writev_no_bytes_written (void)
{
GOutputVector vectors[3];
const guint8 buffer[] = {1, 2, 3, 4, 5,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
1, 2, 3};
vectors[0].buffer = buffer;
vectors[0].size = 5;
vectors[1].buffer = buffer + 5;
vectors[1].size = 12;
vectors[2].buffer = buffer + 5 + 12;
vectors[2].size = 3;
test_writev_helper (vectors, G_N_ELEMENTS (vectors), FALSE, buffer, sizeof buffer);
}
/* Test that writev() on local file output streams works on 0 vectors */
static void
test_writev_no_vectors (void)
{
test_writev_helper (NULL, 0, TRUE, NULL, 0);
}
/* Test that writev() on local file output streams works on empty vectors */
static void
test_writev_empty_vectors (void)
{
GOutputVector vectors[3];
vectors[0].buffer = NULL;
vectors[0].size = 0;
vectors[1].buffer = NULL;
vectors[1].size = 0;
vectors[2].buffer = NULL;
vectors[2].size = 0;
test_writev_helper (vectors, G_N_ELEMENTS (vectors), TRUE, NULL, 0);
}
/* Test that writev() fails if the sum of sizes in the vector is too big */
static void
test_writev_too_big_vectors (void)
{
GFile *file;
GFileIOStream *iostream = NULL;
GOutputStream *ostream;
GError *error = NULL;
gsize bytes_written = 0;
gboolean res;
guint8 *contents;
gsize length;
GOutputVector vectors[3];
vectors[0].buffer = (void*) 1;
vectors[0].size = G_MAXSIZE / 2;
vectors[1].buffer = (void*) 1;
vectors[1].size = G_MAXSIZE / 2;
vectors[2].buffer = (void*) 1;
vectors[2].size = G_MAXSIZE / 2;
file = g_file_new_tmp ("g_file_writev_XXXXXX",
&iostream, NULL);
g_assert_nonnull (file);
g_assert_nonnull (iostream);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (iostream));
res = g_output_stream_writev_all (ostream, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
g_assert_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT);
g_assert_cmpuint (bytes_written, ==, 0);
g_assert_false (res);
g_clear_error (&error);
res = g_io_stream_close (G_IO_STREAM (iostream), NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_object_unref (iostream);
res = g_file_load_contents (file, NULL, (gchar **) &contents, &length, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpmem (contents, length, NULL, 0);
g_free (contents);
g_file_delete (file, NULL, NULL);
g_object_unref (file);
}
typedef struct
{
gsize bytes_written;
GOutputVector *vectors;
gsize n_vectors;
GError *error;
gboolean done;
} WritevAsyncData;
static void
test_writev_async_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GOutputStream *ostream = G_OUTPUT_STREAM (object);
WritevAsyncData *data = user_data;
GError *error = NULL;
gsize bytes_written;
gboolean res;
res = g_output_stream_writev_finish (ostream, result, &bytes_written, &error);
g_assert_true (res);
g_assert_no_error (error);
data->bytes_written += bytes_written;
/* skip vectors that have been written in full */
while (data->n_vectors > 0 && bytes_written >= data->vectors[0].size)
{
bytes_written -= data->vectors[0].size;
++data->vectors;
--data->n_vectors;
}
/* skip partially written vector data */
if (bytes_written > 0 && data->n_vectors > 0)
{
data->vectors[0].size -= bytes_written;
data->vectors[0].buffer = ((guint8 *) data->vectors[0].buffer) + bytes_written;
}
if (data->n_vectors > 0)
g_output_stream_writev_async (ostream, data->vectors, data->n_vectors, 0, NULL, test_writev_async_cb, &data);
}
/* Test that writev_async() on local file output streams works on a non-empty vector */
static void
test_writev_async (void)
{
WritevAsyncData data = { 0 };
GFile *file;
GFileIOStream *iostream = NULL;
GOutputVector vectors[3];
const guint8 buffer[] = {1, 2, 3, 4, 5,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
1, 2, 3};
GOutputStream *ostream;
GError *error = NULL;
gboolean res;
guint8 *contents;
gsize length;
vectors[0].buffer = buffer;
vectors[0].size = 5;
vectors[1].buffer = buffer + 5;
vectors[1].size = 12;
vectors[2].buffer = buffer + 5 + 12;
vectors[2].size = 3;
file = g_file_new_tmp ("g_file_writev_XXXXXX",
&iostream, NULL);
g_assert_nonnull (file);
g_assert_nonnull (iostream);
data.vectors = vectors;
data.n_vectors = G_N_ELEMENTS (vectors);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (iostream));
g_output_stream_writev_async (ostream, data.vectors, data.n_vectors, 0, NULL, test_writev_async_cb, &data);
while (data.n_vectors > 0)
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (data.bytes_written, ==, sizeof buffer);
res = g_io_stream_close (G_IO_STREAM (iostream), NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_object_unref (iostream);
res = g_file_load_contents (file, NULL, (gchar **) &contents, &length, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpmem (contents, length, buffer, sizeof buffer);
g_free (contents);
g_file_delete (file, NULL, NULL);
g_object_unref (file);
}
static void
test_writev_all_cb (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GOutputStream *ostream = G_OUTPUT_STREAM (object);
WritevAsyncData *data = user_data;
g_output_stream_writev_all_finish (ostream, result, &data->bytes_written, &data->error);
data->done = TRUE;
}
/* Test that writev_async_all() on local file output streams works on a non-empty vector */
static void
test_writev_async_all (void)
{
WritevAsyncData data = { 0 };
GFile *file;
GFileIOStream *iostream = NULL;
GOutputStream *ostream;
GOutputVector vectors[3];
const guint8 buffer[] = {1, 2, 3, 4, 5,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
1, 2, 3};
GError *error = NULL;
gboolean res;
guint8 *contents;
gsize length;
vectors[0].buffer = buffer;
vectors[0].size = 5;
vectors[1].buffer = buffer + 5;
vectors[1].size = 12;
vectors[2].buffer = buffer + 5 + 12;
vectors[2].size = 3;
file = g_file_new_tmp ("g_file_writev_XXXXXX",
&iostream, NULL);
g_assert_nonnull (file);
g_assert_nonnull (iostream);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (iostream));
g_output_stream_writev_all_async (ostream, vectors, G_N_ELEMENTS (vectors), 0, NULL, test_writev_all_cb, &data);
while (!data.done)
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (data.bytes_written, ==, sizeof buffer);
g_assert_no_error (data.error);
res = g_io_stream_close (G_IO_STREAM (iostream), NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_object_unref (iostream);
res = g_file_load_contents (file, NULL, (gchar **) &contents, &length, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpmem (contents, length, buffer, sizeof buffer);
g_free (contents);
g_file_delete (file, NULL, NULL);
g_object_unref (file);
}
/* Test that writev_async_all() on local file output streams handles cancellation correctly */
static void
test_writev_async_all_cancellation (void)
{
WritevAsyncData data = { 0 };
GFile *file;
GFileIOStream *iostream = NULL;
GOutputVector vectors[3];
const guint8 buffer[] = {1, 2, 3, 4, 5,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
1, 2, 3};
GOutputStream *ostream;
GError *error = NULL;
gboolean res;
guint8 *contents;
gsize length;
GCancellable *cancellable;
vectors[0].buffer = buffer;
vectors[0].size = 5;
vectors[1].buffer = buffer + 5;
vectors[1].size = 12;
vectors[2].buffer = buffer + 5 + 12;
vectors[2].size = 3;
file = g_file_new_tmp ("g_file_writev_XXXXXX",
&iostream, NULL);
g_assert_nonnull (file);
g_assert_nonnull (iostream);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (iostream));
cancellable = g_cancellable_new ();
g_output_stream_writev_all_async (ostream, vectors, G_N_ELEMENTS (vectors), 0, cancellable, test_writev_all_cb, &data);
g_cancellable_cancel (cancellable);
while (!data.done)
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (data.bytes_written, ==, 0);
g_assert_error (data.error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
g_clear_error (&data.error);
res = g_io_stream_close (G_IO_STREAM (iostream), NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_object_unref (iostream);
res = g_file_load_contents (file, NULL, (gchar **) &contents, &length, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpuint (length, ==, 0);
g_free (contents);
g_file_delete (file, NULL, NULL);
g_object_unref (file);
g_object_unref (cancellable);
}
/* Test that writev_async_all() with empty vectors is handled correctly */
static void
test_writev_async_all_empty_vectors (void)
{
WritevAsyncData data = { 0 };
GFile *file;
GFileIOStream *iostream = NULL;
GOutputVector vectors[3];
GOutputStream *ostream;
GError *error = NULL;
gboolean res;
guint8 *contents;
gsize length;
vectors[0].buffer = NULL;
vectors[0].size = 0;
vectors[1].buffer = NULL;
vectors[1].size = 0;
vectors[2].buffer = NULL;
vectors[2].size = 0;
file = g_file_new_tmp ("g_file_writev_XXXXXX",
&iostream, NULL);
g_assert_nonnull (file);
g_assert_nonnull (iostream);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (iostream));
g_output_stream_writev_all_async (ostream, vectors, G_N_ELEMENTS (vectors), 0, NULL, test_writev_all_cb, &data);
while (!data.done)
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (data.bytes_written, ==, 0);
g_assert_no_error (data.error);
g_clear_error (&data.error);
res = g_io_stream_close (G_IO_STREAM (iostream), NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_object_unref (iostream);
res = g_file_load_contents (file, NULL, (gchar **) &contents, &length, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpuint (length, ==, 0);
g_free (contents);
g_file_delete (file, NULL, NULL);
g_object_unref (file);
}
/* Test that writev_async_all() with no vectors is handled correctly */
static void
test_writev_async_all_no_vectors (void)
{
WritevAsyncData data = { 0 };
GFile *file;
GFileIOStream *iostream = NULL;
GOutputStream *ostream;
GError *error = NULL;
gboolean res;
guint8 *contents;
gsize length;
file = g_file_new_tmp ("g_file_writev_XXXXXX",
&iostream, NULL);
g_assert_nonnull (file);
g_assert_nonnull (iostream);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (iostream));
g_output_stream_writev_all_async (ostream, NULL, 0, 0, NULL, test_writev_all_cb, &data);
while (!data.done)
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (data.bytes_written, ==, 0);
g_assert_no_error (data.error);
g_clear_error (&data.error);
res = g_io_stream_close (G_IO_STREAM (iostream), NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_object_unref (iostream);
res = g_file_load_contents (file, NULL, (gchar **) &contents, &length, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpuint (length, ==, 0);
g_free (contents);
g_file_delete (file, NULL, NULL);
g_object_unref (file);
}
/* Test that writev_async_all() with too big vectors is handled correctly */
static void
test_writev_async_all_too_big_vectors (void)
{
WritevAsyncData data = { 0 };
GFile *file;
GFileIOStream *iostream = NULL;
GOutputVector vectors[3];
GOutputStream *ostream;
GError *error = NULL;
gboolean res;
guint8 *contents;
gsize length;
vectors[0].buffer = (void*) 1;
vectors[0].size = G_MAXSIZE / 2;
vectors[1].buffer = (void*) 1;
vectors[1].size = G_MAXSIZE / 2;
vectors[2].buffer = (void*) 1;
vectors[2].size = G_MAXSIZE / 2;
file = g_file_new_tmp ("g_file_writev_XXXXXX",
&iostream, NULL);
g_assert_nonnull (file);
g_assert_nonnull (iostream);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (iostream));
g_output_stream_writev_all_async (ostream, vectors, G_N_ELEMENTS (vectors), 0, NULL, test_writev_all_cb, &data);
while (!data.done)
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (data.bytes_written, ==, 0);
g_assert_error (data.error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT);
g_clear_error (&data.error);
res = g_io_stream_close (G_IO_STREAM (iostream), NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_object_unref (iostream);
res = g_file_load_contents (file, NULL, (gchar **) &contents, &length, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpuint (length, ==, 0);
g_free (contents);
g_file_delete (file, NULL, NULL);
g_object_unref (file);
}
int
main (int argc, char *argv[])
{
@ -1190,6 +1746,17 @@ main (int argc, char *argv[])
g_test_add_func ("/file/measure-async", test_measure_async);
g_test_add_func ("/file/load-bytes", test_load_bytes);
g_test_add_func ("/file/load-bytes-async", test_load_bytes_async);
g_test_add_func ("/file/writev", test_writev);
g_test_add_func ("/file/writev/no-bytes-written", test_writev_no_bytes_written);
g_test_add_func ("/file/writev/no-vectors", test_writev_no_vectors);
g_test_add_func ("/file/writev/empty-vectors", test_writev_empty_vectors);
g_test_add_func ("/file/writev/too-big-vectors", test_writev_too_big_vectors);
g_test_add_func ("/file/writev/async", test_writev_async);
g_test_add_func ("/file/writev/async_all", test_writev_async_all);
g_test_add_func ("/file/writev/async_all-empty-vectors", test_writev_async_all_empty_vectors);
g_test_add_func ("/file/writev/async_all-no-vectors", test_writev_async_all_no_vectors);
g_test_add_func ("/file/writev/async_all-to-big-vectors", test_writev_async_all_too_big_vectors);
g_test_add_func ("/file/writev/async_all-cancellation", test_writev_async_all_cancellation);
return g_test_run ();
}

View File

@ -300,6 +300,94 @@ test_write_bytes (void)
g_bytes_unref (bytes2);
}
/* Test that writev() works on #GMemoryOutputStream with a non-empty set of vectors. This
* covers the default writev() implementation around write(). */
static void
test_writev (void)
{
GOutputStream *mo;
GError *error = NULL;
gboolean res;
gsize bytes_written;
GOutputVector vectors[3];
const guint8 buffer[] = {1, 2, 3, 4, 5,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
1, 2, 3};
guint8 *output_buffer;
vectors[0].buffer = buffer;
vectors[0].size = 5;
vectors[1].buffer = buffer + vectors[0].size;
vectors[1].size = 12;
vectors[2].buffer = buffer + vectors[0].size + vectors[1].size;
vectors[2].size = 3;
mo = (GOutputStream*) g_object_new (G_TYPE_MEMORY_OUTPUT_STREAM,
"realloc-function", g_realloc,
"destroy-function", g_free,
NULL);
res = g_output_stream_writev_all (mo, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpuint (bytes_written, ==, sizeof buffer);
g_output_stream_close (mo, NULL, &error);
g_assert_no_error (error);
g_assert_cmpuint (g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (mo)), ==, sizeof buffer);
output_buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (mo));
g_assert_cmpmem (output_buffer, sizeof buffer, buffer, sizeof buffer);
g_object_unref (mo);
}
/* Test that writev_nonblocking() works on #GMemoryOutputStream with a non-empty set of vectors. This
* covers the default writev_nonblocking() implementation around write_nonblocking(). */
static void
test_writev_nonblocking (void)
{
GOutputStream *mo;
GError *error = NULL;
gboolean res;
gsize bytes_written;
GOutputVector vectors[3];
const guint8 buffer[] = {1, 2, 3, 4, 5,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
1, 2, 3};
guint8 *output_buffer;
vectors[0].buffer = buffer;
vectors[0].size = 5;
vectors[1].buffer = buffer + vectors[0].size;
vectors[1].size = 12;
vectors[2].buffer = buffer + vectors[0].size + vectors[1].size;
vectors[2].size = 3;
mo = (GOutputStream*) g_object_new (G_TYPE_MEMORY_OUTPUT_STREAM,
"realloc-function", g_realloc,
"destroy-function", g_free,
NULL);
res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (mo),
vectors, G_N_ELEMENTS (vectors),
&bytes_written, NULL, &error);
g_assert_no_error (error);
g_assert_cmpint (res, ==, G_POLLABLE_RETURN_OK);
g_assert_cmpuint (bytes_written, ==, sizeof buffer);
g_output_stream_close (mo, NULL, &error);
g_assert_no_error (error);
g_assert_cmpuint (g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (mo)), ==, sizeof buffer);
output_buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (mo));
g_assert_cmpmem (output_buffer, sizeof buffer, buffer, sizeof buffer);
g_object_unref (mo);
}
static void
test_steal_as_bytes (void)
{
@ -350,6 +438,8 @@ main (int argc,
g_test_add_func ("/memory-output-stream/get-data-size", test_data_size);
g_test_add_func ("/memory-output-stream/properties", test_properties);
g_test_add_func ("/memory-output-stream/write-bytes", test_write_bytes);
g_test_add_func ("/memory-output-stream/writev", test_writev);
g_test_add_func ("/memory-output-stream/writev_nonblocking", test_writev_nonblocking);
g_test_add_func ("/memory-output-stream/steal_as_bytes", test_steal_as_bytes);
return g_test_run();

View File

@ -232,6 +232,316 @@ test_threaded_712570 (void)
g_mutex_unlock (&mutex_712570);
}
static void
closed_read_write_async_cb (GSocketConnection *conn,
GAsyncResult *result,
gpointer user_data)
{
GError *error = NULL;
gboolean res;
res = g_io_stream_close_finish (G_IO_STREAM (conn), result, &error);
g_assert_no_error (error);
g_assert_true (res);
}
typedef struct {
GSocketConnection *conn;
guint8 *data;
} WriteAsyncData;
static void
written_read_write_async_cb (GOutputStream *ostream,
GAsyncResult *result,
gpointer user_data)
{
WriteAsyncData *data = user_data;
GError *error = NULL;
gboolean res;
gsize bytes_written;
GSocketConnection *conn;
conn = data->conn;
g_free (data->data);
g_free (data);
res = g_output_stream_write_all_finish (ostream, result, &bytes_written, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpuint (bytes_written, ==, 20);
g_io_stream_close_async (G_IO_STREAM (conn),
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) closed_read_write_async_cb,
NULL);
g_object_unref (conn);
}
static void
connected_read_write_async_cb (GObject *client,
GAsyncResult *result,
gpointer user_data)
{
GSocketConnection *conn;
GOutputStream *ostream;
GError *error = NULL;
WriteAsyncData *data;
gsize i;
GSocketConnection **sconn = user_data;
conn = g_socket_client_connect_finish (G_SOCKET_CLIENT (client), result, &error);
g_assert_no_error (error);
g_assert_nonnull (conn);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (conn));
data = g_new0 (WriteAsyncData, 1);
data->conn = conn;
data->data = g_new0 (guint8, 20);
for (i = 0; i < 20; i++)
data->data[i] = i;
g_output_stream_write_all_async (ostream,
data->data,
20,
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) written_read_write_async_cb,
data /* stolen */);
*sconn = g_object_ref (conn);
}
typedef struct {
GSocketConnection *conn;
GOutputVector *vectors;
guint n_vectors;
guint8 *data;
} WritevAsyncData;
static void
writtenv_read_write_async_cb (GOutputStream *ostream,
GAsyncResult *result,
gpointer user_data)
{
WritevAsyncData *data = user_data;
GError *error = NULL;
gboolean res;
gsize bytes_written;
GSocketConnection *conn;
conn = data->conn;
g_free (data->data);
g_free (data);
res = g_output_stream_writev_all_finish (ostream, result, &bytes_written, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpuint (bytes_written, ==, 20);
g_io_stream_close_async (G_IO_STREAM (conn),
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) closed_read_write_async_cb,
NULL);
g_object_unref (conn);
}
static void
connected_read_writev_async_cb (GObject *client,
GAsyncResult *result,
gpointer user_data)
{
GSocketConnection *conn;
GOutputStream *ostream;
GError *error = NULL;
WritevAsyncData *data;
gsize i;
GSocketConnection **sconn = user_data;
conn = g_socket_client_connect_finish (G_SOCKET_CLIENT (client), result, &error);
g_assert_no_error (error);
g_assert_nonnull (conn);
ostream = g_io_stream_get_output_stream (G_IO_STREAM (conn));
data = g_new0 (WritevAsyncData, 1);
data->conn = conn;
data->vectors = g_new0 (GOutputVector, 3);
data->n_vectors = 3;
data->data = g_new0 (guint8, 20);
for (i = 0; i < 20; i++)
data->data[i] = i;
data->vectors[0].buffer = data->data;
data->vectors[0].size = 5;
data->vectors[1].buffer = data->data + 5;
data->vectors[1].size = 10;
data->vectors[2].buffer = data->data + 15;
data->vectors[2].size = 5;
g_output_stream_writev_all_async (ostream,
data->vectors,
data->n_vectors,
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) writtenv_read_write_async_cb,
data /* stolen */);
*sconn = g_object_ref (conn);
}
typedef struct {
GSocketConnection *conn;
guint8 *data;
} ReadAsyncData;
static void
read_read_write_async_cb (GInputStream *istream,
GAsyncResult *result,
gpointer user_data)
{
ReadAsyncData *data = user_data;
GError *error = NULL;
gboolean res;
gsize bytes_read;
GSocketConnection *conn;
const guint8 expected_data[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 };
res = g_input_stream_read_all_finish (istream, result, &bytes_read, &error);
g_assert_no_error (error);
g_assert_true (res);
g_assert_cmpmem (expected_data, sizeof expected_data, data->data, bytes_read);
conn = data->conn;
g_object_set_data (G_OBJECT (conn), "test-data-read", GINT_TO_POINTER (TRUE));
g_free (data->data);
g_free (data);
g_io_stream_close_async (G_IO_STREAM (conn),
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) closed_read_write_async_cb,
NULL);
g_object_unref (conn);
}
static void
incoming_read_write_async_cb (GSocketService *service,
GSocketConnection *conn,
GObject *source_object,
gpointer user_data)
{
ReadAsyncData *data;
GSocketConnection **cconn = user_data;
GInputStream *istream;
istream = g_io_stream_get_input_stream (G_IO_STREAM (conn));
data = g_new0 (ReadAsyncData, 1);
data->conn = g_object_ref (conn);
data->data = g_new0 (guint8, 20);
g_input_stream_read_all_async (istream,
data->data,
20,
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) read_read_write_async_cb,
data /* stolen */);
*cconn = g_object_ref (conn);
}
static void
test_read_write_async_internal (gboolean writev)
{
GInetAddress *iaddr;
GSocketAddress *saddr, *listening_addr;
GSocketService *service;
GError *error = NULL;
GSocketClient *client;
GSocketConnection *sconn = NULL, *cconn = NULL;
iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4);
saddr = g_inet_socket_address_new (iaddr, 0);
g_object_unref (iaddr);
service = g_socket_service_new ();
g_socket_listener_add_address (G_SOCKET_LISTENER (service),
saddr,
G_SOCKET_TYPE_STREAM,
G_SOCKET_PROTOCOL_TCP,
NULL,
&listening_addr,
&error);
g_assert_no_error (error);
g_object_unref (saddr);
g_signal_connect (service, "incoming", G_CALLBACK (incoming_read_write_async_cb), &sconn);
client = g_socket_client_new ();
if (writev)
g_socket_client_connect_async (client,
G_SOCKET_CONNECTABLE (listening_addr),
NULL,
connected_read_writev_async_cb,
&cconn);
else
g_socket_client_connect_async (client,
G_SOCKET_CONNECTABLE (listening_addr),
NULL,
connected_read_write_async_cb,
&cconn);
g_object_unref (client);
g_object_unref (listening_addr);
g_socket_service_start (service);
g_assert_true (g_socket_service_is_active (service));
do
{
g_main_context_iteration (NULL, TRUE);
}
while (!sconn || !cconn ||
!g_io_stream_is_closed (G_IO_STREAM (sconn)) ||
!g_io_stream_is_closed (G_IO_STREAM (cconn)));
g_assert_true (GPOINTER_TO_INT (g_object_get_data (G_OBJECT (sconn), "test-data-read")));
g_object_unref (sconn);
g_object_unref (cconn);
g_object_unref (service);
}
/* Test if connecting to a socket service and asynchronously writing data on
* one side followed by reading the same data on the other side of the
* connection works correctly
*/
static void
test_read_write_async (void)
{
test_read_write_async_internal (FALSE);
}
/* Test if connecting to a socket service and asynchronously writing data on
* one side followed by reading the same data on the other side of the
* connection works correctly. This uses writev() instead of normal write().
*/
static void
test_read_writev_async (void)
{
test_read_write_async_internal (TRUE);
}
int
main (int argc,
char *argv[])
@ -242,6 +552,8 @@ main (int argc,
g_test_add_func ("/socket-service/start-stop", test_start_stop);
g_test_add_func ("/socket-service/threaded/712570", test_threaded_712570);
g_test_add_func ("/socket-service/read_write_async", test_read_write_async);
g_test_add_func ("/socket-service/read_writev_async", test_read_writev_async);
return g_test_run();
}

View File

@ -1709,6 +1709,154 @@ test_get_available (gconstpointer user_data)
g_object_unref (client);
}
typedef struct {
GInputStream *is;
GOutputStream *os;
const guint8 *write_data;
guint8 *read_data;
} TestReadWriteData;
static gpointer
test_read_write_write_thread (gpointer user_data)
{
TestReadWriteData *data = user_data;
gsize bytes_written;
GError *error = NULL;
gboolean res;
res = g_output_stream_write_all (data->os, data->write_data, 1024, &bytes_written, NULL, &error);
g_assert_true (res);
g_assert_no_error (error);
g_assert_cmpint (bytes_written, ==, 1024);
return NULL;
}
static gpointer
test_read_write_read_thread (gpointer user_data)
{
TestReadWriteData *data = user_data;
gsize bytes_read;
GError *error = NULL;
gboolean res;
res = g_input_stream_read_all (data->is, data->read_data, 1024, &bytes_read, NULL, &error);
g_assert_true (res);
g_assert_no_error (error);
g_assert_cmpint (bytes_read, ==, 1024);
return NULL;
}
static gpointer
test_read_write_writev_thread (gpointer user_data)
{
TestReadWriteData *data = user_data;
gsize bytes_written;
GError *error = NULL;
gboolean res;
GOutputVector vectors[3];
vectors[0].buffer = data->write_data;
vectors[0].size = 256;
vectors[1].buffer = data->write_data + 256;
vectors[1].size = 256;
vectors[2].buffer = data->write_data + 512;
vectors[2].size = 512;
res = g_output_stream_writev_all (data->os, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
g_assert_true (res);
g_assert_no_error (error);
g_assert_cmpint (bytes_written, ==, 1024);
return NULL;
}
/* test if normal read/write/writev via the GSocket*Streams works on TCP sockets */
static void
test_read_write (gconstpointer user_data)
{
gboolean writev = GPOINTER_TO_INT (user_data);
GError *err = NULL;
GSocket *listener, *server, *client;
GInetAddress *addr;
GSocketAddress *saddr;
TestReadWriteData data;
guint8 data_write[1024], data_read[1024];
GSocketConnection *server_stream, *client_stream;
GThread *write_thread, *read_thread;
guint i;
listener = g_socket_new (G_SOCKET_FAMILY_IPV4,
G_SOCKET_TYPE_STREAM,
G_SOCKET_PROTOCOL_DEFAULT,
&err);
g_assert_no_error (err);
g_assert (G_IS_SOCKET (listener));
client = g_socket_new (G_SOCKET_FAMILY_IPV4,
G_SOCKET_TYPE_STREAM,
G_SOCKET_PROTOCOL_DEFAULT,
&err);
g_assert_no_error (err);
g_assert (G_IS_SOCKET (client));
addr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
saddr = g_inet_socket_address_new (addr, 0);
g_socket_bind (listener, saddr, TRUE, &err);
g_assert_no_error (err);
g_object_unref (saddr);
g_object_unref (addr);
saddr = g_socket_get_local_address (listener, &err);
g_assert_no_error (err);
g_socket_listen (listener, &err);
g_assert_no_error (err);
g_socket_connect (client, saddr, NULL, &err);
g_assert_no_error (err);
server = g_socket_accept (listener, NULL, &err);
g_assert_no_error (err);
g_socket_set_blocking (server, FALSE);
g_object_unref (listener);
server_stream = g_socket_connection_factory_create_connection (server);
g_assert_nonnull (server_stream);
client_stream = g_socket_connection_factory_create_connection (client);
g_assert_nonnull (client_stream);
for (i = 0; i < sizeof (data_write); i++)
data_write[i] = i;
data.is = g_io_stream_get_input_stream (G_IO_STREAM (server_stream));
data.os = g_io_stream_get_output_stream (G_IO_STREAM (client_stream));
data.read_data = data_read;
data.write_data = data_write;
if (writev)
write_thread = g_thread_new ("writer", test_read_write_writev_thread, &data);
else
write_thread = g_thread_new ("writer", test_read_write_write_thread, &data);
read_thread = g_thread_new ("reader", test_read_write_read_thread, &data);
g_thread_join (write_thread);
g_thread_join (read_thread);
g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read);
g_socket_close (server, &err);
g_assert_no_error (err);
g_object_unref (server_stream);
g_object_unref (client_stream);
g_object_unref (saddr);
g_object_unref (server);
g_object_unref (client);
}
int
main (int argc,
char *argv[])
@ -1761,6 +1909,10 @@ main (int argc,
test_get_available);
g_test_add_data_func ("/socket/get_available/stream", GUINT_TO_POINTER (G_SOCKET_TYPE_STREAM),
test_get_available);
g_test_add_data_func ("/socket/read_write", GUINT_TO_POINTER (FALSE),
test_read_write);
g_test_add_data_func ("/socket/read_writev", GUINT_TO_POINTER (TRUE),
test_read_write);
return g_test_run();
}

View File

@ -27,6 +27,7 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#define DATA "abcdefghijklmnopqrstuvwxyz"
@ -351,6 +352,476 @@ test_basic (void)
g_object_unref (os);
}
typedef struct {
GInputStream *is;
GOutputStream *os;
const guint8 *write_data;
guint8 *read_data;
} TestReadWriteData;
static gpointer
test_read_write_write_thread (gpointer user_data)
{
TestReadWriteData *data = user_data;
gsize bytes_written;
GError *error = NULL;
gboolean res;
res = g_output_stream_write_all (data->os, data->write_data, 1024, &bytes_written, NULL, &error);
g_assert_true (res);
g_assert_no_error (error);
g_assert_cmpuint (bytes_written, ==, 1024);
return NULL;
}
static gpointer
test_read_write_read_thread (gpointer user_data)
{
TestReadWriteData *data = user_data;
gsize bytes_read;
GError *error = NULL;
gboolean res;
res = g_input_stream_read_all (data->is, data->read_data, 1024, &bytes_read, NULL, &error);
g_assert_true (res);
g_assert_no_error (error);
g_assert_cmpuint (bytes_read, ==, 1024);
return NULL;
}
static gpointer
test_read_write_writev_thread (gpointer user_data)
{
TestReadWriteData *data = user_data;
gsize bytes_written;
GError *error = NULL;
gboolean res;
GOutputVector vectors[3];
vectors[0].buffer = data->write_data;
vectors[0].size = 256;
vectors[1].buffer = data->write_data + 256;
vectors[1].size = 256;
vectors[2].buffer = data->write_data + 512;
vectors[2].size = 512;
res = g_output_stream_writev_all (data->os, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
g_assert_true (res);
g_assert_no_error (error);
g_assert_cmpuint (bytes_written, ==, 1024);
return NULL;
}
/* test if normal writing/reading from a pipe works */
static void
test_read_write (gconstpointer user_data)
{
gboolean writev = GPOINTER_TO_INT (user_data);
GUnixInputStream *is;
GUnixOutputStream *os;
gint fd[2];
guint8 data_write[1024], data_read[1024];
guint i;
GThread *write_thread, *read_thread;
TestReadWriteData data;
for (i = 0; i < sizeof (data_write); i++)
data_write[i] = i;
g_assert_cmpint (pipe (fd), ==, 0);
is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
data.is = G_INPUT_STREAM (is);
data.os = G_OUTPUT_STREAM (os);
data.read_data = data_read;
data.write_data = data_write;
if (writev)
write_thread = g_thread_new ("writer", test_read_write_writev_thread, &data);
else
write_thread = g_thread_new ("writer", test_read_write_write_thread, &data);
read_thread = g_thread_new ("reader", test_read_write_read_thread, &data);
g_thread_join (write_thread);
g_thread_join (read_thread);
g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read);
g_object_unref (os);
g_object_unref (is);
}
/* test if g_pollable_output_stream_write_nonblocking() and
* g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
* and correctly reset their status afterwards again, and all data that is
* written can also be read again.
*/
static void
test_write_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
g_test_skip ("F_GETPIPE_SZ not defined");
#else /* if F_GETPIPE_SZ */
GUnixInputStream *is;
GUnixOutputStream *os;
gint fd[2];
GError *err = NULL;
guint8 data_write[1024], data_read[1024];
guint i;
gint pipe_capacity;
for (i = 0; i < sizeof (data_write); i++)
data_write[i] = i;
g_assert_cmpint (pipe (fd), ==, 0);
g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
g_assert_cmpint (pipe_capacity, >=, 4096);
g_assert_cmpint (pipe_capacity % 1024, >=, 0);
is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
/* Run the whole thing three times to make sure that the streams
* reset the writability/readability state again */
for (i = 0; i < 3; i++) {
gssize written = 0, written_complete = 0;
gssize read = 0, read_complete = 0;
do
{
written_complete += written;
written = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
data_write,
sizeof (data_write),
NULL,
&err);
}
while (written > 0);
g_assert_cmpuint (written_complete, >, 0);
g_assert_nonnull (err);
g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
g_clear_error (&err);
do
{
read_complete += read;
read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
data_read,
sizeof (data_read),
NULL,
&err);
if (read > 0)
g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
}
while (read > 0);
g_assert_cmpuint (read_complete, ==, written_complete);
g_assert_nonnull (err);
g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
g_clear_error (&err);
}
g_object_unref (os);
g_object_unref (is);
#endif /* if F_GETPIPE_SZ */
}
/* test if g_pollable_output_stream_writev_nonblocking() and
* g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
* and correctly reset their status afterwards again, and all data that is
* written can also be read again.
*/
static void
test_writev_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
g_test_skip ("F_GETPIPE_SZ not defined");
#else /* if F_GETPIPE_SZ */
GUnixInputStream *is;
GUnixOutputStream *os;
gint fd[2];
GError *err = NULL;
guint8 data_write[1024], data_read[1024];
guint i;
GOutputVector vectors[4];
GPollableReturn res;
gint pipe_capacity;
for (i = 0; i < sizeof (data_write); i++)
data_write[i] = i;
g_assert_cmpint (pipe (fd), ==, 0);
g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
g_assert_cmpint (pipe_capacity, >=, 4096);
g_assert_cmpint (pipe_capacity % 1024, >=, 0);
is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
/* Run the whole thing three times to make sure that the streams
* reset the writability/readability state again */
for (i = 0; i < 3; i++) {
gsize written = 0, written_complete = 0;
gssize read = 0, read_complete = 0;
do
{
written_complete += written;
vectors[0].buffer = data_write;
vectors[0].size = 256;
vectors[1].buffer = data_write + 256;
vectors[1].size = 256;
vectors[2].buffer = data_write + 512;
vectors[2].size = 256;
vectors[3].buffer = data_write + 768;
vectors[3].size = 256;
res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
vectors,
G_N_ELEMENTS (vectors),
&written,
NULL,
&err);
}
while (res == G_POLLABLE_RETURN_OK);
g_assert_cmpuint (written_complete, >, 0);
g_assert_null (err);
g_assert_cmpint (res, ==, G_POLLABLE_RETURN_WOULD_BLOCK);
/* writev() on UNIX streams either succeeds fully or not at all */
g_assert_cmpuint (written, ==, 0);
do
{
read_complete += read;
read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
data_read,
sizeof (data_read),
NULL,
&err);
if (read > 0)
g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
}
while (read > 0);
g_assert_cmpuint (read_complete, ==, written_complete);
g_assert_nonnull (err);
g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
g_clear_error (&err);
}
g_object_unref (os);
g_object_unref (is);
#endif /* if F_GETPIPE_SZ */
}
#ifdef F_GETPIPE_SZ
static void
write_async_wouldblock_cb (GUnixOutputStream *os,
GAsyncResult *result,
gpointer user_data)
{
gsize *bytes_written = user_data;
GError *err = NULL;
g_output_stream_write_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
g_assert_no_error (err);
}
static void
read_async_wouldblock_cb (GUnixInputStream *is,
GAsyncResult *result,
gpointer user_data)
{
gsize *bytes_read = user_data;
GError *err = NULL;
g_input_stream_read_all_finish (G_INPUT_STREAM (is), result, bytes_read, &err);
g_assert_no_error (err);
}
#endif /* if F_GETPIPE_SZ */
/* test if the async implementation of write_all() and read_all() in G*Stream
* around the GPollable*Stream API is working correctly.
*/
static void
test_write_async_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
g_test_skip ("F_GETPIPE_SZ not defined");
#else /* if F_GETPIPE_SZ */
GUnixInputStream *is;
GUnixOutputStream *os;
gint fd[2];
guint8 *data, *data_read;
guint i;
gint pipe_capacity;
gsize bytes_written = 0, bytes_read = 0;
g_assert_cmpint (pipe (fd), ==, 0);
/* FIXME: These should not be needed but otherwise
* g_unix_output_stream_write() will block because
* a) the fd is writable
* b) writing 4x capacity will block because writes are atomic
* c) the fd is blocking
*
* See https://gitlab.gnome.org/GNOME/glib/issues/1654
*/
g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
g_assert_cmpint (pipe_capacity, >=, 4096);
data = g_new (guint8, 4 * pipe_capacity);
for (i = 0; i < 4 * pipe_capacity; i++)
data[i] = i;
data_read = g_new (guint8, 4 * pipe_capacity);
is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
g_output_stream_write_all_async (G_OUTPUT_STREAM (os),
data,
4 * pipe_capacity,
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) write_async_wouldblock_cb,
&bytes_written);
g_input_stream_read_all_async (G_INPUT_STREAM (is),
data_read,
4 * pipe_capacity,
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) read_async_wouldblock_cb,
&bytes_read);
while (bytes_written == 0 && bytes_read == 0)
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
g_free (data);
g_free (data_read);
g_object_unref (os);
g_object_unref (is);
#endif /* if F_GETPIPE_SZ */
}
#ifdef F_GETPIPE_SZ
static void
writev_async_wouldblock_cb (GUnixOutputStream *os,
GAsyncResult *result,
gpointer user_data)
{
gsize *bytes_written = user_data;
GError *err = NULL;
g_output_stream_writev_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
g_assert_no_error (err);
}
#endif /* if F_GETPIPE_SZ */
/* test if the async implementation of writev_all() and read_all() in G*Stream
* around the GPollable*Stream API is working correctly.
*/
static void
test_writev_async_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
g_test_skip ("F_GETPIPE_SZ not defined");
#else /* if F_GETPIPE_SZ */
GUnixInputStream *is;
GUnixOutputStream *os;
gint fd[2];
guint8 *data, *data_read;
guint i;
gint pipe_capacity;
gsize bytes_written = 0, bytes_read = 0;
GOutputVector vectors[4];
g_assert_cmpint (pipe (fd), ==, 0);
/* FIXME: These should not be needed but otherwise
* g_unix_output_stream_writev() will block because
* a) the fd is writable
* b) writing 4x capacity will block because writes are atomic
* c) the fd is blocking
*
* See https://gitlab.gnome.org/GNOME/glib/issues/1654
*/
g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
g_assert_cmpint (pipe_capacity, >=, 4096);
data = g_new (guint8, 4 * pipe_capacity);
for (i = 0; i < 4 * pipe_capacity; i++)
data[i] = i;
data_read = g_new (guint8, 4 * pipe_capacity);
vectors[0].buffer = data;
vectors[0].size = 1024;
vectors[1].buffer = data + 1024;
vectors[1].size = 1024;
vectors[2].buffer = data + 2048;
vectors[2].size = 1024;
vectors[3].buffer = data + 3072;
vectors[3].size = 4 * pipe_capacity - 3072;
is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
g_output_stream_writev_all_async (G_OUTPUT_STREAM (os),
vectors,
G_N_ELEMENTS (vectors),
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) writev_async_wouldblock_cb,
&bytes_written);
g_input_stream_read_all_async (G_INPUT_STREAM (is),
data_read,
4 * pipe_capacity,
G_PRIORITY_DEFAULT,
NULL,
(GAsyncReadyCallback) read_async_wouldblock_cb,
&bytes_read);
while (bytes_written == 0 && bytes_read == 0)
g_main_context_iteration (NULL, TRUE);
g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
g_free (data);
g_free (data_read);
g_object_unref (os);
g_object_unref (is);
#endif /* F_GETPIPE_SZ */
}
int
main (int argc,
char *argv[])
@ -365,5 +836,23 @@ main (int argc,
GINT_TO_POINTER (TRUE),
test_pipe_io);
g_test_add_data_func ("/unix-streams/read_write",
GINT_TO_POINTER (FALSE),
test_read_write);
g_test_add_data_func ("/unix-streams/read_writev",
GINT_TO_POINTER (TRUE),
test_read_write);
g_test_add_func ("/unix-streams/write-wouldblock",
test_write_wouldblock);
g_test_add_func ("/unix-streams/writev-wouldblock",
test_writev_wouldblock);
g_test_add_func ("/unix-streams/write-async-wouldblock",
test_write_async_wouldblock);
g_test_add_func ("/unix-streams/writev-async-wouldblock",
test_writev_async_wouldblock);
return g_test_run();
}