aboutsummaryrefslogtreecommitdiffstats
path: root/src/py/backend_server.py
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-26 02:58:29 +0800
committerpzread <netfirewall@gmail.com>2013-05-26 02:58:29 +0800
commit5c2764816a3db7a4a2a8d8099967abf5f9462ffe (patch)
tree029ec09ad4c321b8fb134a2f05931c8cbe2e3e57 /src/py/backend_server.py
parent3653c85804a25de0162064449b4585717af77557 (diff)
downloadtaiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.gz
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.bz2
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.lz
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.xz
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.tar.zst
taiwan-online-judge-5c2764816a3db7a4a2a8d8099967abf5f9462ffe.zip
Do file connection
Diffstat (limited to 'src/py/backend_server.py')
-rw-r--r--src/py/backend_server.py140
1 files changed, 85 insertions, 55 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 110dd07..cac0e11 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -32,47 +32,27 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self._linkid = None
self._idendesc = None
- self._pendconn_linkidmap = {}
+ self._pend_callconn_linkidmap = {}
+ self._pend_fileconn_linkidmap = {}
self._client_linkidmap = {}
def start(self):
sock_port = random.randrange(4096,8192)
self.listen(sock_port)
self.sock_addr = ('127.0.0.1',sock_port)
+
self._conn_center()
- def handle_stream(self,stream,address):
+ def handle_stream(self,stream,addr):
def _recv_conn_info(data):
- def __send_back(stat):
- netio.send_pack(stream,bytes(json.dumps(stat),'utf-8'))
-
info = json.loads(data.decode('utf-8'))
- linkclass = info['linkclass']
- linkid = info['linkid']
-
- conn = Proxy.instance.get_conn(linkid)
- if conn != None:
- return
-
- if linkid not in self._pendconn_linkidmap:
- __send_back(True)
-
- conn = netio.SocketConnection(linkclass,linkid,stream)
- Proxy.instance.add_conn(conn)
-
- else:
- if self._linkid > linkid:
- __send_back(True)
+ conntype = info['conntype']
- conn = netio.SocketConnection(linkclass,linkid,stream)
- Proxy.instance.add_conn(conn)
+ if conntype == 'call':
+ self._handle_callconn(stream,addr,info)
- pends = self._pendconn_linkidmap.pop(linkid)
- for callback in pends:
- callback(conn)
-
- else:
- __send_back(False)
+ elif conntype == 'file':
+ self._handle_fileconn(stream,addr,info)
netio.recv_pack(stream,_recv_conn_info)
@@ -115,12 +95,11 @@ class BackendWorker(tornado.tcpserver.TCPServer):
self.center_conn.add_close_callback(lambda conn : __retry())
Proxy.instance.add_conn(self.center_conn)
+
imc_register_call('','test_dst',self._test_dst)
imc_register_call('','test_dsta',self._test_dsta)
time.sleep(0.5)
- #x = int(self._iden['linkid']) - (int(self._iden['linkid']) - 2) % 4
- #self._test_call(None,str(x))
if int(self._linkid) % 2 == 0:
self._test_call(None,str(int(self._linkid) + 1))
@@ -140,36 +119,46 @@ class BackendWorker(tornado.tcpserver.TCPServer):
def _conn_linkid(self,linkid):
def __handle_pend(conn):
- pends = self._pendconn_linkidmap.pop(worker_linkid)
+ pends = self._pend_callconn_linkidmap.pop(worker_linkid)
for gr in pends:
gr.switch(conn)
- def __send_info():
- def ___recv_cb(data):
- stat = json.loads(data.decode('utf-8'))
-
- if stat == True:
- conn = netio.SocketConnection(worker_linkclass,worker_linkid,stream)
- Proxy.instance.add_conn(conn)
- __handle_pend(conn)
-
- else:
- stream.set_close_callback(None)
- stream.close()
-
+ def __call_conn_cb():
conn = Proxy.instance.get_conn(worker_linkid)
if conn != None:
__handle_pend(conn)
- stream.set_close_callback(None)
- stream.close()
+ call_stream.set_close_callback(None)
+ call_stream.close()
else:
- netio.send_pack(stream,bytes(json.dumps({
+ netio.send_pack(call_stream,bytes(json.dumps({
+ 'conntype':'call',
'linkclass':'backend',
'linkid':self._linkid
}),'utf-8'))
- netio.recv_pack(stream,___recv_cb)
+ netio.recv_pack(call_stream,__call_recv_cb)
+
+ def __call_recv_cb(data):
+ def ___file_conn_cb():
+ conn = netio.SocketConnection(worker_linkclass,worker_linkid,call_stream,file_stream)
+ Proxy.instance.add_conn(conn)
+ __handle_pend(conn)
+
+ netio.send_pack(file_stream,bytes(json.dumps({
+ 'conntype':'file',
+ 'linkid':self._linkid
+ }),'utf-8'))
+
+ stat = json.loads(data.decode('utf-8'))
+ if stat == True:
+ file_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ file_stream.connect(sock_addr,___file_conn_cb)
+ else:
+ call_stream.set_close_callback(None)
+ call_stream.close()
+
+
if self.center_conn == None:
return None
@@ -186,19 +175,60 @@ class BackendWorker(tornado.tcpserver.TCPServer):
if conn != None:
return conn
- elif worker_linkid in self._pendconn_linkidmap:
- self._pendconn_linkidmap[worker_linkid].append(imc.async.current())
+ elif worker_linkid in self._pend_callconn_linkidmap:
+ self._pend_callconn_linkidmap[worker_linkid].append(imc.async.current())
return imc.async.switchtop()
else:
- self._pendconn_linkidmap[worker_linkid] = [imc.async.current()]
+ self._pend_callconn_linkidmap[worker_linkid] = [imc.async.current()]
- stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
- stream.set_close_callback(lambda : __handle_pend(None))
- stream.connect((ret['sock_ip'],ret['sock_port']),__send_info)
+ sock_addr = (ret['sock_ip'],ret['sock_port'])
+
+ call_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ call_stream.set_close_callback(lambda : __handle_pend(None))
+ call_stream.connect(sock_addr,__call_conn_cb)
return imc.async.switchtop()
+
+ def _handle_callconn(self,call_stream,addr,info):
+ def __send_back(stat):
+ if stat == True:
+ self._pend_fileconn_linkidmap[linkid] = __file_conn_cb
+
+ netio.send_pack(call_stream,bytes(json.dumps(stat),'utf-8'))
+
+ def __file_conn_cb(file_stream):
+ conn = netio.SocketConnection(linkclass,linkid,call_stream,file_stream)
+ Proxy.instance.add_conn(conn)
+
+ linkclass = info['linkclass']
+ linkid = info['linkid']
+
+ conn = Proxy.instance.get_conn(linkid)
+ if conn != None:
+ return
+
+ if linkid not in self._pend_callconn_linkidmap:
+ __send_back(True)
+
+ else:
+ if self._linkid > linkid:
+ __send_back(True)
+
+ pends = self._pend_callconn_linkidmap.pop(linkid)
+ for callback in pends:
+ callback(conn)
+
+ else:
+ __send_back(False)
+ def _handle_fileconn(self,file_stream,addr,info):
+ try:
+ self._pend_fileconn_linkidmap.pop(info['linkid'])(file_stream)
+
+ except KeyError:
+ pass
+
@imc.async.caller
def _test_call(self,iden,param):
print(time.perf_counter())