#include "e-msgport.h"
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <pthread.h>
#include <glib.h>
#define m(x) /* msgport debug */
#define t(x) /* thread debug */
void e_dlist_init(EDList *v)
{
v->head = (EDListNode *)&v->tail;
v->tail = 0;
v->tailpred = (EDListNode *)&v->head;
}
EDListNode *e_dlist_addhead(EDList *l, EDListNode *n)
{
n->next = l->head;
n->prev = (EDListNode *)&l->head;
l->head->prev = n;
l->head = n;
return n;
}
EDListNode *e_dlist_addtail(EDList *l, EDListNode *n)
{
n->next = (EDListNode *)&l->tail;
n->prev = l->tailpred;
l->tailpred->next = n;
l->tailpred = n;
return n;
}
EDListNode *e_dlist_remove(EDListNode *n)
{
n->next->prev = n->prev;
n->prev->next = n->next;
return n;
}
EDListNode *e_dlist_remhead(EDList *l)
{
EDListNode *n, *nn;
n = l->head;
nn = n->next;
if (nn) {
nn->prev = n->prev;
l->head = nn;
return n;
}
return NULL;
}
EDListNode *e_dlist_remtail(EDList *l)
{
EDListNode *n, *np;
n = l->tailpred;
np = n->prev;
if (np) {
np->next = n->next;
l->tailpred = np;
return n;
}
return NULL;
}
int e_dlist_empty(EDList *l)
{
return (l->head == (EDListNode *)&l->tail);
}
int e_dlist_length(EDList *l)
{
EDListNode *n, *nn;
int count = 0;
n = l->head;
nn = n->next;
while (nn) {
count++;
n = nn;
nn = n->next;
}
return 0;
}
struct _EMsgPort {
EDList queue;
int condwait; /* how many waiting in condwait */
union {
int pipe[2];
struct {
int read;
int write;
} fd;
} pipe;
/* @#@$#$ glib stuff */
GCond *cond;
GMutex *lock;
};
EMsgPort *e_msgport_new(void)
{
EMsgPort *mp;
mp = g_malloc(sizeof(*mp));
e_dlist_init(&mp->queue);
mp->lock = g_mutex_new();
mp->cond = g_cond_new();
mp->pipe.fd.read = -1;
mp->pipe.fd.write = -1;
mp->condwait = 0;
return mp;
}
void e_msgport_destroy(EMsgPort *mp)
{
g_mutex_free(mp->lock);
g_cond_free(mp->cond);
if (mp->pipe.fd.read != -1) {
close(mp->pipe.fd.read);
close(mp->pipe.fd.write);
}
g_free(mp);
}
/* get a fd that can be used to wait on the port asynchronously */
int e_msgport_fd(EMsgPort *mp)
{
int fd;
g_mutex_lock(mp->lock);
fd = mp->pipe.fd.read;
if (fd == -1) {
pipe(mp->pipe.pipe);
fd = mp->pipe.fd.read;
}
g_mutex_unlock(mp->lock);
return fd;
}
void e_msgport_put(EMsgPort *mp, EMsg *msg)
{
int fd;
m(printf("put:\n"));
g_mutex_lock(mp->lock);
e_dlist_addtail(&mp->queue, &msg->ln);
if (mp->condwait > 0) {
m(printf("put: condwait > 0, waking up\n"));
g_cond_signal(mp->cond);
}
fd = mp->pipe.fd.write;
g_mutex_unlock(mp->lock);
if (fd != -1) {
m(printf("put: have pipe, writing notification to it\n"));
write(fd, "", 1);
}
m(printf("put: done\n"));
}
static void
msgport_cleanlock(void *data)
{
EMsgPort *mp = data;
g_mutex_unlock(mp->lock);
}
EMsg *e_msgport_wait(EMsgPort *mp)
{
EMsg *msg;
m(printf("wait:\n"));
g_mutex_lock(mp->lock);
while (e_dlist_empty(&mp->queue)) {
if (mp->pipe.fd.read == -1) {
m(printf("wait: waiting on condition\n"));
mp->condwait++;
/* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
pthread_cleanup_push(msgport_cleanlock, mp);
g_cond_wait(mp->cond, mp->lock);
pthread_cleanup_pop(0);
m(printf("wait: got condition\n"));
mp->condwait--;
} else {
fd_set rfds;
m(printf("wait: waitng on pipe\n"));
FD_ZERO(&rfds);
FD_SET(mp->pipe.fd.read, &rfds);
g_mutex_unlock(mp->lock);
select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL);
pthread_testcancel();
g_mutex_lock(mp->lock);
m(printf("wait: got pipe\n"));
}
}
msg = (EMsg *)mp->queue.head;
m(printf("wait: message = %p\n", msg));
g_mutex_unlock(mp->lock);
m(printf("wait: done\n"));
return msg;
}
EMsg *e_msgport_get(EMsgPort *mp)
{
EMsg *msg;
char dummy[1];
g_mutex_lock(mp->lock);
msg = (EMsg *)e_dlist_remhead(&mp->queue);
if (msg && mp->pipe.fd.read != -1)
read(mp->pipe.fd.read, dummy, 1);
m(printf("get: message = %p\n", msg));
g_mutex_unlock(mp->lock);
return msg;
}
void e_msgport_reply(EMsg *msg)
{
if (msg->reply_port) {
e_msgport_put(msg->reply_port, msg);
}
/* else lost? */
}
struct _thread_info {
pthread_t id;
int busy;
};
struct _EThread {
struct _EThread *next;
struct _EThread *prev;
EMsgPort *server_port;
EMsgPort *reply_port;
pthread_mutex_t mutex;
e_thread_t type;
int queue_limit;
int waiting; /* if we are waiting for a new message, count of waiting processes */
pthread_t id; /* id of our running child thread */
GList *id_list; /* if THREAD_NEW, then a list of our child threads in thread_info structs */
EThreadFunc destroy;
void *destroy_data;
EThreadFunc received;
void *received_data;
EThreadFunc lost;
void *lost_data;
};
/* All active threads */
static EDList ethread_list = E_DLIST_INITIALISER(ethread_list);
static pthread_mutex_t ethread_lock = PTHREAD_MUTEX_INITIALIZER;
#define E_THREAD_NONE ((pthread_t)~0)
#define E_THREAD_QUIT_REPLYPORT ((struct _EMsgPort *)~0)
static void thread_destroy_msg(EThread *e, EMsg *m);
static struct _thread_info *thread_find(EThread *e, pthread_t id)
{
GList *node;
struct _thread_info *info;
node = e->id_list;
while (node) {
info = node->data;
if (info->id == id)
return info;
node = node->next;
}
return NULL;
}
#if 0
static void thread_remove(EThread *e, pthread_t id)
{
GList *node;
struct _thread_info *info;
node = e->id_list;
while (node) {
info = node->data;
if (info->id == id) {
e->id_list = g_list_remove(e->id_list, info);
g_free(info);
}
node = node->next;
}
}
#endif
EThread *e_thread_new(e_thread_t type)
{
EThread *e;
e = g_malloc0(sizeof(*e));
pthread_mutex_init(&e->mutex, 0);
e->type = type;
e->server_port = e_msgport_new();
e->id = E_THREAD_NONE;
e->queue_limit = INT_MAX;
pthread_mutex_lock(ðread_lock);
e_dlist_addtail(ðread_list, (EDListNode *)e);
pthread_mutex_unlock(ðread_lock);
return e;
}
/* close down the threads & resources etc */
void e_thread_destroy(EThread *e)
{
int busy = FALSE;
EMsg *msg;
struct _thread_info *info;
GList *l;
/* make sure we soak up all the messages first */
while ( (msg = e_msgport_get(e->server_port)) ) {
thread_destroy_msg(e, msg);
}
pthread_mutex_lock(&e->mutex);
switch(e->type) {
case E_THREAD_QUEUE:
case E_THREAD_DROP:
/* if we have a thread, 'kill' it */
if (e->id != E_THREAD_NONE) {
pthread_t id = e->id;
t(printf("Sending thread '%d' quit message\n", id));
e->id = E_THREAD_NONE;
msg = g_malloc0(sizeof(*msg));
msg->reply_port = E_THREAD_QUIT_REPLYPORT;
e_msgport_put(e->server_port, msg);
pthread_mutex_unlock(&e->mutex);
t(printf("Joining thread '%d'\n", id));
pthread_join(id, 0);
t(printf("Joined thread '%d'!\n", id));
pthread_mutex_lock(&e->mutex);
}
busy = e->id != E_THREAD_NONE;
break;
case E_THREAD_NEW:
/* first, send everyone a quit message */
l = e->id_list;
while (l) {
info = l->data;
t(printf("Sending thread '%d' quit message\n", info->id));
msg = g_malloc0(sizeof(*msg));
msg->reply_port = E_THREAD_QUIT_REPLYPORT;
e_msgport_put(e->server_port, msg);
l = l->next;
}
/* then, wait for everyone to quit */
while (e->id_list) {
info = e->id_list->data;
e->id_list = g_list_remove(e->id_list, info);
pthread_mutex_unlock(&e->mutex);
t(printf("Joining thread '%d'\n", info->id));
pthread_join(info->id, 0);
t(printf("Joined thread '%d'!\n", info->id));
pthread_mutex_lock(&e->mutex);
g_free(info);
}
busy = g_list_length(e->id_list) != 0;
break;
}
pthread_mutex_unlock(&e->mutex);
/* and clean up, if we can */
if (busy) {
g_warning("threads were busy, leaked EThread");
return;
}
pthread_mutex_lock(ðread_lock);
e_dlist_remove((EDListNode *)e);
pthread_mutex_unlock(ðread_lock);
pthread_mutex_destroy(&e->mutex);
e_msgport_destroy(e->server_port);
g_free(e);
}
/* set the queue maximum depth, what happens when the queue
fills up depends on the queue type */
void e_thread_set_queue_limit(EThread *e, int limit)
{
e->queue_limit = limit;
}
/* set a msg destroy callback, this can not call any e_thread functions on @e */
void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data)
{
pthread_mutex_lock(&e->mutex);
e->destroy = destroy;
e->destroy_data = data;
pthread_mutex_unlock(&e->mutex);
}
/* set a message lost callback, called if any message is discarded */
void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data)
{
pthread_mutex_lock(&e->mutex);
e->lost = lost;
e->lost_data = lost;
pthread_mutex_unlock(&e->mutex);
}
/* set a reply port, if set, then send messages back once finished */
void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port)
{
e->reply_port = reply_port;
}
/* set a received data callback */
void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data)
{
pthread_mutex_lock(&e->mutex);
e->received = received;
e->received_data = data;
pthread_mutex_unlock(&e->mutex);
}
/* find out if we're busy doing any work, e==NULL, check for all work */
int e_thread_busy(EThread *e)
{
int busy = FALSE;
if (e == NULL) {
pthread_mutex_lock(ðread_lock);
e = (EThread *)ethread_list.head;
while (e->next && !busy) {
busy = e_thread_busy(e);
e = e->next;
}
pthread_mutex_unlock(ðread_lock);
} else {
pthread_mutex_lock(&e->mutex);
switch (e->type) {
case E_THREAD_QUEUE:
case E_THREAD_DROP:
busy = e->waiting != 1 && e->id != E_THREAD_NONE;
break;
case E_THREAD_NEW:
busy = e->waiting != g_list_length(e->id_list);
break;
}
pthread_mutex_unlock(&e->mutex);
}
return busy;
}
static void
thread_destroy_msg(EThread *e, EMsg *m)
{
EThreadFunc func;
void *func_data;
/* we do this so we never get an incomplete/unmatched callback + data */
pthread_mutex_lock(&e->mutex);
func = e->destroy;
func_data = e->destroy_data;
pthread_mutex_unlock(&e->mutex);
if (func)
func(e, m, func_data);
}
static void
thread_received_msg(EThread *e, EMsg *m)
{
EThreadFunc func;
void *func_data;
/* we do this so we never get an incomplete/unmatched callback + data */
pthread_mutex_lock(&e->mutex);
func = e->received;
func_data = e->received_data;
pthread_mutex_unlock(&e->mutex);
if (func)
func(e, m, func_data);
else
g_warning("No processing callback for EThread, message unprocessed");
}
static void
thread_lost_msg(EThread *e, EMsg *m)
{
EThreadFunc func;
void *func_data;
/* we do this so we never get an incomplete/unmatched callback + data */
pthread_mutex_lock(&e->mutex);
func = e->lost;
func_data = e->lost_data;
pthread_mutex_unlock(&e->mutex);
if (func)
func(e, m, func_data);
}
/* the actual thread dispatcher */
static void *
thread_dispatch(void *din)
{
EThread *e = din;
EMsg *m;
struct _thread_info *info;
pthread_t self = pthread_self();
t(printf("dispatch thread started: %ld\n", pthread_self()));
while (1) {
pthread_mutex_lock(&e->mutex);
m = e_msgport_get(e->server_port);
if (m == NULL) {
/* nothing to do? If we are a 'new' type thread, just quit.
Otherwise, go into waiting (can be cancelled here) */
info = NULL;
switch (e->type) {
case E_THREAD_NEW:
case E_THREAD_QUEUE:
case E_THREAD_DROP:
info = thread_find(e, self);
if (info)
info->busy = FALSE;
e->waiting++;
pthread_mutex_unlock(&e->mutex);
e_msgport_wait(e->server_port);
pthread_mutex_lock(&e->mutex);
e->waiting--;
pthread_mutex_unlock(&e->mutex);
break;
#if 0
case E_THREAD_NEW:
e->id_list = g_list_remove(e->id_list, (void *)pthread_self());
pthread_mutex_unlock(&e->mutex);
return 0;
#endif
}
continue;
} else if (m->reply_port == E_THREAD_QUIT_REPLYPORT) {
t(printf("Thread %d got quit message\n", self));
/* Handle a quit message, say we're quitting, free the message, and break out of the loop */
info = thread_find(e, self);
if (info)
info->busy = 2;
pthread_mutex_unlock(&e->mutex);
g_free(m);
break;
} else {
info = thread_find(e, self);
if (info)
info->busy = TRUE;
}
pthread_mutex_unlock(&e->mutex);
t(printf("got message in dispatch thread\n"));
/* process it */
thread_received_msg(e, m);
/* if we have a reply port, send it back, otherwise, lose it */
if (m->reply_port) {
e_msgport_reply(m);
} else {
thread_destroy_msg(e, m);
}
}
return NULL;
}
/* send a message to the thread, start thread if necessary */
void e_thread_put(EThread *e, EMsg *msg)
{
pthread_t id;
EMsg *dmsg = NULL;
pthread_mutex_lock(&e->mutex);
/* the caller forgot to tell us what to do, well, we can't do anything can we */
if (e->received == NULL) {
pthread_mutex_unlock(&e->mutex);
g_warning("EThread called with no receiver function, no work to do!");
thread_destroy_msg(e, msg);
return;
}
msg->reply_port = e->reply_port;
switch(e->type) {
case E_THREAD_QUEUE:
/* if the queue is full, lose this new addition */
if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
e_msgport_put(e->server_port, msg);
} else {
printf("queue limit reached, dropping new message\n");
dmsg = msg;
}
break;
case E_THREAD_DROP:
/* if the queue is full, lose the oldest (unprocessed) message */
if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
e_msgport_put(e->server_port, msg);
} else {
printf("queue limit reached, dropping old message\n");
e_msgport_put(e->server_port, msg);
dmsg = e_msgport_get(e->server_port);
}
break;
case E_THREAD_NEW:
/* it is possible that an existing thread can catch this message, so
we might create a thread with no work to do.
but that doesn't matter, the other alternative that it be lost is worse */
e_msgport_put(e->server_port, msg);
if (e->waiting == 0
&& g_list_length(e->id_list) < e->queue_limit
&& pthread_create(&id, NULL, thread_dispatch, e) == 0) {
struct _thread_info *info = g_malloc0(sizeof(*info));
t(printf("created NEW thread %ld\n", id));
info->id = id;
info->busy = TRUE;
e->id_list = g_list_append(e->id_list, info);
}
pthread_mutex_unlock(&e->mutex);
return;
}
/* create the thread, if there is none to receive it yet */
if (e->id == E_THREAD_NONE) {
if (pthread_create(&e->id, NULL, thread_dispatch, e) == -1) {
g_warning("Could not create dispatcher thread, message queued?: %s", strerror(errno));
e->id = E_THREAD_NONE;
}
}
pthread_mutex_unlock(&e->mutex);
if (dmsg) {
thread_lost_msg(e, dmsg);
thread_destroy_msg(e, dmsg);
}
}
/* yet-another-mutex interface */
struct _EMutex {
int type;
pthread_t owner;
short waiters;
short depth;
pthread_mutex_t mutex;
pthread_cond_t cond;
};
/* sigh, this is just painful to have to need, but recursive
read/write, etc mutexes just aren't very common in thread
implementations */
/* TODO: Just make it use recursive mutexes if they are available */
EMutex *e_mutex_new(e_mutex_t type)
{
struct _EMutex *m;
m = g_malloc(sizeof(*m));
m->type = type;
m->waiters = 0;
m->depth = 0;
m->owner = E_THREAD_NONE;
switch (type) {
case E_MUTEX_SIMPLE:
pthread_mutex_init(&m->mutex, 0);
break;
case E_MUTEX_REC:
pthread_mutex_init(&m->mutex, 0);
pthread_cond_init(&m->cond, 0);
break;
/* read / write ? flags for same? */
}
return m;
}
int e_mutex_destroy(EMutex *m)
{
int ret = 0;
switch (m->type) {
case E_MUTEX_SIMPLE:
ret = pthread_mutex_destroy(&m->mutex);
if (ret == -1)
g_warning("EMutex destroy failed: %s", strerror(errno));
g_free(m);
break;
case E_MUTEX_REC:
ret = pthread_mutex_destroy(&m->mutex);
if (ret == -1)
g_warning("EMutex destroy failed: %s", strerror(errno));
ret = pthread_cond_destroy(&m->cond);
if (ret == -1)
g_warning("EMutex destroy failed: %s", strerror(errno));
g_free(m);
}
return ret;
}
int e_mutex_lock(EMutex *m)
{
pthread_t id;
switch (m->type) {
case E_MUTEX_SIMPLE:
return pthread_mutex_lock(&m->mutex);
case E_MUTEX_REC:
id = pthread_self();
if (pthread_mutex_lock(&m->mutex) == -1)
return -1;
while (1) {
if (m->owner == E_THREAD_NONE) {
m->owner = id;
m->depth = 1;
break;
} else if (id == m->owner) {
m->depth++;
break;
} else {
m->waiters++;
if (pthread_cond_wait(&m->cond, &m->mutex) == -1)
return -1;
m->waiters--;
}
}
return pthread_mutex_unlock(&m->mutex);
}
errno = EINVAL;
return -1;
}
int e_mutex_unlock(EMutex *m)
{
switch (m->type) {
case E_MUTEX_SIMPLE:
return pthread_mutex_unlock(&m->mutex);
case E_MUTEX_REC:
if (pthread_mutex_lock(&m->mutex) == -1)
return -1;
g_assert(m->owner == pthread_self());
m->depth--;
if (m->depth == 0) {
m->owner = E_THREAD_NONE;
if (m->waiters > 0)
pthread_cond_signal(&m->cond);
}
return pthread_mutex_unlock(&m->mutex);
}
errno = EINVAL;
return -1;
}
void e_mutex_assert_locked(EMutex *m)
{
g_return_if_fail (m->type == E_MUTEX_REC);
pthread_mutex_lock(&m->mutex);
g_assert(m->owner == pthread_self());
pthread_mutex_unlock(&m->mutex);
}
#ifdef STANDALONE
EMsgPort *server_port;
void *fdserver(void *data)
{
int fd;
EMsg *msg;
int id = (int)data;
fd_set rfds;
fd = e_msgport_fd(server_port);
while (1) {
int count = 0;
printf("server %d: waiting on fd %d\n", id, fd);
FD_ZERO(&rfds);
FD_SET(fd, &rfds);
select(fd+1, &rfds, NULL, NULL, NULL);
printf("server %d: Got async notification, checking for messages\n", id);
while ((msg = e_msgport_get(server_port))) {
printf("server %d: got message\n", id);
sleep(1);
printf("server %d: replying\n", id);
e_msgport_reply(msg);
count++;
}
printf("server %d: got %d messages\n", id, count);
}
}
void *server(void *data)
{
EMsg *msg;
int id = (int)data;
while (1) {
printf("server %d: waiting\n", id);
msg = e_msgport_wait(server_port);
msg = e_msgport_get(server_port);
if (msg) {
printf("server %d: got message\n", id);
sleep(1);
printf("server %d: replying\n", id);
e_msgport_reply(msg);
} else {
printf("server %d: didn't get message\n", id);
}
}
}
void *client(void *data)
{
EMsg *msg;
EMsgPort *replyport;
int i;
replyport = e_msgport_new();
msg = g_malloc0(sizeof(*msg));
msg->reply_port = replyport;
for (i=0;i<10;i++) {
/* synchronous operation */
printf("client: sending\n");
e_msgport_put(server_port, msg);
printf("client: waiting for reply\n");
e_msgport_wait(replyport);
e_msgport_get(replyport);
printf("client: got reply\n");
}
printf("client: sleeping ...\n");
sleep(2);
printf("client: sending multiple\n");
for (i=0;i<10;i++) {
msg = g_malloc0(sizeof(*msg));
msg->reply_port = replyport;
e_msgport_put(server_port, msg);
}
printf("client: receiving multiple\n");
for (i=0;i<10;i++) {
e_msgport_wait(replyport);
msg = e_msgport_get(replyport);
g_free(msg);
}
printf("client: done\n");
}
int main(int argc, char **argv)
{
pthread_t serverid, clientid;
g_thread_init(NULL);
server_port = e_msgport_new();
/*pthread_create(&serverid, NULL, server, (void *)1);*/
pthread_create(&serverid, NULL, fdserver, (void *)1);
pthread_create(&clientid, NULL, client, NULL);
sleep(60);
return 0;
}
#endif