summaryrefslogtreecommitdiffhomepage
path: root/test/unit/feature/isolation.py
blob: 6a429fb1a7c80da074a5bcafb383d57cae6cffe9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import os
import json
from unit.applications.proto import TestApplicationProto
from unit.applications.lang.go import TestApplicationGo
from unit.applications.lang.java import TestApplicationJava
from unit.applications.lang.node import TestApplicationNode
from unit.applications.lang.perl import TestApplicationPerl
from unit.applications.lang.php import TestApplicationPHP
from unit.applications.lang.python import TestApplicationPython
from unit.applications.lang.ruby import TestApplicationRuby


class TestFeatureIsolation(TestApplicationProto):
    """Probe for process-isolation support and record namespace ids.

    ``check`` is the entry point; it stores its findings under
    ``available['features']['isolation']``.
    """

    # Namespace types recorded by check() in addition to 'user', which is
    # read separately before any of these.
    allns = ['pid', 'mnt', 'ipc', 'uts', 'cgroup', 'net']

    def check(self, available, testdir):
        """Record isolation features of this host in ``available``.

        The first language module listed in ``available['modules']`` (in the
        preference order of the table below) is instantiated, asked to load
        an application, and given an isolation configuration.  Only when the
        response contains 'success' and a 'user' namespace id can be read are
        results written to ``available['features']['isolation']``.

        :param available: mapping that provides 'modules' and 'features'.
        :param testdir: value assigned to the ``testdir`` attribute of the
            chosen application object.
        :return: always None; results are delivered through ``available``.
        """
        # (module key, application class, name handed to load()), ordered by
        # preference: the first key present in available['modules'] wins.
        candidates = (
            ('go', TestApplicationGo, 'empty'),
            ('java', TestApplicationJava, 'empty'),
            ('node', TestApplicationNode, 'basic'),
            ('perl', TestApplicationPerl, 'body_empty'),
            ('php', TestApplicationPHP, 'phpinfo'),
            ('python', TestApplicationPython, 'empty'),
            ('ruby', TestApplicationRuby, 'empty'),
        )

        module = ''
        app = 'empty'
        for key, app_class, app_name in candidates:
            if key in available['modules']:
                module = app_class()
                app = app_name
                break

        # Nothing matched (or the created object is falsy): nothing to probe.
        if not module:
            return

        module.testdir = testdir
        module.load(app)

        isolation_path = 'applications/' + app + '/isolation'
        resp = module.conf({"namespaces": {"credential": True}}, isolation_path)
        if 'success' not in resp:
            return

        user_ns = self.getns('user')
        if not user_ns:
            return

        # Publish the dict immediately so that it is visible to the caller
        # even if one of the later probes raises; the alias below refers to
        # the very same object.
        isolation = {'user': user_ns}
        available['features']['isolation'] = isolation

        clone_knob = '/proc/sys/kernel/unprivileged_userns_clone'
        if os.path.exists(clone_knob):
            with open(clone_knob, 'r') as knob:
                setting = str(knob.read()).rstrip()
            if setting == '1':
                isolation['unprivileged_userns_clone'] = True

        for ns in self.allns:
            ns_id = self.getns(ns)
            if not ns_id:
                continue
            isolation[ns] = ns_id

    def getns(self, nstype):
        """Return the id of the current process's ``nstype`` namespace.

        The id is taken from the target of the symlink
        '/proc/self/ns/<nstype>', which has the form '<nstype>:[<ns id>]',
        e.g. 'pid:[4026531836]'.

        :param nstype: namespace type name, e.g. 'pid' or 'user'.
        :return: the namespace id as an int, or None when the symlink path
            does not exist.
        """
        nspath = '/proc/self/ns/' + nstype
        if not os.path.exists(nspath):
            return None

        target = os.readlink(nspath)
        # Drop the leading '<nstype>:[' and the trailing ']'.
        id_start = len(nstype) + 2
        return int(target[id_start:-1])

    def parsejson(self, data):
        """Decode the JSON document ``data`` and return the resulting object."""
        decoded = json.loads(data)
        return decoded