diff options
Diffstat (limited to 'src/py')
-rw-r--r-- | src/py/backend_server.py | 22 | ||||
-rw-r--r-- | src/py/center_server.py | 6 | ||||
-rw-r--r-- | src/py/imc/__init__.py | 1 | ||||
-rw-r--r-- | src/py/imc/nonblock.py (renamed from src/py/nonblock.py) | 2 | ||||
-rwxr-xr-x[-rw-r--r--] | src/py/imc/proxy.py (renamed from src/py/imcproxy.py) | 74 | ||||
-rw-r--r-- | src/py/netio.py | 4 |
6 files changed, 92 insertions, 17 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 544f647..28480b8 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -8,7 +8,8 @@ import tornado.iostream import tornado.ioloop import netio -import imcproxy +import imc.nonblock +from imc.proxy import Proxy,Connection,imc_call,imc_register_call class BackendWorker(): def __init__(self,center_addr): @@ -17,7 +18,6 @@ class BackendWorker(): self.linkclass = 'backend' self.linkid = None - self.imc_proxy = imcproxy.IMCProxy() def start(self): self._conn_center() @@ -32,16 +32,17 @@ class BackendWorker(): info = json.loads(data.decode('utf-8')) self.linkid = info['linkid'] + Proxy(self.linkid) + self.center_conn = netio.SocketConnection(info['center_linkid'],stream) self.center_conn.add_close_callback(lambda conn : __retry()) - self.imc_proxy.add_conn(self.center_conn) + Proxy.instance.add_conn(self.center_conn) print('/backend/' + self.linkid) - def ___tmp(genid): - print(genid) - self.imc_proxy._send_msg_call(self.center_conn,5000,13,___tmp,None,None,'Hello',None) + imc_register_call('','test_dst',self._test_dst) + self._test_call(None) netio.send_pack(stream,bytes(json.dumps({ 'linkclass':self.linkclass, @@ -52,6 +53,15 @@ class BackendWorker(): stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) stream.set_close_callback(__retry) stream.connect(self.center_addr,lambda : __send_worker_info()) + + @imc.nonblock.func + def _test_call(self,param): + print('test') + imc_call(None,'/backend/' + self.linkid,'test_dst','Hello') + + @imc.nonblock.func + def _test_dst(self,param): + print('dst') if __name__ == '__main__': backend_worker = BackendWorker(('localhost',5730)) diff --git a/src/py/center_server.py b/src/py/center_server.py index cb4ef75..2e03ccd 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -10,7 +10,7 @@ import tornado.httpserver import tornado.web import netio -import imcproxy +from imc.proxy import Proxy,Connection class Worker: def __init__(self,stream,linkclass,linkid,worker_ip): @@ -28,7 +28,7 @@ class Worker: conn = netio.SocketConnection(self.linkid,self.stream) conn.add_close_callback(lambda conn : self.close()) - center_serv.imc_proxy.add_conn(conn) + Proxy.instance.add_conn(conn) def close(self): pass @@ -58,9 +58,9 @@ class CenterServer(tornado.tcpserver.TCPServer): self.linkid_usemap = {} self.backend_workerlist = [] - self.imc_proxy = imcproxy.IMCProxy() self.linkclass = 'center' self.linkid = self._create_linkid() + Proxy(self.linkid) print('/center/' + self.linkid) diff --git a/src/py/imc/__init__.py b/src/py/imc/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/py/imc/__init__.py @@ -0,0 +1 @@ + diff --git a/src/py/nonblock.py b/src/py/imc/nonblock.py index 8d21cc4..8726c49 100644 --- a/src/py/nonblock.py +++ b/src/py/imc/nonblock.py @@ -8,7 +8,7 @@ def call(f): global gen_current_id global gen_waitmap - kwargs['genid'] = gen_current_id + kwargs['_genid'] = gen_current_id return f(*args,**kwargs) return wrapper diff --git a/src/py/imcproxy.py b/src/py/imc/proxy.py index 25784f4..0d6a942 100644..100755 --- a/src/py/imcproxy.py +++ b/src/py/imc/proxy.py @@ -3,7 +3,9 @@ import json import tornado.ioloop import tornado.stack_context -class IMCConnection: +import nonblock + +class Connection: def __init__(self,linkid): self.linkid = linkid self._close_callback = [] @@ -21,10 +23,13 @@ class IMCConnection: for callback in self._close_callback: callback(self) -class IMCProxy: - def __init__(self): +class Proxy: + def __init__(self,linkid): + self._linkid = linkid + self._conn_linkidmap = {} self._conn_waitretmap = {} + self._call_pathmap = {} self.MSGTYPE_CALL = 'call' self.MSGTYPE_RET = 'ret' @@ -32,6 +37,8 @@ class IMCProxy: self._check_waitret_timer = tornado.ioloop.PeriodicCallback(self._check_waitret,1000) self._check_waitret_timer.start() + Proxy.instance = self + def add_conn(self,conn): self._conn_linkidmap[conn.linkid] = conn self._conn_waitretmap[conn.linkid] = {} @@ -44,16 +51,45 @@ class IMCProxy: del self._conn_waitretmap[conn.linkid] def get_conn(self,linkid): - if linkid not in self.conn_linkidmap: + if linkid not in self._conn_linkidmap: return None - return self.conn_linkidmap[linkid] + return self._conn_linkidmap[linkid] + + def call(self,genid,iden,dst,func_name,param): + def _fail_cb(genid): + print('Opps') + + self._route_call(genid,_fail_cb,iden,dst,func_name,param) + + def register_call(self,path,func_name,func): + self._call_pathmap[''.join([path,'/',func_name])] = func + + def _route_call(self,genid,fail_callback,iden,dst,func_name,param): + dst_part = dst.split('/')[1:] + linkid = dst_part[1] + path = ''.join(dst_part[2:]) + + if linkid == self._linkid: + self._handle_call(genid,fail_callback,iden,path,func_name,param) + else: + conn = self.get_conn(linkid) + if conn == None: + pass + + def _handle_call(self,genid,fail_callback,iden,path,func_name,param): + try: + self._call_pathmap[''.join([path,'/',func_name])](param) + except KeyError: + fail_callback(genid) def _recvloop_dispatch(self,conn,data): msg = json.loads(data.decode('utf-8')) msg_type = msg['type'] if msg_type == self.MSGTYPE_CALL: self._recv_msg_call(conn,msg) + elif msg_type == self.MSGTYPE_RET: + self._recv_msg_ret(conn,msg) def _conn_close_cb(self,conn): wait_map = self._conn_waitretmap[conn.linkid] @@ -97,7 +133,35 @@ class IMCProxy: conn.send_msg(bytes(json.dumps(msg),'utf-8')) def _recv_msg_call(self,conn,msg): + genid = msg['genid'] iden = msg['iden'] dst = msg['dst'] func = msg['func'] param = msg['param'] + + print(genid) + + self._send_msg_ret(conn,genid,'Hello') + + def _send_msg_ret(self,conn,genid,retvalue): + msg = { + 'type':self.MSGTYPE_RET, + 'genid':genid, + 'retvalue':retvalue + } + conn.send_msg(bytes(json.dumps(msg),'utf-8')) + + def _recv_msg_ret(self,conn,msg): + genid = msg['genid'] + retvalue = msg['retvalue'] + + self._conn_waitretmap[conn.linkid].pop(genid) + + print(retvalue) + +@nonblock.call +def imc_call(iden,dst,func_name,param,_genid): + Proxy.instance.call(_genid,iden,dst,func_name,param) + +def imc_register_call(path,func_name,func): + Proxy.instance.register_call(path,func_name,func) diff --git a/src/py/netio.py b/src/py/netio.py index a08c248..b7454b5 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -3,7 +3,7 @@ import struct import tornado.ioloop import tornado.stack_context -import imcproxy +from imc.proxy import Connection def send_pack(stream,data): stream.write(struct.pack('l',len(data)) + data) @@ -15,7 +15,7 @@ def recv_pack(stream,callback): stream.read_bytes(8,_recv_size) -class SocketConnection(imcproxy.IMCConnection): +class SocketConnection(Connection): def __init__(self,linkid,stream): super().__init__(linkid) |