diff options
author | Andrei Belov <defan@nginx.com> | 2021-05-27 17:03:24 +0300 |
---|---|---|
committer | Andrei Belov <defan@nginx.com> | 2021-05-27 17:03:24 +0300 |
commit | 0afb4b5790c5a37ba6b880eb351a65fe00521fbe (patch) | |
tree | c7e0b6bed92ee62a5e8b13c945c4134e68554cec /test/test_tls.py | |
parent | 21ff5e086ece7188df3b7338d228fa4fb7f886af (diff) | |
parent | d06e55dfa3692e27a92ff6c2534bb083416bc0c8 (diff) | |
download | unit-0afb4b5790c5a37ba6b880eb351a65fe00521fbe.tar.gz unit-0afb4b5790c5a37ba6b880eb351a65fe00521fbe.tar.bz2 |
Merged with the default branch.1.24.0-1
Diffstat (limited to 'test/test_tls.py')
-rw-r--r-- | test/test_tls.py | 291 |
1 files changed, 177 insertions, 114 deletions
diff --git a/test/test_tls.py b/test/test_tls.py index 206ea828..0cfeaded 100644 --- a/test/test_tls.py +++ b/test/test_tls.py @@ -2,8 +2,10 @@ import io import re import ssl import subprocess +import time import pytest + from unit.applications.tls import TestApplicationTLS from unit.option import option @@ -11,10 +13,6 @@ from unit.option import option class TestTLS(TestApplicationTLS): prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} - def findall(self, pattern): - with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f: - return re.findall(pattern, f.read()) - def openssl_date_to_sec_epoch(self, date): return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') @@ -22,7 +20,7 @@ class TestTLS(TestApplicationTLS): assert 'success' in self.conf( { "pass": "applications/" + application, - "tls": {"certificate": cert} + "tls": {"certificate": cert}, }, 'listeners/*:' + str(port), ) @@ -32,6 +30,91 @@ class TestTLS(TestApplicationTLS): {"pass": "applications/" + application}, 'listeners/*:' + str(port) ) + def req(self, name='localhost', subject=None, x509=False): + subj = subject if subject is not None else '/CN=' + name + '/' + + subprocess.call( + [ + 'openssl', + 'req', + '-new', + '-subj', + subj, + '-config', + option.temp_dir + '/openssl.conf', + '-out', + option.temp_dir + '/' + name + '.csr', + '-keyout', + option.temp_dir + '/' + name + '.key', + ], + stderr=subprocess.STDOUT, + ) + + def generate_ca_conf(self): + with open(option.temp_dir + '/ca.conf', 'w') as f: + f.write( + """[ ca ] +default_ca = myca + +[ myca ] +new_certs_dir = %(dir)s +database = %(database)s +default_md = sha256 +policy = myca_policy +serial = %(certserial)s +default_days = 1 +x509_extensions = myca_extensions +copy_extensions = copy + +[ myca_policy ] +commonName = optional + +[ myca_extensions ] +basicConstraints = critical,CA:TRUE""" + % { + 'dir': option.temp_dir, + 'database': option.temp_dir + '/certindex', + 'certserial': option.temp_dir + '/certserial', + } + ) + + with open(option.temp_dir + '/certserial', 'w') as f: + f.write('1000') + + with open(option.temp_dir + '/certindex', 'w') as f: + f.write('') + + with open(option.temp_dir + '/certindex.attr', 'w') as f: + f.write('') + + def ca(self, cert='root', out='localhost'): + subprocess.call( + [ + 'openssl', + 'ca', + '-batch', + '-config', + option.temp_dir + '/ca.conf', + '-keyfile', + option.temp_dir + '/' + cert + '.key', + '-cert', + option.temp_dir + '/' + cert + '.crt', + '-in', + option.temp_dir + '/' + out + '.csr', + '-out', + option.temp_dir + '/' + out + '.crt', + ], + stderr=subprocess.STDOUT, + ) + + def set_certificate_req_context(self, cert='root'): + self.context = ssl.create_default_context() + self.context.check_hostname = False + self.context.verify_mode = ssl.CERT_REQUIRED + self.context.load_verify_locations( + option.temp_dir + '/' + cert + '.crt' + ) + def test_tls_listener_option_add(self): self.load('empty') @@ -212,113 +295,13 @@ class TestTLS(TestApplicationTLS): self.certificate('root', False) - subprocess.call( - [ - 'openssl', - 'req', - '-new', - '-subj', - '/CN=int/', - '-config', - temp_dir + '/openssl.conf', - '-out', - temp_dir + '/int.csr', - '-keyout', - temp_dir + '/int.key', - ], - stderr=subprocess.STDOUT, - ) + self.req('int') + self.req('end') - subprocess.call( - [ - 'openssl', - 'req', - '-new', - '-subj', - '/CN=end/', - '-config', - temp_dir + '/openssl.conf', - '-out', - temp_dir + '/end.csr', - '-keyout', - temp_dir + '/end.key', - ], - stderr=subprocess.STDOUT, - ) + self.generate_ca_conf() - with open(temp_dir + '/ca.conf', 'w') as f: - f.write( - """[ ca ] -default_ca = myca - -[ myca ] -new_certs_dir = %(dir)s -database = %(database)s -default_md = sha256 -policy = myca_policy -serial = %(certserial)s -default_days = 1 -x509_extensions = myca_extensions - -[ myca_policy ] -commonName = supplied - -[ myca_extensions ] -basicConstraints = critical,CA:TRUE""" - % { - 'dir': temp_dir, - 'database': temp_dir + '/certindex', - 'certserial': temp_dir + '/certserial', - } - ) - - with open(temp_dir + '/certserial', 'w') as f: - f.write('1000') - - with open(temp_dir + '/certindex', 'w') as f: - f.write('') - - subprocess.call( - [ - 'openssl', - 'ca', - '-batch', - '-subj', - '/CN=int/', - '-config', - temp_dir + '/ca.conf', - '-keyfile', - temp_dir + '/root.key', - '-cert', - temp_dir + '/root.crt', - '-in', - temp_dir + '/int.csr', - '-out', - temp_dir + '/int.crt', - ], - stderr=subprocess.STDOUT, - ) - - subprocess.call( - [ - 'openssl', - 'ca', - '-batch', - '-subj', - '/CN=end/', - '-config', - temp_dir + '/ca.conf', - '-keyfile', - temp_dir + '/int.key', - '-cert', - temp_dir + '/int.crt', - '-in', - temp_dir + '/end.csr', - '-out', - temp_dir + '/end.crt', - ], - stderr=subprocess.STDOUT, - ) + self.ca(cert='root', out='int') + self.ca(cert='int', out='end') crt_path = temp_dir + '/end-int.crt' end_path = temp_dir + '/end.crt' @@ -329,10 +312,7 @@ basicConstraints = critical,CA:TRUE""" ) as int: crt.write(end.read() + int.read()) - self.context = ssl.create_default_context() - self.context.check_hostname = False - self.context.verify_mode = ssl.CERT_REQUIRED - self.context.load_verify_locations(temp_dir + '/root.crt') + self.set_certificate_req_context() # incomplete chain @@ -406,6 +386,67 @@ basicConstraints = critical,CA:TRUE""" self.get_ssl()['status'] == 200 ), 'certificate chain intermediate server' + def test_tls_certificate_empty_cn(self, temp_dir): + self.certificate('root', False) + + self.req(subject='/') + + self.generate_ca_conf() + self.ca() + + self.set_certificate_req_context() + + assert 'success' in self.certificate_load('localhost', 'localhost') + + cert = self.conf_get('/certificates/localhost') + assert cert['chain'][0]['subject'] == {}, 'empty subject' + assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' + + def test_tls_certificate_empty_cn_san(self, temp_dir): + self.certificate('root', False) + + self.openssl_conf( + rewrite=True, alt_names=["example.com", "www.example.net"] + ) + + self.req(subject='/') + + self.generate_ca_conf() + self.ca() + + self.set_certificate_req_context() + + assert 'success' in self.certificate_load('localhost', 'localhost') + + cert = self.conf_get('/certificates/localhost') + assert cert['chain'][0]['subject'] == { + 'alt_names': ['example.com', 'www.example.net'] + }, 'subject alt_names' + assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' + + def test_tls_certificate_empty_cn_san_ip(self): + self.certificate('root', False) + + self.openssl_conf( + rewrite=True, + alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'], + ) + + self.req(subject='/') + + self.generate_ca_conf() + self.ca() + + self.set_certificate_req_context() + + assert 'success' in self.certificate_load('localhost', 'localhost') + + cert = self.conf_get('/certificates/localhost') + assert cert['chain'][0]['subject'] == { + 'alt_names': ['example.com', 'www.example.net'] + }, 'subject alt_names' + assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' + @pytest.mark.skip('not yet') def test_tls_reconfigure(self): self.load('empty') @@ -461,6 +502,28 @@ basicConstraints = critical,CA:TRUE""" assert resp['body'] == '0123456789', 'keepalive 2' + def test_tls_no_close_notify(self): + self.certificate() + + assert 'success' in self.conf( + { + "listeners": { + "*:7080": { + "pass": "routes", + "tls": {"certificate": "default"}, + } + }, + "routes": [{"action": {"return": 200}}], + "applications": {}, + } + ), 'load application configuration' + + (resp, sock) = self.get_ssl(start=True) + + time.sleep(5) + + sock.close() + @pytest.mark.skip('not yet') def test_tls_keepalive_certificate_remove(self): self.load('empty') |