#! /usr/bin/env python
import getopt
import os
import Queue
import re
import signal
import subprocess
import sys
import time
import threading
# Timing knobs, all expressed in seconds.

# Sleep between two polls of the child process in Testing._WaitTimeout.
TIME_SLICE = 0.1
# Timeout of each blocking job_queue.get() performed by Worker.
WAIT_TIMEOUT = 0.1
# How long Testing.Stop waits after SIGTERM before sending SIGKILL.
TERMINATE_TIMEOUT = 5
# Timeout of each worker.join() performed by JoinWorkers.
JOIN_TIMEOUT = 1
class Default(object):
    """Built-in values used for every setting the command line leaves unset.

    Both path defaults are the working directory at the time this class body
    is executed.
    """

    # Tool chain.
    COMPILER = 'g++'
    COMPILE_ARGS = ('-g -Wall -Werror -std=c++11 '
                    '-pedantic -pthread -lstdc++')

    # Where to look for project headers and for unittest sources.
    PROJ_PATH = TEST_PATH = os.getcwd()
    SUFFIX = '_test.cpp'

    # Execution limits.
    TIME_LIMIT = 30
    WORKER_NUM = 4
class Params(object):
    """Effective settings of the test runner.

    Every attribute starts at the matching ``Default`` value and is
    overwritten in place by ``Parse`` for each option found on the command
    line.  The class is used as a namespace and is never instantiated.
    """

    compiler = Default.COMPILER
    compile_args = Default.COMPILE_ARGS
    proj_path = Default.PROJ_PATH
    test_path = Default.TEST_PATH
    time_limit = Default.TIME_LIMIT
    worker_num = Default.WORKER_NUM
    suffix = Default.SUFFIX

    # Status codes returned as the first element of Parse()'s result.
    PARSE_HELP = 0
    PARSE_ERROR = -1
    PARSE_ACCEPT = 1

    @staticmethod
    def _ToInt(text, minimum):
        """Return ``text`` as an int, or None if it is malformed or below
        ``minimum``."""
        try:
            value = int(text)
        except ValueError:
            return None
        if value < minimum:
            return None
        return value

    @staticmethod
    def Parse(argv):
        """Apply the command-line options in ``argv`` to the class attributes.

        Options are processed in the order given; positional arguments are
        ignored.  Options seen before ``-h`` or before an invalid option have
        already been applied when this returns.

        Args:
            argv: list of argument strings, without the program name.

        Returns:
            A ``(status, message)`` tuple where ``status`` is one of
            ``PARSE_HELP`` (message is the help text), ``PARSE_ERROR``
            (message describes the problem) or ``PARSE_ACCEPT`` (message is
            the empty string).
        """
        try:
            optlist, unused_args = getopt.getopt(argv, 'c:a:hw:p:P:s:t:')
        except getopt.GetoptError as err:
            # Unknown option or missing option argument: report it through
            # the regular error channel instead of a traceback.
            return (Params.PARSE_ERROR, 'Error: %s' % err)

        for (opt, arg) in optlist:
            if opt == '-c':
                Params.compiler = arg
            elif opt == '-a':
                Params.compile_args = arg
            elif opt == '-h':
                return (Params.PARSE_HELP, Params.HelpString())
            elif opt == '-p':
                Params.proj_path = arg
            elif opt == '-P':
                Params.test_path = arg
            elif opt == '-s':
                Params.suffix = arg
            elif opt == '-t':
                value = Params._ToInt(arg, 0)
                if value is None:
                    return (Params.PARSE_ERROR,
                            'Error: Invalid time limit %r' % arg)
                Params.time_limit = value
            elif opt == '-w':
                # With zero workers no queued test would ever be run, so at
                # least one worker is required.
                value = Params._ToInt(arg, 1)
                if value is None:
                    return (Params.PARSE_ERROR,
                            'Error: Invalid number of workers %r' % arg)
                Params.worker_num = value
            else:
                # Defensive only: getopt rejects options outside the spec.
                return (Params.PARSE_ERROR,
                        'Error: Unknown argument %r' % opt)
        return (Params.PARSE_ACCEPT, '')

    @staticmethod
    def HelpString():
        """Return the multi-line usage text, including the default values."""
        ret = '\n'.join([
            'USAGE:',
            ' test.py [-c <compiler>] [-a <arguments>] [-p <proj_path>]' +
            ' [-P <test_path>] [-s <suffix>] [-t <time_limit>]' +
            ' [-w <worker_num>] [-h]',
            '',
            'ARGUMENTS:',
            ' -c <compiler> Specifies the compiler, default=%s' %
            Default.COMPILER,
            ' -a <arguments> Specifies the compile arguments, default=%s' %
            Default.COMPILE_ARGS,
            ' -p <proj_path> Specifies the project pathname, default=%s' %
            Default.PROJ_PATH,
            ' -P <test_path> Specifies the unittest pathname, default=%s' %
            Default.TEST_PATH,
            ' -s <suffix> Specifies the suffix string for the testing' +
            ' file, default=%s' % Default.SUFFIX,
            ' -t <time_limit> Specifies the time limit for each testing, ' +
            'default=%d' % Default.TIME_LIMIT,
            ' -w <worker_num> Specifies the number of workers, default=%d' %
            Default.WORKER_NUM,
            ' -h Prints this help message and exits',
        ])
        return ret

    @staticmethod
    def PrintParams():
        """Print the settings currently in effect to standard output."""
        print('Compiler & its arguments: %r' % GetCompileCmd())
        print('Time limit per test: %r' % Params.time_limit)
        print('Number of workers: %r' % Params.worker_num)
        print('Project path: %r' % Params.proj_path)
        print('Unittest files path: %r (with suffix=%r)' %
              (Params.test_path, Params.suffix))
def GetCompileCmd():
    """Return the compiler invocation prefix as one shell-command string.

    The result is the configured compiler, an ``-I`` flag carrying the repr
    of the project path, and the configured compile arguments, separated by
    single spaces.
    """
    include_flag = '-I %r' % Params.proj_path
    pieces = (Params.compiler, include_flag, Params.compile_args)
    return ' '.join(pieces)
class Testing(object):
    """One unittest source file: compiles it, runs the binary, judges it.

    The binary is written next to the source as ``<source>.bin``; compiler
    output and the test's stdout/stderr are appended to ``<source>.log``.
    """

    def __init__(self, path, filename):
        """Derive the source, binary and log pathnames.

        Args:
            path: directory containing the source file.
            filename: name of the source file inside ``path``; also used as
                the test's display name.
        """
        self._name = filename
        self._source_pathname = os.path.join(path, filename)
        self._exec_pathname = self._source_pathname + '.bin'
        self._log_pathname = self._source_pathname + '.log'
        self._proc = None

    @staticmethod
    def _Quote(text):
        """Return ``text`` quoted as a single POSIX-shell word.

        The text is wrapped in single quotes; an embedded single quote is
        written as ``'\\''`` (close quote, escaped quote, reopen quote).
        """
        return "'" + text.replace("'", "'\\''") + "'"

    def Test(self):
        """Compile and run the test.

        Returns:
            ``(True, '')`` on success, otherwise ``(False, reason)`` where
            ``reason`` is 'Compile error', 'Timeout' or 'Test failure.'.
        """
        self._InitLogfile()
        if not self._Compile():
            return (False, 'Compile error')
        self._Run()
        if self._WaitTimeout(Params.time_limit) is None:
            # Still running after the time limit: tear it down.
            self.Stop()
            return (False, 'Timeout')
        if self._IsSuccessful():
            return (True, '')
        return (False, 'Test failure.')

    def _InitLogfile(self):
        """Create the log file, or append a separator line if it exists."""
        existed = os.path.isfile(self._log_pathname)
        with open(self._log_pathname, 'a') as f:
            if existed:
                f.write(('=' * 72) + '\n')

    def _Compile(self):
        """Compile the source into the binary; return True on exit status 0.

        The compiler prefix from GetCompileCmd() is deliberately left to the
        shell to interpret, but every pathname added here is quoted so that
        spaces or quotes in a path cannot split or alter the command.
        """
        command = ' '.join([
            GetCompileCmd(),
            '-o', Testing._Quote(self._exec_pathname),
            Testing._Quote(self._source_pathname),
            '>>' + Testing._Quote(self._log_pathname),
            '2>&1',
        ])
        try:
            retcode = subprocess.call(command, shell=True)
        except OSError:
            # The shell itself could not be started; count it as a failed
            # compilation rather than killing the worker thread.
            return False
        return retcode == 0

    def _Run(self):
        """Start the compiled binary with its output appended to the log.

        os.setsid runs in the child before exec, making the child the leader
        of a new session and process group, which is what Stop() signals.
        """
        command = '%s >>%s 2>&1' % (Testing._Quote(self._exec_pathname),
                                    Testing._Quote(self._log_pathname))
        self._proc = subprocess.Popen(command,
                                      shell=True,
                                      preexec_fn=os.setsid)

    def _SignalGroup(self, signum):
        """Send ``signum`` to the test's process group.

        Returns False when the signal could not be delivered (typically
        because the group has already gone away), True otherwise.
        """
        try:
            os.killpg(self._proc.pid, signum)
        except OSError:
            return False
        return True

    def Stop(self):
        """Terminate the running test: SIGTERM first, SIGKILL if needed.

        Does nothing if the test was never started.
        """
        if self._proc is None:
            return
        if not self._SignalGroup(signal.SIGTERM):
            return
        if self._WaitTimeout(TERMINATE_TIMEOUT) is None:
            if self._SignalGroup(signal.SIGKILL):
                # Give the child a bounded amount of time to be reaped so it
                # is not left behind as a zombie.
                self._WaitTimeout(TERMINATE_TIMEOUT)

    def _IsSuccessful(self):
        """Return True if the finished process exited with status 0."""
        return (self._proc.returncode == 0)

    def _WaitTimeout(self, timeout):
        """Poll the process until it ends or ``timeout`` seconds of sleeping
        have accumulated.

        Returns:
            The process's return code, or None if it is still running.
        """
        time_sum = 0
        while self._proc.poll() is None and time_sum < timeout:
            time.sleep(TIME_SLICE)
            time_sum += TIME_SLICE
        return self._proc.poll()

    @property
    def name(self):
        """File name of the test source, used as its display name."""
        return self._name

    @property
    def log_filename(self):
        """Pathname of the log file receiving compiler and test output."""
        return self._log_pathname
class StopWorkerNotify(object):
    """Marker object put on the job queue to make one Worker leave its loop."""
def Worker(job_queue):
    """Thread body: run queued tests until a StopWorkerNotify arrives.

    Args:
        job_queue: queue holding objects with ``Test()`` and ``name`` (the
            jobs) and StopWorkerNotify instances (the shutdown markers).

    Each job's outcome is reported through Log.Pass or Log.Fail.
    """
    while True:
        try:
            job = job_queue.get(True, WAIT_TIMEOUT)
        except Queue.Empty:
            # Nothing queued within the timeout; poll again.
            continue
        if isinstance(job, StopWorkerNotify):
            break
        try:
            succ, reason = job.Test()
        except Exception as err:
            # An unexpected error must count as a failure; otherwise the
            # thread would die silently and the run could still be reported
            # as all-pass.
            succ, reason = False, 'Unexpected error: %r' % (err,)
        if succ:
            Log.Pass(job.name)
        else:
            Log.Fail(job.name, reason)
class Log(object):
    """Prints coloured per-test results and remembers whether any failed.

    Used as a namespace; printing is serialised with a lock shared by all
    callers.
    """

    _all_pass = True
    _print_lock = threading.Lock()

    @staticmethod
    def _Emit(line):
        """Print ``line`` while holding the print lock."""
        with Log._print_lock:
            print(line)

    @staticmethod
    def Pass(test_name):
        """Report ``test_name`` as passed (green)."""
        Log._Emit('\033[32m%r >>> Passed\033[39m' % test_name)

    @staticmethod
    def Fail(test_name, reason):
        """Report ``test_name`` as failed (red) and clear the all-pass flag."""
        Log._all_pass = False
        Log._Emit('\033[31m%r >>> Failed for %r \033[39m' %
                  (test_name, reason))

    @staticmethod
    def IsAllPass():
        """Return True as long as Fail() has never been called."""
        return Log._all_pass
def ParseArgv():
    """Parse ``sys.argv[1:]`` into Params and report help or errors.

    Returns:
        A ``(keep_going, exit_code)`` tuple: ``(True, 0)`` when the run
        should proceed, ``(False, 0)`` after printing the help text, and
        ``(False, 1)`` after writing an error message to stderr.
    """
    result, result_str = Params.Parse(sys.argv[1:])
    if result == Params.PARSE_HELP:
        print(result_str)
        return (False, 0)
    elif result == Params.PARSE_ERROR:
        # write() adds no line terminator; without it the message would run
        # into whatever the terminal prints next.
        sys.stderr.write(result_str + '\n')
        return (False, 1)
    return (True, 0)
def CreateAndStartWorkers(job_queue):
    """Start ``Params.worker_num`` daemon threads running Worker.

    Args:
        job_queue: the queue handed to every Worker.

    Returns:
        The list of started threads, in creation order.
    """
    def _spawn():
        thread = threading.Thread(target=Worker, args=(job_queue,))
        thread.daemon = True
        thread.start()
        return thread

    started = []
    for _ in range(Params.worker_num):
        started.append(_spawn())
    return started
def PutsJobs(job_queue):
    """Queue one Testing job per file under ``Params.test_path`` whose name
    ends with ``Params.suffix``.

    Args:
        job_queue: the queue the jobs are put on.
    """
    for dirpath, _, names in os.walk(Params.test_path):
        matching = (entry for entry in names
                    if entry.endswith(Params.suffix))
        for entry in matching:
            job_queue.put(Testing(dirpath, entry))
def JoinWorkers(workers, job_queue):
    """Ask every worker to stop and wait until all of them have finished.

    Args:
        workers: the worker threads to wait for.
        job_queue: the queue the workers read from; one StopWorkerNotify is
            appended per worker, behind any jobs already queued.
    """
    # One marker per thread actually being joined.
    for _ in range(len(workers)):
        job_queue.put(StopWorkerNotify())

    pending = list(workers)
    while pending:
        # Join with a timeout and retry rather than blocking indefinitely on
        # a single thread.
        for worker in pending:
            worker.join(timeout=JOIN_TIMEOUT)
        # is_alive() is the supported spelling (Python 2.6+); the isAlive()
        # alias no longer exists in current Python 3.
        pending = [worker for worker in pending if worker.is_alive()]
def RunTestings():
    """Run every discovered test on the worker pool.

    Returns:
        0 if all tests passed, 1 if at least one failed.
    """
    job_queue = Queue.Queue()
    workers = CreateAndStartWorkers(job_queue)
    PutsJobs(job_queue)
    JoinWorkers(workers, job_queue)
    # IsAllPass must be *called*: the bare function object is always truthy,
    # which made the exit code 0 even when tests had failed.
    return 0 if Log.IsAllPass() else 1
def SignalTermHandler(signal, frame):
    """No-op handler; HandleSignals registers it for SIGTERM.

    Args:
        signal: the signal number passed by the interpreter (unused).
        frame: the interrupted stack frame (unused).
    """
    return None
def SignalIntHandler(signal, frame):
    """No-op handler; HandleSignals registers it for SIGINT.

    Args:
        signal: the signal number passed by the interpreter (unused).
        frame: the interrupted stack frame (unused).
    """
    return None
def HandleSignals():
    """Register this script's handlers for SIGTERM and SIGINT."""
    handlers = (
        (signal.SIGTERM, SignalTermHandler),
        (signal.SIGINT, SignalIntHandler),
    )
    for signum, handler in handlers:
        signal.signal(signum, handler)
def Main():
    """Entry point: install signal handlers, parse arguments, run the tests.

    Returns:
        The process exit code: the one chosen by ParseArgv when the run is
        not to proceed, otherwise the one returned by RunTestings.
    """
    HandleSignals()
    keep_going, exit_code = ParseArgv()
    if keep_going:
        Params.PrintParams()
        exit_code = RunTestings()
    return exit_code
if __name__ == '__main__':
    # sys.exit is the supported way to set the exit status; the bare exit()
    # helper is only injected by the site module and is absent under -S.
    sys.exit(Main())