Merge branch '3739-socket-listener-keeps-on-giving' into 'main'

gsocketlistener: Fix accepting multiple connections in a single GMainContext iteration

Closes #3739

See merge request GNOME/glib!4717
This commit is contained in:
Michael Catanzaro
2025-07-28 02:04:34 -05:00
4 changed files with 299 additions and 39 deletions

View File

@@ -2974,6 +2974,9 @@ g_socket_accept (GSocket *socket,
if (!check_timeout (socket, error))
return NULL;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return NULL;
while (TRUE)
{
gboolean try_accept = TRUE;
@@ -3116,6 +3119,9 @@ g_socket_connect (GSocket *socket,
if (!g_socket_address_to_native (address, &buffer.storage, sizeof buffer, error))
return FALSE;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
if (socket->priv->remote_address)
g_object_unref (socket->priv->remote_address);
socket->priv->remote_address = g_object_ref (address);

View File

@@ -239,6 +239,32 @@ check_listener (GSocketListener *listener,
return TRUE;
}
static void
add_socket (GSocketListener *listener,
GSocket *socket,
GObject *source_object,
gboolean set_non_blocking)
{
if (source_object)
g_object_set_qdata_full (G_OBJECT (socket), source_quark,
g_object_ref (source_object),
g_object_unref);
g_ptr_array_add (listener->priv->sockets, g_object_ref (socket));
/* Because the implementation of `GSocketListener` uses polling and `GSource`s
* to wait for connections, we absolutely do *not* need `GSocket`s internal
* implementation of blocking operations to get in the way. Otherwise we end
* up calling poll() on the results of poll(), which is racy and confusing.
*
* Unfortunately, the existence of g_socket_listener_add_socket() to add a
* socket which is used elsewhere, means that we need an escape hatch
* (`!set_non_blocking`) to allow sockets to remain in blocking mode if the
* caller really wants it. */
if (set_non_blocking)
g_socket_set_blocking (socket, FALSE);
}
/**
* g_socket_listener_add_socket:
* @listener: a #GSocketListener
@@ -250,6 +276,9 @@ check_listener (GSocketListener *listener,
* new clients from. The socket must be bound to a local
* address and listened to.
*
* For parallel calls to [class@Gio.SocketListener] methods to work, the socket
* must be in non-blocking mode. (See [property@Gio.Socket:blocking].)
*
* @source_object will be passed out in the various calls
* to accept to identify this particular source, which is
* useful if you're listening on multiple addresses and do
@@ -282,13 +311,7 @@ g_socket_listener_add_socket (GSocketListener *listener,
return FALSE;
}
g_object_ref (socket);
g_ptr_array_add (listener->priv->sockets, socket);
if (source_object)
g_object_set_qdata_full (G_OBJECT (socket), source_quark,
g_object_ref (source_object), g_object_unref);
add_socket (listener, socket, source_object, FALSE);
if (G_SOCKET_LISTENER_GET_CLASS (listener)->changed)
G_SOCKET_LISTENER_GET_CLASS (listener)->changed (listener);
@@ -502,11 +525,6 @@ g_socket_listener_add_inet_port (GSocketListener *listener,
g_signal_emit (listener, signals[EVENT], 0,
G_SOCKET_LISTENER_LISTENED, socket6);
if (source_object)
g_object_set_qdata_full (G_OBJECT (socket6), source_quark,
g_object_ref (source_object),
g_object_unref);
/* If this socket already speaks IPv4 then we are done. */
if (g_socket_speaks_ipv4 (socket6))
need_ipv4_socket = FALSE;
@@ -568,11 +586,6 @@ g_socket_listener_add_inet_port (GSocketListener *listener,
{
g_signal_emit (listener, signals[EVENT], 0,
G_SOCKET_LISTENER_LISTENED, socket4);
if (source_object)
g_object_set_qdata_full (G_OBJECT (socket4), source_quark,
g_object_ref (source_object),
g_object_unref);
}
}
else
@@ -612,10 +625,10 @@ g_socket_listener_add_inet_port (GSocketListener *listener,
g_assert (socket6 != NULL || socket4 != NULL);
if (socket6 != NULL)
g_ptr_array_add (listener->priv->sockets, socket6);
add_socket (listener, socket6, source_object, TRUE);
if (socket4 != NULL)
g_ptr_array_add (listener->priv->sockets, socket4);
add_socket (listener, socket4, source_object, TRUE);
if (G_SOCKET_LISTENER_GET_CLASS (listener)->changed)
G_SOCKET_LISTENER_GET_CLASS (listener)->changed (listener);
@@ -623,6 +636,9 @@ g_socket_listener_add_inet_port (GSocketListener *listener,
g_clear_error (&socket6_listen_error);
g_clear_error (&socket4_listen_error);
g_clear_object (&socket6);
g_clear_object (&socket4);
return TRUE;
}
@@ -811,7 +827,6 @@ g_socket_listener_accept (GSocketListener *listener,
typedef struct
{
GList *sources; /* (element-type GSource) */
gboolean returned_yet;
} AcceptSocketAsyncData;
static void
@@ -830,12 +845,12 @@ accept_ready (GSocket *accept_socket,
GError *error = NULL;
GSocket *socket;
GObject *source_object;
AcceptSocketAsyncData *data = g_task_get_task_data (task);
/* Dont call g_task_return_*() multiple times if we have multiple incoming
* connections in the same #GMainContext iteration. */
if (data->returned_yet)
return G_SOURCE_REMOVE;
* connections in the same `GMainContext` iteration. We expect `GMainContext`
* to guarantee this behaviour, but lets double check that the other sources
* have been destroyed correctly. */
g_assert (!g_source_is_destroyed (g_main_current_source ()));
socket = g_socket_accept (accept_socket, g_task_get_cancellable (task), &error);
if (socket)
@@ -849,10 +864,37 @@ accept_ready (GSocket *accept_socket,
}
else
{
/* This can happen if there are multiple pending g_socket_listener_accept_async()
* calls (say N), and fewer queued incoming connections (say C) than that
* on this `GSocket`, in a single `GMainContext` iteration.
* (There may still be queued incoming connections on other `GSocket`s in
* the `GSocketListener`.)
*
* If so, the `GSocketSource`s for that pending g_socket_listener_accept_async()
* call will all raise `G_IO_IN` in this `GMainContext` iteration. The
* first C calls to accept_ready() succeed, but the following N-C calls
* would block, as all the queued incoming connections on this `GSocket`
* have been accepted by then. The `GSocketSource`s for these remaining
* g_socket_listener_accept_async() calls will not have been destroyed,
* because theres an independent set of `GSocketSource`s for each
* g_socket_listener_accept_async() call, rather than one `GSocketSource`
* for each socket in the `GSocketListener`.
*
* This is why we need sockets in the `GSocketListener` to be non-blocking:
* otherwise the above g_socket_accept() call would block. */
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
g_clear_error (&error);
return G_SOURCE_CONTINUE;
}
g_task_return_error (task, error);
}
data->returned_yet = TRUE;
/* Explicitly clear the task data so we know the other sources are destroyed. */
g_task_set_task_data (task, NULL, NULL);
/* Drop the final reference to the @task. */
g_object_unref (task);
return G_SOURCE_REMOVE;
@@ -893,8 +935,11 @@ g_socket_listener_accept_socket_async (GSocketListener *listener,
return;
}
/* This transfers the one strong ref of @task to all of the sources. The first
* source to be ready will call g_socket_accept() and take ownership of the
* @task. The other callbacks have to return early (if dispatched) after
* that. */
data = g_new0 (AcceptSocketAsyncData, 1);
data->returned_yet = FALSE;
data->sources = add_sources (listener,
accept_ready,
task,
@@ -1263,12 +1308,7 @@ g_socket_listener_add_any_inet_port (GSocketListener *listener,
g_signal_emit (listener, signals[EVENT], 0,
G_SOCKET_LISTENER_LISTENED, socket6);
if (source_object)
g_object_set_qdata_full (G_OBJECT (socket6), source_quark,
g_object_ref (source_object),
g_object_unref);
g_ptr_array_add (listener->priv->sockets, socket6);
add_socket (listener, socket6, source_object, TRUE);
}
else
{
@@ -1288,12 +1328,7 @@ g_socket_listener_add_any_inet_port (GSocketListener *listener,
g_signal_emit (listener, signals[EVENT], 0,
G_SOCKET_LISTENER_LISTENED, socket4);
if (source_object)
g_object_set_qdata_full (G_OBJECT (socket4), source_quark,
g_object_ref (source_object),
g_object_unref);
g_ptr_array_add (listener->priv->sockets, socket4);
add_socket (listener, socket4, source_object, TRUE);
}
else
{
@@ -1327,5 +1362,8 @@ g_socket_listener_add_any_inet_port (GSocketListener *listener,
G_SOCKET_LISTENER_GET_CLASS (listener)->changed)
G_SOCKET_LISTENER_GET_CLASS (listener)->changed (listener);
g_clear_object (&socket6);
g_clear_object (&socket4);
return candidate_port;
}

View File

@@ -23,13 +23,13 @@
#include "config.h"
#include <gio/gio.h>
#include <stdint.h>
#ifdef HAVE_RTLD_NEXT
#include <dlfcn.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdint.h>
#include <sys/socket.h>
#endif
@@ -549,6 +549,140 @@ test_add_inet_port_listen_failures (void)
#endif
}
static void
async_result_cb (GObject *source_object,
GAsyncResult *result,
void *user_data)
{
GAsyncResult **result_out = user_data;
g_assert (*result_out == NULL);
*result_out = g_object_ref (result);
g_main_context_wakeup (NULL);
}
static gboolean
any_are_null (const void * const *ptr_array,
size_t n_elements)
{
for (size_t i = 0; i < n_elements; i++)
if (ptr_array[i] == NULL)
return TRUE;
return FALSE;
}
static void
test_accept_multi_simultaneously (void)
{
GSocketListener *listener = NULL;
GAsyncResult *accept_results[5] = { NULL, };
struct
{
uint16_t listening_port;
GSocketClient *client;
GAsyncResult *result;
GSocketConnection *connection;
}
clients[5] = { { 0, NULL, NULL, NULL }, };
GSocketConnection *server_connection = NULL;
GCancellable *cancellable = NULL;
GError *local_error = NULL;
g_test_summary ("Test that accepting multiple pending connections on the "
"same GMainContext iteration works");
g_test_bug ("https://gitlab.gnome.org/GNOME/glib/-/issues/3739");
G_STATIC_ASSERT (G_N_ELEMENTS (clients) >= 2);
G_STATIC_ASSERT (G_N_ELEMENTS (accept_results) == G_N_ELEMENTS (clients));
listener = g_socket_listener_new ();
cancellable = g_cancellable_new ();
/* Listen on several ports at once. */
for (size_t i = 0; i < G_N_ELEMENTS (clients); i++)
{
clients[i].listening_port = g_socket_listener_add_any_inet_port (listener, NULL, &local_error);
g_assert_no_error (local_error);
}
/* Start to accept a connection, but dont iterate the `GMainContext` yet. */
g_socket_listener_accept_async (listener, cancellable, async_result_cb, &accept_results[0]);
/* Connect to multiple ports before iterating the `GMainContext`, so that
* multiple sockets are ready in the first iteration. */
for (size_t i = 0; i < G_N_ELEMENTS (clients); i++)
{
clients[i].client = g_socket_client_new ();
g_socket_client_connect_to_host_async (clients[i].client,
"localhost", clients[i].listening_port,
cancellable, async_result_cb, &clients[i].result);
}
while (accept_results[0] == NULL)
g_main_context_iteration (NULL, TRUE);
/* Exactly one server connection should have been created, because we called
* g_socket_listener_accept_async() once. */
server_connection = g_socket_listener_accept_finish (listener, accept_results[0], NULL,
&local_error);
g_assert_no_error (local_error);
g_assert_nonnull (server_connection);
g_io_stream_close (G_IO_STREAM (server_connection), NULL, NULL);
g_clear_object (&server_connection);
/* Conversely, all the client connection requests should have succeeded
* because the kernel will queue them on the server side. */
for (size_t i = 0; i < G_N_ELEMENTS (clients); i++)
{
g_assert_nonnull (clients[i].result);
clients[i].connection = g_socket_client_connect_to_host_finish (clients[i].client,
clients[i].result,
&local_error);
g_assert_no_error (local_error);
g_assert_nonnull (clients[i].connection);
}
/* Accept the remaining connections. */
for (size_t i = 1; i < G_N_ELEMENTS (accept_results); i++)
g_socket_listener_accept_async (listener, cancellable, async_result_cb, &accept_results[i]);
while (any_are_null ((const void * const *) accept_results, G_N_ELEMENTS (accept_results)))
g_main_context_iteration (NULL, TRUE);
for (size_t i = 1; i < G_N_ELEMENTS (accept_results); i++)
{
server_connection = g_socket_listener_accept_finish (listener, accept_results[i], NULL,
&local_error);
g_assert_no_error (local_error);
g_assert_nonnull (server_connection);
g_io_stream_close (G_IO_STREAM (server_connection), NULL, NULL);
g_clear_object (&server_connection);
}
/* Clean up. */
g_socket_listener_close (listener);
g_cancellable_cancel (cancellable);
while (g_main_context_iteration (NULL, FALSE));
for (size_t i = 0; i < G_N_ELEMENTS (clients); i++)
{
g_io_stream_close (G_IO_STREAM (clients[i].connection), NULL, NULL);
g_clear_object (&clients[i].connection);
g_clear_object (&clients[i].result);
g_assert_finalize_object (clients[i].client);
}
for (size_t i = 0; i < G_N_ELEMENTS (accept_results); i++)
g_clear_object (&accept_results[i]);
g_assert_finalize_object (listener);
g_clear_object (&cancellable);
}
int
main (int argc,
char *argv[])
@@ -556,6 +690,7 @@ main (int argc,
g_test_init (&argc, &argv, NULL);
g_test_add_func ("/socket-listener/event-signal", test_event_signal);
g_test_add_func ("/socket-listener/accept/multi-simultaneously", test_accept_multi_simultaneously);
g_test_add_func ("/socket-listener/add-any-inet-port/listen-failures", test_add_any_inet_port_listen_failures);
g_test_add_func ("/socket-listener/add-inet-port/listen-failures", test_add_inet_port_listen_failures);

View File

@@ -2543,6 +2543,84 @@ test_receive_bytes_from (void)
ip_test_data_free (data);
}
static void
test_accept_cancelled (void)
{
GSocket *socket = NULL;
GError *local_error = NULL;
GCancellable *cancellable = NULL;
GSocket *socket2 = NULL;
g_test_summary ("Calling g_socket_accept() with a cancelled cancellable "
"should return immediately regardless of whether the socket "
"is blocking");
socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
G_SOCKET_TYPE_STREAM,
G_SOCKET_PROTOCOL_DEFAULT,
&local_error);
g_assert_no_error (local_error);
cancellable = g_cancellable_new ();
g_cancellable_cancel (cancellable);
for (unsigned int i = 0; i < 2; i++)
{
g_socket_set_blocking (socket, i);
socket2 = g_socket_accept (socket, cancellable, &local_error);
g_assert_error (local_error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
g_assert_null (socket2);
g_clear_error (&local_error);
}
g_clear_object (&cancellable);
g_clear_object (&socket);
}
static void
test_connect_cancelled (void)
{
GSocket *socket = NULL;
GError *local_error = NULL;
GCancellable *cancellable = NULL;
GInetAddress *inet_addr = NULL;
GSocketAddress *addr = NULL;
g_test_summary ("Calling g_socket_connect() with a cancelled cancellable "
"should return immediately regardless of whether the socket "
"is blocking");
socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
G_SOCKET_TYPE_STREAM,
G_SOCKET_PROTOCOL_DEFAULT,
&local_error);
g_assert_no_error (local_error);
cancellable = g_cancellable_new ();
g_cancellable_cancel (cancellable);
inet_addr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4);
addr = g_inet_socket_address_new (inet_addr, 0);
for (unsigned int i = 0; i < 2; i++)
{
gboolean retval;
g_socket_set_blocking (socket, i);
retval = g_socket_connect (socket, addr, cancellable, &local_error);
g_assert_error (local_error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
g_assert_false (retval);
g_clear_error (&local_error);
}
g_clear_object (&addr);
g_clear_object (&inet_addr);
g_clear_object (&cancellable);
g_clear_object (&socket);
}
int
main (int argc,
char *argv[])
@@ -2612,5 +2690,8 @@ main (int argc,
g_test_add_func ("/socket/receive_bytes", test_receive_bytes);
g_test_add_func ("/socket/receive_bytes_from", test_receive_bytes_from);
g_test_add_func ("/socket/accept/cancelled", test_accept_cancelled);
g_test_add_func ("/socket/connect/cancelled", test_connect_cancelled);
return g_test_run();
}