diff options
Diffstat (limited to 'test/unit/main.py')
-rw-r--r-- | test/unit/main.py | 292 |
1 files changed, 37 insertions, 255 deletions
diff --git a/test/unit/main.py b/test/unit/main.py index 83aa9139..d5940995 100644 --- a/test/unit/main.py +++ b/test/unit/main.py @@ -1,90 +1,26 @@ -import argparse import atexit -import fcntl import os -import platform import re import shutil import signal import stat import subprocess -import sys import tempfile import time -import unittest from multiprocessing import Process +import pytest +from conftest import _check_alerts +from conftest import _print_log +from conftest import option +from conftest import public_dir +from conftest import waitforfiles -class TestUnit(unittest.TestCase): - - current_dir = os.path.abspath( - os.path.join(os.path.dirname(__file__), os.pardir) - ) - pardir = os.path.abspath( - os.path.join(os.path.dirname(__file__), os.pardir, os.pardir) - ) - is_su = os.geteuid() == 0 - uid = os.geteuid() - gid = os.getegid() - architecture = platform.architecture()[0] - system = platform.system() - maxDiff = None - - detailed = False - save_log = False - print_log = False - unsafe = False - - def __init__(self, methodName='runTest'): - super().__init__(methodName) - - if re.match(r'.*\/run\.py$', sys.argv[0]): - args, rest = TestUnit._parse_args() - - TestUnit._set_args(args) - - def run(self, result=None): - if not hasattr(self, 'application_type'): - return super().run(result) - - # rerun test for each available module version - - type = self.application_type - for module in self.prerequisites['modules']: - if module in self.available['modules']: - prereq_version = self.prerequisites['modules'][module] - available_versions = self.available['modules'][module] - - if prereq_version == 'all': - for version in available_versions: - self.application_type = type + ' ' + version - super().run(result) - elif prereq_version == 'any': - self.application_type = type + ' ' + available_versions[0] - super().run(result) - else: - for version in available_versions: - if version.startswith(prereq_version): - self.application_type = type + ' ' + version - super().run(result) +class TestUnit(): @classmethod - def main(cls): - args, rest = TestUnit._parse_args() - - for i, arg in enumerate(rest): - if arg[:5] == 'test_': - rest[i] = cls.__name__ + '.' + arg - - sys.argv = sys.argv[:1] + rest - - TestUnit._set_args(args) - - unittest.main() - - @classmethod - def setUpClass(cls, complete_check=True): - cls.available = {'modules': {}, 'features': {}} + def setup_class(cls, complete_check=True): + cls.available = option.available unit = TestUnit() unit._run() @@ -92,7 +28,7 @@ class TestUnit(unittest.TestCase): # read unit.log for i in range(50): - with open(unit.testdir + '/unit.log', 'r') as f: + with open(unit.temp_dir + '/unit.log', 'r') as f: log = f.read() m = re.search('controller started', log) @@ -102,17 +38,9 @@ class TestUnit(unittest.TestCase): break if m is None: - unit._print_log() + _print_log(path=unit.temp_dir + '/unit.log') exit("Unit is writing log too long") - # discover available modules from unit.log - - for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M): - if module[0] not in cls.available['modules']: - cls.available['modules'][module[0]] = [module[1]] - else: - cls.available['modules'][module[0]].append(module[1]) - def check(available, prerequisites): missed = [] @@ -128,8 +56,7 @@ class TestUnit(unittest.TestCase): missed.append(module) if missed: - print('Unit has no ' + ', '.join(missed) + ' module(s)') - raise unittest.SkipTest() + pytest.skip('Unit has no ' + ', '.join(missed) + ' module(s)') # check features @@ -143,13 +70,12 @@ class TestUnit(unittest.TestCase): missed.append(feature) if missed: - print(', '.join(missed) + ' feature(s) not supported') - raise unittest.SkipTest() + pytest.skip(', '.join(missed) + ' feature(s) not supported') def destroy(): unit.stop() - unit._check_alerts(log) - shutil.rmtree(unit.testdir) + _check_alerts(log) + shutil.rmtree(unit.temp_dir) def complete(): destroy() @@ -161,92 +87,66 @@ class TestUnit(unittest.TestCase): unit.complete = complete return unit - def setUp(self): + def setup_method(self): self._run() def _run(self): - build_dir = self.pardir + '/build' + build_dir = option.current_dir + '/build' self.unitd = build_dir + '/unitd' if not os.path.isfile(self.unitd): exit("Could not find unit") - self.testdir = tempfile.mkdtemp(prefix='unit-test-') + self.temp_dir = tempfile.mkdtemp(prefix='unit-test-') - self.public_dir(self.testdir) + public_dir(self.temp_dir) if oct(stat.S_IMODE(os.stat(build_dir).st_mode)) != '0o777': - self.public_dir(build_dir) + public_dir(build_dir) - os.mkdir(self.testdir + '/state') + os.mkdir(self.temp_dir + '/state') - with open(self.testdir + '/unit.log', 'w') as log: + with open(self.temp_dir + '/unit.log', 'w') as log: self._p = subprocess.Popen( [ self.unitd, '--no-daemon', - '--modules', self.pardir + '/build', - '--state', self.testdir + '/state', - '--pid', self.testdir + '/unit.pid', - '--log', self.testdir + '/unit.log', - '--control', 'unix:' + self.testdir + '/control.unit.sock', - '--tmp', self.testdir, + '--modules', build_dir, + '--state', self.temp_dir + '/state', + '--pid', self.temp_dir + '/unit.pid', + '--log', self.temp_dir + '/unit.log', + '--control', 'unix:' + self.temp_dir + '/control.unit.sock', + '--tmp', self.temp_dir, ], stderr=log, ) atexit.register(self.stop) - if not self.waitforfiles(self.testdir + '/control.unit.sock'): - self._print_log() + if not waitforfiles(self.temp_dir + '/control.unit.sock'): + _print_log(path=self.temp_dir + '/unit.log') exit("Could not start unit") self._started = True - self.skip_alerts = [ - r'read signalfd\(4\) failed', - r'sendmsg.+failed', - r'recvmsg.+failed', - ] - self.skip_sanitizer = False - - def tearDown(self): + def teardown_method(self): self.stop() - # detect errors and failures for current test - - def list2reason(exc_list): - if exc_list and exc_list[-1][0] is self: - return exc_list[-1][1] - - if hasattr(self, '_outcome'): - result = self.defaultTestResult() - self._feedErrorsToResult(result, self._outcome.errors) - else: - result = getattr( - self, '_outcomeForDoCleanups', self._resultForDoCleanups - ) - - success = not list2reason(result.errors) and not list2reason( - result.failures - ) - # check unit.log for alerts - unit_log = self.testdir + '/unit.log' + unit_log = self.temp_dir + '/unit.log' with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f: - self._check_alerts(f.read()) + _check_alerts(f.read()) # remove unit.log - if not TestUnit.save_log and success: - shutil.rmtree(self.testdir) - + if not option.save_log: + shutil.rmtree(self.temp_dir) else: - self._print_log() + _print_log(path=self.temp_dir) - self.assertListEqual(self.stop_errors, [None, None], 'stop errors') + assert self.stop_errors == [None, None], 'stop errors' def stop(self): if not self._started: @@ -301,121 +201,3 @@ class TestUnit(unittest.TestCase): if fail: return 'Fail to stop process' - - def waitforfiles(self, *files): - for i in range(50): - wait = False - ret = False - - for f in files: - if not os.path.exists(f): - wait = True - break - - if wait: - time.sleep(0.1) - - else: - ret = True - break - - return ret - - def public_dir(self, path): - os.chmod(path, 0o777) - - for root, dirs, files in os.walk(path): - for d in dirs: - os.chmod(os.path.join(root, d), 0o777) - for f in files: - os.chmod(os.path.join(root, f), 0o777) - - def _check_alerts(self, log): - found = False - - alerts = re.findall('.+\[alert\].+', log) - - if alerts: - print('All alerts/sanitizer errors found in log:') - [print(alert) for alert in alerts] - found = True - - if self.skip_alerts: - for skip in self.skip_alerts: - alerts = [al for al in alerts if re.search(skip, al) is None] - - if alerts: - self._print_log(log) - self.assertFalse(alerts, 'alert(s)') - - if not self.skip_sanitizer: - sanitizer_errors = re.findall('.+Sanitizer.+', log) - - if sanitizer_errors: - self._print_log(log) - self.assertFalse(sanitizer_errors, 'sanitizer error(s)') - - if found: - print('skipped.') - - @staticmethod - def _parse_args(): - parser = argparse.ArgumentParser(add_help=False) - - parser.add_argument( - '-d', - '--detailed', - dest='detailed', - action='store_true', - help='Detailed output for tests', - ) - parser.add_argument( - '-l', - '--log', - dest='save_log', - action='store_true', - help='Save unit.log after the test execution', - ) - parser.add_argument( - '-r', - '--reprint_log', - dest='print_log', - action='store_true', - help='Print unit.log to stdout in case of errors', - ) - parser.add_argument( - '-u', - '--unsafe', - dest='unsafe', - action='store_true', - help='Run unsafe tests', - ) - - return parser.parse_known_args() - - @staticmethod - def _set_args(args): - TestUnit.detailed = args.detailed - TestUnit.save_log = args.save_log - TestUnit.print_log = args.print_log - TestUnit.unsafe = args.unsafe - - # set stdout to non-blocking - - if TestUnit.detailed or TestUnit.print_log: - fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0) - - def _print_log(self, data=None): - path = self.testdir + '/unit.log' - - print('Path to unit.log:\n' + path + '\n') - - if TestUnit.print_log: - os.set_blocking(sys.stdout.fileno(), True) - sys.stdout.flush() - - if data is None: - with open(path, 'r', encoding='utf-8', errors='ignore') as f: - shutil.copyfileobj(f, sys.stdout) - else: - sys.stdout.write(data) |