gsocketclient: Fix criticals

This ensures the parent GTask is kept alive as long as an enumeration
is running and trying to connect.

Closes #1646
Closes #1649
This commit is contained in:
Patrick Griffis 2019-01-29 10:07:06 -05:00 committed by Patrick Griffis
parent 140b82083f
commit d553d92d6e
2 changed files with 107 additions and 24 deletions

View File

@ -1327,7 +1327,7 @@ g_socket_client_connect_to_uri (GSocketClient *client,
typedef struct typedef struct
{ {
GTask *task; GTask *task; /* unowned */
GSocketClient *client; GSocketClient *client;
GSocketConnectable *connectable; GSocketConnectable *connectable;
@ -1345,6 +1345,7 @@ static void connection_attempt_unref (gpointer attempt);
static void static void
g_socket_client_async_connect_data_free (GSocketClientAsyncConnectData *data) g_socket_client_async_connect_data_free (GSocketClientAsyncConnectData *data)
{ {
data->task = NULL;
g_clear_object (&data->connectable); g_clear_object (&data->connectable);
g_clear_object (&data->enumerator); g_clear_object (&data->enumerator);
g_clear_object (&data->proxy_addr); g_clear_object (&data->proxy_addr);
@ -1444,13 +1445,19 @@ set_last_error (GSocketClientAsyncConnectData *data,
} }
static void static void
enumerator_next_async (GSocketClientAsyncConnectData *data) enumerator_next_async (GSocketClientAsyncConnectData *data,
gboolean add_task_ref)
{ {
/* We need to cleanup the state */ /* We need to cleanup the state */
g_clear_object (&data->socket); g_clear_object (&data->socket);
g_clear_object (&data->proxy_addr); g_clear_object (&data->proxy_addr);
g_clear_object (&data->connection); g_clear_object (&data->connection);
/* Each enumeration takes a ref. This arg just avoids repeated unrefs when
an enumeration starts another enumeration */
if (add_task_ref)
g_object_ref (data->task);
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_RESOLVING, data->connectable, NULL); g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_RESOLVING, data->connectable, NULL);
g_socket_address_enumerator_next_async (data->enumerator, g_socket_address_enumerator_next_async (data->enumerator,
g_task_get_cancellable (data->task), g_task_get_cancellable (data->task),
@ -1478,7 +1485,7 @@ g_socket_client_tls_handshake_callback (GObject *object,
else else
{ {
g_object_unref (object); g_object_unref (object);
enumerator_next_async (data); enumerator_next_async (data, FALSE);
} }
} }
@ -1509,7 +1516,7 @@ g_socket_client_tls_handshake (GSocketClientAsyncConnectData *data)
} }
else else
{ {
enumerator_next_async (data); enumerator_next_async (data, FALSE);
} }
} }
@ -1530,13 +1537,24 @@ g_socket_client_proxy_connect_callback (GObject *object,
} }
else else
{ {
enumerator_next_async (data); enumerator_next_async (data, FALSE);
return; return;
} }
g_socket_client_tls_handshake (data); g_socket_client_tls_handshake (data);
} }
static gboolean
task_completed_or_cancelled (GTask *task)
{
if (g_task_get_completed (task))
return TRUE;
else if (g_task_return_error_if_cancelled (task))
return TRUE;
else
return FALSE;
}
static void static void
g_socket_client_connected_callback (GObject *source, g_socket_client_connected_callback (GObject *source,
GAsyncResult *result, GAsyncResult *result,
@ -1549,8 +1567,7 @@ g_socket_client_connected_callback (GObject *source,
GProxy *proxy; GProxy *proxy;
const gchar *protocol; const gchar *protocol;
/* data is NULL once the task is completed */ if (g_cancellable_is_cancelled (attempt->cancellable) || task_completed_or_cancelled (data->task))
if (data && g_task_return_error_if_cancelled (data->task))
{ {
g_object_unref (data->task); g_object_unref (data->task);
connection_attempt_unref (attempt); connection_attempt_unref (attempt);
@ -1570,17 +1587,15 @@ g_socket_client_connected_callback (GObject *source,
{ {
clarify_connect_error (error, data->connectable, attempt->address); clarify_connect_error (error, data->connectable, attempt->address);
set_last_error (data, error); set_last_error (data, error);
}
else
g_clear_error (&error);
if (data)
{
connection_attempt_remove (attempt); connection_attempt_remove (attempt);
enumerator_next_async (data); enumerator_next_async (data, FALSE);
} }
else else
connection_attempt_unref (attempt); {
g_clear_error (&error);
g_object_unref (data->task);
connection_attempt_unref (attempt);
}
return; return;
} }
@ -1592,7 +1607,6 @@ g_socket_client_connected_callback (GObject *source,
{ {
ConnectionAttempt *attempt_entry = l->data; ConnectionAttempt *attempt_entry = l->data;
g_cancellable_cancel (attempt_entry->cancellable); g_cancellable_cancel (attempt_entry->cancellable);
attempt_entry->data = NULL;
connection_attempt_unref (attempt_entry); connection_attempt_unref (attempt_entry);
} }
g_slist_free (data->connection_attempts); g_slist_free (data->connection_attempts);
@ -1625,7 +1639,7 @@ g_socket_client_connected_callback (GObject *source,
G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
_("Proxying over a non-TCP connection is not supported.")); _("Proxying over a non-TCP connection is not supported."));
enumerator_next_async (data); enumerator_next_async (data, FALSE);
} }
else if (g_hash_table_contains (data->client->priv->app_proxies, protocol)) else if (g_hash_table_contains (data->client->priv->app_proxies, protocol))
{ {
@ -1652,7 +1666,7 @@ g_socket_client_connected_callback (GObject *source,
_("Proxy protocol “%s” is not supported."), _("Proxy protocol “%s” is not supported."),
protocol); protocol);
enumerator_next_async (data); enumerator_next_async (data, FALSE);
} }
} }
@ -1661,7 +1675,7 @@ on_connection_attempt_timeout (gpointer data)
{ {
ConnectionAttempt *attempt = data; ConnectionAttempt *attempt = data;
enumerator_next_async (attempt->data); enumerator_next_async (attempt->data, TRUE);
g_clear_pointer (&attempt->timeout_source, g_source_unref); g_clear_pointer (&attempt->timeout_source, g_source_unref);
return G_SOURCE_REMOVE; return G_SOURCE_REMOVE;
@ -1687,7 +1701,7 @@ g_socket_client_enumerator_callback (GObject *object,
ConnectionAttempt *attempt; ConnectionAttempt *attempt;
GError *error = NULL; GError *error = NULL;
if (g_task_return_error_if_cancelled (data->task)) if (task_completed_or_cancelled (data->task))
{ {
g_object_unref (data->task); g_object_unref (data->task);
return; return;
@ -1698,7 +1712,10 @@ g_socket_client_enumerator_callback (GObject *object,
if (address == NULL) if (address == NULL)
{ {
if (data->connection_attempts) if (data->connection_attempts)
return; {
g_object_unref (data->task);
return;
}
g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_COMPLETE, data->connectable, NULL); g_socket_client_emit_event (data->client, G_SOCKET_CLIENT_COMPLETE, data->connectable, NULL);
if (!error) if (!error)
@ -1732,7 +1749,7 @@ g_socket_client_enumerator_callback (GObject *object,
if (socket == NULL) if (socket == NULL)
{ {
g_object_unref (address); g_object_unref (address);
enumerator_next_async (data); enumerator_next_async (data, FALSE);
return; return;
} }
@ -1804,11 +1821,24 @@ g_socket_client_connect_async (GSocketClient *client,
else else
data->enumerator = g_socket_connectable_enumerate (connectable); data->enumerator = g_socket_connectable_enumerate (connectable);
/* The flow and ownership here isn't quite obvious:
- The task starts an async attempt to connect.
- Each attempt holds a single ref on task.
- Each attempt may create new attempts by timing out (not a failure) so
there are multiple attempts happening in parallel.
- Upon failure an attempt will start a new attempt that steals its ref
until there are no more attempts left and it drops its ref.
- Upon success it will cancel all other attempts and continue on
to the rest of the connection (tls, proxies, etc) which do not
happen in parallel and at the very end drop its ref.
- Upon cancellation an attempt drops its ref.
*/
data->task = g_task_new (client, cancellable, callback, user_data); data->task = g_task_new (client, cancellable, callback, user_data);
g_task_set_source_tag (data->task, g_socket_client_connect_async); g_task_set_source_tag (data->task, g_socket_client_connect_async);
g_task_set_task_data (data->task, data, (GDestroyNotify)g_socket_client_async_connect_data_free); g_task_set_task_data (data->task, data, (GDestroyNotify)g_socket_client_async_connect_data_free);
enumerator_next_async (data); enumerator_next_async (data, FALSE);
} }
/** /**

View File

@ -63,12 +63,65 @@ test_happy_eyeballs (void)
g_object_unref (client); g_object_unref (client);
} }
static void
on_connected_cancelled (GObject *source_object,
GAsyncResult *result,
gpointer user_data)
{
GSocketConnection *conn;
GError *error = NULL;
conn = g_socket_client_connect_to_uri_finish (G_SOCKET_CLIENT (source_object), result, &error);
g_assert_error (error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
g_assert_null (conn);
g_error_free (error);
g_main_loop_quit (user_data);
}
static int
on_timer (GCancellable *cancel)
{
g_cancellable_cancel (cancel);
return G_SOURCE_REMOVE;
}
static void
test_happy_eyeballs_cancel (void)
{
GSocketClient *client;
GSocketService *service;
GError *error = NULL;
guint16 port;
GMainLoop *loop;
GCancellable *cancel;
loop = g_main_loop_new (NULL, FALSE);
service = g_socket_service_new ();
port = g_socket_listener_add_any_inet_port (G_SOCKET_LISTENER (service), NULL, &error);
g_assert_no_error (error);
g_socket_service_start (service);
client = g_socket_client_new ();
cancel = g_cancellable_new ();
g_socket_client_connect_to_host_async (client, "localhost", port, cancel, on_connected_cancelled, loop);
g_timeout_add (1, (GSourceFunc) on_timer, cancel);
g_main_loop_run (loop);
g_main_loop_unref (loop);
g_object_unref (service);
g_object_unref (client);
g_object_unref (cancel);
}
int int
main (int argc, char *argv[]) main (int argc, char *argv[])
{ {
g_test_init (&argc, &argv, NULL); g_test_init (&argc, &argv, NULL);
g_test_add_func ("/socket-client/happy-eyeballs", test_happy_eyeballs); g_test_add_func ("/socket-client/happy-eyeballs/slow", test_happy_eyeballs);
g_test_add_func ("/socket-client/happy-eyeballs/cancellation", test_happy_eyeballs_cancel);
return g_test_run (); return g_test_run ();
} }