diff options
-rwxr-xr-x | pttbbs/daemon/brcstored/brcstored.py | 176 |
1 files changed, 146 insertions, 30 deletions
diff --git a/pttbbs/daemon/brcstored/brcstored.py b/pttbbs/daemon/brcstored/brcstored.py index 4ab7e43c..fc0bda41 100755 --- a/pttbbs/daemon/brcstored/brcstored.py +++ b/pttbbs/daemon/brcstored/brcstored.py @@ -8,13 +8,35 @@ brcstored: Board RC (brc) storage daemon. Provides a way to manage per-user BRC object into isolated database. """ +# Configuration of backends. + +# (Network) Use gevent of eventlet. +USE_GEVENT = True + +# (Database) Use KyotoCabinet or LevelDB. +USE_KYOTO = False + +#----------------------------------------------------------------------- + +if USE_GEVENT: + from gevent import monkey; monkey.patch_all() + import gevent + import gevent.server +else: + import eventlet + +if USE_KYOTO: + import kyotocabinet +else: + import leveldb + import StringIO import logging +import os import re -import sys -import leveldb -import eventlet import struct +import sys +import time _SERVER_ADDR = '127.0.0.1' @@ -22,36 +44,112 @@ _SERVER_PORT = 5133 _DB_PATH = '/home/bbs/brcstore/database' -def get_data(uid): - try: - return g_db.Get(uid) - except KeyError: - return None +# Performance counters + +class OperationPerf(object): + + def __init__(self, _type, _timeout=0.5): + self._type = _type + self._timeout = _timeout + + def __enter__(self): + self._clock = time.time() + + def __exit__(self, *args, **kargs): + delta = time.time() - self._clock + if delta > self._timeout: + sys.stderr.write("[%s %.3f]" % (self._type, delta)) + +class Perf(object): -def put_data(uid, blob): - g_db.Put(uid, blob) + def __init__(self): + self.read = 0 + self.write = 0 + self.req = 0 + self.clock = 0 + + def add_req(self): + if self.clock == 0: + self.clock = time.time() + self.req += 1 + + def report(self): + if self.req % 100 == 0: + sys.stderr.write( + '+' if self.read > self.write else + '-' if self.read < self.write else + '=') + self.read = self.write = 0 + if self.req % 7000 == 0: + sys.stderr.write("[%.2f req/s]%s\n" % + (self.req / (time.time() - self.clock), + time.strftime('%H:%M'))) + self.clock = time.time() + self.req = 0 + + def add_read(self): + self.read += 1 + + def add_write(self): + self.write += 1 + + +g_perf = Perf() def open_database(db_path): global g_db - # BRCv3 max size = 49152 (8192*3*2), so let's increase block size. - # LevelDB default I/O buffer size: R=8M, W=2M. - g_db = leveldb.LevelDB(db_path, block_size=49152, - block_cache_size=(16 * (2 << 20)), - write_buffer_size=(16 * (2 << 20))) + if USE_KYOTO: + g_db = kyotocabinet.DB() + if not g_db.open(db_path, + kyotocabinet.DB.OWRITER | kyotocabinet.DB.OCREATE): + sys.stderr.write("open error: " + str(g_db.error())) + sys.exit(1) + else: + # BRCv3 max size = 49152 (8192*3*2), so let's increase block size. + # LevelDB default I/O buffer size: R=8M, W=2M. + db = leveldb.LevelDB(db_path, + #block_size=49152, + # block_cache_size=(16 * (2 << 20)), + write_buffer_size=(32 * (2 << 20)), + ) + + class LevelDBWrapper(object): -def handle_request(sock, fd): + def __init__(self, db): + self.db = db + + def get(self, key): + try: + return self.db.Get(key) + except KeyError: + return None + + def set(self, key, value): + self.db.Put(key, value) + + g_db = LevelDBWrapper(db) + + +def handle_request(socket, _): # WRITE: 'w' + UID + '\n' + int32_t len, BYTE[len] + # Returns: NUL # READ: 'r' + UID + '\n' # Returns: int32_t len, BYTE[len] (len=-1 if UID does not exist) + fd = socket.makefile('rw') fmt_len = '@i' + g_perf.add_req() + g_perf.report() try: command = fd.read(1) - uid = fd.readline().strip() + uid = fd.readline() + uid = uid.strip() if command == 'r': - msg = get_data(uid) + g_perf.add_read() + with OperationPerf('R'): + msg = g_db.get(uid) if msg is None: fd.write(struct.pack(fmt_len, -1)) logging.info('Read : %s: (NOT FOUND)', uid) @@ -60,11 +158,18 @@ def handle_request(sock, fd): fd.write(msg) logging.info('Read : %s: size=%d', uid, len(msg)) elif command == 'w': + g_perf.add_write() msglen = struct.unpack(fmt_len, fd.read(struct.calcsize(fmt_len)))[0] msg = fd.read(msglen) + if len(msg) != msglen: + logging.warn('Write: incomplete: %d/%d, %s', + len(msg), msglen, uid) logging.info('Write: %s: size=%d', uid, len(msg)) - put_data(uid, msg) + with OperationPerf('W'): + g_db.set(uid, msg) + # Sync byte, anything should work. + fd.write(chr(0)) elif not command: raise ValueError('Incomplete request (no command).') else: @@ -74,7 +179,7 @@ def handle_request(sock, fd): finally: try: fd.close() - sock.close() + socket.close() except: pass @@ -88,17 +193,28 @@ def main(myname, argv): print "Usage: %s [db_path]" % myname exit(1) db_path = argv[0] if len(argv) > 0 else _DB_PATH - logging.warn("Serving at %s:%s [db:%s]...", _SERVER_ADDR, _SERVER_PORT, - db_path) + if USE_KYOTO: + db_path += '.kch' + logging.warn("Serving via %s at %s:%s [db:%s:%s][pid:%d]...", + "gevent" if USE_GEVENT else "eventlet", + _SERVER_ADDR, _SERVER_PORT, + 'KyotoCabinet' if USE_KYOTO else 'LevelDB', + db_path, os.getpid()) open_database(db_path) - server = eventlet.listen((_SERVER_ADDR, _SERVER_PORT)) - pool = eventlet.GreenPool() - while True: - try: - new_sock, address = server.accept() - pool.spawn_n(handle_request, new_sock, new_sock.makefile('rw')) - except (SystemExit, KeyboardInterrupt): - break + + if USE_GEVENT: + server = gevent.server.StreamServer( + (_SERVER_ADDR, _SERVER_PORT), handle_request) + server.serve_forever() + else: + server = eventlet.listen((_SERVER_ADDR, _SERVER_PORT)) + pool = eventlet.GreenPool() + while True: + try: + new_sock, address = server.accept() + pool.spawn_n(handle_request, new_sock, address) + except (SystemExit, KeyboardInterrupt): + break if __name__ == '__main__': |