import fcntl import os import platform import re import shutil import signal import stat import subprocess import sys import tempfile import time import pytest from unit.check.go import check_go from unit.check.node import check_node from unit.check.tls import check_openssl def pytest_addoption(parser): parser.addoption( "--detailed", default=False, action="store_true", help="Detailed output for tests", ) parser.addoption( "--print_log", default=False, action="store_true", help="Print unit.log to stdout in case of errors", ) parser.addoption( "--save_log", default=False, action="store_true", help="Save unit.log after the test execution", ) parser.addoption( "--unsafe", default=False, action="store_true", help="Run unsafe tests", ) unit_instance = {} option = None def pytest_configure(config): global option option = config.option option.generated_tests = {} option.current_dir = os.path.abspath( os.path.join(os.path.dirname(__file__), os.pardir) ) option.test_dir = option.current_dir + '/test' option.architecture = platform.architecture()[0] option.system = platform.system() # set stdout to non-blocking if option.detailed or option.print_log: fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0) def pytest_generate_tests(metafunc): cls = metafunc.cls if not hasattr(cls, 'application_type'): return type = cls.application_type def generate_tests(versions): metafunc.fixturenames.append('tmp_ct') metafunc.parametrize('tmp_ct', versions) for version in versions: option.generated_tests[ metafunc.function.__name__ + '[{}]'.format(version) ] = (type + ' ' + version) # take available module from option and generate tests for each version for module, prereq_version in cls.prerequisites['modules'].items(): if module in option.available['modules']: available_versions = option.available['modules'][module] if prereq_version == 'all': generate_tests(available_versions) elif prereq_version == 'any': option.generated_tests[metafunc.function.__name__] = ( type + ' ' + available_versions[0] ) elif callable(prereq_version): generate_tests( list(filter(prereq_version, available_versions)) ) else: raise ValueError( """ Unexpected prerequisite version "%s" for module "%s" in %s. 'all', 'any' or callable expected.""" % (str(prereq_version), module, str(cls)) ) def pytest_sessionstart(session): option.available = {'modules': {}, 'features': {}} unit = unit_run() # read unit.log for i in range(50): with open(unit['temp_dir'] + '/unit.log', 'r') as f: log = f.read() m = re.search('controller started', log) if m is None: time.sleep(0.1) else: break if m is None: _print_log() exit("Unit is writing log too long") # discover available modules from unit.log for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M): if module[0] not in option.available['modules']: option.available['modules'][module[0]] = [module[1]] else: option.available['modules'][module[0]].append(module[1]) # discover modules from check option.available['modules']['openssl'] = check_openssl(unit['unitd']) option.available['modules']['go'] = check_go( option.current_dir, unit['temp_dir'], option.test_dir ) option.available['modules']['node'] = check_node(option.current_dir) # remove None values option.available['modules'] = { k: v for k, v in option.available['modules'].items() if v is not None } unit_stop() def setup_method(self): option.skip_alerts = [ r'read signalfd\(4\) failed', r'sendmsg.+failed', r'recvmsg.+failed', ] option.skip_sanitizer = False def unit_run(): global unit_instance build_dir = option.current_dir + '/build' unitd = build_dir + '/unitd' if not os.path.isfile(unitd): exit('Could not find unit') temp_dir = tempfile.mkdtemp(prefix='unit-test-') public_dir(temp_dir) if oct(stat.S_IMODE(os.stat(build_dir).st_mode)) != '0o777': public_dir(build_dir) os.mkdir(temp_dir + '/state') with open(temp_dir + '/unit.log', 'w') as log: unit_instance['process'] = subprocess.Popen( [ unitd, '--no-daemon', '--modules', build_dir, '--state', temp_dir + '/state', '--pid', temp_dir + '/unit.pid', '--log', temp_dir + '/unit.log', '--control', 'unix:' + temp_dir + '/control.unit.sock', '--tmp', temp_dir, ], stderr=log, ) if not waitforfiles(temp_dir + '/control.unit.sock'): _print_log() exit('Could not start unit') # dumb (TODO: remove) option.skip_alerts = [ r'read signalfd\(4\) failed', r'sendmsg.+failed', r'recvmsg.+failed', ] option.skip_sanitizer = False unit_instance['temp_dir'] = temp_dir unit_instance['log'] = temp_dir + '/unit.log' unit_instance['control_sock'] = temp_dir + '/control.unit.sock' unit_instance['unitd'] = unitd return unit_instance def unit_stop(): p = unit_instance['process'] if p.poll() is not None: return p.send_signal(signal.SIGQUIT) try: retcode = p.wait(15) if retcode: return 'Child process terminated with code ' + str(retcode) except: p.kill() return 'Could not terminate unit' shutil.rmtree(unit_instance['temp_dir']) def public_dir(path): os.chmod(path, 0o777) for root, dirs, files in os.walk(path): for d in dirs: os.chmod(os.path.join(root, d), 0o777) for f in files: os.chmod(os.path.join(root, f), 0o777) def waitforfiles(*files): for i in range(50): wait = False ret = False for f in files: if not os.path.exists(f): wait = True break if wait: time.sleep(0.1) else: ret = True break return ret def skip_alert(*alerts): option.skip_alerts.extend(alerts) def _check_alerts(log): found = False alerts = re.findall(r'.+\[alert\].+', log) if alerts: print('All alerts/sanitizer errors found in log:') [print(alert) for alert in alerts] found = True if option.skip_alerts: for skip in option.skip_alerts: alerts = [al for al in alerts if re.search(skip, al) is None] if alerts: _print_log(data=log) assert not alerts, 'alert(s)' if not option.skip_sanitizer: sanitizer_errors = re.findall('.+Sanitizer.+', log) if sanitizer_errors: _print_log(data=log) assert not sanitizer_errors, 'sanitizer error(s)' if found: print('skipped.') def _print_log(path=None, data=None): if path is None: path = unit_instance['log'] print('Path to unit.log:\n' + path + '\n') if option.print_log: os.set_blocking(sys.stdout.fileno(), True) sys.stdout.flush() if data is None: with open(path, 'r', encoding='utf-8', errors='ignore') as f: shutil.copyfileobj(f, sys.stdout) else: sys.stdout.write(data) @pytest.fixture def is_unsafe(request): return request.config.getoption("--unsafe") @pytest.fixture def is_su(request): return os.geteuid() == 0 def pytest_sessionfinish(session): unit_stop()