diff options
Diffstat (limited to 'test/test_go_isolation.py')
-rw-r--r-- | test/test_go_isolation.py | 533 |
1 files changed, 268 insertions, 265 deletions
diff --git a/test/test_go_isolation.py b/test/test_go_isolation.py index f063f987..ba3390ea 100644 --- a/test/test_go_isolation.py +++ b/test/test_go_isolation.py @@ -3,362 +3,365 @@ import os import pwd import pytest -from unit.applications.lang.go import TestApplicationGo +from unit.applications.lang.go import ApplicationGo from unit.option import option from unit.utils import getns +prerequisites = {'modules': {'go': 'any'}, 'features': {'isolation': True}} -class TestGoIsolation(TestApplicationGo): - prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']} +client = ApplicationGo() - @pytest.fixture(autouse=True) - def setup_method_fixture(self, request, skip_alert): - skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor') - def unpriv_creds(self): - nobody_uid = pwd.getpwnam('nobody').pw_uid +def unpriv_creds(): + nobody_uid = pwd.getpwnam('nobody').pw_uid - try: - nogroup_gid = grp.getgrnam('nogroup').gr_gid - nogroup = 'nogroup' - except KeyError: - nogroup_gid = grp.getgrnam('nobody').gr_gid - nogroup = 'nobody' + try: + nogroup_gid = grp.getgrnam('nogroup').gr_gid + nogroup = 'nogroup' + except KeyError: + nogroup_gid = grp.getgrnam('nobody').gr_gid + nogroup = 'nobody' - return (nobody_uid, nogroup_gid, nogroup) + return (nobody_uid, nogroup_gid, nogroup) - def isolation_key(self, key): - return key in option.available['features']['isolation'].keys() - def test_isolation_values(self): - self.load('ns_inspect') +def test_isolation_values(): + client.load('ns_inspect') - obj = self.getjson()['body'] + obj = client.getjson()['body'] - for ns, ns_value in option.available['features']['isolation'].items(): - if ns.upper() in obj['NS']: - assert obj['NS'][ns.upper()] == ns_value, f'{ns} match' + for ns, ns_value in option.available['features']['isolation'].items(): + if ns.upper() in obj['NS']: + assert obj['NS'][ns.upper()] == ns_value, f'{ns} match' - def test_isolation_unpriv_user(self, is_su): - if not self.isolation_key('unprivileged_userns_clone'): - pytest.skip('unprivileged clone is not available') - if is_su: - pytest.skip('privileged tests, skip this') +def test_isolation_unpriv_user(require): + require( + { + 'privileged_user': False, + 'features': {'isolation': ['unprivileged_userns_clone']}, + } + ) - self.load('ns_inspect') - obj = self.getjson()['body'] + client.load('ns_inspect') + obj = client.getjson()['body'] - assert obj['UID'] == os.geteuid(), 'uid match' - assert obj['GID'] == os.getegid(), 'gid match' + assert obj['UID'] == os.geteuid(), 'uid match' + assert obj['GID'] == os.getegid(), 'gid match' - self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) + client.load('ns_inspect', isolation={'namespaces': {'credential': True}}) - obj = self.getjson()['body'] + obj = client.getjson()['body'] - nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() + nobody_uid, nogroup_gid, nogroup = unpriv_creds() - # unprivileged unit map itself to nobody in the container by default - assert obj['UID'] == nobody_uid, 'uid of nobody' - assert obj['GID'] == nogroup_gid, f'gid of {nogroup}' + # unprivileged unit map itself to nobody in the container by default + assert obj['UID'] == nobody_uid, 'uid of nobody' + assert obj['GID'] == nogroup_gid, f'gid of {nogroup}' - self.load( - 'ns_inspect', - user='root', - isolation={'namespaces': {'credential': True}}, - ) + client.load( + 'ns_inspect', + user='root', + isolation={'namespaces': {'credential': True}}, + ) - obj = self.getjson()['body'] + obj = client.getjson()['body'] - assert obj['UID'] == 0, 'uid match user=root' - assert obj['GID'] == 0, 'gid match user=root' + assert obj['UID'] == 0, 'uid match user=root' + assert obj['GID'] == 0, 'gid match user=root' - self.load( - 'ns_inspect', - user='root', - group=nogroup, - isolation={'namespaces': {'credential': True}}, - ) + client.load( + 'ns_inspect', + user='root', + group=nogroup, + isolation={'namespaces': {'credential': True}}, + ) - obj = self.getjson()['body'] + obj = client.getjson()['body'] - assert obj['UID'] == 0, 'uid match user=root group=nogroup' - assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' + assert obj['UID'] == 0, 'uid match user=root group=nogroup' + assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' - self.load( - 'ns_inspect', - user='root', - group='root', - isolation={ - 'namespaces': {'credential': True}, - 'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}], - 'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}], - }, - ) + client.load( + 'ns_inspect', + user='root', + group='root', + isolation={ + 'namespaces': {'credential': True}, + 'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}], + 'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}], + }, + ) - obj = self.getjson()['body'] + obj = client.getjson()['body'] - assert obj['UID'] == 0, 'uid match uidmap' - assert obj['GID'] == 0, 'gid match gidmap' + assert obj['UID'] == 0, 'uid match uidmap' + assert obj['GID'] == 0, 'gid match gidmap' - def test_isolation_priv_user(self, is_su): - if not is_su: - pytest.skip('unprivileged tests, skip this') - self.load('ns_inspect') +def test_isolation_priv_user(require): + require({'privileged_user': True}) - nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() + client.load('ns_inspect') - obj = self.getjson()['body'] + nobody_uid, nogroup_gid, nogroup = unpriv_creds() - assert obj['UID'] == nobody_uid, 'uid match' - assert obj['GID'] == nogroup_gid, 'gid match' + obj = client.getjson()['body'] - self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) + assert obj['UID'] == nobody_uid, 'uid match' + assert obj['GID'] == nogroup_gid, 'gid match' - obj = self.getjson()['body'] + client.load('ns_inspect', isolation={'namespaces': {'credential': True}}) - # privileged unit map app creds in the container by default - assert obj['UID'] == nobody_uid, 'uid nobody' - assert obj['GID'] == nogroup_gid, 'gid nobody' + obj = client.getjson()['body'] - self.load( - 'ns_inspect', - user='root', - isolation={'namespaces': {'credential': True}}, - ) + # privileged unit map app creds in the container by default + assert obj['UID'] == nobody_uid, 'uid nobody' + assert obj['GID'] == nogroup_gid, 'gid nobody' - obj = self.getjson()['body'] + client.load( + 'ns_inspect', + user='root', + isolation={'namespaces': {'credential': True}}, + ) - assert obj['UID'] == 0, 'uid nobody user=root' - assert obj['GID'] == 0, 'gid nobody user=root' + obj = client.getjson()['body'] - self.load( - 'ns_inspect', - user='root', - group=nogroup, - isolation={'namespaces': {'credential': True}}, - ) + assert obj['UID'] == 0, 'uid nobody user=root' + assert obj['GID'] == 0, 'gid nobody user=root' - obj = self.getjson()['body'] + client.load( + 'ns_inspect', + user='root', + group=nogroup, + isolation={'namespaces': {'credential': True}}, + ) - assert obj['UID'] == 0, 'uid match user=root group=nogroup' - assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' + obj = client.getjson()['body'] - self.load( - 'ns_inspect', - user='root', - group='root', - isolation={ - 'namespaces': {'credential': True}, - 'uidmap': [{'container': 0, 'host': 0, 'size': 1}], - 'gidmap': [{'container': 0, 'host': 0, 'size': 1}], - }, - ) + assert obj['UID'] == 0, 'uid match user=root group=nogroup' + assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' - obj = self.getjson()['body'] + client.load( + 'ns_inspect', + user='root', + group='root', + isolation={ + 'namespaces': {'credential': True}, + 'uidmap': [{'container': 0, 'host': 0, 'size': 1}], + 'gidmap': [{'container': 0, 'host': 0, 'size': 1}], + }, + ) - assert obj['UID'] == 0, 'uid match uidmap user=root' - assert obj['GID'] == 0, 'gid match gidmap user=root' + obj = client.getjson()['body'] - # map 65535 uids - self.load( - 'ns_inspect', - user='nobody', - isolation={ - 'namespaces': {'credential': True}, - 'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}], - }, - ) + assert obj['UID'] == 0, 'uid match uidmap user=root' + assert obj['GID'] == 0, 'gid match gidmap user=root' - obj = self.getjson()['body'] + # map 65535 uids + client.load( + 'ns_inspect', + user='nobody', + isolation={ + 'namespaces': {'credential': True}, + 'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}], + }, + ) - assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody' - assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody' + obj = client.getjson()['body'] - def test_isolation_mnt(self): - if not self.isolation_key('mnt'): - pytest.skip('mnt namespace is not supported') + assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody' + assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody' - if not self.isolation_key('unprivileged_userns_clone'): - pytest.skip('unprivileged clone is not available') - self.load( - 'ns_inspect', - isolation={'namespaces': {'mount': True, 'credential': True}}, +def test_isolation_mnt(require): + require( + { + 'features': {'isolation': ['unprivileged_userns_clone', 'mnt']}, + } + ) + + client.load( + 'ns_inspect', + isolation={'namespaces': {'mount': True, 'credential': True}}, + ) + + obj = client.getjson()['body'] + + # all but user and mnt + allns = list(option.available['features']['isolation'].keys()) + allns.remove('user') + allns.remove('mnt') + + for ns in allns: + if ns.upper() in obj['NS']: + assert ( + obj['NS'][ns.upper()] + == option.available['features']['isolation'][ns] + ), f'{ns} match' + + assert obj['NS']['MNT'] != getns('mnt'), 'mnt set' + assert obj['NS']['USER'] != getns('user'), 'user set' + + +def test_isolation_pid(is_su, require): + require({'features': {'isolation': ['pid']}}) + + if not is_su: + require( + { + 'features': { + 'isolation': [ + 'unprivileged_userns_clone', + 'user', + 'mnt', + ] + } + } ) - obj = self.getjson()['body'] - - # all but user and mnt - allns = list(option.available['features']['isolation'].keys()) - allns.remove('user') - allns.remove('mnt') - - for ns in allns: - if ns.upper() in obj['NS']: - assert ( - obj['NS'][ns.upper()] - == option.available['features']['isolation'][ns] - ), f'{ns} match' - - assert obj['NS']['MNT'] != getns('mnt'), 'mnt set' - assert obj['NS']['USER'] != getns('user'), 'user set' - - def test_isolation_pid(self, is_su): - if not self.isolation_key('pid'): - pytest.skip('pid namespace is not supported') + isolation = {'namespaces': {'pid': True}} - if not is_su: - if not self.isolation_key('unprivileged_userns_clone'): - pytest.skip('unprivileged clone is not available') + if not is_su: + isolation['namespaces']['mount'] = True + isolation['namespaces']['credential'] = True - if not self.isolation_key('user'): - pytest.skip('user namespace is not supported') + client.load('ns_inspect', isolation=isolation) - if not self.isolation_key('mnt'): - pytest.skip('mnt namespace is not supported') + obj = client.getjson()['body'] - isolation = {'namespaces': {'pid': True}} + assert obj['PID'] == 2, 'pid of container is 2' - if not is_su: - isolation['namespaces']['mount'] = True - isolation['namespaces']['credential'] = True - self.load('ns_inspect', isolation=isolation) +def test_isolation_namespace_false(): + client.load('ns_inspect') + allns = list(option.available['features']['isolation'].keys()) - obj = self.getjson()['body'] + remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup'] + allns = [ns for ns in allns if ns not in remove_list] - assert obj['PID'] == 2, 'pid of container is 2' + namespaces = {} + for ns in allns: + if ns == 'user': + namespaces['credential'] = False + elif ns == 'mnt': + namespaces['mount'] = False + elif ns == 'net': + namespaces['network'] = False + elif ns == 'uts': + namespaces['uname'] = False + else: + namespaces[ns] = False - def test_isolation_namespace_false(self): - self.load('ns_inspect') - allns = list(option.available['features']['isolation'].keys()) + client.load('ns_inspect', isolation={'namespaces': namespaces}) - remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup'] - allns = [ns for ns in allns if ns not in remove_list] + obj = client.getjson()['body'] - namespaces = {} - for ns in allns: - if ns == 'user': - namespaces['credential'] = False - elif ns == 'mnt': - namespaces['mount'] = False - elif ns == 'net': - namespaces['network'] = False - elif ns == 'uts': - namespaces['uname'] = False - else: - namespaces[ns] = False + for ns in allns: + if ns.upper() in obj['NS']: + assert ( + obj['NS'][ns.upper()] + == option.available['features']['isolation'][ns] + ), f'{ns} match' - self.load('ns_inspect', isolation={'namespaces': namespaces}) - obj = self.getjson()['body'] - - for ns in allns: - if ns.upper() in obj['NS']: - assert ( - obj['NS'][ns.upper()] - == option.available['features']['isolation'][ns] - ), f'{ns} match' - - def test_go_isolation_rootfs_container(self, is_su, temp_dir): - if not is_su: - if not self.isolation_key('unprivileged_userns_clone'): - pytest.skip('unprivileged clone is not available') - - if not self.isolation_key('user'): - pytest.skip('user namespace is not supported') - - if not self.isolation_key('mnt'): - pytest.skip('mnt namespace is not supported') - - if not self.isolation_key('pid'): - pytest.skip('pid namespace is not supported') - - isolation = {'rootfs': temp_dir} - - if not is_su: - isolation['namespaces'] = { - 'mount': True, - 'credential': True, - 'pid': True, +def test_go_isolation_rootfs_container(is_su, require, temp_dir): + if not is_su: + require( + { + 'features': { + 'isolation': [ + 'unprivileged_userns_clone', + 'user', + 'mnt', + 'pid', + ] + } } + ) - self.load('ns_inspect', isolation=isolation) + isolation = {'rootfs': temp_dir} - obj = self.getjson(url='/?file=/go/app')['body'] + if not is_su: + isolation['namespaces'] = { + 'mount': True, + 'credential': True, + 'pid': True, + } - assert obj['FileExists'] == True, 'app relative to rootfs' + client.load('ns_inspect', isolation=isolation) - obj = self.getjson(url='/?file=/bin/sh')['body'] - assert obj['FileExists'] == False, 'file should not exists' + obj = client.getjson(url='/?file=/go/app')['body'] - def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir): - if not is_su: - pytest.skip('requires root') + assert obj['FileExists'], 'app relative to rootfs' - if not self.isolation_key('mnt'): - pytest.skip('mnt namespace is not supported') + obj = client.getjson(url='/?file=/bin/sh')['body'] + assert not obj['FileExists'], 'file should not exists' - isolation = { - 'namespaces': {'mount': True}, - 'rootfs': temp_dir, - } - self.load('ns_inspect', isolation=isolation) +def test_go_isolation_rootfs_container_priv(require, temp_dir): + require({'privileged_user': True, 'features': {'isolation': ['mnt']}}) - obj = self.getjson(url='/?file=/go/app')['body'] + isolation = { + 'namespaces': {'mount': True}, + 'rootfs': temp_dir, + } - assert obj['FileExists'] == True, 'app relative to rootfs' + client.load('ns_inspect', isolation=isolation) - obj = self.getjson(url='/?file=/bin/sh')['body'] - assert obj['FileExists'] == False, 'file should not exists' + obj = client.getjson(url='/?file=/go/app')['body'] - def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir): - try: - open("/proc/self/mountinfo") - except: - pytest.skip('The system lacks /proc/self/mountinfo file') + assert obj['FileExists'], 'app relative to rootfs' - if not is_su: - if not self.isolation_key('unprivileged_userns_clone'): - pytest.skip('unprivileged clone is not available') + obj = client.getjson(url='/?file=/bin/sh')['body'] + assert not obj['FileExists'], 'file should not exists' - if not self.isolation_key('user'): - pytest.skip('user namespace is not supported') - if not self.isolation_key('mnt'): - pytest.skip('mnt namespace is not supported') +def test_go_isolation_rootfs_automount_tmpfs(is_su, require, temp_dir): + try: + open("/proc/self/mountinfo") + except: + pytest.skip('The system lacks /proc/self/mountinfo file') - if not self.isolation_key('pid'): - pytest.skip('pid namespace is not supported') + if not is_su: + require( + { + 'features': { + 'isolation': [ + 'unprivileged_userns_clone', + 'user', + 'mnt', + 'pid', + ] + } + } + ) - isolation = {'rootfs': temp_dir} + isolation = {'rootfs': temp_dir} - if not is_su: - isolation['namespaces'] = { - 'mount': True, - 'credential': True, - 'pid': True, - } + if not is_su: + isolation['namespaces'] = { + 'mount': True, + 'credential': True, + 'pid': True, + } - isolation['automount'] = {'tmpfs': False} + isolation['automount'] = {'tmpfs': False} - self.load('ns_inspect', isolation=isolation) + client.load('ns_inspect', isolation=isolation) - obj = self.getjson(url='/?mounts=true')['body'] + obj = client.getjson(url='/?mounts=true')['body'] - assert ( - "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts'] - ), 'app has no /tmp mounted' + assert ( + "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts'] + ), 'app has no /tmp mounted' - isolation['automount'] = {'tmpfs': True} + isolation['automount'] = {'tmpfs': True} - self.load('ns_inspect', isolation=isolation) + client.load('ns_inspect', isolation=isolation) - obj = self.getjson(url='/?mounts=true')['body'] + obj = client.getjson(url='/?mounts=true')['body'] - assert ( - "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts'] - ), 'app has /tmp mounted on /' + assert ( + "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts'] + ), 'app has /tmp mounted on /' |