summaryrefslogtreecommitdiffhomepage
path: root/test/unit.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--test/unit.py92
1 files changed, 88 insertions, 4 deletions
diff --git a/test/unit.py b/test/unit.py
index 58c8327d..cec62489 100644
--- a/test/unit.py
+++ b/test/unit.py
@@ -1,5 +1,6 @@
import os
import re
+import ssl
import sys
import json
import time
@@ -67,6 +68,19 @@ class TestUnit(unittest.TestCase):
except:
m = None
+ elif module == 'openssl':
+ try:
+ subprocess.check_output(['which', 'openssl'])
+
+ output = subprocess.check_output([
+ self.pardir + '/build/unitd', '--version'],
+ stderr=subprocess.STDOUT)
+
+ m = re.search('--openssl', output.decode())
+
+ except:
+ m = None
+
else:
m = re.search('module: ' + module, log)
@@ -192,6 +206,7 @@ class TestUnitHTTP(TestUnit):
port = 7080 if 'port' not in kwargs else kwargs['port']
url = '/' if 'url' not in kwargs else kwargs['url']
http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
+ blocking = False if 'blocking' not in kwargs else kwargs['blocking']
headers = ({
'Host': 'localhost',
@@ -215,6 +230,9 @@ class TestUnitHTTP(TestUnit):
if 'sock' not in kwargs:
sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
+ if 'wrapper' in kwargs:
+ sock = kwargs['wrapper'](sock)
+
connect_args = addr if sock_type == 'unix' else (addr, port)
try:
sock.connect(connect_args)
@@ -222,11 +240,11 @@ class TestUnitHTTP(TestUnit):
sock.close()
return None
+ sock.setblocking(blocking)
+
else:
sock = kwargs['sock']
- sock.setblocking(False)
-
if 'raw' not in kwargs:
req = ' '.join([start_str, url, http]) + crlf
@@ -371,8 +389,8 @@ class TestUnitApplicationProto(TestUnitControl):
def sec_epoch(self):
return time.mktime(time.gmtime())
- def date_to_sec_epoch(self, date):
- return time.mktime(time.strptime(date, '%a, %d %b %Y %H:%M:%S %Z'))
+ def date_to_sec_epoch(self, date, template='%a, %d %b %Y %H:%M:%S %Z'):
+ return time.mktime(time.strptime(date, template))
def search_in_log(self, pattern):
with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
@@ -484,3 +502,69 @@ class TestUnitApplicationPerl(TestUnitApplicationProto):
}
}
})
+
+class TestUnitApplicationTLS(TestUnitApplicationProto):
+ def __init__(self, test):
+ super().__init__(test)
+
+ self.context = ssl.create_default_context()
+ self.context.check_hostname = False
+ self.context.verify_mode = ssl.CERT_NONE
+
+ def certificate(self, name='default', load=True):
+ subprocess.call(['openssl', 'req', '-x509', '-new', '-config',
+ self.testdir + '/openssl.conf', '-subj', '/CN=' + name + '/',
+ '-out', self.testdir + '/' + name + '.crt',
+ '-keyout', self.testdir + '/' + name + '.key'])
+
+ if load:
+ self.certificate_load(name)
+
+ def certificate_load(self, crt, key=None):
+ if key is None:
+ key = crt
+
+ with open(self.testdir + '/' + key + '.key', 'rb') as k, \
+ open(self.testdir + '/' + crt + '.crt', 'rb') as c:
+ return self.conf(k.read() + c.read(), '/certificates/' + crt)
+
+ def get_ssl(self, **kwargs):
+ return self.get(blocking=True, wrapper=self.context.wrap_socket,
+ **kwargs)
+
+ def post_ssl(self, **kwargs):
+ return self.post(blocking=True, wrapper=self.context.wrap_socket,
+ **kwargs)
+
+ def get_server_certificate(self, addr=('127.0.0.1', 7080)):
+ return ssl.get_server_certificate(addr)
+
+ def load(self, script, name=None):
+ if name is None:
+ name = script
+
+ # create default openssl configuration
+
+ with open(self.testdir + '/openssl.conf', 'w') as f:
+ f.write("""[ req ]
+default_bits = 1024
+encrypt_key = no
+distinguished_name = req_distinguished_name
+[ req_distinguished_name ]""")
+
+ self.conf({
+ "listeners": {
+ "*:7080": {
+ "application": name
+ }
+ },
+ "applications": {
+ name: {
+ "type": "python",
+ "processes": { "spare": 0 },
+ "path": self.current_dir + '/python/' + script,
+ "working_directory": self.current_dir + '/python/' + script,
+ "module": "wsgi"
+ }
+ }
+ })