From 178e98227fcbd64586438ad6430fffd50419ee9d Mon Sep 17 00:00:00 2001 From: robertabcd Date: Sat, 11 Oct 2014 10:25:07 +0000 Subject: boardd: multi-thread. git-svn-id: http://opensvn.csie.org/pttbbs/trunk@6082 63ad8ddf-47c3-0310-b6dd-a9e9d9715204 --- pttbbs/daemon/boardd/Makefile | 8 +++-- pttbbs/daemon/boardd/boardd.c | 72 +++++++++++++++++++++++++++++++------ pttbbs/daemon/boardd/queue.hpp | 65 +++++++++++++++++++++++++++++++++ pttbbs/daemon/boardd/threadpool.cpp | 53 +++++++++++++++++++++++++++ pttbbs/daemon/boardd/threadpool.h | 18 ++++++++++ pttbbs/daemon/boardd/threadpool.hpp | 59 ++++++++++++++++++++++++++++++ 6 files changed, 262 insertions(+), 13 deletions(-) create mode 100644 pttbbs/daemon/boardd/queue.hpp create mode 100644 pttbbs/daemon/boardd/threadpool.cpp create mode 100644 pttbbs/daemon/boardd/threadpool.h create mode 100644 pttbbs/daemon/boardd/threadpool.hpp diff --git a/pttbbs/daemon/boardd/Makefile b/pttbbs/daemon/boardd/Makefile index d14cd43d..d84d5b5d 100644 --- a/pttbbs/daemon/boardd/Makefile +++ b/pttbbs/daemon/boardd/Makefile @@ -4,20 +4,22 @@ SRCROOT= ../.. .include "$(SRCROOT)/pttbbs.mk" PROG= boardd -SRCS= boardd.c server.c +SRCS= boardd.c server.c threadpool.cpp MAN= UTILDIR= $(SRCROOT)/util UTILOBJ= $(UTILDIR)/util_var.o -CFLAGS+= $(LIBEVENT_CFLAGS) +CXXFLAGS+= -std=c++11 +CFLAGS+= $(LIBEVENT_CFLAGS) -DSERVER_USE_PTHREADS LDFLAGS+= $(LIBEVENT_LIBS_L) LDADD:= $(UTILOBJ) \ $(SRCROOT)/common/bbs/libcmbbs.a \ $(SRCROOT)/common/sys/libcmsys.a \ $(SRCROOT)/common/osdep/libosdep.a \ - $(LIBEVENT_LIBS_l) \ + $(LIBEVENT_LIBS_l) -levent_pthreads \ + -lpthread -lstdc++ \ $(LDADD) .include diff --git a/pttbbs/daemon/boardd/boardd.c b/pttbbs/daemon/boardd/boardd.c index 1d63eab0..e44dbb17 100644 --- a/pttbbs/daemon/boardd/boardd.c +++ b/pttbbs/daemon/boardd/boardd.c @@ -27,10 +27,19 @@ #include #include "server.h" +#include "threadpool.h" #define CONVERT_TO_UTF8 +#define BOARDD_MT #define DEFAULT_ARTICLE_LIST 20 +#define NUM_THREADS 8 + +// Globals + +#ifdef BOARDD_MT +threadpool_t g_threadpool; +#endif // helper function @@ -652,11 +661,45 @@ static const struct { {NULL, cmd_unknown} }; +static void +process_line(struct bufferevent *bev, void *ctx, char *line) +{ + char **argv; + int argc = split_args(line, &argv); + int i; + + for (i = 0; cmdlist[i].cmd; i++) + if (evutil_ascii_strcasecmp(line, cmdlist[i].cmd) == 0) + break; + + (cmdlist[i].func)(bev, ctx, argc - 1, argv + 1); + + free(argv); + free(line); +} + +#ifdef BOARDD_MT + +typedef struct { + struct bufferevent *bev; + void *ctx; + char *line; +} process_line_ctx; + +void +process_line_job_func(void *ctx) +{ + process_line_ctx *cx = (process_line_ctx *) ctx; + process_line(cx->bev, cx->ctx, cx->line); + bufferevent_enable(cx->bev, EV_READ); + free(cx); +} + +#endif + void client_read_cb(struct bufferevent *bev, void *ctx) { - int argc, i; - char **argv; size_t len; struct evbuffer *input = bufferevent_get_input(bev); char *line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); @@ -664,16 +707,21 @@ client_read_cb(struct bufferevent *bev, void *ctx) if (!line) return; - argc = split_args(line, &argv); - - for (i = 0; cmdlist[i].cmd; i++) - if (evutil_ascii_strcasecmp(line, cmdlist[i].cmd) == 0) - break; +#ifdef BOARDD_MT + process_line_ctx *plc = (process_line_ctx *) malloc(sizeof(process_line_ctx)); + if (!plc) + return; - (cmdlist[i].func)(bev, ctx, argc - 1, argv + 1); + // One request at a time. + bufferevent_disable(bev, EV_READ); - free(argv); - free(line); + plc->bev = bev; + plc->ctx = ctx; + plc->line = line; + threadpool_do(g_threadpool, threadpool_job_new(process_line_job_func, plc)); +#else + process_line(bev, ctx, line); +#endif } void @@ -683,6 +731,10 @@ setup_program() setgid(BBSGID); chdir(BBSHOME); +#ifdef BOARDD_MT + g_threadpool = threadpool_new(NUM_THREADS); +#endif + attach_SHM(); } diff --git a/pttbbs/daemon/boardd/queue.hpp b/pttbbs/daemon/boardd/queue.hpp new file mode 100644 index 00000000..281b444c --- /dev/null +++ b/pttbbs/daemon/boardd/queue.hpp @@ -0,0 +1,65 @@ +// Copyright (c) 2014, Robert Wang +// The MIT License. +#ifndef _QUEUE_H_ +#define _QUEUE_H_ +#include +#include +#include + +template +class Queue +{ +public: + Queue(size_t max = 0) + : eof_(false) + , max_(max) + {} + + void Push(T elem) + { + std::unique_lock lk(mu_); + while (true) { + if (eof_) + throw std::runtime_error("Queue closed"); + if (max_ && qu_.size() >= max_) { + cv_poped_.wait(lk); + continue; + } + qu_.push(elem); + cv_pushed_.notify_one(); + return; + } + } + + bool Pop(T &elem) + { + std::unique_lock lk(mu_); + while (qu_.empty()) { + if (eof_) + return false; + cv_pushed_.wait(lk); + } + elem = qu_.front(); + qu_.pop(); + cv_poped_.notify_one(); + return true; + } + + void Close() + { + std::unique_lock lk(mu_); + eof_ = true; + cv_pushed_.notify_all(); + cv_poped_.notify_all(); + } + +private: + std::queue qu_; + std::mutex mu_; + std::condition_variable cv_pushed_; + std::condition_variable cv_poped_; + bool eof_; + size_t max_; +}; + +#endif diff --git a/pttbbs/daemon/boardd/threadpool.cpp b/pttbbs/daemon/boardd/threadpool.cpp new file mode 100644 index 00000000..cc15505c --- /dev/null +++ b/pttbbs/daemon/boardd/threadpool.cpp @@ -0,0 +1,53 @@ +// Copyright (c) 2014, Robert Wang +// The MIT License. +#include "threadpool.hpp" +extern "C" { +#include "threadpool.h" +} + +namespace { + +class FuncJob : public Job +{ +public: + using Func = void (*)(void *); + FuncJob(Func f, void *ctx) : f(f), ctx(ctx) {} + void Run() { f(ctx); } +private: + Func f; + void *ctx; +}; + +} // namespace + +struct threadpool_ctx { + ThreadPool *pool; +}; + +threadpool_t threadpool_new(size_t num) +{ + threadpool_t p = new threadpool_ctx; + p->pool = new ThreadPool(num); + return p; +} + +void threadpool_free(threadpool_t p) +{ + delete p->pool; + delete p; +} + +void threadpool_do(threadpool_t p, threadpool_job_t job) +{ + p->pool->Do(reinterpret_cast(job)); +} + +threadpool_job_t threadpool_job_new(threadpool_job_func func, void *ctx) +{ + return reinterpret_cast(new FuncJob(func, ctx)); +} + +void threadpool_job_free(threadpool_job_t job) +{ + delete reinterpret_cast(job); +} diff --git a/pttbbs/daemon/boardd/threadpool.h b/pttbbs/daemon/boardd/threadpool.h new file mode 100644 index 00000000..9d589623 --- /dev/null +++ b/pttbbs/daemon/boardd/threadpool.h @@ -0,0 +1,18 @@ +// Copyright (c) 2014, Robert Wang +// The MIT License. +#ifndef _THREADPOOL_H_ +# define _THREADPOOL_H_ + +struct threadpool_ctx; +typedef struct threadpool_ctx *threadpool_t; +typedef void *threadpool_job_t; + +threadpool_t threadpool_new(size_t num); +void threadpool_free(threadpool_t p); +void threadpool_do(threadpool_t p, threadpool_job_t job); + +typedef void (*threadpool_job_func)(void *ctx); +threadpool_job_t threadpool_job_new(threadpool_job_func func, void *ctx); +void threadpool_job_free(threadpool_job_t job); + +#endif diff --git a/pttbbs/daemon/boardd/threadpool.hpp b/pttbbs/daemon/boardd/threadpool.hpp new file mode 100644 index 00000000..bcba42a1 --- /dev/null +++ b/pttbbs/daemon/boardd/threadpool.hpp @@ -0,0 +1,59 @@ +// Copyright (c) 2014, Robert Wang +// The MIT License. +#ifndef _THREADPOOL_HPP +# define _THREADPOOL_HPP +#include +#include "queue.hpp" + +namespace { + +template +void WorkerThread(Queue *q) +{ + while (true) { + Job *job; + if (!q->Pop(job)) + return; + job->Run(); + delete job; + } +} + +} // namespace + +class Job +{ +public: + virtual ~Job() {} + virtual void Run() = 0; +}; + +template +class ThreadPool +{ +public: + ThreadPool(size_t num) + : queue_(num * 4) + { + for (size_t i = 0; i < num; ++i) + threads_.push_back(std::move(std::thread(WorkerThread, &queue_))); + } + + ~ThreadPool() + { + queue_.Close(); + for (auto &t : threads_) + t.join(); + } + + void Do(Job *job) + { + queue_.Push(job); + } + +private: + Queue queue_; + std::vector threads_; +}; + +#endif -- cgit v1.2.3