aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-28 18:39:00 +0800
committerpzread <netfirewall@gmail.com>2013-05-28 18:39:00 +0800
commit4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e (patch)
tree9fac357248c20e207f7fcfb34df61354736539ad
parenta0553244901985ae014e0b4c0bc5230c7cdca288 (diff)
downloadtaiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar
taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.gz
taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.bz2
taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.lz
taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.xz
taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.tar.zst
taiwan-online-judge-4c3250e4a7c130c1503a1ed8d102d0fd7d40bf2e.zip
Done sendfile without fault torlerance
-rw-r--r--src/py/backend_server.py89
-rw-r--r--src/py/center_server.py38
-rwxr-xr-xsrc/py/imc/proxy.py10
-rw-r--r--src/py/netio.py110
4 files changed, 170 insertions, 77 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 67b2e4e..2e91382 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -7,7 +7,6 @@ import time
import random
from multiprocessing import Process
-import tornado.iostream
import tornado.ioloop
import tornado.tcpserver
import tornado.httpserver
@@ -18,7 +17,7 @@ import imc.async
from imc.proxy import Proxy,Connection,imc_call,imc_call_async,imc_register_call
import netio
-from netio import SocketConnection,WebSocketConnection
+from netio import SocketStream,SocketConnection,WebSocketConnection
from tojauth import TOJAuth
class BackendWorker(tornado.tcpserver.TCPServer):
@@ -32,7 +31,7 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._linkid = None
self._idendesc = None
- self._pend_callconn_linkidmap = {}
+ self._pend_mainconn_linkidmap = {}
self._pend_filestream_filekeymap = {}
self._client_linkidmap = {}
@@ -48,15 +47,15 @@ class BackendWorker(tornado.tcpserver.TCPServer):
info = json.loads(data.decode('utf-8'))
conntype = info['conntype']
- if conntype == 'call':
- self._handle_callconn(sock_stream,addr,info)
+ if conntype == 'main':
+ self._handle_mainconn(sock_stream,addr,info)
elif conntype == 'file':
self._handle_fileconn(sock_stream,addr,info)
fd = stream.fileno()
self._ioloop.remove_handler(fd)
- sock_stream = netio.SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr)
+ sock_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr)
netio.recv_pack(sock_stream,_recv_conn_info)
@@ -95,17 +94,17 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._linkid = iden['linkid']
Proxy('backend',self._linkid,TOJAuth.instance,self._conn_linkid)
- self.center_conn = netio.SocketConnection('center',info['center_linkid'],stream)
+ self.center_conn = SocketConnection('center',info['center_linkid'],stream)
self.center_conn.add_close_callback(__retry)
Proxy.instance.add_conn(self.center_conn)
imc_register_call('','test_dst',self._test_dst)
- imc_register_call('','test_dsta',self._test_dsta)
- time.sleep(0.5)
+ #imc_register_call('','test_dsta',self._test_dsta)
+ time.sleep(1)
- if int(self._linkid) % 2 == 0:
- self._test_call(None,str(int(self._linkid) + 1))
+ if int(self._linkid) == 2:
+ self._test_call(None,'4')
sock_ip,sock_port = self.sock_addr
netio.send_pack(stream,bytes(json.dumps({
@@ -117,17 +116,17 @@ class BackendWorker(tornado.tcpserver.TCPServer):
}),'utf-8'))
netio.recv_pack(stream,___recv_info_cb)
- stream = netio.SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.center_addr)
+ stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.center_addr)
stream.set_close_callback(__retry)
stream.connect(__send_worker_info)
def _conn_linkid(self,linkid):
def __handle_pend(conn):
- pends = self._pend_callconn_linkidmap.pop(worker_linkid)
+ pends = self._pend_mainconn_linkidmap.pop(worker_linkid)
for gr in pends:
gr.switch(conn)
- def __call_conn_cb():
+ def __conn_cb():
conn = Proxy.instance.get_conn(worker_linkid)
if conn != None:
__handle_pend(conn)
@@ -136,26 +135,16 @@ class BackendWorker(tornado.tcpserver.TCPServer):
else:
netio.send_pack(main_stream,bytes(json.dumps({
- 'conntype':'call',
+ 'conntype':'main',
'linkclass':'backend',
'linkid':self._linkid
}),'utf-8'))
- netio.recv_pack(main_stream,__call_recv_cb)
-
- def __call_recv_cb(data):
- #def ___file_conn_cb():
- # netio.send_pack(file_stream,bytes(json.dumps({
- # 'conntype':'file',
- # 'linkid':self._linkid
- # }),'utf-8'))
-
- # conn = netio.SocketConnection(worker_linkclass,worker_linkid,main_stream,file_stream)
- # Proxy.instance.add_conn(conn)
- # __handle_pend(conn)
-
+ netio.recv_pack(main_stream,__recv_cb)
+
+ def __recv_cb(data):
stat = json.loads(data.decode('utf-8'))
if stat == True:
- conn = netio.SocketConnection(worker_linkclass,worker_linkid,main_stream,self._add_pend_filestream)
+ conn = SocketConnection(worker_linkclass,worker_linkid,main_stream,self._add_pend_filestream)
Proxy.instance.add_conn(conn)
__handle_pend(conn)
@@ -179,28 +168,28 @@ class BackendWorker(tornado.tcpserver.TCPServer):
if conn != None:
return conn
- elif worker_linkid in self._pend_callconn_linkidmap:
- self._pend_callconn_linkidmap[worker_linkid].append(imc.async.current())
+ elif worker_linkid in self._pend_mainconn_linkidmap:
+ self._pend_mainconn_linkidmap[worker_linkid].append(imc.async.current())
return imc.async.switchtop()
else:
- self._pend_callconn_linkidmap[worker_linkid] = [imc.async.current()]
+ self._pend_mainconn_linkidmap[worker_linkid] = [imc.async.current()]
sock_addr = (ret['sock_ip'],ret['sock_port'])
- main_stream = netio.SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),sock_addr)
+ main_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),sock_addr)
main_stream.set_close_callback(lambda conn : __handle_pend(None))
- main_stream.connect(__call_conn_cb)
+ main_stream.connect(__conn_cb)
return imc.async.switchtop()
def _add_pend_filestream(self,filekey,callback):
self._pend_filestream_filekeymap[filekey] = tornado.stack_context.wrap(callback)
- def _handle_callconn(self,main_stream,addr,info):
+ def _handle_mainconn(self,main_stream,addr,info):
def __send_back(stat):
if stat == True:
- conn = netio.SocketConnection(linkclass,linkid,main_stream,self._add_pend_filestream)
+ conn = SocketConnection(linkclass,linkid,main_stream,self._add_pend_filestream)
Proxy.instance.add_conn(conn)
netio.send_pack(main_stream,bytes(json.dumps(stat),'utf-8'))
@@ -212,14 +201,14 @@ class BackendWorker(tornado.tcpserver.TCPServer):
if conn != None:
return
- if linkid not in self._pend_callconn_linkidmap:
+ if linkid not in self._pend_mainconn_linkidmap:
__send_back(True)
else:
if self._linkid > linkid:
__send_back(True)
- pends = self._pend_callconn_linkidmap.pop(linkid)
+ pends = self._pend_mainconn_linkidmap.pop(linkid)
for callback in pends:
callback(conn)
@@ -228,7 +217,6 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def _handle_fileconn(self,file_stream,addr,info):
try:
- print('recv conn')
self._pend_filestream_filekeymap.pop(info['filekey'])(file_stream)
except KeyError:
@@ -237,15 +225,20 @@ class BackendWorker(tornado.tcpserver.TCPServer):
@imc.async.caller
def _test_call(self,iden,param):
+ print('start cold test')
+
filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso')
- print(time.perf_counter())
dst = '/backend/' + param + '/'
- for i in range(1):
- ret = imc_call(self._idendesc,dst,'test_dst',filekey)
- print(time.perf_counter())
+ ret = imc_call(self._idendesc,dst,'test_dst',filekey)
+
+ time.sleep(10)
+ print('start warm test')
- print(ret)
+ filekey = Proxy.instance.sendfile(self._idendesc,'/backend/' + param + '/','archlinux-2013.05.01-dual.iso')
+
+ dst = '/backend/' + param + '/'
+ ret = imc_call(self._idendesc,dst,'test_dst',filekey)
@imc.async.caller
def _test_dst(self,iden,param):
@@ -302,8 +295,12 @@ if __name__ == '__main__':
worker_list.append(Process(target = start_backend_worker,args = (81, )))
worker_list.append(Process(target = start_backend_worker,args = (82, )))
- #worker_list.append(Process(target = start_backend_worker,args = (181, )))
- #worker_list.append(Process(target = start_backend_worker,args = (182, )))
+ worker_list.append(Process(target = start_backend_worker,args = (181, )))
+ worker_list.append(Process(target = start_backend_worker,args = (182, )))
+ worker_list.append(Process(target = start_backend_worker,args = (183, )))
+ worker_list.append(Process(target = start_backend_worker,args = (184, )))
+ worker_list.append(Process(target = start_backend_worker,args = (185, )))
+ worker_list.append(Process(target = start_backend_worker,args = (186, )))
for proc in worker_list:
proc.start()
diff --git a/src/py/center_server.py b/src/py/center_server.py
index f7db86b..513d946 100644
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -3,6 +3,7 @@
import random
import json
import uuid
+import socket
import tornado.ioloop
import tornado.tcpserver
@@ -13,23 +14,23 @@ import imc.async
from imc.proxy import Proxy,Connection,imc_call,imc_call_async,imc_register_call
import netio
-from netio import SocketConnection
+from netio import SocketStream,SocketConnection
from tojauth import TOJAuth
class Worker:
- def __init__(self,stream,linkclass,linkid,idendesc,worker_info,center_linkid):
- self.stream = stream
+ def __init__(self,main_stream,linkclass,linkid,idendesc,worker_info,center_linkid):
+ self.main_stream = main_stream
self.linkclass = linkclass
self.linkid = linkid
self.idendesc = idendesc
self.sock_addr = (worker_info['sock_ip'],worker_info['sock_port'])
- netio.send_pack(self.stream,bytes(json.dumps({
+ netio.send_pack(self.main_stream,bytes(json.dumps({
'idendesc':self.idendesc,
'center_linkid':center_linkid
}),'utf-8'))
- conn = SocketConnection(self.linkclass,self.linkid,self.stream)
+ conn = SocketConnection(self.linkclass,self.linkid,self.main_stream)
conn.add_close_callback(lambda conn : self.close())
Proxy.instance.add_conn(conn)
@@ -37,10 +38,10 @@ class Worker:
pass
class BackendWorker(Worker):
- def __init__(self,stream,linkid,idendesc,worker_info,center_linkid):
+ def __init__(self,main_stream,linkid,idendesc,worker_info,center_linkid):
global center_serv
- super().__init__(stream,'backend',linkid,idendesc,worker_info,center_linkid)
+ super().__init__(main_stream,'backend',linkid,idendesc,worker_info,center_linkid)
self.ws_addr = (worker_info['ws_ip'],worker_info['ws_port'])
center_serv.add_backend_worker(self)
@@ -55,6 +56,7 @@ class CenterServer(tornado.tcpserver.TCPServer):
def __init__(self):
super().__init__()
+ self._ioloop = tornado.ioloop.IOLoop.instance()
self._linkid_usemap = {}
self._worker_linkidmap = {}
self._backend_clientmap = {}
@@ -75,7 +77,7 @@ class CenterServer(tornado.tcpserver.TCPServer):
imc_register_call('','test_dst',self._test_dst)
imc_register_call('','test_dstb',self._test_dstb)
- def handle_stream(self,stream,address):
+ def handle_stream(self,stream,addr):
def _recv_worker_info(data):
worker_info = json.loads(data.decode('utf-8'))
@@ -83,9 +85,13 @@ class CenterServer(tornado.tcpserver.TCPServer):
if linkclass == 'backend':
linkid = self._create_linkid()
idendesc = self._create_idendesc('backend',linkid)
- BackendWorker(stream,linkid,idendesc,worker_info,self._linkid)
+ BackendWorker(main_stream,linkid,idendesc,worker_info,self._linkid)
- netio.recv_pack(stream,_recv_worker_info)
+ fd = stream.fileno()
+ self._ioloop.remove_handler(fd)
+ main_stream = SocketStream(socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM | socket.SOCK_NONBLOCK,0),addr)
+
+ netio.recv_pack(main_stream,_recv_worker_info)
def add_backend_worker(self,backend):
backend_linkid = backend.linkid
@@ -133,7 +139,17 @@ class CenterServer(tornado.tcpserver.TCPServer):
linkid = param
try:
- worker = self._worker_linkidmap[linkid]
+ #worker = self._worker_linkidmap[linkid]
+
+ a = int(iden['linkid'])
+ b = int(linkid)
+
+ if b > a:
+ worker = self._worker_linkidmap[str(a + 1)]
+
+ else:
+ worker = self._worker_linkidmap[str(a - 1)]
+
if iden['linkclass'] != 'client':
sock_ip,sock_port = worker.sock_addr
return {
diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py
index b86e484..8897a7e 100755
--- a/src/py/imc/proxy.py
+++ b/src/py/imc/proxy.py
@@ -224,9 +224,6 @@ class Proxy:
return
def _route_sendfile(self,out_conn,src_linkid,filekey,filesize):
- def _recv_redirect_cb(data):
- out_conn.send_filedata(filekey,data)
-
if src_linkid == self._linkid:
try:
info = self._info_filekeymap.pop(filekey)
@@ -242,8 +239,11 @@ class Proxy:
pass
else:
+ print('test start')
+
in_conn = self._request_conn(src_linkid)
- in_conn.recv_filedata(filekey,filesize,_recv_redirect_cb)
+ send_fn = out_conn.send_filedata(filekey,filesize)
+ in_conn.recv_filedata(filekey,filesize,send_fn)
self._send_msg_sendfile(in_conn,src_linkid,filekey,filesize)
@@ -383,7 +383,7 @@ class Proxy:
@async.callee
def imc_call(idendesc,dst,func_name,param,_grid):
- return Proxy.instance.call(_grid,1000,idendesc,dst,func_name,param)
+ return Proxy.instance.call(_grid,1000000,idendesc,dst,func_name,param)
def imc_call_async(idendesc,dst,func_name,param,callback = None):
@async.caller
diff --git a/src/py/netio.py b/src/py/netio.py
index fd10db6..8385b58 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -9,6 +9,7 @@ import time
import tornado.ioloop
import tornado.stack_context
+import imc.async
from imc.proxy import Connection
def send_pack(stream,data):
@@ -24,12 +25,14 @@ def recv_pack(stream,callback):
class SocketStream:
def __init__(self,sock,addr):
self.DATA_BUF = 0
- self.DATA_FILE = 1
+ self.DATA_NOBUF = 1
+ self.DATA_FILE = 2
self._ioloop = tornado.ioloop.IOLoop.current()
self._sock = sock
self._conning = False
+ self._closed = False
self._conn_callback = None
self._close_callback = None
@@ -43,31 +46,47 @@ class SocketStream:
self.addr = addr
def connect(self,callback):
+ if self._closed == True:
+ raise ConnectionError
+
try:
- self._conning = True
self._conn_callback = tornado.stack_context.wrap(callback)
self._stat |= tornado.ioloop.IOLoop.WRITE
self._ioloop.update_handler(self._sock.fileno(),self._stat)
+ self._conning = True
self._sock.connect(self.addr)
except BlockingIOError:
pass
- def read_bytes(self,size,callback = None):
- self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)])
+ def read_bytes(self,size,callback = None,nonbuf = False):
+ if self._closed == True:
+ raise ConnectionError
+
+ if nonbuf == False:
+ self._read_queue.append([self.DATA_BUF,size,bytearray(),tornado.stack_context.wrap(callback)])
+ else:
+ self._read_queue.append([self.DATA_NOBUF,size,tornado.stack_context.wrap(callback)])
+
self._stat |= tornado.ioloop.IOLoop.READ
self._ioloop.update_handler(self._sock.fileno(),self._stat)
def write(self,buf,callback = None):
+ if self._closed == True:
+ raise ConnectionError
+
self._write_queue.append([self.DATA_BUF,0,buf,tornado.stack_context.wrap(callback)])
self._stat |= tornado.ioloop.IOLoop.WRITE
self._ioloop.update_handler(self._sock.fileno(),self._stat)
def sendfile(self,fd,callback = None):
+ if self._closed == True:
+ raise ConnectionError
+
size = os.fstat(fd).st_size
self._write_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)])
@@ -76,6 +95,9 @@ class SocketStream:
self._ioloop.update_handler(self._sock.fileno(),self._stat)
def recvfile(self,fd,size,callback = None):
+ if self._closed == True:
+ raise ConnectionError
+
self._read_queue.append([self.DATA_FILE,size,fd,tornado.stack_context.wrap(callback)])
self._stat |= tornado.ioloop.IOLoop.READ
@@ -89,6 +111,13 @@ class SocketStream:
self._close_callback = tornado.stack_context.wrap(callback)
def close(self):
+ if self._closed == True:
+ return
+
+ self._closed = True
+ self._ioloop.remove_handler(self._sock.fileno())
+ self._sock.close()
+
if self._close_callback != None:
self._close_callback(self)
@@ -104,6 +133,9 @@ class SocketStream:
try:
while True:
buf = self._sock.recv(size)
+ if len(buf) == 0:
+ self.close()
+ return
iocb[2].extend(buf)
size -= len(buf)
@@ -119,6 +151,24 @@ class SocketStream:
iocb[1] = size
break
+ elif datatype == self.DATA_NOBUF:
+ size = iocb[1]
+
+ try:
+ while True:
+ buf = self._sock.recv(size)
+
+ iocb[2](buf)
+ size -= len(buf)
+
+ if size == 0:
+ self._read_queue.popleft()
+ break
+
+ except BlockingIOError:
+ iocb[1] = size
+ break
+
elif datatype == self.DATA_FILE:
size = iocb[1]
@@ -190,6 +240,9 @@ class SocketStream:
iocb[1] = size
break
+ if self._closed == True:
+ return
+
stat = tornado.ioloop.IOLoop.ERROR
if len(self._read_queue) > 0:
stat |= tornado.ioloop.IOLoop.READ
@@ -209,7 +262,7 @@ class SocketConnection(Connection):
self._stream_filekeymap = {}
self.main_stream = main_stream
- self.main_stream.set_close_callback(self.close)
+ self.main_stream.set_close_callback(lambda conn : self.close())
self.add_pend_filestream = add_pend_filestream_fn
self._start_ping()
@@ -238,13 +291,9 @@ class SocketConnection(Connection):
file_stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
file_stream.connect(_conn_cb)
- #send_pack(self.file_stream,bytes(json.dumps({'filekey':filekey,'filesize':filesize}),'utf-8'))
- #recv_pack(self.file_stream,_recv_cb)
-
def recv_file(self,filekey,filesize,filepath):
def _conn_cb(file_stream):
- print(filesize)
-
+ self._stream_filekeymap[filekey] = file_stream
file_stream.recvfile(fd,filesize,_done_cb)
def _done_cb():
@@ -258,11 +307,37 @@ class SocketConnection(Connection):
st = time.perf_counter()
def send_filedata(self,filekey,filesize):
- pass
+ def _conn_cb():
+ nonlocal file_stream
- def recv_filedata(self,filekey,filesize):
- print('test')
- pass
+ file_stream = stream
+ self._stream_filekeymap[filekey] = file_stream
+
+ send_pack(file_stream,bytes(json.dumps({
+ 'conntype':'file',
+ 'filekey':filekey
+ }),'utf-8'))
+
+ old_gr.switch()
+
+ def _send_cb(data):
+ file_stream.write(data)
+
+ file_stream = None
+ stream = SocketStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0),self.main_stream.addr)
+
+ old_gr = imc.async.current()
+ stream.connect(_conn_cb)
+ imc.async.switchtop()
+
+ return _send_cb
+
+ def recv_filedata(self,filekey,filesize,callback):
+ def _conn_cb(file_stream):
+ self._stream_filekeymap[filekey] = file_stream
+ file_stream.read_bytes(filesize,callback,nonbuf = True)
+
+ self.add_pend_filestream(filekey,_conn_cb)
def start_recv(self,recv_callback):
def _recv_size(data):
@@ -285,6 +360,7 @@ class SocketConnection(Connection):
def close(self):
try:
self._ping_timer.stop()
+
except AttributeError:
pass
@@ -294,7 +370,11 @@ class SocketConnection(Connection):
def _start_ping(self):
def __check():
- self.main_stream.write(struct.pack('l',-1))
+ try:
+ self.main_stream.write(struct.pack('l',-1))
+
+ except ConnectionError:
+ return
self._ping_delay += 1
if self._ping_delay > 10: