From b64ffc9183d2c94ce6b8fdf10f382904aa8021e8 Mon Sep 17 00:00:00 2001 From: Not Zed Date: Mon, 15 Jan 2001 07:55:30 +0000 Subject: A cancellable connection routine. (remote_send_string): Return cancelled 2001-01-15 Not Zed * camel-remote-store.c (socket_connect): A cancellable connection routine. (remote_send_string): Return cancelled exception if we were. (remote_send_stream): " (remote_recv_line): " * camel-stream-fs.c (stream_read): First cut at cancellation stuff. Its looking a bit ugly. svn path=/trunk/; revision=7496 --- camel/ChangeLog | 11 +++ camel/camel-remote-store.c | 134 +++++++++++++++++++++++---- camel/camel-session.c | 226 +++++++++++++++++++++++++++++++++++++++++++++ camel/camel-session.h | 18 ++++ camel/camel-stream-fs.c | 69 ++++++++++++-- 5 files changed, 434 insertions(+), 24 deletions(-) diff --git a/camel/ChangeLog b/camel/ChangeLog index 9ab195b5a7..11d938abe7 100644 --- a/camel/ChangeLog +++ b/camel/ChangeLog @@ -1,3 +1,14 @@ +2001-01-15 Not Zed + + * camel-remote-store.c (socket_connect): A cancellable connection + routine. + (remote_send_string): Return cancelled exception if we were. + (remote_send_stream): " + (remote_recv_line): " + + * camel-stream-fs.c (stream_read): First cut at cancellation + stuff. Its looking a bit ugly. + 2001-01-15 Jeffrey Stedfast * camel-tcp-stream-ssl.c (stream_connect): Uses an SSL socket now diff --git a/camel/camel-remote-store.c b/camel/camel-remote-store.c index 8d095244e2..9e7a60e378 100644 --- a/camel/camel-remote-store.c +++ b/camel/camel-remote-store.c @@ -25,6 +25,7 @@ #include +#include #include #include #include @@ -205,6 +206,87 @@ timeout_cb (gpointer data) return TRUE; } +/* this is a 'cancellable' connect, cancellable from camel_cancel etc */ +/* returns -1 & errno == EINTR if the connection was cancelled */ +static int socket_connect(struct hostent *h, int port) +{ + struct sockaddr_in sin; + int fd; + int ret; + socklen_t len; + struct timeval tv; + int cancel_fd; + + /* see if we're cancelled yet */ + if (camel_cancel_check(NULL)) { + errno = EINTR; + return -1; + } + + /* setup connect, we do it using a nonblocking socket so we can poll it */ + sin.sin_port = htons(port); + sin.sin_family = h->h_addrtype; + memcpy (&sin.sin_addr, h->h_addr, sizeof (sin.sin_addr)); + + fd = socket (h->h_addrtype, SOCK_STREAM, 0); + + cancel_fd = camel_cancel_fd(NULL); + if (cancel_fd == -1) { + ret = connect(fd, (struct sockaddr *)&sin, sizeof (sin)); + if (ret == -1) { + close(fd); + return -1; + } + return fd; + } else { + fd_set rdset, wrset; + long flags; + + fcntl(fd, F_GETFL, &flags); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + + ret = connect(fd, (struct sockaddr *)&sin, sizeof (sin)); + if (ret == 0) + return fd; + + if (errno != EINPROGRESS) { + close(fd); + return -1; + } + + FD_ZERO(&rdset); + FD_ZERO(&wrset); + FD_SET(fd, &wrset); + FD_SET(cancel_fd, &rdset); + tv.tv_usec = 0; + tv.tv_sec = 30; + if (select((fd+cancel_fd)/2+1, &rdset, &wrset, 0, &tv) == 0) { + close(fd); + errno = ETIMEDOUT; + return -1; + } + if (cancel_fd != -1 && FD_ISSET(cancel_fd, &rdset)) { + close(fd); + errno = EINTR; + return -1; + } else { + len = sizeof(int); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &len) == -1) { + close(fd); + return -1; + } + if (ret != 0) { + close(fd); + errno = ret; + return -1; + } + } + fcntl(fd, F_SETFL, flags); + } + + return fd; +} + static gboolean remote_connect (CamelService *service, CamelException *ex) { @@ -225,6 +307,20 @@ remote_connect (CamelService *service, CamelException *ex) port = service->url->port; else port = store->default_port; + +#if 1 + fd = socket_connect(h, port); + if (fd == -1) { + if (errno == EINTR) + camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Connection cancelled")); + else + camel_exception_setv (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, + _("Could not connect to %s (port %d): %s"), + service->url->host ? service->url->host : _("(unknown host)"), + port, strerror (errno)); + return FALSE; + } +#else sin.sin_port = htons (port); memcpy (&sin.sin_addr, h->h_addr, sizeof (sin.sin_addr)); @@ -240,6 +336,7 @@ remote_connect (CamelService *service, CamelException *ex) return FALSE; } +#endif /* parent class connect initialization */ if (CAMEL_SERVICE_CLASS (store_class)->connect (service, ex) == FALSE) @@ -322,9 +419,11 @@ remote_send_string (CamelRemoteStore *store, CamelException *ex, char *fmt, va_l #endif if (camel_stream_printf (store->ostream, "%s", cmdbuf) == -1) { + if (errno == EINTR) + camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Operation cancelled")); + else + camel_exception_set(ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, strerror(errno)); g_free (cmdbuf); - camel_exception_set (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, - g_strerror (errno)); camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL); return -1; @@ -381,9 +480,11 @@ remote_send_stream (CamelRemoteStore *store, CamelStream *stream, CamelException d(fprintf (stderr, "(sending stream)\n")); ret = camel_stream_write_to_stream (stream, store->ostream); - if (ret < 0) { - camel_exception_set (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, - g_strerror (errno)); + if (ret == -1) { + if (errno == EINTR) + camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Operation cancelled")); + else + camel_exception_set(ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, strerror(errno)); camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL); } @@ -446,23 +547,22 @@ remote_recv_line (CamelRemoteStore *store, char **dest, CamelException *ex) if (nread > 0) g_byte_array_append (bytes, buf, nread); } while (nread == sizeof (buf) - 1); - + + if (nread == -1) { + if (errno == EINTR) + camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Operation cancelled")); + else + camel_exception_set(ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, strerror(errno)); + g_byte_array_free(bytes, TRUE); + camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL); + return -1; + } + g_byte_array_append (bytes, "", 1); ret = bytes->data; nread = bytes->len - 1; g_byte_array_free (bytes, FALSE); - if (nread <= 0) { - g_free (ret); - ret = NULL; - camel_exception_set (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, - nread ? g_strerror (errno) : - _("Server disconnected.")); - - camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL); - return -1; - } - /* strip off the CRLF sequence */ while (nread > 0 && ret[nread] != '\r') ret[nread--] = '\0'; diff --git a/camel/camel-session.c b/camel/camel-session.c index b1cc489c71..67d58a1e67 100644 --- a/camel/camel-session.c +++ b/camel/camel-session.c @@ -482,3 +482,229 @@ camel_session_remove_timeout (CamelSession *session, guint handle) { return session->remover (handle); } + +/* ********************************************************************** */ + +struct _CamelCancel { + pthread_t id; /* id of running thread */ + guint32 flags; /* cancelled ? */ + int blocked; /* cancellation blocked depth */ + int refcount; +#ifdef ENABLE_THREADS + EMsgPort *cancel_port; + int cancel_fd; + pthread_mutex_t lock; +#endif +}; + +#define CAMEL_CANCEL_CANCELLED (1<<0) + +#ifdef ENABLE_THREADS +#define CAMEL_CANCEL_LOCK(cc) pthread_mutex_lock(&cc->lock) +#define CAMEL_CANCEL_UNLOCK(cc) pthread_mutex_lock(&cc->lock) +#define CAMEL_ACTIVE_LOCK() pthread_mutex_lock(&cancel_active_lock) +#define CAMEL_ACTIVE_UNLOCK() pthread_mutex_lock(&cancel_active_lock) +static pthread_mutex_t cancel_active_lock = PTHREAD_MUTEX_INITIALIZER; +#else +#define CAMEL_CANCEL_LOCK(cc) +#define CAMEL_CANCEL_UNLOCK(cc) +#define CAMEL_ACTIVE_LOCK() +#define CAMEL_ACTIVE_UNLOCK() +#endif + +static GHashTable *cancel_active; + +typedef struct _CamelCancelMsg { + EMsg msg; +} CamelCancelMsg ; + +/* creates a new cancel handle */ +CamelCancel *camel_cancel_new(void) +{ + CamelCancel *cc; + + cc = g_malloc0(sizeof(*cc)); + + cc->flags = 0; + cc->blocked = 0; + cc->refcount = 1; +#ifdef ENABLE_THREADS + cc->id = ~0; + cc->cancel_port = e_msgport_new(); + cc->cancel_fd = e_msgport_fd(cc->cancel_port); + pthread_mutex_init(&cc->lock, NULL); +#endif + + return cc; +} + +void camel_cancel_reset(CamelCancel *cc) +{ +#ifdef ENABLE_THREADS + CamelCancelMsg *msg; + + while ((msg = (CamelCancelMsg *)e_msgport_get(cc->cancel_port))) + g_free(msg); +#endif + + cc->flags = 0; + cc->blocked = 0; +} + +void camel_cancel_ref(CamelCancel *cc) +{ + CAMEL_CANCEL_LOCK(cc); + cc->refcount++; + CAMEL_CANCEL_UNLOCK(cc); +} + +void camel_cancel_unref(CamelCancel *cc) +{ +#ifdef ENABLE_THREADS + CamelCancelMsg *msg; + + if (cc->refcount == 1) { + while ((msg = (CamelCancelMsg *)e_msgport_get(cc->cancel_port))) + g_free(msg); + + e_msgport_destroy(cc->cancel_port); +#endif + g_free(cc); + } else { + CAMEL_CANCEL_LOCK(cc); + cc->refcount--; + CAMEL_CANCEL_UNLOCK(cc); + } +} + +/* block cancellation */ +void camel_cancel_block(CamelCancel *cc) +{ + CAMEL_CANCEL_LOCK(cc); + + cc->blocked++; + + CAMEL_CANCEL_UNLOCK(cc); +} + +/* unblock cancellation */ +void camel_cancel_unblock(CamelCancel *cc) +{ + CAMEL_CANCEL_LOCK(cc); + + cc->blocked--; + + CAMEL_CANCEL_UNLOCK(cc); +} + +/* cancels an operation */ +void camel_cancel_cancel(CamelCancel *cc) +{ + CamelCancelMsg *msg; + + if ((cc->flags & CAMEL_CANCEL_CANCELLED) == 0) { + CAMEL_CANCEL_LOCK(cc); + msg = g_malloc0(sizeof(*msg)); + e_msgport_put(cc->cancel_port, (EMsg *)msg); + cc->flags |= CAMEL_CANCEL_CANCELLED; + CAMEL_CANCEL_UNLOCK(cc); + } +} + +/* register a thread for cancellation */ +void camel_cancel_register(CamelCancel *cc) +{ + pthread_t id = pthread_self(); + + CAMEL_ACTIVE_LOCK(); + + if (cancel_active == NULL) + cancel_active = g_hash_table_new(NULL, NULL); + + if (cc == NULL) { + cc = g_hash_table_lookup(cancel_active, (void *)id); + if (cc == NULL) { + cc = camel_cancel_new(); + } + } + + cc->id = id; + g_hash_table_insert(cancel_active, (void *)id, cc); + camel_cancel_ref(cc); + + CAMEL_ACTIVE_UNLOCK(); +} + +/* remove a thread from being able to be cancelled */ +void camel_cancel_unregister(CamelCancel *cc) +{ + CAMEL_ACTIVE_LOCK(); + + if (cancel_active == NULL) + cancel_active = g_hash_table_new(NULL, NULL); + + if (cc == NULL) { + cc = g_hash_table_lookup(cancel_active, (void *)cc->id); + if (cc == NULL) { + g_warning("Trying to unregister a thread that was never registered for cancellation"); + } + } + + if (cc) + g_hash_table_remove(cancel_active, (void *)cc->id); + + CAMEL_ACTIVE_UNLOCK(); + + if (cc) + camel_cancel_unref(cc); +} + +/* test for cancellation */ +gboolean camel_cancel_check(CamelCancel *cc) +{ + CamelCancelMsg *msg; + + if (cc == NULL) { + if (cancel_active) { + CAMEL_ACTIVE_LOCK(); + cc = g_hash_table_lookup(cancel_active, (void *)pthread_self()); + CAMEL_ACTIVE_UNLOCK(); + } + if (cc == NULL) + return FALSE; + } + + if (cc->blocked > 0) + return FALSE; + + if (cc->flags & CAMEL_CANCEL_CANCELLED) + return TRUE; + + msg = (CamelCancelMsg *)e_msgport_get(cc->cancel_port); + if (msg) { + CAMEL_CANCEL_LOCK(cc); + cc->flags |= CAMEL_CANCEL_CANCELLED; + CAMEL_CANCEL_UNLOCK(cc); + return TRUE; + } + return FALSE; +} + +/* get the fd for cancellation waiting */ +int camel_cancel_fd(CamelCancel *cc) +{ + if (cc == NULL) { + if (cancel_active) { + CAMEL_ACTIVE_LOCK(); + cc = g_hash_table_lookup(cancel_active, (void *)pthread_self()); + CAMEL_ACTIVE_UNLOCK(); + } + if (cc == NULL) + return -1; + } + if (cc->blocked) + return -1; + + return cc->cancel_fd; +} + diff --git a/camel/camel-session.h b/camel/camel-session.h index 78b4b817e5..9ae9eec089 100644 --- a/camel/camel-session.h +++ b/camel/camel-session.h @@ -126,6 +126,24 @@ guint camel_session_register_timeout (CamelSession *session, gboolean camel_session_remove_timeout (CamelSession *session, guint handle); + +/* cancellation helper stuff, not yet finalised */ +typedef struct _CamelCancel CamelCancel; +/* main thread functions */ +CamelCancel *camel_cancel_new(void); +void camel_cancel_ref(CamelCancel *cc); +void camel_cancel_unref(CamelCancel *cc); +void camel_cancel_reset(CamelCancel *cc); +void camel_cancel_cancel(CamelCancel *cc); +/* subthread functions */ +void camel_cancel_register(CamelCancel *cc); +void camel_cancel_unregister(CamelCancel *cc); +/* called internally by camel, for the current thread */ +void camel_cancel_block(CamelCancel *cc); +void camel_cancel_unblock(CamelCancel *cc); +gboolean camel_cancel_check(CamelCancel *cc); +int camel_cancel_fd(CamelCancel *cc); + #ifdef __cplusplus } #endif /* __cplusplus */ diff --git a/camel/camel-stream-fs.c b/camel/camel-stream-fs.c index 59082ec531..9f05b2fd8c 100644 --- a/camel/camel-stream-fs.c +++ b/camel/camel-stream-fs.c @@ -209,13 +209,39 @@ stream_read (CamelStream *stream, char *buffer, size_t n) CamelStreamFs *stream_fs = CAMEL_STREAM_FS (stream); CamelSeekableStream *seekable = CAMEL_SEEKABLE_STREAM (stream); ssize_t nread; + int cancel_fd; + + if (camel_cancel_check(NULL)) { + errno = EINTR; + return -1; + } if (seekable->bound_end != CAMEL_STREAM_UNBOUND) n = MIN (seekable->bound_end - seekable->position, n); - do { - nread = read (stream_fs->fd, buffer, n); - } while (nread == -1 && errno == EINTR); + cancel_fd = camel_cancel_fd(NULL); + if (cancel_fd == -1) { + do { + nread = read (stream_fs->fd, buffer, n); + } while (nread == -1 && errno == EINTR); + } else { + fd_set rdset; + long flags; + + fcntl(stream_fs->fd, F_GETFL, &flags); + fcntl(stream_fs->fd, F_SETFL, flags | O_NONBLOCK); + FD_ZERO(&rdset); + FD_SET(stream_fs->fd, &rdset); + FD_SET(cancel_fd, &rdset); + select((stream_fs->fd+cancel_fd)/2+1, &rdset, 0, 0, NULL); + if (FD_ISSET(cancel_fd, &rdset)) { + fcntl(stream_fs->fd, F_SETFL, flags); + errno = EINTR; + return -1; + } + nread = read(stream_fs->fd, buffer, n); + fcntl(stream_fs->fd, F_SETFL, flags); + } if (nread > 0) seekable->position += nread; @@ -231,15 +257,44 @@ stream_write (CamelStream *stream, const char *buffer, size_t n) CamelStreamFs *stream_fs = CAMEL_STREAM_FS (stream); CamelSeekableStream *seekable = CAMEL_SEEKABLE_STREAM (stream); ssize_t v, written = 0; + int cancel_fd; + + if (camel_cancel_check(NULL)) { + errno = EINTR; + return -1; + } if (seekable->bound_end != CAMEL_STREAM_UNBOUND) n = MIN (seekable->bound_end - seekable->position, n); - do { - v = write (stream_fs->fd, buffer, n); - if (v > 0) + cancel_fd = camel_cancel_fd(NULL); + if (cancel_fd == -1) { + do { + v = write (stream_fs->fd, buffer, n); + if (v > 0) + written += v; + } while (v == -1 && errno == EINTR); + } else { + fd_set rdset, wrset; + long flags; + + fcntl(stream_fs->fd, F_GETFL, &flags); + fcntl(stream_fs->fd, F_SETFL, flags | O_NONBLOCK); + FD_ZERO(&rdset); + FD_ZERO(&wrset); + FD_SET(stream_fs->fd, &wrset); + FD_SET(cancel_fd, &rdset); + select((stream_fs->fd+cancel_fd)/2+1, &rdset, &wrset, 0, NULL); + if (FD_ISSET(cancel_fd, &rdset)) { + fcntl(stream_fs->fd, F_SETFL, flags); + errno = EINTR; + return -1; + } + v = write(stream_fs->fd, buffer, n); + if (v>0) written += v; - } while (v == -1 && errno == EINTR); + fcntl(stream_fs->fd, F_SETFL, flags); + } if (written > 0) seekable->position += written; -- cgit v1.2.3