GThreadPool - Don't inherit thread priorities when creating new threads

By default (on POSIX) we would be inheriting thread priorities from the
thread that pushed a new task on non-exclusive thread pools and causes a
new thread to be created. This can cause any non-exclusive thread pool
to accidentally contain threads of different priorities, or e.g. threads
with real-time priority.

To prevent this, custom handling for setting the scheduler settings for
Linux and Windows is added and as a fallback for other platforms a new
thread is added that is responsible for spawning threads for
non-exclusive thread pools.

Fixes https://gitlab.gnome.org/GNOME/glib/issues/1834
This commit is contained in:
Sebastian Dröge 2019-12-24 15:33:30 +02:00
parent be537d8b51
commit 8aeca4fa64
7 changed files with 313 additions and 46 deletions

View File

@ -372,7 +372,7 @@ g_thread_create_full (GThreadFunc func,
GThread *thread; GThread *thread;
thread = g_thread_new_internal (NULL, g_deprecated_thread_proxy, thread = g_thread_new_internal (NULL, g_deprecated_thread_proxy,
func, data, stack_size, error); func, data, stack_size, NULL, error);
if (thread && !joinable) if (thread && !joinable)
{ {

View File

@ -41,11 +41,12 @@
#include "gthread.h" #include "gthread.h"
#include "gthreadprivate.h"
#include "gslice.h"
#include "gmessages.h"
#include "gstrfuncs.h"
#include "gmain.h" #include "gmain.h"
#include "gmessages.h"
#include "gslice.h"
#include "gstrfuncs.h"
#include "gtestutils.h"
#include "gthreadprivate.h"
#include "gutils.h" #include "gutils.h"
#include <stdlib.h> #include <stdlib.h>
@ -67,6 +68,10 @@
#include <windows.h> #include <windows.h>
#endif #endif
#if defined(__linux__)
#include <sys/syscall.h>
#endif
/* clang defines __ATOMIC_SEQ_CST but doesn't support the GCC extension */ /* clang defines __ATOMIC_SEQ_CST but doesn't support the GCC extension */
#if defined(HAVE_FUTEX) && defined(__ATOMIC_SEQ_CST) && !defined(__clang__) #if defined(HAVE_FUTEX) && defined(__ATOMIC_SEQ_CST) && !defined(__clang__)
#define USE_NATIVE_MUTEX #define USE_NATIVE_MUTEX
@ -1137,6 +1142,11 @@ typedef struct
pthread_t system_thread; pthread_t system_thread;
gboolean joined; gboolean joined;
GMutex lock; GMutex lock;
void *(*proxy) (void *);
/* Must be statically allocated and valid forever */
const GThreadSchedulerSettings *scheduler_settings;
} GThreadPosix; } GThreadPosix;
void void
@ -1152,13 +1162,87 @@ g_system_thread_free (GRealThread *thread)
g_slice_free (GThreadPosix, pt); g_slice_free (GThreadPosix, pt);
} }
void
g_system_thread_get_scheduler_settings (GThreadSchedulerSettings *scheduler_settings)
{
/* FIXME: Implement the same for macOS and the BSDs so it doesn't go through
* the fallback code using an additional thread. */
#if defined(__linux__)
pid_t tid;
int res;
/* FIXME: The struct definition does not seem to be possible to pull in
* via any of the normal system headers and it's only declared in the
* kernel headers. That's why we hardcode 56 here right now. */
guint size = 56; /* Size as of Linux 5.3.9 */
guint flags = 0;
tid = (pid_t) syscall (SYS_gettid);
scheduler_settings->attr = g_malloc0 (size);
do
{
int errsv;
res = syscall (SYS_sched_getattr, tid, scheduler_settings->attr, size, flags);
errsv = errno;
if (res == -1)
{
if (errsv == EAGAIN)
{
continue;
}
else if (errsv == E2BIG)
{
g_assert (size < G_MAXINT);
size *= 2;
scheduler_settings->attr = g_realloc (scheduler_settings->attr, size);
/* Needs to be zero-initialized */
memset (scheduler_settings->attr, 0, size);
}
else
{
g_error ("Failed to get thread scheduler attributes: %s", g_strerror (errsv));
}
}
}
while (res == -1);
#endif
}
#if defined(__linux__)
static void *
linux_pthread_proxy (void *data)
{
GThreadPosix *thread = data;
/* Set scheduler settings first if requested */
if (thread->scheduler_settings)
{
pid_t tid = 0;
guint flags = 0;
int res;
int errsv;
tid = (pid_t) syscall (SYS_gettid);
res = syscall (SYS_sched_setattr, tid, thread->scheduler_settings->attr, flags);
errsv = errno;
if (res == -1)
g_error ("Failed to set scheduler settings: %s", g_strerror (errsv));
}
return thread->proxy (data);
}
#endif
GRealThread * GRealThread *
g_system_thread_new (GThreadFunc proxy, g_system_thread_new (GThreadFunc proxy,
gulong stack_size, gulong stack_size,
const char *name, const GThreadSchedulerSettings *scheduler_settings,
GThreadFunc func, const char *name,
gpointer data, GThreadFunc func,
GError **error) gpointer data,
GError **error)
{ {
GThreadPosix *thread; GThreadPosix *thread;
GRealThread *base_thread; GRealThread *base_thread;
@ -1173,6 +1257,8 @@ g_system_thread_new (GThreadFunc proxy,
base_thread->thread.func = func; base_thread->thread.func = func;
base_thread->thread.data = data; base_thread->thread.data = data;
base_thread->name = g_strdup (name); base_thread->name = g_strdup (name);
thread->scheduler_settings = scheduler_settings;
thread->proxy = proxy;
posix_check_cmd (pthread_attr_init (&attr)); posix_check_cmd (pthread_attr_init (&attr));
@ -1190,7 +1276,19 @@ g_system_thread_new (GThreadFunc proxy,
} }
#endif /* HAVE_PTHREAD_ATTR_SETSTACKSIZE */ #endif /* HAVE_PTHREAD_ATTR_SETSTACKSIZE */
#ifdef HAVE_PTHREAD_ATTR_SETINHERITSCHED
if (!scheduler_settings)
{
/* While this is the default, better be explicit about it */
pthread_attr_setinheritsched (&attr, PTHREAD_INHERIT_SCHED);
}
#endif /* HAVE_PTHREAD_ATTR_SETINHERITSCHED */
#if defined(__linux__)
ret = pthread_create (&thread->system_thread, &attr, linux_pthread_proxy, thread);
#else
ret = pthread_create (&thread->system_thread, &attr, (void* (*)(void*))proxy, thread); ret = pthread_create (&thread->system_thread, &attr, (void* (*)(void*))proxy, thread);
#endif
posix_check_cmd (pthread_attr_destroy (&attr)); posix_check_cmd (pthread_attr_destroy (&attr));

View File

@ -428,20 +428,27 @@ g_thread_win32_proxy (gpointer data)
return 0; return 0;
} }
void
g_system_thread_get_scheduler_settings (GThreadSchedulerSettings *scheduler_settings)
{
HANDLE current_thread = GetCurrentThread ();
scheduler_settings->thread_prio = GetThreadPriority (current_thread);
}
GRealThread * GRealThread *
g_system_thread_new (GThreadFunc proxy, g_system_thread_new (GThreadFunc proxy,
gulong stack_size, gulong stack_size,
const char *name, const GThreadSchedulerSettings *scheduler_settings,
GThreadFunc func, const char *name,
gpointer data, GThreadFunc func,
GError **error) gpointer data,
GError **error)
{ {
GThreadWin32 *thread; GThreadWin32 *thread;
GRealThread *base_thread; GRealThread *base_thread;
guint ignore; guint ignore;
const gchar *message = NULL; const gchar *message = NULL;
HANDLE current_thread; int thread_prio;
int current_prio;
thread = g_slice_new0 (GThreadWin32); thread = g_slice_new0 (GThreadWin32);
thread->proxy = proxy; thread->proxy = proxy;
@ -472,15 +479,23 @@ g_system_thread_new (GThreadFunc proxy,
* priority. * priority.
*/ */
current_thread = GetCurrentThread (); if (scheduler_settings)
current_prio = GetThreadPriority (current_thread); {
if (current_prio == THREAD_PRIORITY_ERROR_RETURN) thread_prio = scheduler_settings->thread_prio;
}
else
{
HANDLE current_thread = GetCurrentThread ();
thread_prio = GetThreadPriority (current_thread);
}
if (thread_prio == THREAD_PRIORITY_ERROR_RETURN)
{ {
message = "Error getting current thread priority"; message = "Error getting current thread priority";
goto error; goto error;
} }
if (SetThreadPriority (thread->handle, current_prio) == 0) if (SetThreadPriority (thread->handle, thread_prio) == 0)
{ {
message = "Error setting new thread priority"; message = "Error setting new thread priority";
goto error; goto error;

View File

@ -853,7 +853,7 @@ g_thread_new (const gchar *name,
GError *error = NULL; GError *error = NULL;
GThread *thread; GThread *thread;
thread = g_thread_new_internal (name, g_thread_proxy, func, data, 0, &error); thread = g_thread_new_internal (name, g_thread_proxy, func, data, 0, NULL, &error);
if G_UNLIKELY (thread == NULL) if G_UNLIKELY (thread == NULL)
g_error ("creating thread '%s': %s", name ? name : "", error->message); g_error ("creating thread '%s': %s", name ? name : "", error->message);
@ -884,21 +884,29 @@ g_thread_try_new (const gchar *name,
gpointer data, gpointer data,
GError **error) GError **error)
{ {
return g_thread_new_internal (name, g_thread_proxy, func, data, 0, error); return g_thread_new_internal (name, g_thread_proxy, func, data, 0, NULL, error);
} }
GThread * GThread *
g_thread_new_internal (const gchar *name, g_thread_new_internal (const gchar *name,
GThreadFunc proxy, GThreadFunc proxy,
GThreadFunc func, GThreadFunc func,
gpointer data, gpointer data,
gsize stack_size, gsize stack_size,
GError **error) const GThreadSchedulerSettings *scheduler_settings,
GError **error)
{ {
g_return_val_if_fail (func != NULL, NULL); g_return_val_if_fail (func != NULL, NULL);
return (GThread*) g_system_thread_new (proxy, stack_size, name, return (GThread *) g_system_thread_new (proxy, stack_size, scheduler_settings,
func, data, error); name, func, data, error);
}
void
g_thread_get_scheduler_settings (GThreadSchedulerSettings *scheduler_settings)
{
g_return_if_fail (scheduler_settings != NULL);
g_system_thread_get_scheduler_settings (scheduler_settings);
} }
/** /**

View File

@ -30,6 +30,7 @@
#include "gasyncqueueprivate.h" #include "gasyncqueueprivate.h"
#include "gmain.h" #include "gmain.h"
#include "gtestutils.h" #include "gtestutils.h"
#include "gthreadprivate.h"
#include "gtimer.h" #include "gtimer.h"
#include "gutils.h" #include "gutils.h"
@ -113,6 +114,21 @@ static gint max_unused_threads = 2;
static gint kill_unused_threads = 0; static gint kill_unused_threads = 0;
static guint max_idle_time = 15 * 1000; static guint max_idle_time = 15 * 1000;
#ifdef HAVE_GTHREAD_SCHEDULER_SETTINGS
static GThreadSchedulerSettings shared_thread_scheduler_settings;
#else
typedef struct
{
/* Either thread or error are set in the end. Both transfer-full. */
GThreadPool *pool;
GThread *thread;
GError *error;
} SpawnThreadData;
static GCond spawn_thread_cond;
static GAsyncQueue *spawn_thread_queue;
#endif
static void g_thread_pool_queue_push_unlocked (GRealThreadPool *pool, static void g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
gpointer data); gpointer data);
static void g_thread_pool_free_internal (GRealThreadPool *pool); static void g_thread_pool_free_internal (GRealThreadPool *pool);
@ -278,6 +294,39 @@ g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
return task; return task;
} }
#ifndef HAVE_GTHREAD_SCHEDULER_SETTINGS
static gpointer
g_thread_pool_spawn_thread (gpointer data)
{
while (TRUE)
{
SpawnThreadData *spawn_thread_data;
GThread *thread = NULL;
GError *error = NULL;
const gchar *prgname = g_get_prgname ();
gchar name[16] = "pool";
if (prgname)
g_snprintf (name, sizeof (name), "pool-%s", prgname);
g_async_queue_lock (spawn_thread_queue);
/* Spawn a new thread for the given pool and wake the requesting thread
* up again with the result. This new thread will have the scheduler
* settings inherited from this thread and in extension of the thread
* that created the first non-exclusive thread-pool. */
spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
spawn_thread_data->thread = g_steal_pointer (&thread);
spawn_thread_data->error = g_steal_pointer (&error);
g_cond_broadcast (&spawn_thread_cond);
g_async_queue_unlock (spawn_thread_queue);
}
return NULL;
}
#endif
static gpointer static gpointer
g_thread_pool_thread_proxy (gpointer data) g_thread_pool_thread_proxy (gpointer data)
@ -410,7 +459,40 @@ g_thread_pool_start_thread (GRealThreadPool *pool,
g_snprintf (name, sizeof (name), "pool-%s", prgname); g_snprintf (name, sizeof (name), "pool-%s", prgname);
/* No thread was found, we have to start a new one */ /* No thread was found, we have to start a new one */
thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error); if (pool->pool.exclusive)
{
/* For exclusive thread-pools this is directly called from new() and
* we simply start new threads that inherit the scheduler settings
* from the current thread.
*/
thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
}
else
{
/* For non-exclusive thread-pools this can be called at any time
* when a new thread is needed. We make sure to create a new thread
* here with the correct scheduler settings: either by directly
* providing them if supported by the GThread implementation or by
* going via our helper thread.
*/
#ifdef HAVE_GTHREAD_SCHEDULER_SETTINGS
thread = g_thread_new_internal (name, g_thread_proxy, g_thread_pool_thread_proxy, pool, 0, &shared_thread_scheduler_settings, error);
#else
SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
g_async_queue_lock (spawn_thread_queue);
g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
while (!spawn_thread_data.thread && !spawn_thread_data.error)
g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
thread = spawn_thread_data.thread;
if (!thread)
g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
g_async_queue_unlock (spawn_thread_queue);
#endif
}
if (thread == NULL) if (thread == NULL)
return FALSE; return FALSE;
@ -497,7 +579,41 @@ g_thread_pool_new (GFunc func,
G_LOCK (init); G_LOCK (init);
if (!unused_thread_queue) if (!unused_thread_queue)
{
unused_thread_queue = g_async_queue_new (); unused_thread_queue = g_async_queue_new ();
/* For the very first non-exclusive thread-pool we remember the thread
* scheduler settings of the thread creating the pool, if supported by
* the GThread implementation. This is then used for making sure that
* all threads created on the non-exclusive thread-pool have the same
* scheduler settings, and more importantly don't just inherit them
* from the thread that just happened to push a new task and caused
* a new thread to be created.
*
* Not doing so could cause real-time priority threads or otherwise
* threads with problematic scheduler settings to be part of the
* non-exclusive thread-pools.
*
* If this is not supported by the GThread implementation then we here
* start a thread that will inherit the scheduler settings from this
* very thread and whose only purpose is to spawn new threads with the
* same settings for use by the non-exclusive thread-pools.
*
*
* For non-exclusive thread-pools this is not required as all threads
* are created immediately below and are running forever, so they will
* automatically inherit the scheduler settings from this very thread.
*/
if (!exclusive)
{
#ifdef HAVE_GTHREAD_SCHEDULER_SETTINGS
g_thread_get_scheduler_settings (&shared_thread_scheduler_settings);
#else
spawn_thread_queue = g_async_queue_new ();
g_cond_init (&spawn_thread_cond);
g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
#endif
}
}
G_UNLOCK (init); G_UNLOCK (init);
if (retval->pool.exclusive) if (retval->pool.exclusive)

View File

@ -35,27 +35,54 @@ struct _GRealThread
}; };
/* system thread implementation (gthread-posix.c, gthread-win32.c) */ /* system thread implementation (gthread-posix.c, gthread-win32.c) */
/* Platform-specific scheduler settings for a thread */
typedef struct _GThreadSchedulerSettings GThreadSchedulerSettings;
/* TODO: Add the same for macOS and the BSDs */
#if defined(__linux__)
struct _GThreadSchedulerSettings
{
struct sched_attr *attr;
};
#define HAVE_GTHREAD_SCHEDULER_SETTINGS 1
#elif defined(G_OS_WIN32)
struct _GThreadSchedulerSettings
{
gint thread_prio;
};
#define HAVE_GTHREAD_SCHEDULER_SETTINGS 1
#endif
void g_system_thread_wait (GRealThread *thread); void g_system_thread_wait (GRealThread *thread);
GRealThread * g_system_thread_new (GThreadFunc proxy, GRealThread *g_system_thread_new (GThreadFunc proxy,
gulong stack_size, gulong stack_size,
const char *name, const GThreadSchedulerSettings *scheduler_settings,
GThreadFunc func, const char *name,
gpointer data, GThreadFunc func,
GError **error); gpointer data,
GError **error);
void g_system_thread_free (GRealThread *thread); void g_system_thread_free (GRealThread *thread);
void g_system_thread_exit (void); void g_system_thread_exit (void);
void g_system_thread_set_name (const gchar *name); void g_system_thread_set_name (const gchar *name);
void g_system_thread_get_scheduler_settings (GThreadSchedulerSettings *scheduler_settings);
/* gthread.c */ /* gthread.c */
GThread * g_thread_new_internal (const gchar *name, GThread *g_thread_new_internal (const gchar *name,
GThreadFunc proxy, GThreadFunc proxy,
GThreadFunc func, GThreadFunc func,
gpointer data, gpointer data,
gsize stack_size, gsize stack_size,
GError **error); const GThreadSchedulerSettings *scheduler_settings,
GError **error);
void g_thread_get_scheduler_settings (GThreadSchedulerSettings *scheduler_settings);
gpointer g_thread_proxy (gpointer thread); gpointer g_thread_proxy (gpointer thread);

View File

@ -1718,6 +1718,9 @@ else
if cc.has_header_symbol('pthread.h', 'pthread_attr_setstacksize') if cc.has_header_symbol('pthread.h', 'pthread_attr_setstacksize')
glib_conf.set('HAVE_PTHREAD_ATTR_SETSTACKSIZE', 1) glib_conf.set('HAVE_PTHREAD_ATTR_SETSTACKSIZE', 1)
endif endif
if cc.has_header_symbol('pthread.h', 'pthread_attr_setinheritsched')
glib_conf.set('HAVE_PTHREAD_ATTR_SETINHERITSCHED', 1)
endif
if cc.has_header_symbol('pthread.h', 'pthread_condattr_setclock') if cc.has_header_symbol('pthread.h', 'pthread_condattr_setclock')
glib_conf.set('HAVE_PTHREAD_CONDATTR_SETCLOCK', 1) glib_conf.set('HAVE_PTHREAD_CONDATTR_SETCLOCK', 1)
endif endif