+2005-12-05 Martyn Russell <martyn@imendio.com>
+
+ * docs/reference/glib/glib-sections.txt:
+ * glib/gasyncqueue.[ch]:
+ - Added support for sorting async queues by with _push_sorted(),
+ _push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047).
+
+ * tests/Makefile.am:
+ * tests/asyncqueue-test.c:
+ - Added test case for gasyncqueue.c
+
Mon Dec 5 15:53:20 2005 Tim Janik <timj@imendio.com>
* glib/gslice.c: implement chain walking for arbitrary ->next pointer
+2005-12-05 Martyn Russell <martyn@imendio.com>
+
+ * docs/reference/glib/glib-sections.txt:
+ * glib/gasyncqueue.[ch]:
+ - Added support for sorting async queues by with _push_sorted(),
+ _push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047).
+
+ * tests/Makefile.am:
+ * tests/asyncqueue-test.c:
+ - Added test case for gasyncqueue.c
+
Mon Dec 5 15:53:20 2005 Tim Janik <timj@imendio.com>
* glib/gslice.c: implement chain walking for arbitrary ->next pointer
+2005-12-05 Martyn Russell <martyn@imendio.com>
+
+ * docs/reference/glib/glib-sections.txt:
+ * glib/gasyncqueue.[ch]:
+ - Added support for sorting async queues by with _push_sorted(),
+ _push_sorted_unlocked(), _sort() and _sort_unlocked() (#323047).
+
+ * tests/Makefile.am:
+ * tests/asyncqueue-test.c:
+ - Added test case for gasyncqueue.c
+
Mon Dec 5 15:53:20 2005 Tim Janik <timj@imendio.com>
* glib/gslice.c: implement chain walking for arbitrary ->next pointer
g_async_queue_ref
g_async_queue_unref
g_async_queue_push
+g_async_queue_push_sorted
g_async_queue_pop
g_async_queue_try_pop
g_async_queue_timed_pop
g_async_queue_length
+g_async_queue_sort
<SUBSECTION>
g_async_queue_lock
g_async_queue_ref_unlocked
g_async_queue_unref_and_unlock
g_async_queue_push_unlocked
+g_async_queue_push_sorted_unlocked
g_async_queue_pop_unlocked
g_async_queue_try_pop_unlocked
g_async_queue_timed_pop_unlocked
g_async_queue_length_unlocked
+g_async_queue_sort_unlocked
</SECTION>
<SECTION>
g_cond_signal (queue->cond);
}
+/**
+ * g_async_queue_push_sorted:
+ * @queue: a #GAsyncQueue
+ * @data: the @data to push into the @queue
+ * @func: the #GCompareDataFunc is used to sort @queue. This function
+ * is passed two elements of the @queue. The function should return
+ * 0 if they are equal, a negative value if the first element
+ * should be higher in the @queue or a positive value if the first
+ * element should be lower in the @queue than the second element.
+ * @user_data: user data passed to @func.
+ *
+ * Inserts @data into @queue using @func to determine the new
+ * position.
+ *
+ * This function requires that the @queue is sorted before pushing on
+ * new elements.
+ *
+ * This function will lock @queue before it sorts the queue and unlock
+ * it when it is finished.
+ *
+ * For an example of @func see g_async_queue_sort().
+ *
+ * Since: 2.10
+ **/
+void
+g_async_queue_push_sorted (GAsyncQueue *queue,
+ gpointer data,
+ GCompareDataFunc func,
+ gpointer user_data)
+{
+ g_return_if_fail (queue != NULL);
+
+ g_mutex_lock (queue->mutex);
+ g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
+ g_mutex_unlock (queue->mutex);
+}
+
+/**
+ * g_async_queue_push_sorted_unlocked:
+ * @queue: a #GAsyncQueue
+ * @data: the @data to push into the @queue
+ * @func: the #GCompareDataFunc is used to sort @queue. This function
+ * is passed two elements of the @queue. The function should return
+ * 0 if they are equal, a negative value if the first element
+ * should be higher in the @queue or a positive value if the first
+ * element should be lower in the @queue than the second element.
+ * @user_data: user data passed to @func.
+ *
+ * Inserts @data into @queue using @func to determine the new
+ * position.
+ *
+ * This function requires that the @queue is sorted before pushing on
+ * new elements.
+ *
+ * This function is called while holding the @queue's lock.
+ *
+ * For an example of @func see g_async_queue_sort().
+ *
+ * Since: 2.10
+ **/
+void
+g_async_queue_push_sorted_unlocked (GAsyncQueue *queue,
+ gpointer data,
+ GCompareDataFunc func,
+ gpointer user_data)
+{
+ GQueue *q;
+ GList *list;
+
+ g_return_if_fail (queue != NULL);
+
+ q = queue->queue;
+
+ list = q->head;
+ while (list && func (list->data, data, user_data) < 0)
+ list = list->next;
+
+ if (list)
+ g_queue_insert_before (q, list, data);
+ else
+ g_queue_push_tail (q, data);
+}
+
static gpointer
g_async_queue_pop_intern_unlocked (GAsyncQueue* queue, gboolean try,
GTimeVal *end_time)
return queue->queue->length - queue->waiting_threads;
}
+/**
+ * g_async_queue_sort:
+ * @queue: a #GAsyncQueue
+ * @func: the #GCompareDataFunc is used to sort @queue. This
+ * function is passed two elements of the @queue. The function
+ * should return 0 if they are equal, a negative value if the
+ * first element should be higher in the @queue or a positive
+ * value if the first element should be lower in the @queue than
+ * the second element.
+ * @user_data: user data passed to @func
+ *
+ * Sorts @queue using @func.
+ *
+ * This function will lock @queue before it sorts the queue and unlock
+ * it when it is finished.
+ *
+ * If you were sorting a list of priority numbers to make sure the
+ * lowest priority would be at the top of the queue, you could use:
+ * <informalexample><programlisting>
+ * gint id1;
+ * gint id2;
+ *
+ * id1 = GPOINTER_TO_INT (element1);
+ * id2 = GPOINTER_TO_INT (element2);
+ *
+ * return (id2 - id1);
+ * </programlisting></informalexample>
+ *
+ * Since: 2.10
+ **/
+void
+g_async_queue_sort (GAsyncQueue *queue,
+ GCompareDataFunc func,
+ gpointer user_data)
+{
+ g_return_if_fail (queue != NULL);
+ g_return_if_fail (func != NULL);
+
+ g_mutex_lock (queue->mutex);
+ g_async_queue_sort_unlocked (queue, func, user_data);
+ g_mutex_unlock (queue->mutex);
+}
+
+/**
+ * g_async_queue_sort_unlocked:
+ * @queue: a #GAsyncQueue
+ * @func: the #GCompareDataFunc is used to sort @queue. This
+ * function is passed two elements of the @queue. The function
+ * should return 0 if they are equal, a negative value if the
+ * first element should be higher in the @queue or a positive
+ * value if the first element should be lower in the @queue than
+ * the second element.
+ * @user_data: user data passed to @func
+ *
+ * Sorts @queue using @func.
+ *
+ * This function is called while holding the @queue's lock.
+ *
+ * Since: 2.10
+ **/
+void
+g_async_queue_sort_unlocked (GAsyncQueue *queue,
+ GCompareDataFunc func,
+ gpointer user_data)
+{
+ GQueue *q;
+
+ g_return_if_fail (queue != NULL);
+ g_return_if_fail (func != NULL);
+
+ q = queue->queue;
+
+ q->head = g_list_sort_with_data (q->head, func, user_data);
+ q->tail = g_list_last (q->head);
+}
+
+
#define __G_ASYNCQUEUE_C__
#include "galiasdef.c"
*/
/* Get a new GAsyncQueue with the ref_count 1 */
-GAsyncQueue* g_async_queue_new (void);
+GAsyncQueue* g_async_queue_new (void);
/* Lock and unlock a GAsyncQueue. All functions lock the queue for
* themselves, but in certain cirumstances you want to hold the lock longer,
* thus you lock the queue, call the *_unlocked functions and unlock it again.
*/
-void g_async_queue_lock (GAsyncQueue *queue);
-void g_async_queue_unlock (GAsyncQueue *queue);
+void g_async_queue_lock (GAsyncQueue *queue);
+void g_async_queue_unlock (GAsyncQueue *queue);
+
+
/* Ref and unref the GAsyncQueue. */
-GAsyncQueue* g_async_queue_ref (GAsyncQueue *queue);
-void g_async_queue_unref (GAsyncQueue *queue);
+GAsyncQueue* g_async_queue_ref (GAsyncQueue *queue);
+void g_async_queue_unref (GAsyncQueue *queue);
+
+
#ifndef G_DISABLE_DEPRECATED
/* You don't have to hold the lock for calling *_ref and *_unref anymore. */
-void g_async_queue_ref_unlocked (GAsyncQueue *queue);
-void g_async_queue_unref_and_unlock (GAsyncQueue *queue);
+void g_async_queue_ref_unlocked (GAsyncQueue *queue);
+void g_async_queue_unref_and_unlock (GAsyncQueue *queue);
+
+
#endif /* !G_DISABLE_DEPRECATED */
/* Push data into the async queue. Must not be NULL. */
-void g_async_queue_push (GAsyncQueue *queue,
- gpointer data);
-void g_async_queue_push_unlocked (GAsyncQueue *queue,
- gpointer data);
+void g_async_queue_push (GAsyncQueue *queue,
+ gpointer data);
+void g_async_queue_push_unlocked (GAsyncQueue *queue,
+ gpointer data);
+
+void g_async_queue_push_sorted (GAsyncQueue *queue,
+ gpointer data,
+ GCompareDataFunc func,
+ gpointer user_data);
+void g_async_queue_push_sorted_unlocked (GAsyncQueue *queue,
+ gpointer data,
+ GCompareDataFunc func,
+ gpointer user_data);
/* Pop data from the async queue. When no data is there, the thread is blocked
* until data arrives. */
-gpointer g_async_queue_pop (GAsyncQueue *queue);
-gpointer g_async_queue_pop_unlocked (GAsyncQueue *queue);
+gpointer g_async_queue_pop (GAsyncQueue *queue);
+gpointer g_async_queue_pop_unlocked (GAsyncQueue *queue);
+
+
/* Try to pop data. NULL is returned in case of empty queue. */
-gpointer g_async_queue_try_pop (GAsyncQueue *queue);
-gpointer g_async_queue_try_pop_unlocked (GAsyncQueue *queue);
+gpointer g_async_queue_try_pop (GAsyncQueue *queue);
+gpointer g_async_queue_try_pop_unlocked (GAsyncQueue *queue);
+
+
/* Wait for data until at maximum until end_time is reached. NULL is returned
* in case of empty queue. */
-gpointer g_async_queue_timed_pop (GAsyncQueue *queue,
- GTimeVal *end_time);
-gpointer g_async_queue_timed_pop_unlocked (GAsyncQueue *queue,
- GTimeVal *end_time);
+gpointer g_async_queue_timed_pop (GAsyncQueue *queue,
+ GTimeVal *end_time);
+gpointer g_async_queue_timed_pop_unlocked (GAsyncQueue *queue,
+ GTimeVal *end_time);
+
+
/* Return the length of the queue. Negative values mean that threads
* are waiting, positve values mean that there are entries in the
* the number of waiting threads, g_async_queue_length == 0 could also
* mean 'n' entries in the queue and 'n' thread waiting. Such can
* happen due to locking of the queue or due to scheduling. */
-gint g_async_queue_length (GAsyncQueue *queue);
-gint g_async_queue_length_unlocked (GAsyncQueue *queue);
+gint g_async_queue_length (GAsyncQueue *queue);
+gint g_async_queue_length_unlocked (GAsyncQueue *queue);
+void g_async_queue_sort (GAsyncQueue *queue,
+ GCompareDataFunc func,
+ gpointer user_data);
+void g_async_queue_sort_unlocked (GAsyncQueue *queue,
+ GCompareDataFunc func,
+ gpointer user_data);
+
G_END_DECLS
patterntest \
printf-test \
queue-test \
+ asyncqueue-test \
qsort-test \
rand-test \
relation-test \
option_test_LDADD = $(progs_ldadd)
printf_test_LDADD = $(progs_ldadd)
queue_test_LDADD = $(progs_ldadd)
+asyncqueue_test_LDADD = $(thread_ldadd)
qsort_test_LDADD = $(progs_ldadd)
rand_test_LDADD = $(progs_ldadd)
relation_test_LDADD = $(progs_ldadd)
--- /dev/null
+#undef G_DISABLE_ASSERT
+#undef G_LOG_DOMAIN
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+#include <glib.h>
+
+#include <time.h>
+#include <stdlib.h>
+
+#define d(x) x
+
+#define MAX_THREADS 50
+#define MAX_SORTS 5 /* only applies if
+ ASYC_QUEUE_DO_SORT is set to 1 */
+#define MAX_TIME 20 /* seconds */
+#define MIN_TIME 5 /* seconds */
+
+#define SORT_QUEUE_AFTER 1
+#define SORT_QUEUE_ON_PUSH 1 /* if this is done, the
+ SORT_QUEUE_AFTER is ignored */
+#define QUIT_WHEN_DONE 1
+
+
+#if SORT_QUEUE_ON_PUSH == 1
+# undef SORT_QUEUE_AFTER
+# define SORT_QUEUE_AFTER 0
+#endif
+
+
+static GMainLoop *main_loop = NULL;
+static GThreadPool *thread_pool = NULL;
+static GAsyncQueue *async_queue = NULL;
+
+
+static gint
+sort_compare (gconstpointer p1, gconstpointer p2, gpointer user_data)
+{
+ gint id1;
+ gint id2;
+
+ id1 = GPOINTER_TO_INT (p1);
+ id2 = GPOINTER_TO_INT (p2);
+
+ d(g_print ("comparing #1:%d and #2:%d, returning %d\n",
+ id1, id2, (id2 - id1)));
+
+ return (id2 - id1);
+}
+
+static gboolean
+sort_queue (gpointer user_data)
+{
+ static gint sorts = 0;
+ gboolean can_quit = FALSE;
+ gint sort_multiplier;
+ gint len;
+ gint i;
+
+ sort_multiplier = GPOINTER_TO_INT (user_data);
+
+ if (SORT_QUEUE_AFTER) {
+ d(g_print ("sorting async queue...\n"));
+ g_async_queue_sort (async_queue, sort_compare, NULL);
+
+ sorts++;
+
+ if (sorts >= sort_multiplier) {
+ can_quit = TRUE;
+ }
+
+ g_async_queue_sort (async_queue, sort_compare, NULL);
+ len = g_async_queue_length (async_queue);
+
+ d(g_print ("sorted queue (for %d/%d times, size:%d)...\n", sorts, MAX_SORTS, len));
+ } else {
+ can_quit = TRUE;
+ len = g_async_queue_length (async_queue);
+ d(g_print ("printing queue (size:%d)...\n", len));
+ }
+
+ for (i = 0; i < len; i++) {
+ gpointer p;
+
+ p = g_async_queue_pop (async_queue);
+ d(g_print ("item %d ---> %d\n", i, GPOINTER_TO_INT (p)));
+ }
+
+ if (can_quit && QUIT_WHEN_DONE) {
+ g_main_loop_quit (main_loop);
+ }
+
+ return !can_quit;
+}
+
+static void
+enter_thread (gpointer data, gpointer user_data)
+{
+ gint len;
+ gint id;
+ gulong ms;
+
+ id = GPOINTER_TO_INT (data);
+
+ ms = g_random_int_range (MIN_TIME * 1000, MAX_TIME * 1000);
+ d(g_print ("entered thread with id:%d, adding to queue in:%ld ms\n", id, ms));
+
+ g_usleep (ms * 1000);
+
+ if (SORT_QUEUE_ON_PUSH) {
+ g_async_queue_push_sorted (async_queue, GINT_TO_POINTER (id), sort_compare, NULL);
+ } else {
+ g_async_queue_push (async_queue, GINT_TO_POINTER (id));
+ }
+
+ len = g_async_queue_length (async_queue);
+
+ d(g_print ("thread id:%d added to async queue (size:%d)\n",
+ id, len));
+}
+
+int main (int argc, char *argv[])
+{
+#if defined(G_THREADS_ENABLED) && ! defined(G_THREADS_IMPL_NONE)
+ gint i;
+ gint max_threads = MAX_THREADS;
+ gint max_unused_threads = MAX_THREADS;
+ gint sort_multiplier = MAX_SORTS;
+ gint sort_interval;
+
+ g_thread_init (NULL);
+
+ d(g_print ("creating async queue...\n"));
+ async_queue = g_async_queue_new ();
+
+ g_return_val_if_fail (async_queue != NULL, EXIT_FAILURE);
+
+ d(g_print ("creating thread pool with max threads:%d, max unused threads:%d...\n",
+ max_threads, max_unused_threads));
+ thread_pool = g_thread_pool_new (enter_thread,
+ async_queue,
+ max_threads,
+ FALSE,
+ NULL);
+
+ g_return_val_if_fail (thread_pool != NULL, EXIT_FAILURE);
+
+ g_thread_pool_set_max_unused_threads (max_unused_threads);
+
+ d(g_print ("creating threads...\n"));
+ for (i = 0; i <= max_threads; i++) {
+ GError *error;
+
+ g_thread_pool_push (thread_pool, GINT_TO_POINTER (i), &error);
+
+ if (!error) {
+ g_assert_not_reached ();
+ }
+ }
+
+ if (!SORT_QUEUE_AFTER) {
+ sort_multiplier = 1;
+ }
+
+ sort_interval = ((MAX_TIME / sort_multiplier) + 2) * 1000;
+ d(g_print ("adding timeout of %d seconds to sort %d times\n",
+ sort_interval, sort_multiplier));
+ g_timeout_add (sort_interval, sort_queue, GINT_TO_POINTER (sort_multiplier));
+
+ if (SORT_QUEUE_ON_PUSH) {
+ d(g_print ("sorting when pushing into the queue...\n"));
+ }
+
+ d(g_print ("entering main event loop\n"));
+
+ main_loop = g_main_loop_new (NULL, FALSE);
+ g_main_loop_run (main_loop);
+#endif
+
+ return EXIT_SUCCESS;
+}