diff options
Diffstat (limited to 'test/test_python_isolation.py')
-rw-r--r-- | test/test_python_isolation.py | 315 |
1 files changed, 151 insertions, 164 deletions
diff --git a/test/test_python_isolation.py b/test/test_python_isolation.py index c524aea0..260a87a2 100644 --- a/test/test_python_isolation.py +++ b/test/test_python_isolation.py @@ -4,224 +4,211 @@ import subprocess from pathlib import Path import pytest -from unit.applications.lang.python import TestApplicationPython +from unit.applications.lang.python import ApplicationPython from unit.option import option from unit.utils import findmnt from unit.utils import waitformount from unit.utils import waitforunmount +prerequisites = {'modules': {'python': 'any'}, 'features': {'isolation': True}} -class TestPythonIsolation(TestApplicationPython): - prerequisites = {'modules': {'python': 'any'}, 'features': ['isolation']} +client = ApplicationPython() - def get_cgroup(self, app_name): - output = subprocess.check_output( - ['ps', 'ax', '-o', 'pid', '-o', 'cmd'] - ).decode() - pid = re.search( - fr'(\d+)\s*unit: "{app_name}" application', output - ).group(1) +def get_cgroup(app_name): + output = subprocess.check_output( + ['ps', 'ax', '-o', 'pid', '-o', 'cmd'] + ).decode() - cgroup = f'/proc/{pid}/cgroup' + pid = re.search(fr'(\d+)\s*unit: "{app_name}" application', output).group(1) - if not os.path.isfile(cgroup): - pytest.skip(f'no cgroup at {cgroup}') + cgroup = f'/proc/{pid}/cgroup' - with open(cgroup, 'r') as f: - return f.read().rstrip() + if not os.path.isfile(cgroup): + pytest.skip(f'no cgroup at {cgroup}') - def test_python_isolation_rootfs(self, is_su, temp_dir): - isolation_features = option.available['features']['isolation'].keys() + with open(cgroup, 'r') as f: + return f.read().rstrip() - if not is_su: - if not 'unprivileged_userns_clone' in isolation_features: - pytest.skip('requires unprivileged userns or root') - if 'user' not in isolation_features: - pytest.skip('user namespace is not supported') +def test_python_isolation_rootfs(is_su, require, temp_dir): + isolation = {'rootfs': temp_dir} - if 'mnt' not in isolation_features: - pytest.skip('mnt namespace is not supported') + if not is_su: + require( + { + 'features': { + 'isolation': [ + 'unprivileged_userns_clone', + 'user', + 'mnt', + 'pid', + ] + } + } + ) - if 'pid' not in isolation_features: - pytest.skip('pid namespace is not supported') + isolation['namespaces'] = { + 'mount': True, + 'credential': True, + 'pid': True, + } - isolation = {'rootfs': temp_dir} + client.load('ns_inspect', isolation=isolation) - if not is_su: - isolation['namespaces'] = { - 'mount': True, - 'credential': True, - 'pid': True, - } + assert not ( + client.getjson(url=f'/?path={temp_dir}')['body']['FileExists'] + ), 'temp_dir does not exists in rootfs' - self.load('ns_inspect', isolation=isolation) + assert client.getjson(url='/?path=/proc/self')['body'][ + 'FileExists' + ], 'no /proc/self' - assert ( - self.getjson(url=f'/?path={temp_dir}')['body']['FileExists'] - == False - ), 'temp_dir does not exists in rootfs' + assert not ( + client.getjson(url='/?path=/dev/pts')['body']['FileExists'] + ), 'no /dev/pts' - assert ( - self.getjson(url='/?path=/proc/self')['body']['FileExists'] == True - ), 'no /proc/self' + assert not ( + client.getjson(url='/?path=/sys/kernel')['body']['FileExists'] + ), 'no /sys/kernel' - assert ( - self.getjson(url='/?path=/dev/pts')['body']['FileExists'] == False - ), 'no /dev/pts' + ret = client.getjson(url='/?path=/app/python/ns_inspect') - assert ( - self.getjson(url='/?path=/sys/kernel')['body']['FileExists'] - == False - ), 'no /sys/kernel' + assert ret['body']['FileExists'], 'application exists in rootfs' - ret = self.getjson(url='/?path=/app/python/ns_inspect') - assert ret['body']['FileExists'] == True, 'application exists in rootfs' +def test_python_isolation_rootfs_no_language_deps(require, temp_dir): + require({'privileged_user': True}) - def test_python_isolation_rootfs_no_language_deps(self, is_su, temp_dir): - if not is_su: - pytest.skip('requires root') + isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}} + client.load('empty', isolation=isolation) - isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}} - self.load('empty', isolation=isolation) + python_path = f'{temp_dir}/usr' - python_path = f'{temp_dir}/usr' + assert findmnt().find(python_path) == -1 + assert client.get()['status'] != 200, 'disabled language_deps' + assert findmnt().find(python_path) == -1 - assert findmnt().find(python_path) == -1 - assert self.get()['status'] != 200, 'disabled language_deps' - assert findmnt().find(python_path) == -1 + isolation['automount']['language_deps'] = True - isolation['automount']['language_deps'] = True + client.load('empty', isolation=isolation) - self.load('empty', isolation=isolation) + assert findmnt().find(python_path) == -1 + assert client.get()['status'] == 200, 'enabled language_deps' + assert waitformount(python_path), 'language_deps mount' - assert findmnt().find(python_path) == -1 - assert self.get()['status'] == 200, 'enabled language_deps' - assert waitformount(python_path), 'language_deps mount' + client.conf({"listeners": {}, "applications": {}}) - self.conf({"listeners": {}, "applications": {}}) + assert waitforunmount(python_path), 'language_deps unmount' - assert waitforunmount(python_path), 'language_deps unmount' - def test_python_isolation_procfs(self, is_su, temp_dir): - if not is_su: - pytest.skip('requires root') +def test_python_isolation_procfs(require, temp_dir): + require({'privileged_user': True}) - isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}} + isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}} - self.load('ns_inspect', isolation=isolation) + client.load('ns_inspect', isolation=isolation) - assert ( - self.getjson(url='/?path=/proc/self')['body']['FileExists'] == False - ), 'no /proc/self' + assert not ( + client.getjson(url='/?path=/proc/self')['body']['FileExists'] + ), 'no /proc/self' - isolation['automount']['procfs'] = True + isolation['automount']['procfs'] = True - self.load('ns_inspect', isolation=isolation) + client.load('ns_inspect', isolation=isolation) - assert ( - self.getjson(url='/?path=/proc/self')['body']['FileExists'] == True - ), '/proc/self' + assert client.getjson(url='/?path=/proc/self')['body'][ + 'FileExists' + ], '/proc/self' - def test_python_isolation_cgroup(self, is_su, temp_dir): - if not is_su: - pytest.skip('requires root') - if not 'cgroup' in option.available['features']['isolation']: - pytest.skip('cgroup is not supported') +def test_python_isolation_cgroup(require): + require({'privileged_user': True, 'features': {'isolation': ['cgroup']}}) - def set_cgroup_path(path): - isolation = {'cgroup': {'path': path}} - self.load('empty', processes=1, isolation=isolation) + def set_cgroup_path(path): + isolation = {'cgroup': {'path': path}} + client.load('empty', processes=1, isolation=isolation) - set_cgroup_path('scope/python') + set_cgroup_path('scope/python') - cgroup_rel = Path(self.get_cgroup('empty')) - assert cgroup_rel.parts[-2:] == ('scope', 'python'), 'cgroup rel' + cgroup_rel = Path(get_cgroup('empty')) + assert cgroup_rel.parts[-2:] == ('scope', 'python'), 'cgroup rel' - set_cgroup_path('/scope2/python') + set_cgroup_path('/scope2/python') - cgroup_abs = Path(self.get_cgroup('empty')) - assert cgroup_abs.parts[-2:] == ('scope2', 'python'), 'cgroup abs' + cgroup_abs = Path(get_cgroup('empty')) + assert cgroup_abs.parts[-2:] == ('scope2', 'python'), 'cgroup abs' - assert len(cgroup_rel.parts) >= len(cgroup_abs.parts) + assert len(cgroup_rel.parts) >= len(cgroup_abs.parts) - def test_python_isolation_cgroup_two(self, is_su, temp_dir): - if not is_su: - pytest.skip('requires root') - if not 'cgroup' in option.available['features']['isolation']: - pytest.skip('cgroup is not supported') +def test_python_isolation_cgroup_two(require): + require({'privileged_user': True, 'features': {'isolation': ['cgroup']}}) - def set_two_cgroup_path(path, path2): - script_path = f'{option.test_dir}/python/empty' + def set_two_cgroup_path(path, path2): + script_path = f'{option.test_dir}/python/empty' - assert 'success' in self.conf( - { - "listeners": { - "*:7080": {"pass": "applications/one"}, - "*:7081": {"pass": "applications/two"}, - }, - "applications": { - "one": { - "type": "python", - "processes": 1, - "path": script_path, - "working_directory": script_path, - "module": "wsgi", - "isolation": { - 'cgroup': {'path': path}, - }, - }, - "two": { - "type": "python", - "processes": 1, - "path": script_path, - "working_directory": script_path, - "module": "wsgi", - "isolation": { - 'cgroup': {'path': path2}, - }, + assert 'success' in client.conf( + { + "listeners": { + "*:7080": {"pass": "applications/one"}, + "*:7081": {"pass": "applications/two"}, + }, + "applications": { + "one": { + "type": "python", + "processes": 1, + "path": script_path, + "working_directory": script_path, + "module": "wsgi", + "isolation": { + 'cgroup': {'path': path}, }, }, - } - ) - - set_two_cgroup_path('/scope/python', '/scope/python') - assert self.get_cgroup('one') == self.get_cgroup('two') - - set_two_cgroup_path('/scope/python', '/scope2/python') - assert self.get_cgroup('one') != self.get_cgroup('two') - - def test_python_isolation_cgroup_invalid(self, is_su): - if not is_su: - pytest.skip('requires root') - - if not 'cgroup' in option.available['features']['isolation']: - pytest.skip('cgroup is not supported') - - def check_invalid(path): - script_path = f'{option.test_dir}/python/empty' - assert 'error' in self.conf( - { - "listeners": {"*:7080": {"pass": "applications/empty"}}, - "applications": { - "empty": { - "type": "python", - "processes": {"spare": 0}, - "path": script_path, - "working_directory": script_path, - "module": "wsgi", - "isolation": { - 'cgroup': {'path': path}, - }, - } + "two": { + "type": "python", + "processes": 1, + "path": script_path, + "working_directory": script_path, + "module": "wsgi", + "isolation": { + 'cgroup': {'path': path2}, + }, }, - } - ) + }, + } + ) + + set_two_cgroup_path('/scope/python', '/scope/python') + assert get_cgroup('one') == get_cgroup('two') + + set_two_cgroup_path('/scope/python', '/scope2/python') + assert get_cgroup('one') != get_cgroup('two') + + +def test_python_isolation_cgroup_invalid(require): + require({'privileged_user': True, 'features': {'isolation': ['cgroup']}}) + + def check_invalid(path): + script_path = f'{option.test_dir}/python/empty' + assert 'error' in client.conf( + { + "listeners": {"*:7080": {"pass": "applications/empty"}}, + "applications": { + "empty": { + "type": "python", + "processes": {"spare": 0}, + "path": script_path, + "working_directory": script_path, + "module": "wsgi", + "isolation": { + 'cgroup': {'path': path}, + }, + } + }, + } + ) - check_invalid('') - check_invalid('../scope') - check_invalid('scope/../python') + check_invalid('') + check_invalid('../scope') + check_invalid('scope/../python') |