aboutsummaryrefslogtreecommitdiffstats
path: root/camel/camel-session.c
diff options
context:
space:
mode:
Diffstat (limited to 'camel/camel-session.c')
-rw-r--r--camel/camel-session.c186
1 files changed, 184 insertions, 2 deletions
diff --git a/camel/camel-session.c b/camel/camel-session.c
index a6e70bcd98..06b7bb8799 100644
--- a/camel/camel-session.c
+++ b/camel/camel-session.c
@@ -61,6 +61,12 @@ static char *get_storage_path (CamelSession *session,
CamelService *service,
CamelException *ex);
+#ifdef ENABLE_THREADS
+static void *session_thread_msg_new(CamelSession *session, CamelSessionThreadOps *ops, unsigned int size);
+static void session_thread_msg_free(CamelSession *session, CamelSessionThreadMsg *msg);
+static int session_thread_queue(CamelSession *session, CamelSessionThreadMsg *msg, int flags);
+static void session_thread_wait(CamelSession *session, int id);
+#endif
/* The vfolder provider is always avilable */
static CamelProvider vee_provider = {
@@ -87,6 +93,10 @@ camel_session_init (CamelSession *session)
session->priv = g_malloc0(sizeof(*session->priv));
#ifdef ENABLE_THREADS
session->priv->lock = g_mutex_new();
+ session->priv->thread_lock = g_mutex_new();
+ session->priv->thread_id = 1;
+ session->priv->thread_active = g_hash_table_new(NULL, NULL);
+ session->priv->thread_queue = NULL;
#endif
}
@@ -105,6 +115,12 @@ camel_session_finalise (CamelObject *o)
{
CamelSession *session = (CamelSession *)o;
+#ifdef ENABLE_THREADS
+ g_hash_table_destroy(session->priv->thread_active);
+ if (session->priv->thread_queue)
+ e_thread_destroy(session->priv->thread_queue);
+#endif
+
g_free(session->storage_path);
g_hash_table_foreach_remove (session->providers,
camel_session_destroy_provider, NULL);
@@ -112,8 +128,8 @@ camel_session_finalise (CamelObject *o)
#ifdef ENABLE_THREADS
g_mutex_free(session->priv->lock);
-#endif
-
+ g_mutex_free(session->priv->thread_lock);
+#endif
g_free(session->priv);
}
@@ -127,6 +143,13 @@ camel_session_class_init (CamelSessionClass *camel_session_class)
camel_session_class->get_service = get_service;
camel_session_class->get_storage_path = get_storage_path;
+#ifdef ENABLE_THREADS
+ camel_session_class->thread_msg_new = session_thread_msg_new;
+ camel_session_class->thread_msg_free = session_thread_msg_free;
+ camel_session_class->thread_queue = session_thread_queue;
+ camel_session_class->thread_wait = session_thread_wait;
+#endif
+
if (vee_provider.service_cache == NULL) {
vee_provider.object_types[CAMEL_PROVIDER_STORE] = camel_vee_store_get_type ();
vee_provider.service_cache = g_hash_table_new (camel_url_hash, camel_url_equal);
@@ -686,3 +709,162 @@ camel_session_get_filter_driver (CamelSession *session,
{
return CS_CLASS (session)->get_filter_driver (session, type, ex);
}
+
+#ifdef ENABLE_THREADS
+
+static void *session_thread_msg_new(CamelSession *session, CamelSessionThreadOps *ops, unsigned int size)
+{
+ CamelSessionThreadMsg *m;
+
+ g_assert(size >= sizeof(*m));
+
+ m = g_malloc0(size);
+ m->ops = ops;
+
+ CAMEL_SESSION_LOCK(session, thread_lock);
+ m->id = session->priv->thread_id++;
+ g_hash_table_insert(session->priv->thread_active, (void *)m->id, m);
+ CAMEL_SESSION_UNLOCK(session, thread_lock);
+
+ return m;
+}
+
+static void session_thread_msg_free(CamelSession *session, CamelSessionThreadMsg *msg)
+{
+ g_assert(msg->ops != NULL);
+
+ printf("free message %p session %p\n", msg, session);
+
+ CAMEL_SESSION_LOCK(session, thread_lock);
+ g_hash_table_remove(session->priv->thread_active, (void *)msg->id);
+ CAMEL_SESSION_UNLOCK(session, thread_lock);
+
+ printf("free msg, ops->free = %p\n", msg->ops->free);
+
+ if (msg->ops->free)
+ msg->ops->free(session, msg);
+ g_free(msg);
+}
+
+static void session_thread_destroy(EThread *thread, CamelSessionThreadMsg *msg, CamelSession *session)
+{
+ printf("destroy message %p session %p\n", msg, session);
+ session_thread_msg_free(session, msg);
+}
+
+static void session_thread_received(EThread *thread, CamelSessionThreadMsg *msg, CamelSession *session)
+{
+ printf("receive message %p session %p\n", msg, session);
+ if (msg->ops->receive)
+ msg->ops->receive(session, msg);
+}
+
+static int session_thread_queue(CamelSession *session, CamelSessionThreadMsg *msg, int flags)
+{
+ int id;
+
+ CAMEL_SESSION_LOCK(session, thread_lock);
+ if (session->priv->thread_queue == NULL) {
+ session->priv->thread_queue = e_thread_new(E_THREAD_QUEUE);
+ e_thread_set_msg_destroy(session->priv->thread_queue, (EThreadFunc)session_thread_destroy, session);
+ e_thread_set_msg_received(session->priv->thread_queue, (EThreadFunc)session_thread_received, session);
+ }
+ CAMEL_SESSION_UNLOCK(session, thread_lock);
+
+ id = msg->id;
+ e_thread_put(session->priv->thread_queue, &msg->msg);
+
+ return id;
+}
+
+static void session_thread_wait(CamelSession *session, int id)
+{
+ int wait;
+
+ /* we just busy wait, only other alternative is to setup a reply port? */
+ do {
+ CAMEL_SESSION_LOCK(session, thread_lock);
+ wait = g_hash_table_lookup(session->priv->thread_active, (void *)id) != NULL;
+ CAMEL_SESSION_UNLOCK(session, thread_lock);
+ if (wait) {
+ usleep(20000);
+ }
+ } while (wait);
+}
+
+/**
+ * camel_session_thread_msg_new:
+ * @session:
+ * @ops:
+ * @size:
+ *
+ * Create a new thread message, using ops as the receive/reply/free
+ * ops, of @size bytes.
+ *
+ * @ops points to the operations used to recieve/process and finally
+ * free the message.
+ **/
+void *camel_session_thread_msg_new(CamelSession *session, CamelSessionThreadOps *ops, unsigned int size)
+{
+ g_assert(CAMEL_IS_SESSION(session));
+ g_assert(ops != NULL);
+ g_assert(size >= sizeof(CamelSessionThreadMsg));
+
+ return CS_CLASS (session)->thread_msg_new(session, ops, size);
+}
+
+/**
+ * camel_session_thread_msg_free:
+ * @session:
+ * @msg:
+ *
+ * Free a @msg. Note that the message must have been allocated using
+ * msg_new, and must nto have been submitted to any queue function.
+ **/
+void camel_session_thread_msg_free(CamelSession *session, CamelSessionThreadMsg *msg)
+{
+ g_assert(CAMEL_IS_SESSION(session));
+ g_assert(msg != NULL);
+ g_assert(msg->ops != NULL);
+
+ return CS_CLASS (session)->thread_msg_free(session, msg);
+}
+
+/**
+ * camel_session_thread_queue:
+ * @session:
+ * @msg:
+ * @flags: queue type flags, currently 0.
+ *
+ * Queue a thread message in another thread for processing.
+ * The operation should be (but needn't) run in a queued manner
+ * with other operations queued in this manner.
+ *
+ * Return value: The id of the operation queued.
+ **/
+int camel_session_thread_queue(CamelSession *session, CamelSessionThreadMsg *msg, int flags)
+{
+ g_assert(CAMEL_IS_SESSION(session));
+ g_assert(msg != NULL);
+
+ return CS_CLASS (session)->thread_queue(session, msg, flags);
+}
+
+/**
+ * camel_session_thread_wait:
+ * @session:
+ * @id:
+ *
+ * Wait on an operation to complete (by id).
+ **/
+void camel_session_thread_wait(CamelSession *session, int id)
+{
+ g_assert(CAMEL_IS_SESSION(session));
+
+ if (id == -1)
+ return;
+
+ return CS_CLASS (session)->thread_wait(session, id);
+}
+
+#endif