summaryrefslogtreecommitdiffhomepage
path: root/test/conftest.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/conftest.py')
-rw-r--r--test/conftest.py90
1 files changed, 42 insertions, 48 deletions
diff --git a/test/conftest.py b/test/conftest.py
index 4a1aa7cc..8944c6b3 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -107,7 +107,7 @@ def pytest_configure(config):
option.current_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir)
)
- option.test_dir = option.current_dir + '/test'
+ option.test_dir = f'{option.current_dir}/test'
option.architecture = platform.architecture()[0]
option.system = platform.system()
@@ -148,8 +148,8 @@ def pytest_generate_tests(metafunc):
for version in versions:
option.generated_tests[
- metafunc.function.__name__ + '[{}]'.format(version)
- ] = (type + ' ' + version)
+ f'{metafunc.function.__name__} [{version}]'
+ ] = f'{type} {version}'
# take available module from option and generate tests for each version
@@ -161,18 +161,17 @@ def pytest_generate_tests(metafunc):
generate_tests(available_versions)
elif prereq_version == 'any':
- option.generated_tests[metafunc.function.__name__] = (
- type + ' ' + available_versions[0]
- )
+ option.generated_tests[
+ metafunc.function.__name__
+ ] = f'{type} {available_versions[0]}'
elif callable(prereq_version):
generate_tests(list(filter(prereq_version, available_versions)))
else:
raise ValueError(
- """
-Unexpected prerequisite version "%s" for module "%s" in %s.
-'all', 'any' or callable expected."""
- % (str(prereq_version), module, str(cls))
+ f'''
+Unexpected prerequisite version "{prereq_version}" for module "{module}" in
+{cls}. 'all', 'any' or callable expected.'''
)
@@ -225,7 +224,7 @@ def pytest_sessionstart(session):
check_isolation()
check_unix_abstract()
- _clear_conf(unit['temp_dir'] + '/control.unit.sock')
+ _clear_conf(f'{unit["temp_dir"]}/control.unit.sock')
unit_stop()
@@ -244,7 +243,7 @@ def pytest_runtest_makereport(item, call):
# set a report attribute for each phase of a call, which can
# be "setup", "call", "teardown"
- setattr(item, "rep_" + rep.when, rep)
+ setattr(item, f'rep_{rep.when}', rep)
@pytest.fixture(scope='class', autouse=True)
@@ -264,7 +263,7 @@ def check_prerequisites(request):
missed.append(module)
if missed:
- pytest.skip('Unit has no ' + ', '.join(missed) + ' module(s)')
+ pytest.skip(f'Unit has no {", ".join(missed)} module(s)')
# check features
@@ -278,7 +277,7 @@ def check_prerequisites(request):
missed.append(feature)
if missed:
- pytest.skip(', '.join(missed) + ' feature(s) not supported')
+ pytest.skip(f'{", ".join(missed)} feature(s) not supported')
@pytest.fixture(autouse=True)
@@ -316,7 +315,7 @@ def run(request):
# clean temp_dir before the next test
if not option.restart:
- _clear_conf(unit['temp_dir'] + '/control.unit.sock', log=log)
+ _clear_conf(f'{unit["temp_dir"]}/control.unit.sock', log=log)
if is_findmnt and not waitforunmount(unit['temp_dir'], timeout=600):
exit('Could not unmount some filesystems in tmp dir.')
@@ -374,8 +373,8 @@ def unit_run(state_dir=None):
if not option.restart and 'unitd' in unit_instance:
return unit_instance
- build_dir = option.current_dir + '/build'
- unitd = build_dir + '/unitd'
+ build_dir = f'{option.current_dir}/build'
+ unitd = f'{build_dir}/unitd'
if not os.path.isfile(unitd):
exit('Could not find unit')
@@ -386,50 +385,52 @@ def unit_run(state_dir=None):
if oct(stat.S_IMODE(os.stat(build_dir).st_mode)) != '0o777':
public_dir(build_dir)
- state = temp_dir + '/state' if state_dir is None else state_dir
+ state = f'{temp_dir}/state' if state_dir is None else state_dir
if not os.path.isdir(state):
os.mkdir(state)
+ control_sock = f'{temp_dir}/control.unit.sock'
+
unitd_args = [
unitd,
'--no-daemon',
- '--modules',
+ '--modulesdir',
build_dir,
- '--state',
+ '--libstatedir',
state,
'--pid',
- temp_dir + '/unit.pid',
+ f'{temp_dir}/unit.pid',
'--log',
- temp_dir + '/unit.log',
+ f'{temp_dir}/unit.log',
'--control',
- 'unix:' + temp_dir + '/control.unit.sock',
- '--tmp',
+ f'unix:{temp_dir}/control.unit.sock',
+ '--tmpdir',
temp_dir,
]
if option.user:
unitd_args.extend(['--user', option.user])
- with open(temp_dir + '/unit.log', 'w') as log:
+ with open(f'{temp_dir}/unit.log', 'w') as log:
unit_instance['process'] = subprocess.Popen(unitd_args, stderr=log)
Log.temp_dir = temp_dir
- if not waitforfiles(temp_dir + '/control.unit.sock'):
+ if not waitforfiles(control_sock):
_print_log()
exit('Could not start unit')
unit_instance['temp_dir'] = temp_dir
- unit_instance['control_sock'] = temp_dir + '/control.unit.sock'
+ unit_instance['control_sock'] = control_sock
unit_instance['unitd'] = unitd
option.temp_dir = temp_dir
- with open(temp_dir + '/unit.pid', 'r') as f:
+ with open(f'{temp_dir}/unit.pid', 'r') as f:
unit_instance['pid'] = f.read().rstrip()
if state_dir is None:
- _clear_conf(unit_instance['temp_dir'] + '/control.unit.sock')
+ _clear_conf(control_sock)
_fds_info['main']['fds'] = _count_fds(unit_instance['pid'])
@@ -473,7 +474,7 @@ def unit_stop():
try:
retcode = p.wait(15)
if retcode:
- return 'Child process terminated with code ' + str(retcode)
+ return f'Child process terminated with code {retcode}'
except KeyboardInterrupt:
p.kill()
@@ -518,7 +519,7 @@ def _check_alerts(*, log=None):
def _print_log(log=None):
path = Log.get_path()
- print('Path to unit.log:\n' + path + '\n')
+ print(f'Path to unit.log:\n{path}\n')
if option.print_log:
os.set_blocking(sys.stdout.fileno(), True)
@@ -551,11 +552,11 @@ def _clear_conf(sock, *, log=None):
).keys()
except json.JSONDecodeError:
- pytest.fail('Can\'t parse certificates list.')
+ pytest.fail("Can't parse certificates list.")
for cert in certs:
resp = http.delete(
- url='/certificates/' + cert,
+ url=f'/certificates/{cert}',
sock_type='unix',
addr=sock,
)['body']
@@ -595,17 +596,14 @@ def _check_processes():
out = [
l
for l in out
- if re.search(router_pid + r'\s+' + unit_pid + r'.*unit: router', l)
- is None
+ if re.search(fr'{router_pid}\s+{unit_pid}.*unit: router', l) is None
]
assert len(out) == 1, 'one router'
out = [
l
for l in out
- if re.search(
- controller_pid + r'\s+' + unit_pid + r'.*unit: controller', l
- )
+ if re.search(fr'{controller_pid}\s+{unit_pid}.*unit: controller', l)
is None
]
assert len(out) == 0, 'one controller'
@@ -646,18 +644,16 @@ def _check_fds(*, log=None):
ps['fds'] += fds_diff
if not option.restart:
- assert ps['pid'] == ps_pid, 'same pid %s' % name
+ assert ps['pid'] == ps_pid, f'same pid {name}'
- assert fds_diff <= option.fds_threshold, (
- 'descriptors leak %s' % name
- )
+ assert fds_diff <= option.fds_threshold, f'descriptors leak {name}'
else:
ps['fds'] = _count_fds(ps['pid'])
def _count_fds(pid):
- procfile = '/proc/%s/fd' % pid
+ procfile = f'/proc/{pid}/fd'
if os.path.isdir(procfile):
return len(os.listdir(procfile))
@@ -712,14 +708,12 @@ def stop_processes():
def pid_by_name(name):
output = subprocess.check_output(['ps', 'ax', '-O', 'ppid']).decode()
- m = re.search(
- r'\s*(\d+)\s*' + str(unit_instance['pid']) + r'.*' + name, output
- )
+ m = re.search(fr'\s*(\d+)\s*{unit_instance["pid"]}.*{name}', output)
return None if m is None else m.group(1)
def find_proc(name, ps_output):
- return re.findall(str(unit_instance['pid']) + r'.*' + name, ps_output)
+ return re.findall(f'{unit_instance["pid"]}.*{name}', ps_output)
@pytest.fixture()
@@ -762,7 +756,7 @@ def unit_pid(request):
def pytest_sessionfinish(session):
if not option.restart and option.save_log:
- print('Path to unit.log:\n' + Log.get_path() + '\n')
+ print(f'Path to unit.log:\n{Log.get_path()}\n')
option.restart = True