gsocket: add g_socket_send_messages()

Allows sending of multiple messages (packets, datagrams)
in one go using sendmmsg(), thus drastically reducing the
number of syscalls when sending out a lot of data, or when
sending out the same data to multiple recipients.

https://bugzilla.gnome.org/show_bug.cgi?id=719646
This commit is contained in:
Tim-Philipp Müller 2014-06-12 18:16:45 +01:00
parent 3c3fc0e463
commit fff5c7cd63
4 changed files with 320 additions and 1 deletions

View File

@ -1030,7 +1030,7 @@ if $glib_failed ; then
AC_MSG_ERROR([Could not determine values for MSG_* constants]) AC_MSG_ERROR([Could not determine values for MSG_* constants])
fi fi
AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname) AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname sendmmsg)
AS_IF([test $glib_native_win32 = yes], [ AS_IF([test $glib_native_win32 = yes], [
# <wspiapi.h> in the Windows SDK and in mingw-w64 has wrappers for # <wspiapi.h> in the Windows SDK and in mingw-w64 has wrappers for

View File

@ -425,6 +425,41 @@ struct _GOutputVector {
gsize size; gsize size;
}; };
/**
* GOutputMessage:
* @address: (allow-none): a #GSocketAddress, or %NULL
* @vectors: pointer to an array of output vectors
* @num_vectors: the number of output vectors pointed to by @vectors.
* @bytes_sent: initialize to 0. Will be set to the number of bytes
* that have been sent
* @control_messages: (array length=num_control_messages) (allow-none): a pointer
* to an array of #GSocketControlMessages, or %NULL.
* @num_control_messages: number of elements in @control_messages.
*
* Structure used for scatter/gather data output when sending multiple
* messages or packets in one go. You generally pass in an array of
* #GOutputVectors and the operation will use all the buffers as if they
* were one buffer.
*
* If @address is %NULL then the message is sent to the default receiver
* (as previously set by g_socket_connect()).
*
* Since: 2.44
*/
typedef struct _GOutputMessage GOutputMessage;
struct _GOutputMessage {
GSocketAddress *address;
GOutputVector *vectors;
guint num_vectors;
guint bytes_sent;
GSocketControlMessage **control_messages;
guint num_control_messages;
};
typedef struct _GCredentials GCredentials; typedef struct _GCredentials GCredentials;
typedef struct _GUnixCredentialsMessage GUnixCredentialsMessage; typedef struct _GUnixCredentialsMessage GUnixCredentialsMessage;
typedef struct _GUnixFDList GUnixFDList; typedef struct _GUnixFDList GUnixFDList;

View File

@ -3987,6 +3987,281 @@ g_socket_send_message (GSocket *socket,
#endif #endif
} }
/**
* g_socket_send_messages:
* @socket: a #GSocket
* @messages: (array length=num_messages): an array of #GOutputMessage structs
* @num_messages: the number of elements in @messages
* @flags: an int containing #GSocketMsgFlags flags
* @cancellable: (allow-none): a %GCancellable or %NULL
* @error: #GError for error reporting, or %NULL to ignore.
*
* Send multiple data messages from @socket in one go. This is the most
* complicated and fully-featured version of this call. For easier use, see
* g_socket_send(), g_socket_send_to(), and g_socket_send_message().
*
* @messages must point to an array of #GOutputMessage structs and
* @num_messages must be the length of this array. Each #GOutputMessage
* contains an address to send the data to, and a pointer to an array of
* #GOutputVector structs to describe the buffers that the data to be sent
* for each message will be gathered from. Using multiple #GOutputVectors is
* more memory-efficient than manually copying data from multiple sources
* into a single buffer, and more network-efficient than making multiple
* calls to g_socket_send(). Sending multiple messages in one go avoids the
* overhead of making a lot of syscalls in scenarios where a lot of data
* packets need to be sent (e.g. high-bandwidth video streaming over RTP/UDP),
* or where the same data needs to be sent to multiple recipients.
*
* @flags modify how the message is sent. The commonly available arguments
* for this are available in the #GSocketMsgFlags enum, but the
* values there are the same as the system values, and the flags
* are passed in as-is, so you can pass in system-specific flags too.
*
* If the socket is in blocking mode the call will block until there is
* space for all the data in the socket queue. If there is no space available
* and the socket is in non-blocking mode a %G_IO_ERROR_WOULD_BLOCK error
* will be returned if no data was written at all, otherwise the number of
* messages sent will be returned. To be notified when space is available,
* wait for the %G_IO_OUT condition. Note though that you may still receive
* %G_IO_ERROR_WOULD_BLOCK from g_socket_send() even if you were previously
* notified of a %G_IO_OUT condition. (On Windows in particular, this is
* very common due to the way the underlying APIs work.)
*
* On error -1 is returned and @error is set accordingly.
*
* Returns: number of messages sent, or -1 on error. Note that the number of
* messages sent may be smaller than @num_messages if the socket is
* non-blocking or if @num_messages was larger than UIO_MAXIOV (1024),
* in which case the caller may re-try to send the remaining messages.
*
* Since: 2.44
*/
gint
g_socket_send_messages (GSocket *socket,
GOutputMessage *messages,
guint num_messages,
gint flags,
GCancellable *cancellable,
GError **error)
{
g_return_val_if_fail (G_IS_SOCKET (socket), -1);
g_return_val_if_fail (num_messages == 0 || messages != NULL, -1);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
g_return_val_if_fail (error == NULL || *error == NULL, -1);
if (!check_socket (socket, error))
return -1;
if (!check_timeout (socket, error))
return -1;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
if (num_messages == 0)
return 0;
#if !defined (G_OS_WIN32) && defined (HAVE_SENDMMSG)
{
struct mmsghdr *msgvec;
gint i, num_sent, result, max_sent;
#ifdef UIO_MAXIOV
#define MAX_NUM_MESSAGES UIO_MAXIOV
#else
#define MAX_NUM_MESSAGES 1024
#endif
if (num_messages > MAX_NUM_MESSAGES)
num_messages = MAX_NUM_MESSAGES;
msgvec = g_newa (struct mmsghdr, num_messages);
for (i = 0; i < num_messages; ++i)
{
GOutputMessage *msg = &messages[i];
struct msghdr *msg_hdr = &msgvec[i].msg_hdr;
msgvec[i].msg_len = 0;
msg_hdr->msg_flags = 0;
/* name */
if (i > 0 && msg->address == messages[i-1].address)
{
msg_hdr->msg_name = msgvec[i-1].msg_hdr.msg_name;
msg_hdr->msg_namelen = msgvec[i-1].msg_hdr.msg_namelen;
}
else if (msg->address)
{
msg_hdr->msg_namelen = g_socket_address_get_native_size (msg->address);
msg_hdr->msg_name = g_alloca (msg_hdr->msg_namelen);
if (!g_socket_address_to_native (msg->address, msg_hdr->msg_name, msg_hdr->msg_namelen, error))
return -1;
}
else
{
msg_hdr->msg_name = NULL;
msg_hdr->msg_namelen = 0;
}
/* iov */
{
/* this entire expression will be evaluated at compile time */
if (sizeof (struct iovec) == sizeof (GOutputVector) &&
sizeof msg_hdr->msg_iov->iov_base == sizeof msg->vectors->buffer &&
G_STRUCT_OFFSET (struct iovec, iov_base) ==
G_STRUCT_OFFSET (GOutputVector, buffer) &&
sizeof msg_hdr->msg_iov->iov_len == sizeof msg->vectors->size &&
G_STRUCT_OFFSET (struct iovec, iov_len) ==
G_STRUCT_OFFSET (GOutputVector, size))
/* ABI is compatible */
{
msg_hdr->msg_iov = (struct iovec *) msg->vectors;
msg_hdr->msg_iovlen = msg->num_vectors;
}
else
/* ABI is incompatible */
{
gint j;
msg_hdr->msg_iov = g_newa (struct iovec, msg->num_vectors);
for (j = 0; j < msg->num_vectors; j++)
{
msg_hdr->msg_iov[j].iov_base = (void *) msg->vectors[j].buffer;
msg_hdr->msg_iov[j].iov_len = msg->vectors[j].size;
}
msg_hdr->msg_iovlen = msg->num_vectors;
}
}
/* control */
{
struct cmsghdr *cmsg;
gint j;
msg_hdr->msg_controllen = 0;
for (j = 0; j < msg->num_control_messages; j++)
msg_hdr->msg_controllen += CMSG_SPACE (g_socket_control_message_get_size (msg->control_messages[j]));
if (msg_hdr->msg_controllen == 0)
msg_hdr->msg_control = NULL;
else
{
msg_hdr->msg_control = g_alloca (msg_hdr->msg_controllen);
memset (msg_hdr->msg_control, '\0', msg_hdr->msg_controllen);
}
cmsg = CMSG_FIRSTHDR (msg_hdr);
for (j = 0; j < msg->num_control_messages; j++)
{
GSocketControlMessage *cm = msg->control_messages[j];
cmsg->cmsg_level = g_socket_control_message_get_level (cm);
cmsg->cmsg_type = g_socket_control_message_get_msg_type (cm);
cmsg->cmsg_len = CMSG_LEN (g_socket_control_message_get_size (cm));
g_socket_control_message_serialize (cm, CMSG_DATA (cmsg));
cmsg = CMSG_NXTHDR (msg_hdr, cmsg);
}
g_assert (cmsg == NULL);
}
}
num_sent = result = 0;
max_sent = num_messages;
while (num_sent < num_messages)
{
gint ret;
if (socket->priv->blocking &&
!g_socket_condition_wait (socket,
G_IO_OUT, cancellable, error))
return -1;
ret = sendmmsg (socket->priv->fd, msgvec + num_sent, num_messages - num_sent,
flags | G_SOCKET_DEFAULT_SEND_FLAGS);
if (ret < 0)
{
int errsv = get_socket_errno ();
if (errsv == EINTR)
continue;
if (socket->priv->blocking &&
(errsv == EWOULDBLOCK ||
errsv == EAGAIN))
continue;
if (num_sent > 0 &&
(errsv == EWOULDBLOCK ||
errsv == EAGAIN))
{
max_sent = num_sent;
break;
}
g_set_error (error, G_IO_ERROR,
socket_io_error_from_errno (errsv),
_("Error sending message: %s"), socket_strerror (errsv));
/* we have to iterate over all messages below now, because we don't
* know where between num_sent and num_messages the error occured */
max_sent = num_messages;
result = -1;
break;
}
num_sent += ret;
result = num_sent;
}
for (i = 0; i < max_sent; ++i)
messages[i].bytes_sent = msgvec[i].msg_len;
return result;
}
#else
{
gssize result;
gint i;
for (i = 0; i < num_messages; ++i)
{
GOutputMessage *msg = &messages[i];
GError *msg_error = NULL;
result = g_socket_send_message (socket, msg->address,
msg->vectors, msg->num_vectors,
msg->control_messages,
msg->num_control_messages,
flags, cancellable, &msg_error);
if (result < 0)
{
/* if we couldn't send all messages, just return how many we did
* manage to send, provided we managed to send at least one */
if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0)
{
g_error_free (msg_error);
return i;
}
else
{
g_propagate_error (error, msg_error);
return -1;
}
}
msg->bytes_sent = result;
}
return i;
}
#endif
}
static GSocketAddress * static GSocketAddress *
cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len) cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len)
{ {

View File

@ -236,6 +236,15 @@ gssize g_socket_send_message (GSocket
gint flags, gint flags,
GCancellable *cancellable, GCancellable *cancellable,
GError **error); GError **error);
GLIB_AVAILABLE_IN_2_44
gint g_socket_send_messages (GSocket *socket,
GOutputMessage *messages,
guint num_messages,
gint flags,
GCancellable *cancellable,
GError **error);
GLIB_AVAILABLE_IN_ALL GLIB_AVAILABLE_IN_ALL
gboolean g_socket_close (GSocket *socket, gboolean g_socket_close (GSocket *socket,
GError **error); GError **error);