diff --git a/ChangeLog b/ChangeLog index f35cc7f1d..ac0646909 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,14 @@ +2005-12-05 Martyn Russell + + * docs/reference/glib/glib-sections.txt: + * glib/gasyncqueue.[ch]: + - Added support for sorting async queues by with _push_sorted(), + _push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047). + + * tests/Makefile.am: + * tests/asyncqueue-test.c: + - Added test case for gasyncqueue.c + Mon Dec 5 15:53:20 2005 Tim Janik * glib/gslice.c: implement chain walking for arbitrary ->next pointer diff --git a/ChangeLog.pre-2-10 b/ChangeLog.pre-2-10 index f35cc7f1d..ac0646909 100644 --- a/ChangeLog.pre-2-10 +++ b/ChangeLog.pre-2-10 @@ -1,3 +1,14 @@ +2005-12-05 Martyn Russell + + * docs/reference/glib/glib-sections.txt: + * glib/gasyncqueue.[ch]: + - Added support for sorting async queues by with _push_sorted(), + _push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047). + + * tests/Makefile.am: + * tests/asyncqueue-test.c: + - Added test case for gasyncqueue.c + Mon Dec 5 15:53:20 2005 Tim Janik * glib/gslice.c: implement chain walking for arbitrary ->next pointer diff --git a/ChangeLog.pre-2-12 b/ChangeLog.pre-2-12 index f35cc7f1d..ac0646909 100644 --- a/ChangeLog.pre-2-12 +++ b/ChangeLog.pre-2-12 @@ -1,3 +1,14 @@ +2005-12-05 Martyn Russell + + * docs/reference/glib/glib-sections.txt: + * glib/gasyncqueue.[ch]: + - Added support for sorting async queues by with _push_sorted(), + _push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047). + + * tests/Makefile.am: + * tests/asyncqueue-test.c: + - Added test case for gasyncqueue.c + Mon Dec 5 15:53:20 2005 Tim Janik * glib/gslice.c: implement chain walking for arbitrary ->next pointer diff --git a/docs/reference/glib/glib-sections.txt b/docs/reference/glib/glib-sections.txt index 0574188b2..23b030814 100644 --- a/docs/reference/glib/glib-sections.txt +++ b/docs/reference/glib/glib-sections.txt @@ -646,10 +646,12 @@ g_async_queue_new g_async_queue_ref g_async_queue_unref g_async_queue_push +g_async_queue_push_sorted g_async_queue_pop g_async_queue_try_pop g_async_queue_timed_pop g_async_queue_length +g_async_queue_sort g_async_queue_lock @@ -657,10 +659,12 @@ g_async_queue_unlock g_async_queue_ref_unlocked g_async_queue_unref_and_unlock g_async_queue_push_unlocked +g_async_queue_push_sorted_unlocked g_async_queue_pop_unlocked g_async_queue_try_pop_unlocked g_async_queue_timed_pop_unlocked g_async_queue_length_unlocked +g_async_queue_sort_unlocked
diff --git a/glib/gasyncqueue.c b/glib/gasyncqueue.c index 276d3cb78..2cd252f92 100644 --- a/glib/gasyncqueue.c +++ b/glib/gasyncqueue.c @@ -218,6 +218,89 @@ g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data) g_cond_signal (queue->cond); } +/** + * g_async_queue_push_sorted: + * @queue: a #GAsyncQueue + * @data: the @data to push into the @queue + * @func: the #GCompareDataFunc is used to sort @queue. This function + * is passed two elements of the @queue. The function should return + * 0 if they are equal, a negative value if the first element + * should be higher in the @queue or a positive value if the first + * element should be lower in the @queue than the second element. + * @user_data: user data passed to @func. + * + * Inserts @data into @queue using @func to determine the new + * position. + * + * This function requires that the @queue is sorted before pushing on + * new elements. + * + * This function will lock @queue before it sorts the queue and unlock + * it when it is finished. + * + * For an example of @func see g_async_queue_sort(). + * + * Since: 2.10 + **/ +void +g_async_queue_push_sorted (GAsyncQueue *queue, + gpointer data, + GCompareDataFunc func, + gpointer user_data) +{ + g_return_if_fail (queue != NULL); + + g_mutex_lock (queue->mutex); + g_async_queue_push_sorted_unlocked (queue, data, func, user_data); + g_mutex_unlock (queue->mutex); +} + +/** + * g_async_queue_push_sorted_unlocked: + * @queue: a #GAsyncQueue + * @data: the @data to push into the @queue + * @func: the #GCompareDataFunc is used to sort @queue. This function + * is passed two elements of the @queue. The function should return + * 0 if they are equal, a negative value if the first element + * should be higher in the @queue or a positive value if the first + * element should be lower in the @queue than the second element. + * @user_data: user data passed to @func. + * + * Inserts @data into @queue using @func to determine the new + * position. + * + * This function requires that the @queue is sorted before pushing on + * new elements. + * + * This function is called while holding the @queue's lock. + * + * For an example of @func see g_async_queue_sort(). + * + * Since: 2.10 + **/ +void +g_async_queue_push_sorted_unlocked (GAsyncQueue *queue, + gpointer data, + GCompareDataFunc func, + gpointer user_data) +{ + GQueue *q; + GList *list; + + g_return_if_fail (queue != NULL); + + q = queue->queue; + + list = q->head; + while (list && func (list->data, data, user_data) < 0) + list = list->next; + + if (list) + g_queue_insert_before (q, list, data); + else + g_queue_push_tail (q, data); +} + static gpointer g_async_queue_pop_intern_unlocked (GAsyncQueue* queue, gboolean try, GTimeVal *end_time) @@ -452,5 +535,82 @@ g_async_queue_length_unlocked (GAsyncQueue* queue) return queue->queue->length - queue->waiting_threads; } +/** + * g_async_queue_sort: + * @queue: a #GAsyncQueue + * @func: the #GCompareDataFunc is used to sort @queue. This + * function is passed two elements of the @queue. The function + * should return 0 if they are equal, a negative value if the + * first element should be higher in the @queue or a positive + * value if the first element should be lower in the @queue than + * the second element. + * @user_data: user data passed to @func + * + * Sorts @queue using @func. + * + * This function will lock @queue before it sorts the queue and unlock + * it when it is finished. + * + * If you were sorting a list of priority numbers to make sure the + * lowest priority would be at the top of the queue, you could use: + * + * gint id1; + * gint id2; + * + * id1 = GPOINTER_TO_INT (element1); + * id2 = GPOINTER_TO_INT (element2); + * + * return (id2 - id1); + * + * + * Since: 2.10 + **/ +void +g_async_queue_sort (GAsyncQueue *queue, + GCompareDataFunc func, + gpointer user_data) +{ + g_return_if_fail (queue != NULL); + g_return_if_fail (func != NULL); + + g_mutex_lock (queue->mutex); + g_async_queue_sort_unlocked (queue, func, user_data); + g_mutex_unlock (queue->mutex); +} + +/** + * g_async_queue_sort_unlocked: + * @queue: a #GAsyncQueue + * @func: the #GCompareDataFunc is used to sort @queue. This + * function is passed two elements of the @queue. The function + * should return 0 if they are equal, a negative value if the + * first element should be higher in the @queue or a positive + * value if the first element should be lower in the @queue than + * the second element. + * @user_data: user data passed to @func + * + * Sorts @queue using @func. + * + * This function is called while holding the @queue's lock. + * + * Since: 2.10 + **/ +void +g_async_queue_sort_unlocked (GAsyncQueue *queue, + GCompareDataFunc func, + gpointer user_data) +{ + GQueue *q; + + g_return_if_fail (queue != NULL); + g_return_if_fail (func != NULL); + + q = queue->queue; + + q->head = g_list_sort_with_data (q->head, func, user_data); + q->tail = g_list_last (q->head); +} + + #define __G_ASYNCQUEUE_C__ #include "galiasdef.c" diff --git a/glib/gasyncqueue.h b/glib/gasyncqueue.h index e26add9b7..ce11e15ad 100644 --- a/glib/gasyncqueue.h +++ b/glib/gasyncqueue.h @@ -37,45 +37,66 @@ typedef struct _GAsyncQueue GAsyncQueue; */ /* Get a new GAsyncQueue with the ref_count 1 */ -GAsyncQueue* g_async_queue_new (void); +GAsyncQueue* g_async_queue_new (void); /* Lock and unlock a GAsyncQueue. All functions lock the queue for * themselves, but in certain cirumstances you want to hold the lock longer, * thus you lock the queue, call the *_unlocked functions and unlock it again. */ -void g_async_queue_lock (GAsyncQueue *queue); -void g_async_queue_unlock (GAsyncQueue *queue); +void g_async_queue_lock (GAsyncQueue *queue); +void g_async_queue_unlock (GAsyncQueue *queue); + + /* Ref and unref the GAsyncQueue. */ -GAsyncQueue* g_async_queue_ref (GAsyncQueue *queue); -void g_async_queue_unref (GAsyncQueue *queue); +GAsyncQueue* g_async_queue_ref (GAsyncQueue *queue); +void g_async_queue_unref (GAsyncQueue *queue); + + #ifndef G_DISABLE_DEPRECATED /* You don't have to hold the lock for calling *_ref and *_unref anymore. */ -void g_async_queue_ref_unlocked (GAsyncQueue *queue); -void g_async_queue_unref_and_unlock (GAsyncQueue *queue); +void g_async_queue_ref_unlocked (GAsyncQueue *queue); +void g_async_queue_unref_and_unlock (GAsyncQueue *queue); + + #endif /* !G_DISABLE_DEPRECATED */ /* Push data into the async queue. Must not be NULL. */ -void g_async_queue_push (GAsyncQueue *queue, - gpointer data); -void g_async_queue_push_unlocked (GAsyncQueue *queue, - gpointer data); +void g_async_queue_push (GAsyncQueue *queue, + gpointer data); +void g_async_queue_push_unlocked (GAsyncQueue *queue, + gpointer data); + +void g_async_queue_push_sorted (GAsyncQueue *queue, + gpointer data, + GCompareDataFunc func, + gpointer user_data); +void g_async_queue_push_sorted_unlocked (GAsyncQueue *queue, + gpointer data, + GCompareDataFunc func, + gpointer user_data); /* Pop data from the async queue. When no data is there, the thread is blocked * until data arrives. */ -gpointer g_async_queue_pop (GAsyncQueue *queue); -gpointer g_async_queue_pop_unlocked (GAsyncQueue *queue); +gpointer g_async_queue_pop (GAsyncQueue *queue); +gpointer g_async_queue_pop_unlocked (GAsyncQueue *queue); + + /* Try to pop data. NULL is returned in case of empty queue. */ -gpointer g_async_queue_try_pop (GAsyncQueue *queue); -gpointer g_async_queue_try_pop_unlocked (GAsyncQueue *queue); +gpointer g_async_queue_try_pop (GAsyncQueue *queue); +gpointer g_async_queue_try_pop_unlocked (GAsyncQueue *queue); + + /* Wait for data until at maximum until end_time is reached. NULL is returned * in case of empty queue. */ -gpointer g_async_queue_timed_pop (GAsyncQueue *queue, - GTimeVal *end_time); -gpointer g_async_queue_timed_pop_unlocked (GAsyncQueue *queue, - GTimeVal *end_time); +gpointer g_async_queue_timed_pop (GAsyncQueue *queue, + GTimeVal *end_time); +gpointer g_async_queue_timed_pop_unlocked (GAsyncQueue *queue, + GTimeVal *end_time); + + /* Return the length of the queue. Negative values mean that threads * are waiting, positve values mean that there are entries in the @@ -83,8 +104,15 @@ gpointer g_async_queue_timed_pop_unlocked (GAsyncQueue *queue, * the number of waiting threads, g_async_queue_length == 0 could also * mean 'n' entries in the queue and 'n' thread waiting. Such can * happen due to locking of the queue or due to scheduling. */ -gint g_async_queue_length (GAsyncQueue *queue); -gint g_async_queue_length_unlocked (GAsyncQueue *queue); +gint g_async_queue_length (GAsyncQueue *queue); +gint g_async_queue_length_unlocked (GAsyncQueue *queue); +void g_async_queue_sort (GAsyncQueue *queue, + GCompareDataFunc func, + gpointer user_data); +void g_async_queue_sort_unlocked (GAsyncQueue *queue, + GCompareDataFunc func, + gpointer user_data); + G_END_DECLS diff --git a/tests/Makefile.am b/tests/Makefile.am index 445cabbd3..00726d632 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -85,6 +85,7 @@ test_programs = \ patterntest \ printf-test \ queue-test \ + asyncqueue-test \ qsort-test \ rand-test \ relation-test \ @@ -145,6 +146,7 @@ node_test_LDADD = $(progs_ldadd) option_test_LDADD = $(progs_ldadd) printf_test_LDADD = $(progs_ldadd) queue_test_LDADD = $(progs_ldadd) +asyncqueue_test_LDADD = $(thread_ldadd) qsort_test_LDADD = $(progs_ldadd) rand_test_LDADD = $(progs_ldadd) relation_test_LDADD = $(progs_ldadd) diff --git a/tests/asyncqueue-test.c b/tests/asyncqueue-test.c new file mode 100644 index 000000000..810705bc2 --- /dev/null +++ b/tests/asyncqueue-test.c @@ -0,0 +1,182 @@ +#undef G_DISABLE_ASSERT +#undef G_LOG_DOMAIN + +#ifdef HAVE_CONFIG_H +# include +#endif +#include + +#include +#include + +#define d(x) x + +#define MAX_THREADS 50 +#define MAX_SORTS 5 /* only applies if + ASYC_QUEUE_DO_SORT is set to 1 */ +#define MAX_TIME 20 /* seconds */ +#define MIN_TIME 5 /* seconds */ + +#define SORT_QUEUE_AFTER 1 +#define SORT_QUEUE_ON_PUSH 1 /* if this is done, the + SORT_QUEUE_AFTER is ignored */ +#define QUIT_WHEN_DONE 1 + + +#if SORT_QUEUE_ON_PUSH == 1 +# undef SORT_QUEUE_AFTER +# define SORT_QUEUE_AFTER 0 +#endif + + +static GMainLoop *main_loop = NULL; +static GThreadPool *thread_pool = NULL; +static GAsyncQueue *async_queue = NULL; + + +static gint +sort_compare (gconstpointer p1, gconstpointer p2, gpointer user_data) +{ + gint id1; + gint id2; + + id1 = GPOINTER_TO_INT (p1); + id2 = GPOINTER_TO_INT (p2); + + d(g_print ("comparing #1:%d and #2:%d, returning %d\n", + id1, id2, (id2 - id1))); + + return (id2 - id1); +} + +static gboolean +sort_queue (gpointer user_data) +{ + static gint sorts = 0; + gboolean can_quit = FALSE; + gint sort_multiplier; + gint len; + gint i; + + sort_multiplier = GPOINTER_TO_INT (user_data); + + if (SORT_QUEUE_AFTER) { + d(g_print ("sorting async queue...\n")); + g_async_queue_sort (async_queue, sort_compare, NULL); + + sorts++; + + if (sorts >= sort_multiplier) { + can_quit = TRUE; + } + + g_async_queue_sort (async_queue, sort_compare, NULL); + len = g_async_queue_length (async_queue); + + d(g_print ("sorted queue (for %d/%d times, size:%d)...\n", sorts, MAX_SORTS, len)); + } else { + can_quit = TRUE; + len = g_async_queue_length (async_queue); + d(g_print ("printing queue (size:%d)...\n", len)); + } + + for (i = 0; i < len; i++) { + gpointer p; + + p = g_async_queue_pop (async_queue); + d(g_print ("item %d ---> %d\n", i, GPOINTER_TO_INT (p))); + } + + if (can_quit && QUIT_WHEN_DONE) { + g_main_loop_quit (main_loop); + } + + return !can_quit; +} + +static void +enter_thread (gpointer data, gpointer user_data) +{ + gint len; + gint id; + gulong ms; + + id = GPOINTER_TO_INT (data); + + ms = g_random_int_range (MIN_TIME * 1000, MAX_TIME * 1000); + d(g_print ("entered thread with id:%d, adding to queue in:%ld ms\n", id, ms)); + + g_usleep (ms * 1000); + + if (SORT_QUEUE_ON_PUSH) { + g_async_queue_push_sorted (async_queue, GINT_TO_POINTER (id), sort_compare, NULL); + } else { + g_async_queue_push (async_queue, GINT_TO_POINTER (id)); + } + + len = g_async_queue_length (async_queue); + + d(g_print ("thread id:%d added to async queue (size:%d)\n", + id, len)); +} + +int main (int argc, char *argv[]) +{ +#if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE) + gint i; + gint max_threads = MAX_THREADS; + gint max_unused_threads = MAX_THREADS; + gint sort_multiplier = MAX_SORTS; + gint sort_interval; + + g_thread_init (NULL); + + d(g_print ("creating async queue...\n")); + async_queue = g_async_queue_new (); + + g_return_val_if_fail (async_queue != NULL, EXIT_FAILURE); + + d(g_print ("creating thread pool with max threads:%d, max unused threads:%d...\n", + max_threads, max_unused_threads)); + thread_pool = g_thread_pool_new (enter_thread, + async_queue, + max_threads, + FALSE, + NULL); + + g_return_val_if_fail (thread_pool != NULL, EXIT_FAILURE); + + g_thread_pool_set_max_unused_threads (max_unused_threads); + + d(g_print ("creating threads...\n")); + for (i = 0; i <= max_threads; i++) { + GError *error; + + g_thread_pool_push (thread_pool, GINT_TO_POINTER (i), &error); + + if (!error) { + g_assert_not_reached (); + } + } + + if (!SORT_QUEUE_AFTER) { + sort_multiplier = 1; + } + + sort_interval = ((MAX_TIME / sort_multiplier) + 2) * 1000; + d(g_print ("adding timeout of %d seconds to sort %d times\n", + sort_interval, sort_multiplier)); + g_timeout_add (sort_interval, sort_queue, GINT_TO_POINTER (sort_multiplier)); + + if (SORT_QUEUE_ON_PUSH) { + d(g_print ("sorting when pushing into the queue...\n")); + } + + d(g_print ("entering main event loop\n")); + + main_loop = g_main_loop_new (NULL, FALSE); + g_main_loop_run (main_loop); +#endif + + return EXIT_SUCCESS; +}