diff options
author | robertabcd <robertabcd@63ad8ddf-47c3-0310-b6dd-a9e9d9715204> | 2014-10-11 18:25:07 +0800 |
---|---|---|
committer | robertabcd <robertabcd@63ad8ddf-47c3-0310-b6dd-a9e9d9715204> | 2014-10-11 18:25:07 +0800 |
commit | 178e98227fcbd64586438ad6430fffd50419ee9d (patch) | |
tree | c44113bcf8336bdab1873b26173ddf0168c01a3c | |
parent | 7ca57260e684c486ae0e24e94e3d9cefc8b88b3d (diff) | |
download | pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.gz pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.bz2 pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.lz pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.xz pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.zst pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.zip |
boardd: multi-thread.
git-svn-id: http://opensvn.csie.org/pttbbs/trunk@6082 63ad8ddf-47c3-0310-b6dd-a9e9d9715204
-rw-r--r-- | pttbbs/daemon/boardd/Makefile | 8 | ||||
-rw-r--r-- | pttbbs/daemon/boardd/boardd.c | 72 | ||||
-rw-r--r-- | pttbbs/daemon/boardd/queue.hpp | 65 | ||||
-rw-r--r-- | pttbbs/daemon/boardd/threadpool.cpp | 53 | ||||
-rw-r--r-- | pttbbs/daemon/boardd/threadpool.h | 18 | ||||
-rw-r--r-- | pttbbs/daemon/boardd/threadpool.hpp | 59 |
6 files changed, 262 insertions, 13 deletions
diff --git a/pttbbs/daemon/boardd/Makefile b/pttbbs/daemon/boardd/Makefile index d14cd43d..d84d5b5d 100644 --- a/pttbbs/daemon/boardd/Makefile +++ b/pttbbs/daemon/boardd/Makefile @@ -4,20 +4,22 @@ SRCROOT= ../.. .include "$(SRCROOT)/pttbbs.mk" PROG= boardd -SRCS= boardd.c server.c +SRCS= boardd.c server.c threadpool.cpp MAN= UTILDIR= $(SRCROOT)/util UTILOBJ= $(UTILDIR)/util_var.o -CFLAGS+= $(LIBEVENT_CFLAGS) +CXXFLAGS+= -std=c++11 +CFLAGS+= $(LIBEVENT_CFLAGS) -DSERVER_USE_PTHREADS LDFLAGS+= $(LIBEVENT_LIBS_L) LDADD:= $(UTILOBJ) \ $(SRCROOT)/common/bbs/libcmbbs.a \ $(SRCROOT)/common/sys/libcmsys.a \ $(SRCROOT)/common/osdep/libosdep.a \ - $(LIBEVENT_LIBS_l) \ + $(LIBEVENT_LIBS_l) -levent_pthreads \ + -lpthread -lstdc++ \ $(LDADD) .include <bsd.prog.mk> diff --git a/pttbbs/daemon/boardd/boardd.c b/pttbbs/daemon/boardd/boardd.c index 1d63eab0..e44dbb17 100644 --- a/pttbbs/daemon/boardd/boardd.c +++ b/pttbbs/daemon/boardd/boardd.c @@ -27,10 +27,19 @@ #include <perm.h> #include "server.h" +#include "threadpool.h" #define CONVERT_TO_UTF8 +#define BOARDD_MT #define DEFAULT_ARTICLE_LIST 20 +#define NUM_THREADS 8 + +// Globals + +#ifdef BOARDD_MT +threadpool_t g_threadpool; +#endif // helper function @@ -652,11 +661,45 @@ static const struct { {NULL, cmd_unknown} }; +static void +process_line(struct bufferevent *bev, void *ctx, char *line) +{ + char **argv; + int argc = split_args(line, &argv); + int i; + + for (i = 0; cmdlist[i].cmd; i++) + if (evutil_ascii_strcasecmp(line, cmdlist[i].cmd) == 0) + break; + + (cmdlist[i].func)(bev, ctx, argc - 1, argv + 1); + + free(argv); + free(line); +} + +#ifdef BOARDD_MT + +typedef struct { + struct bufferevent *bev; + void *ctx; + char *line; +} process_line_ctx; + +void +process_line_job_func(void *ctx) +{ + process_line_ctx *cx = (process_line_ctx *) ctx; + process_line(cx->bev, cx->ctx, cx->line); + bufferevent_enable(cx->bev, EV_READ); + free(cx); +} + +#endif + void client_read_cb(struct bufferevent *bev, void *ctx) { - int argc, i; - char **argv; size_t len; struct evbuffer *input = bufferevent_get_input(bev); char *line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); @@ -664,16 +707,21 @@ client_read_cb(struct bufferevent *bev, void *ctx) if (!line) return; - argc = split_args(line, &argv); - - for (i = 0; cmdlist[i].cmd; i++) - if (evutil_ascii_strcasecmp(line, cmdlist[i].cmd) == 0) - break; +#ifdef BOARDD_MT + process_line_ctx *plc = (process_line_ctx *) malloc(sizeof(process_line_ctx)); + if (!plc) + return; - (cmdlist[i].func)(bev, ctx, argc - 1, argv + 1); + // One request at a time. + bufferevent_disable(bev, EV_READ); - free(argv); - free(line); + plc->bev = bev; + plc->ctx = ctx; + plc->line = line; + threadpool_do(g_threadpool, threadpool_job_new(process_line_job_func, plc)); +#else + process_line(bev, ctx, line); +#endif } void @@ -683,6 +731,10 @@ setup_program() setgid(BBSGID); chdir(BBSHOME); +#ifdef BOARDD_MT + g_threadpool = threadpool_new(NUM_THREADS); +#endif + attach_SHM(); } diff --git a/pttbbs/daemon/boardd/queue.hpp b/pttbbs/daemon/boardd/queue.hpp new file mode 100644 index 00000000..281b444c --- /dev/null +++ b/pttbbs/daemon/boardd/queue.hpp @@ -0,0 +1,65 @@ +// Copyright (c) 2014, Robert Wang <robert@arctic.tw> +// The MIT License. +#ifndef _QUEUE_H_ +#define _QUEUE_H_ +#include <queue> +#include <mutex> +#include <condition_variable> + +template <class T> +class Queue +{ +public: + Queue(size_t max = 0) + : eof_(false) + , max_(max) + {} + + void Push(T elem) + { + std::unique_lock<std::mutex> lk(mu_); + while (true) { + if (eof_) + throw std::runtime_error("Queue closed"); + if (max_ && qu_.size() >= max_) { + cv_poped_.wait(lk); + continue; + } + qu_.push(elem); + cv_pushed_.notify_one(); + return; + } + } + + bool Pop(T &elem) + { + std::unique_lock<std::mutex> lk(mu_); + while (qu_.empty()) { + if (eof_) + return false; + cv_pushed_.wait(lk); + } + elem = qu_.front(); + qu_.pop(); + cv_poped_.notify_one(); + return true; + } + + void Close() + { + std::unique_lock<std::mutex> lk(mu_); + eof_ = true; + cv_pushed_.notify_all(); + cv_poped_.notify_all(); + } + +private: + std::queue<T> qu_; + std::mutex mu_; + std::condition_variable cv_pushed_; + std::condition_variable cv_poped_; + bool eof_; + size_t max_; +}; + +#endif diff --git a/pttbbs/daemon/boardd/threadpool.cpp b/pttbbs/daemon/boardd/threadpool.cpp new file mode 100644 index 00000000..cc15505c --- /dev/null +++ b/pttbbs/daemon/boardd/threadpool.cpp @@ -0,0 +1,53 @@ +// Copyright (c) 2014, Robert Wang <robert@arctic.tw> +// The MIT License. +#include "threadpool.hpp" +extern "C" { +#include "threadpool.h" +} + +namespace { + +class FuncJob : public Job +{ +public: + using Func = void (*)(void *); + FuncJob(Func f, void *ctx) : f(f), ctx(ctx) {} + void Run() { f(ctx); } +private: + Func f; + void *ctx; +}; + +} // namespace + +struct threadpool_ctx { + ThreadPool<FuncJob> *pool; +}; + +threadpool_t threadpool_new(size_t num) +{ + threadpool_t p = new threadpool_ctx; + p->pool = new ThreadPool<FuncJob>(num); + return p; +} + +void threadpool_free(threadpool_t p) +{ + delete p->pool; + delete p; +} + +void threadpool_do(threadpool_t p, threadpool_job_t job) +{ + p->pool->Do(reinterpret_cast<FuncJob *>(job)); +} + +threadpool_job_t threadpool_job_new(threadpool_job_func func, void *ctx) +{ + return reinterpret_cast<threadpool_job_t>(new FuncJob(func, ctx)); +} + +void threadpool_job_free(threadpool_job_t job) +{ + delete reinterpret_cast<FuncJob *>(job); +} diff --git a/pttbbs/daemon/boardd/threadpool.h b/pttbbs/daemon/boardd/threadpool.h new file mode 100644 index 00000000..9d589623 --- /dev/null +++ b/pttbbs/daemon/boardd/threadpool.h @@ -0,0 +1,18 @@ +// Copyright (c) 2014, Robert Wang <robert@arctic.tw> +// The MIT License. +#ifndef _THREADPOOL_H_ +# define _THREADPOOL_H_ + +struct threadpool_ctx; +typedef struct threadpool_ctx *threadpool_t; +typedef void *threadpool_job_t; + +threadpool_t threadpool_new(size_t num); +void threadpool_free(threadpool_t p); +void threadpool_do(threadpool_t p, threadpool_job_t job); + +typedef void (*threadpool_job_func)(void *ctx); +threadpool_job_t threadpool_job_new(threadpool_job_func func, void *ctx); +void threadpool_job_free(threadpool_job_t job); + +#endif diff --git a/pttbbs/daemon/boardd/threadpool.hpp b/pttbbs/daemon/boardd/threadpool.hpp new file mode 100644 index 00000000..bcba42a1 --- /dev/null +++ b/pttbbs/daemon/boardd/threadpool.hpp @@ -0,0 +1,59 @@ +// Copyright (c) 2014, Robert Wang <robert@arctic.tw> +// The MIT License. +#ifndef _THREADPOOL_HPP +# define _THREADPOOL_HPP +#include <thread> +#include "queue.hpp" + +namespace { + +template <class Job> +void WorkerThread(Queue<Job *> *q) +{ + while (true) { + Job *job; + if (!q->Pop(job)) + return; + job->Run(); + delete job; + } +} + +} // namespace + +class Job +{ +public: + virtual ~Job() {} + virtual void Run() = 0; +}; + +template <class Job> +class ThreadPool +{ +public: + ThreadPool(size_t num) + : queue_(num * 4) + { + for (size_t i = 0; i < num; ++i) + threads_.push_back(std::move(std::thread(WorkerThread<Job>, &queue_))); + } + + ~ThreadPool() + { + queue_.Close(); + for (auto &t : threads_) + t.join(); + } + + void Do(Job *job) + { + queue_.Push(job); + } + +private: + Queue<Job *> queue_; + std::vector<std::thread> threads_; +}; + +#endif |