From 5674966eeb7eecfa109ad419f1c997dba69ff0af Mon Sep 17 00:00:00 2001 From: Not Zed Date: Sun, 24 Dec 2000 00:58:02 +0000 Subject: Merge from camel-mt-branch. 2000-12-24 Not Zed * Merge from camel-mt-branch. svn path=/trunk/; revision=7152 --- e-util/e-msgport.c | 774 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 774 insertions(+) create mode 100644 e-util/e-msgport.c (limited to 'e-util/e-msgport.c') diff --git a/e-util/e-msgport.c b/e-util/e-msgport.c new file mode 100644 index 0000000000..107b889635 --- /dev/null +++ b/e-util/e-msgport.c @@ -0,0 +1,774 @@ + + +#include +#include +#include +#include +#include +#include + +#include + +#include "e-msgport.h" + +#include + +void e_dlist_init(EDList *v) +{ + v->head = (EDListNode *)&v->tail; + v->tail = 0; + v->tailpred = (EDListNode *)&v->head; +} + +EDListNode *e_dlist_addhead(EDList *l, EDListNode *n) +{ + n->next = l->head; + n->prev = (EDListNode *)&l->head; + l->head->prev = n; + l->head = n; + return n; +} + +EDListNode *e_dlist_addtail(EDList *l, EDListNode *n) +{ + n->next = (EDListNode *)&l->tail; + n->prev = l->tailpred; + l->tailpred->next = n; + l->tailpred = n; + return n; +} + +EDListNode *e_dlist_remove(EDListNode *n) +{ + n->next->prev = n->prev; + n->prev->next = n->next; + return n; +} + +EDListNode *e_dlist_remhead(EDList *l) +{ + EDListNode *n, *nn; + + n = l->head; + nn = n->next; + if (nn) { + nn->prev = n->prev; + l->head = nn; + return n; + } + return NULL; +} + +EDListNode *e_dlist_remtail(EDList *l) +{ + EDListNode *n, *np; + + n = l->tailpred; + np = n->prev; + if (np) { + np->next = n->next; + l->tailpred = np; + return n; + } + return NULL; +} + +int e_dlist_empty(EDList *l) +{ + return (l->head == (EDListNode *)&l->tail); +} + +int e_dlist_length(EDList *l) +{ + EDListNode *n, *nn; + int count = 0; + + n = l->head; + nn = n->next; + while (nn) { + count++; + n = nn; + nn = n->next; + } + + return 0; +} + +struct _EMsgPort { + EDList queue; + int condwait; /* how many waiting in condwait */ + union { + int pipe[2]; + struct { + int read; + int write; + } fd; + } pipe; + /* @#@$#$ glib stuff */ + GCond *cond; + GMutex *lock; +}; + +#define m(x) + +EMsgPort *e_msgport_new(void) +{ + EMsgPort *mp; + + mp = g_malloc(sizeof(*mp)); + e_dlist_init(&mp->queue); + mp->lock = g_mutex_new(); + mp->cond = g_cond_new(); + mp->pipe.fd.read = -1; + mp->pipe.fd.write = -1; + mp->condwait = 0; + + return mp; +} + +void e_msgport_destroy(EMsgPort *mp) +{ + g_mutex_free(mp->lock); + g_cond_free(mp->cond); + if (mp->pipe.fd.read != -1) { + close(mp->pipe.fd.read); + close(mp->pipe.fd.write); + } + g_free(mp); +} + +/* get a fd that can be used to wait on the port asynchronously */ +int e_msgport_fd(EMsgPort *mp) +{ + int fd; + + g_mutex_lock(mp->lock); + fd = mp->pipe.fd.read; + if (fd == -1) { + pipe(mp->pipe.pipe); + fd = mp->pipe.fd.read; + } + g_mutex_unlock(mp->lock); + + return fd; +} + +void e_msgport_put(EMsgPort *mp, EMsg *msg) +{ + m(printf("put:\n")); + g_mutex_lock(mp->lock); + e_dlist_addtail(&mp->queue, &msg->ln); + if (mp->condwait > 0) { + m(printf("put: condwait > 0, waking up\n")); + g_cond_signal(mp->cond); + } + if (mp->pipe.fd.write != -1) { + m(printf("put: have pipe, writing notification to it\n")); + write(mp->pipe.fd.write, "", 1); + } + g_mutex_unlock(mp->lock); + m(printf("put: done\n")); +} + +EMsg *e_msgport_wait(EMsgPort *mp) +{ + EMsg *msg; + + m(printf("wait:\n")); + g_mutex_lock(mp->lock); + while (e_dlist_empty(&mp->queue)) { + if (mp->pipe.fd.read == -1) { + m(printf("wait: waiting on condition\n")); + mp->condwait++; + g_cond_wait(mp->cond, mp->lock); + m(printf("wait: got condition\n")); + mp->condwait--; + } else { + fd_set rfds; + + m(printf("wait: waitng on pipe\n")); + FD_ZERO(&rfds); + FD_SET(mp->pipe.fd.read, &rfds); + g_mutex_unlock(mp->lock); + select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL); + pthread_testcancel(); + g_mutex_lock(mp->lock); + m(printf("wait: got pipe\n")); + } + } + msg = (EMsg *)mp->queue.head; + m(printf("wait: message = %p\n", msg)); + g_mutex_unlock(mp->lock); + m(printf("wait: done\n")); + return msg; +} + +EMsg *e_msgport_get(EMsgPort *mp) +{ + EMsg *msg; + char dummy[1]; + + g_mutex_lock(mp->lock); + msg = (EMsg *)e_dlist_remhead(&mp->queue); + if (msg && mp->pipe.fd.read != -1) + read(mp->pipe.fd.read, dummy, 1); + m(printf("get: message = %p\n", msg)); + g_mutex_unlock(mp->lock); + + return msg; +} + +void e_msgport_reply(EMsg *msg) +{ + if (msg->reply_port) { + e_msgport_put(msg->reply_port, msg); + } + /* else lost? */ +} + +struct _EThread { + EMsgPort *server_port; + EMsgPort *reply_port; + pthread_mutex_t mutex; + e_thread_t type; + int queue_limit; + + int waiting; /* if we are waiting for a new message */ + pthread_t id; /* id of our running child thread */ + GList *id_list; /* if THREAD_NEW, then a list of our child threads */ + + EThreadFunc destroy; + void *destroy_data; + + EThreadFunc received; + void *received_data; + + EThreadFunc lost; + void *lost_data; +}; + +static void thread_destroy_msg(EThread *e, EMsg *m); + +EThread *e_thread_new(e_thread_t type) +{ + EThread *e; + + e = g_malloc0(sizeof(*e)); + pthread_mutex_init(&e->mutex, 0); + e->type = type; + e->server_port = e_msgport_new(); + e->id = ~0; + e->queue_limit = INT_MAX; + + return e; +} + +/* close down the threads & resources etc */ +void e_thread_destroy(EThread *e) +{ + int tries = 0; + int busy = FALSE; + EMsg *msg; + + /* make sure we soak up all the messages first */ + while ( (msg = e_msgport_get(e->server_port)) ) { + thread_destroy_msg(e, msg); + } + + pthread_mutex_lock(&e->mutex); + + switch(e->type) { + case E_THREAD_QUEUE: + case E_THREAD_DROP: + /* if we have a thread, 'kill' it */ + while (e->id != ~0 && tries < 5) { + if (e->waiting == 1) { + pthread_t id = e->id; + e->id = ~0; + pthread_mutex_unlock(&e->mutex); + if (pthread_cancel(id) == 0) + pthread_join(id, 0); + pthread_mutex_lock(&e->mutex); + } else { + printf("thread still active, waiting for it to finish\n"); + pthread_mutex_unlock(&e->mutex); + sleep(1); + pthread_mutex_lock(&e->mutex); + } + tries++; + } + busy = e->id != ~0; + break; + case E_THREAD_NEW: + while (g_list_length(e->id_list) && tries < 5) { + printf("thread(s) still active, waiting for them to finish\n"); + pthread_mutex_unlock(&e->mutex); + sleep(1); + pthread_mutex_lock(&e->mutex); + } + busy = g_list_length(e->id_list) != 0; + break; + } + + pthread_mutex_unlock(&e->mutex); + + /* and clean up, if we can */ + if (busy) { + g_warning("threads were busy, leaked EThread"); + return; + } + + e_msgport_destroy(e->server_port); + g_free(e); +} + +/* set the queue maximum depth, what happens when the queue + fills up depends on the queue type */ +void e_thread_set_queue_limit(EThread *e, int limit) +{ + e->queue_limit = limit; +} + +/* set a msg destroy callback, this can not call any e_thread functions on @e */ +void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data) +{ + pthread_mutex_lock(&e->mutex); + e->destroy = destroy; + e->destroy_data = data; + pthread_mutex_unlock(&e->mutex); +} + +/* set a message lost callback, called if any message is discarded */ +void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data) +{ + pthread_mutex_lock(&e->mutex); + e->lost = lost; + e->lost_data = lost; + pthread_mutex_unlock(&e->mutex); +} + +/* set a reply port, if set, then send messages back once finished */ +void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port) +{ + e->reply_port = reply_port; +} + +/* set a received data callback */ +void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data) +{ + pthread_mutex_lock(&e->mutex); + e->received = received; + e->received_data = data; + pthread_mutex_unlock(&e->mutex); +} + +static void +thread_destroy_msg(EThread *e, EMsg *m) +{ + EThreadFunc func; + void *func_data; + + /* we do this so we never get an incomplete/unmatched callback + data */ + pthread_mutex_lock(&e->mutex); + func = e->destroy; + func_data = e->destroy_data; + pthread_mutex_unlock(&e->mutex); + + if (func) + func(e, m, func_data); +} + +static void +thread_received_msg(EThread *e, EMsg *m) +{ + EThreadFunc func; + void *func_data; + + /* we do this so we never get an incomplete/unmatched callback + data */ + pthread_mutex_lock(&e->mutex); + func = e->received; + func_data = e->received_data; + pthread_mutex_unlock(&e->mutex); + + if (func) + func(e, m, func_data); + else + g_warning("No processing callback for EThread, message unprocessed"); +} + +static void +thread_lost_msg(EThread *e, EMsg *m) +{ + EThreadFunc func; + void *func_data; + + /* we do this so we never get an incomplete/unmatched callback + data */ + pthread_mutex_lock(&e->mutex); + func = e->lost; + func_data = e->lost_data; + pthread_mutex_unlock(&e->mutex); + + if (func) + func(e, m, func_data); +} + +/* the actual thread dispatcher */ +static void * +thread_dispatch(void *din) +{ + EThread *e = din; + EMsg *m; + + printf("dispatch thread started: %ld\n", pthread_self()); + + while (1) { + pthread_mutex_lock(&e->mutex); + m = e_msgport_get(e->server_port); + if (m == NULL) { + /* nothing to do? If we are a 'new' type thread, just quit. + Otherwise, go into waiting (can be cancelled here) */ + switch (e->type) { + case E_THREAD_QUEUE: + case E_THREAD_DROP: + e->waiting = 1; + pthread_mutex_unlock(&e->mutex); + e_msgport_wait(e->server_port); + e->waiting = 0; + break; + case E_THREAD_NEW: + e->id_list = g_list_remove(e->id_list, (void *)pthread_self()); + pthread_mutex_unlock(&e->mutex); + return 0; + } + + continue; + } + pthread_mutex_unlock(&e->mutex); + + printf("got message in dispatch thread\n"); + + /* process it */ + thread_received_msg(e, m); + + /* if we have a reply port, send it back, otherwise, lose it */ + if (m->reply_port) { + e_msgport_reply(m); + } else { + thread_destroy_msg(e, m); + } + } + + /* if we run out of things to process we could conceivably 'hang around' for a bit, + but to do this we need to use the fd interface of the msgport, and its utility + is probably debatable anyway */ + + /* signify we are no longer running */ + /* This code isn't used yet, but would be if we ever had a 'quit now' message implemented */ + pthread_mutex_lock(&e->mutex); + switch (e->type) { + case E_THREAD_QUEUE: + case E_THREAD_DROP: + e->id = ~0; + break; + case E_THREAD_NEW: + e->id_list = g_list_remove(e->id_list, (void *)pthread_self()); + break; + } + pthread_mutex_unlock(&e->mutex); + + return 0; +} + +/* send a message to the thread, start thread if necessary */ +void e_thread_put(EThread *e, EMsg *msg) +{ + pthread_t id; + EMsg *dmsg = NULL; + + pthread_mutex_lock(&e->mutex); + + /* the caller forgot to tell us what to do, well, we can't do anything can we */ + if (e->received == NULL) { + pthread_mutex_unlock(&e->mutex); + g_warning("EThread called with no receiver function, no work to do!"); + thread_destroy_msg(e, msg); + return; + } + + msg->reply_port = e->reply_port; + + switch(e->type) { + case E_THREAD_QUEUE: + /* if the queue is full, lose this new addition */ + if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { + e_msgport_put(e->server_port, msg); + } else { + printf("queue limit reached, dropping new message\n"); + dmsg = msg; + } + break; + case E_THREAD_DROP: + /* if the queue is full, lose the oldest (unprocessed) message */ + if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { + e_msgport_put(e->server_port, msg); + } else { + printf("queue limit reached, dropping old message\n"); + e_msgport_put(e->server_port, msg); + dmsg = e_msgport_get(e->server_port); + } + break; + case E_THREAD_NEW: + /* it is possible that an existing thread can catch this message, so + we might create a thread with no work to do. + but that doesn't matter, the other alternative that it be lost is worse */ + e_msgport_put(e->server_port, msg); + if (g_list_length(e->id_list) < e->queue_limit + && pthread_create(&id, NULL, thread_dispatch, e) == 0) { + e->id_list = g_list_append(e->id_list, (void *)id); + } + pthread_mutex_unlock(&e->mutex); + return; + } + + /* create the thread, if there is none to receive it yet */ + if (e->id == ~0) { + if (pthread_create(&e->id, NULL, thread_dispatch, e) == -1) { + g_warning("Could not create dispatcher thread, message queued?: %s", strerror(errno)); + e->id = ~0; + } + } + + pthread_mutex_unlock(&e->mutex); + + if (dmsg) { + thread_lost_msg(e, dmsg); + thread_destroy_msg(e, dmsg); + } +} + +/* yet-another-mutex interface */ +struct _EMutex { + int type; + pthread_t owner; + short waiters; + short depth; + pthread_mutex_t mutex; + pthread_cond_t cond; +}; + +/* sigh, this is just painful to have to need, but recursive + read/write, etc mutexes just aren't very common in thread + implementations */ +/* TODO: Just make it use recursive mutexes if they are available */ +EMutex *e_mutex_new(e_mutex_t type) +{ + struct _EMutex *m; + + m = g_malloc(sizeof(*m)); + m->type = type; + m->waiters = 0; + m->depth = 0; + m->owner = ~0; + + switch (type) { + case E_MUTEX_SIMPLE: + pthread_mutex_init(&m->mutex, 0); + break; + case E_MUTEX_REC: + pthread_mutex_init(&m->mutex, 0); + pthread_cond_init(&m->cond, 0); + break; + /* read / write ? flags for same? */ + } + + return m; +} + +int e_mutex_destroy(EMutex *m) +{ + int ret = 0; + + switch (m->type) { + case E_MUTEX_SIMPLE: + ret = pthread_mutex_destroy(&m->mutex); + if (ret == -1) + g_warning("EMutex destroy failed: %s", strerror(errno)); + g_free(m); + break; + case E_MUTEX_REC: + ret = pthread_mutex_destroy(&m->mutex); + if (ret == -1) + g_warning("EMutex destroy failed: %s", strerror(errno)); + ret = pthread_cond_destroy(&m->cond); + if (ret == -1) + g_warning("EMutex destroy failed: %s", strerror(errno)); + g_free(m); + + } + return ret; +} + +int e_mutex_lock(EMutex *m) +{ + pthread_t id; + + switch (m->type) { + case E_MUTEX_SIMPLE: + return pthread_mutex_lock(&m->mutex); + case E_MUTEX_REC: + id = pthread_self(); + if (pthread_mutex_lock(&m->mutex) == -1) + return -1; + while (1) { + if (m->owner == ~0) { + m->owner = id; + m->depth = 1; + break; + } else if (id == m->owner) { + m->depth++; + break; + } else { + m->waiters++; + if (pthread_cond_wait(&m->cond, &m->mutex) == -1) + return -1; + m->waiters--; + } + } + return pthread_mutex_unlock(&m->mutex); + } + + errno = EINVAL; + return -1; +} + +int e_mutex_unlock(EMutex *m) +{ + switch (m->type) { + case E_MUTEX_SIMPLE: + return pthread_mutex_unlock(&m->mutex); + case E_MUTEX_REC: + if (pthread_mutex_lock(&m->mutex) == -1) + return -1; + g_assert(m->owner == pthread_self()); + + m->depth--; + if (m->depth == 0) { + m->owner = ~0; + if (m->waiters > 0) + pthread_cond_signal(&m->cond); + } + return pthread_mutex_unlock(&m->mutex); + } + + errno = EINVAL; + return -1; +} + +#ifdef STANDALONE +EMsgPort *server_port; + + +void *fdserver(void *data) +{ + int fd; + EMsg *msg; + int id = (int)data; + fd_set rfds; + + fd = e_msgport_fd(server_port); + + while (1) { + int count = 0; + + printf("server %d: waiting on fd %d\n", id, fd); + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + select(fd+1, &rfds, NULL, NULL, NULL); + printf("server %d: Got async notification, checking for messages\n", id); + while ((msg = e_msgport_get(server_port))) { + printf("server %d: got message\n", id); + sleep(1); + printf("server %d: replying\n", id); + e_msgport_reply(msg); + count++; + } + printf("server %d: got %d messages\n", id, count); + } +} + +void *server(void *data) +{ + EMsg *msg; + int id = (int)data; + + while (1) { + printf("server %d: waiting\n", id); + msg = e_msgport_wait(server_port); + msg = e_msgport_get(server_port); + if (msg) { + printf("server %d: got message\n", id); + sleep(1); + printf("server %d: replying\n", id); + e_msgport_reply(msg); + } else { + printf("server %d: didn't get message\n", id); + } + } +} + +void *client(void *data) +{ + EMsg *msg; + EMsgPort *replyport; + int i; + + replyport = e_msgport_new(); + msg = g_malloc0(sizeof(*msg)); + msg->reply_port = replyport; + for (i=0;i<10;i++) { + /* synchronous operation */ + printf("client: sending\n"); + e_msgport_put(server_port, msg); + printf("client: waiting for reply\n"); + e_msgport_wait(replyport); + e_msgport_get(replyport); + printf("client: got reply\n"); + } + + printf("client: sleeping ...\n"); + sleep(2); + printf("client: sending multiple\n"); + + for (i=0;i<10;i++) { + msg = g_malloc0(sizeof(*msg)); + msg->reply_port = replyport; + e_msgport_put(server_port, msg); + } + + printf("client: receiving multiple\n"); + for (i=0;i<10;i++) { + e_msgport_wait(replyport); + msg = e_msgport_get(replyport); + g_free(msg); + } + + printf("client: done\n"); +} + +int main(int argc, char **argv) +{ + pthread_t serverid, clientid; + + g_thread_init(NULL); + + server_port = e_msgport_new(); + + /*pthread_create(&serverid, NULL, server, (void *)1);*/ + pthread_create(&serverid, NULL, fdserver, (void *)1); + pthread_create(&clientid, NULL, client, NULL); + + sleep(60); + + return 0; +} +#endif -- cgit v1.2.3