diff --git a/configure.ac b/configure.ac index ced92b22f..aad3fa9ce 100644 --- a/configure.ac +++ b/configure.ac @@ -1030,7 +1030,7 @@ if $glib_failed ; then AC_MSG_ERROR([Could not determine values for MSG_* constants]) fi -AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname) +AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname sendmmsg) AS_IF([test $glib_native_win32 = yes], [ # in the Windows SDK and in mingw-w64 has wrappers for diff --git a/gio/giotypes.h b/gio/giotypes.h index de62cefa2..7f931456d 100644 --- a/gio/giotypes.h +++ b/gio/giotypes.h @@ -425,6 +425,41 @@ struct _GOutputVector { gsize size; }; +/** + * GOutputMessage: + * @address: (allow-none): a #GSocketAddress, or %NULL + * @vectors: pointer to an array of output vectors + * @num_vectors: the number of output vectors pointed to by @vectors. + * @bytes_sent: initialize to 0. Will be set to the number of bytes + * that have been sent + * @control_messages: (array length=num_control_messages) (allow-none): a pointer + * to an array of #GSocketControlMessages, or %NULL. + * @num_control_messages: number of elements in @control_messages. + * + * Structure used for scatter/gather data output when sending multiple + * messages or packets in one go. You generally pass in an array of + * #GOutputVectors and the operation will use all the buffers as if they + * were one buffer. + * + * If @address is %NULL then the message is sent to the default receiver + * (as previously set by g_socket_connect()). + * + * Since: 2.44 + */ +typedef struct _GOutputMessage GOutputMessage; + +struct _GOutputMessage { + GSocketAddress *address; + + GOutputVector *vectors; + guint num_vectors; + + guint bytes_sent; + + GSocketControlMessage **control_messages; + guint num_control_messages; +}; + typedef struct _GCredentials GCredentials; typedef struct _GUnixCredentialsMessage GUnixCredentialsMessage; typedef struct _GUnixFDList GUnixFDList; diff --git a/gio/gsocket.c b/gio/gsocket.c index 4d863cecd..d9e135d67 100644 --- a/gio/gsocket.c +++ b/gio/gsocket.c @@ -3987,6 +3987,281 @@ g_socket_send_message (GSocket *socket, #endif } +/** + * g_socket_send_messages: + * @socket: a #GSocket + * @messages: (array length=num_messages): an array of #GOutputMessage structs + * @num_messages: the number of elements in @messages + * @flags: an int containing #GSocketMsgFlags flags + * @cancellable: (allow-none): a %GCancellable or %NULL + * @error: #GError for error reporting, or %NULL to ignore. + * + * Send multiple data messages from @socket in one go. This is the most + * complicated and fully-featured version of this call. For easier use, see + * g_socket_send(), g_socket_send_to(), and g_socket_send_message(). + * + * @messages must point to an array of #GOutputMessage structs and + * @num_messages must be the length of this array. Each #GOutputMessage + * contains an address to send the data to, and a pointer to an array of + * #GOutputVector structs to describe the buffers that the data to be sent + * for each message will be gathered from. Using multiple #GOutputVectors is + * more memory-efficient than manually copying data from multiple sources + * into a single buffer, and more network-efficient than making multiple + * calls to g_socket_send(). Sending multiple messages in one go avoids the + * overhead of making a lot of syscalls in scenarios where a lot of data + * packets need to be sent (e.g. high-bandwidth video streaming over RTP/UDP), + * or where the same data needs to be sent to multiple recipients. + * + * @flags modify how the message is sent. The commonly available arguments + * for this are available in the #GSocketMsgFlags enum, but the + * values there are the same as the system values, and the flags + * are passed in as-is, so you can pass in system-specific flags too. + * + * If the socket is in blocking mode the call will block until there is + * space for all the data in the socket queue. If there is no space available + * and the socket is in non-blocking mode a %G_IO_ERROR_WOULD_BLOCK error + * will be returned if no data was written at all, otherwise the number of + * messages sent will be returned. To be notified when space is available, + * wait for the %G_IO_OUT condition. Note though that you may still receive + * %G_IO_ERROR_WOULD_BLOCK from g_socket_send() even if you were previously + * notified of a %G_IO_OUT condition. (On Windows in particular, this is + * very common due to the way the underlying APIs work.) + * + * On error -1 is returned and @error is set accordingly. + * + * Returns: number of messages sent, or -1 on error. Note that the number of + * messages sent may be smaller than @num_messages if the socket is + * non-blocking or if @num_messages was larger than UIO_MAXIOV (1024), + * in which case the caller may re-try to send the remaining messages. + * + * Since: 2.44 + */ +gint +g_socket_send_messages (GSocket *socket, + GOutputMessage *messages, + guint num_messages, + gint flags, + GCancellable *cancellable, + GError **error) +{ + g_return_val_if_fail (G_IS_SOCKET (socket), -1); + g_return_val_if_fail (num_messages == 0 || messages != NULL, -1); + g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1); + g_return_val_if_fail (error == NULL || *error == NULL, -1); + + if (!check_socket (socket, error)) + return -1; + + if (!check_timeout (socket, error)) + return -1; + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return -1; + + if (num_messages == 0) + return 0; + +#if !defined (G_OS_WIN32) && defined (HAVE_SENDMMSG) + { + struct mmsghdr *msgvec; + gint i, num_sent, result, max_sent; + +#ifdef UIO_MAXIOV +#define MAX_NUM_MESSAGES UIO_MAXIOV +#else +#define MAX_NUM_MESSAGES 1024 +#endif + + if (num_messages > MAX_NUM_MESSAGES) + num_messages = MAX_NUM_MESSAGES; + + msgvec = g_newa (struct mmsghdr, num_messages); + + for (i = 0; i < num_messages; ++i) + { + GOutputMessage *msg = &messages[i]; + struct msghdr *msg_hdr = &msgvec[i].msg_hdr; + + msgvec[i].msg_len = 0; + + msg_hdr->msg_flags = 0; + + /* name */ + if (i > 0 && msg->address == messages[i-1].address) + { + msg_hdr->msg_name = msgvec[i-1].msg_hdr.msg_name; + msg_hdr->msg_namelen = msgvec[i-1].msg_hdr.msg_namelen; + } + else if (msg->address) + { + msg_hdr->msg_namelen = g_socket_address_get_native_size (msg->address); + msg_hdr->msg_name = g_alloca (msg_hdr->msg_namelen); + if (!g_socket_address_to_native (msg->address, msg_hdr->msg_name, msg_hdr->msg_namelen, error)) + return -1; + } + else + { + msg_hdr->msg_name = NULL; + msg_hdr->msg_namelen = 0; + } + + /* iov */ + { + /* this entire expression will be evaluated at compile time */ + if (sizeof (struct iovec) == sizeof (GOutputVector) && + sizeof msg_hdr->msg_iov->iov_base == sizeof msg->vectors->buffer && + G_STRUCT_OFFSET (struct iovec, iov_base) == + G_STRUCT_OFFSET (GOutputVector, buffer) && + sizeof msg_hdr->msg_iov->iov_len == sizeof msg->vectors->size && + G_STRUCT_OFFSET (struct iovec, iov_len) == + G_STRUCT_OFFSET (GOutputVector, size)) + /* ABI is compatible */ + { + msg_hdr->msg_iov = (struct iovec *) msg->vectors; + msg_hdr->msg_iovlen = msg->num_vectors; + } + else + /* ABI is incompatible */ + { + gint j; + + msg_hdr->msg_iov = g_newa (struct iovec, msg->num_vectors); + for (j = 0; j < msg->num_vectors; j++) + { + msg_hdr->msg_iov[j].iov_base = (void *) msg->vectors[j].buffer; + msg_hdr->msg_iov[j].iov_len = msg->vectors[j].size; + } + msg_hdr->msg_iovlen = msg->num_vectors; + } + } + + /* control */ + { + struct cmsghdr *cmsg; + gint j; + + msg_hdr->msg_controllen = 0; + for (j = 0; j < msg->num_control_messages; j++) + msg_hdr->msg_controllen += CMSG_SPACE (g_socket_control_message_get_size (msg->control_messages[j])); + + if (msg_hdr->msg_controllen == 0) + msg_hdr->msg_control = NULL; + else + { + msg_hdr->msg_control = g_alloca (msg_hdr->msg_controllen); + memset (msg_hdr->msg_control, '\0', msg_hdr->msg_controllen); + } + + cmsg = CMSG_FIRSTHDR (msg_hdr); + for (j = 0; j < msg->num_control_messages; j++) + { + GSocketControlMessage *cm = msg->control_messages[j]; + + cmsg->cmsg_level = g_socket_control_message_get_level (cm); + cmsg->cmsg_type = g_socket_control_message_get_msg_type (cm); + cmsg->cmsg_len = CMSG_LEN (g_socket_control_message_get_size (cm)); + g_socket_control_message_serialize (cm, CMSG_DATA (cmsg)); + cmsg = CMSG_NXTHDR (msg_hdr, cmsg); + } + g_assert (cmsg == NULL); + } + } + + num_sent = result = 0; + max_sent = num_messages; + while (num_sent < num_messages) + { + gint ret; + + if (socket->priv->blocking && + !g_socket_condition_wait (socket, + G_IO_OUT, cancellable, error)) + return -1; + + ret = sendmmsg (socket->priv->fd, msgvec + num_sent, num_messages - num_sent, + flags | G_SOCKET_DEFAULT_SEND_FLAGS); + + if (ret < 0) + { + int errsv = get_socket_errno (); + + if (errsv == EINTR) + continue; + + if (socket->priv->blocking && + (errsv == EWOULDBLOCK || + errsv == EAGAIN)) + continue; + + if (num_sent > 0 && + (errsv == EWOULDBLOCK || + errsv == EAGAIN)) + { + max_sent = num_sent; + break; + } + + g_set_error (error, G_IO_ERROR, + socket_io_error_from_errno (errsv), + _("Error sending message: %s"), socket_strerror (errsv)); + + /* we have to iterate over all messages below now, because we don't + * know where between num_sent and num_messages the error occured */ + max_sent = num_messages; + + result = -1; + break; + } + + num_sent += ret; + result = num_sent; + } + + for (i = 0; i < max_sent; ++i) + messages[i].bytes_sent = msgvec[i].msg_len; + + return result; + } +#else + { + gssize result; + gint i; + + for (i = 0; i < num_messages; ++i) + { + GOutputMessage *msg = &messages[i]; + GError *msg_error = NULL; + + result = g_socket_send_message (socket, msg->address, + msg->vectors, msg->num_vectors, + msg->control_messages, + msg->num_control_messages, + flags, cancellable, &msg_error); + + if (result < 0) + { + /* if we couldn't send all messages, just return how many we did + * manage to send, provided we managed to send at least one */ + if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0) + { + g_error_free (msg_error); + return i; + } + else + { + g_propagate_error (error, msg_error); + return -1; + } + } + + msg->bytes_sent = result; + } + + return i; + } +#endif +} + static GSocketAddress * cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len) { diff --git a/gio/gsocket.h b/gio/gsocket.h index 8b49cc127..dd866e739 100644 --- a/gio/gsocket.h +++ b/gio/gsocket.h @@ -236,6 +236,15 @@ gssize g_socket_send_message (GSocket gint flags, GCancellable *cancellable, GError **error); + +GLIB_AVAILABLE_IN_2_44 +gint g_socket_send_messages (GSocket *socket, + GOutputMessage *messages, + guint num_messages, + gint flags, + GCancellable *cancellable, + GError **error); + GLIB_AVAILABLE_IN_ALL gboolean g_socket_close (GSocket *socket, GError **error);