diff options
Diffstat (limited to '')
-rw-r--r-- | test/test_python_isolation.py | 307 |
1 files changed, 153 insertions, 154 deletions
diff --git a/test/test_python_isolation.py b/test/test_python_isolation.py index a9ed2900..260a87a2 100644 --- a/test/test_python_isolation.py +++ b/test/test_python_isolation.py @@ -4,7 +4,7 @@ import subprocess from pathlib import Path import pytest -from unit.applications.lang.python import TestApplicationPython +from unit.applications.lang.python import ApplicationPython from unit.option import option from unit.utils import findmnt from unit.utils import waitformount @@ -12,204 +12,203 @@ from unit.utils import waitforunmount prerequisites = {'modules': {'python': 'any'}, 'features': {'isolation': True}} +client = ApplicationPython() -class TestPythonIsolation(TestApplicationPython): - def get_cgroup(self, app_name): - output = subprocess.check_output( - ['ps', 'ax', '-o', 'pid', '-o', 'cmd'] - ).decode() - pid = re.search( - fr'(\d+)\s*unit: "{app_name}" application', output - ).group(1) +def get_cgroup(app_name): + output = subprocess.check_output( + ['ps', 'ax', '-o', 'pid', '-o', 'cmd'] + ).decode() - cgroup = f'/proc/{pid}/cgroup' + pid = re.search(fr'(\d+)\s*unit: "{app_name}" application', output).group(1) - if not os.path.isfile(cgroup): - pytest.skip(f'no cgroup at {cgroup}') + cgroup = f'/proc/{pid}/cgroup' - with open(cgroup, 'r') as f: - return f.read().rstrip() + if not os.path.isfile(cgroup): + pytest.skip(f'no cgroup at {cgroup}') - def test_python_isolation_rootfs(self, is_su, require, temp_dir): - isolation = {'rootfs': temp_dir} + with open(cgroup, 'r') as f: + return f.read().rstrip() - if not is_su: - require( - { - 'features': { - 'isolation': [ - 'unprivileged_userns_clone', - 'user', - 'mnt', - 'pid', - ] - } - } - ) - isolation['namespaces'] = { - 'mount': True, - 'credential': True, - 'pid': True, +def test_python_isolation_rootfs(is_su, require, temp_dir): + isolation = {'rootfs': temp_dir} + + if not is_su: + require( + { + 'features': { + 'isolation': [ + 'unprivileged_userns_clone', + 'user', + 'mnt', + 'pid', + ] + } } + ) - self.load('ns_inspect', isolation=isolation) + isolation['namespaces'] = { + 'mount': True, + 'credential': True, + 'pid': True, + } - assert not ( - self.getjson(url=f'/?path={temp_dir}')['body']['FileExists'] - ), 'temp_dir does not exists in rootfs' + client.load('ns_inspect', isolation=isolation) - assert self.getjson(url='/?path=/proc/self')['body'][ - 'FileExists' - ], 'no /proc/self' + assert not ( + client.getjson(url=f'/?path={temp_dir}')['body']['FileExists'] + ), 'temp_dir does not exists in rootfs' - assert not ( - self.getjson(url='/?path=/dev/pts')['body']['FileExists'] - ), 'no /dev/pts' + assert client.getjson(url='/?path=/proc/self')['body'][ + 'FileExists' + ], 'no /proc/self' - assert not ( - self.getjson(url='/?path=/sys/kernel')['body']['FileExists'] - ), 'no /sys/kernel' + assert not ( + client.getjson(url='/?path=/dev/pts')['body']['FileExists'] + ), 'no /dev/pts' - ret = self.getjson(url='/?path=/app/python/ns_inspect') + assert not ( + client.getjson(url='/?path=/sys/kernel')['body']['FileExists'] + ), 'no /sys/kernel' - assert ret['body']['FileExists'], 'application exists in rootfs' + ret = client.getjson(url='/?path=/app/python/ns_inspect') - def test_python_isolation_rootfs_no_language_deps(self, require, temp_dir): - require({'privileged_user': True}) + assert ret['body']['FileExists'], 'application exists in rootfs' - isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}} - self.load('empty', isolation=isolation) - python_path = f'{temp_dir}/usr' +def test_python_isolation_rootfs_no_language_deps(require, temp_dir): + require({'privileged_user': True}) - assert findmnt().find(python_path) == -1 - assert self.get()['status'] != 200, 'disabled language_deps' - assert findmnt().find(python_path) == -1 + isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}} + client.load('empty', isolation=isolation) - isolation['automount']['language_deps'] = True + python_path = f'{temp_dir}/usr' - self.load('empty', isolation=isolation) + assert findmnt().find(python_path) == -1 + assert client.get()['status'] != 200, 'disabled language_deps' + assert findmnt().find(python_path) == -1 - assert findmnt().find(python_path) == -1 - assert self.get()['status'] == 200, 'enabled language_deps' - assert waitformount(python_path), 'language_deps mount' + isolation['automount']['language_deps'] = True - self.conf({"listeners": {}, "applications": {}}) + client.load('empty', isolation=isolation) - assert waitforunmount(python_path), 'language_deps unmount' + assert findmnt().find(python_path) == -1 + assert client.get()['status'] == 200, 'enabled language_deps' + assert waitformount(python_path), 'language_deps mount' - def test_python_isolation_procfs(self, require, temp_dir): - require({'privileged_user': True}) + client.conf({"listeners": {}, "applications": {}}) - isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}} + assert waitforunmount(python_path), 'language_deps unmount' - self.load('ns_inspect', isolation=isolation) - assert not ( - self.getjson(url='/?path=/proc/self')['body']['FileExists'] - ), 'no /proc/self' +def test_python_isolation_procfs(require, temp_dir): + require({'privileged_user': True}) - isolation['automount']['procfs'] = True + isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}} - self.load('ns_inspect', isolation=isolation) + client.load('ns_inspect', isolation=isolation) - assert self.getjson(url='/?path=/proc/self')['body'][ - 'FileExists' - ], '/proc/self' + assert not ( + client.getjson(url='/?path=/proc/self')['body']['FileExists'] + ), 'no /proc/self' - def test_python_isolation_cgroup(self, require): - require( - {'privileged_user': True, 'features': {'isolation': ['cgroup']}} - ) + isolation['automount']['procfs'] = True - def set_cgroup_path(path): - isolation = {'cgroup': {'path': path}} - self.load('empty', processes=1, isolation=isolation) + client.load('ns_inspect', isolation=isolation) - set_cgroup_path('scope/python') + assert client.getjson(url='/?path=/proc/self')['body'][ + 'FileExists' + ], '/proc/self' - cgroup_rel = Path(self.get_cgroup('empty')) - assert cgroup_rel.parts[-2:] == ('scope', 'python'), 'cgroup rel' - set_cgroup_path('/scope2/python') +def test_python_isolation_cgroup(require): + require({'privileged_user': True, 'features': {'isolation': ['cgroup']}}) - cgroup_abs = Path(self.get_cgroup('empty')) - assert cgroup_abs.parts[-2:] == ('scope2', 'python'), 'cgroup abs' + def set_cgroup_path(path): + isolation = {'cgroup': {'path': path}} + client.load('empty', processes=1, isolation=isolation) - assert len(cgroup_rel.parts) >= len(cgroup_abs.parts) + set_cgroup_path('scope/python') - def test_python_isolation_cgroup_two(self, require): - require( - {'privileged_user': True, 'features': {'isolation': ['cgroup']}} - ) + cgroup_rel = Path(get_cgroup('empty')) + assert cgroup_rel.parts[-2:] == ('scope', 'python'), 'cgroup rel' - def set_two_cgroup_path(path, path2): - script_path = f'{option.test_dir}/python/empty' + set_cgroup_path('/scope2/python') - assert 'success' in self.conf( - { - "listeners": { - "*:7080": {"pass": "applications/one"}, - "*:7081": {"pass": "applications/two"}, - }, - "applications": { - "one": { - "type": "python", - "processes": 1, - "path": script_path, - "working_directory": script_path, - "module": "wsgi", - "isolation": { - 'cgroup': {'path': path}, - }, - }, - "two": { - "type": "python", - "processes": 1, - "path": script_path, - "working_directory": script_path, - "module": "wsgi", - "isolation": { - 'cgroup': {'path': path2}, - }, - }, - }, - } - ) + cgroup_abs = Path(get_cgroup('empty')) + assert cgroup_abs.parts[-2:] == ('scope2', 'python'), 'cgroup abs' - set_two_cgroup_path('/scope/python', '/scope/python') - assert self.get_cgroup('one') == self.get_cgroup('two') + assert len(cgroup_rel.parts) >= len(cgroup_abs.parts) - set_two_cgroup_path('/scope/python', '/scope2/python') - assert self.get_cgroup('one') != self.get_cgroup('two') - def test_python_isolation_cgroup_invalid(self, require): - require( - {'privileged_user': True, 'features': {'isolation': ['cgroup']}} - ) +def test_python_isolation_cgroup_two(require): + require({'privileged_user': True, 'features': {'isolation': ['cgroup']}}) - def check_invalid(path): - script_path = f'{option.test_dir}/python/empty' - assert 'error' in self.conf( - { - "listeners": {"*:7080": {"pass": "applications/empty"}}, - "applications": { - "empty": { - "type": "python", - "processes": {"spare": 0}, - "path": script_path, - "working_directory": script_path, - "module": "wsgi", - "isolation": { - 'cgroup': {'path': path}, - }, - } + def set_two_cgroup_path(path, path2): + script_path = f'{option.test_dir}/python/empty' + + assert 'success' in client.conf( + { + "listeners": { + "*:7080": {"pass": "applications/one"}, + "*:7081": {"pass": "applications/two"}, + }, + "applications": { + "one": { + "type": "python", + "processes": 1, + "path": script_path, + "working_directory": script_path, + "module": "wsgi", + "isolation": { + 'cgroup': {'path': path}, + }, }, - } - ) + "two": { + "type": "python", + "processes": 1, + "path": script_path, + "working_directory": script_path, + "module": "wsgi", + "isolation": { + 'cgroup': {'path': path2}, + }, + }, + }, + } + ) + + set_two_cgroup_path('/scope/python', '/scope/python') + assert get_cgroup('one') == get_cgroup('two') + + set_two_cgroup_path('/scope/python', '/scope2/python') + assert get_cgroup('one') != get_cgroup('two') + + +def test_python_isolation_cgroup_invalid(require): + require({'privileged_user': True, 'features': {'isolation': ['cgroup']}}) + + def check_invalid(path): + script_path = f'{option.test_dir}/python/empty' + assert 'error' in client.conf( + { + "listeners": {"*:7080": {"pass": "applications/empty"}}, + "applications": { + "empty": { + "type": "python", + "processes": {"spare": 0}, + "path": script_path, + "working_directory": script_path, + "module": "wsgi", + "isolation": { + 'cgroup': {'path': path}, + }, + } + }, + } + ) - check_invalid('') - check_invalid('../scope') - check_invalid('scope/../python') + check_invalid('') + check_invalid('../scope') + check_invalid('scope/../python') |