Merge branch 'wip/tingping/socket-client-data-races-fix' into 'master'

gsocketclient: Refactor g_socket_client_connect_async()

Closes #1995, #1872, #1902, #1989, and #1871

See merge request GNOME/glib!1339
This commit is contained in:
Philip Withnall 2020-02-14 19:10:59 +00:00
commit dff212effc

View File

@ -1337,13 +1337,15 @@ typedef struct
GSocketConnectable *connectable;
GSocketAddressEnumerator *enumerator;
GProxyAddress *proxy_addr;
GSocket *socket;
GIOStream *connection;
GCancellable *enumeration_cancellable;
GSList *connection_attempts;
GSList *successful_connections;
GError *last_error;
gboolean enumerated_at_least_once;
gboolean enumeration_completed;
gboolean connection_in_progress;
gboolean completed;
} GSocketClientAsyncConnectData;
@ -1355,10 +1357,9 @@ g_socket_client_async_connect_data_free (GSocketClientAsyncConnectData *data)
data->task = NULL;
g_clear_object (&data->connectable);
g_clear_object (&data->enumerator);
g_clear_object (&data->proxy_addr);
g_clear_object (&data->socket);
g_clear_object (&data->connection);
g_clear_object (&data->enumeration_cancellable);
g_slist_free_full (data->connection_attempts, connection_attempt_unref);
g_slist_free_full (data->successful_connections, connection_attempt_unref);
g_clear_error (&data->last_error);
@ -1370,6 +1371,7 @@ typedef struct
GSocketAddress *address;
GSocket *socket;
GIOStream *connection;
GProxyAddress *proxy_addr;
GSocketClientAsyncConnectData *data; /* unowned */
GSource *timeout_source;
GCancellable *cancellable;
@ -1401,6 +1403,7 @@ connection_attempt_unref (gpointer pointer)
g_clear_object (&attempt->socket);
g_clear_object (&attempt->connection);
g_clear_object (&attempt->cancellable);
g_clear_object (&attempt->proxy_addr);
if (attempt->timeout_source)
{
g_source_destroy (attempt->timeout_source);
@ -1418,37 +1421,59 @@ connection_attempt_remove (ConnectionAttempt *attempt)
}
static void
g_socket_client_async_connect_complete (GSocketClientAsyncConnectData *data)
cancel_all_attempts (GSocketClientAsyncConnectData *data)
{
g_assert (data->connection);
GSList *l;
if (!G_IS_SOCKET_CONNECTION (data->connection))
for (l = data->connection_attempts; l; l = g_slist_next (l))
{
ConnectionAttempt *attempt_entry = l->data;
g_cancellable_cancel (attempt_entry->cancellable);
connection_attempt_unref (attempt_entry);
}
g_slist_free (data->connection_attempts);
data->connection_attempts = NULL;
g_slist_free_full (data->successful_connections, connection_attempt_unref);
data->successful_connections = NULL;
g_cancellable_cancel (data->enumeration_cancellable);
}
static void
g_socket_client_async_connect_complete (ConnectionAttempt *attempt)
{
GSocketClientAsyncConnectData *data = attempt->data;
GError *error = NULL;
g_assert (attempt->connection);
g_assert (!data->completed);
if (!G_IS_SOCKET_CONNECTION (attempt->connection))
{
GSocketConnection *wrapper_connection;
wrapper_connection = g_tcp_wrapper_connection_new (data->connection, data->socket);
g_object_unref (data->connection);
data->connection = (GIOStream *)wrapper_connection;
wrapper_connection = g_tcp_wrapper_connection_new (attempt->connection, attempt->socket);
g_object_unref (attempt->connection);
attempt->connection = (GIOStream *)wrapper_connection;
}
if (!data->completed)
data->completed = TRUE;
cancel_all_attempts (data);
if (g_cancellable_set_error_if_cancelled (g_task_get_cancellable (data->task), &error))
{
GError *error = NULL;
if (g_cancellable_set_error_if_cancelled (g_task_get_cancellable (data->task), &error))
{
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_COMPLETE, data->connectable, NULL);
g_task_return_error (data->task, g_steal_pointer (&error));
}
else
{
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_COMPLETE, data->connectable, data->connection);
g_task_return_pointer (data->task, g_steal_pointer (&data->connection), g_object_unref);
}
data->completed = TRUE;
g_debug ("GSocketClient: Connection cancelled!");
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_COMPLETE, data->connectable, NULL);
g_task_return_error (data->task, g_steal_pointer (&error));
}
else
{
g_debug ("GSocketClient: Connection successful!");
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_COMPLETE, data->connectable, attempt->connection);
g_task_return_pointer (data->task, g_steal_pointer (&attempt->connection), g_object_unref);
}
connection_attempt_unref (attempt);
g_object_unref (data->task);
}
@ -1470,59 +1495,63 @@ static void
enumerator_next_async (GSocketClientAsyncConnectData *data,
gboolean add_task_ref)
{
/* We need to cleanup the state */
g_clear_object (&data->socket);
g_clear_object (&data->proxy_addr);
g_clear_object (&data->connection);
/* Each enumeration takes a ref. This arg just avoids repeated unrefs when
an enumeration starts another enumeration */
if (add_task_ref)
g_object_ref (data->task);
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_RESOLVING, data->connectable, NULL);
g_debug ("GSocketClient: Starting new address enumeration");
g_socket_address_enumerator_next_async (data->enumerator,
g_task_get_cancellable (data->task),
data->enumeration_cancellable,
g_socket_client_enumerator_callback,
data);
}
static void try_next_connection_or_finish (GSocketClientAsyncConnectData *, gboolean);
static void
g_socket_client_tls_handshake_callback (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GSocketClientAsyncConnectData *data = user_data;
ConnectionAttempt *attempt = user_data;
GSocketClientAsyncConnectData *data = attempt->data;
if (g_tls_connection_handshake_finish (G_TLS_CONNECTION (object),
result,
&data->last_error))
{
g_object_unref (data->connection);
data->connection = G_IO_STREAM (object);
g_object_unref (attempt->connection);
attempt->connection = G_IO_STREAM (object);
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_TLS_HANDSHAKED, data->connectable, data->connection);
g_socket_client_async_connect_complete (data);
g_debug ("GSocketClient: TLS handshake succeeded");
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_TLS_HANDSHAKED, data->connectable, attempt->connection);
g_socket_client_async_connect_complete (attempt);
}
else
{
g_object_unref (object);
enumerator_next_async (data, FALSE);
connection_attempt_unref (attempt);
g_debug ("GSocketClient: TLS handshake failed: %s", data->last_error->message);
try_next_connection_or_finish (data, TRUE);
}
}
static void
g_socket_client_tls_handshake (GSocketClientAsyncConnectData *data)
g_socket_client_tls_handshake (ConnectionAttempt *attempt)
{
GSocketClientAsyncConnectData *data = attempt->data;
GIOStream *tlsconn;
if (!data->client->priv->tls)
{
g_socket_client_async_connect_complete (data);
g_socket_client_async_connect_complete (attempt);
return;
}
tlsconn = g_tls_client_connection_new (data->connection,
g_debug ("GSocketClient: Starting TLS handshake");
tlsconn = g_tls_client_connection_new (attempt->connection,
data->connectable,
&data->last_error);
if (tlsconn)
@ -1534,11 +1563,12 @@ g_socket_client_tls_handshake (GSocketClientAsyncConnectData *data)
G_PRIORITY_DEFAULT,
g_task_get_cancellable (data->task),
g_socket_client_tls_handshake_callback,
data);
attempt);
}
else
{
enumerator_next_async (data, FALSE);
connection_attempt_unref (attempt);
try_next_connection_or_finish (data, TRUE);
}
}
@ -1547,23 +1577,38 @@ g_socket_client_proxy_connect_callback (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GSocketClientAsyncConnectData *data = user_data;
ConnectionAttempt *attempt = user_data;
GSocketClientAsyncConnectData *data = attempt->data;
g_object_unref (data->connection);
data->connection = g_proxy_connect_finish (G_PROXY (object),
result,
&data->last_error);
if (data->connection)
g_object_unref (attempt->connection);
attempt->connection = g_proxy_connect_finish (G_PROXY (object),
result,
&data->last_error);
if (attempt->connection)
{
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_PROXY_NEGOTIATED, data->connectable, data->connection);
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_PROXY_NEGOTIATED, data->connectable, attempt->connection);
}
else
{
enumerator_next_async (data, FALSE);
connection_attempt_unref (attempt);
try_next_connection_or_finish (data, TRUE);
return;
}
g_socket_client_tls_handshake (data);
g_socket_client_tls_handshake (attempt);
}
static void
complete_connection_with_error (GSocketClientAsyncConnectData *data,
GError *error)
{
g_debug ("GSocketClient: Connection failed: %s", error->message);
g_assert (!data->completed);
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_COMPLETE, data->connectable, NULL);
data->completed = TRUE;
cancel_all_attempts (data);
g_task_return_error (data->task, error);
}
static gboolean
@ -1577,15 +1622,114 @@ task_completed_or_cancelled (GSocketClientAsyncConnectData *data)
return TRUE;
else if (g_cancellable_set_error_if_cancelled (cancellable, &error))
{
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_COMPLETE, data->connectable, NULL);
g_task_return_error (task, g_steal_pointer (&error));
data->completed = TRUE;
complete_connection_with_error (data, g_steal_pointer (&error));
return TRUE;
}
else
return FALSE;
}
static gboolean
try_next_successful_connection (GSocketClientAsyncConnectData *data)
{
ConnectionAttempt *attempt;
const gchar *protocol;
GProxy *proxy;
if (data->connection_in_progress)
return FALSE;
g_assert (data->successful_connections != NULL);
attempt = data->successful_connections->data;
g_assert (attempt != NULL);
data->successful_connections = g_slist_remove (data->successful_connections, attempt);
data->connection_in_progress = TRUE;
g_debug ("GSocketClient: Starting application layer connection");
if (!attempt->proxy_addr)
{
g_socket_client_tls_handshake (g_steal_pointer (&attempt));
return TRUE;
}
protocol = g_proxy_address_get_protocol (attempt->proxy_addr);
/* The connection should not be anything other than TCP,
* but let's put a safety guard in case
*/
if (!G_IS_TCP_CONNECTION (attempt->connection))
{
g_critical ("Trying to proxy over non-TCP connection, this is "
"most likely a bug in GLib IO library.");
g_set_error_literal (&data->last_error,
G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
_("Proxying over a non-TCP connection is not supported."));
}
else if (g_hash_table_contains (data->client->priv->app_proxies, protocol))
{
/* Simply complete the connection, we don't want to do TLS handshake
* as the application proxy handling may need proxy handshake first */
g_socket_client_async_connect_complete (g_steal_pointer (&attempt));
return TRUE;
}
else if ((proxy = g_proxy_get_default_for_protocol (protocol)))
{
GIOStream *connection = attempt->connection;
GProxyAddress *proxy_addr = attempt->proxy_addr;
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_PROXY_NEGOTIATING, data->connectable, attempt->connection);
g_debug ("GSocketClient: Starting proxy connection");
g_proxy_connect_async (proxy,
connection,
proxy_addr,
g_task_get_cancellable (data->task),
g_socket_client_proxy_connect_callback,
g_steal_pointer (&attempt));
g_object_unref (proxy);
return TRUE;
}
else
{
g_clear_error (&data->last_error);
g_set_error (&data->last_error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
_("Proxy protocol “%s” is not supported."),
protocol);
}
data->connection_in_progress = FALSE;
g_clear_pointer (&attempt, connection_attempt_unref);
return FALSE; /* All non-return paths are failures */
}
static void
try_next_connection_or_finish (GSocketClientAsyncConnectData *data,
gboolean end_current_connection)
{
if (end_current_connection)
data->connection_in_progress = FALSE;
if (data->connection_in_progress)
return;
/* Keep trying successful connections until one works, each iteration pops one */
while (data->successful_connections)
{
if (try_next_successful_connection (data))
return;
}
if (!data->enumeration_completed)
{
enumerator_next_async (data, FALSE);
return;
}
complete_connection_with_error (data, data->last_error);
}
static void
g_socket_client_connected_callback (GObject *source,
GAsyncResult *result,
@ -1593,10 +1737,7 @@ g_socket_client_connected_callback (GObject *source,
{
ConnectionAttempt *attempt = user_data;
GSocketClientAsyncConnectData *data = attempt->data;
GSList *l;
GError *error = NULL;
GProxy *proxy;
const gchar *protocol;
if (task_completed_or_cancelled (data) || g_cancellable_is_cancelled (attempt->cancellable))
{
@ -1618,11 +1759,12 @@ g_socket_client_connected_callback (GObject *source,
{
clarify_connect_error (error, data->connectable, attempt->address);
set_last_error (data, error);
g_debug ("GSocketClient: Connection attempt failed: %s", error->message);
connection_attempt_remove (attempt);
enumerator_next_async (data, FALSE);
connection_attempt_unref (attempt);
try_next_connection_or_finish (data, FALSE);
}
else
else /* Silently ignore cancelled attempts */
{
g_clear_error (&error);
g_object_unref (data->task);
@ -1632,74 +1774,21 @@ g_socket_client_connected_callback (GObject *source,
return;
}
data->socket = g_steal_pointer (&attempt->socket);
data->connection = g_steal_pointer (&attempt->connection);
for (l = data->connection_attempts; l; l = g_slist_next (l))
{
ConnectionAttempt *attempt_entry = l->data;
g_cancellable_cancel (attempt_entry->cancellable);
connection_attempt_unref (attempt_entry);
}
g_slist_free (data->connection_attempts);
data->connection_attempts = NULL;
connection_attempt_unref (attempt);
g_socket_connection_set_cached_remote_address ((GSocketConnection*)data->connection, NULL);
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_CONNECTED, data->connectable, data->connection);
g_socket_connection_set_cached_remote_address ((GSocketConnection*)attempt->connection, NULL);
g_debug ("GSocketClient: TCP connection successful");
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_CONNECTED, data->connectable, attempt->connection);
/* wrong, but backward compatible */
g_socket_set_blocking (data->socket, TRUE);
g_socket_set_blocking (attempt->socket, TRUE);
if (!data->proxy_addr)
{
g_socket_client_tls_handshake (data);
return;
}
protocol = g_proxy_address_get_protocol (data->proxy_addr);
/* The connection should not be anything other than TCP,
* but let's put a safety guard in case
/* This ends the parallel "happy eyeballs" portion of connecting.
Now that we have a successful tcp connection we will attempt to connect
at the TLS/Proxy layer. If those layers fail we will move on to the next
connection.
*/
if (!G_IS_TCP_CONNECTION (data->connection))
{
g_critical ("Trying to proxy over non-TCP connection, this is "
"most likely a bug in GLib IO library.");
g_set_error_literal (&data->last_error,
G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
_("Proxying over a non-TCP connection is not supported."));
enumerator_next_async (data, FALSE);
}
else if (g_hash_table_contains (data->client->priv->app_proxies, protocol))
{
/* Simply complete the connection, we don't want to do TLS handshake
* as the application proxy handling may need proxy handshake first */
g_socket_client_async_connect_complete (data);
}
else if ((proxy = g_proxy_get_default_for_protocol (protocol)))
{
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_PROXY_NEGOTIATING, data->connectable, data->connection);
g_proxy_connect_async (proxy,
data->connection,
data->proxy_addr,
g_task_get_cancellable (data->task),
g_socket_client_proxy_connect_callback,
data);
g_object_unref (proxy);
}
else
{
g_clear_error (&data->last_error);
g_set_error (&data->last_error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
_("Proxy protocol “%s” is not supported."),
protocol);
enumerator_next_async (data, FALSE);
}
connection_attempt_remove (attempt);
data->successful_connections = g_slist_append (data->successful_connections, g_steal_pointer (&attempt));
try_next_connection_or_finish (data, FALSE);
}
static gboolean
@ -1707,7 +1796,11 @@ on_connection_attempt_timeout (gpointer data)
{
ConnectionAttempt *attempt = data;
enumerator_next_async (attempt->data, TRUE);
if (!attempt->data->enumeration_completed)
{
g_debug ("GSocketClient: Timeout reached, trying another enumeration");
enumerator_next_async (attempt->data, TRUE);
}
g_clear_pointer (&attempt->timeout_source, g_source_unref);
return G_SOURCE_REMOVE;
@ -1717,9 +1810,9 @@ static void
on_connection_cancelled (GCancellable *cancellable,
gpointer data)
{
GCancellable *attempt_cancellable = data;
GCancellable *linked_cancellable = G_CANCELLABLE (data);
g_cancellable_cancel (attempt_cancellable);
g_cancellable_cancel (linked_cancellable);
}
static void
@ -1743,39 +1836,49 @@ g_socket_client_enumerator_callback (GObject *object,
result, &error);
if (address == NULL)
{
if (data->connection_attempts)
if (G_UNLIKELY (data->enumeration_completed))
return;
data->enumeration_completed = TRUE;
g_debug ("GSocketClient: Address enumeration completed (out of addresses)");
/* As per API docs: We only care about error if its the first call,
after that the enumerator is done.
Note that we don't care about cancellation errors because
task_completed_or_cancelled() above should handle that.
If this fails and nothing is in progress then we will complete task here.
*/
if ((data->enumerated_at_least_once && !data->connection_attempts && !data->connection_in_progress) ||
!data->enumerated_at_least_once)
{
g_object_unref (data->task);
return;
g_debug ("GSocketClient: Address enumeration failed: %s", error ? error->message : NULL);
if (data->last_error)
{
g_clear_error (&error);
error = data->last_error;
data->last_error = NULL;
}
else if (!error)
{
g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_FAILED,
_("Unknown error on connect"));
}
complete_connection_with_error (data, error);
}
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_COMPLETE, data->connectable, NULL);
data->completed = TRUE;
if (!error)
{
if (data->last_error)
{
error = data->last_error;
data->last_error = NULL;
}
else
{
g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_FAILED,
_("Unknown error on connect"));
}
}
g_task_return_error (data->task, error);
/* Enumeration should never trigger again, drop our ref */
g_object_unref (data->task);
return;
}
data->enumerated_at_least_once = TRUE;
g_debug ("GSocketClient: Address enumeration succeeded");
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_RESOLVED,
data->connectable, NULL);
if (G_IS_PROXY_ADDRESS (address) &&
data->client->priv->enable_proxy)
data->proxy_addr = g_object_ref (G_PROXY_ADDRESS (address));
g_clear_error (&data->last_error);
socket = create_socket (data->client, address, &data->last_error);
@ -1793,6 +1896,10 @@ g_socket_client_enumerator_callback (GObject *object,
attempt->cancellable = g_cancellable_new ();
attempt->connection = (GIOStream *)g_socket_connection_factory_create_connection (socket);
attempt->timeout_source = g_timeout_source_new (HAPPY_EYEBALLS_CONNECTION_ATTEMPT_TIMEOUT_MS);
if (G_IS_PROXY_ADDRESS (address) && data->client->priv->enable_proxy)
attempt->proxy_addr = g_object_ref (G_PROXY_ADDRESS (address));
g_source_set_callback (attempt->timeout_source, on_connection_attempt_timeout, attempt, NULL);
g_source_attach (attempt->timeout_source, g_task_get_context (data->task));
data->connection_attempts = g_slist_append (data->connection_attempts, attempt);
@ -1802,6 +1909,7 @@ g_socket_client_enumerator_callback (GObject *object,
g_object_ref (attempt->cancellable), g_object_unref);
g_socket_connection_set_cached_remote_address ((GSocketConnection *)attempt->connection, address);
g_debug ("GSocketClient: Starting TCP connection attempt");
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_CONNECTING, data->connectable, attempt->connection);
g_socket_connection_connect_async (G_SOCKET_CONNECTION (attempt->connection),
address,
@ -1854,24 +1962,48 @@ g_socket_client_connect_async (GSocketClient *client,
else
data->enumerator = g_socket_connectable_enumerate (connectable);
/* The flow and ownership here isn't quite obvious:
- The task starts an async attempt to connect.
- Each attempt holds a single ref on task.
- Each attempt may create new attempts by timing out (not a failure) so
there are multiple attempts happening in parallel.
- Upon failure an attempt will start a new attempt that steals its ref
until there are no more attempts left and it drops its ref.
- Upon success it will cancel all other attempts and continue on
to the rest of the connection (tls, proxies, etc) which do not
happen in parallel and at the very end drop its ref.
- Upon cancellation an attempt drops its ref.
*/
/* This function tries to match the behavior of g_socket_client_connect ()
which is simple enough but much of it is done in parallel to be as responsive
as possible as per Happy Eyeballs (RFC 8305). This complicates flow quite a
bit but we can describe it in 3 sections:
Firstly we have address enumeration (DNS):
- This may be triggered multiple times by enumerator_next_async().
- It also has its own cancellable (data->enumeration_cancellable).
- Enumeration is done lazily because GNetworkAddressAddressEnumerator
also does work in parallel and may lazily add new addresses.
- If the first enumeration errors then the task errors. Otherwise all enumerations
will potentially be used (until task or enumeration is cancelled).
Then we start attempting connections (TCP):
- Each connection is independent and kept in a ConnectionAttempt object.
- They each hold a ref on the main task and have their own cancellable.
- Multiple attempts may happen in parallel as per Happy Eyeballs.
- Upon failure or timeouts more connection attempts are made.
- If no connections succeed the task errors.
- Upon success they are kept in a list of successful connections.
Lastly we connect at the application layer (TLS, Proxies):
- These are done in serial.
- The reasoning here is that Happy Eyeballs is about making bad connections responsive
at the IP/TCP layers. Issues at the application layer are generally not due to
connectivity issues but rather misconfiguration.
- Upon failure it will try the next TCP connection until it runs out and
the task errors.
- Upon success it cancels everything remaining (enumeration and connections)
and returns the connection.
*/
data->task = g_task_new (client, cancellable, callback, user_data);
g_task_set_check_cancellable (data->task, FALSE); /* We handle this manually */
g_task_set_source_tag (data->task, g_socket_client_connect_async);
g_task_set_task_data (data->task, data, (GDestroyNotify)g_socket_client_async_connect_data_free);
data->enumeration_cancellable = g_cancellable_new ();
if (cancellable)
g_cancellable_connect (cancellable, G_CALLBACK (on_connection_cancelled),
g_object_ref (data->enumeration_cancellable), g_object_unref);
enumerator_next_async (data, FALSE);
}
@ -1990,6 +2122,7 @@ g_socket_client_connect_to_uri_async (GSocketClient *client,
}
else
{
g_debug("g_socket_client_connect_to_uri_async");
g_socket_client_connect_async (client,
connectable, cancellable,
callback, user_data);