aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--camel/ChangeLog11
-rw-r--r--camel/camel-remote-store.c134
-rw-r--r--camel/camel-session.c226
-rw-r--r--camel/camel-session.h18
-rw-r--r--camel/camel-stream-fs.c69
5 files changed, 434 insertions, 24 deletions
diff --git a/camel/ChangeLog b/camel/ChangeLog
index 9ab195b5a7..11d938abe7 100644
--- a/camel/ChangeLog
+++ b/camel/ChangeLog
@@ -1,3 +1,14 @@
+2001-01-15 Not Zed <NotZed@Ximian.com>
+
+ * camel-remote-store.c (socket_connect): A cancellable connection
+ routine.
+ (remote_send_string): Return cancelled exception if we were.
+ (remote_send_stream): "
+ (remote_recv_line): "
+
+ * camel-stream-fs.c (stream_read): First cut at cancellation
+ stuff. Its looking a bit ugly.
+
2001-01-15 Jeffrey Stedfast <fejj@ximian.com>
* camel-tcp-stream-ssl.c (stream_connect): Uses an SSL socket now
diff --git a/camel/camel-remote-store.c b/camel/camel-remote-store.c
index 8d095244e2..9e7a60e378 100644
--- a/camel/camel-remote-store.c
+++ b/camel/camel-remote-store.c
@@ -25,6 +25,7 @@
#include <config.h>
+#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
@@ -205,6 +206,87 @@ timeout_cb (gpointer data)
return TRUE;
}
+/* this is a 'cancellable' connect, cancellable from camel_cancel etc */
+/* returns -1 & errno == EINTR if the connection was cancelled */
+static int socket_connect(struct hostent *h, int port)
+{
+ struct sockaddr_in sin;
+ int fd;
+ int ret;
+ socklen_t len;
+ struct timeval tv;
+ int cancel_fd;
+
+ /* see if we're cancelled yet */
+ if (camel_cancel_check(NULL)) {
+ errno = EINTR;
+ return -1;
+ }
+
+ /* setup connect, we do it using a nonblocking socket so we can poll it */
+ sin.sin_port = htons(port);
+ sin.sin_family = h->h_addrtype;
+ memcpy (&sin.sin_addr, h->h_addr, sizeof (sin.sin_addr));
+
+ fd = socket (h->h_addrtype, SOCK_STREAM, 0);
+
+ cancel_fd = camel_cancel_fd(NULL);
+ if (cancel_fd == -1) {
+ ret = connect(fd, (struct sockaddr *)&sin, sizeof (sin));
+ if (ret == -1) {
+ close(fd);
+ return -1;
+ }
+ return fd;
+ } else {
+ fd_set rdset, wrset;
+ long flags;
+
+ fcntl(fd, F_GETFL, &flags);
+ fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+
+ ret = connect(fd, (struct sockaddr *)&sin, sizeof (sin));
+ if (ret == 0)
+ return fd;
+
+ if (errno != EINPROGRESS) {
+ close(fd);
+ return -1;
+ }
+
+ FD_ZERO(&rdset);
+ FD_ZERO(&wrset);
+ FD_SET(fd, &wrset);
+ FD_SET(cancel_fd, &rdset);
+ tv.tv_usec = 0;
+ tv.tv_sec = 30;
+ if (select((fd+cancel_fd)/2+1, &rdset, &wrset, 0, &tv) == 0) {
+ close(fd);
+ errno = ETIMEDOUT;
+ return -1;
+ }
+ if (cancel_fd != -1 && FD_ISSET(cancel_fd, &rdset)) {
+ close(fd);
+ errno = EINTR;
+ return -1;
+ } else {
+ len = sizeof(int);
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &len) == -1) {
+ close(fd);
+ return -1;
+ }
+ if (ret != 0) {
+ close(fd);
+ errno = ret;
+ return -1;
+ }
+ }
+ fcntl(fd, F_SETFL, flags);
+ }
+
+ return fd;
+}
+
static gboolean
remote_connect (CamelService *service, CamelException *ex)
{
@@ -225,6 +307,20 @@ remote_connect (CamelService *service, CamelException *ex)
port = service->url->port;
else
port = store->default_port;
+
+#if 1
+ fd = socket_connect(h, port);
+ if (fd == -1) {
+ if (errno == EINTR)
+ camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Connection cancelled"));
+ else
+ camel_exception_setv (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE,
+ _("Could not connect to %s (port %d): %s"),
+ service->url->host ? service->url->host : _("(unknown host)"),
+ port, strerror (errno));
+ return FALSE;
+ }
+#else
sin.sin_port = htons (port);
memcpy (&sin.sin_addr, h->h_addr, sizeof (sin.sin_addr));
@@ -240,6 +336,7 @@ remote_connect (CamelService *service, CamelException *ex)
return FALSE;
}
+#endif
/* parent class connect initialization */
if (CAMEL_SERVICE_CLASS (store_class)->connect (service, ex) == FALSE)
@@ -322,9 +419,11 @@ remote_send_string (CamelRemoteStore *store, CamelException *ex, char *fmt, va_l
#endif
if (camel_stream_printf (store->ostream, "%s", cmdbuf) == -1) {
+ if (errno == EINTR)
+ camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Operation cancelled"));
+ else
+ camel_exception_set(ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, strerror(errno));
g_free (cmdbuf);
- camel_exception_set (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE,
- g_strerror (errno));
camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL);
return -1;
@@ -381,9 +480,11 @@ remote_send_stream (CamelRemoteStore *store, CamelStream *stream, CamelException
d(fprintf (stderr, "(sending stream)\n"));
ret = camel_stream_write_to_stream (stream, store->ostream);
- if (ret < 0) {
- camel_exception_set (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE,
- g_strerror (errno));
+ if (ret == -1) {
+ if (errno == EINTR)
+ camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Operation cancelled"));
+ else
+ camel_exception_set(ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, strerror(errno));
camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL);
}
@@ -446,23 +547,22 @@ remote_recv_line (CamelRemoteStore *store, char **dest, CamelException *ex)
if (nread > 0)
g_byte_array_append (bytes, buf, nread);
} while (nread == sizeof (buf) - 1);
-
+
+ if (nread == -1) {
+ if (errno == EINTR)
+ camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Operation cancelled"));
+ else
+ camel_exception_set(ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, strerror(errno));
+ g_byte_array_free(bytes, TRUE);
+ camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL);
+ return -1;
+ }
+
g_byte_array_append (bytes, "", 1);
ret = bytes->data;
nread = bytes->len - 1;
g_byte_array_free (bytes, FALSE);
- if (nread <= 0) {
- g_free (ret);
- ret = NULL;
- camel_exception_set (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE,
- nread ? g_strerror (errno) :
- _("Server disconnected."));
-
- camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL);
- return -1;
- }
-
/* strip off the CRLF sequence */
while (nread > 0 && ret[nread] != '\r')
ret[nread--] = '\0';
diff --git a/camel/camel-session.c b/camel/camel-session.c
index b1cc489c71..67d58a1e67 100644
--- a/camel/camel-session.c
+++ b/camel/camel-session.c
@@ -482,3 +482,229 @@ camel_session_remove_timeout (CamelSession *session, guint handle)
{
return session->remover (handle);
}
+
+/* ********************************************************************** */
+
+struct _CamelCancel {
+ pthread_t id; /* id of running thread */
+ guint32 flags; /* cancelled ? */
+ int blocked; /* cancellation blocked depth */
+ int refcount;
+#ifdef ENABLE_THREADS
+ EMsgPort *cancel_port;
+ int cancel_fd;
+ pthread_mutex_t lock;
+#endif
+};
+
+#define CAMEL_CANCEL_CANCELLED (1<<0)
+
+#ifdef ENABLE_THREADS
+#define CAMEL_CANCEL_LOCK(cc) pthread_mutex_lock(&cc->lock)
+#define CAMEL_CANCEL_UNLOCK(cc) pthread_mutex_lock(&cc->lock)
+#define CAMEL_ACTIVE_LOCK() pthread_mutex_lock(&cancel_active_lock)
+#define CAMEL_ACTIVE_UNLOCK() pthread_mutex_lock(&cancel_active_lock)
+static pthread_mutex_t cancel_active_lock = PTHREAD_MUTEX_INITIALIZER;
+#else
+#define CAMEL_CANCEL_LOCK(cc)
+#define CAMEL_CANCEL_UNLOCK(cc)
+#define CAMEL_ACTIVE_LOCK()
+#define CAMEL_ACTIVE_UNLOCK()
+#endif
+
+static GHashTable *cancel_active;
+
+typedef struct _CamelCancelMsg {
+ EMsg msg;
+} CamelCancelMsg ;
+
+/* creates a new cancel handle */
+CamelCancel *camel_cancel_new(void)
+{
+ CamelCancel *cc;
+
+ cc = g_malloc0(sizeof(*cc));
+
+ cc->flags = 0;
+ cc->blocked = 0;
+ cc->refcount = 1;
+#ifdef ENABLE_THREADS
+ cc->id = ~0;
+ cc->cancel_port = e_msgport_new();
+ cc->cancel_fd = e_msgport_fd(cc->cancel_port);
+ pthread_mutex_init(&cc->lock, NULL);
+#endif
+
+ return cc;
+}
+
+void camel_cancel_reset(CamelCancel *cc)
+{
+#ifdef ENABLE_THREADS
+ CamelCancelMsg *msg;
+
+ while ((msg = (CamelCancelMsg *)e_msgport_get(cc->cancel_port)))
+ g_free(msg);
+#endif
+
+ cc->flags = 0;
+ cc->blocked = 0;
+}
+
+void camel_cancel_ref(CamelCancel *cc)
+{
+ CAMEL_CANCEL_LOCK(cc);
+ cc->refcount++;
+ CAMEL_CANCEL_UNLOCK(cc);
+}
+
+void camel_cancel_unref(CamelCancel *cc)
+{
+#ifdef ENABLE_THREADS
+ CamelCancelMsg *msg;
+
+ if (cc->refcount == 1) {
+ while ((msg = (CamelCancelMsg *)e_msgport_get(cc->cancel_port)))
+ g_free(msg);
+
+ e_msgport_destroy(cc->cancel_port);
+#endif
+ g_free(cc);
+ } else {
+ CAMEL_CANCEL_LOCK(cc);
+ cc->refcount--;
+ CAMEL_CANCEL_UNLOCK(cc);
+ }
+}
+
+/* block cancellation */
+void camel_cancel_block(CamelCancel *cc)
+{
+ CAMEL_CANCEL_LOCK(cc);
+
+ cc->blocked++;
+
+ CAMEL_CANCEL_UNLOCK(cc);
+}
+
+/* unblock cancellation */
+void camel_cancel_unblock(CamelCancel *cc)
+{
+ CAMEL_CANCEL_LOCK(cc);
+
+ cc->blocked--;
+
+ CAMEL_CANCEL_UNLOCK(cc);
+}
+
+/* cancels an operation */
+void camel_cancel_cancel(CamelCancel *cc)
+{
+ CamelCancelMsg *msg;
+
+ if ((cc->flags & CAMEL_CANCEL_CANCELLED) == 0) {
+ CAMEL_CANCEL_LOCK(cc);
+ msg = g_malloc0(sizeof(*msg));
+ e_msgport_put(cc->cancel_port, (EMsg *)msg);
+ cc->flags |= CAMEL_CANCEL_CANCELLED;
+ CAMEL_CANCEL_UNLOCK(cc);
+ }
+}
+
+/* register a thread for cancellation */
+void camel_cancel_register(CamelCancel *cc)
+{
+ pthread_t id = pthread_self();
+
+ CAMEL_ACTIVE_LOCK();
+
+ if (cancel_active == NULL)
+ cancel_active = g_hash_table_new(NULL, NULL);
+
+ if (cc == NULL) {
+ cc = g_hash_table_lookup(cancel_active, (void *)id);
+ if (cc == NULL) {
+ cc = camel_cancel_new();
+ }
+ }
+
+ cc->id = id;
+ g_hash_table_insert(cancel_active, (void *)id, cc);
+ camel_cancel_ref(cc);
+
+ CAMEL_ACTIVE_UNLOCK();
+}
+
+/* remove a thread from being able to be cancelled */
+void camel_cancel_unregister(CamelCancel *cc)
+{
+ CAMEL_ACTIVE_LOCK();
+
+ if (cancel_active == NULL)
+ cancel_active = g_hash_table_new(NULL, NULL);
+
+ if (cc == NULL) {
+ cc = g_hash_table_lookup(cancel_active, (void *)cc->id);
+ if (cc == NULL) {
+ g_warning("Trying to unregister a thread that was never registered for cancellation");
+ }
+ }
+
+ if (cc)
+ g_hash_table_remove(cancel_active, (void *)cc->id);
+
+ CAMEL_ACTIVE_UNLOCK();
+
+ if (cc)
+ camel_cancel_unref(cc);
+}
+
+/* test for cancellation */
+gboolean camel_cancel_check(CamelCancel *cc)
+{
+ CamelCancelMsg *msg;
+
+ if (cc == NULL) {
+ if (cancel_active) {
+ CAMEL_ACTIVE_LOCK();
+ cc = g_hash_table_lookup(cancel_active, (void *)pthread_self());
+ CAMEL_ACTIVE_UNLOCK();
+ }
+ if (cc == NULL)
+ return FALSE;
+ }
+
+ if (cc->blocked > 0)
+ return FALSE;
+
+ if (cc->flags & CAMEL_CANCEL_CANCELLED)
+ return TRUE;
+
+ msg = (CamelCancelMsg *)e_msgport_get(cc->cancel_port);
+ if (msg) {
+ CAMEL_CANCEL_LOCK(cc);
+ cc->flags |= CAMEL_CANCEL_CANCELLED;
+ CAMEL_CANCEL_UNLOCK(cc);
+ return TRUE;
+ }
+ return FALSE;
+}
+
+/* get the fd for cancellation waiting */
+int camel_cancel_fd(CamelCancel *cc)
+{
+ if (cc == NULL) {
+ if (cancel_active) {
+ CAMEL_ACTIVE_LOCK();
+ cc = g_hash_table_lookup(cancel_active, (void *)pthread_self());
+ CAMEL_ACTIVE_UNLOCK();
+ }
+ if (cc == NULL)
+ return -1;
+ }
+ if (cc->blocked)
+ return -1;
+
+ return cc->cancel_fd;
+}
+
diff --git a/camel/camel-session.h b/camel/camel-session.h
index 78b4b817e5..9ae9eec089 100644
--- a/camel/camel-session.h
+++ b/camel/camel-session.h
@@ -126,6 +126,24 @@ guint camel_session_register_timeout (CamelSession *session,
gboolean camel_session_remove_timeout (CamelSession *session,
guint handle);
+
+/* cancellation helper stuff, not yet finalised */
+typedef struct _CamelCancel CamelCancel;
+/* main thread functions */
+CamelCancel *camel_cancel_new(void);
+void camel_cancel_ref(CamelCancel *cc);
+void camel_cancel_unref(CamelCancel *cc);
+void camel_cancel_reset(CamelCancel *cc);
+void camel_cancel_cancel(CamelCancel *cc);
+/* subthread functions */
+void camel_cancel_register(CamelCancel *cc);
+void camel_cancel_unregister(CamelCancel *cc);
+/* called internally by camel, for the current thread */
+void camel_cancel_block(CamelCancel *cc);
+void camel_cancel_unblock(CamelCancel *cc);
+gboolean camel_cancel_check(CamelCancel *cc);
+int camel_cancel_fd(CamelCancel *cc);
+
#ifdef __cplusplus
}
#endif /* __cplusplus */
diff --git a/camel/camel-stream-fs.c b/camel/camel-stream-fs.c
index 59082ec531..9f05b2fd8c 100644
--- a/camel/camel-stream-fs.c
+++ b/camel/camel-stream-fs.c
@@ -209,13 +209,39 @@ stream_read (CamelStream *stream, char *buffer, size_t n)
CamelStreamFs *stream_fs = CAMEL_STREAM_FS (stream);
CamelSeekableStream *seekable = CAMEL_SEEKABLE_STREAM (stream);
ssize_t nread;
+ int cancel_fd;
+
+ if (camel_cancel_check(NULL)) {
+ errno = EINTR;
+ return -1;
+ }
if (seekable->bound_end != CAMEL_STREAM_UNBOUND)
n = MIN (seekable->bound_end - seekable->position, n);
- do {
- nread = read (stream_fs->fd, buffer, n);
- } while (nread == -1 && errno == EINTR);
+ cancel_fd = camel_cancel_fd(NULL);
+ if (cancel_fd == -1) {
+ do {
+ nread = read (stream_fs->fd, buffer, n);
+ } while (nread == -1 && errno == EINTR);
+ } else {
+ fd_set rdset;
+ long flags;
+
+ fcntl(stream_fs->fd, F_GETFL, &flags);
+ fcntl(stream_fs->fd, F_SETFL, flags | O_NONBLOCK);
+ FD_ZERO(&rdset);
+ FD_SET(stream_fs->fd, &rdset);
+ FD_SET(cancel_fd, &rdset);
+ select((stream_fs->fd+cancel_fd)/2+1, &rdset, 0, 0, NULL);
+ if (FD_ISSET(cancel_fd, &rdset)) {
+ fcntl(stream_fs->fd, F_SETFL, flags);
+ errno = EINTR;
+ return -1;
+ }
+ nread = read(stream_fs->fd, buffer, n);
+ fcntl(stream_fs->fd, F_SETFL, flags);
+ }
if (nread > 0)
seekable->position += nread;
@@ -231,15 +257,44 @@ stream_write (CamelStream *stream, const char *buffer, size_t n)
CamelStreamFs *stream_fs = CAMEL_STREAM_FS (stream);
CamelSeekableStream *seekable = CAMEL_SEEKABLE_STREAM (stream);
ssize_t v, written = 0;
+ int cancel_fd;
+
+ if (camel_cancel_check(NULL)) {
+ errno = EINTR;
+ return -1;
+ }
if (seekable->bound_end != CAMEL_STREAM_UNBOUND)
n = MIN (seekable->bound_end - seekable->position, n);
- do {
- v = write (stream_fs->fd, buffer, n);
- if (v > 0)
+ cancel_fd = camel_cancel_fd(NULL);
+ if (cancel_fd == -1) {
+ do {
+ v = write (stream_fs->fd, buffer, n);
+ if (v > 0)
+ written += v;
+ } while (v == -1 && errno == EINTR);
+ } else {
+ fd_set rdset, wrset;
+ long flags;
+
+ fcntl(stream_fs->fd, F_GETFL, &flags);
+ fcntl(stream_fs->fd, F_SETFL, flags | O_NONBLOCK);
+ FD_ZERO(&rdset);
+ FD_ZERO(&wrset);
+ FD_SET(stream_fs->fd, &wrset);
+ FD_SET(cancel_fd, &rdset);
+ select((stream_fs->fd+cancel_fd)/2+1, &rdset, &wrset, 0, NULL);
+ if (FD_ISSET(cancel_fd, &rdset)) {
+ fcntl(stream_fs->fd, F_SETFL, flags);
+ errno = EINTR;
+ return -1;
+ }
+ v = write(stream_fs->fd, buffer, n);
+ if (v>0)
written += v;
- } while (v == -1 && errno == EINTR);
+ fcntl(stream_fs->fd, F_SETFL, flags);
+ }
if (written > 0)
seekable->position += written;