diff options
author | pzread <netfirewall@gmail.com> | 2013-08-08 00:54:42 +0800 |
---|---|---|
committer | pzread <netfirewall@gmail.com> | 2013-08-08 00:54:42 +0800 |
commit | 86dd51fffefa509571a3cd7bb43e4ecfd728e513 (patch) | |
tree | ec32031e8cd2ba7c3689e4360554128305026a14 | |
parent | 02030010c7cef3012a0c733e82a531d83ac9f0ce (diff) | |
download | taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.gz taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.bz2 taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.lz taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.xz taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.zst taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.zip |
Add DispatchMg
-rw-r--r-- | src/py/dispatch.py | 174 |
1 files changed, 156 insertions, 18 deletions
diff --git a/src/py/dispatch.py b/src/py/dispatch.py index 9484a5c..56ae951 100644 --- a/src/py/dispatch.py +++ b/src/py/dispatch.py @@ -1,19 +1,24 @@ import json +import random import config +import com import imc.async +from imc.proxy import Proxy from tojauth import TOJAuth from asyncdb import AsyncDB class Data: - def __init__(self,dataid,datatype,source,target,status,data,gid,gcount): + def __init__(self,dataid,datatype,source,target,status,data,gid,timestamp): self.dataid = dataid self.datatype = datatype self.source = source self.target = target + self.status = status self.data = data self.gid = gid - self,gcount = gcount + + def store(self,db): class DispatchMg: _accessid = -1 @@ -35,22 +40,42 @@ class DispatchMg: @imc.async.caller @TOJAuth.check_access(_accessid,TOJAuth.ACCESS_WRITE) - def _add_challenge(self,source,target,data,gid = None,gcount = 1): + def _add_challenge(self,source,target,data,gid = None): cur = self.db.cursor() + if gid == None: + gid = com.suid() + cur.execute(('INSERT INTO "DATA_POOL" ' - '("type","source","target","gid","gcount","status","data") ' + '("datatype","source","target","gid","status","data") ' 'VALUES (%s,%s,%s,%s,%s,%s,%s) RETURNING "dataid";'), - (self.DATATYPE_CHALLENGE,source,target,gid,gcount, + (self.DATATYPE_CHALLENGE,source,target,gid, self.DATASTSTUS_PEND,json.dumps(data,'utf-8'))) if cur.rowcount == 0: return 'Efailed' - dataid = cur.fetchone()[0] + dataid = int(cur.fetchone()[0]) + self._dispatch_data(self.DATATYPE_CHALLENGE,gid) - data = Data(dataid,self.DATATYPE_CHALLENGE,source,target, - self.DATASTATUS_PEND,data,gid,gcount) + return {'dataid':dataid} + + @imc.async.caller + @TOJAuth.check_access(_accessid,TOJAuth.ACCESS_WRITE) + def _add_status(self,source,target,data,gid = None): + cur = self.db.cursor() + + cur.execute(('INSERT INTO "DATA_POOL" ' + '("datatype","source","target","gid","status","data") ' + 'VALUES (%s,%s,%s,%s,%s,%s,%s) RETURNING "dataid";'), + (self.DATATYPE_CHALLENGE,source,target,gid, + self.DATASTSTUS_PEND,json.dumps(data,'utf-8'))) + + if cur.rowcount == 0: + return 'Efailed' + + dataid = int(cur.fetchone()[0]) + _dispatch_data(datatype,gid) return {'dataid':dataid} @@ -88,23 +113,136 @@ class DispatchMg: if linkclass != 'backend': return 'Efailed' - return self._unregister_collector(link,''.join(['challenge/',name])) + return self._unregister_collector(link,''.join(['status/',name])) - def _register_collector(self,link,name): - if name not in self.collector_namemap: - self.collector_namemap[name] = {} + def _register_collector(self,datatype,link,name): + if datatype == self.DATATYPE_CHALLENGE: + key = 'challenge/' + name - self.collector_namemap[name][link] = {} + elif datatype == self.DATATYPE_STATUS: + key = 'status/' + name + + if key not in self.collector_namemap: + self.collector_namemap[key] = {} + + self.collector_namemap[key][link] = {} + + cur = db.cursor() + cur.execute('SELECT "dataid","gid" WHERE "target"=%s', + (name)) return 'Success' - def _unregister_collector(self,link,name): - if name not in self.collector_namemap: + def _unregister_collector(self,datatype,link,name): + if datatype == self.DATATYPE_CHALLENGE: + key = 'challenge/' + name + + elif datatype == self.DATATYPE_STATUS: + key = 'status/' + name + + if key not in self.collector_namemap: return 'Success' - self.collector_namemap[name].pop(link) + self.collector_namemap[key].pop(link) return 'Success' - def _dispatch_data(self,datatype,target): - pass + def _dispatch_data(self,datatype,gid): + def __collector_cb(result): + stat,ret = result + + cur = db.cursor() + + if stat == False or len(ret) != len(datalist): + cur.execute(('UPDATE "DATA_POOL" SET "status"=%s ' + 'WHERE "dataid" IN %s'), + (self.DATASTATUS_WAIT,tuple(datalist))) + + _dispatch_data(datatype,gid) + + return + + waitlist = [] + donelist = [] + for dataid,action in ret: + if action == self.DATASTATUS_WAIT: + waitlist.append(dataid) + + else action == self.DATASTATUS_DONE: + donelist.append(dataid) + + if len(waitlist) > 0: + cur.execute(('UPDATE "DATA_POOL" SET "status"=%s ' + 'WHERE "dataid" IN %s'), + (self.DATASTATUS_WAIT,tuple(waitlist))) + + if cur.rowcount == 0: + #TODO + raise Exception('dispatch update failed') + + if len(donelist) > 0: + cur.execute(('UPDATE "DATA_POOL" SET "status"=%s ' + 'WHERE "dataid" IN %s'), + (self.DATASTATUS_DONE,tuple(donelist))) + + if cur.rowcount == 0: + #TODO + raise Exception('dispatch update failed') + + cur = db.cursor() + + cur.execute(('SELECT "dataid","source","target","status","data",' + '"timestamp" FROM "DATA_POOL" ' + 'WHERE "datatype"=%s AND status!=%s AND "gid"=%s ' + 'ORDER BY "dataid" ASC;'), + (datatype,self.DATASTATUS_DONE,gid)) + + if cur.rowcount == 0: + return + + waitflag = False + datalist = [] + for pair in cur: + data = { + 'dataid':pair[0], + 'datatype':datatype, + 'soruce':pair[1], + 'target':pair[2], + 'status':pair[3], + 'data':json.loads(pair[4],'utf-8'), + 'timestamp':pair[5] + } + + if data['status'] == self.DATASTSTUS_WAIT: + waitflag = True + break + + datalist.append(data) + + if waitflag == True: + return + + target = datalist[0]['target'] + last_dataid = datalist[-1]['dataid'] + cur.execute(('UPDATE "DATA_POOL" SET "status"=%s ' + 'WHERE "dataid"<=%s AND ' + '"datatype"=%s AND status=%s AND "gid"=%s;'), + (last_dataid,datatype,self.DATASTATUS_WAIT,gid)) + + if cur.rowcount == 0: + #TODO + raise Exception('dispatch update failed') + + if datatype == self.DATATYPE_CHALLENGE: + prefix = 'challenge/' + + elif datatype == self.DATATYPE_STATUS: + prefix = 'status/' + + links = list(self.collector_namemap[prefix + target].values()) + worker_link = links[random.randrange(len(links))] + + Proxy.instance.call_async( + ''.join([worker_link,'dispatch/collector/',prefix],target,10000, + __collector_cb,datalist) + |