diff options
Diffstat (limited to 'toj/center/src/judge_server.cpp')
-rwxr-xr-x[-rw-r--r--] | toj/center/src/judge_server.cpp | 338 |
1 files changed, 76 insertions, 262 deletions
diff --git a/toj/center/src/judge_server.cpp b/toj/center/src/judge_server.cpp index f5e6d6d..7a2bb6a 100644..100755 --- a/toj/center/src/judge_server.cpp +++ b/toj/center/src/judge_server.cpp @@ -6,7 +6,6 @@ #include<dlfcn.h> #include<signal.h> #include<limits.h> -#include<ftw.h> #include<pthread.h> #include<semaphore.h> #include<errno.h> @@ -21,9 +20,10 @@ #include<map> #include<queue> -#include"judge_def.h" #include"netio.h" #include"tpool.h" +#include"judge_def.h" +#include"judge.h" #include"center_com.h" #include"judgm_line.h" #include"judgm_lib.h" @@ -48,14 +48,8 @@ server_conn::server_conn(int fd):netio(fd){ this->done_sendjmod_fn = new netio_iofn<server_conn>(this,&server_conn::done_sendjmod); this->recv_sendcode_fn = new netio_iofn<server_conn>(this,&server_conn::recv_sendcode); this->done_sendcode_fn = new netio_iofn<server_conn>(this,&server_conn::done_sendcode); - this->tp_unpackpro_thfn = new tpool_fn<server_conn>(this,&server_conn::tp_unpackpro_th); - this->tp_unpackpro_cbfn = new tpool_fn<server_conn>(this,&server_conn::tp_unpackpro_cb); this->tp_unpackjmod_thfn = new tpool_fn<server_conn>(this,&server_conn::tp_unpackjmod_th); this->tp_unpackjmod_cbfn = new tpool_fn<server_conn>(this,&server_conn::tp_unpackjmod_cb); - this->tp_unpackcode_thfn = new tpool_fn<server_conn>(this,&server_conn::tp_unpackcode_th); - this->tp_unpackcode_cbfn = new tpool_fn<server_conn>(this,&server_conn::tp_unpackcode_cb); - this->tp_judge_thfn = new tpool_fn<server_conn>(this,&server_conn::tp_judge_th); - this->tp_judge_cbfn = new tpool_fn<server_conn>(this,&server_conn::tp_judge_cb); send_setid(); } @@ -110,16 +104,30 @@ int server_conn::send_result(int subid,char *res_data,size_t res_len){ return 0; } -int server_conn::send_setpro(int *proid,int *cacheid,int type,int count){ +int server_conn::send_reqpro(int proid,int cacheid){ + char *write_buf; + int write_len; + center_com_reqpro *reqpro; + + write_buf = create_combuf(CENTER_COMCODE_REQPRO,sizeof(center_com_reqpro),write_len,(void**)&reqpro); + reqpro->proid = proid; + reqpro->cacheid = cacheid; + writebytes(write_buf,write_len,NULL,NULL); +} +int server_conn::send_setpro(std::vector<std::pair<int,int> > &pro_list,int type){ int i; + int count; char *write_buf; int write_len; center_com_setpro *setpro; + judge_pro_info *pro_info; + count = pro_list.size(); write_buf = create_combuf(CENTER_COMCODE_SETPRO,sizeof(center_com_setpro) * count,write_len,(void**)&setpro); + for(i = 0;i < count;i++){ - setpro[i].proid = proid[i]; - setpro[i].cacheid = cacheid[i]; + setpro[i].proid = pro_list[i].first; + setpro[i].cacheid = pro_list[i].second; setpro[i].type = type; } writebytes(write_buf,write_len,NULL,NULL); @@ -144,6 +152,17 @@ int server_conn::send_setjmod(char **jmod_name,int *cacheid,int type,int count){ return 0; } +int server_conn::send_reqcode(int subid){ + char *write_buf; + int write_len; + center_com_reqcode *reqcode; + + write_buf = create_combuf(CENTER_COMCODE_REQCODE,sizeof(center_com_reqcode),write_len,(void**)&reqcode); + reqcode->subid = subid; + writebytes(write_buf,write_len,NULL,NULL); + + return 0; +} int server_conn::readidle(){ readbytes(new center_com_header,sizeof(center_com_header),recv_dispatch_fn,NULL); return 0; @@ -195,33 +214,14 @@ void server_conn::recv_setid(void *buf,size_t len,void *data){ } void server_conn::recv_submit(void *buf,size_t len,void *data){ center_com_submit *sub; - char *write_buf; - int write_len; - center_com_reqcode *reqcode; - std::multimap<int,center_com_submit*>::iterator sub_it; - char tpath[PATH_MAX + 1]; - struct stat st; sub = (center_com_submit*)buf; - if(server_codeconn == NULL){ - server_codeconn = server_connect(); - } + judge_manage_submit(sub->subid,sub->proid,sub->lang,(char*)((char*)buf + sizeof(center_com_submit))); - if(server_submap.find(sub->subid) == server_submap.end()){ - snprintf(tpath,sizeof(tpath),"tmp/code/%d",sub->subid); - if(!stat(tpath,&st)){ - server_queuejudge(sub,tp_judge_thfn,tp_judge_cbfn); - }else{ - write_buf = create_combuf(CENTER_COMCODE_REQCODE,sizeof(center_com_submit),write_len,(void**)&reqcode); - reqcode->subid = sub->subid; - writebytes(write_buf,write_len,NULL,NULL); - server_submap.insert(std::pair<int,center_com_submit*>(sub->subid,sub)); - } - }else{ - server_submap.insert(std::pair<int,center_com_submit*>(sub->subid,sub)); - } + delete sub; } void server_conn::recv_setpro(void *buf,size_t len,void *data){ + int ret; int i; int count; center_com_setpro *setpro; @@ -229,65 +229,55 @@ void server_conn::recv_setpro(void *buf,size_t len,void *data){ char tpath[PATH_MAX + 1]; FILE *f; int cacheid; - std::vector<int> sl_proid; - std::vector<int> sl_cacheid; - - char *write_buf; - int write_len; - center_com_reqpro *reqpro; + std::vector<std::pair<int,int> > pro_list; count = len / sizeof(center_com_setpro); setpro = (center_com_setpro*)buf; for(i = 0;i < count;i++){ if(setpro[i].type == 0){ - snprintf(tpath,sizeof(tpath),"tmp/pro/%d/cacheinfo",setpro[i].proid); - f = fopen(tpath,"r"); - if(f != NULL){ - fscanf(f,"%d",&cacheid); - fclose(f); - - if(cacheid == setpro[i].cacheid){ - sl_proid.push_back(setpro[i].proid); - sl_cacheid.push_back(setpro[i].cacheid); - continue; + ret = judge_manage_updatepro(setpro[i].proid,setpro[i].cacheid,true,NULL); + if(ret == 0){ + if(server_fileconn == NULL){ + server_fileconn = server_connect(); } + server_fileconn->send_reqpro(setpro[i].proid,setpro[i].cacheid); + }else if(ret == 1){ + pro_list.push_back(std::make_pair(setpro[i].proid,setpro[i].cacheid)); } - - if(server_fileconn == NULL){ - server_fileconn = server_connect(); - } - - write_buf = create_combuf(CENTER_COMCODE_REQPRO,sizeof(center_com_reqpro),write_len,(void**)&reqpro); - reqpro->proid = setpro[i].proid; - server_fileconn->writebytes(write_buf,write_len,NULL,NULL); }else if(setpro[i].type == 1){ } } - if(!sl_proid.empty()){ - this->send_setpro(&sl_proid[0],&sl_cacheid[0],0,sl_proid.size()); + if(!pro_list.empty()){ + this->send_setpro(pro_list,0); } delete setpro; } void server_conn::recv_sendpro(void *buf,size_t len,void *data){ center_com_sendpro *sendpro; + judge_pro_info *pro_info; char tpath[PATH_MAX + 1]; int fd; sendpro = (center_com_sendpro*)buf; - snprintf(tpath,sizeof(tpath),"tmp/propack/%d.tar.bz2",sendpro->proid); - fd = open(tpath,O_WRONLY | O_CREAT,0644); - readfile(fd,sendpro->filesize,done_sendpro_fn,sendpro); + + if(judge_manage_updatepro(sendpro->proid,sendpro->cacheid,false,&pro_info) == 0){ + snprintf(tpath,sizeof(tpath),"tmp/propack/%d_%d.tar.bz2",sendpro->proid,sendpro->cacheid); + fd = open(tpath,O_WRONLY | O_CREAT,0644); + readfile(fd,sendpro->filesize,done_sendpro_fn,pro_info); + } + + delete sendpro; } void server_conn::done_sendpro(void *buf,size_t len,void *data){ - center_com_sendpro *sendpro; + judge_pro_info *pro_info; close(*(int*)buf); - sendpro = (center_com_sendpro*)data; - server_packtp->add(tp_unpackpro_thfn,sendpro,tp_unpackpro_cbfn,sendpro); + pro_info = (judge_pro_info*)data; + judge_manage_done_updatepro(pro_info); } void server_conn::recv_setjmod(void *buf,size_t len,void *data){ int i; @@ -374,36 +364,9 @@ void server_conn::done_sendcode(void *buf,size_t len,void *data){ close(*(int*)buf); sendcode = (center_com_sendcode*)data; - server_packtp->add(tp_unpackcode_thfn,sendcode,tp_unpackcode_cbfn,sendcode); -} -void server_conn::tp_unpackpro_th(void *data){ - center_com_sendpro *sendpro; - char pack_path[PATH_MAX + 1]; - char dir_path[PATH_MAX + 1]; - char tpath[PATH_MAX + 1]; - FILE *f; - - sendpro = (center_com_sendpro*)data; - - snprintf(pack_path,sizeof(pack_path),"tmp/propack/%d.tar.bz2",sendpro->proid); - snprintf(dir_path,sizeof(dir_path),"tmp/pro/%d",sendpro->proid); - - mkdir(dir_path,0755); - server_cleardir(dir_path); - pack_unpack(pack_path,dir_path); + judge_manage_done_code(sendcode->subid); - snprintf(tpath,sizeof(tpath),"tmp/pro/%d/cacheinfo",sendpro->proid); - f = fopen(tpath,"w"); - fprintf(f,"%d",sendpro->cacheid); - fclose(f); -} -void server_conn::tp_unpackpro_cb(void *data){ - center_com_sendpro *sendpro; - - sendpro = (center_com_sendpro*)data; - send_setpro(&sendpro->proid,&sendpro->cacheid,0,1); - - delete sendpro; + delete sendcode; } void server_conn::tp_unpackjmod_th(void *data){ center_com_sendjmod *sendjmod; @@ -417,8 +380,8 @@ void server_conn::tp_unpackjmod_th(void *data){ snprintf(pack_path,sizeof(pack_path),"tmp/jmodpack/%s.tar.bz2",sendjmod->jmod_name); snprintf(dir_path,sizeof(dir_path),"tmp/jmod/%s",sendjmod->jmod_name); mkdir(dir_path,0755); - server_cleardir(dir_path); - pack_unpack(pack_path,dir_path); + tool_cleardir(dir_path); + tool_unpack(pack_path,dir_path); snprintf(tpath,sizeof(tpath),"tmp/jmod/%s/cacheinfo",sendjmod->jmod_name); f = fopen(tpath,"w"); @@ -435,175 +398,30 @@ void server_conn::tp_unpackjmod_cb(void *data){ delete sendjmod; } -void server_conn::tp_unpackcode_th(void *data){ - center_com_sendcode *sendcode; - char pack_path[PATH_MAX + 1]; - char dir_path[PATH_MAX + 1]; - char tpath[PATH_MAX + 1]; - FILE *f; - - sendcode = (center_com_sendcode*)data; - - snprintf(pack_path,sizeof(pack_path),"tmp/codepack/%d.tar.bz2",sendcode->subid); - snprintf(dir_path,sizeof(dir_path),"tmp/code/%d",sendcode->subid); - mkdir(dir_path,0755); - server_cleardir(dir_path); - pack_unpack(pack_path,dir_path); -} -void server_conn::tp_unpackcode_cb(void *data){ - center_com_sendcode *sendcode; - int subid; - std::multimap<int,center_com_submit*>::iterator sub_it; - center_com_submit *sub; - - sendcode = (center_com_sendcode*)data; - subid = sendcode->subid; - - while((sub_it = server_submap.find(subid)) != server_submap.end()){ - sub = sub_it->second; - server_queuejudge(sub,tp_judge_thfn,tp_judge_cbfn); - server_submap.erase(sub_it); - } - - delete sendcode; -} -void server_conn::tp_judge_th(void *data){ - server_judgeth_info *th_info; - center_com_submit *sub; - char pro_path[PATH_MAX + 1]; - char code_path[PATH_MAX + 1]; - - th_info = (server_judgeth_info*)data; - sub = th_info->sub; - - snprintf(pro_path,sizeof(pro_path),"tmp/pro/%d",sub->proid); - snprintf(code_path,sizeof(code_path),"tmp/code/%d",sub->subid); - server_judge(sub->subid,pro_path,code_path,th_info->run_path,sub->lang,sub->set_data,th_info->res_data,th_info->res_len); -} -void server_conn::tp_judge_cb(void *data){ - server_judgeth_info *th_info; - center_com_submit *sub; - - th_info = (server_judgeth_info*)data; - sub = th_info->sub; - send_result(sub->subid,th_info->res_data,th_info->res_len); - th_info->use_flag = false; - th_info->sub = NULL; - delete sub; -} - -static int server_queuejudge(center_com_submit *sub,tpool_protofn *th_fn,tpool_protofn *cb_fn){ - int i; - - printf("get submit %d %d\n",sub->subid,sub->proid); - for(i = 0;i < 8;i++){ - if(server_judgepool[i]->use_flag == false){ - server_judgepool[i]->use_flag = true; - server_judgepool[i]->sub = sub; - server_judgetp->add(th_fn,server_judgepool[i],cb_fn,server_judgepool[i]); - break; - } - } - +int judge_server_addtpool(tpool *tpinfo){ + server_addepev(tpinfo->fd,EPOLLIN | EPOLLET,SERVER_EPEV_TPOOL,tpinfo); return 0; } -static int server_judge(int subid,char *pro_path,char *code_path,char *run_path,int lang,char *set_data,char *res_data,size_t &res_len){ - judgm_line_info *line_info; - int pid; - - char tpath[PATH_MAX + 1]; - FILE *set_file; - char cwd_path[PATH_MAX + 1]; - char jmod_name[NAME_MAX + 1]; - char line_path[PATH_MAX + 1]; - char check_name[NAME_MAX + 1]; - char check_path[PATH_MAX + 1]; - char lchr; - char tchr; - - int judgk_modfd; - void *line_dll; - void *check_dll; - judgm_line_run_fn run_fn; - - snprintf(tpath,sizeof(tpath),"%s/setting",pro_path); - set_file = fopen(tpath,"r"); - - getcwd(cwd_path,sizeof(cwd_path)); - fscanf(set_file,"%s",jmod_name); - snprintf(line_path,sizeof(line_path),"%s/tmp/jmod/%s/%s_line.so",cwd_path,jmod_name,jmod_name); - fscanf(set_file,"%s",check_name); - if(check_name[0] == '/'){ - snprintf(check_path,sizeof(check_path),"%s/%s/private%s.so",cwd_path,pro_path,check_name); - }else{ - snprintf(check_path,sizeof(check_path),"%s/tmp/jmod/%s/%s.so",cwd_path,jmod_name,check_name); - } - - lchr = '\n'; - while((tchr = fgetc(set_file)) != EOF){ - if(lchr == '\n' && tchr == '='){ - while(fgetc(set_file) != '\n'); - break; - } - lchr = tchr; - } - - judgk_modfd = open("/dev/judgk",O_RDWR); - line_dll = dlopen(line_path,RTLD_NOW); - check_dll = dlopen(check_path,RTLD_NOW); - - line_info = (judgm_line_info*)mmap(NULL,sizeof(struct judgm_line_info),PROT_READ | PROT_WRITE,MAP_SHARED | MAP_ANONYMOUS,-1,0); - - line_info->subid = subid; - - line_info->pro_path = pro_path; - line_info->code_path = code_path; - line_info->run_path = run_path; - - line_info->judgk_modfd = judgk_modfd; - line_info->line_dll = line_dll; - line_info->check_dll = check_dll; - - line_info->lang = lang; - line_info->set_file = set_file; - line_info->set_data = set_data; - - server_cleardir(line_info->run_path); - - run_fn = (judgm_line_run_fn)dlsym(line_dll,"run"); - if((pid = fork()) == 0){ - run_fn(line_info); - exit(0); - } - waitpid(pid,NULL,0); - - memcpy(res_data,line_info->res_data,line_info->res_len); - res_len = line_info->res_len; - - munmap(line_info,sizeof(judgm_line_info)); - fclose(set_file); - close(judgk_modfd); +int judge_server_setpro(std::vector<std::pair<int,int> > &pro_list){ + server_mainconn->send_setpro(pro_list,0); return 0; } -static int server_cleardir_callback(const char *path,const struct stat *st,int flag,struct FTW *ftw_buf){ - if(ftw_buf->level == 0){ - return 0; +int judge_server_reqcode(int subid){ + if(server_codeconn == NULL){ + server_codeconn = server_connect(); } + server_codeconn->send_reqcode(subid); - if(S_ISDIR(st->st_mode)){ - rmdir(path); - }else{ - unlink(path); - } return 0; } -static int server_cleardir(char *path){ - nftw(path,server_cleardir_callback,64,FTW_DEPTH | FTW_PHYS); +int judge_server_result(int subid,char *res_data,int res_len){ + server_mainconn->send_result(subid,res_data,res_len); return 0; } + static int server_addepev(int fd,unsigned int flag,int type,void *data){ server_epevdata *epevdata; epoll_event epev; @@ -630,7 +448,7 @@ static server_conn* server_connect(){ caddr.sin_family = AF_INET; caddr.sin_port = htons(SERVER_JUDGE_PORT); //caddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - caddr.sin_addr.s_addr = inet_addr("10.8.0.1"); + caddr.sin_addr.s_addr = inet_addr("10.8.0.2"); cinfo = new server_conn(cfd); server_addepev(cfd,EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET,SERVER_EPEV_JUDGECLIENT,cinfo); @@ -654,22 +472,18 @@ int main(){ server_epfd = epoll_create1(0); server_id = 0; - server_avail = 2; + server_avail = JUDGE_THREAD_JUDGEMAX; server_mainconn = server_connect(); server_fileconn = NULL; server_codeconn = NULL; + judge_manage_init(); + server_packtp = new tpool(4); + judge_server_addtpool(server_packtp); server_packtp->start(); - server_addepev(server_packtp->fd,EPOLLIN | EPOLLET,SERVER_EPEV_TPOOL,server_packtp); - - for(i = 0;i < 8;i++){ - server_judgepool[i] = new server_judgeth_info(i); - } - server_judgetp = new tpool(8); - server_judgetp->start(); - server_addepev(server_judgetp->fd,EPOLLIN | EPOLLET,SERVER_EPEV_TPOOL,server_judgetp); + while(true){ nevs = epoll_wait(server_epfd,epevs,SERVER_EPOLL_MAXEVENT,-1); for(i = 0;i < nevs;i++){ |