aboutsummaryrefslogblamecommitdiffstats
path: root/meowpp/test.py
blob: cbb7b8d9f399993e0f0fc20b0d5fceea6a769dd9 (plain) (tree)


















                      
                                                                            

                           
                   































































































                                                                                
                       































                                                                               
                   
                                                 
                                                        

































                                                               
                              
             
                                      






                                  
                        
                             
                                                              

                 
                                

                             
                                                                                

















































                                                                        









                                                    
           
                   













                                       
#! /usr/bin/env python

import getopt
import os
import Queue
import re
import signal
import subprocess
import sys
import time
import threading

TIME_SLICE = 0.1
WAIT_TIMEOUT = 0.1
TERMINATE_TIMEOUT = 5
JOIN_TIMEOUT = 1

class Default(object):
    COMPILER = 'g++'
    COMPILE_ARGS = '-g -Wall -Werror -std=c++11 -pedantic -pthread -lstdc++'
    PROJ_PATH = os.getcwd()
    TEST_PATH = os.getcwd()
    TIME_LIMIT = 30
    WORKER_NUM = 4
    SUFFIX = '_test.cpp'


class Params(object):
    compiler = Default.COMPILER
    compile_args = Default.COMPILE_ARGS
    proj_path = Default.PROJ_PATH
    test_path = Default.TEST_PATH
    time_limit = Default.TIME_LIMIT
    worker_num = Default.WORKER_NUM
    suffix = Default.SUFFIX

    PARSE_HELP = 0
    PARSE_ERROR = -1
    PARSE_ACCEPT = 1

    @staticmethod
    def Parse(argv):
        optlist, args = getopt.getopt(argv, 'c:a:hw:p:P:s:t:')
        for (opt, arg) in optlist:
            if opt == '-c':
                Params.compiler = arg
            elif opt == '-a':
                Params.compile_args = arg
            elif opt == '-h':
                return (Params.PARSE_HELP, Params.HelpString())
            elif opt == '-p':
                Params.proj_path = arg
            elif opt == '-P':
                Params.test_path = arg
            elif opt == '-s':
                Params.suffix = arg
            elif opt == '-t':
                Params.time_limit = int(arg)
            elif opt == '-w':
                Params.worker_num = int(arg)
            else:
                return (Params.PARSE_ERROR, 'Error: Unknown argument %r' % opt)
        return (Params.PARSE_ACCEPT, '')

    @staticmethod
    def HelpString():
        ret = '\n'.join([
            'USAGE:',
            '    test.py [-c <compiler>] [-a <arguments>] [-p <proj_path>]' +
            ' [-P <test_path>] [-s <suffix>] [-t <time_limit>]' +
            ' [-w <worker_num] [-h]',
            '',
            'ARGUMENTS:',
            '    -c <compiler>    Specifies the compiler, default=%s' %
                    Default.COMPILER,
            '    -a <arguments>   Specifies the compile arguments, default=%s' %
                    Default.COMPILE_ARGS,
            '    -p <proj_path>   Specifies the project pathname, default=%s' %
                    Default.PROJ_PATH,
            '    -P <test_path>   Specifies the unittest pathname, default=%s' %
                    Default.TEST_PATH,
            '    -s <suffix>      Specifies the suffix string for the testing' +
                    ' file, default=%s' % Default.SUFFIX,
            '    -t <time_limit>  Specifies the time limit for each testing, ' +
                    'default=%d' % Default.TIME_LIMIT,
            '    -w <worker_num>  Specifies the number of workers, default=%d' %
                    Default.WORKER_NUM,
        ])
        return ret

    @staticmethod
    def PrintParams():
        print('Compiler & its arguments: %r' % GetCompileCmd())
        print('Time limit per test: %r' % Params.time_limit)
        print('Number of workers: %r' % Params.worker_num)
        print('Project path: %r' % Params.proj_path)
        print('Unittest files path: %r (with suffix=%r)' %
              (Params.test_path, Params.suffix))

def GetCompileCmd():
    return ' '.join([Params.compiler,
                     '-I %r' % Params.proj_path,
                     Params.compile_args])


class Testing(object):
    def __init__(self, path, filename):
        self._name = filename
        self._source_pathname = path + '/' + filename
        self._exec_pathname = path + '/' + self._name + '.bin'
        self._log_pathname = path + '/' + self._name + '.log'
        self._proc = None

    def Test(self):
        self._InitLogfile()
        if not self._Compile():
            return (False, 'Compile error')
        self._Run()
        if self._WaitTimeout(Params.time_limit) is None:
            self.Stop()
            return (False, 'Timeout')
        return (True, '') if self._IsSuccessful() else (False, 'Test failure.')

    def _InitLogfile(self):
        if not os.path.isfile(self._log_pathname):
            with open(self._log_pathname, 'w'):
                pass
        else:
            with open(self._log_pathname, 'a') as f:
                f.write(('=' * 72) + '\n')

    def _Compile(self):
        retcode = None
        try:
            compile_cmd = GetCompileCmd()
            retcode = subprocess.call(compile_cmd +
                                      " -o '%s' " % self._exec_pathname +
                                      self._source_pathname +
                                      ' >>%s' % self._log_pathname +
                                      ' 2>&1',
                                      shell=True)
        except:
            pass
        return (retcode == 0)

    def _Run(self):
        self._proc = subprocess.Popen(self._exec_pathname +
                                      ' >>%s' % self._log_pathname +
                                      ' 2>&1',
                                      shell=True,
                                      preexec_fn=os.setsid)

    def Stop(self):
        os.killpg(self._proc.pid, signal.SIGTERM)
        if self._WaitTimeout(TERMINATE_TIMEOUT) is None:
            os.killpg(self._proc.pid, signal.SIGKILL)

    def _IsSuccessful(self):
        return (self._proc.returncode == 0)

    def _WaitTimeout(self, timeout):
        time_sum = 0
        while self._proc.poll() is None and time_sum < timeout:
            time.sleep(TIME_SLICE)
            time_sum += TIME_SLICE
        return self._proc.poll()

    @property
    def name(self):
        return self._name

    @property
    def log_filename(self):
        return self._log_pathname


class StopWorkerNotify(object):
    pass

def Worker(job_queue):
    while True:
        try:
            job = job_queue.get(True, WAIT_TIMEOUT)
        except:
            continue
        if isinstance(job, StopWorkerNotify):
            break
        succ, reason = job.Test()
        if succ:
            Log.Pass(job.name)
        else:
            Log.Fail(job.name, reason)


class Log(object):
    _all_pass = True
    _print_lock = threading.Lock()

    @staticmethod
    def Pass(test_name):
        with Log._print_lock:
            print('\033[32m%r >>> Passed\033[39m' % test_name)

    @staticmethod
    def Fail(test_name, reason):
        Log._all_pass = False
        with Log._print_lock:
            print('\033[31m%r >>> Failed for %r \033[39m' % (test_name, reason))

    @staticmethod
    def IsAllPass():
        return Log._all_pass


def ParseArgv():
    result, result_str = Params.Parse(sys.argv[1:])
    if result == Params.PARSE_HELP:
        print(result_str)
        return (False, 0)
    elif result == Params.PARSE_ERROR:
        sys.stderr.write(result_str)
        return (False, 1)
    return (True, 0)


def CreateAndStartWorkers(job_queue):
    workers = []
    for i in range(Params.worker_num):
        worker = threading.Thread(target=Worker, args=(job_queue,))
        worker.daemon = True
        worker.start()
        workers += [worker]
    return workers

def PutsJobs(job_queue):
    for (path, unused_dirnames, filenames) in os.walk(Params.test_path):
        for filename in filenames:
            if filename.endswith(Params.suffix):
                job_queue.put(Testing(path, filename))

def JoinWorkers(workers, job_queue):
    for i in range(Params.worker_num):
        job_queue.put(StopWorkerNotify())
    while workers:
        alive_workers = []
        for worker in workers:
            worker.join(timeout=JOIN_TIMEOUT)
            if worker.isAlive():
                alive_workers += [worker]
        workers = alive_workers

def RunTestings():
    job_queue = Queue.Queue()
    workers = CreateAndStartWorkers(job_queue)
    PutsJobs(job_queue)
    JoinWorkers(workers, job_queue)
    return (0 if Log.IsAllPass else 1)

def SignalTermHandler(signal, frame):
    pass

def SignalIntHandler(signal, frame):
    pass

def HandleSignals():
    signal.signal(signal.SIGTERM, SignalTermHandler)
    signal.signal(signal.SIGINT, SignalIntHandler)

def Main():
    HandleSignals()
    keep_going, exit_code = ParseArgv()
    if not keep_going:
        return exit_code

    Params.PrintParams()

    exit_code = RunTestings()
    return exit_code


if __name__ == '__main__':
    exit_code = Main()
    exit(exit_code)