diff options
author | pzread <netfirewall@gmail.com> | 2013-05-22 01:42:49 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-22 01:42:49 +0800 |
commit | e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a (patch) | |
tree | 360e35416a13c37189e8d579b39aa3e9aad0a907 /src/py/imc/proxy.py | |
parent | dab619a7b87ba07fbaa390b5005cc92369f9850f (diff) | |
download | taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.gz taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.bz2 taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.lz taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.xz taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.zst taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.zip |
Fix async, asyncdb bug. Add auth.current_iden
Diffstat (limited to 'src/py/imc/proxy.py')
-rwxr-xr-x | src/py/imc/proxy.py | 71 |
1 files changed, 37 insertions, 34 deletions
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 0375191..f0c912e 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -4,7 +4,7 @@ import uuid import tornado.ioloop import tornado.stack_context -from imc import nonblock +from imc import async from imc import auth class Connection: @@ -28,16 +28,19 @@ class Connection: callback(self) class Proxy: - def __init__(self,linkclass,linkid,auth,connect_linkid = None): + def __init__(self,linkclass,linkid,auth_instance,conn_linkid_fn = None): self._ioloop = tornado.ioloop.IOLoop.instance() self._linkclass = linkclass self._linkid = linkid - self._auth = auth - self._connect_linkid = connect_linkid + self._auth = auth_instance + + if conn_linkid_fn == None: + self._conn_linkid_fn = lambda : None + else: + self._conn_linkid_fn = conn_linkid_fn self._conn_linkidmap = {} self._conn_retidmap = {self._linkid:{}} - self._retcall_retidmap = {} self._call_pathmap = {} self.MSGTYPE_CALL = 'call' @@ -77,7 +80,7 @@ class Proxy: wait_ret.append((wait['caller_linkid'],retid)) for linkid,retid in wait_ret: - self._retcall(linkid,retid,(False,'Eclose')) + self._ret_call(linkid,retid,(False,'Eclose')) linkids = conn.link_linkidmap.keys() link_del = [] @@ -97,21 +100,6 @@ class Proxy: except KeyError: return None - def request_conn(self,linkid): - try: - return self._conn_linkidmap[linkid] - - except KeyError: - stat,conn = self._connect_linkid(linkid) - - if stat == False: - return None - - elif conn != None and conn.linkid != linkid: - self.link_conn(linkid,conn) - - return conn - def register_call(self,path,func_name,func): self._call_pathmap[''.join([path,func_name])] = func @@ -131,7 +119,7 @@ class Proxy: return result else: - conn = self.request_conn(caller_linkid) + conn = self._request_conn(caller_linkid) if conn != None: self._send_msg_ret(conn,caller_linkid,caller_retid,result) @@ -150,7 +138,9 @@ class Proxy: __add_wait_caller(self._linkid) try: + old_iden = self._auth.change_iden(iden) result = self._call_pathmap[''.join([dst_path,func_name])](iden,param) + self._auth.change_iden(old_iden) except KeyError: result = (False,'Enoexist') @@ -160,7 +150,7 @@ class Proxy: return __ret(result) else: - conn = self.request_conn(dst_linkid) + conn = self._request_conn(dst_linkid) if conn == None: return __ret((False,'Enoexist')) @@ -169,7 +159,7 @@ class Proxy: __add_wait_caller(conn.linkid) self._send_msg_call(conn,caller_retid,timeout,idendesc,dst,func_name,param) - result = nonblock.switchtop() + result = async.switchtop() del self._conn_retidmap[conn.linkid][caller_retid] return __ret(result) @@ -179,20 +169,32 @@ class Proxy: return - def _retcall(self,caller_linkid,caller_retid,result): - @nonblock.caller + def _ret_call(self,caller_linkid,caller_retid,result): + @async.caller def __ret_remote(): - conn = self.request_conn(caller_linkid) + conn = self._request_conn(caller_linkid) if conn != None: self._send_msg_ret(conn,caller_linkid,caller_retid,result) if caller_linkid == self._linkid: grid = caller_retid.split('/',1)[1] - nonblock.retcall(grid,result) + async.retcall(grid,result) else: __ret_remote() + def _request_conn(self,linkid): + try: + return self._conn_linkidmap[linkid] + + except KeyError: + conn = self._conn_linkid_fn(linkid) + + if conn != None and conn.linkid != linkid: + self.link_conn(linkid,conn) + + return conn + def _recv_dispatch(self,conn,data): msg = json.loads(data.decode('utf-8')) msg_type = msg['type'] @@ -217,7 +219,7 @@ class Proxy: wait_ret.append((wait['caller_linkid'],retid)) for linkid,retid in wait_ret: - self._retcall(linkid,retid,(False,'Etimeout')) + self._ret_call(linkid,retid,(False,'Etimeout')) def _send_msg_call(self,conn,caller_retid,timeout,idendesc,dst,func_name,param): msg = { @@ -229,10 +231,11 @@ class Proxy: 'func_name':func_name, 'param':param } + conn.send_msg(bytes(json.dumps(msg),'utf-8')) def _recv_msg_call(self,conn,msg): - @nonblock.caller + @async.caller def __call(): self._route_call(conn.linkclass,conn.linkid,caller_retid,timeout,idendesc,dst,func_name,param) @@ -262,14 +265,14 @@ class Proxy: data = msg['result'] result = (data['stat'],data['data']) - self._retcall(caller_linkid,caller_retid,result) + self._ret_call(caller_linkid,caller_retid,result) -@nonblock.callee +@async.callee def imc_call(idendesc,dst,func_name,param,_grid): - return Proxy.instance.call(_grid,5000,idendesc,dst,func_name,param) + return Proxy.instance.call(_grid,1000,idendesc,dst,func_name,param) def imc_call_async(idendesc,dst,func_name,param,callback = None): - @nonblock.caller + @async.caller def func(): ret = imc_call(idendesc,dst,func_name,param) if callback != None: |