summaryrefslogtreecommitdiffhomepage
path: root/test/test_tls.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_tls.py')
-rw-r--r--test/test_tls.py1019
1 files changed, 520 insertions, 499 deletions
diff --git a/test/test_tls.py b/test/test_tls.py
index 06c38d0b..54fdb665 100644
--- a/test/test_tls.py
+++ b/test/test_tls.py
@@ -4,54 +4,58 @@ import subprocess
import time
import pytest
-from unit.applications.tls import TestApplicationTLS
+from unit.applications.tls import ApplicationTLS
from unit.option import option
-
-class TestTLS(TestApplicationTLS):
- prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
-
- def openssl_date_to_sec_epoch(self, date):
- return self.date_to_sec_epoch(date, '%b %d %X %Y %Z')
-
- def add_tls(self, application='empty', cert='default', port=7080):
- assert 'success' in self.conf(
- {
- "pass": f"applications/{application}",
- "tls": {"certificate": cert},
- },
- f'listeners/*:{port}',
- )
-
- def remove_tls(self, application='empty', port=7080):
- assert 'success' in self.conf(
- {"pass": f"applications/{application}"}, f'listeners/*:{port}'
- )
-
- def req(self, name='localhost', subject=None, x509=False):
- subj = subject if subject is not None else f'/CN={name}/'
-
- subprocess.check_output(
- [
- 'openssl',
- 'req',
- '-new',
- '-subj',
- subj,
- '-config',
- f'{option.temp_dir}/openssl.conf',
- '-out',
- f'{option.temp_dir}/{name}.csr',
- '-keyout',
- f'{option.temp_dir}/{name}.key',
- ],
- stderr=subprocess.STDOUT,
- )
-
- def generate_ca_conf(self):
- with open(f'{option.temp_dir}/ca.conf', 'w') as f:
- f.write(
- f"""[ ca ]
+prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
+
+client = ApplicationTLS()
+
+
+def add_tls(application='empty', cert='default', port=7080):
+ assert 'success' in client.conf(
+ {
+ "pass": f"applications/{application}",
+ "tls": {"certificate": cert},
+ },
+ f'listeners/*:{port}',
+ )
+
+
+def ca(cert='root', out='localhost'):
+ subprocess.check_output(
+ [
+ 'openssl',
+ 'ca',
+ '-batch',
+ '-config',
+ f'{option.temp_dir}/ca.conf',
+ '-keyfile',
+ f'{option.temp_dir}/{cert}.key',
+ '-cert',
+ f'{option.temp_dir}/{cert}.crt',
+ '-in',
+ f'{option.temp_dir}/{out}.csr',
+ '-out',
+ f'{option.temp_dir}/{out}.crt',
+ ],
+ stderr=subprocess.STDOUT,
+ )
+
+
+def context_cert_req(cert='root'):
+ context = ssl.create_default_context()
+ context.check_hostname = False
+ context.verify_mode = ssl.CERT_REQUIRED
+ context.load_verify_locations(f'{option.temp_dir}/{cert}.crt')
+
+ return context
+
+
+def generate_ca_conf():
+ with open(f'{option.temp_dir}/ca.conf', 'w') as f:
+ f.write(
+ f"""[ ca ]
default_ca = myca
[ myca ]
@@ -69,615 +73,632 @@ commonName = optional
[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
- )
-
- with open(f'{option.temp_dir}/certserial', 'w') as f:
- f.write('1000')
-
- with open(f'{option.temp_dir}/certindex', 'w') as f:
- f.write('')
-
- with open(f'{option.temp_dir}/certindex.attr', 'w') as f:
- f.write('')
-
- def ca(self, cert='root', out='localhost'):
- subprocess.check_output(
- [
- 'openssl',
- 'ca',
- '-batch',
- '-config',
- f'{option.temp_dir}/ca.conf',
- '-keyfile',
- f'{option.temp_dir}/{cert}.key',
- '-cert',
- f'{option.temp_dir}/{cert}.crt',
- '-in',
- f'{option.temp_dir}/{out}.csr',
- '-out',
- f'{option.temp_dir}/{out}.crt',
- ],
- stderr=subprocess.STDOUT,
)
- def set_certificate_req_context(self, cert='root'):
- self.context = ssl.create_default_context()
- self.context.check_hostname = False
- self.context.verify_mode = ssl.CERT_REQUIRED
- self.context.load_verify_locations(f'{option.temp_dir}/{cert}.crt')
+ with open(f'{option.temp_dir}/certserial', 'w') as f:
+ f.write('1000')
- def test_tls_listener_option_add(self):
- self.load('empty')
+ with open(f'{option.temp_dir}/certindex', 'w') as f:
+ f.write('')
- self.certificate()
+ with open(f'{option.temp_dir}/certindex.attr', 'w') as f:
+ f.write('')
- self.add_tls()
- assert self.get_ssl()['status'] == 200, 'add listener option'
+def remove_tls(application='empty', port=7080):
+ assert 'success' in client.conf(
+ {"pass": f"applications/{application}"}, f'listeners/*:{port}'
+ )
- def test_tls_listener_option_remove(self):
- self.load('empty')
- self.certificate()
+def req(name='localhost', subject=None):
+ subj = subject if subject is not None else f'/CN={name}/'
- self.add_tls()
+ subprocess.check_output(
+ [
+ 'openssl',
+ 'req',
+ '-new',
+ '-subj',
+ subj,
+ '-config',
+ f'{option.temp_dir}/openssl.conf',
+ '-out',
+ f'{option.temp_dir}/{name}.csr',
+ '-keyout',
+ f'{option.temp_dir}/{name}.key',
+ ],
+ stderr=subprocess.STDOUT,
+ )
- self.get_ssl()
- self.remove_tls()
+def test_tls_listener_option_add():
+ client.load('empty')
- assert self.get()['status'] == 200, 'remove listener option'
+ client.certificate()
- def test_tls_certificate_remove(self):
- self.load('empty')
+ add_tls()
- self.certificate()
+ assert client.get_ssl()['status'] == 200, 'add listener option'
- assert 'success' in self.conf_delete(
- '/certificates/default'
- ), 'remove certificate'
- def test_tls_certificate_remove_used(self):
- self.load('empty')
+def test_tls_listener_option_remove():
+ client.load('empty')
- self.certificate()
+ client.certificate()
- self.add_tls()
+ add_tls()
- assert 'error' in self.conf_delete(
- '/certificates/default'
- ), 'remove certificate'
+ client.get_ssl()
- def test_tls_certificate_remove_nonexisting(self):
- self.load('empty')
+ remove_tls()
- self.certificate()
+ assert client.get()['status'] == 200, 'remove listener option'
- self.add_tls()
- assert 'error' in self.conf_delete(
- '/certificates/blah'
- ), 'remove nonexistings certificate'
+def test_tls_certificate_remove():
+ client.load('empty')
- @pytest.mark.skip('not yet')
- def test_tls_certificate_update(self):
- self.load('empty')
+ client.certificate()
- self.certificate()
+ assert 'success' in client.conf_delete(
+ '/certificates/default'
+ ), 'remove certificate'
- self.add_tls()
- cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
+def test_tls_certificate_remove_used():
+ client.load('empty')
- self.certificate()
+ client.certificate()
- assert cert_old != ssl.get_server_certificate(
- ('127.0.0.1', 7080)
- ), 'update certificate'
+ add_tls()
- @pytest.mark.skip('not yet')
- def test_tls_certificate_key_incorrect(self):
- self.load('empty')
+ assert 'error' in client.conf_delete(
+ '/certificates/default'
+ ), 'remove certificate'
- self.certificate('first', False)
- self.certificate('second', False)
- assert 'error' in self.certificate_load(
- 'first', 'second'
- ), 'key incorrect'
+def test_tls_certificate_remove_nonexisting():
+ client.load('empty')
- def test_tls_certificate_change(self):
- self.load('empty')
+ client.certificate()
- self.certificate()
- self.certificate('new')
+ add_tls()
- self.add_tls()
+ assert 'error' in client.conf_delete(
+ '/certificates/blah'
+ ), 'remove nonexistings certificate'
- cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
- self.add_tls(cert='new')
+@pytest.mark.skip('not yet')
+def test_tls_certificate_update():
+ client.load('empty')
- assert cert_old != ssl.get_server_certificate(
- ('127.0.0.1', 7080)
- ), 'change certificate'
+ client.certificate()
- def test_tls_certificate_key_rsa(self):
- self.load('empty')
+ add_tls()
- self.certificate()
+ cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
- assert (
- self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
- ), 'certificate key rsa'
+ client.certificate()
- def test_tls_certificate_key_ec(self, temp_dir):
- self.load('empty')
+ assert cert_old != ssl.get_server_certificate(
+ ('127.0.0.1', 7080)
+ ), 'update certificate'
- self.openssl_conf()
- subprocess.check_output(
- [
- 'openssl',
- 'ecparam',
- '-noout',
- '-genkey',
- '-out',
- f'{temp_dir}/ec.key',
- '-name',
- 'prime256v1',
- ],
- stderr=subprocess.STDOUT,
- )
+@pytest.mark.skip('not yet')
+def test_tls_certificate_key_incorrect():
+ client.load('empty')
- subprocess.check_output(
- [
- 'openssl',
- 'req',
- '-x509',
- '-new',
- '-subj',
- '/CN=ec/',
- '-config',
- f'{temp_dir}/openssl.conf',
- '-key',
- f'{temp_dir}/ec.key',
- '-out',
- f'{temp_dir}/ec.crt',
- ],
- stderr=subprocess.STDOUT,
- )
+ client.certificate('first', False)
+ client.certificate('second', False)
- self.certificate_load('ec')
+ assert 'error' in client.certificate_load(
+ 'first', 'second'
+ ), 'key incorrect'
- assert (
- self.conf_get('/certificates/ec/key') == 'ECDH'
- ), 'certificate key ec'
- def test_tls_certificate_chain_options(self):
- self.load('empty')
+def test_tls_certificate_change():
+ client.load('empty')
- self.certificate()
+ client.certificate()
+ client.certificate('new')
- chain = self.conf_get('/certificates/default/chain')
+ add_tls()
- assert len(chain) == 1, 'certificate chain length'
+ cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
- cert = chain[0]
+ add_tls(cert='new')
- assert (
- cert['subject']['common_name'] == 'default'
- ), 'certificate subject common name'
- assert (
- cert['issuer']['common_name'] == 'default'
- ), 'certificate issuer common name'
+ assert cert_old != ssl.get_server_certificate(
+ ('127.0.0.1', 7080)
+ ), 'change certificate'
- assert (
- abs(
- self.sec_epoch()
- - self.openssl_date_to_sec_epoch(cert['validity']['since'])
- )
- < 60
- ), 'certificate validity since'
- assert (
- self.openssl_date_to_sec_epoch(cert['validity']['until'])
- - self.openssl_date_to_sec_epoch(cert['validity']['since'])
- == 2592000
- ), 'certificate validity until'
- def test_tls_certificate_chain(self, temp_dir):
- self.load('empty')
+def test_tls_certificate_key_rsa():
+ client.load('empty')
- self.certificate('root', False)
+ client.certificate()
- self.req('int')
- self.req('end')
+ assert (
+ client.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
+ ), 'certificate key rsa'
- self.generate_ca_conf()
- self.ca(cert='root', out='int')
- self.ca(cert='int', out='end')
+def test_tls_certificate_key_ec(temp_dir):
+ client.load('empty')
- crt_path = f'{temp_dir}/end-int.crt'
- end_path = f'{temp_dir}/end.crt'
- int_path = f'{temp_dir}/int.crt'
+ client.openssl_conf()
- with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
- int_path, 'rb'
- ) as int:
- crt.write(end.read() + int.read())
-
- self.set_certificate_req_context()
-
- # incomplete chain
-
- assert 'success' in self.certificate_load(
- 'end', 'end'
- ), 'certificate chain end upload'
+ subprocess.check_output(
+ [
+ 'openssl',
+ 'ecparam',
+ '-noout',
+ '-genkey',
+ '-out',
+ f'{temp_dir}/ec.key',
+ '-name',
+ 'prime256v1',
+ ],
+ stderr=subprocess.STDOUT,
+ )
- chain = self.conf_get('/certificates/end/chain')
- assert len(chain) == 1, 'certificate chain end length'
- assert (
- chain[0]['subject']['common_name'] == 'end'
- ), 'certificate chain end subject common name'
- assert (
- chain[0]['issuer']['common_name'] == 'int'
- ), 'certificate chain end issuer common name'
+ subprocess.check_output(
+ [
+ 'openssl',
+ 'req',
+ '-x509',
+ '-new',
+ '-subj',
+ '/CN=ec/',
+ '-config',
+ f'{temp_dir}/openssl.conf',
+ '-key',
+ f'{temp_dir}/ec.key',
+ '-out',
+ f'{temp_dir}/ec.crt',
+ ],
+ stderr=subprocess.STDOUT,
+ )
- self.add_tls(cert='end')
+ client.certificate_load('ec')
- try:
- resp = self.get_ssl()
- except ssl.SSLError:
- resp = None
+ assert (
+ client.conf_get('/certificates/ec/key') == 'ECDH'
+ ), 'certificate key ec'
- assert resp == None, 'certificate chain incomplete chain'
- # intermediate
+def test_tls_certificate_chain_options(date_to_sec_epoch, sec_epoch):
+ client.load('empty')
+ date_format = '%b %d %X %Y %Z'
- assert 'success' in self.certificate_load(
- 'int', 'int'
- ), 'certificate chain int upload'
+ client.certificate()
- chain = self.conf_get('/certificates/int/chain')
- assert len(chain) == 1, 'certificate chain int length'
- assert (
- chain[0]['subject']['common_name'] == 'int'
- ), 'certificate chain int subject common name'
- assert (
- chain[0]['issuer']['common_name'] == 'root'
- ), 'certificate chain int issuer common name'
+ chain = client.conf_get('/certificates/default/chain')
- self.add_tls(cert='int')
+ assert len(chain) == 1, 'certificate chain length'
- assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'
+ cert = chain[0]
- # intermediate server
+ assert (
+ cert['subject']['common_name'] == 'default'
+ ), 'certificate subject common name'
+ assert (
+ cert['issuer']['common_name'] == 'default'
+ ), 'certificate issuer common name'
- assert 'success' in self.certificate_load(
- 'end-int', 'end'
- ), 'certificate chain end-int upload'
+ assert (
+ abs(
+ sec_epoch
+ - date_to_sec_epoch(cert['validity']['since'], date_format)
+ )
+ < 60
+ ), 'certificate validity since'
+ assert (
+ date_to_sec_epoch(cert['validity']['until'], date_format)
+ - date_to_sec_epoch(cert['validity']['since'], date_format)
+ == 2592000
+ ), 'certificate validity until'
- chain = self.conf_get('/certificates/end-int/chain')
- assert len(chain) == 2, 'certificate chain end-int length'
- assert (
- chain[0]['subject']['common_name'] == 'end'
- ), 'certificate chain end-int int subject common name'
- assert (
- chain[0]['issuer']['common_name'] == 'int'
- ), 'certificate chain end-int int issuer common name'
- assert (
- chain[1]['subject']['common_name'] == 'int'
- ), 'certificate chain end-int end subject common name'
- assert (
- chain[1]['issuer']['common_name'] == 'root'
- ), 'certificate chain end-int end issuer common name'
- self.add_tls(cert='end-int')
+def test_tls_certificate_chain(temp_dir):
+ client.load('empty')
- assert (
- self.get_ssl()['status'] == 200
- ), 'certificate chain intermediate server'
+ client.certificate('root', False)
- def test_tls_certificate_chain_long(self, temp_dir):
- self.load('empty')
+ req('int')
+ req('end')
- self.generate_ca_conf()
+ generate_ca_conf()
- # Minimum chain length is 3.
- chain_length = 10
+ ca(cert='root', out='int')
+ ca(cert='int', out='end')
- for i in range(chain_length):
- if i == 0:
- self.certificate('root', False)
- elif i == chain_length - 1:
- self.req('end')
- else:
- self.req(f'int{i}')
+ crt_path = f'{temp_dir}/end-int.crt'
+ end_path = f'{temp_dir}/end.crt'
+ int_path = f'{temp_dir}/int.crt'
+
+ with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
+ int_path, 'rb'
+ ) as int:
+ crt.write(end.read() + int.read())
+
+ # incomplete chain
+
+ assert 'success' in client.certificate_load(
+ 'end', 'end'
+ ), 'certificate chain end upload'
+
+ chain = client.conf_get('/certificates/end/chain')
+ assert len(chain) == 1, 'certificate chain end length'
+ assert (
+ chain[0]['subject']['common_name'] == 'end'
+ ), 'certificate chain end subject common name'
+ assert (
+ chain[0]['issuer']['common_name'] == 'int'
+ ), 'certificate chain end issuer common name'
+
+ add_tls(cert='end')
+
+ ctx_cert_req = context_cert_req()
+ try:
+ resp = client.get_ssl(context=ctx_cert_req)
+ except ssl.SSLError:
+ resp = None
+
+ assert resp is None, 'certificate chain incomplete chain'
+
+ # intermediate
+
+ assert 'success' in client.certificate_load(
+ 'int', 'int'
+ ), 'certificate chain int upload'
+
+ chain = client.conf_get('/certificates/int/chain')
+ assert len(chain) == 1, 'certificate chain int length'
+ assert (
+ chain[0]['subject']['common_name'] == 'int'
+ ), 'certificate chain int subject common name'
+ assert (
+ chain[0]['issuer']['common_name'] == 'root'
+ ), 'certificate chain int issuer common name'
+
+ add_tls(cert='int')
+
+ assert client.get_ssl()['status'] == 200, 'certificate chain intermediate'
+
+ # intermediate server
+
+ assert 'success' in client.certificate_load(
+ 'end-int', 'end'
+ ), 'certificate chain end-int upload'
+
+ chain = client.conf_get('/certificates/end-int/chain')
+ assert len(chain) == 2, 'certificate chain end-int length'
+ assert (
+ chain[0]['subject']['common_name'] == 'end'
+ ), 'certificate chain end-int int subject common name'
+ assert (
+ chain[0]['issuer']['common_name'] == 'int'
+ ), 'certificate chain end-int int issuer common name'
+ assert (
+ chain[1]['subject']['common_name'] == 'int'
+ ), 'certificate chain end-int end subject common name'
+ assert (
+ chain[1]['issuer']['common_name'] == 'root'
+ ), 'certificate chain end-int end issuer common name'
+
+ add_tls(cert='end-int')
+
+ assert (
+ client.get_ssl(context=ctx_cert_req)['status'] == 200
+ ), 'certificate chain intermediate server'
+
+
+def test_tls_certificate_chain_long(temp_dir):
+ client.load('empty')
- for i in range(chain_length - 1):
- if i == 0:
- self.ca(cert='root', out='int1')
- elif i == chain_length - 2:
- self.ca(cert=f'int{(chain_length - 2)}', out='end')
- else:
- self.ca(cert=f'int{i}', out=f'int{(i + 1)}')
+ generate_ca_conf()
- for i in range(chain_length - 1, 0, -1):
- path = (
- f'{temp_dir}/end.crt'
- if i == chain_length - 1
- else f'{temp_dir}/int{i}.crt'
- )
+ # Minimum chain length is 3.
+ chain_length = 10
+
+ for i in range(chain_length):
+ if i == 0:
+ client.certificate('root', False)
+ elif i == chain_length - 1:
+ req('end')
+ else:
+ req(f'int{i}')
+
+ for i in range(chain_length - 1):
+ if i == 0:
+ ca(cert='root', out='int1')
+ elif i == chain_length - 2:
+ ca(cert=f'int{(chain_length - 2)}', out='end')
+ else:
+ ca(cert=f'int{i}', out=f'int{(i + 1)}')
+
+ for i in range(chain_length - 1, 0, -1):
+ path = (
+ f'{temp_dir}/end.crt'
+ if i == chain_length - 1
+ else f'{temp_dir}/int{i}.crt'
+ )
- with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert:
- chain.write(cert.read())
+ with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert:
+ chain.write(cert.read())
- self.set_certificate_req_context()
+ assert 'success' in client.certificate_load(
+ 'all', 'end'
+ ), 'certificate chain upload'
- assert 'success' in self.certificate_load(
- 'all', 'end'
- ), 'certificate chain upload'
+ chain = client.conf_get('/certificates/all/chain')
+ assert len(chain) == chain_length - 1, 'certificate chain length'
- chain = self.conf_get('/certificates/all/chain')
- assert len(chain) == chain_length - 1, 'certificate chain length'
+ add_tls(cert='all')
- self.add_tls(cert='all')
+ assert (
+ client.get_ssl(context=context_cert_req())['status'] == 200
+ ), 'certificate chain long'
- assert self.get_ssl()['status'] == 200, 'certificate chain long'
- def test_tls_certificate_empty_cn(self, temp_dir):
- self.certificate('root', False)
+def test_tls_certificate_empty_cn():
+ client.certificate('root', False)
- self.req(subject='/')
+ req(subject='/')
- self.generate_ca_conf()
- self.ca()
+ generate_ca_conf()
+ ca()
- self.set_certificate_req_context()
+ assert 'success' in client.certificate_load('localhost', 'localhost')
- assert 'success' in self.certificate_load('localhost', 'localhost')
+ cert = client.conf_get('/certificates/localhost')
+ assert cert['chain'][0]['subject'] == {}, 'empty subject'
+ assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
- cert = self.conf_get('/certificates/localhost')
- assert cert['chain'][0]['subject'] == {}, 'empty subject'
- assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
- def test_tls_certificate_empty_cn_san(self, temp_dir):
- self.certificate('root', False)
+def test_tls_certificate_empty_cn_san():
+ client.certificate('root', False)
- self.openssl_conf(
- rewrite=True, alt_names=["example.com", "www.example.net"]
- )
+ client.openssl_conf(
+ rewrite=True, alt_names=["example.com", "www.example.net"]
+ )
- self.req(subject='/')
+ req(subject='/')
- self.generate_ca_conf()
- self.ca()
+ generate_ca_conf()
+ ca()
- self.set_certificate_req_context()
+ assert 'success' in client.certificate_load('localhost', 'localhost')
- assert 'success' in self.certificate_load('localhost', 'localhost')
+ cert = client.conf_get('/certificates/localhost')
+ assert cert['chain'][0]['subject'] == {
+ 'alt_names': ['example.com', 'www.example.net']
+ }, 'subject alt_names'
+ assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
- cert = self.conf_get('/certificates/localhost')
- assert cert['chain'][0]['subject'] == {
- 'alt_names': ['example.com', 'www.example.net']
- }, 'subject alt_names'
- assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
- def test_tls_certificate_empty_cn_san_ip(self):
- self.certificate('root', False)
+def test_tls_certificate_empty_cn_san_ip():
+ client.certificate('root', False)
- self.openssl_conf(
- rewrite=True,
- alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
- )
+ client.openssl_conf(
+ rewrite=True,
+ alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
+ )
- self.req(subject='/')
+ req(subject='/')
- self.generate_ca_conf()
- self.ca()
+ generate_ca_conf()
+ ca()
- self.set_certificate_req_context()
+ assert 'success' in client.certificate_load('localhost', 'localhost')
- assert 'success' in self.certificate_load('localhost', 'localhost')
+ cert = client.conf_get('/certificates/localhost')
+ assert cert['chain'][0]['subject'] == {
+ 'alt_names': ['example.com', 'www.example.net']
+ }, 'subject alt_names'
+ assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
- cert = self.conf_get('/certificates/localhost')
- assert cert['chain'][0]['subject'] == {
- 'alt_names': ['example.com', 'www.example.net']
- }, 'subject alt_names'
- assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
- def test_tls_keepalive(self):
- self.load('mirror')
+def test_tls_keepalive():
+ client.load('mirror')
- assert self.get()['status'] == 200, 'init'
+ assert client.get()['status'] == 200, 'init'
- self.certificate()
+ client.certificate()
- self.add_tls(application='mirror')
+ add_tls(application='mirror')
- (resp, sock) = self.post_ssl(
- headers={
- 'Host': 'localhost',
- 'Connection': 'keep-alive',
- },
- start=True,
- body='0123456789',
- read_timeout=1,
- )
+ (resp, sock) = client.post_ssl(
+ headers={
+ 'Host': 'localhost',
+ 'Connection': 'keep-alive',
+ },
+ start=True,
+ body='0123456789',
+ read_timeout=1,
+ )
- assert resp['body'] == '0123456789', 'keepalive 1'
+ assert resp['body'] == '0123456789', 'keepalive 1'
- resp = self.post_ssl(
- headers={
- 'Host': 'localhost',
- 'Connection': 'close',
+ resp = client.post_ssl(
+ headers={
+ 'Host': 'localhost',
+ 'Connection': 'close',
+ },
+ sock=sock,
+ body='0123456789',
+ )
+
+ assert resp['body'] == '0123456789', 'keepalive 2'
+
+
+def test_tls_no_close_notify():
+ client.certificate()
+
+ assert 'success' in client.conf(
+ {
+ "listeners": {
+ "*:7080": {
+ "pass": "routes",
+ "tls": {"certificate": "default"},
+ }
},
- sock=sock,
- body='0123456789',
- )
+ "routes": [{"action": {"return": 200}}],
+ "applications": {},
+ }
+ ), 'load application configuration'
- assert resp['body'] == '0123456789', 'keepalive 2'
-
- def test_tls_no_close_notify(self):
- self.certificate()
-
- assert 'success' in self.conf(
- {
- "listeners": {
- "*:7080": {
- "pass": "routes",
- "tls": {"certificate": "default"},
- }
- },
- "routes": [{"action": {"return": 200}}],
- "applications": {},
- }
- ), 'load application configuration'
+ (_, sock) = client.get_ssl(start=True)
- (resp, sock) = self.get_ssl(start=True)
+ time.sleep(5)
- time.sleep(5)
+ sock.close()
- sock.close()
- @pytest.mark.skip('not yet')
- def test_tls_keepalive_certificate_remove(self):
- self.load('empty')
+@pytest.mark.skip('not yet')
+def test_tls_keepalive_certificate_remove():
+ client.load('empty')
- assert self.get()['status'] == 200, 'init'
+ assert client.get()['status'] == 200, 'init'
- self.certificate()
+ client.certificate()
- self.add_tls()
+ add_tls()
- (resp, sock) = self.get_ssl(
- headers={'Host': 'localhost', 'Connection': 'keep-alive'},
- start=True,
- read_timeout=1,
- )
+ (resp, sock) = client.get_ssl(
+ headers={'Host': 'localhost', 'Connection': 'keep-alive'},
+ start=True,
+ read_timeout=1,
+ )
- assert 'success' in self.conf(
- {"pass": "applications/empty"}, 'listeners/*:7080'
- )
- assert 'success' in self.conf_delete('/certificates/default')
+ assert 'success' in client.conf(
+ {"pass": "applications/empty"}, 'listeners/*:7080'
+ )
+ assert 'success' in client.conf_delete('/certificates/default')
- try:
- resp = self.get_ssl(sock=sock)
+ try:
+ resp = client.get_ssl(sock=sock)
- except KeyboardInterrupt:
- raise
+ except KeyboardInterrupt:
+ raise
- except:
- resp = None
+ except:
+ resp = None
- assert resp == None, 'keepalive remove certificate'
+ assert resp is None, 'keepalive remove certificate'
- @pytest.mark.skip('not yet')
- def test_tls_certificates_remove_all(self):
- self.load('empty')
- self.certificate()
+@pytest.mark.skip('not yet')
+def test_tls_certificates_remove_all():
+ client.load('empty')
- assert 'success' in self.conf_delete(
- '/certificates'
- ), 'remove all certificates'
+ client.certificate()
- def test_tls_application_respawn(self, skip_alert):
- self.load('mirror')
+ assert 'success' in client.conf_delete(
+ '/certificates'
+ ), 'remove all certificates'
- self.certificate()
- assert 'success' in self.conf('1', 'applications/mirror/processes')
+def test_tls_application_respawn(findall, skip_alert, wait_for_record):
+ client.load('mirror')
- self.add_tls(application='mirror')
+ client.certificate()
- (_, sock) = self.post_ssl(
- headers={
- 'Host': 'localhost',
- 'Connection': 'keep-alive',
- },
- start=True,
- body='0123456789',
- read_timeout=1,
- )
+ assert 'success' in client.conf('1', 'applications/mirror/processes')
- app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
+ add_tls(application='mirror')
- subprocess.check_output(['kill', '-9', app_id])
+ (_, sock) = client.post_ssl(
+ headers={
+ 'Host': 'localhost',
+ 'Connection': 'keep-alive',
+ },
+ start=True,
+ body='0123456789',
+ read_timeout=1,
+ )
- skip_alert(fr'process {app_id} exited on signal 9')
+ app_id = findall(r'(\d+)#\d+ "mirror" application started')[0]
- self.wait_for_record(
- fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started'
- )
+ subprocess.check_output(['kill', '-9', app_id])
- resp = self.post_ssl(sock=sock, body='0123456789')
+ skip_alert(fr'process {app_id} exited on signal 9')
- assert resp['status'] == 200, 'application respawn status'
- assert resp['body'] == '0123456789', 'application respawn body'
+ wait_for_record(fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started')
- def test_tls_url_scheme(self):
- self.load('variables')
+ resp = client.post_ssl(sock=sock, body='0123456789')
- assert (
- self.post(
- headers={
- 'Host': 'localhost',
- 'Content-Type': 'text/html',
- 'Custom-Header': '',
- 'Connection': 'close',
- }
- )['headers']['Wsgi-Url-Scheme']
- == 'http'
- ), 'url scheme http'
+ assert resp['status'] == 200, 'application respawn status'
+ assert resp['body'] == '0123456789', 'application respawn body'
- self.certificate()
- self.add_tls(application='variables')
+def test_tls_url_scheme():
+ client.load('variables')
- assert (
- self.post_ssl(
- headers={
- 'Host': 'localhost',
- 'Content-Type': 'text/html',
- 'Custom-Header': '',
- 'Connection': 'close',
- }
- )['headers']['Wsgi-Url-Scheme']
- == 'https'
- ), 'url scheme https'
+ assert (
+ client.post(
+ headers={
+ 'Host': 'localhost',
+ 'Content-Type': 'text/html',
+ 'Custom-Header': '',
+ 'Connection': 'close',
+ }
+ )['headers']['Wsgi-Url-Scheme']
+ == 'http'
+ ), 'url scheme http'
- def test_tls_big_upload(self):
- self.load('upload')
+ client.certificate()
- self.certificate()
+ add_tls(application='variables')
- self.add_tls(application='upload')
+ assert (
+ client.post_ssl(
+ headers={
+ 'Host': 'localhost',
+ 'Content-Type': 'text/html',
+ 'Custom-Header': '',
+ 'Connection': 'close',
+ }
+ )['headers']['Wsgi-Url-Scheme']
+ == 'https'
+ ), 'url scheme https'
- filename = 'test.txt'
- data = '0123456789' * 9000
- res = self.post_ssl(
- body={
- 'file': {
- 'filename': filename,
- 'type': 'text/plain',
- 'data': io.StringIO(data),
- }
+def test_tls_big_upload():
+ client.load('upload')
+
+ client.certificate()
+
+ add_tls(application='upload')
+
+ filename = 'test.txt'
+ data = '0123456789' * 9000
+
+ res = client.post_ssl(
+ body={
+ 'file': {
+ 'filename': filename,
+ 'type': 'text/plain',
+ 'data': io.StringIO(data),
}
- )
- assert res['status'] == 200, 'status ok'
- assert res['body'] == f'{filename}{data}'
+ }
+ )
+ assert res['status'] == 200, 'status ok'
+ assert res['body'] == f'{filename}{data}'
+
- def test_tls_multi_listener(self):
- self.load('empty')
+def test_tls_multi_listener():
+ client.load('empty')
- self.certificate()
+ client.certificate()
- self.add_tls()
- self.add_tls(port=7081)
+ add_tls()
+ add_tls(port=7081)
- assert self.get_ssl()['status'] == 200, 'listener #1'
+ assert client.get_ssl()['status'] == 200, 'listener #1'
- assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'
+ assert client.get_ssl(port=7081)['status'] == 200, 'listener #2'