diff options
author | pzread <netfirewall@gmail.com> | 2013-05-28 18:39:00 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-28 18:39:00 +0800 |
commit | 4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e (patch) | |
tree | 9fac357248c20e207f7fcfb34df61354736539ad | |
parent | a0553244901985ae014e0b4c0bc5230c7cdca288 (diff) | |
download | taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.gz taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.bz2 taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.lz taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.xz taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.zst taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.zip |
Done sendfile without fault torlerance
-rw-r--r-- | src/py/backend_server.py | 89 | ||||
-rw-r--r-- | src/py/center_server.py | 38 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 10 | ||||
-rw-r--r-- | src/py/netio.py | 110 |
4 files changed, 170 insertions, 77 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 67b2e4e..2e91382 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -7,7 +7,6 @@ import time import random from multiprocessing import Process -import tornado.iostream import tornado.ioloop import tornado.tcpserver import tornado.httpserver @@ -18,7 +17,7 @@ import imc.async from imc.proxy import Proxy,Connection,imc_call,imc_call_async,imc_register_call import netio -from netio import SocketConnection,WebSocketConnection +from netio import SocketStream,SocketConnection,WebSocketConnection from tojauth import TOJAuth class BackendWorker(tornado.tcpserver.TCPServer): @@ -32,7 +31,7 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._linkid = None self._idendesc = None - self._pend_callconn_linkidmap = {} + self._pend_mainconn_linkidmap = {} self._pend_filestream_filekeymap = {} self._client_linkidmap = {} @@ -48,15 +47,15 @@ class BackendWorker(tornado.tcpserver.TCPServer): info = json.loads(data.decode('utf-8')) conntype = info['conntype'] - if conntype == 'call': - self._handle_callconn(sock_stream,addr,info) + if conntype == 'main': + self._handle_mainconn(sock_stream,addr,info) elif conntype == 'file': self._handle_fileconn(sock_stream,addr,info) fd = stream.fileno() self._ioloop.remove_handler(fd) - sock_stream = netio.SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr) + sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr) netio.recv_pack(sock_stream,_recv_conn_info) @@ -95,17 +94,17 @@ class BackendWorker(tornado.tcpserver.TCPServer): self._linkid = iden['linkid'] Proxy('backend',self._linkid,TOJAuth.instance,self._conn_linkid) - self.center_conn = netio.SocketConnection('center',info['center_linkid'],stream) + self.center_conn = SocketConnection('center',info['center_linkid'],stream) self.center_conn.add_close_callback(__retry) Proxy.instance.add_conn(self.center_conn) imc_register_call('','test_dst',self._test_dst) - imc_register_call('','test_dsta',self._test_dsta) - time.sleep(0.5) + #imc_register_call('','test_dsta',self._test_dsta) + time.sleep(1) - if int(self._linkid) % 2 == 0: - self._test_call(None,str(int(self._linkid) + 1)) + if int(self._linkid) == 2: + self._test_call(None,'4') sock_ip,sock_port = self.sock_addr netio.send_pack(stream,bytes(json.dumps({ @@ -117,17 +116,17 @@ class BackendWorker(tornado.tcpserver.TCPServer): }),'utf-8')) netio.recv_pack(stream,___recv_info_cb) - stream = netio.SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.center_addr) + stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.center_addr) stream.set_close_callback(__retry) stream.connect(__send_worker_info) def _conn_linkid(self,linkid): def __handle_pend(conn): - pends = self._pend_callconn_linkidmap.pop(worker_linkid) + pends = self._pend_mainconn_linkidmap.pop(worker_linkid) for gr in pends: gr.switch(conn) - def __call_conn_cb(): + def __conn_cb(): conn = Proxy.instance.get_conn(worker_linkid) if conn != None: __handle_pend(conn) @@ -136,26 +135,16 @@ class BackendWorker(tornado.tcpserver.TCPServer): else: netio.send_pack(main_stream,bytes(json.dumps({ - 'conntype':'call', + 'conntype':'main', 'linkclass':'backend', 'linkid':self._linkid }),'utf-8')) - netio.recv_pack(main_stream,__call_recv_cb) - - def __call_recv_cb(data): - #def ___file_conn_cb(): - # netio.send_pack(file_stream,bytes(json.dumps({ - # 'conntype':'file', - # 'linkid':self._linkid - # }),'utf-8')) - - # conn = netio.SocketConnection(worker_linkclass,worker_linkid,main_stream,file_stream) - # Proxy.instance.add_conn(conn) - # __handle_pend(conn) - + netio.recv_pack(main_stream,__recv_cb) + + def __recv_cb(data): stat = json.loads(data.decode('utf-8')) if stat == True: - conn = netio.SocketConnection(worker_linkclass,worker_linkid,main_stream,self._add_pend_filestream) + conn = SocketConnection(worker_linkclass,worker_linkid,main_stream,self._add_pend_filestream) Proxy.instance.add_conn(conn) __handle_pend(conn) @@ -179,28 +168,28 @@ class BackendWorker(tornado.tcpserver.TCPServer): if conn != None: return conn - elif worker_linkid in self._pend_callconn_linkidmap: - self._pend_callconn_linkidmap[worker_linkid].append(imc.async.current()) + elif worker_linkid in self._pend_mainconn_linkidmap: + self._pend_mainconn_linkidmap[worker_linkid].append(imc.async.current()) return imc.async.switchtop() else: - self._pend_callconn_linkidmap[worker_linkid] = [imc.async.current()] + self._pend_mainconn_linkidmap[worker_linkid] = [imc.async.current()] sock_addr = (ret['sock_ip'],ret['sock_port']) - main_stream = netio.SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),sock_addr) + main_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),sock_addr) main_stream.set_close_callback(lambda conn : __handle_pend(None)) - main_stream.connect(__call_conn_cb) + main_stream.connect(__conn_cb) return imc.async.switchtop() def _add_pend_filestream(self,filekey,callback): self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback) - def _handle_callconn(self,main_stream,addr,info): + def _handle_mainconn(self,main_stream,addr,info): def __send_back(stat): if stat == True: - conn = netio.SocketConnection(linkclass,linkid,main_stream,self._add_pend_filestream) + conn = SocketConnection(linkclass,linkid,main_stream,self._add_pend_filestream) Proxy.instance.add_conn(conn) netio.send_pack(main_stream,bytes(json.dumps(stat),'utf-8')) @@ -212,14 +201,14 @@ class BackendWorker(tornado.tcpserver.TCPServer): if conn != None: return - if linkid not in self._pend_callconn_linkidmap: + if linkid not in self._pend_mainconn_linkidmap: __send_back(True) else: if self._linkid > linkid: __send_back(True) - pends = self._pend_callconn_linkidmap.pop(linkid) + pends = self._pend_mainconn_linkidmap.pop(linkid) for callback in pends: callback(conn) @@ -228,7 +217,6 @@ class BackendWorker(tornado.tcpserver.TCPServer): def _handle_fileconn(self,file_stream,addr,info): try: - print('recv conn') self._pend_filestream_filekeymap.pop(info['filekey'])(file_stream) except KeyError: @@ -237,15 +225,20 @@ class BackendWorker(tornado.tcpserver.TCPServer): @imc.async.caller def _test_call(self,iden,param): + print('start cold test') + filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso') - print(time.perf_counter()) dst = '/backend/' + param + '/' - for i in range(1): - ret = imc_call(self._idendesc,dst,'test_dst',filekey) - print(time.perf_counter()) + ret = imc_call(self._idendesc,dst,'test_dst',filekey) + + time.sleep(10) + print('start warm test') - print(ret) + filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso') + + dst = '/backend/' + param + '/' + ret = imc_call(self._idendesc,dst,'test_dst',filekey) @imc.async.caller def _test_dst(self,iden,param): @@ -302,8 +295,12 @@ if __name__ == '__main__': worker_list.append(Process(target = start_backend_worker,args = (81, ))) worker_list.append(Process(target = start_backend_worker,args = (82, ))) - #worker_list.append(Process(target = start_backend_worker,args = (181, ))) - #worker_list.append(Process(target = start_backend_worker,args = (182, ))) + worker_list.append(Process(target = start_backend_worker,args = (181, ))) + worker_list.append(Process(target = start_backend_worker,args = (182, ))) + worker_list.append(Process(target = start_backend_worker,args = (183, ))) + worker_list.append(Process(target = start_backend_worker,args = (184, ))) + worker_list.append(Process(target = start_backend_worker,args = (185, ))) + worker_list.append(Process(target = start_backend_worker,args = (186, ))) for proc in worker_list: proc.start() diff --git a/src/py/center_server.py b/src/py/center_server.py index f7db86b..513d946 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -3,6 +3,7 @@ import random import json import uuid +import socket import tornado.ioloop import tornado.tcpserver @@ -13,23 +14,23 @@ import imc.async from imc.proxy import Proxy,Connection,imc_call,imc_call_async,imc_register_call import netio -from netio import SocketConnection +from netio import SocketStream,SocketConnection from tojauth import TOJAuth class Worker: - def __init__(self,stream,linkclass,linkid,idendesc,worker_info,center_linkid): - self.stream = stream + def __init__(self,main_stream,linkclass,linkid,idendesc,worker_info,center_linkid): + self.main_stream = main_stream self.linkclass = linkclass self.linkid = linkid self.idendesc = idendesc self.sock_addr = (worker_info['sock_ip'],worker_info['sock_port']) - netio.send_pack(self.stream,bytes(json.dumps({ + netio.send_pack(self.main_stream,bytes(json.dumps({ 'idendesc':self.idendesc, 'center_linkid':center_linkid }),'utf-8')) - conn = SocketConnection(self.linkclass,self.linkid,self.stream) + conn = SocketConnection(self.linkclass,self.linkid,self.main_stream) conn.add_close_callback(lambda conn : self.close()) Proxy.instance.add_conn(conn) @@ -37,10 +38,10 @@ class Worker: pass class BackendWorker(Worker): - def __init__(self,stream,linkid,idendesc,worker_info,center_linkid): + def __init__(self,main_stream,linkid,idendesc,worker_info,center_linkid): global center_serv - super().__init__(stream,'backend',linkid,idendesc,worker_info,center_linkid) + super().__init__(main_stream,'backend',linkid,idendesc,worker_info,center_linkid) self.ws_addr = (worker_info['ws_ip'],worker_info['ws_port']) center_serv.add_backend_worker(self) @@ -55,6 +56,7 @@ class CenterServer(tornado.tcpserver.TCPServer): def __init__(self): super().__init__() + self._ioloop = tornado.ioloop.IOLoop.instance() self._linkid_usemap = {} self._worker_linkidmap = {} self._backend_clientmap = {} @@ -75,7 +77,7 @@ class CenterServer(tornado.tcpserver.TCPServer): imc_register_call('','test_dst',self._test_dst) imc_register_call('','test_dstb',self._test_dstb) - def handle_stream(self,stream,address): + def handle_stream(self,stream,addr): def _recv_worker_info(data): worker_info = json.loads(data.decode('utf-8')) @@ -83,9 +85,13 @@ class CenterServer(tornado.tcpserver.TCPServer): if linkclass == 'backend': linkid = self._create_linkid() idendesc = self._create_idendesc('backend',linkid) - BackendWorker(stream,linkid,idendesc,worker_info,self._linkid) + BackendWorker(main_stream,linkid,idendesc,worker_info,self._linkid) - netio.recv_pack(stream,_recv_worker_info) + fd = stream.fileno() + self._ioloop.remove_handler(fd) + main_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr) + + netio.recv_pack(main_stream,_recv_worker_info) def add_backend_worker(self,backend): backend_linkid = backend.linkid @@ -133,7 +139,17 @@ class CenterServer(tornado.tcpserver.TCPServer): linkid = param try: - worker = self._worker_linkidmap[linkid] + #worker = self._worker_linkidmap[linkid] + + a = int(iden['linkid']) + b = int(linkid) + + if b > a: + worker = self._worker_linkidmap[str(a + 1)] + + else: + worker = self._worker_linkidmap[str(a - 1)] + if iden['linkclass'] != 'client': sock_ip,sock_port = worker.sock_addr return { diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index b86e484..8897a7e 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -224,9 +224,6 @@ class Proxy: return def _route_sendfile(self,out_conn,src_linkid,filekey,filesize): - def _recv_redirect_cb(data): - out_conn.send_filedata(filekey,data) - if src_linkid == self._linkid: try: info = self._info_filekeymap.pop(filekey) @@ -242,8 +239,11 @@ class Proxy: pass else: + print('test start') + in_conn = self._request_conn(src_linkid) - in_conn.recv_filedata(filekey,filesize,_recv_redirect_cb) + send_fn = out_conn.send_filedata(filekey,filesize) + in_conn.recv_filedata(filekey,filesize,send_fn) self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize) @@ -383,7 +383,7 @@ class Proxy: @async.callee def imc_call(idendesc,dst,func_name,param,_grid): - return Proxy.instance.call(_grid,1000,idendesc,dst,func_name,param) + return Proxy.instance.call(_grid,1000000,idendesc,dst,func_name,param) def imc_call_async(idendesc,dst,func_name,param,callback = None): @async.caller diff --git a/src/py/netio.py b/src/py/netio.py index fd10db6..8385b58 100644 --- a/src/py/netio.py +++ b/src/py/netio.py @@ -9,6 +9,7 @@ import time import tornado.ioloop import tornado.stack_context +import imc.async from imc.proxy import Connection def send_pack(stream,data): @@ -24,12 +25,14 @@ def recv_pack(stream,callback): class SocketStream: def __init__(self,sock,addr): self.DATA_BUF = 0 - self.DATA_FILE = 1 + self.DATA_NOBUF = 1 + self.DATA_FILE = 2 self._ioloop = tornado.ioloop.IOLoop.current() self._sock = sock self._conning = False + self._closed = False self._conn_callback = None self._close_callback = None @@ -43,31 +46,47 @@ class SocketStream: self.addr = addr def connect(self,callback): + if self._closed == True: + raise ConnectionError + try: - self._conning = True self._conn_callback = tornado.stack_context.wrap(callback) self._stat |= tornado.ioloop.IOLoop.WRITE self._ioloop.update_handler(self._sock.fileno(),self._stat) + self._conning = True self._sock.connect(self.addr) except BlockingIOError: pass - def read_bytes(self,size,callback = None): - self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)]) + def read_bytes(self,size,callback = None,nonbuf = False): + if self._closed == True: + raise ConnectionError + + if nonbuf == False: + self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)]) + else: + self._read_queue.append([self.DATA_NOBUF,size,tornado.stack_context.wrap(callback)]) + self._stat |= tornado.ioloop.IOLoop.READ self._ioloop.update_handler(self._sock.fileno(),self._stat) def write(self,buf,callback = None): + if self._closed == True: + raise ConnectionError + self._write_queue.append([self.DATA_BUF,0,buf,tornado.stack_context.wrap(callback)]) self._stat |= tornado.ioloop.IOLoop.WRITE self._ioloop.update_handler(self._sock.fileno(),self._stat) def sendfile(self,fd,callback = None): + if self._closed == True: + raise ConnectionError + size = os.fstat(fd).st_size self._write_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)]) @@ -76,6 +95,9 @@ class SocketStream: self._ioloop.update_handler(self._sock.fileno(),self._stat) def recvfile(self,fd,size,callback = None): + if self._closed == True: + raise ConnectionError + self._read_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)]) self._stat |= tornado.ioloop.IOLoop.READ @@ -89,6 +111,13 @@ class SocketStream: self._close_callback = tornado.stack_context.wrap(callback) def close(self): + if self._closed == True: + return + + self._closed = True + self._ioloop.remove_handler(self._sock.fileno()) + self._sock.close() + if self._close_callback != None: self._close_callback(self) @@ -104,6 +133,9 @@ class SocketStream: try: while True: buf = self._sock.recv(size) + if len(buf) == 0: + self.close() + return iocb[2].extend(buf) size -= len(buf) @@ -119,6 +151,24 @@ class SocketStream: iocb[1] = size break + elif datatype == self.DATA_NOBUF: + size = iocb[1] + + try: + while True: + buf = self._sock.recv(size) + + iocb[2](buf) + size -= len(buf) + + if size == 0: + self._read_queue.popleft() + break + + except BlockingIOError: + iocb[1] = size + break + elif datatype == self.DATA_FILE: size = iocb[1] @@ -190,6 +240,9 @@ class SocketStream: iocb[1] = size break + if self._closed == True: + return + stat = tornado.ioloop.IOLoop.ERROR if len(self._read_queue) > 0: stat |= tornado.ioloop.IOLoop.READ @@ -209,7 +262,7 @@ class SocketConnection(Connection): self._stream_filekeymap = {} self.main_stream = main_stream - self.main_stream.set_close_callback(self.close) + self.main_stream.set_close_callback(lambda conn : self.close()) self.add_pend_filestream = add_pend_filestream_fn self._start_ping() @@ -238,13 +291,9 @@ class SocketConnection(Connection): file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) file_stream.connect(_conn_cb) - #send_pack(self.file_stream,bytes(json.dumps({'filekey':filekey,'filesize':filesize}),'utf-8')) - #recv_pack(self.file_stream,_recv_cb) - def recv_file(self,filekey,filesize,filepath): def _conn_cb(file_stream): - print(filesize) - + self._stream_filekeymap[filekey] = file_stream file_stream.recvfile(fd,filesize,_done_cb) def _done_cb(): @@ -258,11 +307,37 @@ class SocketConnection(Connection): st = time.perf_counter() def send_filedata(self,filekey,filesize): - pass + def _conn_cb(): + nonlocal file_stream - def recv_filedata(self,filekey,filesize): - print('test') - pass + file_stream = stream + self._stream_filekeymap[filekey] = file_stream + + send_pack(file_stream,bytes(json.dumps({ + 'conntype':'file', + 'filekey':filekey + }),'utf-8')) + + old_gr.switch() + + def _send_cb(data): + file_stream.write(data) + + file_stream = None + stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr) + + old_gr = imc.async.current() + stream.connect(_conn_cb) + imc.async.switchtop() + + return _send_cb + + def recv_filedata(self,filekey,filesize,callback): + def _conn_cb(file_stream): + self._stream_filekeymap[filekey] = file_stream + file_stream.read_bytes(filesize,callback,nonbuf = True) + + self.add_pend_filestream(filekey,_conn_cb) def start_recv(self,recv_callback): def _recv_size(data): @@ -285,6 +360,7 @@ class SocketConnection(Connection): def close(self): try: self._ping_timer.stop() + except AttributeError: pass @@ -294,7 +370,11 @@ class SocketConnection(Connection): def _start_ping(self): def __check(): - self.main_stream.write(struct.pack('l',-1)) + try: + self.main_stream.write(struct.pack('l',-1)) + + except ConnectionError: + return self._ping_delay += 1 if self._ping_delay > 10: |