import atexit import os import re import shutil import signal import stat import subprocess import tempfile import time from multiprocessing import Process import pytest from conftest import _check_alerts from conftest import _print_log from conftest import option from conftest import public_dir from conftest import waitforfiles class TestUnit(): @classmethod def setup_class(cls, complete_check=True): cls.available = option.available unit = TestUnit() unit._run() # read unit.log for i in range(50): with open(unit.temp_dir + '/unit.log', 'r') as f: log = f.read() m = re.search('controller started', log) if m is None: time.sleep(0.1) else: break if m is None: _print_log(path=unit.temp_dir + '/unit.log') exit("Unit is writing log too long") def check(available, prerequisites): missed = [] # check modules if 'modules' in prerequisites: available_modules = list(available['modules'].keys()) for module in prerequisites['modules']: if module in available_modules: continue missed.append(module) if missed: pytest.skip('Unit has no ' + ', '.join(missed) + ' module(s)') # check features if 'features' in prerequisites: available_features = list(available['features'].keys()) for feature in prerequisites['features']: if feature in available_features: continue missed.append(feature) if missed: pytest.skip(', '.join(missed) + ' feature(s) not supported') def destroy(): unit.stop() _check_alerts(log) shutil.rmtree(unit.temp_dir) def complete(): destroy() check(cls.available, cls.prerequisites) if complete_check: complete() else: unit.complete = complete return unit def setup_method(self): self._run() def _run(self): build_dir = option.current_dir + '/build' self.unitd = build_dir + '/unitd' if not os.path.isfile(self.unitd): exit("Could not find unit") self.temp_dir = tempfile.mkdtemp(prefix='unit-test-') public_dir(self.temp_dir) if oct(stat.S_IMODE(os.stat(build_dir).st_mode)) != '0o777': public_dir(build_dir) os.mkdir(self.temp_dir + '/state') with open(self.temp_dir + '/unit.log', 'w') as log: self._p = subprocess.Popen( [ self.unitd, '--no-daemon', '--modules', build_dir, '--state', self.temp_dir + '/state', '--pid', self.temp_dir + '/unit.pid', '--log', self.temp_dir + '/unit.log', '--control', 'unix:' + self.temp_dir + '/control.unit.sock', '--tmp', self.temp_dir, ], stderr=log, ) atexit.register(self.stop) if not waitforfiles(self.temp_dir + '/control.unit.sock'): _print_log(path=self.temp_dir + '/unit.log') exit("Could not start unit") self._started = True def teardown_method(self): self.stop() # check unit.log for alerts unit_log = self.temp_dir + '/unit.log' with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f: _check_alerts(f.read()) # remove unit.log if not option.save_log: shutil.rmtree(self.temp_dir) else: _print_log(path=self.temp_dir) assert self.stop_errors == [None, None], 'stop errors' def stop(self): if not self._started: return self.stop_errors = [] self.stop_errors.append(self._stop()) self.stop_errors.append(self.stop_processes()) atexit.unregister(self.stop) self._started = False def _stop(self): if self._p.poll() is not None: return with self._p as p: p.send_signal(signal.SIGQUIT) try: retcode = p.wait(15) if retcode: return 'Child process terminated with code ' + str(retcode) except: p.kill() return 'Could not terminate unit' def run_process(self, target, *args): if not hasattr(self, '_processes'): self._processes = [] process = Process(target=target, args=args) process.start() self._processes.append(process) def stop_processes(self): if not hasattr(self, '_processes'): return fail = False for process in self._processes: if process.is_alive(): process.terminate() process.join(timeout=15) if process.is_alive(): fail = True if fail: return 'Fail to stop process'