From 5c2764816a3db7a4a2a8d8099967abf5f9462ffe Mon Sep 17 00:00:00 2001 From: pzread Date: Sun, 26 May 2013 02:58:29 +0800 Subject: Do file connection --- src/py/backend_server.py | 140 ++++++++++++++++++++++++++++------------------- src/py/netio.py | 33 +++++++---- 2 files changed, 107 insertions(+), 66 deletions(-) (limited to 'src/py') diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 110dd07..cac0e11 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -32,47 +32,27 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._linkid = None self._idendesc = None - self._pendconn_linkidmap = {} + self._pend_callconn_linkidmap = {} + self._pend_fileconn_linkidmap = {} self._client_linkidmap = {} def start(self): sock_port = random.randrange(4096,8192) self.listen(sock_port) self.sock_addr = ('127.0.0.1',sock_port) + self._conn_center() - def handle_stream(self,stream,address): + def handle_stream(self,stream,addr): def _recv_conn_info(data): - def __send_back(stat): - netio.send_pack(stream,bytes(json.dumps(stat),'utf-8')) - info = json.loads(data.decode('utf-8')) - linkclass = info['linkclass'] - linkid = info['linkid'] - - conn = Proxy.instance.get_conn(linkid) - if conn != None: - return - - if linkid not in self._pendconn_linkidmap: - __send_back(True) - - conn = netio.SocketConnection(linkclass,linkid,stream) - Proxy.instance.add_conn(conn) - - else: - if self._linkid > linkid: - __send_back(True) + conntype = info['conntype'] - conn = netio.SocketConnection(linkclass,linkid,stream) - Proxy.instance.add_conn(conn) + if conntype == 'call': + self._handle_callconn(stream,addr,info) - pends = self._pendconn_linkidmap.pop(linkid) - for callback in pends: - callback(conn) - - else: - __send_back(False) + elif conntype == 'file': + self._handle_fileconn(stream,addr,info) netio.recv_pack(stream,_recv_conn_info) @@ -115,12 +95,11 @@ class BackendWorker(tornado.tcpserver.TCPServer): self.center_conn.add_close_callback(lambda conn : __retry()) Proxy.instance.add_conn(self.center_conn) + imc_register_call('','test_dst',self._test_dst) imc_register_call('','test_dsta',self._test_dsta) time.sleep(0.5) - #x = int(self._iden['linkid']) - (int(self._iden['linkid']) - 2) % 4 - #self._test_call(None,str(x)) if int(self._linkid) % 2 == 0: self._test_call(None,str(int(self._linkid) + 1)) @@ -140,36 +119,46 @@ class BackendWorker(tornado.tcpserver.TCPServer): def _conn_linkid(self,linkid): def __handle_pend(conn): - pends = self._pendconn_linkidmap.pop(worker_linkid) + pends = self._pend_callconn_linkidmap.pop(worker_linkid) for gr in pends: gr.switch(conn) - def __send_info(): - def ___recv_cb(data): - stat = json.loads(data.decode('utf-8')) - - if stat == True: - conn = netio.SocketConnection(worker_linkclass,worker_linkid,stream) - Proxy.instance.add_conn(conn) - __handle_pend(conn) - - else: - stream.set_close_callback(None) - stream.close() - + def __call_conn_cb(): conn = Proxy.instance.get_conn(worker_linkid) if conn != None: __handle_pend(conn) - stream.set_close_callback(None) - stream.close() + call_stream.set_close_callback(None) + call_stream.close() else: - netio.send_pack(stream,bytes(json.dumps({ + netio.send_pack(call_stream,bytes(json.dumps({ + 'conntype':'call', 'linkclass':'backend', 'linkid':self._linkid }),'utf-8')) - netio.recv_pack(stream,___recv_cb) + netio.recv_pack(call_stream,__call_recv_cb) + + def __call_recv_cb(data): + def ___file_conn_cb(): + conn = netio.SocketConnection(worker_linkclass,worker_linkid,call_stream,file_stream) + Proxy.instance.add_conn(conn) + __handle_pend(conn) + + netio.send_pack(file_stream,bytes(json.dumps({ + 'conntype':'file', + 'linkid':self._linkid + }),'utf-8')) + + stat = json.loads(data.decode('utf-8')) + if stat == True: + file_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + file_stream.connect(sock_addr,___file_conn_cb) + else: + call_stream.set_close_callback(None) + call_stream.close() + + if self.center_conn == None: return None @@ -186,19 +175,60 @@ class BackendWorker(tornado.tcpserver.TCPServer): if conn != None: return conn - elif worker_linkid in self._pendconn_linkidmap: - self._pendconn_linkidmap[worker_linkid].append(imc.async.current()) + elif worker_linkid in self._pend_callconn_linkidmap: + self._pend_callconn_linkidmap[worker_linkid].append(imc.async.current()) return imc.async.switchtop() else: - self._pendconn_linkidmap[worker_linkid] = [imc.async.current()] + self._pend_callconn_linkidmap[worker_linkid] = [imc.async.current()] - stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) - stream.set_close_callback(lambda : __handle_pend(None)) - stream.connect((ret['sock_ip'],ret['sock_port']),__send_info) + sock_addr = (ret['sock_ip'],ret['sock_port']) + + call_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + call_stream.set_close_callback(lambda : __handle_pend(None)) + call_stream.connect(sock_addr,__call_conn_cb) return imc.async.switchtop() + + def _handle_callconn(self,call_stream,addr,info): + def __send_back(stat): + if stat == True: + self._pend_fileconn_linkidmap[linkid] = __file_conn_cb + + netio.send_pack(call_stream,bytes(json.dumps(stat),'utf-8')) + + def __file_conn_cb(file_stream): + conn = netio.SocketConnection(linkclass,linkid,call_stream,file_stream) + Proxy.instance.add_conn(conn) + + linkclass = info['linkclass'] + linkid = info['linkid'] + + conn = Proxy.instance.get_conn(linkid) + if conn != None: + return + + if linkid not in self._pend_callconn_linkidmap: + __send_back(True) + + else: + if self._linkid > linkid: + __send_back(True) + + pends = self._pend_callconn_linkidmap.pop(linkid) + for callback in pends: + callback(conn) + + else: + __send_back(False) + def _handle_fileconn(self,file_stream,addr,info): + try: + self._pend_fileconn_linkidmap.pop(info['linkid'])(file_stream) + + except KeyError: + pass + @imc.async.caller def _test_call(self,iden,param): print(time.perf_counter()) diff --git a/src/py/netio.py b/src/py/netio.py index c6aacbc..0e8cab1 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -16,35 +16,42 @@ def recv_pack(stream,callback): stream.read_bytes(8,_recv_size) class SocketConnection(Connection): - def __init__(self,linkclass,linkid,stream): + def __init__(self,linkclass,linkid,call_stream,file_stream = None): super().__init__(linkclass,linkid) - self.ioloop = tornado.ioloop.IOLoop.current() - self.stream = stream - self.stream.set_close_callback(self.close) + self._ioloop = tornado.ioloop.IOLoop.current() + self.call_stream = call_stream + self.call_stream.set_close_callback(self.close) + + if file_stream == None: + self.file_stream = None + + else: + self.file_stream = file_stream + self.file_stream.set_close_callback(self.close) self._start_ping() def send_msg(self,data): - self.stream.write(struct.pack('l',len(data)) + data) + self.call_stream.write(struct.pack('l',len(data)) + data) def start_recv(self,recv_callback): def _recv_size(data): size, = struct.unpack('l',data) if size > 0: - self.stream.read_bytes(size,_recv_data) + self.call_stream.read_bytes(size,_recv_data) else: if size == -1: #pong self._ping_delay = 0 - self.stream.read_bytes(8,_recv_size) + self.call_stream.read_bytes(8,_recv_size) def _recv_data(data): self._recv_callback(self,data) - self.stream.read_bytes(8,_recv_size) + self.call_stream.read_bytes(8,_recv_size) self._recv_callback = tornado.stack_context.wrap(recv_callback) - self.stream.read_bytes(8,_recv_size) + self.call_stream.read_bytes(8,_recv_size) def close(self): try: @@ -52,11 +59,15 @@ class SocketConnection(Connection): except AttributeError: pass + self.call_stream.close() + if self.file_stream != None: + self.file_stream.close() + super().close() def _start_ping(self): def __check(): - self.stream.write(struct.pack('l',-1)) + self.call_stream.write(struct.pack('l',-1)) self._ping_delay += 1 if self._ping_delay > 10: @@ -70,7 +81,7 @@ class WebSocketConnection(Connection): def __init__(self,linkclass,linkid,handler): super().__init__(linkclass,linkid) - self.ioloop = tornado.ioloop.IOLoop.current() + self._ioloop = tornado.ioloop.IOLoop.current() self.handler = handler def send_msg(self,data): -- cgit v1.2.3