GDBusWorker: distinguish between 3 sorts of output that might be pending

If the user calls flush_sync() with no messages in the queue, but an
async write call pending, then we ought to flush after that async write
returns (although we don't currently do that). If it was an async close
or flush that was pending, there's no need to flush (again) afterwards.
So, we need to distinguish.

Bug: https://bugzilla.gnome.org/show_bug.cgi?id=662395
Signed-off-by: Simon McVittie <simon.mcvittie@collabora.co.uk>
Reviewed-by: Cosimo Alfarano <cosimo.alfarano@collabora.co.uk>
This commit is contained in:
Simon McVittie 2011-11-21 17:14:55 +00:00
parent a795e563df
commit 18482ab17e

View File

@ -331,6 +331,13 @@ _g_dbus_shared_thread_unref (SharedThreadData *data)
/* ---------------------------------------------------------------------------------------------------- */
typedef enum {
PENDING_NONE = 0,
PENDING_WRITE,
PENDING_FLUSH,
PENDING_CLOSE
} OutputPending;
struct GDBusWorker
{
volatile gint ref_count;
@ -368,12 +375,12 @@ struct GDBusWorker
GSocketControlMessage **read_ancillary_messages;
gint read_num_ancillary_messages;
/* TRUE if an async write, flush or close is pending.
/* Whether an async write, flush or close, or none of those, is pending.
* Only the worker thread may change its value, and only with the write_lock.
* Other threads may read its value when holding the write_lock.
* The worker thread may read its value at any time.
*/
gboolean output_pending;
OutputPending output_pending;
/* used for writing */
GMutex write_lock;
/* queue of MessageToWriteData, protected by write_lock */
@ -903,7 +910,7 @@ static void write_message_continue_writing (MessageToWriteData *data);
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is true on entry
* output_pending is PENDING_WRITE on entry
*/
static void
write_message_async_cb (GObject *source_object,
@ -953,7 +960,7 @@ write_message_async_cb (GObject *source_object,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is true on entry
* output_pending is PENDING_WRITE on entry
*/
static gboolean
on_socket_ready (GSocket *socket,
@ -968,7 +975,7 @@ on_socket_ready (GSocket *socket,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is true on entry
* output_pending is PENDING_WRITE on entry
*/
static void
write_message_continue_writing (MessageToWriteData *data)
@ -1105,7 +1112,7 @@ write_message_continue_writing (MessageToWriteData *data)
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is true on entry
* output_pending is PENDING_WRITE on entry
*/
static void
write_message_async (GDBusWorker *worker,
@ -1163,7 +1170,7 @@ flush_data_list_complete (const GList *flushers,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is true on entry
* output_pending is PENDING_FLUSH on entry
*/
static void
ostream_flush_cb (GObject *source_object,
@ -1201,8 +1208,8 @@ ostream_flush_cb (GObject *source_object,
/* Make sure we tell folks that we don't have additional
flushes pending */
g_mutex_lock (&data->worker->write_lock);
g_assert (data->worker->output_pending);
data->worker->output_pending = FALSE;
g_assert (data->worker->output_pending == PENDING_FLUSH);
data->worker->output_pending = PENDING_NONE;
g_mutex_unlock (&data->worker->write_lock);
/* OK, cool, finally kick off the next write */
@ -1215,7 +1222,7 @@ ostream_flush_cb (GObject *source_object,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is false on entry
* output_pending is PENDING_NONE on entry
*/
static void
message_written (GDBusWorker *worker,
@ -1263,8 +1270,8 @@ message_written (GDBusWorker *worker,
}
if (flushers != NULL)
{
g_assert (!worker->output_pending);
worker->output_pending = TRUE;
g_assert (worker->output_pending == PENDING_NONE);
worker->output_pending = PENDING_FLUSH;
}
g_mutex_unlock (&worker->write_lock);
@ -1291,7 +1298,7 @@ message_written (GDBusWorker *worker,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is true on entry
* output_pending is PENDING_WRITE on entry
*/
static void
write_message_cb (GObject *source_object,
@ -1302,8 +1309,8 @@ write_message_cb (GObject *source_object,
GError *error;
g_mutex_lock (&data->worker->write_lock);
g_assert (data->worker->output_pending);
data->worker->output_pending = FALSE;
g_assert (data->worker->output_pending == PENDING_WRITE);
data->worker->output_pending = PENDING_NONE;
g_mutex_unlock (&data->worker->write_lock);
error = NULL;
@ -1326,7 +1333,7 @@ write_message_cb (GObject *source_object,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending is true on entry
* output_pending is PENDING_CLOSE on entry
*/
static void
iostream_close_cb (GObject *source_object,
@ -1351,8 +1358,8 @@ iostream_close_cb (GObject *source_object,
send_queue = worker->write_queue;
worker->write_queue = g_queue_new ();
g_assert (worker->output_pending);
worker->output_pending = FALSE;
g_assert (worker->output_pending == PENDING_CLOSE);
worker->output_pending = PENDING_NONE;
g_mutex_unlock (&worker->write_lock);
@ -1396,7 +1403,7 @@ iostream_close_cb (GObject *source_object,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending must be false on entry
* output_pending must be PENDING_NONE on entry
*/
static void
maybe_write_next_message (GDBusWorker *worker)
@ -1405,7 +1412,7 @@ maybe_write_next_message (GDBusWorker *worker)
write_next:
/* we mustn't try to write two things at once */
g_assert (!worker->output_pending);
g_assert (worker->output_pending == PENDING_NONE);
g_mutex_lock (&worker->write_lock);
@ -1413,7 +1420,7 @@ maybe_write_next_message (GDBusWorker *worker)
if (worker->pending_close_attempts != NULL)
{
worker->close_expected = TRUE;
worker->output_pending = TRUE;
worker->output_pending = PENDING_CLOSE;
g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
NULL, iostream_close_cb,
@ -1425,7 +1432,7 @@ maybe_write_next_message (GDBusWorker *worker)
data = g_queue_pop_head (worker->write_queue);
if (data != NULL)
worker->output_pending = TRUE;
worker->output_pending = PENDING_WRITE;
}
g_mutex_unlock (&worker->write_lock);
@ -1455,7 +1462,7 @@ maybe_write_next_message (GDBusWorker *worker)
{
/* filters dropped message */
g_mutex_lock (&worker->write_lock);
worker->output_pending = FALSE;
worker->output_pending = PENDING_NONE;
g_mutex_unlock (&worker->write_lock);
message_to_write_data_free (data);
goto write_next;
@ -1496,7 +1503,7 @@ maybe_write_next_message (GDBusWorker *worker)
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
* output_pending may be true or false
* output_pending may be anything
*/
static gboolean
write_message_in_idle_cb (gpointer user_data)
@ -1506,7 +1513,7 @@ write_message_in_idle_cb (gpointer user_data)
/* Because this is the worker thread, we can read this struct member
* without holding the lock: no other thread ever modifies it.
*/
if (!worker->output_pending)
if (worker->output_pending == PENDING_NONE)
maybe_write_next_message (worker);
return FALSE;
@ -1519,7 +1526,7 @@ write_message_in_idle_cb (gpointer user_data)
* Can be called from any thread
*
* write_lock is not held on entry
* output_pending may be true or false
* output_pending may be anything
*/
static void
schedule_write_in_worker_thread (GDBusWorker *worker,
@ -1535,7 +1542,7 @@ schedule_write_in_worker_thread (GDBusWorker *worker,
worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
close_data);
if (!worker->output_pending)
if (worker->output_pending == PENDING_NONE)
{
GSource *idle_source;
idle_source = g_idle_source_new ();
@ -1556,7 +1563,7 @@ schedule_write_in_worker_thread (GDBusWorker *worker,
/* can be called from any thread - steals blob
*
* write_lock is not held on entry
* output_pending may be true or false
* output_pending may be anything
*/
void
_g_dbus_worker_send_message (GDBusWorker *worker,
@ -1609,7 +1616,7 @@ _g_dbus_worker_new (GIOStream *stream,
worker->stream = g_object_ref (stream);
worker->capabilities = capabilities;
worker->cancellable = g_cancellable_new ();
worker->output_pending = FALSE;
worker->output_pending = PENDING_NONE;
worker->frozen = initially_frozen;
worker->received_messages_while_frozen = g_queue_new ();
@ -1640,7 +1647,7 @@ _g_dbus_worker_new (GIOStream *stream,
/* can be called from any thread
*
* write_lock is not held on entry
* output_pending may be true or false
* output_pending may be anything
*/
void
_g_dbus_worker_close (GDBusWorker *worker,
@ -1667,7 +1674,7 @@ _g_dbus_worker_close (GDBusWorker *worker,
* worker - use your own synchronization primitive in the callbacks.
*
* write_lock is not held on entry
* output_pending may be true or false
* output_pending may be anything
*/
void
_g_dbus_worker_stop (GDBusWorker *worker)
@ -1693,7 +1700,7 @@ _g_dbus_worker_stop (GDBusWorker *worker)
* the transport has been flushed
*
* write_lock is not held on entry
* output_pending may be true or false
* output_pending may be anything
*/
gboolean
_g_dbus_worker_flush_sync (GDBusWorker *worker,