summaryrefslogtreecommitdiffhomepage
path: root/test/test_go_isolation.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_go_isolation.py')
-rw-r--r--test/test_go_isolation.py533
1 files changed, 268 insertions, 265 deletions
diff --git a/test/test_go_isolation.py b/test/test_go_isolation.py
index f063f987..ba3390ea 100644
--- a/test/test_go_isolation.py
+++ b/test/test_go_isolation.py
@@ -3,362 +3,365 @@ import os
import pwd
import pytest
-from unit.applications.lang.go import TestApplicationGo
+from unit.applications.lang.go import ApplicationGo
from unit.option import option
from unit.utils import getns
+prerequisites = {'modules': {'go': 'any'}, 'features': {'isolation': True}}
-class TestGoIsolation(TestApplicationGo):
- prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}
+client = ApplicationGo()
- @pytest.fixture(autouse=True)
- def setup_method_fixture(self, request, skip_alert):
- skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor')
- def unpriv_creds(self):
- nobody_uid = pwd.getpwnam('nobody').pw_uid
+def unpriv_creds():
+ nobody_uid = pwd.getpwnam('nobody').pw_uid
- try:
- nogroup_gid = grp.getgrnam('nogroup').gr_gid
- nogroup = 'nogroup'
- except KeyError:
- nogroup_gid = grp.getgrnam('nobody').gr_gid
- nogroup = 'nobody'
+ try:
+ nogroup_gid = grp.getgrnam('nogroup').gr_gid
+ nogroup = 'nogroup'
+ except KeyError:
+ nogroup_gid = grp.getgrnam('nobody').gr_gid
+ nogroup = 'nobody'
- return (nobody_uid, nogroup_gid, nogroup)
+ return (nobody_uid, nogroup_gid, nogroup)
- def isolation_key(self, key):
- return key in option.available['features']['isolation'].keys()
- def test_isolation_values(self):
- self.load('ns_inspect')
+def test_isolation_values():
+ client.load('ns_inspect')
- obj = self.getjson()['body']
+ obj = client.getjson()['body']
- for ns, ns_value in option.available['features']['isolation'].items():
- if ns.upper() in obj['NS']:
- assert obj['NS'][ns.upper()] == ns_value, f'{ns} match'
+ for ns, ns_value in option.available['features']['isolation'].items():
+ if ns.upper() in obj['NS']:
+ assert obj['NS'][ns.upper()] == ns_value, f'{ns} match'
- def test_isolation_unpriv_user(self, is_su):
- if not self.isolation_key('unprivileged_userns_clone'):
- pytest.skip('unprivileged clone is not available')
- if is_su:
- pytest.skip('privileged tests, skip this')
+def test_isolation_unpriv_user(require):
+ require(
+ {
+ 'privileged_user': False,
+ 'features': {'isolation': ['unprivileged_userns_clone']},
+ }
+ )
- self.load('ns_inspect')
- obj = self.getjson()['body']
+ client.load('ns_inspect')
+ obj = client.getjson()['body']
- assert obj['UID'] == os.geteuid(), 'uid match'
- assert obj['GID'] == os.getegid(), 'gid match'
+ assert obj['UID'] == os.geteuid(), 'uid match'
+ assert obj['GID'] == os.getegid(), 'gid match'
- self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
+ client.load('ns_inspect', isolation={'namespaces': {'credential': True}})
- obj = self.getjson()['body']
+ obj = client.getjson()['body']
- nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
+ nobody_uid, nogroup_gid, nogroup = unpriv_creds()
- # unprivileged unit map itself to nobody in the container by default
- assert obj['UID'] == nobody_uid, 'uid of nobody'
- assert obj['GID'] == nogroup_gid, f'gid of {nogroup}'
+ # unprivileged unit map itself to nobody in the container by default
+ assert obj['UID'] == nobody_uid, 'uid of nobody'
+ assert obj['GID'] == nogroup_gid, f'gid of {nogroup}'
- self.load(
- 'ns_inspect',
- user='root',
- isolation={'namespaces': {'credential': True}},
- )
+ client.load(
+ 'ns_inspect',
+ user='root',
+ isolation={'namespaces': {'credential': True}},
+ )
- obj = self.getjson()['body']
+ obj = client.getjson()['body']
- assert obj['UID'] == 0, 'uid match user=root'
- assert obj['GID'] == 0, 'gid match user=root'
+ assert obj['UID'] == 0, 'uid match user=root'
+ assert obj['GID'] == 0, 'gid match user=root'
- self.load(
- 'ns_inspect',
- user='root',
- group=nogroup,
- isolation={'namespaces': {'credential': True}},
- )
+ client.load(
+ 'ns_inspect',
+ user='root',
+ group=nogroup,
+ isolation={'namespaces': {'credential': True}},
+ )
- obj = self.getjson()['body']
+ obj = client.getjson()['body']
- assert obj['UID'] == 0, 'uid match user=root group=nogroup'
- assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
+ assert obj['UID'] == 0, 'uid match user=root group=nogroup'
+ assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
- self.load(
- 'ns_inspect',
- user='root',
- group='root',
- isolation={
- 'namespaces': {'credential': True},
- 'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
- 'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
- },
- )
+ client.load(
+ 'ns_inspect',
+ user='root',
+ group='root',
+ isolation={
+ 'namespaces': {'credential': True},
+ 'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
+ 'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
+ },
+ )
- obj = self.getjson()['body']
+ obj = client.getjson()['body']
- assert obj['UID'] == 0, 'uid match uidmap'
- assert obj['GID'] == 0, 'gid match gidmap'
+ assert obj['UID'] == 0, 'uid match uidmap'
+ assert obj['GID'] == 0, 'gid match gidmap'
- def test_isolation_priv_user(self, is_su):
- if not is_su:
- pytest.skip('unprivileged tests, skip this')
- self.load('ns_inspect')
+def test_isolation_priv_user(require):
+ require({'privileged_user': True})
- nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
+ client.load('ns_inspect')
- obj = self.getjson()['body']
+ nobody_uid, nogroup_gid, nogroup = unpriv_creds()
- assert obj['UID'] == nobody_uid, 'uid match'
- assert obj['GID'] == nogroup_gid, 'gid match'
+ obj = client.getjson()['body']
- self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
+ assert obj['UID'] == nobody_uid, 'uid match'
+ assert obj['GID'] == nogroup_gid, 'gid match'
- obj = self.getjson()['body']
+ client.load('ns_inspect', isolation={'namespaces': {'credential': True}})
- # privileged unit map app creds in the container by default
- assert obj['UID'] == nobody_uid, 'uid nobody'
- assert obj['GID'] == nogroup_gid, 'gid nobody'
+ obj = client.getjson()['body']
- self.load(
- 'ns_inspect',
- user='root',
- isolation={'namespaces': {'credential': True}},
- )
+ # privileged unit map app creds in the container by default
+ assert obj['UID'] == nobody_uid, 'uid nobody'
+ assert obj['GID'] == nogroup_gid, 'gid nobody'
- obj = self.getjson()['body']
+ client.load(
+ 'ns_inspect',
+ user='root',
+ isolation={'namespaces': {'credential': True}},
+ )
- assert obj['UID'] == 0, 'uid nobody user=root'
- assert obj['GID'] == 0, 'gid nobody user=root'
+ obj = client.getjson()['body']
- self.load(
- 'ns_inspect',
- user='root',
- group=nogroup,
- isolation={'namespaces': {'credential': True}},
- )
+ assert obj['UID'] == 0, 'uid nobody user=root'
+ assert obj['GID'] == 0, 'gid nobody user=root'
- obj = self.getjson()['body']
+ client.load(
+ 'ns_inspect',
+ user='root',
+ group=nogroup,
+ isolation={'namespaces': {'credential': True}},
+ )
- assert obj['UID'] == 0, 'uid match user=root group=nogroup'
- assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
+ obj = client.getjson()['body']
- self.load(
- 'ns_inspect',
- user='root',
- group='root',
- isolation={
- 'namespaces': {'credential': True},
- 'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
- 'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
- },
- )
+ assert obj['UID'] == 0, 'uid match user=root group=nogroup'
+ assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
- obj = self.getjson()['body']
+ client.load(
+ 'ns_inspect',
+ user='root',
+ group='root',
+ isolation={
+ 'namespaces': {'credential': True},
+ 'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
+ 'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
+ },
+ )
- assert obj['UID'] == 0, 'uid match uidmap user=root'
- assert obj['GID'] == 0, 'gid match gidmap user=root'
+ obj = client.getjson()['body']
- # map 65535 uids
- self.load(
- 'ns_inspect',
- user='nobody',
- isolation={
- 'namespaces': {'credential': True},
- 'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}],
- },
- )
+ assert obj['UID'] == 0, 'uid match uidmap user=root'
+ assert obj['GID'] == 0, 'gid match gidmap user=root'
- obj = self.getjson()['body']
+ # map 65535 uids
+ client.load(
+ 'ns_inspect',
+ user='nobody',
+ isolation={
+ 'namespaces': {'credential': True},
+ 'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}],
+ },
+ )
- assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
- assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'
+ obj = client.getjson()['body']
- def test_isolation_mnt(self):
- if not self.isolation_key('mnt'):
- pytest.skip('mnt namespace is not supported')
+ assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
+ assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'
- if not self.isolation_key('unprivileged_userns_clone'):
- pytest.skip('unprivileged clone is not available')
- self.load(
- 'ns_inspect',
- isolation={'namespaces': {'mount': True, 'credential': True}},
+def test_isolation_mnt(require):
+ require(
+ {
+ 'features': {'isolation': ['unprivileged_userns_clone', 'mnt']},
+ }
+ )
+
+ client.load(
+ 'ns_inspect',
+ isolation={'namespaces': {'mount': True, 'credential': True}},
+ )
+
+ obj = client.getjson()['body']
+
+ # all but user and mnt
+ allns = list(option.available['features']['isolation'].keys())
+ allns.remove('user')
+ allns.remove('mnt')
+
+ for ns in allns:
+ if ns.upper() in obj['NS']:
+ assert (
+ obj['NS'][ns.upper()]
+ == option.available['features']['isolation'][ns]
+ ), f'{ns} match'
+
+ assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
+ assert obj['NS']['USER'] != getns('user'), 'user set'
+
+
+def test_isolation_pid(is_su, require):
+ require({'features': {'isolation': ['pid']}})
+
+ if not is_su:
+ require(
+ {
+ 'features': {
+ 'isolation': [
+ 'unprivileged_userns_clone',
+ 'user',
+ 'mnt',
+ ]
+ }
+ }
)
- obj = self.getjson()['body']
-
- # all but user and mnt
- allns = list(option.available['features']['isolation'].keys())
- allns.remove('user')
- allns.remove('mnt')
-
- for ns in allns:
- if ns.upper() in obj['NS']:
- assert (
- obj['NS'][ns.upper()]
- == option.available['features']['isolation'][ns]
- ), f'{ns} match'
-
- assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
- assert obj['NS']['USER'] != getns('user'), 'user set'
-
- def test_isolation_pid(self, is_su):
- if not self.isolation_key('pid'):
- pytest.skip('pid namespace is not supported')
+ isolation = {'namespaces': {'pid': True}}
- if not is_su:
- if not self.isolation_key('unprivileged_userns_clone'):
- pytest.skip('unprivileged clone is not available')
+ if not is_su:
+ isolation['namespaces']['mount'] = True
+ isolation['namespaces']['credential'] = True
- if not self.isolation_key('user'):
- pytest.skip('user namespace is not supported')
+ client.load('ns_inspect', isolation=isolation)
- if not self.isolation_key('mnt'):
- pytest.skip('mnt namespace is not supported')
+ obj = client.getjson()['body']
- isolation = {'namespaces': {'pid': True}}
+ assert obj['PID'] == 2, 'pid of container is 2'
- if not is_su:
- isolation['namespaces']['mount'] = True
- isolation['namespaces']['credential'] = True
- self.load('ns_inspect', isolation=isolation)
+def test_isolation_namespace_false():
+ client.load('ns_inspect')
+ allns = list(option.available['features']['isolation'].keys())
- obj = self.getjson()['body']
+ remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
+ allns = [ns for ns in allns if ns not in remove_list]
- assert obj['PID'] == 2, 'pid of container is 2'
+ namespaces = {}
+ for ns in allns:
+ if ns == 'user':
+ namespaces['credential'] = False
+ elif ns == 'mnt':
+ namespaces['mount'] = False
+ elif ns == 'net':
+ namespaces['network'] = False
+ elif ns == 'uts':
+ namespaces['uname'] = False
+ else:
+ namespaces[ns] = False
- def test_isolation_namespace_false(self):
- self.load('ns_inspect')
- allns = list(option.available['features']['isolation'].keys())
+ client.load('ns_inspect', isolation={'namespaces': namespaces})
- remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
- allns = [ns for ns in allns if ns not in remove_list]
+ obj = client.getjson()['body']
- namespaces = {}
- for ns in allns:
- if ns == 'user':
- namespaces['credential'] = False
- elif ns == 'mnt':
- namespaces['mount'] = False
- elif ns == 'net':
- namespaces['network'] = False
- elif ns == 'uts':
- namespaces['uname'] = False
- else:
- namespaces[ns] = False
+ for ns in allns:
+ if ns.upper() in obj['NS']:
+ assert (
+ obj['NS'][ns.upper()]
+ == option.available['features']['isolation'][ns]
+ ), f'{ns} match'
- self.load('ns_inspect', isolation={'namespaces': namespaces})
- obj = self.getjson()['body']
-
- for ns in allns:
- if ns.upper() in obj['NS']:
- assert (
- obj['NS'][ns.upper()]
- == option.available['features']['isolation'][ns]
- ), f'{ns} match'
-
- def test_go_isolation_rootfs_container(self, is_su, temp_dir):
- if not is_su:
- if not self.isolation_key('unprivileged_userns_clone'):
- pytest.skip('unprivileged clone is not available')
-
- if not self.isolation_key('user'):
- pytest.skip('user namespace is not supported')
-
- if not self.isolation_key('mnt'):
- pytest.skip('mnt namespace is not supported')
-
- if not self.isolation_key('pid'):
- pytest.skip('pid namespace is not supported')
-
- isolation = {'rootfs': temp_dir}
-
- if not is_su:
- isolation['namespaces'] = {
- 'mount': True,
- 'credential': True,
- 'pid': True,
+def test_go_isolation_rootfs_container(is_su, require, temp_dir):
+ if not is_su:
+ require(
+ {
+ 'features': {
+ 'isolation': [
+ 'unprivileged_userns_clone',
+ 'user',
+ 'mnt',
+ 'pid',
+ ]
+ }
}
+ )
- self.load('ns_inspect', isolation=isolation)
+ isolation = {'rootfs': temp_dir}
- obj = self.getjson(url='/?file=/go/app')['body']
+ if not is_su:
+ isolation['namespaces'] = {
+ 'mount': True,
+ 'credential': True,
+ 'pid': True,
+ }
- assert obj['FileExists'] == True, 'app relative to rootfs'
+ client.load('ns_inspect', isolation=isolation)
- obj = self.getjson(url='/?file=/bin/sh')['body']
- assert obj['FileExists'] == False, 'file should not exists'
+ obj = client.getjson(url='/?file=/go/app')['body']
- def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir):
- if not is_su:
- pytest.skip('requires root')
+ assert obj['FileExists'], 'app relative to rootfs'
- if not self.isolation_key('mnt'):
- pytest.skip('mnt namespace is not supported')
+ obj = client.getjson(url='/?file=/bin/sh')['body']
+ assert not obj['FileExists'], 'file should not exists'
- isolation = {
- 'namespaces': {'mount': True},
- 'rootfs': temp_dir,
- }
- self.load('ns_inspect', isolation=isolation)
+def test_go_isolation_rootfs_container_priv(require, temp_dir):
+ require({'privileged_user': True, 'features': {'isolation': ['mnt']}})
- obj = self.getjson(url='/?file=/go/app')['body']
+ isolation = {
+ 'namespaces': {'mount': True},
+ 'rootfs': temp_dir,
+ }
- assert obj['FileExists'] == True, 'app relative to rootfs'
+ client.load('ns_inspect', isolation=isolation)
- obj = self.getjson(url='/?file=/bin/sh')['body']
- assert obj['FileExists'] == False, 'file should not exists'
+ obj = client.getjson(url='/?file=/go/app')['body']
- def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir):
- try:
- open("/proc/self/mountinfo")
- except:
- pytest.skip('The system lacks /proc/self/mountinfo file')
+ assert obj['FileExists'], 'app relative to rootfs'
- if not is_su:
- if not self.isolation_key('unprivileged_userns_clone'):
- pytest.skip('unprivileged clone is not available')
+ obj = client.getjson(url='/?file=/bin/sh')['body']
+ assert not obj['FileExists'], 'file should not exists'
- if not self.isolation_key('user'):
- pytest.skip('user namespace is not supported')
- if not self.isolation_key('mnt'):
- pytest.skip('mnt namespace is not supported')
+def test_go_isolation_rootfs_automount_tmpfs(is_su, require, temp_dir):
+ try:
+ open("/proc/self/mountinfo")
+ except:
+ pytest.skip('The system lacks /proc/self/mountinfo file')
- if not self.isolation_key('pid'):
- pytest.skip('pid namespace is not supported')
+ if not is_su:
+ require(
+ {
+ 'features': {
+ 'isolation': [
+ 'unprivileged_userns_clone',
+ 'user',
+ 'mnt',
+ 'pid',
+ ]
+ }
+ }
+ )
- isolation = {'rootfs': temp_dir}
+ isolation = {'rootfs': temp_dir}
- if not is_su:
- isolation['namespaces'] = {
- 'mount': True,
- 'credential': True,
- 'pid': True,
- }
+ if not is_su:
+ isolation['namespaces'] = {
+ 'mount': True,
+ 'credential': True,
+ 'pid': True,
+ }
- isolation['automount'] = {'tmpfs': False}
+ isolation['automount'] = {'tmpfs': False}
- self.load('ns_inspect', isolation=isolation)
+ client.load('ns_inspect', isolation=isolation)
- obj = self.getjson(url='/?mounts=true')['body']
+ obj = client.getjson(url='/?mounts=true')['body']
- assert (
- "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
- ), 'app has no /tmp mounted'
+ assert (
+ "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
+ ), 'app has no /tmp mounted'
- isolation['automount'] = {'tmpfs': True}
+ isolation['automount'] = {'tmpfs': True}
- self.load('ns_inspect', isolation=isolation)
+ client.load('ns_inspect', isolation=isolation)
- obj = self.getjson(url='/?mounts=true')['body']
+ obj = client.getjson(url='/?mounts=true')['body']
- assert (
- "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
- ), 'app has /tmp mounted on /'
+ assert (
+ "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
+ ), 'app has /tmp mounted on /'