aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpzread <netfirewall@gmail.com>2013-08-08 00:54:42 +0800
committerpzread <netfirewall@gmail.com>2013-08-08 00:54:42 +0800
commit86dd51fffefa509571a3cd7bb43e4ecfd728e513 (patch)
treeec32031e8cd2ba7c3689e4360554128305026a14
parent02030010c7cef3012a0c733e82a531d83ac9f0ce (diff)
downloadtaiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar
taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.gz
taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.bz2
taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.lz
taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.xz
taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.tar.zst
taiwan-online-judge-86dd51fffefa509571a3cd7bb43e4ecfd728e513.zip
Add DispatchMg
-rw-r--r--src/py/dispatch.py174
1 files changed, 156 insertions, 18 deletions
diff --git a/src/py/dispatch.py b/src/py/dispatch.py
index 9484a5c..56ae951 100644
--- a/src/py/dispatch.py
+++ b/src/py/dispatch.py
@@ -1,19 +1,24 @@
import json
+import random
import config
+import com
import imc.async
+from imc.proxy import Proxy
from tojauth import TOJAuth
from asyncdb import AsyncDB
class Data:
- def __init__(self,dataid,datatype,source,target,status,data,gid,gcount):
+ def __init__(self,dataid,datatype,source,target,status,data,gid,timestamp):
self.dataid = dataid
self.datatype = datatype
self.source = source
self.target = target
+ self.status = status
self.data = data
self.gid = gid
- self,gcount = gcount
+
+ def store(self,db):
class DispatchMg:
_accessid = -1
@@ -35,22 +40,42 @@ class DispatchMg:
@imc.async.caller
@TOJAuth.check_access(_accessid,TOJAuth.ACCESS_WRITE)
- def _add_challenge(self,source,target,data,gid = None,gcount = 1):
+ def _add_challenge(self,source,target,data,gid = None):
cur = self.db.cursor()
+ if gid == None:
+ gid = com.suid()
+
cur.execute(('INSERT INTO "DATA_POOL" '
- '("type","source","target","gid","gcount","status","data") '
+ '("datatype","source","target","gid","status","data") '
'VALUES (%s,%s,%s,%s,%s,%s,%s) RETURNING "dataid";'),
- (self.DATATYPE_CHALLENGE,source,target,gid,gcount,
+ (self.DATATYPE_CHALLENGE,source,target,gid,
self.DATASTSTUS_PEND,json.dumps(data,'utf-8')))
if cur.rowcount == 0:
return 'Efailed'
- dataid = cur.fetchone()[0]
+ dataid = int(cur.fetchone()[0])
+ self._dispatch_data(self.DATATYPE_CHALLENGE,gid)
- data = Data(dataid,self.DATATYPE_CHALLENGE,source,target,
- self.DATASTATUS_PEND,data,gid,gcount)
+ return {'dataid':dataid}
+
+ @imc.async.caller
+ @TOJAuth.check_access(_accessid,TOJAuth.ACCESS_WRITE)
+ def _add_status(self,source,target,data,gid = None):
+ cur = self.db.cursor()
+
+ cur.execute(('INSERT INTO "DATA_POOL" '
+ '("datatype","source","target","gid","status","data") '
+ 'VALUES (%s,%s,%s,%s,%s,%s,%s) RETURNING "dataid";'),
+ (self.DATATYPE_CHALLENGE,source,target,gid,
+ self.DATASTSTUS_PEND,json.dumps(data,'utf-8')))
+
+ if cur.rowcount == 0:
+ return 'Efailed'
+
+ dataid = int(cur.fetchone()[0])
+ _dispatch_data(datatype,gid)
return {'dataid':dataid}
@@ -88,23 +113,136 @@ class DispatchMg:
if linkclass != 'backend':
return 'Efailed'
- return self._unregister_collector(link,''.join(['challenge/',name]))
+ return self._unregister_collector(link,''.join(['status/',name]))
- def _register_collector(self,link,name):
- if name not in self.collector_namemap:
- self.collector_namemap[name] = {}
+ def _register_collector(self,datatype,link,name):
+ if datatype == self.DATATYPE_CHALLENGE:
+ key = 'challenge/' + name
- self.collector_namemap[name][link] = {}
+ elif datatype == self.DATATYPE_STATUS:
+ key = 'status/' + name
+
+ if key not in self.collector_namemap:
+ self.collector_namemap[key] = {}
+
+ self.collector_namemap[key][link] = {}
+
+ cur = db.cursor()
+ cur.execute('SELECT "dataid","gid" WHERE "target"=%s',
+ (name))
return 'Success'
- def _unregister_collector(self,link,name):
- if name not in self.collector_namemap:
+ def _unregister_collector(self,datatype,link,name):
+ if datatype == self.DATATYPE_CHALLENGE:
+ key = 'challenge/' + name
+
+ elif datatype == self.DATATYPE_STATUS:
+ key = 'status/' + name
+
+ if key not in self.collector_namemap:
return 'Success'
- self.collector_namemap[name].pop(link)
+ self.collector_namemap[key].pop(link)
return 'Success'
- def _dispatch_data(self,datatype,target):
- pass
+ def _dispatch_data(self,datatype,gid):
+ def __collector_cb(result):
+ stat,ret = result
+
+ cur = db.cursor()
+
+ if stat == False or len(ret) != len(datalist):
+ cur.execute(('UPDATE "DATA_POOL" SET "status"=%s '
+ 'WHERE "dataid" IN %s'),
+ (self.DATASTATUS_WAIT,tuple(datalist)))
+
+ _dispatch_data(datatype,gid)
+
+ return
+
+ waitlist = []
+ donelist = []
+ for dataid,action in ret:
+ if action == self.DATASTATUS_WAIT:
+ waitlist.append(dataid)
+
+ else action == self.DATASTATUS_DONE:
+ donelist.append(dataid)
+
+ if len(waitlist) > 0:
+ cur.execute(('UPDATE "DATA_POOL" SET "status"=%s '
+ 'WHERE "dataid" IN %s'),
+ (self.DATASTATUS_WAIT,tuple(waitlist)))
+
+ if cur.rowcount == 0:
+ #TODO
+ raise Exception('dispatch update failed')
+
+ if len(donelist) > 0:
+ cur.execute(('UPDATE "DATA_POOL" SET "status"=%s '
+ 'WHERE "dataid" IN %s'),
+ (self.DATASTATUS_DONE,tuple(donelist)))
+
+ if cur.rowcount == 0:
+ #TODO
+ raise Exception('dispatch update failed')
+
+ cur = db.cursor()
+
+ cur.execute(('SELECT "dataid","source","target","status","data",'
+ '"timestamp" FROM "DATA_POOL" '
+ 'WHERE "datatype"=%s AND status!=%s AND "gid"=%s '
+ 'ORDER BY "dataid" ASC;'),
+ (datatype,self.DATASTATUS_DONE,gid))
+
+ if cur.rowcount == 0:
+ return
+
+ waitflag = False
+ datalist = []
+ for pair in cur:
+ data = {
+ 'dataid':pair[0],
+ 'datatype':datatype,
+ 'soruce':pair[1],
+ 'target':pair[2],
+ 'status':pair[3],
+ 'data':json.loads(pair[4],'utf-8'),
+ 'timestamp':pair[5]
+ }
+
+ if data['status'] == self.DATASTSTUS_WAIT:
+ waitflag = True
+ break
+
+ datalist.append(data)
+
+ if waitflag == True:
+ return
+
+ target = datalist[0]['target']
+ last_dataid = datalist[-1]['dataid']
+ cur.execute(('UPDATE "DATA_POOL" SET "status"=%s '
+ 'WHERE "dataid"<=%s AND '
+ '"datatype"=%s AND status=%s AND "gid"=%s;'),
+ (last_dataid,datatype,self.DATASTATUS_WAIT,gid))
+
+ if cur.rowcount == 0:
+ #TODO
+ raise Exception('dispatch update failed')
+
+ if datatype == self.DATATYPE_CHALLENGE:
+ prefix = 'challenge/'
+
+ elif datatype == self.DATATYPE_STATUS:
+ prefix = 'status/'
+
+ links = list(self.collector_namemap[prefix + target].values())
+ worker_link = links[random.randrange(len(links))]
+
+ Proxy.instance.call_async(
+ ''.join([worker_link,'dispatch/collector/',prefix],target,10000,
+ __collector_cb,datalist)
+