diff options
-rw-r--r-- | test/test_tls_sni.py | 286 | ||||
-rw-r--r-- | test/unit/applications/tls.py | 21 | ||||
-rw-r--r-- | test/unit/http.py | 3 |
3 files changed, 306 insertions, 4 deletions
diff --git a/test/test_tls_sni.py b/test/test_tls_sni.py new file mode 100644 index 00000000..7da05e6e --- /dev/null +++ b/test/test_tls_sni.py @@ -0,0 +1,286 @@ +import subprocess +import ssl + +import pytest +from unit.applications.tls import TestApplicationTLS +from unit.option import option + + +class TestTLSSNI(TestApplicationTLS): + prerequisites = {'modules': {'openssl': 'any'}} + + def setup_method(self): + self._load_conf( + { + "listeners": {"*:7080": {"pass": "routes"}}, + "routes": [{"action": {"return": 200}}], + "applications": {}, + } + ) + + def openssl_date_to_sec_epoch(self, date): + return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') + + def add_tls(self, cert='default'): + assert 'success' in self.conf( + { + "pass": "routes", + "tls": {"certificate": cert} + }, + 'listeners/*:7080', + ) + + def remove_tls(self): + assert 'success' in self.conf({"pass": "routes"}, 'listeners/*:7080') + + def generate_ca_conf(self): + with open(option.temp_dir + '/ca.conf', 'w') as f: + f.write( + """[ ca ] +default_ca = myca + +[ myca ] +new_certs_dir = %(dir)s +database = %(database)s +default_md = sha256 +policy = myca_policy +serial = %(certserial)s +default_days = 1 +x509_extensions = myca_extensions +copy_extensions = copy + +[ myca_policy ] +commonName = optional + +[ myca_extensions ] +basicConstraints = critical,CA:TRUE""" + % { + 'dir': option.temp_dir, + 'database': option.temp_dir + '/certindex', + 'certserial': option.temp_dir + '/certserial', + } + ) + + with open(option.temp_dir + '/certserial', 'w') as f: + f.write('1000') + + with open(option.temp_dir + '/certindex', 'w') as f: + f.write('') + + def config_bundles(self, bundles): + self.certificate('root', False) + + for b in bundles: + self.openssl_conf(rewrite=True, alt_names=bundles[b]['alt_names']) + subj = ( + '/CN={}/'.format(bundles[b]['subj']) + if 'subj' in bundles[b] + else '/' + ) + + subprocess.call( + [ + 'openssl', + 'req', + '-new', + '-subj', + subj, + '-config', + option.temp_dir + '/openssl.conf', + '-out', + option.temp_dir + '/{}.csr'.format(b), + '-keyout', + option.temp_dir + '/{}.key'.format(b), + ], + stderr=subprocess.STDOUT, + ) + + self.generate_ca_conf() + + for b in bundles: + subj = ( + '/CN={}/'.format(bundles[b]['subj']) + if 'subj' in bundles[b] + else '/' + ) + + subprocess.call( + [ + 'openssl', + 'ca', + '-batch', + '-subj', + subj, + '-config', + option.temp_dir + '/ca.conf', + '-keyfile', + option.temp_dir + '/root.key', + '-cert', + option.temp_dir + '/root.crt', + '-in', + option.temp_dir + '/{}.csr'.format(b), + '-out', + option.temp_dir + '/{}.crt'.format(b), + ], + stderr=subprocess.STDOUT, + ) + + self.context = ssl.create_default_context() + self.context.check_hostname = False + self.context.verify_mode = ssl.CERT_REQUIRED + self.context.load_verify_locations(option.temp_dir + '/root.crt') + + self.load_certs(bundles) + + def load_certs(self, bundles): + for bname, bvalue in bundles.items(): + assert 'success' in self.certificate_load( + bname, bname + ), 'certificate {} upload'.format(bvalue['subj']) + + def check_cert(self, host, expect): + resp, sock = self.get_ssl( + headers={ + 'Host': host, + 'Content-Length': '0', + 'Connection': 'close', + }, + start=True, + ) + + assert resp['status'] == 200 + assert sock.getpeercert()['subject'][0][0][1] == expect + + def test_tls_sni(self): + bundles = { + "default": { + "subj": "default", + "alt_names": ["default"], + }, + "localhost.com": { + "subj": "localhost.com", + "alt_names": ["alt1.localhost.com"], + }, + "example.com": { + "subj": "example.com", + "alt_names": ["alt1.example.com", "alt2.example.com"], + }, + } + self.config_bundles(bundles) + self.add_tls(["default", "localhost.com", "example.com"]) + + self.check_cert('alt1.localhost.com', bundles['localhost.com']['subj']) + self.check_cert('alt2.example.com', bundles['example.com']['subj']) + self.check_cert('blah', bundles['default']['subj']) + + def test_tls_sni_upper_case(self): + bundles = { + "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []}, + "example.com": { + "subj": "example.com", + "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"], + }, + } + self.config_bundles(bundles) + self.add_tls(["localhost.com", "example.com"]) + + self.check_cert('localhost.com', bundles['localhost.com']['subj']) + self.check_cert('LOCALHOST.COM', bundles['localhost.com']['subj']) + self.check_cert('EXAMPLE.COM', bundles['localhost.com']['subj']) + self.check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj']) + self.check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj']) + + def test_tls_sni_only_bundle(self): + bundles = { + "localhost.com": { + "subj": "localhost.com", + "alt_names": ["alt1.localhost.com", "alt2.localhost.com"], + } + } + self.config_bundles(bundles) + self.add_tls(["localhost.com"]) + + self.check_cert('domain.com', bundles['localhost.com']['subj']) + self.check_cert('alt1.domain.com', bundles['localhost.com']['subj']) + + def test_tls_sni_wildcard(self): + bundles = { + "localhost.com": { + "subj": "localhost.com", + "alt_names": [], + }, + "example.com": { + "subj": "example.com", + "alt_names": ["*.example.com", "*.alt.example.com"], + }, + } + self.config_bundles(bundles) + self.add_tls(["localhost.com", "example.com"]) + + self.check_cert('example.com', bundles['localhost.com']['subj']) + self.check_cert('www.example.com', bundles['example.com']['subj']) + self.check_cert('alt.example.com', bundles['example.com']['subj']) + self.check_cert('www.alt.example.com', bundles['example.com']['subj']) + self.check_cert('www.alt.example.ru', bundles['localhost.com']['subj']) + + def test_tls_sni_duplicated_bundle(self): + bundles = { + "localhost.com": { + "subj": "localhost.com", + "alt_names": ["localhost.com", "alt2.localhost.com"], + } + } + self.config_bundles(bundles) + self.add_tls(["localhost.com", "localhost.com"]) + + self.check_cert('localhost.com', bundles['localhost.com']['subj']) + self.check_cert('alt2.localhost.com', bundles['localhost.com']['subj']) + + def test_tls_sni_same_alt(self): + bundles = { + "localhost": {"subj": "subj1", "alt_names": "same.altname.com"}, + "example": {"subj": "subj2", "alt_names": "same.altname.com"}, + } + self.config_bundles(bundles) + self.add_tls(["localhost", "example"]) + + self.check_cert('localhost', bundles['localhost']['subj']) + self.check_cert('example', bundles['localhost']['subj']) + + def test_tls_sni_empty_cn(self): + bundles = { + "localhost": { + "alt_names": ["alt.localhost.com"], + } + } + self.config_bundles(bundles) + self.add_tls(["localhost"]) + + resp, sock = self.get_ssl( + headers={ + 'Host': 'domain.com', + 'Content-Length': '0', + 'Connection': 'close', + }, + start=True, + ) + + assert resp['status'] == 200 + assert sock.getpeercert()['subjectAltName'][0][1] == 'alt.localhost.com' + + def test_tls_sni_invalid(self): + self.config_bundles({"localhost": {"subj": "subj1", "alt_names": ''}}) + self.add_tls(["localhost"]) + + def check_certificate(cert): + assert 'error' in self.conf( + {"pass": "routes", "tls": {"certificate": cert}}, + 'listeners/*:7080', + ) + + check_certificate('') + check_certificate('blah') + check_certificate([]) + check_certificate(['blah']) + check_certificate(['localhost', 'blah']) + check_certificate(['localhost', []]) diff --git a/test/unit/applications/tls.py b/test/unit/applications/tls.py index b0cd5abb..490ae916 100644 --- a/test/unit/applications/tls.py +++ b/test/unit/applications/tls.py @@ -63,19 +63,34 @@ class TestApplicationTLS(TestApplicationProto): return ssl.get_server_certificate(addr, ssl_version=ssl_version) - def openssl_conf(self): + def openssl_conf(self, rewrite=False, alt_names=[]): conf_path = option.temp_dir + '/openssl.conf' - if os.path.exists(conf_path): + if not rewrite and os.path.exists(conf_path): return + # Generates alt_names section with dns names + a_names = "[alt_names]\n" + for i, k in enumerate(alt_names, 1): + a_names += "DNS.%d = %s\n" % (i, k) + + # Generates section for sign request extension + a_sec = """req_extensions = myca_req_extensions + +[ myca_req_extensions ] +subjectAltName = @alt_names + +{a_names}""".format(a_names=a_names) + with open(conf_path, 'w') as f: f.write( """[ req ] default_bits = 2048 encrypt_key = no distinguished_name = req_distinguished_name -[ req_distinguished_name ]""" + +{a_sec} +[ req_distinguished_name ]""".format(a_sec=a_sec if alt_names else "") ) def load(self, script, name=None): diff --git a/test/unit/http.py b/test/unit/http.py index 57e6ed3a..7706fe05 100644 --- a/test/unit/http.py +++ b/test/unit/http.py @@ -44,7 +44,8 @@ class TestHTTP(): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if 'wrapper' in kwargs: - sock = kwargs['wrapper'](sock) + server_hostname = headers.get('Host', 'localhost') + sock = kwargs['wrapper'](sock, server_hostname=server_hostname) connect_args = addr if sock_type == 'unix' else (addr, port) try: |