diff options
Diffstat (limited to 'src/py/backend_server.py')
-rwxr-xr-x[-rw-r--r--] | src/py/backend_server.py | 127 |
1 files changed, 61 insertions, 66 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 7535183..8d9d169 100644..100755 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -29,11 +29,11 @@ class BackendWorker(tornado.tcpserver.TCPServer): self.sock_addr = None self.ws_port = ws_port - self._linkid = None + self._link = None self._idendesc = None - self._pend_mainconn_linkidmap = {} + self._pend_mainconn_linkmap = {} self._pend_filestream_filekeymap = {} - self._client_linkidmap = {} + self._client_linkmap = {} def start(self): sock_port = random.randrange(4096,8192) @@ -61,21 +61,17 @@ class BackendWorker(tornado.tcpserver.TCPServer): netio.recv_pack(sock_stream,_recv_conn_info) - def add_client(self,linkid,handler): - self._client_linkidmap[linkid] = {} + def add_client(self,link,handler): + self._client_linkmap[link] = {} - conn = netio.WebSocketConnection('client',linkid,handler) - conn.add_close_callback(lambda conn : self.del_client(conn.linkid)) + conn = netio.WebSocketConnection('client',link,handler) + conn.add_close_callback(lambda conn : self.del_client(conn.link)) Proxy.instance.add_conn(conn) - #imc_call_async(self._idendesc,'/center/' + self.center_conn.linkid + '/','add_client',{'backend_linkid':self._linkid,'client_linkid':linkid}) - return conn - def del_client(self,linkid): - del self._client_linkidmap[linkid] - - #imc_call_async(self._idendesc,'/center/' + self.center_conn.linkid + '/','del_client',linkid) + def del_client(self,link): + del self._client_linkmap[link] def _conn_center(self): def __retry(conn): @@ -87,25 +83,23 @@ class BackendWorker(tornado.tcpserver.TCPServer): def __send_worker_info(): def ___recv_info_cb(data): info = json.loads(data.decode('utf-8')) - pubkey = open('pubkey.pem','r').read() TOJAuth(pubkey) self._idendesc = info['idendesc'] - iden = TOJAuth.instance.get_iden('backend',self._linkid,self._idendesc) - self._linkid = iden['linkid'] - Proxy('backend',self._linkid,TOJAuth.instance,self._idendesc,self._conn_linkid) + self._link = info['worker_link'] + Proxy(self._link,TOJAuth.instance,self._idendesc,self._conn_link) - self.center_conn = SocketConnection('center',info['center_linkid'],stream,self.center_addr) + self.center_conn = SocketConnection(info['center_link'],stream,self.center_addr) self.center_conn.add_close_callback(__retry) Proxy.instance.add_conn(self.center_conn) imc_register_call('','test_dst',self._test_dst) #imc_register_call('','test_dsta',self._test_dsta) - #time.sleep(2) + time.sleep(1) - if int(self._linkid) == 2: - self._test_call('9') + #if self._link == '/backend/2/': + self._test_call(None) sock_ip,sock_port = self.sock_addr netio.send_pack(stream,bytes(json.dumps({ @@ -121,10 +115,10 @@ class BackendWorker(tornado.tcpserver.TCPServer): stream.set_close_callback(__retry) stream.connect(self.center_addr,__send_worker_info) - def _conn_linkid(self,linkid): + def _conn_link(self,link): def __handle_pend(conn): try: - retids = self._pend_mainconn_linkidmap.pop(worker_linkid) + retids = self._pend_mainconn_linkmap.pop(worker_link) except KeyError: return @@ -133,7 +127,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): imc.async.ret(retid,conn) def __conn_cb(): - conn = Proxy.instance.get_conn(worker_linkid) + conn = Proxy.instance.get_conn(worker_link) if conn != None: __handle_pend(conn) main_stream.set_close_callback(None) @@ -143,8 +137,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): sock_ip,sock_port = self.sock_addr netio.send_pack(main_stream,bytes(json.dumps({ 'conntype':'main', - 'linkclass':'backend', - 'linkid':self._linkid, + 'link':self._link, 'sock_ip':sock_ip, 'sock_port':sock_port }),'utf-8')) @@ -153,7 +146,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): def __recv_cb(data): stat = json.loads(data.decode('utf-8')) if stat == True: - conn = SocketConnection(worker_linkclass,worker_linkid,main_stream,sock_addr,self._add_pend_filestream) + conn = SocketConnection(worker_link,main_stream,sock_addr,self._add_pend_filestream) Proxy.instance.add_conn(conn) __handle_pend(conn) @@ -164,25 +157,25 @@ class BackendWorker(tornado.tcpserver.TCPServer): if self.center_conn == None: return None - stat,ret = imc_call(self._idendesc,'/center/' + self.center_conn.linkid + '/','lookup_linkid',linkid) + with TOJAuth.change_current_iden(self._idendesc): + stat,ret = Proxy.instance.call(self.center_conn.link,'lookup_link',65536,link) if stat == False or ret == None: return None else: - worker_linkclass = ret['worker_linkclass'] - worker_linkid = ret['worker_linkid'] + worker_link = ret['worker_link'] - conn = Proxy.instance.get_conn(worker_linkid) + conn = Proxy.instance.get_conn(worker_link) if conn != None: return conn - elif worker_linkid in self._pend_mainconn_linkidmap: - self._pend_mainconn_linkidmap[worker_linkid].append(imc.async.get_retid()) + elif worker_link in self._pend_mainconn_linkmap: + self._pend_mainconn_linkmap[worker_link].append(imc.async.get_retid()) return imc.async.switch_top() else: - self._pend_mainconn_linkidmap[worker_linkid] = [imc.async.get_retid()] + self._pend_mainconn_linkmap[worker_link] = [imc.async.get_retid()] sock_addr = (ret['sock_ip'],ret['sock_port']) @@ -196,23 +189,22 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback) def _handle_mainconn(self,main_stream,addr,info): - linkclass = info['linkclass'] - linkid = info['linkid'] + link = info['link'] sock_ip = info['sock_ip'] sock_port = info['sock_port'] - conn = Proxy.instance.get_conn(linkid) + conn = Proxy.instance.get_conn(link) if conn != None: return - if (linkid not in self._pend_mainconn_linkidmap) or self._linkid > linkid: - conn = SocketConnection(linkclass,linkid,main_stream,(sock_ip,sock_port),self._add_pend_filestream) + if (link not in self._pend_mainconn_linkmap) or self._link > link: + conn = SocketConnection(link,main_stream,(sock_ip,sock_port),self._add_pend_filestream) Proxy.instance.add_conn(conn) netio.send_pack(main_stream,bytes(json.dumps(True),'utf-8')) - if linkid in self._pend_mainconn_linkidmap: - retids = self._pend_mainconn_linkidmap.pop(linkid) + if link in self._pend_mainconn_linkmap: + retids = self._pend_mainconn_linkmap.pop(link) for retid in retids: imc.async.ret(retid,conn) @@ -228,18 +220,28 @@ class BackendWorker(tornado.tcpserver.TCPServer): @imc.async.caller def _test_call(self,param): - dst = '/backend/' + '3' + '/' - ret = imc_call_async(self._idendesc,dst,'test_dst',lambda result : print(result),'test',113) - print(ret) + with TOJAuth.change_current_iden(self._idendesc): + for i in range(0,1024): + dst = '/backend/' + str((i % 8) + 2) + '/' + if dst == self._link: + continue + + fileres = Proxy.instance.sendfile(dst,'test.py') + ret = imc_call(dst,'test_dst',fileres.filekey) + print(fileres.wait()) - ret = imc_call(self._idendesc,'/center/1/','create_iden','client','1234',1221,TOJAuth.ROLETYPE_USER,{'uid':31}) - print(ret) + print(self._link) + + #imc_call_async(dst,'test_dst',lambda result : print(result),'test',113) + + #ret = imc_call('/center/1/','create_iden','client','1234',1221,TOJAuth.ROLETYPE_USER,{'uid':31}) + #print(ret) return pend = [] for i in range(0,32): - if str((i % 16) + 2) == self._linkid: + if str((i % 16) + 2) == self._link: continue fileres = Proxy.instance.sendfile('/backend/' + str((i % 16) + 2) + '/','Fedora-18-x86_64-DVD.iso') @@ -250,22 +252,15 @@ class BackendWorker(tornado.tcpserver.TCPServer): pend.append(fileres) for p in pend: - print(self._linkid + ' ' + p.wait()) + print(self._link + ' ' + p.wait()) - print(self._linkid) + print(self._link) @imc.async.caller - def _test_dst(self,param,sdfsdf): - #stat,ret = imc_call(self._idendesc,'/backend/' + self._linkid + '/','test_dsta',param) - #return ret + ' Too' - - print(param) - print(sdfsdf) - print(TOJAuth.get_current_iden()) + def _test_dst(self,filekey): + #print(filekey) - #Proxy.instance.rejectfile(param) - #print('recv ' + iden['linkid'] + ' > ' + self._linkid) - #fileres = Proxy.instance.recvfile(param,'data') + fileres = Proxy.instance.recvfile(filekey,'data') #print('recv ' + fileres.wait()) return 'ok' @@ -287,7 +282,7 @@ class WebSocketConnHandler(tornado.websocket.WebSocketHandler): else: try: info = json.loads(msg) - self.backend_conn = backend_worker.add_client(info['client_linkid'],self) + self.backend_conn = backend_worker.add_client(info['client_link'],self) except Exception: self.close() @@ -316,12 +311,12 @@ if __name__ == '__main__': worker_list.append(Process(target = start_backend_worker,args = (81, ))) worker_list.append(Process(target = start_backend_worker,args = (82, ))) - #worker_list.append(Process(target = start_backend_worker,args = (181, ))) - #worker_list.append(Process(target = start_backend_worker,args = (182, ))) - #worker_list.append(Process(target = start_backend_worker,args = (183, ))) - #worker_list.append(Process(target = start_backend_worker,args = (184, ))) - #worker_list.append(Process(target = start_backend_worker,args = (185, ))) - #worker_list.append(Process(target = start_backend_worker,args = (186, ))) + worker_list.append(Process(target = start_backend_worker,args = (181, ))) + worker_list.append(Process(target = start_backend_worker,args = (182, ))) + worker_list.append(Process(target = start_backend_worker,args = (183, ))) + worker_list.append(Process(target = start_backend_worker,args = (184, ))) + worker_list.append(Process(target = start_backend_worker,args = (185, ))) + worker_list.append(Process(target = start_backend_worker,args = (186, ))) for proc in worker_list: proc.start() |