diff options
-rw-r--r-- | src/py/asyncmcd.py | 199 |
1 files changed, 191 insertions, 8 deletions
diff --git a/src/py/asyncmcd.py b/src/py/asyncmcd.py index 0a4c0e6..244e716 100644 --- a/src/py/asyncmcd.py +++ b/src/py/asyncmcd.py @@ -8,6 +8,9 @@ import tornado.iostream import imc.async +import time +from asyncdb import AsyncDB + class AsyncMCD: def __init__(self): def _conn(): @@ -27,7 +30,7 @@ class AsyncMCD: self._recv_loop() - def get(self,key): + def get(self,ori_key): def _recv(opcode,status,opaque,cas,extra,key,value): del self._opaque_map[opaque] @@ -37,7 +40,7 @@ class AsyncMCD: else: flag, = struct.unpack('!I',extra) if flag == self.TYPE_INT: - ret, = struct.unpack('!Q',value) + ret = int(value) elif flag == self.TYPE_BYTES: ret = value @@ -50,10 +53,10 @@ class AsyncMCD: imc.async.ret(retid,ret) - if not isinstance(key,str): + if not isinstance(ori_key,str): raise TypeError - key = bytes(key,'utf-8') + key = bytes(ori_key,'utf-8') keylen = len(key) opaque = self._get_opaque(_recv) @@ -65,20 +68,75 @@ class AsyncMCD: retid = imc.async.get_retid() return imc.async.switch_top() + def mget(self,keys): + def _recv(opcode,status,opaque,cas,extra,key,value): + del self._opaque_map[opaque] + + if status == 0: + flag, = struct.unpack('!I',extra) + if flag == self.TYPE_INT: + ret = int(value) + + elif flag == self.TYPE_BYTES: + ret = value + + elif flag == self.TYPE_STR: + ret = value.decode('utf-8') + + elif flag == self.TYPE_JSON: + ret = json.loads(value.decode('utf-8')) + + rets[key_opaqmap[opaque]] = value + + for key in keys: + if not isinstance(key,str): + raise TypeError + + rets = {} + key_opaqmap = {} + qkeys = keys[:-1] + + for ori_key in qkeys: + key = bytes(ori_key,'utf-8') + keylen = len(key) + + opaque = self._get_opaque(_recv) + key_opaqmap[opaque] = ori_key + header = self._request_header(0x09,keylen,0,0,keylen,opaque,0) + data = bytes(bytearray().join([header,key])) + + self._stream.write(data) + + ret = self.get(keys[-1]) + if ret != None: + rets[keys[-1]] = ret + + return rets + def set(self,key,value,expiration = 0): + return self._store(0x01,key,value,expiration) + + def add(self,key,value,expiration = 0): + return self._store(0x02,key,value,expiration) + + def replace(self,key,value,expiration = 0): + return self._store(0x03,key,value,expiration) + + def _store(self,opcode,ori_key,value,expiration): def _recv(opcode,status,opaque,cas,extra,key,value): del self._opaque_map[opaque] + imc.async.ret(retid,status) - if not isinstance(key,str): + if not isinstance(ori_key,str): raise TypeError - key = bytes(key,'utf-8') + key = bytes(ori_key,'utf-8') keylen = len(key) if isinstance(value,int): value_type = self.TYPE_INT - value = struct.pack('!Q',value) + value = bytes(str(value),'ascii') elif isinstance(value,bytes): value_type = self.TYPE_BYTES @@ -97,7 +155,7 @@ class AsyncMCD: extralen = len(extra) opaque = self._get_opaque(_recv) - header = self._request_header(0x01,keylen,extralen,0,extralen + keylen + valuelen,opaque,0) + header = self._request_header(opcode,keylen,extralen,0,extralen + keylen + valuelen,opaque,0) data = bytes(bytearray().join([header,extra,key,value])) self._stream.write(data) @@ -105,6 +163,74 @@ class AsyncMCD: retid = imc.async.get_retid() return imc.async.switch_top() + def delete(self,ori_key): + def _recv(opcode,status,opaque,cas,extra,key,value): + del self._opaque_map[opaque] + + imc.async.ret(retid,status) + + if not isinstance(ori_key,str): + raise TypeError + + key = bytes(ori_key,'utf-8') + keylen = len(key) + + opaque = self._get_opaque(_recv) + header = self._request_header(0x04,keylen,0,0,keylen,opaque,0) + data = bytes(bytearray().join([header,key])) + + self._stream.write(data) + + retid = imc.async.get_retid() + return imc.async.switch_top() + + def inc(self,key,value,initial = None,expiration = 0): + return self._count(0x05,key,value,initial,expiration) + + def dec(self,key,value,initial = None,expiration = 0): + return self._count(0x06,key,value,initial,expiration) + + def _count(self,opcode,ori_key,value,initial,expiration): + def _recv(opcode,status,opaque,cas,extra,key,value): + del self._opaque_map[opaque] + + if status == 1: + raise KeyError(ori_key) + + imc.async.ret(retid,struct.unpack('!Q',value)) + + if not isinstance(ori_key,str): + raise TypeError + + if not isinstance(value,int): + raise TypeError + + if initial != None and not isinstance(initial,int): + raise TypeError + + key = bytes(ori_key,'utf-8') + keylen = len(key) + + if initial == None: + extra = struct.pack('!qqI',value,0,0xFFFFFFFF) + + else: + extra = struct.pack('!qqI',value,initial,expiration) + + extralen = len(extra) + + opaque = self._get_opaque(_recv) + header = self._request_header(opcode,keylen,extralen,0,extralen + keylen,opaque,0) + data = bytes(bytearray().join([header,extra,key])) + + self._stream.write(data) + + retid = imc.async.get_retid() + return imc.async.switch_top() + + def _dec(self,key,value): + pass + def _get_opaque(self,data): self._opaque_count += 1 self._opaque_map[self._opaque_count] = data @@ -141,3 +267,60 @@ class AsyncMCD: self._stream.read_bytes(totallen,___recvdata) self._stream.read_bytes(24,__recv) + + +@imc.async.caller +def testmcd(): + data = 23 + + st = time.perf_counter(); + + for i in range(0,256): + mc.set('bob_' + str(i),data) + + et = time.perf_counter(); + print(et - st) + + st = time.perf_counter(); + + #for i in range(0,256): + # ret = mc.get('bob_' + str(i)) + + print(mc.dec('bob_0',10)) + + keys = [] + for i in range(0,256): + keys.append('bob_' + str(i)) + + ret = mc.mget(keys) + print(len(ret)) + + et = time.perf_counter(); + print(et - st) + +@imc.async.caller +def testdb(): + data = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' + cur = db.cursor() + + st = time.perf_counter(); + + for i in range(0,1024): + cur.execute('INSERT INTO "mcdtest" VALUES(%s,%s)',('bob2_' + str(i),data)) + + et = time.perf_counter(); + print(et - st) + + st = time.perf_counter(); + + for i in range(0,1024): + cur.execute('SELECT "value" FROM "mcdtest" WHERE "key"=%s',('bob2_' + str(i),)) + + et = time.perf_counter(); + print(et - st) + +mc = AsyncMCD() +db = AsyncDB('testdb','pzread','pz3655742') +testmcd() + +tornado.ioloop.IOLoop.instance().start() |