From 172b58b3f60c4b0277ad85fab728a5911bd573ac Mon Sep 17 00:00:00 2001 From: pzread Date: Thu, 16 May 2013 00:10:42 +0800 Subject: Fix some bug --- src/js/imc.js | 207 +++++++++++++++++++++++++++++++++++++++++++++++ src/py/backend_server.py | 6 ++ src/py/center_server.py | 21 +++-- src/py/imc/nonblock.py | 34 +++++--- src/test/imc.js | 207 ----------------------------------------------- src/test/wstest.js | 75 +++++++++-------- 6 files changed, 285 insertions(+), 265 deletions(-) create mode 100644 src/js/imc.js delete mode 100644 src/test/imc.js (limited to 'src') diff --git a/src/js/imc.js b/src/js/imc.js new file mode 100644 index 0000000..98f2bdb --- /dev/null +++ b/src/js/imc.js @@ -0,0 +1,207 @@ +var __extend__ = function(child,parent){ + child.prototype.__super__ = parent; +}; + +var imc = new function(){ + this.Connection = function(linkid){ + var that = this; + + that.link_linkidmap = {}; + that.close_callback = []; + that.linkid = linkid; + + that.send_msg = function(data){}; + that.start_recv = function(recv_callback){}; + + that.close = function(){ + var i; + + for(i = 0;i < that.close_callback.length;i++){ + that.close_callback[i](that); + } + }; + }; + + this.Proxy = function(linkid,connect_linkid){ + var MSGTYPE_CALL = 'call'; + var MSGTYPE_RET = 'ret'; + + var that = this; + var caller_retid_count = 0; + var conn_linkidmap = {}; + var conn_retidmap = {}; + var call_pathmap = {}; + + var route_call = function(caller_retid,timeout,iden,dst,func_name,param,callback){ + var i; + var part; + var dst_linkid; + var dst_path; + var caller_linkid; + var func; + + var _add_wait_caller = function(conn_linkid){ + conn_retidmap[conn_linkid][caller_retid] = { + 'timeout':timeout, + 'callback':callback + } + }; + + part = dst.split('/'); + dst_linkid = part[2]; + dst_path = part.slice(3).join('/'); + + caller_linkid = iden.linkid + if(caller_retid.split('/')[0] != caller_linkid){ + return false; + } + + if(dst_linkid == linkid){ + if((func = call_pathmap[dst_path + func_name]) != undefined){ + _add_wait_caller(linkid); + + func(param,function(data){ + if(linkid in conn_retidmap && caller_retid in conn_retidmap[linkid]){ + delete conn_retidmap[linkid][caller_retid]; + callback({'stat':true,'data':data}); + } + }); + }else{ + callback({'stat':true,'data':'Enoexist'}); + } + }else{ + that.request_conn(dst_linkid,function(conn){ + if(caller_linkid == linkid){ + _add_wait_caller(conn.linkid); + } + + send_msg_call(conn,caller_retid,timeout,iden,dst,func_name,param); + }); + } + }; + + var recv_dispatch = function(conn,data){ + msgo = JSON.parse(data); + if(msgo.type == MSGTYPE_CALL){ + recv_msg_call(conn,msgo); + }else if(msgo.type == MSGTYPE_RET){ + recv_msg_ret(conn,msgo); + } + }; + + var send_msg_call = function(conn,caller_retid,timeout,iden,dst,func_name,param){ + msg = { + 'type':MSGTYPE_CALL, + 'caller_retid':caller_retid, + 'timeout':timeout, + 'iden':iden, + 'dst':dst, + 'func_name':func_name, + 'param':param + }; + + conn.send_msg(JSON.stringify(msg)); + }; + var recv_msg_call = function(conn,msg){ + var caller_retid = msg.caller_retid; + var timeout = msg.timeout; + var iden = msg.iden; + var dst = msg.dst; + var func_name = msg.func_name; + var param = msg.param; + var caller_linkid = iden.linkid; + + route_call(caller_retid,timeout,iden,dst,func_name,param,function(result){ + that.request_conn(caller_retid,function(conn){ + send_msg_ret(conn,caller_linkid,caller_retid,result); + }); + }); + }; + + var send_msg_ret = function(conn,caller_linkid,caller_retid,result){ + msg = { + 'type':MSGTYPE_RET, + 'caller_linkid':caller_linkid, + 'caller_retid':caller_retid, + 'result':result + }; + + conn.send_msg(JSON.stringify(msg)); + }; + var recv_msg_ret = function(conn,msg){ + var caller_linkid = msg['caller_linkid']; + var caller_retid = msg['caller_retid']; + var result = msg['result']; + + if(caller_linkid == linkid){ + if(conn.linkid in conn_retidmap && caller_retid in conn_retidmap[conn.linkid]){ + wait = conn_retidmap[conn.linkid][caller_retid]; + delete conn_retidmap[conn.linkid][caller_retid]; + + wait.callback(result); + } + }else{ + request_conn(caller_linkid,function(conn){ + send_msg_ret(conn,caller_linkid,caller_retid,result); + }); + } + }; + + that.add_conn = function(conn){ + conn_linkidmap[conn.linkid] = conn; + conn_retidmap[conn.linkid] = {}; + conn.start_recv(recv_dispatch); + }; + that.link_conn = function(linkid,conn){ + conn.link_linkidmap[linkid] = true; + conn_linkidmap[linkid] = conn; + }; + that.unlink_conn = function(linkid){ + conn = conn_linkidmap[linkid]; + delete conn_linkidmap[linkid]; + delete conn.link_linkidmap[linkid]; + }; + that.del_conn = function(conn){ + delete conn_linkidmap[conn.linkid]; + }; + that.request_conn = function(linkid,callback){ + var _conn_cb = function(conn){ + if(conn != null && conn.linkid != linkid){ + that.link_conn(linkid,conn); + } + + callback(conn); + }; + + conn = conn_linkidmap[linkid]; + if(conn != undefined){ + _conn_cb(conn); + }else{ + connect_linkid(linkid,_conn_cb); + } + }; + + that.call = function(iden,timeout,dst,func_name,param,callback){ + caller_retid = linkid + '/' + caller_retid_count; + caller_retid_count += 1; + + route_call(caller_retid,timeout,iden,dst,func_name,param,callback); + }; + + that.register_call = function(path,func_name,func){ + call_pathmap[path + func_name] = func; + }; + + conn_retidmap[linkid] = {}; + + imc.Proxy.instance = that; + }; + +}; + +function imc_call(iden,dst,func_name,param,callback){ + imc.Proxy.instance.call(iden,10000,dst,func_name,param,callback); +} +function imc_register_call(path,func_name,func){ + imc.Proxy.instance.register_call(path,func_name,func); +} diff --git a/src/py/backend_server.py b/src/py/backend_server.py index f750a1e..0334417 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -90,6 +90,8 @@ class BackendWorker(tornado.tcpserver.TCPServer): def _conn_center(self): def __retry(): print('retry connect center') + + self.center_conn = None self._ioloop.add_timeout(datetime.timedelta(seconds = 5),self._conn_center) def __send_worker_info(): @@ -161,6 +163,10 @@ class BackendWorker(tornado.tcpserver.TCPServer): }),'utf-8')) netio.recv_pack(stream,___recv_cb) + if self.center_conn == None: + callback(None) + return + stat,ret = (yield imc_call(self._iden,'/center/' + self.center_conn.linkid + '/','lookup_linkid',linkid)) if stat == False or ret == None: diff --git a/src/py/center_server.py b/src/py/center_server.py index a5dac27..322f681 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -55,7 +55,7 @@ class CenterServer(tornado.tcpserver.TCPServer): self._linkid_usemap = {} self._worker_linkidmap = {} - self._client_linkidmap = {} + self._backend_clientmap = {} self._backend_workerlist = [] self.linkclass = 'center' @@ -84,10 +84,12 @@ class CenterServer(tornado.tcpserver.TCPServer): def add_backend_worker(self,backend): self._worker_linkidmap[backend.linkid] = backend + self._backend_clientmap[backend.linkid] = {} self._backend_workerlist.append(backend) def del_backend_worker(self,backend): self._worker_linkidmap.pop(backend.linkid,None) + del self._backend_clientmap[backend.linkid] self._backend_workerlist.remove(backend) def dispatch_client(self): @@ -128,10 +130,10 @@ class CenterServer(tornado.tcpserver.TCPServer): @imc.nonblock.func def _add_client(self,iden,param): - backend_linkid = param['backend_linkid'] + backend_linkid = iden['linkid'] client_linkid = param['client_linkid'] - self._client_linkidmap[client_linkid] = True + self._backend_clientmap[backend_linkid][client_linkid] = True conn = Proxy.instance.get_conn(backend_linkid) Proxy.instance.link_conn(client_linkid,conn) @@ -141,12 +143,15 @@ class CenterServer(tornado.tcpserver.TCPServer): @imc.nonblock.func def _del_client(self,iden,param): + backend_linkid = iden['linkid'] client_linkid = param - del self._client_linkidmap[client_linkid] + del self._backend_clientmap[backend_linkid][client_linkid] conn = Proxy.instance.get_conn(client_linkid) Proxy.instance.unlink_conn(client_linkid) + + @imc.nonblock.func def _test_dst(self,iden,param): #stat,ret = (yield imc_call( @@ -157,9 +162,11 @@ class CenterServer(tornado.tcpserver.TCPServer): #)) linkidlist = [] - linkids = self._client_linkidmap.keys() - for linkid in linkids: - linkidlist.append(linkid) + clientmaps = self._backend_clientmap.values() + for clientmap in clientmaps: + linkids = clientmap.keys() + for linkid in linkids: + linkidlist.append(linkid) return linkidlist diff --git a/src/py/imc/nonblock.py b/src/py/imc/nonblock.py index 02f291d..267bbaa 100644 --- a/src/py/imc/nonblock.py +++ b/src/py/imc/nonblock.py @@ -18,20 +18,27 @@ def func(f): global gen_current_id global gen_waitmap - gen = f(*args,**kwargs) - if isinstance(gen,types.GeneratorType): - gen_current_id = str(id(gen)) - gen_waitmap[gen_current_id] = gen + try: + gen = f(*args,**kwargs) + if isinstance(gen,types.GeneratorType): + gen_current_id = str(id(gen)) + gen_waitmap[gen_current_id] = gen - try: - next(gen) + try: + next(gen) - return (False,gen_current_id) - except StopIteration as ret: - del gen_waitmap[gen_current_id] - return (True,ret) - else: - return (True,gen) + return (False,gen_current_id) + + except StopIteration as ret: + del gen_waitmap[gen_current_id] + return (True,ret) + + + else: + return (True,gen) + + except Exception: + return (True,'Einternal') return wrapper @@ -49,3 +56,6 @@ def retcall(genid,value): except StopIteration as err: del gen_waitmap[gen_current_id] return (True,err.value) + + except Exception: + return (True,'Einternal') diff --git a/src/test/imc.js b/src/test/imc.js deleted file mode 100644 index 98f2bdb..0000000 --- a/src/test/imc.js +++ /dev/null @@ -1,207 +0,0 @@ -var __extend__ = function(child,parent){ - child.prototype.__super__ = parent; -}; - -var imc = new function(){ - this.Connection = function(linkid){ - var that = this; - - that.link_linkidmap = {}; - that.close_callback = []; - that.linkid = linkid; - - that.send_msg = function(data){}; - that.start_recv = function(recv_callback){}; - - that.close = function(){ - var i; - - for(i = 0;i < that.close_callback.length;i++){ - that.close_callback[i](that); - } - }; - }; - - this.Proxy = function(linkid,connect_linkid){ - var MSGTYPE_CALL = 'call'; - var MSGTYPE_RET = 'ret'; - - var that = this; - var caller_retid_count = 0; - var conn_linkidmap = {}; - var conn_retidmap = {}; - var call_pathmap = {}; - - var route_call = function(caller_retid,timeout,iden,dst,func_name,param,callback){ - var i; - var part; - var dst_linkid; - var dst_path; - var caller_linkid; - var func; - - var _add_wait_caller = function(conn_linkid){ - conn_retidmap[conn_linkid][caller_retid] = { - 'timeout':timeout, - 'callback':callback - } - }; - - part = dst.split('/'); - dst_linkid = part[2]; - dst_path = part.slice(3).join('/'); - - caller_linkid = iden.linkid - if(caller_retid.split('/')[0] != caller_linkid){ - return false; - } - - if(dst_linkid == linkid){ - if((func = call_pathmap[dst_path + func_name]) != undefined){ - _add_wait_caller(linkid); - - func(param,function(data){ - if(linkid in conn_retidmap && caller_retid in conn_retidmap[linkid]){ - delete conn_retidmap[linkid][caller_retid]; - callback({'stat':true,'data':data}); - } - }); - }else{ - callback({'stat':true,'data':'Enoexist'}); - } - }else{ - that.request_conn(dst_linkid,function(conn){ - if(caller_linkid == linkid){ - _add_wait_caller(conn.linkid); - } - - send_msg_call(conn,caller_retid,timeout,iden,dst,func_name,param); - }); - } - }; - - var recv_dispatch = function(conn,data){ - msgo = JSON.parse(data); - if(msgo.type == MSGTYPE_CALL){ - recv_msg_call(conn,msgo); - }else if(msgo.type == MSGTYPE_RET){ - recv_msg_ret(conn,msgo); - } - }; - - var send_msg_call = function(conn,caller_retid,timeout,iden,dst,func_name,param){ - msg = { - 'type':MSGTYPE_CALL, - 'caller_retid':caller_retid, - 'timeout':timeout, - 'iden':iden, - 'dst':dst, - 'func_name':func_name, - 'param':param - }; - - conn.send_msg(JSON.stringify(msg)); - }; - var recv_msg_call = function(conn,msg){ - var caller_retid = msg.caller_retid; - var timeout = msg.timeout; - var iden = msg.iden; - var dst = msg.dst; - var func_name = msg.func_name; - var param = msg.param; - var caller_linkid = iden.linkid; - - route_call(caller_retid,timeout,iden,dst,func_name,param,function(result){ - that.request_conn(caller_retid,function(conn){ - send_msg_ret(conn,caller_linkid,caller_retid,result); - }); - }); - }; - - var send_msg_ret = function(conn,caller_linkid,caller_retid,result){ - msg = { - 'type':MSGTYPE_RET, - 'caller_linkid':caller_linkid, - 'caller_retid':caller_retid, - 'result':result - }; - - conn.send_msg(JSON.stringify(msg)); - }; - var recv_msg_ret = function(conn,msg){ - var caller_linkid = msg['caller_linkid']; - var caller_retid = msg['caller_retid']; - var result = msg['result']; - - if(caller_linkid == linkid){ - if(conn.linkid in conn_retidmap && caller_retid in conn_retidmap[conn.linkid]){ - wait = conn_retidmap[conn.linkid][caller_retid]; - delete conn_retidmap[conn.linkid][caller_retid]; - - wait.callback(result); - } - }else{ - request_conn(caller_linkid,function(conn){ - send_msg_ret(conn,caller_linkid,caller_retid,result); - }); - } - }; - - that.add_conn = function(conn){ - conn_linkidmap[conn.linkid] = conn; - conn_retidmap[conn.linkid] = {}; - conn.start_recv(recv_dispatch); - }; - that.link_conn = function(linkid,conn){ - conn.link_linkidmap[linkid] = true; - conn_linkidmap[linkid] = conn; - }; - that.unlink_conn = function(linkid){ - conn = conn_linkidmap[linkid]; - delete conn_linkidmap[linkid]; - delete conn.link_linkidmap[linkid]; - }; - that.del_conn = function(conn){ - delete conn_linkidmap[conn.linkid]; - }; - that.request_conn = function(linkid,callback){ - var _conn_cb = function(conn){ - if(conn != null && conn.linkid != linkid){ - that.link_conn(linkid,conn); - } - - callback(conn); - }; - - conn = conn_linkidmap[linkid]; - if(conn != undefined){ - _conn_cb(conn); - }else{ - connect_linkid(linkid,_conn_cb); - } - }; - - that.call = function(iden,timeout,dst,func_name,param,callback){ - caller_retid = linkid + '/' + caller_retid_count; - caller_retid_count += 1; - - route_call(caller_retid,timeout,iden,dst,func_name,param,callback); - }; - - that.register_call = function(path,func_name,func){ - call_pathmap[path + func_name] = func; - }; - - conn_retidmap[linkid] = {}; - - imc.Proxy.instance = that; - }; - -}; - -function imc_call(iden,dst,func_name,param,callback){ - imc.Proxy.instance.call(iden,10000,dst,func_name,param,callback); -} -function imc_register_call(path,func_name,func){ - imc.Proxy.instance.register_call(path,func_name,func); -} diff --git a/src/test/wstest.js b/src/test/wstest.js index cc7806d..4ad0d83 100644 --- a/src/test/wstest.js +++ b/src/test/wstest.js @@ -4,31 +4,6 @@ var data = new ArrayBuffer(1024); var linkid = null; var iden = null; -var backend_conn = null; - -function test(){ - $.post('http://toj.tfcis.org:83/conn',{},function(res){ - var reto; - - if(res[0] != 'E'){ - reto = JSON.parse(res) - linkid = reto.client_linkid; - - console.log(linkid); - - new imc.Proxy(linkid,function(linkid,callback){ - callback(backend_conn); - }); - iden = {'linkclass':'client','linkid':linkid}; - - imc_register_call('','test_call',test_call); - - conn_backend(reto.client_linkid,reto.backend_linkid,reto.ip,reto.port); - }else{ - - } - }); -} function test_call(param,callback){ callback('Hello Too'); @@ -59,26 +34,48 @@ var WebSocketConnection = function(linkid,ws){ ws.onclose = function(e){ console.log('close'); that.close(); - }; - - backend_conn = that; + conn_backend(); + }; };__extend__(WebSocketConnection,imc.Connection); function conn_backend(client_linkid,backend_linkid,ip,port){ - var ws; - - ws = new WebSocket('ws://' + ip + ':' + port + '/conn'); - ws.onopen = function(){ - var i; + $.post('http://toj.tfcis.org:83/conn',{},function(res){ + var reto; + var ws; + + if(res[0] != 'E'){ + reto = JSON.parse(res) + linkid = reto.client_linkid; - console.log('open'); + ws = new WebSocket('ws://' + reto.ip + ':' + reto.port + '/conn'); + ws.onopen = function(){ + var i; + var conn; - ws.send(JSON.stringify({ - 'client_linkid':client_linkid - })); - imc.Proxy.instance.add_conn(new WebSocketConnection(backend_linkid,ws)); - }; + console.log('open'); + + ws.send(JSON.stringify({ + 'client_linkid':reto.client_linkid + })); + + conn = new WebSocketConnection(reto.backend_linkid,ws); + + new imc.Proxy(linkid,function(linkid,callback){ + callback(conn); + }); + imc.Proxy.instance.add_conn(conn); + + iden = {'linkclass':'client','linkid':linkid}; + + imc_register_call('','test_call',test_call); + }; + }else{ + setTimeout(conn_backend,5000); + } + }); + + } function perf(){ -- cgit v1.2.3