Modify child and signal sources to use worker

This commit is contained in:
Ryan Lortie 2011-08-31 22:58:26 -04:00
parent 7eae486179
commit ba7019e19e

View File

@ -282,7 +282,6 @@ struct _GChildWatchSource
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
GPollFD poll; GPollFD poll;
#else /* G_OS_WIN32 */ #else /* G_OS_WIN32 */
gint count;
gboolean child_exited; gboolean child_exited;
#endif /* G_OS_WIN32 */ #endif /* G_OS_WIN32 */
}; };
@ -348,8 +347,6 @@ static void g_main_context_remove_poll_unlocked (GMainContext *context,
GPollFD *fd); GPollFD *fd);
static void g_main_context_wakeup_unlocked (GMainContext *context); static void g_main_context_wakeup_unlocked (GMainContext *context);
static void _g_main_wake_up_all_contexts (void);
static gboolean g_timeout_prepare (GSource *source, static gboolean g_timeout_prepare (GSource *source,
gint *timeout); gint *timeout);
static gboolean g_timeout_check (GSource *source); static gboolean g_timeout_check (GSource *source);
@ -362,8 +359,8 @@ static gboolean g_child_watch_check (GSource *source);
static gboolean g_child_watch_dispatch (GSource *source, static gboolean g_child_watch_dispatch (GSource *source,
GSourceFunc callback, GSourceFunc callback,
gpointer user_data); gpointer user_data);
static void g_child_watch_finalize (GSource *source);
#ifdef G_OS_UNIX #ifdef G_OS_UNIX
static gpointer unix_signal_helper_thread (gpointer data) G_GNUC_NORETURN;
static void g_unix_signal_handler (int signum); static void g_unix_signal_handler (int signum);
static gboolean g_unix_signal_watch_prepare (GSource *source, static gboolean g_unix_signal_watch_prepare (GSource *source,
gint *timeout); gint *timeout);
@ -387,22 +384,17 @@ static GMainContext *default_main_context;
#ifndef G_OS_WIN32 #ifndef G_OS_WIN32
/* The UNIX signal pipe contains a single byte specifying which
* signal was received.
*/
#define _UNIX_SIGNAL_PIPE_SIGCHLD_CHAR 'C'
#define _UNIX_SIGNAL_PIPE_SIGHUP_CHAR 'H'
#define _UNIX_SIGNAL_PIPE_SIGINT_CHAR 'I'
#define _UNIX_SIGNAL_PIPE_SIGTERM_CHAR 'T'
/* Guards all the data below */
G_LOCK_DEFINE_STATIC (unix_signal_lock);
static gboolean unix_signal_initialized;
static sigset_t unix_signal_mask;
static gint unix_signal_wake_up_pipe[2];
GSList *unix_signal_watches;
/* Not guarded ( FIXME should it be? ) */ /* UNIX signals work by marking one of these variables then waking the
static gint child_watch_count = 1; * worker context to check on them and dispatch accordingly.
*/
static volatile gchar unix_signal_pending[NSIG];
static volatile gboolean any_unix_signal_pending;
/* Guards all the data below */
G_LOCK_DEFINE_STATIC (unix_signal_lock);
static GSList *unix_signal_watches;
static GSList *unix_child_watches;
static GSourceFuncs g_unix_signal_funcs = static GSourceFuncs g_unix_signal_funcs =
{ {
@ -428,7 +420,7 @@ GSourceFuncs g_child_watch_funcs =
g_child_watch_prepare, g_child_watch_prepare,
g_child_watch_check, g_child_watch_check,
g_child_watch_dispatch, g_child_watch_dispatch,
NULL g_child_watch_finalize
}; };
GSourceFuncs g_idle_funcs = GSourceFuncs g_idle_funcs =
@ -3637,25 +3629,6 @@ g_main_context_get_poll_func (GMainContext *context)
return result; return result;
} }
static void
_g_main_wake_up_all_contexts (void)
{
GSList *list;
/* We were woken up. Wake up all other contexts in all other threads */
G_LOCK (main_context_list);
for (list = main_context_list; list; list = list->next)
{
GMainContext *context = list->data;
LOCK_CONTEXT (context);
g_main_context_wakeup_unlocked (context);
UNLOCK_CONTEXT (context);
}
G_UNLOCK (main_context_list);
}
/* HOLDS: context's lock */ /* HOLDS: context's lock */
/* Wake the main loop up from a poll() */ /* Wake the main loop up from a poll() */
static void static void
@ -4133,77 +4106,142 @@ g_child_watch_check (GSource *source)
#else /* G_OS_WIN32 */ #else /* G_OS_WIN32 */
static gboolean static void
check_for_child_exited (GSource *source) wake_source (GSource *source)
{ {
GChildWatchSource *child_watch_source; GMainContext *context;
gint count;
/* protect against another SIGCHLD in the middle of this call */ /* This should be thread-safe:
count = child_watch_count; *
* - if the source is currently being added to a context, that
* context will be woken up anyway
*
* - if the source is currently being destroyed, we simply need not
* to crash:
*
* - the memory for the source will remain valid until after the
* source finalize function was called (which would remove the
* source from the global list which we are currently holding the
* lock for)
*
* - the GMainContext will either be NULL or point to a live
* GMainContext
*
* - the GMainContext will remain valid since we hold the
* main_context_list lock
*
* Since we are holding a lot of locks here, don't try to enter any
* more GMainContext functions for fear of dealock -- just hit the
* GWakeup and run. Even if that's safe now, it could easily become
* unsafe with some very minor changes in the future, and signal
* handling is not the most well-tested codepath.
*/
G_LOCK(main_context_list);
context = source->context;
if (context)
g_wakeup_signal (context->wakeup);
G_UNLOCK(main_context_list);
}
child_watch_source = (GChildWatchSource *) source; static void
dispatch_unix_signals (void)
{
GSList *node;
if (child_watch_source->child_exited) /* clear this first incase another one arrives while we're processing */
return TRUE; any_unix_signal_pending = FALSE;
if (child_watch_source->count < count) G_LOCK(unix_signal_lock);
/* handle GChildWatchSource instances */
if (unix_signal_pending[SIGCHLD])
{ {
gint child_status; unix_signal_pending[SIGCHLD] = FALSE;
if (waitpid (child_watch_source->pid, &child_status, WNOHANG) > 0) /* The only way we can do this is to scan all of the children.
{ *
child_watch_source->child_status = child_status; * The docs promise that we will not reap children that we are not
child_watch_source->child_exited = TRUE; * explicitly watching, so that ties our hands from calling
} * waitpid(-1). We also can't use siginfo's si_pid field since if
child_watch_source->count = count; * multiple SIGCHLD arrive at the same time, one of them can be
* dropped (since a given UNIX signal can only be pending once).
*/
for (node = unix_child_watches; node; node = node->next)
{
GChildWatchSource *source = node->data;
if (!source->child_exited)
{
if (waitpid (source->pid, &source->child_status, WNOHANG) > 0)
{
source->child_exited = TRUE;
wake_source ((GSource *) source);
}
}
}
} }
return child_watch_source->child_exited; /* handle GUnixSignalWatchSource instances */
for (node = unix_signal_watches; node; node = node->next)
{
GUnixSignalWatchSource *source = node->data;
if (!source->pending)
{
if (unix_signal_pending[source->signum])
{
unix_signal_pending[source->signum] = FALSE;
source->pending = TRUE;
wake_source ((GSource *) source);
}
}
}
G_UNLOCK(unix_signal_lock);
} }
static gboolean static gboolean
g_child_watch_prepare (GSource *source, g_child_watch_prepare (GSource *source,
gint *timeout) gint *timeout)
{ {
*timeout = -1; GChildWatchSource *child_watch_source;
return check_for_child_exited (source); child_watch_source = (GChildWatchSource *) source;
}
static gboolean return child_watch_source->child_exited;
g_child_watch_check (GSource *source)
{
return check_for_child_exited (source);
} }
static gboolean static gboolean
check_for_signal_delivery (GSource *source) g_child_watch_check (GSource *source)
{ {
GUnixSignalWatchSource *unix_signal_source = (GUnixSignalWatchSource*) source; GChildWatchSource *child_watch_source;
gboolean delivered;
G_LOCK (unix_signal_lock); child_watch_source = (GChildWatchSource *) source;
g_assert (unix_signal_initialized);
delivered = unix_signal_source->pending;
G_UNLOCK (unix_signal_lock);
return delivered; return child_watch_source->child_exited;
} }
static gboolean static gboolean
g_unix_signal_watch_prepare (GSource *source, g_unix_signal_watch_prepare (GSource *source,
gint *timeout) gint *timeout)
{ {
*timeout = -1; GUnixSignalWatchSource *unix_signal_source;
return check_for_signal_delivery (source); unix_signal_source = (GUnixSignalWatchSource *) source;
return unix_signal_source->pending;
} }
static gboolean static gboolean
g_unix_signal_watch_check (GSource *source) g_unix_signal_watch_check (GSource *source)
{ {
return check_for_signal_delivery (source); GUnixSignalWatchSource *unix_signal_source;
unix_signal_source = (GUnixSignalWatchSource *) source;
return unix_signal_source->pending;
} }
static gboolean static gboolean
@ -4224,10 +4262,7 @@ g_unix_signal_watch_dispatch (GSource *source,
(callback) (user_data); (callback) (user_data);
G_LOCK (unix_signal_lock);
g_assert (unix_signal_initialized);
unix_signal_source->pending = FALSE; unix_signal_source->pending = FALSE;
G_UNLOCK (unix_signal_lock);
return TRUE; return TRUE;
} }
@ -4235,28 +4270,21 @@ g_unix_signal_watch_dispatch (GSource *source,
static void static void
ensure_unix_signal_handler_installed_unlocked (int signum) ensure_unix_signal_handler_installed_unlocked (int signum)
{ {
static sigset_t installed_signal_mask;
static gboolean initialized;
struct sigaction action; struct sigaction action;
GError *error = NULL;
if (!unix_signal_initialized) if (!initialized)
{ {
sigemptyset (&unix_signal_mask); sigemptyset (&installed_signal_mask);
glib_get_worker_context ();
if (!g_unix_open_pipe (unix_signal_wake_up_pipe, FD_CLOEXEC, &error)) initialized = TRUE;
g_error ("Cannot create UNIX signal wake up pipe: %s\n", error->message);
g_unix_set_fd_nonblocking (unix_signal_wake_up_pipe[1], TRUE, NULL);
/* We create a helper thread that polls on the wakeup pipe indefinitely */
if (g_thread_create (unix_signal_helper_thread, NULL, FALSE, &error) == NULL)
g_error ("Cannot create a thread to monitor UNIX signals: %s\n", error->message);
unix_signal_initialized = TRUE;
} }
if (sigismember (&unix_signal_mask, signum)) if (sigismember (&installed_signal_mask, signum))
return; return;
sigaddset (&unix_signal_mask, signum); sigaddset (&installed_signal_mask, signum);
action.sa_handler = g_unix_signal_handler; action.sa_handler = g_unix_signal_handler;
sigemptyset (&action.sa_mask); sigemptyset (&action.sa_mask);
@ -4279,6 +4307,9 @@ _g_main_create_unix_signal_watch (int signum)
G_LOCK (unix_signal_lock); G_LOCK (unix_signal_lock);
ensure_unix_signal_handler_installed_unlocked (signum); ensure_unix_signal_handler_installed_unlocked (signum);
unix_signal_watches = g_slist_prepend (unix_signal_watches, unix_signal_source); unix_signal_watches = g_slist_prepend (unix_signal_watches, unix_signal_source);
if (unix_signal_pending[signum])
unix_signal_source->pending = TRUE;
unix_signal_pending[signum] = FALSE;
G_UNLOCK (unix_signal_lock); G_UNLOCK (unix_signal_lock);
return source; return source;
@ -4294,6 +4325,14 @@ g_unix_signal_watch_finalize (GSource *source)
#endif /* G_OS_WIN32 */ #endif /* G_OS_WIN32 */
static void
g_child_watch_finalize (GSource *source)
{
G_LOCK (unix_signal_lock);
unix_child_watches = g_slist_remove (unix_child_watches, source);
G_UNLOCK (unix_signal_lock);
}
static gboolean static gboolean
g_child_watch_dispatch (GSource *source, g_child_watch_dispatch (GSource *source,
GSourceFunc callback, GSourceFunc callback,
@ -4322,125 +4361,10 @@ g_child_watch_dispatch (GSource *source,
static void static void
g_unix_signal_handler (int signum) g_unix_signal_handler (int signum)
{ {
if (signum == SIGCHLD) unix_signal_pending[signum] = TRUE;
child_watch_count ++; any_unix_signal_pending = TRUE;
char buf[1]; g_wakeup_signal (glib_worker_context->wakeup);
switch (signum)
{
case SIGCHLD:
buf[0] = _UNIX_SIGNAL_PIPE_SIGCHLD_CHAR;
break;
case SIGHUP:
buf[0] = _UNIX_SIGNAL_PIPE_SIGHUP_CHAR;
break;
case SIGINT:
buf[0] = _UNIX_SIGNAL_PIPE_SIGINT_CHAR;
break;
case SIGTERM:
buf[0] = _UNIX_SIGNAL_PIPE_SIGTERM_CHAR;
break;
default:
/* Shouldn't happen */
return;
}
write (unix_signal_wake_up_pipe[1], buf, 1);
}
static void
deliver_unix_signal (int signum)
{
GSList *iter;
g_assert (signum == SIGHUP || signum == SIGINT || signum == SIGTERM);
G_LOCK (unix_signal_lock);
for (iter = unix_signal_watches; iter; iter = iter->next)
{
GUnixSignalWatchSource *source = iter->data;
if (source->signum != signum)
continue;
source->pending = TRUE;
}
G_UNLOCK (unix_signal_lock);
}
/*
* This thread is created whenever anything in GLib needs
* to deal with UNIX signals; at present, just SIGCHLD
* from g_child_watch_source_new().
*
* Note: We could eventually make this thread a more public interface
* and allow e.g. GDBus to use it instead of its own worker thread.
*/
static gpointer
unix_signal_helper_thread (gpointer data)
{
while (1)
{
gchar b[128];
ssize_t i, bytes_read;
gboolean sigterm_received = FALSE;
gboolean sigint_received = FALSE;
gboolean sighup_received = FALSE;
bytes_read = read (unix_signal_wake_up_pipe[0], b, sizeof (b));
if (bytes_read < 0)
{
g_warning ("Failed to read from child watch wake up pipe: %s",
strerror (errno));
/* Not much we can do here sanely; just wait a second and hope
* it was transient.
*/
g_usleep (G_USEC_PER_SEC);
continue;
}
for (i = 0; i < bytes_read; i++)
{
switch (b[i])
{
case _UNIX_SIGNAL_PIPE_SIGCHLD_CHAR:
/* The child watch source will call waitpid() in its
* prepare() and check() methods; however, we don't
* know which pid exited, so we need to wake up
* all contexts. Note: actually we could get the pid
* from the "siginfo_t" via the handler, but to pass
* that info down the pipe would require a more structured
* data stream (as opposed to a single byte).
*/
break;
case _UNIX_SIGNAL_PIPE_SIGTERM_CHAR:
sigterm_received = TRUE;
break;
case _UNIX_SIGNAL_PIPE_SIGHUP_CHAR:
sighup_received = TRUE;
break;
case _UNIX_SIGNAL_PIPE_SIGINT_CHAR:
sigint_received = TRUE;
break;
default:
g_warning ("Invalid char '%c' read from child watch pipe", b[i]);
break;
}
}
if (sigterm_received)
deliver_unix_signal (SIGTERM);
if (sigint_received)
deliver_unix_signal (SIGINT);
if (sighup_received)
deliver_unix_signal (SIGHUP);
_g_main_wake_up_all_contexts ();
}
}
static void
g_child_watch_source_init (void)
{
G_LOCK (unix_signal_lock);
ensure_unix_signal_handler_installed_unlocked (SIGCHLD);
G_UNLOCK (unix_signal_lock);
} }
#endif /* !G_OS_WIN32 */ #endif /* !G_OS_WIN32 */
@ -4480,17 +4404,22 @@ g_child_watch_source_new (GPid pid)
GSource *source = g_source_new (&g_child_watch_funcs, sizeof (GChildWatchSource)); GSource *source = g_source_new (&g_child_watch_funcs, sizeof (GChildWatchSource));
GChildWatchSource *child_watch_source = (GChildWatchSource *)source; GChildWatchSource *child_watch_source = (GChildWatchSource *)source;
child_watch_source->pid = pid;
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
child_watch_source->poll.fd = (gintptr) pid; child_watch_source->poll.fd = (gintptr) pid;
child_watch_source->poll.events = G_IO_IN; child_watch_source->poll.events = G_IO_IN;
g_source_add_poll (source, &child_watch_source->poll); g_source_add_poll (source, &child_watch_source->poll);
#else /* G_OS_WIN32 */ #else /* G_OS_WIN32 */
g_child_watch_source_init (); G_LOCK (unix_signal_lock);
ensure_unix_signal_handler_installed_unlocked (SIGCHLD);
unix_child_watches = g_slist_prepend (unix_child_watches, child_watch_source);
if (waitpid (pid, &child_watch_source->child_status, WNOHANG) > 0)
child_watch_source->child_exited = TRUE;
G_UNLOCK (unix_signal_lock);
#endif /* G_OS_WIN32 */ #endif /* G_OS_WIN32 */
child_watch_source->pid = pid;
return source; return source;
} }
@ -4840,10 +4769,13 @@ g_main_context_invoke_full (GMainContext *context,
static gpointer static gpointer
glib_worker_main (gpointer data) glib_worker_main (gpointer data)
{ {
LOCK_CONTEXT (glib_worker_context);
while (TRUE) while (TRUE)
g_main_context_iterate (glib_worker_context, TRUE, TRUE, G_THREAD_SELF); {
g_main_context_iteration (glib_worker_context, TRUE);
if (any_unix_signal_pending)
dispatch_unix_signals ();
}
return NULL; /* worst GCC warning message ever... */ return NULL; /* worst GCC warning message ever... */
} }
@ -4851,7 +4783,7 @@ glib_worker_main (gpointer data)
GMainContext * GMainContext *
glib_get_worker_context (void) glib_get_worker_context (void)
{ {
gsize initialised; static gsize initialised;
g_thread_init_glib (); g_thread_init_glib ();