Merge branch 'gwakeup'

This commit is contained in:
Ryan Lortie 2011-07-26 10:50:25 +02:00
commit 12f516b44c
12 changed files with 634 additions and 299 deletions

View File

@ -2641,6 +2641,7 @@ main (void)
if test x"$glib_cv_eventfd" = x"yes"; then
AC_DEFINE(HAVE_EVENTFD, 1, [we have the eventfd(2) system call])
fi
AM_CONDITIONAL(HAVE_EVENTFD, [test "$glib_cv_eventfd" = "yes"])
dnl ****************************************
dnl *** GLib POLL* compatibility defines ***

View File

@ -22,6 +22,9 @@ MKDB_OPTIONS=--sgml-mode --output-format=xml --name-space=g
HFILE_GLOB=$(addprefix $(top_srcdir)/glib/,$(shell cat $(top_builddir)/glib/glib-public-headers.txt)) $(top_srcdir)/gmodule/*.h
CFILE_GLOB=$(top_srcdir)/glib/*.c $(top_srcdir)/gmodule/*.c
# Ignore some private headers
IGNORE_HFILES = gwakeup.h
# Images to copy into HTML directory
HTML_IMAGES = \
file-name-encodings.png \

View File

@ -20,6 +20,7 @@ gurifuncs.sgml
gunix.sgml
gvarianttype.sgml
gvariant.sgml
gwakeup.sgml
hash_tables.sgml
iochannels.sgml
linked_lists_double.sgml

View File

@ -286,6 +286,7 @@ libgio_2_0_la_SOURCES = \
gasyncresult.c \
gbufferedinputstream.c \
gbufferedoutputstream.c \
../glib/gwakeup.c \
gcancellable.c \
gcontenttype.c \
gcontenttypeprivate.h \

View File

@ -22,18 +22,8 @@
#include "config.h"
#include "glib.h"
#ifdef G_OS_UNIX
#include "glib-unix.h"
#ifdef HAVE_EVENTFD
#include <sys/eventfd.h>
#endif
#endif
#include <gioerror.h>
#ifdef G_OS_WIN32
#include <errno.h>
#include <windows.h>
#include <io.h>
#endif
#include "gwakeup.h"
#include "gcancellable.h"
#include "glibintl.h"
@ -60,12 +50,7 @@ struct _GCancellablePrivate
guint cancelled_running_waiting : 1;
guint fd_refcount;
/* If cancel_pipe[0] is != -1 and cancel_pipe[1] is -1, it is an eventfd */
int cancel_pipe[2];
#ifdef G_OS_WIN32
HANDLE event;
#endif
GWakeup *wakeup;
};
static guint signals[LAST_SIGNAL] = { 0 };
@ -75,41 +60,14 @@ G_DEFINE_TYPE (GCancellable, g_cancellable, G_TYPE_OBJECT);
static GStaticPrivate current_cancellable = G_STATIC_PRIVATE_INIT;
G_LOCK_DEFINE_STATIC(cancellable);
static GCond *cancellable_cond = NULL;
static void
g_cancellable_close_pipe (GCancellable *cancellable)
{
GCancellablePrivate *priv;
priv = cancellable->priv;
if (priv->cancel_pipe[0] != -1)
{
close (priv->cancel_pipe[0]);
priv->cancel_pipe[0] = -1;
}
if (priv->cancel_pipe[1] != -1)
{
close (priv->cancel_pipe[1]);
priv->cancel_pipe[1] = -1;
}
#ifdef G_OS_WIN32
if (priv->event)
{
CloseHandle (priv->event);
priv->event = NULL;
}
#endif
}
static void
g_cancellable_finalize (GObject *object)
{
GCancellable *cancellable = G_CANCELLABLE (object);
g_cancellable_close_pipe (cancellable);
if (cancellable->priv->wakeup)
g_wakeup_free (cancellable->priv->wakeup);
G_OBJECT_CLASS (g_cancellable_parent_class)->finalize (object);
}
@ -195,88 +153,12 @@ g_cancellable_class_init (GCancellableClass *klass)
}
static void
g_cancellable_write_cancelled (GCancellable *cancellable)
{
gssize c;
GCancellablePrivate *priv;
const char ch = 'x';
priv = cancellable->priv;
#ifdef G_OS_WIN32
if (priv->event)
SetEvent (priv->event);
#else
if (priv->cancel_pipe[0] == -1)
return;
g_assert (cancellable->priv->cancelled);
#ifdef HAVE_EVENTFD
if (priv->cancel_pipe[1] == -1)
{
guint64 buf = 1;
do
c = write (priv->cancel_pipe[0], &buf, sizeof (buf));
while (c == -1 && errno == EINTR);
return;
}
#endif /* HAVE_EVENTFD */
do
c = write (priv->cancel_pipe[1], &ch, 1);
while (c == -1 && errno == EINTR);
#endif /* G_OS_WIN32 */
}
#ifndef G_OS_WIN32
static void
g_cancellable_open_pipe (GCancellable *cancellable)
{
GCancellablePrivate *priv;
priv = cancellable->priv;
#ifdef HAVE_EVENTFD
priv->cancel_pipe[0] = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
if (priv->cancel_pipe[0] >= 0)
{
if (priv->cancelled)
g_cancellable_write_cancelled (cancellable);
return;
}
else if (!(errno == ENOSYS || errno == EINVAL))
{
return;
}
/* Fall through on ENOSYS or EINVAL */
#endif
if (g_unix_open_pipe (priv->cancel_pipe, FD_CLOEXEC, NULL))
{
/* Make them nonblocking, just to be sure we don't block
* on errors and stuff
*/
g_unix_set_fd_nonblocking (priv->cancel_pipe[0], TRUE, NULL);
g_unix_set_fd_nonblocking (priv->cancel_pipe[1], TRUE, NULL);
if (priv->cancelled)
g_cancellable_write_cancelled (cancellable);
}
}
#endif
static void
g_cancellable_init (GCancellable *cancellable)
{
cancellable->priv = G_TYPE_INSTANCE_GET_PRIVATE (cancellable,
G_TYPE_CANCELLABLE,
GCancellablePrivate);
cancellable->priv->cancel_pipe[0] = -1;
cancellable->priv->cancel_pipe[1] = -1;
}
/**
@ -388,37 +270,11 @@ g_cancellable_reset (GCancellable *cancellable)
g_cond_wait (cancellable_cond,
g_static_mutex_get_mutex (& G_LOCK_NAME (cancellable)));
}
if (priv->cancelled)
{
/* Make sure we're not leaving old cancel state around */
#ifdef G_OS_WIN32
if (priv->event)
ResetEvent (priv->event);
#endif
if (priv->cancel_pipe[0] != -1)
{
gssize c;
#ifdef HAVE_EVENTFD
if (priv->cancel_pipe[1] == -1)
{
guint64 buf;
do
c = read (priv->cancel_pipe[0], &buf, sizeof(buf));
while (c == -1 && errno == EINTR);
}
else
#endif
{
char ch;
do
c = read (priv->cancel_pipe[0], &ch, 1);
while (c == -1 && errno == EINTR);
}
}
if (priv->wakeup)
g_wakeup_acknowledge (priv->wakeup);
priv->cancelled = FALSE;
}
@ -490,27 +346,15 @@ g_cancellable_set_error_if_cancelled (GCancellable *cancellable,
int
g_cancellable_get_fd (GCancellable *cancellable)
{
GCancellablePrivate *priv;
int fd;
if (cancellable == NULL)
return -1;
priv = cancellable->priv;
GPollFD pollfd;
#ifdef G_OS_WIN32
return -1;
pollfd.fd = -1;
#else
G_LOCK(cancellable);
if (priv->cancel_pipe[0] == -1)
g_cancellable_open_pipe (cancellable);
fd = priv->cancel_pipe[0];
if (fd != -1)
priv->fd_refcount++;
G_UNLOCK(cancellable);
g_cancellable_make_pollfd (cancellable, &pollfd);
#endif
return fd;
return pollfd.fd;
}
/**
@ -550,39 +394,21 @@ g_cancellable_make_pollfd (GCancellable *cancellable, GPollFD *pollfd)
return FALSE;
g_return_val_if_fail (G_IS_CANCELLABLE (cancellable), FALSE);
{
#ifdef G_OS_WIN32
GCancellablePrivate *priv;
G_LOCK(cancellable);
priv = cancellable->priv;
G_LOCK(cancellable);
if (priv->event == NULL)
{
/* A manual reset anonymous event, starting unset */
priv->event = CreateEvent (NULL, TRUE, FALSE, NULL);
if (priv->event == NULL)
{
G_UNLOCK(cancellable);
return FALSE;
}
if (priv->cancelled)
SetEvent(priv->event);
}
priv->fd_refcount++;
G_UNLOCK(cancellable);
cancellable->priv->fd_refcount++;
pollfd->fd = (gintptr)priv->event;
#else /* !G_OS_WIN32 */
int fd = g_cancellable_get_fd (cancellable);
if (cancellable->priv->wakeup == NULL)
{
cancellable->priv->wakeup = g_wakeup_new ();
if (fd == -1)
return FALSE;
pollfd->fd = fd;
#endif /* G_OS_WIN32 */
}
if (cancellable->priv->cancelled)
g_wakeup_signal (cancellable->priv->wakeup);
}
pollfd->events = G_IO_IN;
pollfd->revents = 0;
g_wakeup_get_pollfd (cancellable->priv->wakeup, pollfd);
G_UNLOCK(cancellable);
return TRUE;
}
@ -619,7 +445,10 @@ g_cancellable_release_fd (GCancellable *cancellable)
G_LOCK (cancellable);
priv->fd_refcount--;
if (priv->fd_refcount == 0)
g_cancellable_close_pipe (cancellable);
{
g_wakeup_free (priv->wakeup);
priv->wakeup = NULL;
}
G_UNLOCK (cancellable);
}
@ -662,8 +491,9 @@ g_cancellable_cancel (GCancellable *cancellable)
priv->cancelled = TRUE;
priv->cancelled_running = TRUE;
g_cancellable_write_cancelled (cancellable);
if (priv->wakeup)
g_wakeup_signal (priv->wakeup);
G_UNLOCK(cancellable);

View File

@ -198,6 +198,8 @@ libglib_2_0_la_SOURCES = \
gvarianttypeinfo.h \
gvarianttypeinfo.c \
gvarianttype.c \
gwakeup.h \
gwakeup.c \
gdebug.h \
gprintf.c \
gprintfint.h

View File

@ -96,6 +96,8 @@
#include "gtimer.h"
#endif
#include "gwakeup.h"
/**
* SECTION:main
* @title: The Main Event Loop
@ -235,14 +237,8 @@ struct _GMainContext
GPollFD *cached_poll_array;
guint cached_poll_array_size;
#ifdef G_THREADS_ENABLED
#ifndef G_OS_WIN32
/* this pipe is used to wake up the main loop when a source is added.
*/
gint wake_up_pipe[2];
#else /* G_OS_WIN32 */
HANDLE wake_up_semaphore;
#endif /* G_OS_WIN32 */
#ifdef G_THREADS_ENABLED
GWakeup *wakeup;
GPollFD wake_up_rec;
gboolean poll_waiting;
@ -530,16 +526,8 @@ g_main_context_unref (GMainContext *context)
#ifdef G_THREADS_ENABLED
if (g_thread_supported())
{
#ifndef G_OS_WIN32
if (context->wake_up_pipe[0] != -1)
close (context->wake_up_pipe[0]);
if (context->wake_up_pipe[1] != -1)
close (context->wake_up_pipe[1]);
#else
CloseHandle (context->wake_up_semaphore);
#endif
}
g_wakeup_free (context->wakeup);
else
main_contexts_without_pipe = g_slist_remove (main_contexts_without_pipe,
context);
@ -547,58 +535,16 @@ g_main_context_unref (GMainContext *context)
if (context->cond != NULL)
g_cond_free (context->cond);
#endif
g_free (context);
}
#ifdef G_THREADS_ENABLED
static void
static void
g_main_context_init_pipe (GMainContext *context)
{
GError *error = NULL;
# ifndef G_OS_WIN32
if (context->wake_up_pipe[0] != -1)
return;
#ifdef HAVE_EVENTFD
{
int efd;
efd = eventfd (0, EFD_CLOEXEC);
/* Fall through on -EINVAL too in case kernel doesn't know EFD_CLOEXEC. Bug #653570 */
if (efd == -1 && (errno == ENOSYS || errno == EINVAL))
{
if (!g_unix_open_pipe (context->wake_up_pipe, FD_CLOEXEC, &error))
g_error ("Cannot create pipe main loop wake-up: %s", error->message);
}
else if (efd >= 0)
{
context->wake_up_pipe[0] = efd;
}
else
g_error ("Cannot create eventfd for main loop wake-up: %s", g_strerror (errno));
}
#else
if (!g_unix_open_pipe (context->wake_up_pipe, FD_CLOEXEC, &error))
g_error ("Cannot create pipe main loop wake-up: %s", error->message);
#endif
context->wake_up_rec.fd = context->wake_up_pipe[0];
context->wake_up_rec.events = G_IO_IN;
# else
if (context->wake_up_semaphore != NULL)
return;
context->wake_up_semaphore = CreateSemaphore (NULL, 0, 100, NULL);
if (context->wake_up_semaphore == NULL)
g_error ("Cannot create wake-up semaphore: %s",
g_win32_error_message (GetLastError ()));
context->wake_up_rec.fd = (gintptr) context->wake_up_semaphore;
context->wake_up_rec.events = G_IO_IN;
if (_g_main_poll_debug)
g_print ("wake-up semaphore: %p\n", context->wake_up_semaphore);
# endif
context->wakeup = g_wakeup_new ();
g_wakeup_get_pollfd (context->wakeup, &context->wake_up_rec);
g_main_context_add_poll_unlocked (context, 0, &context->wake_up_rec);
}
@ -648,13 +594,6 @@ g_main_context_new (void)
context->owner = NULL;
context->waiters = NULL;
# ifndef G_OS_WIN32
context->wake_up_pipe[0] = -1;
context->wake_up_pipe[1] = -1;
# else
context->wake_up_semaphore = NULL;
# endif
#endif
context->ref_count = 1;
@ -2967,22 +2906,8 @@ g_main_context_check (GMainContext *context,
#ifdef G_THREADS_ENABLED
if (!context->poll_waiting)
{
#ifndef G_OS_WIN32
#ifdef HAVE_EVENTFD
if (context->wake_up_pipe[1] == -1)
{
guint64 buf;
read (context->wake_up_pipe[0], &buf, sizeof(guint64));
}
else
#endif
{
gchar a;
read (context->wake_up_pipe[0], &a, 1);
}
#endif
}
g_wakeup_acknowledge (context->wakeup);
else
context->poll_waiting = FALSE;
@ -3830,19 +3755,7 @@ g_main_context_wakeup_unlocked (GMainContext *context)
if (g_thread_supported() && context->poll_waiting)
{
context->poll_waiting = FALSE;
#ifndef G_OS_WIN32
#ifdef HAVE_EVENTFD
if (context->wake_up_pipe[1] == -1)
{
guint64 buf = 1;
write (context->wake_up_pipe[0], &buf, sizeof(buf));
}
else
#endif
write (context->wake_up_pipe[1], "A", 1);
#else
ReleaseSemaphore (context->wake_up_semaphore, 1, NULL);
#endif
g_wakeup_signal (context->wakeup);
}
#endif
}

260
glib/gwakeup.c Normal file
View File

@ -0,0 +1,260 @@
/*
* Copyright © 2011 Canonical Limited
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the licence, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*
* Author: Ryan Lortie <desrt@desrt.ca>
*/
#include "config.h"
/* gwakeup.h is special -- GIO and some test cases include it. As such,
* it cannot include other glib headers without triggering the single
* includes warnings. We have to manually include its dependencies here
* (and at all other use sites).
*/
#ifdef GLIB_COMPILATION
#include "gtypes.h"
#include "gpoll.h"
#else
#include <glib.h>
#endif
#include "gwakeup.h"
/**
* SECTION:gwakeup
* @title: GWakeup
* @short_description: portable cross-thread event signal mechanism
*
* #GWakeup is a simple and portable way of signaling events between
* different threads in a way that integrates nicely with g_poll().
* GLib uses it internally for cross-thread signalling in the
* implementation of #GMainContext and #GCancellable.
*
* You first create a #GWakeup with g_wakeup_new() and initialise a
* #GPollFD from it using g_wakeup_get_pollfd(). Polling on the created
* #GPollFD will block until g_wakeup_signal() is called, at which point
* it will immediately return. Future attempts to poll will continue to
* return until g_wakeup_acknowledge() is called. g_wakeup_free() is
* used to free a #GWakeup.
*
* On sufficiently modern Linux, this is implemented using eventfd. On
* Windows it is implemented using an event handle. On other systems it
* is implemented with a pair of pipes.
*
* Since: 2.30
**/
#ifdef _WIN32
#include <windows.h>
#ifdef GLIB_COMPILATION
#include "gmessages.h"
#include "giochannel.h"
#include "gwin32.h"
#endif
GWakeup *
g_wakeup_new (void)
{
HANDLE wakeup;
wakeup = CreateEvent (NULL, TRUE, FALSE, NULL);
if (wakeup == NULL)
g_error ("Cannot create event for GWakeup: %s",
g_win32_error_message (GetLastError ()));
return (GWakeup *) wakeup;
}
void
g_wakeup_get_pollfd (GWakeup *wakeup,
GPollFD *poll_fd)
{
poll_fd->fd = (gintptr) wakeup;
poll_fd->events = G_IO_IN;
}
void
g_wakeup_acknowledge (GWakeup *wakeup)
{
ResetEvent ((HANDLE) wakeup);
}
void
g_wakeup_signal (GWakeup *wakeup)
{
SetEvent ((HANDLE) wakeup);
}
void
g_wakeup_free (GWakeup *wakeup)
{
CloseHandle ((HANDLE) wakeup);
}
#else
#include "glib-unix.h"
#include <fcntl.h>
#if defined (HAVE_EVENTFD)
#include <sys/eventfd.h>
#endif
struct _GWakeup
{
gint fds[2];
};
/**
* g_wakeup_new:
*
* Creates a new #GWakeup.
*
* You should use g_wakeup_free() to free it when you are done.
*
* Returns: a new #GWakeup
*
* Since: 2.30
**/
GWakeup *
g_wakeup_new (void)
{
GError *error = NULL;
GWakeup *wakeup;
wakeup = g_slice_new (GWakeup);
/* try eventfd first, if we think we can */
#if defined (HAVE_EVENTFD)
#ifndef TEST_EVENTFD_FALLBACK
wakeup->fds[0] = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
#else
wakeup->fds[0] = -1;
#endif
if (wakeup->fds[0] != -1)
{
wakeup->fds[1] = -1;
return wakeup;
}
/* for any failure, try a pipe instead */
#endif
if (!g_unix_open_pipe (wakeup->fds, FD_CLOEXEC, &error))
g_error ("Creating pipes for GWakeup: %s\n", error->message);
if (!g_unix_set_fd_nonblocking (wakeup->fds[0], TRUE, &error) ||
!g_unix_set_fd_nonblocking (wakeup->fds[1], TRUE, &error))
g_error ("Set pipes non-blocking for GWakeup: %s\n", error->message);
return wakeup;
}
/**
* g_wakeup_get_pollfd:
* @wakeup: a #GWakeup
* @poll_fd: a #GPollFD
*
* Prepares a @poll_fd such that polling on it will succeed when
* g_wakeup_signal() has been called on @wakeup.
*
* @poll_fd is valid until @wakeup is freed.
*
* Since: 2.30
**/
void
g_wakeup_get_pollfd (GWakeup *wakeup,
GPollFD *poll_fd)
{
poll_fd->fd = wakeup->fds[0];
poll_fd->events = G_IO_IN;
}
/**
* g_wakeup_acknowledge:
* @wakeup: a #GWakeup
*
* Acknowledges receipt of a wakeup signal on @wakeup.
*
* You must call this after @wakeup polls as ready. If not, it will
* continue to poll as ready until you do so.
*
* If you call this function and @wakeup is not signaled, nothing
* happens.
*
* Since: 2.30
**/
void
g_wakeup_acknowledge (GWakeup *wakeup)
{
char buffer[16];
/* read until it is empty */
while (read (wakeup->fds[0], buffer, sizeof buffer) == sizeof buffer);
}
/**
* g_wakeup_signal:
* @wakeup: a #GWakeup
*
* Signals @wakeup.
*
* Any future (or present) polling on the #GPollFD returned by
* g_wakeup_get_pollfd() will immediately succeed until such a time as
* g_wakeup_acknowledge() is called.
*
* This function is safe to call from a UNIX signal handler.
*
* Since: 2.30
**/
void
g_wakeup_signal (GWakeup *wakeup)
{
guint64 one = 1;
if (wakeup->fds[1] == -1)
write (wakeup->fds[0], &one, sizeof one);
else
write (wakeup->fds[1], &one, 1);
}
/**
* g_wakeup_free:
* @wakeup: a #GWakeup
*
* Frees @wakeup.
*
* You must not currently be polling on the #GPollFD returned by
* g_wakeup_get_pollfd(), or the result is undefined.
**/
void
g_wakeup_free (GWakeup *wakeup)
{
close (wakeup->fds[0]);
if (wakeup->fds[1] != -1)
close (wakeup->fds[1]);
g_slice_free (GWakeup, wakeup);
}
#endif /* !_WIN32 */

35
glib/gwakeup.h Normal file
View File

@ -0,0 +1,35 @@
/*
* Copyright © 2011 Canonical Limited
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the licence, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*
* Author: Ryan Lortie <desrt@desrt.ca>
*/
#ifndef __G_WAKEUP_H__
#define __G_WAKEUP_H__
typedef struct _GWakeup GWakeup;
G_GNUC_INTERNAL GWakeup * g_wakeup_new (void);
G_GNUC_INTERNAL void g_wakeup_free (GWakeup *wakeup);
G_GNUC_INTERNAL void g_wakeup_get_pollfd (GWakeup *wakeup,
GPollFD *poll_fd);
G_GNUC_INTERNAL void g_wakeup_signal (GWakeup *wakeup);
G_GNUC_INTERNAL void g_wakeup_acknowledge (GWakeup *wakeup);
#endif

View File

@ -3,6 +3,8 @@
642026
642026-ec
atomic
gwakeup
gwakeup-fallback
spawn-multithreaded
spawn-singlethread
test-spawn-echo

View File

@ -43,3 +43,14 @@ spawn_multithreaded_LDADD = $(progs_ldadd) $(top_builddir)/gthread/libgthread
TEST_PROGS += spawn-singlethread
spawn_singlethread_SOURCES = spawn-singlethread.c
spawn_singlethread_LDADD = $(progs_ldadd) $(top_builddir)/gthread/libgthread-2.0.la
TEST_PROGS += gwakeup
gwakeup_SOURCES = gwakeuptest.c ../../glib/gwakeup.c
gwakeup_LDADD = $(progs_ldadd) $(top_builddir)/gthread/libgthread-2.0.la
if HAVE_EVENTFD
TEST_PROGS += gwakeup-fallback
gwakeup_fallback_SOURCES = gwakeuptest.c ../../glib/gwakeup.c
gwakeup_fallback_CFLAGS = $(AM_CFLAGS) -DTEST_EVENTFD_FALLBACK
gwakeup_fallback_LDADD = $(progs_ldadd) $(top_builddir)/gthread/libgthread-2.0.la
endif

276
gthread/tests/gwakeuptest.c Normal file
View File

@ -0,0 +1,276 @@
#include <unistd.h>
#include <glib.h>
#include <glib/gwakeup.h>
#ifdef _WIN32
void alarm (int sec) { }
#endif
static gboolean
check_signaled (GWakeup *wakeup)
{
GPollFD fd;
g_wakeup_get_pollfd (wakeup, &fd);
return g_poll (&fd, 1, 0);
}
static void
wait_for_signaled (GWakeup *wakeup)
{
GPollFD fd;
g_wakeup_get_pollfd (wakeup, &fd);
g_poll (&fd, 1, -1);
}
static void
test_semantics (void)
{
GWakeup *wakeup;
gint i;
/* prevent the test from deadlocking */
alarm (30);
wakeup = g_wakeup_new ();
g_assert (!check_signaled (wakeup));
g_wakeup_signal (wakeup);
g_assert (check_signaled (wakeup));
g_wakeup_acknowledge (wakeup);
g_assert (!check_signaled (wakeup));
g_wakeup_free (wakeup);
/* free unused */
wakeup = g_wakeup_new ();
g_wakeup_free (wakeup);
/* free while signaled */
wakeup = g_wakeup_new ();
g_wakeup_signal (wakeup);
g_wakeup_free (wakeup);
/* ensure excessive signalling doesn't deadlock */
wakeup = g_wakeup_new ();
for (i = 0; i < 1000000; i++)
g_wakeup_signal (wakeup);
g_assert (check_signaled (wakeup));
/* ensure a single acknowledgement is sufficient */
g_wakeup_acknowledge (wakeup);
g_assert (!check_signaled (wakeup));
g_wakeup_free (wakeup);
/* cancel the alarm */
alarm (0);
}
struct token
{
gpointer owner;
gint ttl;
};
struct context
{
GSList *pending_tokens;
GStaticMutex lock;
GWakeup *wakeup;
gboolean quit;
};
#define NUM_THREADS 50
#define NUM_TOKENS 5
#define TOKEN_TTL 100000
static struct context contexts[NUM_THREADS];
static GThread *threads[NUM_THREADS];
static GWakeup *last_token_wakeup;
static volatile gint tokens_alive;
static void
context_init (struct context *ctx)
{
GStaticMutex lock = G_STATIC_MUTEX_INIT;
ctx->pending_tokens = NULL;
ctx->lock = lock;
ctx->wakeup = g_wakeup_new ();
ctx->quit = FALSE;
}
static void
context_clear (struct context *ctx)
{
g_assert (ctx->pending_tokens == NULL);
g_assert (ctx->quit);
g_wakeup_free (ctx->wakeup);
}
static void
context_quit (struct context *ctx)
{
ctx->quit = TRUE;
g_wakeup_signal (ctx->wakeup);
}
static struct token *
context_pop_token (struct context *ctx)
{
struct token *token;
g_static_mutex_lock (&ctx->lock);
token = ctx->pending_tokens->data;
ctx->pending_tokens = g_slist_remove_link (ctx->pending_tokens,
ctx->pending_tokens);
g_static_mutex_unlock (&ctx->lock);
return token;
}
static void
context_push_token (struct context *ctx,
struct token *token)
{
g_assert (token->owner == ctx);
g_static_mutex_lock (&ctx->lock);
ctx->pending_tokens = g_slist_prepend (ctx->pending_tokens, token);
g_static_mutex_unlock (&ctx->lock);
g_wakeup_signal (ctx->wakeup);
}
static void
dispatch_token (struct token *token)
{
if (token->ttl > 0)
{
struct context *ctx;
gint next_ctx;
next_ctx = g_test_rand_int_range (0, NUM_THREADS);
ctx = &contexts[next_ctx];
token->owner = ctx;
token->ttl--;
context_push_token (ctx, token);
}
else
{
g_slice_free (struct token, token);
if (g_atomic_int_dec_and_test (&tokens_alive))
g_wakeup_signal (last_token_wakeup);
}
}
static struct token *
token_new (int ttl)
{
struct token *token;
token = g_slice_new (struct token);
token->ttl = ttl;
g_atomic_int_inc (&tokens_alive);
return token;
}
static gpointer
thread_func (gpointer data)
{
struct context *ctx = data;
while (!ctx->quit)
{
wait_for_signaled (ctx->wakeup);
g_wakeup_acknowledge (ctx->wakeup);
while (ctx->pending_tokens)
{
struct token *token;
token = context_pop_token (ctx);
g_assert (token->owner == ctx);
dispatch_token (token);
}
}
return NULL;
}
static void
test_threaded (void)
{
gint i;
/* make sure we don't block forever */
alarm (30);
/* simple mainloop test based on GWakeup.
*
* create a bunch of contexts and a thread to 'run' each one. create
* some tokens and randomly pass them between the threads, until the
* TTL on each token is zero.
*
* when no tokens are left, signal that we are done. the mainthread
* will then signal each worker thread to exit and join them to make
* sure that works.
*/
last_token_wakeup = g_wakeup_new ();
/* create contexts, assign to threads */
for (i = 0; i < NUM_THREADS; i++)
{
context_init (&contexts[i]);
threads[i] = g_thread_create (thread_func, &contexts[i], TRUE, NULL);
}
/* dispatch tokens */
for (i = 0; i < NUM_TOKENS; i++)
dispatch_token (token_new (TOKEN_TTL));
/* wait until all tokens are gone */
wait_for_signaled (last_token_wakeup);
/* ask threads to quit, join them, cleanup */
for (i = 0; i < NUM_THREADS; i++)
{
context_quit (&contexts[i]);
g_thread_join (threads[i]);
context_clear (&contexts[i]);
}
g_wakeup_free (last_token_wakeup);
/* cancel alarm */
alarm (0);
}
int
main (int argc, char **argv)
{
g_thread_init (NULL);
g_test_init (&argc, &argv, NULL);
#ifdef TEST_EVENTFD_FALLBACK
#define TESTNAME_SUFFIX "-fallback"
#else
#define TESTNAME_SUFFIX
#endif
g_test_add_func ("/gwakeup/semantics" TESTNAME_SUFFIX, test_semantics);
g_test_add_func ("/gwakeup/threaded" TESTNAME_SUFFIX, test_threaded);
return g_test_run ();
}