diff --git a/gio/gsocket.c b/gio/gsocket.c index 44e935263..0a2a523e2 100644 --- a/gio/gsocket.c +++ b/gio/gsocket.c @@ -2974,6 +2974,9 @@ g_socket_accept (GSocket *socket, if (!check_timeout (socket, error)) return NULL; + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return NULL; + while (TRUE) { gboolean try_accept = TRUE; @@ -3116,6 +3119,9 @@ g_socket_connect (GSocket *socket, if (!g_socket_address_to_native (address, &buffer.storage, sizeof buffer, error)) return FALSE; + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return FALSE; + if (socket->priv->remote_address) g_object_unref (socket->priv->remote_address); socket->priv->remote_address = g_object_ref (address); diff --git a/gio/gsocketlistener.c b/gio/gsocketlistener.c index 974720008..de367b915 100644 --- a/gio/gsocketlistener.c +++ b/gio/gsocketlistener.c @@ -239,6 +239,32 @@ check_listener (GSocketListener *listener, return TRUE; } +static void +add_socket (GSocketListener *listener, + GSocket *socket, + GObject *source_object, + gboolean set_non_blocking) +{ + if (source_object) + g_object_set_qdata_full (G_OBJECT (socket), source_quark, + g_object_ref (source_object), + g_object_unref); + + g_ptr_array_add (listener->priv->sockets, g_object_ref (socket)); + + /* Because the implementation of `GSocketListener` uses polling and `GSource`s + * to wait for connections, we absolutely do *not* need `GSocket`’s internal + * implementation of blocking operations to get in the way. Otherwise we end + * up calling poll() on the results of poll(), which is racy and confusing. + * + * Unfortunately, the existence of g_socket_listener_add_socket() to add a + * socket which is used elsewhere, means that we need an escape hatch + * (`!set_non_blocking`) to allow sockets to remain in blocking mode if the + * caller really wants it. */ + if (set_non_blocking) + g_socket_set_blocking (socket, FALSE); +} + /** * g_socket_listener_add_socket: * @listener: a #GSocketListener @@ -250,6 +276,9 @@ check_listener (GSocketListener *listener, * new clients from. The socket must be bound to a local * address and listened to. * + * For parallel calls to [class@Gio.SocketListener] methods to work, the socket + * must be in non-blocking mode. (See [property@Gio.Socket:blocking].) + * * @source_object will be passed out in the various calls * to accept to identify this particular source, which is * useful if you're listening on multiple addresses and do @@ -282,13 +311,7 @@ g_socket_listener_add_socket (GSocketListener *listener, return FALSE; } - g_object_ref (socket); - g_ptr_array_add (listener->priv->sockets, socket); - - if (source_object) - g_object_set_qdata_full (G_OBJECT (socket), source_quark, - g_object_ref (source_object), g_object_unref); - + add_socket (listener, socket, source_object, FALSE); if (G_SOCKET_LISTENER_GET_CLASS (listener)->changed) G_SOCKET_LISTENER_GET_CLASS (listener)->changed (listener); @@ -502,11 +525,6 @@ g_socket_listener_add_inet_port (GSocketListener *listener, g_signal_emit (listener, signals[EVENT], 0, G_SOCKET_LISTENER_LISTENED, socket6); - if (source_object) - g_object_set_qdata_full (G_OBJECT (socket6), source_quark, - g_object_ref (source_object), - g_object_unref); - /* If this socket already speaks IPv4 then we are done. */ if (g_socket_speaks_ipv4 (socket6)) need_ipv4_socket = FALSE; @@ -568,11 +586,6 @@ g_socket_listener_add_inet_port (GSocketListener *listener, { g_signal_emit (listener, signals[EVENT], 0, G_SOCKET_LISTENER_LISTENED, socket4); - - if (source_object) - g_object_set_qdata_full (G_OBJECT (socket4), source_quark, - g_object_ref (source_object), - g_object_unref); } } else @@ -612,10 +625,10 @@ g_socket_listener_add_inet_port (GSocketListener *listener, g_assert (socket6 != NULL || socket4 != NULL); if (socket6 != NULL) - g_ptr_array_add (listener->priv->sockets, socket6); + add_socket (listener, socket6, source_object, TRUE); if (socket4 != NULL) - g_ptr_array_add (listener->priv->sockets, socket4); + add_socket (listener, socket4, source_object, TRUE); if (G_SOCKET_LISTENER_GET_CLASS (listener)->changed) G_SOCKET_LISTENER_GET_CLASS (listener)->changed (listener); @@ -623,6 +636,9 @@ g_socket_listener_add_inet_port (GSocketListener *listener, g_clear_error (&socket6_listen_error); g_clear_error (&socket4_listen_error); + g_clear_object (&socket6); + g_clear_object (&socket4); + return TRUE; } @@ -811,7 +827,6 @@ g_socket_listener_accept (GSocketListener *listener, typedef struct { GList *sources; /* (element-type GSource) */ - gboolean returned_yet; } AcceptSocketAsyncData; static void @@ -830,12 +845,12 @@ accept_ready (GSocket *accept_socket, GError *error = NULL; GSocket *socket; GObject *source_object; - AcceptSocketAsyncData *data = g_task_get_task_data (task); /* Don’t call g_task_return_*() multiple times if we have multiple incoming - * connections in the same #GMainContext iteration. */ - if (data->returned_yet) - return G_SOURCE_REMOVE; + * connections in the same `GMainContext` iteration. We expect `GMainContext` + * to guarantee this behaviour, but let’s double check that the other sources + * have been destroyed correctly. */ + g_assert (!g_source_is_destroyed (g_main_current_source ())); socket = g_socket_accept (accept_socket, g_task_get_cancellable (task), &error); if (socket) @@ -849,10 +864,37 @@ accept_ready (GSocket *accept_socket, } else { + /* This can happen if there are multiple pending g_socket_listener_accept_async() + * calls (say N), and fewer queued incoming connections (say C) than that + * on this `GSocket`, in a single `GMainContext` iteration. + * (There may still be queued incoming connections on other `GSocket`s in + * the `GSocketListener`.) + * + * If so, the `GSocketSource`s for that pending g_socket_listener_accept_async() + * call will all raise `G_IO_IN` in this `GMainContext` iteration. The + * first C calls to accept_ready() succeed, but the following N-C calls + * would block, as all the queued incoming connections on this `GSocket` + * have been accepted by then. The `GSocketSource`s for these remaining + * g_socket_listener_accept_async() calls will not have been destroyed, + * because there’s an independent set of `GSocketSource`s for each + * g_socket_listener_accept_async() call, rather than one `GSocketSource` + * for each socket in the `GSocketListener`. + * + * This is why we need sockets in the `GSocketListener` to be non-blocking: + * otherwise the above g_socket_accept() call would block. */ + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + { + g_clear_error (&error); + return G_SOURCE_CONTINUE; + } + g_task_return_error (task, error); } - data->returned_yet = TRUE; + /* Explicitly clear the task data so we know the other sources are destroyed. */ + g_task_set_task_data (task, NULL, NULL); + + /* Drop the final reference to the @task. */ g_object_unref (task); return G_SOURCE_REMOVE; @@ -893,8 +935,11 @@ g_socket_listener_accept_socket_async (GSocketListener *listener, return; } + /* This transfers the one strong ref of @task to all of the sources. The first + * source to be ready will call g_socket_accept() and take ownership of the + * @task. The other callbacks have to return early (if dispatched) after + * that. */ data = g_new0 (AcceptSocketAsyncData, 1); - data->returned_yet = FALSE; data->sources = add_sources (listener, accept_ready, task, @@ -1263,12 +1308,7 @@ g_socket_listener_add_any_inet_port (GSocketListener *listener, g_signal_emit (listener, signals[EVENT], 0, G_SOCKET_LISTENER_LISTENED, socket6); - if (source_object) - g_object_set_qdata_full (G_OBJECT (socket6), source_quark, - g_object_ref (source_object), - g_object_unref); - - g_ptr_array_add (listener->priv->sockets, socket6); + add_socket (listener, socket6, source_object, TRUE); } else { @@ -1288,12 +1328,7 @@ g_socket_listener_add_any_inet_port (GSocketListener *listener, g_signal_emit (listener, signals[EVENT], 0, G_SOCKET_LISTENER_LISTENED, socket4); - if (source_object) - g_object_set_qdata_full (G_OBJECT (socket4), source_quark, - g_object_ref (source_object), - g_object_unref); - - g_ptr_array_add (listener->priv->sockets, socket4); + add_socket (listener, socket4, source_object, TRUE); } else { @@ -1327,5 +1362,8 @@ g_socket_listener_add_any_inet_port (GSocketListener *listener, G_SOCKET_LISTENER_GET_CLASS (listener)->changed) G_SOCKET_LISTENER_GET_CLASS (listener)->changed (listener); + g_clear_object (&socket6); + g_clear_object (&socket4); + return candidate_port; } diff --git a/gio/tests/socket-listener.c b/gio/tests/socket-listener.c index ee2d8fc63..e219f966b 100644 --- a/gio/tests/socket-listener.c +++ b/gio/tests/socket-listener.c @@ -23,13 +23,13 @@ #include "config.h" #include +#include #ifdef HAVE_RTLD_NEXT #include #include #include #include -#include #include #endif @@ -549,6 +549,140 @@ test_add_inet_port_listen_failures (void) #endif } +static void +async_result_cb (GObject *source_object, + GAsyncResult *result, + void *user_data) +{ + GAsyncResult **result_out = user_data; + + g_assert (*result_out == NULL); + *result_out = g_object_ref (result); + + g_main_context_wakeup (NULL); +} + +static gboolean +any_are_null (const void * const *ptr_array, + size_t n_elements) +{ + for (size_t i = 0; i < n_elements; i++) + if (ptr_array[i] == NULL) + return TRUE; + + return FALSE; +} + +static void +test_accept_multi_simultaneously (void) +{ + GSocketListener *listener = NULL; + GAsyncResult *accept_results[5] = { NULL, }; + struct + { + uint16_t listening_port; + GSocketClient *client; + GAsyncResult *result; + GSocketConnection *connection; + } + clients[5] = { { 0, NULL, NULL, NULL }, }; + GSocketConnection *server_connection = NULL; + GCancellable *cancellable = NULL; + GError *local_error = NULL; + + g_test_summary ("Test that accepting multiple pending connections on the " + "same GMainContext iteration works"); + g_test_bug ("https://gitlab.gnome.org/GNOME/glib/-/issues/3739"); + + G_STATIC_ASSERT (G_N_ELEMENTS (clients) >= 2); + G_STATIC_ASSERT (G_N_ELEMENTS (accept_results) == G_N_ELEMENTS (clients)); + + listener = g_socket_listener_new (); + cancellable = g_cancellable_new (); + + /* Listen on several ports at once. */ + for (size_t i = 0; i < G_N_ELEMENTS (clients); i++) + { + clients[i].listening_port = g_socket_listener_add_any_inet_port (listener, NULL, &local_error); + g_assert_no_error (local_error); + } + + /* Start to accept a connection, but don’t iterate the `GMainContext` yet. */ + g_socket_listener_accept_async (listener, cancellable, async_result_cb, &accept_results[0]); + + /* Connect to multiple ports before iterating the `GMainContext`, so that + * multiple sockets are ready in the first iteration. */ + for (size_t i = 0; i < G_N_ELEMENTS (clients); i++) + { + clients[i].client = g_socket_client_new (); + g_socket_client_connect_to_host_async (clients[i].client, + "localhost", clients[i].listening_port, + cancellable, async_result_cb, &clients[i].result); + } + + while (accept_results[0] == NULL) + g_main_context_iteration (NULL, TRUE); + + /* Exactly one server connection should have been created, because we called + * g_socket_listener_accept_async() once. */ + server_connection = g_socket_listener_accept_finish (listener, accept_results[0], NULL, + &local_error); + g_assert_no_error (local_error); + g_assert_nonnull (server_connection); + g_io_stream_close (G_IO_STREAM (server_connection), NULL, NULL); + g_clear_object (&server_connection); + + /* Conversely, all the client connection requests should have succeeded + * because the kernel will queue them on the server side. */ + for (size_t i = 0; i < G_N_ELEMENTS (clients); i++) + { + g_assert_nonnull (clients[i].result); + + clients[i].connection = g_socket_client_connect_to_host_finish (clients[i].client, + clients[i].result, + &local_error); + g_assert_no_error (local_error); + g_assert_nonnull (clients[i].connection); + } + + /* Accept the remaining connections. */ + for (size_t i = 1; i < G_N_ELEMENTS (accept_results); i++) + g_socket_listener_accept_async (listener, cancellable, async_result_cb, &accept_results[i]); + + while (any_are_null ((const void * const *) accept_results, G_N_ELEMENTS (accept_results))) + g_main_context_iteration (NULL, TRUE); + + for (size_t i = 1; i < G_N_ELEMENTS (accept_results); i++) + { + server_connection = g_socket_listener_accept_finish (listener, accept_results[i], NULL, + &local_error); + g_assert_no_error (local_error); + g_assert_nonnull (server_connection); + g_io_stream_close (G_IO_STREAM (server_connection), NULL, NULL); + g_clear_object (&server_connection); + } + + /* Clean up. */ + g_socket_listener_close (listener); + g_cancellable_cancel (cancellable); + + while (g_main_context_iteration (NULL, FALSE)); + + for (size_t i = 0; i < G_N_ELEMENTS (clients); i++) + { + g_io_stream_close (G_IO_STREAM (clients[i].connection), NULL, NULL); + g_clear_object (&clients[i].connection); + g_clear_object (&clients[i].result); + g_assert_finalize_object (clients[i].client); + } + + for (size_t i = 0; i < G_N_ELEMENTS (accept_results); i++) + g_clear_object (&accept_results[i]); + + g_assert_finalize_object (listener); + g_clear_object (&cancellable); +} + int main (int argc, char *argv[]) @@ -556,6 +690,7 @@ main (int argc, g_test_init (&argc, &argv, NULL); g_test_add_func ("/socket-listener/event-signal", test_event_signal); + g_test_add_func ("/socket-listener/accept/multi-simultaneously", test_accept_multi_simultaneously); g_test_add_func ("/socket-listener/add-any-inet-port/listen-failures", test_add_any_inet_port_listen_failures); g_test_add_func ("/socket-listener/add-inet-port/listen-failures", test_add_inet_port_listen_failures); diff --git a/gio/tests/socket.c b/gio/tests/socket.c index 5d3278e0f..e631c42d2 100644 --- a/gio/tests/socket.c +++ b/gio/tests/socket.c @@ -2543,6 +2543,84 @@ test_receive_bytes_from (void) ip_test_data_free (data); } +static void +test_accept_cancelled (void) +{ + GSocket *socket = NULL; + GError *local_error = NULL; + GCancellable *cancellable = NULL; + GSocket *socket2 = NULL; + + g_test_summary ("Calling g_socket_accept() with a cancelled cancellable " + "should return immediately regardless of whether the socket " + "is blocking"); + + socket = g_socket_new (G_SOCKET_FAMILY_IPV4, + G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_DEFAULT, + &local_error); + g_assert_no_error (local_error); + + cancellable = g_cancellable_new (); + g_cancellable_cancel (cancellable); + + for (unsigned int i = 0; i < 2; i++) + { + g_socket_set_blocking (socket, i); + + socket2 = g_socket_accept (socket, cancellable, &local_error); + g_assert_error (local_error, G_IO_ERROR, G_IO_ERROR_CANCELLED); + g_assert_null (socket2); + g_clear_error (&local_error); + } + + g_clear_object (&cancellable); + g_clear_object (&socket); +} + +static void +test_connect_cancelled (void) +{ + GSocket *socket = NULL; + GError *local_error = NULL; + GCancellable *cancellable = NULL; + GInetAddress *inet_addr = NULL; + GSocketAddress *addr = NULL; + + g_test_summary ("Calling g_socket_connect() with a cancelled cancellable " + "should return immediately regardless of whether the socket " + "is blocking"); + + socket = g_socket_new (G_SOCKET_FAMILY_IPV4, + G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_DEFAULT, + &local_error); + g_assert_no_error (local_error); + + cancellable = g_cancellable_new (); + g_cancellable_cancel (cancellable); + + inet_addr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4); + addr = g_inet_socket_address_new (inet_addr, 0); + + for (unsigned int i = 0; i < 2; i++) + { + gboolean retval; + + g_socket_set_blocking (socket, i); + + retval = g_socket_connect (socket, addr, cancellable, &local_error); + g_assert_error (local_error, G_IO_ERROR, G_IO_ERROR_CANCELLED); + g_assert_false (retval); + g_clear_error (&local_error); + } + + g_clear_object (&addr); + g_clear_object (&inet_addr); + g_clear_object (&cancellable); + g_clear_object (&socket); +} + int main (int argc, char *argv[]) @@ -2612,5 +2690,8 @@ main (int argc, g_test_add_func ("/socket/receive_bytes", test_receive_bytes); g_test_add_func ("/socket/receive_bytes_from", test_receive_bytes_from); + g_test_add_func ("/socket/accept/cancelled", test_accept_cancelled); + g_test_add_func ("/socket/connect/cancelled", test_connect_cancelled); + return g_test_run(); }