gsocket: Switch internal functions from blocking booleans to timeouts

In order to support per-operation timeouts on new API like
g_socket_receive_messages(), the internal GSocket API should use
timeouts rather than boolean blocking parameters.

   (timeout == 0) === (blocking == FALSE)
   (timeout == -1) === (blocking == TRUE)
   (timeout > 0) === new behaviour

https://bugzilla.gnome.org/show_bug.cgi?id=751924
This commit is contained in:
Philip Withnall 2015-07-29 11:13:33 +01:00
parent 7f985b35ce
commit a0cefc2217

View File

@ -139,22 +139,22 @@ static GSocketAddress *
cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len); cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len);
static gssize static gssize
g_socket_receive_message_with_blocking (GSocket *socket, g_socket_receive_message_with_timeout (GSocket *socket,
GSocketAddress **address, GSocketAddress **address,
GInputVector *vectors, GInputVector *vectors,
gint num_vectors, gint num_vectors,
GSocketControlMessage ***messages, GSocketControlMessage ***messages,
gint *num_messages, gint *num_messages,
gint *flags, gint *flags,
gboolean blocking, gint64 timeout,
GCancellable *cancellable, GCancellable *cancellable,
GError **error); GError **error);
static gint static gint
g_socket_send_messages_with_blocking (GSocket *socket, g_socket_send_messages_with_timeout (GSocket *socket,
GOutputMessage *messages, GOutputMessage *messages,
guint num_messages, guint num_messages,
gint flags, gint flags,
gboolean blocking, gint64 timeout,
GCancellable *cancellable, GCancellable *cancellable,
GError **error); GError **error);
@ -2549,6 +2549,107 @@ g_socket_get_available_bytes (GSocket *socket)
return avail; return avail;
} }
/* Block on a timed wait for @condition until (@start_time + @timeout).
* Return %G_IO_ERROR_TIMED_OUT if the timeout is reached; otherwise %TRUE.
*/
static gboolean
block_on_timeout (GSocket *socket,
GIOCondition condition,
gint64 timeout,
gint64 start_time,
GCancellable *cancellable,
GError **error)
{
gint64 wait_timeout = -1;
g_return_val_if_fail (timeout != 0, TRUE);
/* check if we've timed out or how much time to wait at most */
if (timeout >= 0)
{
gint64 elapsed = g_get_monotonic_time () - start_time;
if (elapsed >= timeout)
{
g_set_error_literal (error,
G_IO_ERROR, G_IO_ERROR_TIMED_OUT,
_("Socket I/O timed out"));
return FALSE;
}
wait_timeout = timeout - elapsed;
}
return g_socket_condition_timed_wait (socket, condition, wait_timeout,
cancellable, error);
}
static gssize
g_socket_receive_with_timeout (GSocket *socket,
guint8 *buffer,
gsize size,
gint64 timeout,
GCancellable *cancellable,
GError **error)
{
gssize ret;
gint64 start_time;
g_return_val_if_fail (G_IS_SOCKET (socket) && buffer != NULL, -1);
start_time = g_get_monotonic_time ();
if (!check_socket (socket, error))
return -1;
if (!check_timeout (socket, error))
return -1;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
while (1)
{
if ((ret = recv (socket->priv->fd, buffer, size, 0)) < 0)
{
int errsv = get_socket_errno ();
if (errsv == EINTR)
continue;
#ifdef WSAEWOULDBLOCK
if (errsv == WSAEWOULDBLOCK)
#else
if (errsv == EWOULDBLOCK ||
errsv == EAGAIN)
#endif
{
win32_unset_event_mask (socket, FD_READ);
if (timeout != 0)
{
if (!block_on_timeout (socket, G_IO_IN, timeout, start_time,
cancellable, error))
return -1;
continue;
}
}
win32_unset_event_mask (socket, FD_READ);
socket_set_error_lazy (error, errsv, _("Error receiving data: %s"));
return -1;
}
win32_unset_event_mask (socket, FD_READ);
break;
}
return ret;
}
/** /**
* g_socket_receive: * g_socket_receive:
* @socket: a #GSocket * @socket: a #GSocket
@ -2594,9 +2695,9 @@ g_socket_receive (GSocket *socket,
GCancellable *cancellable, GCancellable *cancellable,
GError **error) GError **error)
{ {
return g_socket_receive_with_blocking (socket, buffer, size, return g_socket_receive_with_timeout (socket, (guint8 *) buffer, size,
socket->priv->blocking, socket->priv->blocking ? -1 : 0,
cancellable, error); cancellable, error);
} }
/** /**
@ -2626,59 +2727,8 @@ g_socket_receive_with_blocking (GSocket *socket,
GCancellable *cancellable, GCancellable *cancellable,
GError **error) GError **error)
{ {
gssize ret; return g_socket_receive_with_timeout (socket, (guint8 *) buffer, size,
blocking ? -1 : 0, cancellable, error);
g_return_val_if_fail (G_IS_SOCKET (socket) && buffer != NULL, -1);
if (!check_socket (socket, error))
return -1;
if (!check_timeout (socket, error))
return -1;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
while (1)
{
if ((ret = recv (socket->priv->fd, buffer, size, 0)) < 0)
{
int errsv = get_socket_errno ();
if (errsv == EINTR)
continue;
#ifdef WSAEWOULDBLOCK
if (errsv == WSAEWOULDBLOCK)
#else
if (errsv == EWOULDBLOCK ||
errsv == EAGAIN)
#endif
{
win32_unset_event_mask (socket, FD_READ);
if (blocking)
{
if (!g_socket_condition_wait (socket,
G_IO_IN, cancellable, error))
return -1;
continue;
}
}
win32_unset_event_mask (socket, FD_READ);
socket_set_error_lazy (error, errsv, _("Error receiving data: %s"));
return -1;
}
win32_unset_event_mask (socket, FD_READ);
break;
}
return ret;
} }
/** /**
@ -2733,6 +2783,67 @@ g_socket_receive_from (GSocket *socket,
#define G_SOCKET_DEFAULT_SEND_FLAGS 0 #define G_SOCKET_DEFAULT_SEND_FLAGS 0
#endif #endif
static gssize
g_socket_send_with_timeout (GSocket *socket,
const guint8 *buffer,
gsize size,
gint64 timeout,
GCancellable *cancellable,
GError **error)
{
gssize ret;
gint64 start_time;
g_return_val_if_fail (G_IS_SOCKET (socket) && buffer != NULL, -1);
start_time = g_get_monotonic_time ();
if (!check_socket (socket, error))
return -1;
if (!check_timeout (socket, error))
return -1;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
while (1)
{
if ((ret = send (socket->priv->fd, buffer, size, G_SOCKET_DEFAULT_SEND_FLAGS)) < 0)
{
int errsv = get_socket_errno ();
if (errsv == EINTR)
continue;
#ifdef WSAEWOULDBLOCK
if (errsv == WSAEWOULDBLOCK)
#else
if (errsv == EWOULDBLOCK ||
errsv == EAGAIN)
#endif
{
win32_unset_event_mask (socket, FD_WRITE);
if (timeout != 0)
{
if (!block_on_timeout (socket, G_IO_OUT, timeout, start_time,
cancellable, error))
return -1;
continue;
}
}
socket_set_error_lazy (error, errsv, _("Error sending data: %s"));
return -1;
}
break;
}
return ret;
}
/** /**
* g_socket_send: * g_socket_send:
* @socket: a #GSocket * @socket: a #GSocket
@ -2801,54 +2912,8 @@ g_socket_send_with_blocking (GSocket *socket,
GCancellable *cancellable, GCancellable *cancellable,
GError **error) GError **error)
{ {
gssize ret; return g_socket_send_with_timeout (socket, (const guint8 *) buffer, size,
blocking ? -1 : 0, cancellable, error);
g_return_val_if_fail (G_IS_SOCKET (socket) && buffer != NULL, -1);
if (!check_socket (socket, error))
return -1;
if (!check_timeout (socket, error))
return -1;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
while (1)
{
if ((ret = send (socket->priv->fd, buffer, size, G_SOCKET_DEFAULT_SEND_FLAGS)) < 0)
{
int errsv = get_socket_errno ();
if (errsv == EINTR)
continue;
#ifdef WSAEWOULDBLOCK
if (errsv == WSAEWOULDBLOCK)
#else
if (errsv == EWOULDBLOCK ||
errsv == EAGAIN)
#endif
{
win32_unset_event_mask (socket, FD_WRITE);
if (blocking)
{
if (!g_socket_condition_wait (socket,
G_IO_OUT, cancellable, error))
return -1;
continue;
}
}
socket_set_error_lazy (error, errsv, _("Error sending data: %s"));
return -1;
}
break;
}
return ret;
} }
/** /**
@ -4237,25 +4302,30 @@ g_socket_send_messages (GSocket *socket,
GCancellable *cancellable, GCancellable *cancellable,
GError **error) GError **error)
{ {
return g_socket_send_messages_with_blocking (socket, messages, num_messages, return g_socket_send_messages_with_timeout (socket, messages, num_messages,
flags, socket->priv->blocking, flags,
cancellable, error); socket->priv->blocking ? -1 : 0,
cancellable, error);
} }
static gint static gint
g_socket_send_messages_with_blocking (GSocket *socket, g_socket_send_messages_with_timeout (GSocket *socket,
GOutputMessage *messages, GOutputMessage *messages,
guint num_messages, guint num_messages,
gint flags, gint flags,
gboolean blocking, gint64 timeout,
GCancellable *cancellable, GCancellable *cancellable,
GError **error) GError **error)
{ {
gint64 start_time;
g_return_val_if_fail (G_IS_SOCKET (socket), -1); g_return_val_if_fail (G_IS_SOCKET (socket), -1);
g_return_val_if_fail (num_messages == 0 || messages != NULL, -1); g_return_val_if_fail (num_messages == 0 || messages != NULL, -1);
g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1); g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
g_return_val_if_fail (error == NULL || *error == NULL, -1); g_return_val_if_fail (error == NULL || *error == NULL, -1);
start_time = g_get_monotonic_time ();
if (!check_socket (socket, error)) if (!check_socket (socket, error))
return -1; return -1;
@ -4319,13 +4389,21 @@ g_socket_send_messages_with_blocking (GSocket *socket,
if (errsv == EINTR) if (errsv == EINTR)
continue; continue;
if (blocking && if (timeout != 0 &&
(errsv == EWOULDBLOCK || (errsv == EWOULDBLOCK ||
errsv == EAGAIN)) errsv == EAGAIN))
{ {
if (!g_socket_condition_wait (socket, if (!block_on_timeout (socket, G_IO_OUT, timeout, start_time,
G_IO_OUT, cancellable, error)) cancellable, error))
return -1; {
if (num_sent > 0)
{
g_clear_error (error);
break;
}
return -1;
}
continue; continue;
} }
@ -4361,23 +4439,37 @@ g_socket_send_messages_with_blocking (GSocket *socket,
{ {
gssize result; gssize result;
gint i; gint i;
gint64 wait_timeout;
wait_timeout = timeout;
for (i = 0; i < num_messages; ++i) for (i = 0; i < num_messages; ++i)
{ {
GOutputMessage *msg = &messages[i]; GOutputMessage *msg = &messages[i];
GError *msg_error = NULL; GError *msg_error = NULL;
result = g_socket_send_message (socket, msg->address, result = g_socket_send_message_with_timeout (socket, msg->address,
msg->vectors, msg->num_vectors, msg->vectors,
msg->control_messages, msg->num_vectors,
msg->num_control_messages, msg->control_messages,
flags, cancellable, &msg_error); msg->num_control_messages,
flags, wait_timeout,
cancellable, &msg_error);
/* check if we've timed out or how much time to wait at most */
if (timeout > 0)
{
gint64 elapsed = g_get_monotonic_time () - start_time;
wait_timeout = MAX (timeout - elapsed, 1);
}
if (result < 0) if (result < 0)
{ {
/* if we couldn't send all messages, just return how many we did /* if we couldn't send all messages, just return how many we did
* manage to send, provided we managed to send at least one */ * manage to send, provided we managed to send at least one */
if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0) if (i > 0 &&
(g_error_matches (msg_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
g_error_matches (msg_error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)))
{ {
g_error_free (msg_error); g_error_free (msg_error);
return i; return i;
@ -4452,22 +4544,25 @@ cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len)
} }
static gssize static gssize
g_socket_receive_message_with_blocking (GSocket *socket, g_socket_receive_message_with_timeout (GSocket *socket,
GSocketAddress **address, GSocketAddress **address,
GInputVector *vectors, GInputVector *vectors,
gint num_vectors, gint num_vectors,
GSocketControlMessage ***messages, GSocketControlMessage ***messages,
gint *num_messages, gint *num_messages,
gint *flags, gint *flags,
gboolean blocking, gint64 timeout,
GCancellable *cancellable, GCancellable *cancellable,
GError **error) GError **error)
{ {
GInputVector one_vector; GInputVector one_vector;
char one_byte; char one_byte;
gint64 start_time;
g_return_val_if_fail (G_IS_SOCKET (socket), -1); g_return_val_if_fail (G_IS_SOCKET (socket), -1);
start_time = g_get_monotonic_time ();
if (!check_socket (socket, error)) if (!check_socket (socket, error))
return -1; return -1;
@ -4537,15 +4632,15 @@ g_socket_receive_message_with_blocking (GSocket *socket,
if (errsv == EINTR) if (errsv == EINTR)
continue; continue;
if (blocking && if (timeout != 0 &&
(errsv == EWOULDBLOCK || (errsv == EWOULDBLOCK ||
errsv == EAGAIN)) errsv == EAGAIN))
{ {
if (!g_socket_condition_wait (socket, if (!block_on_timeout (socket, G_IO_IN, timeout, start_time,
G_IO_IN, cancellable, error)) cancellable, error))
return -1; return -1;
continue; continue;
} }
socket_set_error_lazy (error, errsv, _("Error receiving message: %s")); socket_set_error_lazy (error, errsv, _("Error receiving message: %s"));
@ -4611,10 +4706,10 @@ g_socket_receive_message_with_blocking (GSocket *socket,
{ {
win32_unset_event_mask (socket, FD_READ); win32_unset_event_mask (socket, FD_READ);
if (blocking) if (timeout != 0)
{ {
if (!g_socket_condition_wait (socket, if (!block_on_timeout (socket, G_IO_IN, timeout,
G_IO_IN, cancellable, error)) start_time, cancellable, error))
return -1; return -1;
continue; continue;
@ -4739,10 +4834,10 @@ g_socket_receive_message (GSocket *socket,
GCancellable *cancellable, GCancellable *cancellable,
GError **error) GError **error)
{ {
return g_socket_receive_message_with_blocking (socket, address, vectors, return g_socket_receive_message_with_timeout (socket, address, vectors,
num_vectors, messages, num_vectors, messages,
num_messages, flags, num_messages, flags,
socket->priv->blocking, socket->priv->blocking ? -1 : 0,
cancellable, error); cancellable, error);
} }