aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/py/asyncmcd.py199
1 files changed, 191 insertions, 8 deletions
diff --git a/src/py/asyncmcd.py b/src/py/asyncmcd.py
index 0a4c0e6..244e716 100644
--- a/src/py/asyncmcd.py
+++ b/src/py/asyncmcd.py
@@ -8,6 +8,9 @@ import tornado.iostream
import imc.async
+import time
+from asyncdb import AsyncDB
+
class AsyncMCD:
def __init__(self):
def _conn():
@@ -27,7 +30,7 @@ class AsyncMCD:
self._recv_loop()
- def get(self,key):
+ def get(self,ori_key):
def _recv(opcode,status,opaque,cas,extra,key,value):
del self._opaque_map[opaque]
@@ -37,7 +40,7 @@ class AsyncMCD:
else:
flag, = struct.unpack('!I',extra)
if flag == self.TYPE_INT:
- ret, = struct.unpack('!Q',value)
+ ret = int(value)
elif flag == self.TYPE_BYTES:
ret = value
@@ -50,10 +53,10 @@ class AsyncMCD:
imc.async.ret(retid,ret)
- if not isinstance(key,str):
+ if not isinstance(ori_key,str):
raise TypeError
- key = bytes(key,'utf-8')
+ key = bytes(ori_key,'utf-8')
keylen = len(key)
opaque = self._get_opaque(_recv)
@@ -65,20 +68,75 @@ class AsyncMCD:
retid = imc.async.get_retid()
return imc.async.switch_top()
+ def mget(self,keys):
+ def _recv(opcode,status,opaque,cas,extra,key,value):
+ del self._opaque_map[opaque]
+
+ if status == 0:
+ flag, = struct.unpack('!I',extra)
+ if flag == self.TYPE_INT:
+ ret = int(value)
+
+ elif flag == self.TYPE_BYTES:
+ ret = value
+
+ elif flag == self.TYPE_STR:
+ ret = value.decode('utf-8')
+
+ elif flag == self.TYPE_JSON:
+ ret = json.loads(value.decode('utf-8'))
+
+ rets[key_opaqmap[opaque]] = value
+
+ for key in keys:
+ if not isinstance(key,str):
+ raise TypeError
+
+ rets = {}
+ key_opaqmap = {}
+ qkeys = keys[:-1]
+
+ for ori_key in qkeys:
+ key = bytes(ori_key,'utf-8')
+ keylen = len(key)
+
+ opaque = self._get_opaque(_recv)
+ key_opaqmap[opaque] = ori_key
+ header = self._request_header(0x09,keylen,0,0,keylen,opaque,0)
+ data = bytes(bytearray().join([header,key]))
+
+ self._stream.write(data)
+
+ ret = self.get(keys[-1])
+ if ret != None:
+ rets[keys[-1]] = ret
+
+ return rets
+
def set(self,key,value,expiration = 0):
+ return self._store(0x01,key,value,expiration)
+
+ def add(self,key,value,expiration = 0):
+ return self._store(0x02,key,value,expiration)
+
+ def replace(self,key,value,expiration = 0):
+ return self._store(0x03,key,value,expiration)
+
+ def _store(self,opcode,ori_key,value,expiration):
def _recv(opcode,status,opaque,cas,extra,key,value):
del self._opaque_map[opaque]
+
imc.async.ret(retid,status)
- if not isinstance(key,str):
+ if not isinstance(ori_key,str):
raise TypeError
- key = bytes(key,'utf-8')
+ key = bytes(ori_key,'utf-8')
keylen = len(key)
if isinstance(value,int):
value_type = self.TYPE_INT
- value = struct.pack('!Q',value)
+ value = bytes(str(value),'ascii')
elif isinstance(value,bytes):
value_type = self.TYPE_BYTES
@@ -97,7 +155,7 @@ class AsyncMCD:
extralen = len(extra)
opaque = self._get_opaque(_recv)
- header = self._request_header(0x01,keylen,extralen,0,extralen + keylen + valuelen,opaque,0)
+ header = self._request_header(opcode,keylen,extralen,0,extralen + keylen + valuelen,opaque,0)
data = bytes(bytearray().join([header,extra,key,value]))
self._stream.write(data)
@@ -105,6 +163,74 @@ class AsyncMCD:
retid = imc.async.get_retid()
return imc.async.switch_top()
+ def delete(self,ori_key):
+ def _recv(opcode,status,opaque,cas,extra,key,value):
+ del self._opaque_map[opaque]
+
+ imc.async.ret(retid,status)
+
+ if not isinstance(ori_key,str):
+ raise TypeError
+
+ key = bytes(ori_key,'utf-8')
+ keylen = len(key)
+
+ opaque = self._get_opaque(_recv)
+ header = self._request_header(0x04,keylen,0,0,keylen,opaque,0)
+ data = bytes(bytearray().join([header,key]))
+
+ self._stream.write(data)
+
+ retid = imc.async.get_retid()
+ return imc.async.switch_top()
+
+ def inc(self,key,value,initial = None,expiration = 0):
+ return self._count(0x05,key,value,initial,expiration)
+
+ def dec(self,key,value,initial = None,expiration = 0):
+ return self._count(0x06,key,value,initial,expiration)
+
+ def _count(self,opcode,ori_key,value,initial,expiration):
+ def _recv(opcode,status,opaque,cas,extra,key,value):
+ del self._opaque_map[opaque]
+
+ if status == 1:
+ raise KeyError(ori_key)
+
+ imc.async.ret(retid,struct.unpack('!Q',value))
+
+ if not isinstance(ori_key,str):
+ raise TypeError
+
+ if not isinstance(value,int):
+ raise TypeError
+
+ if initial != None and not isinstance(initial,int):
+ raise TypeError
+
+ key = bytes(ori_key,'utf-8')
+ keylen = len(key)
+
+ if initial == None:
+ extra = struct.pack('!qqI',value,0,0xFFFFFFFF)
+
+ else:
+ extra = struct.pack('!qqI',value,initial,expiration)
+
+ extralen = len(extra)
+
+ opaque = self._get_opaque(_recv)
+ header = self._request_header(opcode,keylen,extralen,0,extralen + keylen,opaque,0)
+ data = bytes(bytearray().join([header,extra,key]))
+
+ self._stream.write(data)
+
+ retid = imc.async.get_retid()
+ return imc.async.switch_top()
+
+ def _dec(self,key,value):
+ pass
+
def _get_opaque(self,data):
self._opaque_count += 1
self._opaque_map[self._opaque_count] = data
@@ -141,3 +267,60 @@ class AsyncMCD:
self._stream.read_bytes(totallen,___recvdata)
self._stream.read_bytes(24,__recv)
+
+
+@imc.async.caller
+def testmcd():
+ data = 23
+
+ st = time.perf_counter();
+
+ for i in range(0,256):
+ mc.set('bob_' + str(i),data)
+
+ et = time.perf_counter();
+ print(et - st)
+
+ st = time.perf_counter();
+
+ #for i in range(0,256):
+ # ret = mc.get('bob_' + str(i))
+
+ print(mc.dec('bob_0',10))
+
+ keys = []
+ for i in range(0,256):
+ keys.append('bob_' + str(i))
+
+ ret = mc.mget(keys)
+ print(len(ret))
+
+ et = time.perf_counter();
+ print(et - st)
+
+@imc.async.caller
+def testdb():
+ data = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+ cur = db.cursor()
+
+ st = time.perf_counter();
+
+ for i in range(0,1024):
+ cur.execute('INSERT INTO "mcdtest" VALUES(%s,%s)',('bob2_' + str(i),data))
+
+ et = time.perf_counter();
+ print(et - st)
+
+ st = time.perf_counter();
+
+ for i in range(0,1024):
+ cur.execute('SELECT "value" FROM "mcdtest" WHERE "key"=%s',('bob2_' + str(i),))
+
+ et = time.perf_counter();
+ print(et - st)
+
+mc = AsyncMCD()
+db = AsyncDB('testdb','pzread','pz3655742')
+testmcd()
+
+tornado.ioloop.IOLoop.instance().start()