about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author pzread <netfirewall@gmail.com> 2013-06-02 02:19:56 +0800
committer pzread <netfirewall@gmail.com> 2013-06-02 02:19:56 +0800
commit c00472c8bfbcc86780ad25f7be687815432c4a37 (patch)
tree 58e353798596a7489999c2179b4acab9b536a065
parent 880dcc553894b3d7c0317c760548cb437110ab04 (diff)
download taiwan-online-judge-c00472c8bfbcc86780ad25f7be687815432c4a37.tar
taiwan-online-judge-c00472c8bfbcc86780ad25f7be687815432c4a37.tar.gz
taiwan-online-judge-c00472c8bfbcc86780ad25f7be687815432c4a37.tar.bz2
taiwan-online-judge-c00472c8bfbcc86780ad25f7be687815432c4a37.tar.lz
taiwan-online-judge-c00472c8bfbcc86780ad25f7be687815432c4a37.tar.xz
taiwan-online-judge-c00472c8bfbcc86780ad25f7be687815432c4a37.tar.zst
taiwan-online-judge-c00472c8bfbcc86780ad25f7be687815432c4a37.zip
Fix async bug. Improve fault tolerance
-rw-r--r-- src/py/asyncdb.py 26
-rw-r--r-- src/py/backend_server.py 50
-rw-r--r-- src/py/center_server.py 14
-rw-r--r-- src/py/imc/async.py 65
-rw-r--r-- src/py/imc/auth.py 2
-rwxr-xr-x src/py/imc/proxy.py 165
-rw-r--r-- src/py/tojauth.py 4
7 files changed, 154 insertions, 172 deletions
diff --git a/src/py/asyncdb.py b/src/py/asyncdb.py
index 651dd73..1e2bf7a 100644
--- a/src/py/asyncdb.py
+++ b/src/py/asyncdb.py
@@ -19,7 +19,7 @@ class RestrictCursor:
def __iter__(self):
return self._cur
- def execute(self,sql,param = None,_grid = None):
+ def execute(self,sql,param = None):
self._db.execute(self._cur,sql,param)
self.arraysize = self._cur.arraysize
@@ -201,14 +201,13 @@ class AsyncDB:
def cursor(self):
return RestrictCursor(self,self._cursor())
- @imc.async.callee
- def execute(self,cur,sql,param = None,_grid = None):
+ def execute(self,cur,sql,param = None):
fd = cur.connection.fileno()
- self._pendoper_fdmap[fd].append((self.OPER_EXECUTE,(cur,sql,param),_grid))
+ self._pendoper_fdmap[fd].append((self.OPER_EXECUTE,(cur,sql,param),imc.async.get_retid()))
self._ioloop.add_callback(self._oper_dispatch,fd,0)
- imc.async.switchtop()
+ imc.async.switch_top()
def begin_transaction(self):
if len(self._free_connpool) > 0:
@@ -229,18 +228,17 @@ class AsyncDB:
else:
self._close_conn(conn)
- @imc.async.callee
- def _cursor(self,conn = None,_grid = None):
+ def _cursor(self,conn = None):
if conn != None:
fd = conn.fileno()
else:
fd = self._share_connpool[random.randrange(len(self._share_connpool))].fileno()
- self._pendoper_fdmap[fd].append((self.OPER_CURSOR,None,_grid))
+ self._pendoper_fdmap[fd].append((self.OPER_CURSOR,None,imc.async.get_retid()))
self._ioloop.add_callback(self._oper_dispatch,fd,0)
- cur = imc.async.switchtop()
+ cur = imc.async.switch_top()
return cur
def _create_conn(self):
@@ -302,7 +300,7 @@ class AsyncDB:
else:
try:
- oper,data,grid = self._pendoper_fdmap[fd].popleft()
+ oper,data,retid = self._pendoper_fdmap[fd].popleft()
except IndexError:
return
@@ -310,20 +308,20 @@ class AsyncDB:
if oper == self.OPER_CURSOR:
def _ret_cursor(err = None):
if err == None:
- imc.async.retcall(grid,conn.cursor())
+ imc.async.ret(retid,conn.cursor())
else:
- imc.async.retcall(grid,err = err)
+ imc.async.ret(retid,err = err)
self._opercallback_fdmap[fd] = _ret_cursor
elif oper == self.OPER_EXECUTE:
def _ret_execute(err = None):
if err == None:
- imc.async.retcall(grid,None)
+ imc.async.ret(retid)
else:
- imc.async.retcall(grid,err = err)
+ imc.async.ret(retid,err = err)
cur,sql,param = data
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 234f8b2..edfee8d 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -103,8 +103,8 @@ class BackendWorker(tornado.tcpserver.TCPServer):
#imc_register_call('','test_dsta',self._test_dsta)
time.sleep(1)
- if int(self._linkid) == 2:
- self._test_call(None,'9')
+ #if int(self._linkid) == 2:
+ self._test_call(None,'9')
sock_ip,sock_port = self.sock_addr
netio.send_pack(stream,bytes(json.dumps({
@@ -122,9 +122,9 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def _conn_linkid(self,linkid):
def __handle_pend(conn):
- pends = self._pend_mainconn_linkidmap.pop(worker_linkid)
- for gr in pends:
- gr.switch(conn)
+ retids = self._pend_mainconn_linkidmap.pop(worker_linkid)
+ for retid in retids:
+ imc.async.ret(retid,conn)
def __conn_cb():
conn = Proxy.instance.get_conn(worker_linkid)
@@ -169,11 +169,11 @@ class BackendWorker(tornado.tcpserver.TCPServer):
return conn
elif worker_linkid in self._pend_mainconn_linkidmap:
- self._pend_mainconn_linkidmap[worker_linkid].append(imc.async.current())
- return imc.async.switchtop()
+ self._pend_mainconn_linkidmap[worker_linkid].append(imc.async.get_retid())
+ return imc.async.switch_top()
else:
- self._pend_mainconn_linkidmap[worker_linkid] = [imc.async.current()]
+ self._pend_mainconn_linkidmap[worker_linkid] = [imc.async.get_retid()]
sock_addr = (ret['sock_ip'],ret['sock_port'])
@@ -181,7 +181,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
main_stream.set_close_callback(lambda conn : __handle_pend(None))
main_stream.connect(__conn_cb)
- return imc.async.switchtop()
+ return imc.async.switch_top()
def _add_pend_filestream(self,filekey,callback):
self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback)
@@ -208,9 +208,9 @@ class BackendWorker(tornado.tcpserver.TCPServer):
if self._linkid > linkid:
__send_back(True)
- pends = self._pend_mainconn_linkidmap.pop(linkid)
- for callback in pends:
- callback(conn)
+ retids = self._pend_mainconn_linkidmap.pop(linkid)
+ for retid in retids:
+ imc.async.ret(retid,conn)
else:
__send_back(False)
@@ -225,20 +225,32 @@ class BackendWorker(tornado.tcpserver.TCPServer):
@imc.async.caller
def _test_call(self,iden,param):
- param = '3'
- fileresult = Proxy.instance.sendfile('/backend/' + param + '/','archlinux-2013.05.01-dual.iso')
+ param = '6'
- dst = '/backend/' + param + '/'
- ret = imc_call(self._idendesc,dst,'test_dst',fileresult.filekey)
- print(fileresult.wait())
+ pend = []
+ for i in range(0,8):
+ if str((i % 8) + 2) == self._linkid:
+ continue
+
+ fileres = Proxy.instance.sendfile('/backend/' + str((i % 8) + 2) + '/','archlinux-2013.05.01-dual.iso')
+
+ dst = '/backend/' + str((i % 8) + 2) + '/'
+ ret = imc_call(self._idendesc,dst,'test_dst',fileres.filekey)
+
+ pend.append(fileres)
+
+ for p in pend:
+ print(p.wait())
+
+ print(self._linkid)
@imc.async.caller
def _test_dst(self,iden,param):
#stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param)
#return ret + ' Too'
- fileresult = Proxy.instance.recvfile(param,'data')
- print(fileresult.wait())
+ fileres = Proxy.instance.recvfile(param,'data')
+ #print(fileres.wait())
return 'ok'
diff --git a/src/py/center_server.py b/src/py/center_server.py
index 2ad6dfd..0b32610 100644
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -139,16 +139,16 @@ class CenterServer(tornado.tcpserver.TCPServer):
linkid = param
try:
- #worker = self._worker_linkidmap[linkid]
+ worker = self._worker_linkidmap[linkid]
- a = int(iden['linkid'])
- b = int(linkid)
+ #a = int(iden['linkid'])
+ #b = int(linkid)
- if b > a:
- worker = self._worker_linkidmap[str(a + 1)]
+ #if b > a:
+ # worker = self._worker_linkidmap[str(a + 1)]
- else:
- worker = self._worker_linkidmap[str(a - 1)]
+ #else:
+ # worker = self._worker_linkidmap[str(a - 1)]
if iden['linkclass'] != 'client':
sock_ip,sock_port = worker.sock_addr
diff --git a/src/py/imc/async.py b/src/py/imc/async.py
index cac8d23..8934ee0 100644
--- a/src/py/imc/async.py
+++ b/src/py/imc/async.py
@@ -1,18 +1,21 @@
import traceback
+import uuid
+import ssl
+from Crypto.Hash import SHA512
from greenlet import greenlet
from imc import auth
-gr_waitmap = {}
+gr_idmap = {}
+ret_idmap = {}
gr_main = greenlet.getcurrent()
-def current():
- return greenlet.getcurrent()
-
-def switchtop():
+def switch_top():
global gr_main
+ assert greenlet.getcurrent() != gr_main
+
old_iden = auth.current_iden
auth.current_iden = None
@@ -22,29 +25,27 @@ def switchtop():
return result
-def callee(f):
- def wrapper(*args,**kwargs):
- kwargs['_grid'] = str(id(greenlet.getcurrent()))
- return f(*args,**kwargs)
-
- return wrapper
-
def caller(f):
def wrapper(*args,**kwargs):
global gr_main
- global gr_waitmap
+ global gr_idmap
+ global ret_idmap
def _call(*args,**kwargs):
ret = f(*args,**kwargs)
- del gr_waitmap[grid]
+ retids = gr_idmap[grid]
+ for retid in retids:
+ del ret_idmap[retid]
+
+ del gr_idmap[grid]
return (True,ret)
try:
gr = greenlet(_call)
- grid = str(id(gr))
+ grid = id(gr)
+ gr_idmap[grid] = set()
old_iden = auth.current_iden
- gr_waitmap[grid] = (gr,old_iden)
result = gr.switch(*args,**kwargs)
auth.current_iden = old_iden
@@ -64,14 +65,35 @@ def caller(f):
return wrapper
-def retcall(grid,value = None,err = None):
- global gr_waitmap
+def get_retid():
+ global gr_idmap
+ global ret_idmap
+
+ gr = greenlet.getcurrent()
+ grid = id(gr)
+ retid = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest()
+
+ gr_idmap[grid].add(retid)
+ ret_idmap[retid] = gr
+
+ return retid
+
+def ret(retid,value = None,err = None):
+ global gr_main
+ global gr_idmap
+ global ret_idmap
+
+ assert greenlet.getcurrent() == gr_main
try:
- gr,iden = gr_waitmap[grid]
+ gr = ret_idmap.pop(retid)
+ gr_idmap[id(gr)].remove(retid)
+
+ except KeyError:
+ return
+ try:
old_iden = auth.current_iden
- auth.current_iden = iden
if err == None:
gr.switch(value)
@@ -81,7 +103,6 @@ def retcall(grid,value = None,err = None):
auth.current_iden = old_iden
- except Exception as err:
+ except TypeError as err:
traceback.print_stack()
print(err)
- pass
diff --git a/src/py/imc/auth.py b/src/py/imc/auth.py
index 16c12d9..08e1417 100644
--- a/src/py/imc/auth.py
+++ b/src/py/imc/auth.py
@@ -3,7 +3,7 @@ import json
import binascii
from Crypto.PublicKey import RSA
-from Crypto.Hash import SHA,SHA512
+from Crypto.Hash import SHA512
from Crypto.Signature import PKCS1_v1_5
current_iden = None
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index db23c8a..2a25323 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -2,7 +2,9 @@ import json
import uuid
import os
import datetime
+import ssl
+from Crypto.Hash import SHA512
import tornado.ioloop
import tornado.stack_context
@@ -38,7 +40,7 @@ class Connection:
class FileResult():
def __init__(self,filekey):
self.filekey = filekey
- self._gr = None
+ self._retid = None
self._result = None
def ret_result(self,res):
@@ -46,13 +48,13 @@ class FileResult():
return
self._result = res
- if self._gr != None:
- self._gr.switch()
+ if self._retid != None:
+ async.ret(self._retid)
def wait(self):
if self._result == None:
- self._gr = async.current()
- async.switchtop()
+ self._retid = async.get_retid()
+ async.switch_top()
return self._result
@@ -81,9 +83,6 @@ class Proxy:
self._info_filekeymap = {}
- self._check_waitcaller_timer = tornado.ioloop.PeriodicCallback(self._check_waitcaller,1000)
- self._check_waitcaller_timer.start()
-
Proxy.instance = self
self.register_call('imc/','pend_recvfile',self._pend_recvfile)
@@ -112,30 +111,16 @@ class Proxy:
del conn.link_linkidmap[linkid]
def del_conn(self,conn):
- wait_map = self._conn_retidmap[conn.linkid]
- waits = wait_map.items()
- wait_ret = []
- for retid,wait in waits:
- wait_ret.append((wait['caller_linkid'],retid))
-
- for linkid,retid in wait_ret:
- self._ret_call(linkid,retid,(False,'Eclose'))
-
- fail_map = self._conn_filekeymap[conn.linkid]
- fails = fail_map.values()
- fail_cb = []
- for callback in fails:
- fail_cb.append(callback)
-
- for callback in fail_cb:
+ callbacks = list(self._conn_retidmap[conn.linkid].values())
+ for callback in callbacks:
+ callback((False,'Eclose'))
+
+ callbacks = list(self._conn_filekeymap[conn.linkid].values())
+ for callback in callbacks:
callback('Eclose')
- linkids = conn.link_linkidmap.keys()
- link_del = []
+ linkids = list(conn.link_linkidmap.keys())
for linkid in linkids:
- link_del.append(linkid)
-
- for linkid in link_del:
self.unlink_conn(linkid)
del self._conn_linkidmap[conn.linkid]
@@ -152,20 +137,11 @@ class Proxy:
def register_call(self,path,func_name,func):
self._call_pathmap[''.join([path,func_name])] = func
- def call(self,caller_grid,timeout,idendesc,dst,func_name,param):
- caller_retid = ''.join([self._linkid,'/',caller_grid])
- return self._route_call(None,caller_retid,timeout,idendesc,dst,func_name,param)
+ def call(self,timeout,idendesc,dst,func_name,param):
+ return self._route_call(None,async.get_retid(),timeout,idendesc,dst,func_name,param)
def sendfile(self,dst_link,filepath):
- @async.callee
- def _call(_grid):
- self.call(_grid,
- 10000,
- self._idendesc,
- dst_link + 'imc/','pend_recvfile',
- {'filekey':filekey,'filesize':filesize})
-
- filekey = str(uuid.uuid1())
+ filekey = SHA512.new(uuid.uuid1().bytes + ssl.RAND_bytes(64)).hexdigest()
filesize = os.stat(filepath).st_size
fileresult = FileResult(filekey)
@@ -177,7 +153,7 @@ class Proxy:
'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout'))
}
- _call()
+ self.call(10000,self._idendesc,dst_link + 'imc/','pend_recvfile',{'filekey':filekey,'filesize':filesize})
return fileresult
@@ -193,7 +169,7 @@ class Proxy:
in_conn.abort_file(filekey)
self._send_msg_abortfile(in_conn,filekey,err)
- self._ret_sendfile(filekey,err)
+ self._ioloop.add_callback(self._ret_sendfile,filekey,err)
try:
info = self._info_filekeymap[filekey]
@@ -213,14 +189,6 @@ class Proxy:
return info['fileresult']
def rejectfile(self,filekey):
- @async.callee
- def _call(_grid):
- self.call(_grid,
- 10000,
- self._idendesc,
- dst_link + 'imc/','reject_sendfile',
- {'filekey':filekey})
-
try:
info = self._info_filekeymap.pop(filekey)
@@ -228,14 +196,18 @@ class Proxy:
return
dst_link = ''.join(['/',info['src_linkclass'],'/',info['src_linkid'],'/'])
- _call()
+ self.call(10000,self._idendesc,dst_link + 'imc/','reject_sendfile',{'filekey':filekey})
def _route_call(self,in_conn,caller_retid,timeout,idendesc,dst,func_name,param):
def __add_wait_caller(in_linkid):
- self._conn_retidmap[in_linkid][caller_retid] = {
- 'caller_linkid':caller_linkid,
- 'timeout':timeout
- }
+ def ___call(result):
+ self._ioloop.remove_timeout(timer)
+ self._ret_call(caller_linkid,caller_retid,result)
+
+ callback = tornado.stack_context.wrap(___call)
+ timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = timeout),lambda : callback(('False','Etimeout')))
+
+ self._conn_retidmap[in_linkid][caller_retid] = callback
def __ret(result):
if caller_linkid == self._linkid:
@@ -263,7 +235,6 @@ class Proxy:
dst_path = dst_part[3]
caller_linkid = iden['linkid']
- assert caller_retid.split('/',1)[0] == caller_linkid
if dst_linkid == self._linkid:
__add_wait_caller(self._linkid)
@@ -290,7 +261,8 @@ class Proxy:
__add_wait_caller(conn.linkid)
self._send_msg_call(conn,caller_retid,timeout,idendesc,dst,func_name,param)
- result = async.switchtop()
+ result = async.switch_top()
+
del self._conn_retidmap[conn.linkid][caller_retid]
return __ret(result)
@@ -308,8 +280,7 @@ class Proxy:
self._send_msg_ret(conn,caller_linkid,caller_retid,result)
if caller_linkid == self._linkid:
- grid = caller_retid.split('/',1)[1]
- async.retcall(grid,result)
+ async.ret(caller_retid,result)
else:
__ret_remote()
@@ -326,7 +297,7 @@ class Proxy:
out_conn.abort_file(filekey)
self._send_msg_abortfile(out_conn,filekey,err)
- self._ret_sendfile(filekey,err)
+ self._ioloop.add_callback(self._ret_sendfile,filekey,err)
def __bridge_fail_cb(err):
try:
@@ -352,22 +323,17 @@ class Proxy:
if src_linkid == self._linkid:
try:
info = self._info_filekeymap[filekey]
- assert info['filesize'] == filesize
+ if info['filesize'] != filesize:
+ self._ioloop.add_callback(self._ret_sendfile,filekey,'Efilesize')
except KeyError:
- self._ret_sendfile(filekey,'Enoexist')
- return
-
- except AssertionError:
- self._ret_sendfile(filekey,'Efilesize')
+ self._ioloop.add_callback(self._ret_sendfile,filekey,'Enoexist')
return
self._add_wait_filekey(filekey,filesize,None,out_conn,__send_fail_cb)
out_conn.send_file(filekey,info['filepath'],lambda : self._ret_sendfile(filekey))
else:
- print('test start')
-
in_conn = self._request_conn(src_linkid)
self._add_wait_filekey(filekey,filesize,in_conn,out_conn,__bridge_fail_cb)
@@ -375,6 +341,20 @@ class Proxy:
in_conn.recv_filedata(filekey,filesize,send_fn)
self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
+
+ def _add_wait_filekey(self,filekey,filesize,in_conn,out_conn,fail_callback):
+ def __call(err):
+ self._ioloop.remove_timeout(timer)
+ fail_callback(err)
+
+ callback = tornado.stack_context.wrap(__call)
+ timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout'))
+
+ if in_conn != None:
+ self._conn_filekeymap[in_conn.linkid][filekey] = callback
+
+ if out_conn != None:
+ self._conn_filekeymap[out_conn.linkid][filekey] = callback
def _ret_sendfile(self,filekey,err = None):
try:
@@ -387,10 +367,10 @@ class Proxy:
fileresult = info['fileresult']
if err == None:
- self._ioloop.add_callback(lambda : fileresult.ret_result('Success'))
+ fileresult.ret_result('Success')
else:
- self._ioloop.add_callback(lambda : fileresult.ret_result(err))
+ fileresult.ret_result(err)
def _request_conn(self,linkid):
try:
@@ -404,6 +384,10 @@ class Proxy:
return conn
+ def _conn_close_cb(self,conn):
+ self.del_conn(conn)
+ print('connection close')
+
def _recv_dispatch(self,conn,data):
msg = json.loads(data.decode('utf-8'))
msg_type = msg['type']
@@ -420,38 +404,6 @@ class Proxy:
elif msg_type == self.MSGTYPE_ABORTFILE:
self._recv_msg_abortfile(conn,msg)
- def _conn_close_cb(self,conn):
- self.del_conn(conn)
- print('connection close')
-
- def _add_wait_filekey(self,filekey,filesize,in_conn,out_conn,fail_callback):
- def __call(err):
- self._ioloop.remove_timeout(timer)
- fail_callback(err)
-
- callback = tornado.stack_context.wrap(__call)
- timer = self._ioloop.add_timeout(datetime.timedelta(milliseconds = filesize),lambda : callback('Etimeout'))
-
- if in_conn != None:
- self._conn_filekeymap[in_conn.linkid][filekey] = callback
-
- if out_conn != None:
- self._conn_filekeymap[out_conn.linkid][filekey] = callback
-
- def _check_waitcaller(self):
- wait_maps = self._conn_retidmap.values()
- for wait_map in wait_maps:
- waits = wait_map.items()
- wait_ret = []
- for retid,wait in waits:
- wait['timeout'] -= 1000
-
- if wait['timeout'] <= 0:
- wait_ret.append((wait['caller_linkid'],retid))
-
- for linkid,retid in wait_ret:
- self._ret_call(linkid,retid,(False,'Etimeout'))
-
def _send_msg_call(self,conn,caller_retid,timeout,idendesc,dst,func_name,param):
msg = {
'type':self.MSGTYPE_CALL,
@@ -558,11 +510,10 @@ class Proxy:
@async.caller
def _reject_sendfile(self,iden,param):
filekey = param['filekey']
- self._ret_sendfile(filekey,'Ereject')
+ self._ioloop.add_callback(self._ret_sendfile,filekey,'Ereject')
-@async.callee
-def imc_call(idendesc,dst,func_name,param,_grid):
- return Proxy.instance.call(_grid,1000000,idendesc,dst,func_name,param)
+def imc_call(idendesc,dst,func_name,param):
+ return Proxy.instance.call(1000000,idendesc,dst,func_name,param)
def imc_call_async(idendesc,dst,func_name,param,callback = None):
@async.caller
diff --git a/src/py/tojauth.py b/src/py/tojauth.py
index 0f26d15..11877ff 100644
--- a/src/py/tojauth.py
+++ b/src/py/tojauth.py
@@ -79,13 +79,13 @@ class TOJAuth(Auth):
return wrapper
- def create_access(self):
+ def create_access(self, owner_idenid):
self.check_access(self.auth_accessid, self.ACCESS_EXECUTE)(0)
cur = self.db.cursor()
sqlstr = ('INSERT INTO "ACCESS" ("owner_idenid") VALUES (%s) '
'RETURNING "accessid";')
- sqlarr = (self.current_iden['idenid'], )
+ sqlarr = (owner_idenid, )
cur.execute(sqlstr, sqlarr)
for data in cur: