diff options
Diffstat (limited to 'src/py')
-rwxr-xr-x[-rw-r--r--] | src/py/asyncdb.py | 0 | ||||
-rwxr-xr-x | src/py/backend_server.py | 43 | ||||
-rwxr-xr-x | src/py/center_server.py | 72 | ||||
-rw-r--r-- | src/py/imc/async.py | 40 | ||||
-rw-r--r-- | src/py/imc/auth.py | 9 | ||||
-rwxr-xr-x | src/py/imc/proxy.py | 17 | ||||
-rw-r--r-- | src/py/mail.py | 216 | ||||
-rwxr-xr-x[-rw-r--r--] | src/py/netio.py | 0 | ||||
-rwxr-xr-x | src/py/user.py | 29 |
9 files changed, 357 insertions, 69 deletions
diff --git a/src/py/asyncdb.py b/src/py/asyncdb.py index 1e2bf7a..1e2bf7a 100644..100755 --- a/src/py/asyncdb.py +++ b/src/py/asyncdb.py diff --git a/src/py/backend_server.py b/src/py/backend_server.py index 8d9d169..a27240c 100755 --- a/src/py/backend_server.py +++ b/src/py/backend_server.py @@ -62,17 +62,31 @@ class BackendWorker(tornado.tcpserver.TCPServer): netio.recv_pack(sock_stream,_recv_conn_info) def add_client(self,link,handler): + @imc.async.caller + def _call(): + with TOJAuth.change_current_iden(self._idendesc): + Proxy.instance.call(self.center_conn.link,'add_client',10000,link,self._link) + self._client_linkmap[link] = {} - conn = netio.WebSocketConnection('client',link,handler) + conn = netio.WebSocketConnection(link,handler) conn.add_close_callback(lambda conn : self.del_client(conn.link)) Proxy.instance.add_conn(conn) + _call() + return conn def del_client(self,link): + @imc.async.caller + def _call(): + with TOJAuth.change_current_iden(self._idendesc): + Proxy.instance.call(self.center_conn.link,'del_client',10000,link,self._link) + del self._client_linkmap[link] + _call() + def _conn_center(self): def __retry(conn): print('retry connect center') @@ -94,12 +108,14 @@ class BackendWorker(tornado.tcpserver.TCPServer): self.center_conn.add_close_callback(__retry) Proxy.instance.add_conn(self.center_conn) + Proxy.instance.register_call('test/','get_client_list',self._test_get_client_list) imc_register_call('','test_dst',self._test_dst) + #imc_register_call('','test_dsta',self._test_dsta) - time.sleep(1) + #$time.sleep(1) #if self._link == '/backend/2/': - self._test_call(None) + #self._test_call(None) sock_ip,sock_port = self.sock_addr netio.send_pack(stream,bytes(json.dumps({ @@ -218,6 +234,14 @@ class BackendWorker(tornado.tcpserver.TCPServer): except KeyError: pass + def _get_link(linkclass): + if linkclass == 'center': + return self.center_conn.link + + @imc.async.caller + def _test_get_client_list(self,talk,talk2): + return list(self._client_linkmap.items()) + @imc.async.caller def _test_call(self,param): with TOJAuth.change_current_iden(self._idendesc): @@ -282,6 +306,7 @@ class WebSocketConnHandler(tornado.websocket.WebSocketHandler): else: try: info = json.loads(msg) + print(info) self.backend_conn = backend_worker.add_client(info['client_link'],self) except Exception: @@ -311,12 +336,12 @@ if __name__ == '__main__': worker_list.append(Process(target = start_backend_worker,args = (81, ))) worker_list.append(Process(target = start_backend_worker,args = (82, ))) - worker_list.append(Process(target = start_backend_worker,args = (181, ))) - worker_list.append(Process(target = start_backend_worker,args = (182, ))) - worker_list.append(Process(target = start_backend_worker,args = (183, ))) - worker_list.append(Process(target = start_backend_worker,args = (184, ))) - worker_list.append(Process(target = start_backend_worker,args = (185, ))) - worker_list.append(Process(target = start_backend_worker,args = (186, ))) + #worker_list.append(Process(target = start_backend_worker,args = (181, ))) + #worker_list.append(Process(target = start_backend_worker,args = (182, ))) + #worker_list.append(Process(target = start_backend_worker,args = (183, ))) + #worker_list.append(Process(target = start_backend_worker,args = (184, ))) + #worker_list.append(Process(target = start_backend_worker,args = (185, ))) + #worker_list.append(Process(target = start_backend_worker,args = (186, ))) for proc in worker_list: proc.start() diff --git a/src/py/center_server.py b/src/py/center_server.py index 1164b08..a8037a7 100755 --- a/src/py/center_server.py +++ b/src/py/center_server.py @@ -59,7 +59,8 @@ class CenterServer(tornado.tcpserver.TCPServer): self._ioloop = tornado.ioloop.IOLoop.instance() self._linkid_usemap = {} self._worker_linkmap = {} - self._backend_clientmap = {} + self._client_linkmap = {} + self._client_backendmap = {} self._backend_workerlist = [] pubkey = open('pubkey.pem','r').read() @@ -73,8 +74,10 @@ class CenterServer(tornado.tcpserver.TCPServer): imc_register_call('','lookup_link',self._lookup_link) imc_register_call('','create_iden',self._create_iden) - #imc_register_call('','add_client',self._add_client) - #imc_register_call('','del_client',self._del_client) + imc_register_call('','add_client',self._add_client) + imc_register_call('','del_client',self._del_client) + + imc_register_call('test/','get_client_list',self._test_get_client_list) def handle_stream(self,stream,addr): def _recv_worker_info(data): @@ -96,23 +99,30 @@ class CenterServer(tornado.tcpserver.TCPServer): backend_link = backend.link self._worker_linkmap[backend_link] = backend - self._backend_clientmap[backend_link] = {} self._backend_workerlist.append(backend) + self._client_backendmap[backend_link] = set() def del_backend_worker(self,backend): backend_link = backend.link del self._worker_linkmap[backend_link] - del self._backend_clientmap[backend_link] self._backend_workerlist.remove(backend) + for link in self._client_backendmap[backend_link]: + del self._client_linkmap[link] + self._client_backendmap[backend_link].remove(linkid) + + Proxy.instance.unlink_conn(link) + + del self._client_backendmap[backend_link] + def dispatch_client(self): size = len(self._backend_workerlist) if size == 0: return None - link = self._create_link() - idendesc = TOJAuth.instance.create_iden('client',link,2,TOJAuth.ROLETYPE_GUEST) + link = self._create_link('client') + idendesc = TOJAuth.instance.create_iden(link,2,TOJAuth.ROLETYPE_GUEST) backend = self._backend_workerlist[random.randrange(size)] ws_ip,ws_port = backend.ws_addr @@ -131,8 +141,6 @@ class CenterServer(tornado.tcpserver.TCPServer): @imc.async.caller def _lookup_link(self,link): try: - worker = self._worker_linkmap[link] - #a = int(TOJAuth.get_current_iden()['linkid']) #b = int(linkid) @@ -142,8 +150,10 @@ class CenterServer(tornado.tcpserver.TCPServer): #else: # worker = self._worker_linkidmap[str(a - 1)] - linkclass = TOJAuth.get_current_iden()['link'].split('/',1) + linkclass = TOJAuth.get_current_iden()['link'].split('/',2)[1] if linkclass != 'client': + worker = self._worker_linkmap[link] + sock_ip,sock_port = worker.sock_addr return { 'worker_link':worker.link, @@ -151,6 +161,9 @@ class CenterServer(tornado.tcpserver.TCPServer): 'sock_port':sock_port } + else: + return None + except KeyError: return None @@ -159,25 +172,33 @@ class CenterServer(tornado.tcpserver.TCPServer): def _create_iden(self,link,idenid,roletype,payload): return TOJAuth.instance.create_iden(link,idenid,roletype,payload) - #@imc.async.caller - #def _add_client(self,param): - # backend_linkid = iden['linkid'] - # client_linkid = param['client_linkid'] + @imc.async.caller + @TOJAuth.check_access(1,TOJAuth.ACCESS_EXECUTE) + def _add_client(self,client_link,backend_link): + self._client_linkmap[client_link] = { + 'backend_link':backend_link + } + self._client_backendmap[backend_link].add(client_link) + + conn = Proxy.instance.get_conn(backend_link) + Proxy.instance.link_conn(client_link,conn) + + print(client_link); + + @imc.async.caller + @TOJAuth.check_access(1,TOJAuth.ACCESS_EXECUTE) + def _del_client(self,client_link,backend_link): + del self._client_linkmap[client_link] + self._client_backendmap[backend_link].remove(client_link) + + Proxy.instance.unlink_conn(client_link) - # self._backend_clientmap[backend_linkid][client_linkid] = True - # conn = Proxy.instance.get_conn(backend_linkid) - # Proxy.instance.link_conn(client_linkid,conn) - # print(client_linkid); - #@imc.async.caller - #def _del_client(self,param): - # backend_linkid = iden['linkid'] - # client_linkid = param - # del self._backend_clientmap[backend_linkid][client_linkid] - # conn = Proxy.instance.get_conn(client_linkid) - # Proxy.instance.unlink_conn(client_linkid) + @imc.async.caller + def _test_get_client_list(self,talk,talk2): + return list(self._client_linkmap.items()) @@ -192,6 +213,7 @@ class WebConnHandler(tornado.web.RequestHandler): data = center_serv.dispatch_client() if data == None: self.write('Eno_backend') + else: client_idendesc,backend_link,ip,port = data self.write(json.dumps({ diff --git a/src/py/imc/async.py b/src/py/imc/async.py index c904d2b..fdb42c4 100644 --- a/src/py/imc/async.py +++ b/src/py/imc/async.py @@ -21,10 +21,17 @@ def switch_top(): old_contexts = tornado.stack_context._state.contexts auth.current_idendata = None - result = gr_main.switch(None) + try: + result = gr_main.switch(None) - tornado.stack_context._state.contexts = old_contexts - auth.current_idendata = old_idendata + except Exception as err: + traceback.print_stack() + print(err) + return (False,'Einternal') + + finally: + tornado.stack_context._state.contexts = old_contexts + auth.current_idendata = old_idendata return result @@ -44,18 +51,16 @@ def caller(f): return (True,ret) + old_idendata = auth.current_idendata + old_contexts = tornado.stack_context._state.contexts + try: gr = greenlet(_call) grid = id(gr) gr_idmap[grid] = set() - old_idendata = auth.current_idendata - old_contexts = tornado.stack_context._state.contexts result = gr.switch(*args,**kwargs) - tornado.stack_context._state.contexts = old_contexts - auth.current_idendata = old_idendata - if result == None: return (False,None) @@ -74,6 +79,10 @@ def caller(f): print(err) return (False,'Einternal') + finally: + tornado.stack_context._state.contexts = old_contexts + auth.current_idendata = old_idendata + return wrapper def get_retid(): @@ -103,19 +112,20 @@ def ret(retid,value = None,err = None): except KeyError: return - try: - old_idendata = auth.current_idendata - old_contexts = tornado.stack_context._state.contexts + old_idendata = auth.current_idendata + old_contexts = tornado.stack_context._state.contexts + try: if err == None: gr.switch(value) else: gr.throw(err) - tornado.stack_context._state.contexts = old_contexts - auth.current_idendata = old_idendata - - except TypeError as err: + except Exception as err: traceback.print_stack() print(err) + + finally: + tornado.stack_context._state.contexts = old_contexts + auth.current_idendata = old_idendata diff --git a/src/py/imc/auth.py b/src/py/imc/auth.py index 2b06ac4..02dd2dc 100644 --- a/src/py/imc/auth.py +++ b/src/py/imc/auth.py @@ -34,12 +34,17 @@ class Auth: return idendesc @staticmethod - def change_current_iden(idendesc): + def change_current_iden(idendesc,auth = None): @contextlib.contextmanager def context(): global current_idendata - iden = Auth.instance.get_iden(idendesc) + auth = None + + if auth == None: + auth = Auth.instance + + iden = auth.get_iden(idendesc) if iden == None: raise ValueError('Illegal idendesc') diff --git a/src/py/imc/proxy.py b/src/py/imc/proxy.py index c9d57bc..45841ab 100755 --- a/src/py/imc/proxy.py +++ b/src/py/imc/proxy.py @@ -73,7 +73,7 @@ class FileResult(): return self._result class Proxy: - def __init__(self,link,auth_instance,idendesc,conn_link_fn = lambda : None): + def __init__(self,link,auth,idendesc,conn_link_fn = lambda link : None): self.MSGTYPE_CALL = 'call' self.MSGTYPE_RET = 'ret' self.MSGTYPE_SENDFILE = 'sendfile' @@ -81,7 +81,7 @@ class Proxy: self._ioloop = tornado.ioloop.IOLoop.instance() self._link = link - self._auth = auth_instance + self._auth = auth self._idendesc = idendesc self._conn_link_fn = conn_link_fn @@ -173,7 +173,7 @@ class Proxy: 'timer':self._ioloop.add_timeout(datetime.timedelta(days = 1),lambda : self._ret_sendfile('Etimeout')) } - with Auth.change_current_iden(self._idendesc): + with Auth.change_current_iden(self._idendesc,self._auth): stat,ret = self.call(dst_link + 'imc/','pend_recvfile',65536,self._link,filekey,filesize) if stat == False: @@ -220,7 +220,7 @@ class Proxy: except KeyError: return - with Auth.change_current_iden(self._idendesc): + with Auth.change_current_iden(self._idendesc,self._auth): self.call(info['src_link'] + 'imc/','reject_sendfile',65536,filekey) def _route_call(self,in_conn,caller_link,caller_retid,idendesc,dst,func_name,timeout,param): @@ -250,9 +250,9 @@ class Proxy: else: in_link = self._link - #iden = self._auth.get_iden(in_linkclass,in_linkid,idendesc) - #if iden == None: - # return __ret((False,'Eilliden')) + iden = self._auth.verify_iden(in_link,idendesc) + if iden == None: + return __ret((False,'Eilliden')) try: dst_part = dst.split('/',3) @@ -270,7 +270,7 @@ class Proxy: result = self._call_pathmap[''.join([dst_path,func_name])](*param) else: - with Auth.change_current_iden(idendesc): + with Auth.change_current_iden(idendesc,self._auth): result = self._call_pathmap[''.join([dst_path,func_name])](*param) except KeyError: @@ -283,7 +283,6 @@ class Proxy: else: conn = self._request_conn(dst_link) if conn == None: - print(dst_link) return __ret((False,'Enoexist')) else: diff --git a/src/py/mail.py b/src/py/mail.py new file mode 100644 index 0000000..1dcc2eb --- /dev/null +++ b/src/py/mail.py @@ -0,0 +1,216 @@ +from tojauth import TOJAuth +from asyncdb import AsyncDB +from user import UserMg +import imc.proxy +import config + +class Mail: + _accessid = 3 + + MAIL_TYPE_INBOX = 1 + MAIL_TYPE_SENT_BACKUP = 2 + MAIL_TYPE_DRAFT = 3 + + TITLE_LEN_MIN = 1 + TITLE_LEN_MAX = 256 + CONTENT_LEN_MIN = 0 + CONTENT_LEN_MAX = 65536 + + LIST_ITEM_PER_PAGE = 20 + + def __init__(self, mod_idendesc, get_link): + Mail.instance = self + Mail.db = AsyncDB(config.CORE_DBNAME, config.CORE_DBUSER, + config.CORE_DBPASSWORD) + Mail._idendesc = mod_idendesc + self.get_link = get_link + + @imc.async.caller + def send_mail(self, to_uid, title, content): + if( + type(to_uid) != int or + type(title) != str or + type(content) != str + ): + return 'Eparameter' + + if len(title) < self.TITLE_LEN_MIN: + return 'Etitle_too_short' + elif len(title) > self.TITLE_LEN_MAX: + return 'Etitle_too_long' + elif len(content) < self.CONTENT_LEN_MIN: + return 'Econtent_too_short' + elif len(content) > self.CONTENT_LEN_MAX: + return 'Econtent_too_long' + + user_iden = TOJAuth.get_current_iden() + try: + uid = user_iden['uid'] + except KeyError: + return 'Euid_error' + + if not UserMg.instance.does_uid_exist(to_uid): + return 'Eto_uid_not_exist' + + with TOJAuth.change_current_iden(self._idendesc): + self._add_mail( + to_uid, uid, self.MAIL_TYPE_INBOX, True, title, content + ) + self._add_mail( + uid, uid, self.MAIL_TYPE_SENT_BACKUP, False, title,content + ) + + + @TOJAuth.check_access(_accessid, TOJAuth.ACCESS_EXECUTE) + def _add_mail(self, uid, from_uid, mail_type, unread, title, content): + cur = self.db.cursor() + sqlstr = ('INSERT INTO "MAIL" ("uid", "from_uid", "mail_type", ' + '"unread", "title", "content") VALUES (%s, %s, %s, %s, %s, ' + '%s) RETURNING "mailid";') + sqlarr = (uid, from_uid, mail_type, unread, title, content) + cur.execute(sqlstr, sqlarr) + + mailid = None + for data in cur: + mailid = data[0] + + return mailid + + @imc.async.caller + def recv_mail(self, mailid): + if( + type(mailid) != int + ): + return 'Eparameter' + + user_iden = TOJAuth.get_current_iden() + try: + uid = user_iden['uid'] + except KeyError: + return 'Euid_error' + + with TOJAuth.change_current_iden(self._idendesc): + mail = self._get_mail(mailid) + + if mail == None: + return 'Eno_such_mailid' + if mail['to_uid'] != uid: + TOJAuth.check_access( + self._accessid, TOJAuth.ACCESS_EXECUTE)(lambda x:x)(0) + + with TOJAuth.change_current_iden(self._idendesc): + self._set_unread_stat(mailid, False) + + return mail + + @TOJAuth.check_access(_accessid, TOJAuth.ACCESS_EXECUTE) + def _get_mail(self, mailid): + cur = self.db.cursor() + sqlstr = ('SELECT * FROM "MAIL" WHERE "mailid" = %s;') + sqlarr = (mailid, ) + cur.execute(sqlstr, sqlarr) + + ret = None + for data in cur: + ret = {} + ret['mailid'] = data[0] + ret['to_uid'] = data[1] + ret['from_uid'] = data[2] + ret['mail_type'] = data[3] + ret['title'] = data[5] + ret['content'] = data[6] + ret['send_time'] = data[7] + + ret['to_username'] = ( + UserMg.instance.get_user_info_by_uid(data[1])['username']) + ret['from_username'] = ( + UserMg.instance.get_user_info_by_uid(data[2])['username']) + + return ret + + @TOJAuth.check_access(_accessid, TOJAuth.ACCESS_EXECUTE) + def _set_unread_stat(self, mailid, unread): + cur = self.db.cursor() + sqlstr = ('UPDATE "MAIL" SET "unread" = %s WHERE "mailid" = %s;') + sqlarr = (unread, mailid) + cur.execute(sqlstr, sqlarr) + + @imc.async.caller + def list_mail( + self, mail_type, start_index = 1, end_index = LIST_ITEM_PER_PAGE + ): + if( + type(mail_type) != int + ): + return 'Eparameter' + + user_iden = TOJAuth.get_current_iden() + try: + uid = user_iden['uid'] + except KeyError: + return 'Euid_error' + + with TOJAuth.change_current_iden(self._idendesc): + maillist = self._get_maillist( + uid, mail_type, start_index, end_index) + + return maillist + + @TOJAuth.check_access(_accessid, TOJAuth.ACCESS_EXECUTE) + def _get_maillist(self, uid, mail_type, start_index, end_index): + cur = self.db.cursor() + sqlstr = ('SELECT "mailid", "from_uid", "unread", "title", ' + '"send_time" FROM "MAIL" WHERE "uid" = %s AND "mail_type" = ' + '%s ORDER BY "mailid" ASC LIMIT %s OFFSET %s;') + sqlarr = (uid, mail_type, end_index - start_index + 1, start_index - 1) + cur.execute(sqlstr, sqlarr) + + ret = [] + + for data in cur: + item = {} + item['mailid'] = data[0] + item['from_uid'] = data[1] + item['unread'] = data[2] + item['title'] = data[3] + item['send_time'] = data[4] + + item['from_username'] = ( + UserMg.instance.get_user_info_by_uid(data[1])['username']) + + ret.append(item) + + return ret + + @imc.async.caller + def del_mail(self, mailid): + if( + type(mailid) != int + ): + return 'Eparameter' + + user_iden = TOJAuth.get_current_iden() + try: + uid = user_iden['uid'] + except KeyError: + return 'Euid_error' + + with TOJAuth.change_current_iden(self._idendesc): + mail = self._get_mail(mailid) + + if mail == None: + return 'Eno_such_mailid' + if mail['to_uid'] != uid: + TOJAuth.check_access( + self._accessid, TOJAuth.ACCESS_EXECUTE)(lambda x:x)(0) + + with TOJAuth.change_current_iden(self._idendesc): + self._del_mail(mailid) + + @TOJAuth.check_access(_accessid, TOJAuth.ACCESS_EXECUTE) + def _del_mail(self, mailid): + cur = self.db.cursor() + sqlstr = ('DELETE FROM "MAIL" WHERE "mailid" = %s;') + sqlarr = (mailid, ) + cur.execute(sqlstr, sqlarr) + diff --git a/src/py/netio.py b/src/py/netio.py index aa2f9b2..aa2f9b2 100644..100755 --- a/src/py/netio.py +++ b/src/py/netio.py diff --git a/src/py/user.py b/src/py/user.py index 4d1c2fb..d22d8b3 100755 --- a/src/py/user.py +++ b/src/py/user.py @@ -6,7 +6,7 @@ from asyncdb import AsyncDB import imc.proxy import config -class User: +class UserMg: _accessid = 2 USERNAME_LEN_MIN = 5 @@ -23,10 +23,10 @@ class User: ABOUTME_LEN_MAX = 1000 def __init__(self, mod_idendesc, get_link): - User.instance = self - User.db = AsyncDB(config.CORE_DBNAME, config.CORE_DBUSER, + UserMg.instance = self + UserMg.db = AsyncDB(config.CORE_DBNAME, config.CORE_DBUSER, config.CORE_DBPASSWORD) - User._idendesc = mod_idendesc + UserMg._idendesc = mod_idendesc self.get_link = get_link @imc.async.caller @@ -73,7 +73,7 @@ class User: uid = self._create_user( username, passhash, nickname, email, avatar, aboutme) except psycopg2.IntegrityError: - return 'Eusername_already_exists' + return 'Eusername_exists' return {'uid' : uid} @@ -122,7 +122,10 @@ class User: with TOJAuth.change_current_iden(self._idendesc): idendesc = TOJAuth.instance.create_iden( - TOJAuth.get_current_iden()['link'], idenid, TOJAuth.ROLETYPE_USER, {'uid' : uid} + TOJAuth.get_current_iden()['link'], + idenid, + TOJAuth.ROLETYPE_USER, + {'uid' : uid} ) ret = { @@ -161,7 +164,10 @@ class User: with TOJAuth.change_current_iden(self._idendesc): idendesc = TOJAuth.instance.create_iden( - TOJAuth.get_current_iden()['link'], idenid, TOJAuth.ROLETYPE_USER, {'uid' : uid} + TOJAuth.get_current_iden()['link'], + idenid, + TOJAuth.ROLETYPE_USER, + {'uid' : uid} ) ret = { @@ -179,7 +185,7 @@ class User: ): return 'Eparameter' - ret = self._get_user_info_by_uid(uid) + ret = self.get_user_info_by_uid(uid) if ret == None: return 'Eno_such_uid' @@ -282,7 +288,7 @@ class User: return self._password_hash( 'GENGJIAN_WEISUO_KING^^' + str(uid) + '@E__E@' + passhash + 'Yo!') - def _get_user_info_by_uid(self, uid): + def get_user_info_by_uid(self, uid): cur = self.db.cursor() sqlstr = ('SELECT * FROM "USER" WHERE "uid" = %s;') sqlarr = (uid, ) @@ -329,3 +335,8 @@ class User: return uid != None + def does_uid_exist(self, uid): + idenid = self.get_idenid_by_uid(uid) + + return idenid != None + |