aboutsummaryrefslogtreecommitdiffstats
path: root/src/py
diff options
context:
space:
mode:
Diffstat (limited to 'src/py')
-rw-r--r--src/py/backend_server.py22
-rw-r--r--src/py/center_server.py6
-rw-r--r--src/py/imc/__init__.py1
-rw-r--r--src/py/imc/nonblock.py (renamed from src/py/nonblock.py)2
-rwxr-xr-x[-rw-r--r--]src/py/imc/proxy.py (renamed from src/py/imcproxy.py)74
-rw-r--r--src/py/netio.py4
6 files changed, 92 insertions, 17 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 544f647..28480b8 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -8,7 +8,8 @@ import tornado.iostream
import tornado.ioloop
import netio
-import imcproxy
+import imc.nonblock
+from imc.proxy import Proxy,Connection,imc_call,imc_register_call
class BackendWorker():
def __init__(self,center_addr):
@@ -17,7 +18,6 @@ class BackendWorker():
self.linkclass = 'backend'
self.linkid = None
- self.imc_proxy = imcproxy.IMCProxy()
def start(self):
self._conn_center()
@@ -32,16 +32,17 @@ class BackendWorker():
info = json.loads(data.decode('utf-8'))
self.linkid = info['linkid']
+ Proxy(self.linkid)
+
self.center_conn = netio.SocketConnection(info['center_linkid'],stream)
self.center_conn.add_close_callback(lambda conn : __retry())
- self.imc_proxy.add_conn(self.center_conn)
+ Proxy.instance.add_conn(self.center_conn)
print('/backend/' + self.linkid)
- def ___tmp(genid):
- print(genid)
- self.imc_proxy._send_msg_call(self.center_conn,5000,13,___tmp,None,None,'Hello',None)
+ imc_register_call('','test_dst',self._test_dst)
+ self._test_call(None)
netio.send_pack(stream,bytes(json.dumps({
'linkclass':self.linkclass,
@@ -52,6 +53,15 @@ class BackendWorker():
stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
stream.set_close_callback(__retry)
stream.connect(self.center_addr,lambda : __send_worker_info())
+
+ @imc.nonblock.func
+ def _test_call(self,param):
+ print('test')
+ imc_call(None,'/backend/' + self.linkid,'test_dst','Hello')
+
+ @imc.nonblock.func
+ def _test_dst(self,param):
+ print('dst')
if __name__ == '__main__':
backend_worker = BackendWorker(('localhost',5730))
diff --git a/src/py/center_server.py b/src/py/center_server.py
index cb4ef75..2e03ccd 100644
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -10,7 +10,7 @@ import tornado.httpserver
import tornado.web
import netio
-import imcproxy
+from imc.proxy import Proxy,Connection
class Worker:
def __init__(self,stream,linkclass,linkid,worker_ip):
@@ -28,7 +28,7 @@ class Worker:
conn = netio.SocketConnection(self.linkid,self.stream)
conn.add_close_callback(lambda conn : self.close())
- center_serv.imc_proxy.add_conn(conn)
+ Proxy.instance.add_conn(conn)
def close(self):
pass
@@ -58,9 +58,9 @@ class CenterServer(tornado.tcpserver.TCPServer):
self.linkid_usemap = {}
self.backend_workerlist = []
- self.imc_proxy = imcproxy.IMCProxy()
self.linkclass = 'center'
self.linkid = self._create_linkid()
+ Proxy(self.linkid)
print('/center/' + self.linkid)
diff --git a/src/py/imc/__init__.py b/src/py/imc/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/py/imc/__init__.py
@@ -0,0 +1 @@
+
diff --git a/src/py/nonblock.py b/src/py/imc/nonblock.py
index 8d21cc4..8726c49 100644
--- a/src/py/nonblock.py
+++ b/src/py/imc/nonblock.py
@@ -8,7 +8,7 @@ def call(f):
global gen_current_id
global gen_waitmap
- kwargs['genid'] = gen_current_id
+ kwargs['_genid'] = gen_current_id
return f(*args,**kwargs)
return wrapper
diff --git a/src/py/imcproxy.py b/src/py/imc/proxy.py
index 25784f4..0d6a942 100644..100755
--- a/src/py/imcproxy.py
+++ b/src/py/imc/proxy.py
@@ -3,7 +3,9 @@ import json
import tornado.ioloop
import tornado.stack_context
-class IMCConnection:
+import nonblock
+
+class Connection:
def __init__(self,linkid):
self.linkid = linkid
self._close_callback = []
@@ -21,10 +23,13 @@ class IMCConnection:
for callback in self._close_callback:
callback(self)
-class IMCProxy:
- def __init__(self):
+class Proxy:
+ def __init__(self,linkid):
+ self._linkid = linkid
+
self._conn_linkidmap = {}
self._conn_waitretmap = {}
+ self._call_pathmap = {}
self.MSGTYPE_CALL = 'call'
self.MSGTYPE_RET = 'ret'
@@ -32,6 +37,8 @@ class IMCProxy:
self._check_waitret_timer = tornado.ioloop.PeriodicCallback(self._check_waitret,1000)
self._check_waitret_timer.start()
+ Proxy.instance = self
+
def add_conn(self,conn):
self._conn_linkidmap[conn.linkid] = conn
self._conn_waitretmap[conn.linkid] = {}
@@ -44,16 +51,45 @@ class IMCProxy:
del self._conn_waitretmap[conn.linkid]
def get_conn(self,linkid):
- if linkid not in self.conn_linkidmap:
+ if linkid not in self._conn_linkidmap:
return None
- return self.conn_linkidmap[linkid]
+ return self._conn_linkidmap[linkid]
+
+ def call(self,genid,iden,dst,func_name,param):
+ def _fail_cb(genid):
+ print('Opps')
+
+ self._route_call(genid,_fail_cb,iden,dst,func_name,param)
+
+ def register_call(self,path,func_name,func):
+ self._call_pathmap[''.join([path,'/',func_name])] = func
+
+ def _route_call(self,genid,fail_callback,iden,dst,func_name,param):
+ dst_part = dst.split('/')[1:]
+ linkid = dst_part[1]
+ path = ''.join(dst_part[2:])
+
+ if linkid == self._linkid:
+ self._handle_call(genid,fail_callback,iden,path,func_name,param)
+ else:
+ conn = self.get_conn(linkid)
+ if conn == None:
+ pass
+
+ def _handle_call(self,genid,fail_callback,iden,path,func_name,param):
+ try:
+ self._call_pathmap[''.join([path,'/',func_name])](param)
+ except KeyError:
+ fail_callback(genid)
def _recvloop_dispatch(self,conn,data):
msg = json.loads(data.decode('utf-8'))
msg_type = msg['type']
if msg_type == self.MSGTYPE_CALL:
self._recv_msg_call(conn,msg)
+ elif msg_type == self.MSGTYPE_RET:
+ self._recv_msg_ret(conn,msg)
def _conn_close_cb(self,conn):
wait_map = self._conn_waitretmap[conn.linkid]
@@ -97,7 +133,35 @@ class IMCProxy:
conn.send_msg(bytes(json.dumps(msg),'utf-8'))
def _recv_msg_call(self,conn,msg):
+ genid = msg['genid']
iden = msg['iden']
dst = msg['dst']
func = msg['func']
param = msg['param']
+
+ print(genid)
+
+ self._send_msg_ret(conn,genid,'Hello')
+
+ def _send_msg_ret(self,conn,genid,retvalue):
+ msg = {
+ 'type':self.MSGTYPE_RET,
+ 'genid':genid,
+ 'retvalue':retvalue
+ }
+ conn.send_msg(bytes(json.dumps(msg),'utf-8'))
+
+ def _recv_msg_ret(self,conn,msg):
+ genid = msg['genid']
+ retvalue = msg['retvalue']
+
+ self._conn_waitretmap[conn.linkid].pop(genid)
+
+ print(retvalue)
+
+@nonblock.call
+def imc_call(iden,dst,func_name,param,_genid):
+ Proxy.instance.call(_genid,iden,dst,func_name,param)
+
+def imc_register_call(path,func_name,func):
+ Proxy.instance.register_call(path,func_name,func)
diff --git a/src/py/netio.py b/src/py/netio.py
index a08c248..b7454b5 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -3,7 +3,7 @@ import struct
import tornado.ioloop
import tornado.stack_context
-import imcproxy
+from imc.proxy import Connection
def send_pack(stream,data):
stream.write(struct.pack('l',len(data)) + data)
@@ -15,7 +15,7 @@ def recv_pack(stream,callback):
stream.read_bytes(8,_recv_size)
-class SocketConnection(imcproxy.IMCConnection):
+class SocketConnection(Connection):
def __init__(self,linkid,stream):
super().__init__(linkid)