"""Isolation tests for the Go application module."""

import grp
import os
import pwd

import pytest
from unit.applications.lang.go import TestApplicationGo
from unit.option import option
from unit.utils import getns

prerequisites = {'modules': {'go': 'any'}, 'features': {'isolation': True}}


class TestGoIsolation(TestApplicationGo):
    """Checks of the ``isolation`` application option using ``ns_inspect``.

    Every test loads the ``ns_inspect`` Go application and inspects the JSON
    body it returns (``NS``, ``UID``, ``GID``, ``PID``, ``FileExists`` and
    ``Mounts`` keys are read by the tests below).
    """

    def unpriv_creds(self):
        """Return ``(nobody_uid, nogroup_gid, nogroup_name)`` for this host.

        The group is ``nogroup`` when it exists in the group database and
        falls back to ``nobody`` otherwise.

        Raises:
            KeyError: if user ``nobody`` is missing, or if neither group
                ``nogroup`` nor group ``nobody`` exists.
        """
        nobody_uid = pwd.getpwnam('nobody').pw_uid

        try:
            nogroup_gid = grp.getgrnam('nogroup').gr_gid
            nogroup = 'nogroup'

        except KeyError:
            # No 'nogroup' group on this system; use 'nobody' instead.
            nogroup_gid = grp.getgrnam('nobody').gr_gid
            nogroup = 'nobody'

        return (nobody_uid, nogroup_gid, nogroup)

    def test_isolation_values(self):
        """Without isolation, reported namespaces equal the detected ones."""
        self.load('ns_inspect')

        obj = self.getjson()['body']

        for ns, ns_value in option.available['features']['isolation'].items():
            # Only namespaces the application actually reports are compared.
            if ns.upper() in obj['NS']:
                assert obj['NS'][ns.upper()] == ns_value, f'{ns} match'

    def test_isolation_unpriv_user(self, require):
        """Check UID/GID seen by the app when the tests run unprivileged."""
        require(
            {
                'privileged_user': False,
                'features': {'isolation': ['unprivileged_userns_clone']},
            }
        )

        # No isolation: the app reports the effective ids of this process.
        self.load('ns_inspect')
        obj = self.getjson()['body']

        assert obj['UID'] == os.geteuid(), 'uid match'
        assert obj['GID'] == os.getegid(), 'gid match'

        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})

        obj = self.getjson()['body']

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        # an unprivileged unit maps itself to nobody in the container
        # by default
        assert obj['UID'] == nobody_uid, 'uid of nobody'
        assert obj['GID'] == nogroup_gid, f'gid of {nogroup}'

        self.load(
            'ns_inspect',
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match user=root'
        assert obj['GID'] == 0, 'gid match user=root'

        self.load(
            'ns_inspect',
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'

        # Explicit maps: container id 0 is mapped to our own effective ids.
        self.load(
            'ns_inspect',
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
                'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
            },
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match uidmap'
        assert obj['GID'] == 0, 'gid match gidmap'

    def test_isolation_priv_user(self, require):
        """Check UID/GID seen by the app when the tests run privileged."""
        require({'privileged_user': True})

        self.load('ns_inspect')

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        obj = self.getjson()['body']

        assert obj['UID'] == nobody_uid, 'uid match'
        assert obj['GID'] == nogroup_gid, 'gid match'

        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})

        obj = self.getjson()['body']

        # a privileged unit maps the app creds in the container by default
        assert obj['UID'] == nobody_uid, 'uid nobody'
        assert obj['GID'] == nogroup_gid, 'gid nobody'

        self.load(
            'ns_inspect',
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid nobody user=root'
        assert obj['GID'] == 0, 'gid nobody user=root'

        self.load(
            'ns_inspect',
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'

        self.load(
            'ns_inspect',
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
            },
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match uidmap user=root'
        assert obj['GID'] == 0, 'gid match gidmap user=root'

        # map uids 0..nobody_uid one-to-one, so that nobody's uid is covered
        # (that is 65535 ids when nobody_uid is 65534)
        self.load(
            'ns_inspect',
            user='nobody',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}],
            },
        )

        obj = self.getjson()['body']

        assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
        assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'

    def test_isolation_mnt(self, require):
        """Mount + credential isolation changes only MNT and USER."""
        require(
            {
                'features': {'isolation': ['unprivileged_userns_clone', 'mnt']},
            }
        )

        self.load(
            'ns_inspect',
            isolation={'namespaces': {'mount': True, 'credential': True}},
        )

        obj = self.getjson()['body']

        # all but user and mnt; filtering (rather than list.remove) does not
        # raise when one of the two keys is absent from the detected features
        available = option.available['features']['isolation']
        allns = [ns for ns in available if ns not in ('user', 'mnt')]

        for ns in allns:
            if ns.upper() in obj['NS']:
                assert obj['NS'][ns.upper()] == available[ns], f'{ns} match'

        assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
        assert obj['NS']['USER'] != getns('user'), 'user set'

    def test_isolation_pid(self, is_su, require):
        """With pid isolation the application reports PID 2."""
        require({'features': {'isolation': ['pid']}})

        if not is_su:
            require(
                {
                    'features': {
                        'isolation': [
                            'unprivileged_userns_clone',
                            'user',
                            'mnt',
                        ]
                    }
                }
            )

        isolation = {'namespaces': {'pid': True}}

        if not is_su:
            # When is_su is false, mount and credential isolation are
            # requested together with pid.
            isolation['namespaces']['mount'] = True
            isolation['namespaces']['credential'] = True

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson()['body']

        assert obj['PID'] == 2, 'pid of container is 2'

    def test_isolation_namespace_false(self):
        """Namespaces explicitly set to ``False`` stay equal to detected ones."""
        self.load('ns_inspect')

        available = option.available['features']['isolation']
        remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
        allns = [ns for ns in available if ns not in remove_list]

        # Detected feature name -> key used in the 'namespaces' option;
        # names not listed here are used unchanged.
        option_names = {
            'user': 'credential',
            'mnt': 'mount',
            'net': 'network',
            'uts': 'uname',
        }
        namespaces = {option_names.get(ns, ns): False for ns in allns}

        self.load('ns_inspect', isolation={'namespaces': namespaces})

        obj = self.getjson()['body']

        for ns in allns:
            if ns.upper() in obj['NS']:
                assert obj['NS'][ns.upper()] == available[ns], f'{ns} match'

    def test_go_isolation_rootfs_container(self, is_su, require, temp_dir):
        """With ``rootfs`` set, the app sees /go/app but not /bin/sh."""
        if not is_su:
            require(
                {
                    'features': {
                        'isolation': [
                            'unprivileged_userns_clone',
                            'user',
                            'mnt',
                            'pid',
                        ]
                    }
                }
            )

        isolation = {'rootfs': temp_dir}

        if not is_su:
            isolation['namespaces'] = {
                'mount': True,
                'credential': True,
                'pid': True,
            }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?file=/go/app')['body']

        assert obj['FileExists'], 'app relative to rootfs'

        obj = self.getjson(url='/?file=/bin/sh')['body']
        assert not obj['FileExists'], 'file should not exists'

    def test_go_isolation_rootfs_container_priv(self, require, temp_dir):
        """Same ``rootfs`` check, privileged, with only mount isolation."""
        require({'privileged_user': True, 'features': {'isolation': ['mnt']}})

        isolation = {
            'namespaces': {'mount': True},
            'rootfs': temp_dir,
        }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?file=/go/app')['body']

        assert obj['FileExists'], 'app relative to rootfs'

        obj = self.getjson(url='/?file=/bin/sh')['body']
        assert not obj['FileExists'], 'file should not exists'

    def test_go_isolation_rootfs_automount_tmpfs(
        self, is_su, require, temp_dir
    ):
        """``automount.tmpfs`` controls whether /tmp shows up in the mounts."""
        try:
            # Probe only: the context manager closes the handle right away,
            # and only I/O errors are treated as "file is not available".
            with open("/proc/self/mountinfo"):
                pass
        except OSError:
            pytest.skip('The system lacks /proc/self/mountinfo file')

        if not is_su:
            require(
                {
                    'features': {
                        'isolation': [
                            'unprivileged_userns_clone',
                            'user',
                            'mnt',
                            'pid',
                        ]
                    }
                }
            )

        isolation = {'rootfs': temp_dir}

        if not is_su:
            isolation['namespaces'] = {
                'mount': True,
                'credential': True,
                'pid': True,
            }

        isolation['automount'] = {'tmpfs': False}

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?mounts=true')['body']

        assert (
            "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
        ), 'app has no /tmp mounted'

        isolation['automount'] = {'tmpfs': True}

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?mounts=true')['body']

        assert (
            "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
        ), 'app has /tmp mounted on /'