aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-31 01:04:19 +0800
committerpzread <netfirewall@gmail.com>2013-05-31 01:04:19 +0800
commit42463e2fc315928c3b14bd77de5c6741edade365 (patch)
tree629a46aba852314d1e38006d3fc55cb6f1b82709
parent05c48532dc67c44d12682de3ece293e6f2a412f3 (diff)
downloadtaiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.tar
taiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.tar.gz
taiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.tar.bz2
taiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.tar.lz
taiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.tar.xz
taiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.tar.zst
taiwan-online-judge-42463e2fc315928c3b14bd77de5c6741edade365.zip
Add cancel sendfile
-rw-r--r--src/py/backend_server.py8
-rw-r--r--src/py/center_server.py2
-rwxr-xr-xsrc/py/imc/proxy.py97
-rw-r--r--src/py/netio.py9
4 files changed, 83 insertions, 33 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 2ac0f2b..85e49b6 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -92,7 +92,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._idendesc = info['idendesc']
iden = TOJAuth.instance.get_iden('backend',self._linkid,self._idendesc)
self._linkid = iden['linkid']
- Proxy('backend',self._linkid,TOJAuth.instance,self._conn_linkid)
+ Proxy('backend',self._linkid,TOJAuth.instance,self._idendesc,self._conn_linkid)
self.center_conn = SocketConnection('center',info['center_linkid'],stream)
self.center_conn.add_close_callback(__retry)
@@ -225,8 +225,8 @@ class BackendWorker(tornado.tcpserver.TCPServer):
@imc.async.caller
def _test_call(self,iden,param):
- param = '5'
- filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso')
+ param = '3'
+ filekey = Proxy.instance.sendfile('/backend/' + param + '/','archlinux-2013.05.01-dual.iso')
dst = '/backend/' + param + '/'
ret = imc_call(self._idendesc,dst,'test_dst',filekey)
@@ -236,7 +236,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
#stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param)
#return ret + ' Too'
- Proxy.instance.recvfile(param,'data')
+ Proxy.instance.cancelfile(param)
return 'ok'
diff --git a/src/py/center_server.py b/src/py/center_server.py
index 513d946..2ad6dfd 100644
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -68,7 +68,7 @@ class CenterServer(tornado.tcpserver.TCPServer):
self._linkid = self._create_linkid()
self._idendesc = self._create_idendesc('center',self._linkid)
- Proxy('center',self._linkid,TOJAuth.instance)
+ Proxy('center',self._linkid,TOJAuth.instance,self._idendesc)
imc_register_call('','lookup_linkid',self._lookup_linkid)
imc_register_call('','add_client',self._add_client)
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 65589c3..948041a 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -36,7 +36,7 @@ class Connection:
return self._closed
class Proxy:
- def __init__(self,linkclass,linkid,auth_instance,conn_linkid_fn = None):
+ def __init__(self,linkclass,linkid,auth_instance,idendesc,conn_linkid_fn = None):
self.MSGTYPE_CALL = 'call'
self.MSGTYPE_RET = 'ret'
self.MSGTYPE_SENDFILE = 'sendfile'
@@ -46,6 +46,7 @@ class Proxy:
self._linkclass = linkclass
self._linkid = linkid
self._auth = auth_instance
+ self._idendesc = idendesc
if conn_linkid_fn == None:
self._conn_linkid_fn = lambda : None
@@ -65,6 +66,7 @@ class Proxy:
Proxy.instance = self
self.register_call('imc/','pend_recvfile',self._pend_recvfile)
+ self.register_call('imc/','cancel_sendfile',self._cancel_sendfile)
def add_conn(self,conn):
assert conn.linkid not in self._conn_linkidmap
@@ -133,12 +135,12 @@ class Proxy:
caller_retid = ''.join([self._linkid,'/',caller_grid])
return self._route_call(None,caller_retid,timeout,idendesc,dst,func_name,param)
- def sendfile(self,idendesc,dst_link,filepath):
+ def sendfile(self,dst_link,filepath):
@async.callee
def _call(_grid):
self.call(_grid,
10000,
- idendesc,
+ self._idendesc,
dst_link + 'imc/','pend_recvfile',
{'filekey':filekey,'filesize':filesize})
@@ -147,7 +149,8 @@ class Proxy:
self._info_filekeymap[filekey] = {
'filesize':filesize,
- 'filepath':filepath
+ 'filepath':filepath,
+ 'callback':None
}
_call()
@@ -155,6 +158,11 @@ class Proxy:
return filekey
def recvfile(self,filekey,filepath):
+ def _handle_cb(err = None):
+ info = self._info_filekeymap.pop(filekey)
+
+ print('recv done')
+
def _fail_cb(err):
try:
del self._conn_filekeymap[in_conn.linkid][filekey]
@@ -166,21 +174,40 @@ class Proxy:
in_conn.abort_file(filekey)
self._send_msg_abortfile(in_conn,filekey,err)
+ _handle_cb(err)
+
try:
- info = self._info_filekeymap.pop(filekey)
- src_linkid = info['src_linkid']
- filesize = info['filesize']
+ info = self._info_filekeymap[filekey]
- in_conn = self._request_conn(src_linkid)
- self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb)
+ except KeyError:
+ return
- in_conn.recv_file(filekey,filesize,filepath)
- self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
+ src_linkid = info['src_linkid']
+ filesize = info['filesize']
+
+ in_conn = self._request_conn(src_linkid)
+ self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb)
+
+ in_conn.recv_file(filekey,filesize,filepath,_handle_cb)
+ self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
+
+ def cancelfile(self,filekey):
+ @async.callee
+ def _call(_grid):
+ self.call(_grid,
+ 10000,
+ self._idendesc,
+ dst_link + 'imc/','cancel_sendfile',
+ {'filekey':filekey})
+
+ try:
+ info = self._info_filekeymap.pop(filekey)
except KeyError:
- pass
+ return
- return
+ dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/'])
+ _call()
def _route_call(self,in_conn,caller_retid,timeout,idendesc,dst,func_name,param):
def __add_wait_caller(in_linkid):
@@ -253,7 +280,12 @@ class Proxy:
return
def _route_sendfile(self,out_conn,src_linkid,filekey,filesize):
- def _send_fail(err):
+ def __handle_cb(err = None):
+ info = self._info_filekeymap.pop(filekey)
+
+ print('send done')
+
+ def __send_fail_cb(err):
try:
del self._conn_filekeymap[out_conn.linkid][filekey]
@@ -264,7 +296,9 @@ class Proxy:
out_conn.abort_file(filekey)
self._send_msg_abortfile(out_conn,filekey,err)
- def _bridge_fail(err):
+ __handle_cb(err)
+
+ def __bridge_fail_cb(err):
try:
del self._conn_filekeymap[in_conn.linkid][filekey]
@@ -285,26 +319,27 @@ class Proxy:
except KeyError:
pass
+ __handle_cb(err)
+
if src_linkid == self._linkid:
try:
- info = self._info_filekeymap.pop(filekey)
+ info = self._info_filekeymap[filekey]
assert info['filesize'] == filesize
- self._add_wait_filekey(filekey,filesize,None,out_conn,_send_fail)
-
- out_conn.send_file(filekey,info['filepath'])
-
except KeyError:
- pass
+ return
except AssertionError:
- pass
+ return
+
+ self._add_wait_filekey(filekey,filesize,None,out_conn,__send_fail_cb)
+ out_conn.send_file(filekey,info['filepath'],__handle_cb)
else:
print('test start')
in_conn = self._request_conn(src_linkid)
- self._add_wait_filekey(filekey,filesize,in_conn,out_conn,_bridge_fail)
+ self._add_wait_filekey(filekey,filesize,in_conn,out_conn,__bridge_fail_cb)
send_fn = out_conn.send_filedata(filekey,filesize)
in_conn.recv_filedata(filekey,filesize,send_fn)
@@ -482,9 +517,23 @@ class Proxy:
filesize = param['filesize']
self._info_filekeymap[filekey] = {
+ 'src_linkclass':iden['linkclass'],
'src_linkid':iden['linkid'],
- 'filesize':filesize
+ 'filesize':filesize,
+ 'callback':None
}
+
+ @async.caller
+ def _cancel_sendfile(self,iden,param):
+ filekey = param['filekey']
+
+ try:
+ info = self._info_filekeymap.pop(filekey)
+
+ except KeyError:
+ return
+
+ print('cancel')
@async.callee
def imc_call(idendesc,dst,func_name,param,_grid):
diff --git a/src/py/netio.py b/src/py/netio.py
index 2910207..df19627 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -289,7 +289,7 @@ class SocketConnection(Connection):
self.main_stream.write(struct.pack('l',len(data)) + data)
- def send_file(self,filekey,filepath):
+ def send_file(self,filekey,filepath,callback):
def _conn_cb():
self._add_sendfile(filekey,_fail_cb)
@@ -306,7 +306,7 @@ class SocketConnection(Connection):
os.close(fd)
- print('send done')
+ callback()
def _fail_cb():
try:
@@ -328,7 +328,7 @@ class SocketConnection(Connection):
file_stream.set_close_callback(lambda stream : _fail_cb())
file_stream.connect(_conn_cb)
- def recv_file(self,filekey,filesize,filepath):
+ def recv_file(self,filekey,filesize,filepath,callback):
def _conn_cb(stream):
nonlocal file_stream
@@ -343,9 +343,10 @@ class SocketConnection(Connection):
file_stream.close()
os.close(fd)
- print('recv done')
print(time.perf_counter() - st)
+ callback()
+
def _fail_cb():
try:
del self._sendfile_filekeymap[filekey]