mirror of
https://gitlab.gnome.org/GNOME/glib.git
synced 2025-07-30 05:43:28 +02:00
gsocket: Add g_socket_receive_messages()
Add support for receiving multiple messages with a single system call, using recvmmsg() if available. Otherwise, fall back to looping over g_socket_receive_message(). This adds new API, g_socket_receive_messages(), and corresponding unit tests. https://bugzilla.gnome.org/show_bug.cgi?id=751924
This commit is contained in:
294
gio/gsocket.c
294
gio/gsocket.c
@@ -3,6 +3,7 @@
|
||||
* Copyright (C) 2008 Christian Kellner, Samuel Cormier-Iijima
|
||||
* Copyright © 2009 Codethink Limited
|
||||
* Copyright © 2009 Red Hat, Inc
|
||||
* Copyright © 2015 Collabora, Ltd.
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
@@ -21,6 +22,7 @@
|
||||
* Samuel Cormier-Iijima <sciyoshi@gmail.com>
|
||||
* Ryan Lortie <desrt@desrt.ca>
|
||||
* Alexander Larsson <alexl@redhat.com>
|
||||
* Philip Withnall <philip.withnall@collabora.co.uk>
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
@@ -150,6 +152,14 @@ g_socket_receive_message_with_timeout (GSocket *socket,
|
||||
GCancellable *cancellable,
|
||||
GError **error);
|
||||
static gint
|
||||
g_socket_receive_messages_with_timeout (GSocket *socket,
|
||||
GInputMessage *messages,
|
||||
guint num_messages,
|
||||
gint flags,
|
||||
gint64 timeout,
|
||||
GCancellable *cancellable,
|
||||
GError **error);
|
||||
static gint
|
||||
g_socket_send_messages_with_timeout (GSocket *socket,
|
||||
GOutputMessage *messages,
|
||||
guint num_messages,
|
||||
@@ -4743,6 +4753,286 @@ g_socket_receive_message_with_timeout (GSocket *socket,
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* g_socket_receive_messages:
|
||||
* @socket: a #GSocket
|
||||
* @messages: (array length=num_messages): an array of #GInputMessage structs
|
||||
* @num_messages: the number of elements in @messages
|
||||
* @flags: an int containing #GSocketMsgFlags flags for the overall operation
|
||||
* @cancellable: (allow-none): a %GCancellable or %NULL
|
||||
* @error: #GError for error reporting, or %NULL to ignore
|
||||
*
|
||||
* Receive multiple data messages from @socket in one go. This is the most
|
||||
* complicated and fully-featured version of this call. For easier use, see
|
||||
* g_socket_receive(), g_socket_receive_from(), and g_socket_receive_message().
|
||||
*
|
||||
* @messages must point to an array of #GInputMessage structs and
|
||||
* @num_messages must be the length of this array. Each #GInputMessage
|
||||
* contains a pointer to an array of #GInputVector structs describing the
|
||||
* buffers that the data received in each message will be written to. Using
|
||||
* multiple #GInputVectors is more memory-efficient than manually copying data
|
||||
* out of a single buffer to multiple sources, and more system-call-efficient
|
||||
* than making multiple calls to g_socket_receive(), such as in scenarios where
|
||||
* a lot of data packets need to be received (e.g. high-bandwidth video
|
||||
* streaming over RTP/UDP).
|
||||
*
|
||||
* @flags modify how all messages are received. The commonly available
|
||||
* arguments for this are available in the #GSocketMsgFlags enum, but the
|
||||
* values there are the same as the system values, and the flags
|
||||
* are passed in as-is, so you can pass in system-specific flags too. These
|
||||
* flags affect the overall receive operation. Flags affecting individual
|
||||
* messages are returned in #GInputMessage.flags.
|
||||
*
|
||||
* The other members of #GInputMessage are treated as described in its
|
||||
* documentation.
|
||||
*
|
||||
* If #GSocket:blocking is %TRUE the call will block until @num_messages have
|
||||
* been received, or the end of the stream is reached.
|
||||
*
|
||||
* If #GSocket:blocking is %FALSE the call will return up to @num_messages
|
||||
* without blocking, or %G_IO_ERROR_WOULD_BLOCK if no messages are queued in the
|
||||
* operating system to be received.
|
||||
*
|
||||
* In blocking mode, if #GSocket:timeout is positive and is reached before any
|
||||
* messages are received, %G_IO_ERROR_TIMED_OUT is returned, otherwise up to
|
||||
* @num_messages are returned. (Note: This is effectively the
|
||||
* behaviour of `MSG_WAITFORONE` with recvmmsg().)
|
||||
*
|
||||
* To be notified when messages are available, wait for the
|
||||
* %G_IO_IN condition. Note though that you may still receive
|
||||
* %G_IO_ERROR_WOULD_BLOCK from g_socket_receive_messages() even if you were
|
||||
* previously notified of a %G_IO_IN condition.
|
||||
*
|
||||
* If the remote peer closes the connection, any messages queued in the
|
||||
* operating system will be returned, and subsequent calls to
|
||||
* g_socket_receive_messages() will return 0 (with no error set).
|
||||
*
|
||||
* On error -1 is returned and @error is set accordingly. An error will only
|
||||
* be returned if zero messages could be received; otherwise the number of
|
||||
* messages successfully received before the error will be returned.
|
||||
*
|
||||
* Returns: number of messages received, or -1 on error. Note that the number
|
||||
* of messages received may be smaller than @num_messages if in non-blocking
|
||||
* mode, if the peer closed the connection, or if @num_messages
|
||||
* was larger than `UIO_MAXIOV` (1024), in which case the caller may re-try
|
||||
* to receive the remaining messages.
|
||||
*
|
||||
* Since: 2.48
|
||||
*/
|
||||
gint
|
||||
g_socket_receive_messages (GSocket *socket,
|
||||
GInputMessage *messages,
|
||||
guint num_messages,
|
||||
gint flags,
|
||||
GCancellable *cancellable,
|
||||
GError **error)
|
||||
{
|
||||
if (!check_socket (socket, error) ||
|
||||
!check_timeout (socket, error))
|
||||
return -1;
|
||||
|
||||
return g_socket_receive_messages_with_timeout (socket, messages, num_messages,
|
||||
flags,
|
||||
socket->priv->blocking ? -1 : 0,
|
||||
cancellable, error);
|
||||
}
|
||||
|
||||
static gint
|
||||
g_socket_receive_messages_with_timeout (GSocket *socket,
|
||||
GInputMessage *messages,
|
||||
guint num_messages,
|
||||
gint flags,
|
||||
gint64 timeout,
|
||||
GCancellable *cancellable,
|
||||
GError **error)
|
||||
{
|
||||
gint64 start_time;
|
||||
|
||||
g_return_val_if_fail (G_IS_SOCKET (socket), -1);
|
||||
g_return_val_if_fail (num_messages == 0 || messages != NULL, -1);
|
||||
g_return_val_if_fail (cancellable == NULL ||
|
||||
G_IS_CANCELLABLE (cancellable), -1);
|
||||
g_return_val_if_fail (error == NULL || *error == NULL, -1);
|
||||
|
||||
start_time = g_get_monotonic_time ();
|
||||
|
||||
if (!check_socket (socket, error))
|
||||
return -1;
|
||||
|
||||
if (!check_timeout (socket, error))
|
||||
return -1;
|
||||
|
||||
if (g_cancellable_set_error_if_cancelled (cancellable, error))
|
||||
return -1;
|
||||
|
||||
if (num_messages == 0)
|
||||
return 0;
|
||||
|
||||
#if !defined (G_OS_WIN32) && defined (HAVE_RECVMMSG)
|
||||
{
|
||||
struct mmsghdr *msgvec;
|
||||
guint i, num_received;
|
||||
|
||||
#ifdef UIO_MAXIOV
|
||||
#define MAX_NUM_MESSAGES UIO_MAXIOV
|
||||
#else
|
||||
#define MAX_NUM_MESSAGES 1024
|
||||
#endif
|
||||
|
||||
if (num_messages > MAX_NUM_MESSAGES)
|
||||
num_messages = MAX_NUM_MESSAGES;
|
||||
|
||||
msgvec = g_newa (struct mmsghdr, num_messages);
|
||||
|
||||
for (i = 0; i < num_messages; ++i)
|
||||
{
|
||||
GInputMessage *msg = &messages[i];
|
||||
struct msghdr *msg_hdr = &msgvec[i].msg_hdr;
|
||||
|
||||
input_message_to_msghdr (msg, msg_hdr);
|
||||
msgvec[i].msg_len = 0;
|
||||
}
|
||||
|
||||
/* We always set the close-on-exec flag so we don't leak file
|
||||
* descriptors into child processes. Note that gunixfdmessage.c
|
||||
* will later call fcntl (fd, FD_CLOEXEC), but that isn't atomic.
|
||||
*/
|
||||
#ifdef MSG_CMSG_CLOEXEC
|
||||
flags |= MSG_CMSG_CLOEXEC;
|
||||
#endif
|
||||
|
||||
for (num_received = 0; num_received < num_messages;)
|
||||
{
|
||||
gint ret;
|
||||
|
||||
/* We operate in non-blocking mode and handle the timeout ourselves. */
|
||||
ret = recvmmsg (socket->priv->fd,
|
||||
msgvec + num_received,
|
||||
num_messages - num_received,
|
||||
flags | G_SOCKET_DEFAULT_SEND_FLAGS, NULL);
|
||||
#ifdef MSG_CMSG_CLOEXEC
|
||||
if (ret < 0 && get_socket_errno () == EINVAL)
|
||||
{
|
||||
/* We must be running on an old kernel. Call without the flag. */
|
||||
flags &= ~(MSG_CMSG_CLOEXEC);
|
||||
ret = recvmmsg (socket->priv->fd,
|
||||
msgvec + num_received,
|
||||
num_messages - num_received,
|
||||
flags | G_SOCKET_DEFAULT_SEND_FLAGS, NULL);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (ret < 0)
|
||||
{
|
||||
int errsv = get_socket_errno ();
|
||||
|
||||
if (errsv == EINTR)
|
||||
continue;
|
||||
|
||||
if (timeout != 0 &&
|
||||
(errsv == EWOULDBLOCK ||
|
||||
errsv == EAGAIN))
|
||||
{
|
||||
if (!block_on_timeout (socket, G_IO_IN, timeout, start_time,
|
||||
cancellable, error))
|
||||
{
|
||||
if (num_received > 0)
|
||||
{
|
||||
g_clear_error (error);
|
||||
break;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
/* If any messages were successfully received, do not error. */
|
||||
if (num_received > 0)
|
||||
break;
|
||||
|
||||
socket_set_error_lazy (error, errsv,
|
||||
_("Error receiving message: %s"));
|
||||
|
||||
return -1;
|
||||
}
|
||||
else if (ret == 0)
|
||||
{
|
||||
/* EOS. */
|
||||
break;
|
||||
}
|
||||
|
||||
num_received += ret;
|
||||
}
|
||||
|
||||
for (i = 0; i < num_received; ++i)
|
||||
{
|
||||
input_message_from_msghdr (&msgvec[i].msg_hdr, &messages[i], socket);
|
||||
messages[i].bytes_received = msgvec[i].msg_len;
|
||||
}
|
||||
|
||||
return num_received;
|
||||
}
|
||||
#else
|
||||
{
|
||||
guint i;
|
||||
gint64 wait_timeout;
|
||||
|
||||
wait_timeout = timeout;
|
||||
|
||||
for (i = 0; i < num_messages; i++)
|
||||
{
|
||||
GInputMessage *msg = &messages[i];
|
||||
gssize len;
|
||||
GError *msg_error = NULL;
|
||||
|
||||
msg->flags = flags; /* in-out parameter */
|
||||
|
||||
len = g_socket_receive_message_with_timeout (socket,
|
||||
msg->address,
|
||||
msg->vectors,
|
||||
msg->num_vectors,
|
||||
msg->control_messages,
|
||||
(gint *) msg->num_control_messages,
|
||||
&msg->flags,
|
||||
wait_timeout,
|
||||
cancellable,
|
||||
&msg_error);
|
||||
|
||||
/* check if we've timed out or how much time to wait at most */
|
||||
if (timeout > 0)
|
||||
{
|
||||
gint64 elapsed = g_get_monotonic_time () - start_time;
|
||||
wait_timeout = MAX (timeout - elapsed, 1);
|
||||
}
|
||||
|
||||
if (len >= 0)
|
||||
msg->bytes_received = len;
|
||||
|
||||
if (i != 0 &&
|
||||
(g_error_matches (msg_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
|
||||
g_error_matches (msg_error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)))
|
||||
{
|
||||
g_clear_error (&msg_error);
|
||||
break;
|
||||
}
|
||||
|
||||
if (msg_error != NULL)
|
||||
{
|
||||
g_propagate_error (error, msg_error);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (len == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
return i;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* g_socket_receive_message:
|
||||
* @socket: a #GSocket
|
||||
@@ -4758,8 +5048,8 @@ g_socket_receive_message_with_timeout (GSocket *socket,
|
||||
* @cancellable: (allow-none): a %GCancellable or %NULL
|
||||
* @error: a #GError pointer, or %NULL
|
||||
*
|
||||
* Receive data from a socket. This is the most complicated and
|
||||
* fully-featured version of this call. For easier use, see
|
||||
* Receive data from a socket. For receiving multiple messages, see
|
||||
* g_socket_receive_messages(); for easier use, see
|
||||
* g_socket_receive() and g_socket_receive_from().
|
||||
*
|
||||
* If @address is non-%NULL then @address will be set equal to the
|
||||
|
Reference in New Issue
Block a user