summaryrefslogblamecommitdiffhomepage
path: root/test/unit/main.py
blob: 2ee432517f24da43bec14c84b4a6b4cffbacfc70 (plain) (tree)
1
2
3
4
5



           
            








                                   

                      








                                                                     
                              



                    
                  








                                                  



                                                 

                                                      






                                                                













                                                  



                                                    























































                                                                         



                                        
                                                                               




                                                              


































                                                                     
                                                  








                                                               

                                                   
















                                                                               




                                                 








                                                            
                               

































                                                                                
                            























































































                                                                               






                                    






                                         
                                     
 

                                    


                                                              

                                                                 
import os
import re
import sys
import time
import fcntl
import shutil
import argparse
import platform
import tempfile
import unittest
import subprocess
from multiprocessing import Process


# Module name -> list of version strings, as parsed from the "module: ..."
# lines of unit.log. Rebuilt by TestUnit.check_modules() and read by
# TestUnit.run() to repeat a test for each discovered version.
available_modules = {}

class TestUnit(unittest.TestCase):
    """Base test case that starts a unitd instance for every test.

    Each test gets its own temporary directory holding the state
    directory, pid file, log and control socket of the daemon.
    """

    # Parent directory of the directory containing this file.
    current_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir)
    )
    # Two levels above the directory containing this file; build/unitd and
    # the module directory are looked up relative to it.
    pardir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
    )
    # First element of platform.architecture(), e.g. '64bit'.
    architecture = platform.architecture()[0]
    # Value of platform.system(), e.g. 'Linux'.
    system = platform.system()
    # unittest setting: None disables truncation of assertion diffs.
    maxDiff = None

    # Command-line switches; overwritten by _set_args().
    detailed = False
    save_log = False
    unsafe = False

    def __init__(self, methodName='runTest'):
        super().__init__(methodName)

        if re.match(r'.*\/run\.py$', sys.argv[0]):
            args, rest = TestUnit._parse_args()

            TestUnit._set_args(args)

    def run(self, result=None):
        if not hasattr(self, 'application_type'):
            return super().run(result)

        # rerun test for each available module version

        type = self.application_type
        for prerequisite in self.prerequisites:
            if prerequisite in available_modules:
                for version in available_modules[prerequisite]:
                    self.application_type = type + ' ' + version
                    super().run(result)

    @classmethod
    def main(cls):
        """Entry point for running a single test module directly.

        Harness switches are consumed here; the remaining arguments are
        handed to unittest.main() through sys.argv, with bare ``test_*``
        names qualified by the name of the calling class.
        """
        args, rest = TestUnit._parse_args()

        qualifier = cls.__name__ + '.'
        rest = [
            qualifier + arg if arg.startswith('test_') else arg
            for arg in rest
        ]

        sys.argv = sys.argv[:1] + rest

        TestUnit._set_args(args)

        unittest.main()

    @classmethod
    def setUpClass(cls):
        TestUnit().check_modules(*cls.prerequisites)

    def setUp(self):
        self._run()

    def tearDown(self):
        self.stop()

        # detect errors and failures for current test

        def list2reason(exc_list):
            if exc_list and exc_list[-1][0] is self:
                return exc_list[-1][1]

        if hasattr(self, '_outcome'):
            result = self.defaultTestResult()
            self._feedErrorsToResult(result, self._outcome.errors)
        else:
            result = getattr(
                self, '_outcomeForDoCleanups', self._resultForDoCleanups
            )

        success = not list2reason(result.errors) and not list2reason(
            result.failures
        )

        # check unit.log for alerts

        unit_log = self.testdir + '/unit.log'

        with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
            self._check_alerts(f.read())

        # remove unit.log

        if not TestUnit.save_log and success:
            shutil.rmtree(self.testdir)

        else:
            self._print_path_to_log()

    def check_modules(self, *modules):
        self._run()

        for i in range(50):
            with open(self.testdir + '/unit.log', 'r') as f:
                log = f.read()
                m = re.search('controller started', log)

                if m is None:
                    time.sleep(0.1)
                else:
                    break

        if m is None:
            self.stop()
            exit("Unit is writing log too long")

        # discover all available modules

        global available_modules
        available_modules = {}
        for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M):
            if module[0] not in available_modules:
                available_modules[module[0]] = [module[1]]
            else:
                available_modules[module[0]].append(module[1])

        missed_module = ''
        for module in modules:
            if module == 'go':
                env = os.environ.copy()
                env['GOPATH'] = self.pardir + '/go'

                try:
                    process = subprocess.Popen(
                        [
                            'go',
                            'build',
                            '-o',
                            self.testdir + '/go/check_module',
                            self.current_dir + '/go/empty/app.go',
                        ],
                        env=env,
                    )
                    process.communicate()

                    m = module if process.returncode == 0 else None

                except:
                    m = None

            elif module == 'node':
                if os.path.isdir(self.pardir + '/node/node_modules'):
                    m = module
                else:
                    m = None

            elif module == 'openssl':
                try:
                    subprocess.check_output(['which', 'openssl'])

                    output = subprocess.check_output(
                        [self.unitd, '--version'],
                        stderr=subprocess.STDOUT,
                    )

                    m = re.search('--openssl', output.decode())

                except:
                    m = None

            else:
                if module not in available_modules:
                    m = None

            if m is None:
                missed_module = module
                break

        self.stop()
        self._check_alerts(log)
        shutil.rmtree(self.testdir)

        if missed_module:
            raise unittest.SkipTest('Unit has no ' + missed_module + ' module')

    def stop(self):
        if self._started:
            self._stop()

    def _run(self):
        self.unitd = self.pardir + '/build/unitd'

        if not os.path.isfile(self.unitd):
            exit("Could not find unit")

        self.testdir = tempfile.mkdtemp(prefix='unit-test-')

        os.mkdir(self.testdir + '/state')

        print()

        def _run_unit():
            subprocess.call(
                [
                    self.unitd,
                    '--no-daemon',
                    '--modules',  self.pardir + '/build',
                    '--state',    self.testdir + '/state',
                    '--pid',      self.testdir + '/unit.pid',
                    '--log',      self.testdir + '/unit.log',
                    '--control',  'unix:' + self.testdir + '/control.unit.sock',
                ]
            )

        self._p = Process(target=_run_unit)
        self._p.start()

        if not self.waitforfiles(
            self.testdir + '/unit.pid',
            self.testdir + '/unit.log',
            self.testdir + '/control.unit.sock',
        ):
            exit("Could not start unit")

        self._started = True

        self.skip_alerts = [
            r'read signalfd\(4\) failed',
            r'sendmsg.+failed',
            r'recvmsg.+failed',
        ]
        self.skip_sanitizer = False

    def _stop(self):
        """Send QUIT to unitd and wait for it to go away.

        The pid is read from ``unit.pid``; the daemon is considered gone
        once that file disappears (polled for up to 150 * 0.1 seconds).
        Exits the interpreter if the pid file is still there afterwards,
        then reaps the helper process that was running the daemon.
        """
        pid_file = self.testdir + '/unit.pid'

        with open(pid_file, 'r') as f:
            pid = f.read().rstrip()

        subprocess.call(['kill', '-s', 'QUIT', pid])

        attempts = 0
        while attempts < 150 and os.path.exists(pid_file):
            time.sleep(0.1)
            attempts += 1

        if os.path.exists(pid_file):
            exit("Could not terminate unit")

        self._started = False

        self._p.join(timeout=1)
        self._terminate_process(self._p)

    def _terminate_process(self, process):
        if process.is_alive():
            process.terminate()
            process.join(timeout=5)

            if process.is_alive():
                exit("Could not terminate process " + process.pid)

        if process.exitcode:
            exit("Child process terminated with code " + str(process.exitcode))

    def _check_alerts(self, log):
        found = False

        alerts = re.findall('.+\[alert\].+', log)

        if alerts:
            print('All alerts/sanitizer errors found in log:')
            [print(alert) for alert in alerts]
            found = True

        if self.skip_alerts:
            for skip in self.skip_alerts:
                alerts = [al for al in alerts if re.search(skip, al) is None]

        if alerts:
            self._print_path_to_log()
            self.assertFalse(alerts, 'alert(s)')

        if not self.skip_sanitizer:
            sanitizer_errors = re.findall('.+Sanitizer.+', log)

            if sanitizer_errors:
                self._print_path_to_log()
                self.assertFalse(sanitizer_errors, 'sanitizer error(s)')

        if found:
            print('skipped.')

    def waitforfiles(self, *files):
        for i in range(50):
            wait = False
            ret = False

            for f in files:
                if not os.path.exists(f):
                    wait = True
                    break

            if wait:
                time.sleep(0.1)

            else:
                ret = True
                break

        return ret

    @staticmethod
    def _parse_args():
        parser = argparse.ArgumentParser(add_help=False)

        parser.add_argument(
            '-d',
            '--detailed',
            dest='detailed',
            action='store_true',
            help='Detailed output for tests',
        )
        parser.add_argument(
            '-l',
            '--log',
            dest='save_log',
            action='store_true',
            help='Save unit.log after the test execution',
        )
        parser.add_argument(
            '-u',
            '--unsafe',
            dest='unsafe',
            action='store_true',
            help='Run unsafe tests',
        )

        return parser.parse_known_args()

    @staticmethod
    def _set_args(args):
        TestUnit.detailed = args.detailed
        TestUnit.save_log = args.save_log
        TestUnit.unsafe = args.unsafe

        # set stdout to non-blocking

        if TestUnit.detailed:
            fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0)

    def _print_path_to_log(self):
        print('Path to unit.log:\n' + self.testdir + '/unit.log')