summaryrefslogtreecommitdiffhomepage
path: root/test/test_python_isolation.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_python_isolation.py')
-rw-r--r--test/test_python_isolation.py315
1 files changed, 151 insertions, 164 deletions
diff --git a/test/test_python_isolation.py b/test/test_python_isolation.py
index c524aea0..260a87a2 100644
--- a/test/test_python_isolation.py
+++ b/test/test_python_isolation.py
@@ -4,224 +4,211 @@ import subprocess
from pathlib import Path
import pytest
-from unit.applications.lang.python import TestApplicationPython
+from unit.applications.lang.python import ApplicationPython
from unit.option import option
from unit.utils import findmnt
from unit.utils import waitformount
from unit.utils import waitforunmount
+prerequisites = {'modules': {'python': 'any'}, 'features': {'isolation': True}}
-class TestPythonIsolation(TestApplicationPython):
- prerequisites = {'modules': {'python': 'any'}, 'features': ['isolation']}
+client = ApplicationPython()
- def get_cgroup(self, app_name):
- output = subprocess.check_output(
- ['ps', 'ax', '-o', 'pid', '-o', 'cmd']
- ).decode()
- pid = re.search(
- fr'(\d+)\s*unit: "{app_name}" application', output
- ).group(1)
+def get_cgroup(app_name):
+ output = subprocess.check_output(
+ ['ps', 'ax', '-o', 'pid', '-o', 'cmd']
+ ).decode()
- cgroup = f'/proc/{pid}/cgroup'
+ pid = re.search(fr'(\d+)\s*unit: "{app_name}" application', output).group(1)
- if not os.path.isfile(cgroup):
- pytest.skip(f'no cgroup at {cgroup}')
+ cgroup = f'/proc/{pid}/cgroup'
- with open(cgroup, 'r') as f:
- return f.read().rstrip()
+ if not os.path.isfile(cgroup):
+ pytest.skip(f'no cgroup at {cgroup}')
- def test_python_isolation_rootfs(self, is_su, temp_dir):
- isolation_features = option.available['features']['isolation'].keys()
+ with open(cgroup, 'r') as f:
+ return f.read().rstrip()
- if not is_su:
- if not 'unprivileged_userns_clone' in isolation_features:
- pytest.skip('requires unprivileged userns or root')
- if 'user' not in isolation_features:
- pytest.skip('user namespace is not supported')
+def test_python_isolation_rootfs(is_su, require, temp_dir):
+ isolation = {'rootfs': temp_dir}
- if 'mnt' not in isolation_features:
- pytest.skip('mnt namespace is not supported')
+ if not is_su:
+ require(
+ {
+ 'features': {
+ 'isolation': [
+ 'unprivileged_userns_clone',
+ 'user',
+ 'mnt',
+ 'pid',
+ ]
+ }
+ }
+ )
- if 'pid' not in isolation_features:
- pytest.skip('pid namespace is not supported')
+ isolation['namespaces'] = {
+ 'mount': True,
+ 'credential': True,
+ 'pid': True,
+ }
- isolation = {'rootfs': temp_dir}
+ client.load('ns_inspect', isolation=isolation)
- if not is_su:
- isolation['namespaces'] = {
- 'mount': True,
- 'credential': True,
- 'pid': True,
- }
+ assert not (
+ client.getjson(url=f'/?path={temp_dir}')['body']['FileExists']
+ ), 'temp_dir does not exists in rootfs'
- self.load('ns_inspect', isolation=isolation)
+ assert client.getjson(url='/?path=/proc/self')['body'][
+ 'FileExists'
+ ], 'no /proc/self'
- assert (
- self.getjson(url=f'/?path={temp_dir}')['body']['FileExists']
- == False
- ), 'temp_dir does not exists in rootfs'
+ assert not (
+ client.getjson(url='/?path=/dev/pts')['body']['FileExists']
+ ), 'no /dev/pts'
- assert (
- self.getjson(url='/?path=/proc/self')['body']['FileExists'] == True
- ), 'no /proc/self'
+ assert not (
+ client.getjson(url='/?path=/sys/kernel')['body']['FileExists']
+ ), 'no /sys/kernel'
- assert (
- self.getjson(url='/?path=/dev/pts')['body']['FileExists'] == False
- ), 'no /dev/pts'
+ ret = client.getjson(url='/?path=/app/python/ns_inspect')
- assert (
- self.getjson(url='/?path=/sys/kernel')['body']['FileExists']
- == False
- ), 'no /sys/kernel'
+ assert ret['body']['FileExists'], 'application exists in rootfs'
- ret = self.getjson(url='/?path=/app/python/ns_inspect')
- assert ret['body']['FileExists'] == True, 'application exists in rootfs'
+def test_python_isolation_rootfs_no_language_deps(require, temp_dir):
+ require({'privileged_user': True})
- def test_python_isolation_rootfs_no_language_deps(self, is_su, temp_dir):
- if not is_su:
- pytest.skip('requires root')
+ isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}}
+ client.load('empty', isolation=isolation)
- isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}}
- self.load('empty', isolation=isolation)
+ python_path = f'{temp_dir}/usr'
- python_path = f'{temp_dir}/usr'
+ assert findmnt().find(python_path) == -1
+ assert client.get()['status'] != 200, 'disabled language_deps'
+ assert findmnt().find(python_path) == -1
- assert findmnt().find(python_path) == -1
- assert self.get()['status'] != 200, 'disabled language_deps'
- assert findmnt().find(python_path) == -1
+ isolation['automount']['language_deps'] = True
- isolation['automount']['language_deps'] = True
+ client.load('empty', isolation=isolation)
- self.load('empty', isolation=isolation)
+ assert findmnt().find(python_path) == -1
+ assert client.get()['status'] == 200, 'enabled language_deps'
+ assert waitformount(python_path), 'language_deps mount'
- assert findmnt().find(python_path) == -1
- assert self.get()['status'] == 200, 'enabled language_deps'
- assert waitformount(python_path), 'language_deps mount'
+ client.conf({"listeners": {}, "applications": {}})
- self.conf({"listeners": {}, "applications": {}})
+ assert waitforunmount(python_path), 'language_deps unmount'
- assert waitforunmount(python_path), 'language_deps unmount'
- def test_python_isolation_procfs(self, is_su, temp_dir):
- if not is_su:
- pytest.skip('requires root')
+def test_python_isolation_procfs(require, temp_dir):
+ require({'privileged_user': True})
- isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}}
+ isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}}
- self.load('ns_inspect', isolation=isolation)
+ client.load('ns_inspect', isolation=isolation)
- assert (
- self.getjson(url='/?path=/proc/self')['body']['FileExists'] == False
- ), 'no /proc/self'
+ assert not (
+ client.getjson(url='/?path=/proc/self')['body']['FileExists']
+ ), 'no /proc/self'
- isolation['automount']['procfs'] = True
+ isolation['automount']['procfs'] = True
- self.load('ns_inspect', isolation=isolation)
+ client.load('ns_inspect', isolation=isolation)
- assert (
- self.getjson(url='/?path=/proc/self')['body']['FileExists'] == True
- ), '/proc/self'
+ assert client.getjson(url='/?path=/proc/self')['body'][
+ 'FileExists'
+ ], '/proc/self'
- def test_python_isolation_cgroup(self, is_su, temp_dir):
- if not is_su:
- pytest.skip('requires root')
- if not 'cgroup' in option.available['features']['isolation']:
- pytest.skip('cgroup is not supported')
+def test_python_isolation_cgroup(require):
+ require({'privileged_user': True, 'features': {'isolation': ['cgroup']}})
- def set_cgroup_path(path):
- isolation = {'cgroup': {'path': path}}
- self.load('empty', processes=1, isolation=isolation)
+ def set_cgroup_path(path):
+ isolation = {'cgroup': {'path': path}}
+ client.load('empty', processes=1, isolation=isolation)
- set_cgroup_path('scope/python')
+ set_cgroup_path('scope/python')
- cgroup_rel = Path(self.get_cgroup('empty'))
- assert cgroup_rel.parts[-2:] == ('scope', 'python'), 'cgroup rel'
+ cgroup_rel = Path(get_cgroup('empty'))
+ assert cgroup_rel.parts[-2:] == ('scope', 'python'), 'cgroup rel'
- set_cgroup_path('/scope2/python')
+ set_cgroup_path('/scope2/python')
- cgroup_abs = Path(self.get_cgroup('empty'))
- assert cgroup_abs.parts[-2:] == ('scope2', 'python'), 'cgroup abs'
+ cgroup_abs = Path(get_cgroup('empty'))
+ assert cgroup_abs.parts[-2:] == ('scope2', 'python'), 'cgroup abs'
- assert len(cgroup_rel.parts) >= len(cgroup_abs.parts)
+ assert len(cgroup_rel.parts) >= len(cgroup_abs.parts)
- def test_python_isolation_cgroup_two(self, is_su, temp_dir):
- if not is_su:
- pytest.skip('requires root')
- if not 'cgroup' in option.available['features']['isolation']:
- pytest.skip('cgroup is not supported')
+def test_python_isolation_cgroup_two(require):
+ require({'privileged_user': True, 'features': {'isolation': ['cgroup']}})
- def set_two_cgroup_path(path, path2):
- script_path = f'{option.test_dir}/python/empty'
+ def set_two_cgroup_path(path, path2):
+ script_path = f'{option.test_dir}/python/empty'
- assert 'success' in self.conf(
- {
- "listeners": {
- "*:7080": {"pass": "applications/one"},
- "*:7081": {"pass": "applications/two"},
- },
- "applications": {
- "one": {
- "type": "python",
- "processes": 1,
- "path": script_path,
- "working_directory": script_path,
- "module": "wsgi",
- "isolation": {
- 'cgroup': {'path': path},
- },
- },
- "two": {
- "type": "python",
- "processes": 1,
- "path": script_path,
- "working_directory": script_path,
- "module": "wsgi",
- "isolation": {
- 'cgroup': {'path': path2},
- },
+ assert 'success' in client.conf(
+ {
+ "listeners": {
+ "*:7080": {"pass": "applications/one"},
+ "*:7081": {"pass": "applications/two"},
+ },
+ "applications": {
+ "one": {
+ "type": "python",
+ "processes": 1,
+ "path": script_path,
+ "working_directory": script_path,
+ "module": "wsgi",
+ "isolation": {
+ 'cgroup': {'path': path},
},
},
- }
- )
-
- set_two_cgroup_path('/scope/python', '/scope/python')
- assert self.get_cgroup('one') == self.get_cgroup('two')
-
- set_two_cgroup_path('/scope/python', '/scope2/python')
- assert self.get_cgroup('one') != self.get_cgroup('two')
-
- def test_python_isolation_cgroup_invalid(self, is_su):
- if not is_su:
- pytest.skip('requires root')
-
- if not 'cgroup' in option.available['features']['isolation']:
- pytest.skip('cgroup is not supported')
-
- def check_invalid(path):
- script_path = f'{option.test_dir}/python/empty'
- assert 'error' in self.conf(
- {
- "listeners": {"*:7080": {"pass": "applications/empty"}},
- "applications": {
- "empty": {
- "type": "python",
- "processes": {"spare": 0},
- "path": script_path,
- "working_directory": script_path,
- "module": "wsgi",
- "isolation": {
- 'cgroup': {'path': path},
- },
- }
+ "two": {
+ "type": "python",
+ "processes": 1,
+ "path": script_path,
+ "working_directory": script_path,
+ "module": "wsgi",
+ "isolation": {
+ 'cgroup': {'path': path2},
+ },
},
- }
- )
+ },
+ }
+ )
+
+ set_two_cgroup_path('/scope/python', '/scope/python')
+ assert get_cgroup('one') == get_cgroup('two')
+
+ set_two_cgroup_path('/scope/python', '/scope2/python')
+ assert get_cgroup('one') != get_cgroup('two')
+
+
+def test_python_isolation_cgroup_invalid(require):
+ require({'privileged_user': True, 'features': {'isolation': ['cgroup']}})
+
+ def check_invalid(path):
+ script_path = f'{option.test_dir}/python/empty'
+ assert 'error' in client.conf(
+ {
+ "listeners": {"*:7080": {"pass": "applications/empty"}},
+ "applications": {
+ "empty": {
+ "type": "python",
+ "processes": {"spare": 0},
+ "path": script_path,
+ "working_directory": script_path,
+ "module": "wsgi",
+ "isolation": {
+ 'cgroup': {'path': path},
+ },
+ }
+ },
+ }
+ )
- check_invalid('')
- check_invalid('../scope')
- check_invalid('scope/../python')
+ check_invalid('')
+ check_invalid('../scope')
+ check_invalid('scope/../python')