diff options
author | pzread <netfirewall@gmail.com> | 2013-05-08 14:53:00 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-08 14:53:00 +0800 |
commit | 026f1974af083ddc46f3b0438cdb553923a289c6 (patch) | |
tree | 3e2695e933d2bce80b0b5baa2b2800cdabcf3e59 | |
parent | 41e592ce5585bd078280e79723391b7e78c0fb1a (diff) | |
download | taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.gz taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.bz2 taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.lz taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.xz taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.zst taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.zip |
Tmp backup
-rw-r--r-- | src/py/backend_server.py | 6 | ||||
-rw-r--r-- | src/py/center_server.py | 9 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 102 |
3 files changed, 79 insertions, 38 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 28480b8..6081a6a 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -56,12 +56,12 @@ class BackendWorker(): @imc.nonblock.func def _test_call(self,param): - print('test') - imc_call(None,'/backend/' + self.linkid,'test_dst','Hello') + ret = (yield imc_call(None,'/backend/' + self.center_conn.linkid,'test_dst','Hello')) + print(ret) @imc.nonblock.func def _test_dst(self,param): - print('dst') + return 'Hello Too' if __name__ == '__main__': backend_worker = BackendWorker(('localhost',5730)) diff --git a/src/py/center_server.py b/src/py/center_server.py index 2e03ccd..0fecb23 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -10,7 +10,8 @@ import tornado.httpserver import tornado.web import netio -from imc.proxy import Proxy,Connection +import imc.nonblock +from imc.proxy import Proxy,Connection,imc_register_call class Worker: def __init__(self,stream,linkclass,linkid,worker_ip): @@ -64,6 +65,8 @@ class CenterServer(tornado.tcpserver.TCPServer): print('/center/' + self.linkid) + imc_register_call('','test_dst',self._test_dst) + def handle_stream(self,stream,address): def _recv_worker_info(data): worker_info = json.loads(data.decode('utf-8')) @@ -99,6 +102,10 @@ class CenterServer(tornado.tcpserver.TCPServer): return linkid + @imc.nonblock.func + def _test_dst(self,param): + return 'Hello Too' + class WebConnHandler(tornado.web.RequestHandler): def get(self): global center_serv diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index 0d6a942..371285f 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -3,7 +3,7 @@ import json import tornado.ioloop import tornado.stack_context -import nonblock +from imc import nonblock class Connection: def __init__(self,linkid): @@ -25,30 +25,36 @@ class Connection: class Proxy: def __init__(self,linkid): + self._ioloop = tornado.ioloop.IOLoop.instance() self._linkid = linkid self._conn_linkidmap = {} - self._conn_waitretmap = {} + self._caller_genidmap = {self._linkid:{}} self._call_pathmap = {} self.MSGTYPE_CALL = 'call' self.MSGTYPE_RET = 'ret' - self._check_waitret_timer = tornado.ioloop.PeriodicCallback(self._check_waitret,1000) - self._check_waitret_timer.start() + self._check_waitcaller_timer = tornado.ioloop.PeriodicCallback(self._check_waitcaller,1000) + self._check_waitcaller_timer.start() Proxy.instance = self def add_conn(self,conn): self._conn_linkidmap[conn.linkid] = conn - self._conn_waitretmap[conn.linkid] = {} + self._caller_genidmap[conn.linkid] = {} conn.add_close_callback(self._conn_close_cb) conn.start_recvloop(self._recvloop_dispatch) def del_conn(self,conn): + wait_map = self._caller_genidmap[conn.linkid] + wait_genids = wait_map.keys() + for genid in wait_genids: + wait_map[genid]['callback'](genid,'close',None) + del self._conn_linkidmap[conn.linkid] - del self._conn_waitretmap[conn.linkid] + del self._caller_genidmap[conn.linkid] def get_conn(self,linkid): if linkid not in self._conn_linkidmap: @@ -56,32 +62,50 @@ class Proxy: return self._conn_linkidmap[linkid] - def call(self,genid,iden,dst,func_name,param): - def _fail_cb(genid): + def call(self,genid,timeout,iden,dst,func_name,param): + def _call_cb(genid,err,retvalue): print('Opps') - self._route_call(genid,_fail_cb,iden,dst,func_name,param) + try: + stat,retvalue = self._route_call(genid,iden,dst,func_name,param) + if stat == True: + self._ioloop.add_callback(nonblock.retcall,genid,retvalue) + else: + self._add_waitcaller(self._linkid,genid,timeout,_call_cb) + + except Exception as err: + _call_cb(genid,err,None) def register_call(self,path,func_name,func): self._call_pathmap[''.join([path,'/',func_name])] = func - def _route_call(self,genid,fail_callback,iden,dst,func_name,param): + def _route_call(self,genid,iden,dst,func_name,param): dst_part = dst.split('/')[1:] linkid = dst_part[1] path = ''.join(dst_part[2:]) if linkid == self._linkid: - self._handle_call(genid,fail_callback,iden,path,func_name,param) + stat,retvalue = self._handle_call(genid,iden,path,func_name,param) + if stat == True: + ret = (True,retvalue) + else: + ret = (False,self._linkid) + else: conn = self.get_conn(linkid) if conn == None: pass - def _handle_call(self,genid,fail_callback,iden,path,func_name,param): + self._send_msg_call(conn,genid,iden,dst,func_name,param) + ret = (False,conn.linkid) + + return ret + + def _handle_call(self,genid,iden,path,func_name,param): try: - self._call_pathmap[''.join([path,'/',func_name])](param) + return self._call_pathmap[''.join([path,'/',func_name])](param) except KeyError: - fail_callback(genid) + raise Exception('notexist') def _recvloop_dispatch(self,conn,data): msg = json.loads(data.decode('utf-8')) @@ -92,16 +116,18 @@ class Proxy: self._recv_msg_ret(conn,msg) def _conn_close_cb(self,conn): - wait_map = self._conn_waitretmap[conn.linkid] - wait_genids = wait_map.keys() - for genid in wait_genids: - wait_map[genid]['fail_callback'](genid) - self.del_conn(conn) print('connection close') - def _check_waitret(self): - wait_maps = self._conn_waitretmap.values() + def _add_waitcaller(linkid,genid,timeout,callback): + wait = { + 'timeout':timeout, + 'callback':tornado.stack_context.wrap(callback) + } + self._caller_genidmap[linkid][genid] = wait + + def _check_waitcaller(self): + wait_maps = self._caller_genidmap.values() for wait_map in wait_maps: wait_genids = wait_map.keys() wait_del = [] @@ -110,38 +136,45 @@ class Proxy: wait['timeout'] -= 1000 if wait['timeout'] <= 0: - wait['fail_callback'](genid) + wait['callback'](genid,'timeout',None) wait_del.append(genid) for genid in wait_del: del wait_map[genid] - def _send_msg_call(self,conn,timeout,genid,fail_callback,iden,dst,func,param): - wait = { - 'timeout':timeout, - 'fail_callback':tornado.stack_context.wrap(fail_callback) - } + def _send_msg_call(self,conn,genid,iden,dst,func_name,param): msg = { 'type':self.MSGTYPE_CALL, 'genid':genid, 'iden':iden, 'dst':dst, - 'func':func, + 'func_name':func_name, 'param':param } - self._conn_waitretmap[conn.linkid][genid] = wait conn.send_msg(bytes(json.dumps(msg),'utf-8')) def _recv_msg_call(self,conn,msg): genid = msg['genid'] iden = msg['iden'] dst = msg['dst'] - func = msg['func'] + func_name = msg['func_name'] param = msg['param'] - print(genid) + def _call_cb(genid,err,retvalue): + print('Opps') + + try: + stat,retvalue = self._route_call(genid,iden,dst,func_name,param) + if stat == True: + self._send_msg_ret(conn,genid,retvalue) + + else: + pass + + except Exception as err: + _call_cb(genid,err,None) - self._send_msg_ret(conn,genid,'Hello') + #self._send_msg_ret(conn,genid,'Hello') def _send_msg_ret(self,conn,genid,retvalue): msg = { @@ -155,13 +188,14 @@ class Proxy: genid = msg['genid'] retvalue = msg['retvalue'] - self._conn_waitretmap[conn.linkid].pop(genid) + print(self._caller_genidmap) + self._caller_genidmap[conn.linkid].pop(genid) print(retvalue) @nonblock.call def imc_call(iden,dst,func_name,param,_genid): - Proxy.instance.call(_genid,iden,dst,func_name,param) + Proxy.instance.call(_genid,60000,iden,dst,func_name,param) def imc_register_call(path,func_name,func): Proxy.instance.register_call(path,func_name,func) |