From 538be0680e04babfa4a42132e8c6188c4b23efa2 Mon Sep 17 00:00:00 2001 From: Matthew Barnes Date: Thu, 20 Dec 2007 17:58:09 +0000 Subject: ** Fixes bug #362638 2007-12-20 Matthew Barnes ** Fixes bug #362638 * calendar/gui/alarm-notify/alarm-notify.c: * calendar/gui/alarm-notify/alarm-notify.h: * calendar/gui/alarm-notify/alarm-queue.c: Rewrite message passing to use GThreadPool instead of EThread. * mail/mail-mt.h: Overhaul the message passing API: - Define a MailMsg type as the base message struct. - Define types for the various callback functions. - Add a priority value to each message (not yet used). - Add a reference count to each message. - Define a MailMsgInfo type for the virtual function table. - Record the size of message sub-types in MailMsgInfo. - New/changed functions: mail_msg_new() - Easier to use. mail_msg_ref() - Increase reference count. mail_msg_unref() - Decrease reference count. mail_msg_main_loop_push() } mail_msg_unordered_push() } Submit MailMsgs to various mail_msg_fast_ordered_push() } message-processing threads. mail_msg_slow_ordered_push() } * mail/mail-mt.c (mail_msg_new): Use GSlice for memory allocation. * mail/mail-mt.c (mail_msg_ref), (mail_msg_unref): New functions increment/decrement a MailMsg's reference count. * mail/mail-mt.c (mail_cancel_hood_add), (mail_cancel_hook_remove): Convert the 'cancel_hook_list' from an EDList to a GHookList and modify the API accordingly. * mail/mail-mt.c: Use GThreadPools instead of EThreads. Use GAsyncQueues instead of EMsgPorts. * mail/em-composer-utils.c: * mail/em-folder-browser.c: * mail/em-folder-properties.c: * mail/em-folder-tree.c: * mail/em-folder-utils.c: * mail/em-folder-view.c: * mail/em-format-html-print.c: * mail/em-format-html.c: * mail/em-subscribe-editor.c: * mail/em-sync-stream.c: * mail/importers/elm-importer.c: * mail/importers/mail-importer.c: * mail/importers/pine-importer.c: * mail/mail-component.c: * mail/mail-folder-cache.c: * mail/mail-mt.c: * mail/mail-ops.c: * mail/mail-ops.h: * mail/mail-send-recv.c: * mail/mail-session.c: * mail/mail-vfolder.c: * mail/message-list.c: * plugins/folder-unsubscribe/folder-unsubscribe.c: * plugins/groupwise-features/share-folder-common.c: * plugins/exchange-operations/exchange-folder.c: * plugins/mark-all-read/mark-all-read.c: * plugins/mailing-list-actions/mailing-list-actions.c: * plugins/itip-formatter/itip-formatter.c: * plugins/save-attachments/save-attachments.c: Use the new MailMsg API for messages. svn path=/trunk/; revision=34730 --- mail/em-sync-stream.c | 344 ++++++++++++++++---------------------------------- 1 file changed, 106 insertions(+), 238 deletions(-) (limited to 'mail/em-sync-stream.c') diff --git a/mail/em-sync-stream.c b/mail/em-sync-stream.c index 1224e468bd..afa8bef8ea 100644 --- a/mail/em-sync-stream.c +++ b/mail/em-sync-stream.c @@ -25,42 +25,17 @@ #include #endif -#include +#include "em-sync-stream.h" + #include -#include +#include #include -#include -#include "em-sync-stream.h" +#include #include "mail-mt.h" -#define LOG_STREAM - -#define d(x) - #define EMSS_CLASS(x) ((EMSyncStreamClass *)(((CamelObject *)(x))->klass)) -struct _EMSyncStreamPrivate { - /* FIXME: use a single data port/gui channel for all instances */ - /* TODO: possibly just use one of the mail-mt ports ... */ - struct _EMsgPort *data_port, *reply_port; - struct _GIOChannel *gui_channel; - guint gui_watch; - - char *buf_data; - int buf_used; - int buf_size; - -#ifdef LOG_STREAM - FILE *logfd; -#endif -}; - -#ifdef LOG_STREAM -static int dolog; -#endif - -/* Should probably expose messages to outside world ... so subclasses can extend */ enum _write_msg_t { EMSS_WRITE, EMSS_FLUSH, @@ -68,275 +43,168 @@ enum _write_msg_t { }; struct _write_msg { - EMsg msg; + EMSyncStream *emss; + EFlag *done; enum _write_msg_t op; - const char *data; - size_t n; + const gchar *string; + gsize len; }; -static void em_sync_stream_class_init (EMSyncStreamClass *klass); -static void em_sync_stream_init (CamelObject *object); -static void em_sync_stream_finalize (CamelObject *object); - -static ssize_t stream_write(CamelStream *stream, const char *buffer, size_t n); -static int stream_close(CamelStream *stream); -static int stream_flush(CamelStream *stream); - static CamelStreamClass *parent_class = NULL; -CamelType -em_sync_stream_get_type (void) -{ - static CamelType type = CAMEL_INVALID_TYPE; - - if (type == CAMEL_INVALID_TYPE) { -#ifdef LOG_STREAM - dolog = getenv("EVOLUTION_MAIL_LOG_HTML") != NULL; -#endif - type = camel_type_register (CAMEL_STREAM_TYPE, - "EMSyncStream", - sizeof (EMSyncStream), - sizeof (EMSyncStreamClass), - (CamelObjectClassInitFunc) em_sync_stream_class_init, - NULL, - (CamelObjectInitFunc) em_sync_stream_init, - (CamelObjectFinalizeFunc) em_sync_stream_finalize); - } - - return type; -} - -static void -em_sync_stream_class_init (EMSyncStreamClass *klass) -{ - CamelStreamClass *stream_class = CAMEL_STREAM_CLASS (klass); - - parent_class = (CamelStreamClass *) CAMEL_STREAM_TYPE; - - /* virtual method overload */ - stream_class->write = stream_write; - stream_class->flush = stream_flush; - stream_class->close = stream_close; -} - static gboolean -emcs_gui_received(GIOChannel *source, GIOCondition cond, void *data) +emss_process_message (struct _write_msg *msg) { - EMSyncStream *emss = data; - struct _EMSyncStreamPrivate *p = emss->priv; - struct _write_msg *msg; - - d(printf("%p: gui sync op job waiting\n", emss)); - - msg = (struct _write_msg *)e_msgport_get(p->data_port); - /* Should never happen ... */ - if (msg == NULL) - return TRUE; - - d(printf("%p: running sync op %d\n", emss, msg->op)); - - /* force out any pending data before doing anything else */ - if (p->buf_used > 0) { - EMSS_CLASS(emss)->sync_write((CamelStream *)emss, p->buf_data, p->buf_used); -#ifdef LOG_STREAM - if (p->logfd) - fwrite(p->buf_data, 1, p->buf_used, p->logfd); -#endif - p->buf_used = 0; + struct _EMSyncStream *emss = msg->emss; + + /* Force out any pending data before doing anything else. */ + if (emss->buffer != NULL && emss->buffer->len > 0) { + EMSS_CLASS (emss)->sync_write ( + CAMEL_STREAM (emss), emss->buffer->str, + emss->buffer->len); + g_string_set_size (emss->buffer, 0); } - /* FIXME: need to handle return values */ - switch (msg->op) { - case EMSS_WRITE: - EMSS_CLASS(emss)->sync_write((CamelStream *)emss, msg->data, msg->n); -#ifdef LOG_STREAM - if (p->logfd) - fwrite(msg->data, 1, msg->n, p->logfd); -#endif - break; - case EMSS_FLUSH: - EMSS_CLASS(emss)->sync_flush((CamelStream *)emss); - break; - case EMSS_CLOSE: - EMSS_CLASS(emss)->sync_close((CamelStream *)emss); -#ifdef LOG_STREAM - if (p->logfd) { - fclose(p->logfd); - p->logfd = NULL; - } -#endif - break; + case EMSS_WRITE: + EMSS_CLASS (emss)->sync_write ( + CAMEL_STREAM (emss), msg->string, msg->len); + break; + case EMSS_FLUSH: + EMSS_CLASS (emss)->sync_flush ( + CAMEL_STREAM (emss)); + break; + case EMSS_CLOSE: + EMSS_CLASS (emss)->sync_close ( + CAMEL_STREAM (emss)); + break; } - e_msgport_reply((EMsg *)msg); - d(printf("%p: gui sync op jobs done\n", emss)); + e_flag_set (msg->done); - return TRUE; + return FALSE; } static void -em_sync_stream_init (CamelObject *object) +emss_sync_op (EMSyncStream *emss, enum _write_msg_t op, + const gchar *string, gsize len) { - EMSyncStream *emss = (EMSyncStream *)object; - struct _EMSyncStreamPrivate *p; - - p = emss->priv = g_malloc0(sizeof(*p)); - - p->data_port = e_msgport_new(); - p->reply_port = e_msgport_new(); - -#ifndef G_OS_WIN32 - p->gui_channel = g_io_channel_unix_new(e_msgport_fd(p->data_port)); -#else - p->gui_channel = g_io_channel_win32_new_socket(e_msgport_fd(p->data_port)); -#endif - p->gui_watch = g_io_add_watch(p->gui_channel, G_IO_IN, emcs_gui_received, emss); - -#ifdef LOG_STREAM - if (dolog) { - char name[32]; - static int count; - - sprintf(name, "sync-stream.%d.html", count++); - printf("Saving raw data stream to '%s'\n", name); - p->logfd = fopen(name, "w"); - } -#endif - - d(printf("%p: new emss\n", emss)); -} - -static void -sync_op(EMSyncStream *emss, enum _write_msg_t op, const char *data, size_t n) -{ - struct _EMSyncStreamPrivate *p = emss->priv; struct _write_msg msg; - EMsg *reply_msg; - - d(printf("%p: launching sync op %d\n", emss, op)); - /* we do everything synchronous, we should never have any locks, and - this prevents overflow from banked up data */ - - msg.msg.reply_port = p->reply_port; + msg.done = e_flag_new (); + msg.emss = emss; msg.op = op; - msg.data = data; - msg.n = n; + msg.string = string; + msg.len = len; - e_msgport_put(p->data_port, &msg.msg); - reply_msg = e_msgport_wait(p->reply_port); - g_return_if_fail (reply_msg == &msg.msg); + camel_object_ref (emss); - d(printf("%p: returned sync op %d\n", emss, op)); -} + g_idle_add ((GSourceFunc) emss_process_message, &msg); -static void -em_sync_stream_finalize (CamelObject *object) -{ - EMSyncStream *emss = (EMSyncStream *)object; - struct _EMSyncStreamPrivate *p = emss->priv; - - /* TODO: is this stuff safe to do in another thread? */ - g_source_remove(p->gui_watch); - g_io_channel_unref(p->gui_channel); - - e_msgport_destroy(p->data_port); - e_msgport_destroy(p->reply_port); + e_flag_wait (msg.done); + e_flag_free (msg.done); - p->data_port = NULL; - p->reply_port = NULL; - - g_free(p->buf_data); - -#ifdef LOG_STREAM - if (p->logfd) - fclose(p->logfd); -#endif - - g_free(p); + camel_object_unref (emss); } -static ssize_t -stream_write (CamelStream *stream, const char *buffer, size_t n) +static gssize +emss_stream_write (CamelStream *stream, const gchar *string, gsize len) { EMSyncStream *emss = EM_SYNC_STREAM (stream); - struct _EMSyncStreamPrivate *p = emss->priv; if (emss->cancel) return -1; - if (pthread_equal(pthread_self(), mail_gui_thread)) { - EMSS_CLASS(emss)->sync_write(stream, buffer, n); -#ifdef LOG_STREAM - if (p->logfd) - fwrite(buffer, 1, n, p->logfd); -#endif - } else if (p->buf_size > 0) { - size_t left = p->buf_size-p->buf_used; - - if (n >= left) { - sync_op(emss, EMSS_WRITE, buffer, n); - } else { - memcpy(p->buf_data + p->buf_used, buffer, n); - p->buf_used += n; - } + if (mail_in_main_thread ()) { + EMSS_CLASS (emss)->sync_write (stream, string, len); + } else if (emss->buffer != NULL) { + if (len < (emss->buffer->allocated_len - emss->buffer->len)) + g_string_append_len (emss->buffer, string, len); + else + emss_sync_op (emss, EMSS_WRITE, string, len); } else { - sync_op(emss, EMSS_WRITE, buffer, n); + emss_sync_op(emss, EMSS_WRITE, string, len); } - return (ssize_t) n; + return (gssize) len; } static int -stream_flush(CamelStream *stream) +emss_stream_flush (CamelStream *stream) { - EMSyncStream *emss = (EMSyncStream *)stream; + EMSyncStream *emss = EM_SYNC_STREAM (stream); if (emss->cancel) return -1; - if (pthread_equal(pthread_self(), mail_gui_thread)) - return ((EMSyncStreamClass *)(((CamelObject *)emss)->klass))->sync_flush(stream); + if (mail_in_main_thread ()) + return EMSS_CLASS (emss)->sync_flush (stream); else - sync_op(emss, EMSS_FLUSH, NULL, 0); + emss_sync_op (emss, EMSS_FLUSH, NULL, 0); return 0; } static int -stream_close(CamelStream *stream) +emss_stream_close (CamelStream *stream) { - EMSyncStream *emss = (EMSyncStream *)stream; + EMSyncStream *emss = EM_SYNC_STREAM (stream); if (emss->cancel) return -1; - d(printf("%p: closing stream\n", stream)); - - if (pthread_equal(pthread_self(), mail_gui_thread)) { -#ifdef LOG_STREAM - if (emss->priv->logfd) { - fclose(emss->priv->logfd); - emss->priv->logfd = NULL; - } -#endif - return ((EMSyncStreamClass *)(((CamelObject *)emss)->klass))->sync_close(stream); - } else - sync_op(emss, EMSS_CLOSE, NULL, 0); + if (mail_in_main_thread ()) + return EMSS_CLASS (emss)->sync_close (stream); + else + emss_sync_op (emss, EMSS_CLOSE, NULL, 0); return 0; } -void -em_sync_stream_set_buffer_size(EMSyncStream *emss, size_t size) +static void +em_sync_stream_class_init (EMSyncStreamClass *class) +{ + CamelStreamClass *stream_class = CAMEL_STREAM_CLASS (class); + + parent_class = (CamelStreamClass *) CAMEL_STREAM_TYPE; + + stream_class->write = emss_stream_write; + stream_class->flush = emss_stream_flush; + stream_class->close = emss_stream_close; +} + +static void +em_sync_stream_finalize (EMSyncStream *emss) { - struct _EMSyncStreamPrivate *p = emss->priv; + if (emss->buffer != NULL) + g_string_free (emss->buffer, TRUE); +} - g_free(p->buf_data); - p->buf_data = g_malloc(size); - p->buf_size = size; - p->buf_used = 0; +CamelType +em_sync_stream_get_type (void) +{ + static CamelType type = CAMEL_INVALID_TYPE; + + if (G_UNLIKELY (type == CAMEL_INVALID_TYPE)) + type = camel_type_register ( + CAMEL_STREAM_TYPE, + "EMSyncStream", + sizeof (EMSyncStream), + sizeof (EMSyncStreamClass), + (CamelObjectClassInitFunc) em_sync_stream_class_init, + NULL, + (CamelObjectInitFunc) NULL, + (CamelObjectFinalizeFunc) em_sync_stream_finalize); + + return type; +} + +void +em_sync_stream_set_buffer_size (EMSyncStream *emss, gsize size) +{ + if (emss->buffer != NULL) + g_string_free (emss->buffer, TRUE); + emss->buffer = g_string_sized_new (size); } -- cgit v1.2.3