diff options
Diffstat (limited to 'test/test_go_isolation.py')
-rw-r--r-- | test/test_go_isolation.py | 566 |
1 files changed, 287 insertions, 279 deletions
diff --git a/test/test_go_isolation.py b/test/test_go_isolation.py index 8d3a9025..ba3390ea 100644 --- a/test/test_go_isolation.py +++ b/test/test_go_isolation.py @@ -3,357 +3,365 @@ import os import pwd import pytest -from unit.applications.lang.go import TestApplicationGo +from unit.applications.lang.go import ApplicationGo from unit.option import option from unit.utils import getns prerequisites = {'modules': {'go': 'any'}, 'features': {'isolation': True}} +client = ApplicationGo() -class TestGoIsolation(TestApplicationGo): - def unpriv_creds(self): - nobody_uid = pwd.getpwnam('nobody').pw_uid - try: - nogroup_gid = grp.getgrnam('nogroup').gr_gid - nogroup = 'nogroup' - except KeyError: - nogroup_gid = grp.getgrnam('nobody').gr_gid - nogroup = 'nobody' +def unpriv_creds(): + nobody_uid = pwd.getpwnam('nobody').pw_uid - return (nobody_uid, nogroup_gid, nogroup) + try: + nogroup_gid = grp.getgrnam('nogroup').gr_gid + nogroup = 'nogroup' + except KeyError: + nogroup_gid = grp.getgrnam('nobody').gr_gid + nogroup = 'nobody' - def test_isolation_values(self): - self.load('ns_inspect') + return (nobody_uid, nogroup_gid, nogroup) - obj = self.getjson()['body'] - for ns, ns_value in option.available['features']['isolation'].items(): - if ns.upper() in obj['NS']: - assert obj['NS'][ns.upper()] == ns_value, f'{ns} match' +def test_isolation_values(): + client.load('ns_inspect') - def test_isolation_unpriv_user(self, require): - require( - { - 'privileged_user': False, - 'features': {'isolation': ['unprivileged_userns_clone']}, - } - ) + obj = client.getjson()['body'] - self.load('ns_inspect') - obj = self.getjson()['body'] + for ns, ns_value in option.available['features']['isolation'].items(): + if ns.upper() in obj['NS']: + assert obj['NS'][ns.upper()] == ns_value, f'{ns} match' - assert obj['UID'] == os.geteuid(), 'uid match' - assert obj['GID'] == os.getegid(), 'gid match' - self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) +def test_isolation_unpriv_user(require): + require( + { + 'privileged_user': False, + 'features': {'isolation': ['unprivileged_userns_clone']}, + } + ) - obj = self.getjson()['body'] + client.load('ns_inspect') + obj = client.getjson()['body'] - nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() + assert obj['UID'] == os.geteuid(), 'uid match' + assert obj['GID'] == os.getegid(), 'gid match' - # unprivileged unit map itself to nobody in the container by default - assert obj['UID'] == nobody_uid, 'uid of nobody' - assert obj['GID'] == nogroup_gid, f'gid of {nogroup}' + client.load('ns_inspect', isolation={'namespaces': {'credential': True}}) - self.load( - 'ns_inspect', - user='root', - isolation={'namespaces': {'credential': True}}, - ) + obj = client.getjson()['body'] - obj = self.getjson()['body'] + nobody_uid, nogroup_gid, nogroup = unpriv_creds() - assert obj['UID'] == 0, 'uid match user=root' - assert obj['GID'] == 0, 'gid match user=root' + # unprivileged unit map itself to nobody in the container by default + assert obj['UID'] == nobody_uid, 'uid of nobody' + assert obj['GID'] == nogroup_gid, f'gid of {nogroup}' - self.load( - 'ns_inspect', - user='root', - group=nogroup, - isolation={'namespaces': {'credential': True}}, - ) + client.load( + 'ns_inspect', + user='root', + isolation={'namespaces': {'credential': True}}, + ) - obj = self.getjson()['body'] + obj = client.getjson()['body'] - assert obj['UID'] == 0, 'uid match user=root group=nogroup' - assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' + assert obj['UID'] == 0, 'uid match user=root' + assert obj['GID'] == 0, 'gid match user=root' - self.load( - 'ns_inspect', - user='root', - group='root', - isolation={ - 'namespaces': {'credential': True}, - 'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}], - 'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}], - }, - ) + client.load( + 'ns_inspect', + user='root', + group=nogroup, + isolation={'namespaces': {'credential': True}}, + ) - obj = self.getjson()['body'] + obj = client.getjson()['body'] - assert obj['UID'] == 0, 'uid match uidmap' - assert obj['GID'] == 0, 'gid match gidmap' + assert obj['UID'] == 0, 'uid match user=root group=nogroup' + assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' - def test_isolation_priv_user(self, require): - require({'privileged_user': True}) + client.load( + 'ns_inspect', + user='root', + group='root', + isolation={ + 'namespaces': {'credential': True}, + 'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}], + 'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}], + }, + ) - self.load('ns_inspect') + obj = client.getjson()['body'] - nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() + assert obj['UID'] == 0, 'uid match uidmap' + assert obj['GID'] == 0, 'gid match gidmap' - obj = self.getjson()['body'] - assert obj['UID'] == nobody_uid, 'uid match' - assert obj['GID'] == nogroup_gid, 'gid match' +def test_isolation_priv_user(require): + require({'privileged_user': True}) - self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) + client.load('ns_inspect') - obj = self.getjson()['body'] + nobody_uid, nogroup_gid, nogroup = unpriv_creds() - # privileged unit map app creds in the container by default - assert obj['UID'] == nobody_uid, 'uid nobody' - assert obj['GID'] == nogroup_gid, 'gid nobody' + obj = client.getjson()['body'] - self.load( - 'ns_inspect', - user='root', - isolation={'namespaces': {'credential': True}}, - ) + assert obj['UID'] == nobody_uid, 'uid match' + assert obj['GID'] == nogroup_gid, 'gid match' - obj = self.getjson()['body'] + client.load('ns_inspect', isolation={'namespaces': {'credential': True}}) - assert obj['UID'] == 0, 'uid nobody user=root' - assert obj['GID'] == 0, 'gid nobody user=root' + obj = client.getjson()['body'] - self.load( - 'ns_inspect', - user='root', - group=nogroup, - isolation={'namespaces': {'credential': True}}, - ) + # privileged unit map app creds in the container by default + assert obj['UID'] == nobody_uid, 'uid nobody' + assert obj['GID'] == nogroup_gid, 'gid nobody' - obj = self.getjson()['body'] + client.load( + 'ns_inspect', + user='root', + isolation={'namespaces': {'credential': True}}, + ) - assert obj['UID'] == 0, 'uid match user=root group=nogroup' - assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' + obj = client.getjson()['body'] - self.load( - 'ns_inspect', - user='root', - group='root', - isolation={ - 'namespaces': {'credential': True}, - 'uidmap': [{'container': 0, 'host': 0, 'size': 1}], - 'gidmap': [{'container': 0, 'host': 0, 'size': 1}], - }, - ) + assert obj['UID'] == 0, 'uid nobody user=root' + assert obj['GID'] == 0, 'gid nobody user=root' - obj = self.getjson()['body'] + client.load( + 'ns_inspect', + user='root', + group=nogroup, + isolation={'namespaces': {'credential': True}}, + ) - assert obj['UID'] == 0, 'uid match uidmap user=root' - assert obj['GID'] == 0, 'gid match gidmap user=root' + obj = client.getjson()['body'] - # map 65535 uids - self.load( - 'ns_inspect', - user='nobody', - isolation={ - 'namespaces': {'credential': True}, - 'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}], - }, - ) + assert obj['UID'] == 0, 'uid match user=root group=nogroup' + assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' + + client.load( + 'ns_inspect', + user='root', + group='root', + isolation={ + 'namespaces': {'credential': True}, + 'uidmap': [{'container': 0, 'host': 0, 'size': 1}], + 'gidmap': [{'container': 0, 'host': 0, 'size': 1}], + }, + ) + + obj = client.getjson()['body'] + + assert obj['UID'] == 0, 'uid match uidmap user=root' + assert obj['GID'] == 0, 'gid match gidmap user=root' + + # map 65535 uids + client.load( + 'ns_inspect', + user='nobody', + isolation={ + 'namespaces': {'credential': True}, + 'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}], + }, + ) - obj = self.getjson()['body'] + obj = client.getjson()['body'] - assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody' - assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody' + assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody' + assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody' - def test_isolation_mnt(self, require): + +def test_isolation_mnt(require): + require( + { + 'features': {'isolation': ['unprivileged_userns_clone', 'mnt']}, + } + ) + + client.load( + 'ns_inspect', + isolation={'namespaces': {'mount': True, 'credential': True}}, + ) + + obj = client.getjson()['body'] + + # all but user and mnt + allns = list(option.available['features']['isolation'].keys()) + allns.remove('user') + allns.remove('mnt') + + for ns in allns: + if ns.upper() in obj['NS']: + assert ( + obj['NS'][ns.upper()] + == option.available['features']['isolation'][ns] + ), f'{ns} match' + + assert obj['NS']['MNT'] != getns('mnt'), 'mnt set' + assert obj['NS']['USER'] != getns('user'), 'user set' + + +def test_isolation_pid(is_su, require): + require({'features': {'isolation': ['pid']}}) + + if not is_su: require( { - 'features': {'isolation': ['unprivileged_userns_clone', 'mnt']}, + 'features': { + 'isolation': [ + 'unprivileged_userns_clone', + 'user', + 'mnt', + ] + } } ) - self.load( - 'ns_inspect', - isolation={'namespaces': {'mount': True, 'credential': True}}, - ) + isolation = {'namespaces': {'pid': True}} - obj = self.getjson()['body'] - - # all but user and mnt - allns = list(option.available['features']['isolation'].keys()) - allns.remove('user') - allns.remove('mnt') - - for ns in allns: - if ns.upper() in obj['NS']: - assert ( - obj['NS'][ns.upper()] - == option.available['features']['isolation'][ns] - ), f'{ns} match' - - assert obj['NS']['MNT'] != getns('mnt'), 'mnt set' - assert obj['NS']['USER'] != getns('user'), 'user set' - - def test_isolation_pid(self, is_su, require): - require({'features': {'isolation': ['pid']}}) - - if not is_su: - require( - { - 'features': { - 'isolation': [ - 'unprivileged_userns_clone', - 'user', - 'mnt', - ] - } - } - ) - - isolation = {'namespaces': {'pid': True}} - - if not is_su: - isolation['namespaces']['mount'] = True - isolation['namespaces']['credential'] = True - - self.load('ns_inspect', isolation=isolation) - - obj = self.getjson()['body'] - - assert obj['PID'] == 2, 'pid of container is 2' - - def test_isolation_namespace_false(self): - self.load('ns_inspect') - allns = list(option.available['features']['isolation'].keys()) - - remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup'] - allns = [ns for ns in allns if ns not in remove_list] - - namespaces = {} - for ns in allns: - if ns == 'user': - namespaces['credential'] = False - elif ns == 'mnt': - namespaces['mount'] = False - elif ns == 'net': - namespaces['network'] = False - elif ns == 'uts': - namespaces['uname'] = False - else: - namespaces[ns] = False - - self.load('ns_inspect', isolation={'namespaces': namespaces}) - - obj = self.getjson()['body'] - - for ns in allns: - if ns.upper() in obj['NS']: - assert ( - obj['NS'][ns.upper()] - == option.available['features']['isolation'][ns] - ), f'{ns} match' - - def test_go_isolation_rootfs_container(self, is_su, require, temp_dir): - if not is_su: - require( - { - 'features': { - 'isolation': [ - 'unprivileged_userns_clone', - 'user', - 'mnt', - 'pid', - ] - } - } - ) + if not is_su: + isolation['namespaces']['mount'] = True + isolation['namespaces']['credential'] = True - isolation = {'rootfs': temp_dir} + client.load('ns_inspect', isolation=isolation) - if not is_su: - isolation['namespaces'] = { - 'mount': True, - 'credential': True, - 'pid': True, - } + obj = client.getjson()['body'] - self.load('ns_inspect', isolation=isolation) + assert obj['PID'] == 2, 'pid of container is 2' - obj = self.getjson(url='/?file=/go/app')['body'] - assert obj['FileExists'], 'app relative to rootfs' +def test_isolation_namespace_false(): + client.load('ns_inspect') + allns = list(option.available['features']['isolation'].keys()) - obj = self.getjson(url='/?file=/bin/sh')['body'] - assert not obj['FileExists'], 'file should not exists' + remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup'] + allns = [ns for ns in allns if ns not in remove_list] - def test_go_isolation_rootfs_container_priv(self, require, temp_dir): - require({'privileged_user': True, 'features': {'isolation': ['mnt']}}) + namespaces = {} + for ns in allns: + if ns == 'user': + namespaces['credential'] = False + elif ns == 'mnt': + namespaces['mount'] = False + elif ns == 'net': + namespaces['network'] = False + elif ns == 'uts': + namespaces['uname'] = False + else: + namespaces[ns] = False + + client.load('ns_inspect', isolation={'namespaces': namespaces}) + + obj = client.getjson()['body'] + + for ns in allns: + if ns.upper() in obj['NS']: + assert ( + obj['NS'][ns.upper()] + == option.available['features']['isolation'][ns] + ), f'{ns} match' - isolation = { - 'namespaces': {'mount': True}, - 'rootfs': temp_dir, - } - self.load('ns_inspect', isolation=isolation) - - obj = self.getjson(url='/?file=/go/app')['body'] - - assert obj['FileExists'], 'app relative to rootfs' - - obj = self.getjson(url='/?file=/bin/sh')['body'] - assert not obj['FileExists'], 'file should not exists' - - def test_go_isolation_rootfs_automount_tmpfs( - self, is_su, require, temp_dir - ): - try: - open("/proc/self/mountinfo") - except: - pytest.skip('The system lacks /proc/self/mountinfo file') - - if not is_su: - require( - { - 'features': { - 'isolation': [ - 'unprivileged_userns_clone', - 'user', - 'mnt', - 'pid', - ] - } +def test_go_isolation_rootfs_container(is_su, require, temp_dir): + if not is_su: + require( + { + 'features': { + 'isolation': [ + 'unprivileged_userns_clone', + 'user', + 'mnt', + 'pid', + ] } - ) + } + ) + + isolation = {'rootfs': temp_dir} + + if not is_su: + isolation['namespaces'] = { + 'mount': True, + 'credential': True, + 'pid': True, + } + + client.load('ns_inspect', isolation=isolation) + + obj = client.getjson(url='/?file=/go/app')['body'] + + assert obj['FileExists'], 'app relative to rootfs' + + obj = client.getjson(url='/?file=/bin/sh')['body'] + assert not obj['FileExists'], 'file should not exists' - isolation = {'rootfs': temp_dir} - if not is_su: - isolation['namespaces'] = { - 'mount': True, - 'credential': True, - 'pid': True, +def test_go_isolation_rootfs_container_priv(require, temp_dir): + require({'privileged_user': True, 'features': {'isolation': ['mnt']}}) + + isolation = { + 'namespaces': {'mount': True}, + 'rootfs': temp_dir, + } + + client.load('ns_inspect', isolation=isolation) + + obj = client.getjson(url='/?file=/go/app')['body'] + + assert obj['FileExists'], 'app relative to rootfs' + + obj = client.getjson(url='/?file=/bin/sh')['body'] + assert not obj['FileExists'], 'file should not exists' + + +def test_go_isolation_rootfs_automount_tmpfs(is_su, require, temp_dir): + try: + open("/proc/self/mountinfo") + except: + pytest.skip('The system lacks /proc/self/mountinfo file') + + if not is_su: + require( + { + 'features': { + 'isolation': [ + 'unprivileged_userns_clone', + 'user', + 'mnt', + 'pid', + ] + } } + ) + + isolation = {'rootfs': temp_dir} + + if not is_su: + isolation['namespaces'] = { + 'mount': True, + 'credential': True, + 'pid': True, + } - isolation['automount'] = {'tmpfs': False} + isolation['automount'] = {'tmpfs': False} - self.load('ns_inspect', isolation=isolation) + client.load('ns_inspect', isolation=isolation) - obj = self.getjson(url='/?mounts=true')['body'] + obj = client.getjson(url='/?mounts=true')['body'] - assert ( - "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts'] - ), 'app has no /tmp mounted' + assert ( + "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts'] + ), 'app has no /tmp mounted' - isolation['automount'] = {'tmpfs': True} + isolation['automount'] = {'tmpfs': True} - self.load('ns_inspect', isolation=isolation) + client.load('ns_inspect', isolation=isolation) - obj = self.getjson(url='/?mounts=true')['body'] + obj = client.getjson(url='/?mounts=true')['body'] - assert ( - "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts'] - ), 'app has /tmp mounted on /' + assert ( + "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts'] + ), 'app has /tmp mounted on /' |