- Added new API g_thread_pool_get_idle_time() and

* docs/reference/glib/glib-sections.txt:
* glib/glib.symbols:
* glib/gthreadpool.[ch]:
- Added new API g_thread_pool_get_idle_time() and
g_thread_pool_set_idle_time(). (#324228).

* tests/threadpool-test.c:
- Updated test case to do thread pool sorting, thread pool with
no sorting and a thread pool with idle thread timeouts.
This commit is contained in:
Martyn James Russell 2006-01-03 15:09:52 +00:00
parent be5d72cba0
commit 941faa1ca9
8 changed files with 292 additions and 35 deletions

View File

@ -1,3 +1,15 @@
2006-01-03 Martyn Russell <martyn@imendio.com>
* docs/reference/glib/glib-sections.txt:
* glib/glib.symbols:
* glib/gthreadpool.[ch]:
- Added new API g_thread_pool_get_idle_time() and
g_thread_pool_set_idle_time(). (#324228).
* tests/threadpool-test.c:
- Updated test case to do thread pool sorting, thread pool with
no sorting and a thread pool with idle thread timeouts.
2006-01-03 Matthias Clasen <mclasen@redhat.com>
* glib/gmain.h: Add new functions here, too.

View File

@ -1,3 +1,15 @@
2006-01-03 Martyn Russell <martyn@imendio.com>
* docs/reference/glib/glib-sections.txt:
* glib/glib.symbols:
* glib/gthreadpool.[ch]:
- Added new API g_thread_pool_get_idle_time() and
g_thread_pool_set_idle_time(). (#324228).
* tests/threadpool-test.c:
- Updated test case to do thread pool sorting, thread pool with
no sorting and a thread pool with idle thread timeouts.
2006-01-03 Matthias Clasen <mclasen@redhat.com>
* glib/gmain.h: Add new functions here, too.

View File

@ -1,3 +1,15 @@
2006-01-03 Martyn Russell <martyn@imendio.com>
* docs/reference/glib/glib-sections.txt:
* glib/glib.symbols:
* glib/gthreadpool.[ch]:
- Added new API g_thread_pool_get_idle_time() and
g_thread_pool_set_idle_time(). (#324228).
* tests/threadpool-test.c:
- Updated test case to do thread pool sorting, thread pool with
no sorting and a thread pool with idle thread timeouts.
2006-01-03 Matthias Clasen <mclasen@redhat.com>
* glib/gmain.h: Add new functions here, too.

View File

@ -639,6 +639,8 @@ g_thread_pool_get_max_unused_threads
g_thread_pool_get_num_unused_threads
g_thread_pool_stop_unused_threads
g_thread_pool_set_sort_function
g_thread_pool_set_max_idle_time
g_thread_pool_get_max_idle_time
</SECTION>
<SECTION>

View File

@ -1087,12 +1087,14 @@ g_thread_foreach
g_thread_pool_free
g_thread_pool_get_max_threads
g_thread_pool_get_max_unused_threads
g_thread_pool_get_max_idle_time
g_thread_pool_get_num_threads
g_thread_pool_get_num_unused_threads
g_thread_pool_new
g_thread_pool_push
g_thread_pool_set_max_threads
g_thread_pool_set_max_unused_threads
g_thread_pool_set_max_idle_time
g_thread_pool_stop_unused_threads
g_thread_pool_unprocessed
g_thread_pool_set_sort_function

View File

@ -29,6 +29,7 @@
#include "glib.h"
#include "galias.h"
#define debug(...) /* g_printerr (__VA_ARGS__) */
typedef struct _GRealThreadPool GRealThreadPool;
@ -54,7 +55,8 @@ static const gpointer stop_this_thread_marker = (gpointer) &g_thread_pool_new;
static GAsyncQueue *unused_thread_queue;
static gint unused_threads = 0;
static gint max_unused_threads = 0;
G_LOCK_DEFINE_STATIC (unused_threads);
static guint max_idle_time = 0;
G_LOCK_DEFINE_STATIC (settings);
static GMutex *inform_mutex = NULL;
static GCond *inform_cond = NULL;
@ -92,10 +94,12 @@ g_thread_pool_thread_proxy (gpointer data)
GRealThreadPool *pool = data;
gboolean watcher = FALSE;
debug("pool:0x%.8x entering proxy ...\n", (guint)pool);
g_async_queue_lock (pool->queue);
while (TRUE)
{
gpointer task;
gpointer task = NULL;
gboolean goto_global_pool = !pool->pool.exclusive;
gint len = g_async_queue_length_unlocked (pool->queue);
@ -111,10 +115,37 @@ g_thread_pool_thread_proxy (gpointer data)
GTimeVal end_time;
g_get_current_time (&end_time);
g_time_val_add (&end_time, G_USEC_PER_SEC / 2); /* 1/2 second */
debug("pool:0x%.8x waiting 1/2 second to pop next item "
"in queue (%d running, %d unprocessed) ...\n",
(guint)pool,
pool->num_threads,
g_async_queue_length_unlocked (pool->queue));
task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
}
else if (g_thread_pool_get_max_idle_time() > 0)
{
/* We always give a maximum time to pop the next task so
* we know that when we evaluate task further down, that
* it has had the maximum time to get a new task and it
* can die */
GTimeVal end_time;
g_get_current_time (&end_time);
debug("pool:0x%.8x waiting %d ms max to pop next item in "
"queue (%d running, %d unprocessed) or exiting ...\n",
(guint)pool,
g_thread_pool_get_max_idle_time (),
pool->num_threads,
g_async_queue_length_unlocked (pool->queue));
g_time_val_add (&end_time, g_thread_pool_get_max_idle_time () * 1000);
task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
}
else
task = g_async_queue_pop_unlocked (pool->queue);
{
task = g_async_queue_pop_unlocked (pool->queue);
debug("pool:0x%.8x new task:0x%.8x poped from pool queue ...\n",
(guint)pool, (guint)task);
}
if (task)
{
@ -125,19 +156,49 @@ g_thread_pool_thread_proxy (gpointer data)
* the global pool and just hand the data further to
* the next one waiting in the queue */
{
debug("pool:0x%.8x, task:0x%.8x we have too many threads "
"and max is set, pushing task into queue ...\n",
(guint)pool, (guint)task);
g_thread_pool_queue_push_unlocked (pool, task);
goto_global_pool = TRUE;
}
else if (pool->running || !pool->immediate)
{
g_async_queue_unlock (pool->queue);
debug("pool:0x%.8x, task:0x%.8x calling func ...\n",
(guint)pool, (guint)task);
pool->pool.func (task, pool->pool.user_data);
g_async_queue_lock (pool->queue);
}
}
else if (g_thread_pool_get_max_idle_time() > 0)
{
G_LOCK (settings);
if (pool->num_threads > max_unused_threads) {
G_UNLOCK (settings);
pool->num_threads--;
debug("pool:0x%.8x queue timed pop has no tasks waiting, "
"so stopping thread (%d running, %d unprocessed) ...\n",
(guint)pool,
pool->num_threads,
g_async_queue_length_unlocked (pool->queue));
g_async_queue_unlock (pool->queue);
return NULL;
}
G_UNLOCK (settings);
}
len = g_async_queue_length_unlocked (pool->queue);
}
debug("pool:0x%.8x, len:%d, watcher:%s, exclusive:%s, should run:%s\n",
(guint)pool,
len,
watcher ? "true" : "false",
pool->pool.exclusive ? "true" : "false",
g_thread_should_run (pool, len) ? "true" : "false");
if (!g_thread_should_run (pool, len))
{
g_cond_broadcast (inform_cond);
@ -148,6 +209,11 @@ g_thread_pool_thread_proxy (gpointer data)
/* At this pool there are no threads waiting, but tasks are. */
goto_global_pool = FALSE;
}
else if (len < 1 && g_thread_pool_get_max_idle_time () > 0)
{
goto_global_pool = FALSE;
watcher = FALSE;
}
else if (len == 0 && !watcher && !pool->pool.exclusive)
{
/* Here neither threads nor tasks are queued and we didn't
@ -160,6 +226,7 @@ g_thread_pool_thread_proxy (gpointer data)
if (goto_global_pool)
{
debug("pool:0x%.8x, now in the global pool\n", (guint)pool);
pool->num_threads--;
if (!pool->running && !pool->waiting)
@ -182,23 +249,27 @@ g_thread_pool_thread_proxy (gpointer data)
g_async_queue_lock (unused_thread_queue);
G_LOCK (unused_threads);
G_LOCK (settings);
if ((unused_threads >= max_unused_threads &&
max_unused_threads != -1))
{
G_UNLOCK (unused_threads);
G_UNLOCK (settings);
g_async_queue_unlock (unused_thread_queue);
debug("pool:0x%.8x stopping thread (%d running, %d unprocessed) ...\n",
(guint)pool,
pool->num_threads,
g_async_queue_length_unlocked (pool->queue));
/* Stop this thread */
return NULL;
}
unused_threads++;
G_UNLOCK (unused_threads);
G_UNLOCK (settings);
pool = g_async_queue_pop_unlocked (unused_thread_queue);
G_LOCK (unused_threads);
G_LOCK (settings);
unused_threads--;
G_UNLOCK (unused_threads);
G_UNLOCK (settings);
g_async_queue_unlock (unused_thread_queue);
@ -252,6 +323,8 @@ g_thread_pool_start_thread (GRealThreadPool *pool,
/* See comment in g_thread_pool_thread_proxy as to why this is done
* here and not there */
pool->num_threads++;
debug("pool:0x%.8x thread created, (running:%d)\n",
(guint)pool, pool->num_threads);
}
/**
@ -637,7 +710,7 @@ g_thread_pool_set_max_unused_threads (gint max_threads)
{
g_return_if_fail (max_threads >= -1);
G_LOCK (unused_threads);
G_LOCK (settings);
max_unused_threads = max_threads;
@ -652,7 +725,7 @@ g_thread_pool_set_max_unused_threads (gint max_threads)
g_async_queue_unlock (unused_thread_queue);
}
G_UNLOCK (unused_threads);
G_UNLOCK (settings);
}
/**
@ -667,9 +740,9 @@ g_thread_pool_get_max_unused_threads (void)
{
gint retval;
G_LOCK (unused_threads);
G_LOCK (settings);
retval = max_unused_threads;
G_UNLOCK (unused_threads);
G_UNLOCK (settings);
return retval;
}
@ -681,13 +754,14 @@ g_thread_pool_get_max_unused_threads (void)
*
* Return value: the number of currently unused threads
**/
guint g_thread_pool_get_num_unused_threads (void)
guint
g_thread_pool_get_num_unused_threads (void)
{
guint retval;
G_LOCK (unused_threads);
G_LOCK (settings);
retval = unused_threads;
G_UNLOCK (unused_threads);
G_UNLOCK (settings);
return retval;
}
@ -699,7 +773,8 @@ guint g_thread_pool_get_num_unused_threads (void)
* maximal number of unused threads. This function can be used to
* regularly stop all unused threads e.g. from g_timeout_add().
**/
void g_thread_pool_stop_unused_threads (void)
void
g_thread_pool_stop_unused_threads (void)
{
guint oldval = g_thread_pool_get_max_unused_threads ();
g_thread_pool_set_max_unused_threads (0);
@ -723,9 +798,10 @@ void g_thread_pool_stop_unused_threads (void)
*
* Since: 2.10
**/
void g_thread_pool_set_sort_function (GThreadPool *pool,
GCompareDataFunc func,
gpointer user_data)
void
g_thread_pool_set_sort_function (GThreadPool *pool,
GCompareDataFunc func,
gpointer user_data)
{
GRealThreadPool *real = (GRealThreadPool*) pool;
@ -745,5 +821,57 @@ void g_thread_pool_set_sort_function (GThreadPool *pool,
g_async_queue_unlock (real->queue);
}
/**
* g_thread_pool_set_max_idle_time:
* @interval: the maximum @interval (1/1000ths of a second) a thread
* can be idle.
*
* This function will set the maximum @interval that a thread waiting
* in the pool for new tasks can be idle for before being
* stopped. This function is similar to calling
* g_thread_pool_stop_unused_threads() on a regular timeout, except,
* this is done on a per thread basis.
*
* By setting @interval to 0, idle threads will not be stopped.
*
* This function makes use of g_async_queue_timed_pop () using
* @interval.
*
* Since: 2.10
**/
void
g_thread_pool_set_max_idle_time (guint interval)
{
G_LOCK (settings);
max_idle_time = interval;
G_UNLOCK (settings);
}
/**
* g_thread_pool_get_max_idle_time:
*
* This function will return the maximum @interval that a thread will
* wait in the thread pool for new tasks before being stopped.
*
* If this function returns 0, threads waiting in the thread pool for
* new work are not stopped.
*
* Return value: the maximum @interval to wait for new tasks in the
* thread pool before stopping the thread (1/1000ths of a second).
*
* Since: 2.10
**/
guint
g_thread_pool_get_max_idle_time (void)
{
guint retval;
G_LOCK (settings);
retval = max_idle_time;
G_UNLOCK (settings);
return retval;
}
#define __G_THREADPOOL_C__
#include "galiasdef.c"

View File

@ -101,6 +101,10 @@ void g_thread_pool_set_sort_function (GThreadPool *pool,
GCompareDataFunc func,
gpointer user_data);
/* Set maximum time a thread can be idle in the pool before it is stopped */
void g_thread_pool_set_max_idle_time (guint interval);
guint g_thread_pool_get_max_idle_time (void);
G_END_DECLS
#endif /* __G_THREADPOOL_H__ */

View File

@ -5,13 +5,17 @@
#include <glib.h>
#define d(x) x
#define debug(...) g_printerr (__VA_ARGS__)
#define RUNS 100
#define WAIT 5 /* seconds */
#define MAX_THREADS 10
#define MAX_UNUSED_THREADS 2
/* if > 0 the test will run continously (since the test ends when
* thread count is 0), if -1 it means no limit to unused threads
* if 0 then no unused threads are possible */
#define MAX_UNUSED_THREADS -1
G_LOCK_DEFINE_STATIC (thread_counter_pools);
@ -27,6 +31,7 @@ G_LOCK_DEFINE_STATIC (thread_counter_sort);
static gulong sort_thread_counter = 0;
static GThreadPool *idle_pool = NULL;
static GMainLoop *main_loop = NULL;
@ -38,7 +43,7 @@ test_thread_pools_entry_func (gpointer data, gpointer user_data)
id = GPOINTER_TO_UINT (data);
d(g_print ("[pool] ---> [%3.3d] entered thread\n", id));
debug("[pool] ---> [%3.3d] entered thread\n", id);
G_LOCK (thread_counter_pools);
abs_thread_counter++;
@ -104,8 +109,8 @@ test_thread_sort_entry_func (gpointer data, gpointer user_data)
thread_id = GPOINTER_TO_UINT (data);
is_sorted = GPOINTER_TO_INT (user_data);
d(g_print ("%s ---> entered thread:%2.2d, last thread:%2.2d\n",
is_sorted ? "[ sorted]" : "[unsorted]", thread_id, last_thread_id));
debug("%s ---> entered thread:%2.2d, last thread:%2.2d\n",
is_sorted ? "[ sorted]" : "[unsorted]", thread_id, last_thread_id);
if (is_sorted) {
static gboolean last_failed = FALSE;
@ -163,6 +168,74 @@ test_thread_sort (gboolean sort)
g_assert (g_thread_pool_get_num_threads (pool) == g_thread_pool_get_max_threads (pool));
}
static void
test_thread_idle_time_entry_func (gpointer data, gpointer user_data)
{
guint thread_id;
thread_id = GPOINTER_TO_UINT (data);
debug("[idle] ---> entered thread:%2.2d\n",
thread_id);
g_usleep (WAIT * 1000);
debug("[idle] <--- exiting thread:%2.2d\n",
thread_id);
}
static gboolean
test_thread_idle_timeout (gpointer data)
{
guint interval;
gint i;
interval = GPOINTER_TO_UINT (data);
for (i = 0; i < 2; i++) {
g_thread_pool_push (idle_pool, GUINT_TO_POINTER (100 + i), NULL);
debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n",
100 + i,
g_thread_pool_get_num_threads (idle_pool),
g_thread_pool_unprocessed (idle_pool));
}
return FALSE;
}
static void
test_thread_idle_time (guint idle_time)
{
guint limit = 50;
guint interval = 10000;
gint i;
idle_pool = g_thread_pool_new (test_thread_idle_time_entry_func,
NULL,
MAX_THREADS,
FALSE,
NULL);
g_thread_pool_set_max_unused_threads (MAX_UNUSED_THREADS);
g_thread_pool_set_max_idle_time (interval);
g_assert (g_thread_pool_get_max_unused_threads () == MAX_UNUSED_THREADS);
g_assert (g_thread_pool_get_max_idle_time () == interval);
for (i = 0; i < limit; i++) {
g_thread_pool_push (idle_pool, GUINT_TO_POINTER (i), NULL);
debug("[idle] ===> pushed new thread with id:%d, number of threads:%d, unprocessed:%d\n",
i,
g_thread_pool_get_num_threads (idle_pool),
g_thread_pool_unprocessed (idle_pool));
}
g_timeout_add ((interval - 1000),
test_thread_idle_timeout,
GUINT_TO_POINTER (interval));
}
static gboolean
test_check_start_and_stop (gpointer user_data)
{
@ -173,7 +246,7 @@ test_check_start_and_stop (gpointer user_data)
if (test_number == 0) {
run_next = TRUE;
d(g_print ("***** RUNNING TEST %2.2d *****\n", test_number));
debug("***** RUNNING TEST %2.2d *****\n", test_number);
}
if (run_next) {
@ -189,8 +262,11 @@ test_check_start_and_stop (gpointer user_data)
case 3:
test_thread_sort (TRUE);
break;
case 4:
test_thread_idle_time (5);
break;
default:
d(g_print ("***** END OF TESTS *****\n"));
debug("***** END OF TESTS *****\n");
g_main_loop_quit (main_loop);
continue_timeout = FALSE;
break;
@ -203,19 +279,28 @@ test_check_start_and_stop (gpointer user_data)
if (test_number == 1) {
G_LOCK (thread_counter_pools);
quit &= running_thread_counter <= 0;
d(g_print ("***** POOL RUNNING THREAD COUNT:%ld\n",
running_thread_counter));
debug("***** POOL RUNNING THREAD COUNT:%ld\n",
running_thread_counter);
G_UNLOCK (thread_counter_pools);
}
if (test_number == 2 || test_number == 3) {
G_LOCK (thread_counter_sort);
quit &= sort_thread_counter <= 0;
d(g_print ("***** POOL SORT THREAD COUNT:%ld\n",
sort_thread_counter));
debug("***** POOL SORT THREAD COUNT:%ld\n",
sort_thread_counter);
G_UNLOCK (thread_counter_sort);
}
if (test_number == 4) {
guint idle;
idle = g_thread_pool_get_num_threads (idle_pool);
quit &= idle < 1;
debug("***** POOL IDLE THREAD COUNT:%d, UNPROCESSED JOBS:%d\n",
idle, g_thread_pool_unprocessed (idle_pool));
}
if (quit) {
run_next = TRUE;
}
@ -232,7 +317,7 @@ main (int argc, char *argv[])
#if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE)
g_thread_init (NULL);
d(g_print ("Starting... (in one second)\n"));
debug("Starting... (in one second)\n");
g_timeout_add (1000, test_check_start_and_stop, NULL);
main_loop = g_main_loop_new (NULL, FALSE);