diff options
author | pzread <netfirewall@gmail.com> | 2013-05-31 17:15:43 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-31 17:15:43 +0800 |
commit | 880dcc553894b3d7c0317c760548cb437110ab04 (patch) | |
tree | 5ee03a02cce924f787b80c86862bc2005cced566 | |
parent | 42463e2fc315928c3b14bd77de5c6741edade365 (diff) | |
download | taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.gz taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.bz2 taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.lz taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.xz taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.tar.zst taiwan-online-judge-880dcc553894b3d7c0317c760548cb437110ab04.zip |
Add wait to sendfile
-rw-r--r-- | src/py/backend_server.py | 8 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 107 |
2 files changed, 71 insertions, 44 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 85e49b6..234f8b2 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -226,17 +226,19 @@ class BackendWorker(tornado.tcpserver.TCPServer): def _test_call(self,iden,param): param = '3' - filekey = Proxy.instance.sendfile('/backend/' + param + '/','archlinux-2013.05.01-dual.iso') + fileresult = Proxy.instance.sendfile('/backend/' + param + '/','archlinux-2013.05.01-dual.iso') dst = '/backend/' + param + '/' - ret = imc_call(self._idendesc,dst,'test_dst',filekey) + ret = imc_call(self._idendesc,dst,'test_dst',fileresult.filekey) + print(fileresult.wait()) @imc.async.caller def _test_dst(self,iden,param): #stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param) #return ret + ' Too' - Proxy.instance.cancelfile(param) + fileresult = Proxy.instance.recvfile(param,'data') + print(fileresult.wait()) return 'ok' diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 948041a..db23c8a 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -35,6 +35,27 @@ class Connection: def closed(self): return self._closed +class FileResult(): + def __init__(self,filekey): + self.filekey = filekey + self._gr = None + self._result = None + + def ret_result(self,res): + if self._result != None: + return + + self._result = res + if self._gr != None: + self._gr.switch() + + def wait(self): + if self._result == None: + self._gr = async.current() + async.switchtop() + + return self._result + class Proxy: def __init__(self,linkclass,linkid,auth_instance,idendesc,conn_linkid_fn = None): self.MSGTYPE_CALL = 'call' @@ -66,7 +87,7 @@ class Proxy: Proxy.instance = self self.register_call('imc/','pend_recvfile',self._pend_recvfile) - self.register_call('imc/','cancel_sendfile',self._cancel_sendfile) + self.register_call('imc/','reject_sendfile',self._reject_sendfile) def add_conn(self,conn): assert conn.linkid not in self._conn_linkidmap @@ -147,22 +168,20 @@ class Proxy: filekey = str(uuid.uuid1()) filesize = os.stat(filepath).st_size + fileresult = FileResult(filekey) + self._info_filekeymap[filekey] = { 'filesize':filesize, 'filepath':filepath, - 'callback':None + 'fileresult':fileresult, + 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) } _call() - return filekey + return fileresult def recvfile(self,filekey,filepath): - def _handle_cb(err = None): - info = self._info_filekeymap.pop(filekey) - - print('recv done') - def _fail_cb(err): try: del self._conn_filekeymap[in_conn.linkid][filekey] @@ -174,7 +193,7 @@ class Proxy: in_conn.abort_file(filekey) self._send_msg_abortfile(in_conn,filekey,err) - _handle_cb(err) + self._ret_sendfile(filekey,err) try: info = self._info_filekeymap[filekey] @@ -188,16 +207,18 @@ class Proxy: in_conn = self._request_conn(src_linkid) self._add_wait_filekey(filekey,filesize,in_conn,None,_fail_cb) - in_conn.recv_file(filekey,filesize,filepath,_handle_cb) + in_conn.recv_file(filekey,filesize,filepath,lambda : self._ret_sendfile(filekey)) self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) - def cancelfile(self,filekey): + return info['fileresult'] + + def rejectfile(self,filekey): @async.callee def _call(_grid): self.call(_grid, 10000, self._idendesc, - dst_link + 'imc/','cancel_sendfile', + dst_link + 'imc/','reject_sendfile', {'filekey':filekey}) try: @@ -278,13 +299,22 @@ class Proxy: self._send_msg_call(conn,caller_retid,timeout,idendesc,dst,func_name,param) return + + def _ret_call(self,caller_linkid,caller_retid,result): + @async.caller + def __ret_remote(): + conn = self._request_conn(caller_linkid) + if conn != None: + self._send_msg_ret(conn,caller_linkid,caller_retid,result) - def _route_sendfile(self,out_conn,src_linkid,filekey,filesize): - def __handle_cb(err = None): - info = self._info_filekeymap.pop(filekey) + if caller_linkid == self._linkid: + grid = caller_retid.split('/',1)[1] + async.retcall(grid,result) - print('send done') + else: + __ret_remote() + def _route_sendfile(self,out_conn,src_linkid,filekey,filesize): def __send_fail_cb(err): try: del self._conn_filekeymap[out_conn.linkid][filekey] @@ -296,7 +326,7 @@ class Proxy: out_conn.abort_file(filekey) self._send_msg_abortfile(out_conn,filekey,err) - __handle_cb(err) + self._ret_sendfile(filekey,err) def __bridge_fail_cb(err): try: @@ -319,21 +349,21 @@ class Proxy: except KeyError: pass - __handle_cb(err) - if src_linkid == self._linkid: try: info = self._info_filekeymap[filekey] assert info['filesize'] == filesize except KeyError: + self._ret_sendfile(filekey,'Enoexist') return except AssertionError: + self._ret_sendfile(filekey,'Efilesize') return self._add_wait_filekey(filekey,filesize,None,out_conn,__send_fail_cb) - out_conn.send_file(filekey,info['filepath'],__handle_cb) + out_conn.send_file(filekey,info['filepath'],lambda : self._ret_sendfile(filekey)) else: print('test start') @@ -346,19 +376,21 @@ class Proxy: self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) - def _ret_call(self,caller_linkid,caller_retid,result): - @async.caller - def __ret_remote(): - conn = self._request_conn(caller_linkid) - if conn != None: - self._send_msg_ret(conn,caller_linkid,caller_retid,result) + def _ret_sendfile(self,filekey,err = None): + try: + info = self._info_filekeymap.pop(filekey) - if caller_linkid == self._linkid: - grid = caller_retid.split('/',1)[1] - async.retcall(grid,result) + except KeyError: + return + + self._ioloop.remove_timeout(info['timer']) + + fileresult = info['fileresult'] + if err == None: + self._ioloop.add_callback(lambda : fileresult.ret_result('Success')) else: - __ret_remote() + self._ioloop.add_callback(lambda : fileresult.ret_result(err)) def _request_conn(self,linkid): try: @@ -398,7 +430,6 @@ class Proxy: fail_callback(err) callback = tornado.stack_context.wrap(__call) - timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout')) if in_conn != None: @@ -520,20 +551,14 @@ class Proxy: 'src_linkclass':iden['linkclass'], 'src_linkid':iden['linkid'], 'filesize':filesize, - 'callback':None + 'fileresult':FileResult(filekey), + 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) } @async.caller - def _cancel_sendfile(self,iden,param): + def _reject_sendfile(self,iden,param): filekey = param['filekey'] - - try: - info = self._info_filekeymap.pop(filekey) - - except KeyError: - return - - print('cancel') + self._ret_sendfile(filekey,'Ereject') @async.callee def imc_call(idendesc,dst,func_name,param,_grid): |