diff options
author | Konstantin Pavlov <thresh@nginx.com> | 2023-08-31 09:41:46 -0700 |
---|---|---|
committer | Konstantin Pavlov <thresh@nginx.com> | 2023-08-31 09:41:46 -0700 |
commit | c45c8919c7232eb20023484f6d1fc9f1f50395d8 (patch) | |
tree | cc12eb307c1611494948645e4b487fa06495c3d2 /test/test_tls_sni.py | |
parent | 88c90e1c351ab8c5bd487a5cd4b735014b08e271 (diff) | |
parent | 9b22b6957bc87b3df002d0bc691fdae6a20abdac (diff) | |
download | unit-c45c8919c7232eb20023484f6d1fc9f1f50395d8.tar.gz unit-c45c8919c7232eb20023484f6d1fc9f1f50395d8.tar.bz2 |
Merged with the default branch.1.31.0-1
Diffstat (limited to 'test/test_tls_sni.py')
-rw-r--r-- | test/test_tls_sni.py | 502 |
1 files changed, 261 insertions, 241 deletions
diff --git a/test/test_tls_sni.py b/test/test_tls_sni.py index e918bb20..253d9813 100644 --- a/test/test_tls_sni.py +++ b/test/test_tls_sni.py @@ -1,38 +1,112 @@ import ssl import subprocess -from unit.applications.tls import TestApplicationTLS +import pytest +from unit.applications.tls import ApplicationTLS from unit.option import option +prerequisites = {'modules': {'openssl': 'any'}} -class TestTLSSNI(TestApplicationTLS): - prerequisites = {'modules': {'openssl': 'any'}} +client = ApplicationTLS() - def setup_method(self): - self._load_conf( - { - "listeners": {"*:7080": {"pass": "routes"}}, - "routes": [{"action": {"return": 200}}], - "applications": {}, - } - ) - def openssl_date_to_sec_epoch(self, date): - return self.date_to_sec_epoch(date, '%b %d %X %Y %Z') +@pytest.fixture(autouse=True) +def setup_method_fixture(): + assert 'success' in client.conf( + { + "listeners": {"*:7080": {"pass": "routes"}}, + "routes": [{"action": {"return": 200}}], + "applications": {}, + } + ) + + +def add_tls(cert='default'): + assert 'success' in client.conf( + {"pass": "routes", "tls": {"certificate": cert}}, + 'listeners/*:7080', + ) + + +def check_cert(host, expect, ctx): + resp, sock = client.get_ssl( + headers={ + 'Host': host, + 'Content-Length': '0', + 'Connection': 'close', + }, + start=True, + context=ctx, + ) + + assert resp['status'] == 200 + assert sock.getpeercert()['subject'][0][0][1] == expect + + +def config_bundles(bundles): + client.certificate('root', False) + + for b in bundles: + client.openssl_conf(rewrite=True, alt_names=bundles[b]['alt_names']) + subj = f'/CN={bundles[b]["subj"]}/' if 'subj' in bundles[b] else '/' + + subprocess.check_output( + [ + 'openssl', + 'req', + '-new', + '-subj', + subj, + '-config', + f'{option.temp_dir}/openssl.conf', + '-out', + f'{option.temp_dir}/{b}.csr', + '-keyout', + f'{option.temp_dir}/{b}.key', + ], + stderr=subprocess.STDOUT, + ) - def add_tls(self, cert='default'): - assert 'success' in self.conf( - {"pass": "routes", "tls": {"certificate": cert}}, - 'listeners/*:7080', + generate_ca_conf() + + for b in bundles: + subj = f'/CN={bundles[b]["subj"]}/' if 'subj' in bundles[b] else '/' + + subprocess.check_output( + [ + 'openssl', + 'ca', + '-batch', + '-subj', + subj, + '-config', + f'{option.temp_dir}/ca.conf', + '-keyfile', + f'{option.temp_dir}/root.key', + '-cert', + f'{option.temp_dir}/root.crt', + '-in', + f'{option.temp_dir}/{b}.csr', + '-out', + f'{option.temp_dir}/{b}.crt', + ], + stderr=subprocess.STDOUT, ) - def remove_tls(self): - assert 'success' in self.conf({"pass": "routes"}, 'listeners/*:7080') + load_certs(bundles) + + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations(f'{option.temp_dir}/root.crt') - def generate_ca_conf(self): - with open(f'{option.temp_dir}/ca.conf', 'w') as f: - f.write( - f"""[ ca ] + return context + + +def generate_ca_conf(): + with open(f'{option.temp_dir}/ca.conf', 'w') as f: + f.write( + f"""[ ca ] default_ca = myca [ myca ] @@ -50,231 +124,177 @@ commonName = optional [ myca_extensions ] basicConstraints = critical,CA:TRUE""" - ) - - with open(f'{option.temp_dir}/certserial', 'w') as f: - f.write('1000') - - with open(f'{option.temp_dir}/certindex', 'w') as f: - f.write('') - - def config_bundles(self, bundles): - self.certificate('root', False) - - for b in bundles: - self.openssl_conf(rewrite=True, alt_names=bundles[b]['alt_names']) - subj = f'/CN={bundles[b]["subj"]}/' if 'subj' in bundles[b] else '/' - - subprocess.check_output( - [ - 'openssl', - 'req', - '-new', - '-subj', - subj, - '-config', - f'{option.temp_dir}/openssl.conf', - '-out', - f'{option.temp_dir}/{b}.csr', - '-keyout', - f'{option.temp_dir}/{b}.key', - ], - stderr=subprocess.STDOUT, - ) - - self.generate_ca_conf() - - for b in bundles: - subj = f'/CN={bundles[b]["subj"]}/' if 'subj' in bundles[b] else '/' - - subprocess.check_output( - [ - 'openssl', - 'ca', - '-batch', - '-subj', - subj, - '-config', - f'{option.temp_dir}/ca.conf', - '-keyfile', - f'{option.temp_dir}/root.key', - '-cert', - f'{option.temp_dir}/root.crt', - '-in', - f'{option.temp_dir}/{b}.csr', - '-out', - f'{option.temp_dir}/{b}.crt', - ], - stderr=subprocess.STDOUT, - ) - - self.context = ssl.create_default_context() - self.context.check_hostname = False - self.context.verify_mode = ssl.CERT_REQUIRED - self.context.load_verify_locations(f'{option.temp_dir}/root.crt') - - self.load_certs(bundles) - - def load_certs(self, bundles): - for bname, bvalue in bundles.items(): - assert 'success' in self.certificate_load( - bname, bname - ), f'certificate {bvalue["subj"]} upload' - - def check_cert(self, host, expect): - resp, sock = self.get_ssl( - headers={ - 'Host': host, - 'Content-Length': '0', - 'Connection': 'close', - }, - start=True, ) - assert resp['status'] == 200 - assert sock.getpeercert()['subject'][0][0][1] == expect - - def test_tls_sni(self): - bundles = { - "default": {"subj": "default", "alt_names": ["default"]}, - "localhost.com": { - "subj": "localhost.com", - "alt_names": ["alt1.localhost.com"], - }, - "example.com": { - "subj": "example.com", - "alt_names": ["alt1.example.com", "alt2.example.com"], - }, + with open(f'{option.temp_dir}/certserial', 'w') as f: + f.write('1000') + + with open(f'{option.temp_dir}/certindex', 'w') as f: + f.write('') + + +def load_certs(bundles): + for bname, bvalue in bundles.items(): + assert 'success' in client.certificate_load( + bname, bname + ), f'certificate {bvalue["subj"]} upload' + + +def remove_tls(): + assert 'success' in client.conf({"pass": "routes"}, 'listeners/*:7080') + + +def test_tls_sni(): + bundles = { + "default": {"subj": "default", "alt_names": ["default"]}, + "localhost.com": { + "subj": "localhost.com", + "alt_names": ["alt1.localhost.com"], + }, + "example.com": { + "subj": "example.com", + "alt_names": ["alt1.example.com", "alt2.example.com"], + }, + } + ctx = config_bundles(bundles) + add_tls(["default", "localhost.com", "example.com"]) + + check_cert('alt1.localhost.com', bundles['localhost.com']['subj'], ctx) + check_cert('alt2.example.com', bundles['example.com']['subj'], ctx) + check_cert('blah', bundles['default']['subj'], ctx) + + +def test_tls_sni_no_hostname(): + bundles = { + "localhost.com": {"subj": "localhost.com", "alt_names": []}, + "example.com": { + "subj": "example.com", + "alt_names": ["example.com"], + }, + } + ctx = config_bundles(bundles) + add_tls(["localhost.com", "example.com"]) + + resp, sock = client.get_ssl( + headers={'Content-Length': '0', 'Connection': 'close'}, + start=True, + context=ctx, + ) + assert resp['status'] == 200 + assert ( + sock.getpeercert()['subject'][0][0][1] + == bundles['localhost.com']['subj'] + ) + + +def test_tls_sni_upper_case(): + bundles = { + "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []}, + "example.com": { + "subj": "example.com", + "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"], + }, + } + ctx = config_bundles(bundles) + add_tls(["localhost.com", "example.com"]) + + check_cert('localhost.com', bundles['localhost.com']['subj'], ctx) + check_cert('LOCALHOST.COM', bundles['localhost.com']['subj'], ctx) + check_cert('EXAMPLE.COM', bundles['localhost.com']['subj'], ctx) + check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj'], ctx) + check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj'], ctx) + + +def test_tls_sni_only_bundle(): + bundles = { + "localhost.com": { + "subj": "localhost.com", + "alt_names": ["alt1.localhost.com", "alt2.localhost.com"], } - self.config_bundles(bundles) - self.add_tls(["default", "localhost.com", "example.com"]) - - self.check_cert('alt1.localhost.com', bundles['localhost.com']['subj']) - self.check_cert('alt2.example.com', bundles['example.com']['subj']) - self.check_cert('blah', bundles['default']['subj']) - - def test_tls_sni_no_hostname(self): - bundles = { - "localhost.com": {"subj": "localhost.com", "alt_names": []}, - "example.com": { - "subj": "example.com", - "alt_names": ["example.com"], - }, + } + ctx = config_bundles(bundles) + add_tls(["localhost.com"]) + + check_cert('domain.com', bundles['localhost.com']['subj'], ctx) + check_cert('alt1.domain.com', bundles['localhost.com']['subj'], ctx) + + +def test_tls_sni_wildcard(): + bundles = { + "localhost.com": {"subj": "localhost.com", "alt_names": []}, + "example.com": { + "subj": "example.com", + "alt_names": ["*.example.com", "*.alt.example.com"], + }, + } + ctx = config_bundles(bundles) + add_tls(["localhost.com", "example.com"]) + + check_cert('example.com', bundles['localhost.com']['subj'], ctx) + check_cert('www.example.com', bundles['example.com']['subj'], ctx) + check_cert('alt.example.com', bundles['example.com']['subj'], ctx) + check_cert('www.alt.example.com', bundles['example.com']['subj'], ctx) + check_cert('www.alt.example.ru', bundles['localhost.com']['subj'], ctx) + + +def test_tls_sni_duplicated_bundle(): + bundles = { + "localhost.com": { + "subj": "localhost.com", + "alt_names": ["localhost.com", "alt2.localhost.com"], } - self.config_bundles(bundles) - self.add_tls(["localhost.com", "example.com"]) + } + ctx = config_bundles(bundles) + add_tls(["localhost.com", "localhost.com"]) - resp, sock = self.get_ssl( - headers={'Content-Length': '0', 'Connection': 'close'}, - start=True, - ) - assert resp['status'] == 200 - assert ( - sock.getpeercert()['subject'][0][0][1] - == bundles['localhost.com']['subj'] - ) + check_cert('localhost.com', bundles['localhost.com']['subj'], ctx) + check_cert('alt2.localhost.com', bundles['localhost.com']['subj'], ctx) - def test_tls_sni_upper_case(self): - bundles = { - "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []}, - "example.com": { - "subj": "example.com", - "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"], - }, - } - self.config_bundles(bundles) - self.add_tls(["localhost.com", "example.com"]) - - self.check_cert('localhost.com', bundles['localhost.com']['subj']) - self.check_cert('LOCALHOST.COM', bundles['localhost.com']['subj']) - self.check_cert('EXAMPLE.COM', bundles['localhost.com']['subj']) - self.check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj']) - self.check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj']) - - def test_tls_sni_only_bundle(self): - bundles = { - "localhost.com": { - "subj": "localhost.com", - "alt_names": ["alt1.localhost.com", "alt2.localhost.com"], - } - } - self.config_bundles(bundles) - self.add_tls(["localhost.com"]) - - self.check_cert('domain.com', bundles['localhost.com']['subj']) - self.check_cert('alt1.domain.com', bundles['localhost.com']['subj']) - - def test_tls_sni_wildcard(self): - bundles = { - "localhost.com": {"subj": "localhost.com", "alt_names": []}, - "example.com": { - "subj": "example.com", - "alt_names": ["*.example.com", "*.alt.example.com"], - }, - } - self.config_bundles(bundles) - self.add_tls(["localhost.com", "example.com"]) - - self.check_cert('example.com', bundles['localhost.com']['subj']) - self.check_cert('www.example.com', bundles['example.com']['subj']) - self.check_cert('alt.example.com', bundles['example.com']['subj']) - self.check_cert('www.alt.example.com', bundles['example.com']['subj']) - self.check_cert('www.alt.example.ru', bundles['localhost.com']['subj']) - - def test_tls_sni_duplicated_bundle(self): - bundles = { - "localhost.com": { - "subj": "localhost.com", - "alt_names": ["localhost.com", "alt2.localhost.com"], - } - } - self.config_bundles(bundles) - self.add_tls(["localhost.com", "localhost.com"]) - self.check_cert('localhost.com', bundles['localhost.com']['subj']) - self.check_cert('alt2.localhost.com', bundles['localhost.com']['subj']) +def test_tls_sni_same_alt(): + bundles = { + "localhost": {"subj": "subj1", "alt_names": "same.altname.com"}, + "example": {"subj": "subj2", "alt_names": "same.altname.com"}, + } + ctx = config_bundles(bundles) + add_tls(["localhost", "example"]) - def test_tls_sni_same_alt(self): - bundles = { - "localhost": {"subj": "subj1", "alt_names": "same.altname.com"}, - "example": {"subj": "subj2", "alt_names": "same.altname.com"}, - } - self.config_bundles(bundles) - self.add_tls(["localhost", "example"]) - - self.check_cert('localhost', bundles['localhost']['subj']) - self.check_cert('example', bundles['localhost']['subj']) - - def test_tls_sni_empty_cn(self): - bundles = {"localhost": {"alt_names": ["alt.localhost.com"]}} - self.config_bundles(bundles) - self.add_tls(["localhost"]) - - resp, sock = self.get_ssl( - headers={ - 'Host': 'domain.com', - 'Content-Length': '0', - 'Connection': 'close', - }, - start=True, + check_cert('localhost', bundles['localhost']['subj'], ctx) + check_cert('example', bundles['localhost']['subj'], ctx) + + +def test_tls_sni_empty_cn(): + bundles = {"localhost": {"alt_names": ["alt.localhost.com"]}} + ctx = config_bundles(bundles) + add_tls(["localhost"]) + + resp, sock = client.get_ssl( + headers={ + 'Host': 'domain.com', + 'Content-Length': '0', + 'Connection': 'close', + }, + start=True, + context=ctx, + ) + + assert resp['status'] == 200 + assert sock.getpeercert()['subjectAltName'][0][1] == 'alt.localhost.com' + + +def test_tls_sni_invalid(): + _ = config_bundles({"localhost": {"subj": "subj1", "alt_names": ''}}) + add_tls(["localhost"]) + + def check_certificate(cert): + assert 'error' in client.conf( + {"pass": "routes", "tls": {"certificate": cert}}, + 'listeners/*:7080', ) - assert resp['status'] == 200 - assert sock.getpeercert()['subjectAltName'][0][1] == 'alt.localhost.com' - - def test_tls_sni_invalid(self): - self.config_bundles({"localhost": {"subj": "subj1", "alt_names": ''}}) - self.add_tls(["localhost"]) - - def check_certificate(cert): - assert 'error' in self.conf( - {"pass": "routes", "tls": {"certificate": cert}}, - 'listeners/*:7080', - ) - - check_certificate('') - check_certificate('blah') - check_certificate([]) - check_certificate(['blah']) - check_certificate(['localhost', 'blah']) - check_certificate(['localhost', []]) + check_certificate('') + check_certificate('blah') + check_certificate([]) + check_certificate(['blah']) + check_certificate(['localhost', 'blah']) + check_certificate(['localhost', []]) |