Merge branch '1614-freebsd-threading-flaky' into 'master'

tests: Unmark several gdbus-* tests as flaky

Closes #1614

See merge request GNOME/glib!777
This commit is contained in:
Emmanuele Bassi 2019-04-15 09:11:13 +00:00
commit cd04cf7778
4 changed files with 59 additions and 50 deletions

View File

@ -1493,16 +1493,21 @@ filter_function (GDBusConnection *connection,
} }
else else
{ {
if (g_dbus_message_get_sender (message) == NULL ||
g_dbus_message_get_destination (message) == NULL)
{
message = copy_if_locked (message);
if (message == NULL)
{
g_warning ("Failed to copy outgoing message");
return NULL;
}
}
if (g_dbus_message_get_sender (message) == NULL) if (g_dbus_message_get_sender (message) == NULL)
{ g_dbus_message_set_sender (message, DBUS_SERVICE_NAME);
message = copy_if_locked (message);
g_dbus_message_set_sender (message, DBUS_SERVICE_NAME);
}
if (g_dbus_message_get_destination (message) == NULL) if (g_dbus_message_get_destination (message) == NULL)
{ g_dbus_message_set_destination (message, client->id);
message = copy_if_locked (message);
g_dbus_message_set_destination (message, client->id);
}
} }
return message; return message;

View File

@ -492,9 +492,9 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker,
{ {
GDBusMessage *ret; GDBusMessage *ret;
if (!g_atomic_int_get (&worker->stopped)) if (!g_atomic_int_get (&worker->stopped))
ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data); ret = worker->message_about_to_be_sent_callback (worker, g_steal_pointer (&message), worker->user_data);
else else
ret = message; ret = g_steal_pointer (&message);
return ret; return ret;
} }
@ -506,13 +506,13 @@ _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker *worker,
if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0) if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
{ {
/* queue up */ /* queue up */
g_queue_push_tail (worker->received_messages_while_frozen, message); g_queue_push_tail (worker->received_messages_while_frozen, g_steal_pointer (&message));
} }
else else
{ {
/* not frozen, nor anything in queue */ /* not frozen, nor anything in queue */
_g_dbus_worker_emit_message_received (worker, message); _g_dbus_worker_emit_message_received (worker, message);
g_object_unref (message); g_clear_object (&message);
} }
} }
@ -529,7 +529,7 @@ unfreeze_in_idle_cb (gpointer user_data)
while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL) while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
{ {
_g_dbus_worker_emit_message_received (worker, message); _g_dbus_worker_emit_message_received (worker, message);
g_object_unref (message); g_clear_object (&message);
} }
worker->frozen = FALSE; worker->frozen = FALSE;
} }
@ -796,7 +796,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
} }
/* yay, got a message, go deliver it */ /* yay, got a message, go deliver it */
_g_dbus_worker_queue_or_deliver_received_message (worker, message); _g_dbus_worker_queue_or_deliver_received_message (worker, g_steal_pointer (&message));
/* start reading another message! */ /* start reading another message! */
worker->read_buffer_bytes_wanted = 0; worker->read_buffer_bytes_wanted = 0;

View File

@ -62,42 +62,45 @@ G_DEFINE_TYPE_WITH_PRIVATE (GThreadedSocketService,
g_threaded_socket_service, g_threaded_socket_service,
G_TYPE_SOCKET_SERVICE) G_TYPE_SOCKET_SERVICE)
enum typedef enum
{ {
PROP_0, PROP_MAX_THREADS = 1,
PROP_MAX_THREADS } GThreadedSocketServiceProperty;
};
G_LOCK_DEFINE_STATIC(job_count); G_LOCK_DEFINE_STATIC(job_count);
typedef struct typedef struct
{ {
GSocketConnection *connection; GThreadedSocketService *service; /* (owned) */
GObject *source_object; GSocketConnection *connection; /* (owned) */
GObject *source_object; /* (owned) (nullable) */
} GThreadedSocketServiceData; } GThreadedSocketServiceData;
static void static void
g_threaded_socket_service_func (gpointer _data, g_threaded_socket_service_data_free (GThreadedSocketServiceData *data)
gpointer user_data)
{ {
GThreadedSocketService *threaded = user_data; g_clear_object (&data->service);
GThreadedSocketServiceData *data = _data; g_clear_object (&data->connection);
g_clear_object (&data->source_object);
g_slice_free (GThreadedSocketServiceData, data);
}
static void
g_threaded_socket_service_func (gpointer job_data,
gpointer user_data)
{
GThreadedSocketServiceData *data = job_data;
gboolean result; gboolean result;
g_signal_emit (threaded, g_threaded_socket_service_run_signal, g_signal_emit (data->service, g_threaded_socket_service_run_signal,
0, data->connection, data->source_object, &result); 0, data->connection, data->source_object, &result);
g_object_unref (data->connection);
if (data->source_object)
g_object_unref (data->source_object);
g_slice_free (GThreadedSocketServiceData, data);
G_LOCK (job_count); G_LOCK (job_count);
if (threaded->priv->job_count-- == threaded->priv->max_threads) if (data->service->priv->job_count-- == data->service->priv->max_threads)
g_socket_service_start (G_SOCKET_SERVICE (threaded)); g_socket_service_start (G_SOCKET_SERVICE (data->service));
G_UNLOCK (job_count); G_UNLOCK (job_count);
g_object_unref (threaded); g_threaded_socket_service_data_free (data);
} }
static gboolean static gboolean
@ -107,28 +110,27 @@ g_threaded_socket_service_incoming (GSocketService *service,
{ {
GThreadedSocketService *threaded; GThreadedSocketService *threaded;
GThreadedSocketServiceData *data; GThreadedSocketServiceData *data;
GError *local_error = NULL;
threaded = G_THREADED_SOCKET_SERVICE (service); threaded = G_THREADED_SOCKET_SERVICE (service);
data = g_slice_new (GThreadedSocketServiceData); data = g_slice_new0 (GThreadedSocketServiceData);
data->service = g_object_ref (threaded);
/* Ref the socket service for the thread */
g_object_ref (service);
data->connection = g_object_ref (connection); data->connection = g_object_ref (connection);
if (source_object) data->source_object = (source_object != NULL) ? g_object_ref (source_object) : NULL;
data->source_object = g_object_ref (source_object);
else
data->source_object = NULL;
G_LOCK (job_count); G_LOCK (job_count);
if (++threaded->priv->job_count == threaded->priv->max_threads) if (++threaded->priv->job_count == threaded->priv->max_threads)
g_socket_service_stop (service); g_socket_service_stop (service);
G_UNLOCK (job_count); G_UNLOCK (job_count);
g_thread_pool_push (threaded->priv->thread_pool, data, NULL); if (!g_thread_pool_push (threaded->priv->thread_pool, data, &local_error))
{
g_warning ("Error handling incoming socket: %s", local_error->message);
g_threaded_socket_service_data_free (data);
}
g_clear_error (&local_error);
return FALSE; return FALSE;
} }
@ -147,7 +149,7 @@ g_threaded_socket_service_constructed (GObject *object)
service->priv->thread_pool = service->priv->thread_pool =
g_thread_pool_new (g_threaded_socket_service_func, g_thread_pool_new (g_threaded_socket_service_func,
service, NULL,
service->priv->max_threads, service->priv->max_threads,
FALSE, FALSE,
NULL); NULL);
@ -159,6 +161,8 @@ g_threaded_socket_service_finalize (GObject *object)
{ {
GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object); GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
/* All jobs in the pool hold a reference to this #GThreadedSocketService, so
* this should only be called once the pool is empty: */
g_thread_pool_free (service->priv->thread_pool, FALSE, FALSE); g_thread_pool_free (service->priv->thread_pool, FALSE, FALSE);
G_OBJECT_CLASS (g_threaded_socket_service_parent_class) G_OBJECT_CLASS (g_threaded_socket_service_parent_class)
@ -173,7 +177,7 @@ g_threaded_socket_service_get_property (GObject *object,
{ {
GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object); GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
switch (prop_id) switch ((GThreadedSocketServiceProperty) prop_id)
{ {
case PROP_MAX_THREADS: case PROP_MAX_THREADS:
g_value_set_int (value, service->priv->max_threads); g_value_set_int (value, service->priv->max_threads);
@ -192,7 +196,7 @@ g_threaded_socket_service_set_property (GObject *object,
{ {
GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object); GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
switch (prop_id) switch ((GThreadedSocketServiceProperty) prop_id)
{ {
case PROP_MAX_THREADS: case PROP_MAX_THREADS:
service->priv->max_threads = g_value_get_int (value); service->priv->max_threads = g_value_get_int (value);

View File

@ -248,10 +248,10 @@ if host_machine.system() != 'windows'
'extra_sources' : extra_sources, 'extra_sources' : extra_sources,
'suite' : ['slow'], 'suite' : ['slow'],
}, },
'gdbus-auth' : {'extra_sources' : extra_sources, 'suite': ['flaky']}, 'gdbus-auth' : {'extra_sources' : extra_sources},
'gdbus-bz627724' : {'extra_sources' : extra_sources, 'suite': ['flaky']}, 'gdbus-bz627724' : {'extra_sources' : extra_sources},
'gdbus-close-pending' : {'extra_sources' : extra_sources}, 'gdbus-close-pending' : {'extra_sources' : extra_sources},
'gdbus-connection' : {'extra_sources' : extra_sources, 'suite': ['flaky']}, 'gdbus-connection' : {'extra_sources' : extra_sources},
'gdbus-connection-loss' : {'extra_sources' : extra_sources}, 'gdbus-connection-loss' : {'extra_sources' : extra_sources},
'gdbus-connection-slow' : {'extra_sources' : extra_sources}, 'gdbus-connection-slow' : {'extra_sources' : extra_sources},
'gdbus-error' : {'extra_sources' : extra_sources}, 'gdbus-error' : {'extra_sources' : extra_sources},