gio: port networking classes from GSimpleAsyncResult to GTask

https://bugzilla.gnome.org/show_bug.cgi?id=661767
This commit is contained in:
Dan Winship
2012-08-02 15:48:22 -04:00
parent 130d0fdac0
commit d21309464c
18 changed files with 568 additions and 867 deletions

View File

@@ -39,7 +39,7 @@
#include "goutputstream.h"
#include "gproxy.h"
#include "gproxyaddress.h"
#include "gsimpleasyncresult.h"
#include "gtask.h"
#define SOCKS5_VERSION 0x05
@@ -538,7 +538,6 @@ error:
typedef struct
{
GSimpleAsyncResult *simple;
GIOStream *io_stream;
gchar *hostname;
guint16 port;
@@ -547,7 +546,6 @@ typedef struct
guint8 *buffer;
gssize length;
gssize offset;
GCancellable *cancellable;
} ConnectAsyncData;
static void nego_msg_write_cb (GObject *source,
@@ -562,7 +560,7 @@ static void auth_msg_write_cb (GObject *source,
static void auth_reply_read_cb (GObject *source,
GAsyncResult *result,
gpointer user_data);
static void send_connect_msg (ConnectAsyncData *data);
static void send_connect_msg (GTask *task);
static void connect_msg_write_cb (GObject *source,
GAsyncResult *result,
gpointer user_data);
@@ -579,52 +577,40 @@ static void connect_addr_read_cb (GObject *source,
static void
free_connect_data (ConnectAsyncData *data)
{
if (data->io_stream)
g_object_unref (data->io_stream);
g_object_unref (data->io_stream);
g_free (data->hostname);
g_free (data->username);
g_free (data->password);
g_free (data->buffer);
if (data->cancellable)
g_object_unref (data->cancellable);
g_slice_free (ConnectAsyncData, data);
}
static void
complete_async_from_error (ConnectAsyncData *data, GError *error)
{
GSimpleAsyncResult *simple = data->simple;
g_simple_async_result_take_error (data->simple, error);
g_simple_async_result_set_op_res_gpointer (simple, NULL, NULL);
g_simple_async_result_complete (simple);
g_object_unref (simple);
}
static void
do_read (GAsyncReadyCallback callback, ConnectAsyncData *data)
do_read (GAsyncReadyCallback callback, GTask *task, ConnectAsyncData *data)
{
GInputStream *in;
in = g_io_stream_get_input_stream (data->io_stream);
g_input_stream_read_async (in,
data->buffer + data->offset,
data->length - data->offset,
G_PRIORITY_DEFAULT, data->cancellable,
callback, data);
g_task_get_priority (task),
g_task_get_cancellable (task),
callback, task);
}
static void
do_write (GAsyncReadyCallback callback, ConnectAsyncData *data)
do_write (GAsyncReadyCallback callback, GTask *task, ConnectAsyncData *data)
{
GOutputStream *out;
out = g_io_stream_get_output_stream (data->io_stream);
g_output_stream_write_async (out,
data->buffer + data->offset,
data->length - data->offset,
G_PRIORITY_DEFAULT, data->cancellable,
callback, data);
g_task_get_priority (task),
g_task_get_cancellable (task),
callback, task);
}
static void
@@ -635,20 +621,14 @@ g_socks5_proxy_connect_async (GProxy *proxy,
GAsyncReadyCallback callback,
gpointer user_data)
{
GSimpleAsyncResult *simple;
GTask *task;
ConnectAsyncData *data;
simple = g_simple_async_result_new (G_OBJECT (proxy),
callback, user_data,
g_socks5_proxy_connect_async);
data = g_slice_new0 (ConnectAsyncData);
data->simple = simple;
data->io_stream = g_object_ref (io_stream);
if (cancellable)
data->cancellable = g_object_ref (cancellable);
task = g_task_new (proxy, cancellable, callback, user_data);
g_task_set_task_data (task, data, (GDestroyNotify) free_connect_data);
g_object_get (G_OBJECT (proxy_address),
"destination-hostname", &data->hostname,
@@ -657,15 +637,12 @@ g_socks5_proxy_connect_async (GProxy *proxy,
"password", &data->password,
NULL);
g_simple_async_result_set_op_res_gpointer (simple, data,
(GDestroyNotify) free_connect_data);
data->buffer = g_malloc0 (SOCKS5_NEGO_MSG_LEN);
data->length = set_nego_msg (data->buffer,
data->username || data->password);
data->offset = 0;
do_write (nego_msg_write_cb, data);
do_write (nego_msg_write_cb, task, data);
}
@@ -674,8 +651,9 @@ nego_msg_write_cb (GObject *source,
GAsyncResult *res,
gpointer user_data)
{
GTask *task = user_data;
ConnectAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
ConnectAsyncData *data = user_data;
gssize written;
written = g_output_stream_write_finish (G_OUTPUT_STREAM (source),
@@ -683,7 +661,8 @@ nego_msg_write_cb (GObject *source,
if (written < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
@@ -697,11 +676,11 @@ nego_msg_write_cb (GObject *source,
data->length = SOCKS5_NEGO_REP_LEN;
data->offset = 0;
do_read (nego_reply_read_cb, data);
do_read (nego_reply_read_cb, task, data);
}
else
{
do_write (nego_msg_write_cb, data);
do_write (nego_msg_write_cb, task, data);
}
}
@@ -710,8 +689,9 @@ nego_reply_read_cb (GObject *source,
GAsyncResult *res,
gpointer user_data)
{
GTask *task = user_data;
ConnectAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
ConnectAsyncData *data = user_data;
gssize read;
read = g_input_stream_read_finish (G_INPUT_STREAM (source),
@@ -719,7 +699,8 @@ nego_reply_read_cb (GObject *source,
if (read < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
@@ -733,7 +714,8 @@ nego_reply_read_cb (GObject *source,
if (!parse_nego_reply (data->buffer, has_auth, &must_auth, &error))
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
@@ -750,20 +732,21 @@ nego_reply_read_cb (GObject *source,
if (data->length < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
do_write (auth_msg_write_cb, data);
do_write (auth_msg_write_cb, task, data);
}
else
{
send_connect_msg (data);
send_connect_msg (task);
}
}
else
{
do_read (nego_reply_read_cb, data);
do_read (nego_reply_read_cb, task, data);
}
}
@@ -772,8 +755,9 @@ auth_msg_write_cb (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = user_data;
ConnectAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
ConnectAsyncData *data = user_data;
gssize written;
written = g_output_stream_write_finish (G_OUTPUT_STREAM (source),
@@ -781,7 +765,8 @@ auth_msg_write_cb (GObject *source,
if (written < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
@@ -795,11 +780,11 @@ auth_msg_write_cb (GObject *source,
data->length = SOCKS5_NEGO_REP_LEN;
data->offset = 0;
do_read (auth_reply_read_cb, data);
do_read (auth_reply_read_cb, task, data);
}
else
{
do_write (auth_msg_write_cb, data);
do_write (auth_msg_write_cb, task, data);
}
}
@@ -808,8 +793,9 @@ auth_reply_read_cb (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = user_data;
ConnectAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
ConnectAsyncData *data = user_data;
gssize read;
read = g_input_stream_read_finish (G_INPUT_STREAM (source),
@@ -817,7 +803,8 @@ auth_reply_read_cb (GObject *source,
if (read < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
@@ -827,21 +814,23 @@ auth_reply_read_cb (GObject *source,
{
if (!check_auth_status (data->buffer, &error))
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
send_connect_msg (data);
send_connect_msg (task);
}
else
{
do_read (auth_reply_read_cb, data);
do_read (auth_reply_read_cb, task, data);
}
}
static void
send_connect_msg (ConnectAsyncData *data)
send_connect_msg (GTask *task)
{
ConnectAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
g_free (data->buffer);
@@ -855,11 +844,12 @@ send_connect_msg (ConnectAsyncData *data)
if (data->length < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
do_write (connect_msg_write_cb, data);
do_write (connect_msg_write_cb, task, data);
}
static void
@@ -867,8 +857,9 @@ connect_msg_write_cb (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = user_data;
ConnectAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
ConnectAsyncData *data = user_data;
gssize written;
written = g_output_stream_write_finish (G_OUTPUT_STREAM (source),
@@ -876,7 +867,8 @@ connect_msg_write_cb (GObject *source,
if (written < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
@@ -890,11 +882,11 @@ connect_msg_write_cb (GObject *source,
data->length = 4;
data->offset = 0;
do_read (connect_reply_read_cb, data);
do_read (connect_reply_read_cb, task, data);
}
else
{
do_write (connect_msg_write_cb, data);
do_write (connect_msg_write_cb, task, data);
}
}
@@ -903,8 +895,9 @@ connect_reply_read_cb (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = user_data;
ConnectAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
ConnectAsyncData *data = user_data;
gssize read;
read = g_input_stream_read_finish (G_INPUT_STREAM (source),
@@ -912,7 +905,8 @@ connect_reply_read_cb (GObject *source,
if (read < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
@@ -924,7 +918,8 @@ connect_reply_read_cb (GObject *source,
if (!parse_connect_reply (data->buffer, &atype, &error))
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
@@ -933,25 +928,25 @@ connect_reply_read_cb (GObject *source,
case SOCKS5_ATYP_IPV4:
data->length = 6;
data->offset = 0;
do_read (connect_addr_read_cb, data);
do_read (connect_addr_read_cb, task, data);
break;
case SOCKS5_ATYP_IPV6:
data->length = 18;
data->offset = 0;
do_read (connect_addr_read_cb, data);
do_read (connect_addr_read_cb, task, data);
break;
case SOCKS5_ATYP_DOMAINNAME:
data->length = 1;
data->offset = 0;
do_read (connect_addr_len_read_cb, data);
do_read (connect_addr_len_read_cb, task, data);
break;
}
}
else
{
do_read (connect_reply_read_cb, data);
do_read (connect_reply_read_cb, task, data);
}
}
@@ -960,8 +955,9 @@ connect_addr_len_read_cb (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = user_data;
ConnectAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
ConnectAsyncData *data = user_data;
gssize read;
read = g_input_stream_read_finish (G_INPUT_STREAM (source),
@@ -969,14 +965,15 @@ connect_addr_len_read_cb (GObject *source,
if (read < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
data->length = data->buffer[0] + 2;
data->offset = 0;
do_read (connect_addr_read_cb, data);
do_read (connect_addr_read_cb, task, data);
}
static void
@@ -984,8 +981,9 @@ connect_addr_read_cb (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = user_data;
ConnectAsyncData *data = g_task_get_task_data (task);
GError *error = NULL;
ConnectAsyncData *data = user_data;
gssize read;
read = g_input_stream_read_finish (G_INPUT_STREAM (source),
@@ -993,7 +991,8 @@ connect_addr_read_cb (GObject *source,
if (read < 0)
{
complete_async_from_error (data, error);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
@@ -1001,13 +1000,13 @@ connect_addr_read_cb (GObject *source,
if (data->offset == data->length)
{
GSimpleAsyncResult *simple = data->simple;
g_simple_async_result_complete (simple);
g_object_unref (simple);
g_task_return_pointer (task, g_object_ref (data->io_stream), g_object_unref);
g_object_unref (task);
return;
}
else
{
do_read (connect_reply_read_cb, data);
do_read (connect_reply_read_cb, task, data);
}
}
@@ -1016,13 +1015,7 @@ g_socks5_proxy_connect_finish (GProxy *proxy,
GAsyncResult *result,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
ConnectAsyncData *data = g_simple_async_result_get_op_res_gpointer (simple);
if (g_simple_async_result_propagate_error (simple, error))
return NULL;
return g_object_ref (data->io_stream);
return g_task_propagate_pointer (G_TASK (result), error);
}
static gboolean