summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--test/test_tls_sni.py286
-rw-r--r--test/unit/applications/tls.py21
-rw-r--r--test/unit/http.py3
3 files changed, 306 insertions, 4 deletions
diff --git a/test/test_tls_sni.py b/test/test_tls_sni.py
new file mode 100644
index 00000000..7da05e6e
--- /dev/null
+++ b/test/test_tls_sni.py
@@ -0,0 +1,286 @@
+import subprocess
+import ssl
+
+import pytest
+from unit.applications.tls import TestApplicationTLS
+from unit.option import option
+
+
+class TestTLSSNI(TestApplicationTLS):
+ prerequisites = {'modules': {'openssl': 'any'}}
+
+ def setup_method(self):
+ self._load_conf(
+ {
+ "listeners": {"*:7080": {"pass": "routes"}},
+ "routes": [{"action": {"return": 200}}],
+ "applications": {},
+ }
+ )
+
+ def openssl_date_to_sec_epoch(self, date):
+ return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
+
+ def add_tls(self, cert='default'):
+ assert 'success' in self.conf(
+ {
+ "pass": "routes",
+ "tls": {"certificate": cert}
+ },
+ 'listeners/*:7080',
+ )
+
+ def remove_tls(self):
+ assert 'success' in self.conf({"pass": "routes"}, 'listeners/*:7080')
+
+ def generate_ca_conf(self):
+ with open(option.temp_dir + '/ca.conf', 'w') as f:
+ f.write(
+ """[ ca ]
+default_ca = myca
+
+[ myca ]
+new_certs_dir = %(dir)s
+database = %(database)s
+default_md = sha256
+policy = myca_policy
+serial = %(certserial)s
+default_days = 1
+x509_extensions = myca_extensions
+copy_extensions = copy
+
+[ myca_policy ]
+commonName = optional
+
+[ myca_extensions ]
+basicConstraints = critical,CA:TRUE"""
+ % {
+ 'dir': option.temp_dir,
+ 'database': option.temp_dir + '/certindex',
+ 'certserial': option.temp_dir + '/certserial',
+ }
+ )
+
+ with open(option.temp_dir + '/certserial', 'w') as f:
+ f.write('1000')
+
+ with open(option.temp_dir + '/certindex', 'w') as f:
+ f.write('')
+
+ def config_bundles(self, bundles):
+ self.certificate('root', False)
+
+ for b in bundles:
+ self.openssl_conf(rewrite=True, alt_names=bundles[b]['alt_names'])
+ subj = (
+ '/CN={}/'.format(bundles[b]['subj'])
+ if 'subj' in bundles[b]
+ else '/'
+ )
+
+ subprocess.call(
+ [
+ 'openssl',
+ 'req',
+ '-new',
+ '-subj',
+ subj,
+ '-config',
+ option.temp_dir + '/openssl.conf',
+ '-out',
+ option.temp_dir + '/{}.csr'.format(b),
+ '-keyout',
+ option.temp_dir + '/{}.key'.format(b),
+ ],
+ stderr=subprocess.STDOUT,
+ )
+
+ self.generate_ca_conf()
+
+ for b in bundles:
+ subj = (
+ '/CN={}/'.format(bundles[b]['subj'])
+ if 'subj' in bundles[b]
+ else '/'
+ )
+
+ subprocess.call(
+ [
+ 'openssl',
+ 'ca',
+ '-batch',
+ '-subj',
+ subj,
+ '-config',
+ option.temp_dir + '/ca.conf',
+ '-keyfile',
+ option.temp_dir + '/root.key',
+ '-cert',
+ option.temp_dir + '/root.crt',
+ '-in',
+ option.temp_dir + '/{}.csr'.format(b),
+ '-out',
+ option.temp_dir + '/{}.crt'.format(b),
+ ],
+ stderr=subprocess.STDOUT,
+ )
+
+ self.context = ssl.create_default_context()
+ self.context.check_hostname = False
+ self.context.verify_mode = ssl.CERT_REQUIRED
+ self.context.load_verify_locations(option.temp_dir + '/root.crt')
+
+ self.load_certs(bundles)
+
+ def load_certs(self, bundles):
+ for bname, bvalue in bundles.items():
+ assert 'success' in self.certificate_load(
+ bname, bname
+ ), 'certificate {} upload'.format(bvalue['subj'])
+
+ def check_cert(self, host, expect):
+ resp, sock = self.get_ssl(
+ headers={
+ 'Host': host,
+ 'Content-Length': '0',
+ 'Connection': 'close',
+ },
+ start=True,
+ )
+
+ assert resp['status'] == 200
+ assert sock.getpeercert()['subject'][0][0][1] == expect
+
+ def test_tls_sni(self):
+ bundles = {
+ "default": {
+ "subj": "default",
+ "alt_names": ["default"],
+ },
+ "localhost.com": {
+ "subj": "localhost.com",
+ "alt_names": ["alt1.localhost.com"],
+ },
+ "example.com": {
+ "subj": "example.com",
+ "alt_names": ["alt1.example.com", "alt2.example.com"],
+ },
+ }
+ self.config_bundles(bundles)
+ self.add_tls(["default", "localhost.com", "example.com"])
+
+ self.check_cert('alt1.localhost.com', bundles['localhost.com']['subj'])
+ self.check_cert('alt2.example.com', bundles['example.com']['subj'])
+ self.check_cert('blah', bundles['default']['subj'])
+
+ def test_tls_sni_upper_case(self):
+ bundles = {
+ "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []},
+ "example.com": {
+ "subj": "example.com",
+ "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"],
+ },
+ }
+ self.config_bundles(bundles)
+ self.add_tls(["localhost.com", "example.com"])
+
+ self.check_cert('localhost.com', bundles['localhost.com']['subj'])
+ self.check_cert('LOCALHOST.COM', bundles['localhost.com']['subj'])
+ self.check_cert('EXAMPLE.COM', bundles['localhost.com']['subj'])
+ self.check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj'])
+ self.check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj'])
+
+ def test_tls_sni_only_bundle(self):
+ bundles = {
+ "localhost.com": {
+ "subj": "localhost.com",
+ "alt_names": ["alt1.localhost.com", "alt2.localhost.com"],
+ }
+ }
+ self.config_bundles(bundles)
+ self.add_tls(["localhost.com"])
+
+ self.check_cert('domain.com', bundles['localhost.com']['subj'])
+ self.check_cert('alt1.domain.com', bundles['localhost.com']['subj'])
+
+ def test_tls_sni_wildcard(self):
+ bundles = {
+ "localhost.com": {
+ "subj": "localhost.com",
+ "alt_names": [],
+ },
+ "example.com": {
+ "subj": "example.com",
+ "alt_names": ["*.example.com", "*.alt.example.com"],
+ },
+ }
+ self.config_bundles(bundles)
+ self.add_tls(["localhost.com", "example.com"])
+
+ self.check_cert('example.com', bundles['localhost.com']['subj'])
+ self.check_cert('www.example.com', bundles['example.com']['subj'])
+ self.check_cert('alt.example.com', bundles['example.com']['subj'])
+ self.check_cert('www.alt.example.com', bundles['example.com']['subj'])
+ self.check_cert('www.alt.example.ru', bundles['localhost.com']['subj'])
+
+ def test_tls_sni_duplicated_bundle(self):
+ bundles = {
+ "localhost.com": {
+ "subj": "localhost.com",
+ "alt_names": ["localhost.com", "alt2.localhost.com"],
+ }
+ }
+ self.config_bundles(bundles)
+ self.add_tls(["localhost.com", "localhost.com"])
+
+ self.check_cert('localhost.com', bundles['localhost.com']['subj'])
+ self.check_cert('alt2.localhost.com', bundles['localhost.com']['subj'])
+
+ def test_tls_sni_same_alt(self):
+ bundles = {
+ "localhost": {"subj": "subj1", "alt_names": "same.altname.com"},
+ "example": {"subj": "subj2", "alt_names": "same.altname.com"},
+ }
+ self.config_bundles(bundles)
+ self.add_tls(["localhost", "example"])
+
+ self.check_cert('localhost', bundles['localhost']['subj'])
+ self.check_cert('example', bundles['localhost']['subj'])
+
+ def test_tls_sni_empty_cn(self):
+ bundles = {
+ "localhost": {
+ "alt_names": ["alt.localhost.com"],
+ }
+ }
+ self.config_bundles(bundles)
+ self.add_tls(["localhost"])
+
+ resp, sock = self.get_ssl(
+ headers={
+ 'Host': 'domain.com',
+ 'Content-Length': '0',
+ 'Connection': 'close',
+ },
+ start=True,
+ )
+
+ assert resp['status'] == 200
+ assert sock.getpeercert()['subjectAltName'][0][1] == 'alt.localhost.com'
+
+ def test_tls_sni_invalid(self):
+ self.config_bundles({"localhost": {"subj": "subj1", "alt_names": ''}})
+ self.add_tls(["localhost"])
+
+ def check_certificate(cert):
+ assert 'error' in self.conf(
+ {"pass": "routes", "tls": {"certificate": cert}},
+ 'listeners/*:7080',
+ )
+
+ check_certificate('')
+ check_certificate('blah')
+ check_certificate([])
+ check_certificate(['blah'])
+ check_certificate(['localhost', 'blah'])
+ check_certificate(['localhost', []])
diff --git a/test/unit/applications/tls.py b/test/unit/applications/tls.py
index b0cd5abb..490ae916 100644
--- a/test/unit/applications/tls.py
+++ b/test/unit/applications/tls.py
@@ -63,19 +63,34 @@ class TestApplicationTLS(TestApplicationProto):
return ssl.get_server_certificate(addr, ssl_version=ssl_version)
- def openssl_conf(self):
+ def openssl_conf(self, rewrite=False, alt_names=[]):
conf_path = option.temp_dir + '/openssl.conf'
- if os.path.exists(conf_path):
+ if not rewrite and os.path.exists(conf_path):
return
+ # Generates alt_names section with dns names
+ a_names = "[alt_names]\n"
+ for i, k in enumerate(alt_names, 1):
+ a_names += "DNS.%d = %s\n" % (i, k)
+
+ # Generates section for sign request extension
+ a_sec = """req_extensions = myca_req_extensions
+
+[ myca_req_extensions ]
+subjectAltName = @alt_names
+
+{a_names}""".format(a_names=a_names)
+
with open(conf_path, 'w') as f:
f.write(
"""[ req ]
default_bits = 2048
encrypt_key = no
distinguished_name = req_distinguished_name
-[ req_distinguished_name ]"""
+
+{a_sec}
+[ req_distinguished_name ]""".format(a_sec=a_sec if alt_names else "")
)
def load(self, script, name=None):
diff --git a/test/unit/http.py b/test/unit/http.py
index 57e6ed3a..7706fe05 100644
--- a/test/unit/http.py
+++ b/test/unit/http.py
@@ -44,7 +44,8 @@ class TestHTTP():
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if 'wrapper' in kwargs:
- sock = kwargs['wrapper'](sock)
+ server_hostname = headers.get('Host', 'localhost')
+ sock = kwargs['wrapper'](sock, server_hostname=server_hostname)
connect_args = addr if sock_type == 'unix' else (addr, port)
try: