mirror of
https://gitlab.gnome.org/GNOME/glib.git
synced 2025-02-24 11:12:11 +01:00
GDBusConnection: delegate to the worker to close the stream
We can't safely close the output part of the I/O stream until any pending write or flush has been completed. In the worst case, this could lead to an assertion failure in the worker (when the close wins the race) or not closing the stream at all (when the write wins the race). Bug: https://bugzilla.gnome.org/show_bug.cgi?id=651268 Bug-NB: NB#271520 Signed-off-by: Simon McVittie <simon.mcvittie@collabora.co.uk> Signed-off-by: David Zeuthen <davidz@redhat.com>
This commit is contained in:
parent
7bbc7727d4
commit
c9282849b6
@ -514,12 +514,6 @@ g_dbus_connection_finalize (GObject *object)
|
||||
|
||||
if (connection->stream != NULL)
|
||||
{
|
||||
/* We don't really care if closing the stream succeeds or not */
|
||||
g_io_stream_close_async (connection->stream,
|
||||
G_PRIORITY_DEFAULT,
|
||||
NULL, /* GCancellable */
|
||||
NULL, /* GAsyncReadyCallback */
|
||||
NULL); /* userdata */
|
||||
g_object_unref (connection->stream);
|
||||
connection->stream = NULL;
|
||||
}
|
||||
@ -1223,20 +1217,6 @@ set_closed_unlocked (GDBusConnection *connection,
|
||||
|
||||
/* ---------------------------------------------------------------------------------------------------- */
|
||||
|
||||
static void
|
||||
close_in_thread_func (GSimpleAsyncResult *res,
|
||||
GObject *object,
|
||||
GCancellable *cancellable)
|
||||
{
|
||||
GError *error;
|
||||
|
||||
error = NULL;
|
||||
if (!g_dbus_connection_close_sync (G_DBUS_CONNECTION (object),
|
||||
cancellable,
|
||||
&error))
|
||||
g_simple_async_result_take_error (res, error);
|
||||
}
|
||||
|
||||
/**
|
||||
* g_dbus_connection_close:
|
||||
* @connection: A #GDBusConnection.
|
||||
@ -1286,10 +1266,7 @@ g_dbus_connection_close (GDBusConnection *connection,
|
||||
callback,
|
||||
user_data,
|
||||
g_dbus_connection_close);
|
||||
g_simple_async_result_run_in_thread (simple,
|
||||
close_in_thread_func,
|
||||
G_PRIORITY_DEFAULT,
|
||||
cancellable);
|
||||
_g_dbus_worker_close (connection->worker, cancellable, simple);
|
||||
g_object_unref (simple);
|
||||
}
|
||||
|
||||
@ -1330,6 +1307,22 @@ g_dbus_connection_close_finish (GDBusConnection *connection,
|
||||
return ret;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
GMainLoop *loop;
|
||||
GAsyncResult *result;
|
||||
} SyncCloseData;
|
||||
|
||||
static void
|
||||
sync_close_cb (GObject *source_object,
|
||||
GAsyncResult *res,
|
||||
gpointer user_data)
|
||||
{
|
||||
SyncCloseData *data = user_data;
|
||||
|
||||
data->result = g_object_ref (res);
|
||||
g_main_loop_quit (data->loop);
|
||||
}
|
||||
|
||||
/**
|
||||
* g_dbus_connection_close_sync:
|
||||
* @connection: A #GDBusConnection.
|
||||
@ -1360,11 +1353,24 @@ g_dbus_connection_close_sync (GDBusConnection *connection,
|
||||
CONNECTION_LOCK (connection);
|
||||
if (!connection->closed)
|
||||
{
|
||||
ret = g_io_stream_close (connection->stream,
|
||||
cancellable,
|
||||
error);
|
||||
if (ret)
|
||||
set_closed_unlocked (connection, FALSE, NULL);
|
||||
GMainContext *context;
|
||||
SyncCloseData data;
|
||||
|
||||
context = g_main_context_new ();
|
||||
g_main_context_push_thread_default (context);
|
||||
data.loop = g_main_loop_new (context, TRUE);
|
||||
data.result = NULL;
|
||||
|
||||
CONNECTION_UNLOCK (connection);
|
||||
g_dbus_connection_close (connection, cancellable, sync_close_cb, &data);
|
||||
g_main_loop_run (data.loop);
|
||||
ret = g_dbus_connection_close_finish (connection, data.result, error);
|
||||
CONNECTION_LOCK (connection);
|
||||
|
||||
g_object_unref (data.result);
|
||||
g_main_loop_unref (data.loop);
|
||||
g_main_context_pop_thread_default (context);
|
||||
g_main_context_unref (context);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -424,7 +424,7 @@ struct GDBusWorker
|
||||
GSocketControlMessage **read_ancillary_messages;
|
||||
gint read_num_ancillary_messages;
|
||||
|
||||
/* TRUE if an async write or flush is pending.
|
||||
/* TRUE if an async write, flush or close is pending.
|
||||
* Only the worker thread may change its value, and only with the write_lock.
|
||||
* Other threads may read its value when holding the write_lock.
|
||||
* The worker thread may read its value at any time.
|
||||
@ -435,8 +435,12 @@ struct GDBusWorker
|
||||
GQueue *write_queue;
|
||||
guint64 write_num_messages_written;
|
||||
GList *write_pending_flushes;
|
||||
/* list of CloseData */
|
||||
GList *pending_close_attempts;
|
||||
};
|
||||
|
||||
static void _g_dbus_worker_unref (GDBusWorker *worker);
|
||||
|
||||
/* ---------------------------------------------------------------------------------------------------- */
|
||||
|
||||
typedef struct
|
||||
@ -458,6 +462,24 @@ static void read_message_print_transport_debug (gssize bytes_read,
|
||||
static void write_message_print_transport_debug (gssize bytes_written,
|
||||
MessageToWriteData *data);
|
||||
|
||||
typedef struct {
|
||||
GDBusWorker *worker;
|
||||
GCancellable *cancellable;
|
||||
GSimpleAsyncResult *result;
|
||||
} CloseData;
|
||||
|
||||
static void close_data_free (CloseData *close_data)
|
||||
{
|
||||
if (close_data->cancellable != NULL)
|
||||
g_object_unref (close_data->cancellable);
|
||||
|
||||
if (close_data->result != NULL)
|
||||
g_object_unref (close_data->result);
|
||||
|
||||
_g_dbus_worker_unref (close_data->worker);
|
||||
g_slice_free (CloseData, close_data);
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------------------------------------------- */
|
||||
|
||||
static GDBusWorker *
|
||||
@ -1116,6 +1138,24 @@ typedef struct
|
||||
GList *flushers;
|
||||
} FlushAsyncData;
|
||||
|
||||
static void
|
||||
flush_data_list_complete (const GList *flushers,
|
||||
const GError *error)
|
||||
{
|
||||
const GList *l;
|
||||
|
||||
for (l = flushers; l != NULL; l = l->next)
|
||||
{
|
||||
FlushData *f = l->data;
|
||||
|
||||
f->error = error != NULL ? g_error_copy (error) : NULL;
|
||||
|
||||
g_mutex_lock (f->mutex);
|
||||
g_cond_signal (f->cond);
|
||||
g_mutex_unlock (f->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
|
||||
static void
|
||||
ostream_flush_cb (GObject *source_object,
|
||||
@ -1124,7 +1164,6 @@ ostream_flush_cb (GObject *source_object,
|
||||
{
|
||||
FlushAsyncData *data = user_data;
|
||||
GError *error;
|
||||
GList *l;
|
||||
|
||||
error = NULL;
|
||||
g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
|
||||
@ -1145,16 +1184,7 @@ ostream_flush_cb (GObject *source_object,
|
||||
}
|
||||
|
||||
g_assert (data->flushers != NULL);
|
||||
for (l = data->flushers; l != NULL; l = l->next)
|
||||
{
|
||||
FlushData *f = l->data;
|
||||
|
||||
f->error = error != NULL ? g_error_copy (error) : NULL;
|
||||
|
||||
g_mutex_lock (f->mutex);
|
||||
g_cond_signal (f->cond);
|
||||
g_mutex_unlock (f->mutex);
|
||||
}
|
||||
flush_data_list_complete (data->flushers, error);
|
||||
g_list_free (data->flushers);
|
||||
|
||||
if (error != NULL)
|
||||
@ -1277,6 +1307,76 @@ write_message_cb (GObject *source_object,
|
||||
message_to_write_data_free (data);
|
||||
}
|
||||
|
||||
/* called in private thread shared by all GDBusConnection instances
|
||||
*
|
||||
* write-lock is not held on entry
|
||||
* output_pending is true on entry
|
||||
*/
|
||||
static void
|
||||
iostream_close_cb (GObject *source_object,
|
||||
GAsyncResult *res,
|
||||
gpointer user_data)
|
||||
{
|
||||
GDBusWorker *worker = user_data;
|
||||
GError *error = NULL;
|
||||
GList *pending_close_attempts, *pending_flush_attempts;
|
||||
GQueue *send_queue;
|
||||
|
||||
g_io_stream_close_finish (worker->stream, res, &error);
|
||||
|
||||
g_mutex_lock (worker->write_lock);
|
||||
|
||||
pending_close_attempts = worker->pending_close_attempts;
|
||||
worker->pending_close_attempts = NULL;
|
||||
|
||||
pending_flush_attempts = worker->write_pending_flushes;
|
||||
worker->write_pending_flushes = NULL;
|
||||
|
||||
send_queue = worker->write_queue;
|
||||
worker->write_queue = g_queue_new ();
|
||||
|
||||
g_assert (worker->output_pending);
|
||||
worker->output_pending = FALSE;
|
||||
|
||||
g_mutex_unlock (worker->write_lock);
|
||||
|
||||
while (pending_close_attempts != NULL)
|
||||
{
|
||||
CloseData *close_data = pending_close_attempts->data;
|
||||
|
||||
pending_close_attempts = g_list_delete_link (pending_close_attempts,
|
||||
pending_close_attempts);
|
||||
|
||||
if (close_data->result != NULL)
|
||||
{
|
||||
if (error != NULL)
|
||||
g_simple_async_result_set_from_error (close_data->result, error);
|
||||
|
||||
/* this must be in an idle because the result is likely to be
|
||||
* intended for another thread
|
||||
*/
|
||||
g_simple_async_result_complete_in_idle (close_data->result);
|
||||
}
|
||||
|
||||
close_data_free (close_data);
|
||||
}
|
||||
|
||||
g_clear_error (&error);
|
||||
|
||||
/* all messages queued for sending are discarded */
|
||||
g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
|
||||
g_queue_free (send_queue);
|
||||
|
||||
/* all queued flushes fail */
|
||||
error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
|
||||
_("Operation was cancelled"));
|
||||
flush_data_list_complete (pending_flush_attempts, error);
|
||||
g_list_free (pending_flush_attempts);
|
||||
g_clear_error (&error);
|
||||
|
||||
_g_dbus_worker_unref (worker);
|
||||
}
|
||||
|
||||
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
|
||||
static void
|
||||
maybe_write_next_message (GDBusWorker *worker)
|
||||
@ -1288,9 +1388,25 @@ maybe_write_next_message (GDBusWorker *worker)
|
||||
g_assert (!worker->output_pending);
|
||||
|
||||
g_mutex_lock (worker->write_lock);
|
||||
|
||||
/* if we want to close the connection, that takes precedence */
|
||||
if (worker->pending_close_attempts != NULL)
|
||||
{
|
||||
worker->output_pending = TRUE;
|
||||
|
||||
g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
|
||||
NULL, iostream_close_cb,
|
||||
_g_dbus_worker_ref (worker));
|
||||
data = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
data = g_queue_pop_head (worker->write_queue);
|
||||
|
||||
if (data != NULL)
|
||||
worker->output_pending = TRUE;
|
||||
}
|
||||
|
||||
g_mutex_unlock (worker->write_lock);
|
||||
|
||||
/* Note that write_lock is only used for protecting the @write_queue
|
||||
@ -1371,6 +1487,46 @@ write_message_in_idle_cb (gpointer user_data)
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
/*
|
||||
* schedule_write_in_worker_thread:
|
||||
* @write_data: (transfer full) (allow-none):
|
||||
* @close_data: (transfer full) (allow-none):
|
||||
*
|
||||
* Can be called from any thread
|
||||
*
|
||||
* write_lock is not held on entry
|
||||
* output_pending may be true or false
|
||||
*/
|
||||
static void
|
||||
schedule_write_in_worker_thread (GDBusWorker *worker,
|
||||
MessageToWriteData *write_data,
|
||||
CloseData *close_data)
|
||||
{
|
||||
g_mutex_lock (worker->write_lock);
|
||||
|
||||
if (write_data != NULL)
|
||||
g_queue_push_tail (worker->write_queue, write_data);
|
||||
|
||||
if (close_data != NULL)
|
||||
worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
|
||||
close_data);
|
||||
|
||||
if (!worker->output_pending)
|
||||
{
|
||||
GSource *idle_source;
|
||||
idle_source = g_idle_source_new ();
|
||||
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
|
||||
g_source_set_callback (idle_source,
|
||||
write_message_in_idle_cb,
|
||||
_g_dbus_worker_ref (worker),
|
||||
(GDestroyNotify) _g_dbus_worker_unref);
|
||||
g_source_attach (idle_source, shared_thread_data->context);
|
||||
g_source_unref (idle_source);
|
||||
}
|
||||
|
||||
g_mutex_unlock (worker->write_lock);
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------------------------------------------- */
|
||||
|
||||
/* can be called from any thread - steals blob */
|
||||
@ -1392,21 +1548,7 @@ _g_dbus_worker_send_message (GDBusWorker *worker,
|
||||
data->blob = blob; /* steal! */
|
||||
data->blob_size = blob_len;
|
||||
|
||||
g_mutex_lock (worker->write_lock);
|
||||
g_queue_push_tail (worker->write_queue, data);
|
||||
if (!worker->output_pending)
|
||||
{
|
||||
GSource *idle_source;
|
||||
idle_source = g_idle_source_new ();
|
||||
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
|
||||
g_source_set_callback (idle_source,
|
||||
write_message_in_idle_cb,
|
||||
_g_dbus_worker_ref (worker),
|
||||
(GDestroyNotify) _g_dbus_worker_unref);
|
||||
g_source_attach (idle_source, shared_thread_data->context);
|
||||
g_source_unref (idle_source);
|
||||
}
|
||||
g_mutex_unlock (worker->write_lock);
|
||||
schedule_write_in_worker_thread (worker, data, NULL);
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------------------------------------------- */
|
||||
@ -1467,6 +1609,28 @@ _g_dbus_worker_new (GIOStream *stream,
|
||||
|
||||
/* ---------------------------------------------------------------------------------------------------- */
|
||||
|
||||
/* can be called from any thread
|
||||
*
|
||||
* write_lock is not held on entry
|
||||
* output_pending may be true or false
|
||||
*/
|
||||
void
|
||||
_g_dbus_worker_close (GDBusWorker *worker,
|
||||
GCancellable *cancellable,
|
||||
GSimpleAsyncResult *result)
|
||||
{
|
||||
CloseData *close_data;
|
||||
|
||||
close_data = g_slice_new0 (CloseData);
|
||||
close_data->worker = _g_dbus_worker_ref (worker);
|
||||
close_data->cancellable =
|
||||
(cancellable == NULL ? NULL : g_object_ref (cancellable));
|
||||
close_data->result = (result == NULL ? NULL : g_object_ref (result));
|
||||
|
||||
g_cancellable_cancel (worker->cancellable);
|
||||
schedule_write_in_worker_thread (worker, NULL, close_data);
|
||||
}
|
||||
|
||||
/* This can be called from any thread - frees worker. Note that
|
||||
* callbacks might still happen if called from another thread than the
|
||||
* worker - use your own synchronization primitive in the callbacks.
|
||||
@ -1475,7 +1639,12 @@ void
|
||||
_g_dbus_worker_stop (GDBusWorker *worker)
|
||||
{
|
||||
worker->stopped = TRUE;
|
||||
g_cancellable_cancel (worker->cancellable);
|
||||
|
||||
/* cancel any pending operations and schedule a close of the underlying I/O
|
||||
* stream in the worker thread
|
||||
*/
|
||||
_g_dbus_worker_close (worker, NULL, NULL);
|
||||
|
||||
_g_dbus_worker_unref (worker);
|
||||
}
|
||||
|
||||
|
@ -76,6 +76,11 @@ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker,
|
||||
GCancellable *cancellable,
|
||||
GError **error);
|
||||
|
||||
/* can be called from any thread */
|
||||
void _g_dbus_worker_close (GDBusWorker *worker,
|
||||
GCancellable *cancellable,
|
||||
GSimpleAsyncResult *result);
|
||||
|
||||
/* ---------------------------------------------------------------------------------------------------- */
|
||||
|
||||
void _g_dbus_initialize (void);
|
||||
|
Loading…
x
Reference in New Issue
Block a user