diff options
Diffstat (limited to 'test/conftest.py')
-rw-r--r-- | test/conftest.py | 299 |
1 files changed, 299 insertions, 0 deletions
diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 00000000..8683a023 --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,299 @@ +import fcntl +import os +import platform +import pytest +import signal +import stat +import subprocess +import sys +import re +import tempfile +import time + + +def pytest_addoption(parser): + parser.addoption( + "--detailed", + default=False, + action="store_true", + help="Detailed output for tests", + ) + parser.addoption( + "--print_log", + default=False, + action="store_true", + help="Print unit.log to stdout in case of errors", + ) + parser.addoption( + "--save_log", + default=False, + action="store_true", + help="Save unit.log after the test execution", + ) + parser.addoption( + "--unsafe", + default=False, + action="store_true", + help="Run unsafe tests", + ) + + +unit_instance = {} +option = None + + +def pytest_configure(config): + global option + option = config.option + + option.generated_tests = {} + option.current_dir = os.path.abspath( + os.path.join(os.path.dirname(__file__), os.pardir) + ) + option.test_dir = option.current_dir + '/test' + option.architecture = platform.architecture()[0] + option.system = platform.system() + + # set stdout to non-blocking + + if option.detailed or option.print_log: + fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0) + + +def pytest_generate_tests(metafunc): + cls = metafunc.cls + if not hasattr(cls, 'application_type'): + return + + type = cls.application_type + + # take available module from option and generate tests for each version + + for module in cls.prerequisites['modules']: + if module in option.available['modules']: + prereq_version = cls.prerequisites['modules'][module] + available_versions = option.available['modules'][module] + + if prereq_version == 'all': + metafunc.fixturenames.append('tmp_ct') + metafunc.parametrize('tmp_ct', range(len(available_versions))) + + for i in range(len(available_versions)): + version = available_versions[i] + option.generated_tests[ + metafunc.function.__name__ + '[{}]'.format(i) + ] = (type + ' ' + version) + elif prereq_version == 'any': + option.generated_tests[metafunc.function.__name__] = ( + type + ' ' + available_versions[0] + ) + else: + for version in available_versions: + if version.startswith(prereq_version): + option.generated_tests[metafunc.function.__name__] = ( + type + ' ' + version + ) + + +def pytest_sessionstart(session): + option.available = {'modules': {}, 'features': {}} + + unit = unit_run() + + # read unit.log + + for i in range(50): + with open(unit['temp_dir'] + '/unit.log', 'r') as f: + log = f.read() + m = re.search('controller started', log) + + if m is None: + time.sleep(0.1) + else: + break + + if m is None: + _print_log() + exit("Unit is writing log too long") + + # discover available modules from unit.log + + for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M): + if module[0] not in option.available['modules']: + option.available['modules'][module[0]] = [module[1]] + else: + option.available['modules'][module[0]].append(module[1]) + + unit_stop() + + +def setup_method(self): + option.skip_alerts = [ + r'read signalfd\(4\) failed', + r'sendmsg.+failed', + r'recvmsg.+failed', + ] + option.skip_sanitizer = False + +def unit_run(): + global unit_instance + build_dir = option.current_dir + '/build' + unitd = build_dir + '/unitd' + + if not os.path.isfile(unitd): + exit('Could not find unit') + + temp_dir = tempfile.mkdtemp(prefix='unit-test-') + public_dir(temp_dir) + + if oct(stat.S_IMODE(os.stat(build_dir).st_mode)) != '0o777': + public_dir(build_dir) + + os.mkdir(temp_dir + '/state') + + with open(temp_dir + '/unit.log', 'w') as log: + unit_instance['process'] = subprocess.Popen( + [ + unitd, + '--no-daemon', + '--modules', + build_dir, + '--state', + temp_dir + '/state', + '--pid', + temp_dir + '/unit.pid', + '--log', + temp_dir + '/unit.log', + '--control', + 'unix:' + temp_dir + '/control.unit.sock', + '--tmp', + temp_dir, + ], + stderr=log, + ) + + if not waitforfiles(temp_dir + '/control.unit.sock'): + _print_log() + exit('Could not start unit') + + # dumb (TODO: remove) + option.skip_alerts = [ + r'read signalfd\(4\) failed', + r'sendmsg.+failed', + r'recvmsg.+failed', + ] + option.skip_sanitizer = False + + unit_instance['temp_dir'] = temp_dir + unit_instance['log'] = temp_dir + '/unit.log' + unit_instance['control_sock'] = temp_dir + '/control.unit.sock' + unit_instance['unitd'] = unitd + + return unit_instance + + +def unit_stop(): + p = unit_instance['process'] + + if p.poll() is not None: + return + + p.send_signal(signal.SIGQUIT) + + try: + retcode = p.wait(15) + if retcode: + return 'Child process terminated with code ' + str(retcode) + except: + p.kill() + return 'Could not terminate unit' + + +def public_dir(path): + os.chmod(path, 0o777) + + for root, dirs, files in os.walk(path): + for d in dirs: + os.chmod(os.path.join(root, d), 0o777) + for f in files: + os.chmod(os.path.join(root, f), 0o777) + +def waitforfiles(*files): + for i in range(50): + wait = False + ret = False + + for f in files: + if not os.path.exists(f): + wait = True + break + + if wait: + time.sleep(0.1) + + else: + ret = True + break + + return ret + + +def skip_alert(*alerts): + option.skip_alerts.extend(alerts) + + +def _check_alerts(log): + found = False + + alerts = re.findall(r'.+\[alert\].+', log) + + if alerts: + print('All alerts/sanitizer errors found in log:') + [print(alert) for alert in alerts] + found = True + + if option.skip_alerts: + for skip in option.skip_alerts: + alerts = [al for al in alerts if re.search(skip, al) is None] + + if alerts: + _print_log(log) + assert not alerts, 'alert(s)' + + if not option.skip_sanitizer: + sanitizer_errors = re.findall('.+Sanitizer.+', log) + + if sanitizer_errors: + _print_log(log) + assert not sanitizer_errors, 'sanitizer error(s)' + + if found: + print('skipped.') + + +def _print_log(data=None): + unit_log = unit_instance['log'] + + print('Path to unit.log:\n' + unit_log + '\n') + + if option.print_log: + os.set_blocking(sys.stdout.fileno(), True) + sys.stdout.flush() + + if data is None: + with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f: + shutil.copyfileobj(f, sys.stdout) + else: + sys.stdout.write(data) + + +@pytest.fixture +def is_unsafe(request): + return request.config.getoption("--unsafe") + +@pytest.fixture +def is_su(request): + return os.geteuid() == 0 + +def pytest_sessionfinish(session): + unit_stop() |