aboutsummaryrefslogtreecommitdiffstats
path: root/src/py/imc/proxy.py
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-22 01:42:49 +0800
committerpzread <netfirewall@gmail.com>2013-05-22 01:42:49 +0800
commite2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a (patch)
tree360e35416a13c37189e8d579b39aa3e9aad0a907 /src/py/imc/proxy.py
parentdab619a7b87ba07fbaa390b5005cc92369f9850f (diff)
downloadtaiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar
taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.gz
taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.bz2
taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.lz
taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.xz
taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.tar.zst
taiwan-online-judge-e2a22bcdb9cc965dfe0f6e9b53e85b61261dcb8a.zip
Fix async, asyncdb bug. Add auth.current_iden
Diffstat (limited to 'src/py/imc/proxy.py')
-rwxr-xr-xsrc/py/imc/proxy.py71
1 files changed, 37 insertions, 34 deletions
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 0375191..f0c912e 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -4,7 +4,7 @@ import uuid
import tornado.ioloop
import tornado.stack_context
-from imc import nonblock
+from imc import async
from imc import auth
class Connection:
@@ -28,16 +28,19 @@ class Connection:
callback(self)
class Proxy:
- def __init__(self,linkclass,linkid,auth,connect_linkid = None):
+ def __init__(self,linkclass,linkid,auth_instance,conn_linkid_fn = None):
self._ioloop = tornado.ioloop.IOLoop.instance()
self._linkclass = linkclass
self._linkid = linkid
- self._auth = auth
- self._connect_linkid = connect_linkid
+ self._auth = auth_instance
+
+ if conn_linkid_fn == None:
+ self._conn_linkid_fn = lambda : None
+ else:
+ self._conn_linkid_fn = conn_linkid_fn
self._conn_linkidmap = {}
self._conn_retidmap = {self._linkid:{}}
- self._retcall_retidmap = {}
self._call_pathmap = {}
self.MSGTYPE_CALL = 'call'
@@ -77,7 +80,7 @@ class Proxy:
wait_ret.append((wait['caller_linkid'],retid))
for linkid,retid in wait_ret:
- self._retcall(linkid,retid,(False,'Eclose'))
+ self._ret_call(linkid,retid,(False,'Eclose'))
linkids = conn.link_linkidmap.keys()
link_del = []
@@ -97,21 +100,6 @@ class Proxy:
except KeyError:
return None
- def request_conn(self,linkid):
- try:
- return self._conn_linkidmap[linkid]
-
- except KeyError:
- stat,conn = self._connect_linkid(linkid)
-
- if stat == False:
- return None
-
- elif conn != None and conn.linkid != linkid:
- self.link_conn(linkid,conn)
-
- return conn
-
def register_call(self,path,func_name,func):
self._call_pathmap[''.join([path,func_name])] = func
@@ -131,7 +119,7 @@ class Proxy:
return result
else:
- conn = self.request_conn(caller_linkid)
+ conn = self._request_conn(caller_linkid)
if conn != None:
self._send_msg_ret(conn,caller_linkid,caller_retid,result)
@@ -150,7 +138,9 @@ class Proxy:
__add_wait_caller(self._linkid)
try:
+ old_iden = self._auth.change_iden(iden)
result = self._call_pathmap[''.join([dst_path,func_name])](iden,param)
+ self._auth.change_iden(old_iden)
except KeyError:
result = (False,'Enoexist')
@@ -160,7 +150,7 @@ class Proxy:
return __ret(result)
else:
- conn = self.request_conn(dst_linkid)
+ conn = self._request_conn(dst_linkid)
if conn == None:
return __ret((False,'Enoexist'))
@@ -169,7 +159,7 @@ class Proxy:
__add_wait_caller(conn.linkid)
self._send_msg_call(conn,caller_retid,timeout,idendesc,dst,func_name,param)
- result = nonblock.switchtop()
+ result = async.switchtop()
del self._conn_retidmap[conn.linkid][caller_retid]
return __ret(result)
@@ -179,20 +169,32 @@ class Proxy:
return
- def _retcall(self,caller_linkid,caller_retid,result):
- @nonblock.caller
+ def _ret_call(self,caller_linkid,caller_retid,result):
+ @async.caller
def __ret_remote():
- conn = self.request_conn(caller_linkid)
+ conn = self._request_conn(caller_linkid)
if conn != None:
self._send_msg_ret(conn,caller_linkid,caller_retid,result)
if caller_linkid == self._linkid:
grid = caller_retid.split('/',1)[1]
- nonblock.retcall(grid,result)
+ async.retcall(grid,result)
else:
__ret_remote()
+ def _request_conn(self,linkid):
+ try:
+ return self._conn_linkidmap[linkid]
+
+ except KeyError:
+ conn = self._conn_linkid_fn(linkid)
+
+ if conn != None and conn.linkid != linkid:
+ self.link_conn(linkid,conn)
+
+ return conn
+
def _recv_dispatch(self,conn,data):
msg = json.loads(data.decode('utf-8'))
msg_type = msg['type']
@@ -217,7 +219,7 @@ class Proxy:
wait_ret.append((wait['caller_linkid'],retid))
for linkid,retid in wait_ret:
- self._retcall(linkid,retid,(False,'Etimeout'))
+ self._ret_call(linkid,retid,(False,'Etimeout'))
def _send_msg_call(self,conn,caller_retid,timeout,idendesc,dst,func_name,param):
msg = {
@@ -229,10 +231,11 @@ class Proxy:
'func_name':func_name,
'param':param
}
+
conn.send_msg(bytes(json.dumps(msg),'utf-8'))
def _recv_msg_call(self,conn,msg):
- @nonblock.caller
+ @async.caller
def __call():
self._route_call(conn.linkclass,conn.linkid,caller_retid,timeout,idendesc,dst,func_name,param)
@@ -262,14 +265,14 @@ class Proxy:
data = msg['result']
result = (data['stat'],data['data'])
- self._retcall(caller_linkid,caller_retid,result)
+ self._ret_call(caller_linkid,caller_retid,result)
-@nonblock.callee
+@async.callee
def imc_call(idendesc,dst,func_name,param,_grid):
- return Proxy.instance.call(_grid,5000,idendesc,dst,func_name,param)
+ return Proxy.instance.call(_grid,1000,idendesc,dst,func_name,param)
def imc_call_async(idendesc,dst,func_name,param,callback = None):
- @nonblock.caller
+ @async.caller
def func():
ret = imc_call(idendesc,dst,func_name,param)
if callback != None: