aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-05-05 16:12:01 +0800
committerpzread <netfirewall@gmail.com>2013-05-05 16:12:01 +0800
commit20d231ddc85779c7d3685296b107c7d617077b2d (patch)
treeacef207840a584bbc540bb090b59e598a743689a
parent873fbd6e580c937beeb4e10cb67923057d5e7c80 (diff)
downloadtaiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar
taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.gz
taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.bz2
taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.lz
taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.xz
taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.zst
taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.zip
Finish BackendWorker connect to CenterServer
-rw-r--r--src/py/backend_server.py80
-rw-r--r--src/py/center_server.py61
-rw-r--r--src/py/imcproxy.py71
-rw-r--r--src/py/netio.py67
4 files changed, 186 insertions, 93 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py
index 277a473..fa809d9 100644
--- a/src/py/backend_server.py
+++ b/src/py/backend_server.py
@@ -1,63 +1,55 @@
+#! /usr/bin/env python
+
import socket
-import struct
import json
+import datetime
import tornado.iostream
import tornado.ioloop
+import netio
import imcproxy
class BackendWorker():
def __init__(self,center_addr):
- self.linkclass = 'backend'
- self.linkid = None
+ self.ioloop = tornado.ioloop.IOLoop.current()
self.center_addr = center_addr
+ self.linkclass = 'backend'
+ self.linkid = None
self.imc_proxy = imcproxy.IMCProxy()
def start(self):
- self._sock_conn()
-
- def send_pack(self,data):
- self.center_stream.write(struct.pack('L',len(data)) + data)
-
- def recv_pack(self,callback):
- def _recv_size(data):
- size, = struct.unpack('L',data)
- self.center_stream.read_bytes(size,_recv_data)
-
- def _recv_data(data):
- callback(json.loads(data.decode('utf-8')))
-
- self.center_stream.read_bytes(8,_recv_size)
-
- def _sock_conn(self):
- def __sock_conn_cb():
- self._send_worker_info()
-
- self.center_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
- self.center_stream.set_close_callback(self._sock_close_cb)
- self.center_stream.connect(self.center_addr,__sock_conn_cb)
-
- def _sock_close_cb(self):
- print('disconnect')
-
- def _send_worker_info(self):
- def __recv_info_cb(info):
- self.linkid = info['linkid']
- self.imc_proxy.add_sock_conn(self.center_stream,info['center_linkid'])
-
- print('/backend/' + self.linkid)
-
- self.send_pack(bytes(json.dumps({
- 'linkclass':self.linkclass,
- 'ws_addr':('210.70.123.215',81)
- }),'utf-8'))
-
- self.recv_pack(__recv_info_cb)
-
+ self._conn_center()
+
+ def _conn_center(self):
+ def __retry():
+ print('retry connect center')
+ self.ioloop.add_timeout(datetime.timedelta(seconds = 5),self._conn_center)
+
+ def __send_worker_info():
+ def ___recv_info_cb(data):
+ info = json.loads(data.decode('utf-8'))
+
+ self.linkid = info['linkid']
+ self.center_conn = netio.SocketConnection(stream)
+ self.center_conn.add_close_callback(lambda conn : __retry())
+ self.imc_proxy.add_conn(info['center_linkid'],self.center_conn)
+
+ print('/backend/' + self.linkid)
+
+ netio.send_pack(stream,bytes(json.dumps({
+ 'linkclass':self.linkclass,
+ 'ws_addr':('210.70.137.215',81)
+ }),'utf-8'))
+ netio.recv_pack(stream,___recv_info_cb)
+
+ stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0))
+ stream.set_close_callback(__retry)
+ stream.connect(self.center_addr,lambda : __send_worker_info())
+
if __name__ == '__main__':
- backend_worker = BackendWorker(('localhost',2501))
+ backend_worker = BackendWorker(('localhost',5730))
backend_worker.start()
tornado.ioloop.IOLoop.instance().start()
diff --git a/src/py/center_server.py b/src/py/center_server.py
index 1d93946..4a892f8 100644
--- a/src/py/center_server.py
+++ b/src/py/center_server.py
@@ -1,5 +1,6 @@
+#! /usr/bin/env python
+
import random
-import struct
import json
import uuid
@@ -8,6 +9,7 @@ import tornado.tcpserver
import tornado.httpserver
import tornado.web
+import netio
import imcproxy
class Worker:
@@ -19,40 +21,33 @@ class Worker:
self.linkid = linkid
self.worker_ip = worker_ip
- stream.set_close_callback(self._sock_close_cb)
-
- self.send_pack(bytes(json.dumps({
+ netio.send_pack(self.stream,bytes(json.dumps({
'linkid':self.linkid,
'center_linkid':center_serv.linkid
}),'utf-8'))
- center_serv.add_backend_worker(self)
-
- def send_pack(self,data):
- self.stream.write(struct.pack('L',len(data)) + data)
-
- def recv_pack(self,callback):
- def _recv_size(data):
- size, = struct.unpack('L',data)
- self.stream.read_bytes(size,_recv_data)
+ conn = netio.SocketConnection(self.stream)
+ conn.add_close_callback(lambda conn : self.close())
+ center_serv.imc_proxy.add_conn(self.linkid,conn)
- def _recv_data(data):
- callback(json.loads(data.decode('utf-8')))
-
- self.stream.read_bytes(8,_recv_size)
-
- def _sock_close_cb(self):
+ def close(self):
pass
class BackendWorker(Worker):
def __init__(self,stream,linkid,worker_ip,worker_info):
+ global center_serv
+
super().__init__(stream,'backend',linkid,worker_ip)
self.ws_addr = worker_info['ws_addr']
+ center_serv.add_backend_worker(self)
+
def add_client(self):
return self.ws_addr
- def _sock_close_cb(self):
+ def close(self):
+ global center_serv
+
center_serv.del_backend_worker(self)
print('disconnect')
@@ -63,27 +58,26 @@ class CenterServer(tornado.tcpserver.TCPServer):
self.linkid_usemap = {}
self.backend_workerlist = []
+ self.imc_proxy = imcproxy.IMCProxy()
self.linkclass = 'center'
self.linkid = self._create_linkid()
- self.imc_proxy = imcproxy.IMCProxy()
print('/center/' + self.linkid)
def handle_stream(self,stream,address):
- def _recv_worker_info(worker_info):
+ def _recv_worker_info(data):
+ worker_info = json.loads(data.decode('utf-8'))
+
linkclass = worker_info['linkclass']
if linkclass == 'backend':
linkid = self._create_linkid()
worker_ip,worker_port = address
worker = BackendWorker(stream,linkid,worker_ip,worker_info)
- else:
- return
- self._recv_pack(stream,_recv_worker_info)
+ netio.recv_pack(stream,_recv_worker_info)
def add_backend_worker(self,worker):
self.backend_workerlist.append(worker)
- self.imc_proxy.add_sock_conn(worker.stream,worker.linkid)
def del_backend_worker(self,worker):
self.backend_workerlist.remove(worker)
@@ -105,19 +99,6 @@ class CenterServer(tornado.tcpserver.TCPServer):
return linkid
- def _send_pack(self,stream,data):
- stream.write(struct.pack('L',len(data)) + data)
-
- def _recv_pack(self,stream,callback):
- def __recv_size(data):
- size, = struct.unpack('L',data)
- stream.read_bytes(size,__recv_data)
-
- def __recv_data(data):
- callback(json.loads(data.decode('utf-8')))
-
- stream.read_bytes(8,__recv_size)
-
class WebConnHandler(tornado.web.RequestHandler):
def get(self):
global center_serv
@@ -135,7 +116,7 @@ if __name__ == '__main__':
global center_serv
center_serv = CenterServer()
- center_serv.listen(2501)
+ center_serv.listen(5730)
http_serv = tornado.httpserver.HTTPServer(tornado.web.Application([
('/conn',WebConnHandler),
diff --git a/src/py/imcproxy.py b/src/py/imcproxy.py
index f126816..a46b99d 100644
--- a/src/py/imcproxy.py
+++ b/src/py/imcproxy.py
@@ -1,20 +1,73 @@
-#! /usr/bin/env python
import json
+import tornado.ioloop
+import tornado.stack_context
+
class IMCConnection:
def __init__(self):
+ self._close_callback = []
+
+ def send_msg(self,data):
+ pass
+
+ def start_recvloop(self,recvloop_callback):
pass
-class SocketConnection(IMCConnection):
- def __init__(self,stream):
- super().__init__()
+ def add_close_callback(self,callback):
+ self._close_callback.append(tornado.stack_context.wrap(callback))
+
+ def close(self):
+ for callback in self._close_callback:
+ callback(self)
class IMCProxy:
def __init__(self):
- self.conn_linkidmap = {}
+ self._linkid_connmap = {}
+ self._conn_linkidmap = {}
+
+ self.MSGTYPE_CALL = 'call'
+
+ def add_conn(self,linkid,conn):
+ self._linkid_connmap[id(conn)] = linkid
+ self._conn_linkidmap[linkid] = conn
+
+ conn.add_close_callback(self._conn_close_cb)
+ conn.start_recvloop(self._recvloop_dispatch)
+
+ def del_conn(self,conn):
+ linkid = self._linkid_connmap.pop(id(conn))
+ del self._conn_linkidmap[linkid]
+
+ def get_conn(self,linkid):
+ if linkid not in self.conn_linkidmap:
+ return None
+
+ return self.conn_linkidmap[linkid]
+
+ def _recvloop_dispatch(self,conn,data):
+ msg = json.loads(data.decode('utf-8'))
+ msg_type == msg['type']
+ if msg_type == self.MSGTYPE_CALL:
+ _recv_msg_call(msg)
+
+ def _conn_close_cb(self,conn):
+ self.del_conn(conn)
+ print('connection close')
+
+ def _send_msg_call(self,conn,iden,dst,func,param):
+ msg = {
+ 'type':self.MSGTYPE_CALL,
+ 'iden':iden,
+ 'dst':dst,
+ 'func':func,
+ 'param':param
+ }
+ conn.send_msg(bytes(json.dumps(msg),'utf-8'))
- def add_sock_conn(self,stream,linkid):
- self._add_conn(SocketConnection(stream),linkid)
+ def _recv_msg_call(self,msg):
+ iden = msg['iden']
+ dst = msg['dst']
+ func = msg['func']
+ param = msg['param']
- def _add_conn(self,conn,linkid):
- self.conn_linkidmap[linkid] = conn
+ print(func)
diff --git a/src/py/netio.py b/src/py/netio.py
new file mode 100644
index 0000000..9e80d54
--- /dev/null
+++ b/src/py/netio.py
@@ -0,0 +1,67 @@
+import struct
+
+import tornado.ioloop
+import tornado.stack_context
+
+import imcproxy
+
+def send_pack(stream,data):
+ stream.write(struct.pack('l',len(data)) + data)
+
+def recv_pack(stream,callback):
+ def _recv_size(data):
+ size, = struct.unpack('l',data)
+ stream.read_bytes(size,lambda data : callback(data))
+
+ stream.read_bytes(8,_recv_size)
+
+class SocketConnection(imcproxy.IMCConnection):
+ def __init__(self,stream):
+ super().__init__()
+
+ self.ioloop = tornado.ioloop.IOLoop.current()
+ self.stream = stream
+ self.stream.set_close_callback(self.close)
+
+ self._start_ping()
+
+ def send_msg(self,data):
+ self.stream.write(struct.pack('l',len(data)) + data)
+
+ def start_recvloop(self,recvloop_callback):
+ def _recv_size(data):
+ size, = struct.unpack('l',data)
+ if size > 0:
+ self.stream.read_bytes(size,_recv_data)
+ else:
+ if size == -1: #pong
+ self._ping_delay = 0
+
+ self.stream.read_bytes(8,_recv_size)
+
+ def _recv_data(data):
+ self._recvloop_callback(self,data)
+ self.stream.read_bytes(8,_recv_size)
+
+ self._recvloop_callback = tornado.stack_context.wrap(recvloop_callback)
+ self.stream.read_bytes(8,_recv_size)
+
+ def close(self):
+ try:
+ self._ping_timer.stop()
+ except AttributeError:
+ pass
+
+ super().close()
+
+ def _start_ping(self):
+ def __check():
+ self.stream.write(struct.pack('l',-1))
+
+ self._ping_delay += 1
+ if self._ping_delay > 10:
+ self.close()
+
+ self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000)
+ self._ping_timer.start()
+ self._ping_delay = 0