From c00472c8bfbcc86780ad25f7be687815432c4a37 Mon Sep 17 00:00:00 2001 From: pzread Date: Sun, 2 Jun 2013 02:19:56 +0800 Subject: Fix async bug. Improve fault tolerance --- src/py/asyncdb.py | 26 ++++---- src/py/backend_server.py | 50 ++++++++------ src/py/center_server.py | 14 ++-- src/py/imc/async.py | 65 ++++++++++++------- src/py/imc/auth.py | 2 +- src/py/imc/proxy.py | 165 +++++++++++++++++------------------------------ src/py/tojauth.py | 4 +- 7 files changed, 154 insertions(+), 172 deletions(-) (limited to 'src') diff --git a/src/py/asyncdb.py b/src/py/asyncdb.py index 651dd73..1e2bf7a 100644 --- a/src/py/asyncdb.py +++ b/src/py/asyncdb.py @@ -19,7 +19,7 @@ class RestrictCursor: def __iter__(self): return self._cur - def execute(self,sql,param = None,_grid = None): + def execute(self,sql,param = None): self._db.execute(self._cur,sql,param) self.arraysize = self._cur.arraysize @@ -201,14 +201,13 @@ class AsyncDB: def cursor(self): return RestrictCursor(self,self._cursor()) - @imc.async.callee - def execute(self,cur,sql,param = None,_grid = None): + def execute(self,cur,sql,param = None): fd = cur.connection.fileno() - self._pendoper_fdmap[fd].append((self.OPER_EXECUTE,(cur,sql,param),_grid)) + self._pendoper_fdmap[fd].append((self.OPER_EXECUTE,(cur,sql,param),imc.async.get_retid())) self._ioloop.add_callback(self._oper_dispatch,fd,0) - imc.async.switchtop() + imc.async.switch_top() def begin_transaction(self): if len(self._free_connpool) > 0: @@ -229,18 +228,17 @@ class AsyncDB: else: self._close_conn(conn) - @imc.async.callee - def _cursor(self,conn = None,_grid = None): + def _cursor(self,conn = None): if conn != None: fd = conn.fileno() else: fd = self._share_connpool[random.randrange(len(self._share_connpool))].fileno() - self._pendoper_fdmap[fd].append((self.OPER_CURSOR,None,_grid)) + self._pendoper_fdmap[fd].append((self.OPER_CURSOR,None,imc.async.get_retid())) self._ioloop.add_callback(self._oper_dispatch,fd,0) - cur = imc.async.switchtop() + cur = imc.async.switch_top() return cur def _create_conn(self): @@ -302,7 +300,7 @@ class AsyncDB: else: try: - oper,data,grid = self._pendoper_fdmap[fd].popleft() + oper,data,retid = self._pendoper_fdmap[fd].popleft() except IndexError: return @@ -310,20 +308,20 @@ class AsyncDB: if oper == self.OPER_CURSOR: def _ret_cursor(err = None): if err == None: - imc.async.retcall(grid,conn.cursor()) + imc.async.ret(retid,conn.cursor()) else: - imc.async.retcall(grid,err = err) + imc.async.ret(retid,err = err) self._opercallback_fdmap[fd] = _ret_cursor elif oper == self.OPER_EXECUTE: def _ret_execute(err = None): if err == None: - imc.async.retcall(grid,None) + imc.async.ret(retid) else: - imc.async.retcall(grid,err = err) + imc.async.ret(retid,err = err) cur,sql,param = data diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 234f8b2..edfee8d 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -103,8 +103,8 @@ class BackendWorker(tornado.tcpserver.TCPServer): #imc_register_call('','test_dsta',self._test_dsta) time.sleep(1) - if int(self._linkid) == 2: - self._test_call(None,'9') + #if int(self._linkid) == 2: + self._test_call(None,'9') sock_ip,sock_port = self.sock_addr netio.send_pack(stream,bytes(json.dumps({ @@ -122,9 +122,9 @@ class BackendWorker(tornado.tcpserver.TCPServer): def _conn_linkid(self,linkid): def __handle_pend(conn): - pends = self._pend_mainconn_linkidmap.pop(worker_linkid) - for gr in pends: - gr.switch(conn) + retids = self._pend_mainconn_linkidmap.pop(worker_linkid) + for retid in retids: + imc.async.ret(retid,conn) def __conn_cb(): conn = Proxy.instance.get_conn(worker_linkid) @@ -169,11 +169,11 @@ class BackendWorker(tornado.tcpserver.TCPServer): return conn elif worker_linkid in self._pend_mainconn_linkidmap: - self._pend_mainconn_linkidmap[worker_linkid].append(imc.async.current()) - return imc.async.switchtop() + self._pend_mainconn_linkidmap[worker_linkid].append(imc.async.get_retid()) + return imc.async.switch_top() else: - self._pend_mainconn_linkidmap[worker_linkid] = [imc.async.current()] + self._pend_mainconn_linkidmap[worker_linkid] = [imc.async.get_retid()] sock_addr = (ret['sock_ip'],ret['sock_port']) @@ -181,7 +181,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): main_stream.set_close_callback(lambda conn : __handle_pend(None)) main_stream.connect(__conn_cb) - return imc.async.switchtop() + return imc.async.switch_top() def _add_pend_filestream(self,filekey,callback): self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback) @@ -208,9 +208,9 @@ class BackendWorker(tornado.tcpserver.TCPServer): if self._linkid > linkid: __send_back(True) - pends = self._pend_mainconn_linkidmap.pop(linkid) - for callback in pends: - callback(conn) + retids = self._pend_mainconn_linkidmap.pop(linkid) + for retid in retids: + imc.async.ret(retid,conn) else: __send_back(False) @@ -225,20 +225,32 @@ class BackendWorker(tornado.tcpserver.TCPServer): @imc.async.caller def _test_call(self,iden,param): - param = '3' - fileresult = Proxy.instance.sendfile('/backend/' + param + '/','archlinux-2013.05.01-dual.iso') + param = '6' - dst = '/backend/' + param + '/' - ret = imc_call(self._idendesc,dst,'test_dst',fileresult.filekey) - print(fileresult.wait()) + pend = [] + for i in range(0,8): + if str((i % 8) + 2) == self._linkid: + continue + + fileres = Proxy.instance.sendfile('/backend/' + str((i % 8) + 2) + '/','archlinux-2013.05.01-dual.iso') + + dst = '/backend/' + str((i % 8) + 2) + '/' + ret = imc_call(self._idendesc,dst,'test_dst',fileres.filekey) + + pend.append(fileres) + + for p in pend: + print(p.wait()) + + print(self._linkid) @imc.async.caller def _test_dst(self,iden,param): #stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param) #return ret + ' Too' - fileresult = Proxy.instance.recvfile(param,'data') - print(fileresult.wait()) + fileres = Proxy.instance.recvfile(param,'data') + #print(fileres.wait()) return 'ok' diff --git a/src/py/center_server.py b/src/py/center_server.py index 2ad6dfd..0b32610 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -139,16 +139,16 @@ class CenterServer(tornado.tcpserver.TCPServer): linkid = param try: - #worker = self._worker_linkidmap[linkid] + worker = self._worker_linkidmap[linkid] - a = int(iden['linkid']) - b = int(linkid) + #a = int(iden['linkid']) + #b = int(linkid) - if b > a: - worker = self._worker_linkidmap[str(a + 1)] + #if b > a: + # worker = self._worker_linkidmap[str(a + 1)] - else: - worker = self._worker_linkidmap[str(a - 1)] + #else: + # worker = self._worker_linkidmap[str(a - 1)] if iden['linkclass'] != 'client': sock_ip,sock_port = worker.sock_addr diff --git a/src/py/imc/async.py b/src/py/imc/async.py index cac8d23..8934ee0 100644 --- a/src/py/imc/async.py +++ b/src/py/imc/async.py @@ -1,18 +1,21 @@ import traceback +import uuid +import ssl +from Crypto.Hash import SHA512 from greenlet import greenlet from imc import auth -gr_waitmap = {} +gr_idmap = {} +ret_idmap = {} gr_main = greenlet.getcurrent() -def current(): - return greenlet.getcurrent() - -def switchtop(): +def switch_top(): global gr_main + assert greenlet.getcurrent() != gr_main + old_iden = auth.current_iden auth.current_iden = None @@ -22,29 +25,27 @@ def switchtop(): return result -def callee(f): - def wrapper(*args,**kwargs): - kwargs['_grid'] = str(id(greenlet.getcurrent())) - return f(*args,**kwargs) - - return wrapper - def caller(f): def wrapper(*args,**kwargs): global gr_main - global gr_waitmap + global gr_idmap + global ret_idmap def _call(*args,**kwargs): ret = f(*args,**kwargs) - del gr_waitmap[grid] + retids = gr_idmap[grid] + for retid in retids: + del ret_idmap[retid] + + del gr_idmap[grid] return (True,ret) try: gr = greenlet(_call) - grid = str(id(gr)) + grid = id(gr) + gr_idmap[grid] = set() old_iden = auth.current_iden - gr_waitmap[grid] = (gr,old_iden) result = gr.switch(*args,**kwargs) auth.current_iden = old_iden @@ -64,14 +65,35 @@ def caller(f): return wrapper -def retcall(grid,value = None,err = None): - global gr_waitmap +def get_retid(): + global gr_idmap + global ret_idmap + + gr = greenlet.getcurrent() + grid = id(gr) + retid = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest() + + gr_idmap[grid].add(retid) + ret_idmap[retid] = gr + + return retid + +def ret(retid,value = None,err = None): + global gr_main + global gr_idmap + global ret_idmap + + assert greenlet.getcurrent() == gr_main try: - gr,iden = gr_waitmap[grid] + gr = ret_idmap.pop(retid) + gr_idmap[id(gr)].remove(retid) + + except KeyError: + return + try: old_iden = auth.current_iden - auth.current_iden = iden if err == None: gr.switch(value) @@ -81,7 +103,6 @@ def retcall(grid,value = None,err = None): auth.current_iden = old_iden - except Exception as err: + except TypeError as err: traceback.print_stack() print(err) - pass diff --git a/src/py/imc/auth.py b/src/py/imc/auth.py index 16c12d9..08e1417 100644 --- a/src/py/imc/auth.py +++ b/src/py/imc/auth.py @@ -3,7 +3,7 @@ import json import binascii from Crypto.PublicKey import RSA -from Crypto.Hash import SHA,SHA512 +from Crypto.Hash import SHA512 from Crypto.Signature import PKCS1_v1_5 current_iden = None diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index db23c8a..2a25323 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -2,7 +2,9 @@ import json import uuid import os import datetime +import ssl +from Crypto.Hash import SHA512 import tornado.ioloop import tornado.stack_context @@ -38,7 +40,7 @@ class Connection: class FileResult(): def __init__(self,filekey): self.filekey = filekey - self._gr = None + self._retid = None self._result = None def ret_result(self,res): @@ -46,13 +48,13 @@ class FileResult(): return self._result = res - if self._gr != None: - self._gr.switch() + if self._retid != None: + async.ret(self._retid) def wait(self): if self._result == None: - self._gr = async.current() - async.switchtop() + self._retid = async.get_retid() + async.switch_top() return self._result @@ -81,9 +83,6 @@ class Proxy: self._info_filekeymap = {} - self._check_waitcaller_timer = tornado.ioloop.PeriodicCallback(self._check_waitcaller,1000) - self._check_waitcaller_timer.start() - Proxy.instance = self self.register_call('imc/','pend_recvfile',self._pend_recvfile) @@ -112,30 +111,16 @@ class Proxy: del conn.link_linkidmap[linkid] def del_conn(self,conn): - wait_map = self._conn_retidmap[conn.linkid] - waits = wait_map.items() - wait_ret = [] - for retid,wait in waits: - wait_ret.append((wait['caller_linkid'],retid)) - - for linkid,retid in wait_ret: - self._ret_call(linkid,retid,(False,'Eclose')) - - fail_map = self._conn_filekeymap[conn.linkid] - fails = fail_map.values() - fail_cb = [] - for callback in fails: - fail_cb.append(callback) - - for callback in fail_cb: + callbacks = list(self._conn_retidmap[conn.linkid].values()) + for callback in callbacks: + callback((False,'Eclose')) + + callbacks = list(self._conn_filekeymap[conn.linkid].values()) + for callback in callbacks: callback('Eclose') - linkids = conn.link_linkidmap.keys() - link_del = [] + linkids = list(conn.link_linkidmap.keys()) for linkid in linkids: - link_del.append(linkid) - - for linkid in link_del: self.unlink_conn(linkid) del self._conn_linkidmap[conn.linkid] @@ -152,20 +137,11 @@ class Proxy: def register_call(self,path,func_name,func): self._call_pathmap[''.join([path,func_name])] = func - def call(self,caller_grid,timeout,idendesc,dst,func_name,param): - caller_retid = ''.join([self._linkid,'/',caller_grid]) - return self._route_call(None,caller_retid,timeout,idendesc,dst,func_name,param) + def call(self,timeout,idendesc,dst,func_name,param): + return self._route_call(None,async.get_retid(),timeout,idendesc,dst,func_name,param) def sendfile(self,dst_link,filepath): - @async.callee - def _call(_grid): - self.call(_grid, - 10000, - self._idendesc, - dst_link + 'imc/','pend_recvfile', - {'filekey':filekey,'filesize':filesize}) - - filekey = str(uuid.uuid1()) + filekey = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest() filesize = os.stat(filepath).st_size fileresult = FileResult(filekey) @@ -177,7 +153,7 @@ class Proxy: 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) } - _call() + self.call(10000,self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize}) return fileresult @@ -193,7 +169,7 @@ class Proxy: in_conn.abort_file(filekey) self._send_msg_abortfile(in_conn,filekey,err) - self._ret_sendfile(filekey,err) + self._ioloop.add_callback(self._ret_sendfile,filekey,err) try: info = self._info_filekeymap[filekey] @@ -213,14 +189,6 @@ class Proxy: return info['fileresult'] def rejectfile(self,filekey): - @async.callee - def _call(_grid): - self.call(_grid, - 10000, - self._idendesc, - dst_link + 'imc/','reject_sendfile', - {'filekey':filekey}) - try: info = self._info_filekeymap.pop(filekey) @@ -228,14 +196,18 @@ class Proxy: return dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/']) - _call() + self.call(10000,self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey}) def _route_call(self,in_conn,caller_retid,timeout,idendesc,dst,func_name,param): def __add_wait_caller(in_linkid): - self._conn_retidmap[in_linkid][caller_retid] = { - 'caller_linkid':caller_linkid, - 'timeout':timeout - } + def ___call(result): + self._ioloop.remove_timeout(timer) + self._ret_call(caller_linkid,caller_retid,result) + + callback = tornado.stack_context.wrap(___call) + timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback(('False','Etimeout'))) + + self._conn_retidmap[in_linkid][caller_retid] = callback def __ret(result): if caller_linkid == self._linkid: @@ -263,7 +235,6 @@ class Proxy: dst_path = dst_part[3] caller_linkid = iden['linkid'] - assert caller_retid.split('/',1)[0] == caller_linkid if dst_linkid == self._linkid: __add_wait_caller(self._linkid) @@ -290,7 +261,8 @@ class Proxy: __add_wait_caller(conn.linkid) self._send_msg_call(conn,caller_retid,timeout,idendesc,dst,func_name,param) - result = async.switchtop() + result = async.switch_top() + del self._conn_retidmap[conn.linkid][caller_retid] return __ret(result) @@ -308,8 +280,7 @@ class Proxy: self._send_msg_ret(conn,caller_linkid,caller_retid,result) if caller_linkid == self._linkid: - grid = caller_retid.split('/',1)[1] - async.retcall(grid,result) + async.ret(caller_retid,result) else: __ret_remote() @@ -326,7 +297,7 @@ class Proxy: out_conn.abort_file(filekey) self._send_msg_abortfile(out_conn,filekey,err) - self._ret_sendfile(filekey,err) + self._ioloop.add_callback(self._ret_sendfile,filekey,err) def __bridge_fail_cb(err): try: @@ -352,22 +323,17 @@ class Proxy: if src_linkid == self._linkid: try: info = self._info_filekeymap[filekey] - assert info['filesize'] == filesize + if info['filesize'] != filesize: + self._ioloop.add_callback(self._ret_sendfile,filekey,'Efilesize') except KeyError: - self._ret_sendfile(filekey,'Enoexist') - return - - except AssertionError: - self._ret_sendfile(filekey,'Efilesize') + self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist') return self._add_wait_filekey(filekey,filesize,None,out_conn,__send_fail_cb) out_conn.send_file(filekey,info['filepath'],lambda : self._ret_sendfile(filekey)) else: - print('test start') - in_conn = self._request_conn(src_linkid) self._add_wait_filekey(filekey,filesize,in_conn,out_conn,__bridge_fail_cb) @@ -375,6 +341,20 @@ class Proxy: in_conn.recv_filedata(filekey,filesize,send_fn) self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) + + def _add_wait_filekey(self,filekey,filesize,in_conn,out_conn,fail_callback): + def __call(err): + self._ioloop.remove_timeout(timer) + fail_callback(err) + + callback = tornado.stack_context.wrap(__call) + timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout')) + + if in_conn != None: + self._conn_filekeymap[in_conn.linkid][filekey] = callback + + if out_conn != None: + self._conn_filekeymap[out_conn.linkid][filekey] = callback def _ret_sendfile(self,filekey,err = None): try: @@ -387,10 +367,10 @@ class Proxy: fileresult = info['fileresult'] if err == None: - self._ioloop.add_callback(lambda : fileresult.ret_result('Success')) + fileresult.ret_result('Success') else: - self._ioloop.add_callback(lambda : fileresult.ret_result(err)) + fileresult.ret_result(err) def _request_conn(self,linkid): try: @@ -404,6 +384,10 @@ class Proxy: return conn + def _conn_close_cb(self,conn): + self.del_conn(conn) + print('connection close') + def _recv_dispatch(self,conn,data): msg = json.loads(data.decode('utf-8')) msg_type = msg['type'] @@ -420,38 +404,6 @@ class Proxy: elif msg_type == self.MSGTYPE_ABORTFILE: self._recv_msg_abortfile(conn,msg) - def _conn_close_cb(self,conn): - self.del_conn(conn) - print('connection close') - - def _add_wait_filekey(self,filekey,filesize,in_conn,out_conn,fail_callback): - def __call(err): - self._ioloop.remove_timeout(timer) - fail_callback(err) - - callback = tornado.stack_context.wrap(__call) - timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout')) - - if in_conn != None: - self._conn_filekeymap[in_conn.linkid][filekey] = callback - - if out_conn != None: - self._conn_filekeymap[out_conn.linkid][filekey] = callback - - def _check_waitcaller(self): - wait_maps = self._conn_retidmap.values() - for wait_map in wait_maps: - waits = wait_map.items() - wait_ret = [] - for retid,wait in waits: - wait['timeout'] -= 1000 - - if wait['timeout'] <= 0: - wait_ret.append((wait['caller_linkid'],retid)) - - for linkid,retid in wait_ret: - self._ret_call(linkid,retid,(False,'Etimeout')) - def _send_msg_call(self,conn,caller_retid,timeout,idendesc,dst,func_name,param): msg = { 'type':self.MSGTYPE_CALL, @@ -558,11 +510,10 @@ class Proxy: @async.caller def _reject_sendfile(self,iden,param): filekey = param['filekey'] - self._ret_sendfile(filekey,'Ereject') + self._ioloop.add_callback(self._ret_sendfile,filekey,'Ereject') -@async.callee -def imc_call(idendesc,dst,func_name,param,_grid): - return Proxy.instance.call(_grid,1000000,idendesc,dst,func_name,param) +def imc_call(idendesc,dst,func_name,param): + return Proxy.instance.call(1000000,idendesc,dst,func_name,param) def imc_call_async(idendesc,dst,func_name,param,callback = None): @async.caller diff --git a/src/py/tojauth.py b/src/py/tojauth.py index 0f26d15..11877ff 100644 --- a/src/py/tojauth.py +++ b/src/py/tojauth.py @@ -79,13 +79,13 @@ class TOJAuth(Auth): return wrapper - def create_access(self): + def create_access(self, owner_idenid): self.check_access(self.auth_accessid, self.ACCESS_EXECUTE)(0) cur = self.db.cursor() sqlstr = ('INSERT INTO "ACCESS" ("owner_idenid") VALUES (%s) ' 'RETURNING "accessid";') - sqlarr = (self.current_iden['idenid'], ) + sqlarr = (owner_idenid, ) cur.execute(sqlstr, sqlarr) for data in cur: -- cgit v1.2.3