summaryrefslogblamecommitdiffhomepage
path: root/test/unit/main.py
blob: 2ab0626b3b1e023b3a3758d351f700f551bece75 (plain) (tree)
















































                                                                     



                                                    





































































































































































































































































                                                                                
import os
import re
import sys
import time
import shutil
import argparse
import platform
import tempfile
import unittest
import subprocess
from multiprocessing import Process


class TestUnit(unittest.TestCase):

    current_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir)
    )
    pardir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
    )
    architecture = platform.architecture()[0]
    maxDiff = None

    detailed = False
    save_log = False

    def __init__(self, methodName='runTest'):
        super().__init__(methodName)

        if re.match(r'.*\/run\.py$', sys.argv[0]):
            args, rest = TestUnit._parse_args()

            TestUnit._set_args(args)

    @classmethod
    def main(cls):
        args, rest = TestUnit._parse_args()

        for i, arg in enumerate(rest):
            if arg[:5] == 'test_':
                rest[i] = cls.__name__ + '.' + arg

        sys.argv = sys.argv[:1] + rest

        TestUnit._set_args(args)

        unittest.main()

    @classmethod
    def setUpClass(cls):
        TestUnit().check_modules(*cls.prerequisites)

    def setUp(self):
        self._run()

    def tearDown(self):
        self.stop()

        # detect errors and failures for current test

        def list2reason(exc_list):
            if exc_list and exc_list[-1][0] is self:
                return exc_list[-1][1]

        if hasattr(self, '_outcome'):
            result = self.defaultTestResult()
            self._feedErrorsToResult(result, self._outcome.errors)
        else:
            result = getattr(
                self, '_outcomeForDoCleanups', self._resultForDoCleanups
            )

        success = not list2reason(result.errors) and not list2reason(
            result.failures
        )

        # check unit.log for alerts

        unit_log = self.testdir + '/unit.log'

        with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
            self._check_alerts(f.read())

        # remove unit.log

        if not TestUnit.save_log and success:
            shutil.rmtree(self.testdir)

        else:
            self._print_path_to_log()

    def check_modules(self, *modules):
        self._run()

        for i in range(50):
            with open(self.testdir + '/unit.log', 'r') as f:
                log = f.read()
                m = re.search('controller started', log)

                if m is None:
                    time.sleep(0.1)
                else:
                    break

        if m is None:
            self.stop()
            exit("Unit is writing log too long")

        missed_module = ''
        for module in modules:
            if module == 'go':
                env = os.environ.copy()
                env['GOPATH'] = self.pardir + '/go'

                try:
                    process = subprocess.Popen(
                        [
                            'go',
                            'build',
                            '-o',
                            self.testdir + '/go/check_module',
                            self.current_dir + '/go/empty/app.go',
                        ],
                        env=env,
                    )
                    process.communicate()

                    m = module if process.returncode == 0 else None

                except:
                    m = None

            elif module == 'node':
                if os.path.isdir(self.pardir + '/node/node_modules'):
                    m = module
                else:
                    m = None

            elif module == 'openssl':
                try:
                    subprocess.check_output(['which', 'openssl'])

                    output = subprocess.check_output(
                        [self.pardir + '/build/unitd', '--version'],
                        stderr=subprocess.STDOUT,
                    )

                    m = re.search('--openssl', output.decode())

                except:
                    m = None

            else:
                m = re.search('module: ' + module, log)

            if m is None:
                missed_module = module
                break

        self.stop()
        self._check_alerts(log)
        shutil.rmtree(self.testdir)

        if missed_module:
            raise unittest.SkipTest('Unit has no ' + missed_module + ' module')

    def stop(self):
        if self._started:
            self._stop()

    def _run(self):
        self.testdir = tempfile.mkdtemp(prefix='unit-test-')

        os.mkdir(self.testdir + '/state')

        print()

        def _run_unit():
            subprocess.call(
                [
                    self.pardir + '/build/unitd',
                    '--no-daemon',
                    '--modules',  self.pardir + '/build',
                    '--state',    self.testdir + '/state',
                    '--pid',      self.testdir + '/unit.pid',
                    '--log',      self.testdir + '/unit.log',
                    '--control',  'unix:' + self.testdir + '/control.unit.sock',
                ]
            )

        self._p = Process(target=_run_unit)
        self._p.start()

        if not self.waitforfiles(
            self.testdir + '/unit.pid',
            self.testdir + '/unit.log',
            self.testdir + '/control.unit.sock',
        ):
            exit("Could not start unit")

        self._started = True

        self.skip_alerts = [
            r'read signalfd\(4\) failed',
            r'sendmsg.+failed',
            r'recvmsg.+failed',
        ]
        self.skip_sanitizer = False

    def _stop(self):
        with open(self.testdir + '/unit.pid', 'r') as f:
            pid = f.read().rstrip()

        subprocess.call(['kill', '-s', 'QUIT', pid])

        for i in range(50):
            if not os.path.exists(self.testdir + '/unit.pid'):
                break
            time.sleep(0.1)

        if os.path.exists(self.testdir + '/unit.pid'):
            exit("Could not terminate unit")

        self._started = False

        self._p.join(timeout=1)
        self._terminate_process(self._p)

    def _terminate_process(self, process):
        if process.is_alive():
            process.terminate()
            process.join(timeout=5)

            if process.is_alive():
                exit("Could not terminate process " + process.pid)

        if process.exitcode:
            exit("Child process terminated with code " + str(process.exitcode))

    def _check_alerts(self, log):
        found = False

        alerts = re.findall('.+\[alert\].+', log)

        if alerts:
            print('All alerts/sanitizer errors found in log:')
            [print(alert) for alert in alerts]
            found = True

        if self.skip_alerts:
            for skip in self.skip_alerts:
                alerts = [al for al in alerts if re.search(skip, al) is None]

        if alerts:
            self._print_path_to_log()
            self.assertFalse(alerts, 'alert(s)')

        if not self.skip_sanitizer:
            sanitizer_errors = re.findall('.+Sanitizer.+', log)

            if sanitizer_errors:
                self._print_path_to_log()
                self.assertFalse(sanitizer_errors, 'sanitizer error(s)')

        if found:
            print('skipped.')

    def waitforfiles(self, *files):
        for i in range(50):
            wait = False
            ret = False

            for f in files:
                if not os.path.exists(f):
                    wait = True
                    break

            if wait:
                time.sleep(0.1)

            else:
                ret = True
                break

        return ret

    @staticmethod
    def _parse_args():
        parser = argparse.ArgumentParser(add_help=False)

        parser.add_argument(
            '-d',
            '--detailed',
            dest='detailed',
            action='store_true',
            help='Detailed output for tests',
        )
        parser.add_argument(
            '-l',
            '--log',
            dest='save_log',
            action='store_true',
            help='Save unit.log after the test execution',
        )

        return parser.parse_known_args()

    @staticmethod
    def _set_args(args):
        TestUnit.detailed = args.detailed
        TestUnit.save_log = args.save_log

    def _print_path_to_log(self):
        print('Path to unit.log:\n' + self.testdir + '/unit.log')