summaryrefslogtreecommitdiffhomepage
path: root/test/test_tls.py
diff options
context:
space:
mode:
authorAndrei Belov <defan@nginx.com>2021-05-27 17:03:24 +0300
committerAndrei Belov <defan@nginx.com>2021-05-27 17:03:24 +0300
commit0afb4b5790c5a37ba6b880eb351a65fe00521fbe (patch)
treec7e0b6bed92ee62a5e8b13c945c4134e68554cec /test/test_tls.py
parent21ff5e086ece7188df3b7338d228fa4fb7f886af (diff)
parentd06e55dfa3692e27a92ff6c2534bb083416bc0c8 (diff)
downloadunit-0afb4b5790c5a37ba6b880eb351a65fe00521fbe.tar.gz
unit-0afb4b5790c5a37ba6b880eb351a65fe00521fbe.tar.bz2
Merged with the default branch.1.24.0-1
Diffstat (limited to 'test/test_tls.py')
-rw-r--r--test/test_tls.py291
1 files changed, 177 insertions, 114 deletions
diff --git a/test/test_tls.py b/test/test_tls.py
index 206ea828..0cfeaded 100644
--- a/test/test_tls.py
+++ b/test/test_tls.py
@@ -2,8 +2,10 @@ import io
import re
import ssl
import subprocess
+import time
import pytest
+
from unit.applications.tls import TestApplicationTLS
from unit.option import option
@@ -11,10 +13,6 @@ from unit.option import option
class TestTLS(TestApplicationTLS):
prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
- def findall(self, pattern):
- with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f:
- return re.findall(pattern, f.read())
-
def openssl_date_to_sec_epoch(self, date):
return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
@@ -22,7 +20,7 @@ class TestTLS(TestApplicationTLS):
assert 'success' in self.conf(
{
"pass": "applications/" + application,
- "tls": {"certificate": cert}
+ "tls": {"certificate": cert},
},
'listeners/*:' + str(port),
)
@@ -32,6 +30,91 @@ class TestTLS(TestApplicationTLS):
{"pass": "applications/" + application}, 'listeners/*:' + str(port)
)
+ def req(self, name='localhost', subject=None, x509=False):
+ subj = subject if subject is not None else '/CN=' + name + '/'
+
+ subprocess.call(
+ [
+ 'openssl',
+ 'req',
+ '-new',
+ '-subj',
+ subj,
+ '-config',
+ option.temp_dir + '/openssl.conf',
+ '-out',
+ option.temp_dir + '/' + name + '.csr',
+ '-keyout',
+ option.temp_dir + '/' + name + '.key',
+ ],
+ stderr=subprocess.STDOUT,
+ )
+
+ def generate_ca_conf(self):
+ with open(option.temp_dir + '/ca.conf', 'w') as f:
+ f.write(
+ """[ ca ]
+default_ca = myca
+
+[ myca ]
+new_certs_dir = %(dir)s
+database = %(database)s
+default_md = sha256
+policy = myca_policy
+serial = %(certserial)s
+default_days = 1
+x509_extensions = myca_extensions
+copy_extensions = copy
+
+[ myca_policy ]
+commonName = optional
+
+[ myca_extensions ]
+basicConstraints = critical,CA:TRUE"""
+ % {
+ 'dir': option.temp_dir,
+ 'database': option.temp_dir + '/certindex',
+ 'certserial': option.temp_dir + '/certserial',
+ }
+ )
+
+ with open(option.temp_dir + '/certserial', 'w') as f:
+ f.write('1000')
+
+ with open(option.temp_dir + '/certindex', 'w') as f:
+ f.write('')
+
+ with open(option.temp_dir + '/certindex.attr', 'w') as f:
+ f.write('')
+
+ def ca(self, cert='root', out='localhost'):
+ subprocess.call(
+ [
+ 'openssl',
+ 'ca',
+ '-batch',
+ '-config',
+ option.temp_dir + '/ca.conf',
+ '-keyfile',
+ option.temp_dir + '/' + cert + '.key',
+ '-cert',
+ option.temp_dir + '/' + cert + '.crt',
+ '-in',
+ option.temp_dir + '/' + out + '.csr',
+ '-out',
+ option.temp_dir + '/' + out + '.crt',
+ ],
+ stderr=subprocess.STDOUT,
+ )
+
+ def set_certificate_req_context(self, cert='root'):
+ self.context = ssl.create_default_context()
+ self.context.check_hostname = False
+ self.context.verify_mode = ssl.CERT_REQUIRED
+ self.context.load_verify_locations(
+ option.temp_dir + '/' + cert + '.crt'
+ )
+
def test_tls_listener_option_add(self):
self.load('empty')
@@ -212,113 +295,13 @@ class TestTLS(TestApplicationTLS):
self.certificate('root', False)
- subprocess.call(
- [
- 'openssl',
- 'req',
- '-new',
- '-subj',
- '/CN=int/',
- '-config',
- temp_dir + '/openssl.conf',
- '-out',
- temp_dir + '/int.csr',
- '-keyout',
- temp_dir + '/int.key',
- ],
- stderr=subprocess.STDOUT,
- )
+ self.req('int')
+ self.req('end')
- subprocess.call(
- [
- 'openssl',
- 'req',
- '-new',
- '-subj',
- '/CN=end/',
- '-config',
- temp_dir + '/openssl.conf',
- '-out',
- temp_dir + '/end.csr',
- '-keyout',
- temp_dir + '/end.key',
- ],
- stderr=subprocess.STDOUT,
- )
+ self.generate_ca_conf()
- with open(temp_dir + '/ca.conf', 'w') as f:
- f.write(
- """[ ca ]
-default_ca = myca
-
-[ myca ]
-new_certs_dir = %(dir)s
-database = %(database)s
-default_md = sha256
-policy = myca_policy
-serial = %(certserial)s
-default_days = 1
-x509_extensions = myca_extensions
-
-[ myca_policy ]
-commonName = supplied
-
-[ myca_extensions ]
-basicConstraints = critical,CA:TRUE"""
- % {
- 'dir': temp_dir,
- 'database': temp_dir + '/certindex',
- 'certserial': temp_dir + '/certserial',
- }
- )
-
- with open(temp_dir + '/certserial', 'w') as f:
- f.write('1000')
-
- with open(temp_dir + '/certindex', 'w') as f:
- f.write('')
-
- subprocess.call(
- [
- 'openssl',
- 'ca',
- '-batch',
- '-subj',
- '/CN=int/',
- '-config',
- temp_dir + '/ca.conf',
- '-keyfile',
- temp_dir + '/root.key',
- '-cert',
- temp_dir + '/root.crt',
- '-in',
- temp_dir + '/int.csr',
- '-out',
- temp_dir + '/int.crt',
- ],
- stderr=subprocess.STDOUT,
- )
-
- subprocess.call(
- [
- 'openssl',
- 'ca',
- '-batch',
- '-subj',
- '/CN=end/',
- '-config',
- temp_dir + '/ca.conf',
- '-keyfile',
- temp_dir + '/int.key',
- '-cert',
- temp_dir + '/int.crt',
- '-in',
- temp_dir + '/end.csr',
- '-out',
- temp_dir + '/end.crt',
- ],
- stderr=subprocess.STDOUT,
- )
+ self.ca(cert='root', out='int')
+ self.ca(cert='int', out='end')
crt_path = temp_dir + '/end-int.crt'
end_path = temp_dir + '/end.crt'
@@ -329,10 +312,7 @@ basicConstraints = critical,CA:TRUE"""
) as int:
crt.write(end.read() + int.read())
- self.context = ssl.create_default_context()
- self.context.check_hostname = False
- self.context.verify_mode = ssl.CERT_REQUIRED
- self.context.load_verify_locations(temp_dir + '/root.crt')
+ self.set_certificate_req_context()
# incomplete chain
@@ -406,6 +386,67 @@ basicConstraints = critical,CA:TRUE"""
self.get_ssl()['status'] == 200
), 'certificate chain intermediate server'
+ def test_tls_certificate_empty_cn(self, temp_dir):
+ self.certificate('root', False)
+
+ self.req(subject='/')
+
+ self.generate_ca_conf()
+ self.ca()
+
+ self.set_certificate_req_context()
+
+ assert 'success' in self.certificate_load('localhost', 'localhost')
+
+ cert = self.conf_get('/certificates/localhost')
+ assert cert['chain'][0]['subject'] == {}, 'empty subject'
+ assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
+
+ def test_tls_certificate_empty_cn_san(self, temp_dir):
+ self.certificate('root', False)
+
+ self.openssl_conf(
+ rewrite=True, alt_names=["example.com", "www.example.net"]
+ )
+
+ self.req(subject='/')
+
+ self.generate_ca_conf()
+ self.ca()
+
+ self.set_certificate_req_context()
+
+ assert 'success' in self.certificate_load('localhost', 'localhost')
+
+ cert = self.conf_get('/certificates/localhost')
+ assert cert['chain'][0]['subject'] == {
+ 'alt_names': ['example.com', 'www.example.net']
+ }, 'subject alt_names'
+ assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
+
+ def test_tls_certificate_empty_cn_san_ip(self):
+ self.certificate('root', False)
+
+ self.openssl_conf(
+ rewrite=True,
+ alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
+ )
+
+ self.req(subject='/')
+
+ self.generate_ca_conf()
+ self.ca()
+
+ self.set_certificate_req_context()
+
+ assert 'success' in self.certificate_load('localhost', 'localhost')
+
+ cert = self.conf_get('/certificates/localhost')
+ assert cert['chain'][0]['subject'] == {
+ 'alt_names': ['example.com', 'www.example.net']
+ }, 'subject alt_names'
+ assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
+
@pytest.mark.skip('not yet')
def test_tls_reconfigure(self):
self.load('empty')
@@ -461,6 +502,28 @@ basicConstraints = critical,CA:TRUE"""
assert resp['body'] == '0123456789', 'keepalive 2'
+ def test_tls_no_close_notify(self):
+ self.certificate()
+
+ assert 'success' in self.conf(
+ {
+ "listeners": {
+ "*:7080": {
+ "pass": "routes",
+ "tls": {"certificate": "default"},
+ }
+ },
+ "routes": [{"action": {"return": 200}}],
+ "applications": {},
+ }
+ ), 'load application configuration'
+
+ (resp, sock) = self.get_ssl(start=True)
+
+ time.sleep(5)
+
+ sock.close()
+
@pytest.mark.skip('not yet')
def test_tls_keepalive_certificate_remove(self):
self.load('empty')