aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-08 14:53:00 +0800
committerpzread <netfirewall@gmail.com>2013-05-08 14:53:00 +0800
commit026f1974af083ddc46f3b0438cdb553923a289c6 (patch)
tree3e2695e933d2bce80b0b5baa2b2800cdabcf3e59
parent41e592ce5585bd078280e79723391b7e78c0fb1a (diff)
downloadtaiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar
taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.gz
taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.bz2
taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.lz
taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.xz
taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.tar.zst
taiwan-online-judge-026f1974af083ddc46f3b0438cdb553923a289c6.zip
Tmp backup
-rw-r--r--src/py/backend_server.py6
-rw-r--r--src/py/center_server.py9
-rwxr-xr-xsrc/py/imc/proxy.py102
3 files changed, 79 insertions, 38 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 28480b8..6081a6a 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -56,12 +56,12 @@ class BackendWorker():
@imc.nonblock.func
def _test_call(self,param):
- print('test')
- imc_call(None,'/backend/' + self.linkid,'test_dst','Hello')
+ ret = (yield imc_call(None,'/backend/' + self.center_conn.linkid,'test_dst','Hello'))
+ print(ret)
@imc.nonblock.func
def _test_dst(self,param):
- print('dst')
+ return 'Hello Too'
if __name__ == '__main__':
backend_worker = BackendWorker(('localhost',5730))
diff --git a/src/py/center_server.py b/src/py/center_server.py
index 2e03ccd..0fecb23 100644
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -10,7 +10,8 @@ import tornado.httpserver
import tornado.web
import netio
-from imc.proxy import Proxy,Connection
+import imc.nonblock
+from imc.proxy import Proxy,Connection,imc_register_call
class Worker:
def __init__(self,stream,linkclass,linkid,worker_ip):
@@ -64,6 +65,8 @@ class CenterServer(tornado.tcpserver.TCPServer):
print('/center/' + self.linkid)
+ imc_register_call('','test_dst',self._test_dst)
+
def handle_stream(self,stream,address):
def _recv_worker_info(data):
worker_info = json.loads(data.decode('utf-8'))
@@ -99,6 +102,10 @@ class CenterServer(tornado.tcpserver.TCPServer):
return linkid
+ @imc.nonblock.func
+ def _test_dst(self,param):
+ return 'Hello Too'
+
class WebConnHandler(tornado.web.RequestHandler):
def get(self):
global center_serv
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index 0d6a942..371285f 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -3,7 +3,7 @@ import json
import tornado.ioloop
import tornado.stack_context
-import nonblock
+from imc import nonblock
class Connection:
def __init__(self,linkid):
@@ -25,30 +25,36 @@ class Connection:
class Proxy:
def __init__(self,linkid):
+ self._ioloop = tornado.ioloop.IOLoop.instance()
self._linkid = linkid
self._conn_linkidmap = {}
- self._conn_waitretmap = {}
+ self._caller_genidmap = {self._linkid:{}}
self._call_pathmap = {}
self.MSGTYPE_CALL = 'call'
self.MSGTYPE_RET = 'ret'
- self._check_waitret_timer = tornado.ioloop.PeriodicCallback(self._check_waitret,1000)
- self._check_waitret_timer.start()
+ self._check_waitcaller_timer = tornado.ioloop.PeriodicCallback(self._check_waitcaller,1000)
+ self._check_waitcaller_timer.start()
Proxy.instance = self
def add_conn(self,conn):
self._conn_linkidmap[conn.linkid] = conn
- self._conn_waitretmap[conn.linkid] = {}
+ self._caller_genidmap[conn.linkid] = {}
conn.add_close_callback(self._conn_close_cb)
conn.start_recvloop(self._recvloop_dispatch)
def del_conn(self,conn):
+ wait_map = self._caller_genidmap[conn.linkid]
+ wait_genids = wait_map.keys()
+ for genid in wait_genids:
+ wait_map[genid]['callback'](genid,'close',None)
+
del self._conn_linkidmap[conn.linkid]
- del self._conn_waitretmap[conn.linkid]
+ del self._caller_genidmap[conn.linkid]
def get_conn(self,linkid):
if linkid not in self._conn_linkidmap:
@@ -56,32 +62,50 @@ class Proxy:
return self._conn_linkidmap[linkid]
- def call(self,genid,iden,dst,func_name,param):
- def _fail_cb(genid):
+ def call(self,genid,timeout,iden,dst,func_name,param):
+ def _call_cb(genid,err,retvalue):
print('Opps')
- self._route_call(genid,_fail_cb,iden,dst,func_name,param)
+ try:
+ stat,retvalue = self._route_call(genid,iden,dst,func_name,param)
+ if stat == True:
+ self._ioloop.add_callback(nonblock.retcall,genid,retvalue)
+ else:
+ self._add_waitcaller(self._linkid,genid,timeout,_call_cb)
+
+ except Exception as err:
+ _call_cb(genid,err,None)
def register_call(self,path,func_name,func):
self._call_pathmap[''.join([path,'/',func_name])] = func
- def _route_call(self,genid,fail_callback,iden,dst,func_name,param):
+ def _route_call(self,genid,iden,dst,func_name,param):
dst_part = dst.split('/')[1:]
linkid = dst_part[1]
path = ''.join(dst_part[2:])
if linkid == self._linkid:
- self._handle_call(genid,fail_callback,iden,path,func_name,param)
+ stat,retvalue = self._handle_call(genid,iden,path,func_name,param)
+ if stat == True:
+ ret = (True,retvalue)
+ else:
+ ret = (False,self._linkid)
+
else:
conn = self.get_conn(linkid)
if conn == None:
pass
- def _handle_call(self,genid,fail_callback,iden,path,func_name,param):
+ self._send_msg_call(conn,genid,iden,dst,func_name,param)
+ ret = (False,conn.linkid)
+
+ return ret
+
+ def _handle_call(self,genid,iden,path,func_name,param):
try:
- self._call_pathmap[''.join([path,'/',func_name])](param)
+ return self._call_pathmap[''.join([path,'/',func_name])](param)
except KeyError:
- fail_callback(genid)
+ raise Exception('notexist')
def _recvloop_dispatch(self,conn,data):
msg = json.loads(data.decode('utf-8'))
@@ -92,16 +116,18 @@ class Proxy:
self._recv_msg_ret(conn,msg)
def _conn_close_cb(self,conn):
- wait_map = self._conn_waitretmap[conn.linkid]
- wait_genids = wait_map.keys()
- for genid in wait_genids:
- wait_map[genid]['fail_callback'](genid)
-
self.del_conn(conn)
print('connection close')
- def _check_waitret(self):
- wait_maps = self._conn_waitretmap.values()
+ def _add_waitcaller(linkid,genid,timeout,callback):
+ wait = {
+ 'timeout':timeout,
+ 'callback':tornado.stack_context.wrap(callback)
+ }
+ self._caller_genidmap[linkid][genid] = wait
+
+ def _check_waitcaller(self):
+ wait_maps = self._caller_genidmap.values()
for wait_map in wait_maps:
wait_genids = wait_map.keys()
wait_del = []
@@ -110,38 +136,45 @@ class Proxy:
wait['timeout'] -= 1000
if wait['timeout'] <= 0:
- wait['fail_callback'](genid)
+ wait['callback'](genid,'timeout',None)
wait_del.append(genid)
for genid in wait_del:
del wait_map[genid]
- def _send_msg_call(self,conn,timeout,genid,fail_callback,iden,dst,func,param):
- wait = {
- 'timeout':timeout,
- 'fail_callback':tornado.stack_context.wrap(fail_callback)
- }
+ def _send_msg_call(self,conn,genid,iden,dst,func_name,param):
msg = {
'type':self.MSGTYPE_CALL,
'genid':genid,
'iden':iden,
'dst':dst,
- 'func':func,
+ 'func_name':func_name,
'param':param
}
- self._conn_waitretmap[conn.linkid][genid] = wait
conn.send_msg(bytes(json.dumps(msg),'utf-8'))
def _recv_msg_call(self,conn,msg):
genid = msg['genid']
iden = msg['iden']
dst = msg['dst']
- func = msg['func']
+ func_name = msg['func_name']
param = msg['param']
- print(genid)
+ def _call_cb(genid,err,retvalue):
+ print('Opps')
+
+ try:
+ stat,retvalue = self._route_call(genid,iden,dst,func_name,param)
+ if stat == True:
+ self._send_msg_ret(conn,genid,retvalue)
+
+ else:
+ pass
+
+ except Exception as err:
+ _call_cb(genid,err,None)
- self._send_msg_ret(conn,genid,'Hello')
+ #self._send_msg_ret(conn,genid,'Hello')
def _send_msg_ret(self,conn,genid,retvalue):
msg = {
@@ -155,13 +188,14 @@ class Proxy:
genid = msg['genid']
retvalue = msg['retvalue']
- self._conn_waitretmap[conn.linkid].pop(genid)
+ print(self._caller_genidmap)
+ self._caller_genidmap[conn.linkid].pop(genid)
print(retvalue)
@nonblock.call
def imc_call(iden,dst,func_name,param,_genid):
- Proxy.instance.call(_genid,iden,dst,func_name,param)
+ Proxy.instance.call(_genid,60000,iden,dst,func_name,param)
def imc_register_call(path,func_name,func):
Proxy.instance.register_call(path,func_name,func)