summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobertabcd <robertabcd@63ad8ddf-47c3-0310-b6dd-a9e9d9715204>2014-10-11 18:25:07 +0800
committerrobertabcd <robertabcd@63ad8ddf-47c3-0310-b6dd-a9e9d9715204>2014-10-11 18:25:07 +0800
commit178e98227fcbd64586438ad6430fffd50419ee9d (patch)
treec44113bcf8336bdab1873b26173ddf0168c01a3c
parent7ca57260e684c486ae0e24e94e3d9cefc8b88b3d (diff)
downloadpttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar
pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.gz
pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.bz2
pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.lz
pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.xz
pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.tar.zst
pttbbs-178e98227fcbd64586438ad6430fffd50419ee9d.zip
boardd: multi-thread.
git-svn-id: http://opensvn.csie.org/pttbbs/trunk@6082 63ad8ddf-47c3-0310-b6dd-a9e9d9715204
-rw-r--r--pttbbs/daemon/boardd/Makefile8
-rw-r--r--pttbbs/daemon/boardd/boardd.c72
-rw-r--r--pttbbs/daemon/boardd/queue.hpp65
-rw-r--r--pttbbs/daemon/boardd/threadpool.cpp53
-rw-r--r--pttbbs/daemon/boardd/threadpool.h18
-rw-r--r--pttbbs/daemon/boardd/threadpool.hpp59
6 files changed, 262 insertions, 13 deletions
diff --git a/pttbbs/daemon/boardd/Makefile b/pttbbs/daemon/boardd/Makefile
index d14cd43d..d84d5b5d 100644
--- a/pttbbs/daemon/boardd/Makefile
+++ b/pttbbs/daemon/boardd/Makefile
@@ -4,20 +4,22 @@ SRCROOT= ../..
.include "$(SRCROOT)/pttbbs.mk"
PROG= boardd
-SRCS= boardd.c server.c
+SRCS= boardd.c server.c threadpool.cpp
MAN=
UTILDIR= $(SRCROOT)/util
UTILOBJ= $(UTILDIR)/util_var.o
-CFLAGS+= $(LIBEVENT_CFLAGS)
+CXXFLAGS+= -std=c++11
+CFLAGS+= $(LIBEVENT_CFLAGS) -DSERVER_USE_PTHREADS
LDFLAGS+= $(LIBEVENT_LIBS_L)
LDADD:= $(UTILOBJ) \
$(SRCROOT)/common/bbs/libcmbbs.a \
$(SRCROOT)/common/sys/libcmsys.a \
$(SRCROOT)/common/osdep/libosdep.a \
- $(LIBEVENT_LIBS_l) \
+ $(LIBEVENT_LIBS_l) -levent_pthreads \
+ -lpthread -lstdc++ \
$(LDADD)
.include <bsd.prog.mk>
diff --git a/pttbbs/daemon/boardd/boardd.c b/pttbbs/daemon/boardd/boardd.c
index 1d63eab0..e44dbb17 100644
--- a/pttbbs/daemon/boardd/boardd.c
+++ b/pttbbs/daemon/boardd/boardd.c
@@ -27,10 +27,19 @@
#include <perm.h>
#include "server.h"
+#include "threadpool.h"
#define CONVERT_TO_UTF8
+#define BOARDD_MT
#define DEFAULT_ARTICLE_LIST 20
+#define NUM_THREADS 8
+
+// Globals
+
+#ifdef BOARDD_MT
+threadpool_t g_threadpool;
+#endif
// helper function
@@ -652,11 +661,45 @@ static const struct {
{NULL, cmd_unknown}
};
+static void
+process_line(struct bufferevent *bev, void *ctx, char *line)
+{
+ char **argv;
+ int argc = split_args(line, &argv);
+ int i;
+
+ for (i = 0; cmdlist[i].cmd; i++)
+ if (evutil_ascii_strcasecmp(line, cmdlist[i].cmd) == 0)
+ break;
+
+ (cmdlist[i].func)(bev, ctx, argc - 1, argv + 1);
+
+ free(argv);
+ free(line);
+}
+
+#ifdef BOARDD_MT
+
+typedef struct {
+ struct bufferevent *bev;
+ void *ctx;
+ char *line;
+} process_line_ctx;
+
+void
+process_line_job_func(void *ctx)
+{
+ process_line_ctx *cx = (process_line_ctx *) ctx;
+ process_line(cx->bev, cx->ctx, cx->line);
+ bufferevent_enable(cx->bev, EV_READ);
+ free(cx);
+}
+
+#endif
+
void
client_read_cb(struct bufferevent *bev, void *ctx)
{
- int argc, i;
- char **argv;
size_t len;
struct evbuffer *input = bufferevent_get_input(bev);
char *line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
@@ -664,16 +707,21 @@ client_read_cb(struct bufferevent *bev, void *ctx)
if (!line)
return;
- argc = split_args(line, &argv);
-
- for (i = 0; cmdlist[i].cmd; i++)
- if (evutil_ascii_strcasecmp(line, cmdlist[i].cmd) == 0)
- break;
+#ifdef BOARDD_MT
+ process_line_ctx *plc = (process_line_ctx *) malloc(sizeof(process_line_ctx));
+ if (!plc)
+ return;
- (cmdlist[i].func)(bev, ctx, argc - 1, argv + 1);
+ // One request at a time.
+ bufferevent_disable(bev, EV_READ);
- free(argv);
- free(line);
+ plc->bev = bev;
+ plc->ctx = ctx;
+ plc->line = line;
+ threadpool_do(g_threadpool, threadpool_job_new(process_line_job_func, plc));
+#else
+ process_line(bev, ctx, line);
+#endif
}
void
@@ -683,6 +731,10 @@ setup_program()
setgid(BBSGID);
chdir(BBSHOME);
+#ifdef BOARDD_MT
+ g_threadpool = threadpool_new(NUM_THREADS);
+#endif
+
attach_SHM();
}
diff --git a/pttbbs/daemon/boardd/queue.hpp b/pttbbs/daemon/boardd/queue.hpp
new file mode 100644
index 00000000..281b444c
--- /dev/null
+++ b/pttbbs/daemon/boardd/queue.hpp
@@ -0,0 +1,65 @@
+// Copyright (c) 2014, Robert Wang <robert@arctic.tw>
+// The MIT License.
+#ifndef _QUEUE_H_
+#define _QUEUE_H_
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+
+template <class T>
+class Queue
+{
+public:
+ Queue(size_t max = 0)
+ : eof_(false)
+ , max_(max)
+ {}
+
+ void Push(T elem)
+ {
+ std::unique_lock<std::mutex> lk(mu_);
+ while (true) {
+ if (eof_)
+ throw std::runtime_error("Queue closed");
+ if (max_ && qu_.size() >= max_) {
+ cv_poped_.wait(lk);
+ continue;
+ }
+ qu_.push(elem);
+ cv_pushed_.notify_one();
+ return;
+ }
+ }
+
+ bool Pop(T &elem)
+ {
+ std::unique_lock<std::mutex> lk(mu_);
+ while (qu_.empty()) {
+ if (eof_)
+ return false;
+ cv_pushed_.wait(lk);
+ }
+ elem = qu_.front();
+ qu_.pop();
+ cv_poped_.notify_one();
+ return true;
+ }
+
+ void Close()
+ {
+ std::unique_lock<std::mutex> lk(mu_);
+ eof_ = true;
+ cv_pushed_.notify_all();
+ cv_poped_.notify_all();
+ }
+
+private:
+ std::queue<T> qu_;
+ std::mutex mu_;
+ std::condition_variable cv_pushed_;
+ std::condition_variable cv_poped_;
+ bool eof_;
+ size_t max_;
+};
+
+#endif
diff --git a/pttbbs/daemon/boardd/threadpool.cpp b/pttbbs/daemon/boardd/threadpool.cpp
new file mode 100644
index 00000000..cc15505c
--- /dev/null
+++ b/pttbbs/daemon/boardd/threadpool.cpp
@@ -0,0 +1,53 @@
+// Copyright (c) 2014, Robert Wang <robert@arctic.tw>
+// The MIT License.
+#include "threadpool.hpp"
+extern "C" {
+#include "threadpool.h"
+}
+
+namespace {
+
+class FuncJob : public Job
+{
+public:
+ using Func = void (*)(void *);
+ FuncJob(Func f, void *ctx) : f(f), ctx(ctx) {}
+ void Run() { f(ctx); }
+private:
+ Func f;
+ void *ctx;
+};
+
+} // namespace
+
+struct threadpool_ctx {
+ ThreadPool<FuncJob> *pool;
+};
+
+threadpool_t threadpool_new(size_t num)
+{
+ threadpool_t p = new threadpool_ctx;
+ p->pool = new ThreadPool<FuncJob>(num);
+ return p;
+}
+
+void threadpool_free(threadpool_t p)
+{
+ delete p->pool;
+ delete p;
+}
+
+void threadpool_do(threadpool_t p, threadpool_job_t job)
+{
+ p->pool->Do(reinterpret_cast<FuncJob *>(job));
+}
+
+threadpool_job_t threadpool_job_new(threadpool_job_func func, void *ctx)
+{
+ return reinterpret_cast<threadpool_job_t>(new FuncJob(func, ctx));
+}
+
+void threadpool_job_free(threadpool_job_t job)
+{
+ delete reinterpret_cast<FuncJob *>(job);
+}
diff --git a/pttbbs/daemon/boardd/threadpool.h b/pttbbs/daemon/boardd/threadpool.h
new file mode 100644
index 00000000..9d589623
--- /dev/null
+++ b/pttbbs/daemon/boardd/threadpool.h
@@ -0,0 +1,18 @@
+// Copyright (c) 2014, Robert Wang <robert@arctic.tw>
+// The MIT License.
+#ifndef _THREADPOOL_H_
+# define _THREADPOOL_H_
+
+struct threadpool_ctx;
+typedef struct threadpool_ctx *threadpool_t;
+typedef void *threadpool_job_t;
+
+threadpool_t threadpool_new(size_t num);
+void threadpool_free(threadpool_t p);
+void threadpool_do(threadpool_t p, threadpool_job_t job);
+
+typedef void (*threadpool_job_func)(void *ctx);
+threadpool_job_t threadpool_job_new(threadpool_job_func func, void *ctx);
+void threadpool_job_free(threadpool_job_t job);
+
+#endif
diff --git a/pttbbs/daemon/boardd/threadpool.hpp b/pttbbs/daemon/boardd/threadpool.hpp
new file mode 100644
index 00000000..bcba42a1
--- /dev/null
+++ b/pttbbs/daemon/boardd/threadpool.hpp
@@ -0,0 +1,59 @@
+// Copyright (c) 2014, Robert Wang <robert@arctic.tw>
+// The MIT License.
+#ifndef _THREADPOOL_HPP
+# define _THREADPOOL_HPP
+#include <thread>
+#include "queue.hpp"
+
+namespace {
+
+template <class Job>
+void WorkerThread(Queue<Job *> *q)
+{
+ while (true) {
+ Job *job;
+ if (!q->Pop(job))
+ return;
+ job->Run();
+ delete job;
+ }
+}
+
+} // namespace
+
+class Job
+{
+public:
+ virtual ~Job() {}
+ virtual void Run() = 0;
+};
+
+template <class Job>
+class ThreadPool
+{
+public:
+ ThreadPool(size_t num)
+ : queue_(num * 4)
+ {
+ for (size_t i = 0; i < num; ++i)
+ threads_.push_back(std::move(std::thread(WorkerThread<Job>, &queue_)));
+ }
+
+ ~ThreadPool()
+ {
+ queue_.Close();
+ for (auto &t : threads_)
+ t.join();
+ }
+
+ void Do(Job *job)
+ {
+ queue_.Push(job);
+ }
+
+private:
+ Queue<Job *> queue_;
+ std::vector<std::thread> threads_;
+};
+
+#endif