diff options
author | pzread <netfirewall@gmail.com> | 2013-05-26 02:58:29 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-26 02:58:29 +0800 |
commit | 5c2764816a3db7a4a2a8d8099967abf5f9462ffe (patch) | |
tree | 029ec09ad4c321b8fb134a2f05931c8cbe2e3e57 /src/py/backend_server.py | |
parent | 3653c85804a25de0162064449b4585717af77557 (diff) | |
download | taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.gz taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.bz2 taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.lz taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.xz taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.zst taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.zip |
Do file connection
Diffstat (limited to 'src/py/backend_server.py')
-rw-r--r-- | src/py/backend_server.py | 140 |
1 files changed, 85 insertions, 55 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 110dd07..cac0e11 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -32,47 +32,27 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._linkid = None self._idendesc = None - self._pendconn_linkidmap = {} + self._pend_callconn_linkidmap = {} + self._pend_fileconn_linkidmap = {} self._client_linkidmap = {} def start(self): sock_port = random.randrange(4096,8192) self.listen(sock_port) self.sock_addr = ('127.0.0.1',sock_port) + self._conn_center() - def handle_stream(self,stream,address): + def handle_stream(self,stream,addr): def _recv_conn_info(data): - def __send_back(stat): - netio.send_pack(stream,bytes(json.dumps(stat),'utf-8')) - info = json.loads(data.decode('utf-8')) - linkclass = info['linkclass'] - linkid = info['linkid'] - - conn = Proxy.instance.get_conn(linkid) - if conn != None: - return - - if linkid not in self._pendconn_linkidmap: - __send_back(True) - - conn = netio.SocketConnection(linkclass,linkid,stream) - Proxy.instance.add_conn(conn) - - else: - if self._linkid > linkid: - __send_back(True) + conntype = info['conntype'] - conn = netio.SocketConnection(linkclass,linkid,stream) - Proxy.instance.add_conn(conn) + if conntype == 'call': + self._handle_callconn(stream,addr,info) - pends = self._pendconn_linkidmap.pop(linkid) - for callback in pends: - callback(conn) - - else: - __send_back(False) + elif conntype == 'file': + self._handle_fileconn(stream,addr,info) netio.recv_pack(stream,_recv_conn_info) @@ -115,12 +95,11 @@ class BackendWorker(tornado.tcpserver.TCPServer): self.center_conn.add_close_callback(lambda conn : __retry()) Proxy.instance.add_conn(self.center_conn) + imc_register_call('','test_dst',self._test_dst) imc_register_call('','test_dsta',self._test_dsta) time.sleep(0.5) - #x = int(self._iden['linkid']) - (int(self._iden['linkid']) - 2) % 4 - #self._test_call(None,str(x)) if int(self._linkid) % 2 == 0: self._test_call(None,str(int(self._linkid) + 1)) @@ -140,36 +119,46 @@ class BackendWorker(tornado.tcpserver.TCPServer): def _conn_linkid(self,linkid): def __handle_pend(conn): - pends = self._pendconn_linkidmap.pop(worker_linkid) + pends = self._pend_callconn_linkidmap.pop(worker_linkid) for gr in pends: gr.switch(conn) - def __send_info(): - def ___recv_cb(data): - stat = json.loads(data.decode('utf-8')) - - if stat == True: - conn = netio.SocketConnection(worker_linkclass,worker_linkid,stream) - Proxy.instance.add_conn(conn) - __handle_pend(conn) - - else: - stream.set_close_callback(None) - stream.close() - + def __call_conn_cb(): conn = Proxy.instance.get_conn(worker_linkid) if conn != None: __handle_pend(conn) - stream.set_close_callback(None) - stream.close() + call_stream.set_close_callback(None) + call_stream.close() else: - netio.send_pack(stream,bytes(json.dumps({ + netio.send_pack(call_stream,bytes(json.dumps({ + 'conntype':'call', 'linkclass':'backend', 'linkid':self._linkid }),'utf-8')) - netio.recv_pack(stream,___recv_cb) + netio.recv_pack(call_stream,__call_recv_cb) + + def __call_recv_cb(data): + def ___file_conn_cb(): + conn = netio.SocketConnection(worker_linkclass,worker_linkid,call_stream,file_stream) + Proxy.instance.add_conn(conn) + __handle_pend(conn) + + netio.send_pack(file_stream,bytes(json.dumps({ + 'conntype':'file', + 'linkid':self._linkid + }),'utf-8')) + + stat = json.loads(data.decode('utf-8')) + if stat == True: + file_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + file_stream.connect(sock_addr,___file_conn_cb) + else: + call_stream.set_close_callback(None) + call_stream.close() + + if self.center_conn == None: return None @@ -186,19 +175,60 @@ class BackendWorker(tornado.tcpserver.TCPServer): if conn != None: return conn - elif worker_linkid in self._pendconn_linkidmap: - self._pendconn_linkidmap[worker_linkid].append(imc.async.current()) + elif worker_linkid in self._pend_callconn_linkidmap: + self._pend_callconn_linkidmap[worker_linkid].append(imc.async.current()) return imc.async.switchtop() else: - self._pendconn_linkidmap[worker_linkid] = [imc.async.current()] + self._pend_callconn_linkidmap[worker_linkid] = [imc.async.current()] - stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) - stream.set_close_callback(lambda : __handle_pend(None)) - stream.connect((ret['sock_ip'],ret['sock_port']),__send_info) + sock_addr = (ret['sock_ip'],ret['sock_port']) + + call_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + call_stream.set_close_callback(lambda : __handle_pend(None)) + call_stream.connect(sock_addr,__call_conn_cb) return imc.async.switchtop() + + def _handle_callconn(self,call_stream,addr,info): + def __send_back(stat): + if stat == True: + self._pend_fileconn_linkidmap[linkid] = __file_conn_cb + + netio.send_pack(call_stream,bytes(json.dumps(stat),'utf-8')) + + def __file_conn_cb(file_stream): + conn = netio.SocketConnection(linkclass,linkid,call_stream,file_stream) + Proxy.instance.add_conn(conn) + + linkclass = info['linkclass'] + linkid = info['linkid'] + + conn = Proxy.instance.get_conn(linkid) + if conn != None: + return + + if linkid not in self._pend_callconn_linkidmap: + __send_back(True) + + else: + if self._linkid > linkid: + __send_back(True) + + pends = self._pend_callconn_linkidmap.pop(linkid) + for callback in pends: + callback(conn) + + else: + __send_back(False) + def _handle_fileconn(self,file_stream,addr,info): + try: + self._pend_fileconn_linkidmap.pop(info['linkid'])(file_stream) + + except KeyError: + pass + @imc.async.caller def _test_call(self,iden,param): print(time.perf_counter()) |