diff options
author | pzread <netfirewall@gmail.com> | 2013-05-05 16:12:01 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-05-05 16:12:01 +0800 |
commit | 20d231ddc85779c7d3685296b107c7d617077b2d (patch) | |
tree | acef207840a584bbc540bb090b59e598a743689a | |
parent | 873fbd6e580c937beeb4e10cb67923057d5e7c80 (diff) | |
download | taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.gz taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.bz2 taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.lz taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.xz taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.tar.zst taiwan-online-judge-20d231ddc85779c7d3685296b107c7d617077b2d.zip |
Finish BackendWorker connect to CenterServer
-rw-r--r-- | src/py/backend_server.py | 80 | ||||
-rw-r--r-- | src/py/center_server.py | 61 | ||||
-rw-r--r-- | src/py/imcproxy.py | 71 | ||||
-rw-r--r-- | src/py/netio.py | 67 |
4 files changed, 186 insertions, 93 deletions
diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 277a473..fa809d9 100644 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -1,63 +1,55 @@ +#! /usr/bin/env python + import socket -import struct import json +import datetime import tornado.iostream import tornado.ioloop +import netio import imcproxy class BackendWorker(): def __init__(self,center_addr): - self.linkclass = 'backend' - self.linkid = None + self.ioloop = tornado.ioloop.IOLoop.current() self.center_addr = center_addr + self.linkclass = 'backend' + self.linkid = None self.imc_proxy = imcproxy.IMCProxy() def start(self): - self._sock_conn() - - def send_pack(self,data): - self.center_stream.write(struct.pack('L',len(data)) + data) - - def recv_pack(self,callback): - def _recv_size(data): - size, = struct.unpack('L',data) - self.center_stream.read_bytes(size,_recv_data) - - def _recv_data(data): - callback(json.loads(data.decode('utf-8'))) - - self.center_stream.read_bytes(8,_recv_size) - - def _sock_conn(self): - def __sock_conn_cb(): - self._send_worker_info() - - self.center_stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) - self.center_stream.set_close_callback(self._sock_close_cb) - self.center_stream.connect(self.center_addr,__sock_conn_cb) - - def _sock_close_cb(self): - print('disconnect') - - def _send_worker_info(self): - def __recv_info_cb(info): - self.linkid = info['linkid'] - self.imc_proxy.add_sock_conn(self.center_stream,info['center_linkid']) - - print('/backend/' + self.linkid) - - self.send_pack(bytes(json.dumps({ - 'linkclass':self.linkclass, - 'ws_addr':('210.70.123.215',81) - }),'utf-8')) - - self.recv_pack(__recv_info_cb) - + self._conn_center() + + def _conn_center(self): + def __retry(): + print('retry connect center') + self.ioloop.add_timeout(datetime.timedelta(seconds = 5),self._conn_center) + + def __send_worker_info(): + def ___recv_info_cb(data): + info = json.loads(data.decode('utf-8')) + + self.linkid = info['linkid'] + self.center_conn = netio.SocketConnection(stream) + self.center_conn.add_close_callback(lambda conn : __retry()) + self.imc_proxy.add_conn(info['center_linkid'],self.center_conn) + + print('/backend/' + self.linkid) + + netio.send_pack(stream,bytes(json.dumps({ + 'linkclass':self.linkclass, + 'ws_addr':('210.70.137.215',81) + }),'utf-8')) + netio.recv_pack(stream,___recv_info_cb) + + stream = tornado.iostream.IOStream(socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)) + stream.set_close_callback(__retry) + stream.connect(self.center_addr,lambda : __send_worker_info()) + if __name__ == '__main__': - backend_worker = BackendWorker(('localhost',2501)) + backend_worker = BackendWorker(('localhost',5730)) backend_worker.start() tornado.ioloop.IOLoop.instance().start() diff --git a/src/py/center_server.py b/src/py/center_server.py index 1d93946..4a892f8 100644 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -1,5 +1,6 @@ +#! /usr/bin/env python + import random -import struct import json import uuid @@ -8,6 +9,7 @@ import tornado.tcpserver import tornado.httpserver import tornado.web +import netio import imcproxy class Worker: @@ -19,40 +21,33 @@ class Worker: self.linkid = linkid self.worker_ip = worker_ip - stream.set_close_callback(self._sock_close_cb) - - self.send_pack(bytes(json.dumps({ + netio.send_pack(self.stream,bytes(json.dumps({ 'linkid':self.linkid, 'center_linkid':center_serv.linkid }),'utf-8')) - center_serv.add_backend_worker(self) - - def send_pack(self,data): - self.stream.write(struct.pack('L',len(data)) + data) - - def recv_pack(self,callback): - def _recv_size(data): - size, = struct.unpack('L',data) - self.stream.read_bytes(size,_recv_data) + conn = netio.SocketConnection(self.stream) + conn.add_close_callback(lambda conn : self.close()) + center_serv.imc_proxy.add_conn(self.linkid,conn) - def _recv_data(data): - callback(json.loads(data.decode('utf-8'))) - - self.stream.read_bytes(8,_recv_size) - - def _sock_close_cb(self): + def close(self): pass class BackendWorker(Worker): def __init__(self,stream,linkid,worker_ip,worker_info): + global center_serv + super().__init__(stream,'backend',linkid,worker_ip) self.ws_addr = worker_info['ws_addr'] + center_serv.add_backend_worker(self) + def add_client(self): return self.ws_addr - def _sock_close_cb(self): + def close(self): + global center_serv + center_serv.del_backend_worker(self) print('disconnect') @@ -63,27 +58,26 @@ class CenterServer(tornado.tcpserver.TCPServer): self.linkid_usemap = {} self.backend_workerlist = [] + self.imc_proxy = imcproxy.IMCProxy() self.linkclass = 'center' self.linkid = self._create_linkid() - self.imc_proxy = imcproxy.IMCProxy() print('/center/' + self.linkid) def handle_stream(self,stream,address): - def _recv_worker_info(worker_info): + def _recv_worker_info(data): + worker_info = json.loads(data.decode('utf-8')) + linkclass = worker_info['linkclass'] if linkclass == 'backend': linkid = self._create_linkid() worker_ip,worker_port = address worker = BackendWorker(stream,linkid,worker_ip,worker_info) - else: - return - self._recv_pack(stream,_recv_worker_info) + netio.recv_pack(stream,_recv_worker_info) def add_backend_worker(self,worker): self.backend_workerlist.append(worker) - self.imc_proxy.add_sock_conn(worker.stream,worker.linkid) def del_backend_worker(self,worker): self.backend_workerlist.remove(worker) @@ -105,19 +99,6 @@ class CenterServer(tornado.tcpserver.TCPServer): return linkid - def _send_pack(self,stream,data): - stream.write(struct.pack('L',len(data)) + data) - - def _recv_pack(self,stream,callback): - def __recv_size(data): - size, = struct.unpack('L',data) - stream.read_bytes(size,__recv_data) - - def __recv_data(data): - callback(json.loads(data.decode('utf-8'))) - - stream.read_bytes(8,__recv_size) - class WebConnHandler(tornado.web.RequestHandler): def get(self): global center_serv @@ -135,7 +116,7 @@ if __name__ == '__main__': global center_serv center_serv = CenterServer() - center_serv.listen(2501) + center_serv.listen(5730) http_serv = tornado.httpserver.HTTPServer(tornado.web.Application([ ('/conn',WebConnHandler), diff --git a/src/py/imcproxy.py b/src/py/imcproxy.py index f126816..a46b99d 100644 --- a/src/py/imcproxy.py +++ b/src/py/imcproxy.py @@ -1,20 +1,73 @@ -#! /usr/bin/env python import json +import tornado.ioloop +import tornado.stack_context + class IMCConnection: def __init__(self): + self._close_callback = [] + + def send_msg(self,data): + pass + + def start_recvloop(self,recvloop_callback): pass -class SocketConnection(IMCConnection): - def __init__(self,stream): - super().__init__() + def add_close_callback(self,callback): + self._close_callback.append(tornado.stack_context.wrap(callback)) + + def close(self): + for callback in self._close_callback: + callback(self) class IMCProxy: def __init__(self): - self.conn_linkidmap = {} + self._linkid_connmap = {} + self._conn_linkidmap = {} + + self.MSGTYPE_CALL = 'call' + + def add_conn(self,linkid,conn): + self._linkid_connmap[id(conn)] = linkid + self._conn_linkidmap[linkid] = conn + + conn.add_close_callback(self._conn_close_cb) + conn.start_recvloop(self._recvloop_dispatch) + + def del_conn(self,conn): + linkid = self._linkid_connmap.pop(id(conn)) + del self._conn_linkidmap[linkid] + + def get_conn(self,linkid): + if linkid not in self.conn_linkidmap: + return None + + return self.conn_linkidmap[linkid] + + def _recvloop_dispatch(self,conn,data): + msg = json.loads(data.decode('utf-8')) + msg_type == msg['type'] + if msg_type == self.MSGTYPE_CALL: + _recv_msg_call(msg) + + def _conn_close_cb(self,conn): + self.del_conn(conn) + print('connection close') + + def _send_msg_call(self,conn,iden,dst,func,param): + msg = { + 'type':self.MSGTYPE_CALL, + 'iden':iden, + 'dst':dst, + 'func':func, + 'param':param + } + conn.send_msg(bytes(json.dumps(msg),'utf-8')) - def add_sock_conn(self,stream,linkid): - self._add_conn(SocketConnection(stream),linkid) + def _recv_msg_call(self,msg): + iden = msg['iden'] + dst = msg['dst'] + func = msg['func'] + param = msg['param'] - def _add_conn(self,conn,linkid): - self.conn_linkidmap[linkid] = conn + print(func) diff --git a/src/py/netio.py b/src/py/netio.py new file mode 100644 index 0000000..9e80d54 --- /dev/null +++ b/src/py/netio.py @@ -0,0 +1,67 @@ +import struct + +import tornado.ioloop +import tornado.stack_context + +import imcproxy + +def send_pack(stream,data): + stream.write(struct.pack('l',len(data)) + data) + +def recv_pack(stream,callback): + def _recv_size(data): + size, = struct.unpack('l',data) + stream.read_bytes(size,lambda data : callback(data)) + + stream.read_bytes(8,_recv_size) + +class SocketConnection(imcproxy.IMCConnection): + def __init__(self,stream): + super().__init__() + + self.ioloop = tornado.ioloop.IOLoop.current() + self.stream = stream + self.stream.set_close_callback(self.close) + + self._start_ping() + + def send_msg(self,data): + self.stream.write(struct.pack('l',len(data)) + data) + + def start_recvloop(self,recvloop_callback): + def _recv_size(data): + size, = struct.unpack('l',data) + if size > 0: + self.stream.read_bytes(size,_recv_data) + else: + if size == -1: #pong + self._ping_delay = 0 + + self.stream.read_bytes(8,_recv_size) + + def _recv_data(data): + self._recvloop_callback(self,data) + self.stream.read_bytes(8,_recv_size) + + self._recvloop_callback = tornado.stack_context.wrap(recvloop_callback) + self.stream.read_bytes(8,_recv_size) + + def close(self): + try: + self._ping_timer.stop() + except AttributeError: + pass + + super().close() + + def _start_ping(self): + def __check(): + self.stream.write(struct.pack('l',-1)) + + self._ping_delay += 1 + if self._ping_delay > 10: + self.close() + + self._ping_timer = tornado.ioloop.PeriodicCallback(__check,1000) + self._ping_timer.start() + self._ping_delay = 0 |