aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-31 17:15:43 +0800
committerpzread <netfirewall@gmail.com>2013-05-31 17:15:43 +0800
commit880dcc553894b3d7c0317c760548cb437110ab04 (patch)
tree5ee03a02cce924f787b80c86862bc2005cced566
parent42463e2fc315928c3b14bd77de5c6741edade365 (diff)
downloadtaiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar
taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.gz
taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.bz2
taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.lz
taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.xz
taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.zst
taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.zip
Add wait to sendfile
-rw-r--r--src/py/backend_server.py8
-rwxr-xr-xsrc/py/imc/proxy.py107
2 files changed, 71 insertions, 44 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 85e49b6..234f8b2 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -226,17 +226,19 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def _test_call(self,iden,param):
param = '3'
- filekey = Proxy.instance.sendfile('/backend/' + param + '/','archlinux-2013.05.01-dual.iso')
+ fileresult = Proxy.instance.sendfile('/backend/' + param + '/','archlinux-2013.05.01-dual.iso')
dst = '/backend/' + param + '/'
- ret = imc_call(self._idendesc,dst,'test_dst',filekey)
+ ret = imc_call(self._idendesc,dst,'test_dst',fileresult.filekey)
+ print(fileresult.wait())
@imc.async.caller
def _test_dst(self,iden,param):
#stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param)
#return ret + ' Too'
- Proxy.instance.cancelfile(param)
+ fileresult = Proxy.instance.recvfile(param,'data')
+ print(fileresult.wait())
return 'ok'
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 948041a..db23c8a 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -35,6 +35,27 @@ class Connection:
def closed(self):
return self._closed
+class FileResult():
+ def __init__(self,filekey):
+ self.filekey = filekey
+ self._gr = None
+ self._result = None
+
+ def ret_result(self,res):
+ if self._result != None:
+ return
+
+ self._result = res
+ if self._gr != None:
+ self._gr.switch()
+
+ def wait(self):
+ if self._result == None:
+ self._gr = async.current()
+ async.switchtop()
+
+ return self._result
+
class Proxy:
def __init__(self,linkclass,linkid,auth_instance,idendesc,conn_linkid_fn = None):
self.MSGTYPE_CALL = 'call'
@@ -66,7 +87,7 @@ class Proxy:
Proxy.instance = self
self.register_call('imc/','pend_recvfile',self._pend_recvfile)
- self.register_call('imc/','cancel_sendfile',self._cancel_sendfile)
+ self.register_call('imc/','reject_sendfile',self._reject_sendfile)
def add_conn(self,conn):
assert conn.linkid not in self._conn_linkidmap
@@ -147,22 +168,20 @@ class Proxy:
filekey = str(uuid.uuid1())
filesize = os.stat(filepath).st_size
+ fileresult = FileResult(filekey)
+
self._info_filekeymap[filekey] = {
'filesize':filesize,
'filepath':filepath,
- 'callback':None
+ 'fileresult':fileresult,
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
}
_call()
- return filekey
+ return fileresult
def recvfile(self,filekey,filepath):
- def _handle_cb(err = None):
- info = self._info_filekeymap.pop(filekey)
-
- print('recv done')
-
def _fail_cb(err):
try:
del self._conn_filekeymap[in_conn.linkid][filekey]
@@ -174,7 +193,7 @@ class Proxy:
in_conn.abort_file(filekey)
self._send_msg_abortfile(in_conn,filekey,err)
- _handle_cb(err)
+ self._ret_sendfile(filekey,err)
try:
info = self._info_filekeymap[filekey]
@@ -188,16 +207,18 @@ class Proxy:
in_conn = self._request_conn(src_linkid)
self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb)
- in_conn.recv_file(filekey,filesize,filepath,_handle_cb)
+ in_conn.recv_file(filekey,filesize,filepath,lambda : self._ret_sendfile(filekey))
self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
- def cancelfile(self,filekey):
+ return info['fileresult']
+
+ def rejectfile(self,filekey):
@async.callee
def _call(_grid):
self.call(_grid,
10000,
self._idendesc,
- dst_link + 'imc/','cancel_sendfile',
+ dst_link + 'imc/','reject_sendfile',
{'filekey':filekey})
try:
@@ -278,13 +299,22 @@ class Proxy:
self._send_msg_call(conn,caller_retid,timeout,idendesc,dst,func_name,param)
return
+
+ def _ret_call(self,caller_linkid,caller_retid,result):
+ @async.caller
+ def __ret_remote():
+ conn = self._request_conn(caller_linkid)
+ if conn != None:
+ self._send_msg_ret(conn,caller_linkid,caller_retid,result)
- def _route_sendfile(self,out_conn,src_linkid,filekey,filesize):
- def __handle_cb(err = None):
- info = self._info_filekeymap.pop(filekey)
+ if caller_linkid == self._linkid:
+ grid = caller_retid.split('/',1)[1]
+ async.retcall(grid,result)
- print('send done')
+ else:
+ __ret_remote()
+ def _route_sendfile(self,out_conn,src_linkid,filekey,filesize):
def __send_fail_cb(err):
try:
del self._conn_filekeymap[out_conn.linkid][filekey]
@@ -296,7 +326,7 @@ class Proxy:
out_conn.abort_file(filekey)
self._send_msg_abortfile(out_conn,filekey,err)
- __handle_cb(err)
+ self._ret_sendfile(filekey,err)
def __bridge_fail_cb(err):
try:
@@ -319,21 +349,21 @@ class Proxy:
except KeyError:
pass
- __handle_cb(err)
-
if src_linkid == self._linkid:
try:
info = self._info_filekeymap[filekey]
assert info['filesize'] == filesize
except KeyError:
+ self._ret_sendfile(filekey,'Enoexist')
return
except AssertionError:
+ self._ret_sendfile(filekey,'Efilesize')
return
self._add_wait_filekey(filekey,filesize,None,out_conn,__send_fail_cb)
- out_conn.send_file(filekey,info['filepath'],__handle_cb)
+ out_conn.send_file(filekey,info['filepath'],lambda : self._ret_sendfile(filekey))
else:
print('test start')
@@ -346,19 +376,21 @@ class Proxy:
self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
- def _ret_call(self,caller_linkid,caller_retid,result):
- @async.caller
- def __ret_remote():
- conn = self._request_conn(caller_linkid)
- if conn != None:
- self._send_msg_ret(conn,caller_linkid,caller_retid,result)
+ def _ret_sendfile(self,filekey,err = None):
+ try:
+ info = self._info_filekeymap.pop(filekey)
- if caller_linkid == self._linkid:
- grid = caller_retid.split('/',1)[1]
- async.retcall(grid,result)
+ except KeyError:
+ return
+
+ self._ioloop.remove_timeout(info['timer'])
+
+ fileresult = info['fileresult']
+ if err == None:
+ self._ioloop.add_callback(lambda : fileresult.ret_result('Success'))
else:
- __ret_remote()
+ self._ioloop.add_callback(lambda : fileresult.ret_result(err))
def _request_conn(self,linkid):
try:
@@ -398,7 +430,6 @@ class Proxy:
fail_callback(err)
callback = tornado.stack_context.wrap(__call)
-
timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout'))
if in_conn != None:
@@ -520,20 +551,14 @@ class Proxy:
'src_linkclass':iden['linkclass'],
'src_linkid':iden['linkid'],
'filesize':filesize,
- 'callback':None
+ 'fileresult':FileResult(filekey),
+ 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
}
@async.caller
- def _cancel_sendfile(self,iden,param):
+ def _reject_sendfile(self,iden,param):
filekey = param['filekey']
-
- try:
- info = self._info_filekeymap.pop(filekey)
-
- except KeyError:
- return
-
- print('cancel')
+ self._ret_sendfile(filekey,'Ereject')
@async.callee
def imc_call(idendesc,dst,func_name,param,_grid):