import argparse import atexit import os import platform import pytest import re import shutil import signal import stat import subprocess import sys import tempfile import time from conftest import option, public_dir, waitforfiles, _check_alerts, _print_log from multiprocessing import Process class TestUnit(): @classmethod def setup_class(cls, complete_check=True): cls.available = option.available unit = TestUnit() unit._run() # read unit.log for i in range(50): with open(unit.temp_dir + '/unit.log', 'r') as f: log = f.read() m = re.search('controller started', log) if m is None: time.sleep(0.1) else: break if m is None: _print_log(path=unit.temp_dir + '/unit.log') exit("Unit is writing log too long") def check(available, prerequisites): missed = [] # check modules if 'modules' in prerequisites: available_modules = list(available['modules'].keys()) for module in prerequisites['modules']: if module in available_modules: continue missed.append(module) if missed: pytest.skip('Unit has no ' + ', '.join(missed) + ' module(s)') # check features if 'features' in prerequisites: available_features = list(available['features'].keys()) for feature in prerequisites['features']: if feature in available_features: continue missed.append(feature) if missed: pytest.skip(', '.join(missed) + ' feature(s) not supported') def destroy(): unit.stop() _check_alerts(log) shutil.rmtree(unit.temp_dir) def complete(): destroy() check(cls.available, cls.prerequisites) if complete_check: complete() else: unit.complete = complete return unit def setup_method(self): self._run() def _run(self): build_dir = option.current_dir + '/build' self.unitd = build_dir + '/unitd' if not os.path.isfile(self.unitd): exit("Could not find unit") self.temp_dir = tempfile.mkdtemp(prefix='unit-test-') public_dir(self.temp_dir) if oct(stat.S_IMODE(os.stat(build_dir).st_mode)) != '0o777': public_dir(build_dir) os.mkdir(self.temp_dir + '/state') with open(self.temp_dir + '/unit.log', 'w') as log: self._p = subprocess.Popen( [ self.unitd, '--no-daemon', '--modules', build_dir, '--state', self.temp_dir + '/state', '--pid', self.temp_dir + '/unit.pid', '--log', self.temp_dir + '/unit.log', '--control', 'unix:' + self.temp_dir + '/control.unit.sock', '--tmp', self.temp_dir, ], stderr=log, ) atexit.register(self.stop) if not waitforfiles(self.temp_dir + '/control.unit.sock'): _print_log(path=self.temp_dir + '/unit.log') exit("Could not start unit") self._started = True def teardown_method(self): self.stop() # check unit.log for alerts unit_log = self.temp_dir + '/unit.log' with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f: _check_alerts(f.read()) # remove unit.log if not option.save_log: shutil.rmtree(self.temp_dir) else: _print_log(path=self.temp_dir) assert self.stop_errors == [None, None], 'stop errors' def stop(self): if not self._started: return self.stop_errors = [] self.stop_errors.append(self._stop()) self.stop_errors.append(self.stop_processes()) atexit.unregister(self.stop) self._started = False def _stop(self): if self._p.poll() is not None: return with self._p as p: p.send_signal(signal.SIGQUIT) try: retcode = p.wait(15) if retcode: return 'Child process terminated with code ' + str(retcode) except: p.kill() return 'Could not terminate unit' def run_process(self, target, *args): if not hasattr(self, '_processes'): self._processes = [] process = Process(target=target, args=args) process.start() self._processes.append(process) def stop_processes(self): if not hasattr(self, '_processes'): return fail = False for process in self._processes: if process.is_alive(): process.terminate() process.join(timeout=15) if process.is_alive(): fail = True if fail: return 'Fail to stop process'