aboutsummaryrefslogtreecommitdiffstats
path: root/src/py
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-26 02:58:29 +0800
committerpzread <netfirewall@gmail.com>2013-05-26 02:58:29 +0800
commit5c2764816a3db7a4a2a8d8099967abf5f9462ffe (patch)
tree029ec09ad4c321b8fb134a2f05931c8cbe2e3e57 /src/py
parent3653c85804a25de0162064449b4585717af77557 (diff)
downloadtaiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.gz
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.bz2
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.lz
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.xz
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.zst
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.zip
Do file connection
Diffstat (limited to 'src/py')
-rw-r--r--src/py/backend_server.py140
-rw-r--r--src/py/netio.py33
2 files changed, 107 insertions, 66 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 110dd07..cac0e11 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -32,47 +32,27 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._linkid = None
self._idendesc = None
- self._pendconn_linkidmap = {}
+ self._pend_callconn_linkidmap = {}
+ self._pend_fileconn_linkidmap = {}
self._client_linkidmap = {}
def start(self):
sock_port = random.randrange(4096,8192)
self.listen(sock_port)
self.sock_addr = ('127.0.0.1',sock_port)
+
self._conn_center()
- def handle_stream(self,stream,address):
+ def handle_stream(self,stream,addr):
def _recv_conn_info(data):
- def __send_back(stat):
- netio.send_pack(stream,bytes(json.dumps(stat),'utf-8'))
-
info = json.loads(data.decode('utf-8'))
- linkclass = info['linkclass']
- linkid = info['linkid']
-
- conn = Proxy.instance.get_conn(linkid)
- if conn != None:
- return
-
- if linkid not in self._pendconn_linkidmap:
- __send_back(True)
-
- conn = netio.SocketConnection(linkclass,linkid,stream)
- Proxy.instance.add_conn(conn)
-
- else:
- if self._linkid > linkid:
- __send_back(True)
+ conntype = info['conntype']
- conn = netio.SocketConnection(linkclass,linkid,stream)
- Proxy.instance.add_conn(conn)
+ if conntype == 'call':
+ self._handle_callconn(stream,addr,info)
- pends = self._pendconn_linkidmap.pop(linkid)
- for callback in pends:
- callback(conn)
-
- else:
- __send_back(False)
+ elif conntype == 'file':
+ self._handle_fileconn(stream,addr,info)
netio.recv_pack(stream,_recv_conn_info)
@@ -115,12 +95,11 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self.center_conn.add_close_callback(lambda conn : __retry())
Proxy.instance.add_conn(self.center_conn)
+
imc_register_call('','test_dst',self._test_dst)
imc_register_call('','test_dsta',self._test_dsta)
time.sleep(0.5)
- #x = int(self._iden['linkid']) - (int(self._iden['linkid']) - 2) % 4
- #self._test_call(None,str(x))
if int(self._linkid) % 2 == 0:
self._test_call(None,str(int(self._linkid) + 1))
@@ -140,36 +119,46 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def _conn_linkid(self,linkid):
def __handle_pend(conn):
- pends = self._pendconn_linkidmap.pop(worker_linkid)
+ pends = self._pend_callconn_linkidmap.pop(worker_linkid)
for gr in pends:
gr.switch(conn)
- def __send_info():
- def ___recv_cb(data):
- stat = json.loads(data.decode('utf-8'))
-
- if stat == True:
- conn = netio.SocketConnection(worker_linkclass,worker_linkid,stream)
- Proxy.instance.add_conn(conn)
- __handle_pend(conn)
-
- else:
- stream.set_close_callback(None)
- stream.close()
-
+ def __call_conn_cb():
conn = Proxy.instance.get_conn(worker_linkid)
if conn != None:
__handle_pend(conn)
- stream.set_close_callback(None)
- stream.close()
+ call_stream.set_close_callback(None)
+ call_stream.close()
else:
- netio.send_pack(stream,bytes(json.dumps({
+ netio.send_pack(call_stream,bytes(json.dumps({
+ 'conntype':'call',
'linkclass':'backend',
'linkid':self._linkid
}),'utf-8'))
- netio.recv_pack(stream,___recv_cb)
+ netio.recv_pack(call_stream,__call_recv_cb)
+
+ def __call_recv_cb(data):
+ def ___file_conn_cb():
+ conn = netio.SocketConnection(worker_linkclass,worker_linkid,call_stream,file_stream)
+ Proxy.instance.add_conn(conn)
+ __handle_pend(conn)
+
+ netio.send_pack(file_stream,bytes(json.dumps({
+ 'conntype':'file',
+ 'linkid':self._linkid
+ }),'utf-8'))
+
+ stat = json.loads(data.decode('utf-8'))
+ if stat == True:
+ file_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ file_stream.connect(sock_addr,___file_conn_cb)
+ else:
+ call_stream.set_close_callback(None)
+ call_stream.close()
+
+
if self.center_conn == None:
return None
@@ -186,19 +175,60 @@ class BackendWorker(tornado.tcpserver.TCPServer):
if conn != None:
return conn
- elif worker_linkid in self._pendconn_linkidmap:
- self._pendconn_linkidmap[worker_linkid].append(imc.async.current())
+ elif worker_linkid in self._pend_callconn_linkidmap:
+ self._pend_callconn_linkidmap[worker_linkid].append(imc.async.current())
return imc.async.switchtop()
else:
- self._pendconn_linkidmap[worker_linkid] = [imc.async.current()]
+ self._pend_callconn_linkidmap[worker_linkid] = [imc.async.current()]
- stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
- stream.set_close_callback(lambda : __handle_pend(None))
- stream.connect((ret['sock_ip'],ret['sock_port']),__send_info)
+ sock_addr = (ret['sock_ip'],ret['sock_port'])
+
+ call_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ call_stream.set_close_callback(lambda : __handle_pend(None))
+ call_stream.connect(sock_addr,__call_conn_cb)
return imc.async.switchtop()
+
+ def _handle_callconn(self,call_stream,addr,info):
+ def __send_back(stat):
+ if stat == True:
+ self._pend_fileconn_linkidmap[linkid] = __file_conn_cb
+
+ netio.send_pack(call_stream,bytes(json.dumps(stat),'utf-8'))
+
+ def __file_conn_cb(file_stream):
+ conn = netio.SocketConnection(linkclass,linkid,call_stream,file_stream)
+ Proxy.instance.add_conn(conn)
+
+ linkclass = info['linkclass']
+ linkid = info['linkid']
+
+ conn = Proxy.instance.get_conn(linkid)
+ if conn != None:
+ return
+
+ if linkid not in self._pend_callconn_linkidmap:
+ __send_back(True)
+
+ else:
+ if self._linkid > linkid:
+ __send_back(True)
+
+ pends = self._pend_callconn_linkidmap.pop(linkid)
+ for callback in pends:
+ callback(conn)
+
+ else:
+ __send_back(False)
+ def _handle_fileconn(self,file_stream,addr,info):
+ try:
+ self._pend_fileconn_linkidmap.pop(info['linkid'])(file_stream)
+
+ except KeyError:
+ pass
+
@imc.async.caller
def _test_call(self,iden,param):
print(time.perf_counter())
diff --git a/src/py/netio.py b/src/py/netio.py
index c6aacbc..0e8cab1 100644
--- a/src/py/netio.py
+++ b/src/py/netio.py
@@ -16,35 +16,42 @@ def recv_pack(stream,callback):
stream.read_bytes(8,_recv_size)
class SocketConnection(Connection):
- def __init__(self,linkclass,linkid,stream):
+ def __init__(self,linkclass,linkid,call_stream,file_stream = None):
super().__init__(linkclass,linkid)
- self.ioloop = tornado.ioloop.IOLoop.current()
- self.stream = stream
- self.stream.set_close_callback(self.close)
+ self._ioloop = tornado.ioloop.IOLoop.current()
+ self.call_stream = call_stream
+ self.call_stream.set_close_callback(self.close)
+
+ if file_stream == None:
+ self.file_stream = None
+
+ else:
+ self.file_stream = file_stream
+ self.file_stream.set_close_callback(self.close)
self._start_ping()
def send_msg(self,data):
- self.stream.write(struct.pack('l',len(data)) + data)
+ self.call_stream.write(struct.pack('l',len(data)) + data)
def start_recv(self,recv_callback):
def _recv_size(data):
size, = struct.unpack('l',data)
if size > 0:
- self.stream.read_bytes(size,_recv_data)
+ self.call_stream.read_bytes(size,_recv_data)
else:
if size == -1: #pong
self._ping_delay = 0
- self.stream.read_bytes(8,_recv_size)
+ self.call_stream.read_bytes(8,_recv_size)
def _recv_data(data):
self._recv_callback(self,data)
- self.stream.read_bytes(8,_recv_size)
+ self.call_stream.read_bytes(8,_recv_size)
self._recv_callback = tornado.stack_context.wrap(recv_callback)
- self.stream.read_bytes(8,_recv_size)
+ self.call_stream.read_bytes(8,_recv_size)
def close(self):
try:
@@ -52,11 +59,15 @@ class SocketConnection(Connection):
except AttributeError:
pass
+ self.call_stream.close()
+ if self.file_stream != None:
+ self.file_stream.close()
+
super().close()
def _start_ping(self):
def __check():
- self.stream.write(struct.pack('l',-1))
+ self.call_stream.write(struct.pack('l',-1))
self._ping_delay += 1
if self._ping_delay > 10:
@@ -70,7 +81,7 @@ class WebSocketConnection(Connection):
def __init__(self,linkclass,linkid,handler):
super().__init__(linkclass,linkid)
- self.ioloop = tornado.ioloop.IOLoop.current()
+ self._ioloop = tornado.ioloop.IOLoop.current()
self.handler = handler
def send_msg(self,data):