diff options
Diffstat (limited to '')
-rw-r--r-- | test/unit.py | 92 |
1 files changed, 88 insertions, 4 deletions
diff --git a/test/unit.py b/test/unit.py index 58c8327d..cec62489 100644 --- a/test/unit.py +++ b/test/unit.py @@ -1,5 +1,6 @@ import os import re +import ssl import sys import json import time @@ -67,6 +68,19 @@ class TestUnit(unittest.TestCase): except: m = None + elif module == 'openssl': + try: + subprocess.check_output(['which', 'openssl']) + + output = subprocess.check_output([ + self.pardir + '/build/unitd', '--version'], + stderr=subprocess.STDOUT) + + m = re.search('--openssl', output.decode()) + + except: + m = None + else: m = re.search('module: ' + module, log) @@ -192,6 +206,7 @@ class TestUnitHTTP(TestUnit): port = 7080 if 'port' not in kwargs else kwargs['port'] url = '/' if 'url' not in kwargs else kwargs['url'] http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1' + blocking = False if 'blocking' not in kwargs else kwargs['blocking'] headers = ({ 'Host': 'localhost', @@ -215,6 +230,9 @@ class TestUnitHTTP(TestUnit): if 'sock' not in kwargs: sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM) + if 'wrapper' in kwargs: + sock = kwargs['wrapper'](sock) + connect_args = addr if sock_type == 'unix' else (addr, port) try: sock.connect(connect_args) @@ -222,11 +240,11 @@ class TestUnitHTTP(TestUnit): sock.close() return None + sock.setblocking(blocking) + else: sock = kwargs['sock'] - sock.setblocking(False) - if 'raw' not in kwargs: req = ' '.join([start_str, url, http]) + crlf @@ -371,8 +389,8 @@ class TestUnitApplicationProto(TestUnitControl): def sec_epoch(self): return time.mktime(time.gmtime()) - def date_to_sec_epoch(self, date): - return time.mktime(time.strptime(date, '%a, %d %b %Y %H:%M:%S %Z')) + def date_to_sec_epoch(self, date, template='%a, %d %b %Y %H:%M:%S %Z'): + return time.mktime(time.strptime(date, template)) def search_in_log(self, pattern): with open(self.testdir + '/unit.log', 'r', errors='ignore') as f: @@ -484,3 +502,69 @@ class TestUnitApplicationPerl(TestUnitApplicationProto): } } }) + +class TestUnitApplicationTLS(TestUnitApplicationProto): + def __init__(self, test): + super().__init__(test) + + self.context = ssl.create_default_context() + self.context.check_hostname = False + self.context.verify_mode = ssl.CERT_NONE + + def certificate(self, name='default', load=True): + subprocess.call(['openssl', 'req', '-x509', '-new', '-config', + self.testdir + '/openssl.conf', '-subj', '/CN=' + name + '/', + '-out', self.testdir + '/' + name + '.crt', + '-keyout', self.testdir + '/' + name + '.key']) + + if load: + self.certificate_load(name) + + def certificate_load(self, crt, key=None): + if key is None: + key = crt + + with open(self.testdir + '/' + key + '.key', 'rb') as k, \ + open(self.testdir + '/' + crt + '.crt', 'rb') as c: + return self.conf(k.read() + c.read(), '/certificates/' + crt) + + def get_ssl(self, **kwargs): + return self.get(blocking=True, wrapper=self.context.wrap_socket, + **kwargs) + + def post_ssl(self, **kwargs): + return self.post(blocking=True, wrapper=self.context.wrap_socket, + **kwargs) + + def get_server_certificate(self, addr=('127.0.0.1', 7080)): + return ssl.get_server_certificate(addr) + + def load(self, script, name=None): + if name is None: + name = script + + # create default openssl configuration + + with open(self.testdir + '/openssl.conf', 'w') as f: + f.write("""[ req ] +default_bits = 1024 +encrypt_key = no +distinguished_name = req_distinguished_name +[ req_distinguished_name ]""") + + self.conf({ + "listeners": { + "*:7080": { + "application": name + } + }, + "applications": { + name: { + "type": "python", + "processes": { "spare": 0 }, + "path": self.current_dir + '/python/' + script, + "working_directory": self.current_dir + '/python/' + script, + "module": "wsgi" + } + } + }) |