/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
/* camel-folder-pt-proxy.c : proxy folder using posix threads */
/*
*
* Author :
* Bertrand Guiheneuf <bertrand@helixcode.com>
*
* Copyright 1999, 2000 HelixCode (http://www.helixcode.com) .
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA
*/
#include <config.h>
#include "camel-log.h"
#include "camel-marshal-utils.h"
#include "camel-thread-proxy.h"
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
/* vocabulary:
* operation: commanded by the main thread, executed by the child thread
* callback: commanded by the child thread, generally when an operation is
* completed. Executed in the main thread,
*/
/* needed for proper casts of async funcs when
* calling pthreads_create
*/
typedef void * (*thread_call_func) (void *);
/* forward declarations */
static gboolean
_thread_notification_catch (GIOChannel *source,
GIOCondition condition,
gpointer data);
static void
_notify_availability (CamelThreadProxy *proxy, gchar op_name);
static int
_init_notify_system (CamelThreadProxy *proxy);
/**
* camel_thread_proxy_new: create a new proxy object
*
* Create a new proxy object. This proxy object can be used
* to run async operations and this operations can trigger
* callbacks. It can also be used to proxy signals.
*
* Return value: The newly created proxy object
**/
CamelThreadProxy *
camel_thread_proxy_new (void)
{
CamelThreadProxy *proxy;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::new\n");
proxy = g_new (CamelThreadProxy, 1);
if (!proxy)
return NULL;
proxy->server_op_queue = camel_op_queue_new ();
proxy->client_op_queue = camel_op_queue_new ();
proxy->signal_data_cond = g_cond_new();
proxy->signal_data_mutex = g_mutex_new();
if (_init_notify_system (proxy) < 0) {
g_free (proxy);
return NULL;
}
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::new\n");
return proxy;
}
/**
* camel_thread_proxy_free: free a proxy object
* @proxy: proxy object to free
*
* free a proxy object
**/
void
camel_thread_proxy_free (CamelThreadProxy *proxy)
{
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::free\n");
g_cond_free (proxy->signal_data_cond);
g_mutex_free (proxy->signal_data_mutex);
camel_op_queue_free (proxy->server_op_queue);
camel_op_queue_free (proxy->client_op_queue);
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::free\n");
}
/* Operations handling */
/**
* _op_run_free_notify:
* @folder: folder to notify when the operation is completed.
* @op: operation to run.
*
* run an operation, free the operation field
* and then notify the main thread of the op
* completion.
*
* this routine is intended to be called
* in a new thread (in _run_next_op_in_thread)
*
**/
void
_op_run_free_and_notify (CamelOp *op)
{
CamelThreadProxy *th_proxy;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::_op_run_free_and_notify\n");
camel_op_run (op);
camel_op_free (op);
th_proxy = camel_op_get_user_data (op);
_notify_availability (th_proxy, 'a');
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::_op_run_free_and_notify\n");
}
/**
* _run_next_op_in_thread:
* @proxy_object:
*
* run the next operation pending in the proxy
* operation queue
**/
static void
_run_next_op_in_thread (CamelThreadProxy *proxy)
{
CamelOp *op;
CamelOpQueue *server_op_queue;
pthread_t thread;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::_run_next_op_in_thread\n");
server_op_queue = proxy->server_op_queue;
/* get the next pending operation */
op = camel_op_queue_pop_op (server_op_queue);
if (!op) {
camel_op_queue_set_service_availability (server_op_queue, TRUE);
return;
}
/* run the operation in a child thread */
pthread_create (&thread, NULL, (thread_call_func) _op_run_free_and_notify, op);
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::_run_next_op_in_thread\n");
}
/**
* camel_thread_proxy_push_op: push an operation in the proxy operation queue
* @proxy: proxy object
* @op: operation to push in the execution queue
*
* if no thread is currently running, executes the
* operation directly, otherwise push the operation
* in the proxy operation queue.
**/
void
camel_thread_proxy_push_op (CamelThreadProxy *proxy, CamelOp *op)
{
CamelOpQueue *server_op_queue;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::camel_thread_proxy_push_op\n");
g_assert (proxy);
server_op_queue = proxy->server_op_queue;
/* put the proxy object in the user data
so that it can be notified when the
operation is completed */
camel_op_set_user_data (op, (gpointer)proxy);
/* get next operation */
camel_op_queue_push_op (server_op_queue, op);
if (camel_op_queue_get_service_availability (server_op_queue)) {
/* no thread is currently running, run
* the next operation. */
camel_op_queue_set_service_availability (server_op_queue, FALSE);
/* when the operation is completed in the
child thread the main thread gets
notified and executes next operation
(see _thread_notification_catch, case 'a')
so there is no need to set the service
availability to FALSE except here
*/
_run_next_op_in_thread (proxy);
}
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::camel_thread_proxy_push_op\n");
}
/**
* _op_run_and_free: Run an operation and free it
* @op: Operation object
*
* Run an operation object in the current thread
* and free it.
**/
static void
_op_run_and_free (CamelOp *op)
{
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::_op_run_and_free\n");
camel_op_run (op);
camel_op_free (op);
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::_op_run_and_free\n");
}
/* Callbacks handling */
/**
* _run_next_cb: Run next callback pending in a proxy object
* @proxy: Proxy object
*
* Run next callback in the callback queue of a proxy object
**/
static void
_run_next_cb (CamelThreadProxy *proxy)
{
CamelOp *op;
CamelOpQueue *client_op_queue;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::_run_next_cb\n");
client_op_queue = proxy->client_op_queue;
/* get the next pending operation */
op = camel_op_queue_pop_op (client_op_queue);
if (!op) return;
/* run the operation in the main thread */
_op_run_and_free (op);
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::_run_next_cb\n");
}
/**
* camel_thread_proxy_push_cb: push a callback in the client queue
* @proxy: proxy object concerned by the callback
* @cb: callback to push
*
* Push an operation in the client queue, ie the queue
* containing the operations (callbacks) intended to be
* executed in the main thread.
**/
void
camel_thread_proxy_push_cb (CamelThreadProxy *proxy, CamelOp *cb)
{
CamelOpQueue *client_op_queue;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::camel_thread_proxy_push_cb\n");
client_op_queue = proxy->client_op_queue;
/* put the proxy object in the user data
so that it can be notified when the
operation is completed */
camel_op_set_user_data (cb, (gpointer)proxy);
/* push the callback in the client queue */
camel_op_queue_push_op (client_op_queue, cb);
/* tell the main thread a new callback is there */
_notify_availability (proxy, 'c');
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::camel_thread_proxy_push_cb\n");
}
/**
* _init_notify_system: set the notify channel up
* @proxy: proxy object
*
* called once to set the notification channel up
**/
static int
_init_notify_system (CamelThreadProxy *proxy)
{
int filedes[2];
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::_init_notify_system\n");
/* set up the notification channel */
if (pipe (filedes) < 0) {
CAMEL_LOG_WARNING ("could not create pipe in CamelThreadProxy::_init_notify_system\n");
CAMEL_LOG_FULL_DEBUG ("Full error message : %s\n", strerror(errno));
return -1;
}
proxy->pipe_client_fd = filedes [0];
proxy->pipe_server_fd = filedes [1];
proxy->notify_source = g_io_channel_unix_new (filedes [0]);
proxy->notify_channel = g_io_channel_unix_new (filedes [1]);
/* the _thread_notification_catch function
* will be called in the main thread when the
* child thread writes some data in the channel */
g_io_add_watch (proxy->notify_source, G_IO_IN,
_thread_notification_catch,
proxy);
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::_init_notify_system\n");
return 1;
}
/**
* _notify_availability: notify the main thread from an event
* @proxy: proxy object
* @op_name: operation name
*
* called by child thread to notify the main
* thread something is available for him.
* What this thing is depends on @op_name:
*
* 'a' : thread available. That means the thread is ready
* to process an operation.
* 's' : a signal is available. Used by the signal proxy.
*
*/
static void
_notify_availability (CamelThreadProxy *proxy, gchar op_name)
{
GIOChannel *notification_channel;
guint bytes_written;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::_notify_availability\n");
notification_channel = proxy->notify_channel;
do {
/* the write operation will trigger the
* watch on the main thread side */
g_io_channel_write (notification_channel,
&op_name,
1,
&bytes_written);
} while (bytes_written < 1);
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::_notify_availability\n");
}
/* signal proxying */
/**
* _signal_marshaller_server_side: called in the child thread to proxy a signal
* @object:
* @data:
* @n_args:
* @args:
*
*
**/
static void
_signal_marshaller_server_side (GtkObject *object,
gpointer data,
guint n_args,
GtkArg *args)
{
CamelThreadProxy *proxy;
guint signal_id;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::_signal_marshaller_server_side\n");
proxy = CAMEL_THREAD_PROXY (gtk_object_get_data (object, "__proxy__"));
signal_id = (guint)data;
g_assert (proxy);
g_mutex_lock (proxy->signal_data_mutex);
/* we are going to wait for the main client thread
* to have emitted the last signal we asked him
* to proxy.
*/
while (proxy->signal_data.args)
g_cond_wait (proxy->signal_data_cond,
proxy->signal_data_mutex);
proxy->signal_data.signal_id = signal_id;
proxy->signal_data.args = args;
g_mutex_unlock (proxy->signal_data_mutex);
/* tell the main thread there is a signal pending */
_notify_availability (proxy, 's');
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::_signal_marshaller_server_side\n");
}
static void
_signal_marshaller_client_side (CamelThreadProxy *proxy)
{
g_mutex_lock (proxy->signal_data_mutex);
g_assert (proxy->signal_data.args);
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::_signal_marshaller_client_side\n");
/* emit the pending signal */
gtk_signal_emitv (GTK_OBJECT (proxy),
proxy->signal_data.signal_id,
proxy->signal_data.args);
proxy->signal_data.args = NULL;
/* if waiting for the signal to be treated,
* awake the client thread up
*/
g_cond_signal (proxy->signal_data_cond);
g_mutex_unlock (proxy->signal_data_mutex);
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::_signal_marshaller_client_side\n");
}
/**
* camel_thread_proxy_add_signals: init the signal proxy
* @proxy: proxy
* @proxy_object: Proxy Gtk Object
* @real_object: Real Gtk Object
* @signal_to_proxy: NULL terminated array of signal name
*
* Add some signals to the list of signals to be
* proxied by the proxy object.
* The signals emitted by the real object in the child
* thread are reemited by the proxy object in the
* main thread.
**/
void
camel_thread_proxy_add_signals (CamelThreadProxy *proxy,
GtkObject *proxy_object,
GtkObject *real_object,
char *signal_to_proxy[])
{
guint i;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::camel_thread_proxy_init_signals\n");
for (i=0; signal_to_proxy[i]; i++) {
/* connect the signal to the signal marshaller
* user_data is the signal id */
gtk_signal_connect_full (GTK_OBJECT (real_object),
signal_to_proxy[i],
NULL,
_signal_marshaller_server_side,
(gpointer)gtk_signal_lookup (signal_to_proxy[i],
GTK_OBJECT_CLASS (real_object)->type),
NULL,
TRUE,
FALSE);
}
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::camel_thread_proxy_init_signals\n");
}
/**** catch notification from child thread ****/
/**
* _thread_notification_catch: call by glib loop when data is available on the thread io channel
* @source:
* @condition:
* @data:
*
* called by watch set on the IO channel
*
* Return value: TRUE because we don't want the watch to be removed
**/
static gboolean
_thread_notification_catch (GIOChannel *source,
GIOCondition condition,
gpointer data)
{
CamelThreadProxy *proxy = CAMEL_THREAD_PROXY (data);
gchar op_name;
guint bytes_read;
GIOError error;
CAMEL_LOG_FULL_DEBUG ("Entering CamelThreadProxy::_thread_notification_catch\n");
error = g_io_channel_read (source,
&op_name,
1,
&bytes_read);
while ((!error) && (bytes_read == 1)) {
switch (op_name) {
case 'a': /* the thread is OK for a new operation */
_run_next_op_in_thread (proxy);
break;
case 's': /* there is a pending signal to proxy */
_signal_marshaller_client_side (proxy);
break;
case 'c': /* there is a cb pending in the main thread */
_run_next_cb (proxy);
break;
}
error = g_io_channel_read (source,
&op_name,
1,
&bytes_read);
}
CAMEL_LOG_FULL_DEBUG ("Leaving CamelThreadProxy::_thread_notification_catch\n");
/* do not remove the io watch */
return TRUE;
}