- Added support for sorting async queues by with _push_sorted(),

* docs/reference/glib/glib-sections.txt:
* glib/gasyncqueue.[ch]:
- Added support for sorting async queues by with _push_sorted(),
_push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047).

* tests/Makefile.am:
* tests/asyncqueue-test.c:
- Added test case for gasyncqueue.c
This commit is contained in:
Martyn James Russell 2005-12-05 15:08:45 +00:00
parent 6ed79b115c
commit a127920458
8 changed files with 430 additions and 21 deletions

View File

@ -1,3 +1,14 @@
2005-12-05 Martyn Russell <martyn@imendio.com>
* docs/reference/glib/glib-sections.txt:
* glib/gasyncqueue.[ch]:
- Added support for sorting async queues by with _push_sorted(),
_push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047).
* tests/Makefile.am:
* tests/asyncqueue-test.c:
- Added test case for gasyncqueue.c
Mon Dec 5 15:53:20 2005 Tim Janik <timj@imendio.com>
* glib/gslice.c: implement chain walking for arbitrary ->next pointer

View File

@ -1,3 +1,14 @@
2005-12-05 Martyn Russell <martyn@imendio.com>
* docs/reference/glib/glib-sections.txt:
* glib/gasyncqueue.[ch]:
- Added support for sorting async queues by with _push_sorted(),
_push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047).
* tests/Makefile.am:
* tests/asyncqueue-test.c:
- Added test case for gasyncqueue.c
Mon Dec 5 15:53:20 2005 Tim Janik <timj@imendio.com>
* glib/gslice.c: implement chain walking for arbitrary ->next pointer

View File

@ -1,3 +1,14 @@
2005-12-05 Martyn Russell <martyn@imendio.com>
* docs/reference/glib/glib-sections.txt:
* glib/gasyncqueue.[ch]:
- Added support for sorting async queues by with _push_sorted(),
_push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047).
* tests/Makefile.am:
* tests/asyncqueue-test.c:
- Added test case for gasyncqueue.c
Mon Dec 5 15:53:20 2005 Tim Janik <timj@imendio.com>
* glib/gslice.c: implement chain walking for arbitrary ->next pointer

View File

@ -646,10 +646,12 @@ g_async_queue_new
g_async_queue_ref
g_async_queue_unref
g_async_queue_push
g_async_queue_push_sorted
g_async_queue_pop
g_async_queue_try_pop
g_async_queue_timed_pop
g_async_queue_length
g_async_queue_sort
<SUBSECTION>
g_async_queue_lock
@ -657,10 +659,12 @@ g_async_queue_unlock
g_async_queue_ref_unlocked
g_async_queue_unref_and_unlock
g_async_queue_push_unlocked
g_async_queue_push_sorted_unlocked
g_async_queue_pop_unlocked
g_async_queue_try_pop_unlocked
g_async_queue_timed_pop_unlocked
g_async_queue_length_unlocked
g_async_queue_sort_unlocked
</SECTION>
<SECTION>

View File

@ -218,6 +218,89 @@ g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
g_cond_signal (queue->cond);
}
/**
* g_async_queue_push_sorted:
* @queue: a #GAsyncQueue
* @data: the @data to push into the @queue
* @func: the #GCompareDataFunc is used to sort @queue. This function
* is passed two elements of the @queue. The function should return
* 0 if they are equal, a negative value if the first element
* should be higher in the @queue or a positive value if the first
* element should be lower in the @queue than the second element.
* @user_data: user data passed to @func.
*
* Inserts @data into @queue using @func to determine the new
* position.
*
* This function requires that the @queue is sorted before pushing on
* new elements.
*
* This function will lock @queue before it sorts the queue and unlock
* it when it is finished.
*
* For an example of @func see g_async_queue_sort().
*
* Since: 2.10
**/
void
g_async_queue_push_sorted (GAsyncQueue *queue,
gpointer data,
GCompareDataFunc func,
gpointer user_data)
{
g_return_if_fail (queue != NULL);
g_mutex_lock (queue->mutex);
g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
g_mutex_unlock (queue->mutex);
}
/**
* g_async_queue_push_sorted_unlocked:
* @queue: a #GAsyncQueue
* @data: the @data to push into the @queue
* @func: the #GCompareDataFunc is used to sort @queue. This function
* is passed two elements of the @queue. The function should return
* 0 if they are equal, a negative value if the first element
* should be higher in the @queue or a positive value if the first
* element should be lower in the @queue than the second element.
* @user_data: user data passed to @func.
*
* Inserts @data into @queue using @func to determine the new
* position.
*
* This function requires that the @queue is sorted before pushing on
* new elements.
*
* This function is called while holding the @queue's lock.
*
* For an example of @func see g_async_queue_sort().
*
* Since: 2.10
**/
void
g_async_queue_push_sorted_unlocked (GAsyncQueue *queue,
gpointer data,
GCompareDataFunc func,
gpointer user_data)
{
GQueue *q;
GList *list;
g_return_if_fail (queue != NULL);
q = queue->queue;
list = q->head;
while (list && func (list->data, data, user_data) < 0)
list = list->next;
if (list)
g_queue_insert_before (q, list, data);
else
g_queue_push_tail (q, data);
}
static gpointer
g_async_queue_pop_intern_unlocked (GAsyncQueue* queue, gboolean try,
GTimeVal *end_time)
@ -452,5 +535,82 @@ g_async_queue_length_unlocked (GAsyncQueue* queue)
return queue->queue->length - queue->waiting_threads;
}
/**
* g_async_queue_sort:
* @queue: a #GAsyncQueue
* @func: the #GCompareDataFunc is used to sort @queue. This
* function is passed two elements of the @queue. The function
* should return 0 if they are equal, a negative value if the
* first element should be higher in the @queue or a positive
* value if the first element should be lower in the @queue than
* the second element.
* @user_data: user data passed to @func
*
* Sorts @queue using @func.
*
* This function will lock @queue before it sorts the queue and unlock
* it when it is finished.
*
* If you were sorting a list of priority numbers to make sure the
* lowest priority would be at the top of the queue, you could use:
* <informalexample><programlisting>
* gint id1;
* gint id2;
*
* id1 = GPOINTER_TO_INT (element1);
* id2 = GPOINTER_TO_INT (element2);
*
* return (id2 - id1);
* </programlisting></informalexample>
*
* Since: 2.10
**/
void
g_async_queue_sort (GAsyncQueue *queue,
GCompareDataFunc func,
gpointer user_data)
{
g_return_if_fail (queue != NULL);
g_return_if_fail (func != NULL);
g_mutex_lock (queue->mutex);
g_async_queue_sort_unlocked (queue, func, user_data);
g_mutex_unlock (queue->mutex);
}
/**
* g_async_queue_sort_unlocked:
* @queue: a #GAsyncQueue
* @func: the #GCompareDataFunc is used to sort @queue. This
* function is passed two elements of the @queue. The function
* should return 0 if they are equal, a negative value if the
* first element should be higher in the @queue or a positive
* value if the first element should be lower in the @queue than
* the second element.
* @user_data: user data passed to @func
*
* Sorts @queue using @func.
*
* This function is called while holding the @queue's lock.
*
* Since: 2.10
**/
void
g_async_queue_sort_unlocked (GAsyncQueue *queue,
GCompareDataFunc func,
gpointer user_data)
{
GQueue *q;
g_return_if_fail (queue != NULL);
g_return_if_fail (func != NULL);
q = queue->queue;
q->head = g_list_sort_with_data (q->head, func, user_data);
q->tail = g_list_last (q->head);
}
#define __G_ASYNCQUEUE_C__
#include "galiasdef.c"

View File

@ -37,45 +37,66 @@ typedef struct _GAsyncQueue GAsyncQueue;
*/
/* Get a new GAsyncQueue with the ref_count 1 */
GAsyncQueue* g_async_queue_new (void);
GAsyncQueue* g_async_queue_new (void);
/* Lock and unlock a GAsyncQueue. All functions lock the queue for
* themselves, but in certain cirumstances you want to hold the lock longer,
* thus you lock the queue, call the *_unlocked functions and unlock it again.
*/
void g_async_queue_lock (GAsyncQueue *queue);
void g_async_queue_unlock (GAsyncQueue *queue);
void g_async_queue_lock (GAsyncQueue *queue);
void g_async_queue_unlock (GAsyncQueue *queue);
/* Ref and unref the GAsyncQueue. */
GAsyncQueue* g_async_queue_ref (GAsyncQueue *queue);
void g_async_queue_unref (GAsyncQueue *queue);
GAsyncQueue* g_async_queue_ref (GAsyncQueue *queue);
void g_async_queue_unref (GAsyncQueue *queue);
#ifndef G_DISABLE_DEPRECATED
/* You don't have to hold the lock for calling *_ref and *_unref anymore. */
void g_async_queue_ref_unlocked (GAsyncQueue *queue);
void g_async_queue_unref_and_unlock (GAsyncQueue *queue);
void g_async_queue_ref_unlocked (GAsyncQueue *queue);
void g_async_queue_unref_and_unlock (GAsyncQueue *queue);
#endif /* !G_DISABLE_DEPRECATED */
/* Push data into the async queue. Must not be NULL. */
void g_async_queue_push (GAsyncQueue *queue,
gpointer data);
void g_async_queue_push_unlocked (GAsyncQueue *queue,
gpointer data);
void g_async_queue_push (GAsyncQueue *queue,
gpointer data);
void g_async_queue_push_unlocked (GAsyncQueue *queue,
gpointer data);
void g_async_queue_push_sorted (GAsyncQueue *queue,
gpointer data,
GCompareDataFunc func,
gpointer user_data);
void g_async_queue_push_sorted_unlocked (GAsyncQueue *queue,
gpointer data,
GCompareDataFunc func,
gpointer user_data);
/* Pop data from the async queue. When no data is there, the thread is blocked
* until data arrives. */
gpointer g_async_queue_pop (GAsyncQueue *queue);
gpointer g_async_queue_pop_unlocked (GAsyncQueue *queue);
gpointer g_async_queue_pop (GAsyncQueue *queue);
gpointer g_async_queue_pop_unlocked (GAsyncQueue *queue);
/* Try to pop data. NULL is returned in case of empty queue. */
gpointer g_async_queue_try_pop (GAsyncQueue *queue);
gpointer g_async_queue_try_pop_unlocked (GAsyncQueue *queue);
gpointer g_async_queue_try_pop (GAsyncQueue *queue);
gpointer g_async_queue_try_pop_unlocked (GAsyncQueue *queue);
/* Wait for data until at maximum until end_time is reached. NULL is returned
* in case of empty queue. */
gpointer g_async_queue_timed_pop (GAsyncQueue *queue,
GTimeVal *end_time);
gpointer g_async_queue_timed_pop_unlocked (GAsyncQueue *queue,
GTimeVal *end_time);
gpointer g_async_queue_timed_pop (GAsyncQueue *queue,
GTimeVal *end_time);
gpointer g_async_queue_timed_pop_unlocked (GAsyncQueue *queue,
GTimeVal *end_time);
/* Return the length of the queue. Negative values mean that threads
* are waiting, positve values mean that there are entries in the
@ -83,8 +104,15 @@ gpointer g_async_queue_timed_pop_unlocked (GAsyncQueue *queue,
* the number of waiting threads, g_async_queue_length == 0 could also
* mean 'n' entries in the queue and 'n' thread waiting. Such can
* happen due to locking of the queue or due to scheduling. */
gint g_async_queue_length (GAsyncQueue *queue);
gint g_async_queue_length_unlocked (GAsyncQueue *queue);
gint g_async_queue_length (GAsyncQueue *queue);
gint g_async_queue_length_unlocked (GAsyncQueue *queue);
void g_async_queue_sort (GAsyncQueue *queue,
GCompareDataFunc func,
gpointer user_data);
void g_async_queue_sort_unlocked (GAsyncQueue *queue,
GCompareDataFunc func,
gpointer user_data);
G_END_DECLS

View File

@ -85,6 +85,7 @@ test_programs = \
patterntest \
printf-test \
queue-test \
asyncqueue-test \
qsort-test \
rand-test \
relation-test \
@ -145,6 +146,7 @@ node_test_LDADD = $(progs_ldadd)
option_test_LDADD = $(progs_ldadd)
printf_test_LDADD = $(progs_ldadd)
queue_test_LDADD = $(progs_ldadd)
asyncqueue_test_LDADD = $(thread_ldadd)
qsort_test_LDADD = $(progs_ldadd)
rand_test_LDADD = $(progs_ldadd)
relation_test_LDADD = $(progs_ldadd)

182
tests/asyncqueue-test.c Normal file
View File

@ -0,0 +1,182 @@
#undef G_DISABLE_ASSERT
#undef G_LOG_DOMAIN
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <glib.h>
#include <time.h>
#include <stdlib.h>
#define d(x) x
#define MAX_THREADS 50
#define MAX_SORTS 5 /* only applies if
ASYC_QUEUE_DO_SORT is set to 1 */
#define MAX_TIME 20 /* seconds */
#define MIN_TIME 5 /* seconds */
#define SORT_QUEUE_AFTER 1
#define SORT_QUEUE_ON_PUSH 1 /* if this is done, the
SORT_QUEUE_AFTER is ignored */
#define QUIT_WHEN_DONE 1
#if SORT_QUEUE_ON_PUSH == 1
# undef SORT_QUEUE_AFTER
# define SORT_QUEUE_AFTER 0
#endif
static GMainLoop *main_loop = NULL;
static GThreadPool *thread_pool = NULL;
static GAsyncQueue *async_queue = NULL;
static gint
sort_compare (gconstpointer p1, gconstpointer p2, gpointer user_data)
{
gint id1;
gint id2;
id1 = GPOINTER_TO_INT (p1);
id2 = GPOINTER_TO_INT (p2);
d(g_print ("comparing #1:%d and #2:%d, returning %d\n",
id1, id2, (id2 - id1)));
return (id2 - id1);
}
static gboolean
sort_queue (gpointer user_data)
{
static gint sorts = 0;
gboolean can_quit = FALSE;
gint sort_multiplier;
gint len;
gint i;
sort_multiplier = GPOINTER_TO_INT (user_data);
if (SORT_QUEUE_AFTER) {
d(g_print ("sorting async queue...\n"));
g_async_queue_sort (async_queue, sort_compare, NULL);
sorts++;
if (sorts >= sort_multiplier) {
can_quit = TRUE;
}
g_async_queue_sort (async_queue, sort_compare, NULL);
len = g_async_queue_length (async_queue);
d(g_print ("sorted queue (for %d/%d times, size:%d)...\n", sorts, MAX_SORTS, len));
} else {
can_quit = TRUE;
len = g_async_queue_length (async_queue);
d(g_print ("printing queue (size:%d)...\n", len));
}
for (i = 0; i < len; i++) {
gpointer p;
p = g_async_queue_pop (async_queue);
d(g_print ("item %d ---> %d\n", i, GPOINTER_TO_INT (p)));
}
if (can_quit && QUIT_WHEN_DONE) {
g_main_loop_quit (main_loop);
}
return !can_quit;
}
static void
enter_thread (gpointer data, gpointer user_data)
{
gint len;
gint id;
gulong ms;
id = GPOINTER_TO_INT (data);
ms = g_random_int_range (MIN_TIME * 1000, MAX_TIME * 1000);
d(g_print ("entered thread with id:%d, adding to queue in:%ld ms\n", id, ms));
g_usleep (ms * 1000);
if (SORT_QUEUE_ON_PUSH) {
g_async_queue_push_sorted (async_queue, GINT_TO_POINTER (id), sort_compare, NULL);
} else {
g_async_queue_push (async_queue, GINT_TO_POINTER (id));
}
len = g_async_queue_length (async_queue);
d(g_print ("thread id:%d added to async queue (size:%d)\n",
id, len));
}
int main (int argc, char *argv[])
{
#if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE)
gint i;
gint max_threads = MAX_THREADS;
gint max_unused_threads = MAX_THREADS;
gint sort_multiplier = MAX_SORTS;
gint sort_interval;
g_thread_init (NULL);
d(g_print ("creating async queue...\n"));
async_queue = g_async_queue_new ();
g_return_val_if_fail (async_queue != NULL, EXIT_FAILURE);
d(g_print ("creating thread pool with max threads:%d, max unused threads:%d...\n",
max_threads, max_unused_threads));
thread_pool = g_thread_pool_new (enter_thread,
async_queue,
max_threads,
FALSE,
NULL);
g_return_val_if_fail (thread_pool != NULL, EXIT_FAILURE);
g_thread_pool_set_max_unused_threads (max_unused_threads);
d(g_print ("creating threads...\n"));
for (i = 0; i <= max_threads; i++) {
GError *error;
g_thread_pool_push (thread_pool, GINT_TO_POINTER (i), &error);
if (!error) {
g_assert_not_reached ();
}
}
if (!SORT_QUEUE_AFTER) {
sort_multiplier = 1;
}
sort_interval = ((MAX_TIME / sort_multiplier) + 2) * 1000;
d(g_print ("adding timeout of %d seconds to sort %d times\n",
sort_interval, sort_multiplier));
g_timeout_add (sort_interval, sort_queue, GINT_TO_POINTER (sort_multiplier));
if (SORT_QUEUE_ON_PUSH) {
d(g_print ("sorting when pushing into the queue...\n"));
}
d(g_print ("entering main event loop\n"));
main_loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (main_loop);
#endif
return EXIT_SUCCESS;
}