gsocket: Factor out blocking parameter from g_socket_receive_message()

This will make future API additions easier. The factored version is
internal for the time being.

https://bugzilla.gnome.org/show_bug.cgi?id=751924
This commit is contained in:
Philip Withnall 2015-07-27 14:21:00 +01:00
parent 5d68947466
commit 7f985b35ce

View File

@ -138,6 +138,17 @@ static gboolean g_socket_initable_init (GInitable *initable,
static GSocketAddress *
cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len);
static gssize
g_socket_receive_message_with_blocking (GSocket *socket,
GSocketAddress **address,
GInputVector *vectors,
gint num_vectors,
GSocketControlMessage ***messages,
gint *num_messages,
gint *flags,
gboolean blocking,
GCancellable *cancellable,
GError **error);
static gint
g_socket_send_messages_with_blocking (GSocket *socket,
GOutputMessage *messages,
@ -4440,6 +4451,203 @@ cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len)
return saddr;
}
static gssize
g_socket_receive_message_with_blocking (GSocket *socket,
GSocketAddress **address,
GInputVector *vectors,
gint num_vectors,
GSocketControlMessage ***messages,
gint *num_messages,
gint *flags,
gboolean blocking,
GCancellable *cancellable,
GError **error)
{
GInputVector one_vector;
char one_byte;
g_return_val_if_fail (G_IS_SOCKET (socket), -1);
if (!check_socket (socket, error))
return -1;
if (!check_timeout (socket, error))
return -1;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
if (num_vectors == -1)
{
for (num_vectors = 0;
vectors[num_vectors].buffer != NULL;
num_vectors++)
;
}
if (num_vectors == 0)
{
one_vector.buffer = &one_byte;
one_vector.size = 1;
num_vectors = 1;
vectors = &one_vector;
}
#ifndef G_OS_WIN32
{
GInputMessage input_message;
struct msghdr msg;
gssize result;
input_message.address = address;
input_message.vectors = vectors;
input_message.num_vectors = num_vectors;
input_message.bytes_received = 0;
input_message.flags = (flags != NULL) ? *flags : 0;
input_message.control_messages = messages;
input_message.num_control_messages = (guint *) num_messages;
/* We always set the close-on-exec flag so we don't leak file
* descriptors into child processes. Note that gunixfdmessage.c
* will later call fcntl (fd, FD_CLOEXEC), but that isn't atomic.
*/
#ifdef MSG_CMSG_CLOEXEC
input_message.flags |= MSG_CMSG_CLOEXEC;
#endif
input_message_to_msghdr (&input_message, &msg);
/* do it */
while (1)
{
result = recvmsg (socket->priv->fd, &msg, msg.msg_flags);
#ifdef MSG_CMSG_CLOEXEC
if (result < 0 && get_socket_errno () == EINVAL)
{
/* We must be running on an old kernel. Call without the flag. */
msg.msg_flags &= ~(MSG_CMSG_CLOEXEC);
result = recvmsg (socket->priv->fd, &msg, msg.msg_flags);
}
#endif
if (result < 0)
{
int errsv = get_socket_errno ();
if (errsv == EINTR)
continue;
if (blocking &&
(errsv == EWOULDBLOCK ||
errsv == EAGAIN))
{
if (!g_socket_condition_wait (socket,
G_IO_IN, cancellable, error))
return -1;
continue;
}
socket_set_error_lazy (error, errsv, _("Error receiving message: %s"));
return -1;
}
break;
}
input_message_from_msghdr (&msg, &input_message, socket);
if (flags != NULL)
*flags = input_message.flags;
return result;
}
#else
{
struct sockaddr_storage addr;
int addrlen;
DWORD bytes_received;
DWORD win_flags;
int result;
WSABUF *bufs;
gint i;
/* iov */
bufs = g_newa (WSABUF, num_vectors);
for (i = 0; i < num_vectors; i++)
{
bufs[i].buf = (char *)vectors[i].buffer;
bufs[i].len = (gulong)vectors[i].size;
}
/* flags */
if (flags != NULL)
win_flags = *flags;
else
win_flags = 0;
/* do it */
while (1)
{
addrlen = sizeof addr;
if (address)
result = WSARecvFrom (socket->priv->fd,
bufs, num_vectors,
&bytes_received, &win_flags,
(struct sockaddr *)&addr, &addrlen,
NULL, NULL);
else
result = WSARecv (socket->priv->fd,
bufs, num_vectors,
&bytes_received, &win_flags,
NULL, NULL);
if (result != 0)
{
int errsv = get_socket_errno ();
if (errsv == WSAEINTR)
continue;
if (errsv == WSAEWOULDBLOCK)
{
win32_unset_event_mask (socket, FD_READ);
if (blocking)
{
if (!g_socket_condition_wait (socket,
G_IO_IN, cancellable, error))
return -1;
continue;
}
}
socket_set_error_lazy (error, errsv, _("Error receiving message: %s"));
return -1;
}
win32_unset_event_mask (socket, FD_READ);
break;
}
/* decode address */
if (address != NULL)
{
*address = cache_recv_address (socket, (struct sockaddr *)&addr, addrlen);
}
/* capture the flags */
if (flags != NULL)
*flags = win_flags;
if (messages != NULL)
*messages = NULL;
if (num_messages != NULL)
*num_messages = 0;
return bytes_received;
}
#endif
}
/**
* g_socket_receive_message:
* @socket: a #GSocket
@ -4531,189 +4739,11 @@ g_socket_receive_message (GSocket *socket,
GCancellable *cancellable,
GError **error)
{
GInputVector one_vector;
char one_byte;
g_return_val_if_fail (G_IS_SOCKET (socket), -1);
if (!check_socket (socket, error))
return -1;
if (!check_timeout (socket, error))
return -1;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
if (num_vectors == -1)
{
for (num_vectors = 0;
vectors[num_vectors].buffer != NULL;
num_vectors++)
;
}
if (num_vectors == 0)
{
one_vector.buffer = &one_byte;
one_vector.size = 1;
num_vectors = 1;
vectors = &one_vector;
}
#ifndef G_OS_WIN32
{
GInputMessage input_message;
struct msghdr msg;
gssize result;
input_message.address = address;
input_message.vectors = vectors;
input_message.num_vectors = num_vectors;
input_message.bytes_received = 0;
input_message.flags = (flags != NULL) ? *flags : 0;
input_message.control_messages = messages;
input_message.num_control_messages = (guint *) num_messages;
/* We always set the close-on-exec flag so we don't leak file
* descriptors into child processes. Note that gunixfdmessage.c
* will later call fcntl (fd, FD_CLOEXEC), but that isn't atomic.
*/
#ifdef MSG_CMSG_CLOEXEC
input_message.flags |= MSG_CMSG_CLOEXEC;
#endif
input_message_to_msghdr (&input_message, &msg);
/* do it */
while (1)
{
result = recvmsg (socket->priv->fd, &msg, msg.msg_flags);
#ifdef MSG_CMSG_CLOEXEC
if (result < 0 && get_socket_errno () == EINVAL)
{
/* We must be running on an old kernel. Call without the flag. */
msg.msg_flags &= ~(MSG_CMSG_CLOEXEC);
result = recvmsg (socket->priv->fd, &msg, msg.msg_flags);
}
#endif
if (result < 0)
{
int errsv = get_socket_errno ();
if (errsv == EINTR)
continue;
if (socket->priv->blocking &&
(errsv == EWOULDBLOCK ||
errsv == EAGAIN))
{
if (!g_socket_condition_wait (socket,
G_IO_IN, cancellable, error))
return -1;
continue;
}
socket_set_error_lazy (error, errsv, _("Error receiving message: %s"));
return -1;
}
break;
}
input_message_from_msghdr (&msg, &input_message, socket);
if (flags != NULL)
*flags = input_message.flags;
return result;
}
#else
{
struct sockaddr_storage addr;
int addrlen;
DWORD bytes_received;
DWORD win_flags;
int result;
WSABUF *bufs;
gint i;
/* iov */
bufs = g_newa (WSABUF, num_vectors);
for (i = 0; i < num_vectors; i++)
{
bufs[i].buf = (char *)vectors[i].buffer;
bufs[i].len = (gulong)vectors[i].size;
}
/* flags */
if (flags != NULL)
win_flags = *flags;
else
win_flags = 0;
/* do it */
while (1)
{
addrlen = sizeof addr;
if (address)
result = WSARecvFrom (socket->priv->fd,
bufs, num_vectors,
&bytes_received, &win_flags,
(struct sockaddr *)&addr, &addrlen,
NULL, NULL);
else
result = WSARecv (socket->priv->fd,
bufs, num_vectors,
&bytes_received, &win_flags,
NULL, NULL);
if (result != 0)
{
int errsv = get_socket_errno ();
if (errsv == WSAEINTR)
continue;
if (errsv == WSAEWOULDBLOCK)
{
win32_unset_event_mask (socket, FD_READ);
if (socket->priv->blocking)
{
if (!g_socket_condition_wait (socket,
G_IO_IN, cancellable, error))
return -1;
continue;
}
}
socket_set_error_lazy (error, errsv, _("Error receiving message: %s"));
return -1;
}
win32_unset_event_mask (socket, FD_READ);
break;
}
/* decode address */
if (address != NULL)
{
*address = cache_recv_address (socket, (struct sockaddr *)&addr, addrlen);
}
/* capture the flags */
if (flags != NULL)
*flags = win_flags;
if (messages != NULL)
*messages = NULL;
if (num_messages != NULL)
*num_messages = 0;
return bytes_received;
}
#endif
return g_socket_receive_message_with_blocking (socket, address, vectors,
num_vectors, messages,
num_messages, flags,
socket->priv->blocking,
cancellable, error);
}
/**