From 5674966eeb7eecfa109ad419f1c997dba69ff0af Mon Sep 17 00:00:00 2001 From: Not Zed Date: Sun, 24 Dec 2000 00:58:02 +0000 Subject: Merge from camel-mt-branch. 2000-12-24 Not Zed * Merge from camel-mt-branch. svn path=/trunk/; revision=7152 --- e-util/ChangeLog | 27 ++ e-util/Makefile.am | 2 + e-util/e-msgport.c | 774 +++++++++++++++++++++++++++++++++++++++++ e-util/e-msgport.h | 79 +++++ e-util/e-sexp.c | 99 ++---- e-util/e-sexp.h | 23 +- filter/ChangeLog | 4 + filter/filter-driver.c | 2 - filter/filter-message-search.c | 2 - libibex/ChangeLog | 24 ++ libibex/Makefile.am | 5 +- libibex/dumpindex.c | 4 + libibex/ibex_block.c | 38 +- libibex/ibex_internal.h | 19 + libibex/testindex.c | 44 ++- 15 files changed, 1070 insertions(+), 76 deletions(-) create mode 100644 e-util/e-msgport.c create mode 100644 e-util/e-msgport.h diff --git a/e-util/ChangeLog b/e-util/ChangeLog index dbb96628d6..d64af7c602 100644 --- a/e-util/ChangeLog +++ b/e-util/ChangeLog @@ -1,3 +1,7 @@ +2000-12-24 Not Zed + + * Merge from camel-mt-branch. + 2000-12-20 JP Rosevear * e-pilot-util.c (e_pilot_utf8_to_pchar): Check for null strings @@ -12,6 +16,29 @@ * Makefile.am: Conditionally build e-pilot-util.[hc] because they depend on pilot-link stuff +2000-12-23 Not Zed + + * e-msgport.c (e_dlist_length): Util function. + (e_thread_new): Another new thread primitive. This one is a + re-usable 'server thread' thingy. + +2000-12-21 Not Zed + + * Makefile.am (libeutil_la_SOURCES): Added msgport stuff. + + * e-msgport.[ch]: Some thread primitives. e_dlist, a + double-linked list (ok, not a thread primitive, but used in ...), + e_msgport - an asynchronous, non-copying message passing + rendesvous port, and e_mutex, a portably configurable mutex. + +2000-12-19 Not Zed + + * e-sexp.c: Make the code compile without being a gtk object (the + old code can still be built as a gtk object if required). Also + removed some dead code. + (e_sexp_ref): New function to ref if we're not a gkt object. + (e_sexp_unref): Same for unref. + 2000-12-13 Larry Ewing * e-html-utils.c (e_text_to_html): make sure we actually make diff --git a/e-util/Makefile.am b/e-util/Makefile.am index eb42fef5e0..f2743b3673 100644 --- a/e-util/Makefile.am +++ b/e-util/Makefile.am @@ -33,6 +33,8 @@ libeutil_la_SOURCES = \ e-list.h \ e-memory.c \ e-memory.h \ + e-msgport.c \ + e-msgport.h \ e-sexp.c \ e-sexp.h \ e-dbhash.c \ diff --git a/e-util/e-msgport.c b/e-util/e-msgport.c new file mode 100644 index 0000000000..107b889635 --- /dev/null +++ b/e-util/e-msgport.c @@ -0,0 +1,774 @@ + + +#include +#include +#include +#include +#include +#include + +#include + +#include "e-msgport.h" + +#include + +void e_dlist_init(EDList *v) +{ + v->head = (EDListNode *)&v->tail; + v->tail = 0; + v->tailpred = (EDListNode *)&v->head; +} + +EDListNode *e_dlist_addhead(EDList *l, EDListNode *n) +{ + n->next = l->head; + n->prev = (EDListNode *)&l->head; + l->head->prev = n; + l->head = n; + return n; +} + +EDListNode *e_dlist_addtail(EDList *l, EDListNode *n) +{ + n->next = (EDListNode *)&l->tail; + n->prev = l->tailpred; + l->tailpred->next = n; + l->tailpred = n; + return n; +} + +EDListNode *e_dlist_remove(EDListNode *n) +{ + n->next->prev = n->prev; + n->prev->next = n->next; + return n; +} + +EDListNode *e_dlist_remhead(EDList *l) +{ + EDListNode *n, *nn; + + n = l->head; + nn = n->next; + if (nn) { + nn->prev = n->prev; + l->head = nn; + return n; + } + return NULL; +} + +EDListNode *e_dlist_remtail(EDList *l) +{ + EDListNode *n, *np; + + n = l->tailpred; + np = n->prev; + if (np) { + np->next = n->next; + l->tailpred = np; + return n; + } + return NULL; +} + +int e_dlist_empty(EDList *l) +{ + return (l->head == (EDListNode *)&l->tail); +} + +int e_dlist_length(EDList *l) +{ + EDListNode *n, *nn; + int count = 0; + + n = l->head; + nn = n->next; + while (nn) { + count++; + n = nn; + nn = n->next; + } + + return 0; +} + +struct _EMsgPort { + EDList queue; + int condwait; /* how many waiting in condwait */ + union { + int pipe[2]; + struct { + int read; + int write; + } fd; + } pipe; + /* @#@$#$ glib stuff */ + GCond *cond; + GMutex *lock; +}; + +#define m(x) + +EMsgPort *e_msgport_new(void) +{ + EMsgPort *mp; + + mp = g_malloc(sizeof(*mp)); + e_dlist_init(&mp->queue); + mp->lock = g_mutex_new(); + mp->cond = g_cond_new(); + mp->pipe.fd.read = -1; + mp->pipe.fd.write = -1; + mp->condwait = 0; + + return mp; +} + +void e_msgport_destroy(EMsgPort *mp) +{ + g_mutex_free(mp->lock); + g_cond_free(mp->cond); + if (mp->pipe.fd.read != -1) { + close(mp->pipe.fd.read); + close(mp->pipe.fd.write); + } + g_free(mp); +} + +/* get a fd that can be used to wait on the port asynchronously */ +int e_msgport_fd(EMsgPort *mp) +{ + int fd; + + g_mutex_lock(mp->lock); + fd = mp->pipe.fd.read; + if (fd == -1) { + pipe(mp->pipe.pipe); + fd = mp->pipe.fd.read; + } + g_mutex_unlock(mp->lock); + + return fd; +} + +void e_msgport_put(EMsgPort *mp, EMsg *msg) +{ + m(printf("put:\n")); + g_mutex_lock(mp->lock); + e_dlist_addtail(&mp->queue, &msg->ln); + if (mp->condwait > 0) { + m(printf("put: condwait > 0, waking up\n")); + g_cond_signal(mp->cond); + } + if (mp->pipe.fd.write != -1) { + m(printf("put: have pipe, writing notification to it\n")); + write(mp->pipe.fd.write, "", 1); + } + g_mutex_unlock(mp->lock); + m(printf("put: done\n")); +} + +EMsg *e_msgport_wait(EMsgPort *mp) +{ + EMsg *msg; + + m(printf("wait:\n")); + g_mutex_lock(mp->lock); + while (e_dlist_empty(&mp->queue)) { + if (mp->pipe.fd.read == -1) { + m(printf("wait: waiting on condition\n")); + mp->condwait++; + g_cond_wait(mp->cond, mp->lock); + m(printf("wait: got condition\n")); + mp->condwait--; + } else { + fd_set rfds; + + m(printf("wait: waitng on pipe\n")); + FD_ZERO(&rfds); + FD_SET(mp->pipe.fd.read, &rfds); + g_mutex_unlock(mp->lock); + select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL); + pthread_testcancel(); + g_mutex_lock(mp->lock); + m(printf("wait: got pipe\n")); + } + } + msg = (EMsg *)mp->queue.head; + m(printf("wait: message = %p\n", msg)); + g_mutex_unlock(mp->lock); + m(printf("wait: done\n")); + return msg; +} + +EMsg *e_msgport_get(EMsgPort *mp) +{ + EMsg *msg; + char dummy[1]; + + g_mutex_lock(mp->lock); + msg = (EMsg *)e_dlist_remhead(&mp->queue); + if (msg && mp->pipe.fd.read != -1) + read(mp->pipe.fd.read, dummy, 1); + m(printf("get: message = %p\n", msg)); + g_mutex_unlock(mp->lock); + + return msg; +} + +void e_msgport_reply(EMsg *msg) +{ + if (msg->reply_port) { + e_msgport_put(msg->reply_port, msg); + } + /* else lost? */ +} + +struct _EThread { + EMsgPort *server_port; + EMsgPort *reply_port; + pthread_mutex_t mutex; + e_thread_t type; + int queue_limit; + + int waiting; /* if we are waiting for a new message */ + pthread_t id; /* id of our running child thread */ + GList *id_list; /* if THREAD_NEW, then a list of our child threads */ + + EThreadFunc destroy; + void *destroy_data; + + EThreadFunc received; + void *received_data; + + EThreadFunc lost; + void *lost_data; +}; + +static void thread_destroy_msg(EThread *e, EMsg *m); + +EThread *e_thread_new(e_thread_t type) +{ + EThread *e; + + e = g_malloc0(sizeof(*e)); + pthread_mutex_init(&e->mutex, 0); + e->type = type; + e->server_port = e_msgport_new(); + e->id = ~0; + e->queue_limit = INT_MAX; + + return e; +} + +/* close down the threads & resources etc */ +void e_thread_destroy(EThread *e) +{ + int tries = 0; + int busy = FALSE; + EMsg *msg; + + /* make sure we soak up all the messages first */ + while ( (msg = e_msgport_get(e->server_port)) ) { + thread_destroy_msg(e, msg); + } + + pthread_mutex_lock(&e->mutex); + + switch(e->type) { + case E_THREAD_QUEUE: + case E_THREAD_DROP: + /* if we have a thread, 'kill' it */ + while (e->id != ~0 && tries < 5) { + if (e->waiting == 1) { + pthread_t id = e->id; + e->id = ~0; + pthread_mutex_unlock(&e->mutex); + if (pthread_cancel(id) == 0) + pthread_join(id, 0); + pthread_mutex_lock(&e->mutex); + } else { + printf("thread still active, waiting for it to finish\n"); + pthread_mutex_unlock(&e->mutex); + sleep(1); + pthread_mutex_lock(&e->mutex); + } + tries++; + } + busy = e->id != ~0; + break; + case E_THREAD_NEW: + while (g_list_length(e->id_list) && tries < 5) { + printf("thread(s) still active, waiting for them to finish\n"); + pthread_mutex_unlock(&e->mutex); + sleep(1); + pthread_mutex_lock(&e->mutex); + } + busy = g_list_length(e->id_list) != 0; + break; + } + + pthread_mutex_unlock(&e->mutex); + + /* and clean up, if we can */ + if (busy) { + g_warning("threads were busy, leaked EThread"); + return; + } + + e_msgport_destroy(e->server_port); + g_free(e); +} + +/* set the queue maximum depth, what happens when the queue + fills up depends on the queue type */ +void e_thread_set_queue_limit(EThread *e, int limit) +{ + e->queue_limit = limit; +} + +/* set a msg destroy callback, this can not call any e_thread functions on @e */ +void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data) +{ + pthread_mutex_lock(&e->mutex); + e->destroy = destroy; + e->destroy_data = data; + pthread_mutex_unlock(&e->mutex); +} + +/* set a message lost callback, called if any message is discarded */ +void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data) +{ + pthread_mutex_lock(&e->mutex); + e->lost = lost; + e->lost_data = lost; + pthread_mutex_unlock(&e->mutex); +} + +/* set a reply port, if set, then send messages back once finished */ +void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port) +{ + e->reply_port = reply_port; +} + +/* set a received data callback */ +void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data) +{ + pthread_mutex_lock(&e->mutex); + e->received = received; + e->received_data = data; + pthread_mutex_unlock(&e->mutex); +} + +static void +thread_destroy_msg(EThread *e, EMsg *m) +{ + EThreadFunc func; + void *func_data; + + /* we do this so we never get an incomplete/unmatched callback + data */ + pthread_mutex_lock(&e->mutex); + func = e->destroy; + func_data = e->destroy_data; + pthread_mutex_unlock(&e->mutex); + + if (func) + func(e, m, func_data); +} + +static void +thread_received_msg(EThread *e, EMsg *m) +{ + EThreadFunc func; + void *func_data; + + /* we do this so we never get an incomplete/unmatched callback + data */ + pthread_mutex_lock(&e->mutex); + func = e->received; + func_data = e->received_data; + pthread_mutex_unlock(&e->mutex); + + if (func) + func(e, m, func_data); + else + g_warning("No processing callback for EThread, message unprocessed"); +} + +static void +thread_lost_msg(EThread *e, EMsg *m) +{ + EThreadFunc func; + void *func_data; + + /* we do this so we never get an incomplete/unmatched callback + data */ + pthread_mutex_lock(&e->mutex); + func = e->lost; + func_data = e->lost_data; + pthread_mutex_unlock(&e->mutex); + + if (func) + func(e, m, func_data); +} + +/* the actual thread dispatcher */ +static void * +thread_dispatch(void *din) +{ + EThread *e = din; + EMsg *m; + + printf("dispatch thread started: %ld\n", pthread_self()); + + while (1) { + pthread_mutex_lock(&e->mutex); + m = e_msgport_get(e->server_port); + if (m == NULL) { + /* nothing to do? If we are a 'new' type thread, just quit. + Otherwise, go into waiting (can be cancelled here) */ + switch (e->type) { + case E_THREAD_QUEUE: + case E_THREAD_DROP: + e->waiting = 1; + pthread_mutex_unlock(&e->mutex); + e_msgport_wait(e->server_port); + e->waiting = 0; + break; + case E_THREAD_NEW: + e->id_list = g_list_remove(e->id_list, (void *)pthread_self()); + pthread_mutex_unlock(&e->mutex); + return 0; + } + + continue; + } + pthread_mutex_unlock(&e->mutex); + + printf("got message in dispatch thread\n"); + + /* process it */ + thread_received_msg(e, m); + + /* if we have a reply port, send it back, otherwise, lose it */ + if (m->reply_port) { + e_msgport_reply(m); + } else { + thread_destroy_msg(e, m); + } + } + + /* if we run out of things to process we could conceivably 'hang around' for a bit, + but to do this we need to use the fd interface of the msgport, and its utility + is probably debatable anyway */ + + /* signify we are no longer running */ + /* This code isn't used yet, but would be if we ever had a 'quit now' message implemented */ + pthread_mutex_lock(&e->mutex); + switch (e->type) { + case E_THREAD_QUEUE: + case E_THREAD_DROP: + e->id = ~0; + break; + case E_THREAD_NEW: + e->id_list = g_list_remove(e->id_list, (void *)pthread_self()); + break; + } + pthread_mutex_unlock(&e->mutex); + + return 0; +} + +/* send a message to the thread, start thread if necessary */ +void e_thread_put(EThread *e, EMsg *msg) +{ + pthread_t id; + EMsg *dmsg = NULL; + + pthread_mutex_lock(&e->mutex); + + /* the caller forgot to tell us what to do, well, we can't do anything can we */ + if (e->received == NULL) { + pthread_mutex_unlock(&e->mutex); + g_warning("EThread called with no receiver function, no work to do!"); + thread_destroy_msg(e, msg); + return; + } + + msg->reply_port = e->reply_port; + + switch(e->type) { + case E_THREAD_QUEUE: + /* if the queue is full, lose this new addition */ + if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { + e_msgport_put(e->server_port, msg); + } else { + printf("queue limit reached, dropping new message\n"); + dmsg = msg; + } + break; + case E_THREAD_DROP: + /* if the queue is full, lose the oldest (unprocessed) message */ + if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { + e_msgport_put(e->server_port, msg); + } else { + printf("queue limit reached, dropping old message\n"); + e_msgport_put(e->server_port, msg); + dmsg = e_msgport_get(e->server_port); + } + break; + case E_THREAD_NEW: + /* it is possible that an existing thread can catch this message, so + we might create a thread with no work to do. + but that doesn't matter, the other alternative that it be lost is worse */ + e_msgport_put(e->server_port, msg); + if (g_list_length(e->id_list) < e->queue_limit + && pthread_create(&id, NULL, thread_dispatch, e) == 0) { + e->id_list = g_list_append(e->id_list, (void *)id); + } + pthread_mutex_unlock(&e->mutex); + return; + } + + /* create the thread, if there is none to receive it yet */ + if (e->id == ~0) { + if (pthread_create(&e->id, NULL, thread_dispatch, e) == -1) { + g_warning("Could not create dispatcher thread, message queued?: %s", strerror(errno)); + e->id = ~0; + } + } + + pthread_mutex_unlock(&e->mutex); + + if (dmsg) { + thread_lost_msg(e, dmsg); + thread_destroy_msg(e, dmsg); + } +} + +/* yet-another-mutex interface */ +struct _EMutex { + int type; + pthread_t owner; + short waiters; + short depth; + pthread_mutex_t mutex; + pthread_cond_t cond; +}; + +/* sigh, this is just painful to have to need, but recursive + read/write, etc mutexes just aren't very common in thread + implementations */ +/* TODO: Just make it use recursive mutexes if they are available */ +EMutex *e_mutex_new(e_mutex_t type) +{ + struct _EMutex *m; + + m = g_malloc(sizeof(*m)); + m->type = type; + m->waiters = 0; + m->depth = 0; + m->owner = ~0; + + switch (type) { + case E_MUTEX_SIMPLE: + pthread_mutex_init(&m->mutex, 0); + break; + case E_MUTEX_REC: + pthread_mutex_init(&m->mutex, 0); + pthread_cond_init(&m->cond, 0); + break; + /* read / write ? flags for same? */ + } + + return m; +} + +int e_mutex_destroy(EMutex *m) +{ + int ret = 0; + + switch (m->type) { + case E_MUTEX_SIMPLE: + ret = pthread_mutex_destroy(&m->mutex); + if (ret == -1) + g_warning("EMutex destroy failed: %s", strerror(errno)); + g_free(m); + break; + case E_MUTEX_REC: + ret = pthread_mutex_destroy(&m->mutex); + if (ret == -1) + g_warning("EMutex destroy failed: %s", strerror(errno)); + ret = pthread_cond_destroy(&m->cond); + if (ret == -1) + g_warning("EMutex destroy failed: %s", strerror(errno)); + g_free(m); + + } + return ret; +} + +int e_mutex_lock(EMutex *m) +{ + pthread_t id; + + switch (m->type) { + case E_MUTEX_SIMPLE: + return pthread_mutex_lock(&m->mutex); + case E_MUTEX_REC: + id = pthread_self(); + if (pthread_mutex_lock(&m->mutex) == -1) + return -1; + while (1) { + if (m->owner == ~0) { + m->owner = id; + m->depth = 1; + break; + } else if (id == m->owner) { + m->depth++; + break; + } else { + m->waiters++; + if (pthread_cond_wait(&m->cond, &m->mutex) == -1) + return -1; + m->waiters--; + } + } + return pthread_mutex_unlock(&m->mutex); + } + + errno = EINVAL; + return -1; +} + +int e_mutex_unlock(EMutex *m) +{ + switch (m->type) { + case E_MUTEX_SIMPLE: + return pthread_mutex_unlock(&m->mutex); + case E_MUTEX_REC: + if (pthread_mutex_lock(&m->mutex) == -1) + return -1; + g_assert(m->owner == pthread_self()); + + m->depth--; + if (m->depth == 0) { + m->owner = ~0; + if (m->waiters > 0) + pthread_cond_signal(&m->cond); + } + return pthread_mutex_unlock(&m->mutex); + } + + errno = EINVAL; + return -1; +} + +#ifdef STANDALONE +EMsgPort *server_port; + + +void *fdserver(void *data) +{ + int fd; + EMsg *msg; + int id = (int)data; + fd_set rfds; + + fd = e_msgport_fd(server_port); + + while (1) { + int count = 0; + + printf("server %d: waiting on fd %d\n", id, fd); + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + select(fd+1, &rfds, NULL, NULL, NULL); + printf("server %d: Got async notification, checking for messages\n", id); + while ((msg = e_msgport_get(server_port))) { + printf("server %d: got message\n", id); + sleep(1); + printf("server %d: replying\n", id); + e_msgport_reply(msg); + count++; + } + printf("server %d: got %d messages\n", id, count); + } +} + +void *server(void *data) +{ + EMsg *msg; + int id = (int)data; + + while (1) { + printf("server %d: waiting\n", id); + msg = e_msgport_wait(server_port); + msg = e_msgport_get(server_port); + if (msg) { + printf("server %d: got message\n", id); + sleep(1); + printf("server %d: replying\n", id); + e_msgport_reply(msg); + } else { + printf("server %d: didn't get message\n", id); + } + } +} + +void *client(void *data) +{ + EMsg *msg; + EMsgPort *replyport; + int i; + + replyport = e_msgport_new(); + msg = g_malloc0(sizeof(*msg)); + msg->reply_port = replyport; + for (i=0;i<10;i++) { + /* synchronous operation */ + printf("client: sending\n"); + e_msgport_put(server_port, msg); + printf("client: waiting for reply\n"); + e_msgport_wait(replyport); + e_msgport_get(replyport); + printf("client: got reply\n"); + } + + printf("client: sleeping ...\n"); + sleep(2); + printf("client: sending multiple\n"); + + for (i=0;i<10;i++) { + msg = g_malloc0(sizeof(*msg)); + msg->reply_port = replyport; + e_msgport_put(server_port, msg); + } + + printf("client: receiving multiple\n"); + for (i=0;i<10;i++) { + e_msgport_wait(replyport); + msg = e_msgport_get(replyport); + g_free(msg); + } + + printf("client: done\n"); +} + +int main(int argc, char **argv) +{ + pthread_t serverid, clientid; + + g_thread_init(NULL); + + server_port = e_msgport_new(); + + /*pthread_create(&serverid, NULL, server, (void *)1);*/ + pthread_create(&serverid, NULL, fdserver, (void *)1); + pthread_create(&clientid, NULL, client, NULL); + + sleep(60); + + return 0; +} +#endif diff --git a/e-util/e-msgport.h b/e-util/e-msgport.h new file mode 100644 index 0000000000..c8cede5361 --- /dev/null +++ b/e-util/e-msgport.h @@ -0,0 +1,79 @@ + +#ifndef _E_MSGPORT_H +#define _E_MSGPORT_H + +/* double-linked list yeah another one, deal */ +typedef struct _EDListNode { + struct _EDListNode *next; + struct _EDListNode *prev; +} EDListNode; + +typedef struct _EDList { + struct _EDListNode *head; + struct _EDListNode *tail; + struct _EDListNode *tailpred; +} EDList; + +void e_dlist_init(EDList *v); +EDListNode *e_dlist_addhead(EDList *l, EDListNode *n); +EDListNode *e_dlist_addtail(EDList *l, EDListNode *n); +EDListNode *e_dlist_remove(EDListNode *n); +EDListNode *e_dlist_remhead(EDList *l); +EDListNode *e_dlist_remtail(EDList *l); +int e_dlist_empty(EDList *l); +int e_dlist_length(EDList *l); + +/* message ports - a simple inter-thread 'ipc' primitive */ +/* opaque handle */ +typedef struct _EMsgPort EMsgPort; + +/* header for any message */ +typedef struct _EMsg { + EDListNode ln; + EMsgPort *reply_port; +} EMsg; + +EMsgPort *e_msgport_new(void); +void e_msgport_destroy(EMsgPort *mp); +/* get a fd that can be used to wait on the port asynchronously */ +int e_msgport_fd(EMsgPort *mp); +void e_msgport_put(EMsgPort *mp, EMsg *msg); +EMsg *e_msgport_wait(EMsgPort *mp); +EMsg *e_msgport_get(EMsgPort *mp); +void e_msgport_reply(EMsg *msg); + +/* e threads, a server thread with a message based request-response, and flexible queuing */ +typedef struct _EThread EThread; + +typedef enum { + E_THREAD_QUEUE = 0, /* run one by one, until done, if the queue_limit is reached, discard new request */ + E_THREAD_DROP, /* run one by one, until done, if the queue_limit is reached, discard oldest requests */ + E_THREAD_NEW, /* always run in a new thread, if the queue limit is reached, new requests are + stored in the queue until a thread becomes available for it, creating a thread pool */ +} e_thread_t; + +typedef void (*EThreadFunc)(EThread *, EMsg *, void *data); + +EThread *e_thread_new(e_thread_t type); +void e_thread_destroy(EThread *e); +void e_thread_set_queue_limit(EThread *e, int limit); +void e_thread_set_msg_lost(EThread *e, EThreadFunc destroy, void *data); +void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data); +void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port); +void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data); +void e_thread_put(EThread *e, EMsg *msg); + +/* sigh, another mutex interface, this one allows different mutex types, portably */ +typedef struct _EMutex EMutex; + +typedef enum _e_mutex_t { + E_MUTEX_SIMPLE, /* == pthread_mutex */ + E_MUTEX_REC, /* recursive mutex */ +} e_mutex_t; + +EMutex *e_mutex_new(e_mutex_t type); +int e_mutex_destroy(EMutex *m); +int e_mutex_lock(EMutex *m); +int e_mutex_unlock(EMutex *m); + +#endif diff --git a/e-util/e-sexp.c b/e-util/e-sexp.c index 08b9bb6a62..4b500b4914 100644 --- a/e-util/e-sexp.c +++ b/e-util/e-sexp.c @@ -85,9 +85,9 @@ static struct _ESExpTerm * parse_value(ESExp *f); static void parse_dump_term(struct _ESExpTerm *t, int depth); +#ifdef E_SEXP_IS_GTK_OBJECT static GtkObjectClass *parent_class; - - +#endif static GScannerConfig scanner_config = { @@ -820,67 +820,9 @@ parse_list(ESExp *f, int gotbrace) return t; } -#if 0 -GList * -camel_mbox_folder_search_by_expression(CamelFolder *folder, char *expression, CamelException *ex) -{ - GScanner *gs; - int i; - struct _ESExpTerm *t; - struct _searchcontext *ctx; - struct _ESExpResult *r; - GList *matches = NULL; - - gs = g_scanner_new(&scanner_config); - for(i=0;ifolder = folder; - ctx->summary = camel_folder_get_summary(folder, ex); - ctx->message_info = camel_folder_summary_get_message_info_list(ctx->summary); -#ifdef HAVE_IBEX - ctx->index = ibex_open(CAMEL_MBOX_FOLDER(folder)->index_file_path, FALSE); - if (!ctx->index) { - perror("Cannot open index file, body searches will be ignored\n"); - } -#endif - r = term_eval(ctx, t); - - /* now create a folder summary to return?? */ - if (r - && r->type == ESEXP_RES_ARRAY_PTR) { - d(printf("got result ...\n")); - for (i=0;ivalue.ptrarray->len;i++) { - d(printf("adding match: %s\n", (char *)g_ptr_array_index(r->value.ptrarray, i))); - matches = g_list_prepend(matches, g_strdup(g_ptr_array_index(r->value.ptrarray, i))); - } - e_sexp_result_free(r); - } - - if (ctx->index) - ibex_close(ctx->index); - - gtk_object_unref((GtkObject *)ctx->summary); - g_free(ctx); - parse_term_free(t); - } else { - printf("Warning, Could not parse expression!\n %s\n", expression); - } - - g_scanner_destroy(gs); - - return matches; -} -#endif - - -static void e_sexp_finalise(GtkObject *); +static void e_sexp_finalise(void *); +#ifdef E_SEXP_IS_GTK_OBJECT static void e_sexp_class_init (ESExpClass *class) { @@ -892,6 +834,7 @@ e_sexp_class_init (ESExpClass *class) parent_class = gtk_type_class (gtk_object_get_type ()); } +#endif /* 'builtin' functions */ static struct { @@ -922,7 +865,7 @@ free_symbol(void *key, void *value, void *data) } static void -e_sexp_finalise(GtkObject *o) +e_sexp_finalise(void *o) { ESExp *s = (ESExp *)o; @@ -934,7 +877,9 @@ e_sexp_finalise(GtkObject *o) g_scanner_scope_foreach_symbol(s->scanner, 0, free_symbol, 0); g_scanner_destroy(s->scanner); +#ifdef E_SEXP_IS_GTK_OBJECT ((GtkObjectClass *)(parent_class))->finalize((GtkObject *)o); +#endif } static void @@ -952,8 +897,13 @@ e_sexp_init (ESExp *s) e_sexp_add_function(s, 0, symbols[i].name, symbols[i].func, &symbols[i]); } } + +#ifndef E_SEXP_IS_GTK_OBJECT + s->refcount = 1; +#endif } +#ifdef E_SEXP_IS_GTK_OBJECT guint e_sexp_get_type (void) { @@ -975,15 +925,36 @@ e_sexp_get_type (void) return type; } +#endif ESExp * e_sexp_new (void) { +#ifdef E_SEXP_IS_GTK_OBJECT ESExp *f = E_SEXP ( gtk_type_new (e_sexp_get_type ())); - +#else + ESExp *f = g_malloc0(sizeof(*f)); + e_sexp_init(f); +#endif return f; } +#ifndef E_SEXP_IS_GTK_OBJECT +void e_sexp_ref (ESExp *f) +{ + f->refcount++; +} + +void e_sexp_unref (ESExp *f) +{ + f->refcount--; + if (f->refcount == 0) { + e_sexp_finalise(f); + g_free(f); + } +} +#endif + void e_sexp_add_function(ESExp *f, int scope, char *name, ESExpFunc *func, void *data) { diff --git a/e-util/e-sexp.h b/e-util/e-sexp.h index 1030531133..5cf2740fc1 100644 --- a/e-util/e-sexp.h +++ b/e-util/e-sexp.h @@ -5,11 +5,20 @@ #define _E_SEXP_H #include + +#ifdef E_SEXP_IS_GTK_OBJECT #include +#endif +#ifdef E_SEXP_IS_GTK_OBJECT #define E_SEXP(obj) GTK_CHECK_CAST (obj, e_sexp_get_type (), ESExp) #define E_SEXP_CLASS(klass) GTK_CHECK_CLASS_CAST (klass, e_sexp_get_type (), ESExpClass) #define FILTER_IS_SEXP(obj) GTK_CHECK_TYPE (obj, e_sexp_get_type ()) +#else +#define E_SEXP(obj) ((struct _ESExp *)(obj)) +#define E_SEXP_CLASS(klass) ((struct _ESExpClass *)(klass)) +#define FILTER_IS_SEXP(obj) (1) +#endif typedef struct _ESExp ESExp; typedef struct _ESExpClass ESExpClass; @@ -82,19 +91,29 @@ struct _ESExpTerm { struct _ESExp { +#ifdef E_SEXP_IS_GTK_OBJECT GtkObject object; - +#else + int refcount; +#endif GScanner *scanner; /* for parsing text version */ ESExpTerm *tree; /* root of expression tree */ }; struct _ESExpClass { +#ifdef E_SEXP_IS_GTK_OBJECT GtkObjectClass parent_class; - +#endif }; +#ifdef E_SEXP_IS_GTK_OBJECT guint e_sexp_get_type (void); +#endif ESExp *e_sexp_new (void); +#ifndef E_SEXP_IS_GTK_OBJECT +void e_sexp_ref (ESExp *f); +void e_sexp_unref (ESExp *f); +#endif void e_sexp_add_function (ESExp *f, int scope, char *name, ESExpFunc *func, void *data); void e_sexp_add_ifunction (ESExp *f, int scope, char *name, ESExpIFunc *func, void *data); void e_sexp_add_variable (ESExp *f, int scope, char *name, ESExpTerm *value); diff --git a/filter/ChangeLog b/filter/ChangeLog index 6db104a5d6..57e19636ee 100644 --- a/filter/ChangeLog +++ b/filter/ChangeLog @@ -1,3 +1,7 @@ +2000-12-24 Not Zed + + * Merge from camel-mt-branch. + 2000-12-21 Not Zed * filter-message-search.c (filter_message_search): And here too. diff --git a/filter/filter-driver.c b/filter/filter-driver.c index 0e4fbe682c..deeb075d11 100644 --- a/filter/filter-driver.c +++ b/filter/filter-driver.c @@ -205,8 +205,6 @@ filter_driver_finalise (GtkObject *obj) g_hash_table_foreach (p->globals, free_hash_strings, driver); g_hash_table_destroy (p->globals); - /* anal warning hunters, just leave this, its only temporary, touch and die */ -#define e_sexp_unref(x) (gtk_object_unref((GtkObject *)(x))) e_sexp_unref(p->eval); if (p->defaultfolder) diff --git a/filter/filter-message-search.c b/filter/filter-message-search.c index ff599a2007..59fe4e78f8 100644 --- a/filter/filter-message-search.c +++ b/filter/filter-message-search.c @@ -794,8 +794,6 @@ filter_message_search (CamelMimeMessage *message, CamelMessageInfo *info, else retval = FALSE; - /* anal warning hunters, just leave this, its only temporary, touch and die */ -#define e_sexp_unref(x) (gtk_object_unref(x)) e_sexp_unref(sexp); e_sexp_result_free (result); diff --git a/libibex/ChangeLog b/libibex/ChangeLog index 877d9b8e29..2d3fe7bace 100644 --- a/libibex/ChangeLog +++ b/libibex/ChangeLog @@ -1,3 +1,27 @@ +2000-12-24 Not Zed + + * Merge from camel-mt-branch. + +2000-12-18 Not Zed + + * dumpindex.c (main): Same here. + + * testindex.c (main): Add a g_thread_init(). Sigh, glib's thread + stuff is snot. + (read_words): Setup another flat-out thread to test + multithreadedness at little bit. + + * ibex_block.c (ibex_index_buffer): Add locking around internal + calls. + (ibex_open): Init the locking mutex. + (ibex_close): Free the locking mutex. + (ibex_unindex): + (ibex_find): + (ibex_find_name): + (ibex_contains_name): Add locking around internal calls. + + * ibex_internal.h (struct ibex): Add a lock. Include config.h + 2000-12-13 Christopher James Lahey * disktail.c (tail_compress): diff --git a/libibex/Makefile.am b/libibex/Makefile.am index 61f3d72004..ab7c92206c 100644 --- a/libibex/Makefile.am +++ b/libibex/Makefile.am @@ -18,16 +18,17 @@ noinst_HEADERS = \ index.h INCLUDES = -I$(srcdir) $(GLIB_CFLAGS) $(UNICODE_CFLAGS) \ + $(THREADS_CFLAGS) \ -DG_LOG_DOMAIN=\"libibex\" noinst_PROGRAMS = dumpindex testindex dumpindex_SOURCES = dumpindex.c -dumpindex_LDADD = libibex.la $(GLIB_LIBS) $(UNICODE_LIBS) +dumpindex_LDADD = libibex.la $(GLIB_LIBS) $(UNICODE_LIBS) $(THREADS_LIBS) testindex_SOURCES = testindex.c -testindex_LDADD = libibex.la $(GLIB_LIBS) $(UNICODE_LIBS) -lm +testindex_LDADD = libibex.la $(GLIB_LIBS) $(UNICODE_LIBS) $(THREADS_LIBS) -lm #noinst_PROGRAMS = mkindex lookup # diff --git a/libibex/dumpindex.c b/libibex/dumpindex.c index cf4f02cc8a..410a7083d6 100644 --- a/libibex/dumpindex.c +++ b/libibex/dumpindex.c @@ -37,6 +37,10 @@ int main(int argc, char **argv) { ibex *ib; +#ifdef ENABLE_THREADS + g_thread_init(0); +#endif + if (argc != 2) { printf("Usage: %s ibexfile\n", argv[0]); return 1; diff --git a/libibex/ibex_block.c b/libibex/ibex_block.c index 41388162c4..faeee232ac 100644 --- a/libibex/ibex_block.c +++ b/libibex/ibex_block.c @@ -215,12 +215,17 @@ ibex_index_buffer (ibex *ib, char *name, char *buffer, size_t len, size_t *unrea p = q; } done: + IBEX_LOCK(ib); + d(printf("name %s count %d size %d\n", name, wordlist->len, len)); if (!ib->predone) { ib->words->klass->index_pre(ib->words); ib->predone = TRUE; } ib->words->klass->add_list(ib->words, name, wordlist); + + IBEX_UNLOCK(ib); + ret = 0; error: for (i=0;ilen;i++) @@ -246,12 +251,18 @@ ibex *ibex_open (char *file, int flags, int mode) /* FIXME: the blockcache or the wordindex needs to manage the other one */ ib->words = ib->blocks->words; +#ifdef ENABLE_THREADS + ib->lock = g_mutex_new(); +#endif return ib; } int ibex_save (ibex *ib) { d(printf("syncing database\n")); + + IBEX_LOCK(ib); + if (ib->predone) { ib->words->klass->index_post(ib->words); ib->predone = FALSE; @@ -259,6 +270,9 @@ int ibex_save (ibex *ib) ib->words->klass->sync(ib->words); /* FIXME: some return */ ibex_block_cache_sync(ib->blocks); + + IBEX_UNLOCK(ib); + return 0; } @@ -275,6 +289,9 @@ int ibex_close (ibex *ib) ib->words->klass->close(ib->words); ibex_block_cache_close(ib->blocks); +#ifdef ENABLE_THREADS + g_mutex_free(ib->lock); +#endif g_free(ib); return ret; } @@ -282,32 +299,47 @@ int ibex_close (ibex *ib) void ibex_unindex (ibex *ib, char *name) { d(printf("trying to unindex '%s'\n", name)); + IBEX_LOCK(ib); ib->words->klass->unindex_name(ib->words, name); + IBEX_UNLOCK(ib); } GPtrArray *ibex_find (ibex *ib, char *word) { char *normal; int len; + GPtrArray *ret; len = strlen(word); normal = alloca(len+1); ibex_normalise_word(word, word+len, normal); - return ib->words->klass->find(ib->words, normal); + IBEX_LOCK(ib); + ret = ib->words->klass->find(ib->words, normal); + IBEX_UNLOCK(ib); + return ret; } gboolean ibex_find_name (ibex *ib, char *name, char *word) { char *normal; int len; + gboolean ret; len = strlen(word); normal = alloca(len+1); ibex_normalise_word(word, word+len, normal); - return ib->words->klass->find_name(ib->words, name, normal); + IBEX_LOCK(ib); + ret = ib->words->klass->find_name(ib->words, name, normal); + IBEX_UNLOCK(ib); + return ret; } gboolean ibex_contains_name(ibex *ib, char *name) { - return ib->words->klass->contains_name(ib->words, name); + gboolean ret; + + IBEX_LOCK(ib); + ret = ib->words->klass->contains_name(ib->words, name); + IBEX_UNLOCK(ib); + return ret; } diff --git a/libibex/ibex_internal.h b/libibex/ibex_internal.h index 0eb4b625ce..f2212799c6 100644 --- a/libibex/ibex_internal.h +++ b/libibex/ibex_internal.h @@ -18,6 +18,8 @@ * Boston, MA 02111-1307, USA. */ +#include "config.h" + #include #include "ibex.h" @@ -29,4 +31,21 @@ struct ibex { struct _memcache *blocks; struct _IBEXWord *words; int predone; + + /* sigh i hate glib's mutex stuff too */ +#ifdef ENABLE_THREADS + GMutex *lock; +#endif + }; + +#ifdef ENABLE_THREADS +/*#define IBEX_LOCK(ib) (printf(__FILE__ "%d: %s: locking ibex\n", __LINE__, __FUNCTION__), g_mutex_lock(ib->lock)) + #define IBEX_UNLOCK(ib) (printf(__FILE__ "%d: %s: unlocking ibex\n", __LINE__, __FUNCTION__), g_mutex_unlock(ib->lock))*/ +#define IBEX_LOCK(ib) (g_mutex_lock(ib->lock)) +#define IBEX_UNLOCK(ib) (g_mutex_unlock(ib->lock)) +#else +#define IBEX_LOCK(ib) +#define IBEX_UNLOCK(ib) +#endif + diff --git a/libibex/testindex.c b/libibex/testindex.c index e21d73ff06..a3b6a9ce03 100644 --- a/libibex/testindex.c +++ b/libibex/testindex.c @@ -6,6 +6,10 @@ #include #include "ibex_internal.h" +#ifdef ENABLE_THREADS +#include +#endif + void word_index_mem_dump_info(struct _IBEXWord *idx); /* @@ -69,6 +73,29 @@ static char *getword(GPtrArray *words, float m, float s) return words->pdata[index]; } +#ifdef ENABLE_THREADS +int do_read_words; + +static void * +read_words(void *in) +{ + ibex *ib = in; + GPtrArray *a; + int lastlen = 0; + int i; + + while (do_read_words) { + a = ibex_find(ib, "joneses"); + if (a->len != lastlen) { + printf("Found %d joneses!\n", a->len); + lastlen = a->len; + } + for (i=0;ilen;i++) + g_free(a->pdata[i]); + g_ptr_array_free(a, TRUE); + } +} +#endif int main(int argc, char **argv) { @@ -83,9 +110,15 @@ int main(int argc, char **argv) int files; char *dict; +#ifdef ENABLE_THREADS + pthread_t id; + + g_thread_init(0); +#endif + srand(0xABADF00D); - files = 80000; + files = 8000; dict = "/usr/dict/words"; /* read words into an array */ @@ -119,6 +152,10 @@ int main(int argc, char **argv) return 1; } +#ifdef ENABLE_THREADS + do_read_words = 1; + pthread_create(&id, 0, read_words, ib); +#endif printf("Adding %d files\n", files); /* simulate adding new words to a bunch of files */ @@ -151,6 +188,11 @@ int main(int argc, char **argv) word_index_mem_dump_info(ib->words); +#ifdef ENABLE_THREADS + do_read_words = 0; + pthread_join(id, 0); +#endif + ibex_close(ib); return 0; -- cgit v1.2.3