/* * libsyncml - A syncml protocol implementation * Copyright (C) 2005 Armin Bauer * Copyright (C) 2009 Michael Bell * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include "sml_queue_internals.h" #include "sml_support.h" #include "sml_error_internals.h" #ifndef DOXYGEN_SHOULD_SKIP_THIS static gboolean _queue_prepare (GSource *source, gint *timeout_) { smlTrace(TRACE_INTERNAL, "%s(%p, %p)", __func__, source, timeout_); *timeout_ = 1; return FALSE; } /* TODO only use me for debugging */ void smlQueueAssert (SmlQueue *queue) { if (queue->tail) { smlAssert(queue->head); } if (queue->prio) { smlAssert(queue->head); } if (queue->head) { smlAssert(queue->tail); } if (g_list_length(queue->head) == 1) { smlAssert(queue->tail == queue->head); } smlAssert(g_list_last(queue->head) == queue->tail); } static gboolean _queue_check (GSource *source) { SmlQueue *queue = *((SmlQueue **)(source + 1)); return smlQueueCheck(queue); } static gboolean _queue_dispatch (GSource *source, GSourceFunc callback, gpointer user_data) { smlTrace(TRACE_INTERNAL, "%s(%p, %p, %p)", __func__, source, callback, user_data); SmlQueue *queue = user_data; smlAssert(queue->handler); while (smlQueueCheck(queue)) smlQueueDispatch(queue); return TRUE; } #endif /* DOXYGEN_SHOULD_SKIP_THIS */ /*! @brief Creates a new asynchronous queue * * This function return the pointer to a newly created SmlQueue * */ SmlQueue* smlQueueNew (GError **error) { CHECK_ERROR_REF SmlQueue *queue = smlTryMalloc0(sizeof(SmlQueue), error); if (!queue) return NULL; if (!g_thread_supported ()) g_thread_init (NULL); queue->mutex = g_mutex_new(); return queue; } void smlQueueFree (SmlQueue *queue) { if (queue->source) smlQueueDetach(queue); if (queue->head) g_list_free(queue->head); g_mutex_free(queue->mutex); smlSafeFree((gpointer *)&queue); } void smlQueueFlush (SmlQueue *queue) { g_mutex_lock(queue->mutex); smlQueueAssert(queue); queue->tail = NULL; g_list_free(queue->head); queue->head = NULL; queue->prio = NULL; smlQueueAssert(queue); g_mutex_unlock(queue->mutex); } gboolean smlQueueCheckPrio (SmlQueue *queue) { g_mutex_lock(queue->mutex); gboolean ret = (queue->prio == NULL) ? FALSE : TRUE; g_mutex_unlock(queue->mutex); return ret; } gboolean smlQueueCheck (SmlQueue *queue) { g_mutex_lock(queue->mutex); gboolean ret = (queue->head == NULL) ? FALSE : TRUE; g_mutex_unlock(queue->mutex); return ret; } gsize smlQueueLength (SmlQueue *queue) { g_mutex_lock(queue->mutex); unsigned int ret = g_list_length(queue->head); g_mutex_unlock(queue->mutex); return ret; } gsize smlQueueLengthPrio (SmlQueue *queue) { g_mutex_lock(queue->mutex); unsigned int ret = g_list_length(queue->prio); g_mutex_unlock(queue->mutex); return ret; } void* smlQueueTryPop (SmlQueue *queue) { smlAssert(queue); void *message = NULL; g_mutex_lock(queue->mutex); smlQueueAssert(queue); if (queue->head) { message = queue->head->data; if (queue->head == queue->tail) queue->tail = NULL; if (queue->prio && message == queue->prio->data) queue->prio = g_list_delete_link(queue->prio, queue->prio); queue->head = g_list_delete_link(queue->head, queue->head); } smlQueueAssert(queue); g_mutex_unlock(queue->mutex); return message; } void* smlQueueTryPopPrio (SmlQueue *queue) { smlAssert(queue); void *message = NULL; g_mutex_lock(queue->mutex); smlQueueAssert(queue); message = queue->prio ? queue->prio->data : NULL; queue->prio = g_list_delete_link(queue->prio, queue->prio); if (message) { queue->head = g_list_remove(queue->head, message); queue->tail = g_list_last(queue->head); } smlQueueAssert(queue); g_mutex_unlock(queue->mutex); return message; } void smlQueueLock (SmlQueue *queue) { smlAssert(queue); g_mutex_lock(queue->mutex); } void smlQueueUnlock (SmlQueue *queue) { smlAssert(queue); g_mutex_unlock(queue->mutex); } void* smlQueuePeek (SmlQueue *queue) { smlAssert(queue); return queue->head ? queue->head->data : NULL; } void* smlQueuePeekPrio (SmlQueue *queue) { smlAssert(queue); void *message = NULL; g_mutex_lock(queue->mutex); message = queue->prio ? queue->prio->data : NULL; smlQueueAssert(queue); g_mutex_unlock(queue->mutex); return message; } void smlQueuePrint (SmlQueue *queue) { smlAssert(queue); g_mutex_lock(queue->mutex); GString *info = g_string_new("Contents of queue "); g_string_append_printf(info, "%p:", queue); GList *m = NULL; for (m = queue->head; m; m = m->next) { g_string_append_printf(info, ", %p (list %p)", m->data, m); } smlTrace(TRACE_INTERNAL, "%s: %s", __func__, VA_STRING(info->str)); g_string_free(info, TRUE); info = g_string_new("Contents of prio queue:"); for (m = queue->prio; m; m = m->next) { g_string_append_printf(info, ", %p (list %p)", m->data, m); } smlTrace(TRACE_INTERNAL, "%s: %s", __func__, VA_STRING(info->str)); g_string_free(info, TRUE); smlTrace(TRACE_INTERNAL, "%s: Tail of queue: %p (list %p)", __func__, queue->tail ? queue->tail->data : NULL, queue->tail); g_mutex_unlock(queue->mutex); } GList* smlQueuePeekNext (SmlQueue *queue, GList *prev) { smlAssert(queue); if (!prev) return queue->head; else return prev->next; } void smlQueuePushHeadPrio (SmlQueue *queue, void *data) { smlAssert(queue); smlAssert(data); g_mutex_lock(queue->mutex); smlQueueAssert(queue); queue->head = g_list_prepend(queue->head, data); if (!queue->tail) queue->tail = queue->head; queue->prio = g_list_prepend(queue->prio, data); smlQueueAssert(queue); g_mutex_unlock(queue->mutex); } void smlQueuePushHead (SmlQueue *queue, void *data) { smlAssert(queue); smlAssert(data); g_mutex_lock(queue->mutex); smlQueueAssert(queue); queue->head = g_list_prepend(queue->head, data); if (!queue->tail) queue->tail = queue->head; smlQueueAssert(queue); g_mutex_unlock(queue->mutex); } /*! @brief Sends a message down a queue * * @param queue The queue to send the message to * @param message The message to send * */ void smlQueueSendPrio (SmlQueue *queue, void *data) { smlAssert(queue); smlAssert(data); g_mutex_lock(queue->mutex); smlQueueAssert(queue); if (queue->tail) { queue->tail = g_list_append(queue->tail, data); queue->tail = queue->tail->next; } else { queue->head = g_list_append(queue->head, data); queue->tail = queue->head; } queue->prio = g_list_append(queue->prio, data); smlQueueAssert(queue); g_mutex_unlock(queue->mutex); } /*! @brief Sends a message down a queue * * @param queue The queue to send the message to * @param message The message to send * */ void smlQueueSend (SmlQueue *queue, void *data) { smlAssert(queue); smlAssert(data); g_mutex_lock(queue->mutex); smlQueueAssert(queue); if (queue->tail) { queue->tail = g_list_append(queue->tail, data); queue->tail = queue->tail->next; } else { queue->head = g_list_append(queue->head, data); queue->tail = queue->head; } smlQueueAssert(queue); g_mutex_unlock(queue->mutex); } /*! @brief Sets the message handler for a queue * * Sets the function that will receive all messages, except the methodcall replies * * @param queue The queue to set the handler on * @param handler The message handler function * @param user_data The userdata that the message handler should receive * */ void smlQueueSetHandler (SmlQueue *queue, SmlQueueHandler handler, void *userdata) { queue->handler = handler; queue->userdata = userdata; } /*! @brief Sets the queue to use the gmainloop with the given context * * This function will attach the SmlQueue as a source to the given context. * The queue will then be check for new messages and the messages will be * handled. * * @param queue The queue to set up * @param context The context to use. NULL for default loop * */ void smlQueueAttach (SmlQueue *queue, GMainContext *context) { smlTrace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, context); smlAssert(queue); smlAssert(queue->source == NULL); GSourceFuncs *functions = g_malloc0(sizeof(GSourceFuncs)); functions->prepare = _queue_prepare; functions->check = _queue_check; functions->dispatch = _queue_dispatch; functions->finalize = NULL; GSource *source = g_source_new(functions, sizeof(GSource) + sizeof(SmlQueue *)); SmlQueue **queueptr = (SmlQueue **)(source + 1); *queueptr = queue; g_source_set_callback(source, NULL, queue, NULL); queue->source = source; queue->functions = functions; g_source_attach(source, context); queue->context = context; if (context) g_main_context_ref(context); smlTrace(TRACE_EXIT, "%s", __func__); } void smlQueueDetach (SmlQueue *queue) { smlTrace(TRACE_ENTRY, "%s(%p)", __func__, queue); smlAssert(queue); smlAssert(queue->source); g_source_destroy(queue->source); g_source_unref(queue->source); smlSafeFree((gpointer *)&(queue->functions)); queue->source = NULL; if (queue->context) { g_main_context_unref(queue->context); queue->context = NULL; } smlTrace(TRACE_EXIT, "%s", __func__); } gboolean smlQueueIsAttached (SmlQueue *queue) { return queue->source ? TRUE : FALSE; } void smlQueueDispatch (SmlQueue *queue) { void *message = smlQueueTryPop(queue); if (message) queue->handler(message, queue->userdata); } void smlQueueDispatchPrio (SmlQueue *queue) { void *message = smlQueueTryPopPrio(queue); if (message) queue->handler(message, queue->userdata); } /*@}*/