summaryrefslogtreecommitdiffhomepage
path: root/test/unit/feature/isolation.py
blob: c6f6f3c0e99774a47d9b437d3607390e66e54d50 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os

from unit.applications.lang.go import TestApplicationGo
from unit.applications.lang.java import TestApplicationJava
from unit.applications.lang.node import TestApplicationNode
from unit.applications.lang.perl import TestApplicationPerl
from unit.applications.lang.php import TestApplicationPHP
from unit.applications.lang.python import TestApplicationPython
from unit.applications.lang.ruby import TestApplicationRuby
from unit.applications.proto import TestApplicationProto


class TestFeatureIsolation(TestApplicationProto):
    """Probe for process-isolation (namespace) support.

    ``check()`` records what it finds under
    ``available['features']['isolation']``.
    """

    # Namespace types probed in addition to 'user'.
    allns = ['pid', 'mnt', 'ipc', 'uts', 'cgroup', 'net']

    def check(self, available, temp_dir):
        """Detect isolation support and record it in *available*.

        Picks the first language module listed in ``available['modules']``
        (in the fixed priority order below), loads a sample application for
        it and tries to apply a credential-namespace isolation config.  If
        the config is accepted and the current process's user namespace id
        can be read, ``available['features']['isolation']`` is set to a dict
        holding:

        * ``'user'`` and every readable type from ``allns`` -> namespace id;
        * ``'unprivileged_userns_clone'`` -> ``True``, only when the
          corresponding sysctl file is readable and contains ``1``.

        Returns ``None`` in every case; when detection does not succeed,
        *available* is left unchanged.

        :param available: dict with at least ``'modules'`` and ``'features'``.
        :param temp_dir: directory assigned to the application helper's
            ``temp_dir`` attribute before loading the app.
        """
        test_conf = {"namespaces": {"credential": True}}

        # (module name, helper class, sample application), by priority.
        candidates = (
            ('go', TestApplicationGo, 'empty'),
            ('java', TestApplicationJava, 'empty'),
            ('node', TestApplicationNode, 'basic'),
            ('perl', TestApplicationPerl, 'body_empty'),
            ('php', TestApplicationPHP, 'phpinfo'),
            ('python', TestApplicationPython, 'empty'),
            ('ruby', TestApplicationRuby, 'empty'),
        )

        for name, app_class, app in candidates:
            if name in available['modules']:
                break
        else:
            # No language module is available to run the probe with.
            return

        # Instantiate only the selected helper, as before.
        module = app_class()
        module.temp_dir = temp_dir
        module.load(app)

        resp = module.conf(test_conf, 'applications/' + app + '/isolation')
        if 'success' not in resp:
            return

        userns = self.getns('user')
        if not userns:
            return

        isolation = {'user': userns}
        available['features']['isolation'] = isolation

        unp_clone_path = '/proc/sys/kernel/unprivileged_userns_clone'
        try:
            with open(unp_clone_path, 'r') as f:
                unprivileged = f.read().rstrip() == '1'
        except OSError:
            # A missing or unreadable file is treated as "flag not set"
            # rather than aborting feature detection.
            unprivileged = False

        if unprivileged:
            isolation['unprivileged_userns_clone'] = True

        for ns in self.allns:
            ns_value = self.getns(ns)
            if ns_value:
                isolation[ns] = ns_value

    def getns(self, nstype):
        """Return the current process's namespace id for *nstype*.

        The id is read from the symlink ``/proc/self/ns/<nstype>``, whose
        target has the form ``'<nstype>:[<ns id>]'``,
        e.g. ``'pid:[4026531836]'``.

        :param nstype: namespace type name, e.g. ``'pid'`` or ``'user'``.
        :returns: the namespace id as an ``int``, or ``None`` when the link
            does not exist or cannot be read.
        """
        nspath = '/proc/self/ns/' + nstype

        # Read the link directly instead of testing for existence first:
        # this avoids a check-then-use race and maps any read failure to
        # "namespace not available".
        try:
            target = os.readlink(nspath)
        except OSError:
            return None

        # Strip the leading '<nstype>:[' and the trailing ']'.
        return int(target[len(nstype) + 2 : -1])